- #!/usr/bin/python3
- import time, logging
- from gi.repository import GLib, Gst
- from lib.config import Config
class FailsafeShmSrc(Gst.Bin):
    """Source bin that plays a shared-memory stream with a fallback source.

    The bin contains two branches feeding an ``input-selector``:

    * the "good" branch: ``shmsrc`` -> ``gdpdepay`` -> ``capsfilter``
    * the "fail" branch: the caller-supplied ``failsrc`` -> ``identity``
      (with ``sync`` enabled)

    The selector output is exposed through a single ``src`` ghost pad.  The
    bin starts in failstate (fail branch selected).  Every buffer leaving
    ``shmsrc`` switches the output to the good branch; an EOS event on
    ``shmsrc`` or a watchdog timeout switches it back to the fail branch.
    While in failstate the watchdog periodically restarts ``shmsrc``.
    """

    log = logging.getLogger('FailsafeShmSrc')

    # Seconds without a buffer from shmsrc before the watchdog switches the
    # output to the fail branch.
    BUFFER_TIMEOUT = 0.1

    # Minimum number of seconds between two shmsrc restart attempts.
    RESTART_INTERVAL = 1

    # Timestamps taken from time.monotonic(), so that wall-clock adjustments
    # cannot fake or hide a timeout.  -inf means "never happened yet".
    last_buffer_arrived = float('-inf')
    last_restart_retry = float('-inf')

    # True while the fail branch is the selected selector input.
    is_in_failstate = True

    def __init__(self, socket, caps, failsrc):
        """Build the bin, link both branches and start the watchdog.

        Args:
            socket: value for the ``socket-path`` property of ``shmsrc``.
            caps: caps enforced on the good branch and used as link filter
                on the fail branch; must provide ``to_string()``.
            failsrc: element to play while the shared-memory source delivers
                no data.  It is added to this bin.

        Raises:
            RuntimeError: if any internal element could not be created or
                ``failsrc`` is None.
        """
        super().__init__()

        # Create elements
        self.shmsrc = Gst.ElementFactory.make('shmsrc', None)
        self.depay = Gst.ElementFactory.make('gdpdepay', None)
        self.capsfilter = Gst.ElementFactory.make('capsfilter', None)
        self.failsrcsyncer = Gst.ElementFactory.make('identity', None)
        self.switch = Gst.ElementFactory.make('input-selector', None)
        self.failsrc = failsrc
        self.capsstr = caps.to_string()

        # Fail early with a meaningful message instead of running into an
        # unrelated error on the first use of a missing element.
        elements = {
            'shmsrc': self.shmsrc,
            'gdpdepay': self.depay,
            'capsfilter': self.capsfilter,
            'identity': self.failsrcsyncer,
            'input-selector': self.switch,
            'failsrc': self.failsrc,
        }
        missing = [
            name for name, element in elements.items() if element is None
        ]
        if missing:
            self.log.error('could not create elements')
            raise RuntimeError(
                'could not create elements: ' + ', '.join(missing)
            )

        # Add elements to Bin
        self.add(self.shmsrc)
        self.add(self.depay)
        self.add(self.capsfilter)
        self.add(self.failsrcsyncer)
        self.add(self.switch)
        self.add(self.failsrc)

        # Get Switcher-Pads
        self.goodpad = self.switch.get_request_pad('sink_%u')
        self.failpad = self.switch.get_request_pad('sink_%u')

        # Set properties; the bin starts out on the fail branch, matching
        # the initial value of is_in_failstate.
        self.shmsrc.set_property('socket-path', socket)
        self.switch.set_property('active-pad', self.failpad)
        self.failsrcsyncer.set_property('sync', True)
        self.capsfilter.set_property('caps', caps)

        # Link elements
        self.shmsrc.link(self.depay)
        self.depay.link(self.capsfilter)
        self.capsfilter.get_static_pad('src').link(self.goodpad)
        self.failsrc.link_filtered(self.failsrcsyncer, caps)
        self.failsrcsyncer.get_static_pad('src').link(self.failpad)

        # Install pad probes on the shmsrc output: one for downstream
        # events (EOS handling), one for buffers (liveness tracking).
        shmsrc_pad = self.shmsrc.get_static_pad('src')
        shmsrc_pad.add_probe(
            Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM,
            self.event_probe, None)
        shmsrc_pad.add_probe(
            Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER,
            self.data_probe, None)

        # Install Watchdog; audio sources are polled less frequently.
        if self.capsstr.startswith('audio'):
            timeoutms = 1000
        else:
            timeoutms = 250
        GLib.timeout_add(timeoutms, self.watchdog)

        # Add Ghost Pads
        self.add_pad(
            Gst.GhostPad.new('src', self.switch.get_static_pad('src'))
        )

    def do_handle_message(self, msg):
        """Swallow error messages posted by ``shmsrc``; forward the rest.

        Errors from ``shmsrc`` are logged and not passed on to the default
        ``Gst.Bin`` handler.
        """
        if msg.type == Gst.MessageType.ERROR and msg.src == self.shmsrc:
            (err, debug) = msg.parse_error()
            self.log.warning(
                'received error-message from ShmSrc, dropping: %s', err)
            self.log.debug(' debug-info from shmsrc: %s', debug)
        else:
            Gst.Bin.do_handle_message(self, msg)

    def event_probe(self, pad, info, ud):
        """Pad probe for downstream events on the ``shmsrc`` src pad.

        An EOS event is dropped and the output is switched to the fail
        branch; every other event passes unchanged.
        """
        e = info.get_event()
        if e.type == Gst.EventType.EOS:
            self.log.warning('received EOS-event on event-probe, dropping')
            self.switch_to_failstate()
            return Gst.PadProbeReturn.DROP

        return Gst.PadProbeReturn.PASS

    def data_probe(self, pad, info, ud):
        """Pad probe for buffers on the ``shmsrc`` src pad.

        Records the arrival time for the watchdog and makes sure the good
        branch is selected.  The buffer itself always passes.
        """
        self.last_buffer_arrived = time.monotonic()
        self.switch_to_goodstate()
        return Gst.PadProbeReturn.PASS

    def watchdog(self):
        """Periodic GLib timeout callback supervising the ``shmsrc``.

        Switches to the fail branch when no buffer arrived within
        ``BUFFER_TIMEOUT`` seconds and, while in failstate, restarts
        ``shmsrc`` at most once per ``RESTART_INTERVAL`` seconds.

        Returns:
            Always True, so the timeout stays installed.
        """
        now = time.monotonic()

        # Only report the timeout on the transition into failstate instead
        # of repeating the warning on every tick while the source is gone.
        timed_out = self.last_buffer_arrived + self.BUFFER_TIMEOUT < now
        if timed_out and not self.is_in_failstate:
            self.log.warning('watchdog encountered a timeout')
            self.switch_to_failstate()

        if (self.is_in_failstate
                and self.last_restart_retry + self.RESTART_INTERVAL < now):
            self.last_restart_retry = now
            self.restart()

        return True

    def restart(self):
        """Cycle ``shmsrc`` through NULL back to PLAYING.

        The base time of this bin's parent is copied onto ``shmsrc`` before
        it is set to PLAYING again.  Nothing is done while the bin has no
        parent, because there is no base time to copy yet.
        """
        parent = self.get_parent()
        if parent is None:
            # An exception raised here would propagate into the GLib
            # timeout callback; skip this attempt and retry on a later tick.
            self.log.debug('not restarting ShmSrc: bin has no parent yet')
            return

        self.log.warning('restarting ShmSrc')
        self.shmsrc.set_state(Gst.State.NULL)
        self.shmsrc.set_base_time(parent.get_base_time())
        self.shmsrc.set_state(Gst.State.PLAYING)

    def switch_to_goodstate(self):
        """Select the good (shmsrc) branch unless it is already selected."""
        if not self.is_in_failstate:
            return

        self.log.warning('switching output to goodstate')
        self.is_in_failstate = False
        self.switch.set_property('active-pad', self.goodpad)

    def switch_to_failstate(self):
        """Select the fail (failsrc) branch unless it is already selected."""
        if self.is_in_failstate:
            return

        self.log.warning('switching output to failstate')
        self.is_in_failstate = True
        self.switch.set_property('active-pad', self.failpad)
|