summaryrefslogtreecommitdiff
path: root/voctocore/lib/controlserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'voctocore/lib/controlserver.py')
-rw-r--r--voctocore/lib/controlserver.py301
1 files changed, 155 insertions, 146 deletions
diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py
index 4c506be..42ae4b2 100644
--- a/voctocore/lib/controlserver.py
+++ b/voctocore/lib/controlserver.py
@@ -1,4 +1,6 @@
-import socket, logging, traceback
+import socket
+import logging
+import traceback
from queue import Queue
from gi.repository import GObject
@@ -6,164 +8,171 @@ from lib.commands import ControlServerCommands
from lib.tcpmulticonnection import TCPMultiConnection
from lib.response import NotifyResponse, OkResponse
-class ControlServer(TCPMultiConnection):
- def __init__(self, pipeline):
- '''Initialize server and start listening.'''
- self.log = logging.getLogger('ControlServer')
- super().__init__(port=9999)
-
- self.command_queue = Queue()
- self.commands = ControlServerCommands(pipeline)
-
- def on_accepted(self, conn, addr):
- '''Asynchronous connection listener. Starts a handler for each connection.'''
- self.log.debug('setting gobject io-watch on connection')
- GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
-
- def on_data(self, conn, _, leftovers, *args):
- '''Asynchronous connection handler. Pushes data from socket
- into command queue linewise'''
- close_after = False
- try:
- while True:
- try:
- leftovers.append(conn.recv(4096).decode(errors='replace'))
- if len(leftovers[-1]) == 0:
- self.log.info("Socket was closed")
- leftovers.pop()
- close_after = True
- break
-
- except UnicodeDecodeError as e:
- continue
- except:
- pass
+class ControlServer(TCPMultiConnection):
- data = "".join(leftovers)
- del leftovers[:]
+ def __init__(self, pipeline):
+ '''Initialize server and start listening.'''
+ self.log = logging.getLogger('ControlServer')
+ super().__init__(port=9999)
- lines = data.split('\n')
- for line in lines[:-1]:
- self.log.debug("got line: %r", line)
+ self.command_queue = Queue()
+
+ self.commands = ControlServerCommands(pipeline)
- line = line.strip()
- # 'quit' = remote wants us to close the connection
- if line == 'quit' or line == 'exit':
- self.log.info("Client asked us to close the Connection")
- self.close_connection(conn)
- return False
+ def on_accepted(self, conn, addr):
+ '''Asynchronous connection listener.
+ Starts a handler for each connection.'''
+ self.log.debug('setting gobject io-watch on connection')
+ GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
+
+ def on_data(self, conn, _, leftovers, *args):
+ '''Asynchronous connection handler.
+ Pushes data from socket into command queue linewise'''
+ close_after = False
+ try:
+ while True:
+ try:
+ leftovers.append(conn.recv(4096).decode(errors='replace'))
+ if len(leftovers[-1]) == 0:
+ self.log.info("Socket was closed")
+ leftovers.pop()
+ close_after = True
+ break
- self.log.debug('re-starting on_loop scheduling')
- GObject.idle_add(self.on_loop)
+ except UnicodeDecodeError as e:
+ continue
+ except:
+ pass
- self.command_queue.put((line, conn))
+ data = "".join(leftovers)
+ del leftovers[:]
- if close_after:
- self.close_connection(conn)
- return False
-
- if lines[-1] != '':
- self.log.debug("remaining %r", lines[-1])
-
- leftovers.append(lines[-1])
- return True
-
- def on_loop(self):
- '''Command handler. Processes commands in the command queue whenever
- nothing else is happening (registered as GObject idle callback)'''
-
- self.log.debug('on_loop called')
-
- if self.command_queue.empty():
- self.log.debug('command_queue is empty again, stopping on_loop scheduling')
- return False
-
- line, requestor = self.command_queue.get()
-
- words = line.split()
- if len(words) < 1:
- self.log.debug('command_queue is empty again, stopping on_loop scheduling')
- return True
-
- command = words[0]
- args = words[1:]
-
- self.log.info("processing command %r with args %s", command, args)
-
- response = None
- try:
- # deny calling private methods
- if command[0] == '_':
- self.log.info('private methods are not callable')
- raise KeyError()
-
- command_function = self.commands.__class__.__dict__[command]
-
- except KeyError as e:
- self.log.info("received unknown command %s", command)
- response = "error unknown command %s\n" % command
-
- else:
- try:
- responseObject = command_function(self.commands, *args)
-
- except Exception as e:
- message = str(e) or "<no message>"
- response = "error %s\n" % message
-
- else:
- if isinstance(responseObject, NotifyResponse):
- responseObject = [ responseObject ]
-
- if isinstance(responseObject, list):
- for obj in responseObject:
- signal = "%s\n" % str(obj)
- for conn in self.currentConnections:
- self._schedule_write(conn, signal)
- else:
- response = "%s\n" % str(responseObject)
-
- finally:
- if response is not None and requestor in self.currentConnections:
- self._schedule_write(requestor, response)
-
- return False
+ lines = data.split('\n')
+ for line in lines[:-1]:
+ self.log.debug("got line: %r", line)
- def _schedule_write(self, conn, message):
- queue = self.currentConnections[conn]
+ line = line.strip()
+ # 'quit' = remote wants us to close the connection
+ if line == 'quit' or line == 'exit':
+ self.log.info("Client asked us to close the Connection")
+ self.close_connection(conn)
+ return False
- self.log.debug('re-starting on_write[%u] scheduling', conn.fileno())
- GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
+ self.log.debug('re-starting on_loop scheduling')
+ GObject.idle_add(self.on_loop)
+
+ self.command_queue.put((line, conn))
+
+ if close_after:
+ self.close_connection(conn)
+ return False
- queue.put(message)
+ if lines[-1] != '':
+ self.log.debug("remaining %r", lines[-1])
+
+ leftovers.append(lines[-1])
+ return True
+
+ def on_loop(self):
+ '''Command handler. Processes commands in the command queue whenever
+ nothing else is happening (registered as GObject idle callback)'''
+
+ self.log.debug('on_loop called')
+
+ if self.command_queue.empty():
+ self.log.debug('command_queue is empty again, '
+ 'stopping on_loop scheduling')
+ return False
+
+ line, requestor = self.command_queue.get()
+
+ words = line.split()
+ if len(words) < 1:
+ self.log.debug('command_queue is empty again, '
+ 'stopping on_loop scheduling')
+ return True
+
+ command = words[0]
+ args = words[1:]
+
+ self.log.info("processing command %r with args %s", command, args)
+
+ response = None
+ try:
+ # deny calling private methods
+ if command[0] == '_':
+ self.log.info('private methods are not callable')
+ raise KeyError()
+
+ command_function = self.commands.__class__.__dict__[command]
+
+ except KeyError as e:
+ self.log.info("received unknown command %s", command)
+ response = "error unknown command %s\n" % command
+
+ else:
+ try:
+ responseObject = command_function(self.commands, *args)
+
+ except Exception as e:
+ message = str(e) or "<no message>"
+ response = "error %s\n" % message
+
+ else:
+ if isinstance(responseObject, NotifyResponse):
+ responseObject = [responseObject]
+
+ if isinstance(responseObject, list):
+ for obj in responseObject:
+ signal = "%s\n" % str(obj)
+ for conn in self.currentConnections:
+ self._schedule_write(conn, signal)
+ else:
+ response = "%s\n" % str(responseObject)
+
+ finally:
+ if response is not None and requestor in self.currentConnections:
+ self._schedule_write(requestor, response)
+
+ return False
+
+ def _schedule_write(self, conn, message):
+ queue = self.currentConnections[conn]
- def on_write(self, conn, *args):
- self.log.debug('on_write[%u] called', conn.fileno())
+ self.log.debug('re-starting on_write[%u] scheduling', conn.fileno())
+ GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
- try:
- queue = self.currentConnections[conn]
- except KeyError:
- return False
+ queue.put(message)
- if queue.empty():
- self.log.debug('write_queue[%u] is empty again, stopping on_write scheduling', conn.fileno())
- return False
+ def on_write(self, conn, *args):
+ self.log.debug('on_write[%u] called', conn.fileno())
- message = queue.get()
- try:
- conn.send(message.encode())
- except Exception as e:
- self.log.warn(e)
+ try:
+ queue = self.currentConnections[conn]
+ except KeyError:
+ return False
+
+ if queue.empty():
+ self.log.debug('write_queue[%u] is empty again, '
+ 'stopping on_write scheduling',
+ conn.fileno())
+ return False
- return True
+ message = queue.get()
+ try:
+ conn.send(message.encode())
+ except Exception as e:
+ self.log.warn(e)
+
+ return True
- def notify_all(self, msg):
- try:
- words = msg.split()
- words[-1] = self.commands.encodeSourceName(int(words[-1]))
- msg = " ".join(words) + '\n'
- for conn in self.currentConnections:
- self._schedule_write(conn, msg)
- except Exception as e:
- self.log.debug("error during notify: %s", e)
+ def notify_all(self, msg):
+ try:
+ words = msg.split()
+ words[-1] = self.commands.encodeSourceName(int(words[-1]))
+ msg = " ".join(words) + '\n'
+ for conn in self.currentConnections:
+ self._schedule_write(conn, msg)
+ except Exception as e:
+ self.log.debug("error during notify: %s", e)