Change in vdsm[master]: misc: Replace assert with AssertionError
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: misc: Replace assert with AssertionError
......................................................................
misc: Replace assert with AssertionError
The code wrongly assumed that asserts are always enabled. When
running in optimized mode (python -O), the check would be skipped,
leading to disastrous results.
The assert was correct in that this is something that should never
happen, unless someone is using the barrier incorrectly, or there is
a bug in the code.
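For illustration only (not part of this patch), a minimal sketch of why the explicit raise matters. It assumes Python 3, where compile() accepts an optimize argument that mimics running under -O. The Barrier class is a simplified stand-in for the one in vdsm/storage/misc.py (its enter() is an assumption, since only exit() appears in the diff), and assert_survives_optimization is a hypothetical helper name.

```python
import threading


class Barrier(object):
    """Simplified stand-in for the barrier touched by this patch."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._busy = False

    def enter(self):
        # Assumed counterpart of exit(); not shown in the diff.
        with self._cond:
            while self._busy:
                self._cond.wait()
            self._busy = True

    def exit(self):
        with self._cond:
            # An explicit raise is kept even when Python runs with -O.
            if not self._busy:
                raise AssertionError("Attempt to exit a barrier without "
                                     "entering")
            self._busy = False
            self._cond.notify_all()


def assert_survives_optimization():
    """Return True if an assert statement still fires when optimized."""
    # optimize=1 compiles the source the way "python -O" would.
    code = compile("assert False, 'never checked'", "<demo>", "exec",
                   optimize=1)
    try:
        exec(code, {})
    except AssertionError:
        return True
    return False
```

The helper returns False because the assert statement is compiled away, while Barrier.exit() still raises when called without a matching enter().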
Change-Id: I68f57e393423dc4faf58763a036220849a968e70
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/storage/misc.py
1 file changed, 3 insertions(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/63/34363/1
diff --git a/vdsm/storage/misc.py b/vdsm/storage/misc.py
index ab2b4fd..5abcdcb 100644
--- a/vdsm/storage/misc.py
+++ b/vdsm/storage/misc.py
@@ -696,7 +696,9 @@
def exit(self):
with self._cond:
- assert self._busy, "Attempt to exit a barrier without entering"
+ if not self._busy:
+ raise AssertionError("Attempt to exit a barrier without "
+ "entering")
self._busy = False
self._cond.notifyAll()
--
To view, visit http://gerrit.ovirt.org/34363
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I68f57e393423dc4faf58763a036220849a968e70
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
Change in vdsm[master]: Configure iscsi_session recovery_tmo for multipath devices
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: Configure iscsi_session recovery_tmo for multipath devices
......................................................................
Configure iscsi_session recovery_tmo for multipath devices
This configuration is done by device-mapper-multipath in version
0.4.9-76 and later. However, the device loses this configuration after
recovering from network errors.
This patch adds the recovery_tmo configuration to vdsm-multipath udev
rules. Since setting the timeout requires a script, the dmsetup command
was moved into a new multipath-configure-device script.
When we require a multipath version fixing this issue, we can remove the
timeout handling code.
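As a rough illustration of the session lookup done by the new script, here is the same matching logic sketched in Python. The regular expression is the one used in the sed command in multipath-configure-device; the function names are hypothetical, and the sketch only computes the sysfs file name rather than writing to it.

```python
import re

# Same pattern as the sed expression in multipath-configure-device.
_SESSION_RE = re.compile(
    r"^/devices/platform/host[0-9]+/(session[0-9]+)/.+$")


def iscsi_session_from_path(path):
    """Return the iSCSI session name for a udev device path, or None."""
    match = _SESSION_RE.match(path)
    return match.group(1) if match else None


def recovery_tmo_file(session):
    """Return the sysfs file holding recovery_tmo for a session."""
    return "/sys/class/iscsi_session/%s/recovery_tmo" % session
```

Paths that do not belong to an iSCSI session do not match and are skipped, which mirrors the "if [ -n "$session" ]" check in the script.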
Change-Id: Iaaa40fa5e6dc86beef501ef4aaefa17c4c1574c1
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M .gitignore
M configure.ac
M debian/vdsm.install
M vdsm.spec.in
M vdsm/storage/Makefile.am
A vdsm/storage/multipath-configure-device.in
A vdsm/storage/vdsm-multipath.rules
D vdsm/storage/vdsm-multipath.rules.in
8 files changed, 53 insertions(+), 22 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/82/32582/1
diff --git a/.gitignore b/.gitignore
index fca4f2e..5890806 100644
--- a/.gitignore
+++ b/.gitignore
@@ -61,7 +61,6 @@
vdsm/storage/protect/safelease
vdsm/storage/lvm.env
vdsm/storage/vdsm-lvm.rules
-vdsm/storage/vdsm-multipath.rules
vdsm/sudoers.vdsm
vdsm/svdsm.logger.conf
vdsm/vdscli.py
diff --git a/configure.ac b/configure.ac
index f40d57a..c467c37 100644
--- a/configure.ac
+++ b/configure.ac
@@ -335,10 +335,10 @@
vdsm/rpc/Makefile
vdsm/sos/Makefile
vdsm/storage/Makefile
+ vdsm/storage/multipath-configure-device
vdsm/storage/imageRepository/Makefile
vdsm/storage/protect/Makefile
vdsm/storage/vdsm-lvm.rules
- vdsm/storage/vdsm-multipath.rules
vdsm/virt/Makefile
vdsm_hooks/Makefile
vdsm_hooks/checkimages/Makefile
diff --git a/debian/vdsm.install b/debian/vdsm.install
index bd63c72..759383d 100644
--- a/debian/vdsm.install
+++ b/debian/vdsm.install
@@ -25,6 +25,7 @@
./usr/lib/python2.7/dist-packages/sos/plugins/vdsm.py
./usr/lib/python2.7/dist-packages/vdsmapi.py
./usr/libexec/vdsm/curl-img-wrap
+./usr/libexec/vdsm/multipath-configure-device
./usr/libexec/vdsm/ovirt_functions.sh
./usr/libexec/vdsm/persist-vdsm-hooks
./usr/libexec/vdsm/safelease
diff --git a/vdsm.spec.in b/vdsm.spec.in
index a5b354c..c912150 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -993,6 +993,7 @@
%{_libexecdir}/%{vdsm_name}/persist-vdsm-hooks
%{_libexecdir}/%{vdsm_name}/unpersist-vdsm-hook
%{_libexecdir}/%{vdsm_name}/ovirt_functions.sh
+%{_libexecdir}/%{vdsm_name}/multipath-configure-device
%{_libexecdir}/%{vdsm_name}/vdsm-gencerts.sh
%{_libexecdir}/%{vdsm_name}/vdsmd_init_common.sh
%{_datadir}/%{vdsm_name}/network/__init__.py*
diff --git a/vdsm/storage/Makefile.am b/vdsm/storage/Makefile.am
index 99b1460..91c2f9c 100644
--- a/vdsm/storage/Makefile.am
+++ b/vdsm/storage/Makefile.am
@@ -72,7 +72,8 @@
volume.py
dist_vdsmexec_SCRIPTS = \
- curl-img-wrap
+ curl-img-wrap \
+ multipath-configure-device
nodist_vdsmstorage_DATA = \
lvm.env \
diff --git a/vdsm/storage/multipath-configure-device.in b/vdsm/storage/multipath-configure-device.in
new file mode 100644
index 0000000..dcdfea5
--- /dev/null
+++ b/vdsm/storage/multipath-configure-device.in
@@ -0,0 +1,33 @@
+#!/bin/sh
+#
+# Copyright 2014 Red Hat, Inc. and/or its affiliates.
+#
+# Licensed to you under the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version. See the files README and
+# LICENSE_GPL_v2 which accompany this distribution.
+#
+
+set -e
+
+# Ensure that multipath devices use no_path_retry fail, instead of
+# no_path_retry queue, which is hardcoded in multipath configuration for many
+# devices.
+#
+# See https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/...
+
+@DMSETUP_PATH@ message $DM_NAME 0 fail_if_no_path
+
+# Set iscsi_session recovery_tmo. This parameter is configured by multipath on
+# startup, but after a device fails and becomes active again, the configuration
+# is lost and reverts to the iscsid.conf default (120).
+
+timeout=5
+
+for slave in /sys${DEVPATH}/slaves/*; do
+ path=$(@UDEVADM_PATH@ info --query=path --path="$slave")
+ session=$(echo "$path" | @SED_PATH@ -rn s'|^/devices/platform/host[0-9]+/(session[0-9]+)/.+$|\1|p')
+ if [ -n "$session" ]; then
+ echo $timeout > "/sys/class/iscsi_session/${session}/recovery_tmo"
+ fi
+done
diff --git a/vdsm/storage/vdsm-multipath.rules b/vdsm/storage/vdsm-multipath.rules
new file mode 100644
index 0000000..d4d0dd7
--- /dev/null
+++ b/vdsm/storage/vdsm-multipath.rules
@@ -0,0 +1,15 @@
+#
+# Copyright 2014 Red Hat, Inc. and/or its affiliates.
+#
+# Licensed to you under the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version. See the files README and
+# LICENSE_GPL_v2 which accompany this distribution.
+#
+
+# Vdsm rules for multipath devices.
+#
+# This rule runs the multipath-configure-device tool to configure multipath
+# devices to work with vdsm.
+
+ACTION=="add|change", ENV{DM_UUID}=="mpath-?*", RUN+="/usr/libexec/vdsm/multipath-configure-device"
diff --git a/vdsm/storage/vdsm-multipath.rules.in b/vdsm/storage/vdsm-multipath.rules.in
deleted file mode 100644
index cc2a878..0000000
--- a/vdsm/storage/vdsm-multipath.rules.in
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Copyright 2014 Red Hat, Inc. and/or its affiliates.
-#
-# Licensed to you under the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version. See the files README and
-# LICENSE_GPL_v2 which accompany this distribution.
-#
-
-# Vdsm rules for multipath devices.
-#
-# Ensure that multipath devices use no_path_retry fail, instead of
-# no_path_retry queue, which is hardcoded in multipath configuration for many
-# devices.
-#
-# See https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/...
-#
-
-ACTION=="add|change", ENV{DM_UUID}=="mpath-?*", RUN+="@DMSETUP_PATH@ message $env{DM_NAME} 0 fail_if_no_path"
--
To view, visit http://gerrit.ovirt.org/32582
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iaaa40fa5e6dc86beef501ef4aaefa17c4c1574c1
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
Change in vdsm[master]: vm: Require format attribute for drives
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: vm: Require format attribute for drives
......................................................................
vm: Require format attribute for drives
We have seen sampling errors where a drive has no "format" attribute.
These errors spam the system logs, and do not help to debug the real
issue: why is the required format attribute not set?
This patch ensures that a drive cannot be created without a format
attribute, avoiding log spam from sampling errors, and hopefully revealing
the real reason for this bug.
One broken test creating a drive without a format was fixed, and a new test
ensures that creating a drive without a format raises.
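A minimal sketch of the behavior described above, for illustration only. Drive here is a stripped-down stand-in for vm.Drive, and build_drives is a hypothetical helper standing in for the device creation loop in vm.py.

```python
import logging


class InvalidDeviceParameters(Exception):
    """Raised when creating a device with invalid parameters."""


class Drive(object):
    """Stripped-down stand-in for vm.Drive."""

    def __init__(self, conf, log, **kwargs):
        # Fail early instead of failing later on every sampling cycle.
        if 'format' not in kwargs:
            raise InvalidDeviceParameters(
                '"format" attribute is required: %r' % kwargs)
        self.conf = conf
        self.log = log
        self.format = kwargs['format']


def build_drives(conf, log, specs):
    """Create drives from specs, skipping those with invalid parameters."""
    drives = []
    for spec in specs:
        try:
            drive = Drive(conf, log, **spec)
        except InvalidDeviceParameters as e:
            log.error('Ignoring device with invalid parameters: %s', e)
        else:
            drives.append(drive)
    return drives
```

With this shape, a drive missing "format" is reported once at creation time and dropped, instead of producing an error on every sample.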
Change-Id: I01ab1e071ecb76f383cc6dc7d99782e10cc90136
Relates-To: http://gerrit.ovirt.org/22551
Relates-To: https://bugzilla.redhat.com/994534
Relates-To: http://lists.ovirt.org/pipermail/users/2014-February/021007.html
Bug-Url: https://bugzilla.redhat.com/1055437
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M tests/vmTests.py
M vdsm/vm.py
2 files changed, 28 insertions(+), 3 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/34/24234/1
diff --git a/tests/vmTests.py b/tests/vmTests.py
index 1e0a3f6..724d458 100644
--- a/tests/vmTests.py
+++ b/tests/vmTests.py
@@ -449,6 +449,18 @@
redir = vm.RedirDevice(self.conf, self.log, **dev)
self.assertXML(redir.getXML(), redirXML)
+ def testDriveRequiredParameters(self):
+ # TODO: It is not clear what the other required parameters are, and whether
+ # the parameters depend on the type of drive. We probably need a bigger
+ # test here that tests many combinations.
+ # Currently this only tests a missing "format" attribute.
+ conf = {'index': '2', 'propagateErrors': 'off', 'iface': 'ide',
+ 'name': 'hdc', 'device': 'cdrom', 'path': '/tmp/fedora.iso',
+ 'type': 'disk', 'readonly': 'True', 'shared': 'none',
+ 'serial': '54-a672-23e5b495a9ea'}
+ self.assertRaises(vm.InvalidDeviceParameters, vm.Drive, {}, self.log,
+ **conf)
+
def testDriveSharedStatus(self):
sharedConfigs = [
# Backward compatibility
@@ -470,7 +482,8 @@
'exclusive', 'shared', 'none', 'transient',
]
- driveConfig = {'index': '0', 'iface': 'virtio', 'device': 'disk'}
+ driveConfig = {'index': '0', 'iface': 'virtio', 'device': 'disk',
+ 'format': 'raw'}
for driveInput, driveOutput in zip(sharedConfigs, expectedStates):
driveInput.update(driveConfig)
diff --git a/vdsm/vm.py b/vdsm/vm.py
index aae8bd6..b7151b1 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -80,6 +80,10 @@
SMARTCARD_DEVICES = 'smartcard'
+class InvalidDeviceParameters(Exception):
+ """ Raised when creating device with invalid parameters """
+
+
def isVdsmImage(drive):
"""
Tell if drive looks like a vdsm image
@@ -1438,6 +1442,9 @@
VOLWM_CHUNK_REPLICATE_MULT = 2 # Chunk multiplier during replication
def __init__(self, conf, log, **kwargs):
+ if 'format' not in kwargs:
+ raise InvalidDeviceParameters('"format" attribute is required:'
+ ' %r' % kwargs)
if not kwargs.get('serial'):
self.serial = kwargs.get('imageID'[-20:]) or ''
VmDevice.__init__(self, conf, log, **kwargs)
@@ -3108,8 +3115,13 @@
for devType, devClass in self.DeviceMapping:
for dev in devices[devType]:
- self._devices[devType].append(devClass(self.conf, self.log,
- **dev))
+ try:
+ drive = devClass(self.conf, self.log, **dev)
+ except InvalidDeviceParameters as e:
+ self.log.error('Ignoring device with invalid parameters:'
+ ' %s', e, exc_info=True)
+ else:
+ self._devices[devType].append(drive)
# We should set this event as a last part of drives initialization
self._pathsPreparedEvent.set()
--
To view, visit http://gerrit.ovirt.org/24234
Gerrit-MessageType: newchange
Gerrit-Change-Id: I01ab1e071ecb76f383cc6dc7d99782e10cc90136
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
Change in vdsm[master]: vm: Prevent multiple threads blocking on same libvirt domain
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: vm: Prevent multiple threads blocking on same libvirt domain
......................................................................
vm: Prevent multiple threads blocking on same libvirt domain
If a libvirt call gets stuck because a vm is not responding, we could
have multiple threads blocked on the same vm without any limit, using
precious libvirt resources that could be used to run other vms.
This patch adds a new TimedLock lock, which raises a LockTimeout if the
lock cannot be acquired within the configured timeout. Using this lock, a
vm now allows only one concurrent libvirt call. If a libvirt call gets
stuck, and other threads are trying to invoke libvirt calls on the same
vm, they will wait until the current call finishes, or fail with a
timeout.
This should slow down calls for a single vm, since each call is invoked
only when the previous call returns. However, when using many vms, this
creates natural round-robin scheduling, giving each vm an equal chance to
make progress, and limiting the load on libvirt.
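To make the intended behavior concrete, here is a self-contained sketch. TimedLock follows the implementation added in lib/vdsm/locking.py, except that it uses time.monotonic() (an assumption, available on Python 3 only) instead of time.time(). The try_call helper is hypothetical and only shows how a caller might turn a LockTimeout into a failure result.

```python
import threading
import time


class LockTimeout(Exception):
    """Timeout acquiring a TimedLock."""


class TimedLock(object):
    """A lock raising LockTimeout if it cannot be acquired in time."""

    def __init__(self, name, timeout):
        self._name = name
        self._timeout = timeout
        self._cond = threading.Condition(threading.Lock())
        self._busy = False

    def __enter__(self):
        # Assumption: monotonic clock instead of time.time().
        deadline = time.monotonic() + self._timeout
        with self._cond:
            while self._busy:
                now = time.monotonic()
                if now >= deadline:
                    raise LockTimeout(
                        "Timeout acquiring lock %r" % self._name)
                self._cond.wait(deadline - now)
            self._busy = True
        return self

    def __exit__(self, *args):
        with self._cond:
            self._busy = False
            self._cond.notify()


def try_call(lock, func):
    """Return (True, result), or (False, None) if the lock timed out."""
    try:
        with lock:
            return True, func()
    except LockTimeout:
        return False, None
```

While one thread holds the lock, try_call from another thread returns (False, None) after the timeout; once the holder leaves the with block, the next call goes through.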
Change-Id: Ib459697b8688ebcba987cd6b9e11815826e92990
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M debian/vdsm-python.install
M lib/vdsm/Makefile.am
A lib/vdsm/locking.py
M tests/Makefile.am
A tests/lockingTests.py
M vdsm.spec.in
M vdsm/virt/vm.py
7 files changed, 172 insertions(+), 8 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/72/30772/1
diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install
index 9d6a99c..b80fd4f 100644
--- a/debian/vdsm-python.install
+++ b/debian/vdsm-python.install
@@ -8,6 +8,7 @@
./usr/lib/python2.7/dist-packages/vdsm/exception.py
./usr/lib/python2.7/dist-packages/vdsm/ipwrapper.py
./usr/lib/python2.7/dist-packages/vdsm/libvirtconnection.py
+./usr/lib/python2.7/dist-packages/vdsm/locking.py
./usr/lib/python2.7/dist-packages/vdsm/netconfpersistence.py
./usr/lib/python2.7/dist-packages/vdsm/netinfo.py
./usr/lib/python2.7/dist-packages/vdsm/netlink/__init__.py
diff --git a/lib/vdsm/Makefile.am b/lib/vdsm/Makefile.am
index 8e90e6e..ffb642c 100644
--- a/lib/vdsm/Makefile.am
+++ b/lib/vdsm/Makefile.am
@@ -28,6 +28,7 @@
exception.py \
ipwrapper.py \
libvirtconnection.py \
+ locking.py \
netconfpersistence.py \
netinfo.py \
profile.py \
diff --git a/lib/vdsm/locking.py b/lib/vdsm/locking.py
new file mode 100644
index 0000000..9e92ca3
--- /dev/null
+++ b/lib/vdsm/locking.py
@@ -0,0 +1,54 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import threading
+import time
+
+
+class LockTimeout(Exception):
+ """ Timeout acquiring a TimedLock """
+
+
+class TimedLock(object):
+ """
+ A lock raising a LockTimeout if it cannot be acquired within timeout.
+ """
+
+ def __init__(self, name, timeout):
+ self._name = name
+ self._timeout = timeout
+ self._cond = threading.Condition(threading.Lock())
+ self._busy = False
+
+ def __enter__(self):
+ deadline = time.time() + self._timeout
+ with self._cond:
+ while self._busy:
+ now = time.time()
+ if now >= deadline:
+ raise LockTimeout("Timeout acquiring lock %r" % self._name)
+ self._cond.wait(deadline - now)
+ self._busy = True
+ return self
+
+ def __exit__(self, *args):
+ with self._cond:
+ self._busy = False
+ self._cond.notify()
diff --git a/tests/Makefile.am b/tests/Makefile.am
index aa4a45e..10a2727 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -44,6 +44,7 @@
jsonRpcTests.py \
ksmTests.py \
libvirtconnectionTests.py \
+ lockingTests.py \
lvmTests.py \
main.py \
miscTests.py \
diff --git a/tests/lockingTests.py b/tests/lockingTests.py
new file mode 100644
index 0000000..9ac8a05
--- /dev/null
+++ b/tests/lockingTests.py
@@ -0,0 +1,89 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import time
+import threading
+from vdsm import locking
+from testlib import VdsmTestCase
+
+
+class TimedLockTests(VdsmTestCase):
+
+ def test_free(self):
+ lock = locking.TimedLock("xxx-yyy-zzz", 0)
+ with self.assertNotRaises():
+ with lock:
+ pass
+
+ def test_busy(self):
+ lock = locking.TimedLock("xxx-yyy-zzz", 0)
+ with self.assertRaises(locking.LockTimeout):
+ with lock:
+ with lock:
+ pass
+
+ def test_serialize(self):
+ lock = locking.TimedLock("xxx-yyy-zzz", 0.4)
+ single_thread = threading.BoundedSemaphore(1)
+ passed = [0]
+ timedout = [0]
+
+ def run():
+ try:
+ with lock:
+ with single_thread:
+ time.sleep(0.1)
+ passed[0] += 1
+ except locking.LockTimeout:
+ timedout[0] += 1
+
+ self.run_threads(3, run)
+ self.assertEquals(passed[0], 3)
+ self.assertEquals(timedout[0], 0)
+
+ def test_timeout(self):
+ lock = locking.TimedLock("xxx-yyy-zzz", 0.1)
+ single_thread = threading.BoundedSemaphore(1)
+ passed = [0]
+ timedout = [0]
+
+ def run():
+ try:
+ with lock:
+ with single_thread:
+ time.sleep(0.2)
+ passed[0] += 1
+ except locking.LockTimeout:
+ timedout[0] += 1
+
+ self.run_threads(3, run)
+ self.assertEquals(passed[0], 1)
+ self.assertEquals(timedout[0], 2)
+
+ def run_threads(self, count, target):
+ threads = []
+ for i in range(count):
+ t = threading.Thread(target=target)
+ t.daemon = True
+ threads.append(t)
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 2d371b1..c7a2b0b 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1159,6 +1159,7 @@
%{python_sitearch}/%{vdsm_name}/exception.py*
%{python_sitearch}/%{vdsm_name}/ipwrapper.py*
%{python_sitearch}/%{vdsm_name}/libvirtconnection.py*
+%{python_sitearch}/%{vdsm_name}/locking.py*
%{python_sitearch}/%{vdsm_name}/netinfo.py*
%{python_sitearch}/%{vdsm_name}/netlink/__init__.py*
%{python_sitearch}/%{vdsm_name}/netlink/addr.py*
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 25eec71..9e9a844 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -37,6 +37,7 @@
# vdsm imports
from vdsm import constants
from vdsm import libvirtconnection
+from vdsm import locking
from vdsm import netinfo
from vdsm import qemuimg
from vdsm import utils
@@ -619,12 +620,24 @@
class NotifyingVirDomain:
- # virDomain wrapper that notifies vm when a method raises an exception with
- # get_error_code() = VIR_ERR_OPERATION_TIMEOUT
+ """
+ Wrap virDomain object for limiting concurrent calls and reporting timeouts.
- def __init__(self, dom, tocb):
+ The wrapper allows only one concurrent call per vm, to prevent blocking of
+ multiple threads when the underlying libvirt call gets stuck. Invoking a
+ domain method will block if the domain is busy with another call. If the
+ domain is not available after timeout seconds, a timeout is reported and a
+ TimeoutError is raised.
+
+ If a domain method was invoked, and the libvirt call failed with the
+ VIR_ERR_OPERATION_TIMEOUT error code, the timeout is reported, and
+ TimeoutError is raised.
+ """
+
+ def __init__(self, dom, tocb, vmid, timeout=30):
self._dom = dom
self._cb = tocb
+ self._timedlock = locking.TimedLock(vmid, timeout)
def __getattr__(self, name):
attr = getattr(self._dom, name)
@@ -633,7 +646,8 @@
def f(*args, **kwargs):
try:
- ret = attr(*args, **kwargs)
+ with self._timedlock:
+ ret = attr(*args, **kwargs)
self._cb(False)
return ret
except libvirt.libvirtError as e:
@@ -643,6 +657,9 @@
toe.err = e.err
raise toe
raise
+ except locking.LockTimeout as e:
+ self._cb(True)
+ raise TimeoutError(e)
return f
@@ -2892,7 +2909,7 @@
if self.recovering:
self._dom = NotifyingVirDomain(
self._connection.lookupByUUIDString(self.id),
- self._timeoutExperienced)
+ self._timeoutExperienced, self.id)
elif 'restoreState' in self.conf:
fromSnapshot = self.conf.get('restoreFromSnapshot', False)
srcDomXML = self.conf.pop('_srcDomXML')
@@ -2912,7 +2929,7 @@
self._dom = NotifyingVirDomain(
self._connection.lookupByUUIDString(self.id),
- self._timeoutExperienced)
+ self._timeoutExperienced, self.id)
else:
flags = libvirt.VIR_DOMAIN_NONE
if 'launchPaused' in self.conf:
@@ -2921,7 +2938,7 @@
del self.conf['launchPaused']
self._dom = NotifyingVirDomain(
self._connection.createXML(domxml, flags),
- self._timeoutExperienced)
+ self._timeoutExperienced, self.id)
hooks.after_vm_start(self._dom.XMLDesc(0), self.conf)
for dev in self._customDevices():
hooks.after_device_create(dev._deviceXML, self.conf,
@@ -3690,7 +3707,7 @@
# or restart vdsm if connection to libvirt was lost
self._dom = NotifyingVirDomain(
self._connection.lookupByUUIDString(self.id),
- self._timeoutExperienced)
+ self._timeoutExperienced, self.id)
if not self._incomingMigrationFinished.isSet():
state = self._dom.state(0)
--
To view, visit http://gerrit.ovirt.org/30772
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib459697b8688ebcba987cd6b9e11815826e92990
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
Change in vdsm[master]: sampling: Add sampling benchmark for profiling
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: sampling: Add sampling benchmark for profiling
......................................................................
sampling: Add sampling benchmark for profiling
We have trouble profiling vdsm when running hundreds of vms. This patch adds
a simple benchmark, simulating vdsm sampling, for evaluating the sampling
implementation.
Change-Id: I7bbb73f38fbbbdfc1fe7050ac3e30327411f8f33
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
A tests/perf/sampling_perf.py
1 file changed, 131 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/87/30987/1
diff --git a/tests/perf/sampling_perf.py b/tests/perf/sampling_perf.py
new file mode 100644
index 0000000..f656474
--- /dev/null
+++ b/tests/perf/sampling_perf.py
@@ -0,0 +1,131 @@
+# Compare sampling in multiple threads vs sampling via the scheduler and
+# executor.
+
+import pthreading
+pthreading.monkey_patch()
+
+import sys
+sys.path = ['../../lib', '../../vdsm'] + sys.path
+
+import libvirt
+import logging
+import signal
+import threading
+import time
+from xml.dom import minidom
+
+from vdsm import executor as execute
+from vdsm import profile
+from vdsm import schedule
+from vdsm import utils
+
+from virt import sampling
+
+RUN_SEC = 120
+VMS = 100
+
+connection = libvirt.openReadOnly('qemu:///system')
+
+
+def sampling_task():
+ # Do some libvirt calls and do some xml processing, simulating vdsm
+ # sampling functions.
+ sp = connection.listAllStoragePools()[0]
+ xml = sp.XMLDesc()
+ dom = minidom.parseString(xml)
+ uuid = dom.getElementsByTagName('uuid')[0]
+ return uuid.childNodes[0].data
+
+
+samplings = (
+ sampling.AdvancedStatsFunction(sampling_task, 2), # highWrite
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleVmJobs
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleBalloon
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleCpu
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleNet
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleVcpuPinning
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleCpuTune
+ sampling.AdvancedStatsFunction(sampling_task, 60), # updateVolumes
+ sampling.AdvancedStatsFunction(sampling_task, 60), # sampleDisk
+ sampling.AdvancedStatsFunction(sampling_task, 60), # sampleDiskLatency
+)
+
+
+class SamplerThread(object):
+
+ def __init__(self, name, samplings):
+ self.iterator = sampling.delays(sampling.cycle(samplings))
+ self.thread = threading.Thread(target=self._run, name=name)
+ self.thread.daemon = True
+ self.event = threading.Event()
+ self.last_sample_time = 0
+
+ def start(self):
+ logging.debug("starting %s", self.thread.name)
+ self.thread.start()
+
+ def stop(self):
+ logging.debug("stopping %s", self.thread.name)
+ self.event.set()
+
+ @utils.traceback()
+ def _run(self):
+ logging.debug("%s started", self.thread.name)
+ while not self.event.is_set():
+ delay, task = next(self.iterator)
+ if delay > 0:
+ self.event.wait(delay)
+ if self.event.is_set():
+ break
+ self.last_sample_time = time.time()
+ task()
+ logging.debug("%s stopped", self.thread.name)
+
+
+@profile.profile("threads.prof", builtins=True)
+def do_threads(vms):
+ threads = []
+ try:
+ for i in range(vms):
+ t = SamplerThread('SamplerThread-%i' % i, samplings)
+ t.start()
+ threads.append(t)
+ time.sleep(RUN_SEC)
+ finally:
+ for t in threads:
+ t.stop()
+
+
+@profile.profile("pool.prof", builtins=True)
+def do_pool(vms):
+ scheduler = schedule.Scheduler()
+ scheduler.start()
+ executor = execute.Executor(5, 1000, scheduler)
+ executor.start()
+ samplers = []
+ try:
+ for i in range(vms):
+ s = sampling.Sampler("Sampler-%i" % i, samplings, None,
+ scheduler=scheduler, executor=executor)
+ s.start()
+ samplers.append(s)
+ time.sleep(RUN_SEC)
+ finally:
+ for s in samplers:
+ s.stop()
+ executor.stop()
+ scheduler.stop()
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.DEBUG)
+ if len(sys.argv) == 1:
+ print 'Usage: python sampling_perf.py threads|pool [vms]'
+ sys.exit(2)
+ name = 'do_' + sys.argv[1]
+ func = globals()[name]
+ if len(sys.argv) > 2:
+ vms = int(sys.argv[2])
+ else:
+ vms = VMS
+ func(vms)
--
To view, visit http://gerrit.ovirt.org/30987
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7bbb73f38fbbbdfc1fe7050ac3e30327411f8f33
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
Change in vdsm[master]: sampling: Scalable vm sampling
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: sampling: Scalable vm sampling
......................................................................
sampling: Scalable vm sampling
Previously each vm was running a sampling thread, which is not very
efficient in Python when running hundreds of vms. This introduces the
Sampler class, managing sampling for one vm, replacing the
AdvancedStatsThread class.
Each VM keeps one Sampler object, running sampling tasks serially using
a thread pool, ensuring that each vm has only a single sampling task
running at any time.
If a VM sampling task gets stuck, sampling for this vm stops until the
stuck task is finished. This keeps the same behavior as the current code
using one sampling thread per vm.
For simplicity, this uses the storage thread pool for running sampling
tasks. In the final version, this should use a more robust thread pool
handling stuck worker threads, see: http://gerrit.ovirt.org/29191
Threads usage:
- One scheduler thread serving all vm samplers
- 20 worker threads. The final count should be determined by testing,
ensuring that we don't overload libvirt. Configurable using new
vars:vm_sampling_threads option.
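The serial scheduling described above relies on turning per-task intervals into a single stream of (delay, task) pairs. The sketch below is one possible way to produce the sequences that the new tests in tests/samplingTests.py expect from sampling.cycle and sampling.delays. The real implementations are not shown in this message, so this heap-based version, the Task class and the take helper are assumptions.

```python
import heapq
import itertools


class Task(object):
    """Minimal task with a sampling interval, as in the tests."""

    def __init__(self, interval, name):
        self.interval = interval
        self.name = name


def cycle(samplings):
    """Yield (virtual_time, task) forever, ordered by time, then by
    position in samplings."""
    # The index breaks ties, so tasks themselves are never compared.
    heap = [(task.interval, index, task)
            for index, task in enumerate(samplings)]
    heapq.heapify(heap)
    while True:
        when, index, task = heapq.heappop(heap)
        yield when, task
        heapq.heappush(heap, (when + task.interval, index, task))


def delays(timeline):
    """Convert (virtual_time, task) pairs into (delay, task) pairs."""
    last = 0
    for when, task in timeline:
        yield when - last, task
        last = when


def take(count, iterator):
    """Return the first count items of iterator as a list."""
    return list(itertools.islice(iterator, count))
```

A scheduler can then wait for each delay and dispatch the task to a worker, requesting the next pair only after the task has finished, so a vm never has more than one sampling task in flight.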
Change-Id: I5539a728f3a1b3075712331440eff1749da6c530
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M lib/vdsm/config.py.in
M tests/numaUtilsTests.py
M tests/samplingTests.py
M tests/vmApiTests.py
M tests/vmTests.py
M vdsm/vdsm
M vdsm/virt/sampling.py
M vdsm/virt/vm.py
8 files changed, 420 insertions(+), 121 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/80/29980/1
diff --git a/lib/vdsm/config.py.in b/lib/vdsm/config.py.in
index 52e0cd9..c411e4e 100644
--- a/lib/vdsm/config.py.in
+++ b/lib/vdsm/config.py.in
@@ -190,6 +190,14 @@
('vm_sample_cpu_tune_interval', '15', None),
+ ('vm_sampling_threads', '20',
+ 'Number of vm sampling threads. Should match libvirt thread '
+ 'pool size (default 20)'),
+
+ ('vm_sampling_tasks', '500',
+ 'Number of vm sampling tasks. Should be bigger than the number of '
+ 'running vms (default 500)'),
+
('trust_store_path', '@TRUSTSTORE@',
'Where the certificates and keys are situated.'),
diff --git a/tests/numaUtilsTests.py b/tests/numaUtilsTests.py
index e5dd293..93e30a9 100644
--- a/tests/numaUtilsTests.py
+++ b/tests/numaUtilsTests.py
@@ -65,7 +65,7 @@
(3, 1, 19590000000L, 2)], 15
-class FakeVmStatsThread:
+class FakeVmStats:
def __init__(self, vm):
self._vm = vm
self.sampleVcpuPinning = FakeAdvancedStatsFunction()
@@ -98,7 +98,7 @@
'memory': '1024',
'nodeIndex': 1}]}
with vmTests.FakeVM(VM_PARAMS) as fake:
- fake._vmStats = FakeVmStatsThread(fake)
+ fake._vmStats = FakeVmStats(fake)
expectedResult = {'0': [0, 1], '1': [0, 1]}
vmNumaNodeRuntimeMap = numaUtils.getVmNumaNodeRuntimeInfo(fake)
self.assertEqual(expectedResult, vmNumaNodeRuntimeMap)
diff --git a/tests/samplingTests.py b/tests/samplingTests.py
index 2ff703e..1089960 100644
--- a/tests/samplingTests.py
+++ b/tests/samplingTests.py
@@ -111,3 +111,228 @@
s1 = sampling.InterfaceSample(lo)
s1.operstate = 'x'
self.assertEquals('operstate:x', s1.connlog_diff(s0))
+
+
+class Task(object):
+
+ def __init__(self, interval, name, error=None):
+ self.interval = interval
+ self.name = name
+ self.error = error
+ self.executed = 0
+
+ def __call__(self):
+ self.executed += 1
+ if self.error:
+ raise self.error
+
+
+class SamplingIterationTests(TestCaseBase):
+
+ task1 = Task(2, "task1")
+ task2 = Task(2, "task2")
+ task3 = Task(3, "task3")
+ samplings = [task1, task2, task3]
+
+ virtual_times = [
+ # cycle 1
+ (2, task1),
+ (2, task2),
+ (3, task3),
+ (4, task1),
+ (4, task2),
+ (6, task1),
+ (6, task2),
+ (6, task3),
+ # cycle 2
+ (8, task1),
+ (8, task2),
+ (9, task3),
+ (10, task1),
+ (10, task2),
+ (12, task1),
+ (12, task2),
+ (12, task3),
+ ]
+
+ delays = [
+ (2, task1),
+ (0, task2),
+ (1, task3),
+ (1, task1),
+ (0, task2),
+ (2, task1),
+ (0, task2),
+ (0, task3),
+ ]
+
+ def test_virtual_time(self):
+ iterator = sampling.cycle(self.samplings)
+ for pair in self.virtual_times:
+ self.assertEquals(next(iterator), pair)
+
+ def test_delays(self):
+ iterator = sampling.delays(self.virtual_times)
+ for i in range(2):
+ for pair in self.delays:
+ self.assertEquals(next(iterator), pair)
+
+ def test_chain(self):
+ iterator = sampling.delays(sampling.cycle(self.samplings))
+ for i in range(3):
+ for pair in self.delays:
+ self.assertEquals(next(iterator), pair)
+
+
+class SamplerTests(TestCaseBase):
+
+ def setUp(self):
+ self.scheduler = FakeScheduler()
+ self.threadpool = FakeThreadPool()
+ self.handler = FakeHandler(True)
+ self.task1 = Task(2, "task1")
+ self.task2 = Task(3, "task2")
+ self.task3 = Task(4, "task3")
+ samplings = [self.task1, self.task2, self.task3]
+ self.sampler = sampling.Sampler("vm-id", samplings, self.handler,
+ scheduler=self.scheduler,
+ threadpool=self.threadpool)
+
+ def test_start(self):
+ self.sampler.start()
+ self.assertEquals(self.scheduler.call.delay, 2)
+ self.assertEquals(self.scheduler.call.func, self.sampler._dispatch)
+ self.assertEquals(self.threadpool.tasks, [])
+
+ def test_stop(self):
+ self.sampler.start()
+ self.sampler.stop()
+ self.assertEquals(self.scheduler.call.func, INVALID)
+ self.assertEquals(self.threadpool.tasks, [])
+
+ def test_dispatch(self):
+ self.sampler.start()
+ self.scheduler.fire()
+ self.assertEquals(self.threadpool.tasks,
+ [("vm-id_task1", self.sampler)])
+
+ def test_run_samplings(self):
+ self.sampler.start()
+
+ # time = 2
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.scheduler.call.delay, 1)
+ self.assertEquals(self.task1.executed, 1)
+
+ # time = 3
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task2.executed, 1)
+ self.assertEquals(self.scheduler.call.delay, 1)
+
+ # time = 4
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task1.executed, 2)
+ self.assertEquals(self.scheduler.call.delay, 0)
+
+ # time = 4
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task3.executed, 1)
+ self.assertEquals(self.scheduler.call.delay, 2)
+
+ # time = 6
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task1.executed, 3)
+ self.assertEquals(self.scheduler.call.delay, 0)
+
+ # time = 6
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task2.executed, 2)
+ self.assertEquals(self.scheduler.call.delay, 2)
+
+
+class SamplerHandlerTests(TestCaseBase):
+
+ def test_does_handle_error(self):
+ self.check(FakeHandler(True))
+
+ def test_does_not_handle_error(self):
+ self.check(FakeHandler(False))
+
+ def check(self, handler):
+ scheduler = FakeScheduler()
+ threadpool = FakeThreadPool()
+ error = Exception("Sampling task failed")
+ task = Task(2, "task", error=error)
+ sampler = sampling.Sampler("vm-id", [task], handler,
+ scheduler=scheduler, threadpool=threadpool)
+ sampler.start()
+ scheduler.fire()
+ threadpool.run()
+ self.assertEquals(handler.error, error)
+
+
+# Helpers
+
+
+class FakeHandler(object):
+
+ def __init__(self, result):
+ self.error = None
+ self.result = result
+
+ def handleStatsException(self, e):
+ self.error = e
+ return self.result
+
+
+class FakeScheduler(object):
+
+ def __init__(self):
+ # Schedule only single call to simplify the tests
+ self.call = None
+
+ def schedule(self, delay, func):
+ assert self.call is None
+ self.call = Call(delay, func)
+ return self.call
+
+ def fire(self):
+ call = self.call
+ self.call = None
+ call.func()
+
+
+class Call(object):
+
+ def __init__(self, delay, func):
+ self.delay = delay
+ self.func = func
+
+ def cancel(self):
+ self.func = INVALID
+
+
+def INVALID():
+ pass
+
+
+class FakeThreadPool(object):
+
+ def __init__(self):
+ self.tasks = []
+
+ def queueTask(self, id, task):
+ # Executes task on a worker thread in the real object. Does nothing
+ # here to simplify the tests.
+ self.tasks.append((id, task))
+
+ def run(self):
+ # Run next task manually to simplify the tests
+ task_id, task = self.tasks.pop()
+ task()
diff --git a/tests/vmApiTests.py b/tests/vmApiTests.py
index bc68c6d..ae2f98f 100644
--- a/tests/vmApiTests.py
+++ b/tests/vmApiTests.py
@@ -22,6 +22,7 @@
import os
import os.path
+from virt import sampling
from virt import vmexitreason
from vdsm import define
from testrunner import VdsmTestCase as TestCaseBase
@@ -56,11 +57,15 @@
@contextmanager
def ensureVmStats(vm):
- vm._initVmStats()
+ sampling.start()
try:
- yield vm
+ vm._initVmStats()
+ try:
+ yield vm
+ finally:
+ vm._vmStats.stop()
finally:
- vm._vmStats.stop()
+ sampling.stop()
class TestVmStats(TestSchemaCompliancyBase):
diff --git a/tests/vmTests.py b/tests/vmTests.py
index fda71f5..3ef04f9 100644
--- a/tests/vmTests.py
+++ b/tests/vmTests.py
@@ -1338,7 +1338,7 @@
self.assertEqual(stats['exitMessage'], msg)
-class TestVmStatsThread(TestCaseBase):
+class TestVmStats(TestCaseBase):
VM_PARAMS = {'displayPort': -1, 'displaySecurePort': -1,
'display': 'qxl', 'displayIp': '127.0.0.1',
'vmType': 'kvm', 'memSize': 1024}
@@ -1349,8 +1349,8 @@
GBPS = 10 ** 9 / 8
MAC = '52:54:00:59:F5:3F'
with FakeVM() as fake:
- mock_stats_thread = vm.VmStatsThread(fake)
- res = mock_stats_thread._getNicStats(
+ mock_stats = vm.VmStats(fake)
+ res = mock_stats._getNicStats(
name='vnettest', model='virtio', mac=MAC,
start_sample=(2 ** 64 - 15 * GBPS, 1, 2, 3, 0, 4, 5, 6),
end_sample=(0, 7, 8, 9, 5 * GBPS, 10, 11, 12),
@@ -1366,9 +1366,9 @@
# bz1073478 - main case
with FakeVM(self.VM_PARAMS, self.DEV_BALLOON) as fake:
self.assertEqual(fake._dom, None)
- mock_stats_thread = vm.VmStatsThread(fake)
+ mock_stats = vm.VmStats(fake)
res = {}
- mock_stats_thread._getBalloonStats(res)
+ mock_stats._getBalloonStats(res)
self.assertIn('balloonInfo', res)
self.assertIn('balloon_cur', res['balloonInfo'])
@@ -1376,9 +1376,9 @@
# bz1073478 - extra case
with FakeVM(self.VM_PARAMS, self.DEV_BALLOON) as fake:
fake._dom = FakeDomain()
- mock_stats_thread = vm.VmStatsThread(fake)
+ mock_stats = vm.VmStats(fake)
res = {}
- mock_stats_thread._getBalloonStats(res)
+ mock_stats._getBalloonStats(res)
self.assertIn('balloonInfo', res)
self.assertIn('balloon_cur', res['balloonInfo'])
diff --git a/vdsm/vdsm b/vdsm/vdsm
index cbc16ff..2343606 100755
--- a/vdsm/vdsm
+++ b/vdsm/vdsm
@@ -38,6 +38,8 @@
from storage.dispatcher import Dispatcher
from storage.hsm import HSM
+from virt import sampling
+
import zombiereaper
import dsaversion
@@ -71,6 +73,7 @@
profile.start()
libvirtconnection.start_event_loop()
+ sampling.start()
if config.getboolean('irs', 'irs_enable'):
try:
@@ -88,6 +91,7 @@
profile.stop()
finally:
cif.prepareForShutdown()
+ sampling.stop()
def run(pidfile=None):
diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py
index 3479eee..0cd823a 100644
--- a/vdsm/virt/sampling.py
+++ b/vdsm/virt/sampling.py
@@ -26,24 +26,50 @@
Contains a reverse dictionary pointing from error string to its error code.
"""
-import threading
-import os
-import time
-import logging
import errno
import ethtool
+import itertools
+import logging
+import operator
+import os
import re
+import threading
+import time
+from vdsm.config import config
from vdsm import utils
from vdsm import netinfo
+from vdsm import schedule
from vdsm.ipwrapper import getLinks
from vdsm.constants import P_VDSM_RUN
import caps
+from storage import threadPool
_THP_STATE_PATH = '/sys/kernel/mm/transparent_hugepage/enabled'
if not os.path.exists(_THP_STATE_PATH):
_THP_STATE_PATH = '/sys/kernel/mm/redhat_transparent_hugepage/enabled'
+
+
+_scheduler = None
+_threadpool = None
+
+
+def start():
+ """ Called during application startup """
+ global _scheduler, _threadpool
+ threads = config.getint('vars', 'vm_sampling_threads')
+ tasks = config.getint('vars', 'vm_sampling_tasks')
+ _threadpool = threadPool.ThreadPool(threads, maxTasks=tasks)
+ _scheduler = schedule.Scheduler("sampling.scheduler")
+
+
+def stop():
+ """ Called during application shutdown """
+ if _threadpool:
+ _threadpool.joinAll(waitForTasks=False, waitForThreads=False)
+ if _scheduler:
+ _scheduler.cancel()
class InterfaceSample:
@@ -318,9 +344,13 @@
def interval(self):
return self._interval
+ @property
+ def name(self):
+ return self._function.__name__
+
def __repr__(self):
return "<AdvancedStatsFunction %s at 0x%x>" % (
- self._function.__name__, id(self._function.__name__))
+ self.name, id(self))
def __call__(self, *args, **kwargs):
retValue = self._function(*args, **kwargs)
@@ -347,115 +377,136 @@
return bgn_sample, end_sample, (end_time - bgn_time)
-class AdvancedStatsThread(threading.Thread):
+class Sampler(object):
"""
- A thread that runs the registered AdvancedStatsFunction objects
- for statistic and monitoring purpose.
+ Execute sampling tasks on the threadpool according to sampling schedule.
+
+ The specified sampling tasks are executed on the threadpool one at a time,
+ as if there was a thread running them one after another in a loop.
+
+ If one of the sampling tasks gets stuck, the rest of the sampling tasks are
+ delayed. This is important to prevent flooding the thread pool with
+ possibly blocking sampling tasks when the underlying libvirt thread is
+ stuck. When a stuck sampling task finishes, the sampling tasks continue
+ normally.
+
+ When a sampling task is stuck, no other task can run on the blocked worker
+ thread. The threadpool is responsible for detecting and handling this.
+
+ Each vm should create one sampler, to ensure that there is only one
+ sampling task per vm executing on the threadpool.
"""
- DEFAULT_LOG = logging.getLogger("AdvancedStatsThread")
- def __init__(self, log=DEFAULT_LOG, daemon=False):
- """
- Initialize an AdvancedStatsThread object
- """
- threading.Thread.__init__(self)
- self.daemon = daemon
+ _log = logging.getLogger("Sampler")
- self._log = log
- self._stopEvent = threading.Event()
- self._contEvent = threading.Event()
-
- self._statsTime = None
- self._statsFunctions = []
-
- def addStatsFunction(self, *args):
- """
- Register the functions listed as arguments
- """
- if self.isAlive():
- raise RuntimeError("AdvancedStatsThread is started")
-
- for statsFunction in args:
- self._statsFunctions.append(statsFunction)
+ def __init__(self, vm_id, samplings, handler, scheduler=None,
+ threadpool=None):
+ self._vm_id = vm_id
+ self._samplings = samplings
+ self._handler = handler
+ self._scheduler = scheduler or _scheduler
+ self._threadpool = threadpool or _threadpool
+ self._lock = threading.Lock()
+ self._running = False
+ self._iterator = None
+ self._task = _INVALID
+ self._call = None
+ self._last_sample_time = 0 # No sample taken yet
def start(self):
- """
- Start the execution of the thread and exit
- """
- self._log.debug("Start statistics collection")
- threading.Thread.start(self)
+ with self._lock:
+ if self._running:
+ raise AssertionError("Sampler is running")
+ self._log.debug("Starting sampler for vm %s", self._vm_id)
+ self._running = True
+ self._iterator = delays(cycle(self._samplings))
+ self._schedule_next_task()
def stop(self):
+ with self._lock:
+ if self._running:
+ self._log.debug("Stopping sampler for vm %s", self._vm_id)
+ self._running = False
+ self._call.cancel()
+ self._call = None
+ self._task = _INVALID
+ self._iterator = None
+
+ @property
+ def last_sample_time(self):
+ return self._last_sample_time
+
+ # Task interface
+
+ def __call__(self):
+ try:
+ self._last_sample_time = time.time()
+ self._task()
+ except Exception as e:
+ if not self._handler.handleStatsException(e):
+ self._log.exception("Stats function failed: %s", self._task)
+ finally:
+ with self._lock:
+ if self._running:
+ self._schedule_next_task()
+
+ # Private
+
+ def _dispatch(self):
"""
- Stop the execution of the thread and exit
+ Called from the scheduler thread when it's time to run the current
+ sampling task.
"""
- self._log.debug("Stop statistics collection")
- self._stopEvent.set()
- self._contEvent.set()
+ with self._lock:
+ if self._running:
+ task_id = self._vm_id + '_' + self._task.name
+ self._threadpool.queueTask(task_id, self)
- def pause(self):
- """
- Pause the execution of the registered functions
- """
- self._log.debug("Pause statistics collection")
- self._contEvent.clear()
+ def _schedule_next_task(self):
+ delay, self._task = next(self._iterator)
+ self._call = self._scheduler.schedule(delay, self._dispatch)
- def cont(self):
- """
- Resume the execution of the registered functions
- """
- self._log.debug("Resume statistics collection")
- self._contEvent.set()
- def getLastSampleTime(self):
- return self._statsTime
+# Sentinel used to mark a task as invalid, allowing a sampling task to run
+# without holding a lock.
+def _INVALID():
+ pass
- def run(self):
- self._log.debug("Stats thread started")
- self._contEvent.set()
- while not self._stopEvent.isSet():
- try:
- self.collect()
- except:
- self._log.debug("Stats thread failed", exc_info=True)
+def delays(virtual_times):
+ """
+ Accept stream of tuples (virtual_time, task) and return stream of tuples
+ (delay, task).
+ """
+ last_virtual_time = 0
+ for virtual_time, task in virtual_times:
+ delay = virtual_time - last_virtual_time
+ last_virtual_time = virtual_time
+ yield delay, task
- self._log.debug("Stats thread finished")
- def handleStatsException(self, ex):
- """
- Handle the registered function exceptions and eventually stop the
- sampling if a fatal error occurred.
- """
- return False
+def cycle(samplings):
+ """
+ Returns endless stream of tuples (virtual_time, task)
+ """
+ samplings = group_by_interval(samplings)
+ virtual_time = 1
+ while True:
+ for interval, tasks in samplings:
+ if virtual_time % interval == 0:
+ for task in tasks:
+ yield virtual_time, task
+ virtual_time += 1
- def collect(self):
- # TODO: improve this with lcm
- _mInt = map(lambda x: x.interval, self._statsFunctions)
- maxInterval = reduce(lambda x, y: x * y, set(_mInt), 1)
- intervalAccum = 0
- while not self._stopEvent.isSet():
- self._contEvent.wait()
-
- self._statsTime = time.time()
- waitInterval = maxInterval
-
- for statsFunction in self._statsFunctions:
- thisInterval = statsFunction.interval - (
- intervalAccum % statsFunction.interval)
- waitInterval = min(waitInterval, thisInterval)
-
- if intervalAccum % statsFunction.interval == 0:
- try:
- statsFunction()
- except Exception as e:
- if not self.handleStatsException(e):
- self._log.error("Stats function failed: %s",
- statsFunction, exc_info=True)
-
- self._stopEvent.wait(waitInterval)
- intervalAccum = (intervalAccum + waitInterval) % maxInterval
+def group_by_interval(samplings):
+ """
+ Groups samplings by interval.
+ """
+ keyfn = operator.attrgetter('interval')
+ samplings = sorted(samplings, key=keyfn)
+ return [(interval, tuple(tasks))
+ for interval, tasks in itertools.groupby(samplings, keyfn)]
class HostStatsThread(threading.Thread):
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 7835e99..f1d8983 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -65,7 +65,7 @@
from . import vmstatus
from .vmtune import updateIoTuneDom
-from .sampling import AdvancedStatsFunction, AdvancedStatsThread
+from .sampling import AdvancedStatsFunction, Sampler
from .utils import isVdsmImage, XMLElement
from vmpowerdown import VmShutdown, VmReboot
@@ -167,7 +167,7 @@
pass
-class VmStatsThread(AdvancedStatsThread):
+class VmStats(object):
MBPS_TO_BPS = 10 ** 6 / 8
# CPU tune sampling window
@@ -184,7 +184,6 @@
_libvirt_metadata_supported = True
def __init__(self, vm):
- AdvancedStatsThread.__init__(self, log=vm.log, daemon=True)
self._vm = vm
self.highWrite = (
@@ -237,11 +236,18 @@
config.getint('vars', 'vm_sample_cpu_tune_interval'),
self.CPU_TUNE_SAMPLING_WINDOW))
- self.addStatsFunction(
- self.highWrite, self.updateVolumes, self.sampleCpu,
- self.sampleDisk, self.sampleDiskLatency, self.sampleNet,
- self.sampleBalloon, self.sampleVmJobs, self.sampleVcpuPinning,
- self.sampleCpuTune)
+ samplings = (self.highWrite, self.updateVolumes, self.sampleCpu,
+ self.sampleDisk, self.sampleDiskLatency, self.sampleNet,
+ self.sampleBalloon, self.sampleVmJobs,
+ self.sampleVcpuPinning, self.sampleCpuTune)
+
+ self._sampler = Sampler(self._vm.id, samplings, self)
+
+ def start(self):
+ self._sampler.start()
+
+ def stop(self):
+ self._sampler.stop()
def _highWrite(self):
if not self._vm.isDisksStatsCollectionEnabled():
@@ -311,13 +317,13 @@
metadataCpuLimit = None
try:
- if VmStatsThread._libvirt_metadata_supported:
+ if VmStats._libvirt_metadata_supported:
metadataCpuLimit = self._vm._dom.metadata(
libvirt.VIR_DOMAIN_METADATA_ELEMENT,
METADATA_VM_TUNE_URI, 0)
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_ARGUMENT_UNSUPPORTED:
- VmStatsThread._libvirt_metadata_supported = False
+ VmStats._libvirt_metadata_supported = False
self._log.error("libvirt does not support metadata")
elif (e.get_error_code()
@@ -581,7 +587,7 @@
stats = {}
try:
- stats['statsAge'] = time.time() - self.getLastSampleTime()
+ stats['statsAge'] = time.time() - self._sampler.last_sample_time
except TypeError:
self._log.debug("Stats age not available")
stats['statsAge'] = -1.0
@@ -2778,7 +2784,7 @@
WARNING: this method should only gather statistics by copying data.
Especially avoid costly and dangerous ditrect calls to the _dom
- attribute. Use the VmStatsThread instead!
+ attribute. Use the VmStats instead!
"""
if self.lastStatus == vmstatus.DOWN:
@@ -3059,7 +3065,7 @@
return domxml.toxml()
def _initVmStats(self):
- self._vmStats = VmStatsThread(self)
+ self._vmStats = VmStats(self)
self._vmStats.start()
self._guestEventTime = self._startTime
@@ -3174,7 +3180,7 @@
if drive['device'] == 'disk' and isVdsmImage(drive):
self._syncVolumeChain(drive)
- # VmStatsThread may use block devices info from libvirt.
+ # VmStats may use block devices info from libvirt.
# So, run it after you have this info
self._initVmStats()
self.guestAgent = guestagent.GuestAgent(
--
To view, visit http://gerrit.ovirt.org/29980
Gerrit-MessageType: newchange
Gerrit-Change-Id: I5539a728f3a1b3075712331440eff1749da6c530
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 9 months
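The scheduling scheme in the sampling patch above (virtual times turned into relative delays) can be tried outside vdsm. The following is only a standalone sketch: it restates the `cycle`, `delays` and `group_by_interval` generators from the diff, and the `Task` namedtuple is a hypothetical stand-in for the real sampling functions, which only need an `interval` attribute here.

```python
import itertools
import operator
from collections import namedtuple

# Hypothetical stand-in for a sampling task; only `interval` matters for
# scheduling, `name` is used for display.
Task = namedtuple("Task", "interval name")


def group_by_interval(samplings):
    """Group tasks by interval, keeping the original order inside a group."""
    keyfn = operator.attrgetter("interval")
    ordered = sorted(samplings, key=keyfn)  # sorted() is stable
    return [(interval, tuple(tasks))
            for interval, tasks in itertools.groupby(ordered, keyfn)]


def cycle(samplings):
    """Yield an endless stream of (virtual_time, task) tuples."""
    groups = group_by_interval(samplings)
    virtual_time = 1
    while True:
        for interval, tasks in groups:
            if virtual_time % interval == 0:
                for task in tasks:
                    yield virtual_time, task
        virtual_time += 1


def delays(virtual_times):
    """Convert (virtual_time, task) into (delay since previous task, task)."""
    last = 0
    for virtual_time, task in virtual_times:
        yield virtual_time - last, task
        last = virtual_time


tasks = [Task(2, "task1"), Task(2, "task2"), Task(3, "task3")]

# Take the first full period (lcm(2, 3) == 6 virtual seconds, 8 task runs).
schedule = [(delay, task.name)
            for delay, task in itertools.islice(delays(cycle(tasks)), 8)]

for delay, name in schedule:
    print("wait %d, then run %s" % (delay, name))
```

A delay of 0 means the task is due at the same virtual time as the previous one, so the sampler still runs them one after another rather than in parallel.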
Change in vdsm[master]: resourceManager: Keep resource state if registerResource fails
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: resourceManager: Keep resource state if registerResource fails
......................................................................
resourceManager: Keep resource state if registerResource fails
The previous code increased the resource activeUsers counter first, so
an exception raised after that made the method fail while leaving a
locked resource that nobody could release. Such a locked resource may
lead to failure of any pool operation, making the host non-operational
and requiring a restart of vdsm.
The failure in the field was caused by a Python logging bug, raising
OSError when a message was logged while the log file was being rotated.
However, such a failure can happen anywhere, and locking code must be
written in such a way that a failure never leaves a resource locked.
This patch ensures that the resource is added and the activeUsers
counter is increased only if everything else succeeded.
Since simulating a logging error is hard, the tests monkeypatch the
RequestRef class to simulate a failure. This is a little ugly, since it
depends on an internal implementation detail, but I could not find a
cleaner way.
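The ordering rule described above (do everything that can fail first, commit shared state last) can be illustrated in isolation. This is a minimal sketch under stated assumptions, not the resourceManager code: `Registry`, `ResourceInfo`, `make_ref_ok` and `make_ref_failing` are hypothetical names invented for the example, with `make_ref_failing` playing the role of the monkeypatched RequestRef.

```python
class Failure(Exception):
    """Unique exception used to simulate a failing step."""


class ResourceInfo(object):
    def __init__(self):
        self.active_users = 0


def make_ref_ok(request):
    # Hypothetical helper: building the reference succeeds.
    return ("ref", request)


def make_ref_failing(request):
    # Hypothetical helper: building the reference always fails.
    raise Failure()


class Registry(object):
    """Hypothetical registry that commits state only after fallible work."""

    def __init__(self, make_ref):
        self.resources = {}
        self._make_ref = make_ref

    def register(self, name, request):
        resource = ResourceInfo()
        # Fallible work first: if this raises, no shared state was touched.
        ref = self._make_ref(request)
        # Commit last: simple assignments, nothing below this line is
        # expected to fail.
        resource.active_users += 1
        self.resources[name] = resource
        return ref


broken = Registry(make_ref_failing)
try:
    broken.register("test", "request-1")
except Failure:
    pass
print("left behind after failure:", sorted(broken.resources))

working = Registry(make_ref_ok)
working.register("test", "request-2")
print("active users:", working.resources["test"].active_users)
```

The failed registration leaves the registry empty, so a later caller can still acquire the same name, which is what the two new tests in the patch check.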
Change-Id: I16abf41ebc8a8a99b292d38c945074752254a34b
Relates-To: https://bugzilla.redhat.com/1065650
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M tests/resourceManagerTests.py
M vdsm/storage/resourceManager.py
2 files changed, 50 insertions(+), 9 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/84/25284/1
diff --git a/tests/resourceManagerTests.py b/tests/resourceManagerTests.py
index 01b0669..e2b6461 100644
--- a/tests/resourceManagerTests.py
+++ b/tests/resourceManagerTests.py
@@ -29,6 +29,7 @@
import storage.resourceManager as resourceManager
from testrunner import VdsmTestCase as TestCaseBase
from testValidation import slowtest, stresstest
+import monkeypatch
class NullResourceFactory(resourceManager.SimpleResourceFactory):
@@ -209,6 +210,32 @@
ex.__class__.__name__)
self.fail("Managed to access an attribute not exposed by wrapper")
+
+ def testRegisterResourceFailureExclusive(self):
+ # This registration must fail
+ with monkeypatch.MonkeyPatchScope(
+ [(resourceManager, 'RequestRef', FakeRequestRef)]):
+ self.assertRaises(Failure, self.manager.registerResource, "string",
+ "test", resourceManager.LockType.exclusive, None)
+
+ # And it should not leave a locked resource
+ with self.manager.acquireResource("string", "test",
+ resourceManager.LockType.exclusive,
+ 0):
+ pass
+
+ def testRegisterResourceFailureShared(self):
+ # This regeisterion must fail
+ with monkeypatch.MonkeyPatchScope(
+ [(resourceManager, 'RequestRef', FakeRequestRef)]):
+ self.assertRaises(Failure, self.manager.registerResource, "string",
+ "test", resourceManager.LockType.shared, None)
+
+ # And it should not leave a locked resource
+ with self.manager.acquireResource("string", "test",
+ resourceManager.LockType.exclusive,
+ 0):
+ pass
def testAccessAttributeNotExposedByRequestRef(self):
resources = []
@@ -705,3 +732,14 @@
except:
resourceManager.ResourceManager._instance = None
raise
+
+# Helpers
+
+
+class Failure(Exception):
+ """ Unique exception for testing """
+
+
+def FakeRequestRef(*a, **kw):
+ """ Used to simulate failures when registering a resource """
+ raise Failure()
diff --git a/vdsm/storage/resourceManager.py b/vdsm/storage/resourceManager.py
index 1be1450..ce144cf 100644
--- a/vdsm/storage/resourceManager.py
+++ b/vdsm/storage/resourceManager.py
@@ -559,23 +559,25 @@
if len(resource.queue) == 0 and \
resource.currentLock == LockType.shared and \
request.lockType == LockType.shared:
- resource.activeUsers += 1
self._log.debug("Resource '%s' found in shared state "
"and queue is empty, Joining current "
"shared lock (%d active users)",
- fullName, resource.activeUsers)
+ fullName, resource.activeUsers + 1)
request.grant()
+ ref = RequestRef(request)
contextCleanup.defer(request.emit,
ResourceRef(namespace, name,
resource.realObj,
request.reqID))
- return RequestRef(request)
+ resource.activeUsers += 1
+ return ref
- resource.queue.insert(0, request)
self._log.debug("Resource '%s' is currently locked, "
"Entering queue (%d in queue)",
- fullName, len(resource.queue))
- return RequestRef(request)
+ fullName, len(resource.queue) + 1)
+ ref = RequestRef(request)
+ resource.queue.insert(0, request)
+ return ref
# TODO : Creating the object inside the namespace lock causes
# the entire namespace to lock and might cause
@@ -592,19 +594,20 @@
contextCleanup.defer(request.cancel)
return RequestRef(request)
- resource = resources[name] = ResourceManager.ResourceInfo(
- obj, namespace, name)
+ resource = ResourceManager.ResourceInfo(obj, namespace, name)
resource.currentLock = request.lockType
resource.activeUsers += 1
self._log.debug("Resource '%s' is free. Now locking as '%s' "
"(1 active user)", fullName, request.lockType)
request.grant()
+ ref = RequestRef(request)
contextCleanup.defer(request.emit,
ResourceRef(namespace, name,
resource.realObj,
request.reqID))
- return RequestRef(request)
+ resources[name] = resource
+ return ref
def releaseResource(self, namespace, name):
# WARN : unlike in resource acquire the user now has the request
--
To view, visit http://gerrit.ovirt.org/25284
Gerrit-MessageType: newchange
Gerrit-Change-Id: I16abf41ebc8a8a99b292d38c945074752254a34b
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 9 months
Change in vdsm[master]: call stop_event_loop upon exit
by Dan Kenigsberg
Dan Kenigsberg has uploaded a new change for review.
Change subject: call stop_event_loop upon exit
......................................................................
call stop_event_loop upon exit
For cleanliness, whoever starts a thread should stop it when it is no
longer needed.
Change-Id: I9ab0d9b7be976e37a89a96d2f09a353186008731
Signed-off-by: Dan Kenigsberg <danken(a)redhat.com>
---
M vdsm/vdsm
1 file changed, 1 insertion(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/32/26532/1
diff --git a/vdsm/vdsm b/vdsm/vdsm
index 652797c..fd9b3f8 100755
--- a/vdsm/vdsm
+++ b/vdsm/vdsm
@@ -81,6 +81,7 @@
signal.pause()
finally:
cif.prepareForShutdown()
+ libvirtconnection.stop_event_loop()
def run(pidfile=None):
--
To view, visit http://gerrit.ovirt.org/26532
Gerrit-MessageType: newchange
Gerrit-Change-Id: I9ab0d9b7be976e37a89a96d2f09a353186008731
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Dan Kenigsberg <danken(a)redhat.com>
8 years, 9 months
Change in vdsm[master]: virt: let Engine start a VM on an UNKOWN OS
by Dan Kenigsberg
Dan Kenigsberg has uploaded a new change for review.
Change subject: virt: let Engine start a VM on an UNKOWN OS
......................................................................
virt: let Engine start a VM on an UNKOWN OS
This superfluous validation was introduced as a "bandage" to avoid
https://bugzilla.redhat.com/716705, where Vdsm failed to recognize
VMs with an "UNKNOWN" os injected into their bios.
Now that we recognize VMs based on guest channels existence, we can let
Engine start a VM on whatever host it deems valid.
Backward compatibility caveat: if a VM is started on an UNKNOWN os, and
then migrated to an old Vdsm (pre ovirt-3.4.2), it would be killed when
that old Vdsm is restarted. Although it is unlikely that someone would
start a long-running VM on an UNKNOWN platform, we may want to hold
this patch until ovirt-3.3 is deprecated.
Change-Id: Iee7b3913e76923043daab8fb85854c3290208237
Signed-off-by: Dan Kenigsberg <danken(a)redhat.com>
---
M vdsm/API.py
1 file changed, 0 insertions(+), 5 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/59/32459/1
diff --git a/vdsm/API.py b/vdsm/API.py
index 8d1bca6..d6f318f 100644
--- a/vdsm/API.py
+++ b/vdsm/API.py
@@ -237,11 +237,6 @@
vmParams['vmId'])
vmParams['volatileFloppy'] = True
- if caps.osversion()['name'] == caps.OSName.UNKNOWN:
- return {'status': {'code': errCode['createErr']
- ['status']['code'],
- 'message': 'Unknown host operating system'}}
-
if 'sysprepInf' in vmParams:
if not self._createSysprepFloppyFromInf(vmParams['sysprepInf'],
vmParams['floppy']):
--
To view, visit http://gerrit.ovirt.org/32459
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iee7b3913e76923043daab8fb85854c3290208237
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Dan Kenigsberg <danken(a)redhat.com>
8 years, 9 months
Change in vdsm[master]: virt: enable libgfapi with snapshot support
by Federico Simoncelli
Federico Simoncelli has uploaded a new change for review.
Change subject: virt: enable libgfapi with snapshot support
......................................................................
virt: enable libgfapi with snapshot support
Change-Id: Ie0965bef605ba67297670c0bf7924f88fa3b0460
Signed-off-by: Federico Simoncelli <fsimonce(a)redhat.com>
---
M vdsm/storage/hsm.py
M vdsm/virt/vm.py
2 files changed, 38 insertions(+), 17 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/68/33768/1
diff --git a/vdsm/storage/hsm.py b/vdsm/storage/hsm.py
index cd6f4b9..bde04be 100644
--- a/vdsm/storage/hsm.py
+++ b/vdsm/storage/hsm.py
@@ -3225,8 +3225,7 @@
path = os.path.join(dom.domaindir, sd.DOMAIN_IMAGES, imgUUID,
volUUID)
volInfo = {'domainID': sdUUID, 'imageID': imgUUID,
- 'volumeID': volUUID, 'path': path,
- 'volType': "path"}
+ 'volumeID': volUUID, 'path': path}
leasePath, leaseOffset = dom.getVolumeLease(imgUUID, volUUID)
@@ -3237,8 +3236,8 @@
})
imgVolumesInfo.append(volInfo)
- if volUUID == leafUUID:
- leafInfo = volInfo
+
+ leafInfo = dom.produceVolume(imgUUID, volUUID).getVmVolumeInfo()
return {'path': leafPath, 'info': leafInfo,
'imgVolumesInfo': imgVolumesInfo}
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 1eebb69..af0830a 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -3821,20 +3821,43 @@
def snapshot(self, snapDrives, memoryParams):
"""Live snapshot command"""
- def _diskSnapshot(vmDev, newPath, sourceType):
+ def _diskSnapshot(vmDev, oldDrive, newDriveDict):
"""Libvirt snapshot XML"""
+ sourceType = 'file'
+ sourceAttrs = {}
+ hostAttrs = {}
+
+ # TODO: unify these with the Drive.getXML() method
+ if oldDrive.networkDev:
+ sourceType = 'network'
+ sourceAttrs.update({
+ 'name': newDriveDict['volumeInfo']['path'],
+ 'protocol': newDriveDict['volumeInfo']['protocol']})
+ hostAttrs.update({
+ 'name': newDriveDict['volumeInfo']['volfileServer'],
+ 'port': newDriveDict['volumeInfo']['volPort'],
+ 'transport': newDriveDict['volumeInfo']['volTransport']})
+ else:
+ sourceAttrs.update({'file': newDriveDict['path']})
+
+ # Libvirt versions before 1.2.2 do not understand 'type'
+ # and treat all snapshots as if they are type='file'.
+ # In order to ensure proper handling of block snapshots
+ # in modern libvirt versions, we specify type='block'
+ # and dev=path for block volumes but we always specify
+ # the file=path for backwards compatibility.
+ if oldDrive.blockDev:
+ sourceType = 'block'
+ sourceAttrs.update({'dev': newDriveDict['path']})
disk = vmxml.Element('disk', name=vmDev, snapshot='external',
type=sourceType)
- # Libvirt versions before 1.2.2 do not understand 'type' and treat
- # all snapshots as if they are type='file'. In order to ensure
- # proper handling of block snapshots in modern libvirt versions,
- # we specify type='block' and dev=path for block volumes but we
- # always speficy the file=path for backwards compatibility.
- args = {'type': sourceType, 'file': newPath}
- if sourceType == 'block':
- args['dev'] = newPath
- disk.appendChildWithArgs('source', **args)
+
+ source = disk.appendChildWithArgs('source', **sourceAttrs)
+
+ if hostAttrs:
+ source.appendChildWithArgs('host', **hostAttrs)
+
return disk
def _normSnapDriveParams(drive):
@@ -3964,9 +3987,8 @@
_rollbackDrives(preparedDrives)
return errCode['snapshotErr']
- snapType = 'block' if vmDrives[vmDevName].blockDev else 'file'
- snapelem = _diskSnapshot(vmDevName, newDrives[vmDevName]["path"],
- snapType)
+ snapelem = _diskSnapshot(vmDevName, vmDrives[vmDevName],
+ newDrives[vmDevName])
disks.appendChild(snapelem)
snap.appendChild(disks)
--
To view, visit http://gerrit.ovirt.org/33768
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie0965bef605ba67297670c0bf7924f88fa3b0460
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Federico Simoncelli <fsimonce(a)redhat.com>
8 years, 9 months
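To see what kind of snapshot XML the reworked `_diskSnapshot` in the libgfapi patch above is aiming for, here is a rough standalone sketch. It uses the standard library `xml.etree.ElementTree` instead of vdsm's `vmxml` helpers, replaces the `oldDrive.networkDev` / `oldDrive.blockDev` checks with a hypothetical `kind` argument, and all sample values (server name, port, paths) are made up for illustration.

```python
import xml.etree.ElementTree as ET


def disk_snapshot(dev_name, kind, new_drive):
    """Build a <disk> snapshot element; kind is 'file', 'block' or 'network'.

    `kind` is a simplification assumed for this sketch; the patch derives
    it from the old drive object.
    """
    source_type = "file"
    source_attrs = {}
    host_attrs = {}

    if kind == "network":
        info = new_drive["volumeInfo"]
        source_type = "network"
        source_attrs.update(name=info["path"], protocol=info["protocol"])
        host_attrs.update(name=info["volfileServer"],
                          port=info["volPort"],
                          transport=info["volTransport"])
    else:
        # file=path is always set for libvirt versions that ignore 'type'.
        source_attrs["file"] = new_drive["path"]
        if kind == "block":
            source_type = "block"
            source_attrs["dev"] = new_drive["path"]

    disk = ET.Element("disk", name=dev_name, snapshot="external",
                      type=source_type)
    source = ET.SubElement(disk, "source", source_attrs)
    if host_attrs:
        ET.SubElement(source, "host", host_attrs)
    return disk


# Made-up sample drives; attribute values must be strings to serialize.
network_disk = disk_snapshot("vda", "network", {
    "volumeInfo": {
        "path": "myvol/images/img-uuid/vol-uuid",
        "protocol": "gluster",
        "volfileServer": "gluster.example.com",
        "volPort": "0",
        "volTransport": "tcp",
    },
})
block_disk = disk_snapshot("vdb", "block", {"path": "/dev/vg-uuid/vol-uuid"})

print(ET.tostring(network_disk).decode())
print(ET.tostring(block_disk).decode())
```

Only network drives get a nested `<host>` element; block drives carry both `file` and `dev` on `<source>`, matching the backward compatibility comment in the patch.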