[openstack-heat/f19] Add patches to update RPC and avoid qpid exchange leaks
Jeff Peeler
jpeeler at fedoraproject.org
Tue Jun 25 16:19:39 UTC 2013
commit e7c814b137918cd0a94ed9cc8e9d7a57315823e9
Author: Jeff Peeler <jpeeler at redhat.com>
Date: Tue Jun 25 12:17:10 2013 -0400
Add patches to update RPC and avoid qpid exchange leaks
Fixes rhbz#971572
0002-Update-RPC-to-Oslo-incubator-2013.1.patch | 1589 ++++++++++++++++++++
...oid-code-path-causing-qpid-exchange-leaks.patch | 26 +
openstack-heat.spec | 10 +-
3 files changed, 1624 insertions(+), 1 deletions(-)
---
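
Background on the fix: before this update, every rpc.call() declared a
one-shot direct consumer on a fresh UUID-named exchange, and qpid never
reclaimed those auto-created exchanges, so a busy heat-engine leaked one
exchange per call (discussed at https://pad.lv/1178375). The Oslo 2013.1
RPC code adds a per-process ReplyProxy that consumes all replies from a
single long-lived queue instead. Below is a minimal, self-contained sketch
of the two shapes using an in-memory stand-in for the broker; all names in
the sketch are hypothetical, and the real code is in
heat/openstack/common/rpc/amqp.py in the patch that follows.

import uuid

class FakeBroker(object):
    """Stand-in for qpid: remembers every exchange ever declared."""
    def __init__(self):
        self.exchanges = {}                  # name -> consumer callback

    def declare_direct_consumer(self, name, callback):
        self.exchanges[name] = callback      # qpid keeps these around

    def direct_send(self, name, msg):
        self.exchanges[name](msg)

def leaky_calls(broker, n):
    """Old shape: one direct consumer (one qpid exchange) per call."""
    for _ in range(n):
        msg_id = uuid.uuid4().hex
        replies = []
        broker.declare_direct_consumer(msg_id, replies.append)
        broker.direct_send(msg_id, {'result': 42})
        # the call is done, but the msg_id exchange is never reclaimed

def shared_calls(broker, reply_q, waiters, n):
    """New shape: every reply rides one long-lived per-process queue."""
    for _ in range(n):
        msg_id = uuid.uuid4().hex
        waiters[msg_id] = []
        # the server echoes _msg_id back on the shared reply queue
        broker.direct_send(reply_q, {'_msg_id': msg_id, 'result': 42})

broker = FakeBroker()
leaky_calls(broker, 100)
print(len(broker.exchanges))                 # 100 leaked exchanges

broker, waiters = FakeBroker(), {}
reply_q = 'reply_' + uuid.uuid4().hex
broker.declare_direct_consumer(
    reply_q, lambda m: waiters[m.pop('_msg_id')].append(m))
shared_calls(broker, reply_q, waiters, 100)
print(len(broker.exchanges))                 # still 1: the shared queue
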
diff --git a/0002-Update-RPC-to-Oslo-incubator-2013.1.patch b/0002-Update-RPC-to-Oslo-incubator-2013.1.patch
new file mode 100644
index 0000000..3fa0f4c
--- /dev/null
+++ b/0002-Update-RPC-to-Oslo-incubator-2013.1.patch
@@ -0,0 +1,1589 @@
+From 8dc18475174224ce312b86628d918daaa9265832 Mon Sep 17 00:00:00 2001
+From: Jeff Peeler <jpeeler at redhat.com>
+Date: Tue, 25 Jun 2013 11:22:26 -0400
+Subject: [PATCH] Update RPC to Oslo incubator 2013.1
+
+(removed bin/heat-rpc-zmq-receiver)
+---
+ heat/openstack/common/rpc/__init__.py | 60 +++++--
+ heat/openstack/common/rpc/amqp.py | 277 +++++++++++++++++++++++++++++---
+ heat/openstack/common/rpc/common.py | 34 +++-
+ heat/openstack/common/rpc/impl_fake.py | 2 +-
+ heat/openstack/common/rpc/impl_kombu.py | 37 ++++-
+ heat/openstack/common/rpc/impl_qpid.py | 73 +++++++--
+ heat/openstack/common/rpc/impl_zmq.py | 185 ++++++++++++++-------
+ heat/openstack/common/rpc/matchmaker.py | 186 +++++++++++++++++++--
+ 8 files changed, 728 insertions(+), 126 deletions(-)
+
+diff --git a/heat/openstack/common/rpc/__init__.py b/heat/openstack/common/rpc/__init__.py
+index 4158e50..9fd5910 100644
+--- a/heat/openstack/common/rpc/__init__.py
++++ b/heat/openstack/common/rpc/__init__.py
+@@ -25,9 +25,17 @@ For some wrappers that add message versioning to rpc, see:
+ rpc.proxy
+ """
+
++import inspect
++import logging
++
+ from oslo.config import cfg
+
++from heat.openstack.common.gettextutils import _
+ from heat.openstack.common import importutils
++from heat.openstack.common import local
++
++
++LOG = logging.getLogger(__name__)
+
+
+ rpc_opts = [
+@@ -63,7 +71,8 @@ rpc_opts = [
+ help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
+ ]
+
+-cfg.CONF.register_opts(rpc_opts)
++CONF = cfg.CONF
++CONF.register_opts(rpc_opts)
+
+
+ def set_defaults(control_exchange):
+@@ -84,10 +93,27 @@ def create_connection(new=True):
+
+ :returns: An instance of openstack.common.rpc.common.Connection
+ """
+- return _get_impl().create_connection(cfg.CONF, new=new)
++ return _get_impl().create_connection(CONF, new=new)
++
++
++def _check_for_lock():
++ if not CONF.debug:
++ return None
++
++ if ((hasattr(local.strong_store, 'locks_held')
++ and local.strong_store.locks_held)):
++ stack = ' :: '.join([frame[3] for frame in inspect.stack()])
++ LOG.warn(_('An RPC call is being made while holding a lock. The locks '
++ 'currently held are %(locks)s. This is probably a bug. '
++ 'Please report it. Include the following: [%(stack)s].'),
++ {'locks': local.strong_store.locks_held,
++ 'stack': stack})
++ return True
++
++ return False
+
+
+-def call(context, topic, msg, timeout=None):
++def call(context, topic, msg, timeout=None, check_for_lock=False):
+ """Invoke a remote method that returns something.
+
+ :param context: Information that identifies the user that has made this
+@@ -101,13 +127,17 @@ def call(context, topic, msg, timeout=None):
+ "args" : dict_of_kwargs }
+ :param timeout: int, number of seconds to use for a response timeout.
+ If set, this overrides the rpc_response_timeout option.
++ :param check_for_lock: if True, a warning is emitted if an RPC call is made
++ with a lock held.
+
+ :returns: A dict from the remote method.
+
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
+ """
+- return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
++ if check_for_lock:
++ _check_for_lock()
++ return _get_impl().call(CONF, context, topic, msg, timeout)
+
+
+ def cast(context, topic, msg):
+@@ -125,7 +155,7 @@ def cast(context, topic, msg):
+
+ :returns: None
+ """
+- return _get_impl().cast(cfg.CONF, context, topic, msg)
++ return _get_impl().cast(CONF, context, topic, msg)
+
+
+ def fanout_cast(context, topic, msg):
+@@ -146,10 +176,10 @@ def fanout_cast(context, topic, msg):
+
+ :returns: None
+ """
+- return _get_impl().fanout_cast(cfg.CONF, context, topic, msg)
++ return _get_impl().fanout_cast(CONF, context, topic, msg)
+
+
+-def multicall(context, topic, msg, timeout=None):
++def multicall(context, topic, msg, timeout=None, check_for_lock=False):
+ """Invoke a remote method and get back an iterator.
+
+ In this case, the remote method will be returning multiple values in
+@@ -167,6 +197,8 @@ def multicall(context, topic, msg, timeout=None):
+ "args" : dict_of_kwargs }
+ :param timeout: int, number of seconds to use for a response timeout.
+ If set, this overrides the rpc_response_timeout option.
++ :param check_for_lock: if True, a warning is emitted if an RPC call is made
++ with a lock held.
+
+ :returns: An iterator. The iterator will yield a tuple (N, X) where N is
+ an index that starts at 0 and increases by one for each value
+@@ -176,7 +208,9 @@ def multicall(context, topic, msg, timeout=None):
+ :raises: openstack.common.rpc.common.Timeout if a complete response
+ is not received before the timeout is reached.
+ """
+- return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
++ if check_for_lock:
++ _check_for_lock()
++ return _get_impl().multicall(CONF, context, topic, msg, timeout)
+
+
+ def notify(context, topic, msg, envelope=False):
+@@ -218,7 +252,7 @@ def cast_to_server(context, server_params, topic, msg):
+
+ :returns: None
+ """
+- return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic,
++ return _get_impl().cast_to_server(CONF, context, server_params, topic,
+ msg)
+
+
+@@ -234,7 +268,7 @@ def fanout_cast_to_server(context, server_params, topic, msg):
+
+ :returns: None
+ """
+- return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params,
++ return _get_impl().fanout_cast_to_server(CONF, context, server_params,
+ topic, msg)
+
+
+@@ -264,10 +298,10 @@ def _get_impl():
+ global _RPCIMPL
+ if _RPCIMPL is None:
+ try:
+- _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend)
++ _RPCIMPL = importutils.import_module(CONF.rpc_backend)
+ except ImportError:
+ # For backwards compatibility with older nova config.
+- impl = cfg.CONF.rpc_backend.replace('nova.rpc',
+- 'nova.openstack.common.rpc')
++ impl = CONF.rpc_backend.replace('nova.rpc',
++ 'nova.openstack.common.rpc')
+ _RPCIMPL = importutils.import_module(impl)
+ return _RPCIMPL
+diff --git a/heat/openstack/common/rpc/amqp.py b/heat/openstack/common/rpc/amqp.py
+index 9d69dbb..e9d1bea 100644
+--- a/heat/openstack/common/rpc/amqp.py
++++ b/heat/openstack/common/rpc/amqp.py
+@@ -25,13 +25,19 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
+ AMQP, but is deprecated and predates this code.
+ """
+
++import collections
+ import inspect
+ import sys
+ import uuid
+
+ from eventlet import greenpool
+ from eventlet import pools
++from eventlet import queue
+ from eventlet import semaphore
++# TODO(pekowski): Remove import cfg and the comment below in Havana.
++# This import should no longer be needed when the amqp_rpc_single_reply_queue
++# option is removed.
++from oslo.config import cfg
+
+ from heat.openstack.common import excutils
+ from heat.openstack.common.gettextutils import _
+@@ -40,6 +46,17 @@ from heat.openstack.common import log as logging
+ from heat.openstack.common.rpc import common as rpc_common
+
+
++# TODO(pekowski): Remove this option in Havana.
++amqp_opts = [
++ cfg.BoolOpt('amqp_rpc_single_reply_queue',
++ default=False,
++ help='Enable a fast single reply queue if using AMQP based '
++ 'RPC like RabbitMQ or Qpid.'),
++]
++
++cfg.CONF.register_opts(amqp_opts)
++
++UNIQUE_ID = '_unique_id'
+ LOG = logging.getLogger(__name__)
+
+
+@@ -51,6 +68,7 @@ class Pool(pools.Pool):
+ kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
+ kwargs.setdefault("order_as_stack", True)
+ super(Pool, self).__init__(*args, **kwargs)
++ self.reply_proxy = None
+
+ # TODO(comstud): Timeout connections not used in a while
+ def create(self):
+@@ -60,6 +78,16 @@ class Pool(pools.Pool):
+ def empty(self):
+ while self.free_items:
+ self.get().close()
++ # Force a new connection pool to be created.
++ # Note that this was added due to failing unit test cases. The issue
++ # is that the above while loop gets all the cached connections from
++ # the pool and closes them, but never returns them to the pool: a
++ # pool leak. The unit tests hang waiting for an item to be returned
++ # to the pool. The unit tests get here via the tearDown() method; in
++ # the runtime code, it gets here via cleanup(), which only appears in
++ # service.py just before a sys.exit(), so cleanup() only happens once
++ # and the leakage is not a problem.
++ self.connection_cls.pool = None
+
+
+ _pool_create_sem = semaphore.Semaphore()
+@@ -137,6 +165,12 @@ class ConnectionContext(rpc_common.Connection):
+ def create_worker(self, topic, proxy, pool_name):
+ self.connection.create_worker(topic, proxy, pool_name)
+
++ def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
++ self.connection.join_consumer_pool(callback,
++ pool_name,
++ topic,
++ exchange_name)
++
+ def consume_in_thread(self):
+ self.connection.consume_in_thread()
+
+@@ -148,8 +182,45 @@ class ConnectionContext(rpc_common.Connection):
+ raise rpc_common.InvalidRPCConnectionReuse()
+
+
+-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
+- ending=False, log_failure=True):
++class ReplyProxy(ConnectionContext):
++ """ Connection class for RPC replies / callbacks """
++ def __init__(self, conf, connection_pool):
++ self._call_waiters = {}
++ self._num_call_waiters = 0
++ self._num_call_waiters_wrn_threshhold = 10
++ self._reply_q = 'reply_' + uuid.uuid4().hex
++ super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
++ self.declare_direct_consumer(self._reply_q, self._process_data)
++ self.consume_in_thread()
++
++ def _process_data(self, message_data):
++ msg_id = message_data.pop('_msg_id', None)
++ waiter = self._call_waiters.get(msg_id)
++ if not waiter:
++ LOG.warn(_('no calling threads waiting for msg_id : %s'
++ ', message : %s') % (msg_id, message_data))
++ else:
++ waiter.put(message_data)
++
++ def add_call_waiter(self, waiter, msg_id):
++ self._num_call_waiters += 1
++ if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
++ LOG.warn(_('Number of call waiters is greater than warning '
++ 'threshold: %d. There could be a MulticallProxyWaiter '
++ 'leak.') % self._num_call_waiters_wrn_threshhold)
++ self._num_call_waiters_wrn_threshhold *= 2
++ self._call_waiters[msg_id] = waiter
++
++ def del_call_waiter(self, msg_id):
++ self._num_call_waiters -= 1
++ del self._call_waiters[msg_id]
++
++ def get_reply_q(self):
++ return self._reply_q
++
++
++def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
++ failure=None, ending=False, log_failure=True):
+ """Sends a reply or an error on the channel signified by msg_id.
+
+ Failure should be a sys.exc_info() tuple.
+@@ -168,13 +239,22 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
+ 'failure': failure}
+ if ending:
+ msg['ending'] = True
+- conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
++ _add_unique_id(msg)
++ # If a reply_q exists, add the msg_id to the reply and pass the
++ # reply_q to direct_send() to use it as the response queue.
++ # Otherwise use the msg_id for backward compatibility.
++ if reply_q:
++ msg['_msg_id'] = msg_id
++ conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
++ else:
++ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+
+
+ class RpcContext(rpc_common.CommonRpcContext):
+ """Context that supports replying to a rpc.call"""
+ def __init__(self, **kwargs):
+ self.msg_id = kwargs.pop('msg_id', None)
++ self.reply_q = kwargs.pop('reply_q', None)
+ self.conf = kwargs.pop('conf')
+ super(RpcContext, self).__init__(**kwargs)
+
+@@ -182,13 +262,14 @@ class RpcContext(rpc_common.CommonRpcContext):
+ values = self.to_dict()
+ values['conf'] = self.conf
+ values['msg_id'] = self.msg_id
++ values['reply_q'] = self.reply_q
+ return self.__class__(**values)
+
+ def reply(self, reply=None, failure=None, ending=False,
+ connection_pool=None, log_failure=True):
+ if self.msg_id:
+- msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
+- ending, log_failure)
++ msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
++ reply, failure, ending, log_failure)
+ if ending:
+ self.msg_id = None
+
+@@ -204,6 +285,7 @@ def unpack_context(conf, msg):
+ value = msg.pop(key)
+ context_dict[key[9:]] = value
+ context_dict['msg_id'] = msg.pop('_msg_id', None)
++ context_dict['reply_q'] = msg.pop('_reply_q', None)
+ context_dict['conf'] = conf
+ ctx = RpcContext.from_dict(context_dict)
+ rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
+@@ -224,15 +306,86 @@ def pack_context(msg, context):
+ msg.update(context_d)
+
+
+-class ProxyCallback(object):
+- """Calls methods on a proxy object based on method and args."""
++class _MsgIdCache(object):
++ """This class checks any duplicate messages."""
+
+- def __init__(self, conf, proxy, connection_pool):
+- self.proxy = proxy
++ # NOTE: This value could be made a configuration item, but it
++ # rarely needs to be changed, so it is left static for now.
++ DUP_MSG_CHECK_SIZE = 16
++
++ def __init__(self, **kwargs):
++ self.prev_msgids = collections.deque([],
++ maxlen=self.DUP_MSG_CHECK_SIZE)
++
++ def check_duplicate_message(self, message_data):
++ """AMQP consumers may read same message twice when exceptions occur
++ before ack is returned. This method prevents doing it.
++ """
++ if UNIQUE_ID in message_data:
++ msg_id = message_data[UNIQUE_ID]
++ if msg_id not in self.prev_msgids:
++ self.prev_msgids.append(msg_id)
++ else:
++ raise rpc_common.DuplicateMessageError(msg_id=msg_id)
++
++
++def _add_unique_id(msg):
++ """Add unique_id for checking duplicate messages."""
++ unique_id = uuid.uuid4().hex
++ msg.update({UNIQUE_ID: unique_id})
++ LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
++
++
++class _ThreadPoolWithWait(object):
++ """Base class for a delayed invocation manager used by
++ the Connection class to start up green threads
++ to handle incoming messages.
++ """
++
++ def __init__(self, conf, connection_pool):
+ self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
+ self.connection_pool = connection_pool
+ self.conf = conf
+
++ def wait(self):
++ """Wait for all callback threads to exit."""
++ self.pool.waitall()
++
++
++class CallbackWrapper(_ThreadPoolWithWait):
++ """Wraps a straight callback to allow it to be invoked in a green
++ thread.
++ """
++
++ def __init__(self, conf, callback, connection_pool):
++ """
++ :param conf: cfg.CONF instance
++ :param callback: a callable (probably a function)
++ :param connection_pool: connection pool as returned by
++ get_connection_pool()
++ """
++ super(CallbackWrapper, self).__init__(
++ conf=conf,
++ connection_pool=connection_pool,
++ )
++ self.callback = callback
++
++ def __call__(self, message_data):
++ self.pool.spawn_n(self.callback, message_data)
++
++
++class ProxyCallback(_ThreadPoolWithWait):
++ """Calls methods on a proxy object based on method and args."""
++
++ def __init__(self, conf, proxy, connection_pool):
++ super(ProxyCallback, self).__init__(
++ conf=conf,
++ connection_pool=connection_pool,
++ )
++ self.proxy = proxy
++ self.msg_id_cache = _MsgIdCache()
++
+ def __call__(self, message_data):
+ """Consumer callback to call a method on a proxy object.
+
+@@ -251,6 +404,7 @@ class ProxyCallback(object):
+ if hasattr(local.store, 'context'):
+ del local.store.context
+ rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
++ self.msg_id_cache.check_duplicate_message(message_data)
+ ctxt = unpack_context(self.conf, message_data)
+ method = message_data.get('method')
+ args = message_data.get('args', {})
+@@ -289,15 +443,74 @@ class ProxyCallback(object):
+ connection_pool=self.connection_pool,
+ log_failure=False)
+ except Exception:
+- LOG.exception(_('Exception during message handling'))
+- ctxt.reply(None, sys.exc_info(),
+- connection_pool=self.connection_pool)
++ # sys.exc_info() is deleted by LOG.exception().
++ exc_info = sys.exc_info()
++ LOG.error(_('Exception during message handling'),
++ exc_info=exc_info)
++ ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
++
++
++class MulticallProxyWaiter(object):
++ def __init__(self, conf, msg_id, timeout, connection_pool):
++ self._msg_id = msg_id
++ self._timeout = timeout or conf.rpc_response_timeout
++ self._reply_proxy = connection_pool.reply_proxy
++ self._done = False
++ self._got_ending = False
++ self._conf = conf
++ self._dataqueue = queue.LightQueue()
++ # Add this caller to the reply proxy's call_waiters
++ self._reply_proxy.add_call_waiter(self, self._msg_id)
++ self.msg_id_cache = _MsgIdCache()
+
+- def wait(self):
+- """Wait for all callback threads to exit."""
+- self.pool.waitall()
++ def put(self, data):
++ self._dataqueue.put(data)
++
++ def done(self):
++ if self._done:
++ return
++ self._done = True
++ # Remove this caller from reply proxy's call_waiters
++ self._reply_proxy.del_call_waiter(self._msg_id)
+
++ def _process_data(self, data):
++ result = None
++ self.msg_id_cache.check_duplicate_message(data)
++ if data['failure']:
++ failure = data['failure']
++ result = rpc_common.deserialize_remote_exception(self._conf,
++ failure)
++ elif data.get('ending', False):
++ self._got_ending = True
++ else:
++ result = data['result']
++ return result
+
++ def __iter__(self):
++ """Return a result until we get a reply with an 'ending" flag"""
++ if self._done:
++ raise StopIteration
++ while True:
++ try:
++ data = self._dataqueue.get(timeout=self._timeout)
++ result = self._process_data(data)
++ except queue.Empty:
++ LOG.exception(_('Timed out waiting for RPC response.'))
++ self.done()
++ raise rpc_common.Timeout()
++ except Exception:
++ with excutils.save_and_reraise_exception():
++ self.done()
++ if self._got_ending:
++ self.done()
++ raise StopIteration
++ if isinstance(result, Exception):
++ self.done()
++ raise result
++ yield result
++
++
++#TODO(pekowski): Remove MulticallWaiter() in Havana.
+ class MulticallWaiter(object):
+ def __init__(self, conf, connection, timeout):
+ self._connection = connection
+@@ -307,6 +520,7 @@ class MulticallWaiter(object):
+ self._done = False
+ self._got_ending = False
+ self._conf = conf
++ self.msg_id_cache = _MsgIdCache()
+
+ def done(self):
+ if self._done:
+@@ -318,6 +532,7 @@ class MulticallWaiter(object):
+
+ def __call__(self, data):
+ """The consume() callback will call this. Store the result."""
++ self.msg_id_cache.check_duplicate_message(data)
+ if data['failure']:
+ failure = data['failure']
+ self._result = rpc_common.deserialize_remote_exception(self._conf,
+@@ -353,22 +568,41 @@ def create_connection(conf, new, connection_pool):
+ return ConnectionContext(conf, connection_pool, pooled=not new)
+
+
++_reply_proxy_create_sem = semaphore.Semaphore()
++
++
+ def multicall(conf, context, topic, msg, timeout, connection_pool):
+ """Make a call that returns multiple times."""
++ # TODO(pekowski): Remove all these comments in Havana.
++ # For amqp_rpc_single_reply_queue = False,
+ # Can't use 'with' for multicall, as it returns an iterator
+ # that will continue to use the connection. When it's done,
+ # connection.close() will get called which will put it back into
+ # the pool
++ # For amqp_rpc_single_reply_queue = True,
++ # The 'with' statement is mandatory for closing the connection
+ LOG.debug(_('Making synchronous call on %s ...'), topic)
+ msg_id = uuid.uuid4().hex
+ msg.update({'_msg_id': msg_id})
+ LOG.debug(_('MSG_ID is %s') % (msg_id))
++ _add_unique_id(msg)
+ pack_context(msg, context)
+
+- conn = ConnectionContext(conf, connection_pool)
+- wait_msg = MulticallWaiter(conf, conn, timeout)
+- conn.declare_direct_consumer(msg_id, wait_msg)
+- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
++ # TODO(pekowski): Remove this flag and the code under the if clause
++ # in Havana.
++ if not conf.amqp_rpc_single_reply_queue:
++ conn = ConnectionContext(conf, connection_pool)
++ wait_msg = MulticallWaiter(conf, conn, timeout)
++ conn.declare_direct_consumer(msg_id, wait_msg)
++ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
++ else:
++ with _reply_proxy_create_sem:
++ if not connection_pool.reply_proxy:
++ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
++ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
++ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
++ with ConnectionContext(conf, connection_pool) as conn:
++ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ return wait_msg
+
+
+@@ -385,6 +619,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
+ def cast(conf, context, topic, msg, connection_pool):
+ """Sends a message on a topic without waiting for a response."""
+ LOG.debug(_('Making asynchronous cast on %s...'), topic)
++ _add_unique_id(msg)
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.topic_send(topic, rpc_common.serialize_msg(msg))
+@@ -393,6 +628,7 @@ def cast(conf, context, topic, msg, connection_pool):
+ def fanout_cast(conf, context, topic, msg, connection_pool):
+ """Sends a message on a fanout exchange without waiting for a response."""
+ LOG.debug(_('Making asynchronous fanout cast...'))
++ _add_unique_id(msg)
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg))
+@@ -400,6 +636,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
+
+ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
+ """Sends a message on a topic to a specific server."""
++ _add_unique_id(msg)
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool, pooled=False,
+ server_params=server_params) as conn:
+@@ -409,6 +646,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
+ def fanout_cast_to_server(conf, context, server_params, topic, msg,
+ connection_pool):
+ """Sends a message on a fanout exchange to a specific server."""
++ _add_unique_id(msg)
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool, pooled=False,
+ server_params=server_params) as conn:
+@@ -420,6 +658,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
+ LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+ dict(event_type=msg.get('event_type'),
+ topic=topic))
++ _add_unique_id(msg)
+ pack_context(msg, context)
+ with ConnectionContext(conf, connection_pool) as conn:
+ if envelope:
+diff --git a/heat/openstack/common/rpc/common.py b/heat/openstack/common/rpc/common.py
+index ed5a9d2..3a59670 100644
+--- a/heat/openstack/common/rpc/common.py
++++ b/heat/openstack/common/rpc/common.py
+@@ -49,8 +49,8 @@ deserialize_msg().
+ The current message format (version 2.0) is very simple. It is:
+
+ {
+- 'heat.version': <RPC Envelope Version as a String>,
+- 'heat.message': <Application Message Payload, JSON encoded>
++ 'oslo.version': <RPC Envelope Version as a String>,
++ 'oslo.message': <Application Message Payload, JSON encoded>
+ }
+
+ Message format version '1.0' is just considered to be the messages we sent
+@@ -66,8 +66,8 @@ to the messaging libraries as a dict.
+ '''
+ _RPC_ENVELOPE_VERSION = '2.0'
+
+-_VERSION_KEY = 'heat.version'
+-_MESSAGE_KEY = 'heat.message'
++_VERSION_KEY = 'oslo.version'
++_MESSAGE_KEY = 'oslo.message'
+
+
+ # TODO(russellb) Turn this on after Grizzly.
+@@ -125,6 +125,10 @@ class Timeout(RPCException):
+ message = _("Timeout while waiting on RPC response.")
+
+
++class DuplicateMessageError(RPCException):
++ message = _("Found duplicate message(%(msg_id)s). Skipping it.")
++
++
+ class InvalidRPCConnectionReuse(RPCException):
+ message = _("Invalid reuse of an RPC connection.")
+
+@@ -197,6 +201,28 @@ class Connection(object):
+ """
+ raise NotImplementedError()
+
++ def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
++ """Register as a member of a group of consumers for a given topic from
++ the specified exchange.
++
++ Exactly one member of a given pool will receive each message.
++
++ A message will be delivered to multiple pools if more than
++ one is created.
++
++ :param callback: Callable to be invoked for each message.
++ :type callback: callable accepting one argument
++ :param pool_name: The name of the consumer pool.
++ :type pool_name: str
++ :param topic: The routing topic for desired messages.
++ :type topic: str
++ :param exchange_name: The name of the message exchange where
++ the client should attach. Defaults to
++ the configured exchange.
++ :type exchange_name: str
++ """
++ raise NotImplementedError()
++
+ def consume_in_thread(self):
+ """Spawn a thread to handle incoming messages.
+
+diff --git a/heat/openstack/common/rpc/impl_fake.py b/heat/openstack/common/rpc/impl_fake.py
+index 6c2e130..94597b4 100644
+--- a/heat/openstack/common/rpc/impl_fake.py
++++ b/heat/openstack/common/rpc/impl_fake.py
+@@ -1,6 +1,6 @@
+ # vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+-# Copyright 2011 OpenStack LLC
++# Copyright 2011 OpenStack Foundation
+ #
+ # Licensed under the Apache License, Version 2.0 (the "License"); you may
+ # not use this file except in compliance with the License. You may obtain
+diff --git a/heat/openstack/common/rpc/impl_kombu.py b/heat/openstack/common/rpc/impl_kombu.py
+index 360b78b..886fbcf 100644
+--- a/heat/openstack/common/rpc/impl_kombu.py
++++ b/heat/openstack/common/rpc/impl_kombu.py
+@@ -1,6 +1,6 @@
+ # vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+-# Copyright 2011 OpenStack LLC
++# Copyright 2011 OpenStack Foundation
+ #
+ # Licensed under the Apache License, Version 2.0 (the "License"); you may
+ # not use this file except in compliance with the License. You may obtain
+@@ -66,7 +66,8 @@ kombu_opts = [
+ help='the RabbitMQ userid'),
+ cfg.StrOpt('rabbit_password',
+ default='guest',
+- help='the RabbitMQ password'),
++ help='the RabbitMQ password',
++ secret=True),
+ cfg.StrOpt('rabbit_virtual_host',
+ default='/',
+ help='the RabbitMQ virtual host'),
+@@ -164,9 +165,10 @@ class ConsumerBase(object):
+ try:
+ msg = rpc_common.deserialize_msg(message.payload)
+ callback(msg)
+- message.ack()
+ except Exception:
+ LOG.exception(_("Failed to process message... skipping it."))
++ finally:
++ message.ack()
+
+ self.queue.consume(*args, callback=_callback, **options)
+
+@@ -196,6 +198,7 @@ class DirectConsumer(ConsumerBase):
+ """
+ # Default options
+ options = {'durable': False,
++ 'queue_arguments': _get_queue_arguments(conf),
+ 'auto_delete': True,
+ 'exclusive': False}
+ options.update(kwargs)
+@@ -621,8 +624,8 @@ class Connection(object):
+
+ def _error_callback(exc):
+ if isinstance(exc, socket.timeout):
+- LOG.exception(_('Timed out waiting for RPC response: %s') %
+- str(exc))
++ LOG.debug(_('Timed out waiting for RPC response: %s') %
++ str(exc))
+ raise rpc_common.Timeout()
+ else:
+ LOG.exception(_('Failed to consume message from queue: %s') %
+@@ -749,6 +752,30 @@ class Connection(object):
+ self.proxy_callbacks.append(proxy_cb)
+ self.declare_topic_consumer(topic, proxy_cb, pool_name)
+
++ def join_consumer_pool(self, callback, pool_name, topic,
++ exchange_name=None):
++ """Register as a member of a group of consumers for a given topic from
++ the specified exchange.
++
++ Exactly one member of a given pool will receive each message.
++
++ A message will be delivered to multiple pools if more than
++ one is created.
++ """
++ callback_wrapper = rpc_amqp.CallbackWrapper(
++ conf=self.conf,
++ callback=callback,
++ connection_pool=rpc_amqp.get_connection_pool(self.conf,
++ Connection),
++ )
++ self.proxy_callbacks.append(callback_wrapper)
++ self.declare_topic_consumer(
++ queue_name=pool_name,
++ topic=topic,
++ exchange_name=exchange_name,
++ callback=callback_wrapper,
++ )
++
+
+ def create_connection(conf, new=True):
+ """Create a connection"""
+diff --git a/heat/openstack/common/rpc/impl_qpid.py b/heat/openstack/common/rpc/impl_qpid.py
+index 3acfe69..88c531a 100644
+--- a/heat/openstack/common/rpc/impl_qpid.py
++++ b/heat/openstack/common/rpc/impl_qpid.py
+@@ -1,6 +1,6 @@
+ # vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+-# Copyright 2011 OpenStack LLC
++# Copyright 2011 OpenStack Foundation
+ # Copyright 2011 - 2012, Red Hat, Inc.
+ #
+ # Licensed under the Apache License, Version 2.0 (the "License"); you may
+@@ -40,8 +40,8 @@ qpid_opts = [
+ cfg.StrOpt('qpid_hostname',
+ default='localhost',
+ help='Qpid broker hostname'),
+- cfg.StrOpt('qpid_port',
+- default='5672',
++ cfg.IntOpt('qpid_port',
++ default=5672,
+ help='Qpid broker port'),
+ cfg.ListOpt('qpid_hosts',
+ default=['$qpid_hostname:$qpid_port'],
+@@ -51,7 +51,8 @@ qpid_opts = [
+ help='Username for qpid connection'),
+ cfg.StrOpt('qpid_password',
+ default='',
+- help='Password for qpid connection'),
++ help='Password for qpid connection',
++ secret=True),
+ cfg.StrOpt('qpid_sasl_mechanisms',
+ default='',
+ help='Space separated list of SASL mechanisms to use for auth'),
+@@ -68,6 +69,8 @@ qpid_opts = [
+
+ cfg.CONF.register_opts(qpid_opts)
+
++JSON_CONTENT_TYPE = 'application/json; charset=utf8'
++
+
+ class ConsumerBase(object):
+ """Consumer base class."""
+@@ -122,10 +125,27 @@ class ConsumerBase(object):
+ self.receiver = session.receiver(self.address)
+ self.receiver.capacity = 1
+
++ def _unpack_json_msg(self, msg):
++ """Load the JSON data in msg if msg.content_type indicates that it
++ is necessary. Put the loaded data back into msg.content and
++ update msg.content_type appropriately.
++
++ A Qpid Message containing a dict will have a content_type of
++ 'amqp/map', whereas one containing a string that needs to be converted
++ back from JSON will have a content_type of JSON_CONTENT_TYPE.
++
++ :param msg: a Qpid Message object
++ :returns: None
++ """
++ if msg.content_type == JSON_CONTENT_TYPE:
++ msg.content = jsonutils.loads(msg.content)
++ msg.content_type = 'amqp/map'
++
+ def consume(self):
+ """Fetch the message and pass it to the callback object"""
+ message = self.receiver.fetch()
+ try:
++ self._unpack_json_msg(message)
+ msg = rpc_common.deserialize_msg(message.content)
+ self.callback(msg)
+ except Exception:
+@@ -330,15 +350,16 @@ class Connection(object):
+
+ def reconnect(self):
+ """Handles reconnecting and re-establishing sessions and queues"""
+- if self.connection.opened():
+- try:
+- self.connection.close()
+- except qpid_exceptions.ConnectionError:
+- pass
+-
+ attempt = 0
+ delay = 1
+ while True:
++ # Close the session if necessary
++ if self.connection.opened():
++ try:
++ self.connection.close()
++ except qpid_exceptions.ConnectionError:
++ pass
++
+ broker = self.brokers[attempt % len(self.brokers)]
+ attempt += 1
+
+@@ -414,8 +435,8 @@ class Connection(object):
+
+ def _error_callback(exc):
+ if isinstance(exc, qpid_exceptions.Empty):
+- LOG.exception(_('Timed out waiting for RPC response: %s') %
+- str(exc))
++ LOG.debug(_('Timed out waiting for RPC response: %s') %
++ str(exc))
+ raise rpc_common.Timeout()
+ else:
+ LOG.exception(_('Failed to consume message from queue: %s') %
+@@ -559,6 +580,34 @@ class Connection(object):
+
+ return consumer
+
++ def join_consumer_pool(self, callback, pool_name, topic,
++ exchange_name=None):
++ """Register as a member of a group of consumers for a given topic from
++ the specified exchange.
++
++ Exactly one member of a given pool will receive each message.
++
++ A message will be delivered to multiple pools if more than
++ one is created.
++ """
++ callback_wrapper = rpc_amqp.CallbackWrapper(
++ conf=self.conf,
++ callback=callback,
++ connection_pool=rpc_amqp.get_connection_pool(self.conf,
++ Connection),
++ )
++ self.proxy_callbacks.append(callback_wrapper)
++
++ consumer = TopicConsumer(conf=self.conf,
++ session=self.session,
++ topic=topic,
++ callback=callback_wrapper,
++ name=pool_name,
++ exchange_name=exchange_name)
++
++ self._register_consumer(consumer)
++ return consumer
++
+
+ def create_connection(conf, new=True):
+ """Create a connection"""
+diff --git a/heat/openstack/common/rpc/impl_zmq.py b/heat/openstack/common/rpc/impl_zmq.py
+index b41c53d..316d92b 100644
+--- a/heat/openstack/common/rpc/impl_zmq.py
++++ b/heat/openstack/common/rpc/impl_zmq.py
+@@ -16,6 +16,7 @@
+
+ import os
+ import pprint
++import re
+ import socket
+ import sys
+ import types
+@@ -25,6 +26,7 @@ import eventlet
+ import greenlet
+ from oslo.config import cfg
+
++from heat.openstack.common import excutils
+ from heat.openstack.common.gettextutils import _
+ from heat.openstack.common import importutils
+ from heat.openstack.common import jsonutils
+@@ -89,10 +91,10 @@ def _serialize(data):
+ Error if a developer passes us bad data.
+ """
+ try:
+- return str(jsonutils.dumps(data, ensure_ascii=True))
++ return jsonutils.dumps(data, ensure_ascii=True)
+ except TypeError:
+- LOG.error(_("JSON serialization failed."))
+- raise
++ with excutils.save_and_reraise_exception():
++ LOG.error(_("JSON serialization failed."))
+
+
+ def _deserialize(data):
+@@ -216,11 +218,18 @@ class ZmqClient(object):
+ socket_type = zmq.PUSH
+ self.outq = ZmqSocket(addr, socket_type, bind=bind)
+
+- def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+- if serialize:
+- data = rpc_common.serialize_msg(data, force_envelope)
+- self.outq.send([str(msg_id), str(topic), str('cast'),
+- _serialize(data)])
++ def cast(self, msg_id, topic, data, envelope=False):
++ msg_id = msg_id or 0
++
++ if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
++ self.outq.send(map(bytes,
++ (msg_id, topic, 'cast', _serialize(data))))
++ return
++
++ rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
++ zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
++ self.outq.send(map(bytes,
++ (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
+
+ def close(self):
+ self.outq.close()
+@@ -294,13 +303,13 @@ class InternalContext(object):
+ ctx.replies)
+
+ LOG.debug(_("Sending reply"))
+- cast(CONF, ctx, topic, {
++ _multi_send(_cast, ctx, topic, {
+ 'method': '-process_reply',
+ 'args': {
+- 'msg_id': msg_id,
++ 'msg_id': msg_id, # Include for Folsom compat.
+ 'response': response
+ }
+- })
++ }, _msg_id=msg_id)
+
+
+ class ConsumerBase(object):
+@@ -319,7 +328,7 @@ class ConsumerBase(object):
+ else:
+ return [result]
+
+- def process(self, style, target, proxy, ctx, data):
++ def process(self, proxy, ctx, data):
+ data.setdefault('version', None)
+ data.setdefault('args', {})
+
+@@ -423,6 +432,8 @@ class ZmqProxy(ZmqBaseReactor):
+
+ def __init__(self, conf):
+ super(ZmqProxy, self).__init__(conf)
++ pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
++ self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
+
+ self.topic_proxy = {}
+
+@@ -431,21 +442,15 @@ class ZmqProxy(ZmqBaseReactor):
+
+ #TODO(ewindisch): use zero-copy (i.e. references, not copying)
+ data = sock.recv()
+- msg_id, topic, style, in_msg = data
+- topic = topic.split('.', 1)[0]
++ topic = data[1]
+
+ LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+
+- # Handle zmq_replies magic
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
++ topic = topic.split('.', 1)[0]
+ elif topic.startswith('zmq_replies'):
+ sock_type = zmq.PUB
+- inside = rpc_common.deserialize_msg(_deserialize(in_msg))
+- msg_id = inside[-1]['args']['msg_id']
+- response = inside[-1]['args']['response']
+- LOG.debug(_("->response->%s"), response)
+- data = [str(msg_id), _serialize(response)]
+ else:
+ sock_type = zmq.PUSH
+
+@@ -454,6 +459,13 @@ class ZmqProxy(ZmqBaseReactor):
+ LOG.info(_("Creating proxy for topic: %s"), topic)
+
+ try:
++ # The topic is received over the network,
++ # don't trust this input.
++ if self.badchars.search(topic) is not None:
++ emsg = _("Topic contained dangerous characters.")
++ LOG.warn(emsg)
++ raise RPCException(emsg)
++
+ out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+ (ipc_dir, topic),
+ sock_type, bind=True)
+@@ -510,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor):
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+- LOG.error(_("Could not create IPC directory %s") %
+- (ipc_dir, ))
+- raise
++ with excutils.save_and_reraise_exception():
++ LOG.error(_("Could not create IPC directory %s") %
++ (ipc_dir, ))
+
+ try:
+ self.register(consumption_proxy,
+@@ -520,13 +532,28 @@ class ZmqProxy(ZmqBaseReactor):
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+- LOG.error(_("Could not create ZeroMQ receiver daemon. "
+- "Socket may already be in use."))
+- raise
++ with excutils.save_and_reraise_exception():
++ LOG.error(_("Could not create ZeroMQ receiver daemon. "
++ "Socket may already be in use."))
+
+ super(ZmqProxy, self).consume_in_thread()
+
+
++def unflatten_envelope(packenv):
++ """Unflattens the RPC envelope.
++ Takes a list and returns a dictionary.
++ i.e. [1,2,3,4] => {1: 2, 3: 4}
++ """
++ i = iter(packenv)
++ h = {}
++ try:
++ while True:
++ k = i.next()
++ h[k] = i.next()
++ except StopIteration:
++ return h
++
++
+ class ZmqReactor(ZmqBaseReactor):
+ """
+ A consumer class implementing a
+@@ -547,38 +574,53 @@ class ZmqReactor(ZmqBaseReactor):
+ self.mapping[sock].send(data)
+ return
+
+- msg_id, topic, style, in_msg = data
++ proxy = self.proxies[sock]
+
+- ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
+- ctx = RpcContext.unmarshal(ctx)
++ if data[2] == 'cast': # Legacy protocol
++ packenv = data[3]
+
+- proxy = self.proxies[sock]
++ ctx, msg = _deserialize(packenv)
++ request = rpc_common.deserialize_msg(msg)
++ ctx = RpcContext.unmarshal(ctx)
++ elif data[2] == 'impl_zmq_v2':
++ packenv = data[4:]
+
+- self.pool.spawn_n(self.process, style, topic,
+- proxy, ctx, request)
++ msg = unflatten_envelope(packenv)
++ request = rpc_common.deserialize_msg(msg)
++
++ # Unmarshal only after verifying the message.
++ ctx = RpcContext.unmarshal(data[3])
++ else:
++ LOG.error(_("ZMQ Envelope version unsupported or unknown."))
++ return
++
++ self.pool.spawn_n(self.process, proxy, ctx, request)
+
+
+ class Connection(rpc_common.Connection):
+ """Manages connections and threads."""
+
+ def __init__(self, conf):
++ self.topics = []
+ self.reactor = ZmqReactor(conf)
+
+ def create_consumer(self, topic, proxy, fanout=False):
+- # Only consume on the base topic name.
+- topic = topic.split('.', 1)[0]
+-
+- LOG.info(_("Create Consumer for topic (%(topic)s)") %
+- {'topic': topic})
++ # Register with matchmaker.
++ _get_matchmaker().register(topic, CONF.rpc_zmq_host)
+
+ # Subscription scenarios
+ if fanout:
+- subscribe = ('', fanout)[type(fanout) == str]
+ sock_type = zmq.SUB
+- topic = 'fanout~' + topic
++ subscribe = ('', fanout)[type(fanout) == str]
++ topic = 'fanout~' + topic.split('.', 1)[0]
+ else:
+ sock_type = zmq.PULL
+ subscribe = None
++ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
++
++ if topic in self.topics:
++ LOG.info(_("Skipping topic registration. Already registered."))
++ return
+
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+@@ -589,19 +631,26 @@ class Connection(rpc_common.Connection):
+
+ self.reactor.register(proxy, inaddr, sock_type,
+ subscribe=subscribe, in_bind=False)
++ self.topics.append(topic)
+
+ def close(self):
++ _get_matchmaker().stop_heartbeat()
++ for topic in self.topics:
++ _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
++
+ self.reactor.close()
++ self.topics = []
+
+ def wait(self):
+ self.reactor.wait()
+
+ def consume_in_thread(self):
++ _get_matchmaker().start_heartbeat()
+ self.reactor.consume_in_thread()
+
+
+-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+- force_envelope=False):
++def _cast(addr, context, topic, msg, timeout=None, envelope=False,
++ _msg_id=None):
+ timeout_cast = timeout or CONF.rpc_cast_timeout
+ payload = [RpcContext.marshal(context), msg]
+
+@@ -610,7 +659,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+ conn = ZmqClient(addr)
+
+ # assumes cast can't return an exception
+- conn.cast(msg_id, topic, payload, serialize, force_envelope)
++ conn.cast(_msg_id, topic, payload, envelope)
+ except zmq.ZMQError:
+ raise RPCException("Cast failed. ZMQ Socket Exception")
+ finally:
+@@ -618,8 +667,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+ conn.close()
+
+
+-def _call(addr, context, msg_id, topic, msg, timeout=None,
+- serialize=True, force_envelope=False):
++def _call(addr, context, topic, msg, timeout=None,
++ envelope=False):
+ # timeout_response is how long we wait for a response
+ timeout = timeout or CONF.rpc_response_timeout
+
+@@ -649,23 +698,36 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
+ with Timeout(timeout, exception=rpc_common.Timeout):
+ try:
+ msg_waiter = ZmqSocket(
+- "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
++ "ipc://%s/zmq_topic_zmq_replies.%s" %
++ (CONF.rpc_zmq_ipc_dir,
++ CONF.rpc_zmq_host),
+ zmq.SUB, subscribe=msg_id, bind=False
+ )
+
+ LOG.debug(_("Sending cast"))
+- _cast(addr, context, msg_id, topic, payload,
+- serialize=serialize, force_envelope=force_envelope)
++ _cast(addr, context, topic, payload, envelope)
+
+ LOG.debug(_("Cast sent; Waiting reply"))
+ # Blocks until receives reply
+ msg = msg_waiter.recv()
+ LOG.debug(_("Received message: %s"), msg)
+ LOG.debug(_("Unpacking response"))
+- responses = _deserialize(msg[-1])
++
++ if msg[2] == 'cast': # Legacy version
++ raw_msg = _deserialize(msg[-1])[-1]
++ elif msg[2] == 'impl_zmq_v2':
++ rpc_envelope = unflatten_envelope(msg[4:])
++ raw_msg = rpc_common.deserialize_msg(rpc_envelope)
++ else:
++ raise rpc_common.UnsupportedRpcEnvelopeVersion(
++ _("Unsupported or unknown ZMQ envelope returned."))
++
++ responses = raw_msg['args']['response']
+ # ZMQError trumps the Timeout error.
+ except zmq.ZMQError:
+ raise RPCException("ZMQ Socket Error")
++ except (IndexError, KeyError):
++ raise RPCException(_("RPC Message Invalid."))
+ finally:
+ if 'msg_waiter' in vars():
+ msg_waiter.close()
+@@ -681,8 +743,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
+ return responses[-1]
+
+
+-def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+- force_envelope=False):
++def _multi_send(method, context, topic, msg, timeout=None,
++ envelope=False, _msg_id=None):
+ """
+ Wraps the sending of messages,
+ dispatches to the matchmaker and sends
+@@ -699,7 +761,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+ LOG.warn(_("No matchmaker results. Not casting."))
+ # While not strictly a timeout, callers know how to handle
+ # this exception and a timeout isn't too big a lie.
+- raise rpc_common.Timeout, "No match from matchmaker."
++ raise rpc_common.Timeout(_("No match from matchmaker."))
+
+ # This supports brokerless fanout (addresses > 1)
+ for queue in queues:
+@@ -708,11 +770,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+
+ if method.__name__ == '_cast':
+ eventlet.spawn_n(method, _addr, context,
+- _topic, _topic, msg, timeout, serialize,
+- force_envelope)
++ _topic, msg, timeout, envelope,
++ _msg_id)
+ return
+- return method(_addr, context, _topic, _topic, msg, timeout,
+- serialize, force_envelope)
++ return method(_addr, context, _topic, msg, timeout,
++ envelope)
+
+
+ def create_connection(conf, new=True):
+@@ -742,7 +804,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
+ _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+
+
+-def notify(conf, context, topic, msg, **kwargs):
++def notify(conf, context, topic, msg, envelope):
+ """
+ Send notification event.
+ Notifications are sent to topic-priority.
+@@ -750,10 +812,8 @@ def notify(conf, context, topic, msg, **kwargs):
+ """
+ # NOTE(ewindisch): dot-priority in rpc notifier does not
+ # work with our assumptions.
+- topic.replace('.', '-')
+- kwargs['serialize'] = kwargs.pop('envelope')
+- kwargs['force_envelope'] = True
+- cast(conf, context, topic, msg, **kwargs)
++ topic = topic.replace('.', '-')
++ cast(conf, context, topic, msg, envelope=envelope)
+
+
+ def cleanup():
+@@ -777,8 +837,9 @@ def _get_ctxt():
+ return ZMQ_CTX
+
+
+-def _get_matchmaker():
++def _get_matchmaker(*args, **kwargs):
+ global matchmaker
+ if not matchmaker:
+- matchmaker = importutils.import_object(CONF.rpc_zmq_matchmaker)
++ matchmaker = importutils.import_object(
++ CONF.rpc_zmq_matchmaker, *args, **kwargs)
+ return matchmaker
+diff --git a/heat/openstack/common/rpc/matchmaker.py b/heat/openstack/common/rpc/matchmaker.py
+index 88e540c..2d1df17 100644
+--- a/heat/openstack/common/rpc/matchmaker.py
++++ b/heat/openstack/common/rpc/matchmaker.py
+@@ -22,6 +22,7 @@ import contextlib
+ import itertools
+ import json
+
++import eventlet
+ from oslo.config import cfg
+
+ from heat.openstack.common.gettextutils import _
+@@ -33,6 +34,12 @@ matchmaker_opts = [
+ cfg.StrOpt('matchmaker_ringfile',
+ default='/etc/nova/matchmaker_ring.json',
+ help='Matchmaker ring file (JSON)'),
++ cfg.IntOpt('matchmaker_heartbeat_freq',
++ default=300,
++ help='Heartbeat frequency'),
++ cfg.IntOpt('matchmaker_heartbeat_ttl',
++ default=600,
++ help='Heartbeat time-to-live.'),
+ ]
+
+ CONF = cfg.CONF
+@@ -70,12 +77,73 @@ class Binding(object):
+
+
+ class MatchMakerBase(object):
+- """Match Maker Base Class."""
+-
++ """
++ Match Maker Base Class.
++ Build off HeartbeatMatchMakerBase if building a
++ heartbeat-capable MatchMaker.
++ """
+ def __init__(self):
+ # Array of tuples. Index [2] toggles negation, [3] is last-if-true
+ self.bindings = []
+
++ self.no_heartbeat_msg = _('Matchmaker does not implement '
++ 'registration or heartbeat.')
++
++ def register(self, key, host):
++ """
++ Register a host on a backend.
++ Heartbeats, if applicable, may keepalive registration.
++ """
++ pass
++
++ def ack_alive(self, key, host):
++ """
++ Acknowledge that a key.host is alive.
++ Used internally for updating heartbeats,
++ but may also be used publicly to acknowledge
++ a system is alive (i.e. rpc message successfully
++ sent to host)
++ """
++ pass
++
++ def is_alive(self, topic, host):
++ """
++ Checks if a host is alive.
++ """
++ pass
++
++ def expire(self, topic, host):
++ """
++ Explicitly expire a host's registration.
++ """
++ pass
++
++ def send_heartbeats(self):
++ """
++ Send all heartbeats.
++ Use start_heartbeat to spawn a heartbeat greenthread,
++ which loops this method.
++ """
++ pass
++
++ def unregister(self, key, host):
++ """
++ Unregister a topic.
++ """
++ pass
++
++ def start_heartbeat(self):
++ """
++ Spawn heartbeat greenthread.
++ """
++ pass
++
++ def stop_heartbeat(self):
++ """
++ Destroys the heartbeat greenthread.
++ """
++ pass
++
+ def add_binding(self, binding, rule, last=True):
+ self.bindings.append((binding, rule, False, last))
+
+@@ -99,6 +167,103 @@ class MatchMakerBase(object):
+ return workers
+
+
++class HeartbeatMatchMakerBase(MatchMakerBase):
++ """
++ Base for a heart-beat capable MatchMaker.
++ Provides common methods for registering,
++ unregistering, and maintaining heartbeats.
++ """
++ def __init__(self):
++ self.hosts = set()
++ self._heart = None
++ self.host_topic = {}
++
++ super(HeartbeatMatchMakerBase, self).__init__()
++
++ def send_heartbeats(self):
++ """
++ Send all heartbeats.
++ Use start_heartbeat to spawn a heartbeat greenthread,
++ which loops this method.
++ """
++ for key, host in self.host_topic:
++ self.ack_alive(key, host)
++
++ def ack_alive(self, key, host):
++ """
++ Acknowledge that a host.topic is alive.
++ Used internally for updating heartbeats,
++ but may also be used publicly to acknowledge
++ a system is alive (i.e. rpc message successfully
++ sent to host)
++ """
++ raise NotImplementedError("Must implement ack_alive")
++
++ def backend_register(self, key, host):
++ """
++ Implements registration logic.
++ Called by register(self,key,host)
++ """
++ raise NotImplementedError("Must implement backend_register")
++
++ def backend_unregister(self, key, key_host):
++ """
++ Implements de-registration logic.
++ Called by unregister(self,key,host)
++ """
++ raise NotImplementedError("Must implement backend_unregister")
++
++ def register(self, key, host):
++ """
++ Register a host on a backend.
++ Heartbeats, if applicable, may keepalive registration.
++ """
++ self.hosts.add(host)
++ self.host_topic[(key, host)] = host
++ key_host = '.'.join((key, host))
++
++ self.backend_register(key, key_host)
++
++ self.ack_alive(key, host)
++
++ def unregister(self, key, host):
++ """
++ Unregister a topic.
++ """
++ if (key, host) in self.host_topic:
++ del self.host_topic[(key, host)]
++
++ self.hosts.discard(host)
++ self.backend_unregister(key, '.'.join((key, host)))
++
++ LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
++
++ def start_heartbeat(self):
++ """
++ Implementation of MatchMakerBase.start_heartbeat
++ Launches greenthread looping send_heartbeats(),
++ yielding for CONF.matchmaker_heartbeat_freq seconds
++ between iterations.
++ """
++ if len(self.hosts) == 0:
++ raise MatchMakerException(
++ _("Register before starting heartbeat."))
++
++ def do_heartbeat():
++ while True:
++ self.send_heartbeats()
++ eventlet.sleep(CONF.matchmaker_heartbeat_freq)
++
++ self._heart = eventlet.spawn(do_heartbeat)
++
++ def stop_heartbeat(self):
++ """
++ Destroys the heartbeat greenthread.
++ """
++ if self._heart:
++ self._heart.kill()
++
++
+ class DirectBinding(Binding):
+ """
+ Specifies a host in the key via a '.' character
+@@ -202,24 +367,25 @@ class FanoutRingExchange(RingExchange):
+
+ class LocalhostExchange(Exchange):
+ """Exchange where all direct topics are local."""
+- def __init__(self):
++ def __init__(self, host='localhost'):
++ self.host = host
+ super(Exchange, self).__init__()
+
+ def run(self, key):
+- return [(key.split('.')[0] + '.localhost', 'localhost')]
++ return [('.'.join((key.split('.')[0], self.host)), self.host)]
+
+
+ class DirectExchange(Exchange):
+ """
+ Exchange where all topic keys are split, sending to second half.
+- i.e. "compute.host" sends a message to "compute" running on "host"
++ i.e. "compute.host" sends a message to "compute.host" running on "host"
+ """
+ def __init__(self):
+ super(Exchange, self).__init__()
+
+ def run(self, key):
+- b, e = key.split('.', 1)
+- return [(b, e)]
++ e = key.split('.', 1)[1]
++ return [(key, e)]
+
+
+ class MatchMakerRing(MatchMakerBase):
+@@ -238,11 +404,11 @@ class MatchMakerLocalhost(MatchMakerBase):
+ Match Maker where all bare topics resolve to localhost.
+ Useful for testing.
+ """
+- def __init__(self):
++ def __init__(self, host='localhost'):
+ super(MatchMakerLocalhost, self).__init__()
+- self.add_binding(FanoutBinding(), LocalhostExchange())
++ self.add_binding(FanoutBinding(), LocalhostExchange(host))
+ self.add_binding(DirectBinding(), DirectExchange())
+- self.add_binding(TopicBinding(), LocalhostExchange())
++ self.add_binding(TopicBinding(), LocalhostExchange(host))
+
+
+ class MatchMakerStub(MatchMakerBase):
+--
+1.8.1.4
+
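
Beyond the reply-queue rework, the RPC update above also guards against
AMQP redelivery: senders stamp each outgoing message with a _unique_id
(see _add_unique_id) and consumers drop anything they have already seen
(see _MsgIdCache). A rough standalone sketch of that guard, with a
simplified exception standing in for rpc_common.DuplicateMessageError:

import collections
import uuid

class DuplicateMessageError(Exception):
    """Simplified stand-in for rpc_common.DuplicateMessageError."""

class MsgIdCache(object):
    """Remember the last N unique ids and reject redeliveries."""
    DUP_MSG_CHECK_SIZE = 16

    def __init__(self):
        # bounded deque: old ids age out automatically
        self.prev_msgids = collections.deque(
            [], maxlen=self.DUP_MSG_CHECK_SIZE)

    def check_duplicate_message(self, message_data):
        msg_id = message_data.get('_unique_id')
        if msg_id is None:
            return                 # pre-2013.1 sender: nothing to check
        if msg_id in self.prev_msgids:
            raise DuplicateMessageError(msg_id)
        self.prev_msgids.append(msg_id)

def add_unique_id(msg):
    """Sender side: stamp the message before it hits the wire."""
    msg['_unique_id'] = uuid.uuid4().hex

cache = MsgIdCache()
msg = {'method': 'create_stack'}
add_unique_id(msg)
cache.check_duplicate_message(msg)       # first delivery: accepted
try:
    cache.check_duplicate_message(msg)   # redelivery: dropped
except DuplicateMessageError as exc:
    print('duplicate message %s skipped' % exc)
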
diff --git a/0003-avoid-code-path-causing-qpid-exchange-leaks.patch b/0003-avoid-code-path-causing-qpid-exchange-leaks.patch
new file mode 100644
index 0000000..9d67b51
--- /dev/null
+++ b/0003-avoid-code-path-causing-qpid-exchange-leaks.patch
@@ -0,0 +1,26 @@
+From 975c75e089327bd63f9fa5c934950244f0daae42 Mon Sep 17 00:00:00 2001
+From: =?UTF-8?q?P=C3=A1draig=20Brady?= <pbrady at redhat.com>
+Date: Fri, 21 Jun 2013 10:47:51 +0100
+Subject: [PATCH] avoid code path causing qpid exchange leaks
+
+Always assume amqp_rpc_single_reply_queue is True,
+so that the problematic code path is ignored.
+The issue is discussed at https://pad.lv/1178375
+---
+ heat/openstack/common/rpc/amqp.py | 3 ++-
+ 1 files changed, 2 insertions(+), 1 deletions(-)
+
+diff --git a/heat/openstack/common/rpc/amqp.py b/heat/openstack/common/rpc/amqp.py
+index d8e6ba0..01f0e1f 100644
+--- a/heat/openstack/common/rpc/amqp.py
++++ b/heat/openstack/common/rpc/amqp.py
+@@ -590,7 +590,8 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
+
+ # TODO(pekowski): Remove this flag and the code under the if clause
+ # in Havana.
+- if not conf.amqp_rpc_single_reply_queue:
++ # (p-draigbrady): This clause is disabled to avoid qpid exchange leaks
++ if False and not conf.amqp_rpc_single_reply_queue:
+ conn = ConnectionContext(conf, connection_pool)
+ wait_msg = MulticallWaiter(conf, conn, timeout)
+ conn.declare_direct_consumer(msg_id, wait_msg)
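
The "if False and ..." edit above is deliberately blunt: rather than
flipping the amqp_rpc_single_reply_queue default, which would change
documented configuration behaviour, it makes the legacy per-call branch
unreachable, so every multicall() takes the ReplyProxy path regardless of
configuration. Condensed, with names taken from the patched amqp.py, the
effective control flow looks roughly like this (a sketch, not the verbatim
function):

def multicall(conf, context, topic, msg, timeout, connection_pool):
    msg_id = uuid.uuid4().hex
    msg['_msg_id'] = msg_id
    _add_unique_id(msg)
    pack_context(msg, context)
    # The legacy branch is short-circuited by the constant False, so the
    # throwaway per-call direct consumer (the exchange leak) never runs.
    # Lazily create the process-wide ReplyProxy, advertise its shared
    # queue in the message, and register a waiter keyed by msg_id.
    with _reply_proxy_create_sem:
        if not connection_pool.reply_proxy:
            connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
    msg['_reply_q'] = connection_pool.reply_proxy.get_reply_q()
    wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
    with ConnectionContext(conf, connection_pool) as conn:
        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
    return wait_msg
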
diff --git a/openstack-heat.spec b/openstack-heat.spec
index f37b190..921c69d 100644
--- a/openstack-heat.spec
+++ b/openstack-heat.spec
@@ -5,7 +5,7 @@
Name: openstack-heat
Summary: OpenStack Orchestration (heat)
Version: 2013.1.2
-Release: 2%{?dist}
+Release: 3%{?dist}
License: ASL 2.0
Group: System Environment/Base
URL: http://www.openstack.org
@@ -20,6 +20,8 @@ Source4: openstack-heat-engine.service
Source5: openstack-heat-api-cloudwatch.service
Patch0: switch-to-using-m2crypto.patch
+Patch1: 0002-Update-RPC-to-Oslo-incubator-2013.1.patch
+Patch2: 0003-avoid-code-path-causing-qpid-exchange-leaks.patch
BuildArch: noarch
BuildRequires: python2-devel
@@ -37,6 +39,8 @@ Requires: %{name}-cli = %{version}-%{release}
%prep
%setup -q -n heat-%{version}
%patch0 -p1
+%patch1 -p1
+%patch2 -p1
%build
%{__python} setup.py build
@@ -283,6 +287,10 @@ Heat client tools accessible from the CLI
%{_mandir}/man1/heat-watch.1.gz
%changelog
+* Tue Jun 25 2013 Jeff Peeler <jpeeler at redhat.com> 2013.1.2-3
+- update RPC from Oslo
+- avoid qpid exchange leaks
+
* Mon Jun 10 2013 Jeff Peeler <jpeeler at redhat.com> 2013.1.2-2
- fixed m2crypto patch