Nir Soffer has uploaded a new change for review.
Change subject: lib: Revert and refine error handling in tmap() ......................................................................
lib: Revert and refine error handling in tmap()
In commit 2b7155b696 (lib: Simplify and generalize concurrent.tmap()), we simplified error handling by returning a named tuple with function results. This turned out less useful then the original error handling.
This patch returns the previous error handling:
- Functions passed to tmap() should not raise - if they raise, this is considered a bug in the function. - The last error is raised by tmap() instead of returning the result. This make it easier to fail loudly for unexpected errors. - The original exception is re-raised now with the original traceback. - Error handling is documented properly now
Previously you had to make sure function raises to signal failures:
def func(): try: code that should not fail... code that may fail... code that should not fail... except ExpectedError: log.error(...) raise except Exception: log.exception(...) raise
results = concurrent.tmap(func, values)
if not all(r.succeeded for r in results): ...
Returning the result as is lets us have nicer code:
def func(): code that should not fail... try: code that may fail... except ExpectedError: log.error(...) return False code that should not fail... return True
succeeded = concurrent.tmap(func, values)
if not all(succeeded): ...
We can ignore unexpected errors, since tmap() will log them and fail loudly. We can also minimize try except block for expected errors.
Change-Id: I0154b28ff7822c63e77181bbbf444c712bd0c31e Signed-off-by: Nir Soffer nsoffer@redhat.com --- M lib/vdsm/concurrent.py M tests/concurrentTests.py 2 files changed, 45 insertions(+), 19 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/11/39211/1
diff --git a/lib/vdsm/concurrent.py b/lib/vdsm/concurrent.py index 64e072d..5498052 100644 --- a/lib/vdsm/concurrent.py +++ b/lib/vdsm/concurrent.py @@ -18,22 +18,42 @@ # Refer to the README and COPYING files for full details of the license #
+import logging import threading -from collections import namedtuple - - -Result = namedtuple("Result", ["succeeded", "value"]) +import sys
def tmap(func, iterable): + """ + Run func with arguments from iterable in multiple threads, a returning the + output in order of arguments. + + func should not raise exceptions - we consider this a bug in func, and will + fail the call and re-raise the exception in the caller thread. + + Expected exceptions should be handled in func. If the caller likes to + handle the error later, func should return it: + + def func(value): + try: + return something(value) + except ExpectedError as e: + return e + + Unexpected exceptions should not be handled, as they are logged in the + worker threads and re-raised in the caller thread. If multiple excpetions + raised, only the last one will be re-raised in the caller thread. + """ args = list(iterable) results = [None] * len(args) + error = [None]
def worker(i, f, arg): try: - results[i] = Result(True, f(arg)) - except Exception as e: - results[i] = Result(False, e) + results[i] = f(arg) + except Exception: + error[0] = sys.exc_info() + logging.exception("Unhandled exception in tmap worker thread")
threads = [] for i, arg in enumerate(args): @@ -45,4 +65,8 @@ for t in threads: t.join()
+ if error[0] is not None: + t, v, tb = error[0] + raise t, v, tb + return results diff --git a/tests/concurrentTests.py b/tests/concurrentTests.py index 307e397..5c0646b 100644 --- a/tests/concurrentTests.py +++ b/tests/concurrentTests.py @@ -26,13 +26,16 @@ from vdsm import concurrent
+class Error(Exception): + pass + + class TMapTests(VdsmTestCase):
def test_results(self): values = tuple(range(10)) results = concurrent.tmap(lambda x: x, values) - expected = [concurrent.Result(True, x) for x in values] - self.assertEqual(results, expected) + self.assertEqual(results, list(values))
def test_results_order(self): def func(x): @@ -40,8 +43,7 @@ return x values = tuple(random.random() * 0.1 for x in range(10)) results = concurrent.tmap(func, values) - expected = [concurrent.Result(True, x) for x in values] - self.assertEqual(results, expected) + self.assertEqual(results, list(values))
def test_concurrency(self): start = time.time() @@ -49,12 +51,12 @@ elapsed = time.time() - start self.assertTrue(0.1 < elapsed < 0.2)
- def test_error(self): - error = RuntimeError("No result for you!") - + def test_raise_last_error(self): def func(x): - raise error - - results = concurrent.tmap(func, range(10)) - expected = [concurrent.Result(False, error)] * 10 - self.assertEqual(results, expected) + raise Error(x) + try: + concurrent.tmap(func, (1, 2, 3)) + except Error as e: + self.assertEqual(e.args, (3,)) + else: + self.fail("Exception was not raised")