Author: tmckay
Date: 2013-06-19 20:09:47 +0000 (Wed, 19 Jun 2013)
New Revision: 5757
Modified:
trunk/sage/python/sage/aviary/aviaryoperations.py
Log:
Add file backing for external name nodes and job trackers
Modified: trunk/sage/python/sage/aviary/aviaryoperations.py
===================================================================
--- trunk/sage/python/sage/aviary/aviaryoperations.py 2013-06-19 18:46:21 UTC (rev 5756)
+++ trunk/sage/python/sage/aviary/aviaryoperations.py 2013-06-19 20:09:47 UTC (rev 5757)
@@ -1032,6 +1032,19 @@
self._external_name_nodes = []
self._external_job_trackers = []
+ path = os.path.join(os.sep, "var", "lib", "cumin")
+ if os.path.isdir(path):
+ self._external_nn_path = os.path.join(path, "name_nodes")
+ self._external_jt_path = os.path.join(path, "job_trackers")
+ else:
+ hdef = os.path.expanduser("~")
+ path = os.environ.get("CUMIN_HOME", hdef)
+ self._external_nn_path = os.path.join(path, ".name_nodes")
+ self._external_jt_path = os.path.join(path, ".job_trackers")
+
+ log.debug("External name nodes path %s" % self._external_nn_path)
+ log.debug("External job trackers path %s" % self._external_jt_path)
+
def start_name_node(self, host, bin_file, owner, description, callback):
assert callable(callback)
@@ -1113,25 +1126,42 @@
self._hadoop_lock.acquire()
if not (ipc, url) in self._external_name_nodes:
self._external_name_nodes.append((ipc, url))
+ try:
+ self._write_external(self._external_nn_path,
+ self._external_name_nodes)
+ except Exception:
+ log.debug("Failed to write external name nodes")
self._hadoop_lock.release()
if callback:
callback("OK")
else:
return "OK"
- def get_external_name_nodes(self, callback=None):
+ def get_external_name_nodes(self, callback=None):
+ status = "OK"
+ res = []
self._hadoop_lock.acquire()
- res = copy.deepcopy(self._external_name_nodes)
+ try:
+ res = self._read_external(self._external_nn_path,
+ self._external_name_nodes)
+ except Exception:
+ status = "FAILED"
+ log.debug("Failed to read external name nodes")
self._hadoop_lock.release()
if callback:
- callback("OK", res)
+ callback(status, res)
else:
- return ("OK", res)
+ return (status, res)
def add_external_job_tracker(self, ipc, url, callback=None):
self._hadoop_lock.acquire()
if not (ipc, url) in self._external_job_trackers:
self._external_job_trackers.append((ipc, url))
+ try:
+ self._write_external(self._external_jt_path,
+ self._external_job_trackers)
+ except Exception:
+ log.debug("Failed to write external job trackers")
self._hadoop_lock.release()
if callback:
callback("OK")
@@ -1139,14 +1169,43 @@
return "OK"
def get_external_job_trackers(self, callback=None):
+ status = "OK"
+ res = []
self._hadoop_lock.acquire()
- res = copy.deepcopy(self._external_job_trackers)
+ try:
+ res = self._read_external(self._external_jt_path,
+ self._external_job_trackers)
+ except Exception:
+ status = "FAILED"
+ log.debug("Failed to read external job trackers")
self._hadoop_lock.release()
if callback:
- callback("OK", res)
+ callback(status, res)
else:
- return ("OK", res)
+ return (status, res)
+ def _read_external(self, path, vals):
+ res = []
+ file = open(path, "r")
+ lines = file.readlines()
+ for l in lines:
+ ipc, url = l.split()
+ res.append((ipc, url))
+ return res
+
+ def _write_external(self, path, vals):
+ p = os.path.split(path)[0]
+ if os.path.isdir(p):
+ try:
+ file = open(path, "w+")
+ for v in vals:
+ file.write(v[0] + " " + v[1] + "\n")
+ file.close()
+ return True
+ except Exception:
+ pass
+ return False
+
def _make_id(self, client, val):
def url(v):