From: Ondrej Lichtner <olichtne(a)redhat.com>
This commit heavily reimplements the lnst.Controller.Machine and
lnst.Slave.NetTestSlave modules. Since these modules directly depend on
each other, these changes are introduced together in the same commit.
On the Controller side:
* this switches the Machine class implementation from working with the
old XML recipe code to work purely with the Python recipe
implementation, where the Machine class is instantiated by a
SlavePoolManager and is used by the tester through the Host class
providing a tester facing API.
* the Machine class now needs to synchronize different resources to the
slave, namely:
* Device classes from the lnst.Devices package
* test module code, which is instantiated on the Controller and send
to the Slave when required (including all parent classes required
for the reconstruction of the object on the Slave).
* In the future this can be extended to also send the
InterfaceManager implementation, and maybe even SlaveMethods
implementation.
* includes methods relaying calling the Device methods
* Removes the old Interface implementation since this is all handled by
the new Device classes. And the original simple Device class.
* refactors the rpc_call method to only have one implementation for
everything (including network namespaces, though they're not supported
atm...).
* switches to using the new Job implementation.
On the Slave side:
* accept Device, and test module classes, all the dynamically received
classes are stored in the new Resource cache. If a dynamically received
class is imported during recipe execution it is also cleaned up when the
recipe finishes execution.
* ServerHandler now translates Device objects into Device references
when sending these objects over sockets.
* switches to using the new Job implementation.
* comments out old code that has not been refactored and is not
supported yet with the python recipe implementation - mostly network
namespaces.
* all exceptions are now transferred to the Controller where they'll be
reconstructed and raised so they can be handled there (unless they can
be handled on the Slave)
Signed-off-by: Ondrej Lichtner <olichtne(a)redhat.com>
---
v2:
* improved class and module synchronization implementation
* join split lines in deviceref_to_device method
---
lnst/Controller/Machine.py | 1327 ++++++++++----------------------------------
lnst/Slave/NetTestSlave.py | 829 +++++++++++++--------------
2 files changed, 678 insertions(+), 1478 deletions(-)
diff --git a/lnst/Controller/Machine.py b/lnst/Controller/Machine.py
index 3bfcf82..f057390 100644
--- a/lnst/Controller/Machine.py
+++ b/lnst/Controller/Machine.py
@@ -14,17 +14,24 @@ rpazdera(a)redhat.com (Radek Pazdera)
import logging
import socket
import os
+import sys
import tempfile
import signal
from time import sleep
from xmlrpclib import Binary
-from functools import wraps
from lnst.Common.NetUtils import normalize_hwaddr
+from lnst.Common.Utils import sha256sum
from lnst.Common.Utils import wait_for, create_tar_archive
from lnst.Common.Utils import check_process_running
+from lnst.Common.TestModule import BaseTestModule
from lnst.Common.NetTestCommand import DEFAULT_TIMEOUT
+from lnst.Common.DeviceError import DeviceDeleted, DeviceNotFound
from lnst.Controller.Common import ControllerError
from lnst.Controller.CtlSecSocket import CtlSecSocket
+from lnst.Devices import device_classes
+from lnst.Devices.Device import Device
+from lnst.Devices.RemoteDevice import RemoteDevice
+from lnst.Devices.VirtualDevice import VirtualDevice
# conditional support for libvirt
if check_process_running("libvirtd"):
@@ -51,7 +58,6 @@ class Machine(object):
self._ctl_config = ctl_config
self._slave_desc = None
self._connection = None
- self._configured = False
self._system_config = {}
self._security = security
self._security["identity"] =
ctl_config.get_option("security",
@@ -76,8 +82,19 @@ class Machine(object):
self._interfaces = []
self._namespaces = []
self._bg_cmds = {}
+ self._jobs = {}
+ self._job_id_seq = 0
self._device_database = {}
+ self._tmp_device_database = []
+
+ self._init_connection()
+
+ def set_id(self, new_id):
+ self._id = new_id
+
+ def get_id(self):
+ return self._id
def get_configuration(self):
configuration = {}
@@ -87,150 +104,96 @@ class Machine(object):
configuration["redhat_release"] =
self._slave_desc["redhat_release"]
configuration["interfaces"] = {}
- for i in self._interfaces:
- if not isinstance(i, UnusedInterface):
- configuration["interface_"+i.get_id()] = i.get_config()
+ for dev in self._device_database.items():
+ configuration["device_"+dev.name] = dev.get_config()
return configuration
- def _if_id_exists(self, if_id):
- for iface in self._interfaces:
- if if_id == iface.get_id():
- return True
- return False
+ def add_tmp_device(self, dev):
+ self._tmp_device_database.append(dev)
+
+ def create_remote_device(self, dev):
+ dev_clsname = dev._dev_cls.__name__
+ dev_args = dev._dev_args
+ dev_kwargs = dev._dev_kwargs
+ ret = self.rpc_call("create_device", clsname=dev_clsname,
+ args=dev_args,
+ kwargs=dev_kwargs)
+ dev.if_index = ret["if_index"]
+ self._device_database[ret["if_index"]] = dev
+
+ def device_created(self, dev_data):
+ if_index = dev_data["if_index"]
+ if if_index not in self._device_database:
+ new_dev = None
+ if len(self._tmp_device_database) > 0:
+ for dev in self._tmp_device_database:
+ if dev._match_update_data(dev_data):
+ new_dev = dev
+ break
- def _generate_if_id(self, if_type):
- i = 0
- while True:
- if_id = "gen_%s_%d" % (if_type, i)
- if not self._if_id_exists(if_id):
- break
- i += 1
- return if_id
-
- def _add_interface(self, if_id, if_type, cls):
- if if_id != None:
- if self._if_id_exists(if_id):
- msg = "Interface '%s' already exists on machine
'%s'" \
- % (if_id, self._id)
- raise MachineError(msg)
- else:
- if_id = self._generate_if_id(if_type)
+ if new_dev is None:
+ new_dev = RemoteDevice(Device)
+ new_dev.host = self
+ new_dev.if_index = if_index
+ else:
+ self._tmp_device_database.remove(new_dev)
- iface = cls(self, if_id, if_type)
- self._interfaces.append(iface)
- return iface
+ new_dev.if_index = dev_data["if_index"]
- def remove_interface(self, if_id):
- iface = self.get_interface(if_id)
- self._interfaces.remove(iface)
+ self._device_database[if_index] = new_dev
- def interface_update(self, if_data):
- try:
- iface = self.get_interface(if_data["if_id"])
- except:
- iface = None
- if iface:
- iface.update(if_data['if_data'])
+ def device_delete(self, dev_data):
+ if dev_data["if_index"] in self._device_database:
+ self._device_database[dev_data["if_index"]].deleted = True
- if if_data["if_data"]["if_index"] in self._device_database:
- dev =
self._device_database[if_data["if_data"]["if_index"]]
- dev.update_data(if_data['if_data'])
+ def dev_db_get_if_index(self, if_index):
+ if if_index in self._device_database:
+ return self._device_database[if_index]
else:
- dev = Device(if_data["if_data"], self)
- self._device_database[if_data["if_data"]["if_index"]] =
dev
-
- def dev_db_delete(self, update_msg):
- if update_msg["if_index"] in self._device_database:
- del self._device_database[update_msg["if_index"]]
+ return None
def dev_db_get_name(self, dev_name):
+ #TODO move these to Slave to optimize quering for each device
for if_index, dev in self._device_database.iteritems():
if dev.get_name() == dev_name:
return dev
return None
- #
- # Factory methods for constructing interfaces on this machine. The
- # types of interfaces are explained with the classes below.
- #
- def new_static_interface(self, if_id, if_type):
- return self._add_interface(if_id, if_type, StaticInterface)
-
- def new_unused_interface(self, if_type):
- return self._add_interface(None, if_type, UnusedInterface)
-
- def new_virtual_interface(self, if_id, if_type):
- return self._add_interface(if_id, if_type, VirtualInterface)
-
- def new_soft_interface(self, if_id, if_type):
- return self._add_interface(if_id, if_type, SoftInterface)
-
- def new_loopback_interface(self, if_id):
- return self._add_interface(if_id, 'lo', LoopbackInterface)
-
- def get_interface(self, if_id):
- for iface in self._interfaces:
- if iface.get_id != None and if_id == iface.get_id():
- return iface
-
- msg = "Interface '%s' not found on machine '%s'" %
(if_id, self._id)
- raise MachineError(msg)
-
- def get_interfaces(self):
- return self._interfaces
-
- def get_ordered_interfaces(self):
- ordered_list = list(self._interfaces)
- change = True
- while change:
- change = False
- swap = False
- ind1 = 0
- ind2 = 0
- for i in ordered_list:
- master = i.get_primary_master()
- if master != None:
- ind1 = ordered_list.index(i)
- ind2 = ordered_list.index(master)
- if ind1 > ind2:
- swap = True
- break
- if swap:
- change = True
- tmp = ordered_list[ind1]
- ordered_list[ind1] = ordered_list[ind2]
- ordered_list[ind2] = tmp
- return ordered_list
-
- def _rpc_call(self, method_name, *args):
- data = {"type": "command", "method_name":
method_name, "args": args}
-
- self._msg_dispatcher.send_message(self._id, data)
- result = self._msg_dispatcher.wait_for_result(self._id)
-
- return result
+ def get_dev_by_hwaddr(self, hwaddr):
+ #TODO move these to Slave to optimize quering for each device
+ for if_index, dev in self._device_database.iteritems():
+ if dev.hwaddr == hwaddr:
+ return dev
+ return None
- def _rpc_call_to_netns(self, netns, method_name, *args):
- data = {"type": "command", "method_name":
method_name, "args": args}
- msg = {"type": "to_netns", "netns": netns,
"data": data}
+ def rpc_call(self, method_name, *args, **kwargs):
+ if "netns" in kwargs and kwargs["netns"] is not None:
+ netns = kwargs["netns"]
+ del kwargs["netns"]
+ msg = {"type": "to_netns",
+ "netns": netns,
+ "data": {"type": "command",
+ "method_name": method_name,
+ "args": args,
+ "kwargs": kwargs}}
+ else:
+ if "netns" in kwargs:
+ del kwargs["netns"]
+ msg = {"type": "command",
+ "method_name": method_name,
+ "args": args,
+ "kwargs": kwargs}
- self._msg_dispatcher.send_message(self._id, msg)
- result = self._msg_dispatcher.wait_for_result(self._id)
+ self._msg_dispatcher.send_message(self, msg)
+ result = self._msg_dispatcher.wait_for_result(self)
return result
- def _rpc_call_x(self, netns, method_name, *args):
- if not netns:
- return self._rpc_call(method_name, *args)
- return self._rpc_call_to_netns(netns, method_name, *args)
-
- def init_connection(self, recipe_name):
+ def _init_connection(self):
""" Initialize the slave connection
- Calling this method will initialize the rpc connection to the
- machine and initialize all the interfaces. Note, that it will
- *not* configure the interfaces. They need to be configured
- individually later on.
+ This will connect to the Slave, get it's description (should be
+ usable for matching), and checks version compatibility
"""
hostname = self._hostname
port = self._port
@@ -242,7 +205,7 @@ class Machine(object):
self._msg_dispatcher.add_slave(self, connection)
- hello, slave_desc = self._rpc_call("hello", recipe_name)
+ hello, slave_desc = self.rpc_call("hello")
if hello != "hello":
msg = "Unable to establish RPC connection " \
"to machine %s, handshake failed!" % hostname
@@ -266,14 +229,45 @@ class Machine(object):
self._slave_desc = slave_desc
- devices = self._rpc_call("get_devices")
+ def set_recipe(self, recipe_name):
+ """ Reserves the machine for the specified recipe
+
+ Also sends Device classes from the controller and initializes the
+ InterfaceManager on the Slave and builds the device database.
+ """
+ self.rpc_call("set_recipe", recipe_name)
+ self._send_device_classes()
+ self.rpc_call("init_if_manager")
+
+ devices = self.rpc_call("get_devices")
for if_index, dev in devices.items():
- self._device_database[if_index] = Device(dev, self)
+ remote_dev = RemoteDevice(Device)
+ remote_dev.host = self
+ remote_dev.if_index = if_index
- for iface in self._interfaces:
- iface.initialize()
+ self._device_database[if_index] = remote_dev
+
+ def _send_device_classes(self):
+ classes = []
+ for cls_name, cls in device_classes:
+ classes.extend(reversed(self._get_base_classes(cls)))
+
+ for cls in classes:
+ if cls is object:
+ continue
+ module_name = cls.__module__
+ module = sys.modules[module_name]
+ filename = module.__file__
- self._configured = True
+ if filename[-3:] == "pyc":
+ filename = filename[:-1]
+
+ res_hash = self.sync_resource(module_name, filename)
+ self.rpc_call("load_cached_module", module_name, res_hash)
+
+ for cls_name, cls in device_classes:
+ module_name = cls.__module__
+ self.rpc_call("map_device_class", cls_name, module_name)
def is_git_version(self, version):
try:
@@ -282,10 +276,12 @@ class Machine(object):
except ValueError:
return True
- def is_configured(self):
- """ Test if the machine was configured """
-
- return self._configured
+ def cleanup_devices(self):
+ self.rpc_call("destroy_devices")
+ for dev in self._device_database.values():
+ if isinstance(dev, VirtualDevice):
+ dev.destroy()
+ self._device_database = {}
def cleanup(self):
""" Clean the machine up
@@ -295,114 +291,144 @@ class Machine(object):
all the interfaces that have been configured on the machine,
and finalize and close the rpc connection to the machine.
"""
- if not self._configured:
- return
-
#connection to the slave was closed
- if not self._msg_dispatcher.get_connection(self._id):
+ if not self._msg_dispatcher.get_connection(self):
return
- ordered_ifaces = self.get_ordered_interfaces()
try:
#dump statistics
- for iface in self._interfaces:
- # Getting stats only from real interfaces
- if isinstance(iface, UnusedInterface):
- continue
- stats = iface.link_stats()
- logging.debug("%s:%s:%s: RX:\t bytes: %d\t packets: %d\t dropped:
%d" %
- (iface.get_netns(), iface.get_host(), iface.get_id(),
- stats["rx_bytes"], stats["rx_packets"],
stats["rx_dropped"]))
- logging.debug("%s:%s:%s: TX:\t bytes: %d\t packets: %d\t dropped:
%d" %
- (iface.get_netns(), iface.get_host(), iface.get_id(),
- stats["tx_bytes"], stats["tx_packets"],
stats["tx_dropped"]))
-
- self._rpc_call("kill_cmds")
+ # for iface in self._interfaces:
+ # # Getting stats only from real interfaces
+ # if isinstance(iface, UnusedInterface):
+ # continue
+ # stats = iface.link_stats()
+ # logging.debug("%s:%s:%s: RX:\t bytes: %d\t packets: %d\t dropped:
%d" %
+ # (iface.get_netns(), iface.get_host(), iface.get_id(),
+ # stats["rx_bytes"],
stats["rx_packets"], stats["rx_dropped"]))
+ # logging.debug("%s:%s:%s: TX:\t bytes: %d\t packets: %d\t dropped:
%d" %
+ # (iface.get_netns(), iface.get_host(), iface.get_id(),
+ # stats["tx_bytes"],
stats["tx_packets"], stats["tx_dropped"]))
+
+ self.rpc_call("kill_jobs")
for netns in self._namespaces:
- self._rpc_call_to_netns(netns, "kill_cmds")
+ self.rpc_call("kill_jobs", netns=netns)
self.restore_system_config()
-
- ordered_ifaces.reverse()
- for iface in ordered_ifaces:
- iface.deconfigure()
- for iface in ordered_ifaces:
- iface.cleanup()
-
- self.del_namespaces()
-
- self.restore_nm_option()
- self._rpc_call("bye")
+ self.cleanup_devices()
+ # self.del_namespaces()
+ # self.restore_nm_option()
+ self.rpc_call("bye")
except:
#cleanup is only meaningful on dynamic interfaces, and should
#always be called when deconfiguration happens- especially
#when something on the slave breaks during deconfiguration
- for iface in ordered_ifaces:
- if not isinstance(iface, VirtualInterface):
- continue
- iface.cleanup()
+ self.cleanup_devices()
raise
- finally:
- self._msg_dispatcher.disconnect_slave(self.get_id())
-
- self._configured = False
def _timeout_handler(self, signum, frame):
- msg = "RPC connection to machine %s timed out" % self.get_id()
+ msg = "Timeout expired on machine %s" % self.get_id()
raise MachineError(msg)
- def run_command(self, command):
- """ Run a command on the machine """
+ def _get_base_classes(self, cls):
+ new_bases = [cls] + list(cls.__bases__)
+ bases = []
+ while len(new_bases) != len(bases):
+ bases = new_bases
+ new_bases = list(bases)
+ for b in bases:
+ for bs in b.__bases__:
+ if bs not in new_bases:
+ new_bases.append(bs)
+ return new_bases
+
+ def run_job(self, job):
+ job.id = self._job_id_seq
+ self._job_id_seq += 1
+ self._jobs[job.id] = job
+
+ if job._type == "module":
+ classes = [job._what]
+ classes.extend(self._get_base_classes(job._what.__class__))
+
+ for cls in reversed(classes):
+ if cls is object or cls is BaseTestModule:
+ continue
+ m_name = cls.__module__
+ m = sys.modules[m_name]
+ filename = m.__file__
+ if filename[-3:] == "pyc":
+ filename = filename[:-1]
- prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
+ res_hash = self.sync_resource(m_name, filename)
- if 'bg_id' in command:
- self._bg_cmds[command['bg_id']] = command
- if command["type"] in ["wait", "intr",
"kill"]:
- bg_cmd = self._bg_cmds[command["proc_id"]]
- if bg_cmd["netns"] != None:
- command["netns"] = bg_cmd["netns"]
-
- netns = command["netns"]
- if command["type"] == "wait":
- logging.debug("Get remaining time of bg process with bg_id == %s"
- % command["proc_id"])
- remaining_time = self._rpc_call_x(netns, "get_remaining_time",
- command["proc_id"])
- logging.debug("Setting timeout to %d", remaining_time)
- if remaining_time > 0:
- signal.alarm(remaining_time)
- else:
- # 2 seconds is enough time to do wait via RPC and collect
- # the result
- signal.alarm(2)
- else:
- if "timeout" in command:
- timeout = command["timeout"]
- logging.debug("Setting timeout to \"%d\"", timeout)
- signal.alarm(timeout)
- else:
- logging.debug("Setting default timeout (%ds)." %
DEFAULT_TIMEOUT)
- signal.alarm(DEFAULT_TIMEOUT)
+ self.rpc_call("load_cached_module", m_name, res_hash)
+
+ logging.info("Host %s executing job %d: %s" %
+ (self._id, job.id, str(job)))
+ if job._desc is not None:
+ logging.info("Job description: %s" % job._desc)
+
+ return self.rpc_call("run_job", job._to_dict(), netns=job._netns)
+
+ def wait_for_job(self, job, timeout):
+ res = True
+ if job.id not in self._jobs:
+ raise MachineError("No job '%s' running on Machine %s" %
+ (job.id, self._id))
+
+ prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
+ signal.alarm(timeout)
try:
- cmd_res = self._rpc_call_x(netns, "run_command", command)
+ if timeout > 0:
+ logging.info("Waiting for Job %d on Host %s for %d seconds." %
+ (job.id, self._id, timeout))
+ elif timeout == 0:
+ logging.info("Waiting for Job %d on Host %s." %
+ (job.id, self._id))
+ result = self._msg_dispatcher.wait_for_finish(self, job.id)
except MachineError as exc:
- if "proc_id" in command:
- cmd_res = self._rpc_call_x(netns, "kill_command",
- command["proc_id"])
- else:
- cmd_res = self._rpc_call_x(netns, "kill_command",
- None)
+ logging.error(str(exc))
+ res = False
- if "killed" in cmd_res and cmd_res["killed"]:
- cmd_res["passed"] = False
- cmd_res["msg"] = str(exc)
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, prev_handler)
+
+ return res
+
+ def wait_for_tmp_devices(self, timeout):
+ res = False
+ prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
+ signal.alarm(timeout)
+
+ try:
+ if timeout > 0:
+ logging.info("Waiting for Device creation Host %s for %d
seconds." %
+ (self._id, timeout))
+ elif timeout == 0:
+ logging.info("Waiting for Device creation on Host %s." %
+ (self._id))
+
+ while len(self._tmp_device_database) > 0:
+ result = self._msg_dispatcher.handle_messages()
+ except MachineError as exc:
+ logging.error(str(exc))
+ res = False
signal.alarm(0)
signal.signal(signal.SIGALRM, prev_handler)
+ return res
+
+ def job_finished(self, msg):
+ job_id = msg["job_id"]
+ job = self._jobs[job_id]
+ job._res = msg["result"]
- return cmd_res
+ def kill(self, job, signal):
+ if job.id not in self._jobs:
+ raise MachineError("No job '%s' running on Machine %s" %
+ (job.id(), self._id))
+ return self.rpc_call("kill_job", job.id, signal, netns=job.netns)
def get_hostname(self):
""" Get hostname/ip of the machine
@@ -415,13 +441,6 @@ class Machine(object):
def get_libvirt_domain(self):
return self._libvirt_domain
- def get_id(self):
- """ Returns machine's id as defined in the recipe
"""
- return self._id
-
- def set_rpc(self, dispatcher):
- self._msg_dispatcher = dispatcher
-
def get_mac_pool(self):
if self._mac_pool:
return self._mac_pool
@@ -432,9 +451,9 @@ class Machine(object):
self._mac_pool = mac_pool
def restore_system_config(self):
- self._rpc_call("restore_system_config")
+ self.rpc_call("restore_system_config")
for netns in self._namespaces:
- self._rpc_call_to_netns(netns, "restore_system_config")
+ self.rpc_call("restore_system_config", netns=netns)
return True
def set_network_bridges(self, bridges):
@@ -459,7 +478,7 @@ class Machine(object):
tmp = {}
for netns in namespaces:
- tmp.update(self._rpc_call_x(netns, "start_packet_capture",
""))
+ tmp.update(self.rpc_call("start_packet_capture", "",
netns=netns))
return tmp
def stop_packet_capture(self):
@@ -468,10 +487,10 @@ class Machine(object):
namespaces.add(iface.get_netns())
for netns in namespaces:
- self._rpc_call_x(netns, "stop_packet_capture")
+ self.rpc_call("stop_packet_capture", netns=netns)
def copy_file_to_machine(self, local_path, remote_path=None, netns=None):
- remote_path = self._rpc_call_x(netns, "start_copy_to", remote_path)
+ remote_path = self.rpc_call("start_copy_to", remote_path, netns=netns)
f = open(local_path, "rb")
@@ -480,14 +499,14 @@ class Machine(object):
if len(data) == 0:
break
- self._rpc_call_x(netns, "copy_part_to", remote_path, Binary(data))
+ self.rpc_call("copy_part_to", remote_path, data, netns=netns)
- self._rpc_call_x(netns, "finish_copy_to", remote_path)
+ self.rpc_call("finish_copy_to", remote_path, netns=netns)
return remote_path
def copy_file_from_machine(self, remote_path, local_path):
- status = self._rpc_call("start_copy_from", remote_path)
+ status = self.rpc_call("start_copy_from", remote_path)
if not status:
raise MachineError("The requested file cannot be transfered." \
"It does not exist on machine %s" % self.get_id())
@@ -495,836 +514,54 @@ class Machine(object):
local_file = open(local_path, "wb")
buf_size = 1024*1024 # 1MB buffer
- binary = "next"
- while binary != "":
- binary = self._rpc_call("copy_part_from", remote_path, buf_size)
- local_file.write(binary.data)
+ while True:
+ data = self.rpc_call("copy_part_from", remote_path, buf_size)
+ if data == "":
+ break
+ local_file.write(data)
local_file.close()
- self._rpc_call("finish_copy_from", remote_path)
-
- def sync_resources(self, required):
- self._rpc_call("clear_resource_table")
-
- for res_type, resources in required.iteritems():
- for res_name, res in resources.iteritems():
- has_resource = self._rpc_call("has_resource",
res["hash"])
- if not has_resource:
- msg = "Transfering %s %s to machine %s" % \
- (res_name, res_type, self.get_id())
- logging.info(msg)
+ self.rpc_call("finish_copy_from", remote_path)
- local_path = required[res_type][res_name]["path"]
+ def sync_resource(self, res_name, file_path):
+ digest = sha256sum(file_path)
- if res_type == "tools":
- archive = tempfile.NamedTemporaryFile(delete=False)
- archive_path = archive.name
- archive.close()
+ if not self.rpc_call("has_resource", digest):
+ msg = "Transfering %s to machine %s as '%s'" % (file_path,
+ self.get_id(),
+ res_name)
+ logging.debug(msg)
- create_tar_archive(local_path, archive_path, True)
- local_path = archive_path
+ remote_path = self.copy_file_to_machine(file_path)
+ self.rpc_call("add_resource_to_cache",
+ "file", remote_path, res_name)
+ return digest
- remote_path = self.copy_file_to_machine(local_path)
- self._rpc_call("add_resource_to_cache",
res["hash"],
- remote_path, res_name, res["path"],
res_type)
+ # def enable_nm(self):
+ # return self._rpc_call("enable_nm")
- for ns in self._namespaces:
- remote_path = self.copy_file_to_machine(local_path,
- netns=ns)
- self._rpc_call_to_netns(ns, "add_resource_to_cache",
- res["hash"], remote_path, res_name,
res["path"],
- res_type)
+ # def disable_nm(self):
+ # return self._rpc_call("disable_nm")
- if res_type == "tools":
- os.unlink(archive_path)
-
- self._rpc_call("map_resource", res["hash"], res_type,
res_name)
- for ns in self._namespaces:
- self._rpc_call_to_netns(ns, "map_resource",
res["hash"],
- res_type, res_name)
-
- def enable_nm(self):
- return self._rpc_call("enable_nm")
-
- def disable_nm(self):
- return self._rpc_call("disable_nm")
-
- def restore_nm_option(self):
- return self._rpc_call("restore_nm_option")
+ # def restore_nm_option(self):
+ # return self._rpc_call("restore_nm_option")
def __str__(self):
return "[Machine hostname(%s) libvirt_domain(%s) interfaces(%d)]" % \
(self._hostname, self._libvirt_domain, len(self._interfaces))
- def add_netns(self, netns):
- self._namespaces.append(netns)
- return self._rpc_call("add_namespace", netns)
+ # def add_netns(self, netns):
+ # self._namespaces.append(netns)
+ # return self._rpc_call("add_namespace", netns)
- def del_netns(self, netns):
- return self._rpc_call("del_namespace", netns)
-
- def del_namespaces(self):
- for netns in self._namespaces:
- self.del_netns(netns)
- self._namespaces = []
- return True
+ # def del_netns(self, netns):
+ # return self._rpc_call("del_namespace", netns)
- def wait_interface_init(self):
- return self._rpc_call("wait_interface_init")
+ # def del_namespaces(self):
+ # for netns in self._namespaces:
+ # self.del_netns(netns)
+ # self._namespaces = []
+ # return True
def get_security(self):
return self._security
-
-class Interface(object):
- """ Abstraction of a test network interface on a slave machine
-
- This is a base class for object that represent test interfaces
- on a test machine.
- """
- def __init__(self, machine, if_id, if_type):
- self._machine = machine
- self._configured = False
-
- self._id = if_id
- self._type = if_type
-
- self._hwaddr = None
- self._devname = None
- self._network = None
- self._netem = None
-
- self._slaves = {}
- self._slave_options = {}
- self._addresses = []
- self._options = []
-
- self._master = {"primary": None, "other": []}
-
- self._ovs_conf = None
-
- self._netns = None
- self._peer = None
- self._mtu = None
- self._driver = None
- self._devlink = None
-
- def get_id(self):
- return self._id
-
- def get_type(self):
- return self._type
-
- def get_driver(self):
- return self._driver
-
- def set_hwaddr(self, hwaddr):
- self._hwaddr = normalize_hwaddr(hwaddr)
-
- def get_hwaddr(self):
- if not self._hwaddr:
- msg = "Hardware address is not available for interface
'%s'" \
- % self.get_id()
- raise MachineError(msg)
- return self._hwaddr
-
- def set_devname(self, devname):
- self._devname = devname
-
- def get_devname(self):
- if not self._devname:
- msg = "Device name is not available for interface '%s'" \
- % self.get_id()
- raise MachineError(msg)
- return self._devname
-
- def set_network(self, network):
- self._network = network
-
- def get_network(self):
- if not self._network:
- msg = "Network segment is not available for interface '%s'"
\
- % self.get_id()
- raise MachineError(msg)
- return self._network
-
- def set_option(self, name, value):
- self._options.append((name, value))
-
- def set_netem(self, netem):
- self._netem = netem
-
- def add_master(self, master, primary=True):
- if primary and self._master["primary"] != None:
- msg = "Interface %s already has a primary master."\
- % self.get_id()
- raise MachineError(msg)
- else:
- if primary:
- self._master["primary"] = master
- else:
- self._master["other"].append(master)
-
- def del_master(self, master):
- if self._master["primary"] is master:
- self._master["primary"] = None
- else:
- self._master["other"].remove(master)
-
- def get_primary_master(self):
- return self._master["primary"]
-
- def add_slave(self, iface):
- self._slaves[iface.get_id()] = iface
- if self._type in ["vlan", "vxlan"]:
- iface.add_master(self, primary=False)
- else:
- iface.add_master(self)
-
- def del_slave(self, iface):
- iface.del_master(self)
- del self._slaves[iface.get_id()]
-
- def set_slave_option(self, slave_id, name, value):
- if slave_id not in self._slave_options:
- self._slave_options[slave_id] = []
- self._slave_options[slave_id].append((name, value))
-
- def add_address(self, addr):
- if (type(addr) == type([])):
- for one_addr in addr:
- self._addresses.append(one_addr)
- else:
- self._addresses.append(addr)
-
- def get_address(self, num):
- return self._addresses[num].split('/')[0]
-
- def get_addresses(self):
- addrs = []
- for addr in self._addresses:
- addrs.append(tuple(addr.split('/')))
- return addrs
-
- def set_ovs_conf(self, ovs_conf):
- self._ovs_conf = ovs_conf
-
- def get_ovs_conf(self):
- return self._ovs_conf
-
- def set_netns(self, netns):
- self._netns = netns
-
- def get_netns(self):
- return self._netns
-
- def get_host(self):
- return self._machine.get_id()
-
- def set_peer(self, peer):
- self._peer = peer
-
- def get_peer(self):
- return self._peer
-
- def get_prefix(self, num):
- try:
- return self._addresses[num].split('/')[1]
- except IndexError:
- raise PrefixMissingError
-
- def get_mtu(self):
- return self._mtu
-
- def set_mtu(self, mtu):
- command = {"type": "config",
- "host": self._machine.get_id(),
- "persistent": False,
- "options":[
- {"name": "/sys/class/net/%s/mtu" %
self._devname,
- "value": str(mtu)}
- ]}
- command["netns"] = self._netns
-
- self._machine.run_command(command)
- self._mtu = mtu
- return self._mtu
-
- def link_stats(self):
- stats = self._machine._rpc_call_x(self._netns, "link_stats",
- self._id)
- return stats
-
- def set_addresses(self, ips):
- self._addresses = ips
- self._machine._rpc_call_x(self._netns, "set_addresses",
- self._id, ips)
-
- def add_route(self, dest):
- self._machine._rpc_call_x(self._netns, "add_route",
- self._id, dest)
-
- def del_route(self, dest):
- self._machine._rpc_call_x(self._netns, "del_route",
- self._id, dest)
-
- def update_from_slave(self):
- if_data = self._machine._rpc_call_x(self._netns, "get_if_data",
- self._id)
-
- if if_data is not None:
- self.update(if_data)
- return
-
- def update(self, if_data):
- self.set_hwaddr(if_data["hwaddr"])
- self.set_devname(if_data["name"])
- self._mtu = if_data["mtu"]
- self._driver = if_data["driver"]
- self._devlink = if_data["devlink"]
-
- def get_config(self):
- config = {"id": self._id,
- "hwaddr": self._hwaddr,
- "devname": self._devname,
- "network_label": self._network,
- "type": self._type,
- "addresses": self._addresses,
- "slaves": self._slaves.keys(),
- "options": self._options,
- "slave_options": self._slave_options,
- "master": None,
- "other_masters": [],
- "ovs_conf": self._ovs_conf,
- "netns": self._netns,
- "peer": self._peer,
- "netem": self._netem,
- "mtu": self._mtu,
- "driver": self._driver}
-
- if self._master["primary"] != None:
- config["master"] = self._master["primary"].get_id()
-
- for m in self._master["other"]:
- config["other_masters"].append(m.get_id())
-
- return config
-
- def up(self):
- self._machine._rpc_call_x(self._netns, "set_device_up", self._id)
-
- def down(self):
- self._machine._rpc_call_x(self._netns, "set_device_down", self._id)
-
- def set_link_up(self):
- self._machine._rpc_call_x(self._netns, "set_link_up", self._id)
-
- def set_link_down(self):
- self._machine._rpc_call_x(self._netns, "set_link_down", self._id)
-
- def initialize(self):
- phys_devs = self._machine._rpc_call("map_if_by_hwaddr",
- self._id, self._hwaddr)
-
- if len(phys_devs) == 1:
- self.set_devname(phys_devs[0]["name"])
- elif len(phys_devs) < 1:
- msg = "Device %s not found on machine %s" \
- % (self.get_id(), self._machine.get_id())
- raise MachineError(msg)
- elif len(phys_devs) > 1:
- msg = "More than one device with hwaddr %s found on machine %s" \
- % (self._hwaddr, self._machine.get_id())
- raise MachineError(msg)
-
- self.down()
-
- def cleanup(self):
- self._machine._rpc_call("unmap_if", self._id)
-
- def configure(self):
- if self._configured:
- msg = "Unable to configure interface %s on machine %s. " \
- "It has been configured already." % (self.get_id(),
- self._machine.get_id())
- raise MachineError(msg)
- else:
- self._configured = True
-
- logging.info("Configuring interface %s on machine %s", self.get_id(),
- self._machine.get_id())
-
- if self._netns != None:
- self._machine._rpc_call("set_if_netns", self.get_id(),
self._netns)
- self._machine._rpc_call_x(self._netns, "configure_interface",
- self.get_id(), self.get_config())
-
- self.update_from_slave()
-
- def deconfigure(self):
- if not self._configured:
- return
-
- self._machine._rpc_call_x(self._netns, "deconfigure_interface",
- self.get_id())
- if self._netns != None:
- self._machine._rpc_call_to_netns(self._netns,
- "return_if_netns", self.get_id())
- self._configured = False
-
- def add_br_vlan(self, br_vlan_info):
- self._machine._rpc_call_x(self._netns, "add_br_vlan",
- self._id, br_vlan_info)
-
- def del_br_vlan(self, br_vlan_info):
- self._machine._rpc_call_x(self._netns, "del_br_vlan",
- self._id, br_vlan_info)
-
- def get_br_vlans(self):
- return self._machine._rpc_call_x(self._netns, "get_br_vlans",
self._id)
-
- def add_br_fdb(self, br_fdb_info):
- self._machine._rpc_call_x(self._netns, "add_br_fdb",
- self._id, br_fdb_info)
-
- def del_br_fdb(self, br_fdb_info):
- self._machine._rpc_call_x(self._netns, "del_br_fdb",
- self._id, br_fdb_info)
-
- def get_br_fdbs(self):
- return self._machine._rpc_call_x(self._netns, "get_br_fdbs", self._id)
-
- def set_br_learning(self, br_learning_info):
- self._machine._rpc_call_x(self._netns, "set_br_learning", self._id,
- br_learning_info)
-
- def set_br_learning_sync(self, br_learning_sync_info):
- self._machine._rpc_call_x(self._netns, "set_br_learning_sync",
self._id,
- br_learning_sync_info)
-
- def set_br_flooding(self, br_flooding_info):
- self._machine._rpc_call_x(self._netns, "set_br_flooding", self._id,
- br_flooding_info)
-
- def set_br_state(self, br_state_info):
- self._machine._rpc_call_x(self._netns, "set_br_state", self._id,
- br_state_info)
-
- def set_speed(self, speed):
- self._machine._rpc_call_x(self._netns, "set_speed", self._id, speed)
-
- def set_autoneg(self):
- self._machine._rpc_call_x(self._netns, "set_autoneg", self._id)
-
- def slave_add(self, if_id):
- self._machine._rpc_call_x(self._netns, "slave_add", self._id, if_id)
- self.add_slave(self._machine.get_interface(if_id))
-
- def slave_del(self, if_id):
- self.del_slave(self._machine.get_interface(if_id))
- self._machine._rpc_call_x(self._netns, "slave_del", self._id, if_id)
-
- def get_devlink_name(self):
- if self._devlink:
- return "%s/%s" % (self._devlink["bus_name"],
- self._devlink["dev_name"])
- return None
-
- def get_devlink_port_name(self):
- if self._devlink:
- return "%s/%u" % (self.get_devlink_name(),
- self._devlink["port_index"])
- return None
-
-class StaticInterface(Interface):
- """ Static interface
-
- This class represents interfaces that are present on the
- machine. LNST will only use them for testing without performing
- any special actions.
-
- This type is suitable for physical interfaces.
- """
- def __init__(self, machine, if_id, if_type):
- super(StaticInterface, self).__init__(machine, if_id, if_type)
-
-class LoopbackInterface(Interface):
- """ Static interface
-
- This class represents interfaces that are present on the
- machine. LNST will only use them for testing without performing
- any special actions.
-
- This type is suitable for physical interfaces.
- """
- def __init__(self, machine, if_id, if_type):
- super(LoopbackInterface, self).__init__(machine, if_id, if_type)
-
- def initialize(self):
- pass
-
- def cleanup(self):
- pass
-
- def configure(self):
- self._hwaddr = '00:00:00:00:00:00'
- self._driver = 'loopback'
-
- phys_devs = self._machine._rpc_call_x(self._netns,
- "map_if_by_params", self._id,
- { 'hwaddr': self._hwaddr,
- 'driver': self._driver })
-
- if len(phys_devs) == 1:
- self.set_devname(phys_devs[0]["name"])
- elif len(phys_devs) < 1:
- msg = "Device %s not found on machine %s" \
- % (self.get_id(), self._machine.get_id())
- raise MachineError(msg)
- elif len(phys_devs) > 1:
- msg = "More than one device with hwaddr %s found on machine %s" \
- % (self._hwaddr, self._machine.get_id())
- raise MachineError(msg)
-
- if self._configured:
- msg = "Unable to configure interface %s on machine %s. " \
- "It has been configured already." % (self.get_id(),
- self._machine.get_id())
- raise MachineError(msg)
-
- logging.info("Configuring interface %s on machine %s", self.get_id(),
- self._machine.get_id())
-
- self._machine._rpc_call_x(self._netns, "configure_interface",
- self.get_id(), self.get_config())
- self._configured = True
- self.update_from_slave()
-
- def deconfigure(self):
- if not self._configured:
- return
-
- self._machine._rpc_call_x(self._netns, "deconfigure_interface",
- self.get_id())
- self._machine._rpc_call_x(self._netns, "unmap_if", self.get_id())
- self._configured = False
-
-class VirtualInterface(Interface):
- """ Dynamically created interface
-
- This class represents interfaces in libvirt virtual machines
- that were created dynamically by LNST just for this test.
-
- This requires some special handling and communication with
- libvirt.
- """
- def __init__(self, machine, if_id, if_type):
- super(VirtualInterface, self).__init__(machine, if_id, if_type)
- self._driver = "virtio"
-
- def set_driver(self, driver):
- self._driver = driver
-
- def get_driver(self):
- return self._driver
-
- def get_orig_hwaddr(self):
- if not self._orig_hwaddr:
- msg = "Hardware address is not available for interface
'%s'" \
- % self.get_id()
- raise MachineError(msg)
- return self._orig_hwaddr
-
- def initialize(self):
- domain_ctl = self._machine.get_domain_ctl()
-
- if self._hwaddr:
- query = self._machine._rpc_call('get_devices_by_hwaddr',
- self._hwaddr)
- if len(query):
- msg = "Device with hwaddr %s already exists" % self._hwaddr
- raise MachineError(msg)
- else:
- mac_pool = self._machine.get_mac_pool()
- while True:
- self._hwaddr = normalize_hwaddr(mac_pool.get_addr())
- query = self._machine._rpc_call('get_devices_by_hwaddr',
- self._hwaddr)
- if not len(query):
- break
-
- bridges = self._machine.get_network_bridges()
- if self._network in bridges:
- net_ctl = bridges[self._network]
- else:
- bridges[self._network] = net_ctl = VirtNetCtl()
- net_ctl.init()
-
- net_name = net_ctl.get_name()
-
- logging.info("Creating interface %s (%s) on machine %s",
- self.get_id(), self._hwaddr, self._machine.get_id())
-
- self._orig_hwaddr = self._hwaddr
- domain_ctl.attach_interface(self._hwaddr, net_name, self._driver)
-
-
- # The sleep here is necessary, because udev sometimes renames the
- # newly created device and if the query for name comes too early,
- # the controller will then try to configure an nonexistent device
- sleep(1)
-
- ready = wait_for(self.is_ready, timeout=10)
-
- if not ready:
- msg = "Netdevice initialization failed." \
- "Unable to create device %s (%s) on machine %s" \
- % (self.get_id(), self._hwaddr, self._machine.get_id())
- raise MachineError(msg)
-
- super(VirtualInterface, self).initialize()
-
- def cleanup(self):
- self._machine._rpc_call("unmap_if", self._id)
- domain_ctl = self._machine.get_domain_ctl()
- domain_ctl.detach_interface(self._orig_hwaddr)
-
- def is_ready(self):
- ifaces = self._machine._rpc_call('get_devices_by_hwaddr', self._hwaddr)
- return len(ifaces) > 0
-
-class SoftInterface(Interface):
- """ Software interface abstraction
-
- This type of interface represents interfaces created in the kernel
- during the runtime. This includes devices such as bonds and teams.
- """
-
- def __init__(self, machine, if_id, if_type):
- super(SoftInterface, self).__init__(machine, if_id, if_type)
-
- def initialize(self):
- pass
-
- def cleanup(self):
- pass
-
- def configure(self):
- if self._configured:
- return
- else:
- self._configured = True
-
- logging.info("Configuring interface %s on machine %s", self.get_id(),
- self._machine.get_id())
-
- if self._type == "veth":
- peer_if = self._machine.get_interface(self._peer)
- peer_config = peer_if.get_config()
- dev_name, peer_name = self._machine._rpc_call("create_if_pair",
- self._id, self.get_config(),
- self._peer, peer_config)
- self.set_devname(dev_name)
- peer_if.set_devname(peer_name)
- self._configured = True
- peer_if._configured = True
- return
-
- dev_name = self._machine._rpc_call_x(self._netns,
- "create_soft_interface",
- self._id, self.get_config())
- self.set_devname(dev_name)
- self.update_from_slave()
-
- def deconfigure(self):
- if not self._configured:
- return
-
- if self._type == "veth":
- peer_if = self._machine.get_interface(self._peer)
-
- self._machine._rpc_call("deconfigure_if_pair", self._id,
self._peer)
- self._machine._rpc_call("unmap_if", self._id)
- self._machine._rpc_call("unmap_if", self._peer)
-
- self._configured = False
- peer_if._configured = False
- return
-
- self._machine._rpc_call_x(self._netns, "deconfigure_interface",
- self.get_id())
- self._machine._rpc_call_x(self._netns, "unmap_if", self.get_id())
- self._configured = False
-
-class UnusedInterface(Interface):
- """ Unused interface for this test
-
- This class represents interfaces that will not be used in the
- current test setup. This applies when a slave machine from a
- pool has more interfaces then the machine it was matched to
- from the recipe.
-
- LNST still needs to know about these interfaces so it can turn
- them off.
- """
-
- def __init__(self, machine, if_id, if_type):
- super(UnusedInterface, self).__init__(machine, if_id, if_type)
-
- def initialize(self):
- self._machine._rpc_call('set_unmapped_device_down', self._hwaddr)
-
- def set_driver(self, driver):
- pass
-
- def configure(self):
- pass
-
- def deconfigure(self):
- pass
-
- def up(self):
- pass
-
- def down(self):
- pass
-
- def cleanup(self):
- pass
-
-class Device(object):
- """ Represents device information received from a
Slave"""
-
- def pre_call_decorate(func):
- @wraps(func)
- def func_wrapper(inst, *args, **kwargs):
- inst.slave_update()
- return func(inst, *args, **kwargs)
- return func_wrapper
-
- def __init__(self, data, machine):
- self._if_index = data["if_index"]
- self._hwaddr = None
- self._name = None
- self._ip_addrs = None
- self._ifi_type = None
- self._state = None
- self._master = None
- self._slaves = None
- self._netns = None
- self._peer = None
- self._mtu = None
- self._driver = None
- self._devlink = None
-
- self._machine = machine
-
- self.update_data(data)
-
- def update_data(self, data):
- if data["if_index"] != self._if_index:
- return False
-
- self._hwaddr = data["hwaddr"]
- self._name = data["name"]
- self._ip_addrs = data["ip_addrs"]
- self._ifi_type = data["ifi_type"]
- self._state = data["state"]
- self._master = data["master"]
- self._slaves = data["slaves"]
- self._netns = data["netns"]
- self._peer = data["peer"]
- self._mtu = data["mtu"]
- self._driver = data["driver"]
- self._devlink = data["driver"]
- return True
-
- def slave_update(self):
- res = self._machine._rpc_call_x(self._netns,
- "get_device",
- self._if_index)
- if res:
- self.update_data(res)
- return
-
- def get_if_index(self):
- return self._if_index
-
- @pre_call_decorate
- def get_hwaddr(self):
- return self._hwaddr
-
- @pre_call_decorate
- def get_name(self):
- return self._name
-
- @pre_call_decorate
- def get_ip_addrs(self, selector={}):
- return [ip["addr"]
- for ip in self._ip_addrs
- if selector.items() <= ip.items()]
-
- @pre_call_decorate
- def get_ip_addr(self, num, selector={}):
- ips = self.get_ip_addrs(selector)
- return ips[num]
-
- @pre_call_decorate
- def get_ifi_type(self):
- return self._ifi_type
-
- @pre_call_decorate
- def get_state(self):
- return self._state
-
- @pre_call_decorate
- def get_master(self):
- return self._master
-
- @pre_call_decorate
- def get_slaves(self):
- return self._slaves
-
- @pre_call_decorate
- def get_netns(self):
- return self._netns
-
- @pre_call_decorate
- def get_peer(self):
- return self._peer
-
- @pre_call_decorate
- def get_mtu(self):
- return self._mtu
-
- def set_mtu(self, mtu):
- command = {"type": "config",
- "host": self._machine.get_id(),
- "persistent": False,
- "options":[
- {"name": "/sys/class/net/%s/mtu" %
self._name,
- "value": str(mtu)}
- ]}
- command["netns"] = self._netns
-
- self._machine.run_command(command)
-
- self.slave_update()
- return self._mtu
-
- @pre_call_decorate
- def get_driver(self):
- return self._driver
-
- @pre_call_decorate
- def get_devlink_name(self):
- if self._devlink:
- return "%s/%s" % (self._devlink["bus_name"],
- self._devlink["dev_name"])
- return None
-
- @pre_call_decorate
- def get_devlink_port_name(self):
- if self._devlink:
- return "%s/%u" % (self.get_devlink_name(),
- self._devlink["port_index"])
- return None
diff --git a/lnst/Slave/NetTestSlave.py b/lnst/Slave/NetTestSlave.py
index b18f47d..0f654e3 100644
--- a/lnst/Slave/NetTestSlave.py
+++ b/lnst/Slave/NetTestSlave.py
@@ -19,8 +19,10 @@ import datetime
import socket
import ctypes
import multiprocessing
+import imp
+import types
from time import sleep, time
-from xmlrpclib import Binary
+from inspect import isclass
from tempfile import NamedTemporaryFile
from lnst.Common.Logs import log_exc_traceback
from lnst.Common.PacketCapture import PacketCapture
@@ -34,118 +36,150 @@ from lnst.Common.Utils import check_process_running
from lnst.Common.Utils import is_installed
from lnst.Common.ConnectionHandler import send_data
from lnst.Common.ConnectionHandler import ConnectionHandler
-from lnst.Common.Config import lnst_config
from lnst.Common.Config import DefaultRPCPort
+from lnst.Common.DeviceRef import DeviceRef
+from lnst.Common.LnstError import LnstError
+from lnst.Common.DeviceError import DeviceDeleted
+from lnst.Common.IpAddress import IpAddress
+from lnst.Slave.Job import Job, JobContext
from lnst.Slave.InterfaceManager import InterfaceManager
from lnst.Slave.BridgeTool import BridgeTool
from lnst.Slave.SlaveSecSocket import SlaveSecSocket, SecSocketException
+Devices = types.ModuleType("Devices")
+Devices.__path__ = ["lnst.Devices"]
+
+sys.modules["lnst.Devices"] = Devices
+
class SlaveMethods:
'''
Exported xmlrpc methods
'''
- def __init__(self, command_context, log_ctl, if_manager, net_namespaces,
- server_handler, slave_server):
+ def __init__(self, job_context, log_ctl, net_namespaces,
+ server_handler, slave_config, slave_server):
self._packet_captures = {}
- self._if_manager = if_manager
- self._command_context = command_context
+ self._if_manager = None
+ self._job_context = job_context
self._log_ctl = log_ctl
self._net_namespaces = net_namespaces
self._server_handler = server_handler
self._slave_server = slave_server
+ self._slave_config = slave_config
self._capture_files = {}
self._copy_targets = {}
self._copy_sources = {}
self._system_config = {}
- self._cache = ResourceCache(lnst_config.get_option("cache",
"dir"),
- lnst_config.get_option("cache",
"expiration_period"))
+ self._cache = ResourceCache(slave_config.get_option("cache",
"dir"),
+ slave_config.get_option("cache",
"expiration_period"))
+
+ self._dynamic_modules = {}
+ self._dynamic_classes = {}
+
+ self._bkp_nm_opt_val = slave_config.get_option("environment",
"use_nm")
+
+ def hello(self):
+ logging.info("Recieved a controller connection.")
+
+ slave_desc = {}
+ if check_process_running("NetworkManager"):
+ slave_desc["nm_running"] = True
+ else:
+ slave_desc["nm_running"] = False
- self._resource_table = {'module': {}, 'tools': {}}
+ k_release, _ = exec_cmd("uname -r", False, False, False)
+ r_release, _ = exec_cmd("cat /etc/redhat-release", False, False,
False)
+ slave_desc["kernel_release"] = k_release.strip()
+ slave_desc["redhat_release"] = r_release.strip()
+ slave_desc["lnst_version"] = self._slave_config.version
- self._bkp_nm_opt_val = lnst_config.get_option("environment",
"use_nm")
+ return ("hello", slave_desc)
- def hello(self, recipe_path):
+ def set_recipe(self, recipe_name):
self.machine_cleanup()
self.restore_nm_option()
- logging.info("Recieved a controller connection.")
- self.clear_resource_table()
self._cache.del_old_entries()
self.reset_file_transfers()
- self._if_manager.rescan_devices()
-
date = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
- self._log_ctl.set_recipe(recipe_path, expand=date)
+ self._log_ctl.set_recipe(recipe_name, expand=date)
sleep(1)
- slave_desc = {}
if check_process_running("NetworkManager"):
logging.warning("=============================================")
logging.warning("NetworkManager is running on a slave machine!")
- if lnst_config.get_option("environment", "use_nm"):
+ if self._slave_config.get_option("environment",
"use_nm"):
logging.warning("Support of NM is still experimental!")
else:
logging.warning("Usage of NM is disabled!")
logging.warning("=============================================")
- slave_desc["nm_running"] = True
- else:
- slave_desc["nm_running"] = False
- k_release, _ = exec_cmd("uname -r", False, False, False)
- r_release, _ = exec_cmd("cat /etc/redhat-release", False, False,
False)
- slave_desc["kernel_release"] = k_release.strip()
- slave_desc["redhat_release"] = r_release.strip()
- slave_desc["lnst_version"] = lnst_config.version
-
- return ("hello", slave_desc)
+ return True
def bye(self):
self.restore_system_config()
- self.clear_resource_table()
self._cache.del_old_entries()
self.reset_file_transfers()
self._remove_capture_files()
return "bye"
- def kill_cmds(self):
- logging.info("Killing all forked processes.")
- self._command_context.cleanup()
- return "Commands killed"
+ def map_device_class(self, cls_name, module_name):
+ if cls_name in self._dynamic_classes:
+ return
- def map_if_by_hwaddr(self, if_id, hwaddr):
- devices = self.map_if_by_params(if_id, {'hwaddr' : hwaddr})
+ module = self._dynamic_modules[module_name]
+ cls = getattr(module, cls_name)
- return devices
+ self._dynamic_classes[cls_name] = cls
- def map_if_by_params(self, if_id, params):
- devices = self.get_devices_by_params(params)
+ setattr(Devices, cls_name, cls)
- if len(devices) == 1:
- dev = self._if_manager.get_device_by_params(params)
- self._if_manager.map_if(if_id, dev.get_if_index())
+ def load_cached_module(self, module_name, res_hash):
+ self._cache.renew_entry(res_hash)
+ if module_name in self._dynamic_modules:
+ return
+ module_path = self._cache.get_path(res_hash)
+ module = imp.load_source(module_name, module_path)
+ self._dynamic_modules[module_name] = module
- return devices
+ def init_if_manager(self):
+ self._if_manager = InterfaceManager(self._server_handler)
+ for cls_name in dir(Devices):
+ cls = getattr(Devices, cls_name)
+ if isclass(cls):
+ self._if_manager.add_device_class(cls_name, cls)
- def unmap_if(self, if_id):
- self._if_manager.unmap_if(if_id)
+ self._if_manager.rescan_devices()
+ self._server_handler.set_if_manager(self._if_manager)
+ self._server_handler.add_connection('netlink',
+ self._if_manager.get_nl_socket())
return True
+ def dev_method(self, if_index, name, args, kwargs):
+ dev = self._if_manager.get_device(if_index)
+ method = getattr(dev, name)
+
+ return method(*args, **kwargs)
+
+ def dev_attr(self, if_index, name):
+ dev = self._if_manager.get_device(if_index)
+ return getattr(dev, name)
+
def get_devices(self):
self._if_manager.rescan_devices()
devices = self._if_manager.get_devices()
result = {}
for device in devices:
- result[device._if_index] = device.get_if_data()
+ result[device.if_index] = device._get_if_data()
return result
def get_device(self, if_index):
self._if_manager.rescan_devices()
device = self._if_manager.get_device(if_index)
if device:
- return device.get_if_data()
+ return device._get_if_data()
else:
return None
@@ -156,7 +190,6 @@ class SlaveMethods:
for entry in name_scan:
if entry["name"] == devname:
netdevs.append(entry)
-
return netdevs
def get_devices_by_hwaddr(self, hwaddr):
@@ -167,14 +200,13 @@ class SlaveMethods:
entry = {"name": dev.get_name(),
"hwaddr": dev.get_hwaddr()}
matched.append(entry)
-
return matched
def get_devices_by_params(self, params):
devices = self._if_manager.get_devices()
matched = []
for dev in devices:
- dev_data = dev.get_if_data()
+ dev_data = dev._get_if_data()
entry = {"name": dev.get_name(),
"hwaddr": dev.get_hwaddr()}
for key, value in params.iteritems():
@@ -184,180 +216,121 @@ class SlaveMethods:
if entry is not None:
matched.append(entry)
-
return matched
- def get_if_data(self, if_id):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is None:
- return None
- return dev.get_if_data()
-
- def link_stats(self, if_id):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is None:
- logging.error("Device with id '%s' not found." % if_id)
- return None
- return dev.link_stats()
-
- def set_addresses(self, if_id, ips):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is None:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- dev.set_addresses(ips)
- return True
-
- def add_route(self, if_id, dest):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is None:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- dev.add_route(dest)
- return True
-
- def del_route(self, if_id, dest):
- dev = self._if_manager.get_mapped_device(if_id)
+ def destroy_devices(self):
+ devices = self._if_manager.get_devices()
+ for dev in devices:
+ try:
+ dev.destroy()
+ except DeviceDeleted:
+ pass
+ self._if_manager.rescan_devices()
+
+ # def add_route(self, if_id, dest):
+ # dev = self._if_manager.get_mapped_device(if_id)
+ # if dev is None:
+ # logging.error("Device with id '%s' not found." % if_id)
+ # return False
+ # dev.add_route(dest)
+ # return True
+
+ # def del_route(self, if_id, dest):
+ # dev = self._if_manager.get_mapped_device(if_id)
+ # if dev is None:
+ # logging.error("Device with id '%s' not found." % if_id)
+ # return False
+ # dev.del_route(dest)
+ # return True
+
+ def create_device(self, clsname, args=[], kwargs={}):
+ dev = self._if_manager.create_device(clsname, args, kwargs)
if dev is None:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- dev.del_route(dest)
- return True
-
- def set_device_up(self, if_id):
- dev = self._if_manager.get_mapped_device(if_id)
- dev.up()
- return True
-
- def set_device_down(self, if_id):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is not None:
- dev.down()
- else:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- return True
-
- def set_link_up(self, if_id):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is not None:
- dev.link_up()
- else:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- return True
-
- def set_link_down(self, if_id):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is not None:
- dev.link_down()
- else:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- return True
-
- def set_unmapped_device_down(self, hwaddr):
- dev = self._if_manager.get_device_by_hwaddr(hwaddr)
- if dev is not None:
- dev.down()
- else:
- logging.warning("Device with hwaddr '%s' not found." %
hwaddr)
- return True
-
- def configure_interface(self, if_id, config):
- device = self._if_manager.get_mapped_device(if_id)
- device.set_configuration(config)
- device.configure()
- return True
-
- def create_soft_interface(self, if_id, config):
- dev_name = self._if_manager.create_device_from_config(if_id, config)
- dev = self._if_manager.get_mapped_device(if_id)
- dev.configure()
- return dev_name
-
- def create_if_pair(self, if_id1, config1, if_id2, config2):
- dev_names = self._if_manager.create_device_pair(if_id1, config1,
- if_id2, config2)
- dev1 = self._if_manager.get_mapped_device(if_id1)
- dev2 = self._if_manager.get_mapped_device(if_id2)
-
- while dev1.get_if_index() == None and dev2.get_if_index() == None:
- msgs = self._server_handler.get_messages_from_con('netlink')
- for msg in msgs:
- self._if_manager.handle_netlink_msgs(msg[1]["data"])
-
- if config1["netns"] != None:
- hwaddr = dev1.get_hwaddr()
- self.set_if_netns(if_id1, config1["netns"])
-
- msg = {"type": "command", "method_name":
"configure_interface",
- "args": [if_id1, config1]}
- self._server_handler.send_data_to_netns(config1["netns"], msg)
- result = self._slave_server.wait_for_result(config1["netns"])
- if result["result"] != True:
- raise Exception("Configuration failed.")
- else:
- dev1.configure()
- if config2["netns"] != None:
- hwaddr = dev2.get_hwaddr()
- self.set_if_netns(if_id2, config2["netns"])
-
- msg = {"type": "command", "method_name":
"configure_interface",
- "args": [if_id2, config2]}
- self._server_handler.send_data_to_netns(config2["netns"], msg)
- result = self._slave_server.wait_for_result(config2["netns"])
- if result["result"] != True:
- raise Exception("Configuration failed.")
- else:
- dev2.configure()
- return dev_names
-
- def deconfigure_if_pair(self, if_id1, if_id2):
- dev1 = self._if_manager.get_mapped_device(if_id1)
- dev2 = self._if_manager.get_mapped_device(if_id2)
-
- if dev1.get_netns() == None:
- dev1.deconfigure()
- else:
- netns = dev1.get_netns()
-
- msg = {"type": "command", "method_name":
"deconfigure_interface",
- "args": [if_id1]}
- self._server_handler.send_data_to_netns(netns, msg)
- result = self._slave_server.wait_for_result(netns)
- if result["result"] != True:
- raise Exception("Deconfiguration failed.")
-
- self.return_if_netns(if_id1)
-
- if dev2.get_netns() == None:
- dev2.deconfigure()
- else:
- netns = dev2.get_netns()
-
- msg = {"type": "command", "method_name":
"deconfigure_interface",
- "args": [if_id2]}
- self._server_handler.send_data_to_netns(netns, msg)
- result = self._slave_server.wait_for_result(netns)
- if result["result"] != True:
- raise Exception("Deconfiguration failed.")
-
- self.return_if_netns(if_id2)
-
- dev1.destroy()
- dev2.destroy()
- dev1.del_configuration()
- dev2.del_configuration()
- return True
-
- def deconfigure_interface(self, if_id):
- device = self._if_manager.get_mapped_device(if_id)
- if device is not None:
- device.clear_configuration()
- else:
- logging.error("No device with id '%s' to deconfigure." %
if_id)
- return True
+ raise Exception("Device creation failed")
+ return {"if_index": dev.if_index, "name": dev.name}
+
+ # def create_if_pair(self, if_id1, config1, if_id2, config2):
+ # dev_names = self._if_manager.create_device_pair(if_id1, config1,
+ # if_id2, config2)
+ # dev1 = self._if_manager.get_mapped_device(if_id1)
+ # dev2 = self._if_manager.get_mapped_device(if_id2)
+
+ # while dev1.get_if_index() == None and dev2.get_if_index() == None:
+ # msgs = self._server_handler.get_messages_from_con('netlink')
+ # for msg in msgs:
+ # self._if_manager.handle_netlink_msgs(msg[1]["data"])
+
+ # if config1["netns"] != None:
+ # hwaddr = dev1.get_hwaddr()
+ # self.set_if_netns(if_id1, config1["netns"])
+
+ # msg = {"type": "command", "method_name":
"configure_interface",
+ # "args": [if_id1, config1]}
+ # self._server_handler.send_data_to_netns(config1["netns"], msg)
+ # result = self._slave_server.wait_for_result(config1["netns"])
+ # if result["result"] != True:
+ # raise Exception("Configuration failed.")
+ # else:
+ # dev1.configure()
+ # if config2["netns"] != None:
+ # hwaddr = dev2.get_hwaddr()
+ # self.set_if_netns(if_id2, config2["netns"])
+
+ # msg = {"type": "command", "method_name":
"configure_interface",
+ # "args": [if_id2, config2]}
+ # self._server_handler.send_data_to_netns(config2["netns"], msg)
+ # result = self._slave_server.wait_for_result(config2["netns"])
+ # if result["result"] != True:
+ # raise Exception("Configuration failed.")
+ # else:
+ # dev2.configure()
+ # return dev_names
+
+ # def deconfigure_if_pair(self, if_id1, if_id2):
+ # dev1 = self._if_manager.get_mapped_device(if_id1)
+ # dev2 = self._if_manager.get_mapped_device(if_id2)
+
+ # if dev1.get_netns() == None:
+ # dev1.deconfigure()
+ # else:
+ # netns = dev1.get_netns()
+
+ # msg = {"type": "command", "method_name":
"deconfigure_interface",
+ # "args": [if_id1]}
+ # self._server_handler.send_data_to_netns(netns, msg)
+ # result = self._slave_server.wait_for_result(netns)
+ # if result["result"] != True:
+ # raise Exception("Deconfiguration failed.")
+
+ # self.return_if_netns(if_id1)
+
+ # if dev2.get_netns() == None:
+ # dev2.deconfigure()
+ # else:
+ # netns = dev2.get_netns()
+
+ # msg = {"type": "command", "method_name":
"deconfigure_interface",
+ # "args": [if_id2]}
+ # self._server_handler.send_data_to_netns(netns, msg)
+ # result = self._slave_server.wait_for_result(netns)
+ # if result["result"] != True:
+ # raise Exception("Deconfiguration failed.")
+
+ # self.return_if_netns(if_id2)
+
+ # dev1.destroy()
+ # dev2.destroy()
+ # dev1.del_configuration()
+ # dev2.del_configuration()
+ # return True
+
+ # def deconfigure_interface(self, if_id):
+ # device = self._if_manager.get_mapped_device(if_id)
+ # if device is not None:
+ # device.clear_configuration()
+ # else:
+ # logging.error("No device with id '%s' to deconfigure." %
if_id)
+ # return True
def start_packet_capture(self, filt):
if not is_installed("tcpdump"):
@@ -448,103 +421,67 @@ class SlaveMethods:
return int(remaining)
- def run_command(self, command):
- cmd = NetTestCommand(self._command_context, command,
- self._resource_table, self._log_ctl)
+ def run_job(self, job):
+ job_instance = Job(job, self._log_ctl)
+ self._job_context.add_job(job_instance)
- if self._command_context.get_cmd(cmd.get_id()) != None:
- prev_cmd = self._command_context.get_cmd(cmd.get_id())
- if not prev_cmd.get_result_sent():
- if cmd.get_id() is None:
- raise Exception("Previous foreground command still "\
- "running!")
- else:
- raise Exception("Different command with id '%s' "\
- "still running!" % cmd.get_id())
- else:
- self._command_context.del_cmd(cmd)
- self._command_context.add_cmd(cmd)
+ res = job_instance.run()
- res = cmd.run()
- if not cmd.forked():
- self._command_context.del_cmd(cmd)
+ return res
- if command["type"] == "config":
- if res["passed"]:
-
self._update_system_config(res["res_data"]["options"],
- command["persistent"])
- else:
- err = "Error occured while setting system "\
- "configuration (%s)" %
res["res_data"]["err_msg"]
- logging.error(err)
+ def kill_job(self, job_id, signal):
+ job = self._job_context.get_job(job_id)
- return res
+ if job is None:
+ logging.error("No job %s found" % job_id)
+ return False
- def kill_command(self, id):
- cmd = self._command_context.get_cmd(id)
- if cmd is not None:
- if not cmd.get_result_sent():
- cmd.kill(None)
- result = cmd.get_result()
- cmd.set_result_sent()
- return result
- else:
- pass
- else:
- raise Exception("No command with id '%s'." % id)
+ return job.kill(signal)
+
+ def kill_jobs(self):
+ logging.info("Killing all forked processes.")
+ self._job_context.cleanup()
+ return "Commands killed"
def machine_cleanup(self):
logging.info("Performing machine cleanup.")
- self._command_context.cleanup()
+ self._job_context.cleanup()
self.restore_system_config()
- devs = self._if_manager.get_mapped_devices()
- for if_id, dev in devs.iteritems():
- peer = dev.get_peer()
- if peer == None:
- dev.clear_configuration()
- else:
- peer_if_index = peer.get_if_index()
- peer_id = self._if_manager.get_id_by_if_index(peer_if_index)
- self.deconfigure_if_pair(if_id, peer_id)
-
- self._if_manager.deconfigure_all()
+ if self._if_manager is not None:
+ self._if_manager.deconfigure_all()
for netns in self._net_namespaces.keys():
self.del_namespace(netns)
self._net_namespaces = {}
- self._if_manager.clear_if_mapping()
+ for cls_name, cls in self._dynamic_classes.items():
+ delattr(Devices, cls_name)
+
+ for module_name, module in self._dynamic_modules.items():
+ del sys.modules[module_name]
+
+ self._dynamic_classes = {}
+ self._dynamic_modules = {}
+ self._if_manager = None
+ self._server_handler.set_if_manager(None)
self._cache.del_old_entries()
self._remove_capture_files()
return True
- def clear_resource_table(self):
- self._resource_table = {'module': {}, 'tools': {}}
- return True
-
def has_resource(self, res_hash):
if self._cache.query(res_hash):
return True
return False
- def map_resource(self, res_hash, res_type, res_name):
- resource_location = self._cache.get_path(res_hash)
-
- if not res_type in self._resource_table:
- self._resource_table[res_type] = {}
-
- self._resource_table[res_type][res_name] = resource_location
- self._cache.renew_entry(res_hash)
-
- return True
-
- def add_resource_to_cache(self, file_hash, local_path, name,
- res_hash, res_type):
- self._cache.add_cache_entry(file_hash, local_path, name, res_type)
- return True
+ def add_resource_to_cache(self, res_type, local_path, name):
+ if res_type == "file":
+ self._cache.add_file_entry(local_path, name)
+ return True
+ else:
+ raise Exception("Unknown resource type")
def start_copy_to(self, filepath=None):
if filepath in self._copy_targets:
@@ -559,9 +496,9 @@ class SlaveMethods:
return filepath
- def copy_part_to(self, filepath, binary_data):
+ def copy_part_to(self, filepath, data):
if self._copy_targets[filepath]:
- self._copy_targets[filepath].write(binary_data.data)
+ self._copy_targets[filepath].write(data)
return True
return False
@@ -583,7 +520,7 @@ class SlaveMethods:
return True
def copy_part_from(self, filepath, buffsize):
- data = Binary(self._copy_sources[filepath].read(buffsize))
+ data = self._copy_sources[filepath].read(buffsize)
return data
def finish_copy_from(self, filepath):
@@ -607,26 +544,27 @@ class SlaveMethods:
logging.warning("====================================================")
logging.warning("Enabling use of NetworkManager on controller
request")
logging.warning("====================================================")
- val = lnst_config.get_option("environment", "use_nm")
- lnst_config.set_option("environment", "use_nm", True)
+ val = self._slave_config.get_option("environment", "use_nm")
+ self._slave_config.set_option("environment", "use_nm", True)
return val
def disable_nm(self):
logging.warning("=====================================================")
logging.warning("Disabling use of NetworkManager on controller
request")
logging.warning("=====================================================")
- val = lnst_config.get_option("environment", "use_nm")
- lnst_config.set_option("environment", "use_nm", False)
+ val = self._slave_config.get_option("environment", "use_nm")
+ self._slave_config.set_option("environment", "use_nm",
False)
return val
def restore_nm_option(self):
- val = lnst_config.get_option("environment", "use_nm")
+ val = self._slave_config.get_option("environment", "use_nm")
if val == self._bkp_nm_opt_val:
return val
logging.warning("=========================================")
logging.warning("Restoring use_nm option to original value")
logging.warning("=========================================")
- lnst_config.set_option("environment", "use_nm",
self._bkp_nm_opt_val)
+ self._slave_config.set_option("environment", "use_nm",
+ self._bkp_nm_opt_val)
return val
def add_namespace(self, netns):
@@ -724,40 +662,40 @@ class SlaveMethods:
del self._net_namespaces[netns]
return True
- def set_if_netns(self, if_id, netns):
- netns_pid = self._net_namespaces[netns]["pid"]
-
- device = self._if_manager.get_mapped_device(if_id)
- dev_name = device.get_name()
- device.set_netns(netns)
- hwaddr = device.get_hwaddr()
-
- exec_cmd("ip link set %s netns %d" % (dev_name, netns_pid))
- msg = {"type": "command", "method_name":
"map_if_by_hwaddr",
- "args": [if_id, hwaddr]}
- self._server_handler.send_data_to_netns(netns, msg)
- result = self._slave_server.wait_for_result(netns)
- return result
-
- def return_if_netns(self, if_id):
- device = self._if_manager.get_mapped_device(if_id)
- if device.get_netns() == None:
- dev_name = device.get_name()
- ppid = os.getppid()
- exec_cmd("ip link set %s netns %d" % (dev_name, ppid))
- self._if_manager.unmap_if(if_id)
- return True
- else:
- netns = device.get_netns()
- msg = {"type": "command", "method_name":
"return_if_netns",
- "args": [if_id]}
- self._server_handler.send_data_to_netns(netns, msg)
- result = self._slave_server.wait_for_result(netns)
- if result["result"] != True:
- raise Exception("Return from netns failed.")
-
- device.set_netns(None)
- return True
+ # def set_if_netns(self, if_id, netns):
+ # netns_pid = self._net_namespaces[netns]["pid"]
+
+ # device = self._if_manager.get_mapped_device(if_id)
+ # dev_name = device.get_name()
+ # device.set_netns(netns)
+ # hwaddr = device.get_hwaddr()
+
+ # exec_cmd("ip link set %s netns %d" % (dev_name, netns_pid))
+ # msg = {"type": "command", "method_name":
"map_if_by_hwaddr",
+ # "args": [if_id, hwaddr]}
+ # self._server_handler.send_data_to_netns(netns, msg)
+ # result = self._slave_server.wait_for_result(netns)
+ # return result
+
+ # def return_if_netns(self, if_id):
+ # device = self._if_manager.get_mapped_device(if_id)
+ # if device.get_netns() == None:
+ # dev_name = device.get_name()
+ # ppid = os.getppid()
+ # exec_cmd("ip link set %s netns %d" % (dev_name, ppid))
+ # self._if_manager.unmap_if(if_id)
+ # return True
+ # else:
+ # netns = device.get_netns()
+ # msg = {"type": "command", "method_name":
"return_if_netns",
+ # "args": [if_id]}
+ # self._server_handler.send_data_to_netns(netns, msg)
+ # result = self._slave_server.wait_for_result(netns)
+ # if result["result"] != True:
+ # raise Exception("Return from netns failed.")
+
+ # device.set_netns(None)
+ # return True
def add_br_vlan(self, if_id, br_vlan_info):
dev = self._if_manager.get_mapped_device(if_id)
@@ -847,48 +785,8 @@ class SlaveMethods:
brt.set_state(br_state_info)
return True
- def set_speed(self, if_id, speed):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is not None:
- dev.set_speed(speed)
- else:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- return True
-
- def set_autoneg(self, if_id):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is not None:
- dev.set_autoneg()
- else:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- return True
-
- def wait_interface_init(self):
- self._if_manager.wait_interface_init()
- return True
-
- def slave_add(self, if_id, slave_id):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is not None:
- dev.slave_add(slave_id)
- else:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- return True
-
- def slave_del(self, if_id, slave_id):
- dev = self._if_manager.get_mapped_device(if_id)
- if dev is not None:
- dev.slave_del(slave_id)
- else:
- logging.error("Device with id '%s' not found." % if_id)
- return False
- return True
-
class ServerHandler(ConnectionHandler):
- def __init__(self, addr):
+ def __init__(self, addr, slave_config):
super(ServerHandler, self).__init__()
self._netns_con_mapping = {}
try:
@@ -902,13 +800,34 @@ class ServerHandler(ConnectionHandler):
self._netns = None
self._c_socket = None
+ self._c_dev = None
self._if_manager = None
- self._security = lnst_config.get_section_values("security")
+ self._security = slave_config.get_section_values("security")
def set_if_manager(self, if_manager):
self._if_manager = if_manager
+ self._update_c_dev()
+
+ def _update_c_dev(self):
+ if self._c_dev:
+ self._c_dev.enable()
+ self._c_dev = None
+
+ if self._if_manager is not None:
+ ctl_socket = self.get_ctl_sock()
+ ctl_addr = ctl_socket._socket.getsockname()[0]
+ matched_dev = None
+ for dev in self._if_manager.get_devices():
+ for ip in dev.ips:
+ if ip.addr == ctl_addr:
+ matched_dev = dev
+ break
+ if matched_dev:
+ break
+ self._c_dev = matched_dev
+ matched_dev.disable()
def accept_connection(self):
self._c_socket, addr = self._s_socket.accept()
@@ -932,9 +851,9 @@ class ServerHandler(ConnectionHandler):
def set_ctl_sock(self, sock):
if self._c_socket != None:
- self._c_socket.close()
- self._c_socket = None
+ self.close_c_sock()
self._c_socket = sock
+ self._update_c_dev()
self.add_connection(self._c_socket[1], self._c_socket[0])
def close_s_sock(self):
@@ -946,9 +865,14 @@ class ServerHandler(ConnectionHandler):
self.remove_connection(self._c_socket[0])
self._c_socket = None
+ if self._c_dev:
+ self._c_dev.enable()
+ self._c_dev = None
+
def check_connections(self):
msgs = super(ServerHandler, self).check_connections()
- if 'netlink' not in self._connection_mapping:
+ if 'netlink' not in self._connection_mapping and\
+ self._if_manager is not None:
self._if_manager.reconnect_netlink()
self.add_connection('netlink', self._if_manager.get_nl_socket())
return msgs
@@ -970,7 +894,7 @@ class ServerHandler(ConnectionHandler):
addr = self._c_socket[1]
if self.get_connection(addr) == None:
logging.info("Lost controller connection.")
- self._c_socket = None
+ self.close_c_sock()
return messages
def get_messages_from_con(self, con_id):
@@ -1026,23 +950,72 @@ class ServerHandler(ConnectionHandler):
self._connections.remove(con)
self._netns_con_mapping = {}
+
+def device_to_deviceref(obj):
+ try:
+ Device = Devices.Device
+ except:
+ return obj
+
+ if isinstance(obj, Device):
+ dev_ref = DeviceRef(obj.if_index)
+ return dev_ref
+ elif isinstance(obj, dict):
+ new_dict = {}
+ for key, value in obj.items():
+ new_dict[key] = device_to_deviceref(value)
+ return new_dict
+ elif isinstance(obj, list):
+ new_list = []
+ for value in obj:
+ new_list.append(device_to_deviceref(value))
+ return new_list
+ elif isinstance(obj, tuple):
+ new_list = []
+ for value in obj:
+ new_list.append(device_to_deviceref(value))
+ return tuple(new_list)
+ else:
+ return obj
+
+def deviceref_to_device(if_manager, obj):
+ if isinstance(obj, DeviceRef):
+ dev = if_manager.get_device(obj.if_index)
+ return dev
+ elif isinstance(obj, dict):
+ new_dict = {}
+ for key, value in obj.items():
+ new_dict[key] = deviceref_to_device(if_manager, value)
+ return new_dict
+ elif isinstance(obj, list):
+ new_list = []
+ for value in obj:
+ new_list.append(deviceref_to_device(if_manager, value))
+ return new_list
+ elif isinstance(obj, tuple):
+ new_list = []
+ for value in obj:
+ new_list.append(deviceref_to_device(if_manager, value))
+ return tuple(new_list)
+ else:
+ return obj
+
class NetTestSlave:
- def __init__(self, log_ctl):
+ def __init__(self, log_ctl, slave_config):
+ self._slave_config = slave_config
die_when_parent_die()
- self._cmd_context = NetTestCommandContext()
- port = lnst_config.get_option("environment", "rpcport")
+ self._job_context = JobContext()
+ port = slave_config.get_option("environment", "rpcport")
logging.info("Using RPC port %d." % port)
- self._server_handler = ServerHandler(("", port))
- self._if_manager = InterfaceManager(self._server_handler)
-
- self._server_handler.set_if_manager(self._if_manager)
+ self._server_handler = ServerHandler(("", port), slave_config)
self._net_namespaces = {}
- self._methods = SlaveMethods(self._cmd_context, log_ctl,
- self._if_manager, self._net_namespaces,
- self._server_handler, self)
+ self._methods = SlaveMethods(self._job_context, log_ctl,
+ self._net_namespaces,
+ self._server_handler, slave_config,
+ self)
self.register_die_signal(signal.SIGHUP)
self.register_die_signal(signal.SIGINT)
@@ -1052,9 +1025,6 @@ class NetTestSlave:
self._log_ctl = log_ctl
- self._server_handler.add_connection('netlink',
- self._if_manager.get_nl_socket())
-
def run(self):
while not self._finished:
if self._server_handler.get_ctl_sock() == None:
@@ -1063,9 +1033,9 @@ class NetTestSlave:
logging.info("Waiting for connection.")
self._server_handler.accept_connection()
except (socket.error, SecSocketException):
+ log_exc_traceback()
continue
- self._log_ctl.set_connection(
- self._server_handler.get_ctl_sock())
+ self._log_ctl.set_connection(self._server_handler.get_ctl_sock())
msgs = self._server_handler.get_messages()
@@ -1092,24 +1062,29 @@ class NetTestSlave:
if msg["type"] == "command":
method = getattr(self._methods, msg["method_name"], None)
if method != None:
+ if_manager = self._methods._if_manager
+ if if_manager is not None:
+ args = deviceref_to_device(if_manager, msg["args"])
+ kwargs = deviceref_to_device(if_manager, msg["kwargs"])
+ else:
+ args = msg["args"]
+ kwargs = msg["kwargs"]
+
try:
- result = method(*msg["args"])
- except:
+ result = method(*args, **kwargs)
+ except LnstError as e:
log_exc_traceback()
- type, value, tb = sys.exc_info()
- exc_trace = ''.join(traceback.format_exception(type,
- value, tb))
- response = {"type": "exception",
"Exception": value}
+ response = {"type": "exception",
"Exception": e}
self._server_handler.send_data_to_ctl(response)
return
- if result != None:
- response = {"type": "result", "result":
result}
- self._server_handler.send_data_to_ctl(response)
+ response = {"type": "result", "result":
result}
+ response = device_to_deviceref(response)
+ self._server_handler.send_data_to_ctl(response)
else:
- err = "Method '%s' not supported." %
msg["method_name"]
- response = {"type": "error", "err": err}
+ err = LnstError("Method '%s' not supported." %
msg["method_name"])
+ response = {"type": "exception",
"Exception": err}
self._server_handler.send_data_to_ctl(response)
elif msg["type"] == "log":
logger = logging.getLogger()
@@ -1122,48 +1097,36 @@ class NetTestSlave:
else:
logging.debug("Recieved an exception from foreground command")
logging.debug(msg["Exception"])
- cmd = self._cmd_context.get_cmd(msg["cmd_id"])
- cmd.join()
- self._cmd_context.del_cmd(cmd)
+ job = self._job_context.get_cmd(msg["job_id"])
+ job.join()
+ self._job_context.del_cmd(job)
+ self._server_handler.send_data_to_ctl(msg)
+ elif msg["type"] == "job_finished":
+ job = self._job_context.get_job(msg["job_id"])
+ job.join()
+
+ job.set_finished(msg["result"])
self._server_handler.send_data_to_ctl(msg)
- elif msg["type"] == "result":
- if msg["cmd_id"] == None:
- del msg["cmd_id"]
- self._server_handler.send_data_to_ctl(msg)
- cmd = self._cmd_context.get_cmd(None)
- cmd.join()
- cmd.set_result_sent()
- else:
- cmd = self._cmd_context.get_cmd(msg["cmd_id"])
- cmd.join()
- del msg["cmd_id"]
-
- cmd.set_result(msg["result"])
- if cmd.finished():
- msg["result"] = cmd.get_result()
- self._server_handler.send_data_to_ctl(msg)
- cmd.set_result_sent()
elif msg["type"] == "netlink":
- self._if_manager.handle_netlink_msgs(msg["data"])
+ if_manager = self._methods._if_manager
+ if if_manager is not None:
+ if_manager.handle_netlink_msgs(msg["data"])
elif msg["type"] == "from_netns":
self._server_handler.send_data_to_ctl(msg["data"])
elif msg["type"] == "to_netns":
netns = msg["netns"]
try:
self._server_handler.send_data_to_netns(netns, msg["data"])
- except:
+ except LnstError as e:
log_exc_traceback()
- type, value, tb = sys.exc_info()
- exc_trace = ''.join(traceback.format_exception(type,
- value, tb))
- response = {"type": "exception",
"Exception": value}
+ response = {"type": "exception",
"Exception": e}
self._server_handler.send_data_to_ctl(response)
return
else:
raise Exception("Recieved unknown command")
- pipes = self._cmd_context.get_read_pipes()
+ pipes = self._job_context.get_parent_pipes()
self._server_handler.update_connections(pipes)
def register_die_signal(self, signum):
--
2.13.0