From: Ondrej Lichtner olichtne@redhat.com
This file defines a new class ServerHandler. This class is a logging handler and therefore is a decendant of the class logging.Handler.
This new class contains a TCP server socket listening on a specified port. Until an incoming connection is recieved emitted messages are stored in a buffer. After recieving a connection the stored messages are all sent to the client and the buffer is flushed.
To avoid using threads or a new process for this server, the server socket is created as nonblocking and we check for incoming connections only when a new log message is emitted.
This handler was created because there is no suitable alternative for this functionality in the python standard library. However this handler is inspired by the SocketHandler and uses some of it's code.
Signed-off-by: Ondrej Lichtner olichtne@redhat.com --- Common/LoggingHandler.py | 108 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 Common/LoggingHandler.py
diff --git a/Common/LoggingHandler.py b/Common/LoggingHandler.py new file mode 100644 index 0000000..9b08f9d --- /dev/null +++ b/Common/LoggingHandler.py @@ -0,0 +1,108 @@ +""" +Server-like logging handler. +Stores logged messages in a buffer. Every time a new message is emitted it +checks for incoming connections. If a connection is established it flushes the +messages stored in the buffer to the connecting client. + +Copyright 2012 Red Hat, Inc. +Licensed under the GNU General Public License, version 2 as +published by the Free Software Foundation; see COPYING for details. +""" + +__autor__ = """ +olichtne@redhat.com (Ondrej Lichtner) +""" + +import socket, struct, pickle +import logging + +DEFAULT_LOG_PORT = 9998 + +class ServerHandler(logging.Handler): + def __init__(self, port=DEFAULT_LOG_PORT): + logging.Handler.__init__(self) + self.port = port + + self.sock = None + self.buf = [] + + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setblocking(0) + self.server_socket.bind(('0.0.0.0', self.port)) + self.server_socket.listen(1) + + def makePickle(self, record): + """ + Pickles the record in binary format with a length prefix, and + returns it ready for transmission across the socket. + + Function taken from class SocketHandler from standard python + library logging.handlers + """ + d = dict(record.__dict__) + d['msg'] = record.getMessage() + d['args'] = None + d['exc_info'] = None + s = pickle.dumps(d, 1) + slen = struct.pack(">L", len(s)) + return slen + s + + def emit(self, record): + try: + s = self.makePickle(record) + self.buf.append(s) + self.send_all() + except (KeyboardInterrupt, SystemExit): + raise + except: + logging.Handler.handleError(self, record) + + def client_connection(self): + if self.sock == None: + try: + self.sock = self.server_socket.accept()[0] + return True + except: + self.sock = None + return False + else: + return True + + def send_all(self): + if self.client_connection(): + sent = len(self.buf) + for record in self.buf: + if not self.send(record): + sent = self.buf.index(record) + break + + self.buf = self.buf[sent:] + + def send(self, record): + try: + if hasattr(self.sock, "sendall"): + self.sock.sendall(record) + else: + sentsofar = 0 + left = len(record) + while left > 0: + sent = self.sock.send(record[sentsofar:]) + sentsofar = sentsofar + sent + left = left - sent + return True + except socket.error: + self.sock.close() + self.sock = None + return False + + def close(self): + if self.sock: + self.sock.close() + self.sock = None + + self.server_socket.close() + self.server_socket = None + + self.buf = [] + + logging.Handler.close(self)
From: Ondrej Lichtner olichtne@redhat.com
This commit changes the LoggingServer so that it doesn't listen for incoming connections and instead initiates them. This solves the problem of slave logs not arriving when the controller is using a firewall or is located behind a NAT.
To connect a new slave its ip address and the port number are sent to the server process through a pipe. The server then attempts to create a new connection. This expects that there is a listening server socket on the other side, which for us means that the slave is using the ServerHandler.
Signed-off-by: Ondrej Lichtner olichtne@redhat.com --- Common/LoggingServer.py | 59 ++++++++++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 28 deletions(-)
diff --git a/Common/LoggingServer.py b/Common/LoggingServer.py index 4bd4f19..7aab431 100644 --- a/Common/LoggingServer.py +++ b/Common/LoggingServer.py @@ -18,26 +18,24 @@ from Common.Utils import die_when_parent_die
class LoggingServer: - DEFAULT_PORT = 9998 - def __init__(self, port, root_path, debug): - self.port = port + def __init__(self, root_path, debug): self.pid = None - self.socket = None self.stopped = False self.root_path = root_path self.debug = debug self.childSocket = {} + self.read_pipe = None + self.write_pipe = None
def server_stop_handler(self, sig, frame): """ Call function. Used for signal handle. """ + os.close(self.read_pipe) if (sig == signal.SIGTERM): for sock in self.childSocket.itervalues(): sock[2].close() - self.socket.shutdown(socket.SHUT_RDWR) - self.socket.close() sys.exit()
@@ -48,26 +46,20 @@ class LoggingServer: @param port: Port on which logging server listen. @return: Pid of logging process. """ - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - self.socket.bind(('0.0.0.0',self.port)) - except socket.error, e: - if (e.errno == 98): - logging.error("Another Logging server listen" - " on port %d" % self.port) - raise + self.read_pipe, self.write_pipe = os.pipe() self.pid = os.fork() if not self.pid: + os.close(self.write_pipe) die_when_parent_die() signal.signal(signal.SIGTERM, self.server_stop_handler) self._forked() else: + os.close(self.read_pipe) time.sleep(0.5)
def prepare_logging(self, root_path, sock): - address = sock[1][0] + address = sock.getpeername()[0] slave_root_path = os.path.join(root_path, address) try: os.mkdir(slave_root_path) @@ -77,7 +69,7 @@ class LoggingServer: logger = logging.getLogger(address) Logs(self.debug, False, logger, log_root=slave_root_path, to_display=False, date="") - return (logger, address, sock[0]) + return (logger, address, sock)
def recv_slave_log(self, logger, address, sock): @@ -111,23 +103,34 @@ class LoggingServer:
@param port: Port for listening. """ - self.socket.listen(100) - wait_socket = [self.socket.fileno()] + wait_socket = [self.read_pipe] while True: (r,w,e) = select.select(wait_socket, [], []) - if (self.socket.fileno() in r): - csock = self.socket.accept() - slave = self.prepare_logging(self.root_path, csock) - self.childSocket[csock[0].fileno()] = slave - wait_socket.append(slave[2].fileno()) - r.remove(self.socket.fileno()) + if (self.read_pipe in r): + slave_ip = os.read(self.read_pipe, 4096) + host, port = slave_ip.split() + + try: + csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + csock.connect((host, int(port))) + except socket.error: + logging.debug("Failed to connect to slave: "+ slave_ip) + else: + slave = self.prepare_logging(self.root_path, csock) + self.childSocket[csock.fileno()] = slave + wait_socket.append(slave[2].fileno()) + finally: + r.remove(self.read_pipe) + for so in r: if not self.recv_slave_log(*self.childSocket[so]): self.childSocket[so][2].close() del self.childSocket[so] wait_socket.remove(so) - self.socket.close() - sys.exit() + + + def addSlave(self, hostname, port): + os.write(self.write_pipe, hostname+' '+port)
def stop(self): @@ -149,7 +152,7 @@ if __name__ == '__main__': logger = logging.getLogger() c = logging.FileHandler("out.txt") logger.addHandler(c) - l = LoggingServer(LoggingServer.DEFAULT_PORT) + l = LoggingServer() l.start() raw_input() l.stop()
From: Ondrej Lichtner olichtne@redhat.com
This commit makes slaves use the newly implemented ServerHandler instead of a combination of a MemoryHandler and a SocketHandler. This is another one of the changes required for the reversal of the communication between controller and slave logging.
I also removed the function append_network_hadler since it won't be used from now
Signed-off-by: Ondrej Lichtner olichtne@redhat.com --- Common/Logs.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-)
diff --git a/Common/Logs.py b/Common/Logs.py index bf20e80..85099d4 100644 --- a/Common/Logs.py +++ b/Common/Logs.py @@ -13,6 +13,7 @@ import os, sys, shutil, datetime from logging import Formatter import logging.handlers import traceback +from Common.LoggingHandler import ServerHandler
LOCAL_IP = "(127.0.0.1)"
@@ -214,14 +215,6 @@ class Logs: cls.root_path = cls.prepare_logging(debug, waitForNet, recipe_path, to_display)
- - @staticmethod - def append_network_hadler(address, port): - """ - Append to log network handler. - """ - logging.net_handler.setTarget(logging.handlers.SocketHandler(address, port)) - @classmethod def clean_root_log_folder(cls, logRootPath): try: @@ -297,9 +290,8 @@ class Logs: root_logger.addHandler(display)
if waitForNet: - memory_handler = logging.handlers.MemoryHandler(1) - root_logger.addHandler(memory_handler) - logging.net_handler = memory_handler + server_handler = ServerHandler() + root_logger.addHandler(server_handler)
log_root_folder = cls.set_logging_root_path(recipe_path)
From: Ondrej Lichtner olichtne@redhat.com
NetTestController now needs to have access to the logging server so nettestctl now passes it as an argument to the constructor.
The function _init_slave_logging now doesn't communicate with the slave at all. Instead it sends the slaves address to the logging server which then tries to connect to it.
Signed-off-by: Ondrej Lichtner olichtne@redhat.com --- NetTest/NetTestController.py | 18 +++++++++--------- nettestctl.py | 13 ++++++------- 2 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/NetTest/NetTestController.py b/NetTest/NetTestController.py index e9735b4..2629c40 100644 --- a/NetTest/NetTestController.py +++ b/NetTest/NetTestController.py @@ -26,6 +26,7 @@ from Common.LoggingServer import LoggingServer from Common.VirtUtils import VirtNetCtl, VirtDomainCtl, BridgeCtl from Common.Utils import wait_for from NetTest.MachinePool import MachinePool +from Common.LoggingHandler import DEFAULT_LOG_PORT
class NetTestError(Exception): pass @@ -35,12 +36,13 @@ def ignore_event(**kwarg):
class NetTestController: def __init__(self, recipe_path, remoteexec=False, cleanup=False, - res_serializer=None, config=None): + res_serializer=None, config=None, logServer=None): self._remoteexec = remoteexec self._docleanup = cleanup self._res_serializer = res_serializer self._remote_capture_files = {} self._config = config + self._logServer = logServer self._command_context = NetTestCommandContext() self._machine_pool = MachinePool(config.get_option('environment', 'pool_dirs')) @@ -257,15 +259,13 @@ class NetTestController:
def _init_slave_logging(self, machine_id): info = self._get_machineinfo(machine_id) + logServer = self._logServer + hostname = info["hostname"] - logging.info("Setting logging server on machine %s", hostname) - rpc = self._get_machinerpc(machine_id) - ip_addr = get_corespond_local_ip(hostname) - if not rpc.set_logging(ip_addr, self._config.get_option('log', 'port')): - logging.error("==================================================") - logging.error("Machine %s is unable to connect to the logging "\ - "server! Check your firewall settings." % hostname) - logging.error("==================================================") + port = str(DEFAULT_LOG_PORT) + + logging.info("Connecting to the logging server on machine %s", hostname) + logServer.addSlave(hostname, port)
def _deconfigure_slaves(self): if 'machines' not in self._recipe: diff --git a/nettestctl.py b/nettestctl.py index f9973e4..c4154d4 100755 --- a/nettestctl.py +++ b/nettestctl.py @@ -45,12 +45,12 @@ def usage(): print " -x, --result=FILE file to write xml_result" sys.exit()
-def process_recipe(action, file_path, remoteexec, cleanup, - res_serializer, packet_capture, config): +def process_recipe(action, file_path, remoteexec, cleanup, res_serializer, + packet_capture, config, loggingServer): nettestctl = NetTestController(file_path, remoteexec=remoteexec, cleanup=cleanup, res_serializer=res_serializer, - config=config) + config=config, logServer=loggingServer) if action == "run": return nettestctl.run_recipe(packet_capture) elif action == "dump": @@ -75,11 +75,10 @@ def get_recipe_result(args, file_path, remoteexec, cleanup, res_serializer, packet_capture, config): res_serializer.add_recipe(file_path) Logs.set_logging_root_path(file_path) - loggingServer = LoggingServer(config.get_option('log', 'port'), - Logs.root_path, Logs.debug) + loggingServer = LoggingServer(Logs.root_path, Logs.debug) loggingServer.start() - res = process_recipe(args, file_path, remoteexec, cleanup, - res_serializer, packet_capture, config) + res = process_recipe(args, file_path, remoteexec, cleanup, res_serializer, + packet_capture, config, loggingServer) loggingServer.stop() return ((file_path, res))
You patchset looks good to me. I'll apply that and try out.
Also, did you already try to push this to upstream python code?
Jiri
Tue, Oct 02, 2012 at 01:42:10PM CEST, olichtne@redhat.com wrote:
From: Ondrej Lichtner olichtne@redhat.com
This file defines a new class ServerHandler. This class is a logging handler and therefore is a decendant of the class logging.Handler.
This new class contains a TCP server socket listening on a specified port. Until an incoming connection is recieved emitted messages are stored in a buffer. After recieving a connection the stored messages are all sent to the client and the buffer is flushed.
To avoid using threads or a new process for this server, the server socket is created as nonblocking and we check for incoming connections only when a new log message is emitted.
This handler was created because there is no suitable alternative for this functionality in the python standard library. However this handler is inspired by the SocketHandler and uses some of it's code.
Signed-off-by: Ondrej Lichtner olichtne@redhat.com
Common/LoggingHandler.py | 108 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 Common/LoggingHandler.py
diff --git a/Common/LoggingHandler.py b/Common/LoggingHandler.py new file mode 100644 index 0000000..9b08f9d --- /dev/null +++ b/Common/LoggingHandler.py @@ -0,0 +1,108 @@ +""" +Server-like logging handler. +Stores logged messages in a buffer. Every time a new message is emitted it +checks for incoming connections. If a connection is established it flushes the +messages stored in the buffer to the connecting client.
+Copyright 2012 Red Hat, Inc. +Licensed under the GNU General Public License, version 2 as +published by the Free Software Foundation; see COPYING for details. +"""
+__autor__ = """ +olichtne@redhat.com (Ondrej Lichtner) +"""
+import socket, struct, pickle +import logging
+DEFAULT_LOG_PORT = 9998
+class ServerHandler(logging.Handler):
- def __init__(self, port=DEFAULT_LOG_PORT):
logging.Handler.__init__(self)self.port = portself.sock = Noneself.buf = []self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.server_socket.setblocking(0)self.server_socket.bind(('0.0.0.0', self.port))self.server_socket.listen(1)- def makePickle(self, record):
"""Pickles the record in binary format with a length prefix, andreturns it ready for transmission across the socket.Function taken from class SocketHandler from standard pythonlibrary logging.handlers"""d = dict(record.__dict__)d['msg'] = record.getMessage()d['args'] = Noned['exc_info'] = Nones = pickle.dumps(d, 1)slen = struct.pack(">L", len(s))return slen + s- def emit(self, record):
try:s = self.makePickle(record)self.buf.append(s)self.send_all()except (KeyboardInterrupt, SystemExit):raiseexcept:logging.Handler.handleError(self, record)- def client_connection(self):
if self.sock == None:try:self.sock = self.server_socket.accept()[0]return Trueexcept:self.sock = Nonereturn Falseelse:return True- def send_all(self):
if self.client_connection():sent = len(self.buf)for record in self.buf:if not self.send(record):sent = self.buf.index(record)breakself.buf = self.buf[sent:]- def send(self, record):
try:if hasattr(self.sock, "sendall"):self.sock.sendall(record)else:sentsofar = 0left = len(record)while left > 0:sent = self.sock.send(record[sentsofar:])sentsofar = sentsofar + sentleft = left - sentreturn Trueexcept socket.error:self.sock.close()self.sock = Nonereturn False- def close(self):
if self.sock:self.sock.close()self.sock = Noneself.server_socket.close()self.server_socket = Noneself.buf = []logging.Handler.close(self)-- 1.7.11.4
LNST-developers mailing list LNST-developers@lists.fedorahosted.org https://lists.fedorahosted.org/mailman/listinfo/lnst-developers
On Thu, Oct 04, 2012 at 09:06:22AM +0200, Jiri Pirko wrote:
You patchset looks good to me. I'll apply that and try out.
Also, did you already try to push this to upstream python code?
Jiri
No, I wanted to look at their guidelines or rules or whatever they have today, but currently I want to fix the problems that just became visible. Upstream will hav eto wait...
Ondrej
Fri, Oct 05, 2012 at 08:05:35PM CEST, olichtne@redhat.com wrote:
On Thu, Oct 04, 2012 at 09:06:22AM +0200, Jiri Pirko wrote:
You patchset looks good to me. I'll apply that and try out.
Also, did you already try to push this to upstream python code?
Jiri
No, I wanted to look at their guidelines or rules or whatever they have today, but currently I want to fix the problems that just became visible. Upstream will hav eto wait...
np, I'm just curious :)
Ondrej
lnst-developers@lists.fedorahosted.org