aboutsummaryrefslogtreecommitdiff
path: root/voctocore/experiments
diff options
context:
space:
mode:
Diffstat (limited to 'voctocore/experiments')
-rwxr-xr-xvoctocore/experiments/startuptest/startuptest.py63
-rw-r--r--voctocore/experiments/startuptest/testbin.py41
2 files changed, 104 insertions, 0 deletions
diff --git a/voctocore/experiments/startuptest/startuptest.py b/voctocore/experiments/startuptest/startuptest.py
new file mode 100755
index 0000000..6e1802e
--- /dev/null
+++ b/voctocore/experiments/startuptest/startuptest.py
@@ -0,0 +1,63 @@
+#!/usr/bin/python3
+
+# Example for the startup-problem
+#
+# The Pipeline will start but the test-image will be still, as long as no
+# ShmSink at /tmp/grabber-v is present (run ../test-grabber-src.sh in another shell)
+#
+# Even though the ShmSrc is not linked to anything and logically not required for
+# the videotestsrc or the ximagesink, the whole pipeline won't startup when this element
+# fails to start.
+#
+# once the pipeline is running, it does not matter what happens to the ShmSink on the
+# other end, because the TestBin filters all EOS and ERROR Messages coming from the ShmSrc,
+# but somehow this is not enough to make the pipeline start in an error-condition..
+#
+
+import gi
+
+# import GStreamer and GTK-Helper classes
+gi.require_version('Gst', '1.0')
+from gi.repository import GLib, Gst, GObject
+
+# init GObject before importing local classes
+GObject.threads_init()
+Gst.init(None)
+
+from testbin import TestBin
+
+class Example:
+ def __init__(self):
+ self.mainloop = GObject.MainLoop()
+ self.pipeline = Gst.Pipeline()
+
+ self.src = Gst.ElementFactory.make('videotestsrc', None)
+ self.sink = Gst.ElementFactory.make('ximagesink', None)
+
+ self.testbin = TestBin()
+
+ # Add elements to pipeline
+ self.pipeline.add(self.testbin)
+ self.pipeline.add(self.src)
+ self.pipeline.add(self.sink)
+
+ self.src.link(self.sink)
+
+ def run(self):
+ self.pipeline.set_state(Gst.State.PLAYING)
+ self.mainloop.run()
+
+ def kill(self):
+ self.pipeline.set_state(Gst.State.NULL)
+ self.mainloop.quit()
+
+ def on_eos(self, bus, msg):
+ print('on_eos()')
+ #self.kill()
+
+ def on_error(self, bus, msg):
+ print('on_error():', msg.parse_error())
+ #self.kill()
+
+example = Example()
+example.run()
diff --git a/voctocore/experiments/startuptest/testbin.py b/voctocore/experiments/startuptest/testbin.py
new file mode 100644
index 0000000..e234b55
--- /dev/null
+++ b/voctocore/experiments/startuptest/testbin.py
@@ -0,0 +1,41 @@
+#!/usr/bin/python3
+import time
+from gi.repository import GLib, Gst
+
+class TestBin(Gst.Bin):
+ def __init__(self):
+ super().__init__()
+ self.set_name('testbin')
+
+ # Create elements
+ self.shmsrc = Gst.ElementFactory.make('shmsrc', None)
+
+ # Add elements to Bin
+ self.add(self.shmsrc)
+
+ self.shmsrc.set_property('socket-path', '/tmp/grabber-v')
+ self.shmsrc.set_property('is-live', True)
+ self.shmsrc.set_property('do-timestamp', True)
+
+ # Install pad probes
+ self.shmsrc.get_static_pad('src').add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.EVENT_DOWNSTREAM, self.event_probe, None)
+ self.shmsrc.get_static_pad('src').add_probe(Gst.PadProbeType.BLOCK | Gst.PadProbeType.BUFFER, self.data_probe, None)
+
+ def do_handle_message(self, msg):
+ if msg.type == Gst.MessageType.ERROR:
+ print("do_handle_message(): dropping error")
+ return
+
+ print("do_handle_message()", msg.src, msg.type)
+ Gst.Bin.do_handle_message(self, msg)
+
+ def event_probe(self, pad, info, ud):
+ e = info.get_event()
+ if e.type == Gst.EventType.EOS:
+ return Gst.PadProbeReturn.DROP
+
+ return Gst.PadProbeReturn.PASS
+
+ def data_probe(self, pad, info, ud):
+ self.last_buffer_arrived = time.time()
+ return Gst.PadProbeReturn.PASS