Nir Soffer has uploaded a new change for review.
Change subject: contrib: Simple jsonrpc client
contrib: Simple jsonrpc client
This is a simple jsonrpc client for communicating with the jsonrpc
server from the command line.
method one of the methods described in json schema
params optional json object with message parameters
Calling method without arguments:
# jsonrpc Host.getVMList
"jsonrpc": "2.0",
"id": "0e043d83-294a-4d31-b1b6-6dc2f2747494",
"result": [
Calling method with arguments:
# jsonrpc VM.getStats '{"vmID": "b3f6fa00-b315-4ad4-8108-f73da817b5c5"}'
"jsonrpc": "2.0",
"id": "cefd25a3-6250-4123-8a56-d7047899e19e",
"result": [
"status": "Down",
"exitMessage": "Admin shut down from the engine",
"vmId": "b3f6fa00-b315-4ad4-8108-f73da817b5c5",
"exitReason": 6,
"timeOffset": "0",
"exitCode": 0
Requires library:
Change-Id: Ia6273eabf6f3601602659d1e4e748d8025ae8084
Signed-off-by: Nir Soffer <nsoffer(a)>
A contrib/jsonrpc
1 file changed, 104 insertions(+), 0 deletions(-)
git pull ssh:// refs/changes/81/35181/1
diff --git a/contrib/jsonrpc b/contrib/jsonrpc
new file mode 100755
index 0000000..3080193
--- /dev/null
+++ b/contrib/jsonrpc
@@ -0,0 +1,104 @@
+# Copyright 2014 Red Hat, Inc.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+# Refer to the README and COPYING files for full details of the license
+jsonrpc-cli - Vdsm jsonrpc client
+import json
+import os
+import signal
+import sys
+import time
+import uuid
+import optparse
+import stomp
+# Copied from lib/vdsm/
+PKIDIR = '/etc/pki/vdsm'
+KEYFILE = os.path.join(PKIDIR, 'keys/vdsmkey.pem')
+CERTFILE = os.path.join(PKIDIR, 'certs/vdsmcert.pem')
+CACERT = os.path.join(PKIDIR, 'certs/cacert.pem')
+PORT = 54321
+DESTINATION = "/queue/_local/vdsm/requests"
+class Listener(stomp.ConnectionListener):
+ def on_error(self, headers, message):
+ print 'Error: %s' % message
+ terminate()
+ def on_message(self, headers, message):
+ msg = json.loads(message)
+ print json.dumps(msg, indent=4)
+ terminate()
+def main(args):
+ parser = option_parser()
+ options, args = parser.parse_args(args)
+ if not args:
+ parser.error("method required")
+ msg = {
+ "id": str(uuid.uuid4()),
+ "jsonrpc": "2.0",
+ "method": args[0]
+ }
+ if len(args) > 1:
+ msg["params"] = json.loads(args[1])
+ conn = stomp.Connection10(
+ host_and_ports=((, PORT),),
+ use_ssl=True,
+ ssl_key_file=KEYFILE,
+ ssl_cert_file=CERTFILE,
+ ssl_ca_certs=CACERT)
+ conn.set_listener("", Listener())
+ conn.start()
+ conn.send(body=json.dumps(msg), destination=DESTINATION)
+ try:
+ signal.pause()
+ except KeyboardInterrupt:
+ pass
+ conn.disconnect()
+def option_parser():
+ parser = optparse.OptionParser(usage='%prog [options] method [params]')
+ parser.add_option("-a", "--host", dest="host",
+ help="host address (default localhost)")
+ parser.set_defaults(host="localhost")
+ return parser
+def terminate():
+ os.kill(os.getpid(), signal.SIGINT)
+if __name__ == "__main__":
+ main(sys.argv[1:])
To view, visit
To unsubscribe, visit
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia6273eabf6f3601602659d1e4e748d8025ae8084
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)>
Nir Soffer has uploaded a new change for review.
Change subject: lvm: Fail loudly if called with unexpected input.
lvm: Fail loudly if called with unexpected input.
When creating pvs with the force option, we are very careful to accept
only True. When using the jsonrpc transport, engine was sending "true"
and "false", causing the call to fail mysteriously.
Now we are also careful about rejecting invalid input, making debugging easier.
Change-Id: If9e6754d4aa2efaf894a9309cfaa4595d710063b
Signed-off-by: Nir Soffer <nsoffer(a)>
M vdsm/storage/
1 file changed, 5 insertions(+), 0 deletions(-)
git pull ssh:// refs/changes/29/37329/1
diff --git a/vdsm/storage/ b/vdsm/storage/
index aa3c04b..549839a 100644
--- a/vdsm/storage/
+++ b/vdsm/storage/
@@ -724,6 +724,11 @@
+ # We must be very careful here; any value except True or False is a user
+ # error.
+ if type(force) != bool:
+ raise ValueError("Invalid value for 'force': %r" % force)
if force is True:
options = ("-y", "-ff")
To view, visit
To unsubscribe, visit
Gerrit-MessageType: newchange
Gerrit-Change-Id: If9e6754d4aa2efaf894a9309cfaa4595d710063b
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)>
Adam Litke has uploaded a new change for review.
Change subject: Live Merge: Restore watermark tracking
Live Merge: Restore watermark tracking
Change-Id: I632f31e7795ec5d8c6f52a480116b14470c3163f
Signed-off-by: Adam Litke <alitke(a)>
M vdsm/virt/
1 file changed, 108 insertions(+), 10 deletions(-)
git pull ssh:// refs/changes/24/36924/1
diff --git a/vdsm/virt/ b/vdsm/virt/
index f22610d..09080b9 100644
--- a/vdsm/virt/
+++ b/vdsm/virt/
@@ -1512,12 +1512,94 @@
with self._confLock:
self.conf['timeOffset'] = newTimeOffset
+ def _getWriteWatermarks(self):
+ def pathToVolID(drive, path):
+ for vol in drive.volumeChain:
+ if os.path.realpath(vol['path']) == os.path.realpath(path):
+ return vol['volumeID']
+ raise LookupError("Unable to find VolumeID for path '%s'", path)
+ volAllocMap = {}
+ statsFlags = self._libvirtBackingChainStatsFlag()
+ conn = libvirtconnection.get()
+ blkStats = conn.domainListGetStats([self._dom._dom],
+ statsFlags)[0][1]
+ for i in xrange(0, blkStats['block.count']):
+ name = blkStats['' % i]
+ try:
+ drive = self._findDriveByName(name)
+ except LookupError:
+ continue
+ if not drive.blockDev or drive.format != 'cow':
+ continue
+ try:
+ path = blkStats['block.%i.path' % i]
+ alloc = blkStats['block.%i.allocation' % i]
+ except KeyError as e:
+ self.log.debug("Block stats are missing expected key '%s', "
+ "skipping volume", e.args[0])
+ continue
+ volID = pathToVolID(drive, path)
+ volAllocMap[volID] = alloc
+ return volAllocMap
+ def _getLiveMergeExtendCandidates(self):
+ # The common case is that there are no active jobs.
+ if not self.conf['_blockJobs'].values():
+ return {}
+ candidates = {}
+ watermarks = self._getWriteWatermarks()
+ for job in self.conf['_blockJobs'].values():
+ try:
+ drive = self._findDriveByUUIDs(job['disk'])
+ except LookupError:
+ # After an active layer merge completes the vdsm metadata will
+ # be out of sync for a brief period. If we cannot find the old
+ # disk then it's safe to skip it.
+ continue
+ if not drive.blockDev:
+ continue
+ if job['strategy'] == 'commit':
+ volumeID = job['baseVolume']
+ else:
+ self.log.debug("Unrecognized merge strategy '%s'",
+ job['strategy'])
+ continue
+ res = self.cif.irs.getVolumeInfo(drive.domainID, drive.poolID,
+ drive.imageID, volumeID)
+ if res['status']['code'] != 0:
+ self.log.error("Unable to get the info of volume %s (domain: "
+ "%s image: %s)", volumeID, drive.domainID,
+ drive.imageID)
+ continue
+ volInfo = res['info']
+ if volInfo['format'].lower() != 'cow':
+ continue
+ if volumeID in watermarks:
+ self.log.debug("Adding live merge extension candidate: "
+ "volume=%s allocation=%i", volumeID,
+ watermarks[volumeID])
+ candidates[drive.imageID] = {
+ 'alloc': watermarks[volumeID],
+ 'physical': int(volInfo['truesize']),
+ 'capacity': int(volInfo['apparentsize']),
+ 'volumeID': volumeID}
+ else:
+ self.log.warning("No watermark info available for %s",
+ volumeID)
+ return candidates
def _getExtendCandidates(self):
ret = []
- # FIXME: mergeCandidates should be a dictionary of candidate volumes
- # once libvirt starts reporting watermark information for all volumes.
- mergeCandidates = {}
+ mergeCandidates = self._getLiveMergeExtendCandidates()
for drive in self._devices[hwclass.DISK]:
if not drive.blockDev or drive.format != 'cow':
@@ -4771,6 +4853,14 @@
jobsRet[jobID] = entry
return jobsRet
+ def _libvirtBackingChainStatsFlag(self):
+ # Since libvirt 1.2.13, the virConnectGetAllDomainStats API will return
+ # block statistics for all volumes in the chain when using a new flag.
+ try:
+ except AttributeError:
+ return 0
def merge(self, driveSpec, baseVolUUID, topVolUUID, bandwidth, jobUUID):
if not caps.getLiveMergeSupport():
self.log.error("Live merge is not supported on this host")
@@ -4815,6 +4905,8 @@
if res['info']['voltype'] == 'SHARED':
self.log.error("merge: Refusing to merge into a shared volume")
return errCode['mergeErr']
+ baseSize = int(res['info']['apparentsize'])
+ baseCow = bool(res['info']['format'].lower() == 'cow')
# Indicate that we expect libvirt to maintain the relative paths of
# backing files. This is necessary to ensure that a volume chain is
@@ -4865,13 +4957,19 @@
# blockCommit will cause data to be written into the base volume.
# Perform an initial extension to ensure there is enough space to
- # copy all the required data. Normally we'd use monitoring to extend
- # the volume on-demand but internal watermark information is not being
- # reported by libvirt so we must do the full extension up front. In
- # the worst case, we'll need to extend 'base' to the same size as 'top'
- # plus a bit more to accomodate additional writes to 'top' during the
- # live merge operation.
- self.extendDriveVolume(drive, baseVolUUID, topSize)
+ # copy all the required data. If libvirt supports monitoring of
+ # backing chain volumes, just extend by one chunk now and monitor
+ # during the rest of the operation. Otherwise, extend now to
+ # accommodate the worst case scenario: no intersection between the
+ # allocated blocks in the base volume and the top volume.
+ if drive.blockDev and baseCow:
+ if self._libvirtBackingChainStatsFlag():
+ self.extendDrivesIfNeeded()
+ else:
+ extendSize = baseSize + topSize
+ self.log.debug("Preemptively extending volume %s with size %i"
+ "(job: %s)", baseVolUUID, extendSize, jobUUID)
+ self.extendDriveVolume(drive, baseVolUUID, extendCurSize)
# Trigger the collection of stats before returning so that callers
# of getVmStats after this returns will see the new job
To view, visit
To unsubscribe, visit
Gerrit-MessageType: newchange
Gerrit-Change-Id: I632f31e7795ec5d8c6f52a480116b14470c3163f
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Adam Litke <alitke(a)>
Nir Soffer has uploaded a new change for review.
Change subject: libvirtconnection: Replace assert with AssertionError
libvirtconnection: Replace assert with AssertionError
The code wrongly assumed that assert always exists. When running in
optimized mode, the check would be skipped, and instead of getting an
AssertionError, which is the expected error for programmer error
(starting the eventloop twice), we could get a confusing
RuntimeException or RuntimeError from Thread.start (depending on Python version).
RuntimeError is misused in the standard library for all kinds of errors
that do not have builtin errors. It is a particularly bad option when used
for usage errors.
Change-Id: Icf1564f81f4c1fbf77ccaff6d93c047a02d946da
Signed-off-by: Nir Soffer <nsoffer(a)>
M lib/vdsm/
1 file changed, 2 insertions(+), 1 deletion(-)
git pull ssh:// refs/changes/64/34364/1
diff --git a/lib/vdsm/ b/lib/vdsm/
index 5430c82..009f8b7 100644
--- a/lib/vdsm/
+++ b/lib/vdsm/
@@ -37,7 +37,8 @@
self.__thread = None
def start(self):
- assert not
+ if
+ raise AssertionError("EventLoop is running")
self.__thread = threading.Thread(target=self.__run,
To view, visit
To unsubscribe, visit
Gerrit-MessageType: newchange
Gerrit-Change-Id: Icf1564f81f4c1fbf77ccaff6d93c047a02d946da
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)>
Nir Soffer has uploaded a new change for review.
Change subject: cache: Add caching decorator with invalidation
cache: Add caching decorator with invalidation
The new cache.memoized extends utils.memoized, adding invalidation support.
Features added:
- An optional "validate" argument. This is a callable invoked each time
the memoized function is called. When the callable returns False, the
cache is invalidated.
- Memoized functions have an "invalidate" method, used to invalidate the
cache during testing.
- file_validator - invalidates the cache when a file changes.
Example usage:
from vdsm.cache import memoized, file_validator
def parse_bigfile():
# Expensive code processing '/bigfile' contents
Change-Id: I6dd8fb29d94286e3e3a3e29b8218501cbdc5c018
Signed-off-by: Nir Soffer <nsoffer(a)>
M lib/vdsm/
A lib/vdsm/
M tests/
A tests/
4 files changed, 366 insertions(+), 0 deletions(-)
git pull ssh:// refs/changes/09/34709/1
diff --git a/lib/vdsm/ b/lib/vdsm/
index b862e71..6f0040d 100644
--- a/lib/vdsm/
+++ b/lib/vdsm/
@@ -23,6 +23,7 @@
dist_vdsmpylib_PYTHON = \ \
+ \ \ \ \
diff --git a/lib/vdsm/ b/lib/vdsm/
new file mode 100644
index 0000000..9806e40
--- /dev/null
+++ b/lib/vdsm/
@@ -0,0 +1,98 @@
+# Copyright 2014 Red Hat, Inc.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+# Refer to the README and COPYING files for full details of the license
+import errno
+import os
+import functools
+def memoized(validate=None):
+ """
+ Return a caching decorator supporting invalidation.
+ The decorator accepts an optional validate callable, called each time the
+ memoized function is called. If the validate callable returns True, the
+ memoized function will use the cache. If the validate callable returns
+ False, the memoized cache is cleared.
+ The memoized function may accept multiple positional arguments. The
+ cache stores the result for each combination of arguments. Functions with
+ kwargs are not supported.
+ Memoized functions have an "invalidate" method, used to invalidate the
+ memoized cache during testing.
+ To invalidate the cache when a file changes, use the file_validator from
+ this module.
+ Example usage:
+ from vdsm.cache import memoized, file_validator
+ @memoized(file_validator('/bigfile'))
+ def parse_bigfile():
+ # Expensive code processing '/bigfile' contents
+ """
+ def decorator(f):
+ cache = {}
+ @functools.wraps(f)
+ def wrapper(*args):
+ if validate is not None and not validate():
+ cache.clear()
+ try:
+ value = cache[args]
+ except KeyError:
+ value = cache[args] = f(*args)
+ return value
+ wrapper.invalidate = cache.clear
+ return wrapper
+ return decorator
+class file_validator(object):
+ """
+ I'm a validator returning False when a file has changed since the last
+ validation.
+ """
+ def __init__(self, path):
+ self.path = path
+ self.stats = self.UNKNOWN
+ def __call__(self):
+ try:
+ stats = os.stat(self.path)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ raise
+ stats = self.MISSING
+ else:
+ stats = stats.st_ino, stats.st_size, stats.st_mtime
+ if stats != self.stats:
+ self.stats = stats
+ return False
+ return True
diff --git a/tests/ b/tests/
index 36a1cdd..6fa7e64 100644
--- a/tests/
+++ b/tests/
@@ -26,6 +26,7 @@ \ \ \
+ \ \ \ \
diff --git a/tests/ b/tests/
new file mode 100644
index 0000000..8927b39
--- /dev/null
+++ b/tests/
@@ -0,0 +1,266 @@
+# Copyright 2014 Red Hat, Inc.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301 USA
+# Refer to the README and COPYING files for full details of the license
+import os
+from vdsm.cache import memoized
+from vdsm.cache import file_validator
+from testlib import VdsmTestCase
+from testlib import namedTemporaryDir
+class Validator(object):
+ """ I'm a callable returning a boolean value (self.valid) """
+ def __init__(self):
+ self.valid = True
+ self.count = 0
+ def __call__(self):
+ self.count += 1
+ return self.valid
+class Accessor(object):
+ """ I'm recording how many times a dict was accessed. """
+ def __init__(self, d):
+ self.d = d
+ self.count = 0
+ def get(self, key):
+ self.count += 1
+ return self.d[key]
+class MemoizedTests(VdsmTestCase):
+ def setUp(self):
+ self.values = {'a': 0, 'b': 10, ('a',): 20, ('a', 'b'): 30}
+ def test_no_args(self):
+ accessor = Accessor(self.values)
+ @memoized()
+ def func(key):
+ return accessor.get(key)
+ # Fill the cache
+ self.assertEqual(func('a'), self.values['a'])
+ self.assertEqual(func('b'), self.values['b'])
+ self.assertEqual(accessor.count, 2)
+ # Values served now from the cache
+ self.assertEqual(func('a'), self.values['a'])
+ self.assertEqual(func('b'), self.values['b'])
+ self.assertEqual(accessor.count, 2)
+ def test_validation(self):
+ accessor = Accessor(self.values)
+ validator = Validator()
+ @memoized(validator)
+ def func(key):
+ return accessor.get(key)
+ # Fill the cache
+ self.assertEqual(func('a'), self.values['a'])
+ self.assertEqual(func('b'), self.values['b'])
+ self.assertEqual(accessor.count, 2)
+ self.assertEqual(validator.count, 2)
+ # Values served now from the cache
+ self.assertEqual(func('a'), self.values['a'])
+ self.assertEqual(func('b'), self.values['b'])
+ self.assertEqual(accessor.count, 2)
+ self.assertEqual(validator.count, 4)
+ # Values have changed
+ self.values['a'] += 1
+ self.values['b'] += 1
+ # Next call should clear the cache
+ validator.valid = False
+ self.assertEqual(func('a'), self.values['a'])
+ self.assertEqual(accessor.count, 3)
+ self.assertEqual(validator.count, 5)
+ # Next call should add next value to cache
+ validator.valid = True
+ self.assertEqual(func('b'), self.values['b'])
+ self.assertEqual(accessor.count, 4)
+ self.assertEqual(validator.count, 6)
+ # Values served now from the cache
+ self.assertEqual(func('a'), self.values['a'])
+ self.assertEqual(func('b'), self.values['b'])
+ self.assertEqual(accessor.count, 4)
+ self.assertEqual(validator.count, 8)
+ def test_raise_errors_in_memoized_func(self):
+ accessor = Accessor(self.values)
+ validator = Validator()
+ @memoized(validator)
+ def func(key):
+ return accessor.get(key)
+ # First run should fail, second should fill the cache
+ self.assertRaises(KeyError, func, 'no such key')
+ self.assertEqual(func('a'), self.values['a'])
+ self.assertEqual(accessor.count, 2)
+ self.assertEqual(validator.count, 2)
+ def test_multiple_args(self):
+ accessor = Accessor(self.values)
+ @memoized()
+ def func(*args):
+ return accessor.get(args)
+ # Fill the cache
+ self.assertEqual(func('a'), self.values[('a',)])
+ self.assertEqual(func('a', 'b'), self.values[('a', 'b')])
+ self.assertEqual(accessor.count, 2)
+ # Values served now from the cache
+ self.assertEqual(func('a'), self.values[('a',)])
+ self.assertEqual(func('a', 'b'), self.values[('a', 'b')])
+ self.assertEqual(accessor.count, 2)
+ def test_kwargs_not_supported(self):
+ @memoized()
+ def func(a=None, b=None):
+ pass
+ self.assertRaises(TypeError, func, a=1, b=2)
+ def test_invalidate(self):
+ accessor = Accessor(self.values)
+ @memoized()
+ def func(key):
+ return accessor.get(key)
+ self.assertEqual(func('a'), self.values['a'])
+ self.assertEqual(accessor.count, 1)
+ func.invalidate()
+ self.assertEqual(func('a'), self.values['a'])
+ self.assertEqual(accessor.count, 2)
+class FileValidatorTests(VdsmTestCase):
+ def test_no_file(self):
+ with namedTemporaryDir() as tempdir:
+ path = os.path.join(tempdir, 'data')
+ validator = file_validator(path)
+ # Must be False so memoized calls the decorated function
+ self.assertEqual(validator(), False)
+ # Since file state did not change, must remain True
+ self.assertEqual(validator(), True)
+ def test_file_created(self):
+ with namedTemporaryDir() as tempdir:
+ path = os.path.join(tempdir, 'data')
+ validator = file_validator(path)
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
+ with open(path, 'w') as f:
+ f.write('data')
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
+ def test_file_removed(self):
+ with namedTemporaryDir() as tempdir:
+ path = os.path.join(tempdir, 'data')
+ validator = file_validator(path)
+ with open(path, 'w') as f:
+ f.write('data')
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
+ os.unlink(path)
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
+ def test_size_changed(self):
+ with namedTemporaryDir() as tempdir:
+ path = os.path.join(tempdir, 'data')
+ validator = file_validator(path)
+ data = 'old data'
+ with open(path, 'w') as f:
+ f.write(data)
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
+ with open(path, 'w') as f:
+ f.write(data + ' new data')
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
+ def test_mtime_changed(self):
+ with namedTemporaryDir() as tempdir:
+ path = os.path.join(tempdir, 'data')
+ validator = file_validator(path)
+ data = 'old data'
+ with open(path, 'w') as f:
+ f.write(data)
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
+ # Fake timestamp change, as timestamp resolution may not be good
+ # enough when comparing changes during the test.
+ atime = mtime = os.path.getmtime(path) + 1
+ os.utime(path, (atime, mtime))
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
+ def test_ino_changed(self):
+ with namedTemporaryDir() as tempdir:
+ path = os.path.join(tempdir, 'data')
+ validator = file_validator(path)
+ data = 'old data'
+ with open(path, 'w') as f:
+ f.write(data)
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
+ tmp = path + '.tmp'
+ with open(tmp, 'w') as f:
+ f.write(data)
+ os.rename(tmp, path)
+ self.assertEqual(validator(), False)
+ self.assertEqual(validator(), True)
To view, visit
To unsubscribe, visit
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6dd8fb29d94286e3e3a3e29b8218501cbdc5c018
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)>
Nir Soffer has uploaded a new change for review.
Change subject: qemuimg: Memoize _supports_qcow2_compat
qemuimg: Memoize _supports_qcow2_compat
We used to run qemu-img twice when creating or converting qcow2 images.
The first run checks if qemu-img supports the qcow2 "compat" option, and
the second run uses the result to format the qemu-img command.
Now we run qemu-img once for "create" and "convert" to learn about its
capabilities, and use the cached value on the next runs. If qemu-img
executable is modified, we drop the cache, in case a new version was
installed with different capabilities.
Change-Id: Ic63f5e8c06993df8e4066bf7ac2dabfb4b4bdbfb
Signed-off-by: Nir Soffer <nsoffer(a)>
M lib/vdsm/
M tests/
2 files changed, 8 insertions(+), 0 deletions(-)
git pull ssh:// refs/changes/11/34711/1
diff --git a/lib/vdsm/ b/lib/vdsm/
index cf428b2..f295acc 100644
--- a/lib/vdsm/
+++ b/lib/vdsm/
@@ -23,6 +23,7 @@
import signal
from . import utils
+from . import cache
_qemuimg = utils.CommandPath("qemu-img",
"/usr/bin/qemu-img",) # Fedora, EL6
@@ -221,6 +222,7 @@
raise QImgError(rc, out, err)
def _supports_qcow2_compat(command):
qemu-img "create" and "convert" commands support a "compat" option in
diff --git a/tests/ b/tests/
index 813a497..a642374 100644
--- a/tests/
+++ b/tests/
@@ -118,6 +118,9 @@
class CreateTests(TestCaseBase):
+ def setUp(self):
+ qemuimg._supports_qcow2_compat.invalidate()
def test_no_format(self):
def create(cmd, **kw):
expected = [QEMU_IMG, 'create', 'image']
@@ -161,6 +164,9 @@
class ConvertTests(TestCaseBase):
+ def setUp(self):
+ qemuimg._supports_qcow2_compat.invalidate()
def test_no_format(self):
def convert(cmd, **kw):
expected = [QEMU_IMG, 'convert', '-t', 'none', 'src', 'dst']
To view, visit
To unsubscribe, visit
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic63f5e8c06993df8e4066bf7ac2dabfb4b4bdbfb
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)>
Federico Simoncelli has uploaded a new change for review.
Change subject: vdsm: add eventfd and EventFile synchronization
vdsm: add eventfd and EventFile synchronization
Change-Id: I0d237f13c42b1f4505c90d30c6d3c3ecbd1e9fa7
Signed-off-by: Federico Simoncelli <fsimonce(a)>
M lib/vdsm/
A lib/vdsm/
M tests/
A tests/
4 files changed, 251 insertions(+), 0 deletions(-)
git pull ssh:// refs/changes/87/33687/1
diff --git a/lib/vdsm/ b/lib/vdsm/
index 4bebf28..e712cad 100644
--- a/lib/vdsm/
+++ b/lib/vdsm/
@@ -25,6 +25,7 @@ \ \ \
+ \ \ \ \
diff --git a/lib/vdsm/ b/lib/vdsm/
new file mode 100644
index 0000000..b2a7084
--- /dev/null
+++ b/lib/vdsm/
@@ -0,0 +1,140 @@
+# Copyright 2014 Red Hat, Inc.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+# Refer to the README and COPYING files for full details of the license
+This module provides the support for eventfd(2).
+More information about eventfd and usage examples can be found in the
+eventfd(2) man page.
+The EventFile class provides a single synchronization object exposing
+the python Event interface and associated eventfds.
+The eventfd() context manager returns a file descriptor that can be
+used to provide the event notice to select, poll and epoll, e.g.
+ import os
+ import sys
+ import select
+ import threading
+ import time
+ from vdsm.eventfd import EventFile, DATASIZE
+ e = EventFile()
+ p = select.epoll()
+ threading.Timer(5, e.set).start()
+ with e.eventfd() as efd:
+ p.register(efd, select.EPOLLIN)
+ p.register(sys.stdin.fileno(), select.EPOLLIN)
+ print "Echoing lines until event is received"
+ event_received = False
+ while not event_received:
+ for fileno, event in p.poll():
+ if not event & select.EPOLLIN:
+ continue
+ if fileno == efd:
+ event_received = True
+ elif fileno == sys.stdin.fileno():
+ print, 1024),
+ print "Event received!"
+The Event set() semantic is preserved in the eventfd context manager:
+if the event is set then the eventfd already contains the notification.
+This is both to maintain the semantic and to avoid possible races as:
+ if not e.is_set():
+ with e.eventfd() as efd:
+ ...
+import os
+import ctypes
+import threading
+from contextlib import contextmanager
+_libc = ctypes.CDLL('', use_errno=True)
+EFD_CLOEXEC = 02000000 # os.O_CLOEXEC in python 3.3
+EFD_SEMAPHORE = 00000001
+DATASIZE = ctypes.sizeof(ctypes.c_ulonglong)
+def eventfd(initval, flags):
+ return _libc.eventfd(initval, flags)
+class EventFile(object):
+ def __init__(self, event=None):
+ self.__lock = threading.Lock()
+ self.__fds = set()
+ self.__event = event or threading.Event()
+ @staticmethod
+ def __fire_event(fd):
+ os.write(fd, ctypes.c_ulonglong(1))
+ def open_eventfd(self):
+ with self.__lock:
+ fd = eventfd(0, 0)
+ self.__fds.add(fd)
+ if self.__event.is_set():
+ self.__fire_event(fd)
+ return fd
+ @contextmanager
+ def eventfd(self):
+ fd = self.open_eventfd()
+ yield fd
+ with self.__lock:
+ self.__fds.remove(fd)
+ os.close(fd)
+ def isSet(self):
+ return self.__event.isSet()
+ is_set = isSet
+ def set(self):
+ with self.__lock:
+ self.__event.set()
+ for fd in self.__fds:
+ self.__fire_event(fd)
+ def clear(self):
+ self.__event.clear()
+ def wait(self, timeout=None, balancing=True):
+ self.__event.wait(timeout)
diff --git a/tests/ b/tests/
index 449d7b1..120712e 100644
--- a/tests/
+++ b/tests/
@@ -31,6 +31,7 @@ \ \ \
+ \ \ \ \
diff --git a/tests/ b/tests/
new file mode 100644
index 0000000..be15248
--- /dev/null
+++ b/tests/
@@ -0,0 +1,109 @@
+# Copyright 2014 Red Hat, Inc.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+# Refer to the README and COPYING files for full details of the license
+import os
+import select
+from vdsm.eventfd import EventFile, DATASIZE
+from import timed, raises, TimeExpired
+def test_set():
+ e = EventFile()
+ e.set()
+ assert e.is_set()
+ assert e.isSet()
+def text_clear():
+ e = EventFile()
+ e.set()
+ assert e.is_set()
+ e.clear()
+ assert not e.is_set()
+def test_wait_set():
+ e = EventFile()
+ e.set()
+ e.wait(WAIT_TIMEOUT)
+def test_wait_noset():
+ e = EventFile()
+ e.wait(WAIT_TIMEOUT)
+def test_eventfd_earlyset():
+ e = EventFile()
+ e.set()
+ with e.eventfd() as fd:
+ assert len(__select_and_read(fd)) == DATASIZE
+def test_eventfd_lateset():
+ e = EventFile()
+ with e.eventfd() as fd:
+ e.set()
+ assert len(__select_and_read(fd)) == DATASIZE
+def test_eventfd_noset():
+ e = EventFile()
+ with e.eventfd() as fd:
+ assert len(__select_and_read(fd)) != DATASIZE
+def test_eventfd_multiple():
+ e = EventFile()
+ e.set()
+ with e.eventfd() as fd1:
+ assert len(__select_and_read(fd1)) == DATASIZE
+ with e.eventfd() as fd2:
+ assert len(__select_and_read(fd2)) == DATASIZE
+ with e.eventfd() as fd3:
+ assert len(__select_and_read(fd3)) == DATASIZE
+def test_eventfd_clear():
+ e = EventFile()
+ e.set()
+ e.clear()
+ with e.eventfd() as fd:
+ assert len(__select_and_read(fd)) != DATASIZE
+def __select_and_read(fd):
+ rd, wr, ex =,), (), (), WAIT_TIMEOUT)
+ if fd in rd:
+ return, DATASIZE)
+ return ''
To view, visit
To unsubscribe, visit
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0d237f13c42b1f4505c90d30c6d3c3ecbd1e9fa7
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Federico Simoncelli <fsimonce(a)>