Change in vdsm[master]: hook: diskunmap: To include UNMAP support for disk and lun d...
by apahim@redhat.com
Amador Pahim has uploaded a new change for review.
Change subject: hook: diskunmap: To include UNMAP support for disk and lun devices
......................................................................
hook: diskunmap: To include UNMAP support for disk and lun devices
This hook goes through VM definitions xml file and manipulate its
disk device if it "disk" or "lun" and if the bus is "scsi" or "ide",
adding "discard=unmap" option to "-drive" qemu command line patameter.
UNMAP support was added to qemu 1.5 and it's intendded to return the
unused/freed blocks back to the storage, optimizing the utilizationi,
specially useful for thin provisioned LUNs.
Result example:
<disk device="disk" snapshot="no" type="block">
...
<target bus="ide" dev="hda"/>
...
<driver cache="none" discard="unmap" ... />
</disk>
<disk device="lun" sgio="filtered" snapshot="no" type="block">
...
<target bus="scsi" dev="sda"/>
...
<driver cache="none" discard="unmap" ... />
</disk>
Change-Id: I36385f1af24043755b3d4b6594bbe598b0d9518d
Signed-off-by: Amador Pahim <apahim(a)redhat.com>
---
M configure.ac
M vdsm.spec.in
M vdsm_hooks/Makefile.am
A vdsm_hooks/diskunmap/Makefile.am
A vdsm_hooks/diskunmap/README
A vdsm_hooks/diskunmap/before_vm_start.py
6 files changed, 178 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/70/29770/1
diff --git a/configure.ac b/configure.ac
index d741c1b..ce9d800 100644
--- a/configure.ac
+++ b/configure.ac
@@ -285,6 +285,7 @@
vdsm_hooks/Makefile
vdsm_hooks/checkimages/Makefile
vdsm_hooks/directlun/Makefile
+ vdsm_hooks/diskunmap/Makefile
vdsm_hooks/ethtool_options/Makefile
vdsm_hooks/extnet/Makefile
vdsm_hooks/fakevmstats/Makefile
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 2b67962..2c48e3e 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -366,6 +366,14 @@
VDSM hook used to perform consistency check on a qcow2 format disk image
using the QEMU disk image utility.
+%package hook-diskunmap
+Summary: Activate UNMAP for disk/lun devices
+BuildArch: noarch
+Requires: qemu-kvm >= 1.5
+
+%description hook-diskunmap
+VDSM hooks which allow to activate disk UNMAP.
+
%package hook-ethtool-options
Summary: Allow setting custom ethtool options for vdsm controlled nics
BuildArch: noarch
@@ -1282,6 +1290,10 @@
%{_libexecdir}/%{vdsm_name}/hooks/after_vm_destroy/50_directlun
%{_libexecdir}/%{vdsm_name}/hooks/before_vm_migrate_destination/50_directlun
+%files hook-diskunmap
+%defattr(-, root, root, -)
+%{_libexecdir}/%{vdsm_name}/hooks/before_vm_start/50_diskunmap
+
%files hook-fakevmstats
%defattr(-, root, root, -)
%{_libexecdir}/%{vdsm_name}/hooks/after_get_all_vm_stats/10_fakevmstats
diff --git a/vdsm_hooks/Makefile.am b/vdsm_hooks/Makefile.am
index 5e4d731..20578db 100644
--- a/vdsm_hooks/Makefile.am
+++ b/vdsm_hooks/Makefile.am
@@ -27,6 +27,7 @@
SUBDIRS += \
checkimages \
directlun \
+ diskunmap \
extnet \
fileinject \
fakevmstats \
diff --git a/vdsm_hooks/diskunmap/Makefile.am b/vdsm_hooks/diskunmap/Makefile.am
new file mode 100644
index 0000000..f2b9a85
--- /dev/null
+++ b/vdsm_hooks/diskunmap/Makefile.am
@@ -0,0 +1,31 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+#
+EXTRA_DIST = \
+ before_vm_start.py
+
+install-data-local:
+ $(MKDIR_P) $(DESTDIR)$(vdsmhooksdir)/before_vm_start
+ $(INSTALL_SCRIPT) $(srcdir)/before_vm_start.py \
+ $(DESTDIR)$(vdsmhooksdir)/before_vm_start/50_diskunmap
+
+uninstall-local:
+ $(RM) $(DESTDIR)$(vdsmhooksdir)/before_vm_start/50_diskunmap
+
diff --git a/vdsm_hooks/diskunmap/README b/vdsm_hooks/diskunmap/README
new file mode 100644
index 0000000..35751d6
--- /dev/null
+++ b/vdsm_hooks/diskunmap/README
@@ -0,0 +1,40 @@
+diskunmap vdsm hook:
+==================
+This hook goes through all of the VM's disks and manipulate its XML
+file acccording to the input. This can be used to enable UNMAP feature
+to discard unused blocks for better use of thin provisioned storage
+devices.
+
+Syntax:
+ diskunmap=(off|on)
+
+Where:
+ 'on' is unmap enabled and off (default) is unmap disabled.
+
+Example:
+ diskunmap=on
+
+Installation:
+ - Use the engine-config to append the proper custom property:
+ $ sudo engine-config -s UserDefinedVMProperties='diskunmap=^(off|on)$'
+ - Verify that the macbind custom property was properly added:
+ $ sudo engine-config -g UserDefinedVMProperties
+
+Usage:
+ In the VM configuration window, open the custom properites tab, select
+ diskmap and select 'on', activating disk UNMAP for all disks and LUNs.
+ Only devices using IDE or VirtIO-SCSI interface will be affected. UNMAP is
+ not available for VirtIO interface.
+
+Expected Result:
+ For every DISK or LUN device, this configuration will include
+ "discard=unmap" to disk driver xml:
+
+ <disk device="disk" snapshot="no" type="block">
+ ...
+ <driver cache="none" discard="unmap" ... />
+ </disk>
+
+ This option will be tranlated to qemu as bellow:
+
+ ... -drive file=<file>,discard=unmap,...
diff --git a/vdsm_hooks/diskunmap/before_vm_start.py b/vdsm_hooks/diskunmap/before_vm_start.py
new file mode 100644
index 0000000..2fc0b81
--- /dev/null
+++ b/vdsm_hooks/diskunmap/before_vm_start.py
@@ -0,0 +1,93 @@
+#!/usr/bin/env python2
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+
+'''
+Hook to enable disk UNMAP for disk devices.
+
+Syntax:
+ diskunmap=(off|on)
+
+Example:
+ diskunmap=on
+'''
+
+import os
+import sys
+import traceback
+from xml.dom import minidom
+
+import hooking
+
+
+def addDiscardUnmap(domxml):
+ for disk in domxml.getElementsByTagName('disk'):
+ device = disk.getAttribute('device')
+ target = disk.getElementsByTagName('target')[0]
+ bus = target.getAttribute('bus')
+ if ((device == 'disk' or device == 'lun')
+ and (bus == 'scsi' or bus == 'ide')):
+ driver = disk.getElementsByTagName('driver')[0]
+ driver.setAttribute('discard', 'unmap')
+
+
+def main():
+ if 'diskunmap' in os.environ:
+ unmapConfig = os.environ['diskunmap']
+ domxml = hooking.read_domxml()
+ if unmapConfig == 'on':
+ addDiscardUnmap(domxml)
+ hooking.write_domxml(domxml)
+
+
+def test():
+ text = '''<disk device="disk" snapshot="no" type="block">
+<address bus="0" controller="0" target="0" type="drive" unit="0"/>
+<source dev="/rhev/data-center/mnt/blockSD/
+b4cf7d74-6a07-4138-9d4f-80b14c3acefd/images/
+1580607a-b240-4199-99ac-3d2162934ba6/e5ba276f-dcba-4582-b13f-c165afa2f575"/>
+<target bus="ide" dev="hda"/>
+<serial>1580607a-b240-4199-99ac-3d2162934ba6</serial>
+<driver cache="none" error_policy="stop" io="native"
+name="qemu" type="raw"/>
+</disk>'''
+
+ xmldom = minidom.parseString(text)
+
+ disk = xmldom.getElementsByTagName('disk')[0]
+ print "\nDisk device definition before execution: \n%s"\
+ % disk.toxml(encoding='UTF-8')
+
+ addDiscardUnmap(xmldom)
+
+ print "\nDisk device after setting discard attribute: \n%s"\
+ % disk.toxml(encoding='UTF-8')
+
+
+if __name__ == '__main__':
+ try:
+ if '--test' in sys.argv:
+ test()
+ else:
+ main()
+ except:
+ hooking.exit_hook(' diskunmap hook: [unexpected error]: %s\n' %
+ traceback.format_exc())
--
To view, visit http://gerrit.ovirt.org/29770
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I36385f1af24043755b3d4b6594bbe598b0d9518d
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Amador Pahim <apahim(a)redhat.com>
8 years, 11 months
Change in vdsm[ovirt-3.3]: vm: Continue to sample after errors
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: vm: Continue to sample after errors
......................................................................
vm: Continue to sample after errors
When vm is running, we monitor disk usage, and if the disk becomes too
full, we extend the disk. This avoid pausing of the vm after io errors.
However, when sampling vm with multiple disks, an error when sampling
one disk exit the sampling function and skip the next disks, making this
machnisim useless.
This patch logs exceptions raised when sampling one disk and continue to
sample others.
This patch is for ovirt-3.3.1 only - master patch must be different
because of recent refactoring in this area.
Change-Id: I8dbe60a4d3b216a5cd998d163407c09b12f2f28c
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/vm.py
1 file changed, 14 insertions(+), 10 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/75/22575/1
diff --git a/vdsm/vm.py b/vdsm/vm.py
index 90ab5a7..2d0cece 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -506,20 +506,24 @@
return
for vmDrive in self._vm._devices[DISK_DEVICES]:
- if not vmDrive.isExtendable():
- continue
+ try:
+ if not vmDrive.isExtendable():
+ continue
- capacity, alloc, physical = \
- self._vm._dom.blockInfo(vmDrive.path, 0)
+ capacity, alloc, physical = \
+ self._vm._dom.blockInfo(vmDrive.path, 0)
- if physical - alloc >= vmDrive.watermarkLimit:
- continue
+ if physical - alloc >= vmDrive.watermarkLimit:
+ continue
- self._log.info('%s/%s apparent: %s capacity: %s, alloc: %s, '
- 'phys: %s', vmDrive.domainID, vmDrive.volumeID,
- vmDrive.apparentsize, capacity, alloc, physical)
+ self._log.info('%s/%s apparent: %s capacity: %s, alloc: %s, '
+ 'phys: %s', vmDrive.domainID, vmDrive.volumeID,
+ vmDrive.apparentsize, capacity, alloc, physical)
- self._vm.extendDriveVolume(vmDrive)
+ self._vm.extendDriveVolume(vmDrive)
+ except Exception:
+ self._log.exception("%s/%s", vmDrive.domainID,
+ vmDrive.volumeID)
def _updateVolumes(self):
if not self._vm.isDisksStatsCollectionEnabled():
--
To view, visit http://gerrit.ovirt.org/22575
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8dbe60a4d3b216a5cd998d163407c09b12f2f28c
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: ovirt-3.3
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 11 months
Change in vdsm[ovirt-3.3]: vm: Fix attribute error when accessing drive in sampling method
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: vm: Fix attribute error when accessing drive in sampling method
......................................................................
vm: Fix attribute error when accessing drive in sampling method
Du to race when migration is finished and monitoring, drive may not have
a format attribute when accessing it from the monitor. This patch use
getattr to log spam.
Change-Id: Ia50e8af94b9c9b54332066a3f30999ce73e7a56f
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/vm.py
1 file changed, 2 insertions(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/18/22518/1
diff --git a/vdsm/vm.py b/vdsm/vm.py
index bb4a7ec..7e2d220 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -506,7 +506,8 @@
return
for vmDrive in self._vm._devices[DISK_DEVICES]:
- if not vmDrive.blockDev or vmDrive.format != 'cow':
+ # Note: drive may not have a format attribute during migration
+ if not vmDrive.blockDev or getattr(vmDrive, 'format', None) != 'cow':
continue
capacity, alloc, physical = \
--
To view, visit http://gerrit.ovirt.org/22518
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia50e8af94b9c9b54332066a3f30999ce73e7a56f
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: ovirt-3.3
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 11 months
Change in vdsm[master]: Moving conf init things together
by Vinzenz Feenstra
Vinzenz Feenstra has uploaded a new change for review.
Change subject: Moving conf init things together
......................................................................
Moving conf init things together
Change-Id: I41ac420cd0da5a3118ce5b3c4c3643b292ec2e86
Signed-off-by: Vinzenz Feenstra <vfeenstr(a)redhat.com>
---
M vdsm/rpc/Bridge.py
M vdsm/virt/vm.py
2 files changed, 2 insertions(+), 3 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/20/29320/1
diff --git a/vdsm/rpc/Bridge.py b/vdsm/rpc/Bridge.py
index 539c70c..12ed98e 100644
--- a/vdsm/rpc/Bridge.py
+++ b/vdsm/rpc/Bridge.py
@@ -286,6 +286,7 @@
"""
return [v['vmId'] for v in ret['vmList']]
+
def Host_queryVms_Ret(ret):
"""
The result contains two data structures which must be merged
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 2c5c910..098a3d5 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -1814,8 +1814,7 @@
"""
self._dom = None
self.recovering = recover
- self.conf = {'pid': '0'}
- self.conf['_blockJobs'] = {}
+ self.conf = {'pid': '0', '_blockJobs': {}, 'clientIp': ''}
self.conf.update(params)
self._initLegacyConf() # restore placeholders for BC sake
self.cif = cif
@@ -1825,7 +1824,6 @@
str(self.conf['vmId']) + '.recovery'
self.user_destroy = False
self._monitorResponse = 0
- self.conf['clientIp'] = ''
self.memCommitted = 0
self._confLock = threading.Lock()
self._jobsLock = threading.Lock()
--
To view, visit http://gerrit.ovirt.org/29320
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I41ac420cd0da5a3118ce5b3c4c3643b292ec2e86
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Vinzenz Feenstra <vfeenstr(a)redhat.com>
8 years, 11 months
Change in vdsm[master]: sampling: use constants for counter bounds
by Dan Kenigsberg
Dan Kenigsberg has uploaded a new change for review.
Change subject: sampling: use constants for counter bounds
......................................................................
sampling: use constants for counter bounds
When we report cpu and network usage, we take two samples of Linux
counters, and divide their difference by the elapsed time. If a sampled
counter wraps around its upper bound, we might report an invalid
negative value. To avoid that, we take the modulu of the difference.
For example, assume that the first sample was (2**64 - 10) jiffies and
30 jiffies have passed until the second sample, the difference would be
the hugely negative value (30 - 2**64). Taking modulu 2**64 returns the
correct value of 30 jiffies.
JIFFIES_BOUND is taken from the size of clock_t and NETSTATS_BOUND -
from the size of the fields of struct net_device_stats. I am not aware
of any programmatic way to acquire this value, but they are both of 64
bit size on x86_64 and ppc64.
Taking modulu 2**32 works perfectly well, since two subsequent samples
are unlikly to be that far apart, and it has the benefit of working well
on a 32 bit host, too.
Change-Id: I706000106c3bc31edf8541c980bce1f49464ebf8
Signed-off-by: Dan Kenigsberg <danken(a)redhat.com>
---
M vdsm/sampling.py
M vdsm/vm.py
2 files changed, 11 insertions(+), 8 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/94/24194/1
diff --git a/vdsm/sampling.py b/vdsm/sampling.py
index 54b6381..a5492ce 100644
--- a/vdsm/sampling.py
+++ b/vdsm/sampling.py
@@ -42,6 +42,9 @@
if not os.path.exists(_THP_STATE_PATH):
_THP_STATE_PATH = '/sys/kernel/mm/redhat_transparent_hugepage/enabled'
+JIFFIES_BOUND = 2 ** 32
+NETSTATS_BOUND = 2 ** 32
+
class InterfaceSample:
"""
@@ -430,14 +433,14 @@
return stats
hs0, hs1 = self._samples[0], self._samples[-1]
interval = hs1.timestamp - hs0.timestamp
- jiffies = (hs1.pidcpu.user - hs0.pidcpu.user) % (2 ** 32)
+ jiffies = (hs1.pidcpu.user - hs0.pidcpu.user) % JIFFIES_BOUND
stats['cpuUserVdsmd'] = (jiffies / interval)
- jiffies = hs1.pidcpu.sys - hs0.pidcpu.sys % (2 ** 32)
+ jiffies = hs1.pidcpu.sys - hs0.pidcpu.sys % JIFFIES_BOUND
stats['cpuSysVdsmd'] = (jiffies / interval)
- jiffies = (hs1.totcpu.user - hs0.totcpu.user) % (2 ** 32)
+ jiffies = (hs1.totcpu.user - hs0.totcpu.user) % JIFFIES_BOUND
stats['cpuUser'] = jiffies / interval / self._ncpus
- jiffies = (hs1.totcpu.sys - hs0.totcpu.sys) % (2 ** 32)
+ jiffies = (hs1.totcpu.sys - hs0.totcpu.sys) % JIFFIES_BOUND
stats['cpuSys'] = jiffies / interval / self._ncpus
stats['cpuIdle'] = max(0.0,
100.0 - stats['cpuUser'] - stats['cpuSys'])
@@ -479,9 +482,9 @@
ifrate = ifrate or 1000
Mbps2Bps = (10 ** 6) / 8
thisRx = (hs1.interfaces[ifid].rx - hs0.interfaces[ifid].rx) % \
- (2 ** 32)
+ NETSTATS_BOUND
thisTx = (hs1.interfaces[ifid].tx - hs0.interfaces[ifid].tx) % \
- (2 ** 32)
+ NETSTATS_BOUND
rxRate = 100.0 * thisRx / interval / ifrate / Mbps2Bps
txRate = 100.0 * thisTx / interval / ifrate / Mbps2Bps
if txRate > 100 or rxRate > 100:
diff --git a/vdsm/vm.py b/vdsm/vm.py
index aae8bd6..07fb581 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -606,11 +606,11 @@
ifRxBytes = (100.0 *
(eInfo[nic.name][0] - sInfo[nic.name][0]) %
- 2 ** 32 /
+ sampling.NETSTATS_BOUND /
sampleInterval / ifSpeed / self.MBPS_TO_BPS)
ifTxBytes = (100.0 *
(eInfo[nic.name][4] - sInfo[nic.name][4]) %
- 2 ** 32 /
+ sampling.NETSTATS_BOUND /
sampleInterval / ifSpeed / self.MBPS_TO_BPS)
ifStats['rxRate'] = '%.1f' % ifRxBytes
--
To view, visit http://gerrit.ovirt.org/24194
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I706000106c3bc31edf8541c980bce1f49464ebf8
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Dan Kenigsberg <danken(a)redhat.com>
8 years, 11 months
Change in vdsm[master]: gluster: fix volume name parsing in getVmVolumeInfo
by Federico Simoncelli
Federico Simoncelli has uploaded a new change for review.
Change subject: gluster: fix volume name parsing in getVmVolumeInfo
......................................................................
gluster: fix volume name parsing in getVmVolumeInfo
It is permitted to use slashes '/' in gluster mount strings even
though they're prohibited in the volume name.
In order to recognize and strip them out we should use getRealPath
instead of getRemotePath that translates slashes into permitted
underscores.
It is now allowed to mount gluster volumes with slashes (serv:/volume)
and correctly identifying the volume name and its information.
Change-Id: Icfdbc573ec2aec31b78323253f8178425a07302c
Signed-off-by: Federico Simoncelli <fsimonce(a)redhat.com>
---
M vdsm/storage/glusterVolume.py
1 file changed, 3 insertions(+), 4 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/37/36237/1
diff --git a/vdsm/storage/glusterVolume.py b/vdsm/storage/glusterVolume.py
index 647b1ec..8f701ba 100644
--- a/vdsm/storage/glusterVolume.py
+++ b/vdsm/storage/glusterVolume.py
@@ -19,10 +19,9 @@
"""
Send info to represent Gluster volume as a network block device
"""
- rpath = sdCache.produce(self.sdUUID).getRemotePath()
- rpath_list = rpath.rsplit(":", 1)
- volfileServer = rpath_list[0]
- volname = rpath_list[1]
+ rpath = sdCache.produce(self.sdUUID).getRealPath()
+ volfileServer, volname = rpath.rsplit(":", 1)
+ volname = volname.strip('/')
# Volume transport to Libvirt transport mapping
VOLUME_TRANS_MAP = {'TCP': 'tcp', 'RDMA': 'rdma'}
--
To view, visit http://gerrit.ovirt.org/36237
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Icfdbc573ec2aec31b78323253f8178425a07302c
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Federico Simoncelli <fsimonce(a)redhat.com>
8 years, 11 months
Change in vdsm[master]: lvm: Cleanup lvm udev rules
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: lvm: Cleanup lvm udev rules
......................................................................
lvm: Cleanup lvm udev rules
Make the vdsm lvm rules suck less:
- Check the vg name once, instead repating the horrible uuid pattern on
every rule
- Group rules with same settings (e.g. sanlock rules)
- Document each group of rules
- Remove irelevant documentation, copied from dm-lvm.rules
Change-Id: Ib185a7d9a5da54036b8b0163a83ff6376b511533
Releates-To: https://bugzilla.redhat.com/1149883
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/storage/vdsm-lvm.rules.tpl.in
1 file changed, 17 insertions(+), 20 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/74/33874/1
diff --git a/vdsm/storage/vdsm-lvm.rules.tpl.in b/vdsm/storage/vdsm-lvm.rules.tpl.in
index fb6c87a..bdb33f1 100644
--- a/vdsm/storage/vdsm-lvm.rules.tpl.in
+++ b/vdsm/storage/vdsm-lvm.rules.tpl.in
@@ -1,5 +1,5 @@
#
-# Copyright 2010 Red Hat, Inc. and/or its affiliates.
+# Copyright 2010-2014 Red Hat, Inc. and/or its affiliates.
#
# Licensed to you under the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
@@ -7,17 +7,8 @@
# LICENSE_GPL_v2 which accompany this distribution.
#
-# Udev rules for LVM.
-#
-# These rules create symlinks for LVM logical volumes in
-# /dev/VG directory (VG is an actual VG name). Some udev
-# environment variables are set (they can be used in later
-# rules as well):
-# DM_LV_NAME - logical volume name
-# DM_VG_NAME - volume group name
-# DM_LV_LAYER - logical volume layer (blank if not set)
+# Vdsm rules for lvm logical volumes.
{{if chcon_hack}}
-#
# The libvirt image label is required to allow qemu to access volumes. Libvirt
# sets this label on volumes when starting a vm, but on EL7 and Fedora the
# label is lost after refreshing a logical volume, and vm get paused. This rule
@@ -28,15 +19,21 @@
# "add" event is processed on coldplug only, so we need "change", too.
ACTION!="add|change", GOTO="lvm_end"
-# Fix ownership for RHEV volumes
-ENV{DM_VG_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", ENV{DM_LV_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@"{{if chcon_hack}}, RUN+="vdsm-chcon"{{endif}}, GOTO="lvm_end"
-ENV{DM_VG_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", ENV{DM_LV_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]_MERGE", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@", GOTO="lvm_end"
-ENV{DM_VG_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", ENV{DM_LV_NAME}=="_remove_me_[a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9]_[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@", GOTO="lvm_end"
-ENV{DM_VG_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", ENV{DM_LV_NAME}=="metadata", MODE:="0600", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@", GOTO="lvm_end"
-ENV{DM_VG_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", ENV{DM_LV_NAME}=="ids", MODE:="0660", OWNER:="@VDSMUSER@", GROUP:="@SNLKGROUP@", GOTO="lvm_end"
-ENV{DM_VG_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", ENV{DM_LV_NAME}=="inbox", MODE:="0600", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@", GOTO="lvm_end"
-ENV{DM_VG_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", ENV{DM_LV_NAME}=="outbox", MODE:="0600", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@", GOTO="lvm_end"
-ENV{DM_VG_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", ENV{DM_LV_NAME}=="leases", MODE:="0660", OWNER:="@VDSMUSER@", GROUP:="@SNLKGROUP@", GOTO="lvm_end"
+# Filter out vgs which do not look like a vdsm vg
+ENV{DM_VG_NAME}!="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", GOTO="lvm_end"
+
+# Volumes used as vdsm images
+ENV{DM_LV_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@"{{if chcon_hack}}, RUN+="vdsm-chcon"{{endif}}, GOTO="lvm_end"
+
+# Temprory volumes - not accessed by libvirt/qemu
+ENV{DM_LV_NAME}=="[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]_MERGE", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@", GOTO="lvm_end"
+ENV{DM_LV_NAME}=="_remove_me_[a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9][a-zA-Z0-9]_[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9]-[a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9][a-f0-9]", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@", GOTO="lvm_end"
+
+# Special storage domain volumes used by vdsm
+ENV{DM_LV_NAME}=="metadata|inbox|outbox", MODE:="0600", OWNER:="@VDSMUSER@", GROUP:="@QEMUGROUP@", GOTO="lvm_end"
+
+# Special storage domain volumes used by sanlock
+ENV{DM_LV_NAME}=="ids|leases", MODE:="0660", OWNER:="@VDSMUSER@", GROUP:="@SNLKGROUP@", GOTO="lvm_end"
# FIXME: make special lvs vdsm-only readable, MODE doesn't work on rhel6
--
To view, visit http://gerrit.ovirt.org/33874
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib185a7d9a5da54036b8b0163a83ff6376b511533
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 12 months
Change in vdsm[master]: broker_support
by smizrahi@redhat.com
Saggi Mizrahi has uploaded a new change for review.
Change subject: broker_support
......................................................................
broker_support
Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71
Signed-off-by: Saggi Mizrahi <smizrahi(a)redhat.com>
---
A lib/stompClient.py
M lib/yajsonrpc/__init__.py
M lib/yajsonrpc/stomp.py
M lib/yajsonrpc/stompReactor.py
4 files changed, 340 insertions(+), 215 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/68/36368/1
diff --git a/lib/stompClient.py b/lib/stompClient.py
new file mode 100644
index 0000000..d3cd0ab
--- /dev/null
+++ b/lib/stompClient.py
@@ -0,0 +1,136 @@
+#!/usr/bin/python
+import yajsonrpc as yjrpc
+import yajsonrpc.stompReactor as sr
+import socket
+from threading import Thread, Lock
+import time
+
+_reactor = None
+_reactorLock = Lock()
+
+
+def get_reactor():
+ global _reactor
+ if _reactor is None:
+ with _reactorLock:
+ if _reactor is None:
+ _reactor = sr.StompReactor()
+ t = Thread(target=_reactor.process_requests)
+ t.setDaemon(True)
+ t.start()
+
+ return _reactor
+
+
+class EchoServer(object):
+ def echo(self, text):
+ return text
+
+ def register_server_address(self, *args, **kwargs):
+ pass
+
+ def unregister_server_address(self, *args, **kwargs):
+ pass
+
+
+def dummy_server(address):
+ sock = socket.create_connection(address)
+ reactor = get_reactor()
+ stomp_client = reactor.createClient(sock)
+ server = yjrpc.JsonRpcServer(EchoServer())
+ t = Thread(target=server.serve_requests)
+ t.setDaemon(True)
+ t.start()
+ sub = stomp_client.subscribe(
+ sr._DEFAULT_REQUEST_DESTINATION,
+ message_handler=ServerRpcContextAdapter.subscription_handler(server)
+ )
+ server._sub_ = sub
+ return server
+
+
+class ServerRpcContextAdapter(object):
+ @classmethod
+ def subscription_handler(cls, server):
+ def handler(sub, frame):
+ server.queueRequest(
+ (
+ ServerRpcContextAdapter(sub.client, frame),
+ frame.body
+ )
+ )
+
+ return handler
+
+ def __init__(self, client, request_frame):
+ self._client = client
+ self._reply_to = request_frame.headers.get('reply-to', None)
+
+ def get_local_address(self, *args, **kwargs):
+ return ""
+
+ def send(self, data):
+ if self._reply_to is None:
+ return
+
+ self._client.send(
+ self._reply_to,
+ data,
+ {
+ "content-type": "application/json",
+ }
+ )
+
+
+class ClientRpcTransportAdapter(object):
+ def __init__(self, sub, destination, client):
+ self._sub = sub
+ sub.set_message_handler(self._handle_message)
+ self._destination = destination
+ self._client = client
+ self._message_handler = lambda arg: None
+
+ def setMessageHandler(self, handler):
+ self._message_handler = handler
+
+ def send(self, data):
+ headers = {
+ "content-type": "application/json",
+ "reply-to": self._sub.destination,
+ }
+ self._client.send(
+ data,
+ self._destination,
+ headers,
+ )
+
+ def _handle_message(self, sub, frame):
+ self._message_handler((self, frame.body))
+
+ def close(self):
+ self._sub.unsubscribe()
+
+
+def connect(address):
+ sock = socket.create_connection(address)
+ reactor = get_reactor()
+ stomp_client = reactor.createClient(sock)
+ subscription = stomp_client.subscribe(sr._DEFAULT_RESPONSE_DESTINATIOM)
+ client = yjrpc.JsonRpcClient(
+ ClientRpcTransportAdapter(
+ subscription,
+ sr._DEFAULT_REQUEST_DESTINATION,
+ stomp_client,
+ )
+ )
+ return client
+
+
+BROKER_ADDRESS = ("127.0.0.1", 5445)
+
+server = dummy_server(BROKER_ADDRESS)
+
+time.sleep(2)
+
+client = connect(BROKER_ADDRESS)
+client.callMethod("echo", ["123"], 1)
diff --git a/lib/yajsonrpc/__init__.py b/lib/yajsonrpc/__init__.py
index 8120c03..6e485bf 100644
--- a/lib/yajsonrpc/__init__.py
+++ b/lib/yajsonrpc/__init__.py
@@ -50,7 +50,7 @@
log = logging.getLogger("JsonRpcInvalidRequestError")
def __init__(self, object_name, msg_content):
- self.log.error("Invalid message found " + msg_content)
+ self.log.error("Invalid message found %s", msg_content)
JsonRpcError.__init__(self, -32600,
"The JSON sent is not a valid Request object "
"with " + object_name)
@@ -100,7 +100,7 @@
raise JsonRpcInvalidRequestError("missing method header", obj)
reqId = obj.get("id")
- if not isinstance(reqId, (str, unicode)):
+ if not isinstance(reqId, (str, unicode, int)):
raise JsonRpcInvalidRequestError("missing request identifier",
obj)
@@ -151,19 +151,7 @@
@staticmethod
def decode(msg):
obj = json.loads(msg, 'utf-8')
-
- if "result" not in obj and "error" not in obj:
- raise JsonRpcInvalidRequestError("missing result or error info",
- obj)
-
- result = obj.get('result')
- error = JsonRpcError(**obj.get('error'))
-
- reqId = obj.get('id')
- if not isinstance(reqId, (str, unicode)):
- raise JsonRpcInvalidRequestError("missing response identifier",
- obj)
- return JsonRpcResponse(result, error, reqId)
+ return JsonRpcResponse.fromRawObject(obj)
@staticmethod
def fromRawObject(obj):
@@ -178,9 +166,6 @@
error = obj.get("error")
reqId = obj.get("id")
- if not isinstance(reqId, (str, unicode)):
- raise JsonRpcInvalidRequestError("missing response identifier",
- obj)
return JsonRpcResponse(result, error, reqId)
@@ -297,12 +282,6 @@
self._lock = Lock()
self._eventcbs = []
- def setTimeout(self, timeout):
- self._transport.setTimeout(timeout)
-
- def connect(self):
- self._transport.connect()
-
def callMethod(self, methodName, params=[], rid=None):
resp = self.call(JsonRpcRequest(methodName, params, rid))[0]
if resp.error:
@@ -354,6 +333,12 @@
resp = JsonRpcResponse.fromRawObject(resp)
with self._lock:
+ if resp.id is None:
+ self.log.warning(
+ "Got an error from server without an ID (%s)",
+ resp.error,
+ )
+
ctx = self._runningRequests.pop(resp.id)
ctx.addResponse(resp)
diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py
index 142b22a..250b0bd 100644
--- a/lib/yajsonrpc/stomp.py
+++ b/lib/yajsonrpc/stomp.py
@@ -16,13 +16,10 @@
import logging
import os
import socket
-from threading import Timer, Event
+from threading import Timer
from uuid import uuid4
from collections import deque
-import time
-from betterAsyncore import Dispatcher
-import asyncore
import re
_RE_ESCAPE_SEQUENCE = re.compile(r"\\(.)")
@@ -53,6 +50,16 @@
CONNECTED = "CONNECTED"
ERROR = "ERROR"
RECEIPT = "RECEIPT"
+
+
+class Headers:
+ CONTENT_LENGTH = "content-length"
+ CONTENT_TYPE = "content-type"
+ SUBSCRIPTION = "subscription"
+ DESTINATION = "destination"
+ ACCEPT_VERSION = "accept-version"
+ REPLY_TO = "reply-to"
+ HOST = "host"
COMMANDS = tuple([command for command in dir(Command)
@@ -120,8 +127,11 @@
def decodeValue(s):
# Make sure to leave this check before decoding as ':' can appear in the
# value after decoding using \c
- if ":" in s:
- raise ValueError("Contains illigal charachter `:`")
+ # Disabled due to bug in hornetq:
+ # https://issues.jboss.org/browse/HORNETQ-1454
+
+ # if ":" in s:
+ # raise ValueError("Contains illigal charachter `:`")
try:
s = _RE_ESCAPE_SEQUENCE.sub(
@@ -278,141 +288,15 @@
return None
-class Client(object):
- def __init__(self, sock=None):
- """
- Initialize the client.
-
- The socket parameter can be an already initialized socket. Should be
- used to pass specialized socket objects like SSL sockets.
- """
- if sock is None:
- sock = self._sock = socket.socket()
- else:
- self._sock = sock
-
- self._map = {}
- # Because we don't know how large the frames are
- # we have to use non bolocking IO
- sock.setblocking(False)
-
- # We have our own timeout for operations we
- # pretend to be synchronous (like connect).
- self._timeout = None
- self._connected = Event()
- self._subscriptions = {}
-
- self._aclient = None
- self._adisp = None
-
- self._inbox = deque()
-
- @property
- def outgoing(self):
- return self._adisp.outgoing
-
- def _registerSubscription(self, sub):
- self._subscriptions[sub.id] = sub
-
- def _unregisterSubscription(self, sub):
- del self._subscriptions[sub.id]
-
- @property
- def connected(self):
- return self._connected.isSet()
-
- def handle_connect(self, aclient, frame):
- self._connected.set()
-
- def handle_message(self, aclient, frame):
- self._inbox.append(frame)
-
- def process(self, timeout=None):
- if timeout is None:
- timeout = self._timeout
-
- asyncore.loop(use_poll=True, timeout=timeout, map=self._map, count=1)
-
- def connect(self, address, hostname):
- sock = self._sock
-
- self._aclient = AsyncClient(self, hostname)
- adisp = self._adisp = AsyncDispatcher(self._aclient)
- disp = self._disp = Dispatcher(adisp, sock, self._map)
- sock.setblocking(True)
- disp.connect(address)
- sock.setblocking(False)
- self.recv() # wait for CONNECTED msg
-
- if not self._connected.isSet():
- sock.close()
- raise socket.error()
-
- def recv(self):
- timeout = self._timeout
- s = time.time()
- duration = 0
- while timeout is None or duration < timeout:
- try:
- return self._inbox.popleft()
- except IndexError:
- td = timeout - duration if timeout is not None else None
- self.process(td)
- duration = time.time() - s
-
- return None
-
- def put_subscribe(self, destination, ack=None):
- subid = self._aclient.subscribe(self._adisp, destination, ack)
- sub = Subscription(self, subid, ack)
- self._registerSubscription(sub)
- return sub
-
- def put_send(self, destination, data="", headers=None):
- self._aclient.send(self._adisp, destination, data, headers)
-
- def put(self, frame):
- self._adisp.send_raw(frame)
-
- def send(self):
- disp = self._disp
- timeout = self._timeout
- duration = 0
- s = time.time()
- while ((timeout is None or duration < timeout) and
- (disp.writable() or not self._connected.isSet())):
- td = timeout - duration if timeout is not None else None
- self.process(td)
- duration = time.time() - s
-
- def gettimout(self):
- return self._timeout
-
- def settimeout(self, value):
- self._timeout = value
-
-
class AsyncDispatcher(object):
log = logging.getLogger("stomp.AsyncDispatcher")
- def __init__(self, frameHandler, bufferSize=4096):
- self._frameHandler = frameHandler
+ def __init__(self, frame_handler, bufferSize=4096):
+ self._frame_handler = frame_handler
self._bufferSize = bufferSize
self._parser = Parser()
- self._outbox = deque()
self._outbuf = None
self._outgoingHeartBeat = 0
-
- def _queueFrame(self, frame):
- self._outbox.append(frame)
-
- @property
- def outgoing(self):
- n = len(self._outbox)
- if self._outbuf != "":
- n += 1
-
- return n
def setHeartBeat(self, outgoing, incoming=0):
if incoming != 0:
@@ -422,7 +306,8 @@
self._outgoingHeartBeat = outgoing
def handle_connect(self, dispatcher):
- self._frameHandler.handle_connect(self)
+ self._outbuf = None
+ self._frame_handler.handle_connect(self)
def handle_read(self, dispatcher):
pending = self._bufferSize
@@ -444,11 +329,10 @@
if data is not None:
parser.parse(data)
- frameHandler = self._frameHandler
+ frame_handler = self._frame_handler
while parser.pending > 0:
frame = parser.popFrame()
- if hasattr(frameHandler, "handle_frame"):
- frameHandler.handle_frame(self, frame)
+ frame_handler.handle_frame(self, frame)
def popFrame(self):
return self._parser.popFrame()
@@ -456,7 +340,7 @@
def handle_write(self, dispatcher):
if self._outbuf is None:
try:
- frame = self._outbox.popleft()
+ frame = self._frame_handler.peek_message()
except IndexError:
return
@@ -467,14 +351,16 @@
self._lastOutgoingTimeStamp = self._milis()
if numSent == len(data):
self._outbuf = None
+ # Throw away the frame that was sent to the server
+ self._frame_handler.pop_message()
else:
self._outbuf = data[numSent:]
- def send_raw(self, frame):
- self._queueFrame(frame)
-
def writable(self, dispatcher):
- if len(self._outbox) > 0 or self._outbuf is not None:
+ if self._frame_handler.has_outgoing_messages:
+ return True
+
+ if self._outbuf is not None:
return True
if (self._outgoingHeartBeat > 0
@@ -493,51 +379,79 @@
class AsyncClient(object):
- log = logging.getLogger("yajsonrpc.protocols.stomp.AsyncClient")
+ log = logging.getLogger("yajsonrpc.stomp.AsyncClient")
- def __init__(self, frameHandler, hostname):
+ def __init__(self, hostname):
self._hostname = hostname
- self._frameHandler = frameHandler
self._connected = False
+ self._outbox = deque()
self._error = None
+ self._subscriptions = {}
self._commands = {
Command.CONNECTED: self._process_connected,
Command.MESSAGE: self._process_message,
Command.RECEIPT: self._process_receipt,
- Command.ERROR: self._process_error}
+ Command.ERROR: self._process_error,
+ }
@property
def connected(self):
return self._connected
+ def queue_frame(self, frame):
+ self._outbox.append(frame)
+
+ @property
+ def has_outgoing_messages(self):
+ return (self._outbox.count > 0)
+
+ def peek_message(self):
+ return self._outbox[0]
+
+ def pop_message(self):
+ return self._outbox.popleft()
+
def getLastError(self):
return self._error
- def handle_connect(self, dispatcher):
+ def handle_connect(self):
hostname = self._hostname
- frame = Frame(
+ # TODO : reset subscriptions
+ # We use appendleft to make sure this is the first frame we send in
+ # case of a reconnect
+ self._outbox.appendleft(Frame(
Command.CONNECT,
- {"accept-version": "1.2",
- "host": hostname})
-
- dispatcher.send_raw(frame)
+ {
+ Headers.ACCEPT_VERSION: "1.2",
+ Headers.HOST: hostname,
+ }
+ ))
def handle_frame(self, dispatcher, frame):
self._commands[frame.command](frame, dispatcher)
def _process_connected(self, frame, dispatcher):
self._connected = True
- frameHandler = self._frameHandler
- if hasattr(frameHandler, "handle_connect"):
- frameHandler.handle_connect(self, frame)
self.log.debug("Stomp connection established")
def _process_message(self, frame, dispatcher):
- frameHandler = self._frameHandler
+ sub_id = frame.headers.get(Headers.SUBSCRIPTION)
+ if sub_id is None:
+ self.log.warning(
+ "Got message without a subscription"
+ )
+ return
- if hasattr(frameHandler, "handle_message"):
- frameHandler.handle_message(self, frame)
+ sub = self._subscriptions.get(sub_id)
+ if sub is None:
+ self.log.warning(
+ "Got message without an unknown subscription id '%s'",
+ sub_id
+ )
+ return
+
+ sub._handle_message(frame)
def _process_receipt(self, frame, dispatcher):
self.log.warning("Receipt frame received and ignored")
@@ -545,42 +459,86 @@
def _process_error(self, frame, dispatcher):
raise StompError(frame)
- def send(self, dispatcher, destination, data="", headers=None):
- frame = Frame(
+ def send(self, destination, data="", headers=None):
+ final_headers = {"destination": destination}
+ final_headers.update(headers)
+ self.queue_frame(Frame(
Command.SEND,
- {"destination": destination},
- data)
+ final_headers,
+ data
+ ))
- dispatcher.send_raw(frame)
-
- def subscribe(self, dispatcher, destination, ack=None):
+ def subscribe(
+ self,
+ destination,
+ ack=None,
+ sub_id=None,
+ message_handler=None
+ ):
if ack is None:
ack = AckMode.AUTO
- subscriptionID = str(uuid4())
+ if message_handler is None:
+ message_handler = lambda sub, frame: None
- frame = Frame(
+ if sub_id is None:
+ sub_id = str(uuid4())
+
+ self.queue_frame(Frame(
Command.SUBSCRIBE,
- {"destination": destination,
- "ack": ack,
- "id": subscriptionID})
+ {
+ "destination": destination,
+ "ack": ack,
+ "id": sub_id
+ }
+ ))
- dispatcher.send_raw(frame)
+ sub = Subscription(
+ self,
+ destination,
+ sub_id,
+ ack,
+ message_handler,
+ )
+ self._subscriptions[sub_id] = sub
- return subscriptionID
+ return sub
class Subscription(object):
- def __init__(self, client, subid, ack):
+ def __init__(
+ self,
+ client,
+ destination,
+ subid,
+ ack,
+ message_handler
+ ):
self._ack = ack
self._subid = subid
self._client = client
self._valid = True
+ self._message_handler = message_handler
+ self._destination = destination
+
+ def _handle_message(self, frame):
+ self._message_handler(self, frame)
+
+ def set_message_handler(self, handler):
+ self._message_handler = handler
@property
def id(self):
return self._subid
+ @property
+ def destination(self):
+ return self._destination
+
+ @property
+ def client(self):
+ return self._client
+
def unsubscribe(self):
client = self._client
subid = self._subid
diff --git a/lib/yajsonrpc/stompReactor.py b/lib/yajsonrpc/stompReactor.py
index 3aa85e8..68c21b6 100644
--- a/lib/yajsonrpc/stompReactor.py
+++ b/lib/yajsonrpc/stompReactor.py
@@ -17,6 +17,7 @@
import os
import threading
import logging
+from collections import deque
import stomp
@@ -27,8 +28,10 @@
_STATE_MSG = "Waiting for message"
-_DEFAULT_RESPONSE_DESTINATIOM = "/queue/_local/vdsm/reponses"
-_DEFAULT_REQUEST_DESTINATION = "/queue/_local/vdsm/requests"
+_DEFAULT_RESPONSE_DESTINATIOM = "jms.topic.vdsm_legacy_responses"
+_DEFAULT_REQUEST_DESTINATION = "jms.topic.vdsm_legacy_requests"
+
+_FAKE_SUB_ID = "__vdsm_fake_broker__"
def parseHeartBeatHeader(v):
@@ -55,6 +58,7 @@
def __init__(self, reactor, messageHandler):
self._reactor = reactor
+ self._outbox = deque()
self._messageHandler = messageHandler
self._commands = {
stomp.Command.CONNECT: self._cmd_connect,
@@ -62,13 +66,30 @@
stomp.Command.SUBSCRIBE: self._cmd_subscribe,
stomp.Command.UNSUBSCRIBE: self._cmd_unsubscribe}
+ @property
+ def has_outgoing_messages(self):
+ return (self._outbox.count > 0)
+
+ def peek_message(self):
+ return self._outbox[0]
+
+ def pop_message(self):
+ return self._outbox.popleft()
+
+ def queue_frame(self, frame):
+ self._outbox.append(frame)
+
def _cmd_connect(self, dispatcher, frame):
self.log.info("Processing CONNECT request")
version = frame.headers.get("accept-version", None)
if version != "1.2":
- res = stomp.Frame(stomp.Command.ERROR, None, "Version unsupported")
+ resp = stomp.Frame(
+ stomp.Command.ERROR,
+ None,
+ "Version unsupported"
+ )
else:
- res = stomp.Frame(stomp.Command.CONNECTED, {"version": "1.2"})
+ resp = stomp.Frame(stomp.Command.CONNECTED, {"version": "1.2"})
cx, cy = parseHeartBeatHeader(
frame.headers.get("heart-beat", "0,0")
)
@@ -79,10 +100,10 @@
# The server can send a heart-beat every cy ms and doesn't want
# to receive any heart-beat from the client.
- res.headers["heart-beat"] = "%d,0" % (cy,)
+ resp.headers["heart-beat"] = "%d,0" % (cy,)
dispatcher.setHeartBeat(cy)
- dispatcher.send_raw(res)
+ self.queue_frame(resp)
self._reactor.wakeup()
def _cmd_subscribe(self, dispatcher, frame):
@@ -109,11 +130,12 @@
self._reactor = reactor
self._messageHandler = None
+ self._aclient = aclient
adisp = self._adisp = stomp.AsyncDispatcher(aclient)
self._dispatcher = Dispatcher(adisp, sock=sock, map=reactor._map)
def send_raw(self, msg):
- self._adisp.send_raw(msg)
+ self._aclient.queue_frame(msg)
self._reactor.wakeup()
def setTimeout(self, timeout):
@@ -161,10 +183,15 @@
def send(self, message):
self.log.debug("Sending response")
- res = stomp.Frame(stomp.Command.MESSAGE,
- {"destination": _DEFAULT_RESPONSE_DESTINATIOM,
- "content-type": "application/json"},
- message)
+ res = stomp.Frame(
+ stomp.Command.MESSAGE,
+ {
+ stomp.Headers.DESTINATION: _DEFAULT_RESPONSE_DESTINATIOM,
+ stomp.Headers.SUBSCRIPTION: _FAKE_SUB_ID,
+ stomp.Headers.CONTENT_TYPE: "application/json",
+ },
+ message
+ )
self._stompConn.send_raw(res)
def close(self):
@@ -182,12 +209,13 @@
self._messageHandler = None
self._socket = sock
- self._aclient = stomp.AsyncClient(self, "vdsm")
+ self._aclient = stomp.AsyncClient("vdsm")
self._stompConn = _StompConnection(
self._aclient,
sock,
reactor
)
+ self._aclient.handle_connect()
def setTimeout(self, timeout):
self._stompConn.setTimeout(timeout)
@@ -195,7 +223,7 @@
def connect(self):
self._stompConn.connect()
- def handle_message(self, impl, frame):
+ def handle_message(self, sub, frame):
if self._messageHandler is not None:
self._messageHandler((self, frame.body))
@@ -207,11 +235,29 @@
if isinstance(self._socket, SSLSocket) and self._socket.pending() > 0:
self._stompConn._dispatcher.handle_read()
- def send(self, message):
+ def subscribe(
+ self,
+ *args,
+ **kwargs
+ ):
+ return self._aclient.subscribe(*args, **kwargs)
+
+ def send(
+ self,
+ message,
+ destination=None,
+ headers=None
+ ):
+ if destination is None:
+ destination = _DEFAULT_REQUEST_DESTINATION
+
self.log.debug("Sending response")
- self._aclient.send(self._stompConn, _DEFAULT_REQUEST_DESTINATION,
- message,
- {"content-type": "application/json"})
+ self._aclient.send(
+ destination,
+ message,
+ headers
+ )
+ self._reactor.wakeup()
def close(self):
self._stompConn.close()
--
To view, visit http://gerrit.ovirt.org/36368
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8e43658f1cebd637ea3abf4396d388afa041ae71
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <smizrahi(a)redhat.com>
9 years
Change in vdsm[master]: task: add the support for abortEvent
by Federico Simoncelli
Federico Simoncelli has uploaded a new change for review.
Change subject: task: add the support for abortEvent
......................................................................
task: add the support for abortEvent
Change-Id: Ib82289e28e5ad9ea142850c31ccff3366b8397dc
Signed-off-by: Federico Simoncelli <fsimonce(a)redhat.com>
---
M vdsm/storage/task.py
1 file changed, 5 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/89/33689/1
diff --git a/vdsm/storage/task.py b/vdsm/storage/task.py
index cc1d85e..97c30bf 100644
--- a/vdsm/storage/task.py
+++ b/vdsm/storage/task.py
@@ -58,6 +58,7 @@
from threadLocal import vars
from weakref import proxy
from vdsm.config import config
+from vdsm.eventfd import EventFile
import outOfProcess as oop
from logUtils import SimpleLogAdapter
@@ -488,6 +489,7 @@
self.mng = None
self._aborting = False
+ self.abortEvent = EventFile()
self._forceAbort = False
self.ref = 0
@@ -544,6 +546,7 @@
def __state_aborting(self, fromState):
if self.ref > 1:
return
+ self.abortEvent.set()
self.log.debug("_aborting: recover policy %s", self.recoveryPolicy)
if self.recoveryPolicy == TaskRecoveryType.auto:
self._updateState(State.racquiring)
@@ -564,6 +567,7 @@
def __state_raborting(self, fromState):
if self.ref == 1:
+ self.abortEvent.set()
self._updateState(State.failed)
else:
self.log.warn("State was change to 'raborting' "
@@ -1225,6 +1229,7 @@
"ignoring", self.state)
return
+ self.abortEvent.set()
self._aborting = True
self._forceAbort = force
finally:
--
To view, visit http://gerrit.ovirt.org/33689
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib82289e28e5ad9ea142850c31ccff3366b8397dc
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Federico Simoncelli <fsimonce(a)redhat.com>
9 years
Change in vdsm[master]: gluster: Added logging for libgfapi related operations.
by dnarayan@redhat.com
Darshan N has uploaded a new change for review.
Change subject: gluster: Added logging for libgfapi related operations.
......................................................................
gluster: Added logging for libgfapi related operations.
This patch adds logging for libgfapi related operations
in gfapi.py. It makes use of the glfs_set_logging() method
provided by libgfapi.
Change-Id: I1cbb7199d740b54def5e9180875757d7c13c2634
Signed-off-by: ndarshan <dnarayan(a)redhat.com>
---
M vdsm/gluster/gfapi.py
1 file changed, 14 insertions(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/80/35280/1
diff --git a/vdsm/gluster/gfapi.py b/vdsm/gluster/gfapi.py
index 2aee49b..efd2e42 100644
--- a/vdsm/gluster/gfapi.py
+++ b/vdsm/gluster/gfapi.py
@@ -23,12 +23,14 @@
import exception as ge
from . import makePublic
+from vdsm import constants
GLUSTER_VOL_PROTOCAL = 'tcp'
GLUSTER_VOL_HOST = 'localhost'
GLUSTER_VOL_PORT = 24007
GLUSTER_VOL_PATH = "/"
+LIBGFAPI_LOG_LEVAL = 7
class StatVfsStruct(ctypes.Structure):
@@ -50,6 +52,11 @@
def glfsInit(volumeId, host, port, protocol):
fs = _glfs_new(volumeId)
+
+ logfile = constants.P_VDSM_LOG + "/vdsm.log"
+ rc = _glfs_set_logging(fs, logfile, LIBGFAPI_LOG_LEVAL)
+ if rc != 0:
+ raise ge.GlfsInitException(rc=rc, err=["Glfs loging failed"])
rc = _glfs_set_volfile_server(fs,
protocol,
@@ -117,6 +124,13 @@
_glfs_new = ctypes.CFUNCTYPE(
ctypes.c_void_p, ctypes.c_char_p)(('glfs_new', _lib))
+_glfs_set_logging = ctypes.CFUNCTYPE(
+ ctypes.c_int,
+ ctypes.c_void_p,
+ ctypes.c_char_p,
+ ctypes.c_int)(('glfs_set_logging', _lib))
+
+
_glfs_set_volfile_server = ctypes.CFUNCTYPE(
ctypes.c_int,
ctypes.c_void_p,
@@ -147,7 +161,6 @@
import json
import argparse
-from vdsm import constants
from vdsm import utils
--
To view, visit http://gerrit.ovirt.org/35280
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I1cbb7199d740b54def5e9180875757d7c13c2634
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Darshan N <dnarayan(a)redhat.com>
9 years