aboutsummaryrefslogtreecommitdiff
path: root/voctocore
diff options
context:
space:
mode:
Diffstat (limited to 'voctocore')
-rw-r--r--voctocore/README.md30
-rw-r--r--voctocore/lib/commands.py74
-rw-r--r--voctocore/lib/controlserver.py173
-rw-r--r--voctocore/lib/notifications.py14
-rw-r--r--voctocore/lib/tcpmulticonnection.py9
-rw-r--r--voctocore/lib/videomix.py3
-rwxr-xr-xvoctocore/voctocore.py3
7 files changed, 169 insertions, 137 deletions
diff --git a/voctocore/README.md b/voctocore/README.md
index e9217e6..b5ad126 100644
--- a/voctocore/README.md
+++ b/voctocore/README.md
@@ -79,34 +79,34 @@ When another Client issues a Command and the Server executed it successfully, th
### Example Communication:
````
-< set_video_a cam1
-> ok
+< set video a cam1
+> ok cam1
-< set_composite_mode side_by_side_equal
-> ok
+< set composite side_by_side_equal
+> ok side_by_side_equal
-< get_output_port
+< get output port
> ok 11000
-< get_video_a
-> ok 0 cam1
+< get video a
+> ok cam1
-< set_composite_mode
+< get composite
> ok side_by_side_equal
-< set_video_a blafoo
+< set video a blafoo
> error "blafoo" is no known src
-< set_stream_blank pause
-> ok
+< set status blank pause
+> ok blank pause
-< set_stream_live
-> ok
+< set status live
+> ok live
-> signal set_video_a cam1
-> signal set_composite_mode side_by_side_equal
+> signal set video a cam1
+> signal set composite side_by_side_equal
````
diff --git a/voctocore/lib/commands.py b/voctocore/lib/commands.py
index 664a6ca..c972c69 100644
--- a/voctocore/lib/commands.py
+++ b/voctocore/lib/commands.py
@@ -46,75 +46,83 @@ class ControlServerCommands():
except Exception as e:
raise IndexError("source %s unknown" % src_id)
+ # Commands are defined below. Errors are sent to the clients by throwing
+ # exceptions, they will be turned into messages outside.
+
+ def fetch(self, command):
+ if command not in ['set', 'get', 'message', 'signal']:
+ raise Exception("unknown command")
+ return getattr(self, command)
def message(self, *args):
- return True
+ return " ".join(args), True
- def set_video_a(self, src_name_or_id):
- src_id = self.decodeSourceName(src_name_or_id)
- self.pipeline.vmix.setVideoSourceA(src_id)
- return True
+ def signal(self, *args):
+ return "", True
- def get_video_a(self):
- src_id = self.pipeline.vmix.getVideoSourceA()
- return (True, self.encodeSourceName(src_id))
+ def get(self, subcommand, *args, signal=False):
+ return getattr(self, "_get_"+subcommand)(*args), signal
- def set_video_b(self, src_name_or_id):
- src_id = self.decodeSourceName(src_name_or_id)
- self.pipeline.vmix.setVideoSourceB(src_id)
- return True
+ def set(self, subcommand, *args):
+ getattr(self, "_set_"+subcommand)(*args)
+ return self.get(subcommand, *args, signal=True)
- def get_video_b(self):
- src_id = self.pipeline.vmix.getVideoSourceB()
- return (True, self.encodeSourceName(src_id))
+ def _get_video(self, target, _=None):
+ if target not in ['a', 'b']:
+ raise Exception("invalid video source name: 'a' or 'b' expected.")
+ cmd = "getVideoSource" + target.upper()
+ src_id = getattr(self.pipeline.vmix, cmd)()
+ return self.encodeSourceName(src_id)
+
+ def _set_video(self, target, src_name_or_id):
+ if target not in ['a', 'b']:
+ raise Exception("invalid video source name: 'a' or 'b' expected.")
+ src_id = self.decodeSourceName(src_name_or_id)
+ getattr(self.pipeline.vmix, 'setVideoSource' + target.upper())(src_id)
- def set_audio(self, src_name_or_id):
+ def _set_audio(self, src_name_or_id):
src_id = self.decodeSourceName(src_name_or_id)
self.pipeline.amix.setAudioSource(src_id)
- return True
- def get_audio(self):
+ def _get_audio(self, _=None):
src_id = self.pipeline.amix.getAudioSource()
- return (True, self.encodeSourceName(src_id))
+ return self.encodeSourceName(src_id)
- def set_composite_mode(self, composite_mode):
+ def _set_composite(self, composite_mode):
try:
mode = CompositeModes[composite_mode]
except KeyError as e:
raise KeyError("composite-mode %s unknown" % composite_mode)
self.pipeline.vmix.setCompositeMode(mode)
- return True
- def get_composite_mode(self):
+ def _get_composite(self, _=None):
try:
mode = self.pipeline.vmix.getCompositeMode()
- return (True, mode.name)
+ return mode.name
except Exception as e:
raise KeyError("composite-mode %s unknown" % mode)
- def set_stream_status(self, *args):
+ def _set_status(self, *args):
try:
if args[0] == "live":
self.pipeline.streamblanker.setBlankSource(None)
- return True
elif args [0] == "blank":
src_id = self.decodeBlankerSourceName(args[1])
self.pipeline.streamblanker.setBlankSource(src_id)
- return True
else:
- return (False, "invocation: set_stream_status (live | blank <mode>)")
+ raise IndexError()
except IndexError as e:
- return (False, "invocation: set_stream_status (live | blank <mode>)")
+ raise Exception("invocation: set_status (live | blank <mode>)")
- def get_stream_status(self):
+ def _get_status(self, *args):
if self.pipeline.streamblanker.blankSource is None:
- return (True, "live")
+ return "live"
name = self.encodeBlankerSourceName(self.pipeline.streamblanker.blankSource)
- return (True, "blank " + name)
+ return "blank " + name
- def get_config(self):
+ def _get_config(self):
confdict = {k: dict(v) for k, v in dict(Config).items()}
- return (True, json.dumps(confdict))
+ return json.dumps(confdict)
diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py
index 1a75a49..4e95e46 100644
--- a/voctocore/lib/controlserver.py
+++ b/voctocore/lib/controlserver.py
@@ -1,5 +1,6 @@
#!/usr/bin/python3
import socket, logging, traceback
+from queue import Queue
from gi.repository import GObject
from lib.commands import ControlServerCommands
@@ -11,113 +12,113 @@ class ControlServer(TCPMultiConnection):
self.log = logging.getLogger('ControlServer')
super().__init__(port=9999)
+ self.command_queue = Queue()
+
self.commands = ControlServerCommands(pipeline)
+ GObject.idle_add(self.on_loop)
+
def on_accepted(self, conn, addr):
'''Asynchronous connection listener. Starts a handler for each connection.'''
-
self.log.debug('Setting GObject io-watch on Connection')
- GObject.io_add_watch(conn, GObject.IO_IN, self.on_data)
-
- def on_data(self, conn, *args):
- '''Asynchronous connection handler. Processes each line from the socket.'''
- # construct a file-like object fro mthe socket
- # to be able to read linewise and in utf-8
- filelike = conn.makefile('rw')
+ GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
+ GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
- # read a line from the socket
- line = ''
+ def on_data(self, conn, _, leftovers, *args):
+ '''Asynchronous connection handler. Pushes data from socket
+ into command queue linewise'''
try:
- line = filelike.readline().strip()
- except Exception as e:
- self.log.warn("Can't read from socket: %s", e)
-
- # no data = remote closed connection
- if len(line) == 0:
- self.close_connection(conn)
- return False
-
- # 'quit' = remote wants us to close the connection
- if line == 'quit':
- self.log.info("Client asked us to close the Connection")
- self.close_connection(conn)
- return False
-
- # process the received line
- success, msg = self.processLine(conn, line)
-
- # success = False -> error
- if success == False:
- # on error-responses the message is mandatory
- if msg is None:
- msg = '<no message>'
-
- # respond with 'error' and the message
- filelike.write('error '+msg+'\n')
- self.log.info("Function-Call returned an Error: %s", msg)
+ while True:
+ try:
+ leftovers.append(conn.recv(4096).decode(errors='replace'))
+ if len(leftovers[-1]) == 0:
+ self.log.info("Socket was closed")
+ self.close_connection(conn)
+ return False
+ except UnicodeDecodeError as e:
+ continue
+ except BlockingIOError as e:
+ pass
+
+ data = "".join(leftovers)
+ leftovers.clear()
+
+ lines = data.split('\n')
+ for line in lines[:-1]:
+ self.log.debug("Got line: %r", line)
+
+ line = line.strip()
+ # TODO: move quit to on_loop
+ # 'quit' = remote wants us to close the connection
+ if line == 'quit':
+ self.log.info("Client asked us to close the Connection")
+ self.close_connection(conn)
+ return False
+
+ self.command_queue.put((line, conn))
+
+ self.log.debug("Remaining %r", lines[-1])
+ leftovers.append(lines[-1])
+ return True
- # keep on listening on that connection
+ def on_loop(self):
+ '''Command handler. Processes commands in the command queue whenever
+ nothing else is happening (registered as GObject idle callback)'''
+ if self.command_queue.empty():
return True
+ line, requestor = self.command_queue.get()
- # success = True and not message
- if msg is None:
- # respond with a simple 'ok'
- filelike.write('ok\n')
- else:
- # respond with the returned message
- filelike.write('ok '+msg+'\n')
-
- return True
-
- def processLine(self, conn, line):
- # split line into command and optional args
words = line.split()
command = words[0]
args = words[1:]
- # log function-call as parsed
- self.log.info("Read Function-Call from %s: %s( %s )", conn.getpeername(), command, args)
+ try:
+ f = self.commands.fetch(command)
+ message, send_signals = f(*args)
+ response = "ok %s\n" % message
- # check that the function-call is a known Command
- if not hasattr(self.commands, command):
- return False, 'unknown command %s' % command
+ except Exception as e:
+ message = str(e) or "<no message>"
+ response = "error %s\n" % message
+ else:
+ if send_signals:
+ signal = "signal %s\n" % line
+ for conn, queue in self.currentConnections.items():
+ if conn == requestor:
+ continue
+ queue.put(signal)
- try:
- # fetch the function-pointer
- f = getattr(self.commands, command)
+ finally:
+ self.currentConnections[requestor].put(response)
- # call the function
- ret = f(*args)
+ return True
- # signal method call to all other connected clients
- # only signal set_* commands
- if command.split('_')[0] in ["set", "message"]:
- self.signal(conn, command, args)
+ def on_write(self, conn, *args):
+ # TODO: on_loop() is not called as soon as there is a writable socket
+ self.on_loop()
- # if it returned an iterable, probably (Success, Message), pass that on
- if hasattr(ret, '__iter__'):
- return ret
- else:
- # otherwise construct a tuple
- return (ret, None)
+ try:
+ queue = self.currentConnections[conn]
+ except KeyError:
+ return False
+ if queue.empty():
+ return True
+ message = queue.get()
+ try:
+ conn.send(message.encode())
except Exception as e:
- self.log.error("Trapped Exception in Remote-Communication: %s", e)
-
- # In case of an Exception, return that
- return False, str(e)
-
- def signal(self, origin_conn, command, args):
- for conn in self.currentConnections:
- if conn == origin_conn:
- continue
+ self.log.warn(e)
- self.log.debug(
- 'signaling connection %s the successful '
- 'execution of the command %s',
- conn.getpeername(), command)
+ return True
- conn.makefile('w').write(
- "signal %s %s\n" % (command, ' '.join(args))
- )
+ def notify_all(self, msg):
+ try:
+ words = msg.split()
+ words[-1] = self.commands.encodeSourceName(int(words[-1]))
+ msg = " ".join(words) + '\n'
+ for queue in self.currentConnections.values():
+ queue.put(msg)
+ except Exception as e:
+ self.log.debug("Error during notify: %s", e)
diff --git a/voctocore/lib/notifications.py b/voctocore/lib/notifications.py
new file mode 100644
index 0000000..1e101e1
--- /dev/null
+++ b/voctocore/lib/notifications.py
@@ -0,0 +1,14 @@
+#!/usr/bin/python3
+
+# If you know a better way to do this, go for it...
+
+import logging
+
+log = logging.getLogger("Notifications")
+controlserver = None
+
+def notify_all(msg):
+ try:
+ controlserver.notify_all(msg)
+ except Exception as e:
+ log.warn(e)
diff --git a/voctocore/lib/tcpmulticonnection.py b/voctocore/lib/tcpmulticonnection.py
index 927ac06..e6f8b5b 100644
--- a/voctocore/lib/tcpmulticonnection.py
+++ b/voctocore/lib/tcpmulticonnection.py
@@ -1,5 +1,6 @@
#!/usr/bin/python3
import logging, socket
+from queue import Queue
from gi.repository import GObject
from lib.config import Config
@@ -10,7 +11,7 @@ class TCPMultiConnection(object):
self.log = logging.getLogger('TCPMultiConnection')
self.boundSocket = None
- self.currentConnections = []
+ self.currentConnections = dict()
self.log.debug('Binding to Source-Socket on [::]:%u', port)
self.boundSocket = socket.socket(socket.AF_INET6)
@@ -24,9 +25,11 @@ class TCPMultiConnection(object):
def on_connect(self, sock, *args):
conn, addr = sock.accept()
+ conn.setblocking(False)
+
self.log.info("Incomming Connection from %s", addr)
- self.currentConnections.append(conn)
+ self.currentConnections[conn] = Queue()
self.log.info('Now %u Receiver connected', len(self.currentConnections))
self.on_accepted(conn, addr)
@@ -34,5 +37,5 @@ class TCPMultiConnection(object):
return True
def close_connection(self, conn):
- self.currentConnections.remove(conn)
+ del(self.currentConnections[conn])
self.log.info('Now %u Receiver connected', len(self.currentConnections))
diff --git a/voctocore/lib/videomix.py b/voctocore/lib/videomix.py
index e0beefb..053219a 100644
--- a/voctocore/lib/videomix.py
+++ b/voctocore/lib/videomix.py
@@ -4,6 +4,7 @@ from gi.repository import Gst
from enum import Enum
from lib.config import Config
+from lib.notifications import notify_all
class CompositeModes(Enum):
fullscreen = 0
@@ -312,6 +313,7 @@ class VideoMix(object):
# swap if required
if self.sourceB == source:
self.sourceB = self.sourceA
+ notify_all("signal set video b %s\n" % self.sourceB)
self.sourceA = source
self.recalculateMixerState()
@@ -323,6 +325,7 @@ class VideoMix(object):
# swap if required
if self.sourceA == source:
self.sourceA = self.sourceB
+ notify_all("signal set video b %s\n" % self.sourceA)
self.sourceB = source
self.recalculateMixerState()
diff --git a/voctocore/voctocore.py b/voctocore/voctocore.py
index 609ef52..0e65bc6 100755
--- a/voctocore/voctocore.py
+++ b/voctocore/voctocore.py
@@ -24,6 +24,7 @@ Gst.init([])
from lib.args import Args
from lib.pipeline import Pipeline
from lib.controlserver import ControlServer
+from lib import notifications
# main class
class Voctocore(object):
@@ -81,6 +82,8 @@ def main():
logging.debug('initializing Voctocore')
voctocore = Voctocore()
+ notifications.controlserver = voctocore.controlserver
+
logging.debug('running Voctocore')
voctocore.run()