modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
| 6
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java
| 72 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
| 133 ----
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
| 91 ---
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
| 85 --
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
| 137 ----
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
| 144 ++--
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationType.java
| 25
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
| 289 +---------
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationScheduler.java
| 111 +++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationState.java
| 108 +++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregator.java
| 78 ++
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/OneHourDataScheduler.java
| 47 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/RawDataScheduler.java
| 47 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/SixHourDataScheduler.java
| 46 +
15 files changed, 620 insertions(+), 799 deletions(-)
New commits:
commit af7af8fc5f453246637a2a6aea9e9e297abb7147
Author: John Sanda <jsanda(a)redhat.com>
Date: Mon Feb 3 11:30:00 2014 -0500
[BZ 1045589] major refactoring of aggregation to limit concurrent reads
Even with really aggressive throttling, like only allowing 5k requests/sec, it was
still easily possible to do enough concurrent reads during aggregation that
could result in OOMs. Prior to this commit, several hundred or thousand
queries could be basically executed in parallel to fetch raw data. The max
size of the payload coming in the form of a StorageResultSetFuture object is
about 30 KB.
With this commit, the number of concurrent reads that is done during
aggregation has a fixed, configurable upper bound. It can be configured with the
following system properties,
* rhq.metrics.aggregation.batch-size
* rhq.metrics.aggregation.parallelism
Data for measurement schedules is fetched and computed in chunks or batches.
The batch-size property specifies the number of schedules for which data will
be fetched. The parallelism property specifies the number of batches that can
be processed in parallel. In terms of implementation, this is all handled using
a Semaphore. The number of semaphore permits is determined by
batch-size * parallelism. So for example, if batch-size is 25 and parallelism
is 4, then this will yield 100 permits, meaning at most 100 concurrent metric
data queries are allowed.
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
index 2d0d190..edcb7b7 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/SignalingCountDownLatch.java
@@ -11,6 +11,8 @@ public class SignalingCountDownLatch {
private CountDownLatch latch;
+ private String msg;
+
public SignalingCountDownLatch(CountDownLatch latch) {
this.latch = latch;
}
@@ -18,11 +20,11 @@ public class SignalingCountDownLatch {
public void await() throws InterruptedException, AbortedException {
latch.await();
if (aborted) {
- throw new AbortedException();
+ throw new AbortedException(msg);
}
}
- public void abort() {
+ public void abort(String msg) {
aborted = true;
latch.countDown();
}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java
index d61f380..41cdc92 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/StorageSession.java
@@ -4,7 +4,6 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
@@ -35,7 +34,7 @@ public class StorageSession implements Host.StateListener {
private int minRequestLimit =
Integer.parseInt(System.getProperty("rhq.storage.request-limit.min",
"1000"));
private RateLimiter permits = RateLimiter.create(Double.parseDouble(
- System.getProperty("rhq.storage.request-limit", "50000")), 3,
TimeUnit.MINUTES);
+ System.getProperty("rhq.storage.request-limit", "10000")), 3,
TimeUnit.MINUTES);
private int requestLimitDelta;
@@ -43,10 +42,6 @@ public class StorageSession implements Host.StateListener {
private long permitsChangeWindow = 1000 * 10;
- private boolean permitsChanging;
-
- private ReentrantReadWriteLock permitsLock = new ReentrantReadWriteLock();
-
public StorageSession(Session wrappedSession) {
this.wrappedSession = wrappedSession;
this.wrappedSession.getCluster().register(this);
@@ -98,8 +93,7 @@ public class StorageSession implements Host.StateListener {
public ResultSet execute(String query) {
try {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
return wrappedSession.execute(query);
} catch (NoHostAvailableException e) {
handleNoHostAvailable(e);
@@ -109,8 +103,7 @@ public class StorageSession implements Host.StateListener {
public ResultSet execute(Query query) {
try {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
return wrappedSession.execute(query);
} catch (NoHostAvailableException e) {
handleNoHostAvailable(e);
@@ -119,22 +112,19 @@ public class StorageSession implements Host.StateListener {
}
public StorageResultSetFuture executeAsync(String query) {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
ResultSetFuture future = wrappedSession.executeAsync(query);
return new StorageResultSetFuture(future, this);
}
public StorageResultSetFuture executeAsync(Query query) {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
ResultSetFuture future = wrappedSession.executeAsync(query);
return new StorageResultSetFuture(future, this);
}
public PreparedStatement prepare(String query) {
-// permits.acquire();
- acquirePermit();
+ permits.acquire();
return wrappedSession.prepare(query);
}
@@ -195,54 +185,30 @@ public class StorageSession implements Host.StateListener {
void handleNoHostAvailable(NoHostAvailableException e) {
log.warn("Encountered " +
NoHostAvailableException.class.getSimpleName() + " due to following error(s): "
+
e.getErrors());
+ boolean isClientTimeout = false;
for (InetAddress address : e.getErrors().keySet()) {
String error = e.getErrors().get(address);
if (error != null && error.contains("Timeout during read"))
{
- try {
- permitsLock.writeLock().lock();
- if (System.currentTimeMillis() - permitsLastChanged >
permitsChangeWindow) {
- permitsChanging = true;
- int newRate = (int) permits.getRate() - requestLimitDelta;
- if (newRate < minRequestLimit) {
- newRate = minRequestLimit;
- }
- log.warn("The request timed out. Decreasing request
throughput to " + newRate);
-// permits.setRate(newRate);
- permits = RateLimiter.create(newRate, 2, TimeUnit.MINUTES);
- permitsLastChanged = System.currentTimeMillis();
- requestLimitDelta = requestLimitDelta * 2;
+ if (System.currentTimeMillis() - permitsLastChanged >
permitsChangeWindow) {
+ int newRate = (int) permits.getRate() - requestLimitDelta;
+ if (newRate < minRequestLimit) {
+ newRate = minRequestLimit;
}
- } finally {
- permitsChanging = false;
- permitsLock.writeLock().unlock();
+ log.warn("The request timed out. Decreasing request throughput
to " + newRate);
+ permits.setRate(newRate);
+ permitsLastChanged = System.currentTimeMillis();
+ requestLimitDelta = requestLimitDelta * 2;
}
-
+ isClientTimeout = true;
break;
}
}
-// for (String error : e.getErrors().values()) {
-// if (error.contains("Timeout during read")) {
-// log.warn("The request timed out. Decreasing request
throughput.");
-// permits.setRate(permits.getRate() - requestLimitDelta);
-// break;
-// }
-// }
- }
-
- private void acquirePermit() {
- if (permitsChanging) {
- try {
- permitsLock.readLock().lock();
- permits.acquire();
- } finally {
- permitsLock.readLock().unlock();
- }
- } else {
- permits.acquire();
+ if (!isClientTimeout) {
+ fireClusterDownEvent(e);
}
}
- void fireClusterDownEvent(NoHostAvailableException e) {
+ private void fireClusterDownEvent(NoHostAvailableException e) {
isClusterAvailable = false;
for (StorageStateListener listener : listeners) {
listener.onStorageClusterDown(e);
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
deleted file mode 100644
index 702b64f..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate1HourData.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package org.rhq.server.metrics.aggregation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.rhq.core.util.exception.ThrowableUtil;
-import org.rhq.server.metrics.AbortedException;
-import org.rhq.server.metrics.MetricsDAO;
-import org.rhq.server.metrics.StorageResultSetFuture;
-
-/**
- * Generates 6 hour data for a batch of 1 hour data futures. After data is inserted for
the batch, aggregation of 6 hour
- * data will start immediately for the batch if the 24 hour time slice has finished.
- *
- * @see Compute6HourData
- * @author John Sanda
- */
-class Aggregate1HourData implements Runnable {
-
- private final Log log = LogFactory.getLog(Aggregate1HourData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public Aggregate1HourData(MetricsDAO dao, AggregationState state, Set<Integer>
scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final Stopwatch stopwatch = new Stopwatch().start();
- ListenableFuture<List<ResultSet>> queriesFuture =
Futures.successfulAsList(queryFutures);
- Futures.withFallback(queriesFuture, new
FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t)
throws Exception {
- log.error("An error occurred while fetching one hour data",
t);
- return Futures.immediateFailedFuture(t);
- }
- });
- ListenableFuture<List<ResultSet>> computeFutures =
Futures.transform(queriesFuture,
- state.getCompute6HourData(), state.getAggregationTasks());
- Futures.addCallback(computeFutures, new
FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> result) {
- stopwatch.stop();
- log.debug("Finished aggregating 1 hour data for " +
result.size() + " schedules in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- start6HourDataAggregationIfNecessary();
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- // TODO should we log the schedule ids?
- log.debug("Failed to aggregate 1 hour data for " +
scheduleIds.size() + " schedules. An " +
- "unexpected error occurred.", t);
- } else {
- log.warn("Failed to aggregate 1 hour data for " +
scheduleIds.size() + " schedules. An " +
- "unexpected error occurred: " +
ThrowableUtil.getRootMessage(t));
- }
- start6HourDataAggregationIfNecessary();
- }
- });
- }
-
- private void start6HourDataAggregationIfNecessary() {
- try {
- if (state.is24HourTimeSliceFinished()) {
- update6HourIndexEntries();
- List<StorageResultSetFuture> queryFutures = new
ArrayList<StorageResultSetFuture>(scheduleIds.size());
- for (Integer scheduleId : scheduleIds) {
- queryFutures.add(dao.findSixHourMetricsAsync(scheduleId,
state.getTwentyFourHourTimeSlice().getMillis(),
- state.getTwentyFourHourTimeSliceEnd().getMillis()));
- }
- state.getAggregationTasks().submit(new Aggregate6HourData(dao, state,
scheduleIds, queryFutures));
- }
- } catch (InterruptedException e) {
- if (log.isDebugEnabled()) {
- log.debug("An interrupt occurred while waiting for 6 hour data index
entries. Aborting data aggregation",
- e);
- } else {
- log.info("An interrupt occurred while waiting for 6 hour data index
entries. Aborting data " +
- "aggregation: " + e.getMessage());
- }
- } finally {
- int remainingSchedules =
state.getRemaining1HourData().addAndGet(-scheduleIds.size());
- if (log.isDebugEnabled()) {
- log.debug("There are " + remainingSchedules + " remaining
schedules with 1 hr data to be aggregated");
- }
- }
- }
-
- private void update6HourIndexEntries() throws InterruptedException {
- try {
- state.getSixHourIndexEntriesArrival().await();
- try {
- state.getSixHourIndexEntriesLock().writeLock().lock();
- state.getSixHourIndexEntries().removeAll(scheduleIds);
- } finally {
- state.getSixHourIndexEntriesLock().writeLock().unlock();
- }
- } catch (AbortedException e) {
- // This means we failed to retrieve the index entries. We can however
- // continue generating 6 hour data because we do not need the index
- // here since we already have 6 hour data to aggregate along with the
- // schedule ids.
- log.debug("The wait for 6 hour index entries has been aborted.
Proceeding with scheduling computation of " +
- "1 hour aggregates for previously assigned schedules.");
- state.getRemaining6HourData().addAndGet(scheduleIds.size());
- }
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
deleted file mode 100644
index 836e070..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregate6HourData.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.rhq.server.metrics.aggregation;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.FutureFallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.rhq.core.util.exception.ThrowableUtil;
-import org.rhq.server.metrics.MetricsDAO;
-import org.rhq.server.metrics.StorageResultSetFuture;
-
-/**
- * Generates 24 hour data for a batch of 1 hour data futures. After data is inserted for
the batch, aggregation of 6
- * hour data will start immediately for the batch if the 24 hour time slice has
finished.
- *
- * @see Compute24HourData
- * @author John Sanda
- */
-class Aggregate6HourData implements Runnable {
-
- private final Log log = LogFactory.getLog(Aggregate6HourData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public Aggregate6HourData(MetricsDAO dao, AggregationState state, Set<Integer>
scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final Stopwatch stopwatch = new Stopwatch().start();
- ListenableFuture<List<ResultSet>> queriesFuture =
Futures.successfulAsList(queryFutures);
- Futures.withFallback(queriesFuture, new
FutureFallback<List<ResultSet>>() {
- @Override
- public ListenableFuture<List<ResultSet>> create(Throwable t)
throws Exception {
- log.error("An error occurred while fetching 6 hour data", t);
- return Futures.immediateFailedFuture(t);
- }
- });
- ListenableFuture<List<ResultSet>> computeFutures =
Futures.transform(queriesFuture,
- state.getCompute24HourData(), state.getAggregationTasks());
- Futures.addCallback(computeFutures, new
FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> result) {
- stopwatch.stop();
- log.debug("Finished aggregating 6 hour data for " +
result.size() + " schedules in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- updateRemaining6HrData();
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- // TODO should we log the schedule ids?
- log.debug("Failed to aggregate 6 hour data for " +
scheduleIds.size() + " schedules. An " +
- "unexpected error occurred.", t);
- } else {
- log.warn("Failed to aggregate 6 hour data for " +
scheduleIds.size() + " schedules. An " +
- "unexpected error occurred: " +
ThrowableUtil.getRootMessage(t));
- }
- updateRemaining6HrData();
- }
- });
- }
-
- private void updateRemaining6HrData() {
- int remainingSchedules =
state.getRemaining6HourData().addAndGet(-scheduleIds.size());
- if (log.isDebugEnabled()) {
- log.debug("There are " + remainingSchedules + " remaining
schedules with 6 hr data to be aggregated");
- }
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
deleted file mode 100644
index f8c5318..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateIndexEntriesHandler.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.rhq.server.metrics.aggregation;
-
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.rhq.core.util.exception.ThrowableUtil;
-import org.rhq.server.metrics.SignalingCountDownLatch;
-
-/**
-* @author John Sanda
-*/
-class AggregateIndexEntriesHandler implements FutureCallback<ResultSet> {
-
- private final Log log = LogFactory.getLog(AggregateIndexEntriesHandler.class);
-
- private Set<Integer> indexEntries;
-
- private AtomicInteger remainingData;
-
- private SignalingCountDownLatch indexEntriesArrival;
-
- private Stopwatch stopwatch;
-
- private String src;
-
- private String dest;
-
- public AggregateIndexEntriesHandler(Set<Integer> indexEntries, AtomicInteger
remainingData,
- SignalingCountDownLatch indexEntriesArrival, Stopwatch stopwatch, String src,
String dest) {
- this.indexEntries = indexEntries;
- this.remainingData = remainingData;
- this.indexEntriesArrival = indexEntriesArrival;
- this.stopwatch = stopwatch;
- this.src = src;
- this.dest = dest;
- }
-
- @Override
- public void onSuccess(ResultSet resultSet) {
- for (Row row : resultSet) {
- indexEntries.add(row.getInt(1));
- }
-
- // Even if indexInetries is empty, it is possible (though unlikely) that a
subsequent query could return a
- // non-empty result set. This might happen if the index was updated at the very
end of the last time slice, and
- // we do an inconsistent read. When it is empty, abort() is called to let the
AggregateXXXData objects know that
- // the should update remainingData.
- if (indexEntries.isEmpty()) {
- indexEntriesArrival.abort();
- } else {
- remainingData.set(indexEntries.size());
- indexEntriesArrival.countDown();
- }
-
- stopwatch.stop();
-
- if (log.isDebugEnabled()) {
- log.debug("Finished loading " + indexEntries.size() + " "
+ src + " index entries in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- }
-
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- log.debug("Some " + dest + " aggregates may not get computed.
An unexpected error occurred while " +
- "retrieving " + src + " index entries", t);
- } else {
- log.warn("Some " + dest + " aggregates may not get computed.
An unexpected error occurred while " +
- "retrieving " + src + " index entries: " +
ThrowableUtil.getRootMessage(t));
- }
- remainingData.set(0);
- indexEntriesArrival.abort();
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
deleted file mode 100644
index 0402f9e..0000000
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregateRawData.java
+++ /dev/null
@@ -1,137 +0,0 @@
-package org.rhq.server.metrics.aggregation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import com.datastax.driver.core.ResultSet;
-import com.google.common.base.Stopwatch;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.rhq.core.util.exception.ThrowableUtil;
-import org.rhq.server.metrics.AbortedException;
-import org.rhq.server.metrics.MetricsDAO;
-import org.rhq.server.metrics.StorageResultSetFuture;
-
-/**
- * Generates 1 hour data for a batch of raw data futures. After data is inserted for the
batch, aggregation of 1 hour
- * data will start immediately for the batch if the 6 hour time slice has finished.
- *
- * @see Compute1HourData
- * @author John Sanda
- */
-class AggregateRawData implements Runnable {
-
- private final Log log = LogFactory.getLog(AggregateRawData.class);
-
- private MetricsDAO dao;
-
- private AggregationState state;
-
- private Set<Integer> scheduleIds;
-
- private List<StorageResultSetFuture> queryFutures;
-
- public AggregateRawData(MetricsDAO dao, AggregationState state, Set<Integer>
scheduleIds,
- List<StorageResultSetFuture> queryFutures) {
- this.dao = dao;
- this.state = state;
- this.scheduleIds = scheduleIds;
- this.queryFutures = queryFutures;
- }
-
- @Override
- public void run() {
- final Stopwatch stopwatch = new Stopwatch().start();
- try {
- ListenableFuture<List<ResultSet>> rawDataFutures =
Futures.successfulAsList(queryFutures);
- final ListenableFuture<List<ResultSet>> insert1HourDataFutures =
Futures.transform(rawDataFutures,
- state.getCompute1HourData(), state.getAggregationTasks());
- Futures.addCallback(insert1HourDataFutures, new
FutureCallback<List<ResultSet>>() {
- @Override
- public void onSuccess(List<ResultSet> resultSets) {
- stopwatch.stop();
- log.debug("Finished aggregating raw data for " +
scheduleIds.size() + " schedules in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- start1HourDataAggregationIfNecessary();
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- // TODO should we log the schedule ids?
- log.debug("Failed to aggregate raw data for " +
scheduleIds.size() + " schedules. An unexpected " +
- "error occurred.", t);
- } else {
- log.warn("Failed to aggregate raw data for " +
scheduleIds.size() + " schedules. An " +
- "unexpected error occurred: " +
ThrowableUtil.getRootMessage(t));
- }
- start1HourDataAggregationIfNecessary();
- }
- }, state.getAggregationTasks());
- } catch (Exception e) {
- log.error("An unexpected error occurred while aggregating raw
data", e);
- }
- }
-
- private void start1HourDataAggregationIfNecessary() {
- try {
- if (state.is6HourTimeSliceFinished()) {
- update1HourIndexEntries();
- List<StorageResultSetFuture> oneHourDataQueryFutures = new
ArrayList<StorageResultSetFuture>(
- scheduleIds.size());
- for (Integer scheduleId : scheduleIds) {
- oneHourDataQueryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
- state.getSixHourTimeSlice().getMillis(),
state.getSixHourTimeSliceEnd().getMillis()));
- }
- state.getAggregationTasks().submit(new Aggregate1HourData(dao, state,
scheduleIds,
- oneHourDataQueryFutures));
- }
- } catch (InterruptedException e) {
- if (log.isDebugEnabled()) {
- log.debug("An interrupt occurred while waiting for 1 hour data index
entries. Aborting data aggregation",
- e);
- } else {
- log.info("An interrupt occurred while waiting for 1 hour data index
entries. Aborting data " +
- "aggregation: " + e.getMessage());
- }
- } finally {
- int remainingSchedules =
state.getRemainingRawData().addAndGet(-scheduleIds.size());
- if (log.isDebugEnabled()) {
- log.debug("There are " + remainingSchedules + " remaining
schedules with raw data to be aggregated");
- }
-
- }
- }
-
- private void update1HourIndexEntries() throws InterruptedException {
- try {
- // Wait for the arrival so that we can remove the schedules ids in this
- // batch from the one hour index entries. This will prevent duplicate tasks
- // being submitted to process the same 1 hour data.
- log.debug("Waiting for arrival of 1 hour index entries");
- state.getOneHourIndexEntriesArrival().await();
- try {
- state.getOneHourIndexEntriesLock().writeLock().lock();
- state.getOneHourIndexEntries().removeAll(scheduleIds);
- log.debug("Finished updating state.oneHourIndexEntries");
- } finally {
- state.getOneHourIndexEntriesLock().writeLock().unlock();
- }
- } catch (AbortedException e) {
- // This means we failed to retrieve the index entries. We can however
- // continue generating 1 hour data because we do not need the index
- // here since we already have 1 hour data to aggregate along with the
- // schedule ids.
- log.debug("The wait for 1 hour index entries has been aborted.
Proceeding with scheduling computation of " +
- "1 hour aggregates for previously assigned schedules.");
- state.getRemaining1HourData().addAndGet(scheduleIds.size());
- }
- }
-}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
index 345e53a..78f1bc4 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationState.java
@@ -1,13 +1,13 @@
package org.rhq.server.metrics.aggregation;
-import java.util.Set;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.joda.time.DateTime;
+import org.rhq.server.metrics.MetricsDAO;
import org.rhq.server.metrics.SignalingCountDownLatch;
/**
@@ -15,28 +15,32 @@ import org.rhq.server.metrics.SignalingCountDownLatch;
*/
class AggregationState {
- private ListeningExecutorService aggregationTasks;
+ private DateTime startTime;
- private SignalingCountDownLatch oneHourIndexEntriesArrival;
+ private int batchSize;
- private SignalingCountDownLatch sixHourIndexEntriesArrival;
+ private MetricsDAO dao;
- private AtomicInteger remainingRawData;
+ private ListeningExecutorService aggregationTasks;
- private AtomicInteger remaining1HourData;
+ private Semaphore permits;
- private AtomicInteger remaining6HourData;
+ private SignalingCountDownLatch rawAggregationDone;
- private Set<Integer> oneHourIndexEntries;
+ private SignalingCountDownLatch oneHourAggregationDone;
- private Set<Integer> sixHourIndexEntries;
+ private SignalingCountDownLatch sixHourAggregationDone;
- private ReentrantReadWriteLock oneHourIndexEntriesLock;
+ private AtomicInteger remainingRawData;
+
+ private AtomicInteger remaining1HourData;
- private ReentrantReadWriteLock sixHourIndexEntriesLock;
+ private AtomicInteger remaining6HourData;
private DateTime oneHourTimeSlice;
+ private DateTime oneHourTimeSliceEnd;
+
private DateTime sixHourTimeSlice;
private DateTime sixHourTimeSliceEnd;
@@ -55,6 +59,33 @@ class AggregationState {
private Compute24HourData compute24HourData;
+ DateTime getStartTime() {
+ return startTime;
+ }
+
+ AggregationState setStartTime(DateTime startTime) {
+ this.startTime = startTime;
+ return this;
+ }
+
+ int getBatchSize() {
+ return batchSize;
+ }
+
+ AggregationState setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
+ MetricsDAO getDao() {
+ return dao;
+ }
+
+ AggregationState setDao(MetricsDAO dao) {
+ this.dao = dao;
+ return this;
+ }
+
public ListeningExecutorService getAggregationTasks() {
return aggregationTasks;
}
@@ -64,29 +95,39 @@ class AggregationState {
return this;
}
- /**
- * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries
for schedules with 1 hour
- * data to be aggregated
- */
- public SignalingCountDownLatch getOneHourIndexEntriesArrival() {
- return oneHourIndexEntriesArrival;
+ Semaphore getPermits() {
+ return permits;
}
- public AggregationState setOneHourIndexEntriesArrival(SignalingCountDownLatch
oneHourIndexEntriesArrival) {
- this.oneHourIndexEntriesArrival = oneHourIndexEntriesArrival;
+ AggregationState setPermits(Semaphore permits) {
+ this.permits = permits;
return this;
}
- /**
- * @return A {@link SignalingCountDownLatch} to signal the arrival of index entries
for schedules with 6 hour
- * data to be aggregated
- */
- public SignalingCountDownLatch getSixHourIndexEntriesArrival() {
- return sixHourIndexEntriesArrival;
+ SignalingCountDownLatch getRawAggregationDone() {
+ return rawAggregationDone;
}
- public AggregationState setSixHourIndexEntriesArrival(SignalingCountDownLatch
sixHourIndexEntriesArrival) {
- this.sixHourIndexEntriesArrival = sixHourIndexEntriesArrival;
+ AggregationState setRawAggregationDone(SignalingCountDownLatch rawAggregationDone) {
+ this.rawAggregationDone = rawAggregationDone;
+ return this;
+ }
+
+ SignalingCountDownLatch getOneHourAggregationDone() {
+ return oneHourAggregationDone;
+ }
+
+ AggregationState setOneHourAggregationDone(SignalingCountDownLatch
oneHourAggregationDone) {
+ this.oneHourAggregationDone = oneHourAggregationDone;
+ return this;
+ }
+
+ SignalingCountDownLatch getSixHourAggregationDone() {
+ return sixHourAggregationDone;
+ }
+
+ AggregationState setSixHourAggregationDone(SignalingCountDownLatch
sixHourAggregationDone) {
+ this.sixHourAggregationDone = sixHourAggregationDone;
return this;
}
@@ -126,51 +167,21 @@ class AggregationState {
return this;
}
- /**
- * @return The schedule ids with 1 hour data to be aggregated
- */
- public Set<Integer> getOneHourIndexEntries() {
- return oneHourIndexEntries;
- }
-
- public AggregationState setOneHourIndexEntries(Set<Integer>
oneHourIndexEntries) {
- this.oneHourIndexEntries = oneHourIndexEntries;
- return this;
- }
-
- public Set<Integer> getSixHourIndexEntries() {
- return sixHourIndexEntries;
- }
-
- public AggregationState setSixHourIndexEntries(Set<Integer>
sixHourIndexEntries) {
- this.sixHourIndexEntries = sixHourIndexEntries;
- return this;
- }
-
- public ReentrantReadWriteLock getOneHourIndexEntriesLock() {
- return oneHourIndexEntriesLock;
- }
-
- public AggregationState setOneHourIndexEntriesLock(ReentrantReadWriteLock
oneHourIndexEntriesLock) {
- this.oneHourIndexEntriesLock = oneHourIndexEntriesLock;
- return this;
- }
-
- public ReentrantReadWriteLock getSixHourIndexEntriesLock() {
- return sixHourIndexEntriesLock;
+ public DateTime getOneHourTimeSlice() {
+ return oneHourTimeSlice;
}
- public AggregationState setSixHourIndexEntriesLock(ReentrantReadWriteLock
sixHourIndexEntriesLock) {
- this.sixHourIndexEntriesLock = sixHourIndexEntriesLock;
+ public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
+ this.oneHourTimeSlice = oneHourTimeSlice;
return this;
}
- public DateTime getOneHourTimeSlice() {
- return oneHourTimeSlice;
+ DateTime getOneHourTimeSliceEnd() {
+ return oneHourTimeSliceEnd;
}
- public AggregationState setOneHourTimeSlice(DateTime oneHourTimeSlice) {
- this.oneHourTimeSlice = oneHourTimeSlice;
+ AggregationState setOneHourTimeSliceEnd(DateTime oneHourTimeSliceEnd) {
+ this.oneHourTimeSliceEnd = oneHourTimeSliceEnd;
return this;
}
@@ -254,4 +265,5 @@ class AggregationState {
this.compute24HourData = compute24HourData;
return this;
}
+
}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationType.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationType.java
new file mode 100644
index 0000000..4a6c886
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/AggregationType.java
@@ -0,0 +1,25 @@
+package org.rhq.server.metrics.aggregation;
+
+/**
+ * @author John Sanda
+ */
/**
 * Identifies the granularity of metric data that an aggregation task operates on.
 * The label passed to each constant is a human-readable name that is surfaced in
 * log messages via {@link #toString()}.
 */
public enum AggregationType {

    /** Raw, unaggregated measurements that roll up into 1 hour aggregates. */
    RAW("raw data"),

    /** 1 hour aggregates that roll up into 6 hour aggregates. */
    ONE_HOUR("one hour data"),

    /** 6 hour aggregates that roll up into 24 hour aggregates. */
    SIX_HOUR("six hour data");

    // Display label for log messages; final because enum constants are immutable singletons.
    private final String type;

    // Enum constructors are implicitly private; the modifier is redundant and omitted.
    AggregationType(String type) {
        this.type = type;
    }

    @Override
    public String toString() {
        return type;
    }
}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
index d86379e..c1ed47a 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/Aggregator.java
@@ -1,18 +1,17 @@
package org.rhq.server.metrics.aggregation;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -58,19 +57,11 @@ public class Aggregator {
private DateTime startTime;
- /**
- * Signals when raw data index entries (in metrics_index) can be deleted. We cannot
delete the row in metrics_index
- * until we know that it has been read, successfully or otherwise.
- */
- private SignalingCountDownLatch rawDataIndexEntriesArrival;
-
- private int batchSize;
-
private AggregationState state;
private Set<AggregateNumericMetric> oneHourData;
- private AtomicInteger remainingIndexEntries;
+ private int maxParallelism =
Integer.parseInt(System.getProperty("rhq.metrics.aggregation.parallelism",
"5"));
public Aggregator(ListeningExecutorService aggregationTasks, MetricsDAO dao,
MetricsConfiguration configuration,
DateTimeService dtService, DateTime startTime, int batchSize) {
@@ -78,17 +69,22 @@ public class Aggregator {
this.configuration = configuration;
this.dtService = dtService;
this.startTime = startTime;
- this.batchSize = batchSize;
oneHourData = new
ConcurrentSkipListSet<AggregateNumericMetric>(AGGREGATE_COMPARATOR);
- rawDataIndexEntriesArrival = new SignalingCountDownLatch(new CountDownLatch(1));
- remainingIndexEntries = new AtomicInteger(1);
DateTime sixHourTimeSlice = get6HourTimeSlice();
DateTime twentyFourHourTimeSlice = get24HourTimeSlice();
state = new AggregationState()
+ .setDao(dao)
+ .setStartTime(startTime)
+ .setBatchSize(batchSize)
.setAggregationTasks(aggregationTasks)
+ .setPermits(new Semaphore(maxParallelism * batchSize, true))
+ .setRawAggregationDone(new SignalingCountDownLatch(new CountDownLatch(1)))
+ .setOneHourAggregationDone(new SignalingCountDownLatch(new
CountDownLatch(1)))
+ .setSixHourAggregationDone(new SignalingCountDownLatch(new
CountDownLatch(1)))
.setOneHourTimeSlice(startTime)
+
.setOneHourTimeSliceEnd(startTime.plus(configuration.getRawTimeSliceDuration()))
.setSixHourTimeSlice(sixHourTimeSlice)
.setSixHourTimeSliceEnd(sixHourTimeSlice.plus(configuration.getOneHourTimeSliceDuration()))
.setTwentyFourHourTimeSlice(twentyFourHourTimeSlice)
@@ -101,27 +97,7 @@ public class Aggregator {
configuration.getSixHourTimeSliceDuration()))
.setRemainingRawData(new AtomicInteger(0))
.setRemaining1HourData(new AtomicInteger(0))
- .setRemaining6HourData(new AtomicInteger(0))
- .setOneHourIndexEntries(new TreeSet<Integer>())
- .setSixHourIndexEntries(new TreeSet<Integer>())
- .setOneHourIndexEntriesLock(new ReentrantReadWriteLock())
- .setSixHourIndexEntriesLock(new ReentrantReadWriteLock());
-
- if (state.is6HourTimeSliceFinished()) {
- state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new
CountDownLatch(1)));
- remainingIndexEntries.incrementAndGet();
- } else {
- state.setOneHourIndexEntriesArrival(new SignalingCountDownLatch(new
CountDownLatch(0)));
- state.setRemaining1HourData(new AtomicInteger(0));
- }
-
- if (state.is24HourTimeSliceFinished()) {
- state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new
CountDownLatch(1)));
- remainingIndexEntries.incrementAndGet();
- } else {
- state.setSixHourIndexEntriesArrival(new SignalingCountDownLatch(new
CountDownLatch(0)));
- state.setRemaining6HourData(new AtomicInteger(0));
- }
+ .setRemaining6HourData(new AtomicInteger(0));
}
private DateTime get24HourTimeSlice() {
@@ -143,214 +119,63 @@ public class Aggregator {
public Set<AggregateNumericMetric> run() {
log.info("Starting aggregation for time slice " + startTime);
- StorageResultSetFuture rawFuture =
dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
- startTime.getMillis());
- Futures.addCallback(rawFuture, new FutureCallback<ResultSet>() {
- @Override
- public void onSuccess(ResultSet result) {
- int schedules = 0;
- List<Row> rows = result.all();
- log.debug("Retrieved " + rows.size() + " schedule ids from
raw data index");
- state.getRemainingRawData().set(rows.size());
- rawDataIndexEntriesArrival.countDown();
-
- Stopwatch stopwatch = new Stopwatch().start();
-
- final DateTime endTime =
startTime.plus(configuration.getRawTimeSliceDuration());
- Set<Integer> scheduleIds = new TreeSet<Integer>();
- List<StorageResultSetFuture> rawDataFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- for (final Row row : rows) {
- scheduleIds.add(row.getInt(1));
- rawDataFutures.add(dao.findRawMetricsAsync(row.getInt(1),
startTime.getMillis(),
- endTime.getMillis()));
- if (rawDataFutures.size() == batchSize) {
- state.getAggregationTasks().submit(new AggregateRawData(dao,
state, scheduleIds,
- rawDataFutures));
- schedules += rawDataFutures.size();
- rawDataFutures = new ArrayList<StorageResultSetFuture>();
- scheduleIds = new TreeSet<Integer>();
- }
- }
- if (!rawDataFutures.isEmpty()) {
- state.getAggregationTasks().submit(new AggregateRawData(dao, state,
scheduleIds,
- rawDataFutures));
- schedules += rawDataFutures.size();
- }
-
- if (log.isDebugEnabled()) {
- stopwatch.stop();
- log.debug("Finished scheduling raw data aggregation tasks for
" + schedules + " schedules in " +
- stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- if (log.isDebugEnabled()) {
- log.debug("Aggregation for time slice [" + startTime +
"] cannot proceed. There was an " +
- "unexpected error while retrieving raw data index
entries.", t);
- } else {
- log.warn("Aggregation for time slice [" + startTime +
"] cannot proceed. There was an " +
- "unexpected error while retrieving raw data index entries:
" + ThrowableUtil.getRootMessage(t));
- }
- state.setRemainingRawData(new AtomicInteger(0));
- rawDataIndexEntriesArrival.abort();
- deleteIndexEntries(MetricsTable.ONE_HOUR);
- }
- }, state.getAggregationTasks());
-
- if (state.is6HourTimeSliceFinished()) {
- log.debug("Fetching 1 hour index entries");
- Stopwatch stopwatch = new Stopwatch().start();
- StorageResultSetFuture oneHourFuture =
dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
- state.getSixHourTimeSlice().getMillis());
- Futures.addCallback(oneHourFuture, new
AggregateIndexEntriesHandler(state.getOneHourIndexEntries(),
- state.getRemaining1HourData(), state.getOneHourIndexEntriesArrival(),
stopwatch, "1 hour", "6 hour"),
- state.getAggregationTasks());
- }
-
- if (state.is24HourTimeSliceFinished()) {
- log.debug("Fetching 6 hour index entries");
+ try {
Stopwatch stopwatch = new Stopwatch().start();
- StorageResultSetFuture sixHourFuture =
dao.findMetricsIndexEntriesAsync(MetricsTable.TWENTY_FOUR_HOUR,
- state.getTwentyFourHourTimeSlice().getMillis());
- Futures.addCallback(sixHourFuture, new
AggregateIndexEntriesHandler(state.getSixHourIndexEntries(),
- state.getRemaining6HourData(), state.getSixHourIndexEntriesArrival(),
stopwatch, "6 hour", "24 hour"),
- state.getAggregationTasks());
- }
+ List<MetricsTable> indexUpdates = new
ArrayList<MetricsTable>(3);
+ indexUpdates.add(MetricsTable.ONE_HOUR);
+ StorageResultSetFuture rawIndexFuture =
dao.findMetricsIndexEntriesAsync(MetricsTable.ONE_HOUR,
+ startTime.getMillis());
+ Futures.addCallback(rawIndexFuture, new RawDataScheduler(state),
state.getAggregationTasks());
- try {
- try {
- rawDataIndexEntriesArrival.await();
- } catch (AbortedException e) {
- }
- deleteIndexEntries(MetricsTable.ONE_HOUR);
+ state.getRawAggregationDone().await();
+ stopwatch.stop();
+ log.info("Finished aggregating raw data in " +
stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
if (state.is6HourTimeSliceFinished()) {
- boolean is1HourIndexWaitAborted = false;
- waitFor(state.getRemainingRawData());
- try {
- state.getOneHourIndexEntriesArrival().await();
-
- List<StorageResultSetFuture> queryFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- Set<Integer> scheduleIds = new TreeSet<Integer>();
- state.getOneHourIndexEntriesLock().writeLock().lock();
- log.debug("Preparing to submit 1 hour data aggregation tasks for
" +
- state.getOneHourIndexEntries().size() + " schedules");
- for (Integer scheduleId : state.getOneHourIndexEntries()) {
- queryFutures.add(dao.findOneHourMetricsAsync(scheduleId,
state.getSixHourTimeSlice().getMillis(),
- state.getSixHourTimeSliceEnd().getMillis()));
- scheduleIds.add(scheduleId);
- if (queryFutures.size() == batchSize) {
- state.getAggregationTasks().submit(new
Aggregate1HourData(dao, state, scheduleIds,
- queryFutures));
- queryFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- scheduleIds = new TreeSet<Integer>();
- }
- }
- if (!queryFutures.isEmpty()) {
- state.getAggregationTasks().submit(new Aggregate1HourData(dao,
state, scheduleIds,
- queryFutures));
- queryFutures = null;
- scheduleIds = null;
- }
- } catch (AbortedException e) {
- is1HourIndexWaitAborted = true;
- if (log.isDebugEnabled()) {
- log.debug("Some 6 hour aggregates may not get generated.
There was an unexpected error while " +
- "loading 1 hour index entries", e);
- } else {
- log.warn("Some 6 hour aggregates may not get generated.
There was an unexpected error while " +
- "loading 1 hour index entries: " +
ThrowableUtil.getRootMessage(e));
- }
- } finally {
- deleteIndexEntries(MetricsTable.SIX_HOUR);
- if (!is1HourIndexWaitAborted) {
- state.getOneHourIndexEntriesLock().writeLock().unlock();
- }
- }
+ log.info("Starting aggregation of 1 hour data");
+ stopwatch.reset().start();
+ indexUpdates.add(MetricsTable.SIX_HOUR);
+ StorageResultSetFuture oneHourIndexFuture =
dao.findMetricsIndexEntriesAsync(MetricsTable.SIX_HOUR,
+ state.getSixHourTimeSlice().getMillis());
+ Futures.addCallback(oneHourIndexFuture, new OneHourDataScheduler(state),
+ state.getAggregationTasks());
+
+ state.getOneHourAggregationDone().await();
+ stopwatch.stop();
+ log.info("Finished aggregating one hour data in " +
stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
if (state.is24HourTimeSliceFinished()) {
- boolean is6HourIndexWaitAborted = false;
- waitFor(state.getRemaining1HourData());
- try {
- state.getSixHourIndexEntriesArrival().await();
-
- List<StorageResultSetFuture> queryFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- Set<Integer> scheduleIds = new TreeSet<Integer>();
- state.getSixHourIndexEntriesLock().writeLock().lock();
- log.debug("Preparing to submit 6 hour data aggregation tasks for
" +
- state.getSixHourIndexEntries().size() + " schedules");
- for (Integer scheduleId : state.getSixHourIndexEntries()) {
- queryFutures.add(dao.findSixHourMetricsAsync(scheduleId,
state.getTwentyFourHourTimeSlice().getMillis(),
- state.getTwentyFourHourTimeSliceEnd().getMillis()));
- scheduleIds.add(scheduleId);
- if (queryFutures.size() == batchSize) {
- state.getAggregationTasks().submit(new
Aggregate6HourData(dao, state, scheduleIds,
- queryFutures));
- queryFutures = new
ArrayList<StorageResultSetFuture>(batchSize);
- scheduleIds = new TreeSet<Integer>();
- }
- }
- if (!queryFutures.isEmpty()) {
- state.getAggregationTasks().submit(new Aggregate6HourData(dao,
state, scheduleIds,
- queryFutures));
- queryFutures = null;
- scheduleIds = null;
- }
- } catch (AbortedException e) {
- is6HourIndexWaitAborted = true;
- if (log.isDebugEnabled()) {
- log.debug("Some 24 hour aggregates may not get generated.
There was an unexpected error while " +
- "loading 6 hour index entries", e);
- } else {
- log.warn("Some 24 hour aggregates may not get generated.
There was an unexpected error while " +
- "loading 6 hour index entries: " +
ThrowableUtil.getRootMessage(e));
- }
- } finally {
- deleteIndexEntries(MetricsTable.TWENTY_FOUR_HOUR);
- if (!is6HourIndexWaitAborted) {
- state.getSixHourIndexEntriesLock().writeLock().unlock();
- }
- }
+ log.info("Starting aggregation of 6 hour data");
+ stopwatch.reset().start();
+ indexUpdates.add(MetricsTable.TWENTY_FOUR_HOUR);
+ StorageResultSetFuture sixHourIndexFuture =
dao.findMetricsIndexEntriesAsync(
+ MetricsTable.TWENTY_FOUR_HOUR,
state.getTwentyFourHourTimeSlice().getMillis());
+ Futures.addCallback(sixHourIndexFuture, new SixHourDataScheduler(state),
+ state.getAggregationTasks());
+
+ state.getSixHourAggregationDone().await();
+ stopwatch.stop();
+ log.info("Finished aggregating six hour data in " +
stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
}
- long lastUpdated = System.currentTimeMillis();
- while (!isAggregationFinished()) {
- if (log.isDebugEnabled() && ((System.currentTimeMillis() -
lastUpdated) >= 30000)) {
- log.debug("Remaining aggregation:\n" +
- "raw data - " + state.getRemainingRawData().get() +
"\n" +
- "1 hour data - " + state.getRemaining1HourData().get()
+ "\n" +
- "6 hour data - " + state.getRemaining6HourData().get()
+ "\n");
- lastUpdated = System.currentTimeMillis();
- }
- Thread.sleep(3000);
+ CountDownLatch updateIndexSignal = new CountDownLatch(indexUpdates.size());
+ for (MetricsTable table : indexUpdates) {
+ deleteIndexEntries(table, updateIndexSignal);
}
- } catch (InterruptedException e) {
- if (log.isDebugEnabled()) {
- log.debug("An interrupt occurred while waiting for aggregation to
finish. Aborting remaining work.", e);
- } else {
- log.warn("An interrupt occurred while waiting for aggregation to
finish. Aborting remaining work: " +
- ThrowableUtil.getRootMessage(e));
- }
- log.warn("An interrupt occurred while waiting for aggregation to
finish", e);
- }
- return oneHourData;
- }
+ updateIndexSignal.await();
- private void waitFor(AtomicInteger remainingData) throws InterruptedException {
- while (remainingData.get() > 0) {
- Thread.sleep(50);
+ return oneHourData;
+ } catch (InterruptedException e) {
+ log.info("There was an interrupt while waiting for aggregation to
finish. Aggregation will be aborted.");
+ return Collections.emptySet();
+ } catch (AbortedException e) {
+ log.warn("Aggregation has been aborted: " + e.getMessage());
+ return Collections.emptySet();
}
}
- private boolean isAggregationFinished() {
- return state.getRemainingRawData().get() <= 0 &&
state.getRemaining1HourData().get() <= 0 &&
- state.getRemaining6HourData().get() <= 0 &&
remainingIndexEntries.get() <= 0;
- }
-
- private void deleteIndexEntries(final MetricsTable table) {
+ private void deleteIndexEntries(final MetricsTable table, final CountDownLatch
doneSignal) {
final DateTime time;
switch (table) {
case ONE_HOUR:
@@ -368,7 +193,7 @@ public class Aggregator {
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
- remainingIndexEntries.decrementAndGet();
+ doneSignal.countDown();
}
@Override
@@ -380,7 +205,7 @@ public class Aggregator {
log.warn("Failed to delete index entries for table " +
table + " at time [" + time + "]. An " +
"unexpected error occurred: " +
ThrowableUtil.getRootMessage(t));
}
- remainingIndexEntries.decrementAndGet();
+ doneSignal.countDown();
}
});
}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationScheduler.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationScheduler.java
new file mode 100644
index 0000000..415c677
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationScheduler.java
@@ -0,0 +1,111 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.FutureCallback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+abstract class BatchAggregationScheduler implements FutureCallback<ResultSet> {
+
+ private final Log log = LogFactory.getLog(BatchAggregationScheduler.class);
+
+ protected AggregationState state;
+
+ public BatchAggregationScheduler(AggregationState state) {
+ this.state = state;
+ }
+
+ @Override
+ public void onSuccess(ResultSet indexResultSet) {
+ Stopwatch stopwatch = new Stopwatch().start();
+ Stopwatch batchStopwatch = new Stopwatch().start();
+ List<StorageResultSetFuture> queryFutures = new
ArrayList<StorageResultSetFuture>(state.getBatchSize());
+ int numSchedules = 0;
+ try {
+ for (Row row : indexResultSet) {
+ state.getPermits().acquire();
+ ++numSchedules;
+ getRemainingSchedules().incrementAndGet();
+ queryFutures.add(findMetricData(row.getInt(1)));
+ if (queryFutures.size() == state.getBatchSize()) {
+ state.getAggregationTasks().submit(new
BatchAggregator(createBatchAggregationState(queryFutures,
+ batchStopwatch)));
+ queryFutures = new
ArrayList<StorageResultSetFuture>(state.getBatchSize());
+ batchStopwatch = new Stopwatch().start();
+ }
+ }
+ if (!queryFutures.isEmpty()) {
+ state.getAggregationTasks().submit(new
BatchAggregator(createBatchAggregationState(queryFutures,
+ batchStopwatch)));
+ }
+ if (numSchedules == 0) {
+ getAggregationDoneSignal().countDown();
+ }
+ stopwatch.stop();
+ if (log.isDebugEnabled()) {
+ log.debug("Finished scheduling " + getAggregationType() +
" aggregation tasks for " + numSchedules +
+ " schedules in " + stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ " ms");
+ }
+ } catch (InterruptedException e) {
+ log.info("There was an interrupt while scheduling aggregation tasks for
" + getAggregationType() + ": " +
+ e.getMessage());
+ log.info("Aggregation will be aborted");
+ getAggregationDoneSignal().abort("There was an interrupt while
scheduling aggregation tasks for " +
+ getAggregationType() + ": " + e.getMessage());
+ }
+ }
+
+ private BatchAggregationState
createBatchAggregationState(List<StorageResultSetFuture> queryFutures,
+ Stopwatch batchStopwatch) {
+ return new BatchAggregationState()
+ .setAggregationTasks(state.getAggregationTasks())
+ .setAggregationType(getAggregationType())
+ .setComputeAggregates(getComputeAggregates())
+ .setDoneSignal(getAggregationDoneSignal())
+ .setPermits(state.getPermits())
+ .setQueryFutures(queryFutures)
+ .setRemainingSchedules(getRemainingSchedules())
+ .setStopwatch(batchStopwatch);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug("Aggregation for time slice [" + state.getStartTime() +
"] cannot proceed. There was an " +
+ "unexpected error while retrieving " + getAggregationType() +
" index entries.", t);
+ } else {
+ log.warn("Aggregation for time slice [" + state.getStartTime() +
"] cannot proceed. There was an " +
+ "unexpected error while retrieving " + getAggregationType() +
" index entries: " +
+ ThrowableUtil.getRootMessage(t));
+ }
+ getAggregationDoneSignal().abort("There was an error while retrieving "
+ getAggregationType() +
+ " index entries: " + ThrowableUtil.getRootMessage(t));
+ }
+
+ protected abstract SignalingCountDownLatch getAggregationDoneSignal();
+
+ protected abstract AggregationType getAggregationType();
+
+ protected abstract StorageResultSetFuture findMetricData(int scheduleId);
+
+ protected abstract AsyncFunction<List<ResultSet>, List<ResultSet>>
getComputeAggregates();
+
+ protected abstract AtomicInteger getRemainingSchedules();
+
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationState.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationState.java
new file mode 100644
index 0000000..4e7985e
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregationState.java
@@ -0,0 +1,108 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.base.Stopwatch;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+class BatchAggregationState {
+
+ private List<StorageResultSetFuture> queryFutures;
+
+ private AsyncFunction<List<ResultSet>, List<ResultSet>>
computeAggregates;
+
+ private ListeningExecutorService aggregationTasks;
+
+ private Semaphore permits;
+
+ private AtomicInteger remainingSchedules;
+
+ private SignalingCountDownLatch doneSignal;
+
+ private AggregationType aggregationType;
+
+ private Stopwatch stopwatch;
+
+ AggregationType getAggregationType() {
+ return aggregationType;
+ }
+
+ BatchAggregationState setAggregationType(AggregationType aggregationType) {
+ this.aggregationType = aggregationType;
+ return this;
+ }
+
+ List<StorageResultSetFuture> getQueryFutures() {
+ return queryFutures;
+ }
+
+ BatchAggregationState setQueryFutures(List<StorageResultSetFuture>
queryFutures) {
+ this.queryFutures = queryFutures;
+ return this;
+ }
+
+ AsyncFunction<List<ResultSet>, List<ResultSet>>
getComputeAggregates() {
+ return computeAggregates;
+ }
+
+ BatchAggregationState setComputeAggregates(AsyncFunction<List<ResultSet>,
List<ResultSet>> computeAggregates) {
+ this.computeAggregates = computeAggregates;
+ return this;
+ }
+
+ ListeningExecutorService getAggregationTasks() {
+ return aggregationTasks;
+ }
+
+ BatchAggregationState setAggregationTasks(ListeningExecutorService aggregationTasks)
{
+ this.aggregationTasks = aggregationTasks;
+ return this;
+ }
+
+ Semaphore getPermits() {
+ return permits;
+ }
+
+ BatchAggregationState setPermits(Semaphore permits) {
+ this.permits = permits;
+ return this;
+ }
+
+ AtomicInteger getRemainingSchedules() {
+ return remainingSchedules;
+ }
+
+ BatchAggregationState setRemainingSchedules(AtomicInteger remainingSchedules) {
+ this.remainingSchedules = remainingSchedules;
+ return this;
+ }
+
+ SignalingCountDownLatch getDoneSignal() {
+ return doneSignal;
+ }
+
+ BatchAggregationState setDoneSignal(SignalingCountDownLatch doneSignal) {
+ this.doneSignal = doneSignal;
+ return this;
+ }
+
+ Stopwatch getStopwatch() {
+ return stopwatch;
+ }
+
+ BatchAggregationState setStopwatch(Stopwatch stopwatch) {
+ this.stopwatch = stopwatch;
+ return this;
+ }
+
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregator.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregator.java
new file mode 100644
index 0000000..363a6f2
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/BatchAggregator.java
@@ -0,0 +1,78 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.rhq.core.util.exception.ThrowableUtil;
+
+/**
+ * @author John Sanda
+ */
+class BatchAggregator implements Runnable {
+
+ private final Log log = LogFactory.getLog(BatchAggregator.class);
+
+ private BatchAggregationState state;
+
+ public BatchAggregator(BatchAggregationState state) {
+ this.state = state;
+ }
+
+ @Override
+ public void run() {
+ ListenableFuture<List<ResultSet>> queriesFuture =
Futures.successfulAsList(state.getQueryFutures());
+ ListenableFuture<List<ResultSet>> insertFutures =
Futures.transform(queriesFuture,
+ state.getComputeAggregates(), state.getAggregationTasks());
+ Futures.addCallback(insertFutures, new
FutureCallback<List<ResultSet>>() {
+ @Override
+ public void onSuccess(List<ResultSet> result) {
+ updateRemainingSchedules();
+ state.getStopwatch().stop();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Finished aggregating " +
state.getAggregationType() + " for " +
+ state.getQueryFutures().size() + " schedules in " +
+ state.getStopwatch().elapsed(TimeUnit.MILLISECONDS) + "
ms");
+ }
+
+ state.getPermits().release(state.getQueryFutures().size());
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (log.isDebugEnabled()) {
+ log.debug("There was an error during " +
state.getAggregationType() + " aggregation",
+ ThrowableUtil.getRootCause(t));
+ } else {
+ log.warn("There was an error during " +
state.getAggregationType() + " aggregation: " +
+ ThrowableUtil.getRootMessage(t));
+ }
+ state.getPermits().release(state.getQueryFutures().size());
+ updateRemainingSchedules();
+ }
+ }, state.getAggregationTasks());
+ }
+
+ private void updateRemainingSchedules() {
+ int count =
state.getRemainingSchedules().addAndGet(-state.getQueryFutures().size());
+ if (log.isDebugEnabled()) {
+ log.debug("There are " + count + " remaining schedules with
" + state.getAggregationType() +
+ " to be aggregated");
+ }
+ if (count == 0) {
+ state.getDoneSignal().countDown();
+ } else if (count < 0) {
+ log.warn("The number of remaining schedules should never be less that
zero. ");
+ state.getDoneSignal().abort("There are " + count + " remaining
schedules with " +
+ state.getAggregationType() + " to be aggregated. The count should
never be less than zero.");
+ }
+ }
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/OneHourDataScheduler.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/OneHourDataScheduler.java
new file mode 100644
index 0000000..d172f08
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/OneHourDataScheduler.java
@@ -0,0 +1,47 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.AsyncFunction;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+class OneHourDataScheduler extends BatchAggregationScheduler {
+
+ public OneHourDataScheduler(AggregationState state) {
+ super(state);
+ }
+
+ @Override
+ protected SignalingCountDownLatch getAggregationDoneSignal() {
+ return state.getOneHourAggregationDone();
+ }
+
+ @Override
+ protected AggregationType getAggregationType() {
+ return AggregationType.ONE_HOUR;
+ }
+
+ @Override
+ protected StorageResultSetFuture findMetricData(int scheduleId) {
+ return state.getDao().findOneHourMetricsAsync(scheduleId,
state.getSixHourTimeSlice().getMillis(),
+ state.getSixHourTimeSliceEnd().getMillis());
+ }
+
+ @Override
+ protected AsyncFunction<List<ResultSet>, List<ResultSet>>
getComputeAggregates() {
+ return state.getCompute6HourData();
+ }
+
+ @Override
+ protected AtomicInteger getRemainingSchedules() {
+ return state.getRemaining1HourData();
+ }
+
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/RawDataScheduler.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/RawDataScheduler.java
new file mode 100644
index 0000000..cd20aad
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/RawDataScheduler.java
@@ -0,0 +1,47 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.AsyncFunction;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+class RawDataScheduler extends BatchAggregationScheduler {
+
+ public RawDataScheduler(AggregationState state) {
+ super(state);
+ }
+
+ @Override
+ protected SignalingCountDownLatch getAggregationDoneSignal() {
+ return state.getRawAggregationDone();
+ }
+
+ @Override
+ protected AggregationType getAggregationType() {
+ return AggregationType.RAW;
+ }
+
+ @Override
+ protected StorageResultSetFuture findMetricData(int scheduleId) {
+ return state.getDao().findRawMetricsAsync(scheduleId,
state.getOneHourTimeSlice().getMillis(),
+ state.getOneHourTimeSliceEnd().getMillis());
+ }
+
+ @Override
+ protected AsyncFunction<List<ResultSet>, List<ResultSet>>
getComputeAggregates() {
+ return state.getCompute1HourData();
+ }
+
+ @Override
+ protected AtomicInteger getRemainingSchedules() {
+ return state.getRemainingRawData();
+ }
+
+}
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/SixHourDataScheduler.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/SixHourDataScheduler.java
new file mode 100644
index 0000000..1dc17a5
--- /dev/null
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/aggregation/SixHourDataScheduler.java
@@ -0,0 +1,46 @@
+package org.rhq.server.metrics.aggregation;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.datastax.driver.core.ResultSet;
+import com.google.common.util.concurrent.AsyncFunction;
+
+import org.rhq.server.metrics.SignalingCountDownLatch;
+import org.rhq.server.metrics.StorageResultSetFuture;
+
+/**
+ * @author John Sanda
+ */
+class SixHourDataScheduler extends BatchAggregationScheduler {
+
+ public SixHourDataScheduler(AggregationState state) {
+ super(state);
+ }
+
+ @Override
+ protected SignalingCountDownLatch getAggregationDoneSignal() {
+ return state.getSixHourAggregationDone();
+ }
+
+ @Override
+ protected AggregationType getAggregationType() {
+ return AggregationType.SIX_HOUR;
+ }
+
+ @Override
+ protected StorageResultSetFuture findMetricData(int scheduleId) {
+ return state.getDao().findSixHourMetricsAsync(scheduleId,
state.getTwentyFourHourTimeSlice().getMillis(),
+ state.getTwentyFourHourTimeSliceEnd().getMillis());
+ }
+
+ @Override
+ protected AsyncFunction<List<ResultSet>, List<ResultSet>>
getComputeAggregates() {
+ return state.getCompute24HourData();
+ }
+
+ @Override
+ protected AtomicInteger getRemainingSchedules() {
+ return state.getRemaining6HourData();
+ }
+}