Dan Kenigsberg has uploaded a new change for review.
Change subject: pep8: more semi-automatic fixes
......................................................................
pep8: more semi-automatic fixes
Change-Id: Ifbb36be20c77f714246279d1ae9067353bced047
Signed-off-by: Dan Kenigsberg <danken(a)redhat.com>
---
M Makefile.am
M vdsm/storage/blockSD.py
M vdsm/storage/dispatcher.py
M vdsm/storage/image.py
M vdsm/storage/iscsi.py
M vdsm/storage/resourceManager.py
M vdsm/storage/sp.py
M vdsm/storage/storage_mailbox.py
M vdsm/storage/sync.py
M vdsm/storage/threadLocal.py
M vdsm/storage/threadPool.py
M vdsm/storage/volume.py
12 files changed, 203 insertions(+), 197 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/06/9606/1
diff --git a/Makefile.am b/Makefile.am
index 40f96a8..489eb6b 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -52,6 +52,7 @@
vdsm-tool \
vdsm/*.py \
vdsm/*.py.in \
+ vdsm/storage/__init__.py \
vdsm/storage/blockVolume.py \
vdsm/storage/devicemapper.py \
vdsm/storage/domainMonitor.py \
@@ -71,12 +72,19 @@
vdsm/storage/nfsSD.py \
vdsm/storage/outOfProcess.py \
vdsm/storage/persistentDict.py \
+ vdsm/storage/remoteFileHandler.py \
vdsm/storage/resourceFactories.py \
+ vdsm/storage/safelease.py \
vdsm/storage/sd.py \
+ vdsm/storage/sdc.py \
vdsm/storage/securable.py \
+ vdsm/storage/storageConstants.py \
+ vdsm/storage/storageServer.py \
vdsm/storage/storage_exception.py \
+ vdsm/storage/sync.py \
vdsm/storage/task.py \
vdsm/storage/taskManager.py \
+ vdsm/storage/threadLocal.py \
vdsm/storage/volume.py \
vdsm/vdsm \
vdsm_api \
diff --git a/vdsm/storage/blockSD.py b/vdsm/storage/blockSD.py
index bb98b60..141754c 100644
--- a/vdsm/storage/blockSD.py
+++ b/vdsm/storage/blockSD.py
@@ -56,7 +56,7 @@
MASTERLV = "master"
SPECIAL_LVS = (sd.METADATA, sd.LEASES, sd.IDS, sd.INBOX, sd.OUTBOX, MASTERLV)
-MASTERLV_SIZE = "1024" #In MiB = 2 ** 20 = 1024 ** 2 => 1GiB
+MASTERLV_SIZE = "1024" # In MiB = 2 ** 20 = 1024 ** 2 => 1GiB
BlockSDVol = namedtuple("BlockSDVol", "name, image, parent")
log = logging.getLogger("Storage.BlockSD")
@@ -70,11 +70,11 @@
# VG's metadata size in MiB
VG_METADATASIZE = 128
-MAX_PVS_LIMIT = 10 # BZ#648051
+MAX_PVS_LIMIT = 10 # BZ#648051
MAX_PVS = config.getint('irs', 'maximum_allowed_pvs')
if MAX_PVS > MAX_PVS_LIMIT:
- log.warning("maximum_allowed_pvs = %d ignored. MAX_PVS = %d", MAX_PVS,
MAX_PVS_LIMIT)
- MAX_PVS = MAX_PVS_LIMIT
+ log.warning("maximum_allowed_pvs = %d ignored. MAX_PVS = %d", MAX_PVS,
MAX_PVS_LIMIT)
+ MAX_PVS = MAX_PVS_LIMIT
PVS_METADATA_SIZE = MAX_PVS * 142
@@ -88,6 +88,7 @@
VERS_METADATA_LV = (0,)
VERS_METADATA_TAG = (2, 3)
+
def encodePVInfo(pvInfo):
return (
@@ -117,9 +118,11 @@
INVALID_CHARS = re.compile(r"[^a-zA-Z0-9_+.\-/=!:#]")
LVM_ENC_ESCAPE = re.compile("&(\d+)&")
+
# Move to lvm
def lvmTagEncode(s):
return INVALID_CHARS.sub(lambda c: "&%s&" % ord(c.group()), s)
+
def lvmTagDecode(s):
return LVM_ENC_ESCAPE.sub(lambda c: unichr(int(c.groups()[0])), s)
@@ -142,7 +145,7 @@
else:
if lv.name not in SPECIAL_LVS:
log.warning("Ignoring Volume %s that lacks minimal tag set"
- "tags %s" % (lv.name, lv.tags))
+ "tags %s" % (lv.name, lv.tags))
return vols
@@ -151,7 +154,7 @@
Return dict {volUUID: ((imgUUIDs,), parentUUID)} of the domain.
imgUUIDs is a list of all images dependant on volUUID.
- For template based volumes, the first image is the template's image.
+ For template based volumes, the first image is the template's image.
For other volumes, there is just a single imageUUID.
Template self image is the 1st term in template volume entry images.
"""
@@ -183,10 +186,10 @@
This function requires an active LV.
"""
dm = lvm.lvDmDev(sdUUID, volUUID)
- size = multipath.getDeviceSize(dm) # Bytes
+ size = multipath.getDeviceSize(dm) # Bytes
# TODO: Change for zero 128 M chuncks and log.
# 128 M is the vdsm extent size default
- BS = constants.MEGAB # 1024 ** 2 = 1 MiB
+ BS = constants.MEGAB # 1024 ** 2 = 1 MiB
count = size / BS
cmd = tuple(constants.CMD_LOWPRIO)
cmd += (constants.EXT_DD, "oflag=%s" % misc.DIRECTFLAG,
"if=/dev/zero",
@@ -200,7 +203,7 @@
ProcVol = namedtuple("ProcVol", "proc, vol")
# Put a sensible value for dd zeroing a 128 M or 1 G chunk and lvremove
# spent time.
- ZEROING_TIMEOUT = 60000 # [miliseconds]
+ ZEROING_TIMEOUT = 60000 # [milliseconds]
log.debug("sd: %s, LVs: %s, img: %s", sdUUID, volUUIDs, imgUUID)
# Prepare for zeroing
try:
@@ -232,7 +235,7 @@
# Wait until all the asyncs procs return
# Yes, this is a potentially infinite loop. Kill the vdsm task.
while zerofds:
- fdevents = poller.poll(ZEROING_TIMEOUT) # [(fd, event)]
+ fdevents = poller.poll(ZEROING_TIMEOUT) # [(fd, event)]
toDelete = []
for fd, event in fdevents:
proc, vol = zerofds[fd]
@@ -257,7 +260,6 @@
log.error("Remove failed for some of VG: %s zeroed volumes:
%s",
sdUUID, toDelete, exc_info=True)
-
log.debug("finished with VG:%s LVs: %s, img: %s", sdUUID, volUUIDs,
imgUUID)
return
@@ -266,6 +268,7 @@
log = logging.getLogger("storage.Metadata.VGTagMetadataRW")
METADATA_TAG_PREFIX = "MDT_"
METADATA_TAG_PREFIX_LEN = len(METADATA_TAG_PREFIX)
+
def __init__(self, vgName):
self._vgName = vgName
@@ -297,11 +300,13 @@
self.log.debug("Updating metadata adding=%s removing=%s", ",
".join(toAdd), ", ".join(toRemove))
lvm.changeVGTags(self._vgName, delTags=toRemove, addTags=toAdd)
+
class LvMetadataRW(object):
"""
Block Storage Domain metadata implementation
"""
log = logging.getLogger("storage.Metadata.LvMetadataRW")
+
def __init__(self, vgName, lvName, offset, size):
self._size = size
self._lvName = lvName
@@ -344,6 +349,7 @@
LvBasedSDMetadata = lambda vg, lv: DictValidator(PersistentDict(LvMetadataRW(vg, lv, 0,
SD_METADATA_SIZE)), BLOCK_SD_MD_FIELDS)
TagBasedSDMetadata = lambda vg: DictValidator(PersistentDict(VGTagMetadataRW(vg)),
BLOCK_SD_MD_FIELDS)
+
def selectMetadata(sdUUID):
mdProvider = LvBasedSDMetadata(sdUUID, sd.METADATA)
if len(mdProvider) > 0:
@@ -351,6 +357,7 @@
else:
metadata = TagBasedSDMetadata(sdUUID)
return metadata
+
def metadataValidity(vg):
"""
@@ -362,14 +369,14 @@
mda_size = int(vg.vg_mda_size)
mda_free = int(vg.vg_mda_free)
-
- if mda_size < (VG_METADATASIZE * constants.MEGAB)/2:
+ if mda_size < (VG_METADATASIZE * constants.MEGAB) / 2:
mdaStatus['mdavalid'] = False
if (mda_size * VG_MDA_MIN_THRESHOLD) > mda_free:
mdaStatus['mdathreshold'] = False
return mdaStatus
+
class BlockStorageDomain(sd.StorageDomain):
mountpoint = os.path.join(sd.StorageDomain.storage_repository,
@@ -424,7 +431,6 @@
except Exception:
self.log.warn("Resource namespace %s already registered",
lvmActivationNamespace)
-
@classmethod
def metaSize(cls, vgroup):
''' Calc the minimal meta volume size in MB'''
@@ -435,10 +441,10 @@
minmetasize = (SD_METADATA_SIZE / sd.METASIZE * int(vg.extent_size) +
(1024 * 1024 - 1)) / (1024 * 1024)
metaratio = int(vg.extent_size) / sd.METASIZE
- metasize = (int(vg.extent_count) * sd.METASIZE + (1024*1024-1)) / (1024*1024)
+ metasize = (int(vg.extent_count) * sd.METASIZE + (1024 * 1024 - 1)) / (1024 *
1024)
metasize = max(minmetasize, metasize)
- if metasize > int(vg.free) / (1024*1024):
- raise se.VolumeGroupSizeError("volume group has not enough extents %s
(Minimum %s), VG may be too small" % (vg.extent_count, (1024*1024)/sd.METASIZE))
+ if metasize > int(vg.free) / (1024 * 1024):
+ raise se.VolumeGroupSizeError("volume group has not enough extents %s
(Minimum %s), VG may be too small" % (vg.extent_count, (1024 * 1024) / sd.METASIZE))
cls.log.info("size %s MB (metaratio %s)" % (metasize, metaratio))
return metasize
@@ -566,7 +572,6 @@
misc.readfile(lvm.lvPath(self.sdUUID, sd.METADATA), 4096)
return time.time() - t
-
def produceVolume(self, imgUUID, volUUID):
"""
Produce a type specific volume object
@@ -574,13 +579,11 @@
repoPath = self._getRepoPath()
return blockVolume.BlockVolume(repoPath, self.sdUUID, imgUUID, volUUID)
-
def getVolumeClass(self):
"""
Return a type specific volume generator object
"""
return blockVolume.BlockVolume
-
def volumeExists(self, imgPath, volUUID):
"""
@@ -592,7 +595,6 @@
return False
return True
-
@classmethod
def validateCreateVolumeParams(cls, volFormat, preallocate, srcVolUUID):
"""
@@ -602,7 +604,6 @@
'preallocate' - sparse/preallocate
"""
blockVolume.BlockVolume.validateCreateVolumeParams(volFormat, preallocate,
srcVolUUID)
-
def createVolume(self, imgUUID, size, volFormat, preallocate, diskType, volUUID,
desc, srcImgUUID, srcVolUUID):
"""
@@ -665,7 +666,6 @@
devNum += 1
return mapping
-
def updateMapping(self):
# First read existing mapping from metadata
@@ -768,8 +768,8 @@
if (os.path.basename(dev) == pv["guid"] and
int(ext) in range(pestart, pestart + pecount)):
- offs = int(ext) + int(pv["mapoffset"])
- if offs < SD_METADATA_SIZE/sd.METASIZE:
+ offs = int(ext) + int(pv["mapoffset"])
+ if offs < SD_METADATA_SIZE / sd.METASIZE:
raise se.MetaDataMappingError("domain %s: vol %s MD offset %s is
bad - will overwrite SD's MD" % (self.sdUUID, vol_name, offs))
return offs
raise se.MetaDataMappingError("domain %s: can't map PV %s ext %s" %
(self.sdUUID, dev, ext))
@@ -860,7 +860,7 @@
try:
lvs = lvm.getLV(sdUUID)
except se.LogicalVolumeDoesNotExistError:
- lvs = () #No LVs in this VG (domain)
+ lvs = () # No LVs in this VG (domain)
for lv in lvs:
#Fix me: Should raise and get resource lock.
@@ -880,7 +880,7 @@
# First call parent getInfo() - it fills in all the common details
info = sd.StorageDomain.getInfo(self)
# Now add blockSD specific data
- vg = lvm.getVG(self.sdUUID) #vg.name = self.sdUUID
+ vg = lvm.getVG(self.sdUUID) # vg.name = self.sdUUID
info['vguuid'] = vg.uuid
info['state'] = vg.partial
return info
@@ -904,7 +904,7 @@
try:
lvs = lvm.getLV(self.sdUUID)
except se.LogicalVolumeDoesNotExistError:
- lvs = () #No LVs in this VG (domain)
+ lvs = () # No LVs in this VG (domain)
# Collect all the tags from all the volumes, but ignore duplicates
# set conveniently does exactly that
@@ -913,8 +913,8 @@
tags.update(lv.tags)
# Drop non image tags and strip prefix
taglen = len(blockVolume.TAG_PREFIX_IMAGE)
- images = [ i[taglen:] for i in tags
- if i.startswith(blockVolume.TAG_PREFIX_IMAGE) ]
+ images = [i[taglen:] for i in tags
+ if i.startswith(blockVolume.TAG_PREFIX_IMAGE)]
return images
def rmDCVolLinks(self, imgPath, volsImgs):
@@ -936,7 +936,7 @@
try:
os.rmdir(imgPath)
except OSError:
- self.log.warning("Can't rmdir %s. %s", imgPath, exc_info =
True)
+ self.log.warning("Can't rmdir %s. %s", imgPath, exc_info=True)
else:
self.log.debug("removed image dir: %s", imgPath)
return imgPath
@@ -960,7 +960,6 @@
Return dict {volUUID: ([imgUUID1, imgUUID2], parentUUID)]}.
"""
return getAllVolumes(self.sdUUID)
-
def activateVolumes(self, volUUIDs):
"""
@@ -1011,8 +1010,8 @@
# 32 - E2fsck canceled by user request
# 128 - Shared library error
if rc == 1 or rc == 2:
- # rc is a number
- self.log.info("fsck corrected fs errors (%s)", rc)
+ # rc is a number
+ self.log.info("fsck corrected fs errors (%s)", rc)
if rc >= 4:
raise se.BlockStorageDomainMasterFSCKError(masterfsdev, rc)
@@ -1127,14 +1126,14 @@
cls.log.debug("Trying to kill pid %d", pid)
os.kill(pid, signal.SIGKILL)
except OSError, e:
- if e.errno == errno.ESRCH: # No such process
+ if e.errno == errno.ESRCH: # No such process
pass
- elif e.errno == errno.EPERM: # Operation not permitted
+ elif e.errno == errno.EPERM: # Operation not permitted
cls.log.warn("Could not kill pid %d because
operation was not permitted", pid)
else:
- cls.log.warn("Could not kill pid %d because an
unexpected error", exc_info = True)
+ cls.log.warn("Could not kill pid %d because an
unexpected error", exc_info=True)
except:
- cls.log.warn("Could not kill pid %d because an
unexpected error", exc_info = True)
+ cls.log.warn("Could not kill pid %d because an
unexpected error", exc_info=True)
# Try umount, take 2
try:
@@ -1156,7 +1155,6 @@
# It is time to deactivate the master LV now
lvm.deactivateLVs(self.sdUUID, MASTERLV)
-
def refreshDirTree(self):
# create domain images folder
imagesPath = os.path.join(self.domaindir, sd.DOMAIN_IMAGES)
@@ -1176,7 +1174,7 @@
def extendVolume(self, volumeUUID, size, isShuttingDown=None):
self._extendlock.acquire()
try:
- lvm.extendLV(self.sdUUID, volumeUUID, size) #, isShuttingDown) # FIXME
+ lvm.extendLV(self.sdUUID, volumeUUID, size) # , isShuttingDown) # FIXME
finally:
self._extendlock.release()
@@ -1207,6 +1205,7 @@
if rc != 0:
raise se.MkfsError(dev)
+
def _removeVMSfs(dev):
"""
Destroy special VM data file system
@@ -1214,11 +1213,14 @@
# XXX Add at least minimal sanity check:. i.e. fs not mounted
pass
+
def _isSD(vg):
return STORAGE_DOMAIN_TAG in vg.tags
+
def findDomain(sdUUID):
return BlockStorageDomain(BlockStorageDomain.findDomainPath(sdUUID))
+
def getStorageDomainsList():
return [vg.name for vg in lvm.getAllVGs() if _isSD(vg)]
diff --git a/vdsm/storage/dispatcher.py b/vdsm/storage/dispatcher.py
index 7b2ba83..7c765a2 100644
--- a/vdsm/storage/dispatcher.py
+++ b/vdsm/storage/dispatcher.py
@@ -27,6 +27,7 @@
_EXPORTED_ATTRIBUTE = "__dispatcher_exported__"
+
class Protect:
STATUS_OK = {'status': {'code': 0, 'message':
"OK"}}
STATUS_ERROR = {'status': {'code': 100, 'message':
"ERROR"}}
@@ -86,6 +87,7 @@
setattr(f, _EXPORTED_ATTRIBUTE, True)
return f
+
class Dispatcher:
log = logging.getLogger('Storage.Dispatcher')
@@ -93,7 +95,6 @@
self.storage_repository = config.get('irs', 'repository')
self._exposeFunctions(obj)
self.log.info("Starting StorageDispatcher...")
-
def _exposeFunctions(self, obj):
for funcName in dir(obj):
@@ -104,7 +105,6 @@
continue
# Create a new entry in instance's "dict" that will mask
the original method
setattr(self, funcName, Protect(funcObj, funcName).run)
-
def _methodHelp(self, method):
# this method must be present for system.methodHelp
diff --git a/vdsm/storage/image.py b/vdsm/storage/image.py
index a098489..4364c38 100644
--- a/vdsm/storage/image.py
+++ b/vdsm/storage/image.py
@@ -393,7 +393,7 @@
used by other volumes.
"""
# Avoid relink templates for non-NFS domains
- if destDom.getStorageType() not in [ sd.NFS_DOMAIN ]:
+ if destDom.getStorageType() not in [sd.NFS_DOMAIN]:
self.log.debug("Doesn't relink templates non-NFS domain %s",
destDom.sdUUID)
return
@@ -1111,7 +1111,7 @@
"""
chain = []
accumulatedChainSize = 0
- endVolName = vols[ancestor].getParent() # TemplateVolName or None
+ endVolName = vols[ancestor].getParent() # TemplateVolName or None
currVolName = successor
while (currVolName != endVolName):
chain.insert(0, currVolName)
@@ -1194,4 +1194,3 @@
successor, imgUUID, exc_info=True)
self.log.info("Merge src=%s with dst=%s was successfully finished.",
srcVol.getVolumePath(), dstVol.getVolumePath())
-
diff --git a/vdsm/storage/iscsi.py b/vdsm/storage/iscsi.py
index aaf1237..bd1b660 100644
--- a/vdsm/storage/iscsi.py
+++ b/vdsm/storage/iscsi.py
@@ -51,6 +51,7 @@
log = logging.getLogger('Storage.ISCSI')
+
def getDevIscsiInfo(dev):
"""
Reports the iSCSI parameters of the given device 'dev'
@@ -71,8 +72,10 @@
return IscsiSession(0, IscsiInterface(""),
IscsiTarget(IscsiPortal("", 0), 0, ""), None)
+
def getSessionInfo(sessionID):
return supervdsm.getProxy().readSessionInfo(sessionID)
+
def readSessionInfo(sessionID):
sessionName = "session%d" % sessionID
@@ -121,6 +124,7 @@
return IscsiSession(sessionID, iface, target, cred)
+
def addIscsiNode(iface, target, credentials=None):
targetName = target.iqn
portalStr = "%s:%d" % (target.portal.hostname, target.portal.port)
@@ -139,6 +143,7 @@
removeIscsiNode(iface, target)
raise
+
def removeIscsiNode(iface, target):
targetName = target.iqn
portalStr = "%s:%d" % (target.portal.hostname, target.portal.port)
@@ -149,6 +154,7 @@
pass
iscsiadm.node_delete(iface.name, portalStr, targetName)
+
def addIscsiPortal(iface, portal, credentials=None):
discoverType = "sendtargets"
@@ -177,10 +183,12 @@
deleteIscsiPortal(iface, portal)
raise
+
def deleteIscsiPortal(iface, portal):
discoverType = "sendtargets"
portalStr = "%s:%d" % (portal.hostname, portal.port)
iscsiadm.discoverydb_delete(discoverType, iface.name, portalStr)
+
def discoverSendTargets(iface, portal, credentials=None):
# Because proper discovery actually has to clear the DB having multiple
@@ -203,6 +211,7 @@
return res
+
def iterateIscsiSessions():
for sessionDir in glob.iglob("/sys/class/iscsi_session/session*"):
sessionID = int(os.path.basename(sessionDir)[len("session"):])
@@ -213,6 +222,7 @@
raise
continue
+
class ChapCredentials(object):
def __init__(self, username=None, password=None):
@@ -235,6 +245,8 @@
return hash(self.__class__) ^ hash(self.username) ^ hash(self.password)
# Technically there are a lot more interface properties but VDSM doesn't
+
+
# support them at the moment
class IscsiInterface(object):
@@ -341,10 +353,12 @@
def __repr__(self):
return "<IscsiInterface name='%s'
transport='%s'>" % (self.name, self.transport)
+
def iterateIscsiInterfaces():
names = iscsiadm.iface_list()
for name in names:
yield IscsiInterface(name)
+
@misc.samplingmethod
def rescan():
@@ -352,6 +366,7 @@
iscsiadm.session_rescan()
except iscsiadm.IscsiError:
pass
+
@misc.samplingmethod
def forceIScsiScan():
@@ -403,6 +418,7 @@
log.warning("Still waiting for scsi scan of hbas: %s",
tuple(hba for p in processes))
+
def devIsiSCSI(dev):
hostdir = os.path.realpath(os.path.join("/sys/block", dev,
"device/../../.."))
host = os.path.basename(hostdir)
@@ -410,6 +426,7 @@
scsi_host = os.path.join(hostdir, constants.STRG_SCSI_HOST, host)
proc_name = os.path.join(scsi_host, "proc_name")
return (os.path.exists(iscsi_host) and os.path.exists(proc_name))
+
def getiScsiTarget(dev):
# FIXME: Use the new target object instead of a string
@@ -420,12 +437,14 @@
with open(os.path.join(iscsi_session, "targetname")) as f:
return f.readline().strip()
+
def getiScsiSession(dev):
# FIXME: Use the new session object instead of a string
device = os.path.realpath(os.path.join("/sys/block", dev,
"device"))
sessiondir = os.path.realpath(os.path.join(device, "../.."))
session = os.path.basename(sessiondir)
return int(session[len('session'):])
+
def getDefaultInitiatorName():
with open("/etc/iscsi/initiatorname.iscsi", "r") as f:
@@ -451,7 +470,10 @@
return sessions
+
RE_SCSI_SESSION = re.compile(r"^[Ss]ession(\d+)$")
+
+
def disconnectFromUndelyingStorage(devPath):
storageList = findUnderlyingStorage(devPath)
res = []
@@ -466,6 +488,7 @@
return res
+
def disconnectiScsiSession(sessionID):
#FIXME : Should throw exception on error
sessionID = int(sessionID)
@@ -475,4 +498,3 @@
return e[0]
return 0
-
diff --git a/vdsm/storage/resourceManager.py b/vdsm/storage/resourceManager.py
index b343566..9923923 100644
--- a/vdsm/storage/resourceManager.py
+++ b/vdsm/storage/resourceManager.py
@@ -30,12 +30,22 @@
import misc
from logUtils import SimpleLogAdapter
+
# Errors
-class ResourceManagerError(Exception): pass
-class RequestAlreadyProcessedError(ResourceManagerError): pass
-class RequestTimedOutError(ResourceManagerError): pass
+
+class ResourceManagerError(Exception):
+ pass
+
+
+class RequestAlreadyProcessedError(ResourceManagerError):
+ pass
+
+
+class RequestTimedOutError(ResourceManagerError):
+ pass
# TODO : Consider changing when we decided on a unified way of representing enums.
+
class LockType:
shared = "shared"
@@ -54,6 +64,7 @@
elif lockstate == LockState.locked:
return cls.exclusive
raise ValueError("invalid lockstate %s" % lockstate)
+
class LockState:
free = "free"
@@ -83,6 +94,7 @@
if str(locktype) == LockType.exclusive:
return cls.locked
raise ValueError("invalid locktype %s" % locktype)
+
@classmethod
def validate(cls, state):
try:
@@ -90,6 +102,7 @@
raise ValueError
except:
raise ValueError("invalid lock state %s" % state)
+
#TODO : Integrate all factory functionality to manager
class SimpleResourceFactory(object):
@@ -118,6 +131,7 @@
"""
return None
+
class RequestRef(object):
"""
The request object that the user can interact with.
@@ -144,6 +158,7 @@
return False
return (self._realRequset == other._realRequset)
+
class Request(object):
"""
@@ -219,8 +234,7 @@
except Exception:
self._log.warn("Request callback threw an exception",
exc_info=True)
-
- def wait(self, timeout = None):
+ def wait(self, timeout=None):
return self._doneEvent.wait(timeout)
def granted(self):
@@ -229,6 +243,7 @@
def __str__(self):
return "Request for %s - %s: %s" % (self.fullName, self.lockType,
self._status())
+
class ResourceRef(object):
"""
@@ -301,11 +316,12 @@
# a deadlock. This is why I need to use a timer. It will defer the
operation
# and use a different context.
ResourceManager.getInstance().releaseResource(namespace, name)
- threading.Thread(target = release, args=(self._log, self.namespace,
self.name)).start()
+ threading.Thread(target=release, args=(self._log, self.namespace,
self.name)).start()
self._isValid = False
def __repr__(self):
return "< ResourceRef '%s', isValid: '%s' obj:
'%s'>" % (self.fullName, self.isValid, repr(self.__wrappedObject) if
self.isValid else None)
+
class ResourceManager(object):
"""
@@ -339,7 +355,7 @@
"""
def __init__(self, factory):
self.resources = {}
- self.lock = threading.Lock()#misc.RWLock()
+ self.lock = threading.Lock() # misc.RWLock()
self.factory = factory
def __init__(self):
@@ -453,6 +469,7 @@
raise TypeError("'timeout' must be number")
resource = Queue()
+
def callback(req, res):
resource.put(res)
@@ -618,10 +635,11 @@
resource.activeUsers += 1
self._log.debug("Request '%s' was granted (%d active
users)", nextRequest, resource.activeUsers)
+
class Owner(object):
log = logging.getLogger('ResourceManager.Owner')
- def __init__(self, ownerobject, raiseonfailure = False):
+ def __init__(self, ownerobject, raiseonfailure=False):
self.ownerobject = ownerobject
self.requests = {}
self.resources = {}
@@ -746,7 +764,7 @@
raise ValueError("request %s is already requested by %s" %
(fullName, self))
try:
- request = manager.registerResource(namespace, name, locktype,
self._onRequestFinished)
+ request = manager.registerResource(namespace, name, locktype,
self._onRequestFinished)
except ValueError, ex:
self.log.debug("%s: request for '%s' could not be processed
(%s)", self, fullName, ex)
raise se.InvalidResourceName()
@@ -842,7 +860,6 @@
if hasattr(self.ownerobject, "resourceReleased"):
self.ownerobject.resourceReleased(resource.namespace, resource.name)
-
def cancelAll(self):
self.log.debug("Owner.cancelAll requests %s", self.requests)
self.lock.acquire()
@@ -858,7 +875,7 @@
def ownedResources(self):
res = self.resources.values()
- return [ (r.namespace, r.name, r.getStatus()) for r in res]
+ return [(r.namespace, r.name, r.getStatus()) for r in res]
def requestedResources(self):
reqs = self.requests.values()
@@ -873,5 +890,3 @@
def __str__(self):
return str(self.ownerobject)
-
-
diff --git a/vdsm/storage/sp.py b/vdsm/storage/sp.py
index 72443c6..8515f87 100644
--- a/vdsm/storage/sp.py
+++ b/vdsm/storage/sp.py
@@ -71,9 +71,11 @@
SPM_ID_FREE = -1
LVER_INVALID = -1
+
def domainListEncoder(domDict):
- domains = ','.join([ '%s:%s' % (k, v) for k, v in
domDict.iteritems()])
+ domains = ','.join(['%s:%s' % (k, v) for k, v in
domDict.iteritems()])
return domains
+
def domainListDecoder(s):
domList = {}
@@ -98,6 +100,7 @@
MAX_DOMAINS -= MAX_POOL_DESCRIPTION_SIZE + sd.MAX_DOMAIN_DESCRIPTION_SIZE
MAX_DOMAINS -= blockSD.PVS_METADATA_SIZE
MAX_DOMAINS /= 48
+
class StoragePool(Securable):
'''
@@ -138,7 +141,6 @@
@unsecured
def getSpmStatus(self):
return self.spmRole, self.getSpmLver(), self.getSpmId()
-
def __del__(self):
if len(self.domainMonitor.monitoredDomains) > 0:
@@ -387,7 +389,7 @@
self.setMetaParam(PMDK_SPM_ID, SPM_ID_FREE,
__securityOverride=True)
except:
- pass # The system can handle this inconsistency
+ pass # The system can handle this inconsistency
try:
self.masterDomain.releaseClusterLock()
@@ -421,8 +423,6 @@
self.log.debug("Running initial domain upgrade threads")
for sdUUID in self._domainsToUpgrade:
threading.Thread(target=self._upgradeCallback, args=(sdUUID, True),
kwargs={"__securityOverride": True}).start()
-
-
@unsecured
def __createMailboxMonitor(self):
@@ -482,7 +482,6 @@
mver = self.getMasterVersion()
if not int(masterVersion) > mver:
raise se.StoragePoolWrongMaster(self.spUUID, self.masterDomain.sdUUID)
-
@unsecured
def getMaximumSupportedDomains(self):
@@ -616,7 +615,6 @@
with open(self._poolFile, "w") as f:
f.writelines(pers)
-
@unsecured
def connect(self, hostID, scsiKey, msdUUID, masterVersion):
"""
@@ -646,12 +644,10 @@
return True
-
@unsecured
def stopMonitoringDomains(self):
self.domainMonitor.close()
return True
-
@unsecured
def disconnect(self):
@@ -679,7 +675,6 @@
self.stopMonitoringDomains()
return True
-
@unsecured
def getPoolParams(self):
file = open(self._poolFile, "r")
@@ -697,7 +692,6 @@
file.close()
return hostId, scsiKey, msdUUID, masterVersion
-
@unsecured
def createMaster(self, poolName, domain, masterVersion, leaseParams):
@@ -832,7 +826,7 @@
try:
fileUtils.tarCopy(src, dst, exclude=("./lost+found",))
except fileUtils.TarCopyFailed:
- self.log.error("tarCopy failed", exc_info = True)
+ self.log.error("tarCopy failed", exc_info=True)
# Failed to copy the master data
try:
newmsd.unmountMaster()
@@ -1042,7 +1036,7 @@
Validate that the storage domain is owned by the storage pool.
'sdUUID' - storage domain UUID
"""
- self.log.info("sdUUID=%s spUUID=%s", sdUUID, self.spUUID)
+ self.log.info("sdUUID=%s spUUID=%s", sdUUID, self.spUUID)
domainStatuses = self.getDomains()
dom = sdCache.produce(sdUUID)
@@ -1110,7 +1104,7 @@
m = mount.getMountFromTarget(masterDir)
except OSError, e:
if e.errno == errno.ENOENT:
- pass # Master is not mounted
+ pass # Master is not mounted
else:
raise
else:
@@ -1135,12 +1129,11 @@
return
else:
if current == linkName:
- return #Nothing to do
+ return # Nothing to do
#Rebuild the link
tmp_link_name = os.path.join(self.storage_repository, str(uuid.uuid4()))
- os.symlink(src, tmp_link_name) #make tmp_link
+ os.symlink(src, tmp_link_name) # make tmp_link
os.rename(tmp_link_name, linkName)
-
@unsecured
def _cleanupDomainLinks(self, domain):
@@ -1163,7 +1156,6 @@
with domPoolMD.transaction():
domain.changeRole(sd.REGULAR_DOMAIN)
domPoolMD[PMDK_MASTER_VER] = 0
-
@unsecured
def __rebuild(self, msdUUID, masterVersion):
@@ -1195,7 +1187,7 @@
block_mountpoint = os.path.join(sd.StorageDomain.storage_repository,
sd.DOMAIN_MNT_POINT, sd.BLOCKSD_DIR)
blockDomUUIDs = [vg.name for vg in blockSD.lvm.getVGs(domUUIDs)]
- domDirs = {} # {domUUID: domaindir}
+ domDirs = {} # {domUUID: domaindir}
#Add the block domains
for domUUID in blockDomUUIDs:
domaindir = os.path.join(block_mountpoint, domUUID)
@@ -1204,7 +1196,7 @@
fileUtils.createdir(os.path.join(domaindir, sd.DOMAIN_META_DATA))
fileUtils.createdir(os.path.join(domaindir, sd.DOMAIN_IMAGES))
#Add the file domains
- for domUUID, domaindir in fileSD.scanDomains(): #[(fileDomUUID, file_domaindir)]
+ for domUUID, domaindir in fileSD.scanDomains(): # [(fileDomUUID,
file_domaindir)]
if domUUID in domUUIDs:
domDirs[domUUID] = domaindir
@@ -1232,7 +1224,6 @@
except Exception as e:
self.log.warn("Could not clean all trash from the pool dom `%s`
(%s)", oldie, e)
-
@unsecured
def refresh(self, msdUUID, masterVersion):
"""
@@ -1241,7 +1232,6 @@
"""
sdCache.refresh()
self.__rebuild(msdUUID=msdUUID, masterVersion=masterVersion)
-
def updateVM(self, vmList, sdUUID=None):
"""
@@ -1273,7 +1263,7 @@
vmPath = os.path.join(vms, vmUUID)
if fileUtils.pathExists(vmPath):
try:
- fileUtils.cleanupdir(vmPath, ignoreErrors = False)
+ fileUtils.cleanupdir(vmPath, ignoreErrors=False)
except RuntimeError as e:
raise se.MiscDirCleanupFailure(str(e))
@@ -1287,7 +1277,6 @@
raise
-
def removeVM(self, vmUUID, sdUUID=None):
"""
Remove VM.
@@ -1296,7 +1285,7 @@
self.log.info("spUUID=%s vmUUID=%s sdUUID=%s", self.spUUID, vmUUID,
sdUUID)
vms = self._getVMsPath(sdUUID)
if os.path.exists(os.path.join(vms, vmUUID)):
- fileUtils.cleanupdir(os.path.join(vms, vmUUID))
+ fileUtils.cleanupdir(os.path.join(vms, vmUUID))
def setDescription(self, descr):
"""
@@ -1312,7 +1301,6 @@
raise se.UnicodeArgumentException()
self.setMetaParam(PMDK_POOL_DESCRIPTION, descr)
-
def extendVolume(self, sdUUID, volumeUUID, size, isShuttingDown=None):
sdCache.produce(sdUUID).extendVolume(volumeUUID, size, isShuttingDown)
@@ -1425,8 +1413,6 @@
self.log.error("Pool metadata error", exc_info=True)
raise se.StoragePoolActionError(self.spUUID)
-
- # Get info of all pool's domains
domDict = self.getDomains()
repoStats = self.getRepoStats()
for item in domDict:
@@ -1469,7 +1455,6 @@
info["pool_status"] = "connected"
return dict(info=info, dominfo=list_and_stats)
-
@unsecured
def getIsoDomain(self):
"""
@@ -1480,8 +1465,8 @@
try:
dom = sdCache.produce(item)
except se.StorageDomainDoesNotExist:
- self.log.warn("Storage domain %s does not exist", item)
- continue
+ self.log.warn("Storage domain %s does not exist", item)
+ continue
if dom.isISO():
return dom
@@ -1535,7 +1520,6 @@
self.log.debug("Master domain %s verified, version %s", msdUUID,
masterVersion)
return domain
-
@unsecured
def invalidateMetadata(self):
if not self.spmRole == SPM_ACQUIRED:
@@ -1586,7 +1570,6 @@
# Master tree should be exist in this point
# Recreate it if not.
dom.createMasterTree()
-
@unsecured
def getImageDomainsList(self, imgUUID, datadomains=True):
@@ -1696,7 +1679,6 @@
srcVolUUID, dstImgUUID, dstVolUUID, descr,
dstSdUUID,
volType, volFormat, preallocate, postZero,
force)
return dict(uuid=dstUUID)
-
def moveImage(self, srcDomUUID, dstDomUUID, imgUUID, vmUUID, op, postZero, force):
"""
@@ -1822,7 +1804,6 @@
repoPath = os.path.join(self.storage_repository, self.spUUID)
image.Image(repoPath).multiMove(srcDomUUID, dstDomUUID, imgDict, vmUUID,
force)
-
def mergeSnapshots(self, sdUUID, vmUUID, imgUUID, ancestor, successor, postZero):
"""
Merges the source volume to the destination volume.
@@ -1845,8 +1826,6 @@
with rmanager.acquireResource(imageResourcesNamespace, imgUUID,
rm.LockType.exclusive):
repoPath = os.path.join(self.storage_repository, self.spUUID)
image.Image(repoPath).merge(sdUUID, vmUUID, imgUUID, ancestor, successor,
postZero)
-
-
def createVolume(self, sdUUID, imgUUID, size, volFormat, preallocate, diskType,
volUUID=None,
desc="", srcImgUUID=volume.BLANK_UUID,
srcVolUUID=volume.BLANK_UUID):
@@ -1891,7 +1870,6 @@
srcImgUUID=srcImgUUID,
srcVolUUID=srcVolUUID)
return dict(uuid=uuid)
-
def deleteVolume(self, sdUUID, imgUUID, volumes, postZero, force):
"""
Deletes a given volume.
@@ -1914,7 +1892,6 @@
sdCache.produce(sdUUID).produceVolume(imgUUID,
volUUID).delete(postZero=postZero,
force=force)
-
def setMaxHostID(self, spUUID, maxID):
"""
Set maximum host ID
@@ -1923,7 +1900,6 @@
self._maxHostID
self.spmMailer.setMaxHostID(maxID)
raise se.NotImplementedException
-
def detachAllDomains(self):
"""
@@ -2005,4 +1981,3 @@
def extendSD(self, sdUUID, devlist, force):
sdCache.produce(sdUUID).extend(devlist, force)
-
diff --git a/vdsm/storage/storage_mailbox.py b/vdsm/storage/storage_mailbox.py
index c74fccc..74210d5 100644
--- a/vdsm/storage/storage_mailbox.py
+++ b/vdsm/storage/storage_mailbox.py
@@ -36,14 +36,14 @@
from storage_exception import InvalidParameterException
from vdsm import constants
-__author__="ayalb"
-__date__ ="$Mar 9, 2009 5:25:07 PM$"
+__author__ = "ayalb"
+__date__ = "$Mar 9, 2009 5:25:07 PM$"
CHECKSUM_BYTES = 4
MAILBOX_SIZE = 4096
PACKED_UUID_SIZE = 16
-VOLUME_MAX_SIZE = 0xFFFFFFFF # 64 bit unsigned max size
+VOLUME_MAX_SIZE = 0xFFFFFFFF # 64 bit unsigned max size
SIZE_CHARS = 16
MESSAGE_VERSION = "1"
MESSAGE_SIZE = 64
@@ -52,12 +52,13 @@
BLOCK_SIZE = 512
REPLY_OK = 1
EMPTYMAILBOX = MAILBOX_SIZE * "\0"
-BLOCKS_PER_MAILBOX = int(MAILBOX_SIZE/BLOCK_SIZE)
+BLOCKS_PER_MAILBOX = int(MAILBOX_SIZE / BLOCK_SIZE)
SLOTS_PER_MAILBOX = int(MAILBOX_SIZE / MESSAGE_SIZE)
-MESSAGES_PER_MAILBOX = SLOTS_PER_MAILBOX - 1 # Last message slot is reserved for metadata
(checksum, extendable mailbox, etc)
+MESSAGES_PER_MAILBOX = SLOTS_PER_MAILBOX - 1 # Last message slot is reserved for
metadata (checksum, extendable mailbox, etc)
_zeroCheck = misc.checksum(EMPTYMAILBOX, CHECKSUM_BYTES)
-pZeroChecksum = struct.pack('<l', _zeroCheck) # Assumes CHECKSUM_BYTES equals
4!!!
+pZeroChecksum = struct.pack('<l', _zeroCheck) # Assumes CHECKSUM_BYTES equals
4!!!
+
def dec2hex(n):
return "%x" % n
@@ -73,7 +74,6 @@
ctask = task.Task(id=None, name=cmd)
vars.task = ctask
ctask.prepare(cmd, *args)
-
class SPM_Extend_Message:
@@ -108,10 +108,8 @@
self.log.debug('new extend msg created: domain: %s, volume: %s',
volumeData['domainID'], volumeData['volumeID'])
-
def __getitem__(self, index):
return self.payload[index]
-
def checkReply(self, reply):
# Sanity check - Make sure reply is for current message
@@ -125,7 +123,6 @@
# raise RuntimeError('Request failed')
return REPLY_OK
-
@classmethod
def processRequest(cls, pool, msgID, payload):
cls.log.debug("processRequest, payload:" + repr(payload))
@@ -135,8 +132,8 @@
volume = {}
volume['poolID'] = pool.spUUID
- volume['domainID'] = misc.unpackUuid(payload[sdOffset:
sdOffset+PACKED_UUID_SIZE])
- volume['volumeID'] = misc.unpackUuid(payload[volumeOffset:
volumeOffset+PACKED_UUID_SIZE])
+ volume['domainID'] = misc.unpackUuid(payload[sdOffset:sdOffset +
PACKED_UUID_SIZE])
+ volume['volumeID'] = misc.unpackUuid(payload[volumeOffset:volumeOffset +
PACKED_UUID_SIZE])
size = int(payload[sizeOffset:sizeOffset + SIZE_CHARS], 16)
cls.log.info("processRequest: extending volume %s "
@@ -155,7 +152,6 @@
finally:
pool.spmMailer.sendReply(msgID, msg)
return {'status': {'code': 0, 'message':
'Done'}}
-
class HSM_Mailbox:
@@ -180,13 +176,11 @@
self._mailman = HSM_MailMonitor(self._inbox, self._outbox, hostID, self._queue,
monitorInterval)
self.log.debug('HSM_MailboxMonitor created for pool %s' % self._poolID)
-
def sendExtendMsg(self, volumeData, newSize, callbackFunction=None):
msg = SPM_Extend_Message(volumeData, newSize, callbackFunction)
if str(msg.pool) != self._poolID:
raise ValueError('PoolID does not correspond to Mailbox pool')
self._queue.put(msg)
-
def stop(self):
if self._mailman:
@@ -194,7 +188,6 @@
self._mailman.tp.joinAll(waitForTasks=False)
else:
self.log.warning("HSM_MailboxMonitor - No mail monitor object available
to stop")
-
def flushMessages(self):
if self._mailman:
@@ -218,17 +211,17 @@
self._activeMessages = {}
self._monitorInterval = monitorInterval
self._hostID = int(hostID)
- self._used_slots_array = [ 0 ] * MESSAGES_PER_MAILBOX
+ self._used_slots_array = [0] * MESSAGES_PER_MAILBOX
self._outgoingMail = EMPTYMAILBOX
self._incomingMail = EMPTYMAILBOX
# TODO: add support for multiple paths (multiple mailboxes)
self._spmStorageDir = config.get('irs', 'repository')
- self._inCmd = [ constants.EXT_DD,
+ self._inCmd = [constants.EXT_DD,
'if=' + str(inbox),
'iflag=direct,fullblock',
'bs=' + str(BLOCK_SIZE),
'count=' + str(BLOCKS_PER_MAILBOX),
- 'skip=' + str(self._hostID*BLOCKS_PER_MAILBOX)
+ 'skip=' + str(self._hostID * BLOCKS_PER_MAILBOX)
]
self._outCmd = [constants.EXT_DD,
'of=' + str(outbox),
@@ -236,12 +229,12 @@
'oflag=direct',
'conv=notrunc',
'bs=' + str(BLOCK_SIZE),
- 'seek=' + str(self._hostID*BLOCKS_PER_MAILBOX)
+ 'seek=' + str(self._hostID * BLOCKS_PER_MAILBOX)
]
self._init = False
- self._initMailbox() # Read initial mailbox state
+ self._initMailbox() # Read initial mailbox state
self._msgCounter = 0
- self._sendMail() # Clear outgoing mailbox
+ self._sendMail() # Clear outgoing mailbox
threading.Thread.__init__(self)
self.start()
@@ -254,34 +247,35 @@
else:
self.log.warning("HSM_MailboxMonitor - Could not initialize mailbox,
will not accept requests until init succeeds")
-
def immStop(self):
self._stop = True
-
def immFlush(self):
self._flush = True
-
def _handleResponses(self, newMsgs):
rc = False
for i in range(0, MESSAGES_PER_MAILBOX):
# Skip checking non used slots
- if self._used_slots_array[i] == 0: continue
+ if self._used_slots_array[i] == 0:
+ continue
# Skip empty return messages (messages with version 0)
- start = i*MESSAGE_SIZE
+ start = i * MESSAGE_SIZE
# First byte of message is message version.
# Check return message version, if 0 then message is empty
- if newMsgs[start] in ['\0', '0']: continue
+ if newMsgs[start] in ['\0', '0']:
+ continue
for j in range(start, start + MESSAGE_SIZE):
- if newMsgs[j] != self._incomingMail[j]: break
+ if newMsgs[j] != self._incomingMail[j]:
+ break
# If search exhausted then message hasn't changed since last read and can
be skipped
- if j == (start + MESSAGE_SIZE - 1): continue
+ if j == (start + MESSAGE_SIZE - 1):
+ continue
#
# We only get here if there is a novel reply so we can remove the message
from the active list
@@ -289,18 +283,18 @@
#
rc = True
- newMsg = newMsgs[start: start + MESSAGE_SIZE]
+ newMsg = newMsgs[start:start + MESSAGE_SIZE]
if newMsg == CLEAN_MESSAGE:
del self._activeMessages[i]
self._used_slots_array[i] = 0
self._msgCounter -= 1
- self._outgoingMail = self._outgoingMail[0: start] + MESSAGE_SIZE *
"\0" + self._outgoingMail[start + MESSAGE_SIZE: ]
+ self._outgoingMail = self._outgoingMail[0:start] + MESSAGE_SIZE *
"\0" + self._outgoingMail[start + MESSAGE_SIZE:]
continue
msg = self._activeMessages[i]
self._activeMessages[i] = CLEAN_MESSAGE
- self._outgoingMail = self._outgoingMail[0: start] + CLEAN_MESSAGE +
self._outgoingMail[start + MESSAGE_SIZE: ]
+ self._outgoingMail = self._outgoingMail[0:start] + CLEAN_MESSAGE +
self._outgoingMail[start + MESSAGE_SIZE:]
try:
self.log.debug("HSM_MailboxMonitor(%s/%s) - Checking reply:
%s", self._msgCounter, MESSAGES_PER_MAILBOX, repr(newMsg))
@@ -323,7 +317,6 @@
self._incomingMail = newMsgs
return rc
-
def _checkForMail(self):
#self.log.debug("HSM_MailMonitor - checking for mail")
#self.log.debug("Running command: " + str(self._inCmd))
@@ -335,14 +328,12 @@
#self.log.debug("Parsing inbox content: %s", in_mail)
return self._handleResponses(in_mail)
-
def _sendMail(self):
self.log.info("HSM_MailMonitor sending mail to SPM - " +
str(self._outCmd))
- chk = misc.checksum(self._outgoingMail[0: MAILBOX_SIZE-CHECKSUM_BYTES],
CHECKSUM_BYTES)
- pChk = struct.pack('<l', chk) # Assumes CHECKSUM_BYTES equals 4!!!
- self._outgoingMail = self._outgoingMail[0: MAILBOX_SIZE-CHECKSUM_BYTES] + pChk
+ chk = misc.checksum(self._outgoingMail[0:MAILBOX_SIZE - CHECKSUM_BYTES],
CHECKSUM_BYTES)
+ pChk = struct.pack('<l', chk) # Assumes CHECKSUM_BYTES equals 4!!!
+ self._outgoingMail = self._outgoingMail[0:MAILBOX_SIZE - CHECKSUM_BYTES] + pChk
misc.execCmd(self._outCmd, data=self._outgoingMail, sudo=False)
-
def _handleMessage(self, message):
# TODO: add support for multiple mailboxes
@@ -371,7 +362,6 @@
self._outgoingMail = self._outgoingMail[0:start] + message.payload +
self._outgoingMail[end:]
self.log.debug("HSM_MailMonitor - start: %s, end: %s, len: %s,
message(%s/%s): %s" % (start, end, len(self._outgoingMail), self._msgCounter,
MESSAGES_PER_MAILBOX, repr(self._outgoingMail[start:end])))
-
def run(self):
try:
failures = 0
@@ -380,7 +370,7 @@
while not self._init and not self._stop:
try:
time.sleep(2)
- self._initMailbox() # Read initial mailbox state
+ self._initMailbox() # Read initial mailbox state
except:
pass
@@ -445,8 +435,7 @@
finally:
self.log.info("HSM_MailboxMonitor - Incoming mail monitoring thread
stopped, clearing outgoing mail")
self._outgoingMail = EMPTYMAILBOX
- self._sendMail() # Clear outgoing mailbox
-
+ self._sendMail() # Clear outgoing mailbox
class SPM_MailMonitor:
@@ -509,18 +498,14 @@
thread.start_new_thread(self.run, (self, ))
self.log.debug('SPM_MailMonitor created for pool %s' % self._poolID)
-
def stop(self):
self._stop = True
-
def isStopped(self):
return self._stopped
-
def getMaxHostID(self):
return self._numHosts
-
def setMaxHostID(self, newMaxId):
self._inLock.acquire()
@@ -534,22 +519,21 @@
delta = MAILBOX_SIZE * diff
self._outgoingMail = self._outgoingMail[:-delta]
self._incomingMail = self._incomingMail[:-delta]
- self._numHosts=newMaxId
+ self._numHosts = newMaxId
self._outMailLen = MAILBOX_SIZE * self._numHosts
self._outLock.release()
self._inLock.release()
-
def _validateMailbox(self, mailbox, mailboxIndex):
- chkStart = MAILBOX_SIZE-CHECKSUM_BYTES
- chk = misc.checksum(mailbox[0: chkStart], CHECKSUM_BYTES)
- pChk = struct.pack('<l', chk) # Assumes CHECKSUM_BYTES equals 4!!!
- if pChk != mailbox[chkStart: chkStart+CHECKSUM_BYTES]:
+ chkStart = MAILBOX_SIZE - CHECKSUM_BYTES
+ chk = misc.checksum(mailbox[0:chkStart], CHECKSUM_BYTES)
+ pChk = struct.pack('<l', chk) # Assumes CHECKSUM_BYTES equals 4!!!
+ if pChk != mailbox[chkStart:chkStart + CHECKSUM_BYTES]:
self.log.error("SPM_MailMonitor: mailbox %s checksum failed, not
clearing mailbox, clearing newMail.", str(mailboxIndex))
return False
- elif pChk == pZeroChecksum: return False # Ignore messages of empty mailbox
+ elif pChk == pZeroChecksum:
+ return False # Ignore messages of empty mailbox
return True
-
def _handleRequests(self, newMail):
@@ -568,26 +552,26 @@
msgStart = msgId * MESSAGE_SIZE
# First byte of message is message version. Check message version, if 0
then message is empty and can be skipped
- if newMail[msgStart] in ['\0', '0']: continue
+ if newMail[msgStart] in ['\0', '0']:
+ continue
# Most mailboxes are probably empty so it costs less to check that all
messages start with 0 than
# to validate the mailbox, therefor this is done after we find a non
empty message in mailbox
if not isMailboxValidated:
- if not self._validateMailbox(newMail[mailboxStart: mailboxStart +
MAILBOX_SIZE], host):
+ if not self._validateMailbox(newMail[mailboxStart:mailboxStart +
MAILBOX_SIZE], host):
#Cleaning invalid mbx in newMail
- newMail = newMail[:mailboxStart] + EMPTYMAILBOX +
newMail[mailboxStart + MAILBOX_SIZE:]
+ newMail = newMail[:mailboxStart] + EMPTYMAILBOX +
newMail[mailboxStart + MAILBOX_SIZE:]
break
self.log.debug("SPM_MailMonitor: Mailbox %s validated, checking
mail", host)
isMailboxValidated = True
-
- newMsg = newMail[msgStart: msgStart+MESSAGE_SIZE]
+ newMsg = newMail[msgStart:msgStart + MESSAGE_SIZE]
msgOffset = msgId * MESSAGE_SIZE
if newMsg == CLEAN_MESSAGE:
# Should probably put a setter on outgoingMail which would take the
lock
self._outLock.acquire()
try:
- self._outgoingMail = self._outgoingMail[0:msgOffset] +
CLEAN_MESSAGE + self._outgoingMail[msgOffset+MESSAGE_SIZE: self._outMailLen]
+ self._outgoingMail = self._outgoingMail[0:msgOffset] +
CLEAN_MESSAGE + self._outgoingMail[msgOffset + MESSAGE_SIZE:self._outMailLen]
finally:
self._outLock.release()
send = True
@@ -601,18 +585,19 @@
break
# If search exhausted, i.e. message hasn't changed since last read,
it can be skipped
- if not isMessageNew: continue
+ if not isMessageNew:
+ continue
# We only get here if there is a novel request
try:
- msgType = newMail[msgStart+1: msgStart+5]
+ msgType = newMail[msgStart + 1:msgStart + 5]
if msgType in self._messageTypes:
# Use message class to process request according to message
specific logic
id = str(uuid.uuid4())
- self.log.debug("SPM_MailMonitor: processing request:
%s" % repr(newMail[msgStart: msgStart+MESSAGE_SIZE]))
+ self.log.debug("SPM_MailMonitor: processing request:
%s" % repr(newMail[msgStart:msgStart + MESSAGE_SIZE]))
res = self.tp.queueTask(id, runTask,
(self._messageTypes[msgType], msgId,
- newMail[msgStart: msgStart+MESSAGE_SIZE])
+ newMail[msgStart:msgStart + MESSAGE_SIZE])
)
if not res:
raise Exception()
@@ -628,7 +613,6 @@
self._incomingMail = newMail
return send
-
def _checkForMail(self):
# Lock is acquired in order to make sure that neither _numHosts nor incomingMail
are changed during checkForMail
@@ -657,15 +641,14 @@
finally:
self._inLock.release()
-
def sendReply(self, msgID, msg):
# Lock is acquired in order to make sure that neither _numHosts nor outgoingMail
are changed while used
self._outLock.acquire()
try:
- msgOffset = msgID*MESSAGE_SIZE
- self._outgoingMail = self._outgoingMail[0:msgOffset] + msg.payload +
self._outgoingMail[msgOffset+MESSAGE_SIZE: self._outMailLen]
+ msgOffset = msgID * MESSAGE_SIZE
+ self._outgoingMail = self._outgoingMail[0:msgOffset] + msg.payload +
self._outgoingMail[msgOffset + MESSAGE_SIZE:self._outMailLen]
mailboxOffset = (msgID / SLOTS_PER_MAILBOX) * MAILBOX_SIZE
- mailbox = self._outgoingMail[mailboxOffset: mailboxOffset + MAILBOX_SIZE]
+ mailbox = self._outgoingMail[mailboxOffset:mailboxOffset + MAILBOX_SIZE]
cmd = self._outCmd + ['bs=' + str(MAILBOX_SIZE), 'seek=' +
str(mailboxOffset / MAILBOX_SIZE)]
#self.log.debug("Running command: %s, for message id: %s",
str(cmd), str(msgID))
(rc, out, err) = misc.execCmd(cmd, sudo=False, data=mailbox)
@@ -673,7 +656,6 @@
self.log.error("SPM_MailMonitor: sendReply - couldn't send
reply, dd failed")
finally:
self._outLock.release()
-
def run(self, *args):
try:
@@ -689,4 +671,3 @@
self._stopped = True
self.tp.joinAll(waitForTasks=False)
self.log.info("SPM_MailMonitor - Incoming mail monitoring thread
stopped")
-
diff --git a/vdsm/storage/sync.py b/vdsm/storage/sync.py
index 1dbedc5..832c85d 100644
--- a/vdsm/storage/sync.py
+++ b/vdsm/storage/sync.py
@@ -1,13 +1,17 @@
from threading import Thread, Event
from functools import wraps
+
def AsyncCallStub(result):
def stubby():
return result
return AsyncCall(stubby, [], [])
-class AsyncCallNotDone(RuntimeError): pass
+
+class AsyncCallNotDone(RuntimeError):
+ pass
+
class AsyncCall(object):
def __init__(self, f, args, kwargs):
diff --git a/vdsm/storage/threadLocal.py b/vdsm/storage/threadLocal.py
index ce6ff08..ea245d0 100644
--- a/vdsm/storage/threadLocal.py
+++ b/vdsm/storage/threadLocal.py
@@ -22,4 +22,3 @@
vars = threading.local()
vars.task = None
-
diff --git a/vdsm/storage/threadPool.py b/vdsm/storage/threadPool.py
index 7edaeb2..4959ec1 100644
--- a/vdsm/storage/threadPool.py
+++ b/vdsm/storage/threadPool.py
@@ -18,6 +18,7 @@
False = 0
True = not False
+
class ThreadPool:
"""Flexible thread pool class. Creates a pool of threads, then
@@ -55,7 +56,6 @@
self.log.debug("Number of running tasks: %s", self.__runningTasks)
finally:
self.__runningTasksLock.release()
-
def getRunningTasks(self):
return self.__runningTasks
@@ -136,7 +136,7 @@
def stopThread(self):
return self.__tasks.put((None, None, None, None))
- def joinAll(self, waitForTasks = True, waitForThreads = True):
+ def joinAll(self, waitForTasks=True, waitForThreads=True):
""" Clear the task queue and terminate all pooled threads,
optionally allowing the tasks and threads to finish."""
@@ -168,6 +168,7 @@
finally:
self.__resizeLock.release()
+
class WorkerThread(threading.Thread):
""" Pooled thread class. """
@@ -190,9 +191,9 @@
while self.__isDying == False:
try:
id, cmd, args, callback = self.__pool.getNextTask()
- if id is None: #should retry.
+ if id is None: # should retry.
pass
- elif self.__isDying == True: #return the task into the queue, since we
abort.
+ elif self.__isDying == True: # return the task into the queue, since we
abort.
self.__pool.__tasks.put((id, cmd, args, callback))
elif callback is None:
self.__pool.setRunningTask(True)
diff --git a/vdsm/storage/volume.py b/vdsm/storage/volume.py
index a332f53..dcc6c0b 100644
--- a/vdsm/storage/volume.py
+++ b/vdsm/storage/volume.py
@@ -751,17 +751,17 @@
return {
"uuid": self.volUUID,
"type": meta.get(TYPE, ""),
- "format": meta.get(FORMAT, ""),
+ "format": meta.get(FORMAT, ""),
"disktype": meta.get(DISKTYPE, ""),
- "voltype": meta.get(VOLTYPE, ""),
+ "voltype": meta.get(VOLTYPE, ""),
"size": int(meta.get(SIZE, "0")),
- "parent": self.getParent(),
- "description": meta.get(DESCRIPTION, ""),
+ "parent": self.getParent(),
+ "description": meta.get(DESCRIPTION, ""),
"pool": meta.get(sd.DMDK_POOLS, ""),
- "domain": meta.get(DOMAIN, ""),
- "image": self.getImage(),
- "ctime": meta.get(CTIME, ""),
- "mtime": meta.get(MTIME, ""),
+ "domain": meta.get(DOMAIN, ""),
+ "image": self.getImage(),
+ "ctime": meta.get(CTIME, ""),
+ "mtime": meta.get(MTIME, ""),
"legality": meta.get(LEGALITY, ""),
}
@@ -770,18 +770,18 @@
voltype, disktype, desc="", legality=ILLEGAL_VOL):
meta = {
FORMAT: str(format),
- TYPE: str(type),
- VOLTYPE: str(voltype),
- DISKTYPE: str(disktype),
- SIZE: int(size),
- CTIME: int(time.time()),
- sd.DMDK_POOLS: "", # obsolete
+ TYPE: str(type),
+ VOLTYPE: str(voltype),
+ DISKTYPE: str(disktype),
+ SIZE: int(size),
+ CTIME: int(time.time()),
+ sd.DMDK_POOLS: "", # obsolete
DOMAIN: str(sdUUID),
- IMAGE: str(imgUUID),
- DESCRIPTION: str(desc),
- PUUID: str(puuid),
- MTIME: int(time.time()),
- LEGALITY: str(legality),
+ IMAGE: str(imgUUID),
+ DESCRIPTION: str(desc),
+ PUUID: str(puuid),
+ MTIME: int(time.time()),
+ LEGALITY: str(legality),
}
cls.createMetadata(metaId, meta)
--
To view, visit
http://gerrit.ovirt.org/9606
To unsubscribe, visit
http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifbb36be20c77f714246279d1ae9067353bced047
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Dan Kenigsberg <danken(a)redhat.com>