From: Ondrej Lichtner olichtne@redhat.com
This commit heavily reimplements the lnst.Controller.Machine and lnst.Slave.NetTestSlave modules. Since these modules directly depend on each other, these changes are introduced together in the same commit.
On the Controller side: * this switches the Machine class implementation from working with the old XML recipe code to working purely with the Python recipe implementation, where the Machine class is instantiated by a SlavePoolManager and is used by the tester through the Host class, which provides a tester-facing API.
* the Machine class now needs to synchronize different resources to the Slave, namely: * Device classes from the lnst.Devices package * test module code, which is instantiated on the Controller and sent to the Slave when required (including all parent classes required for the reconstruction of the object on the Slave). * In the future this can be extended to also send the InterfaceManager implementation, and maybe even the SlaveMethods implementation.
* includes methods that relay calls to the Device methods
* removes the old Interface implementation, as well as the original simple Device class, since this is all handled by the new Device classes.
* refactors the rpc_call method to only have one implementation for everything (including network namespaces, though they're not supported at the moment).
* switches to using the new Job implementation.
On the Slave side: * accepts Device and test module classes; all the dynamically received classes are stored in the new Resource cache. If a dynamically received class is imported during recipe execution, it is also cleaned up when the recipe finishes execution.
* ServerHandler now translates Device objects into Device references when sending these objects over sockets.
* switches to using the new Job implementation.
* comments out old code that has not been refactored and is not supported yet with the Python recipe implementation - mostly network namespaces.
* all exceptions are now transferred to the Controller where they'll be reconstructed and raised so they can be handled there (unless they can be handled on the Slave)
Signed-off-by: Ondrej Lichtner olichtne@redhat.com
--- v2: * improved class and module synchronization implementation * join split lines in deviceref_to_device method --- lnst/Controller/Machine.py | 1327 ++++++++++---------------------------------- lnst/Slave/NetTestSlave.py | 829 +++++++++++++-------------- 2 files changed, 678 insertions(+), 1478 deletions(-)
diff --git a/lnst/Controller/Machine.py b/lnst/Controller/Machine.py index 3bfcf82..f057390 100644 --- a/lnst/Controller/Machine.py +++ b/lnst/Controller/Machine.py @@ -14,17 +14,24 @@ rpazdera@redhat.com (Radek Pazdera) import logging import socket import os +import sys import tempfile import signal from time import sleep from xmlrpclib import Binary -from functools import wraps from lnst.Common.NetUtils import normalize_hwaddr +from lnst.Common.Utils import sha256sum from lnst.Common.Utils import wait_for, create_tar_archive from lnst.Common.Utils import check_process_running +from lnst.Common.TestModule import BaseTestModule from lnst.Common.NetTestCommand import DEFAULT_TIMEOUT +from lnst.Common.DeviceError import DeviceDeleted, DeviceNotFound from lnst.Controller.Common import ControllerError from lnst.Controller.CtlSecSocket import CtlSecSocket +from lnst.Devices import device_classes +from lnst.Devices.Device import Device +from lnst.Devices.RemoteDevice import RemoteDevice +from lnst.Devices.VirtualDevice import VirtualDevice
# conditional support for libvirt if check_process_running("libvirtd"): @@ -51,7 +58,6 @@ class Machine(object): self._ctl_config = ctl_config self._slave_desc = None self._connection = None - self._configured = False self._system_config = {} self._security = security self._security["identity"] = ctl_config.get_option("security", @@ -76,8 +82,19 @@ class Machine(object): self._interfaces = [] self._namespaces = [] self._bg_cmds = {} + self._jobs = {} + self._job_id_seq = 0
self._device_database = {} + self._tmp_device_database = [] + + self._init_connection() + + def set_id(self, new_id): + self._id = new_id + + def get_id(self): + return self._id
def get_configuration(self): configuration = {} @@ -87,150 +104,96 @@ class Machine(object): configuration["redhat_release"] = self._slave_desc["redhat_release"]
configuration["interfaces"] = {} - for i in self._interfaces: - if not isinstance(i, UnusedInterface): - configuration["interface_"+i.get_id()] = i.get_config() + for dev in self._device_database.items(): + configuration["device_"+dev.name] = dev.get_config() return configuration
- def _if_id_exists(self, if_id): - for iface in self._interfaces: - if if_id == iface.get_id(): - return True - return False + def add_tmp_device(self, dev): + self._tmp_device_database.append(dev) + + def create_remote_device(self, dev): + dev_clsname = dev._dev_cls.__name__ + dev_args = dev._dev_args + dev_kwargs = dev._dev_kwargs + ret = self.rpc_call("create_device", clsname=dev_clsname, + args=dev_args, + kwargs=dev_kwargs) + dev.if_index = ret["if_index"] + self._device_database[ret["if_index"]] = dev + + def device_created(self, dev_data): + if_index = dev_data["if_index"] + if if_index not in self._device_database: + new_dev = None + if len(self._tmp_device_database) > 0: + for dev in self._tmp_device_database: + if dev._match_update_data(dev_data): + new_dev = dev + break
- def _generate_if_id(self, if_type): - i = 0 - while True: - if_id = "gen_%s_%d" % (if_type, i) - if not self._if_id_exists(if_id): - break - i += 1 - return if_id - - def _add_interface(self, if_id, if_type, cls): - if if_id != None: - if self._if_id_exists(if_id): - msg = "Interface '%s' already exists on machine '%s'" \ - % (if_id, self._id) - raise MachineError(msg) - else: - if_id = self._generate_if_id(if_type) + if new_dev is None: + new_dev = RemoteDevice(Device) + new_dev.host = self + new_dev.if_index = if_index + else: + self._tmp_device_database.remove(new_dev)
- iface = cls(self, if_id, if_type) - self._interfaces.append(iface) - return iface + new_dev.if_index = dev_data["if_index"]
- def remove_interface(self, if_id): - iface = self.get_interface(if_id) - self._interfaces.remove(iface) + self._device_database[if_index] = new_dev
- def interface_update(self, if_data): - try: - iface = self.get_interface(if_data["if_id"]) - except: - iface = None - if iface: - iface.update(if_data['if_data']) + def device_delete(self, dev_data): + if dev_data["if_index"] in self._device_database: + self._device_database[dev_data["if_index"]].deleted = True
- if if_data["if_data"]["if_index"] in self._device_database: - dev = self._device_database[if_data["if_data"]["if_index"]] - dev.update_data(if_data['if_data']) + def dev_db_get_if_index(self, if_index): + if if_index in self._device_database: + return self._device_database[if_index] else: - dev = Device(if_data["if_data"], self) - self._device_database[if_data["if_data"]["if_index"]] = dev - - def dev_db_delete(self, update_msg): - if update_msg["if_index"] in self._device_database: - del self._device_database[update_msg["if_index"]] + return None
def dev_db_get_name(self, dev_name): + #TODO move these to Slave to optimize quering for each device for if_index, dev in self._device_database.iteritems(): if dev.get_name() == dev_name: return dev return None
- # - # Factory methods for constructing interfaces on this machine. The - # types of interfaces are explained with the classes below. - # - def new_static_interface(self, if_id, if_type): - return self._add_interface(if_id, if_type, StaticInterface) - - def new_unused_interface(self, if_type): - return self._add_interface(None, if_type, UnusedInterface) - - def new_virtual_interface(self, if_id, if_type): - return self._add_interface(if_id, if_type, VirtualInterface) - - def new_soft_interface(self, if_id, if_type): - return self._add_interface(if_id, if_type, SoftInterface) - - def new_loopback_interface(self, if_id): - return self._add_interface(if_id, 'lo', LoopbackInterface) - - def get_interface(self, if_id): - for iface in self._interfaces: - if iface.get_id != None and if_id == iface.get_id(): - return iface - - msg = "Interface '%s' not found on machine '%s'" % (if_id, self._id) - raise MachineError(msg) - - def get_interfaces(self): - return self._interfaces - - def get_ordered_interfaces(self): - ordered_list = list(self._interfaces) - change = True - while change: - change = False - swap = False - ind1 = 0 - ind2 = 0 - for i in ordered_list: - master = i.get_primary_master() - if master != None: - ind1 = ordered_list.index(i) - ind2 = ordered_list.index(master) - if ind1 > ind2: - swap = True - break - if swap: - change = True - tmp = ordered_list[ind1] - ordered_list[ind1] = ordered_list[ind2] - ordered_list[ind2] = tmp - return ordered_list - - def _rpc_call(self, method_name, *args): - data = {"type": "command", "method_name": method_name, "args": args} - - self._msg_dispatcher.send_message(self._id, data) - result = self._msg_dispatcher.wait_for_result(self._id) - - return result + def get_dev_by_hwaddr(self, hwaddr): + #TODO move these to Slave to optimize quering for each device + for if_index, dev in self._device_database.iteritems(): + if dev.hwaddr == hwaddr: + return dev + return None
- def _rpc_call_to_netns(self, netns, method_name, *args): - data = {"type": "command", "method_name": method_name, "args": args} - msg = {"type": "to_netns", "netns": netns, "data": data} + def rpc_call(self, method_name, *args, **kwargs): + if "netns" in kwargs and kwargs["netns"] is not None: + netns = kwargs["netns"] + del kwargs["netns"] + msg = {"type": "to_netns", + "netns": netns, + "data": {"type": "command", + "method_name": method_name, + "args": args, + "kwargs": kwargs}} + else: + if "netns" in kwargs: + del kwargs["netns"] + msg = {"type": "command", + "method_name": method_name, + "args": args, + "kwargs": kwargs}
- self._msg_dispatcher.send_message(self._id, msg) - result = self._msg_dispatcher.wait_for_result(self._id) + self._msg_dispatcher.send_message(self, msg) + result = self._msg_dispatcher.wait_for_result(self)
return result
- def _rpc_call_x(self, netns, method_name, *args): - if not netns: - return self._rpc_call(method_name, *args) - return self._rpc_call_to_netns(netns, method_name, *args) - - def init_connection(self, recipe_name): + def _init_connection(self): """ Initialize the slave connection
- Calling this method will initialize the rpc connection to the - machine and initialize all the interfaces. Note, that it will - *not* configure the interfaces. They need to be configured - individually later on. + This will connect to the Slave, get it's description (should be + usable for matching), and checks version compatibility """ hostname = self._hostname port = self._port @@ -242,7 +205,7 @@ class Machine(object):
self._msg_dispatcher.add_slave(self, connection)
- hello, slave_desc = self._rpc_call("hello", recipe_name) + hello, slave_desc = self.rpc_call("hello") if hello != "hello": msg = "Unable to establish RPC connection " \ "to machine %s, handshake failed!" % hostname @@ -266,14 +229,45 @@ class Machine(object):
self._slave_desc = slave_desc
- devices = self._rpc_call("get_devices") + def set_recipe(self, recipe_name): + """ Reserves the machine for the specified recipe + + Also sends Device classes from the controller and initializes the + InterfaceManager on the Slave and builds the device database. + """ + self.rpc_call("set_recipe", recipe_name) + self._send_device_classes() + self.rpc_call("init_if_manager") + + devices = self.rpc_call("get_devices") for if_index, dev in devices.items(): - self._device_database[if_index] = Device(dev, self) + remote_dev = RemoteDevice(Device) + remote_dev.host = self + remote_dev.if_index = if_index
- for iface in self._interfaces: - iface.initialize() + self._device_database[if_index] = remote_dev + + def _send_device_classes(self): + classes = [] + for cls_name, cls in device_classes: + classes.extend(reversed(self._get_base_classes(cls))) + + for cls in classes: + if cls is object: + continue + module_name = cls.__module__ + module = sys.modules[module_name] + filename = module.__file__
- self._configured = True + if filename[-3:] == "pyc": + filename = filename[:-1] + + res_hash = self.sync_resource(module_name, filename) + self.rpc_call("load_cached_module", module_name, res_hash) + + for cls_name, cls in device_classes: + module_name = cls.__module__ + self.rpc_call("map_device_class", cls_name, module_name)
def is_git_version(self, version): try: @@ -282,10 +276,12 @@ class Machine(object): except ValueError: return True
- def is_configured(self): - """ Test if the machine was configured """ - - return self._configured + def cleanup_devices(self): + self.rpc_call("destroy_devices") + for dev in self._device_database.values(): + if isinstance(dev, VirtualDevice): + dev.destroy() + self._device_database = {}
def cleanup(self): """ Clean the machine up @@ -295,114 +291,144 @@ class Machine(object): all the interfaces that have been configured on the machine, and finalize and close the rpc connection to the machine. """ - if not self._configured: - return - #connection to the slave was closed - if not self._msg_dispatcher.get_connection(self._id): + if not self._msg_dispatcher.get_connection(self): return
- ordered_ifaces = self.get_ordered_interfaces() try: #dump statistics - for iface in self._interfaces: - # Getting stats only from real interfaces - if isinstance(iface, UnusedInterface): - continue - stats = iface.link_stats() - logging.debug("%s:%s:%s: RX:\t bytes: %d\t packets: %d\t dropped: %d" % - (iface.get_netns(), iface.get_host(), iface.get_id(), - stats["rx_bytes"], stats["rx_packets"], stats["rx_dropped"])) - logging.debug("%s:%s:%s: TX:\t bytes: %d\t packets: %d\t dropped: %d" % - (iface.get_netns(), iface.get_host(), iface.get_id(), - stats["tx_bytes"], stats["tx_packets"], stats["tx_dropped"])) - - self._rpc_call("kill_cmds") + # for iface in self._interfaces: + # # Getting stats only from real interfaces + # if isinstance(iface, UnusedInterface): + # continue + # stats = iface.link_stats() + # logging.debug("%s:%s:%s: RX:\t bytes: %d\t packets: %d\t dropped: %d" % + # (iface.get_netns(), iface.get_host(), iface.get_id(), + # stats["rx_bytes"], stats["rx_packets"], stats["rx_dropped"])) + # logging.debug("%s:%s:%s: TX:\t bytes: %d\t packets: %d\t dropped: %d" % + # (iface.get_netns(), iface.get_host(), iface.get_id(), + # stats["tx_bytes"], stats["tx_packets"], stats["tx_dropped"])) + + self.rpc_call("kill_jobs") for netns in self._namespaces: - self._rpc_call_to_netns(netns, "kill_cmds") + self.rpc_call("kill_jobs", netns=netns)
self.restore_system_config() - - ordered_ifaces.reverse() - for iface in ordered_ifaces: - iface.deconfigure() - for iface in ordered_ifaces: - iface.cleanup() - - self.del_namespaces() - - self.restore_nm_option() - self._rpc_call("bye") + self.cleanup_devices() + # self.del_namespaces() + # self.restore_nm_option() + self.rpc_call("bye") except: #cleanup is only meaningful on dynamic interfaces, and should #always be called when deconfiguration happens- especially #when something on the slave breaks during deconfiguration - for iface in ordered_ifaces: - if not isinstance(iface, VirtualInterface): - continue - iface.cleanup() + self.cleanup_devices() raise - finally: - self._msg_dispatcher.disconnect_slave(self.get_id()) - - self._configured = False
def _timeout_handler(self, signum, frame): - msg = "RPC connection to machine %s timed out" % self.get_id() + msg = "Timeout expired on machine %s" % self.get_id() raise MachineError(msg)
- def run_command(self, command): - """ Run a command on the machine """ + def _get_base_classes(self, cls): + new_bases = [cls] + list(cls.__bases__) + bases = [] + while len(new_bases) != len(bases): + bases = new_bases + new_bases = list(bases) + for b in bases: + for bs in b.__bases__: + if bs not in new_bases: + new_bases.append(bs) + return new_bases + + def run_job(self, job): + job.id = self._job_id_seq + self._job_id_seq += 1 + self._jobs[job.id] = job + + if job._type == "module": + classes = [job._what] + classes.extend(self._get_base_classes(job._what.__class__)) + + for cls in reversed(classes): + if cls is object or cls is BaseTestModule: + continue + m_name = cls.__module__ + m = sys.modules[m_name] + filename = m.__file__ + if filename[-3:] == "pyc": + filename = filename[:-1]
- prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler) + res_hash = self.sync_resource(m_name, filename)
- if 'bg_id' in command: - self._bg_cmds[command['bg_id']] = command - if command["type"] in ["wait", "intr", "kill"]: - bg_cmd = self._bg_cmds[command["proc_id"]] - if bg_cmd["netns"] != None: - command["netns"] = bg_cmd["netns"] - - netns = command["netns"] - if command["type"] == "wait": - logging.debug("Get remaining time of bg process with bg_id == %s" - % command["proc_id"]) - remaining_time = self._rpc_call_x(netns, "get_remaining_time", - command["proc_id"]) - logging.debug("Setting timeout to %d", remaining_time) - if remaining_time > 0: - signal.alarm(remaining_time) - else: - # 2 seconds is enough time to do wait via RPC and collect - # the result - signal.alarm(2) - else: - if "timeout" in command: - timeout = command["timeout"] - logging.debug("Setting timeout to "%d"", timeout) - signal.alarm(timeout) - else: - logging.debug("Setting default timeout (%ds)." % DEFAULT_TIMEOUT) - signal.alarm(DEFAULT_TIMEOUT) + self.rpc_call("load_cached_module", m_name, res_hash) + + logging.info("Host %s executing job %d: %s" % + (self._id, job.id, str(job))) + if job._desc is not None: + logging.info("Job description: %s" % job._desc) + + return self.rpc_call("run_job", job._to_dict(), netns=job._netns) + + def wait_for_job(self, job, timeout): + res = True + if job.id not in self._jobs: + raise MachineError("No job '%s' running on Machine %s" % + (job.id, self._id)) + + prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler) + signal.alarm(timeout)
try: - cmd_res = self._rpc_call_x(netns, "run_command", command) + if timeout > 0: + logging.info("Waiting for Job %d on Host %s for %d seconds." % + (job.id, self._id, timeout)) + elif timeout == 0: + logging.info("Waiting for Job %d on Host %s." % + (job.id, self._id)) + result = self._msg_dispatcher.wait_for_finish(self, job.id) except MachineError as exc: - if "proc_id" in command: - cmd_res = self._rpc_call_x(netns, "kill_command", - command["proc_id"]) - else: - cmd_res = self._rpc_call_x(netns, "kill_command", - None) + logging.error(str(exc)) + res = False
- if "killed" in cmd_res and cmd_res["killed"]: - cmd_res["passed"] = False - cmd_res["msg"] = str(exc) + signal.alarm(0) + signal.signal(signal.SIGALRM, prev_handler) + + return res + + def wait_for_tmp_devices(self, timeout): + res = False + prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler) + signal.alarm(timeout) + + try: + if timeout > 0: + logging.info("Waiting for Device creation Host %s for %d seconds." % + (self._id, timeout)) + elif timeout == 0: + logging.info("Waiting for Device creation on Host %s." % + (self._id)) + + while len(self._tmp_device_database) > 0: + result = self._msg_dispatcher.handle_messages() + except MachineError as exc: + logging.error(str(exc)) + res = False
signal.alarm(0) signal.signal(signal.SIGALRM, prev_handler) + return res + + def job_finished(self, msg): + job_id = msg["job_id"] + job = self._jobs[job_id] + job._res = msg["result"]
- return cmd_res + def kill(self, job, signal): + if job.id not in self._jobs: + raise MachineError("No job '%s' running on Machine %s" % + (job.id(), self._id)) + return self.rpc_call("kill_job", job.id, signal, netns=job.netns)
def get_hostname(self): """ Get hostname/ip of the machine @@ -415,13 +441,6 @@ class Machine(object): def get_libvirt_domain(self): return self._libvirt_domain
- def get_id(self): - """ Returns machine's id as defined in the recipe """ - return self._id - - def set_rpc(self, dispatcher): - self._msg_dispatcher = dispatcher - def get_mac_pool(self): if self._mac_pool: return self._mac_pool @@ -432,9 +451,9 @@ class Machine(object): self._mac_pool = mac_pool
def restore_system_config(self): - self._rpc_call("restore_system_config") + self.rpc_call("restore_system_config") for netns in self._namespaces: - self._rpc_call_to_netns(netns, "restore_system_config") + self.rpc_call("restore_system_config", netns=netns) return True
def set_network_bridges(self, bridges): @@ -459,7 +478,7 @@ class Machine(object):
tmp = {} for netns in namespaces: - tmp.update(self._rpc_call_x(netns, "start_packet_capture", "")) + tmp.update(self.rpc_call("start_packet_capture", "", netns=netns)) return tmp
def stop_packet_capture(self): @@ -468,10 +487,10 @@ class Machine(object): namespaces.add(iface.get_netns())
for netns in namespaces: - self._rpc_call_x(netns, "stop_packet_capture") + self.rpc_call("stop_packet_capture", netns=netns)
def copy_file_to_machine(self, local_path, remote_path=None, netns=None): - remote_path = self._rpc_call_x(netns, "start_copy_to", remote_path) + remote_path = self.rpc_call("start_copy_to", remote_path, netns=netns)
f = open(local_path, "rb")
@@ -480,14 +499,14 @@ class Machine(object): if len(data) == 0: break
- self._rpc_call_x(netns, "copy_part_to", remote_path, Binary(data)) + self.rpc_call("copy_part_to", remote_path, data, netns=netns)
- self._rpc_call_x(netns, "finish_copy_to", remote_path) + self.rpc_call("finish_copy_to", remote_path, netns=netns)
return remote_path
def copy_file_from_machine(self, remote_path, local_path): - status = self._rpc_call("start_copy_from", remote_path) + status = self.rpc_call("start_copy_from", remote_path) if not status: raise MachineError("The requested file cannot be transfered." \ "It does not exist on machine %s" % self.get_id()) @@ -495,836 +514,54 @@ class Machine(object): local_file = open(local_path, "wb")
buf_size = 1024*1024 # 1MB buffer - binary = "next" - while binary != "": - binary = self._rpc_call("copy_part_from", remote_path, buf_size) - local_file.write(binary.data) + while True: + data = self.rpc_call("copy_part_from", remote_path, buf_size) + if data == "": + break + local_file.write(data)
local_file.close() - self._rpc_call("finish_copy_from", remote_path) - - def sync_resources(self, required): - self._rpc_call("clear_resource_table") - - for res_type, resources in required.iteritems(): - for res_name, res in resources.iteritems(): - has_resource = self._rpc_call("has_resource", res["hash"]) - if not has_resource: - msg = "Transfering %s %s to machine %s" % \ - (res_name, res_type, self.get_id()) - logging.info(msg) + self.rpc_call("finish_copy_from", remote_path)
- local_path = required[res_type][res_name]["path"] + def sync_resource(self, res_name, file_path): + digest = sha256sum(file_path)
- if res_type == "tools": - archive = tempfile.NamedTemporaryFile(delete=False) - archive_path = archive.name - archive.close() + if not self.rpc_call("has_resource", digest): + msg = "Transfering %s to machine %s as '%s'" % (file_path, + self.get_id(), + res_name) + logging.debug(msg)
- create_tar_archive(local_path, archive_path, True) - local_path = archive_path + remote_path = self.copy_file_to_machine(file_path) + self.rpc_call("add_resource_to_cache", + "file", remote_path, res_name) + return digest
- remote_path = self.copy_file_to_machine(local_path) - self._rpc_call("add_resource_to_cache", res["hash"], - remote_path, res_name, res["path"], res_type) + # def enable_nm(self): + # return self._rpc_call("enable_nm")
- for ns in self._namespaces: - remote_path = self.copy_file_to_machine(local_path, - netns=ns) - self._rpc_call_to_netns(ns, "add_resource_to_cache", - res["hash"], remote_path, res_name, res["path"], - res_type) + # def disable_nm(self): + # return self._rpc_call("disable_nm")
- if res_type == "tools": - os.unlink(archive_path) - - self._rpc_call("map_resource", res["hash"], res_type, res_name) - for ns in self._namespaces: - self._rpc_call_to_netns(ns, "map_resource", res["hash"], - res_type, res_name) - - def enable_nm(self): - return self._rpc_call("enable_nm") - - def disable_nm(self): - return self._rpc_call("disable_nm") - - def restore_nm_option(self): - return self._rpc_call("restore_nm_option") + # def restore_nm_option(self): + # return self._rpc_call("restore_nm_option")
def __str__(self): return "[Machine hostname(%s) libvirt_domain(%s) interfaces(%d)]" % \ (self._hostname, self._libvirt_domain, len(self._interfaces))
- def add_netns(self, netns): - self._namespaces.append(netns) - return self._rpc_call("add_namespace", netns) + # def add_netns(self, netns): + # self._namespaces.append(netns) + # return self._rpc_call("add_namespace", netns)
- def del_netns(self, netns): - return self._rpc_call("del_namespace", netns) - - def del_namespaces(self): - for netns in self._namespaces: - self.del_netns(netns) - self._namespaces = [] - return True + # def del_netns(self, netns): + # return self._rpc_call("del_namespace", netns)
- def wait_interface_init(self): - return self._rpc_call("wait_interface_init") + # def del_namespaces(self): + # for netns in self._namespaces: + # self.del_netns(netns) + # self._namespaces = [] + # return True
def get_security(self): return self._security - -class Interface(object): - """ Abstraction of a test network interface on a slave machine - - This is a base class for object that represent test interfaces - on a test machine. - """ - def __init__(self, machine, if_id, if_type): - self._machine = machine - self._configured = False - - self._id = if_id - self._type = if_type - - self._hwaddr = None - self._devname = None - self._network = None - self._netem = None - - self._slaves = {} - self._slave_options = {} - self._addresses = [] - self._options = [] - - self._master = {"primary": None, "other": []} - - self._ovs_conf = None - - self._netns = None - self._peer = None - self._mtu = None - self._driver = None - self._devlink = None - - def get_id(self): - return self._id - - def get_type(self): - return self._type - - def get_driver(self): - return self._driver - - def set_hwaddr(self, hwaddr): - self._hwaddr = normalize_hwaddr(hwaddr) - - def get_hwaddr(self): - if not self._hwaddr: - msg = "Hardware address is not available for interface '%s'" \ - % self.get_id() - raise MachineError(msg) - return self._hwaddr - - def set_devname(self, devname): - self._devname = devname - - def get_devname(self): - if not self._devname: - msg = "Device name is not available for interface '%s'" \ - % self.get_id() - raise MachineError(msg) - return self._devname - - def set_network(self, network): - self._network = network - - def get_network(self): - if not self._network: - msg = "Network segment is not available for interface '%s'" \ - % self.get_id() - raise MachineError(msg) - return self._network - - def set_option(self, name, value): - self._options.append((name, value)) - - def set_netem(self, netem): - self._netem = netem - - def add_master(self, master, primary=True): - if primary and self._master["primary"] != None: - msg = "Interface %s already has a primary master."\ - % self.get_id() - raise MachineError(msg) - else: - if primary: - self._master["primary"] = master 
- else: - self._master["other"].append(master) - - def del_master(self, master): - if self._master["primary"] is master: - self._master["primary"] = None - else: - self._master["other"].remove(master) - - def get_primary_master(self): - return self._master["primary"] - - def add_slave(self, iface): - self._slaves[iface.get_id()] = iface - if self._type in ["vlan", "vxlan"]: - iface.add_master(self, primary=False) - else: - iface.add_master(self) - - def del_slave(self, iface): - iface.del_master(self) - del self._slaves[iface.get_id()] - - def set_slave_option(self, slave_id, name, value): - if slave_id not in self._slave_options: - self._slave_options[slave_id] = [] - self._slave_options[slave_id].append((name, value)) - - def add_address(self, addr): - if (type(addr) == type([])): - for one_addr in addr: - self._addresses.append(one_addr) - else: - self._addresses.append(addr) - - def get_address(self, num): - return self._addresses[num].split('/')[0] - - def get_addresses(self): - addrs = [] - for addr in self._addresses: - addrs.append(tuple(addr.split('/'))) - return addrs - - def set_ovs_conf(self, ovs_conf): - self._ovs_conf = ovs_conf - - def get_ovs_conf(self): - return self._ovs_conf - - def set_netns(self, netns): - self._netns = netns - - def get_netns(self): - return self._netns - - def get_host(self): - return self._machine.get_id() - - def set_peer(self, peer): - self._peer = peer - - def get_peer(self): - return self._peer - - def get_prefix(self, num): - try: - return self._addresses[num].split('/')[1] - except IndexError: - raise PrefixMissingError - - def get_mtu(self): - return self._mtu - - def set_mtu(self, mtu): - command = {"type": "config", - "host": self._machine.get_id(), - "persistent": False, - "options":[ - {"name": "/sys/class/net/%s/mtu" % self._devname, - "value": str(mtu)} - ]} - command["netns"] = self._netns - - self._machine.run_command(command) - self._mtu = mtu - return self._mtu - - def link_stats(self): - stats = 
self._machine._rpc_call_x(self._netns, "link_stats", - self._id) - return stats - - def set_addresses(self, ips): - self._addresses = ips - self._machine._rpc_call_x(self._netns, "set_addresses", - self._id, ips) - - def add_route(self, dest): - self._machine._rpc_call_x(self._netns, "add_route", - self._id, dest) - - def del_route(self, dest): - self._machine._rpc_call_x(self._netns, "del_route", - self._id, dest) - - def update_from_slave(self): - if_data = self._machine._rpc_call_x(self._netns, "get_if_data", - self._id) - - if if_data is not None: - self.update(if_data) - return - - def update(self, if_data): - self.set_hwaddr(if_data["hwaddr"]) - self.set_devname(if_data["name"]) - self._mtu = if_data["mtu"] - self._driver = if_data["driver"] - self._devlink = if_data["devlink"] - - def get_config(self): - config = {"id": self._id, - "hwaddr": self._hwaddr, - "devname": self._devname, - "network_label": self._network, - "type": self._type, - "addresses": self._addresses, - "slaves": self._slaves.keys(), - "options": self._options, - "slave_options": self._slave_options, - "master": None, - "other_masters": [], - "ovs_conf": self._ovs_conf, - "netns": self._netns, - "peer": self._peer, - "netem": self._netem, - "mtu": self._mtu, - "driver": self._driver} - - if self._master["primary"] != None: - config["master"] = self._master["primary"].get_id() - - for m in self._master["other"]: - config["other_masters"].append(m.get_id()) - - return config - - def up(self): - self._machine._rpc_call_x(self._netns, "set_device_up", self._id) - - def down(self): - self._machine._rpc_call_x(self._netns, "set_device_down", self._id) - - def set_link_up(self): - self._machine._rpc_call_x(self._netns, "set_link_up", self._id) - - def set_link_down(self): - self._machine._rpc_call_x(self._netns, "set_link_down", self._id) - - def initialize(self): - phys_devs = self._machine._rpc_call("map_if_by_hwaddr", - self._id, self._hwaddr) - - if len(phys_devs) == 1: - 
self.set_devname(phys_devs[0]["name"]) - elif len(phys_devs) < 1: - msg = "Device %s not found on machine %s" \ - % (self.get_id(), self._machine.get_id()) - raise MachineError(msg) - elif len(phys_devs) > 1: - msg = "More than one device with hwaddr %s found on machine %s" \ - % (self._hwaddr, self._machine.get_id()) - raise MachineError(msg) - - self.down() - - def cleanup(self): - self._machine._rpc_call("unmap_if", self._id) - - def configure(self): - if self._configured: - msg = "Unable to configure interface %s on machine %s. " \ - "It has been configured already." % (self.get_id(), - self._machine.get_id()) - raise MachineError(msg) - else: - self._configured = True - - logging.info("Configuring interface %s on machine %s", self.get_id(), - self._machine.get_id()) - - if self._netns != None: - self._machine._rpc_call("set_if_netns", self.get_id(), self._netns) - self._machine._rpc_call_x(self._netns, "configure_interface", - self.get_id(), self.get_config()) - - self.update_from_slave() - - def deconfigure(self): - if not self._configured: - return - - self._machine._rpc_call_x(self._netns, "deconfigure_interface", - self.get_id()) - if self._netns != None: - self._machine._rpc_call_to_netns(self._netns, - "return_if_netns", self.get_id()) - self._configured = False - - def add_br_vlan(self, br_vlan_info): - self._machine._rpc_call_x(self._netns, "add_br_vlan", - self._id, br_vlan_info) - - def del_br_vlan(self, br_vlan_info): - self._machine._rpc_call_x(self._netns, "del_br_vlan", - self._id, br_vlan_info) - - def get_br_vlans(self): - return self._machine._rpc_call_x(self._netns, "get_br_vlans", self._id) - - def add_br_fdb(self, br_fdb_info): - self._machine._rpc_call_x(self._netns, "add_br_fdb", - self._id, br_fdb_info) - - def del_br_fdb(self, br_fdb_info): - self._machine._rpc_call_x(self._netns, "del_br_fdb", - self._id, br_fdb_info) - - def get_br_fdbs(self): - return self._machine._rpc_call_x(self._netns, "get_br_fdbs", self._id) - - def 
set_br_learning(self, br_learning_info): - self._machine._rpc_call_x(self._netns, "set_br_learning", self._id, - br_learning_info) - - def set_br_learning_sync(self, br_learning_sync_info): - self._machine._rpc_call_x(self._netns, "set_br_learning_sync", self._id, - br_learning_sync_info) - - def set_br_flooding(self, br_flooding_info): - self._machine._rpc_call_x(self._netns, "set_br_flooding", self._id, - br_flooding_info) - - def set_br_state(self, br_state_info): - self._machine._rpc_call_x(self._netns, "set_br_state", self._id, - br_state_info) - - def set_speed(self, speed): - self._machine._rpc_call_x(self._netns, "set_speed", self._id, speed) - - def set_autoneg(self): - self._machine._rpc_call_x(self._netns, "set_autoneg", self._id) - - def slave_add(self, if_id): - self._machine._rpc_call_x(self._netns, "slave_add", self._id, if_id) - self.add_slave(self._machine.get_interface(if_id)) - - def slave_del(self, if_id): - self.del_slave(self._machine.get_interface(if_id)) - self._machine._rpc_call_x(self._netns, "slave_del", self._id, if_id) - - def get_devlink_name(self): - if self._devlink: - return "%s/%s" % (self._devlink["bus_name"], - self._devlink["dev_name"]) - return None - - def get_devlink_port_name(self): - if self._devlink: - return "%s/%u" % (self.get_devlink_name(), - self._devlink["port_index"]) - return None - -class StaticInterface(Interface): - """ Static interface - - This class represents interfaces that are present on the - machine. LNST will only use them for testing without performing - any special actions. - - This type is suitable for physical interfaces. - """ - def __init__(self, machine, if_id, if_type): - super(StaticInterface, self).__init__(machine, if_id, if_type) - -class LoopbackInterface(Interface): - """ Static interface - - This class represents interfaces that are present on the - machine. LNST will only use them for testing without performing - any special actions. - - This type is suitable for physical interfaces. 
- """ - def __init__(self, machine, if_id, if_type): - super(LoopbackInterface, self).__init__(machine, if_id, if_type) - - def initialize(self): - pass - - def cleanup(self): - pass - - def configure(self): - self._hwaddr = '00:00:00:00:00:00' - self._driver = 'loopback' - - phys_devs = self._machine._rpc_call_x(self._netns, - "map_if_by_params", self._id, - { 'hwaddr': self._hwaddr, - 'driver': self._driver }) - - if len(phys_devs) == 1: - self.set_devname(phys_devs[0]["name"]) - elif len(phys_devs) < 1: - msg = "Device %s not found on machine %s" \ - % (self.get_id(), self._machine.get_id()) - raise MachineError(msg) - elif len(phys_devs) > 1: - msg = "More than one device with hwaddr %s found on machine %s" \ - % (self._hwaddr, self._machine.get_id()) - raise MachineError(msg) - - if self._configured: - msg = "Unable to configure interface %s on machine %s. " \ - "It has been configured already." % (self.get_id(), - self._machine.get_id()) - raise MachineError(msg) - - logging.info("Configuring interface %s on machine %s", self.get_id(), - self._machine.get_id()) - - self._machine._rpc_call_x(self._netns, "configure_interface", - self.get_id(), self.get_config()) - self._configured = True - self.update_from_slave() - - def deconfigure(self): - if not self._configured: - return - - self._machine._rpc_call_x(self._netns, "deconfigure_interface", - self.get_id()) - self._machine._rpc_call_x(self._netns, "unmap_if", self.get_id()) - self._configured = False - -class VirtualInterface(Interface): - """ Dynamically created interface - - This class represents interfaces in libvirt virtual machines - that were created dynamically by LNST just for this test. - - This requires some special handling and communication with - libvirt. 
- """ - def __init__(self, machine, if_id, if_type): - super(VirtualInterface, self).__init__(machine, if_id, if_type) - self._driver = "virtio" - - def set_driver(self, driver): - self._driver = driver - - def get_driver(self): - return self._driver - - def get_orig_hwaddr(self): - if not self._orig_hwaddr: - msg = "Hardware address is not available for interface '%s'" \ - % self.get_id() - raise MachineError(msg) - return self._orig_hwaddr - - def initialize(self): - domain_ctl = self._machine.get_domain_ctl() - - if self._hwaddr: - query = self._machine._rpc_call('get_devices_by_hwaddr', - self._hwaddr) - if len(query): - msg = "Device with hwaddr %s already exists" % self._hwaddr - raise MachineError(msg) - else: - mac_pool = self._machine.get_mac_pool() - while True: - self._hwaddr = normalize_hwaddr(mac_pool.get_addr()) - query = self._machine._rpc_call('get_devices_by_hwaddr', - self._hwaddr) - if not len(query): - break - - bridges = self._machine.get_network_bridges() - if self._network in bridges: - net_ctl = bridges[self._network] - else: - bridges[self._network] = net_ctl = VirtNetCtl() - net_ctl.init() - - net_name = net_ctl.get_name() - - logging.info("Creating interface %s (%s) on machine %s", - self.get_id(), self._hwaddr, self._machine.get_id()) - - self._orig_hwaddr = self._hwaddr - domain_ctl.attach_interface(self._hwaddr, net_name, self._driver) - - - # The sleep here is necessary, because udev sometimes renames the - # newly created device and if the query for name comes too early, - # the controller will then try to configure an nonexistent device - sleep(1) - - ready = wait_for(self.is_ready, timeout=10) - - if not ready: - msg = "Netdevice initialization failed." 
\ - "Unable to create device %s (%s) on machine %s" \ - % (self.get_id(), self._hwaddr, self._machine.get_id()) - raise MachineError(msg) - - super(VirtualInterface, self).initialize() - - def cleanup(self): - self._machine._rpc_call("unmap_if", self._id) - domain_ctl = self._machine.get_domain_ctl() - domain_ctl.detach_interface(self._orig_hwaddr) - - def is_ready(self): - ifaces = self._machine._rpc_call('get_devices_by_hwaddr', self._hwaddr) - return len(ifaces) > 0 - -class SoftInterface(Interface): - """ Software interface abstraction - - This type of interface represents interfaces created in the kernel - during the runtime. This includes devices such as bonds and teams. - """ - - def __init__(self, machine, if_id, if_type): - super(SoftInterface, self).__init__(machine, if_id, if_type) - - def initialize(self): - pass - - def cleanup(self): - pass - - def configure(self): - if self._configured: - return - else: - self._configured = True - - logging.info("Configuring interface %s on machine %s", self.get_id(), - self._machine.get_id()) - - if self._type == "veth": - peer_if = self._machine.get_interface(self._peer) - peer_config = peer_if.get_config() - dev_name, peer_name = self._machine._rpc_call("create_if_pair", - self._id, self.get_config(), - self._peer, peer_config) - self.set_devname(dev_name) - peer_if.set_devname(peer_name) - self._configured = True - peer_if._configured = True - return - - dev_name = self._machine._rpc_call_x(self._netns, - "create_soft_interface", - self._id, self.get_config()) - self.set_devname(dev_name) - self.update_from_slave() - - def deconfigure(self): - if not self._configured: - return - - if self._type == "veth": - peer_if = self._machine.get_interface(self._peer) - - self._machine._rpc_call("deconfigure_if_pair", self._id, self._peer) - self._machine._rpc_call("unmap_if", self._id) - self._machine._rpc_call("unmap_if", self._peer) - - self._configured = False - peer_if._configured = False - return - - 
self._machine._rpc_call_x(self._netns, "deconfigure_interface", - self.get_id()) - self._machine._rpc_call_x(self._netns, "unmap_if", self.get_id()) - self._configured = False - -class UnusedInterface(Interface): - """ Unused interface for this test - - This class represents interfaces that will not be used in the - current test setup. This applies when a slave machine from a - pool has more interfaces then the machine it was matched to - from the recipe. - - LNST still needs to know about these interfaces so it can turn - them off. - """ - - def __init__(self, machine, if_id, if_type): - super(UnusedInterface, self).__init__(machine, if_id, if_type) - - def initialize(self): - self._machine._rpc_call('set_unmapped_device_down', self._hwaddr) - - def set_driver(self, driver): - pass - - def configure(self): - pass - - def deconfigure(self): - pass - - def up(self): - pass - - def down(self): - pass - - def cleanup(self): - pass - -class Device(object): - """ Represents device information received from a Slave""" - - def pre_call_decorate(func): - @wraps(func) - def func_wrapper(inst, *args, **kwargs): - inst.slave_update() - return func(inst, *args, **kwargs) - return func_wrapper - - def __init__(self, data, machine): - self._if_index = data["if_index"] - self._hwaddr = None - self._name = None - self._ip_addrs = None - self._ifi_type = None - self._state = None - self._master = None - self._slaves = None - self._netns = None - self._peer = None - self._mtu = None - self._driver = None - self._devlink = None - - self._machine = machine - - self.update_data(data) - - def update_data(self, data): - if data["if_index"] != self._if_index: - return False - - self._hwaddr = data["hwaddr"] - self._name = data["name"] - self._ip_addrs = data["ip_addrs"] - self._ifi_type = data["ifi_type"] - self._state = data["state"] - self._master = data["master"] - self._slaves = data["slaves"] - self._netns = data["netns"] - self._peer = data["peer"] - self._mtu = data["mtu"] - 
self._driver = data["driver"] - self._devlink = data["driver"] - return True - - def slave_update(self): - res = self._machine._rpc_call_x(self._netns, - "get_device", - self._if_index) - if res: - self.update_data(res) - return - - def get_if_index(self): - return self._if_index - - @pre_call_decorate - def get_hwaddr(self): - return self._hwaddr - - @pre_call_decorate - def get_name(self): - return self._name - - @pre_call_decorate - def get_ip_addrs(self, selector={}): - return [ip["addr"] - for ip in self._ip_addrs - if selector.items() <= ip.items()] - - @pre_call_decorate - def get_ip_addr(self, num, selector={}): - ips = self.get_ip_addrs(selector) - return ips[num] - - @pre_call_decorate - def get_ifi_type(self): - return self._ifi_type - - @pre_call_decorate - def get_state(self): - return self._state - - @pre_call_decorate - def get_master(self): - return self._master - - @pre_call_decorate - def get_slaves(self): - return self._slaves - - @pre_call_decorate - def get_netns(self): - return self._netns - - @pre_call_decorate - def get_peer(self): - return self._peer - - @pre_call_decorate - def get_mtu(self): - return self._mtu - - def set_mtu(self, mtu): - command = {"type": "config", - "host": self._machine.get_id(), - "persistent": False, - "options":[ - {"name": "/sys/class/net/%s/mtu" % self._name, - "value": str(mtu)} - ]} - command["netns"] = self._netns - - self._machine.run_command(command) - - self.slave_update() - return self._mtu - - @pre_call_decorate - def get_driver(self): - return self._driver - - @pre_call_decorate - def get_devlink_name(self): - if self._devlink: - return "%s/%s" % (self._devlink["bus_name"], - self._devlink["dev_name"]) - return None - - @pre_call_decorate - def get_devlink_port_name(self): - if self._devlink: - return "%s/%u" % (self.get_devlink_name(), - self._devlink["port_index"]) - return None diff --git a/lnst/Slave/NetTestSlave.py b/lnst/Slave/NetTestSlave.py index b18f47d..0f654e3 100644 --- 
a/lnst/Slave/NetTestSlave.py +++ b/lnst/Slave/NetTestSlave.py @@ -19,8 +19,10 @@ import datetime import socket import ctypes import multiprocessing +import imp +import types from time import sleep, time -from xmlrpclib import Binary +from inspect import isclass from tempfile import NamedTemporaryFile from lnst.Common.Logs import log_exc_traceback from lnst.Common.PacketCapture import PacketCapture @@ -34,118 +36,150 @@ from lnst.Common.Utils import check_process_running from lnst.Common.Utils import is_installed from lnst.Common.ConnectionHandler import send_data from lnst.Common.ConnectionHandler import ConnectionHandler -from lnst.Common.Config import lnst_config from lnst.Common.Config import DefaultRPCPort +from lnst.Common.DeviceRef import DeviceRef +from lnst.Common.LnstError import LnstError +from lnst.Common.DeviceError import DeviceDeleted +from lnst.Common.IpAddress import IpAddress +from lnst.Slave.Job import Job, JobContext from lnst.Slave.InterfaceManager import InterfaceManager from lnst.Slave.BridgeTool import BridgeTool from lnst.Slave.SlaveSecSocket import SlaveSecSocket, SecSocketException
+Devices = types.ModuleType("Devices") +Devices.__path__ = ["lnst.Devices"] + +sys.modules["lnst.Devices"] = Devices + class SlaveMethods: ''' Exported xmlrpc methods ''' - def __init__(self, command_context, log_ctl, if_manager, net_namespaces, - server_handler, slave_server): + def __init__(self, job_context, log_ctl, net_namespaces, + server_handler, slave_config, slave_server): self._packet_captures = {} - self._if_manager = if_manager - self._command_context = command_context + self._if_manager = None + self._job_context = job_context self._log_ctl = log_ctl self._net_namespaces = net_namespaces self._server_handler = server_handler self._slave_server = slave_server + self._slave_config = slave_config
self._capture_files = {} self._copy_targets = {} self._copy_sources = {} self._system_config = {}
- self._cache = ResourceCache(lnst_config.get_option("cache", "dir"), - lnst_config.get_option("cache", "expiration_period")) + self._cache = ResourceCache(slave_config.get_option("cache", "dir"), + slave_config.get_option("cache", "expiration_period")) + + self._dynamic_modules = {} + self._dynamic_classes = {} + + self._bkp_nm_opt_val = slave_config.get_option("environment", "use_nm") + + def hello(self): + logging.info("Recieved a controller connection.") + + slave_desc = {} + if check_process_running("NetworkManager"): + slave_desc["nm_running"] = True + else: + slave_desc["nm_running"] = False
- self._resource_table = {'module': {}, 'tools': {}} + k_release, _ = exec_cmd("uname -r", False, False, False) + r_release, _ = exec_cmd("cat /etc/redhat-release", False, False, False) + slave_desc["kernel_release"] = k_release.strip() + slave_desc["redhat_release"] = r_release.strip() + slave_desc["lnst_version"] = self._slave_config.version
- self._bkp_nm_opt_val = lnst_config.get_option("environment", "use_nm") + return ("hello", slave_desc)
- def hello(self, recipe_path): + def set_recipe(self, recipe_name): self.machine_cleanup() self.restore_nm_option()
- logging.info("Recieved a controller connection.") - self.clear_resource_table() self._cache.del_old_entries() self.reset_file_transfers()
- self._if_manager.rescan_devices() - date = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S") - self._log_ctl.set_recipe(recipe_path, expand=date) + self._log_ctl.set_recipe(recipe_name, expand=date) sleep(1)
- slave_desc = {} if check_process_running("NetworkManager"): logging.warning("=============================================") logging.warning("NetworkManager is running on a slave machine!") - if lnst_config.get_option("environment", "use_nm"): + if self._slave_config.get_option("environment", "use_nm"): logging.warning("Support of NM is still experimental!") else: logging.warning("Usage of NM is disabled!") logging.warning("=============================================") - slave_desc["nm_running"] = True - else: - slave_desc["nm_running"] = False
- k_release, _ = exec_cmd("uname -r", False, False, False) - r_release, _ = exec_cmd("cat /etc/redhat-release", False, False, False) - slave_desc["kernel_release"] = k_release.strip() - slave_desc["redhat_release"] = r_release.strip() - slave_desc["lnst_version"] = lnst_config.version - - return ("hello", slave_desc) + return True
def bye(self): self.restore_system_config() - self.clear_resource_table() self._cache.del_old_entries() self.reset_file_transfers() self._remove_capture_files() return "bye"
- def kill_cmds(self): - logging.info("Killing all forked processes.") - self._command_context.cleanup() - return "Commands killed" + def map_device_class(self, cls_name, module_name): + if cls_name in self._dynamic_classes: + return
- def map_if_by_hwaddr(self, if_id, hwaddr): - devices = self.map_if_by_params(if_id, {'hwaddr' : hwaddr}) + module = self._dynamic_modules[module_name] + cls = getattr(module, cls_name)
- return devices + self._dynamic_classes[cls_name] = cls
- def map_if_by_params(self, if_id, params): - devices = self.get_devices_by_params(params) + setattr(Devices, cls_name, cls)
- if len(devices) == 1: - dev = self._if_manager.get_device_by_params(params) - self._if_manager.map_if(if_id, dev.get_if_index()) + def load_cached_module(self, module_name, res_hash): + self._cache.renew_entry(res_hash) + if module_name in self._dynamic_modules: + return + module_path = self._cache.get_path(res_hash) + module = imp.load_source(module_name, module_path) + self._dynamic_modules[module_name] = module
- return devices + def init_if_manager(self): + self._if_manager = InterfaceManager(self._server_handler) + for cls_name in dir(Devices): + cls = getattr(Devices, cls_name) + if isclass(cls): + self._if_manager.add_device_class(cls_name, cls)
- def unmap_if(self, if_id): - self._if_manager.unmap_if(if_id) + self._if_manager.rescan_devices() + self._server_handler.set_if_manager(self._if_manager) + self._server_handler.add_connection('netlink', + self._if_manager.get_nl_socket()) return True
+ def dev_method(self, if_index, name, args, kwargs): + dev = self._if_manager.get_device(if_index) + method = getattr(dev, name) + + return method(*args, **kwargs) + + def dev_attr(self, if_index, name): + dev = self._if_manager.get_device(if_index) + return getattr(dev, name) + def get_devices(self): self._if_manager.rescan_devices() devices = self._if_manager.get_devices() result = {} for device in devices: - result[device._if_index] = device.get_if_data() + result[device.if_index] = device._get_if_data() return result
def get_device(self, if_index): self._if_manager.rescan_devices() device = self._if_manager.get_device(if_index) if device: - return device.get_if_data() + return device._get_if_data() else: return None
@@ -156,7 +190,6 @@ class SlaveMethods: for entry in name_scan: if entry["name"] == devname: netdevs.append(entry) - return netdevs
def get_devices_by_hwaddr(self, hwaddr): @@ -167,14 +200,13 @@ class SlaveMethods: entry = {"name": dev.get_name(), "hwaddr": dev.get_hwaddr()} matched.append(entry) - return matched
def get_devices_by_params(self, params): devices = self._if_manager.get_devices() matched = [] for dev in devices: - dev_data = dev.get_if_data() + dev_data = dev._get_if_data() entry = {"name": dev.get_name(), "hwaddr": dev.get_hwaddr()} for key, value in params.iteritems(): @@ -184,180 +216,121 @@ class SlaveMethods:
if entry is not None: matched.append(entry) - return matched
- def get_if_data(self, if_id): - dev = self._if_manager.get_mapped_device(if_id) - if dev is None: - return None - return dev.get_if_data() - - def link_stats(self, if_id): - dev = self._if_manager.get_mapped_device(if_id) - if dev is None: - logging.error("Device with id '%s' not found." % if_id) - return None - return dev.link_stats() - - def set_addresses(self, if_id, ips): - dev = self._if_manager.get_mapped_device(if_id) - if dev is None: - logging.error("Device with id '%s' not found." % if_id) - return False - dev.set_addresses(ips) - return True - - def add_route(self, if_id, dest): - dev = self._if_manager.get_mapped_device(if_id) - if dev is None: - logging.error("Device with id '%s' not found." % if_id) - return False - dev.add_route(dest) - return True - - def del_route(self, if_id, dest): - dev = self._if_manager.get_mapped_device(if_id) + def destroy_devices(self): + devices = self._if_manager.get_devices() + for dev in devices: + try: + dev.destroy() + except DeviceDeleted: + pass + self._if_manager.rescan_devices() + + # def add_route(self, if_id, dest): + # dev = self._if_manager.get_mapped_device(if_id) + # if dev is None: + # logging.error("Device with id '%s' not found." % if_id) + # return False + # dev.add_route(dest) + # return True + + # def del_route(self, if_id, dest): + # dev = self._if_manager.get_mapped_device(if_id) + # if dev is None: + # logging.error("Device with id '%s' not found." % if_id) + # return False + # dev.del_route(dest) + # return True + + def create_device(self, clsname, args=[], kwargs={}): + dev = self._if_manager.create_device(clsname, args, kwargs) if dev is None: - logging.error("Device with id '%s' not found." 
% if_id) - return False - dev.del_route(dest) - return True - - def set_device_up(self, if_id): - dev = self._if_manager.get_mapped_device(if_id) - dev.up() - return True - - def set_device_down(self, if_id): - dev = self._if_manager.get_mapped_device(if_id) - if dev is not None: - dev.down() - else: - logging.error("Device with id '%s' not found." % if_id) - return False - return True - - def set_link_up(self, if_id): - dev = self._if_manager.get_mapped_device(if_id) - if dev is not None: - dev.link_up() - else: - logging.error("Device with id '%s' not found." % if_id) - return False - return True - - def set_link_down(self, if_id): - dev = self._if_manager.get_mapped_device(if_id) - if dev is not None: - dev.link_down() - else: - logging.error("Device with id '%s' not found." % if_id) - return False - return True - - def set_unmapped_device_down(self, hwaddr): - dev = self._if_manager.get_device_by_hwaddr(hwaddr) - if dev is not None: - dev.down() - else: - logging.warning("Device with hwaddr '%s' not found." 
% hwaddr) - return True - - def configure_interface(self, if_id, config): - device = self._if_manager.get_mapped_device(if_id) - device.set_configuration(config) - device.configure() - return True - - def create_soft_interface(self, if_id, config): - dev_name = self._if_manager.create_device_from_config(if_id, config) - dev = self._if_manager.get_mapped_device(if_id) - dev.configure() - return dev_name - - def create_if_pair(self, if_id1, config1, if_id2, config2): - dev_names = self._if_manager.create_device_pair(if_id1, config1, - if_id2, config2) - dev1 = self._if_manager.get_mapped_device(if_id1) - dev2 = self._if_manager.get_mapped_device(if_id2) - - while dev1.get_if_index() == None and dev2.get_if_index() == None: - msgs = self._server_handler.get_messages_from_con('netlink') - for msg in msgs: - self._if_manager.handle_netlink_msgs(msg[1]["data"]) - - if config1["netns"] != None: - hwaddr = dev1.get_hwaddr() - self.set_if_netns(if_id1, config1["netns"]) - - msg = {"type": "command", "method_name": "configure_interface", - "args": [if_id1, config1]} - self._server_handler.send_data_to_netns(config1["netns"], msg) - result = self._slave_server.wait_for_result(config1["netns"]) - if result["result"] != True: - raise Exception("Configuration failed.") - else: - dev1.configure() - if config2["netns"] != None: - hwaddr = dev2.get_hwaddr() - self.set_if_netns(if_id2, config2["netns"]) - - msg = {"type": "command", "method_name": "configure_interface", - "args": [if_id2, config2]} - self._server_handler.send_data_to_netns(config2["netns"], msg) - result = self._slave_server.wait_for_result(config2["netns"]) - if result["result"] != True: - raise Exception("Configuration failed.") - else: - dev2.configure() - return dev_names - - def deconfigure_if_pair(self, if_id1, if_id2): - dev1 = self._if_manager.get_mapped_device(if_id1) - dev2 = self._if_manager.get_mapped_device(if_id2) - - if dev1.get_netns() == None: - dev1.deconfigure() - else: - netns = dev1.get_netns() 
- - msg = {"type": "command", "method_name": "deconfigure_interface", - "args": [if_id1]} - self._server_handler.send_data_to_netns(netns, msg) - result = self._slave_server.wait_for_result(netns) - if result["result"] != True: - raise Exception("Deconfiguration failed.") - - self.return_if_netns(if_id1) - - if dev2.get_netns() == None: - dev2.deconfigure() - else: - netns = dev2.get_netns() - - msg = {"type": "command", "method_name": "deconfigure_interface", - "args": [if_id2]} - self._server_handler.send_data_to_netns(netns, msg) - result = self._slave_server.wait_for_result(netns) - if result["result"] != True: - raise Exception("Deconfiguration failed.") - - self.return_if_netns(if_id2) - - dev1.destroy() - dev2.destroy() - dev1.del_configuration() - dev2.del_configuration() - return True - - def deconfigure_interface(self, if_id): - device = self._if_manager.get_mapped_device(if_id) - if device is not None: - device.clear_configuration() - else: - logging.error("No device with id '%s' to deconfigure." 
% if_id) - return True + raise Exception("Device creation failed") + return {"if_index": dev.if_index, "name": dev.name} + + # def create_if_pair(self, if_id1, config1, if_id2, config2): + # dev_names = self._if_manager.create_device_pair(if_id1, config1, + # if_id2, config2) + # dev1 = self._if_manager.get_mapped_device(if_id1) + # dev2 = self._if_manager.get_mapped_device(if_id2) + + # while dev1.get_if_index() == None and dev2.get_if_index() == None: + # msgs = self._server_handler.get_messages_from_con('netlink') + # for msg in msgs: + # self._if_manager.handle_netlink_msgs(msg[1]["data"]) + + # if config1["netns"] != None: + # hwaddr = dev1.get_hwaddr() + # self.set_if_netns(if_id1, config1["netns"]) + + # msg = {"type": "command", "method_name": "configure_interface", + # "args": [if_id1, config1]} + # self._server_handler.send_data_to_netns(config1["netns"], msg) + # result = self._slave_server.wait_for_result(config1["netns"]) + # if result["result"] != True: + # raise Exception("Configuration failed.") + # else: + # dev1.configure() + # if config2["netns"] != None: + # hwaddr = dev2.get_hwaddr() + # self.set_if_netns(if_id2, config2["netns"]) + + # msg = {"type": "command", "method_name": "configure_interface", + # "args": [if_id2, config2]} + # self._server_handler.send_data_to_netns(config2["netns"], msg) + # result = self._slave_server.wait_for_result(config2["netns"]) + # if result["result"] != True: + # raise Exception("Configuration failed.") + # else: + # dev2.configure() + # return dev_names + + # def deconfigure_if_pair(self, if_id1, if_id2): + # dev1 = self._if_manager.get_mapped_device(if_id1) + # dev2 = self._if_manager.get_mapped_device(if_id2) + + # if dev1.get_netns() == None: + # dev1.deconfigure() + # else: + # netns = dev1.get_netns() + + # msg = {"type": "command", "method_name": "deconfigure_interface", + # "args": [if_id1]} + # self._server_handler.send_data_to_netns(netns, msg) + # result = self._slave_server.wait_for_result(netns) + 
# if result["result"] != True: + # raise Exception("Deconfiguration failed.") + + # self.return_if_netns(if_id1) + + # if dev2.get_netns() == None: + # dev2.deconfigure() + # else: + # netns = dev2.get_netns() + + # msg = {"type": "command", "method_name": "deconfigure_interface", + # "args": [if_id2]} + # self._server_handler.send_data_to_netns(netns, msg) + # result = self._slave_server.wait_for_result(netns) + # if result["result"] != True: + # raise Exception("Deconfiguration failed.") + + # self.return_if_netns(if_id2) + + # dev1.destroy() + # dev2.destroy() + # dev1.del_configuration() + # dev2.del_configuration() + # return True + + # def deconfigure_interface(self, if_id): + # device = self._if_manager.get_mapped_device(if_id) + # if device is not None: + # device.clear_configuration() + # else: + # logging.error("No device with id '%s' to deconfigure." % if_id) + # return True
def start_packet_capture(self, filt): if not is_installed("tcpdump"): @@ -448,103 +421,67 @@ class SlaveMethods:
return int(remaining)
- def run_command(self, command): - cmd = NetTestCommand(self._command_context, command, - self._resource_table, self._log_ctl) + def run_job(self, job): + job_instance = Job(job, self._log_ctl) + self._job_context.add_job(job_instance)
- if self._command_context.get_cmd(cmd.get_id()) != None: - prev_cmd = self._command_context.get_cmd(cmd.get_id()) - if not prev_cmd.get_result_sent(): - if cmd.get_id() is None: - raise Exception("Previous foreground command still "\ - "running!") - else: - raise Exception("Different command with id '%s' "\ - "still running!" % cmd.get_id()) - else: - self._command_context.del_cmd(cmd) - self._command_context.add_cmd(cmd) + res = job_instance.run()
- res = cmd.run() - if not cmd.forked(): - self._command_context.del_cmd(cmd) + return res
- if command["type"] == "config": - if res["passed"]: - self._update_system_config(res["res_data"]["options"], - command["persistent"]) - else: - err = "Error occured while setting system "\ - "configuration (%s)" % res["res_data"]["err_msg"] - logging.error(err) + def kill_job(self, job_id, signal): + job = self._job_context.get_job(job_id)
- return res + if job is None: + logging.error("No job %s found" % job_id) + return False
- def kill_command(self, id): - cmd = self._command_context.get_cmd(id) - if cmd is not None: - if not cmd.get_result_sent(): - cmd.kill(None) - result = cmd.get_result() - cmd.set_result_sent() - return result - else: - pass - else: - raise Exception("No command with id '%s'." % id) + return job.kill(signal) + + def kill_jobs(self): + logging.info("Killing all forked processes.") + self._job_context.cleanup() + return "Commands killed"
def machine_cleanup(self): logging.info("Performing machine cleanup.") - self._command_context.cleanup() + self._job_context.cleanup()
self.restore_system_config()
- devs = self._if_manager.get_mapped_devices() - for if_id, dev in devs.iteritems(): - peer = dev.get_peer() - if peer == None: - dev.clear_configuration() - else: - peer_if_index = peer.get_if_index() - peer_id = self._if_manager.get_id_by_if_index(peer_if_index) - self.deconfigure_if_pair(if_id, peer_id) - - self._if_manager.deconfigure_all() + if self._if_manager is not None: + self._if_manager.deconfigure_all()
for netns in self._net_namespaces.keys(): self.del_namespace(netns) self._net_namespaces = {}
- self._if_manager.clear_if_mapping() + for cls_name, cls in self._dynamic_classes.items(): + delattr(Devices, cls_name) + + for module_name, module in self._dynamic_modules.items(): + del sys.modules[module_name] + + self._dynamic_classes = {} + self._dynamic_modules = {} + self._if_manager = None + self._server_handler.set_if_manager(None) self._cache.del_old_entries() self._remove_capture_files() return True
- def clear_resource_table(self): - self._resource_table = {'module': {}, 'tools': {}} - return True - def has_resource(self, res_hash): if self._cache.query(res_hash): return True
return False
- def map_resource(self, res_hash, res_type, res_name): - resource_location = self._cache.get_path(res_hash) - - if not res_type in self._resource_table: - self._resource_table[res_type] = {} - - self._resource_table[res_type][res_name] = resource_location - self._cache.renew_entry(res_hash) - - return True - - def add_resource_to_cache(self, file_hash, local_path, name, - res_hash, res_type): - self._cache.add_cache_entry(file_hash, local_path, name, res_type) - return True + def add_resource_to_cache(self, res_type, local_path, name): + if res_type == "file": + self._cache.add_file_entry(local_path, name) + return True + else: + raise Exception("Unknown resource type")
def start_copy_to(self, filepath=None): if filepath in self._copy_targets: @@ -559,9 +496,9 @@ class SlaveMethods:
return filepath
- def copy_part_to(self, filepath, binary_data): + def copy_part_to(self, filepath, data): if self._copy_targets[filepath]: - self._copy_targets[filepath].write(binary_data.data) + self._copy_targets[filepath].write(data) return True
return False @@ -583,7 +520,7 @@ class SlaveMethods: return True
def copy_part_from(self, filepath, buffsize): - data = Binary(self._copy_sources[filepath].read(buffsize)) + data = self._copy_sources[filepath].read(buffsize) return data
def finish_copy_from(self, filepath): @@ -607,26 +544,27 @@ class SlaveMethods: logging.warning("====================================================") logging.warning("Enabling use of NetworkManager on controller request") logging.warning("====================================================") - val = lnst_config.get_option("environment", "use_nm") - lnst_config.set_option("environment", "use_nm", True) + val = self._slave_config.get_option("environment", "use_nm") + self._slave_config.set_option("environment", "use_nm", True) return val
def disable_nm(self): logging.warning("=====================================================") logging.warning("Disabling use of NetworkManager on controller request") logging.warning("=====================================================") - val = lnst_config.get_option("environment", "use_nm") - lnst_config.set_option("environment", "use_nm", False) + val = self._slave_config.get_option("environment", "use_nm") + self._slave_config.set_option("environment", "use_nm", False) return val
def restore_nm_option(self): - val = lnst_config.get_option("environment", "use_nm") + val = self._slave_config.get_option("environment", "use_nm") if val == self._bkp_nm_opt_val: return val logging.warning("=========================================") logging.warning("Restoring use_nm option to original value") logging.warning("=========================================") - lnst_config.set_option("environment", "use_nm", self._bkp_nm_opt_val) + self._slave_config.set_option("environment", "use_nm", + self._bkp_nm_opt_val) return val
def add_namespace(self, netns): @@ -724,40 +662,40 @@ class SlaveMethods: del self._net_namespaces[netns] return True
- def set_if_netns(self, if_id, netns): - netns_pid = self._net_namespaces[netns]["pid"] - - device = self._if_manager.get_mapped_device(if_id) - dev_name = device.get_name() - device.set_netns(netns) - hwaddr = device.get_hwaddr() - - exec_cmd("ip link set %s netns %d" % (dev_name, netns_pid)) - msg = {"type": "command", "method_name": "map_if_by_hwaddr", - "args": [if_id, hwaddr]} - self._server_handler.send_data_to_netns(netns, msg) - result = self._slave_server.wait_for_result(netns) - return result - - def return_if_netns(self, if_id): - device = self._if_manager.get_mapped_device(if_id) - if device.get_netns() == None: - dev_name = device.get_name() - ppid = os.getppid() - exec_cmd("ip link set %s netns %d" % (dev_name, ppid)) - self._if_manager.unmap_if(if_id) - return True - else: - netns = device.get_netns() - msg = {"type": "command", "method_name": "return_if_netns", - "args": [if_id]} - self._server_handler.send_data_to_netns(netns, msg) - result = self._slave_server.wait_for_result(netns) - if result["result"] != True: - raise Exception("Return from netns failed.") - - device.set_netns(None) - return True + # def set_if_netns(self, if_id, netns): + # netns_pid = self._net_namespaces[netns]["pid"] + + # device = self._if_manager.get_mapped_device(if_id) + # dev_name = device.get_name() + # device.set_netns(netns) + # hwaddr = device.get_hwaddr() + + # exec_cmd("ip link set %s netns %d" % (dev_name, netns_pid)) + # msg = {"type": "command", "method_name": "map_if_by_hwaddr", + # "args": [if_id, hwaddr]} + # self._server_handler.send_data_to_netns(netns, msg) + # result = self._slave_server.wait_for_result(netns) + # return result + + # def return_if_netns(self, if_id): + # device = self._if_manager.get_mapped_device(if_id) + # if device.get_netns() == None: + # dev_name = device.get_name() + # ppid = os.getppid() + # exec_cmd("ip link set %s netns %d" % (dev_name, ppid)) + # self._if_manager.unmap_if(if_id) + # return True + # else: + # netns = 
device.get_netns() + # msg = {"type": "command", "method_name": "return_if_netns", + # "args": [if_id]} + # self._server_handler.send_data_to_netns(netns, msg) + # result = self._slave_server.wait_for_result(netns) + # if result["result"] != True: + # raise Exception("Return from netns failed.") + + # device.set_netns(None) + # return True
def add_br_vlan(self, if_id, br_vlan_info): dev = self._if_manager.get_mapped_device(if_id) @@ -847,48 +785,8 @@ class SlaveMethods: brt.set_state(br_state_info) return True
- def set_speed(self, if_id, speed): - dev = self._if_manager.get_mapped_device(if_id) - if dev is not None: - dev.set_speed(speed) - else: - logging.error("Device with id '%s' not found." % if_id) - return False - return True - - def set_autoneg(self, if_id): - dev = self._if_manager.get_mapped_device(if_id) - if dev is not None: - dev.set_autoneg() - else: - logging.error("Device with id '%s' not found." % if_id) - return False - return True - - def wait_interface_init(self): - self._if_manager.wait_interface_init() - return True - - def slave_add(self, if_id, slave_id): - dev = self._if_manager.get_mapped_device(if_id) - if dev is not None: - dev.slave_add(slave_id) - else: - logging.error("Device with id '%s' not found." % if_id) - return False - return True - - def slave_del(self, if_id, slave_id): - dev = self._if_manager.get_mapped_device(if_id) - if dev is not None: - dev.slave_del(slave_id) - else: - logging.error("Device with id '%s' not found." % if_id) - return False - return True - class ServerHandler(ConnectionHandler): - def __init__(self, addr): + def __init__(self, addr, slave_config): super(ServerHandler, self).__init__() self._netns_con_mapping = {} try: @@ -902,13 +800,34 @@ class ServerHandler(ConnectionHandler):
self._netns = None self._c_socket = None + self._c_dev = None
self._if_manager = None
- self._security = lnst_config.get_section_values("security") + self._security = slave_config.get_section_values("security")
def set_if_manager(self, if_manager): self._if_manager = if_manager + self._update_c_dev() + + def _update_c_dev(self): + if self._c_dev: + self._c_dev.enable() + self._c_dev = None + + if self._if_manager is not None: + ctl_socket = self.get_ctl_sock() + ctl_addr = ctl_socket._socket.getsockname()[0] + matched_dev = None + for dev in self._if_manager.get_devices(): + for ip in dev.ips: + if ip.addr == ctl_addr: + matched_dev = dev + break + if matched_dev: + break + self._c_dev = matched_dev + matched_dev.disable()
def accept_connection(self): self._c_socket, addr = self._s_socket.accept() @@ -932,9 +851,9 @@ class ServerHandler(ConnectionHandler):
def set_ctl_sock(self, sock): if self._c_socket != None: - self._c_socket.close() - self._c_socket = None + self.close_c_sock() self._c_socket = sock + self._update_c_dev() self.add_connection(self._c_socket[1], self._c_socket[0])
def close_s_sock(self): @@ -946,9 +865,14 @@ class ServerHandler(ConnectionHandler): self.remove_connection(self._c_socket[0]) self._c_socket = None
+ if self._c_dev: + self._c_dev.enable() + self._c_dev = None + def check_connections(self): msgs = super(ServerHandler, self).check_connections() - if 'netlink' not in self._connection_mapping: + if 'netlink' not in self._connection_mapping and\ + self._if_manager is not None: self._if_manager.reconnect_netlink() self.add_connection('netlink', self._if_manager.get_nl_socket()) return msgs @@ -970,7 +894,7 @@ class ServerHandler(ConnectionHandler): addr = self._c_socket[1] if self.get_connection(addr) == None: logging.info("Lost controller connection.") - self._c_socket = None + self.close_c_sock() return messages
def get_messages_from_con(self, con_id): @@ -1026,23 +950,72 @@ class ServerHandler(ConnectionHandler): self._connections.remove(con) self._netns_con_mapping = {}
+ +def device_to_deviceref(obj): + try: + Device = Devices.Device + except: + return obj + + if isinstance(obj, Device): + dev_ref = DeviceRef(obj.if_index) + return dev_ref + elif isinstance(obj, dict): + new_dict = {} + for key, value in obj.items(): + new_dict[key] = device_to_deviceref(value) + return new_dict + elif isinstance(obj, list): + new_list = [] + for value in obj: + new_list.append(device_to_deviceref(value)) + return new_list + elif isinstance(obj, tuple): + new_list = [] + for value in obj: + new_list.append(device_to_deviceref(value)) + return tuple(new_list) + else: + return obj + +def deviceref_to_device(if_manager, obj): + if isinstance(obj, DeviceRef): + dev = if_manager.get_device(obj.if_index) + return dev + elif isinstance(obj, dict): + new_dict = {} + for key, value in obj.items(): + new_dict[key] = deviceref_to_device(if_manager, value) + return new_dict + elif isinstance(obj, list): + new_list = [] + for value in obj: + new_list.append(deviceref_to_device(if_manager, value)) + return new_list + elif isinstance(obj, tuple): + new_list = [] + for value in obj: + new_list.append(deviceref_to_device(if_manager, value)) + return tuple(new_list) + else: + return obj + class NetTestSlave: - def __init__(self, log_ctl): + def __init__(self, log_ctl, slave_config): + self._slave_config = slave_config die_when_parent_die()
- self._cmd_context = NetTestCommandContext() - port = lnst_config.get_option("environment", "rpcport") + self._job_context = JobContext() + port = slave_config.get_option("environment", "rpcport") logging.info("Using RPC port %d." % port) - self._server_handler = ServerHandler(("", port)) - self._if_manager = InterfaceManager(self._server_handler) - - self._server_handler.set_if_manager(self._if_manager) + self._server_handler = ServerHandler(("", port), slave_config)
self._net_namespaces = {}
- self._methods = SlaveMethods(self._cmd_context, log_ctl, - self._if_manager, self._net_namespaces, - self._server_handler, self) + self._methods = SlaveMethods(self._job_context, log_ctl, + self._net_namespaces, + self._server_handler, slave_config, + self)
self.register_die_signal(signal.SIGHUP) self.register_die_signal(signal.SIGINT) @@ -1052,9 +1025,6 @@ class NetTestSlave:
self._log_ctl = log_ctl
- self._server_handler.add_connection('netlink', - self._if_manager.get_nl_socket()) - def run(self): while not self._finished: if self._server_handler.get_ctl_sock() == None: @@ -1063,9 +1033,9 @@ class NetTestSlave: logging.info("Waiting for connection.") self._server_handler.accept_connection() except (socket.error, SecSocketException): + log_exc_traceback() continue - self._log_ctl.set_connection( - self._server_handler.get_ctl_sock()) + self._log_ctl.set_connection(self._server_handler.get_ctl_sock())
msgs = self._server_handler.get_messages()
@@ -1092,24 +1062,29 @@ class NetTestSlave: if msg["type"] == "command": method = getattr(self._methods, msg["method_name"], None) if method != None: + if_manager = self._methods._if_manager + if if_manager is not None: + args = deviceref_to_device(if_manager, msg["args"]) + kwargs = deviceref_to_device(if_manager, msg["kwargs"]) + else: + args = msg["args"] + kwargs = msg["kwargs"] + try: - result = method(*msg["args"]) - except: + result = method(*args, **kwargs) + except LnstError as e: log_exc_traceback() - type, value, tb = sys.exc_info() - exc_trace = ''.join(traceback.format_exception(type, - value, tb)) - response = {"type": "exception", "Exception": value} + response = {"type": "exception", "Exception": e}
self._server_handler.send_data_to_ctl(response) return
- if result != None: - response = {"type": "result", "result": result} - self._server_handler.send_data_to_ctl(response) + response = {"type": "result", "result": result} + response = device_to_deviceref(response) + self._server_handler.send_data_to_ctl(response) else: - err = "Method '%s' not supported." % msg["method_name"] - response = {"type": "error", "err": err} + err = LnstError("Method '%s' not supported." % msg["method_name"]) + response = {"type": "exception", "Exception": err} self._server_handler.send_data_to_ctl(response) elif msg["type"] == "log": logger = logging.getLogger() @@ -1122,48 +1097,36 @@ class NetTestSlave: else: logging.debug("Recieved an exception from foreground command") logging.debug(msg["Exception"]) - cmd = self._cmd_context.get_cmd(msg["cmd_id"]) - cmd.join() - self._cmd_context.del_cmd(cmd) + job = self._job_context.get_cmd(msg["job_id"]) + job.join() + self._job_context.del_cmd(job) + self._server_handler.send_data_to_ctl(msg) + elif msg["type"] == "job_finished": + job = self._job_context.get_job(msg["job_id"]) + job.join() + + job.set_finished(msg["result"]) self._server_handler.send_data_to_ctl(msg) - elif msg["type"] == "result": - if msg["cmd_id"] == None: - del msg["cmd_id"] - self._server_handler.send_data_to_ctl(msg) - cmd = self._cmd_context.get_cmd(None) - cmd.join() - cmd.set_result_sent() - else: - cmd = self._cmd_context.get_cmd(msg["cmd_id"]) - cmd.join() - del msg["cmd_id"] - - cmd.set_result(msg["result"]) - if cmd.finished(): - msg["result"] = cmd.get_result() - self._server_handler.send_data_to_ctl(msg) - cmd.set_result_sent() elif msg["type"] == "netlink": - self._if_manager.handle_netlink_msgs(msg["data"]) + if_manager = self._methods._if_manager + if if_manager is not None: + if_manager.handle_netlink_msgs(msg["data"]) elif msg["type"] == "from_netns": self._server_handler.send_data_to_ctl(msg["data"]) elif msg["type"] == "to_netns": netns = msg["netns"] try: self._server_handler.send_data_to_netns(netns, 
msg["data"]) - except: + except LnstError as e: log_exc_traceback() - type, value, tb = sys.exc_info() - exc_trace = ''.join(traceback.format_exception(type, - value, tb)) - response = {"type": "exception", "Exception": value} + response = {"type": "exception", "Exception": e}
self._server_handler.send_data_to_ctl(response) return else: raise Exception("Recieved unknown command")
- pipes = self._cmd_context.get_read_pipes() + pipes = self._job_context.get_parent_pipes() self._server_handler.update_connections(pipes)
def register_die_signal(self, signum):