Piotr Kliczewski has uploaded a new change for review.
Change subject: stomp: reroute messages to different process
......................................................................
stomp: reroute messages to different process
When a client subscribes it can define which messages should be rerouted
to it based on a custom stomp header. Whenever vdsm receives message for
a verb specified it sends it to that client instead of processing it.
Change-Id: I622cb7f3b39a19314b7de4c325a62fa47faeaa4d
Signed-off-by: Piotr Kliczewski <piotr.kliczewski(a)gmail.com>
---
M lib/yajsonrpc/stomp.py
M lib/yajsonrpc/stompreactor.py
M tests/stompAdapterTests.py
3 files changed, 161 insertions(+), 16 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/46/65846/1
diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py
index 558c519..99d09ac 100644
--- a/lib/yajsonrpc/stomp.py
+++ b/lib/yajsonrpc/stomp.py
@@ -563,6 +563,7 @@
self._valid = True
self._message_handler = message_handler
self._destination = destination
+ self._redirects = []
def handle_message(self, frame):
self._message_handler(self, frame)
@@ -576,6 +577,13 @@
def set_message_handler(self, handler):
self._message_handler = handler
+ def set_redirects(self, redirects):
+ self._redirects = redirects
+
+ @property
+ def redirects(self):
+ return self._redirects
+
@property
def id(self):
return self._subid
diff --git a/lib/yajsonrpc/stompreactor.py b/lib/yajsonrpc/stompreactor.py
index 42d7d01..61bb98c 100644
--- a/lib/yajsonrpc/stompreactor.py
+++ b/lib/yajsonrpc/stompreactor.py
@@ -15,7 +15,7 @@
from __future__ import absolute_import
import logging
-from collections import deque
+from collections import deque, defaultdict
from uuid import uuid4
import functools
@@ -68,6 +68,7 @@
self._outbox = deque()
self._sub_dests = sub_map
self._req_dest = req_dest
+ self._redirects = defaultdict(list)
self._sub_ids = {}
request_queues = config.get('addresses', 'request_queues')
self.request_queues = request_queues.split(",")
@@ -128,6 +129,7 @@
self.log.info("Subscribe command received")
destination = frame.headers.get("destination", None)
sub_id = frame.headers.get("id", None)
+ redirect_value = frame.headers.get("redirect", None)
if not destination or not sub_id:
self._send_error("Missing destination or subscription id header",
@@ -145,6 +147,12 @@
self._sub_dests[destination].append(subscription)
self._sub_ids[sub_id] = subscription
+
+ if redirect_value:
+ redirects = redirect_value.split(",")
+ subscription.set_redirects(redirects)
+ for redirect in redirects:
+ self._redirects[redirect].append(subscription)
def _send_error(self, msg, connection):
res = stomp.Frame(
@@ -188,17 +196,26 @@
subs = self._sub_dests[subscription.destination]
if len(subs) == 1:
del self._sub_dests[subscription.destination]
+ self._remove_redirects(subscription)
else:
if subscription in subs:
subs.remove(subscription)
+ self._remove_redirects(subscription)
+
+ def _remove_redirects(self, subscription):
+ if subscription.redirects:
+ for redirect in subscription.redirects:
+ reds = self._redirects[redirect]
+ if len(reds) == 1:
+ del self._redirects[redirect]
+ else:
+ reds.remove(subscription)
def _cmd_send(self, dispatcher, frame):
destination = frame.headers.get(stomp.Headers.DESTINATION, None)
if destination in self.request_queues:
# default subscription
- self._handle_internal(dispatcher,
- frame.headers.get(stomp.Headers.REPLY_TO),
- frame.body)
+ self._handle_internal(dispatcher, frame)
return
else:
try:
@@ -224,31 +241,42 @@
)
subscription.client.send_raw(res)
- def _handle_internal(self, dispatcher, req_dest, request):
+ def _handle_internal(self, dispatcher, frame):
"""
We need to build response dictionary which maps message id
with destination. For legacy mode we use known 3.5 destination
or for standard mode we use 'reply-to' header.
"""
+ requests = []
try:
- self._handle_destination(dispatcher, req_dest, json.loads(request))
+ req_dest = frame.headers.get(stomp.Headers.REPLY_TO)
+ requests = self._handle_destination(dispatcher, req_dest,
+ frame, json.loads(frame.body))
except Exception:
# let json server process issue
pass
- dispatcher.connection.handleMessage(request)
+ for request in requests:
+ dispatcher.connection.handleMessage(request)
- def _handle_destination(self, dispatcher, req_dest, request):
+ def _handle_destination(self, dispatcher, req_dest, frame, request):
"""
We could receive single message or batch of messages. We need
to build response map for each message.
"""
if isinstance(request, list):
- map(functools.partial(self._handle_destination, dispatcher,
- req_dest),
- request)
- return
+ results = map(functools.partial(self._handle_destination,
+ dispatcher, req_dest, frame),
+ request)
+ return results
+
+ if request.get("method") in self._redirects.keys():
+ subscriptions = self._redirects[request.get("method")]
+ for subscription in subscriptions:
+ subscription.client.send_raw(frame)
+ return []
self._req_dest[request.get("id")] = req_dest
+ return [request]
def handle_frame(self, dispatcher, frame):
try:
@@ -318,7 +346,7 @@
"""
Sends message to all subscribes that subscribed to destination.
"""
- def send(self, message, destination):
+ def send(self, message, destination='jms.topic.vdsm_responses'):
resp = json.loads(message)
response_id = resp.get("id")
diff --git a/tests/stompAdapterTests.py b/tests/stompAdapterTests.py
index 610c722..8f98391 100644
--- a/tests/stompAdapterTests.py
+++ b/tests/stompAdapterTests.py
@@ -17,7 +17,7 @@
#
# Refer to the README and COPYING files for full details of the license
#
-from collections import defaultdict
+from collections import defaultdict, deque
from testlib import VdsmTestCase as TestCaseBase
from yajsonrpc import JsonRpcRequest
@@ -36,6 +36,10 @@
def send_raw(self, msg):
self._client.queue_frame(msg)
+
+ @property
+ def queue(self):
+ return self._client._outbox
def handleMessage(self, data):
self._client.queue_frame(data)
@@ -59,6 +63,7 @@
def __init__(self, destination, id):
self._destination = destination
self._id = id
+ self._redirects = []
def set_client(self, client):
self._client = TestConnection(client)
@@ -74,6 +79,29 @@
@property
def client(self):
return self._client
+
+ def set_redirects(self, redirects):
+ self._redirects = redirects
+
+ @property
+ def redirects(self):
+ return self._redirects
+
+
+class TestClient(object):
+
+ def __init__(self):
+ self._outbox = deque()
+
+ def queue_frame(self, frame):
+ self._outbox.append(frame)
+
+ def pop_message(self):
+ return self._outbox.popleft()
+
+ @property
+ def has_outgoing_messages(self):
+ return (len(self._outbox) > 0)
class ConnectFrameTest(TestCaseBase):
@@ -255,7 +283,7 @@
data = adapter.pop_message()
self.assertIsNot(data, None)
- request = JsonRpcRequest.decode(data)
+ request = JsonRpcRequest.fromRawObject(data)
self.assertEquals(request.method, 'Host.getAllVmStats')
self.assertTrue(len(ids) == 1)
@@ -278,7 +306,7 @@
data = adapter.pop_message()
self.assertIsNot(data, None)
- request = JsonRpcRequest.decode(data)
+ request = JsonRpcRequest.fromRawObject(data)
self.assertEquals(request.method, 'Host.getAllVmStats')
self.assertTrue(len(ids) == 1)
@@ -337,3 +365,84 @@
resp_frame = adapter.pop_message()
self.assertEquals(resp_frame.command, Command.MESSAGE)
+
+
+class RedirectTests(TestCaseBase):
+
+ def test_subscription(self):
+ frame = Frame(Command.SUBSCRIBE,
+ {'ack': 'auto',
+ Headers.DESTINATION: 'jms.queue.events',
+ 'id': '1',
+ 'redirect': 'VM.create'})
+ sub_map = defaultdict(list)
+
+ adapter = StompAdapterImpl(Reactor(), sub_map, {})
+ adapter.handle_frame(TestDispatcher(adapter), frame)
+
+ sub = sub_map['jms.queue.events']
+
+ self.assertEquals(len(sub), 1)
+ subscrption = sub[0]
+ self.assertEquals(subscrption.id, '1')
+
+ redirects = subscrption.redirects
+ self.assertEquals(len(redirects), 1)
+ self.assertEquals(redirects, ['VM.create'])
+ self.assertEquals(len(adapter._redirects), 1)
+
+ def test_unsubscribe(self):
+ frame = Frame(Command.UNSUBSCRIBE,
+ {'id': '1'})
+
+ subscription = TestSubscription('jms.queue.events',
+ '1')
+ subscription.set_redirects(['VM.create'])
+ sub_map = defaultdict(list)
+ sub_map['jms.queue.events'].append(subscription)
+
+ adapter = StompAdapterImpl(Reactor(), sub_map, {})
+ adapter._sub_ids['1'] = subscription
+ adapter._redirects['VM.create'].append(subscription)
+
+ adapter.handle_frame(TestDispatcher(adapter), frame)
+
+ self.assertTrue(len(adapter._redirects) == 0)
+ self.assertTrue(len(adapter._sub_ids) == 0)
+ self.assertTrue(len(sub_map) == 0)
+
+ def test_redirect(self):
+ frame = Frame(command=Command.SEND,
+ headers={Headers.DESTINATION: 'jms.topic.vdsm_requests',
+ Headers.REPLY_TO: 'jms.topic.vdsm_responses',
+ Headers.CONTENT_LENGTH: '103'},
+
body=('{"jsonrpc":"2.0","method":"VM.create",'
+
'"params":{},"id":"e8a936a6-d886-4cfa-97b9-2d54209'
+ '053ff"}'
+ )
+ )
+
+ subscription = TestSubscription('jms.topic.vdsm_requests',
+ '1')
+ client = TestClient()
+ subscription.set_client(client)
+
+ subscription.set_redirects(['VM.create'])
+ sub_map = defaultdict(list)
+ sub_map['jms.topic.vdsm_requests'].append(subscription)
+ ids = {}
+
+ adapter = StompAdapterImpl(Reactor(), sub_map, ids)
+ adapter._sub_ids['1'] = subscription
+ adapter._redirects['VM.create'].append(subscription)
+
+ adapter.handle_frame(TestDispatcher(adapter), frame)
+
+ self.assertFalse(adapter.has_outgoing_messages)
+ self.assertTrue(len(ids) == 0)
+
+ self.assertTrue(client.has_outgoing_messages)
+ data = client.pop_message()
+ request = JsonRpcRequest.decode(data.body)
+ self.assertEquals(request.method, 'VM.create')
+ self.assertEquals(request.id, 'e8a936a6-d886-4cfa-97b9-2d54209053ff')
--
To view, visit
https://gerrit.ovirt.org/65846
To unsubscribe, visit
https://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I622cb7f3b39a19314b7de4c325a62fa47faeaa4d
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Piotr Kliczewski <piotr.kliczewski(a)gmail.com>