aboutsummaryrefslogtreecommitdiff
path: root/voctocore/lib/controlserver.py
blob: b0fe3d56df48f5d415535c30dfb6fedeaeab38a6 (plain)
  1. #!/usr/bin/python3
  2. import socket, logging, traceback
  3. from queue import Queue
  4. from gi.repository import GObject
  5. from lib.commands import ControlServerCommands
  6. from lib.tcpmulticonnection import TCPMultiConnection
  7. from lib.response import NotifyResponse, OkResponse
  8. class ControlServer(TCPMultiConnection):
  9. def __init__(self, pipeline):
  10. '''Initialize server and start listening.'''
  11. self.log = logging.getLogger('ControlServer')
  12. super().__init__(port=9999)
  13. self.command_queue = Queue()
  14. self.commands = ControlServerCommands(pipeline)
  15. GObject.idle_add(self.on_loop)
  16. def on_accepted(self, conn, addr):
  17. '''Asynchronous connection listener. Starts a handler for each connection.'''
  18. self.log.debug('setting gobject io-watch on connection')
  19. GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
  20. GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
  21. def on_data(self, conn, _, leftovers, *args):
  22. '''Asynchronous connection handler. Pushes data from socket
  23. into command queue linewise'''
  24. close_after = False
  25. try:
  26. while True:
  27. try:
  28. leftovers.append(conn.recv(4096).decode(errors='replace'))
  29. if len(leftovers[-1]) == 0:
  30. self.log.info("Socket was closed")
  31. leftovers.pop()
  32. close_after = True
  33. break
  34. except UnicodeDecodeError as e:
  35. continue
  36. except:
  37. pass
  38. data = "".join(leftovers)
  39. del leftovers[:]
  40. lines = data.split('\n')
  41. for line in lines[:-1]:
  42. self.log.debug("got line: %r", line)
  43. line = line.strip()
  44. # TODO: move quit to on_loop
  45. # 'quit' = remote wants us to close the connection
  46. if line == 'quit':
  47. self.log.info("Client asked us to close the Connection")
  48. self.close_connection(conn)
  49. return False
  50. self.command_queue.put((line, conn))
  51. if close_after:
  52. self.close_connection(conn)
  53. return False
  54. if lines[-1] != '':
  55. self.log.debug("remaining %r", lines[-1])
  56. leftovers.append(lines[-1])
  57. return True
  58. def on_loop(self):
  59. '''Command handler. Processes commands in the command queue whenever
  60. nothing else is happening (registered as GObject idle callback)'''
  61. if self.command_queue.empty():
  62. return True
  63. line, requestor = self.command_queue.get()
  64. words = line.split()
  65. if len(words) < 1:
  66. return True
  67. command = words[0]
  68. args = words[1:]
  69. self.log.info("processing command %r with args %s", command, args)
  70. response = None
  71. try:
  72. # deny calling private methods
  73. if command[0] == '_':
  74. self.log.info('private methods are not callable')
  75. raise KeyError()
  76. command_function = self.commands.__class__.__dict__[command]
  77. except KeyError as e:
  78. self.log.info("received unknown command %s", command)
  79. response = "error unknown command %s\n" % command
  80. else:
  81. try:
  82. responseObject = command_function(self.commands, *args)
  83. except Exception as e:
  84. message = str(e) or "<no message>"
  85. response = "error %s\n" % message
  86. else:
  87. if isinstance(responseObject, NotifyResponse):
  88. responseObject = [ responseObject ]
  89. if isinstance(responseObject, list):
  90. for obj in responseObject:
  91. signal = "%s\n" % str(obj)
  92. for conn, queue in self.currentConnections.items():
  93. queue.put(signal)
  94. else:
  95. response = "%s\n" % str(responseObject)
  96. finally:
  97. if response is not None and requestor in self.currentConnections:
  98. self.currentConnections[requestor].put(response)
  99. return True
  100. def on_write(self, conn, *args):
  101. # TODO: on_loop() is not called as soon as there is a writable socket
  102. self.on_loop()
  103. try:
  104. queue = self.currentConnections[conn]
  105. except KeyError:
  106. return False
  107. if queue.empty():
  108. return True
  109. message = queue.get()
  110. try:
  111. conn.send(message.encode())
  112. except Exception as e:
  113. self.log.warn(e)
  114. return True
  115. def notify_all(self, msg):
  116. try:
  117. words = msg.split()
  118. words[-1] = self.commands.encodeSourceName(int(words[-1]))
  119. msg = " ".join(words) + '\n'
  120. for queue in self.currentConnections.values():
  121. queue.put(msg)
  122. except Exception as e:
  123. self.log.debug("error during notify: %s", e)