Change in vdsm[master]: vdsm hostdev: add support for SCSI devices
by mpoledni@redhat.com
Martin Polednik has uploaded a new change for review.
Change subject: vdsm hostdev: add support for SCSI devices
......................................................................
vdsm hostdev: add support for SCSI devices
Libvirt allows passthrough of SCSI devices - this patch
exposes the functionality in vdsm
Change-Id: Ia953bcd5eda1b97235a8dd2f5f9593d8f302e5d6
Signed-off-by: Martin Polednik <mpoledni(a)redhat.com>
---
M vdsm/caps.py
M vdsm/rpc/vdsmapi-schema.json
M vdsm/virt/vm.py
3 files changed, 51 insertions(+), 3 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/58/29058/1
diff --git a/vdsm/caps.py b/vdsm/caps.py
index 8c16af6..6112763 100644
--- a/vdsm/caps.py
+++ b/vdsm/caps.py
@@ -530,7 +530,7 @@
# back that we could use to uniquely identify and initiate a device
continue
- if capability in ('pci', 'usb_device'):
+ if capability in ('pci', 'usb_device', 'scsi'):
# Libvirt only allows to attach USB device with capability 'usb',
# but the bus identifies itself as 'usb' while device as
# 'usb_device'
diff --git a/vdsm/rpc/vdsmapi-schema.json b/vdsm/rpc/vdsmapi-schema.json
index ac10144..52f20c1 100644
--- a/vdsm/rpc/vdsmapi-schema.json
+++ b/vdsm/rpc/vdsmapi-schema.json
@@ -3180,9 +3180,11 @@
#
# @usb: USB device
#
+# @scsi: SCSI device
+#
# Since: 4.16.0
##
-{'enum': 'HostDeviceCapability', 'data': ['pci', 'usb']}
+{'enum': 'HostDeviceCapability', 'data': ['pci', 'usb', 'scsi']}
##
# @HostDeviceSpecParams:
@@ -3200,11 +3202,18 @@
# @startupPolicy: #optional Possible boot handling with attached device
# (for @usb)
#
+# @readonly #optional If present, indicates that the device is read
+# only (for @scsi)
+#
+# @shareable #optional If present, this indicates the device is
+# expected to be shared between domains (for @scsi)
+#
# Since: 4.16.0
##
{'type': 'HostDeviceSpecParams',
'data': {'*bootorder': 'int', '*bar': 'bool', '*file': 'str',
- '*startupPolicy': 'StartupPolicy'}}
+ '*startupPolicy': 'StartupPolicy', '*shareable': 'bool',
+ '*readonly*': 'bool'}}
##
# @HostDevice:
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 6a6978e..3cb2eb9 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -1670,6 +1670,33 @@
addr['vendor_id'], addr['bus'], addr['device'])
return addr
+ def getScsiAddr(self):
+ capsxml = self._parsecaps()
+ addr = {}
+
+ addr['type'] = 'scsi'
+ addr['bus'] = capsxml.getElementsByTagName('bus')[0].firstChild. \
+ nodeValue
+ addr['target'] = capsxml.getElementsByTagName('target')[0]. \
+ firstChild.nodeValue
+ addr['unit'] = capsxml.getElementsByTagName('lun')[0]. \
+ firstChild.nodeValue
+
+ self.log.debug('SCSI device %s at address '
+ '{bus: %s, target: %s, unit: %s}',
+ self.name, addr['bus'], addr['target'], addr['unit'],
+ addr['device'])
+ return addr
+
+ def getScsiAdapter(self):
+ capsxml = self._parsecaps()
+
+ adapter = 'scsi_host{}'.format(
+ capsxml.getElementsByTagName('host')[0].firstChild.nodeValue)
+
+ self.log.debug('SCSI device %s adapter %s', self.name, adapter)
+ return adapter
+
def getXML(self):
"""
Create domxml for a hostdev device.
@@ -1717,6 +1744,18 @@
if 'startupPolicy' in self.specParams:
source.setAttrs(startupPolicy=self.specParams['startupPolicy'])
+ elif self.capability == 'scsi':
+ source.appendChildWithArgs('address', None,
+ **self.getScsiHost())
+ source.appendChildWithArgs('adapter', None,
+ **self.getScsiAdapter())
+
+ if 'readonly' in self.specParams:
+ hostdev.appendChild('readonly')
+
+ if 'shareable' in self.specParams:
+ hostdev.appendChild('shareable')
+
return hostdev
--
To view, visit http://gerrit.ovirt.org/29058
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia953bcd5eda1b97235a8dd2f5f9593d8f302e5d6
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Martin Polednik <mpoledni(a)redhat.com>
8 years, 1 month
Change in vdsm[master]: VDSM: implement nodeDeviceMapper
by Martin Polednik
Martin Polednik has uploaded a new change for review.
Change subject: VDSM: implement nodeDeviceMapper
......................................................................
VDSM: implement nodeDeviceMapper
NodeDeviceMapper is structure to keep track of available node devices
and provide easy access to querying and managing them. The mapper
is needed in order to allow VDSM to meaningfully report the usage
of these devices, along with managing their availability.
Change-Id: I6f21d465a90cfe2eb16ba70943e5fcf1683f1656
Signed-off-by: Martin Polednik <mpoledni(a)redhat.com>
---
M client/vdsClient.py
M debian/vdsm.install
M vdsm.spec.in
M vdsm/API.py
M vdsm/clientIF.py
M vdsm/rpc/BindingXMLRPC.py
M vdsm/virt/Makefile.am
7 files changed, 44 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/09/30209/1
diff --git a/client/vdsClient.py b/client/vdsClient.py
index 2c09b28..89dceab 100644
--- a/client/vdsClient.py
+++ b/client/vdsClient.py
@@ -461,6 +461,14 @@
def do_getAllVmStats(self, args):
return self.ExecAndExit(self.s.getAllVmStats())
+ def do_getDevicesByCaps(self, args):
+ caps = list(args)
+ return self.ExecAndExit(self.s.getDevicesByCaps(caps))
+
+ def do_getDevicesByDomain(self, args):
+ vmId = args[0]
+ return self.ExecAndExit(self.s.getDevicesByDomain(vmId))
+
def desktopLogin(self, args):
vmId, domain, user, password = tuple(args)
response = self.s.desktopLogin(vmId, domain, user, password)
@@ -2077,6 +2085,16 @@
('',
'Get Statistics info for all existing VMs'
)),
+ 'getDevicesByCaps': (serv.do_getDevicesByCaps,
+ ('[<caps>]',
+ 'Get available node devices on the host with '
+ 'given capability'
+ )),
+ 'getDevicesByDomain': (serv.do_getDevicesByDomain,
+ ('<vmId>',
+ 'Get available node devices attached to specified '
+ 'domain'
+ )),
'getVGList': (serv.getVGList,
('storageType',
'List of all VGs.'
diff --git a/debian/vdsm.install b/debian/vdsm.install
index 3af1100..e1d8652 100644
--- a/debian/vdsm.install
+++ b/debian/vdsm.install
@@ -142,6 +142,7 @@
./usr/share/vdsm/virt/__init__.py
./usr/share/vdsm/virt/guestagent.py
./usr/share/vdsm/virt/migration.py
+./usr/share/vdsm/virt/nodedev.py
./usr/share/vdsm/virt/sampling.py
./usr/share/vdsm/virt/vm.py
./usr/share/vdsm/virt/vmchannels.py
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 16b7834..0ddacbe 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -939,6 +939,7 @@
%{_datadir}/%{vdsm_name}/virt/__init__.py*
%{_datadir}/%{vdsm_name}/virt/guestagent.py*
%{_datadir}/%{vdsm_name}/virt/migration.py*
+%{_datadir}/%{vdsm_name}/virt/nodedev.py*
%{_datadir}/%{vdsm_name}/virt/vmchannels.py*
%{_datadir}/%{vdsm_name}/virt/vmstatus.py*
%{_datadir}/%{vdsm_name}/virt/vmtune.py*
diff --git a/vdsm/API.py b/vdsm/API.py
index 6feccf6..e891b80 100644
--- a/vdsm/API.py
+++ b/vdsm/API.py
@@ -1244,6 +1244,18 @@
statsList = hooks.after_get_all_vm_stats(statsList)
return {'status': doneCode, 'statsList': statsList}
+ def getDevicesByCaps(self, caps):
+ """
+ """
+ devices = self._cif.nodeDeviceMapper.getDevicesByCaps(caps)
+ return {'status': doneCode, 'devices': devices}
+
+ def getDevicesByDomain(self, vmId):
+ """
+ """
+ devices = self._cif.nodeDeviceMapper.getDevicesByDomain(vmId)
+ return {'status': doneCode, 'devices': devices}
+
def getStats(self):
"""
Report host statistics.
diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index d5372f3..118652d 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -91,6 +91,7 @@
self.gluster = None
try:
self.vmContainer = {}
+ self.nodeDeviceMapper = NodeDeviceMapper(vmContainer, log)
self._hostStats = sampling.HostStatsThread(log=log)
self._hostStats.start()
self.lastRemoteAccess = 0
diff --git a/vdsm/rpc/BindingXMLRPC.py b/vdsm/rpc/BindingXMLRPC.py
index d6663c3..a31d76c 100644
--- a/vdsm/rpc/BindingXMLRPC.py
+++ b/vdsm/rpc/BindingXMLRPC.py
@@ -482,6 +482,14 @@
api = API.Global()
return api.getAllVmStats()
+ def getDevicesByCaps(self, caps):
+ api = API.Global()
+ return api.getDevicesByCaps(caps)
+
+ def getDevicesByDomain(self, vmId):
+ api = API.Global()
+ return api.getDevicesByDomain(vmId)
+
def vmGetIoTunePolicy(self, vmId):
vm = API.VM(vmId)
return vm.getIoTunePolicy()
@@ -989,6 +997,8 @@
(self.getStats, 'getVdsStats'),
(self.vmGetStats, 'getVmStats'),
(self.getAllVmStats, 'getAllVmStats'),
+ (self.getDevicesByCaps, 'getDevicesByCaps'),
+ (self.getDevicesByDomain, 'getDevicesByDomain'),
(self.vmMigrationCreate, 'migrationCreate'),
(self.vmDesktopLogin, 'desktopLogin'),
(self.vmDesktopLogoff, 'desktopLogoff'),
diff --git a/vdsm/virt/Makefile.am b/vdsm/virt/Makefile.am
index 423839b..4b7b1cc 100644
--- a/vdsm/virt/Makefile.am
+++ b/vdsm/virt/Makefile.am
@@ -25,6 +25,7 @@
__init__.py \
guestagent.py \
migration.py \
+ nodedev.py \
sampling.py \
vm.py \
vmchannels.py \
--
To view, visit http://gerrit.ovirt.org/30209
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6f21d465a90cfe2eb16ba70943e5fcf1683f1656
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Martin Polednik <mpolednik(a)redhat.com>
8 years, 1 month
Change in vdsm[master]: [WIP] vdsm: add support for PCI passthrough
by mpoledni@redhat.com
Martin Polednik has uploaded a new change for review.
Change subject: [WIP] vdsm: add support for PCI passthrough
......................................................................
[WIP] vdsm: add support for PCI passthrough
required functionality:
* report PCI devices available on host [x]
* handle createVm xml generation [x]
* hotplugHostdev [ ] (required for after-migration)
* hotpunlugHostdev [ ] (required for migration)
Change-Id: I363d2622d72ca2db75f60032fe0892c348bab121
Signed-off-by: Martin Polednik <mpoledni(a)redhat.com>
---
M lib/vdsm/define.py
M vdsm/caps.py
M vdsm/vm.py
3 files changed, 83 insertions(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/62/22462/1
diff --git a/lib/vdsm/define.py b/lib/vdsm/define.py
index eb78633..9605f93 100644
--- a/lib/vdsm/define.py
+++ b/lib/vdsm/define.py
@@ -132,6 +132,12 @@
'transientErr': {'status': {
'code': 59,
'message': 'Action not permitted on a VM with transient disks'}},
+ 'hotplugHostdev': {'status': {
+ 'code': 60,
+ 'message': 'Failed to hotplug hostdev'}},
+ 'hotunplugHostdev': {'status': {
+ 'code': 61,
+ 'message': 'Failed to hotunplug hostdev'}},
'recovery': {'status': {
'code': 99,
'message': 'Recovering from crash or Initializing'}},
diff --git a/vdsm/caps.py b/vdsm/caps.py
index 3839134..d6af375 100644
--- a/vdsm/caps.py
+++ b/vdsm/caps.py
@@ -308,6 +308,38 @@
return dict(release=release, version=version, name=osname)
+def hostdevList():
+ devices = []
+ for device in libvirtconnection.get().listAllDevices():
+ devXML = minidom.parseString(device.XMLDesc())
+ dev = {}
+
+ # we have to grab attributes that will most likely not only
+ # uniquely identify device, but also serve as human readable
+ # representation of the device
+ try:
+ dev['name'] = devXML.getElementsByTagName('name')[0].\
+ childNodes[0].data
+ capability = devXML.getElementsByTagName('capability')[0]
+ try:
+ dev['product'] = capability.getElementsByTagName('product')[0]\
+ .childNodes[0].data
+ dev['vendor'] = capability.getElementsByTagName('vendor')[0].\
+ childNodes[0].data
+ except IndexError:
+ # althought the retrieval of product/vendor was not successful,
+ # we can still report back the name
+ pass
+ except IndexError:
+ # should device not have a name, there is nothing engine could send
+ # back that we could use to uniquely identify and initiate a device
+ continue
+
+ devices.append(dev)
+
+ return devices
+
+
def get():
targetArch = platform.machine()
@@ -360,6 +392,7 @@
config.getint('vars', 'extra_mem_reserve'))
caps['guestOverhead'] = config.get('vars', 'guest_ram_overhead')
caps['rngSources'] = _getRngSources()
+ caps['hostDevices'] = hostdevList()
return caps
diff --git a/vdsm/vm.py b/vdsm/vm.py
index a5d923b..a477bc9 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -78,6 +78,7 @@
WATCHDOG_DEVICES = 'watchdog'
CONSOLE_DEVICES = 'console'
SMARTCARD_DEVICES = 'smartcard'
+HOSTDEV_DEVICES = 'hostdev'
def isVdsmImage(drive):
@@ -1656,6 +1657,27 @@
return m
+class HostDevice(VmDevice):
+ def getXML(self):
+ """
+ Create domxml for a hostdev device.
+
+ <devices>
+ <hostdev mode='subsystem' type='usb'>
+ <source startupPolicy='optional'>
+ <vendor id='0x1234'/>
+ <product id='0xbeef'/>
+ </source>
+ <boot order='2'/>
+ </hostdev>
+ </devices>
+ """
+ # libvirt gives us direct api call to construct the XML
+ return xml.dom.minidom.parseString(libvirtconnection.get().
+ nodeDeviceLookupByName(self.name).
+ XMLDesc())
+
+
class WatchdogDevice(VmDevice):
def __init__(self, *args, **kwargs):
super(WatchdogDevice, self).__init__(*args, **kwargs)
@@ -1769,7 +1791,8 @@
(CONSOLE_DEVICES, ConsoleDevice),
(REDIR_DEVICES, RedirDevice),
(RNG_DEVICES, RngDevice),
- (SMARTCARD_DEVICES, SmartCardDevice))
+ (SMARTCARD_DEVICES, SmartCardDevice),
+ (HOSTDEV_DEVICES, HostDevice))
def _makeDeviceDict(self):
return dict((dev, []) for dev, _ in self.DeviceMapping)
@@ -3127,6 +3150,26 @@
break
+ def hotplugHostdev(self, params):
+ hostdev = HostDevice(self.conf, self.log, **params)
+ self._devices[HOSTDEV_DEVICES].append(hostdev)
+ hostdevXML = hostdev.getXML().toprettyxml(encoding='utf-8')
+ hostdev._deviceXML = hostdevXML
+ self.log.debug("Hotplug hostdev xml: %s", hostdevXML)
+
+ try:
+ self._dom.attachDevice(hostdevXML)
+ except libvirt.libvirtError as e:
+ self.log.error("Hotplug failed", exc_info=True)
+ if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
+ return errCode['noVM']
+ return {'status': {'code':
+ errCode['hotplugHostdev']['status']['code'],
+ 'message': e.message}}
+
+ def hotunplugHostdev(self, name):
+ pass
+
def hotplugNic(self, params):
if self.isMigrating():
return errCode['migInProgress']
--
To view, visit http://gerrit.ovirt.org/22462
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I363d2622d72ca2db75f60032fe0892c348bab121
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Martin Polednik <mpoledni(a)redhat.com>
8 years, 1 month
Change in vdsm[master]: netinfo: improve which ipv4 addr is reported if there are mu...
by asegurap@redhat.com
Antoni Segura Puimedon has uploaded a new change for review.
Change subject: netinfo: improve which ipv4 addr is reported if there are multiple primary
......................................................................
netinfo: improve which ipv4 addr is reported if there are multiple primary
The current code assumed that additional configured addresses for a
device would have the 'secondary' flag. However, this is no longer
true in recent kernels, as multiple primary addresses can be set for
a device.
The improvement is that now we will check if any of the addresses is
in the subnet of the gateway and report one of them. If there is no
default gw in the main table for the device we return the last
set primary ip and if there is a default gw but going through another
hop, we return the first set ip.
Change-Id: I8666cfef5bd8ea63edf8979e501d4785db5f4893
Signed-off-by: Antoni S. Puimedon <asegurap(a)redhat.com>
---
M lib/vdsm/netinfo.py
1 file changed, 35 insertions(+), 7 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/75/33375/1
diff --git a/lib/vdsm/netinfo.py b/lib/vdsm/netinfo.py
index 6491505..816810d 100644
--- a/lib/vdsm/netinfo.py
+++ b/lib/vdsm/netinfo.py
@@ -292,25 +292,49 @@
struct.pack("!I", int('1' * prefix + '0' * (32 - prefix), 2)))
+def prefix2int(prefix):
+ if not 0 <= prefix <= 32:
+ raise ValueError('%s is not a valid prefix value. It must be between '
+ '0 and 32')
+ return (2 ** prefix - 1) << (32 - prefix)
+
+
+def addr2int(address):
+ return struct.unpack('!I', socket.inet_aton(address))[0]
+
+
def getDefaultGateway():
output = routeShowGateways('main')
return Route.fromText(output[0]) if output else None
-def getIpInfo(dev, ipaddrs=None):
+def getIpInfo(dev, ipaddrs=None, ipv4_gateway=None):
if ipaddrs is None:
ipaddrs = _getIpAddrs()
ipv4addr = ipv4netmask = ''
ipv4addrs = []
ipv6addrs = []
+ gateway_int = addr2int(ipv4_gateway) if ipv4_gateway else None
+
for addr in ipaddrs[dev]:
if addr['family'] == 'inet':
ipv4addrs.append(addr['address'])
if 'secondary' not in addr['flags']:
- ipv4addr, prefix = addr['address'].split('/')
- ipv4netmask = prefix2netmask(addr['prefixlen'])
+ address, _ = addr['address'].split('/')
+ mask = prefix2int(addr['prefixlen'])
+ if (gateway_int is None or
+ addr2int(address) & mask == gateway_int & mask):
+ ipv4addr = address
+ ipv4netmask = prefix2netmask(addr['prefixlen'])
else:
ipv6addrs.append(addr['address'])
+ if ipv4addrs and ipv4addr == '':
+ # If we didn't find an address in the gateway subnet (which is
+ # legal if there is another route that takes us to the gateway) we
+ # choose to report the first address
+ ipv4addr, prefix = ipv4addrs[0].split('/')
+ ipv4netmask = prefix2netmask(int(prefix))
+
return ipv4addr, ipv4netmask, ipv4addrs, ipv6addrs
@@ -501,14 +525,16 @@
# comment when the version is no longer supported.
data['interface'] = iface
- ipv4addr, ipv4netmask, ipv4addrs, ipv6addrs = getIpInfo(iface, ipaddrs)
+ gateway = _get_gateway(routes, iface)
+ ipv4addr, ipv4netmask, ipv4addrs, ipv6addrs = getIpInfo(
+ iface, ipaddrs, gateway)
data.update({'iface': iface, 'bridged': bridged,
'addr': ipv4addr, 'netmask': ipv4netmask,
'bootproto4': ('dhcp' if ipv4addr and iface in dhcp4
else 'none'),
'ipv4addrs': ipv4addrs,
'ipv6addrs': ipv6addrs,
- 'gateway': _get_gateway(routes, iface),
+ 'gateway': gateway,
'ipv6gateway': _get_gateway(routes, iface, family=6),
'mtu': str(getMtu(iface))})
except (IOError, OSError) as e:
@@ -549,12 +575,14 @@
def _devinfo(link, routes, ipaddrs, dhcp4):
- ipv4addr, ipv4netmask, ipv4addrs, ipv6addrs = getIpInfo(link.name, ipaddrs)
+ gateway = _get_gateway(routes, link.name)
+ ipv4addr, ipv4netmask, ipv4addrs, ipv6addrs = getIpInfo(
+ link.name, ipaddrs, gateway)
info = {'addr': ipv4addr,
'cfg': getIfaceCfg(link.name),
'ipv4addrs': ipv4addrs,
'ipv6addrs': ipv6addrs,
- 'gateway': _get_gateway(routes, link.name),
+ 'gateway': gateway,
'ipv6gateway': _get_gateway(routes, link.name, family=6),
'bootproto4': ('dhcp' if ipv4addr and link.name in dhcp4
else 'none'),
--
To view, visit http://gerrit.ovirt.org/33375
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8666cfef5bd8ea63edf8979e501d4785db5f4893
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Antoni Segura Puimedon <asegurap(a)redhat.com>
8 years, 1 month
Change in vdsm[master]: ipwrapper: Use zombiereaper to wait for process
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: ipwrapper: Use zombiereaper to wait for process
......................................................................
ipwrapper: Use zombiereaper to wait for process
Commit aea8aab95f eliminated ip process zombies by waiting for ip
process. This is may introduce unlimited wait if the process is not
responsive. We can avoid this issue by using zombiereaper.
Note that zombiereaper requires explicit initialization (registering
signal handler for SIGCHLD), so this make ipwrapper less useful as
generic utility.
Change-Id: I26263c5b4811d7f46f7c65d1d1b00bd96af02eb8
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M lib/vdsm/ipwrapper.py
1 file changed, 2 insertions(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/16/32216/1
diff --git a/lib/vdsm/ipwrapper.py b/lib/vdsm/ipwrapper.py
index af31159..781e95b 100644
--- a/lib/vdsm/ipwrapper.py
+++ b/lib/vdsm/ipwrapper.py
@@ -27,6 +27,7 @@
import signal
import socket
import struct
+import zombiereaper
from netaddr.core import AddrFormatError
from netaddr import IPAddress
@@ -638,7 +639,7 @@
def stop(self):
self.proc.kill()
- self.proc.wait()
+ zombiereaper.autoReapPID(self.proc.pid)
@classmethod
def _parseLine(cls, line):
--
To view, visit http://gerrit.ovirt.org/32216
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I26263c5b4811d7f46f7c65d1d1b00bd96af02eb8
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 1 month
Change in vdsm[master]: Configure iscsi_session recovery_tmo for multipath devices
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: Configure iscsi_session recovery_tmo for multipath devices
......................................................................
Configure iscsi_session recovery_tmo for multipath devices
This configuration is done by device-mapper-multipath in version
0.4.9-76 and later. However, device looses configuration after recovery
from network errors.
This patch adds the recovery_tmo configuration to vdsm-multipath udev
rules. Since setting the timeout requires a script, the dmsetup command
was moved into a new multipath-configure-device script.
When we require a multipath version fixing this issue, we can remove the
timeout handling code.
Change-Id: Iaaa40fa5e6dc86beef501ef4aaefa17c4c1574c1
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M .gitignore
M configure.ac
M debian/vdsm.install
M vdsm.spec.in
M vdsm/storage/Makefile.am
A vdsm/storage/multipath-configure-device.in
A vdsm/storage/vdsm-multipath.rules
D vdsm/storage/vdsm-multipath.rules.in
8 files changed, 53 insertions(+), 22 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/82/32582/1
diff --git a/.gitignore b/.gitignore
index fca4f2e..5890806 100644
--- a/.gitignore
+++ b/.gitignore
@@ -61,7 +61,6 @@
vdsm/storage/protect/safelease
vdsm/storage/lvm.env
vdsm/storage/vdsm-lvm.rules
-vdsm/storage/vdsm-multipath.rules
vdsm/sudoers.vdsm
vdsm/svdsm.logger.conf
vdsm/vdscli.py
diff --git a/configure.ac b/configure.ac
index f40d57a..c467c37 100644
--- a/configure.ac
+++ b/configure.ac
@@ -335,10 +335,10 @@
vdsm/rpc/Makefile
vdsm/sos/Makefile
vdsm/storage/Makefile
+ vdsm/storage/multipath-configure-device
vdsm/storage/imageRepository/Makefile
vdsm/storage/protect/Makefile
vdsm/storage/vdsm-lvm.rules
- vdsm/storage/vdsm-multipath.rules
vdsm/virt/Makefile
vdsm_hooks/Makefile
vdsm_hooks/checkimages/Makefile
diff --git a/debian/vdsm.install b/debian/vdsm.install
index bd63c72..759383d 100644
--- a/debian/vdsm.install
+++ b/debian/vdsm.install
@@ -25,6 +25,7 @@
./usr/lib/python2.7/dist-packages/sos/plugins/vdsm.py
./usr/lib/python2.7/dist-packages/vdsmapi.py
./usr/libexec/vdsm/curl-img-wrap
+./usr/libexec/vdsm/multiapth-configure-device
./usr/libexec/vdsm/ovirt_functions.sh
./usr/libexec/vdsm/persist-vdsm-hooks
./usr/libexec/vdsm/safelease
diff --git a/vdsm.spec.in b/vdsm.spec.in
index a5b354c..c912150 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -993,6 +993,7 @@
%{_libexecdir}/%{vdsm_name}/persist-vdsm-hooks
%{_libexecdir}/%{vdsm_name}/unpersist-vdsm-hook
%{_libexecdir}/%{vdsm_name}/ovirt_functions.sh
+%{_libexecdir}/%{vdsm_name}/multipath-configure-device
%{_libexecdir}/%{vdsm_name}/vdsm-gencerts.sh
%{_libexecdir}/%{vdsm_name}/vdsmd_init_common.sh
%{_datadir}/%{vdsm_name}/network/__init__.py*
diff --git a/vdsm/storage/Makefile.am b/vdsm/storage/Makefile.am
index 99b1460..91c2f9c 100644
--- a/vdsm/storage/Makefile.am
+++ b/vdsm/storage/Makefile.am
@@ -72,7 +72,8 @@
volume.py
dist_vdsmexec_SCRIPTS = \
- curl-img-wrap
+ curl-img-wrap \
+ multipath-configure-device
nodist_vdsmstorage_DATA = \
lvm.env \
diff --git a/vdsm/storage/multipath-configure-device.in b/vdsm/storage/multipath-configure-device.in
new file mode 100644
index 0000000..dcdfea5
--- /dev/null
+++ b/vdsm/storage/multipath-configure-device.in
@@ -0,0 +1,33 @@
+#!/bin/sh
+#
+# Copyright 2014 Red Hat, Inc. and/or its affiliates.
+#
+# Licensed to you under the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version. See the files README and
+# LICENSE_GPL_v2 which accompany this distribution.
+#
+
+set -e
+
+# Ensure that multipath devices use no_path_retry fail, instead of
+# no_path_retry queue, which is hardcoded in multipath configuration for many
+# devices.
+#
+# See https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/...
+
+@DMSETUP_PATH@ message $DM_NAME 0 fail_if_no_path
+
+# Set iscsi_session recovery_tmo. This parameter is configured by multipath on
+# startup, but after a device fail and become active again, the configuration
+# is lost and revert back to iscsid.conf default (120).
+
+timeout=5
+
+for slave in /sys${DEVPATH}/slaves/*; do
+ path=$(@UDEVADM_PATH@ info --query=path --path="$slave")
+ session=$(echo "$path" | @SED_PATH@ -rn s'|^/devices/platform/host[0-9]+/(session[0-9]+)/.+$|\1|p')
+ if [ -n "$session" ]; then
+ echo $timeout > "/sys/class/iscsi_session/${session}/recovery_tmo"
+ fi
+done
diff --git a/vdsm/storage/vdsm-multipath.rules b/vdsm/storage/vdsm-multipath.rules
new file mode 100644
index 0000000..d4d0dd7
--- /dev/null
+++ b/vdsm/storage/vdsm-multipath.rules
@@ -0,0 +1,15 @@
+#
+# Copyright 2014 Red Hat, Inc. and/or its affiliates.
+#
+# Licensed to you under the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version. See the files README and
+# LICENSE_GPL_v2 which accompany this distribution.
+#
+
+# Vdsm rules for multipath devices.
+#
+# This rule runs vdsm-mutipath-configure tool to configure multiapth devices to
+# work with vdsm.
+
+ACTION=="add|change", ENV{DM_UUID}=="mpath-?*", RUN+="/usr/libexec/vdsm/multipath-configure-device"
diff --git a/vdsm/storage/vdsm-multipath.rules.in b/vdsm/storage/vdsm-multipath.rules.in
deleted file mode 100644
index cc2a878..0000000
--- a/vdsm/storage/vdsm-multipath.rules.in
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Copyright 2014 Red Hat, Inc. and/or its affiliates.
-#
-# Licensed to you under the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version. See the files README and
-# LICENSE_GPL_v2 which accompany this distribution.
-#
-
-# Vdsm rules for multipath devices.
-#
-# Ensure that multipath devices use no_path_retry fail, instead of
-# no_path_retry queue, which is hardcoded in multipath configuration for many
-# devices.
-#
-# See https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/...
-#
-
-ACTION=="add|change", ENV{DM_UUID}=="mpath-?*", RUN+="@DMSETUP_PATH@ message $env{DM_NAME} 0 fail_if_no_path"
--
To view, visit http://gerrit.ovirt.org/32582
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iaaa40fa5e6dc86beef501ef4aaefa17c4c1574c1
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 1 month
Change in vdsm[master]: vm: Require format attribute for drives
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: vm: Require format attribute for drives
......................................................................
vm: Require format attribute for drives
We have seen sampling errors where a drive has no "format" attribute.
These errors spam the system logs, and does not help to debug the real
issue - why the required format attribute is not set?
This patch ensure that a drive cannot be created without a format
attribute, avoding log spam by sampling errors, and hopefully revealing
the real reason for this bug.
One broken test creating a drive without a format was fixed and new test
ensure that creating drive without a format raises.
Change-Id: I01ab1e071ecb76f383cc6dc7d99782e10cc90136
Relates-To: http://gerrit.ovirt.org/22551
Relates-To: https://bugzilla.redhat.com/994534
Relates-To: http://lists.ovirt.org/pipermail/users/2014-February/021007.html
Bug-Url: https://bugzilla.redhat.com/1055437
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M tests/vmTests.py
M vdsm/vm.py
2 files changed, 28 insertions(+), 3 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/34/24234/1
diff --git a/tests/vmTests.py b/tests/vmTests.py
index 1e0a3f6..724d458 100644
--- a/tests/vmTests.py
+++ b/tests/vmTests.py
@@ -449,6 +449,18 @@
redir = vm.RedirDevice(self.conf, self.log, **dev)
self.assertXML(redir.getXML(), redirXML)
+ def testDriveRequiredParameters(self):
+ # TODO: It is not clear what are the other required parameters, and it
+ # the parameters depend on the type of drive. We probbaly need bigger
+ # test here that test many combinations.
+ # Currently this test only missing "format" attribute.
+ conf = {'index': '2', 'propagateErrors': 'off', 'iface': 'ide',
+ 'name': 'hdc', 'device': 'cdrom', 'path': '/tmp/fedora.iso',
+ 'type': 'disk', 'readonly': 'True', 'shared': 'none',
+ 'serial': '54-a672-23e5b495a9ea'}
+ self.assertRaises(vm.InvalidDeviceParameters, vm.Drive, {}, self.log,
+ **conf)
+
def testDriveSharedStatus(self):
sharedConfigs = [
# Backward compatibility
@@ -470,7 +482,8 @@
'exclusive', 'shared', 'none', 'transient',
]
- driveConfig = {'index': '0', 'iface': 'virtio', 'device': 'disk'}
+ driveConfig = {'index': '0', 'iface': 'virtio', 'device': 'disk',
+ 'format': 'raw'}
for driveInput, driveOutput in zip(sharedConfigs, expectedStates):
driveInput.update(driveConfig)
diff --git a/vdsm/vm.py b/vdsm/vm.py
index aae8bd6..b7151b1 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -80,6 +80,10 @@
SMARTCARD_DEVICES = 'smartcard'
+class InvalidDeviceParameters(Exception):
+ """ Raised when creating device with invalid parameters """
+
+
def isVdsmImage(drive):
"""
Tell if drive looks like a vdsm image
@@ -1438,6 +1442,9 @@
VOLWM_CHUNK_REPLICATE_MULT = 2 # Chunk multiplier during replication
def __init__(self, conf, log, **kwargs):
+ if 'format' not in kwargs:
+ raise InvalidDeviceParameters('"format" attribute is required:'
+ ' %r' % kwargs)
if not kwargs.get('serial'):
self.serial = kwargs.get('imageID'[-20:]) or ''
VmDevice.__init__(self, conf, log, **kwargs)
@@ -3108,8 +3115,13 @@
for devType, devClass in self.DeviceMapping:
for dev in devices[devType]:
- self._devices[devType].append(devClass(self.conf, self.log,
- **dev))
+ try:
+ drive = devClass(self.conf, self.log, **dev)
+ except InvalidDeviceParameters as e:
+ self.log.error('Ignoring device with invalid parameters:'
+ ' %s', e, exc_info=True)
+ else:
+ self._devices[devType].append(drive)
# We should set this event as a last part of drives initialization
self._pathsPreparedEvent.set()
--
To view, visit http://gerrit.ovirt.org/24234
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I01ab1e071ecb76f383cc6dc7d99782e10cc90136
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 1 month
Change in vdsm[master]: vm: Prevent multiple threads blocking on same libvirt domain
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: vm: Prevent multiple threads blocking on same libvirt domain
......................................................................
vm: Prevent multiple threads blocking on same libvirt domain
If a libvirt call get stuck because a vm is not responding, we could
have multiple threads blocked on same vm without any limit, using
precious libvirt resources that could be used to run other vms.
This patch adds a new TimedLock lock, that raise a LockTimeout if the
lock cannot be acquired after configured timeout. Using this lock, a vm
allow now only one concurrent libvirt call. If a libvirt call get stuck,
and other threads are tyring to invoke libvirt calls on the same
vm, they will wait until the current call finish, or fail with a
timeout.
This should slow down calls for single vm, since each call is invoked
only when the previous call returns. However, when using many vms, this
creates natural round-robin scheduling, giving each vm equal chance to
make progress, and limiting the load on libvirt.
Change-Id: Ib459697b8688ebcba987cd6b9e11815826e92990
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M debian/vdsm-python.install
M lib/vdsm/Makefile.am
A lib/vdsm/locking.py
M tests/Makefile.am
A tests/lockingTests.py
M vdsm.spec.in
M vdsm/virt/vm.py
7 files changed, 172 insertions(+), 8 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/72/30772/1
diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install
index 9d6a99c..b80fd4f 100644
--- a/debian/vdsm-python.install
+++ b/debian/vdsm-python.install
@@ -8,6 +8,7 @@
./usr/lib/python2.7/dist-packages/vdsm/exception.py
./usr/lib/python2.7/dist-packages/vdsm/ipwrapper.py
./usr/lib/python2.7/dist-packages/vdsm/libvirtconnection.py
+./usr/lib/python2.7/dist-packages/vdsm/locking.py
./usr/lib/python2.7/dist-packages/vdsm/netconfpersistence.py
./usr/lib/python2.7/dist-packages/vdsm/netinfo.py
./usr/lib/python2.7/dist-packages/vdsm/netlink/__init__.py
diff --git a/lib/vdsm/Makefile.am b/lib/vdsm/Makefile.am
index 8e90e6e..ffb642c 100644
--- a/lib/vdsm/Makefile.am
+++ b/lib/vdsm/Makefile.am
@@ -28,6 +28,7 @@
exception.py \
ipwrapper.py \
libvirtconnection.py \
+ locking.py \
netconfpersistence.py \
netinfo.py \
profile.py \
diff --git a/lib/vdsm/locking.py b/lib/vdsm/locking.py
new file mode 100644
index 0000000..9e92ca3
--- /dev/null
+++ b/lib/vdsm/locking.py
@@ -0,0 +1,54 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import threading
+import time
+
+
+class LockTimeout(Exception):
+ """ Timeout acquiring a TimedLock """
+
+
+class TimedLock(object):
+ """
+ A lock raising a LockTimeout if it cannot be acquired within timeout.
+ """
+
+ def __init__(self, name, timeout):
+ self._name = name
+ self._timeout = timeout
+ self._cond = threading.Condition(threading.Lock())
+ self._busy = False
+
+ def __enter__(self):
+ deadline = time.time() + self._timeout
+ with self._cond:
+ while self._busy:
+ now = time.time()
+ if now >= deadline:
+ raise LockTimeout("Timeout acquiring lock %r" % self._name)
+ self._cond.wait(deadline - now)
+ self._busy = True
+ return self
+
+ def __exit__(self, *args):
+ with self._cond:
+ self._busy = False
+ self._cond.notify()
diff --git a/tests/Makefile.am b/tests/Makefile.am
index aa4a45e..10a2727 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -44,6 +44,7 @@
jsonRpcTests.py \
ksmTests.py \
libvirtconnectionTests.py \
+ lockingTests.py \
lvmTests.py \
main.py \
miscTests.py \
diff --git a/tests/lockingTests.py b/tests/lockingTests.py
new file mode 100644
index 0000000..9ac8a05
--- /dev/null
+++ b/tests/lockingTests.py
@@ -0,0 +1,89 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import time
+import threading
+from vdsm import locking
+from testlib import VdsmTestCase
+
+
+class TimedLockTests(VdsmTestCase):
+
+ def test_free(self):
+ lock = locking.TimedLock("xxx-yyy-zzz", 0)
+ with self.assertNotRaises():
+ with lock:
+ pass
+
+ def test_busy(self):
+ lock = locking.TimedLock("xxx-yyy-zzz", 0)
+ with self.assertRaises(locking.LockTimeout):
+ with lock:
+ with lock:
+ pass
+
+ def test_serialize(self):
+ lock = locking.TimedLock("xxx-yyy-zzz", 0.4)
+ single_thread = threading.BoundedSemaphore(1)
+ passed = [0]
+ timedout = [0]
+
+ def run():
+ try:
+ with lock:
+ with single_thread:
+ time.sleep(0.1)
+ passed[0] += 1
+ except locking.LockTimeout:
+ timedout[0] += 1
+
+ self.run_threads(3, run)
+ self.assertEquals(passed[0], 3)
+ self.assertEquals(timedout[0], 0)
+
+ def test_timeout(self):
+ lock = locking.TimedLock("xxx-yyy-zzz", 0.1)
+ single_thread = threading.BoundedSemaphore(1)
+ passed = [0]
+ timedout = [0]
+
+ def run():
+ try:
+ with lock:
+ with single_thread:
+ time.sleep(0.2)
+ passed[0] += 1
+ except locking.LockTimeout:
+ timedout[0] += 1
+
+ self.run_threads(3, run)
+ self.assertEquals(passed[0], 1)
+ self.assertEquals(timedout[0], 2)
+
+ def run_threads(self, count, target):
+ threads = []
+ for i in range(count):
+ t = threading.Thread(target=target)
+ t.daemon = True
+ threads.append(t)
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 2d371b1..c7a2b0b 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1159,6 +1159,7 @@
%{python_sitearch}/%{vdsm_name}/exception.py*
%{python_sitearch}/%{vdsm_name}/ipwrapper.py*
%{python_sitearch}/%{vdsm_name}/libvirtconnection.py*
+%{python_sitearch}/%{vdsm_name}/locking.py*
%{python_sitearch}/%{vdsm_name}/netinfo.py*
%{python_sitearch}/%{vdsm_name}/netlink/__init__.py*
%{python_sitearch}/%{vdsm_name}/netlink/addr.py*
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 25eec71..9e9a844 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -37,6 +37,7 @@
# vdsm imports
from vdsm import constants
from vdsm import libvirtconnection
+from vdsm import locking
from vdsm import netinfo
from vdsm import qemuimg
from vdsm import utils
@@ -619,12 +620,24 @@
class NotifyingVirDomain:
- # virDomain wrapper that notifies vm when a method raises an exception with
- # get_error_code() = VIR_ERR_OPERATION_TIMEOUT
+ """
+ Wrap virDomain object for limiting concurrent calls and reporting timeouts.
- def __init__(self, dom, tocb):
+ The wrapper allows only one concurrent call per vm, to prevent blocking of
+ multiple threads when underlying libvirt call get stuck. Invoking a domain
+ method will block if the domain is busy with another call. If the domain is
+ not available after timeout seconds, a timeout is reported and a
+ TimeoutError is raised.
+
+ If a domain method was invoked, and the libvirt call failed with with
+ VIR_ERR_OPERATION_TIMEOUT error code, the timeout is reported, and
+ TimeoutError is raised.
+ """
+
+ def __init__(self, dom, tocb, vmid, timeout=30):
self._dom = dom
self._cb = tocb
+ self._timedlock = locking.TimedLock(vmid, timeout)
def __getattr__(self, name):
attr = getattr(self._dom, name)
@@ -633,7 +646,8 @@
def f(*args, **kwargs):
try:
- ret = attr(*args, **kwargs)
+ with self._timedlock:
+ ret = attr(*args, **kwargs)
self._cb(False)
return ret
except libvirt.libvirtError as e:
@@ -643,6 +657,9 @@
toe.err = e.err
raise toe
raise
+ except locking.LockTimeout as e:
+ self._cb(True)
+ raise TimeoutError(e)
return f
@@ -2892,7 +2909,7 @@
if self.recovering:
self._dom = NotifyingVirDomain(
self._connection.lookupByUUIDString(self.id),
- self._timeoutExperienced)
+ self._timeoutExperienced, self.id)
elif 'restoreState' in self.conf:
fromSnapshot = self.conf.get('restoreFromSnapshot', False)
srcDomXML = self.conf.pop('_srcDomXML')
@@ -2912,7 +2929,7 @@
self._dom = NotifyingVirDomain(
self._connection.lookupByUUIDString(self.id),
- self._timeoutExperienced)
+ self._timeoutExperienced, self.id)
else:
flags = libvirt.VIR_DOMAIN_NONE
if 'launchPaused' in self.conf:
@@ -2921,7 +2938,7 @@
del self.conf['launchPaused']
self._dom = NotifyingVirDomain(
self._connection.createXML(domxml, flags),
- self._timeoutExperienced)
+ self._timeoutExperienced, self.id)
hooks.after_vm_start(self._dom.XMLDesc(0), self.conf)
for dev in self._customDevices():
hooks.after_device_create(dev._deviceXML, self.conf,
@@ -3690,7 +3707,7 @@
# or restart vdsm if connection to libvirt was lost
self._dom = NotifyingVirDomain(
self._connection.lookupByUUIDString(self.id),
- self._timeoutExperienced)
+ self._timeoutExperienced, self.id)
if not self._incomingMigrationFinished.isSet():
state = self._dom.state(0)
--
To view, visit http://gerrit.ovirt.org/30772
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib459697b8688ebcba987cd6b9e11815826e92990
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 1 month
Change in vdsm[master]: sampling: Add sampling benchmark for profiling
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: sampling: Add sampling benchmark for profiling
......................................................................
sampling: Add sampling benchmark for profiling
We have trouble profiling vdsm when using 100's of vms. This patch adds
simple benchmark, simulating vdsm sampling, for evaluating the sampling
implementation.
Change-Id: I7bbb73f38fbbbdfc1fe7050ac3e30327411f8f33
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
A tests/perf/sampling_perf.py
1 file changed, 131 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/87/30987/1
diff --git a/tests/perf/sampling_perf.py b/tests/perf/sampling_perf.py
new file mode 100644
index 0000000..f656474
--- /dev/null
+++ b/tests/perf/sampling_perf.py
@@ -0,0 +1,131 @@
+# Compare sampling in multiple threads vs sampling via the scheduler and
+# executor.
+
+import pthreading
+pthreading.monkey_patch()
+
+import sys
+sys.path = ['../../lib', '../../vdsm'] + sys.path
+
+import libvirt
+import logging
+import signal
+import threading
+import time
+from xml.dom import minidom
+
+from vdsm import executor as execute
+from vdsm import profile
+from vdsm import schedule
+from vdsm import utils
+
+from virt import sampling
+
+RUN_SEC = 120
+VMS = 100
+
+connection = libvirt.openReadOnly('qemu:///system')
+
+
+def sampling_task():
+ # Do some libvirt calls and do some xml processing, simulating vdsm
+ # sampling functions.
+ sp = connection.listAllStoragePools()[0]
+ xml = sp.XMLDesc()
+ dom = minidom.parseString(xml)
+ uuid = dom.getElementsByTagName('uuid')[0]
+ return uuid.childNodes[0].data
+
+
+samplings = (
+ sampling.AdvancedStatsFunction(sampling_task, 2), # highWrite
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleVmJobs
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleBalloon
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleCpu
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleNet
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleVcpuPinning
+ sampling.AdvancedStatsFunction(sampling_task, 15), # sampleCpuTune
+ sampling.AdvancedStatsFunction(sampling_task, 60), # updateVolumes
+ sampling.AdvancedStatsFunction(sampling_task, 60), # sampleDisk
+ sampling.AdvancedStatsFunction(sampling_task, 60), # sampleDiskLatency
+)
+
+
+class SamplerThread(object):
+
+ def __init__(self, name, samplings):
+ self.iterator = sampling.delays(sampling.cycle(samplings))
+ self.thread = threading.Thread(target=self._run, name=name)
+ self.thread.daemon = True
+ self.event = threading.Event()
+ self.last_sample_time = 0
+
+ def start(self):
+ logging.debug("starting %s", self.thread.name)
+ self.thread.start()
+
+ def stop(self):
+ logging.debug("stopping %s", self.thread.name)
+ self.event.set()
+
+ @utils.traceback()
+ def _run(self):
+ logging.debug("%s started", self.thread.name)
+ while not self.event.is_set():
+ delay, task = next(self.iterator)
+ if delay > 0:
+ self.event.wait(delay)
+ if self.event.is_set():
+ break
+ self.last_sample_time = time.time()
+ task()
+ logging.debug("%s stopped", self.thread.name)
+
+
+(a)profile.profile("threads.prof", builtins=True)
+def do_threads(vms):
+ threads = []
+ try:
+ for i in range(vms):
+ t = SamplerThread('SamplerThread-%i' % i, samplings)
+ t.start()
+ threads.append(t)
+ time.sleep(RUN_SEC)
+ finally:
+ for t in threads:
+ t.stop()
+
+
+(a)profile.profile("pool.prof", builtins=True)
+def do_pool(vms):
+ scheduler = schedule.Scheduler()
+ scheduler.start()
+ executor = execute.Executor(5, 1000, scheduler)
+ executor.start()
+ samplers = []
+ try:
+ for i in range(vms):
+ s = sampling.Sampler("Sampler-%i" % i, samplings, None,
+ scheduler=scheduler, executor=executor)
+ s.start()
+ samplers.append(s)
+ time.sleep(RUN_SEC)
+ finally:
+ for s in samplers:
+ s.stop()
+ executor.stop()
+ scheduler.stop()
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.DEBUG)
+ if len(sys.argv) == 1:
+ print 'Usage: pyhton sampling_perf.py threads|pool [vms]'
+ sys.exit(2)
+ name = 'do_' + sys.argv[1]
+ func = globals()[name]
+ if len(sys.argv) > 2:
+ vms = int(sys.argv[2])
+ else:
+ vms = VMS
+ func(vms)
--
To view, visit http://gerrit.ovirt.org/30987
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7bbb73f38fbbbdfc1fe7050ac3e30327411f8f33
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 1 month
Change in vdsm[master]: sampling: Scalable vm sampling
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: sampling: Scalable vm sampling
......................................................................
sampling: Scalable vm sampling
Previously each vm was running a sampling thread, which is not very
efficient in Python when running 100's of vms. This introduces the
Sampler class, managing sampling for one vm, replacing the
AdvancedStatsThread class.
Each VM keeps one Sampler object, running sampling tasks serially using
a thread pool, ensuring that each vm has only single sampling task
running at any time.
If a VM sampling task get stuck, sampling for this vm stops until the
stuck task is finished. This keep the same behavior of the current code
using one sampling thread per vm.
For simplicity, this uses the storage thread pool for running sampling
tasks. In the final version, this should use a more robust thread pool
handling stuck worker threads, see: http://gerrit.ovirt.org/29191
Threads usage:
- One scheduler thread serving all vm samplers
- 20 worker threads. The final count should be determined by testing,
ensuring that we don't overload libvirt. Configurable using new
vars:vm_sampling_threads option.
Change-Id: I5539a728f3a1b3075712331440eff1749da6c530
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M lib/vdsm/config.py.in
M tests/numaUtilsTests.py
M tests/samplingTests.py
M tests/vmApiTests.py
M tests/vmTests.py
M vdsm/vdsm
M vdsm/virt/sampling.py
M vdsm/virt/vm.py
8 files changed, 420 insertions(+), 121 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/80/29980/1
diff --git a/lib/vdsm/config.py.in b/lib/vdsm/config.py.in
index 52e0cd9..c411e4e 100644
--- a/lib/vdsm/config.py.in
+++ b/lib/vdsm/config.py.in
@@ -190,6 +190,14 @@
('vm_sample_cpu_tune_interval', '15', None),
+ ('vm_sampling_threads', '20',
+ 'Number of vm sampling threads. Should match libvirt thread '
+ 'pool size (default 20)'),
+
+ ('vm_sampling_tasks', '500',
+ 'Number of vm sampling tasks. Should be bigger then number of '
+ 'running vms (default 500)'),
+
('trust_store_path', '@TRUSTSTORE@',
'Where the certificates and keys are situated.'),
diff --git a/tests/numaUtilsTests.py b/tests/numaUtilsTests.py
index e5dd293..93e30a9 100644
--- a/tests/numaUtilsTests.py
+++ b/tests/numaUtilsTests.py
@@ -65,7 +65,7 @@
(3, 1, 19590000000L, 2)], 15
-class FakeVmStatsThread:
+class FakeVmStats:
def __init__(self, vm):
self._vm = vm
self.sampleVcpuPinning = FakeAdvancedStatsFunction()
@@ -98,7 +98,7 @@
'memory': '1024',
'nodeIndex': 1}]}
with vmTests.FakeVM(VM_PARAMS) as fake:
- fake._vmStats = FakeVmStatsThread(fake)
+ fake._vmStats = FakeVmStats(fake)
expectedResult = {'0': [0, 1], '1': [0, 1]}
vmNumaNodeRuntimeMap = numaUtils.getVmNumaNodeRuntimeInfo(fake)
self.assertEqual(expectedResult, vmNumaNodeRuntimeMap)
diff --git a/tests/samplingTests.py b/tests/samplingTests.py
index 2ff703e..1089960 100644
--- a/tests/samplingTests.py
+++ b/tests/samplingTests.py
@@ -111,3 +111,228 @@
s1 = sampling.InterfaceSample(lo)
s1.operstate = 'x'
self.assertEquals('operstate:x', s1.connlog_diff(s0))
+
+
+class Task(object):
+
+ def __init__(self, interval, name, error=None):
+ self.interval = interval
+ self.name = name
+ self.error = error
+ self.executed = 0
+
+ def __call__(self):
+ self.executed += 1
+ if self.error:
+ raise self.error
+
+
+class SamplingIterationTests(TestCaseBase):
+
+ task1 = Task(2, "task1")
+ task2 = Task(2, "task2")
+ task3 = Task(3, "task3")
+ samplings = [task1, task2, task3]
+
+ virtual_times = [
+ # cycle 1
+ (2, task1),
+ (2, task2),
+ (3, task3),
+ (4, task1),
+ (4, task2),
+ (6, task1),
+ (6, task2),
+ (6, task3),
+ # cycle 2
+ (8, task1),
+ (8, task2),
+ (9, task3),
+ (10, task1),
+ (10, task2),
+ (12, task1),
+ (12, task2),
+ (12, task3),
+ ]
+
+ delays = [
+ (2, task1),
+ (0, task2),
+ (1, task3),
+ (1, task1),
+ (0, task2),
+ (2, task1),
+ (0, task2),
+ (0, task3),
+ ]
+
+ def test_virtual_time(self):
+ iterator = sampling.cycle(self.samplings)
+ for pair in self.virtual_times:
+ self.assertEquals(next(iterator), pair)
+
+ def test_delays(self):
+ iterator = sampling.delays(self.virtual_times)
+ for i in range(2):
+ for pair in self.delays:
+ self.assertEquals(next(iterator), pair)
+
+ def test_chain(self):
+ iterator = sampling.delays(sampling.cycle(self.samplings))
+ for i in range(3):
+ for pair in self.delays:
+ self.assertEquals(next(iterator), pair)
+
+
+class SamplerTests(TestCaseBase):
+
+ def setUp(self):
+ self.scheduler = FakeScheduler()
+ self.threadpool = FakeThreadPool()
+ self.handler = FakeHandler(True)
+ self.task1 = Task(2, "task1")
+ self.task2 = Task(3, "task2")
+ self.task3 = Task(4, "task3")
+ samplings = [self.task1, self.task2, self.task3]
+ self.sampler = sampling.Sampler("vm-id", samplings, self.handler,
+ scheduler=self.scheduler,
+ threadpool=self.threadpool)
+
+ def test_start(self):
+ self.sampler.start()
+ self.assertEquals(self.scheduler.call.delay, 2)
+ self.assertEquals(self.scheduler.call.func, self.sampler._dispatch)
+ self.assertEquals(self.threadpool.tasks, [])
+
+ def test_stop(self):
+ self.sampler.start()
+ self.sampler.stop()
+ self.assertEquals(self.scheduler.call.func, INVALID)
+ self.assertEquals(self.threadpool.tasks, [])
+
+ def test_dispatch(self):
+ self.sampler.start()
+ self.scheduler.fire()
+ self.assertEquals(self.threadpool.tasks,
+ [("vm-id_task1", self.sampler)])
+
+ def test_run_samplings(self):
+ self.sampler.start()
+
+ # time = 2
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.scheduler.call.delay, 1)
+ self.assertEquals(self.task1.executed, 1)
+
+ # time = 3
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task2.executed, 1)
+ self.assertEquals(self.scheduler.call.delay, 1)
+
+ # time = 4
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task1.executed, 2)
+ self.assertEquals(self.scheduler.call.delay, 0)
+
+ # time = 4
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task3.executed, 1)
+ self.assertEquals(self.scheduler.call.delay, 2)
+
+ # time = 6
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task1.executed, 3)
+ self.assertEquals(self.scheduler.call.delay, 0)
+
+ # time = 6
+ self.scheduler.fire()
+ self.threadpool.run()
+ self.assertEquals(self.task2.executed, 2)
+ self.assertEquals(self.scheduler.call.delay, 2)
+
+
+class SamplerHandlerTests(TestCaseBase):
+
+ def test_does_handle_error(self):
+ self.check(FakeHandler(True))
+
+ def test_does_not_handle_error(self):
+ self.check(FakeHandler(False))
+
+ def check(self, handler):
+ scheduler = FakeScheduler()
+ threadpool = FakeThreadPool()
+ error = Exception("Sampling task failed")
+ task = Task(2, "task", error=error)
+ sampler = sampling.Sampler("vm-id", [task], handler,
+ scheduler=scheduler, threadpool=threadpool)
+ sampler.start()
+ scheduler.fire()
+ threadpool.run()
+ self.assertEquals(handler.error, error)
+
+
+# Helpers
+
+
+class FakeHandler(object):
+
+ def __init__(self, result):
+ self.error = None
+ self.result = result
+
+ def handleStatsException(self, e):
+ self.error = e
+ return self.result
+
+
+class FakeScheduler(object):
+
+ def __init__(self):
+ # Schedule only single call to simplify the tests
+ self.call = None
+
+ def schedule(self, delay, func):
+ assert self.call is None
+ self.call = Call(delay, func)
+ return self.call
+
+ def fire(self):
+ call = self.call
+ self.call = None
+ call.func()
+
+
+class Call(object):
+
+ def __init__(self, delay, func):
+ self.delay = delay
+ self.func = func
+
+ def cancel(self):
+ self.func = INVALID
+
+
+def INVALID():
+ pass
+
+
+class FakeThreadPool(object):
+
+ def __init__(self):
+ self.tasks = []
+
+ def queueTask(self, id, task):
+ # Executes task on a worker thread in the real object. Does nothing
+ # here to simplify the tests.
+ self.tasks.append((id, task))
+
+ def run(self):
+ # Run next task manually to simplify the tests
+ task_id, task = self.tasks.pop()
+ task()
diff --git a/tests/vmApiTests.py b/tests/vmApiTests.py
index bc68c6d..ae2f98f 100644
--- a/tests/vmApiTests.py
+++ b/tests/vmApiTests.py
@@ -22,6 +22,7 @@
import os
import os.path
+from virt import sampling
from virt import vmexitreason
from vdsm import define
from testrunner import VdsmTestCase as TestCaseBase
@@ -56,11 +57,15 @@
@contextmanager
def ensureVmStats(vm):
- vm._initVmStats()
+ sampling.start()
try:
- yield vm
+ vm._initVmStats()
+ try:
+ yield vm
+ finally:
+ vm._vmStats.stop()
finally:
- vm._vmStats.stop()
+ sampling.stop()
class TestVmStats(TestSchemaCompliancyBase):
diff --git a/tests/vmTests.py b/tests/vmTests.py
index fda71f5..3ef04f9 100644
--- a/tests/vmTests.py
+++ b/tests/vmTests.py
@@ -1338,7 +1338,7 @@
self.assertEqual(stats['exitMessage'], msg)
-class TestVmStatsThread(TestCaseBase):
+class TestVmStats(TestCaseBase):
VM_PARAMS = {'displayPort': -1, 'displaySecurePort': -1,
'display': 'qxl', 'displayIp': '127.0.0.1',
'vmType': 'kvm', 'memSize': 1024}
@@ -1349,8 +1349,8 @@
GBPS = 10 ** 9 / 8
MAC = '52:54:00:59:F5:3F'
with FakeVM() as fake:
- mock_stats_thread = vm.VmStatsThread(fake)
- res = mock_stats_thread._getNicStats(
+ mock_stats = vm.VmStats(fake)
+ res = mock_stats._getNicStats(
name='vnettest', model='virtio', mac=MAC,
start_sample=(2 ** 64 - 15 * GBPS, 1, 2, 3, 0, 4, 5, 6),
end_sample=(0, 7, 8, 9, 5 * GBPS, 10, 11, 12),
@@ -1366,9 +1366,9 @@
# bz1073478 - main case
with FakeVM(self.VM_PARAMS, self.DEV_BALLOON) as fake:
self.assertEqual(fake._dom, None)
- mock_stats_thread = vm.VmStatsThread(fake)
+ mock_stats = vm.VmStats(fake)
res = {}
- mock_stats_thread._getBalloonStats(res)
+ mock_stats._getBalloonStats(res)
self.assertIn('balloonInfo', res)
self.assertIn('balloon_cur', res['balloonInfo'])
@@ -1376,9 +1376,9 @@
# bz1073478 - extra case
with FakeVM(self.VM_PARAMS, self.DEV_BALLOON) as fake:
fake._dom = FakeDomain()
- mock_stats_thread = vm.VmStatsThread(fake)
+ mock_stats = vm.VmStats(fake)
res = {}
- mock_stats_thread._getBalloonStats(res)
+ mock_stats._getBalloonStats(res)
self.assertIn('balloonInfo', res)
self.assertIn('balloon_cur', res['balloonInfo'])
diff --git a/vdsm/vdsm b/vdsm/vdsm
index cbc16ff..2343606 100755
--- a/vdsm/vdsm
+++ b/vdsm/vdsm
@@ -38,6 +38,8 @@
from storage.dispatcher import Dispatcher
from storage.hsm import HSM
+from virt import sampling
+
import zombiereaper
import dsaversion
@@ -71,6 +73,7 @@
profile.start()
libvirtconnection.start_event_loop()
+ sampling.start()
if config.getboolean('irs', 'irs_enable'):
try:
@@ -88,6 +91,7 @@
profile.stop()
finally:
cif.prepareForShutdown()
+ sampling.stop()
def run(pidfile=None):
diff --git a/vdsm/virt/sampling.py b/vdsm/virt/sampling.py
index 3479eee..0cd823a 100644
--- a/vdsm/virt/sampling.py
+++ b/vdsm/virt/sampling.py
@@ -26,24 +26,50 @@
Contains a reverse dictionary pointing from error string to its error code.
"""
-import threading
-import os
-import time
-import logging
import errno
import ethtool
+import itertools
+import logging
+import operator
+import os
import re
+import threading
+import time
+from vdsm.config import config
from vdsm import utils
from vdsm import netinfo
+from vdsm import schedule
from vdsm.ipwrapper import getLinks
from vdsm.constants import P_VDSM_RUN
import caps
+from storage import threadPool
_THP_STATE_PATH = '/sys/kernel/mm/transparent_hugepage/enabled'
if not os.path.exists(_THP_STATE_PATH):
_THP_STATE_PATH = '/sys/kernel/mm/redhat_transparent_hugepage/enabled'
+
+
+_scheduler = None
+_threadpool = None
+
+
+def start():
+ """ Called during application startup """
+ global _scheduler, _threadpool
+ threads = config.getint('vars', 'vm_sampling_threads')
+ tasks = config.getint('vars', 'vm_sampling_tasks')
+ _threadpool = threadPool.ThreadPool(threads, maxTasks=tasks)
+ _scheduler = schedule.Scheduler("sampling.scheduler")
+
+
+def stop():
+ """ Called during application shutdown """
+ if _threadpool:
+ _threadpool.joinAll(waitForTasks=False, waitForThreads=False)
+ if _scheduler:
+ _scheduler.cancel()
class InterfaceSample:
@@ -318,9 +344,13 @@
def interval(self):
return self._interval
+ @property
+ def name(self):
+ return self._function.__name__
+
def __repr__(self):
return "<AdvancedStatsFunction %s at 0x%x>" % (
- self._function.__name__, id(self._function.__name__))
+ self.name, id(self))
def __call__(self, *args, **kwargs):
retValue = self._function(*args, **kwargs)
@@ -347,115 +377,136 @@
return bgn_sample, end_sample, (end_time - bgn_time)
-class AdvancedStatsThread(threading.Thread):
+class Sampler(object):
"""
- A thread that runs the registered AdvancedStatsFunction objects
- for statistic and monitoring purpose.
+ Execute sampling tasks on the threadpool according to sampling schedule.
+
+ The sepcified samplings tasks are executed on threadpool one at a time, as
+ if there was a thread running them one after another in a loop.
+
+ If one of the sampling task get stuck, the rest of the sampling tasks are
+ delayed. This is important to prevent flooding of the thread pool with
+ possibly blocking sampling task, when the underlying libvirt thread is
+ stuck. When a stuck sampling tasks finish, the sampling tasks continue
+ normally.
+
+ When a sampling task is stuck, no other task can run on the blocked worker
+ thread. The threadpool is responsible for detecting and handling this.
+
+ Each vm should create one sampler, to ensure that there is only one
+ sampling task per vm excuting on the threadpool.
"""
- DEFAULT_LOG = logging.getLogger("AdvancedStatsThread")
- def __init__(self, log=DEFAULT_LOG, daemon=False):
- """
- Initialize an AdvancedStatsThread object
- """
- threading.Thread.__init__(self)
- self.daemon = daemon
+ _log = logging.getLogger("Sampler")
- self._log = log
- self._stopEvent = threading.Event()
- self._contEvent = threading.Event()
-
- self._statsTime = None
- self._statsFunctions = []
-
- def addStatsFunction(self, *args):
- """
- Register the functions listed as arguments
- """
- if self.isAlive():
- raise RuntimeError("AdvancedStatsThread is started")
-
- for statsFunction in args:
- self._statsFunctions.append(statsFunction)
+ def __init__(self, vm_id, samplings, handler, scheduler=None,
+ threadpool=None):
+ self._vm_id = vm_id
+ self._samplings = samplings
+ self._handler = handler
+ self._scheduler = scheduler or _scheduler
+ self._threadpool = threadpool or _threadpool
+ self._lock = threading.Lock()
+ self._running = False
+ self._iterator = None
+ self._task = _INVALID
+ self._call = None
+ self._last_sample_time = 0 # Not sample taken yet
def start(self):
- """
- Start the execution of the thread and exit
- """
- self._log.debug("Start statistics collection")
- threading.Thread.start(self)
+ with self._lock:
+ if self._running:
+ raise AssertionError("Sampler is running")
+ self._log.debug("Starting sampler for vm %s", self._vm_id)
+ self._running = True
+ self._iterator = delays(cycle(self._samplings))
+ self._schedule_next_task()
def stop(self):
+ with self._lock:
+ if self._running:
+ self._log.debug("Stopping sampler for vm %s", self._vm_id)
+ self._running = False
+ self._call.cancel()
+ self._call = None
+ self._task = _INVALID
+ self._iterator = None
+
+ @property
+ def last_sample_time(self):
+ return self._last_sample_time
+
+ # Task interface
+
+ def __call__(self):
+ try:
+ self._last_sample_time = time.time()
+ self._task()
+ except Exception as e:
+ if not self._handler.handleStatsException(e):
+ self._log.exception("Stats function failed: %s", self._task)
+ finally:
+ with self._lock:
+ if self._running:
+ self._schedule_next_task()
+
+ # Private
+
+ def _dispatch(self):
"""
- Stop the execution of the thread and exit
+ Called from the scheduler thread when its time to ran the current
+ sampling task.
"""
- self._log.debug("Stop statistics collection")
- self._stopEvent.set()
- self._contEvent.set()
+ with self._lock:
+ if self._running:
+ task_id = self._vm_id + '_' + self._task.name
+ self._threadpool.queueTask(task_id, self)
- def pause(self):
- """
- Pause the execution of the registered functions
- """
- self._log.debug("Pause statistics collection")
- self._contEvent.clear()
+ def _schedule_next_task(self):
+ delay, self._task = next(self._iterator)
+ self._call = self._scheduler.schedule(delay, self._dispatch)
- def cont(self):
- """
- Resume the execution of the registered functions
- """
- self._log.debug("Resume statistics collection")
- self._contEvent.set()
- def getLastSampleTime(self):
- return self._statsTime
+# Sentinel used to mark a task as invalid, allowing running sampling task
+# without holding a lock.
+def _INVALID():
+ pass
- def run(self):
- self._log.debug("Stats thread started")
- self._contEvent.set()
- while not self._stopEvent.isSet():
- try:
- self.collect()
- except:
- self._log.debug("Stats thread failed", exc_info=True)
+def delays(virtual_times):
+ """
+ Accept stream of tuples (virtual_time, task) and return stream of tuples
+ (delay, task).
+ """
+ last_virtual_time = 0
+ for virtual_time, task in virtual_times:
+ delay = virtual_time - last_virtual_time
+ last_virtual_time = virtual_time
+ yield delay, task
- self._log.debug("Stats thread finished")
- def handleStatsException(self, ex):
- """
- Handle the registered function exceptions and eventually stop the
- sampling if a fatal error occurred.
- """
- return False
+def cycle(samplings):
+ """
+ Returns endless stream of tuples (virtual_time, task)
+ """
+ samplings = group_by_interval(samplings)
+ virtual_time = 1
+ while True:
+ for interval, tasks in samplings:
+ if virtual_time % interval == 0:
+ for task in tasks:
+ yield virtual_time, task
+ virtual_time += 1
- def collect(self):
- # TODO: improve this with lcm
- _mInt = map(lambda x: x.interval, self._statsFunctions)
- maxInterval = reduce(lambda x, y: x * y, set(_mInt), 1)
- intervalAccum = 0
- while not self._stopEvent.isSet():
- self._contEvent.wait()
-
- self._statsTime = time.time()
- waitInterval = maxInterval
-
- for statsFunction in self._statsFunctions:
- thisInterval = statsFunction.interval - (
- intervalAccum % statsFunction.interval)
- waitInterval = min(waitInterval, thisInterval)
-
- if intervalAccum % statsFunction.interval == 0:
- try:
- statsFunction()
- except Exception as e:
- if not self.handleStatsException(e):
- self._log.error("Stats function failed: %s",
- statsFunction, exc_info=True)
-
- self._stopEvent.wait(waitInterval)
- intervalAccum = (intervalAccum + waitInterval) % maxInterval
+def group_by_interval(samplings):
+ """
+ Groups samplings by interval.
+ """
+ keyfn = operator.attrgetter('interval')
+ samplings = sorted(samplings, key=keyfn)
+ return [(interval, tuple(tasks))
+ for interval, tasks in itertools.groupby(samplings, keyfn)]
class HostStatsThread(threading.Thread):
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 7835e99..f1d8983 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -65,7 +65,7 @@
from . import vmstatus
from .vmtune import updateIoTuneDom
-from .sampling import AdvancedStatsFunction, AdvancedStatsThread
+from .sampling import AdvancedStatsFunction, Sampler
from .utils import isVdsmImage, XMLElement
from vmpowerdown import VmShutdown, VmReboot
@@ -167,7 +167,7 @@
pass
-class VmStatsThread(AdvancedStatsThread):
+class VmStats(object):
MBPS_TO_BPS = 10 ** 6 / 8
# CPU tune sampling window
@@ -184,7 +184,6 @@
_libvirt_metadata_supported = True
def __init__(self, vm):
- AdvancedStatsThread.__init__(self, log=vm.log, daemon=True)
self._vm = vm
self.highWrite = (
@@ -237,11 +236,18 @@
config.getint('vars', 'vm_sample_cpu_tune_interval'),
self.CPU_TUNE_SAMPLING_WINDOW))
- self.addStatsFunction(
- self.highWrite, self.updateVolumes, self.sampleCpu,
- self.sampleDisk, self.sampleDiskLatency, self.sampleNet,
- self.sampleBalloon, self.sampleVmJobs, self.sampleVcpuPinning,
- self.sampleCpuTune)
+ samplings = (self.highWrite, self.updateVolumes, self.sampleCpu,
+ self.sampleDisk, self.sampleDiskLatency, self.sampleNet,
+ self.sampleBalloon, self.sampleVmJobs,
+ self.sampleVcpuPinning, self.sampleCpuTune)
+
+ self._sampler = Sampler(self._vm.id, samplings, self)
+
+ def start(self):
+ self._sampler.start()
+
+ def stop(self):
+ self._sampler.stop()
def _highWrite(self):
if not self._vm.isDisksStatsCollectionEnabled():
@@ -311,13 +317,13 @@
metadataCpuLimit = None
try:
- if VmStatsThread._libvirt_metadata_supported:
+ if VmStats._libvirt_metadata_supported:
metadataCpuLimit = self._vm._dom.metadata(
libvirt.VIR_DOMAIN_METADATA_ELEMENT,
METADATA_VM_TUNE_URI, 0)
except libvirt.libvirtError as e:
if e.get_error_code() == libvirt.VIR_ERR_ARGUMENT_UNSUPPORTED:
- VmStatsThread._libvirt_metadata_supported = False
+ VmStats._libvirt_metadata_supported = False
self._log.error("libvirt does not support metadata")
elif (e.get_error_code()
@@ -581,7 +587,7 @@
stats = {}
try:
- stats['statsAge'] = time.time() - self.getLastSampleTime()
+ stats['statsAge'] = time.time() - self._sampler.last_sample_time
except TypeError:
self._log.debug("Stats age not available")
stats['statsAge'] = -1.0
@@ -2778,7 +2784,7 @@
WARNING: this method should only gather statistics by copying data.
Especially avoid costly and dangerous ditrect calls to the _dom
- attribute. Use the VmStatsThread instead!
+ attribute. Use the VmStats instead!
"""
if self.lastStatus == vmstatus.DOWN:
@@ -3059,7 +3065,7 @@
return domxml.toxml()
def _initVmStats(self):
- self._vmStats = VmStatsThread(self)
+ self._vmStats = VmStats(self)
self._vmStats.start()
self._guestEventTime = self._startTime
@@ -3174,7 +3180,7 @@
if drive['device'] == 'disk' and isVdsmImage(drive):
self._syncVolumeChain(drive)
- # VmStatsThread may use block devices info from libvirt.
+ # VmStats may use block devices info from libvirt.
# So, run it after you have this info
self._initVmStats()
self.guestAgent = guestagent.GuestAgent(
--
To view, visit http://gerrit.ovirt.org/29980
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I5539a728f3a1b3075712331440eff1749da6c530
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 1 month