/**
 * Add a metric event to be aggregated.
 * Events with the same name, unit, and dimensions will have their values
 * aggregated into {@link StatisticSet}s, with the aggregated data
 * available via {@link #flush}.
 *
 * @param context Metric context to use for dimension information
 * @param name    Metric name
 * @param value   Recorded value for the metric event
 * @param unit    Unit for interpreting the value
 */
public void add(
        final TypedMap context,
        final Metric name,
        final double value,
        final StandardUnit unit) {
    //TODO: avoid doing this every time for a context - caching, or?
    List<Dimension> dimensions = dimensionMapper.getDimensions(name, context);

    DatumKey key = new DatumKey(name.toString(), unit, dimensions);
    statisticsMap.merge(
            key,
            new StatisticSet()
                    .withMaximum(value)
                    .withMinimum(value)
                    .withSampleCount(1D)
                    .withSum(value),
            MetricDataAggregator::sum);
}
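// The concurrency guarantees documented on flush() below rest on the map's
// atomic per-key merge. The field declaration is not shown in this snippet;
// a minimal sketch of what it presumably looks like (ConcurrentHashMap is an
// assumption here, but the merge must be atomic for concurrent add() calls
// to aggregate losslessly without external locking):
private final ConcurrentMap<DatumKey, StatisticSet> statisticsMap = new ConcurrentHashMap<>();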
/**
 * Flush all the current aggregated MetricDatum and return as a list.
 * This is safe to call concurrently with {@link #add}.
 * All data added prior to a flush call will be included in the returned aggregate.
 * Any data added after the flush call returns will be included in a subsequent flush.
 * Data added while a flush call is processing may be included in the current flush
 * or a subsequent flush, but will not be included twice.
 *
 * The timestamp on the aggregated data will be the time it was flushed,
 * not the time of any of the original metric events.
 *
 * @return list of all data aggregated since the last flush
 */
public List<MetricDatum> flush() {
    if (statisticsMap.isEmpty()) {
        return Collections.emptyList();
    }

    // Capture all the current metrics, as represented by the set of keys
    // at this time in the statisticsMap.
    // Note that this iterates over the key set of the underlying map, and
    // removes keys from the map at the same time. It is possible keys may
    // be added during this iteration, or data for keys modified between
    // a key being chosen for iteration and being removed from the map.
    // This is ok. Any new keys will be picked up on subsequent flushes.
    //TODO: use two maps and swap between, to ensure 'perfect' segmentation?
    List<MetricDatum> metricData = new ArrayList<>();
    for (DatumKey key : statisticsMap.keySet()) {
        StatisticSet value = statisticsMap.remove(key);
        //TODO: better to have no timestamp at all?
        MetricDatum metricDatum = key.getDatum()
                .withTimestamp(Date.from(Instant.now()))
                .withStatisticValues(value);
        metricData.add(metricDatum);
    }
    return metricData;
}
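// The "two maps and swap" TODO could look roughly like the sketch below.
// Everything here is illustrative, not from the source: the field name
// activeMap and the method name flushBySwap are hypothetical, and add()
// would have to merge into activeMap.get() rather than a fixed field. Note
// the remaining race: an add() that fetched the old map reference just
// before the swap can still merge into it while it is being drained, so
// truly 'perfect' segmentation would need further coordination (e.g. a
// read-write lock around the swap).
private final AtomicReference<ConcurrentHashMap<DatumKey, StatisticSet>> activeMap =
        new AtomicReference<>(new ConcurrentHashMap<>());

public List<MetricDatum> flushBySwap() {
    // Atomically replace the active map; adds that start after this line
    // land in the fresh map and are picked up by the next flush.
    ConcurrentHashMap<DatumKey, StatisticSet> drained =
            activeMap.getAndSet(new ConcurrentHashMap<>());
    List<MetricDatum> metricData = new ArrayList<>(drained.size());
    Date flushTime = Date.from(Instant.now());
    for (Map.Entry<DatumKey, StatisticSet> entry : drained.entrySet()) {
        metricData.add(entry.getKey().getDatum()
                .withTimestamp(flushTime)
                .withStatisticValues(entry.getValue()));
    }
    return metricData;
}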
private MetricDatum makeDatum(
        final String id,
        final String name,
        final double sum,
        final double min,
        final double max,
        final int count,
        final StandardUnit unit) {
    MetricDatum md = new MetricDatum().withMetricName(name).withUnit(unit);

    final StatisticSet statSet = new StatisticSet()
            .withSampleCount(Double.valueOf(count))
            .withSum(sum)
            .withMinimum(min)
            .withMaximum(max);
    md.setStatisticValues(statSet);

    List<Dimension> dimensions = new ArrayList<>(1);
    Dimension trace = new Dimension().withName(ContextData.ID.name).withValue(id);
    dimensions.add(trace);
    md.setDimensions(dimensions);

    return md;
}
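// Hypothetical call site for makeDatum, to show the intended shape of the
// arguments: five samples totalling 123.0, ranging 8.0 to 45.0, tagged with
// the caller's trace id. The variable names and values are illustrative.
MetricDatum datum = makeDatum(traceId, "Time", 123.0, 8.0, 45.0, 5, StandardUnit.Milliseconds);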
private void stageMetricDatumWithConvertedSnapshot(final boolean metricConfigured,
                                                   final String metricName,
                                                   final Snapshot snapshot,
                                                   final StandardUnit standardUnit,
                                                   final List<MetricDatum> metricData) {
    if (metricConfigured) {
        double scaledSum = convertDuration(LongStream.of(snapshot.getValues()).sum());
        final StatisticSet statisticSet = new StatisticSet()
                .withSum(scaledSum)
                .withSampleCount((double) snapshot.size())
                .withMinimum(convertDuration(snapshot.getMin()))
                .withMaximum(convertDuration(snapshot.getMax()));

        final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
        dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY));

        metricData.add(new MetricDatum()
                .withTimestamp(new Date(builder.clock.getTime()))
                .withMetricName(metricName)
                .withDimensions(dimensions)
                .withStatisticValues(statisticSet)
                .withUnit(standardUnit));
    }
}
private void stageMetricDatumWithRawSnapshot(final boolean metricConfigured,
                                             final String metricName,
                                             final Snapshot snapshot,
                                             final StandardUnit standardUnit,
                                             final List<MetricDatum> metricData) {
    if (metricConfigured) {
        double total = LongStream.of(snapshot.getValues()).sum();
        final StatisticSet statisticSet = new StatisticSet()
                .withSum(total)
                .withSampleCount((double) snapshot.size())
                .withMinimum((double) snapshot.getMin())
                .withMaximum((double) snapshot.getMax());

        final Set<Dimension> dimensions = new LinkedHashSet<>(builder.globalDimensions);
        dimensions.add(new Dimension().withName(DIMENSION_NAME_TYPE).withValue(DIMENSION_SNAPSHOT_SUMMARY));

        metricData.add(new MetricDatum()
                .withTimestamp(new Date(builder.clock.getTime()))
                .withMetricName(metricName)
                .withDimensions(dimensions)
                .withStatisticValues(statisticSet)
                .withUnit(standardUnit));
    }
}
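// The raw variant above is the converted variant minus the convertDuration
// calls, and both tag their output with the same DIMENSION_SNAPSHOT_SUMMARY
// type dimension, so the two are presumably distinguished by metric name
// and unit at the call site. A hypothetical pairing (names and units are
// illustrative, not from the source):
stageMetricDatumWithConvertedSnapshot(true, "requestLatency", snapshot, StandardUnit.Milliseconds, metricData);
stageMetricDatumWithRawSnapshot(true, "requestLatencyRaw", snapshot, StandardUnit.None, metricData);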
/**
 * @param entry        metric name/value pair whose snapshot will be reported
 * @param typeDimValue value for the type dimension on the emitted datums
 * @param rescale      multiplier applied to the snapshot's sum, minimum, and
 *                     maximum before submission; 1.0 is the identity (no rescale)
 * @param data         output list that the new datums are appended to
 */
void reportSampling(Map.Entry<String, ? extends Sampling> entry, String typeDimValue, double rescale, List<MetricDatum> data) {
    Sampling metric = entry.getValue();
    Snapshot snapshot = metric.getSnapshot();
    double scaledSum = sum(snapshot.getValues()) * rescale;
    final StatisticSet statisticSet = new StatisticSet()
            .withSum(scaledSum)
            .withSampleCount((double) snapshot.size())
            .withMinimum((double) snapshot.getMin() * rescale)
            .withMaximum((double) snapshot.getMax() * rescale);

    DemuxedKey key = new DemuxedKey(appendGlobalDimensions(entry.getKey()));
    Iterables.addAll(data, key.newDatums(typeDimName, typeDimValue, new Function<MetricDatum, MetricDatum>() {
        @Override
        public MetricDatum apply(MetricDatum datum) {
            return datum.withStatisticValues(statisticSet);
        }
    }));
}
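// Hypothetical call site: Dropwizard Timer snapshots are in nanoseconds, so
// submitting them as milliseconds would use rescale = 1e-6 (1 ns = 0.000001
// ms). The entry variable and type dimension value are illustrative.
reportSampling(timerEntry, "timer", 0.000001, data);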
private static StatisticSet sum(StatisticSet v1, StatisticSet v2) {
    //TODO: reuse one of the passed sets, and pollute a MetricDatum?
    StatisticSet stats = new StatisticSet();
    stats.setMaximum(Math.max(v1.getMaximum(), v2.getMaximum()));
    stats.setMinimum(Math.min(v1.getMinimum(), v2.getMinimum()));
    stats.setSampleCount(v1.getSampleCount() + v2.getSampleCount());
    stats.setSum(v1.getSum() + v2.getSum());
    return stats;
}
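// A worked example of the merge function that add() hands to Map.merge:
// two single-sample sets, for 3.0 and 5.0, combine into exactly the
// StatisticSet CloudWatch expects for two observations. Values here are
// illustrative.
StatisticSet a = new StatisticSet().withMaximum(3.0).withMinimum(3.0).withSampleCount(1D).withSum(3.0);
StatisticSet b = new StatisticSet().withMaximum(5.0).withMinimum(5.0).withSampleCount(1D).withSum(5.0);
StatisticSet merged = sum(a, b); // maximum 5.0, minimum 3.0, sampleCount 2.0, sum 8.0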
@Test
public void single() throws Exception {
    final DimensionMapper mapper = new DimensionMapper.Builder()
            .addGlobalDimension(ContextData.ID)
            .build();

    final Metric name = Metric.define("SomeMetric");
    final double value = 3.14;
    final StandardUnit unit = StandardUnit.Terabits;
    final TypedMap context = ContextData.withId(UUID.randomUUID().toString()).build();

    MetricDataAggregator aggregator = new MetricDataAggregator(mapper);
    aggregator.add(context, name, value, unit);

    List<MetricDatum> ags = aggregator.flush();

    assertEquals("One metric datum should aggregate to one entry", 1, ags.size());
    assertEquals("Metric datum has wrong name", name.toString(), ags.get(0).getMetricName());
    assertEquals("Metric datum has wrong unit", unit.toString(), ags.get(0).getUnit());

    StatisticSet stats = ags.get(0).getStatisticValues();
    assertEquals("Metric datum has wrong stats value", Double.valueOf(value), stats.getSum());
    assertEquals("Metric datum has wrong stats value", Double.valueOf(value), stats.getMinimum());
    assertEquals("Metric datum has wrong stats value", Double.valueOf(value), stats.getMaximum());
    assertEquals("Metric datum has wrong stats count", Double.valueOf(1), stats.getSampleCount());

    assertTrue("Flush with no data was non-empty", aggregator.flush().isEmpty());
}
private void reportTimer(String key, Collection<MetricDatum> data, Map.Entry<String, Timer> met) {
    Timer timer = met.getValue();
    Snapshot snapshot = timer.getSnapshot();

    if (reportAggregates) {
        reportAggregate(key, data, "count", null, timer.getCount());
        reportAggregate(key, data, "rate", "1minute", timer.getOneMinuteRate());
        reportAggregate(key, data, "rate", "5minute", timer.getFiveMinuteRate());
        reportAggregate(key, data, "rate", "15minute", timer.getFifteenMinuteRate());
        reportAggregate(key, data, "rate", "mean", timer.getMeanRate());
        reportSnapshot(data, snapshot, key);
    } else {
        // if no data, don't bother Amazon with it.
        if (snapshot.size() == 0) {
            return;
        }
        double sum = 0;
        for (double val : snapshot.getValues()) {
            sum += val;
        }

        // Metrics works in nanoseconds, which is not one of Amazon's favorites.
        double max = (double) TimeUnit.NANOSECONDS.toMicros(snapshot.getMax());
        double min = (double) TimeUnit.NANOSECONDS.toMicros(snapshot.getMin());
        double sumMicros = TimeUnit.NANOSECONDS.toMicros((long) sum);

        StatisticSet stats = new StatisticSet()
                .withMaximum(max)
                .withMinimum(min)
                .withSum(sumMicros)
                .withSampleCount((double) snapshot.getValues().length);

        if (LOG.isDebugEnabled()) {
            LOG.debug("timer {}: {}", met.getKey(), stats);
        }

        data.add(new MetricDatum().withMetricName(met.getKey())
                .withDimensions(dimensions)
                .withStatisticValues(stats)
                .withUnit(StandardUnit.Microseconds));
    }
}
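// CloudWatch's StandardUnit has no nanoseconds member, hence the microsecond
// conversion above. Note that TimeUnit.NANOSECONDS.toMicros truncates toward
// zero, whereas plain division keeps the fractional part; for a max of
// 1,500 ns:
double truncated = (double) TimeUnit.NANOSECONDS.toMicros(1500L); // 1.0
double fractional = 1500L / 1000.0d;                              // 1.5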
private void reportHistogram(Collection<MetricDatum> data, Map.Entry<String, Histogram> meh) {
    Snapshot snapshot = meh.getValue().getSnapshot();

    if (reportAggregates) {
        String key = meh.getKey();
        reportSnapshot(data, snapshot, key);
    } else {
        // if no data, don't bother Amazon with it.
        if (snapshot.size() == 0) {
            return;
        }
        double sum = 0;
        for (double val : snapshot.getValues()) {
            sum += val;
        }

        StatisticSet stats = new StatisticSet()
                .withMaximum((double) snapshot.getMax())
                .withMinimum((double) snapshot.getMin())
                .withSum(sum)
                .withSampleCount((double) snapshot.getValues().length);

        if (LOG.isDebugEnabled()) {
            LOG.debug("histogram {}: {}", meh.getKey(), stats);
        }

        data.add(new MetricDatum().withMetricName(meh.getKey())
                .withDimensions(dimensions)
                .withStatisticValues(stats));
    }
}
/**
 * Collect the aggregated values (min, max, count, sum) into a
 * StatisticSet and send the data to CloudWatch.
 */
@Override
public void run() {
    PutMetricDataRequest localPutMetricDataRequest = zeroValuePutMetricDataRequest;
    MetricDatum metricDatum = localPutMetricDataRequest.getMetricData().get(0);

    if (sampleCount > 0) {
        localPutMetricDataRequest = putMetricDataRequest;
        metricDatum = localPutMetricDataRequest.getMetricData().get(0);
        StatisticSet statisticSet = metricDatum.getStatisticValues();
        synchronized (lock) {
            statisticSet.setMaximum(maximum);
            statisticSet.setMinimum(minimum);
            statisticSet.setSampleCount(sampleCount);
            statisticSet.setSum(sum);
            // Reset the aggregate for the next interval. Note Double.MIN_VALUE
            // is the smallest *positive* double, so it is not a valid "no
            // maximum yet" sentinel; -Double.MAX_VALUE is.
            minimum = Double.MAX_VALUE;
            maximum = -Double.MAX_VALUE;
            sampleCount = 0;
            sum = 0;
        }
    }

    metricDatum.setTimestamp(new Date());
    if (log.isDebugEnabled()) {
        log.debug("sending " + localPutMetricDataRequest);
    }
    cloudWatchClient.putMetricData(localPutMetricDataRequest);
}
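// The producer side is not shown in this snippet; given the fields that
// run() snapshots and resets, it presumably looks like this hypothetical
// sketch, taking the same lock so a report never sees a half-updated
// aggregate:
void record(double value) {
    synchronized (lock) {
        minimum = Math.min(minimum, value);
        maximum = Math.max(maximum, value);
        sampleCount++;
        sum += value;
    }
}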