modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java
| 109 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java
| 66 -
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java
| 62 -
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java
| 116 ---
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java
| 255 ------
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java
| 331 --------
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java
| 104 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java
| 91 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java
| 97 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
| 4
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
| 127 +++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
| 84 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
| 73 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
| 133 +++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
| 257 ++++++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
| 378 ++++++++++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
| 113 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
| 99 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
| 106 ++
modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
| 1
20 files changed, 1373 insertions(+), 1233 deletions(-)
New commits:
commit 1339dad67f4d008561bbb99b7fd5bec8bc90eca3
Author: John Sanda <jsanda(a)redhat.com>
Date: Thu Dec 19 13:41:10 2013 -0500
[BZ 1009945] turn on async aggregation by default
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
index 7f5b639..d319146 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
@@ -92,7 +92,7 @@ public class MetricsServer {
private int aggregationBatchSize;
- private boolean useAsyncAggregation = System.getProperty("rhq.metrics.aggregation.async") != null;
+ private boolean useAsyncAggregation = Boolean.valueOf(System.getProperty("rhq.metrics.aggregation.async", "true"));
public void setDAO(MetricsDAO dao) {
this.dao = dao;
commit 0852a2f987de1af4c409067da2a13b6064a640bc
Author: John Sanda <jsanda(a)redhat.com>
Date: Thu Dec 19 11:11:50 2013 -0500
[BZ 1009945] updating test with package refactoring
diff --git
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
index 00803db..7d66cef 100644
---
a/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
+++
b/modules/enterprise/server/server-metrics/src/test/java/org/rhq/server/metrics/AggregationTests.java
@@ -26,6 +26,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.rhq.core.domain.measurement.MeasurementDataNumeric;
+import org.rhq.server.metrics.aggregation.Aggregator;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.AggregateType;
import org.rhq.server.metrics.domain.MetricsIndexEntry;
commit 4da0ad97bbcb2e36c70d47a96e8345825bb72615
Author: John Sanda <jsanda(a)redhat.com>
Date: Thu Dec 19 11:05:14 2013 -0500
[BZ 1009945] cleaning up logging and adding some javadocs
Also moving all aggregation-related classes into the org.rhq.server.metrics.aggregation package.
All classes except Aggregator now have package-level access.
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java
deleted file mode 100644
index 4a5e78b..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate1HourData.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Generates 6 hour data for a batch of 1 hour data futures. After data is inserted for the batch, aggregation of 6 hour
- * data will start immediately for the batch if the 24 hour time slice has finished.
- *
- * @see Compute6HourData
- * @author John Sanda
- */
-public class Aggregate1HourData implements Runnable {
-
- private final Log log = LogFactory.getLog(Aggregate1HourData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public Aggregate1HourData(MetricsDAO dao, AggregationState state, Set<Integer>
scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final long start = System.currentTimeMillis();
- ListenableFuture<List<ResultSet>> queriesFuture =
Futures.successfulAsList(queryFutures);
- Futures.withFallback(queriesFuture, new
FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t)
throws Exception {
- log.error("An error occurred while fetching one hour data",
t);
- return Futures.immediateFailedFuture(t);
- }
- });
- ListenableFuture<List<ResultSet>> computeFutures =
Futures.transform(queriesFuture,
- state.getCompute6HourData(), state.getAggregationTasks());
- Futures.addCallback(computeFutures, new
FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> result) {
- log.debug("Finished aggregating 1 hour data for " +
result.size() + " schedules in " +
- (System.currentTimeMillis() - start) + " ms");
- start6HourDataAggregationIfNecessary();
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to aggregate 1 hour data", t);
- start6HourDataAggregationIfNecessary();
- }
- });
- }
-
- private void start6HourDataAggregationIfNecessary() {
- try {
- if (state.is24HourTimeSliceFinished()) {
- update6HourIndexEntries();
- List<StorageResultSetFuture> queryFutures = new
ArrayList<StorageResultSetFuture>(scheduleIds.size());
- for (Integer scheduleId : scheduleIds) {
- queryFutures.add(dao.findSixHourMetricsAsync(scheduleId,
state.getTwentyFourHourTimeSlice().getMillis(),
- state.getTwentyFourHourTimeSliceEnd().getMillis()));
- }
- state.getAggregationTasks().submit(new Aggregate6HourData(dao, state,
scheduleIds, queryFutures));
- }
- } catch (InterruptedException e) {
- log.debug("An interrupt occurred while waiting for 6 hour data index
entries. Aborting data aggregation",
- e);
- log.info("An interrupt occurred while waiting for 6 hour data index
entries. Aborting data aggregation: " +
- e.getMessage());
- } finally {
- state.getRemaining1HourData().addAndGet(-scheduleIds.size());
- }
- }
-
- private void update6HourIndexEntries() throws InterruptedException {
- try {
- state.getSixHourIndexEntriesArrival().await();
- try {
- state.getSixHourIndexEntriesLock().writeLock().lock();
- state.getSixHourIndexEntries().removeAll(scheduleIds);
- } finally {
- state.getSixHourIndexEntriesLock().writeLock().unlock();
- }
- } catch (AbortedException e) {
- // This means we failed to retrieve the index entries. We can however
- // continue generating 6 hour data because we do not need the index
- // here since we already have 6 hour data to aggregate along with the
- // schedule ids.
- }
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java
deleted file mode 100644
index fc8c0cb..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregate6HourData.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * @author John Sanda
- */
-public class Aggregate6HourData implements Runnable {
-
- private final Log log = LogFactory.getLog(Aggregate6HourData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public Aggregate6HourData(MetricsDAO dao, AggregationState state, Set<Integer>
scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final long start = System.currentTimeMillis();
- ListenableFuture<List<ResultSet>> queriesFuture =
Futures.successfulAsList(queryFutures);
- Futures.withFallback(queriesFuture, new
FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t)
throws Exception {
- log.error("An error occurred while fetching 6 hour data", t);
- return Futures.immediateFailedFuture(t);
- }
- });
- ListenableFuture<List<ResultSet>> computeFutures =
Futures.transform(queriesFuture,
- state.getCompute24HourData(), state.getAggregationTasks());
- Futures.addCallback(computeFutures, new
FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> result) {
- log.debug("Finished aggregating 6 hour data for " +
result.size() + " schedules in " +
- (System.currentTimeMillis() - start) + " ms");
- state.getRemaining6HourData().addAndGet(-scheduleIds.size());
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to aggregate 6 hour data", t);
- state.getRemaining6HourData().addAndGet(-scheduleIds.size());
- }
- });
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java
deleted file mode 100644
index 1eb9e53..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateIndexEntriesHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.FutureCallback;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
-* @author John Sanda
-*/
-class AggregateIndexEntriesHandler implements FutureCallback<ResultSet> {
-
- private final Log log = LogFactory.getLog(AggregateIndexEntriesHandler.class);
-
- private Set<Integer> indexEntries;
-
- private AtomicInteger remainingData;
-
- private SignalingCountDownLatch indexEntriesArrival;
-
- private long startTime;
-
- private String src;
-
- private String dest;
-
- public AggregateIndexEntriesHandler(Set<Integer> indexEntries, AtomicInteger
remainingData,
- SignalingCountDownLatch indexEntriesArrival, long startTime, String src, String
dest) {
- this.indexEntries = indexEntries;
- this.remainingData = remainingData;
- this.indexEntriesArrival = indexEntriesArrival;
- this.startTime = startTime;
- this.src = src;
- this.dest = dest;
- }
-
- @Override
- public void onSuccess(ResultSet resultSet) {
- for (Row row : resultSet) {
- indexEntries.add(row.getInt(1));
- }
- remainingData.set(indexEntries.size());
- indexEntriesArrival.countDown();
- if (log.isDebugEnabled()) {
- log.debug("Finished loading " + indexEntries.size() + " "
+ src + " index entries in " +
- (System.currentTimeMillis() - startTime) + " ms");
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to retrieve " + src + " index entries. Some
" + dest + " aggregates may not get generated.",
- t);
- remainingData.set(0);
- indexEntriesArrival.abort();
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java
deleted file mode 100644
index dbfc289..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregateRawData.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * Generates 1 hour data for a batch of raw data futures. After data is inserted for the batch, aggregation of 1 hour
- * data will start immediately for the batch if the 6 hour time slice has finished.
- *
- * @see Compute1HourData
- * @author John Sanda
- */
-public class AggregateRawData implements Runnable {
-
- private final Log log = LogFactory.getLog(AggregateRawData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public AggregateRawData(MetricsDAO dao, AggregationState state, Set<Integer>
scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final long start = System.currentTimeMillis();
- ListenableFuture<List<ResultSet>> rawDataFutures =
Futures.successfulAsList(queryFutures);
- Futures.withFallback(rawDataFutures, new
FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t)
throws Exception {
- log.error("An error occurred while fetching raw data", t);
- return Futures.immediateFailedFuture(t);
- }
- });
-
- final ListenableFuture<List<ResultSet>> insert1HourDataFutures =
Futures.transform(rawDataFutures,
- state.getCompute1HourData(), state.getAggregationTasks());
- Futures.addCallback(insert1HourDataFutures, new
FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> resultSets) {
- log.debug("Finished aggregating raw data for " +
resultSets.size() + " schedules in " +
- (System.currentTimeMillis() - start) + " ms");
- start1HourDataAggregationIfNecessary();
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to aggregate raw data", t);
- // TODO maybe add debug statement to log those schedule ids for which
aggregation failed
- start1HourDataAggregationIfNecessary();
- }
- }, state.getAggregationTasks());
- }
-
- private void start1HourDataAggregationIfNecessary() {
- try {
- if (state.is6HourTimeSliceFinished()) {
- update1HourIndexEntries();
- List<StorageResultSetFuture> oneHourDataQueryFutures = new
ArrayList<StorageResultSetFuture>(
- scheduleIds.size());
- for (Integer scheduleId : scheduleIds) {
- oneHourDataQueryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
- state.getSixHourTimeSlice().getMillis(),
state.getSixHourTimeSliceEnd().getMillis()));
- }
- state.getAggregationTasks().submit(new Aggregate1HourData(dao, state,
scheduleIds,
- oneHourDataQueryFutures));
- }
- } catch (InterruptedException e) {
- log.debug("An interrupt occurred while waiting for 1 hour data index
entries. Aborting data aggregation",
- e);
- log.info("An interrupt occurred while waiting for 1 hour data index
entries. Aborting data aggregation: " +
- e.getMessage());
- } finally {
- state.getRemainingRawData().addAndGet(-scheduleIds.size());
- }
- }
-
- private void update1HourIndexEntries() throws InterruptedException {
- try {
- // Wait for the arrival so that we can remove the schedules ids in this
- // batch from the one hour index entries. This will prevent duplicate tasks
- // being submitted to process the same 1 hour data.
- state.getOneHourIndexEntriesArrival().await();
- try {
- state.getOneHourIndexEntriesLock().writeLock().lock();
- state.getOneHourIndexEntries().removeAll(scheduleIds);
- } finally {
- state.getOneHourIndexEntriesLock().writeLock().unlock();
- }
- } catch (AbortedException e) {
- // This means we failed to retrieve the index entries. We can however
- // continue generating 1 hour data because we do not need the index
- // here since we already have 1 hour data to aggregate along with the
- // schedule ids.
- }
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java
deleted file mode 100644
index bd1ce0d..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/AggregationState.java
+++ /dev/null
@@ -1,255 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-import org.joda.time.DateTime;
-
-/**
- * @author John Sanda
- */
-public class AggregationState {
-
- private ListeningExecutorService aggregationTasks;
-
- private SignalingCountDownLatch oneHourIndexEntriesArrival;
-
- private SignalingCountDownLatch sixHourIndexEntriesArrival;
-
- private AtomicInteger remainingRawData;
-
- private AtomicInteger remaining1HourData;
-
- private AtomicInteger remaining6HourData;
-
- private Set<Integer> oneHourIndexEntries;
-
- private Set<Integer> sixHourIndexEntries;
-
- private ReentrantReadWriteLock oneHourIndexEntriesLock;
-
- private ReentrantReadWriteLock sixHourIndexEntriesLock;
-
- private DateTime oneHourTimeSlice;
-
- private DateTime sixHourTimeSlice;
-
- private DateTime sixHourTimeSliceEnd;
-
- private DateTime twentyFourHourTimeSlice;
-
- private DateTime twentyFourHourTimeSliceEnd;
-
- private boolean sixHourTimeSliceFinished;
-
- private boolean twentyFourHourTimeSliceFinished;
-
- private Compute1HourData compute1HourData;
-
- private Compute6HourData compute6HourData;
-
- private Compute24HourData compute24HourData;
-
- public ListeningExecutorService getAggregationTasks() {
- return aggregationTasks;
- }
-
- public AggregationState setAggregationTasks(ListeningExecutorService
aggregationTasks) {
- this.aggregationTasks = aggregationTasks;
- return this;
- }
-
- /**
- * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries
for schedules with 1 hour
- * data to be aggregated
- */
- public SignalingCountDownLatch getOneHourIndexEntriesArrival() {
- return oneHourIndexEntriesArrival;
- }
-
- public AggregationState setOneHourIndexEntriesArrival(SignalingCountDownLatch
oneHourIndexEntriesArrival) {
- this.oneHourIndexEntriesArrival = oneHourIndexEntriesArrival;
- return this;
- }
-
- /**
- * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries
for schedules with 6 hour
- * data to be aggregated
- */
- public SignalingCountDownLatch getSixHourIndexEntriesArrival() {
- return sixHourIndexEntriesArrival;
- }
-
- public AggregationState setSixHourIndexEntriesArrival(SignalingCountDownLatch
sixHourIndexEntriesArrival) {
- this.sixHourIndexEntriesArrival = sixHourIndexEntriesArrival;
- return this;
- }
-
- /**
- * @return The remaining number of schedules with raw data to be aggregated
- */
- public AtomicInteger getRemainingRawData() {
- return remainingRawData;
- }
-
- public AggregationState setRemainingRawData(AtomicInteger remainingRawData) {
- this.remainingRawData = remainingRawData;
- return this;
- }
-
- /**
- * @return The remaining number of schedules with 1 hour data to be aggregated
- */
- public AtomicInteger getRemaining1HourData() {
- return remaining1HourData;
- }
-
- public AggregationState setRemaining1HourData(AtomicInteger remaining1HourData) {
- this.remaining1HourData = remaining1HourData;
- return this;
- }
-
- /**
- * @return The remaining number of schedules with 6 hour data to be aggregated
- */
- public AtomicInteger getRemaining6HourData() {
- return remaining6HourData;
- }
-
- public AggregationState setRemaining6HourData(AtomicInteger remaining6HourData) {
- this.remaining6HourData = remaining6HourData;
- return this;
- }
-
- /**
- * @return The schedule ids with 1 hour data to be aggregated
- */
- public Set<Integer> getOneHourIndexEntries() {
- return oneHourIndexEntries;
- }
-
- public AggregationState setOneHourIndexEntries(Set<Integer>
oneHourIndexEntries) {
- this.oneHourIndexEntries = oneHourIndexEntries;
- return this;
- }
-
- public Set<Integer> getSixHourIndexEntries() {
- return sixHourIndexEntries;
- }
-
- public AggregationState setSixHourIndexEntries(Set<Integer>
sixHourIndexEntries) {
- this.sixHourIndexEntries = sixHourIndexEntries;
- return this;
- }
-
- public ReentrantReadWriteLock getOneHourIndexEntriesLock() {
- return oneHourIndexEntriesLock;
- }
-
- public AggregationState setOneHourIndexEntriesLock(ReentrantReadWriteLock
oneHourIndexEntriesLock) {
- this.oneHourIndexEntriesLock = oneHourIndexEntriesLock;
- return this;
- }
-
- public ReentrantReadWriteLock getSixHourIndexEntriesLock() {
- return sixHourIndexEntriesLock;
- }
-
- public AggregationState setSixHourIndexEntriesLock(ReentrantReadWriteLock
sixHourIndexEntriesLock) {
- this.sixHourIndexEntriesLock = sixHourIndexEntriesLock;
- return this;
- }
-
- public DateTime getOneHourTimeSlice() {
- return oneHourTimeSlice;
- }
-
- public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
- this.oneHourTimeSlice = oneHourTimeSlice;
- return this;
- }
-
- public DateTime getSixHourTimeSlice() {
- return sixHourTimeSlice;
- }
-
- public AggregationState setSixHourTimeSlice(DateTime sixHourTimeSlice) {
- this.sixHourTimeSlice = sixHourTimeSlice;
- return this;
- }
-
- public DateTime getSixHourTimeSliceEnd() {
- return sixHourTimeSliceEnd;
- }
-
- public AggregationState setSixHourTimeSliceEnd(DateTime sixHourTimeSliceEnd) {
- this.sixHourTimeSliceEnd = sixHourTimeSliceEnd;
- return this;
- }
-
- public DateTime getTwentyFourHourTimeSlice() {
- return twentyFourHourTimeSlice;
- }
-
- public AggregationState setTwentyFourHourTimeSlice(DateTime twentyFourHourTimeSlice)
{
- this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
- return this;
- }
-
- public DateTime getTwentyFourHourTimeSliceEnd() {
- return twentyFourHourTimeSliceEnd;
- }
-
- public AggregationState setTwentyFourHourTimeSliceEnd(DateTime
twentyFourHourTimeSliceEnd) {
- this.twentyFourHourTimeSliceEnd = twentyFourHourTimeSliceEnd;
- return this;
- }
-
- public boolean is6HourTimeSliceFinished() {
- return sixHourTimeSliceFinished;
- }
-
- public AggregationState set6HourTimeSliceFinished(boolean is6HourTimeSliceFinished)
{
- this.sixHourTimeSliceFinished = is6HourTimeSliceFinished;
- return this;
- }
-
- public boolean is24HourTimeSliceFinished() {
- return twentyFourHourTimeSliceFinished;
- }
-
- public AggregationState set24HourTimeSliceFinished(boolean is24HourTimeSliceFinished)
{
- this.twentyFourHourTimeSliceFinished = is24HourTimeSliceFinished;
- return this;
- }
-
- public Compute1HourData getCompute1HourData() {
- return compute1HourData;
- }
-
- public AggregationState setCompute1HourData(Compute1HourData compute1HourData) {
- this.compute1HourData = compute1HourData;
- return this;
- }
-
- public Compute6HourData getCompute6HourData() {
- return compute6HourData;
- }
-
- public AggregationState setCompute6HourData(Compute6HourData compute6HourData) {
- this.compute6HourData = compute6HourData;
- return this;
- }
-
- public Compute24HourData getCompute24HourData() {
- return compute24HourData;
- }
-
- public AggregationState setCompute24HourData(Compute24HourData compute24HourData) {
- this.compute24HourData = compute24HourData;
- return this;
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java
deleted file mode 100644
index 4ff222a..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Aggregator.java
+++ /dev/null
@@ -1,331 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeComparator;
-import org.joda.time.Duration;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.MetricsTable;
-
-/**
- * @author John Sanda
- */
-public class Aggregator {
-
- private static final Comparator<AggregateNumericMetric> AGGREGATE_COMPARATOR =
new Comparator<AggregateNumericMetric>() {
- @Override
- public int compare(AggregateNumericMetric left, AggregateNumericMetric right) {
- return (left.getScheduleId() < right.getScheduleId()) ? -1 :
((left.getScheduleId() == right.getScheduleId()) ? 0 : 1);
- }
- };
-
- private final Log log = LogFactory.getLog(Aggregator.class);
-
- private MetricsDAO dao;
-
- private MetricsConfiguration configuration;
-
- private DateTimeService dtService;
-
- private DateTime startTime;
-
- /**
- * Signals when raw data index entries (in metrics_index) can be deleted. We cannot
delete the row in metrics_index
- * until we know that it has been read, successfully or otherwise.
- */
- private SignalingCountDownLatch rawDataIndexEntriesArrival;
-
- private RateLimiter readPermits;
- private RateLimiter writePermits;
-
- private int batchSize;
-
- private AggregationState state;
-
- private Set<AggregateNumericMetric> oneHourData;
-
- private AtomicInteger remainingIndexEntries;
-
- public Aggregator(ListeningExecutorService aggregationTasks, MetricsDAO dao,
MetricsConfiguration configuration,
- DateTimeService dtService, DateTime startTime, int batchSize, RateLimiter
writePermits,
- RateLimiter readPermits) {
- this.dao = dao;
- this.configuration = configuration;
- this.dtService = dtService;
- this.startTime = startTime;
- this.readPermits = readPermits;
- this.writePermits = writePermits;
- this.batchSize = batchSize;
- oneHourData = new
ConcurrentSkipListSet<AggregateNumericMetric>(AGGREGATE_COMPARATOR);
- rawDataIndexEntriesArrival = new SignalingCountDownLatch(new CountDownLatch(1));
- remainingIndexEntries = new AtomicInteger(1);
-
- DateTime sixHourTimeSlice = get6HourTimeSlice();
- DateTime twentyFourHourTimeSlice = get24HourTimeSlice();
-
- state = new AggregationState()
- .setAggregationTasks(aggregationTasks)
- .setOneHourTimeSlice(startTime)
- .setSixHourTimeSlice(sixHourTimeSlice)
-
.setSixHourTimeSliceEnd(sixHourTimeSlice.plus(configuration.getOneHourTimeSliceDuration()))
- .setTwentyFourHourTimeSlice(twentyFourHourTimeSlice)
-
.setTwentyFourHourTimeSliceEnd(twentyFourHourTimeSlice.plus(configuration.getSixHourTimeSliceDuration()))
- .setCompute1HourData(new Compute1HourData(startTime, sixHourTimeSlice,
writePermits, dao, oneHourData))
- .setCompute6HourData(new Compute6HourData(sixHourTimeSlice,
twentyFourHourTimeSlice, writePermits, dao))
- .setCompute24HourData(new Compute24HourData(twentyFourHourTimeSlice,
writePermits, dao))
- .set6HourTimeSliceFinished(hasTimeSliceEnded(sixHourTimeSlice,
configuration.getOneHourTimeSliceDuration()))
- .set24HourTimeSliceFinished(hasTimeSliceEnded(twentyFourHourTimeSlice,
- configuration.getSixHourTimeSliceDuration()))
- .setRemainingRawData(new AtomicInteger(0))
- .setRemaining1HourData(new AtomicInteger(0))
- .setRemaining6HourData(new AtomicInteger(0))
- .setOneHourIndexEntries(new TreeSet<Integer>())
- .setSixHourIndexEntries(new TreeSet<Integer>())
- .setOneHourIndexEntriesLock(new ReentrantReadWriteLock())
- .setSixHourIndexEntriesLock(new ReentrantReadWriteLock());
-
- if (state.is6HourTimeSliceFinished()) {
- state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new
CountDownLatch(1)));
- remainingIndexEntries.incrementAndGet();
- } else {
- state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new
CountDownLatch(0)));
- state.setRemaining1HourData(new AtomicInteger(0));
- }
-
- if (state.is24HourTimeSliceFinished()) {
- state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new
CountDownLatch(1)));
- remainingIndexEntries.incrementAndGet();
- } else {
- state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new
CountDownLatch(0)));
- state.setRemaining6HourData(new AtomicInteger(0));
- }
- }
-
- private DateTime get24HourTimeSlice() {
- return dtService.getTimeSlice(startTime,
configuration.getSixHourTimeSliceDuration());
- }
-
- private DateTime get6HourTimeSlice() {
- return dtService.getTimeSlice(startTime,
configuration.getOneHourTimeSliceDuration());
- }
-
- private boolean hasTimeSliceEnded(DateTime startTime, Duration duration) {
- DateTime endTime = startTime.plus(duration);
- return DateTimeComparator.getInstance().compare(currentHour(), endTime) >= 0;
- }
-
- protected DateTime currentHour() {
- return dtService.getTimeSlice(dtService.now(),
configuration.getRawTimeSliceDuration());
- }
-
- public Set<AggregateNumericMetric> run() {
- log.info("Starting aggregation for time slice " + startTime);
- readPermits.acquire();
- StorageResultSetFuture rawFuture =
dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
- startTime.getMillis());
- Futures.addCallback(rawFuture, new FutureCallback<ResultSet>() {
- @Override
- public void onSuccess(ResultSet result) {
- List<Row> rows = result.all();
- state.getRemainingRawData().set(rows.size());
- rawDataIndexEntriesArrival.countDown();
-
- log.debug("Starting raw data aggregation for " + rows.size() +
" schedules");
- long start = System.currentTimeMillis();
- final DateTime endTime =
startTime.plus(configuration.getRawTimeSliceDuration());
- Set<Integer> scheduleIds = new TreeSet<Integer>();
- List<StorageResultSetFuture> rawDataFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- for (final Row row : rows) {
- scheduleIds.add(row.getInt(1));
- readPermits.acquire();
- rawDataFutures.add(dao.findRawMetricsAsync(row.getInt(1),
startTime.getMillis(),
- endTime.getMillis()));
- if (rawDataFutures.size() == batchSize) {
- state.getAggregationTasks().submit(new AggregateRawData(dao,
state, scheduleIds,
- rawDataFutures));
- rawDataFutures = new ArrayList<StorageResultSetFuture>();
- scheduleIds = new TreeSet<Integer>();
- }
- }
- if (!rawDataFutures.isEmpty()) {
- state.getAggregationTasks().submit(new AggregateRawData(dao, state,
scheduleIds,
- rawDataFutures));
- }
- log.debug("Finished processing one hour index entries in " +
(System.currentTimeMillis() - start) +
- " ms");
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to retrieve raw data index entries. Raw data
aggregation for time slice [" +
- startTime + "] cannot proceed.", t);
- state.setRemainingRawData(new AtomicInteger(0));
- rawDataIndexEntriesArrival.abort();
- deleteIndexEntries(MetricsTable.ONE_HOUR);
- }
- }, state.getAggregationTasks());
-
- if (state.is6HourTimeSliceFinished()) {
- long start = System.currentTimeMillis();
- log.debug("Fetching 1 hour index entries");
- StorageResultSetFuture oneHourFuture =
dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
- state.getSixHourTimeSlice().getMillis());
- Futures.addCallback(oneHourFuture, new
AggregateIndexEntriesHandler(state.getOneHourIndexEntries(),
- state.getRemaining1HourData(), state.getOneHourIndexEntriesArrival(),
start, "1 hour", "6 hour"),
- state.getAggregationTasks());
- }
-
- if (state.is24HourTimeSliceFinished()) {
- long start = System.currentTimeMillis();
- log.debug("Fetching 6 hour index entries");
- StorageResultSetFuture sixHourFuture =
dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR,
- state.getTwentyFourHourTimeSlice().getMillis());
- Futures.addCallback(sixHourFuture, new
AggregateIndexEntriesHandler(state.getSixHourIndexEntries(),
- state.getRemaining6HourData(), state.getSixHourIndexEntriesArrival(),
start, "6 hour", "24 hour"),
- state.getAggregationTasks());
- }
-
- try {
- try {
- rawDataIndexEntriesArrival.await();
- deleteIndexEntries(MetricsTable.ONE_HOUR);
- } catch (AbortedException e) {
- }
-
- if (state.is6HourTimeSliceFinished()) {
- waitFor(state.getRemainingRawData());
- try {
- state.getOneHourIndexEntriesArrival().await();
- deleteIndexEntries(MetricsTable.SIX_HOUR);
-
- List<StorageResultSetFuture> queryFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- Set<Integer> scheduleIds = new TreeSet<Integer>();
- state.getOneHourIndexEntriesLock().writeLock().lock();
- for (Integer scheduleId : state.getOneHourIndexEntries()) {
- queryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
state.getSixHourTimeSlice().getMillis(),
- state.getSixHourTimeSliceEnd().getMillis()));
- scheduleIds.add(scheduleId);
- if (queryFutures.size() == batchSize) {
- state.getAggregationTasks().submit(new
Aggregate1HourData(dao, state, scheduleIds,
- queryFutures));
- queryFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- scheduleIds = new TreeSet<Integer>();
- }
- }
- if (!queryFutures.isEmpty()) {
- state.getAggregationTasks().submit(new Aggregate1HourData(dao,
state, scheduleIds,
- queryFutures));
- queryFutures = null;
- scheduleIds = null;
- }
- } catch (AbortedException e) {
- log.warn("Failed to load 1 hour index entries. Some 6 hour
aggregates may not get generated.", e);
- } finally {
- state.getOneHourIndexEntriesLock().writeLock().unlock();
- }
- }
-
- if (state.is24HourTimeSliceFinished()) {
- waitFor(state.getRemaining1HourData());
- try {
- state.getSixHourIndexEntriesArrival().await();
- deleteIndexEntries(MetricsTable.TWENTY_FOUR_HOUR);
-
- List<StorageResultSetFuture> queryFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- Set<Integer> scheduleIds = new TreeSet<Integer>();
- state.getSixHourIndexEntriesLock().writeLock().lock();
- for (Integer scheduleId : state.getSixHourIndexEntries()) {
- queryFutures.add(dao.findSixHourMetricsAsync(scheduleId,
state.getTwentyFourHourTimeSlice().getMillis(),
- state.getTwentyFourHourTimeSliceEnd().getMillis()));
- scheduleIds.add(scheduleId);
- if (queryFutures.size() == batchSize) {
- state.getAggregationTasks().submit(new
Aggregate6HourData(dao, state, scheduleIds,
- queryFutures));
- queryFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- scheduleIds = new TreeSet<Integer>();
- }
- }
- if (!queryFutures.isEmpty()) {
- log.debug("Submitting 6 hour aggregation task for schedule
ids " + scheduleIds);
- state.getAggregationTasks().submit(new Aggregate6HourData(dao,
state, scheduleIds,
- queryFutures));
- queryFutures = null;
- scheduleIds = null;
- }
- } catch (AbortedException e) {
- log.warn("Failed to load 6 hour index entries. Some 24 hour
aggregates may not get generated.", e);
- } finally {
- state.getSixHourIndexEntriesLock().writeLock().unlock();
- }
- }
-
- while (!isAggregationFinished()) {
- Thread.sleep(50);
- }
- } catch (InterruptedException e) {
- log.warn("An interrupt occurred while waiting for aggregation to
finish", e);
- }
- return oneHourData;
- }
-
- private void waitFor(AtomicInteger remainingData) throws InterruptedException {
- while (remainingData.get() > 0) {
- Thread.sleep(50);
- }
- }
-
- private boolean isAggregationFinished() {
- return state.getRemainingRawData().get() <= 0 &&
state.getRemaining1HourData().get() <= 0 &&
- state.getRemaining6HourData().get() <= 0 &&
remainingIndexEntries.get() <= 0;
- }
-
- private void deleteIndexEntries(final MetricsTable table) {
- final DateTime time;
- switch (table) {
- case ONE_HOUR:
- time = startTime;
- break;
- case SIX_HOUR:
- time = state.getSixHourTimeSlice();
- break;
- default:
- time = state.getTwentyFourHourTimeSlice();
- break;
- }
- log.debug("Deleting " + table + " index entries for time slice
" + time);
- writePermits.acquire();
- StorageResultSetFuture future = dao.deleteMetricsIndexEntriesAsync(table,
time.getMillis());
- Futures.addCallback(future, new FutureCallback<ResultSet>() {
- @Override
- public void onSuccess(ResultSet result) {
- remainingIndexEntries.decrementAndGet();
- }
-
- @Override
- public void onFailure(Throwable t) {
- log.warn("Failed to delete index entries for table " + table +
" at time [" + time + "]");
- remainingIndexEntries.decrementAndGet();
- }
- });
- }
-
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java
deleted file mode 100644
index 7146a92..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute1HourData.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.AggregateType;
-import org.rhq.server.metrics.domain.MetricsTable;
-
-/**
- * @author John Sanda
- */
-public class Compute1HourData implements AsyncFunction<List<ResultSet>,
List<ResultSet>> {
-
- private final Log log = LogFactory.getLog(Compute1HourData.class);
-
- private DateTime startTime;
-
- private RateLimiter writePermits;
-
- private MetricsDAO dao;
-
- private DateTime sixHourTimeSlice;
-
- private Set<AggregateNumericMetric> oneHourData;
-
- public Compute1HourData(DateTime startTime, DateTime sixHourTimeSlice, RateLimiter
writePermits, MetricsDAO dao,
- Set<AggregateNumericMetric> oneHourData) {
- this.startTime = startTime;
- this.sixHourTimeSlice = sixHourTimeSlice;
- this.writePermits = writePermits;
- this.dao = dao;
- this.oneHourData = oneHourData;
- }
-
- @Override
- public ListenableFuture<List<ResultSet>> apply(List<ResultSet>
rawDataResultSets) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("Computing and storing 1 hour data for " +
rawDataResultSets.size() + " schedules");
- }
- long start = System.currentTimeMillis();
- try {
- List<StorageResultSetFuture> insertFutures = new
ArrayList<StorageResultSetFuture>(rawDataResultSets.size());
- for (ResultSet resultSet : rawDataResultSets) {
- AggregateNumericMetric aggregate = calculateAggregatedRaw(resultSet);
- oneHourData.add(aggregate);
- writePermits.acquire(4);
- insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
- AggregateType.MIN, aggregate.getMin()));
- insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
- AggregateType.MAX, aggregate.getMax()));
- insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
- AggregateType.AVG, aggregate.getAvg()));
- insertFutures.add(dao.updateMetricsIndex(MetricsTable.SIX_HOUR,
aggregate.getScheduleId(),
- sixHourTimeSlice.getMillis()));
- }
- return Futures.successfulAsList(insertFutures);
- } finally {
- if (log.isDebugEnabled()) {
- log.debug("Finished computing and storing 1 hour data for " +
rawDataResultSets.size() +
- " schedules in " + (System.currentTimeMillis() - start) +
" ms");
- }
- }
- }
-
- private AggregateNumericMetric calculateAggregatedRaw(ResultSet resultSet) {
- double min = Double.NaN;
- double max = min;
- int count = 0;
- ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
- double value;
- List<Row> rows = resultSet.all();
-
- for (Row row : rows) {
- value = row.getDouble(2);
- if (count == 0) {
- min = value;
- max = min;
- }
- if (value < min) {
- min = value;
- } else if (value > max) {
- max = value;
- }
- mean.add(value);
- ++count;
- }
-
- return new AggregateNumericMetric(rows.get(0).getInt(0),
mean.getArithmeticMean(), min, max,
- startTime.getMillis());
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java
deleted file mode 100644
index 6274bd3..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute24HourData.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.AggregateType;
-
-/**
- * @author John Sanda
- */
-public class Compute24HourData implements AsyncFunction<List<ResultSet>,
List<ResultSet>> {
-
- private final Log log = LogFactory.getLog(Compute24HourData.class);
-
- private DateTime startTime;
-
- private RateLimiter writePermits;
-
- private MetricsDAO dao;
-
- public Compute24HourData(DateTime startTime, RateLimiter writePermits, MetricsDAO
dao) {
- this.startTime = startTime;
- this.writePermits = writePermits;
- this.dao = dao;
- }
-
- @Override
- public ListenableFuture<List<ResultSet>> apply(List<ResultSet>
sixHourDataResultSets) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("Computing and storing 24 hour data for " +
sixHourDataResultSets.size() + " schedules");
- }
- long start = System.currentTimeMillis();
- try {
- List<StorageResultSetFuture> insertFutures =
- new
ArrayList<StorageResultSetFuture>(sixHourDataResultSets.size());
- for (ResultSet resultSet : sixHourDataResultSets) {
- AggregateNumericMetric aggregate = calculateAggregate(resultSet);
- writePermits.acquire(3);
-
insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
- AggregateType.MIN, aggregate.getMin()));
-
insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
- AggregateType.MAX, aggregate.getMax()));
-
insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
- AggregateType.AVG, aggregate.getAvg()));
- }
- return Futures.successfulAsList(insertFutures);
- } finally {
- if (log.isDebugEnabled()) {
- log.debug("Finished computing and storing 24 hour data for " +
sixHourDataResultSets.size() +
- " schedules in " + (System.currentTimeMillis() - start) +
" ms");
- }
- }
- }
-
- private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
- double min = Double.NaN;
- double max = min;
- ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
- List<Row> rows = resultSet.all();
-
- for (int i = 0; i < rows.size(); i += 3) {
- if (i == 0) {
- min = rows.get(i + 1).getDouble(3);
- max = rows.get(i).getDouble(3);
- } else {
- if (rows.get(i + 1).getDouble(3) < min) {
- min = rows.get(i + 1).getDouble(3);
- }
- if (rows.get(i).getDouble(3) > max) {
- max = rows.get(i).getDouble(3);
- }
- }
- mean.add(rows.get(i + 2).getDouble(3));
- }
- return new AggregateNumericMetric(rows.get(0).getInt(0),
mean.getArithmeticMean(), min, max,
- startTime.getMillis());
- }
-
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java
deleted file mode 100644
index a1efab5..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/Compute6HourData.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package org.rhq.server.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.joda.time.DateTime;
-
-import org.rhq.server.metrics.domain.AggregateNumericMetric;
-import org.rhq.server.metrics.domain.AggregateType;
-import org.rhq.server.metrics.domain.MetricsTable;
-
-/**
- * @author John Sanda
- */
-public class Compute6HourData implements AsyncFunction<List<ResultSet>,
List<ResultSet>> {
-
- private final Log log = LogFactory.getLog(Compute6HourData.class);
-
- private DateTime startTime;
-
- private RateLimiter writePermits;
-
- private MetricsDAO dao;
-
- private DateTime twentyFourHourTimeSlice;
-
- public Compute6HourData(DateTime startTime, DateTime twentyFourHourTimeSlice,
RateLimiter writePermits,
- MetricsDAO dao) {
- this.startTime = startTime;
- this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
- this.writePermits = writePermits;
- this.dao = dao;
- }
-
- @Override
- public ListenableFuture<List<ResultSet>> apply(List<ResultSet>
oneHourDataResultSets) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("Computing and storing 6 hour data for " +
oneHourDataResultSets.size() + " schedules");
- }
- long start = System.currentTimeMillis();
- try {
- List<StorageResultSetFuture> insertFutures =
- new
ArrayList<StorageResultSetFuture>(oneHourDataResultSets.size());
- for (ResultSet resultSet : oneHourDataResultSets) {
- AggregateNumericMetric aggregate = calculateAggregate(resultSet);
- writePermits.acquire(4);
- insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
- AggregateType.MIN, aggregate.getMin()));
- insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
- AggregateType.MAX, aggregate.getMax()));
- insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
- AggregateType.AVG, aggregate.getAvg()));
- insertFutures.add(dao.updateMetricsIndex(MetricsTable.TWENTY_FOUR_HOUR,
aggregate.getScheduleId(),
- twentyFourHourTimeSlice.getMillis()));
- }
- return Futures.successfulAsList(insertFutures);
- } finally {
- if (log.isDebugEnabled()) {
- log.debug("Finished computing and storing 6 hour data for " +
oneHourDataResultSets.size() +
- " schedules in " + (System.currentTimeMillis() - start) +
" ms");
- }
- }
- }
-
- private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
- double min = Double.NaN;
- double max = min;
- ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
- List<Row> rows = resultSet.all();
-
- for (int i = 0; i < rows.size(); i += 3) {
- if (i == 0) {
- min = rows.get(i + 1).getDouble(3);
- max = rows.get(i).getDouble(3);
- } else {
- if (rows.get(i + 1).getDouble(3) < min) {
- min = rows.get(i + 1).getDouble(3);
- }
- if (rows.get(i).getDouble(3) > max) {
- max = rows.get(i).getDouble(3);
- }
- }
- mean.add(rows.get(i + 2).getDouble(3));
- }
- return new AggregateNumericMetric(rows.get(0).getInt(0),
mean.getArithmeticMean(), min, max,
- startTime.getMillis());
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
index 0dc4605..7f5b639 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
@@ -33,7 +33,6 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -56,6 +55,7 @@ import org.joda.time.Duration;
import org.rhq.core.domain.measurement.MeasurementDataNumeric;
import org.rhq.core.domain.measurement.composite.MeasurementDataNumericHighLowComposite;
+import org.rhq.server.metrics.aggregation.Aggregator;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.AggregateType;
import org.rhq.server.metrics.domain.MetricsIndexEntry;
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
new file mode 100644
index 0000000..1698b28
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
@@ -0,0 +1,127 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * Generates 6 hour data for a batch of 1 hour data futures. After data is inserted for
the batch, aggregation of 6 hour
+ * data will start immediately for the batch if the 24 hour time slice has finished.
+ *
+ * @see Compute6HourData
+ * @author John Sanda
+ */
+class Aggregate1HourData implements Runnable {
+
+ private final Log log = LogFactory.getLog(Aggregate1HourData.class);
+
+ private MetricsDAO dao;
+
+ private AggregationState state;
+
+ private Set<Integer> scheduleIds;
+
+ private List<StorageResultSetFuture> queryFutures;
+
+ public Aggregate1HourData(MetricsDAO dao, AggregationState state, Set<Integer>
scheduleIds,
+ List<StorageResultSetFuture> queryFutures) {
+ this.dao = dao;
+ this.state = state;
+ this.scheduleIds = scheduleIds;
+ this.queryFutures = queryFutures;
+ }
+
+ @Override
+ public void run() {
+ final Stopwatch stopwatch = new Stopwatch().start();
+ ListenableFuture<List<ResultSet>> queriesFuture =
Futures.successfulAsList(queryFutures);
+ Futures.withFallback(queriesFuture, new
FutureFallback<List<ResultSet>>() {
+ @Override
+ public ListenableFuture<List<ResultSet>> create(Throwable t)
throws Exception {
+ log.error("An error occurred while fetching one hour data",
t);
+ return Futures.immediateFailedFuture(t);
+ }
+ });
+ ListenableFuture<List<ResultSet>> computeFutures =
Futures.transform(queriesFuture,
+ state.getCompute6HourData(), state.getAggregationTasks());
+ Futures.addCallback(computeFutures, new
FutureCallback<List<ResultSet>>() {
+ @Override
+ public void onSuccess(List<ResultSet> result) {
+ stopwatch.stop();
+ log.debug("Finished aggregating 1 hour data for " +
result.size() + " schedules in " +
+ stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+ start6HourDataAggregationIfNecessary();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ // TODO should we log the schedule ids?
+ log.debug("Failed to aggregate 1 hour data for " +
scheduleIds.size() + " schedules. An " +
+ "unexpected error occurred.", t);
+ } else {
+ log.warn("Failed to aggregate 1 hour data for " +
scheduleIds.size() + " schedules. An " +
+ "unexpected error occurred: " +
ThrowableUtil.getRootMessage(t));
+ }
+ start6HourDataAggregationIfNecessary();
+ }
+ });
+ }
+
+ private void start6HourDataAggregationIfNecessary() {
+ try {
+ if (state.is24HourTimeSliceFinished()) {
+ update6HourIndexEntries();
+ List<StorageResultSetFuture> queryFutures = new
ArrayList<StorageResultSetFuture>(scheduleIds.size());
+ for (Integer scheduleId : scheduleIds) {
+ queryFutures.add(dao.findSixHourMetricsAsync(scheduleId,
state.getTwentyFourHourTimeSlice().getMillis(),
+ state.getTwentyFourHourTimeSliceEnd().getMillis()));
+ }
+ state.getAggregationTasks().submit(new Aggregate6HourData(dao, state,
scheduleIds, queryFutures));
+ }
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("An interrupt occurred while waiting for 6 hour data index
entries. Aborting data aggregation",
+ e);
+ } else {
+ log.info("An interrupt occurred while waiting for 6 hour data index
entries. Aborting data " +
+ "aggregation: " + e.getMessage());
+ }
+ } finally {
+ state.getRemaining1HourData().addAndGet(-scheduleIds.size());
+ }
+ }
+
+ private void update6HourIndexEntries() throws InterruptedException {
+ try {
+ state.getSixHourIndexEntriesArrival().await();
+ try {
+ state.getSixHourIndexEntriesLock().writeLock().lock();
+ state.getSixHourIndexEntries().removeAll(scheduleIds);
+ } finally {
+ state.getSixHourIndexEntriesLock().writeLock().unlock();
+ }
+ } catch (AbortedException e) {
+ // This means we failed to retrieve the index entries. We can however
+ // continue generating 6 hour data because we do not need the index
+ // here since we already have 6 hour data to aggregate along with the
+ // schedule ids.
+ }
+ }
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
new file mode 100644
index 0000000..fbd5057
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
@@ -0,0 +1,84 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * Generates 24 hour data for a batch of 6 hour data futures. Once the batch has been processed,
+ * successfully or not, the count of remaining 6 hour data in the shared {@link AggregationState} is
+ * reduced by the number of schedules in the batch.
+ *
+ * @see Compute24HourData
+ * @author John Sanda
+ */
+class Aggregate6HourData implements Runnable {
+
+    private final Log log = LogFactory.getLog(Aggregate6HourData.class);
+
+    private final MetricsDAO dao;
+
+    private final AggregationState state;
+
+    private final Set<Integer> scheduleIds;
+
+    private final List<StorageResultSetFuture> queryFutures;
+
+    /**
+     * @param dao the DAO; only stored by this class
+     * @param state the state shared by all aggregation tasks
+     * @param scheduleIds the schedule ids that make up this batch
+     * @param queryFutures the pending 6 hour data queries for the batch
+     */
+    public Aggregate6HourData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
+        List<StorageResultSetFuture> queryFutures) {
+        this.dao = dao;
+        this.state = state;
+        this.scheduleIds = scheduleIds;
+        this.queryFutures = queryFutures;
+    }
+
+    /**
+     * Chains the 6 hour data queries to {@link Compute24HourData} and registers a callback that logs the
+     * outcome and updates the count of remaining 6 hour data.
+     */
+    @Override
+    public void run() {
+        final Stopwatch stopwatch = new Stopwatch().start();
+        ListenableFuture<List<ResultSet>> queriesFuture = Futures.successfulAsList(queryFutures);
+        // The future returned by withFallback is intentionally not used; the fallback only logs the
+        // error and passes the failure on unchanged.
+        Futures.withFallback(queriesFuture, new FutureFallback<List<ResultSet>>() {
+            @Override
+            public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
+                log.error("An error occurred while fetching 6 hour data", t);
+                return Futures.immediateFailedFuture(t);
+            }
+        });
+        ListenableFuture<List<ResultSet>> computeFutures = Futures.transform(queriesFuture,
+            state.getCompute24HourData(), state.getAggregationTasks());
+        Futures.addCallback(computeFutures, new FutureCallback<List<ResultSet>>() {
+            @Override
+            public void onSuccess(List<ResultSet> result) {
+                stopwatch.stop();
+                // Guarded so the message is only assembled when it will actually be written.
+                if (log.isDebugEnabled()) {
+                    log.debug("Finished aggregating 6 hour data for " + result.size() + " schedules in " +
+                        stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+                }
+                state.getRemaining6HourData().addAndGet(-scheduleIds.size());
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (log.isDebugEnabled()) {
+                    // TODO should we log the schedule ids?
+                    log.debug("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
+                        "unexpected error occurred.", t);
+                } else {
+                    log.warn("Failed to aggregate 6 hour data for " + scheduleIds.size() + " schedules. An " +
+                        "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
+                }
+                // The batch counts as done even when it failed; otherwise waiters would never finish.
+                state.getRemaining6HourData().addAndGet(-scheduleIds.size());
+            }
+        });
+    }
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
new file mode 100644
index 0000000..cb64fcf
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
@@ -0,0 +1,73 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.SignalingCountDownLatch;
+
+/**
+ * Callback for an index entries query. On success the ids read from the result set are added to the
+ * supplied set, the remaining-data counter is set to the size of that set and the arrival latch is
+ * counted down. On failure the counter is reset to zero and the latch is aborted.
+ *
+ * @author John Sanda
+ */
+class AggregateIndexEntriesHandler implements FutureCallback<ResultSet> {
+
+    private final Log log = LogFactory.getLog(AggregateIndexEntriesHandler.class);
+
+    // Destination for the ids read from the index
+    private Set<Integer> indexEntries;
+
+    // Counter that is set to the number of index entries once they are loaded
+    private AtomicInteger remainingData;
+
+    // Latch that tells waiting threads the index entries are available (or never will be)
+    private SignalingCountDownLatch indexEntriesArrival;
+
+    // Already running stopwatch supplied by the caller; stopped when the entries have been loaded
+    private Stopwatch stopwatch;
+
+    // Labels used only in log messages
+    private String src;
+    private String dest;
+
+    public AggregateIndexEntriesHandler(Set<Integer> indexEntries, AtomicInteger remainingData,
+        SignalingCountDownLatch indexEntriesArrival, Stopwatch stopwatch, String src, String dest) {
+        this.src = src;
+        this.dest = dest;
+        this.stopwatch = stopwatch;
+        this.indexEntries = indexEntries;
+        this.remainingData = remainingData;
+        this.indexEntriesArrival = indexEntriesArrival;
+    }
+
+    @Override
+    public void onSuccess(ResultSet resultSet) {
+        for (Row indexRow : resultSet) {
+            int id = indexRow.getInt(1);
+            indexEntries.add(id);
+        }
+        remainingData.set(indexEntries.size());
+        indexEntriesArrival.countDown();
+        stopwatch.stop();
+        if (!log.isDebugEnabled()) {
+            return;
+        }
+        log.debug("Finished loading " + indexEntries.size() + " " + src + " index entries in " +
+            stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+        String summary = "Some " + dest + " aggregates may not get computed. An unexpected error occurred while " +
+            "retrieving " + src + " index entries";
+        if (log.isDebugEnabled()) {
+            log.debug(summary, t);
+        } else {
+            log.warn(summary + ": " + ThrowableUtil.getRootMessage(t));
+        }
+        remainingData.set(0);
+        indexEntriesArrival.abort();
+    }
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
new file mode 100644
index 0000000..87a7266
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
@@ -0,0 +1,133 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * Generates 1 hour data for a batch of raw data futures. After the batch has been processed,
+ * successfully or not, aggregation of 1 hour data starts immediately for the batch if the 6 hour time
+ * slice has finished.
+ *
+ * @see Compute1HourData
+ * @author John Sanda
+ */
+class AggregateRawData implements Runnable {
+
+    private final Log log = LogFactory.getLog(AggregateRawData.class);
+
+    private final MetricsDAO dao;
+
+    private final AggregationState state;
+
+    private final Set<Integer> scheduleIds;
+
+    private final List<StorageResultSetFuture> queryFutures;
+
+    /**
+     * @param dao used to query the 1 hour data of the batch
+     * @param state the state shared by all aggregation tasks
+     * @param scheduleIds the schedule ids that make up this batch
+     * @param queryFutures the pending raw data queries for the batch
+     */
+    public AggregateRawData(MetricsDAO dao, AggregationState state, Set<Integer> scheduleIds,
+        List<StorageResultSetFuture> queryFutures) {
+        this.dao = dao;
+        this.state = state;
+        this.scheduleIds = scheduleIds;
+        this.queryFutures = queryFutures;
+    }
+
+    /**
+     * Chains the raw data queries to {@link Compute1HourData} and registers a callback that logs the
+     * outcome and then moves on to the 1 hour data of the batch.
+     */
+    @Override
+    public void run() {
+        final Stopwatch stopwatch = new Stopwatch().start();
+        ListenableFuture<List<ResultSet>> rawDataFutures = Futures.successfulAsList(queryFutures);
+        // The future returned by withFallback is intentionally not used; the fallback only logs the
+        // error and passes the failure on unchanged.
+        Futures.withFallback(rawDataFutures, new FutureFallback<List<ResultSet>>() {
+            @Override
+            public ListenableFuture<List<ResultSet>> create(Throwable t) throws Exception {
+                log.error("An error occurred while fetching raw data", t);
+                return Futures.immediateFailedFuture(t);
+            }
+        });
+
+        final ListenableFuture<List<ResultSet>> insert1HourDataFutures = Futures.transform(rawDataFutures,
+            state.getCompute1HourData(), state.getAggregationTasks());
+        Futures.addCallback(insert1HourDataFutures, new FutureCallback<List<ResultSet>>() {
+            @Override
+            public void onSuccess(List<ResultSet> resultSets) {
+                stopwatch.stop();
+                // Guarded so the message is only assembled when it will actually be written.
+                if (log.isDebugEnabled()) {
+                    log.debug("Finished aggregating raw data for " + resultSets.size() + " schedules in " +
+                        stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+                }
+                start1HourDataAggregationIfNecessary();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (log.isDebugEnabled()) {
+                    // TODO should we log the schedule ids?
+                    log.debug("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An unexpected " +
+                        "error occurred.", t);
+                } else {
+                    log.warn("Failed to aggregate raw data for " + scheduleIds.size() + " schedules. An " +
+                        "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
+                }
+                // Still called on failure: it is what decrements the remaining raw data counter.
+                start1HourDataAggregationIfNecessary();
+            }
+        }, state.getAggregationTasks());
+    }
+
+    /**
+     * Starts aggregation of 1 hour data for the schedules in this batch, but only when the 6 hour time
+     * slice has finished. Whatever happens, the count of remaining raw data is reduced by the size of
+     * this batch.
+     */
+    private void start1HourDataAggregationIfNecessary() {
+        try {
+            if (state.is6HourTimeSliceFinished()) {
+                update1HourIndexEntries();
+                List<StorageResultSetFuture> oneHourDataQueryFutures = new ArrayList<StorageResultSetFuture>(
+                    scheduleIds.size());
+                for (Integer scheduleId : scheduleIds) {
+                    oneHourDataQueryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
+                        state.getSixHourTimeSlice().getMillis(), state.getSixHourTimeSliceEnd().getMillis()));
+                }
+                state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
+                    oneHourDataQueryFutures));
+            }
+        } catch (InterruptedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("An interrupt occurred while waiting for 1 hour data index entries. Aborting data aggregation",
+                    e);
+            } else {
+                log.info("An interrupt occurred while waiting for 1 hour data index entries. Aborting data " +
+                    "aggregation: " + e.getMessage());
+            }
+            // Catching InterruptedException clears the interrupt flag; set it again so that the code
+            // that owns this thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+        } finally {
+            state.getRemainingRawData().addAndGet(-scheduleIds.size());
+        }
+    }
+
+    /**
+     * Waits for the 1 hour index entries to arrive and then removes the schedule ids of this batch from
+     * them while holding the index entries write lock.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting for the index entries
+     */
+    private void update1HourIndexEntries() throws InterruptedException {
+        try {
+            // Wait for the arrival so that we can remove the schedules ids in this
+            // batch from the one hour index entries. This will prevent duplicate tasks
+            // being submitted to process the same 1 hour data.
+            state.getOneHourIndexEntriesArrival().await();
+            // Acquire the lock before entering the try block so that the finally clause can never try
+            // to release a lock this thread does not hold.
+            state.getOneHourIndexEntriesLock().writeLock().lock();
+            try {
+                state.getOneHourIndexEntries().removeAll(scheduleIds);
+            } finally {
+                state.getOneHourIndexEntriesLock().writeLock().unlock();
+            }
+        } catch (AbortedException e) {
+            // The index entries could not be retrieved. That does not stop this batch: the caller
+            // already holds the schedule ids and queries their 1 hour data directly, so the index is
+            // not needed here.
+        }
+    }
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
new file mode 100644
index 0000000..345e53a
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
@@ -0,0 +1,257 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+
+/**
+ * Mutable holder for everything the aggregation tasks share: the executor, the latches, counters, index
+ * entry sets and their locks, the time slice boundaries and the compute functions. Every setter returns
+ * this instance so that calls can be chained.
+ *
+ * @author John Sanda
+ */
+class AggregationState {
+
+    // Executor
+    private ListeningExecutorService aggregationTasks;
+
+    // Latches signaling that index entries have been loaded
+    private SignalingCountDownLatch oneHourIndexEntriesArrival;
+    private SignalingCountDownLatch sixHourIndexEntriesArrival;
+
+    // Counters of schedules that still have data to be aggregated
+    private AtomicInteger remainingRawData;
+    private AtomicInteger remaining1HourData;
+    private AtomicInteger remaining6HourData;
+
+    // Index entries (schedule ids) and the locks that guard them
+    private Set<Integer> oneHourIndexEntries;
+    private Set<Integer> sixHourIndexEntries;
+    private ReentrantReadWriteLock oneHourIndexEntriesLock;
+    private ReentrantReadWriteLock sixHourIndexEntriesLock;
+
+    // Time slice boundaries
+    private DateTime oneHourTimeSlice;
+    private DateTime sixHourTimeSlice;
+    private DateTime sixHourTimeSliceEnd;
+    private DateTime twentyFourHourTimeSlice;
+    private DateTime twentyFourHourTimeSliceEnd;
+
+    // Whether the 6 hour / 24 hour time slices have finished
+    private boolean sixHourTimeSliceFinished;
+    private boolean twentyFourHourTimeSliceFinished;
+
+    // Functions that compute and store the aggregates
+    private Compute1HourData compute1HourData;
+    private Compute6HourData compute6HourData;
+    private Compute24HourData compute24HourData;
+
+    /**
+     * @return The executor to which aggregation tasks are submitted
+     */
+    public ListeningExecutorService getAggregationTasks() {
+        return this.aggregationTasks;
+    }
+
+    public AggregationState setAggregationTasks(ListeningExecutorService aggregationTasks) {
+        this.aggregationTasks = aggregationTasks;
+        return this;
+    }
+
+    /**
+     * @return The latch that signals the arrival of index entries for schedules with 1 hour data to be
+     * aggregated
+     */
+    public SignalingCountDownLatch getOneHourIndexEntriesArrival() {
+        return this.oneHourIndexEntriesArrival;
+    }
+
+    public AggregationState setOneHourIndexEntriesArrival(SignalingCountDownLatch oneHourIndexEntriesArrival) {
+        this.oneHourIndexEntriesArrival = oneHourIndexEntriesArrival;
+        return this;
+    }
+
+    /**
+     * @return The latch that signals the arrival of index entries for schedules with 6 hour data to be
+     * aggregated
+     */
+    public SignalingCountDownLatch getSixHourIndexEntriesArrival() {
+        return this.sixHourIndexEntriesArrival;
+    }
+
+    public AggregationState setSixHourIndexEntriesArrival(SignalingCountDownLatch sixHourIndexEntriesArrival) {
+        this.sixHourIndexEntriesArrival = sixHourIndexEntriesArrival;
+        return this;
+    }
+
+    /**
+     * @return The number of schedules whose raw data still has to be aggregated
+     */
+    public AtomicInteger getRemainingRawData() {
+        return this.remainingRawData;
+    }
+
+    public AggregationState setRemainingRawData(AtomicInteger remainingRawData) {
+        this.remainingRawData = remainingRawData;
+        return this;
+    }
+
+    /**
+     * @return The number of schedules whose 1 hour data still has to be aggregated
+     */
+    public AtomicInteger getRemaining1HourData() {
+        return this.remaining1HourData;
+    }
+
+    public AggregationState setRemaining1HourData(AtomicInteger remaining1HourData) {
+        this.remaining1HourData = remaining1HourData;
+        return this;
+    }
+
+    /**
+     * @return The number of schedules whose 6 hour data still has to be aggregated
+     */
+    public AtomicInteger getRemaining6HourData() {
+        return this.remaining6HourData;
+    }
+
+    public AggregationState setRemaining6HourData(AtomicInteger remaining6HourData) {
+        this.remaining6HourData = remaining6HourData;
+        return this;
+    }
+
+    /**
+     * @return The ids of the schedules with 1 hour data to be aggregated
+     */
+    public Set<Integer> getOneHourIndexEntries() {
+        return this.oneHourIndexEntries;
+    }
+
+    public AggregationState setOneHourIndexEntries(Set<Integer> oneHourIndexEntries) {
+        this.oneHourIndexEntries = oneHourIndexEntries;
+        return this;
+    }
+
+    /**
+     * @return The ids of the schedules with 6 hour data to be aggregated
+     */
+    public Set<Integer> getSixHourIndexEntries() {
+        return this.sixHourIndexEntries;
+    }
+
+    public AggregationState setSixHourIndexEntries(Set<Integer> sixHourIndexEntries) {
+        this.sixHourIndexEntries = sixHourIndexEntries;
+        return this;
+    }
+
+    /**
+     * @return The lock to hold while working with the 1 hour index entries
+     */
+    public ReentrantReadWriteLock getOneHourIndexEntriesLock() {
+        return this.oneHourIndexEntriesLock;
+    }
+
+    public AggregationState setOneHourIndexEntriesLock(ReentrantReadWriteLock oneHourIndexEntriesLock) {
+        this.oneHourIndexEntriesLock = oneHourIndexEntriesLock;
+        return this;
+    }
+
+    /**
+     * @return The lock to hold while working with the 6 hour index entries
+     */
+    public ReentrantReadWriteLock getSixHourIndexEntriesLock() {
+        return this.sixHourIndexEntriesLock;
+    }
+
+    public AggregationState setSixHourIndexEntriesLock(ReentrantReadWriteLock sixHourIndexEntriesLock) {
+        this.sixHourIndexEntriesLock = sixHourIndexEntriesLock;
+        return this;
+    }
+
+    /**
+     * @return The 1 hour time slice
+     */
+    public DateTime getOneHourTimeSlice() {
+        return this.oneHourTimeSlice;
+    }
+
+    public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
+        this.oneHourTimeSlice = oneHourTimeSlice;
+        return this;
+    }
+
+    /**
+     * @return The 6 hour time slice
+     */
+    public DateTime getSixHourTimeSlice() {
+        return this.sixHourTimeSlice;
+    }
+
+    public AggregationState setSixHourTimeSlice(DateTime sixHourTimeSlice) {
+        this.sixHourTimeSlice = sixHourTimeSlice;
+        return this;
+    }
+
+    /**
+     * @return The end of the 6 hour time slice
+     */
+    public DateTime getSixHourTimeSliceEnd() {
+        return this.sixHourTimeSliceEnd;
+    }
+
+    public AggregationState setSixHourTimeSliceEnd(DateTime sixHourTimeSliceEnd) {
+        this.sixHourTimeSliceEnd = sixHourTimeSliceEnd;
+        return this;
+    }
+
+    /**
+     * @return The 24 hour time slice
+     */
+    public DateTime getTwentyFourHourTimeSlice() {
+        return this.twentyFourHourTimeSlice;
+    }
+
+    public AggregationState setTwentyFourHourTimeSlice(DateTime twentyFourHourTimeSlice) {
+        this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
+        return this;
+    }
+
+    /**
+     * @return The end of the 24 hour time slice
+     */
+    public DateTime getTwentyFourHourTimeSliceEnd() {
+        return this.twentyFourHourTimeSliceEnd;
+    }
+
+    public AggregationState setTwentyFourHourTimeSliceEnd(DateTime twentyFourHourTimeSliceEnd) {
+        this.twentyFourHourTimeSliceEnd = twentyFourHourTimeSliceEnd;
+        return this;
+    }
+
+    /**
+     * @return true if the 6 hour time slice has finished
+     */
+    public boolean is6HourTimeSliceFinished() {
+        return this.sixHourTimeSliceFinished;
+    }
+
+    public AggregationState set6HourTimeSliceFinished(boolean is6HourTimeSliceFinished) {
+        this.sixHourTimeSliceFinished = is6HourTimeSliceFinished;
+        return this;
+    }
+
+    /**
+     * @return true if the 24 hour time slice has finished
+     */
+    public boolean is24HourTimeSliceFinished() {
+        return this.twentyFourHourTimeSliceFinished;
+    }
+
+    public AggregationState set24HourTimeSliceFinished(boolean is24HourTimeSliceFinished) {
+        this.twentyFourHourTimeSliceFinished = is24HourTimeSliceFinished;
+        return this;
+    }
+
+    /**
+     * @return The function that turns raw data result sets into 1 hour data
+     */
+    public Compute1HourData getCompute1HourData() {
+        return this.compute1HourData;
+    }
+
+    public AggregationState setCompute1HourData(Compute1HourData compute1HourData) {
+        this.compute1HourData = compute1HourData;
+        return this;
+    }
+
+    /**
+     * @return The function that computes 6 hour data
+     */
+    public Compute6HourData getCompute6HourData() {
+        return this.compute6HourData;
+    }
+
+    public AggregationState setCompute6HourData(Compute6HourData compute6HourData) {
+        this.compute6HourData = compute6HourData;
+        return this;
+    }
+
+    /**
+     * @return The function that turns 6 hour data result sets into 24 hour data
+     */
+    public Compute24HourData getCompute24HourData() {
+        return this.compute24HourData;
+    }
+
+    public AggregationState setCompute24HourData(Compute24HourData compute24HourData) {
+        this.compute24HourData = compute24HourData;
+        return this;
+    }
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
new file mode 100644
index 0000000..bf0bc1a
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
@@ -0,0 +1,378 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeComparator;
+import org.joda.time.Duration;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.AbortedException;
+import org.rhq.server.metrics.DateTimeService;
+import org.rhq.server.metrics.MetricsConfiguration;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * This class provides the main interface for metric data aggregation. {@link #run()} loads the raw data
+ * index entries for the start time slice and submits batches of {@link AggregateRawData} tasks. When the
+ * 6 hour and/or 24 hour time slices have finished it also loads the corresponding index entries and
+ * submits {@link Aggregate1HourData} and {@link Aggregate6HourData} tasks, then blocks until all counters
+ * in the shared {@link AggregationState} have reached zero.
+ *
+ * @author John Sanda
+ */
+public class Aggregator {
+
+    /** Orders aggregates by schedule id only. */
+    private static final Comparator<AggregateNumericMetric> AGGREGATE_COMPARATOR =
+        new Comparator<AggregateNumericMetric>() {
+            @Override
+            public int compare(AggregateNumericMetric left, AggregateNumericMetric right) {
+                return (left.getScheduleId() < right.getScheduleId()) ? -1 :
+                    ((left.getScheduleId() == right.getScheduleId()) ? 0 : 1);
+            }
+        };
+
+    private final Log log = LogFactory.getLog(Aggregator.class);
+
+    private MetricsDAO dao;
+
+    private MetricsConfiguration configuration;
+
+    private DateTimeService dtService;
+
+    private DateTime startTime;
+
+    /**
+     * Signals when raw data index entries (in metrics_index) can be deleted. We cannot delete the row in
+     * metrics_index until we know that it has been read, successfully or otherwise.
+     */
+    private SignalingCountDownLatch rawDataIndexEntriesArrival;
+
+    private RateLimiter readPermits;
+    private RateLimiter writePermits;
+
+    private int batchSize;
+
+    private AggregationState state;
+
+    private Set<AggregateNumericMetric> oneHourData;
+
+    /**
+     * Number of index entry rows (raw, and 1 hour / 6 hour when those time slices have finished) that
+     * still have to be dealt with before {@link #run()} may return.
+     */
+    private AtomicInteger remainingIndexEntries;
+
+    /**
+     * @param aggregationTasks the executor on which aggregation tasks and callbacks run
+     * @param dao used for all storage queries
+     * @param configuration supplies the time slice durations
+     * @param dtService used to compute time slices and the current time
+     * @param startTime the time slice for which raw data is aggregated
+     * @param batchSize the maximum number of schedules handled by a single aggregation task
+     * @param writePermits limits writes, such as index entry deletions
+     * @param readPermits limits the index and raw data queries issued by this class
+     */
+    public Aggregator(ListeningExecutorService aggregationTasks, MetricsDAO dao, MetricsConfiguration configuration,
+        DateTimeService dtService, DateTime startTime, int batchSize, RateLimiter writePermits,
+        RateLimiter readPermits) {
+        this.dao = dao;
+        this.configuration = configuration;
+        this.dtService = dtService;
+        this.startTime = startTime;
+        this.readPermits = readPermits;
+        this.writePermits = writePermits;
+        this.batchSize = batchSize;
+        oneHourData = new ConcurrentSkipListSet<AggregateNumericMetric>(AGGREGATE_COMPARATOR);
+        rawDataIndexEntriesArrival = new SignalingCountDownLatch(new CountDownLatch(1));
+        // One for the raw data index entries; incremented below for each additional index that is read.
+        remainingIndexEntries = new AtomicInteger(1);
+
+        DateTime sixHourTimeSlice = get6HourTimeSlice();
+        DateTime twentyFourHourTimeSlice = get24HourTimeSlice();
+
+        state = new AggregationState()
+            .setAggregationTasks(aggregationTasks)
+            .setOneHourTimeSlice(startTime)
+            .setSixHourTimeSlice(sixHourTimeSlice)
+            .setSixHourTimeSliceEnd(sixHourTimeSlice.plus(configuration.getOneHourTimeSliceDuration()))
+            .setTwentyFourHourTimeSlice(twentyFourHourTimeSlice)
+            .setTwentyFourHourTimeSliceEnd(twentyFourHourTimeSlice.plus(configuration.getSixHourTimeSliceDuration()))
+            .setCompute1HourData(new Compute1HourData(startTime, sixHourTimeSlice, writePermits, dao, oneHourData))
+            .setCompute6HourData(new Compute6HourData(sixHourTimeSlice, twentyFourHourTimeSlice, writePermits, dao))
+            .setCompute24HourData(new Compute24HourData(twentyFourHourTimeSlice, writePermits, dao))
+            .set6HourTimeSliceFinished(hasTimeSliceEnded(sixHourTimeSlice, configuration.getOneHourTimeSliceDuration()))
+            .set24HourTimeSliceFinished(hasTimeSliceEnded(twentyFourHourTimeSlice,
+                configuration.getSixHourTimeSliceDuration()))
+            .setRemainingRawData(new AtomicInteger(0))
+            .setRemaining1HourData(new AtomicInteger(0))
+            .setRemaining6HourData(new AtomicInteger(0))
+            .setOneHourIndexEntries(new TreeSet<Integer>())
+            .setSixHourIndexEntries(new TreeSet<Integer>())
+            .setOneHourIndexEntriesLock(new ReentrantReadWriteLock())
+            .setSixHourIndexEntriesLock(new ReentrantReadWriteLock());
+
+        // A latch with a count of zero is used when the index will not be read, so that waiting on it
+        // returns right away. The remaining-data counters were already initialized to zero above.
+        if (state.is6HourTimeSliceFinished()) {
+            state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(1)));
+            remainingIndexEntries.incrementAndGet();
+        } else {
+            state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(0)));
+        }
+
+        if (state.is24HourTimeSliceFinished()) {
+            state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(1)));
+            remainingIndexEntries.incrementAndGet();
+        } else {
+            state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new CountDownLatch(0)));
+        }
+    }
+
+    private DateTime get24HourTimeSlice() {
+        return dtService.getTimeSlice(startTime, configuration.getSixHourTimeSliceDuration());
+    }
+
+    private DateTime get6HourTimeSlice() {
+        return dtService.getTimeSlice(startTime, configuration.getOneHourTimeSliceDuration());
+    }
+
+    /**
+     * @return true if the current hour is at or past <code>startTime</code> plus <code>duration</code>
+     */
+    private boolean hasTimeSliceEnded(DateTime startTime, Duration duration) {
+        DateTime endTime = startTime.plus(duration);
+        return DateTimeComparator.getInstance().compare(currentHour(), endTime) >= 0;
+    }
+
+    protected DateTime currentHour() {
+        return dtService.getTimeSlice(dtService.now(), configuration.getRawTimeSliceDuration());
+    }
+
+    /**
+     * Runs aggregation for the start time slice and blocks until it has finished or the calling thread
+     * is interrupted. On interruption the remaining work is abandoned and the thread's interrupt status
+     * is set again before returning.
+     *
+     * @return the set of 1 hour aggregates that was handed to {@link Compute1HourData} at construction
+     */
+    public Set<AggregateNumericMetric> run() {
+        log.info("Starting aggregation for time slice " + startTime);
+        readPermits.acquire();
+        StorageResultSetFuture rawFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
+            startTime.getMillis());
+        Futures.addCallback(rawFuture, new FutureCallback<ResultSet>() {
+            @Override
+            public void onSuccess(ResultSet result) {
+                List<Row> rows = result.all();
+                state.getRemainingRawData().set(rows.size());
+                rawDataIndexEntriesArrival.countDown();
+
+                Stopwatch stopwatch = new Stopwatch().start();
+
+                final DateTime endTime = startTime.plus(configuration.getRawTimeSliceDuration());
+                Set<Integer> scheduleIds = new TreeSet<Integer>();
+                List<StorageResultSetFuture> rawDataFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+                for (final Row row : rows) {
+                    scheduleIds.add(row.getInt(1));
+                    readPermits.acquire();
+                    rawDataFutures.add(dao.findRawMetricsAsync(row.getInt(1), startTime.getMillis(),
+                        endTime.getMillis()));
+                    if (rawDataFutures.size() == batchSize) {
+                        state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
+                            rawDataFutures));
+                        rawDataFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+                        scheduleIds = new TreeSet<Integer>();
+                    }
+                }
+                if (!rawDataFutures.isEmpty()) {
+                    state.getAggregationTasks().submit(new AggregateRawData(dao, state, scheduleIds,
+                        rawDataFutures));
+                }
+
+                if (log.isDebugEnabled()) {
+                    stopwatch.stop();
+                    log.debug("Finished scheduling raw data aggregation tasks for " + rows.size() + " schedules in " +
+                        stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Aggregation for time slice [" + startTime + "] cannot proceed. There was an " +
+                        "unexpected error while retrieving raw data index entries.", t);
+                } else {
+                    log.warn("Aggregation for time slice [" + startTime + "] cannot proceed. There was an " +
+                        "unexpected error while retrieving raw data index entries: " + ThrowableUtil.getRootMessage(t));
+                }
+                // Reset the shared counter in place rather than swapping in a new instance, so that
+                // every thread keeps looking at the same counter.
+                state.getRemainingRawData().set(0);
+                rawDataIndexEntriesArrival.abort();
+                deleteIndexEntries(MetricsTable.ONE_HOUR);
+            }
+        }, state.getAggregationTasks());
+
+        if (state.is6HourTimeSliceFinished()) {
+            log.debug("Fetching 1 hour index entries");
+            Stopwatch stopwatch = new Stopwatch().start();
+            StorageResultSetFuture oneHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
+                state.getSixHourTimeSlice().getMillis());
+            Futures.addCallback(oneHourFuture, new AggregateIndexEntriesHandler(state.getOneHourIndexEntries(),
+                state.getRemaining1HourData(), state.getOneHourIndexEntriesArrival(), stopwatch, "1 hour", "6 hour"),
+                state.getAggregationTasks());
+        }
+
+        if (state.is24HourTimeSliceFinished()) {
+            log.debug("Fetching 6 hour index entries");
+            Stopwatch stopwatch = new Stopwatch().start();
+            StorageResultSetFuture sixHourFuture = dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR,
+                state.getTwentyFourHourTimeSlice().getMillis());
+            Futures.addCallback(sixHourFuture, new AggregateIndexEntriesHandler(state.getSixHourIndexEntries(),
+                state.getRemaining6HourData(), state.getSixHourIndexEntriesArrival(), stopwatch, "6 hour", "24 hour"),
+                state.getAggregationTasks());
+        }
+
+        try {
+            try {
+                rawDataIndexEntriesArrival.await();
+                deleteIndexEntries(MetricsTable.ONE_HOUR);
+            } catch (AbortedException e) {
+                // Nothing to do here: the failure callback above already logged the error and issued
+                // the delete for the raw data index entries.
+            }
+
+            if (state.is6HourTimeSliceFinished()) {
+                submit1HourDataAggregationTasks();
+            }
+
+            if (state.is24HourTimeSliceFinished()) {
+                submit6HourDataAggregationTasks();
+            }
+
+            while (!isAggregationFinished()) {
+                Thread.sleep(50);
+            }
+        } catch (InterruptedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("An interrupt occurred while waiting for aggregation to finish. Aborting remaining work.", e);
+            } else {
+                log.warn("An interrupt occurred while waiting for aggregation to finish. Aborting remaining work: " +
+                    ThrowableUtil.getRootMessage(e));
+            }
+            // Catching InterruptedException clears the interrupt flag; set it again so that the caller
+            // can tell that this run was cut short.
+            Thread.currentThread().interrupt();
+        }
+        return oneHourData;
+    }
+
+    /**
+     * Waits for raw data aggregation and for the 1 hour index entries, then submits
+     * {@link Aggregate1HourData} tasks in batches for the schedule ids that are still in the index.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    private void submit1HourDataAggregationTasks() throws InterruptedException {
+        waitFor(state.getRemainingRawData());
+        try {
+            state.getOneHourIndexEntriesArrival().await();
+            deleteIndexEntries(MetricsTable.SIX_HOUR);
+
+            List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+            Set<Integer> scheduleIds = new TreeSet<Integer>();
+            // The lock is taken outside the inner try so that it is only released when it was actually
+            // acquired; await() above may throw before we get here.
+            state.getOneHourIndexEntriesLock().writeLock().lock();
+            try {
+                log.debug("Preparing to submit 1 hour data aggregation tasks for " +
+                    state.getOneHourIndexEntries().size() + " schedules");
+                for (Integer scheduleId : state.getOneHourIndexEntries()) {
+                    queryFutures.add(dao.findOneHourMetricsAsync(scheduleId, state.getSixHourTimeSlice().getMillis(),
+                        state.getSixHourTimeSliceEnd().getMillis()));
+                    scheduleIds.add(scheduleId);
+                    if (queryFutures.size() == batchSize) {
+                        state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
+                            queryFutures));
+                        queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+                        scheduleIds = new TreeSet<Integer>();
+                    }
+                }
+                if (!queryFutures.isEmpty()) {
+                    state.getAggregationTasks().submit(new Aggregate1HourData(dao, state, scheduleIds,
+                        queryFutures));
+                }
+            } finally {
+                state.getOneHourIndexEntriesLock().writeLock().unlock();
+            }
+        } catch (AbortedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Some 6 hour aggregates may not get generated. There was an unexpected error while " +
+                    "loading 1 hour index entries", e);
+            } else {
+                log.warn("Some 6 hour aggregates may not get generated. There was an unexpected error while " +
+                    "loading 1 hour index entries: " + ThrowableUtil.getRootMessage(e));
+            }
+            // No delete was issued for these index entries, so nothing else will ever decrement the
+            // counter on their behalf. Do it here; otherwise run() would wait forever.
+            remainingIndexEntries.decrementAndGet();
+        }
+    }
+
+    /**
+     * Waits for 1 hour data aggregation and for the 6 hour index entries, then submits
+     * {@link Aggregate6HourData} tasks in batches for the schedule ids that are still in the index.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting
+     */
+    private void submit6HourDataAggregationTasks() throws InterruptedException {
+        waitFor(state.getRemaining1HourData());
+        try {
+            state.getSixHourIndexEntriesArrival().await();
+            deleteIndexEntries(MetricsTable.TWENTY_FOUR_HOUR);
+
+            List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+            Set<Integer> scheduleIds = new TreeSet<Integer>();
+            // The lock is taken outside the inner try so that it is only released when it was actually
+            // acquired; await() above may throw before we get here.
+            state.getSixHourIndexEntriesLock().writeLock().lock();
+            try {
+                log.debug("Preparing to submit 6 hour data aggregation tasks for " +
+                    state.getSixHourIndexEntries().size() + " schedules");
+                for (Integer scheduleId : state.getSixHourIndexEntries()) {
+                    queryFutures.add(dao.findSixHourMetricsAsync(scheduleId,
+                        state.getTwentyFourHourTimeSlice().getMillis(),
+                        state.getTwentyFourHourTimeSliceEnd().getMillis()));
+                    scheduleIds.add(scheduleId);
+                    if (queryFutures.size() == batchSize) {
+                        state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds,
+                            queryFutures));
+                        queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
+                        scheduleIds = new TreeSet<Integer>();
+                    }
+                }
+                if (!queryFutures.isEmpty()) {
+                    state.getAggregationTasks().submit(new Aggregate6HourData(dao, state, scheduleIds,
+                        queryFutures));
+                }
+            } finally {
+                state.getSixHourIndexEntriesLock().writeLock().unlock();
+            }
+        } catch (AbortedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Some 24 hour aggregates may not get generated. There was an unexpected error while " +
+                    "loading 6 hour index entries", e);
+            } else {
+                log.warn("Some 24 hour aggregates may not get generated. There was an unexpected error while " +
+                    "loading 6 hour index entries: " + ThrowableUtil.getRootMessage(e));
+            }
+            // No delete was issued for these index entries, so nothing else will ever decrement the
+            // counter on their behalf. Do it here; otherwise run() would wait forever.
+            remainingIndexEntries.decrementAndGet();
+        }
+    }
+
+    /**
+     * Polls the counter every 50 ms until it is no longer positive.
+     */
+    private void waitFor(AtomicInteger remainingData) throws InterruptedException {
+        while (remainingData.get() > 0) {
+            Thread.sleep(50);
+        }
+    }
+
+    private boolean isAggregationFinished() {
+        return state.getRemainingRawData().get() <= 0 && state.getRemaining1HourData().get() <= 0 &&
+            state.getRemaining6HourData().get() <= 0 && remainingIndexEntries.get() <= 0;
+    }
+
+    /**
+     * Asynchronously deletes the index entries of the given table for the matching time slice. The
+     * count of remaining index entries is decremented when the delete completes, whether it succeeded
+     * or failed.
+     */
+    private void deleteIndexEntries(final MetricsTable table) {
+        final DateTime time;
+        switch (table) {
+        case ONE_HOUR:
+            time = startTime;
+            break;
+        case SIX_HOUR:
+            time = state.getSixHourTimeSlice();
+            break;
+        default:
+            time = state.getTwentyFourHourTimeSlice();
+            break;
+        }
+        log.debug("Deleting " + table + " index entries for time slice " + time);
+        writePermits.acquire();
+        StorageResultSetFuture future = dao.deleteMetricsIndexEntriesAsync(table, time.getMillis());
+        Futures.addCallback(future, new FutureCallback<ResultSet>() {
+            @Override
+            public void onSuccess(ResultSet result) {
+                remainingIndexEntries.decrementAndGet();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Failed to delete index entries for table " + table + " at time [" + time + "]. An " +
+                        "unexpected error occurred.", t);
+                } else {
+                    log.warn("Failed to delete index entries for table " + table + " at time [" + time + "]. An " +
+                        "unexpected error occurred: " + ThrowableUtil.getRootMessage(t));
+                }
+                remainingIndexEntries.decrementAndGet();
+            }
+        });
+    }
+
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
new file mode 100644
index 0000000..f130f75
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute1HourData.java
@@ -0,0 +1,113 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Computes 1 hour data for a batch of raw data result sets. The generated 1 hour
aggregates are inserted along with
+ * their corresponding index updates.
+ * <p>Each computed aggregate is also added to the {@code oneHourData} set supplied to the constructor, and one SIX_HOUR index update is issued per aggregate for the six hour time slice.
+ * @author John Sanda
+ */
+class Compute1HourData implements AsyncFunction<List<ResultSet>,
List<ResultSet>> {
+
+ private final Log log = LogFactory.getLog(Compute1HourData.class);
+
+ private DateTime startTime; // its millis are passed as the last constructor argument of every aggregate built here (presumably the aggregate's timestamp)
+
+ private RateLimiter writePermits; // permits are acquired from this limiter before each group of writes is issued
+
+ private MetricsDAO dao; // issues the asynchronous inserts and index updates
+
+ private DateTime sixHourTimeSlice; // time slice written with each SIX_HOUR index update
+
+ private Set<AggregateNumericMetric> oneHourData; // receives every aggregate computed by apply()
+
+ public Compute1HourData(DateTime startTime, DateTime sixHourTimeSlice, RateLimiter
writePermits, MetricsDAO dao,
+ Set<AggregateNumericMetric> oneHourData) {
+ this.startTime = startTime;
+ this.sixHourTimeSlice = sixHourTimeSlice;
+ this.writePermits = writePermits;
+ this.dao = dao;
+ this.oneHourData = oneHourData;
+ }
+ /** For each raw data result set: computes the aggregate, adds it to oneHourData, then issues three 1 hour inserts (MIN, MAX, AVG) and one SIX_HOUR index update. Returns the issued writes combined via Futures.successfulAsList. */
+ @Override
+ public ListenableFuture<List<ResultSet>> apply(List<ResultSet>
rawDataResultSets) throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Computing and storing 1 hour data for " +
rawDataResultSets.size() + " schedules");
+ }
+ Stopwatch stopwatch = new Stopwatch().start();
+ try {
+ List<StorageResultSetFuture> insertFutures = new
ArrayList<StorageResultSetFuture>(rawDataResultSets.size());
+ for (ResultSet resultSet : rawDataResultSets) {
+ AggregateNumericMetric aggregate = calculateAggregatedRaw(resultSet);
+ oneHourData.add(aggregate);
+ writePermits.acquire(4); // one permit per write issued below (3 inserts + 1 index update)
+ insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
+ AggregateType.MIN, aggregate.getMin()));
+ insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
+ AggregateType.MAX, aggregate.getMax()));
+ insertFutures.add(dao.insertOneHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
+ AggregateType.AVG, aggregate.getAvg()));
+ insertFutures.add(dao.updateMetricsIndex(MetricsTable.SIX_HOUR,
aggregate.getScheduleId(),
+ sixHourTimeSlice.getMillis()));
+ }
+ return Futures.successfulAsList(insertFutures);
+ } finally { // NOTE(review): runs when apply() returns, i.e. once the writes have been issued; the returned future may not have completed yet
+ if (log.isDebugEnabled()) { // the stopwatch is only stopped and reported when debug logging is on
+ stopwatch.stop();
+ log.debug("Finished computing and storing 1 hour data for " +
rawDataResultSets.size() +
+ " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ " ms");
+ }
+ }
+ }
+ /** Reduces the rows of one result set to a single aggregate: the min and max of column 2 across the rows, plus their arithmetic mean. */
+ private AggregateNumericMetric calculateAggregatedRaw(ResultSet resultSet) {
+ double min = Double.NaN;
+ double max = min;
+ int count = 0;
+ ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+ double value;
+ List<Row> rows = resultSet.all();
+
+ for (Row row : rows) {
+ value = row.getDouble(2); // NOTE(review): column 2 is presumably the metric value - confirm against the raw data query
+ if (count == 0) { // the first row seeds both min and max
+ min = value;
+ max = min;
+ }
+ if (value < min) {
+ min = value;
+ } else if (value > max) {
+ max = value;
+ }
+ mean.add(value);
+ ++count;
+ }
+
+ return new AggregateNumericMetric(rows.get(0).getInt(0),
mean.getArithmeticMean(), min, max,
+ startTime.getMillis()); // NOTE(review): rows.get(0) throws IndexOutOfBoundsException for an empty result set; column 0 is presumably the schedule id
+ }
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
new file mode 100644
index 0000000..6fe9d79
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute24HourData.java
@@ -0,0 +1,99 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+
+/**
+ * Computes 24 hour data for a batch of 6 hour data result sets. The generated 24 hour
aggregates are inserted.
+ * <p>Only the MIN, MAX and AVG inserts are issued here; unlike the 1 hour and 6 hour computations, no index update is written.
+ * @author John Sanda
+ */
+class Compute24HourData implements AsyncFunction<List<ResultSet>,
List<ResultSet>> {
+
+ private final Log log = LogFactory.getLog(Compute24HourData.class);
+
+ private DateTime startTime; // its millis are passed as the last constructor argument of every aggregate built here (presumably the aggregate's timestamp)
+
+ private RateLimiter writePermits; // permits are acquired from this limiter before each group of inserts is issued
+
+ private MetricsDAO dao; // issues the asynchronous 24 hour inserts
+
+ public Compute24HourData(DateTime startTime, RateLimiter writePermits, MetricsDAO
dao) {
+ this.startTime = startTime;
+ this.writePermits = writePermits;
+ this.dao = dao;
+ }
+ /** For each 6 hour result set: computes the aggregate and issues three 24 hour inserts (MIN, MAX, AVG). Returns the issued writes combined via Futures.successfulAsList. */
+ @Override
+ public ListenableFuture<List<ResultSet>> apply(List<ResultSet>
sixHourDataResultSets) throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Computing and storing 24 hour data for " +
sixHourDataResultSets.size() + " schedules");
+ }
+ Stopwatch stopwatch = new Stopwatch().start();
+ try {
+ List<StorageResultSetFuture> insertFutures =
+ new
ArrayList<StorageResultSetFuture>(sixHourDataResultSets.size());
+ for (ResultSet resultSet : sixHourDataResultSets) {
+ AggregateNumericMetric aggregate = calculateAggregate(resultSet);
+ writePermits.acquire(3); // one permit per insert issued below
+
insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
+ AggregateType.MIN, aggregate.getMin()));
+
insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
+ AggregateType.MAX, aggregate.getMax()));
+
insertFutures.add(dao.insertTwentyFourHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
+ AggregateType.AVG, aggregate.getAvg()));
+ }
+ return Futures.successfulAsList(insertFutures);
+ } finally { // NOTE(review): runs when apply() returns, i.e. once the inserts have been issued; the returned future may not have completed yet
+ if (log.isDebugEnabled()) { // the stopwatch is only stopped and reported when debug logging is on
+ stopwatch.stop();
+ log.debug("Finished computing and storing 24 hour data for " +
sixHourDataResultSets.size() +
+ " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ " ms");
+ }
+ }
+ }
+ /** Reduces the rows of one result set to a single aggregate, reading column 3 of the rows in groups of three. */
+ private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
+ double min = Double.NaN;
+ double max = min;
+ ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+ List<Row> rows = resultSet.all();
+
+ for (int i = 0; i < rows.size(); i += 3) { // per group: row i feeds max, row i + 1 feeds min, row i + 2 feeds the mean (presumably MAX/MIN/AVG rows - confirm against the query ordering)
+ if (i == 0) { // the first group seeds min and max
+ min = rows.get(i + 1).getDouble(3);
+ max = rows.get(i).getDouble(3);
+ } else {
+ if (rows.get(i + 1).getDouble(3) < min) {
+ min = rows.get(i + 1).getDouble(3);
+ }
+ if (rows.get(i).getDouble(3) > max) {
+ max = rows.get(i).getDouble(3);
+ }
+ }
+ mean.add(rows.get(i + 2).getDouble(3)); // NOTE(review): get(i + 1) / get(i + 2) throw IndexOutOfBoundsException when rows.size() is not a multiple of 3
+ }
+ return new AggregateNumericMetric(rows.get(0).getInt(0),
mean.getArithmeticMean(), min, max,
+ startTime.getMillis()); // NOTE(review): rows.get(0) throws IndexOutOfBoundsException for an empty result set; column 0 is presumably the schedule id
+ }
+
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
new file mode 100644
index 0000000..ec1ee26
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Compute6HourData.java
@@ -0,0 +1,106 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.joda.time.DateTime;
+
+import org.rhq.server.metrics.ArithmeticMeanCalculator;
+import org.rhq.server.metrics.MetricsDAO;
+import org.rhq.server.metrics.StorageResultSetFuture;
+import org.rhq.server.metrics.domain.AggregateNumericMetric;
+import org.rhq.server.metrics.domain.AggregateType;
+import org.rhq.server.metrics.domain.MetricsTable;
+
+/**
+ * Computes 6 hour data for a batch of 1 hour data result sets. The generated 6 hour
aggregates are inserted along with
+ * their corresponding index updates.
+ * <p>One TWENTY_FOUR_HOUR index update is issued per aggregate for the twenty four hour time slice.
+ * @author John Sanda
+ */
+class Compute6HourData implements AsyncFunction<List<ResultSet>,
List<ResultSet>> {
+
+ private final Log log = LogFactory.getLog(Compute6HourData.class);
+
+ private DateTime startTime; // its millis are passed as the last constructor argument of every aggregate built here (presumably the aggregate's timestamp)
+
+ private RateLimiter writePermits; // permits are acquired from this limiter before each group of writes is issued
+
+ private MetricsDAO dao; // issues the asynchronous inserts and index updates
+
+ private DateTime twentyFourHourTimeSlice; // time slice written with each TWENTY_FOUR_HOUR index update
+
+ public Compute6HourData(DateTime startTime, DateTime twentyFourHourTimeSlice,
RateLimiter writePermits,
+ MetricsDAO dao) {
+ this.startTime = startTime;
+ this.twentyFourHourTimeSlice = twentyFourHourTimeSlice;
+ this.writePermits = writePermits;
+ this.dao = dao;
+ }
+ /** For each 1 hour result set: computes the aggregate, then issues three 6 hour inserts (MIN, MAX, AVG) and one TWENTY_FOUR_HOUR index update. Returns the issued writes combined via Futures.successfulAsList. */
+ @Override
+ public ListenableFuture<List<ResultSet>> apply(List<ResultSet>
oneHourDataResultSets) throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("Computing and storing 6 hour data for " +
oneHourDataResultSets.size() + " schedules");
+ }
+ Stopwatch stopwatch = new Stopwatch().start();
+ try {
+ List<StorageResultSetFuture> insertFutures =
+ new
ArrayList<StorageResultSetFuture>(oneHourDataResultSets.size());
+ for (ResultSet resultSet : oneHourDataResultSets) {
+ AggregateNumericMetric aggregate = calculateAggregate(resultSet);
+ writePermits.acquire(4); // one permit per write issued below (3 inserts + 1 index update)
+ insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
+ AggregateType.MIN, aggregate.getMin()));
+ insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
+ AggregateType.MAX, aggregate.getMax()));
+ insertFutures.add(dao.insertSixHourDataAsync(aggregate.getScheduleId(),
aggregate.getTimestamp(),
+ AggregateType.AVG, aggregate.getAvg()));
+ insertFutures.add(dao.updateMetricsIndex(MetricsTable.TWENTY_FOUR_HOUR,
aggregate.getScheduleId(),
+ twentyFourHourTimeSlice.getMillis()));
+ }
+ return Futures.successfulAsList(insertFutures);
+ } finally { // NOTE(review): runs when apply() returns, i.e. once the writes have been issued; the returned future may not have completed yet
+ if (log.isDebugEnabled()) { // the stopwatch is only stopped and reported when debug logging is on
+ stopwatch.stop();
+ log.debug("Finished computing and storing 6 hour data for " +
oneHourDataResultSets.size() +
+ " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ " ms");
+ }
+ }
+ }
+ /** Reduces the rows of one result set to a single aggregate, reading column 3 of the rows in groups of three. */
+ private AggregateNumericMetric calculateAggregate(ResultSet resultSet) {
+ double min = Double.NaN;
+ double max = min;
+ ArithmeticMeanCalculator mean = new ArithmeticMeanCalculator();
+ List<Row> rows = resultSet.all();
+
+ for (int i = 0; i < rows.size(); i += 3) { // per group: row i feeds max, row i + 1 feeds min, row i + 2 feeds the mean (presumably MAX/MIN/AVG rows - confirm against the query ordering)
+ if (i == 0) { // the first group seeds min and max
+ min = rows.get(i + 1).getDouble(3);
+ max = rows.get(i).getDouble(3);
+ } else {
+ if (rows.get(i + 1).getDouble(3) < min) {
+ min = rows.get(i + 1).getDouble(3);
+ }
+ if (rows.get(i).getDouble(3) > max) {
+ max = rows.get(i).getDouble(3);
+ }
+ }
+ mean.add(rows.get(i + 2).getDouble(3)); // NOTE(review): get(i + 1) / get(i + 2) throw IndexOutOfBoundsException when rows.size() is not a multiple of 3
+ }
+ return new AggregateNumericMetric(rows.get(0).getInt(0),
mean.getArithmeticMean(), min, max,
+ startTime.getMillis()); // NOTE(review): rows.get(0) throws IndexOutOfBoundsException for an empty result set; column 0 is presumably the schedule id
+ }
+}