Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ public class DataPoint implements DistributionDataPoint {
private final LongAdder nativeZeroCount = new LongAdder();
private final LongAdder count = new LongAdder();
private final DoubleAdder sum = new DoubleAdder();
private final long[] classicOnlyBuckets;
private long classicOnlyCount;
private double classicOnlySum;
private volatile int nativeSchema =
nativeInitialSchema; // integer in [-4, 8] or CLASSIC_HISTOGRAM
private volatile double nativeZeroThreshold = Histogram.this.nativeMinZeroThreshold;
Expand All @@ -223,16 +226,27 @@ private DataPoint() {
for (int i = 0; i < classicUpperBounds.length; i++) {
classicBuckets[i] = new LongAdder();
}
classicOnlyBuckets = new long[classicUpperBounds.length];
maybeScheduleNextReset();
}

@Override
public double getSum() {
if (isClassicOnly()) {
synchronized (this) {
return classicOnlySum;
}
}
return sum.sum();
}

@Override
public long getCount() {
if (isClassicOnly()) {
synchronized (this) {
return classicOnlyCount;
}
}
return count.sum();
}

Expand All @@ -242,7 +256,9 @@ public void observe(double value) {
// See https://github.com/prometheus/client_golang/issues/1275 on ignoring NaN observations.
return;
}
if (!buffer.append(value)) {
if (isClassicOnly()) {
doObserveClassicOnly(value);
} else if (!buffer.append(value)) {
doObserve(value, false);
}
if (exemplarSampler != null) {
Expand All @@ -256,14 +272,32 @@ public void observeWithExemplar(double value, Labels labels) {
// See https://github.com/prometheus/client_golang/issues/1275 on ignoring NaN observations.
return;
}
if (!buffer.append(value)) {
if (isClassicOnly()) {
doObserveClassicOnly(value);
} else if (!buffer.append(value)) {
doObserve(value, false);
}
if (exemplarSampler != null) {
exemplarSampler.observeWithExemplar(value, labels);
}
}

private boolean isClassicOnly() {
return Histogram.this.nativeInitialSchema == CLASSIC_HISTOGRAM;
}

private synchronized void doObserveClassicOnly(double value) {
for (int i = 0; i < classicUpperBounds.length; ++i) {
// The last bucket is +Inf, so we always increment.
if (value <= classicUpperBounds[i]) {
classicOnlyBuckets[i]++;
break;
}
}
classicOnlySum += value;
classicOnlyCount++;
}

private void doObserve(double value, boolean fromBuffer) {
// classicUpperBounds is an empty array if this is a native histogram only.
for (int i = 0; i < classicUpperBounds.length; ++i) {
Expand Down Expand Up @@ -301,6 +335,16 @@ private void doObserve(double value, boolean fromBuffer) {

private HistogramSnapshot.HistogramDataPointSnapshot collect(Labels labels) {
Exemplars exemplars = exemplarSampler != null ? exemplarSampler.collect() : Exemplars.EMPTY;
if (isClassicOnly()) {
synchronized (this) {
return new HistogramSnapshot.HistogramDataPointSnapshot(
ClassicHistogramBuckets.of(classicUpperBounds, classicOnlyBuckets),
classicOnlySum,
labels,
exemplars,
createdTimeMillis);
}
}
return buffer.run(
expectedCount -> count.sum() == expectedCount,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,58 @@ void testObserveMultithreaded()
assertThat(executor.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
}

@Test
void testClassicOnlyObserveMultithreaded()
throws InterruptedException, ExecutionException, TimeoutException {
Histogram histogram =
Histogram.builder().name("test").classicOnly().labelNames("status").build();
int nThreads = 8;
DistributionDataPoint obs = histogram.labelValues("200");
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
CompletionService<List<HistogramSnapshot>> completionService =
new ExecutorCompletionService<>(executor);
CountDownLatch startSignal = new CountDownLatch(nThreads);
for (int t = 0; t < nThreads; t++) {
completionService.submit(
() -> {
List<HistogramSnapshot> snapshots = new ArrayList<>();
startSignal.countDown();
startSignal.await();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 1000; j++) {
obs.observe(1.1);
}
snapshots.add(histogram.collect());
}
return snapshots;
});
}
long maxCount = 0;
for (int i = 0; i < nThreads; i++) {
Future<List<HistogramSnapshot>> future = completionService.take();
List<HistogramSnapshot> snapshots = future.get(5, TimeUnit.SECONDS);
long count = 0;
for (HistogramSnapshot snapshot : snapshots) {
assertThat(snapshot.getDataPoints().size()).isOne();
HistogramSnapshot.HistogramDataPointSnapshot data =
snapshot.getDataPoints().stream().findFirst().orElseThrow(RuntimeException::new);
assertThat(data.getCount()).isGreaterThanOrEqualTo(count + 1000);
assertThat(data.getSum()).isCloseTo(data.getCount() * 1.1, offset(0.0000001));
count = data.getCount();
}
if (count > maxCount) {
maxCount = count;
}
}
assertThat(maxCount).isEqualTo(nThreads * 10_000L);
assertThat(obs.getCount()).isEqualTo(nThreads * 10_000L);
assertThat(obs.getSum()).isCloseTo(nThreads * 10_000L * 1.1, offset(0.0000001));
assertThat(nThreads * 10_000)
.isEqualTo(getBucket(histogram, 2.5, "status", "200").getCount());
executor.shutdown();
assertThat(executor.awaitTermination(5, TimeUnit.SECONDS)).isTrue();
}

@Test
void testNativeResetDuration() {
// Test that nativeResetDuration can be configured without error and the histogram
Expand Down
Loading