diff options
| author | MaZderMind <git@mazdermind.de> | 2015-09-02 15:19:30 +0200 |
|---|---|---|
| committer | MaZderMind <git@mazdermind.de> | 2015-09-02 15:19:30 +0200 |
| commit | 20d75f6c7f5cad2f5c1b4da71ad5405848230201 (patch) | |
| tree | 93894d7bc79a0b018275efbc0db524ac6f686570 /voctocore/lib/controlserver.py | |
| parent | a004948051e182edb3a7e40f1f7f2e14bebb0e0e (diff) | |
| parent | 8646386d09ab6cdabf0b8421cece5c1ddd69633f (diff) | |
Merge branch 'control-server-resilience'
Based on the work made by zuntrax & mithro at cccamp15
Diffstat (limited to 'voctocore/lib/controlserver.py')
| -rw-r--r-- | voctocore/lib/controlserver.py | 182 |
1 files changed, 103 insertions, 79 deletions
diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py index 1a75a49..70b9213 100644 --- a/voctocore/lib/controlserver.py +++ b/voctocore/lib/controlserver.py @@ -1,9 +1,11 @@ #!/usr/bin/python3 import socket, logging, traceback +from queue import Queue from gi.repository import GObject from lib.commands import ControlServerCommands from lib.tcpmulticonnection import TCPMultiConnection +from lib.response import NotifyResponse, OkResponse class ControlServer(TCPMultiConnection): def __init__(self, pipeline): @@ -11,113 +13,135 @@ class ControlServer(TCPMultiConnection): self.log = logging.getLogger('ControlServer') super().__init__(port=9999) + self.command_queue = Queue() + self.commands = ControlServerCommands(pipeline) + GObject.idle_add(self.on_loop) + def on_accepted(self, conn, addr): '''Asynchronous connection listener. Starts a handler for each connection.''' - self.log.debug('Setting GObject io-watch on Connection') - GObject.io_add_watch(conn, GObject.IO_IN, self.on_data) + GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, ['']) + GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write) - def on_data(self, conn, *args): - '''Asynchronous connection handler. Processes each line from the socket.''' - # construct a file-like object fro mthe socket - # to be able to read linewise and in utf-8 - filelike = conn.makefile('rw') - - # read a line from the socket - line = '' + def on_data(self, conn, _, leftovers, *args): + '''Asynchronous connection handler. Pushes data from socket + into command queue linewise''' + close_after = False try: - line = filelike.readline().strip() - except Exception as e: - self.log.warn("Can't read from socket: %s", e) + while True: + try: + leftovers.append(conn.recv(4096).decode(errors='replace')) + if len(leftovers[-1]) == 0: + self.log.info("Socket was closed") + leftovers.pop() + close_after = True + break - # no data = remote closed connection - if len(line) == 0: - self.close_connection(conn) - return False + except UnicodeDecodeError as e: + continue + except BlockingIOError as e: + pass - # 'quit' = remote wants us to close the connection - if line == 'quit': - self.log.info("Client asked us to close the Connection") - self.close_connection(conn) - return False + data = "".join(leftovers) + leftovers.clear() - # process the received line - success, msg = self.processLine(conn, line) + lines = data.split('\n') + for line in lines[:-1]: + self.log.debug("Got line: %r", line) - # success = False -> error - if success == False: - # on error-responses the message is mandatory - if msg is None: - msg = '<no message>' + line = line.strip() + # TODO: move quit to on_loop + # 'quit' = remote wants us to close the connection + if line == 'quit': + self.log.info("Client asked us to close the Connection") + self.close_connection(conn) + return False - # respond with 'error' and the message - filelike.write('error '+msg+'\n') - self.log.info("Function-Call returned an Error: %s", msg) + self.command_queue.put((line, conn)) - # keep on listening on that connection - return True - - # success = True and not message - if msg is None: - # respond with a simple 'ok' - filelike.write('ok\n') - else: - # respond with the returned message - filelike.write('ok '+msg+'\n') + if close_after: + self.close_connection(conn) + return False + self.log.debug("Remaining %r", lines[-1]) + leftovers.append(lines[-1]) return True - def processLine(self, conn, line): - # split line into command and optional args + def on_loop(self): + '''Command handler. Processes commands in the command queue whenever + nothing else is happening (registered as GObject idle callback)''' + if self.command_queue.empty(): + return True + line, requestor = self.command_queue.get() + words = line.split() + if len(words) < 1: + return True + command = words[0] args = words[1:] - # log function-call as parsed - self.log.info("Read Function-Call from %s: %s( %s )", conn.getpeername(), command, args) - - # check that the function-call is a known Command - if not hasattr(self.commands, command): - return False, 'unknown command %s' % command - + self.log.debug("Processing Command %r with args %s", command, args) try: - # fetch the function-pointer - f = getattr(self.commands, command) + command_function = self.commands.__class__.__dict__[command] + + except KeyError as e: + response = "error unknown command %s\n" % command - # call the function - ret = f(*args) + else: + try: + responseObject = command_function(self.commands, *args) - # signal method call to all other connected clients - # only signal set_* commands - if command.split('_')[0] in ["set", "message"]: - self.signal(conn, command, args) + except Exception as e: + message = str(e) or "<no message>" + response = "error %s\n" % message - # if it returned an iterable, probably (Success, Message), pass that on - if hasattr(ret, '__iter__'): - return ret else: - # otherwise construct a tuple - return (ret, None) + if isinstance(responseObject, NotifyResponse): + signal = "%s\n" % str(responseObject) + for conn, queue in self.currentConnections.items(): + if conn == requestor: + continue - except Exception as e: - self.log.error("Trapped Exception in Remote-Communication: %s", e) + queue.put(signal) + + response = "%s\n" % str(responseObject) + + finally: + if requestor in self.currentConnections: + self.currentConnections[requestor].put(response) + + return True + + def on_write(self, conn, *args): + # TODO: on_loop() is not called as soon as there is a writable socket + self.on_loop() - # In case of an Exception, return that - return False, str(e) + try: + queue = self.currentConnections[conn] + except KeyError: + return False - def signal(self, origin_conn, command, args): - for conn in self.currentConnections: - if conn == origin_conn: - continue + if queue.empty(): + return True - self.log.debug( - 'signaling connection %s the successful ' - 'execution of the command %s', - conn.getpeername(), command) + message = queue.get() + try: + conn.send(message.encode()) + except Exception as e: + self.log.warn(e) - conn.makefile('w').write( - "signal %s %s\n" % (command, ' '.join(args)) - ) + return True + + def notify_all(self, msg): + try: + words = msg.split() + words[-1] = self.commands.encodeSourceName(int(words[-1])) + msg = " ".join(words) + '\n' + for queue in self.currentConnections.values(): + queue.put(msg) + except Exception as e: + self.log.debug("Error during notify: %s", e) |
