From: Ondrej Lichtner olichtne@redhat.com
Defines classes of jobs to be run on a slave. Based on the old lnst.Common.NetTestCommand code, but simplified to work with newer 'Job' implementation that started on the Controller.
Defines a Job class that is used as a container object by the Slave to fork and run a GenericJob object. For now only a ShellExecJob and a ModuleJob are supported. Previous "control" commands are now only methods of the Job container object.
Signed-off-by: Ondrej Lichtner olichtne@redhat.com --- lnst/Slave/Job.py | 251 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 lnst/Slave/Job.py
diff --git a/lnst/Slave/Job.py b/lnst/Slave/Job.py new file mode 100644 index 0000000..234cd61 --- /dev/null +++ b/lnst/Slave/Job.py @@ -0,0 +1,251 @@ +""" +This module defines classes of jobs to be run on a slave + +Copyright 2017 Red Hat, Inc. +Licensed under the GNU General Public License, version 2 as +published by the Free Software Foundation; see COPYING for details. +""" + +__author__ = """ +olichtne@redhat.com (Ondrej Lichtner) +""" + +import os +import re +import sys +import signal +import logging +import multiprocessing +from lnst.Common.JobError import JobError +from lnst.Common.ExecCmd import exec_cmd, ExecCmdFail +from lnst.Common.ConnectionHandler import send_data +from lnst.Common.Logs import log_exc_traceback + +def get_job_class(what): + if what["type"] == "shell": + return ShellExecJob(what) + elif what["type"] == "module": + return ModuleJob(what) + else: + logging.error("Unknown job type "%s"" % what["type"]) + raise JobError("Unknown command type "%s"" % what["type"]) + +class JobContext(object): + def __init__(self): + self._dict = {} + + def add_job(self, job): + self._dict[job.get_id()] = job + + def del_job(self, job): + del self._dict[job.get_id()] + + def get_job(self, id): + if id in self._dict: + return self._dict[id] + else: + return None + + def _kill_all_jobs(self): + for id in self._dict: + self._dict[id].kill(signal=signal.SIGKILL) + + def cleanup(self): + logging.debug("Cleaning up leftover processes.") + self._kill_all_jobs() + self._dict = {} + + def get_parent_pipes(self): + pipes = {} + for key in self._dict: + pipe = self._dict[key].get_parent_pipe() + if pipe != None: + pipes[key] = pipe + return pipes + +class Job(object): + def __init__(self, what, log_ctl): + self._job_cls = get_job_class(what) + self._what = what + + self._id = what["job_id"] + self._parent_pipe = None + self._child_pipe = None + self._process = None + self._pid = None + self._log_ctl = log_ctl + self._finished = False + + def get_id(self): + return self._id + + def get_parent_pipe(self): + return self._parent_pipe + + def run(self): + self._parent_pipe, self._child_pipe = multiprocessing.Pipe() + self._process = multiprocessing.Process(target=self._run) + + self._process.daemon = False + self._process.start() + self._pid = self._process.pid + + logging.info("Running job %d with pid "%d"" % (self._id, self._pid)) + return True + + def _run(self): + os.setpgrp() + signal.signal(signal.SIGHUP, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + self._log_ctl.disable_logging() + self._log_ctl.set_connection(self._child_pipe) + + result = {} + try: + self._job_cls.run() + except: + log_exc_traceback() + type, value, tb = sys.exc_info() + data = {"Exception": "%s" % value} + # self._job_cls.set_fail(data) + finally: + res_data = self._job_cls.get_result() + result["type"] = "job_finished" + result["job_id"] = self._id + result["result"] = res_data + + send_data(self._child_pipe, result) + self._child_pipe.close() + + def kill(self, signal=signal.SIGKILL): + if self._finished: + return True + try: + logging.debug("Sending signal %s to pid %d" % (signal, self._pid)) + os.kill(self._pid, signal) + return True + except OSError as exc: + logging.error(str(exc)) + return False + + def join(self): + self._process.join() + + def set_finished(self, result): + self._finished = True + self._result = result + + def get_result(self): + return self._result + +class GenericJob(object): + def __init__(self, what): + self._what = what + self._result = {"passed": False, + "res_data": None, + "msg": None} + + def run(self): + raise JobError("Method run must be defined.") + + def get_result(self): + return self._result + + # def format_res_data(self, res_data, level=0): + # self._check_res_data(res_data) + # formatted_data = "" + # if res_data: + # max_key_len = 0 + # for key in res_data.keys(): + # if len(key) > max_key_len: + # max_key_len = len(key) + # for key, value in res_data.iteritems(): + # if type(value) == dict: + # formatted_data += level*4*" " + str(key) + ":\n" + # formatted_data += self.format_res_data(value, level+1) + # if type(value) == list: + # formatted_data += level*4*" " + str(key) + ":\n" + # for i in range(0, len(value)): + # formatted_data += (level+1)*4*" " +\ + # "item %d:" % (i+1) + "\n" + # formatted_data += self.format_res_data(value[i], + # level+2) + # else: + # formatted_data += level*4*" " + str(key) + ":" + \ + # (max_key_len-len(key))*" " + \ + # "\t" + str(value) + "\n" + + # return formatted_data + + # def _check_res_data(self, res_data): + # name_start_char = u":A-Z_a-z\xC0-\xD6\xD8-\xF6\xF8-\u02FF"\ + # u"\u0370-\u037D\u037F-\u1FFF\u200C-\u200D"\ + # u"\u2070-\u218F\u2C00-\u2FEF\u3001-\uD7FF"\ + # u"\uF900-\uFDCF\uFDF0-\uFFFD\U00010000-\U000EFFFF" + # name_char = name_start_char + u"-.0-9\xB7\u0300-\u036F\u203F-\u2040" + # name = u"[%s]([%s])*$" % (name_start_char, name_char) + # char_data = u"[^<&]*" + # if isinstance(res_data, dict): + # for key in res_data: + # if not re.match(name, key, re.UNICODE): + # msg = "'%s' can't be used as an xml element name!" % key + # raise JobError(msg) + # else: + # self._check_res_data(res_data[key]) + # elif isinstance(res_data, list): + # for i in res_data: + # self._check_res_data(i) + # else: + # try: + # string = str(res_data) + # except: + # msg = "res_data can only contain dictionaries, lists or "\ + # "stringable objects!" + # raise JobError(msg) + # if not re.match(char_data, string, re.UNICODE): + # msg = "'%s' can't be used as character data in xml!" % string + # raise JobError(msg) + + # def _format_cmd_res_header(self): + # if self._what["netns"] != None: + # netns = "(%s) " % self._what["netns"] + # else: + # netns = "" + + # return "%-9s" % (self._what["type"] + netns) + +class ShellExecJob(GenericJob): + def run(self): + try: + stdout, stderr = exec_cmd(self._what["command"], self._what["json"]) + self._result["passed"] = True + self._result["res_data"] = {"stdout": stdout, "stderr": stderr} + except ExecCmdFail as e: + self._result["passed"] = False + self._result["res_data"] = res_data = {"stdout": e.get_stdout(), + "stderr": e.get_stderr()} + + # def _format_cmd_res_header(self): + # cmd_type = self._what["type"] + # cmd_val = self._what["command"] + + # if self._what["netns"] != None: + # netns = "(%s) " % self._what["netns"] + # else: + # netns = "" + + # cmd = "%-9scmd: "%s"" %(cmd_type + netns, cmd_val) + # return cmd + +class ModuleJob(GenericJob): + def run(self): + try: + self._result["passed"] = self._what["module"].run() + self._result["res_data"] = self._what["module"].get_res_data() + except Exception as e: + self._result["passed"] = False + self._result["res_data"] = {"exception": str(e)} + # self._result["res_data"] = {"stdout": e.get_stdout(), + # "stderr": e.get_stderr()}