aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--voctocore/lib/args.py13
-rw-r--r--voctocore/lib/audiomix.py162
-rw-r--r--voctocore/lib/avpreviewoutput.py163
-rw-r--r--voctocore/lib/avrawoutput.py146
-rw-r--r--voctocore/lib/avsource.py234
-rw-r--r--voctocore/lib/commands.py414
-rw-r--r--voctocore/lib/config.py25
-rw-r--r--voctocore/lib/controlserver.py301
-rw-r--r--voctocore/lib/loghandler.py86
-rw-r--r--voctocore/lib/pipeline.py184
-rw-r--r--voctocore/lib/response.py14
-rw-r--r--voctocore/lib/streamblanker.py193
-rw-r--r--voctocore/lib/tcpmulticonnection.py59
-rw-r--r--voctocore/lib/tcpsingleconnection.py58
-rw-r--r--voctocore/lib/videomix.py766
-rwxr-xr-xvoctocore/voctocore.py109
16 files changed, 1526 insertions, 1401 deletions
diff --git a/voctocore/lib/args.py b/voctocore/lib/args.py
index d40cd75..66c298d 100644
--- a/voctocore/lib/args.py
+++ b/voctocore/lib/args.py
@@ -4,16 +4,19 @@ __all__ = ['Args']
parser = argparse.ArgumentParser(description='Voctocore')
parser.add_argument('-v', '--verbose', action='count', default=0,
- help="Also print INFO and DEBUG messages.")
+ help="Also print INFO and DEBUG messages.")
-parser.add_argument('-c', '--color', action='store', choices=['auto', 'always', 'never'], default='auto',
- help="Control the use of colors in the Log-Output")
+parser.add_argument('-c', '--color',
+ action='store',
+ choices=['auto', 'always', 'never'],
+ default='auto',
+ help="Control the use of colors in the Log-Output")
parser.add_argument('-t', '--timestamp', action='store_true',
- help="Enable timestamps in the Log-Output")
+ help="Enable timestamps in the Log-Output")
parser.add_argument('-i', '--ini-file', action='store',
- help="Load a custom config.ini-File")
+ help="Load a custom config.ini-File")
Args = parser.parse_args()
diff --git a/voctocore/lib/audiomix.py b/voctocore/lib/audiomix.py
index 6c1768a..37376fd 100644
--- a/voctocore/lib/audiomix.py
+++ b/voctocore/lib/audiomix.py
@@ -5,83 +5,87 @@ from enum import Enum
from lib.config import Config
from lib.clock import Clock
+
class AudioMix(object):
- def __init__(self):
- self.log = logging.getLogger('AudioMix')
-
- self.selectedSource = 0
-
- self.caps = Config.get('mix', 'audiocaps')
- self.names = Config.getlist('mix', 'sources')
- self.log.info('Configuring Mixer for %u Sources', len(self.names))
-
- pipeline = """
- audiomixer name=mix !
- {caps} !
- queue !
- tee name=tee
-
- tee. ! queue ! interaudiosink channel=audio_mix_out
- """.format(
- caps=self.caps
- )
-
- if Config.getboolean('previews', 'enabled'):
- pipeline += """
- tee. ! queue ! interaudiosink channel=audio_mix_preview
- """
-
- if Config.getboolean('stream-blanker', 'enabled'):
- pipeline += """
- tee. ! queue ! interaudiosink channel=audio_mix_streamblanker
- """
-
- for idx, name in enumerate(self.names):
- pipeline += """
- interaudiosrc channel=audio_{name}_mixer !
- {caps} !
- mix.
- """.format(
- name=name,
- caps=self.caps
- )
-
- self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline)
- self.mixingPipeline = Gst.parse_launch(pipeline)
- self.mixingPipeline.use_clock(Clock)
-
- self.log.debug('Binding Error & End-of-Stream-Signal on Mixing-Pipeline')
- self.mixingPipeline.bus.add_signal_watch()
- self.mixingPipeline.bus.connect("message::eos", self.on_eos)
- self.mixingPipeline.bus.connect("message::error", self.on_error)
-
- self.log.debug('Initializing Mixer-State')
- self.updateMixerState()
-
- self.log.debug('Launching Mixing-Pipeline')
- self.mixingPipeline.set_state(Gst.State.PLAYING)
-
- def updateMixerState(self):
- self.log.info('Updating Mixer-State')
-
- for idx, name in enumerate(self.names):
- volume = int(idx == self.selectedSource)
-
- self.log.debug('Setting Mixerpad %u to volume=%0.2f', idx, volume)
- mixerpad = self.mixingPipeline.get_by_name('mix').get_static_pad('sink_%u' % idx)
- mixerpad.set_property('volume', volume)
-
- def setAudioSource(self, source):
- self.selectedSource = source
- self.updateMixerState()
-
- def getAudioSource(self):
- return self.selectedSource
-
- def on_eos(self, bus, message):
- self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline')
-
- def on_error(self, bus, message):
- self.log.debug('Received Error-Signal on Mixing-Pipeline')
- (error, debug) = message.parse_error()
- self.log.debug('Error-Details: #%u: %s', error.code, debug)
+
+ def __init__(self):
+ self.log = logging.getLogger('AudioMix')
+
+ self.selectedSource = 0
+
+ self.caps = Config.get('mix', 'audiocaps')
+ self.names = Config.getlist('mix', 'sources')
+ self.log.info('Configuring Mixer for %u Sources', len(self.names))
+
+ pipeline = """
+ audiomixer name=mix !
+ {caps} !
+ queue !
+ tee name=tee
+
+ tee. ! queue ! interaudiosink channel=audio_mix_out
+ """.format(
+ caps=self.caps
+ )
+
+ if Config.getboolean('previews', 'enabled'):
+ pipeline += """
+ tee. ! queue ! interaudiosink channel=audio_mix_preview
+ """
+
+ if Config.getboolean('stream-blanker', 'enabled'):
+ pipeline += """
+ tee. ! queue ! interaudiosink channel=audio_mix_streamblanker
+ """
+
+ for idx, name in enumerate(self.names):
+ pipeline += """
+ interaudiosrc channel=audio_{name}_mixer !
+ {caps} !
+ mix.
+ """.format(
+ name=name,
+ caps=self.caps
+ )
+
+ self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline)
+ self.mixingPipeline = Gst.parse_launch(pipeline)
+ self.mixingPipeline.use_clock(Clock)
+
+ self.log.debug('Binding Error & End-of-Stream-Signal '
+ 'on Mixing-Pipeline')
+ self.mixingPipeline.bus.add_signal_watch()
+ self.mixingPipeline.bus.connect("message::eos", self.on_eos)
+ self.mixingPipeline.bus.connect("message::error", self.on_error)
+
+ self.log.debug('Initializing Mixer-State')
+ self.updateMixerState()
+
+ self.log.debug('Launching Mixing-Pipeline')
+ self.mixingPipeline.set_state(Gst.State.PLAYING)
+
+ def updateMixerState(self):
+ self.log.info('Updating Mixer-State')
+
+ for idx, name in enumerate(self.names):
+ volume = int(idx == self.selectedSource)
+
+ self.log.debug('Setting Mixerpad %u to volume=%0.2f', idx, volume)
+ mixerpad = (self.mixingPipeline.get_by_name('mix')
+ .get_static_pad('sink_%u' % idx))
+ mixerpad.set_property('volume', volume)
+
+ def setAudioSource(self, source):
+ self.selectedSource = source
+ self.updateMixerState()
+
+ def getAudioSource(self):
+ return self.selectedSource
+
+ def on_eos(self, bus, message):
+ self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline')
+
+ def on_error(self, bus, message):
+ self.log.debug('Received Error-Signal on Mixing-Pipeline')
+ (error, debug) = message.parse_error()
+ self.log.debug('Error-Details: #%u: %s', error.code, debug)
diff --git a/voctocore/lib/avpreviewoutput.py b/voctocore/lib/avpreviewoutput.py
index d5c2c66..749249b 100644
--- a/voctocore/lib/avpreviewoutput.py
+++ b/voctocore/lib/avpreviewoutput.py
@@ -5,84 +5,87 @@ from lib.config import Config
from lib.tcpmulticonnection import TCPMultiConnection
from lib.clock import Clock
+
class AVPreviewOutput(TCPMultiConnection):
- def __init__(self, channel, port):
- self.log = logging.getLogger('AVPreviewOutput['+channel+']')
- super().__init__(port)
-
- self.channel = channel
-
- if Config.has_option('previews', 'videocaps'):
- vcaps_out = Config.get('previews', 'videocaps')
- else:
- vcaps_out = Config.get('mix', 'videocaps')
-
- deinterlace = ""
- if Config.getboolean('previews', 'deinterlace'):
- deinterlace = "deinterlace mode=interlaced !"
-
- pipeline = """
- intervideosrc channel=video_{channel} !
- {vcaps_in} !
- {deinterlace}
- videoscale !
- videorate !
- {vcaps_out} !
- jpegenc quality=90 !
- queue !
- mux.
-
- interaudiosrc channel=audio_{channel} !
- {acaps} !
- queue !
- mux.
-
- matroskamux
- name=mux
- streamable=true
- writing-app=Voctomix-AVPreviewOutput !
-
- multifdsink
- blocksize=1048576
- buffers-max=500
- sync-method=next-keyframe
- name=fd
- """.format(
- channel=self.channel,
- acaps=Config.get('mix', 'audiocaps'),
- vcaps_in=Config.get('mix', 'videocaps'),
- vcaps_out=vcaps_out,
- deinterlace=deinterlace
- )
-
- self.log.debug('Creating Output-Pipeline:\n%s', pipeline)
- self.outputPipeline = Gst.parse_launch(pipeline)
- self.outputPipeline.use_clock(Clock)
-
- self.log.debug('Binding Error & End-of-Stream-Signal on Output-Pipeline')
- self.outputPipeline.bus.add_signal_watch()
- self.outputPipeline.bus.connect("message::eos", self.on_eos)
- self.outputPipeline.bus.connect("message::error", self.on_error)
-
- self.log.debug('Launching Output-Pipeline')
- self.outputPipeline.set_state(Gst.State.PLAYING)
-
- def on_accepted(self, conn, addr):
- self.log.debug('Adding fd %u to multifdsink', conn.fileno())
- fdsink = self.outputPipeline.get_by_name('fd')
- fdsink.emit('add', conn.fileno())
-
- def on_disconnect(multifdsink, fileno):
- if fileno == conn.fileno():
- self.log.debug('fd %u removed from multifdsink', fileno)
- self.close_connection(conn)
-
- fdsink.connect('client-fd-removed', on_disconnect)
-
- def on_eos(self, bus, message):
- self.log.debug('Received End-of-Stream-Signal on Output-Pipeline')
-
- def on_error(self, bus, message):
- self.log.debug('Received Error-Signal on Output-Pipeline')
- (error, debug) = message.parse_error()
- self.log.debug('Error-Details: #%u: %s', error.code, debug)
+
+ def __init__(self, channel, port):
+ self.log = logging.getLogger('AVPreviewOutput[{}]'.format(channel))
+ super().__init__(port)
+
+ self.channel = channel
+
+ if Config.has_option('previews', 'videocaps'):
+ vcaps_out = Config.get('previews', 'videocaps')
+ else:
+ vcaps_out = Config.get('mix', 'videocaps')
+
+ deinterlace = ""
+ if Config.getboolean('previews', 'deinterlace'):
+ deinterlace = "deinterlace mode=interlaced !"
+
+ pipeline = """
+ intervideosrc channel=video_{channel} !
+ {vcaps_in} !
+ {deinterlace}
+ videoscale !
+ videorate !
+ {vcaps_out} !
+ jpegenc quality=90 !
+ queue !
+ mux.
+
+ interaudiosrc channel=audio_{channel} !
+ {acaps} !
+ queue !
+ mux.
+
+ matroskamux
+ name=mux
+ streamable=true
+ writing-app=Voctomix-AVPreviewOutput !
+
+ multifdsink
+ blocksize=1048576
+ buffers-max=500
+ sync-method=next-keyframe
+ name=fd
+ """.format(
+ channel=self.channel,
+ acaps=Config.get('mix', 'audiocaps'),
+ vcaps_in=Config.get('mix', 'videocaps'),
+ vcaps_out=vcaps_out,
+ deinterlace=deinterlace
+ )
+
+ self.log.debug('Creating Output-Pipeline:\n%s', pipeline)
+ self.outputPipeline = Gst.parse_launch(pipeline)
+ self.outputPipeline.use_clock(Clock)
+
+ self.log.debug('Binding Error & End-of-Stream-Signal '
+ 'on Output-Pipeline')
+ self.outputPipeline.bus.add_signal_watch()
+ self.outputPipeline.bus.connect("message::eos", self.on_eos)
+ self.outputPipeline.bus.connect("message::error", self.on_error)
+
+ self.log.debug('Launching Output-Pipeline')
+ self.outputPipeline.set_state(Gst.State.PLAYING)
+
+ def on_accepted(self, conn, addr):
+ self.log.debug('Adding fd %u to multifdsink', conn.fileno())
+ fdsink = self.outputPipeline.get_by_name('fd')
+ fdsink.emit('add', conn.fileno())
+
+ def on_disconnect(multifdsink, fileno):
+ if fileno == conn.fileno():
+ self.log.debug('fd %u removed from multifdsink', fileno)
+ self.close_connection(conn)
+
+ fdsink.connect('client-fd-removed', on_disconnect)
+
+ def on_eos(self, bus, message):
+ self.log.debug('Received End-of-Stream-Signal on Output-Pipeline')
+
+ def on_error(self, bus, message):
+ self.log.debug('Received Error-Signal on Output-Pipeline')
+ (error, debug) = message.parse_error()
+ self.log.debug('Error-Details: #%u: %s', error.code, debug)
diff --git a/voctocore/lib/avrawoutput.py b/voctocore/lib/avrawoutput.py
index 5523a66..b044e24 100644
--- a/voctocore/lib/avrawoutput.py
+++ b/voctocore/lib/avrawoutput.py
@@ -5,76 +5,78 @@ from lib.config import Config
from lib.tcpmulticonnection import TCPMultiConnection
from lib.clock import Clock
+
class AVRawOutput(TCPMultiConnection):
- def __init__(self, channel, port):
- self.log = logging.getLogger('AVRawOutput['+channel+']')
- super().__init__(port)
-
- self.channel = channel
-
- pipeline = """
- intervideosrc channel=video_{channel} !
- {vcaps} !
- queue !
- mux.
-
- interaudiosrc channel=audio_{channel} !
- {acaps} !
- queue !
- mux.
-
- matroskamux
- name=mux
- streamable=true
- writing-app=Voctomix-AVRawOutput !
-
- multifdsink
- blocksize=1048576
- buffers-max={buffers_max}
- sync-method=next-keyframe
- name=fd
- """.format(
- channel=self.channel,
- acaps=Config.get('mix', 'audiocaps'),
- vcaps=Config.get('mix', 'videocaps'),
- buffers_max=
- Config.get('output-buffers', channel)
- if Config.has_option('output-buffers', channel)
- else 500,
- )
- self.log.debug('Creating Output-Pipeline:\n%s', pipeline)
- self.outputPipeline = Gst.parse_launch(pipeline)
- self.outputPipeline.use_clock(Clock)
-
- self.log.debug('Binding Error & End-of-Stream-Signal on Output-Pipeline')
- self.outputPipeline.bus.add_signal_watch()
- self.outputPipeline.bus.connect("message::eos", self.on_eos)
- self.outputPipeline.bus.connect("message::error", self.on_error)
-
- self.log.debug('Launching Output-Pipeline')
- self.outputPipeline.set_state(Gst.State.PLAYING)
-
- def on_accepted(self, conn, addr):
- self.log.debug('Adding fd %u to multifdsink', conn.fileno())
- fdsink = self.outputPipeline.get_by_name('fd')
- fdsink.emit('add', conn.fileno())
-
- def on_disconnect(multifdsink, fileno):
- if fileno == conn.fileno():
- self.log.debug('fd %u removed from multifdsink', fileno)
- self.close_connection(conn)
-
- def on_about_to_disconnect(multifdsink, fileno, status):
- if fileno == conn.fileno() and status == 3: # Gst.MultiHandleSinkClientStatus.Slow
- self.log.warning('about to remove fd %u from multifdsink because it is too slow!', fileno)
-
- fdsink.connect('client-fd-removed', on_disconnect)
- fdsink.connect('client-removed', on_about_to_disconnect)
-
- def on_eos(self, bus, message):
- self.log.debug('Received End-of-Stream-Signal on Output-Pipeline')
-
- def on_error(self, bus, message):
- self.log.debug('Received Error-Signal on Output-Pipeline')
- (error, debug) = message.parse_error()
- self.log.debug('Error-Details: #%u: %s', error.code, debug)
+
+ def __init__(self, channel, port):
+ self.log = logging.getLogger('AVRawOutput[{}]'.format(channel))
+ super().__init__(port)
+
+ self.channel = channel
+
+ pipeline = """
+ intervideosrc channel=video_{channel} !
+ {vcaps} !
+ queue !
+ mux.
+
+ interaudiosrc channel=audio_{channel} !
+ {acaps} !
+ queue !
+ mux.
+
+ matroskamux
+ name=mux
+ streamable=true
+ writing-app=Voctomix-AVRawOutput !
+
+ multifdsink
+ blocksize=1048576
+ buffers-max={buffers_max}
+ sync-method=next-keyframe
+ name=fd
+ """.format(
+ channel=self.channel,
+ acaps=Config.get('mix', 'audiocaps'),
+ vcaps=Config.get('mix', 'videocaps'),
+ buffers_max=Config.get('output-buffers', channel, fallback=500)
+ )
+ self.log.debug('Creating Output-Pipeline:\n%s', pipeline)
+ self.outputPipeline = Gst.parse_launch(pipeline)
+ self.outputPipeline.use_clock(Clock)
+
+ self.log.debug('Binding Error & End-of-Stream-Signal '
+ 'on Output-Pipeline')
+ self.outputPipeline.bus.add_signal_watch()
+ self.outputPipeline.bus.connect("message::eos", self.on_eos)
+ self.outputPipeline.bus.connect("message::error", self.on_error)
+
+ self.log.debug('Launching Output-Pipeline')
+ self.outputPipeline.set_state(Gst.State.PLAYING)
+
+ def on_accepted(self, conn, addr):
+ self.log.debug('Adding fd %u to multifdsink', conn.fileno())
+ fdsink = self.outputPipeline.get_by_name('fd')
+ fdsink.emit('add', conn.fileno())
+
+ def on_disconnect(multifdsink, fileno):
+ if fileno == conn.fileno():
+ self.log.debug('fd %u removed from multifdsink', fileno)
+ self.close_connection(conn)
+
+ def on_about_to_disconnect(multifdsink, fileno, status):
+ # GST_CLIENT_STATUS_SLOW = 3,
+ if fileno == conn.fileno() and status == 3:
+ self.log.warning('about to remove fd %u from multifdsink '
+ 'because it is too slow!', fileno)
+
+ fdsink.connect('client-fd-removed', on_disconnect)
+ fdsink.connect('client-removed', on_about_to_disconnect)
+
+ def on_eos(self, bus, message):
+ self.log.debug('Received End-of-Stream-Signal on Output-Pipeline')
+
+ def on_error(self, bus, message):
+ self.log.debug('Received Error-Signal on Output-Pipeline')
+ (error, debug) = message.parse_error()
+ self.log.debug('Error-Details: #%u: %s', error.code, debug)
diff --git a/voctocore/lib/avsource.py b/voctocore/lib/avsource.py
index aa604b2..c2e08da 100644
--- a/voctocore/lib/avsource.py
+++ b/voctocore/lib/avsource.py
@@ -5,116 +5,126 @@ from lib.config import Config
from lib.tcpsingleconnection import TCPSingleConnection
from lib.clock import Clock
+
class AVSource(TCPSingleConnection):
- def __init__(self, name, port, outputs=None, has_audio=True, has_video=True):
- self.log = logging.getLogger('AVSource['+name+']')
- super().__init__(port)
-
- if outputs is None:
- outputs = [name]
-
- assert has_audio or has_video
-
- self.name = name
- self.has_audio = has_audio
- self.has_video = has_video
- self.outputs = outputs
-
- def on_accepted(self, conn, addr):
- pipeline = """
- fdsrc fd={fd} blocksize=1048576 !
- queue !
- matroskademux name=demux
- """.format(
- fd=conn.fileno()
- )
-
- if self.has_audio:
- pipeline += """
- demux. !
- {acaps} !
- queue !
- tee name=atee
- """.format(
- acaps=Config.get('mix', 'audiocaps')
- )
-
- for output in self.outputs:
- pipeline += """
- atee. ! queue ! interaudiosink channel=audio_{output}
- """.format(
- output=output
- )
-
- if self.has_video:
- pipeline += """
- demux. !
- {vcaps} !
- queue !
- tee name=vtee
- """.format(
- vcaps=Config.get('mix', 'videocaps')
- )
-
- for output in self.outputs:
- pipeline += """
- vtee. ! queue ! intervideosink channel=video_{output}
- """.format(
- output=output
- )
-
- self.log.debug('Launching Source-Pipeline:\n%s', pipeline)
- self.receiverPipeline = Gst.parse_launch(pipeline)
- self.receiverPipeline.use_clock(Clock)
-
- self.log.debug('Binding End-of-Stream-Signal on Source-Pipeline')
- self.receiverPipeline.bus.add_signal_watch()
- self.receiverPipeline.bus.connect("message::eos", self.on_eos)
- self.receiverPipeline.bus.connect("message::error", self.on_error)
-
- self.all_video_caps = Gst.Caps.from_string('video/x-raw')
- self.video_caps = Gst.Caps.from_string(Config.get('mix', 'videocaps'))
-
- self.all_audio_caps = Gst.Caps.from_string('audio/x-raw')
- self.audio_caps = Gst.Caps.from_string(Config.get('mix', 'audiocaps'))
-
- demux = self.receiverPipeline.get_by_name('demux')
- demux.connect('pad-added', self.on_pad_added)
-
- self.receiverPipeline.set_state(Gst.State.PLAYING)
-
- def on_pad_added(self, demux, src_pad):
- caps = src_pad.query_caps(None)
- self.log.debug('demuxer added pad w/ caps: %s', caps.to_string())
- if caps.can_intersect(self.all_audio_caps):
- self.log.debug('new demuxer-pad is a audio-pad, testing against configured audio-caps')
- if not caps.can_intersect(self.audio_caps):
- self.log.warning('the incoming connection presented a video-stream that is not compatible to the configured caps')
- self.log.warning(' incoming caps: %s', caps.to_string())
- self.log.warning(' configured caps: %s', self.audio_caps.to_string())
-
-
- elif caps.can_intersect(self.all_video_caps):
- self.log.debug('new demuxer-pad is a video-pad, testing against configured video-caps')
- if not caps.can_intersect(self.video_caps):
- self.log.warning('the incoming connection presented a video-stream that is not compatible to the configured caps')
- self.log.warning(' incoming caps: %s', caps.to_string())
- self.log.warning(' configured caps: %s', self.video_caps.to_string())
-
- def on_eos(self, bus, message):
- self.log.debug('Received End-of-Stream-Signal on Source-Pipeline')
- if self.currentConnection is not None:
- self.disconnect()
-
- def on_error(self, bus, message):
- self.log.debug('Received Error-Signal on Source-Pipeline')
- (error, debug) = message.parse_error()
- self.log.debug('Error-Details: #%u: %s', error.code, debug)
-
- if self.currentConnection is not None:
- self.disconnect()
-
- def disconnect(self):
- self.receiverPipeline.set_state(Gst.State.NULL)
- self.receiverPipeline = None
- self.close_connection()
+
+ def __init__(self, name, port, outputs=None,
+ has_audio=True, has_video=True):
+ self.log = logging.getLogger('AVSource[{}]'.format(name))
+ super().__init__(port)
+
+ if outputs is None:
+ outputs = [name]
+
+ assert has_audio or has_video
+
+ self.name = name
+ self.has_audio = has_audio
+ self.has_video = has_video
+ self.outputs = outputs
+
+ def on_accepted(self, conn, addr):
+ pipeline = """
+ fdsrc fd={fd} blocksize=1048576 !
+ queue !
+ matroskademux name=demux
+ """.format(
+ fd=conn.fileno()
+ )
+
+ if self.has_audio:
+ pipeline += """
+ demux. !
+ {acaps} !
+ queue !
+ tee name=atee
+ """.format(
+ acaps=Config.get('mix', 'audiocaps')
+ )
+
+ for output in self.outputs:
+ pipeline += """
+ atee. ! queue ! interaudiosink channel=audio_{output}
+ """.format(
+ output=output
+ )
+
+ if self.has_video:
+ pipeline += """
+ demux. !
+ {vcaps} !
+ queue !
+ tee name=vtee
+ """.format(
+ vcaps=Config.get('mix', 'videocaps')
+ )
+
+ for output in self.outputs:
+ pipeline += """
+ vtee. ! queue ! intervideosink channel=video_{output}
+ """.format(
+ output=output
+ )
+
+ self.log.debug('Launching Source-Pipeline:\n%s', pipeline)
+ self.receiverPipeline = Gst.parse_launch(pipeline)
+ self.receiverPipeline.use_clock(Clock)
+
+ self.log.debug('Binding End-of-Stream-Signal on Source-Pipeline')
+ self.receiverPipeline.bus.add_signal_watch()
+ self.receiverPipeline.bus.connect("message::eos", self.on_eos)
+ self.receiverPipeline.bus.connect("message::error", self.on_error)
+
+ self.all_video_caps = Gst.Caps.from_string('video/x-raw')
+ self.video_caps = Gst.Caps.from_string(Config.get('mix', 'videocaps'))
+
+ self.all_audio_caps = Gst.Caps.from_string('audio/x-raw')
+ self.audio_caps = Gst.Caps.from_string(Config.get('mix', 'audiocaps'))
+
+ demux = self.receiverPipeline.get_by_name('demux')
+ demux.connect('pad-added', self.on_pad_added)
+
+ self.receiverPipeline.set_state(Gst.State.PLAYING)
+
+ def on_pad_added(self, demux, src_pad):
+ caps = src_pad.query_caps(None)
+ self.log.debug('demuxer added pad w/ caps: %s', caps.to_string())
+ if caps.can_intersect(self.all_audio_caps):
+ self.log.debug('new demuxer-pad is a audio-pad, '
+ 'testing against configured audio-caps')
+ if not caps.can_intersect(self.audio_caps):
+ self.log.warning('the incoming connection presented '
+ 'a video-stream that is not compatible '
+ 'to the configured caps')
+ self.log.warning(' incoming caps: %s', caps.to_string())
+ self.log.warning(' configured caps: %s',
+ self.audio_caps.to_string())
+
+ elif caps.can_intersect(self.all_video_caps):
+ self.log.debug('new demuxer-pad is a video-pad, '
+ 'testing against configured video-caps')
+ if not caps.can_intersect(self.video_caps):
+ self.log.warning('the incoming connection presented '
+ 'a video-stream that is not compatible '
+ 'to the configured caps')
+ self.log.warning(' incoming caps: %s', caps.to_string())
+ self.log.warning(' configured caps: %s',
+ self.video_caps.to_string())
+
+ def on_eos(self, bus, message):
+ self.log.debug('Received End-of-Stream-Signal on Source-Pipeline')
+ if self.currentConnection is not None:
+ self.disconnect()
+
+ def on_error(self, bus, message):
+ self.log.debug('Received Error-Signal on Source-Pipeline')
+ (error, debug) = message.parse_error()
+ self.log.debug('Error-Details: #%u: %s', error.code, debug)
+
+ if self.currentConnection is not None:
+ self.disconnect()
+
+ def disconnect(self):
+ self.receiverPipeline.set_state(Gst.State.NULL)
+ self.receiverPipeline = None
+ self.close_connection()
diff --git a/voctocore/lib/commands.py b/voctocore/lib/commands.py
index a5b6ab1..34590b7 100644
--- a/voctocore/lib/commands.py
+++ b/voctocore/lib/commands.py
@@ -6,236 +6,240 @@ from lib.config import Config
from lib.videomix import CompositeModes
from lib.response import NotifyResponse, OkResponse
-def decodeName(items, name_or_id):
- try:
- name_or_id = int(name_or_id)
- if name_or_id < 0 or name_or_id >= len(items):
- raise IndexError("unknown index %d" % name_or_id)
-
- return name_or_id
-
- except ValueError as e:
- try:
- return items.index(name_or_id)
-
- except ValueError as e:
- raise IndexError("unknown name %s" % name_or_id)
-
-def decodeEnumName(enum, name_or_id):
- try:
- name_or_id = int(name_or_id)
- if name_or_id < 0 or name_or_id >= len(enum):
- raise IndexError("unknown index %d" % name_or_id)
-
- return name_or_id
-
- except ValueError as e:
- try:
- return enum[name_or_id]
-
- except KeyError as e:
- raise IndexError("unknown name %s" % name_or_id)
-
-def encodeName(items, id):
- try:
- return items[id]
- except IndexError as e:
- raise IndexError("unknown index %d" % id)
-
-def encodeEnumName(enum, id):
- try:
- return enum(id).name
- except ValueError as e:
- raise IndexError("unknown index %d" % id)
-
-class ControlServerCommands(object):
- def __init__(self, pipeline):
- self.log = logging.getLogger('ControlServerCommands')
-
- self.pipeline = pipeline
- self.sources = Config.getlist('mix', 'sources')
- self.blankerSources = Config.getlist('stream-blanker', 'sources')
-
- # Commands are defined below. Errors are sent to the clients by throwing
- # exceptions, they will be turned into messages outside.
-
- def message(self, *args):
- """sends a message through the control-server, which can be received by
- user-defined scripts. does not change the state of the voctocore."""
- return NotifyResponse('message', *args)
-
- def help(self):
- helplines = []
-
- helplines.append("Commands:")
- for name, func in ControlServerCommands.__dict__.items():
- if name[0] == '_':
- continue
-
- if not func.__code__:
- continue
-
- params = inspect.signature(func).parameters
- params = [str(info) for name, info in params.items()]
- params = ', '.join(params[1:])
-
- command_sig = '\t' + name
-
- if params:
- command_sig += ': '+params
-
- if func.__doc__:
- command_sig += '\n'+'\n'.join(
- ['\t\t'+line.strip() for line in func.__doc__.splitlines()])+'\n'
-
- helplines.append(command_sig)
+def decodeName(items, name_or_id):
+ try:
+ name_or_id = int(name_or_id)
+ if name_or_id < 0 or name_or_id >= len(items):
+ raise IndexError("unknown index %d" % name_or_id)
- helplines.append('\t'+'quit / exit')
+ return name_or_id
- helplines.append("\n")
- helplines.append("Source-Names:")
- for source in self.sources:
- helplines.append("\t"+source)
+ except ValueError as e:
+ try:
+ return items.index(name_or_id)
- helplines.append("\n")
- helplines.append("Stream-Blanker Sources-Names:")
- for source in self.blankerSources:
- helplines.append("\t"+source)
+ except ValueError as e:
+ raise IndexError("unknown name %s" % name_or_id)
- helplines.append("\n")
- helplines.append("Composition-Modes:")
- for mode in CompositeModes:
- helplines.append("\t"+mode.name)
- return OkResponse("\n".join(helplines))
+def decodeEnumName(enum, name_or_id):
+ try:
+ name_or_id = int(name_or_id)
+ if name_or_id < 0 or name_or_id >= len(enum):
+ raise IndexError("unknown index %d" % name_or_id)
- def _get_video_status(self):
- a = encodeName( self.sources, self.pipeline.vmix.getVideoSourceA() )
- b = encodeName( self.sources, self.pipeline.vmix.getVideoSourceB() )
- return [a, b]
+ return name_or_id
- def get_video(self):
- """gets the current video-status, consisting of the name of
- video-source A and video-source B"""
- status = self._get_video_status()
- return OkResponse('video_status', *status)
+ except ValueError as e:
+ try:
+ return enum[name_or_id]
- def set_video_a(self, src_name_or_id):
- """sets the video-source A to the supplied source-name or source-id,
- swapping A and B if the supplied source is currently used as
- video-source B"""
- src_id = decodeName(self.sources, src_name_or_id)
- self.pipeline.vmix.setVideoSourceA(src_id)
+ except KeyError as e:
+ raise IndexError("unknown name %s" % name_or_id)
- status = self._get_video_status()
- return NotifyResponse('video_status', *status)
- def set_video_b(self, src_name_or_id):
- """sets the video-source B to the supplied source-name or source-id,
- swapping A and B if the supplied source is currently used as
- video-source A"""
- src_id = decodeName(self.sources, src_name_or_id)
- self.pipeline.vmix.setVideoSourceB(src_id)
+def encodeName(items, id):
+ try:
+ return items[id]
+ except IndexError as e:
+ raise IndexError("unknown index %d" % id)
- status = self._get_video_status()
- return NotifyResponse('video_status', *status)
+def encodeEnumName(enum, id):
+ try:
+ return enum(id).name
+ except ValueError as e:
+ raise IndexError("unknown index %d" % id)
- def _get_audio_status(self):
- src_id = self.pipeline.amix.getAudioSource()
- return encodeName(self.sources, src_id)
- def get_audio(self):
- """gets the name of the current audio-source"""
- status = self._get_audio_status()
- return OkResponse('audio_status', status)
+class ControlServerCommands(object):
- def set_audio(self, src_name_or_id):
- """sets the audio-source to the supplied source-name or source-id"""
- src_id = decodeName(self.sources, src_name_or_id)
- self.pipeline.amix.setAudioSource(src_id)
+ def __init__(self, pipeline):
+ self.log = logging.getLogger('ControlServerCommands')
- status = self._get_audio_status()
- return NotifyResponse('audio_status', status)
+ self.pipeline = pipeline
+ self.sources = Config.getlist('mix', 'sources')
+ self.blankerSources = Config.getlist('stream-blanker', 'sources')
- def _get_composite_status(self):
- mode = self.pipeline.vmix.getCompositeMode()
- return encodeEnumName(CompositeModes, mode)
+ # Commands are defined below. Errors are sent to the clients by throwing
+ # exceptions, they will be turned into messages outside.
- def get_composite_mode(self):
- """gets the name of the current composite-mode"""
- status = self._get_composite_status()
- return OkResponse('composite_mode', status)
+ def message(self, *args):
+ """sends a message through the control-server, which can be received by
+ user-defined scripts. does not change the state of the voctocore."""
+ return NotifyResponse('message', *args)
- def set_composite_mode(self, mode_name_or_id):
- """sets the name of the id of the composite-mode"""
- mode = decodeEnumName(CompositeModes, mode_name_or_id)
- self.pipeline.vmix.setCompositeMode(mode)
+ def help(self):
+ helplines = []
- composite_status = self._get_composite_status()
- video_status = self._get_video_status()
- return [
- NotifyResponse('composite_mode', composite_status),
- NotifyResponse('video_status', *video_status)
- ]
-
+ helplines.append("Commands:")
+ for name, func in ControlServerCommands.__dict__.items():
+ if name[0] == '_':
+ continue
- def set_videos_and_composite(self, src_a_name_or_id, src_b_name_or_id, mode_name_or_id):
- """sets the A- and the B-source synchronously with the composition-mode
- all parametets can be set to "*" which will leave them unchanged."""
- if src_a_name_or_id != '*':
- src_a_id = decodeName(self.sources, src_a_name_or_id)
- self.pipeline.vmix.setVideoSourceA(src_a_id)
+ if not func.__code__:
+ continue
- if src_b_name_or_id != '*':
- src_b_id = decodeName(self.sources, src_b_name_or_id)
- self.pipeline.vmix.setVideoSourceB(src_b_id)
-
- if mode_name_or_id != '*':
- mode = decodeEnumName(CompositeModes, mode_name_or_id)
- self.pipeline.vmix.setCompositeMode(mode)
-
- composite_status = self._get_composite_status()
- video_status = self._get_video_status()
-
- return [
- NotifyResponse('composite_mode', composite_status),
- NotifyResponse('video_status', *video_status)
- ]
-
-
- def _get_stream_status(self):
- blankSource = self.pipeline.streamblanker.blankSource
- if blankSource is None:
- return ('live',)
-
- return 'blank', encodeName(self.blankerSources, blankSource)
-
- def get_stream_status(self):
- """gets the current streamblanker-status"""
- status = self._get_stream_status()
- return OkResponse('stream_status', *status)
-
- def set_stream_blank(self, source_name_or_id):
- """sets the streamblanker-status to blank with the specified
- blanker-source-name or -id"""
- src_id = decodeName(self.blankerSources, source_name_or_id)
- self.pipeline.streamblanker.setBlankSource(src_id)
-
- status = self._get_stream_status()
- return NotifyResponse('stream_status', *status)
+ params = inspect.signature(func).parameters
+ params = [str(info) for name, info in params.items()]
+ params = ', '.join(params[1:])
- def set_stream_live(self):
- """sets the streamblanker-status to live"""
- self.pipeline.streamblanker.setBlankSource(None)
+ command_sig = '\t' + name
- status = self._get_stream_status()
- return NotifyResponse('stream_status', *status)
+ if params:
+ command_sig += ': ' + params
+
+ if func.__doc__:
+ command_sig += '\n\t\t{}\n'.format('\n\t\t'.join(
+ [line.strip() for line in func.__doc__.splitlines()]
+ ))
+
+ helplines.append(command_sig)
+
+ helplines.append('\t' + 'quit / exit')
+
+ helplines.append("\n")
+ helplines.append("Source-Names:")
+ for source in self.sources:
+ helplines.append("\t" + source)
+ helplines.append("\n")
+ helplines.append("Stream-Blanker Sources-Names:")
+ for source in self.blankerSources:
+ helplines.append("\t" + source)
+
+ helplines.append("\n")
+ helplines.append("Composition-Modes:")
+ for mode in CompositeModes:
+ helplines.append("\t" + mode.name)
+
+ return OkResponse("\n".join(helplines))
- def get_config(self):
- """returns the parsed server-config"""
- confdict = {header: dict(section) for header, section in dict(Config).items()}
- return OkResponse('server_config', json.dumps(confdict))
+ def _get_video_status(self):
+ a = encodeName(self.sources, self.pipeline.vmix.getVideoSourceA())
+ b = encodeName(self.sources, self.pipeline.vmix.getVideoSourceB())
+ return [a, b]
+
+ def get_video(self):
+ """gets the current video-status, consisting of the name of
+ video-source A and video-source B"""
+ status = self._get_video_status()
+ return OkResponse('video_status', *status)
+
+ def set_video_a(self, src_name_or_id):
+ """sets the video-source A to the supplied source-name or source-id,
+ swapping A and B if the supplied source is currently used as
+ video-source B"""
+ src_id = decodeName(self.sources, src_name_or_id)
+ self.pipeline.vmix.setVideoSourceA(src_id)
+
+ status = self._get_video_status()
+ return NotifyResponse('video_status', *status)
+
+ def set_video_b(self, src_name_or_id):
+ """sets the video-source B to the supplied source-name or source-id,
+ swapping A and B if the supplied source is currently used as
+ video-source A"""
+ src_id = decodeName(self.sources, src_name_or_id)
+ self.pipeline.vmix.setVideoSourceB(src_id)
+
+ status = self._get_video_status()
+ return NotifyResponse('video_status', *status)
+
+ def _get_audio_status(self):
+ src_id = self.pipeline.amix.getAudioSource()
+ return encodeName(self.sources, src_id)
+
+ def get_audio(self):
+ """gets the name of the current audio-source"""
+ status = self._get_audio_status()
+ return OkResponse('audio_status', status)
+
+ def set_audio(self, src_name_or_id):
+ """sets the audio-source to the supplied source-name or source-id"""
+ src_id = decodeName(self.sources, src_name_or_id)
+ self.pipeline.amix.setAudioSource(src_id)
+
+ status = self._get_audio_status()
+ return NotifyResponse('audio_status', status)
+
+ def _get_composite_status(self):
+ mode = self.pipeline.vmix.getCompositeMode()
+ return encodeEnumName(CompositeModes, mode)
+
+ def get_composite_mode(self):
+ """gets the name of the current composite-mode"""
+ status = self._get_composite_status()
+ return OkResponse('composite_mode', status)
+
+ def set_composite_mode(self, mode_name_or_id):
+ """sets the name of the id of the composite-mode"""
+ mode = decodeEnumName(CompositeModes, mode_name_or_id)
+ self.pipeline.vmix.setCompositeMode(mode)
+
+ composite_status = self._get_composite_status()
+ video_status = self._get_video_status()
+ return [
+ NotifyResponse('composite_mode', composite_status),
+ NotifyResponse('video_status', *video_status)
+ ]
+
+ def set_videos_and_composite(self, src_a_name_or_id, src_b_name_or_id,
+ mode_name_or_id):
+ """sets the A- and the B-source synchronously with the composition-mode
+ all parametets can be set to "*" which will leave them unchanged."""
+ if src_a_name_or_id != '*':
+ src_a_id = decodeName(self.sources, src_a_name_or_id)
+ self.pipeline.vmix.setVideoSourceA(src_a_id)
+
+ if src_b_name_or_id != '*':
+ src_b_id = decodeName(self.sources, src_b_name_or_id)
+ self.pipeline.vmix.setVideoSourceB(src_b_id)
+
+ if mode_name_or_id != '*':
+ mode = decodeEnumName(CompositeModes, mode_name_or_id)
+ self.pipeline.vmix.setCompositeMode(mode)
+
+ composite_status = self._get_composite_status()
+ video_status = self._get_video_status()
+
+ return [
+ NotifyResponse('composite_mode', composite_status),
+ NotifyResponse('video_status', *video_status)
+ ]
+
+ def _get_stream_status(self):
+ blankSource = self.pipeline.streamblanker.blankSource
+ if blankSource is None:
+ return ('live',)
+
+ return 'blank', encodeName(self.blankerSources, blankSource)
+
+ def get_stream_status(self):
+ """gets the current streamblanker-status"""
+ status = self._get_stream_status()
+ return OkResponse('stream_status', *status)
+
+ def set_stream_blank(self, source_name_or_id):
+ """sets the streamblanker-status to blank with the specified
+ blanker-source-name or -id"""
+ src_id = decodeName(self.blankerSources, source_name_or_id)
+ self.pipeline.streamblanker.setBlankSource(src_id)
+
+ status = self._get_stream_status()
+ return NotifyResponse('stream_status', *status)
+
+ def set_stream_live(self):
+ """sets the streamblanker-status to live"""
+ self.pipeline.streamblanker.setBlankSource(None)
+
+ status = self._get_stream_status()
+ return NotifyResponse('stream_status', *status)
+
+ def get_config(self):
+ """returns the parsed server-config"""
+ confdict = {header: dict(section)
+ for header, section in dict(Config).items()}
+ return OkResponse('server_config', json.dumps(confdict))
diff --git a/voctocore/lib/config.py b/voctocore/lib/config.py
index df0ff9a..388778e 100644
--- a/voctocore/lib/config.py
+++ b/voctocore/lib/config.py
@@ -5,29 +5,32 @@ from lib.args import Args
__all__ = ['Config']
+
def getlist(self, section, option):
- return [x.strip() for x in self.get(section, option).split(',')]
+ return [x.strip() for x in self.get(section, option).split(',')]
SafeConfigParser.getlist = getlist
files = [
- os.path.join(os.path.dirname(os.path.realpath(__file__)), '../default-config.ini'),
- os.path.join(os.path.dirname(os.path.realpath(__file__)), '../config.ini'),
- '/etc/voctomix/voctocore.ini',
- '/etc/voctomix.ini', # deprecated
- '/etc/voctocore.ini',
- os.path.expanduser('~/.voctomix.ini'), # deprecated
- os.path.expanduser('~/.voctocore.ini'),
+ os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ '../default-config.ini'),
+ os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ '../config.ini'),
+ '/etc/voctomix/voctocore.ini',
+ '/etc/voctomix.ini', # deprecated
+ '/etc/voctocore.ini',
+ os.path.expanduser('~/.voctomix.ini'), # deprecated
+ os.path.expanduser('~/.voctocore.ini'),
]
if Args.ini_file is not None:
- files.append(Args.ini_file)
+ files.append(Args.ini_file)
Config = SafeConfigParser()
readfiles = Config.read(files)
log = logging.getLogger('ConfigParser')
log.debug('considered config-files: \n%s',
- "\n".join(["\t\t"+os.path.normpath(file) for file in files]) )
+ "\n".join(["\t\t" + os.path.normpath(file) for file in files]))
log.debug('successfully parsed config-files: \n%s',
- "\n".join(["\t\t"+os.path.normpath(file) for file in readfiles]) )
+ "\n".join(["\t\t" + os.path.normpath(file) for file in readfiles]))
diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py
index 4c506be..42ae4b2 100644
--- a/voctocore/lib/controlserver.py
+++ b/voctocore/lib/controlserver.py
@@ -1,4 +1,6 @@
-import socket, logging, traceback
+import socket
+import logging
+import traceback
from queue import Queue
from gi.repository import GObject
@@ -6,164 +8,171 @@ from lib.commands import ControlServerCommands
from lib.tcpmulticonnection import TCPMultiConnection
from lib.response import NotifyResponse, OkResponse
-class ControlServer(TCPMultiConnection):
- def __init__(self, pipeline):
- '''Initialize server and start listening.'''
- self.log = logging.getLogger('ControlServer')
- super().__init__(port=9999)
-
- self.command_queue = Queue()
- self.commands = ControlServerCommands(pipeline)
-
- def on_accepted(self, conn, addr):
- '''Asynchronous connection listener. Starts a handler for each connection.'''
- self.log.debug('setting gobject io-watch on connection')
- GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
-
- def on_data(self, conn, _, leftovers, *args):
- '''Asynchronous connection handler. Pushes data from socket
- into command queue linewise'''
- close_after = False
- try:
- while True:
- try:
- leftovers.append(conn.recv(4096).decode(errors='replace'))
- if len(leftovers[-1]) == 0:
- self.log.info("Socket was closed")
- leftovers.pop()
- close_after = True
- break
-
- except UnicodeDecodeError as e:
- continue
- except:
- pass
+class ControlServer(TCPMultiConnection):
- data = "".join(leftovers)
- del leftovers[:]
+ def __init__(self, pipeline):
+ '''Initialize server and start listening.'''
+ self.log = logging.getLogger('ControlServer')
+ super().__init__(port=9999)
- lines = data.split('\n')
- for line in lines[:-1]:
- self.log.debug("got line: %r", line)
+ self.command_queue = Queue()
+
+ self.commands = ControlServerCommands(pipeline)
- line = line.strip()
- # 'quit' = remote wants us to close the connection
- if line == 'quit' or line == 'exit':
- self.log.info("Client asked us to close the Connection")
- self.close_connection(conn)
- return False
+ def on_accepted(self, conn, addr):
+ '''Asynchronous connection listener.
+ Starts a handler for each connection.'''
+ self.log.debug('setting gobject io-watch on connection')
+ GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
+
+ def on_data(self, conn, _, leftovers, *args):
+ '''Asynchronous connection handler.
+ Pushes data from socket into command queue linewise'''
+ close_after = False
+ try:
+ while True:
+ try:
+ leftovers.append(conn.recv(4096).decode(errors='replace'))
+ if len(leftovers[-1]) == 0:
+ self.log.info("Socket was closed")
+ leftovers.pop()
+ close_after = True
+ break
- self.log.debug('re-starting on_loop scheduling')
- GObject.idle_add(self.on_loop)
+ except UnicodeDecodeError as e:
+ continue
+ except:
+ pass
- self.command_queue.put((line, conn))
+ data = "".join(leftovers)
+ del leftovers[:]
- if close_after:
- self.close_connection(conn)
- return False
-
- if lines[-1] != '':
- self.log.debug("remaining %r", lines[-1])
-
- leftovers.append(lines[-1])
- return True
-
- def on_loop(self):
- '''Command handler. Processes commands in the command queue whenever
- nothing else is happening (registered as GObject idle callback)'''
-
- self.log.debug('on_loop called')
-
- if self.command_queue.empty():
- self.log.debug('command_queue is empty again, stopping on_loop scheduling')
- return False
-
- line, requestor = self.command_queue.get()
-
- words = line.split()
- if len(words) < 1:
- self.log.debug('command_queue is empty again, stopping on_loop scheduling')
- return True
-
- command = words[0]
- args = words[1:]
-
- self.log.info("processing command %r with args %s", command, args)
-
- response = None
- try:
- # deny calling private methods
- if command[0] == '_':
- self.log.info('private methods are not callable')
- raise KeyError()
-
- command_function = self.commands.__class__.__dict__[command]
-
- except KeyError as e:
- self.log.info("received unknown command %s", command)
- response = "error unknown command %s\n" % command
-
- else:
- try:
- responseObject = command_function(self.commands, *args)
-
- except Exception as e:
- message = str(e) or "<no message>"
- response = "error %s\n" % message
-
- else:
- if isinstance(responseObject, NotifyResponse):
- responseObject = [ responseObject ]
-
- if isinstance(responseObject, list):
- for obj in responseObject:
- signal = "%s\n" % str(obj)
- for conn in self.currentConnections:
- self._schedule_write(conn, signal)
- else:
- response = "%s\n" % str(responseObject)
-
- finally:
- if response is not None and requestor in self.currentConnections:
- self._schedule_write(requestor, response)
-
- return False
+ lines = data.split('\n')
+ for line in lines[:-1]:
+ self.log.debug("got line: %r", line)
- def _schedule_write(self, conn, message):
- queue = self.currentConnections[conn]
+ line = line.strip()
+ # 'quit' = remote wants us to close the connection
+ if line == 'quit' or line == 'exit':
+ self.log.info("Client asked us to close the Connection")
+ self.close_connection(conn)
+ return False
- self.log.debug('re-starting on_write[%u] scheduling', conn.fileno())
- GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
+ self.log.debug('re-starting on_loop scheduling')
+ GObject.idle_add(self.on_loop)
+
+ self.command_queue.put((line, conn))
+
+ if close_after:
+ self.close_connection(conn)
+ return False
- queue.put(message)
+ if lines[-1] != '':
+ self.log.debug("remaining %r", lines[-1])
+
+ leftovers.append(lines[-1])
+ return True
+
+ def on_loop(self):
+ '''Command handler. Processes commands in the command queue whenever
+ nothing else is happening (registered as GObject idle callback)'''
+
+ self.log.debug('on_loop called')
+
+ if self.command_queue.empty():
+ self.log.debug('command_queue is empty again, '
+ 'stopping on_loop scheduling')
+ return False
+
+ line, requestor = self.command_queue.get()
+
+ words = line.split()
+ if len(words) < 1:
+ self.log.debug('command_queue is empty again, '
+ 'stopping on_loop scheduling')
+ return True
+
+ command = words[0]
+ args = words[1:]
+
+ self.log.info("processing command %r with args %s", command, args)
+
+ response = None
+ try:
+ # deny calling private methods
+ if command[0] == '_':
+ self.log.info('private methods are not callable')
+ raise KeyError()
+
+ command_function = self.commands.__class__.__dict__[command]
+
+ except KeyError as e:
+ self.log.info("received unknown command %s", command)
+ response = "error unknown command %s\n" % command
+
+ else:
+ try:
+ responseObject = command_function(self.commands, *args)
+
+ except Exception as e:
+ message = str(e) or "<no message>"
+ response = "error %s\n" % message
+
+ else:
+ if isinstance(responseObject, NotifyResponse):
+ responseObject = [responseObject]
+
+ if isinstance(responseObject, list):
+ for obj in responseObject:
+ signal = "%s\n" % str(obj)
+ for conn in self.currentConnections:
+ self._schedule_write(conn, signal)
+ else:
+ response = "%s\n" % str(responseObject)
+
+ finally:
+ if response is not None and requestor in self.currentConnections:
+ self._schedule_write(requestor, response)
+
+ return False
+
+ def _schedule_write(self, conn, message):
+ queue = self.currentConnections[conn]
- def on_write(self, conn, *args):
- self.log.debug('on_write[%u] called', conn.fileno())
+ self.log.debug('re-starting on_write[%u] scheduling', conn.fileno())
+ GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
- try:
- queue = self.currentConnections[conn]
- except KeyError:
- return False
+ queue.put(message)
- if queue.empty():
- self.log.debug('write_queue[%u] is empty again, stopping on_write scheduling', conn.fileno())
- return False
+ def on_write(self, conn, *args):
+ self.log.debug('on_write[%u] called', conn.fileno())
- message = queue.get()
- try:
- conn.send(message.encode())
- except Exception as e:
- self.log.warn(e)
+ try:
+ queue = self.currentConnections[conn]
+ except KeyError:
+ return False
+
+ if queue.empty():
+ self.log.debug('write_queue[%u] is empty again, '
+ 'stopping on_write scheduling',
+ conn.fileno())
+ return False
- return True
+ message = queue.get()
+ try:
+ conn.send(message.encode())
+ except Exception as e:
+ self.log.warn(e)
+
+ return True
- def notify_all(self, msg):
- try:
- words = msg.split()
- words[-1] = self.commands.encodeSourceName(int(words[-1]))
- msg = " ".join(words) + '\n'
- for conn in self.currentConnections:
- self._schedule_write(conn, msg)
- except Exception as e:
- self.log.debug("error during notify: %s", e)
+ def notify_all(self, msg):
+ try:
+ words = msg.split()
+ words[-1] = self.commands.encodeSourceName(int(words[-1]))
+ msg = " ".join(words) + '\n'
+ for conn in self.currentConnections:
+ self._schedule_write(conn, msg)
+ except Exception as e:
+ self.log.debug("error during notify: %s", e)
diff --git a/voctocore/lib/loghandler.py b/voctocore/lib/loghandler.py
index 2cc7ceb..6efb890 100644
--- a/voctocore/lib/loghandler.py
+++ b/voctocore/lib/loghandler.py
@@ -1,41 +1,57 @@
-import logging, time
+import logging
+import time
-class LogFormatter(logging.Formatter):
- def __init__(self, docolor, timestamps=False):
- super().__init__()
- self.docolor = docolor
- self.timestamps = timestamps
-
- def formatMessage(self, record):
- if self.docolor:
- c_lvl = 33
- c_mod = 32
- c_msg = 0
-
- if record.levelno == logging.WARNING:
- c_lvl = 31
- #c_mod = 33
- c_msg = 33
-
- elif record.levelno > logging.WARNING:
- c_lvl = 31
- c_mod = 31
- c_msg = 31
- fmt = '\x1b['+str(c_lvl)+'m%(levelname)8s\x1b[0m \x1b['+str(c_mod)+'m%(name)s\x1b['+str(c_msg)+'m: %(message)s\x1b[0m'
- else:
- fmt = '%(levelname)8s %(name)s: %(message)s'
-
- if self.timestamps:
- fmt = '%(asctime)s '+fmt
-
- if not 'asctime' in record.__dict__:
- record.__dict__['asctime']=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(record.__dict__['created']))
+class LogFormatter(logging.Formatter):
- return fmt % record.__dict__
+ def __init__(self, docolor, timestamps=False):
+ super().__init__()
+ self.docolor = docolor
+ self.timestamps = timestamps
+
+ def formatMessage(self, record):
+ if self.docolor:
+ c_lvl = 33
+ c_mod = 32
+ c_msg = 0
+
+ if record.levelno == logging.WARNING:
+ c_lvl = 31
+ # c_mod = 33
+ c_msg = 33
+
+ elif record.levelno > logging.WARNING:
+ c_lvl = 31
+ c_mod = 31
+ c_msg = 31
+
+ fmt = ''.join([
+ '\x1b[%dm' % c_lvl, # set levelname color
+ '%(levelname)8s', # print levelname
+ '\x1b[0m', # reset formatting
+ '\x1b[%dm' % c_mod, # set name color
+ ' %(name)s', # print name
+ '\x1b[%dm' % c_msg, # set message color
+ ': %(message)s', # print message
+ '\x1b[0m' # reset formatting
+ ])
+ else:
+ fmt = '%(levelname)8s %(name)s: %(message)s'
+
+ if self.timestamps:
+ fmt = '%(asctime)s ' + fmt
+
+ if 'asctime' not in record.__dict__:
+ record.__dict__['asctime'] = time.strftime(
+ "%Y-%m-%d %H:%M:%S",
+ time.localtime(record.__dict__['created'])
+ )
+
+ return fmt % record.__dict__
class LogHandler(logging.StreamHandler):
- def __init__(self, docolor, timestamps):
- super().__init__()
- self.setFormatter(LogFormatter(docolor,timestamps))
+
+ def __init__(self, docolor, timestamps):
+ super().__init__()
+ self.setFormatter(LogFormatter(docolor, timestamps))
diff --git a/voctocore/lib/pipeline.py b/voctocore/lib/pipeline.py
index a13f1d2..1a1349e 100644
--- a/voctocore/lib/pipeline.py
+++ b/voctocore/lib/pipeline.py
@@ -10,93 +10,99 @@ from lib.videomix import VideoMix
from lib.audiomix import AudioMix
from lib.streamblanker import StreamBlanker
-class Pipeline(object):
- """mixing, streaming and encoding pipeline constuction and control"""
-
- def __init__(self):
- self.log = logging.getLogger('Pipeline')
- self.log.info('Video-Caps configured to: %s', Config.get('mix', 'videocaps'))
- self.log.info('Audio-Caps configured to: %s', Config.get('mix', 'audiocaps'))
-
- names = Config.getlist('mix', 'sources')
- if len(names) < 1:
- raise RuntimeError("At least one AVSource must be configured!")
-
- self.sources = []
- self.mirrors = []
- self.previews = []
- self.sbsources = []
-
- self.log.info('Creating %u Creating AVSources: %s', len(names), names)
- for idx, name in enumerate(names):
- port = 10000 + idx
- self.log.info('Creating AVSource %s at tcp-port %u', name, port)
-
- outputs = [name+'_mixer', name+'_mirror']
- if Config.getboolean('previews', 'enabled'):
- outputs.append(name+'_preview')
-
- source = AVSource(name, port, outputs=outputs)
- self.sources.append(source)
-
-
- port = 13000 + idx
- self.log.info('Creating Mirror-Output for AVSource %s at tcp-port %u', name, port)
-
- mirror = AVRawOutput('%s_mirror' % name, port)
- self.mirrors.append(mirror)
-
-
- if Config.getboolean('previews', 'enabled'):
- port = 14000 + idx
- self.log.info('Creating Preview-Output for AVSource %s at tcp-port %u', name, port)
-
- preview = AVPreviewOutput('%s_preview' % name, port)
- self.previews.append(preview)
-
- self.log.info('Creating Videmixer')
- self.vmix = VideoMix()
-
- self.log.info('Creating Audiomixer')
- self.amix = AudioMix()
-
- port = 16000
- self.log.info('Creating Mixer-Background VSource at tcp-port %u', port)
- self.bgsrc = AVSource('background', port, has_audio=False)
-
- port = 11000
- self.log.info('Creating Mixer-Output at tcp-port %u', port)
- self.mixout = AVRawOutput('mix_out', port)
-
-
- if Config.getboolean('previews', 'enabled'):
- port = 12000
- self.log.info('Creating Preview-Output for AVSource %s at tcp-port %u', name, port)
-
- self.mixpreview = AVPreviewOutput('mix_preview', port)
-
- if Config.getboolean('stream-blanker', 'enabled'):
- names = Config.getlist('stream-blanker', 'sources')
- if len(names) < 1:
- raise RuntimeError("At least one StreamBlanker-Source must be configured or the StreamBlanker disabled!")
- for idx, name in enumerate(names):
- port = 17000 + idx
- self.log.info('Creating StreamBlanker VSource %s at tcp-port %u', name, port)
-
- source = AVSource('%s_streamblanker' % name, port, has_audio=False)
- self.sbsources.append(source)
-
- port = 18000
- self.log.info('Creating StreamBlanker ASource at tcp-port %u', port)
-
- source = AVSource('streamblanker', port, has_video=False)
- self.sbsources.append(source)
-
-
- self.log.info('Creating StreamBlanker')
- self.streamblanker = StreamBlanker()
-
- port = 15000
- self.log.info('Creating StreamBlanker-Output at tcp-port %u', port)
- self.streamout = AVRawOutput('streamblanker_out', port)
+class Pipeline(object):
+ """mixing, streaming and encoding pipeline constuction and control"""
+
+ def __init__(self):
+ self.log = logging.getLogger('Pipeline')
+ self.log.info('Video-Caps configured to: %s',
+ Config.get('mix', 'videocaps'))
+ self.log.info('Audio-Caps configured to: %s',
+ Config.get('mix', 'audiocaps'))
+
+ names = Config.getlist('mix', 'sources')
+ if len(names) < 1:
+ raise RuntimeError("At least one AVSource must be configured!")
+
+ self.sources = []
+ self.mirrors = []
+ self.previews = []
+ self.sbsources = []
+
+ self.log.info('Creating %u Creating AVSources: %s', len(names), names)
+ for idx, name in enumerate(names):
+ port = 10000 + idx
+ self.log.info('Creating AVSource %s at tcp-port %u', name, port)
+
+ outputs = [name + '_mixer', name + '_mirror']
+ if Config.getboolean('previews', 'enabled'):
+ outputs.append(name + '_preview')
+
+ source = AVSource(name, port, outputs=outputs)
+ self.sources.append(source)
+
+ port = 13000 + idx
+ self.log.info('Creating Mirror-Output for AVSource %s '
+ 'at tcp-port %u', name, port)
+
+ mirror = AVRawOutput('%s_mirror' % name, port)
+ self.mirrors.append(mirror)
+
+ if Config.getboolean('previews', 'enabled'):
+ port = 14000 + idx
+ self.log.info('Creating Preview-Output for AVSource %s '
+ 'at tcp-port %u', name, port)
+
+ preview = AVPreviewOutput('%s_preview' % name, port)
+ self.previews.append(preview)
+
+ self.log.info('Creating Videmixer')
+ self.vmix = VideoMix()
+
+ self.log.info('Creating Audiomixer')
+ self.amix = AudioMix()
+
+ port = 16000
+ self.log.info('Creating Mixer-Background VSource at tcp-port %u', port)
+ self.bgsrc = AVSource('background', port, has_audio=False)
+
+ port = 11000
+ self.log.info('Creating Mixer-Output at tcp-port %u', port)
+ self.mixout = AVRawOutput('mix_out', port)
+
+ if Config.getboolean('previews', 'enabled'):
+ port = 12000
+ self.log.info('Creating Preview-Output for AVSource %s '
+ 'at tcp-port %u', name, port)
+
+ self.mixpreview = AVPreviewOutput('mix_preview', port)
+
+ if Config.getboolean('stream-blanker', 'enabled'):
+ names = Config.getlist('stream-blanker', 'sources')
+ if len(names) < 1:
+ raise RuntimeError('At least one StreamBlanker-Source must '
+ 'be configured or the '
+ 'StreamBlanker disabled!')
+ for idx, name in enumerate(names):
+ port = 17000 + idx
+ self.log.info('Creating StreamBlanker VSource %s '
+ 'at tcp-port %u', name, port)
+
+ source = AVSource('{}_streamblanker'.format(name), port,
+ has_audio=False)
+ self.sbsources.append(source)
+
+ port = 18000
+ self.log.info('Creating StreamBlanker ASource at tcp-port %u',
+ port)
+
+ source = AVSource('streamblanker', port, has_video=False)
+ self.sbsources.append(source)
+
+ self.log.info('Creating StreamBlanker')
+ self.streamblanker = StreamBlanker()
+
+ port = 15000
+ self.log.info('Creating StreamBlanker-Output at tcp-port %u', port)
+ self.streamout = AVRawOutput('streamblanker_out', port)
diff --git a/voctocore/lib/response.py b/voctocore/lib/response.py
index 9f4ad84..b5f7578 100644
--- a/voctocore/lib/response.py
+++ b/voctocore/lib/response.py
@@ -1,14 +1,16 @@
class Response(object):
- def __init__(self, *args):
- self.args = args
- def __str__(self):
- return " ".join(map(str, self.args))
+ def __init__(self, *args):
+ self.args = args
+
+ def __str__(self):
+ return " ".join(map(str, self.args))
class OkResponse(Response):
- pass
+ pass
+
class NotifyResponse(Response):
- pass
+ pass
diff --git a/voctocore/lib/streamblanker.py b/voctocore/lib/streamblanker.py
index fea3d6a..67af0cd 100644
--- a/voctocore/lib/streamblanker.py
+++ b/voctocore/lib/streamblanker.py
@@ -5,97 +5,104 @@ from enum import Enum
from lib.config import Config
from lib.clock import Clock
-class StreamBlanker(object):
- log = logging.getLogger('StreamBlanker')
-
- def __init__(self):
- self.acaps = Config.get('mix', 'audiocaps')
- self.vcaps = Config.get('mix', 'videocaps')
-
- self.names = Config.getlist('stream-blanker', 'sources')
- self.log.info('Configuring StreamBlanker video %u Sources', len(self.names))
-
- pipeline = """
- compositor name=vmix !
- {vcaps} !
- intervideosink channel=video_streamblanker_out
-
- audiomixer name=amix !
- {acaps} !
- interaudiosink channel=audio_streamblanker_out
-
-
- intervideosrc channel=video_mix_streamblanker !
- {vcaps} !
- vmix.
-
- interaudiosrc channel=audio_mix_streamblanker !
- {acaps} !
- amix.
-
-
- interaudiosrc channel=audio_streamblanker !
- {acaps} !
- amix.
- """.format(
- acaps=self.acaps,
- vcaps=self.vcaps
- )
-
- for name in self.names:
- pipeline += """
- intervideosrc channel=video_{name}_streamblanker !
- {vcaps} !
- vmix.
- """.format(
- name=name,
- vcaps=self.vcaps
- )
-
- self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline)
- self.mixingPipeline = Gst.parse_launch(pipeline)
- self.mixingPipeline.use_clock(Clock)
- self.log.debug('Binding Error & End-of-Stream-Signal on Mixing-Pipeline')
- self.mixingPipeline.bus.add_signal_watch()
- self.mixingPipeline.bus.connect("message::eos", self.on_eos)
- self.mixingPipeline.bus.connect("message::error", self.on_error)
-
- self.log.debug('Initializing Mixer-State')
- self.blankSource = None
- self.applyMixerState()
-
- self.log.debug('Launching Mixing-Pipeline')
- self.mixingPipeline.set_state(Gst.State.PLAYING)
-
- def on_eos(self, bus, message):
- self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline')
-
- def on_error(self, bus, message):
- self.log.debug('Received Error-Signal on Mixing-Pipeline')
- (error, debug) = message.parse_error()
- self.log.debug('Error-Details: #%u: %s', error.code, debug)
-
-
- def applyMixerState(self):
- self.applyMixerStateAudio()
- self.applyMixerStateVideo()
-
- def applyMixerStateAudio(self):
- mixpad = self.mixingPipeline.get_by_name('amix').get_static_pad('sink_0')
- blankpad = self.mixingPipeline.get_by_name('amix').get_static_pad('sink_1')
-
- mixpad.set_property('volume', int(self.blankSource is None))
- blankpad.set_property('volume', int(self.blankSource is not None))
-
- def applyMixerStateVideo(self):
- mixpad = self.mixingPipeline.get_by_name('vmix').get_static_pad('sink_0')
- mixpad.set_property('alpha', int(self.blankSource is None))
-
- for idx, name in enumerate(self.names):
- blankpad = self.mixingPipeline.get_by_name('vmix').get_static_pad('sink_%u' % (idx+1))
- blankpad.set_property('alpha', int(self.blankSource == idx))
-
- def setBlankSource(self, source):
- self.blankSource = source
- self.applyMixerState()
+class StreamBlanker(object):
+ log = logging.getLogger('StreamBlanker')
+
+ def __init__(self):
+ self.acaps = Config.get('mix', 'audiocaps')
+ self.vcaps = Config.get('mix', 'videocaps')
+
+ self.names = Config.getlist('stream-blanker', 'sources')
+ self.log.info('Configuring StreamBlanker video %u Sources',
+ len(self.names))
+
+ pipeline = """
+ compositor name=vmix !
+ {vcaps} !
+ intervideosink channel=video_streamblanker_out
+
+ audiomixer name=amix !
+ {acaps} !
+ interaudiosink channel=audio_streamblanker_out
+
+
+ intervideosrc channel=video_mix_streamblanker !
+ {vcaps} !
+ vmix.
+
+ interaudiosrc channel=audio_mix_streamblanker !
+ {acaps} !
+ amix.
+
+
+ interaudiosrc channel=audio_streamblanker !
+ {acaps} !
+ amix.
+ """.format(
+ acaps=self.acaps,
+ vcaps=self.vcaps
+ )
+
+ for name in self.names:
+ pipeline += """
+ intervideosrc channel=video_{name}_streamblanker !
+ {vcaps} !
+ vmix.
+ """.format(
+ name=name,
+ vcaps=self.vcaps
+ )
+
+ self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline)
+ self.mixingPipeline = Gst.parse_launch(pipeline)
+ self.mixingPipeline.use_clock(Clock)
+
+ self.log.debug('Binding Error & End-of-Stream-Signal '
+ 'on Mixing-Pipeline')
+ self.mixingPipeline.bus.add_signal_watch()
+ self.mixingPipeline.bus.connect("message::eos", self.on_eos)
+ self.mixingPipeline.bus.connect("message::error", self.on_error)
+
+ self.log.debug('Initializing Mixer-State')
+ self.blankSource = None
+ self.applyMixerState()
+
+ self.log.debug('Launching Mixing-Pipeline')
+ self.mixingPipeline.set_state(Gst.State.PLAYING)
+
+ def on_eos(self, bus, message):
+ self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline')
+
+ def on_error(self, bus, message):
+ self.log.debug('Received Error-Signal on Mixing-Pipeline')
+ (error, debug) = message.parse_error()
+ self.log.debug('Error-Details: #%u: %s', error.code, debug)
+
+ def applyMixerState(self):
+ self.applyMixerStateAudio()
+ self.applyMixerStateVideo()
+
+ def applyMixerStateAudio(self):
+ mixpad = (self.mixingPipeline.get_by_name('amix')
+ .get_static_pad('sink_0'))
+ blankpad = (self.mixingPipeline.get_by_name('amix')
+ .get_static_pad('sink_1'))
+
+ mixpad.set_property('volume', int(self.blankSource is None))
+ blankpad.set_property('volume', int(self.blankSource is not None))
+
+ def applyMixerStateVideo(self):
+ mixpad = (self.mixingPipeline.get_by_name('vmix')
+ .get_static_pad('sink_0'))
+ mixpad.set_property('alpha', int(self.blankSource is None))
+
+ for idx, name in enumerate(self.names):
+ blankpad = (self.mixingPipeline
+ .get_by_name('vmix')
+ .get_static_pad('sink_%u' % (idx + 1)))
+ blankpad.set_property('alpha', int(self.blankSource == idx))
+
+ def setBlankSource(self, source):
+ self.blankSource = source
+ self.applyMixerState()
diff --git a/voctocore/lib/tcpmulticonnection.py b/voctocore/lib/tcpmulticonnection.py
index 66fc33f..ac228a3 100644
--- a/voctocore/lib/tcpmulticonnection.py
+++ b/voctocore/lib/tcpmulticonnection.py
@@ -1,41 +1,48 @@
-import logging, socket
+import logging
+import socket
from queue import Queue
from gi.repository import GObject
from lib.config import Config
+
class TCPMultiConnection(object):
- def __init__(self, port):
- if not hasattr(self, 'log'):
- self.log = logging.getLogger('TCPMultiConnection')
- self.boundSocket = None
- self.currentConnections = dict()
+ def __init__(self, port):
+ if not hasattr(self, 'log'):
+ self.log = logging.getLogger('TCPMultiConnection')
+
+ self.boundSocket = None
+ self.currentConnections = dict()
- self.log.debug('Binding to Source-Socket on [::]:%u', port)
- self.boundSocket = socket.socket(socket.AF_INET6)
- self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
- self.boundSocket.bind(('::', port))
- self.boundSocket.listen(1)
+ self.log.debug('Binding to Source-Socket on [::]:%u', port)
+ self.boundSocket = socket.socket(socket.AF_INET6)
+ self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY,
+ False)
+ self.boundSocket.bind(('::', port))
+ self.boundSocket.listen(1)
- self.log.debug('Setting GObject io-watch on Socket')
- GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
+ self.log.debug('Setting GObject io-watch on Socket')
+ GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
- def on_connect(self, sock, *args):
- conn, addr = sock.accept()
- conn.setblocking(False)
+ def on_connect(self, sock, *args):
+ conn, addr = sock.accept()
+ conn.setblocking(False)
- self.log.info("Incomming Connection from [%s]:%u (fd=%u)", addr[0], addr[1], conn.fileno())
+ self.log.info("Incomming Connection from [%s]:%u (fd=%u)",
+ addr[0], addr[1], conn.fileno())
- self.currentConnections[conn] = Queue()
- self.log.info('Now %u Receiver connected', len(self.currentConnections))
+ self.currentConnections[conn] = Queue()
+ self.log.info('Now %u Receiver connected',
+ len(self.currentConnections))
- self.on_accepted(conn, addr)
+ self.on_accepted(conn, addr)
- return True
+ return True
- def close_connection(self, conn):
- if conn in self.currentConnections:
- del(self.currentConnections[conn])
- self.log.info('Now %u Receiver connected', len(self.currentConnections))
+ def close_connection(self, conn):
+ if conn in self.currentConnections:
+ del(self.currentConnections[conn])
+ self.log.info('Now %u Receiver connected',
+ len(self.currentConnections))
diff --git a/voctocore/lib/tcpsingleconnection.py b/voctocore/lib/tcpsingleconnection.py
index bf89651..2d3c658 100644
--- a/voctocore/lib/tcpsingleconnection.py
+++ b/voctocore/lib/tcpsingleconnection.py
@@ -1,39 +1,45 @@
-import logging, socket, time
+import logging
+import socket
+import time
from gi.repository import GObject
from lib.config import Config
+
class TCPSingleConnection(object):
- def __init__(self, port):
- if not hasattr(self, 'log'):
- self.log = logging.getLogger('TCPMultiConnection')
- self.log.debug('Binding to Source-Socket on [::]:%u', port)
- self.boundSocket = socket.socket(socket.AF_INET6)
- self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
- self.boundSocket.bind(('::', port))
- self.boundSocket.listen(1)
+ def __init__(self, port):
+ if not hasattr(self, 'log'):
+ self.log = logging.getLogger('TCPMultiConnection')
+
+ self.log.debug('Binding to Source-Socket on [::]:%u', port)
+ self.boundSocket = socket.socket(socket.AF_INET6)
+ self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY,
+ False)
+ self.boundSocket.bind(('::', port))
+ self.boundSocket.listen(1)
- self.currentConnection = None
+ self.currentConnection = None
- self.log.debug('Setting GObject io-watch on Socket')
- GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
+ self.log.debug('Setting GObject io-watch on Socket')
+ GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
- def on_connect(self, sock, *args):
- conn, addr = sock.accept()
- self.log.info("Incomming Connection from %s", addr)
+ def on_connect(self, sock, *args):
+ conn, addr = sock.accept()
+ self.log.info('Incomming Connection from %s', addr)
- if self.currentConnection is not None:
- self.log.warn("Another Source is already connected, closing existing pipeline")
- self.disconnect()
- time.sleep(1)
+ if self.currentConnection is not None:
+ self.log.warn('Another Source is already connected, '
+ 'closing existing pipeline')
+ self.disconnect()
+ time.sleep(1)
- self.on_accepted(conn, addr)
- self.currentConnection = conn
+ self.on_accepted(conn, addr)
+ self.currentConnection = conn
- return True
+ return True
- def close_connection(self):
- self.currentConnection = None
- self.log.info('Connection closed')
+ def close_connection(self):
+ self.currentConnection = None
+ self.log.info('Connection closed')
diff --git a/voctocore/lib/videomix.py b/voctocore/lib/videomix.py
index 95d538d..4ab740f 100644
--- a/voctocore/lib/videomix.py
+++ b/voctocore/lib/videomix.py
@@ -5,377 +5,411 @@ from enum import Enum, unique
from lib.config import Config
from lib.clock import Clock
+
@unique
class CompositeModes(Enum):
- fullscreen = 0
- side_by_side_equal = 1
- side_by_side_preview = 2
- picture_in_picture = 3
-
-class PadState(object):
- def __init__(self):
- self.reset()
+ fullscreen = 0
+ side_by_side_equal = 1
+ side_by_side_preview = 2
+ picture_in_picture = 3
- def reset(self):
- self.alpha = 1.0
- self.xpos = 0
- self.ypos = 0
- self.zorder = 1
- self.width = 0
- self.height = 0
-class VideoMix(object):
- log = logging.getLogger('VideoMix')
-
- def __init__(self):
- self.caps = Config.get('mix', 'videocaps')
-
- self.names = Config.getlist('mix', 'sources')
- self.log.info('Configuring Mixer for %u Sources', len(self.names))
-
- pipeline = """
- compositor name=mix !
- {caps} !
- identity name=sig !
- queue !
- tee name=tee
+class PadState(object):
- intervideosrc channel=video_background !
- {caps} !
- mix.
-
- tee. ! queue ! intervideosink channel=video_mix_out
- """.format(
- caps=self.caps
- )
-
- if Config.getboolean('previews', 'enabled'):
- pipeline += """
- tee. ! queue ! intervideosink channel=video_mix_preview
- """
-
- if Config.getboolean('stream-blanker', 'enabled'):
- pipeline += """
- tee. ! queue ! intervideosink channel=video_mix_streamblanker
- """
-
- for idx, name in enumerate(self.names):
- pipeline += """
- intervideosrc channel=video_{name}_mixer !
- {caps} !
- mix.
- """.format(
- name=name,
- caps=self.caps,
- idx=idx
- )
-
- self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline)
- self.mixingPipeline = Gst.parse_launch(pipeline)
- self.mixingPipeline.use_clock(Clock)
-
- self.log.debug('Binding Error & End-of-Stream-Signal on Mixing-Pipeline')
- self.mixingPipeline.bus.add_signal_watch()
- self.mixingPipeline.bus.connect("message::eos", self.on_eos)
- self.mixingPipeline.bus.connect("message::error", self.on_error)
+ def __init__(self):
+ self.reset()
- self.log.debug('Binding Handoff-Handler for Synchronus mixer manipulation')
- sig = self.mixingPipeline.get_by_name('sig')
- sig.connect('handoff', self.on_handoff)
-
- self.padStateDirty = False
- self.padState = list()
- for idx, name in enumerate(self.names):
- self.padState.append(PadState())
-
- self.log.debug('Initializing Mixer-State')
- self.compositeMode = CompositeModes.fullscreen
- self.sourceA = 0
- self.sourceB = 1
- self.recalculateMixerState()
- self.applyMixerState()
+ def reset(self):
+ self.alpha = 1.0
+ self.xpos = 0
+ self.ypos = 0
+ self.zorder = 1
+ self.width = 0
+ self.height = 0
- bgMixerpad = self.mixingPipeline.get_by_name('mix').get_static_pad('sink_0')
- bgMixerpad.set_property('zorder', 0)
- self.log.debug('Launching Mixing-Pipeline')
- self.mixingPipeline.set_state(Gst.State.PLAYING)
-
- def getInputVideoSize(self):
- caps = Gst.Caps.from_string(self.caps)
- struct = caps.get_structure(0)
- _, width = struct.get_int('width')
- _, height = struct.get_int('height')
-
- return width, height
-
- def recalculateMixerState(self):
- if self.compositeMode == CompositeModes.fullscreen:
- self.recalculateMixerStateFullscreen()
-
- elif self.compositeMode == CompositeModes.side_by_side_equal:
- self.recalculateMixerStateSideBySideEqual()
-
- elif self.compositeMode == CompositeModes.side_by_side_preview:
- self.recalculateMixerStateSideBySidePreview()
-
- elif self.compositeMode == CompositeModes.picture_in_picture:
- self.recalculateMixerStatePictureInPicture()
-
- self.log.debug('Marking Pad-State as Dirty')
- self.padStateDirty = True
-
- def recalculateMixerStateFullscreen(self):
- self.log.info('Updating Mixer-State for Fullscreen-Composition')
-
- for idx, name in enumerate(self.names):
- pad = self.padState[idx]
-
- pad.reset()
- pad.alpha = float(idx == self.sourceA)
-
- def recalculateMixerStateSideBySideEqual(self):
- self.log.info('Updating Mixer-State for Side-by-side-Equal-Composition')
-
- width, height = self.getInputVideoSize()
- self.log.debug('Video-Size parsed as %ux%u', width, height)
-
- try:
- gutter = Config.getint('side-by-side-equal', 'gutter')
- self.log.debug('Gutter configured to %u', gutter)
- except:
- gutter = int(width / 100)
- self.log.debug('Gutter calculated to %u', gutter)
-
- targetWidth = int((width - gutter) / 2)
- targetHeight = int(targetWidth / width * height)
-
- self.log.debug('Video-Size calculated to %ux%u', targetWidth, targetHeight)
-
- xa = 0
- xb = width - targetWidth
- y = (height - targetHeight) / 2
-
- try:
- ya = Config.getint('side-by-side-equal', 'atop')
- self.log.debug('A-Video Y-Pos configured to %u', ya)
- except:
- ya = y
- self.log.debug('A-Video Y-Pos calculated to %u', ya)
-
-
- try:
- yb = Config.getint('side-by-side-equal', 'btop')
- self.log.debug('B-Video Y-Pos configured to %u', yb)
- except:
- yb = y
- self.log.debug('B-Video Y-Pos calculated to %u', yb)
-
-
- for idx, name in enumerate(self.names):
- pad = self.padState[idx]
- pad.reset()
-
- pad.width = targetWidth
- pad.height = targetHeight
-
- if idx == self.sourceA:
- pad.xpos = xa
- pad.ypos = ya
- pad.zorder = 1
-
- elif idx == self.sourceB:
- pad.xpos = xb
- pad.ypos = yb
- pad.zorder = 2
-
- else:
- pad.alpha = 0
-
- def recalculateMixerStateSideBySidePreview(self):
- self.log.info('Updating Mixer-State for Side-by-side-Preview-Composition')
-
- width, height = self.getInputVideoSize()
- self.log.debug('Video-Size parsed as %ux%u', width, height)
-
- try:
- asize = [int(i) for i in Config.get('side-by-side-preview', 'asize').split('x', 1)]
- self.log.debug('A-Video-Size configured to %ux%u', asize[0], asize[1])
- except:
- asize = [
- int(width / 1.25), # 80%
- int(height / 1.25) # 80%
- ]
- self.log.debug('A-Video-Size calculated to %ux%u', asize[0], asize[1])
-
- try:
- apos = [int(i) for i in Config.get('side-by-side-preview', 'apos').split('/', 1)]
- self.log.debug('B-Video-Position configured to %u/%u', apos[0], apos[1])
- except:
- apos = [
- int(width / 100), # 1%
- int(width / 100) # 1%
- ]
- self.log.debug('B-Video-Position calculated to %u/%u', apos[0], apos[1])
-
- try:
- bsize = [int(i) for i in Config.get('side-by-side-preview', 'bsize').split('x', 1)]
- self.log.debug('B-Video-Size configured to %ux%u', bsize[0], bsize[1])
- except:
- bsize = [
- int(width / 4), # 25%
- int(height / 4) # 25%
- ]
- self.log.debug('B-Video-Size calculated to %ux%u', bsize[0], bsize[1])
-
- try:
- bpos = [int(i) for i in Config.get('side-by-side-preview', 'bpos').split('/', 1)]
- self.log.debug('B-Video-Position configured to %u/%u', bpos[0], bpos[1])
- except:
- bpos = [
- width - int(width / 100) - bsize[0],
- height - int(width / 100) - bsize[1] # 1%
- ]
- self.log.debug('B-Video-Position calculated to %u/%u', bpos[0], bpos[1])
-
- for idx, name in enumerate(self.names):
- pad = self.padState[idx]
- pad.reset()
-
- if idx == self.sourceA:
- pad.xpos, pad.ypos = apos
- pad.width, pad.height = asize
- pad.zorder = 1
-
- elif idx == self.sourceB:
- pad.xpos, pad.ypos = bpos
- pad.width, pad.height = bsize
- pad.zorder = 2
-
- else:
- pad.alpha = 0
-
- def recalculateMixerStatePictureInPicture(self):
- self.log.info('Updating Mixer-State for Picture-in-Picture-Composition')
-
- width, height = self.getInputVideoSize()
- self.log.debug('Video-Size parsed as %ux%u', width, height)
-
- try:
- pipsize = [int(i) for i in Config.get('picture-in-picture', 'pipsize').split('x', 1)]
- self.log.debug('PIP-Size configured to %ux%u', pipsize[0], pipsize[1])
- except:
- pipsize = [
- int(width / 4), # 25%
- int(height / 4) # 25%
- ]
- self.log.debug('PIP-Size calculated to %ux%u', pipsize[0], pipsize[1])
-
- try:
- pippos = [int(i) for i in Config.get('picture-in-picture', 'pippos').split('/', 1)]
- self.log.debug('PIP-Position configured to %u/%u', pippos[0], pippos[1])
- except:
- pippos = [
- width - pipsize[0] - int(width / 100), # 1%
- height - pipsize[1] -int(width / 100) # 1%
- ]
- self.log.debug('PIP-Position calculated to %u/%u', pippos[0], pippos[1])
-
- for idx, name in enumerate(self.names):
- pad = self.padState[idx]
- pad.reset()
-
- if idx == self.sourceA:
- pass
- elif idx == self.sourceB:
- pad.xpos, pad.ypos = pippos
- pad.width, pad.height = pipsize
- pad.zorder = 2
-
- else:
- pad.alpha = 0
-
- def applyMixerState(self):
- for idx, state in enumerate(self.padState):
- # mixerpad 0 = background
- mixerpad = self.mixingPipeline.get_by_name('mix').get_static_pad('sink_%u' % (idx+1))
-
- self.log.debug('Reconfiguring Mixerpad %u to x/y=%u/%u, w/h=%u/%u alpha=%0.2f, zorder=%u', \
- idx, state.xpos, state.ypos, state.width, state.height, state.alpha, state.zorder)
- mixerpad.set_property('xpos', state.xpos)
- mixerpad.set_property('ypos', state.ypos)
- mixerpad.set_property('width', state.width)
- mixerpad.set_property('height', state.height)
- mixerpad.set_property('alpha', state.alpha)
- mixerpad.set_property('zorder', state.zorder)
-
- def selectCompositeModeDefaultSources(self):
- sectionNames = {
- CompositeModes.fullscreen: 'fullscreen',
- CompositeModes.side_by_side_equal: 'side-by-side-equal',
- CompositeModes.side_by_side_preview: 'side-by-side-preview',
- CompositeModes.picture_in_picture: 'picture-in-picture'
- }
-
- compositeModeName = self.compositeMode.name
- sectionName = sectionNames[self.compositeMode]
-
- try:
- defSource = Config.get(sectionName, 'default-a')
- self.setVideoSourceA(self.names.index(defSource))
- self.log.info('Changing sourceA to default of Mode %s: %s', compositeModeName, defSource)
- except Exception as e:
- pass
-
- try:
- defSource = Config.get(sectionName, 'default-b')
- self.setVideoSourceB(self.names.index(defSource))
- self.log.info('Changing sourceB to default of Mode %s: %s', compositeModeName, defSource)
- except Exception as e:
- pass
-
- def on_handoff(self, object, buffer):
- if self.padStateDirty:
- self.padStateDirty = False
- self.log.debug('[Streaming-Thread]: Pad-State is Dirty, applying new Mixer-State')
- self.applyMixerState()
-
- def on_eos(self, bus, message):
- self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline')
-
- def on_error(self, bus, message):
- self.log.debug('Received Error-Signal on Mixing-Pipeline')
- (error, debug) = message.parse_error()
- self.log.debug('Error-Details: #%u: %s', error.code, debug)
-
-
- def setVideoSourceA(self, source):
- # swap if required
- if self.sourceB == source:
- self.sourceB = self.sourceA
-
- self.sourceA = source
- self.recalculateMixerState()
-
- def getVideoSourceA(self):
- return self.sourceA
-
- def setVideoSourceB(self, source):
- # swap if required
- if self.sourceA == source:
- self.sourceA = self.sourceB
-
- self.sourceB = source
- self.recalculateMixerState()
-
- def getVideoSourceB(self):
- return self.sourceB
-
- def setCompositeMode(self, mode):
- self.compositeMode = mode
-
- self.selectCompositeModeDefaultSources()
- self.recalculateMixerState()
-
- def getCompositeMode(self):
- return self.compositeMode
+class VideoMix(object):
+ log = logging.getLogger('VideoMix')
+
+ def __init__(self):
+ self.caps = Config.get('mix', 'videocaps')
+
+ self.names = Config.getlist('mix', 'sources')
+ self.log.info('Configuring Mixer for %u Sources', len(self.names))
+
+ pipeline = """
+ compositor name=mix !
+ {caps} !
+ identity name=sig !
+ queue !
+ tee name=tee
+
+ intervideosrc channel=video_background !
+ {caps} !
+ mix.
+
+ tee. ! queue ! intervideosink channel=video_mix_out
+ """.format(
+ caps=self.caps
+ )
+
+ if Config.getboolean('previews', 'enabled'):
+ pipeline += """
+ tee. ! queue ! intervideosink channel=video_mix_preview
+ """
+
+ if Config.getboolean('stream-blanker', 'enabled'):
+ pipeline += """
+ tee. ! queue ! intervideosink channel=video_mix_streamblanker
+ """
+
+ for idx, name in enumerate(self.names):
+ pipeline += """
+ intervideosrc channel=video_{name}_mixer !
+ {caps} !
+ mix.
+ """.format(
+ name=name,
+ caps=self.caps,
+ idx=idx
+ )
+
+ self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline)
+ self.mixingPipeline = Gst.parse_launch(pipeline)
+ self.mixingPipeline.use_clock(Clock)
+
+ self.log.debug('Binding Error & End-of-Stream-Signal '
+ 'on Mixing-Pipeline')
+ self.mixingPipeline.bus.add_signal_watch()
+ self.mixingPipeline.bus.connect("message::eos", self.on_eos)
+ self.mixingPipeline.bus.connect("message::error", self.on_error)
+
+ self.log.debug('Binding Handoff-Handler for '
+ 'Synchronus mixer manipulation')
+ sig = self.mixingPipeline.get_by_name('sig')
+ sig.connect('handoff', self.on_handoff)
+
+ self.padStateDirty = False
+ self.padState = list()
+ for idx, name in enumerate(self.names):
+ self.padState.append(PadState())
+
+ self.log.debug('Initializing Mixer-State')
+ self.compositeMode = CompositeModes.fullscreen
+ self.sourceA = 0
+ self.sourceB = 1
+ self.recalculateMixerState()
+ self.applyMixerState()
+
+ bgMixerpad = (self.mixingPipeline.get_by_name('mix')
+ .get_static_pad('sink_0'))
+ bgMixerpad.set_property('zorder', 0)
+
+ self.log.debug('Launching Mixing-Pipeline')
+ self.mixingPipeline.set_state(Gst.State.PLAYING)
+
+ def getInputVideoSize(self):
+ caps = Gst.Caps.from_string(self.caps)
+ struct = caps.get_structure(0)
+ _, width = struct.get_int('width')
+ _, height = struct.get_int('height')
+
+ return width, height
+
+ def recalculateMixerState(self):
+ if self.compositeMode == CompositeModes.fullscreen:
+ self.recalculateMixerStateFullscreen()
+
+ elif self.compositeMode == CompositeModes.side_by_side_equal:
+ self.recalculateMixerStateSideBySideEqual()
+
+ elif self.compositeMode == CompositeModes.side_by_side_preview:
+ self.recalculateMixerStateSideBySidePreview()
+
+ elif self.compositeMode == CompositeModes.picture_in_picture:
+ self.recalculateMixerStatePictureInPicture()
+
+ self.log.debug('Marking Pad-State as Dirty')
+ self.padStateDirty = True
+
+ def recalculateMixerStateFullscreen(self):
+ self.log.info('Updating Mixer-State for Fullscreen-Composition')
+
+ for idx, name in enumerate(self.names):
+ pad = self.padState[idx]
+
+ pad.reset()
+ pad.alpha = float(idx == self.sourceA)
+
+ def recalculateMixerStateSideBySideEqual(self):
+ self.log.info('Updating Mixer-State for '
+ 'Side-by-side-Equal-Composition')
+
+ width, height = self.getInputVideoSize()
+ self.log.debug('Video-Size parsed as %ux%u', width, height)
+
+ try:
+ gutter = Config.getint('side-by-side-equal', 'gutter')
+ self.log.debug('Gutter configured to %u', gutter)
+ except:
+ gutter = int(width / 100)
+ self.log.debug('Gutter calculated to %u', gutter)
+
+ targetWidth = int((width - gutter) / 2)
+ targetHeight = int(targetWidth / width * height)
+
+ self.log.debug('Video-Size calculated to %ux%u',
+ targetWidth, targetHeight)
+
+ xa = 0
+ xb = width - targetWidth
+ y = (height - targetHeight) / 2
+
+ try:
+ ya = Config.getint('side-by-side-equal', 'atop')
+ self.log.debug('A-Video Y-Pos configured to %u', ya)
+ except:
+ ya = y
+ self.log.debug('A-Video Y-Pos calculated to %u', ya)
+
+ try:
+ yb = Config.getint('side-by-side-equal', 'btop')
+ self.log.debug('B-Video Y-Pos configured to %u', yb)
+ except:
+ yb = y
+ self.log.debug('B-Video Y-Pos calculated to %u', yb)
+
+ for idx, name in enumerate(self.names):
+ pad = self.padState[idx]
+ pad.reset()
+
+ pad.width = targetWidth
+ pad.height = targetHeight
+
+ if idx == self.sourceA:
+ pad.xpos = xa
+ pad.ypos = ya
+ pad.zorder = 1
+
+ elif idx == self.sourceB:
+ pad.xpos = xb
+ pad.ypos = yb
+ pad.zorder = 2
+
+ else:
+ pad.alpha = 0
+
+ def recalculateMixerStateSideBySidePreview(self):
+ self.log.info('Updating Mixer-State for '
+ 'Side-by-side-Preview-Composition')
+
+ width, height = self.getInputVideoSize()
+ self.log.debug('Video-Size parsed as %ux%u', width, height)
+
+ try:
+ asize = [int(i) for i in Config.get('side-by-side-preview',
+ 'asize').split('x', 1)]
+ self.log.debug('A-Video-Size configured to %ux%u',
+ asize[0], asize[1])
+ except:
+ asize = [
+ int(width / 1.25), # 80%
+ int(height / 1.25) # 80%
+ ]
+ self.log.debug('A-Video-Size calculated to %ux%u',
+ asize[0], asize[1])
+
+ try:
+ apos = [int(i) for i in Config.get('side-by-side-preview',
+ 'apos').split('/', 1)]
+ self.log.debug('B-Video-Position configured to %u/%u',
+ apos[0], apos[1])
+ except:
+ apos = [
+ int(width / 100), # 1%
+ int(width / 100) # 1%
+ ]
+ self.log.debug('B-Video-Position calculated to %u/%u',
+ apos[0], apos[1])
+
+ try:
+ bsize = [int(i) for i in Config.get('side-by-side-preview',
+ 'bsize').split('x', 1)]
+ self.log.debug('B-Video-Size configured to %ux%u',
+ bsize[0], bsize[1])
+ except:
+ bsize = [
+ int(width / 4), # 25%
+ int(height / 4) # 25%
+ ]
+ self.log.debug('B-Video-Size calculated to %ux%u',
+ bsize[0], bsize[1])
+
+ try:
+ bpos = [int(i) for i in Config.get('side-by-side-preview',
+ 'bpos').split('/', 1)]
+ self.log.debug('B-Video-Position configured to %u/%u',
+ bpos[0], bpos[1])
+ except:
+ bpos = [
+ width - int(width / 100) - bsize[0],
+ height - int(width / 100) - bsize[1] # 1%
+ ]
+ self.log.debug('B-Video-Position calculated to %u/%u',
+ bpos[0], bpos[1])
+
+ for idx, name in enumerate(self.names):
+ pad = self.padState[idx]
+ pad.reset()
+
+ if idx == self.sourceA:
+ pad.xpos, pad.ypos = apos
+ pad.width, pad.height = asize
+ pad.zorder = 1
+
+ elif idx == self.sourceB:
+ pad.xpos, pad.ypos = bpos
+ pad.width, pad.height = bsize
+ pad.zorder = 2
+
+ else:
+ pad.alpha = 0
+
+ def recalculateMixerStatePictureInPicture(self):
+ self.log.info('Updating Mixer-State for '
+ 'Picture-in-Picture-Composition')
+
+ width, height = self.getInputVideoSize()
+ self.log.debug('Video-Size parsed as %ux%u', width, height)
+
+ try:
+ pipsize = [int(i) for i in Config.get('picture-in-picture',
+ 'pipsize').split('x', 1)]
+ self.log.debug('PIP-Size configured to %ux%u',
+ pipsize[0], pipsize[1])
+ except:
+ pipsize = [
+ int(width / 4), # 25%
+ int(height / 4) # 25%
+ ]
+ self.log.debug('PIP-Size calculated to %ux%u',
+ pipsize[0], pipsize[1])
+
+ try:
+ pippos = [int(i) for i in Config.get('picture-in-picture',
+ 'pippos').split('/', 1)]
+ self.log.debug('PIP-Position configured to %u/%u',
+ pippos[0], pippos[1])
+ except:
+ pippos = [
+ width - pipsize[0] - int(width / 100), # 1%
+ height - pipsize[1] - int(width / 100) # 1%
+ ]
+ self.log.debug('PIP-Position calculated to %u/%u',
+ pippos[0], pippos[1])
+
+ for idx, name in enumerate(self.names):
+ pad = self.padState[idx]
+ pad.reset()
+
+ if idx == self.sourceA:
+ pass
+ elif idx == self.sourceB:
+ pad.xpos, pad.ypos = pippos
+ pad.width, pad.height = pipsize
+ pad.zorder = 2
+
+ else:
+ pad.alpha = 0
+
+ def applyMixerState(self):
+ for idx, state in enumerate(self.padState):
+ # mixerpad 0 = background
+ mixerpad = (self.mixingPipeline
+ .get_by_name('mix')
+ .get_static_pad('sink_%u' % (idx + 1)))
+
+ self.log.debug('Reconfiguring Mixerpad %u to '
+ 'x/y=%u/%u, w/h=%u/%u alpha=%0.2f, zorder=%u',
+ idx, state.xpos, state.ypos,
+ state.width, state.height,
+ state.alpha, state.zorder)
+ mixerpad.set_property('xpos', state.xpos)
+ mixerpad.set_property('ypos', state.ypos)
+ mixerpad.set_property('width', state.width)
+ mixerpad.set_property('height', state.height)
+ mixerpad.set_property('alpha', state.alpha)
+ mixerpad.set_property('zorder', state.zorder)
+
+ def selectCompositeModeDefaultSources(self):
+ sectionNames = {
+ CompositeModes.fullscreen: 'fullscreen',
+ CompositeModes.side_by_side_equal: 'side-by-side-equal',
+ CompositeModes.side_by_side_preview: 'side-by-side-preview',
+ CompositeModes.picture_in_picture: 'picture-in-picture'
+ }
+
+ compositeModeName = self.compositeMode.name
+ sectionName = sectionNames[self.compositeMode]
+
+ try:
+ defSource = Config.get(sectionName, 'default-a')
+ self.setVideoSourceA(self.names.index(defSource))
+ self.log.info('Changing sourceA to default of Mode %s: %s',
+ compositeModeName, defSource)
+ except Exception as e:
+ pass
+
+ try:
+ defSource = Config.get(sectionName, 'default-b')
+ self.setVideoSourceB(self.names.index(defSource))
+ self.log.info('Changing sourceB to default of Mode %s: %s',
+ compositeModeName, defSource)
+ except Exception as e:
+ pass
+
+ def on_handoff(self, object, buffer):
+ if self.padStateDirty:
+ self.padStateDirty = False
+ self.log.debug('[Streaming-Thread]: Pad-State is Dirty, '
+ 'applying new Mixer-State')
+ self.applyMixerState()
+
+ def on_eos(self, bus, message):
+ self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline')
+
+ def on_error(self, bus, message):
+ self.log.debug('Received Error-Signal on Mixing-Pipeline')
+ (error, debug) = message.parse_error()
+ self.log.debug('Error-Details: #%u: %s', error.code, debug)
+
+ def setVideoSourceA(self, source):
+ # swap if required
+ if self.sourceB == source:
+ self.sourceB = self.sourceA
+
+ self.sourceA = source
+ self.recalculateMixerState()
+
+ def getVideoSourceA(self):
+ return self.sourceA
+
+ def setVideoSourceB(self, source):
+ # swap if required
+ if self.sourceA == source:
+ self.sourceA = self.sourceB
+
+ self.sourceB = source
+ self.recalculateMixerState()
+
+ def getVideoSourceB(self):
+ return self.sourceB
+
+ def setCompositeMode(self, mode):
+ self.compositeMode = mode
+
+ self.selectCompositeModeDefaultSources()
+ self.recalculateMixerState()
+
+ def getCompositeMode(self):
+ return self.compositeMode
diff --git a/voctocore/voctocore.py b/voctocore/voctocore.py
index 1d04c4f..fd20163 100755
--- a/voctocore/voctocore.py
+++ b/voctocore/voctocore.py
@@ -1,91 +1,100 @@
#!/usr/bin/env python3
-import gi, signal, logging, sys
+import gi
+import signal
+import logging
+import sys
# import GStreamer and GLib-Helper classes
gi.require_version('Gst', '1.0')
gi.require_version('GstNet', '1.0')
from gi.repository import Gst, GstNet, GObject
+# import local classes
+from lib.args import Args
+from lib.loghandler import LogHandler
+
# check min-version
minGst = (1, 5)
minPy = (3, 0)
Gst.init([])
if Gst.version() < minGst:
- raise Exception("GStreamer version", Gst.version(), 'is too old, at least', minGst, 'is required')
+ raise Exception('GStreamer version', Gst.version(),
+ 'is too old, at least', minGst, 'is required')
if sys.version_info < minPy:
- raise Exception("Python version", sys.version_info, 'is too old, at least', minPy, 'is required')
+ raise Exception('Python version', sys.version_info,
+ 'is too old, at least', minPy, 'is required')
# init GObject & Co. before importing local classes
GObject.threads_init()
-# import local classes
-from lib.args import Args
-from lib.loghandler import LogHandler
# main class
class Voctocore(object):
- def __init__(self):
- # import local which use the config or the logging system
- # this is required, so that we can cnfigure logging, before reading the config
- from lib.pipeline import Pipeline
- from lib.controlserver import ControlServer
- self.log = logging.getLogger('Voctocore')
- self.log.debug('creating GObject-MainLoop')
- self.mainloop = GObject.MainLoop()
+ def __init__(self):
+ # import local which use the config or the logging system
+ # this is required, so that we can configure logging,
+ # before reading the config
+ from lib.pipeline import Pipeline
+ from lib.controlserver import ControlServer
+
+ self.log = logging.getLogger('Voctocore')
+ self.log.debug('creating GObject-MainLoop')
+ self.mainloop = GObject.MainLoop()
- # initialize subsystem
- self.log.debug('creating A/V-Pipeline')
- self.pipeline = Pipeline()
+ # initialize subsystem
+ self.log.debug('creating A/V-Pipeline')
+ self.pipeline = Pipeline()
- self.log.debug('creating ControlServer')
- self.controlserver = ControlServer(self.pipeline)
+ self.log.debug('creating ControlServer')
+ self.controlserver = ControlServer(self.pipeline)
- def run(self):
- self.log.info('running GObject-MainLoop')
- try:
- self.mainloop.run()
- except KeyboardInterrupt:
- self.log.info('Terminated via Ctrl-C')
+ def run(self):
+ self.log.info('running GObject-MainLoop')
+ try:
+ self.mainloop.run()
+ except KeyboardInterrupt:
+ self.log.info('Terminated via Ctrl-C')
- def quit(self):
- self.log.info('quitting GObject-MainLoop')
- self.mainloop.quit()
+ def quit(self):
+ self.log.info('quitting GObject-MainLoop')
+ self.mainloop.quit()
# run mainclass
def main():
- # configure logging
- docolor = (Args.color == 'always') or (Args.color == 'auto' and sys.stderr.isatty())
+ # configure logging
+ docolor = (Args.color == 'always') or (Args.color == 'auto' and
+ sys.stderr.isatty())
- handler = LogHandler(docolor, Args.timestamp)
- logging.root.addHandler(handler)
+ handler = LogHandler(docolor, Args.timestamp)
+ logging.root.addHandler(handler)
- if Args.verbose >= 2:
- level = logging.DEBUG
- elif Args.verbose == 1:
- level = logging.INFO
- else:
- level = logging.WARNING
+ if Args.verbose >= 2:
+ level = logging.DEBUG
+ elif Args.verbose == 1:
+ level = logging.INFO
+ else:
+ level = logging.WARNING
- logging.root.setLevel(level)
+ logging.root.setLevel(level)
- # make killable by ctrl-c
- logging.debug('setting SIGINT handler')
- signal.signal(signal.SIGINT, signal.SIG_DFL)
+ # make killable by ctrl-c
+ logging.debug('setting SIGINT handler')
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
- logging.info('Python Version: %s', sys.version_info)
- logging.info('GStreamer Version: %s', Gst.version())
+ logging.info('Python Version: %s', sys.version_info)
+ logging.info('GStreamer Version: %s', Gst.version())
- # init main-class and main-loop
- logging.debug('initializing Voctocore')
- voctocore = Voctocore()
+ # init main-class and main-loop
+ logging.debug('initializing Voctocore')
+ voctocore = Voctocore()
- logging.debug('running Voctocore')
- voctocore.run()
+ logging.debug('running Voctocore')
+ voctocore.run()
if __name__ == '__main__':
- main()
+ main()