[mingw-qpid-cpp: 20/28] Re-generated patch from the upstream git.

Kalev Lember kalev at fedoraproject.org
Wed Mar 7 17:18:29 UTC 2012


commit a085373575d541cc02dd4844354298cf1cb2662b
Author: Ted Ross <tross at redhat.com>
Date:   Thu Sep 22 16:30:03 2011 -0400

    Re-generated patch from the upstream git.

 fedora.patch | 2225 +++++++++++++++++++++++-----------------------------------
 1 files changed, 879 insertions(+), 1346 deletions(-)
---
diff --git a/fedora.patch b/fedora.patch
index 8172f66..87db2e0 100644
--- a/fedora.patch
+++ b/fedora.patch
@@ -1,721 +1,3 @@
-From 00d9b16761b7f237c6b1418995cc9367aebac0f5 Mon Sep 17 00:00:00 2001
-From: Ted Ross <tross at apache.org>
-Date: Tue, 12 Jul 2011 11:49:32 +0000
-Subject: [PATCH 01/14] QPID-3275 - QMF Console asynchronous correlation-id
- should be scoped to the session, not the specific
- agent
-
-git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1145557 13f79535-47bb-0310-9956-ffa450edef68
----
- qpid/cpp/src/qmf/Agent.cpp            |   29 ++++++-----------------------
- qpid/cpp/src/qmf/AgentImpl.h          |    1 -
- qpid/cpp/src/qmf/ConsoleSession.cpp   |    2 +-
- qpid/cpp/src/qmf/ConsoleSessionImpl.h |    3 +++
- 4 files changed, 10 insertions(+), 25 deletions(-)
-
-diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
-index 915f2a1..684f8e4 100644
---- a/qpid/cpp/src/qmf/Agent.cpp
-+++ b/qpid/cpp/src/qmf/Agent.cpp
-@@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(
- 
- AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
-     name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
--    sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache)
-+    sender(session.directSender), schemaCache(s.schemaCache)
- {
- }
- 
-@@ -102,12 +102,11 @@ const Variant& AgentImpl::getAttribute(const string& k) const
- ConsoleEvent AgentImpl::query(const Query& query, Duration timeout)
- {
-     boost::shared_ptr<SyncContext> context(new SyncContext());
--    uint32_t correlator;
-+    uint32_t correlator(session.correlator());
-     ConsoleEvent result;
- 
-     {
-         qpid::sys::Mutex::ScopedLock l(lock);
--        correlator = nextCorrelator++;
-         contextMap[correlator] = context;
-     }
-     try {
-@@ -151,12 +150,7 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout)
- 
- uint32_t AgentImpl::queryAsync(const Query& query)
- {
--    uint32_t correlator;
--
--    {
--        qpid::sys::Mutex::ScopedLock l(lock);
--        correlator = nextCorrelator++;
--    }
-+    uint32_t correlator(session.correlator());
- 
-     sendQuery(query, correlator);
-     return correlator;
-@@ -172,12 +166,11 @@ uint32_t AgentImpl::queryAsync(const string& text)
- ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout)
- {
-     boost::shared_ptr<SyncContext> context(new SyncContext());
--    uint32_t correlator;
-+    uint32_t correlator(session.correlator());
-     ConsoleEvent result;
- 
-     {
-         qpid::sys::Mutex::ScopedLock l(lock);
--        correlator = nextCorrelator++;
-         contextMap[correlator] = context;
-     }
-     try {
-@@ -213,12 +206,7 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg
- 
- uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr)
- {
--    uint32_t correlator;
--
--    {
--        qpid::sys::Mutex::ScopedLock l(lock);
--        correlator = nextCorrelator++;
--    }
-+    uint32_t correlator(session.correlator());
- 
-     sendMethod(method, args, addr, correlator);
-     return correlator;
-@@ -596,12 +584,7 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
- 
- void AgentImpl::sendSchemaRequest(const SchemaId& id)
- {
--    uint32_t correlator;
--
--    {
--        qpid::sys::Mutex::ScopedLock l(lock);
--        correlator = nextCorrelator++;
--    }
-+    uint32_t correlator(session.correlator());
- 
-     if (capability >= AGENT_CAPABILITY_V2_SCHEMA) {
-         Query query(QUERY_SCHEMA, id);
-diff --git a/qpid/cpp/src/qmf/AgentImpl.h b/qpid/cpp/src/qmf/AgentImpl.h
-index 7fa4f43..09754a3 100644
---- a/qpid/cpp/src/qmf/AgentImpl.h
-+++ b/qpid/cpp/src/qmf/AgentImpl.h
-@@ -99,7 +99,6 @@ namespace qmf {
-         uint32_t capability;
-         qpid::messaging::Sender sender;
-         qpid::types::Variant::Map attributes;
--        uint32_t nextCorrelator;
-         std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;
-         boost::shared_ptr<SchemaCache> schemaCache;
-         mutable std::set<std::string> packageSet;
-diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
-index 5df0d83..7b51d80 100644
---- a/qpid/cpp/src/qmf/ConsoleSession.cpp
-+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
-@@ -68,7 +68,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
- ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
-     connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false),
-     opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
--    connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
-+    connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
- {
-     if (!options.empty()) {
-         qpid::messaging::AddressParser parser(options);
-diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-index 411b3f0..429dfc4 100644
---- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-+++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-@@ -90,6 +90,8 @@ namespace qmf {
-         std::string directBase;
-         std::string topicBase;
-         boost::shared_ptr<SchemaCache> schemaCache;
-+        qpid::sys::Mutex corrlock;
-+        uint32_t nextCorrelator;
- 
-         void enqueueEvent(const ConsoleEvent&);
-         void enqueueEventLH(const ConsoleEvent&);
-@@ -100,6 +102,7 @@ namespace qmf {
-         void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
-         void periodicProcessing(uint64_t);
-         void run();
-+        uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
- 
-         friend class AgentImpl;
-     };
--- 
-1.7.4.4
-
-From 40738a2501f8bfe2f85e1d26790584ce034bd930 Mon Sep 17 00:00:00 2001
-From: Ted Ross <tross at apache.org>
-Date: Tue, 12 Jul 2011 16:11:34 +0000
-Subject: [PATCH 02/14] QPID-3344 - Comparisons of const DataAddr objects are
- incorrect Applied patch from Zane Bitter
-
-git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1145644 13f79535-47bb-0310-9956-ffa450edef68
----
- qpid/cpp/include/qmf/DataAddr.h           |    3 +++
- qpid/cpp/src/qmf/DataAddr.cpp             |    6 ++++--
- qpid/cpp/src/qmf/DataAddrImpl.h           |    4 ++--
- qpid/cpp/src/qpid/store/StorageProvider.h |    2 +-
- 4 files changed, 10 insertions(+), 5 deletions(-)
-
-diff --git a/qpid/cpp/include/qmf/DataAddr.h b/qpid/cpp/include/qmf/DataAddr.h
-index 63d309c..20c4690 100644
---- a/qpid/cpp/include/qmf/DataAddr.h
-+++ b/qpid/cpp/include/qmf/DataAddr.h
-@@ -51,6 +51,9 @@ namespace qmf {
-         QMF_EXTERN uint32_t getAgentEpoch() const;
-         QMF_EXTERN qpid::types::Variant::Map asMap() const;
- 
-+        QMF_EXTERN bool operator==(const DataAddr&) const;
-+        QMF_EXTERN bool operator<(const DataAddr&) const;
-+
- #ifndef SWIG
-     private:
-         friend class qmf::PrivateImplRef<DataAddr>;
-diff --git a/qpid/cpp/src/qmf/DataAddr.cpp b/qpid/cpp/src/qmf/DataAddr.cpp
-index fb51d57..d16e120 100644
---- a/qpid/cpp/src/qmf/DataAddr.cpp
-+++ b/qpid/cpp/src/qmf/DataAddr.cpp
-@@ -36,7 +36,9 @@ DataAddr::~DataAddr() { PI::dtor(*this); }
- DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); }
- 
- bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; }
-+bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; }
- bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; }
-+bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; }
- 
- DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); }
- DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); }
-@@ -45,7 +47,7 @@ const string& DataAddr::getAgentName() const { return impl->getAgentName(); }
- uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); }
- Variant::Map DataAddr::asMap() const { return impl->asMap(); }
- 
--bool DataAddrImpl::operator==(const DataAddrImpl& other)
-+bool DataAddrImpl::operator==(const DataAddrImpl& other) const
- {
-     return
-         agentName == other.agentName &&
-@@ -54,7 +56,7 @@ bool DataAddrImpl::operator==(const DataAddrImpl& other)
- }
- 
- 
--bool DataAddrImpl::operator<(const DataAddrImpl& other)
-+bool DataAddrImpl::operator<(const DataAddrImpl& other) const
- {
-     if (agentName < other.agentName) return true;
-     if (agentName > other.agentName) return false;
-diff --git a/qpid/cpp/src/qmf/DataAddrImpl.h b/qpid/cpp/src/qmf/DataAddrImpl.h
-index 3f9cae9..11d512f 100644
---- a/qpid/cpp/src/qmf/DataAddrImpl.h
-+++ b/qpid/cpp/src/qmf/DataAddrImpl.h
-@@ -38,8 +38,8 @@ namespace qmf {
-         //
-         // Methods from API handle
-         //
--        bool operator==(const DataAddrImpl&);
--        bool operator<(const DataAddrImpl&);
-+        bool operator==(const DataAddrImpl&) const;
-+        bool operator<(const DataAddrImpl&) const;
-         DataAddrImpl(const qpid::types::Variant::Map&);
-         DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) :
-             agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {}
-diff --git a/qpid/cpp/src/qpid/store/StorageProvider.h b/qpid/cpp/src/qpid/store/StorageProvider.h
-index bc8d187..d162cc5 100644
---- a/qpid/cpp/src/qpid/store/StorageProvider.h
-+++ b/qpid/cpp/src/qpid/store/StorageProvider.h
-@@ -54,7 +54,7 @@ struct QueueEntry {
-     QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "")
-         : queueId(id), tplStatus(tpl), xid(x) {}
- 
--    bool operator==(const QueueEntry& rhs) {
-+    bool operator==(const QueueEntry& rhs) const {
-         if (queueId != rhs.queueId) return false;
-         if (tplStatus == NONE && rhs.tplStatus == NONE) return true;
-         return xid == rhs.xid;
--- 
-1.7.4.4
-
-From e2257aece703af5d775b087440958e19702b92a0 Mon Sep 17 00:00:00 2001
-From: Ted Ross <tross at apache.org>
-Date: Fri, 5 Aug 2011 15:24:16 +0000
-Subject: [PATCH 03/14] NO-JIRA - Added missing template file in distribution.
-
-git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1154264 13f79535-47bb-0310-9956-ffa450edef68
----
- qpid/cpp/managementgen/Makefile.am |    1 +
- 1 files changed, 1 insertions(+), 0 deletions(-)
-
-diff --git a/qpid/cpp/managementgen/Makefile.am b/qpid/cpp/managementgen/Makefile.am
-index e10dd63..4fc5edc 100644
---- a/qpid/cpp/managementgen/Makefile.am
-+++ b/qpid/cpp/managementgen/Makefile.am
-@@ -32,6 +32,7 @@ pkgpyexec_qmfgentmpl_PYTHON = \
- 	qmfgen/templates/Args.h \
- 	qmfgen/templates/Class.cpp \
- 	qmfgen/templates/Class.h \
-+	qmfgen/templates/CMakeLists.cmake \
- 	qmfgen/templates/Event.cpp \
- 	qmfgen/templates/Event.h \
- 	qmfgen/templates/Makefile.mk \
--- 
-1.7.4.4
-
-From f654585bacf244e33cdb5a8a1ece6c15b18ecdc1 Mon Sep 17 00:00:00 2001
-From: Ted Ross <tross at apache.org>
-Date: Mon, 15 Aug 2011 16:47:56 +0000
-Subject: [PATCH 04/14] QPID-3423 - Timing and Performance Improvements in QMF
- Libraries
-
-git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1157907 13f79535-47bb-0310-9956-ffa450edef68
----
- qpid/cpp/include/qmf/AgentSession.h             |    5 +++
- qpid/cpp/include/qmf/ConsoleSession.h           |    4 ++
- qpid/cpp/src/qmf/AgentSession.cpp               |   41 ++++++++++++++++------
- qpid/cpp/src/qmf/ConsoleSession.cpp             |   43 ++++++++++++++++-------
- qpid/cpp/src/qmf/ConsoleSessionImpl.h           |    1 +
- qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp |   19 ++++++++--
- 6 files changed, 86 insertions(+), 27 deletions(-)
-
-diff --git a/qpid/cpp/include/qmf/AgentSession.h b/qpid/cpp/include/qmf/AgentSession.h
-index 1eeb252..5ecfb04 100644
---- a/qpid/cpp/include/qmf/AgentSession.h
-+++ b/qpid/cpp/include/qmf/AgentSession.h
-@@ -71,6 +71,11 @@ namespace qmf {
-          *                                    If False: Listen only on the routable direct address
-          *    strict-security:{True,False}  - If True:  Cooperate with the broker to enforce strict access control to the network
-          *                                  - If False: Operate more flexibly with regard to use of messaging facilities [default]
-+         *    max-thread-wait-time:N     - Time (in seconds) the session thread will wait for messages from the network between
-+         *                                 periodic background processing passes. [default: 5]
-+         *                                 Must not be greater than 'interval'.  Larger numbers will cause fewer wake-ups but will
-+         *                                 increase the time it takes to shut down the process.  This setting will not affect the
-+         *                                 agent's response time for queries or method invocation.
-          */
-         QMF_EXTERN AgentSession(qpid::messaging::Connection& conn, const std::string& options="");
- 
-diff --git a/qpid/cpp/include/qmf/ConsoleSession.h b/qpid/cpp/include/qmf/ConsoleSession.h
-index 6008036..5e3a091 100644
---- a/qpid/cpp/include/qmf/ConsoleSession.h
-+++ b/qpid/cpp/include/qmf/ConsoleSession.h
-@@ -61,6 +61,10 @@ namespace qmf {
-          *                                    If False: Listen only on the routable direct address
-          *    strict-security:{True,False}  - If True:  Cooperate with the broker to enforce strict access control to the network
-          *                                  - If False: Operate more flexibly with regard to use of messaging facilities [default]
-+         *    max-thread-wait-time:N     - Time (in seconds) the session thread will wait for messages from the network between
-+         *                                 periodic background processing passes.
-+         *                                 Must not be greater than 60.  Larger numbers will cause fewer wake-ups but will
-+         *                                 increase the time it takes to shut down the process. [default: 5]
-          */
-         QMF_EXTERN ConsoleSession(qpid::messaging::Connection& conn, const std::string& options="");
- 
-diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp
-index 71d3693..a88782d 100644
---- a/qpid/cpp/src/qmf/AgentSession.cpp
-+++ b/qpid/cpp/src/qmf/AgentSession.cpp
-@@ -120,6 +120,7 @@ namespace qmf {
-         bool publicEvents;
-         bool listenOnDirect;
-         bool strictSecurity;
-+        uint32_t maxThreadWaitTime;
-         uint64_t schemaUpdateTime;
-         string directBase;
-         string topicBase;
-@@ -185,7 +186,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
-     bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
-     externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
-     maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
--    listenOnDirect(true), strictSecurity(false),
-+    listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
-     schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
- {
-     //
-@@ -246,7 +247,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
-         iter = optMap.find("strict-security");
-         if (iter != optMap.end())
-             strictSecurity = iter->second.asBool();
-+
-+        iter = optMap.find("max-thread-wait-time");
-+        if (iter != optMap.end())
-+            maxThreadWaitTime = iter->second.asUint32();
-     }
-+
-+    if (maxThreadWaitTime > interval)
-+        maxThreadWaitTime = interval;
- }
- 
- 
-@@ -254,6 +262,11 @@ AgentSessionImpl::~AgentSessionImpl()
- {
-     if (opened)
-         close();
-+
-+    if (thread) {
-+        thread->join();
-+        delete thread;
-+    }
- }
- 
- 
-@@ -262,6 +275,12 @@ void AgentSessionImpl::open()
-     if (opened)
-         throw QmfException("The session is already open");
- 
-+    // If the thread exists, join and delete it before creating a new one.
-+    if (thread) {
-+        thread->join();
-+        delete thread;
-+    }
-+
-     const string addrArgs(";{create:never,node:{type:topic}}");
-     const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
-     attributes["_direct_subject"] = routableAddr;
-@@ -304,13 +323,8 @@ void AgentSessionImpl::close()
-     if (!opened)
-         return;
- 
--    // Stop and join the receiver thread
-+    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
-     threadCanceled = true;
--    thread->join();
--    delete thread;
--
--    // Close the AMQP session
--    session.close();
-     opened = false;
- }
- 
-@@ -320,9 +334,13 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
-     uint64_t milliseconds = timeout.getMilliseconds();
-     qpid::sys::Mutex::ScopedLock l(lock);
- 
--    if (eventQueue.empty() && milliseconds > 0)
--        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
--                                           qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
-+    if (eventQueue.empty() && milliseconds > 0) {
-+        int64_t nsecs(qpid::sys::TIME_INFINITE);
-+        if ((uint64_t)(nsecs / 1000000) > milliseconds)
-+            nsecs = (int64_t) milliseconds * 1000000;
-+        qpid::sys::Duration then(nsecs);
-+        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
-+    }
- 
-     if (!eventQueue.empty()) {
-         event = eventQueue.front();
-@@ -1050,7 +1068,7 @@ void AgentSessionImpl::run()
-             periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC);
- 
-             Receiver rx;
--            bool valid = session.nextReceiver(rx, Duration::SECOND);
-+            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
-             if (threadCanceled)
-                 break;
-             if (valid) {
-@@ -1067,6 +1085,7 @@ void AgentSessionImpl::run()
-         enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
-     }
- 
-+    session.close();
-     QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
- }
- 
-diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
-index 7b51d80..af83595 100644
---- a/qpid/cpp/src/qmf/ConsoleSession.cpp
-+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
-@@ -66,7 +66,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
- //========================================================================================
- 
- ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
--    connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false),
-+    connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
-     opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
-     connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
- {
-@@ -92,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
-         iter = optMap.find("strict-security");
-         if (iter != optMap.end())
-             strictSecurity = iter->second.asBool();
-+
-+        iter = optMap.find("max-thread-wait-time");
-+        if (iter != optMap.end())
-+            maxThreadWaitTime = iter->second.asUint32();
-     }
-+
-+    if (maxThreadWaitTime > 60)
-+        maxThreadWaitTime = 60;
- }
- 
- 
-@@ -100,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl()
- {
-     if (opened)
-         close();
-+
-+    if (thread) {
-+        thread->join();
-+        delete thread;
-+    }
- }
- 
- 
-@@ -154,6 +166,12 @@ void ConsoleSessionImpl::open()
-     if (opened)
-         throw QmfException("The session is already open");
- 
-+    // If the thread exists, join and delete it before creating a new one.
-+    if (thread) {
-+        thread->join();
-+        delete thread;
-+    }
-+
-     // Establish messaging addresses
-     directBase = "qmf." + domain + ".direct";
-     topicBase = "qmf." + domain + ".topic";
-@@ -182,14 +200,13 @@ void ConsoleSessionImpl::open()
- 
-     // Start the receiver thread
-     threadCanceled = false;
-+    opened = true;
-     thread = new qpid::sys::Thread(*this);
- 
-     // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
-     sendBrokerLocate();
-     if (agentQuery)
-         sendAgentLocate();
--
--    opened = true;
- }
- 
- 
-@@ -198,13 +215,8 @@ void ConsoleSessionImpl::close()
-     if (!opened)
-         throw QmfException("The session is already closed");
- 
--    // Stop and join the receiver thread
-+    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
-     threadCanceled = true;
--    thread->join();
--    delete thread;
--
--    // Close the AMQP session
--    session.close();
-     opened = false;
- }
- 
-@@ -214,9 +226,13 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
-     uint64_t milliseconds = timeout.getMilliseconds();
-     qpid::sys::Mutex::ScopedLock l(lock);
- 
--    if (eventQueue.empty() && milliseconds > 0)
--        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
--                                           qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
-+    if (eventQueue.empty() && milliseconds > 0) {
-+        int64_t nsecs(qpid::sys::TIME_INFINITE);
-+        if ((uint64_t)(nsecs / 1000000) > milliseconds)
-+            nsecs = (int64_t) milliseconds * 1000000;
-+        qpid::sys::Duration then(nsecs);
-+        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
-+    }
- 
-     if (!eventQueue.empty()) {
-         event = eventQueue.front();
-@@ -596,7 +612,7 @@ void ConsoleSessionImpl::run()
-                                qpid::sys::TIME_SEC);
- 
-             Receiver rx;
--            bool valid = session.nextReceiver(rx, Duration::SECOND);
-+            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
-             if (threadCanceled)
-                 break;
-             if (valid) {
-@@ -613,6 +629,7 @@ void ConsoleSessionImpl::run()
-         enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
-     }
- 
-+    session.close();
-     QPID_LOG(debug, "ConsoleSession thread exiting");
- }
- 
-diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-index 429dfc4..478d24e 100644
---- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-+++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-@@ -76,6 +76,7 @@ namespace qmf {
-         uint32_t maxAgentAgeMinutes;
-         bool listenOnDirect;
-         bool strictSecurity;
-+        uint32_t maxThreadWaitTime;
-         Query agentQuery;
-         bool opened;
-         std::queue<ConsoleEvent> eventQueue;
-diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
-index 633401e..f183ff8 100644
---- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
-+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
-@@ -1378,13 +1378,26 @@ bool ManagementAgentImpl::ConnectionThread::isSleeping() const
- 
- void ManagementAgentImpl::PublishThread::run()
- {
--    uint16_t    totalSleep;
-+    uint16_t totalSleep;
-+    uint16_t sleepTime;
- 
-     while (!shutdown) {
-         agent.periodicProcessing();
-         totalSleep = 0;
--        while (totalSleep++ < agent.getInterval() && !shutdown) {
--            ::sleep(1);
-+
-+        //
-+        // Calculate a sleep time that is no greater than 5 seconds and
-+        // no less than 1 second.
-+        //
-+        sleepTime = agent.getInterval();
-+        if (sleepTime > 5)
-+            sleepTime = 5;
-+        else if (sleepTime == 0)
-+            sleepTime = 1;
-+
-+        while (totalSleep < agent.getInterval() && !shutdown) {
-+            ::sleep(sleepTime);
-+            totalSleep += sleepTime;
-         }
-     }
- }
--- 
-1.7.4.4
-
-From 6ca9eda32ed8c809524b913dd169afa4a35eb58d Mon Sep 17 00:00:00 2001
-From: Ted Ross <ross at localhost.localdomain>
-Date: Mon, 15 Aug 2011 16:25:52 -0400
-Subject: [PATCH 05/14] Fixed EXTERN definitions
-
----
- qpid/cpp/include/qpid/framing/FieldTable.h |    4 ++--
- 1 files changed, 2 insertions(+), 2 deletions(-)
-
-diff --git a/qpid/cpp/include/qpid/framing/FieldTable.h b/qpid/cpp/include/qpid/framing/FieldTable.h
-index e8ec524..bdcef6d 100644
---- a/qpid/cpp/include/qpid/framing/FieldTable.h
-+++ b/qpid/cpp/include/qpid/framing/FieldTable.h
-@@ -65,8 +65,8 @@ class FieldTable
-     QPID_COMMON_EXTERN void decode(Buffer& buffer);
- 
-     QPID_COMMON_EXTERN int count() const;
--    QPID_COMMON_EXTERN size_t size() const { return values.size(); }
--    QPID_COMMON_EXTERN bool empty() { return size() == 0; }
-+    QPID_COMMON_INLINE_EXTERN size_t size() const { return values.size(); }
-+    QPID_COMMON_INLINE_EXTERN bool empty() { return size() == 0; }
-     QPID_COMMON_EXTERN void set(const std::string& name, const ValuePtr& value);
-     QPID_COMMON_EXTERN ValuePtr get(const std::string& name) const;
-     QPID_COMMON_INLINE_EXTERN bool isSet(const std::string& name) const { return get(name).get() != 0; }
--- 
-1.7.4.4
-
-From 1f68475caffaa3c36ad921684bbaea35c68ce375 Mon Sep 17 00:00:00 2001
-From: Ted Ross <ross at localhost.localdomain>
-Date: Mon, 15 Aug 2011 17:10:15 -0400
-Subject: [PATCH 06/14] Enable qmf2 for mingw32
-
----
- qpid/cpp/src/CMakeLists.txt |    4 ++--
- 1 files changed, 2 insertions(+), 2 deletions(-)
-
-diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
-index 80315b9..978d962 100644
---- a/qpid/cpp/src/CMakeLists.txt
-+++ b/qpid/cpp/src/CMakeLists.txt
-@@ -1081,7 +1081,7 @@ install (TARGETS qmf OPTIONAL
-          COMPONENT ${QPID_COMPONENT_QMF})
- install_pdb (qmf ${QPID_COMPONENT_QMF})
- 
--if(NOT WIN32)
-+#if(NOT WIN32)
-     set (qmf2_HEADERS
-         ../include/qmf/AgentEvent.h
-         ../include/qmf/Agent.h
-@@ -1156,7 +1156,7 @@ if(NOT WIN32)
-             DESTINATION ${QPID_INSTALL_INCLUDEDIR}/qmf
-             COMPONENT ${QPID_COMPONENT_QMF})
-     install_pdb (qmf2 ${QPID_COMPONENT_QMF})
--endif (NOT WIN32)
-+#endif (NOT WIN32)
- 
- set (qmfengine_SOURCES
-      qmf/engine/Agent.cpp
--- 
-1.7.4.4
-
-From 337286e57705835975acd215aa1990f3597abc6a Mon Sep 17 00:00:00 2001
-From: Ted Ross <ross at localhost.localdomain>
-Date: Mon, 15 Aug 2011 17:36:24 -0400
-Subject: [PATCH 07/14] Fixed externs in AddressParser
-
----
- qpid/cpp/src/qpid/messaging/AddressParser.h |   10 +++++-----
- 1 files changed, 5 insertions(+), 5 deletions(-)
-
-diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.h b/qpid/cpp/src/qpid/messaging/AddressParser.h
-index 1635331..c51200c 100644
---- a/qpid/cpp/src/qpid/messaging/AddressParser.h
-+++ b/qpid/cpp/src/qpid/messaging/AddressParser.h
-@@ -26,13 +26,13 @@
- namespace qpid {
- namespace messaging {
- 
--class AddressParser
-+class QPID_MESSAGING_CLASS_EXTERN AddressParser
- {
-   public:
--    AddressParser(const std::string&);
--    bool parse(Address& address);
--    bool parseMap(qpid::types::Variant::Map& map);
--    bool parseList(qpid::types::Variant::List& list);
-+    QPID_MESSAGING_EXTERN AddressParser(const std::string&);
-+    QPID_MESSAGING_EXTERN bool parse(Address& address);
-+    QPID_MESSAGING_EXTERN bool parseMap(qpid::types::Variant::Map& map);
-+    QPID_MESSAGING_EXTERN bool parseList(qpid::types::Variant::List& list);
-   private:
-     const std::string& input;
-     std::string::size_type current;
--- 
-1.7.4.4
-
-From 666f43e30629559d97a71bd737d36ab14d3eeb86 Mon Sep 17 00:00:00 2001
-From: Ted Ross <tross at apache.org>
-Date: Tue, 13 Sep 2011 19:34:38 +0000
-Subject: [PATCH 08/14] QPID-3484 - QMF Main-Loop Integration Applied patch
- from Darryl Pierce.
-
-git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1170314 13f79535-47bb-0310-9956-ffa450edef68
----
- qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am    |    5 +-
- .../qmf2/examples/cpp/event_driven_list_agents.cpp |  107 ++++++++++++
- qpid/cpp/include/qmf/AgentSession.h                |    1 +
- qpid/cpp/include/qmf/ConsoleSession.h              |    1 +
- qpid/cpp/include/qmf/posix/EventNotifier.h         |   62 +++++++
- qpid/cpp/src/CMakeLists.txt                        |    5 +
- qpid/cpp/src/qmf.mk                                |    4 +
- qpid/cpp/src/qmf/AgentSession.cpp                  |  167 +++++--------------
- qpid/cpp/src/qmf/AgentSessionImpl.h                |  175 ++++++++++++++++++++
- qpid/cpp/src/qmf/ConsoleSession.cpp                |   38 ++++-
- qpid/cpp/src/qmf/ConsoleSessionImpl.h              |   17 ++
- qpid/cpp/src/qmf/EventNotifierImpl.cpp             |   56 +++++++
- qpid/cpp/src/qmf/EventNotifierImpl.h               |   48 ++++++
- qpid/cpp/src/qmf/PosixEventNotifier.cpp            |   63 +++++++
- qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp        |  108 ++++++++++++
- qpid/cpp/src/qmf/PosixEventNotifierImpl.h          |   61 +++++++
- qpid/cpp/src/tests/Qmf2.cpp                        |  104 ++++++++++++-
- 17 files changed, 890 insertions(+), 132 deletions(-)
- create mode 100644 qpid/cpp/bindings/qmf2/examples/cpp/event_driven_list_agents.cpp
- create mode 100644 qpid/cpp/include/qmf/posix/EventNotifier.h
- create mode 100644 qpid/cpp/src/qmf/AgentSessionImpl.h
- create mode 100644 qpid/cpp/src/qmf/EventNotifierImpl.cpp
- create mode 100644 qpid/cpp/src/qmf/EventNotifierImpl.h
- create mode 100644 qpid/cpp/src/qmf/PosixEventNotifier.cpp
- create mode 100644 qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp
- create mode 100644 qpid/cpp/src/qmf/PosixEventNotifierImpl.h
-
 diff --git a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am b/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
 index 84207d4..062fbd0 100644
 --- a/qpid/cpp/bindings/qmf2/examples/cpp/Makefile.am
@@ -852,10 +134,22 @@ index 0000000..c288aa6
 +}
 +
 diff --git a/qpid/cpp/include/qmf/AgentSession.h b/qpid/cpp/include/qmf/AgentSession.h
-index 5ecfb04..589d364 100644
+index 1eeb252..589d364 100644
 --- a/qpid/cpp/include/qmf/AgentSession.h
 +++ b/qpid/cpp/include/qmf/AgentSession.h
-@@ -188,6 +188,7 @@ namespace qmf {
+@@ -71,6 +71,11 @@ namespace qmf {
+          *                                    If False: Listen only on the routable direct address
+          *    strict-security:{True,False}  - If True:  Cooperate with the broker to enforce strict access control to the network
+          *                                  - If False: Operate more flexibly with regard to use of messaging facilities [default]
++         *    max-thread-wait-time:N     - Time (in seconds) the session thread will wait for messages from the network between
++         *                                 periodic background processing passes. [default: 5]
++         *                                 Must not be greater than 'interval'.  Larger numbers will cause fewer wake-ups but will
++         *                                 increase the time it takes to shut down the process.  This setting will not affect the
++         *                                 agent's response time for queries or method invocation.
+          */
+         QMF_EXTERN AgentSession(qpid::messaging::Connection& conn, const std::string& options="");
+ 
+@@ -183,6 +188,7 @@ namespace qmf {
  #ifndef SWIG
      private:
          friend class qmf::PrivateImplRef<AgentSession>;
@@ -864,10 +158,21 @@ index 5ecfb04..589d364 100644
      };
  
 diff --git a/qpid/cpp/include/qmf/ConsoleSession.h b/qpid/cpp/include/qmf/ConsoleSession.h
-index 5e3a091..022485c 100644
+index 6008036..022485c 100644
 --- a/qpid/cpp/include/qmf/ConsoleSession.h
 +++ b/qpid/cpp/include/qmf/ConsoleSession.h
-@@ -123,6 +123,7 @@ namespace qmf {
+@@ -61,6 +61,10 @@ namespace qmf {
+          *                                    If False: Listen only on the routable direct address
+          *    strict-security:{True,False}  - If True:  Cooperate with the broker to enforce strict access control to the network
+          *                                  - If False: Operate more flexibly with regard to use of messaging facilities [default]
++         *    max-thread-wait-time:N     - Time (in seconds) the session thread will wait for messages from the network between
++         *                                 periodic background processing passes.
++         *                                 Must not be greater than 60.  Larger numbers will cause fewer wake-ups but will
++         *                                 increase the time it takes to shut down the process. [default: 5]
+          */
+         QMF_EXTERN ConsoleSession(qpid::messaging::Connection& conn, const std::string& options="");
+ 
+@@ -119,6 +123,7 @@ namespace qmf {
  #ifndef SWIG
      private:
          friend class qmf::PrivateImplRef<ConsoleSession>;
@@ -875,12 +180,26 @@ index 5e3a091..022485c 100644
  #endif
      };
  
+diff --git a/qpid/cpp/include/qmf/DataAddr.h b/qpid/cpp/include/qmf/DataAddr.h
+index 63d309c..20c4690 100644
+--- a/qpid/cpp/include/qmf/DataAddr.h
++++ b/qpid/cpp/include/qmf/DataAddr.h
+@@ -51,6 +51,9 @@ namespace qmf {
+         QMF_EXTERN uint32_t getAgentEpoch() const;
+         QMF_EXTERN qpid::types::Variant::Map asMap() const;
+ 
++        QMF_EXTERN bool operator==(const DataAddr&) const;
++        QMF_EXTERN bool operator<(const DataAddr&) const;
++
+ #ifndef SWIG
+     private:
+         friend class qmf::PrivateImplRef<DataAddr>;
 diff --git a/qpid/cpp/include/qmf/posix/EventNotifier.h b/qpid/cpp/include/qmf/posix/EventNotifier.h
 new file mode 100644
-index 0000000..91817cc
+index 0000000..ebc1cb5
 --- /dev/null
 +++ b/qpid/cpp/include/qmf/posix/EventNotifier.h
-@@ -0,0 +1,62 @@
+@@ -0,0 +1,63 @@
 +#ifndef __QMF_POSIX_EVENT_NOTIFIER_H
 +#define __QMF_POSIX_EVENT_NOTIFIER_H
 +
@@ -921,6 +240,7 @@ index 0000000..91817cc
 +
 +  class QMF_CLASS_EXTERN EventNotifier : public qmf::Handle<qmf::PosixEventNotifierImpl> {
 +  public:
++      QMF_EXTERN EventNotifier(PosixEventNotifierImpl* impl = 0);
 +      QMF_EXTERN EventNotifier(::qmf::AgentSession& agentSession);
 +      QMF_EXTERN EventNotifier(::qmf::ConsoleSession& consoleSession);
 +      QMF_EXTERN EventNotifier(const EventNotifier& that);
@@ -943,29 +263,84 @@ index 0000000..91817cc
 +
 +#endif
 +
+diff --git a/qpid/cpp/include/qpid/framing/FieldTable.h b/qpid/cpp/include/qpid/framing/FieldTable.h
+index e8ec524..bdcef6d 100644
+--- a/qpid/cpp/include/qpid/framing/FieldTable.h
++++ b/qpid/cpp/include/qpid/framing/FieldTable.h
+@@ -65,8 +65,8 @@ class FieldTable
+     QPID_COMMON_EXTERN void decode(Buffer& buffer);
+ 
+     QPID_COMMON_EXTERN int count() const;
+-    QPID_COMMON_EXTERN size_t size() const { return values.size(); }
+-    QPID_COMMON_EXTERN bool empty() { return size() == 0; }
++    QPID_COMMON_INLINE_EXTERN size_t size() const { return values.size(); }
++    QPID_COMMON_INLINE_EXTERN bool empty() { return size() == 0; }
+     QPID_COMMON_EXTERN void set(const std::string& name, const ValuePtr& value);
+     QPID_COMMON_EXTERN ValuePtr get(const std::string& name) const;
+     QPID_COMMON_INLINE_EXTERN bool isSet(const std::string& name) const { return get(name).get() != 0; }
+diff --git a/qpid/cpp/managementgen/Makefile.am b/qpid/cpp/managementgen/Makefile.am
+index e10dd63..4fc5edc 100644
+--- a/qpid/cpp/managementgen/Makefile.am
++++ b/qpid/cpp/managementgen/Makefile.am
+@@ -32,6 +32,7 @@ pkgpyexec_qmfgentmpl_PYTHON = \
+ 	qmfgen/templates/Args.h \
+ 	qmfgen/templates/Class.cpp \
+ 	qmfgen/templates/Class.h \
++	qmfgen/templates/CMakeLists.cmake \
+ 	qmfgen/templates/Event.cpp \
+ 	qmfgen/templates/Event.h \
+ 	qmfgen/templates/Makefile.mk \
 diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
-index 978d962..2986a90 100644
+index 80315b9..0be42b8 100644
 --- a/qpid/cpp/src/CMakeLists.txt
 +++ b/qpid/cpp/src/CMakeLists.txt
-@@ -1093,6 +1093,7 @@ install_pdb (qmf ${QPID_COMPONENT_QMF})
-         ../include/qmf/exceptions.h
-         ../include/qmf/Handle.h
-         ../include/qmf/ImportExport.h
-+        ../include/qmf/posix/EventNotifier.h
-         ../include/qmf/Query.h
-         ../include/qmf/Schema.h
-         ../include/qmf/SchemaId.h
-@@ -1122,6 +1123,10 @@ install_pdb (qmf ${QPID_COMPONENT_QMF})
+@@ -1081,7 +1081,7 @@ install (TARGETS qmf OPTIONAL
+          COMPONENT ${QPID_COMPONENT_QMF})
+ install_pdb (qmf ${QPID_COMPONENT_QMF})
+ 
+-if(NOT WIN32)
++#if(NOT WIN32)
+     set (qmf2_HEADERS
+         ../include/qmf/AgentEvent.h
+         ../include/qmf/Agent.h
+@@ -1122,6 +1122,8 @@ if(NOT WIN32)
          qmf/DataAddrImpl.h
          qmf/Data.cpp
          qmf/DataImpl.h
 +        qmf/EventNotifierImpl.h
 +        qmf/EventNotifierImpl.cpp
-+        qmf/PosixEventNotifier.cpp
-+        qmf/PosixEventNotifierImpl.cpp
          qmf/exceptions.cpp
          qmf/Expression.cpp
          qmf/Expression.h
+@@ -1144,6 +1146,19 @@ if(NOT WIN32)
+         qmf/SubscriptionImpl.h
+         )
+ 
++if(NOT WIN32)
++    set (qmf2_HEADERS
++        ${qmf2_HEADERS}
++        ../include/qmf/posix/EventNotifier.h
++        )
++
++    set (qmf2_SOURCES
++        ${qmf2_SOURCES}
++        qmf/PosixEventNotifier.cpp
++        qmf/PosixEventNotifierImpl.cpp
++        )
++endif (NOT WIN32)
++
+     add_msvc_version (qmf2 library dll)
+     add_library (qmf2 SHARED ${qmf2_SOURCES})
+     target_link_libraries (qmf2 qpidmessaging qpidtypes qpidclient qpidcommon)
+@@ -1156,7 +1171,7 @@ if(NOT WIN32)
+             DESTINATION ${QPID_INSTALL_INCLUDEDIR}/qmf
+             COMPONENT ${QPID_COMPONENT_QMF})
+     install_pdb (qmf2 ${QPID_COMPONENT_QMF})
+-endif (NOT WIN32)
++#endif (NOT WIN32)
+ 
+ set (qmfengine_SOURCES
+      qmf/engine/Agent.cpp
 diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk
 index f3462f1..4da8470 100644
 --- a/qpid/cpp/src/qmf.mk
@@ -988,11 +363,106 @@ index f3462f1..4da8470 100644
    qmf/exceptions.cpp		\
    qmf/Expression.cpp		\
    qmf/Expression.h		\
+diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
+index 915f2a1..684f8e4 100644
+--- a/qpid/cpp/src/qmf/Agent.cpp
++++ b/qpid/cpp/src/qmf/Agent.cpp
+@@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(
+ 
+ AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
+     name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
+-    sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache)
++    sender(session.directSender), schemaCache(s.schemaCache)
+ {
+ }
+ 
+@@ -102,12 +102,11 @@ const Variant& AgentImpl::getAttribute(const string& k) const
+ ConsoleEvent AgentImpl::query(const Query& query, Duration timeout)
+ {
+     boost::shared_ptr<SyncContext> context(new SyncContext());
+-    uint32_t correlator;
++    uint32_t correlator(session.correlator());
+     ConsoleEvent result;
+ 
+     {
+         qpid::sys::Mutex::ScopedLock l(lock);
+-        correlator = nextCorrelator++;
+         contextMap[correlator] = context;
+     }
+     try {
+@@ -151,12 +150,7 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout)
+ 
+ uint32_t AgentImpl::queryAsync(const Query& query)
+ {
+-    uint32_t correlator;
+-
+-    {
+-        qpid::sys::Mutex::ScopedLock l(lock);
+-        correlator = nextCorrelator++;
+-    }
++    uint32_t correlator(session.correlator());
+ 
+     sendQuery(query, correlator);
+     return correlator;
+@@ -172,12 +166,11 @@ uint32_t AgentImpl::queryAsync(const string& text)
+ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout)
+ {
+     boost::shared_ptr<SyncContext> context(new SyncContext());
+-    uint32_t correlator;
++    uint32_t correlator(session.correlator());
+     ConsoleEvent result;
+ 
+     {
+         qpid::sys::Mutex::ScopedLock l(lock);
+-        correlator = nextCorrelator++;
+         contextMap[correlator] = context;
+     }
+     try {
+@@ -213,12 +206,7 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg
+ 
+ uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr)
+ {
+-    uint32_t correlator;
+-
+-    {
+-        qpid::sys::Mutex::ScopedLock l(lock);
+-        correlator = nextCorrelator++;
+-    }
++    uint32_t correlator(session.correlator());
+ 
+     sendMethod(method, args, addr, correlator);
+     return correlator;
+@@ -596,12 +584,7 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
+ 
+ void AgentImpl::sendSchemaRequest(const SchemaId& id)
+ {
+-    uint32_t correlator;
+-
+-    {
+-        qpid::sys::Mutex::ScopedLock l(lock);
+-        correlator = nextCorrelator++;
+-    }
++    uint32_t correlator(session.correlator());
+ 
+     if (capability >= AGENT_CAPABILITY_V2_SCHEMA) {
+         Query query(QUERY_SCHEMA, id);
+diff --git a/qpid/cpp/src/qmf/AgentImpl.h b/qpid/cpp/src/qmf/AgentImpl.h
+index 7fa4f43..09754a3 100644
+--- a/qpid/cpp/src/qmf/AgentImpl.h
++++ b/qpid/cpp/src/qmf/AgentImpl.h
+@@ -99,7 +99,6 @@ namespace qmf {
+         uint32_t capability;
+         qpid::messaging::Sender sender;
+         qpid::types::Variant::Map attributes;
+-        uint32_t nextCorrelator;
+         std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;
+         boost::shared_ptr<SchemaCache> schemaCache;
+         mutable std::set<std::string> packageSet;
 diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp
-index a88782d..d92b2a4 100644
+index 71d3693..251c25f 100644
 --- a/qpid/cpp/src/qmf/AgentSession.cpp
 +++ b/qpid/cpp/src/qmf/AgentSession.cpp
-@@ -19,134 +19,7 @@
+@@ -19,133 +19,7 @@
   *
   */
  
@@ -1097,7 +567,6 @@ index a88782d..d92b2a4 100644
 -        bool publicEvents;
 -        bool listenOnDirect;
 -        bool strictSecurity;
--        uint32_t maxThreadWaitTime;
 -        uint64_t schemaUpdateTime;
 -        string directBase;
 -        string topicBase;
@@ -1123,12 +592,115 @@ index a88782d..d92b2a4 100644
 -    };
 -}
 -
--typedef qmf::PrivateImplRef<AgentSession> PI;
-+#include "qmf/AgentSessionImpl.h"
+-typedef qmf::PrivateImplRef<AgentSession> PI;
++#include "qmf/AgentSessionImpl.h"
+ 
+ AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
+ AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
+@@ -181,11 +55,11 @@ void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); }
+ //========================================================================================
+ 
+ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
+-    connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
++    connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false),
+     bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
+     externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
+     maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
+-    listenOnDirect(true), strictSecurity(false),
++    listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
+     schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
+ {
+     //
+@@ -246,7 +120,14 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
+         iter = optMap.find("strict-security");
+         if (iter != optMap.end())
+             strictSecurity = iter->second.asBool();
++
++        iter = optMap.find("max-thread-wait-time");
++        if (iter != optMap.end())
++            maxThreadWaitTime = iter->second.asUint32();
+     }
++
++    if (maxThreadWaitTime > interval)
++        maxThreadWaitTime = interval;
+ }
+ 
+ 
+@@ -254,6 +135,11 @@ AgentSessionImpl::~AgentSessionImpl()
+ {
+     if (opened)
+         close();
++
++    if (thread) {
++        thread->join();
++        delete thread;
++    }
+ }
+ 
+ 
+@@ -262,6 +148,12 @@ void AgentSessionImpl::open()
+     if (opened)
+         throw QmfException("The session is already open");
+ 
++    // If the thread exists, join and delete it before creating a new one.
++    if (thread) {
++        thread->join();
++        delete thread;
++    }
++
+     const string addrArgs(";{create:never,node:{type:topic}}");
+     const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
+     attributes["_direct_subject"] = routableAddr;
+@@ -299,34 +191,47 @@ void AgentSessionImpl::open()
+ }
+ 
+ 
+-void AgentSessionImpl::close()
++void AgentSessionImpl::closeAsync()
+ {
+     if (!opened)
+         return;
+ 
+-    // Stop and join the receiver thread
++    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
+     threadCanceled = true;
+-    thread->join();
+-    delete thread;
+-
+-    // Close the AMQP session
+-    session.close();
+     opened = false;
+ }
+ 
+ 
++void AgentSessionImpl::close()
++{
++    closeAsync();
++
++    if (thread) {
++        thread->join();
++        delete thread;
++        thread = 0;
++    }
++}
++
++
+ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
+ {
+     uint64_t milliseconds = timeout.getMilliseconds();
+     qpid::sys::Mutex::ScopedLock l(lock);
+ 
+-    if (eventQueue.empty() && milliseconds > 0)
+-        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
+-                                           qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
++    if (eventQueue.empty() && milliseconds > 0) {
++        int64_t nsecs(qpid::sys::TIME_INFINITE);
++        if ((uint64_t)(nsecs / 1000000) > milliseconds)
++            nsecs = (int64_t) milliseconds * 1000000;
++        qpid::sys::Duration then(nsecs);
++        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
++    }
  
- AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
- AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
-@@ -345,6 +218,8 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
      if (!eventQueue.empty()) {
          event = eventQueue.front();
          eventQueue.pop();
@@ -1137,7 +709,7 @@ index a88782d..d92b2a4 100644
          return true;
      }
  
-@@ -359,6 +234,19 @@ int AgentSessionImpl::pendingEvents() const
+@@ -341,6 +246,19 @@ int AgentSessionImpl::pendingEvents() const
  }
  
  
@@ -1157,7 +729,7 @@ index a88782d..d92b2a4 100644
  void AgentSessionImpl::registerSchema(Schema& schema)
  {
      if (!schema.isFinalized())
-@@ -614,8 +502,10 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event)
+@@ -596,8 +514,10 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event)
      qpid::sys::Mutex::ScopedLock l(lock);
      bool notify = eventQueue.empty();
      eventQueue.push(event);
@@ -1169,7 +741,7 @@ index a88782d..d92b2a4 100644
  }
  
  
-@@ -1059,6 +949,13 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
+@@ -1041,6 +961,13 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
  }
  
  
@@ -1183,7 +755,20 @@ index a88782d..d92b2a4 100644
  void AgentSessionImpl::run()
  {
      QPID_LOG(debug, "AgentSession thread started for agent " << agentName);
-@@ -1089,3 +986,15 @@ void AgentSessionImpl::run()
+@@ -1050,7 +977,7 @@ void AgentSessionImpl::run()
+             periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC);
+ 
+             Receiver rx;
+-            bool valid = session.nextReceiver(rx, Duration::SECOND);
++            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
+             if (threadCanceled)
+                 break;
+             if (valid) {
+@@ -1067,6 +994,19 @@ void AgentSessionImpl::run()
+         enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
+     }
+ 
++    session.close();
      QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
  }
  
@@ -1201,10 +786,10 @@ index a88782d..d92b2a4 100644
 +
 diff --git a/qpid/cpp/src/qmf/AgentSessionImpl.h b/qpid/cpp/src/qmf/AgentSessionImpl.h
 new file mode 100644
-index 0000000..cf1b1d7
+index 0000000..9039a59
 --- /dev/null
 +++ b/qpid/cpp/src/qmf/AgentSessionImpl.h
-@@ -0,0 +1,175 @@
+@@ -0,0 +1,176 @@
 +#ifndef __QMF_AGENT_SESSION_IMPL_H
 +#define __QMF_AGENT_SESSION_IMPL_H
 +
@@ -1291,6 +876,7 @@ index 0000000..cf1b1d7
 +        void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
 +        const string& getName() const { return agentName; }
 +        void open();
++        void closeAsync();
 +        void close();
 +        bool nextEvent(AgentEvent& e, Duration t);
 +        int pendingEvents() const;
@@ -1381,10 +967,124 @@ index 0000000..cf1b1d7
 +#endif
 +
 diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
-index af83595..d084b8a 100644
+index 5df0d83..2dfc894 100644
 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp
 +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
-@@ -237,6 +237,8 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
+@@ -66,9 +66,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
+ //========================================================================================
+ 
+ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
+-    connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false),
+-    opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+-    connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
++    connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
++    opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
++    connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
+ {
+     if (!options.empty()) {
+         qpid::messaging::AddressParser parser(options);
+@@ -92,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
+         iter = optMap.find("strict-security");
+         if (iter != optMap.end())
+             strictSecurity = iter->second.asBool();
++
++        iter = optMap.find("max-thread-wait-time");
++        if (iter != optMap.end())
++            maxThreadWaitTime = iter->second.asUint32();
+     }
++
++    if (maxThreadWaitTime > 60)
++        maxThreadWaitTime = 60;
+ }
+ 
+ 
+@@ -100,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl()
+ {
+     if (opened)
+         close();
++
++    if (thread) {
++        thread->join();
++        delete thread;
++    }
+ }
+ 
+ 
+@@ -154,6 +166,12 @@ void ConsoleSessionImpl::open()
+     if (opened)
+         throw QmfException("The session is already open");
+ 
++    // If the thread exists, join and delete it before creating a new one.
++    if (thread) {
++        thread->join();
++        delete thread;
++    }
++
+     // Establish messaging addresses
+     directBase = "qmf." + domain + ".direct";
+     topicBase = "qmf." + domain + ".topic";
+@@ -182,45 +200,57 @@ void ConsoleSessionImpl::open()
+ 
+     // Start the receiver thread
+     threadCanceled = false;
++    opened = true;
+     thread = new qpid::sys::Thread(*this);
+ 
+     // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
+     sendBrokerLocate();
+     if (agentQuery)
+         sendAgentLocate();
+-
+-    opened = true;
+ }
+ 
+ 
+-void ConsoleSessionImpl::close()
++void ConsoleSessionImpl::closeAsync()
+ {
+     if (!opened)
+         throw QmfException("The session is already closed");
+ 
+-    // Stop and join the receiver thread
++    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
+     threadCanceled = true;
+-    thread->join();
+-    delete thread;
+-
+-    // Close the AMQP session
+-    session.close();
+     opened = false;
+ }
+ 
+ 
++void ConsoleSessionImpl::close()
++{
++    closeAsync();
++
++    if (thread) {
++        thread->join();
++        delete thread;
++        thread = 0;
++    }
++}
++
++
+ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
+ {
+     uint64_t milliseconds = timeout.getMilliseconds();
+     qpid::sys::Mutex::ScopedLock l(lock);
+ 
+-    if (eventQueue.empty() && milliseconds > 0)
+-        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
+-                                           qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
++    if (eventQueue.empty() && milliseconds > 0) {
++        int64_t nsecs(qpid::sys::TIME_INFINITE);
++        if ((uint64_t)(nsecs / 1000000) > milliseconds)
++            nsecs = (int64_t) milliseconds * 1000000;
++        qpid::sys::Duration then(nsecs);
++        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
++    }
+ 
      if (!eventQueue.empty()) {
          event = eventQueue.front();
          eventQueue.pop();
@@ -1393,28 +1093,28 @@ index af83595..d084b8a 100644
          return true;
      }
  
-@@ -251,6 +253,20 @@ int ConsoleSessionImpl::pendingEvents() const
+@@ -235,6 +265,20 @@ int ConsoleSessionImpl::pendingEvents() const
  }
  
  
 +void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
 +{
 +    qpid::sys::Mutex::ScopedLock l(lock);
-+    this->eventNotifier = notifier;
++    eventNotifier = notifier;
 +}
 +
 +
 +EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
 +{
 +    qpid::sys::Mutex::ScopedLock l(lock);
-+    return this->eventNotifier;
++    return eventNotifier;
 +}
 +
 +
  uint32_t ConsoleSessionImpl::getAgentCount() const
  {
      qpid::sys::Mutex::ScopedLock l(lock);
-@@ -292,8 +308,10 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event)
+@@ -276,8 +320,10 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event)
  {
      bool notify = eventQueue.empty();
      eventQueue.push(event);
@@ -1426,7 +1126,7 @@ index af83595..d084b8a 100644
  }
  
  
-@@ -602,6 +620,13 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
+@@ -586,6 +632,13 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
  }
  
  
@@ -1440,7 +1140,20 @@ index af83595..d084b8a 100644
  void ConsoleSessionImpl::run()
  {
      QPID_LOG(debug, "ConsoleSession thread started");
-@@ -633,3 +658,14 @@ void ConsoleSessionImpl::run()
+@@ -596,7 +649,7 @@ void ConsoleSessionImpl::run()
+                                qpid::sys::TIME_SEC);
+ 
+             Receiver rx;
+-            bool valid = session.nextReceiver(rx, Duration::SECOND);
++            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
+             if (threadCanceled)
+                 break;
+             if (valid) {
+@@ -613,6 +666,18 @@ void ConsoleSessionImpl::run()
+         enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
+     }
+ 
++    session.close();
      QPID_LOG(debug, "ConsoleSession thread exiting");
  }
  
@@ -1456,7 +1169,7 @@ index af83595..d084b8a 100644
 +  return *session.impl;
 +}
 diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-index 478d24e..660fc9b 100644
