development.ini | 2 moksha/api/hub/consumer.py | 17 ++++++++ moksha/api/widgets/amqp/amqp.py | 6 -- moksha/api/widgets/live/live.py | 18 ++++---- moksha/hub/amqp/qpid010.py | 4 - moksha/hub/hub.py | 83 +++++++++++++++++++--------------------- moksha/lib/helpers.py | 4 + orbited.cfg | 2 8 files changed, 77 insertions(+), 59 deletions(-)
New commits: commit ed6e3748f51b671c944799a426d15af18f9fa0df Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 22:39:27 2009 -0500
Disable STOMP support by default
diff --git a/development.ini b/development.ini index b3d18ca..3d0fa9a 100644 --- a/development.ini +++ b/development.ini @@ -85,7 +85,7 @@ orbited_host = localhost orbited_port = 9000
# Stomp broker configuration. -stomp_broker = localhost +#stomp_broker = localhost stomp_port = 61613 stomp_user = guest stomp_pass = guest diff --git a/orbited.cfg b/orbited.cfg index b7c2195..cbabfe7 100644 --- a/orbited.cfg +++ b/orbited.cfg @@ -4,7 +4,7 @@ http://:9000
# This enables Orbited's built-in MorbidQ message queue. # Comment this out if to use your own, eg: RabbitMQ -stomp://:61613 +#stomp://:61613
[access] * -> localhost:61613
commit 7817d5b0dc06c60ca82ada8173641b9756662e74 Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 22:38:27 2009 -0500
[hub] only setup the local/remote AMQP queue in the hub if stomp is enabled.
diff --git a/moksha/hub/hub.py b/moksha/hub/hub.py index 138c4d3..0e097b3 100644 --- a/moksha/hub/hub.py +++ b/moksha/hub/hub.py @@ -120,16 +120,17 @@ class CentralMokshaHub(MokshaHub): self.__init_data_streams()
def __init_amqp(self): - log.debug("Initializing local AMQP queue...") - self.server_queue_name = 'moksha_hub_' + self.session.name - self.queue_declare(queue=self.server_queue_name, exclusive=True) - self.exchange_bind(self.server_queue_name, binding_key='#') - self.local_queue_name = 'moksha_hub' - self.local_queue = self.session.incoming(self.local_queue_name) - self.message_subscribe(queue=self.server_queue_name, - destination=self.local_queue_name) - self.local_queue.start() - self.local_queue.listen(self.consume_amqp_message) + if self.stomp_broker: + log.debug("Initializing local AMQP queue...") + self.server_queue_name = 'moksha_hub_' + self.session.name + self.queue_declare(queue=self.server_queue_name, exclusive=True) + self.exchange_bind(self.server_queue_name, binding_key='#') + self.local_queue_name = 'moksha_hub' + self.local_queue = self.session.incoming(self.local_queue_name) + self.message_subscribe(queue=self.server_queue_name, + destination=self.local_queue_name) + self.local_queue.start() + self.local_queue.listen(self.consume_amqp_message)
def __init_consumers(self): """ Initialize all Moksha Consumer objects """
commit 75d670b1f44d87b11ae8da706b6f41a54bee9d3c Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 22:37:16 2009 -0500
[consumer] If we're using AMQP w/o STOMP, have the consumers setup their own remote & local queues with their MokshaHub.
diff --git a/moksha/api/hub/consumer.py b/moksha/api/hub/consumer.py index ac2495f..c3f9ed2 100644 --- a/moksha/api/hub/consumer.py +++ b/moksha/api/hub/consumer.py @@ -26,7 +26,11 @@ loaded, and receives each message for the specified topic through the .. moduleauthor:: Luke Macken lmacken@redhat.com """
+import logging +log = logging.getLogger('moksha.hub') + from moksha.hub.hub import MokshaHub +from moksha.lib.helpers import listify
class Consumer(object): """ A message consumer """ @@ -34,6 +38,19 @@ class Consumer(object):
def __init__(self): self.hub = MokshaHub() + self.log = log + if self.hub.amqp_broker and not self.hub.stomp_broker: + for topic in listify(self.topic): + log.debug('Subscribing to consumer topic %s' % topic) + server_queue_name = 'moksha_consumer_' + self.hub.session.name + self.hub.queue_declare(queue=server_queue_name, exclusive=True) + self.hub.exchange_bind(server_queue_name, binding_key=topic) + local_queue_name = 'moksha_consumer_' + self.hub.session.name + self.hub.local_queue = self.hub.session.incoming(local_queue_name) + self.hub.message_subscribe(queue=server_queue_name, + destination=local_queue_name) + self.hub.local_queue.start() + self.hub.local_queue.listen(self.consume)
def consume(self, message): raise NotImplementedError
commit 6ec6fabf8fc4fe63e94ff49246ed6463cab5bee2 Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 22:36:44 2009 -0500
Add a moksha.lib.helpers.listify method
diff --git a/moksha/lib/helpers.py b/moksha/lib/helpers.py index 45aa72b..4995921 100644 --- a/moksha/lib/helpers.py +++ b/moksha/lib/helpers.py @@ -1039,3 +1039,7 @@ def get_num_cpus():
def deprecation(message): warnings.warn(message, DeprecationWarning) + + +def listify(something): + return not isinstance(something, list) and [something] or something
commit 5d8e0d2c30454713694fbeda7a7d1af3325e1f1d Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 22:36:32 2009 -0500
Minor module cleanup
diff --git a/moksha/hub/hub.py b/moksha/hub/hub.py index 6d5e203..138c4d3 100644 --- a/moksha/hub/hub.py +++ b/moksha/hub/hub.py @@ -18,7 +18,6 @@
from moksha.hub.reactor import reactor
-import os import sys import signal import pkg_resources @@ -26,7 +25,6 @@ import logging
from tg import config from orbited import json -from threading import Thread from paste.deploy import appconfig
from moksha.lib.helpers import trace, defaultdict, get_moksha_config_path
commit c2918583b4d118afcd81b15a4104146d6020f0e1 Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 22:35:44 2009 -0500
Move the consume_{amqp,stomp}_message methods out of the MokshaHub and into the CentralMokshaHub class.
diff --git a/moksha/hub/hub.py b/moksha/hub/hub.py index 862ed0e..6d5e203 100644 --- a/moksha/hub/hub.py +++ b/moksha/hub/hub.py @@ -96,41 +96,12 @@ class MokshaHub(StompHub, AMQPHub): This method will cause the specified `callback` to be executed with each message that goes through a given topic. """ + log.debug('watch_topic(%s)' % locals()) if len(self.topics[topic]) == 0: if self.stomp_broker: self.subscribe(topic) self.topics[topic].append(callback)
- def consume_amqp_message(self, message): - self.message_accept(message) - topic = message.headers[0]['routing_key'] - try: - body = json.decode(message.body) - except Exception, e: - log.warning('Cannot decode message from JSON: %s' % e) - body = message.body - if self.stomp_broker: - StompHub.send_message(self, topic.encode('utf8'), - message.body.encode('utf8')) - - def consume_stomp_message(self, message): - topic = message['headers'].get('destination') - if not topic: - return - - # We can enable this if/when we need it... - #try: - # body = json.decode(message['body']) - #except Exception, e: - # log.warning('Cannot decode message from JSON: %s' % e) - # body = message['body'] - - - # feed all of our consumers - for callback in self.topics.get(topic, []): - reactor.callInThread(callback, message) - - class CentralMokshaHub(MokshaHub): """ The Moksha Hub is responsible for initializing all of the Hooks, @@ -154,7 +125,7 @@ class CentralMokshaHub(MokshaHub): log.debug("Initializing local AMQP queue...") self.server_queue_name = 'moksha_hub_' + self.session.name self.queue_declare(queue=self.server_queue_name, exclusive=True) - self.exchange_bind(self.server_queue_name) + self.exchange_bind(self.server_queue_name, binding_key='#') self.local_queue_name = 'moksha_hub' self.local_queue = self.session.incoming(self.local_queue_name) self.message_subscribe(queue=self.server_queue_name, @@ -204,6 +175,35 @@ class CentralMokshaHub(MokshaHub): log.debug("Stopping data stream %s" % stream) stream.stop()
+ def consume_amqp_message(self, message): + self.message_accept(message) + topic = message.headers[0]['routing_key'] + try: + body = json.decode(message.body) + except Exception, e: + log.warning('Cannot decode message from JSON: %s' % e) + log.debug('Message: %r' % message.body) + body = message.body + if self.stomp_broker: + StompHub.send_message(self, topic.encode('utf8'), + message.body.encode('utf8')) + + def consume_stomp_message(self, message): + topic = message['headers'].get('destination') + if not topic: + return + + # We can enable this if/when we need it... + #try: + # body = json.decode(message['body']) + #except Exception, e: + # log.warning('Cannot decode message from JSON: %s' % e) + # body = message['body'] + + # feed all of our consumers + for callback in self.topics.get(topic, []): + reactor.callInThread(callback, message) +
def setup_logger(verbose): global log
commit 4593e855177f8909166a65b9395947110996bfce Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 19:29:12 2009 -0500
Fix a bug in our Qpid hub
diff --git a/moksha/hub/amqp/qpid010.py b/moksha/hub/amqp/qpid010.py index 0ed5e5d..2af134d 100644 --- a/moksha/hub/amqp/qpid010.py +++ b/moksha/hub/amqp/qpid010.py @@ -16,11 +16,9 @@ # # Authors: Luke Macken lmacken@redhat.com
-import qpid import logging
from qpid.util import connect, URL, ssl -from qpid.queue import Empty from qpid.datatypes import Message, uuid4, RangedSet from qpid.connection import Connection
@@ -66,7 +64,7 @@ class QpidAMQPHub(BaseAMQPHub): msg = Message(props, message) self.session.message_transfer(destination=exchange, message=msg)
- def subscribe_queue(server_queue_name, local_queue_name): + def subscribe_queue(self, server_queue_name, local_queue_name): queue = self.session.incoming(local_queue_name) self.session.message_subscribe(queue=server_queue_name, destination=local_queue_name)
commit a14d8edd2c000547deca66309cc950327552c2d1 Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 19:28:16 2009 -0500
More AMQPSocket/LiveWidget integration
diff --git a/moksha/api/widgets/amqp/amqp.py b/moksha/api/widgets/amqp/amqp.py index 14a8ebc..2259e92 100644 --- a/moksha/api/widgets/amqp/amqp.py +++ b/moksha/api/widgets/amqp/amqp.py @@ -74,14 +74,11 @@ class AMQPSocket(Widget): 'orbited_port', 'orbited_url', 'orbited_js', 'amqp_broker_host', 'amqp_broker_port', 'amqp_broker_user', 'amqp_broker_pass', 'send_hook', 'recieve_hook'] - onconnectedframe = amqp_subscribe('org.fedoraproject.#') + onconnectedframe = '' onmessageframe = '' send_hook = '' recieve_hook = ''
- # Popup notification bubbles on socket state changes - notify = False - engine_name = 'mako' template = u""" <script type="text/javascript"> diff --git a/moksha/api/widgets/live/live.py b/moksha/api/widgets/live/live.py index 706402c..74608e8 100644 --- a/moksha/api/widgets/live/live.py +++ b/moksha/api/widgets/live/live.py @@ -23,7 +23,7 @@ from tw.api import Widget
from moksha.exc import MokshaException from moksha.api.widgets.stomp import StompWidget, stomp_subscribe, stomp_unsubscribe -from moksha.api.widgets.amqp import amqp_subscribe, amqp_unsubscribe +from moksha.api.widgets.amqp import amqp_subscribe, amqp_unsubscribe, AMQPSocket
class LiveWidget(Widget): """ A live streaming widget. @@ -39,14 +39,11 @@ class LiveWidget(Widget): template = 'mako:myproject.templates.mylivewidget'
""" - callbacks = ['onmessage'] engine_name = 'mako'
def __init__(self, id, *args, **kw): super(LiveWidget, self).__init__(*args, **kw) - backend = tg.config.get('moksha.livesocket.backend', 'amqp').lower() - if backend == 'stomp': - self.callbacks.extend(StompWidget.callbacks) + self.backend = tg.config.get('moksha.livesocket.backend', 'amqp').lower()
def update_params(self, d): """ Register this widgets message topic callbacks """ @@ -56,7 +53,12 @@ class LiveWidget(Widget): if not topics: raise MokshaException('You must specify a `topic` to subscribe to') topics = isinstance(topics, list) and topics or [topics] - for callback in self.callbacks: + callbacks = [] + if self.backend == 'stomp': + callbacks = StompWidget.callbacks + elif self.backend == 'amqp': + callbacks = AMQPSocket.callbacks + for callback in callbacks: if callback == 'onmessageframe': for topic in topics: cb = getattr(self, 'onmessage').replace('${id}', self.id)
commit d42c0fbceac6fa85520bbc382810f2e8e9aa1bb2 Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 19:27:08 2009 -0500
Make sure we return something from the unsubscribe_method
diff --git a/moksha/api/widgets/amqp/amqp.py b/moksha/api/widgets/amqp/amqp.py index 989913c..14a8ebc 100644 --- a/moksha/api/widgets/amqp/amqp.py +++ b/moksha/api/widgets/amqp/amqp.py @@ -57,6 +57,7 @@ def amqp_unsubscribe(topic): """ Return a javascript callback that unsubscribes to a given topic, or a list of topics. """ + return "" # TODO: #sub = "stomp.unsubscribe('%s');" #if isinstance(topic, list):
commit cf7015f5cf55c71824f136dfa4d9fa0cf0ff9389 Author: Luke Macken lmacken@redhat.com Date: Wed Nov 11 14:30:59 2009 -0500
Yes, class methods take classes as a first arg
diff --git a/moksha/api/widgets/live/live.py b/moksha/api/widgets/live/live.py index ab360b9..706402c 100644 --- a/moksha/api/widgets/live/live.py +++ b/moksha/api/widgets/live/live.py @@ -80,7 +80,7 @@ class LiveWidget(Widget): return topics
@classmethod - def subscribe_topics(topics): + def subscribe_topics(cls, topics): backend = tg.config.get('moksha.livesocket.backend', 'stomp').lower() if backend == 'amqp': return amqp_subscribe(topics) @@ -92,7 +92,7 @@ class LiveWidget(Widget): "'stomp'." % backend)
@classmethod - def unsubscribe_topics(topics): + def unsubscribe_topics(cls, topics): backend = tg.config.get('moksha.livesocket.backend', 'stomp').lower() if backend == 'amqp': return amqp_unsubscribe(topics)
moksha-commits@lists.fedorahosted.org