Author: tmckay Date: 2012-10-17 19:07:06 +0000 (Wed, 17 Oct 2012) New Revision: 5512
Modified: trunk/cumin/python/cumin/main.py trunk/cumin/python/cumin/model.py trunk/parsley/python/parsley/threadingex.py trunk/wooly/python/wooly/server.py Log: Improve shutdown handling for some threads, including the web server (BZ800065) (modified)
Modified: trunk/cumin/python/cumin/main.py =================================================================== --- trunk/cumin/python/cumin/main.py 2012-10-17 18:47:13 UTC (rev 5511) +++ trunk/cumin/python/cumin/main.py 2012-10-17 19:07:06 UTC (rev 5512) @@ -289,6 +289,12 @@ if self.wallaby is not None: self.wallaby.stop(wait=True)
+ try: + log.info("Stopping update threads") + self.model.stop_updates() + except: + log.debug("Exception stopping update threads") + self.server.stop() try: self.session.stop()
Modified: trunk/cumin/python/cumin/model.py =================================================================== --- trunk/cumin/python/cumin/model.py 2012-10-17 18:47:13 UTC (rev 5511) +++ trunk/cumin/python/cumin/model.py 2012-10-17 19:07:06 UTC (rev 5512) @@ -1,5 +1,7 @@ import logging import os +import copy + from threading import Lock, Thread from datetime import datetime, timedelta from time import sleep, mktime @@ -14,6 +16,7 @@ from rosemary.sqlquery import SqlQueryOptions from rosemary.model import RosemaryModel from sage.util import SyncSet +from parsley.threadingex import CuminBlockingDaemonThread
log = logging.getLogger("cumin.model")
@@ -33,10 +36,37 @@ self.static_group_config_values_by_negotiator = dict() self.dynamic_group_config_values_by_negotiator = dict()
+ # Disables the creation of further ObjectStore objects + # during shutdown when we are trying to terminate the updates + self.no_more_updates = False + self.lock = Lock()
self.job_meta_data = JobMetaData("job")
+ def stop_updates(self): + + def stop(storedict): + for k, v in storedict.iteritems(): + v.stop_updates() + + self.lock.acquire() + self.no_more_updates = True + + # do shallow copy of all the dictionaries + # that hold ObjectStore types + stores = list() + stores.append(copy.copy(self.limits_by_negotiator)) + stores.append(copy.copy(self.job_summaries_by_submission)) + stores.append(copy.copy(self.group_names_by_negotiator)) + stores.append(copy.copy(self.static_group_config_values_by_negotiator)) + stores.append(copy.copy(self.dynamic_group_config_values_by_negotiator)) + + self.lock.release() + + for storedict in stores: + stop(storedict) + def check(self): log.info("Checking %s", self)
@@ -84,17 +114,18 @@ return store
self.lock.acquire() - try: try: - store = self.group_names_by_negotiator[negotiator._qmf_agent_id] - store.extend_updates() + if self.no_more_updates: + store = NegotiatorGroupNamesStore(self, None) + store.exception = Exception("Shutting down") + else: + store = self.group_names_by_negotiator[negotiator._qmf_agent_id] + store.extend_updates() except KeyError: store = NegotiatorGroupNamesStore(self, negotiator) + self.group_names_by_negotiator[negotiator._qmf_agent_id] = store store.start_updates() - - self.group_names_by_negotiator[negotiator._qmf_agent_id] = store - return store finally: self.lock.release() @@ -111,7 +142,12 @@ dynamic_store = None static_store = None try: - if config == "GROUP_QUOTA_DYNAMIC": + if self.no_more_updates: + dynamic_store = NegotiatorDynamicGroupConfigValuesStore(self, None) + dynamic_store.exception = Exception("Shutting down") + static_store = NegotiatorStaticGroupConfigValuesStore(self, None) + static_store.exception = Exception("Shutting down") + elif config == "GROUP_QUOTA_DYNAMIC": try: dynamic_store = self.dynamic_group_config_values_by_negotiator[negotiator._qmf_agent_id] dynamic_store.extend_updates() @@ -124,12 +160,11 @@ dynamic_store = NegotiatorDynamicGroupConfigValuesStore(self, negotiator, needed_groups, config) for group in needed_groups: dynamic_store.add_group_config(group, config) + self.dynamic_group_config_values_by_negotiator[negotiator._qmf_agent_id] = dynamic_store dynamic_store.start_updates() - self.dynamic_group_config_values_by_negotiator[negotiator._qmf_agent_id] = dynamic_store if(negotiator._qmf_agent_id in self.static_group_config_values_by_negotiator.keys()): static_store = self.static_group_config_values_by_negotiator[negotiator._qmf_agent_id] else: - try: static_store = self.static_group_config_values_by_negotiator[negotiator._qmf_agent_id] static_store.extend_updates() @@ -140,11 +175,10 @@ static_store.update_new(None) except KeyError: static_store = NegotiatorStaticGroupConfigValuesStore(self, 
negotiator, needed_groups, config) + self.static_group_config_values_by_negotiator[negotiator._qmf_agent_id] = static_store static_store.start_updates() - self.static_group_config_values_by_negotiator[negotiator._qmf_agent_id] = static_store if(negotiator._qmf_agent_id in self.dynamic_group_config_values_by_negotiator.keys()): dynamic_store = self.dynamic_group_config_values_by_negotiator[negotiator._qmf_agent_id] - return (dynamic_store, static_store) finally: self.lock.release() @@ -154,10 +188,10 @@ return
self.lock.acquire() - try: - store = self.dynamic_group_config_values_by_negotiator[negotiator._qmf_agent_id] - store.update(None) + if not self.no_more_updates: + store = self.dynamic_group_config_values_by_negotiator[negotiator._qmf_agent_id] + store.update(None) finally: self.lock.release()
@@ -168,17 +202,18 @@ return store
self.lock.acquire() - try: try: - store = self.limits_by_negotiator[negotiator._qmf_agent_id] - store.extend_updates() + if self.no_more_updates: + store = NegotiatorLimitStore(self, None) + store.exception = Exception("Shutting down") + else: + store = self.limits_by_negotiator[negotiator._qmf_agent_id] + store.extend_updates() except KeyError: store = NegotiatorLimitStore(self, negotiator) + self.limits_by_negotiator[negotiator._qmf_agent_id] = store store.start_updates() - - self.limits_by_negotiator[negotiator._qmf_agent_id] = store - return store finally: self.lock.release() @@ -190,17 +225,18 @@ return store
self.lock.acquire() - try: try: - store = self.job_summaries_by_submission[submission._id] - store.extend_updates() + if self.no_more_updates: + store = SubmissionJobSummaryStore(self, None) + store.exception = Exception("Shutting down") + else: + store = self.job_summaries_by_submission[submission._id] + store.extend_updates() except KeyError: store = SubmissionJobSummaryStore(self, submission, machine_name) + self.job_summaries_by_submission[submission._id] = store store.start_updates() - - self.job_summaries_by_submission[submission._id] = store - return store finally: self.lock.release() @@ -718,6 +754,9 @@ def start_updates(self): self.update_thread.start()
+ def stop_updates(self, timeout=5): + self.update_thread.stop(timeout) + def extend_updates(self): self.update_thread.extend()
@@ -727,12 +766,11 @@ def delete(self): self.model = None
- class UpdateThread(Thread): + class UpdateThread(CuminBlockingDaemonThread): def __init__(self, store): - Thread.__init__(self) + super(ObjectStore.UpdateThread, self).__init__()
self.store = store - self.setDaemon(True) self.ticks = 0
def extend(self): @@ -754,7 +792,14 @@ self.store.exception = e
self.ticks += 1 - sleep(30) + try: + self._condition.acquire() + if not self.stop_requested: + self._condition.wait(30) + if self.stop_requested: + break + finally: + self._condition.release() finally: conn.close()
Modified: trunk/parsley/python/parsley/threadingex.py =================================================================== --- trunk/parsley/python/parsley/threadingex.py 2012-10-17 18:47:13 UTC (rev 5511) +++ trunk/parsley/python/parsley/threadingex.py 2012-10-17 19:07:06 UTC (rev 5512) @@ -22,3 +22,40 @@
writer.write(row % (cls, name, ident, alive, daemon, extra)) writer.write(os.linesep) + +class CuminDaemonThread(Thread): + def __init__(self): + super(CuminDaemonThread, self).__init__() + + self.name = self.__class__.__name__ + self.stop_requested = False + + self.setDaemon(True) + + def init(self): + pass + + def stop(self, timeout=5): + if self.stop_requested: + return + self.stop_requested = True + if self.isAlive(): + self.join(timeout) + +class CuminBlockingDaemonThread(CuminDaemonThread): + def __init__(self): + super(CuminBlockingDaemonThread, self).__init__() + + self._lock = Lock() + self._condition = Condition(self._lock) + + def stop(self, timeout=5): + try: + self._condition.acquire() + self.stop_requested = True + self._condition.notify() + finally: + self._condition.release() + + if self.isAlive(): + self.join(timeout)
Modified: trunk/wooly/python/wooly/server.py =================================================================== --- trunk/wooly/python/wooly/server.py 2012-10-17 18:47:13 UTC (rev 5511) +++ trunk/wooly/python/wooly/server.py 2012-10-17 19:07:06 UTC (rev 5512) @@ -3,6 +3,7 @@ from util import * from wooly import * from wsgiserver import CherryPyWSGIServer +from parsley.threadingex import CuminBlockingDaemonThread
log = logging.getLogger("wooly.server")
@@ -22,6 +23,7 @@
self.client_sessions_by_id = dict() self.client_session_expire_thread = ClientSessionExpireThread(self) + self.stop_requested = False
def server_alive(self): return self.dispatch_thread.isAlive() @@ -53,7 +55,8 @@
def stop(self): log.info("Stopping %s", self) - + self.stop_requested = True + self.client_session_expire_thread.stop() self.dispatch_thread.stop()
def get_page(self, env): @@ -75,11 +78,11 @@ return then
def service_request(self, env, response): - log.info("Request %s %s", env["REQUEST_METHOD"], env["REQUEST_URI"]) - + msg = "Request %s %s" % (env["REQUEST_METHOD"], env["REQUEST_URI"]) + log.info(msg) page = self.get_page(env)
- if page: + if page and not self.stop_requested: status, headers, content = self.service_page_request(page, env) else: status = "404 Not Found" @@ -247,7 +250,7 @@
self.server = server self.name = self.__class__.__name__ - + self.stopped = False self.setDaemon(True)
self.wsgi_server = CherryPyWSGIServer \ @@ -255,7 +258,7 @@ self.server.service_request, request_queue_size=32, numthreads=32, - max=32) + max=32, shutdown_timeout=0.5)
if self.server.server_cert and self.server.server_key: ssl_adapter = None @@ -287,6 +290,9 @@
self.wsgi_server.environ["wsgi.version"] = (1, 1)
+ class MyShutdownException(Exception): + pass + def run(self): try: from ssl import SSLError @@ -306,15 +312,26 @@ except (SSLError, SSL.Error), e: log.error("Web server is shutting down from an unhandled SSLError" \ " (problem with the key or certificate?): %s" % e) - self.wsgi_server.stop() + self.stop()
+ except WebServerDispatchThread.MyShutdownException: + pass + except Exception, e: log.error("Web server is shutting down from an unhandled" \ " exception: %s" % e) - self.wsgi_server.stop() + self.stop()
def stop(self): - self.wsgi_server.stop() + # The CherryPyWSGIServer will continue to accept connection + # requests and spawn worker threads if _set_interrupt() is not + # called. The _set_interrupt() method will stop the acceptance + # of connection requests and then call stop(). Just calling + # stop() here isn't sufficient because stop() may never complete + # if connections requests continue to come in. + if not self.stopped: + self.stopped = True + self.wsgi_server._set_interrupt(WebServerDispatchThread.MyShutdownException)
class ClientSession(object): def __init__(self): @@ -340,19 +357,23 @@ args = (self.__class__.__name__, self.id, self.created) return "%s(%s,%s)" % args
-class ClientSessionExpireThread(Thread): +class ClientSessionExpireThread(CuminBlockingDaemonThread): def __init__(self, server): super(ClientSessionExpireThread, self).__init__()
self.server = server - self.name = self.__class__.__name__
- self.setDaemon(True) - def run(self): while True: self.expire_sessions() - time.sleep(60) + try: + self._condition.acquire() + if not self.stop_requested: + self._condition.wait(60) + if self.stop_requested: + break + finally: + self._condition.release()
def expire_sessions(self): when = datetime.now() - timedelta(hours=1) @@ -365,6 +386,4 @@ count += 1 except KeyError: pass - - log.info("Expired %i client sessions", count)
cumin-developers@lists.fedorahosted.org