diff options
-rw-r--r-- | voctocore/README.md | 30 | ||||
-rw-r--r-- | voctocore/lib/commands.py | 74 | ||||
-rw-r--r-- | voctocore/lib/controlserver.py | 173 | ||||
-rw-r--r-- | voctocore/lib/notifications.py | 14 | ||||
-rw-r--r-- | voctocore/lib/tcpmulticonnection.py | 9 | ||||
-rw-r--r-- | voctocore/lib/videomix.py | 3 | ||||
-rwxr-xr-x | voctocore/voctocore.py | 3 | ||||
-rw-r--r-- | voctogui/lib/connection.py | 33 |
8 files changed, 201 insertions, 138 deletions
diff --git a/voctocore/README.md b/voctocore/README.md index e9217e6..b5ad126 100644 --- a/voctocore/README.md +++ b/voctocore/README.md @@ -79,34 +79,34 @@ When another Client issues a Command and the Server executed it successfully, th ### Example Communication: ```` -< set_video_a cam1 -> ok +< set video a cam1 +> ok cam1 -< set_composite_mode side_by_side_equal -> ok +< set composite side_by_side_equal +> ok side_by_side_equal -< get_output_port +< get output port > ok 11000 -< get_video_a -> ok 0 cam1 +< get video a +> ok cam1 -< set_composite_mode +< get composite > ok side_by_side_equal -< set_video_a blafoo +< set video a blafoo > error "blafoo" is no known src -< set_stream_blank pause -> ok +< set status blank pause +> ok blank pause -< set_stream_live -> ok +< set status live +> ok live … -> signal set_video_a cam1 -> signal set_composite_mode side_by_side_equal +> signal set video a cam1 +> signal set composite side_by_side_equal ```` diff --git a/voctocore/lib/commands.py b/voctocore/lib/commands.py index 664a6ca..c972c69 100644 --- a/voctocore/lib/commands.py +++ b/voctocore/lib/commands.py @@ -46,75 +46,83 @@ class ControlServerCommands(): except Exception as e: raise IndexError("source %s unknown" % src_id) + # Commands are defined below. Errors are sent to the clients by throwing + # exceptions, they will be turned into messages outside. + + def fetch(self, command): + if command not in ['set', 'get', 'message', 'signal']: + raise Exception("unknown command") + return getattr(self, command) def message(self, *args): - return True + return " ".join(args), True - def set_video_a(self, src_name_or_id): - src_id = self.decodeSourceName(src_name_or_id) - self.pipeline.vmix.setVideoSourceA(src_id) - return True + def signal(self, *args): + return "", True - def get_video_a(self): - src_id = self.pipeline.vmix.getVideoSourceA() - return (True, self.encodeSourceName(src_id)) + def get(self, subcommand, *args, signal=False): + return getattr(self, "_get_"+subcommand)(*args), signal - def set_video_b(self, src_name_or_id): - src_id = self.decodeSourceName(src_name_or_id) - self.pipeline.vmix.setVideoSourceB(src_id) - return True + def set(self, subcommand, *args): + getattr(self, "_set_"+subcommand)(*args) + return self.get(subcommand, *args, signal=True) - def get_video_b(self): - src_id = self.pipeline.vmix.getVideoSourceB() - return (True, self.encodeSourceName(src_id)) + def _get_video(self, target, _=None): + if target not in ['a', 'b']: + raise Exception("invalid video source name: 'a' or 'b' expected.") + cmd = "getVideoSource" + target.upper() + src_id = getattr(self.pipeline.vmix, cmd)() + return self.encodeSourceName(src_id) + + def _set_video(self, target, src_name_or_id): + if target not in ['a', 'b']: + raise Exception("invalid video source name: 'a' or 'b' expected.") + src_id = self.decodeSourceName(src_name_or_id) + getattr(self.pipeline.vmix, 'setVideoSource' + target.upper())(src_id) - def set_audio(self, src_name_or_id): + def _set_audio(self, src_name_or_id): src_id = self.decodeSourceName(src_name_or_id) self.pipeline.amix.setAudioSource(src_id) - return True - def get_audio(self): + def _get_audio(self, _=None): src_id = self.pipeline.amix.getAudioSource() - return (True, self.encodeSourceName(src_id)) + return self.encodeSourceName(src_id) - def set_composite_mode(self, composite_mode): + def _set_composite(self, composite_mode): try: mode = CompositeModes[composite_mode] except KeyError as e: raise KeyError("composite-mode %s unknown" % composite_mode) self.pipeline.vmix.setCompositeMode(mode) - return True - def get_composite_mode(self): + def _get_composite(self, _=None): try: mode = self.pipeline.vmix.getCompositeMode() - return (True, mode.name) + return mode.name except Exception as e: raise KeyError("composite-mode %s unknown" % mode) - def set_stream_status(self, *args): + def _set_status(self, *args): try: if args[0] == "live": self.pipeline.streamblanker.setBlankSource(None) - return True elif args [0] == "blank": src_id = self.decodeBlankerSourceName(args[1]) self.pipeline.streamblanker.setBlankSource(src_id) - return True else: - return (False, "invocation: set_stream_status (live | blank <mode>)") + raise IndexError() except IndexError as e: - return (False, "invocation: set_stream_status (live | blank <mode>)") + raise Exception("invocation: set_status (live | blank <mode>)") - def get_stream_status(self): + def _get_status(self, *args): if self.pipeline.streamblanker.blankSource is None: - return (True, "live") + return "live" name = self.encodeBlankerSourceName(self.pipeline.streamblanker.blankSource) - return (True, "blank " + name) + return "blank " + name - def get_config(self): + def _get_config(self): confdict = {k: dict(v) for k, v in dict(Config).items()} - return (True, json.dumps(confdict)) + return json.dumps(confdict) diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py index 1a75a49..4e95e46 100644 --- a/voctocore/lib/controlserver.py +++ b/voctocore/lib/controlserver.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 import socket, logging, traceback +from queue import Queue from gi.repository import GObject from lib.commands import ControlServerCommands @@ -11,113 +12,113 @@ class ControlServer(TCPMultiConnection): self.log = logging.getLogger('ControlServer') super().__init__(port=9999) + self.command_queue = Queue() + self.commands = ControlServerCommands(pipeline) + GObject.idle_add(self.on_loop) + def on_accepted(self, conn, addr): '''Asynchronous connection listener. Starts a handler for each connection.''' - self.log.debug('Setting GObject io-watch on Connection') - GObject.io_add_watch(conn, GObject.IO_IN, self.on_data) - - def on_data(self, conn, *args): - '''Asynchronous connection handler. Processes each line from the socket.''' - # construct a file-like object fro mthe socket - # to be able to read linewise and in utf-8 - filelike = conn.makefile('rw') + GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, ['']) + GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write) - # read a line from the socket - line = '' + def on_data(self, conn, _, leftovers, *args): + '''Asynchronous connection handler. Pushes data from socket + into command queue linewise''' try: - line = filelike.readline().strip() - except Exception as e: - self.log.warn("Can't read from socket: %s", e) - - # no data = remote closed connection - if len(line) == 0: - self.close_connection(conn) - return False - - # 'quit' = remote wants us to close the connection - if line == 'quit': - self.log.info("Client asked us to close the Connection") - self.close_connection(conn) - return False - - # process the received line - success, msg = self.processLine(conn, line) - - # success = False -> error - if success == False: - # on error-responses the message is mandatory - if msg is None: - msg = '<no message>' - - # respond with 'error' and the message - filelike.write('error '+msg+'\n') - self.log.info("Function-Call returned an Error: %s", msg) + while True: + try: + leftovers.append(conn.recv(4096).decode(errors='replace')) + if len(leftovers[-1]) == 0: + self.log.info("Socket was closed") + self.close_connection(conn) + return False + except UnicodeDecodeError as e: + continue + except BlockingIOError as e: + pass + + data = "".join(leftovers) + leftovers.clear() + + lines = data.split('\n') + for line in lines[:-1]: + self.log.debug("Got line: %r", line) + + line = line.strip() + # TODO: move quit to on_loop + # 'quit' = remote wants us to close the connection + if line == 'quit': + self.log.info("Client asked us to close the Connection") + self.close_connection(conn) + return False + + self.command_queue.put((line, conn)) + + self.log.debug("Remaining %r", lines[-1]) + leftovers.append(lines[-1]) + return True - # keep on listening on that connection + def on_loop(self): + '''Command handler. Processes commands in the command queue whenever + nothing else is happening (registered as GObject idle callback)''' + if self.command_queue.empty(): return True + line, requestor = self.command_queue.get() - # success = True and not message - if msg is None: - # respond with a simple 'ok' - filelike.write('ok\n') - else: - # respond with the returned message - filelike.write('ok '+msg+'\n') - - return True - - def processLine(self, conn, line): - # split line into command and optional args words = line.split() command = words[0] args = words[1:] - # log function-call as parsed - self.log.info("Read Function-Call from %s: %s( %s )", conn.getpeername(), command, args) + try: + f = self.commands.fetch(command) + message, send_signals = f(*args) + response = "ok %s\n" % message - # check that the function-call is a known Command - if not hasattr(self.commands, command): - return False, 'unknown command %s' % command + except Exception as e: + message = str(e) or "<no message>" + response = "error %s\n" % message + else: + if send_signals: + signal = "signal %s\n" % line + for conn, queue in self.currentConnections.items(): + if conn == requestor: + continue + queue.put(signal) - try: - # fetch the function-pointer - f = getattr(self.commands, command) + finally: + self.currentConnections[requestor].put(response) - # call the function - ret = f(*args) + return True - # signal method call to all other connected clients - # only signal set_* commands - if command.split('_')[0] in ["set", "message"]: - self.signal(conn, command, args) + def on_write(self, conn, *args): + # TODO: on_loop() is not called as soon as there is a writable socket + self.on_loop() - # if it returned an iterable, probably (Success, Message), pass that on - if hasattr(ret, '__iter__'): - return ret - else: - # otherwise construct a tuple - return (ret, None) + try: + queue = self.currentConnections[conn] + except KeyError: + return False + if queue.empty(): + return True + message = queue.get() + try: + conn.send(message.encode()) except Exception as e: - self.log.error("Trapped Exception in Remote-Communication: %s", e) - - # In case of an Exception, return that - return False, str(e) - - def signal(self, origin_conn, command, args): - for conn in self.currentConnections: - if conn == origin_conn: - continue + self.log.warn(e) - self.log.debug( - 'signaling connection %s the successful ' - 'execution of the command %s', - conn.getpeername(), command) + return True - conn.makefile('w').write( - "signal %s %s\n" % (command, ' '.join(args)) - ) + def notify_all(self, msg): + try: + words = msg.split() + words[-1] = self.commands.encodeSourceName(int(words[-1])) + msg = " ".join(words) + '\n' + for queue in self.currentConnections.values(): + queue.put(msg) + except Exception as e: + self.log.debug("Error during notify: %s", e) diff --git a/voctocore/lib/notifications.py b/voctocore/lib/notifications.py new file mode 100644 index 0000000..1e101e1 --- /dev/null +++ b/voctocore/lib/notifications.py @@ -0,0 +1,14 @@ +#!/usr/bin/python3 + +# If you know a better way to do this, go for it... + +import logging + +log = logging.getLogger("Notifications") +controlserver = None + +def notify_all(msg): + try: + controlserver.notify_all(msg) + except Exception as e: + log.warn(e) diff --git a/voctocore/lib/tcpmulticonnection.py b/voctocore/lib/tcpmulticonnection.py index 927ac06..e6f8b5b 100644 --- a/voctocore/lib/tcpmulticonnection.py +++ b/voctocore/lib/tcpmulticonnection.py @@ -1,5 +1,6 @@ #!/usr/bin/python3 import logging, socket +from queue import Queue from gi.repository import GObject from lib.config import Config @@ -10,7 +11,7 @@ class TCPMultiConnection(object): self.log = logging.getLogger('TCPMultiConnection') self.boundSocket = None - self.currentConnections = [] + self.currentConnections = dict() self.log.debug('Binding to Source-Socket on [::]:%u', port) self.boundSocket = socket.socket(socket.AF_INET6) @@ -24,9 +25,11 @@ class TCPMultiConnection(object): def on_connect(self, sock, *args): conn, addr = sock.accept() + conn.setblocking(False) + self.log.info("Incomming Connection from %s", addr) - self.currentConnections.append(conn) + self.currentConnections[conn] = Queue() self.log.info('Now %u Receiver connected', len(self.currentConnections)) self.on_accepted(conn, addr) @@ -34,5 +37,5 @@ class TCPMultiConnection(object): return True def close_connection(self, conn): - self.currentConnections.remove(conn) + del(self.currentConnections[conn]) self.log.info('Now %u Receiver connected', len(self.currentConnections)) diff --git a/voctocore/lib/videomix.py b/voctocore/lib/videomix.py index e0beefb..053219a 100644 --- a/voctocore/lib/videomix.py +++ b/voctocore/lib/videomix.py @@ -4,6 +4,7 @@ from gi.repository import Gst from enum import Enum from lib.config import Config +from lib.notifications import notify_all class CompositeModes(Enum): fullscreen = 0 @@ -312,6 +313,7 @@ class VideoMix(object): # swap if required if self.sourceB == source: self.sourceB = self.sourceA + notify_all("signal set video b %s\n" % self.sourceB) self.sourceA = source self.recalculateMixerState() @@ -323,6 +325,7 @@ class VideoMix(object): # swap if required if self.sourceA == source: self.sourceA = self.sourceB + notify_all("signal set video b %s\n" % self.sourceA) self.sourceB = source self.recalculateMixerState() diff --git a/voctocore/voctocore.py b/voctocore/voctocore.py index 609ef52..0e65bc6 100755 --- a/voctocore/voctocore.py +++ b/voctocore/voctocore.py @@ -24,6 +24,7 @@ Gst.init([]) from lib.args import Args from lib.pipeline import Pipeline from lib.controlserver import ControlServer +from lib import notifications # main class class Voctocore(object): @@ -81,6 +82,8 @@ def main(): logging.debug('initializing Voctocore') voctocore = Voctocore() + notifications.controlserver = voctocore.controlserver + logging.debug('running Voctocore') voctocore.run() diff --git a/voctogui/lib/connection.py b/voctogui/lib/connection.py index 37851f0..ac7243d 100644 --- a/voctogui/lib/connection.py +++ b/voctogui/lib/connection.py @@ -10,6 +10,37 @@ def establish(host): log.info('establishing Connection to %s', host) sock = socket.create_connection( (host, port) ) log.debug('Connection successful \o/') + # TODO: register IO callback here -def ask(command): + +def send(command): print("would send command talk to server now and read back the response") + filelike = sock.makefile('rw') + filelike.write(command + "\n") + filelike.flush() + + +def on_data(args*): + filelike = sock.makefile() + line = '' + try: + line = filelike.readline() + except Exception as e: + log.warn("Can't read from socket: %s", e) + + if len(line) == 0: + close_connection() + return False + + line = line.strip() + + process_line(line) + + +def process_line(line): + msg_type = line.split()[0] + + +def close_connection(): + pass + |