aboutsummaryrefslogtreecommitdiff
path: root/voctocore/experiments/shmsrc.py
blob: 3fd400dbdb43b916513e7de368e314d8794e2f78 (plain)
  1. #!/usr/bin/python3
  2. from gi.repository import GObject, Gst
  3. class ShmSrc(Gst.Bin):
  4. def __init__(self, socket, caps):
  5. super().__init__()
  6. # Create elements
  7. self.shmsrc = Gst.ElementFactory.make('shmsrc', None)
  8. self.caps1 = Gst.ElementFactory.make('capsfilter', None)
  9. self.caps2 = Gst.ElementFactory.make('capsfilter', None)
  10. self.switch = Gst.ElementFactory.make('input-selector', None)
  11. self.secondsrc = Gst.ElementFactory.make('videotestsrc', None)
  12. # Add elements to Bin
  13. self.add(self.shmsrc)
  14. self.add(self.caps1)
  15. self.add(self.caps2)
  16. self.add(self.switch)
  17. self.add(self.secondsrc)
  18. # Get Switcher-Pads
  19. self.firstpad = self.switch.get_request_pad('sink_%u')
  20. self.secondpad = self.switch.get_request_pad('sink_%u')
  21. # Set properties
  22. self.shmsrc.set_property('socket-path', socket)
  23. self.shmsrc.set_property('is-live', True)
  24. self.shmsrc.set_property('do-timestamp', True)
  25. self.caps1.set_property('caps', caps)
  26. self.caps2.set_property('caps', caps)
  27. self.switch.set_property('active-pad', self.firstpad)
  28. self.secondsrc.set_property('pattern', 'snow')
  29. # Link elements
  30. self.shmsrc.link(self.caps1)
  31. self.caps1.get_static_pad('src').link(self.firstpad)
  32. self.secondsrc.link(self.caps2)
  33. self.caps2.get_static_pad('src').link(self.secondpad)
  34. # Install pad probes
  35. self.shmsrc.get_static_pad('src').add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, self.event_probe, None)
  36. self.shmsrc.get_static_pad('src').add_probe(Gst.PadProbeType.IDLE | Gst.PadProbeType.DATA_DOWNSTREAM, self.data_probe, None)
  37. # Add Ghost Pads
  38. self.add_pad(
  39. Gst.GhostPad.new('sink', self.switch.get_static_pad('src'))
  40. )
  41. def event_probe(self, pad, info, ud):
  42. e = info.get_event()
  43. print("event_probe", e.type)
  44. if e.type == Gst.EventType.EOS:
  45. print("shmsrc reported EOS - switching to failover")
  46. self.switch.set_property('active-pad', self.secondpad)
  47. return Gst.PadProbeReturn.DROP
  48. return Gst.PadProbeReturn.PASS
  49. def data_probe(self, pad, info, ud):
  50. print("shmsrc sends data - switching to shmsrc")
  51. # todo: add a timeout of 2*framerate (ie 2*1/25 seconds)
  52. return Gst.PadProbeReturn.PASS