public KafkaUDPConsumer(
    Stage.Context context,
    UDPConfigBean udpConfigBean,
    KafkaTargetConfig kafkaTargetConfig,
    BlockingQueue<Exception> errorQueue
) {
  this.udpConfigBean = udpConfigBean;
  this.kafkaTargetConfig = kafkaTargetConfig;
  this.errorQueue = errorQueue;
  acceptedPackagesMeter = context.createMeter("acceptedPackages");
  discardedPackagesMeter = context.createMeter("discardedPackages");
  errorPackagesMeter = context.createMeter("errorPackages");
  udpTimer = context.createTimer("udp");
  kafkaTimer = context.createTimer("kafka");
  kafkaMessagesMeter = context.createMeter("kafkaMessages");
  // context does not have a createHistogram(); TODO: open a JIRA for that
  concurrencyHistogram = new Histogram(new ExponentiallyDecayingReservoir());
  context
      .getMetrics()
      .register(
          "custom." + context.getPipelineInfo().get(0).getInstanceName() + ".concurrentPackages.histogram",
          concurrencyHistogram);
}
@Override
public List<Stage.ConfigIssue> init(Stage.Context context) {
  List<Stage.ConfigIssue> issues = new ArrayList<>();
  kafkaTimer = context.createTimer("kafka");
  kafkaMessagesMeter = context.createMeter("kafkaMessages");
  // TODO: change to use API-66 when API-66 is done.
  concurrencyHistogram = new Histogram(new ExponentiallyDecayingReservoir());
  context
      .getMetrics()
      .register(
          "custom." + context.getPipelineInfo().get(0).getInstanceName() + ".concurrentRequests.histogram",
          concurrencyHistogram);
  try {
    kafkaProducerPool = createKafkaProducerPool();
  } catch (Exception ex) {
    // TODO: swallowing the exception hides pool-creation failures from the user;
    // it should be reported as a ConfigIssue on the returned list instead.
  }
  return issues;
}
@Override
public void run(
    SourcePipe originPipe,
    List<PipeRunner> pipes,
    BadRecordsHandler badRecordsHandler,
    List<StageOutput> stageOutputsToOverride,
    StatsAggregationHandler statsAggregationHandler
) throws StageException, PipelineRuntimeException {
  this.originPipe = originPipe;
  this.pipes = pipes;
  this.badRecordsHandler = badRecordsHandler;
  this.statsAggregationHandler = statsAggregationHandler;
  this.runnerPool = new RunnerPool<>(pipes, new RuntimeStats(), new Histogram(new ExponentiallyDecayingReservoir()));

  stagesToSkip = new HashMap<>();
  for (StageOutput stageOutput : stageOutputsToOverride) {
    stagesToSkip.put(stageOutput.getInstanceName(), stageOutput);
  }

  if (originPipe.getStage().getStage() instanceof PushSource) {
    runPushSource();
  } else {
    runPollSource();
  }
}
@Override
@SuppressWarnings("unchecked")
public <T> T create(final Class<T> type) {
  if (type.equals(Meter.class)) {
    return (T) new Meter();
  }
  if (type.equals(Counter.class)) {
    return (T) new Counter();
  }
  if (type.equals(Histogram.class)) {
    return (T) new Histogram(new ExponentiallyDecayingReservoir());
  }
  return null;
}
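For context, a minimal self-contained sketch of how a type-keyed factory like the one above can be driven; the MetricFactory interface and MetricFactoryDemo class names here are hypothetical stand-ins, not part of the original code:

import com.codahale.metrics.Counter;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;

// Hypothetical stand-in for the factory interface the snippet above implements.
interface MetricFactory {
  <T> T create(Class<T> type);
}

public class MetricFactoryDemo {
  public static void main(String[] args) {
    MetricFactory factory = new MetricFactory() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T create(Class<T> type) {
        if (type.equals(Meter.class)) {
          return (T) new Meter();
        }
        if (type.equals(Counter.class)) {
          return (T) new Counter();
        }
        if (type.equals(Histogram.class)) {
          // Histograms need an explicit reservoir; EDR is the common default.
          return (T) new Histogram(new ExponentiallyDecayingReservoir());
        }
        return null;
      }
    };

    Histogram histogram = factory.create(Histogram.class);
    histogram.update(42L);
    System.out.println("count = " + histogram.getCount()); // prints: count = 1
  }
}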
@Test
public void testFromHistogram() {
  final Set<String> fieldKeys = ImmutableSet.of(
      "count",
      "min",
      "max",
      "mean",
      "std-dev",
      "50-percentile",
      "75-percentile",
      "95-percentile",
      "99-percentile",
      "999-percentile",
      "run-count"
  );

  final DropwizardMeasurementParser parser = mock(DropwizardMeasurementParser.class);
  final DropwizardTransformer transformer = transformerWithParser(parser, true);

  when(parser.parse("some.metric.name")).thenReturn(
      DropwizardMeasurement.create("Measurement", MEASUREMENT_TAGS, Optional.empty())
  );

  final Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir());
  histogram.update(15L);
  histogram.update(70L);
  histogram.update(100L);

  final InfluxDbMeasurement measurement = transformer.fromHistogram("some.metric.name", histogram, 90210L);
  assertEquals("should parse name from full metric key", "Measurement", measurement.name());
  assertEquals("should add global and measurement tags", ALL_TAGS, measurement.tags());
  assertEquals("should timestamp measurement", 90210L, measurement.timestamp());
  assertEquals("should add all histogram fields", fieldKeys, measurement.fields().keySet());
}
public CompositeForwardingHistogram(Histogram mainDelegate, MetricProvider<Histogram> supplementaryMetricProvider) {
  super(new ExponentiallyDecayingReservoir());
  Preconditions.checkNotNull(mainDelegate);
  Preconditions.checkNotNull(supplementaryMetricProvider);
  this.mainDelegate = mainDelegate;
  this.supplementaryMetricProvider = supplementaryMetricProvider;
}
public ForwardingReadOnlyHistogram(final Histogram delegate) {
  super(new ExponentiallyDecayingReservoir());
  Preconditions.checkNotNull(delegate);
  this.metricProvider = new MetricProvider<Histogram>() {
    @Override
    public Histogram get() {
      return delegate;
    }
  };
}
@Test
public void testCreateTimer() throws Exception {
  final ExponentiallyDecayingTimingStrategy strategy = new ExponentiallyDecayingTimingStrategy();
  final Timer timer = strategy.createTimer(new MapConfig());
  ExponentiallyDecayingReservoir reservoir = reservoir(timer);
  assertNotNull(reservoir);
  assertEquals("decay", strategy.name());
}
ContextAwareHistogram(MetricContext context, String name) {
  super(new ExponentiallyDecayingReservoir());
  this.name = name;
  this.context = context;
  this.tagged = new Tagged();

  Optional<MetricContext> parentContext = context.getParent();
  if (parentContext.isPresent()) {
    this.parentHistogram = Optional.fromNullable(parentContext.get().contextAwareHistogram(name));
  } else {
    this.parentHistogram = Optional.absent();
  }
}
/**
 * Get the reservoir implementation based on the reservoir type.
 *
 * @return The {@link Reservoir} implementation
 */
private Reservoir getReservoir() {
  // The Reservoir implementation is selected using a switch statement.
  // The ReservoirType enum is part of the YAML configuration,
  // and foreign imports are not supported by the Carbon Configuration Maven Plugin.
  // Therefore, the Reservoir class cannot be imported and the Reservoir
  // creation logic cannot be written inside the ReservoirType enum.
  switch (reservoirType) {
    case EXPONENTIALLY_DECAYING:
      return new ExponentiallyDecayingReservoir();
    case UNIFORM:
      return new UniformReservoir(reservoirParametersConfig.getSize());
    case SLIDING_WINDOW:
      return new SlidingWindowReservoir(reservoirParametersConfig.getSize());
    case SLIDING_TIME_WINDOW:
      return new SlidingTimeWindowReservoir(reservoirParametersConfig.getWindow(),
          reservoirParametersConfig.getWindowUnit());
    case HDR_HISTOGRAM:
      Recorder recorder = new Recorder(reservoirParametersConfig.getNumberOfSignificantValueDigits());
      if (reservoirParametersConfig.isResetOnSnapshot()) {
        return new HdrHistogramResetOnSnapshotReservoir(recorder);
      } else {
        return new HdrHistogramReservoir(recorder);
      }
    default:
      throw new RuntimeException("Invalid Reservoir Type");
  }
}
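A minimal, self-contained sketch of what the standard Dropwizard branches above produce, wiring each reservoir flavor into a Histogram; the sizes and window length here are arbitrary illustration values, not the configuration defaults (the HDR branches need the third-party HdrHistogram reservoir classes and are omitted):

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.SlidingTimeWindowReservoir;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.UniformReservoir;

public class ReservoirChoices {
  public static void main(String[] args) {
    // Forward-biased: recent samples dominate the snapshot.
    Reservoir decaying = new ExponentiallyDecayingReservoir();
    // Uniform over the whole run, bounded memory (size is arbitrary here).
    Reservoir uniform = new UniformReservoir(1028);
    // Exactly the last N samples.
    Reservoir window = new SlidingWindowReservoir(1028);
    // All samples from the last 60 seconds.
    Reservoir timed = new SlidingTimeWindowReservoir(60, TimeUnit.SECONDS);

    Histogram histogram = new Histogram(decaying);
    histogram.update(100L);
    System.out.println(histogram.getSnapshot().getMedian());
  }
}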
public SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
  this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
  this.percentiles = percentiles;
  this.gauges = this.percentiles.stream()
      .filter(x -> x > 0 && x <= 100)
      .collect(Collectors.toMap(
          Function.identity(),
          // Name each gauge after its percentile; the original String.valueOf(0)
          // registered every gauge under the same "name_0", so they collided.
          x -> registry.newGauge(group, name + "_" + String.valueOf(x), 0D)));
}
SamzaHistogram(MetricsRegistry registry, String group, String name, List<Double> percentiles) {
  this.registry = registry;
  this.histogram = new Histogram(new ExponentiallyDecayingReservoir());
  this.percentiles = percentiles;
  this.gauges = percentiles.stream()
      .filter(x -> x > 0 && x <= 100)
      .collect(Collectors.toMap(
          Function.identity(),
          // As above: the gauge name must include the percentile, not a constant 0.
          x -> this.registry.newGauge(group, name + "_" + String.valueOf(x), 0D)));
}
public static Histogram createHistogram5Min(MetricRegistry metrics, String name, final String pipelineName, final String pipelineRev) {
  return create(
      metrics,
      new Histogram(new ExponentiallyDecayingReservoir()),
      metricName(name, HISTOGRAM_M5_SUFFIX),
      pipelineName,
      pipelineRev
  );
}
@Before
public void createRunnerPool() {
  this.runnerPool = new RunnerPool<>(
      ImmutableList.of("a", "b"),
      new RuntimeStats(),
      new Histogram(new ExponentiallyDecayingReservoir())
  );
}
public void receiveTiming(InetAddress host, long latency) // this is cheap
{
  ExponentiallyDecayingReservoir sample = samples.get(host);
  if (sample == null) {
    ExponentiallyDecayingReservoir maybeNewSample = new ExponentiallyDecayingReservoir(WINDOW_SIZE, ALPHA);
    sample = samples.putIfAbsent(host, maybeNewSample);
    if (sample == null)
      sample = maybeNewSample;
  }
  sample.update(latency);
}
public List<Double> dumpTimings(String hostname) throws UnknownHostException {
  InetAddress host = InetAddress.getByName(hostname);
  ArrayList<Double> timings = new ArrayList<>();
  ExponentiallyDecayingReservoir sample = samples.get(host);
  if (sample != null) {
    for (double time : sample.getSnapshot().getValues())
      timings.add(time);
  }
  return timings;
}
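The get/putIfAbsent dance in receiveTiming above is the standard lock-free way to lazily create one reservoir per key. A self-contained sketch of the same pattern, with String keys and illustrative WINDOW_SIZE/ALPHA constants standing in for the class's real values:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import com.codahale.metrics.ExponentiallyDecayingReservoir;

public class PerKeyReservoirs {
  // Illustrative values; the real WINDOW_SIZE / ALPHA live in the snippet's class.
  private static final int WINDOW_SIZE = 100;
  private static final double ALPHA = 0.75;

  private final ConcurrentMap<String, ExponentiallyDecayingReservoir> samples =
      new ConcurrentHashMap<>();

  public void receiveTiming(String key, long latency) {
    ExponentiallyDecayingReservoir sample = samples.get(key);
    if (sample == null) {
      // Two threads may race to create a reservoir; putIfAbsent ensures
      // exactly one wins and both end up updating the same instance.
      ExponentiallyDecayingReservoir maybeNewSample =
          new ExponentiallyDecayingReservoir(WINDOW_SIZE, ALPHA);
      sample = samples.putIfAbsent(key, maybeNewSample);
      if (sample == null) {
        sample = maybeNewSample;
      }
    }
    sample.update(latency);
  }

  public static void main(String[] args) {
    PerKeyReservoirs reservoirs = new PerKeyReservoirs();
    reservoirs.receiveTiming("host-1", 12L);
    reservoirs.receiveTiming("host-1", 30L);
    System.out.println(reservoirs.samples.get("host-1").getSnapshot().getMedian());
  }
}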
@Test
public void testHistogramWithMetricsEnabled() {
  HistogramAdapter histogramAdapter =
      new HistogramAdapter(new com.codahale.metrics.Histogram(new ExponentiallyDecayingReservoir()), true);
  for (int i = 1; i <= 9; i++) {
    histogramAdapter.update(i);
  }
  assertEquals(9, histogramAdapter.getCount());
  histogramAdapter.update(10L);
  assertEquals(10, histogramAdapter.getCount());

  Snapshot snapshot = histogramAdapter.getSnapshot();
  assertEquals(8.0, snapshot.get75thPercentile(), 0);
  assertEquals(10, snapshot.get95thPercentile(), 0);
  assertEquals(10, snapshot.get99thPercentile(), 0);
  assertEquals(10, snapshot.get999thPercentile(), 0);
  assertEquals(10, snapshot.getMax());
  assertEquals(5.5, snapshot.getMean(), .001);
  assertEquals(6.0, snapshot.getMedian(), 0);
  assertEquals(1, snapshot.getMin());
  assertEquals(2.872, snapshot.getStdDev(), .001);
  assertEquals(6.0, snapshot.getValue(.5), 0);
  assertArrayEquals(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, snapshot.getValues());
}
@Test
public void testHistogramWithMetricsDisabled() {
  HistogramAdapter histogramAdapter =
      new HistogramAdapter(new com.codahale.metrics.Histogram(new ExponentiallyDecayingReservoir()), false);
  for (int i = 1; i <= 9; i++) {
    histogramAdapter.update(i);
  }
  assertEquals(0, histogramAdapter.getCount());
  histogramAdapter.update(10L);
  assertEquals(0, histogramAdapter.getCount());

  Snapshot snapshot = histogramAdapter.getSnapshot();
  assertEquals(0, snapshot.get75thPercentile(), 0);
  assertEquals(0, snapshot.get95thPercentile(), 0);
  assertEquals(0, snapshot.get99thPercentile(), 0);
  assertEquals(0, snapshot.get999thPercentile(), 0);
  assertEquals(0, snapshot.getMax());
  assertEquals(0, snapshot.getMean(), 0);
  assertEquals(0, snapshot.getMedian(), 0);
  assertEquals(0, snapshot.getMin());
  assertEquals(0, snapshot.getStdDev(), 0);
  assertEquals(0, snapshot.getValue(.5), 0);
  assertArrayEquals(new long[]{}, snapshot.getValues());
}
protected Histogram histogram(String... names) {
  try {
    return registry.histogram(nameOf(names));
  } catch (Exception e) {
    // Fall back to an unregistered histogram so callers always get a usable metric.
    return new Histogram(new ExponentiallyDecayingReservoir());
  }
}
InnerHistogram(MetricContext context, String name, ContextAwareHistogram contextAwareHistogram) {
  super(new ExponentiallyDecayingReservoir());
  this.name = name;

  Optional<MetricContext> parentContext = context.getParent();
  if (parentContext.isPresent()) {
    this.parentHistogram = Optional.fromNullable(parentContext.get().contextAwareHistogram(name));
  } else {
    this.parentHistogram = Optional.absent();
  }

  this.contextAwareHistogram = new WeakReference<>(contextAwareHistogram);
}
protected void runAlgorithm(long[] numbers) {
  reservoir = new ExponentiallyDecayingReservoir();
  for (int i = 0; i < numbers.length; i++) {
    reservoir.update(numbers[i]);
  }
  snapshot = reservoir.getSnapshot();
}
@Test
public void testBuilder() {
  final MetricRegistry metricRegistry = Mockito.mock(MetricRegistry.class);
  DataSource dataSource = Mockito.mock(DataSource.class);
  PoolAdapterFactory<DataSource> poolAdapterFactory = Mockito.mock(PoolAdapterFactory.class);
  ConnectionProxyFactory connectionProxyFactory = Mockito.mock(ConnectionProxyFactory.class);
  Metrics metrics = Mockito.mock(Metrics.class);
  PoolAdapter poolAdapter = Mockito.mock(PoolAdapter.class);

  when(poolAdapterFactory.newInstance(any(ConfigurationProperties.class))).thenReturn(poolAdapter);

  Configuration<DataSource> configuration = new Configuration.Builder<DataSource>(
      "unique", dataSource, poolAdapterFactory)
      .setConnectionProxyFactory(connectionProxyFactory)
      .setJmxAutoStart(true)
      .setJmxEnabled(true)
      .setMetricLogReporterMillis(120)
      .setMetricsFactory(new MetricsFactory() {
        @Override
        public Metrics newInstance(ConfigurationProperties configurationProperties) {
          return new DropwizardMetrics(configurationProperties, metricRegistry, new ReservoirFactory() {
            @Override
            public Reservoir newInstance(Class<? extends Metric> metricClass, String metricName) {
              return new ExponentiallyDecayingReservoir();
            }
          });
        }
      })
      .build();

  assertSame("unique", configuration.getUniqueName());
  assertSame(connectionProxyFactory, configuration.getConnectionProxyFactory());
  assertTrue(configuration.isJmxAutoStart());
  assertTrue(configuration.isJmxEnabled());
  assertEquals(120, configuration.getMetricLogReporterMillis());
  assertSame(poolAdapter, configuration.getPoolAdapter());
  assertSame(dataSource, configuration.getTargetDataSource());
}
@Test
public void testBuilder() {
  final MetricRegistry metricRegistry = Mockito.mock(MetricRegistry.class);
  DataSource dataSource = Mockito.mock(DataSource.class);
  PoolAdapterFactory<DataSource> poolAdapterFactory = Mockito.mock(PoolAdapterFactory.class);
  ConnectionProxyFactory connectionProxyFactory = Mockito.mock(ConnectionProxyFactory.class);
  Metrics metrics = Mockito.mock(Metrics.class);
  PoolAdapter poolAdapter = Mockito.mock(PoolAdapter.class);

  when(poolAdapterFactory.newInstance(any(ConfigurationProperties.class))).thenReturn(poolAdapter);

  Configuration<DataSource> configuration = new Configuration.Builder<DataSource>(
      "unique", dataSource, poolAdapterFactory)
      .setConnectionProxyFactory(connectionProxyFactory)
      .setJmxAutoStart(true)
      .setJmxEnabled(true)
      .setMetricLogReporterMillis(120)
      .setMetricsFactory(new MetricsFactory() {
        @Override
        public Metrics newInstance(ConfigurationProperties configurationProperties) {
          return new CodahaleMetrics(configurationProperties, metricRegistry, new ReservoirFactory() {
            @Override
            public Reservoir newInstance(Class<? extends Metric> metricClass, String metricName) {
              return new ExponentiallyDecayingReservoir();
            }
          });
        }
      })
      .build();

  assertSame("unique", configuration.getUniqueName());
  assertSame(connectionProxyFactory, configuration.getConnectionProxyFactory());
  assertTrue(configuration.isJmxAutoStart());
  assertTrue(configuration.isJmxEnabled());
  assertEquals(120, configuration.getMetricLogReporterMillis());
  assertSame(poolAdapter, configuration.getPoolAdapter());
  assertSame(dataSource, configuration.getTargetDataSource());
}
public static void main(String[] args) throws Exception {
  Random random = new Random(System.nanoTime());

  Config config = new Config.Builder()
      .directory(TestFileHelper.TEMP_PATH)
      .compactionStrategy(CompactionStrategies.SIZE_TIERED_COMPACTION_STRATEGY)
      .tableCacheSize(512000000)
      .indexCacheSize(64000000)
      .maxWriteRate(Integer.MAX_VALUE)
      .build();

  MetricRegistry metrics = new MetricRegistry();
  ConsoleReporter reporter = PerformanceHelper.consoleReporter(metrics);
  Timer readTimer = metrics.register("reads", new Timer(new ExponentiallyDecayingReservoir()));

  DB db = HeftyDB.open(config);
  db.compact().get();

  // Read
  for (int i = 0; i < RECORD_COUNT * 10; i++) {
    String key = random.nextInt(RECORD_COUNT) + "";
    Timer.Context watch = readTimer.time();
    db.get(ByteBuffers.fromString(key));
    watch.stop();
  }

  reporter.report();
  db.logMetrics();
  db.close();
  System.exit(0);
}
public TracedHistogram(String metricName) {
  super(new ExponentiallyDecayingReservoir());
  this.metricName = metricName;
}
@Test(expected = UnsupportedOperationException.class)
public void testHistogramSupplier() throws Exception {
  registry.histogram("HXX", () -> new Histogram(new ExponentiallyDecayingReservoir()));
}
@Test
public void testReport() {
  final Sender sender = mock(Sender.class);
  final DropwizardTransformer transformer = mock(DropwizardTransformer.class);
  final InfluxDbMeasurementReporter reporter = new InfluxDbMeasurementReporter(
      sender,
      new MetricRegistry(),
      MetricFilter.ALL,
      TimeUnit.SECONDS,
      TimeUnit.MILLISECONDS,
      clock,
      transformer
  );

  final long timestamp = clock.instant().toEpochMilli();

  final SortedMap<String, Gauge> gauges = ImmutableSortedMap.of("some", () -> 5);
  final SortedMap<String, Counter> counters = ImmutableSortedMap.of("more", new Counter());
  final SortedMap<String, Histogram> histograms =
      ImmutableSortedMap.of("metrics", new Histogram(new ExponentiallyDecayingReservoir()));
  final SortedMap<String, Meter> meters = ImmutableSortedMap.of("for", new Meter());
  final SortedMap<String, Timer> timers = ImmutableSortedMap.of("for", new Timer());

  final List<InfluxDbMeasurement> expectedMeasurements = Stream
      .of("gauge", "counter", "histogram", "meter", "timer")
      .map(name -> InfluxDbMeasurement.create(
          "some",
          ImmutableMap.of(),
          ImmutableMap.of(name, "stuff"),
          timestamp
      ))
      .collect(toList());

  when(transformer.fromGauges(gauges, timestamp)).thenReturn(ImmutableList.of(expectedMeasurements.get(0)));
  when(transformer.fromCounters(counters, timestamp)).thenReturn(ImmutableList.of(expectedMeasurements.get(1)));
  when(transformer.fromHistograms(histograms, timestamp)).thenReturn(ImmutableList.of(expectedMeasurements.get(2)));
  when(transformer.fromMeters(meters, timestamp)).thenReturn(ImmutableList.of(expectedMeasurements.get(3)));
  when(transformer.fromTimers(timers, timestamp)).thenReturn(ImmutableList.of(expectedMeasurements.get(4)));

  reporter.report(gauges, counters, histograms, meters, timers);
  verify(sender).send(expectedMeasurements);
}
@Override
public Histogram newMetric() {
  return new Histogram(new ExponentiallyDecayingReservoir());
}
public ForwardingReadOnlyHistogram(MetricProvider<Histogram> metricProvider) {
  super(new ExponentiallyDecayingReservoir());
  Preconditions.checkNotNull(metricProvider);
  this.metricProvider = metricProvider;
}
@Override
public Histogram getHistogram(String topLevelName, String... additionalNames) {
  return new Histogram(new ExponentiallyDecayingReservoir());
}
@Override
public Timer createTimer(Config config) {
  return new Timer(new ExponentiallyDecayingReservoir(
      config.integerValue(SIZE_PROP).orElse(DEFAULT_SIZE),
      config.doubleValue(ALPHA_PROP).orElse(DEFAULT_ALPHA)));
}
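The two-argument ExponentiallyDecayingReservoir constructor used above takes the sample size and the decay factor alpha. A small standalone sketch assuming Dropwizard's stock defaults (1028 samples, alpha 0.015, which biases snapshots toward roughly the last five minutes of data):

import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Timer;

public class ConfigurableTimerDemo {
  public static void main(String[] args) {
    // Same values Dropwizard's no-arg constructor uses: 1028 samples, alpha 0.015.
    int size = 1028;
    double alpha = 0.015;

    Timer timer = new Timer(new ExponentiallyDecayingReservoir(size, alpha));
    try (Timer.Context ignored = timer.time()) {
      // timed work goes here
    }
    System.out.println("events timed: " + timer.getCount());
  }
}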
public TimerAlaCoda() {
  this(new ExponentiallyDecayingReservoir());
}
@Override
public Histogram histogram(String name) {
  Histogram existed = (Histogram) getMetrics().get(name);
  if (existed != null) {
    return existed;
  }
  // Note: this check-then-register sequence is not atomic; concurrent callers
  // could race to register the same name.
  return register(name, new HistogramAlaCoda(new ExponentiallyDecayingReservoir()));
}
@Test
public void testReportMetrics() {
  Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
    @Override
    public Integer getValue() {
      return 1000;
    }
  };

  Counter recordsProcessedCounter = new Counter();
  recordsProcessedCounter.inc(10L);

  Histogram recordSizeDistributionHistogram = new Histogram(new ExponentiallyDecayingReservoir());
  recordSizeDistributionHistogram.update(1);
  recordSizeDistributionHistogram.update(2);
  recordSizeDistributionHistogram.update(3);

  Meter recordProcessRateMeter = new Meter();
  recordProcessRateMeter.mark(1L);
  recordProcessRateMeter.mark(2L);
  recordProcessRateMeter.mark(3L);

  Timer totalDurationTimer = new Timer();
  totalDurationTimer.update(1, TimeUnit.SECONDS);
  totalDurationTimer.update(2, TimeUnit.SECONDS);
  totalDurationTimer.update(3, TimeUnit.SECONDS);

  SortedMap<String, Counter> counters = ImmutableSortedMap.<String, Counter>naturalOrder()
      .put(RECORDS_PROCESSED, recordsProcessedCounter).build();
  SortedMap<String, Gauge> gauges = ImmutableSortedMap.<String, Gauge>naturalOrder()
      .put(QUEUE_SIZE, queueSizeGauge).build();
  SortedMap<String, Histogram> histograms = ImmutableSortedMap.<String, Histogram>naturalOrder()
      .put(RECORD_SIZE_DISTRIBUTION, recordSizeDistributionHistogram).build();
  SortedMap<String, Meter> meters = ImmutableSortedMap.<String, Meter>naturalOrder()
      .put(RECORD_PROCESS_RATE, recordProcessRateMeter).build();
  SortedMap<String, Timer> timers = ImmutableSortedMap.<String, Timer>naturalOrder()
      .put(TOTAL_DURATION, totalDurationTimer).build();

  this.hadoopCounterReporter.report(gauges, counters, histograms, meters, timers);

  Mockito.verify(this.recordsProcessedCount).increment(10L);
  Mockito.verify(this.recordProcessRateCount).increment(6L);
  Mockito.verify(this.recordSizeDistributionCount).increment(3L);
  Mockito.verify(this.totalDurationCount).increment(3L);
  Mockito.verify(this.queueSize).setValue(1000);

  recordsProcessedCounter.inc(5L);
  recordSizeDistributionHistogram.update(4);
  recordProcessRateMeter.mark(4L);
  totalDurationTimer.update(4, TimeUnit.SECONDS);

  this.hadoopCounterReporter.report(gauges, counters, histograms, meters, timers);

  Mockito.verify(this.recordsProcessedCount).increment(5L);
  Mockito.verify(this.recordProcessRateCount).increment(4L);
  Mockito.verify(this.recordSizeDistributionCount).increment(1L);
  Mockito.verify(this.totalDurationCount).increment(1L);
}
@Test
public void testReportMetrics() {
  Gauge<Integer> queueSizeGauge = new Gauge<Integer>() {
    @Override
    public Integer getValue() {
      return 1000;
    }
  };

  com.codahale.metrics.Counter recordsProcessedCounter = new com.codahale.metrics.Counter();
  recordsProcessedCounter.inc(10L);

  Histogram recordSizeDistributionHistogram = new Histogram(new ExponentiallyDecayingReservoir());
  recordSizeDistributionHistogram.update(1);
  recordSizeDistributionHistogram.update(2);
  recordSizeDistributionHistogram.update(3);

  Meter recordProcessRateMeter = new Meter();
  recordProcessRateMeter.mark(1L);
  recordProcessRateMeter.mark(2L);
  recordProcessRateMeter.mark(3L);

  Timer totalDurationTimer = new Timer();
  totalDurationTimer.update(1, TimeUnit.SECONDS);
  totalDurationTimer.update(2, TimeUnit.SECONDS);
  totalDurationTimer.update(3, TimeUnit.SECONDS);

  SortedMap<String, com.codahale.metrics.Counter> counters =
      ImmutableSortedMap.<String, com.codahale.metrics.Counter>naturalOrder()
          .put(RECORDS_PROCESSED, recordsProcessedCounter).build();
  SortedMap<String, Gauge> gauges = ImmutableSortedMap.<String, Gauge>naturalOrder()
      .put(QUEUE_SIZE, queueSizeGauge).build();
  SortedMap<String, Histogram> histograms = ImmutableSortedMap.<String, Histogram>naturalOrder()
      .put(RECORD_SIZE_DISTRIBUTION, recordSizeDistributionHistogram).build();
  SortedMap<String, Meter> meters = ImmutableSortedMap.<String, Meter>naturalOrder()
      .put(RECORD_PROCESS_RATE, recordProcessRateMeter).build();
  SortedMap<String, Timer> timers = ImmutableSortedMap.<String, Timer>naturalOrder()
      .put(TOTAL_DURATION, totalDurationTimer).build();

  this.hadoopCounterReporter.report(gauges, counters, histograms, meters, timers);

  Mockito.verify(this.recordsProcessedCount).increment(10L);
  Mockito.verify(this.recordProcessRateCount).increment(6L);
  Mockito.verify(this.recordSizeDistributionCount).increment(3L);
  Mockito.verify(this.totalDurationCount).increment(3L);
  Mockito.verify(this.queueSize).setValue(1000);

  recordsProcessedCounter.inc(5L);
  recordSizeDistributionHistogram.update(4);
  recordProcessRateMeter.mark(4L);
  totalDurationTimer.update(4, TimeUnit.SECONDS);

  this.hadoopCounterReporter.report(gauges, counters, histograms, meters, timers);

  Mockito.verify(this.recordsProcessedCount).increment(5L);
  Mockito.verify(this.recordProcessRateCount).increment(4L);
  Mockito.verify(this.recordSizeDistributionCount).increment(1L);
  Mockito.verify(this.totalDurationCount).increment(1L);
}
public Histogram newMetric() {
  return new Histogram(
      // A min/max value will stay around for 2 * 30 seconds
      new MinMaxSlidingTimeReservoir(Clock.defaultClock(), 2, 30, TimeUnit.SECONDS,
          new ExponentiallyDecayingReservoir()));
}
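MinMaxSlidingTimeReservoir is project-specific, but the idea is generic: a decaying reservoir can evict extreme samples, so a wrapper tracks min/max separately and forces them back into every snapshot. A simplified sketch of that idea (global rather than sliding-window min/max, so it is not a drop-in replacement for the class above):

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Reservoir;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.UniformSnapshot;

/** Simplified sketch: global (not sliding-window) min/max around a delegate. */
public class MinMaxReservoir implements Reservoir {
  private final Reservoir delegate = new ExponentiallyDecayingReservoir();
  private final AtomicLong min = new AtomicLong(Long.MAX_VALUE);
  private final AtomicLong max = new AtomicLong(Long.MIN_VALUE);

  @Override
  public int size() {
    return delegate.size();
  }

  @Override
  public void update(long value) {
    min.accumulateAndGet(value, Math::min);
    max.accumulateAndGet(value, Math::max);
    delegate.update(value);
  }

  @Override
  public Snapshot getSnapshot() {
    if (max.get() < min.get()) {
      // No updates yet; nothing to pad.
      return delegate.getSnapshot();
    }
    // Append the tracked extremes so they survive even if the delegate
    // has already evicted them.
    long[] values = delegate.getSnapshot().getValues();
    long[] padded = Arrays.copyOf(values, values.length + 2);
    padded[values.length] = min.get();
    padded[values.length + 1] = max.get();
    return new UniformSnapshot(padded);
  }
}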
@Test
public void testBasicStatisticsSlowRate() throws Exception {
  final DeterministicClock clock = new DeterministicClock();
  int iterations = 10;
  int numSamples = 100;

  final Reservoir delegate = new ExponentiallyDecayingReservoir(1028, 0.015, clock);
  final MinMaxSlidingTimeReservoir reservoir =
      new MinMaxSlidingTimeReservoir(clock, SIZE, STEP, TimeUnit.NANOSECONDS, delegate);

  long[] exactValues = new long[(numSamples + 2) * iterations];
  int i = 0;
  for (int iteration = 0; iteration < iterations; iteration++) {
    long maxPos = ThreadLocalRandom.current().nextInt(0, numSamples);
    long minPos = ThreadLocalRandom.current().nextInt(0, numSamples);
    for (long pos = 0; pos < numSamples; pos++) {
      long val = ThreadLocalRandom.current().nextLong(-VALUE_RANGE, VALUE_RANGE);
      reservoir.update(val);
      exactValues[i] = val;
      i++;
      if (pos == maxPos) {
        reservoir.update(MAX_VALUE);
        exactValues[i] = MAX_VALUE;
        i++;
      }
      if (pos == minPos) {
        reservoir.update(MIN_VALUE);
        exactValues[i] = MIN_VALUE;
        i++;
      }
    }

    final Snapshot snapshot = reservoir.getSnapshot();
    assertEquals(MAX_VALUE, snapshot.getMax());
    assertEquals(MIN_VALUE, snapshot.getMin());

    long[] expectedValues = Arrays.copyOf(exactValues, i);
    Arrays.sort(expectedValues);
    long[] reservoirValues = Arrays.copyOf(snapshot.getValues(), snapshot.getValues().length);
    Arrays.sort(reservoirValues);
    assertArrayEquals(expectedValues, reservoirValues);
  }
}
@Test
public void testBasicStatisticsHighRate() throws Exception {
  final DeterministicClock clock = new DeterministicClock();
  int iterations = 2;
  for (int iteration = 0; iteration < iterations; iteration++) {
    final Reservoir delegate = new ExponentiallyDecayingReservoir(1028, 0.015, clock);
    final MinMaxSlidingTimeReservoir reservoir =
        new MinMaxSlidingTimeReservoir(clock, SIZE, STEP, TimeUnit.NANOSECONDS, delegate);

    int numSamples = 1000000;
    int clockInterval = numSamples / SIZE;
    long[] exactValues = new long[numSamples + 2];
    long maxPos = ThreadLocalRandom.current().nextInt(0, numSamples);
    long minPos = ThreadLocalRandom.current().nextInt(0, numSamples);
    int i = 0;
    for (long pos = 0; pos < numSamples; pos++) {
      if (pos > 0 && pos % clockInterval == 0) {
        clock.add(STEP);
      }
      long val = ThreadLocalRandom.current().nextLong(-VALUE_RANGE, VALUE_RANGE);
      reservoir.update(val);
      exactValues[i] = val;
      i++;
      // Insert an extreme max / min value at a random point in the reservoir
      if (pos == maxPos) {
        reservoir.update(MAX_VALUE);
        exactValues[i] = MAX_VALUE;
        i++;
      }
      if (pos == minPos) {
        reservoir.update(MIN_VALUE);
        exactValues[i] = MIN_VALUE;
        i++;
      }
    }

    final Snapshot snapshot = reservoir.getSnapshot();
    assertEquals("Max value", MAX_VALUE, snapshot.getMax());
    assertEquals("Min value", MIN_VALUE, snapshot.getMin());

    final long[] actualValues = Arrays.copyOf(snapshot.getValues(), snapshot.getValues().length);
    assertTrue("Reservoir contains values", actualValues.length > 1000);

    final Set<Long> exactValueSet = new HashSet<>();
    for (i = 0; i < exactValues.length; i++) {
      exactValueSet.add(exactValues[i]);
    }
    assertTrue("Only known values in the reservoir",
        Arrays.stream(actualValues).filter(value -> !exactValueSet.contains(value)).count() == 0);

    final long zeroValueRange = (VALUE_RANGE * 10) / 100;
    assertThat("Mean value is within 10% error rate of 0",
        (long) snapshot.getMean(), allOf(greaterThan(-zeroValueRange), lessThan(zeroValueRange)));

    final long stdDev = (long) snapshot.getStdDev();
    assertThat("Std deviation is more than 40% of value range", stdDev, greaterThan((VALUE_RANGE * 40) / 100));
    assertThat("Std deviation is less than the max value range", stdDev, lessThan(MAX_VALUE));

    final Snapshot snapshot2 = reservoir.getSnapshot();
    assertArrayEquals("Two calls to get snapshot results in same data", snapshot.getValues(), snapshot2.getValues());
  }
}
private Histogram newHistogram() {
  return new Histogram(new ExponentiallyDecayingReservoir());
}