modules/common/cassandra-bundle/pom.xml | 18 + modules/common/cassandra-bundle/src/main/java/org/rhq/cassandra/bundle/EmbeddedDeployer.java | 17 + modules/common/cassandra-bundle/src/main/resources/deploy.xml | 1 modules/enterprise/server/jar/pom.xml | 18 + modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cassandra/CassandraClusterHeartBeatJob.java | 115 ++++++++++ modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cassandra/CassandraClusterManagerBean.java | 26 ++ 6 files changed, 193 insertions(+), 2 deletions(-)
New commits: commit 45960649c0bbc1067a6d2d2134703857fd4e858b Author: John Sanda jsanda@redhat.com Date: Sat Sep 29 11:20:54 2012 -0400
Periodically check that we can connect to at least one Cassandra node
This is the initial commit for CassandraClusterHeartBeatJob. The job checks that the RHQ server can connect to at least one Cassandra node. If the server cannot establish a connection to any node, it goes into maintenance mode. While in maintenance mode, the server is brought back into normal operating mode as soon as a connection can be established again.
CassandraClusterManagerBean schedules the job in its installBundle method, which is called at server startup. EmbeddedDeployer has been updated to return the cluster nodes as a comma-separated host:port string, which is passed to CassandraClusterHeartBeatJob through its job data map.
diff --git a/modules/common/cassandra-bundle/pom.xml b/modules/common/cassandra-bundle/pom.xml
index f8c489f..019653f 100644
--- a/modules/common/cassandra-bundle/pom.xml
+++ b/modules/common/cassandra-bundle/pom.xml
@@ -23,6 +23,24 @@
       <artifactId>rhq-ant-bundle-common</artifactId>
       <version>${project.version}</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.hectorclient</groupId>
