From: Ondrej Lichtner <olichtne@redhat.com>
This commit refactors the wait_for_condition method to improve maintainability and readability. It also removes the unused wait_for_job_finish method and unifies handling of machine disconnects. Timeout handling is now part of this method instead of the Machine class, where it doesn't make as much sense.
If a slave that isn't mapped (and is therefore not used by the currently running recipe) disconnects, the information is logged but the application no longer dies with an exception.
This includes a refactoring of the wait_for_* methods of the Machine class, which now just delegate this functionality to the message dispatcher.
Signed-off-by: Ondrej Lichtner <olichtne@redhat.com> --- lnst/Controller/Machine.py | 67 +++++----------- lnst/Controller/MessageDispatcher.py | 114 +++++++++++++++++---------- 2 files changed, 90 insertions(+), 91 deletions(-)
diff --git a/lnst/Controller/Machine.py b/lnst/Controller/Machine.py index 1265265..32d933f 100644 --- a/lnst/Controller/Machine.py +++ b/lnst/Controller/Machine.py @@ -330,10 +330,6 @@ class Machine(object): self.cleanup_devices() raise
- def _timeout_handler(self, signum, frame): - msg = "Timeout expired on machine %s" % self.get_id() - raise MachineError(msg) - def _get_base_classes(self, cls): new_bases = [cls] + list(cls.__bases__) bases = [] @@ -379,59 +375,34 @@ class Machine(object): return res
def wait_for_job(self, job, timeout): - res = True if job.id not in self._jobs: raise MachineError("No job '%s' running on Machine %s" % (job.id, self._id))
- prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler) - signal.alarm(timeout) - - try: - if timeout > 0: - logging.info("Waiting for Job %d on Host %s for %d seconds." % - (job.id, self._id, timeout)) - elif timeout == 0: - logging.info("Waiting for Job %d on Host %s." % - (job.id, self._id)) + if timeout > 0: + logging.info("Waiting for Job %d on Host %s for %d seconds." % + (job.id, self._id, timeout)) + elif timeout == 0: + logging.info("Waiting for Job %d on Host %s." % + (job.id, self._id))
- def condition(): - return job.finished + def condition(): + return job.finished
- self._msg_dispatcher.wait_for_condition(condition) - except MachineError as exc: - logging.error(str(exc)) - res = False - - signal.alarm(0) - signal.signal(signal.SIGALRM, prev_handler) - - return res + return self._msg_dispatcher.wait_for_condition(condition, timeout)
def wait_for_tmp_devices(self, timeout): - res = False - prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler) - signal.alarm(timeout) + if timeout > 0: + logging.info("Waiting for Device creation Host %s for %d seconds." % + (self._id, timeout)) + elif timeout == 0: + logging.info("Waiting for Device creation on Host %s." % + (self._id))
- try: - if timeout > 0: - logging.info("Waiting for Device creation Host %s for %d seconds." % - (self._id, timeout)) - elif timeout == 0: - logging.info("Waiting for Device creation on Host %s." % - (self._id)) - - def condition(): - return len(self._tmp_device_database) <= 0 - - self._msg_dispatcher.wait_for_condition(condition) - except MachineError as exc: - logging.error(str(exc)) - res = False - - signal.alarm(0) - signal.signal(signal.SIGALRM, prev_handler) - return res + def condition(): + return len(self._tmp_device_database) <= 0 + + return self._msg_dispatcher.wait_for_condition(condition, timeout)
def job_finished(self, msg): job_id = msg["job_id"] diff --git a/lnst/Controller/MessageDispatcher.py b/lnst/Controller/MessageDispatcher.py index 6b12d68..222bc80 100644 --- a/lnst/Controller/MessageDispatcher.py +++ b/lnst/Controller/MessageDispatcher.py @@ -16,6 +16,7 @@ olichtne@redhat.com (Ondrej Lichtner)
import logging import copy +import signal from lnst.Common.ConnectionHandler import send_data from lnst.Common.ConnectionHandler import ConnectionHandler from lnst.Common.Parameters import Parameters, DeviceParam @@ -81,6 +82,13 @@ def remote_device_to_deviceref(obj): class ConnectionError(ControllerError): pass
+class WaitTimeoutError(ControllerError): + pass + +def _timeout_handler(signum, frame): + msg = "Timeout expired" + raise WaitTimeoutError(msg) + class MessageDispatcher(ConnectionHandler): def __init__(self, log_ctl): super(MessageDispatcher, self).__init__() @@ -104,9 +112,6 @@ class MessageDispatcher(ConnectionHandler): connected_slaves = self._connection_mapping.keys()
messages = self.check_connections() - - remaining_slaves = self._connection_mapping.keys() - for msg in messages: if msg[1]["type"] == "result" and msg[0] == machine: if result is not None: @@ -117,61 +122,69 @@ class MessageDispatcher(ConnectionHandler): else: self._process_message(msg)
+ remaining_slaves = self._connection_mapping.keys() if connected_slaves != remaining_slaves: - disconnected_slaves = set(connected_slaves) -\ - set(remaining_slaves) - msg = "Slaves " + str(list(disconnected_slaves)) + \ - " disconnected from the controller." - raise ConnectionError(msg) + self._handle_disconnects(set(connected_slaves)- + set(remaining_slaves))
if result is not None: return deviceref_to_remote_device(machine, result["result"])
- def wait_for_job_finish(self, job): - def condition_check(): - return job.finished - self.wait_for_condition(condition_check) - return True - - def wait_for_condition(self, condition_check): - wait = True - while wait: - connected_slaves = self._connection_mapping.keys() - - messages = self.check_connections(timeout=1) - - remaining_slaves = self._connection_mapping.keys() - - for msg in messages: - self._process_message(msg) - wait = wait and not condition_check() - - wait = wait and not condition_check() - - if connected_slaves != remaining_slaves: - disconnected_slaves = set(connected_slaves) -\ - set(remaining_slaves) - msg = "Slaves " + str(list(disconnected_slaves)) + \ - " disconnected from the controller." - raise ConnectionError(msg) - return True + def wait_for_condition(self, condition_check, timeout=0): + res = True + prev_handler = signal.signal(signal.SIGALRM, _timeout_handler) + + def condition_wrapper(): + res = condition_check() + if res: + signal.alarm(0) + signal.signal(signal.SIGALRM, prev_handler) + logging.debug("Condition passed, disabling timeout alarm") + return res + + try: + signal.alarm(timeout) + + wait = True + while wait: + connected_slaves = self._connection_mapping.keys() + messages = self.check_connections(timeout=1) + for msg in messages: + try: + self._process_message(msg) + wait = wait and not condition_wrapper() + except WaitTimeoutError as exc: + logging.error("Waiting for condition timed out!") + res = False + wait = False + + wait = wait and not condition_wrapper() + + remaining_slaves = self._connection_mapping.keys() + if connected_slaves != remaining_slaves: + self._handle_disconnects(set(connected_slaves)- + set(remaining_slaves)) + except WaitTimeoutError as exc: + logging.error("Waiting for condition timed out!") + res = False + finally: + signal.alarm(0) + signal.signal(signal.SIGALRM, prev_handler) + + return res
def handle_messages(self): connected_slaves = self._connection_mapping.keys()
messages = self.check_connections()
- remaining_slaves = self._connection_mapping.keys() - for msg in messages: self._process_message(msg)
+ remaining_slaves = self._connection_mapping.keys() if connected_slaves != remaining_slaves: - disconnected_slaves = set(connected_slaves) -\ - set(remaining_slaves) - msg = "Slaves " + str(list(disconnected_slaves)) + \ - " disconnected from the controller." - raise ConnectionError(msg) + self._handle_disconnects(set(connected_slaves)- + set(remaining_slaves)) return True
def _process_message(self, message): @@ -196,6 +209,21 @@ class MessageDispatcher(ConnectionHandler): msg = "Unknown message type: %s" % message[1]["type"] raise ConnectionError(msg)
+ def _handle_disconnects(self, disconnected_slaves): + disconnected_slaves = set(disconnected_slaves) + for slave in list(disconnected_slaves): + if not slave.get_mapped(): + logging.warn("Slave {} soft-disconnected from the " + "controller.".format(slave.get_id())) + disconnected_slaves.remove(slave) + + if len(disconnected_slaves) > 0: + disconnected_names = [x.get_id() + for x in disconnected_slaves] + msg = "Slaves " + str(list(disconnected_names)) + \ + " hard-disconnected from the controller." + raise ConnectionError(msg) + def disconnect_slave(self, machine): soc = self.get_connection(machine) self.remove_connection(soc)