modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java | 2
modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java | 40 +++++++---
modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java | 40 ++++++++++
modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java | 3
modules/helpers/metrics-simulator/src/main/resources/conf/log4j.properties | 4 -
5 files changed, 76 insertions(+), 13 deletions(-)
New commits:
commit 4fc8389bc479b00065d6abea2944b911517f4638
Author: John Sanda <jsanda(a)redhat.com>
Date: Sat Oct 19 22:07:45 2013 -0400
adding configuration support for running async aggregation
diff --git a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java
index 7a7c6b9..a14adf7 100644
--- a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java
+++ b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/MeasurementAggregator.java
@@ -62,6 +62,7 @@ public class MeasurementAggregator implements Runnable {
@Override
public void run() {
Timer.Context context = metrics.totalAggregationTime.time();
+ long start = System.currentTimeMillis();
try {
log.debug("Starting metrics aggregation");
metricsServer.calculateAggregates();
@@ -71,6 +72,7 @@ public class MeasurementAggregator implements Runnable {
shutdownManager.shutdown(1);
} finally {
context.stop();
+ log.debug("Finished metrics aggregation in " + (System.currentTimeMillis() - start) + " ms");
metrics.totalAggregationRuns.inc();
}
}
diff --git a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java
index 9744ef2..4a18fb0 100644
--- a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java
+++ b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/Simulator.java
@@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import com.codahale.metrics.ConsoleReporter;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
@@ -65,29 +67,37 @@ public class Simulator implements ShutdownManager {
Metrics metrics = new Metrics();
final ConsoleReporter consoleReporter = createConsoleReporter(metrics, plan.getMetricsReportInterval());
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- shutdown(collectors, "collectors", 5);
- shutdown(readers, "readers", 5);
- shutdown(aggregators, "aggregators", 1);
- shutdown(aggregationQueue, "aggregationQueue", Integer.MAX_VALUE);
- consoleReporter.stop();
- }
- });
-
createSchema(plan.getNodes(), plan.getCqlPort());
Session session = createSession(plan.getNodes(), plan.getCqlPort());
StorageSession storageSession = new StorageSession(session);
MetricsDAO metricsDAO = new MetricsDAO(storageSession, plan.getMetricsServerConfiguration());
- MetricsServer metricsServer = new MetricsServer();
+ final MetricsServer metricsServer = new MetricsServer();
metricsServer.setDAO(metricsDAO);
metricsServer.setConfiguration(plan.getMetricsServerConfiguration());
+ metricsServer.setAggregationBatchSize(plan.getAggregationBatchSize());
+ metricsServer.setUseAsyncAggregation(plan.getAggregationType() == SimulationPlan.AggregationType.ASYNC);
metricsServer.setDateTimeService(plan.getDateTimeService());
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            /**
+             * JVM shutdown hook: calls shutdown(...) for collectors, readers, aggregators and the
+             * aggregation queue, shuts down the metrics server, then pauses before stopping the
+             * console reporter.
+             */
+            @Override
+            public void run() {
+                shutdown(collectors, "collectors", 5);
+                shutdown(readers, "readers", 5);
+                shutdown(aggregators, "aggregators", 1);
+                shutdown(aggregationQueue, "aggregationQueue", Integer.MAX_VALUE);
+                metricsServer.shutdown();
+                log.info("Wait for console reporter...");
+                try {
+                    // NOTE(review): 181 s is presumably meant to outlast one reporting interval so a
+                    // final report is printed - confirm against plan.getMetricsReportInterval().
+                    Thread.sleep(181000);
+                } catch (InterruptedException e) {
+                    // Do not swallow the interrupt: restore the flag and fall through so the
+                    // console reporter is still stopped below.
+                    Thread.currentThread().interrupt();
+                }
+                consoleReporter.stop();
+            }
+        });
+
MeasurementAggregator measurementAggregator = new MeasurementAggregator(metricsServer, this, metrics,
aggregationQueue);
@@ -156,6 +166,7 @@ public class Simulator implements ShutdownManager {
log.info("Creating schema");
SchemaManager schemaManager = new SchemaManager("rhqadmin", "1eeb2f255e832171df8592078de921bc", nodes,
cqlPort);
+ schemaManager.drop();
schemaManager.install();
} catch (Exception e) {
throw new RuntimeException("Failed to start simulator. An error occurred during schema creation.", e);
@@ -167,6 +178,11 @@ public class Simulator implements ShutdownManager {
Cluster cluster = new ClusterBuilder().addContactPoints(nodes).withPort(cqlPort)
.withCredentials("rhqadmin", "rhqadmin")
.build();
+ PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
+ poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, 24);
+ poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, 24);
+ poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, 32);
+ poolingOptions.setMaxConnectionsPerHost(HostDistance.REMOTE, 32);
log.debug("Created cluster object with " + cluster.getConfiguration().getProtocolOptions().getCompression()
+ " compression.");
diff --git a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java
index 33b3bbf..7ad50d1 100644
--- a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java
+++ b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlan.java
@@ -56,6 +56,26 @@ public class SimulationPlan {
}
}
+    /**
+     * The aggregation modes a simulation plan can select. Each constant carries the text form
+     * that {@link #fromText(String)} accepts.
+     */
+    public static enum AggregationType {
+        SYNC("sync"), ASYNC("async");
+
+        /** Text form of this constant; the lookup key used by {@link #fromText(String)}. */
+        private final String text;
+
+        AggregationType(String text) {
+            this.text = text;
+        }
+
+        /**
+         * Resolves the text form of an aggregation type to its constant.
+         *
+         * @param text the text to resolve, e.g. "sync" or "async"; matching is case-sensitive
+         * @return the constant whose text equals {@code text}
+         * @throws IllegalArgumentException if {@code text} is null or matches no constant
+         */
+        public static AggregationType fromText(String text) {
+            // Compare from the constant's side so a null argument falls through to the
+            // IllegalArgumentException below instead of raising a NullPointerException.
+            for (AggregationType type : values()) {
+                if (type.text.equals(text)) {
+                    return type;
+                }
+            }
+            throw new IllegalArgumentException(text + " is not a valid aggregation type");
+        }
+    }
+
private long collectionInterval;
private long aggregationInterval;
@@ -84,6 +104,10 @@ public class SimulationPlan {
private long simulationRate;
+ private int aggregationBatchSize;
+
+ private AggregationType aggregationType;
+
public DateTimeService getDateTimeService() {
return dateTimeService;
}
@@ -195,4 +219,20 @@ public class SimulationPlan {
public void setSimulationRate(long simulationRate) {
this.simulationRate = simulationRate;
}
+
+ public int getAggregationBatchSize() { // Simulator hands this value to MetricsServer.setAggregationBatchSize
+ return aggregationBatchSize;
+ }
+
+ public void setAggregationBatchSize(int aggregationBatchSize) { // SimulationPlanner reads "aggregationBatchSize" from the plan, defaulting to 25
+ this.aggregationBatchSize = aggregationBatchSize;
+ }
+
+ public AggregationType getAggregationType() { // Simulator turns on async aggregation when this is ASYNC
+ return aggregationType;
+ }
+
+ public void setAggregationType(AggregationType aggregationType) { // SimulationPlanner parses "aggregationType" from the plan, defaulting to "sync"
+ this.aggregationType = aggregationType;
+ }
}
diff --git a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java
index e601212..5b3248d 100644
--- a/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java
+++ b/modules/helpers/metrics-simulator/src/main/java/org/rhq/metrics/simulator/plan/SimulationPlanner.java
@@ -98,6 +98,9 @@ public class SimulationPlanner {
simulation.setNodes(nodes);
simulation.setCqlPort(getInt(root.get("cqlPort"), 9142));
+ simulation.setAggregationBatchSize(getInt(root.get("aggregationBatchSize"), 25));
+ simulation.setAggregationType(SimulationPlan.AggregationType.fromText(getString(root.get("aggregationType"),
+ "sync")));
return simulation;
}
diff --git a/modules/helpers/metrics-simulator/src/main/resources/conf/log4j.properties b/modules/helpers/metrics-simulator/src/main/resources/conf/log4j.properties
index 58fa73c..b8c1c66 100644
--- a/modules/helpers/metrics-simulator/src/main/resources/conf/log4j.properties
+++ b/modules/helpers/metrics-simulator/src/main/resources/conf/log4j.properties
@@ -36,4 +36,6 @@ log4j.appender.FILE.Append=false
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p [%t] (%c{5}) - %m%n
-log4j.logger.org.rhq=DEBUG
\ No newline at end of file
+
+log4j.logger.org.rhq=DEBUG
+log4j.logger.com.datastax=DEBUG