Hi all,
for the past couple of weeks I've been going over the meeting recordings
we've had wrt the new Python API of LNST. I've been collecting
everything into a single file that I'm appending to this email. I'm
sending it here so that everyone can join the discussion before the
implementation itself begins. I'll warn you though... it's LONG :)
!!!NOTE it's not complete yet; I'm sending it now because we have an
upstream meeting planned for later today. In particular, the Device/Interface
API is not complete.
The structure of the file is as follows:
1. commented pseudo code of what Test Modules will look like - they'll be
instantiated on the Controller and sent ad hoc to the Slave where
they'll be executed --> no more synchronization on test start...
2. commented pseudo code of what Tasks will look like; they'll define
both the network requirements and the test execution.
3. a short, rough idea of how the tests/recipes will be executed.
4. 1st version of the API "specification"/documentation. Here I tried to
go through the *API objects we currently have and make them more
"Pythonic", thinking about how they'll be used from a Task. I tried writing
it as class-method-attribute definitions with some documentation, so
hopefully it makes some sense... Like I've said before, the
Device/Interface APIs are not complete so there's a lot missing there.
Please take a look and provide feedback. I'm sure there are other parts
besides the Device/Interface APIs that are missing something, so I'll
appreciate any help :).
================================================================================
new_api file:
1. test modules
class BaseTestModule:
    def __init__(self, **kwargs):
        #by default loads the params into self.params - no checks, pseudocode:
        self.params = Parameters() #placeholder name for the container object
        for name in dir(self):
            param_class = getattr(self, name)
            if not isinstance(param_class, BaseType):
                continue
            try:
                val = kwargs.pop(name)
            except KeyError:
                if param_class.is_mandatory():
                    raise TestModuleError("Option %s is mandatory" % name)
                continue
            setattr(self.params, name, param_class.construct(val))

        for name in kwargs:
            log.error("Undefined parameter %s" % name)
        if len(kwargs):
            raise TestModuleError("Undefined TestModule parameters")

    def run(self):
        #needs to be overridden - raise an exception to notify the test developer
        raise NotImplementedError("run() must be overridden by the test module")
class MyTest(BaseTestModule):
    param = ParamType()
    param2 = ParamType2()
    param3 = Multiparam(ParamType())

    #optional __init__
    #def __init__(self, **kwargs):
    #    super(MyTest, self).__init__(**kwargs)
    #    #additional tester defined checks

    def run(self):
        #do my test
        #parameters available in self.params
        pass
#in Task:
import lnst
#module lnst.modules will dynamically look for module classes in configured
#locations, similar to how we do it now
ping = lnst.modules.Ping(dst=m2.if1.ip[0], count=100, interval=0.1)
m1.run(ping)
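
To make this a bit more concrete, a hypothetical Ping module following the
pattern above might look like the sketch below. The parameter type names
(IpParam, IntParam, FloatParam) and the set_result() helper are placeholders
for illustration, not an agreed-on API:

import subprocess

class Ping(BaseTestModule):
    dst = IpParam(mandatory=True)   #hypothetical parameter type classes
    count = IntParam(default=10)
    interval = FloatParam(default=1.0)

    def run(self):
        #executed on the Slave after the instance is sent over from the
        #Controller; parameters are available in self.params
        cmd = "ping -c %d -i %f %s" % (self.params.count,
                                       self.params.interval,
                                       self.params.dst)
        rc = subprocess.call(cmd, shell=True)
        self.set_result(rc == 0)    #hypothetical result reporting helper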
================================================
2. Tasks:
class BaseTask(object):
    def __init__(self):
        #initialize instance specific requirements from the class-wide ones
        self.requirements = Requirements()
        for name in dir(self):
            val = getattr(self, name)
            #only copy the class-wide selector definitions
            if isinstance(val, HostSel):
                setattr(self.requirements, name, val)

    def test(self):
        raise Exception("Method test MUST be defined.")
class MyTask(lnst.BaseTask):
    #class-wide definition of requirements
    m1 = HostSel(param="val", ...)
    m1.if1 = IfaceSel(l2net="xyz", param="val", ...)
    m2 = HostSel(param="val", ...)
    m2.if1 = IfaceSel(l2net="xyz", param="val", ...)

    def __init__(self, **kwargs):
        super(MyTask, self).__init__()
        #do something with kwargs
        #adjust instance specific requirements
        self.requirements.m3 = HostSel(...)

    def test(self):
        self.matched.m1.run(Module)
        self.matched.m1.run("command")

    #or
    def test(self, m1, m2):
        m1.run(Module)
        m2.run("command")
================================================
3. Running Tasks:
from MyTasks import MyTask
import lnst
task_instance = MyTask(params)
lnst(args)
lnst.run(task_instance)
OR
lnst-ctl -d run MyTask.py -- task_params
# looks for the NAME class in the NAME.py file (MyTask in this case), for
# which the condition "isinstance(NAME, BaseTask)" must be True
# could also run all classes in the file where "isinstance(x, BaseTask)" is
# True, with the option to restrict to a specific task class (or just run the
# first one?)... lnst-ctl rewritten to do the same as manually running the
# task from its own python script
First do the second option - easier since we have this already, then refactor
the controller to create the lnst controller for the first option.
Aliases lose their meaning - they become parameters passed to the MyTask
__init__; when using the lnst-ctl CLI, use "-- task_params"?? This might not
work for multiple tasks.
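
For illustration, the class lookup for the lnst-ctl case could be a small
inspect-based helper along the lines of the sketch below. The helper name and
the use of importlib are my assumptions, and note that for classes the check
would be issubclass() rather than isinstance():

import importlib.util
import inspect
import lnst

def find_task_classes(path):
    #load the given NAME.py file and return every class defined in it that
    #subclasses BaseTask; the caller can then pick one by name or run them all
    spec = importlib.util.spec_from_file_location("task_file", path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return [cls for _, cls in inspect.getmembers(mod, inspect.isclass)
            if issubclass(cls, lnst.BaseTask) and cls is not lnst.BaseTask]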
================================================
4. Tester facing API, inside the test() method:
Host objects available in self.matched.selector_name:
class Host: #HostAPI??? name can change
    #attributes:
    # dynamically filled object of Host attributes such as architecture and
    # so on. Use example in test() would look like this:
    #   if host.params.arch == "x86":
    # I separated this into the "params" object so I can overwrite its
    # __getattr__ method and return None/UnknownParam exception for unknown
    # parameters, and to avoid name conflicts with other attributes
    params = object()

    # dynamically filled object of NetDevice objects accessible directly as the
    # object attributes:
    #   host.ifaces.eth0.set_ip(...)
    # I separated this into the "ifaces" object to avoid name conflicts with
    # other attributes
    # creation of new NetDevices should be possible through simple assignment:
    #   m1.devs.new_team0 = TeamDevice(...)
    # assignment of an incompatible type or to an existing Device object will
    # raise an exception
    # assignment of None? or del devs.new_team0 to deconfigure the device?
    devs = object()
    def run(self, what, bg=False, fail=False, timeout=60, path="", json=False,
            netns=None):
        # will run "what" on the remote host
        # "what" is either a Module object, or a string command that will be
        #   executed as a bash command
        # "bg" when True, runs "what" in the background - the run() call
        #   immediately returns, "timeout" is ignored, and the background
        #   process can be controlled through the returned Job object
        # "fail" if True then the Job is expected to fail, and will be reported
        #   as PASSed if it does
        # "timeout" in seconds, determines how long to block test execution
        #   before killing the Job. Only applies when running in the foreground
        # "path" changes the current working directory to the specified path
        #   before "what" is executed and changes back after execution is
        #   finished.
        # "tool" changes the current working directory to the directory of a
        #   specified test_tool before "what" is executed and changes back
        #   after execution is finished.
        #   !!!!!!! this is from the current API and I'm not yet sure how we
        #   !!!!!!! want to handle those... so for now I'll keep it
        # "json" if True will attempt to parse the returned stdout of the Job
        #   as json into a dictionary
        # "netns" Job will be run in the specified network namespace
        # Returns a Job object
    def config(self, option, value):
        # copied from old API, provides a shortcut for
        # "echo $value > /proc/or/sys/path"
        # and returns the original value when the test is finished

    def sync_resources(self, srcpath="", dstpath="", recursive=False):
        # copies the specified file from the controller to the specified
        # destination path, if recursive == True and srcpath refers to a
        # directory it copies the entire directory

    def {enable, disable}_service(self, service):
        # copied from old API, enables or disables the specified service

    def add_{bond, bridge,...}(self, params):
        # this is how we can currently dynamically create net devices on the
        # hosts. Even with the new assignment-based approach this could still
        # be useful, though the method would need to be dynamically created to
        # avoid useless work when adding a new netdev type. Something like:
        #   add_device("name", "Type", params) which would then do
        #   self.devs.name = TypeDevice(params) ??

    def del_device(self, name):
        # removes the specified device, probably easier (more logical?) to do
        # this than "devs.name = None", and "del devs.name" would be unreliable
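
As a usage sketch (assuming the attribute and method names above, which are
still open - e.g. "devs" vs "ifaces", and the set_ip() call taken from the
comment above), working with a matched Host inside test() could look roughly
like this:

    def test(self):
        m1 = self.matched.m1
        if m1.params.arch == "x86_64":
            m1.config("/proc/sys/net/ipv4/ip_forward", "1")
        m1.devs.eth0.set_ip("192.168.1.1/24")  #address format is an assumption
        m1.devs.new_team0 = TeamDevice(...)    #create a new device by assignment
        job = m1.run("ethtool -i eth0")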
class Device: #DeviceAPI, InterfaceAPI? name can change...
    # attributes:
    # dynamically created Device attributes such as driver and so on. Use
    # example in test() would look like this:
    #   if host.devs.eth0.driver == "ixgbe":
    # achieved through rewriting of the __getattr__ method of the Device class
    # should return None or throw UnknownParam exception for unknown parameters
    # this should directly mirror the Device objects that are managed by the
    # InterfaceManager on the Slave
    # eg:
    driver = something
    mtu = something
    ips = [IpAddress, ...]
class Job: #ProcessAPI? name can change...
    #attributes:
    # True if the Job finished, False if it's still running in the background
    finished = bool

    # contains the result data returned by the Job, None for bash commands
    result = object

    # contain the stdout and stderr generated by the Job, None for Module Jobs
    stdout = ""
    stderr = ""

    # simple True/False value indicating success/failure of the Job
    passed = bool

    def wait(self, timeout=0):
        # for background jobs, will wait until the Job finishes
        # "timeout" in seconds, determines how long to wait. After the timeout
        # is reached nothing happens and the status of the Job can be checked
        # with the "finished" attribute. If timeout=0, wait forever.

    def kill(self, signalnum=signal.SIGKILL):
        # sends the specified signal to the process of the Job running in the
        # background
        # "signalnum" the signal to be sent