Change in vdsm[master]: storage: Use new concurrent.thread() utility
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: storage: Use new concurrent.thread() utility
......................................................................
storage: Use new concurrent.thread() utility
This patch updates the storage subsystem to use the new utility.
Behavior changes:
- fcscan.Scan threads are protected from silent failures.
- misc.Event notification threads are protected from silent failures,
and are daemonic.
- misc.itmap threads are protected from silent failures and are
daemonic.
- resourceManager.ResourceRef __del__ threads are protected from
silent failures and are daemonic.
- sd.StorageDomain __del__ threads are protected from silent failures
and are daemonic.
- sp.StoragePool upgrade threads are protected from silent failures
and are daemonic.
- storageServer.ConnectionMonitor thread is protected now from silent
failures.
- sync.AsyncCallStub call threads are protected from silent failures and
are daemonic (they were explicitly non-daemonic before).
- task.Task __del__ threads are protected from silent failures and
are daemonic.
- threadPool.WorkerThread threads are protected from silent failures.
Notes:
- storage_mailbox.HSM_MailMonitor inherits now from object and keeps a
thread instance.
- threadPool.WorkerThread inherits now from object and keeps a thread
instance.
Change-Id: I83b09cac366417cd22d5d4976d334cf9632a53f5
Relates-To: https://bugzilla.redhat.com/1141422
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/storage/fc-scan
M vdsm/storage/hsm.py
M vdsm/storage/misc.py
M vdsm/storage/monitor.py
M vdsm/storage/resourceManager.py
M vdsm/storage/sd.py
M vdsm/storage/sp.py
M vdsm/storage/storageServer.py
M vdsm/storage/storage_mailbox.py
M vdsm/storage/sync.py
M vdsm/storage/task.py
M vdsm/storage/threadPool.py
12 files changed, 50 insertions(+), 48 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/52/45552/1
diff --git a/vdsm/storage/fc-scan b/vdsm/storage/fc-scan
index f21f93c..119a735 100755
--- a/vdsm/storage/fc-scan
+++ b/vdsm/storage/fc-scan
@@ -39,8 +39,8 @@
import logging
import os
import sys
-import threading
+from vdsm import concurrent
from vdsm import utils
log = logging.getLogger("fc-scan")
@@ -54,8 +54,7 @@
self.thread = None
def start(self):
- self.thread = threading.Thread(target=self.run)
- self.thread.daemon = True
+ self.thread = concurrent.thread(self.run, logger=log.name)
self.thread.start()
def wait(self):
diff --git a/vdsm/storage/hsm.py b/vdsm/storage/hsm.py
index 0806abb..0f8f5fd 100644
--- a/vdsm/storage/hsm.py
+++ b/vdsm/storage/hsm.py
@@ -23,7 +23,6 @@
"""
import os
-import threading
import logging
import glob
from fnmatch import fnmatch
@@ -38,6 +37,7 @@
import numbers
import stat
+from vdsm import concurrent
from vdsm.config import config
import sp
from spbackends import MAX_POOL_DESCRIPTION_SIZE, MAX_DOMAINS
@@ -365,16 +365,15 @@
except Exception:
self.log.warn("Failed to clean Storage Repository.", exc_info=True)
- @utils.traceback(on=self.log.name)
def storageRefresh():
sdCache.refreshStorage()
lvm.bootstrap(refreshlvs=blockSD.SPECIAL_LVS)
self._ready = True
self.log.debug("HSM is ready")
- storageRefreshThread = threading.Thread(target=storageRefresh,
- name="storageRefresh")
- storageRefreshThread.daemon = True
+ storageRefreshThread = concurrent.thread(storageRefresh,
+ name="storageRefresh",
+ logger=self.log.name)
storageRefreshThread.start()
monitorInterval = config.getint('irs', 'sd_health_check_delay')
diff --git a/vdsm/storage/misc.py b/vdsm/storage/misc.py
index b14fd26..bf75ba9 100644
--- a/vdsm/storage/misc.py
+++ b/vdsm/storage/misc.py
@@ -47,6 +47,7 @@
import weakref
import inspect
+from vdsm import concurrent
from vdsm import constants
from vdsm import utils
import storage_exception as se
@@ -788,8 +789,8 @@
if self._sync:
func(*args, **kwargs)
else:
- threading.Thread(target=func, args=args,
- kwargs=kwargs).start()
+ concurrent.thread(func, args=args,
+ kwargs=kwargs).start()
except:
self._log.warn("Could not run registered method because "
"of an exception", exc_info=True)
@@ -798,8 +799,7 @@
def emit(self, *args, **kwargs):
if len(self._registrar) > 0:
- threading.Thread(target=self._emit, args=args,
- kwargs=kwargs).start()
+ concurrent.thread(self._emit, args=args, kwargs=kwargs).start()
class OperationMutex(object):
@@ -921,7 +921,7 @@
maxthreads += 1
threadsCount -= 1
- t = threading.Thread(target=wrapper, args=(arg,))
+ t = concurrent.thread(wrapper, args=(arg,))
t.start()
threadsCount += 1
maxthreads -= 1
diff --git a/vdsm/storage/monitor.py b/vdsm/storage/monitor.py
index 196fc10..80f3373 100644
--- a/vdsm/storage/monitor.py
+++ b/vdsm/storage/monitor.py
@@ -23,6 +23,7 @@
import time
import weakref
+from vdsm import concurrent
from vdsm import utils
from vdsm.config import config
@@ -163,8 +164,7 @@
log = logging.getLogger('Storage.Monitor')
def __init__(self, domainMonitor, sdUUID, hostId, interval):
- self.thread = threading.Thread(target=self._run)
- self.thread.setDaemon(True)
+ self.thread = concurrent.thread(self._run, logger=self.log.name)
self.domainMonitor = domainMonitor
self.stopEvent = threading.Event()
self.domain = None
@@ -200,7 +200,6 @@
""" Accessed by methods decorated with @util.cancelpoint """
return self.stopEvent.is_set()
- @utils.traceback(on=log.name)
def _run(self):
self.log.debug("Domain monitor for %s started", self.sdUUID)
try:
diff --git a/vdsm/storage/resourceManager.py b/vdsm/storage/resourceManager.py
index b1b0dc7..06228e2 100644
--- a/vdsm/storage/resourceManager.py
+++ b/vdsm/storage/resourceManager.py
@@ -29,6 +29,7 @@
import storage_exception as se
import misc
from logUtils import SimpleLogAdapter
+from vdsm import concurrent
from vdsm import utils
@@ -338,8 +339,8 @@
# deadlock. This is why I need to use a timer. It will defer
# the operation and use a different context.
ResourceManager.getInstance().releaseResource(namespace, name)
- threading.Thread(target=release, args=(self._log, self.namespace,
- self.name)).start()
+ concurrent.thread(release, args=(self._log, self.namespace,
+ self.name)).start()
self._isValid = False
def __repr__(self):
diff --git a/vdsm/storage/sd.py b/vdsm/storage/sd.py
index d4e2b5a..da0f7ae 100644
--- a/vdsm/storage/sd.py
+++ b/vdsm/storage/sd.py
@@ -31,6 +31,7 @@
import resourceFactories
from resourceFactories import IMAGE_NAMESPACE, VOLUME_NAMESPACE
import resourceManager as rm
+from vdsm import concurrent
from vdsm import constants
import clusterlock
import outOfProcess as oop
@@ -400,7 +401,7 @@
def __del__(self):
if self.stat:
- threading.Thread(target=self.stat.stop).start()
+ concurrent.thread(self.stat.stop).start()
@property
def sdUUID(self):
diff --git a/vdsm/storage/sp.py b/vdsm/storage/sp.py
index 4a8b67c..087d4eb 100644
--- a/vdsm/storage/sp.py
+++ b/vdsm/storage/sp.py
@@ -31,6 +31,7 @@
from imageRepository.formatConverter import DefaultFormatConverter
+from vdsm import concurrent
from vdsm import constants, utils
import storage_mailbox
import blockSD
@@ -441,9 +442,10 @@
self._upgradeCallback)
self.log.debug("Running initial domain upgrade threads")
for sdUUID in self._domainsToUpgrade:
- threading.Thread(target=self._upgradeCallback,
- args=(sdUUID, True),
- kwargs={"__securityOverride": True}).start()
+ concurrent.thread(self._upgradeCallback,
+ args=(sdUUID, True),
+ kwargs={"__securityOverride": True},
+ logger=self.log.name).start()
@unsecured
def __createMailboxMonitor(self):
diff --git a/vdsm/storage/storageServer.py b/vdsm/storage/storageServer.py
index 6636bd8..d436d7f 100644
--- a/vdsm/storage/storageServer.py
+++ b/vdsm/storage/storageServer.py
@@ -21,7 +21,7 @@
import logging
from os.path import normpath, basename, splitext
import os
-from threading import RLock, Lock, Event, Thread
+from threading import RLock, Lock, Event
import socket
import glob
from collections import namedtuple
@@ -32,6 +32,7 @@
from vdsm.compat import pickle
from vdsm.config import config
+from vdsm import concurrent
from vdsm import supervdsm
from vdsm import udevadm
@@ -777,8 +778,7 @@
self._conDict[alias] = ConnectionFactory.createConnection(conInfo)
def startMonitoring(self):
- t = Thread(target=self._monitorConnections)
- t.setDaemon(True)
+ t = concurrent.thread(self._monitorConnections, logger=self._log.name)
self._stopEvent.clear()
t.start()
diff --git a/vdsm/storage/storage_mailbox.py b/vdsm/storage/storage_mailbox.py
index e2a6708..b30fbab 100644
--- a/vdsm/storage/storage_mailbox.py
+++ b/vdsm/storage/storage_mailbox.py
@@ -34,8 +34,8 @@
import task
from threadPool import ThreadPool
from storage_exception import InvalidParameterException
+from vdsm import concurrent
from vdsm import constants
-from vdsm import utils
__author__ = "ayalb"
__date__ = "$Mar 9, 2009 5:25:07 PM$"
@@ -228,7 +228,7 @@
"available to flush")
-class HSM_MailMonitor(threading.Thread):
+class HSM_MailMonitor(object):
log = logging.getLogger('Storage.MailBox.HsmMailMonitor')
def __init__(self, inbox, outbox, hostID, queue, monitorInterval):
@@ -267,10 +267,9 @@
self._initMailbox() # Read initial mailbox state
self._msgCounter = 0
self._sendMail() # Clear outgoing mailbox
- threading.Thread.__init__(self)
- self.daemon = True
- self.name = "mailbox.HSMMonitor"
- self.start()
+ self._thread = concurrent.thread(self.run, name="mailbox.HSMMonitor",
+ logger=self.log.name)
+ self._thread.start()
def _initMailbox(self):
# Sync initial incoming mail state with storage view
@@ -426,8 +425,6 @@
MESSAGES_PER_MAILBOX,
repr(self._outgoingMail[start:end])))
- @utils.traceback(on=log.name,
- msg="Unhandled exception in HSM_MailMonitor thread")
def run(self):
try:
failures = 0
@@ -581,9 +578,8 @@
self.log.warning("SPM_MailMonitor couldn't clear outgoing mail, "
"dd failed")
- t = threading.Thread(target=self.run)
- t.daemon = True
- t.name = "mailbox.SPMMonitor"
+ t = concurrent.thread(self.run, name="mailbox.SPMMonitor",
+ logger=self.log.name)
t.start()
self.log.debug('SPM_MailMonitor created for pool %s' % self._poolID)
@@ -782,8 +778,6 @@
finally:
self._outLock.release()
- @utils.traceback(on=log.name,
- msg="Unhandled exception in SPM_MailMonitor thread")
def run(self):
try:
while not self._stop:
diff --git a/vdsm/storage/sync.py b/vdsm/storage/sync.py
index 832c85d..8a44497 100644
--- a/vdsm/storage/sync.py
+++ b/vdsm/storage/sync.py
@@ -1,5 +1,6 @@
-from threading import Thread, Event
+from threading import Event
from functools import wraps
+from vdsm import concurrent
def AsyncCallStub(result):
@@ -42,8 +43,7 @@
self._event.set()
def _call(self):
- t = Thread(target=self._wrapper)
- t.setDaemon(False)
+ t = concurrent.thread(self._wrapper)
t.start()
diff --git a/vdsm/storage/task.py b/vdsm/storage/task.py
index bde3421..c558e0e 100644
--- a/vdsm/storage/task.py
+++ b/vdsm/storage/task.py
@@ -58,6 +58,7 @@
import resourceManager
from threadLocal import vars
from weakref import proxy
+from vdsm import concurrent
from vdsm.config import config
import outOfProcess as oop
from logUtils import SimpleLogAdapter
@@ -513,8 +514,8 @@
if (self.cleanPolicy == TaskCleanType.auto and
self.store is not None):
taskDir = os.path.join(self.store, self.id)
- threading.Thread(target=finalize,
- args=(self.log, self.resOwner, taskDir)).start()
+ concurrent.thread(finalize,
+ args=(self.log, self.resOwner, taskDir)).start()
def _done(self):
self.resOwner.releaseAll()
diff --git a/vdsm/storage/threadPool.py b/vdsm/storage/threadPool.py
index 262012a..594a6b9 100644
--- a/vdsm/storage/threadPool.py
+++ b/vdsm/storage/threadPool.py
@@ -11,6 +11,7 @@
from time import sleep
from Queue import Queue, Empty
import logging
+from vdsm import concurrent
class ThreadPool:
@@ -165,7 +166,7 @@
self.__resizeLock.release()
-class WorkerThread(threading.Thread):
+class WorkerThread(object):
""" Pooled thread class. """
@@ -174,10 +175,15 @@
def __init__(self, pool):
""" Initialize the thread and remember the pool. """
- threading.Thread.__init__(self)
+ self._thread = concurrent.thread(self.run)
self.__pool = pool
self.__isDying = False
- self.daemon = True
+
+ def start(self):
+ self._thread.start()
+
+ def join(self):
+ self._thread.join()
def _processNextTask(self):
id, cmd, args, callback = self.__pool.getNextTask()
@@ -189,14 +195,14 @@
self.__pool.__tasks.put((id, cmd, args, callback))
elif callback is None:
self.__pool.setRunningTask(True)
- self.setName(id)
+ self._thread.name = id
self.log.debug("Task: %s running: %s with: %s" %
(id, repr(cmd), repr(args)))
cmd(args)
self.__pool.setRunningTask(False)
else:
self.__pool.setRunningTask(True)
- self.setName(id)
+ self._thread.name = id
callback(cmd(args))
self.__pool.setRunningTask(False)
except Exception:
--
To view, visit https://gerrit.ovirt.org/45552
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I83b09cac366417cd22d5d4976d334cf9632a53f5
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: virt: Use new concurrent.thread() utility
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: virt: Use new concurrent.thread() utility
......................................................................
virt: Use new concurrent.thread() utility
This patch updates virt subsystem to use the new utility.
Behavior changes:
- vm.Vm creation threads are protected from silent failures.
This patch does not modify some threads inheriting from threading.Thread
instead of having a threading.Thread instance (migration.py, vm.py).
Change-Id: Ibc0c22fee4b2fbebaaedeaadc62418f7905037bd
Relates-To: https://bugzilla.redhat.com/1141422
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/v2v.py
M vdsm/virt/vm.py
2 files changed, 5 insertions(+), 5 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/55/45555/1
diff --git a/vdsm/v2v.py b/vdsm/v2v.py
index cdd831b..437fa86 100644
--- a/vdsm/v2v.py
+++ b/vdsm/v2v.py
@@ -36,11 +36,12 @@
import libvirt
+from vdsm import concurrent
from vdsm.constants import P_VDSM_RUN
from vdsm.define import errCode, doneCode
from vdsm import libvirtconnection, response
from vdsm.infra import zombiereaper
-from vdsm.utils import traceback, CommandPath, execCmd, NICENESS, IOCLASS
+from vdsm.utils import CommandPath, execCmd, NICENESS, IOCLASS
import caps
@@ -362,8 +363,7 @@
return obj
def start(self):
- t = threading.Thread(target=self._run_command)
- t.daemon = True
+ t = concurrent.thread(self._run_command)
t.start()
@property
@@ -393,7 +393,6 @@
with password_file(self._id, self._passwd_file, self._password):
self._run()
- @traceback(msg="Error importing vm")
def _run(self):
try:
self._import()
diff --git a/vdsm/virt/vm.py b/vdsm/virt/vm.py
index 90e6652..987313c 100644
--- a/vdsm/virt/vm.py
+++ b/vdsm/virt/vm.py
@@ -34,6 +34,7 @@
import libvirt
# vdsm imports
+from vdsm import concurrent
from vdsm import constants
from vdsm import libvirtconnection
from vdsm import netinfo
@@ -265,7 +266,7 @@
self._confLock = threading.Lock()
self._jobsLock = threading.Lock()
self._statusLock = threading.Lock()
- self._creationThread = threading.Thread(target=self._startUnderlyingVm)
+ self._creationThread = concurrent.thread(self._startUnderlyingVm)
if 'migrationDest' in self.conf:
self._lastStatus = vmstatus.MIGRATION_DESTINATION
elif 'restoreState' in self.conf:
--
To view, visit https://gerrit.ovirt.org/45555
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibc0c22fee4b2fbebaaedeaadc62418f7905037bd
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: concurrent: Use new concurrent.thread() utility
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: concurrent: Use new concurrent.thread() utility
......................................................................
concurrent: Use new concurrent.thread() utility
This patch eliminates some boilerplate code by using the new
concurrent.thread() utility for creating threads.
A nice side effect of this refactoring is adding the missing
@utils.traceback(), preventing silent failures of this thread.
Change-Id: Id98bcdf7bb6c67583d9d994781e642441c9b8bdd
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M lib/vdsm/libvirtconnection.py
M lib/vdsm/schedule.py
M lib/vdsm/xmlrpc.py
M vdsm/clientIF.py
M vdsm/rpc/bindingxmlrpc.py
5 files changed, 16 insertions(+), 20 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/95/44895/1
diff --git a/lib/vdsm/libvirtconnection.py b/lib/vdsm/libvirtconnection.py
index f7a3926..15ccf33 100644
--- a/lib/vdsm/libvirtconnection.py
+++ b/lib/vdsm/libvirtconnection.py
@@ -27,6 +27,7 @@
import signal
import libvirt
+from . import concurrent
from . import utils
from .password import ProtectedPassword
from .tool.configurators import passwd
@@ -41,9 +42,8 @@
def start(self):
assert not self.run
- self.__thread = threading.Thread(target=self.__run,
- name="libvirtEventLoop")
- self.__thread.setDaemon(True)
+ self.__thread = concurrent.thread(self.__run, name="libvirtEventLoop",
+ logger=log.name)
self.run = True
self.__thread.start()
@@ -54,7 +54,6 @@
self.__thread.join()
self.__thread = None
- @utils.traceback(on=log.name)
def __run(self):
try:
libvirt.virEventRegisterDefaultImpl()
diff --git a/lib/vdsm/schedule.py b/lib/vdsm/schedule.py
index 05386c7..8407704 100644
--- a/lib/vdsm/schedule.py
+++ b/lib/vdsm/schedule.py
@@ -63,6 +63,7 @@
import threading
import time
+from . import concurrent
from . import utils
@@ -91,8 +92,8 @@
self._cond = threading.Condition(threading.Lock())
self._running = False
self._calls = []
- self._thread = threading.Thread(target=self._run, name=self._name)
- self._thread.daemon = True
+ self._thread = concurrent.thread(self._run, name=self._name,
+ logger=self._log.name)
def start(self):
self._log.debug("Starting scheduler %s", self._name)
@@ -137,7 +138,6 @@
self._cond.notify()
return call
- @utils.traceback(on=_log.name)
def _run(self):
self._log.debug("started")
try:
diff --git a/lib/vdsm/xmlrpc.py b/lib/vdsm/xmlrpc.py
index 66d9415..4327790 100644
--- a/lib/vdsm/xmlrpc.py
+++ b/lib/vdsm/xmlrpc.py
@@ -26,6 +26,7 @@
import sys
import threading
+from . import concurrent
from .config import config
from .executor import TaskQueue
from .utils import traceback
@@ -71,15 +72,14 @@
if sock is self._STOP:
return
self.log.info("Starting request handler for %s:%d", addr[0], addr[1])
- t = threading.Thread(target=self._process_requests, args=(sock, addr))
- t.daemon = True
+ t = concurrent.thread(self._process_requests, args=(sock, addr),
+ logger=self.log.name)
t.start()
def server_close(self):
self.queue.clear()
self.queue.put((self._STOP, self._STOP))
- @traceback(on=log.name)
def _process_requests(self, sock, addr):
self.log.info("Request handler for %s:%d started", addr[0], addr[1])
try:
diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index 7b9c490..3834bcb 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -39,6 +39,7 @@
import libvirt
from vdsm import sslutils
from vdsm import libvirtconnection
+from vdsm import concurrent
from vdsm import constants
from vdsm import utils
import caps
@@ -108,8 +109,7 @@
self._netConfigDirty = False
self._prepareMOM()
secret.clear()
- threading.Thread(target=self._recoverThread,
- name='clientIFinit').start()
+ concurrent.thread(self._recoverThread, name='clientIFinit').start()
self.channelListener.settimeout(
config.getint('vars', 'guest_agent_timeout'))
self.channelListener.start()
@@ -290,9 +290,8 @@
def start(self):
for binding in self.bindings.values():
binding.start()
- self.thread = threading.Thread(target=self._reactor.process_requests,
- name='Reactor thread')
- self.thread.setDaemon(True)
+ self.thread = concurrent.thread(self._reactor.process_requests,
+ name='Reactor thread')
self.thread.start()
def _getUUIDSpecPath(self, uuid):
@@ -447,7 +446,6 @@
else:
raise JsonRpcBindingsError()
- @utils.traceback()
def _recoverThread(self):
# Trying to run recover process until it works. During that time vdsm
# stays in recovery mode (_recover=True), means all api requests
diff --git a/vdsm/rpc/bindingxmlrpc.py b/vdsm/rpc/bindingxmlrpc.py
index 04cdf32..a85b7c4 100644
--- a/vdsm/rpc/bindingxmlrpc.py
+++ b/vdsm/rpc/bindingxmlrpc.py
@@ -30,6 +30,7 @@
from vdsm.password import (ProtectedPassword,
protect_passwords,
unprotect_passwords)
+from vdsm import concurrent
from vdsm import utils
from vdsm import xmlrpc
from vdsm.define import doneCode, errCode
@@ -57,7 +58,6 @@
"""
Register xml-rpc functions and serve clients until stopped
"""
- @utils.traceback(on=self.log.name)
def threaded_start():
self.log.info("XMLRPC server running")
self._registerFunctions()
@@ -73,9 +73,8 @@
exc_info=True)
self.log.info("XMLRPC server stopped")
- self._thread = threading.Thread(target=threaded_start,
- name='BindingXMLRPC')
- self._thread.daemon = True
+ self._thread = concurrent.thread(threaded_start, name='BindingXMLRPC',
+ logger=self.log.name)
self._thread.start()
def add_socket(self, connected_socket, socket_address):
--
To view, visit https://gerrit.ovirt.org/44895
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id98bcdf7bb6c67583d9d994781e642441c9b8bdd
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: infra: Use new concurrent.thread() utility
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: infra: Use new concurrent.thread() utility
......................................................................
infra: Use new concurrent.thread() utility
This patch updates various infra stuff to use the new utility.
Behavior changes:
- clientIF recover thread is daemonic.
- clientIF reactor thread is protected from silent failures.
- supervdsmServer server thread is protected from silent failures.
This patch does not update:
- lib/yajsonrpc - it is not clear if we want to make it depend on vdsm
library
- debug plugin - being removed in https://gerrit.ovirt.org/44724
Change-Id: Ib7f76c4e7c6d155e97afce49e5acb55af1692cc3
Relates-To: https://bugzilla.redhat.com/1141422
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/clientIF.py
M vdsm/rpc/bindingxmlrpc.py
M vdsm/supervdsmServer
3 files changed, 9 insertions(+), 13 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/54/45554/1
diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index 7fa2d0d..84aaf8f 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -39,6 +39,7 @@
import libvirt
from vdsm import sslutils
from vdsm import libvirtconnection
+from vdsm import concurrent
from vdsm import constants
from vdsm import utils
from vdsm import supervdsm
@@ -108,8 +109,7 @@
self._netConfigDirty = False
self._prepareMOM()
secret.clear()
- threading.Thread(target=self._recoverThread,
- name='clientIFinit').start()
+ concurrent.thread(self._recoverThread, name='clientIFinit').start()
self.channelListener.settimeout(
config.getint('vars', 'guest_agent_timeout'))
self.channelListener.start()
@@ -290,9 +290,8 @@
def start(self):
for binding in self.bindings.values():
binding.start()
- self.thread = threading.Thread(target=self._reactor.process_requests,
- name='Reactor thread')
- self.thread.setDaemon(True)
+ self.thread = concurrent.thread(self._reactor.process_requests,
+ name='Reactor thread')
self.thread.start()
def _getUUIDSpecPath(self, uuid):
@@ -447,7 +446,6 @@
else:
raise JsonRpcBindingsError()
- @utils.traceback()
def _recoverThread(self):
# Trying to run recover process until it works. During that time vdsm
# stays in recovery mode (_recover=True), means all api requests
diff --git a/vdsm/rpc/bindingxmlrpc.py b/vdsm/rpc/bindingxmlrpc.py
index 7f1da4f..ce1b105 100644
--- a/vdsm/rpc/bindingxmlrpc.py
+++ b/vdsm/rpc/bindingxmlrpc.py
@@ -30,6 +30,7 @@
from vdsm.password import (ProtectedPassword,
protect_passwords,
unprotect_passwords)
+from vdsm import concurrent
from vdsm import utils
from vdsm import xmlrpc
from vdsm.define import doneCode, errCode
@@ -57,7 +58,6 @@
"""
Register xml-rpc functions and serve clients until stopped
"""
- @utils.traceback(on=self.log.name)
def threaded_start():
self.log.info("XMLRPC server running")
self._registerFunctions()
@@ -73,9 +73,8 @@
exc_info=True)
self.log.info("XMLRPC server stopped")
- self._thread = threading.Thread(target=threaded_start,
- name='BindingXMLRPC')
- self._thread.daemon = True
+ self._thread = concurrent.thread(threaded_start, name='BindingXMLRPC',
+ logger=self.log.name)
self._thread.start()
def add_socket(self, connected_socket, socket_address):
diff --git a/vdsm/supervdsmServer b/vdsm/supervdsmServer
index 3c752f8..7f7b2b3 100755
--- a/vdsm/supervdsmServer
+++ b/vdsm/supervdsmServer
@@ -24,13 +24,13 @@
import stat
import errno
from functools import wraps
-import threading
import re
import getopt
import resource
import signal
import logging
import logging.config
+from vdsm import concurrent
from vdsm.infra import sigutils
from vdsm.infra import zombiereaper
@@ -504,8 +504,7 @@
manager.register('instance', callable=_SuperVdsm)
server = manager.get_server()
- servThread = threading.Thread(target=server.serve_forever)
- servThread.setDaemon(True)
+ servThread = concurrent.thread(server.serve_forever)
servThread.start()
chown(address, getpwnam(VDSM_USER).pw_uid, METADATA_GROUP)
--
To view, visit https://gerrit.ovirt.org/45554
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib7f76c4e7c6d155e97afce49e5acb55af1692cc3
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: v2v: Use new concurrent.thread() utility
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: v2v: Use new concurrent.thread() utility
......................................................................
v2v: Use new concurrent.thread() utility
This patch updates v2v to use the new utility. No behavior change is
expected, as v2v already used @utils.traceback and daemonic thread.
Change-Id: Iae73ef21caabc88951823ef20055da5aac6f7443
Relates-To: https://bugzilla.redhat.com/1141422
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/v2v.py
1 file changed, 3 insertions(+), 4 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/16/45616/1
diff --git a/vdsm/v2v.py b/vdsm/v2v.py
index cdd831b..437fa86 100644
--- a/vdsm/v2v.py
+++ b/vdsm/v2v.py
@@ -36,11 +36,12 @@
import libvirt
+from vdsm import concurrent
from vdsm.constants import P_VDSM_RUN
from vdsm.define import errCode, doneCode
from vdsm import libvirtconnection, response
from vdsm.infra import zombiereaper
-from vdsm.utils import traceback, CommandPath, execCmd, NICENESS, IOCLASS
+from vdsm.utils import CommandPath, execCmd, NICENESS, IOCLASS
import caps
@@ -362,8 +363,7 @@
return obj
def start(self):
- t = threading.Thread(target=self._run_command)
- t.daemon = True
+ t = concurrent.thread(self._run_command)
t.start()
@property
@@ -393,7 +393,6 @@
with password_file(self._id, self._passwd_file, self._password):
self._run()
- @traceback(msg="Error importing vm")
def _run(self):
try:
self._import()
--
To view, visit https://gerrit.ovirt.org/45616
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iae73ef21caabc88951823ef20055da5aac6f7443
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: hsm: Reformat device info dict
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: hsm: Reformat device info dict
......................................................................
hsm: Reformat device info dict
Reformat device info dict in getDeviceList using one item per line and
sorted. This makes it easier to search and modify the code and creates
nicer diffs.
Change-Id: Ic285575545ccdbe449d1c6b95f5874f8e15711ce
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/storage/hsm.py
1 file changed, 16 insertions(+), 12 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/45/45845/1
diff --git a/vdsm/storage/hsm.py b/vdsm/storage/hsm.py
index 1b8c064..d3405aa 100644
--- a/vdsm/storage/hsm.py
+++ b/vdsm/storage/hsm.py
@@ -2024,18 +2024,22 @@
pvsize = ""
vguuid = ""
- devInfo = {'GUID': dev.get("guid", ""), 'pvUUID': pvuuid,
- 'pvsize': str(pvsize),
- 'vgUUID': vguuid, 'vendorID': dev.get("vendor", ""),
- 'productID': dev.get("product", ""),
- 'fwrev': dev.get("fwrev", ""),
- "serial": dev.get("serial", ""),
- 'capacity': dev.get("capacity", "0"),
- 'devtype': dev.get("devtype", ""),
- 'pathstatus': dev.get("paths", []),
- 'pathlist': dev.get("connections", []),
- 'logicalblocksize': dev.get("logicalblocksize", ""),
- 'physicalblocksize': dev.get("physicalblocksize", "")}
+ devInfo = {
+ "serial": dev.get("serial", ""),
+ 'GUID': dev.get("guid", ""),
+ 'capacity': dev.get("capacity", "0"),
+ 'devtype': dev.get("devtype", ""),
+ 'fwrev': dev.get("fwrev", ""),
+ 'logicalblocksize': dev.get("logicalblocksize", ""),
+ 'pathlist': dev.get("connections", []),
+ 'pathstatus': dev.get("paths", []),
+ 'physicalblocksize': dev.get("physicalblocksize", ""),
+ 'productID': dev.get("product", ""),
+ 'pvUUID': pvuuid,
+ 'pvsize': str(pvsize),
+ 'vendorID': dev.get("vendor", ""),
+ 'vgUUID': vguuid,
+ }
if not checkStatus:
devInfo["status"] = "unknown"
devices.append(devInfo)
--
To view, visit https://gerrit.ovirt.org/45845
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic285575545ccdbe449d1c6b95f5874f8e15711ce
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: block: add blkdiscard on zero image
by Federico Simoncelli
Federico Simoncelli has uploaded a new change for review.
Change subject: block: add blkdiscard on zero image
......................................................................
block: add blkdiscard on zero image
Change-Id: I4e059c556c550440727b36b8af8e5dfc29ce2ccb
Signed-off-by: Federico Simoncelli <fsimonce(a)redhat.com>
---
M vdsm/storage/blockSD.py
1 file changed, 2 insertions(+), 0 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/31/35631/1
diff --git a/vdsm/storage/blockSD.py b/vdsm/storage/blockSD.py
index daaa94f..a2370b8 100644
--- a/vdsm/storage/blockSD.py
+++ b/vdsm/storage/blockSD.py
@@ -33,6 +33,7 @@
from vdsm.config import config
from vdsm import constants
from vdsm import utils
+from vdsm import utillinux
import misc
import fileUtils
import sd
@@ -219,6 +220,7 @@
try:
misc.ddWatchCopy("/dev/zero", path, aborting, size)
+ utillinux.blkdiscard(path)
except Exception as e:
log.exception('zeroing operation failed')
raise se.VolumesZeroingError(path)
--
To view, visit http://gerrit.ovirt.org/35631
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4e059c556c550440727b36b8af8e5dfc29ce2ccb
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Federico Simoncelli <fsimonce(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: rwlock: Replace misc.RWLock
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: rwlock: Replace misc.RWLock
......................................................................
rwlock: Replace misc.RWLock
This patch removes misc.RWLock and replaces it with the simpler
rwlock.RWLock.
The new lock does not support recursive locking or lock demotion, but I
think they are not used by the current code - not tested yet.
Change-Id: I9ae6064e8e031339303e64606a70673807c4083a
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M vdsm/storage/misc.py
M vdsm/storage/resourceManager.py
2 files changed, 4 insertions(+), 112 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/23/43423/1
diff --git a/vdsm/storage/misc.py b/vdsm/storage/misc.py
index b14fd26..e45fad8 100644
--- a/vdsm/storage/misc.py
+++ b/vdsm/storage/misc.py
@@ -530,115 +530,6 @@
return 0
-class RWLock(object):
- """
- A simple ReadWriteLock implementation.
-
- The lock must be released by the thread that acquired it. Once a thread
- has acquired a lock, the same thread may acquire it again without blocking;
- the thread must release it once for each time it has acquired it. Note that
- lock promotion (acquiring an exclusive lock under a shared lock is
- forbidden and will raise an exception.
-
- The lock puts all requests in a queue. The request is granted when The
- previous one is released.
-
- Each request is represented by a :class:`threading.Event` object. When the
- Event is set the request is granted. This enables multiple callers to wait
- for a request thus implementing a shared lock.
- """
- class _contextLock(object):
- def __init__(self, owner, exclusive):
- self._owner = owner
- self._exclusive = exclusive
-
- def __enter__(self):
- self._owner.acquire(self._exclusive)
-
- def __exit__(self, exc_type, exc_value, traceback):
- self._owner.release()
-
- def __init__(self):
- self._syncRoot = threading.Lock()
- self._queue = Queue.Queue()
- self._currentSharedLock = None
- self._currentState = None
- self._holdingThreads = {}
-
- self.shared = self._contextLock(self, False)
- self.exclusive = self._contextLock(self, True)
-
- def acquireRead(self):
- return self.acquire(False)
-
- def acquireWrite(self):
- return self.acquire(True)
-
- def acquire(self, exclusive):
- currentEvent = None
- currentThread = threading.currentThread()
-
- # Handle reacquiring lock in the same thread
- if currentThread in self._holdingThreads:
- if self._currentState is False and exclusive:
- raise RuntimeError("Lock promotion is forbidden.")
-
- self._holdingThreads[currentThread] += 1
- return
-
- with self._syncRoot:
- # Handle regular acquisition
- if exclusive:
- currentEvent = threading.Event()
- self._currentSharedLock = None
- else:
- if self._currentSharedLock is None:
- self._currentSharedLock = threading.Event()
-
- currentEvent = self._currentSharedLock
-
- try:
- self._queue.put_nowait((currentEvent, exclusive))
- except Queue.Full:
- raise RuntimeError("There are too many objects waiting for "
- "this lock")
-
- if self._queue.unfinished_tasks == 1:
- # Bootstrap the process if needed. A lock is released the when
- # the next request is granted. When there is no one to grant
- # the request you have to grant it yourself.
- event, self._currentState = self._queue.get_nowait()
- event.set()
-
- currentEvent.wait()
-
- self._holdingThreads[currentThread] = 0
-
- def release(self):
- currentThread = threading.currentThread()
-
- if currentThread not in self._holdingThreads:
- raise RuntimeError("Releasing an lock without acquiring it first")
-
- # If in nested lock don't really release
- if self._holdingThreads[currentThread] > 0:
- self._holdingThreads[currentThread] -= 1
- return
-
- del self._holdingThreads[currentThread]
-
- with self._syncRoot:
- self._queue.task_done()
-
- if self._queue.empty():
- self._currentState = None
- return
-
- nextRequest, self._currentState = self._queue.get_nowait()
-
- nextRequest.set()
-
-
class DynamicBarrier(object):
def __init__(self):
self._cond = threading.Condition()
diff --git a/vdsm/storage/resourceManager.py b/vdsm/storage/resourceManager.py
index b1b0dc7..a7e67a2 100644
--- a/vdsm/storage/resourceManager.py
+++ b/vdsm/storage/resourceManager.py
@@ -29,6 +29,7 @@
import storage_exception as se
import misc
from logUtils import SimpleLogAdapter
+from vdsm import rwlock
from vdsm import utils
@@ -287,7 +288,7 @@
self.autoRelease = True
self._isValid = True
- self._syncRoot = misc.RWLock()
+ self._syncRoot = rwlock.RWLock()
def __wrapObj(self):
for attr in dir(self.__wrappedObject):
@@ -380,11 +381,11 @@
"""
def __init__(self, factory):
self.resources = {}
- self.lock = threading.Lock() # misc.RWLock()
+ self.lock = threading.Lock() # rwlock.RWLock()
self.factory = factory
def __init__(self):
- self._syncRoot = misc.RWLock()
+ self._syncRoot = rwlock.RWLock()
self._namespaces = {}
@classmethod
--
To view, visit https://gerrit.ovirt.org/43423
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I9ae6064e8e031339303e64606a70673807c4083a
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: rwlock: Support non-blocking acquire
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: rwlock: Support non-blocking acquire
......................................................................
rwlock: Support non-blocking acquire
This patch adds non-blocking acquire suggested in
https://gerrit.ovirt.org/42773. This is a simpler alternative to timed
acquire, suggested in https://gerrit.ovirt.org/42909.
Change-Id: Iec721a07087349050bfe9aa11aacf3be9695fb85
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M lib/vdsm/rwlock.py
M tests/rwlock_test.py
2 files changed, 30 insertions(+), 48 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/28/42928/1
diff --git a/lib/vdsm/rwlock.py b/lib/vdsm/rwlock.py
index c6ce1ea..2281851 100644
--- a/lib/vdsm/rwlock.py
+++ b/lib/vdsm/rwlock.py
@@ -32,17 +32,23 @@
self._readers = set()
self._writer = None
- def acquireWrite(self):
+ def acquireWrite(self, block=True):
with self._lock:
if self._writer or self._readers or self._waiters:
+ if not block:
+ return False
self._wait(True)
self._writer = threading.current_thread()
+ return True
- def acquireRead(self):
+ def acquireRead(self, block=True):
with self._lock:
if self._writer or self._waiters:
+ if not block:
+ return False
self._wait(False)
self._readers.add(threading.current_thread())
+ return True
def release(self):
me = threading.current_thread()
diff --git a/tests/rwlock_test.py b/tests/rwlock_test.py
index 1c28f26..7ef5aa6 100644
--- a/tests/rwlock_test.py
+++ b/tests/rwlock_test.py
@@ -121,54 +121,30 @@
for t in threads:
t.stop()
- @slowtest
- def test_shared_context_blocks_writer(self):
- lock = RWLock()
- writer = RWThread(lock.exclusive)
- try:
- with lock.shared:
- writer.start()
- if not writer.ready.wait(2):
- raise RuntimeError("Timeout waiting for writer thread")
- # Writer must block
- self.assertFalse(writer.acquired.wait(1))
- finally:
- writer.stop()
-
- def test_shared_context_allows_reader(self):
- lock = RWLock()
- with lock.shared:
- reader = RWThread(lock.shared)
- with utils.running(reader):
- self.assertTrue(reader.acquired.wait(1))
-
- @slowtest
- def test_exclusive_context_blocks_writer(self):
- lock = RWLock()
- writer = RWThread(lock.exclusive)
- try:
- with lock.exclusive:
- writer.start()
- if not writer.ready.wait(2):
- raise RuntimeError("Timeout waiting for writer thread")
- # Reader must block
- self.assertFalse(writer.acquired.wait(1))
- finally:
- writer.stop()
-
- @slowtest
- def test_exclusive_context_blocks_reader(self):
+ def test_reader_blocks_writer(self):
lock = RWLock()
reader = RWThread(lock.shared)
- try:
- with lock.exclusive:
- reader.start()
- if not reader.ready.wait(2):
- raise RuntimeError("Timeout waiting for reader thread")
- # Reader must block
- self.assertFalse(reader.acquired.wait(1))
- finally:
- reader.stop()
+ with utils.running(reader):
+ if not reader.acquired.wait(2):
+ raise RuntimeError("Timeout waiting for reader thread")
+ self.assertFalse(lock.acquireWrite(block=False))
+
+ def test_writer_blocks_writer(self):
+ lock = RWLock()
+ writer = RWThread(lock.exclusive)
+ with utils.running(writer):
+ if not writer.acquired.wait(2):
+ raise RuntimeError("Timeout waiting for writer thread")
+ self.assertFalse(lock.acquireWrite(block=False))
+
+
+ def test_writer_blocks_reader(self):
+ lock = RWLock()
+ writer = RWThread(lock.exclusive)
+ with utils.running(writer):
+ if not writer.acquired.wait(2):
+ raise RuntimeError("Timeout waiting for writer thread")
+ self.assertFalse(lock.acquireRead(block=False))
@expandPermutations
--
To view, visit https://gerrit.ovirt.org/42928
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iec721a07087349050bfe9aa11aacf3be9695fb85
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 6 months
Change in vdsm[master]: rwlock: Support timed acquire
by Nir Soffer
Nir Soffer has uploaded a new change for review.
Change subject: rwlock: Support timed acquire
......................................................................
rwlock: Support timed acquire
Add an acquire timeout. With timeout=0, it can be used to implement
non-blocking acquire. With a bigger timeout, it makes it easy to fail a
request after a timeout, instead of leaving a thread blocked forever.
Change-Id: Idd9b11452d74b566b8989f41244f2d0534327214
Signed-off-by: Nir Soffer <nsoffer(a)redhat.com>
---
M lib/vdsm/rwlock.py
M tests/rwlock_test.py
2 files changed, 65 insertions(+), 21 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/09/42909/1
diff --git a/lib/vdsm/rwlock.py b/lib/vdsm/rwlock.py
index 900d91d..72f962c 100644
--- a/lib/vdsm/rwlock.py
+++ b/lib/vdsm/rwlock.py
@@ -22,6 +22,10 @@
import threading
+class Timeout(Exception):
+ pass
+
+
class RWLock(object):
def __init__(self):
@@ -30,16 +34,16 @@
self._readers = set()
self._writer = None
- def acquireWrite(self):
+ def acquireWrite(self, timeout=None):
with self._lock:
if self._writer or self._readers or self._waiters:
- self._wait(True)
+ self._wait(True, timeout)
self._writer = threading.current_thread()
- def acquireRead(self):
+ def acquireRead(self, timeout=None):
with self._lock:
if self._writer or self._waiters:
- self._wait(False)
+ self._wait(False, timeout)
self._readers.add(threading.current_thread())
def release(self):
@@ -60,13 +64,13 @@
if self._waiters:
self._wakeup_waiter()
- def _wait(self, wants_write):
+ def _wait(self, wants_write, timeout):
waiter = Waiter(wants_write)
self._waiters.append(waiter)
try:
self._lock.release()
try:
- waiter.wait()
+ waiter.wait(timeout)
finally:
self._lock.acquire()
finally:
@@ -84,8 +88,10 @@
self.wants_write = wants_write
self._event = threading.Event()
- def wait(self):
- self._event.wait()
+ def wait(self, timeout):
+ if not self._event.wait(timeout):
+ raise Timeout("Timeout acquiring lock for %s" %
+ "writing" if self.wants_write else "reading")
def wakeup(self):
self._event.set()
diff --git a/tests/rwlock_test.py b/tests/rwlock_test.py
index e67e27a..5ddf066 100644
--- a/tests/rwlock_test.py
+++ b/tests/rwlock_test.py
@@ -25,10 +25,40 @@
from testlib import VdsmTestCase
from testValidation import slowtest, stresstest
-from vdsm.rwlock import RWLock
+from vdsm.rwlock import RWLock, Timeout
class RWLockTests(VdsmTestCase):
+
+ def test_writer_blocks_other_writer(self):
+ lock = RWLock()
+ lock.acquireWrite()
+ try:
+ log = []
+ start_thread(writer, lock, log, timeout=0).join()
+ self.assertEqual(log, ["writer timeout"])
+ finally:
+ lock.release()
+
+ def test_writer_blocks_other_reader(self):
+ lock = RWLock()
+ lock.acquireWrite()
+ try:
+ log = []
+ start_thread(reader, lock, log, timeout=0).join()
+ self.assertEqual(log, ["reader timeout"])
+ finally:
+ lock.release()
+
+ def test_reader_blocks_other_writer(self):
+ lock = RWLock()
+ lock.acquireRead()
+ try:
+ log = []
+ start_thread(writer, lock, log, timeout=0).join()
+ self.assertEqual(log, ["writer timeout"])
+ finally:
+ lock.release()
@slowtest
def test_concurrent_readers(self):
@@ -194,22 +224,30 @@
self.assertEqual(log, ["reader acquired"])
-def writer(lock, log, hold=0):
- lock.acquireWrite()
+def writer(lock, log, timeout=1, hold=0):
try:
- log.append("writer acquired")
- time.sleep(hold)
- finally:
- lock.release()
+ lock.acquireWrite(timeout)
+ except Timeout:
+ log.append("writer timeout")
+ else:
+ try:
+ log.append("writer acquired")
+ time.sleep(hold)
+ finally:
+ lock.release()
-def reader(lock, log, hold=0):
- lock.acquireRead()
+def reader(lock, log, timeout=1, hold=0):
try:
- log.append("reader acquired")
- time.sleep(hold)
- finally:
- lock.release()
+ lock.acquireRead(timeout)
+ except Timeout:
+ log.append("reader timeout")
+ else:
+ try:
+ log.append("reader acquired")
+ time.sleep(hold)
+ finally:
+ lock.release()
def start_thread(func, *args, **kwargs):
--
To view, visit https://gerrit.ovirt.org/42909
To unsubscribe, visit https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Idd9b11452d74b566b8989f41244f2d0534327214
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsoffer(a)redhat.com>
8 years, 6 months