Change in vdsm[master]: dump the core of a domain
by shaohef@linux.vnet.ibm.com
ShaoHe Feng has uploaded a new change for review.
Change subject: dump the core of a domain
......................................................................
dump the core of a domain
libvirt supports an API to dump the core of a domain to a given file for
analysis when the guest OS crashes.
There are two kinds of dump files: one is a QEMU suspend-to-disk image;
the other is a core file which is like a kdump file but also contains the
registers' values.
Supporting this in VDSM helps find the root cause when a guest hangs and
kdump is not set up inside it. This would be a good RAS feature.
Here's the definition of the new API:
coreDump:
This method will dump the core of a domain to a given file for
analysis.
Input parameter:
vmId - VM UUID
to - the core file named by the user
flags - defined in libvirt.py
VIR_DUMP_CRASH
VIR_DUMP_LIVE
VIR_DUMP_BYPASS_CACHE
VIR_DUMP_RESET
VIR_DUMP_MEMORY_ONLY
Return value:
success: return doneCode
failure: return errCode including underlying libvirt error message.
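For reference, these flags mirror libvirt's virDomainCoreDump flags and can
be OR-ed together. A minimal client-side sketch (Python 2, matching the
codebase; the endpoint, placeholder UUID and output path are assumptions for
illustration, not part of this patch):

    import xmlrpclib
    import libvirt

    vmId = '00000000-0000-0000-0000-000000000000'  # placeholder VM UUID
    # combine dump flags exactly as libvirt defines them
    flags = libvirt.VIR_DUMP_MEMORY_ONLY | libvirt.VIR_DUMP_BYPASS_CACHE

    server = xmlrpclib.ServerProxy('https://localhost:54321')
    response = server.coreDump(vmId, '/var/tmp/guest.core', flags)
    print response['status']['code'], response['status']['message']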
Change-Id: If4aac9e747dc7aa64a6ff5ef256a7a4375aa2bb5
Signed-off-by: ShaoHe Feng <shaohef(a)linux.vnet.ibm.com>
---
M vdsm/API.py
M vdsm/BindingXMLRPC.py
M vdsm/define.py
M vdsm/libvirtvm.py
M vdsm_cli/vdsClient.py
5 files changed, 80 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/29/7329/1
diff --git a/vdsm/API.py b/vdsm/API.py
index 19cbb42..e2b24cb 100644
--- a/vdsm/API.py
+++ b/vdsm/API.py
@@ -244,6 +244,12 @@
self.log.debug("Error creating VM", exc_info=True)
return errCode['unexpected']
+ def coreDump(self, to, flags):
+ v = self._cif.vmContainer.get(self._UUID)
+ if not v:
+ return errCode['noVM']
+ return v.coreDump(to, flags)
+
def desktopLock(self):
"""
Lock user session in guest operating system using guest agent.
diff --git a/vdsm/BindingXMLRPC.py b/vdsm/BindingXMLRPC.py
index cc5300f..be71e6a 100644
--- a/vdsm/BindingXMLRPC.py
+++ b/vdsm/BindingXMLRPC.py
@@ -208,6 +208,10 @@
vm = API.VM(vmId)
return vm.cont()
+ def vmCoreDump(self, vmId, to, flags):
+ vm = API.VM(vmId)
+ return vm.coreDump(to, flags)
+
def vmReset(self, vmId):
vm = API.VM(vmId)
return vm.reset()
@@ -725,6 +729,7 @@
(self.getVMList, 'list'),
(self.vmPause, 'pause'),
(self.vmCont, 'cont'),
+ (self.vmCoreDump, 'coreDump'),
(self.vmSnapshot, 'snapshot'),
(self.vmMerge, 'merge'),
(self.vmMergeStatus, 'mergeStatus'),
diff --git a/vdsm/define.py b/vdsm/define.py
index 31deb4f..1fedac5 100644
--- a/vdsm/define.py
+++ b/vdsm/define.py
@@ -114,6 +114,10 @@
'mergeErr': {'status':
{'code': 52,
'message': 'Merge failed'}},
+ 'coreDumpErr': {'status':
+ {'code': 54,
+ 'message':
+ 'Failed to get coreDump file'}},
'recovery': {'status':
{'code': 99,
'message':
diff --git a/vdsm/libvirtvm.py b/vdsm/libvirtvm.py
index 4554fee..cbd9f96 100644
--- a/vdsm/libvirtvm.py
+++ b/vdsm/libvirtvm.py
@@ -1904,6 +1904,27 @@
self.saveState()
+ def coreDump(self, to, flags):
+
+ def reportError(key='coreDumpErr', msg=None):
+ self.log.error("get coreDump failed", exc_info=True)
+ if msg is None:
+ error = errCode[key]
+ else:
+ error = {'status': {'code': errCode[key]['status']['code'],
+ 'message': msg}}
+ return error
+
+ if self._dom is None:
+ return reportError()
+ try:
+ self._dom.coreDump(to, flags)
+ except libvirt.libvirtError, e:
+ if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
+ return reportError(key='noVM')
+ return reportError(msg=e.message)
+ return {'status': doneCode}
+
def changeCD(self, drivespec):
return self._changeBlockDev('cdrom', 'hdc', drivespec)
diff --git a/vdsm_cli/vdsClient.py b/vdsm_cli/vdsClient.py
index eeb7c95..cdcd3a8 100644
--- a/vdsm_cli/vdsClient.py
+++ b/vdsm_cli/vdsClient.py
@@ -1589,6 +1589,33 @@
return status['status']['code'], status['status']['message']
+ def coreDump(self, args):
+ DUMPFLAGS = {'crash': 1 << 0,
+ 'live': 1 << 1,
+ 'bypass-cache': 1 << 2,
+ 'reset': 1 << 3,
+ 'memory-only': 1 << 4}
+ flags = 0
+ vmId = args[0]
+ coreFile = args[1]
+ params = {}
+ if len(args) > 2:
+ for arg in args[2:]:
+ kv = arg.split('=', 1)
+ if len(kv) < 2:
+ params[kv[0]] = "True"
+ else:
+ params[kv[0]] = kv[1]
+ for k, v in params.items():
+ if v.lower() == "true" or not v:
+ try:
+ flags = flags + DUMPFLAGS[k]
+ except KeyError:
+ print "unrecognized optoin %s for cormDump command" % k
+ response = self.s.coreDump(vmId, coreFile, flags)
+ return response['status']['code'], response['status']['message']
+
+
if __name__ == '__main__':
if _glusterEnabled:
serv = ge.GlusterService()
@@ -2239,6 +2266,23 @@
('<vmId> <sdUUID> <imgUUID> <baseVolUUID> <volUUID>',
"Take a live snapshot"
)),
+ 'coreDump': (serv.coreDump,
+ ('<vmId> <file> [live=<True>] '
+ '[crash=<True>] [bypass-cache=<True>] '
+ '[reset=<True>] [memory-only=<True>]',
+ "get memeory dump or migration file"
+ 'optional params:',
+ 'crash: crash the domain after core dump'
+ 'default False',
+ 'live: perform a live core dump if supported, '
+ 'default False',
+ 'bypass-cache: avoid file system cache when saving'
+ 'default False',
+ 'reset: reset the domain after core dump'
+ 'default False',
+ "memory-only: dump domain's memory only"
+ 'default False'
+ )),
}
if _glusterEnabled:
commands.update(ge.getGlusterCmdDict(serv))
--
To view, visit http://gerrit.ovirt.org/7329
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If4aac9e747dc7aa64a6ff5ef256a7a4375aa2bb5
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: ShaoHe Feng <shaohef(a)linux.vnet.ibm.com>
Change in vdsm[master]: cancel the core dump of a VM
by shaohef@linux.vnet.ibm.com
ShaoHe Feng has uploaded a new change for review.
Change subject: cancel the core dump of a VM
......................................................................
cancel the core dump of a VM
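For illustration, the verb can be driven over the same XML-RPC binding; a
rough client-side sketch (Python 2; the endpoint and placeholder UUID are
assumptions, not part of this patch):

    import xmlrpclib

    vmId = '00000000-0000-0000-0000-000000000000'  # placeholder VM UUID
    server = xmlrpclib.ServerProxy('https://localhost:54321')
    response = server.dumpCancel(vmId)
    print response['status']['code'], response['status']['message']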
Change-Id: I2fa9e82cfbd43c9edb98fac9af41eb0deb0c67ad
Signed-off-by: ShaoHe Feng <shaohef(a)linux.vnet.ibm.com>
---
M vdsm/API.py
M vdsm/BindingXMLRPC.py
M vdsm/define.py
M vdsm/vm.py
M vdsm_api/vdsmapi-schema.json
M vdsm_cli/vdsClient.py
6 files changed, 62 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/30/11130/1
diff --git a/vdsm/API.py b/vdsm/API.py
index 4f5eed8..c5f7d40 100644
--- a/vdsm/API.py
+++ b/vdsm/API.py
@@ -293,6 +293,15 @@
return errCode['noVM']
return v.coreDump(to, dumpParams)
+ def dumpCancel(self):
+ """
+ Cancel an ongoing core dump process.
+ """
+ v = self._cif.vmContainer.get(self._UUID)
+ if not v:
+ return errCode['noVM']
+ return v.dumpCancel()
+
def desktopLock(self):
"""
Lock user session in guest operating system using guest agent.
diff --git a/vdsm/BindingXMLRPC.py b/vdsm/BindingXMLRPC.py
index 9fcbefd..17d97b1 100644
--- a/vdsm/BindingXMLRPC.py
+++ b/vdsm/BindingXMLRPC.py
@@ -215,6 +215,10 @@
vm = API.VM(vmId)
return vm.coreDump(to, params)
+ def vmCoreDumpCancel(self, vmId):
+ vm = API.VM(vmId)
+ return vm.dumpCancel()
+
def vmReset(self, vmId):
vm = API.VM(vmId)
return vm.reset()
@@ -764,6 +768,7 @@
(self.vmPause, 'pause'),
(self.vmCont, 'cont'),
(self.vmCoreDump, 'coreDump'),
+ (self.vmCoreDumpCancel, 'dumpCancel'),
(self.vmSnapshot, 'snapshot'),
(self.vmMerge, 'merge'),
(self.vmMergeStatus, 'mergeStatus'),
diff --git a/vdsm/define.py b/vdsm/define.py
index 84aacad..e1d428c 100644
--- a/vdsm/define.py
+++ b/vdsm/define.py
@@ -134,6 +134,9 @@
{'code': 58,
'message':
'Failed to generate coreDump file'}},
+ 'dumpCancelErr': {'status':
+ {'code': 59,
+ 'message': 'Failed to cancel dump'}},
'recovery': {'status':
{'code': 99,
'message':
diff --git a/vdsm/vm.py b/vdsm/vm.py
index be947c6..0a40e97 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -1345,3 +1345,29 @@
return check
finally:
self._guestCpuLock.release()
+
+ def dumpCancel(self):
+ def reportError(key='dumpCancelErr', msg=None):
+ if msg is None:
+ error = errCode[key]
+ else:
+ error = {'status':
+ {'code': errCode[key]['status']['code'],
+ 'message': msg}}
+ self.log.error("Failed to cancel core dump. " + msg,
+ exc_info=True)
+ return error
+
+ self._acquireCpuLockWithTimeout()
+ try:
+ if not self.isDoingDump():
+ return reportError(msg='no core dump in progress')
+ if self.dumpMode() == "memory":
+ return reportError(msg='invalid to cancel memory dump')
+ self._doCoredumpThread.stop()
+ return {'status': {'code': 0,
+ 'message': 'core dump process stopped'}}
+ except Exception, e:
+ return reportError(msg=e.message)
+ finally:
+ self._guestCpuLock.release()
diff --git a/vdsm_api/vdsmapi-schema.json b/vdsm_api/vdsmapi-schema.json
index 63b0fb1..39d1cba 100644
--- a/vdsm_api/vdsmapi-schema.json
+++ b/vdsm_api/vdsmapi-schema.json
@@ -5474,6 +5474,16 @@
'data': {'to': 'str', 'params': 'DumpParams'}}
##
+# @VM.dumpCancel:
+#
+# Cancel an ongoing core dump process.
+#
+# Since: 4.10.4
+#
+##
+{'command': {'class': 'VM', 'name': 'dumpCancel'}}
+
+##
# @VM.monitorCommand:
#
# Send a command to the qemu monitor.
diff --git a/vdsm_cli/vdsClient.py b/vdsm_cli/vdsClient.py
index c4171d9..32ad348 100644
--- a/vdsm_cli/vdsClient.py
+++ b/vdsm_cli/vdsClient.py
@@ -1669,6 +1669,11 @@
return status['status']['code'], status['status']['message']
+ def do_dumpCancel(self, args):
+ vmId = args[0]
+ response = self.s.dumpCancel(vmId)
+ return response['status']['code'], response['status']['message']
+
def coreDump(self, args):
dumpParams = {'crash': False,
'live': False,
@@ -2413,6 +2418,10 @@
'Start live replication to the destination '
'domain'
)),
+ 'coreDumpCancel': (serv.do_dumpCancel,
+ ('<vmId>',
+ 'cancel machine core dump'
+ )),
'coreDump': (serv.coreDump,
('<vmId> <file> [live=<True|False>] '
'[crash=<True|False>] [bypass-cache=<True|False>] '
--
To view, visit http://gerrit.ovirt.org/11130
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2fa9e82cfbd43c9edb98fac9af41eb0deb0c67ad
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: ShaoHe Feng <shaohef(a)linux.vnet.ibm.com>
Change in vdsm[master]: [WIP] Added glusterVolumeTop verb
by tjeyasin@redhat.com
Hello Ayal Baron, Bala.FA, Saggi Mizrahi, Federico Simoncelli, Dan Kenigsberg,
I'd like you to do a code review. Please visit
http://gerrit.ovirt.org/7844
to review the following change.
Change subject: [WIP] Added glusterVolumeTop verb
......................................................................
[WIP] Added glusterVolumeTop verb
Added glusterVolumeTopOpen verb
Added glusterVolumeTopRead verb
Added glusterVolumeTopWrite verb
Added glusterVolumeTopOpenDir verb
Added glusterVolumeTopReadDir verb
Added glusterVolumeTopReadPerf verb
Added glusterVolumeTopWritePerf verb
Following is the output structure of glusterVolumeTopOpen
{'statusCode' : CODE,
'brickCount': BRICK-COUNT,
'bricks': {BRICK-NAME: {'count':FILE-COUNT,
'currentOpenFds': CURRENT-OPEN-FDS-COUNT,
'maxOpen': MAX-OPEN,
'maxOpenTime': MAX-OPEN-TIME,
'files': [{FILE-NAME: FILE-OPEN-COUNT}, ...]
}, ...} }
Following is the output structure of glusterVolumeTopRead
{'statusCode': CODE,
'brickCount': BRICK-COUNT,
'topOp': TOP-OP,
'bricks': {BRICK-NAME: {
'count': FILE-COUNT,
'files': [{FILE-NAME: FILE-READ-COUNT}, ...]}
,...}}
Following is the output structure of glusterVolumeTopWrite
{'statusCode' : CODE,
'brickCount': BRICK-COUNT,
'topOp': TOP-OP,
'bricks': {BRICK-NAME: {'count': FILE-COUNT,
'files': [{FILE-NAME: FILE-WRITE-COUNT}...]}
,...}}
Following is the output structure of glusterVolumeTopOpenDir
{'statusCode': CODE,
'brickCount': BRICK-COUNT,
'topOp': TOP-OP,
'bricks': {BRICK-NAME: {'count':OPEN-DIR-COUNT,
'files': [{DIR-NAME: DIR-OPEN-COUNT}, ...]}
,...}}
Following is the output structure of glusterVolumeTopReadDir
{'statusCode': CODE,
'brickCount': BRICK-COUNT,
'topOp': TOP-OP,
'bricks': {BRICK-NAME: {'count':READ-DIR-COUNT,
'files': [{DIR-NAME: DIR-READ-COUNT}, ...]}
,...}}
Following is the output structure of glusterVolumeTopReadPerf
{'statusCode': CODE,
'brickCount': BRICK-COUNT,
'topOp': TOP-OP,
'bricks': {BRICK-NAME: {'fileCount':READ-COUNT,
'throughput': BRICK-WISE-READ-THROUGHPUT,
'timeTaken': TIME-TAKEN,
'files': [{FILE-NAME:
{'throughput':FILE-READ-THROUGHPUT,
'time': TIME}}, ...]}
,...}}
Following is the output structure of glusterVolumeTopWritePerf
{'statusCode': CODE,
'brickCount': BRICK-COUNT,
'topOp': TOP-OP,
'bricks': {BRICK-NAME: {'fileCount':WRITE-COUNT,
'throughput': BRICK-WISE-WRITE-THROUGHPUT,
'timeTaken': TIME-TAKEN,
'files': [{FILE-NAME:
{'throughput':FILE-WRITE-THROUGHPUT,
'time': TIME}}, ...]}
,...}}
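To make the mapping from gluster's --xml output to these structures
concrete, here is a rough sketch of the parsing approach the patch takes;
the XML fragment is fabricated to match the tag names the parsers expect,
not captured from a real gluster build:

    import xml.etree.cElementTree as etree

    out = """<cliOutput>
      <opRet>0</opRet>
      <volTop>
        <topOp>open</topOp>
        <brickCount>1</brickCount>
        <brick>
          <name>host1:/export/b1</name>
          <members>1</members>
          <currentOpen>3</currentOpen>
          <maxOpen>16</maxOpen>
          <maxOpenTime>2012-08-14 10:00:00</maxOpenTime>
          <file><filename>/f1</filename><count>7</count></file>
        </brick>
      </volTop>
    </cliOutput>"""

    tree = etree.fromstring(out)
    bricks = {}
    for brick in tree.findall('volTop/brick'):
        files = [{f.find('filename').text: f.find('count').text}
                 for f in brick.findall('file')]
        bricks[brick.find('name').text] = {'count': brick.find('members').text,
                                           'maxOpen': brick.find('maxOpen').text,
                                           'files': files}
    # note: every value is a string, taken verbatim from ElementTree .text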
Change-Id: I96486363a9acb7472014a67fcd2d5185d4f3c428
Signed-off-by: Timothy Asir <tjeyasin(a)redhat.com>
---
M vdsm/gluster/api.py
M vdsm/gluster/cli.py
M vdsm/gluster/exception.py
M vdsm_cli/vdsClientGluster.py
4 files changed, 372 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/44/7844/1
diff --git a/vdsm/gluster/api.py b/vdsm/gluster/api.py
index e52430b..3f493e0 100644
--- a/vdsm/gluster/api.py
+++ b/vdsm/gluster/api.py
@@ -241,6 +241,61 @@
status = self.svdsmProxy.glusterVolumeProfileInfo(volumeName)
return {'profileInfo': status}
+ @exportAsVerb
+ def volumeTopOpen(self, volumeName, brickName=None, count=None,
+ options=None):
+ status = self.svdsmProxy.glusterVolumeTopOpen(volumeName,
+ brickName, count)
+ return {'topOpen': status}
+
+ @exportAsVerb
+ def volumeTopRead(self, volumeName, brickName=None, count=None,
+ options=None):
+ status = self.svdsmProxy.glusterVolumeTopRead(volumeName,
+ brickName, count)
+ return {'topRead': status}
+
+ @exportAsVerb
+ def volumeTopWrite(self, volumeName, brickName=None, count=None,
+ options=None):
+ status = self.svdsmProxy.glusterVolumeTopWrite(volumeName,
+ brickName, count)
+ return {'topWrite': status}
+
+ @exportAsVerb
+ def volumeTopOpenDir(self, volumeName, brickName=None, count=None,
+ options=None):
+ status = self.svdsmProxy.glusterVolumeTopOpenDir(volumeName,
+ brickName, count)
+ return {'topOpenDir': status}
+
+ @exportAsVerb
+ def volumeTopReadDir(self, volumeName, brickName=None, count=None,
+ options=None):
+ status = self.svdsmProxy.glusterVolumeTopReadDir(volumeName,
+ brickName, count)
+ return {'topReadDir': status}
+
+ @exportAsVerb
+ def volumeTopReadPerf(self, volumeName, blockSize=None, count=None,
+ brickName=None, listCount=None, options=None):
+ status = self.svdsmProxy.glusterVolumeTopReadPerf(volumeName,
+ blockSize,
+ count,
+ brickName,
+ listCount)
+ return {'topReadPerf': status}
+
+ @exportAsVerb
+ def volumeTopWritePerf(self, volumeName, blockSize=None, count=None,
+ brickName=None, listCount=None, options=None):
+ status = self.svdsmProxy.glusterVolumeTopWritePerf(volumeName,
+ blockSize,
+ count,
+ brickName,
+ listCount)
+ return {'topWritePerf': status}
+
def getGlusterMethods(gluster):
l = []
diff --git a/vdsm/gluster/cli.py b/vdsm/gluster/cli.py
index b91a04f..ba4768c 100644
--- a/vdsm/gluster/cli.py
+++ b/vdsm/gluster/cli.py
@@ -334,6 +334,66 @@
return volumeInfoDict
+def _parseGlusterVolumeTopOpen(tree):
+ bricks = {}
+ for brick in tree.findall('volTop/brick'):
+ fileList = []
+ for file in brick.findall('file'):
+ fileList.append({file.find('filename').text:
+ file.find('count').text})
+ bricks[brick.find('name').text] = {
+ 'count': brick.find('members').text,
+ 'currentOpen': brick.find('currentOpen').text,
+ 'maxOpen': brick.find('maxOpen').text,
+ 'maxOpenTime': brick.find('maxOpenTime').text,
+ 'files': fileList}
+ status = {
+ 'topOp': tree.find('volTop/topOp').text,
+ 'brickCount': tree.find('volTop/brickCount').text,
+ 'statusCode': tree.find('opRet').text,
+ 'bricks': bricks}
+ return status
+
+
+def _parseGlusterVolumeTop(tree):
+ bricks = {}
+ for brick in tree.findall('volTop/brick'):
+ fileList = []
+ for fileTag in brick.findall('file'):
+ fileList.append({fileTag.find('filename').text:
+ fileTag.find('count').text})
+ bricks[brick.find('name').text] = {
+ 'count': brick.find('members').text,
+ 'files': fileList}
+ status = {
+ 'topOp': tree.find('volTop/topOp').text,
+ 'brickCount': tree.find('volTop/brickCount').text,
+ 'statusCode': tree.find('opRet').text,
+ 'bricks': bricks}
+ return status
+
+
+def _parseGlusterVolumeTopPerf(tree):
+ bricks = {}
+ for brick in tree.findall('volTop/brick'):
+ fileList = []
+ for fileTag in brick.findall('file'):
+ fileList.append({fileTag.find('filename').text:
+ {'count': fileTag.find('count').text,
+ 'time': fileTag.find('time').text}})
+ bricks[brick.find('name').text] = {
+ 'count': brick.find('members').text,
+ 'throughput': brick.find('throughput').text,
+ 'timeTaken': brick.find('timeTaken').text,
+ 'files': fileList}
+ status = {
+ 'topOp': tree.find('volTop/topOp').text,
+ 'brickCount': tree.find('volTop/brickCount').text,
+ 'statusCode': tree.find("opRet").text,
+ 'bricks': bricks}
+ return status
+
+
def _parseGlusterVolumeProfileInfo(tree):
bricks = {}
for brick in tree.findall('volProfile/brick'):
@@ -819,3 +879,132 @@
return _parseGlusterVolumeProfileInfo(xmltree)
except:
raise ge.GlusterXmlErrorException(err=out)
+
+
+@exportToSuperVdsm
+def volumeTopOpen(volumeName, brickName=None, count=None):
+ command = _getGlusterVolCmd() + ["top", volumeName, "open"]
+ if brickName:
+ command += ["brick", "%s" % brickName]
+ if count:
+ command += ["list-cnt", "%s" % count]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeTopOpenFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseGlusterVolumeTopOpen(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
+
+
+@exportToSuperVdsm
+def volumeTopRead(volumeName, brickName=None, count=None):
+ command = _getGlusterVolCmd() + ["top", volumeName, "read"]
+ if brickName:
+ command += ["brick", "%s" % brickName]
+ if count:
+ command += ["list-cnt", "%s" % count]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeTopReadFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseGlusterVolumeTop(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
+
+
+@exportToSuperVdsm
+def volumeTopWrite(volumeName, brickName=None, count=None):
+ command = _getGlusterVolCmd() + ["top", volumeName, "write"]
+ if brickName:
+ command += ["brick", "%s" % brickName]
+ if count:
+ command += ["list-cnt", "%s" % count]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeTopWriteFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseGlusterVolumeTop(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
+
+
+@exportToSuperVdsm
+def volumeTopOpenDir(volumeName, brickName=None, count=None):
+ command = _getGlusterVolCmd() + ["top", volumeName, "opendir"]
+ if brickName:
+ command += ["brick", "%s" % brickName]
+ if count:
+ command += ["list-cnt", "%s" % count]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeTopOpenDirFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseGlusterVolumeTop(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
+
+
+@exportToSuperVdsm
+def volumeTopReadDir(volumeName, brickName=None, count=None):
+ command = _getGlusterVolCmd() + ["top", volumeName, "readdir"]
+ if brickName:
+ command += ["brick", "%s" % brickName]
+ if count:
+ command += ["list-cnt", "%s" % count]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeTopReadDirFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseGlusterVolumeTop(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
+
+
+@exportToSuperVdsm
+def volumeTopReadPerf(volumeName, blockSize=None, count=None,
+ brickName=None, listCount=None):
+ command = _getGlusterVolCmd() + ["top", volumeName, "read-perf"]
+ if blockSize:
+ command += ["bs", "%s" % blockSize]
+ if count:
+ command += ["count", "%s" % count]
+ if brickName:
+ command += ["brick", "%s" % brickName]
+ if listCount:
+ command += ["list-cnt", "%s" % listCount]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeTopReadPerfFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseGlusterVolumeTopPerf(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
+
+
+@exportToSuperVdsm
+def volumeTopWritePerf(volumeName, blockSize=None, count=None,
+ brickName=None, listCount=None):
+ command = _getGlusterVolCmd() + ["top", volumeName, "write-perf"]
+ if blockSize:
+ command += ["bs", "%s" % blockSize]
+ if count:
+ command += ["count", "%s" % count]
+ if brickName:
+ command += ["brick", "%s" % brickName]
+ if listCount:
+ command += ["list-cnt", "%s" % listCount]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeTopWritePerfFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseGlusterVolumeTopPerf(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
diff --git a/vdsm/gluster/exception.py b/vdsm/gluster/exception.py
index bc20dd0..b392ec8 100644
--- a/vdsm/gluster/exception.py
+++ b/vdsm/gluster/exception.py
@@ -343,6 +343,41 @@
message = "Volume profile info failed"
+class GlusterVolumeTopOpenFailedException(GlusterVolumeException):
+ code = 4161
+ message = "Volume top open failed"
+
+
+class GlusterVolumeTopReadFailedException(GlusterVolumeException):
+ code = 4162
+ message = "Volume top read failed"
+
+
+class GlusterVolumeTopWriteFailedException(GlusterVolumeException):
+ code = 4163
+ message = "Volume top write failed"
+
+
+class GlusterVolumeTopOpenDirFailedException(GlusterVolumeException):
+ code = 4164
+ message = "Volume top open dir failed"
+
+
+class GlusterVolumeTopReadDirFailedException(GlusterVolumeException):
+ code = 4165
+ message = "Volume top read dir failed"
+
+
+class GlusterVolumeTopReadPerfFailedException(GlusterVolumeException):
+ code = 4166
+ message = "Volume top read perf failed"
+
+
+class GlusterVolumeTopWritePerfFailedException(GlusterVolumeException):
+ code = 4167
+ message = "Volume top write perf failed"
+
+
# Host
class GlusterHostException(GlusterException):
code = 4400
diff --git a/vdsm_cli/vdsClientGluster.py b/vdsm_cli/vdsClientGluster.py
index 8422695..3663c63 100644
--- a/vdsm_cli/vdsClientGluster.py
+++ b/vdsm_cli/vdsClientGluster.py
@@ -221,6 +221,41 @@
pp.pprint(status)
return status['status']['code'], status['status']['message']
+ def do_glusterVolumeTopOpen(self, args):
+ status = self.s.glusterVolumeTopOpen(args[0])
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterVolumeTopRead(self, args):
+ status = self.s.glusterVolumeTopRead(args[0])
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterVolumeTopWrite(self, args):
+ status = self.s.glusterVolumeTopWrite(args[0])
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterVolumeTopOpenDir(self, args):
+ status = self.s.glusterVolumeTopOpenDir(args[0])
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterVolumeTopReadDir(self, args):
+ status = self.s.glusterVolumeTopReadDir(args[0])
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterVolumeTopReadPerf(self, args):
+ status = self.s.glusterVolumeTopReadPerf(args[0])
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterVolumeTopWritePerf(self, args):
+ status = self.s.glusterVolumeTopWritePerf(args[0])
+ pp.pprint(status)
+ return status['status']['code'], status['status']['message']
+
def getGlusterCmdDict(serv):
return {
@@ -403,4 +438,62 @@
('<volume_name>\n\t<volume_name> is existing volume name',
'get gluster volume profile info'
)),
+ 'glusterVolumeTopOpen':
+ (serv.do_glusterVolumeTopOpen,
+ ('<volume_name> [brick=<existing_brick>] '
+ '[count=<list_count>]\n\t'
+ '<volume_name> is existing volume name\n\t'
+ 'get volume top open fd count and maximum fd count of '
+ 'a given volume for all its bricks or a specified brick'
+ )),
+ 'glusterVolumeTopRead':
+ (serv.do_glusterVolumeTopRead,
+ ('<volume_name> [brick=<existing_brick>] '
+ '[count=<list_count>]\n\t'
+ '<volume_name> is existing volume name\n\t'
+ 'get list of highest read calls on each brick or '
+ 'a specified brick of a volume'
+ )),
+ 'glusterVolumeTopWrite':
+ (serv.do_glusterVolumeTopWrite,
+ ('<volume_name> [brick=<existing_brick>] '
+ '[count=<list_count>]\n\t'
+ '<volume_name> is existing volume name\n\t'
+ 'get list of highest write calls on each brick or '
+ 'a specified brick of a volume'
+ )),
+ 'glusterVolumeTopOpenDir':
+ (serv.do_glusterVolumeTopOpenDir,
+ ('<volume_name> [brick=<existing_brick>] '
+ '[count=<list_count>]\n\t'
+ '<volume_name> is existing volume name\n\t'
+ 'get list of highest open calls on directories of each brick '
+ 'or a specified brick of a volume'
+ )),
+ 'glusterVolumeTopReadDir':
+ (serv.do_glusterVolumeTopReadDir,
+ ('<volume_name> [brick=<existing_brick>] '
+ '[count=<list_count>]\n\t'
+ '<volume_name> is existing volume name\n\t'
+ 'get list of highest read calls on directories of each brick '
+ 'or a specified brick of a volume'
+ )),
+ 'glusterVolumeTopReadPerf':
+ (serv.do_glusterVolumeTopReadPerf,
+ ('<volume_name> [block_size=<block_size>] '
+ '[count=<count>] [brick=<existing_brick>] '
+ '[list_count=<list_count>]\n\t'
+ '<volume_name> is existing volume name\n\t'
+ 'get list of read throughput of files on bricks. '
+ 'if the block size and the count are not specified, '
+ 'it will give the output based on historical data'
+ )),
+ 'glusterVolumeTopWritePerf':
+ (serv.do_glusterVolumeTopWritePerf,
+ ('<volume_name> [block_size=<block_size>] '
+ '[count=<count>] [brick=<existing_brick>] '
+ '[list_count=<list_count>]\n\t'
+ '<volume_name> is existing volume name\n\t'
+ 'get list of write throughput of files on bricks'
+ )),
}
--
To view, visit http://gerrit.ovirt.org/7844
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I96486363a9acb7472014a67fcd2d5185d4f3c428
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Timothy Asir <tjeyasin(a)redhat.com>
Gerrit-Reviewer: Ayal Baron <abaron(a)redhat.com>
Gerrit-Reviewer: Bala.FA <barumuga(a)redhat.com>
Gerrit-Reviewer: Dan Kenigsberg <danken(a)redhat.com>
Gerrit-Reviewer: Federico Simoncelli <fsimonce(a)redhat.com>
Gerrit-Reviewer: Saggi Mizrahi <smizrahi(a)redhat.com>
Change in vdsm[master]: gluster: Setup and verify ssl connection between nodes.
by tjeyasin@redhat.com
Hello Ayal Baron, Bala.FA, Saggi Mizrahi, Dan Kenigsberg,
I'd like you to do a code review. Please visit
http://gerrit.ovirt.org/18355
to review the following change.
Change subject: gluster: Setup and verify ssl connection between nodes.
......................................................................
gluster: Setup and verify ssl connection between nodes.
This will be used in geo-replication session creation,
because there must be password-less ssh access between
at least one node of the master volume and one node of the
slave volume before a geo-replication session can be created.
The following new verbs are added:
*glusterValidateSshConnection
*glusterSetupSshConnection
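At the core of both verbs is an MD5 host-key fingerprint comparison; a
standalone sketch of that check (paramiko calls as used in this patch; the
key path and expected value are placeholders):

    import re
    import paramiko

    def fingerprintOf(key):
        # key: a paramiko PKey; returns 'aa:bb:...' (colon-separated MD5)
        hexdigest = paramiko.util.hexlify(key.get_fingerprint())
        return ':'.join(re.findall('..', hexdigest))

    key = paramiko.RSAKey.from_private_key_file('/path/to/key.pem')  # placeholder
    expected = 'aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99'     # placeholder
    if fingerprintOf(key).upper() != expected.upper():
        raise paramiko.SSHException('host key fingerprint mismatch')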
Change-Id: Ia6f040b1343998de4f8e28419c63e380240368db
Signed-off-by: Bala.FA <barumuga(a)redhat.com>
Signed-off-by: Timothy Asir <tjeyasin(a)redhat.com>
---
M client/vdsClientGluster.py
M vdsm.spec.in
M vdsm/gluster/api.py
M vdsm/gluster/exception.py
4 files changed, 191 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/55/18355/1
diff --git a/client/vdsClientGluster.py b/client/vdsClientGluster.py
index 90af83e..0ae7ec7 100644
--- a/client/vdsClientGluster.py
+++ b/client/vdsClientGluster.py
@@ -424,6 +424,30 @@
pp.pprint(status)
return status['status']['code'], status['status']['message']
+ def do_glusterValidateSshConnection(self, args):
+ params = self._eqSplit(args)
+ host = params.get('host', '')
+ fingerprint = params.get('fingerprint', '')
+ username = params.get('username', '')
+
+ status = self.s.glusterValidateSshConnection(host,
+ fingerprint,
+ username)
+ return status['status']['code'], status['status']['message']
+
+ def do_glusterSetupSshConnection(self, args):
+ params = self._eqSplit(args)
+ host = params.get('host', '')
+ fingerprint = params.get('fingerprint', '')
+ username = params.get('username', '')
+ password = params.get('password', '')
+
+ status = self.s.glusterSetupSshConnection(host,
+ fingerprint,
+ username,
+ password)
+ return status['status']['code'], status['status']['message']
+
def getGlusterCmdDict(serv):
return \
@@ -705,4 +729,15 @@
'not set'
'(swift, glusterd, smb, memcached)'
)),
+ 'glusterValidateSshConnection': (
+ serv.do_glusterValidateSshConnection,
+ ('host=<host> fingerprint=<fingerprint> username=<username>',
+ 'validate passwordless ssh connection'
+ )),
+ 'glusterSetupSshConnection': (
+ serv.do_glusterSetupSshConnection,
+ ('host=<host> fingerprint=<fingerprint> username=<username> '
+ 'password=<password>',
+ 'setup passwordless ssh connection'
+ )),
}
diff --git a/vdsm.spec.in b/vdsm.spec.in
index e2307e0..81d8f9f 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -524,6 +524,7 @@
Requires: glusterfs-fuse
Requires: glusterfs-rdma
Requires: python-magic
+Requires: python-paramiko
%description gluster
Gluster plugin enables VDSM to serve Gluster functionalities.
diff --git a/vdsm/gluster/api.py b/vdsm/gluster/api.py
index 4bd8308..1d93150 100644
--- a/vdsm/gluster/api.py
+++ b/vdsm/gluster/api.py
@@ -19,11 +19,28 @@
#
from functools import wraps
+import socket
+import paramiko
+import logging
+import os
+import re
from vdsm.define import doneCode
import supervdsm as svdsm
+from vdsm.config import config
+from vdsm import utils
+import exception as ge
_SUCCESS = {'status': doneCode}
+_KEYFILE = config.get('vars', 'trust_store_path') + '/keys/vdsmkey.pem'
+_sshKeyGenCommandPath = utils.CommandPath("ssh-keygen",
+ "/usr/bin/ssh-keygen",
+ )
+_SSH_COPY_ID_CMD = 'umask 077 && mkdir -p ~/.ssh && ' \
+ 'cat >> ~/.ssh/authorized_keys && if test -x /sbin/restorecon; ' \
+ 'then /sbin/restorecon ~/.ssh ~/.ssh/authorized_keys >/dev/null 2>&1; ' \
+ 'else true; fi'
+paramiko.util.get_logger('paramiko').setLevel(logging.ERROR)
GLUSTER_RPM_PACKAGES = (
('glusterfs', 'glusterfs'),
@@ -59,6 +76,57 @@
wrapper.exportAsVerb = True
return wrapper
+
+
+class VolumeStatus():
+ ONLINE = 'ONLINE'
+ OFFLINE = 'OFFLINE'
+
+
+class HostKeyMatchException(paramiko.SSHException):
+ def __init__(self, hostname, fingerprint, expected_fingerprint):
+ self.err = 'Fingerprint %s of host %s does not match with %s' % \
+ (fingerprint, hostname, expected_fingerprint)
+ paramiko.SSHException.__init__(self, self.err)
+ self.hostname = hostname
+ self.fingerprint = fingerprint
+ self.expected_fingerprint = expected_fingerprint
+
+
+class HostKeyMatchPolicy(paramiko.AutoAddPolicy):
+ def __init__(self, expected_fingerprint):
+ self.expected_fingerprint = expected_fingerprint
+
+ def missing_host_key(self, client, hostname, key):
+ s = paramiko.util.hexlify(key.get_fingerprint())
+ fingerprint = ':'.join(re.findall('..', s))
+ if fingerprint.upper() == self.expected_fingerprint.upper():
+ paramiko.AutoAddPolicy.missing_host_key(self, client, hostname,
+ key)
+ else:
+ raise HostKeyMatchException(hostname, fingerprint,
+ self.expected_fingerprint)
+
+
+class GlusterSsh(paramiko.SSHClient):
+ def __init__(self, hostname, fingerprint, port=22, username=None,
+ password=None, pkey=None, key_filenames=[], timeout=None,
+ allow_agent=True, look_for_keys=True, compress=False):
+ paramiko.SSHClient.__init__(self)
+ key_file_list = []
+ if os.path.exists(_KEYFILE):
+ key_file_list.append(_KEYFILE)
+ key_file_list.extend(key_filenames)
+ self.set_missing_host_key_policy(HostKeyMatchPolicy(fingerprint))
+ try:
+ paramiko.SSHClient.connect(self, hostname, port, username,
+ password, pkey, key_file_list, timeout,
+ allow_agent, look_for_keys, compress)
+ except socket.error, e:
+ err = ['%s: %s' % (hostname, e)]
+ raise ge.GlusterSshConnectionFailedException(err=err)
+ except HostKeyMatchException, e:
+ raise ge.GlusterSshHostKeyMismatchException(err=[e.err])
class GlusterApi(object):
@@ -287,6 +355,57 @@
status = self.svdsmProxy.glusterServicesGet(serviceNames)
return {'services': status}
+ def _validateSshConnection(self, hostname, fingerprint, username):
+ try:
+ ssh = GlusterSsh(hostname,
+ fingerprint,
+ username=username)
+ ssh.close()
+ return True
+ except paramiko.SSHException, e:
+ raise ge.GlusterSshHostKeyAuthException(err=[str(e)])
+
+ @exportAsVerb
+ def validateSshConnection(self, hostname, fingerprint, username,
+ options=None):
+ self._validateSshConnection(hostname, fingerprint, username)
+
+ @exportAsVerb
+ def setupSshConnection(self, hostname, fingerprint, username, password,
+ options=None):
+ rc, out, err = utils.execCmd([_sshKeyGenCommandPath.cmd, '-y', '-f',
+ _KEYFILE])
+ if rc != 0:
+ raise ge.GlusterSshPubKeyGenerationFailedException(rc=rc, err=err)
+
+ try:
+ ssh = GlusterSsh(hostname,
+ fingerprint,
+ username=username,
+ password=password)
+ c = ssh.get_transport().open_session()
+ c.exec_command(_SSH_COPY_ID_CMD)
+ stdin = c.makefile('wb')
+ stdout = c.makefile('rb')
+ stderr = c.makefile_stderr('rb')
+ stdin.write('\n'.join(out) + '\n')
+ stdin.flush()
+ stdin.close()
+ c.shutdown_write()
+ rc = c.recv_exit_status()
+ out = stdout.read().splitlines()
+ err = stderr.read().splitlines()
+ c.close()
+ ssh.close()
+ if rc != 0:
+ raise ge.GlusterSshSetupExecFailedException(rc=rc,
+ out=out,
+ err=err)
+ except paramiko.AuthenticationException, e:
+ raise ge.GlusterSshHostAuthException(err=[str(e)])
+
+ self._validateSshConnection(hostname, fingerprint, username)
+
def getGlusterMethods(gluster):
l = []
diff --git a/vdsm/gluster/exception.py b/vdsm/gluster/exception.py
index c569a9e..c9a0548 100644
--- a/vdsm/gluster/exception.py
+++ b/vdsm/gluster/exception.py
@@ -484,3 +484,39 @@
prefix = "%s: " % (action)
self.message = prefix + "Service action is not supported"
self.err = [self.message]
+
+
+# Ssh
+class GlusterSshException(GlusterException):
+ code = 4500
+ message = "Gluster ssh exception"
+
+
+class GlusterSshConnectionFailedException(GlusterSshException):
+ code = 4501
+ message = "SSH connection failed"
+
+
+class GlusterSshHostKeyMismatchException(GlusterSshException):
+ code = 4502
+ message = "Host key match failed"
+
+
+class GlusterSshHostKeyAuthException(GlusterSshException):
+ code = 4503
+ message = "SSH host key authentication failed"
+
+
+class GlusterSshHostAuthException(GlusterSshException):
+ code = 4504
+ message = "SSH host authentication failed"
+
+
+class GlusterSshPubKeyGenerationFailedException(GlusterSshException):
+ code = 4505
+ message = "SSH public key generation failed"
+
+
+class GlusterSshSetupExecFailedException(GlusterSshException):
+ code = 4506
+ message = "SSH key setup execution failed"
--
To view, visit http://gerrit.ovirt.org/18355
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia6f040b1343998de4f8e28419c63e380240368db
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Timothy Asir <tjeyasin(a)redhat.com>
Gerrit-Reviewer: Ayal Baron <abaron(a)redhat.com>
Gerrit-Reviewer: Bala.FA <barumuga(a)redhat.com>
Gerrit-Reviewer: Dan Kenigsberg <danken(a)redhat.com>
Gerrit-Reviewer: Saggi Mizrahi <smizrahi(a)redhat.com>
Change in vdsm[master]: do not use OOP for padding snapshot's memory volume
by ahadas@redhat.com
Arik Hadas has uploaded a new change for review.
Change subject: do not use OOP for padding snapshot's memory volume
......................................................................
do not use OOP for padding snapshot's memory volume
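The padding logic itself is unchanged by this patch; for context,
fileUtils.padToBlockSize appends zeros up to the next block boundary,
roughly along these lines (a sketch assuming 512-byte blocks, not the
actual implementation):

    import os

    BLOCK_SIZE = 512  # assumed block size

    def padToBlockSize(path):
        # append zero bytes so the file size is a multiple of BLOCK_SIZE
        remainder = os.stat(path).st_size % BLOCK_SIZE
        if remainder:
            with open(path, 'ab') as f:
                f.write('\0' * (BLOCK_SIZE - remainder))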
Change-Id: I2a94354e188019f3afd209633979ec5a5b35293b
Signed-off-by: Arik Hadas <ahadas(a)redhat.com>
---
M vdsm/virt/vm.py
1 file changed, 3 insertions(+), 9 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/38/26538/1
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 6711bc6..b8ce533 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -3543,15 +3543,9 @@
'_srcDomXML': self._dom.XMLDesc(0),
'elapsedTimeOffset': time.time() - self._startTime}
- def _padMemoryVolume(memoryVolPath, sdUUID):
- sdType = sd.name2type(
- self.cif.irs.getStorageDomainInfo(sdUUID)['info']['type'])
- if sdType in sd.FILE_DOMAIN_TYPES:
- if sdType == sd.NFS_DOMAIN:
- oop.getProcessPool(sdUUID).fileUtils. \
- padToBlockSize(memoryVolPath)
- else:
- fileUtils.padToBlockSize(memoryVolPath)
+ def _padMemoryVolume(memoryVolPath):
+ if not utils.isBlockDevice(memoryVolPath):
+ fileUtils.padToBlockSize(memoryVolPath)
snap = xml.dom.minidom.Element('domainsnapshot')
disks = xml.dom.minidom.Element('disks')
--
To view, visit http://gerrit.ovirt.org/26538
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2a94354e188019f3afd209633979ec5a5b35293b
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Arik Hadas <ahadas(a)redhat.com>
Change in vdsm[master]: clientIF: Remove unnecessary device is disk check
by sgotliv@redhat.com
Sergey Gotliv has uploaded a new change for review.
Change subject: clientIF: Remove unnecessary device is disk check
......................................................................
clientIF: Remove unnecessary device is disk check
Change-Id: I98317e805e6770df5dacd3237a383aaca78fde1e
Signed-off-by: Sergey Gotliv <sgotliv(a)redhat.com>
---
M vdsm/clientIF.py
1 file changed, 1 insertion(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/63/22363/1
diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index c083991..124f8e5 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -244,7 +244,7 @@
def prepareVolumePath(self, drive, vmId=None):
if type(drive) is dict:
# PDIV drive format
- if drive['device'] == 'disk' and vm.isVdsmImage(drive):
+ if vm.isVdsmImage(drive):
res = self.irs.prepareImage(
drive['domainID'], drive['poolID'],
drive['imageID'], drive['volumeID'])
--
To view, visit http://gerrit.ovirt.org/22363
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I98317e805e6770df5dacd3237a383aaca78fde1e
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Sergey Gotliv <sgotliv(a)redhat.com>
Change in vdsm[master]: Added gluster tag support in getAllTasks()
by barumuga@redhat.com
Hello Ayal Baron, Timothy Asir, Saggi Mizrahi, Federico Simoncelli, Dan Kenigsberg,
I'd like you to do a code review. Please visit
http://gerrit.ovirt.org/7579
to review the following change.
Change subject: Added gluster tag support in getAllTasks()
......................................................................
Added gluster tag support in getAllTasks()
If the tag parameter is empty, all tasks, including gluster tasks, are
returned; otherwise only tasks whose tags appear in the given tag list
are returned.
As the verbs below are not consumed by engine/RHS-C yet, it is OK to
introduce this compatibility difference now.
glusterVolumeRebalanceStart
glusterVolumeRebalanceStatus
glusterVolumeReplaceBrickStart
glusterVolumeReplaceBrickStatus
glusterVolumeRemoveBrickStart
glusterVolumeRemoveBrickStatus
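With the new signature, callers choose between the merged view and a
filtered one; a brief usage sketch (assuming `tm` is an already-constructed
TaskManager instance):

    allTasks = tm.getAllTasks()                     # public tasks plus gluster tasks
    glusterTasks = tm.getAllTasks(tag=['gluster'])  # only tasks tagged 'gluster'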
Change-Id: I9c765cbfebb5ba22f0d21efa04c824ea4daf6432
Signed-off-by: Bala.FA <barumuga(a)redhat.com>
---
M tests/gluster_cli_tests.py
M vdsm/gluster/cli.py
M vdsm/gluster/exception.py
M vdsm/storage/taskManager.py
4 files changed, 367 insertions(+), 95 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/79/7579/1
diff --git a/tests/gluster_cli_tests.py b/tests/gluster_cli_tests.py
index f442893..9c6357c 100644
--- a/tests/gluster_cli_tests.py
+++ b/tests/gluster_cli_tests.py
@@ -28,6 +28,7 @@
from gluster import cli as gcli
except ImportError:
pass
+import xml.etree.cElementTree as etree
class GlusterCliTests(TestCaseBase):
@@ -115,3 +116,74 @@
def test_parsePeerStatus(self):
self._parsePeerStatus_empty_test()
self._parsePeerStatus_test()
+
+ def _parseVolumeStatusAll_test(self):
+ out = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<cliOutput>
+ <opRet>0</opRet>
+ <opErrno>0</opErrno>
+ <opErrstr></opErrstr>
+ <volumes>
+ <volume>
+ <name>V1</name>
+ <id>03eace73-9197-49d0-a877-831bc6e9dac2</id>
+ <tasks>
+ <task>
+ <name>rebalance</name>
+ <id>12345473-9197-49d0-a877-831bc6e9dac2</id>
+ </task>
+ </tasks>
+ </volume>
+ <volume>
+ <name>V2</name>
+ <id>03eace73-1237-49d0-a877-831bc6e9dac2</id>
+ <tasks>
+ <task>
+ <name>replace-brick</name>
+ <id>12345473-1237-49d0-a877-831bc6e9dac2</id>
+ <sourceBrick>192.168.122.167:/tmp/V2-b1</sourceBrick>
+ <destBrick>192.168.122.168:/tmp/V2-b1</destBrick>
+ </task>
+ </tasks>
+ </volume>
+ <volume>
+ <name>V3</name>
+ <id>03eace73-1237-1230-a877-831bc6e9dac2</id>
+ <tasks>
+ <task>
+ <name>remove-brick</name>
+ <id>12345473-1237-1230-a877-831bc6e9dac2</id>
+ <BrickCount>2</BrickCount>
+ <brick>192.168.122.167:/tmp/V3-b1</brick>
+ <brick>192.168.122.168:/tmp/V3-b1</brick>
+ </task>
+ </tasks>
+ </volume>
+ </volumes>
+</cliOutput>"""
+ tree = etree.fromstring(out)
+ status = gcli._parseVolumeStatusAll(tree)
+ self.assertEquals(status,
+ {'12345473-1237-1230-a877-831bc6e9dac2':
+ {'bricks': ['192.168.122.167:/tmp/V3-b1',
+ '192.168.122.168:/tmp/V3-b1'],
+ 'taskType': 'remove-brick',
+ 'volumeId':
+ '03eace73-1237-1230-a877-831bc6e9dac2',
+ 'volumeName': 'V3'},
+ '12345473-1237-49d0-a877-831bc6e9dac2':
+ {'bricks': ['192.168.122.167:/tmp/V2-b1',
+ '192.168.122.168:/tmp/V2-b1'],
+ 'taskType': 'replace-brick',
+ 'volumeId':
+ '03eace73-1237-49d0-a877-831bc6e9dac2',
+ 'volumeName': 'V2'},
+ '12345473-9197-49d0-a877-831bc6e9dac2':
+ {'bricks': [],
+ 'taskType': 'rebalance',
+ 'volumeId':
+ '03eace73-9197-49d0-a877-831bc6e9dac2',
+ 'volumeName': 'V1'}})
+
+ def test_parseVolumeStatusAll(self):
+ self._parseVolumeStatusAll_test()
diff --git a/vdsm/gluster/cli.py b/vdsm/gluster/cli.py
index 95de106..1f464f6 100644
--- a/vdsm/gluster/cli.py
+++ b/vdsm/gluster/cli.py
@@ -84,6 +84,55 @@
raise ge.GlusterCmdFailedException(rc=rv, err=[msg])
+class TaskType:
+ REBALANCE = 'rebalance'
+ REPLACE_BRICK = 'replace-brick'
+ REMOVE_BRICK = 'remove-brick'
+
+
+def _parseVolumeStatusAll(tree):
+ """
+ returns {TaskId: {'volumeName': VolumeName,
+ 'volumeId': VolumeId,
+ 'taskType': TaskType,
+ 'bricks': BrickList}, ...}
+ """
+ tasks = {}
+ for el in tree.findall('volumes/volume'):
+ volumeName = el.find('name').text
+ volumeId = el.find('id').text
+ for c in el.findall('tasks/task'):
+ taskType = c.find('name').text
+ taskId = c.find('id').text
+ bricks = []
+ if taskType == TaskType.REPLACE_BRICK:
+ bricks.append(c.find('sourceBrick').text)
+ bricks.append(c.find('destBrick').text)
+ elif taskType == TaskType.REMOVE_BRICK:
+ for b in c.findall('brick'):
+ bricks.append(b.text)
+ elif taskType == TaskType.REBALANCE:
+ pass
+ tasks[taskId] = {'volumeName': volumeName,
+ 'volumeId': volumeId,
+ 'taskType': taskType,
+ 'bricks': bricks}
+ return tasks
+
+
+@exportToSuperVdsm
+def volumeStatusAll():
+ command = _getGlusterVolCmd() + ["status", "all"]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeStatusAllFailedException(rc=e.rc, err=e.err)
+ try:
+ return _parseVolumeStatusAll(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
+
+
def _parseVolumeInfo(out):
if not out[0].strip():
del out[0]
@@ -300,11 +349,15 @@
command.append("start")
if force:
command.append("force")
- rc, out, err = _execGluster(command)
- if rc:
- raise ge.GlusterVolumeRebalanceStartFailedException(rc, out, err)
- else:
- return True
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeRebalanceStartFailedException(rc=e.rc,
+ err=e.err)
+ try:
+ return {'taskId': xmltree.find('id').text}
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
@exportToSuperVdsm
@@ -312,84 +365,147 @@
command = _getGlusterVolCmd() + ["rebalance", volumeName, "stop"]
if force:
command.append('force')
- rc, out, err = _execGluster(command)
- if rc:
- raise ge.GlusterVolumeRebalanceStopFailedException(rc, out, err)
- else:
+ try:
+ _execGlusterXml(command)
return True
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeRebalanceStopFailedException(rc=e.rc,
+ err=e.err)
+
+
+class TaskStatus():
+ RUNNING = 'RUNNING'
+ FAILED = 'FAILED'
+ COMPLETED = 'COMPLETED'
+
+
+def _parseVolumeRebalanceRemoveBrickStatus(xmltree, mode):
+ """
+ returns {'taskId': UUID,
+ 'host': [{'name': NAME,
+ 'id': HOSTID,
+ 'filesScanned': INT,
+ 'filesMoved': INT,
+ 'filesFailed': INT,
+ 'totalSizeMoved': INT,
+ 'status': TaskStatus},...]
+ 'summary': {'filesScanned': INT,
+ 'filesMoved': INT,
+ 'filesFailed': INT,
+ 'totalSizeMoved': INT,
+ 'status': TaskStatus}}
+ """
+ if mode == 'rebalance':
+ tree = xmltree.find('volRebalance')
+ elif mode == 'remove-brick':
+ tree = xmltree.find('volRemoveBrick')
+ else:
+ return
+ status = \
+ {'taskId': tree.find('id').text,
+ 'summary': \
+ {'filesScanned': int(tree.find('summary/filesScanned').text),
+ 'filesMoved': int(tree.find('summary/filesMoved').text),
+ 'filesFailed': int(tree.find('summary/filesFailed').text),
+ 'totalSizeMoved': int(tree.find('summary/totalSizeMoved').text),
+ 'status': tree.find('summary/status').text},
+ 'host': []}
+ for el in tree.findall('node'):
+ status['host'].append({'name': el.find('name').text,
+ 'id': el.find('id').text,
+ 'filesScanned':
+ int(el.find('filesScanned').text),
+ 'filesMoved': int(el.find('filesMoved').text),
+ 'filesFailed': int(el.find('filesFailed').text),
+ 'totalSizeMoved':
+ int(el.find('totalSizeMoved').text),
+ 'status': el.find('status').text})
+ return status
+
+
+def _parseVolumeRebalanceStatus(tree):
+ return _parseVolumeRebalanceRemoveBrickStatus(tree, 'rebalance')
@exportToSuperVdsm
def volumeRebalanceStatus(volumeName):
- rc, out, err = _execGluster(_getGlusterVolCmd() + ["rebalance", volumeName,
- "status"])
- if rc:
- raise ge.GlusterVolumeRebalanceStatusFailedException(rc, out, err)
- if 'in progress' in out[0]:
- return BrickStatus.RUNNING, "\n".join(out)
- elif 'complete' in out[0]:
- return BrickStatus.COMPLETED, "\n".join(out)
- else:
- return BrickStatus.UNKNOWN, "\n".join(out)
+ command = _getGlusterVolCmd() + ["rebalance", volumeName, "status"]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeRebalanceStatusFailedException(rc=e.rc,
+ err=e.err)
+ try:
+ return _parseVolumeRebalanceStatus(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
@exportToSuperVdsm
def volumeReplaceBrickStart(volumeName, existingBrick, newBrick):
- rc, out, err = _execGluster(_getGlusterVolCmd() + ["replace-brick",
- volumeName,
- existingBrick, newBrick,
- "start"])
- if rc:
- raise ge.GlusterVolumeReplaceBrickStartFailedException(rc, out, err)
- else:
- return True
+ command = _getGlusterVolCmd() + ["replace-brick", volumeName,
+ existingBrick, newBrick, "start"]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeReplaceBrickStartFailedException(rc=e.rc,
+ err=e.err)
+ try:
+ return {'taskId': xmltree.find('id').text}
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
@exportToSuperVdsm
def volumeReplaceBrickAbort(volumeName, existingBrick, newBrick):
- rc, out, err = _execGluster(_getGlusterVolCmd() + ["replace-brick",
- volumeName,
- existingBrick, newBrick,
- "abort"])
- if rc:
- raise ge.GlusterVolumeReplaceBrickAbortFailedException(rc, out, err)
- else:
+ command = _getGlusterVolCmd() + ["replace-brick", volumeName,
+ existingBrick, newBrick, "abort"]
+ try:
+ _execGlusterXml(command)
return True
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeReplaceBrickAbortFailedException(rc=e.rc,
+ err=e.err)
@exportToSuperVdsm
def volumeReplaceBrickPause(volumeName, existingBrick, newBrick):
- rc, out, err = _execGluster(_getGlusterVolCmd() + ["replace-brick",
- volumeName,
- existingBrick, newBrick,
- "pause"])
- if rc:
- raise ge.GlusterVolumeReplaceBrickPauseFailedException(rc, out, err)
- else:
+ command = _getGlusterVolCmd() + ["replace-brick", volumeName,
+ existingBrick, newBrick, "pause"]
+ try:
+ _execGlusterXml(command)
return True
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeReplaceBrickPauseFailedException(rc=e.rc,
+ err=e.err)
+
+
+def _parseVolumeReplaceBrickStatus(tree):
+ """
+ returns {'taskId': UUID,
+ 'filesMoved': INT,
+ 'movingFile': STRING,
+ 'status': TaskStatus}}
+ """
+ return {'taskId': tree.find('volReplaceBrick/id').text,
+ 'filesMoved': int(tree.find('volReplaceBrick/filesMoved').text),
+ 'movingFile': tree.find('volReplaceBrick/movingFile').text,
+ 'status': tree.find('volReplaceBrick/status').text}
@exportToSuperVdsm
def volumeReplaceBrickStatus(volumeName, existingBrick, newBrick):
- rc, out, err = _execGluster(_getGlusterVolCmd() + ["replace-brick",
- volumeName,
- existingBrick, newBrick,
- "status"])
- if rc:
- raise ge.GlusterVolumeReplaceBrickStatusFailedException(rc, out,
- err)
- message = "\n".join(out)
- statLine = out[0].strip().upper()
- if BrickStatus.PAUSED in statLine:
- return BrickStatus.PAUSED, message
- elif statLine.endswith('MIGRATION COMPLETE'):
- return BrickStatus.COMPLETED, message
- elif statLine.startswith('NUMBER OF FILES MIGRATED'):
- return BrickStatus.RUNNING, message
- elif statLine.endswith("UNKNOWN"):
- return BrickStatus.UNKNOWN, message
- else:
- return BrickStatus.NA, message
+ command = _getGlusterVolCmd() + ["replace-brick", volumeName,
+ existingBrick, newBrick, "status"]
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeReplaceBrickStatusFailedException(rc=e.rc,
+ err=e.err)
+ try:
+ return _parseVolumeReplaceBrickStatus(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
@exportToSuperVdsm
@@ -399,12 +515,12 @@
existingBrick, newBrick, "commit"]
if force:
command.append('force')
- rc, out, err = _execGluster(command)
- if rc:
- raise ge.GlusterVolumeReplaceBrickCommitFailedException(rc, out,
- err)
- else:
+ try:
+ _execGlusterXml(command)
return True
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeReplaceBrickCommitFailedException(rc=e.rc,
+ err=e.err)
@exportToSuperVdsm
@@ -413,12 +529,15 @@
if replicaCount:
command += ["replica", "%s" % replicaCount]
command += brickList + ["start"]
-
- rc, out, err = _execGluster(command)
- if rc:
- raise ge.GlusterVolumeRemoveBrickStartFailedException(rc, out, err)
- else:
- return True
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeRemoveBrickStartFailedException(rc=e.rc,
+ err=e.err)
+ try:
+ return {'taskId': xmltree.find('id').text}
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
@exportToSuperVdsm
@@ -427,12 +546,16 @@
if replicaCount:
command += ["replica", "%s" % replicaCount]
command += brickList + ["stop"]
- rc, out, err = _execGluster(command)
-
- if rc:
- raise ge.GlusterVolumeRemoveBrickStopFailedException(rc, out, err)
- else:
+ try:
+ _execGlusterXml(command)
return True
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeRemoveBrickStopFailedException(rc=e.rc,
+ err=e.err)
+
+
+def _parseVolumeRemoveBrickStatus(tree):
+ return _parseVolumeRebalanceRemoveBrickStatus(tree, 'remove-brick')
@exportToSuperVdsm
@@ -441,12 +564,15 @@
if replicaCount:
command += ["replica", "%s" % replicaCount]
command += brickList + ["status"]
- rc, out, err = _execGluster(command)
-
- if rc:
- raise ge.GlusterVolumeRemoveBrickStatusFailedException(rc, out, err)
- else:
- return "\n".join(out)
+ try:
+ xmltree, out = _execGlusterXml(command)
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeRemoveBrickStatusFailedException(rc=e.rc,
+ err=e.err)
+ try:
+ return _parseVolumeRemoveBrickStatus(xmltree)
+ except:
+ raise ge.GlusterXmlErrorException(err=out)
@exportToSuperVdsm
@@ -455,12 +581,12 @@
if replicaCount:
command += ["replica", "%s" % replicaCount]
command += brickList + ["commit"]
- rc, out, err = _execGluster(command)
-
- if rc:
- raise ge.GlusterVolumeRemoveBrickCommitFailedException(rc, out, err)
- else:
+ try:
+ _execGlusterXml(command)
return True
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeRemoveBrickCommitFailedException(rc=e.rc,
+ err=e.err)
@exportToSuperVdsm
@@ -469,12 +595,12 @@
if replicaCount:
command += ["replica", "%s" % replicaCount]
command += brickList + ["force"]
- rc, out, err = _execGluster(command)
-
- if rc:
- raise ge.GlusterVolumeRemoveBrickForceFailedException(rc, out, err)
- else:
+ try:
+ _execGlusterXml(command)
return True
+ except ge.GlusterCmdFailedException, e:
+ raise ge.GlusterVolumeRemoveBrickForceFailedException(rc=e.rc,
+ err=e.err)
@exportToSuperVdsm
diff --git a/vdsm/gluster/exception.py b/vdsm/gluster/exception.py
index f4f497b..f209885 100644
--- a/vdsm/gluster/exception.py
+++ b/vdsm/gluster/exception.py
@@ -323,6 +323,11 @@
message = "Volume remove brick force failed"
+class GlusterVolumeStatusAllFailedException(GlusterVolumeException):
+ code = 4158
+ message = "Volume status all failed"
+
+
# Host
class GlusterHostException(GlusterException):
code = 4400
diff --git a/vdsm/storage/taskManager.py b/vdsm/storage/taskManager.py
index 3bc12f3..0a269cd 100644
--- a/vdsm/storage/taskManager.py
+++ b/vdsm/storage/taskManager.py
@@ -25,6 +25,12 @@
import storage_exception as se
from task import Task, Job, TaskCleanType
from threadPool import ThreadPool
+try:
+ from gluster import cli as gcli
+ from gluster import exception as ge
+ _glusterEnabled = True
+except ImportError:
+ _glusterEnabled = False
class TaskManager:
@@ -113,19 +119,82 @@
self.log.debug("Return: %s", subRes)
return subRes
- def getAllTasks(self):
+ def _getAllGlusterTasks(self):
"""
- Return Tasks for all public tasks.
+ Return all gluster tasks
+ """
+ subRes = {}
+ if not _glusterEnabled:
+ return subRes
+
+ for taskId, value in gcli.volumeStatusAll().iteritems():
+ msg = ''
+ state = ''
+ try:
+ if value['taskType'] == gcli.TaskType.REBALANCE:
+ status = gcli.volumeRebalanceStatus(value['volumeName'])
+ msg = ('Files [scanned: %d, moved: %d, failed: %d], '
+ 'Total size moved: %d') % \
+ (status['summary']['filesScanned'],
+ status['summary']['filesMoved'],
+ status['summary']['filesFailed'],
+ status['summary']['totalSizeMoved'])
+ state = status['summary']['status']
+ elif value['taskType'] == gcli.TaskType.REMOVE_BRICK:
+ status = gcli.volumeRemoveBrickStatus(value['volumeName'],
+ value['bricks'])
+ msg = ('Files [scanned: %d, moved: %d, failed: %d], '
+ 'Total size moved: %d') % \
+ (status['summary']['filesScanned'],
+ status['summary']['filesMoved'],
+ status['summary']['filesFailed'],
+ status['summary']['totalSizeMoved'])
+ state = status['summary']['status']
+ elif value['taskType'] == gcli.TaskType.REPLACE_BRICK:
+ status = gcli.volumeReplaceBrickStatus(value['volumeName'],
+ value['bricks'][0],
+ value['bricks'][1])
+ msg = 'Files moved: %d, Moving file: %s' % \
+ (status['filesMoved'], status['movingFile'])
+ state = status['status']
+ except ge.GlusterException:
+ self.log.error("gluster exception occured", exc_info=True)
+
+ subRes[taskId] = {"id": taskId,
+ "verb": value['volumeName'],
+ "state": state,
+ "code": value['taskType'],
+ "message": msg,
+ "result": '',
+ "tag": 'gluster'}
+ return subRes
+
+ def getAllTasks(self, tag=[]):
+ """
+ Return details for all public tasks if param tag is empty,
+ else return only tasks whose tags appear in param tag.
"""
self.log.debug("Entry.")
subRes = {}
for taskID, task in self._tasks.items():
try:
- subRes[taskID] = task.getDetails()
+ if not tag:
+ subRes[taskID] = task.getDetails()
+ elif task.getTags() in tag:
+ subRes[taskID] = task.getDetails()
except se.UnknownTask:
# Return info for existing tasks only.
self.log.warn("Unknown task %s. Maybe task was already "
"cleared.", taskID)
+
+ try:
+ if not tag:
+ subRes.update(self._getAllGlusterTasks())
+ elif 'gluster' in tag:
+ subRes.update(self._getAllGlusterTasks())
+ except ge.GlusterException:
+ self.log.error("gluster exception occured", exc_info=True)
+
self.log.debug("Return: %s", subRes)
return subRes
--
To view, visit http://gerrit.ovirt.org/7579
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I9c765cbfebb5ba22f0d21efa04c824ea4daf6432
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Bala.FA <barumuga(a)redhat.com>
Gerrit-Reviewer: Ayal Baron <abaron(a)redhat.com>
Gerrit-Reviewer: Dan Kenigsberg <danken(a)redhat.com>
Gerrit-Reviewer: Federico Simoncelli <fsimonce(a)redhat.com>
Gerrit-Reviewer: Saggi Mizrahi <smizrahi(a)redhat.com>
Gerrit-Reviewer: Timothy Asir <tjeyasin(a)redhat.com>
Change in vdsm[master]: multipath: Move all calls to multipath exe to a single method
by smizrahi@redhat.com
Saggi Mizrahi has uploaded a new change for review.
Change subject: multipath: Move all calls to multipath exe to a single method
......................................................................
multipath: Move all calls to multipath exe to a single method
This makes the code a bit cleaner
Change-Id: I52afc07a07a925ed7572eb369deb7c203edb04cd
Signed-off-by: Saggi Mizrahi <smizrahi(a)redhat.com>
---
M vdsm/storage/multipath.py
1 file changed, 11 insertions(+), 4 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/55/19255/1
diff --git a/vdsm/storage/multipath.py b/vdsm/storage/multipath.py
index 924d747..c31b5c3 100644
--- a/vdsm/storage/multipath.py
+++ b/vdsm/storage/multipath.py
@@ -94,6 +94,10 @@
)
+def _runCmd(args):
+ return misc.execCmd([constants.EXT_MULTIPATH] + args, sudo=True)
+
+
def rescan():
"""
Forces multipath daemon to rescan the list of available devices and
@@ -108,8 +112,8 @@
supervdsm.getProxy().forceScsiScan()
# Now let multipath daemon pick up new devices
- cmd = [constants.EXT_MULTIPATH, "-r"]
- misc.execCmd(cmd, sudo=True)
+
+ _runCmd("-r")
def isEnabled():
@@ -154,6 +158,10 @@
return False
+def flushAll():
+ _runCmd("-F")
+
+
def setupMultipath():
"""
Set up the multipath daemon configuration to the known and
@@ -173,8 +181,7 @@
raise se.MultipathSetupError()
misc.persistFile(MPATH_CONF)
- # Flush all unused multipath device maps
- misc.execCmd([constants.EXT_MULTIPATH, "-F"], sudo=True)
+ flushAll()
cmd = [constants.EXT_VDSM_TOOL, "service-reload", "multipathd"]
rc = misc.execCmd(cmd, sudo=True)[0]
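With the wrapper in place, every invocation of the multipath executable
goes through a single choke point. A minimal sketch of how a future
caller could use it (the listAll helper is hypothetical; -ll is
multipath's standard topology-listing flag):
def listAll():
    # list the current multipath topology via the shared wrapper
    rc, out, err = _runCmd(["-ll"])
    return out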
--
To view, visit http://gerrit.ovirt.org/19255
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I52afc07a07a925ed7572eb369deb7c203edb04cd
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <smizrahi(a)redhat.com>
Change in vdsm[master]: [WIP]Java Bindings: Proton support in Java Bindings
by smizrahi@redhat.com
Saggi Mizrahi has uploaded a new change for review.
Change subject: [WIP]Java Bindings: Proton support in Java Bindings
......................................................................
[WIP]Java Bindings: Proton support in Java Bindings
Change-Id: I94c52e118cb63d7df84b89a9b93da7b9e477be91
Signed-off-by: Saggi Mizrahi <smizrahi(a)redhat.com>
---
A client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonAuthenticator.java
A client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonClient.java
A client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonListener.java
A client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonReactor.java
A client/java/vdsm-json-rpc/src/test/java/org/ovirt/vdsm/jsonrpc/AmqpReactorTestHelper.java
A client/java/vdsm-json-rpc/src/test/java/org/ovirt/vdsm/jsonrpc/TestJsonRpcClientAMQP.java
6 files changed, 844 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/28/15428/1
diff --git a/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonAuthenticator.java b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonAuthenticator.java
new file mode 100644
index 0000000..35c9099
--- /dev/null
+++ b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonAuthenticator.java
@@ -0,0 +1,98 @@
+package org.ovirt.vdsm.reactors;
+
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
+import org.apache.qpid.proton.engine.Sasl.SaslState;
+import org.ovirt.vdsm.reactors.ProtonAuthenticator.AuthenticatorType;
+
+public final class ProtonAuthenticator {
+
+ public enum AuthenticatorType {
+
+ SERVER, CLIENT
+ }
+
+ public enum ConnectionState {
+
+ AUTHENTICATING, CONNECTED, FAILED
+ }
+ private ConnectionState _state;
+ final private AuthenticatorType _authType;
+ final private Connector<?> _connector;
+
+ public ProtonAuthenticator(Connector<?> connector,
+ AuthenticatorType authType) {
+ _authType = authType;
+ setState(ConnectionState.AUTHENTICATING);
+ _connector = connector;
+ final Sasl sasl = _connector.sasl();
+ if (authType == AuthenticatorType.CLIENT) {
+ sasl.setMechanisms(new String[]{"ANONYMOUS"});
+ sasl.client();
+ }
+ }
+
+ private void setState(ConnectionState state) {
+ _state = state;
+ }
+
+ public ConnectionState getState() {
+ return _state;
+ }
+
+ public void authenticate() {
+ final Sasl sasl = _connector.sasl();
+
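+ // pump the connector and step the SASL state machine until it reaches a terminal state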
+ while (true) {
+ try {
+ this._connector.process();
+ } catch (IOException ex) {
+ return;
+ }
+ final SaslState state = sasl.getState();
+ switch (state) {
+ case PN_SASL_CONF:
+ if (_authType == AuthenticatorType.SERVER) {
+ sasl.setMechanisms(new String[]{"ANONYMOUS"});
+ sasl.server();
+ }
+ break;
+ case PN_SASL_STEP:
+ if (_authType == AuthenticatorType.SERVER) {
+ final String[] mechs = sasl.getRemoteMechanisms();
+ if (mechs.length < 1) {
+ sasl.done(SaslOutcome.PN_SASL_AUTH);
+ break;
+ }
+
+ final String mech = mechs[0];
+ if (mech.equals("ANONYMOUS")) {
+ sasl.done(SaslOutcome.PN_SASL_OK);
+ } else {
+ sasl.done(SaslOutcome.PN_SASL_AUTH);
+ }
+ }
+ return;
+ case PN_SASL_PASS:
+ this.setState(ConnectionState.CONNECTED);
+ return;
+ case PN_SASL_FAIL:
+ this.setState(ConnectionState.FAILED);
+ return;
+ case PN_SASL_IDLE:
+
+ break;
+ default:
+ return;
+ }
+ }
+ }
+
+ public AuthenticatorType getAuthType() {
+ return _authType;
+ }
+}
diff --git a/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonClient.java b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonClient.java
new file mode 100644
index 0000000..4baffbf
--- /dev/null
+++ b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonClient.java
@@ -0,0 +1,224 @@
+package org.ovirt.vdsm.reactors;
+
+import java.nio.ByteBuffer;
+import java.util.Calendar;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+
+import javax.swing.event.EventListenerList;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.MessageFactory;
+import org.apache.qpid.proton.message.impl.MessageFactoryImpl;
+
+public final class ProtonClient implements ReactorClient {
+ private final ProtonReactor _reactor;
+ final private Session _ssn;
+
+ private Sender _sender;
+ private Receiver _receiver;
+
+ private final int _CREDIT = 10;
+ private final ConcurrentLinkedQueue<ByteBuffer> _outbox;
+ private final EventListenerList _eventListeners;
+ private final int _deliveryTimeoutSec;
+ private final MessageFactory _msgFactory;
+
+ public ProtonClient(ProtonReactor reactor, Session session) {
+ _ssn = session;
+ _sender = null;
+ _receiver = null;
+ _outbox = new ConcurrentLinkedQueue<>();
+ _eventListeners = new EventListenerList();
+ _deliveryTimeoutSec = 60 * 3;
+ _reactor = reactor;
+ _msgFactory = new MessageFactoryImpl();
+ }
+
+ @Override
+ public void addEventListener(EventListener el) {
+ synchronized (_eventListeners) {
+ _eventListeners.add(EventListener.class, el);
+ }
+ }
+
+ @Override
+ public void removeEventListener(EventListener el) {
+ synchronized (_eventListeners) {
+ _eventListeners.remove(EventListener.class, el);
+ }
+ }
+
+ private void emitOnMessageReceived(ByteBuffer message) {
+ synchronized (_eventListeners) {
+ final Class<EventListener> cls = EventListener.class;
+ final EventListener[] els = _eventListeners.getListeners(cls);
+ for (EventListener el : els) {
+ el.onMessageReceived(this, message);
+ }
+ }
+ }
+
+ @Override
+ public void sendMessage(ByteBuffer message) {
+ _outbox.add(message);
+ _reactor.wakeup();
+ }
+
+ public void addLink(Link link) {
+ assert (link.getSession().equals(_ssn));
+
+ if (link instanceof Sender) {
+ if (_sender != null) {
+ // already have a sender
+ link.close();
+ return;
+ }
+
+ _sender = (Sender) link;
+ } else {
+ assert (link instanceof Receiver);
+ if (_receiver != null) {
+ // already have a receiver
+ link.close();
+ return;
+ }
+
+ _receiver = (Receiver) link;
+ _receiver.flow(_CREDIT);
+ }
+ link.open();
+ }
+
+ private Message _popOutgoingMessage() {
+ final ByteBuffer data = _outbox.poll();
+ if (data == null) {
+ return null;
+ }
+
+ final Section body = new Data(Binary.create(data));
+ final Message msg = _msgFactory.createMessage();
+ msg.setBody(body);
+ msg.setAddress(_sender.getTarget().toString());
+ return msg;
+ }
+
+ public void queueDeliveries() {
+ if (_sender == null) {
+ final String uuid = UUID.randomUUID().toString();
+ _sender = _ssn.sender("Sender-" + uuid);
+ }
+
+ while (_sender.getCredit() > 0) {
+ final Message m = _popOutgoingMessage();
+ if (m == null) {
+ return;
+ }
+
+ final String uuid = UUID.randomUUID().toString();
+ final Delivery d = _sender
+ .delivery(("outgoing-" + uuid).getBytes());
+ d.setContext(m);
+ }
+ }
+
+ public void processDelivery(Delivery delivery) {
+ assert (_ssn.equals(delivery.getLink().getSession()));
+
+ if (delivery.isReadable()) {
+ _processIncomingDelivery(delivery);
+ } else {
+ assert (delivery.isWritable());
+ _processOutgoingDelivery(delivery);
+ }
+ }
+
+ private void _processOutgoingDelivery(Delivery delivery) {
+ final Sender link = (Sender) delivery.getLink();
+ assert (link.equals(_sender));
+
+ final Message msg = (Message) delivery.getContext();
+ // TBD: Buffer can be reused forever. Change in case of
+ // performance issues.
+ ByteBuffer buff;
+ int i = 1;
+ int written = 0;
+ do {
+ buff = ByteBuffer.allocate(i * 4096);
+ written = msg.encode(buff.array(), 0, buff.capacity());
+ i++;
+ } while (written == buff.capacity());
+
+ link.send(buff.array(), 0, written);
+ if (link.advance()) {
+ // Attach timeout to the delivery
+ final Calendar calendar = Calendar.getInstance();
+ calendar.add(Calendar.SECOND, _deliveryTimeoutSec);
+ delivery.setContext(calendar);
+ }
+ }
+
+ private void _processIncomingDelivery(Delivery delivery) {
+ int total = 0;
+ int read = 0;
+ ByteBuffer buff = ByteBuffer.allocate(4096);
+
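+ // recv() returns a negative value once the message is fully drained; double the buffer whenever it fills up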
+ while (read >= 0) {
+ total += read;
+ if (total >= buff.capacity()) {
+ final ByteBuffer buff2 = ByteBuffer
+ .allocate(buff.capacity() * 2);
+ buff2.put(buff);
+ buff = buff2;
+ }
+ read = _receiver.recv(buff.array(), total, buff.capacity() - total);
+ }
+
+ final Message msg = _msgFactory.createMessage();
+ msg.decode(buff.array(), 0, total);
+
+ assert (msg.getBody() instanceof Data);
+ final Data body = (Data) msg.getBody();
+ final ByteBuffer bb = body.getValue().asByteBuffer();
+ delivery.settle();
+ emitOnMessageReceived(bb);
+ }
+
+ @Override
+ public Future<Void> close() {
+ final Session ssn = _ssn;
+ return _reactor.queueOperation(new Callable<Void>() {
+ @Override
+ public Void call() {
+ ssn.close();
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public boolean closed() {
+ return _ssn.getLocalState().equals(EndpointState.CLOSED);
+ }
+
+ public void removeLink(Link link) {
+ if (link.equals(_sender)) {
+ _sender = null;
+ } else {
+ assert (link.equals(_receiver));
+ _receiver = null;
+ }
+ link.close();
+ }
+}
diff --git a/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonListener.java b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonListener.java
new file mode 100644
index 0000000..35896f4
--- /dev/null
+++ b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonListener.java
@@ -0,0 +1,42 @@
+package org.ovirt.vdsm.reactors;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+import org.apache.qpid.proton.driver.Listener;
+
+public final class ProtonListener implements ReactorListener {
+ private final EventListener _eventListener;
+ private Listener<ProtonListener> _listener;
+ private final ProtonReactor _reactor;
+
+ public ProtonListener(ProtonReactor reactor, EventListener eventListener) {
+ _eventListener = eventListener;
+ _reactor = reactor;
+ }
+
+ public void setListener(Listener<ProtonListener> l) {
+ _listener = l;
+ }
+
+ public void accept(ReactorClient client) {
+ _eventListener.onAcccept(this, client);
+ }
+
+ @Override
+ public Future<Void> close() {
+ final Listener<ProtonListener> l = _listener;
+ return _reactor.queueOperation(new Callable<Void>() {
+ @Override
+ public Void call() {
+ try {
+ l.close();
+ } catch (IOException e) {
+ // already closed
+ }
+ return null;
+ }
+ });
+ }
+}
diff --git a/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonReactor.java b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonReactor.java
new file mode 100644
index 0000000..b5a38b4
--- /dev/null
+++ b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/reactors/ProtonReactor.java
@@ -0,0 +1,452 @@
+package org.ovirt.vdsm.reactors;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import org.apache.qpid.proton.driver.Connector;
+import org.apache.qpid.proton.driver.Driver;
+import org.apache.qpid.proton.driver.Listener;
+import org.apache.qpid.proton.driver.impl.DriverFactoryImpl;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.EngineFactory;
+import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.engine.impl.EngineFactoryImpl;
+import org.ovirt.vdsm.reactors.ProtonAuthenticator.AuthenticatorType;
+import org.ovirt.vdsm.reactors.ProtonAuthenticator.ConnectionState;
+import org.ovirt.vdsm.util.ChainedOperation;
+import org.ovirt.vdsm.util.ReactorScheduler;
+
+public final class ProtonReactor implements Reactor {
+
+ private final Driver _driver;
+ private final ReactorScheduler _scheduler;
+ private boolean _isRunning;
+ final Object _syncRoot = new Object();
+ final ProtonReactor reactor = this;
+ private EngineFactory _engineFactory;
+
+ public boolean isRunning() {
+ return _isRunning;
+ }
+
+ public ProtonReactor() throws IOException {
+ _engineFactory = new EngineFactoryImpl();
+ _driver = new DriverFactoryImpl().createDriver();
+ _isRunning = false;
+ _scheduler = new ReactorScheduler();
+ }
+
+ @Override
+ public void finalize() throws Throwable {
+ try {
+ _driver.destroy();
+ } finally {
+ super.finalize();
+ }
+ }
+
+ // Creates a listener, returns null if failed to bind or reactor is not
+ // running;
+ @Override
+ public Future<ReactorListener> createListener(final String host,
+ final int port,
+ final ReactorListener.EventListener eventListener) {
+
+ return queueOperation(new Callable<ReactorListener>() {
+ @Override
+ public ProtonListener call() {
+
+ final ProtonListener listener = new ProtonListener(reactor, eventListener);
+ final Listener<ProtonListener> l = _driver.createListener(host,
+ port, listener);
+
+ if (l == null) {
+ return null;
+ }
+
+ listener.setListener(l);
+
+ return listener;
+ }
+ });
+ }
+
+ @Override
+ public Future<ReactorClient> createClient(final String host, final int port) {
+ final Driver driver = _driver;
+ final EngineFactory engineFactory = _engineFactory;
+
+ return queueOperation(new ChainedOperation.Operation<ReactorClient>() {
+ final private int _INIT = 1;
+ final private int _AUTHENTICATE = 2;
+ final private int _DONE = 3;
+ private int _state;
+ final private Driver _driver;
+ final private ProtonReactor _reactor;
+ private Connector<ProtonAuthenticator> _connector;
+ private ProtonAuthenticator _auth;
+ private boolean _done;
+ private boolean _cancelled;
+ private ReactorClient _result;
+ private EngineFactory _engineFactory;
+
+ {
+ _driver = driver;
+ _reactor = reactor;
+ _state = _INIT;
+ _done = false;
+ _cancelled = false;
+ _engineFactory = engineFactory;
+ }
+
+ @Override
+ public void call(final boolean cancelled) {
+ switch (_state) {
+ case _INIT:
+ if (cancelled) {
+ _cancelled = true;
+ _done = true;
+ return;
+ }
+
+ _connector = this._driver.createConnector(host, port, null);
+
+ final Connection connection = engineFactory.createConnection();
+ _connector.setConnection(connection);
+ _auth = new ProtonAuthenticator(_connector,
+ AuthenticatorType.CLIENT);
+ _connector.setContext(_auth);
+ connection.open();
+ _state = _AUTHENTICATE;
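+ // no break: fall through and start authenticating immediately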
+ case _AUTHENTICATE:
+ if (cancelled) {
+ _cancelled = true;
+ _close();
+ return;
+ }
+
+ switch (_auth.getState()) {
+ case AUTHENTICATING:
+ _auth.authenticate();
+ try {
+ _connector.process();
+ } catch (IOException e) {
+ // ignore
+ }
+ return;
+ case FAILED:
+ _close();
+ return;
+ case CONNECTED:
+ // Success !
+ break;
+ }
+
+ Session ssn = _connector.getConnection().session();
+ ssn.open();
+ _result = new ProtonClient(_reactor, ssn);
+ ssn.setContext(_result);
+ _done = true;
+ _state = _DONE;
+ }
+ }
+
+ private void _close() {
+ _connector.getConnection().close();
+ _connector.close();
+ _done = true;
+ _result = null;
+ }
+
+ @Override
+ public boolean isDone() {
+ return _done;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return _cancelled;
+ }
+
+ @Override
+ public ReactorClient getResult() {
+ return _result;
+ }
+ });
+ }
+
+ // Queues operation to be run in the serving loop.
+ public <T> Future<T> queueOperation(Callable<T> cb) {
+ final FutureTask<T> task = new FutureTask<>(cb);
+ _queueFuture(task);
+ return task;
+ }
+
+ public <T> Future<T> queueOperation(ChainedOperation.Operation<T> op) {
+ final ChainedOperation<T> task = new ChainedOperation<>(op);
+ _queueFuture(task);
+ return task;
+ }
+
+ private void _queueFuture(Future<?> op) {
+ synchronized (_scheduler) {
+ _scheduler.queueFuture(op);
+ wakeup();
+ }
+ }
+
+ private void _waitEvents() {
+ _driver.doWait(0);
+ }
+
+ public void wakeup() {
+ _driver.wakeup();
+ }
+
+ @Override
+ public void serve() {
+ synchronized (_syncRoot) {
+ _isRunning = true;
+ }
+
+ while (_isRunning) {
+ //_waitEvents();
+ synchronized (_scheduler) {
+ _scheduler.performPendingOperations();
+ }
+ _acceptConnectionRequests();
+ _processConnectors();
+ }
+ }
+
+ private void _processConnectors() {
+ for (Connector<?> connector = _driver.connector(); connector != null; connector = _driver
+ .connector()) {
+ if (connector.isClosed()) {
+ connector.destroy();
+ continue;
+ }
+
+ try {
+ connector.process();
+ } catch (IOException e) {
+ continue;
+ }
+
+ final Object ctx = connector.getContext();
+ assert (ctx instanceof ProtonAuthenticator);
+
+ if (ctx instanceof ProtonAuthenticator) {
+ final ProtonAuthenticator auth = (ProtonAuthenticator) ctx;
+ ConnectionState cs = auth.getState();
+ if (cs.equals(ConnectionState.AUTHENTICATING)) {
+ auth.authenticate();
+ cs = auth.getState();
+ }
+
+ if (cs.equals(ConnectionState.CONNECTED)) {
+ if (connector.getConnection() == null) {
+ connector.setConnection(_engineFactory.createConnection());
+ }
+ _processConnector(connector);
+ }
+ }
+
+ try {
+ connector.process();
+ } catch (IOException e) {
+ continue;
+ }
+ }
+ }
+
+ private void _processConnector(Connector<?> connector) {
+ _initConnection(connector);
+ _openPendingSessions(connector);
+ _openLinks(connector);
+ _queueOutgoingDeliveries(connector);
+ _processDeliveries(connector);
+ _cleanDeliveries(connector);
+ _cleanLinks(connector);
+ _cleanSessions(connector);
+ }
+
+ private void _cleanSessions(Connector<?> connector) {
+ final Connection conn = connector.getConnection();
+ final EnumSet<EndpointState> localState = EnumSet
+ .of(EndpointState.ACTIVE);
+ final EnumSet<EndpointState> remoteState = EnumSet
+ .of(EndpointState.CLOSED);
+
+ for (Session ssn = conn.sessionHead(localState, remoteState); ssn != null; ssn = ssn
+ .next(localState, remoteState)) {
+
+ ssn.close();
+ }
+ }
+
+ private void _cleanLinks(Connector<?> connector) {
+ final Connection conn = connector.getConnection();
+ final EnumSet<EndpointState> localState = EnumSet
+ .of(EndpointState.ACTIVE);
+ final EnumSet<EndpointState> remoteState = EnumSet
+ .of(EndpointState.CLOSED);
+
+ for (Link link = conn.linkHead(localState, remoteState); link != null; link = link
+ .next(localState, remoteState)) {
+
+ final ProtonClient ssn = _getClient(link.getSession());
+ ssn.removeLink(link);
+ }
+ }
+
+ private void _cleanDeliveries(Connector<?> connector) {
+ final Connection conn = connector.getConnection();
+ final EnumSet<EndpointState> localState = EnumSet
+ .of(EndpointState.ACTIVE);
+ final EnumSet<EndpointState> remoteState = EnumSet
+ .allOf(EndpointState.class);
+ for (Link link = conn.linkHead(localState, remoteState); link != null; link = link
+ .next(localState, remoteState)) {
+
+ if (link instanceof Receiver) {
+ // We settle all incoming deliveries upon receive
+ continue;
+ }
+
+ Delivery d;
+ final Calendar now = Calendar.getInstance();
+ for (Iterator<Delivery> iter = link.unsettled(); iter.hasNext();) {
+ d = iter.next();
+ Object ctx = d.getContext();
+ if (!(ctx instanceof Calendar)) {
+ // Has not been sent yet
+ continue;
+ }
+
+ final Calendar timeout = (Calendar) ctx;
+ boolean remoteClosed = link.getRemoteState().equals(
+ EndpointState.CLOSED);
+ boolean timedOut = now.after(timeout);
+ if (d.remotelySettled() || timedOut || remoteClosed) {
+ d.settle();
+ d.free();
+ }
+ }
+
+ }
+
+ }
+
+ private void _processDeliveries(Connector<?> connector) {
+ final Connection conn = connector.getConnection();
+ for (Delivery delivery = conn.getWorkHead(); delivery != null; delivery = delivery
+ .getWorkNext()) {
+
+ final ProtonClient client = _getClient(delivery.getLink()
+ .getSession());
+ client.processDelivery(delivery);
+ }
+ }
+
+ private void _queueOutgoingDeliveries(Connector<?> connector) {
+ final Connection conn = connector.getConnection();
+
+ final EnumSet<EndpointState> localState = EnumSet
+ .of(EndpointState.ACTIVE);
+ final EnumSet<EndpointState> remoteState = EnumSet
+ .allOf(EndpointState.class);
+
+ for (Session ssn = conn.sessionHead(localState, remoteState); ssn != null; ssn = ssn
+ .next(localState, remoteState)) {
+
+ final ProtonClient client = _getClient(ssn);
+ client.queueDeliveries();
+ }
+ }
+
+ private void _openLinks(Connector<?> connector) {
+ final Connection conn = connector.getConnection();
+ final EnumSet<EndpointState> localState = EnumSet
+ .of(EndpointState.UNINITIALIZED);
+ final EnumSet<EndpointState> remoteState = EnumSet
+ .allOf(EndpointState.class);
+ for (Link link = conn.linkHead(localState, remoteState); link != null; link = link
+ .next(localState, remoteState)) {
+
+ // configure the link
+ link.setSource(link.getRemoteSource());
+ link.setTarget(link.getRemoteTarget());
+
+ final ProtonClient client = _getClient(link.getSession());
+ client.addLink(link);
+ }
+ }
+
+ private ProtonClient _getClient(Session ssn) {
+ return (ProtonClient) ssn.getContext();
+ }
+
+ private void _openPendingSessions(Connector<?> connector) {
+ final Connection conn = connector.getConnection();
+ final EnumSet<EndpointState> localState = EnumSet
+ .of(EndpointState.UNINITIALIZED);
+ final EnumSet<EndpointState> remoteState = EnumSet
+ .allOf(EndpointState.class);
+
+ for (Session ssn = conn.sessionHead(localState, remoteState); ssn != null; ssn = ssn
+ .next(localState, remoteState)) {
+
+ final ProtonClient client = new ProtonClient(this, ssn);
+ ssn.setContext(client);
+ final Object ctx = connector.getContext();
+ assert (ctx instanceof ProtonAuthenticator);
+ ProtonAuthenticator auth = (ProtonAuthenticator) ctx;
+ if (auth.getAuthType() == AuthenticatorType.SERVER) {
+ ssn.open();
+ final ProtonListener l = (ProtonListener) ctx;
+ l.accept(client);
+ } else {
+ ssn.close();
+ }
+ }
+ }
+
+ private void _initConnection(Connector<?> connector) {
+ final Connection conn = connector.getConnection();
+ if (conn.getLocalState().equals(EndpointState.UNINITIALIZED)) {
+ conn.open();
+ }
+ }
+
+ private void _acceptConnectionRequests() {
+ for (final Listener<?> l : _driver.listeners()) {
+
+ @SuppressWarnings("unchecked")
+ final Connector<ProtonAuthenticator> connector = (Connector<ProtonAuthenticator>) l
+ .accept();
+ if (connector == null) {
+ continue;
+ }
+ connector.setContext(new ProtonAuthenticator(connector,
+ AuthenticatorType.SERVER));
+ }
+ }
+
+ public void stop() {
+ synchronized (_syncRoot) {
+ _isRunning = false;
+ }
+
+ wakeup();
+ }
+}
\ No newline at end of file
diff --git a/client/java/vdsm-json-rpc/src/test/java/org/ovirt/vdsm/jsonrpc/AmqpReactorTestHelper.java b/client/java/vdsm-json-rpc/src/test/java/org/ovirt/vdsm/jsonrpc/AmqpReactorTestHelper.java
new file mode 100644
index 0000000..46d9cc3
--- /dev/null
+++ b/client/java/vdsm-json-rpc/src/test/java/org/ovirt/vdsm/jsonrpc/AmqpReactorTestHelper.java
@@ -0,0 +1,18 @@
+package org.ovirt.vdsm.jsonrpc;
+
+import java.io.IOException;
+import org.ovirt.vdsm.reactors.ProtonReactor;
+import org.ovirt.vdsm.reactors.Reactor;
+
+public class AmqpReactorTestHelper implements ReactorTestHelper {
+ @Override
+ public Reactor createReactor() throws IOException {
+ return new ProtonReactor();
+ }
+
+ @Override
+ public String getUriScheme() {
+ return "amqp";
+ }
+
+}
diff --git a/client/java/vdsm-json-rpc/src/test/java/org/ovirt/vdsm/jsonrpc/TestJsonRpcClientAMQP.java b/client/java/vdsm-json-rpc/src/test/java/org/ovirt/vdsm/jsonrpc/TestJsonRpcClientAMQP.java
new file mode 100644
index 0000000..9e0c24c
--- /dev/null
+++ b/client/java/vdsm-json-rpc/src/test/java/org/ovirt/vdsm/jsonrpc/TestJsonRpcClientAMQP.java
@@ -0,0 +1,10 @@
+package org.ovirt.vdsm.jsonrpc;
+
+public class TestJsonRpcClientAMQP extends TestJsonRpcClient {
+
+ @Override
+ protected ReactorTestHelper getHelper() {
+ return new AmqpReactorTestHelper();
+ }
+
+}
--
To view, visit http://gerrit.ovirt.org/15428
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I94c52e118cb63d7df84b89a9b93da7b9e477be91
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <smizrahi(a)redhat.com>
Change in vdsm[master]: [WIP] vdsm: add support for PCI passthrough
by mpoledni@redhat.com
Martin Polednik has uploaded a new change for review.
Change subject: [WIP] vdsm: add support for PCI passthrough
......................................................................
[WIP] vdsm: add support for PCI passthrough
required functionality:
* report PCI devices available on host [x]
* handle createVm xml generation [x]
* hotplugHostdev [ ] (required after migration)
* hotunplugHostdev [ ] (required for migration)
Change-Id: I363d2622d72ca2db75f60032fe0892c348bab121
Signed-off-by: Martin Polednik <mpoledni(a)redhat.com>
---
M lib/vdsm/define.py
M vdsm/caps.py
M vdsm/vm.py
3 files changed, 83 insertions(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/62/22462/1
diff --git a/lib/vdsm/define.py b/lib/vdsm/define.py
index eb78633..9605f93 100644
--- a/lib/vdsm/define.py
+++ b/lib/vdsm/define.py
@@ -132,6 +132,12 @@
'transientErr': {'status': {
'code': 59,
'message': 'Action not permitted on a VM with transient disks'}},
+ 'hotplugHostdev': {'status': {
+ 'code': 60,
+ 'message': 'Failed to hotplug hostdev'}},
+ 'hotunplugHostdev': {'status': {
+ 'code': 61,
+ 'message': 'Failed to hotunplug hostdev'}},
'recovery': {'status': {
'code': 99,
'message': 'Recovering from crash or Initializing'}},
diff --git a/vdsm/caps.py b/vdsm/caps.py
index 3839134..d6af375 100644
--- a/vdsm/caps.py
+++ b/vdsm/caps.py
@@ -308,6 +308,38 @@
return dict(release=release, version=version, name=osname)
+def hostdevList():
+ devices = []
+ for device in libvirtconnection.get().listAllDevices():
+ devXML = minidom.parseString(device.XMLDesc())
+ dev = {}
+
+ # we have to grab attributes that will most likely not only
+ # uniquely identify the device, but also serve as a human readable
+ # representation of it
+ try:
+ dev['name'] = devXML.getElementsByTagName('name')[0].\
+ childNodes[0].data
+ capability = devXML.getElementsByTagName('capability')[0]
+ try:
+ dev['product'] = capability.getElementsByTagName('product')[0]\
+ .childNodes[0].data
+ dev['vendor'] = capability.getElementsByTagName('vendor')[0].\
+ childNodes[0].data
+ except IndexError:
+ # although the retrieval of product/vendor was not successful,
+ # we can still report back the name
+ pass
+ except IndexError:
+ # should a device not have a name, there is nothing the engine could
+ # send back that we could use to uniquely identify and initialize
+ # the device
+ continue
+
+ devices.append(dev)
+
+ return devices
+
+
def get():
targetArch = platform.machine()
@@ -360,6 +392,7 @@
config.getint('vars', 'extra_mem_reserve'))
caps['guestOverhead'] = config.get('vars', 'guest_ram_overhead')
caps['rngSources'] = _getRngSources()
+ caps['hostDevices'] = hostdevList()
return caps
diff --git a/vdsm/vm.py b/vdsm/vm.py
index a5d923b..a477bc9 100644
--- a/vdsm/vm.py
+++ b/vdsm/vm.py
@@ -78,6 +78,7 @@
WATCHDOG_DEVICES = 'watchdog'
CONSOLE_DEVICES = 'console'
SMARTCARD_DEVICES = 'smartcard'
+HOSTDEV_DEVICES = 'hostdev'
def isVdsmImage(drive):
@@ -1656,6 +1657,27 @@
return m
+class HostDevice(VmDevice):
+ def getXML(self):
+ """
+ Create domxml for a hostdev device.
+
+ <devices>
+ <hostdev mode='subsystem' type='usb'>
+ <source startupPolicy='optional'>
+ <vendor id='0x1234'/>
+ <product id='0xbeef'/>
+ </source>
+ <boot order='2'/>
+ </hostdev>
+ </devices>
+ """
+ # libvirt gives us a direct API call to retrieve the device XML
+ return xml.dom.minidom.parseString(libvirtconnection.get().
+ nodeDeviceLookupByName(self.name).
+ XMLDesc())
+
+
class WatchdogDevice(VmDevice):
def __init__(self, *args, **kwargs):
super(WatchdogDevice, self).__init__(*args, **kwargs)
@@ -1769,7 +1791,8 @@
(CONSOLE_DEVICES, ConsoleDevice),
(REDIR_DEVICES, RedirDevice),
(RNG_DEVICES, RngDevice),
- (SMARTCARD_DEVICES, SmartCardDevice))
+ (SMARTCARD_DEVICES, SmartCardDevice),
+ (HOSTDEV_DEVICES, HostDevice))
def _makeDeviceDict(self):
return dict((dev, []) for dev, _ in self.DeviceMapping)
@@ -3127,6 +3150,26 @@
break
+ def hotplugHostdev(self, params):
+ hostdev = HostDevice(self.conf, self.log, **params)
+ self._devices[HOSTDEV_DEVICES].append(hostdev)
+ hostdevXML = hostdev.getXML().toprettyxml(encoding='utf-8')
+ hostdev._deviceXML = hostdevXML
+ self.log.debug("Hotplug hostdev xml: %s", hostdevXML)
+
+ try:
+ self._dom.attachDevice(hostdevXML)
+ except libvirt.libvirtError as e:
+ self.log.error("Hotplug failed", exc_info=True)
+ if e.get_error_code() == libvirt.VIR_ERR_NO_DOMAIN:
+ return errCode['noVM']
+ return {'status': {'code':
+ errCode['hotplugHostdev']['status']['code'],
+ 'message': e.message}}
+ return {'status': doneCode}
+
+ def hotunplugHostdev(self, name):
+ pass
+
def hotplugNic(self, params):
if self.isMigrating():
return errCode['migInProgress']
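For illustration, a single entry returned by hostdevList() might look
like the sketch below (the values are made up; the product and vendor
keys are simply absent when the device XML does not carry them):
{'name': 'pci_0000_00_19_0',
 'product': '82579LM Gigabit Network Connection',
 'vendor': 'Intel Corporation'}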
--
To view, visit http://gerrit.ovirt.org/22462
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I363d2622d72ca2db75f60032fe0892c348bab121
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Martin Polednik <mpoledni(a)redhat.com>