Changes: The controller now loads all machines from all machine pool dirs first and then queries them all at once. This significantly reduces waiting time when the pool contains unavailable machines.
A Queue is used to collect (m_id, available) tuples; a separate Process runs query_machine() to query each machine.
Signed-off-by: Jiri Prochazka jprochaz@redhat.com --- lnst/Controller/SlavePool.py | 93 ++++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 37 deletions(-)
diff --git a/lnst/Controller/SlavePool.py b/lnst/Controller/SlavePool.py index 22c4379..a6a55cd 100644 --- a/lnst/Controller/SlavePool.py +++ b/lnst/Controller/SlavePool.py @@ -18,6 +18,7 @@ import os import re import copy from xml.dom import minidom +from multiprocessing import Process, Queue from lnst.Common.Config import lnst_config from lnst.Common.NetUtils import normalize_hwaddr from lnst.Common.NetUtils import test_tcp_connection @@ -45,24 +46,50 @@ class SlavePool: logging.info("Checking machine pool availability.") for pool_dir in pool_dirs: self.add_dir(pool_dir) + self.check_availability() + + def query_machine(self, machine, queue): + if self._pool_checks: + available = False + m_id = machine + hostname = self._pool[machine]["params"]["hostname"] + if "rpc_port" in self._pool[machine]["params"]: + port = self._pool[machine]["params"]["rpc_port"] + else: + port = lnst_config.get_option('environment', 'rpcport')
- def add_dir(self, pool_dir): - logging.info("Processing pool dir '%s'" % pool_dir) - dentries = os.listdir(pool_dir) + logging.debug("Querying machine '%s': %s:%s" %\ + (m_id, hostname, port)) + if test_tcp_connection(hostname, port): + available = True
- res = [] - for dirent in dentries: - m_info = self.add_file("%s/%s" % (pool_dir, dirent)) - if m_info != None: - res.append(m_info) + if 'libvirt_domain' in self._pool[machine]['params'] and \ + not self._allow_virt: + logging.debug("libvirtd not running. Removing "\ + "libvirt_domain from machine '%s'" % m_id) + del self._pool[machine]['params']['libvirt_domain'] + else: + available = True + queue.put((m_id, available))
- if len(res) == 0: - logging.warn("No machines found in this directory")
+ def check_availability(self): max_len = 0 - for m_id, _ in res: - if len(m_id) > max_len: - max_len = len(m_id) + res = [] + queue = Queue() + for machine in self._pool: + proc = Process(target=self.query_machine , args=(machine, queue,)) + proc.start() + + proc.join() + + while not queue.empty(): + res.append(queue.get()) + + for machine in res: + if len(machine) > max_len: + max_len = len(machine) + for m_id, available in res: if available: machine_spec = self._pool[m_id] @@ -77,9 +104,23 @@ class SlavePool: else: msg = "%s%s [%s]" % (m_id, (max_len - len(m_id)) * " ", decorate_with_preset("DOWN", "fail")) + del self._pool[m_id]
logging.info(msg)
+ def add_dir(self, pool_dir): + logging.info("Processing pool dir '%s'" % pool_dir) + dentries = os.listdir(pool_dir) + + res = [] + for dirent in dentries: + m_info = self.add_file("%s/%s" % (pool_dir, dirent)) + if m_info != None: + res.append(m_info) + + if len(res) == 0: + logging.warn("No machines found in this directory") + def add_file(self, filepath): if os.path.isfile(filepath) and re.search(".xml$", filepath, re.I): dirname, basename = os.path.split(filepath) @@ -105,31 +146,9 @@ class SlavePool: "your pool ('%s' and '%s')." % (m_id, pm_id) raise SlaveMachineError(msg)
- if self._pool_checks: - available = False + self._pool[m_id] = machine_spec
- hostname = machine_spec["params"]["hostname"] - if "rpc_port" in machine_spec["params"]: - port = machine_spec["params"]["rpc_port"] - else: - port = lnst_config.get_option('environment', 'rpcport') - - logging.debug("Querying machine '%s': %s:%s" %\ - (m_id, hostname, port)) - if test_tcp_connection(hostname, port): - available = True - - if 'libvirt_domain' in machine_spec['params'] and \ - not self._allow_virt: - logging.debug("libvirtd not running. Removing "\ - "libvirt_domain from machine '%s'" % m_id) - del machine_spec['params']['libvirt_domain'] - else: - available = True - - if available: - self._pool[m_id] = machine_spec - return (m_id, available) + return m_id
def _process_machine_xml_data(self, m_id, machine_xml_data): machine_spec = {"interfaces": {}, "params":{}}
Some comments on the patch.
On Thu, Oct 09, 2014 at 01:49:33PM +0200, Jiri Prochazka wrote:
Changes: The controller now loads all machines from all machine pool dirs first and then queries them all at once. This significantly reduces waiting time when the pool contains unavailable machines.
A Queue is used to collect (m_id, available) tuples; a separate Process runs query_machine() to query each machine.
Signed-off-by: Jiri Prochazka jprochaz@redhat.com
lnst/Controller/SlavePool.py | 93 ++++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 37 deletions(-)
diff --git a/lnst/Controller/SlavePool.py b/lnst/Controller/SlavePool.py index 22c4379..a6a55cd 100644 --- a/lnst/Controller/SlavePool.py +++ b/lnst/Controller/SlavePool.py @@ -18,6 +18,7 @@ import os import re import copy from xml.dom import minidom +from multiprocessing import Process, Queue from lnst.Common.Config import lnst_config from lnst.Common.NetUtils import normalize_hwaddr from lnst.Common.NetUtils import test_tcp_connection @@ -45,24 +46,50 @@ class SlavePool: logging.info("Checking machine pool availability.") for pool_dir in pool_dirs: self.add_dir(pool_dir)
self.check_availability()
- def query_machine(self, machine, queue):
if self._pool_checks:
available = False
m_id = machine
hostname = self._pool[machine]["params"]["hostname"]
if "rpc_port" in self._pool[machine]["params"]:
port = self._pool[machine]["params"]["rpc_port"]
else:
port = lnst_config.get_option('environment', 'rpcport')
- def add_dir(self, pool_dir):
logging.info("Processing pool dir '%s'" % pool_dir)
dentries = os.listdir(pool_dir)
logging.debug("Querying machine '%s': %s:%s" %\
(m_id, hostname, port))
if test_tcp_connection(hostname, port):
available = True
res = []
for dirent in dentries:
m_info = self.add_file("%s/%s" % (pool_dir, dirent))
if m_info != None:
res.append(m_info)
if 'libvirt_domain' in self._pool[machine]['params'] and \
not self._allow_virt:
logging.debug("libvirtd not running. Removing "\
"libvirt_domain from machine '%s'" % m_id)
del self._pool[machine]['params']['libvirt_domain']
else:
available = True
queue.put((m_id, available))
if len(res) == 0:
logging.warn("No machines found in this directory")
- def check_availability(self): max_len = 0
for m_id, _ in res:
if len(m_id) > max_len:
max_len = len(m_id)
res = []
queue = Queue()
for machine in self._pool:
proc = Process(target=self.query_machine , args=(machine, queue,))
proc.start()
proc.join()
This should join all of the created processes. As written, it only waits for the last process that was created.
There is also the corner case of a pool with no machines. The loop body never runs, so the proc variable is never assigned and proc.join() raises an exception (UnboundLocalError).
while not queue.empty():
res.append(queue.get())
for machine in res:
if len(machine) > max_len:
max_len = len(machine)
This is wrong: 'res' is a list of pairs (m_id, available), so len(machine) is always 2. max_len should hold the length of the longest machine id, which in this case is machine[0]. It is used to align the [UP]/[DOWN] log output. It should use the original code:

    for m_id, _ in res:
        if len(m_id) > max_len:
            max_len = len(m_id)
