Tomas Golembiovsky has uploaded a new change for review.
Change subject: v2v: Add PipelineProc, pipeline wrapper object
......................................................................
v2v: Add PipelineProc, pipeline wrapper object
We plan to pipe the output of virt-v2v to tee to store the log file. In
order to manage the pipelined processes, a new class is introduced.
Change-Id: I0c3741ae7ef9731a2cd9d587e86766b9e6e64f62
Signed-off-by: Tomáš Golembiovský <tgolembi(a)redhat.com>
---
M lib/vdsm/v2v.py
M tests/v2vTests.py
2 files changed, 132 insertions(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/94/62094/1
diff --git a/lib/vdsm/v2v.py b/lib/vdsm/v2v.py
index 860768e..e149ddc 100644
--- a/lib/vdsm/v2v.py
+++ b/lib/vdsm/v2v.py
@@ -34,6 +34,7 @@
import re
import signal
import tarfile
+import time
import threading
import xml.etree.ElementTree as ET
import zipfile
@@ -46,7 +47,8 @@
from vdsm.constants import P_VDSM_RUN, EXT_KVM_2_OVIRT
from vdsm.define import errCode, doneCode
from vdsm import cmdutils, concurrent, libvirtconnection, response
-from vdsm.utils import traceback, CommandPath, NICENESS, IOCLASS
+from vdsm.utils import monotonic_time, traceback, CommandPath, \
+ NICENESS, IOCLASS
try:
import ovirt_imageio_common
@@ -635,6 +637,78 @@
return ret
+class PipelineProc(object):
+    """
+    Present two piped processes as a single process-like object.
+
+    The wrapper offers kill(), pid, returncode, stdout and wait() on top of
+    a pair of already started processes. The output of the pipeline is the
+    stdout of the second process.
+    """
+
+    # Longest pause, in seconds, between two polls in wait().
+    _POLL_INTERVAL = 1
+
+    def __init__(self, proc1, proc2):
+        """
+        proc1 and proc2 are the process objects forming the pipeline, in
+        pipeline order. They have to provide pid, returncode, stdout,
+        poll(), wait() and kill().
+        """
+        self._proc = (proc1, proc2)
+        self._stdout = proc2.stdout
+
+    def kill(self):
+        """
+        Kill all processes in a pipeline. Unlike regular kill() we do not
+        raise OSError if the processes have already terminated.
+
+        Any other OSError is propagated to the caller unchanged.
+        """
+        for p in self._proc:
+            logging.debug("Killing pid=%d", p.pid)
+            try:
+                p.kill()
+            except OSError as e:
+                # ESRCH: the process has already terminated, nothing to do.
+                if e.errno != errno.ESRCH:
+                    # Bare raise keeps the original traceback.
+                    raise
+
+    @property
+    def pid(self):
+        """
+        List of the PIDs of all processes, in pipeline order.
+        """
+        return [p.pid for p in self._proc]
+
+    @property
+    def returncode(self):
+        """
+        Returns None if any of the processes is still running. Returns 0 if
+        all processes have finished with a zero exit code, otherwise return
+        first nonzero exit code.
+        """
+        ret = 0
+        for p in self._proc:
+            # Refresh p.returncode before looking at it.
+            p.poll()
+            if p.returncode is None:
+                return None
+            if p.returncode != 0 and ret == 0:
+                # One of the processes has failed
+                ret = p.returncode
+
+        # All processes have finished
+        return ret
+
+    @property
+    def stdout(self):
+        """
+        Output stream of the pipeline (stdout of the last process).
+        """
+        return self._stdout
+
+    def wait(self, timeout=None):
+        """
+        Wait for all processes of the pipeline to finish.
+
+        timeout is the maximal time to wait, in seconds, shared by all the
+        processes. With timeout=None the call blocks until every process
+        has terminated.
+
+        Returns True if all processes have finished, False if the timeout
+        expired while some process was still running.
+        """
+        if timeout is None:
+            for p in self._proc:
+                p.wait()
+            return True
+
+        deadline = monotonic_time() + timeout
+        for p in self._proc:
+            # NOTE: CPopen doesn't support timeout argument, so we poll.
+            while True:
+                # Poll first: a process that is already done costs no sleep.
+                p.poll()
+                if p.returncode is not None:
+                    break
+                remaining = deadline - monotonic_time()
+                if remaining <= 0:
+                    # Timed out
+                    return False
+                # Never sleep past the deadline.
+                time.sleep(min(self._POLL_INTERVAL, remaining))
+
+        # Every process was seen finished; report success even when the last
+        # poll happened right at the deadline.
+        return True
+
+
class ImportVm(object):
TERM_DELAY = 30
PROC_WAIT_TIMEOUT = 30
diff --git a/tests/v2vTests.py b/tests/v2vTests.py
index 6d3a8f4..a0ae3b7 100644
--- a/tests/v2vTests.py
+++ b/tests/v2vTests.py
@@ -579,6 +579,63 @@
self.assertEqual(out, msg)
+@expandPermutations
+class PipelineProcTests(TestCaseBase):
+    """
+    Tests of v2v.PipelineProc built from pairs of simple commands.
+    """
+
+    PROC_WAIT_TIMEOUT = 30
+
+    def _pipeline(self, first_cmd, second_cmd):
+        """
+        Start first_cmd piped into second_cmd and wrap both in PipelineProc.
+
+        Returns the tuple (first process, second process, pipeline).
+        """
+        head = v2v._simple_exec_cmd(first_cmd,
+                                    stdout=subprocess.PIPE)
+        tail = v2v._simple_exec_cmd(second_cmd,
+                                    stdin=head.stdout,
+                                    stdout=subprocess.PIPE)
+        return head, tail, v2v.PipelineProc(head, tail)
+
+    def testRun(self):
+        """
+        The pipeline exposes both PIDs and passes the data through.
+        """
+        msg = 'foo\nbar'
+        head, tail, pipeline = self._pipeline(['echo', '-n', msg], ['cat'])
+        self.assertEqual(pipeline.pid, [head.pid, tail.pid])
+
+        finished = pipeline.wait(self.PROC_WAIT_TIMEOUT)
+        self.assertEqual(finished, True)
+
+        self.assertEqual(pipeline.stdout.read(), msg)
+
+    @permutations([
+        # (cmd1, cmd2, returncode)
+        ['false', 'true', 1],
+        ['true', 'false', 1],
+        ['true', 'true', 0],
+    ])
+    def testReturncode(self, cmd1, cmd2, returncode):
+        """
+        returncode is non-zero as soon as one of the commands fails.
+        """
+        _, _, pipeline = self._pipeline([cmd1], [cmd2])
+        pipeline.wait(self.PROC_WAIT_TIMEOUT)
+        self.assertEqual(pipeline.returncode, returncode)
+
+    @permutations([
+        # (cmd1, cmd2, waitRet)
+        [['sleep', '1'], ['sleep', '1'], True],
+        [['sleep', '1'], ['sleep', '3'], False],
+        [['sleep', '3'], ['sleep', '1'], False],
+        [['sleep', '3'], ['sleep', '3'], False],
+    ])
+    def testWait(self, cmd1, cmd2, waitRet):
+        """
+        wait() with a 2 second timeout succeeds only if both commands end.
+        """
+        _, _, pipeline = self._pipeline(cmd1, cmd2)
+        finished = pipeline.wait(2)
+        # Kill whatever may still be running before checking the result.
+        pipeline.kill()
+        self.assertEqual(finished, waitRet)
+
+
class MockVirConnectTests(TestCaseBase):
def setUp(self):
self._vms = [MockVirDomain(*spec) for spec in VM_SPECS]
--
To view, visit
https://gerrit.ovirt.org/62094
To unsubscribe, visit
https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0c3741ae7ef9731a2cd9d587e86766b9e6e64f62
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Tomas Golembiovsky <tgolembi(a)redhat.com>