r5459 - branches/tmckay/cumin/python/cumin/grid
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-09-07 16:02:45 +0000 (Fri, 07 Sep 2012)
New Revision: 5459
Modified:
branches/tmckay/cumin/python/cumin/grid/job.py
Log:
Improve some error handling when remote ops generate exceptions.
This should eliminate "Loading..." forever scenarios or unbounded errors.
Modified: branches/tmckay/cumin/python/cumin/grid/job.py
===================================================================
--- branches/tmckay/cumin/python/cumin/grid/job.py 2012-09-06 17:21:28 UTC (rev 5458)
+++ branches/tmckay/cumin/python/cumin/grid/job.py 2012-09-07 16:02:45 UTC (rev 5459)
@@ -24,6 +24,12 @@
strings = StringCatalog(__file__)
log = logging.getLogger("cumin.job")
+class ErrorResult(object):
+ def __init__(self, err):
+ self.status = "Failed"
+ self.error = err
+ self.data = None
+
class SubmissionObjectFrame(ObjectFrame):
def get_submission_sched(self, session, id):
submission = self.get_object(session, id)
@@ -506,17 +512,18 @@
id = self.frame.id.get(session)
submission, sched = self.frame.get_submission_sched(session, id)
job_id = self.frame.job_id.get(session)
- results = self.app.remote.get_job_ad(sched, job_id, submission,
- default={'JobAd': {}})
+ try:
+ results = self.app.remote.get_job_ad(sched, job_id, submission,
+ default={'JobAd': {}})
+ except Exception, e:
+ results = ErrorResult(Exception("Failed to get job ad: exception: "\
+ "%s" % e))
return results
def check_job_owner(self, session, ad):
error = None
if not session.client_session.check_owner(ad['Owner']):
- class InventError:
- def __init__(self, msg):
- self.args = [msg]
- error = InventError("Logged in user does not own the specified job")
+ error = Exception("Logged in user does not own the specified job")
return error
def do_get_items(self, session):
@@ -525,19 +532,21 @@
if not ad_list and not error:
ad_list = list()
results = self.get_job_ad(session)
- error = None
if results.error:
error = results.error
elif self.do_check_viewable:
error = self.check_job_owner(session, results.data['JobAd'])
-
- self.qmf_error.set(session, error)
- ads = results.data['JobAd']
- cls = self.app.model.job_meta_data
- ad_list = [self.gen_item(x, ads[x], cls, dtype=self.get_type(ads, x)) \
- for x in ads if not x.startswith("!!")]
+ else:
+ error = None
+ if error:
+ self.qmf_error.set(session, error)
+ else:
+ ads = results.data['JobAd']
+ cls = self.app.model.job_meta_data
+ ad_list = [self.gen_item(x, ads[x],
+ cls, dtype=self.get_type(ads, x)) \
+ for x in ads if not x.startswith("!!")]
self.items.set(session, ad_list)
-
return ad_list, error
def get_desc(self, descriptors, x):
@@ -615,14 +624,22 @@
id = self.frame.id.get(session)
submission, sched = self.frame.get_submission_sched(session, id)
job_id = self.frame.job_id.get(session)
- results = self.app.remote.get_job_ad(sched, job_id, submission,
- default={'JobAd': {}})
+ try:
+ results = self.app.remote.get_job_ad(sched, job_id, submission,
+ default={'JobAd': {}})
+ except Exception, e:
+ results = ErrorResult(Exception("Failed to get job ad: "\
+ "exception: %s" % e))
error = results.error
- self.qmf_error.set(session, error)
- ads = results.data['JobAd']
- cls = self.app.model.job_meta_data
- ad_list = [self.gen_item(x, ads[x], cls, dtype=self.get_type(ads, x)) \
- for x in ads if not x.startswith("!!") and x in self.app.fast_view_attributes]
+ if error:
+ self.qmf_error.set(session, error)
+ else:
+ ads = results.data['JobAd']
+ cls = self.app.model.job_meta_data
+ ad_list = [self.gen_item(x, ads[x], cls,
+ dtype=self.get_type(ads, x)) \
+ for x in ads if not x.startswith("!!") \
+ and x in self.app.fast_view_attributes]
self.items.set(session, ad_list)
return ad_list, error
@@ -1003,12 +1020,17 @@
state, file, start, end = self.get_file_args(session)
if file:
- result = self.app.remote.fetch_job_data(scheduler, job_id, state,
- file, start, end,
- submission,
- default={'Data': ""})
+ try:
+ result = self.app.remote.fetch_job_data(scheduler,
+ job_id, state,
+ file, start, end,
+ submission,
+ default={'Data': ""})
+ except Exception, e:
+ result = ErrorResult(Exception("Failed to get job data: "\
+ "exception: %s" % e))
if result.error:
- return result.status
+ return result.error
return escape_entity(result.data['Data'])
return self.err_msg
11 years, 8 months
r5458 - in branches/tmckay: cumin/etc cumin/python/cumin sage/python/sage sage/python/sage/aviary
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-09-06 17:21:28 +0000 (Thu, 06 Sep 2012)
New Revision: 5458
Modified:
branches/tmckay/cumin/etc/cumin.conf
branches/tmckay/cumin/python/cumin/main.py
branches/tmckay/sage/python/sage/aviary/aviarylocator.py
branches/tmckay/sage/python/sage/aviary/aviaryoperations.py
branches/tmckay/sage/python/sage/util.py
Log:
Change treatment of defaults for locator, job_servers and query_servers in aviaryoperations.
Add assertions to make sure a source is set for endpoints.
Default if just locator is passed is to enable all operations for the job and query services
(that is, job_servers and query_servers values may be empty).
Fixup datadir treatment for locator.
Modified: branches/tmckay/cumin/etc/cumin.conf
===================================================================
--- branches/tmckay/cumin/etc/cumin.conf 2012-09-05 20:00:19 UTC (rev 5457)
+++ branches/tmckay/cumin/etc/cumin.conf 2012-09-06 17:21:28 UTC (rev 5458)
@@ -26,29 +26,24 @@
# ****************************************************
-# Aviary interface to condor
+# Aviary interface to condor. More details about the following parameters
+# are given in 'Some parameter explanations by section' below.
# The value for this parameter is a comma separated list of URLs for Aviary
-# job servers. If the Aviary locator is used, this value will be overriden
-# but must still be non-empty to enable use of Aviary job servers.
-# Default value is shown. Uncomment and leave the value blank to disable.
-# More details in 'Some parameter explanations by section' below.
+# job servers. If the Aviary locator is used, this value will be overriden.
+# Default is shown.
# aviary-job-servers: http://localhost:9090
# The value for this parameter is a comma separated list of URLs for Aviary
-# query servers. If the Aviary locator is used, this value will be overriden
-# but must still be non-empty to enable use of Aviary query servers.
-# Default value is shown. Uncomment and leave the value blank to disable.
-# More details in 'Some parameter explanations by section' below.
+# query servers. If the Aviary locator is used, this value will be overriden.
+# Default is shown.
# aviary-query-servers: http://localhost:9091
-# The locator allows Cumin to retrive values for Aviary job servers and
+# The locator allows Cumin to retrieve values for Aviary job servers and
# Aviary query servers automatically. If the Aviary locator is enabled the
-# values for aviary-job-servers and aviary-query-servers will be overriden
-# (but those parameters must still be non-empty to be enabled).
+# values for aviary-job-servers and aviary-query-servers will be overriden.
# Default is empty string (aviary locator will not be used). Uncomment the
-# following line and edit as needed to enable. More details in
-# 'Some parameter explanations by section' below.
+# following line and edit as needed to enable.
# aviary-locator: http://localhost:9000
# Full path to private key file used for ssl communication with aviary servers.
@@ -252,18 +247,13 @@
## sets a port number may be followed by one or more
## port numbers separated by commas to specify
## mulitple job servers whose URIs differ only
-## by port number. This parameter must be non-empty
-## in order for aviary job servers to be used, even
-## if aviary-locator has been set.
+## by port number.
## aviary-query-servers: http://localhost:9091
## Like aviary-job-servers but specifies URIs for aviary
## query servers. The port value defaults to 9091 and the
## path defaults to /services/query/. Other
## defaults are as noted for aviary-job-servers.
-## This parameter must be non-empty in order for aviary
-## query servers to be used, even if aviary-locator
-## has been set.
## aviary-locator: http://localhost:9000
## Specifies the URI for the aviary locator. This is
Modified: branches/tmckay/cumin/python/cumin/main.py
===================================================================
--- branches/tmckay/cumin/python/cumin/main.py 2012-09-05 20:00:19 UTC (rev 5457)
+++ branches/tmckay/cumin/python/cumin/main.py 2012-09-06 17:21:28 UTC (rev 5458)
@@ -198,7 +198,8 @@
ops = [QmfOperations("qmf", self.session)]
imports_ok = True
- if self.aviary_job_servers or self.aviary_query_servers:
+ if self.aviary_locator or \
+ self.aviary_job_servers or self.aviary_query_servers:
try:
from sage.aviary.aviaryoperations import \
SudsLogging, AviaryOperationsFactory
Modified: branches/tmckay/sage/python/sage/aviary/aviarylocator.py
===================================================================
--- branches/tmckay/sage/python/sage/aviary/aviarylocator.py 2012-09-05 20:00:19 UTC (rev 5457)
+++ branches/tmckay/sage/python/sage/aviary/aviarylocator.py 2012-09-06 17:21:28 UTC (rev 5458)
@@ -1,7 +1,7 @@
import logging
import os
-from sage.util import parse_URL
+from sage.util import parse_URL, get_datadir
from clients import OverrideClient, TransportFactory
log = logging.getLogger("sage.aviary.locator")
@@ -21,8 +21,10 @@
'''
self.transport = TransportFactory(key, cert, root_cert, domain_verify)
self.scheme, self.locator_uri = self._get_uri(locator_uri)
- self.datadir = datadir
- self.wsdl = "file:" + os.path.join(self.datadir, "aviary-locator.wsdl")
+
+ self.wsdl = "file:" + os.path.join(get_datadir(datadir, "locator"),
+ "aviary-locator.wsdl")
+
log.info("AviaryLocator: locator URL set to %s" % self.locator_uri)
def _get_uri(self, locator):
Modified: branches/tmckay/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- branches/tmckay/sage/python/sage/aviary/aviaryoperations.py 2012-09-05 20:00:19 UTC (rev 5457)
+++ branches/tmckay/sage/python/sage/aviary/aviaryoperations.py 2012-09-06 17:21:28 UTC (rev 5458)
@@ -11,7 +11,7 @@
from datetime import datetime
from threading import Lock
from suds import *
-from sage.util import CallSync, CallThread, host_list, parse_URL
+from sage.util import CallSync, CallThread, host_list, parse_URL, get_datadir
from aviarylocator import AviaryLocator
from clients import ClientPool, TransportFactory
@@ -242,9 +242,7 @@
"/services/job/",
"JOB")
-
-
- job_wsdl = "file:" + os.path.join(self.get_datadir(datadir,"job"),
+ job_wsdl = "file:" + os.path.join(get_datadir(datadir,"job"),
"aviary-job.wsdl")
self.job_client_pool = ClientPool(job_wsdl, None)
@@ -437,7 +435,7 @@
"/services/query/",
"QUERY_SERVER")
- query_wsdl = "file:" + os.path.join(self.get_datadir(datadir,"query"),
+ query_wsdl = "file:" + os.path.join(get_datadir(datadir,"query"),
"aviary-query.wsdl")
self.query_client_pool = ClientPool(query_wsdl, None)
@@ -722,23 +720,6 @@
self.type_to_aviary = self._type_to_aviary()
self.aviary_to_type = self._aviary_to_type()
- def get_datadir(self, datadir, subdir):
- if not type(datadir) in (tuple, list):
- datadir = [datadir]
-
- # Find the first element in datadir that is a valid
- # path. If the path has a subdirectory called
- # "subdir", consider that part of the path.
- for d in datadir:
- if os.path.isdir(d):
- s = os.path.join(d, subdir)
- if os.path.isdir(s):
- return s
- return d
- # Hmm, well, just return the first one since we're
- # going to get an error anyway.
- return datadir[0]
-
@classmethod
def _type_to_aviary(cls):
# Need to be able to turn simple Python types into Aviary types for attributes
@@ -851,9 +832,18 @@
return sync
class AviaryOperations(_AviaryCommon, _AviaryJobMethods, _AviaryQueryMethods):
- def __init__(self, name, datadir, locator, job_servers, query_servers,
+ def __init__(self, name, datadir,
+ locator=None,
+ job_servers="",
+ query_servers="",
key="", cert="", root_cert="", domain_verify=True):
+ # If job_servers and query_servers is not set, we must have a
+ # locator. If locator has been set, it must be the right type
+ # because it will override the server lists.
+ if not (job_servers and query_servers) or locator is not None:
+ assert isinstance(locator, AviaryLocator)
+
super(AviaryOperations, self).__init__(name, locator,
key, cert, root_cert,
domain_verify)
@@ -863,9 +853,17 @@
class AviaryJobOperations(_AviaryCommon, _AviaryJobMethods):
- def __init__(self, name, datadir, locator, job_servers,
+ def __init__(self, name, datadir,
+ locator=None,
+ job_servers="",
key="", cert="", root_cert="", domain_verify=True):
+ # If job_servers is not set, we must have a
+ # locator. If locator has been set, it must be the right type
+ # because it will override the server list.
+ if not job_servers or locator is not None:
+ assert isinstance(locator, AviaryLocator)
+
super(AviaryJobOperations, self).__init__(name, locator,
key, cert, root_cert,
domain_verify)
@@ -873,19 +871,33 @@
_AviaryJobMethods.init(self, datadir, job_servers)
class AviaryQueryOperations(_AviaryCommon, _AviaryQueryMethods):
- def __init__(self, name, datadir, locator, query_servers,
+ def __init__(self, name, datadir,
+ locator=None,
+ query_servers="",
key="", cert="", root_cert="", domain_verify=True):
+ # If query_servers is not set, we must have a
+ # locator. If locator has been set, it must be the right type
+ # because it will override the server list.
+ if not query_servers or locator is not None:
+ assert isinstance(locator, AviaryLocator)
+
super(AviaryQueryOperations, self).__init__(name, locator,
key, cert, root_cert,
domain_verify)
_AviaryQueryMethods.init(self, datadir, query_servers)
-def AviaryOperationsFactory(name, datadir, locator_uri,
- job_servers, query_servers,
+def AviaryOperationsFactory(name, datadir,
+ locator_uri="",
+ job_servers="",
+ query_servers="",
key="", cert="", root_cert="", domain_verify=True):
+ if not (job_servers or query_servers or locator_uri):
+ raise Exception("locator_uri, job_servers, and query_servers "\
+ "may not all be null strings")
+
# If locator uri has not been specified, it's disabled and we will
# use the specified job_servers and query_servers values
if locator_uri:
@@ -894,15 +906,19 @@
else:
locator = None
- if job_servers and query_servers:
+ # Default case is to supply all operations.
+ # The set of operations can be pruned to one set or the other
+ # by providing a string for either (or any value that evaluates
+ # to True if locator is being used).
+ if not bool(job_servers) ^ bool(query_servers):
res = AviaryOperations(name, datadir, locator,
job_servers, query_servers,
key, cert, root_cert, domain_verify)
elif job_servers:
- res = AviaryJobOperations(name, datadir, locator,job_servers,
+ res = AviaryJobOperations(name, datadir, locator, job_servers,
key, cert, root_cert, domain_verify)
elif query_servers:
- res = AviaryQueryOperations(name, datadir, locator,query_servers,
+ res = AviaryQueryOperations(name, datadir, locator, query_servers,
key, cert, root_cert, domain_verify)
return res
Modified: branches/tmckay/sage/python/sage/util.py
===================================================================
--- branches/tmckay/sage/python/sage/util.py 2012-09-05 20:00:19 UTC (rev 5457)
+++ branches/tmckay/sage/python/sage/util.py 2012-09-06 17:21:28 UTC (rev 5458)
@@ -3,6 +3,7 @@
import re
import copy
import string
+import os
class MethodResult(object):
'''
@@ -308,3 +309,20 @@
else:
mechs = "ANONYMOUS"
return mechs
+
+def get_datadir(datadir, subdir):
+ if not type(datadir) in (tuple, list):
+ datadir = [datadir]
+
+ # Find the first element in datadir that is a valid
+ # path. If the path has a subdirectory called
+ # "subdir", consider that part of the path.
+ for d in datadir:
+ if os.path.isdir(d):
+ s = os.path.join(d, subdir)
+ if os.path.isdir(s):
+ return s
+ return d
+ # Hmm, well, just return the first one since we're
+ # going to get an error anyway.
+ return datadir[0]
11 years, 8 months
r5457 - in branches/tmckay/sage/python/sage: . aviary
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-09-05 20:00:19 +0000 (Wed, 05 Sep 2012)
New Revision: 5457
Modified:
branches/tmckay/sage/python/sage/aviary/aviaryoperations.py
branches/tmckay/sage/python/sage/aviary/clients.py
branches/tmckay/sage/python/sage/util.py
Log:
Add interfaces for Aviary getSubmissionID method
Add assert check on type of object returned to ObjectPool
Add a fake Scheduler object for using sage outside of Cumin
Modified: branches/tmckay/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- branches/tmckay/sage/python/sage/aviary/aviaryoperations.py 2012-09-05 18:55:12 UTC (rev 5456)
+++ branches/tmckay/sage/python/sage/aviary/aviaryoperations.py 2012-09-05 20:00:19 UTC (rev 5457)
@@ -11,7 +11,7 @@
from datetime import datetime
from threading import Lock
from suds import *
-from sage.util import CallSync, CallThread, ObjectPool, host_list
+from sage.util import CallSync, CallThread, host_list, parse_URL
from aviarylocator import AviaryLocator
from clients import ClientPool, TransportFactory
@@ -210,6 +210,24 @@
"settings in cumin.conf" % (self.nice, machine))
return scheme, host
+class MockScheduler(object):
+ '''
+ Most methods in the _AviaryXXX objects were written to be compatible
+ with QMF methods which take a scheduler object.
+
+ This is a mock scheduler object containing the fields
+ referenced by the methods: a scheduler name, a pool identifier,
+ and a machine hostname/ip.
+
+ This object may be used when a scheduler object from Rosemary
+ is not available (ie, sage is being used outside of Cumin). The
+ only required value is machine
+ '''
+ def __init__(self, machine, name = None, pool = None):
+ self.Name = name
+ self.Pool = pool
+ self.Machine = machine
+
class _AviaryJobMethods(object):
# Do this here rather than __init__ so we don't have to worry about
@@ -631,7 +649,63 @@
query_client, "getSubmissionSummary", subId)
t.start()
+ def _get_submission_ids(self, host, page_size, mode, host_is_endpoint,
+ name=None, qdate=None):
+ def my_process_results(result):
+ # Fix up the exception message if necessary
+ result = self._pretty_result(result, host)
+ if isinstance(result, Exception):
+ status = result
+ data = None
+ else:
+ status = 0
+ if not hasattr(result, "ids"):
+ result.ids = []
+ data = result
+ return (status, data)
+
+ # Note, only one of name or qdate is expected to be
+ # set here.
+ query_client = self.query_client_pool.get_object()
+ subId = query_client.factory.create("ns0:SubmissionID")
+ subId.name = name
+ subId.qdate = qdate
+
+ if host_is_endpoint:
+ servers = None
+ else:
+ servers = self.query_servers
+ self._setup_client(query_client,
+ servers, # server lookup object
+ host, # host we want
+ "getSubmissionID")
+
+ res = self._call_sync(my_process_results,
+ self.call_client_retry,
+ query_client,
+ "getSubmissionID",
+ page_size, mode, subId)
+
+ self.query_client_pool.return_object(query_client)
+ return res;
+
+ def get_submission_ids_by_name(self, host, page_size, name,
+ host_is_endpoint = False):
+
+ return self._get_submission_ids(host, page_size, None,
+ host_is_endpoint, name=name)
+
+ def get_submission_ids_by_qdate(self, host, page_size, qdate, mode,
+ host_is_endpoint = False):
+
+ if long(qdate) > sys.maxint:
+ raise Exception("Qdate is larger than max int")
+ if not mode in ("BEFORE", "AFTER"):
+ raise Exception("Mode must be 'BEFORE' or 'AFTER'")
+ return self._get_submission_ids(host, page_size, mode,
+ host_is_endpoint, qdate=qdate)
+
class _AviaryCommon(object):
def __init__(self, name, locator,
key="", cert="", root_cert="", domain_verify=True):
@@ -675,8 +749,17 @@
return {"INTEGER": int, "FLOAT": float, "STRING": str, "BOOLEAN": bool}
def _set_client_info(self, client, refresh=False):
- scheme, host = client.server_list.find_server(client.service_name,
- refresh)
+ # If there is no server_list, then client.server_name is expected to
+ # be an endpoint
+ if client.server_list:
+ scheme, host = client.server_list.find_server(client.server_name,
+ refresh)
+ else:
+ # We need the scheme value separated out for the transport
+ scheme = parse_URL(client.server_name).scheme
+ if scheme is None:
+ scheme = "http"
+ host = client.server_name
# Have to set the URL for the method. This might go away someday...
client.set_options(location=host+client.method_name)
@@ -691,7 +774,7 @@
# Look up the host and construct the URL.
# Store information in the client so that retry is possible.
client.server_list = server_list
- client.service_name = name
+ client.server_name = name
client.method_name = meth_name
# This is initial setup before a call so we want to try a
@@ -732,7 +815,7 @@
return result
def call_client_retry(self, client, meth_name, *meth_args, **meth_kwargs):
- # If we fail with a urllib2.URLError (or anything similar) then try
+ # If we fail with a urllib2.URLError (or anything similar) then
# attempt to get a new endpoint and try again.
meth = getattr(client.service, meth_name)
try:
@@ -742,7 +825,7 @@
# (probably due to a restart on the condor side)
# Let's get new endpoints, reset the client,
# and try again.
- if client.server_list.should_retry:
+ if client.server_list and client.server_list.should_retry:
log.debug("AviaryOperations: received %s, retrying %s"\
% (str(e), client.options.location))
self._set_client_info(client, refresh=True)
Modified: branches/tmckay/sage/python/sage/aviary/clients.py
===================================================================
--- branches/tmckay/sage/python/sage/aviary/clients.py 2012-09-05 18:55:12 UTC (rev 5456)
+++ branches/tmckay/sage/python/sage/aviary/clients.py 2012-09-05 20:00:19 UTC (rev 5457)
@@ -122,7 +122,7 @@
class ClientPool(ObjectPool):
def __init__(self, wsdl, max_size):
- super(ClientPool, self).__init__(max_size)
+ super(ClientPool, self).__init__(max_size, OverrideClient)
self.wsdl = wsdl
def create_object(self):
Modified: branches/tmckay/sage/python/sage/util.py
===================================================================
--- branches/tmckay/sage/python/sage/util.py 2012-09-05 18:55:12 UTC (rev 5456)
+++ branches/tmckay/sage/python/sage/util.py 2012-09-05 20:00:19 UTC (rev 5457)
@@ -178,8 +178,9 @@
ObjectPool must be derived from, and create_object must be overridden.
If max_size is set to None, the pool size is unlimited.
'''
- def __init__(self, max_size):
+ def __init__(self, max_size, obj_type):
self.max_size = max_size
+ self.obj_type = obj_type
self.pool = list()
self.lock = Lock()
@@ -194,6 +195,7 @@
return obj
def return_object(self, obj):
+ assert isinstance(obj, self.obj_type)
self.lock.acquire()
if self.max_size is None or len(self.pool) < self.max_size:
self.pool.append(obj)
11 years, 8 months
r5456 - trunk/wooly/python/wooly
by croberts@fedoraproject.org
Author: croberts
Date: 2012-09-05 18:55:12 +0000 (Wed, 05 Sep 2012)
New Revision: 5456
Modified:
trunk/wooly/python/wooly/table.strings
Log:
Fixing BZ 851205 (table width extending beyond border on Scheduler page) by adding the css property table-layout:fixed (instead of table-layout:auto). This should keep the table inside the border even if the data is lengthy.
Modified: trunk/wooly/python/wooly/table.strings
===================================================================
--- trunk/wooly/python/wooly/table.strings 2012-09-04 18:15:20 UTC (rev 5455)
+++ trunk/wooly/python/wooly/table.strings 2012-09-05 18:55:12 UTC (rev 5456)
@@ -1,6 +1,6 @@
[Table.css]
table.Table {
- table-layout: auto;
+ table-layout: fixed;
width: 100%;
border-collapse: collapse;
}
11 years, 8 months
r5455 - in branches/tmckay/mint/python/mint: . plumage
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-09-04 18:15:20 +0000 (Tue, 04 Sep 2012)
New Revision: 5455
Modified:
branches/tmckay/mint/python/mint/main.py
branches/tmckay/mint/python/mint/plumage/main.py
branches/tmckay/mint/python/mint/plumage/session.py
branches/tmckay/mint/python/mint/plumage/update.py
branches/tmckay/mint/python/mint/session.py
branches/tmckay/mint/python/mint/update.py
Log:
Move the package filter and set up the list of classes to bind in the main
application. Make the class list canonical, even if only a package list
was initially specified. Reformat some lines for length in plumage.
Modified: branches/tmckay/mint/python/mint/main.py
===================================================================
--- branches/tmckay/mint/python/mint/main.py 2012-08-31 17:58:10 UTC (rev 5454)
+++ branches/tmckay/mint/python/mint/main.py 2012-09-04 18:15:20 UTC (rev 5455)
@@ -2,6 +2,7 @@
from expire import ExpireThread
from model import MintModel
from session import MintSession
+from aviary.session import MintAviarySession
from update import UpdateThread
from vacuum import VacuumThread
from cumin.admin import *
@@ -17,6 +18,8 @@
self.database = MintDatabase(self, database_dsn)
self.admin = CuminAdmin(self)
self.update_thread = UpdateThread(self)
+ self.session = None
+ self.aviary_session = None
self.expire_enabled = True
self.expire_thread = ExpireThread(self)
@@ -30,15 +33,19 @@
# mechanisms, according to the sasl documentation
self.sasl_mech_list = None
- # List of classes to bind. Referenced by session and update thread.
- self.qmf_classes = set()
+ # List of classes to bind in QMF. Referenced by session and update thread.
+ self._qmf_classes = set()
- # If binding was not done by class, this is the final list of
- # packages that were bound
- self.qmf_packages = set()
+ # List of classes to gather via Aviary. Derived from self._qmf_classes
+ # in combination with the list of classes supported by Aviary.
+ self._aviary_classes = set()
+ # If self._qmf_classes is not set, this is the list of packages
+ # to bind. This will be translated into self.qmf_classes.
+ self._qmf_packages = set()
+
# Things we know should not be bound.
- self.qmf_package_filter = ["com.redhat.cumin", "com.redhat.cumin.grid"]
+ self._qmf_package_filter = ["com.redhat.cumin", "com.redhat.cumin.grid"]
# Aviary interface.
self.aviary_query_servers = ""
@@ -53,8 +60,11 @@
self.aviary_prefer_condor = True
def set_classes(self, classes):
- self.qmf_classes = classes
+ self._qmf_classes = classes
+ def get_bound_classes(self):
+ return self._aviary_classes.union(self._qmf_classes)
+
def check(self):
log.info("Checking %s", self)
@@ -71,19 +81,39 @@
log.info("Expiration is %s", state(self.expire_enabled))
log.info("Vacuum is %s", state(self.vacuum_enabled))
+ # If self._qmf_classes has not been set, then build classes
+ # here from the package list. We want to be canonical, ie
+ # always have things expressed in terms of classes
+ if not self._qmf_classes:
+ if not self._qmf_packages:
+ self._qmf_packages = self.model._packages
+ for pkg in self._qmf_packages:
+ if pkg._name not in self._qmf_package_filter:
+ self._qmf_classes = self._qmf_classes.union(set(pkg._classes))
+ else:
+ # Apply the package filter to the specified class list
+ black_list = set()
+ for cls in self._qmf_classes:
+ if cls._package._name in self._qmf_package_filter:
+ black_list.add(cls)
+ self._qmf_classes.difference_update(black_list)
+
# If aviary_query_servers or aviary_locator is defined
- # here, then we are going to get submissions from Aviary
+ # here, then we are going to get some classes from Aviary
# instead of QMF. Initialize a MintAviary object and
- # eliminate submissions from the set of things bound by
+ # eliminate classes from the set of things bound by
# the QMF interface.
- # For now, bind by class must be in effect or this option
- # cannot be supported (but bind by class is normal).
- if (self.aviary_query_servers or self.aviary_locator) \
- and self.model.com_redhat_grid.Submission in self.qmf_classes:
- self.qmf_classes.remove(self.model.com_redhat_grid.Submission)
+ if self.aviary_query_servers or self.aviary_locator:
+ supported_classes = [self.model.com_redhat_grid.Submission]
+ for c in supported_classes:
+ if c in self._qmf_classes:
+ self._qmf_classes.remove(c)
+ self._aviary_classes.add(c)
+ if self._aviary_classes:
+ self.aviary_session = MintAviarySession(self,
+ self._aviary_classes)
- self.session = MintSession(self, self.broker_uris)
- self.session.init()
+ self.session = MintSession(self, self.broker_uris, self._qmf_classes)
self.database.init()
self.update_thread.init()
@@ -92,14 +122,12 @@
def start(self):
log.info("Starting %s", self)
-
- # Scan the qmf class/package binding list
- # and do any necessary preprocessing
- self.session.init_qmf_classes()
self.update_thread.start()
self.session.start()
+ if self.aviary_session:
+ self.aviary_session.start()
if self.expire_enabled:
self.expire_thread.start()
@@ -119,6 +147,8 @@
self.vacuum_thread.stop()
self.session.stop()
+ if self.aviary_session:
+ self.aviary_session.stop()
log.info("Session stopped")
def __repr__(self):
Modified: branches/tmckay/mint/python/mint/plumage/main.py
===================================================================
--- branches/tmckay/mint/python/mint/plumage/main.py 2012-08-31 17:58:10 UTC (rev 5454)
+++ branches/tmckay/mint/python/mint/plumage/main.py 2012-09-04 18:15:20 UTC (rev 5455)
@@ -35,15 +35,18 @@
self.print_event_level = 0
- self.packages = set()
+ self._packages = set()
- self.classes = set()
+ self._classes = set()
- self.package_filter = ["com.redhat.cumin"]
+ self._package_filter = ["com.redhat.cumin"]
def set_classes(self, classes):
self.classes = classes
+ def get_bound_classes(self):
+ return self._classes
+
def check(self):
log.info("Checking %s", self)
@@ -63,11 +66,26 @@
self.database.init()
self.update_thread.init()
- self.session.init()
- # The package and class lists will be
- # processed here
- self.session.init_classes()
+ # If self._qmf_classes has not been set, then build classes
+ # here from the package list. We want to be canonical, ie
+ # always have things expressed in terms of classes
+ if not self._classes:
+ if not self._packages:
+ self._packages = self.model._packages
+ for pkg in self._packages:
+ if pkg._name not in self._package_filter:
+ self._classes = self._classes.union(set(pkg._classes))
+ else:
+ # Apply the package filter to the specified class list
+ black_list = set()
+ for cls in self._classes:
+ if cls._package._name in self._package_filter:
+ black_list.add(cls)
+ self._classes.difference_update(black_list)
+
+ self.session.init(self._classes)
+
if self.expire_enabled:
self.expire_thread.init()
Modified: branches/tmckay/mint/python/mint/plumage/session.py
===================================================================
--- branches/tmckay/mint/python/mint/plumage/session.py 2012-08-31 17:58:10 UTC (rev 5454)
+++ branches/tmckay/mint/python/mint/plumage/session.py 2012-09-04 18:15:20 UTC (rev 5455)
@@ -56,45 +56,24 @@
def check(self):
log.info("Checking %s", self)
- def init(self):
+ def init(self, classes):
log.info("Initializing %s", self)
-
- def init_classes(self):
if not imports_ok:
return
# Apply the package filter to the class list
- if len(self.app.classes):
- black_list = set()
- for cls in self.app.classes:
- if cls._package._name in self.app.package_filter:
- black_list.add(cls)
- else:
- try:
- loaders = ClassLoaders()
- # Here we grab the name of the method to use (should be in rosemary.xml package/class/loading_class)
- func = getattr(loaders, cls.loading_class, None)
- func(self, cls)
- except Exception, e:
- log.error("No loading function for class %s (%s). Be sure that the method exists and is defined in rosemary.xml" % (cls._name, str(e)))
+ loaders = ClassLoaders()
+ for cls in classes:
+ try:
+ # Here we grab the name of the method to use
+ # (should be in rosemary.xml package/class/loading_class)
+ func = getattr(loaders, cls.loading_class, None)
+ func(self, cls)
+ except Exception, e:
+ log.error("No loading function for class %s (%s). "\
+ "Be sure that the method exists and is "\
+ "defined in rosemary.xml" % (cls._name, str(e)))
- # Update our list, minus the black_list
- self.app.classes.difference_update(black_list)
-
- else:
- # Generate the package list from the model, minus
- # the package filter
- for pkg in self.app.model._packages:
- if pkg._name not in self.app.package_filter:
- self.app.packages.add(pkg)
- loaders = ClassLoaders()
- for cls in pkg._classes:
- try:
- # Here we grab the name of the method to use (should be in rosemary.xml package/class/loading_class)
- func = getattr(loaders, cls.loading_class, None)
- func(self, cls)
- except Exception, e:
- log.error("No loading function for class %s (%s). Be sure that the method exists and is defined in rosemary.xml" % (cls._name, str(e)))
def start(self):
log.info("Starting %s", self)
if not imports_ok:
@@ -109,21 +88,42 @@
t.stop()
def __repr__(self):
- return "%s(%s:%s)" % (self.__class__.__name__, self.server_host, self.server_port)
+ return "%s(%s:%s)" % (self.__class__.__name__,
+ self.server_host, self.server_port)
class ClassLoaders(object):
- ''' method for loading the com.redhat.grid.plumage.OSUtil data from plumage, name is found in rosemary.xml under package/class/loading_class '''
+ ''' method for loading the com.redhat.grid.plumage.OSUtil data from plumage,
+ name is found in rosemary.xml under package/class/loading_class '''
def OSUtilLoader(self, obj, cls):
- obj.threads.append(CatchUpPlumageOSUtilSessionThread(obj.app, obj.server_host, obj.server_port, cls))
- obj.threads.append(PlumageOSUtilSessionThread(obj.app, obj.server_host, obj.server_port, cls))
- obj.threads.append(CurrentPlumageOSUtilSessionThread(obj.app, obj.server_host, obj.server_port, cls))
+ obj.threads.append(CatchUpPlumageOSUtilSessionThread(obj.app,
+ obj.server_host,
+ obj.server_port,
+ cls))
+ obj.threads.append(PlumageOSUtilSessionThread(obj.app,
+ obj.server_host,
+ obj.server_port,
+ cls))
+ obj.threads.append(CurrentPlumageOSUtilSessionThread(obj.app,
+ obj.server_host,
+ obj.server_port,
+ cls))
- ''' method for loading the com.redhat.grid.plumage.Accountant data from plumage, name is found in rosemary.xml under package/class/loading_class '''
+ ''' method for loading the com.redhat.grid.plumage.Accountant data from plumage,
+ name is found in rosemary.xml under package/class/loading_class '''
def AccountantLoader(self, obj, cls):
log.debug("AccountLoader called")
- obj.threads.append(CatchupPlumageAccountantSessionThread(obj.app, obj.server_host, obj.server_port, cls))
- obj.threads.append(PlumageAccountantSessionThread(obj.app, obj.server_host, obj.server_port, cls))
- obj.threads.append(CurrentPlumageAccountantSessionThread(obj.app, obj.server_host, obj.server_port, cls))
+ obj.threads.append(CatchupPlumageAccountantSessionThread(obj.app,
+ obj.server_host,
+ obj.server_port,
+ cls))
+ obj.threads.append(PlumageAccountantSessionThread(obj.app,
+ obj.server_host,
+ obj.server_port,
+ cls))
+ obj.threads.append(CurrentPlumageAccountantSessionThread(obj.app,
+ obj.server_host,
+ obj.server_port,
+ cls))
class PlumageSessionThread(MintDaemonThread):
def __init__(self, app, server_host, server_port, cls):
@@ -209,11 +209,12 @@
self._init()
# We create objects here. Tag them with the right class,
- # probably specified to us from a config option (with a corresponding
- # query specification in the xml)
+ # probably specified to us from a config option
+ # (with a corresponding query specification in the xml)
(oldest, newest) = self.app.update_thread.get_first_and_last_sample_timestamp(self.cls)
if oldest is None:
- # if we have no oldest record (first run), start at "10 min ago" and start loading everything
+ # if we have no oldest record (first run),
+ # start at "10 min ago" and start loading everything
oldest = datetime.now() - timedelta(seconds=600)
oldest = oldest + UTC_DIFF
@@ -293,9 +294,14 @@
obj = ObjectUpdate(self.app.model, record, self.cls)
self.app.update_thread.enqueue(obj)
- log.info("CatchupPlumageAccountantThread--catch-up: catch-up run completed for records newer than %s and older than %s" % (newest, datetime.now() - timedelta(seconds=300) + UTC_DIFF))
+ log.info("CatchupPlumageAccountantThread--catch-up: "\
+ "catch-up run completed for records newer than %s "\
+ "and older than %s" % \
+ (newest,
+ datetime.now() - timedelta(seconds=300) + UTC_DIFF))
else:
- log.info("CatchUpPlumageSessionThread: Skipping catch-up, no records present (probably first-run)")
+ log.info("CatchUpPlumageSessionThread: "\
+ "Skipping catch-up, no records present (probably first-run)")
except Exception, e:
log.info("%s got exception %s, exiting" % (self.__class__.__name__, str(e)))
@@ -435,8 +441,13 @@
obj = ObjectUpdate(self.app.model, record, self.cls)
self.app.update_thread.enqueue(obj)
- log.info("CatchUpPlumageSessionThread--catch-up: catch-up run completed for records newer than %s and older than %s" % (newest, datetime.now() - timedelta(seconds=300) + UTC_DIFF))
+ log.info("CatchUpPlumageSessionThread--catch-up: "\
+ "catch-up run completed for records newer than %s "\
+ "and older than %s" % \
+ (newest,
+ datetime.now() - timedelta(seconds=300) + UTC_DIFF))
else:
- log.info("CatchUpPlumageSessionThread: Skipping catch-up, no records present (probably first-run)")
+ log.info("CatchUpPlumageSessionThread: "\
+ "Skipping catch-up, no records present (probably first-run)")
except Exception, e:
log.info("%s got exception %s, exiting" % (self.__class__.__name__, str(e)))
Modified: branches/tmckay/mint/python/mint/plumage/update.py
===================================================================
--- branches/tmckay/mint/python/mint/plumage/update.py 2012-08-31 17:58:10 UTC (rev 5454)
+++ branches/tmckay/mint/python/mint/plumage/update.py 2012-09-04 18:15:20 UTC (rev 5455)
@@ -52,17 +52,10 @@
else:
log.debug("Skipping persistent class " + str(cls))
- if len(self.app.classes):
- log.debug("Delete all objects by bound classes " +\
- str(self.app.classes))
- for cls in self.app.classes:
- loop_body(cls)
- else:
- log.debug("Delete all objects by bound packages " +\
- str(self.app.packages))
- for pkg in self.app.packages:
- for cls in pkg._classes:
- loop_body(cls)
+ classes = self.app.get_bound_classes()
+ log.debug("Delete all objects by bound classes " + str(classes))
+ for cls in classes:
+ loop_body(cls)
def init(self):
self.conn = self.app.database.get_connection()
Modified: branches/tmckay/mint/python/mint/session.py
===================================================================
--- branches/tmckay/mint/python/mint/session.py 2012-08-31 17:58:10 UTC (rev 5454)
+++ branches/tmckay/mint/python/mint/session.py 2012-09-04 18:15:20 UTC (rev 5455)
@@ -7,10 +7,11 @@
log = logging.getLogger("mint.session")
class MintSession(object):
- def __init__(self, app, broker_uris):
+ def __init__(self, app, broker_uris, qmf_classes):
self.app = app
self.broker_uris = broker_uris
+ self.qmf_classes = qmf_classes
self.qmf_session = None
self.qmf_brokers = list()
@@ -24,24 +25,6 @@
qmf_broker = self.qmf_session.addBroker(uri, mechanisms=mechs)
self.qmf_brokers.append(qmf_broker)
- def init(self):
- log.info("Initializing %s", self)
-
- def init_qmf_classes(self):
- # Apply the package filter to the class list
- if len(self.app.qmf_classes):
- black_list = set()
- for cls in self.app.qmf_classes:
- if cls._package._name in self.app.qmf_package_filter:
- black_list.add(cls)
- self.app.qmf_classes.difference_update(black_list)
- else:
- # Generate the package list from the model, minus
- # the package filter
- for pkg in self.app.model._packages:
- if pkg._name not in self.app.qmf_package_filter:
- self.app.qmf_packages.add(pkg)
-
def start(self):
log.info("Starting %s", self)
@@ -57,18 +40,11 @@
self.qmf_session.bindAgent("*")
log.info("Binding all agents")
- # Handle bind by class
- if len(self.app.qmf_classes):
- for cls in self.app.qmf_classes:
- pname = cls._package._name
- cname = cls._name
- self.qmf_session.bindClass(pname.lower(), cname.lower())
- log.info("Binding QMF class %s.%s" % (pname, cname))
- else:
- # Handle bind by package
- for pkg in self.app.qmf_packages:
- self.qmf_session.bindPackage(pkg._name.lower())
- log.info("Binding QMF package %s" % pkg._name)
+ for cls in self.qmf_classes:
+ pname = cls._package._name
+ cname = cls._name
+ self.qmf_session.bindClass(pname.lower(), cname.lower())
+ log.info("Binding QMF class %s.%s" % (pname, cname))
for uri in self.broker_uris:
self.add_broker(uri)
Modified: branches/tmckay/mint/python/mint/update.py
===================================================================
--- branches/tmckay/mint/python/mint/update.py 2012-08-31 17:58:10 UTC (rev 5454)
+++ branches/tmckay/mint/python/mint/update.py 2012-09-04 18:15:20 UTC (rev 5455)
@@ -45,19 +45,11 @@
else:
log.debug("Skipping persistent class " + str(cls))
- if len(self.app.qmf_classes):
- log.debug("Delete all objects by bound classes " +\
- str(self.app.qmf_classes))
- for cls in self.app.qmf_classes:
- loop_body(cls)
- else:
- # We bound all classes. Loop over model
- log.debug("Delete all objects by bound packages " +\
- str(self.app.qmf_packages))
- for pkg in self.app.qmf_packages:
- for cls in pkg._classes:
- loop_body(cls)
-
+ classes = self.app.get_bound_classes()
+ log.debug("Delete all objects by bound classes " + str(classes))
+ for cls in classes:
+ loop_body(cls)
+
def init(self):
self.conn = self.app.database.get_connection()
@@ -65,8 +57,8 @@
self.cursor.stats = self.stats
def enqueue(self, update):
+ update.app = self.app
self.updates.put(update)
-
self.stats.enqueued += 1
def run(self):
@@ -216,14 +208,13 @@
class Update(object):
def __init__(self, model):
self.model = model
+ self.app = None
def process(self, thread):
log.debug("Processing %s", self)
try:
- self.do_process(thread.cursor, thread.stats,
- thread.app.qmf_classes, thread.app.qmf_packages)
-
+ self.do_process(thread.cursor, thread.stats)
thread.conn.commit()
except UpdateDropped:
log.debug("Update dropped")
@@ -243,7 +234,7 @@
if thread.halt_on_error:
raise
- def do_process(self, cursor, stats, bound_classes, bound_packages):
+ def do_process(self, cursor, stats):
raise Exception("Not implemented")
def __repr__(self):
@@ -255,7 +246,7 @@
self.qmf_object = qmf_object
- def do_process(self, cursor, stats, bound_classes, bound_packages):
+ def do_process(self, cursor, stats):
cls = self.get_class()
agent_id = self.get_agent_id()
object_id = self.get_object_id()
@@ -654,7 +645,7 @@
def get_agent_id(self):
return make_agent_id(self.qmf_agent)
- def do_process(self, cursor, stats, bound_classes, bound_packages):
+ def do_process(self, cursor, stats):
agent_id = self.get_agent_id()
try:
@@ -671,8 +662,7 @@
stats.agents_updated += 1
- def delete_agent_objects(self, cursor, stats, agent,
- bound_classes, bound_packages):
+ def delete_agent_objects(self, cursor, stats, agent):
def loop_body(cls):
if cls._storage != "none" and cls.check_persistent() == "session":
@@ -681,18 +671,10 @@
#stats.objects_deleted_by_class[cls] += count
cursor.connection.commit()
- if len(bound_classes):
- log.debug("Delete agent objects by bound classes " +\
- str(bound_classes))
- for cls in bound_classes:
- loop_body(cls)
- else:
- # We bound all classes. Loop over model
- log.debug("Delete agent objects by bound packages " +\
- str(bound_packages))
- for pkg in bound_packages:
- for cls in pkg._classes:
- loop_body(cls)
+ classes = self.app.get_bound_classes()
+ log.debug("Delete agent objects by bound classes " + str(classes))
+ for cls in classes:
+ loop_body(cls)
def __repr__(self):
name = self.__class__.__name__
@@ -701,7 +683,7 @@
return "%s(%s)" % (name, agent_id)
class AgentDelete(AgentUpdate):
- def do_process(self, cursor, stats, bound_classes, bound_packages):
+ def do_process(self, cursor, stats):
agent_id = self.get_agent_id()
try:
@@ -713,7 +695,7 @@
stats.agents_deleted += 1
- self.delete_agent_objects(cursor, stats, agent, bound_classes, bound_packages)
+ self.delete_agent_objects(cursor, stats, agent)
class UpdateDropped(Exception):
pass
11 years, 8 months