r5594 - branches/elephant/sage/python/sage/aviary
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-12-14 14:58:59 +0000 (Fri, 14 Dec 2012)
New Revision: 5594
Modified:
branches/elephant/sage/python/sage/aviary/aviaryoperations.py
Log:
Make start_name_node async only.
Add stop_name_node (also async).
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-14 14:16:55 UTC (rev 5593)
+++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-14 14:58:59 UTC (rev 5594)
@@ -474,8 +474,8 @@
subtype)
def set_job_attribute(self, scheduler, job_id, name, value, callback, submission):
- assert callback
-
+ assert callable(callback)
+
def my_callback(result):
self.client_pool.return_object(job_client)
result = self._pretty_result(result, scheduler.Machine)
@@ -507,7 +507,7 @@
t.start()
def submit_job(self, scheduler, ad, callback):
- assert callback
+ assert callable(callback)
type_to_aviary = {int: "INTEGER",
float: "FLOAT",
@@ -800,7 +800,7 @@
return res;
def get_job_summaries(self, submission, callback, machine_name):
- assert callback
+ assert callable(callback)
def to_int_seconds(dt):
# Change a datetime.datetime into int seconds since epoch
@@ -904,7 +904,8 @@
# Equivalence? We don't want no stinking QMF structural equivalence!
self.use_MethodResult_for_sync_calls = False
- def start_name_node(self, host, bin_file, owner, callback=None):
+ def start_name_node(self, host, bin_file, owner, callback):
+ assert callable(callback)
client = self.client_pool.get_object()
self._setup_client(client,
@@ -923,23 +924,57 @@
data = result.ref
return (status, data)
- if callback:
- def my_callback(result):
- self.client_pool.return_object(client)
- callback(*result_tuple(result, host))
+ def my_callback(result):
+ self.client_pool.return_object(client)
+ callback(*result_tuple(result, host))
- t = CallThread(self.call_client_retry, my_callback,
- client, "startNameNode", bin_file, owner)
- t.start()
- else:
- def my_process_results(result):
- return result_tuple(result, host)
+ t = CallThread(self.call_client_retry, my_callback,
+ client, "startNameNode", bin_file, owner)
+ t.start()
- res = self._call_sync(self.call_client_retry, my_process_results,
- client, "startNameNode", bin_file, owner)
+ def stop_name_node(self, host, ids, callback):
+ assert callable(callback)
+
+ if type(ids) not in (list, tuple):
+ ids = (ids)
+
+ client = self.client_pool.get_object()
+ self._setup_client(client,
+ self.servers,
+ host,
+ "stopNameNode")
+
+ refs = []
+ for id in ids:
+ new_ref = client.factory.create("ns1:HadoopID")
+ try:
+ if type(id) in (str,unicode):
+ new_ref.ipc = id
+ else:
+ new_ref.id = int(id)
+ except Exception:
+ raise Exception("Invalid HadoopID value")
+ refs.append(new_ref)
+
+ def result_tuple(result, host):
+ data = None
+ result = self._pretty_result(result, host)
+ if isinstance(result, Exception):
+ status = result
+ else:
+ status = _AviaryCommon._get_status(result.status)
+ if status == "OK" and hasattr(result, "results"):
+ data = result.results
+ return (status, data)
+
+ def my_callback(result):
self.client_pool.return_object(client)
- return res;
+ callback(*result_tuple(result, host))
+ t = CallThread(self.call_client_retry, my_callback,
+ client, "stopNameNode", refs)
+ t.start()
+
class AviaryOperations(Catalog):
def __init__(self, name, datadir,
locator_uri="",
11 years, 4 months
r5593 - branches/elephant/sage/python/sage/aviary
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-12-14 14:16:55 +0000 (Fri, 14 Dec 2012)
New Revision: 5593
Modified:
branches/elephant/sage/python/sage/aviary/aviaryoperations.py
Log:
Allow sync calls for hadoop to simply return (status, data)
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-14 13:16:22 UTC (rev 5592)
+++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-14 14:16:55 UTC (rev 5593)
@@ -328,6 +328,13 @@
self.subtype = subtype
self.client_pool = ClientPool(wsdl, None)
+ # Things in Aviary that were born as swap-ins/analogs for
+ # existing QMF methods need to pass synchronous results back
+ # in a MethodResult object because that's what code expected
+ # from QMF. However, newer stuff (like hadoop) doesn't need
+ # to wrap results this way. Let it be controlled.
+ self.use_MethodResult_for_sync_calls = True
+
def get_hosts(self, resource, subtype):
if resource == self.resource and \
(not subtype or self.subtype == subtype):
@@ -435,8 +442,11 @@
result = meth(*meth_args, **meth_kwargs)
except Exception, e:
result = e
- sync.get_completion()(*process_results(result))
- return MethodResult(sync)
+ if self.use_MethodResult_for_sync_calls:
+ sync.get_completion()(*process_results(result))
+ return MethodResult(sync)
+ else:
+ return process_results(result)
class _AviaryJobMethods(_AviaryCommon):
def __init__(self, locator, transports, datadir, job_servers):
@@ -891,6 +901,9 @@
resource,
subtype)
+ # Equivalence? We don't want no stinking QMF structural equivalence!
+ self.use_MethodResult_for_sync_calls = False
+
def start_name_node(self, host, bin_file, owner, callback=None):
client = self.client_pool.get_object()
11 years, 4 months
r5592 - trunk/cumin/resources
by croberts@fedoraproject.org
Author: croberts
Date: 2012-12-14 13:16:22 +0000 (Fri, 14 Dec 2012)
New Revision: 5592
Modified:
trunk/cumin/resources/app.js
Log:
Per BZ886590, disabling the seriesToggle functionality.
Modified: trunk/cumin/resources/app.js
===================================================================
--- trunk/cumin/resources/app.js 2012-12-13 22:20:57 UTC (rev 5591)
+++ trunk/cumin/resources/app.js 2012-12-14 13:16:22 UTC (rev 5592)
@@ -629,7 +629,7 @@
placement: "outside",
rendererOptions:{
numberColumns:6,
- seriesToggle:"fast",
+ seriesToggle:false,
disableIEFading:true
}
},
11 years, 4 months
r5591 - branches/elephant/sage/python/sage/aviary
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-12-13 22:20:57 +0000 (Thu, 13 Dec 2012)
New Revision: 5591
Modified:
branches/elephant/sage/python/sage/aviary/aviaryoperations.py
Log:
More cleanup after refactor
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-13 21:54:54 UTC (rev 5590)
+++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-13 22:20:57 UTC (rev 5591)
@@ -27,7 +27,10 @@
the Suds module. It is meant as a debugging aid during development.
'''
_on = False
- sudslogs = {"suds.client": None, "suds.transport": None, "suds.xsd.schema": None, "suds.wsdl": None}
+ sudslogs = {"suds.client": None,
+ "suds.transport": None,
+ "suds.xsd.schema": None,
+ "suds.wsdl": None}
@classmethod
def set(cls, flag, home):
@@ -69,62 +72,67 @@
#the clients that we pool so that we can call set_options on the
#transport.
-def _get_host(name, servers):
- '''
- Lookup a host in a dictionary produced by sage.util.host_list.
- The host may be just a hostname or hostname:port.
- Return the scheme and URL for the host. The scheme is separated
- out for convenience (although it could easily be parsed from the
- URL)
- '''
- scheme = ""
- host = ""
- port = None
-
- # Support hostnames with ports, too.
- name = socket.getfqdn(name)
- if ":" in name:
- u = parse_URL(name)
- name = u.host
- port = u.port
-
- if name in servers:
- url = None
- urls = servers[name]
- if port is not None:
- # find the matching port
- for u in urls:
- if u.port == port:
- url = u
- break
-
- elif len(urls) > 0:
- url = random.sample(urls, 1)[0]
-
- if url:
- scheme = url.scheme
- host = str(url)
- # A particular method name is going to be appended to path,
- # so ensure the final "/" here.
- if not host.endswith("/"):
- host += "/"
- return scheme, host
-
-# Nice, friendly strings for error messages on lookup
-_nice = {"JOB": "job service",
- "QUERY_SERVER": "query service",
- "HADOOP": "hadoop service"}
-
class BaseServerList(object):
- def __init__(self):
+ # Nice, friendly strings for error messages on lookup
+ _nice = {"JOB": "job service",
+ "QUERY_SERVER": "query service",
+ "HADOOP": "hadoop service"}
+
+ def __init__(self, subtype):
self.servers = None
+ self.should_retry = False
+ try:
+ self.nice = BaseServerList._nice[subtype]
+ except:
+ self.nice = subtype
+
def _get_host_set(self):
hosts = set()
for k,v in self.servers.iteritems():
for url in v:
hosts.add(k+":"+url.port)
return hosts
+
+ def _get_host(self, name):
+ '''
+ Lookup a host in a dictionary produced by sage.util.host_list.
+ The host may be just a hostname or hostname:port.
+ Return the scheme and URL for the host. The scheme is separated
+ out for convenience (although it could be parsed from the URL)
+ '''
+ scheme = ""
+ host = ""
+ port = None
+
+ # Support hostnames with ports, too.
+ name = socket.getfqdn(name)
+ if ":" in name:
+ u = parse_URL(name)
+ name = u.host
+ port = u.port
+
+ if name in self.servers:
+ url = None
+ urls = self.servers[name]
+ if port is not None:
+ # find the matching port
+ for u in urls:
+ if u.port == port:
+ url = u
+ break
+
+ elif len(urls) > 0:
+ url = random.sample(urls, 1)[0]
+
+ if url:
+ scheme = url.scheme
+ host = str(url)
+ # A particular method name is going to be appended to path,
+ # so ensure the final "/" here.
+ if not host.endswith("/"):
+ host += "/"
+ return scheme, host
def get_hosts(self):
return self._get_host_set()
@@ -136,22 +144,17 @@
Allow the cached list to be refreshed on demand.
'''
def __init__(self, locator, resource, subtype):
+ super(ServerList, self).__init__(subtype)
# Since we have a dynamic server list, failed operations
# may be retried
self.should_retry = True
self._lock = Lock()
- self.servers = None
self.locator = locator
self.resource = resource
self.subtype = subtype
- try:
- self.nice = _nice[subtype]
- except:
- self.nice = subtype
-
def _refresh(self):
log.debug("AviaryOperations: refresh server list for %s %s" \
% (self.resource, self.subtype))
@@ -193,7 +196,7 @@
try:
if self.servers is None or refresh:
self._refresh()
- scheme, host = _get_host(machine, self.servers)
+ scheme, host = self._get_host(machine)
finally:
self._lock.release()
return scheme, host
@@ -251,9 +254,8 @@
messages. Predefind values are "JOB" and "QUERY_SERVER".
The scheme for a URL will be "http" if not specified.
'''
- # Fixed server list, there is no point to a retry on failed ops.
- self.should_retry = False
-
+ super(FixedServerList, self).__init__(subtype)
+
# Replace any occurrence of locahost with output of gethostname()
# before parsing to match Machine fields of QMF objects later on.
host = socket.gethostname()
@@ -264,18 +266,13 @@
default_port=port,
default_path=path)
- try:
- self.nice = _nice[subtype]
- except:
- self.nice = subtype
-
def find_server(self, machine, *args):
'''
Lookup a URL in the fixed server list by hostname or hostname:port.
Return the scheme and URL for the specified host if found or raise
an exception if the host could not be found.
'''
- scheme, host = _get_host(machine, self.servers)
+ scheme, host = self._get_host(machine)
if host == "":
log.info("AviaryOperations: failed to locate %s on %s" \
% (self.nice, machine))
@@ -335,7 +332,7 @@
if resource == self.resource and \
(not subtype or self.subtype == subtype):
return self.servers.get_hosts()
- return []
+ return set()
def _set_client_info(self, client, refresh=False):
# See if the server_name is a full URL. If so, we'll use it.
11 years, 4 months
r5590 - branches/elephant/cumin/python/cumin/gridhadoop
by croberts@fedoraproject.org
Author: croberts
Date: 2012-12-13 21:54:54 +0000 (Thu, 13 Dec 2012)
New Revision: 5590
Modified:
branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py
branches/elephant/cumin/python/cumin/gridhadoop/namenode.py
Log:
Initial attempt to wire-in get_hosts and start_name_node
Modified: branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py 2012-12-13 21:24:44 UTC (rev 5589)
+++ branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py 2012-12-13 21:54:54 UTC (rev 5590)
@@ -131,7 +131,7 @@
(session)
def do_get_items(self, session):
- hosts = ["grid2.lab.bos.redhat.com"]
+ hosts = self.app.remote.get_hosts("SCHEDULER", "HADOOP")
return hosts
def render_item_value(self, session, item):
Modified: branches/elephant/cumin/python/cumin/gridhadoop/namenode.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/namenode.py 2012-12-13 21:24:44 UTC (rev 5589)
+++ branches/elephant/cumin/python/cumin/gridhadoop/namenode.py 2012-12-13 21:54:54 UTC (rev 5590)
@@ -68,16 +68,13 @@
if result == False:
self.invoc.status = self.invoc.FAILED
self.invoc.end()
-
- def fake_call(self, binfile, owner, hadoophost):
- return True
-
+
def do_invoke(self, session, object, invoc, args):
self.invoc = invoc
(binfile, owner, hadoophost) = args
try:
- call_async(self.callback, self.fake_call, binfile, owner, hadoophost)
+ call_async(self.callback, self.app.remote.start_name_node, hadoophost, binfile, owner)
except Exception, e:
invoc.status = invoc.FAILED
log.debug("Creating name node failed", exc_info=True)
11 years, 4 months
r5588 - branches/elephant/cumin/python/cumin/gridhadoop
by croberts@fedoraproject.org
Author: croberts
Date: 2012-12-13 15:38:59 +0000 (Thu, 13 Dec 2012)
New Revision: 5588
Modified:
branches/elephant/cumin/python/cumin/gridhadoop/datanode.py
branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py
branches/elephant/cumin/python/cumin/gridhadoop/namenode.py
branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py
Log:
Get all 4 hadoop top level "objects" ready for trying out actual calls when available.
Modified: branches/elephant/cumin/python/cumin/gridhadoop/datanode.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/datanode.py 2012-12-13 14:51:02 UTC (rev 5587)
+++ branches/elephant/cumin/python/cumin/gridhadoop/datanode.py 2012-12-13 15:38:59 UTC (rev 5588)
@@ -9,7 +9,7 @@
from hadoop import *
-from sage.util import MethodResult
+from sage.util import *
class DataNodeSelector(ObjectSelector):
@@ -95,6 +95,26 @@
self.form = DataNodeCreateForm(app, self.name, self, cls)
self.invoc = None
+
+
+ def callback(self, result):
+ if result == False:
+ self.invoc.status = self.invoc.FAILED
+ self.invoc.end()
+
+ def fake_call(self, binfile, owner, hadoophost, count, name_node):
+ return True
+
+ def do_invoke(self, session, object, invoc, args):
+ self.invoc = invoc
+ (binfile, owner, hadoophost, count, name_node) = args
+
+ try:
+ call_async(self.callback, self.fake_call, binfile, owner, hadoophost, count, name_node)
+ except Exception, e:
+ invoc.status = invoc.FAILED
+ log.debug("Creating data node failed", exc_info=True)
+ invoc.end()
def get_title(self, session, x):
return "Create data nodes"
@@ -129,10 +149,12 @@
self.page.redirect.set(session, url)
if not self.errors.get(session):
+ binfile = self.binfile.get(session)
+ owner = self.owner.get(session)
+ hadoophost = self.hadoophost.get(session)
count = self.count.get(session)
name_node = self.nameNode.get(session)
-
- print "Here is where I'd do the work and start up %s data nodes bound to name node %s" % (count, name_node)
+ self.task.invoke(session, None, (binfile, owner, hadoophost, count, name_node))
self.task.exit_with_redirect(session, url)
def render_title(self, session):
Modified: branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py 2012-12-13 14:51:02 UTC (rev 5587)
+++ branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py 2012-12-13 15:38:59 UTC (rev 5588)
@@ -9,7 +9,7 @@
from hadoop import *
-from sage.util import MethodResult
+from sage.util import *
class JobTrackerSelector(ObjectSelector):
@@ -94,6 +94,25 @@
self.form = JobTrackerCreateForm(app, self.name, self, cls)
self.invoc = None
+
+ def callback(self, result):
+ if result == False:
+ self.invoc.status = self.invoc.FAILED
+ self.invoc.end()
+
+ def fake_call(self, binfile, owner, hadoophost):
+ return True
+
+ def do_invoke(self, session, object, invoc, args):
+ self.invoc = invoc
+ (binfile, owner, hadoophost) = args
+
+ try:
+ call_async(self.callback, self.fake_call, binfile, owner, hadoophost)
+ except Exception, e:
+ invoc.status = invoc.FAILED
+ log.debug("Creating job tracker failed", exc_info=True)
+ invoc.end()
def get_title(self, session, x):
return "Create a job tracker"
@@ -103,9 +122,6 @@
def __init__(self, app, name, task, cls):
super(JobTrackerCreateForm, self).__init__(app, name, task, cls)
- self.count = self.CountField(app, "count")
- self.add_field(self.count)
-
self.binfile = BinFileLoc(app, "binfile")
self.add_field(self.binfile)
@@ -125,14 +141,11 @@
self.page.redirect.set(session, url)
if not self.errors.get(session):
- count = self.count.get(session)
-
- print "Here is where I'd do the work and start up %s job trackers" % count
+ binfile = self.binfile.get(session)
+ owner = self.owner.get(session)
+ hadoophost = self.hadoophost.get(session)
+ self.task.invoke(session, None, (binfile, owner, hadoophost))
self.task.exit_with_redirect(session, url)
def render_title(self, session):
- return "Create a job tracker"
-
- class CountField(StringField):
- def render_title(self, session):
- return "Number of job trackers to start"
\ No newline at end of file
+ return "Create a job tracker"
\ No newline at end of file
Modified: branches/elephant/cumin/python/cumin/gridhadoop/namenode.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/namenode.py 2012-12-13 14:51:02 UTC (rev 5587)
+++ branches/elephant/cumin/python/cumin/gridhadoop/namenode.py 2012-12-13 15:38:59 UTC (rev 5588)
@@ -77,11 +77,8 @@
(binfile, owner, hadoophost) = args
try:
- print "Here is where I'd do the work and start up %s name nodes" % count
call_async(self.callback, self.fake_call, binfile, owner, hadoophost)
- #call_async(self.callback, self.app.hadoop.create_name_node, name_nodes)
- pass
- except:
+ except Exception, e:
invoc.status = invoc.FAILED
log.debug("Creating name node failed", exc_info=True)
invoc.end()
Modified: branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py 2012-12-13 14:51:02 UTC (rev 5587)
+++ branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py 2012-12-13 15:38:59 UTC (rev 5588)
@@ -9,7 +9,7 @@
from hadoop import *
-from sage.util import MethodResult
+from sage.util import *
class TaskTrackerSelector(ObjectSelector):
@@ -94,6 +94,25 @@
self.form = TaskTrackerCreateForm(app, self.name, self, cls)
self.invoc = None
+
+ def callback(self, result):
+ if result == False:
+ self.invoc.status = self.invoc.FAILED
+ self.invoc.end()
+
+ def fake_call(self, binfile, owner, hadoophost, count, job_tracker):
+ return True
+
+ def do_invoke(self, session, object, invoc, args):
+ self.invoc = invoc
+ (binfile, owner, hadoophost, count, job_tracker) = args
+
+ try:
+ call_async(self.callback, self.fake_call, binfile, owner, hadoophost, count, job_tracker)
+ except Exception, e:
+ invoc.status = invoc.FAILED
+ log.debug("Creating task trackers failed", exc_info=True)
+ invoc.end()
def get_title(self, session, x):
return "Create task trackers"
@@ -128,10 +147,12 @@
self.page.redirect.set(session, url)
if not self.errors.get(session):
+ binfile = self.binfile.get(session)
+ owner = self.owner.get(session)
+ hadoophost = self.hadoophost.get(session)
count = self.count.get(session)
job_tracker = self.jobTracker.get(session)
-
- print "Here is where I'd do the work and start up %s task trackers bound to job tracker %s" % (count, job_tracker)
+ self.task.invoke(session, None, (binfile, owner, hadoophost, count, job_tracker))
self.task.exit_with_redirect(session, url)
def render_title(self, session):
11 years, 4 months
r5587 - branches/elephant/cumin/python/cumin/gridhadoop
by croberts@fedoraproject.org
Author: croberts
Date: 2012-12-13 14:51:02 +0000 (Thu, 13 Dec 2012)
New Revision: 5587
Modified:
branches/elephant/cumin/python/cumin/gridhadoop/datanode.py
branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py
branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py
branches/elephant/cumin/python/cumin/gridhadoop/namenode.py
branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py
Log:
More form adjustments, wiring to tasks.
Modified: branches/elephant/cumin/python/cumin/gridhadoop/datanode.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/datanode.py 2012-12-12 18:46:49 UTC (rev 5586)
+++ branches/elephant/cumin/python/cumin/gridhadoop/datanode.py 2012-12-13 14:51:02 UTC (rev 5587)
@@ -109,6 +109,15 @@
self.count = self.CountField(app, "count")
self.add_field(self.count)
+
+ self.binfile = BinFileLoc(app, "binfile")
+ self.add_field(self.binfile)
+
+ self.owner = Owner(app, "owner")
+ self.add_field(self.owner)
+
+ self.hadoophost = HadoopHostField(app, "hadoophost")
+ self.add_field(self.hadoophost)
def process_display(self, session):
self.scheduler.validate(session)
Modified: branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py 2012-12-12 18:46:49 UTC (rev 5586)
+++ branches/elephant/cumin/python/cumin/gridhadoop/hadoop.py 2012-12-13 14:51:02 UTC (rev 5587)
@@ -87,3 +87,67 @@
class HadoopNodeCreateForm(ObjectTaskForm):
def render_form_class(self, session):
return " ".join((super(HadoopNodeCreateForm, self).render_form_class(session), "mform"))
+
+
+class HadoopHostField(ScalarField):
+ def __init__(self, app, name):
+ super(HadoopHostField, self).__init__(app, name, None)
+
+ self.param = StringParameter(app, "param")
+ self.add_parameter(self.param)
+
+ self.input = self.HadoopHostOptions(app, "input", self.param)
+ self.add_child(self.input)
+
+ def get(self, session):
+ return self.input.get(session)
+
+ def validate(self, session):
+ super(HadoopHostField, self).validate(session)
+
+ name_nodes = self.input.get_items(session)
+
+ if not name_nodes:
+ error = FormError("There is no hadoop host to submit to")
+ self.form.errors.add(session, error)
+
+ def render_title(self, session):
+ return "Hadoop host"
+
+ def render_help(self, session):
+ return "Submit hadoop jobs to this host"
+
+ class HadoopHostOptions(OptionInputSet):
+ def do_process(self, session):
+ id = self.param.get(session)
+
+ if id is None:
+ items = self.get_items(session)
+
+ if items:
+ self.param.set(session, items[0])
+
+ super(HadoopHostField.HadoopHostOptions, self).do_process \
+ (session)
+
+ def do_get_items(self, session):
+ hosts = ["grid2.lab.bos.redhat.com"]
+ return hosts
+
+ def render_item_value(self, session, item):
+ return item
+
+ def render_item_content(self, session, item):
+ return xml_escape(item)
+
+ def render_item_selected_attr(self, session, item):
+ if item == self.param.get(session):
+ return "selected=\"selected\""
+
+class BinFileLoc(StringField):
+ def render_title(self, session):
+ return "Location of hadoop bin-tar file"
+
+class Owner(StringField):
+ def render_title(self, session):
+ return "Username of the owner"
\ No newline at end of file
Modified: branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py 2012-12-12 18:46:49 UTC (rev 5586)
+++ branches/elephant/cumin/python/cumin/gridhadoop/jobtracker.py 2012-12-13 14:51:02 UTC (rev 5587)
@@ -105,6 +105,15 @@
self.count = self.CountField(app, "count")
self.add_field(self.count)
+
+ self.binfile = BinFileLoc(app, "binfile")
+ self.add_field(self.binfile)
+
+ self.owner = Owner(app, "owner")
+ self.add_field(self.owner)
+
+ self.hadoophost = HadoopHostField(app, "hadoophost")
+ self.add_field(self.hadoophost)
def process_display(self, session):
self.scheduler.validate(session)
Modified: branches/elephant/cumin/python/cumin/gridhadoop/namenode.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/namenode.py 2012-12-12 18:46:49 UTC (rev 5586)
+++ branches/elephant/cumin/python/cumin/gridhadoop/namenode.py 2012-12-13 14:51:02 UTC (rev 5587)
@@ -8,7 +8,7 @@
from hadoop import *
-from sage.util import MethodResult
+from sage.util import *
class NameNodeSelector(ObjectSelector):
def __init__(self, app, name):
@@ -63,6 +63,28 @@
self.form = NameNodeCreateForm(app, self.name, self, cls)
self.invoc = None
+
+ def callback(self, result):
+ if result == False:
+ self.invoc.status = self.invoc.FAILED
+ self.invoc.end()
+
+ def fake_call(self, binfile, owner, hadoophost):
+ return True
+
+ def do_invoke(self, session, object, invoc, args):
+ self.invoc = invoc
+ (binfile, owner, hadoophost) = args
+
+ try:
+ print "Here is where I'd do the work and start up %s name nodes" % count
+ call_async(self.callback, self.fake_call, binfile, owner, hadoophost)
+ #call_async(self.callback, self.app.hadoop.create_name_node, name_nodes)
+ pass
+ except:
+ invoc.status = invoc.FAILED
+ log.debug("Creating name node failed", exc_info=True)
+ invoc.end()
def get_title(self, session, x):
return "Create a name node"
@@ -71,9 +93,15 @@
class NameNodeCreateForm(HadoopNodeCreateForm):
def __init__(self, app, name, task, cls):
super(NameNodeCreateForm, self).__init__(app, name, task, cls)
+
+ self.binfile = BinFileLoc(app, "binfile")
+ self.add_field(self.binfile)
- self.count = self.CountField(app, "count")
- self.add_field(self.count)
+ self.owner = Owner(app, "owner")
+ self.add_field(self.owner)
+
+ self.hadoophost = HadoopHostField(app, "hadoophost")
+ self.add_field(self.hadoophost)
def process_display(self, session):
self.scheduler.validate(session)
@@ -85,19 +113,15 @@
self.page.redirect.set(session, url)
if not self.errors.get(session):
- count = self.count.get(session)
-
- print "Here is where I'd do the work and start up %s name nodes" % count
+ binfile = self.binfile.get(session)
+ owner = self.owner.get(session)
+ hadoophost = self.hadoophost.get(session)
+ self.task.invoke(session, None, (binfile, owner, hadoophost))
self.task.exit_with_redirect(session, url)
def render_title(self, session):
return "Create a name node"
-
- class CountField(StringField):
- def render_title(self, session):
- return "Number of name nodes to start"
-
class NameNodeDelete(ObjectSelectorTask):
def __init__(self, app, selector, name):
super(NameNodeDelete, self).__init__(app, selector)
Modified: branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py
===================================================================
--- branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py 2012-12-12 18:46:49 UTC (rev 5586)
+++ branches/elephant/cumin/python/cumin/gridhadoop/tasktracker.py 2012-12-13 14:51:02 UTC (rev 5587)
@@ -107,7 +107,16 @@
self.add_field(self.jobTracker)
self.count = self.CountField(app, "count")
- self.add_field(self.count)
+ self.add_field(self.count)
+
+ self.binfile = BinFileLoc(app, "binfile")
+ self.add_field(self.binfile)
+
+ self.owner = Owner(app, "owner")
+ self.add_field(self.owner)
+
+ self.hadoophost = HadoopHostField(app, "hadoophost")
+ self.add_field(self.hadoophost)
def process_display(self, session):
self.scheduler.validate(session)
11 years, 4 months
r5586 - branches/elephant/wooly/python/wooly
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-12-12 18:46:49 +0000 (Wed, 12 Dec 2012)
New Revision: 5586
Modified:
branches/elephant/wooly/python/wooly/table.py
Log:
Merging in change 5571 from trunk
Modified: branches/elephant/wooly/python/wooly/table.py
===================================================================
--- branches/elephant/wooly/python/wooly/table.py 2012-12-11 21:18:34 UTC (rev 5585)
+++ branches/elephant/wooly/python/wooly/table.py 2012-12-12 18:46:49 UTC (rev 5586)
@@ -261,7 +261,7 @@
def render_content(self, session, record):
return self.parent.do_render_cell_content(session, record)
- def render_cell_title(self, session, record):
+ def render_title(self, session, record):
#gives us the title="" attribute for each table cell
return self.parent.do_render_cell_title(session, record)
@@ -304,10 +304,7 @@
class LinkColumnCell(TableColumnCell):
def render_href(self, session, record):
- return self.parent.render_cell_href(session, record)
-
- def render_title(self, session, record):
- return self.parent.render_cell_title(session, record)
+ return self.parent.render_cell_href(session, record)
class CheckboxColumnHeader(TableColumnHeader):
def render_name(self, session):
11 years, 4 months
r5585 - branches/elephant/cumin/model/access
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-12-11 21:18:34 +0000 (Tue, 11 Dec 2012)
New Revision: 5585
Modified:
branches/elephant/cumin/model/access/persona.xml
Log:
Change personas so that "grid" remains unchanged from previous and "gridoop" adds the hadoop tabs to the admin view.
Modified: branches/elephant/cumin/model/access/persona.xml
===================================================================
--- branches/elephant/cumin/model/access/persona.xml 2012-12-11 21:16:54 UTC (rev 5584)
+++ branches/elephant/cumin/model/access/persona.xml 2012-12-11 21:18:34 UTC (rev 5585)
@@ -29,7 +29,6 @@
<Module name="grid"/>
<Module name="inventory"/>
<Module name="usergrid"/>
- <Module name="gridhadoop"/>
<GroupAccess name="nogroup">
<MainPage name="login.html"/>
@@ -50,7 +49,9 @@
<Persona name="gridoop" auth="True">
<Module name="account"/>
<Module name="configuration"/>
+ <Module name="grid"/>
<Module name="inventory"/>
+ <Module name="usergrid"/>
<Module name="gridhadoop"/>
<GroupAccess name="nogroup">
@@ -67,7 +68,7 @@
<MainPage name="index.html"/>
<ModuleAccess name="*"/>
</GroupAccess>
- </Persona>
+ </Persona>
<Persona name="messaging" auth="False">
<Module name="account"/>
11 years, 4 months
r5584 - in branches/elephant: cumin/bin cumin/python/cumin sage/python/sage/aviary
by tmckay@fedoraproject.org
Author: tmckay
Date: 2012-12-11 21:16:54 +0000 (Tue, 11 Dec 2012)
New Revision: 5584
Modified:
branches/elephant/cumin/bin/cumin-web
branches/elephant/cumin/python/cumin/config.py
branches/elephant/cumin/python/cumin/main.py
branches/elephant/sage/python/sage/aviary/aviaryoperations.py
Log:
Add initial stub for Aviary hadoop operations to aviaryoperations.
Modified: branches/elephant/cumin/bin/cumin-web
===================================================================
--- branches/elephant/cumin/bin/cumin-web 2012-12-11 20:32:57 UTC (rev 5583)
+++ branches/elephant/cumin/bin/cumin-web 2012-12-11 21:16:54 UTC (rev 5584)
@@ -21,6 +21,7 @@
def set_aviary_configs(cumin, values):
cumin.aviary_job_servers = values.aviary_job_servers
cumin.aviary_query_servers = values.aviary_query_servers
+ cumin.aviary_hadoop_servers = values.aviary_hadoop_servers
cumin.aviary_locator = values.aviary_locator
cumin.aviary_key = values.aviary_key
cumin.aviary_cert = values.aviary_cert
Modified: branches/elephant/cumin/python/cumin/config.py
===================================================================
--- branches/elephant/cumin/python/cumin/config.py 2012-12-11 20:32:57 UTC (rev 5583)
+++ branches/elephant/cumin/python/cumin/config.py 2012-12-11 21:16:54 UTC (rev 5584)
@@ -196,6 +196,9 @@
param = ConfigParameter(self, "aviary-job-servers", str)
param.default = "http://localhost:9090"
+ param = ConfigParameter(self, "aviary-hadoop-servers", str)
+ param.default = "http://localhost:9090"
+
param = ConfigParameter(self, "aviary-query-servers", str)
param.default = "http://localhost:9091"
Modified: branches/elephant/cumin/python/cumin/main.py
===================================================================
--- branches/elephant/cumin/python/cumin/main.py 2012-12-11 20:32:57 UTC (rev 5583)
+++ branches/elephant/cumin/python/cumin/main.py 2012-12-11 21:16:54 UTC (rev 5584)
@@ -104,6 +104,7 @@
# Aviary operations for that server type will not be used.
self.aviary_job_servers = ""
self.aviary_query_servers = ""
+ self.aviary_hadoop_servers = ""
self.aviary_key = ""
self.aviary_cert = ""
self.aviary_root_cert = ""
@@ -202,45 +203,53 @@
# given op...
self.remote = Catalog()
ops = [QmfOperations("qmf", self.session)]
+ try:
+ from sage.aviary.aviaryoperations \
+ import SudsLogging, AviaryOperationsFactory
+ imports_ok = True
+ except:
+ imports_ok = False
- imports_ok = True
- if self.aviary_job_servers or self.aviary_query_servers:
- try:
- from sage.aviary.aviaryoperations import \
- SudsLogging, AviaryOperationsFactory
- except:
- imports_ok = False
- if imports_ok:
- SudsLogging.set(self.aviary_suds_logs, self.home)
+ # Turn off hadoop operations config if the module isn't loaded
+ hadoop_loaded = "gridhadoop" in self.authorizator.get_enabled_modules()
+ if not hadoop_loaded:
+ self.aviary_hadoop_servers = ""
- # By default Cumin uses /var/lib/condor/aviary/services for
- # aviary wsdl files if it exists.
- aviary_dir = ["/var/lib/condor/aviary/services",
- os.path.join(self.home, "rpc-defs/aviary")]
- if not self.aviary_prefer_condor:
- aviary_dir = [aviary_dir[1], aviary_dir[0]]
+ if imports_ok:
+ SudsLogging.set(self.aviary_suds_logs, self.home)
- # The factory will choose an impl that gives us jobs, queries,
- # or both depending on whether job_servers and query_servers
- # are empty strings. If locator is non empty, their actual
- # values will be overridden but the presence of a value will
- # still control enable/disable.
- aviary_itf = AviaryOperationsFactory("aviary", aviary_dir,
- self.aviary_locator,
- self.aviary_job_servers,
- self.aviary_query_servers,
- key=self.aviary_key,
- cert=self.aviary_cert,
- root_cert=self.aviary_root_cert,
- domain_verify=self.aviary_domain_verify)
+ # By default Cumin uses /var/lib/condor/aviary/services for
+ # aviary wsdl files if it exists.
+ aviary_dir = ["/var/lib/condor/aviary/services",
+ os.path.join(self.home, "rpc-defs/aviary")]
+ if not self.aviary_prefer_condor:
+ aviary_dir = [aviary_dir[1], aviary_dir[0]]
+
+ # The factory will choose impls that give us services
+ # based on the values of the 'X_servers' arguments.
+ # Empty/non-empty for a 'X_servers' value controls enable/disable.
+ (aviary_itf,
+ hadoop_itf) = AviaryOperationsFactory("aviary", aviary_dir,
+ self.aviary_locator,
+ self.aviary_job_servers,
+ self.aviary_query_servers,
+ self.aviary_hadoop_servers,
+ key=self.aviary_key,
+ cert=self.aviary_cert,
+ root_cert=self.aviary_root_cert,
+ domain_verify=self.aviary_domain_verify)
+ if hadoop_itf:
+ ops.insert(0, hadoop_itf)
+ if aviary_itf:
ops.insert(0, aviary_itf)
- else:
- log.info("Imports failed for Aviary interface, disabling")
+ else:
+ log.info("Imports failed for Aviary interface, disabling")
log.info("%s Aviary locator interface" % \
((self.aviary_locator and \
(self.aviary_job_servers or \
- self.aviary_query_servers) and \
+ self.aviary_query_servers or
+ self.aviary_hadoop_servers) and \
imports_ok) and "Enabled" or "Disabled"))
log.info("%s Aviary interface for job submission and control." % \
@@ -249,6 +258,10 @@
log.info("%s Aviary interface for query operations." % \
((self.aviary_query_servers and imports_ok) and "Enabled" or "Disabled"))
+ if hadoop_loaded:
+ log.info("%s Aviary interface for hadoop operations." % \
+ ((self.aviary_hadoop_servers and imports_ok) and "Enabled" or "Disabled"))
+
self.remote.add_mechanisms(ops)
# Create RPC interface for Wallaby
Modified: branches/elephant/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-11 20:32:57 UTC (rev 5583)
+++ branches/elephant/sage/python/sage/aviary/aviaryoperations.py 2012-12-11 21:16:54 UTC (rev 5584)
@@ -632,7 +632,24 @@
query_client, "getSubmissionSummary", subId)
t.start()
+class _AviaryHadoopMethods(object):
+ # Do this here rather than __init__ so we don't have to worry about
+ # matching parameter lists in multiple inheritance cases with super
+ def init(self, datadir, hadoop_servers):
+
+ if self.locator:
+ self.hadoop_servers = ServerList(self.locator, "SCHEDULER", "HADOOP")
+ else:
+ self.hadoop_servers = FixedServerList(hadoop_servers,
+ "9090",
+ "/services/hadoop/",
+ "HADOOP")
+
+ hadoop_wsdl = "file:" + os.path.join(get_datadir(datadir, "hadoop"),
+ "aviary-hadoop.wsdl")
+ self.hadoop_client_pool = ClientPool(hadoop_wsdl, None)
+
class _AviaryCommon(object):
def __init__(self, name, locator,
key="", cert="", root_cert="", domain_verify=True):
@@ -783,12 +800,25 @@
_AviaryQueryMethods.init(self, datadir, query_servers)
+class AviaryHadoopOperations(_AviaryCommon, _AviaryHadoopMethods):
+ def __init__(self, name, datadir, locator, hadoop_servers,
+ key="", cert="", root_cert="", domain_verify=True):
+
+ super(AviaryHadoopOperations, self).__init__(name, locator,
+ key, cert, root_cert,
+ domain_verify)
+
+ _AviaryHadoopMethods.init(self, datadir, hadoop_servers)
+
def AviaryOperationsFactory(name, datadir, locator_uri,
- job_servers, query_servers,
+ job_servers, query_servers, hadoop_servers,
key="", cert="", root_cert="", domain_verify=True):
# If locator uri has not been specified, it's disabled and we will
- # use the specified job_servers and query_servers values
+ # use the specified job_servers and query_servers values.
+ # These operations are historically selectable because originally
+ # they were an alternative implementation to QMF methods.
+ # At some point in the future this may change.
if locator_uri:
locator = AviaryLocator(datadir, locator_uri,
key, cert, root_cert, domain_verify)
@@ -796,14 +826,23 @@
locator = None
if job_servers and query_servers:
- res = AviaryOperations(name, datadir, locator,
+ ops = AviaryOperations(name, datadir, locator,
job_servers, query_servers,
key, cert, root_cert, domain_verify)
elif job_servers:
- res = AviaryJobOperations(name, datadir, locator,job_servers,
+ ops = AviaryJobOperations(name, datadir, locator,job_servers,
key, cert, root_cert, domain_verify)
elif query_servers:
- res = AviaryQueryOperations(name, datadir, locator,query_servers,
+ ops = AviaryQueryOperations(name, datadir, locator,query_servers,
key, cert, root_cert, domain_verify)
- return res
+ else:
+ ops = None
+ if hadoop_servers:
+ hadoop = AviaryHadoopOperations(name, datadir, locator, hadoop_servers,
+ key, cert, root_cert, domain_verify)
+ else:
+ hadoop = None
+
+ return ops, hadoop
+
11 years, 4 months