diff options
author | Florian Zeitz <florob@babelmonkeys.de> | 2016-09-15 22:55:36 +0200 |
---|---|---|
committer | Florian Zeitz <florob@babelmonkeys.de> | 2016-09-15 22:55:36 +0200 |
commit | 9ffea4ad9ef0f0b23e5b4e5ebbfe5f140ecf5450 (patch) | |
tree | 54b1279870df9ce4fa828373d35d879e8ef431d7 /voctocore/lib | |
parent | 746d75dee7fb6291a900e0162345354e6eb41c13 (diff) |
voctocore: pep8ify
* Indent by 4 spaces
* Reformat some argument lists
* Two newlines before free functions
* One newline before methods
* Spaces around infix operators
Diffstat (limited to 'voctocore/lib')
-rw-r--r-- | voctocore/lib/args.py | 13 | ||||
-rw-r--r-- | voctocore/lib/audiomix.py | 162 | ||||
-rw-r--r-- | voctocore/lib/avpreviewoutput.py | 163 | ||||
-rw-r--r-- | voctocore/lib/avrawoutput.py | 146 | ||||
-rw-r--r-- | voctocore/lib/avsource.py | 234 | ||||
-rw-r--r-- | voctocore/lib/commands.py | 414 | ||||
-rw-r--r-- | voctocore/lib/config.py | 25 | ||||
-rw-r--r-- | voctocore/lib/controlserver.py | 301 | ||||
-rw-r--r-- | voctocore/lib/loghandler.py | 86 | ||||
-rw-r--r-- | voctocore/lib/pipeline.py | 184 | ||||
-rw-r--r-- | voctocore/lib/response.py | 14 | ||||
-rw-r--r-- | voctocore/lib/streamblanker.py | 193 | ||||
-rw-r--r-- | voctocore/lib/tcpmulticonnection.py | 59 | ||||
-rw-r--r-- | voctocore/lib/tcpsingleconnection.py | 58 | ||||
-rw-r--r-- | voctocore/lib/videomix.py | 766 |
15 files changed, 1467 insertions, 1351 deletions
diff --git a/voctocore/lib/args.py b/voctocore/lib/args.py index d40cd75..66c298d 100644 --- a/voctocore/lib/args.py +++ b/voctocore/lib/args.py @@ -4,16 +4,19 @@ __all__ = ['Args'] parser = argparse.ArgumentParser(description='Voctocore') parser.add_argument('-v', '--verbose', action='count', default=0, - help="Also print INFO and DEBUG messages.") + help="Also print INFO and DEBUG messages.") -parser.add_argument('-c', '--color', action='store', choices=['auto', 'always', 'never'], default='auto', - help="Control the use of colors in the Log-Output") +parser.add_argument('-c', '--color', + action='store', + choices=['auto', 'always', 'never'], + default='auto', + help="Control the use of colors in the Log-Output") parser.add_argument('-t', '--timestamp', action='store_true', - help="Enable timestamps in the Log-Output") + help="Enable timestamps in the Log-Output") parser.add_argument('-i', '--ini-file', action='store', - help="Load a custom config.ini-File") + help="Load a custom config.ini-File") Args = parser.parse_args() diff --git a/voctocore/lib/audiomix.py b/voctocore/lib/audiomix.py index 6c1768a..37376fd 100644 --- a/voctocore/lib/audiomix.py +++ b/voctocore/lib/audiomix.py @@ -5,83 +5,87 @@ from enum import Enum from lib.config import Config from lib.clock import Clock + class AudioMix(object): - def __init__(self): - self.log = logging.getLogger('AudioMix') - - self.selectedSource = 0 - - self.caps = Config.get('mix', 'audiocaps') - self.names = Config.getlist('mix', 'sources') - self.log.info('Configuring Mixer for %u Sources', len(self.names)) - - pipeline = """ - audiomixer name=mix ! - {caps} ! - queue ! - tee name=tee - - tee. ! queue ! interaudiosink channel=audio_mix_out - """.format( - caps=self.caps - ) - - if Config.getboolean('previews', 'enabled'): - pipeline += """ - tee. ! queue ! interaudiosink channel=audio_mix_preview - """ - - if Config.getboolean('stream-blanker', 'enabled'): - pipeline += """ - tee. ! queue ! interaudiosink channel=audio_mix_streamblanker - """ - - for idx, name in enumerate(self.names): - pipeline += """ - interaudiosrc channel=audio_{name}_mixer ! - {caps} ! - mix. - """.format( - name=name, - caps=self.caps - ) - - self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline) - self.mixingPipeline = Gst.parse_launch(pipeline) - self.mixingPipeline.use_clock(Clock) - - self.log.debug('Binding Error & End-of-Stream-Signal on Mixing-Pipeline') - self.mixingPipeline.bus.add_signal_watch() - self.mixingPipeline.bus.connect("message::eos", self.on_eos) - self.mixingPipeline.bus.connect("message::error", self.on_error) - - self.log.debug('Initializing Mixer-State') - self.updateMixerState() - - self.log.debug('Launching Mixing-Pipeline') - self.mixingPipeline.set_state(Gst.State.PLAYING) - - def updateMixerState(self): - self.log.info('Updating Mixer-State') - - for idx, name in enumerate(self.names): - volume = int(idx == self.selectedSource) - - self.log.debug('Setting Mixerpad %u to volume=%0.2f', idx, volume) - mixerpad = self.mixingPipeline.get_by_name('mix').get_static_pad('sink_%u' % idx) - mixerpad.set_property('volume', volume) - - def setAudioSource(self, source): - self.selectedSource = source - self.updateMixerState() - - def getAudioSource(self): - return self.selectedSource - - def on_eos(self, bus, message): - self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline') - - def on_error(self, bus, message): - self.log.debug('Received Error-Signal on Mixing-Pipeline') - (error, debug) = message.parse_error() - self.log.debug('Error-Details: #%u: %s', error.code, debug) + + def __init__(self): + self.log = logging.getLogger('AudioMix') + + self.selectedSource = 0 + + self.caps = Config.get('mix', 'audiocaps') + self.names = Config.getlist('mix', 'sources') + self.log.info('Configuring Mixer for %u Sources', len(self.names)) + + pipeline = """ + audiomixer name=mix ! + {caps} ! + queue ! + tee name=tee + + tee. ! queue ! interaudiosink channel=audio_mix_out + """.format( + caps=self.caps + ) + + if Config.getboolean('previews', 'enabled'): + pipeline += """ + tee. ! queue ! interaudiosink channel=audio_mix_preview + """ + + if Config.getboolean('stream-blanker', 'enabled'): + pipeline += """ + tee. ! queue ! interaudiosink channel=audio_mix_streamblanker + """ + + for idx, name in enumerate(self.names): + pipeline += """ + interaudiosrc channel=audio_{name}_mixer ! + {caps} ! + mix. + """.format( + name=name, + caps=self.caps + ) + + self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline) + self.mixingPipeline = Gst.parse_launch(pipeline) + self.mixingPipeline.use_clock(Clock) + + self.log.debug('Binding Error & End-of-Stream-Signal ' + 'on Mixing-Pipeline') + self.mixingPipeline.bus.add_signal_watch() + self.mixingPipeline.bus.connect("message::eos", self.on_eos) + self.mixingPipeline.bus.connect("message::error", self.on_error) + + self.log.debug('Initializing Mixer-State') + self.updateMixerState() + + self.log.debug('Launching Mixing-Pipeline') + self.mixingPipeline.set_state(Gst.State.PLAYING) + + def updateMixerState(self): + self.log.info('Updating Mixer-State') + + for idx, name in enumerate(self.names): + volume = int(idx == self.selectedSource) + + self.log.debug('Setting Mixerpad %u to volume=%0.2f', idx, volume) + mixerpad = (self.mixingPipeline.get_by_name('mix') + .get_static_pad('sink_%u' % idx)) + mixerpad.set_property('volume', volume) + + def setAudioSource(self, source): + self.selectedSource = source + self.updateMixerState() + + def getAudioSource(self): + return self.selectedSource + + def on_eos(self, bus, message): + self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline') + + def on_error(self, bus, message): + self.log.debug('Received Error-Signal on Mixing-Pipeline') + (error, debug) = message.parse_error() + self.log.debug('Error-Details: #%u: %s', error.code, debug) diff --git a/voctocore/lib/avpreviewoutput.py b/voctocore/lib/avpreviewoutput.py index d5c2c66..749249b 100644 --- a/voctocore/lib/avpreviewoutput.py +++ b/voctocore/lib/avpreviewoutput.py @@ -5,84 +5,87 @@ from lib.config import Config from lib.tcpmulticonnection import TCPMultiConnection from lib.clock import Clock + class AVPreviewOutput(TCPMultiConnection): - def __init__(self, channel, port): - self.log = logging.getLogger('AVPreviewOutput['+channel+']') - super().__init__(port) - - self.channel = channel - - if Config.has_option('previews', 'videocaps'): - vcaps_out = Config.get('previews', 'videocaps') - else: - vcaps_out = Config.get('mix', 'videocaps') - - deinterlace = "" - if Config.getboolean('previews', 'deinterlace'): - deinterlace = "deinterlace mode=interlaced !" - - pipeline = """ - intervideosrc channel=video_{channel} ! - {vcaps_in} ! - {deinterlace} - videoscale ! - videorate ! - {vcaps_out} ! - jpegenc quality=90 ! - queue ! - mux. - - interaudiosrc channel=audio_{channel} ! - {acaps} ! - queue ! - mux. - - matroskamux - name=mux - streamable=true - writing-app=Voctomix-AVPreviewOutput ! - - multifdsink - blocksize=1048576 - buffers-max=500 - sync-method=next-keyframe - name=fd - """.format( - channel=self.channel, - acaps=Config.get('mix', 'audiocaps'), - vcaps_in=Config.get('mix', 'videocaps'), - vcaps_out=vcaps_out, - deinterlace=deinterlace - ) - - self.log.debug('Creating Output-Pipeline:\n%s', pipeline) - self.outputPipeline = Gst.parse_launch(pipeline) - self.outputPipeline.use_clock(Clock) - - self.log.debug('Binding Error & End-of-Stream-Signal on Output-Pipeline') - self.outputPipeline.bus.add_signal_watch() - self.outputPipeline.bus.connect("message::eos", self.on_eos) - self.outputPipeline.bus.connect("message::error", self.on_error) - - self.log.debug('Launching Output-Pipeline') - self.outputPipeline.set_state(Gst.State.PLAYING) - - def on_accepted(self, conn, addr): - self.log.debug('Adding fd %u to multifdsink', conn.fileno()) - fdsink = self.outputPipeline.get_by_name('fd') - fdsink.emit('add', conn.fileno()) - - def on_disconnect(multifdsink, fileno): - if fileno == conn.fileno(): - self.log.debug('fd %u removed from multifdsink', fileno) - self.close_connection(conn) - - fdsink.connect('client-fd-removed', on_disconnect) - - def on_eos(self, bus, message): - self.log.debug('Received End-of-Stream-Signal on Output-Pipeline') - - def on_error(self, bus, message): - self.log.debug('Received Error-Signal on Output-Pipeline') - (error, debug) = message.parse_error() - self.log.debug('Error-Details: #%u: %s', error.code, debug) + + def __init__(self, channel, port): + self.log = logging.getLogger('AVPreviewOutput[{}]'.format(channel)) + super().__init__(port) + + self.channel = channel + + if Config.has_option('previews', 'videocaps'): + vcaps_out = Config.get('previews', 'videocaps') + else: + vcaps_out = Config.get('mix', 'videocaps') + + deinterlace = "" + if Config.getboolean('previews', 'deinterlace'): + deinterlace = "deinterlace mode=interlaced !" + + pipeline = """ + intervideosrc channel=video_{channel} ! + {vcaps_in} ! + {deinterlace} + videoscale ! + videorate ! + {vcaps_out} ! + jpegenc quality=90 ! + queue ! + mux. + + interaudiosrc channel=audio_{channel} ! + {acaps} ! + queue ! + mux. + + matroskamux + name=mux + streamable=true + writing-app=Voctomix-AVPreviewOutput ! + + multifdsink + blocksize=1048576 + buffers-max=500 + sync-method=next-keyframe + name=fd + """.format( + channel=self.channel, + acaps=Config.get('mix', 'audiocaps'), + vcaps_in=Config.get('mix', 'videocaps'), + vcaps_out=vcaps_out, + deinterlace=deinterlace + ) + + self.log.debug('Creating Output-Pipeline:\n%s', pipeline) + self.outputPipeline = Gst.parse_launch(pipeline) + self.outputPipeline.use_clock(Clock) + + self.log.debug('Binding Error & End-of-Stream-Signal ' + 'on Output-Pipeline') + self.outputPipeline.bus.add_signal_watch() + self.outputPipeline.bus.connect("message::eos", self.on_eos) + self.outputPipeline.bus.connect("message::error", self.on_error) + + self.log.debug('Launching Output-Pipeline') + self.outputPipeline.set_state(Gst.State.PLAYING) + + def on_accepted(self, conn, addr): + self.log.debug('Adding fd %u to multifdsink', conn.fileno()) + fdsink = self.outputPipeline.get_by_name('fd') + fdsink.emit('add', conn.fileno()) + + def on_disconnect(multifdsink, fileno): + if fileno == conn.fileno(): + self.log.debug('fd %u removed from multifdsink', fileno) + self.close_connection(conn) + + fdsink.connect('client-fd-removed', on_disconnect) + + def on_eos(self, bus, message): + self.log.debug('Received End-of-Stream-Signal on Output-Pipeline') + + def on_error(self, bus, message): + self.log.debug('Received Error-Signal on Output-Pipeline') + (error, debug) = message.parse_error() + self.log.debug('Error-Details: #%u: %s', error.code, debug) diff --git a/voctocore/lib/avrawoutput.py b/voctocore/lib/avrawoutput.py index 5523a66..b044e24 100644 --- a/voctocore/lib/avrawoutput.py +++ b/voctocore/lib/avrawoutput.py @@ -5,76 +5,78 @@ from lib.config import Config from lib.tcpmulticonnection import TCPMultiConnection from lib.clock import Clock + class AVRawOutput(TCPMultiConnection): - def __init__(self, channel, port): - self.log = logging.getLogger('AVRawOutput['+channel+']') - super().__init__(port) - - self.channel = channel - - pipeline = """ - intervideosrc channel=video_{channel} ! - {vcaps} ! - queue ! - mux. - - interaudiosrc channel=audio_{channel} ! - {acaps} ! - queue ! - mux. - - matroskamux - name=mux - streamable=true - writing-app=Voctomix-AVRawOutput ! - - multifdsink - blocksize=1048576 - buffers-max={buffers_max} - sync-method=next-keyframe - name=fd - """.format( - channel=self.channel, - acaps=Config.get('mix', 'audiocaps'), - vcaps=Config.get('mix', 'videocaps'), - buffers_max= - Config.get('output-buffers', channel) - if Config.has_option('output-buffers', channel) - else 500, - ) - self.log.debug('Creating Output-Pipeline:\n%s', pipeline) - self.outputPipeline = Gst.parse_launch(pipeline) - self.outputPipeline.use_clock(Clock) - - self.log.debug('Binding Error & End-of-Stream-Signal on Output-Pipeline') - self.outputPipeline.bus.add_signal_watch() - self.outputPipeline.bus.connect("message::eos", self.on_eos) - self.outputPipeline.bus.connect("message::error", self.on_error) - - self.log.debug('Launching Output-Pipeline') - self.outputPipeline.set_state(Gst.State.PLAYING) - - def on_accepted(self, conn, addr): - self.log.debug('Adding fd %u to multifdsink', conn.fileno()) - fdsink = self.outputPipeline.get_by_name('fd') - fdsink.emit('add', conn.fileno()) - - def on_disconnect(multifdsink, fileno): - if fileno == conn.fileno(): - self.log.debug('fd %u removed from multifdsink', fileno) - self.close_connection(conn) - - def on_about_to_disconnect(multifdsink, fileno, status): - if fileno == conn.fileno() and status == 3: # Gst.MultiHandleSinkClientStatus.Slow - self.log.warning('about to remove fd %u from multifdsink because it is too slow!', fileno) - - fdsink.connect('client-fd-removed', on_disconnect) - fdsink.connect('client-removed', on_about_to_disconnect) - - def on_eos(self, bus, message): - self.log.debug('Received End-of-Stream-Signal on Output-Pipeline') - - def on_error(self, bus, message): - self.log.debug('Received Error-Signal on Output-Pipeline') - (error, debug) = message.parse_error() - self.log.debug('Error-Details: #%u: %s', error.code, debug) + + def __init__(self, channel, port): + self.log = logging.getLogger('AVRawOutput[{}]'.format(channel)) + super().__init__(port) + + self.channel = channel + + pipeline = """ + intervideosrc channel=video_{channel} ! + {vcaps} ! + queue ! + mux. + + interaudiosrc channel=audio_{channel} ! + {acaps} ! + queue ! + mux. + + matroskamux + name=mux + streamable=true + writing-app=Voctomix-AVRawOutput ! + + multifdsink + blocksize=1048576 + buffers-max={buffers_max} + sync-method=next-keyframe + name=fd + """.format( + channel=self.channel, + acaps=Config.get('mix', 'audiocaps'), + vcaps=Config.get('mix', 'videocaps'), + buffers_max=Config.get('output-buffers', channel, fallback=500) + ) + self.log.debug('Creating Output-Pipeline:\n%s', pipeline) + self.outputPipeline = Gst.parse_launch(pipeline) + self.outputPipeline.use_clock(Clock) + + self.log.debug('Binding Error & End-of-Stream-Signal ' + 'on Output-Pipeline') + self.outputPipeline.bus.add_signal_watch() + self.outputPipeline.bus.connect("message::eos", self.on_eos) + self.outputPipeline.bus.connect("message::error", self.on_error) + + self.log.debug('Launching Output-Pipeline') + self.outputPipeline.set_state(Gst.State.PLAYING) + + def on_accepted(self, conn, addr): + self.log.debug('Adding fd %u to multifdsink', conn.fileno()) + fdsink = self.outputPipeline.get_by_name('fd') + fdsink.emit('add', conn.fileno()) + + def on_disconnect(multifdsink, fileno): + if fileno == conn.fileno(): + self.log.debug('fd %u removed from multifdsink', fileno) + self.close_connection(conn) + + def on_about_to_disconnect(multifdsink, fileno, status): + # GST_CLIENT_STATUS_SLOW = 3, + if fileno == conn.fileno() and status == 3: + self.log.warning('about to remove fd %u from multifdsink ' + 'because it is too slow!', fileno) + + fdsink.connect('client-fd-removed', on_disconnect) + fdsink.connect('client-removed', on_about_to_disconnect) + + def on_eos(self, bus, message): + self.log.debug('Received End-of-Stream-Signal on Output-Pipeline') + + def on_error(self, bus, message): + self.log.debug('Received Error-Signal on Output-Pipeline') + (error, debug) = message.parse_error() + self.log.debug('Error-Details: #%u: %s', error.code, debug) diff --git a/voctocore/lib/avsource.py b/voctocore/lib/avsource.py index aa604b2..c2e08da 100644 --- a/voctocore/lib/avsource.py +++ b/voctocore/lib/avsource.py @@ -5,116 +5,126 @@ from lib.config import Config from lib.tcpsingleconnection import TCPSingleConnection from lib.clock import Clock + class AVSource(TCPSingleConnection): - def __init__(self, name, port, outputs=None, has_audio=True, has_video=True): - self.log = logging.getLogger('AVSource['+name+']') - super().__init__(port) - - if outputs is None: - outputs = [name] - - assert has_audio or has_video - - self.name = name - self.has_audio = has_audio - self.has_video = has_video - self.outputs = outputs - - def on_accepted(self, conn, addr): - pipeline = """ - fdsrc fd={fd} blocksize=1048576 ! - queue ! - matroskademux name=demux - """.format( - fd=conn.fileno() - ) - - if self.has_audio: - pipeline += """ - demux. ! - {acaps} ! - queue ! - tee name=atee - """.format( - acaps=Config.get('mix', 'audiocaps') - ) - - for output in self.outputs: - pipeline += """ - atee. ! queue ! interaudiosink channel=audio_{output} - """.format( - output=output - ) - - if self.has_video: - pipeline += """ - demux. ! - {vcaps} ! - queue ! - tee name=vtee - """.format( - vcaps=Config.get('mix', 'videocaps') - ) - - for output in self.outputs: - pipeline += """ - vtee. ! queue ! intervideosink channel=video_{output} - """.format( - output=output - ) - - self.log.debug('Launching Source-Pipeline:\n%s', pipeline) - self.receiverPipeline = Gst.parse_launch(pipeline) - self.receiverPipeline.use_clock(Clock) - - self.log.debug('Binding End-of-Stream-Signal on Source-Pipeline') - self.receiverPipeline.bus.add_signal_watch() - self.receiverPipeline.bus.connect("message::eos", self.on_eos) - self.receiverPipeline.bus.connect("message::error", self.on_error) - - self.all_video_caps = Gst.Caps.from_string('video/x-raw') - self.video_caps = Gst.Caps.from_string(Config.get('mix', 'videocaps')) - - self.all_audio_caps = Gst.Caps.from_string('audio/x-raw') - self.audio_caps = Gst.Caps.from_string(Config.get('mix', 'audiocaps')) - - demux = self.receiverPipeline.get_by_name('demux') - demux.connect('pad-added', self.on_pad_added) - - self.receiverPipeline.set_state(Gst.State.PLAYING) - - def on_pad_added(self, demux, src_pad): - caps = src_pad.query_caps(None) - self.log.debug('demuxer added pad w/ caps: %s', caps.to_string()) - if caps.can_intersect(self.all_audio_caps): - self.log.debug('new demuxer-pad is a audio-pad, testing against configured audio-caps') - if not caps.can_intersect(self.audio_caps): - self.log.warning('the incoming connection presented a video-stream that is not compatible to the configured caps') - self.log.warning(' incoming caps: %s', caps.to_string()) - self.log.warning(' configured caps: %s', self.audio_caps.to_string()) - - - elif caps.can_intersect(self.all_video_caps): - self.log.debug('new demuxer-pad is a video-pad, testing against configured video-caps') - if not caps.can_intersect(self.video_caps): - self.log.warning('the incoming connection presented a video-stream that is not compatible to the configured caps') - self.log.warning(' incoming caps: %s', caps.to_string()) - self.log.warning(' configured caps: %s', self.video_caps.to_string()) - - def on_eos(self, bus, message): - self.log.debug('Received End-of-Stream-Signal on Source-Pipeline') - if self.currentConnection is not None: - self.disconnect() - - def on_error(self, bus, message): - self.log.debug('Received Error-Signal on Source-Pipeline') - (error, debug) = message.parse_error() - self.log.debug('Error-Details: #%u: %s', error.code, debug) - - if self.currentConnection is not None: - self.disconnect() - - def disconnect(self): - self.receiverPipeline.set_state(Gst.State.NULL) - self.receiverPipeline = None - self.close_connection() + + def __init__(self, name, port, outputs=None, + has_audio=True, has_video=True): + self.log = logging.getLogger('AVSource[{}]'.format(name)) + super().__init__(port) + + if outputs is None: + outputs = [name] + + assert has_audio or has_video + + self.name = name + self.has_audio = has_audio + self.has_video = has_video + self.outputs = outputs + + def on_accepted(self, conn, addr): + pipeline = """ + fdsrc fd={fd} blocksize=1048576 ! + queue ! + matroskademux name=demux + """.format( + fd=conn.fileno() + ) + + if self.has_audio: + pipeline += """ + demux. ! + {acaps} ! + queue ! + tee name=atee + """.format( + acaps=Config.get('mix', 'audiocaps') + ) + + for output in self.outputs: + pipeline += """ + atee. ! queue ! interaudiosink channel=audio_{output} + """.format( + output=output + ) + + if self.has_video: + pipeline += """ + demux. ! + {vcaps} ! + queue ! + tee name=vtee + """.format( + vcaps=Config.get('mix', 'videocaps') + ) + + for output in self.outputs: + pipeline += """ + vtee. ! queue ! intervideosink channel=video_{output} + """.format( + output=output + ) + + self.log.debug('Launching Source-Pipeline:\n%s', pipeline) + self.receiverPipeline = Gst.parse_launch(pipeline) + self.receiverPipeline.use_clock(Clock) + + self.log.debug('Binding End-of-Stream-Signal on Source-Pipeline') + self.receiverPipeline.bus.add_signal_watch() + self.receiverPipeline.bus.connect("message::eos", self.on_eos) + self.receiverPipeline.bus.connect("message::error", self.on_error) + + self.all_video_caps = Gst.Caps.from_string('video/x-raw') + self.video_caps = Gst.Caps.from_string(Config.get('mix', 'videocaps')) + + self.all_audio_caps = Gst.Caps.from_string('audio/x-raw') + self.audio_caps = Gst.Caps.from_string(Config.get('mix', 'audiocaps')) + + demux = self.receiverPipeline.get_by_name('demux') + demux.connect('pad-added', self.on_pad_added) + + self.receiverPipeline.set_state(Gst.State.PLAYING) + + def on_pad_added(self, demux, src_pad): + caps = src_pad.query_caps(None) + self.log.debug('demuxer added pad w/ caps: %s', caps.to_string()) + if caps.can_intersect(self.all_audio_caps): + self.log.debug('new demuxer-pad is a audio-pad, ' + 'testing against configured audio-caps') + if not caps.can_intersect(self.audio_caps): + self.log.warning('the incoming connection presented ' + 'a video-stream that is not compatible ' + 'to the configured caps') + self.log.warning(' incoming caps: %s', caps.to_string()) + self.log.warning(' configured caps: %s', + self.audio_caps.to_string()) + + elif caps.can_intersect(self.all_video_caps): + self.log.debug('new demuxer-pad is a video-pad, ' + 'testing against configured video-caps') + if not caps.can_intersect(self.video_caps): + self.log.warning('the incoming connection presented ' + 'a video-stream that is not compatible ' + 'to the configured caps') + self.log.warning(' incoming caps: %s', caps.to_string()) + self.log.warning(' configured caps: %s', + self.video_caps.to_string()) + + def on_eos(self, bus, message): + self.log.debug('Received End-of-Stream-Signal on Source-Pipeline') + if self.currentConnection is not None: + self.disconnect() + + def on_error(self, bus, message): + self.log.debug('Received Error-Signal on Source-Pipeline') + (error, debug) = message.parse_error() + self.log.debug('Error-Details: #%u: %s', error.code, debug) + + if self.currentConnection is not None: + self.disconnect() + + def disconnect(self): + self.receiverPipeline.set_state(Gst.State.NULL) + self.receiverPipeline = None + self.close_connection() diff --git a/voctocore/lib/commands.py b/voctocore/lib/commands.py index a5b6ab1..34590b7 100644 --- a/voctocore/lib/commands.py +++ b/voctocore/lib/commands.py @@ -6,236 +6,240 @@ from lib.config import Config from lib.videomix import CompositeModes from lib.response import NotifyResponse, OkResponse -def decodeName(items, name_or_id): - try: - name_or_id = int(name_or_id) - if name_or_id < 0 or name_or_id >= len(items): - raise IndexError("unknown index %d" % name_or_id) - - return name_or_id - - except ValueError as e: - try: - return items.index(name_or_id) - - except ValueError as e: - raise IndexError("unknown name %s" % name_or_id) - -def decodeEnumName(enum, name_or_id): - try: - name_or_id = int(name_or_id) - if name_or_id < 0 or name_or_id >= len(enum): - raise IndexError("unknown index %d" % name_or_id) - - return name_or_id - - except ValueError as e: - try: - return enum[name_or_id] - - except KeyError as e: - raise IndexError("unknown name %s" % name_or_id) - -def encodeName(items, id): - try: - return items[id] - except IndexError as e: - raise IndexError("unknown index %d" % id) - -def encodeEnumName(enum, id): - try: - return enum(id).name - except ValueError as e: - raise IndexError("unknown index %d" % id) - -class ControlServerCommands(object): - def __init__(self, pipeline): - self.log = logging.getLogger('ControlServerCommands') - - self.pipeline = pipeline - self.sources = Config.getlist('mix', 'sources') - self.blankerSources = Config.getlist('stream-blanker', 'sources') - - # Commands are defined below. Errors are sent to the clients by throwing - # exceptions, they will be turned into messages outside. - - def message(self, *args): - """sends a message through the control-server, which can be received by - user-defined scripts. does not change the state of the voctocore.""" - return NotifyResponse('message', *args) - - def help(self): - helplines = [] - - helplines.append("Commands:") - for name, func in ControlServerCommands.__dict__.items(): - if name[0] == '_': - continue - - if not func.__code__: - continue - - params = inspect.signature(func).parameters - params = [str(info) for name, info in params.items()] - params = ', '.join(params[1:]) - - command_sig = '\t' + name - - if params: - command_sig += ': '+params - - if func.__doc__: - command_sig += '\n'+'\n'.join( - ['\t\t'+line.strip() for line in func.__doc__.splitlines()])+'\n' - - helplines.append(command_sig) +def decodeName(items, name_or_id): + try: + name_or_id = int(name_or_id) + if name_or_id < 0 or name_or_id >= len(items): + raise IndexError("unknown index %d" % name_or_id) - helplines.append('\t'+'quit / exit') + return name_or_id - helplines.append("\n") - helplines.append("Source-Names:") - for source in self.sources: - helplines.append("\t"+source) + except ValueError as e: + try: + return items.index(name_or_id) - helplines.append("\n") - helplines.append("Stream-Blanker Sources-Names:") - for source in self.blankerSources: - helplines.append("\t"+source) + except ValueError as e: + raise IndexError("unknown name %s" % name_or_id) - helplines.append("\n") - helplines.append("Composition-Modes:") - for mode in CompositeModes: - helplines.append("\t"+mode.name) - return OkResponse("\n".join(helplines)) +def decodeEnumName(enum, name_or_id): + try: + name_or_id = int(name_or_id) + if name_or_id < 0 or name_or_id >= len(enum): + raise IndexError("unknown index %d" % name_or_id) - def _get_video_status(self): - a = encodeName( self.sources, self.pipeline.vmix.getVideoSourceA() ) - b = encodeName( self.sources, self.pipeline.vmix.getVideoSourceB() ) - return [a, b] + return name_or_id - def get_video(self): - """gets the current video-status, consisting of the name of - video-source A and video-source B""" - status = self._get_video_status() - return OkResponse('video_status', *status) + except ValueError as e: + try: + return enum[name_or_id] - def set_video_a(self, src_name_or_id): - """sets the video-source A to the supplied source-name or source-id, - swapping A and B if the supplied source is currently used as - video-source B""" - src_id = decodeName(self.sources, src_name_or_id) - self.pipeline.vmix.setVideoSourceA(src_id) + except KeyError as e: + raise IndexError("unknown name %s" % name_or_id) - status = self._get_video_status() - return NotifyResponse('video_status', *status) - def set_video_b(self, src_name_or_id): - """sets the video-source B to the supplied source-name or source-id, - swapping A and B if the supplied source is currently used as - video-source A""" - src_id = decodeName(self.sources, src_name_or_id) - self.pipeline.vmix.setVideoSourceB(src_id) +def encodeName(items, id): + try: + return items[id] + except IndexError as e: + raise IndexError("unknown index %d" % id) - status = self._get_video_status() - return NotifyResponse('video_status', *status) +def encodeEnumName(enum, id): + try: + return enum(id).name + except ValueError as e: + raise IndexError("unknown index %d" % id) - def _get_audio_status(self): - src_id = self.pipeline.amix.getAudioSource() - return encodeName(self.sources, src_id) - def get_audio(self): - """gets the name of the current audio-source""" - status = self._get_audio_status() - return OkResponse('audio_status', status) +class ControlServerCommands(object): - def set_audio(self, src_name_or_id): - """sets the audio-source to the supplied source-name or source-id""" - src_id = decodeName(self.sources, src_name_or_id) - self.pipeline.amix.setAudioSource(src_id) + def __init__(self, pipeline): + self.log = logging.getLogger('ControlServerCommands') - status = self._get_audio_status() - return NotifyResponse('audio_status', status) + self.pipeline = pipeline + self.sources = Config.getlist('mix', 'sources') + self.blankerSources = Config.getlist('stream-blanker', 'sources') - def _get_composite_status(self): - mode = self.pipeline.vmix.getCompositeMode() - return encodeEnumName(CompositeModes, mode) + # Commands are defined below. Errors are sent to the clients by throwing + # exceptions, they will be turned into messages outside. - def get_composite_mode(self): - """gets the name of the current composite-mode""" - status = self._get_composite_status() - return OkResponse('composite_mode', status) + def message(self, *args): + """sends a message through the control-server, which can be received by + user-defined scripts. does not change the state of the voctocore.""" + return NotifyResponse('message', *args) - def set_composite_mode(self, mode_name_or_id): - """sets the name of the id of the composite-mode""" - mode = decodeEnumName(CompositeModes, mode_name_or_id) - self.pipeline.vmix.setCompositeMode(mode) + def help(self): + helplines = [] - composite_status = self._get_composite_status() - video_status = self._get_video_status() - return [ - NotifyResponse('composite_mode', composite_status), - NotifyResponse('video_status', *video_status) - ] - + helplines.append("Commands:") + for name, func in ControlServerCommands.__dict__.items(): + if name[0] == '_': + continue - def set_videos_and_composite(self, src_a_name_or_id, src_b_name_or_id, mode_name_or_id): - """sets the A- and the B-source synchronously with the composition-mode - all parametets can be set to "*" which will leave them unchanged.""" - if src_a_name_or_id != '*': - src_a_id = decodeName(self.sources, src_a_name_or_id) - self.pipeline.vmix.setVideoSourceA(src_a_id) + if not func.__code__: + continue - if src_b_name_or_id != '*': - src_b_id = decodeName(self.sources, src_b_name_or_id) - self.pipeline.vmix.setVideoSourceB(src_b_id) - - if mode_name_or_id != '*': - mode = decodeEnumName(CompositeModes, mode_name_or_id) - self.pipeline.vmix.setCompositeMode(mode) - - composite_status = self._get_composite_status() - video_status = self._get_video_status() - - return [ - NotifyResponse('composite_mode', composite_status), - NotifyResponse('video_status', *video_status) - ] - - - def _get_stream_status(self): - blankSource = self.pipeline.streamblanker.blankSource - if blankSource is None: - return ('live',) - - return 'blank', encodeName(self.blankerSources, blankSource) - - def get_stream_status(self): - """gets the current streamblanker-status""" - status = self._get_stream_status() - return OkResponse('stream_status', *status) - - def set_stream_blank(self, source_name_or_id): - """sets the streamblanker-status to blank with the specified - blanker-source-name or -id""" - src_id = decodeName(self.blankerSources, source_name_or_id) - self.pipeline.streamblanker.setBlankSource(src_id) - - status = self._get_stream_status() - return NotifyResponse('stream_status', *status) + params = inspect.signature(func).parameters + params = [str(info) for name, info in params.items()] + params = ', '.join(params[1:]) - def set_stream_live(self): - """sets the streamblanker-status to live""" - self.pipeline.streamblanker.setBlankSource(None) + command_sig = '\t' + name - status = self._get_stream_status() - return NotifyResponse('stream_status', *status) + if params: + command_sig += ': ' + params + + if func.__doc__: + command_sig += '\n\t\t{}\n'.format('\n\t\t'.join( + [line.strip() for line in func.__doc__.splitlines()] + )) + + helplines.append(command_sig) + + helplines.append('\t' + 'quit / exit') + + helplines.append("\n") + helplines.append("Source-Names:") + for source in self.sources: + helplines.append("\t" + source) + helplines.append("\n") + helplines.append("Stream-Blanker Sources-Names:") + for source in self.blankerSources: + helplines.append("\t" + source) + + helplines.append("\n") + helplines.append("Composition-Modes:") + for mode in CompositeModes: + helplines.append("\t" + mode.name) + + return OkResponse("\n".join(helplines)) - def get_config(self): - """returns the parsed server-config""" - confdict = {header: dict(section) for header, section in dict(Config).items()} - return OkResponse('server_config', json.dumps(confdict)) + def _get_video_status(self): + a = encodeName(self.sources, self.pipeline.vmix.getVideoSourceA()) + b = encodeName(self.sources, self.pipeline.vmix.getVideoSourceB()) + return [a, b] + + def get_video(self): + """gets the current video-status, consisting of the name of + video-source A and video-source B""" + status = self._get_video_status() + return OkResponse('video_status', *status) + + def set_video_a(self, src_name_or_id): + """sets the video-source A to the supplied source-name or source-id, + swapping A and B if the supplied source is currently used as + video-source B""" + src_id = decodeName(self.sources, src_name_or_id) + self.pipeline.vmix.setVideoSourceA(src_id) + + status = self._get_video_status() + return NotifyResponse('video_status', *status) + + def set_video_b(self, src_name_or_id): + """sets the video-source B to the supplied source-name or source-id, + swapping A and B if the supplied source is currently used as + video-source A""" + src_id = decodeName(self.sources, src_name_or_id) + self.pipeline.vmix.setVideoSourceB(src_id) + + status = self._get_video_status() + return NotifyResponse('video_status', *status) + + def _get_audio_status(self): + src_id = self.pipeline.amix.getAudioSource() + return encodeName(self.sources, src_id) + + def get_audio(self): + """gets the name of the current audio-source""" + status = self._get_audio_status() + return OkResponse('audio_status', status) + + def set_audio(self, src_name_or_id): + """sets the audio-source to the supplied source-name or source-id""" + src_id = decodeName(self.sources, src_name_or_id) + self.pipeline.amix.setAudioSource(src_id) + + status = self._get_audio_status() + return NotifyResponse('audio_status', status) + + def _get_composite_status(self): + mode = self.pipeline.vmix.getCompositeMode() + return encodeEnumName(CompositeModes, mode) + + def get_composite_mode(self): + """gets the name of the current composite-mode""" + status = self._get_composite_status() + return OkResponse('composite_mode', status) + + def set_composite_mode(self, mode_name_or_id): + """sets the name of the id of the composite-mode""" + mode = decodeEnumName(CompositeModes, mode_name_or_id) + self.pipeline.vmix.setCompositeMode(mode) + + composite_status = self._get_composite_status() + video_status = self._get_video_status() + return [ + NotifyResponse('composite_mode', composite_status), + NotifyResponse('video_status', *video_status) + ] + + def set_videos_and_composite(self, src_a_name_or_id, src_b_name_or_id, + mode_name_or_id): + """sets the A- and the B-source synchronously with the composition-mode + all parametets can be set to "*" which will leave them unchanged.""" + if src_a_name_or_id != '*': + src_a_id = decodeName(self.sources, src_a_name_or_id) + self.pipeline.vmix.setVideoSourceA(src_a_id) + + if src_b_name_or_id != '*': + src_b_id = decodeName(self.sources, src_b_name_or_id) + self.pipeline.vmix.setVideoSourceB(src_b_id) + + if mode_name_or_id != '*': + mode = decodeEnumName(CompositeModes, mode_name_or_id) + self.pipeline.vmix.setCompositeMode(mode) + + composite_status = self._get_composite_status() + video_status = self._get_video_status() + + return [ + NotifyResponse('composite_mode', composite_status), + NotifyResponse('video_status', *video_status) + ] + + def _get_stream_status(self): + blankSource = self.pipeline.streamblanker.blankSource + if blankSource is None: + return ('live',) + + return 'blank', encodeName(self.blankerSources, blankSource) + + def get_stream_status(self): + """gets the current streamblanker-status""" + status = self._get_stream_status() + return OkResponse('stream_status', *status) + + def set_stream_blank(self, source_name_or_id): + """sets the streamblanker-status to blank with the specified + blanker-source-name or -id""" + src_id = decodeName(self.blankerSources, source_name_or_id) + self.pipeline.streamblanker.setBlankSource(src_id) + + status = self._get_stream_status() + return NotifyResponse('stream_status', *status) + + def set_stream_live(self): + """sets the streamblanker-status to live""" + self.pipeline.streamblanker.setBlankSource(None) + + status = self._get_stream_status() + return NotifyResponse('stream_status', *status) + + def get_config(self): + """returns the parsed server-config""" + confdict = {header: dict(section) + for header, section in dict(Config).items()} + return OkResponse('server_config', json.dumps(confdict)) diff --git a/voctocore/lib/config.py b/voctocore/lib/config.py index df0ff9a..388778e 100644 --- a/voctocore/lib/config.py +++ b/voctocore/lib/config.py @@ -5,29 +5,32 @@ from lib.args import Args __all__ = ['Config'] + def getlist(self, section, option): - return [x.strip() for x in self.get(section, option).split(',')] + return [x.strip() for x in self.get(section, option).split(',')] SafeConfigParser.getlist = getlist files = [ - os.path.join(os.path.dirname(os.path.realpath(__file__)), '../default-config.ini'), - os.path.join(os.path.dirname(os.path.realpath(__file__)), '../config.ini'), - '/etc/voctomix/voctocore.ini', - '/etc/voctomix.ini', # deprecated - '/etc/voctocore.ini', - os.path.expanduser('~/.voctomix.ini'), # deprecated - os.path.expanduser('~/.voctocore.ini'), + os.path.join(os.path.dirname(os.path.realpath(__file__)), + '../default-config.ini'), + os.path.join(os.path.dirname(os.path.realpath(__file__)), + '../config.ini'), + '/etc/voctomix/voctocore.ini', + '/etc/voctomix.ini', # deprecated + '/etc/voctocore.ini', + os.path.expanduser('~/.voctomix.ini'), # deprecated + os.path.expanduser('~/.voctocore.ini'), ] if Args.ini_file is not None: - files.append(Args.ini_file) + files.append(Args.ini_file) Config = SafeConfigParser() readfiles = Config.read(files) log = logging.getLogger('ConfigParser') log.debug('considered config-files: \n%s', - "\n".join(["\t\t"+os.path.normpath(file) for file in files]) ) + "\n".join(["\t\t" + os.path.normpath(file) for file in files])) log.debug('successfully parsed config-files: \n%s', - "\n".join(["\t\t"+os.path.normpath(file) for file in readfiles]) ) + "\n".join(["\t\t" + os.path.normpath(file) for file in readfiles])) diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py index 4c506be..42ae4b2 100644 --- a/voctocore/lib/controlserver.py +++ b/voctocore/lib/controlserver.py @@ -1,4 +1,6 @@ -import socket, logging, traceback +import socket +import logging +import traceback from queue import Queue from gi.repository import GObject @@ -6,164 +8,171 @@ from lib.commands import ControlServerCommands from lib.tcpmulticonnection import TCPMultiConnection from lib.response import NotifyResponse, OkResponse -class ControlServer(TCPMultiConnection): - def __init__(self, pipeline): - '''Initialize server and start listening.''' - self.log = logging.getLogger('ControlServer') - super().__init__(port=9999) - - self.command_queue = Queue() - self.commands = ControlServerCommands(pipeline) - - def on_accepted(self, conn, addr): - '''Asynchronous connection listener. Starts a handler for each connection.''' - self.log.debug('setting gobject io-watch on connection') - GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, ['']) - - def on_data(self, conn, _, leftovers, *args): - '''Asynchronous connection handler. Pushes data from socket - into command queue linewise''' - close_after = False - try: - while True: - try: - leftovers.append(conn.recv(4096).decode(errors='replace')) - if len(leftovers[-1]) == 0: - self.log.info("Socket was closed") - leftovers.pop() - close_after = True - break - - except UnicodeDecodeError as e: - continue - except: - pass +class ControlServer(TCPMultiConnection): - data = "".join(leftovers) - del leftovers[:] + def __init__(self, pipeline): + '''Initialize server and start listening.''' + self.log = logging.getLogger('ControlServer') + super().__init__(port=9999) - lines = data.split('\n') - for line in lines[:-1]: - self.log.debug("got line: %r", line) + self.command_queue = Queue() + + self.commands = ControlServerCommands(pipeline) - line = line.strip() - # 'quit' = remote wants us to close the connection - if line == 'quit' or line == 'exit': - self.log.info("Client asked us to close the Connection") - self.close_connection(conn) - return False + def on_accepted(self, conn, addr): + '''Asynchronous connection listener. + Starts a handler for each connection.''' + self.log.debug('setting gobject io-watch on connection') + GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, ['']) + + def on_data(self, conn, _, leftovers, *args): + '''Asynchronous connection handler. + Pushes data from socket into command queue linewise''' + close_after = False + try: + while True: + try: + leftovers.append(conn.recv(4096).decode(errors='replace')) + if len(leftovers[-1]) == 0: + self.log.info("Socket was closed") + leftovers.pop() + close_after = True + break - self.log.debug('re-starting on_loop scheduling') - GObject.idle_add(self.on_loop) + except UnicodeDecodeError as e: + continue + except: + pass - self.command_queue.put((line, conn)) + data = "".join(leftovers) + del leftovers[:] - if close_after: - self.close_connection(conn) - return False - - if lines[-1] != '': - self.log.debug("remaining %r", lines[-1]) - - leftovers.append(lines[-1]) - return True - - def on_loop(self): - '''Command handler. Processes commands in the command queue whenever - nothing else is happening (registered as GObject idle callback)''' - - self.log.debug('on_loop called') - - if self.command_queue.empty(): - self.log.debug('command_queue is empty again, stopping on_loop scheduling') - return False - - line, requestor = self.command_queue.get() - - words = line.split() - if len(words) < 1: - self.log.debug('command_queue is empty again, stopping on_loop scheduling') - return True - - command = words[0] - args = words[1:] - - self.log.info("processing command %r with args %s", command, args) - - response = None - try: - # deny calling private methods - if command[0] == '_': - self.log.info('private methods are not callable') - raise KeyError() - - command_function = self.commands.__class__.__dict__[command] - - except KeyError as e: - self.log.info("received unknown command %s", command) - response = "error unknown command %s\n" % command - - else: - try: - responseObject = command_function(self.commands, *args) - - except Exception as e: - message = str(e) or "<no message>" - response = "error %s\n" % message - - else: - if isinstance(responseObject, NotifyResponse): - responseObject = [ responseObject ] - - if isinstance(responseObject, list): - for obj in responseObject: - signal = "%s\n" % str(obj) - for conn in self.currentConnections: - self._schedule_write(conn, signal) - else: - response = "%s\n" % str(responseObject) - - finally: - if response is not None and requestor in self.currentConnections: - self._schedule_write(requestor, response) - - return False + lines = data.split('\n') + for line in lines[:-1]: + self.log.debug("got line: %r", line) - def _schedule_write(self, conn, message): - queue = self.currentConnections[conn] + line = line.strip() + # 'quit' = remote wants us to close the connection + if line == 'quit' or line == 'exit': + self.log.info("Client asked us to close the Connection") + self.close_connection(conn) + return False - self.log.debug('re-starting on_write[%u] scheduling', conn.fileno()) - GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write) + self.log.debug('re-starting on_loop scheduling') + GObject.idle_add(self.on_loop) + + self.command_queue.put((line, conn)) + + if close_after: + self.close_connection(conn) + return False - queue.put(message) + if lines[-1] != '': + self.log.debug("remaining %r", lines[-1]) + + leftovers.append(lines[-1]) + return True + + def on_loop(self): + '''Command handler. Processes commands in the command queue whenever + nothing else is happening (registered as GObject idle callback)''' + + self.log.debug('on_loop called') + + if self.command_queue.empty(): + self.log.debug('command_queue is empty again, ' + 'stopping on_loop scheduling') + return False + + line, requestor = self.command_queue.get() + + words = line.split() + if len(words) < 1: + self.log.debug('command_queue is empty again, ' + 'stopping on_loop scheduling') + return True + + command = words[0] + args = words[1:] + + self.log.info("processing command %r with args %s", command, args) + + response = None + try: + # deny calling private methods + if command[0] == '_': + self.log.info('private methods are not callable') + raise KeyError() + + command_function = self.commands.__class__.__dict__[command] + + except KeyError as e: + self.log.info("received unknown command %s", command) + response = "error unknown command %s\n" % command + + else: + try: + responseObject = command_function(self.commands, *args) + + except Exception as e: + message = str(e) or "<no message>" + response = "error %s\n" % message + + else: + if isinstance(responseObject, NotifyResponse): + responseObject = [responseObject] + + if isinstance(responseObject, list): + for obj in responseObject: + signal = "%s\n" % str(obj) + for conn in self.currentConnections: + self._schedule_write(conn, signal) + else: + response = "%s\n" % str(responseObject) + + finally: + if response is not None and requestor in self.currentConnections: + self._schedule_write(requestor, response) + + return False + + def _schedule_write(self, conn, message): + queue = self.currentConnections[conn] - def on_write(self, conn, *args): - self.log.debug('on_write[%u] called', conn.fileno()) + self.log.debug('re-starting on_write[%u] scheduling', conn.fileno()) + GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write) - try: - queue = self.currentConnections[conn] - except KeyError: - return False + queue.put(message) - if queue.empty(): - self.log.debug('write_queue[%u] is empty again, stopping on_write scheduling', conn.fileno()) - return False + def on_write(self, conn, *args): + self.log.debug('on_write[%u] called', conn.fileno()) - message = queue.get() - try: - conn.send(message.encode()) - except Exception as e: - self.log.warn(e) + try: + queue = self.currentConnections[conn] + except KeyError: + return False + + if queue.empty(): + self.log.debug('write_queue[%u] is empty again, ' + 'stopping on_write scheduling', + conn.fileno()) + return False - return True + message = queue.get() + try: + conn.send(message.encode()) + except Exception as e: + self.log.warn(e) + + return True - def notify_all(self, msg): - try: - words = msg.split() - words[-1] = self.commands.encodeSourceName(int(words[-1])) - msg = " ".join(words) + '\n' - for conn in self.currentConnections: - self._schedule_write(conn, msg) - except Exception as e: - self.log.debug("error during notify: %s", e) + def notify_all(self, msg): + try: + words = msg.split() + words[-1] = self.commands.encodeSourceName(int(words[-1])) + msg = " ".join(words) + '\n' + for conn in self.currentConnections: + self._schedule_write(conn, msg) + except Exception as e: + self.log.debug("error during notify: %s", e) diff --git a/voctocore/lib/loghandler.py b/voctocore/lib/loghandler.py index 2cc7ceb..6efb890 100644 --- a/voctocore/lib/loghandler.py +++ b/voctocore/lib/loghandler.py @@ -1,41 +1,57 @@ -import logging, time +import logging +import time -class LogFormatter(logging.Formatter): - def __init__(self, docolor, timestamps=False): - super().__init__() - self.docolor = docolor - self.timestamps = timestamps - - def formatMessage(self, record): - if self.docolor: - c_lvl = 33 - c_mod = 32 - c_msg = 0 - - if record.levelno == logging.WARNING: - c_lvl = 31 - #c_mod = 33 - c_msg = 33 - - elif record.levelno > logging.WARNING: - c_lvl = 31 - c_mod = 31 - c_msg = 31 - fmt = '\x1b['+str(c_lvl)+'m%(levelname)8s\x1b[0m \x1b['+str(c_mod)+'m%(name)s\x1b['+str(c_msg)+'m: %(message)s\x1b[0m' - else: - fmt = '%(levelname)8s %(name)s: %(message)s' - - if self.timestamps: - fmt = '%(asctime)s '+fmt - - if not 'asctime' in record.__dict__: - record.__dict__['asctime']=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(record.__dict__['created'])) +class LogFormatter(logging.Formatter): - return fmt % record.__dict__ + def __init__(self, docolor, timestamps=False): + super().__init__() + self.docolor = docolor + self.timestamps = timestamps + + def formatMessage(self, record): + if self.docolor: + c_lvl = 33 + c_mod = 32 + c_msg = 0 + + if record.levelno == logging.WARNING: + c_lvl = 31 + # c_mod = 33 + c_msg = 33 + + elif record.levelno > logging.WARNING: + c_lvl = 31 + c_mod = 31 + c_msg = 31 + + fmt = ''.join([ + '\x1b[%dm' % c_lvl, # set levelname color + '%(levelname)8s', # print levelname + '\x1b[0m', # reset formatting + '\x1b[%dm' % c_mod, # set name color + ' %(name)s', # print name + '\x1b[%dm' % c_msg, # set message color + ': %(message)s', # print message + '\x1b[0m' # reset formatting + ]) + else: + fmt = '%(levelname)8s %(name)s: %(message)s' + + if self.timestamps: + fmt = '%(asctime)s ' + fmt + + if 'asctime' not in record.__dict__: + record.__dict__['asctime'] = time.strftime( + "%Y-%m-%d %H:%M:%S", + time.localtime(record.__dict__['created']) + ) + + return fmt % record.__dict__ class LogHandler(logging.StreamHandler): - def __init__(self, docolor, timestamps): - super().__init__() - self.setFormatter(LogFormatter(docolor,timestamps)) + + def __init__(self, docolor, timestamps): + super().__init__() + self.setFormatter(LogFormatter(docolor, timestamps)) diff --git a/voctocore/lib/pipeline.py b/voctocore/lib/pipeline.py index a13f1d2..1a1349e 100644 --- a/voctocore/lib/pipeline.py +++ b/voctocore/lib/pipeline.py @@ -10,93 +10,99 @@ from lib.videomix import VideoMix from lib.audiomix import AudioMix from lib.streamblanker import StreamBlanker -class Pipeline(object): - """mixing, streaming and encoding pipeline constuction and control""" - - def __init__(self): - self.log = logging.getLogger('Pipeline') - self.log.info('Video-Caps configured to: %s', Config.get('mix', 'videocaps')) - self.log.info('Audio-Caps configured to: %s', Config.get('mix', 'audiocaps')) - - names = Config.getlist('mix', 'sources') - if len(names) < 1: - raise RuntimeError("At least one AVSource must be configured!") - - self.sources = [] - self.mirrors = [] - self.previews = [] - self.sbsources = [] - - self.log.info('Creating %u Creating AVSources: %s', len(names), names) - for idx, name in enumerate(names): - port = 10000 + idx - self.log.info('Creating AVSource %s at tcp-port %u', name, port) - - outputs = [name+'_mixer', name+'_mirror'] - if Config.getboolean('previews', 'enabled'): - outputs.append(name+'_preview') - - source = AVSource(name, port, outputs=outputs) - self.sources.append(source) - - - port = 13000 + idx - self.log.info('Creating Mirror-Output for AVSource %s at tcp-port %u', name, port) - - mirror = AVRawOutput('%s_mirror' % name, port) - self.mirrors.append(mirror) - - - if Config.getboolean('previews', 'enabled'): - port = 14000 + idx - self.log.info('Creating Preview-Output for AVSource %s at tcp-port %u', name, port) - - preview = AVPreviewOutput('%s_preview' % name, port) - self.previews.append(preview) - - self.log.info('Creating Videmixer') - self.vmix = VideoMix() - - self.log.info('Creating Audiomixer') - self.amix = AudioMix() - - port = 16000 - self.log.info('Creating Mixer-Background VSource at tcp-port %u', port) - self.bgsrc = AVSource('background', port, has_audio=False) - - port = 11000 - self.log.info('Creating Mixer-Output at tcp-port %u', port) - self.mixout = AVRawOutput('mix_out', port) - - - if Config.getboolean('previews', 'enabled'): - port = 12000 - self.log.info('Creating Preview-Output for AVSource %s at tcp-port %u', name, port) - - self.mixpreview = AVPreviewOutput('mix_preview', port) - - if Config.getboolean('stream-blanker', 'enabled'): - names = Config.getlist('stream-blanker', 'sources') - if len(names) < 1: - raise RuntimeError("At least one StreamBlanker-Source must be configured or the StreamBlanker disabled!") - for idx, name in enumerate(names): - port = 17000 + idx - self.log.info('Creating StreamBlanker VSource %s at tcp-port %u', name, port) - - source = AVSource('%s_streamblanker' % name, port, has_audio=False) - self.sbsources.append(source) - - port = 18000 - self.log.info('Creating StreamBlanker ASource at tcp-port %u', port) - - source = AVSource('streamblanker', port, has_video=False) - self.sbsources.append(source) - - - self.log.info('Creating StreamBlanker') - self.streamblanker = StreamBlanker() - - port = 15000 - self.log.info('Creating StreamBlanker-Output at tcp-port %u', port) - self.streamout = AVRawOutput('streamblanker_out', port) +class Pipeline(object): + """mixing, streaming and encoding pipeline constuction and control""" + + def __init__(self): + self.log = logging.getLogger('Pipeline') + self.log.info('Video-Caps configured to: %s', + Config.get('mix', 'videocaps')) + self.log.info('Audio-Caps configured to: %s', + Config.get('mix', 'audiocaps')) + + names = Config.getlist('mix', 'sources') + if len(names) < 1: + raise RuntimeError("At least one AVSource must be configured!") + + self.sources = [] + self.mirrors = [] + self.previews = [] + self.sbsources = [] + + self.log.info('Creating %u Creating AVSources: %s', len(names), names) + for idx, name in enumerate(names): + port = 10000 + idx + self.log.info('Creating AVSource %s at tcp-port %u', name, port) + + outputs = [name + '_mixer', name + '_mirror'] + if Config.getboolean('previews', 'enabled'): + outputs.append(name + '_preview') + + source = AVSource(name, port, outputs=outputs) + self.sources.append(source) + + port = 13000 + idx + self.log.info('Creating Mirror-Output for AVSource %s ' + 'at tcp-port %u', name, port) + + mirror = AVRawOutput('%s_mirror' % name, port) + self.mirrors.append(mirror) + + if Config.getboolean('previews', 'enabled'): + port = 14000 + idx + self.log.info('Creating Preview-Output for AVSource %s ' + 'at tcp-port %u', name, port) + + preview = AVPreviewOutput('%s_preview' % name, port) + self.previews.append(preview) + + self.log.info('Creating Videmixer') + self.vmix = VideoMix() + + self.log.info('Creating Audiomixer') + self.amix = AudioMix() + + port = 16000 + self.log.info('Creating Mixer-Background VSource at tcp-port %u', port) + self.bgsrc = AVSource('background', port, has_audio=False) + + port = 11000 + self.log.info('Creating Mixer-Output at tcp-port %u', port) + self.mixout = AVRawOutput('mix_out', port) + + if Config.getboolean('previews', 'enabled'): + port = 12000 + self.log.info('Creating Preview-Output for AVSource %s ' + 'at tcp-port %u', name, port) + + self.mixpreview = AVPreviewOutput('mix_preview', port) + + if Config.getboolean('stream-blanker', 'enabled'): + names = Config.getlist('stream-blanker', 'sources') + if len(names) < 1: + raise RuntimeError('At least one StreamBlanker-Source must ' + 'be configured or the ' + 'StreamBlanker disabled!') + for idx, name in enumerate(names): + port = 17000 + idx + self.log.info('Creating StreamBlanker VSource %s ' + 'at tcp-port %u', name, port) + + source = AVSource('{}_streamblanker'.format(name), port, + has_audio=False) + self.sbsources.append(source) + + port = 18000 + self.log.info('Creating StreamBlanker ASource at tcp-port %u', + port) + + source = AVSource('streamblanker', port, has_video=False) + self.sbsources.append(source) + + self.log.info('Creating StreamBlanker') + self.streamblanker = StreamBlanker() + + port = 15000 + self.log.info('Creating StreamBlanker-Output at tcp-port %u', port) + self.streamout = AVRawOutput('streamblanker_out', port) diff --git a/voctocore/lib/response.py b/voctocore/lib/response.py index 9f4ad84..b5f7578 100644 --- a/voctocore/lib/response.py +++ b/voctocore/lib/response.py @@ -1,14 +1,16 @@ class Response(object): - def __init__(self, *args): - self.args = args - def __str__(self): - return " ".join(map(str, self.args)) + def __init__(self, *args): + self.args = args + + def __str__(self): + return " ".join(map(str, self.args)) class OkResponse(Response): - pass + pass + class NotifyResponse(Response): - pass + pass diff --git a/voctocore/lib/streamblanker.py b/voctocore/lib/streamblanker.py index fea3d6a..67af0cd 100644 --- a/voctocore/lib/streamblanker.py +++ b/voctocore/lib/streamblanker.py @@ -5,97 +5,104 @@ from enum import Enum from lib.config import Config from lib.clock import Clock -class StreamBlanker(object): - log = logging.getLogger('StreamBlanker') - - def __init__(self): - self.acaps = Config.get('mix', 'audiocaps') - self.vcaps = Config.get('mix', 'videocaps') - - self.names = Config.getlist('stream-blanker', 'sources') - self.log.info('Configuring StreamBlanker video %u Sources', len(self.names)) - - pipeline = """ - compositor name=vmix ! - {vcaps} ! - intervideosink channel=video_streamblanker_out - - audiomixer name=amix ! - {acaps} ! - interaudiosink channel=audio_streamblanker_out - - - intervideosrc channel=video_mix_streamblanker ! - {vcaps} ! - vmix. - - interaudiosrc channel=audio_mix_streamblanker ! - {acaps} ! - amix. - - - interaudiosrc channel=audio_streamblanker ! - {acaps} ! - amix. - """.format( - acaps=self.acaps, - vcaps=self.vcaps - ) - - for name in self.names: - pipeline += """ - intervideosrc channel=video_{name}_streamblanker ! - {vcaps} ! - vmix. - """.format( - name=name, - vcaps=self.vcaps - ) - - self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline) - self.mixingPipeline = Gst.parse_launch(pipeline) - self.mixingPipeline.use_clock(Clock) - self.log.debug('Binding Error & End-of-Stream-Signal on Mixing-Pipeline') - self.mixingPipeline.bus.add_signal_watch() - self.mixingPipeline.bus.connect("message::eos", self.on_eos) - self.mixingPipeline.bus.connect("message::error", self.on_error) - - self.log.debug('Initializing Mixer-State') - self.blankSource = None - self.applyMixerState() - - self.log.debug('Launching Mixing-Pipeline') - self.mixingPipeline.set_state(Gst.State.PLAYING) - - def on_eos(self, bus, message): - self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline') - - def on_error(self, bus, message): - self.log.debug('Received Error-Signal on Mixing-Pipeline') - (error, debug) = message.parse_error() - self.log.debug('Error-Details: #%u: %s', error.code, debug) - - - def applyMixerState(self): - self.applyMixerStateAudio() - self.applyMixerStateVideo() - - def applyMixerStateAudio(self): - mixpad = self.mixingPipeline.get_by_name('amix').get_static_pad('sink_0') - blankpad = self.mixingPipeline.get_by_name('amix').get_static_pad('sink_1') - - mixpad.set_property('volume', int(self.blankSource is None)) - blankpad.set_property('volume', int(self.blankSource is not None)) - - def applyMixerStateVideo(self): - mixpad = self.mixingPipeline.get_by_name('vmix').get_static_pad('sink_0') - mixpad.set_property('alpha', int(self.blankSource is None)) - - for idx, name in enumerate(self.names): - blankpad = self.mixingPipeline.get_by_name('vmix').get_static_pad('sink_%u' % (idx+1)) - blankpad.set_property('alpha', int(self.blankSource == idx)) - - def setBlankSource(self, source): - self.blankSource = source - self.applyMixerState() +class StreamBlanker(object): + log = logging.getLogger('StreamBlanker') + + def __init__(self): + self.acaps = Config.get('mix', 'audiocaps') + self.vcaps = Config.get('mix', 'videocaps') + + self.names = Config.getlist('stream-blanker', 'sources') + self.log.info('Configuring StreamBlanker video %u Sources', + len(self.names)) + + pipeline = """ + compositor name=vmix ! + {vcaps} ! + intervideosink channel=video_streamblanker_out + + audiomixer name=amix ! + {acaps} ! + interaudiosink channel=audio_streamblanker_out + + + intervideosrc channel=video_mix_streamblanker ! + {vcaps} ! + vmix. + + interaudiosrc channel=audio_mix_streamblanker ! + {acaps} ! + amix. + + + interaudiosrc channel=audio_streamblanker ! + {acaps} ! + amix. + """.format( + acaps=self.acaps, + vcaps=self.vcaps + ) + + for name in self.names: + pipeline += """ + intervideosrc channel=video_{name}_streamblanker ! + {vcaps} ! + vmix. + """.format( + name=name, + vcaps=self.vcaps + ) + + self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline) + self.mixingPipeline = Gst.parse_launch(pipeline) + self.mixingPipeline.use_clock(Clock) + + self.log.debug('Binding Error & End-of-Stream-Signal ' + 'on Mixing-Pipeline') + self.mixingPipeline.bus.add_signal_watch() + self.mixingPipeline.bus.connect("message::eos", self.on_eos) + self.mixingPipeline.bus.connect("message::error", self.on_error) + + self.log.debug('Initializing Mixer-State') + self.blankSource = None + self.applyMixerState() + + self.log.debug('Launching Mixing-Pipeline') + self.mixingPipeline.set_state(Gst.State.PLAYING) + + def on_eos(self, bus, message): + self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline') + + def on_error(self, bus, message): + self.log.debug('Received Error-Signal on Mixing-Pipeline') + (error, debug) = message.parse_error() + self.log.debug('Error-Details: #%u: %s', error.code, debug) + + def applyMixerState(self): + self.applyMixerStateAudio() + self.applyMixerStateVideo() + + def applyMixerStateAudio(self): + mixpad = (self.mixingPipeline.get_by_name('amix') + .get_static_pad('sink_0')) + blankpad = (self.mixingPipeline.get_by_name('amix') + .get_static_pad('sink_1')) + + mixpad.set_property('volume', int(self.blankSource is None)) + blankpad.set_property('volume', int(self.blankSource is not None)) + + def applyMixerStateVideo(self): + mixpad = (self.mixingPipeline.get_by_name('vmix') + .get_static_pad('sink_0')) + mixpad.set_property('alpha', int(self.blankSource is None)) + + for idx, name in enumerate(self.names): + blankpad = (self.mixingPipeline + .get_by_name('vmix') + .get_static_pad('sink_%u' % (idx + 1))) + blankpad.set_property('alpha', int(self.blankSource == idx)) + + def setBlankSource(self, source): + self.blankSource = source + self.applyMixerState() diff --git a/voctocore/lib/tcpmulticonnection.py b/voctocore/lib/tcpmulticonnection.py index 66fc33f..ac228a3 100644 --- a/voctocore/lib/tcpmulticonnection.py +++ b/voctocore/lib/tcpmulticonnection.py @@ -1,41 +1,48 @@ -import logging, socket +import logging +import socket from queue import Queue from gi.repository import GObject from lib.config import Config + class TCPMultiConnection(object): - def __init__(self, port): - if not hasattr(self, 'log'): - self.log = logging.getLogger('TCPMultiConnection') - self.boundSocket = None - self.currentConnections = dict() + def __init__(self, port): + if not hasattr(self, 'log'): + self.log = logging.getLogger('TCPMultiConnection') + + self.boundSocket = None + self.currentConnections = dict() - self.log.debug('Binding to Source-Socket on [::]:%u', port) - self.boundSocket = socket.socket(socket.AF_INET6) - self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) - self.boundSocket.bind(('::', port)) - self.boundSocket.listen(1) + self.log.debug('Binding to Source-Socket on [::]:%u', port) + self.boundSocket = socket.socket(socket.AF_INET6) + self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, + False) + self.boundSocket.bind(('::', port)) + self.boundSocket.listen(1) - self.log.debug('Setting GObject io-watch on Socket') - GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) + self.log.debug('Setting GObject io-watch on Socket') + GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) - def on_connect(self, sock, *args): - conn, addr = sock.accept() - conn.setblocking(False) + def on_connect(self, sock, *args): + conn, addr = sock.accept() + conn.setblocking(False) - self.log.info("Incomming Connection from [%s]:%u (fd=%u)", addr[0], addr[1], conn.fileno()) + self.log.info("Incomming Connection from [%s]:%u (fd=%u)", + addr[0], addr[1], conn.fileno()) - self.currentConnections[conn] = Queue() - self.log.info('Now %u Receiver connected', len(self.currentConnections)) + self.currentConnections[conn] = Queue() + self.log.info('Now %u Receiver connected', + len(self.currentConnections)) - self.on_accepted(conn, addr) + self.on_accepted(conn, addr) - return True + return True - def close_connection(self, conn): - if conn in self.currentConnections: - del(self.currentConnections[conn]) - self.log.info('Now %u Receiver connected', len(self.currentConnections)) + def close_connection(self, conn): + if conn in self.currentConnections: + del(self.currentConnections[conn]) + self.log.info('Now %u Receiver connected', + len(self.currentConnections)) diff --git a/voctocore/lib/tcpsingleconnection.py b/voctocore/lib/tcpsingleconnection.py index bf89651..2d3c658 100644 --- a/voctocore/lib/tcpsingleconnection.py +++ b/voctocore/lib/tcpsingleconnection.py @@ -1,39 +1,45 @@ -import logging, socket, time +import logging +import socket +import time from gi.repository import GObject from lib.config import Config + class TCPSingleConnection(object): - def __init__(self, port): - if not hasattr(self, 'log'): - self.log = logging.getLogger('TCPMultiConnection') - self.log.debug('Binding to Source-Socket on [::]:%u', port) - self.boundSocket = socket.socket(socket.AF_INET6) - self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) - self.boundSocket.bind(('::', port)) - self.boundSocket.listen(1) + def __init__(self, port): + if not hasattr(self, 'log'): + self.log = logging.getLogger('TCPMultiConnection') + + self.log.debug('Binding to Source-Socket on [::]:%u', port) + self.boundSocket = socket.socket(socket.AF_INET6) + self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, + False) + self.boundSocket.bind(('::', port)) + self.boundSocket.listen(1) - self.currentConnection = None + self.currentConnection = None - self.log.debug('Setting GObject io-watch on Socket') - GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) + self.log.debug('Setting GObject io-watch on Socket') + GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) - def on_connect(self, sock, *args): - conn, addr = sock.accept() - self.log.info("Incomming Connection from %s", addr) + def on_connect(self, sock, *args): + conn, addr = sock.accept() + self.log.info('Incomming Connection from %s', addr) - if self.currentConnection is not None: - self.log.warn("Another Source is already connected, closing existing pipeline") - self.disconnect() - time.sleep(1) + if self.currentConnection is not None: + self.log.warn('Another Source is already connected, ' + 'closing existing pipeline') + self.disconnect() + time.sleep(1) - self.on_accepted(conn, addr) - self.currentConnection = conn + self.on_accepted(conn, addr) + self.currentConnection = conn - return True + return True - def close_connection(self): - self.currentConnection = None - self.log.info('Connection closed') + def close_connection(self): + self.currentConnection = None + self.log.info('Connection closed') diff --git a/voctocore/lib/videomix.py b/voctocore/lib/videomix.py index 95d538d..4ab740f 100644 --- a/voctocore/lib/videomix.py +++ b/voctocore/lib/videomix.py @@ -5,377 +5,411 @@ from enum import Enum, unique from lib.config import Config from lib.clock import Clock + @unique class CompositeModes(Enum): - fullscreen = 0 - side_by_side_equal = 1 - side_by_side_preview = 2 - picture_in_picture = 3 - -class PadState(object): - def __init__(self): - self.reset() + fullscreen = 0 + side_by_side_equal = 1 + side_by_side_preview = 2 + picture_in_picture = 3 - def reset(self): - self.alpha = 1.0 - self.xpos = 0 - self.ypos = 0 - self.zorder = 1 - self.width = 0 - self.height = 0 -class VideoMix(object): - log = logging.getLogger('VideoMix') - - def __init__(self): - self.caps = Config.get('mix', 'videocaps') - - self.names = Config.getlist('mix', 'sources') - self.log.info('Configuring Mixer for %u Sources', len(self.names)) - - pipeline = """ - compositor name=mix ! - {caps} ! - identity name=sig ! - queue ! - tee name=tee +class PadState(object): - intervideosrc channel=video_background ! - {caps} ! - mix. - - tee. ! queue ! intervideosink channel=video_mix_out - """.format( - caps=self.caps - ) - - if Config.getboolean('previews', 'enabled'): - pipeline += """ - tee. ! queue ! intervideosink channel=video_mix_preview - """ - - if Config.getboolean('stream-blanker', 'enabled'): - pipeline += """ - tee. ! queue ! intervideosink channel=video_mix_streamblanker - """ - - for idx, name in enumerate(self.names): - pipeline += """ - intervideosrc channel=video_{name}_mixer ! - {caps} ! - mix. - """.format( - name=name, - caps=self.caps, - idx=idx - ) - - self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline) - self.mixingPipeline = Gst.parse_launch(pipeline) - self.mixingPipeline.use_clock(Clock) - - self.log.debug('Binding Error & End-of-Stream-Signal on Mixing-Pipeline') - self.mixingPipeline.bus.add_signal_watch() - self.mixingPipeline.bus.connect("message::eos", self.on_eos) - self.mixingPipeline.bus.connect("message::error", self.on_error) + def __init__(self): + self.reset() - self.log.debug('Binding Handoff-Handler for Synchronus mixer manipulation') - sig = self.mixingPipeline.get_by_name('sig') - sig.connect('handoff', self.on_handoff) - - self.padStateDirty = False - self.padState = list() - for idx, name in enumerate(self.names): - self.padState.append(PadState()) - - self.log.debug('Initializing Mixer-State') - self.compositeMode = CompositeModes.fullscreen - self.sourceA = 0 - self.sourceB = 1 - self.recalculateMixerState() - self.applyMixerState() + def reset(self): + self.alpha = 1.0 + self.xpos = 0 + self.ypos = 0 + self.zorder = 1 + self.width = 0 + self.height = 0 - bgMixerpad = self.mixingPipeline.get_by_name('mix').get_static_pad('sink_0') - bgMixerpad.set_property('zorder', 0) - self.log.debug('Launching Mixing-Pipeline') - self.mixingPipeline.set_state(Gst.State.PLAYING) - - def getInputVideoSize(self): - caps = Gst.Caps.from_string(self.caps) - struct = caps.get_structure(0) - _, width = struct.get_int('width') - _, height = struct.get_int('height') - - return width, height - - def recalculateMixerState(self): - if self.compositeMode == CompositeModes.fullscreen: - self.recalculateMixerStateFullscreen() - - elif self.compositeMode == CompositeModes.side_by_side_equal: - self.recalculateMixerStateSideBySideEqual() - - elif self.compositeMode == CompositeModes.side_by_side_preview: - self.recalculateMixerStateSideBySidePreview() - - elif self.compositeMode == CompositeModes.picture_in_picture: - self.recalculateMixerStatePictureInPicture() - - self.log.debug('Marking Pad-State as Dirty') - self.padStateDirty = True - - def recalculateMixerStateFullscreen(self): - self.log.info('Updating Mixer-State for Fullscreen-Composition') - - for idx, name in enumerate(self.names): - pad = self.padState[idx] - - pad.reset() - pad.alpha = float(idx == self.sourceA) - - def recalculateMixerStateSideBySideEqual(self): - self.log.info('Updating Mixer-State for Side-by-side-Equal-Composition') - - width, height = self.getInputVideoSize() - self.log.debug('Video-Size parsed as %ux%u', width, height) - - try: - gutter = Config.getint('side-by-side-equal', 'gutter') - self.log.debug('Gutter configured to %u', gutter) - except: - gutter = int(width / 100) - self.log.debug('Gutter calculated to %u', gutter) - - targetWidth = int((width - gutter) / 2) - targetHeight = int(targetWidth / width * height) - - self.log.debug('Video-Size calculated to %ux%u', targetWidth, targetHeight) - - xa = 0 - xb = width - targetWidth - y = (height - targetHeight) / 2 - - try: - ya = Config.getint('side-by-side-equal', 'atop') - self.log.debug('A-Video Y-Pos configured to %u', ya) - except: - ya = y - self.log.debug('A-Video Y-Pos calculated to %u', ya) - - - try: - yb = Config.getint('side-by-side-equal', 'btop') - self.log.debug('B-Video Y-Pos configured to %u', yb) - except: - yb = y - self.log.debug('B-Video Y-Pos calculated to %u', yb) - - - for idx, name in enumerate(self.names): - pad = self.padState[idx] - pad.reset() - - pad.width = targetWidth - pad.height = targetHeight - - if idx == self.sourceA: - pad.xpos = xa - pad.ypos = ya - pad.zorder = 1 - - elif idx == self.sourceB: - pad.xpos = xb - pad.ypos = yb - pad.zorder = 2 - - else: - pad.alpha = 0 - - def recalculateMixerStateSideBySidePreview(self): - self.log.info('Updating Mixer-State for Side-by-side-Preview-Composition') - - width, height = self.getInputVideoSize() - self.log.debug('Video-Size parsed as %ux%u', width, height) - - try: - asize = [int(i) for i in Config.get('side-by-side-preview', 'asize').split('x', 1)] - self.log.debug('A-Video-Size configured to %ux%u', asize[0], asize[1]) - except: - asize = [ - int(width / 1.25), # 80% - int(height / 1.25) # 80% - ] - self.log.debug('A-Video-Size calculated to %ux%u', asize[0], asize[1]) - - try: - apos = [int(i) for i in Config.get('side-by-side-preview', 'apos').split('/', 1)] - self.log.debug('B-Video-Position configured to %u/%u', apos[0], apos[1]) - except: - apos = [ - int(width / 100), # 1% - int(width / 100) # 1% - ] - self.log.debug('B-Video-Position calculated to %u/%u', apos[0], apos[1]) - - try: - bsize = [int(i) for i in Config.get('side-by-side-preview', 'bsize').split('x', 1)] - self.log.debug('B-Video-Size configured to %ux%u', bsize[0], bsize[1]) - except: - bsize = [ - int(width / 4), # 25% - int(height / 4) # 25% - ] - self.log.debug('B-Video-Size calculated to %ux%u', bsize[0], bsize[1]) - - try: - bpos = [int(i) for i in Config.get('side-by-side-preview', 'bpos').split('/', 1)] - self.log.debug('B-Video-Position configured to %u/%u', bpos[0], bpos[1]) - except: - bpos = [ - width - int(width / 100) - bsize[0], - height - int(width / 100) - bsize[1] # 1% - ] - self.log.debug('B-Video-Position calculated to %u/%u', bpos[0], bpos[1]) - - for idx, name in enumerate(self.names): - pad = self.padState[idx] - pad.reset() - - if idx == self.sourceA: - pad.xpos, pad.ypos = apos - pad.width, pad.height = asize - pad.zorder = 1 - - elif idx == self.sourceB: - pad.xpos, pad.ypos = bpos - pad.width, pad.height = bsize - pad.zorder = 2 - - else: - pad.alpha = 0 - - def recalculateMixerStatePictureInPicture(self): - self.log.info('Updating Mixer-State for Picture-in-Picture-Composition') - - width, height = self.getInputVideoSize() - self.log.debug('Video-Size parsed as %ux%u', width, height) - - try: - pipsize = [int(i) for i in Config.get('picture-in-picture', 'pipsize').split('x', 1)] - self.log.debug('PIP-Size configured to %ux%u', pipsize[0], pipsize[1]) - except: - pipsize = [ - int(width / 4), # 25% - int(height / 4) # 25% - ] - self.log.debug('PIP-Size calculated to %ux%u', pipsize[0], pipsize[1]) - - try: - pippos = [int(i) for i in Config.get('picture-in-picture', 'pippos').split('/', 1)] - self.log.debug('PIP-Position configured to %u/%u', pippos[0], pippos[1]) - except: - pippos = [ - width - pipsize[0] - int(width / 100), # 1% - height - pipsize[1] -int(width / 100) # 1% - ] - self.log.debug('PIP-Position calculated to %u/%u', pippos[0], pippos[1]) - - for idx, name in enumerate(self.names): - pad = self.padState[idx] - pad.reset() - - if idx == self.sourceA: - pass - elif idx == self.sourceB: - pad.xpos, pad.ypos = pippos - pad.width, pad.height = pipsize - pad.zorder = 2 - - else: - pad.alpha = 0 - - def applyMixerState(self): - for idx, state in enumerate(self.padState): - # mixerpad 0 = background - mixerpad = self.mixingPipeline.get_by_name('mix').get_static_pad('sink_%u' % (idx+1)) - - self.log.debug('Reconfiguring Mixerpad %u to x/y=%u/%u, w/h=%u/%u alpha=%0.2f, zorder=%u', \ - idx, state.xpos, state.ypos, state.width, state.height, state.alpha, state.zorder) - mixerpad.set_property('xpos', state.xpos) - mixerpad.set_property('ypos', state.ypos) - mixerpad.set_property('width', state.width) - mixerpad.set_property('height', state.height) - mixerpad.set_property('alpha', state.alpha) - mixerpad.set_property('zorder', state.zorder) - - def selectCompositeModeDefaultSources(self): - sectionNames = { - CompositeModes.fullscreen: 'fullscreen', - CompositeModes.side_by_side_equal: 'side-by-side-equal', - CompositeModes.side_by_side_preview: 'side-by-side-preview', - CompositeModes.picture_in_picture: 'picture-in-picture' - } - - compositeModeName = self.compositeMode.name - sectionName = sectionNames[self.compositeMode] - - try: - defSource = Config.get(sectionName, 'default-a') - self.setVideoSourceA(self.names.index(defSource)) - self.log.info('Changing sourceA to default of Mode %s: %s', compositeModeName, defSource) - except Exception as e: - pass - - try: - defSource = Config.get(sectionName, 'default-b') - self.setVideoSourceB(self.names.index(defSource)) - self.log.info('Changing sourceB to default of Mode %s: %s', compositeModeName, defSource) - except Exception as e: - pass - - def on_handoff(self, object, buffer): - if self.padStateDirty: - self.padStateDirty = False - self.log.debug('[Streaming-Thread]: Pad-State is Dirty, applying new Mixer-State') - self.applyMixerState() - - def on_eos(self, bus, message): - self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline') - - def on_error(self, bus, message): - self.log.debug('Received Error-Signal on Mixing-Pipeline') - (error, debug) = message.parse_error() - self.log.debug('Error-Details: #%u: %s', error.code, debug) - - - def setVideoSourceA(self, source): - # swap if required - if self.sourceB == source: - self.sourceB = self.sourceA - - self.sourceA = source - self.recalculateMixerState() - - def getVideoSourceA(self): - return self.sourceA - - def setVideoSourceB(self, source): - # swap if required - if self.sourceA == source: - self.sourceA = self.sourceB - - self.sourceB = source - self.recalculateMixerState() - - def getVideoSourceB(self): - return self.sourceB - - def setCompositeMode(self, mode): - self.compositeMode = mode - - self.selectCompositeModeDefaultSources() - self.recalculateMixerState() - - def getCompositeMode(self): - return self.compositeMode +class VideoMix(object): + log = logging.getLogger('VideoMix') + + def __init__(self): + self.caps = Config.get('mix', 'videocaps') + + self.names = Config.getlist('mix', 'sources') + self.log.info('Configuring Mixer for %u Sources', len(self.names)) + + pipeline = """ + compositor name=mix ! + {caps} ! + identity name=sig ! + queue ! + tee name=tee + + intervideosrc channel=video_background ! + {caps} ! + mix. + + tee. ! queue ! intervideosink channel=video_mix_out + """.format( + caps=self.caps + ) + + if Config.getboolean('previews', 'enabled'): + pipeline += """ + tee. ! queue ! intervideosink channel=video_mix_preview + """ + + if Config.getboolean('stream-blanker', 'enabled'): + pipeline += """ + tee. ! queue ! intervideosink channel=video_mix_streamblanker + """ + + for idx, name in enumerate(self.names): + pipeline += """ + intervideosrc channel=video_{name}_mixer ! + {caps} ! + mix. + """.format( + name=name, + caps=self.caps, + idx=idx + ) + + self.log.debug('Creating Mixing-Pipeline:\n%s', pipeline) + self.mixingPipeline = Gst.parse_launch(pipeline) + self.mixingPipeline.use_clock(Clock) + + self.log.debug('Binding Error & End-of-Stream-Signal ' + 'on Mixing-Pipeline') + self.mixingPipeline.bus.add_signal_watch() + self.mixingPipeline.bus.connect("message::eos", self.on_eos) + self.mixingPipeline.bus.connect("message::error", self.on_error) + + self.log.debug('Binding Handoff-Handler for ' + 'Synchronus mixer manipulation') + sig = self.mixingPipeline.get_by_name('sig') + sig.connect('handoff', self.on_handoff) + + self.padStateDirty = False + self.padState = list() + for idx, name in enumerate(self.names): + self.padState.append(PadState()) + + self.log.debug('Initializing Mixer-State') + self.compositeMode = CompositeModes.fullscreen + self.sourceA = 0 + self.sourceB = 1 + self.recalculateMixerState() + self.applyMixerState() + + bgMixerpad = (self.mixingPipeline.get_by_name('mix') + .get_static_pad('sink_0')) + bgMixerpad.set_property('zorder', 0) + + self.log.debug('Launching Mixing-Pipeline') + self.mixingPipeline.set_state(Gst.State.PLAYING) + + def getInputVideoSize(self): + caps = Gst.Caps.from_string(self.caps) + struct = caps.get_structure(0) + _, width = struct.get_int('width') + _, height = struct.get_int('height') + + return width, height + + def recalculateMixerState(self): + if self.compositeMode == CompositeModes.fullscreen: + self.recalculateMixerStateFullscreen() + + elif self.compositeMode == CompositeModes.side_by_side_equal: + self.recalculateMixerStateSideBySideEqual() + + elif self.compositeMode == CompositeModes.side_by_side_preview: + self.recalculateMixerStateSideBySidePreview() + + elif self.compositeMode == CompositeModes.picture_in_picture: + self.recalculateMixerStatePictureInPicture() + + self.log.debug('Marking Pad-State as Dirty') + self.padStateDirty = True + + def recalculateMixerStateFullscreen(self): + self.log.info('Updating Mixer-State for Fullscreen-Composition') + + for idx, name in enumerate(self.names): + pad = self.padState[idx] + + pad.reset() + pad.alpha = float(idx == self.sourceA) + + def recalculateMixerStateSideBySideEqual(self): + self.log.info('Updating Mixer-State for ' + 'Side-by-side-Equal-Composition') + + width, height = self.getInputVideoSize() + self.log.debug('Video-Size parsed as %ux%u', width, height) + + try: + gutter = Config.getint('side-by-side-equal', 'gutter') + self.log.debug('Gutter configured to %u', gutter) + except: + gutter = int(width / 100) + self.log.debug('Gutter calculated to %u', gutter) + + targetWidth = int((width - gutter) / 2) + targetHeight = int(targetWidth / width * height) + + self.log.debug('Video-Size calculated to %ux%u', + targetWidth, targetHeight) + + xa = 0 + xb = width - targetWidth + y = (height - targetHeight) / 2 + + try: + ya = Config.getint('side-by-side-equal', 'atop') + self.log.debug('A-Video Y-Pos configured to %u', ya) + except: + ya = y + self.log.debug('A-Video Y-Pos calculated to %u', ya) + + try: + yb = Config.getint('side-by-side-equal', 'btop') + self.log.debug('B-Video Y-Pos configured to %u', yb) + except: + yb = y + self.log.debug('B-Video Y-Pos calculated to %u', yb) + + for idx, name in enumerate(self.names): + pad = self.padState[idx] + pad.reset() + + pad.width = targetWidth + pad.height = targetHeight + + if idx == self.sourceA: + pad.xpos = xa + pad.ypos = ya + pad.zorder = 1 + + elif idx == self.sourceB: + pad.xpos = xb + pad.ypos = yb + pad.zorder = 2 + + else: + pad.alpha = 0 + + def recalculateMixerStateSideBySidePreview(self): + self.log.info('Updating Mixer-State for ' + 'Side-by-side-Preview-Composition') + + width, height = self.getInputVideoSize() + self.log.debug('Video-Size parsed as %ux%u', width, height) + + try: + asize = [int(i) for i in Config.get('side-by-side-preview', + 'asize').split('x', 1)] + self.log.debug('A-Video-Size configured to %ux%u', + asize[0], asize[1]) + except: + asize = [ + int(width / 1.25), # 80% + int(height / 1.25) # 80% + ] + self.log.debug('A-Video-Size calculated to %ux%u', + asize[0], asize[1]) + + try: + apos = [int(i) for i in Config.get('side-by-side-preview', + 'apos').split('/', 1)] + self.log.debug('B-Video-Position configured to %u/%u', + apos[0], apos[1]) + except: + apos = [ + int(width / 100), # 1% + int(width / 100) # 1% + ] + self.log.debug('B-Video-Position calculated to %u/%u', + apos[0], apos[1]) + + try: + bsize = [int(i) for i in Config.get('side-by-side-preview', + 'bsize').split('x', 1)] + self.log.debug('B-Video-Size configured to %ux%u', + bsize[0], bsize[1]) + except: + bsize = [ + int(width / 4), # 25% + int(height / 4) # 25% + ] + self.log.debug('B-Video-Size calculated to %ux%u', + bsize[0], bsize[1]) + + try: + bpos = [int(i) for i in Config.get('side-by-side-preview', + 'bpos').split('/', 1)] + self.log.debug('B-Video-Position configured to %u/%u', + bpos[0], bpos[1]) + except: + bpos = [ + width - int(width / 100) - bsize[0], + height - int(width / 100) - bsize[1] # 1% + ] + self.log.debug('B-Video-Position calculated to %u/%u', + bpos[0], bpos[1]) + + for idx, name in enumerate(self.names): + pad = self.padState[idx] + pad.reset() + + if idx == self.sourceA: + pad.xpos, pad.ypos = apos + pad.width, pad.height = asize + pad.zorder = 1 + + elif idx == self.sourceB: + pad.xpos, pad.ypos = bpos + pad.width, pad.height = bsize + pad.zorder = 2 + + else: + pad.alpha = 0 + + def recalculateMixerStatePictureInPicture(self): + self.log.info('Updating Mixer-State for ' + 'Picture-in-Picture-Composition') + + width, height = self.getInputVideoSize() + self.log.debug('Video-Size parsed as %ux%u', width, height) + + try: + pipsize = [int(i) for i in Config.get('picture-in-picture', + 'pipsize').split('x', 1)] + self.log.debug('PIP-Size configured to %ux%u', + pipsize[0], pipsize[1]) + except: + pipsize = [ + int(width / 4), # 25% + int(height / 4) # 25% + ] + self.log.debug('PIP-Size calculated to %ux%u', + pipsize[0], pipsize[1]) + + try: + pippos = [int(i) for i in Config.get('picture-in-picture', + 'pippos').split('/', 1)] + self.log.debug('PIP-Position configured to %u/%u', + pippos[0], pippos[1]) + except: + pippos = [ + width - pipsize[0] - int(width / 100), # 1% + height - pipsize[1] - int(width / 100) # 1% + ] + self.log.debug('PIP-Position calculated to %u/%u', + pippos[0], pippos[1]) + + for idx, name in enumerate(self.names): + pad = self.padState[idx] + pad.reset() + + if idx == self.sourceA: + pass + elif idx == self.sourceB: + pad.xpos, pad.ypos = pippos + pad.width, pad.height = pipsize + pad.zorder = 2 + + else: + pad.alpha = 0 + + def applyMixerState(self): + for idx, state in enumerate(self.padState): + # mixerpad 0 = background + mixerpad = (self.mixingPipeline + .get_by_name('mix') + .get_static_pad('sink_%u' % (idx + 1))) + + self.log.debug('Reconfiguring Mixerpad %u to ' + 'x/y=%u/%u, w/h=%u/%u alpha=%0.2f, zorder=%u', + idx, state.xpos, state.ypos, + state.width, state.height, + state.alpha, state.zorder) + mixerpad.set_property('xpos', state.xpos) + mixerpad.set_property('ypos', state.ypos) + mixerpad.set_property('width', state.width) + mixerpad.set_property('height', state.height) + mixerpad.set_property('alpha', state.alpha) + mixerpad.set_property('zorder', state.zorder) + + def selectCompositeModeDefaultSources(self): + sectionNames = { + CompositeModes.fullscreen: 'fullscreen', + CompositeModes.side_by_side_equal: 'side-by-side-equal', + CompositeModes.side_by_side_preview: 'side-by-side-preview', + CompositeModes.picture_in_picture: 'picture-in-picture' + } + + compositeModeName = self.compositeMode.name + sectionName = sectionNames[self.compositeMode] + + try: + defSource = Config.get(sectionName, 'default-a') + self.setVideoSourceA(self.names.index(defSource)) + self.log.info('Changing sourceA to default of Mode %s: %s', + compositeModeName, defSource) + except Exception as e: + pass + + try: + defSource = Config.get(sectionName, 'default-b') + self.setVideoSourceB(self.names.index(defSource)) + self.log.info('Changing sourceB to default of Mode %s: %s', + compositeModeName, defSource) + except Exception as e: + pass + + def on_handoff(self, object, buffer): + if self.padStateDirty: + self.padStateDirty = False + self.log.debug('[Streaming-Thread]: Pad-State is Dirty, ' + 'applying new Mixer-State') + self.applyMixerState() + + def on_eos(self, bus, message): + self.log.debug('Received End-of-Stream-Signal on Mixing-Pipeline') + + def on_error(self, bus, message): + self.log.debug('Received Error-Signal on Mixing-Pipeline') + (error, debug) = message.parse_error() + self.log.debug('Error-Details: #%u: %s', error.code, debug) + + def setVideoSourceA(self, source): + # swap if required + if self.sourceB == source: + self.sourceB = self.sourceA + + self.sourceA = source + self.recalculateMixerState() + + def getVideoSourceA(self): + return self.sourceA + + def setVideoSourceB(self, source): + # swap if required + if self.sourceA == source: + self.sourceA = self.sourceB + + self.sourceB = source + self.recalculateMixerState() + + def getVideoSourceB(self): + return self.sourceB + + def setCompositeMode(self, mode): + self.compositeMode = mode + + self.selectCompositeModeDefaultSources() + self.recalculateMixerState() + + def getCompositeMode(self): + return self.compositeMode |