summaryrefslogtreecommitdiff
path: root/voctocore/lib/controlserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'voctocore/lib/controlserver.py')
-rw-r--r--voctocore/lib/controlserver.py186
1 files changed, 105 insertions, 81 deletions
diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py
index 1a75a49..70b9213 100644
--- a/voctocore/lib/controlserver.py
+++ b/voctocore/lib/controlserver.py
@@ -1,9 +1,11 @@
#!/usr/bin/python3
import socket, logging, traceback
+from queue import Queue
from gi.repository import GObject
from lib.commands import ControlServerCommands
from lib.tcpmulticonnection import TCPMultiConnection
+from lib.response import NotifyResponse, OkResponse
class ControlServer(TCPMultiConnection):
def __init__(self, pipeline):
@@ -11,113 +13,135 @@ class ControlServer(TCPMultiConnection):
self.log = logging.getLogger('ControlServer')
super().__init__(port=9999)
+ self.command_queue = Queue()
+
self.commands = ControlServerCommands(pipeline)
+ GObject.idle_add(self.on_loop)
+
def on_accepted(self, conn, addr):
'''Asynchronous connection listener. Starts a handler for each connection.'''
-
self.log.debug('Setting GObject io-watch on Connection')
- GObject.io_add_watch(conn, GObject.IO_IN, self.on_data)
+ GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
+ GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
- def on_data(self, conn, *args):
- '''Asynchronous connection handler. Processes each line from the socket.'''
- # construct a file-like object fro mthe socket
- # to be able to read linewise and in utf-8
- filelike = conn.makefile('rw')
-
- # read a line from the socket
- line = ''
+ def on_data(self, conn, _, leftovers, *args):
+ '''Asynchronous connection handler. Pushes data from socket
+ into command queue linewise'''
+ close_after = False
try:
- line = filelike.readline().strip()
- except Exception as e:
- self.log.warn("Can't read from socket: %s", e)
-
- # no data = remote closed connection
- if len(line) == 0:
+ while True:
+ try:
+ leftovers.append(conn.recv(4096).decode(errors='replace'))
+ if len(leftovers[-1]) == 0:
+ self.log.info("Socket was closed")
+ leftovers.pop()
+ close_after = True
+ break
+
+ except UnicodeDecodeError as e:
+ continue
+ except BlockingIOError as e:
+ pass
+
+ data = "".join(leftovers)
+ leftovers.clear()
+
+ lines = data.split('\n')
+ for line in lines[:-1]:
+ self.log.debug("Got line: %r", line)
+
+ line = line.strip()
+ # TODO: move quit to on_loop
+ # 'quit' = remote wants us to close the connection
+ if line == 'quit':
+ self.log.info("Client asked us to close the Connection")
+ self.close_connection(conn)
+ return False
+
+ self.command_queue.put((line, conn))
+
+ if close_after:
self.close_connection(conn)
return False
- # 'quit' = remote wants us to close the connection
- if line == 'quit':
- self.log.info("Client asked us to close the Connection")
- self.close_connection(conn)
- return False
+ self.log.debug("Remaining %r", lines[-1])
+ leftovers.append(lines[-1])
+ return True
- # process the received line
- success, msg = self.processLine(conn, line)
+ def on_loop(self):
+ '''Command handler. Processes commands in the command queue whenever
+ nothing else is happening (registered as GObject idle callback)'''
+ if self.command_queue.empty():
+ return True
+ line, requestor = self.command_queue.get()
- # success = False -> error
- if success == False:
- # on error-responses the message is mandatory
- if msg is None:
- msg = '<no message>'
+ words = line.split()
+ if len(words) < 1:
+ return True
- # respond with 'error' and the message
- filelike.write('error '+msg+'\n')
- self.log.info("Function-Call returned an Error: %s", msg)
+ command = words[0]
+ args = words[1:]
- # keep on listening on that connection
- return True
+ self.log.debug("Processing Command %r with args %s", command, args)
- # success = True and not message
- if msg is None:
- # respond with a simple 'ok'
- filelike.write('ok\n')
- else:
- # respond with the returned message
- filelike.write('ok '+msg+'\n')
+ try:
+ command_function = self.commands.__class__.__dict__[command]
- return True
+ except KeyError as e:
+ response = "error unknown command %s\n" % command
- def processLine(self, conn, line):
- # split line into command and optional args
- words = line.split()
- command = words[0]
- args = words[1:]
+ else:
+ try:
+ responseObject = command_function(self.commands, *args)
- # log function-call as parsed
- self.log.info("Read Function-Call from %s: %s( %s )", conn.getpeername(), command, args)
+ except Exception as e:
+ message = str(e) or "<no message>"
+ response = "error %s\n" % message
- # check that the function-call is a known Command
- if not hasattr(self.commands, command):
- return False, 'unknown command %s' % command
+ else:
+ if isinstance(responseObject, NotifyResponse):
+ signal = "%s\n" % str(responseObject)
+ for conn, queue in self.currentConnections.items():
+ if conn == requestor:
+ continue
+ queue.put(signal)
- try:
- # fetch the function-pointer
- f = getattr(self.commands, command)
+ response = "%s\n" % str(responseObject)
- # call the function
- ret = f(*args)
+ finally:
+ if requestor in self.currentConnections:
+ self.currentConnections[requestor].put(response)
- # signal method call to all other connected clients
- # only signal set_* commands
- if command.split('_')[0] in ["set", "message"]:
- self.signal(conn, command, args)
+ return True
- # if it returned an iterable, probably (Success, Message), pass that on
- if hasattr(ret, '__iter__'):
- return ret
- else:
- # otherwise construct a tuple
- return (ret, None)
+ def on_write(self, conn, *args):
+ # TODO: on_loop() is not called as soon as there is a writable socket
+ self.on_loop()
- except Exception as e:
- self.log.error("Trapped Exception in Remote-Communication: %s", e)
+ try:
+ queue = self.currentConnections[conn]
+ except KeyError:
+ return False
- # In case of an Exception, return that
- return False, str(e)
+ if queue.empty():
+ return True
- def signal(self, origin_conn, command, args):
- for conn in self.currentConnections:
- if conn == origin_conn:
- continue
+ message = queue.get()
+ try:
+ conn.send(message.encode())
+ except Exception as e:
+ self.log.warn(e)
- self.log.debug(
- 'signaling connection %s the successful '
- 'execution of the command %s',
- conn.getpeername(), command)
+ return True
- conn.makefile('w').write(
- "signal %s %s\n" % (command, ' '.join(args))
- )
+ def notify_all(self, msg):
+ try:
+ words = msg.split()
+ words[-1] = self.commands.encodeSourceName(int(words[-1]))
+ msg = " ".join(words) + '\n'
+ for queue in self.currentConnections.values():
+ queue.put(msg)
+ except Exception as e:
+ self.log.debug("Error during notify: %s", e)