Nir Soffer has uploaded a new change for review.
Change subject: schedule: Introduce scheduling library ......................................................................
schedule: Introduce scheduling library
This module provides a Scheduler class that schedules execution of callables on a background thread.
This should be part of the new scalable vm sampling implementation, and can also be used whenever you need to perform a short task on a background thread, without waiting for the completion of the task.
See the module docstring and tests for usage examples.
Change-Id: Ie3764806d93bd37c3b5924080eb5ae4d29e4f4e0 Signed-off-by: Nir Soffer nsoffer@redhat.com --- M debian/vdsm-python.install M lib/vdsm/Makefile.am A lib/vdsm/schedule.py M tests/Makefile.am A tests/scheduleTests.py M vdsm.spec.in 6 files changed, 346 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/07/29607/1
diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install index 2d4bba6..1775241 100644 --- a/debian/vdsm-python.install +++ b/debian/vdsm-python.install @@ -16,6 +16,7 @@ ./usr/lib/python2.7/dist-packages/vdsm/netlink/link.py ./usr/lib/python2.7/dist-packages/vdsm/profile.py ./usr/lib/python2.7/dist-packages/vdsm/qemuimg.py +./usr/lib/python2.7/dist-packages/vdsm/schedule.py ./usr/lib/python2.7/dist-packages/vdsm/sslutils.py ./usr/lib/python2.7/dist-packages/vdsm/tool/__init__.py ./usr/lib/python2.7/dist-packages/vdsm/tool/dummybr.py diff --git a/lib/vdsm/Makefile.am b/lib/vdsm/Makefile.am index c074bb3..89a5573 100644 --- a/lib/vdsm/Makefile.am +++ b/lib/vdsm/Makefile.am @@ -32,6 +32,7 @@ netinfo.py \ profile.py \ qemuimg.py \ + schedule.py \ SecureXMLRPCServer.py \ sslutils.py \ utils.py \ diff --git a/lib/vdsm/schedule.py b/lib/vdsm/schedule.py new file mode 100644 index 0000000..bd75924 --- /dev/null +++ b/lib/vdsm/schedule.py @@ -0,0 +1,213 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +""" +This module provides a Scheduler class scheduling execution of +a callable on a background thread. 
+ +To use a scheduler, create an instance: + + scheduler = schedule.Scheduler() + +When you want to schedule some callable: + + def task(): + print '30 seconds passed' + + scheduler.schedule(30.0, task) + +task will be called after 30.0 seconds on the scheduler background thread. + +If you need to cancel a scheduled call, keep the ScheduledCall object returned +from Scheduler.schedule(), and cancel the task: + + scheduled_call = scheduler.schedule(30.0, call) + ... + scheduled_call.cancel() + +Finally, when the scheduler is not needed any more: + + scheduler.cancel() + +This will cancel any pending calls and terminate the scheduler thread. +""" + +import heapq +import logging +import threading +import time + +from . import utils + + +class Scheduler(object): + """ + Schedule calls for future execution in a background thread. + + This class is thread safe; multiple threads can schedule calls or cancel + the scheudler. + """ + + DEFAULT_DELAY = 30.0 # Used if no timeout are scheduled + + _log = logging.getLogger("vds.Scheduler") + + def __init__(self): + self._log.debug("Starting scheduler") + self._cond = threading.Condition(threading.Lock()) + self._running = True + self._timeouts = [] + t = threading.Thread(target=self._run) + t.daemon = True + t.start() + + def schedule(self, delay, callee): + """ + Schedule callee to be called after delay seconds on the scheduler + thread. + + Callee must not block or take excessive time to complete. It it does + not finish quickly, it may delay other scheduled calls on the scheduler + thread. + + Returns a ScheduledCall that may be canceled if callee was not called + yet. 
+ """ + deadline = time.time() + delay + timeout = _Timeout(deadline, callee) + self._log.debug("Schedulng %s", timeout) + with self._cond: + if self._running: + heapq.heappush(self._timeouts, timeout) + self._cond.notify() + else: + timeout.cancel() + return ScheduledCall(timeout) + + def cancel(self): + """ + Cancel all schedueld calls and invalidate the scheduler. Calls + scheduled after a scheduler was cancel will never be called. + """ + self._log.debug("Canceling scheduler") + with self._cond: + self._running = False + self._cond.notify() + + @utils.traceback(on=_log.name) + def _run(self): + try: + self._log.debug("started") + self._loop() + self._log.debug("canceled") + finally: + self._cleanup() + + def _loop(self): + while True: + with self._cond: + if not self._running: + return + delay = self._time_until_deadline() + if delay > 0.0: + self._cond.wait(delay) + if not self._running: + return + expired = self._pop_expired_timeouts() + for timeout in expired: + timeout.fire() + + def _time_until_deadline(self): + if self._timeouts: + return self._timeouts[0].deadline - time.time() + return self.DEFAULT_DELAY + + def _pop_expired_timeouts(self): + now = time.time() + expired = [] + while self._timeouts: + timeout = self._timeouts[0] + if timeout.deadline > now: + break + heapq.heappop(self._timeouts) + expired.append(timeout) + return expired + + def _cleanup(self): + # Help the garbage collector by breaking reference cycles + with self._cond: + for timeout in self._timeouts: + timeout.cancel() + + +class ScheduledCall(object): + """ + Returned when a callable is scheduled to be called after delay. The caller + may cancel the call if it was not called yet. + + This class is thread safe; any thread can cacnel a call. 
+ """ + + _log = logging.getLogger("vds.Scheduler") + + def __init__(self, timeout): + self._timeout = timeout + + @property + def deadline(self): + return self._timeout.deadline + + def cancel(self): + self._log.debug("Canceling %s", self) + self._timeout.cancel() + + +# Sentinel for marking timeouts as invalid. Callable so we can invaliate a +# timeout in a thread safe manner without locks. +def _INVALID(): + pass + + +class _Timeout(object): + """ + Created for each scheduled call. + """ + + _log = logging.getLogger('vds.Timeout') + + def __init__(self, deadline, callee): + self.deadline = deadline + self.callee = callee + + def fire(self): + if self.callee is _INVALID: + return + try: + self.callee() + except Exception: + self._log.exception("Unhandled exception in scheduled call") + finally: + self.callee = _INVALID + + def cancel(self): + self.callee = _INVALID + + def __cmp__(self, other): + return cmp(self.deadline, other.deadline) diff --git a/tests/Makefile.am b/tests/Makefile.am index 6507165..786dea4 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -65,6 +65,7 @@ remoteFileHandlerTests.py \ resourceManagerTests.py \ samplingTests.py \ + scheduleTests.py \ schemaTests.py \ securableTests.py \ sslTests.py \ diff --git a/tests/scheduleTests.py b/tests/scheduleTests.py new file mode 100644 index 0000000..0c370d9 --- /dev/null +++ b/tests/scheduleTests.py @@ -0,0 +1,129 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +import threading +import time + +from vdsm import schedule +from testrunner import VdsmTestCase + + +class SchedulerTests(VdsmTestCase): + + # Time to wait for completion, so test will not fail on overloaded + # machines. If tests fails on CI, increase this value. + GRACETIME = 0.1 + + MAX_TASKS = 1000 + + def setUp(self): + self.scheduler = schedule.Scheduler() + + def tearDown(self): + self.scheduler.cancel() + + def test_schedule(self): + delay = 0.3 + task1 = Task() + task2 = Task() + timeout1 = self.scheduler.schedule(delay, task1) + self.scheduler.schedule(10, task2) + task1.wait(delay + self.GRACETIME) + self.assertTrue(timeout1.deadline <= task1.call_time) + self.assertTrue(task1.call_time < timeout1.deadline + self.GRACETIME) + self.assertEquals(task2.call_time, None) + + def test_schedule_many(self): + delay = 0.3 + tasks = [] + for i in range(self.MAX_TASKS): + task = Task() + timeout = self.scheduler.schedule(delay, task) + tasks.append((task, timeout)) + last_task = tasks[-1][0] + last_task.wait(delay + self.GRACETIME) + for task, timeout in tasks: + self.assertTrue(timeout.deadline <= task.call_time) + self.assertTrue(task.call_time < timeout.deadline + self.GRACETIME) + + def test_continue_after_failures(self): + self.scheduler.schedule(0.3, FailingTask()) + task = Task() + self.scheduler.schedule(0.4, task) + task.wait(0.4 + self.GRACETIME) + self.assertTrue(task.call_time is not None) + + def test_cancel_timeout(self): + delay = 0.3 + task = Task() + timeout = self.scheduler.schedule(delay, task) + timeout.cancel() + task.wait(delay + self.GRACETIME) + self.assertEquals(task.call_time, None) + + def test_cancel_many(self): + delay = 0.3 + tasks = [] + for i 
in range(self.MAX_TASKS): + task = Task() + timeout = self.scheduler.schedule(delay, task) + tasks.append((task, timeout)) + for task, timeout in tasks: + timeout.cancel() + last_task = tasks[-1][0] + last_task.wait(delay + self.GRACETIME) + for task, timeout in tasks: + self.assertEquals(task.call_time, None) + + def test_cancel(self): + delay = 0.3 + tasks = [] + for i in range(self.MAX_TASKS): + task = Task() + timeout = self.scheduler.schedule(delay, task) + tasks.append((task, timeout)) + self.scheduler.cancel() + last_task = tasks[-1][0] + last_task.wait(delay + self.GRACETIME) + for task, timeout in tasks: + self.assertEquals(task.call_time, None) + + +class Task(object): + + def __init__(self): + self.cond = threading.Condition(threading.Lock()) + self.call_time = None + + def __call__(self): + with self.cond: + self.call_time = time.time() + self.cond.notify() + + def wait(self, timeout): + with self.cond: + if self.call_time is None: + self.cond.wait(timeout) + + +class FailingTask(object): + + def __call__(self): + raise Exception("This task is broken") diff --git a/vdsm.spec.in b/vdsm.spec.in index dfca5bd..8ba0477 100644 --- a/vdsm.spec.in +++ b/vdsm.spec.in @@ -1163,6 +1163,7 @@ %{python_sitearch}/%{vdsm_name}/qemuimg.py* %{python_sitearch}/%{vdsm_name}/SecureXMLRPCServer.py* %{python_sitearch}/%{vdsm_name}/netconfpersistence.py* +%{python_sitearch}/%{vdsm_name}/schedule.py* %{python_sitearch}/%{vdsm_name}/sslutils.py* %{python_sitearch}/%{vdsm_name}/utils.py* %{python_sitearch}/%{vdsm_name}/vdscli.py*