#!/usr/bin/python3 import time from gi.repository import GLib, Gst class TestBin(Gst.Bin): def __init__(self): super().__init__() self.set_name('testbin') # Create elements self.shmsrc = Gst.ElementFactory.make('shmsrc', None) # Add elements to Bin self.add(self.shmsrc) self.shmsrc.set_property('socket-path', '/tmp/grabber-v') self.shmsrc.set_property('is-live', True) self.shmsrc.set_property('do-timestamp', True) # Install pad probes self.shmsrc.get_static_pad('src').add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, self.event_probe, None) self.shmsrc.get_static_pad('src').add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.data_probe, None) def do_handle_message(self, msg): if msg.type == Gst.MessageType.ERROR: print("do_handle_message(): dropping error") return print("do_handle_message()", msg.src, msg.type) Gst.Bin.do_handle_message(self, msg) def event_probe(self, pad, info, ud): e = info.get_event() if e.type == Gst.EventType.EOS: return Gst.PadProbeReturn.DROP return Gst.PadProbeReturn.PASS def data_probe(self, pad, info, ud): self.last_buffer_arrived = time.time() return Gst.PadProbeReturn.PASS