modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementReport.java
| 20 +
modules/core/plugin-api/src/main/java/org/rhq/core/pluginapi/measurement/MeasurementCollectorRunnable.java
| 54 ++-
modules/core/plugin-container/src/test/java/org/rhq/core/pc/CollectorThreadPoolTest.java
| 153 +++++++++-
3 files changed, 201 insertions(+), 26 deletions(-)
New commits:
commit 7c9cbda06b11ac9740310190598d333f2e45d434
Author: John Mazzitelli <mazz(a)redhat.com>
Date: Wed Jun 27 17:22:39 2012 -0400
[BZ 783603] make sure the cached report in the runnable doesn't grow unbounded.
when a metric is requested, make sure we remove it so it isn't cached and so it
isn't reported again
diff --git
a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementReport.java
b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementReport.java
index f812575..ff7392d 100644
---
a/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementReport.java
+++
b/modules/core/domain/src/main/java/org/rhq/core/domain/measurement/MeasurementReport.java
@@ -145,12 +145,20 @@ public class MeasurementReport implements Serializable {
* @param report the report that contains the measurements to add
* @param metrics the metrics whose measurement data (if found in the given report)
should be added to this report.
* If null or empty, then all of the data in the given report will be
added to this report.
+ * @param removeSourceData if true, any data that was transferred from
<code>report</code> to this object will be removed
+ * from the original <code>report</code> that was
passed into this method.
*/
- public synchronized void add(MeasurementReport report,
Set<MeasurementScheduleRequest> metrics) {
+ public synchronized void add(MeasurementReport report,
Set<MeasurementScheduleRequest> metrics,
+ boolean removeSourceData) {
if (metrics == null || metrics.isEmpty()) {
measurementNumericData.addAll(report.measurementNumericData);
measurementTraitData.addAll(report.measurementTraitData);
callTimeData.addAll(report.callTimeData);
+ if (removeSourceData) {
+ report.measurementNumericData.clear();
+ report.measurementTraitData.clear();
+ report.callTimeData.clear();
+ }
} else {
// note that usually the metric set is very small (typically not more than
around 5, probably normally around 1 or 2)
// so this loop isn't going to be performed with lots of iterations.
@@ -162,6 +170,9 @@ public class MeasurementReport implements Serializable {
MeasurementDataNumeric data = i.next();
if (data.getName().equals(metric.getName())) {
measurementNumericData.add(data);
+ if (removeSourceData) {
+ i.remove();
+ }
}
}
break;
@@ -172,6 +183,9 @@ public class MeasurementReport implements Serializable {
MeasurementDataTrait data = i.next();
if (data.getName().equals(metric.getName())) {
measurementTraitData.add(data);
+ if (removeSourceData) {
+ i.remove();
+ }
}
}
break;
@@ -180,6 +194,10 @@ public class MeasurementReport implements Serializable {
// There is only ever one calltime metric per resource so if we are
being asked to
// add the calltime data, we can avoid doing any iterations and just
use addAll API.
callTimeData.addAll(report.callTimeData);
+ if (removeSourceData) {
+ report.callTimeData.clear();
+ }
+
break;
}
default: {
diff --git
a/modules/core/plugin-api/src/main/java/org/rhq/core/pluginapi/measurement/MeasurementCollectorRunnable.java
b/modules/core/plugin-api/src/main/java/org/rhq/core/pluginapi/measurement/MeasurementCollectorRunnable.java
index 94c024b..bf303b9 100644
---
a/modules/core/plugin-api/src/main/java/org/rhq/core/pluginapi/measurement/MeasurementCollectorRunnable.java
+++
b/modules/core/plugin-api/src/main/java/org/rhq/core/pluginapi/measurement/MeasurementCollectorRunnable.java
@@ -7,6 +7,7 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -82,8 +83,10 @@ public class MeasurementCollectorRunnable implements Runnable {
/**
* The last known measurements for the resource that this collector is monitoring.
+ * You must synchronize access to this via cachedReportLock.
*/
- private MeasurementReport lastReport = new MeasurementReport();
+ private MeasurementReport cachedReport = new MeasurementReport();
+ private ReentrantLock cachedReportLock = new ReentrantLock();
/**
* Accumulated report.
@@ -133,7 +136,7 @@ public class MeasurementCollectorRunnable implements Runnable {
this.initialDelay = initialDelay;
this.interval = interval;
this.threadPool = threadPool;
- this.lastReport = new MeasurementReport();
+ this.cachedReport = new MeasurementReport();
this.facetId = measured.toString();
}
@@ -144,8 +147,16 @@ public class MeasurementCollectorRunnable implements Runnable {
* their {@link MeasurementFacet#getValues()} method should simply be calling this
method.
*/
public void getLastValues(MeasurementReport report,
Set<MeasurementScheduleRequest> metrics) throws Exception {
- this.requestedMetrics.addAll(metrics);
- report.add(this.lastReport, metrics);
+ requestedMetrics.addAll(metrics);
+ cachedReportLock.lock();
+ try {
+ // For all metrics being requested, take their cached values last collected
and transfer them to the given report.
+ // Note that we only remove the metrics that were being requested, leaving
any other cached data intact so it can
+ // be retrieved later when it is requested.
+ report.add(cachedReport, metrics, true);
+ } finally {
+ cachedReportLock.unlock();
+ }
}
/**
@@ -154,11 +165,11 @@ public class MeasurementCollectorRunnable implements Runnable {
* to start the measurement checking that this object performs.
*/
public void start() {
- boolean isStarted = this.started.getAndSet(true);
+ boolean isStarted = started.getAndSet(true);
if (!isStarted) {
task.cancel(true);
task = threadPool.scheduleWithFixedDelay(this, initialDelay, interval,
TimeUnit.MILLISECONDS);
- log.debug("measurement collector started: " + this.facetId);
+ log.debug("measurement collector started: " + facetId);
}
}
@@ -168,9 +179,18 @@ public class MeasurementCollectorRunnable implements Runnable {
* to stop the measurement checking that this object performs.
*/
public void stop() {
- this.started.set(false);
- this.task.cancel(true);
- this.requestedMetrics.clear();
+ started.set(false);
+ task.cancel(true);
+
+ cachedReportLock.lock();
+ try {
+ cachedReport = new MeasurementReport();
+ } finally {
+ cachedReportLock.unlock();
+ }
+
+ requestedMetrics.clear();
+
log.debug("measurement collector stopped: " + facetId);
}
@@ -183,12 +203,22 @@ public class MeasurementCollectorRunnable implements Runnable {
log.debug("measurement collector is collecting now: " + facetId);
ClassLoader originalClassloader =
Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(this.contextClassloader);
+ Thread.currentThread().setContextClassLoader(contextClassloader);
try {
- this.measured.getValues(lastReport, requestedMetrics);
+ // collect the new data for all metrics that were previously requested
+ MeasurementReport newData = new MeasurementReport();
+ measured.getValues(newData, requestedMetrics);
if (log.isDebugEnabled()) {
- log.debug("measurement collector last report: " + lastReport);
+ log.debug("measurement collector latest data: " + newData);
+ }
+
+ // put the new data in our cached report
+ cachedReportLock.lock();
+ try {
+ cachedReport.add(newData, null, false);
+ } finally {
+ cachedReportLock.unlock();
}
} catch (Exception e) {
log.warn("measurement collector failed to get values", e);
diff --git
a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/CollectorThreadPoolTest.java
b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/CollectorThreadPoolTest.java
index f3a6310..238d1ef 100644
---
a/modules/core/plugin-container/src/test/java/org/rhq/core/pc/CollectorThreadPoolTest.java
+++
b/modules/core/plugin-container/src/test/java/org/rhq/core/pc/CollectorThreadPoolTest.java
@@ -173,38 +173,136 @@ public class CollectorThreadPoolTest {
runnable.start();
Thread.sleep(1000L);
- report = new MeasurementReport();
- runnable.getLastValues(report, allMetrics); // we are asking for all metrics, and
we should have collected them all
- assert 1000 == report.getCollectionTime();
- assert !report.getNumericData().isEmpty();
- assert !report.getTraitData().isEmpty();
- assert !report.getCallTimeData().isEmpty();
-
// now we only ask for individual metrics, not all of them at once
report = new MeasurementReport();
runnable.getLastValues(report, onlyNumericMetric);
assert 1000 == report.getCollectionTime();
- assert !report.getNumericData().isEmpty();
+ assert report.getNumericData().size() == 1;
assert report.getTraitData().isEmpty() : "we didn't ask for the trait
data";
assert report.getCallTimeData().isEmpty() : "we didn't ask for the
calltime data";
+ // that numeric data should be gone now - if we ask for it again, we
shouldn't get it
+ report = new MeasurementReport();
+ runnable.getLastValues(report, onlyNumericMetric);
+ assert 1000 == report.getCollectionTime();
+ assert report.getNumericData().isEmpty() : "the numeric data should have
already been retrieved and removed";
+
+ // now ask for the trait data
report = new MeasurementReport();
runnable.getLastValues(report, onlyTraitMetric);
assert 1000 == report.getCollectionTime();
assert report.getNumericData().isEmpty() : "we didn't ask for the
numeric data";
- assert !report.getTraitData().isEmpty();
+ assert report.getTraitData().size() == 1;
assert report.getCallTimeData().isEmpty() : "we didn't ask for the
calltime data";
+ report = new MeasurementReport();
+ runnable.getLastValues(report, onlyTraitMetric);
+ assert 1000 == report.getCollectionTime();
+ assert report.getTraitData().isEmpty() : "the trait data should have already
been retrieved and removed";
+
+ // now ask for the calltime data
report = new MeasurementReport();
runnable.getLastValues(report, onlyCalltimeMetric);
assert 1000 == report.getCollectionTime();
assert report.getNumericData().isEmpty() : "we didn't ask for the
numeric data";
assert report.getTraitData().isEmpty() : "we didn't ask for the trait
data";
- assert !report.getCallTimeData().isEmpty();
+ assert report.getCallTimeData().size() == 1;
+ report = new MeasurementReport();
+ runnable.getLastValues(report, onlyCalltimeMetric);
+ assert 1000 == report.getCollectionTime();
+ assert report.getCallTimeData().isEmpty() : "the calltime data should have
already been retrieved and removed";
runnable.stop();
}
+ @Test(enabled = false)
+ // TODO re-enable this test - it passes
+ public void testMultipleCollectionPeriods() throws Exception {
+ TestMultipleMeasumentFacet component = new TestMultipleMeasumentFacet();
+ Set<MeasurementScheduleRequest> allMetrics = new
HashSet<MeasurementScheduleRequest>();
+ allMetrics.add(component.getNumericMetricSchedule());
+ allMetrics.add(component.getTraitMetricSchedule());
+ allMetrics.add(component.getCalltimeMetricSchedule());
+
+ // 123 is too small - the minimum is 60000 so the runnable will bump up our
interval to that min value
+ MeasurementCollectorRunnable runnable = new
MeasurementCollectorRunnable(component, 0L, 123L, null,
+ this.threadPool.getExecutor());
+
+ runnable.getLastValues(new MeasurementReport(), allMetrics); // prime the pump so
we begin collecting everything immediately
+ runnable.start();
+ Thread.sleep(1000L);
+
+ MeasurementReport report = new MeasurementReport();
+ runnable.getLastValues(report, allMetrics);
+ assert 1000L == report.getCollectionTime();
+ assert report.getCallTimeData().size() == 1;
+ assert report.getNumericData().size() == 1;
+ MeasurementDataNumeric nextNumeric = report.getNumericData().iterator().next();
+ assert nextNumeric.getValue() == (double) 1;
+ assert report.getTraitData().size() == 1;
+ MeasurementDataTrait nextTrait = report.getTraitData().iterator().next();
+ assert nextTrait.getValue().equals("1");
+
+ // wait for the next collection interval to occur
+ // don't wait forever; if this fails, we need a way to break the loop.
Don't wait longer than 75 seconds
+ int loopCount = 0;
+ report = new MeasurementReport();
+ report.setCollectionTime(1000L);
+ System.out.print("CollectorThreadPoolTest.testMultipleCollectionPeriods()
waiting");
+ while (loopCount++ < 75 && report.getCollectionTime() == 1000L) {
+ System.out.print('.');
+ Thread.sleep(1000L);
+ runnable.getLastValues(report, allMetrics);
+ }
+ System.out.println("done.");
+
+ assert 1001L == report.getCollectionTime();
+ assert report.getCallTimeData().size() == 1 : report.getCallTimeData();
+ assert report.getNumericData().size() == 1 : report.getNumericData();
+ nextNumeric = report.getNumericData().iterator().next();
+ assert nextNumeric.getValue() == (double) 2 : nextNumeric;
+ assert report.getTraitData().size() == 1 : report.getTraitData();
+ nextTrait = report.getTraitData().iterator().next();
+ assert nextTrait.getValue().equals("2") : nextTrait;
+ }
+
+ public void testLotsOfDataMeasurements() throws Exception {
+ TestLotsOfDataMeasurementFacet component = new TestLotsOfDataMeasurementFacet();
+ Set<MeasurementScheduleRequest> allMetrics = new
HashSet<MeasurementScheduleRequest>();
+ allMetrics.add(component.getNumericMetricSchedule());
+ allMetrics.add(component.getTraitMetricSchedule());
+ allMetrics.add(component.getCalltimeMetricSchedule());
+
+ MeasurementCollectorRunnable runnable = new
MeasurementCollectorRunnable(component, 0L, 123L, null,
+ this.threadPool.getExecutor());
+
+ runnable.getLastValues(new MeasurementReport(), allMetrics); // prime the pump so
we begin collecting everything immediately
+ runnable.start();
+ Thread.sleep(1000L);
+
+ // this tests to make sure we can have multiple data points for a single metric
+ MeasurementReport report = new MeasurementReport();
+ runnable.getLastValues(report, allMetrics);
+ assert 1000L == report.getCollectionTime();
+ assert report.getCallTimeData().size() == 1;
+ CallTimeData nextCalltime = report.getCallTimeData().iterator().next();
+ assert nextCalltime.getValues().size() == 2;
+ assert report.getNumericData().size() == 3;
+ MeasurementDataNumeric nextNumeric = report.getNumericData().iterator().next();
+ assert nextNumeric.getValue() == (double) 1;
+ assert report.getTraitData().size() == 3;
+ MeasurementDataTrait nextTrait = report.getTraitData().iterator().next();
+ assert nextTrait.getValue().equals("1");
+
+ // make sure all the data has been flushed now
+ report = new MeasurementReport();
+ runnable.getLastValues(report, allMetrics);
+ assert 1000L == report.getCollectionTime();
+ assert report.getNumericData().isEmpty();
+ assert report.getTraitData().isEmpty();
+ assert report.getCallTimeData().isEmpty();
+ }
+
protected class TestAvailabilityFacet implements AvailabilityFacet {
private AvailabilityType[] avail;
@@ -248,14 +346,15 @@ public class CollectorThreadPoolTest {
return new MeasurementScheduleRequest(3, "calltime", 30000L, true,
DataType.CALLTIME);
}
- private long collectionTime = 1000;
- private int counter = 0;
+ public long collectionTime = 999;
+ public int counter = 0;
@Override
public void getValues(MeasurementReport report,
Set<MeasurementScheduleRequest> metrics) throws Exception {
counter++;
+ collectionTime++;
log("TestMultipleMeasumentFacet.getValues [" + counter + "]:
" + metrics);
- report.setCollectionTime(collectionTime++);
+ report.setCollectionTime(collectionTime);
for (MeasurementScheduleRequest request : metrics) {
if (request.getDataType() == DataType.CALLTIME) {
CallTimeData data = new CallTimeData(request);
@@ -271,6 +370,34 @@ public class CollectorThreadPoolTest {
}
}
}
+ }
+ protected class TestLotsOfDataMeasurementFacet extends TestMultipleMeasumentFacet {
+ @Override
+ public void getValues(MeasurementReport report,
Set<MeasurementScheduleRequest> metrics) throws Exception {
+ counter++;
+ collectionTime++;
+ log("TestLotsOfDataMeasurementFacet.getValues [" + counter +
"]: " + metrics);
+ report.setCollectionTime(collectionTime);
+ for (MeasurementScheduleRequest request : metrics) {
+ if (request.getDataType() == DataType.CALLTIME) {
+ CallTimeData data = new CallTimeData(request);
+ data.addCallData("dest1", new Date(collectionTime - 900),
(long) counter);
+ data.addCallData("dest2", new Date(collectionTime), (long)
counter);
+ report.addData(data);
+ } else if (request.getName().equals(NUMERIC_METRIC_NAME)) {
+ report.addData(new MeasurementDataNumeric(collectionTime - 900,
request, (double) counter));
+ report.addData(new MeasurementDataNumeric(collectionTime - 800,
request, (double) counter));
+ report.addData(new MeasurementDataNumeric(collectionTime, request,
(double) counter));
+ } else if (request.getName().equals(TRAIT_METRIC_NAME)) {
+ report.addData(new MeasurementDataTrait(collectionTime - 900,
request, "" + counter));
+ report.addData(new MeasurementDataTrait(collectionTime - 800,
request, "" + counter));
+ report.addData(new MeasurementDataTrait(collectionTime, request,
"" + counter));
+ } else {
+ log.info("bad test - unknown metric: " + request);
+ throw new IllegalStateException("bad test - unknown metric:
" + request);
+ }
+ }
+ }
}
}