The rest of the patch looks fine to me.
for m_id, available in res: if available: machine_spec = self._pool[m_id]
@@ -77,9 +104,23 @@ class SlavePool: else: msg = "%s%s [%s]" % (m_id, (max_len - len(m_id)) * " ", decorate_with_preset("DOWN", "fail"))
del self._pool[m_id] logging.info(msg)
def add_dir(self, pool_dir):
logging.info("Processing pool dir '%s'" % pool_dir)
dentries = os.listdir(pool_dir)
res = []
for dirent in dentries:
m_info = self.add_file("%s/%s" % (pool_dir, dirent))
if m_info != None:
res.append(m_info)
if len(res) == 0:
logging.warn("No machines found in this directory")
def add_file(self, filepath): if os.path.isfile(filepath) and re.search(".xml$", filepath, re.I): dirname, basename = os.path.split(filepath)
@@ -105,31 +146,9 @@ class SlavePool: "your pool ('%s' and '%s')." % (m_id, pm_id) raise SlaveMachineError(msg)
if self._pool_checks:
available = False
self._pool[m_id] = machine_spec
hostname = machine_spec["params"]["hostname"]
if "rpc_port" in machine_spec["params"]:
port = machine_spec["params"]["rpc_port"]
else:
port = lnst_config.get_option('environment', 'rpcport')
logging.debug("Querying machine '%s': %s:%s" %\
(m_id, hostname, port))
if test_tcp_connection(hostname, port):
available = True
if 'libvirt_domain' in machine_spec['params'] and \
not self._allow_virt:
logging.debug("libvirtd not running. Removing "\
"libvirt_domain from machine '%s'" % m_id)
del machine_spec['params']['libvirt_domain']
else:
available = True
if available:
self._pool[m_id] = machine_spec
return (m_id, available)
return m_id
def _process_machine_xml_data(self, m_id, machine_xml_data): machine_spec = {"interfaces": {}, "params":{}}
-- 1.9.3
LNST-developers mailing list LNST-developers@lists.fedorahosted.org https://lists.fedorahosted.org/mailman/listinfo/lnst-developers
lnst-developers@lists.fedorahosted.org