#!/usr/bin/python3
'''Receive GDP-framed video and audio over TCP and play them back locally.

Two TCP servers are opened (video on port 5000, audio on port 6000).  The
bytes received from a client are pushed into an ``appsrc ! gdpdepay``
pipeline that feeds an inter{video,audio}sink; two permanently running
playback pipelines read from the matching inter{video,audio}src channels.
'''
import socket
import time

import gi

# The Gst API version has to be selected before gi.repository is imported.
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst, GObject

# Kept for old installations; PyGObject documents this call as unnecessary
# since version 3.11.
GObject.threads_init()
Gst.init(None)

VIDEO_PORT = 5000
AUDIO_PORT = 6000

# Upper bound for a single recv(); larger than one raw 1920x1080x3 frame.
RECV_SIZE = 10000000

VIDEO_SINK_PIPELINE = (
    'intervideosrc channel=video ! '
    'video/x-raw,height=600,width=800,format=I420,framerate=25/1 ! '
    'timeoverlay ! videoconvert ! ximagesink'
)
AUDIO_SINK_PIPELINE = (
    'interaudiosrc channel=audio ! '
    'audio/x-raw,format=S16LE,layout=interleaved,rate=48000,channels=2 ! '
    'autoaudiosink'
)
VIDEO_SOURCE_PIPELINE = (
    'appsrc name=a ! gdpdepay ! '
    'video/x-raw,height=600,width=800,format=I420,framerate=25/1 ! '
    'timeoverlay halignment=right ! intervideosink channel=video'
)
AUDIO_SOURCE_PIPELINE = (
    'appsrc name=a ! gdpdepay ! '
    'audio/x-raw,format=S16LE,layout=interleaved,rate=48000,channels=2 ! '
    'interaudiosink channel=audio'
)


class Example:
    '''TCP-to-GStreamer bridge with one video and one audio input.

    Attributes:
        mainloop: GLib main loop that dispatches all socket callbacks.
        vsink, asink: playback pipelines, started by run().
        vsource, asource: per-connection receive pipelines, or None while
            no client is connected on the respective port.
        vsock, asock: listening sockets for video and audio.
    '''

    def __init__(self):
        '''Build the playback pipelines and start listening on both ports.'''
        self.mainloop = GLib.MainLoop()

        self.vsink = Gst.parse_launch(VIDEO_SINK_PIPELINE)
        self.vsource = None

        self.asink = Gst.parse_launch(AUDIO_SINK_PIPELINE)
        self.asource = None

        # The sockets are kept on the instance so kill() can close them.
        self.vsock = self._listen(VIDEO_PORT, self.connection_handler_video)
        self.asock = self._listen(AUDIO_PORT, self.connection_handler_audio)

    @staticmethod
    def _listen(port, handler):
        '''Open a listening TCP socket on *port* and watch it in the main loop.

        The socket binds to '::' with IPV6_V6ONLY disabled, i.e. it listens
        on all interfaces for both IPv6 and IPv4 clients.  *handler* is
        called whenever the socket becomes readable.  Returns the socket.
        '''
        sock = socket.socket(socket.AF_INET6)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
        sock.bind(('::', port))
        sock.listen(1)
        GLib.io_add_watch(sock, GLib.PRIORITY_DEFAULT, GLib.IO_IN, handler)
        return sock

    @staticmethod
    def _accept(sock, busy):
        '''Accept the pending connection on *sock*.

        Returns the connected socket, or None when *busy* is true; in that
        case the client is accepted and closed right away so it does not
        linger in the backlog.
        '''
        conn, addr = sock.accept()
        if busy:
            print("Rejecting connection from", addr, "(stream already active)")
            conn.close()
            return None
        print("Connection from", addr)
        return conn

    @staticmethod
    def _forward(conn, pipeline, label):
        '''Read one chunk from *conn* and push it into *pipeline*'s appsrc.

        Returns True when data was forwarded.  On end of stream, on a
        socket error, or when *pipeline* is None, the connection is closed,
        the pipeline (if any) is set to NULL and False is returned so the
        caller can drop its watch and its pipeline reference.
        '''
        try:
            blob = conn.recv(RECV_SIZE)
        except OSError as err:
            # E.g. a connection reset: treat it like a closed connection
            # instead of raising out of the main-loop callback.
            print("Receive failed:", err)
            blob = b''

        if blob and pipeline is not None:
            print("%s-Blob of %u bytes" % (label, len(blob)))
            buf = Gst.Buffer.new_wrapped(blob)
            pipeline.get_by_name('a').emit('push-buffer', buf)
            return True

        print("Connection closed.")
        conn.close()
        if pipeline is not None:
            pipeline.set_state(Gst.State.NULL)
        return False

    def connection_handler_video(self, sock, *args):
        '''Main-loop callback: a client connected to the video port.

        Starts a receive pipeline for the first client; further clients are
        rejected while that stream is active.  Always returns True so the
        listening socket stays watched.
        '''
        conn = self._accept(sock, self.vsource is not None)
        if conn is not None:
            self.vsource = Gst.parse_launch(VIDEO_SOURCE_PIPELINE)
            self.vsource.set_state(Gst.State.PLAYING)
            GLib.io_add_watch(conn, GLib.PRIORITY_DEFAULT, GLib.IO_IN,
                              self.data_handler_video)
        return True

    def data_handler_video(self, conn, *args):
        '''Main-loop callback: data is available on the video connection.

        Returns True to keep the watch, False once the connection ended.
        '''
        if self._forward(conn, self.vsource, "Video"):
            return True
        self.vsource = None
        return False

    def connection_handler_audio(self, sock, *args):
        '''Main-loop callback: a client connected to the audio port.

        Starts a receive pipeline for the first client; further clients are
        rejected while that stream is active.  Always returns True so the
        listening socket stays watched.
        '''
        conn = self._accept(sock, self.asource is not None)
        if conn is not None:
            self.asource = Gst.parse_launch(AUDIO_SOURCE_PIPELINE)
            self.asource.set_state(Gst.State.PLAYING)
            GLib.io_add_watch(conn, GLib.PRIORITY_DEFAULT, GLib.IO_IN,
                              self.data_handler_audio)
        return True

    def data_handler_audio(self, conn, *args):
        '''Main-loop callback: data is available on the audio connection.

        Returns True to keep the watch, False once the connection ended.
        '''
        if self._forward(conn, self.asource, "Audio"):
            return True
        self.asource = None
        return False

    def run(self):
        '''Start both playback pipelines and block in the main loop.'''
        self.vsink.set_state(Gst.State.PLAYING)
        self.asink.set_state(Gst.State.PLAYING)
        self.mainloop.run()

    def kill(self):
        '''Stop all pipelines, close the listening sockets and quit the loop.'''
        for pipeline in (self.vsource, self.asource, self.vsink, self.asink):
            if pipeline is not None:
                pipeline.set_state(Gst.State.NULL)
        self.vsource = None
        self.asource = None

        self.vsock.close()
        self.asock.close()
        self.mainloop.quit()

    def on_eos(self, bus, msg):
        '''Log an end-of-stream bus message.

        Nothing in this file connects this handler to a bus.
        '''
        print('on_eos()')

    def on_error(self, bus, msg):
        '''Log an error bus message.

        Nothing in this file connects this handler to a bus.
        '''
        print('on_error():', msg.parse_error())


if __name__ == '__main__':
    example = Example()
    example.run()