diff options
author | MaZderMind <github@mazdermind.de> | 2015-05-14 20:41:03 +0200 |
---|---|---|
committer | MaZderMind <github@mazdermind.de> | 2015-05-14 20:41:03 +0200 |
commit | b81461b9a433b8ac0c3b3a098e42b12faf9e8d27 (patch) | |
tree | 86791b5677a00a7e9daade692b99f42528478b8d /voctocore | |
parent | b3b9f5630ecfdb9ff7f4e6955fa875f80ae93f84 (diff) |
Extract TCP Connection handling into separate classes
Diffstat (limited to 'voctocore')
-rw-r--r-- | voctocore/lib/avpreviewoutput.py | 43 | ||||
-rw-r--r-- | voctocore/lib/avrawoutput.py | 43 | ||||
-rw-r--r-- | voctocore/lib/avsource.py | 37 | ||||
-rw-r--r-- | voctocore/lib/controlserver.py | 31 | ||||
-rw-r--r-- | voctocore/lib/tcpmulticonnection.py | 43 | ||||
-rw-r--r-- | voctocore/lib/tcpsingleconnection.py | 43 |
6 files changed, 126 insertions, 114 deletions
diff --git a/voctocore/lib/avpreviewoutput.py b/voctocore/lib/avpreviewoutput.py index 2fa8c3a..0ad3515 100644 --- a/voctocore/lib/avpreviewoutput.py +++ b/voctocore/lib/avpreviewoutput.py @@ -1,26 +1,23 @@ #!/usr/bin/python3 -import logging, socket -from gi.repository import GObject, Gst +import logging +from gi.repository import Gst from lib.config import Config +from lib.tcpmulticonnection import TCPMultiConnection -class AVPreviewOutput(object): +class AVPreviewOutput(TCPMultiConnection): log = logging.getLogger('AVPreviewOutput') name = None - port = None caps = None - boundSocket = None receiverPipeline = None - currentConnections = [] - def __init__(self, channel, port): self.log = logging.getLogger('AVPreviewOutput['+channel+']') + super().__init__(port) self.channel = channel - self.port = port if Config.has_option('previews', 'videocaps'): vcaps_out = Config.get('previews', 'videocaps') @@ -63,34 +60,14 @@ class AVPreviewOutput(object): self.receiverPipeline = Gst.parse_launch(pipeline) self.receiverPipeline.set_state(Gst.State.PLAYING) - self.log.debug('Binding to Output-Socket on [::]:%u', port) - self.boundSocket = socket.socket(socket.AF_INET6) - self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) - self.boundSocket.bind(('::', port)) - self.boundSocket.listen(1) - - self.log.debug('Setting GObject io-watch on Socket') - GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) - - def on_connect(self, sock, *args): - conn, addr = sock.accept() - self.log.info("Incomming Connection from %s", addr) + def on_accepted(self, conn, addr): + self.log.debug('Adding fd %u to multifdsink', conn.fileno()) + fdsink = self.receiverPipeline.get_by_name('fd') + fdsink.emit('add', conn.fileno()) def on_disconnect(multifdsink, fileno): if fileno == conn.fileno(): self.log.debug('fd %u removed from multifdsink', fileno) + self.close_connection(conn) - self.currentConnections.remove(conn) - self.log.info('Disconnected Receiver %s', addr) - self.log.info('Now %u Receiver connected', len(self.currentConnections)) - - self.log.debug('Adding fd %u to multifdsink', conn.fileno()) - fdsink = self.receiverPipeline.get_by_name('fd') - fdsink.emit('add', conn.fileno()) fdsink.connect('client-fd-removed', on_disconnect) - - self.currentConnections.append(conn) - self.log.info('Now %u Receiver connected', len(self.currentConnections)) - - return True diff --git a/voctocore/lib/avrawoutput.py b/voctocore/lib/avrawoutput.py index 630c2ca..69c90a2 100644 --- a/voctocore/lib/avrawoutput.py +++ b/voctocore/lib/avrawoutput.py @@ -1,26 +1,23 @@ #!/usr/bin/python3 -import logging, socket -from gi.repository import GObject, Gst +import logging +from gi.repository import Gst from lib.config import Config +from lib.tcpmulticonnection import TCPMultiConnection -class AVRawOutput(object): +class AVRawOutput(TCPMultiConnection): log = logging.getLogger('AVRawOutput') name = None - port = None caps = None - boundSocket = None receiverPipeline = None - currentConnections = [] - def __init__(self, channel, port): self.log = logging.getLogger('AVRawOutput['+channel+']') + super().__init__(port) self.channel = channel - self.port = port pipeline = """ interaudiosrc channel=audio_{channel} ! @@ -52,34 +49,14 @@ class AVRawOutput(object): self.receiverPipeline = Gst.parse_launch(pipeline) self.receiverPipeline.set_state(Gst.State.PLAYING) - self.log.debug('Binding to Output-Socket on [::]:%u', port) - self.boundSocket = socket.socket(socket.AF_INET6) - self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) - self.boundSocket.bind(('::', port)) - self.boundSocket.listen(1) - - self.log.debug('Setting GObject io-watch on Socket') - GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) - - def on_connect(self, sock, *args): - conn, addr = sock.accept() - self.log.info("Incomming Connection from %s", addr) + def on_accepted(self, conn, addr): + self.log.debug('Adding fd %u to multifdsink', conn.fileno()) + fdsink = self.receiverPipeline.get_by_name('fd') + fdsink.emit('add', conn.fileno()) def on_disconnect(multifdsink, fileno): if fileno == conn.fileno(): self.log.debug('fd %u removed from multifdsink', fileno) + self.close_connection(conn) - self.currentConnections.remove(conn) - self.log.info('Disconnected Receiver %s', addr) - self.log.info('Now %u Receiver connected', len(self.currentConnections)) - - self.log.debug('Adding fd %u to multifdsink', conn.fileno()) - fdsink = self.receiverPipeline.get_by_name('fd') - fdsink.emit('add', conn.fileno()) fdsink.connect('client-fd-removed', on_disconnect) - - self.currentConnections.append(conn) - self.log.info('Now %u Receiver connected', len(self.currentConnections)) - - return True diff --git a/voctocore/lib/avsource.py b/voctocore/lib/avsource.py index a809948..920d906 100644 --- a/voctocore/lib/avsource.py +++ b/voctocore/lib/avsource.py @@ -1,45 +1,25 @@ #!/usr/bin/python3 -import logging, socket -from gi.repository import GObject, Gst +import logging +from gi.repository import Gst from lib.config import Config +from lib.tcpsingleconnection import TCPSingleConnection -class AVSource(object): +class AVSource(TCPSingleConnection): log = logging.getLogger('AVSource') name = None - port = None caps = None receiverPipeline = None - boundSocket = None - currentConnection = None - def __init__(self, name, port): self.log = logging.getLogger('AVSource['+name+']') + super().__init__(port) self.name = name - self.port = port - - self.log.debug('Binding to Source-Socket on [::]:%u', port) - self.boundSocket = socket.socket(socket.AF_INET6) - self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) - self.boundSocket.bind(('::', port)) - self.boundSocket.listen(1) - - self.log.debug('Setting GObject io-watch on Socket') - GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) - - def on_connect(self, sock, *args): - conn, addr = sock.accept() - self.log.info("Incomming Connection from %s", addr) - - if self.currentConnection is not None: - self.log.warn("Another Source is already connected") - return True + def on_accepted(self, conn, addr): pipeline = """ fdsrc fd={fd} ! matroskademux name=demux @@ -97,8 +77,6 @@ class AVSource(object): self.receiverPipeline.set_state(Gst.State.PLAYING) - self.currentConnection = conn - return True def on_eos(self, bus, message): self.log.debug('Received End-of-Stream-Signal on Source-Pipeline') @@ -114,7 +92,6 @@ class AVSource(object): self.disconnect() def disconnect(self): - self.log.info('Connection closed') self.receiverPipeline.set_state(Gst.State.NULL) self.receiverPipeline = None - self.currentConnection = None + self.close_connection() diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py index 79aa617..835cc8e 100644 --- a/voctocore/lib/controlserver.py +++ b/voctocore/lib/controlserver.py @@ -3,35 +3,24 @@ import socket, logging, traceback from gi.repository import GObject from lib.commands import ControlServerCommands +from lib.tcpmulticonnection import TCPMultiConnection -class ControlServer(): +class ControlServer(TCPMultiConnection): log = logging.getLogger('ControlServer') boundSocket = None def __init__(self, pipeline): '''Initialize server and start listening.''' - self.commands = ControlServerCommands(pipeline) - - port = 9999 - self.log.debug('Binding to Command-Socket on [::]:%u', port) - self.boundSocket = socket.socket(socket.AF_INET6) - self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) - self.boundSocket.bind(('::', port)) - self.boundSocket.listen(1) + super().__init__(port=9999) - self.log.debug('Setting GObject io-watch on Socket') - GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) + self.commands = ControlServerCommands(pipeline) - def on_connect(self, sock, *args): + def on_accepted(self, conn, addr): '''Asynchronous connection listener. Starts a handler for each connection.''' - conn, addr = sock.accept() - self.log.info("Incomming Connection from %s", addr) self.log.debug('Setting GObject io-watch on Connection') GObject.io_add_watch(conn, GObject.IO_IN, self.on_data) - return True def on_data(self, conn, *args): '''Asynchronous connection handler. Processes each line from the socket.''' @@ -40,16 +29,21 @@ class ControlServer(): filelike = conn.makefile('rw') # read a line from the socket - line = filelike.readline().strip() + line = '' + try: + line = filelike.readline().strip() + except Exception as e: + self.log.warn("Can't read from socket: %s", e) # no data = remote closed connection if len(line) == 0: - self.log.info("Connection closed.") + self.close_connection(conn) return False # 'quit' = remote wants us to close the connection if line == 'quit': self.log.info("Client asked us to close the Connection") + self.close_connection(conn) return False # process the received line @@ -75,6 +69,7 @@ class ControlServer(): else: # respond with the returned message filelike.write('ok '+msg+'\n') + return True def processLine(self, line): diff --git a/voctocore/lib/tcpmulticonnection.py b/voctocore/lib/tcpmulticonnection.py new file mode 100644 index 0000000..2ca6921 --- /dev/null +++ b/voctocore/lib/tcpmulticonnection.py @@ -0,0 +1,43 @@ +#!/usr/bin/python3 +import logging, socket +from gi.repository import GObject + +from lib.config import Config + +class TCPMultiConnection(object): + log = logging.getLogger('TCPMultiConnection') + + port = None + + boundSocket = None + currentConnections = [] + + + def __init__(self, port): + self.port = port + + self.log.debug('Binding to Source-Socket on [::]:%u', port) + self.boundSocket = socket.socket(socket.AF_INET6) + self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) + self.boundSocket.bind(('::', port)) + self.boundSocket.listen(1) + + self.log.debug('Setting GObject io-watch on Socket') + GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) + + def on_connect(self, sock, *args): + conn, addr = sock.accept() + self.log.info("Incomming Connection from %s", addr) + + self.currentConnections.append(conn) + self.log.info('Now %u Receiver connected', len(self.currentConnections)) + + self.on_accepted(conn, addr) + + return True + + def close_connection(self, conn): + self.currentConnections.remove(conn) + self.log.info('Disconnected Receiver %s', conn.getpeername()) + self.log.info('Now %u Receiver connected', len(self.currentConnections)) diff --git a/voctocore/lib/tcpsingleconnection.py b/voctocore/lib/tcpsingleconnection.py new file mode 100644 index 0000000..d6a05ef --- /dev/null +++ b/voctocore/lib/tcpsingleconnection.py @@ -0,0 +1,43 @@ +#!/usr/bin/python3 +import logging, socket +from gi.repository import GObject + +from lib.config import Config + +class TCPSingleConnection(object): + log = logging.getLogger('TCPSingleConnection') + + port = None + + boundSocket = None + currentConnection = None + + def __init__(self, port): + self.port = port + + self.log.debug('Binding to Source-Socket on [::]:%u', port) + self.boundSocket = socket.socket(socket.AF_INET6) + self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False) + self.boundSocket.bind(('::', port)) + self.boundSocket.listen(1) + + self.log.debug('Setting GObject io-watch on Socket') + GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect) + + def on_connect(self, sock, *args): + conn, addr = sock.accept() + self.log.info("Incomming Connection from %s", addr) + + if self.currentConnection is not None: + self.log.warn("Another Source is already connected") + return True + + self.on_accepted(conn, addr) + self.currentConnection = conn + + return True + + def close_connection(self): + self.currentConnection = None + self.log.info('Connection closed') |