rpms/qpid-cpp/devel mrg_1.3.x.patch, NONE, 1.1 store_1.3.x.patch, NONE, 1.1
Nuno Santos
nsantos at fedoraproject.org
Thu Jul 22 18:35:32 UTC 2010
Author: nsantos
Update of /cvs/pkgs/rpms/qpid-cpp/devel
In directory cvs01.phx2.fedoraproject.org:/tmp/cvs-serv10554
Added Files:
mrg_1.3.x.patch store_1.3.x.patch
Log Message:
Rebased to sync with mrg
mrg_1.3.x.patch:
b/qpid/cpp/bindings/qmf/python/qmf.py | 15
b/qpid/cpp/bindings/qmf/ruby/qmf.rb | 15
b/qpid/cpp/bindings/qpid/dotnet/ReadMe.txt | 33
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/csharp.direct.receiver.cs | 19
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/csharp.direct.receiver.csproj | 12
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.direct.sender/csharp.direct.sender.cs | 15
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.direct.sender/csharp.direct.sender.csproj | 12
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.client/Properties/AssemblyInfo.cs | 36
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.client/csharp.example.client.cs | 70
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.client/csharp.example.client.csproj | 81
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.declare_queues/Properties/AssemblyInfo.cs | 36
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.declare_queues/csharp.example.declare_queues.cs | 61
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.declare_queues/csharp.example.declare_queues.csproj | 81
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.drain/Options.cs | 181 +
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.drain/Properties/AssemblyInfo.cs | 36
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.drain/csharp.example.drain.cs | 85
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.drain/csharp.example.drain.csproj | 82
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.helloworld/Properties/AssemblyInfo.cs | 36
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.helloworld/csharp.example.helloworld.cs | 55
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.helloworld/csharp.example.helloworld.csproj | 81
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.server/Properties/AssemblyInfo.cs | 36
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.server/csharp.example.server.cs | 61
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.server/csharp.example.server.csproj | 81
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.spout/Options.cs | 189 +
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.spout/Properties/AssemblyInfo.cs | 36
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.spout/csharp.example.spout.cs | 117 +
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.spout/csharp.example.spout.csproj | 82
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/Properties/AssemblyInfo.cs | 54
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs | 280 ++
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.csproj | 69
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.sender/Properties/AssemblyInfo.cs | 54
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.sender/csharp.map.callback.sender.cs | 146 +
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.sender/csharp.map.callback.sender.csproj | 66
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.receiver/csharp.map.receiver.csproj | 16
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.receiver/csharp.map.recevier.cs | 4
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.sender/csharp.map.sender.cs | 17
b/qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.sender/csharp.map.sender.csproj | 12
b/qpid/cpp/bindings/qpid/dotnet/examples/powershell.example.helloworld/powershell.example.helloworld.ps1 | 34
b/qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/MyProject/Application.Designer.vb | 13
b/qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/MyProject/Application.myapp | 10
b/qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/MyProject/AssemblyInfo.vb | 35
b/qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/MyProject/Resources.Designer.vb | 63
b/qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/MyProject/Resources.resx | 117 +
b/qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/MyProject/Settings.Designer.vb | 73
b/qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/MyProject/Settings.settings | 7
b/qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/visualbasic.example.client.vb | 69
b/qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/visualbasic.example.client.vbproj | 134 +
b/qpid/cpp/bindings/qpid/dotnet/org.apache.qpid.messaging.sln | 44
b/qpid/cpp/bindings/qpid/dotnet/src/Address.cpp | 186 +
b/qpid/cpp/bindings/qpid/dotnet/src/Address.h | 89
b/qpid/cpp/bindings/qpid/dotnet/src/Connection.cpp | 194 +-
b/qpid/cpp/bindings/qpid/dotnet/src/Connection.h | 21
b/qpid/cpp/bindings/qpid/dotnet/src/Duration.h | 65
b/qpid/cpp/bindings/qpid/dotnet/src/Message.cpp | 2
b/qpid/cpp/bindings/qpid/dotnet/src/Message.h | 82
b/qpid/cpp/bindings/qpid/dotnet/src/QpidException.h | 38
b/qpid/cpp/bindings/qpid/dotnet/src/QpidMarshal.h | 1
b/qpid/cpp/bindings/qpid/dotnet/src/QpidTypeCheck.h | 75
b/qpid/cpp/bindings/qpid/dotnet/src/ReadMe.txt | 8
b/qpid/cpp/bindings/qpid/dotnet/src/Receiver.cpp | 145 +
b/qpid/cpp/bindings/qpid/dotnet/src/Receiver.h | 20
b/qpid/cpp/bindings/qpid/dotnet/src/Sender.cpp | 29
b/qpid/cpp/bindings/qpid/dotnet/src/Sender.h | 33
b/qpid/cpp/bindings/qpid/dotnet/src/Session.cpp | 305 ++-
b/qpid/cpp/bindings/qpid/dotnet/src/Session.h | 47
b/qpid/cpp/bindings/qpid/dotnet/src/TypeTranslator.cpp | 411 ++++
b/qpid/cpp/bindings/qpid/dotnet/src/TypeTranslator.h | 70
b/qpid/cpp/bindings/qpid/dotnet/src/app.rc | 1
b/qpid/cpp/bindings/qpid/dotnet/src/org.apache.qpid.messaging.rc | 101 +
b/qpid/cpp/bindings/qpid/dotnet/src/org.apache.qpid.messaging.vcproj | 34
b/qpid/cpp/bindings/qpid/dotnet/src/resource1.h | 14
b/qpid/cpp/bindings/qpid/dotnet/src/sessionreceiver/Properties/AssemblyInfo.cs | 55
b/qpid/cpp/bindings/qpid/dotnet/src/sessionreceiver/org.apache.qpid.messaging.sessionreceiver.csproj | 65
b/qpid/cpp/bindings/qpid/dotnet/src/sessionreceiver/sessionreceiver.cs | 133 +
b/qpid/cpp/bindings/qpid/dotnet/test/messaging.test/messaging.test.cs | 62
b/qpid/cpp/bindings/qpid/dotnet/test/messaging.test/messaging.test.csproj | 19
b/qpid/cpp/bld-winsdk.ps1 | 19
b/qpid/cpp/configure.ac | 7
b/qpid/cpp/etc/qpidd | 3
b/qpid/cpp/examples/examples.sln | 32
b/qpid/cpp/examples/messaging/messaging_client.vcproj | 221 --
b/qpid/cpp/examples/messaging/messaging_drain.vcproj | 183 -
b/qpid/cpp/examples/messaging/messaging_map_receiver.vcproj | 221 --
b/qpid/cpp/examples/messaging/messaging_map_sender.vcproj | 221 --
b/qpid/cpp/examples/messaging/messaging_server.vcproj | 221 --
b/qpid/cpp/examples/messaging/messaging_spout.vcproj | 182 -
b/qpid/cpp/examples/messaging/spout.cpp | 13
b/qpid/cpp/examples/old-examples.sln | 13
b/qpid/cpp/examples/qmf-console/console.cpp | 1
b/qpid/cpp/examples/qmf-console/ping.cpp | 1
b/qpid/cpp/include/qmf/engine/Console.h | 3
b/qpid/cpp/include/qpid/Address.h | 3
b/qpid/cpp/include/qpid/Url.h | 2
b/qpid/cpp/include/qpid/agent/ManagementAgent.h | 9
b/qpid/cpp/include/qpid/console/Broker.h | 15
b/qpid/cpp/include/qpid/console/SessionManager.h | 14
b/qpid/cpp/include/qpid/framing/FieldValue.h | 3
b/qpid/cpp/include/qpid/management/ConnectionSettings.h | 118 +
b/qpid/cpp/include/qpid/management/ManagementObject.h | 1
b/qpid/cpp/src/CMakeLists.txt | 1
b/qpid/cpp/src/Makefile.am | 2
b/qpid/cpp/src/acl.mk | 4
b/qpid/cpp/src/cluster.mk | 6
b/qpid/cpp/src/qmf.mk | 20
b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp | 30
b/qpid/cpp/src/qmf/engine/ConsoleImpl.h | 2
b/qpid/cpp/src/qmfc.mk | 3
b/qpid/cpp/src/qpid/Url.cpp | 40
b/qpid/cpp/src/qpid/acl/AclData.cpp | 4
b/qpid/cpp/src/qpid/acl/AclValidator.cpp | 3
b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 25
b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 2
b/qpid/cpp/src/qpid/broker/Bridge.cpp | 26
b/qpid/cpp/src/qpid/broker/Bridge.h | 3
b/qpid/cpp/src/qpid/broker/Broker.cpp | 2
b/qpid/cpp/src/qpid/broker/Connection.cpp | 3
b/qpid/cpp/src/qpid/broker/Connection.h | 8
b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 17
b/qpid/cpp/src/qpid/broker/ConnectionHandler.h | 7
b/qpid/cpp/src/qpid/broker/Message.cpp | 6
b/qpid/cpp/src/qpid/broker/Message.h | 1
b/qpid/cpp/src/qpid/broker/Queue.cpp | 1
b/qpid/cpp/src/qpid/broker/Queue.h | 2
b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp | 19
b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h | 10
b/qpid/cpp/src/qpid/broker/SemanticState.cpp | 3
b/qpid/cpp/src/qpid/broker/SemanticState.h | 4
b/qpid/cpp/src/qpid/broker/SessionState.cpp | 3
b/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp | 3
b/qpid/cpp/src/qpid/client/Bounds.cpp | 6
b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 12
b/qpid/cpp/src/qpid/client/ConnectionHandler.h | 9
b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 2
b/qpid/cpp/src/qpid/client/RdmaConnector.cpp | 19
b/qpid/cpp/src/qpid/client/SaslFactory.cpp | 5
b/qpid/cpp/src/qpid/client/SessionImpl.cpp | 2
b/qpid/cpp/src/qpid/client/TCPConnector.cpp | 12
b/qpid/cpp/src/qpid/client/TCPConnector.h | 1
b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp | 3
b/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.h | 3
b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 2
b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 32
b/qpid/cpp/src/qpid/cluster/Cluster.cpp | 2
b/qpid/cpp/src/qpid/cluster/Cluster.h | 3
b/qpid/cpp/src/qpid/cluster/Connection.cpp | 31
b/qpid/cpp/src/qpid/cluster/Connection.h | 20
b/qpid/cpp/src/qpid/cluster/Multicaster.cpp | 1
b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp | 2
b/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp | 6
b/qpid/cpp/src/qpid/cluster/RetractClient.cpp | 2
b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 16
b/qpid/cpp/src/qpid/console/SessionManager.cpp | 28
b/qpid/cpp/src/qpid/framing/FrameSet.cpp | 10
b/qpid/cpp/src/qpid/framing/FrameSet.h | 2
b/qpid/cpp/src/qpid/management/ConnectionSettings.cpp | 41
b/qpid/cpp/src/qpid/management/ManagementAgent.cpp | 46
b/qpid/cpp/src/qpid/management/ManagementAgent.h | 3
b/qpid/cpp/src/qpid/management/ManagementObject.cpp | 37
b/qpid/cpp/src/qpid/messaging/AddressParser.cpp | 4
b/qpid/cpp/src/qpid/messaging/Connection.cpp | 2
b/qpid/cpp/src/qpid/sys/AsynchIO.h | 2
b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp | 11
b/qpid/cpp/src/qpid/sys/ClusterSafe.cpp | 6
b/qpid/cpp/src/qpid/sys/ClusterSafe.h | 17
b/qpid/cpp/src/qpid/sys/DispatchHandle.cpp | 3
b/qpid/cpp/src/qpid/sys/Poller.h | 2
b/qpid/cpp/src/qpid/sys/RdmaIOPlugin.cpp | 12
b/qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp | 46
b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 11
b/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp | 5
b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp | 4
b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h | 3
b/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp | 4
b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 46
b/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h | 30
b/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp | 5
b/qpid/cpp/src/qpid/sys/windows/IocpPoller.cpp | 6
b/qpid/cpp/src/qpid/types/Variant.cpp | 56
b/qpid/cpp/src/replication.mk | 11
b/qpid/cpp/src/ssl.mk | 10
b/qpid/cpp/src/tests/CMakeLists.txt | 56
b/qpid/cpp/src/tests/DispatcherTest.cpp | 9
b/qpid/cpp/src/tests/Makefile.am | 60
b/qpid/cpp/src/tests/MessagingFixture.h | 5
b/qpid/cpp/src/tests/MessagingSessionTests.cpp | 3
b/qpid/cpp/src/tests/cluster_authentication_soak.cpp | 22
b/qpid/cpp/src/tests/cluster_test.cpp | 5
b/qpid/cpp/src/tests/cluster_tests.py | 2
b/qpid/cpp/src/tests/failover_soak.cpp | 13
b/qpid/cpp/src/tests/federation.py | 5
b/qpid/cpp/src/tests/qpid-client-test.cpp | 139 +
b/qpid/cpp/src/tests/qpid-latency-test.cpp | 469 ++++
b/qpid/cpp/src/tests/qpid-perftest.cpp | 741 +++++++
b/qpid/cpp/src/tests/qpid-topic-listener.cpp | 209 ++
b/qpid/cpp/src/tests/qpid-topic-publisher.cpp | 230 ++
b/qpid/cpp/src/tests/qpid-txtest.cpp | 340 +++
b/qpid/cpp/src/tests/qpid_receive.cpp | 19
b/qpid/cpp/src/tests/qpid_send.cpp | 16
b/qpid/cpp/src/tests/quick_perftest | 2
b/qpid/cpp/src/tests/quick_txtest | 2
b/qpid/cpp/src/tests/run_long_cluster_tests | 2
b/qpid/cpp/src/tests/run_perftest | 6
b/qpid/cpp/src/tests/ssl_test | 2
b/qpid/cpp/src/tests/topictest | 4
b/qpid/cpp/src/tests/verify_cluster_objects | 456 ----
b/qpid/cpp/src/xml.mk | 7
b/qpid/cpp/xml/cluster.xml | 11
b/qpid/python/examples/api/drain | 8
b/qpid/python/examples/api/server | 4
b/qpid/python/examples/api/spout | 9
b/qpid/python/qpid/connection.py | 2
b/qpid/python/qpid/messaging/driver.py | 19
b/qpid/python/qpid/messaging/endpoints.py | 5
b/qpid/python/qpid/messaging/exceptions.py | 5
b/qpid/python/qpid/ops.py | 10
b/qpid/python/qpid/tests/messaging/__init__.py | 6
b/qpid/python/qpid/tests/messaging/endpoints.py | 28
b/qpid/python/qpid/tests/messaging/message.py | 5
qpid/cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/csharp.direct.receiver.cs | 65
qpid/cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/csharp.direct.receiver.csproj | 22
qpid/cpp/bindings/qpid/dotnet/examples/csharp.direct.sender/csharp.direct.sender.cs | 58
qpid/cpp/bindings/qpid/dotnet/examples/csharp.direct.sender/csharp.direct.sender.csproj | 22
qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.client/csharp.example.client.cs | 2
qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.drain/csharp.example.drain.cs | 6
qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.server/csharp.example.server.cs | 2
qpid/cpp/bindings/qpid/dotnet/examples/csharp.example.spout/csharp.example.spout.cs | 8
qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.cs | 55
qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver/csharp.map.callback.receiver.csproj | 22
qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.sender/csharp.map.callback.sender.cs | 63
qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.callback.sender/csharp.map.callback.sender.csproj | 23
qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.receiver/csharp.map.receiver.csproj | 22
qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.receiver/csharp.map.recevier.cs | 32
qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.sender/csharp.map.sender.cs | 50
qpid/cpp/bindings/qpid/dotnet/examples/csharp.map.sender/csharp.map.sender.csproj | 22
qpid/cpp/bindings/qpid/dotnet/examples/visualbasic.example.client/visualbasic.example.client.vb | 2
qpid/cpp/bindings/qpid/dotnet/org.apache.qpid.messaging.sln | 170 +
qpid/cpp/bindings/qpid/dotnet/src/Address.cpp | 131 -
qpid/cpp/bindings/qpid/dotnet/src/Address.h | 124 +
qpid/cpp/bindings/qpid/dotnet/src/Connection.cpp | 133 -
qpid/cpp/bindings/qpid/dotnet/src/Connection.h | 39
qpid/cpp/bindings/qpid/dotnet/src/Duration.h | 29
qpid/cpp/bindings/qpid/dotnet/src/Message.cpp | 954 +++-------
qpid/cpp/bindings/qpid/dotnet/src/Message.h | 351 ++-
qpid/cpp/bindings/qpid/dotnet/src/QpidException.h | 16
qpid/cpp/bindings/qpid/dotnet/src/QpidMarshal.h | 11
qpid/cpp/bindings/qpid/dotnet/src/QpidTypeCheck.h | 31
qpid/cpp/bindings/qpid/dotnet/src/Receiver.cpp | 131 -
qpid/cpp/bindings/qpid/dotnet/src/Receiver.h | 127 -
qpid/cpp/bindings/qpid/dotnet/src/Sender.cpp | 29
qpid/cpp/bindings/qpid/dotnet/src/Sender.h | 29
qpid/cpp/bindings/qpid/dotnet/src/Session.cpp | 448 ++--
qpid/cpp/bindings/qpid/dotnet/src/Session.h | 63
qpid/cpp/bindings/qpid/dotnet/src/TypeTranslator.cpp | 152 -
qpid/cpp/bindings/qpid/dotnet/src/TypeTranslator.h | 47
qpid/cpp/bindings/qpid/dotnet/src/org.apache.qpid.messaging.rc | 6
qpid/cpp/bindings/qpid/dotnet/src/org.apache.qpid.messaging.vcproj | 20
qpid/cpp/bindings/qpid/dotnet/src/resource.h | 22
qpid/cpp/bindings/qpid/dotnet/src/sessionreceiver/Properties/AssemblyInfo.cs | 4
qpid/cpp/bindings/qpid/dotnet/src/sessionreceiver/org.apache.qpid.messaging.sessionreceiver.csproj | 22
qpid/cpp/bindings/qpid/dotnet/src/sessionreceiver/sessionreceiver.cs | 38
qpid/cpp/bindings/qpid/dotnet/test/messaging.test/messaging.test.cs | 91
qpid/cpp/bindings/qpid/dotnet/test/messaging.test/messaging.test.csproj | 24
qpid/cpp/bld-winsdk.ps1 | 278 +-
qpid/cpp/examples/messaging/spout.cpp | 2
qpid/cpp/src/CMakeLists.txt | 82
qpid/cpp/src/Makefile.am | 36
qpid/cpp/src/acl.mk | 3
qpid/cpp/src/cluster.mk | 6
qpid/cpp/src/qmfc.mk | 3
qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 33
qpid/cpp/src/qpid/broker/Connection.cpp | 42
qpid/cpp/src/qpid/broker/Connection.h | 15
qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 16
qpid/cpp/src/qpid/broker/ConnectionHandler.h | 12
qpid/cpp/src/qpid/broker/Message.cpp | 7
qpid/cpp/src/qpid/broker/Queue.cpp | 15
qpid/cpp/src/qpid/broker/Queue.h | 2
qpid/cpp/src/qpid/broker/SemanticState.cpp | 41
qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp | 3
qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 3
qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 21
qpid/cpp/src/qpid/client/RdmaConnector.cpp | 13
qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 19
qpid/cpp/src/qpid/cluster/Cluster.cpp | 63
qpid/cpp/src/qpid/cluster/Cluster.h | 3
qpid/cpp/src/qpid/cluster/Connection.cpp | 344 +--
qpid/cpp/src/qpid/cluster/Connection.h | 21
qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 22
qpid/cpp/src/qpid/management/ManagementAgent.cpp | 161 -
qpid/cpp/src/qpid/management/ManagementAgent.h | 9
qpid/cpp/src/qpid/sys/ClusterSafe.cpp | 17
qpid/cpp/src/qpid/sys/ClusterSafe.h | 6
qpid/cpp/src/qpid/sys/DispatchHandle.cpp | 11
qpid/cpp/src/qpid/sys/epoll/EpollPoller.cpp | 12
qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp | 7
qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp | 8
qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp | 46
qpid/cpp/src/qpid/sys/rdma/RdmaIO.h | 19
qpid/cpp/src/replication.mk | 11
qpid/cpp/src/ssl.mk | 6
qpid/cpp/src/tests/MessagingSessionTests.cpp | 46
qpid/cpp/src/tests/client_test.cpp | 139 -
qpid/cpp/src/tests/cluster_test.cpp | 8
qpid/cpp/src/tests/cluster_tests.py | 9
qpid/cpp/src/tests/latencytest.cpp | 469 ----
qpid/cpp/src/tests/perftest.cpp | 741 -------
qpid/cpp/src/tests/topic_listener.cpp | 209 --
qpid/cpp/src/tests/topic_publisher.cpp | 230 --
qpid/cpp/src/tests/txtest.cpp | 341 ---
qpid/cpp/src/xml.mk | 4
qpid/cpp/xml/cluster.xml | 10
qpid/python/qpid/messaging/driver.py | 89
qpid/python/qpid/messaging/endpoints.py | 105 -
qpid/python/qpid/ops.py | 5
qpid/python/qpid/tests/messaging/__init__.py | 14
qpid/python/qpid/tests/messaging/endpoints.py | 107 +
316 files changed, 11493 insertions(+), 6791 deletions(-)
--- NEW FILE mrg_1.3.x.patch ---
>From 11ed7634097ca3a3e52141509496105ad65290be Mon Sep 17 00:00:00 2001
From: Alan Conway <aconway at apache.org>
Date: Fri, 21 May 2010 17:31:29 +0000
Subject: [PATCH] Fix broker core dump during start-up caused by un-initialized mAgent pointer.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@947081 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit ffc5afc35bbc8854e1956d120f79072373f29432)
---
qpid/cpp/src/qpid/cluster/Cluster.cpp | 1 +
1 files changed, 1 insertions(+), 0 deletions(-)
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 7332102..099c3ef 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -247,6 +247,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
name(settings.name),
self(cpg.self()),
clusterId(true),
+ mAgent(0),
expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
--
1.5.5.6
>From a15f49f58cdb1ee2906f03bb487cf40c43498238 Mon Sep 17 00:00:00 2001
From: Kenneth Anthony Giusti <kgiusti at apache.org>
Date: Thu, 20 May 2010 21:42:40 +0000
Subject: [PATCH] Bug 593828 - QMF: python console needs ability to filter unsolicited events.
QMF: provide event filter api for python console
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@946801 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit b806ee99fb0224069ba628bc0c506e02bb227de2)
---
qpid/cpp/src/qpid/management/ManagementAgent.cpp | 46 ++++++++++++++++-----
qpid/cpp/src/qpid/management/ManagementAgent.h | 2 +
2 files changed, 37 insertions(+), 11 deletions(-)
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 92f9d79..d4649a7 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -52,6 +52,25 @@ using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace {
+ const string defaultVendorName("vendor");
+ const string defaultProductName("product");
+
+ // Create a valid binding key substring by
+ // replacing all '.' chars with '_'
+ const string keyifyNameStr(const string& name)
+ {
+ string n2 = name;
+
+ size_t pos = n2.find('.');
+ while (pos != n2.npos) {
+ n2.replace(pos, 1, "_");
+ pos = n2.find('.', pos);
+ }
+ return n2;
+ }
+}
+
static Variant::Map mapEncodeSchemaId(const string& pname,
const string& cname,
@@ -81,6 +100,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
threadPoolSize(1), interval(10), broker(0), timer(0),
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
+ vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
qmf1Support(qmfV1), qmf2Support(qmfV2)
{
nextObjectId = 1;
@@ -89,6 +109,8 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
nextRemoteBank = 10;
nextRequestSequence = 1;
clientWasAdded = false;
+ attrMap["_vendor"] = defaultVendorName;
+ attrMap["_product"] = defaultProductName;
}
ManagementAgent::~ManagementAgent ()
@@ -196,6 +218,9 @@ void ManagementAgent::setName(const string& vendor, const string& product, const
name_address = vendor + ":" + product + ":" + inst;
attrMap["_instance"] = inst;
attrMap["_name"] = name_address;
+
+ vendorNameKey = keyifyNameStr(vendor);
+ productNameKey = keyifyNameStr(product);
}
@@ -318,6 +343,10 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
{
+ static const std::string severityStr[] = {
+ "emerg", "alert", "crit", "error", "warn",
+ "note", "info", "debug"
+ };
sys::Mutex::ScopedLock lock (userLock);
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
@@ -362,7 +391,11 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
headers["qmf.agent"] = name_address;
stringstream key;
- key << "agent.ind.event." << sev << "." << name_address << "." << event.getEventName();
+ key << "agent.ind.event." << vendorNameKey
+ << "." << productNameKey
+ << "." << severityStr[sev]
+ << "." << keyifyNameStr(event.getPackageName())
+ << "." << keyifyNameStr(event.getEventName());
string content;
MapCodec::encode(map_, content);
@@ -803,16 +836,7 @@ void ManagementAgent::periodicProcessing (void)
if (qmf2Support) {
std::stringstream addr_key;
- addr_key << "agent.ind.heartbeat";
-
- // append .<vendor>.<product> to address key if present.
- Variant::Map::const_iterator v;
- if ((v = attrMap.find("_vendor")) != attrMap.end()){
- addr_key << "." << v->second.getString();
- if ((v = attrMap.find("_product")) != attrMap.end()) {
- addr_key << "." << v->second.getString();
- }
- }
+ addr_key << "agent.ind.heartbeat." << vendorNameKey << "." << productNameKey;
Variant::Map map;
Variant::Map headers;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index a87cc91..8129c1e 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -284,6 +284,8 @@ private:
// Agent name and address
qpid::types::Variant::Map attrMap;
std::string name_address;
+ std::string vendorNameKey; // "." --> "_"
+ std::string productNameKey; // "." --> "_"
// supported management protocol
bool qmf1Support;
--
1.5.5.6
>From d3a710d15dcfa2d14750c783de70776bb50a856d Mon Sep 17 00:00:00 2001
From: Kenneth Anthony Giusti <kgiusti at apache.org>
Date: Fri, 21 May 2010 17:39:51 +0000
Subject: [PATCH] Bug 593831 - QMF: c++ console needs ability to filter unsolicited events.
QMF: add bindEvent api to allow filtering of unsolicted events.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@947084 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit 5afdc67935d07852c7c166741401ec4a77604d9b)
---
qpid/cpp/bindings/qmf/python/qmf.py | 15 +++++++++++-
qpid/cpp/bindings/qmf/ruby/qmf.rb | 15 ++++++++++++
qpid/cpp/include/qmf/engine/Console.h | 3 ++
qpid/cpp/include/qpid/console/SessionManager.h | 14 +++++++++++
qpid/cpp/src/qmf/engine/ConsoleImpl.cpp | 30 ++++++++++++++++++++++++
qpid/cpp/src/qmf/engine/ConsoleImpl.h | 2 +
qpid/cpp/src/qpid/console/SessionManager.cpp | 27 +++++++++++++++++++++
7 files changed, 105 insertions(+), 1 deletions(-)
diff --git a/qpid/cpp/bindings/qmf/python/qmf.py b/qpid/cpp/bindings/qmf/python/qmf.py
index 37442b9..06d3070 100644
--- a/qpid/cpp/bindings/qmf/python/qmf.py
+++ b/qpid/cpp/bindings/qmf/python/qmf.py
@@ -1166,9 +1166,22 @@ class Console(Thread):
if "class" in kwargs:
self.impl.bindClass(package, kwargs["class"])
else:
- self.impl.bindClass(package)
+ self.impl.bindClass(package, "*")
else:
raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']")
+
+
+ def bind_event(self, kwargs = {}):
+ if "key" in kwargs:
+ self.impl.bindEvent(kwargs["key"])
+ elif "package" in kwargs:
+ package = kwargs["package"]
+ if "event" in kwargs:
+ self.impl.bindEvent(package, kwargs["event"])
+ else:
+ self.impl.bindEvent(package, "*")
+ else:
+ raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'event']")
[...29204 lines suppressed...]
+}
+# Copy back the preserved things
+foreach ($pattern in $preserve) {
+ $target = Join-Path $install_dir $pattern
+ $tparent = Split-Path -parent $target
+ New-Item -force -type directory $tparent
+ Move-Item -force -path "$preserve_dir/$pattern" -destination "$install_dir/$pattern"
+}
+Remove-Item -recurse $preserve_dir
+
+# Zip the /bin PDB files into two zip files.
+# we previously arranged that the Debug pdbs go in the DebugPDB subdirectory
+# and the Release pdbs go in the ReleasePDB subdirectory
+&'7z' a -mx9 ".\$install_dir\bin\symbols-debug.zip" ".\$install_dir\bin\DebugPDB\*.pdb"
+&'7z' a -mx9 ".\$install_dir\bin\symbols-release.zip" ".\$install_dir\bin\ReleasePDB\*.pdb"
+
+# It would be very good to cut down on the shipped boost include files too, ideally by
+# starting with the qpid files and recursively noting all boost headers actually needed
+
+
+# Create a new zip for the whole kit.
+# Exclude *.pdb so as not include the debug symbols twice
+if (Test-Path $zipfile) {Remove-Item $zipfile}
+&'7z' a $zipfile ".\$install_dir\*" -xr!*pdb
+
+# Remove temporary install area
+# Remove-Item -recurse $install_dir
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 7083574..cf9161d 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -34,6 +34,37 @@ include(FindDoxygen)
#set (CMAKE_VERBOSE_MAKEFILE ON) # for debugging
+#
+# Set up installation of .pdb files if the compiler is Visual Studio
+#
+# Sample: install_pdb (qpidcommon ${QPID_COMPONENT_COMMON})
+#
+MACRO (install_pdb theLibrary theComponent)
+ if (MSVC)
+ get_target_property(library_dll ${theLibrary} LOCATION)
+ string(REPLACE .dll .pdb library_pdb ${library_dll})
+ string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} library_pdb ${library_pdb})
+ string(REPLACE .pdb d.pdb libraryd_pdb ${library_pdb})
+ #message(STATUS "_pdb: ${library_pdb}, ${libraryd_pdb}")
+ install (PROGRAMS
+ ${library_pdb}
+ DESTINATION ${QPID_INSTALL_LIBDIR}/ReleasePDB
+ COMPONENT ${theComponent}
+ OPTIONAL
+ CONFIGURATIONS Release|MinSizeRel)
+ install (PROGRAMS
+ ${library_pdb}
+ DESTINATION ${QPID_INSTALL_LIBDIR}/ReleasePDB
+ COMPONENT ${theComponent}
+ CONFIGURATIONS RelWithDebInfo)
+ install (PROGRAMS
+ ${libraryd_pdb}
+ DESTINATION ${QPID_INSTALL_LIBDIR}/DebugPDB
+ COMPONENT ${theComponent}
+ CONFIGURATIONS Debug)
+ endif (MSVC)
+ENDMACRO (install_pdb)
+
# check if we generate source as part of the build
# - rubygen generates the amqp spec and clustering
# - managementgen generates the broker management code
@@ -460,6 +491,10 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
if (MSVC80)
add_definitions(/D "_WIN32_WINNT=0x0501")
endif (MSVC80)
+
+ # set the RelWithDebInfo compile/link switchs to equal Release
+ set (CMAKE_CXX_FLAGS_RELWITHDEBINFO "/MD /O2 /Ob2 /D NDEBUG")
+ set (CMAKE_SHARED_LINKER_FLAGS_RELWITHDEBINFO "/debug /INCREMENTAL:NO")
endif (MSVC)
set (qpidcommon_platform_SOURCES
@@ -544,7 +579,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows)
${qpid_poller_module}
)
- set (qpidcommon_platform_LIBS
+ set (qpidcommon_platform_LIBS
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
uuid
@@ -646,19 +681,8 @@ set_target_properties (qpidcommon PROPERTIES
install (TARGETS qpidcommon
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_COMMON})
+install_pdb (qpidcommon ${QPID_COMPONENT_COMMON})
-if (WIN32)
- # Need the .pdb file, which isn't installed with the .dll/.lib
- # Not built... if needed, add the build option then uncomment this.
- #get_target_property(qpidcommon_dll qpidcommon LOCATION)
- #string(REPLACE .dll .pdb qpidcommon_pdb ${qpidcommon_dll})
- #string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} qpidcommon_pdb ${qpidcommon_pdb})
- #message(STATUS "_pdb: ${qpidcommon_pdb}")
- #install (PROGRAMS
- # ${qpidcommon_pdb}
- # DESTINATION ${QPID_INSTALL_LIBDIR}
- # COMPONENT ${QPID_COMPONENT_CLIENT})
-endif (WIN32)
set (qpidclient_SOURCES
${rgen_client_srcs}
@@ -709,6 +733,8 @@ install (DIRECTORY ../include/qpid
DESTINATION ${QPID_INSTALL_INCLUDEDIR}
COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE}
PATTERN ".svn" EXCLUDE)
+install_pdb (qpidclient ${QPID_COMPONENT_CLIENT})
+
set (qpidmessaging_SOURCES
qpid/messaging/Address.cpp
@@ -756,6 +782,7 @@ set_target_properties (qpidmessaging PROPERTIES VERSION ${qpidc_version})
install (TARGETS qpidmessaging
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_CLIENT})
+install_pdb (qpidmessaging ${QPID_COMPONENT_CLIENT})
# Released source artifacts from Apache have the generated headers included in
# the source tree, not the binary tree. So don't attempt to grab them when
@@ -766,17 +793,6 @@ if (NOT QPID_GENERATED_HEADERS_IN_SOURCE)
COMPONENT ${QPID_COMPONENT_CLIENT_INCLUDE})
endif (NOT QPID_GENERATED_HEADERS_IN_SOURCE)
-if (WIN32)
- # Need the .pdb file, which isn't installed with the .dll/.lib
- #get_target_property(qpidclient_dll qpidclient LOCATION)
- #string(REPLACE .dll .pdb qpidclient_pdb ${qpidclient_dll})
- #string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} qpidclient_pdb ${qpidclient_pdb})
- #message(STATUS "_pdb: ${qpidclient_pdb}")
- #install (PROGRAMS
- # ${qpidclient_pdb}
- # DESTINATION ${QPID_INSTALL_LIBDIR}
- # COMPONENT ${QPID_COMPONENT_CLIENT})
-endif (WIN32)
if (WIN32)
set(AMQP_WCF_DIR ${qpid-cpp_SOURCE_DIR}/../wcf)
@@ -787,6 +803,7 @@ if (WIN32)
install (TARGETS qpidxarm
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_CLIENT})
+ install_pdb (qpidxarm ${QPID_COMPONENT_CLIENT})
endif (EXISTS ${DTC_PLUGIN_SOURCE})
endif (WIN32)
@@ -867,6 +884,8 @@ endif (MSVC)
install (TARGETS qpidbroker
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_BROKER})
+install_pdb (qpidbroker ${QPID_COMPONENT_BROKER})
+
set (qpidd_SOURCES
${qpidd_platform_SOURCES}
@@ -905,6 +924,7 @@ set_target_properties (qmf PROPERTIES
install (TARGETS qmf OPTIONAL
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_QMF})
+install_pdb (qmf ${QPID_COMPONENT_QMF})
set (qmfengine_SOURCES
qmf/engine/Agent.cpp
@@ -944,6 +964,7 @@ set_target_properties (qmfengine PROPERTIES
install (TARGETS qmfengine OPTIONAL
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_QMF})
+install_pdb (qmfengine ${QPID_COMPONENT_QMF})
# QMF console library
#module_hdr += \
@@ -993,17 +1014,7 @@ set_target_properties (qmfconsole PROPERTIES
install (TARGETS qmfconsole
DESTINATION ${QPID_INSTALL_LIBDIR}
COMPONENT ${QPID_COMPONENT_QMF})
-if (WIN32)
- # Need the .pdb file, which isn't installed with the .dll/.lib
- #get_target_property(qmfconsole_dll qmfconsole LOCATION)
- #string(REPLACE .dll .pdb qmfconsole_pdb ${qmfconsole_dll})
- #string(REPLACE $(OutDir) \${CMAKE_INSTALL_CONFIG_NAME} qmfconsole_pdb ${qmfconsole_pdb})
- #message(STATUS "_pdb: ${qmfconsole_pdb}")
- #install (PROGRAMS
- # ${qmfconsole_pdb}
- # DESTINATION ${QPID_INSTALL_LIBDIR}
- # COMPONENT ${QPID_COMPONENT_QMF})
-endif (WIN32)
+install_pdb (qmfconsole ${QPID_COMPONENT_QMF})
# A queue event listener plugin that creates messages on a replication
# queue corresponding to enqueue and dequeue events:
--
1.5.5.6
store_1.3.x.patch:
b/configure.ac | 9
b/lib/JournalImpl.cpp | 41 +-
b/lib/JournalImpl.h | 13
b/lib/Makefile.am | 7
b/lib/MessageStoreImpl.cpp | 32 +
b/lib/MessageStoreImpl.h | 2
b/lib/StorePlugin.cpp | 9
b/lib/jrnl/jdir.cpp | 2
b/tests/OrderingTest.cpp | 7
b/tests/SimpleTest.cpp | 45 +-
b/tests/TransactionalTest.cpp | 9
b/tests/TwoPhaseCommitTest.cpp | 10
b/tests/cluster/run_python_cluster_tests | 8
b/tests/python_tests/client_persistence.py | 6
b/tests/python_tests/store_test.py | 5
lib/JournalImpl.cpp | 42 --
lib/JournalImpl.h | 484 ++++++++++++++---------------
lib/Makefile.am | 11
lib/MessageStoreImpl.cpp | 65 +--
lib/MessageStoreImpl.h | 18 -
lib/StorePlugin.cpp | 2
lib/jrnl/jdir.cpp | 6
22 files changed, 436 insertions(+), 397 deletions(-)
--- NEW FILE store_1.3.x.patch ---
>From 600610444c444d446c092804b575f5f3eeb49917 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 21 May 2010 12:49:22 +0000
Subject: [PATCH 01/13] Added a lock to protect MessageList in MessageStoreImpl and the static variables in JournalImpl; Switched all locks at this level to qpid::sys::Mutex and qpid::sys::ScopedLock for consistency.
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3980 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/JournalImpl.cpp | 41 +++++++++++++++++++++++++----------------
lib/JournalImpl.h | 13 ++++++-------
lib/MessageStoreImpl.cpp | 32 ++++++++++++++++++++++++--------
lib/MessageStoreImpl.h | 1 +
4 files changed, 56 insertions(+), 31 deletions(-)
diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp
index eafd807..ed1c334 100644
--- a/lib/JournalImpl.cpp
+++ b/lib/JournalImpl.cpp
@@ -40,12 +40,13 @@ using namespace mrg::journal;
using qpid::management::ManagementAgent;
namespace _qmf = qmf::com::redhat::rhm::store;
+qpid::sys::Mutex JournalImpl::_static_lock;
qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
u_int32_t JournalImpl::cnt = 0;
-void InactivityFireEvent::fire() { slock s(_ife_mutex); if (_parent) _parent->flushFire(); }
+void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
-void GetEventsFireEvent::fire() { slock s(_gefe_mutex); if (_parent) _parent->getEventsFire(); }
+void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
@@ -68,12 +69,15 @@ JournalImpl::JournalImpl(const std::string& journalId,
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
- if (journalTimerPtr == 0)
- journalTimerPtr = new qpid::sys::Timer;
- assert (journalTimerPtr != 0);
- cnt++;
- journalTimerPtr->start();
- journalTimerPtr->add(inactivityFireEventPtr);
+ {
+ qpid::sys::Mutex::ScopedLock sl(_static_lock);
+ if (journalTimerPtr == 0)
+ journalTimerPtr = new qpid::sys::Timer;
+ assert (journalTimerPtr != 0);
+ cnt++;
+ journalTimerPtr->start();
+ journalTimerPtr->add(inactivityFireEventPtr);
+ }
if (_agent != 0)
{
@@ -112,11 +116,13 @@ JournalImpl::~JournalImpl()
inactivityFireEventPtr->cancel();
free_read_buffers();
- // TODO: Make this if() thread-safe
- if (journalTimerPtr && --cnt == 0)
{
- delete journalTimerPtr;
- journalTimerPtr = 0;
+ qpid::sys::Mutex::ScopedLock sl(_static_lock);
+ if (journalTimerPtr && --cnt == 0)
+ {
+ delete journalTimerPtr;
+ journalTimerPtr = 0;
+ }
}
if (_mgmtObject != 0) {
@@ -503,7 +509,7 @@ JournalImpl::flush(const bool block_till_aio_cmpl)
{
const iores res = jcntl::flush(block_till_aio_cmpl);
{
- slock s(_getf_mutex);
+ qpid::sys::Mutex::ScopedLock sl(_getf_lock);
if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); }
}
return res;
@@ -533,7 +539,7 @@ JournalImpl::log(mrg::journal::log_level ll, const char* const log_stmt) const
void
JournalImpl::getEventsFire()
{
- slock s(_getf_mutex);
+ qpid::sys::Mutex::ScopedLock sl(_getf_lock);
getEventsTimerSetFlag = false;
if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
@@ -552,8 +558,11 @@ JournalImpl::flushFire()
}
}
inactivityFireEventPtr->setupNextFire();
- assert(journalTimerPtr != 0);
- journalTimerPtr->add(inactivityFireEventPtr);
+ {
+ qpid::sys::Mutex::ScopedLock sl(_static_lock);
+ assert(journalTimerPtr != 0);
+ journalTimerPtr->add(inactivityFireEventPtr);
+ }
}
void
diff --git a/lib/JournalImpl.h b/lib/JournalImpl.h
index 2d1b869..a3f5a17 100644
--- a/lib/JournalImpl.h
+++ b/lib/JournalImpl.h
@@ -27,8 +27,6 @@
#include <set>
#include "jrnl/enums.hpp"
#include "jrnl/jcntl.hpp"
-#include "jrnl/slock.hpp"
-#include "jrnl/smutex.hpp"
#include "DataTokenImpl.h"
#include "PreparedTransaction.h"
#include <qpid/broker/PersistableQueue.h>
@@ -47,38 +45,39 @@ namespace mrg {
class InactivityFireEvent : public qpid::sys::TimerTask
{
JournalImpl* _parent;
- mrg::journal::smutex _ife_mutex;
+ qpid::sys::Mutex _ife_lock;
public:
InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
qpid::sys::TimerTask(timeout), _parent(p) {}
virtual ~InactivityFireEvent() {}
void fire();
- inline void cancel() { mrg::journal::slock s(_ife_mutex); _parent = 0; }
+ inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
};
class GetEventsFireEvent : public qpid::sys::TimerTask
{
JournalImpl* _parent;
- mrg::journal::smutex _gefe_mutex;
+ qpid::sys::Mutex _gefe_lock;
public:
GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
qpid::sys::TimerTask(timeout), _parent(p) {}
virtual ~GetEventsFireEvent() {}
void fire();
- inline void cancel() { mrg::journal::slock s(_gefe_mutex); _parent = 0; }
+ inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
};
class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
{
private:
+ static qpid::sys::Mutex _static_lock;
static qpid::sys::Timer* journalTimerPtr;
static u_int32_t cnt;
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
- mrg::journal::smutex _getf_mutex;
+ qpid::sys::Mutex _getf_lock;
u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 9b4bf25..ed3975d 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -362,10 +362,13 @@ void MessageStoreImpl::init()
void MessageStoreImpl::finalize()
{
if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
- for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
{
- JournalImpl* jQueue = i->second;
- if (jQueue->is_ready()) jQueue->stop(true);
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
+ for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
+ {
+ JournalImpl* jQueue = i->second;
+ if (jQueue->is_ready()) jQueue->stop(true);
+ }
}
if (mgmtObject != 0) {
@@ -377,10 +380,13 @@ void MessageStoreImpl::finalize()
void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
{
if (isInit) {
- if (journalList.size()) { // check no queues exist
- std::ostringstream oss;
- oss << "truncateInit() called with " << journalList.size() << " queues still in existence";
- THROW_STORE_EXCEPTION(oss.str());
+ {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
+ if (journalList.size()) { // check no queues exist
+ std::ostringstream oss;
+ oss << "truncateInit() called with " << journalList.size() << " queues still in existence";
+ THROW_STORE_EXCEPTION(oss.str());
+ }
}
for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
@@ -402,6 +408,7 @@ void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
void MessageStoreImpl::chkTplStoreInit()
{
+ // Don't take lock unless necessary
if (!tplStorePtr->is_ready()) {
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
if (!tplStorePtr->is_ready()) {
@@ -480,6 +487,9 @@ void MessageStoreImpl::create(PersistableQueue& queue,
jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
std::string("JournalData"), defJournalGetEventsTimeout,
defJournalFlushTimeout, agent);
+ }
+ {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queue.getName()]=jQueue;
}
@@ -517,7 +527,10 @@ void MessageStoreImpl::destroy(PersistableQueue& queue)
JournalImpl* jQueue = static_cast<JournalImpl*>(eqs);
jQueue->delete_jrnl_files();
queue.setExternalQueueStore(0); // will delete the journal if exists
- journalList.erase(journalList.find(queue.getName()));
+ {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
+ journalList.erase(journalList.find(queue.getName()));
+ }
}
}
@@ -759,6 +772,9 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
{
qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+ }
+ {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queueName] = jQueue;
}
queue->setExternalQueueStore(dynamic_cast<ExternalQueueStore*>(jQueue));
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index 076a0ca..136659f 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -126,6 +126,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
boost::shared_ptr<TplJournalImpl> tplStorePtr;
TplRecoverMap tplRecoverMap;
JournalListMap journalList;
+ qpid::sys::Mutex journalListLock;
IdSequence queueIdSequence;
IdSequence exchangeIdSequence;
--
1.6.6.1
>From 1392d6fdbf4625bcbcadc16ba323af467305e4d8 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 21 May 2010 16:46:34 +0000
Subject: [PATCH 02/13] Removed redundant locks; the previous checkin installed the correct lock in JournalImpl::JournalImpl.
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3982 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/MessageStoreImpl.cpp | 24 +++++++-----------------
lib/MessageStoreImpl.h | 2 --
2 files changed, 7 insertions(+), 19 deletions(-)
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index ed3975d..8cedb51 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -410,12 +410,9 @@ void MessageStoreImpl::chkTplStoreInit()
{
// Don't take lock unless necessary
if (!tplStorePtr->is_ready()) {
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- if (!tplStorePtr->is_ready()) {
- journal::jdir::create_dir(getTplBaseDir());
- tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
- if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
- }
+ journal::jdir::create_dir(getTplBaseDir());
+ tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
+ if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
}
}
@@ -481,13 +478,8 @@ void MessageStoreImpl::create(PersistableQueue& queue,
return;
}
- {
- // TODO: Is this mutex necessary?
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue),
- std::string("JournalData"), defJournalGetEventsTimeout,
- defJournalFlushTimeout, agent);
- }
+ jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), std::string("JournalData"),
+ defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queue.getName()]=jQueue;
@@ -769,10 +761,8 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
break;
}
- {
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock);
- jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
- }
+ jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"),
+ defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queueName] = jQueue;
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index 136659f..12e1d97 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -93,7 +93,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
typedef std::map<std::string, TplRecover> TplRecoverMap;
typedef TplRecoverMap::const_iterator TplRecoverMapCitr;
- typedef std::pair<std::string, JournalImpl*> JournalListPair;
typedef std::map<std::string, JournalImpl*> JournalListMap;
typedef JournalListMap::iterator JournalListMapItr;
@@ -149,7 +148,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
const char* envPath;
qmf::com::redhat::rhm::store::Store* mgmtObject;
- qpid::sys::Mutex jrnlCreateLock;
qpid::management::ManagementAgent* agent;
// Parameter validation and calculation
--
1.6.6.1
>From e62a68ecde439e5a98576cbbae65bbc3f3005449 Mon Sep 17 00:00:00 2001
From: aconway <aconway at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Thu, 27 May 2010 18:06:48 +0000
Subject: [PATCH 03/13] Bug 596765: Remove global shared_ptr to store in store plugin.
The global shared_ptr delays destruction of the store till after the broker is deleted causing core dumps when unregistering management objects.
https://bugzilla.redhat.com/show_bug.cgi?id=596765
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3995 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/MessageStoreImpl.cpp | 2 +-
lib/StorePlugin.cpp | 8 +++-----
2 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 8cedb51..e7cb405 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -428,11 +428,11 @@ void MessageStoreImpl::open(db_ptr db,
MessageStoreImpl::~MessageStoreImpl()
{
+ finalize();
try {
for (std::list<db_ptr >::iterator i = dbs.begin(); i != dbs.end(); i++) {
(*i)->close(0);
}
-// if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
} catch (const DbException& e) {
QPID_LOG(error, "Error closing BDB databases: " << e.what());
} catch (const journal::jexception& e) {
diff --git a/lib/StorePlugin.cpp b/lib/StorePlugin.cpp
index 1cbdbff..0fb3512 100644
--- a/lib/StorePlugin.cpp
+++ b/lib/StorePlugin.cpp
@@ -36,16 +36,15 @@ using namespace std;
struct StorePlugin : public Plugin {
mrg::msgstore::MessageStoreImpl::StoreOptions options;
- boost::shared_ptr<qpid::broker::MessageStore> store;
Options* getOptions() { return &options; }
void earlyInitialize (Plugin::Target& target)
{
Broker* broker = dynamic_cast<Broker*>(&target);
- store.reset(new mrg::msgstore::MessageStoreImpl ());
+ if (!broker) return;
+ boost::shared_ptr<qpid::broker::MessageStore> store(new mrg::msgstore::MessageStoreImpl ());
DataDir& dataDir = broker->getDataDir ();
-
if (options.storeDir.empty ())
{
if (!dataDir.isEnabled ())
@@ -67,8 +66,7 @@ struct StorePlugin : public Plugin {
void finalize()
{
- MessageStore* sp = store.get();
- static_cast<mrg::msgstore::MessageStoreImpl*>(sp)->finalize();
+ // This function intentionally left blank
}
const char* id() {return "StorePlugin";}
--
1.6.6.1
>From 91dcd499e54fd82b0a808f705b18c2b6c5775bd0 Mon Sep 17 00:00:00 2001
From: aconway <aconway at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Mon, 31 May 2010 14:08:28 +0000
Subject: [PATCH 04/13] Skip cluster_tests.ShortTests.test_sasl as it depends on a SASL database not available in the store build environment.
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3996 06e15bec-b515-0410-bef0-cc27a458cf48
---
tests/cluster/run_python_cluster_tests | 7 ++++++-
1 files changed, 6 insertions(+), 1 deletions(-)
diff --git a/tests/cluster/run_python_cluster_tests b/tests/cluster/run_python_cluster_tests
index 4bd2126..ce96152 100755
--- a/tests/cluster/run_python_cluster_tests
+++ b/tests/cluster/run_python_cluster_tests
@@ -28,8 +28,13 @@ func_check_qpid_python || exit 0 # A warning, not a failure.
echo "Running Python cluster tests..."
OUTDIR=brokertest.tmp
rm -rf $OUTDIR
-# Ignore tests requiring a store by default.
+
+# Ignore tests known to fail.
CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:-"-I ${CLUSTER_TESTS_FAIL}"}
+# Ignore tests that don't work in the store environment
+# SASL test needs sasl test database which is not installed.
+CLUSTER_TESTS_IGNORE="${CLUSTER_TESTS_IGNORE} -i cluster_tests.ShortTests.test_sasl"
+
CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
TEST_CMD="${QPID_PYTHON_TEST} -m cluster_tests ${CLUSTER_TESTS_IGNORE} ${CLUSTER_TESTS} -DOUTDIR=$OUTDIR"
--
1.6.6.1
>From d4795a9796726dbdb1f911c81e3b2e899fbfc40e Mon Sep 17 00:00:00 2001
From: aconway <aconway at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Mon, 31 May 2010 19:31:45 +0000
Subject: [PATCH 05/13] Fix valgrind errors caused by order of destruction issue.
Added a callback so that MessageStoreImpl is informed when JournalImpl
instances are deleted and can remove them from its map.
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3997 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/JournalImpl.cpp | 7 +-
lib/JournalImpl.h | 467 +++++++++++++++++++++++-----------------------
lib/MessageStoreImpl.cpp | 17 ++-
lib/MessageStoreImpl.h | 5 +
4 files changed, 261 insertions(+), 235 deletions(-)
diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp
index ed1c334..a660d3c 100644
--- a/lib/JournalImpl.cpp
+++ b/lib/JournalImpl.cpp
@@ -53,7 +53,8 @@ JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* a):
+ qpid::management::ManagementAgent* a,
+ DeleteCallback onDelete):
jcntl(journalId, journalDirectory, journalBaseFilename),
getEventsTimerSetFlag(false),
lastReadRid(0),
@@ -65,7 +66,8 @@ JournalImpl::JournalImpl(const std::string& journalId,
_dtok(),
_external(false),
_agent(a),
- _mgmtObject(0)
+ _mgmtObject(0),
+ deleteCallback(onDelete)
{
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
@@ -108,6 +110,7 @@ JournalImpl::JournalImpl(const std::string& journalId,
JournalImpl::~JournalImpl()
{
+ if (deleteCallback) deleteCallback(*this);
if (_init_flag && !_stop_flag){
try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete!
catch (const jexception& e) { log(LOG_ERROR, e.what()); }
diff --git a/lib/JournalImpl.h b/lib/JournalImpl.h
index a3f5a17..aab8467 100644
--- a/lib/JournalImpl.h
+++ b/lib/JournalImpl.h
@@ -1,25 +1,25 @@
/*
- Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
+ Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
- This file is part of the Qpid async store library msgstore.so.
+ This file is part of the Qpid async store library msgstore.so.
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or (at your option) any later version.
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
- The GNU Lesser General Public License is available in the file COPYING.
- */
+ The GNU Lesser General Public License is available in the file COPYING.
+*/
#ifndef _JournalImpl_
#define _JournalImpl_
@@ -38,219 +38,228 @@
#include "qmf/com/redhat/rhm/store/Journal.h"
namespace mrg {
- namespace msgstore {
-
- class JournalImpl;
-
- class InactivityFireEvent : public qpid::sys::TimerTask
- {
- JournalImpl* _parent;
- qpid::sys::Mutex _ife_lock;
-
- public:
- InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout), _parent(p) {}
- virtual ~InactivityFireEvent() {}
- void fire();
- inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
- };
-
- class GetEventsFireEvent : public qpid::sys::TimerTask
- {
- JournalImpl* _parent;
- qpid::sys::Mutex _gefe_lock;
-
- public:
- GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::sys::TimerTask(timeout), _parent(p) {}
- virtual ~GetEventsFireEvent() {}
- void fire();
- inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
- };
-
- class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
- {
- private:
- static qpid::sys::Mutex _static_lock;
- static qpid::sys::Timer* journalTimerPtr;
- static u_int32_t cnt;
-
- bool getEventsTimerSetFlag;
- boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
- qpid::sys::Mutex _getf_lock;
-
- u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
- std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
-
- bool writeActivityFlag;
- bool flushTriggeredFlag;
- boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
-
- // temp local vars for loadMsgContent below
- void* _xidp;
- void* _datap;
- size_t _dlen;
- mrg::journal::data_tok _dtok;
- bool _external;
-
- qpid::management::ManagementAgent* _agent;
- qmf::com::redhat::rhm::store::Journal* _mgmtObject;
-
- public:
- JournalImpl(const std::string& journalId,
- const std::string& journalDirectory,
- const std::string& journalBaseFilename,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* agent);
-
- virtual ~JournalImpl();
-
- void initialize(const u_int16_t num_jfiles,
- const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
- mrg::journal::aio_callback* const cbp);
-
- inline void initialize(const u_int16_t num_jfiles,
- const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks) {
- initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
- this);
- }
-
- void recover(const u_int16_t num_jfiles,
- const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
- mrg::journal::aio_callback* const cbp,
- boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
- u_int64_t& highest_rid,
- u_int64_t queue_id);
-
- inline void recover(const u_int16_t num_jfiles,
- const bool auto_expand,
- const u_int16_t ae_max_jfiles,
- const u_int32_t jfsize_sblks,
- const u_int16_t wcache_num_pages,
- const u_int32_t wcache_pgsize_sblks,
- boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
- u_int64_t& highest_rid,
- u_int64_t queue_id) {
- recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
- this, prep_tx_list_ptr, highest_rid, queue_id);
- }
-
- void recover_complete();
-
- // Temporary fn to read and save last msg read from journal so it can be assigned
- // in chunks. To be replaced when coding to do this direct from the journal is ready.
- // Returns true if the record is extern, false if local.
- bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
-
- // Overrides for write inactivity timer
- void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, mrg::journal::data_tok* dtokp,
- const bool transient = false);
-
- void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
- const bool transient = false);
-
- void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
- const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid,
- const bool transient = false);
-
- void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
- const std::string& xid, const bool transient = false);
-
- void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
-
- void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
-
- mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
- size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
- bool ignore_pending_txns = false);
-
- void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
-
- void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
-
- void stop(bool block_till_aio_cmpl = false);
-
- // Logging
- void log(mrg::journal::log_level level, const std::string& log_stmt) const;
- void log(mrg::journal::log_level level, const char* const log_stmt) const;
-
- // Overrides for get_events timer
- mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
-
- // TimerTask callback
- void getEventsFire();
- void flushFire();
-
- // AIO callbacks
- virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl);
- virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
-
- qpid::management::ManagementObject* GetManagementObject (void) const
- { return _mgmtObject; }
-
- qpid::management::Manageable::status_t ManagementMethod (uint32_t,
- qpid::management::Args&,
- std::string&);
-
- private:
- void free_read_buffers();
-
- inline void setGetEventTimer()
- {
- assert(journalTimerPtr != 0);
- getEventsFireEventsPtr->setupNextFire();
- journalTimerPtr->add(getEventsFireEventsPtr);
- getEventsTimerSetFlag = true;
- }
- void handleIoResult(const mrg::journal::iores r);
-
- // Management instrumentation callbacks overridden from jcntl
- inline void instr_incr_outstanding_aio_cnt() {
- if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
- }
- inline void instr_decr_outstanding_aio_cnt() {
- if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
- }
- }; // class JournalImpl
-
- class TplJournalImpl : public JournalImpl
- {
- public:
- TplJournalImpl(const std::string& journalId,
- const std::string& journalDirectory,
- const std::string& journalBaseFilename,
- const qpid::sys::Duration getEventsTimeout,
- const qpid::sys::Duration flushTimeout,
- qpid::management::ManagementAgent* agent) :
- JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
- {}
-
- ~TplJournalImpl() {}
-
- // Special version of read_data_record that ignores transactions - needed when reading the TPL
- inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
- void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
- mrg::journal::data_tok* const dtokp) {
- return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
- }
- inline void read_reset() { _rmgr.invalidate(); }
- }; // class TplJournalImpl
-
- } // namespace msgstore
+namespace msgstore {
+
+class JournalImpl;
+
+class InactivityFireEvent : public qpid::sys::TimerTask
+{
+ JournalImpl* _parent;
+ qpid::sys::Mutex _ife_lock;
+
+ public:
+ InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+ qpid::sys::TimerTask(timeout), _parent(p) {}
+ virtual ~InactivityFireEvent() {}
+ void fire();
+ inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; }
+};
+
+class GetEventsFireEvent : public qpid::sys::TimerTask
+{
+ JournalImpl* _parent;
+ qpid::sys::Mutex _gefe_lock;
+
+ public:
+ GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
+ qpid::sys::TimerTask(timeout), _parent(p) {}
+ virtual ~GetEventsFireEvent() {}
+ void fire();
+ inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; }
+};
+
+class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback
+{
+ public:
+ typedef boost::function<void (JournalImpl&)> DeleteCallback;
+
+ private:
+ static qpid::sys::Mutex _static_lock;
+ static qpid::sys::Timer* journalTimerPtr;
+ static u_int32_t cnt;
+
+ bool getEventsTimerSetFlag;
+ boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
+ qpid::sys::Mutex _getf_lock;
+
+ u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests
+ std::vector<u_int64_t> oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence
+
+ bool writeActivityFlag;
+ bool flushTriggeredFlag;
+ boost::intrusive_ptr<qpid::sys::TimerTask> inactivityFireEventPtr;
+
+ // temp local vars for loadMsgContent below
+ void* _xidp;
+ void* _datap;
+ size_t _dlen;
+ mrg::journal::data_tok _dtok;
+ bool _external;
+
+ qpid::management::ManagementAgent* _agent;
+ qmf::com::redhat::rhm::store::Journal* _mgmtObject;
+ DeleteCallback deleteCallback;
+
+ public:
+
+ JournalImpl(const std::string& journalId,
+ const std::string& journalDirectory,
+ const std::string& journalBaseFilename,
+ const qpid::sys::Duration getEventsTimeout,
+ const qpid::sys::Duration flushTimeout,
+ qpid::management::ManagementAgent* agent,
+ DeleteCallback deleteCallback=DeleteCallback() );
+
+ virtual ~JournalImpl();
+
+ void initialize(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ mrg::journal::aio_callback* const cbp);
+
+ inline void initialize(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks) {
+ initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ this);
+ }
+
+ void recover(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ mrg::journal::aio_callback* const cbp,
+ boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+ u_int64_t& highest_rid,
+ u_int64_t queue_id);
+
+ inline void recover(const u_int16_t num_jfiles,
+ const bool auto_expand,
+ const u_int16_t ae_max_jfiles,
+ const u_int32_t jfsize_sblks,
+ const u_int16_t wcache_num_pages,
+ const u_int32_t wcache_pgsize_sblks,
+ boost::ptr_list<msgstore::PreparedTransaction>* prep_tx_list_ptr,
+ u_int64_t& highest_rid,
+ u_int64_t queue_id) {
+ recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks,
+ this, prep_tx_list_ptr, highest_rid, queue_id);
+ }
+
+ void recover_complete();
+
+ // Temporary fn to read and save last msg read from journal so it can be assigned
+ // in chunks. To be replaced when coding to do this direct from the journal is ready.
+ // Returns true if the record is extern, false if local.
+ bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
+
+ // Overrides for write inactivity timer
+ void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, mrg::journal::data_tok* dtokp,
+ const bool transient = false);
+
+ void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+ const bool transient = false);
+
+ void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
+ const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid,
+ const bool transient = false);
+
+ void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp,
+ const std::string& xid, const bool transient = false);
+
+ void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false);
+
+ void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false);
+
+ mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff,
+ size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp,
+ bool ignore_pending_txns = false);
+
+ void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid);
+
+ void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid);
+
+ void stop(bool block_till_aio_cmpl = false);
+
+ // Logging
+ void log(mrg::journal::log_level level, const std::string& log_stmt) const;
+ void log(mrg::journal::log_level level, const char* const log_stmt) const;
+
+ // Overrides for get_events timer
+ mrg::journal::iores flush(const bool block_till_aio_cmpl = false);
+
+ // TimerTask callback
+ void getEventsFire();
+ void flushFire();
+
+ // AIO callbacks
+ virtual void wr_aio_cb(std::vector<mrg::journal::data_tok*>& dtokl);
+ virtual void rd_aio_cb(std::vector<u_int16_t>& pil);
+
+ qpid::management::ManagementObject* GetManagementObject (void) const
+ { return _mgmtObject; }
+
+ qpid::management::Manageable::status_t ManagementMethod (uint32_t,
+ qpid::management::Args&,
+ std::string&);
+
+ void resetDeleteCallback() { deleteCallback = DeleteCallback(); }
+
+ private:
+ void free_read_buffers();
+
+ inline void setGetEventTimer()
+ {
+ assert(journalTimerPtr != 0);
+ getEventsFireEventsPtr->setupNextFire();
+ journalTimerPtr->add(getEventsFireEventsPtr);
+ getEventsTimerSetFlag = true;
+ }
+ void handleIoResult(const mrg::journal::iores r);
+
+ // Management instrumentation callbacks overridden from jcntl
+ inline void instr_incr_outstanding_aio_cnt() {
+ if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
+ }
+ inline void instr_decr_outstanding_aio_cnt() {
+ if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
+ }
+
+}; // class JournalImpl
+
+class TplJournalImpl : public JournalImpl
+{
+ public:
+ TplJournalImpl(const std::string& journalId,
+ const std::string& journalDirectory,
+ const std::string& journalBaseFilename,
+ const qpid::sys::Duration getEventsTimeout,
+ const qpid::sys::Duration flushTimeout,
+ qpid::management::ManagementAgent* agent) :
+ JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
+ {}
+
+ ~TplJournalImpl() {}
+
+ // Special version of read_data_record that ignores transactions - needed when reading the TPL
+ inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize,
+ void** const xidpp, std::size_t& xidsize, bool& transient, bool& external,
+ mrg::journal::data_tok* const dtokp) {
+ return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
+ }
+ inline void read_reset() { _rmgr.invalidate(); }
+}; // class TplJournalImpl
+
+} // namespace msgstore
} // namespace mrg
#endif
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index e7cb405..04297e8 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -367,6 +367,7 @@ void MessageStoreImpl::finalize()
for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
{
JournalImpl* jQueue = i->second;
+ jQueue->resetDeleteCallback();
if (jQueue->is_ready()) jQueue->stop(true);
}
}
@@ -479,7 +480,8 @@ void MessageStoreImpl::create(PersistableQueue& queue,
}
jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), std::string("JournalData"),
- defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+ defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+ boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queue.getName()]=jQueue;
@@ -521,7 +523,7 @@ void MessageStoreImpl::destroy(PersistableQueue& queue)
queue.setExternalQueueStore(0); // will delete the journal if exists
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
- journalList.erase(journalList.find(queue.getName()));
+ journalList.erase(queue.getName());
}
}
}
@@ -762,7 +764,8 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
break;
}
jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"),
- defJournalGetEventsTimeout, defJournalFlushTimeout, agent);
+ defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
+ boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
journalList[queueName] = jQueue;
@@ -1644,6 +1647,11 @@ std::string MessageStoreImpl::getJrnlHashDir(const std::string& queueName) //for
std::string MessageStoreImpl::getStoreDir() const { return storeDir; }
+void MessageStoreImpl::journalDeleted(JournalImpl& j) {
+ qpid::sys::Mutex::ScopedLock sl(journalListLock);
+ journalList.erase(j.id());
+}
+
MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
qpid::Options(name),
numJrnlFiles(defNumJrnlFiles),
@@ -1668,7 +1676,7 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
// "If no|false|0, the number of journal files will remain fixed (num-jfiles).")
// ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"),
// "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.")
- ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
+ ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
"Default size for each journal file in multiples of read pages (1 read page = 64kiB)")
("truncate", qpid::optValue(truncateFlag, "yes|no"),
"If yes|true|1, will truncate the store (discard any existing records). If no|false|0, will preserve "
@@ -1687,3 +1695,4 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) :
"Lower values decrease latency at the expense of throughput.")
;
}
+
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index 12e1d97..d650020 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -149,6 +149,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
qmf::com::redhat::rhm::store::Store* mgmtObject;
qpid::management::ManagementAgent* agent;
+
// Parameter validation and calculation
static u_int16_t chkJrnlNumFilesParam(const u_int16_t param,
@@ -359,6 +360,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
{ return qpid::management::Manageable::STATUS_OK; }
std::string getStoreDir() const;
+
+ private:
+ void journalDeleted(JournalImpl&);
+
}; // class MessageStoreImpl
} // namespace msgstore
--
1.6.6.1
>From 821aca95eeb79c7c83bc6d47176d188d8a8a1ba1 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Tue, 1 Jun 2010 16:02:36 +0000
Subject: [PATCH 06/13] Fix for Bug 598557: "qpidd --no-data dir with store loaded segfaults".
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3998 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/MessageStoreImpl.cpp | 2 +-
1 files changed, 1 insertions(+), 1 deletions(-)
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 04297e8..e3b2599 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -361,7 +361,7 @@ void MessageStoreImpl::init()
void MessageStoreImpl::finalize()
{
- if (tplStorePtr->is_ready()) tplStorePtr->stop(true);
+ if (tplStorePtr.get() && tplStorePtr->is_ready()) tplStorePtr->stop(true);
{
qpid::sys::Mutex::ScopedLock sl(journalListLock);
for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++)
--
1.6.6.1
>From 3e2438975882993c66f4f8b9fb40dd37da3fe95e Mon Sep 17 00:00:00 2001
From: gordonsim <gordonsim at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Wed, 2 Jun 2010 19:33:45 +0000
Subject: [PATCH 07/13] Set reliability for link to prevent auto-delete being set on subscription queues (broker now cleans these up even for connections that are open when broker is shutdown)
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4005 06e15bec-b515-0410-bef0-cc27a458cf48
---
tests/python_tests/client_persistence.py | 6 +++---
tests/python_tests/store_test.py | 4 +++-
2 files changed, 6 insertions(+), 4 deletions(-)
diff --git a/tests/python_tests/client_persistence.py b/tests/python_tests/client_persistence.py
index f16e548..dc197dc 100644
--- a/tests/python_tests/client_persistence.py
+++ b/tests/python_tests/client_persistence.py
@@ -103,9 +103,9 @@ class ExchangeQueueTests(StoreTest):
broker = self.broker(store_args(), name="testFanout", expect=EXPECT_EXIT_OK)
ssn = broker.connect().session()
snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}")
- ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True}}")
- ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True}}")
- ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True, reliability:at-least-once}}")
+ ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True, reliability:at-least-once}}")
msg1 = Message("Msg1", durable=True, correlation_id="Msg0001")
snd.send(msg1)
msg2 = Message("Msg2", durable=True, correlation_id="Msg0002")
diff --git a/tests/python_tests/store_test.py b/tests/python_tests/store_test.py
index 61d3687..87dcefa 100644
--- a/tests/python_tests/store_test.py
+++ b/tests/python_tests/store_test.py
@@ -301,9 +301,11 @@ class StoreTest(BrokerTest):
x_bindings_list = []
for binding in binding_list:
x_bindings_list.append("{exchange: %s, key: %s}" % binding)
+ if durable: reliability = 'at-least-once'
+ else: reliability = None
return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True,
link_name=link_name, durable=durable, x_declare_list=x_declare_list,
- x_bindings_list=x_bindings_list)
+ x_bindings_list=x_bindings_list, link_reliability=reliability)
def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False):
"""Check that a message is on a queue by dequeuing it and comparing it to the expected message"""
--
1.6.6.1
>From 22c33fd72f2e8e32a6537b1891934834dc7f2d95 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 4 Jun 2010 17:37:23 +0000
Subject: [PATCH 08/13] Fixes for various Coverity-indicated problems: 11689(MessageStoreImpl.cpp), 11691(jdir.cpp) and 11688(JournalImpl.cpp).
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4008 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/JournalImpl.cpp | 4 +++-
lib/MessageStoreImpl.cpp | 7 +++----
lib/jrnl/jdir.cpp | 1 +
3 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp
index a660d3c..5e1ed7a 100644
--- a/lib/JournalImpl.cpp
+++ b/lib/JournalImpl.cpp
@@ -498,7 +498,9 @@ JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid)
void
JournalImpl::stop(bool block_till_aio_cmpl)
{
- (dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get()))->cancel();
+ InactivityFireEvent* ifep = dynamic_cast<InactivityFireEvent*>(inactivityFireEventPtr.get());
+ assert(ifep); // dynamic_cast can return null if the cast fails
+ ifep->cancel();
jcntl::stop(block_till_aio_cmpl);
if (_mgmtObject != 0) {
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index e3b2599..2262b0d 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -193,8 +193,7 @@ void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts,
<< JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value.");
return;
}
- u_int16_t q = opts->autoJrnlExpandMaxFiles;
- if (q && q == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) {
+ if (p && p == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) {
// num-jfiles is different from the default AND max-auto-expand-jfiles is still at default
// change value of max-auto-expand-jfiles
autoJrnlExpand = true;
@@ -1327,10 +1326,10 @@ void MessageStoreImpl::store(const PersistableQueue* queue,
}
}
} else {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: queue NULL.");
+ THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL."));
}
} catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " +
+ THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": MessageStoreImpl::store() failed: " +
e.what());
}
}
diff --git a/lib/jrnl/jdir.cpp b/lib/jrnl/jdir.cpp
index 74651bd..d26cef0 100644
--- a/lib/jrnl/jdir.cpp
+++ b/lib/jrnl/jdir.cpp
@@ -202,6 +202,7 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir, const
break;
}
}
+ close_dir(dir, dirname, "push_down");
return bak_dir_name;
}
--
1.6.6.1
>From fbd21b64d3caa4c6a1937dcc560973de1c173b2d Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 4 Jun 2010 18:43:28 +0000
Subject: [PATCH 09/13] Further tidy-up: closing directory handles in exception paths
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4009 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/jrnl/jdir.cpp | 5 +++++
1 files changed, 5 insertions(+), 0 deletions(-)
diff --git a/lib/jrnl/jdir.cpp b/lib/jrnl/jdir.cpp
index d26cef0..b718f74 100644
--- a/lib/jrnl/jdir.cpp
+++ b/lib/jrnl/jdir.cpp
@@ -152,6 +152,7 @@ jdir::clear_dir(const std::string& dirname, const std::string&
newname << bak_dir << "/" << entry->d_name;
if (::rename(oldname.str().c_str(), newname.str().c_str()))
{
+ ::closedir(dir);
std::ostringstream oss;
oss << "file=\"" << oldname.str() << "\" dest=\"" <<
newname.str() << "\"" << FORMAT_SYSERR(errno);
@@ -195,6 +196,7 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir, const
newname << bak_dir_name << "/" << target_dir;
if (::rename(oldname.str().c_str(), newname.str().c_str()))
{
+ ::closedir(dir);
std::ostringstream oss;
oss << "file=\"" << oldname.str() << "\" dest=\"" << newname.str() << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "jdir", "push_down");
@@ -284,6 +286,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
std::string full_name(dirname + "/" + entry->d_name);
if (::stat(full_name.c_str(), &s))
{
+ ::closedir(dir);
std::ostringstream oss;
oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", "delete_dir");
@@ -294,6 +297,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
{
if(::unlink(full_name.c_str()))
{
+ ::closedir(dir);
std::ostringstream oss;
oss << "unlink: file=\"" << entry->d_name << "\"" << FORMAT_SYSERR(errno);
throw jexception(jerrno::JERR_JDIR_UNLINK, oss.str(), "jdir", "delete_dir");
@@ -305,6 +309,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only)
}
else // all other types, throw up!
{
+ ::closedir(dir);
std::ostringstream oss;
oss << "file=\"" << entry->d_name << "\" is not a dir, file or slink.";
oss << " (mode=0x" << std::hex << s.st_mode << std::dec << ")";
--
1.6.6.1
>From 7d09142721e18ff5769b9c35c446eab723793d44 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Tue, 8 Jun 2010 19:11:00 +0000
Subject: [PATCH 10/13] Fix for a recent regression in r.3982 in which a lock wich protects the TPL from being initialized by multiple threads was erroneously removed. The lock is now replaced.
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4017 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/MessageStoreImpl.cpp | 3 ++-
lib/MessageStoreImpl.h | 1 +
2 files changed, 3 insertions(+), 1 deletions(-)
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 2262b0d..5f98055 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -408,7 +408,8 @@ void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles)
void MessageStoreImpl::chkTplStoreInit()
{
- // Don't take lock unless necessary
+ // Prevent multiple threads from late-initializing the TPL
+ qpid::sys::Mutex::ScopedLock sl(tplInitLock);
if (!tplStorePtr->is_ready()) {
journal::jdir::create_dir(getTplBaseDir());
tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index d650020..2659f32 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -124,6 +124,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
// Pointer to Transaction Prepared List (TPL) journal instance
boost::shared_ptr<TplJournalImpl> tplStorePtr;
TplRecoverMap tplRecoverMap;
+ qpid::sys::Mutex tplInitLock;
JournalListMap journalList;
qpid::sys::Mutex journalListLock;
--
1.6.6.1
>From 1bb317d8e88c910b5247b54a9530a5505fb67168 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Thu, 17 Jun 2010 18:58:04 +0000
Subject: [PATCH 11/13] Added variable MSGSTORE_VERSION_INFO to control msgstore.so.x.x.x lib version numbers
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4034 06e15bec-b515-0410-bef0-cc27a458cf48
---
configure.ac | 9 +--------
lib/Makefile.am | 6 ++++--
2 files changed, 5 insertions(+), 10 deletions(-)
diff --git a/configure.ac b/configure.ac
index 9a32097..3c014d9 100644
--- a/configure.ac
+++ b/configure.ac
@@ -21,7 +21,7 @@ dnl The GNU Lesser General Public License is available in the file COPYING.
dnl
dnl Process this file with autoconf to produce a configure script.
-AC_INIT([msg-store], [0.6], [rhemrg-users-list at redhat.com])
+AC_INIT([msg-store], [0.7], [rhemrg-users-list at redhat.com])
AC_CONFIG_AUX_DIR([build-aux])
AM_INIT_AUTOMAKE([dist-bzip2])
@@ -201,13 +201,6 @@ if test x$DB_CXX_HEADER_PREFIX = x; then
fi
AC_SUBST(DB_CXX_HEADER_PREFIX)
-# Set the argument to be used in "libtool -version-info ARG".
-QPID_CURRENT=1
-QPID_REVISION=0
-QPID_AGE=1
-LIBTOOL_VERSION_INFO_ARG=$QPID_CURRENT:$QPID_REVISION:$QPID_AGE
-AC_SUBST(LIBTOOL_VERSION_INFO_ARG)
-
gl_CLOCK_TIME
# We use valgrind for the tests. See if it's available.
diff --git a/lib/Makefile.am b/lib/Makefile.am
index ab72d96..8f0301b 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -34,8 +34,10 @@ msgstore_la_LIBADD = \
$(LIB_CLOCK_GETTIME) \
$(QPID_LIBS)
-msgstore_la_LDFLAGS = \
- $(PLUGINLDFLAGS)
+MSGSTORE_VERSION_INFO = 1:0:0
+msgstore_la_LDFLAGS = \
+ $(PLUGINLDFLAGS) \
+ -version-info $(MSGSTORE_VERSION_INFO)
msgstore_la_SOURCES = \
StorePlugin.cpp \
--
1.6.6.1
>From 01305c0b44a6167ca587ddd940361bd623677564 Mon Sep 17 00:00:00 2001
From: kpvdr <kpvdr at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Fri, 18 Jun 2010 14:06:28 +0000
Subject: [PATCH 12/13] Removed the lib version info from the previous checkin
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4036 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/Makefile.am | 10 ++++------
1 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/lib/Makefile.am b/lib/Makefile.am
index 8f0301b..95428f1 100644
--- a/lib/Makefile.am
+++ b/lib/Makefile.am
@@ -27,19 +27,17 @@ PLUGINLDFLAGS=-no-undefined -module -avoid-version
dmoduledir=$(libdir)/qpid/daemon
dmodule_LTLIBRARIES = msgstore.la
-msgstore_la_LIBADD = \
+msgstore_la_LIBADD = \
$(APR_LIBS) \
$(LIB_DLOPEN) \
$(LIB_BERKELEY_DB) \
$(LIB_CLOCK_GETTIME) \
$(QPID_LIBS)
-MSGSTORE_VERSION_INFO = 1:0:0
-msgstore_la_LDFLAGS = \
- $(PLUGINLDFLAGS) \
- -version-info $(MSGSTORE_VERSION_INFO)
+msgstore_la_LDFLAGS = \
+ $(PLUGINLDFLAGS)
-msgstore_la_SOURCES = \
+msgstore_la_SOURCES = \
StorePlugin.cpp \
BindingDbt.cpp \
BufferValue.cpp \
--
1.6.6.1
>From fe4143cc7226143cb3eb025efcf0e6a8d873866d Mon Sep 17 00:00:00 2001
From: aconway <aconway at 06e15bec-b515-0410-bef0-cc27a458cf48>
Date: Mon, 28 Jun 2010 18:18:31 +0000
Subject: [PATCH 13/13] Bug 607748 - Crash on exit in store cluster tests.
This is an order-of-static-destructors problem.
This is an order-of-static-destructors problem. Fixed by having the
store use the broker's Timer. This ensures orderly shut down as the
brokers destructor will destroy the store first and then the timer.
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4053 06e15bec-b515-0410-bef0-cc27a458cf48
---
lib/JournalImpl.cpp | 31 ++++++----------------------
lib/JournalImpl.h | 17 ++++++++++-----
lib/MessageStoreImpl.cpp | 9 ++++---
lib/MessageStoreImpl.h | 7 +++++-
lib/StorePlugin.cpp | 2 +-
tests/OrderingTest.cpp | 7 ++++-
tests/SimpleTest.cpp | 45 ++++++++++++++++++++++-------------------
tests/TransactionalTest.cpp | 9 +++++--
tests/TwoPhaseCommitTest.cpp | 9 +++++--
9 files changed, 71 insertions(+), 65 deletions(-)
diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp
index 5e1ed7a..962125b 100644
--- a/lib/JournalImpl.cpp
+++ b/lib/JournalImpl.cpp
@@ -33,6 +33,7 @@
#include "qmf/com/redhat/rhm/store/EventFull.h"
#include "qmf/com/redhat/rhm/store/EventRecovered.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Timer.h"
#include "StoreException.h"
using namespace mrg::msgstore;
@@ -40,15 +41,12 @@ using namespace mrg::journal;
using qpid::management::ManagementAgent;
namespace _qmf = qmf::com::redhat::rhm::store;
-qpid::sys::Mutex JournalImpl::_static_lock;
-qpid::sys::Timer* JournalImpl::journalTimerPtr = 0;
-u_int32_t JournalImpl::cnt = 0;
-
void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); }
void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); }
-JournalImpl::JournalImpl(const std::string& journalId,
+JournalImpl::JournalImpl(qpid::sys::Timer& timer_,
+ const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
@@ -56,6 +54,7 @@ JournalImpl::JournalImpl(const std::string& journalId,
qpid::management::ManagementAgent* a,
DeleteCallback onDelete):
jcntl(journalId, journalDirectory, journalBaseFilename),
+ timer(timer_),
getEventsTimerSetFlag(false),
lastReadRid(0),
writeActivityFlag(false),
@@ -72,13 +71,8 @@ JournalImpl::JournalImpl(const std::string& journalId,
getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout);
{
- qpid::sys::Mutex::ScopedLock sl(_static_lock);
- if (journalTimerPtr == 0)
- journalTimerPtr = new qpid::sys::Timer;
- assert (journalTimerPtr != 0);
- cnt++;
- journalTimerPtr->start();
- journalTimerPtr->add(inactivityFireEventPtr);
+ timer.start();
+ timer.add(inactivityFireEventPtr);
}
if (_agent != 0)
@@ -119,15 +113,6 @@ JournalImpl::~JournalImpl()
inactivityFireEventPtr->cancel();
free_read_buffers();
- {
- qpid::sys::Mutex::ScopedLock sl(_static_lock);
- if (journalTimerPtr && --cnt == 0)
- {
- delete journalTimerPtr;
- journalTimerPtr = 0;
- }
- }
-
if (_mgmtObject != 0) {
_mgmtObject->resourceDestroy();
_mgmtObject = 0;
@@ -564,9 +549,7 @@ JournalImpl::flushFire()
}
inactivityFireEventPtr->setupNextFire();
{
- qpid::sys::Mutex::ScopedLock sl(_static_lock);
- assert(journalTimerPtr != 0);
- journalTimerPtr->add(inactivityFireEventPtr);
+ timer.add(inactivityFireEventPtr);
}
}
diff --git a/lib/JournalImpl.h b/lib/JournalImpl.h
index aab8467..b85cf02 100644
--- a/lib/JournalImpl.h
+++ b/lib/JournalImpl.h
@@ -37,6 +37,10 @@
#include "qpid/management/Manageable.h"
#include "qmf/com/redhat/rhm/store/Journal.h"
+namespace qpid { namespace sys {
+class Timer;
+}}
+
namespace mrg {
namespace msgstore {
@@ -75,9 +79,9 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
private:
static qpid::sys::Mutex _static_lock;
- static qpid::sys::Timer* journalTimerPtr;
static u_int32_t cnt;
+ qpid::sys::Timer& timer;
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::sys::TimerTask> getEventsFireEventsPtr;
qpid::sys::Mutex _getf_lock;
@@ -102,7 +106,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
public:
- JournalImpl(const std::string& journalId,
+ JournalImpl(qpid::sys::Timer& timer,
+ const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
@@ -219,9 +224,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
inline void setGetEventTimer()
{
- assert(journalTimerPtr != 0);
getEventsFireEventsPtr->setupNextFire();
- journalTimerPtr->add(getEventsFireEventsPtr);
+ timer.add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
}
void handleIoResult(const mrg::journal::iores r);
@@ -239,13 +243,14 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal
class TplJournalImpl : public JournalImpl
{
public:
- TplJournalImpl(const std::string& journalId,
+ TplJournalImpl(qpid::sys::Timer& timer,
+ const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
const qpid::sys::Duration getEventsTimeout,
const qpid::sys::Duration flushTimeout,
qpid::management::ManagementAgent* agent) :
- JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
+ JournalImpl(timer, journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent)
{}
~TplJournalImpl() {}
diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp
index 5f98055..e4f98b5 100644
--- a/lib/MessageStoreImpl.cpp
+++ b/lib/MessageStoreImpl.cpp
@@ -62,7 +62,7 @@ MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid,
tpc_flag(_tpc_flag)
{}
-MessageStoreImpl::MessageStoreImpl(const char* envpath) :
+MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath) :
numJrnlFiles(0),
autoJrnlExpand(false),
autoJrnlExpandMaxFiles(0),
@@ -77,6 +77,7 @@ MessageStoreImpl::MessageStoreImpl(const char* envpath) :
highestRid(0),
isInit(false),
envPath(envpath),
+ timer(timer_),
mgmtObject(0),
agent(0)
{}
@@ -339,7 +340,7 @@ void MessageStoreImpl::init()
open(mappingDb, txn.get(), "mappings.db", true);
open(bindingDb, txn.get(), "bindings.db", true);
open(generalDb, txn.get(), "general.db", false);
- tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
+ tplStorePtr.reset(new TplJournalImpl(timer, "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent));
txn.commit();
} catch (const journal::jexception& e) {
QPID_LOG(error, "Journal Exception occurred while initializing store: " << e);
@@ -479,7 +480,7 @@ void MessageStoreImpl::create(PersistableQueue& queue,
return;
}
- jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), std::string("JournalData"),
+ jQueue = new JournalImpl(timer, queue.getName(), getJrnlDir(queue), std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
@@ -763,7 +764,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn,
QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
break;
}
- jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"),
+ jQueue = new JournalImpl(timer, queueName, getJrnlHashDir(queueName), std::string("JournalData"),
defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
{
diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h
index 2659f32..8e46dd2 100644
--- a/lib/MessageStoreImpl.h
+++ b/lib/MessageStoreImpl.h
@@ -45,6 +45,10 @@
#define DB_BUFFER_SMALL ENOMEM
#endif
+namespace qpid { namespace sys {
+class Timer;
+}}
+
namespace mrg {
namespace msgstore {
@@ -147,6 +151,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
u_int64_t highestRid;
bool isInit;
const char* envPath;
+ qpid::sys::Timer& timer;
qmf::com::redhat::rhm::store::Store* mgmtObject;
qpid::management::ManagementAgent* agent;
@@ -266,7 +271,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem
public:
typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
- MessageStoreImpl(const char* envpath = 0);
+ MessageStoreImpl(qpid::sys::Timer& timer, const char* envpath = 0);
virtual ~MessageStoreImpl();
diff --git a/lib/StorePlugin.cpp b/lib/StorePlugin.cpp
index 0fb3512..8231bd6 100644
--- a/lib/StorePlugin.cpp
+++ b/lib/StorePlugin.cpp
@@ -43,7 +43,7 @@ struct StorePlugin : public Plugin {
{
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- boost::shared_ptr<qpid::broker::MessageStore> store(new mrg::msgstore::MessageStoreImpl ());
+ boost::shared_ptr<qpid::broker::MessageStore> store(new mrg::msgstore::MessageStoreImpl (broker->getTimer()));
DataDir& dataDir = broker->getDataDir ();
if (options.storeDir.empty ())
{
diff --git a/tests/OrderingTest.cpp b/tests/OrderingTest.cpp
index 16f88d0..10fda1d 100644
--- a/tests/OrderingTest.cpp
+++ b/tests/OrderingTest.cpp
@@ -30,6 +30,9 @@
#include <qpid/broker/RecoveryManagerImpl.h>
#include <qpid/framing/AMQHeaderBody.h>
#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::sys::Timer timer;
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
@@ -59,7 +62,7 @@ int counter = 1;
void setup()
{
- store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
+ store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
store->init(test_dir, 4, 1, true); // truncate store
queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
@@ -98,7 +101,7 @@ void restart()
queue.reset();
store.reset();
- store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl());
+ store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(timer));
store->init(test_dir, 4, 1);
ExchangeRegistry exchanges;
LinkRegistry links;
diff --git a/tests/SimpleTest.cpp b/tests/SimpleTest.cpp
index 4d5f155..c62869d 100644
--- a/tests/SimpleTest.cpp
+++ b/tests/SimpleTest.cpp
@@ -32,6 +32,9 @@
#include <qpid/framing/AMQHeaderBody.h>
#include <qpid/framing/FieldTable.h>
#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::sys::Timer timer;
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
@@ -92,7 +95,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName,
const string& key, const FieldTable& args)
{
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
@@ -102,7 +105,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName,
store.bind(*exchange, *queue, key, args);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -121,7 +124,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName,
store.unbind(*exchange, *queue, key, args);
}
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -148,7 +151,7 @@ QPID_AUTO_TEST_CASE(CreateDelete)
SET_LOG_LEVEL("error+"); // This only needs to be set once.
cout << test_filename << ".CreateDelete: " << flush;
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
string name("CreateDeleteQueue");
Queue queue(name, 0, &store, 0);
@@ -164,7 +167,7 @@ QPID_AUTO_TEST_CASE(CreateDelete)
QPID_AUTO_TEST_CASE(EmptyRecover)
{
cout << test_filename << ".EmptyRecover: " << flush;
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
QueueRegistry registry;
registry.setStore (&store);
@@ -181,7 +184,7 @@ QPID_AUTO_TEST_CASE(QueueCreate)
uint64_t id(0);
string name("MyDurableQueue");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
@@ -189,7 +192,7 @@ QPID_AUTO_TEST_CASE(QueueCreate)
id = queue.getPersistenceId();
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -209,7 +212,7 @@ QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
std::auto_ptr<QueuePolicy> policy( QueuePolicy::createQueuePolicy(101, 202));
string name("MyDurableQueue");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
FieldTable settings;
@@ -218,7 +221,7 @@ QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
BOOST_REQUIRE(queue.getPersistenceId());
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -239,14 +242,14 @@ QPID_AUTO_TEST_CASE(QueueDestroy)
string name("MyDurableQueue");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue queue(name, 0, &store, 0);
store.create(queue, qpid::framing::FieldTable());
store.destroy(queue);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -272,7 +275,7 @@ QPID_AUTO_TEST_CASE(Enqueue)
string data1("abcdefg");
string data2("hijklmn");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -290,7 +293,7 @@ QPID_AUTO_TEST_CASE(Enqueue)
queue->enqueue(0, msg);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -331,7 +334,7 @@ QPID_AUTO_TEST_CASE(Dequeue)
string routingKey("MyRoutingKey");
Uuid messageId(true);
string data("abcdefg");
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
FieldTable settings;
@@ -347,7 +350,7 @@ QPID_AUTO_TEST_CASE(Dequeue)
queue->dequeue(0, qm);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
QueueRegistry registry;
registry.setStore (&store);
@@ -370,7 +373,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
FieldTable args;
args.setString("a", "A");
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
ExchangeRegistry registry;
Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
@@ -379,7 +382,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
BOOST_REQUIRE(id);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry registry;
@@ -393,7 +396,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
store.destroy(*exchange);
}
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry registry;
@@ -441,7 +444,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
string key("my-routing-key");
FieldTable args;
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1, true); // truncate store
Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
@@ -455,7 +458,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
store.destroy(*queue1);
}//db will be closed
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
@@ -472,7 +475,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
store.destroy(*exchange);
}
{
- MessageStoreImpl store;
+ MessageStoreImpl store(timer);
store.init(test_dir, 4, 1);
ExchangeRegistry exchanges;
QueueRegistry queues;
diff --git a/tests/TransactionalTest.cpp b/tests/TransactionalTest.cpp
index d6f6d7f..ac5a6b6 100644
--- a/tests/TransactionalTest.cpp
+++ b/tests/TransactionalTest.cpp
@@ -32,6 +32,9 @@
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/log/Statement.h"
#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::sys::Timer timer;
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
@@ -69,7 +72,7 @@ class TestTxnCtxt : public TxnCtxt
class TestMessageStore: public MessageStoreImpl
{
public:
- TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {}
+ TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) : MessageStoreImpl(timer, envpath) {}
std::auto_ptr<qpid::broker::TransactionContext> begin() {
checkInit();
// pass sequence number for c/a
@@ -109,7 +112,7 @@ Queue::shared_ptr queueB;
template <class T>
void setup()
{
- store = std::auto_ptr<T>(new T());
+ store = std::auto_ptr<T>(new T(timer));
store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
@@ -128,7 +131,7 @@ void restart()
queues.reset();
store.reset();
- store = std::auto_ptr<T>(new T());
+ store = std::auto_ptr<T>(new T(timer));
store->init(test_dir, 4, 1);
queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
ExchangeRegistry exchanges;
diff --git a/tests/TwoPhaseCommitTest.cpp b/tests/TwoPhaseCommitTest.cpp
index 86d3976..f442310 100644
--- a/tests/TwoPhaseCommitTest.cpp
+++ b/tests/TwoPhaseCommitTest.cpp
@@ -32,6 +32,9 @@
#include "qpid/log/Statement.h"
#include "TxnCtxt.h"
#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::sys::Timer timer;
#define SET_LOG_LEVEL(level) \
qpid::log::Options opts(""); \
@@ -182,7 +185,7 @@ class TwoPhaseCommitTest
class TestMessageStore: public MessageStoreImpl
{
public:
- TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {}
+ TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) : MessageStoreImpl(timer, envpath) {}
std::auto_ptr<qpid::broker::TPCTransactionContext> begin(const std::string& xid) {
checkInit();
IdSequence* jtx = &messageIdSequence;
@@ -325,7 +328,7 @@ class TwoPhaseCommitTest
template <class T>
void setup()
{
- store = std::auto_ptr<T>(new T());
+ store = std::auto_ptr<T>(new T(timer));
store->init(test_dir, 4, 1, true); // truncate store
//create two queues:
@@ -353,7 +356,7 @@ class TwoPhaseCommitTest
queues.reset();
links.reset();
- store = std::auto_ptr<T>(new T());
+ store = std::auto_ptr<T>(new T(timer));
store->init(test_dir, 4, 1);
sys::Timer t;
ExchangeRegistry exchanges;
--
1.6.6.1
More information about the scm-commits
mailing list