From: Ondrej Lichtner <olichtne(a)redhat.com>
This commit refactors the wait_for_condition method to improve
maintainability and readability. It also removes the unused
wait_for_job_finish method and unifies handling of machine disconnects.
Timeout handling is now part of this method instead of in the Machine
class where it doesn't make as much sense...
If a slave that isn't mapped (and is therefore not used by the currently
running recipe) disconnects, the information is logged but the application
doesn't die with an exception anymore.
This includes a refactoring of the wait_for_* methods of the Machine
class, which now just delegate this functionality to the message
dispatcher.
Signed-off-by: Ondrej Lichtner <olichtne(a)redhat.com>
---
lnst/Controller/Machine.py | 67 +++++-----------
lnst/Controller/MessageDispatcher.py | 114 +++++++++++++++++----------
2 files changed, 90 insertions(+), 91 deletions(-)
diff --git a/lnst/Controller/Machine.py b/lnst/Controller/Machine.py
index 1265265..32d933f 100644
--- a/lnst/Controller/Machine.py
+++ b/lnst/Controller/Machine.py
@@ -330,10 +330,6 @@ class Machine(object):
self.cleanup_devices()
raise
- def _timeout_handler(self, signum, frame):
- msg = "Timeout expired on machine %s" % self.get_id()
- raise MachineError(msg)
-
def _get_base_classes(self, cls):
new_bases = [cls] + list(cls.__bases__)
bases = []
@@ -379,59 +375,34 @@ class Machine(object):
return res
def wait_for_job(self, job, timeout):
- res = True
if job.id not in self._jobs:
raise MachineError("No job '%s' running on Machine %s" %
(job.id, self._id))
- prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
- signal.alarm(timeout)
-
- try:
- if timeout > 0:
- logging.info("Waiting for Job %d on Host %s for %d seconds." %
- (job.id, self._id, timeout))
- elif timeout == 0:
- logging.info("Waiting for Job %d on Host %s." %
- (job.id, self._id))
+ if timeout > 0:
+ logging.info("Waiting for Job %d on Host %s for %d seconds." %
+ (job.id, self._id, timeout))
+ elif timeout == 0:
+ logging.info("Waiting for Job %d on Host %s." %
+ (job.id, self._id))
- def condition():
- return job.finished
+ def condition():
+ return job.finished
- self._msg_dispatcher.wait_for_condition(condition)
- except MachineError as exc:
- logging.error(str(exc))
- res = False
-
- signal.alarm(0)
- signal.signal(signal.SIGALRM, prev_handler)
-
- return res
+ return self._msg_dispatcher.wait_for_condition(condition, timeout)
def wait_for_tmp_devices(self, timeout):
- res = False
- prev_handler = signal.signal(signal.SIGALRM, self._timeout_handler)
- signal.alarm(timeout)
+ if timeout > 0:
+ logging.info("Waiting for Device creation Host %s for %d seconds." %
+ (self._id, timeout))
+ elif timeout == 0:
+ logging.info("Waiting for Device creation on Host %s." %
+ (self._id))
- try:
- if timeout > 0:
- logging.info("Waiting for Device creation Host %s for %d seconds." %
- (self._id, timeout))
- elif timeout == 0:
- logging.info("Waiting for Device creation on Host %s." %
- (self._id))
-
- def condition():
- return len(self._tmp_device_database) <= 0
-
- self._msg_dispatcher.wait_for_condition(condition)
- except MachineError as exc:
- logging.error(str(exc))
- res = False
-
- signal.alarm(0)
- signal.signal(signal.SIGALRM, prev_handler)
- return res
+ def condition():
+ return len(self._tmp_device_database) <= 0
+
+ return self._msg_dispatcher.wait_for_condition(condition, timeout)
def job_finished(self, msg):
job_id = msg["job_id"]
diff --git a/lnst/Controller/MessageDispatcher.py b/lnst/Controller/MessageDispatcher.py
index 6b12d68..222bc80 100644
--- a/lnst/Controller/MessageDispatcher.py
+++ b/lnst/Controller/MessageDispatcher.py
@@ -16,6 +16,7 @@ olichtne(a)redhat.com (Ondrej Lichtner)
import logging
import copy
+import signal
from lnst.Common.ConnectionHandler import send_data
from lnst.Common.ConnectionHandler import ConnectionHandler
from lnst.Common.Parameters import Parameters, DeviceParam
@@ -81,6 +82,13 @@ def remote_device_to_deviceref(obj):
class ConnectionError(ControllerError):
pass
+class WaitTimeoutError(ControllerError):
+ pass
+
+def _timeout_handler(signum, frame):
+ msg = "Timeout expired"
+ raise WaitTimeoutError(msg)
+
class MessageDispatcher(ConnectionHandler):
def __init__(self, log_ctl):
super(MessageDispatcher, self).__init__()
@@ -104,9 +112,6 @@ class MessageDispatcher(ConnectionHandler):
connected_slaves = self._connection_mapping.keys()
messages = self.check_connections()
-
- remaining_slaves = self._connection_mapping.keys()
-
for msg in messages:
if msg[1]["type"] == "result" and msg[0] == machine:
if result is not None:
@@ -117,61 +122,69 @@ class MessageDispatcher(ConnectionHandler):
else:
self._process_message(msg)
+ remaining_slaves = self._connection_mapping.keys()
if connected_slaves != remaining_slaves:
- disconnected_slaves = set(connected_slaves) -\
- set(remaining_slaves)
- msg = "Slaves " + str(list(disconnected_slaves)) + \
- " disconnected from the controller."
- raise ConnectionError(msg)
+ self._handle_disconnects(set(connected_slaves)-
+ set(remaining_slaves))
if result is not None:
return deviceref_to_remote_device(machine, result["result"])
- def wait_for_job_finish(self, job):
- def condition_check():
- return job.finished
- self.wait_for_condition(condition_check)
- return True
-
- def wait_for_condition(self, condition_check):
- wait = True
- while wait:
- connected_slaves = self._connection_mapping.keys()
-
- messages = self.check_connections(timeout=1)
-
- remaining_slaves = self._connection_mapping.keys()
-
- for msg in messages:
- self._process_message(msg)
- wait = wait and not condition_check()
-
- wait = wait and not condition_check()
-
- if connected_slaves != remaining_slaves:
- disconnected_slaves = set(connected_slaves) -\
- set(remaining_slaves)
- msg = "Slaves " + str(list(disconnected_slaves)) + \
- " disconnected from the controller."
- raise ConnectionError(msg)
- return True
+ def wait_for_condition(self, condition_check, timeout=0):
+ res = True
+ prev_handler = signal.signal(signal.SIGALRM, _timeout_handler)
+
+ def condition_wrapper():
+ res = condition_check()
+ if res:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, prev_handler)
+ logging.debug("Condition passed, disabling timeout alarm")
+ return res
+
+ try:
+ signal.alarm(timeout)
+
+ wait = True
+ while wait:
+ connected_slaves = self._connection_mapping.keys()
+ messages = self.check_connections(timeout=1)
+ for msg in messages:
+ try:
+ self._process_message(msg)
+ wait = wait and not condition_wrapper()
+ except WaitTimeoutError as exc:
+ logging.error("Waiting for condition timed out!")
+ res = False
+ wait = False
+
+ wait = wait and not condition_wrapper()
+
+ remaining_slaves = self._connection_mapping.keys()
+ if connected_slaves != remaining_slaves:
+ self._handle_disconnects(set(connected_slaves)-
+ set(remaining_slaves))
+ except WaitTimeoutError as exc:
+ logging.error("Waiting for condition timed out!")
+ res = False
+ finally:
+ signal.alarm(0)
+ signal.signal(signal.SIGALRM, prev_handler)
+
+ return res
def handle_messages(self):
connected_slaves = self._connection_mapping.keys()
messages = self.check_connections()
- remaining_slaves = self._connection_mapping.keys()
-
for msg in messages:
self._process_message(msg)
+ remaining_slaves = self._connection_mapping.keys()
if connected_slaves != remaining_slaves:
- disconnected_slaves = set(connected_slaves) -\
- set(remaining_slaves)
- msg = "Slaves " + str(list(disconnected_slaves)) + \
- " disconnected from the controller."
- raise ConnectionError(msg)
+ self._handle_disconnects(set(connected_slaves)-
+ set(remaining_slaves))
return True
def _process_message(self, message):
@@ -196,6 +209,21 @@ class MessageDispatcher(ConnectionHandler):
msg = "Unknown message type: %s" % message[1]["type"]
raise ConnectionError(msg)
+ def _handle_disconnects(self, disconnected_slaves):
+ disconnected_slaves = set(disconnected_slaves)
+ for slave in list(disconnected_slaves):
+ if not slave.get_mapped():
+ logging.warn("Slave {} soft-disconnected from the "
+ "controller.".format(slave.get_id()))
+ disconnected_slaves.remove(slave)
+
+ if len(disconnected_slaves) > 0:
+ disconnected_names = [x.get_id()
+ for x in disconnected_slaves]
+ msg = "Slaves " + str(list(disconnected_names)) + \
+ " hard-disconnected from the controller."
+ raise ConnectionError(msg)
+
def disconnect_slave(self, machine):
soc = self.get_connection(machine)
self.remove_connection(soc)
--
2.17.0