aboutsummaryrefslogtreecommitdiff
path: root/voctocore/lib/controlserver.py
blob: 42ae4b257ee52607a95b8462a6aeafbf85b23275 (plain)
  1. import socket
  2. import logging
  3. import traceback
  4. from queue import Queue
  5. from gi.repository import GObject
  6. from lib.commands import ControlServerCommands
  7. from lib.tcpmulticonnection import TCPMultiConnection
  8. from lib.response import NotifyResponse, OkResponse
  9. class ControlServer(TCPMultiConnection):
  10. def __init__(self, pipeline):
  11. '''Initialize server and start listening.'''
  12. self.log = logging.getLogger('ControlServer')
  13. super().__init__(port=9999)
  14. self.command_queue = Queue()
  15. self.commands = ControlServerCommands(pipeline)
  16. def on_accepted(self, conn, addr):
  17. '''Asynchronous connection listener.
  18. Starts a handler for each connection.'''
  19. self.log.debug('setting gobject io-watch on connection')
  20. GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
  21. def on_data(self, conn, _, leftovers, *args):
  22. '''Asynchronous connection handler.
  23. Pushes data from socket into command queue linewise'''
  24. close_after = False
  25. try:
  26. while True:
  27. try:
  28. leftovers.append(conn.recv(4096).decode(errors='replace'))
  29. if len(leftovers[-1]) == 0:
  30. self.log.info("Socket was closed")
  31. leftovers.pop()
  32. close_after = True
  33. break
  34. except UnicodeDecodeError as e:
  35. continue
  36. except:
  37. pass
  38. data = "".join(leftovers)
  39. del leftovers[:]
  40. lines = data.split('\n')
  41. for line in lines[:-1]:
  42. self.log.debug("got line: %r", line)
  43. line = line.strip()
  44. # 'quit' = remote wants us to close the connection
  45. if line == 'quit' or line == 'exit':
  46. self.log.info("Client asked us to close the Connection")
  47. self.close_connection(conn)
  48. return False
  49. self.log.debug('re-starting on_loop scheduling')
  50. GObject.idle_add(self.on_loop)
  51. self.command_queue.put((line, conn))
  52. if close_after:
  53. self.close_connection(conn)
  54. return False
  55. if lines[-1] != '':
  56. self.log.debug("remaining %r", lines[-1])
  57. leftovers.append(lines[-1])
  58. return True
  59. def on_loop(self):
  60. '''Command handler. Processes commands in the command queue whenever
  61. nothing else is happening (registered as GObject idle callback)'''
  62. self.log.debug('on_loop called')
  63. if self.command_queue.empty():
  64. self.log.debug('command_queue is empty again, '
  65. 'stopping on_loop scheduling')
  66. return False
  67. line, requestor = self.command_queue.get()
  68. words = line.split()
  69. if len(words) < 1:
  70. self.log.debug('command_queue is empty again, '
  71. 'stopping on_loop scheduling')
  72. return True
  73. command = words[0]
  74. args = words[1:]
  75. self.log.info("processing command %r with args %s", command, args)
  76. response = None
  77. try:
  78. # deny calling private methods
  79. if command[0] == '_':
  80. self.log.info('private methods are not callable')
  81. raise KeyError()
  82. command_function = self.commands.__class__.__dict__[command]
  83. except KeyError as e:
  84. self.log.info("received unknown command %s", command)
  85. response = "error unknown command %s\n" % command
  86. else:
  87. try:
  88. responseObject = command_function(self.commands, *args)
  89. except Exception as e:
  90. message = str(e) or "<no message>"
  91. response = "error %s\n" % message
  92. else:
  93. if isinstance(responseObject, NotifyResponse):
  94. responseObject = [responseObject]
  95. if isinstance(responseObject, list):
  96. for obj in responseObject:
  97. signal = "%s\n" % str(obj)
  98. for conn in self.currentConnections:
  99. self._schedule_write(conn, signal)
  100. else:
  101. response = "%s\n" % str(responseObject)
  102. finally:
  103. if response is not None and requestor in self.currentConnections:
  104. self._schedule_write(requestor, response)
  105. return False
  106. def _schedule_write(self, conn, message):
  107. queue = self.currentConnections[conn]
  108. self.log.debug('re-starting on_write[%u] scheduling', conn.fileno())
  109. GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
  110. queue.put(message)
  111. def on_write(self, conn, *args):
  112. self.log.debug('on_write[%u] called', conn.fileno())
  113. try:
  114. queue = self.currentConnections[conn]
  115. except KeyError:
  116. return False
  117. if queue.empty():
  118. self.log.debug('write_queue[%u] is empty again, '
  119. 'stopping on_write scheduling',
  120. conn.fileno())
  121. return False
  122. message = queue.get()
  123. try:
  124. conn.send(message.encode())
  125. except Exception as e:
  126. self.log.warn(e)
  127. return True
  128. def notify_all(self, msg):
  129. try:
  130. words = msg.split()
  131. words[-1] = self.commands.encodeSourceName(int(words[-1]))
  132. msg = " ".join(words) + '\n'
  133. for conn in self.currentConnections:
  134. self._schedule_write(conn, msg)
  135. except Exception as e:
  136. self.log.debug("error during notify: %s", e)