aboutsummaryrefslogtreecommitdiff
path: root/voctocore/lib/controlserver.py
blob: 4e95e464a503b098da147eb3d5713e4e9596de8e (plain)
  1. #!/usr/bin/python3
  2. import socket, logging, traceback
  3. from queue import Queue
  4. from gi.repository import GObject
  5. from lib.commands import ControlServerCommands
  6. from lib.tcpmulticonnection import TCPMultiConnection
  7. class ControlServer(TCPMultiConnection):
  8. def __init__(self, pipeline):
  9. '''Initialize server and start listening.'''
  10. self.log = logging.getLogger('ControlServer')
  11. super().__init__(port=9999)
  12. self.command_queue = Queue()
  13. self.commands = ControlServerCommands(pipeline)
  14. GObject.idle_add(self.on_loop)
  15. def on_accepted(self, conn, addr):
  16. '''Asynchronous connection listener. Starts a handler for each connection.'''
  17. self.log.debug('Setting GObject io-watch on Connection')
  18. GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
  19. GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
  20. def on_data(self, conn, _, leftovers, *args):
  21. '''Asynchronous connection handler. Pushes data from socket
  22. into command queue linewise'''
  23. try:
  24. while True:
  25. try:
  26. leftovers.append(conn.recv(4096).decode(errors='replace'))
  27. if len(leftovers[-1]) == 0:
  28. self.log.info("Socket was closed")
  29. self.close_connection(conn)
  30. return False
  31. except UnicodeDecodeError as e:
  32. continue
  33. except BlockingIOError as e:
  34. pass
  35. data = "".join(leftovers)
  36. leftovers.clear()
  37. lines = data.split('\n')
  38. for line in lines[:-1]:
  39. self.log.debug("Got line: %r", line)
  40. line = line.strip()
  41. # TODO: move quit to on_loop
  42. # 'quit' = remote wants us to close the connection
  43. if line == 'quit':
  44. self.log.info("Client asked us to close the Connection")
  45. self.close_connection(conn)
  46. return False
  47. self.command_queue.put((line, conn))
  48. self.log.debug("Remaining %r", lines[-1])
  49. leftovers.append(lines[-1])
  50. return True
  51. def on_loop(self):
  52. '''Command handler. Processes commands in the command queue whenever
  53. nothing else is happening (registered as GObject idle callback)'''
  54. if self.command_queue.empty():
  55. return True
  56. line, requestor = self.command_queue.get()
  57. words = line.split()
  58. command = words[0]
  59. args = words[1:]
  60. try:
  61. f = self.commands.fetch(command)
  62. message, send_signals = f(*args)
  63. response = "ok %s\n" % message
  64. except Exception as e:
  65. message = str(e) or "<no message>"
  66. response = "error %s\n" % message
  67. else:
  68. if send_signals:
  69. signal = "signal %s\n" % line
  70. for conn, queue in self.currentConnections.items():
  71. if conn == requestor:
  72. continue
  73. queue.put(signal)
  74. finally:
  75. self.currentConnections[requestor].put(response)
  76. return True
  77. def on_write(self, conn, *args):
  78. # TODO: on_loop() is not called as soon as there is a writable socket
  79. self.on_loop()
  80. try:
  81. queue = self.currentConnections[conn]
  82. except KeyError:
  83. return False
  84. if queue.empty():
  85. return True
  86. message = queue.get()
  87. try:
  88. conn.send(message.encode())
  89. except Exception as e:
  90. self.log.warn(e)
  91. return True
  92. def notify_all(self, msg):
  93. try:
  94. words = msg.split()
  95. words[-1] = self.commands.encodeSourceName(int(words[-1]))
  96. msg = " ".join(words) + '\n'
  97. for queue in self.currentConnections.values():
  98. queue.put(msg)
  99. except Exception as e:
  100. self.log.debug("Error during notify: %s", e)