From 46aace37db5c035a6d6b432db261dc7c417456f4 Mon Sep 17 00:00:00 2001 From: Markus Otto Date: Sat, 22 Aug 2015 19:15:22 +0200 Subject: make protocol work, some gui stuff --- voctocore/lib/controlserver.py | 173 +++++++++++++++++++++-------------------- 1 file changed, 87 insertions(+), 86 deletions(-) (limited to 'voctocore/lib/controlserver.py') diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py index 1a75a49..4e95e46 100644 --- a/voctocore/lib/controlserver.py +++ b/voctocore/lib/controlserver.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 import socket, logging, traceback +from queue import Queue from gi.repository import GObject from lib.commands import ControlServerCommands @@ -11,113 +12,113 @@ class ControlServer(TCPMultiConnection): self.log = logging.getLogger('ControlServer') super().__init__(port=9999) + self.command_queue = Queue() + self.commands = ControlServerCommands(pipeline) + GObject.idle_add(self.on_loop) + def on_accepted(self, conn, addr): '''Asynchronous connection listener. Starts a handler for each connection.''' - self.log.debug('Setting GObject io-watch on Connection') - GObject.io_add_watch(conn, GObject.IO_IN, self.on_data) - - def on_data(self, conn, *args): - '''Asynchronous connection handler. Processes each line from the socket.''' - # construct a file-like object fro mthe socket - # to be able to read linewise and in utf-8 - filelike = conn.makefile('rw') + GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, ['']) + GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write) - # read a line from the socket - line = '' + def on_data(self, conn, _, leftovers, *args): + '''Asynchronous connection handler. Pushes data from socket + into command queue linewise''' try: - line = filelike.readline().strip() - except Exception as e: - self.log.warn("Can't read from socket: %s", e) - - # no data = remote closed connection - if len(line) == 0: - self.close_connection(conn) - return False - - # 'quit' = remote wants us to close the connection - if line == 'quit': - self.log.info("Client asked us to close the Connection") - self.close_connection(conn) - return False - - # process the received line - success, msg = self.processLine(conn, line) - - # success = False -> error - if success == False: - # on error-responses the message is mandatory - if msg is None: - msg = '' - - # respond with 'error' and the message - filelike.write('error '+msg+'\n') - self.log.info("Function-Call returned an Error: %s", msg) + while True: + try: + leftovers.append(conn.recv(4096).decode(errors='replace')) + if len(leftovers[-1]) == 0: + self.log.info("Socket was closed") + self.close_connection(conn) + return False + except UnicodeDecodeError as e: + continue + except BlockingIOError as e: + pass + + data = "".join(leftovers) + leftovers.clear() + + lines = data.split('\n') + for line in lines[:-1]: + self.log.debug("Got line: %r", line) + + line = line.strip() + # TODO: move quit to on_loop + # 'quit' = remote wants us to close the connection + if line == 'quit': + self.log.info("Client asked us to close the Connection") + self.close_connection(conn) + return False + + self.command_queue.put((line, conn)) + + self.log.debug("Remaining %r", lines[-1]) + leftovers.append(lines[-1]) + return True - # keep on listening on that connection + def on_loop(self): + '''Command handler. Processes commands in the command queue whenever + nothing else is happening (registered as GObject idle callback)''' + if self.command_queue.empty(): return True + line, requestor = self.command_queue.get() - # success = True and not message - if msg is None: - # respond with a simple 'ok' - filelike.write('ok\n') - else: - # respond with the returned message - filelike.write('ok '+msg+'\n') - - return True - - def processLine(self, conn, line): - # split line into command and optional args words = line.split() command = words[0] args = words[1:] - # log function-call as parsed - self.log.info("Read Function-Call from %s: %s( %s )", conn.getpeername(), command, args) + try: + f = self.commands.fetch(command) + message, send_signals = f(*args) + response = "ok %s\n" % message - # check that the function-call is a known Command - if not hasattr(self.commands, command): - return False, 'unknown command %s' % command + except Exception as e: + message = str(e) or "" + response = "error %s\n" % message + else: + if send_signals: + signal = "signal %s\n" % line + for conn, queue in self.currentConnections.items(): + if conn == requestor: + continue + queue.put(signal) - try: - # fetch the function-pointer - f = getattr(self.commands, command) + finally: + self.currentConnections[requestor].put(response) - # call the function - ret = f(*args) + return True - # signal method call to all other connected clients - # only signal set_* commands - if command.split('_')[0] in ["set", "message"]: - self.signal(conn, command, args) + def on_write(self, conn, *args): + # TODO: on_loop() is not called as soon as there is a writable socket + self.on_loop() - # if it returned an iterable, probably (Success, Message), pass that on - if hasattr(ret, '__iter__'): - return ret - else: - # otherwise construct a tuple - return (ret, None) + try: + queue = self.currentConnections[conn] + except KeyError: + return False + if queue.empty(): + return True + message = queue.get() + try: + conn.send(message.encode()) except Exception as e: - self.log.error("Trapped Exception in Remote-Communication: %s", e) - - # In case of an Exception, return that - return False, str(e) - - def signal(self, origin_conn, command, args): - for conn in self.currentConnections: - if conn == origin_conn: - continue + self.log.warn(e) - self.log.debug( - 'signaling connection %s the successful ' - 'execution of the command %s', - conn.getpeername(), command) + return True - conn.makefile('w').write( - "signal %s %s\n" % (command, ' '.join(args)) - ) + def notify_all(self, msg): + try: + words = msg.split() + words[-1] = self.commands.encodeSourceName(int(words[-1])) + msg = " ".join(words) + '\n' + for queue in self.currentConnections.values(): + queue.put(msg) + except Exception as e: + self.log.debug("Error during notify: %s", e) -- cgit v1.2.3 From c61fe2b667079168387376da1c09823967476b21 Mon Sep 17 00:00:00 2001 From: MaZderMind Date: Wed, 2 Sep 2015 15:08:49 +0200 Subject: refactor commands and notify code --- voctocore/lib/commands.py | 207 ++++++++++++++++++++++------------------- voctocore/lib/controlserver.py | 33 ++++--- voctocore/lib/notifications.py | 14 --- voctocore/lib/response.py | 15 +++ voctocore/lib/videomix.py | 3 - voctocore/voctocore.py | 3 - 6 files changed, 149 insertions(+), 126 deletions(-) delete mode 100644 voctocore/lib/notifications.py create mode 100644 voctocore/lib/response.py (limited to 'voctocore/lib/controlserver.py') diff --git a/voctocore/lib/commands.py b/voctocore/lib/commands.py index c972c69..dd4ef0d 100644 --- a/voctocore/lib/commands.py +++ b/voctocore/lib/commands.py @@ -2,127 +2,146 @@ import logging import json - from lib.config import Config from lib.videomix import CompositeModes +from lib.response import NotifyResponse, OkResponse -class ControlServerCommands(): - def __init__(self, pipeline): - self.log = logging.getLogger('ControlServerCommands') +def decodeName(items, name_or_id): + try: + name_or_id = int(name_or_id) + if name_or_id < 0 or name_or_id >= len(items): + raise IndexError("unknown index %d" % name_or_id) - self.pipeline = pipeline - self.sources = Config.getlist('mix', 'sources') - self.blankersources = Config.getlist('stream-blanker', 'sources') + return name_or_id - def decodeSourceName(self, src_name_or_id): - if isinstance(src_name_or_id, str): - try: - return self.sources.index(src_name_or_id) - except Exception as e: - raise IndexError("source %s unknown" % src_name_or_id) + except ValueError as e: + try: + return items.index(name_or_id) + + except ValueError as e: + raise IndexError("unknown name %s" % name_or_id) + +def decodeEnumName(enum, name_or_id): + try: + name_or_id = int(name_or_id) + if name_or_id < 0 or name_or_id >= len(enum): + raise IndexError("unknown index %d" % name_or_id) - if src_name_or_id < 0 or src_name_or_id >= len(self.sources): - raise IndexError("source %s unknown" % src_name_or_id) + return name_or_id - def encodeSourceName(self, src_id): + except ValueError as e: try: - return self.sources[src_id] - except Exception as e: - raise IndexError("source %s unknown" % src_id) + return enum[name_or_id] - def decodeBlankerSourceName(self, src_name_or_id): - if isinstance(src_name_or_id, str): - try: - return self.blankersources.index(src_name_or_id) - except Exception as e: - raise IndexError("source %s unknown" % src_name_or_id) + except KeyError as e: + raise IndexError("unknown name %s" % name_or_id) - if src_name_or_id < 0 or src_name_or_id >= len(self.blankersources): - raise IndexError("source %s unknown" % src_name_or_id) +def encodeName(items, id): + try: + return items[id] + except IndexError as e: + raise IndexError("unknown index %d" % id) - def encodeBlankerSourceName(self, src_id): - try: - return self.blankersources[src_id] - except Exception as e: - raise IndexError("source %s unknown" % src_id) +def encodeEnumName(enum, id): + try: + return enum(id).name + except ValueError as e: + raise IndexError("unknown index %d" % id) + +class ControlServerCommands(object): + def __init__(self, pipeline): + self.log = logging.getLogger('ControlServerCommands') + + self.pipeline = pipeline + + self.sources = Config.getlist('mix', 'sources') + self.blankerSources = Config.getlist('stream-blanker', 'sources') # Commands are defined below. Errors are sent to the clients by throwing # exceptions, they will be turned into messages outside. - def fetch(self, command): - if command not in ['set', 'get', 'message', 'signal']: - raise Exception("unknown command") - return getattr(self, command) - def message(self, *args): - return " ".join(args), True + return NotifyResponse('message', args) - def signal(self, *args): - return "", True - def get(self, subcommand, *args, signal=False): - return getattr(self, "_get_"+subcommand)(*args), signal + def _get_video_status(self): + a = encodeName( self.sources, self.pipeline.vmix.getVideoSourceA() ) + b = encodeName( self.sources, self.pipeline.vmix.getVideoSourceB() ) + return [a, b] - def set(self, subcommand, *args): - getattr(self, "_set_"+subcommand)(*args) - return self.get(subcommand, *args, signal=True) + def get_video(self): + status = self._get_video_status() + return OkResponse('videoStatus', *status) - def _get_video(self, target, _=None): - if target not in ['a', 'b']: - raise Exception("invalid video source name: 'a' or 'b' expected.") - cmd = "getVideoSource" + target.upper() - src_id = getattr(self.pipeline.vmix, cmd)() - return self.encodeSourceName(src_id) + def set_video_a(self, src_name_or_id): + src_id = decodeName(self.sources, src_name_or_id) + self.pipeline.vmix.setVideoSourceA(src_id) - def _set_video(self, target, src_name_or_id): - if target not in ['a', 'b']: - raise Exception("invalid video source name: 'a' or 'b' expected.") - src_id = self.decodeSourceName(src_name_or_id) - getattr(self.pipeline.vmix, 'setVideoSource' + target.upper())(src_id) + status = self._get_video_status() + return NotifyResponse('videoStatus', *status) - def _set_audio(self, src_name_or_id): - src_id = self.decodeSourceName(src_name_or_id) - self.pipeline.amix.setAudioSource(src_id) + def set_video_b(self, src_name_or_id): + src_id = decodeName(self.sources, src_name_or_id) + self.pipeline.vmix.setVideoSourceB(src_id) + + status = self._get_video_status() + return NotifyResponse('videoStatus', *status) - def _get_audio(self, _=None): + + def _get_audio_status(self): src_id = self.pipeline.amix.getAudioSource() - return self.encodeSourceName(src_id) + return encodeName(self.sources, src_id) - def _set_composite(self, composite_mode): - try: - mode = CompositeModes[composite_mode] - except KeyError as e: - raise KeyError("composite-mode %s unknown" % composite_mode) + def get_audio(self): + status = self._get_audio_status() + return OkResponse('audioStatus', status) + + def set_audio(self, src_name_or_id): + src_id = decodeName(self.sources, src_name_or_id) + self.pipeline.amix.setAudioSource(src_id) + status = self._get_audio_status() + return NotifyResponse('audioStatus', status) + + + def _get_composite_status(self): + mode = self.pipeline.vmix.getCompositeMode() + return encodeEnumName(CompositeModes, mode) + + def get_composite(self): + status = self._get_composite_status() + return OkResponse('compositeMode', status) + + def set_composite(self, mode_name_or_id): + mode = decodeEnumName(CompositeModes, mode_name_or_id) self.pipeline.vmix.setCompositeMode(mode) - def _get_composite(self, _=None): - try: - mode = self.pipeline.vmix.getCompositeMode() - return mode.name - except Exception as e: - raise KeyError("composite-mode %s unknown" % mode) + status = self._get_composite_status() + return NotifyResponse('compositeMode', status) - def _set_status(self, *args): - try: - if args[0] == "live": - self.pipeline.streamblanker.setBlankSource(None) - elif args [0] == "blank": - src_id = self.decodeBlankerSourceName(args[1]) - self.pipeline.streamblanker.setBlankSource(src_id) - else: - raise IndexError() - except IndexError as e: - raise Exception("invocation: set_status (live | blank )") - - def _get_status(self, *args): - if self.pipeline.streamblanker.blankSource is None: - return "live" - - name = self.encodeBlankerSourceName(self.pipeline.streamblanker.blankSource) - return "blank " + name - - def _get_config(self): - confdict = {k: dict(v) for k, v in dict(Config).items()} - return json.dumps(confdict) + def _get_stream_status(self): + blankSource = self.pipeline.streamblanker.blankSource + return encodeName(self.blankerSources, blankSource) + + def get_stream_status(self): + status = self._get_stream_status() + return OkResponse('streamStatus', status) + + def set_stream_blank(self, source_name_or_id): + src_id = decodeName(self.blankerSources, source_name_or_id) + self.pipeline.streamblanker.setBlankSource(src_id) + + status = self._get_stream_status() + return NotifyResponse('streamStatus', status) + + def set_stream_live(self): + self.pipeline.streamblanker.setBlankSource(None) + + status = self._get_stream_status() + return NotifyResponse('streamStatus', status) + + + def get_config(self): + confdict = {header: dict(section) for header, section in dict(Config).items()} + return json.dumps(confdict) diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py index 4e95e46..e0632fc 100644 --- a/voctocore/lib/controlserver.py +++ b/voctocore/lib/controlserver.py @@ -5,6 +5,7 @@ from gi.repository import GObject from lib.commands import ControlServerCommands from lib.tcpmulticonnection import TCPMultiConnection +from lib.response import NotifyResponse, OkResponse class ControlServer(TCPMultiConnection): def __init__(self, pipeline): @@ -73,21 +74,29 @@ class ControlServer(TCPMultiConnection): args = words[1:] try: - f = self.commands.fetch(command) - message, send_signals = f(*args) - response = "ok %s\n" % message + command_function = self.commands.__class__.__dict__[command] - except Exception as e: - message = str(e) or "" - response = "error %s\n" % message + except KeyError as e: + response = "error unknown command %s\n" % command else: - if send_signals: - signal = "signal %s\n" % line - for conn, queue in self.currentConnections.items(): - if conn == requestor: - continue - queue.put(signal) + try: + responseObject = command_function(self.commands, *args) + + except Exception as e: + message = str(e) or "" + response = "error %s\n" % message + + else: + if isinstance(responseObject, NotifyResponse): + signal = "%s\n" % str(responseObject) + for conn, queue in self.currentConnections.items(): + if conn == requestor: + continue + + queue.put(signal) + + response = "%s\n" % str(responseObject) finally: self.currentConnections[requestor].put(response) diff --git a/voctocore/lib/notifications.py b/voctocore/lib/notifications.py deleted file mode 100644 index 1e101e1..0000000 --- a/voctocore/lib/notifications.py +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/python3 - -# If you know a better way to do this, go for it... - -import logging - -log = logging.getLogger("Notifications") -controlserver = None - -def notify_all(msg): - try: - controlserver.notify_all(msg) - except Exception as e: - log.warn(e) diff --git a/voctocore/lib/response.py b/voctocore/lib/response.py new file mode 100644 index 0000000..866d5f2 --- /dev/null +++ b/voctocore/lib/response.py @@ -0,0 +1,15 @@ +#!/usr/bin/python3 + +class Response(object): + def __init__(self, *args): + self.args = args + + def __str__(self): + return " ".join(map(str, self.args)) + + +class OkResponse(Response): + pass + +class NotifyResponse(Response): + pass diff --git a/voctocore/lib/videomix.py b/voctocore/lib/videomix.py index 053219a..e0beefb 100644 --- a/voctocore/lib/videomix.py +++ b/voctocore/lib/videomix.py @@ -4,7 +4,6 @@ from gi.repository import Gst from enum import Enum from lib.config import Config -from lib.notifications import notify_all class CompositeModes(Enum): fullscreen = 0 @@ -313,7 +312,6 @@ class VideoMix(object): # swap if required if self.sourceB == source: self.sourceB = self.sourceA - notify_all("signal set video b %s\n" % self.sourceB) self.sourceA = source self.recalculateMixerState() @@ -325,7 +323,6 @@ class VideoMix(object): # swap if required if self.sourceA == source: self.sourceA = self.sourceB - notify_all("signal set video b %s\n" % self.sourceA) self.sourceB = source self.recalculateMixerState() diff --git a/voctocore/voctocore.py b/voctocore/voctocore.py index 0e65bc6..609ef52 100755 --- a/voctocore/voctocore.py +++ b/voctocore/voctocore.py @@ -24,7 +24,6 @@ Gst.init([]) from lib.args import Args from lib.pipeline import Pipeline from lib.controlserver import ControlServer -from lib import notifications # main class class Voctocore(object): @@ -82,8 +81,6 @@ def main(): logging.debug('initializing Voctocore') voctocore = Voctocore() - notifications.controlserver = voctocore.controlserver - logging.debug('running Voctocore') voctocore.run() -- cgit v1.2.3 From 13ef8b7d796025f412874349929296e01dab2672 Mon Sep 17 00:00:00 2001 From: MaZderMind Date: Wed, 2 Sep 2015 15:17:54 +0200 Subject: fix command server when dealing with very short lived connections, like --- voctocore/lib/controlserver.py | 17 ++++++++++++++--- voctocore/lib/tcpmulticonnection.py | 3 ++- 2 files changed, 16 insertions(+), 4 deletions(-) (limited to 'voctocore/lib/controlserver.py') diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py index e0632fc..631770b 100644 --- a/voctocore/lib/controlserver.py +++ b/voctocore/lib/controlserver.py @@ -28,14 +28,17 @@ class ControlServer(TCPMultiConnection): def on_data(self, conn, _, leftovers, *args): '''Asynchronous connection handler. Pushes data from socket into command queue linewise''' + close_after = False try: while True: try: leftovers.append(conn.recv(4096).decode(errors='replace')) if len(leftovers[-1]) == 0: self.log.info("Socket was closed") - self.close_connection(conn) - return False + leftovers.pop() + close_after = True + break + except UnicodeDecodeError as e: continue except BlockingIOError as e: @@ -58,6 +61,10 @@ class ControlServer(TCPMultiConnection): self.command_queue.put((line, conn)) + if close_after: + self.close_connection(conn) + return False + self.log.debug("Remaining %r", lines[-1]) leftovers.append(lines[-1]) return True @@ -70,6 +77,9 @@ class ControlServer(TCPMultiConnection): line, requestor = self.command_queue.get() words = line.split() + if len(words) < 1: + return True + command = words[0] args = words[1:] @@ -99,7 +109,8 @@ class ControlServer(TCPMultiConnection): response = "%s\n" % str(responseObject) finally: - self.currentConnections[requestor].put(response) + if requestor in self.currentConnections: + self.currentConnections[requestor].put(response) return True diff --git a/voctocore/lib/tcpmulticonnection.py b/voctocore/lib/tcpmulticonnection.py index e6f8b5b..e9caf2c 100644 --- a/voctocore/lib/tcpmulticonnection.py +++ b/voctocore/lib/tcpmulticonnection.py @@ -37,5 +37,6 @@ class TCPMultiConnection(object): return True def close_connection(self, conn): - del(self.currentConnections[conn]) + if conn in self.currentConnections: + del(self.currentConnections[conn]) self.log.info('Now %u Receiver connected', len(self.currentConnections)) -- cgit v1.2.3 From 8646386d09ab6cdabf0b8421cece5c1ddd69633f Mon Sep 17 00:00:00 2001 From: MaZderMind Date: Wed, 2 Sep 2015 15:18:10 +0200 Subject: stylistic changes --- voctocore/lib/controlserver.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'voctocore/lib/controlserver.py') diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py index 631770b..70b9213 100644 --- a/voctocore/lib/controlserver.py +++ b/voctocore/lib/controlserver.py @@ -83,6 +83,8 @@ class ControlServer(TCPMultiConnection): command = words[0] args = words[1:] + self.log.debug("Processing Command %r with args %s", command, args) + try: command_function = self.commands.__class__.__dict__[command] @@ -124,7 +126,8 @@ class ControlServer(TCPMultiConnection): return False if queue.empty(): - return True + return True + message = queue.get() try: conn.send(message.encode()) -- cgit v1.2.3