aboutsummaryrefslogtreecommitdiff
path: root/voctocore/lib/controlserver.py
blob: e0632fcbdfb3d51201a0b44ac75495d672a1fa7d (plain)
  1. #!/usr/bin/python3
  2. import socket, logging, traceback
  3. from queue import Queue
  4. from gi.repository import GObject
  5. from lib.commands import ControlServerCommands
  6. from lib.tcpmulticonnection import TCPMultiConnection
  7. from lib.response import NotifyResponse, OkResponse
  8. class ControlServer(TCPMultiConnection):
  9. def __init__(self, pipeline):
  10. '''Initialize server and start listening.'''
  11. self.log = logging.getLogger('ControlServer')
  12. super().__init__(port=9999)
  13. self.command_queue = Queue()
  14. self.commands = ControlServerCommands(pipeline)
  15. GObject.idle_add(self.on_loop)
  16. def on_accepted(self, conn, addr):
  17. '''Asynchronous connection listener. Starts a handler for each connection.'''
  18. self.log.debug('Setting GObject io-watch on Connection')
  19. GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
  20. GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
  21. def on_data(self, conn, _, leftovers, *args):
  22. '''Asynchronous connection handler. Pushes data from socket
  23. into command queue linewise'''
  24. try:
  25. while True:
  26. try:
  27. leftovers.append(conn.recv(4096).decode(errors='replace'))
  28. if len(leftovers[-1]) == 0:
  29. self.log.info("Socket was closed")
  30. self.close_connection(conn)
  31. return False
  32. except UnicodeDecodeError as e:
  33. continue
  34. except BlockingIOError as e:
  35. pass
  36. data = "".join(leftovers)
  37. leftovers.clear()
  38. lines = data.split('\n')
  39. for line in lines[:-1]:
  40. self.log.debug("Got line: %r", line)
  41. line = line.strip()
  42. # TODO: move quit to on_loop
  43. # 'quit' = remote wants us to close the connection
  44. if line == 'quit':
  45. self.log.info("Client asked us to close the Connection")
  46. self.close_connection(conn)
  47. return False
  48. self.command_queue.put((line, conn))
  49. self.log.debug("Remaining %r", lines[-1])
  50. leftovers.append(lines[-1])
  51. return True
  52. def on_loop(self):
  53. '''Command handler. Processes commands in the command queue whenever
  54. nothing else is happening (registered as GObject idle callback)'''
  55. if self.command_queue.empty():
  56. return True
  57. line, requestor = self.command_queue.get()
  58. words = line.split()
  59. command = words[0]
  60. args = words[1:]
  61. try:
  62. command_function = self.commands.__class__.__dict__[command]
  63. except KeyError as e:
  64. response = "error unknown command %s\n" % command
  65. else:
  66. try:
  67. responseObject = command_function(self.commands, *args)
  68. except Exception as e:
  69. message = str(e) or "<no message>"
  70. response = "error %s\n" % message
  71. else:
  72. if isinstance(responseObject, NotifyResponse):
  73. signal = "%s\n" % str(responseObject)
  74. for conn, queue in self.currentConnections.items():
  75. if conn == requestor:
  76. continue
  77. queue.put(signal)
  78. response = "%s\n" % str(responseObject)
  79. finally:
  80. self.currentConnections[requestor].put(response)
  81. return True
  82. def on_write(self, conn, *args):
  83. # TODO: on_loop() is not called as soon as there is a writable socket
  84. self.on_loop()
  85. try:
  86. queue = self.currentConnections[conn]
  87. except KeyError:
  88. return False
  89. if queue.empty():
  90. return True
  91. message = queue.get()
  92. try:
  93. conn.send(message.encode())
  94. except Exception as e:
  95. self.log.warn(e)
  96. return True
  97. def notify_all(self, msg):
  98. try:
  99. words = msg.split()
  100. words[-1] = self.commands.encodeSourceName(int(words[-1]))
  101. msg = " ".join(words) + '\n'
  102. for queue in self.currentConnections.values():
  103. queue.put(msg)
  104. except Exception as e:
  105. self.log.debug("Error during notify: %s", e)