+index 411b3f0..2f1f631 100644
 --- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h
 +++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
 @@ -27,6 +27,7 @@
@@ -1482,7 +1195,11 @@ index 478d24e..660fc9b 100644
  namespace qmf {
      class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
      public:
-@@ -59,6 +65,10 @@ namespace qmf {
+@@ -56,9 +62,14 @@ namespace qmf {
+         void setDomain(const std::string& d) { domain = d; }
+         void setAgentFilter(const std::string& f);
+         void open();
++        void closeAsync();
          void close();
          bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t);
          int pendingEvents() const;
@@ -1493,7 +1210,11 @@ index 478d24e..660fc9b 100644
          uint32_t getAgentCount() const;
          Agent getAgent(uint32_t i) const;
          Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; }
-@@ -80,6 +90,7 @@ namespace qmf {
+@@ -76,9 +87,11 @@ namespace qmf {
+         uint32_t maxAgentAgeMinutes;
+         bool listenOnDirect;
+         bool strictSecurity;
++        uint32_t maxThreadWaitTime;
          Query agentQuery;
          bool opened;
          std::queue<ConsoleEvent> eventQueue;
@@ -1501,13 +1222,22 @@ index 478d24e..660fc9b 100644
          qpid::sys::Thread* thread;
          bool threadCanceled;
          uint64_t lastVisit;
-@@ -102,11 +113,17 @@ namespace qmf {
+@@ -90,6 +103,8 @@ namespace qmf {
+         std::string directBase;
+         std::string topicBase;
+         boost::shared_ptr<SchemaCache> schemaCache;
++        qpid::sys::Mutex corrlock;
++        uint32_t nextCorrelator;
+ 
+         void enqueueEvent(const ConsoleEvent&);
+         void enqueueEventLH(const ConsoleEvent&);
+@@ -99,10 +114,17 @@ namespace qmf {
          void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&);
          void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
          void periodicProcessing(uint64_t);
 +        void alertEventNotifierLH(bool readable);
          void run();
-         uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
++        uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
  
          friend class AgentImpl;
      };
@@ -1518,7 +1248,54 @@ index 478d24e..660fc9b 100644
 +    };
  }
  
- #endif
+ #endif
+diff --git a/qpid/cpp/src/qmf/DataAddr.cpp b/qpid/cpp/src/qmf/DataAddr.cpp
+index fb51d57..d16e120 100644
+--- a/qpid/cpp/src/qmf/DataAddr.cpp
++++ b/qpid/cpp/src/qmf/DataAddr.cpp
+@@ -36,7 +36,9 @@ DataAddr::~DataAddr() { PI::dtor(*this); }
+ DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); }
+ 
+ bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; }
++bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; }
+ bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; }
++bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; }
+ 
+ DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); }
+ DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); }
+@@ -45,7 +47,7 @@ const string& DataAddr::getAgentName() const { return impl->getAgentName(); }
+ uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); }
+ Variant::Map DataAddr::asMap() const { return impl->asMap(); }
+ 
+-bool DataAddrImpl::operator==(const DataAddrImpl& other)
++bool DataAddrImpl::operator==(const DataAddrImpl& other) const
+ {
+     return
+         agentName == other.agentName &&
+@@ -54,7 +56,7 @@ bool DataAddrImpl::operator==(const DataAddrImpl& other)
+ }
+ 
+ 
+-bool DataAddrImpl::operator<(const DataAddrImpl& other)
++bool DataAddrImpl::operator<(const DataAddrImpl& other) const
+ {
+     if (agentName < other.agentName) return true;
+     if (agentName > other.agentName) return false;
+diff --git a/qpid/cpp/src/qmf/DataAddrImpl.h b/qpid/cpp/src/qmf/DataAddrImpl.h
+index 3f9cae9..11d512f 100644
+--- a/qpid/cpp/src/qmf/DataAddrImpl.h
++++ b/qpid/cpp/src/qmf/DataAddrImpl.h
+@@ -38,8 +38,8 @@ namespace qmf {
+         //
+         // Methods from API handle
+         //
+-        bool operator==(const DataAddrImpl&);
+-        bool operator<(const DataAddrImpl&);
++        bool operator==(const DataAddrImpl&) const;
++        bool operator<(const DataAddrImpl&) const;
+         DataAddrImpl(const qpid::types::Variant::Map&);
+         DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) :
+             agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {}
 diff --git a/qpid/cpp/src/qmf/EventNotifierImpl.cpp b/qpid/cpp/src/qmf/EventNotifierImpl.cpp
 new file mode 100644
 index 0000000..20114aa
@@ -1637,10 +1414,10 @@ index 0000000..d85f997
 +
 diff --git a/qpid/cpp/src/qmf/PosixEventNotifier.cpp b/qpid/cpp/src/qmf/PosixEventNotifier.cpp
 new file mode 100644
-index 0000000..b5c7121
+index 0000000..a364cc1
 --- /dev/null
 +++ b/qpid/cpp/src/qmf/PosixEventNotifier.cpp
-@@ -0,0 +1,63 @@
+@@ -0,0 +1,65 @@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
@@ -1669,6 +1446,8 @@ index 0000000..b5c7121
 +
 +typedef qmf::PrivateImplRef<posix::EventNotifier> PI;
 +
++posix::EventNotifier::EventNotifier(PosixEventNotifierImpl* impl) { PI::ctor(*this, impl); }
++
 +posix::EventNotifier::EventNotifier(AgentSession& agentSession)
 +{
 +    PI::ctor(*this, new PosixEventNotifierImpl(agentSession));
@@ -1706,10 +1485,10 @@ index 0000000..b5c7121
 +
 diff --git a/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp b/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp
 new file mode 100644
-index 0000000..abc9cad
+index 0000000..011dbcc
 --- /dev/null
 +++ b/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp
-@@ -0,0 +1,108 @@
+@@ -0,0 +1,112 @@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
@@ -1730,516 +1509,165 @@ index 0000000..abc9cad
 + */
 +
 +#include "PosixEventNotifierImpl.h"
++#include "qpid/log/Statement.h"
 +
 +#include <fcntl.h>
 +#include <unistd.h>
++#include <errno.h>
 +
 +#define BUFFER_SIZE 10
 +
-+using namespace qmf;
-+
-+PosixEventNotifierImpl::PosixEventNotifierImpl(AgentSession& agentSession)
-+    : EventNotifierImpl(agentSession)
-+{
-+    openHandle();
-+}
-+
-+
-+PosixEventNotifierImpl::PosixEventNotifierImpl(ConsoleSession& consoleSession)
-+    : EventNotifierImpl(consoleSession)
-+{
-+    openHandle();
-+}
-+
-+
-+PosixEventNotifierImpl::~PosixEventNotifierImpl()
-+{
-+    closeHandle();
-+}
-+
-+
-+void PosixEventNotifierImpl::update(bool readable)
-+{
-+    char buffer[BUFFER_SIZE];
-+
-+    if(readable && !this->isReadable()) {
-+        (void) ::write(myHandle, "1", 1);
-+    }
-+    else if(!readable && this->isReadable()) {
-+        (void) ::read(yourHandle, buffer, BUFFER_SIZE);
-+    }
-+}
-+
-+
-+void PosixEventNotifierImpl::openHandle()
-+{
-+    int pair[2];
-+
-+    if(::pipe(pair) == -1)
-+        throw QmfException("Unable to open event notifier handle.");
-+
-+    yourHandle = pair[0];
-+    myHandle = pair[1];
-+
-+    int flags;
-+
-+    flags = ::fcntl(yourHandle, F_GETFL);
-+    if((::fcntl(yourHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
-+        throw QmfException("Unable to make remote handle non-blocking.");
-+
-+    flags = ::fcntl(myHandle, F_GETFL);
-+    if((::fcntl(myHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
-+        throw QmfException("Unable to make local handle non-blocking.");
-+}
-+
-+
-+void PosixEventNotifierImpl::closeHandle()
-+{
-+    if(myHandle > 0) {
-+        ::close(myHandle);
-+        myHandle = -1;
-+    }
-+
-+    if(yourHandle > 0) {
-+        ::close(yourHandle);
-+        yourHandle = -1;
-+    }
-+}
-+
-+
-+PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(posix::EventNotifier& notifier)
-+{
-+    return *notifier.impl;
-+}
-+
-+
-+const PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(const posix::EventNotifier& notifier)
-+{
-+    return *notifier.impl;
-+}
-+
-diff --git a/qpid/cpp/src/qmf/PosixEventNotifierImpl.h b/qpid/cpp/src/qmf/PosixEventNotifierImpl.h
-new file mode 100644
-index 0000000..c8a7446
---- /dev/null
-+++ b/qpid/cpp/src/qmf/PosixEventNotifierImpl.h
-@@ -0,0 +1,61 @@
-+#ifndef __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
-+#define __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
-+
-+/*
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
-+ *
-+ *   http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing,
-+ * software distributed under the License is distributed on an
-+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-+ * KIND, either express or implied.  See the License for the
-+ * specific language governing permissions and limitations
-+ * under the License.
-+ */
-+
-+#include "qmf/posix/EventNotifier.h"
-+#include "qmf/EventNotifierImpl.h"
-+#include "qpid/RefCounted.h"
-+
-+namespace qmf
-+{
-+    class AgentSession;
-+    class ConsoleSession;
-+
-+    class PosixEventNotifierImpl : public EventNotifierImpl, public virtual qpid::RefCounted
-+    {
-+    public:
-+        PosixEventNotifierImpl(AgentSession& agentSession);
-+        PosixEventNotifierImpl(ConsoleSession& consoleSession);
-+        virtual ~PosixEventNotifierImpl();
-+
-+        int getHandle() const { return yourHandle; }
-+
-+    private:
-+        int myHandle;
-+        int yourHandle;
-+
-+        void openHandle();
-+        void closeHandle();
-+
-+    protected:
-+        void update(bool readable);
-+    };
-+
-+    struct PosixEventNotifierImplAccess
-+    {
-+        static PosixEventNotifierImpl& get(posix::EventNotifier& notifier);
-+        static const PosixEventNotifierImpl& get(const posix::EventNotifier& notifier);
-+    };
-+
-+}
-+
-+#endif
-+
-diff --git a/qpid/cpp/src/tests/Qmf2.cpp b/qpid/cpp/src/tests/Qmf2.cpp
-index 66c774a..bc263d5 100644
---- a/qpid/cpp/src/tests/Qmf2.cpp
-+++ b/qpid/cpp/src/tests/Qmf2.cpp
-@@ -23,12 +23,36 @@
- #include "qmf/QueryImpl.h"
- #include "qmf/SchemaImpl.h"
- #include "qmf/exceptions.h"
--
-+#include "qpid/messaging/Connection.h"
-+#include "qmf/PosixEventNotifierImpl.h"
-+#include "qmf/AgentSession.h"
-+#include "qmf/AgentSessionImpl.h"
-+#include "qmf/ConsoleSession.h"
-+#include "qmf/ConsoleSessionImpl.h"
- #include "unit_test.h"
- 
-+using namespace std;
- using namespace qpid::types;
-+using namespace qpid::messaging;
- using namespace qmf;
- 
-+bool isReadable(int fd)
-+{
-+    fd_set rfds;
-+    struct timeval tv;
-+    int nfds, result;
-+
-+    FD_ZERO(&rfds);
-+    FD_SET(fd, &rfds);
-+    nfds = fd + 1;
-+    tv.tv_sec = 0;
-+    tv.tv_usec = 0;
-+
-+    result = select(nfds, &rfds, NULL, NULL, &tv);
-+
-+    return result > 0;
-+}
-+
- namespace qpid {
- namespace tests {
- 
-@@ -315,6 +339,84 @@ QPID_AUTO_TEST_CASE(testSchema)
-     BOOST_CHECK_THROW(method.getArgument(3), QmfException);
- }
- 
-+QPID_AUTO_TEST_CASE(testAgentSessionEventListener)
-+{
-+    Connection connection("localhost");
-+    AgentSession session(connection, "");
-+    posix::EventNotifier notifier(session);
-+
-+    AgentSessionImpl& sessionImpl = AgentSessionImplAccess::get(session);
-+            
-+    BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
-+}
-+
-+QPID_AUTO_TEST_CASE(testConsoleSessionEventListener)
-+{
-+    Connection connection("localhost");
-+    ConsoleSession session(connection, "");
-+    posix::EventNotifier notifier(session);
-+
-+    ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
-+
-+    BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
-+}
-+
-+QPID_AUTO_TEST_CASE(testGetHandle)
-+{
-+    Connection connection("localhost");
-+    ConsoleSession session(connection, "");
-+    posix::EventNotifier notifier(session);
-+
-+    BOOST_CHECK(notifier.getHandle() > 0);
-+}
++using namespace qmf;
 +
-+QPID_AUTO_TEST_CASE(testSetReadableToFalse)
++PosixEventNotifierImpl::PosixEventNotifierImpl(AgentSession& agentSession)
++    : EventNotifierImpl(agentSession)
 +{
-+    Connection connection("localhost");
-+    ConsoleSession session(connection, "");
-+    posix::EventNotifier notifier(session);
-+    PosixEventNotifierImplAccess::get(notifier).setReadable(false);
-+
-+    bool readable(isReadable(notifier.getHandle()));
-+    BOOST_CHECK(!readable);
++    openHandle();
 +}
 +
-+QPID_AUTO_TEST_CASE(testSetReadable)
-+{
-+    Connection connection("localhost");
-+    ConsoleSession session(connection, "");
-+    posix::EventNotifier notifier(session);
-+    PosixEventNotifierImplAccess::get(notifier).setReadable(true);
 +
-+    bool readable(isReadable(notifier.getHandle()));
-+    BOOST_CHECK(readable);
++PosixEventNotifierImpl::PosixEventNotifierImpl(ConsoleSession& consoleSession)
++    : EventNotifierImpl(consoleSession)
++{
++    openHandle();
 +}
 +
-+QPID_AUTO_TEST_CASE(testSetReadableMultiple)
-+{
-+    Connection connection("localhost");
-+    ConsoleSession session(connection, "");
-+    posix::EventNotifier notifier(session);
-+    for (int i = 0; i < 15; i++)
-+        PosixEventNotifierImplAccess::get(notifier).setReadable(true);
-+    PosixEventNotifierImplAccess::get(notifier).setReadable(false);
 +
-+    bool readable(isReadable(notifier.getHandle()));
-+    BOOST_CHECK(!readable);
++PosixEventNotifierImpl::~PosixEventNotifierImpl()
++{
++    closeHandle();
 +}
 +
-+QPID_AUTO_TEST_CASE(testDeleteNotifier)
++
++void PosixEventNotifierImpl::update(bool readable)
 +{
-+    Connection connection("localhost");
-+    ConsoleSession session(connection, "");
-+    ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
-+    {
-+        posix::EventNotifier notifier(session);
-+        BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
-+    }
-+    BOOST_CHECK(sessionImpl.getEventNotifier() == 0);
-+}
++    char buffer[BUFFER_SIZE];
 +
- QPID_AUTO_TEST_SUITE_END()
- 
- }} // namespace qpid::tests
--- 
-1.7.4.4
-
-From 0918bc94f925d413d8021a855cf566f6a55c654c Mon Sep 17 00:00:00 2001
-From: Ted Ross <tross at apache.org>
-Date: Wed, 14 Sep 2011 21:41:11 +0000
-Subject: [PATCH 09/14] QPID-3484 - Fixed handling of unused return values to
- prevent compiler warnings.
-
-git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1170860 13f79535-47bb-0310-9956-ffa450edef68
----
- qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp |    8 ++++++--
- 1 files changed, 6 insertions(+), 2 deletions(-)
-
-diff --git a/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp b/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp
-index abc9cad..011dbcc 100644
---- a/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp
-+++ b/qpid/cpp/src/qmf/PosixEventNotifierImpl.cpp
-@@ -18,9 +18,11 @@
-  */
- 
- #include "PosixEventNotifierImpl.h"
-+#include "qpid/log/Statement.h"
- 
- #include <fcntl.h>
- #include <unistd.h>
-+#include <errno.h>
- 
- #define BUFFER_SIZE 10
- 
-@@ -51,10 +53,12 @@ void PosixEventNotifierImpl::update(bool readable)
-     char buffer[BUFFER_SIZE];
- 
-     if(readable && !this->isReadable()) {
--        (void) ::write(myHandle, "1", 1);
++    if(readable && !this->isReadable()) {
 +        if (::write(myHandle, "1", 1) == -1)
 +            QPID_LOG(error, "PosixEventNotifierImpl::update write failed: " << errno);
-     }
-     else if(!readable && this->isReadable()) {
--        (void) ::read(yourHandle, buffer, BUFFER_SIZE);
++    }
++    else if(!readable && this->isReadable()) {
 +        if (::read(yourHandle, buffer, BUFFER_SIZE) == -1)
 +            QPID_LOG(error, "PosixEventNotifierImpl::update read failed: " << errno);
-     }
- }
- 
--- 
-1.7.4.4
-
-From c1fe836092355bf45c7118517e49d9307a12c7c8 Mon Sep 17 00:00:00 2001
-From: Ted Ross <tross at apache.org>
-Date: Fri, 16 Sep 2011 14:34:39 +0000
-Subject: [PATCH 10/14] QPID-3484 - Added missing constructor for
- EventNotifier, fixed initialization bug.
-
-git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1171592 13f79535-47bb-0310-9956-ffa450edef68
----
- qpid/cpp/include/qmf/posix/EventNotifier.h |    1 +
- qpid/cpp/src/qmf/AgentSession.cpp          |   16 ++++++++++++++--
- qpid/cpp/src/qmf/AgentSessionImpl.h        |    1 +
- qpid/cpp/src/qmf/ConsoleSession.cpp        |   20 ++++++++++++++++----
- qpid/cpp/src/qmf/ConsoleSessionImpl.h      |    1 +
- qpid/cpp/src/qmf/PosixEventNotifier.cpp    |    2 ++
- 6 files changed, 35 insertions(+), 6 deletions(-)
-
-diff --git a/qpid/cpp/include/qmf/posix/EventNotifier.h b/qpid/cpp/include/qmf/posix/EventNotifier.h
-index 91817cc..ebc1cb5 100644
---- a/qpid/cpp/include/qmf/posix/EventNotifier.h
-+++ b/qpid/cpp/include/qmf/posix/EventNotifier.h
-@@ -38,6 +38,7 @@ namespace posix {
- 
-   class QMF_CLASS_EXTERN EventNotifier : public qmf::Handle<qmf::PosixEventNotifierImpl> {
-   public:
-+      QMF_EXTERN EventNotifier(PosixEventNotifierImpl* impl = 0);
-       QMF_EXTERN EventNotifier(::qmf::AgentSession& agentSession);
-       QMF_EXTERN EventNotifier(::qmf::ConsoleSession& consoleSession);
-       QMF_EXTERN EventNotifier(const EventNotifier& that);
-diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp
-index d92b2a4..251c25f 100644
---- a/qpid/cpp/src/qmf/AgentSession.cpp
-+++ b/qpid/cpp/src/qmf/AgentSession.cpp
-@@ -55,7 +55,7 @@ void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); }
- //========================================================================================
- 
- AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
--    connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
-+    connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false),
-     bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
-     externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
-     maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
-@@ -191,7 +191,7 @@ void AgentSessionImpl::open()
- }
- 
- 
--void AgentSessionImpl::close()
-+void AgentSessionImpl::closeAsync()
- {
-     if (!opened)
-         return;
-@@ -202,6 +202,18 @@ void AgentSessionImpl::close()
- }
- 
- 
-+void AgentSessionImpl::close()
++    }
++}
++
++
++void PosixEventNotifierImpl::openHandle()
 +{
-+    closeAsync();
++    int pair[2];
 +
-+    if (thread) {
-+        thread->join();
-+        delete thread;
-+        thread = 0;
-+    }
++    if(::pipe(pair) == -1)
++        throw QmfException("Unable to open event notifier handle.");
++
++    yourHandle = pair[0];
++    myHandle = pair[1];
++
++    int flags;
++
++    flags = ::fcntl(yourHandle, F_GETFL);
++    if((::fcntl(yourHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
++        throw QmfException("Unable to make remote handle non-blocking.");
++
++    flags = ::fcntl(myHandle, F_GETFL);
++    if((::fcntl(myHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
++        throw QmfException("Unable to make local handle non-blocking.");
 +}
 +
 +
- bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
- {
-     uint64_t milliseconds = timeout.getMilliseconds();
-diff --git a/qpid/cpp/src/qmf/AgentSessionImpl.h b/qpid/cpp/src/qmf/AgentSessionImpl.h
-index cf1b1d7..9039a59 100644
---- a/qpid/cpp/src/qmf/AgentSessionImpl.h
-+++ b/qpid/cpp/src/qmf/AgentSessionImpl.h
-@@ -84,6 +84,7 @@ namespace qmf {
-         void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
-         const string& getName() const { return agentName; }
-         void open();
-+        void closeAsync();
-         void close();
-         bool nextEvent(AgentEvent& e, Duration t);
-         int pendingEvents() const;
-diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp
-index d084b8a..2dfc894 100644
---- a/qpid/cpp/src/qmf/ConsoleSession.cpp
-+++ b/qpid/cpp/src/qmf/ConsoleSession.cpp
-@@ -67,7 +67,7 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
- 
- ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
-     connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
--    opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
-+    opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
-     connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
- {
-     if (!options.empty()) {
-@@ -210,7 +210,7 @@ void ConsoleSessionImpl::open()
- }
- 
- 
--void ConsoleSessionImpl::close()
-+void ConsoleSessionImpl::closeAsync()
- {
-     if (!opened)
-         throw QmfException("The session is already closed");
-@@ -221,6 +221,18 @@ void ConsoleSessionImpl::close()
- }
- 
- 
-+void ConsoleSessionImpl::close()
++void PosixEventNotifierImpl::closeHandle()
 +{
-+    closeAsync();
++    if(myHandle > 0) {
++        ::close(myHandle);
++        myHandle = -1;
++    }
 +
-+    if (thread) {
-+        thread->join();
-+        delete thread;
-+        thread = 0;
++    if(yourHandle > 0) {
++        ::close(yourHandle);
++        yourHandle = -1;
 +    }
 +}
 +
 +
- bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
- {
-     uint64_t milliseconds = timeout.getMilliseconds();
-@@ -256,14 +268,14 @@ int ConsoleSessionImpl::pendingEvents() const
- void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
- {
-     qpid::sys::Mutex::ScopedLock l(lock);
--    this->eventNotifier = notifier;
-+    eventNotifier = notifier;
- }
- 
- 
- EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
- {
-     qpid::sys::Mutex::ScopedLock l(lock);
--    return this->eventNotifier;
-+    return eventNotifier;
- }
- 
- 
-diff --git a/qpid/cpp/src/qmf/ConsoleSessionImpl.h b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-index 660fc9b..2f1f631 100644
---- a/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-+++ b/qpid/cpp/src/qmf/ConsoleSessionImpl.h
-@@ -62,6 +62,7 @@ namespace qmf {
-         void setDomain(const std::string& d) { domain = d; }
-         void setAgentFilter(const std::string& f);
-         void open();
-+        void closeAsync();
-         void close();
-         bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t);
-         int pendingEvents() const;
-diff --git a/qpid/cpp/src/qmf/PosixEventNotifier.cpp b/qpid/cpp/src/qmf/PosixEventNotifier.cpp
-index b5c7121..a364cc1 100644
---- a/qpid/cpp/src/qmf/PosixEventNotifier.cpp
-+++ b/qpid/cpp/src/qmf/PosixEventNotifier.cpp
-@@ -26,6 +26,8 @@ using namespace std;
- 
- typedef qmf::PrivateImplRef<posix::EventNotifier> PI;
- 
-+posix::EventNotifier::EventNotifier(PosixEventNotifierImpl* impl) { PI::ctor(*this, impl); }
++PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(posix::EventNotifier& notifier)
++{
++    return *notifier.impl;
++}
++
++
++const PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(const posix::EventNotifier& notifier)
++{
++    return *notifier.impl;
++}
++
+diff --git a/qpid/cpp/src/qmf/PosixEventNotifierImpl.h b/qpid/cpp/src/qmf/PosixEventNotifierImpl.h
+new file mode 100644
+index 0000000..c8a7446
+--- /dev/null
++++ b/qpid/cpp/src/qmf/PosixEventNotifierImpl.h
+@@ -0,0 +1,61 @@
++#ifndef __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
++#define __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
++
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
++#include "qmf/posix/EventNotifier.h"
++#include "qmf/EventNotifierImpl.h"
++#include "qpid/RefCounted.h"
++
++namespace qmf
++{
++    class AgentSession;
++    class ConsoleSession;
++
++    class PosixEventNotifierImpl : public EventNotifierImpl, public virtual qpid::RefCounted
++    {
++    public:
++        PosixEventNotifierImpl(AgentSession& agentSession);
++        PosixEventNotifierImpl(ConsoleSession& consoleSession);
++        virtual ~PosixEventNotifierImpl();
++
++        int getHandle() const { return yourHandle; }
++
++    private:
++        int myHandle;
++        int yourHandle;
++
++        void openHandle();
++        void closeHandle();
++
++    protected:
++        void update(bool readable);
++    };
++
++    struct PosixEventNotifierImplAccess
++    {
++        static PosixEventNotifierImpl& get(posix::EventNotifier& notifier);
++        static const PosixEventNotifierImpl& get(const posix::EventNotifier& notifier);
++    };
++
++}
++
++#endif
 +
- posix::EventNotifier::EventNotifier(AgentSession& agentSession)
- {
-     PI::ctor(*this, new PosixEventNotifierImpl(agentSession));
--- 
-1.7.4.4
-
-From e5dda70d9a87b570a7386071762860d9a5eae107 Mon Sep 17 00:00:00 2001
-From: Ted Ross <tross at redhat.com>
-Date: Wed, 14 Sep 2011 14:59:18 -0400
-Subject: [PATCH 11/14] unused-result.patch
-
----
- qpid/cpp/src/qmf/engine/ResilientConnection.cpp |    4 ++--
- qpid/cpp/src/qpid/broker/Daemon.cpp             |   10 +++++-----
- qpid/cpp/src/qpid/sys/posix/LockFile.cpp        |    2 +-
- qpid/cpp/src/tests/BrokerMgmtAgent.cpp          |    2 +-
- qpid/cpp/src/tests/ForkedBroker.cpp             |    2 +-
- 5 files changed, 10 insertions(+), 10 deletions(-)
-
 diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
 index 41dd9ff..851193c 100644
 --- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
@@ -2262,6 +1690,40 @@ index 41dd9ff..851193c 100644
      }
  }
  
+diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+index 633401e..f183ff8 100644
+--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
++++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+@@ -1378,13 +1378,26 @@ bool ManagementAgentImpl::ConnectionThread::isSleeping() const
+ 
+ void ManagementAgentImpl::PublishThread::run()
+ {
+-    uint16_t    totalSleep;
++    uint16_t totalSleep;
++    uint16_t sleepTime;
+ 
+     while (!shutdown) {
+         agent.periodicProcessing();
+         totalSleep = 0;
+-        while (totalSleep++ < agent.getInterval() && !shutdown) {
+-            ::sleep(1);
++
++        //
++        // Calculate a sleep time that is no greater than 5 seconds and
++        // no less than 1 second.
++        //
++        sleepTime = agent.getInterval();
++        if (sleepTime > 5)
++            sleepTime = 5;
++        else if (sleepTime == 0)
++            sleepTime = 1;
++
++        while (totalSleep < agent.getInterval() && !shutdown) {
++            ::sleep(sleepTime);
++            totalSleep += sleepTime;
+         }
+     }
+ }
 diff --git a/qpid/cpp/src/qpid/broker/Daemon.cpp b/qpid/cpp/src/qpid/broker/Daemon.cpp
 index c36538b..281345b 100644
 --- a/qpid/cpp/src/qpid/broker/Daemon.cpp
@@ -2285,6 +1747,42 @@ index c36538b..281345b 100644
          }
      }
      else {                      // Parent
+diff --git a/qpid/cpp/src/qpid/messaging/AddressParser.h b/qpid/cpp/src/qpid/messaging/AddressParser.h
+index 1635331..c51200c 100644
+--- a/qpid/cpp/src/qpid/messaging/AddressParser.h
++++ b/qpid/cpp/src/qpid/messaging/AddressParser.h
+@@ -26,13 +26,13 @@
+ namespace qpid {
+ namespace messaging {
+ 
+-class AddressParser
++class QPID_MESSAGING_CLASS_EXTERN AddressParser
+ {
+   public:
+-    AddressParser(const std::string&);
+-    bool parse(Address& address);
+-    bool parseMap(qpid::types::Variant::Map& map);
+-    bool parseList(qpid::types::Variant::List& list);
++    QPID_MESSAGING_EXTERN AddressParser(const std::string&);
++    QPID_MESSAGING_EXTERN bool parse(Address& address);
++    QPID_MESSAGING_EXTERN bool parseMap(qpid::types::Variant::Map& map);
++    QPID_MESSAGING_EXTERN bool parseList(qpid::types::Variant::List& list);
+   private:
+     const std::string& input;
+     std::string::size_type current;
+diff --git a/qpid/cpp/src/qpid/store/StorageProvider.h b/qpid/cpp/src/qpid/store/StorageProvider.h
+index bc8d187..d162cc5 100644
+--- a/qpid/cpp/src/qpid/store/StorageProvider.h
++++ b/qpid/cpp/src/qpid/store/StorageProvider.h
+@@ -54,7 +54,7 @@ struct QueueEntry {
+     QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "")
+         : queueId(id), tplStatus(tpl), xid(x) {}
+ 
+-    bool operator==(const QueueEntry& rhs) {
++    bool operator==(const QueueEntry& rhs) const {
+         if (queueId != rhs.queueId) return false;
+         if (tplStatus == NONE && rhs.tplStatus == NONE) return true;
+         return xid == rhs.xid;
 diff --git a/qpid/cpp/src/qpid/sys/posix/LockFile.cpp b/qpid/cpp/src/qpid/sys/posix/LockFile.cpp
 index f5a6c29..c1f1c37 100755
 --- a/qpid/cpp/src/qpid/sys/posix/LockFile.cpp
@@ -2324,77 +1822,135 @@ index 10674b5..de1b42d 100644
      }
  }
  
--- 
-1.7.4.4
-
-From caec0216a72bf1c6bd8094b03337f0181f5ba07d Mon Sep 17 00:00:00 2001
-From: Ted Ross <tross at redhat.com>
-Date: Tue, 20 Sep 2011 11:09:10 -0400
-Subject: [PATCH 12/14] In CMake build, only build the posix event notifier
- (qmf2) for linux, not Windows.
-
----
- qpid/cpp/src/CMakeLists.txt |   16 +++++++++++++---
- 1 files changed, 13 insertions(+), 3 deletions(-)
-
-diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
-index 2986a90..0be42b8 100644
---- a/qpid/cpp/src/CMakeLists.txt
-+++ b/qpid/cpp/src/CMakeLists.txt
-@@ -1093,7 +1093,6 @@ install_pdb (qmf ${QPID_COMPONENT_QMF})
-         ../include/qmf/exceptions.h
-         ../include/qmf/Handle.h
-         ../include/qmf/ImportExport.h
--        ../include/qmf/posix/EventNotifier.h
-         ../include/qmf/Query.h
-         ../include/qmf/Schema.h
-         ../include/qmf/SchemaId.h
-@@ -1125,8 +1124,6 @@ install_pdb (qmf ${QPID_COMPONENT_QMF})
-         qmf/DataImpl.h
-         qmf/EventNotifierImpl.h
-         qmf/EventNotifierImpl.cpp
--        qmf/PosixEventNotifier.cpp
--        qmf/PosixEventNotifierImpl.cpp
-         qmf/exceptions.cpp
-         qmf/Expression.cpp
-         qmf/Expression.h
-@@ -1149,6 +1146,19 @@ install_pdb (qmf ${QPID_COMPONENT_QMF})
-         qmf/SubscriptionImpl.h
-         )
+diff --git a/qpid/cpp/src/tests/Qmf2.cpp b/qpid/cpp/src/tests/Qmf2.cpp
+index 66c774a..bc263d5 100644
+--- a/qpid/cpp/src/tests/Qmf2.cpp
++++ b/qpid/cpp/src/tests/Qmf2.cpp
+@@ -23,12 +23,36 @@
+ #include "qmf/QueryImpl.h"
+ #include "qmf/SchemaImpl.h"
+ #include "qmf/exceptions.h"
+-
++#include "qpid/messaging/Connection.h"
++#include "qmf/PosixEventNotifierImpl.h"
++#include "qmf/AgentSession.h"
++#include "qmf/AgentSessionImpl.h"
++#include "qmf/ConsoleSession.h"
++#include "qmf/ConsoleSessionImpl.h"
+ #include "unit_test.h"
  
-+if(NOT WIN32)
-+    set (qmf2_HEADERS
-+        ${qmf2_HEADERS}
-+        ../include/qmf/posix/EventNotifier.h
-+        )
++using namespace std;
+ using namespace qpid::types;
++using namespace qpid::messaging;
+ using namespace qmf;
+ 
++bool isReadable(int fd)
++{
++    fd_set rfds;
++    struct timeval tv;
++    int nfds, result;
 +
-+    set (qmf2_SOURCES
-+        ${qmf2_SOURCES}
-+        qmf/PosixEventNotifier.cpp
-+        qmf/PosixEventNotifierImpl.cpp
-+        )
-+endif (NOT WIN32)
++    FD_ZERO(&rfds);
++    FD_SET(fd, &rfds);
++    nfds = fd + 1;
++    tv.tv_sec = 0;
++    tv.tv_usec = 0;
 +
-     add_msvc_version (qmf2 library dll)
-     add_library (qmf2 SHARED ${qmf2_SOURCES})
-     target_link_libraries (qmf2 qpidmessaging qpidtypes qpidclient qpidcommon)
--- 
-1.7.4.4
-
-From 258294f134d42cf131e2bcbcb5d77482664efece Mon Sep 17 00:00:00 2001
-From: Nuno Santos <nsantos at apache.org>
-Date: Thu, 8 Sep 2011 20:45:33 +0000
-Subject: [PATCH 13/14] QPID-3437: qpid-config address option confusing in
- help
-
-git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1166897 13f79535-47bb-0310-9956-ffa450edef68
-(cherry picked from commit 3ee5b3a8b77426987bff6b65f147b27a99b027c3)
----
- qpid/tools/src/py/qpid-config |    7 +------
- 1 files changed, 1 insertions(+), 6 deletions(-)
-
++    result = select(nfds, &rfds, NULL, NULL, &tv);
++
++    return result > 0;
++}
++
+ namespace qpid {
+ namespace tests {
+ 
+@@ -315,6 +339,84 @@ QPID_AUTO_TEST_CASE(testSchema)
+     BOOST_CHECK_THROW(method.getArgument(3), QmfException);
+ }
+ 
++QPID_AUTO_TEST_CASE(testAgentSessionEventListener)
++{
++    Connection connection("localhost");
++    AgentSession session(connection, "");
++    posix::EventNotifier notifier(session);
++
++    AgentSessionImpl& sessionImpl = AgentSessionImplAccess::get(session);
++            
++    BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
++}
++
++QPID_AUTO_TEST_CASE(testConsoleSessionEventListener)
++{
++    Connection connection("localhost");
++    ConsoleSession session(connection, "");
++    posix::EventNotifier notifier(session);
++
++    ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
++
++    BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
++}
++
++QPID_AUTO_TEST_CASE(testGetHandle)
++{
++    Connection connection("localhost");
++    ConsoleSession session(connection, "");
++    posix::EventNotifier notifier(session);
++
++    BOOST_CHECK(notifier.getHandle() > 0);
++}
++
++QPID_AUTO_TEST_CASE(testSetReadableToFalse)
++{
++    Connection connection("localhost");
++    ConsoleSession session(connection, "");
++    posix::EventNotifier notifier(session);
++    PosixEventNotifierImplAccess::get(notifier).setReadable(false);
++
++    bool readable(isReadable(notifier.getHandle()));
++    BOOST_CHECK(!readable);
++}
++
++QPID_AUTO_TEST_CASE(testSetReadable)
++{
++    Connection connection("localhost");
++    ConsoleSession session(connection, "");
++    posix::EventNotifier notifier(session);
++    PosixEventNotifierImplAccess::get(notifier).setReadable(true);
++
++    bool readable(isReadable(notifier.getHandle()));
++    BOOST_CHECK(readable);
++}
++
++QPID_AUTO_TEST_CASE(testSetReadableMultiple)
++{
++    Connection connection("localhost");
++    ConsoleSession session(connection, "");
++    posix::EventNotifier notifier(session);
++    for (int i = 0; i < 15; i++)
++        PosixEventNotifierImplAccess::get(notifier).setReadable(true);
++    PosixEventNotifierImplAccess::get(notifier).setReadable(false);
++
++    bool readable(isReadable(notifier.getHandle()));
++    BOOST_CHECK(!readable);
++}
++
++QPID_AUTO_TEST_CASE(testDeleteNotifier)
++{
++    Connection connection("localhost");
++    ConsoleSession session(connection, "");
++    ConsoleSessionImpl& sessionImpl = ConsoleSessionImplAccess::get(session);
++    {
++        posix::EventNotifier notifier(session);
++        BOOST_CHECK(sessionImpl.getEventNotifier() != 0);
++    }
++    BOOST_CHECK(sessionImpl.getEventNotifier() == 0);
++}
++
+ QPID_AUTO_TEST_SUITE_END()
+ 
+ }} // namespace qpid::tests
 diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
-index b6eb055..4ed1ea7 100755
+index b6eb055..4af3c84 100755
 --- a/qpid/tools/src/py/qpid-config
 +++ b/qpid/tools/src/py/qpid-config
 @@ -39,11 +39,6 @@ Usage:  qpid-config [OPTIONS]
@@ -2409,7 +1965,15 @@ index b6eb055..4ed1ea7 100755
  Examples:
  
  $ qpid-config add queue q
-@@ -159,7 +154,7 @@ def OptionsAndArguments(argv):
+@@ -102,6 +97,7 @@ class Config:
+         self._flowStopSize      = None
+         self._flowResumeSize    = None
+         self._extra_arguments   = []
++        self._returnCode        = 0
+ 
+ config = Config()
+ 
+@@ -159,7 +155,7 @@ def OptionsAndArguments(argv):
      group1 = OptionGroup(parser, "General Options")
      group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
      group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list")
@@ -2418,35 +1982,7 @@ index b6eb055..4ed1ea7 100755
      group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
      parser.add_option_group(group1)
  
--- 
-1.7.4.4
-
-From 0b178e6d01d6a3c67fa55086394cf53132909cce Mon Sep 17 00:00:00 2001
-From: Nuno Santos <nsantos at apache.org>
-Date: Thu, 8 Sep 2011 20:27:49 +0000
-Subject: [PATCH 14/14] make 'qpid-config queues/exchanges
- <queue/exchange_name>' return proper error code, for
- scripting purposes
-
-git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1166888 13f79535-47bb-0310-9956-ffa450edef68
-(cherry picked from commit 7d5bce20c5ca6bc3869cbe2f6a73f1968c7a5abb)
----
- qpid/tools/src/py/qpid-config |   18 ++++++++++++++++--
- 1 files changed, 16 insertions(+), 2 deletions(-)
-
-diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
-index 4ed1ea7..4af3c84 100755
---- a/qpid/tools/src/py/qpid-config
-+++ b/qpid/tools/src/py/qpid-config
-@@ -97,6 +97,7 @@ class Config:
-         self._flowStopSize      = None
-         self._flowResumeSize    = None
-         self._extra_arguments   = []
-+        self._returnCode        = 0
- 
- config = Config()
- 
-@@ -354,9 +355,16 @@ class BrokerManager:
+@@ -359,9 +355,16 @@ class BrokerManager:
          caption1 = "Type      "
          caption2 = "Exchange Name"
          maxNameLen = len(caption2)
@@ -2463,7 +1999,7 @@ index 4ed1ea7..4af3c84 100755
          print "%s%-*s  Attributes" % (caption1, maxNameLen, caption2)
          line = ""
          for i in range(((maxNameLen + len(caption1)) / 5) + 5):
-@@ -393,12 +401,18 @@ class BrokerManager:
+@@ -398,12 +401,18 @@ class BrokerManager:
  
      def QueueList(self, filter):
          queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
@@ -2483,7 +2019,7 @@ index 4ed1ea7..4af3c84 100755
          print "%-*s  Attributes" % (maxNameLen, caption)
          line = ""
          for i in range((maxNameLen / 5) + 5):
-@@ -670,7 +684,7 @@ def main(argv=None):
+@@ -675,7 +684,7 @@ def main(argv=None):
                  print "Failed: %s: %s" % (e.__class__.__name__, e)
                  return 1
  
@@ -2492,6 +2028,3 @@ index 4ed1ea7..4af3c84 100755
  
  if __name__ == "__main__":
          sys.exit(main())
--- 
-1.7.4.4
-


More information about the scm-commits mailing list