Change in vdsm[master]: mailbox: Log traceback after fatal failures in mailbox monitors
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: mailbox: Log traceback after fatal failures in mailbox monitors
......................................................................
mailbox: Log traceback after fatal failures in mailbox monitors
Mailbox monitor threads were not logging fatal errors properly. Now these
threads use our standard @traceback decorator to ensure that failures
are logged properly.
The message when stopping the spm mailbox thread normally was moved out
of the try finally block, to ensure that it shows only for normal
shutdown, and not for unexpected death of the thread, where we will have
a clear traceback.
Change-Id: I68957ca745018349cae488acfe252e902c2af3ae
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/storage/storage_mailbox.py
1 file changed, 7 insertions(+), 2 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/53/29853/1
diff --git a/vdsm/storage/storage_mailbox.py b/vdsm/storage/storage_mailbox.py
index c31018a..59d0060 100644
--- a/vdsm/storage/storage_mailbox.py
+++ b/vdsm/storage/storage_mailbox.py
@@ -36,6 +36,7 @@
from threadPool import ThreadPool
from storage_exception import InvalidParameterException
from vdsm import constants
+from vdsm import utils
__author__ = "ayalb"
__date__ = "$Mar 9, 2009 5:25:07 PM$"
@@ -425,6 +426,8 @@
MESSAGES_PER_MAILBOX,
repr(self._outgoingMail[start:end])))
+ @utils.traceback(on=log.name,
+ msg="Unhandled exception in HSM_MailMonitor thread")
def run(self):
try:
failures = 0
@@ -778,6 +781,8 @@
finally:
self._outLock.release()
+ @utils.traceback(on=log.name,
+ msg="Unhandled exception in SPM_MailMonitor thread")
def run(self, *args):
try:
while not self._stop:
@@ -788,8 +793,8 @@
self._inLock.release()
self.log.error("Error checking for mail", exc_info=True)
time.sleep(self._monitorInterval)
+ self.log.info("SPM_MailMonitor - Incoming mail monitoring thread "
+ "stopped")
finally:
self._stopped = True
self.tp.joinAll(waitForTasks=False)
- self.log.info("SPM_MailMonitor - Incoming mail monitoring thread "
- "stopped")
--
To view, visit http://gerrit.ovirt.org/29853
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I68957ca745018349cae488acfe252e902c2af3ae
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
9 years, 4 months
Change in vdsm[master]: gluster: geo-replication pause and resume session
by dnarayan@redhat.com
Darshan N has uploaded a new change for review.
Change subject: gluster: geo-replication pause and resume session
......................................................................
gluster: geo-replication pause and resume session
This patch adds two new verbs to pause and resume
geo-replication session between master and slave
volume. It returns a boolean output.
Change-Id: I024bcee148bab1e713e1bc5c73d288613c466656
Signed-off-by: ndarshan <dnarayan(a)redhat.com>
---
M client/vdsClientGluster.py
M vdsm/gluster/api.py
M vdsm/gluster/cli.py
M vdsm/gluster/exception.py
M vdsm/gluster/vdsmapi-gluster-schema.json
5 files changed, 157 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/69/31069/1
diff --git a/client/vdsClientGluster.py b/client/vdsClientGluster.py
index 421c853..bd7e107 100644
--- a/client/vdsClientGluster.py
+++ b/client/vdsClientGluster.py
@@ -430,6 +430,34 @@
pp.pprint(status)
return status['status']['code'], status['status']['message']
+ def do_glusterVolumeGeoRepPause(self, args):
+ params = self._eqSplit(args)
+ volumeName = params.get('volumeName', '')
+ remoteHost = params.get('remoteHost', '')
+ remoteVolumeName = params.get('remoteVolumeName', '')
+ force = (params.get('force', 'no').upper() == 'YES')
+
+ status = self.s.glusterVolumeGeoRepPause(volumeName,
+ remoteHost,
+ remoteVolumeName,
+ force)
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterVolumeGeoRepResume(self, args):
+ params = self._eqSplit(args)
+ volumeName = params.get('volumeName', '')
+ remoteHost = params.get('remoteHost', '')
+ remoteVolumeName = params.get('remoteVolumeName', '')
+ force = (params.get('force', 'no').upper() == 'YES')
+
+ status = self.s.glusterVolumeGeoRepResume(volumeName,
+ remoteHost,
+ remoteVolumeName,
+ force)
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
def getGlusterCmdDict(serv):
return \
@@ -731,4 +759,26 @@
('volumeName=<volume name>',
'Returns total, free and used space(bytes) of gluster volume'
)),
+ 'glusterVolumeGeoRepPause': (
+ serv.do_glusterVolumeGeoRepPause,
+ ('volumeName=<master_volume_name> '
+ 'remoteHost=<slave_host_name> '
+ 'remoteVolumeName=<slave_volume_name> '
+ '[force={yes|no}]\n\t'
+ '<master_volume_name>existing volume name in the master node\n\t'
+ '<slave_host_name>is remote slave host name or ip\n\t'
+ '<slave_volume_name>existing volume name in the slave node',
+ 'Pause the geo-replication session'
+ )),
+ 'glusterVolumeGeoRepResume': (
+ serv.do_glusterVolumeGeoRepResume,
+ ('volumeName=<master_volume_name> '
+ 'remoteHost=<slave_host_name> '
+ 'remoteVolumeName=<slave_volume_name> '
+ '[force={yes|no}]\n\t'
+ '<master_volume_name>existing volume name in the master node\n\t'
+ '<slave_host_name>is remote slave host name or ip\n\t'
+ '<slave_volume_name>existing volume name in the slave node',
+ 'Resume the geo-replication session'
+ )),
}
diff --git a/vdsm/gluster/api.py b/vdsm/gluster/api.py
index 0e6c850..abe8ba0 100644
--- a/vdsm/gluster/api.py
+++ b/vdsm/gluster/api.py
@@ -322,6 +322,22 @@
data = self.svdsmProxy.glusterVolumeStatvfs(volumeName)
return self._computeVolumeStats(data)
+ @exportAsVerb
+ def volumeGeoRepPause(self, volumeName, remoteHost, remoteVolumeName,
+ force=False, options=None):
+ self.svdsmProxy.glusterVolumeGeoRepPause(volumeName,
+ remoteHost,
+ remoteVolumeName,
+ force)
+
+ @exportAsVerb
+ def volumeGeoRepResume(self, volumeName, remoteHost, remoteVolumeName,
+ force=False, options=None):
+ self.svdsmProxy.glusterVolumeGeoRepResume(volumeName,
+ remoteHost,
+ remoteVolumeName,
+ force)
+
def getGlusterMethods(gluster):
l = []
diff --git a/vdsm/gluster/cli.py b/vdsm/gluster/cli.py
index 2e1c9a9..7993b18 100644
--- a/vdsm/gluster/cli.py
+++ b/vdsm/gluster/cli.py
@@ -1054,3 +1054,33 @@
return _parseVolumeTasks(xmltree)
except _etreeExceptions:
raise ge.GlusterXmlErrorException(err=[etree.tostring(xmltree)])
+
+
+@makePublic
+def volumeGeoRepPause(volumeName, remoteHost, remoteVolumeName, force=False):
+ command = _getGlusterVolCmd() + ["geo-replication", volumeName,
+ "%s::%s" % (remoteHost, remoteVolumeName),
+ "pause"]
+ if force:
+ command.append('force')
+ try:
+ _execGlusterXml(command)
+ return True
+ except ge.GlusterCmdFailedException as e:
+ raise ge.GlusterVolumeGeoRepPauseFailedException(rc=e.rc,
+ err=e.err)
+
+
+@makePublic
+def volumeGeoRepResume(volumeName, remoteHost, remoteVolumeName, force=False):
+ command = _getGlusterVolCmd() + ["geo-replication", volumeName,
+ "%s::%s" % (remoteHost, remoteVolumeName),
+ "resume"]
+ if force:
+ command.append('force')
+ try:
+ _execGlusterXml(command)
+ return True
+ except ge.GlusterCmdFailedException as e:
+ raise ge.GlusterVolumeGeoRepResumeFailedException(rc=e.rc,
+ err=e.err)
diff --git a/vdsm/gluster/exception.py b/vdsm/gluster/exception.py
index 0205cb1..b7914c0 100644
--- a/vdsm/gluster/exception.py
+++ b/vdsm/gluster/exception.py
@@ -506,3 +506,19 @@
class GlfsFiniException(GlusterLibgfapiException):
code = 4573
message = "glfs fini failed"
+
+
+#geo-replication
+class GlusterGeoRepException(GlusterException):
+ code = 4560
+ message = "Gluster Geo-Replication Exception"
+
+
+class GlusterVolumeGeoRepPauseFailedException(GlusterVolumeException):
+ code = 4575
+ message = "Volume geo-replication pause failed"
+
+
+class GlusterVolumeGeoRepResumeFailedException(GlusterVolumeException):
+ code = 4576
+ message = "Volume geo-replication resume failed"
diff --git a/vdsm/gluster/vdsmapi-gluster-schema.json b/vdsm/gluster/vdsmapi-gluster-schema.json
index 4ddd182..47003e5 100644
--- a/vdsm/gluster/vdsmapi-gluster-schema.json
+++ b/vdsm/gluster/vdsmapi-gluster-schema.json
@@ -1238,3 +1238,48 @@
{'command': {'class': 'GlusterVolume', 'name': 'statsInfoGet'},
'data': {'volumeName': 'str'},
'returns': 'GlusterVolumeStatsInfo'}
+
+
+##
+# @GlusterVolume.geoRepSessionPause:
+#
+# Pauses the Geo Replication session
+#
+# @volName: Is an existing volume name in the master node
+#
+# @remoteHost: Is remote slave host name or ip
+#
+# @remoteVolumeName: Is an available existing volume name in the slave node
+#
+# @force: For pausing a geo-replication session forcefully
+#
+# Returns:
+# True if session is successfully Paused
+#
+# Since: 4.16.0
+##
+{'command': {'class': 'GlusterVolume', 'name': 'geoRepSessionStart'},
+ 'data': {'volName': 'str', 'remoteHost': 'str', 'remoteVolumeName': 'str', 'force': 'bool'},
+ 'returns': 'bool'}
+
+##
+# @GlusterVolume.geoRepSessionResume:
+#
+# Resumes the Geo Replication session
+#
+# @volName: Is an existing volume name in the master node
+#
+# @remoteHost: Is remote slave host name or ip
+#
+# @remoteVolumeName: Is an available existing volume name in the slave node
+#
+# @force: For resuming a georeplication session forcefully
+#
+# Returns:
+# True if session is successfully resumed
+#
+# Since: 4.16.0
+##
+{'command': {'class': 'GlusterVolume', 'name': 'geoRepSessionStop'},
+ 'data': {'volName': 'str', 'remoteHost': 'str', 'remoteVolumeName': 'str', 'force': 'bool'},
+ 'returns': 'bool'}
--
To view, visit http://gerrit.ovirt.org/31069
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I024bcee148bab1e713e1bc5c73d288613c466656
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Darshan N <dnarayan(a)redhat.com>
9 years, 4 months
Change in vdsm[master]: vdsm-gluster: Added gluster volume geo-replication start verb
by tjeyasin@redhat.com
Hello Ayal Baron, Bala.FA, Saggi Mizrahi, Dan Kenigsberg,
I'd like you to do a code review. Please visit
http://gerrit.ovirt.org/17766
to review the following change.
Change subject: vdsm-gluster: Added gluster volume geo-replication start verb
......................................................................
vdsm-gluster: Added gluster volume geo-replication start verb
Start the geo-replication session between the hosts.
Start distributed geo-replication on all the nodes that are a part
of the master-volume. Even if any node that is a part of the
master-volume is down, the command will still be successful.
Change-Id: I3cf03c748cf9fe28efe7d407727cd52da20701c5
Signed-off-by: Timothy Asir <tjeyasin(a)redhat.com>
---
M client/vdsClientGluster.py
M vdsm/gluster/api.py
M vdsm/gluster/cli.py
M vdsm/gluster/exception.py
4 files changed, 100 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/66/17766/1
diff --git a/client/vdsClientGluster.py b/client/vdsClientGluster.py
index 90af83e..feb6387 100644
--- a/client/vdsClientGluster.py
+++ b/client/vdsClientGluster.py
@@ -424,6 +424,34 @@
pp.pprint(status)
return status['status']['code'], status['status']['message']
+ def do_glusterVolumeGeoRepStart(self, args):
+ params = self._eqSplit(args)
+ masterVolName = params.get('masterVolName', '')
+ slaveHost = params.get('slaveHost', '')
+ slaveVolName = params.get('slaveVolName', '')
+ if not(masterVolName and slaveHost and slaveVolName):
+ raise ValueError
+
+ status = self.s.glusterVolumeGeoRepStart(masterVolName,
+ slaveHost,
+ slaveVolName)
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterVolumeGeoRepStop(self, args):
+ params = self._eqSplit(args)
+ masterVolName = params.get('masterVolName', '')
+ slaveHost = params.get('slaveHost', '')
+ slaveVolName = params.get('slaveVolName', '')
+ if not(masterVolName and slaveHost and slaveVolName):
+ raise ValueError
+
+ status = self.s.glusterVolumeGeoRepStop(masterVolName,
+ slaveHost,
+ slaveVolName)
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
def getGlusterCmdDict(serv):
return \
@@ -705,4 +733,26 @@
'not set'
'(swift, glusterd, smb, memcached)'
)),
+ 'glusterVolumeGeoRepStart': (
+ serv.do_glusterVolumeGeoRepStart,
+ ('masterVolName=<master_volume_name> slaveHost=<slave_host> '
+ 'slaveVolName=<slave_volume_name>\n\t'
+ '<master_volume_name> is an existing volume name in the '
+ 'master node\n\t'
+ '<slave_host> is slave host name\n\t'
+ '<slave_volume_name> is an existing volume name in the '
+ 'slave node',
+ 'start volume geo-replication'
+ )),
+ 'glusterVolumeGeoRepStop': (
+ serv.do_glusterVolumeGeoRepStop,
+ ('masterVolName=<master_volume_name> slaveHost=<slave_host> '
+ 'slaveVolName=<slave_volume_name>\n\t'
+ '<master_volume_name> is an existing volume name in the '
+ 'master node\n\t'
+ '<slave_host> is slave host name\n\t'
+ '<slave_volume_name> is an existing volume name in the '
+ 'slave node',
+ 'stop volume geo-replication'
+ )),
}
diff --git a/vdsm/gluster/api.py b/vdsm/gluster/api.py
index 4bd8308..ed9f5ae 100644
--- a/vdsm/gluster/api.py
+++ b/vdsm/gluster/api.py
@@ -287,6 +287,20 @@
status = self.svdsmProxy.glusterServicesGet(serviceNames)
return {'services': status}
+ @exportAsVerb
+ def volumeGeoRepStart(self, masterVolName, slaveHost, slaveVolName,
+ options=None):
+ self.svdsmProxy.glusterVolumeGeoRepStart(masterVolName,
+ slaveHost,
+ slaveVolName)
+
+ @exportAsVerb
+ def volumeGeoRepStop(self, masterVolName, slaveHost, slaveVolName,
+ options=None):
+ self.svdsmProxy.glusterVolumeGeoRepStop(masterVolName,
+ slaveHost,
+ slaveVolName)
+
def getGlusterMethods(gluster):
l = []
diff --git a/vdsm/gluster/cli.py b/vdsm/gluster/cli.py
index bac6d1c..e4d6615 100644
--- a/vdsm/gluster/cli.py
+++ b/vdsm/gluster/cli.py
@@ -897,3 +897,29 @@
return _parseVolumeProfileInfo(xmltree, nfs)
except _etreeExceptions:
raise ge.GlusterXmlErrorException(err=[etree.tostring(xmltree)])
+
+
+@makePublic
+def volumeGeoRepStart(masterVolName, slaveHost, slaveVolName):
+ command = _getGlusterVolCmd() + ["geo-replication", masterVolName,
+ "%s::%s" % (slaveHost, slaveVolName),
+ "start"]
+ try:
+ _execGlusterXml(command)
+ return True
+ except ge.GlusterCmdFailedException as e:
+ raise ge.GlusterVolumeGeoRepStartFailedException(rc=e.rc,
+ err=e.err)
+
+
+@makePublic
+def volumeGeoRepStop(masterVolName, slaveHost, slaveVolName):
+ command = _getGlusterVolCmd() + ["geo-replication", masterVolName,
+ "%s::%s" % (slaveHost, slaveVolName),
+ "stop"]
+ try:
+ _execGlusterXml(command)
+ return True
+ except ge.GlusterCmdFailedException as e:
+ raise ge.GlusterVolumeGeoRepStopFailedException(rc=e.rc,
+ err=e.err)
diff --git a/vdsm/gluster/exception.py b/vdsm/gluster/exception.py
index c569a9e..259df32 100644
--- a/vdsm/gluster/exception.py
+++ b/vdsm/gluster/exception.py
@@ -484,3 +484,13 @@
prefix = "%s: " % (action)
self.message = prefix + "Service action is not supported"
self.err = [self.message]
+
+
+class GlusterVolumeGeoRepStartFailedException(GlusterVolumeException):
+ code = 4164
+ message = "Volume geo-replication start failed"
+
+
+class GlusterVolumeGeoRepStopFailedException(GlusterVolumeException):
+ code = 4165
+ message = "Volume geo-replication stop failed"
--
To view, visit http://gerrit.ovirt.org/17766
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I3cf03c748cf9fe28efe7d407727cd52da20701c5
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Timothy Asir <tjeyasin(a)redhat.com>
Gerrit-Reviewer: Ayal Baron <abaron(a)redhat.com>
Gerrit-Reviewer: Bala.FA <barumuga(a)redhat.com>
Gerrit-Reviewer: Dan Kenigsberg <danken(a)redhat.com>
Gerrit-Reviewer: Saggi Mizrahi <smizrahi(a)redhat.com>
9 years, 4 months
Change in vdsm[master]: networkTests: Extend a test to also consider DHCPv6
by osvoboda@redhat.com
Ondřej Svoboda has uploaded a new change for review.
Change subject: networkTests: Extend a test to also consider DHCPv6
......................................................................
networkTests: Extend a test to also consider DHCPv6
Change-Id: Ic5d821edd54681a7a8c1013a90af61ae835baa39
Signed-off-by: Ondřej Svoboda <osvoboda(a)redhat.com>
---
M tests/functional/dummy.py
M tests/functional/firewall.py
M tests/functional/networkTests.py
3 files changed, 32 insertions(+), 16 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/32/30532/1
diff --git a/tests/functional/dummy.py b/tests/functional/dummy.py
index aa56c71..b94c77d 100644
--- a/tests/functional/dummy.py
+++ b/tests/functional/dummy.py
@@ -54,9 +54,9 @@
(dummyName, e))
-def setIP(dummyName, ipaddr, netmask):
+def setIP(dummyName, ipaddr, netmask, family=4):
try:
- addrAdd(dummyName, ipaddr, netmask)
+ addrAdd(dummyName, ipaddr, netmask, family)
except IPRoute2Error:
raise SkipTest('Failed to set device ip')
diff --git a/tests/functional/firewall.py b/tests/functional/firewall.py
index ee7bb9f..9940a75 100644
--- a/tests/functional/firewall.py
+++ b/tests/functional/firewall.py
@@ -43,7 +43,10 @@
if _serviceRunning('iptables'):
_execCmdChecker([_IPTABLES_BINARY.cmd, '-I', 'INPUT', '-i',
veth, '-p', 'udp', '--sport', '68', '--dport',
- '67', '-j', 'ACCEPT'])
+ '67', '-j', 'ACCEPT']) # DHCPv4
+ _execCmdChecker([_IPTABLES_BINARY.cmd, '-I', 'INPUT', '-i',
+ veth, '-p', 'udp', '--sport', '546', '--dport',
+ '547', '-j', 'ACCEPT']) # DHCPv6
elif _serviceRunning('firewalld'):
_execCmdChecker([_FIREWALLD_BINARY.cmd, '--zone=trusted',
'--change-interface=' + veth])
@@ -68,7 +71,10 @@
if _serviceRunning('iptables'):
_execCmdChecker([_IPTABLES_BINARY.cmd, '-D', 'INPUT', '-i',
veth, '-p', 'udp', '--sport', '68', '--dport',
- '67', '-j', 'ACCEPT'])
+ '67', '-j', 'ACCEPT']) # DHCPv4
+ _execCmdChecker([_IPTABLES_BINARY.cmd, '-D', 'INPUT', '-i',
+ veth, '-p', 'udp', '--sport', '546', '--dport',
+ '547', '-j', 'ACCEPT']) # DHCPv6
elif _serviceRunning('firewalld'):
_execCmdChecker([_FIREWALLD_BINARY.cmd, '--zone=trusted',
'--remove-interface=' + veth])
diff --git a/tests/functional/networkTests.py b/tests/functional/networkTests.py
index f87f49b..0033c19 100644
--- a/tests/functional/networkTests.py
+++ b/tests/functional/networkTests.py
@@ -64,8 +64,11 @@
DHCP_RANGE_TO = '240.0.0.100'
CUSTOM_PROPS = {'linux': 'rules', 'vdsm': 'as well'}
-IPv6_ADDRESS = 'fdb3:84e5:4ff4:55e3::1/64'
+IPv6_ADDRESS = 'fdb3:84e5:4ff4:55e3::1'
+IPv6_CIDR = '64'
+IPv6_ADDRESS_AND_CIDR = IPv6_ADDRESS + '/' + IPv6_CIDR
IPv6_GATEWAY = 'fdb3:84e5:4ff4:55e3::ff'
+DHCPv6_ADDRESS = 'fdb3:84e5:4ff4:55e3::2'
dummyPool = set()
DUMMY_POOL_SIZE = 5
@@ -90,11 +93,16 @@
@contextmanager
-def dnsmasqDhcp(interface):
+def dnsmasqDhcp(interface, family=4):
"""Manages the life cycle of dnsmasq as a DHCP server."""
dhcpServer = dhcp.Dnsmasq()
+ if family == 4:
+ rangeFrom, rangeTo = DHCP_RANGE_FROM, DHCP_RANGE_TO
+ else:
+ rangeFrom, rangeTo = DHCPv6_ADDRESS, DHCPv6_ADDRESS
+
try:
- dhcpServer.start(interface, DHCP_RANGE_FROM, DHCP_RANGE_TO)
+ dhcpServer.start(interface, rangeFrom, rangeTo)
except dhcp.DhcpError as e:
raise SkipTest(e)
@@ -1769,11 +1777,11 @@
nic, = nics
networks = {
NETWORK_NAME + '1':
- {'nic': nic, 'bootproto': 'none', 'ipv6addr': IPv6_ADDRESS,
- 'ipv6gateway': IPv6_GATEWAY},
+ {'nic': nic, 'bootproto': 'none', 'ipv6gateway': IPv6_GATEWAY,
+ 'ipv6addr': IPv6_ADDRESS_AND_CIDR},
NETWORK_NAME + '2':
- {'nic': nic, 'bootproto': 'none', 'ipv6addr': IPv6_ADDRESS,
- 'ipv6gateway': IPv6_GATEWAY, 'ipaddr': IP_ADDRESS,
+ {'nic': nic, 'bootproto': 'none', 'ipv6gateway': IPv6_GATEWAY,
+ 'ipv6addr': IPv6_ADDRESS_AND_CIDR, 'ipaddr': IP_ADDRESS,
'gateway': IP_GATEWAY,
'netmask': prefix2netmask(int(IP_CIDR))}}
for network, netdict in networks.iteritems():
@@ -1783,7 +1791,7 @@
self.assertEqual(status, SUCCESS, msg)
self.assertNetworkExists(network)
self.assertIn(
- IPv6_ADDRESS,
+ IPv6_ADDRESS_AND_CIDR,
self.vdsm_net.netinfo.networks[network]['ipv6addrs'])
self.assertEqual(
IPv6_GATEWAY,
@@ -1836,14 +1844,15 @@
NOCHK)
@permutations([[True], [False]])
+ @permutations([[(4, IP_ADDRESS, IP_CIDR)], [(6, IPv6_ADDRESS, IPv6_CIDR)]])
@cleanupNet
@RequireVethMod
@ValidateRunningAsRoot
- def testSetupNetworksAddDelDhcp(self, bridged):
+ def testSetupNetworksAddDelDhcp(self, bridged, (family, addr, cidr)):
with vethIf() as (left, right):
- veth.setIP(left, IP_ADDRESS, IP_CIDR)
+ veth.setIP(left, addr, cidr)
veth.setLinkUp(left)
- with dnsmasqDhcp(left):
+ with dnsmasqDhcp(left, family):
network = {NETWORK_NAME: {'nic': right, 'bridged': bridged,
'bootproto': 'dhcp',
'blockingdhcp': True}}
@@ -1853,7 +1862,8 @@
self.assertNetworkExists(NETWORK_NAME)
net = self.vdsm_net.netinfo.networks[NETWORK_NAME]
- self.assertEqual(net['bootproto4'], 'dhcp')
+ if family == 4:
+ self.assertEqual(net['bootproto4'], 'dhcp')
if bridged:
self.assertEqual(net['cfg']['BOOTPROTO'], 'dhcp')
--
To view, visit http://gerrit.ovirt.org/30532
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic5d821edd54681a7a8c1013a90af61ae835baa39
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Ondřej Svoboda <osvoboda(a)redhat.com>
9 years, 5 months
Change in vdsm[master]: Added support for reporting the timezone and os info
by Vinzenz Feenstra
Vinzenz Feenstra has uploaded a new change for review.
Change subject: Added support for reporting the timezone and os info
......................................................................
Added support for reporting the timezone and os info
The guest agent now reports the timezone and more detailed OS information.
This patch introduces the support for those new messages.
Change-Id: Iadb85a5003fcb4693b5a1174655acf59193340b5
Guest-Agent-API-Version: 2
Bug-Url: https://bugzilla.redhat.com/869296
Signed-off-by: Vinzenz Feenstra <vfeenstr(a)redhat.com>
---
M vdsm/rpc/vdsmapi-schema.json
M vdsm/virt/guestagent.py
2 files changed, 50 insertions(+), 2 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/40/28940/1
diff --git a/vdsm/rpc/vdsmapi-schema.json b/vdsm/rpc/vdsmapi-schema.json
index 2ccf963..869b78c 100644
--- a/vdsm/rpc/vdsmapi-schema.json
+++ b/vdsm/rpc/vdsmapi-schema.json
@@ -3456,6 +3456,12 @@
#
# @clientIp: The IP address of the client connected to the display
#
+# @guestOsInfo: #optional Guest OS information like architecture,
+# type, and version (Since 4.15.0)
+#
+# @guestTimezone: #optional Timezone and time offset of the guest OS
+# (Since 4.15.0)
+#
# Since: 4.14.1
##
{'type': 'VMFullInfo',
@@ -3470,7 +3476,45 @@
'guestFQDN': 'str', 'displayIp': 'str', 'keyboardLayout': 'str',
'displayPort': 'uint', 'guestIPs': 'str', 'smartcardEnable': 'bool',
'nicModel': 'VmInterfaceDeviceModel', 'pitReinjection': 'bool',
- 'status': 'str', 'clientIp': 'str'}}
+ 'status': 'str', 'clientIp': 'str', '*guestOsInfo': 'VMGuestOsInfo',
+ '*guestTimezone': 'VMGuestTimezone'}}
+
+##
+# @VMGuestTimezone:
+#
+# Timezone configuration of the guest operating system
+#
+# @zone: Name of the timezone native to the Guest OS
+#
+# @offset: Time offset of the configured timezone in minutes
+#
+# Since: 4.15.0
+##
+{'type': 'VMGuestTimezone',
+ 'data': {'zone': 'string', 'offset': 'uint'}}
+
+##
+# @VMGuestOsInfo:
+#
+# Information about the guest operating system on a VM
+#
+# @version: Operating system version
+#
+# @distribution: Name of the linux distribution
+#
+# @codename: Codename of the OS
+#
+# @arch: Architecture of the OS (x86, x86_64, ppc64, ...)
+#
+# @type: Type of the OS (linux, windows, ...)
+#
+# @kernel: Kernel version reported by a linux guest
+#
+# Since: 4.15.0
+##
+{'type': 'VMGuestOsInfo',
+ 'data': {'version': 'string', 'distribution': 'string', 'codename': 'string',
+ 'arch': 'string', 'type': 'string', 'kernel': 'string'}}
##
# @Host.getVMFullList:
diff --git a/vdsm/virt/guestagent.py b/vdsm/virt/guestagent.py
index 31c0a26..8ff262f 100644
--- a/vdsm/virt/guestagent.py
+++ b/vdsm/virt/guestagent.py
@@ -30,7 +30,7 @@
from . import vmstatus
-_MAX_SUPPORTED_API_VERSION = 1
+_MAX_SUPPORTED_API_VERSION = 2
_IMPLICIT_API_VERSION_ZERO = 0
_MESSAGE_API_VERSION_LOOKUP = {
@@ -255,6 +255,10 @@
self.guestInfo['guestName'] = args['name']
elif message == 'os-version':
self.guestInfo['guestOs'] = args['version']
+ elif message == 'os-info':
+ self.guestInfo['guestOsInfo'] = args
+ elif message == 'timezone':
+ self.guestInfo['guestTimezone'] = args
elif message == 'network-interfaces':
interfaces = []
old_ips = ''
--
To view, visit http://gerrit.ovirt.org/28940
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iadb85a5003fcb4693b5a1174655acf59193340b5
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Vinzenz Feenstra <vfeenstr(a)redhat.com>
9 years, 5 months
Change in vdsm[master]: vm: Automatically add a serial port for a console device
by Vinzenz Feenstra
Vinzenz Feenstra has uploaded a new change for review.
Change subject: vm: Automatically add a serial port for a console device
......................................................................
vm: Automatically add a serial port for a console device
Without a serial device the console support does not work for
linux systems. We switch from virtio to serial and append a serial
port if a console device was defined.
Change-Id: Ifa7b02a7bcaad63017c35c811a194fa42e2b694f
Signed-off-by: Vinzenz Feenstra <vfeenstr(a)redhat.com>
---
M vdsm/vm.py
1 file changed, 14 insertions(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/79/25979/1
diff --git a/vdsm/vm.py b/vdsm/vm.py
index 9cb0e82..7ce7c59 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -1171,6 +1171,18 @@
commandLine.appendChildWithArgs('qemu:arg', value='keyboard')
self.dom.appendChild(commandLine)
+ def appendSerial(self):
+ """
+ Add a serial port for the console device if it exists
+ <serial type='pty'>
+ <target port='0'>
+ </serial>
+ """
+ if len(self._devices.getElementsByTagName('console')) == 1:
+ s = XMLElement('serial', type='pty')
+ s.appendChildWithArgs('target', port='0')
+ self._devices.appendChild(s)
+
def appendGraphics(self):
"""
Add graphics section to domain xml.
@@ -1888,7 +1900,7 @@
</console>
"""
m = self.createXmlElem('console', 'pty')
- m.appendChildWithArgs('target', type='virtio', port='0')
+ m.appendChildWithArgs('target', type='serial', port='0')
return m
@@ -3025,6 +3037,7 @@
domxml.appendEmulator()
self._appendDevices(domxml)
+ domxml.appendSerial()
for drive in self._devices[DISK_DEVICES][:]:
for leaseElement in drive.getLeasesXML():
--
To view, visit http://gerrit.ovirt.org/25979
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifa7b02a7bcaad63017c35c811a194fa42e2b694f
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Vinzenz Feenstra <vfeenstr(a)redhat.com>
9 years, 5 months
Change in vdsm[master]: net_tc: add support for mirroring all (not just IPv4) traffic
by asegurap@redhat.com
Antoni Segura Puimedon has uploaded a new change for review.
Change subject: net_tc: add support for mirroring all (not just IPv4) traffic
......................................................................
net_tc: add support for mirroring all (not just IPv4) traffic
Up until now the filter was only acting on packets with IPv4 headers
because of the 'protocol ip' part of the tc filter. This makes the
mirroring protocol agnostic.
Change-Id: I9de381e67fae4cb79fe41dbd9d1b60b72eb84de4
Signed-off-by: Antoni S. Puimedon <asegurap(a)redhat.com>
---
M vdsm/network/tc.py
1 file changed, 2 insertions(+), 2 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/88/27388/1
diff --git a/vdsm/network/tc.py b/vdsm/network/tc.py
index 35c4a92..de1837a 100644
--- a/vdsm/network/tc.py
+++ b/vdsm/network/tc.py
@@ -127,7 +127,7 @@
if filt.prio:
command.extend(['prio', filt.prio])
command.extend(['handle', filt.handle])
- command.extend(['protocol', 'ip', 'u32', 'match', 'u8', '0', '0'])
+ command.extend(['protocol', 'all', 'u32', 'match', 'u8', '0', '0'])
for a in filt.actions:
command.extend(['action', 'mirred', 'egress', 'mirror',
'dev', a.target])
@@ -203,7 +203,7 @@
out = _process_request([EXT_TC, 'filter', 'show', 'dev', dev,
'parent', parent])
- HEADER = 'filter protocol ip pref '
+ HEADER = 'filter protocol all pref '
prio = handle = None
actions = []
prevline = ' '
--
To view, visit http://gerrit.ovirt.org/27388
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I9de381e67fae4cb79fe41dbd9d1b60b72eb84de4
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Antoni Segura Puimedon <asegurap(a)redhat.com>
9 years, 5 months
Change in vdsm[master]: gluster: geo replication status and status detail
by dnarayan@redhat.com
Hello Bala.FA,
I'd like you to do a code review. Please visit
http://gerrit.ovirt.org/18414
to review the following change.
Change subject: gluster: geo replication status and status detail
......................................................................
gluster: geo replication status and status detail
This adds two verbs. status: provides the geo-replication status of all running
sessions, of all sessions associated with a particular source volume, or of the
session between a source and remote volume. status detail: provides the detailed
status of the geo-replication session between a source and remote volume.
Change-Id: I4f37f35a5480fbe049a67758e122d4a0c2eba513
Signed-off-by: ndarshan <dnarayan(a)redhat.com>
---
M client/vdsClientGluster.py
M vdsm/gluster/api.py
M vdsm/gluster/cli.py
M vdsm/gluster/exception.py
M vdsm/gluster/vdsmapi-gluster-schema.json
5 files changed, 223 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/14/18414/1
diff --git a/client/vdsClientGluster.py b/client/vdsClientGluster.py
index 90af83e..76a5ba8 100644
--- a/client/vdsClientGluster.py
+++ b/client/vdsClientGluster.py
@@ -424,6 +424,35 @@
pp.pprint(status)
return status['status']['code'], status['status']['message']
+ def do_glusterVolumeGeoRepStatus(self, args):
+ params = self._eqSplit(args)
+ try:
+ volName = params.get('volName', '')
+ remoteHost = params.get('remoteHost', '')
+ remoteVolName = params.get('remoteVolName', '')
+ except:
+ raise ValueError
+ status = self.s.glusterVolumeGeoRepStatus(volName, remoteHost,
+ remoteVolName)
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterVolumeGeoRepStatusDetail(self, args):
+ params = self._eqSplit(args)
+ try:
+ volName = params.get('volName', '')
+ remoteHost = params.get('remoteHost', '')
+ remoteVolName = params.get('remoteVolName', '')
+ except:
+ raise ValueError
+ if not (volName and remoteHost and remoteVolName):
+ raise ValueError
+ status = self.s.glusterVolumeGeoRepStatusDetail(volName,
+ remoteHost,
+ remoteVolName)
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
def getGlusterCmdDict(serv):
return \
@@ -705,4 +734,24 @@
'not set'
'(swift, glusterd, smb, memcached)'
)),
+ 'glusterVolumeGeoRepStatus': (
+ serv.do_glusterVolumeGeoRepStatus,
+ ('volName=<master_volume_name> '
+ 'remoteHost=<slave_host_name> '
+ 'remoteVolName=<slave_volume_name> '
+ '<master_volume_name>existing volume name in the master node\n\t'
+ '<slave_host_name>is remote slave host name or ip\n\t'
+ '<slave_volume_name>existing volume name in the slave node',
+ 'Returns ths status of geo-replication'
+ )),
+ 'glusterVolumeGeoRepStatusDetail': (
+ serv.do_glusterVolumeGeoRepStatusDetail,
+ ('volName=<master_volume_name> '
+ 'remoteHost=<slave_host_name> '
+ 'remoteVolName=<slave_volume_name> '
+ '<master_volume_name>existing volume name in the master node\n\t'
+ '<slave_host_name>is remote slave host name or ip\n\t'
+ '<slave_volume_name>existing volume name in the slave node',
+ 'Returns the Detailed status of geo-replication'
+ ))
}
diff --git a/vdsm/gluster/api.py b/vdsm/gluster/api.py
index 4bd8308..d24e700 100644
--- a/vdsm/gluster/api.py
+++ b/vdsm/gluster/api.py
@@ -287,6 +287,22 @@
status = self.svdsmProxy.glusterServicesGet(serviceNames)
return {'services': status}
+ @exportAsVerb
+ def volumeGeoRepStatus(self, volName=None, remoteHost=None,
+ remoteVolName=None, options=None):
+ status = self.svdsmProxy.glusterVolumeGeoRepStatus(volName,
+ remoteHost,
+ remoteVolName)
+ return {'geo-rep': status}
+
+ @exportAsVerb
+ def volumeGeoRepStatusDetail(self, volName, remoteHost,
+ remoteVolName, options=None):
+ status = self.svdsmProxy.glusterVolumeGeoRepStatusDetail(volName,
+ remoteHost,
+ remoteVolName)
+ return {'geo-rep': status}
+
def getGlusterMethods(gluster):
l = []
diff --git a/vdsm/gluster/cli.py b/vdsm/gluster/cli.py
index bac6d1c..1cf0e12 100644
--- a/vdsm/gluster/cli.py
+++ b/vdsm/gluster/cli.py
@@ -897,3 +897,59 @@
return _parseVolumeProfileInfo(xmltree, nfs)
except _etreeExceptions:
raise ge.GlusterXmlErrorException(err=[etree.tostring(xmltree)])
+
+
+def _parseGeoRepStatusDetail(tree):
+ status = {'node': tree.find('geoRep/node').text,
+ 'health': tree.find('geoRep/health').text,
+ 'uptime': tree.find('geoRep/uptime').text,
+ 'filesSyncd': tree.find('geoRep/filesSyncd').text,
+ 'filesPending': tree.find('geoRep/filesPending').text,
+ 'bytesPending': tree.find('geoRep/bytesPending').text,
+ 'deletesPending': tree.find('geoRep/deletesPending').text}
+ return status
+
+
+def _parseGeoRepStatus(tree):
+ pairs = []
+ for el in tree.findall('geoRep/pair'):
+ value = {}
+ value['node'] = el.find('node').text
+ value['master'] = el.find('master').text
+ value['slave'] = el.find('slave').text
+ value['health'] = el.find('health').text
+ value['uptime'] = el.find('uptime').text
+ pairs.append(value)
+ return pairs
+
+
+@makePublic
+def volumeGeoRepStatus(volName=None, remoteHost=None, remoteVolName=None,
+ ):
+ command = _getGlusterVolCmd() + ["geo-replication", volName,
+ "%s::%s" % (remoteHost, remoteVolName),
+ "status"]
+ try:
+ xmltree = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException as e:
+ raise ge.GlusterGeoRepStatusFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseGeoRepStatus(xmltree)
+ except _etreeExceptions:
+ raise ge.GlusterXmlErrorException(err=[etree.tostring(xmltree)])
+
+
+@makePublic
+def volumeGeoRepStatusDetail(volName, remoteHost, remoteVolName,
+ ):
+ command = _getGlusterVolCmd() + ["geo-replication", volName,
+ "%s::%s" % (remoteHost, remoteVolName),
+ "status", "detail"]
+ try:
+ xmltree = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException as e:
+ raise ge.GlusterGeoRepStatusDetailFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseGeoRepStatusDetail(xmltree)
+ except _etreeExceptions:
+ raise ge.GlusterXmlErrorException(err=[etree.tostring(xmltree)])
diff --git a/vdsm/gluster/exception.py b/vdsm/gluster/exception.py
index c569a9e..d95b168 100644
--- a/vdsm/gluster/exception.py
+++ b/vdsm/gluster/exception.py
@@ -484,3 +484,19 @@
prefix = "%s: " % (action)
self.message = prefix + "Service action is not supported"
self.err = [self.message]
+
+
+#geo-replication
+class GlusterGeoRepException(GlusterException):
+ code = 4560
+ message = "Gluster Geo-Replication Exception"
+
+
+class GlusterGeoRepStatusFailedException(GlusterGeoRepException):
+ code = 4565
+ message = "Geo Rep status failed"
+
+
+class GlusterGeoRepStatusDetailFailedException(GlusterGeoRepException):
+ code = 4566
+ message = "Geo Rep status detail failed"
diff --git a/vdsm/gluster/vdsmapi-gluster-schema.json b/vdsm/gluster/vdsmapi-gluster-schema.json
index 7a4c034..557c750 100644
--- a/vdsm/gluster/vdsmapi-gluster-schema.json
+++ b/vdsm/gluster/vdsmapi-gluster-schema.json
@@ -372,3 +372,89 @@
{'command': {'class': 'GlusterService', 'name': 'action'},
'data': {'serviceName': 'str', 'action': 'GlusterServiceAction'},
'returns': 'GlusterServicesStatusInfo'}
+
+##
+# @GlusterGeoRepStatus:
+#
+# Gluster geo-replication status information.
+#
+# @node: The node where geo-replication is started
+#
+# @master: The source for geo-replication
+#
+# @slave: The destination of geo-replication
+#
+# @health: The status of the geo-replication session
+#
+# @uptime: The time since the geo-replication started
+#
+# Since: 4.10.3
+##
+{'type': 'GlusterGeoRepStatus',
+ 'data': {'node': 'str', 'master': 'str', 'slave': 'str', 'health': 'str', 'uptime': 'int'}}
+
+
+##
+# @GlusterVolume.geoRepStatus:
+#
+# Gets the status of geo-Replication session
+#
+# @masterVolName: Is an existing volume name in the master node
+#
+# @slaveHost: Is remote slave host name or ip
+#
+# @slaveVolName: Is an available existing volume name in the slave node
+#
+# Returns:
+# status information for geo-replication
+#
+# Since: 4.10.3
+##
+{'command': {'class': 'GlusterVolume', 'name': 'geoRepStatus'},
+ 'data': {'masterVolName': 'str', 'slaveHost': 'str', 'slaveVolName': 'str'},
+ 'returns': 'GlusterGeoRepStatus'}
+
+##
+# @GlusterGeoRepStatusDetail:
+#
+# Gluster geo-replication detailed status information.
+#
+# @node: The node where geo-replication is started
+#
+# @health: The status of the geo-replication session
+#
+# @uptime: The time since the geo-replication started
+#
+# @filesSyncd: The number of files that are synced
+#
+# @filesPending: The number of files that are pending to be synced
+#
+# @bytesPending: The number of bytes that are pending to be synced
+#
+# @deletesPending: The number of deletes that are pending
+#
+# Since: 4.10.3
+##
+{'type': 'GlusterGeoRepStatusDetail',
+ 'data': {'node': 'str', 'health': 'str', 'uptime': 'int', 'filesSyncd': 'int', 'filesPending': 'int',
+ 'bytesPending': 'int','deletesPending': 'int'}}
+
+##
+# @GlusterVolume.geoRepStatusDetail:
+#
+# Gets the detailed status of geo-Replication session
+#
+# @masterVolName: Is an existing volume name in the master node
+#
+# @slaveHost: Is remote slave host name or ip
+#
+# @slaveVolName: Is an available existing volume name in the slave node
+#
+# Returns:
+# detailed status information of geo-replication session
+#
+# Since: 4.10.3
+##
+{'command': {'class': 'GlusterVolume', 'name': 'geoRepStatusDetail'},
+ 'data': {'masterVolName': 'str', 'slaveHost': 'str', 'slaveVolName': 'str'},
+ 'returns': 'GlusterGeoRepStatusDetail'}
--
To view, visit http://gerrit.ovirt.org/18414
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4f37f35a5480fbe049a67758e122d4a0c2eba513
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: ndarshan <dnarayan(a)redhat.com>
Gerrit-Reviewer: Bala.FA <barumuga(a)redhat.com>
9 years, 5 months
Change in vdsm[master]: schedule: Introduce scheduling library
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: schedule: Introduce scheduling library
......................................................................
schedule: Introduce scheduling library
This module provides a Scheduler class for scheduling the execution of
callables on a background thread.
This should be part of the new scalable VM sampling implementation, and
can also be used whenever you want to perform a short task on a
background thread without waiting for the completion of the task.
See the module docstring and tests for usage examples.
Change-Id: Ie3764806d93bd37c3b5924080eb5ae4d29e4f4e0
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M debian/vdsm-python.install
M lib/vdsm/Makefile.am
A lib/vdsm/schedule.py
M tests/Makefile.am
A tests/scheduleTests.py
M vdsm.spec.in
6 files changed, 346 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/07/29607/1
diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install
index 2d4bba6..1775241 100644
--- a/debian/vdsm-python.install
+++ b/debian/vdsm-python.install
@@ -16,6 +16,7 @@
./usr/lib/python2.7/dist-packages/vdsm/netlink/link.py
./usr/lib/python2.7/dist-packages/vdsm/profile.py
./usr/lib/python2.7/dist-packages/vdsm/qemuimg.py
+./usr/lib/python2.7/dist-packages/vdsm/schedule.py
./usr/lib/python2.7/dist-packages/vdsm/sslutils.py
./usr/lib/python2.7/dist-packages/vdsm/tool/__init__.py
./usr/lib/python2.7/dist-packages/vdsm/tool/dummybr.py
diff --git a/lib/vdsm/Makefile.am b/lib/vdsm/Makefile.am
index c074bb3..89a5573 100644
--- a/lib/vdsm/Makefile.am
+++ b/lib/vdsm/Makefile.am
@@ -32,6 +32,7 @@
netinfo.py \
profile.py \
qemuimg.py \
+ schedule.py \
SecureXMLRPCServer.py \
sslutils.py \
utils.py \
diff --git a/lib/vdsm/schedule.py b/lib/vdsm/schedule.py
new file mode 100644
index 0000000..bd75924
--- /dev/null
+++ b/lib/vdsm/schedule.py
@@ -0,0 +1,213 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+This module provides a Scheduler class scheduling execution of
+a callable on a background thread.
+
+To use a scheduler, create an instance:
+
+ scheduler = schedule.Scheduler()
+
+When you want to schedule some callable:
+
+ def task():
+ print '30 seconds passed'
+
+ scheduler.schedule(30.0, task)
+
+task will be called after 30.0 seconds on the scheduler background thread.
+
+If you need to cancel a scheduled call, keep the ScheduledCall object returned
+from Scheduler.schedule(), and cancel the task:
+
+ scheduled_call = scheduler.schedule(30.0, call)
+ ...
+ scheduled_call.cancel()
+
+Finally, when the scheduler is not needed any more:
+
+ scheduler.cancel()
+
+This will cancel any pending calls and terminate the scheduler thread.
+"""
+
+import heapq
+import logging
+import threading
+import time
+
+from . import utils
+
+
+class Scheduler(object):
+ """
+ Schedule calls for future execution in a background thread.
+
+ This class is thread safe; multiple threads can schedule calls or cancel
+ the scheudler.
+ """
+
+ DEFAULT_DELAY = 30.0 # Used if no timeout are scheduled
+
+ _log = logging.getLogger("vds.Scheduler")
+
+ def __init__(self):
+ self._log.debug("Starting scheduler")
+ self._cond = threading.Condition(threading.Lock())
+ self._running = True
+ self._timeouts = []
+ t = threading.Thread(target=self._run)
+ t.daemon = True
+ t.start()
+
+ def schedule(self, delay, callee):
+ """
+ Schedule callee to be called after delay seconds on the scheduler
+ thread.
+
+ Callee must not block or take excessive time to complete. It it does
+ not finish quickly, it may delay other scheduled calls on the scheduler
+ thread.
+
+ Returns a ScheduledCall that may be canceled if callee was not called
+ yet.
+ """
+ deadline = time.time() + delay
+ timeout = _Timeout(deadline, callee)
+ self._log.debug("Schedulng %s", timeout)
+ with self._cond:
+ if self._running:
+ heapq.heappush(self._timeouts, timeout)
+ self._cond.notify()
+ else:
+ timeout.cancel()
+ return ScheduledCall(timeout)
+
+ def cancel(self):
+ """
+ Cancel all schedueld calls and invalidate the scheduler. Calls
+ scheduled after a scheduler was cancel will never be called.
+ """
+ self._log.debug("Canceling scheduler")
+ with self._cond:
+ self._running = False
+ self._cond.notify()
+
+ @utils.traceback(on=_log.name)
+ def _run(self):
+ try:
+ self._log.debug("started")
+ self._loop()
+ self._log.debug("canceled")
+ finally:
+ self._cleanup()
+
+ def _loop(self):
+ while True:
+ with self._cond:
+ if not self._running:
+ return
+ delay = self._time_until_deadline()
+ if delay > 0.0:
+ self._cond.wait(delay)
+ if not self._running:
+ return
+ expired = self._pop_expired_timeouts()
+ for timeout in expired:
+ timeout.fire()
+
+ def _time_until_deadline(self):
+ if self._timeouts:
+ return self._timeouts[0].deadline - time.time()
+ return self.DEFAULT_DELAY
+
+ def _pop_expired_timeouts(self):
+ now = time.time()
+ expired = []
+ while self._timeouts:
+ timeout = self._timeouts[0]
+ if timeout.deadline > now:
+ break
+ heapq.heappop(self._timeouts)
+ expired.append(timeout)
+ return expired
+
+ def _cleanup(self):
+ # Help the garbage collector by breaking reference cycles
+ with self._cond:
+ for timeout in self._timeouts:
+ timeout.cancel()
+
+
+class ScheduledCall(object):
+ """
+ Returned when a callable is scheduled to be called after delay. The caller
+ may cancel the call if it was not called yet.
+
+ This class is thread safe; any thread can cacnel a call.
+ """
+
+ _log = logging.getLogger("vds.Scheduler")
+
+ def __init__(self, timeout):
+ self._timeout = timeout
+
+ @property
+ def deadline(self):
+ return self._timeout.deadline
+
+ def cancel(self):
+ self._log.debug("Canceling %s", self)
+ self._timeout.cancel()
+
+
+# Sentinel for marking timeouts as invalid. Callable so we can invaliate a
+# timeout in a thread safe manner without locks.
+def _INVALID():
+ pass
+
+
+class _Timeout(object):
+ """
+ Created for each scheduled call.
+ """
+
+ _log = logging.getLogger('vds.Timeout')
+
+ def __init__(self, deadline, callee):
+ self.deadline = deadline
+ self.callee = callee
+
+ def fire(self):
+ if self.callee is _INVALID:
+ return
+ try:
+ self.callee()
+ except Exception:
+ self._log.exception("Unhandled exception in scheduled call")
+ finally:
+ self.callee = _INVALID
+
+ def cancel(self):
+ self.callee = _INVALID
+
+ def __cmp__(self, other):
+ return cmp(self.deadline, other.deadline)
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 6507165..786dea4 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -65,6 +65,7 @@
remoteFileHandlerTests.py \
resourceManagerTests.py \
samplingTests.py \
+ scheduleTests.py \
schemaTests.py \
securableTests.py \
sslTests.py \
diff --git a/tests/scheduleTests.py b/tests/scheduleTests.py
new file mode 100644
index 0000000..0c370d9
--- /dev/null
+++ b/tests/scheduleTests.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import threading
+import time
+
+from vdsm import schedule
+from testrunner import VdsmTestCase
+
+
+class SchedulerTests(VdsmTestCase):
+
+ # Time to wait for completion, so test will not fail on overloaded
+ # machines. If tests fails on CI, increase this value.
+ GRACETIME = 0.1
+
+ MAX_TASKS = 1000
+
+ def setUp(self):
+ self.scheduler = schedule.Scheduler()
+
+ def tearDown(self):
+ self.scheduler.cancel()
+
+ def test_schedule(self):
+ delay = 0.3
+ task1 = Task()
+ task2 = Task()
+ timeout1 = self.scheduler.schedule(delay, task1)
+ self.scheduler.schedule(10, task2)
+ task1.wait(delay + self.GRACETIME)
+ self.assertTrue(timeout1.deadline <= task1.call_time)
+ self.assertTrue(task1.call_time < timeout1.deadline + self.GRACETIME)
+ self.assertEquals(task2.call_time, None)
+
+ def test_schedule_many(self):
+ delay = 0.3
+ tasks = []
+ for i in range(self.MAX_TASKS):
+ task = Task()
+ timeout = self.scheduler.schedule(delay, task)
+ tasks.append((task, timeout))
+ last_task = tasks[-1][0]
+ last_task.wait(delay + self.GRACETIME)
+ for task, timeout in tasks:
+ self.assertTrue(timeout.deadline <= task.call_time)
+ self.assertTrue(task.call_time < timeout.deadline + self.GRACETIME)
+
+ def test_continue_after_failures(self):
+ self.scheduler.schedule(0.3, FailingTask())
+ task = Task()
+ self.scheduler.schedule(0.4, task)
+ task.wait(0.4 + self.GRACETIME)
+ self.assertTrue(task.call_time is not None)
+
+ def test_cancel_timeout(self):
+ delay = 0.3
+ task = Task()
+ timeout = self.scheduler.schedule(delay, task)
+ timeout.cancel()
+ task.wait(delay + self.GRACETIME)
+ self.assertEquals(task.call_time, None)
+
+ def test_cancel_many(self):
+ delay = 0.3
+ tasks = []
+ for i in range(self.MAX_TASKS):
+ task = Task()
+ timeout = self.scheduler.schedule(delay, task)
+ tasks.append((task, timeout))
+ for task, timeout in tasks:
+ timeout.cancel()
+ last_task = tasks[-1][0]
+ last_task.wait(delay + self.GRACETIME)
+ for task, timeout in tasks:
+ self.assertEquals(task.call_time, None)
+
+ def test_cancel(self):
+ delay = 0.3
+ tasks = []
+ for i in range(self.MAX_TASKS):
+ task = Task()
+ timeout = self.scheduler.schedule(delay, task)
+ tasks.append((task, timeout))
+ self.scheduler.cancel()
+ last_task = tasks[-1][0]
+ last_task.wait(delay + self.GRACETIME)
+ for task, timeout in tasks:
+ self.assertEquals(task.call_time, None)
+
+
+class Task(object):
+
+ def __init__(self):
+ self.cond = threading.Condition(threading.Lock())
+ self.call_time = None
+
+ def __call__(self):
+ with self.cond:
+ self.call_time = time.time()
+ self.cond.notify()
+
+ def wait(self, timeout):
+ with self.cond:
+ if self.call_time is None:
+ self.cond.wait(timeout)
+
+
+class FailingTask(object):
+
+ def __call__(self):
+ raise Exception("This task is broken")
diff --git a/vdsm.spec.in b/vdsm.spec.in
index dfca5bd..8ba0477 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1163,6 +1163,7 @@
%{python_sitearch}/%{vdsm_name}/qemuimg.py*
%{python_sitearch}/%{vdsm_name}/SecureXMLRPCServer.py*
%{python_sitearch}/%{vdsm_name}/netconfpersistence.py*
+%{python_sitearch}/%{vdsm_name}/schedule.py*
%{python_sitearch}/%{vdsm_name}/sslutils.py*
%{python_sitearch}/%{vdsm_name}/utils.py*
%{python_sitearch}/%{vdsm_name}/vdscli.py*
--
To view, visit http://gerrit.ovirt.org/29607
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie3764806d93bd37c3b5924080eb5ae4d29e4f4e0
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
9 years, 5 months
Change in vdsm[master]: threadpool: add a thread pool with watchdog
by fromani@redhat.com
Francesco Romani has uploaded a new change for review.
Change subject: threadpool: add a thread pool with watchdog
......................................................................
threadpool: add a thread pool with watchdog
Add a new thread pool implementation with a built-in
watchdog.
The watchdog is needed to accommodate the needs of
the virt sampling code.
The virt sampling code needs to deal with possibly blocking,
I/O-like calls which cannot be made non-blocking.
Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Signed-off-by: Francesco Romani <fromani(a)redhat.com>
---
M lib/threadpool/Makefile.am
A lib/threadpool/examples/demo.py
A lib/threadpool/schedqueue.py
A lib/threadpool/tests/test_schedqueue.py
A lib/threadpool/tests/test_timetrackingthread.py
A lib/threadpool/tests/test_worker.py
A lib/threadpool/tests/test_workqueue.py
A lib/threadpool/watchedpool.py
A lib/threadpool/watchman.py
A lib/threadpool/worker.py
A lib/threadpool/workqueue.py
11 files changed, 1,003 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/91/29191/1
diff --git a/lib/threadpool/Makefile.am b/lib/threadpool/Makefile.am
index 3a71a5d..8b7098e 100644
--- a/lib/threadpool/Makefile.am
+++ b/lib/threadpool/Makefile.am
@@ -22,4 +22,9 @@
dist_threadpool_PYTHON = \
__init__.py \
+ schedqueue.py \
+ watchedpool.py \
+ watchman.py \
+ worker.py \
+ workqueue.py \
$(NULL)
diff --git a/lib/threadpool/examples/demo.py b/lib/threadpool/examples/demo.py
new file mode 100644
index 0000000..cf80930
--- /dev/null
+++ b/lib/threadpool/examples/demo.py
@@ -0,0 +1,62 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import logging
+import random
+import sys
+import time
+
+from watchedpool import WatchedThreadPool
+
+
+def _test_fun(*args):
+ """
+ just block the caller for a random time span.
+ """
+ tmo = random.randint(1, 15)
+ logging.info('started, waiting: %.3fs', tmo)
+ time.sleep(tmo)
+ logging.info('done')
+
+
+def _main(args):
+ """
+ main test driver
+ """
+ nworkers = int(args[0]) if len(args) >= 1 else 1
+ pool = WatchedThreadPool(nworkers)
+ pool.start()
+ for tag in range(nworkers):
+ pool.submit_periodic(4, _test_fun)
+
+ try:
+ while True:
+ time.sleep(1.0)
+ except KeyboardInterrupt:
+ logging.debug('exiting!')
+ finally:
+ pool.stop()
+
+
+if __name__ == '__main__':
+ logging.basicConfig(
+ level=logging.DEBUG,
+ format='%(asctime)s %(threadName)-10s %(levelname)-8s %(message)s')
+ _main(sys.argv[1:])
diff --git a/lib/threadpool/schedqueue.py b/lib/threadpool/schedqueue.py
new file mode 100644
index 0000000..c2bfbae
--- /dev/null
+++ b/lib/threadpool/schedqueue.py
@@ -0,0 +1,123 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+Time-aware priority queue, built like queue.Queue
+but tailored for WatchedThreadPool needs.
+"""
+
+import heapq
+import threading
+import time
+
+
+class QueueError(Exception):
+ """
+ Queue exception root.
+ """
+
+
+class Empty(QueueError):
+ """
+ Attempt to get an item from an empty queue.
+ """
+
+
+class Full(QueueError):
+ """
+ Attempt to put an item into a full queue.
+ """
+
+
+class NotYet(QueueError):
+ """
+ Attempt to get an item which is not due at the
+ present (reported) time.
+ """
+
+
+class SchedQueue(object):
+ """
+ Time-aware, not blocking (client must take care of
+ waiting/timing) priority queue.
+ """
+ def __init__(self, maxsize=0, timefunc=time.time):
+ self._timefunc = timefunc
+ self._maxsize = maxsize
+ self._queue = self._init(maxsize)
+ self._lock = threading.Lock()
+
+ def __len__(self):
+ with self._lock:
+ return self._size()
+
+ def put(self, item, delay=0):
+ """
+ Put an item into the queue, to be made ready 'delay' seconds
+ in the future.
+ """
+ with self._lock:
+ if self._maxsize > 0 and self._size() == self._maxsize:
+ raise Full
+ self._put((self._timefunc() + delay, item))
+
+ def get(self, timenow):
+ """
+ Ask for an item to be consumed.
+ """
+ with self._lock:
+ if not self._size():
+ raise Empty
+ timeval, _ = self._peek()
+ if timeval > timenow:
+ raise NotYet
+ _, item = self._get()
+ return item
+
+ def _init(self, maxsize):
+ """
+ builds the internal queue.
+ """
+ self._maxsize = maxsize # to make pylint happy
+ return []
+
+ def _size(self):
+ """
+ size of the internal queue.
+ """
+ return len(self._queue)
+
+ def _put(self, item):
+ """
+ put a new element (at the end of) in the queue.
+ """
+ heapq.heappush(self._queue, item)
+
+ def _get(self):
+ """
+ gets (and removes) the next element in queue.
+ """
+ return heapq.heappop(self._queue)
+
+ def _peek(self):
+ """
+ read-only view of the next element in queue.
+ """
+ return self._queue[0]
diff --git a/lib/threadpool/tests/test_schedqueue.py b/lib/threadpool/tests/test_schedqueue.py
new file mode 100644
index 0000000..3595589
--- /dev/null
+++ b/lib/threadpool/tests/test_schedqueue.py
@@ -0,0 +1,56 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import unittest
+
+import schedqueue
+
+
+class SchedQueueTests(unittest.TestCase):
+ def _faketime(self):
+ ret = self._ts
+ self._ts += 1
+ return ret
+
+ def setUp(self):
+ self._ts = 0
+ self._sq = schedqueue.SchedQueue(timefunc=self._faketime)
+
+ def test_empty_on_create(self):
+ self.assertEqual(len(self._sq), 0)
+
+ def test_put_on_full(self):
+ self._sq = schedqueue.SchedQueue(maxsize=1, timefunc=self._faketime)
+ self._sq.put('foo')
+ self.assertRaises(schedqueue.Full, self._sq.put, 'bar')
+
+ def test_get_on_empty(self):
+ self.assertRaises(schedqueue.Empty, self._sq.get, 1)
+
+ def test_get_too_early(self):
+ item = 'foo'
+ self._sq.put(item, delay=5)
+ self.assertRaises(schedqueue.NotYet, self._sq.get, 0)
+
+ def test_put_get(self):
+ item = 'foo'
+ self._sq.put(item)
+ res = self._sq.get(timenow=1)
+ self.assertEqual(res, item)
diff --git a/lib/threadpool/tests/test_timetrackingthread.py b/lib/threadpool/tests/test_timetrackingthread.py
new file mode 100644
index 0000000..14a25ef
--- /dev/null
+++ b/lib/threadpool/tests/test_timetrackingthread.py
@@ -0,0 +1,64 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import unittest
+
+import worker
+
+
+class TimeTrackingThreadTests(unittest.TestCase):
+ def _fakesleep(self, delay):
+ self._ts += max(0, delay) # avoid negative delays
+
+ def _faketime(self):
+ ret = self._ts
+ self._ts += 1
+ return ret
+
+ def setUp(self):
+ self._ts = 0
+ self._ttt = worker.TimeTrackingThread(self._faketime)
+
+ def assertIdle(self):
+ self.assertFalse(self._ttt.busy)
+ self.assertEqual(self._ttt.task, None)
+
+ def assertBusyOn(self, task):
+ self.assertTrue(self._ttt.busy)
+ self.assertEqual(self._ttt.task, task)
+
+ def test_created_free(self):
+ self.assertIdle()
+
+ def test_carry_task(self):
+ self.assertIdle()
+
+ task = 'foobar'
+ with self._ttt.track_time(task):
+ self.assertBusyOn(task)
+
+ self.assertIdle()
+
+ def test_elapsed(self):
+ task = 'barbaz'
+ delay = 5
+ with self._ttt.track_time(task):
+ self._fakesleep(delay)
+ self.assertTrue(self._ttt.elapsed >= delay)
diff --git a/lib/threadpool/tests/test_worker.py b/lib/threadpool/tests/test_worker.py
new file mode 100644
index 0000000..7396673
--- /dev/null
+++ b/lib/threadpool/tests/test_worker.py
@@ -0,0 +1,60 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import contextlib
+import unittest
+
+import schedqueue
+import worker
+
+
+class FakeWorkQueue(object):
+ def __init__(self, items):
+ self._items = items
+
+ @contextlib.contextmanager
+ def fetch(self):
+ if not self._items:
+ raise schedqueue.Empty
+ item = self._items.pop(0)
+ if issubclass(item, Exception):
+ raise item
+ else:
+ yield item
+
+
+class WorkerThreadTests(unittest.TestCase):
+ def test__pull_empty(self):
+ wq = FakeWorkQueue([])
+ wt = worker.Worker(wq)
+ with wt._pull() as item:
+ self.assertEqual(item, (None, None, None, None))
+
+ def test__pull_empty_exc(self):
+ wq = FakeWorkQueue([schedqueue.Empty])
+ wt = worker.Worker(wq)
+ with wt._pull() as item:
+ self.assertEqual(item, (None, None, None, None))
+
+ def test__pull_notyet(self):
+ wq = FakeWorkQueue([schedqueue.NotYet])
+ wt = worker.Worker(wq)
+ with wt._pull() as item:
+ self.assertEqual(item, (None, None, None, None))
diff --git a/lib/threadpool/tests/test_workqueue.py b/lib/threadpool/tests/test_workqueue.py
new file mode 100644
index 0000000..68e982d
--- /dev/null
+++ b/lib/threadpool/tests/test_workqueue.py
@@ -0,0 +1,80 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import unittest
+
+import schedqueue
+import watchedpool
+
+
+def _do_nothing(*args, **kwargs):
+    """
+    no-op work unit: accepts any argument, does nothing, returns None.
+    """
+    return None
+
+
+class WorkQueueTests(unittest.TestCase):
+    """
+    Exercise WorkQueue with a fake clock and a no-op sleep, so that
+    no test ever waits on wall-clock time.
+    """
+    def _fakesleep(self, delay):
+        # delayfunc replacement: never actually sleep.
+        pass
+
+    def _faketime(self):
+        # timefunc replacement: a counter advancing by one per call.
+        ret = self._ts
+        self._ts += 1
+        return ret
+
+    def assertDummyWork(self, workitem, in_tag=None):
+        """
+        check that the given (tag, work, args, kwargs) tuple carries
+        _do_nothing with no arguments. The tag is compared only
+        if in_tag is given.
+        """
+        out_tag, work, args, kwargs = workitem
+        if in_tag is not None:
+            self.assertEqual(in_tag, out_tag)
+        self.assertEqual(work, _do_nothing)
+        self.assertEqual(len(args), 0)
+        self.assertEqual(len(kwargs), 0)
+
+    def setUp(self):
+        self._ts = 0
+        self._wq = watchedpool.WorkQueue(
+            timefunc=self._faketime,
+            delayfunc=self._fakesleep)
+
+    def test_created_empty(self):
+        self.assertEqual(len(self._wq), 0)
+
+    def test_get_on_empty(self):
+        self.assertRaises(schedqueue.Empty, self._wq.get)
+
+    def test_post_on_empty(self):
+        self._wq.post(0, _do_nothing)
+        self.assertEqual(len(self._wq), 1)
+
+    def test_put_get(self):
+        in_tag = 'foo'
+        self._wq.put(in_tag, 0, _do_nothing)
+        # pass the tag along, to verify it survives the round trip
+        self.assertDummyWork(self._wq.get(), in_tag)
+
+    def test_remove(self):
+        in_tag = 'foo'
+        self._wq.put(in_tag, 0, _do_nothing)
+        self._wq.remove(in_tag)
+        self.assertRaises(schedqueue.Empty, self._wq.get)
+
+    def test_fetch(self):
+        in_tag = 'foo'
+        self._wq.put(in_tag, 1, _do_nothing)
+        with self._wq.fetch() as work_item:
+            self.assertDummyWork(work_item, in_tag)
+        # periodic work (interval != 0) must still be registered
+        self.assertEqual(len(self._wq), 1)
diff --git a/lib/threadpool/watchedpool.py b/lib/threadpool/watchedpool.py
new file mode 100644
index 0000000..6a8938f
--- /dev/null
+++ b/lib/threadpool/watchedpool.py
@@ -0,0 +1,128 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+PoC of a watched thread pool implementation.
+This thread pool adds a watcher/watchman thread which detaches `rogue'
+threads from the main pool, and asks them to die as soon as possible.
+The purpose is to avoid worker threads getting stuck for too long on
+busy, possibly blocking operations (e.g. risky I/O).
+
+In the worst case scenario, the pool degenerates into a
+thread-per-operation model. In the common and expected case,
+the pool size is fixed and rogue threads are just spikes
+in the thread number.
+
+TODO:
+- allow the client to resize dynamically the pool [?]
+- let this code be easily mergeable with vdsm/storage/threadPool.py
+- demonstrate that this code does not leak threads.
+"""
+
+import logging
+import threading
+
+from watchman import Watchman
+from worker import Worker
+from workqueue import WorkQueue
+
+
+class WatchedThreadPool(object):
+    """
+    Straightforward thread pool, with an added Watchman.
+
+    The watch interval is twice the work interval, and a worker is
+    considered busy for too long after three watch intervals.
+    """
+    def __init__(self, nworkers=1, work_interval=1):
+        self._nworkers = nworkers
+        self._work_interval = work_interval
+        self._watch_interval = 2 * self._work_interval
+        self._busy_timeout = 3 * self._watch_interval
+        self._work_queue = None
+        self._workers = None
+        self._watcher = None
+        self._running = False
+        self._lock = threading.Lock()
+
+    def start(self):
+        """
+        starts the pool. Thread safe. Does nothing if the pool
+        is already running.
+        """
+        with self._lock:
+            if not self._running:
+                self._work_queue = WorkQueue(self._work_interval)
+                self._workers = [self._make_worker()
+                                 for _ in range(self._nworkers)]
+                self._watcher = Watchman(self._workers,
+                                         self._make_worker,
+                                         self._watch_interval,
+                                         self._busy_timeout)
+                # the watcher starts the workers as well
+                self._watcher.start()
+                logging.info('pool started')
+                self._running = True
+
+    def stop(self, wait=True):
+        """
+        terminate all pooled threads. Thread safe. Does nothing if
+        the pool is not running. If wait is true, blocks until
+        the workers and the watcher are done.
+        """
+        with self._lock:
+            if not self._running:
+                return
+            self._running = False
+
+            self._watcher.stop()
+            if wait:
+                # NOTE(review): this is the list handed to the Watchman
+                # in start(); Watchman.enforce() rebinds its own list,
+                # so replaced workers are presumably not reflected
+                # here - confirm.
+                for worker in self._workers:
+                    worker.join()
+                self._watcher.reap([])  # last chance
+                self._watcher.join()
+                # back to the same state __init__ leaves behind
+                self._watcher = None
+
+    def submit_periodic(self, interval, work, *args, **kwargs):
+        """
+        submit a new work unit to the pool.
+        returns the work unit tag, or None if the pool is not running.
+        The tag is guaranteed to be unique per-pool.
+        """
+        if not self._running:
+            return None
+        # args and kwargs must be unpacked: passed as they are, they
+        # reach the work unit as two positional arguments (a tuple
+        # and a dict) instead of the arguments given by the caller.
+        tag = self._work_queue.post(interval, work, *args, **kwargs)
+        logging.debug('new work: (%s, %s, %s) -> %s',
+                      work, args, kwargs, tag)
+        return tag  # FIXME: relies on WorkQueue.post() returning the tag
+
+    def remove(self, tag):
+        """
+        remove a work unit from the pool.
+        returns whatever the work queue returns for the removed unit,
+        or None if the pool was never started.
+        """
+        if self._work_queue is None:
+            return None
+        return self._work_queue.remove(tag)
+
+    def notify_timeout(self, tag, on_timeout):
+        """
+        register a timeout callback for the given work unit.
+        """
+        self._watcher.notify_timeout(tag, on_timeout)
+
+    def _make_worker(self):
+        """
+        creates a new idle (not started) worker.
+        """
+        worker = Worker(self._work_queue)
+        logging.info('worker %s added to the pool', worker.name)
+        return worker
diff --git a/lib/threadpool/watchman.py b/lib/threadpool/watchman.py
new file mode 100644
index 0000000..53bfc11
--- /dev/null
+++ b/lib/threadpool/watchman.py
@@ -0,0 +1,209 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+PoC of a watched thread pool implementation.
+This thread pool adds a watcher/watchman thread which detaches `rogue'
+threads from the main pool, and asks them to die as soon as possible.
+The purpose is to avoid worker threads getting stuck for too long on
+busy, possibly blocking operations (e.g. risky I/O).
+
+In the worst case scenario, the pool degenerates into a
+thread-per-operation model. In the common and expected case,
+the pool size is fixed and rogue threads are just spikes
+in the thread number.
+
+TODO:
+- allow the client to resize dynamically the pool [?]
+- let this code be easily mergeable with vdsm/storage/threadPool.py
+- demonstrate that this code does not leak threads.
+"""
+
+import logging
+import threading
+import time
+
+
+class Watchman(threading.Thread):
+    """
+    Watches the worker threads, detects the rogues, if any,
+    and possibly replaces them to ensure the pool is operative
+    and not blocked.
+
+    workers: the initial worker threads (not started yet).
+    make_worker: callable returning a new, not started, worker.
+    watch_interval: delay between two checks of the pool.
+    busy_timeout: a worker busy for longer than this is stuck.
+    rogues_cap: max rogues detached per check (None: no limit).
+    delayfunc: callable used to wait for watch_interval.
+    """
+    def __init__(self, workers, make_worker,
+                 watch_interval=5, busy_timeout=10,
+                 rogues_cap=None, delayfunc=time.sleep):
+        super(Watchman, self).__init__()
+        self.daemon = True
+        self._workers = workers
+        self._rogues = []
+        self._rogues_cap = rogues_cap
+        self._watch_interval = watch_interval
+        self._make_worker = make_worker
+        self._busy_timeout = busy_timeout
+        self._timeout_cb = {}
+        # deliberately not named `_stop': on python 3 threading.Thread
+        # has a private _stop() method of its own, and shadowing it
+        # with an Event breaks join().
+        self._stop_event = threading.Event()
+        self._running = True
+        self._delayfunc = delayfunc
+
+    def suspend(self):
+        """
+        suspend the watch, without stopping.
+        """
+        self._running = False
+
+    def resume(self):
+        """
+        resume the watch.
+        """
+        self._running = True
+
+    def start(self):
+        """
+        start the watched pool and the watcher itself.
+        """
+        for worker in self._workers:
+            worker.start()
+        super(Watchman, self).start()
+
+    def stop(self):
+        """
+        stop the watched pool, and the watcher itself.
+        """
+        self.suspend()
+        for worker in self._workers:
+            worker.stop()
+        self._stop_event.set()
+
+    def run(self):
+        logging.debug(
+            'watcher starting (with %i workers)', len(self._workers))
+
+        while not self._stop_event.is_set():
+            self._delayfunc(self._watch_interval)
+            if self._running:
+                workers, rogues, newbies = self.verify()
+                self.enforce(workers, rogues, newbies)
+
+        logging.debug('watcher done')
+
+    def notify_timeout(self, tag, on_timeout):
+        """
+        register a timeout callback for the given work unit.
+        The callback receives the tag, and returns true if the
+        worker stuck on that unit can be detached from the pool.
+        """
+        self._timeout_cb[tag] = on_timeout
+
+    def stuck(self, worker):
+        """
+        is a given worker stuck?
+        returns a (stuck, task) pair; task is None for idle workers.
+        """
+        stuck, task = False, None
+        if worker.busy:
+            # use locals to freeze the state
+            task = worker.task
+            elapsed = worker.elapsed
+            stuck = elapsed > self._busy_timeout
+            logging.warning(
+                '%s busy on %s for the last %02i seconds (max=%02i) -> %s',
+                worker.name, task, elapsed, self._busy_timeout,
+                'STUCK' if stuck else 'OK')
+        return stuck, task
+
+    def verify(self):
+        """
+        check the health of the worker pool, and classify
+        it into
+        - workers: worker threads to be kept in the pool
+        - rogues: workers got stuck, to be evicted from the pool
+        - newbies: new workers ready to replace rogues
+
+        Always returns the three lists: a worker which cannot be
+        checked or replaced is logged and kept in the pool, so one
+        failure can neither kill the watcher nor lose workers.
+        """
+        workers, rogues, newbies = [], [], []
+        logging.debug('verification loop started')
+        for worker in self._workers:
+            try:
+                evicted = self._try_evict(worker, rogues, newbies)
+            except Exception:
+                logging.exception('verify failed')
+                evicted = False
+            if not evicted:
+                workers.append(worker)
+        logging.debug('verification loop done: %i rogues', len(rogues))
+        return workers, rogues, newbies
+
+    def _try_evict(self, worker, rogues, newbies):
+        """
+        replace the given worker if it is stuck and can be detached.
+        On eviction the worker is asked to stop and added to rogues,
+        its replacement is added to newbies, and True is returned.
+        Otherwise nothing changes and False is returned.
+        """
+        stuck, task = self.stuck(worker)
+        if not stuck:
+            return False
+        if not self._room_for_rogues(rogues):
+            logging.warning(
+                'worker %s gone rogue, but cap reached', worker.name)
+            return False
+        if not self._is_detachable(task):
+            logging.warning(
+                'worker %s gone rogue, but task %s not detachable',
+                worker.name, task)
+            return False
+        logging.info(
+            'worker %s gone rogue, replacing', worker.name)
+        # get the replacement first: if it cannot be made, the
+        # stuck worker is left untouched in the pool.
+        newbie = self._make_worker()
+        worker.stop()
+        rogues.append(worker)  # until it dies
+        newbies.append(newbie)
+        return True
+
+    def enforce(self, workers, rogues, newbies):
+        """
+        enforce the pool size by evicting rogues into a limbo,
+        and by replenishing the worker pool with the provided
+        newbies.
+        """
+        try:
+            for newbie in newbies:
+                newbie.start()
+                workers.append(newbie)
+            self._workers = workers
+            self.reap(rogues)
+        except Exception:
+            logging.exception('enforce failed')
+
+    def reap(self, rogues):
+        """
+        collect the terminated rogues, among both the given ones and
+        those still in the limbo. The rogues still alive are left
+        in the limbo for the next round.
+        """
+        still_alive = []
+        # work on a new list: the one of the caller is left untouched
+        for rogue in list(rogues) + self._rogues:
+            if rogue.is_alive():
+                still_alive.append(rogue)
+            else:
+                logging.info('rogue thread %s collected', rogue.name)
+                rogue.join()
+        self._rogues = still_alive
+
+    def _room_for_rogues(self, rogues):
+        """
+        can one more worker be added to the given rogues?
+        """
+        return (self._rogues_cap is None
+                or len(rogues) < self._rogues_cap)
+
+    def _is_detachable(self, task):
+        """
+        if a task is detachable, its worker thread will be detached
+        by the pool. Otherwise the thread will be kept in the pool
+        """
+        on_timeout = self._timeout_cb.get(task, _on_timeout_dummy)
+        return on_timeout(task)
+
+
+def _on_timeout_dummy(*args):
+    """
+    Fallback timeout callback: whatever the work unit is,
+    report it as detachable.
+    """
+    detachable = True
+    return detachable
diff --git a/lib/threadpool/worker.py b/lib/threadpool/worker.py
new file mode 100644
index 0000000..fe63d34
--- /dev/null
+++ b/lib/threadpool/worker.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import contextlib
+import logging
+import threading
+import time
+
+import schedqueue
+
+
+class TimeTrackingThread(threading.Thread):
+    """
+    Thin mixin to help a worker thread to track the time
+    consumed by a work item.
+
+    timefunc: callable returning the current time, as a number.
+    """
+    def __init__(self, timefunc=time.time):
+        super(TimeTrackingThread, self).__init__()
+        self._work_started_at = None
+        self._working_on = None
+        self._timefunc = timefunc
+
+    @property
+    def busy(self):
+        """
+        is the thread doing some work or is it idle?
+        """
+        return self._work_started_at is not None
+
+    @property
+    def elapsed(self):
+        """
+        how much time since the thread started the current work unit?
+        returns 0 if the thread is idle.
+        """
+        # read the timestamp only once: the thread may complete the
+        # work unit, and reset it to None, while someone else is
+        # asking; subtracting None would raise TypeError.
+        started_at = self._work_started_at
+        if started_at is None:
+            return 0
+        return self._timefunc() - started_at
+
+    @property
+    def task(self):
+        """
+        what is this worker doing?
+        returns the tag of the current work unit, None if idle.
+        """
+        return self._working_on
+
+    @contextlib.contextmanager
+    def track_time(self, tag):
+        """
+        helper context manager to track the time consumed in
+        the current work unit
+        """
+        # the tag is set before the timestamp, so the task is
+        # already known once the thread is reported as busy.
+        self._working_on = tag
+        self._work_started_at = self._timefunc()
+        try:
+            yield
+        finally:
+            # back to idle even if the work unit raised, otherwise
+            # the thread would look busy forever.
+            self._work_started_at = None
+            self._working_on = None
+
+
+class Worker(TimeTrackingThread):
+    """
+    A regular worker thread. This pool has no additional
+    requirements, save for the ability of a worker thread
+    to die ASAP (see the stop() method)
+
+    work_queue: the queue to get the work from; only its
+    fetch() context manager is used.
+    """
+    def __init__(self, work_queue):
+        super(Worker, self).__init__()
+        self.daemon = True
+        self._work_queue = work_queue
+        # deliberately not named `_stop': on python 3 threading.Thread
+        # has a private _stop() method of its own, and shadowing it
+        # with an Event breaks join().
+        self._stop_event = threading.Event()
+
+    def run(self):
+        logging.info('worker %s starting', self.name)
+
+        while not self._stop_event.is_set():
+            self.process()
+
+        logging.info('worker %s done', self.name)
+
+    def process(self):
+        """
+        execute a single try of work processing.
+        """
+        with self._pull() as (tag, work, args, kwargs):
+            if work is not None:
+                self._do_work(tag, work, args, kwargs)
+
+    def stop(self):
+        """
+        gracefully terminate ASAP, and avoid to begin more work.
+        The work unit in progress, if any, is not interrupted.
+        """
+        self._stop_event.set()
+
+    @contextlib.contextmanager
+    def _pull(self):
+        """
+        pulls the next work item, as a (tag, work, args, kwargs)
+        tuple. If the queue has nothing to offer (schedqueue.NotYet
+        or schedqueue.Empty raised while fetching), all the fields
+        are None.
+        """
+        fetched = False
+        try:
+            with self._work_queue.fetch() as item:
+                fetched = True
+                yield item
+        except (schedqueue.NotYet, schedqueue.Empty):
+            if fetched:
+                # raised after the item was handed out: a context
+                # manager cannot yield twice, so let it propagate.
+                raise
+            yield None, None, None, None
+
+    def _do_work(self, tag, work, args, kwargs):
+        """
+        process a single work unit. Exceptions raised by the work
+        unit are logged and swallowed, to keep the worker alive.
+        """
+        with self.track_time(tag):
+            try:
+                work(*args, **kwargs)
+            except Exception:
+                logging.exception('work failed')
diff --git a/lib/threadpool/workqueue.py b/lib/threadpool/workqueue.py
new file mode 100644
index 0000000..73949b8
--- /dev/null
+++ b/lib/threadpool/workqueue.py
@@ -0,0 +1,87 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import contextlib
+import time
+import uuid
+
+import schedqueue
+
+
+class WorkQueue(object):
+    """
+    Tiny wrapper around a schedqueue.SchedQueue, to encapsulate
+    the 'periodic work' behaviour.
+
+    The scheduling queue holds only the tags; the work units are
+    kept in a dictionary, indexed by tag.
+
+    wait_timeout: delay applied before each attempt to get work.
+    timefunc: callable returning the current time.
+    delayfunc: callable used to wait for wait_timeout.
+    """
+    def __init__(self, wait_timeout=1,
+                 timefunc=time.time, delayfunc=time.sleep):
+        self._queue = schedqueue.SchedQueue(timefunc=timefunc)
+        self._work = {}
+        self._wait_timeout = wait_timeout
+        self._timefunc = timefunc
+        self._delayfunc = delayfunc
+
+    def put(self, tag, interval, work, *args, **kwargs):
+        """
+        like Queue.put().
+        registers the work unit under the given tag, and schedules
+        the tag with `interval' as delay.
+        """
+        item = (work, args, kwargs, interval)
+        self._work[tag] = item
+        self._queue.put(tag, delay=interval)
+
+    def get(self):
+        """
+        like Queue.get().
+        returns a (tag, work, args, kwargs) tuple.
+        """
+        tag, work, args, kwargs, _ = self._get()
+        return (tag, work, args, kwargs)
+
+    def remove(self, tag):
+        """
+        forget the work unit registered under the given tag.
+        returns the removed (work, args, kwargs, interval) item,
+        or None if the tag is unknown. The tag is left in the
+        scheduling queue: _get() skips tags with no work attached.
+        """
+        return self._work.pop(tag, None)
+
+    def post(self, interval, work, *args, **kwargs):
+        """
+        like put(), but with auto-tagging.
+        returns the generated tag.
+        """
+        tag = str(uuid.uuid4())
+        self.put(tag, interval, work, *args, **kwargs)
+        # put() returns nothing: the tag must be handed back
+        # explicitly, or the caller cannot remove() the work later.
+        return tag
+
+    @contextlib.contextmanager
+    def fetch(self):
+        """
+        like get(), but automatically reinject the work
+        to do in the queue if the task is periodic.
+        """
+        tag, work, args, kwargs, interval = self._get()
+        yield (tag, work, args, kwargs)
+        # a work unit remove()d while in progress must stay removed.
+        if interval and tag in self._work:
+            # unpack the arguments: passed as they are, they would be
+            # stored as ((args, kwargs), {}) and the next run of the
+            # work would receive them as two positional arguments.
+            self.put(tag, interval, work, *args, **kwargs)
+
+    def _get(self):
+        """
+        wait for the next tag with work attached, and return
+        (tag, work, args, kwargs, interval). Exceptions raised by
+        the scheduling queue (e.g. schedqueue.Empty) are propagated.
+        """
+        tag, work, args, kwargs, interval = None, None, None, None, None
+        while work is None:
+            self._delayfunc(self._wait_timeout)
+            tag = self._queue.get(self._timefunc())
+            item = self._work.get(tag, None)
+            if item is not None:
+                work, args, kwargs, interval = item
+        return (tag, work, args, kwargs, interval)
+
+    def __len__(self):
+        # number of registered work units, not of scheduled tags
+        return len(self._work)
--
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <fromani(a)redhat.com>
9 years, 5 months