aboutsummaryrefslogtreecommitdiff
path: root/voctocore
diff options
context:
space:
mode:
Diffstat (limited to 'voctocore')
-rw-r--r--voctocore/lib/controlserver.py30
1 files changed, 17 insertions, 13 deletions
diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py
index b0fe3d5..c0ceb0b 100644
--- a/voctocore/lib/controlserver.py
+++ b/voctocore/lib/controlserver.py
@@ -17,13 +17,10 @@ class ControlServer(TCPMultiConnection):
self.commands = ControlServerCommands(pipeline)
- GObject.idle_add(self.on_loop)
-
def on_accepted(self, conn, addr):
'''Asynchronous connection listener. Starts a handler for each connection.'''
self.log.debug('setting gobject io-watch on connection')
GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, [''])
- GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
def on_data(self, conn, _, leftovers, *args):
'''Asynchronous connection handler. Pushes data from socket
@@ -59,6 +56,8 @@ class ControlServer(TCPMultiConnection):
self.close_connection(conn)
return False
+ if self.command_queue.empty():
+ GObject.idle_add(self.on_loop)
self.command_queue.put((line, conn))
if close_after:
@@ -75,12 +74,12 @@ class ControlServer(TCPMultiConnection):
'''Command handler. Processes commands in the command queue whenever
nothing else is happening (registered as GObject idle callback)'''
if self.command_queue.empty():
- return True
+ return False
line, requestor = self.command_queue.get()
words = line.split()
if len(words) < 1:
- return True
+ return False
command = words[0]
args = words[1:]
@@ -115,17 +114,22 @@ class ControlServer(TCPMultiConnection):
if isinstance(responseObject, list):
for obj in responseObject:
signal = "%s\n" % str(obj)
- for conn, queue in self.currentConnections.items():
- queue.put(signal)
-
+ for conn in self.currentConnections:
+ self._schedule_write(conn, signal)
else:
response = "%s\n" % str(responseObject)
finally:
if response is not None and requestor in self.currentConnections:
- self.currentConnections[requestor].put(response)
+ self._schedule_write(requestor, response)
- return True
+ return False
+
+ def _schedule_write(self, conn, message):
+ queue = self.currentConnections[conn]
+ if queue.empty():
+ GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write)
+ queue.put(message)
def on_write(self, conn, *args):
# TODO: on_loop() is not called as soon as there is a writable socket
@@ -137,7 +141,7 @@ class ControlServer(TCPMultiConnection):
return False
if queue.empty():
- return True
+ return False
message = queue.get()
try:
@@ -152,7 +156,7 @@ class ControlServer(TCPMultiConnection):
words = msg.split()
words[-1] = self.commands.encodeSourceName(int(words[-1]))
msg = " ".join(words) + '\n'
- for queue in self.currentConnections.values():
- queue.put(msg)
+ for conn in self.currentConnections:
+ self._schedule_write(conn, msg)
except Exception as e:
self.log.debug("error during notify: %s", e)