aboutsummaryrefslogtreecommitdiff
path: root/voctocore/lib
diff options
context:
space:
mode:
authorMaZderMind <git@mazdermind.de>2015-09-02 15:19:30 +0200
committerMaZderMind <git@mazdermind.de>2015-09-02 15:19:30 +0200
commit20d75f6c7f5cad2f5c1b4da71ad5405848230201 (patch)
tree93894d7bc79a0b018275efbc0db524ac6f686570 /voctocore/lib
parenta004948051e182edb3a7e40f1f7f2e14bebb0e0e (diff)
parent8646386d09ab6cdabf0b8421cece5c1ddd69633f (diff)
Merge branch 'control-server-resilience'
Based on the work made by zuntrax & mithro at cccamp15
Diffstat (limited to 'voctocore/lib')
-rw-r--r--voctocore/lib/commands.py187
-rw-r--r--voctocore/lib/controlserver.py186
-rw-r--r--voctocore/lib/response.py15
-rw-r--r--voctocore/lib/tcpmulticonnection.py10
4 files changed, 234 insertions, 164 deletions
diff --git a/voctocore/lib/commands.py b/voctocore/lib/commands.py
index 664a6ca..dd4ef0d 100644
--- a/voctocore/lib/commands.py
+++ b/voctocore/lib/commands.py
@@ -2,119 +2,146 @@
import logging
import json
-
from lib.config import Config
from lib.videomix import CompositeModes
+from lib.response import NotifyResponse, OkResponse
-class ControlServerCommands():
- def __init__(self, pipeline):
- self.log = logging.getLogger('ControlServerCommands')
+def decodeName(items, name_or_id):
+ try:
+ name_or_id = int(name_or_id)
+ if name_or_id < 0 or name_or_id >= len(items):
+ raise IndexError("unknown index %d" % name_or_id)
- self.pipeline = pipeline
- self.sources = Config.getlist('mix', 'sources')
- self.blankersources = Config.getlist('stream-blanker', 'sources')
+ return name_or_id
- def decodeSourceName(self, src_name_or_id):
- if isinstance(src_name_or_id, str):
- try:
- return self.sources.index(src_name_or_id)
- except Exception as e:
- raise IndexError("source %s unknown" % src_name_or_id)
+ except ValueError as e:
+ try:
+ return items.index(name_or_id)
+
+ except ValueError as e:
+ raise IndexError("unknown name %s" % name_or_id)
+
+def decodeEnumName(enum, name_or_id):
+ try:
+ name_or_id = int(name_or_id)
+ if name_or_id < 0 or name_or_id >= len(enum):
+ raise IndexError("unknown index %d" % name_or_id)
- if src_name_or_id < 0 or src_name_or_id >= len(self.sources):
- raise IndexError("source %s unknown" % src_name_or_id)
+ return name_or_id
- def encodeSourceName(self, src_id):
+ except ValueError as e:
try:
- return self.sources[src_id]
- except Exception as e:
- raise IndexError("source %s unknown" % src_id)
+ return enum[name_or_id]
- def decodeBlankerSourceName(self, src_name_or_id):
- if isinstance(src_name_or_id, str):
- try:
- return self.blankersources.index(src_name_or_id)
- except Exception as e:
- raise IndexError("source %s unknown" % src_name_or_id)
+ except KeyError as e:
+ raise IndexError("unknown name %s" % name_or_id)
- if src_name_or_id < 0 or src_name_or_id >= len(self.blankersources):
- raise IndexError("source %s unknown" % src_name_or_id)
+def encodeName(items, id):
+ try:
+ return items[id]
+ except IndexError as e:
+ raise IndexError("unknown index %d" % id)
- def encodeBlankerSourceName(self, src_id):
- try:
- return self.blankersources[src_id]
- except Exception as e:
- raise IndexError("source %s unknown" % src_id)
+def encodeEnumName(enum, id):
+ try:
+ return enum(id).name
+ except ValueError as e:
+ raise IndexError("unknown index %d" % id)
+class ControlServerCommands(object):
+ def __init__(self, pipeline):
+ self.log = logging.getLogger('ControlServerCommands')
+
+ self.pipeline = pipeline
+
+ self.sources = Config.getlist('mix', 'sources')
+ self.blankerSources = Config.getlist('stream-blanker', 'sources')
+
+ # Commands are defined below. Errors are sent to the clients by throwing
+ # exceptions, they will be turned into messages outside.
def message(self, *args):
- return True
+ return NotifyResponse('message', args)
+
+
+ def _get_video_status(self):
+ a = encodeName( self.sources, self.pipeline.vmix.getVideoSourceA() )
+ b = encodeName( self.sources, self.pipeline.vmix.getVideoSourceB() )
+ return [a, b]
+
+ def get_video(self):
+ status = self._get_video_status()
+ return OkResponse('videoStatus', *status)
def set_video_a(self, src_name_or_id):
- src_id = self.decodeSourceName(src_name_or_id)
+ src_id = decodeName(self.sources, src_name_or_id)
self.pipeline.vmix.setVideoSourceA(src_id)
- return True
- def get_video_a(self):
- src_id = self.pipeline.vmix.getVideoSourceA()
- return (True, self.encodeSourceName(src_id))
+ status = self._get_video_status()
+ return NotifyResponse('videoStatus', *status)
def set_video_b(self, src_name_or_id):
- src_id = self.decodeSourceName(src_name_or_id)
+ src_id = decodeName(self.sources, src_name_or_id)
self.pipeline.vmix.setVideoSourceB(src_id)
- return True
- def get_video_b(self):
- src_id = self.pipeline.vmix.getVideoSourceB()
- return (True, self.encodeSourceName(src_id))
+ status = self._get_video_status()
+ return NotifyResponse('videoStatus', *status)
+
+
+ def _get_audio_status(self):
+ src_id = self.pipeline.amix.getAudioSource()
+ return encodeName(self.sources, src_id)
+
+ def get_audio(self):
+ status = self._get_audio_status()
+ return OkResponse('audioStatus', status)
def set_audio(self, src_name_or_id):
- src_id = self.decodeSourceName(src_name_or_id)
+ src_id = decodeName(self.sources, src_name_or_id)
self.pipeline.amix.setAudioSource(src_id)
- return True
- def get_audio(self):
- src_id = self.pipeline.amix.getAudioSource()
- return (True, self.encodeSourceName(src_id))
+ status = self._get_audio_status()
+ return NotifyResponse('audioStatus', status)
- def set_composite_mode(self, composite_mode):
- try:
- mode = CompositeModes[composite_mode]
- except KeyError as e:
- raise KeyError("composite-mode %s unknown" % composite_mode)
+ def _get_composite_status(self):
+ mode = self.pipeline.vmix.getCompositeMode()
+ return encodeEnumName(CompositeModes, mode)
+
+ def get_composite(self):
+ status = self._get_composite_status()
+ return OkResponse('compositeMode', status)
+
+ def set_composite(self, mode_name_or_id):
+ mode = decodeEnumName(CompositeModes, mode_name_or_id)
self.pipeline.vmix.setCompositeMode(mode)
- return True
- def get_composite_mode(self):
- try:
- mode = self.pipeline.vmix.getCompositeMode()
- return (True, mode.name)
- except Exception as e:
- raise KeyError("composite-mode %s unknown" % mode)
+ status = self._get_composite_status()
+ return NotifyResponse('compositeMode', status)
- def set_stream_status(self, *args):
- try:
- if args[0] == "live":
- self.pipeline.streamblanker.setBlankSource(None)
- return True
- elif args [0] == "blank":
- src_id = self.decodeBlankerSourceName(args[1])
- self.pipeline.streamblanker.setBlankSource(src_id)
- return True
- else:
- return (False, "invocation: set_stream_status (live | blank <mode>)")
- except IndexError as e:
- return (False, "invocation: set_stream_status (live | blank <mode>)")
+
+ def _get_stream_status(self):
+ blankSource = self.pipeline.streamblanker.blankSource
+ return encodeName(self.blankerSources, blankSource)
def get_stream_status(self):
- if self.pipeline.streamblanker.blankSource is None:
- return (True, "live")
+ status = self._get_stream_status()
+ return OkResponse('streamStatus', status)
- name = self.encodeBlankerSourceName(self.pipeline.streamblanker.blankSource)
- return (True, "blank " + name)
+ def set_stream_blank(self, source_name_or_id):
+ src_id = decodeName(self.blankerSources, source_name_or_id)
+ self.pipeline.streamblanker.setBlankSource(src_id)
- def get_config(self):
- confdict = {k: dict(v) for k, v in dict(Config).items()}
- return (True, json.dumps(confdict))
+ status = self._get_stream_status()
+ return NotifyResponse('streamStatus', status)
+ def set_stream_live(self):
+ self.pipeline.streamblanker.setBlankSource(None)
+
+ status = self._get_stream_status()
+ return NotifyResponse('streamStatus', status)
+
+
+ def get_config(self):
+ confdict = {header: dict(section) for header, section in dict(Config).items()}
+ return json.dumps(confdict)
diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py
index 1a75a49..70b9213 100644
--- a/voctocore/lib/controlserver.py
+++ b/voctocore/lib/controlserver.py
@@ -1,9 +1,11 @@
#!/usr/bin/python3
import socket, logging, traceback
+from queue import Queue
from gi.repository import GObject
from lib.commands import ControlServerCommands
from lib.tcpmulticonnection import TCPMultiConnection
+from lib.response import NotifyResponse, OkResponse
class ControlServer(TCPMultiConnection):
def __init__(self, pipeline):
@@ -11,113 +13,135 @@ class ControlServer(TCPMultiConnection):
self.log = logging.getLogger('ControlServer')
super().__init__(port=9999)
+ self.command_queue = Queue()
+
self.commands = ControlServerCommands(pipeline)
+ GObject.idle_add(self.on_loop)
+
def on_accepted(self, conn, addr):
'''Asynchronous connection listener. Starts a handler for each connection.'''
-
self.log.debug('Setting GObject io-watch on Connection')
- GObject.io_add_watch(conn, GObject.IO_IN, self.on_data)
+ GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
+ GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
- def on_data(self, conn, *args):
- '''Asynchronous connection handler. Processes each line from the socket.'''
- # construct a file-like object fro mthe socket
- # to be able to read linewise and in utf-8
- filelike = conn.makefile('rw')
-
- # read a line from the socket
- line = ''
+ def on_data(self, conn, _, leftovers, *args):
+ '''Asynchronous connection handler. Pushes data from socket
+ into command queue linewise'''
+ close_after = False
try:
- line = filelike.readline().strip()
- except Exception as e:
- self.log.warn("Can't read from socket: %s", e)
-
- # no data = remote closed connection
- if len(line) == 0:
+ while True:
+ try:
+ leftovers.append(conn.recv(4096).decode(errors='replace'))
+ if len(leftovers[-1]) == 0:
+ self.log.info("Socket was closed")
+ leftovers.pop()
+ close_after = True
+ break
+
+ except UnicodeDecodeError as e:
+ continue
+ except BlockingIOError as e:
+ pass
+
+ data = "".join(leftovers)
+ leftovers.clear()
+
+ lines = data.split('\n')
+ for line in lines[:-1]:
+ self.log.debug("Got line: %r", line)
+
+ line = line.strip()
+ # TODO: move quit to on_loop
+ # 'quit' = remote wants us to close the connection
+ if line == 'quit':
+ self.log.info("Client asked us to close the Connection")
+ self.close_connection(conn)
+ return False
+
+ self.command_queue.put((line, conn))
+
+ if close_after:
self.close_connection(conn)
return False
- # 'quit' = remote wants us to close the connection
- if line == 'quit':
- self.log.info("Client asked us to close the Connection")
- self.close_connection(conn)
- return False
+ self.log.debug("Remaining %r", lines[-1])
+ leftovers.append(lines[-1])
+ return True
- # process the received line
- success, msg = self.processLine(conn, line)
+ def on_loop(self):
+ '''Command handler. Processes commands in the command queue whenever
+ nothing else is happening (registered as GObject idle callback)'''
+ if self.command_queue.empty():
+ return True
+ line, requestor = self.command_queue.get()
- # success = False -> error
- if success == False:
- # on error-responses the message is mandatory
- if msg is None:
- msg = '<no message>'
+ words = line.split()
+ if len(words) < 1:
+ return True
- # respond with 'error' and the message
- filelike.write('error '+msg+'\n')
- self.log.info("Function-Call returned an Error: %s", msg)
+ command = words[0]
+ args = words[1:]
- # keep on listening on that connection
- return True
+ self.log.debug("Processing Command %r with args %s", command, args)
- # success = True and not message
- if msg is None:
- # respond with a simple 'ok'
- filelike.write('ok\n')
- else:
- # respond with the returned message
- filelike.write('ok '+msg+'\n')
+ try:
+ command_function = self.commands.__class__.__dict__[command]
- return True
+ except KeyError as e:
+ response = "error unknown command %s\n" % command
- def processLine(self, conn, line):
- # split line into command and optional args
- words = line.split()
- command = words[0]
- args = words[1:]
+ else:
+ try:
+ responseObject = command_function(self.commands, *args)
- # log function-call as parsed
- self.log.info("Read Function-Call from %s: %s( %s )", conn.getpeername(), command, args)
+ except Exception as e:
+ message = str(e) or "<no message>"
+ response = "error %s\n" % message
- # check that the function-call is a known Command
- if not hasattr(self.commands, command):
- return False, 'unknown command %s' % command
+ else:
+ if isinstance(responseObject, NotifyResponse):
+ signal = "%s\n" % str(responseObject)
+ for conn, queue in self.currentConnections.items():
+ if conn == requestor:
+ continue
+ queue.put(signal)
- try:
- # fetch the function-pointer
- f = getattr(self.commands, command)
+ response = "%s\n" % str(responseObject)
- # call the function
- ret = f(*args)
+ finally:
+ if requestor in self.currentConnections:
+ self.currentConnections[requestor].put(response)
- # signal method call to all other connected clients
- # only signal set_* commands
- if command.split('_')[0] in ["set", "message"]:
- self.signal(conn, command, args)
+ return True
- # if it returned an iterable, probably (Success, Message), pass that on
- if hasattr(ret, '__iter__'):
- return ret
- else:
- # otherwise construct a tuple
- return (ret, None)
+ def on_write(self, conn, *args):
+ # TODO: on_loop() is not called as soon as there is a writable socket
+ self.on_loop()
- except Exception as e:
- self.log.error("Trapped Exception in Remote-Communication: %s", e)
+ try:
+ queue = self.currentConnections[conn]
+ except KeyError:
+ return False
- # In case of an Exception, return that
- return False, str(e)
+ if queue.empty():
+ return True
- def signal(self, origin_conn, command, args):
- for conn in self.currentConnections:
- if conn == origin_conn:
- continue
+ message = queue.get()
+ try:
+ conn.send(message.encode())
+ except Exception as e:
+ self.log.warn(e)
- self.log.debug(
- 'signaling connection %s the successful '
- 'execution of the command %s',
- conn.getpeername(), command)
+ return True
- conn.makefile('w').write(
- "signal %s %s\n" % (command, ' '.join(args))
- )
+ def notify_all(self, msg):
+ try:
+ words = msg.split()
+ words[-1] = self.commands.encodeSourceName(int(words[-1]))
+ msg = " ".join(words) + '\n'
+ for queue in self.currentConnections.values():
+ queue.put(msg)
+ except Exception as e:
+ self.log.debug("Error during notify: %s", e)
diff --git a/voctocore/lib/response.py b/voctocore/lib/response.py
new file mode 100644
index 0000000..866d5f2
--- /dev/null
+++ b/voctocore/lib/response.py
@@ -0,0 +1,15 @@
+#!/usr/bin/python3
+
+class Response(object):
+ def __init__(self, *args):
+ self.args = args
+
+ def __str__(self):
+ return " ".join(map(str, self.args))
+
+
+class OkResponse(Response):
+ pass
+
+class NotifyResponse(Response):
+ pass
diff --git a/voctocore/lib/tcpmulticonnection.py b/voctocore/lib/tcpmulticonnection.py
index 927ac06..e9caf2c 100644
--- a/voctocore/lib/tcpmulticonnection.py
+++ b/voctocore/lib/tcpmulticonnection.py
@@ -1,5 +1,6 @@
#!/usr/bin/python3
import logging, socket
+from queue import Queue
from gi.repository import GObject
from lib.config import Config
@@ -10,7 +11,7 @@ class TCPMultiConnection(object):
self.log = logging.getLogger('TCPMultiConnection')
self.boundSocket = None
- self.currentConnections = []
+ self.currentConnections = dict()
self.log.debug('Binding to Source-Socket on [::]:%u', port)
self.boundSocket = socket.socket(socket.AF_INET6)
@@ -24,9 +25,11 @@ class TCPMultiConnection(object):
def on_connect(self, sock, *args):
conn, addr = sock.accept()
+ conn.setblocking(False)
+
self.log.info("Incomming Connection from %s", addr)
- self.currentConnections.append(conn)
+ self.currentConnections[conn] = Queue()
self.log.info('Now %u Receiver connected', len(self.currentConnections))
self.on_accepted(conn, addr)
@@ -34,5 +37,6 @@ class TCPMultiConnection(object):
return True
def close_connection(self, conn):
- self.currentConnections.remove(conn)
+ if conn in self.currentConnections:
+ del(self.currentConnections[conn])
self.log.info('Now %u Receiver connected', len(self.currentConnections))