Change in vdsm[master]: jsonrpc: executor based thread factory
by Piotr Kliczewski
Piotr Kliczewski has uploaded a new change for review.
Change subject: jsonrpc: executor based thread factory
......................................................................
jsonrpc: executor based thread factory
Creating a new thread for every request is not efficient, so we introduce
usage of the executor for request processing.
Change-Id: I56b307633a8bf7e4aad8f87cc97a4129c9ed0970
Signed-off-by: pkliczewski <piotr.kliczewski(a)gmail.com>
---
M lib/vdsm/config.py.in
M vdsm/rpc/bindingjsonrpc.py
2 files changed, 49 insertions(+), 5 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/59/43759/1
diff --git a/lib/vdsm/config.py.in b/lib/vdsm/config.py.in
index 3221aab..bffa949 100644
--- a/lib/vdsm/config.py.in
+++ b/lib/vdsm/config.py.in
@@ -201,6 +201,12 @@
('connection_stats_timeout', '3600',
'Time in seconds defining how frequently we log transport stats'),
+
+ ('worker_threads', '8',
+ 'Number of worker threads to serve jsonrpc server.'),
+
+ ('tasks_per_worker', '100',
+ 'Max number of tasks which can be queued per worker.'),
]),
# Section: [mom]
diff --git a/vdsm/rpc/bindingjsonrpc.py b/vdsm/rpc/bindingjsonrpc.py
index 80e3388..ca15fde 100644
--- a/vdsm/rpc/bindingjsonrpc.py
+++ b/vdsm/rpc/bindingjsonrpc.py
@@ -19,18 +19,53 @@
from yajsonrpc import JsonRpcServer
from yajsonrpc.stompreactor import StompReactor
+from vdsm import executor
+from vdsm import schedule
+from vdsm.config import config
+from vdsm.utils import monotonic_time
-def _simpleThreadFactory(func):
- t = threading.Thread(target=func)
- t.setDaemon(False)
- t.start()
+
+# TODO test what should be the default values
+_THREADS = config.getint('vars', 'worker_threads')
+_TASK_PER_WORKER = config.getint('vars', 'tasks_per_worker')
+_TASKS = _THREADS * _TASK_PER_WORKER
+
+
+_scheduler = schedule.Scheduler(name="jsonrpc.Scheduler",
+ clock=monotonic_time)
+
+_executor = executor.Executor(name="jsonrpc.Executor",
+ workers_count=_THREADS,
+ max_tasks=_TASKS,
+ scheduler=_scheduler)
+
+
+class _RequestWorker(object):
+
+ _log = logging.getLogger("jsonrpc._RequestWorker")
+
+ def __init__(self, func):
+ self._func = func
+
+ def __call__(self):
+ try:
+ self._func()
+ except Exception:
+ self._log.exception("%s processing request failed", self._func)
+
+
+def _executorFactory(func):
+ _executor.dispatch(_RequestWorker(func))
class BindingJsonRpc(object):
log = logging.getLogger('BindingJsonRpc')
def __init__(self, bridge, subs, timeout):
- self._server = JsonRpcServer(bridge, timeout, _simpleThreadFactory)
+ _scheduler.start()
+ _executor.start()
+
+ self._server = JsonRpcServer(bridge, timeout, _executorFactory)
self._reactor = StompReactor(subs)
self.startReactor()
@@ -60,3 +95,6 @@
def stop(self):
self._server.stop()
self._reactor.stop()
+
+ _scheduler.stop()
+ _executor.stop()
--
To view, visit https://gerrit.ovirt.org/43759
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I56b307633a8bf7e4aad8f87cc97a4129c9ed0970
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Piotr Kliczewski <piotr.kliczewski(a)gmail.com>
8 years, 6 months
Change in vdsm[master]: supervdsm: fix trigger docstring
by Martin Polednik
Martin Polednik has uploaded a new change for review.
Change subject: supervdsm: fix trigger docstring
......................................................................
supervdsm: fix trigger docstring
Change-Id: Ief9af5dd73fc38b366fb4903b313bc420e2a4172
Signed-off-by: Martin Polednik <mpolednik(a)redhat.com>
---
M lib/vdsm/udevadm.py
1 file changed, 6 insertions(+), 6 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/39/45939/1
diff --git a/lib/vdsm/udevadm.py b/lib/vdsm/udevadm.py
index 35970fe..88c19ee 100644
--- a/lib/vdsm/udevadm.py
+++ b/lib/vdsm/udevadm.py
@@ -67,29 +67,29 @@
Arguments:
- attr_matches List of 2-tuples that contain attribute name and
- it's value. These are expanded like this:
+ attr_matches An iterable of attribute name, value pairs.
+ These are expanded like this:
[('a', 'b'), ('c', 'd')] ~>
--attr-match=a=b --attr-match=c=d
- and causes only events from devices that match
+ and cause only events from devices that match
given attributes to be triggered.
- property_matches Similar to attr_matches. Expects list of 2-tuples
+ property_matches Similar to attr_matches. Expects an iterable of pairs
that expand in similar fashion, that is
[('a', 'b'), ('c', 'd')] ~>
--property-match=a=b --property-match=c=d
- and causes only events from devices that match
+ and cause only events from devices that match
given property to be triggered.
subsystem_matches Expects an iterable of subsystems.
('a', 'b') ~> --subsystem-match=a --subsystem-match=b
- Causes only events related to specified subsystem to
+ Cause only events related to specified subsystem to
be triggered.
'''
_run_command(['control', '--reload'])
--
To view, visit https://gerrit.ovirt.org/45939
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ief9af5dd73fc38b366fb4903b313bc420e2a4172
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Martin Polednik <mpolednik(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: periodic: extract required_on staticmethod
by fromani@redhat.com
Francesco Romani has uploaded a new change for review.
Change subject: periodic: extract required_on staticmethod
......................................................................
periodic: extract required_on staticmethod
It is possible that certain periodic operations should not be done
on some VMs, perhaps temporarily.
To check this, Operations exported a "required" attribute.
Problem is, this attribute was instance attribute, forcing the
periodic code to do a clumsy dance like:
- assume every operation is required on each vm
- create per-vm Operation instances
- check if a given operation should indeed run on a given vm
- if the operation should not run, discard it.
This patch eliminates this clumsiness translating "required"
attribute into a "required_on" staticmethod.
Now the periodic code can filter out unneeded Operation instances
in a much simpler way.
Change-Id: I2ef0750948c905a87750b99adf702c4c08fc925d
Bug-Url: https://bugzilla.redhat.com/1250839
Signed-off-by: Francesco Romani <fromani(a)redhat.com>
---
M vdsm/virt/periodic.py
1 file changed, 29 insertions(+), 35 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/33/46833/1
diff --git a/vdsm/virt/periodic.py b/vdsm/virt/periodic.py
index 68f8888..050c789 100644
--- a/vdsm/virt/periodic.py
+++ b/vdsm/virt/periodic.py
@@ -64,7 +64,8 @@
def per_vm_operation(func, period):
disp = VmDispatcher(
- cif.getVMs, _executor, func, _timeout_from(period))
+ cif.getVMs, _executor, func, _timeout_from(period),
+ func.required_on)
return Operation(disp, period, scheduler)
_operations = [
@@ -204,7 +205,8 @@
_log = logging.getLogger("virt.periodic.VmDispatcher")
- def __init__(self, get_vms, executor, create, timeout):
+ def __init__(self, get_vms, executor, create, timeout,
+ required_on=lambda vm_obj: True):
"""
get_vms: callable which will return a dict which maps
vm_ids to vm_instances
@@ -216,12 +218,15 @@
self._executor = executor
self._create = create
self._timeout = timeout
+ self._required_on = required_on
self._tracker = VmTracker()
def __call__(self):
# python 3 future compatibility: we want to materialize the sequence
vm_objs = set(self._get_vms().values())
- candidates = set(vm.id for vm in vm_objs)
+ candidates = set(vm_obj.id
+ for vm_obj in vm_objs
+ if self._required_on(vm_obj))
available = set(self._tracker.reserve_available(candidates))
skipped = set(candidates - available)
@@ -233,27 +238,11 @@
# still OK if occasional misdetection occurs, but we
# definitely want to avoid known-bad situation and to
# needlessly overload libvirt.
- if vm_obj.id not in available:
- skipped.append(vm_obj.id)
- continue
-
+ op = self._create(self._tracker, vm_obj)
try:
- op = self._create(self._tracker, vm_obj)
-
- if not op.required:
- self._tracker.release(vm_obj.id)
- continue
-
- except Exception:
- self._tracker.release(vm_obj.id)
- # we want to make sure to have VM UUID logged
- self._log.exception("while dispatching %s to VM '%s'",
- self._create, vm_obj.id)
- else:
- try:
- self._executor.dispatch(op, self._timeout)
- except executor.TooManyTasks:
- skipped.add(vm_obj.id)
+ self._executor.dispatch(op, self._timeout)
+ except executor.TooManyTasks:
+ skipped.add(vm_obj.id)
if skipped:
self._log.warning('could not run %s on %s',
@@ -265,9 +254,14 @@
class _RunnableOnVm(object):
+
def __init__(self, tracker, vm):
self._tracker = tracker
self._vm = vm
+
+ @staticmethod
+ def required_on(vm):
+ return True
def __call__(self):
try:
@@ -286,10 +280,10 @@
class UpdateVolumes(_RunnableOnVm):
- @property
- def required(self):
+ @staticmethod
+ def required_on(vm):
# Avoid queries from storage during recovery process
- return self._vm.isDisksStatsCollectionEnabled()
+ return vm.isDisksStatsCollectionEnabled()
def _execute(self):
for drive in self._vm.getDiskDevices():
@@ -301,9 +295,9 @@
class NumaInfoMonitor(_RunnableOnVm):
- @property
- def required(self):
- return self._vm.hasGuestNumaNode
+ @staticmethod
+ def required_on(vm):
+ return vm.hasGuestNumaNode
def _execute(self):
self._vm.updateNumaInfo()
@@ -311,14 +305,14 @@
class BlockjobMonitor(_RunnableOnVm):
- @property
- def required(self):
+ @staticmethod
+ def required_on(vm):
# For performance reasons, we must avoid as much
# as possible to create per-vm executor tasks, even
# though they will do nothing but a few check and exit
# early, as they do if a VM doesn't have Block Jobs to
# monitor (most often true).
- return self._vm.hasVmJobs
+ return vm.hasVmJobs
def _execute(self):
self._vm.updateVmJobs()
@@ -326,10 +320,10 @@
class DriveWatermarkMonitor(_RunnableOnVm):
- @property
- def required(self):
+ @staticmethod
+ def required_on(vm):
# Avoid queries from storage during recovery process
- return self._vm.isDisksStatsCollectionEnabled()
+ return vm.isDisksStatsCollectionEnabled()
def _execute(self):
self._vm.extendDrivesIfNeeded()
--
To view, visit https://gerrit.ovirt.org/46833
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2ef0750948c905a87750b99adf702c4c08fc925d
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <fromani(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: virt: cosmetic fixes
by fromani@redhat.com
Francesco Romani has uploaded a new change for review.
Change subject: virt: cosmetic fixes
......................................................................
virt: cosmetic fixes
Harmless stupid fixes to make the code look prettier.
Change-Id: If57f99257ae1fc36541d7d563e71e26ac9db4c54
Signed-off-by: Francesco Romani <fromani(a)redhat.com>
---
M vdsm/virt/periodic.py
M vdsm/virt/sampling.py
2 files changed, 9 insertions(+), 7 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/97/44797/1
diff --git a/vdsm/virt/periodic.py b/vdsm/virt/periodic.py
index b9c30c3..9781afe 100644
--- a/vdsm/virt/periodic.py
+++ b/vdsm/virt/periodic.py
@@ -84,14 +84,14 @@
# Job monitoring needs QEMU monitor access.
per_vm_operation(
- BlockjobMonitor,
+ BlockJobMonitor,
config.getint('vars', 'vm_sample_jobs_interval')),
# libvirt sampling using bulk stats can block, but unresponsive
- # domains are handled inside VMBulkSampler for performance reasons;
+ # domains are handled inside BulkStatsMonitor for performance reasons;
# thus, does not need dispatching.
Operation(
- sampling.VMBulkSampler(
+ sampling.BulkStatsMonitor(
libvirtconnection.get(cif),
cif.getVMs,
sampling.stats_cache),
@@ -299,7 +299,7 @@
self._vm.updateNumaInfo()
-class BlockjobMonitor(_RunnableVmOperation):
+class BlockJobMonitor(_RunnableVmOperation):
@property
def required(self):
diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py
index f17a5b3..15d8b71 100644
--- a/vdsm/virt/sampling.py
+++ b/vdsm/virt/sampling.py
@@ -497,7 +497,7 @@
_TIMEOUT = 40.0
-class VMBulkSampler(object):
+class BulkStatsMonitor(object):
def __init__(self, conn, get_vms, stats_cache,
stats_flags=0, timeout=_TIMEOUT):
self._conn = conn
@@ -506,14 +506,16 @@
self._stats_flags = stats_flags
self._skip_doms = ExpiringCache(timeout)
self._sampling = threading.Semaphore() # used as glorified counter
- self._log = logging.getLogger("virt.sampling.VMBulkSampler")
+ self._log = logging.getLogger("virt.sampling.BulkStatsMonitor")
def __call__(self):
timestamp = self._stats_cache.clock()
# we are deep in the hot path. bool(ExpiringCache)
# *is* costly so we should avoid it if we can.
fast_path = (
- self._sampling.acquire(blocking=False) and not self._skip_doms)
+ self._sampling.acquire(blocking=False) and
+ not self._skip_doms
+ )
try:
if fast_path:
# This is expected to be the common case.
--
To view, visit https://gerrit.ovirt.org/44797
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If57f99257ae1fc36541d7d563e71e26ac9db4c54
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <fromani(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: tests: add tests for sampling.SampleVMs
by fromani@redhat.com
Francesco Romani has uploaded a new change for review.
Change subject: tests: add tests for sampling.SampleVMs
......................................................................
tests: add tests for sampling.SampleVMs
The SampleVMs class is responsible for handling bulk stats sampling,
and includes all the logic to deal with blocked domains.
However, this logic has the time among its variables, and it is
built on quite some assumptions, like running on an executor and so on.
All those factors make it difficult to test the bare logic without
some (minor) cheating and without some significant faking.
However, the nasty spots should be the ones more tested, not less
tested, so this patch adds the initial batch of unit tests for the
sampling logic.
Change-Id: Id66dbd420ca29d08ae4063dc83b858be34b8940f
Signed-off-by: Francesco Romani <fromani(a)redhat.com>
---
M tests/Makefile.am
A tests/bulkSamplingTests.py
2 files changed, 213 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/53/40053/1
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 7fa1c09..4f984dd 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -26,6 +26,7 @@
alignmentScanTests.py \
blocksdTests.py \
bridgeTests.py \
+ bulkSamplingTests.py \
cPopenTests.py \
capsTests.py \
clientifTests.py \
diff --git a/tests/bulkSamplingTests.py b/tests/bulkSamplingTests.py
new file mode 100644
index 0000000..5c536ff
--- /dev/null
+++ b/tests/bulkSamplingTests.py
@@ -0,0 +1,212 @@
+#
+# Copyright 2015 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import virt.sampling as sampling
+
+from testlib import VdsmTestCase as TestCaseBase
+
+
+# these tests are complex and fragile. In order to maximize
+# readability and robustness, intentionally includes tailor-made
+# minimal Fakes to support those tests.
+
+
+class FakeStatsCache(object):
+ def __init__(self):
+ self.data = []
+ self._clock = 0 # private only for API clash
+
+ def clock(self):
+ return self._clock
+
+ def set_clock(self, value):
+ self._clock = value
+
+ def put(self, bulk_stats, timestamp):
+ self.data.append((bulk_stats, timestamp))
+
+
+class FakeDomain(object):
+ def __init__(self, name):
+ self._name = name
+ self._dom = self # yep, this is an ugly hack
+
+ def UUIDString(self):
+ # yes this is cheating
+ return self._name
+
+
+class FakeVM(object):
+ def __init__(self, vmid):
+ self.id = vmid
+ self._dom = FakeDomain(vmid)
+ self.ready_for_commands = True
+
+ def isDomainReadyForCommands(self):
+ return self.ready_for_commands
+
+
+class CollectMode(object):
+ NONE = 0
+ SLOW = 1
+ FAST = 2
+
+
+class FakeConnection(object):
+ def __init__(self, vms):
+ self.vms = vms
+ # testable fields
+ self.executions = []
+
+ def getVMs(self):
+ return self.vms
+
+ def getAllDomainStats(self, flags=0):
+ self.executions.append(CollectMode.FAST)
+ return [(vm._dom, {'vmid': vm._dom.UUIDString()})
+ for vm in self.vms.values()]
+
+ def domainListGetStats(self, doms, flags=0):
+ self.executions.append(CollectMode.SLOW)
+ return [(dom, {'vmid': dom.UUIDString()})
+ for dom in doms
+ if dom.UUIDString() in self.vms]
+
+
+class SampleVMsTests(TestCaseBase):
+
+ def test_collect_fast_path_as_default(self):
+ cache = FakeStatsCache()
+ vms = {
+ '1': FakeVM('1'),
+ '2': FakeVM('2')
+ }
+ conn = FakeConnection(vms)
+
+ sampler = sampling.SampleVMs(conn, conn.getVMs, cache)
+ sampler()
+
+ self.assertEqual(conn.executions[0], CollectMode.FAST)
+ self.assertVmsInBulkStats(vms.keys(), cache.data[0][0])
+
+ def test_collect_slow_path_after_blocked(self):
+ cache = FakeStatsCache()
+ vms = {
+ '1': FakeVM('1'),
+ '2': FakeVM('2')
+ }
+ conn = FakeConnection(vms)
+
+ sampler = sampling.SampleVMs(conn, conn.getVMs, cache)
+ # this is cheating, but setting up proper blocking is costly
+ # and fragile.
+ sampler._sampling = True
+ sampler()
+
+ self.assertEqual(conn.executions[0], CollectMode.SLOW)
+ # SampleVMs doesn't care in the slow path if all registered
+ # VMs are actually responsive or not. Quite the opposite, it is
+ # good sign if all VMs are now responsive, it means we can
+ # restart using getAllDomainStats on the next cycle.
+ self.assertVmsInBulkStats(vms.keys(), cache.data[0][0])
+
+ def test_collect_unresponsive_vm(self):
+ cache = FakeStatsCache()
+ vms = {
+ '1': FakeVM('1'),
+ '2': FakeVM('2'),
+ '3': FakeVM('3')
+ }
+ vms['2'].ready_for_commands = False
+ conn = FakeConnection(vms)
+
+ sampler = sampling.SampleVMs(conn, conn.getVMs, cache)
+ # this is cheating, but setting up proper blocking is costly
+ # and fragile.
+ sampler._sampling = True
+ sampler()
+
+ self.assertEqual(conn.executions[0], CollectMode.SLOW)
+ self.assertVmsInBulkStats(('1', '3'), cache.data[0][0])
+ self.assertVmStuck(sampler, '2')
+
+ def test_slow_collect_while_unresponsive_vm(self):
+ cache = FakeStatsCache()
+ vms = {
+ '1': FakeVM('1'),
+ '2': FakeVM('2'),
+ '3': FakeVM('3')
+ }
+ vms['2'].ready_for_commands = False
+ conn = FakeConnection(vms)
+
+ sampler = sampling.SampleVMs(conn, conn.getVMs, cache)
+ sampler._sampling = True # cheat to bootstrap slow collection
+ sampler()
+ sampler()
+ sampler()
+
+ self.assertSamplingExecutions(
+ conn.executions,
+ (CollectMode.SLOW, CollectMode.SLOW, CollectMode.SLOW))
+ for datum in cache.data:
+ self.assertVmsInBulkStats(('1', '3'), datum[0])
+ self.assertVmStuck(sampler, '2')
+
+ def test_fast_collect_once_vm_responsive(self):
+ cache = FakeStatsCache()
+ vms = {
+ '1': FakeVM('1'),
+ '2': FakeVM('2'),
+ '3': FakeVM('3')
+ }
+ vms['2'].ready_for_commands = False
+ conn = FakeConnection(vms)
+
+ sampler = sampling.SampleVMs(conn, conn.getVMs, cache)
+ sampler._sampling = True # cheat to bootstrap slow collection
+ sampler()
+ sampler()
+ vms['2'].ready_for_commands = True
+ sampler._skip_doms.clear() # cheating, again
+ sampler()
+
+ self.assertSamplingExecutions(
+ conn.executions,
+ (CollectMode.SLOW, CollectMode.SLOW, CollectMode.FAST))
+ for datum in cache.data[:-1]:
+ self.assertVmsInBulkStats(('1', '3'), datum[0])
+ self.assertVmsInBulkStats(vms.keys(), cache.data[-1][0])
+
+ def assertSamplingExecutions(self, executions, expected):
+ for done, exp in zip(executions[-len(expected):], expected):
+ self.assertEqual(done, exp)
+
+ def assertVmsInBulkStats(self, vm_ids, bulk_stats):
+ """
+ checks only for the presence of stats. Stats data is fake,
+ so ignore it as meaningless.
+ """
+ self.assertEqual(len(vm_ids), len(bulk_stats))
+ for vm_id in vm_ids:
+ self.assertIn(vm_id, bulk_stats)
+
+ def assertVmStuck(self, sampler, vm_id):
+ self.assertTrue(sampler._skip_doms.get(vm_id))
--
To view, visit https://gerrit.ovirt.org/40053
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id66dbd420ca29d08ae4063dc83b858be34b8940f
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <fromani(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: janitorial: sslutils, kaxmlrpclib: drop python-2.6-specific ...
by Dan Kenigsberg
Dan Kenigsberg has uploaded a new change for review.
Change subject: janitorial: sslutils, kaxmlrpclib: drop python-2.6-specific code
......................................................................
janitorial: sslutils, kaxmlrpclib: drop python-2.6-specific code
Change-Id: Icdb4ba3a52a144d7d28d9d8adf36e8b0621f10dd
Signed-off-by: Dan Kenigsberg <danken(a)redhat.com>
---
M lib/vdsm/sslutils.py
M vdsm/kaxmlrpclib.py
2 files changed, 12 insertions(+), 64 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/64/40964/1
diff --git a/lib/vdsm/sslutils.py b/lib/vdsm/sslutils.py
index 4b863a7..7e9319a 100644
--- a/lib/vdsm/sslutils.py
+++ b/lib/vdsm/sslutils.py
@@ -112,52 +112,13 @@
self._timeout = timeout
def make_connection(self, host):
- """Return VerifyingHTTPS object that is aware of ca_certs, and will
- create VerifyingHTTPSConnection.
- In Python 2.7, return VerifyingHTTPSConnection object
- """
+ """Return a VerifyingHTTPSConnection object"""
chost, self._extra_headers, x509 = self.get_host_info(host)
- if hasattr(xmlrpclib.SafeTransport, "single_request"): # Python 2.7
- return VerifyingHTTPSConnection(
- chost, None, key_file=self.key_file, strict=None,
- cert_file=self.cert_file, timeout=self._timeout,
- ca_certs=self.ca_certs,
- cert_reqs=self.cert_reqs)
- else:
- return VerifyingHTTPS(
- chost, None, key_file=self.key_file,
- cert_file=self.cert_file, timeout=self._timeout,
- ca_certs=self.ca_certs,
- cert_reqs=self.cert_reqs)
-
-
-class VerifyingHTTPS(httplib.HTTPS):
- _connection_class = VerifyingHTTPSConnection
-
- def __init__(self, host='', port=None, key_file=None, cert_file=None,
- strict=None, timeout=SOCKET_DEFAULT_TIMEOUT,
- ca_certs=None, cert_reqs=ssl.CERT_REQUIRED):
- """A ca_cert-aware HTTPS object,
- that creates a VerifyingHTTPSConnection
- """
- # provide a default host, pass the X509 cert info
-
- # urf. compensate for bad input.
- if port == 0:
- port = None
- self._setup(self._connection_class(host=host,
- port=port,
- key_file=key_file,
- cert_file=cert_file,
- strict=strict,
- timeout=timeout,
- ca_certs=ca_certs,
- cert_reqs=cert_reqs))
-
- # we never actually use these for anything, but we keep them
- # here for compatibility with post-1.5.2 CVS.
- self.key_file = key_file
- self.cert_file = cert_file
+ return VerifyingHTTPSConnection(
+ chost, None, key_file=self.key_file, strict=None,
+ cert_file=self.cert_file, timeout=self._timeout,
+ ca_certs=self.ca_certs,
+ cert_reqs=self.cert_reqs)
class SSLHandshakeDispatcher(object):
diff --git a/vdsm/kaxmlrpclib.py b/vdsm/kaxmlrpclib.py
index 6374726..e5ff677 100644
--- a/vdsm/kaxmlrpclib.py
+++ b/vdsm/kaxmlrpclib.py
@@ -57,10 +57,7 @@
class TcpkeepTransport(xmlrpclib.Transport):
def make_connection(self, host):
- if hasattr(xmlrpclib.Transport, "single_request"): # Python 2.7
- return TcpkeepHTTPConnection(host)
- else:
- return TcpkeepHTTP(host)
+ return TcpkeepHTTPConnection(host)
class TcpkeepHTTPConnection(httplib.HTTPConnection):
@@ -126,17 +123,11 @@
def make_connection(self, host):
chost, self._extra_headers, x509 = self.get_host_info(host)
- if hasattr(xmlrpclib.SafeTransport, "single_request"): # Python 2.7
- return TcpkeepHTTPSConnection(
- chost, None, key_file=self.key_file, strict=None,
- timeout=CONNECTTIMEOUT,
- cert_file=self.cert_file, ca_certs=self.ca_certs,
- cert_reqs=self.cert_reqs)
- else:
- return TcpkeepHTTPS(
- chost, None, key_file=self.key_file,
- cert_file=self.cert_file, ca_certs=self.ca_certs,
- cert_reqs=self.cert_reqs)
+ return TcpkeepHTTPSConnection(
+ chost, None, key_file=self.key_file, strict=None,
+ timeout=CONNECTTIMEOUT,
+ cert_file=self.cert_file, ca_certs=self.ca_certs,
+ cert_reqs=self.cert_reqs)
class TcpkeepHTTPSConnection(sslutils.VerifyingHTTPSConnection):
@@ -158,7 +149,3 @@
self.sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, KEEPIDLE)
self.sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, KEEPINTVL)
self.sock.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, KEEPCNT)
-
-
-class TcpkeepHTTPS(sslutils.VerifyingHTTPS):
- _connection_class = TcpkeepHTTPSConnection
--
To view, visit https://gerrit.ovirt.org/40964
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Icdb4ba3a52a144d7d28d9d8adf36e8b0621f10dd
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Dan Kenigsberg <danken(a)redhat.com>
8 years, 7 months
Change in vdsm[master]: tests: functional - convert to run over jsonrpc
by ykaplan@redhat.com
Yeela Kaplan has uploaded a new change for review.
Change subject: tests: functional - convert to run over jsonrpc
......................................................................
tests: functional - convert to run over jsonrpc
will run functional tests by default over jsonrpc
Change-Id: Iaba1e2811f010e4509a658acef8040ad8f39cece
Signed-off-by: Yeela Kaplan <ykaplan(a)redhat.com>
---
M lib/vdsm/jsonrpcvdscli.py
M lib/vdsm/response.py
M tests/functional/utils.py
3 files changed, 58 insertions(+), 16 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/89/45789/1
diff --git a/lib/vdsm/jsonrpcvdscli.py b/lib/vdsm/jsonrpcvdscli.py
index 21e135c..606bbef 100644
--- a/lib/vdsm/jsonrpcvdscli.py
+++ b/lib/vdsm/jsonrpcvdscli.py
@@ -33,17 +33,34 @@
_COMMAND_CONVERTER = {
- 'ping': 'Host.ping',
+ 'addNetwork': 'Host.addNetwork',
+ 'create': 'VM.create',
+ 'delNetwork': 'Host.delNetwork',
'destroy': 'VM.destroy',
+ 'editNetwork': 'Host.editNetwork',
+ 'fullList': 'Host.getVMFullList',
+ 'getAllVmStats': 'Host.getAllVmStats',
+ 'getVdsCapabilities': 'Host.getCapabilities',
+ 'getVdsStats': 'Host.getStats',
'getVmStats': 'VM.getStats',
+ 'list': 'Host.getVMList',
'migrationCreate': 'VM.migrationCreate',
+ 'ping': 'Host.ping',
+ 'setBalloonTarget': 'VM.setBalloonTarget',
+ 'setCpuTunePeriod': 'VM.setCpuTunePeriod',
+ 'setCpuTuneQuota': 'VM.setCpuTuneQuota',
+ 'setMOMPolicy': 'Host.setMOMPolicy',
+ 'setSafeNetworkConfig': 'Host.setSafeNetworkConfig',
+ 'setupNetworks': 'Host.setupNetworks',
+ 'updateVmPolicy': 'VM.updateVmPolicy',
}
class _Server(object):
- def __init__(self, client):
+ def __init__(self, client, compat):
self._client = client
+ self._compat = compat
def _callMethod(self, methodName, *args):
try:
@@ -64,6 +81,9 @@
return response.error_raw(resp.error["code"],
resp.error["message"])
+ if not self._compat:
+ return response.success_raw(resp.result)
+
if resp.result and resp.result is not True:
# None is translated to True inside our JSONRPC implementation
return response.success(**resp.result)
@@ -72,6 +92,11 @@
def migrationCreate(self, params):
return self._callMethod('migrationCreate',
+ params['vmId'],
+ params)
+
+ def create(self, params):
+ return self._callMethod('create',
params['vmId'],
params)
@@ -107,7 +132,7 @@
def connect(requestQueue, stompClient=None,
host=None, port=None,
useSSL=None,
- responseQueue=None):
+ responseQueue=None, compat=True):
if not stompClient:
client = _create(requestQueue,
host, port, useSSL,
@@ -119,4 +144,4 @@
str(uuid4())
)
- return _Server(client)
+ return _Server(client, compat)
diff --git a/lib/vdsm/response.py b/lib/vdsm/response.py
index 62ed49e..160898f 100644
--- a/lib/vdsm/response.py
+++ b/lib/vdsm/response.py
@@ -41,6 +41,21 @@
return kwargs
+def success_raw(result=None, message=None):
+ ret = {
+ 'status':
+ {
+ "code": doneCode["code"],
+ "message": message or doneCode["message"],
+ }
+ }
+
+ if result:
+ ret['result'] = result
+
+ return ret
+
+
def error(name, message=None):
status = errCode[name]["status"]
return {
diff --git a/tests/functional/utils.py b/tests/functional/utils.py
index 6bd8dd9..b8bdce6 100644
--- a/tests/functional/utils.py
+++ b/tests/functional/utils.py
@@ -26,9 +26,9 @@
from vdsm.config import config
from vdsm.utils import retry
from vdsm import ipwrapper
+from vdsm import jsonrpcvdscli
from vdsm import netinfo
from vdsm import supervdsm
-from vdsm import vdscli
from vdsm.netconfpersistence import RunningConfig
@@ -71,7 +71,9 @@
retry(self.start, (socket.error, KeyError), tries=30)
def start(self):
- self.vdscli = vdscli.connect()
+ requestQueues = config.get('addresses', 'request_queues')
+ requestQueue = requestQueues.split(",")[0]
+ self.vdscli = jsonrpcvdscli.connect(requestQueue, compat=False)
self.netinfo = self._get_netinfo()
if config.get('vars', 'net_persistence') == 'unified':
self.config = RunningConfig()
@@ -204,40 +206,40 @@
def getVdsStats(self):
result = self.vdscli.getVdsStats()
- return _parse_result(result, 'info')
+ return _parse_result(result, True)
def getAllVmStats(self):
result = self.vdscli.getAllVmStats()
- return _parse_result(result, 'statsList')
+ return _parse_result(result, True)
def getVmStats(self, vmId):
result = self.vdscli.getVmStats(vmId)
- if 'statsList' in result:
- code, msg, stats = _parse_result(result, 'statsList')
+ if 'result' in result:
+ code, msg, stats = _parse_result(result, True)
return code, msg, stats[0]
else:
return _parse_result(result)
def getVmList(self, vmId):
- result = self.vdscli.list('true', [vmId])
- code, msg, vm_list = _parse_result(result, 'vmList')
+ result = self.vdscli.fullList([vmId])
+ code, msg, vm_list = _parse_result(result, True)
return code, msg, vm_list[0]
def getVdsCapabilities(self):
result = self.vdscli.getVdsCapabilities()
- return _parse_result(result, 'info')
+ return _parse_result(result, True)
def updateVmPolicy(self, vmId, vcpuLimit):
result = self.vdscli.updateVmPolicy([vmId, vcpuLimit])
return _parse_result(result)
-def _parse_result(result, return_value=None):
+def _parse_result(result, return_value=False):
status = result['status']
code = status['code']
msg = status['message']
- if code == SUCCESS and return_value is not None:
- return code, msg, result[return_value]
+ if code == SUCCESS and return_value:
+ return code, msg, result['result']
else:
return code, msg
--
To view, visit https://gerrit.ovirt.org/45789
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iaba1e2811f010e4509a658acef8040ad8f39cece
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Yeela Kaplan <ykaplan(a)redhat.com>
8 years, 7 months
Change in vdsm[master]: v2v: janitorial: use the response module
by fromani@redhat.com
Francesco Romani has uploaded a new change for review.
Change subject: v2v: janitorial: use the response module
......................................................................
v2v: janitorial: use the response module
This patch makes v2v.py use the response module.
Change-Id: I678e764a2895e7c34df6852759b1d43baeb687a1
Signed-off-by: Francesco Romani <fromani(a)redhat.com>
---
M vdsm/v2v.py
1 file changed, 4 insertions(+), 5 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/47/38447/1
diff --git a/vdsm/v2v.py b/vdsm/v2v.py
index 4fbecf0..18f59d9 100644
--- a/vdsm/v2v.py
+++ b/vdsm/v2v.py
@@ -22,8 +22,8 @@
import libvirt
-from vdsm.define import errCode, doneCode
from vdsm import libvirtconnection
+from vdsm import response
import caps
@@ -39,7 +39,7 @@
def get_external_vms(uri, username, password):
if not supported():
- return errCode["noimpl"]
+ return response.error("noimpl")
try:
conn = libvirtconnection.open_connection(uri=uri,
@@ -47,8 +47,7 @@
passwd=password)
except libvirt.libvirtError as e:
logging.error('error connection to hypervisor: %r', e.message)
- return {'status': {'code': errCode['V2VConnection']['status']['code'],
- 'message': e.message}}
+ return response.error('V2VConnection', e.message)
with closing(conn):
vms = []
@@ -67,7 +66,7 @@
for disk in params['disks']:
_add_disk_info(conn, disk)
vms.append(params)
- return {'status': doneCode, 'vmList': vms}
+ return response.success(vmList=vms)
def _mem_to_mib(size, unit):
--
To view, visit https://gerrit.ovirt.org/38447
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I678e764a2895e7c34df6852759b1d43baeb687a1
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <fromani(a)redhat.com>
8 years, 7 months
Change in vdsm[master]: migration: make stop() update internal status
by fromani@redhat.com
Francesco Romani has uploaded a new change for review.
Change subject: migration: make stop() update internal status
......................................................................
migration: make stop() update internal status
The SourceThread.stop() operation should update
the internal status accordingly in case of success.
Previously, it was the caller that updated the status (!)
adding unneeded coupling.
Change-Id: I0ab50fc789dde969b2fb9ab969241ed4ad12545c
Signed-off-by: Francesco Romani <fromani(a)redhat.com>
---
M vdsm/virt/migration.py
1 file changed, 4 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/20/40520/1
diff --git a/vdsm/virt/migration.py b/vdsm/virt/migration.py
index 1eaed05..61d5a7b 100644
--- a/vdsm/virt/migration.py
+++ b/vdsm/virt/migration.py
@@ -371,6 +371,10 @@
except libvirt.libvirtError:
if not self._preparingMigrationEvt:
raise
+ else:
+ self.status['status']['message'] = \
+ 'Migration process cancelled'
+ return self.status
def exponential_downtime(downtime, steps):
--
To view, visit https://gerrit.ovirt.org/40520
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0ab50fc789dde969b2fb9ab969241ed4ad12545c
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <fromani(a)redhat.com>
8 years, 7 months
Change in vdsm[master]: WIP
by fromani@redhat.com
Francesco Romani has uploaded a new change for review.
Change subject: WIP
......................................................................
WIP
Change-Id: I39c2e6e4bca286a513992b7231f1356e8dd871a1
Signed-off-by: Francesco Romani <fromani(a)redhat.com>
---
M vdsm/virt/sampling.py
1 file changed, 47 insertions(+), 37 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/31/40431/1
diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py
index 45afdfd..eec5ae5 100644
--- a/vdsm/virt/sampling.py
+++ b/vdsm/virt/sampling.py
@@ -551,51 +551,30 @@
return doms
-class HostStatsThread(threading.Thread):
- """
- A thread that periodically samples host statistics.
- """
- _CONNLOG = logging.getLogger('connectivity')
+class HostSampler(object):
+ _CONNLOG = logging.getLogger('connectivity')
def __init__(self, log):
- self.startTime = time.time()
-
- threading.Thread.__init__(self)
- self.daemon = True
self._log = log
- self._stopEvent = threading.Event()
+ self._start_time = time.time()
self._samples = SampleWindow(size=5)
self._pid = os.getpid()
self._ncpus = max(os.sysconf('SC_NPROCESSORS_ONLN'), 1)
- self._sampleInterval = \
- config.getint('vars', 'host_sample_stats_interval')
+ self._sample_interval = config.getint(
+ 'vars', 'host_sample_stats_interval')
- def stop(self):
- self._stopEvent.set()
-
- def run(self):
- try:
- # wait a bit before starting to sample
- time.sleep(self._sampleInterval)
- while not self._stopEvent.isSet():
- try:
- sample = HostSample(self._pid)
- self._samples.append(sample)
- prev, last = self._samples.last_pair()
- if prev is None:
- self._CONNLOG.debug('%s', sample.to_connlog())
- else:
- diff = sample.connlog_diff(prev)
- if diff:
- self._CONNLOG.debug('%s', diff)
- except TimeoutError:
- self._log.exception("Timeout while sampling stats")
- self._stopEvent.wait(self._sampleInterval)
- except:
- if not self._stopEvent.isSet():
- self._log.exception("Error while sampling stats")
+ def __call__(self):
+ sample = HostSample(self._pid)
+ self._samples.append(sample)
+ prev, last = self._samples.last_pair()
+ if prev is None:
+ self._CONNLOG.debug('%s', sample.to_connlog())
+ else:
+ diff = sample.connlog_diff(prev)
+ if diff:
+ self._CONNLOG.debug('%s', diff)
@utils.memoized
def _boot_time(self):
@@ -625,7 +604,7 @@
hs0, hs1, _ = self._samples.stats()
interval = hs1.timestamp - hs0.timestamp
- stats.update(self._get_interfaces_stats(hs0, hs1, interval))
+ stats.update(_get_interfaces_stats(hs0, hs1, interval))
jiffies = (hs1.pidcpu.user - hs0.pidcpu.user) % (2 ** 32)
stats['cpuUserVdsmd'] = jiffies / interval
@@ -655,6 +634,37 @@
return stats
+
+class HostStatsThread(threading.Thread):
+ """
+ A thread that periodically samples host statistics.
+ """
+ _CONNLOG = logging.getLogger('connectivity')
+
+ def __init__(self, log):
+ threading.Thread.__init__(self)
+ self._sampler = HostSampler(log)
+ self.daemon = True
+ self._stopEvent = threading.Event()
+
+ def stop(self):
+ self._stopEvent.set()
+
+ def run(self):
+ try:
+ # wait a bit before starting to sample
+ time.sleep(self._sampleInterval)
+ while not self._stopEvent.isSet():
+ try:
+ self._sampler()
+ except TimeoutError:
+ self._log.exception("Timeout while sampling stats")
+ self._stopEvent.wait(self._sampleInterval)
+ except:
+ if not self._stopEvent.isSet():
+ self._log.exception("Error while sampling stats")
+
+
def _get_cpu_core_stats(hs0, hs1):
"""
:returns: a dict that with the following formats:
--
To view, visit https://gerrit.ovirt.org/40431
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I39c2e6e4bca286a513992b7231f1356e8dd871a1
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <fromani(a)redhat.com>
8 years, 7 months