This patch adds a XML API for backwards file transfers from
slaves to the controller.
Some changes to the previous ctl -> slave transfering API was made
so both the interfaces are similar to each other.
Signed-off-by: Radek Pazdera <rpazdera(a)redhat.com>
---
lnst/Controller/NetTestController.py | 33 +++++++++---
lnst/Slave/NetTestSlave.py | 90 +++++++++++++++++++++++-----------
2 files changed, 86 insertions(+), 37 deletions(-)
diff --git a/lnst/Controller/NetTestController.py b/lnst/Controller/NetTestController.py
index 340cdea..469db04 100644
--- a/lnst/Controller/NetTestController.py
+++ b/lnst/Controller/NetTestController.py
@@ -20,7 +20,6 @@ import tempfile
from xmlrpclib import Binary
from pprint import pprint, pformat
from lnst.Common.Logs import Logs, log_exc_traceback
-from lnst.Common.SshUtils import scp_from_remote
from lnst.Common.XmlRpc import ServerProxy, ServerException
from lnst.Common.NetUtils import MacPool
from lnst.Common.VirtUtils import VirtNetCtl, VirtDomainCtl, BridgeCtl
@@ -490,7 +489,7 @@ class NetTestController:
self._rpc_call(machine_id, 'stop_packet_capture')
def _gather_capture_files(self):
- logging_root = Logs.get_logging_root_path()
+ logging_root = self._log_root_path
logging_root = os.path.abspath(logging_root)
logging.info("Retrieving capture files from slaves")
for machine_id in self._recipe["machines"]:
@@ -510,8 +509,7 @@ class NetTestController:
for remote_path in capture_files:
filename = os.path.basename(remote_path)
local_path = os.path.join(slave_logging_dir, filename)
- scp_from_remote(hostname, "22", "root", rootpass,
- remote_path, local_path)
+ self._copy_from_slave(machine_id, remote_path, local_path)
def _update_system_config(self, machine_id, res_data, persistent=False):
info = self._get_machineinfo(machine_id)
@@ -568,7 +566,7 @@ class NetTestController:
logger.handle(record)
def _copy_to_slave(self, local_path, machine_id, remote_path=None):
- self._rpc_call(machine_id, "start_copy", remote_path)
+ remote_path = self._rpc_call(machine_id, "start_copy_to", remote_path)
f = open(local_path, "r+b")
while True:
@@ -576,11 +574,28 @@ class NetTestController:
if len(data) == 0:
break
- self._rpc_call(machine_id, "copy_part", Binary(data))
+ self._rpc_call(machine_id, "copy_part_to",
+ remote_path, Binary(data))
- # return remote path
- rpath = self._rpc_call(machine_id, "finish_copy")
- return rpath
+ self._rpc_call(machine_id, "finish_copy_to", remote_path)
+ return remote_path
+
+ def _copy_from_slave(self, machine_id, remote_path, local_path):
+ status = self._rpc_call(machine_id, "start_copy_from", remote_path)
+ if not status:
+ raise NetTestError("The requested file cannot be transfered." \
+ "It file does not exist on machine %d" % machine_id)
+
+ local_file = open(local_path, "wb")
+
+ binary = "next"
+ while binary != "":
+ binary = self._rpc_call(machine_id, "copy_part_from",
+ remote_path, 1024*1024) # 1MB buffer
+ local_file.write(binary.data)
+
+ local_file.close()
+ self._rpc_call(machine_id, "finish_copy_from", remote_path)
def _load_test_modules(self, dirs):
modules = {}
diff --git a/lnst/Slave/NetTestSlave.py b/lnst/Slave/NetTestSlave.py
index 28cca22..4106f2d 100644
--- a/lnst/Slave/NetTestSlave.py
+++ b/lnst/Slave/NetTestSlave.py
@@ -14,6 +14,7 @@ jpirko(a)redhat.com (Jiri Pirko)
import signal
import select, logging
import os
+from xmlrpclib import Binary
from tempfile import NamedTemporaryFile
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
from lnst.Common.Logs import Logs, log_exc_traceback
@@ -40,7 +41,8 @@ class NetTestSlaveXMLRPC:
self._netconfig = NetConfig()
self._command_context = command_context
- self._copy_target = None
+ self._copy_targets = {}
+ self._copy_sources = {}
self._cache = ResourceCache(config.get_option("cache",
"dir"),
config.get_option("cache",
"expiration_period"))
@@ -50,11 +52,13 @@ class NetTestSlaveXMLRPC:
def hello(self):
self.clear_resource_table()
self._cache.del_old_entries()
+ self.reset_file_transfers()
return "hello"
def bye(self):
self.clear_resource_table()
self._cache.del_old_entries()
+ self.reset_file_transfers()
return "bye"
def get_new_logs(self):
@@ -153,33 +157,6 @@ class NetTestSlaveXMLRPC:
self._cache.del_old_entries()
return True
- def start_copy(self, filename=None):
- if self._copy_target:
- return False
-
- if filename:
- self._copy_target = open(filename, "w+b")
- else:
- self._copy_target = NamedTemporaryFile("w+b", delete=False)
-
- return True
-
- def copy_part(self, binary_data):
- if self._copy_target:
- self._copy_target.write(binary_data.data)
- return True
-
- return False
-
- def finish_copy(self):
- if self._copy_target:
- name = self._copy_target.name
- del self._copy_target
- self._copy_target = None
- return name
-
- return ""
-
def clear_resource_table(self):
self._resource_table = {}
return True
@@ -206,6 +183,63 @@ class NetTestSlaveXMLRPC:
self._cache.add_cache_entry(file_hash, local_path, name, res_type)
return True
+ def start_copy_to(self, filepath=None):
+ if filepath in self._copy_targets:
+ return ""
+
+ if filepath:
+ self._copy_targets[filepath] = open(filepath, "w+b")
+ else:
+ tmpfile = NamedTemporaryFile("w+b", delete=False)
+ filepath = tmpfile.name
+ self._copy_targets[filepath] = tmpfile
+
+ return filepath
+
+ def copy_part_to(self, filepath, binary_data):
+ if self._copy_targets[filepath]:
+ self._copy_targets[filepath].write(binary_data.data)
+ return True
+
+ return False
+
+ def finish_copy_to(self, filepath):
+ if self._copy_targets[filepath]:
+ self._copy_targets[filepath].close()
+
+ del self._copy_targets[filepath]
+ return True
+
+ return False
+
+ def start_copy_from(self, filepath):
+ if filepath in self._copy_sources or not os.path.exists(filepath):
+ return False
+
+ self._copy_sources[filepath] = open(filepath, "rb")
+ return True
+
+ def copy_part_from(self, filepath, buffsize):
+ data = Binary(self._copy_sources[filepath].read(buffsize))
+ return data
+
+ def finish_copy_from(self, filepath):
+ if filepath in self._copy_sources:
+ self._copy_sources[filepath].close()
+ del self._copy_sources[filepath]
+ return True
+
+ return False
+
+ def reset_file_transfers(self):
+ for file_handle in self._copy_targets.itervalues():
+ file_handle.close()
+ self._copy_targets = {}
+
+ for file_handle in self._copy_sources.itervalues():
+ file_handle.close()
+ self._copy_sources = {}
+
class MySimpleXMLRPCServer(Server):
def __init__(self, command_context, *args, **kwargs):
self._finished = False
--
1.7.7.6