+      <artifactId>hector-core</artifactId>
+      <version>1.1-1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.cassandra</groupId>
+      <artifactId>cassandra-all</artifactId>
+      <version>1.1.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.7.0</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/modules/common/cassandra-bundle/src/main/java/org/rhq/cassandra/bundle/EmbeddedDeployer.java b/modules/common/cassandra-bundle/src/main/java/org/rhq/cassandra/bundle/EmbeddedDeployer.java
index 0e6b952..bd91fac 100644
--- a/modules/common/cassandra-bundle/src/main/java/org/rhq/cassandra/bundle/EmbeddedDeployer.java
+++ b/modules/common/cassandra-bundle/src/main/java/org/rhq/cassandra/bundle/EmbeddedDeployer.java
@@ -48,7 +48,41 @@ import org.rhq.core.util.stream.StreamUtil;
  */
 public class EmbeddedDeployer {
 
-    public void deploy(DeploymentOptions deploymentOptions) throws CassandraException {
+    /** Options read by {@link #deploy()} and {@link #getCassandraHosts()}; see {@link #setDeploymentOptions}. */
+    private DeploymentOptions deploymentOptions;
+
+    /**
+     * Sets the options describing the cluster. Must be called before {@link #deploy()} or
+     * {@link #getCassandraHosts()}, both of which dereference them.
+     *
+     * @param deploymentOptions the options to use for subsequent calls
+     */
+    public void setDeploymentOptions(DeploymentOptions deploymentOptions) {
+        this.deploymentOptions = deploymentOptions;
+    }
+
+    /**
+     * Returns one {@code ip:9160} entry per configured node, joined with commas, where the i-th entry
+     * (1-based) uses {@code getLocalIPAddress(i)}.
+     *
+     * @return the comma-separated host list, or an empty string if the options specify no nodes
+     */
+    public String getCassandraHosts() {
+        StringBuilder hosts = new StringBuilder();
+        for (int i = 0; i < deploymentOptions.getNumNodes(); ++i) {
+            if (i > 0) {
+                // Separator goes before every entry but the first, so zero nodes yields "" instead of failing.
+                hosts.append(',');
+            }
+            hosts.append(getLocalIPAddress(i + 1)).append(":9160");
+        }
+        return hosts.toString();
+    }
+
+    /**
+     * Deploys the cluster described by the options passed to {@link #setDeploymentOptions}.
+     */
+    public void deploy() throws CassandraException {
         Set<String> ipAddresses = calculateLocalIPAddresses(deploymentOptions.getNumNodes());
         File clusterDir = new File(deploymentOptions.getClusterDir());
         File installedMarker = new File(clusterDir, ".installed");
diff --git a/modules/common/cassandra-bundle/src/main/resources/deploy.xml b/modules/common/cassandra-bundle/src/main/resources/deploy.xml
index c92e7c7..3675e25 100644
--- a/modules/common/cassandra-bundle/src/main/resources/deploy.xml
+++ b/modules/common/cassandra-bundle/src/main/resources/deploy.xml
@@ -14,6 +14,7 @@
         <rhq:input-property name="cluster.dir"
                             description="The directory in which Cassandra nodes will be installed"
                             required="true"
+                            defaultValue=""
                             type="string"/>
 
         <rhq:input-property name="auto.bootstrap"
diff --git a/modules/enterprise/server/jar/pom.xml b/modules/enterprise/server/jar/pom.xml
index 24a3333..7f220ad 100644
--- a/modules/enterprise/server/jar/pom.xml
+++ b/modules/enterprise/server/jar/pom.xml
@@ -100,6 +100,24 @@
       <version>1.2.1</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.hectorclient</groupId>
+      <artifactId>hector-core</artifactId>
+      <version>1.1-1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.cassandra</groupId>
+      <artifactId>cassandra-all</artifactId>
+      <version>1.1.4</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.thrift</groupId>
+      <artifactId>libthrift</artifactId>
+      <version>0.7.0</version>
+    </dependency>
+
 
     <!--================ Test Deps ================-->
 
diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cassandra/CassandraClusterHeartBeatJob.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cassandra/CassandraClusterHeartBeatJob.java
new file mode 100644
index 0000000..8aed753
--- /dev/null
+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cassandra/CassandraClusterHeartBeatJob.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * * RHQ Management Platform
+ * * Copyright (C) 2005-2012 Red Hat, Inc.
+ * * All rights reserved.
+ * *
+ * * This program is free software; you can redistribute it and/or modify
+ * * it under the terms of the GNU General Public License, version 2, as
+ * * published by the Free Software Foundation, and/or the GNU Lesser
+ * * General Public License, version 2.1, also as published by the Free
+ * * Software Foundation.
+ * *
+ * * This program is distributed in the hope that it will be useful,
+ * * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * * GNU General Public License and the GNU Lesser General Public License
+ * * for more details.
+ * *
+ * * You should have received a copy of the GNU General Public License
+ * * and the GNU Lesser General Public License along with this program;
+ * * if not, write to the Free Software Foundation, Inc.,
+ * * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ *
+ */
+
+package org.rhq.enterprise.server.cassandra;
+
+import static org.rhq.core.domain.cloud.Server.OperationMode.MAINTENANCE;
+import static org.rhq.core.domain.cloud.Server.OperationMode.NORMAL;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+
+import org.rhq.core.domain.cloud.Server;
+import org.rhq.enterprise.server.cloud.CloudManagerLocal;
+import org.rhq.enterprise.server.cloud.instance.ServerManagerLocal;
+import org.rhq.enterprise.server.util.LookupUtil;
+
+import me.prettyprint.cassandra.service.CassandraHost;
+
+/**
+ * Quartz job that checks whether this RHQ server can open a Thrift connection to at least one Cassandra node.
+ * <p>
+ * The merged {@link JobDataMap} must contain {@link #KEY_CASSANDRA_HOSTS}, a comma-separated list of
+ * {@code host:port} entries, and {@link #KEY_CONNECTION_TIMEOUT}, the {@link TSocket} timeout in milliseconds.
+ * Both values are stored as strings. Nodes are probed in list order: as soon as one accepts a connection the
+ * server is put into {@code NORMAL} mode; if none does, it is put into {@code MAINTENANCE} mode.
+ *
+ * @author John Sanda
+ */
+public class CassandraClusterHeartBeatJob implements Job {
+
+    /** Name under which this job is scheduled. */
+    public static final String JOB_NAME = CassandraClusterHeartBeatJob.class.getSimpleName();
+
+    /** Job data key for the Thrift socket timeout; the value is a {@code String} holding an integer. */
+    public static final String KEY_CONNECTION_TIMEOUT = "rhq.cassandra.connection.timeout";
+
+    /** Job data key for the comma-separated {@code host:port} list of Cassandra nodes to probe. */
+    public static final String KEY_CASSANDRA_HOSTS = "rhq.cassandra.hosts";
+
+    private final Log log = LogFactory.getLog(CassandraClusterHeartBeatJob.class);
+
+    /**
+     * Probes each configured Cassandra node in turn and updates this server's operation mode accordingly.
+     * <p>
+     * NOTE(review): a missing host list, a missing or non-numeric timeout, or a host entry without a numeric
+     * port surfaces as an unchecked exception, and the server's mode is left unchanged for that run.
+     *
+     * @param context the Quartz context whose merged job data supplies the host list and timeout
+     * @throws JobExecutionException never thrown directly; declared by {@link Job#execute}
+     */
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+        Server rhqServer = getRhqServer();
+        JobDataMap dataMap = context.getMergedJobDataMap();
+        String hosts = (String) dataMap.get(KEY_CASSANDRA_HOSTS);
+        int timeout = Integer.parseInt((String) dataMap.get(KEY_CONNECTION_TIMEOUT));
+
+        for (String s : hosts.split(",")) {
+            String[] params = s.split(":");
+            CassandraHost host = new CassandraHost(params[0], Integer.parseInt(params[1]));
+            TSocket socket = new TSocket(host.getHost(), host.getPort(), timeout);
+            try {
+                socket.open();
+                if (log.isDebugEnabled()) {
+                    log.debug("Successfully connected to cassandra node [" + host + "]");
+                }
+                // changeServerMode is a no-op when the server is already in NORMAL mode.
+                changeServerMode(rhqServer, NORMAL);
+                return;
+            } catch (TTransportException e) {
+                String msg = "Unable to open thrift connection to cassandra node [" + host + "]";
+                logException(msg, e);
+            } finally {
+                // The socket is only a liveness probe; close it so each run does not leak a connection.
+                socket.close();
+            }
+        }
+        if (log.isWarnEnabled()) {
+            log.warn(rhqServer + " is unable to connect to any Cassandra node. Server will go into maintenance mode.");
+        }
+        changeServerMode(rhqServer, MAINTENANCE);
+    }
+
+    /** Looks up this server's {@link Server} entity via {@link ServerManagerLocal#getServer()}. */
+    private Server getRhqServer() {
+        ServerManagerLocal serverManager = LookupUtil.getServerManager();
+        return serverManager.getServer();
+    }
+
+    /**
+     * Moves {@code rhqServer} to {@code mode} via {@link CloudManagerLocal#updateServerMode}, logging the
+     * transition at INFO. Does nothing if the server is already in {@code mode}.
+     */
+    private void changeServerMode(Server rhqServer, Server.OperationMode mode) {
+        if (rhqServer.getOperationMode() == mode) {
+            return;
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info("Moving " + rhqServer + " from " + rhqServer.getOperationMode() + " to " + mode);
+        }
+        CloudManagerLocal rhqClusterManager = LookupUtil.getCloudManager();
+        rhqClusterManager.updateServerMode(new Integer[] {rhqServer.getId()}, mode);
+    }
+
+    /**
+     * Logs a failed connection attempt: with the full stack trace when DEBUG is enabled, with the exception
+     * message when INFO is enabled, and otherwise as a bare WARN message.
+     */
+    private void logException(String msg, Exception e) {
+        if (log.isDebugEnabled()) {
+            log.debug(msg, e);
+        } else if (log.isInfoEnabled()) {
+            log.info(msg + ": " + e.getMessage());
+        } else {
+            log.warn(msg);
+        }
+    }
+}
diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cassandra/CassandraClusterManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cassandra/CassandraClusterManagerBean.java
index ef6e94e..8d15d1f 100644
--- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cassandra/CassandraClusterManagerBean.java
+++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/cassandra/CassandraClusterManagerBean.java
@@ -43,6 +43,8 @@ import javax.ejb.TransactionAttribute;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.quartz.JobDataMap;
+import org.quartz.SchedulerException;
 
 import org.rhq.cassandra.CassandraException;
 import org.rhq.cassandra.bundle.DeploymentOptions;
@@ -67,6 +69,7 @@ import org.rhq.enterprise.server.bundle.BundleManagerLocal;
 import org.rhq.enterprise.server.resource.ResourceManagerLocal;
 import org.rhq.enterprise.server.resource.ResourceNotFoundException;
 import org.rhq.enterprise.server.resource.group.ResourceGroupManagerLocal;
+import org.rhq.enterprise.server.scheduler.SchedulerLocal;
 
 /**
  * @author John Sanda
@@ -88,6 +91,9 @@ public class CassandraClusterManagerBean implements CassandraClusterManagerLocal
     @EJB
     private ResourceGroupManagerLocal resourceGroupManager;
 
+    @EJB
+    private SchedulerLocal scheduler;
+
     @Override
     @TransactionAttribute(NEVER)
     public void installBundle() throws CassandraException {
@@ -124,7 +130,27 @@ public class CassandraClusterManagerBean implements CassandraClusterManagerLocal
         }
 
         EmbeddedDeployer deployer = new EmbeddedDeployer();
-        deployer.deploy(new DeploymentOptions(deploymentProps));
+        deployer.setDeploymentOptions(new DeploymentOptions(deploymentProps));
+        deployer.deploy();
+
+        // Schedule the heartbeat that moves this server to MAINTENANCE mode when no Cassandra node accepts a
+        // Thrift connection and back to NORMAL mode when one does. The "100" timeout is the TSocket timeout in
+        // milliseconds. NOTE(review): 3000/5000 look like the initial delay and repeat interval in ms - confirm
+        // against SchedulerLocal#scheduleRepeatingJob.
+        String jobGroup = CassandraClusterHeartBeatJob.JOB_NAME + "Group";
+
+        JobDataMap jobDataMap = new JobDataMap();
+        jobDataMap.put(CassandraClusterHeartBeatJob.KEY_CONNECTION_TIMEOUT, "100");
+        jobDataMap.put(CassandraClusterHeartBeatJob.KEY_CASSANDRA_HOSTS, deployer.getCassandraHosts());
+
+        try {
+            scheduler.scheduleRepeatingJob(CassandraClusterHeartBeatJob.JOB_NAME, jobGroup, jobDataMap,
+                CassandraClusterHeartBeatJob.class, true, true, 3000, 5000);
+        } catch (SchedulerException e) {
+            String msg = "Unable to schedule " + CassandraClusterHeartBeatJob.class.getSimpleName() + " job. The "
+                + "server will remain in maintenance mode without a manual override.";
+            log.error(msg, e);
+        }
 
 //        Resource platform = getPlatform(overlord, hostname);
 //        ResourceGroup group = getPlatformGroup(overlord, platform, hostname);