modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java
| 74 ++++++
modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java
| 38 +++
modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java
| 108 +++++-----
modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java
| 21 +
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java
| 10
modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
| 18 +
6 files changed, 210 insertions(+), 59 deletions(-)
New commits:
commit 64735ee2aa844031c46f1da06fc2eedea00335c2
Author: John Sanda <jsanda(a)redhat.com>
Date: Thu May 23 22:20:18 2013 -0400
updating logging
removing logging from MetricsServer.updateMetricsIndex method as it will result
in a lot of output in the logs.
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java
index ba228d9..6242df6 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java
@@ -40,7 +40,7 @@ import org.rhq.server.metrics.domain.AggregateType;
*/
public class MetricsBaselineCalculator {
- private final Log log = LogFactory.getLog(MetricsServer.class);
+ private final Log log = LogFactory.getLog(MetricsBaselineCalculator.class);
private MetricsDAO metricsDAO;
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
index 0e07d4d..d6d388b 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java
@@ -26,8 +26,6 @@
package org.rhq.server.metrics;
import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -234,28 +232,36 @@ public class MetricsServer {
}
public void addNumericData(Set<MeasurementDataNumeric> dataSet) {
+ if (log.isDebugEnabled()) {
+ log.debug("Persisting " + dataSet.size() + " raw
metrics");
+ }
+ long startTime = System.currentTimeMillis();
+ int count = 0;
try {
for (MeasurementDataNumeric data : dataSet) {
dao.insertRawData(data);
+ ++count;
}
updateMetricsIndex(dataSet);
} catch (Exception e) {
log.error("An error occurred while inserting raw numeric data",
e);
throw new RuntimeException(e);
+ } finally {
+ long endTime = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("Persisted " + count + " raw metrics in " +
(endTime - startTime) + " ms");
+ }
}
}
void updateMetricsIndex(Set<MeasurementDataNumeric> rawMetrics) {
+
Map<Integer, Long> updates = new TreeMap<Integer, Long>();
for (MeasurementDataNumeric rawMetric : rawMetrics) {
updates.put(rawMetric.getScheduleId(), dateTimeService.getTimeSlice(
new DateTime(rawMetric.getTimestamp()),
configuration.getRawTimeSliceDuration()).getMillis());
}
- if (log.isDebugEnabled()) {
- log.debug("Updating one hour index with time slices " +
StringUtil.collectionToString(updates.values()));
- }
-
dao.updateMetricsIndex(MetricsTable.ONE_HOUR, updates);
}
commit e8582b3995f749923255418277f9dff88e8125d4
Author: John Sanda <jsanda(a)redhat.com>
Date: Wed May 22 20:20:35 2013 -0400
changing transaction demarcation for baseline and OOB calculations
The queries and methods in MeasurementBaselineManagerBean and in
MeasurementOOBManagerBean were too coarse-grained. In a test environment, I
was seeing regular timeouts while querying for schedules that do not yet have
baselines. And the method that computes OOBs was experiencing frequent
transaction timeouts. The transaction demarcation has been changed so that we
are doing a small, fixed-size amount of work for each transaction when
calculating baselines and OOBs. Calls to Cassandra have been moved to
non-transactional methods.
Lastly, fixing a bug in MetricsBaselineCalculator to make sure we always return
a baseline for each schedule that it receives.
diff --git
a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java
index 8fb615c..37af9de 100644
---
a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java
+++
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java
@@ -20,6 +20,8 @@ package org.rhq.enterprise.server.measurement;
import java.util.Arrays;
import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
@@ -88,7 +90,8 @@ public class MeasurementBaselineManagerBean implements
MeasurementBaselineManage
private SessionManagerBean sessionManager;
private final Log log = LogFactory.getLog(MeasurementBaselineManagerBean.class);
- private static final int BASELINE_PROCESSING_LIMIT = 50000;
+
+ private static final int BASELINE_PROCESSING_LIMIT = 100;
@TransactionAttribute(TransactionAttributeType.NEVER)
public void calculateAutoBaselines() {
@@ -185,11 +188,14 @@ public class MeasurementBaselineManagerBean implements
MeasurementBaselineManage
* In any event, an appropriate chunking solution needs to be found, and
that partitioning strategy
* needs to replace the limits in the query today.
*/
- int schedulesWithoutBaselines = measurementBaselineManager
- ._calculateAutoBaselinesINSERT(amountOfData);
- totalProcessed += schedulesWithoutBaselines;
-
- if (schedulesWithoutBaselines < BASELINE_PROCESSING_LIMIT) {
+// int schedulesWithoutBaselines = measurementBaselineManager
+// ._calculateAutoBaselinesINSERT(amountOfData);
+ List<MeasurementSchedule> schedulesWithoutBaselines =
+ measurementBaselineManager.getSchedulesWithoutBaselines();
+ measurementBaselineManager.calculateBaselines(schedulesWithoutBaselines,
now, amountOfData);
+ totalProcessed += schedulesWithoutBaselines.size();
+
+ if (schedulesWithoutBaselines.size() < BASELINE_PROCESSING_LIMIT) {
break;
}
}
@@ -217,6 +223,62 @@ public class MeasurementBaselineManagerBean implements
MeasurementBaselineManage
@SuppressWarnings("unchecked")
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
+ public List<MeasurementSchedule> getSchedulesWithoutBaselines() {
+ Query query = this.entityManager
+
.createNamedQuery(MeasurementBaseline.QUERY_FIND_MEASUREMENT_SCHEDULES_WITHOUT_AUTOBASELINES);
+ query.setMaxResults(BASELINE_PROCESSING_LIMIT);
+ List<MeasurementSchedule> scheduleIdsWithoutBaselines =
query.getResultList();
+
+ return scheduleIdsWithoutBaselines;
+ }
+
+ @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
+ public void calculateBaselines(List<MeasurementSchedule> schedules, long
olderThan, long amountOfData) {
+ long endTime = olderThan;
+ long startTime = endTime - amountOfData;
+
+ log.debug("Computing baselines for " + schedules.size() + "
schedules");
+ MetricsBaselineCalculator baselineCalculator = new
MetricsBaselineCalculator(sessionManager.getMetricsDAO());
+ long calcStartTime = System.currentTimeMillis();
+ List<MeasurementBaseline> results =
baselineCalculator.calculateBaselines(schedules, startTime, endTime);
+ long calcEndTime = System.currentTimeMillis();
+ int count = results.size();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Finished computing " + count + " new baselines in
" + (calcEndTime - calcStartTime) + " ms");
+ }
+
+ log.debug("Persisting baselines calculations");
+ long saveStartTime = System.currentTimeMillis();
+ Iterator<MeasurementBaseline> iterator = results.iterator();
+ List<MeasurementBaseline> queue = new
LinkedList<MeasurementBaseline>();
+ while (iterator.hasNext()) {
+ if (queue.size() == 10) {
+ measurementBaselineManager.saveNewBaselines(queue);
+ queue = new LinkedList<MeasurementBaseline>();
+ }
+ queue.add(iterator.next());
+ }
+ if (!queue.isEmpty()) {
+ measurementBaselineManager.saveNewBaselines(queue);
+ }
+
+ long saveEndTime = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("Finished persisting " + count + " baselines in
" + (saveEndTime - saveStartTime) + " ms");
+ }
+ }
+
+ @Override
+ @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
+ public void saveNewBaselines(List<MeasurementBaseline> baselines) {
+ for (MeasurementBaseline baseline : baselines) {
+ entityManager.merge(baseline);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public int _calculateAutoBaselinesINSERT(long amountOfData) throws Exception {
long endTime = System.currentTimeMillis();
long startTime = endTime - amountOfData;
diff --git
a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java
index 7f7655a..8781fc5 100644
---
a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java
+++
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java
@@ -24,6 +24,7 @@ import javax.ejb.Local;
import org.rhq.core.domain.auth.Subject;
import org.rhq.core.domain.measurement.MeasurementBaseline;
+import org.rhq.core.domain.measurement.MeasurementSchedule;
import org.rhq.core.domain.resource.Resource;
/**
@@ -68,10 +69,47 @@ public interface MeasurementBaselineManagerLocal {
* Inserts baselines "as appropriate" for measurements that have at least
amountOfData
* @param amountOfData will use amountOfData to compute new min/max/mean for
baselines as appropriate
* @return number of rows inserted
+ * @deprecated This method is pending removal after peer review. Because 1hr data is
+ * now stored in Cassandra, transaction boundaries needed to change. This method has
+ * been replaced by {@link #getSchedulesWithoutBaselines()}, {@link
#calculateBaselines(java.util.List, long, long)},
+ * and {@link #saveNewBaselines(java.util.List)}.
* @throws Exception
*/
int _calculateAutoBaselinesINSERT(long amountOfData) throws Exception;
+ /**
+ * <strong>Note</strong> This method exists only for transaction
demarcation.
+ *
+ * @return A list of schedules that do not have baselines. This list is not assumed
+ * to be an exhaustive list of schedules that lack a baseline. As such, this method
+ * will be called repeatedly during baseline calculations to get all of the
necessary
+ * schedules.
+ */
+ List<MeasurementSchedule> getSchedulesWithoutBaselines();
+
+ /**
+ * Given a list of schedules, this method calculates and stores baselines using the
+ * amount of 1 hr data specified and older than the time specified.
+ * <br/><br/>
+ * <strong>Note</strong> This method exists only for transaction
demarcation.
+ *
+ * @param schedules The schedules that do not yet have baselines
+ * @param olderThan Use 1 hr data prior to this time
+ * @param amountOfData The amount of data to use for calculating baselines. This
value
+ * is treated as a duration. For example, a value of 259200000
+ * would be treated as 3 days.
+ */
+ void calculateBaselines(List<MeasurementSchedule> schedules, long olderThan,
long amountOfData);
+
+ /**
+ * Persists the newly calculated baselines.
+ * <br/><br/>
+ * <strong>Note</strong> This method exists only for transaction
demarcation.
+ *
+ * @param baselines The baselines to persist.
+ */
+ void saveNewBaselines(List<MeasurementBaseline> baselines);
+
MeasurementBaseline getBaselineIfEqual(Subject subject, int groupId, int
definitionId);
/**
diff --git
a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java
index eecbbca..f742c7f 100644
---
a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java
+++
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java
@@ -82,6 +82,9 @@ public class MeasurementOOBManagerBean implements
MeasurementOOBManagerLocal {
@EJB
AuthorizationManagerLocal authMangager;
+ @EJB
+ MeasurementOOBManagerLocal oobManager;
+
/**
* Compute oobs from the values in the 1h measurement table that just got added.
* For the total result, this is an incremental computation. The idea is that
@@ -191,56 +194,15 @@ public class MeasurementOOBManagerBean implements
MeasurementOOBManagerLocal {
}
@SuppressWarnings("unchecked")
+ @TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
public void computeOOBsForLastHour(Subject subject,
Iterable<AggregateNumericMetric> metrics) {
+ log.info("Computing OOBs");
int count = 0;
long startTime = System.currentTimeMillis();
try {
for (AggregateNumericMetric metric : metrics) {
try {
- List<MeasurementBaseline> baselines =
entityManager.createQuery(
- "select baseline from MeasurementBaseline baseline where
baseline.schedule.id = :scheduleId")
- .setParameter("scheduleId", metric.getScheduleId())
- .getResultList();
- if (baselines.isEmpty()) {
- continue;
- }
- MeasurementBaseline baseline = baselines.get(0);
- Long upperDelta = null;
- Long lowerDelta = null;
-
- if (isPastUpperBound(baseline, metric)) {
- upperDelta =
- Math.round(((metric.getMax() - baseline.getMax()) /
(baseline.getMax() - baseline.getMin())) * 100);
- }
-
- if (isPastLowerBound(baseline, metric)) {
- lowerDelta =
- Math.round(((baseline.getMin() - metric.getMin()) /
(baseline.getMax() - baseline.getMin())) * 100);
- }
-
- Integer oobFactor;
- if (upperDelta != null && lowerDelta == null) {
- oobFactor = upperDelta.intValue();
- } else if (upperDelta == null && lowerDelta != null) {
- oobFactor = lowerDelta.intValue();
- } else if (upperDelta != null && lowerDelta != null) {
- if (upperDelta > lowerDelta) {
- oobFactor = upperDelta.intValue();
- } else {
- oobFactor = lowerDelta.intValue();
- }
- } else { // both are null
- oobFactor = null;
- }
-
- if (oobFactor != null) {
- MeasurementOOB oob = new MeasurementOOB();
- oob.setScheduleId(metric.getScheduleId());
- oob.setTimestamp(metric.getTimestamp());
- oob.setOobFactor(oobFactor);
- entityManager.merge(oob);
- ++count;
- }
+ count += oobManager.calculateOOB(metric);
} catch (Exception e) {
log.error("An error occurred while calculating OOBs for " +
metric, e);
throw new RuntimeException(e);
@@ -248,10 +210,66 @@ public class MeasurementOOBManagerBean implements
MeasurementOOBManagerLocal {
}
} finally {
long endTime = System.currentTimeMillis();
+ if (log.isInfoEnabled()) {
+ log.info("Finished calculating " + count + " OOBs in
" + (endTime - startTime) + " ms");
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW)
+ public int calculateOOB(AggregateNumericMetric metric) {
+ List<MeasurementBaseline> baselines = entityManager.createQuery(
+ "select baseline from MeasurementBaseline baseline where
baseline.schedule.id = :scheduleId")
+ .setParameter("scheduleId", metric.getScheduleId())
+ .getResultList();
+ if (baselines.isEmpty()) {
+ return 0;
+ }
+ MeasurementBaseline baseline = baselines.get(0);
+ Long upperDelta = null;
+ Long lowerDelta = null;
+
+ if (isPastUpperBound(baseline, metric)) {
+ upperDelta =
+ Math.round(((metric.getMax() - baseline.getMax()) / (baseline.getMax() -
baseline.getMin())) * 100);
+ }
+
+ if (isPastLowerBound(baseline, metric)) {
+ lowerDelta =
+ Math.round(((baseline.getMin() - metric.getMin()) / (baseline.getMax() -
baseline.getMin())) * 100);
+ }
+
+ Integer oobFactor;
+ if (upperDelta != null && lowerDelta == null) {
+ oobFactor = upperDelta.intValue();
+ } else if (upperDelta == null && lowerDelta != null) {
+ oobFactor = lowerDelta.intValue();
+ } else if (upperDelta != null && lowerDelta != null) {
+ if (upperDelta > lowerDelta) {
+ oobFactor = upperDelta.intValue();
+ } else {
+ oobFactor = lowerDelta.intValue();
+ }
+ } else { // both are null
+ oobFactor = null;
+ }
+
+ if (oobFactor != null) {
+ MeasurementOOB oob = new MeasurementOOB();
+ oob.setScheduleId(metric.getScheduleId());
+ oob.setTimestamp(metric.getTimestamp());
+ oob.setOobFactor(oobFactor);
+
if (log.isDebugEnabled()) {
- log.debug("Finished calculating " + count + " OOBs in
" + (endTime - startTime) + " ms");
+ log.debug("Generated OOB " + oob + " for 1 hr metric
" + metric + " with baseline " + baseline);
}
+
+ entityManager.merge(oob);
+ return 1;
}
+
+ return 0;
}
private boolean isPastUpperBound(MeasurementBaseline baseline, AggregateNumericMetric
metric) {
diff --git
a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java
index b452a28..853c5af 100644
---
a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java
+++
b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java
@@ -45,9 +45,30 @@ public interface MeasurementOOBManagerLocal {
*/
void computeOOBsFromHourBeginingAt(Subject subject, long begin);
+ /**
+ * Computes OOBs using the provided 1 hr data which should be the most recent 1 hr
+ * aggregates. These metrics are provided as an argument as opposed to querying for
+ * them because we already have the 1 hr aggregates loaded in memory when metrics
+ * aggregation runs prior to calculating OOBs.
+ *
+ * @param subject
+ * @param metrics The most recent 1 hr aggregates
+ */
void computeOOBsForLastHour(Subject subject, Iterable<AggregateNumericMetric>
metrics);
/**
+ * Determines and calculates an OOB if necessary. If an OOB is generated, this
method
+ * saves it to the database.
+ * <br/><br/>
+ * <strong>Note</strong> This method exists only for transaction
demarcation.
+ *
+ * @param metric The 1 hr metric that is used to determine whether or not an OOB
should
+ * be generated
+ * @return 1 if an OOB is generated, 0 otherwise
+ */
+ int calculateOOB(AggregateNumericMetric metric);
+
+ /**
* Return OOB Composites that contain all information about the OOBs in a given time
as aggregates.
* @param subject The caller
* @param metricNameFilter
diff --git
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java
index 3aa5441..ba228d9 100644
---
a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java
+++
b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java
@@ -113,6 +113,12 @@ public class MetricsBaselineCalculator {
return baseline;
}
- return null;
+ MeasurementBaseline baseline = new MeasurementBaseline();
+ baseline.setMax(Double.NaN);
+ baseline.setMin(Double.NaN);
+ baseline.setMean(Double.NaN);
+ baseline.setSchedule(schedule);
+
+ return baseline;
}
}