diff options
-rw-r--r-- | voctocore/lib/controlserver.py | 30 |
1 files changed, 17 insertions, 13 deletions
diff --git a/voctocore/lib/controlserver.py b/voctocore/lib/controlserver.py index b0fe3d5..c0ceb0b 100644 --- a/voctocore/lib/controlserver.py +++ b/voctocore/lib/controlserver.py @@ -17,13 +17,10 @@ class ControlServer(TCPMultiConnection): self.commands = ControlServerCommands(pipeline) - GObject.idle_add(self.on_loop) - def on_accepted(self, conn, addr): '''Asynchronous connection listener. Starts a handler for each connection.''' self.log.debug('setting gobject io-watch on connection') GObject.io_add_watch(conn, GObject.IO_IN, self.on_data, ['']) - GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write) def on_data(self, conn, _, leftovers, *args): '''Asynchronous connection handler. Pushes data from socket @@ -59,6 +56,8 @@ class ControlServer(TCPMultiConnection): self.close_connection(conn) return False + if self.command_queue.empty(): + GObject.idle_add(self.on_loop) self.command_queue.put((line, conn)) if close_after: @@ -75,12 +74,12 @@ class ControlServer(TCPMultiConnection): '''Command handler. Processes commands in the command queue whenever nothing else is happening (registered as GObject idle callback)''' if self.command_queue.empty(): - return True + return False line, requestor = self.command_queue.get() words = line.split() if len(words) < 1: - return True + return False command = words[0] args = words[1:] @@ -115,17 +114,22 @@ class ControlServer(TCPMultiConnection): if isinstance(responseObject, list): for obj in responseObject: signal = "%s\n" % str(obj) - for conn, queue in self.currentConnections.items(): - queue.put(signal) - + for conn in self.currentConnections: + self._schedule_write(conn, signal) else: response = "%s\n" % str(responseObject) finally: if response is not None and requestor in self.currentConnections: - self.currentConnections[requestor].put(response) + self._schedule_write(requestor, response) - return True + return False + + def _schedule_write(self, conn, message): + queue = self.currentConnections[conn] + if queue.empty(): + GObject.io_add_watch(conn, GObject.IO_OUT, self.on_write) + queue.put(message) def on_write(self, conn, *args): # TODO: on_loop() is not called as soon as there is a writable socket @@ -137,7 +141,7 @@ class ControlServer(TCPMultiConnection): return False if queue.empty(): - return True + return False message = queue.get() try: @@ -152,7 +156,7 @@ class ControlServer(TCPMultiConnection): words = msg.split() words[-1] = self.commands.encodeSourceName(int(words[-1])) msg = " ".join(words) + '\n' - for queue in self.currentConnections.values(): - queue.put(msg) + for conn in self.currentConnections: + self._schedule_write(conn, msg) except Exception as e: self.log.debug("error during notify: %s", e) |