Nir Soffer has uploaded a new change for review.
Change subject: vm: Prevent multiple threads blocking on same libvirt domain ......................................................................
vm: Prevent multiple threads blocking on same libvirt domain
If a libvirt call get stuck because a vm is not responding, we could have multiple threads blocked on same vm without any limit, using precious libvirt resources that could be used to run other vms.
This patch adds a new TimedLock lock, that raise a LockTimeout if the lock cannot be acquired after configured timeout. Using this lock, a vm allow now only one concurrent libvirt call. If a libvirt call get stuck, and other threads are tyring to invoke libvirt calls on the same vm, they will wait until the current call finish, or fail with a timeout.
This should slow down calls for single vm, since each call is invoked only when the previous call returns. However, when using many vms, this creates natural round-robin scheduling, giving each vm equal chance to make progress, and limiting the load on libvirt.
Change-Id: Ib459697b8688ebcba987cd6b9e11815826e92990 Signed-off-by: Nir Soffer nsoffer@redhat.com --- M debian/vdsm-python.install M lib/vdsm/Makefile.am A lib/vdsm/locking.py M tests/Makefile.am A tests/lockingTests.py M vdsm.spec.in M vdsm/virt/vm.py 7 files changed, 172 insertions(+), 8 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/72/30772/1
diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install index 9d6a99c..b80fd4f 100644 --- a/debian/vdsm-python.install +++ b/debian/vdsm-python.install @@ -8,6 +8,7 @@ ./usr/lib/python2.7/dist-packages/vdsm/exception.py ./usr/lib/python2.7/dist-packages/vdsm/ipwrapper.py ./usr/lib/python2.7/dist-packages/vdsm/libvirtconnection.py +./usr/lib/python2.7/dist-packages/vdsm/locking.py ./usr/lib/python2.7/dist-packages/vdsm/netconfpersistence.py ./usr/lib/python2.7/dist-packages/vdsm/netinfo.py ./usr/lib/python2.7/dist-packages/vdsm/netlink/__init__.py diff --git a/lib/vdsm/Makefile.am b/lib/vdsm/Makefile.am index 8e90e6e..ffb642c 100644 --- a/lib/vdsm/Makefile.am +++ b/lib/vdsm/Makefile.am @@ -28,6 +28,7 @@ exception.py \ ipwrapper.py \ libvirtconnection.py \ + locking.py \ netconfpersistence.py \ netinfo.py \ profile.py \ diff --git a/lib/vdsm/locking.py b/lib/vdsm/locking.py new file mode 100644 index 0000000..9e92ca3 --- /dev/null +++ b/lib/vdsm/locking.py @@ -0,0 +1,54 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +import threading +import time + + +class LockTimeout(Exception): + """ Timeout acquiring a TimedLock """ + + +class TimedLock(object): + """ + A lock raising a LockTimeout if it cannot be acquired within timeout. + """ + + def __init__(self, name, timeout): + self._name = name + self._timeout = timeout + self._cond = threading.Condition(threading.Lock()) + self._busy = False + + def __enter__(self): + deadline = time.time() + self._timeout + with self._cond: + while self._busy: + now = time.time() + if now >= deadline: + raise LockTimeout("Timeout acquiring lock %r" % self._name) + self._cond.wait(deadline - now) + self._busy = True + return self + + def __exit__(self, *args): + with self._cond: + self._busy = False + self._cond.notify() diff --git a/tests/Makefile.am b/tests/Makefile.am index aa4a45e..10a2727 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -44,6 +44,7 @@ jsonRpcTests.py \ ksmTests.py \ libvirtconnectionTests.py \ + lockingTests.py \ lvmTests.py \ main.py \ miscTests.py \ diff --git a/tests/lockingTests.py b/tests/lockingTests.py new file mode 100644 index 0000000..9ac8a05 --- /dev/null +++ b/tests/lockingTests.py @@ -0,0 +1,89 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +import time +import threading +from vdsm import locking +from testlib import VdsmTestCase + + +class TimedLockTests(VdsmTestCase): + + def test_free(self): + lock = locking.TimedLock("xxx-yyy-zzz", 0) + with self.assertNotRaises(): + with lock: + pass + + def test_busy(self): + lock = locking.TimedLock("xxx-yyy-zzz", 0) + with self.assertRaises(locking.LockTimeout): + with lock: + with lock: + pass + + def test_serialize(self): + lock = locking.TimedLock("xxx-yyy-zzz", 0.4) + single_thread = threading.BoundedSemaphore(1) + passed = [0] + timedout = [0] + + def run(): + try: + with lock: + with single_thread: + time.sleep(0.1) + passed[0] += 1 + except locking.LockTimeout: + timedout[0] += 1 + + self.run_threads(3, run) + self.assertEquals(passed[0], 3) + self.assertEquals(timedout[0], 0) + + def test_timeout(self): + lock = locking.TimedLock("xxx-yyy-zzz", 0.1) + single_thread = threading.BoundedSemaphore(1) + passed = [0] + timedout = [0] + + def run(): + try: + with lock: + with single_thread: + time.sleep(0.2) + passed[0] += 1 + except locking.LockTimeout: + timedout[0] += 1 + + self.run_threads(3, run) + self.assertEquals(passed[0], 1) + self.assertEquals(timedout[0], 2) + + def run_threads(self, count, target): + threads = [] + for i in range(count): + t = threading.Thread(target=target) + t.daemon = True + threads.append(t) + for t in threads: + t.start() + for t in threads: + t.join() diff --git a/vdsm.spec.in b/vdsm.spec.in index 2d371b1..c7a2b0b 100644 --- a/vdsm.spec.in +++ b/vdsm.spec.in @@ -1159,6 +1159,7 @@ %{python_sitearch}/%{vdsm_name}/exception.py* %{python_sitearch}/%{vdsm_name}/ipwrapper.py* %{python_sitearch}/%{vdsm_name}/libvirtconnection.py* +%{python_sitearch}/%{vdsm_name}/locking.py* %{python_sitearch}/%{vdsm_name}/netinfo.py* %{python_sitearch}/%{vdsm_name}/netlink/__init__.py* %{python_sitearch}/%{vdsm_name}/netlink/addr.py* diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py index 25eec71..9e9a844 100644 --- a/vdsm/virt/vm.py +++ b/vdsm/virt/vm.py @@ -37,6 +37,7 @@ # vdsm imports from vdsm import constants from vdsm import libvirtconnection +from vdsm import locking from vdsm import netinfo from vdsm import qemuimg from vdsm import utils @@ -619,12 +620,24 @@
class NotifyingVirDomain: - # virDomain wrapper that notifies vm when a method raises an exception with - # get_error_code() = VIR_ERR_OPERATION_TIMEOUT + """ + Wrap virDomain object for limiting concurrent calls and reporting timeouts.
- def __init__(self, dom, tocb): + The wrapper allows only one concurrent call per vm, to prevent blocking of + multiple threads when underlying libvirt call get stuck. Invoking a domain + method will block if the domain is busy with another call. If the domain is + not available after timeout seconds, a timeout is reported and a + TimeoutError is raised. + + If a domain method was invoked, and the libvirt call failed with with + VIR_ERR_OPERATION_TIMEOUT error code, the timeout is reported, and + TimeoutError is raised. + """ + + def __init__(self, dom, tocb, vmid, timeout=30): self._dom = dom self._cb = tocb + self._timedlock = locking.TimedLock(vmid, timeout)
def __getattr__(self, name): attr = getattr(self._dom, name) @@ -633,7 +646,8 @@
def f(*args, **kwargs): try: - ret = attr(*args, **kwargs) + with self._timedlock: + ret = attr(*args, **kwargs) self._cb(False) return ret except libvirt.libvirtError as e: @@ -643,6 +657,9 @@ toe.err = e.err raise toe raise + except locking.LockTimeout as e: + self._cb(True) + raise TimeoutError(e) return f
@@ -2892,7 +2909,7 @@ if self.recovering: self._dom = NotifyingVirDomain( self._connection.lookupByUUIDString(self.id), - self._timeoutExperienced) + self._timeoutExperienced, self.id) elif 'restoreState' in self.conf: fromSnapshot = self.conf.get('restoreFromSnapshot', False) srcDomXML = self.conf.pop('_srcDomXML') @@ -2912,7 +2929,7 @@
self._dom = NotifyingVirDomain( self._connection.lookupByUUIDString(self.id), - self._timeoutExperienced) + self._timeoutExperienced, self.id) else: flags = libvirt.VIR_DOMAIN_NONE if 'launchPaused' in self.conf: @@ -2921,7 +2938,7 @@ del self.conf['launchPaused'] self._dom = NotifyingVirDomain( self._connection.createXML(domxml, flags), - self._timeoutExperienced) + self._timeoutExperienced, self.id) hooks.after_vm_start(self._dom.XMLDesc(0), self.conf) for dev in self._customDevices(): hooks.after_device_create(dev._deviceXML, self.conf, @@ -3690,7 +3707,7 @@ # or restart vdsm if connection to libvirt was lost self._dom = NotifyingVirDomain( self._connection.lookupByUUIDString(self.id), - self._timeoutExperienced) + self._timeoutExperienced, self.id)
if not self._incomingMigrationFinished.isSet(): state = self._dom.state(0)