Don't store monitor probes to other nodes
Signed-off-by: Angus Salkeld <asalkeld(a)redhat.com>
---
configure.ac | 6 +
src/Makefile.am | 17 +-
src/assembly.cpp | 547 ++++++++++++++++++++++++--------------------------
src/assembly.h | 37 ++--
src/cf2pe.xsl | 84 ++++++++
src/common_agent.cpp | 16 +-
src/deployable.cpp | 390 +++++++++++++++++++++++-------------
src/deployable.h | 50 +++--
src/dpe_agent.cpp | 6 +-
src/mainloop.cpp | 22 ++-
src/mainloop.h | 5 +
src/pcmk_pe.c | 145 +++++++++-----
src/pcmk_pe.h | 13 +-
src/resource.cpp | 301 +++++++++++++++++++++++++++
src/resource.h | 68 +++++++
15 files changed, 1170 insertions(+), 537 deletions(-)
create mode 100644 src/cf2pe.xsl
create mode 100644 src/resource.cpp
create mode 100644 src/resource.h
diff --git a/configure.ac b/configure.ac
index b7cf75c..02b9dd8 100644
--- a/configure.ac
+++ b/configure.ac
@@ -36,7 +36,9 @@ fi
PKG_CHECK_MODULES(glib, glib-2.0 >= 2.0)
PKG_CHECK_MODULES(dbus_glib_1, dbus-glib-1 >= 0.25)
PKG_CHECK_MODULES(libxml2, libxml-2.0 >= 2.7.7)
+PKG_CHECK_MODULES(libxslt, libxslt >= 0.25)
PKG_CHECK_MODULES(libqb, libqb)
+PKG_CHECK_MODULES(uuid, uuid)
# Checks for typedefs, structures, and compiler characteristics.
AC_HEADER_STDBOOL
@@ -179,12 +181,16 @@ AC_MSG_RESULT([ glib_CFLAGS = ${glib_CFLAGS}])
AC_MSG_RESULT([ glib_LIBS = ${glib_LIBS}])
AC_MSG_RESULT([ libxml2_CFLAGS = ${libxml2_CFLAGS}])
AC_MSG_RESULT([ libxml2_LIBS = ${libxml2_LIBS}])
+AC_MSG_RESULT([ libxslt_CFLAGS = ${libxslt_CFLAGS}])
+AC_MSG_RESULT([ libxslt_LIBS = ${libxslt_LIBS}])
AC_MSG_RESULT([ dbus_glib_1_CFLAGS = ${dbus_glib_1_CFLAGS}])
AC_MSG_RESULT([ dbus_glib_1_LIBS = ${dbus_glib_1_LIBS}])
AC_MSG_RESULT([ qmf_CFLAGS = ${qmf_CFLAGS}])
AC_MSG_RESULT([ qmf_LIBS = ${qmf_LIBS}])
AC_MSG_RESULT([ pcmk_CFLAGS = ${pcmk_CFLAGS}])
AC_MSG_RESULT([ pcmk_LIBS = ${pcmk_LIBS}])
+AC_MSG_RESULT([ uuid_CFLAGS = ${uuid_CFLAGS}])
+AC_MSG_RESULT([ uuid_LIBS = ${uuid_LIBS}])
AC_MSG_RESULT([])
AC_MSG_RESULT([$PACKAGE build info:])
AC_MSG_RESULT([ Final CFLAGS = ${CFLAGS}])
diff --git a/src/Makefile.am b/src/Makefile.am
index 144fea4..b75ddff 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -27,14 +27,17 @@ upstartconf_DATA = dped.conf
systemdconfdir = /lib/systemd/system
systemdconf_DATA = dped@.service
-EXTRA_DIST = $(upstartconf_DATA) $(systemdconf_DATA)
+xsldir = $(datadir)/pacemaker-cloud
+xsl_DATA = cf2pe.xsl
+
+EXTRA_DIST = $(upstartconf_DATA) $(systemdconf_DATA) $(xsl_DATA)
sbin_PROGRAMS = cped dped
noinst_PROGRAMS = cpe-tool2 pe_test
noinst_HEADERS = assembly.h config.h cpe_agent.h dpe_agent.h mainloop.h \
common_agent.h config_loader.h deployable.h init-dbus.h pcmk_pe.h \
schema.xml org/cloudpolicyengine/QmfPackage.cpp \
- org/cloudpolicyengine/QmfPackage.h
+ org/cloudpolicyengine/QmfPackage.h resource.h
qmfauto_path = org/cloudpolicyengine
qmfauto_c = $(qmfauto_path)/QmfPackage.cpp
@@ -50,13 +53,15 @@ cped_CPPFLAGS = $(libqb_CFLAGS) $(dbus_glib_1_CFLAGS) $(glib_CFLAGS)
$(qmf_CFLAG
cped_LDFLAGS = $(libqb_LIBS) $(dbus_glib_1_LIBS) $(qmf_LIBS) $(glib_LIBS)
-dped_SOURCES = pcmk_pe.c mainloop.cpp $(qmfauto_c) common_agent.cpp dpe_agent.cpp \
- deployable.cpp assembly.cpp config_loader.cpp
+dped_SOURCES = pcmk_pe.c mainloop.cpp $(qmfauto_c) common_agent.cpp \
+ deployable.cpp assembly.cpp config_loader.cpp dpe_agent.cpp \
+ resource.cpp
dped_CPPFLAGS = $(libqb_CFLAGS) $(glib_CFLAGS) $(qmf_CFLAGS) $(libxml2_CFLAGS) \
- -I$(qmfauto_path) $(pcmk_CFLAGS)
+ -I$(qmfauto_path) $(pcmk_CFLAGS) $(libxslt_CFLAGS) $(uuid_CFLAGS)
-dped_LDFLAGS = $(libqb_LIBS) $(qmf_LIBS) $(glib_LIBS) $(libxml2_LIBS) $(pcmk_LIBS)
+dped_LDFLAGS = $(libqb_LIBS) $(qmf_LIBS) $(glib_LIBS) $(libxml2_LIBS) \
+ $(pcmk_LIBS) $(libxslt_LIBS) $(uuid_LIBS)
cpe_tool2_SOURCES = cpe-tool2.c libinit.c
cpe_tool2_CPPFLAGS = $(dbus_glib_1_CFLAGS)
diff --git a/src/assembly.cpp b/src/assembly.cpp
index 4e58a1f..8191969 100644
--- a/src/assembly.cpp
+++ b/src/assembly.cpp
@@ -18,20 +18,20 @@
* You should have received a copy of the GNU General Public License
* along with pacemaker-cloud. If not, see <
http://www.gnu.org/licenses/>.
*/
-
#include "config.h"
-#include <string.h>
+
#include <qb/qblog.h>
+
#include <iostream>
#include <sstream>
#include <map>
-#include <assert.h>
+
#include "pcmk_pe.h"
#include "mainloop.h"
#include "assembly.h"
+#include "resource.h"
#include "deployable.h"
-
using namespace std;
using namespace qmf;
@@ -43,151 +43,96 @@ Assembly::op_remove_by_correlator(uint32_t correlator)
return op;
}
-gboolean
-Assembly::process_qmf_events(void)
+bool
+Assembly::process_qmf_events(ConsoleEvent &event)
{
- uint32_t rc = 0;
- ConsoleEvent event;
+ enum ocf_exitcode rc = OCF_OK;
bool got_event = false;
struct pe_operation *op;
+ Resource *rsc;
- if (state == Assembly::STATE_INIT) {
+ if (_state == Assembly::STATE_INIT) {
deref();
return FALSE;
}
- matahari_discover();
- while (session->nextEvent(event, qpid::messaging::Duration::IMMEDIATE)) {
- if (event.getType() == CONSOLE_EVENT) {
- uint32_t seq;
- uint32_t tstamp;
- const Data& event_data(event.getData(0));
+ if (event.getType() == CONSOLE_EVENT) {
+ const Data& event_data(event.getData(0));
- if (event_data.getSchemaId().getPackageName() != "org.matahariproject" ||
- event_data.getSchemaId().getName() != "heartbeat") {
- continue;
- }
+ if (event_data.getSchemaId().getName() == "heartbeat") {
+ uint32_t seq = event_data.getProperty("sequence");
+ uint32_t tstamp = event_data.getProperty("timestamp");
- tstamp = event_data.getProperty("timestamp");
- seq = event_data.getProperty("sequence");
heartbeat_recv(tstamp, seq);
got_event = true;
+ }
- } else if (event.getType() == CONSOLE_AGENT_DEL) {
- if (event.getAgentDelReason() == AGENT_DEL_AGED) {
- qb_log(LOG_NOTICE, "CONSOLE_AGENT_DEL (aged) %s",
- event.getAgent().getName().c_str());
- } else {
- qb_log(LOG_NOTICE, "CONSOLE_AGENT_DEL (filtered) %s",
- event.getAgent().getName().c_str());
- }
- _dead_agents.remove(event.getAgent().getName());
+ } else if (event.getType() == CONSOLE_AGENT_DEL) {
+ if (event.getAgentDelReason() == AGENT_DEL_AGED) {
+ qb_log(LOG_NOTICE, "CONSOLE_AGENT_DEL (aged) %s",
+ event.getAgent().getName().c_str());
+ } else {
+ qb_log(LOG_NOTICE, "CONSOLE_AGENT_DEL (filtered) %s",
+ event.getAgent().getName().c_str());
+ }
+ _dead_agents.remove(event.getAgent().getName());
- } else if (event.getType() == CONSOLE_METHOD_RESPONSE) {
- qpid::types::Variant::Map my_map = event.getArguments();
- op = op_remove_by_correlator(event.getCorrelator());
- rc = pe_resource_ocf_exitcode_get(op, my_map["rc"].asUint32());
- qb_log(LOG_INFO, "%s'ing: %s [%s:%s] on %s (interval:%d ms) result:%d",
+ } else if (event.getType() == CONSOLE_METHOD_RESPONSE) {
+ qpid::types::Variant::Map my_map = event.getArguments();
+ op = op_remove_by_correlator(event.getCorrelator());
+ rsc = (Resource *)op->resource;
+ rc = pe_resource_ocf_exitcode_get(op, my_map["rc"].asUint32());
+ op->times_executed++;
+
+ rsc->completed(op, rc);
+ // remove the ref for putting the op in the map
+ pe_resource_unref(op);
+ } else if (event.getType() == CONSOLE_EXCEPTION) {
+ rc = OCF_UNKNOWN_ERROR;
+ op = op_remove_by_correlator(event.getCorrelator());
+ rsc = (Resource *)op->resource;
+
+ if (event.getDataCount() >= 1) {
+ string error(event.getData(0).getProperty("error_text"));
+ qb_log(LOG_ERR, "%s'ing: %s [%s:%s] on %s (interval:%d ms) result:%s",
+ op->method, op->rname, op->rclass, op->rtype, op->hostname,
+ op->interval, error.c_str());
+ } else {
+ qb_log(LOG_ERR, "%s'ing: %s [%s:%s] on %s (interval:%d ms) result:%d",
op->method, op->rname, op->rclass, op->rtype, op->hostname,
op->interval, rc);
- if (op->interval == 0) {
- if (strcmp(op->method, "start") == 0 &&
- rc == OCF_OK) {
- string rname = op->rtype;
- string running = "running";
- string reason = "started OK";
- _dep->service_state_changed(this, rname, running, reason);
- }
-
- pe_resource_completed(op, rc);
- pe_resource_unref(op); // delete
- } else if (op->action != NULL) {
- pe_resource_completed(op, rc);
- op->action = NULL;
- op->graph = NULL;
- } else if (rc != op->target_outcome) {
-
- resource_failed(op);
-
- // delete request - timer will delete
- // this is only for repeats
- pe_resource_unref(op);
- }
- // remove the ref for putting the op in the map
- pe_resource_unref(op);
- } else if (event.getType() == CONSOLE_EXCEPTION) {
- rc = OCF_UNKNOWN_ERROR;
- op = op_remove_by_correlator(event.getCorrelator());
-
- if (event.getDataCount() >= 1) {
- string error(event.getData(0).getProperty("error_text"));
- qb_log(LOG_ERR, "%s'ing: %s [%s:%s] on %s (interval:%d ms) result:%s",
- op->method, op->rname, op->rclass, op->rtype, op->hostname,
- op->interval, error.c_str());
- } else {
- qb_log(LOG_ERR, "%s'ing: %s [%s:%s] on %s (interval:%d ms) result:%d",
- op->method, op->rname, op->rclass, op->rtype, op->hostname,
- op->interval, rc);
- }
- pe_resource_completed(op, rc);
- // remove the ref for putting the op in the map
- pe_resource_unref(op);
}
+ rsc->completed(op, rc);
}
check_state();
return TRUE;
}
-static void
-_poll_for_qmf_events(gpointer data)
-{
- Assembly *a = (Assembly *)data;
- qb_loop_timer_handle timer_handle;
-
- if (a->process_qmf_events()) {
- mainloop_timer_add(1000, data,
- _poll_for_qmf_events,
- &timer_handle);
- }
-}
-
void
-Assembly::matahari_discover(void)
+Assembly::matahari_discover(ConsoleSession *session)
{
- Agent a;
- ConsoleEvent ce;
int32_t ai;
int32_t ac;
int32_t q;
bool dont_use;
- // only search for agents if we have a heartbeat
- if (state != Assembly::STATE_OFFLINE) {
- return;
- }
- if (_mh_serv_class_found && _mh_rsc_class_found) {
- return;
- }
- if (hb_state != HEARTBEAT_OK) {
+ if (_mh_serv_class_found && _mh_rsc_class_found && _mh_host_class_found)
{
return;
}
string common = "package:org.matahariproject";
// common += ", where:[eq, uuid, [quote, " + string(_uuid) + "]]";
+ //common += ", where:[eq, hostname, [quote, " + _name + "]]";
common += "}";
- qb_enter();
-
ac = session->getAgentCount();
- qb_log(LOG_DEBUG, "session has %d agents", ac);
+ qb_log(LOG_TRACE, "session has %d agents", ac);
for (ai = 0; ai < ac; ai++) {
+ ConsoleEvent ce;
+ Agent a = session->getAgent(ai);
+
dont_use = false;
- a = session->getAgent(ai);
- if (a.getVendor() != "matahariproject.org" ||
- a.getProduct() != "service") {
- continue;
- }
- qb_log(LOG_INFO, "looking at agent %s", a.getName().c_str());
+ qb_log(LOG_TRACE, "looking at agent %s", a.getName().c_str());
for (list<string>::iterator it = _dead_agents.begin();
it != _dead_agents.end(); ++it) {
if (*it == a.getName()) {
@@ -196,216 +141,252 @@ Assembly::matahari_discover(void)
}
}
if (dont_use) {
- qb_log(LOG_INFO, "*** ignoring dead agent %s",
+ qb_log(LOG_DEBUG, "ignoring dead agent %s",
a.getName().c_str());
continue;
}
- if (!_mh_serv_class_found) {
+ if (!_mh_host_class_found) {
+ ce = a.query("{class:Host, " + common);
+ for (q = 0; q < ce.getDataCount(); q++) {
+ string hostname = ce.getData(q).getProperty("hostname");
+ qb_log(LOG_TRACE, "found Host class %d of %d hostname %s (looking for
%s)",
+ q, ce.getDataCount(), hostname.c_str(),
+ _name.c_str());
+ if (hostname == _name) {
+ string agent_name = ce.getAgent().getName();
+ qb_log(LOG_INFO, "choosing agent %s", a.getName().c_str());
+ _mh_host_class_found = true;
+ _mh_host_class = ce.getData(q);
+
+ _dep->map_agents_ass(agent_name, this);
+ break;
+ }
+ }
+ }
+ if (!_mh_serv_class_found && _hb_state == HEARTBEAT_OK) {
ce = a.query("{class:Services, " + common);
for (q = 0; q < ce.getDataCount(); q++) {
- qb_log(LOG_INFO, "WOOT found Services class %d of %d",
- q, ce.getDataCount());
- if (q == 0) {
+ string hostname = ce.getData(q).getProperty("hostname");
+ qb_log(LOG_TRACE, "found Servicest class %d of %d hostname %s (looking for
%s)",
+ q, ce.getDataCount(), hostname.c_str(),
+ _name.c_str());
+ if (hostname == _name) {
+ string agent_name = ce.getAgent().getName();
qb_log(LOG_INFO, "choosing agent %s", a.getName().c_str());
_mh_serv_class_found = true;
- _mh_serv_class = ce.getData(0);
+ _mh_serv_class = ce.getData(q);
+
+ _dep->map_agents_ass(agent_name, this);
+ break;
}
}
}
- if (!_mh_rsc_class_found) {
+ if (!_mh_rsc_class_found && _hb_state == HEARTBEAT_OK) {
ce = a.query("{class:Resources, " + common);
- if (ce.getDataCount() >= 1) {
- _mh_rsc_class_found = true;
- _mh_rsc_class = ce.getData(0);
- qb_log(LOG_DEBUG, "WOOT found Resources class");
+ for (q = 0; q < ce.getDataCount(); q++) {
+ string hostname = ce.getData(q).getProperty("hostname");
+ qb_log(LOG_TRACE, "found Resources class %d of %d hostname %s (looking for
%s)",
+ q, ce.getDataCount(), hostname.c_str(),
+ _name.c_str());
+ if (hostname == _name) {
+ string agent_name = ce.getAgent().getName();
+ _mh_rsc_class_found = true;
+ _mh_rsc_class = ce.getData(q);
+ qb_log(LOG_TRACE, "found Resources class");
+
+ _dep->map_agents_ass(agent_name, this);
+ break;
+ }
}
}
- if (_mh_serv_class_found && _mh_rsc_class_found) {
+ if (_mh_serv_class_found && _mh_rsc_class_found &&
_mh_host_class_found) {
break;
}
}
- qb_leave();
}
-static void
-resource_interval_timeout(gpointer data)
+void
+Assembly::resource_execute(struct pe_operation *op, std::string method,
+ qpid::types::Variant::Map in_args)
{
- struct pe_operation *op = (struct pe_operation *)data;
- Assembly *a = (Assembly *)op->user_data;
- qb_loop_timer_handle timer_handle;
-
- qb_enter();
- if (op->refcount == 1) {
- // we are the last ref holder
- pe_resource_unref(op);
- return;
- }
+ uint32_t correlation_id;
+ Agent a = _mh_serv_class.getAgent();
- a->_resource_execute(op);
+ correlation_id = a.callMethodAsync(method, in_args, _mh_serv_class.getAddr());
+ qb_log(LOG_DEBUG, "callMethodAsync: correlation id = %d", correlation_id);
+ _ops[correlation_id] = op;
qb_leave();
-
- mainloop_timer_add(op->interval, data,
- resource_interval_timeout,
- &timer_handle);
}
void
-Assembly::resource_failed(struct pe_operation *op)
+Assembly::deref(void)
{
- string rname = op->rtype;
- string running = "failed";
- string reason = "monitor failed";
- qb_log(LOG_NOTICE, "resourse %s:%s:%s FAILED",
- _name.c_str(), op->rname, op->rtype);
-
- _dep->service_state_changed(this, rname, running, reason);
+ _refcount--;
+ if (_refcount == 0) {
+ delete this;
+ }
}
-void
-Assembly::resource_execute(struct pe_operation *op)
+static void
+xml_new_int_prop(xmlNode *n, const char *name, int32_t val)
{
- qb_loop_timer_handle timer_handle;
-
- qb_enter();
- if (state != STATE_ONLINE) {
- qb_log(LOG_DEBUG, "can't execute resourse in offline state");
- if (op->interval > 0 && strcmp(op->method, "monitor") == 0)
{
- pe_resource_completed(op, OCF_UNKNOWN_ERROR);
- }
- pe_resource_unref(op); // delete
- return;
- }
-
- if (op->interval > 0 && strcmp(op->method, "monitor") == 0)
{
- op->user_data = this;
- pe_resource_ref(op);
- mainloop_timer_add(op->interval,
- op, resource_interval_timeout,
- &timer_handle);
- } else {
- _resource_execute(op);
- }
- qb_leave();
+ char int_str[36];
+ snprintf(int_str, 36, "%d", val);
+ xmlNewProp(n, BAD_CAST name, BAD_CAST int_str);
}
-void
-Assembly::_resource_execute(struct pe_operation *op)
+static void
+xml_new_time_prop(xmlNode *n, const char *name, time_t val)
{
- Agent a;
- qpid::types::Variant::Map in_args;
- qpid::types::Variant::Map in_params;
- const char *rmethod = op->method;
-
- qb_enter();
-
- if (state != Assembly::STATE_ONLINE) {
- qb_log(LOG_DEBUG, "can't execute resourse in offline state");
- pe_resource_completed(op, OCF_UNKNOWN_ERROR);
- pe_resource_unref(op); // delete
- qb_leave();
- return;
- }
-
- if (strcmp(op->method, "monitor") == 0) {
- in_args["interval"] = 0;
- }
- if (op->timeout < 20000) {
- in_args["timeout"] = 20000;
- } else {
- in_args["timeout"] = op->timeout;
- }
-
- if (strcmp(op->rclass, "lsb") == 0) {
- uint32_t corralation_id;
- a = _mh_serv_class.getAgent();
- if (strcmp(op->method, "monitor") == 0) {
- rmethod = "status";
- }
- in_args["name"] = op->rtype;
- pe_resource_ref(op);
- corralation_id = a.callMethodAsync(rmethod, in_args, _mh_serv_class.getAddr());
- qb_log(LOG_DEBUG, "callMethodAsync: %d", corralation_id);
- _ops[corralation_id] = op;
- } else {
- uint32_t corralation_id;
- a = _mh_rsc_class.getAgent();
- // make a non-empty parameters map
- in_params["qmf"] = "frustrating";
-
- in_args["name"] = op->rname;
- in_args["class"] = op->rclass;
- in_args["provider"] = op->rprovider;
- in_args["type"] = op->rtype;
- in_args["parameters"] = in_params;
- pe_resource_ref(op);
- corralation_id = a.callMethodAsync(rmethod, in_args, _mh_rsc_class.getAddr());
- _ops[corralation_id] = op;
- }
- qb_leave();
+ char int_str[36];
+ snprintf(int_str, 36, "%d", val);
+ xmlNewProp(n, BAD_CAST name, BAD_CAST int_str);
}
+/*
+ * id Identifier for the job constructed from the resource id, operation and interval.
+ * call-id The job's ticket number. Used as a sort key to determine the order in
which the jobs were executed.
+ * operation The action the resource agent was invoked with.
+ * interval The frequency, in milliseconds, at which the operation will be repeated. 0
indicates a one-off job.
+ * op-status The job's status. Generally this will be either 0 (done) or -1
(pending). Rarely used in favor of rc-code.
+ * rc-code The job's result.
+ * last-run Diagnostic indicator. Machine local date/time, in seconds since epoch,
+ * at which the job was executed.
+ * last-rc-change Diagnostic indicator. Machine local date/time, in seconds since epoch,
+ * at which the job first returned the current value of rc-code
+ * exec-time Diagnostic indicator. Time, in seconds, that the job was running for
+ * queue-time Diagnostic indicator. Time, in seconds, that the job was queued for in the
LRMd
+ * crm_feature_set The version which this job description conforms to. Used when
processing op-digest
+ * transition-key A concatenation of the job's graph action number, the graph
number,
+ * the expected result and the UUID of the crmd instance that scheduled it.
+ * This is used to construct transition-magic (below).
+ * transition-magic A concatenation of the job's op-status, rc-code and
transition-key.
+ * Guaranteed to be unique for the life of the cluster (which ensures it is part of
CIB update notifications)
+ * and contains all the information needed for the crmd to correctly analyze and
process the completed job.
+ * Most importantly, the decomposed elements tell the crmd if the job entry was
expected and whether it failed.
+ * op-digest An MD5 sum representing the parameters passed to the job.
+ * Used to detect changes to the configuration and restart resources if necessary.
+ * crm-debug-origin Diagnostic indicator. The origin of the current values.
+ */
+/* <lrm_resource id="pingd:0" type="pingd" class="ocf"
provider="pacemaker">
+ * <lrm_rsc_op
+ * id="pingd:0_monitor_30000"
+ * operation="monitor"
+ * call-id="34"
+ * rc-code="0"
+ * interval="30000"
+ * crm-debug-origin="do_update_resource"
+ * crm_feature_set="3.0.1"
+ * op-digest="a0f8398dac7ced82320fe99fd20fbd2f"
+ * transition-key="10:11:0:2668bbeb-06d5-40f9-936d-24cb7f87006a"
+ * transition-magic="0:0;10:11:0:2668bbeb-06d5-40f9-936d-24cb7f87006a"
+ * last-run="1239009741"
+ * last-rc-change="1239009741"
+ * exec-time="10"
+ * queue-time="0"/>
+ */
void
-Assembly::deref(void)
+Assembly::insert_op_history(xmlNode *rsc, struct operation_history *oh)
{
- refcount--;
- if (refcount == 0) {
- delete this;
- }
+ xmlNode *op;
+ char key[255];
+ char magic[255];
+
+ op = xmlNewChild(rsc, NULL, BAD_CAST "lrm_rsc_op", NULL);
+
+ xmlNewProp(op, BAD_CAST "id", BAD_CAST oh->rsc_id->c_str());
+ xmlNewProp(op, BAD_CAST "operation", BAD_CAST oh->operation->c_str());
+ xml_new_int_prop(op, "call-id", oh->call_id);
+ xml_new_int_prop(op, "rc-code", oh->rc);
+ xml_new_int_prop(op, "interval", oh->interval);
+ xml_new_time_prop(op, "last-run", oh->last_run);
+ xml_new_time_prop(op, "last-rc-change", oh->last_rc_change);
+
+ snprintf(key, 255, "%d:%d:%d:%s",
+ oh->action_id, oh->graph_id, oh->target_outcome,
+ _dep->crmd_uuid_get().c_str());
+ xmlNewProp(op, BAD_CAST "transition-key", BAD_CAST key);
+
+ snprintf(magic, 255, "0:%d:%s", oh->rc, key);
+ xmlNewProp(op, BAD_CAST "transition-magic", BAD_CAST magic);
+
+ xmlNewProp(op, BAD_CAST "op-digest", BAD_CAST oh->op_digest);
+ xmlNewProp(op, BAD_CAST "crm-debug-origin", BAD_CAST __func__);
+ xmlNewProp(op, BAD_CAST "crm_feature_set", BAD_CAST PE_CRM_VERSION);
+ xmlNewProp(op, BAD_CAST "op-status", BAD_CAST "0");
+ xmlNewProp(op, BAD_CAST "exec-time", BAD_CAST "0");
+ xmlNewProp(op, BAD_CAST "queue-time", BAD_CAST "0");
}
void
Assembly::insert_status(xmlNode *status)
{
xmlNode *node_state = xmlNewChild(status, NULL, BAD_CAST "node_state", NULL);
+ Resource *r = NULL;
qb_enter();
- xmlNewProp(node_state, BAD_CAST "id", BAD_CAST _name.c_str());
+ xmlNewProp(node_state, BAD_CAST "id", BAD_CAST _uuid.c_str());
xmlNewProp(node_state, BAD_CAST "uname", BAD_CAST _name.c_str());
xmlNewProp(node_state, BAD_CAST "ha", BAD_CAST "active");
xmlNewProp(node_state, BAD_CAST "expected", BAD_CAST "member");
xmlNewProp(node_state, BAD_CAST "in_ccm", BAD_CAST "true");
+ xmlNewProp(node_state, BAD_CAST "crmd", BAD_CAST "online");
- if (state == STATE_ONLINE) {
- xmlNewProp(node_state, BAD_CAST "crmd", BAD_CAST "online");
+ if (_state == STATE_ONLINE) {
xmlNewProp(node_state, BAD_CAST "join", BAD_CAST "member");
} else {
- xmlNewProp(node_state, BAD_CAST "crmd", BAD_CAST "offline");
- xmlNewProp(node_state, BAD_CAST "join", BAD_CAST "down");
+ xmlNewProp(node_state, BAD_CAST "join", BAD_CAST "pending");
+ }
+ if (op_history.size() > 0) {
+ xmlNode *lrm = xmlNewChild(node_state, NULL, BAD_CAST "lrm", NULL);
+ xmlNode *rscs = xmlNewChild(lrm, NULL, BAD_CAST "lrm_resources", NULL);
+
+ for (map<string, struct operation_history*>::iterator iter = op_history.begin();
+ iter != op_history.end(); iter++) {
+ struct operation_history *oh = iter->second;
+ xmlNode *rsc = NULL;
+ if (r != oh->resource) {
+ r = (Resource*)oh->resource;
+ rsc = r->insert_status(rscs);
+ }
+ insert_op_history(rsc, oh);
+ }
}
+
qb_leave();
}
void
Assembly::check_state(void)
{
- uint32_t new_state = (this->*state_table[state])();
+ uint32_t new_state = (this->*state_table[_state])();
- if (state_action_table[state][new_state]) {
- (this->*state_action_table[state][new_state])();
+ if (state_action_table[_state][new_state]) {
+ (this->*state_action_table[_state][new_state])();
}
- if (state != new_state) {
- state = new_state;
+ if (_state != new_state) {
+ _state = new_state;
}
}
uint32_t
Assembly::check_state_online(void)
{
- uint32_t new_state = state;
+ uint32_t new_state = _state;
gdouble elapsed = 0;
- if (hb_state == HEARTBEAT_OK) {
+ if (_hb_state == HEARTBEAT_OK) {
elapsed = g_timer_elapsed(_last_heartbeat, NULL);
if (elapsed > (5 * 1.5)) {
- hb_state = Assembly::HEARTBEAT_NOT_RECEIVED;
+ _hb_state = Assembly::HEARTBEAT_NOT_RECEIVED;
qb_log(LOG_WARNING,
- "assembly heartbeat too late! (%.2f > 5 seconds)",
- elapsed);
+ "assembly (%s) heartbeat too late! (%.2f > 5 seconds)",
+ _name.c_str(), elapsed);
}
}
- if (hb_state != HEARTBEAT_OK) {
+ if (_hb_state != HEARTBEAT_OK) {
new_state = STATE_OFFLINE;
}
return new_state;
@@ -414,9 +395,9 @@ Assembly::check_state_online(void)
uint32_t
Assembly::check_state_offline(void)
{
- uint32_t new_state = state;
- if (hb_state == HEARTBEAT_OK &&
- _mh_serv_class_found &&
+ uint32_t new_state = _state;
+ if (_hb_state == HEARTBEAT_OK &&
+ _mh_serv_class_found && _mh_host_class_found &&
_mh_rsc_class_found) {
new_state = STATE_ONLINE;
}
@@ -431,14 +412,17 @@ Assembly::state_offline_to_online(void)
_dep->assembly_state_changed(this, "running", "All good");
}
+
void
Assembly::state_online_to_offline(void)
{
map<uint32_t, struct pe_operation*>::iterator iter;
struct pe_operation *op;
+ Resource *rsc = (Resource *)op->resource;
_dead_agents.push_back(_mh_serv_class.getAgent().getName());
_dead_agents.push_back(_mh_rsc_class.getAgent().getName());
+ _dead_agents.push_back(_mh_host_class.getAgent().getName());
// TODO we need a timer -
// the same as the qmf agent ageing one to
// remove these agents from the _dead_agents list
@@ -446,13 +430,11 @@ Assembly::state_online_to_offline(void)
// be the same)
_mh_serv_class_found = false;
_mh_rsc_class_found = false;
+ _mh_host_class_found = false;
for (iter = _ops.begin(); iter != _ops.end(); iter++) {
op = iter->second;
- if (op->interval == 0) {
- pe_resource_completed(op, OCF_UNKNOWN_ERROR);
- }
- pe_resource_unref(op); // delete
+ rsc->completed(op, OCF_UNKNOWN_ERROR);
}
qb_log(LOG_NOTICE, "Assembly (%s) STATE_OFFLINE.",
@@ -465,17 +447,16 @@ Assembly::heartbeat_recv(uint32_t timestamp, uint32_t sequence)
{
gdouble elapsed = 0;
- qb_enter();
- if (hb_state != Assembly::HEARTBEAT_OK) {
+ if (_hb_state != Assembly::HEARTBEAT_OK) {
_last_sequence = sequence;
- hb_state = Assembly::HEARTBEAT_OK;
+ _hb_state = Assembly::HEARTBEAT_OK;
qb_log(LOG_INFO, "Got the first heartbeat.");
g_timer_stop(_last_heartbeat);
g_timer_start(_last_heartbeat);
return;
}
if (sequence > (_last_sequence + 1)) {
- hb_state = Assembly::HEARTBEAT_SEQ_BAD;
+ _hb_state = Assembly::HEARTBEAT_SEQ_BAD;
qb_log(LOG_WARNING, "assembly heartbeat missed a sequence!");
return;
@@ -485,50 +466,51 @@ Assembly::heartbeat_recv(uint32_t timestamp, uint32_t sequence)
g_timer_stop(_last_heartbeat);
elapsed = g_timer_elapsed(_last_heartbeat, NULL);
if (elapsed > (5 * 1.5)) {
- hb_state = Assembly::HEARTBEAT_NOT_RECEIVED;
+ _hb_state = Assembly::HEARTBEAT_NOT_RECEIVED;
qb_log(LOG_WARNING, "assembly heartbeat too late! (%.2f > 5 seconds)",
elapsed);
return;
}
g_timer_start(_last_heartbeat);
- qb_leave();
}
void
Assembly::stop(void)
{
- if (state > STATE_INIT) {
- session->close();
- connection->close();
- state = STATE_INIT;
+ if (_state > STATE_INIT) {
+ _state = STATE_INIT;
}
deref();
}
Assembly::Assembly() :
- _mh_serv_class_found(false), _mh_rsc_class_found(false),
- hb_state(HEARTBEAT_INIT), state(STATE_INIT), refcount(1),
- session(NULL), connection(NULL), _name("")
+ _mh_serv_class_found(false), _mh_rsc_class_found(false), _mh_host_class_found(false),
+ _hb_state(HEARTBEAT_INIT), _refcount(1), _dep(NULL),
+ _name(""), _uuid(""), _ipaddr(""), _state(STATE_OFFLINE)
{
+ state_table[STATE_OFFLINE] = &Assembly::check_state_offline;
+ state_table[STATE_ONLINE] = &Assembly::check_state_online;
+ state_table[STATE_INIT] = NULL;
+
+ state_action_table[STATE_OFFLINE][STATE_ONLINE] =
&Assembly::state_offline_to_online;
+ state_action_table[STATE_OFFLINE][STATE_OFFLINE] = NULL;
+ state_action_table[STATE_ONLINE][STATE_OFFLINE] =
&Assembly::state_online_to_offline;
+ state_action_table[STATE_ONLINE][STATE_ONLINE] = NULL;
+
+ _last_heartbeat = g_timer_new();
}
Assembly::~Assembly()
{
qb_log(LOG_DEBUG, "~Assembly(%s)", _name.c_str());
- if (state > STATE_INIT) {
- session->close();
- connection->close();
- }
}
Assembly::Assembly(Deployable *dep, std::string& name,
std::string& uuid, std::string& ipaddr) :
- _mh_serv_class_found(false), _mh_rsc_class_found(false),
- hb_state(HEARTBEAT_INIT), refcount(1), _dep(dep),
- _name(name), _uuid(uuid), _ipaddr(ipaddr), state(STATE_OFFLINE)
+ _mh_serv_class_found(false), _mh_rsc_class_found(false), _mh_host_class_found(false),
+ _hb_state(HEARTBEAT_INIT), _refcount(1), _dep(dep),
+ _name(name), _uuid(uuid), _ipaddr(ipaddr), _state(STATE_OFFLINE)
{
- qb_loop_timer_handle timer_handle;
- string url("localhost:49000");
state_table[STATE_OFFLINE] = &Assembly::check_state_offline;
state_table[STATE_ONLINE] = &Assembly::check_state_online;
@@ -541,19 +523,6 @@ Assembly::Assembly(Deployable *dep, std::string& name,
qb_log(LOG_INFO, "Assembly(%s:%s)", name.c_str(), ipaddr.c_str());
- connection = new qpid::messaging::Connection(url, connectionOptions);;
- connection->open();
-
- session = new ConsoleSession(*connection, "max-agent-age:1");
- session->open();
-
- qb_log(LOG_INFO, "Assembly(%s:%s) session open", name.c_str(),
ipaddr.c_str());
-
_last_heartbeat = g_timer_new();
- mainloop_timer_add(1000, this,
- _poll_for_qmf_events,
- &timer_handle);
- refcount++;
+ _refcount++;
}
-
-
diff --git a/src/assembly.h b/src/assembly.h
index 9fd4a88..a26b358 100644
--- a/src/assembly.h
+++ b/src/assembly.h
@@ -18,15 +18,11 @@
* You should have received a copy of the GNU General Public License
* along with pacemaker-cloud. If not, see <
http://www.gnu.org/licenses/>.
*/
+#include <string>
-#include <qpid/messaging/Connection.h>
-#include <qpid/messaging/Duration.h>
#include <qmf/ConsoleSession.h>
#include <qmf/ConsoleEvent.h>
#include <qmf/Data.h>
-#include <qpid/types/Variant.h>
-#include <libxml/parser.h>
-#include <string>
class Deployable;
@@ -40,15 +36,17 @@ private:
typedef uint32_t (Assembly::*fsm_state_fn)(void);
typedef void (Assembly::*fsm_action_fn)(void);
- std::string connectionOptions;
- qmf::ConsoleSession *session;
qmf::Data _mh_serv_class;
bool _mh_serv_class_found;
+
qmf::Data _mh_rsc_class;
bool _mh_rsc_class_found;
- qpid::messaging::Connection *connection;
- uint32_t hb_state;
- uint32_t state;
+
+ qmf::Data _mh_host_class;
+ bool _mh_host_class_found;
+
+ uint32_t _hb_state;
+ uint32_t _state;
static const uint32_t NUM_STATES = 3;
fsm_state_fn state_table[NUM_STATES];
fsm_action_fn state_action_table[NUM_STATES][NUM_STATES];
@@ -59,7 +57,7 @@ private:
std::string _name;
std::string _uuid;
std::string _ipaddr;
- int refcount;
+ int _refcount;
std::map<uint32_t, struct pe_operation*> _ops;
std::list<std::string> _dead_agents;
@@ -72,32 +70,31 @@ private:
void heartbeat_recv(uint32_t timestamp, uint32_t sequence);
void check_state(void);
- void matahari_discover(void);
- void resource_failed(struct pe_operation *op);
void deref(void);
+ void insert_op_history(xmlNode *rsc, struct operation_history *oh);
public:
static const uint32_t STATE_INIT = 0;
static const uint32_t STATE_OFFLINE = 1;
static const uint32_t STATE_ONLINE = 2;
+ std::map<std::string, struct operation_history*> op_history;
Assembly();
Assembly(Deployable *dep, std::string& name,
std::string& uuid, std::string& ipaddr);
~Assembly();
+ void matahari_discover(qmf::ConsoleSession *session);
void stop(void);
- uint32_t state_get(void) { return this->state; };
- std::string name_get(void) { return this->_name; };
+ uint32_t state_get(void) { return _state; };
+ const std::string& name_get(void) const { return _name; }
+ const std::string& uuid_get(void) const { return _uuid; }
void insert_status(xmlNode *status);
- void resource_execute(struct pe_operation *op);
- void _resource_execute(struct pe_operation *op);
+ void resource_execute(struct pe_operation *op, std::string method,
qpid::types::Variant::Map args);
struct pe_operation * op_remove_by_correlator(uint32_t correlator);
- gboolean process_qmf_events(void);
+ bool process_qmf_events(qmf::ConsoleEvent &event);
};
-
-
diff --git a/src/cf2pe.xsl b/src/cf2pe.xsl
new file mode 100644
index 0000000..581fab5
--- /dev/null
+++ b/src/cf2pe.xsl
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!-- Author: Angus Salkeld -->
+<!-- -->
+<!-- This is to convert a deployable configuration from Aeolus into the -->
+<!-- configuration needed by pacemaker -->
+<!-- -->
+<xsl:stylesheet version="1.0"
xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
+<xsl:output method="xml" indent="yes" encoding="utf-8"
omit-xml-declaration="no"/>
+<xsl:template match="/deployable">
+<cib admin_epoch="1" epoch="1" num_updates="1"
have-quorum="1" dc-uuid="0"
validate-with="pacemaker-1.2">
+ <configuration>
+ <crm_config>
+ <cluster_property_set id="bootstrap-options">
+ <!--
+ <nvpair id="opt-no-probes"
name="enable-startup-probes" value="false"/>
+ <nvpair id="opt-stop-rsc"
name="stop-all-resources" value="true"/>
+ <nvpair id="opt-stop-orphans" name="stop-orphan-resources"
value="true"/>
+ <nvpair id="opt-honey"
name="default-resource-stickiness" value="0"/>
+ <nvpair id="opt-cluster-delay" name="cluster-delay"
value="5s"/>
+
+ -->
+ <nvpair id="opt-startup-fencing" name="startup-fencing"
value="false"/>
+ <nvpair id="opt-health-strategy"
name="node-health-strategy" value="none"/>
+ <nvpair id="opt-no-start-failure"
name="start-failure-is-fatal" value="false"/>
+ <nvpair id="opt-not-symmetric"
name="symmetric-cluster" value="false"/>
+ <nvpair id="opt-stonith-disabled" name="stonith-enabled"
value="false"/>
+ <nvpair id="opt-no-quorum-policy"
name="no-quorum-policy" value="ignore"/>
+ </cluster_property_set>
+ </crm_config>
+ <rsc_defaults>
+ <meta_attributes id="opt1">
+ <nvpair id="rsc-default-2" name="is-managed-default"
value="true"/>
+ </meta_attributes>
+ <meta_attributes id="opt2">
+ <nvpair id="rsc-default-3" name="multiple-active"
value="stop_start"/>
+ </meta_attributes>
+ </rsc_defaults>
+ <nodes>
+<xsl:for-each select="assemblies/assembly">
+ <node>
+<xsl:attribute name="id"><xsl:value-of
select="@uuid"/></xsl:attribute>
+<xsl:attribute name="uname"><xsl:value-of
select="@name"/></xsl:attribute>
+<xsl:attribute name="type">normal</xsl:attribute>
+ </node>
+</xsl:for-each>
+ </nodes>
+ <resources>
+<xsl:for-each select="assemblies/assembly">
+<xsl:variable name="ass_name" select="@name"/>
+<xsl:for-each select="services/service">
+ <primitive>
+<xsl:attribute name="id">rsc_<xsl:value-of
select="$ass_name"/>_<xsl:value-of
select="@name"/></xsl:attribute>
+<xsl:attribute name="class">lsb</xsl:attribute>
+<xsl:attribute name="type"><xsl:value-of
select="@name"/></xsl:attribute>
+ <operations>
+ <op>
+<xsl:attribute name="id">monitor_<xsl:value-of
select="$ass_name"/>_<xsl:value-of
select="@name"/></xsl:attribute>
+<xsl:attribute name="name">monitor</xsl:attribute>
+<!--<xsl:attribute
name="requires">nothing</xsl:attribute>-->
+<!--<xsl:attribute name="on-fail">restart</xsl:attribute>-->
+<xsl:attribute name="interval"><xsl:value-of
select="@monitor_interval"/></xsl:attribute>
+ </op>
+ </operations>
+ </primitive>
+</xsl:for-each>
+</xsl:for-each>
+ </resources>
+ <constraints>
+<xsl:for-each select="assemblies/assembly">
+<xsl:variable name="ass_name" select="@name"/>
+<xsl:for-each select="services/service">
+ <rsc_location>
+<xsl:attribute name="id">loc_<xsl:value-of
select="$ass_name"/>_<xsl:value-of
select="@name"/></xsl:attribute>
+<xsl:attribute name="rsc">rsc_<xsl:value-of
select="$ass_name"/>_<xsl:value-of
select="@name"/></xsl:attribute>
+<xsl:attribute name="score">INFINITY</xsl:attribute>
+<xsl:attribute name="node"><xsl:value-of
select="$ass_name"/></xsl:attribute>
+ </rsc_location>
+</xsl:for-each>
+</xsl:for-each>
+ </constraints>
+ </configuration>
+</cib>
+</xsl:template>
+</xsl:stylesheet>
diff --git a/src/common_agent.cpp b/src/common_agent.cpp
index 6a16cfc..0a500bf 100644
--- a/src/common_agent.cpp
+++ b/src/common_agent.cpp
@@ -151,17 +151,19 @@ my_glib_handler(const gchar *log_domain, GLogLevelFlags flags, const
gchar *mess
qb_log_from_external_source(__FUNCTION__, __FILE__, "%s",
log_level, __LINE__,
- 1 << 1, message);
+ 2, message);
}
static const char *my_tags_stringify(uint32_t tags)
{
if (qb_bit_is_set(tags, QB_LOG_TAG_LIBQB_MSG_BIT)) {
- return "libqb ";
- } else if (qb_bit_is_set(tags, 0)) {
- return "qpid ";
- } else if (qb_bit_is_set(tags, 1)) {
- return "glib ";
+ return "QB ";
+ } else if (tags == 1) {
+ return "QPID ";
+ } else if (tags == 2) {
+ return "GLIB ";
+ } else if (tags == 3) {
+ return "PCMK ";
} else {
return "MAIN ";
}
@@ -198,7 +200,7 @@ CommonAgent::init(int argc, char **argv, const char *proc_name)
log_selector.enable(error);
log_selector.enable(warning);
log_selector.enable(info);
- log_selector.enable(debug);
+// log_selector.enable(debug);
l.select(log_selector);
out = new LibqbLogger(l);
diff --git a/src/deployable.cpp b/src/deployable.cpp
index 644a4ad..94806fc 100644
--- a/src/deployable.cpp
+++ b/src/deployable.cpp
@@ -18,14 +18,15 @@
* You should have received a copy of the GNU General Public License
* along with pacemaker-cloud. If not, see <
http://www.gnu.org/licenses/>.
*/
-#include <stdio.h>
-#include <stdlib.h>
-#include <assert.h>
-#include <string.h>
-#include <glib.h>
+#include "config.h"
+
+#include <qb/qblog.h>
+#include <uuid/uuid.h>
+
#include <libxml/parser.h>
+#include <libxslt/transform.h>
+
#include "pcmk_pe.h"
-#include <qb/qblog.h>
#include <string>
#include <map>
@@ -34,15 +35,99 @@
#include "config_loader.h"
#include "deployable.h"
#include "assembly.h"
+#include "resource.h"
using namespace std;
+bool
+Deployable::process_qmf_events(void)
+{
+ uint32_t rc = 0;
+ ConsoleEvent event;
+ Assembly *a;
+ map<string, Assembly*>::iterator iter;
+
+ for (iter = _assemblies.begin(); iter != _assemblies.end(); iter++) {
+ a = iter->second;
+ if (a->state_get() != Assembly::STATE_ONLINE) {
+ a->matahari_discover(session);
+ }
+ }
+ a = NULL;
+ while (session->nextEvent(event, qpid::messaging::Duration::IMMEDIATE)) {
+ a = _agents_ass[event.getAgent().getName()];
+ if (a) {
+ a->process_qmf_events(event);
+ }
+ if (event.getType() == CONSOLE_AGENT_DEL) {
+ _agents_ass.erase(event.getAgent().getName());
+ }
+ }
+ return true;
+}
+
+void
+Deployable::map_agents_ass(string &agent_name, Assembly *a)
+{
+ _agents_ass[agent_name] = a;
+}
+
+static void
+_poll_for_qmf_events(gpointer data)
+{
+ Deployable *d = (Deployable *)data;
+ qb_loop_timer_handle timer_handle;
+
+ if (d->process_qmf_events()) {
+ mainloop_timer_add(1000, data,
+ _poll_for_qmf_events,
+ &timer_handle);
+ }
+}
+
Deployable::Deployable(std::string& uuid, CommonAgent *agent) :
- _name(""), _uuid(uuid), _config(NULL), _pe(NULL),
- _status_changed(false), _agent(agent)
+ _name(""), _uuid(uuid), _crmd_uuid(""), _config(NULL), _pe(NULL),
+ _status_changed(false), _agent(agent), _file_count(0),
+ _resource_counter(0)
{
+ qb_loop_timer_handle timer_handle;
+ std::stringstream filter;
+ string url("localhost:49000");
+ uuid_t tmp_id;
+ char tmp_id_s[37];
+
xmlInitParser();
+
+ uuid_generate(tmp_id);
+ uuid_unparse(tmp_id, tmp_id_s);
+ _crmd_uuid.insert(0, (char*)tmp_id_s, sizeof(tmp_id_s));
+
reload();
+
+ connection = new qpid::messaging::Connection(url, "");
+ connection->open();
+
+ session = new ConsoleSession(*connection, "max-agent-age:1");
+
+ filter << "[and";
+ filter << ", [eq, _vendor, [quote, 'matahariproject.org']]";
+// filter << ", [eq, _product, [quote, 'service']]";
+// filter << ", [or";
+// for (iter = _assemblies.begin(); iter != _assemblies.end(); iter++) {
+// filter << ", [eq, hostname, [quote, '" <<
iter->second->name_get() << "']]";
+// }
+// filter << "]";
+ filter << "]";
+
+ session->setAgentFilter(filter.str());
+
+ session->open();
+
+ qb_log(LOG_INFO, "session open. Filter is: %s", filter.str().c_str());
+
+ mainloop_timer_add(1000, this,
+ _poll_for_qmf_events,
+ &timer_handle);
}
Deployable::~Deployable()
@@ -58,86 +143,68 @@ Deployable::~Deployable()
_assemblies.erase(kill);
a->stop();
}
+ session->close();
+ connection->close();
+
/* Shutdown libxml */
xmlCleanupParser();
}
-/*
- <nodes>
- <node id="node1" uname="node1" type="member"/>
- <node id="node2" uname="node2" type="member"/>
- </nodes>
- */
void
-Deployable::assemblies2nodes(xmlNode * pcmk_config, xmlNode * assemblies)
+Deployable::create_services(string& ass_name, xmlNode * services)
{
- string ass_name;
- string ass_uuid;
- string ass_ip;
xmlNode *cur_node = NULL;
- xmlNode *nodes = xmlNewChild(pcmk_config, NULL, BAD_CAST "nodes", NULL);
- xmlNode *node = NULL;
+ string name;
+ string type;
- for (cur_node = assemblies; cur_node; cur_node = cur_node->next) {
- if (cur_node->type == XML_ELEMENT_NODE) {
- qb_log(LOG_DEBUG, "node name: %s", cur_node->name);
- ass_name = (char*)xmlGetProp(cur_node, BAD_CAST "name");
- ass_uuid = ass_name/* FIXME (char*)xmlGetProp(cur_node, BAD_CAST "uuid")*/;
- ass_ip = (char*)xmlGetProp(cur_node, BAD_CAST "ipaddr");
- assert(ass_ip.length() > 0);
-
- node = xmlNewChild(nodes, NULL, BAD_CAST "node", NULL);
- xmlNewProp(node, BAD_CAST "id", BAD_CAST ass_name.c_str());
- xmlNewProp(node, BAD_CAST "uname", BAD_CAST ass_name.c_str());
- xmlNewProp(node, BAD_CAST "type", BAD_CAST "member");
-
- assembly_add(ass_name, ass_uuid, ass_ip);
+ for (cur_node = services; cur_node; cur_node = cur_node->next) {
+ if (cur_node->type != XML_ELEMENT_NODE) {
+ continue;
+ }
+ type = (char*)xmlGetProp(cur_node, BAD_CAST "name");
+ name = "rsc_";
+ name += ass_name;
+ name += "_";
+ name += type;
+ qb_log(LOG_DEBUG, "service name: %s", name.c_str());
+
+ if (_resources[name] == NULL) {
+ string cl = "lsb";
+ string pr = "pacemaker";
+ _resources[name] = new Resource(this, name, type,
+ cl, pr);
}
}
}
-/*
- <resources>
- <primitive id="rsc1" class="heartbeat"
type="apache"/>
- <primitive id="rsc2" class="heartbeat"
type="apache"/>
- </resources>
- */
void
-Deployable::services2resources(xmlNode * pcmk_config, xmlNode * services)
+Deployable::create_assemblies(xmlNode * assemblies)
{
+ string ass_name;
+ string ass_uuid;
+ string ass_ip;
xmlNode *cur_node = NULL;
- xmlNode *resource = NULL;
- xmlChar *name = NULL;
- xmlChar *ha = NULL;
- xmlChar res_id[128];
- xmlNode *resources;
- xmlNode *operations = NULL;
- xmlNode *op = NULL;
- xmlChar op_id[128];
+ xmlNode *child_node = NULL;
- resources = xmlNewChild(pcmk_config, NULL, BAD_CAST "resources", NULL);
-
- for (cur_node = services; cur_node; cur_node = cur_node->next) {
- if (cur_node->type == XML_ELEMENT_NODE) {
- name = xmlGetProp(cur_node, BAD_CAST "name");
- ha = xmlGetProp(cur_node, BAD_CAST "HA");
- snprintf((char*)res_id, 128, "res-%d", _resource_counter);
- _resource_counter++;
-
- resource = xmlNewChild(resources, NULL, BAD_CAST "primitive", NULL);
- xmlNewProp(resource, BAD_CAST "id", res_id);
- xmlNewProp(resource, BAD_CAST "class", BAD_CAST "lsb");
- xmlNewProp(resource, BAD_CAST "type", name);
- if (ha && strcmp((char*)ha, "True") == 0) {
- operations = xmlNewChild(resource, NULL, BAD_CAST "operations", NULL);
- op = xmlNewChild(operations, NULL, BAD_CAST "op", NULL);
- snprintf((char*)op_id, 128, "monitor-%s", (char*)res_id);
-
- xmlNewProp(op, BAD_CAST "id", op_id);
- xmlNewProp(op, BAD_CAST "name", BAD_CAST "monitor");
- xmlNewProp(op, BAD_CAST "interval", BAD_CAST "10s");
+ for (cur_node = assemblies; cur_node; cur_node = cur_node->next) {
+ if (cur_node->type != XML_ELEMENT_NODE) {
+ continue;
+ }
+ ass_name = (char*)xmlGetProp(cur_node, BAD_CAST "name");
+ ass_uuid = ass_name/* FIXME (char*)xmlGetProp(cur_node, BAD_CAST "uuid")*/;
+ ass_ip = (char*)xmlGetProp(cur_node, BAD_CAST "ipaddr");
+ assert(ass_ip.length() > 0);
+ qb_log(LOG_DEBUG, "node name: %s", ass_name.c_str());
+
+ for (child_node = cur_node->children; child_node; child_node = child_node->next)
{
+ if (child_node->type != XML_ELEMENT_NODE) {
+ continue;
+ }
+ if (strcmp((char*)child_node->name, "services") == 0) {
+ create_services(ass_name, child_node->children);
}
}
+ assembly_add(ass_name, ass_uuid, ass_ip);
}
}
@@ -147,13 +214,10 @@ Deployable::reload(void)
int32_t rc;
xmlNode *cur_node = NULL;
xmlNode *dep_node = NULL;
- xmlNode *cib = NULL;
- xmlNode *nvpair = NULL;
- xmlNode *configuration = NULL;
- xmlNode *crm_config = NULL;
- xmlNode *cluster_property = NULL;
-
+ xsltStylesheetPtr ss = NULL;
+ const char *params[1];
::qpid::sys::Mutex::ScopedLock _lock(xml_lock);
+
if (_config != NULL) {
xmlFreeDoc(_config);
_config = NULL;
@@ -173,73 +237,79 @@ Deployable::reload(void)
}
_resource_counter = 1;
- _pe = xmlNewDoc(BAD_CAST "1.0");
-
- /* header gumf */
- cib = xmlNewNode(NULL, BAD_CAST "cib");
- xmlDocSetRootElement(_pe, cib);
- xmlNewProp(cib, BAD_CAST "admin_epoch", BAD_CAST "1");
- xmlNewProp(cib, BAD_CAST "epoch", BAD_CAST "1");
- xmlNewProp(cib, BAD_CAST "num_updates", BAD_CAST "1");
- xmlNewProp(cib, BAD_CAST "have-quorum", BAD_CAST "false");
- xmlNewProp(cib, BAD_CAST "dc-uuid", BAD_CAST "0");
- xmlNewProp(cib, BAD_CAST "remote-tls-port", BAD_CAST "0");
- xmlNewProp(cib, BAD_CAST "validate-with", BAD_CAST
"pacemaker-1.0");
-
- configuration = xmlNewChild(cib, NULL, BAD_CAST "configuration", NULL);
- crm_config = xmlNewChild(configuration, NULL, BAD_CAST "crm_config", NULL);
-
- cluster_property = xmlNewChild(crm_config, NULL, BAD_CAST
"cluster_property_set", NULL);
- xmlNewProp(cluster_property, BAD_CAST "id", BAD_CAST "no-stonith");
- nvpair = xmlNewChild(cluster_property, NULL, BAD_CAST "nvpair", NULL);
- xmlNewProp(nvpair, BAD_CAST "id", BAD_CAST "opt-no-stonith");
- xmlNewProp(nvpair, BAD_CAST "name", BAD_CAST "stonith-enabled");
- xmlNewProp(nvpair, BAD_CAST "value", BAD_CAST "false");
-
- cluster_property = xmlNewChild(crm_config, NULL, BAD_CAST
"cluster_property_set", NULL);
- xmlNewProp(cluster_property, BAD_CAST "id", BAD_CAST
"bootstrap-options");
- nvpair = xmlNewChild(cluster_property, NULL, BAD_CAST "nvpair", NULL);
- xmlNewProp(nvpair, BAD_CAST "id", BAD_CAST "opt-not-symetric");
- xmlNewProp(nvpair, BAD_CAST "name", BAD_CAST "symetric_cluster");
- xmlNewProp(nvpair, BAD_CAST "value", BAD_CAST "false");
-
- nvpair = xmlNewChild(cluster_property, NULL, BAD_CAST "nvpair", NULL);
- xmlNewProp(nvpair, BAD_CAST "id", BAD_CAST "opt-no-quorum-policy");
- xmlNewProp(nvpair, BAD_CAST "name", BAD_CAST "no-quorum-policy");
- xmlNewProp(nvpair, BAD_CAST "value", BAD_CAST "ignore");
+ ss = xsltParseStylesheetFile(BAD_CAST
"/usr/share/pacemaker-cloud/cf2pe.xsl");
+ params[0] = NULL;
+ _pe = xsltApplyStylesheet(ss, _config, params);
dep_node = xmlDocGetRootElement(_config);
for (cur_node = dep_node->children; cur_node;
cur_node = cur_node->next) {
if (cur_node->type == XML_ELEMENT_NODE) {
- qb_log(LOG_DEBUG, "hostname: %s", cur_node->name);
if (strcmp((char*)cur_node->name, "assemblies") == 0) {
- assemblies2nodes(configuration, cur_node->children);
- } else if (strcmp((char*)cur_node->name, "services") == 0) {
- services2resources(configuration, cur_node->children);
+ create_assemblies(cur_node->children);
}
}
}
- crm_config = xmlNewChild(configuration, NULL, BAD_CAST "constraints", NULL);
+
+ xsltFreeStylesheet(ss);
}
static void
_status_timeout(void *data)
{
Deployable *d = (Deployable *)data;
- d->process();
+
+ if (pe_is_busy_processing()) {
+ // try later
+ qb_log(LOG_INFO, "pe_is_busy_processing: trying later");
+ d->schedule_processing();
+ } else {
+ d->process();
+ }
}
static void
-resource_execute(struct pe_operation *op)
+resource_execute_cb(struct pe_operation *op)
{
- Assembly *a;
Deployable *d = (Deployable *)op->user_data;
- string name = op->hostname;
+ d->resource_execute(op);
+}
- a = d->assembly_get(name);
- assert(a != NULL);
- a->resource_execute(op);
+void
+Deployable::resource_execute(struct pe_operation *op)
+{
+ Resource *r = resource_get(op);
+ assert(r != NULL);
+
+ if (op->interval > 0 && strcmp(op->method, "monitor") == 0)
{
+ r->start_recurring(op);
+ } else if (strcmp(op->method, "delete") == 0) {
+ r->delete_op_history(op);
+ } else if (strcmp(op->method, "stop") == 0) {
+ r->stop(op);
+ } else {
+ r->execute(op);
+ }
+}
+
+static void
+transition_completed_cb(void* user_data, int32_t result)
+{
+ Deployable *d = (Deployable *)user_data;
+ d->transition_completed(result);
+}
+
+void
+Deployable::transition_completed(int32_t result)
+{
+ qb_log(LOG_INFO, "-- transition_completed -- %d", result);
+}
+
+Resource*
+Deployable::resource_get(struct pe_operation *op)
+{
+ string name = op->rname;
+ return _resources[name];
}
Assembly*
@@ -253,18 +323,33 @@ Deployable::assembly_get(std::string& hostname)
void
Deployable::process(void)
{
- map<string, Assembly*>::iterator iter;
xmlNode * cur_node = NULL;
xmlNode * pe_root = NULL;
xmlNode * status = NULL;
+ xmlNode * rscs = NULL;
+ int32_t rc = 0;
Assembly *a;
+ Resource *r;
::qpid::sys::Mutex::ScopedLock _lock(xml_lock);
if (_pe == NULL) {
return;
}
+ _status_changed = true;
+
+ if (_dc_uuid.length() == 0 ||
+ _assemblies[_dc_uuid]->state_get() != Assembly::STATE_ONLINE) {
+ for (map<string, Assembly*>::iterator a_iter = _assemblies.begin();
+ a_iter != _assemblies.end(); a_iter++) {
+ a = a_iter->second;
+ if (a->state_get() == Assembly::STATE_ONLINE) {
+ _dc_uuid = a->uuid_get();
+ }
+ }
+ }
pe_root = xmlDocGetRootElement(_pe);
+ xmlSetProp(pe_root, BAD_CAST "dc-uuid", BAD_CAST dc_uuid_get().c_str());
for (cur_node = pe_root->children; cur_node; cur_node = cur_node->next) {
if (cur_node->type == XML_ELEMENT_NODE &&
strcmp((char*)cur_node->name, "status") == 0) {
@@ -278,52 +363,71 @@ Deployable::process(void)
}
status = xmlNewChild(pe_root, NULL, BAD_CAST "status", NULL);
- for (iter = _assemblies.begin(); iter != _assemblies.end(); iter++) {
- a = iter->second;
+ for (map<string, Assembly*>::iterator a_iter = _assemblies.begin();
+ a_iter != _assemblies.end(); a_iter++) {
+ a = a_iter->second;
a->insert_status(status);
}
- pe_process_state(pe_root, resource_execute, this);
+ stringstream nf;
+ nf << "pe-out-" << _file_count << ".xml";
+ _file_count++;
+ qb_log(LOG_INFO, "processing new state with %s", nf.str().c_str());
+ xmlSaveFormatFileEnc(nf.str().c_str(), _pe, "UTF-8", 1);
+
+ rc = pe_process_state(pe_root, resource_execute_cb,
+ transition_completed_cb, this);
_status_changed = false;
+ if (rc != 0) {
+ schedule_processing();
+ }
}
void
-Deployable::service_state_changed(Assembly *a, string& service_name,
- string state, string reason)
+Deployable::schedule_processing(void)
+{
+ if (_status_changed) {
+ qb_log(LOG_INFO, "not scheduling - collecting status");
+ // busy collecting status
+ return;
+ }
+
+ if (mainloop_timer_is_running(_processing_timer)) {
+ qb_log(LOG_INFO, "not scheduling - already scheduled");
+ } else {
+ mainloop_timer_add(1000, this,
+ _status_timeout,
+ &_processing_timer);
+ }
+}
+
+void
+Deployable::service_state_changed(const string& ass_name, string& service_name,
+ string &state, string &reason)
{
qmf::Data event = qmf::Data(_agent->package.event_service_state_change);
+
event.setProperty("deployable", _uuid);
- event.setProperty("assembly", a->name_get());
+ event.setProperty("assembly", ass_name);
event.setProperty("service", service_name);
event.setProperty("state", state);
event.setProperty("reason", reason);
_agent->agent_session.raiseEvent(event);
-
- if (!_status_changed) {
- _status_changed = true;
- mainloop_job_add(QB_LOOP_LOW,
- this,
- _status_timeout);
- }
}
void
Deployable::assembly_state_changed(Assembly *a, string state, string reason)
{
+ qb_loop_timer_handle th;
qmf::Data event = qmf::Data(_agent->package.event_assembly_state_change);
+
event.setProperty("deployable", _uuid);
event.setProperty("assembly", a->name_get());
event.setProperty("state", state);
event.setProperty("reason", reason);
_agent->agent_session.raiseEvent(event);
-
- if (!_status_changed) {
- _status_changed = true;
- mainloop_job_add(QB_LOOP_LOW,
- this,
- _status_timeout);
- }
+ schedule_processing();
}
int32_t
diff --git a/src/deployable.h b/src/deployable.h
index c70324f..b8ecc09 100644
--- a/src/deployable.h
+++ b/src/deployable.h
@@ -21,31 +21,40 @@
#include <string>
#include <map>
-#include <libxml/parser.h>
-#include <libxml/tree.h>
-#include <libxml/xpath.h>
-#include <libxml/xpathInternals.h>
-
-#include <qpid/sys/Mutex.h>
+#include <qmf/ConsoleSession.h>
+#include "pcmk_pe.h"
#include "common_agent.h"
class Assembly;
+class Resource;
class Deployable {
private:
+ CommonAgent *_agent;
+ qpid::messaging::Connection *connection;
+ qmf::ConsoleSession *session;
+
std::string _name;
std::string _uuid;
+ std::string _dc_uuid;
+ std::string _crmd_uuid;
+ int _file_count;
+
std::map<std::string, Assembly*> _assemblies;
+ std::map<std::string, Resource*> _resources;
+ std::map<std::string, Assembly*> _agents_ass;
+
xmlDocPtr _config;
xmlDocPtr _pe;
qpid::sys::Mutex xml_lock;
+
int _resource_counter;
bool _status_changed;
- CommonAgent *_agent;
+ qb_loop_timer_handle _processing_timer;
- void services2resources(xmlNode * pcmk_config, xmlNode * services);
- void assemblies2nodes(xmlNode * pcmk_config, xmlNode * nodes);
+ void create_assemblies(xmlNode * nodes);
+ void create_services(std::string& ass_name, xmlNode * services);
int32_t assembly_add(std::string& name,
std::string& uuid,
@@ -54,22 +63,29 @@ private:
std::string& uuid);
public:
-
Deployable();
Deployable(std::string& uuid, CommonAgent *agent);
~Deployable();
- const std::string& get_name() const { return _name; }
- const std::string& get_uuid() const { return _uuid; }
+
+ const std::string& name_get() const { return _name; }
+ const std::string& uuid_get() const { return _uuid; }
+ const std::string& dc_uuid_get() const { return _dc_uuid; }
+ const std::string& crmd_uuid_get() const { return _crmd_uuid; }
+ Assembly* assembly_get(std::string& hostname);
+ Resource* resource_get(struct pe_operation * op);
void reload(void);
void process(void);
- void service_state_changed(Assembly *a, std::string& service_name,
- std::string state, std::string reason);
+ void schedule_processing(void);
+ void resource_execute(struct pe_operation *op);
+ void transition_completed(int32_t result);
+
+ void service_state_changed(const std::string& ass_name, std::string&
service_name,
+ std::string &state, std::string &reason);
void assembly_state_changed(Assembly *a, std::string state,
std::string reason);
- Assembly* assembly_get(std::string& hostname);
+ bool process_qmf_events(void);
+ void map_agents_ass(string &agent_name, Assembly *a);
};
-
-
diff --git a/src/dpe_agent.cpp b/src/dpe_agent.cpp
index 9eaad74..0cde625 100644
--- a/src/dpe_agent.cpp
+++ b/src/dpe_agent.cpp
@@ -88,12 +88,12 @@ DpeAgent::event_dispatch(AgentEvent *event)
for (map<string, Deployable*>::iterator iter = deployments.begin();
iter != deployments.end(); iter++) {
- d_list.push_back(iter->second->get_uuid());
+ d_list.push_back(iter->second->uuid_get());
cout << "listing(active) " << iter->first <<
- iter->second->get_name() <<
+ iter->second->name_get() <<
", uuid " <<
- iter->second->get_uuid() << endl;
+ iter->second->uuid_get() << endl;
}
}
if (rc == 0) {
diff --git a/src/mainloop.cpp b/src/mainloop.cpp
index 6cb60db..a91f22e 100644
--- a/src/mainloop.cpp
+++ b/src/mainloop.cpp
@@ -85,10 +85,11 @@ mainloop_job_add(enum qb_loop_priority p,
}
-int32_t mainloop_timer_add(uint32_t msec_duration,
- void *data,
- qb_loop_timer_dispatch_fn dispatch_fn,
- qb_loop_timer_handle * timer_handle_out)
+int32_t
+mainloop_timer_add(uint32_t msec_duration,
+ void *data,
+ qb_loop_timer_dispatch_fn dispatch_fn,
+ qb_loop_timer_handle * timer_handle_out)
{
return qb_loop_timer_add(default_loop, QB_LOOP_MED,
msec_duration * QB_TIME_NS_IN_MSEC,
@@ -97,3 +98,16 @@ int32_t mainloop_timer_add(uint32_t msec_duration,
timer_handle_out);
}
+int32_t
+mainloop_timer_del(qb_loop_timer_handle th)
+{
+ return qb_loop_timer_del(default_loop, th);
+}
+
+
+bool
+mainloop_timer_is_running(qb_loop_timer_handle timer_handle)
+{
+ return (qb_loop_timer_expire_time_get(default_loop, timer_handle) > 0);
+}
+
diff --git a/src/mainloop.h b/src/mainloop.h
index a1fd506..1c70897 100644
--- a/src/mainloop.h
+++ b/src/mainloop.h
@@ -18,6 +18,7 @@
#ifndef __MH_MAINLOOP__
#define __MH_MAINLOOP__
+#include <stdbool.h>
#include <qb/qbloop.h>
#ifdef __cplusplus
@@ -52,6 +53,10 @@ int32_t mainloop_timer_add(uint32_t msec_duration,
qb_loop_timer_dispatch_fn dispatch_fn,
qb_loop_timer_handle * timer_handle_out);
+int32_t mainloop_timer_del(qb_loop_timer_handle th);
+
+bool mainloop_timer_is_running(qb_loop_timer_handle timer_handle);
+
#ifdef __cplusplus
}
#endif /* __cplusplus */
diff --git a/src/pcmk_pe.c b/src/pcmk_pe.c
index faf1aa5..1ae0817 100644
--- a/src/pcmk_pe.c
+++ b/src/pcmk_pe.c
@@ -59,8 +59,10 @@ extern xmlNode * do_calculations(pe_working_set_t *data_set,
extern void cleanup_alloc_calculations(pe_working_set_t *data_set);
static pe_resource_execute_t run_fn = NULL;
+static pe_transition_completed_t completed_fn = NULL;
static void * run_user_data = NULL;
static pe_working_set_t *working_set = NULL;
+static int graph_updated = FALSE;
enum ocf_exitcode pe_resource_ocf_exitcode_get(struct pe_operation *op, int
lsb_exitcode)
{
@@ -85,8 +87,6 @@ enum ocf_exitcode pe_resource_ocf_exitcode_get(struct pe_operation *op,
int lsb_
return (enum ocf_exitcode)lsb_exitcode;
}
-static int graph_updated = FALSE;
-
void pe_resource_ref(struct pe_operation *op)
{
op->refcount++;
@@ -119,12 +119,7 @@ pe_resource_completed(struct pe_operation *op, uint32_t return_code)
return;
}
- if (return_code == op->target_outcome) {
- qb_log(LOG_DEBUG, "rsc %s succeeded %d == %d", op->method,
- return_code, op->target_outcome);
- } else {
- qb_log(LOG_ERR, "rsc %s failed %d != %d", op->method,
- return_code, op->target_outcome);
+ if (return_code != op->target_outcome) {
action->failed = TRUE;
graph->abort_priority = INFINITY;
}
@@ -134,13 +129,10 @@ pe_resource_completed(struct pe_operation *op, uint32_t
return_code)
}
-static gboolean
-exec_pseudo_action(crm_graph_t *graph, crm_action_t *action)
+static void
+dup_attr(gpointer key, gpointer value, gpointer user_data)
{
- action->confirmed = TRUE;
- update_graph(graph, action);
- graph_updated = TRUE;
- return TRUE;
+ g_hash_table_replace(user_data, crm_strdup(key), crm_strdup(value));
}
static gboolean
@@ -152,11 +144,12 @@ exec_rsc_action(crm_graph_t *graph, crm_action_t *action)
xmlNode *action_rsc = first_named_child(action->xml, XML_CIB_TAG_RESOURCE);
char *node = crm_element_value_copy(action->xml, XML_LRM_ATTR_TARGET);
const char *tmp_provider;
+ xmlNode *params_all;
qb_enter();
if (safe_str_eq(crm_element_value(action->xml, "operation"),
"probe_complete")) {
- qb_log(LOG_DEBUG, "Skipping %s op for %s\n",
+ qb_log(LOG_INFO, "Skipping rsc %s op for %s\n",
crm_element_value(action->xml, "operation"), node);
crm_free(node);
action->confirmed = TRUE;
@@ -188,55 +181,76 @@ exec_rsc_action(crm_graph_t *graph, crm_action_t *action)
}
op = convert_graph_action(NULL, action, 0, pe_op->target_outcome);
+ params_all = create_xml_node(NULL, XML_TAG_PARAMS);
+ g_hash_table_foreach(op->params, hash2field, params_all);
+/*
+ * TODO at some point.
+ g_hash_table_foreach(action->extra, hash2field, params_all);
+ g_hash_table_foreach(rsc->parameters, hash2field, params_all);
+ g_hash_table_foreach(action->meta, hash2metafield, params_all);
+*/
+ filter_action_parameters(params_all, PE_CRM_VERSION);
+ pe_op->op_digest = calculate_operation_digest(params_all, PE_CRM_VERSION);
+
pe_op->method = strdup(op->op_type);
- pe_op->params = op->params; /* TODO copy params */
+
+ pe_op->params = g_hash_table_new_full(crm_str_hash, g_str_equal,
+ g_hash_destroy_str, g_hash_destroy_str);
+
+ if (op->params != NULL) {
+ g_hash_table_foreach(op->params, dup_attr, pe_op->params);
+ }
+
pe_op->timeout = op->timeout;
pe_op->interval = op->interval;
pe_op->action = action;
pe_op->graph = graph;
+ pe_op->action_id = action->id;
+ pe_op->graph_id = graph->id;
free_lrm_op(op);
+ free_xml(params_all);
run_fn(pe_op);
return TRUE;
}
static gboolean
+exec_pseudo_action(crm_graph_t *graph, crm_action_t *action)
+{
+ qb_log(LOG_INFO, "Skipping pseudo %s op \n",
+ crm_element_value(action->xml, "operation"));
+
+ action->confirmed = TRUE;
+ update_graph(graph, action);
+ graph_updated = TRUE;
+ return TRUE;
+}
+
+static gboolean
exec_crmd_action(crm_graph_t *graph, crm_action_t *action)
{
+ qb_log(LOG_INFO, "Skipping crmd %s op \n",
+ crm_element_value(action->xml, "operation"));
+
action->confirmed = TRUE;
update_graph(graph, action);
graph_updated = TRUE;
return TRUE;
}
-#define STATUS_PATH_MAX 512
static gboolean
exec_stonith_action(crm_graph_t *graph, crm_action_t *action)
{
-#if 0
- int rc = 0;
- char xpath[STATUS_PATH_MAX];
char *target = crm_element_value_copy(action->xml, XML_LRM_ATTR_TARGET);
- xmlNode *cib_node = modify_node(global_cib, target, FALSE);
- crm_xml_add(cib_node, XML_ATTR_ORIGIN, __FUNCTION__);
- CRM_ASSERT(cib_node != NULL);
-
- quiet_log(" * Fencing %s\n", target);
- rc = global_cib->cmds->replace(global_cib, XML_CIB_TAG_STATUS, cib_node,
cib_sync_call|cib_scope_local);
- CRM_ASSERT(rc == cib_ok);
- snprintf(xpath, STATUS_PATH_MAX, "//node_state[@uname='%s']/%s",
target, XML_CIB_TAG_LRM);
- rc = global_cib->cmds->delete(global_cib, xpath, NULL,
cib_xpath|cib_sync_call|cib_scope_local);
+ qb_log(LOG_WARNING, "Skipping STONITH %s op (not fencing %s)\n",
+ crm_element_value(action->xml, "operation"), target);
+ crm_free(target);
- snprintf(xpath, STATUS_PATH_MAX, "//node_state[@uname='%s']/%s",
target, XML_TAG_TRANSIENT_NODEATTRS);
- rc = global_cib->cmds->delete(global_cib, xpath, NULL,
cib_xpath|cib_sync_call|cib_scope_local);
-#endif
action->confirmed = TRUE;
update_graph(graph, action);
graph_updated = TRUE;
- // free_xml(cib_node);
- // crm_free(target);
return TRUE;
}
@@ -253,11 +267,12 @@ process_next_job(void* data)
{
crm_graph_t *transition = (crm_graph_t *)data;
enum transition_status graph_rc;
+ qb_loop_timer_handle th;
if (!graph_updated) {
- mainloop_job_add(QB_LOOP_LOW,
- transition,
- process_next_job);
+ mainloop_timer_add(1000,
+ transition,
+ process_next_job, &th);
return;
}
qb_enter();
@@ -265,20 +280,20 @@ process_next_job(void* data)
graph_updated = FALSE;
graph_rc = run_graph(transition);
+ qb_log(LOG_INFO, "run_graph returned: %s", transition_status(graph_rc));
+
if (graph_rc == transition_active || graph_rc == transition_pending) {
- mainloop_job_add(QB_LOOP_LOW,
- transition,
- process_next_job);
+ mainloop_timer_add(1000,
+ transition,
+ process_next_job, &th);
return;
}
-
if (graph_rc == transition_complete) {
- qb_log(LOG_DEBUG, "Transition Completed");
+ qb_log(LOG_INFO, "Transition Completed");
} else {
qb_log(LOG_ERR, "Transition failed: %s",
transition_status(graph_rc));
- print_graph(LOG_ERR, transition);
}
destroy_graph(transition);
@@ -288,11 +303,40 @@ process_next_job(void* data)
free(working_set);
working_set = NULL;
+ completed_fn(run_user_data, graph_rc);
+
return;
}
int32_t
-pe_process_state(xmlNode *xml_input, pe_resource_execute_t fn,
+pe_is_busy_processing(void)
+{
+ if (working_set != NULL) {
+ return TRUE;
+ }
+ return FALSE;
+}
+
+
+void
+cl_log(int priority, const char * fmt, ...)
+{
+ va_list ap;
+ char buf[512];
+
+ buf[512-1] = '\0';
+ va_start(ap, fmt);
+ (void)vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+
+ qb_log_from_external_source(__func__, __FILE__,
+ "%s", priority, __LINE__, 3, buf);
+}
+
+int32_t
+pe_process_state(xmlNode *xml_input,
+ pe_resource_execute_t exec_fn,
+ pe_transition_completed_t done_fn,
void *user_data)
{
crm_graph_t *transition = NULL;
@@ -301,10 +345,17 @@ pe_process_state(xmlNode *xml_input, pe_resource_execute_t fn,
qb_log(LOG_ERR, "Transition already in progress");
return -EEXIST;
}
+
+ set_crm_log_level(LOG_INFO);
+ //set_crm_log_level(12);
+
+ assert(validate_xml(xml_input, NULL, FALSE) == TRUE);
+
qb_log(LOG_INFO, "Executing deployable transition");
working_set = calloc(1, sizeof(pe_working_set_t));
- run_fn = fn;
+ run_fn = exec_fn;
+ completed_fn = done_fn;
run_user_data = user_data;
set_graph_functions(&graph_exec_fns);
@@ -313,8 +364,8 @@ pe_process_state(xmlNode *xml_input, pe_resource_execute_t fn,
/* calculate output */
do_calculations(working_set, xml_input, NULL);
- transition = unpack_graph(working_set->graph, crm_system_name);
- //print_graph(LOG_DEBUG, transition);
+ transition = unpack_graph(working_set->graph, __func__);
+ //print_graph(LOG_INFO, transition);
graph_updated = TRUE;
mainloop_job_add(QB_LOOP_HIGH,
diff --git a/src/pcmk_pe.h b/src/pcmk_pe.h
index dd7bc63..e2dd3d1 100644
--- a/src/pcmk_pe.h
+++ b/src/pcmk_pe.h
@@ -29,6 +29,7 @@ extern "C" {
#include <glib.h>
#include <libxml/parser.h>
+#define PE_CRM_VERSION "3.0.5"
enum ocf_exitcode {
OCF_PENDING = -1,
@@ -52,16 +53,22 @@ struct pe_operation {
char *rprovider;
char *rtype;
GHashTable *params;
+ char *op_digest;
uint32_t timeout;
+ uint32_t times_executed;
uint32_t interval;
uint32_t target_outcome;
void *user_data;
void *graph;
void *action;
+ void *resource;
+ uint32_t graph_id;
+ uint32_t action_id;
uint32_t refcount;
};
typedef void (*pe_resource_execute_t)(struct pe_operation *op);
+typedef void (*pe_transition_completed_t)(void* user_data, int32_t result);
enum ocf_exitcode pe_resource_ocf_exitcode_get(struct pe_operation *op,
int lsb_exitcode);
@@ -69,9 +76,13 @@ void pe_resource_completed(struct pe_operation *op, uint32_t
return_code);
void pe_resource_ref(struct pe_operation *op);
void pe_resource_unref(struct pe_operation *op);
-int32_t pe_process_state(xmlNode *xml_input, pe_resource_execute_t fn,
+int32_t pe_process_state(xmlNode *xml_input,
+ pe_resource_execute_t exec_fn,
+ pe_transition_completed_t done_fn,
void *user_data);
+int32_t pe_is_busy_processing(void);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/resource.cpp b/src/resource.cpp
new file mode 100644
index 0000000..67ba43d
--- /dev/null
+++ b/src/resource.cpp
@@ -0,0 +1,301 @@
+/*
+ * Copyright (C) 2011 Red Hat, Inc.
+ *
+ * Authors: Angus Salkeld <asalkeld(a)redhat.com>
+ *
+ * This file is part of pacemaker-cloud.
+ *
+ * pacemaker-cloud is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * pacemaker-cloud is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with pacemaker-cloud. If not, see <
http://www.gnu.org/licenses/>.
+ */
+#include "config.h"
+
+#include <qb/qblog.h>
+
+#include <iostream>
+#include <sstream>
+#include <map>
+
+#include "pcmk_pe.h"
+#include "mainloop.h"
+#include "resource.h"
+#include "assembly.h"
+#include "deployable.h"
+
+static uint32_t call_order = 0;
+
+using namespace std;
+using namespace qmf;
+
+static void
+resource_interval_timeout(gpointer data)
+{
+ struct pe_operation *op = (struct pe_operation *)data;
+ Resource *rsc = (Resource *)op->resource;
+
+ if (op->refcount == 1) {
+ // we are the last ref holder
+ pe_resource_unref(op);
+ return;
+ }
+
+ rsc->__execute(op);
+
+ mainloop_timer_add(op->interval, data,
+ resource_interval_timeout,
+ &rsc->_monitor_timer);
+}
+
+void
+Resource::failed(struct pe_operation *op)
+{
+ string ass_name = op->hostname;
+ string rname = op->rtype;
+ string running = "failed";
+ string reason = "monitor failed";
+ qb_log(LOG_NOTICE, "resource %s FAILED", _id.c_str());
+
+ _dep->service_state_changed(ass_name, rname, running, reason);
+ _dep->schedule_processing();
+}
+
+void
+Resource::execute(struct pe_operation *op)
+{
+ op->resource = this;
+ __execute(op);
+}
+
+void
+Resource::stop(struct pe_operation *op)
+{
+ if (mainloop_timer_is_running(_monitor_timer)) {
+ pe_resource_unref(_monitor_op);
+ mainloop_timer_del(_monitor_timer);
+ }
+ op->resource = this;
+ __execute(op);
+}
+
+void
+Resource::start_recurring(struct pe_operation *op)
+{
+ qb_enter();
+
+ op->resource = this;
+ if (!mainloop_timer_is_running(_monitor_timer)) {
+ __execute(op);
+
+ op->user_data = this;
+ _monitor_op = op;
+ pe_resource_ref(op);
+ mainloop_timer_add(op->interval,
+ op, resource_interval_timeout,
+ &_monitor_timer);
+ } else {
+ pe_resource_completed(op, OCF_OK);
+ pe_resource_unref(op); // delete
+ }
+}
+
+void
+Resource::__execute(struct pe_operation *op)
+{
+ Agent a;
+ qpid::types::Variant::Map in_args;
+ qpid::types::Variant::Map in_params;
+ const char *rmethod = op->method;
+ string name = op->hostname;
+ Assembly *ass = _dep->assembly_get(name);
+ assert(ass != NULL);
+
+ qb_enter();
+
+ if (ass->state_get() != Assembly::STATE_ONLINE) {
+ qb_log(LOG_DEBUG, "can't execute resource in offline state");
+ completed(op, OCF_UNKNOWN_ERROR);
+ return;
+ }
+ if (strstr(_id.c_str(), op->hostname) == NULL) {
+ if (strcmp(op->method, "monitor") == 0) {
+ completed(op, OCF_NOT_RUNNING);
+ } else {
+ completed(op, OCF_UNKNOWN_ERROR);
+ }
+ qb_leave();
+ return;
+ }
+ if (strcmp(op->method, "monitor") == 0) {
+ in_args["interval"] = 0;
+ }
+ if (op->timeout < 20000) {
+ in_args["timeout"] = 20000;
+ } else {
+ in_args["timeout"] = op->timeout;
+ }
+
+ if (strcmp(op->rclass, "lsb") == 0) {
+ if (strcmp(op->method, "monitor") == 0) {
+ rmethod = "status";
+ }
+ in_args["name"] = op->rtype;
+ pe_resource_ref(op);
+ ass->resource_execute(op, rmethod, in_args);
+ } else {
+ // make a non-empty parameters map
+ in_params["qmf"] = "frustrating";
+
+ in_args["name"] = op->rname;
+ in_args["class"] = op->rclass;
+ in_args["provider"] = op->rprovider;
+ in_args["type"] = op->rtype;
+ in_args["parameters"] = in_params;
+ pe_resource_ref(op);
+ ass->resource_execute(op, rmethod, in_args);
+ }
+ qb_leave();
+}
+
+xmlNode *
+Resource::insert_status(xmlNode *rscs)
+{
+ xmlNode *rsc = xmlNewChild(rscs, NULL, BAD_CAST "lrm_resource", NULL);
+ xmlNewProp(rsc, BAD_CAST "id", BAD_CAST _id.c_str());
+ xmlNewProp(rsc, BAD_CAST "type", BAD_CAST _type.c_str());
+ xmlNewProp(rsc, BAD_CAST "class", BAD_CAST _class.c_str());
+ if (strcmp(_class.c_str(), "ocf") == 0) {
+ xmlNewProp(rsc, BAD_CAST "provider", BAD_CAST _provider.c_str());
+ }
+ return rsc;
+}
+
+void
+Resource::save(struct pe_operation *op, enum ocf_exitcode ec)
+{
+ struct operation_history *oh;
+ stringstream id;
+ string node_name = op->hostname;
+ Assembly* a = _dep->assembly_get(node_name);
+
+ if (strstr(op->rname, op->hostname) == NULL) {
+ return;
+ }
+
+ id << op->rname << "_" << op->method <<
"_" << op->interval;
+
+ oh = a->op_history[id.str()];
+ if (oh == NULL) {
+ oh = (struct operation_history *)calloc(1, sizeof(struct operation_history));
+ oh->resource = this;
+ oh->rsc_id = new string(id.str());
+ oh->operation = new string(op->method);
+ oh->target_outcome = op->target_outcome;
+ oh->interval = op->interval;
+ oh->rc = OCF_PENDING;
+ oh->op_digest = op->op_digest;
+
+ a->op_history[id.str()] = oh;
+ } else if (strcmp(oh->op_digest, op->op_digest) != 0) {
+ free(oh->op_digest);
+ oh->op_digest = op->op_digest;
+ }
+ if (oh->rc != ec) {
+ oh->last_rc_change = time(NULL);
+ oh->rc = ec;
+ }
+
+ oh->last_run = time(NULL);
+ oh->call_id = call_order++;
+ oh->graph_id = op->graph_id;
+ oh->action_id = op->action_id;
+}
+
+void
+Resource::delete_op_history(struct pe_operation *op)
+{
+ string node_name = op->hostname;
+ Assembly* a = _dep->assembly_get(node_name);
+
+ /* delete the op history
+ */
+ for (map<string, struct operation_history*>::iterator iter =
a->op_history.begin();
+ iter != a->op_history.end(); iter++) {
+ struct operation_history *oh = iter->second;
+ if (this == oh->resource) {
+ a->op_history.erase(iter);
+ }
+ }
+ /* stop the recurring monitor.
+ */
+ if (mainloop_timer_is_running(_monitor_timer)) {
+ pe_resource_unref(_monitor_op);
+ mainloop_timer_del(_monitor_timer);
+ }
+ qb_log(LOG_INFO, "%s_%s_%d [%s] on %s rc:0 target_rc:%d",
+ op->rname, op->method, op->interval, op->rclass, op->hostname,
+ op->target_outcome);
+ pe_resource_completed(op, OCF_OK);
+ pe_resource_unref(op);
+}
+
+void
+Resource::completed(struct pe_operation *op, enum ocf_exitcode ec)
+{
+ string name = op->hostname;
+ Assembly *ass = _dep->assembly_get(name);
+ assert(ass != NULL);
+
+ qb_log(LOG_INFO, "%s_%s_%d [%s] on %s rc:%d target_rc:%d",
+ op->rname, op->method, op->interval, op->rclass, op->hostname,
+ ec, op->target_outcome);
+
+ if (ass->state_get() != Assembly::STATE_ONLINE) {
+ if (op->interval == 0) {
+ pe_resource_completed(op, OCF_UNKNOWN_ERROR);
+ }
+ pe_resource_unref(op);
+ _dep->schedule_processing();
+ return;
+ }
+
+ save(op, ec);
+
+ if (ec != op->target_outcome) {
+ _dep->schedule_processing();
+ }
+ if (op->interval == 0) {
+ if (strcmp(op->method, "start") == 0 && ec == OCF_OK) {
+ string rname = op->rtype;
+ string running = "running";
+ string reason = "started OK";
+ _dep->service_state_changed(ass->name_get(), rname, running, reason);
+ }
+
+ pe_resource_completed(op, ec);
+ pe_resource_unref(op);
+ return;
+ }
+
+ if (op->times_executed <= 1) {
+ pe_resource_completed(op, ec);
+ } else if (ec != op->target_outcome) {
+ failed(op);
+ }
+}
+
+Resource::Resource(Deployable *d, string& id, string& type, string&
class_name,
+ string& provider)
+: _dep(d), _id(id), _type(type), _class(class_name), _provider(provider)
+{
+}
+
diff --git a/src/resource.h b/src/resource.h
new file mode 100644
index 0000000..e79cbc5
--- /dev/null
+++ b/src/resource.h
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2011 Red Hat, Inc.
+ *
+ * Authors: Angus Salkeld <asalkeld(a)redhat.com>
+ *
+ * This file is part of pacemaker-cloud.
+ *
+ * pacemaker-cloud is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * pacemaker-cloud is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with pacemaker-cloud. If not, see <
http://www.gnu.org/licenses/>.
+ */
+
+#include <string>
+#include <libxml/parser.h>
+
+struct operation_history {
+ std::string* rsc_id;
+ std::string* operation;
+ uint32_t call_id;
+ uint32_t interval;
+ enum ocf_exitcode rc;
+ uint32_t target_outcome;
+ time_t last_run;
+ time_t last_rc_change;
+ uint32_t graph_id;
+ uint32_t action_id;
+ void *resource;
+ char *op_digest;
+};
+
+class Deployable;
+
+class Resource {
+private:
+ std::string _id;
+ std::string _type;
+ std::string _class;
+ std::string _provider;
+ Deployable *_dep;
+
+ void save(struct pe_operation *op, enum ocf_exitcode ec);
+
+public:
+ qb_loop_timer_handle _monitor_timer;
+ struct pe_operation *_monitor_op;
+ Resource();
+ Resource(Deployable *d, std::string& id, std::string& type,
+ std::string& class_name, std::string& provider);
+ ~Resource();
+
+ void stop(struct pe_operation *op);
+ void delete_op_history(struct pe_operation *op);
+ void start_recurring(struct pe_operation *op);
+ void execute(struct pe_operation *op);
+ void __execute(struct pe_operation *op);
+ void completed(struct pe_operation *op, enum ocf_exitcode ec);
+ void failed(struct pe_operation *op);
+ xmlNode * insert_status(xmlNode *rscs);
+};
--
1.7.5.2