[openstack-heat/f19] Add patches to update RPC and avoid qpid exchange leaks

Jeff Peeler jpeeler at fedoraproject.org
Tue Jun 25 16:19:39 UTC 2013


commit e7c814b137918cd0a94ed9cc8e9d7a57315823e9
Author: Jeff Peeler <jpeeler at redhat.com>
Date:   Tue Jun 25 12:17:10 2013 -0400

    Add patches to update RPC and avoid qpid exchange leaks
    
    Fixes rhbz#971572

 0002-Update-RPC-to-Oslo-incubator-2013.1.patch     | 1589 ++++++++++++++++++++
 ...oid-code-path-causing-qpid-exchange-leaks.patch |   26 +
 openstack-heat.spec                                |   10 +-
 3 files changed, 1624 insertions(+), 1 deletions(-)
---
diff --git a/0002-Update-RPC-to-Oslo-incubator-2013.1.patch b/0002-Update-RPC-to-Oslo-incubator-2013.1.patch
new file mode 100644
index 0000000..3fa0f4c
--- /dev/null
+++ b/0002-Update-RPC-to-Oslo-incubator-2013.1.patch
@@ -0,0 +1,1589 @@
+From 8dc18475174224ce312b86628d918daaa9265832 Mon Sep 17 00:00:00 2001
+From: Jeff Peeler <jpeeler at redhat.com>
+Date: Tue, 25 Jun 2013 11:22:26 -0400
+Subject: [PATCH] Update RPC to Oslo incubator 2013.1
+
+(removed bin/heat-rpc-zmq-receiver)
+---
+ heat/openstack/common/rpc/__init__.py   |  60 +++++--
+ heat/openstack/common/rpc/amqp.py       | 277 +++++++++++++++++++++++++++++---
+ heat/openstack/common/rpc/common.py     |  34 +++-
+ heat/openstack/common/rpc/impl_fake.py  |   2 +-
+ heat/openstack/common/rpc/impl_kombu.py |  37 ++++-
+ heat/openstack/common/rpc/impl_qpid.py  |  73 +++++++--
+ heat/openstack/common/rpc/impl_zmq.py   | 185 ++++++++++++++-------
+ heat/openstack/common/rpc/matchmaker.py | 186 +++++++++++++++++++--
+ 8 files changed, 728 insertions(+), 126 deletions(-)
+
+diff --git a/heat/openstack/common/rpc/__init__.py b/heat/openstack/common/rpc/__init__.py
+index 4158e50..9fd5910 100644
+--- a/heat/openstack/common/rpc/__init__.py
++++ b/heat/openstack/common/rpc/__init__.py
+@@ -25,9 +25,17 @@ For some wrappers that add message versioning to rpc, see:
+     rpc.proxy
+ """
+ 
++import inspect
++import logging
++
+ from oslo.config import cfg
+ 
++from heat.openstack.common.gettextutils import _
+ from heat.openstack.common import importutils
++from heat.openstack.common import local
++
++
++LOG = logging.getLogger(__name__)
+ 
+ 
+ rpc_opts = [
+@@ -63,7 +71,8 @@ rpc_opts = [
+                help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+ ]
+ 
+-cfg.CONF.register_opts(rpc_opts)
++CONF = cfg.CONF
++CONF.register_opts(rpc_opts)
+ 
+ 
+ def set_defaults(control_exchange):
+@@ -84,10 +93,27 @@ def create_connection(new=True):
+ 
+     :returns: An instance of openstack.common.rpc.common.Connection
+     """
+-    return _get_impl().create_connection(cfg.CONF, new=new)
++    return _get_impl().create_connection(CONF, new=new)
++
++
++def _check_for_lock():
++    if not CONF.debug:
++        return None
++
++    if ((hasattr(local.strong_store, 'locks_held')
++         and local.strong_store.locks_held)):
++        stack = ' :: '.join([frame[3] for frame in inspect.stack()])
++        LOG.warn(_('An RPC is being made while holding a lock. The locks '
++                   'currently held are %(locks)s. This is probably a bug. '
++                   'Please report it. Include the following: [%(stack)s].'),
++                 {'locks': local.strong_store.locks_held,
++                  'stack': stack})
++        return True
++
++    return False
+ 
+ 
+-def call(context, topic, msg, timeout=None):
++def call(context, topic, msg, timeout=None, check_for_lock=False):
+     """Invoke a remote method that returns something.
+ 
+     :param context: Information that identifies the user that has made this
+@@ -101,13 +127,17 @@ def call(context, topic, msg, timeout=None):
+                                              "args" : dict_of_kwargs }
+     :param timeout: int, number of seconds to use for a response timeout.
+                     If set, this overrides the rpc_response_timeout option.
++    :param check_for_lock: if True, a warning is emitted if an RPC call is made
++                    with a lock held.
+ 
+     :returns: A dict from the remote method.
+ 
+     :raises: openstack.common.rpc.common.Timeout if a complete response
+              is not received before the timeout is reached.
+     """
+-    return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
++    if check_for_lock:
++        _check_for_lock()
++    return _get_impl().call(CONF, context, topic, msg, timeout)
+ 
+ 
+ def cast(context, topic, msg):
+@@ -125,7 +155,7 @@ def cast(context, topic, msg):
+ 
+     :returns: None
+     """
+-    return _get_impl().cast(cfg.CONF, context, topic, msg)
++    return _get_impl().cast(CONF, context, topic, msg)
+ 
+ 
+ def fanout_cast(context, topic, msg):
+@@ -146,10 +176,10 @@ def fanout_cast(context, topic, msg):
+ 
+     :returns: None
+     """
+-    return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
++    return _get_impl().fanout_cast(CONF, context, topic, msg)
+ 
+ 
+-def multicall(context, topic, msg, timeout=None):
++def multicall(context, topic, msg, timeout=None, check_for_lock=False):
+     """Invoke a remote method and get back an iterator.
+ 
+     In this case, the remote method will be returning multiple values in
+@@ -167,6 +197,8 @@ def multicall(context, topic, msg, timeout=None):
+                                              "args" : dict_of_kwargs }
+     :param timeout: int, number of seconds to use for a response timeout.
+                     If set, this overrides the rpc_response_timeout option.
++    :param check_for_lock: if True, a warning is emitted if an RPC call is made
++                    with a lock held.
+ 
+     :returns: An iterator.  The iterator will yield a tuple (N, X) where N is
+               an index that starts at 0 and increases by one for each value
+@@ -176,7 +208,9 @@ def multicall(context, topic, msg, timeout=None):
+     :raises: openstack.common.rpc.common.Timeout if a complete response
+              is not received before the timeout is reached.
+     """
+-    return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
++    if check_for_lock:
++        _check_for_lock()
++    return _get_impl().multicall(CONF, context, topic, msg, timeout)
+ 
+ 
+ def notify(context, topic, msg, envelope=False):
+@@ -218,7 +252,7 @@ def cast_to_server(context, server_params, topic, msg):
+ 
+     :returns: None
+     """
+-    return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
++    return _get_impl().cast_to_server(CONF, context, server_params, topic,
+                                       msg)
+ 
+ 
+@@ -234,7 +268,7 @@ def fanout_cast_to_server(context, server_params, topic, msg):
+ 
+     :returns: None
+     """
+-    return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
++    return _get_impl().fanout_cast_to_server(CONF, context, server_params,
+                                              topic, msg)
+ 
+ 
+@@ -264,10 +298,10 @@ def _get_impl():
+     global _RPCIMPL
+     if _RPCIMPL is None:
+         try:
+-            _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
++            _RPCIMPL = importutils.import_module(CONF.rpc_backend)
+         except ImportError:
+             # For backwards compatibility with older nova config.
+-            impl = cfg.CONF.rpc_backend.replace('nova.rpc',
+-                                                'nova.openstack.common.rpc')
++            impl = CONF.rpc_backend.replace('nova.rpc',
++                                            'nova.openstack.common.rpc')
+             _RPCIMPL = importutils.import_module(impl)
+     return _RPCIMPL
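
The check_for_lock flag added above is opt-in per call site. A minimal usage
sketch, assuming an existing request context; the 'engine' topic and
'list_stacks' method are hypothetical:

    from heat.openstack.common import rpc

    def list_stacks(context):
        # Hypothetical call site. With debug enabled, check_for_lock=True
        # logs a warning (the held locks plus the call stack) if this
        # thread still holds a lock tracked in
        # local.strong_store.locks_held; the call proceeds regardless.
        return rpc.call(context, 'engine',
                        {'method': 'list_stacks', 'args': {}},
                        check_for_lock=True)
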
+diff --git a/heat/openstack/common/rpc/amqp.py b/heat/openstack/common/rpc/amqp.py
+index 9d69dbb..e9d1bea 100644
+--- a/heat/openstack/common/rpc/amqp.py
++++ b/heat/openstack/common/rpc/amqp.py
+@@ -25,13 +25,19 @@ Specifically, this includes impl_kombu and impl_qpid.  impl_carrot also uses
+ AMQP, but is deprecated and predates this code.
+ """
+ 
++import collections
+ import inspect
+ import sys
+ import uuid
+ 
+ from eventlet import greenpool
+ from eventlet import pools
++from eventlet import queue
+ from eventlet import semaphore
++# TODO(pekowski): Remove the cfg import and the comment below in Havana.
++# This import should no longer be needed when the amqp_rpc_single_reply_queue
++# option is removed.
++from oslo.config import cfg
+ 
+ from heat.openstack.common import excutils
+ from heat.openstack.common.gettextutils import _
+@@ -40,6 +46,17 @@ from heat.openstack.common import log as logging
+ from heat.openstack.common.rpc import common as rpc_common
+ 
+ 
++# TODO(pekowski): Remove this option in Havana.
++amqp_opts = [
++    cfg.BoolOpt('amqp_rpc_single_reply_queue',
++                default=False,
++                help='Enable a fast single reply queue if using AMQP based '
++                'RPC like RabbitMQ or Qpid.'),
++]
++
++cfg.CONF.register_opts(amqp_opts)
++
++UNIQUE_ID = '_unique_id'
+ LOG = logging.getLogger(__name__)
+ 
+ 
+@@ -51,6 +68,7 @@ class Pool(pools.Pool):
+         kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
+         kwargs.setdefault("order_as_stack", True)
+         super(Pool, self).__init__(*args, **kwargs)
++        self.reply_proxy = None
+ 
+     # TODO(comstud): Timeout connections not used in a while
+     def create(self):
+@@ -60,6 +78,16 @@ class Pool(pools.Pool):
+     def empty(self):
+         while self.free_items:
+             self.get().close()
++        # Force a new connection pool to be created.
++        # Note that this was added due to failing unit test cases. The issue
++        # is that the above while loop gets all the cached connections from
++        # the pool and closes them, but never returns them to the pool: a
++        # pool leak. The unit tests hang waiting for an item to be returned
++        # to the pool. The unit tests get here via the tearDown() method. In
++        # the run time code, it gets here via cleanup(), which appears only
++        # in service.py just before doing a sys.exit(), so cleanup() happens
++        # once and the leakage is not a problem.
++        self.connection_cls.pool = None
+ 
+ 
+ _pool_create_sem = semaphore.Semaphore()
+@@ -137,6 +165,12 @@ class ConnectionContext(rpc_common.Connection):
+     def create_worker(self, topic, proxy, pool_name):
+         self.connection.create_worker(topic, proxy, pool_name)
+ 
++    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
++        self.connection.join_consumer_pool(callback,
++                                           pool_name,
++                                           topic,
++                                           exchange_name)
++
+     def consume_in_thread(self):
+         self.connection.consume_in_thread()
+ 
+@@ -148,8 +182,45 @@ class ConnectionContext(rpc_common.Connection):
+             raise rpc_common.InvalidRPCConnectionReuse()
+ 
+ 
+-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
+-              ending=False, log_failure=True):
++class ReplyProxy(ConnectionContext):
++    """ Connection class for RPC replies / callbacks """
++    def __init__(self, conf, connection_pool):
++        self._call_waiters = {}
++        self._num_call_waiters = 0
++        self._num_call_waiters_wrn_threshold = 10
++        self._reply_q = 'reply_' + uuid.uuid4().hex
++        super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
++        self.declare_direct_consumer(self._reply_q, self._process_data)
++        self.consume_in_thread()
++
++    def _process_data(self, message_data):
++        msg_id = message_data.pop('_msg_id', None)
++        waiter = self._call_waiters.get(msg_id)
++        if not waiter:
++            LOG.warn(_('no calling threads waiting for msg_id : %s'
++                       ', message : %s') % (msg_id, message_data))
++        else:
++            waiter.put(message_data)
++
++    def add_call_waiter(self, waiter, msg_id):
++        self._num_call_waiters += 1
++        if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
++            LOG.warn(_('Number of call waiters is greater than warning '
++                       'threshold: %d. There could be a MulticallProxyWaiter '
++                       'leak.') % self._num_call_waiters_wrn_threshold)
++            self._num_call_waiters_wrn_threshold *= 2
++        self._call_waiters[msg_id] = waiter
++
++    def del_call_waiter(self, msg_id):
++        self._num_call_waiters -= 1
++        del self._call_waiters[msg_id]
++
++    def get_reply_q(self):
++        return self._reply_q
++
++
++def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
++              failure=None, ending=False, log_failure=True):
+     """Sends a reply or an error on the channel signified by msg_id.
+ 
+     Failure should be a sys.exc_info() tuple.
+@@ -168,13 +239,22 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
+                    'failure': failure}
+         if ending:
+             msg['ending'] = True
+-        conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
++        _add_unique_id(msg)
++        # If a reply_q exists, add the msg_id to the reply and pass the
++        # reply_q to direct_send() to use it as the response queue.
++        # Otherwise use the msg_id for backward compatibility.
++        if reply_q:
++            msg['_msg_id'] = msg_id
++            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
++        else:
++            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+ 
+ 
+ class RpcContext(rpc_common.CommonRpcContext):
+     """Context that supports replying to a rpc.call"""
+     def __init__(self, **kwargs):
+         self.msg_id = kwargs.pop('msg_id', None)
++        self.reply_q = kwargs.pop('reply_q', None)
+         self.conf = kwargs.pop('conf')
+         super(RpcContext, self).__init__(**kwargs)
+ 
+@@ -182,13 +262,14 @@ class RpcContext(rpc_common.CommonRpcContext):
+         values = self.to_dict()
+         values['conf'] = self.conf
+         values['msg_id'] = self.msg_id
++        values['reply_q'] = self.reply_q
+         return self.__class__(**values)
+ 
+     def reply(self, reply=None, failure=None, ending=False,
+               connection_pool=None, log_failure=True):
+         if self.msg_id:
+-            msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
+-                      ending, log_failure)
++            msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
++                      reply, failure, ending, log_failure)
+             if ending:
+                 self.msg_id = None
+ 
+@@ -204,6 +285,7 @@ def unpack_context(conf, msg):
+             value = msg.pop(key)
+             context_dict[key[9:]] = value
+     context_dict['msg_id'] = msg.pop('_msg_id', None)
++    context_dict['reply_q'] = msg.pop('_reply_q', None)
+     context_dict['conf'] = conf
+     ctx = RpcContext.from_dict(context_dict)
+     rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
+@@ -224,15 +306,86 @@ def pack_context(msg, context):
+     msg.update(context_d)
+ 
+ 
+-class ProxyCallback(object):
+-    """Calls methods on a proxy object based on method and args."""
++class _MsgIdCache(object):
++    """This class checks any duplicate messages."""
+ 
+-    def __init__(self, conf, proxy, connection_pool):
+-        self.proxy = proxy
++    # NOTE: This value could be made a configuration option, but it
++    #       rarely needs to be changed in practice, so keep it static
++    #       for now.
++    DUP_MSG_CHECK_SIZE = 16
++
++    def __init__(self, **kwargs):
++        self.prev_msgids = collections.deque([],
++                                             maxlen=self.DUP_MSG_CHECK_SIZE)
++
++    def check_duplicate_message(self, message_data):
++        """AMQP consumers may read same message twice when exceptions occur
++           before ack is returned. This method prevents doing it.
++        """
++        if UNIQUE_ID in message_data:
++            msg_id = message_data[UNIQUE_ID]
++            if msg_id not in self.prev_msgids:
++                self.prev_msgids.append(msg_id)
++            else:
++                raise rpc_common.DuplicateMessageError(msg_id=msg_id)
++
++
++def _add_unique_id(msg):
++    """Add unique_id for checking duplicate messages."""
++    unique_id = uuid.uuid4().hex
++    msg.update({UNIQUE_ID: unique_id})
++    LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
++
++
++class _ThreadPoolWithWait(object):
++    """Base class for a delayed invocation manager used by
++    the Connection class to start up green threads
++    to handle incoming messages.
++    """
++
++    def __init__(self, conf, connection_pool):
+         self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
+         self.connection_pool = connection_pool
+         self.conf = conf
+ 
++    def wait(self):
++        """Wait for all callback threads to exit."""
++        self.pool.waitall()
++
++
++class CallbackWrapper(_ThreadPoolWithWait):
++    """Wraps a straight callback to allow it to be invoked in a green
++    thread.
++    """
++
++    def __init__(self, conf, callback, connection_pool):
++        """
++        :param conf: cfg.CONF instance
++        :param callback: a callable (probably a function)
++        :param connection_pool: connection pool as returned by
++                                get_connection_pool()
++        """
++        super(CallbackWrapper, self).__init__(
++            conf=conf,
++            connection_pool=connection_pool,
++        )
++        self.callback = callback
++
++    def __call__(self, message_data):
++        self.pool.spawn_n(self.callback, message_data)
++
++
++class ProxyCallback(_ThreadPoolWithWait):
++    """Calls methods on a proxy object based on method and args."""
++
++    def __init__(self, conf, proxy, connection_pool):
++        super(ProxyCallback, self).__init__(
++            conf=conf,
++            connection_pool=connection_pool,
++        )
++        self.proxy = proxy
++        self.msg_id_cache = _MsgIdCache()
++
+     def __call__(self, message_data):
+         """Consumer callback to call a method on a proxy object.
+ 
+@@ -251,6 +404,7 @@ class ProxyCallback(object):
+         if hasattr(local.store, 'context'):
+             del local.store.context
+         rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
++        self.msg_id_cache.check_duplicate_message(message_data)
+         ctxt = unpack_context(self.conf, message_data)
+         method = message_data.get('method')
+         args = message_data.get('args', {})
+@@ -289,15 +443,74 @@ class ProxyCallback(object):
+                        connection_pool=self.connection_pool,
+                        log_failure=False)
+         except Exception:
+-            LOG.exception(_('Exception during message handling'))
+-            ctxt.reply(None, sys.exc_info(),
+-                       connection_pool=self.connection_pool)
++            # sys.exc_info() is deleted by LOG.exception().
++            exc_info = sys.exc_info()
++            LOG.error(_('Exception during message handling'),
++                      exc_info=exc_info)
++            ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
++
++
++class MulticallProxyWaiter(object):
++    def __init__(self, conf, msg_id, timeout, connection_pool):
++        self._msg_id = msg_id
++        self._timeout = timeout or conf.rpc_response_timeout
++        self._reply_proxy = connection_pool.reply_proxy
++        self._done = False
++        self._got_ending = False
++        self._conf = conf
++        self._dataqueue = queue.LightQueue()
++        # Add this caller to the reply proxy's call_waiters
++        self._reply_proxy.add_call_waiter(self, self._msg_id)
++        self.msg_id_cache = _MsgIdCache()
+ 
+-    def wait(self):
+-        """Wait for all callback threads to exit."""
+-        self.pool.waitall()
++    def put(self, data):
++        self._dataqueue.put(data)
++
++    def done(self):
++        if self._done:
++            return
++        self._done = True
++        # Remove this caller from reply proxy's call_waiters
++        self._reply_proxy.del_call_waiter(self._msg_id)
+ 
++    def _process_data(self, data):
++        result = None
++        self.msg_id_cache.check_duplicate_message(data)
++        if data['failure']:
++            failure = data['failure']
++            result = rpc_common.deserialize_remote_exception(self._conf,
++                                                             failure)
++        elif data.get('ending', False):
++            self._got_ending = True
++        else:
++            result = data['result']
++        return result
+ 
++    def __iter__(self):
++        """Return a result until we get a reply with an 'ending" flag"""
++        if self._done:
++            raise StopIteration
++        while True:
++            try:
++                data = self._dataqueue.get(timeout=self._timeout)
++                result = self._process_data(data)
++            except queue.Empty:
++                LOG.exception(_('Timed out waiting for RPC response.'))
++                self.done()
++                raise rpc_common.Timeout()
++            except Exception:
++                with excutils.save_and_reraise_exception():
++                    self.done()
++            if self._got_ending:
++                self.done()
++                raise StopIteration
++            if isinstance(result, Exception):
++                self.done()
++                raise result
++            yield result
++
++
++#TODO(pekowski): Remove MulticallWaiter() in Havana.
+ class MulticallWaiter(object):
+     def __init__(self, conf, connection, timeout):
+         self._connection = connection
+@@ -307,6 +520,7 @@ class MulticallWaiter(object):
+         self._done = False
+         self._got_ending = False
+         self._conf = conf
++        self.msg_id_cache = _MsgIdCache()
+ 
+     def done(self):
+         if self._done:
+@@ -318,6 +532,7 @@ class MulticallWaiter(object):
+ 
+     def __call__(self, data):
+         """The consume() callback will call this.  Store the result."""
++        self.msg_id_cache.check_duplicate_message(data)
+         if data['failure']:
+             failure = data['failure']
+             self._result = rpc_common.deserialize_remote_exception(self._conf,
+@@ -353,22 +568,41 @@ def create_connection(conf, new, connection_pool):
+     return ConnectionContext(conf, connection_pool, pooled=not new)
+ 
+ 
++_reply_proxy_create_sem = semaphore.Semaphore()
++
++
+ def multicall(conf, context, topic, msg, timeout, connection_pool):
+     """Make a call that returns multiple times."""
++    # TODO(pekowski): Remove all these comments in Havana.
++    # For amqp_rpc_single_reply_queue = False,
+     # Can't use 'with' for multicall, as it returns an iterator
+     # that will continue to use the connection.  When it's done,
+     # connection.close() will get called which will put it back into
+     # the pool
++    # For amqp_rpc_single_reply_queue = True,
++    # The 'with' statement is mandatory for closing the connection
+     LOG.debug(_('Making synchronous call on %s ...'), topic)
+     msg_id = uuid.uuid4().hex
+     msg.update({'_msg_id': msg_id})
+     LOG.debug(_('MSG_ID is %s') % (msg_id))
++    _add_unique_id(msg)
+     pack_context(msg, context)
+ 
+-    conn = ConnectionContext(conf, connection_pool)
+-    wait_msg = MulticallWaiter(conf, conn, timeout)
+-    conn.declare_direct_consumer(msg_id, wait_msg)
+-    conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
++    # TODO(pekowski): Remove this flag and the code under the if clause
++    #                 in Havana.
++    if not conf.amqp_rpc_single_reply_queue:
++        conn = ConnectionContext(conf, connection_pool)
++        wait_msg = MulticallWaiter(conf, conn, timeout)
++        conn.declare_direct_consumer(msg_id, wait_msg)
++        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
++    else:
++        with _reply_proxy_create_sem:
++            if not connection_pool.reply_proxy:
++                connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
++        msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
++        wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
++        with ConnectionContext(conf, connection_pool) as conn:
++            conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+     return wait_msg
+ 
+ 
+@@ -385,6 +619,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
+ def cast(conf, context, topic, msg, connection_pool):
+     """Sends a message on a topic without waiting for a response."""
+     LOG.debug(_('Making asynchronous cast on %s...'), topic)
++    _add_unique_id(msg)
+     pack_context(msg, context)
+     with ConnectionContext(conf, connection_pool) as conn:
+         conn.topic_send(topic, rpc_common.serialize_msg(msg))
+@@ -393,6 +628,7 @@ def cast(conf, context, topic, msg, connection_pool):
+ def fanout_cast(conf, context, topic, msg, connection_pool):
+     """Sends a message on a fanout exchange without waiting for a response."""
+     LOG.debug(_('Making asynchronous fanout cast...'))
++    _add_unique_id(msg)
+     pack_context(msg, context)
+     with ConnectionContext(conf, connection_pool) as conn:
+         conn.fanout_send(topic, rpc_common.serialize_msg(msg))
+@@ -400,6 +636,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
+ 
+ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
+     """Sends a message on a topic to a specific server."""
++    _add_unique_id(msg)
+     pack_context(msg, context)
+     with ConnectionContext(conf, connection_pool, pooled=False,
+                            server_params=server_params) as conn:
+@@ -409,6 +646,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
+ def fanout_cast_to_server(conf, context, server_params, topic, msg,
+                           connection_pool):
+     """Sends a message on a fanout exchange to a specific server."""
++    _add_unique_id(msg)
+     pack_context(msg, context)
+     with ConnectionContext(conf, connection_pool, pooled=False,
+                            server_params=server_params) as conn:
+@@ -420,6 +658,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
+     LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+               dict(event_type=msg.get('event_type'),
+                    topic=topic))
++    _add_unique_id(msg)
+     pack_context(msg, context)
+     with ConnectionContext(conf, connection_pool) as conn:
+         if envelope:
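
The duplicate-detection scheme added to amqp.py above is small enough to
distill on its own. A standalone sketch of the same idea; the names mirror
the patch, but the exception type is simplified:

    import collections
    import uuid

    UNIQUE_ID = '_unique_id'
    DUP_MSG_CHECK_SIZE = 16

    prev_msgids = collections.deque([], maxlen=DUP_MSG_CHECK_SIZE)

    def add_unique_id(msg):
        # Sender side: tag every outgoing message with a random id.
        msg[UNIQUE_ID] = uuid.uuid4().hex

    def check_duplicate_message(message_data):
        # Consumer side: a redelivered message carries the same id.
        msg_id = message_data.get(UNIQUE_ID)
        if msg_id is None:
            return  # pre-2013.1 sender, nothing to check
        if msg_id in prev_msgids:
            raise ValueError('duplicate message %s' % msg_id)
        prev_msgids.append(msg_id)

    msg = {'method': 'echo'}
    add_unique_id(msg)
    check_duplicate_message(msg)  # first delivery passes
    # a redelivery of the same message would now raise
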
+diff --git a/heat/openstack/common/rpc/common.py b/heat/openstack/common/rpc/common.py
+index ed5a9d2..3a59670 100644
+--- a/heat/openstack/common/rpc/common.py
++++ b/heat/openstack/common/rpc/common.py
+@@ -49,8 +49,8 @@ deserialize_msg().
+ The current message format (version 2.0) is very simple.  It is:
+ 
+     {
+-        'heat.version': <RPC Envelope Version as a String>,
+-        'heat.message': <Application Message Payload, JSON encoded>
++        'oslo.version': <RPC Envelope Version as a String>,
++        'oslo.message': <Application Message Payload, JSON encoded>
+     }
+ 
+ Message format version '1.0' is just considered to be the messages we sent
+@@ -66,8 +66,8 @@ to the messaging libraries as a dict.
+ '''
+ _RPC_ENVELOPE_VERSION = '2.0'
+ 
+-_VERSION_KEY = 'heat.version'
+-_MESSAGE_KEY = 'heat.message'
++_VERSION_KEY = 'oslo.version'
++_MESSAGE_KEY = 'oslo.message'
+ 
+ 
+ # TODO(russellb) Turn this on after Grizzly.
+@@ -125,6 +125,10 @@ class Timeout(RPCException):
+     message = _("Timeout while waiting on RPC response.")
+ 
+ 
++class DuplicateMessageError(RPCException):
++    message = _("Found duplicate message(%(msg_id)s). Skipping it.")
++
++
+ class InvalidRPCConnectionReuse(RPCException):
+     message = _("Invalid reuse of an RPC connection.")
+ 
+@@ -197,6 +201,28 @@ class Connection(object):
+         """
+         raise NotImplementedError()
+ 
++    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
++        """Register as a member of a group of consumers for a given topic from
++        the specified exchange.
++
++        Exactly one member of a given pool will receive each message.
++
++        A message will be delivered to multiple pools, if more than
++        one is created.
++
++        :param callback: Callable to be invoked for each message.
++        :type callback: callable accepting one argument
++        :param pool_name: The name of the consumer pool.
++        :type pool_name: str
++        :param topic: The routing topic for desired messages.
++        :type topic: str
++        :param exchange_name: The name of the message exchange where
++                              the client should attach. Defaults to
++                              the configured exchange.
++        :type exchange_name: str
++        """
++        raise NotImplementedError()
++
+     def consume_in_thread(self):
+         """Spawn a thread to handle incoming messages.
+ 
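
The envelope change above is only a key rename, but it is what determines
wire compatibility with other oslo-based services. A sketch of the version
2.0 round-trip consistent with the constants above, using json directly and
omitting the version check the real deserialize_msg() performs:

    import json

    _RPC_ENVELOPE_VERSION = '2.0'
    _VERSION_KEY = 'oslo.version'
    _MESSAGE_KEY = 'oslo.message'

    def serialize_msg(raw_msg):
        return {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
                _MESSAGE_KEY: json.dumps(raw_msg)}

    def deserialize_msg(msg):
        # A message missing either key is treated as unenveloped (the
        # 1.0 format) and passed through untouched.
        if (not isinstance(msg, dict) or _VERSION_KEY not in msg
                or _MESSAGE_KEY not in msg):
            return msg
        return json.loads(msg[_MESSAGE_KEY])

    assert deserialize_msg(serialize_msg({'a': 1})) == {'a': 1}
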
+diff --git a/heat/openstack/common/rpc/impl_fake.py b/heat/openstack/common/rpc/impl_fake.py
+index 6c2e130..94597b4 100644
+--- a/heat/openstack/common/rpc/impl_fake.py
++++ b/heat/openstack/common/rpc/impl_fake.py
+@@ -1,6 +1,6 @@
+ # vim: tabstop=4 shiftwidth=4 softtabstop=4
+ 
+-#    Copyright 2011 OpenStack LLC
++#    Copyright 2011 OpenStack Foundation
+ #
+ #    Licensed under the Apache License, Version 2.0 (the "License"); you may
+ #    not use this file except in compliance with the License. You may obtain
+diff --git a/heat/openstack/common/rpc/impl_kombu.py b/heat/openstack/common/rpc/impl_kombu.py
+index 360b78b..886fbcf 100644
+--- a/heat/openstack/common/rpc/impl_kombu.py
++++ b/heat/openstack/common/rpc/impl_kombu.py
+@@ -1,6 +1,6 @@
+ # vim: tabstop=4 shiftwidth=4 softtabstop=4
+ 
+-#    Copyright 2011 OpenStack LLC
++#    Copyright 2011 OpenStack Foundation
+ #
+ #    Licensed under the Apache License, Version 2.0 (the "License"); you may
+ #    not use this file except in compliance with the License. You may obtain
+@@ -66,7 +66,8 @@ kombu_opts = [
+                help='the RabbitMQ userid'),
+     cfg.StrOpt('rabbit_password',
+                default='guest',
+-               help='the RabbitMQ password'),
++               help='the RabbitMQ password',
++               secret=True),
+     cfg.StrOpt('rabbit_virtual_host',
+                default='/',
+                help='the RabbitMQ virtual host'),
+@@ -164,9 +165,10 @@ class ConsumerBase(object):
+             try:
+                 msg = rpc_common.deserialize_msg(message.payload)
+                 callback(msg)
+-                message.ack()
+             except Exception:
+                 LOG.exception(_("Failed to process message... skipping it."))
++            finally:
++                message.ack()
+ 
+         self.queue.consume(*args, callback=_callback, **options)
+ 
+@@ -196,6 +198,7 @@ class DirectConsumer(ConsumerBase):
+         """
+         # Default options
+         options = {'durable': False,
++                   'queue_arguments': _get_queue_arguments(conf),
+                    'auto_delete': True,
+                    'exclusive': False}
+         options.update(kwargs)
+@@ -621,8 +624,8 @@ class Connection(object):
+ 
+         def _error_callback(exc):
+             if isinstance(exc, socket.timeout):
+-                LOG.exception(_('Timed out waiting for RPC response: %s') %
+-                              str(exc))
++                LOG.debug(_('Timed out waiting for RPC response: %s') %
++                          str(exc))
+                 raise rpc_common.Timeout()
+             else:
+                 LOG.exception(_('Failed to consume message from queue: %s') %
+@@ -749,6 +752,30 @@ class Connection(object):
+         self.proxy_callbacks.append(proxy_cb)
+         self.declare_topic_consumer(topic, proxy_cb, pool_name)
+ 
++    def join_consumer_pool(self, callback, pool_name, topic,
++                           exchange_name=None):
++        """Register as a member of a group of consumers for a given topic from
++        the specified exchange.
++
++        Exactly one member of a given pool will receive each message.
++
++        A message will be delivered to multiple pools, if more than
++        one is created.
++        """
++        callback_wrapper = rpc_amqp.CallbackWrapper(
++            conf=self.conf,
++            callback=callback,
++            connection_pool=rpc_amqp.get_connection_pool(self.conf,
++                                                         Connection),
++        )
++        self.proxy_callbacks.append(callback_wrapper)
++        self.declare_topic_consumer(
++            queue_name=pool_name,
++            topic=topic,
++            exchange_name=exchange_name,
++            callback=callback_wrapper,
++        )
++
+ 
+ def create_connection(conf, new=True):
+     """Create a connection"""
+diff --git a/heat/openstack/common/rpc/impl_qpid.py b/heat/openstack/common/rpc/impl_qpid.py
+index 3acfe69..88c531a 100644
+--- a/heat/openstack/common/rpc/impl_qpid.py
++++ b/heat/openstack/common/rpc/impl_qpid.py
+@@ -1,6 +1,6 @@
+ # vim: tabstop=4 shiftwidth=4 softtabstop=4
+ 
+-#    Copyright 2011 OpenStack LLC
++#    Copyright 2011 OpenStack Foundation
+ #    Copyright 2011 - 2012, Red Hat, Inc.
+ #
+ #    Licensed under the Apache License, Version 2.0 (the "License"); you may
+@@ -40,8 +40,8 @@ qpid_opts = [
+     cfg.StrOpt('qpid_hostname',
+                default='localhost',
+                help='Qpid broker hostname'),
+-    cfg.StrOpt('qpid_port',
+-               default='5672',
++    cfg.IntOpt('qpid_port',
++               default=5672,
+                help='Qpid broker port'),
+     cfg.ListOpt('qpid_hosts',
+                 default=['$qpid_hostname:$qpid_port'],
+@@ -51,7 +51,8 @@ qpid_opts = [
+                help='Username for qpid connection'),
+     cfg.StrOpt('qpid_password',
+                default='',
+-               help='Password for qpid connection'),
++               help='Password for qpid connection',
++               secret=True),
+     cfg.StrOpt('qpid_sasl_mechanisms',
+                default='',
+                help='Space separated list of SASL mechanisms to use for auth'),
+@@ -68,6 +69,8 @@ qpid_opts = [
+ 
+ cfg.CONF.register_opts(qpid_opts)
+ 
++JSON_CONTENT_TYPE = 'application/json; charset=utf8'
++
+ 
+ class ConsumerBase(object):
+     """Consumer base class."""
+@@ -122,10 +125,27 @@ class ConsumerBase(object):
+         self.receiver = session.receiver(self.address)
+         self.receiver.capacity = 1
+ 
++    def _unpack_json_msg(self, msg):
++        """Load the JSON data in msg if msg.content_type indicates that it
++           is necessary.  Put the loaded data back into msg.content and
++           update msg.content_type appropriately.
++
++        A Qpid Message containing a dict will have a content_type of
++        'amqp/map', whereas one containing a string that needs to be converted
++        back from JSON will have a content_type of JSON_CONTENT_TYPE.
++
++        :param msg: a Qpid Message object
++        :returns: None
++        """
++        if msg.content_type == JSON_CONTENT_TYPE:
++            msg.content = jsonutils.loads(msg.content)
++            msg.content_type = 'amqp/map'
++
+     def consume(self):
+         """Fetch the message and pass it to the callback object"""
+         message = self.receiver.fetch()
+         try:
++            self._unpack_json_msg(message)
+             msg = rpc_common.deserialize_msg(message.content)
+             self.callback(msg)
+         except Exception:
+@@ -330,15 +350,16 @@ class Connection(object):
+ 
+     def reconnect(self):
+         """Handles reconnecting and re-establishing sessions and queues"""
+-        if self.connection.opened():
+-            try:
+-                self.connection.close()
+-            except qpid_exceptions.ConnectionError:
+-                pass
+-
+         attempt = 0
+         delay = 1
+         while True:
++            # Close the session if necessary
++            if self.connection.opened():
++                try:
++                    self.connection.close()
++                except qpid_exceptions.ConnectionError:
++                    pass
++
+             broker = self.brokers[attempt % len(self.brokers)]
+             attempt += 1
+ 
+@@ -414,8 +435,8 @@ class Connection(object):
+ 
+         def _error_callback(exc):
+             if isinstance(exc, qpid_exceptions.Empty):
+-                LOG.exception(_('Timed out waiting for RPC response: %s') %
+-                              str(exc))
++                LOG.debug(_('Timed out waiting for RPC response: %s') %
++                          str(exc))
+                 raise rpc_common.Timeout()
+             else:
+                 LOG.exception(_('Failed to consume message from queue: %s') %
+@@ -559,6 +580,34 @@ class Connection(object):
+ 
+         return consumer
+ 
++    def join_consumer_pool(self, callback, pool_name, topic,
++                           exchange_name=None):
++        """Register as a member of a group of consumers for a given topic from
++        the specified exchange.
++
++        Exactly one member of a given pool will receive each message.
++
++        A message will be delivered to multiple pools, if more than
++        one is created.
++        """
++        callback_wrapper = rpc_amqp.CallbackWrapper(
++            conf=self.conf,
++            callback=callback,
++            connection_pool=rpc_amqp.get_connection_pool(self.conf,
++                                                         Connection),
++        )
++        self.proxy_callbacks.append(callback_wrapper)
++
++        consumer = TopicConsumer(conf=self.conf,
++                                 session=self.session,
++                                 topic=topic,
++                                 callback=callback_wrapper,
++                                 name=pool_name,
++                                 exchange_name=exchange_name)
++
++        self._register_consumer(consumer)
++        return consumer
++
+ 
+ def create_connection(conf, new=True):
+     """Create a connection"""
+diff --git a/heat/openstack/common/rpc/impl_zmq.py b/heat/openstack/common/rpc/impl_zmq.py
+index b41c53d..316d92b 100644
+--- a/heat/openstack/common/rpc/impl_zmq.py
++++ b/heat/openstack/common/rpc/impl_zmq.py
+@@ -16,6 +16,7 @@
+ 
+ import os
+ import pprint
++import re
+ import socket
+ import sys
+ import types
+@@ -25,6 +26,7 @@ import eventlet
+ import greenlet
+ from oslo.config import cfg
+ 
++from heat.openstack.common import excutils
+ from heat.openstack.common.gettextutils import _
+ from heat.openstack.common import importutils
+ from heat.openstack.common import jsonutils
+@@ -89,10 +91,10 @@ def _serialize(data):
+     Error if a developer passes us bad data.
+     """
+     try:
+-        return str(jsonutils.dumps(data, ensure_ascii=True))
++        return jsonutils.dumps(data, ensure_ascii=True)
+     except TypeError:
+-        LOG.error(_("JSON serialization failed."))
+-        raise
++        with excutils.save_and_reraise_exception():
++            LOG.error(_("JSON serialization failed."))
+ 
+ 
+ def _deserialize(data):
+@@ -216,11 +218,18 @@ class ZmqClient(object):
+             socket_type = zmq.PUSH
+         self.outq = ZmqSocket(addr, socket_type, bind=bind)
+ 
+-    def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+-        if serialize:
+-            data = rpc_common.serialize_msg(data, force_envelope)
+-        self.outq.send([str(msg_id), str(topic), str('cast'),
+-                        _serialize(data)])
++    def cast(self, msg_id, topic, data, envelope=False):
++        msg_id = msg_id or 0
++
++        if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
++            self.outq.send(map(bytes,
++                           (msg_id, topic, 'cast', _serialize(data))))
++            return
++
++        rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
++        zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
++        self.outq.send(map(bytes,
++                       (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
+ 
+     def close(self):
+         self.outq.close()
+@@ -294,13 +303,13 @@ class InternalContext(object):
+             ctx.replies)
+ 
+         LOG.debug(_("Sending reply"))
+-        cast(CONF, ctx, topic, {
++        _multi_send(_cast, ctx, topic, {
+             'method': '-process_reply',
+             'args': {
+-                'msg_id': msg_id,
++                'msg_id': msg_id,  # Include for Folsom compat.
+                 'response': response
+             }
+-        })
++        }, _msg_id=msg_id)
+ 
+ 
+ class ConsumerBase(object):
+@@ -319,7 +328,7 @@ class ConsumerBase(object):
+         else:
+             return [result]
+ 
+-    def process(self, style, target, proxy, ctx, data):
++    def process(self, proxy, ctx, data):
+         data.setdefault('version', None)
+         data.setdefault('args', {})
+ 
+@@ -423,6 +432,8 @@ class ZmqProxy(ZmqBaseReactor):
+ 
+     def __init__(self, conf):
+         super(ZmqProxy, self).__init__(conf)
++        pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
++        self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
+ 
+         self.topic_proxy = {}
+ 
+@@ -431,21 +442,15 @@ class ZmqProxy(ZmqBaseReactor):
+ 
+         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+         data = sock.recv()
+-        msg_id, topic, style, in_msg = data
+-        topic = topic.split('.', 1)[0]
++        topic = data[1]
+ 
+         LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+ 
+-        # Handle zmq_replies magic
+         if topic.startswith('fanout~'):
+             sock_type = zmq.PUB
++            topic = topic.split('.', 1)[0]
+         elif topic.startswith('zmq_replies'):
+             sock_type = zmq.PUB
+-            inside = rpc_common.deserialize_msg(_deserialize(in_msg))
+-            msg_id = inside[-1]['args']['msg_id']
+-            response = inside[-1]['args']['response']
+-            LOG.debug(_("->response->%s"), response)
+-            data = [str(msg_id), _serialize(response)]
+         else:
+             sock_type = zmq.PUSH
+ 
+@@ -454,6 +459,13 @@ class ZmqProxy(ZmqBaseReactor):
+                 LOG.info(_("Creating proxy for topic: %s"), topic)
+ 
+                 try:
++                    # The topic is received over the network,
++                    # don't trust this input.
++                    if self.badchars.search(topic) is not None:
++                        emsg = _("Topic contained dangerous characters.")
++                        LOG.warn(emsg)
++                        raise RPCException(emsg)
++
+                     out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+                                          (ipc_dir, topic),
+                                          sock_type, bind=True)
+@@ -510,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor):
+                               ipc_dir, run_as_root=True)
+                 utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+             except utils.ProcessExecutionError:
+-                LOG.error(_("Could not create IPC directory %s") %
+-                          (ipc_dir, ))
+-                raise
++                with excutils.save_and_reraise_exception():
++                    LOG.error(_("Could not create IPC directory %s") %
++                              (ipc_dir, ))
+ 
+         try:
+             self.register(consumption_proxy,
+@@ -520,13 +532,28 @@ class ZmqProxy(ZmqBaseReactor):
+                           zmq.PULL,
+                           out_bind=True)
+         except zmq.ZMQError:
+-            LOG.error(_("Could not create ZeroMQ receiver daemon. "
+-                        "Socket may already be in use."))
+-            raise
++            with excutils.save_and_reraise_exception():
++                LOG.error(_("Could not create ZeroMQ receiver daemon. "
++                            "Socket may already be in use."))
+ 
+         super(ZmqProxy, self).consume_in_thread()
+ 
+ 
++def unflatten_envelope(packenv):
++    """Unflattens the RPC envelope.
++       Takes a list and returns a dictionary.
++       e.g. [1, 2, 3, 4] => {1: 2, 3: 4}
++    """
++    i = iter(packenv)
++    h = {}
++    try:
++        while True:
++            k = i.next()
++            h[k] = i.next()
++    except StopIteration:
++        return h
++
++
+ class ZmqReactor(ZmqBaseReactor):
+     """
+     A consumer class implementing a
+@@ -547,38 +574,53 @@ class ZmqReactor(ZmqBaseReactor):
+             self.mapping[sock].send(data)
+             return
+ 
+-        msg_id, topic, style, in_msg = data
++        proxy = self.proxies[sock]
+ 
+-        ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
+-        ctx = RpcContext.unmarshal(ctx)
++        if data[2] == 'cast':  # Legacy protocol
++            packenv = data[3]
+ 
+-        proxy = self.proxies[sock]
++            ctx, msg = _deserialize(packenv)
++            request = rpc_common.deserialize_msg(msg)
++            ctx = RpcContext.unmarshal(ctx)
++        elif data[2] == 'impl_zmq_v2':
++            packenv = data[4:]
+ 
+-        self.pool.spawn_n(self.process, style, topic,
+-                          proxy, ctx, request)
++            msg = unflatten_envelope(packenv)
++            request = rpc_common.deserialize_msg(msg)
++
++            # Unmarshal only after verifying the message.
++            ctx = RpcContext.unmarshal(data[3])
++        else:
++            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
++            return
++
++        self.pool.spawn_n(self.process, proxy, ctx, request)
+ 
+ 
+ class Connection(rpc_common.Connection):
+     """Manages connections and threads."""
+ 
+     def __init__(self, conf):
++        self.topics = []
+         self.reactor = ZmqReactor(conf)
+ 
+     def create_consumer(self, topic, proxy, fanout=False):
+-        # Only consume on the base topic name.
+-        topic = topic.split('.', 1)[0]
+-
+-        LOG.info(_("Create Consumer for topic (%(topic)s)") %
+-                 {'topic': topic})
++        # Register with matchmaker.
++        _get_matchmaker().register(topic, CONF.rpc_zmq_host)
+ 
+         # Subscription scenarios
+         if fanout:
+-            subscribe = ('', fanout)[type(fanout) == str]
+             sock_type = zmq.SUB
+-            topic = 'fanout~' + topic
++            subscribe = ('', fanout)[type(fanout) == str]
++            topic = 'fanout~' + topic.split('.', 1)[0]
+         else:
+             sock_type = zmq.PULL
+             subscribe = None
++            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
++
++        if topic in self.topics:
++            LOG.info(_("Skipping topic registration. Already registered."))
++            return
+ 
+         # Receive messages from (local) proxy
+         inaddr = "ipc://%s/zmq_topic_%s" % \
+@@ -589,19 +631,26 @@ class Connection(rpc_common.Connection):
+ 
+         self.reactor.register(proxy, inaddr, sock_type,
+                               subscribe=subscribe, in_bind=False)
++        self.topics.append(topic)
+ 
+     def close(self):
++        _get_matchmaker().stop_heartbeat()
++        for topic in self.topics:
++            _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
++
+         self.reactor.close()
++        self.topics = []
+ 
+     def wait(self):
+         self.reactor.wait()
+ 
+     def consume_in_thread(self):
++        _get_matchmaker().start_heartbeat()
+         self.reactor.consume_in_thread()
+ 
+ 
+-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+-          force_envelope=False):
++def _cast(addr, context, topic, msg, timeout=None, envelope=False,
++          _msg_id=None):
+     timeout_cast = timeout or CONF.rpc_cast_timeout
+     payload = [RpcContext.marshal(context), msg]
+ 
+@@ -610,7 +659,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+             conn = ZmqClient(addr)
+ 
+             # assumes cast can't return an exception
+-            conn.cast(msg_id, topic, payload, serialize, force_envelope)
++            conn.cast(_msg_id, topic, payload, envelope)
+         except zmq.ZMQError:
+             raise RPCException("Cast failed. ZMQ Socket Exception")
+         finally:
+@@ -618,8 +667,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+                 conn.close()
+ 
+ 
+-def _call(addr, context, msg_id, topic, msg, timeout=None,
+-          serialize=True, force_envelope=False):
++def _call(addr, context, topic, msg, timeout=None,
++          envelope=False):
+     # timeout_response is how long we wait for a response
+     timeout = timeout or CONF.rpc_response_timeout
+ 
+@@ -649,23 +698,36 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
+     with Timeout(timeout, exception=rpc_common.Timeout):
+         try:
+             msg_waiter = ZmqSocket(
+-                "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
++                "ipc://%s/zmq_topic_zmq_replies.%s" %
++                (CONF.rpc_zmq_ipc_dir,
++                 CONF.rpc_zmq_host),
+                 zmq.SUB, subscribe=msg_id, bind=False
+             )
+ 
+             LOG.debug(_("Sending cast"))
+-            _cast(addr, context, msg_id, topic, payload,
+-                  serialize=serialize, force_envelope=force_envelope)
++            _cast(addr, context, topic, payload, envelope)
+ 
+             LOG.debug(_("Cast sent; Waiting reply"))
+             # Blocks until receives reply
+             msg = msg_waiter.recv()
+             LOG.debug(_("Received message: %s"), msg)
+             LOG.debug(_("Unpacking response"))
+-            responses = _deserialize(msg[-1])
++
++            if msg[2] == 'cast':  # Legacy version
++                raw_msg = _deserialize(msg[-1])[-1]
++            elif msg[2] == 'impl_zmq_v2':
++                rpc_envelope = unflatten_envelope(msg[4:])
++                raw_msg = rpc_common.deserialize_msg(rpc_envelope)
++            else:
++                raise rpc_common.UnsupportedRpcEnvelopeVersion(
++                    _("Unsupported or unknown ZMQ envelope returned."))
++
++            responses = raw_msg['args']['response']
+         # ZMQError trumps the Timeout error.
+         except zmq.ZMQError:
+             raise RPCException("ZMQ Socket Error")
++        except (IndexError, KeyError):
++            raise RPCException(_("RPC Message Invalid."))
+         finally:
+             if 'msg_waiter' in vars():
+                 msg_waiter.close()
+@@ -681,8 +743,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
+     return responses[-1]
+ 
+ 
+-def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+-                force_envelope=False):
++def _multi_send(method, context, topic, msg, timeout=None,
++                envelope=False, _msg_id=None):
+     """
+     Wraps the sending of messages,
+     dispatches to the matchmaker and sends
+@@ -699,7 +761,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+         LOG.warn(_("No matchmaker results. Not casting."))
+         # While not strictly a timeout, callers know how to handle
+         # this exception and a timeout isn't too big a lie.
+-        raise rpc_common.Timeout, "No match from matchmaker."
++        raise rpc_common.Timeout(_("No match from matchmaker."))
+ 
+     # This supports brokerless fanout (addresses > 1)
+     for queue in queues:
+@@ -708,11 +770,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+ 
+         if method.__name__ == '_cast':
+             eventlet.spawn_n(method, _addr, context,
+-                             _topic, _topic, msg, timeout, serialize,
+-                             force_envelope)
++                             _topic, msg, timeout, envelope,
++                             _msg_id)
+             return
+-        return method(_addr, context, _topic, _topic, msg, timeout,
+-                      serialize, force_envelope)
++        return method(_addr, context, _topic, msg, timeout,
++                      envelope)
+ 
+ 
+ def create_connection(conf, new=True):
+@@ -742,7 +804,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
+     _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+ 
+ 
+-def notify(conf, context, topic, msg, **kwargs):
++def notify(conf, context, topic, msg, envelope):
+     """
+     Send notification event.
+     Notifications are sent to topic-priority.
+@@ -750,10 +812,8 @@ def notify(conf, context, topic, msg, **kwargs):
+     """
+     # NOTE(ewindisch): dot-priority in rpc notifier does not
+     # work with our assumptions.
+-    topic.replace('.', '-')
+-    kwargs['serialize'] = kwargs.pop('envelope')
+-    kwargs['force_envelope'] = True
+-    cast(conf, context, topic, msg, **kwargs)
++    topic = topic.replace('.', '-')
++    cast(conf, context, topic, msg, envelope=envelope)
+ 
+ 
+ def cleanup():
+@@ -777,8 +837,9 @@ def _get_ctxt():
+     return ZMQ_CTX
+ 
+ 
+-def _get_matchmaker():
++def _get_matchmaker(*args, **kwargs):
+     global matchmaker
+     if not matchmaker:
+-        matchmaker = importutils.import_object(CONF.rpc_zmq_matchmaker)
++        matchmaker = importutils.import_object(
++            CONF.rpc_zmq_matchmaker, *args, **kwargs)
+     return matchmaker
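
The new 'impl_zmq_v2' framing flattens the envelope dict into alternating
key/value message frames; unflatten_envelope() rebuilds it on the receiving
side. A Python 2 sketch of the round-trip (reduce is a builtin there,
matching ZmqClient.cast above):

    envelope = {'oslo.version': '2.0', 'oslo.message': '{"a": 1}'}

    # Sender side, as in ZmqClient.cast(): dict -> flat tuple of frames.
    flat = reduce(lambda x, y: x + y, envelope.items())

    # Receiver side: flat sequence -> dict again; equivalent to the
    # patch's unflatten_envelope(), written with zip for brevity.
    def unflatten_envelope(packenv):
        i = iter(packenv)
        return dict(zip(i, i))

    assert unflatten_envelope(flat) == envelope
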
+diff --git a/heat/openstack/common/rpc/matchmaker.py b/heat/openstack/common/rpc/matchmaker.py
+index 88e540c..2d1df17 100644
+--- a/heat/openstack/common/rpc/matchmaker.py
++++ b/heat/openstack/common/rpc/matchmaker.py
+@@ -22,6 +22,7 @@ import contextlib
+ import itertools
+ import json
+ 
++import eventlet
+ from oslo.config import cfg
+ 
+ from heat.openstack.common.gettextutils import _
+@@ -33,6 +34,12 @@ matchmaker_opts = [
+     cfg.StrOpt('matchmaker_ringfile',
+                default='/etc/nova/matchmaker_ring.json',
+                help='Matchmaker ring file (JSON)'),
++    cfg.IntOpt('matchmaker_heartbeat_freq',
++               default=300,
++               help='Heartbeat frequency'),
++    cfg.IntOpt('matchmaker_heartbeat_ttl',
++               default=600,
++               help='Heartbeat time-to-live.'),
+ ]
+ 
+ CONF = cfg.CONF
+@@ -70,12 +77,73 @@ class Binding(object):
+ 
+ 
+ class MatchMakerBase(object):
+-    """Match Maker Base Class."""
+-
++    """
++    Match Maker Base Class.
++    Build off HeartbeatMatchMakerBase if building a
++    heartbeat-capable MatchMaker.
++    """
+     def __init__(self):
+         # Array of tuples. Index [2] toggles negation, [3] is last-if-true
+         self.bindings = []
+ 
++        self.no_heartbeat_msg = _('Matchmaker does not implement '
++                                  'registration or heartbeat.')
++
++    def register(self, key, host):
++        """
++        Register a host on a backend.
++        Heartbeats, if applicable, keep the registration alive.
++        """
++        pass
++
++    def ack_alive(self, key, host):
++        """
++        Acknowledge that a key.host is alive.
++        Used internally for updating heartbeats,
++        but may also be used publicly to acknowledge
++        that a system is alive (i.e. an rpc message was
++        successfully sent to the host).
++        """
++        pass
++
++    def is_alive(self, topic, host):
++        """
++        Checks if a host is alive.
++        """
++        pass
++
++    def expire(self, topic, host):
++        """
++        Explicitly expire a host's registration.
++        """
++        pass
++
++    def send_heartbeats(self):
++        """
++        Send all heartbeats.
++        Use start_heartbeat to spawn a heartbeat greenthread,
++        which loops this method.
++        """
++        pass
++
++    def unregister(self, key, host):
++        """
++        Unregister a topic.
++        """
++        pass
++
++    def start_heartbeat(self):
++        """
++        Spawn heartbeat greenthread.
++        """
++        pass
++
++    def stop_heartbeat(self):
++        """
++        Destroys the heartbeat greenthread.
++        """
++        pass
++
+     def add_binding(self, binding, rule, last=True):
+         self.bindings.append((binding, rule, False, last))
+ 
+@@ -99,6 +167,103 @@ class MatchMakerBase(object):
+         return workers
+ 
+ 
++class HeartbeatMatchMakerBase(MatchMakerBase):
++    """
++    Base for a heart-beat capable MatchMaker.
++    Provides common methods for registering,
++    unregistering, and maintaining heartbeats.
++    """
++    def __init__(self):
++        self.hosts = set()
++        self._heart = None
++        self.host_topic = {}
++
++        super(HeartbeatMatchMakerBase, self).__init__()
++
++    def send_heartbeats(self):
++        """
++        Send all heartbeats.
++        Use start_heartbeat to spawn a heartbeat greenthread,
++        which loops this method.
++        """
++        for key, host in self.host_topic:
++            self.ack_alive(key, host)
++
++    def ack_alive(self, key, host):
++        """
++        Acknowledge that a key.host is alive.
++        Used internally for updating heartbeats,
++        but may also be used publicly to acknowledge
++        a system is alive (i.e., an RPC message was
++        successfully sent to the host).
++        """
++        raise NotImplementedError("Must implement ack_alive")
++
++    def backend_register(self, key, host):
++        """
++        Implements registration logic.
++        Called by register(self, key, host).
++        """
++        raise NotImplementedError("Must implement backend_register")
++
++    def backend_unregister(self, key, key_host):
++        """
++        Implements de-registration logic.
++        Called by unregister(self, key, host).
++        """
++        raise NotImplementedError("Must implement backend_unregister")
++
++    def register(self, key, host):
++        """
++        Register a host on a backend.
++        Heartbeats, if applicable, keep the registration alive.
++        """
++        self.hosts.add(host)
++        self.host_topic[(key, host)] = host
++        key_host = '.'.join((key, host))
++
++        self.backend_register(key, key_host)
++
++        self.ack_alive(key, host)
++
++    def unregister(self, key, host):
++        """
++        Unregister a topic.
++        """
++        if (key, host) in self.host_topic:
++            del self.host_topic[(key, host)]
++
++        self.hosts.discard(host)
++        self.backend_unregister(key, '.'.join((key, host)))
++
++        LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
++
++    def start_heartbeat(self):
++        """
++        Implementation of MatchMakerBase.start_heartbeat.
++        Launches a greenthread looping send_heartbeats(),
++        yielding for CONF.matchmaker_heartbeat_freq seconds
++        between iterations.
++        """
++        if len(self.hosts) == 0:
++            raise MatchMakerException(
++                _("Register before starting heartbeat."))
++
++        def do_heartbeat():
++            while True:
++                self.send_heartbeats()
++                eventlet.sleep(CONF.matchmaker_heartbeat_freq)
++
++        self._heart = eventlet.spawn(do_heartbeat)
++
++    def stop_heartbeat(self):
++        """
++        Destroys the heartbeat greenthread.
++        """
++        if self._heart:
++            self._heart.kill()
++
++
+ class DirectBinding(Binding):
+     """
+     Specifies a host in the key via a '.' character
+@@ -202,24 +367,25 @@ class FanoutRingExchange(RingExchange):
+ 
+ class LocalhostExchange(Exchange):
+     """Exchange where all direct topics are local."""
+-    def __init__(self):
++    def __init__(self, host='localhost'):
++        self.host = host
+         super(Exchange, self).__init__()
+ 
+     def run(self, key):
+-        return [(key.split('.')[0] + '.localhost', 'localhost')]
++        return [('.'.join((key.split('.')[0], self.host)), self.host)]
+ 
+ 
+ class DirectExchange(Exchange):
+     """
+     Exchange where all topic keys are split, sending to second half.
+-    i.e. "compute.host" sends a message to "compute" running on "host"
++    i.e. "compute.host" sends a message to "compute.host" running on "host"
+     """
+     def __init__(self):
+         super(Exchange, self).__init__()
+ 
+     def run(self, key):
+-        b, e = key.split('.', 1)
+-        return [(b, e)]
++        e = key.split('.', 1)[1]
++        return [(key, e)]
+ 
+ 
+ class MatchMakerRing(MatchMakerBase):
+@@ -238,11 +404,11 @@ class MatchMakerLocalhost(MatchMakerBase):
+     Match Maker where all bare topics resolve to localhost.
+     Useful for testing.
+     """
+-    def __init__(self):
++    def __init__(self, host='localhost'):
+         super(MatchMakerLocalhost, self).__init__()
+-        self.add_binding(FanoutBinding(), LocalhostExchange())
++        self.add_binding(FanoutBinding(), LocalhostExchange(host))
+         self.add_binding(DirectBinding(), DirectExchange())
+-        self.add_binding(TopicBinding(), LocalhostExchange())
++        self.add_binding(TopicBinding(), LocalhostExchange(host))
+ 
+ 
+ class MatchMakerStub(MatchMakerBase):
+-- 
+1.8.1.4
+
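The new HeartbeatMatchMakerBase above only defines the contract: concrete
matchmakers supply ack_alive(), backend_register() and backend_unregister(),
while register(), unregister() and the heartbeat greenthread are inherited.
A minimal sketch of what a backend could look like (InMemoryMatchMaker, its
ttl argument and the timestamp bookkeeping are illustrative, not part of
the patch):

    import time

    from heat.openstack.common.rpc import matchmaker


    class InMemoryMatchMaker(matchmaker.HeartbeatMatchMakerBase):
        """Illustrative backend: liveness tracked in a local dict."""

        def __init__(self, ttl=600):
            super(InMemoryMatchMaker, self).__init__()
            self.ttl = ttl
            self._last_seen = {}  # "key.host" -> time of last heartbeat

        def ack_alive(self, key, host):
            # Called by register() and by every send_heartbeats() pass.
            self._last_seen['.'.join((key, host))] = time.time()

        def backend_register(self, key, key_host):
            # register() has already joined key and host into key_host.
            self._last_seen.setdefault(key_host, time.time())

        def backend_unregister(self, key, key_host):
            self._last_seen.pop(key_host, None)

        def is_alive(self, topic, host):
            seen = self._last_seen.get('.'.join((topic, host)))
            return seen is not None and time.time() - seen < self.ttl

Because _get_matchmaker() now forwards *args and **kwargs to
importutils.import_object(), a class like this could take constructor
arguments when instantiated from the rpc_zmq_matchmaker option.
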
diff --git a/0003-avoid-code-path-causing-qpid-exchange-leaks.patch b/0003-avoid-code-path-causing-qpid-exchange-leaks.patch
new file mode 100644
index 0000000..9d67b51
--- /dev/null
+++ b/0003-avoid-code-path-causing-qpid-exchange-leaks.patch
@@ -0,0 +1,26 @@
+From 975c75e089327bd63f9fa5c934950244f0daae42 Mon Sep 17 00:00:00 2001
+From: =?UTF-8?q?P=C3=A1draig=20Brady?= <pbrady at redhat.com>
+Date: Fri, 21 Jun 2013 10:47:51 +0100
+Subject: [PATCH] avoid code path causing qpid exchange leaks
+
+Always assume amqp_rpc_single_reply_queue is True,
+so that the problematic code path is ignored.
+The issue is discussed at https://pad.lv/1178375
+---
+ heat/openstack/common/rpc/amqp.py |    3 ++-
+ 1 files changed, 2 insertions(+), 1 deletions(-)
+
+diff --git a/heat/openstack/common/rpc/amqp.py b/heat/openstack/common/rpc/amqp.py
+index d8e6ba0..01f0e1f 100644
+--- a/heat/openstack/common/rpc/amqp.py
++++ b/heat/openstack/common/rpc/amqp.py
+@@ -590,7 +590,8 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
+ 
+     # TODO(pekowski): Remove this flag and the code under the if clause
+     #                 in Havana.
+-    if not conf.amqp_rpc_single_reply_queue:
++    # (p-draigbrady): This clause is disabled to avoid qpid exchange leaks
++    if False and not conf.amqp_rpc_single_reply_queue:
+         conn = ConnectionContext(conf, connection_pool)
+         wait_msg = MulticallWaiter(conf, conn, timeout)
+         conn.declare_direct_consumer(msg_id, wait_msg)
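Rather than deleting the legacy reply path, the hunk above short-circuits
it, keeping the diff against upstream minimal until the flag is removed in
Havana. Schematically, using the names visible in the hunk (a sketch of
the control flow, not the full multicall(); legacy_reply_path is a
hypothetical name):

    def legacy_reply_path(conf, connection_pool, msg_id, timeout):
        # Old path: a dedicated direct consumer per RPC call. On qpid
        # the declaration also creates a direct exchange, and qpid
        # never deletes it, so exchanges accumulate with every call().
        conn = ConnectionContext(conf, connection_pool)
        wait_msg = MulticallWaiter(conf, conn, timeout)
        conn.declare_direct_consumer(msg_id, wait_msg)
        return wait_msg

With "if False and ..." in place, multicall() always takes the newer path
instead: replies are multiplexed over a single long-lived reply queue per
process and matched up by msg_id, so no per-call exchange is created and
nothing is left behind on the broker.
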
diff --git a/openstack-heat.spec b/openstack-heat.spec
index f37b190..921c69d 100644
--- a/openstack-heat.spec
+++ b/openstack-heat.spec
@@ -5,7 +5,7 @@
 Name:		openstack-heat
 Summary:	OpenStack Orchestration (heat)
 Version:	2013.1.2
-Release:	2%{?dist}
+Release:	3%{?dist}
 License:	ASL 2.0
 Group:		System Environment/Base
 URL:		http://www.openstack.org
@@ -20,6 +20,8 @@ Source4:	openstack-heat-engine.service
 Source5:	openstack-heat-api-cloudwatch.service
 
 Patch0: switch-to-using-m2crypto.patch
+Patch1: 0002-Update-RPC-to-Oslo-incubator-2013.1.patch
+Patch2: 0003-avoid-code-path-causing-qpid-exchange-leaks.patch
 
 BuildArch: noarch
 BuildRequires: python2-devel
@@ -37,6 +39,8 @@ Requires: %{name}-cli = %{version}-%{release}
 %prep
 %setup -q -n heat-%{version}
 %patch0 -p1
+%patch1 -p1
+%patch2 -p1
 
 %build
 %{__python} setup.py build
@@ -283,6 +287,10 @@ Heat client tools accessible from the CLI
 %{_mandir}/man1/heat-watch.1.gz
 
 %changelog
+* Tue Jun 25 2013 Jeff Peeler <jpeeler at redhat.com> 2013.1.2-3
+- update RPC from Oslo
+- avoid qpid exchange leaks
+
 * Mon Jun 10 2013 Jeff Peeler <jpeeler at redhat.com> 2013.1.2-2
 - fixed m2crypto patch
 
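Back in the matchmaker patch, the DirectExchange.run() change is easy to
miss: the first element of the returned pair is now the full key rather
than its bare-topic prefix, matching the updated docstring. A before/after
comparison (run_old and run_new are hypothetical names for the two
versions of the method body):

    def run_old(key):
        b, e = key.split('.', 1)
        return [(b, e)]        # topic "compute", host "host"

    def run_new(key):
        e = key.split('.', 1)[1]
        return [(key, e)]      # topic "compute.host", host "host"

    assert run_old('compute.host') == [('compute', 'host')]
    assert run_new('compute.host') == [('compute.host', 'host')]

LocalhostExchange gets the matching treatment: it now joins the bare topic
with a configurable host instead of hard-coding 'localhost', which is what
lets MatchMakerLocalhost(host=...) resolve topics to a real hostname.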

