Change in vdsm[master]: pep8 clean vdsm/storage/securable.py
by xiawenc@linux.vnet.ibm.com
Wenchao Xia has uploaded a new change for review.
Change subject: pep8 clean vdsm/storage/securable.py
......................................................................
pep8 clean vdsm/storage/securable.py
Change-Id: Ie1c5b97b22ffc921e17e4cef2e79244d5ad190c7
Signed-off-by: wenchao xia <xiawenc@linux.vnet.ibm.com>
---
M Makefile.am
M vdsm/storage/securable.py
2 files changed, 10 insertions(+), 2 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/82/4682/1
--
To view, visit http://gerrit.ovirt.org/4682
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie1c5b97b22ffc921e17e4cef2e79244d5ad190c7
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Wenchao Xia <xiawenc@linux.vnet.ibm.com>
10 years, 9 months
Change in vdsm[master]: Qunatum POC changes
by gkotton@redhat.com
Gary Kotton has uploaded a new change for review.
Change subject: Qunatum POC changes
......................................................................
Qunatum POC changes
Change-Id: I992a0dd278daefa27e87b696c584bd7cd590c78e
Signed-off-by: Gary Kotton <gkotton@redhat.com>
---
M .gitignore
M vdsm/libvirtvm.py
A vdsm/quantum.py
3 files changed, 144 insertions(+), 29 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/43/4043/1
--
To view, visit http://gerrit.ovirt.org/4043
Gerrit-MessageType: newchange
Gerrit-Change-Id: I992a0dd278daefa27e87b696c584bd7cd590c78e
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Gary Kotton <gkotton@redhat.com>
Gerrit-Reviewer: Federico Simoncelli <fsimonce@redhat.com>
Change in vdsm[master]: Title: fix some code style consist with pep8
by baichm@linux.vnet.ibm.com
Bruce Bai has uploaded a new change for review.
Change subject: Title: fix some code style consist with pep8
......................................................................
Title: fix some code style consist with pep8
content: fix some storage code files pep8 style.
Change-Id: I50cf61d9b7815cbdd5571930e3f9be59183a83f4
Signed-off-by: Changming Bai <baichm@linux.vnet.ibm.com>
---
M Makefile.am
M vdsm/storage/__init__.py
M vdsm/storage/safelease.py
M vdsm/storage/storageConstants.py
M vdsm/storage/storage_exception.py
M vdsm/storage/sync.py
6 files changed, 373 insertions(+), 33 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/00/4500/1
--
To view, visit http://gerrit.ovirt.org/4500
Gerrit-MessageType: newchange
Gerrit-Change-Id: I50cf61d9b7815cbdd5571930e3f9be59183a83f4
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Bruce Bai <baichm@linux.vnet.ibm.com>
Change in vdsm[master]: Modify before_vm_migrate_source.py for PEP8 compliance
by qiaoliyong@gmail.com
Eli Qiao has uploaded a new change for review.
Change subject: Modify before_vm_migrate_source.py for PEP8 compliance
......................................................................
Modify before_vm_migrate_source.py for PEP8 compliance
Change-Id: I33b157093350f0c96639836175d2b313fd684984
Signed-off-by: Eli Qiao <qiaoliyong@gmail.com>
---
M vdsm_hooks/hostusb/before_vm_migrate_source.py
1 file changed, 1 insertion(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/31/4331/1
--
To view, visit http://gerrit.ovirt.org/4331
Gerrit-MessageType: newchange
Gerrit-Change-Id: I33b157093350f0c96639836175d2b313fd684984
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Eli Qiao <qiaoliyong@gmail.com>
Change in vdsm[master]: Modify before_vm_migrate_source.py for PEP8 compliance
by qiaoliyong@gmail.com
Eli Qiao has uploaded a new change for review.
Change subject: Modify before_vm_migrate_source.py for PEP8 compliance
......................................................................
Modify before_vm_migrate_source.py for PEP8 compliance
Change-Id: I0b02d00083eb0390be4708f122c94f6682c03359
Signed-off-by: Eli Qiao <qiaoliyong@gmail.com>
---
M vdsm_hooks/sriov/before_vm_migrate_source.py
1 file changed, 1 insertion(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/32/4332/1
--
To view, visit http://gerrit.ovirt.org/4332
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0b02d00083eb0390be4708f122c94f6682c03359
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Eli Qiao <qiaoliyong@gmail.com>
Change in vdsm[master]: Modify ./vdsm/__init__.py for PEP8 compliance ./vdsm/__init_...
by qiaoliyong@gmail.com
Eli Qiao has uploaded a new change for review.
Change subject: Modify ./vdsm/__init__.py for PEP8 compliance ./vdsm/__init__.py
......................................................................
Modify ./vdsm/__init__.py for PEP8 compliance ./vdsm/__init__.py
Change-Id: Ifca40b37c8a4a01d30589e33d49016975ed0f53e
Signed-off-by: Eli Qiao <qiaoliyong@gmail.com>
---
M vdsm/__init__.py
1 file changed, 1 insertion(+), 1 deletion(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/33/4333/1
--
To view, visit http://gerrit.ovirt.org/4333
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifca40b37c8a4a01d30589e33d49016975ed0f53e
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Eli Qiao <qiaoliyong@gmail.com>
Change in vdsm[master]: pep8 clean vdsm/storage/hba.py
by xiawenc@linux.vnet.ibm.com
Wenchao Xia has uploaded a new change for review.
Change subject: pep8 clean vdsm/storage/hba.py
......................................................................
pep8 clean vdsm/storage/hba.py
Change-Id: I5afe01d5cf3a6018f6d513a04b3b0e37335c5675
Signed-off-by: wenchao xia <xiawenc@linux.vnet.ibm.com>
---
M Makefile.am
M vdsm/storage/hba.py
2 files changed, 8 insertions(+), 5 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/82/4382/1
--
To view, visit http://gerrit.ovirt.org/4382
Gerrit-MessageType: newchange
Gerrit-Change-Id: I5afe01d5cf3a6018f6d513a04b3b0e37335c5675
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Wenchao Xia <xiawenc@linux.vnet.ibm.com>
Change in vdsm[master]: jsonrpc: Introduce the definitive JsonRpcClient
by smizrahi@redhat.com
Saggi Mizrahi has uploaded a new change for review.
Change subject: jsonrpc: Introduce the definitive JsonRpcClient
......................................................................
jsonrpc: Introduce the definitive JsonRpcClient
Supports events batch calls and what-not
If only I actually had tests for it
Change-Id: Ie514c9029416a5869802f065b49f2da033ff3da6
Signed-off-by: Saggi Mizrahi <smizrahi@redhat.com>
---
M tests/jsonRpcTests.py
M tests/jsonRpcUtils.py
M vdsm.spec.in
M yajsonrpc/Makefile.am
M yajsonrpc/__init__.py
M yajsonrpc/asyncoreReactor.py
D yajsonrpc/client.py
M yajsonrpc/protonReactor.py
8 files changed, 402 insertions(+), 186 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/04/12304/1
diff --git a/tests/jsonRpcTests.py b/tests/jsonRpcTests.py
index e13e24b..1697eaa 100644
--- a/tests/jsonRpcTests.py
+++ b/tests/jsonRpcTests.py
@@ -36,7 +36,8 @@
from yajsonrpc import \
JsonRpcError, \
JsonRpcMethodNotFoundError, \
- JsonRpcInternalError
+ JsonRpcInternalError, \
+ JsonRpcRequest
CALL_TIMEOUT = 5
@@ -60,6 +61,7 @@
while True:
try:
client, msg = self._queue.get()
+ self.log.info("Echoing message A")
if client is None:
return
@@ -67,6 +69,22 @@
client.send(msg)
except Exception:
self.log.error("EchoServer died unexpectedly", exc_info=True)
+
+
+class ReactorClientSyncWrapper(object):
+ def __init__(self, client):
+ self._client = client
+ self._queue = Queue()
+ self._client.setInbox(self._queue)
+
+ def send(self, data):
+ self._client.send(data)
+
+ def connect(self):
+ self._client.connect()
+
+ def recv(self, timeout=None):
+ return self._queue.get(True, timeout)[1]
@expandPermutations
@@ -88,7 +106,7 @@
self.fail("Reactor died: (%s) %s" % (type(e), e))
with constructReactor(reactorType) as \
- (reactor, clientFactory, laddr):
+ (reactor, clientReactor, laddr):
t = threading.Thread(target=echosrv.serve)
t.setDaemon(True)
@@ -96,14 +114,15 @@
reactor.createListener(laddr, echosrv.accept)
- clientNum = 4
- repeats = 2
- subRepeats = 4
+ clientNum = 2
+ repeats = 10
+ subRepeats = 10
clients = []
try:
for i in range(clientNum):
- c = clientFactory()
+ c = ReactorClientSyncWrapper(
+ clientReactor.createClient(laddr))
c.connect()
clients.append(c)
@@ -111,19 +130,21 @@
for client in clients:
for i in range(subRepeats):
self.log.info("Sending message...")
- client.send(data, CALL_TIMEOUT)
+ client.send(data)
for i in range(repeats * subRepeats):
for client in clients:
- self.log.info("Waiting for reply...")
- retData = client.recv(CALL_TIMEOUT)
- self.log.info("Asserting reply...")
- self.assertEquals(
- retData, data,
- "Data is not as expected " +
- "'%s...%s' != '%s...%s'" %
- (retData[:10], retData[-10:],
- data[:10], data[-10:]))
+ self.log.info("Waiting for reply...")
+ retData = client.recv(CALL_TIMEOUT)
+ self.log.info("Asserting reply...")
+ self.assertTrue(isinstance(retData,
+ (str, unicode)))
+ self.assertEquals(
+ retData, data,
+ "Data is not as expected " +
+ "'%s...%s' != '%s...%s'" %
+ (retData[:10], retData[-10:],
+ data[:10], data[-10:]))
finally:
queue.put((None, None))
@@ -138,6 +159,17 @@
@expandPermutations
class JsonRpcServerTests(TestCaseBase):
+
+ def _callTimeout(self, client, methodName, params=None, rid=None,
+ timeout=None):
+ call = client.call_async(JsonRpcRequest(methodName, params, rid))
+ self.assertTrue(call.wait(timeout))
+ resp = call.responses[0]
+ if resp.error is not None:
+ raise JsonRpcError(resp.error['code'], resp.error['message'])
+
+ return resp.result
+
@permutations(REACTOR_TYPE_PERMUTATIONS)
def testMethodCallArgList(self, reactorType):
data = dummyTextGenerator(1024)
@@ -147,9 +179,8 @@
client = clientFactory()
client.connect()
with closing(client):
- self.assertEquals(client.callMethod("echo", (data,), 10,
- CALL_TIMEOUT),
- data)
+ self.assertEquals(self._callTimeout(client, "echo", (data,),
+ 10, CALL_TIMEOUT), data)
@permutations(REACTOR_TYPE_PERMUTATIONS)
def testMethodCallArgDict(self, reactorType):
@@ -160,7 +191,7 @@
client = clientFactory()
client.connect()
with closing(client):
- self.assertEquals(client.callMethod("echo",
+ self.assertEquals(self._callTimeout(client, "echo",
{'text': data},
10, CALL_TIMEOUT),
data)
@@ -173,7 +204,7 @@
client.connect()
with closing(client):
with self.assertRaises(JsonRpcError) as cm:
- client.callMethod("I.DO.NOT.EXIST :(", [], 10,
+ self._callTimeout(client, "I.DO.NOT.EXIST :(", [], 10,
CALL_TIMEOUT)
self.assertEquals(cm.exception.code,
@@ -189,7 +220,8 @@
client.connect()
with closing(client):
with self.assertRaises(JsonRpcError) as cm:
- client.callMethod("echo", [], 10, timeout=CALL_TIMEOUT)
+ self._callTimeout(client, "echo", [], 10,
+ timeout=CALL_TIMEOUT)
self.assertEquals(cm.exception.code,
JsonRpcInternalError().code)
@@ -201,6 +233,7 @@
client = clientFactory()
client.connect()
with closing(client):
- res = client.callMethod("ping", [], 10, timeout=CALL_TIMEOUT)
+ res = self._callTimeout(client, "ping", [], 10,
+ timeout=CALL_TIMEOUT)
self.assertEquals(res, None)
diff --git a/tests/jsonRpcUtils.py b/tests/jsonRpcUtils.py
index 99e63d8..43b2ffd 100644
--- a/tests/jsonRpcUtils.py
+++ b/tests/jsonRpcUtils.py
@@ -8,9 +8,8 @@
from yajsonrpc import \
JsonRpcServer, \
- asyncoreReactor
-from yajsonrpc.client import \
- JsonRpcClient
+ asyncoreReactor, \
+ JsonRpcClientPool
protonReactor = None
try:
@@ -36,21 +35,8 @@
def _tcpServerConstructor():
port = getFreePort()
address = ("localhost", port)
- reactor = asyncoreReactor.AsyncoreReactor()
- clientsReactor = asyncoreReactor.AsyncoreReactor()
-
- t = threading.Thread(target=clientsReactor.process_requests)
- t.setDaemon(True)
- t.start()
-
- def clientFactory(address):
- return TestClientWrapper(clientsReactor.createClient(address))
-
- try:
- yield reactor, partial(clientFactory, address), address
- finally:
- reactor.stop()
- clientsReactor.stop()
+ reactorType = asyncoreReactor.AsyncoreReactor
+ yield reactorType, address
@contextmanager
@@ -59,59 +45,43 @@
raise SkipTest("qpid-proton python bindings are not installed")
port = getFreePort()
- reactor = protonReactor.ProtonReactor()
+ reactorType = protonReactor.ProtonReactor
- def clientFactory(address):
- return TestClientWrapper(reactor.createClient(address))
-
- try:
- yield (reactor,
- partial(clientFactory, ("127.0.0.1", port)),
- ("127.0.0.1", port))
- finally:
- reactor.stop()
+ yield (reactorType,
+ ("127.0.0.1", port))
REACTOR_CONSTRUCTORS = {"tcp": _tcpServerConstructor,
- "proton": _protonServerConstructor}
+ "amqp": _protonServerConstructor}
REACTOR_TYPE_PERMUTATIONS = [[r] for r in REACTOR_CONSTRUCTORS.iterkeys()]
@contextmanager
def constructReactor(tp):
- with REACTOR_CONSTRUCTORS[tp]() as (serverReactor, clientFactory, laddr):
+ with REACTOR_CONSTRUCTORS[tp]() as (reactorType, laddr):
+ serverReactor = reactorType()
t = threading.Thread(target=serverReactor.process_requests)
t.setDaemon(True)
t.start()
- yield serverReactor, clientFactory, laddr
+ clientReactor = reactorType()
+ t = threading.Thread(target=clientReactor.process_requests)
+ t.setDaemon(True)
+ t.start()
-
-class TestClientWrapper(object):
- def __init__(self, client):
- self._client = client
- self._queue = Queue()
- self._client.setInbox(self._queue)
-
- def send(self, data, timeout=None):
- self._client.send(data)
-
- def recv(self, timeout=None):
- return self._queue.get(timeout=timeout)[1]
-
- def connect(self):
- return self._client.connect()
-
- def close(self):
- return self._client.close()
+ try:
+ yield serverReactor, clientReactor, laddr
+ finally:
+ clientReactor.stop()
+ serverReactor.stop()
@contextmanager
def constructServer(tp, bridge):
queue = Queue()
server = JsonRpcServer(bridge, queue)
- with constructReactor(tp) as (reactor, clientFactory, laddr):
+ with constructReactor(tp) as (reactor, clientReactor, laddr):
def _accept(listener, client):
client.setInbox(queue)
@@ -121,10 +91,15 @@
t.setDaemon(True)
t.start()
- def jsonClientFactory():
- return JsonRpcClient(clientFactory())
+ cpool = JsonRpcClientPool({tp: clientReactor})
+ t = threading.Thread(target=cpool.serve)
+ t.setDaemon(True)
+ t.start()
+
+ curl = "%s://%s:%d" % (tp, laddr[0], laddr[1])
+ clientFactory = partial(cpool.createClient, curl)
try:
- yield server, jsonClientFactory
+ yield server, clientFactory
finally:
server.stop()
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 38838ea..c3b0296 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -988,7 +988,6 @@
%{python_sitearch}/yajsonrpc/__init__.py*
%{python_sitearch}/yajsonrpc/asyncoreReactor.py*
%{python_sitearch}/yajsonrpc/protonReactor.py*
-%{python_sitearch}/yajsonrpc/client.py*
%files bootstrap
%defattr(-, root, root, -)
diff --git a/yajsonrpc/Makefile.am b/yajsonrpc/Makefile.am
index e43eace..422abe9 100644
--- a/yajsonrpc/Makefile.am
+++ b/yajsonrpc/Makefile.am
@@ -21,7 +21,6 @@
dist_yajsonrpc_PYTHON = \
__init__.py \
asyncoreReactor.py \
- client.py \
protonReactor.py \
$(NULL)
diff --git a/yajsonrpc/__init__.py b/yajsonrpc/__init__.py
index 42e0704..65316dd 100644
--- a/yajsonrpc/__init__.py
+++ b/yajsonrpc/__init__.py
@@ -15,6 +15,9 @@
import json
import logging
from functools import partial
+from Queue import Queue
+from weakref import ref
+from threading import Lock, Event
__all__ = ["tcpReactor"]
@@ -88,8 +91,6 @@
raise JsonRpcInvalidRequestError()
reqId = obj.get("id")
- if not isinstance(reqId, int):
- raise JsonRpcInvalidRequestError()
params = obj.get('params', [])
if not isinstance(params, (list, dict)):
@@ -140,6 +141,58 @@
reqId = obj.get('id')
return JsonRpcResponse(result, error, reqId)
+ @staticmethod
+ def fromRawObject(obj):
+ print obj
+ if obj.get("jsonrpc") != "2.0":
+ raise JsonRpcInvalidRequestError()
+
+ if "result" not in obj and "error" not in obj:
+ raise JsonRpcInvalidRequestError()
+
+ result = obj.get("result")
+ error = obj.get("error")
+
+ reqId = obj.get("id")
+ if not isinstance(reqId, int):
+ raise JsonRpcInvalidRequestError()
+
+ return JsonRpcResponse(result, error, reqId)
+
+
+class _JsonRpcClientRequestContext(object):
+ def __init__(self, requests, callback):
+ self.callback = callback
+ self._requests = requests
+
+ self._responses = {}
+ for req in requests:
+ if req.id is None:
+ continue # Notifications don't have responses
+
+ self._responses[req.id] = None
+
+ def addResponse(self, resp):
+ self._responses[resp.id] = resp
+
+ def isDone(self):
+ for v in self._responses.values():
+ if v is None:
+ return False
+
+ return True
+
+ def getResponses(self):
+ return self._responses.values()
+
+ def ids(self):
+ return self._responses.keys()
+
+ def encode(self):
+ return ("[" +
+ ", ".join(r.encode() for r in self._requests) +
+ "]")
+
class _JsonRpcServeRequestContext(object):
def __init__(self, client):
@@ -190,6 +243,191 @@
self.sendReply()
+class JsonRpcClientPool(object):
+ log = logging.getLogger("JsonRpcClientPool")
+
+ def __init__(self, reactors):
+ self._reactors = reactors
+ self._inbox = Queue()
+ self._clients = {}
+ self._eventcbs = []
+
+ def createClient(self, address):
+ rtype, address = address.split("://")
+ host, port = address.split(":")
+ port = int(port)
+ transport = self._reactors[rtype].createClient((host, port))
+ transport.setInbox(self._inbox)
+ client = JsonRpcClient(transport)
+ self._clients[transport] = client
+ return client
+
+ def registerEventCallback(self, eventcb):
+ self._eventcbs.append(ref(eventcb))
+
+ def unregisterEventCallback(self, eventcb):
+ for r in self._eventcbs[:]:
+ cb = r()
+ if cb is None or cb == eventcb:
+ try:
+ self._eventcbs.remove(r)
+ except ValueError:
+ pass
+
+ def emit(self, client, event, params):
+ for r in self._eventcbs[:]:
+ cb = r()
+ if cb is None:
+ continue
+
+ cb(client, event, params)
+
+ def _processEvent(self, client, obj):
+ if isinstance(obj, list):
+ map(self._processEvent, obj)
+ return
+
+ req = JsonRpcRequest.fromRawObject(obj)
+ if not req.isNotification():
+ self.log.warning("Recieved non notification, ignoring")
+
+ self.emit(client, req.method, req.params)
+
+ def serve(self):
+ while True:
+ data = self._inbox.get()
+ if data is None:
+ return
+
+ transport, message = data
+ client = self._clients[transport]
+ try:
+ mobj = json.loads(message)
+ isResponse = self._isResponse(mobj)
+ except:
+ self.log.warning("Problem parsing message from client")
+ transport.close()
+ del self._clients[transport]
+ continue
+
+ if isResponse:
+ client._processIncomingResponse(mobj)
+ else:
+ self._processEvent(client, mobj)
+
+ def _isResponse(self, obj):
+ if isinstance(obj, list):
+ v = None
+ for res in map(self._isResponse, obj):
+ if v is None:
+ v = res
+
+ if v != res:
+ raise TypeError("batch is mixed")
+
+ return v
+ else:
+ return ("result" in obj or "error" in obj)
+
+ def close(self):
+ self._inbox.put(None)
+
+
+class JsonRpcCall(object):
+ def __init__(self):
+ self._ev = Event()
+ self.responses = None
+
+ def _callback(self, c, resp):
+ if not isinstance(resp, list):
+ resp = [resp]
+
+ self.responses = resp
+ self._ev.set()
+
+ def wait(self, timeout=None):
+ self._ev.wait(timeout)
+ return self._ev.is_set()
+
+ def isSet(self):
+ return self._ev.is_set()
+
+
+class JsonRpcClient(object):
+ log = logging.getLogger("jsonrpc.JsonRpcClient")
+
+ def __init__(self, transport):
+ self._transport = transport
+ self._runningRequests = {}
+ self._lock = Lock()
+
+ def connect(self):
+ self._transport.connect()
+
+ def callMethod(self, methodName, params=[], rid=None):
+ return self.call(JsonRpcRequest(methodName, params, rid))
+
+ def call(self, req):
+ resp = self.call_batch([req])[0]
+ if "error" in resp:
+ raise JsonRpcError(resp.error['code'], resp.error['message'])
+
+ return resp.result
+
+ def call_batch(self, *reqs):
+ call = self.call_async(reqs)
+ call.wait()
+ return call.responses
+
+ def call_async(self, *reqs):
+ call = JsonRpcCall()
+ self.call_cb(call._callback, *reqs)
+ return call
+
+ def call_cb(self, cb, *reqs):
+ ctx = _JsonRpcClientRequestContext(reqs, cb)
+ with self._lock:
+ for rid in ctx.ids():
+ try:
+ self._runningRequests[rid]
+ except KeyError:
+ pass
+ else:
+ raise ValueError("Request id already in use %s", rid)
+
+ self._runningRequests[rid] = ctx
+ self._transport.send(ctx.encode())
+
+ # All notifications
+ if ctx.isDone():
+ self._finalizeCtx(ctx)
+
+ def _finalizeCtx(self, ctx):
+ with self._lock:
+ if not ctx.isDone():
+ print ctx._responses
+ return
+
+ cb = ctx.callback
+ if cb is not None:
+ cb(self, ctx.getResponses())
+
+ def _processIncomingResponse(self, resp):
+ if isinstance(resp, list):
+ map(self._processIncomingResponse, resp)
+ return
+
+ resp = JsonRpcResponse.fromRawObject(resp)
+ with self._lock:
+ ctx = self._runningRequests.pop(resp.id)
+ ctx.addResponse(resp)
+
+ self._finalizeCtx(ctx)
+
+ def close(self):
+ self._transport.close()
+
+
class JsonRpcServer(object):
log = logging.getLogger("jsonrpc.JsonRpcServer")
diff --git a/yajsonrpc/asyncoreReactor.py b/yajsonrpc/asyncoreReactor.py
index 1ef6264..8c7fa5a 100644
--- a/yajsonrpc/asyncoreReactor.py
+++ b/yajsonrpc/asyncoreReactor.py
@@ -107,6 +107,15 @@
def close(self):
self._client.close()
+ def __eq__(self, other):
+ if type(self) != type(other):
+ return False
+
+ return hash(self) == hash(other)
+
+ def __hash__(self):
+ return hash(type(self)) ^ hash(self._client)
+
# FIXME: We should go about making a listener wrapper like the client wrapper
# This is not as high priority as users don't interact with listeners
diff --git a/yajsonrpc/client.py b/yajsonrpc/client.py
deleted file mode 100644
index b7d2786..0000000
--- a/yajsonrpc/client.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import json
-from yajsonrpc import \
- JsonRpcError, \
- asyncoreReactor
-
-_Size = asyncoreReactor._Size
-
-proton = None
-try:
- import proton
- from yajsonrpc import protonReactor
- proton # Squash pyflakes error for
- protonReactor # unused import
-except ImportError:
- pass
-
-
-class JsonRpcClient(object):
- def __init__(self, reactorClient):
- self._transport = reactorClient
-
- def connect(self):
- self._transport.connect()
-
- def callMethod(self, methodName, params=(), reqId=None, timeout=None):
- msg = {'jsonrpc': '2.0',
- 'method': methodName,
- 'params': params,
- 'id': reqId}
-
- self._transport.send(json.dumps(msg, 'utf-8'), timeout=timeout)
- # Notifications have no repsonse
- if reqId is None:
- return
-
- resp = self._transport.recv(timeout=timeout)
- resp = json.loads(resp)
- if resp.get('error') is not None:
- raise JsonRpcError(resp['error']['code'],
- resp['error']['message'])
-
- return resp.get('result')
-
- def close(self):
- self._transport.close()
diff --git a/yajsonrpc/protonReactor.py b/yajsonrpc/protonReactor.py
index 2796e24..5cf845f 100644
--- a/yajsonrpc/protonReactor.py
+++ b/yajsonrpc/protonReactor.py
@@ -27,6 +27,8 @@
SERVER_AUTH = 2
CLIENT_AUTH = 3
+MBUFF_SIZE = 10
+
# Used for reactor coroutines
class Return(object):
@@ -73,7 +75,9 @@
def _openClientSession(self):
host, port = self._address
amqpAddress = "ampq://%s:%d/vdsm" % (host, port)
- senderName = "Client (%s)" % (amqpAddress,)
+ senderName = "jsonrpc.ProtonClient %s (%s)" % (str(uuid.uuid4()),
+ amqpAddress,)
+ self.log = logging.getLogger(senderName)
self.connector = proton.pn_connector(self._reactor._driver,
host, str(port), None)
@@ -116,12 +120,17 @@
def _pushIncomingMessage(self, msg):
try:
self._inbox.put_nowait((self, msg))
+ self.log.debug("Message queued on inbox (%d)", id(self._inbox))
except AttributeError:
# Inbox not set
+ self.log.warn("Message missed since inbox was not set for "
+ "this client")
pass
def _popPendingMessage(self):
- return self._outbox.get_nowait()
+ msg = self._outbox.get_nowait()
+ self.log.debug("Pulling message from outbox")
+ return msg
def setInbox(self, queue):
self._inbox = queue
@@ -140,6 +149,7 @@
log = logging.getLogger("jsonrpc.ProtonReactor")
def __init__(self, deliveryTimeout=5):
+ self.log = logging.getLogger("jsonrpc.ProtonReactor (%d)" % id(self))
self._isRunning = False
self._coroutines = []
@@ -148,6 +158,7 @@
self._deliveryTimeout = deliveryTimeout
self._commandQueue = Queue()
self._listeners = {}
+ self._wakeEv = Event()
def _activate(self, connector, cond):
self._scheduleOp(False, proton.pn_connector_activate,
@@ -166,7 +177,6 @@
return int(timeout * 1000)
def _waitDriverEvent(self, timeout=None):
- self.log.debug("Waiting for events")
timeout = self._convertTimeout(timeout)
proton.pn_driver_wait(self._driver, timeout)
@@ -209,10 +219,8 @@
self.log.debug("Authentication-PENDING")
def _processConnectors(self):
- connector = proton.pn_driver_connector(self._driver)
+ connector = proton.pn_connector_head(self._driver)
while connector:
- self.log.debug("Process Connector")
-
# releaes any connector that has been closed
if proton.pn_connector_closed(connector):
self.log.debug("Closing connector")
@@ -233,7 +241,7 @@
proton.pn_connector_process(connector)
- connector = proton.pn_driver_connector(self._driver)
+ connector = proton.pn_connector_next(connector)
def _initConnection(self, conn):
if proton.pn_connection_state(conn) & proton.PN_LOCAL_UNINIT:
@@ -241,8 +249,7 @@
proton.pn_connection_open(conn)
def createClient(self, address):
- client = ProtonClient(self, None, None, None, address)
- return client
+ return ProtonClient(self, None, None, None, address)
def _openPendingSessions(self, conn, connector):
ssn = proton.pn_session_head(conn, proton.PN_LOCAL_UNINIT)
@@ -260,7 +267,6 @@
def _openLinks(self, conn):
link = proton.pn_link_head(conn, proton.PN_LOCAL_UNINIT)
while link:
- self.log.debug("Opening Link")
proton.pn_terminus_copy(proton.pn_link_source(link),
proton.pn_link_remote_source(link))
proton.pn_terminus_copy(proton.pn_link_target(link),
@@ -278,17 +284,14 @@
elif proton.pn_link_is_receiver(link):
self.log.debug("Opening Link to recv messages")
- proton.pn_link_flow(link, 1)
proton.pn_link_open(link)
+ proton.pn_link_flow(link, MBUFF_SIZE)
link = proton.pn_link_next(link, proton.PN_LOCAL_UNINIT)
def _processDeliveries(self, conn, connector):
delivery = proton.pn_work_head(conn)
while delivery:
- self.log.debug("Process delivery %s" %
- proton.pn_delivery_tag(delivery))
-
if proton.pn_delivery_readable(delivery):
self._processIncoming(delivery, connector)
elif proton.pn_delivery_writable(delivery):
@@ -297,32 +300,46 @@
delivery = proton.pn_work_next(delivery)
def _cleanDeliveries(self, conn):
- link = proton.pn_link_head(conn, (proton.PN_LOCAL_ACTIVE))
- while link:
+ def link_iter(conn):
+ link = proton.pn_link_head(conn, (proton.PN_LOCAL_ACTIVE))
+ while link:
+ yield link
+ link = proton.pn_link_next(link, (proton.PN_LOCAL_ACTIVE))
+
+ def delivery_iter(link):
d = proton.pn_unsettled_head(link)
while d:
- _next = proton.pn_unsettled_next(d)
+ yield d
+ d = proton.pn_unsettled_next(d)
+
+ for link in link_iter(conn):
+ for d in delivery_iter(link):
+ ctx = proton.pn_delivery_get_context(d)
+ if isinstance(ctx, str):
+ continue
+
disp = proton.pn_delivery_remote_state(d)
- age = time.time() - proton.pn_delivery_get_context(d)
- self.log.debug("Checking delivery")
+ age = time.time() - ctx
+ self.log.debug("Checking delivery (%s)",
+ proton.pn_delivery_tag(d))
+
if disp and disp != proton.PN_ACCEPTED:
self.log.warn("Message was not accepted by remote end")
if disp and proton.pn_delivery_settled(d):
self.log.debug("Message settled by remote end")
proton.pn_delivery_settle(d)
+ proton.pn_delivery_clear(d)
elif age > self._deliveryTimeout:
self.log.warn("Delivary not settled by remote host")
proton.pn_delivery_settle(d)
+ proton.pn_delivery_clear(d)
elif proton.pn_link_state(link) & proton.PN_REMOTE_CLOSED:
self.log.warn("Link closed before settling message")
proton.pn_delivery_settle(d)
-
- d = _next
-
- link = proton.pn_link_next(link, (proton.PN_LOCAL_ACTIVE))
+ proton.pn_delivery_clear(d)
def _cleanLinks(self, conn):
link = proton.pn_link_head(conn, (proton.PN_LOCAL_ACTIVE |
@@ -370,26 +387,22 @@
ctx.sender = sender
continue
- if proton.pn_link_credit(sender) == 0:
- self.log.debug("Not enough credit, waiting")
- continue
+ while proton.pn_link_credit(sender) > 0:
+ try:
+ data = ctx._popPendingMessage()
+ except Empty:
+ break
+ else:
+ msg = proton.Message()
+ msg.body = data
+ d = proton.pn_delivery(sender,
+ "delivery-%s" % str(uuid.uuid4()))
- try:
- data = ctx._popPendingMessage()
- except Empty:
- continue
- else:
- msg = proton.Message()
- msg.body = data
- self.log.debug("Creating delivery")
- proton.pn_link_set_context(sender, msg.encode())
-
- proton.pn_delivery(sender,
- "response-delivery-%s" % str(uuid.uuid4()))
- self.log.debug("Queued what I could")
+ proton.pn_delivery_set_context(d, msg.encode())
+ self.log.debug("Queueing delivery (%s)",
+ proton.pn_delivery_tag(d))
def _serviceConnector(self, connector):
- self.log.debug("Service Connector")
conn = proton.pn_connector_connection(connector)
self._initConnection(conn)
@@ -410,42 +423,37 @@
link = proton.pn_delivery_link(delivery)
ssn = proton.pn_link_session(link)
msg = []
- rc, buff = proton.pn_link_recv(link, 1024)
- while rc >= 0:
- msg.append(buff)
+ self.log.debug("Receiving '%s'", proton.pn_delivery_tag(delivery))
+ while True:
rc, buff = proton.pn_link_recv(link, 1024)
+ msg.append(buff)
+ if rc == proton.PN_EOS:
+ break
msg = ''.join(msg)
+ self.log.debug("Received '%s'", proton.pn_delivery_tag(delivery))
+ proton.pn_link_advance(link)
proton.pn_delivery_update(delivery, proton.PN_ACCEPTED)
+ proton.pn_delivery_settle(delivery)
+
msgObj = proton.Message()
msgObj.decode(msg)
ctx = proton.pn_session_get_context(ssn)
ctx._pushIncomingMessage(msgObj.body)
- proton.pn_delivery_settle(delivery)
- proton.pn_link_advance(link)
-
# if more credit is needed, grant it
if proton.pn_link_credit(link) == 0:
- proton.pn_link_flow(link, 1)
+ proton.pn_link_flow(link, MBUFF_SIZE)
def _processOutgoing(self, delivery):
link = proton.pn_delivery_link(delivery)
- msg = proton.pn_link_get_context(link)
- sent = proton.pn_link_send(link, msg)
- if sent < 0:
- self.log.warn("Problem sending message")
- else:
- msg = msg[sent:]
- if len(msg) != 0:
- self.log.debug("Delivery partial")
- proton.pn_link_set_context(link, msg)
- else:
- self.log.debug("Delivery finished")
- proton.pn_link_set_context(link, "")
- proton.pn_delivery_set_context(delivery, time.time())
- proton.pn_link_advance(link)
+ msg = proton.pn_delivery_get_context(delivery)
+ proton.pn_link_send(link, msg)
+ if proton.pn_link_advance(link):
+ self.log.debug("Delivery finished (%s)",
+ proton.pn_delivery_tag(delivery))
+ proton.pn_delivery_set_context(delivery, time.time())
def createListener(self, address, acceptHandler):
host, port = address
--
To view, visit http://gerrit.ovirt.org/12304
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie514c9029416a5869802f065b49f2da033ff3da6
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <smizrahi@redhat.com>
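[Editor's note: the batching bookkeeping that the JsonRpcClient patch above introduces (the `_JsonRpcClientRequestContext` / `JsonRpcCall` pair) can be sketched standalone. This is a simplified reconstruction inferred from the diff, not the merged API: class and method names below are made up for illustration. The idea is that a call tracks one pending slot per request id and only unblocks `wait()` once every slot has been filled.]

```python
from threading import Event


class BatchCall(object):
    """Tracks a batch of JSON-RPC request ids; wait() unblocks only
    once a response has arrived for every id in the batch. A sketch
    of the context/call pair from the patch, not the real yajsonrpc
    API."""

    def __init__(self, request_ids):
        # None marks a response we are still waiting for.
        self._responses = dict((rid, None) for rid in request_ids)
        self._ev = Event()

    def add_response(self, rid, result):
        self._responses[rid] = result
        if all(v is not None for v in self._responses.values()):
            self._ev.set()  # batch complete, wake any waiter

    def wait(self, timeout=None):
        self._ev.wait(timeout)
        return self._ev.is_set()

    def responses(self):
        return sorted(self._responses.items())


call = BatchCall([1, 2])
call.add_response(1, "pong")
assert not call.wait(0)      # request 2 still outstanding
call.add_response(2, "pong")
assert call.wait(0)          # both responses in, batch is done
```

Notifications (requests without an id) never get a slot, which is why the patch's `isDone()` is immediately true for an all-notification batch.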
Change in vdsm[master]: java_bindings: Add function overloads
by smizrahi@redhat.com
Saggi Mizrahi has uploaded a new change for review.
Change subject: java_bindings: Add function overloads
......................................................................
java_bindings: Add function overloads
Change-Id: I40887a5fd2e3901efdcc9633c5858ae6b2445291
Signed-off-by: Saggi Mizrahi <smizrahi(a)redhat.com>
---
M client/java/vdsm-api/generate.py
M client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/jsonrpc/JsonRpcClient.java
2 files changed, 22 insertions(+), 16 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/23/14823/1
diff --git a/client/java/vdsm-api/generate.py b/client/java/vdsm-api/generate.py
index fb03b60..ff9be96 100644
--- a/client/java/vdsm-api/generate.py
+++ b/client/java/vdsm-api/generate.py
@@ -230,9 +230,11 @@
#for $nobj in $client.nestedObjects
public class $nobj.name {
#for $command in $nobj.commands
+ #set $params = $command.parameters
+ #while True
#set $rtype = $command.returnType.genericTypeName
public Future<VDSMResponse<${rtype}>> ${command.name} (
- #for $param in $command.parameters
+ #for $param in $params
${param.type.typeName} ${param.name},
#end for
String id
@@ -243,13 +245,18 @@
builder.id(id);
- #for $param in $command.parameters
+ #for $param in $params
builder.addParameter("${param.realName}", ${param.name});
#end for
return _call(builder,
new ${command.returnType.genericTypeAdapter.typeName}());
}
+ #if len($params) == 0 or (not $params[-1].isOptional)
+ #break
+ #end if
+ #set $params = $params[:-1]
+ #end while
#end for
}
@@ -330,9 +337,6 @@
def resolveType(typeName, typeCache):
if isinstance(typeName, list):
return ArrayDef(typeName[0], typeCache)
-
- if typeName.startswith("*"):
- typeName = typeName[1:]
typedef = typeCache[typeName]
@@ -429,7 +433,9 @@
class CommandParameterDef(object):
def __init__(self, name, typeName, typeCache):
+ self.isOptional = False
if name.startswith("*"):
+ self.isOptional = True
name = name[1:]
self.realName = name
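For readers following the template change above: the new `#while` loop peels trailing optional parameters (those whose schema name starts with `"*"`) off one at a time, emitting one Java overload per remaining parameter list. A hedged Python sketch of that trimming logic (the names here are illustrative, not taken from generate.py):

```python
class Param(object):
    """Stand-in for CommandParameterDef: just a name and an optional flag."""
    def __init__(self, name, optional=False):
        self.name = name
        self.isOptional = optional


def overload_signatures(params):
    """Yield successively shorter parameter lists, mirroring the
    template's #while loop: stop once the last parameter is mandatory
    (or the list is empty)."""
    while True:
        yield [p.name for p in params]
        if len(params) == 0 or not params[-1].isOptional:
            break
        params = params[:-1]  # drop the trailing optional parameter


params = [Param("spUUID"),
          Param("force", optional=True),
          Param("timeout", optional=True)]
print(list(overload_signatures(params)))
# -> [['spUUID', 'force', 'timeout'], ['spUUID', 'force'], ['spUUID']]
```

Each yielded list corresponds to one generated Java method with the same name, which is how callers get to omit trailing optional arguments.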
diff --git a/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/jsonrpc/JsonRpcClient.java b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/jsonrpc/JsonRpcClient.java
index 0d649af..818fb98 100644
--- a/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/jsonrpc/JsonRpcClient.java
+++ b/client/java/vdsm-json-rpc/src/main/java/org/ovirt/vdsm/jsonrpc/JsonRpcClient.java
@@ -48,7 +48,7 @@
}
i++;
}
- _responses = new ArrayList<JsonRpcResponse>(i);
+ _responses = new ArrayList<>(i);
if (i == 0) {
_latch = new CountDownLatch(1);
_latch.countDown();
@@ -153,16 +153,16 @@
public JsonRpcClient(ReactorClient client) {
_client = client;
_jfactory = new ObjectMapper().getJsonFactory();
- _runningCalls = new HashMap<JsonNode, JsonRpcCall>();
+ _runningCalls = new HashMap<>();
}
public Future<JsonRpcResponse> call(JsonRpcRequest req) {
final JsonNode json = req.toJson();
final ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
- final JsonGenerator gen = _jfactory.createJsonGenerator(os);
- gen.writeTree(json);
- gen.close();
+ try (JsonGenerator gen = _jfactory.createJsonGenerator(os)) {
+ gen.writeTree(json);
+ }
} catch (IOException e) {
// Never happens
}
@@ -183,13 +183,13 @@
public Future<List<JsonRpcResponse>> batchCall(List<JsonRpcRequest> requests) {
final ByteArrayOutputStream os = new ByteArrayOutputStream();
try {
- final JsonGenerator gen = _jfactory.createJsonGenerator(os);
- gen.writeStartArray();
- for (final JsonRpcRequest request : requests) {
- gen.writeTree(request.toJson());
+ try (JsonGenerator gen = _jfactory.createJsonGenerator(os)) {
+ gen.writeStartArray();
+ for (final JsonRpcRequest request : requests) {
+ gen.writeTree(request.toJson());
+ }
+ gen.writeEndArray();
}
- gen.writeEndArray();
- gen.close();
} catch (IOException e) {
// Never happens
}
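The Java change above replaces a manual `gen.close()` with try-with-resources, so the generator is closed even when a write throws mid-batch. Python, vdsm's own language, expresses the same guarantee with a context manager; a minimal sketch using an assumed `JsonWriter` stand-in (not a real vdsm or Jackson class):

```python
import io
import json


class JsonWriter(object):
    """Minimal stand-in for JsonGenerator: leaving the `with` block
    closes it, even if writing raised an exception."""
    def __init__(self, stream):
        self._stream = stream
        self.closed = False

    def write_tree(self, obj):
        self._stream.write(json.dumps(obj))

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.closed = True  # runs on normal exit and on exceptions alike
        return False        # never swallow the exception


buf = io.StringIO()
with JsonWriter(buf) as gen:
    gen.write_tree({"jsonrpc": "2.0", "method": "Host.ping", "id": "1"})
print(buf.getvalue())
# -> {"jsonrpc": "2.0", "method": "Host.ping", "id": "1"}
```

The design point is the same in both languages: tying cleanup to scope exit removes the failure mode where an exception between `createJsonGenerator` and `close` leaks the resource.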
--
To view, visit http://gerrit.ovirt.org/14823
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I40887a5fd2e3901efdcc9633c5858ae6b2445291
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <smizrahi(a)redhat.com>
10 years, 9 months
Change in vdsm[master]: jsonrpc: Make proton reactor client capable as well
by smizrahi@redhat.com
Saggi Mizrahi has uploaded a new change for review.
Change subject: jsonrpc: Make proton reactor client capable as well
......................................................................
jsonrpc: Make proton reactor client capable as well
Change-Id: I14bc36adf2bd7536227138398761818a77174231
Signed-off-by: Saggi Mizrahi <smizrahi(a)redhat.com>
---
M tests/jsonRpcUtils.py
M yajsonrpc/client.py
M yajsonrpc/protonReactor.py
3 files changed, 122 insertions(+), 91 deletions(-)
git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/03/12303/1
diff --git a/tests/jsonRpcUtils.py b/tests/jsonRpcUtils.py
index d9918d2..99e63d8 100644
--- a/tests/jsonRpcUtils.py
+++ b/tests/jsonRpcUtils.py
@@ -10,8 +10,7 @@
JsonRpcServer, \
asyncoreReactor
from yajsonrpc.client import \
- JsonRpcClient, \
- ProtonReactorClient
+ JsonRpcClient
protonReactor = None
try:
@@ -60,12 +59,14 @@
raise SkipTest("qpid-proton python bindings are not installed")
port = getFreePort()
- serverAddress = "amqp://127.0.0.1:%d/vdsm_test" % (port,)
reactor = protonReactor.ProtonReactor()
+
+ def clientFactory(address):
+ return TestClientWrapper(reactor.createClient(address))
try:
yield (reactor,
- partial(ProtonReactorClient, serverAddress),
+ partial(clientFactory, ("127.0.0.1", port)),
("127.0.0.1", port))
finally:
reactor.stop()
diff --git a/yajsonrpc/client.py b/yajsonrpc/client.py
index 8bad01b..b7d2786 100644
--- a/yajsonrpc/client.py
+++ b/yajsonrpc/client.py
@@ -1,7 +1,4 @@
import json
-import socket
-import logging
-import uuid
from yajsonrpc import \
JsonRpcError, \
asyncoreReactor
@@ -46,57 +43,3 @@
def close(self):
self._transport.close()
-
-
-class ProtonReactorClient(object):
- log = logging.getLogger("ProtonReactorClient")
-
- def __init__(self, brokerAddress):
- self._serverAddress = brokerAddress
- self._msngr = proton.Messenger("client-%s" % str(uuid.uuid4()))
-
- def connect(self):
- self._msngr.start()
-
- def send(self, data, timeout=None):
- if timeout is None:
- timeout = -1
- else:
- timeout *= 1000
-
- msg = proton.Message()
- msg.address = self._serverAddress
- msg.body = unicode(data)
- self._msngr.timeout = timeout
- t = self._msngr.put(msg)
- try:
- self._msngr.send()
- except:
- self._msngr.settle(t)
- raise
-
- def recv(self, timeout=None):
- if timeout is None:
- timeout = -1
- else:
- timeout *= 1000
-
- self._msngr.timeout = timeout
- self.log.debug("Waiting for message")
- try:
- self._msngr.recv(1)
- finally:
- self.log.debug("Done waiting for message")
-
- if not self._msngr.incoming:
- raise socket.timeout()
-
- msg = proton.Message()
- t = self._msngr.get(msg)
- self._msngr.settle(t)
-
- return msg.body
-
- def close(self):
- self._msngr.timeout = 1000
- self._msngr.stop()
diff --git a/yajsonrpc/protonReactor.py b/yajsonrpc/protonReactor.py
index 557600c..2796e24 100644
--- a/yajsonrpc/protonReactor.py
+++ b/yajsonrpc/protonReactor.py
@@ -24,7 +24,14 @@
FAILED = 0
CONNECTED = 1
-AUTHENTICATING = 2
+SERVER_AUTH = 2
+CLIENT_AUTH = 3
+
+
+# Used for reactor coroutines
+class Return(object):
+ def __init__(self, value):
+ self.value = value
class ProtonListener(object):
@@ -40,15 +47,71 @@
class ProtonClient(object):
- def __init__(self, reactor, connection, connector, session):
+ log = logging.getLogger("jsonrpc.ProtonClient")
+
+ def __init__(self, reactor, connection, connector, session, address):
+ self._address = address
self.connector = connector
self.connection = connection
self.session = session
self.sender = None
- self.links = []
self._inbox = None
self._outbox = Queue()
self._reactor = reactor
+ self._connected = False
+
+ def closed(self):
+ return (self.connector is None or
+ proton.pn_connector_closed(self.connector))
+
+ def connect(self):
+ res = self._reactor._scheduleOp(True, self._openClientSession)
+ self.log.debug("Connected successfully to server")
+ if res == -1:
+ raise Exception("Could not connect to server")
+
+ def _openClientSession(self):
+ host, port = self._address
+ amqpAddress = "amqp://%s:%d/vdsm" % (host, port)
+ senderName = "Client (%s)" % (amqpAddress,)
+
+ self.connector = proton.pn_connector(self._reactor._driver,
+ host, str(port), None)
+ if self.connector is None:
+ # TODO: proper exception
+ raise Exception("Could not create connector")
+
+ self.connection = proton.pn_connection()
+ proton.pn_connector_set_connection(self.connector, self.connection)
+
+ sasl = proton.pn_connector_sasl(self.connector)
+ proton.pn_sasl_mechanisms(sasl, "ANONYMOUS")
+ proton.pn_sasl_client(sasl)
+
+ proton.pn_connector_set_context(self.connector, CLIENT_AUTH)
+ self.log.debug("Opening active connection")
+ proton.pn_connection_open(self.connection)
+
+ while True:
+ # TODO: Handle connection being closed mid authentication
+ if proton.pn_sasl_state(sasl) in (proton.PN_SASL_PASS,):
+ proton.pn_connector_set_context(self.connector, CONNECTED)
+ break
+
+ if proton.pn_sasl_state(sasl) == proton.PN_SASL_FAIL:
+ yield Return(-1)
+
+ yield
+
+ self.session = proton.pn_session(self.connection)
+ proton.pn_session_open(self.session)
+ proton.pn_session_set_context(self.session, self)
+
+ link = proton.pn_sender(self.session, senderName)
+ dst = proton.pn_link_target(link)
+ proton.pn_terminus_set_address(dst, amqpAddress)
+ self.sender = link
+ yield Return(1)
def _pushIncomingMessage(self, msg):
try:
@@ -78,16 +141,17 @@
def __init__(self, deliveryTimeout=5):
self._isRunning = False
+ self._coroutines = []
self._driver = proton.pn_driver()
- self._sessionContexts = []
self._deliveryTimeout = deliveryTimeout
self._commandQueue = Queue()
self._listeners = {}
def _activate(self, connector, cond):
- self._scheduleOp(False, proton.pn_connector_activate, connector, cond)
+ self._scheduleOp(False, proton.pn_connector_activate,
+ connector, cond)
def _convertTimeout(self, timeout):
"""
@@ -111,7 +175,7 @@
while l:
self.log.debug("Accepting Connection.")
connector = proton.pn_listener_accept(l)
- proton.pn_connector_set_context(connector, AUTHENTICATING)
+ proton.pn_connector_set_context(connector, SERVER_AUTH)
l = proton.pn_driver_listener(self._driver)
@@ -157,10 +221,13 @@
proton.pn_connector_process(connector)
state = proton.pn_connector_context(connector)
- if state == AUTHENTICATING:
+ if state == SERVER_AUTH:
self._authenticateConnector(connector)
elif state == CONNECTED:
self._serviceConnector(connector)
+ # Client authentication is handled in a coroutine
+ elif state == CLIENT_AUTH:
+ pass
else:
self.log.warning("Unknown Connection state '%s'" % state)
@@ -173,16 +240,19 @@
self.log.debug("Connection Opened.")
proton.pn_connection_open(conn)
+ def createClient(self, address):
+ client = ProtonClient(self, None, None, None, address)
+ return client
+
def _openPendingSessions(self, conn, connector):
ssn = proton.pn_session_head(conn, proton.PN_LOCAL_UNINIT)
while ssn:
proton.pn_session_open(ssn)
- ctx = ProtonClient(self, conn, connector, ssn)
+ ctx = ProtonClient(self, conn, connector, ssn, None)
l = proton.pn_connector_listener(connector)
listener = proton.pn_listener_context(l)
listener._acceptHandler(listener, ctx)
- self._sessionContexts.append(ctx)
proton.pn_session_set_context(ssn, ctx)
self.log.debug("Session Opened.")
ssn = proton.pn_session_next(ssn, proton.PN_LOCAL_UNINIT)
@@ -197,19 +267,20 @@
proton.pn_link_remote_target(link))
ssn = proton.pn_link_session(link)
+ client = proton.pn_session_get_context(ssn)
if proton.pn_link_is_sender(link):
- for ctx in self._sessionContexts:
- if ctx['session'] != ssn:
- continue
+ if client.sender != link:
+ self.log.debug("Already have a sender opened for session")
+ proton.pn_link_close(link)
+ else:
+ self.log.debug("Opening Link to send messages")
+ proton.pn_link_open(link)
- ctx['links'].append(link)
- self.log.debug("Opening Link to send Events")
-
- if proton.pn_link_is_receiver(link):
+ elif proton.pn_link_is_receiver(link):
self.log.debug("Opening Link to recv messages")
proton.pn_link_flow(link, 1)
+ proton.pn_link_open(link)
- proton.pn_link_open(link)
link = proton.pn_link_next(link, proton.PN_LOCAL_UNINIT)
def _processDeliveries(self, conn, connector):
@@ -259,12 +330,10 @@
while link:
self.log.debug("Closing Link")
proton.pn_link_close(link)
- for ctx in self._sessionContexts:
- if link in ctx.links:
- ctx.links.remove(link)
-
- if link == ctx.sender:
- ctx.sender = None
+ ssn = proton.pn_link_session(link)
+ client = proton.pn_session_get_context(ssn)
+ if link == client.sender:
+ client.sender = None
link = proton.pn_link_next(link, (proton.PN_LOCAL_ACTIVE |
proton.PN_REMOTE_CLOSED))
@@ -275,7 +344,6 @@
while ssn:
self.log.debug("Closing Session")
proton.pn_session_close(ssn)
- self._sessionContexts.remove(proton.pn_session_get_context(ssn))
ssn = proton.pn_session_next(ssn, (proton.PN_LOCAL_ACTIVE |
proton.PN_REMOTE_CLOSED))
@@ -284,11 +352,15 @@
proton.PN_REMOTE_CLOSED)):
proton.pn_connection_close(conn)
- def _queueOutgoingDeliveries(self, conn):
- ctxs = (ctx for ctx in self._sessionContexts
- if ctx.connection == conn)
+ def _iterSessions(self, conn, flags):
+ ssn = proton.pn_session_head(conn, flags)
+ while ssn:
+ yield ssn
+ ssn = proton.pn_session_next(ssn, flags)
- for ctx in ctxs:
+ def _queueOutgoingDeliveries(self, conn):
+ for ssn in self._iterSessions(conn, proton.PN_LOCAL_ACTIVE):
+ ctx = proton.pn_session_get_context(ssn)
sender = ctx.sender
if sender is None:
@@ -296,7 +368,6 @@
sender = proton.pn_sender(ctx.session,
"sender-%s" % str(uuid.uuid4()))
ctx.sender = sender
- proton.pn_link_open(sender)
continue
if proton.pn_link_credit(sender) == 0:
@@ -315,6 +386,7 @@
proton.pn_delivery(sender,
"response-delivery-%s" % str(uuid.uuid4()))
+ self.log.debug("Queued what I could")
def _serviceConnector(self, connector):
self.log.debug("Service Connector")
@@ -399,8 +471,22 @@
else:
cmd, evt, _ = r
res = cmd()
- if evt is not None:
+ if hasattr(res, "next"):
+ self._coroutines.append((res, r))
+
+ elif evt is not None:
r[2] = res
+ evt.set()
+
+ def _processCoroutines(self):
+ for cr, req in self._coroutines[:]:
+ res = cr.next()
+ if isinstance(res, Return):
+ cr.close()
+ evt = req[1]
+ self._coroutines.remove((cr, req))
+ if evt is not None:
+ req[2] = res.value
evt.set()
def _scheduleOp(self, sync, op, *args, **kwargs):
@@ -423,6 +509,7 @@
self._emptyCommandQueue()
self._acceptConnectionRequests()
self._processConnectors()
+ self._processCoroutines()
l = proton.pn_listener_head(self._driver)
while l:
--
To view, visit http://gerrit.ovirt.org/12303
To unsubscribe, visit http://gerrit.ovirt.org/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I14bc36adf2bd7536227138398761818a77174231
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <smizrahi(a)redhat.com>
10 years, 9 months