summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMaZderMind <github@mazdermind.de>2015-05-14 20:41:03 +0200
committerMaZderMind <github@mazdermind.de>2015-05-14 20:41:03 +0200
commitb81461b9a433b8ac0c3b3a098e42b12faf9e8d27 (patch)
tree86791b5677a00a7e9daade692b99f42528478b8d
parentb3b9f5630ecfdb9ff7f4e6955fa875f80ae93f84 (diff)
Extract TCP Connection handling into separate classes
-rw-r--r--voctocore/lib/avpreviewoutput.py43
-rw-r--r--voctocore/lib/avrawoutput.py43
-rw-r--r--voctocore/lib/avsource.py37
-rw-r--r--voctocore/lib/controlserver.py31
-rw-r--r--voctocore/lib/tcpmulticonnection.py43
-rw-r--r--voctocore/lib/tcpsingleconnection.py43
6 files changed, 126 insertions, 114 deletions
diff --git a/voctocore/lib/avpreviewoutput.py b/voctocore/lib/avpreviewoutput.py
index 2fa8c3a..0ad3515 100644
--- a/voctocore/lib/avpreviewoutput.py
+++ b/voctocore/lib/avpreviewoutput.py
@@ -1,26 +1,23 @@
#!/usr/bin/python3
-import logging, socket
-from gi.repository import GObject, Gst
+import logging
+from gi.repository import Gst
from lib.config import Config
+from lib.tcpmulticonnection import TCPMultiConnection
-class AVPreviewOutput(object):
+class AVPreviewOutput(TCPMultiConnection):
log = logging.getLogger('AVPreviewOutput')
name = None
- port = None
caps = None
- boundSocket = None
receiverPipeline = None
- currentConnections = []
-
def __init__(self, channel, port):
self.log = logging.getLogger('AVPreviewOutput['+channel+']')
+ super().__init__(port)
self.channel = channel
- self.port = port
if Config.has_option('previews', 'videocaps'):
vcaps_out = Config.get('previews', 'videocaps')
@@ -63,34 +60,14 @@ class AVPreviewOutput(object):
self.receiverPipeline = Gst.parse_launch(pipeline)
self.receiverPipeline.set_state(Gst.State.PLAYING)
- self.log.debug('Binding to Output-Socket on [::]:%u', port)
- self.boundSocket = socket.socket(socket.AF_INET6)
- self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
- self.boundSocket.bind(('::', port))
- self.boundSocket.listen(1)
-
- self.log.debug('Setting GObject io-watch on Socket')
- GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
-
- def on_connect(self, sock, *args):
- conn, addr = sock.accept()
- self.log.info("Incomming Connection from %s", addr)
+ def on_accepted(self, conn, addr):
+ self.log.debug('Adding fd %u to multifdsink', conn.fileno())
+ fdsink = self.receiverPipeline.get_by_name('fd')
+ fdsink.emit('add', conn.fileno())
def on_disconnect(multifdsink, fileno):
if fileno == conn.fileno():
self.log.debug('fd %u removed from multifdsink', fileno)
+ self.close_connection(conn)
- self.currentConnections.remove(conn)
- self.log.info('Disconnected Receiver %s', addr)
- self.log.info('Now %u Receiver connected', len(self.currentConnections))
-
- self.log.debug('Adding fd %u to multifdsink', conn.fileno())
- fdsink = self.receiverPipeline.get_by_name('fd')
- fdsink.emit('add', conn.fileno())
fdsink.connect('client-fd-removed', on_disconnect)
-
- self.currentConnections.append(conn)
- self.log.info('Now %u Receiver connected', len(self.currentConnections))
-
- return True
diff --git a/voctocore/lib/avrawoutput.py b/voctocore/lib/avrawoutput.py
index 630c2ca..69c90a2 100644
--- a/voctocore/lib/avrawoutput.py
+++ b/voctocore/lib/avrawoutput.py
@@ -1,26 +1,23 @@
#!/usr/bin/python3
-import logging, socket
-from gi.repository import GObject, Gst
+import logging
+from gi.repository import Gst
from lib.config import Config
+from lib.tcpmulticonnection import TCPMultiConnection
-class AVRawOutput(object):
+class AVRawOutput(TCPMultiConnection):
log = logging.getLogger('AVRawOutput')
name = None
- port = None
caps = None
- boundSocket = None
receiverPipeline = None
- currentConnections = []
-
def __init__(self, channel, port):
self.log = logging.getLogger('AVRawOutput['+channel+']')
+ super().__init__(port)
self.channel = channel
- self.port = port
pipeline = """
interaudiosrc channel=audio_{channel} !
@@ -52,34 +49,14 @@ class AVRawOutput(object):
self.receiverPipeline = Gst.parse_launch(pipeline)
self.receiverPipeline.set_state(Gst.State.PLAYING)
- self.log.debug('Binding to Output-Socket on [::]:%u', port)
- self.boundSocket = socket.socket(socket.AF_INET6)
- self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
- self.boundSocket.bind(('::', port))
- self.boundSocket.listen(1)
-
- self.log.debug('Setting GObject io-watch on Socket')
- GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
-
- def on_connect(self, sock, *args):
- conn, addr = sock.accept()
- self.log.info("Incomming Connection from %s", addr)
+ def on_accepted(self, conn, addr):
+ self.log.debug('Adding fd %u to multifdsink', conn.fileno())
+ fdsink = self.receiverPipeline.get_by_name('fd')
+ fdsink.emit('add', conn.fileno())
def on_disconnect(multifdsink, fileno):
if fileno == conn.fileno():
self.log.debug('fd %u removed from multifdsink', fileno)
+ self.close_connection(conn)
- self.currentConnections.remove(conn)
- self.log.info('Disconnected Receiver %s', addr)
- self.log.info('Now %u Receiver connected', len(self.currentConnections))
-
- self.log.debug('Adding fd %u to multifdsink', conn.fileno())
- fdsink = self.receiverPipeline.get_by_name('fd')
- fdsink.emit('add', conn.fileno())
fdsink.connect('client-fd-removed', on_disconnect)
-
- self.currentConnections.append(conn)
- self.log.info('Now %u Receiver connected', len(self.currentConnections))
-
- return True
diff --git a/voctocore/lib/avsource.py b/voctocore/lib/avsource.py
index a809948..920d906 100644
--- a/voctocore/lib/avsource.py
+++ b/voctocore/lib/avsource.py
@@ -1,45 +1,25 @@
#!/usr/bin/python3
-import logging, socket
-from gi.repository import GObject, Gst
+import logging
+from gi.repository import Gst
from lib.config import Config
+from lib.tcpsingleconnection import TCPSingleConnection
-class AVSource(object):
+class AVSource(TCPSingleConnection):
log = logging.getLogger('AVSource')
name = None
- port = None
caps = None
receiverPipeline = None
- boundSocket = None
- currentConnection = None
-
def __init__(self, name, port):
self.log = logging.getLogger('AVSource['+name+']')
+ super().__init__(port)
self.name = name
- self.port = port
-
- self.log.debug('Binding to Source-Socket on [::]:%u', port)
- self.boundSocket = socket.socket(socket.AF_INET6)
- self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
- self.boundSocket.bind(('::', port))
- self.boundSocket.listen(1)
-
- self.log.debug('Setting GObject io-watch on Socket')
- GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
-
- def on_connect(self, sock, *args):
- conn, addr = sock.accept()
- self.log.info("Incomming Connection from %s", addr)
-
- if self.currentConnection is not None:
- self.log.warn("Another Source is already connected")
- return True
+ def on_accepted(self, conn, addr):
pipeline = """
fdsrc fd={fd} !
matroskademux name=demux
@@ -97,8 +77,6 @@ class AVSource(object):
self.receiverPipeline.set_state(Gst.State.PLAYING)
- self.currentConnection = conn
- return True
def on_eos(self, bus, message):
self.log.debug('Received End-of-Stream-Signal on Source-Pipeline')
@@ -114,7 +92,6 @@ class AVSource(object):
self.disconnect()
def disconnect(self):
- self.log.info('Connection closed')
self.receiverPipeline.set_state(Gst.State.NULL)
self.receiverPipeline = None
- self.currentConnection = None
+ self.close_connection()
diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py
index 79aa617..835cc8e 100644
--- a/voctocore/lib/controlserver.py
+++ b/voctocore/lib/controlserver.py
@@ -3,35 +3,24 @@ import socket, logging, traceback
from gi.repository import GObject
from lib.commands import ControlServerCommands
+from lib.tcpmulticonnection import TCPMultiConnection
-class ControlServer():
+class ControlServer(TCPMultiConnection):
log = logging.getLogger('ControlServer')
boundSocket = None
def __init__(self, pipeline):
'''Initialize server and start listening.'''
- self.commands = ControlServerCommands(pipeline)
-
- port = 9999
- self.log.debug('Binding to Command-Socket on [::]:%u', port)
- self.boundSocket = socket.socket(socket.AF_INET6)
- self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
- self.boundSocket.bind(('::', port))
- self.boundSocket.listen(1)
+ super().__init__(port=9999)
- self.log.debug('Setting GObject io-watch on Socket')
- GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
+ self.commands = ControlServerCommands(pipeline)
- def on_connect(self, sock, *args):
+ def on_accepted(self, conn, addr):
'''Asynchronous connection listener. Starts a handler for each connection.'''
- conn, addr = sock.accept()
- self.log.info("Incomming Connection from %s", addr)
self.log.debug('Setting GObject io-watch on Connection')
GObject.io_add_watch(conn, GObject.IO_IN, self.on_data)
- return True
def on_data(self, conn, *args):
'''Asynchronous connection handler. Processes each line from the socket.'''
@@ -40,16 +29,21 @@ class ControlServer():
filelike = conn.makefile('rw')
# read a line from the socket
- line = filelike.readline().strip()
+ line = ''
+ try:
+ line = filelike.readline().strip()
+ except Exception as e:
+ self.log.warn("Can't read from socket: %s", e)
# no data = remote closed connection
if len(line) == 0:
- self.log.info("Connection closed.")
+ self.close_connection(conn)
return False
# 'quit' = remote wants us to close the connection
if line == 'quit':
self.log.info("Client asked us to close the Connection")
+ self.close_connection(conn)
return False
# process the received line
@@ -75,6 +69,7 @@ class ControlServer():
else:
# respond with the returned message
filelike.write('ok '+msg+'\n')
+
return True
def processLine(self, line):
diff --git a/voctocore/lib/tcpmulticonnection.py b/voctocore/lib/tcpmulticonnection.py
new file mode 100644
index 0000000..2ca6921
--- /dev/null
+++ b/voctocore/lib/tcpmulticonnection.py
@@ -0,0 +1,43 @@
+#!/usr/bin/python3
+import logging, socket
+from gi.repository import GObject
+
+from lib.config import Config
+
+class TCPMultiConnection(object):
+ log = logging.getLogger('TCPMultiConnection')
+
+ port = None
+
+ boundSocket = None
+ currentConnections = []
+
+
+ def __init__(self, port):
+ self.port = port
+
+ self.log.debug('Binding to Source-Socket on [::]:%u', port)
+ self.boundSocket = socket.socket(socket.AF_INET6)
+ self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
+ self.boundSocket.bind(('::', port))
+ self.boundSocket.listen(1)
+
+ self.log.debug('Setting GObject io-watch on Socket')
+ GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
+
+ def on_connect(self, sock, *args):
+ conn, addr = sock.accept()
+ self.log.info("Incomming Connection from %s", addr)
+
+ self.currentConnections.append(conn)
+ self.log.info('Now %u Receiver connected', len(self.currentConnections))
+
+ self.on_accepted(conn, addr)
+
+ return True
+
+ def close_connection(self, conn):
+ self.currentConnections.remove(conn)
+ self.log.info('Disconnected Receiver %s', conn.getpeername())
+ self.log.info('Now %u Receiver connected', len(self.currentConnections))
diff --git a/voctocore/lib/tcpsingleconnection.py b/voctocore/lib/tcpsingleconnection.py
new file mode 100644
index 0000000..d6a05ef
--- /dev/null
+++ b/voctocore/lib/tcpsingleconnection.py
@@ -0,0 +1,43 @@
+#!/usr/bin/python3
+import logging, socket
+from gi.repository import GObject
+
+from lib.config import Config
+
+class TCPSingleConnection(object):
+ log = logging.getLogger('TCPSingleConnection')
+
+ port = None
+
+ boundSocket = None
+ currentConnection = None
+
+ def __init__(self, port):
+ self.port = port
+
+ self.log.debug('Binding to Source-Socket on [::]:%u', port)
+ self.boundSocket = socket.socket(socket.AF_INET6)
+ self.boundSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.boundSocket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
+ self.boundSocket.bind(('::', port))
+ self.boundSocket.listen(1)
+
+ self.log.debug('Setting GObject io-watch on Socket')
+ GObject.io_add_watch(self.boundSocket, GObject.IO_IN, self.on_connect)
+
+ def on_connect(self, sock, *args):
+ conn, addr = sock.accept()
+ self.log.info("Incomming Connection from %s", addr)
+
+ if self.currentConnection is not None:
+ self.log.warn("Another Source is already connected")
+ return True
+
+ self.on_accepted(conn, addr)
+ self.currentConnection = conn
+
+ return True
+
+ def close_connection(self):
+ self.currentConnection = None
+ self.log.info('Connection closed')