modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java | 74 ++++++ modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java | 38 +++ modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java | 108 +++++----- modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java | 21 + modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java | 10 modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java | 18 + 6 files changed, 210 insertions(+), 59 deletions(-)
New commits: commit 64735ee2aa844031c46f1da06fc2eedea00335c2 Author: John Sanda jsanda@redhat.com Date: Thu May 23 22:20:18 2013 -0400
updating logging
removing logging from MetricsServer.updateMetricsIndex method as it will result in a lot of output in the logs.
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java index ba228d9..6242df6 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java @@ -40,7 +40,7 @@ import org.rhq.server.metrics.domain.AggregateType; */ public class MetricsBaselineCalculator {
- private final Log log = LogFactory.getLog(MetricsServer.class); + private final Log log = LogFactory.getLog(MetricsBaselineCalculator.class);
private MetricsDAO metricsDAO;
diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java index 0e07d4d..d6d388b 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsServer.java @@ -26,8 +26,6 @@ package org.rhq.server.metrics;
import java.util.ArrayList; -import java.util.Date; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -234,28 +232,36 @@ public class MetricsServer { }
public void addNumericData(Set<MeasurementDataNumeric> dataSet) { + if (log.isDebugEnabled()) { + log.debug("Persisting " + dataSet.size() + " raw metrics"); + } + long startTime = System.currentTimeMillis(); + int count = 0; try { for (MeasurementDataNumeric data : dataSet) { dao.insertRawData(data); + ++count; } updateMetricsIndex(dataSet); } catch (Exception e) { log.error("An error occurred while inserting raw numeric data", e); throw new RuntimeException(e); + } finally { + long endTime = System.currentTimeMillis(); + if (log.isDebugEnabled()) { + log.debug("Persisted " + count + " raw metrics in " + (endTime - startTime) + " ms"); + } } }
void updateMetricsIndex(Set<MeasurementDataNumeric> rawMetrics) { + Map<Integer, Long> updates = new TreeMap<Integer, Long>(); for (MeasurementDataNumeric rawMetric : rawMetrics) { updates.put(rawMetric.getScheduleId(), dateTimeService.getTimeSlice( new DateTime(rawMetric.getTimestamp()), configuration.getRawTimeSliceDuration()).getMillis()); }
- if (log.isDebugEnabled()) { - log.debug("Updating one hour index with time slices " + StringUtil.collectionToString(updates.values())); - } - dao.updateMetricsIndex(MetricsTable.ONE_HOUR, updates); }
commit e8582b3995f749923255418277f9dff88e8125d4 Author: John Sanda jsanda@redhat.com Date: Wed May 22 20:20:35 2013 -0400
changing transaction demarcation for baseline and OOB calculations
The queries and methods in MeasurementBaselineManagerBean and in MeasurementOOBManagerBean were too coarse-grained. In a test environment, I was seeing regular timeouts while querying for schedules that do not yet have baselines, and the method that computes OOBs was experiencing frequent transaction timeouts. The transaction demarcation has been changed so that we do a small, fixed-size amount of work in each transaction when calculating baselines and OOBs. Calls to Cassandra have been moved to non-transactional methods.
Lastly, fixing a bug in MetricsBaselineCalculator to make sure it always returns a baseline for each schedule that it receives.
diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java index 8fb615c..37af9de 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerBean.java @@ -20,6 +20,8 @@ package org.rhq.enterprise.server.measurement;
import java.util.Arrays; import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Properties;
@@ -88,7 +90,8 @@ public class MeasurementBaselineManagerBean implements MeasurementBaselineManage private SessionManagerBean sessionManager;
private final Log log = LogFactory.getLog(MeasurementBaselineManagerBean.class); - private static final int BASELINE_PROCESSING_LIMIT = 50000; + + private static final int BASELINE_PROCESSING_LIMIT = 100;
@TransactionAttribute(TransactionAttributeType.NEVER) public void calculateAutoBaselines() { @@ -185,11 +188,14 @@ public class MeasurementBaselineManagerBean implements MeasurementBaselineManage * In any event, an appropriate chunking solution needs to be found, and that partitioning strategy * needs to replace the limits in the query today. */ - int schedulesWithoutBaselines = measurementBaselineManager - ._calculateAutoBaselinesINSERT(amountOfData); - totalProcessed += schedulesWithoutBaselines; - - if (schedulesWithoutBaselines < BASELINE_PROCESSING_LIMIT) { +// int schedulesWithoutBaselines = measurementBaselineManager +// ._calculateAutoBaselinesINSERT(amountOfData); + List<MeasurementSchedule> schedulesWithoutBaselines = + measurementBaselineManager.getSchedulesWithoutBaselines(); + measurementBaselineManager.calculateBaselines(schedulesWithoutBaselines, now, amountOfData); + totalProcessed += schedulesWithoutBaselines.size(); + + if (schedulesWithoutBaselines.size() < BASELINE_PROCESSING_LIMIT) { break; } } @@ -217,6 +223,62 @@ public class MeasurementBaselineManagerBean implements MeasurementBaselineManage
@SuppressWarnings("unchecked") @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) + public List<MeasurementSchedule> getSchedulesWithoutBaselines() { + Query query = this.entityManager + .createNamedQuery(MeasurementBaseline.QUERY_FIND_MEASUREMENT_SCHEDULES_WITHOUT_AUTOBASELINES); + query.setMaxResults(BASELINE_PROCESSING_LIMIT); + List<MeasurementSchedule> scheduleIdsWithoutBaselines = query.getResultList(); + + return scheduleIdsWithoutBaselines; + } + + @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) + public void calculateBaselines(List<MeasurementSchedule> schedules, long olderThan, long amountOfData) { + long endTime = olderThan; + long startTime = endTime - amountOfData; + + log.debug("Computing baselines for " + schedules.size() + " schedules"); + MetricsBaselineCalculator baselineCalculator = new MetricsBaselineCalculator(sessionManager.getMetricsDAO()); + long calcStartTime = System.currentTimeMillis(); + List<MeasurementBaseline> results = baselineCalculator.calculateBaselines(schedules, startTime, endTime); + long calcEndTime = System.currentTimeMillis(); + int count = results.size(); + + if (log.isDebugEnabled()) { + log.debug("Finished computing " + count + " new baselines in " + (calcEndTime - calcStartTime) + " ms"); + } + + log.debug("Persisting baselines calculations"); + long saveStartTime = System.currentTimeMillis(); + Iterator<MeasurementBaseline> iterator = results.iterator(); + List<MeasurementBaseline> queue = new LinkedList<MeasurementBaseline>(); + while (iterator.hasNext()) { + if (queue.size() == 10) { + measurementBaselineManager.saveNewBaselines(queue); + queue = new LinkedList<MeasurementBaseline>(); + } + queue.add(iterator.next()); + } + if (!queue.isEmpty()) { + measurementBaselineManager.saveNewBaselines(queue); + } + + long saveEndTime = System.currentTimeMillis(); + if (log.isDebugEnabled()) { + log.debug("Finished persisting " + count + " baselines in " + (saveEndTime - saveStartTime) + " ms"); + } + } 
+ + @Override + @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) + public void saveNewBaselines(List<MeasurementBaseline> baselines) { + for (MeasurementBaseline baseline : baselines) { + entityManager.merge(baseline); + } + } + + @SuppressWarnings("unchecked") + @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) public int _calculateAutoBaselinesINSERT(long amountOfData) throws Exception { long endTime = System.currentTimeMillis(); long startTime = endTime - amountOfData; diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java index 7f7655a..8781fc5 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementBaselineManagerLocal.java @@ -24,6 +24,7 @@ import javax.ejb.Local;
import org.rhq.core.domain.auth.Subject; import org.rhq.core.domain.measurement.MeasurementBaseline; +import org.rhq.core.domain.measurement.MeasurementSchedule; import org.rhq.core.domain.resource.Resource;
/** @@ -68,10 +69,47 @@ public interface MeasurementBaselineManagerLocal { * Inserts baselines "as appropriate" for measurements that have at least amountOfData * @param amountOfData will use amountOfData to compute new min/max/mean for baselines as appropriate * @return number of rows inserted + * @deprecated This method is pending removal after peer review. Because 1hr data is + * now stored in Cassandra, transaction boundaries needed to change. This method has + * been replaced by {@link #getSchedulesWithoutBaselines()}, {@link #calculateBaselines(java.util.List, long)}, + * and {@link #saveNewBaselines(java.util.List)}. * @throws Exception */ int _calculateAutoBaselinesINSERT(long amountOfData) throws Exception;
+ /** + * <strong>Note</strong> This method exists only for transaction demarcation. + * + * @return A list of schedules that do not have baselines. This list is not assumed + * to be an exhaustive list of schedules that lack a baseline. As such, this method + * will be called repeatedly during baseline calculations to get all of the necessary + * schedules. + */ + List<MeasurementSchedule> getSchedulesWithoutBaselines(); + + /** + * Given a list of schedules, this method calculates and stores baselines using the + * amount of 1 hr data specified and older than the time specified. + * <br/><br/> + * <strong>Note</strong> This method exists only for transaction demarcation. + * + * @param schedules The schedules that do not yet have baselines + * @param olderThan Use 1 hr data prior to this time + * @param amountOfData The amount of data to use for calculating baselines. This value + * is treated as a duration. For example, a value of 259200000 + * would be treated as 3 days. + */ + void calculateBaselines(List<MeasurementSchedule> schedules, long olderThan, long amountOfData); + + /** + * Persists the newly calculated baselines. + * <br/><br/> + * <strong>Note</strong> This method exists only for transaction demarcation. + * + * @param baselines The baselines to persist. + */ + void saveNewBaselines(List<MeasurementBaseline> baselines); + MeasurementBaseline getBaselineIfEqual(Subject subject, int groupId, int definitionId);
/** diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java index eecbbca..f742c7f 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerBean.java @@ -82,6 +82,9 @@ public class MeasurementOOBManagerBean implements MeasurementOOBManagerLocal { @EJB AuthorizationManagerLocal authMangager;
+ @EJB + MeasurementOOBManagerLocal oobManager; + /** * Compute oobs from the values in the 1h measurement table that just got added. * For the total result, this is an incremental computation. The idea is that @@ -191,56 +194,15 @@ public class MeasurementOOBManagerBean implements MeasurementOOBManagerLocal { }
@SuppressWarnings("unchecked") + @TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED) public void computeOOBsForLastHour(Subject subject, Iterable<AggregateNumericMetric> metrics) { + log.info("Computing OOBs"); int count = 0; long startTime = System.currentTimeMillis(); try { for (AggregateNumericMetric metric : metrics) { try { - List<MeasurementBaseline> baselines = entityManager.createQuery( - "select baseline from MeasurementBaseline baseline where baseline.schedule.id = :scheduleId") - .setParameter("scheduleId", metric.getScheduleId()) - .getResultList(); - if (baselines.isEmpty()) { - continue; - } - MeasurementBaseline baseline = baselines.get(0); - Long upperDelta = null; - Long lowerDelta = null; - - if (isPastUpperBound(baseline, metric)) { - upperDelta = - Math.round(((metric.getMax() - baseline.getMax()) / (baseline.getMax() - baseline.getMin())) * 100); - } - - if (isPastLowerBound(baseline, metric)) { - lowerDelta = - Math.round(((baseline.getMin() - metric.getMin()) / (baseline.getMax() - baseline.getMin())) * 100); - } - - Integer oobFactor; - if (upperDelta != null && lowerDelta == null) { - oobFactor = upperDelta.intValue(); - } else if (upperDelta == null && lowerDelta != null) { - oobFactor = lowerDelta.intValue(); - } else if (upperDelta != null && lowerDelta != null) { - if (upperDelta > lowerDelta) { - oobFactor = upperDelta.intValue(); - } else { - oobFactor = lowerDelta.intValue(); - } - } else { // both are null - oobFactor = null; - } - - if (oobFactor != null) { - MeasurementOOB oob = new MeasurementOOB(); - oob.setScheduleId(metric.getScheduleId()); - oob.setTimestamp(metric.getTimestamp()); - oob.setOobFactor(oobFactor); - entityManager.merge(oob); - ++count; - } + count += oobManager.calculateOOB(metric); } catch (Exception e) { log.error("An error occurred while calculating OOBs for " + metric, e); throw new RuntimeException(e); @@ -248,10 +210,66 @@ public class MeasurementOOBManagerBean implements 
MeasurementOOBManagerLocal { } } finally { long endTime = System.currentTimeMillis(); + if (log.isInfoEnabled()) { + log.info("Finished calculating " + count + " OOBs in " + (endTime - startTime) + " ms"); + } + } + } + + @SuppressWarnings("unchecked") + @TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW) + public int calculateOOB(AggregateNumericMetric metric) { + List<MeasurementBaseline> baselines = entityManager.createQuery( + "select baseline from MeasurementBaseline baseline where baseline.schedule.id = :scheduleId") + .setParameter("scheduleId", metric.getScheduleId()) + .getResultList(); + if (baselines.isEmpty()) { + return 0; + } + MeasurementBaseline baseline = baselines.get(0); + Long upperDelta = null; + Long lowerDelta = null; + + if (isPastUpperBound(baseline, metric)) { + upperDelta = + Math.round(((metric.getMax() - baseline.getMax()) / (baseline.getMax() - baseline.getMin())) * 100); + } + + if (isPastLowerBound(baseline, metric)) { + lowerDelta = + Math.round(((baseline.getMin() - metric.getMin()) / (baseline.getMax() - baseline.getMin())) * 100); + } + + Integer oobFactor; + if (upperDelta != null && lowerDelta == null) { + oobFactor = upperDelta.intValue(); + } else if (upperDelta == null && lowerDelta != null) { + oobFactor = lowerDelta.intValue(); + } else if (upperDelta != null && lowerDelta != null) { + if (upperDelta > lowerDelta) { + oobFactor = upperDelta.intValue(); + } else { + oobFactor = lowerDelta.intValue(); + } + } else { // both are null + oobFactor = null; + } + + if (oobFactor != null) { + MeasurementOOB oob = new MeasurementOOB(); + oob.setScheduleId(metric.getScheduleId()); + oob.setTimestamp(metric.getTimestamp()); + oob.setOobFactor(oobFactor); + if (log.isDebugEnabled()) { - log.debug("Finished calculating " + count + " OOBs in " + (endTime - startTime) + " ms"); + log.debug("Generated OOB " + oob + " for 1 hr metric " + metric + " with baseline " + baseline); } + + entityManager.merge(oob); + return 1; } 
+ + return 0; }
private boolean isPastUpperBound(MeasurementBaseline baseline, AggregateNumericMetric metric) { diff --git a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java index b452a28..853c5af 100644 --- a/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java +++ b/modules/enterprise/server/jar/src/main/java/org/rhq/enterprise/server/measurement/MeasurementOOBManagerLocal.java @@ -45,9 +45,30 @@ public interface MeasurementOOBManagerLocal { */ void computeOOBsFromHourBeginingAt(Subject subject, long begin);
+ /** + * Computes OOBs using the provided 1 hr data which should be the most recent 1 hr + * aggregates. These metrics are provided as an argument as opposed to querying for + * them because we already have the 1 hr aggregates load in memory when metrics + * aggregation runs prior to calculating OOBs. + * + * @param subject + * @param metrics The most recent 1 hr aggregates + */ void computeOOBsForLastHour(Subject subject, Iterable<AggregateNumericMetric> metrics);
/** + * Determines and calculates an OOB if necessary, If an OOB is generated, this method + * saves it to the database. + * <br/><br/> + * <strong>Note</strong> This method exists only for transaction demarcation. + * + * @param metric The 1 hr metric that is used to determine whether or not an OOB should + * be generated + * @return 1 if an OOB is generated, 0 otherwise + */ + int calculateOOB(AggregateNumericMetric metric); + + /** * Return OOB Composites that contain all information about the OOBs in a given time as aggregates. * @param subject The caller * @param metricNameFilter diff --git a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java index 3aa5441..ba228d9 100644 --- a/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java +++ b/modules/enterprise/server/server-metrics/src/main/java/org/rhq/server/metrics/MetricsBaselineCalculator.java @@ -113,6 +113,12 @@ public class MetricsBaselineCalculator { return baseline; }
- return null; + MeasurementBaseline baseline = new MeasurementBaseline(); + baseline.setMax(Double.NaN); + baseline.setMin(Double.NaN); + baseline.setMean(Double.NaN); + baseline.setSchedule(schedule); + + return baseline; } }