Author: tmckay
Date: 2012-12-11 21:16:54 +0000 (Tue, 11 Dec 2012)
New Revision: 5584
Modified:
branches/elephant/cumin/bin/cumin-web
branches/elephant/cumin/python/cumin/config.py
branches/elephant/cumin/python/cumin/main.py
branches/elephant/sage/python/sage/aviary/aviaryoperations.py
Log:
Add initial stub for Aviary hadoop operations to aviaryoperations.
Modified: branches/elephant/cumin/bin/cumin-web
===================================================================
--- branches/elephant/cumin/bin/cumin-web 2012-12-11 20:32:57 UTC (rev 5583)
+++ branches/elephant/cumin/bin/cumin-web 2012-12-11 21:16:54 UTC (rev 5584)
@@ -21,6 +21,7 @@
def set_aviary_configs(cumin, values):
cumin.aviary_job_servers = values.aviary_job_servers
cumin.aviary_query_servers = values.aviary_query_servers
+ cumin.aviary_hadoop_servers = values.aviary_hadoop_servers
cumin.aviary_locator = values.aviary_locator
cumin.aviary_key = values.aviary_key
cumin.aviary_cert = values.aviary_cert
Modified: branches/elephant/cumin/python/cumin/config.py
===================================================================
--- branches/elephant/cumin/python/cumin/config.py 2012-12-11 20:32:57 UTC (rev 5583)
+++ branches/elephant/cumin/python/cumin/config.py 2012-12-11 21:16:54 UTC (rev 5584)
@@ -196,6 +196,9 @@
param = ConfigParameter(self, "aviary-job-servers", str)
param.default = "http://localhost:9090"
+ param = ConfigParameter(self, "aviary-hadoop-servers", str)
+ param.default = "http://localhost:9090"
+
param = ConfigParameter(self, "aviary-query-servers", str)
param.default = "http://localhost:9091"
Modified: branches/elephant/cumin/python/cumin/main.py
===================================================================
--- branches/elephant/cumin/python/cumin/main.py 2012-12-11 20:32:57 UTC (rev 5583)
+++ branches/elephant/cumin/python/cumin/main.py 2012-12-11 21:16:54 UTC (rev 5584)
@@ -104,6 +104,7 @@
# Aviary operations for that server type will not be used.
self.aviary_job_servers = ""
self.aviary_query_servers = ""
+ self.aviary_hadoop_servers = ""
self.aviary_key = ""
self.aviary_cert = ""
self.aviary_root_cert = ""
@@ -202,45 +203,53 @@
# given op...
self.remote = Catalog()
ops = [QmfOperations("qmf", self.session)]
+ try:
+ from sage.aviary.aviaryoperations \
+ import SudsLogging, AviaryOperationsFactory
+ imports_ok = True
+ except:
+ imports_ok = False
- imports_ok = True
- if self.aviary_job_servers or self.aviary_query_servers:
- try:
- from sage.aviary.aviaryoperations import \
- SudsLogging, AviaryOperationsFactory
- except:
- imports_ok = False
- if imports_ok:
- SudsLogging.set(self.aviary_suds_logs, self.home)
+ # Turn off hadoop operations config if the module isn't loaded
+ hadoop_loaded = "gridhadoop" in self.authorizator.get_enabled_modules()
+ if not hadoop_loaded:
+ self.aviary_hadoop_servers = ""
- # By default Cumin uses /var/lib/condor/aviary/services for
- # aviary wsdl files if it exists.
- aviary_dir = ["/var/lib/condor/aviary/services",
- os.path.join(self.home, "rpc-defs/aviary")]
- if not self.aviary_prefer_condor:
- aviary_dir = [aviary_dir[1], aviary_dir[0]]
+ if imports_ok:
+ SudsLogging.set(self.aviary_suds_logs, self.home)
- # The factory will choose an impl that gives us jobs, queries,
- # or both depending on whether job_servers and query_servers
- # are empty strings. If locator is non empty, their actual
- # values will be overridden but the presence of a value will
- # still control enable/disable.
- aviary_itf = AviaryOperationsFactory("aviary", aviary_dir,
- self.aviary_locator,
- self.aviary_job_servers,
- self.aviary_query_servers,
- key=self.aviary_key,
- cert=self.aviary_cert,
- root_cert=self.aviary_root_cert,
- domain_verify=self.aviary_domain_verify)
+ # By default Cumin uses /var/lib/condor/aviary/services for
+ # aviary wsdl files if it exists.
+ aviary_dir = ["/var/lib/condor/aviary/services",
+ os.path.join(self.home, "rpc-defs/aviary")]
+ if not self.aviary_prefer_condor:
+ aviary_dir = [aviary_dir[1], aviary_dir[0]]
+
+ # The factory will choose an impls that give us services
+ # based on the values of the 'X_servers' arguments.
+ # Empty/non-empty for a 'X_servers' value controls enable/disable.
+ (aviary_itf,
+ hadoop_itf) = AviaryOperationsFactory("aviary", aviary_dir,
+ self.aviary_locator,
+ self.aviary_job_servers,
+ self.aviary_query_servers,
+ self.aviary_hadoop_servers,
+ key=self.aviary_key,
+ cert=self.aviary_cert,
+ root_cert=self.aviary_root_cert,
+ domain_verify=self.aviary_domain_verify)
+ if hadoop_itf:
+ ops.insert(0, hadoop_itf)
+ if aviary_itf:
ops.insert(0, aviary_itf)
- else:
- log.info("Imports failed for Aviary interface, disabling")
+ else:
+ log.info("Imports failed for Aviary interface, disabling")
log.info("%s Aviary locator interface" % \
((self.aviary_locator and \
(self.aviary_job_servers or \
- self.aviary_query_servers) and \
+ self.aviary_query_servers or
+ self.aviary_hadoop_servers) and \
imports_ok) and "Enabled" or "Disabled"))
log.info("%s Aviary interface for job submission and control." % \
@@ -249,6 +258,10 @@
log.info("%s Aviary interface for query operations." % \
((self.aviary_query_servers and imports_ok) and "Enabled" or "Disabled"))
+ if hadoop_loaded:
+ log.info("%s Aviary interface for hadoop operations." % \
+ ((self.aviary_hadoop_servers and imports_ok) and "Enabled" or "Disabled"))
+
self.remote.add_mechanisms(ops)
# Create RPC interface for Wallaby
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-11 20:32:57 UTC (rev 5583)
+++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-11 21:16:54 UTC (rev 5584)
@@ -632,7 +632,24 @@
query_client, "getSubmissionSummary", subId)
t.start()
+class _AviaryHadoopMethods(object):
+ # Do this here rather than __init__ so we don't have to worry about
+ # matching parameter lists in multiple inheritance cases with super
+ def init(self, datadir, hadoop_servers):
+
+ if self.locator:
+ self.hadoop_servers = ServerList(self.locator, "SCHEDULER", "HADOOP")
+ else:
+ self.hadoop_servers = FixedServerList(hadoop_servers,
+ "9090",
+ "/services/hadoop/",
+ "HADOOP")
+
+ hadoop_wsdl = "file:" + os.path.join(get_datadir(datadir, "hadoop"),
+ "aviary-hadoop.wsdl")
+ self.hadoop_client_pool = ClientPool(hadoop_wsdl, None)
+
class _AviaryCommon(object):
def __init__(self, name, locator,
key="", cert="", root_cert="",
domain_verify=True):
@@ -783,12 +800,25 @@
_AviaryQueryMethods.init(self, datadir, query_servers)
+class AviaryHadoopOperations(_AviaryCommon, _AviaryHadoopMethods):
+ def __init__(self, name, datadir, locator, hadoop_servers,
+ key="", cert="", root_cert="", domain_verify=True):
+
+ super(AviaryHadoopOperations, self).__init__(name, locator,
+ key, cert, root_cert,
+ domain_verify)
+
+ _AviaryHadoopMethods.init(self, datadir, hadoop_servers)
+
def AviaryOperationsFactory(name, datadir, locator_uri,
- job_servers, query_servers,
+ job_servers, query_servers, hadoop_servers,
key="", cert="", root_cert="",
domain_verify=True):
# If locator uri has not been specified, it's disabled and we will
- # use the specified job_servers and query_servers values
+ # use the specified job_servers and query_servers values.
+ # These operations are historically selectable because originally
+ # they were an alternative implementation to QMF methods.
+ # At some point in the future this may change.
if locator_uri:
locator = AviaryLocator(datadir, locator_uri,
key, cert, root_cert, domain_verify)
@@ -796,14 +826,23 @@
locator = None
if job_servers and query_servers:
- res = AviaryOperations(name, datadir, locator,
+ ops = AviaryOperations(name, datadir, locator,
job_servers, query_servers,
key, cert, root_cert, domain_verify)
elif job_servers:
- res = AviaryJobOperations(name, datadir, locator,job_servers,
+ ops = AviaryJobOperations(name, datadir, locator,job_servers,
key, cert, root_cert, domain_verify)
elif query_servers:
- res = AviaryQueryOperations(name, datadir, locator,query_servers,
+ ops = AviaryQueryOperations(name, datadir, locator,query_servers,
key, cert, root_cert, domain_verify)
- return res
+ else:
+ ops = None
+ if hadoop_servers:
+ hadoop = AviaryHadoopOperations(name, datadir, locator, hadoop_servers,
+ key, cert, root_cert, domain_verify)
+ else:
+ hadoop = None
+
+ return ops, hadoop
+