aboutsummaryrefslogtreecommitdiff
path: root/voctocore/experiments/startuptest/testbin.py
blob: e234b55de1e50af026f89415119edc79bbbf859f (plain)
  1. #!/usr/bin/python3
  2. import time
  3. from gi.repository import GLib, Gst
  class TestBin(Gst.Bin):
      """Bin named 'testbin' that wraps a single ``shmsrc`` element.

      The ``shmsrc`` is configured to read from the socket
      ``/tmp/grabber-v`` as a live, self-timestamping source.  Two blocking
      probes are installed on its ``src`` pad: one drops EOS events, the
      other records when the most recent buffer was seen.  ERROR messages
      reaching the bin's message handler are dropped instead of being
      chained up to ``Gst.Bin``.

      Attributes:
          shmsrc: The ``shmsrc`` element created and added to this bin.
          last_buffer_arrived: ``time.time()`` value taken when the buffer
              probe last fired, or ``None`` if it has not fired yet.
      """

      def __init__(self):
          """Create the ``shmsrc``, add it to the bin and install the probes.

          Raises:
              RuntimeError: If the ``shmsrc`` element cannot be created.
          """
          super().__init__()
          self.set_name('testbin')

          # Defined up front so the attribute can be read before the first
          # buffer arrives; data_probe() overwrites it with a timestamp.
          self.last_buffer_arrived = None

          # Create elements
          self.shmsrc = Gst.ElementFactory.make('shmsrc', None)
          if self.shmsrc is None:
              # make() yields None when the element cannot be created;
              # fail here with a clear message rather than on a later call.
              raise RuntimeError("could not create 'shmsrc' element")

          # Add elements to Bin
          self.add(self.shmsrc)

          self.shmsrc.set_property('socket-path', '/tmp/grabber-v')
          self.shmsrc.set_property('is-live', True)
          self.shmsrc.set_property('do-timestamp', True)

          # Install pad probes (both on the same pad, so look it up once)
          srcpad = self.shmsrc.get_static_pad('src')
          srcpad.add_probe(
              Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM,
              self.event_probe,
              None,
          )
          srcpad.add_probe(
              Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER,
              self.data_probe,
              None,
          )

      def do_handle_message(self, msg):
          """Handle a message posted inside the bin, dropping ERROR messages.

          ERROR messages are logged and not passed on to
          ``Gst.Bin.do_handle_message``; every other message is logged and
          chained up unchanged.

          Args:
              msg: The message to handle; its ``type`` and ``src`` are read.
          """
          if msg.type == Gst.MessageType.ERROR:
              # Deliberately not chained up to the parent implementation.
              print("do_handle_message(): dropping error")
              return

          print("do_handle_message()", msg.src, msg.type)
          Gst.Bin.do_handle_message(self, msg)

      def event_probe(self, pad, info, ud):
          """Pad probe callback for downstream events: drop EOS, pass the rest.

          Args:
              pad: The pad the probe fired on (unused).
              info: Probe info; the event is obtained via ``get_event()``.
              ud: User data given at registration (``None`` here; unused).

          Returns:
              ``Gst.PadProbeReturn.DROP`` for an EOS event, otherwise
              ``Gst.PadProbeReturn.PASS``.
          """
          e = info.get_event()
          if e.type == Gst.EventType.EOS:
              return Gst.PadProbeReturn.DROP

          return Gst.PadProbeReturn.PASS

      def data_probe(self, pad, info, ud):
          """Pad probe callback for buffers: record the arrival time.

          Stores the current ``time.time()`` in ``last_buffer_arrived`` and
          lets the buffer through.

          Args:
              pad: The pad the probe fired on (unused).
              info: Probe info (unused).
              ud: User data given at registration (``None`` here; unused).

          Returns:
              ``Gst.PadProbeReturn.PASS``.
          """
          self.last_buffer_arrived = time.time()
          return Gst.PadProbeReturn.PASS