diff --git a/internal-api/src/jmh/java/datadog/trace/util/ThreadSafeMapCounterBenchmark.java b/internal-api/src/jmh/java/datadog/trace/util/ThreadSafeMapCounterBenchmark.java new file mode 100644 index 00000000000..985cbf9a734 --- /dev/null +++ b/internal-api/src/jmh/java/datadog/trace/util/ThreadSafeMapCounterBenchmark.java @@ -0,0 +1,147 @@ +package datadog.trace.util; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.LongAdder; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Benchmarks the "find and increment" pattern: look up an entry by key, then atomically increment + * its counter. Models per-class or per-method hit counters in the tracer. + * + *

The key insight is that {@link ConcurrentHashtable.D1} allows the counter to be embedded + * directly in the entry as a {@code volatile long} updated via {@link AtomicLongFieldUpdater}, + * avoiding the extra object allocation that {@link ConcurrentHashMap} requires when pairing each + * key with an {@link AtomicLong} or {@link LongAdder}. + * + *

Strategies compared: + * + *

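    + *
  • {@link ConcurrentHashtable.D1} — the counter is a {@code volatile long} field embedded in the + * entry, incremented via {@link AtomicLongFieldUpdater}. + *
  • {@link ConcurrentHashMap} with {@link AtomicLong} values — a separate counter object per key. + *
  • {@link ConcurrentHashMap} with {@link LongAdder} values — a separate counter object per key. + *
+ * + *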

Java 17 results ({@code @Fork(2)}, {@code @Threads(8)}, 64 pre-populated keys): + * + *

{@code
+ * Benchmark                          Score   Units
+ * increment_longAdder                   79   ops/us  (fastest)
+ * increment_atomicLong                  71   ops/us
+ * increment_concurrentHashtable         69   ops/us
+ * }
+ * + *

Key findings: + * + *

+ */ +@Fork(2) +@Warmup(iterations = 2) +@Measurement(iterations = 3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(MICROSECONDS) +@Threads(8) +public class ThreadSafeMapCounterBenchmark { + + static final int N_KEYS = 64; + static final int CAPACITY = 128; + + static final String[] KEYS = new String[N_KEYS]; + + static { + for (int i = 0; i < N_KEYS; ++i) { + KEYS[i] = "key-" + i; + } + } + + static final class CounterEntry extends Hashtable.D1.Entry { + private static final AtomicLongFieldUpdater COUNT = + AtomicLongFieldUpdater.newUpdater(CounterEntry.class, "count"); + + volatile long count; + + CounterEntry(String key) { + super(key); + } + + long increment() { + return COUNT.incrementAndGet(this); + } + } + + /** + * Shared state ({@link Scope#Benchmark}): one instance of each map across all threads, modelling + * a shared instrumentation counter table. + */ + @State(Scope.Benchmark) + public static class SharedState { + ConcurrentHashtable.D1 table; + ConcurrentHashMap atomicLongMap; + ConcurrentHashMap longAdderMap; + + @Setup(Level.Iteration) + public void setUp() { + table = new ConcurrentHashtable.D1<>(CAPACITY); + atomicLongMap = new ConcurrentHashMap<>(CAPACITY); + longAdderMap = new ConcurrentHashMap<>(CAPACITY); + for (int i = 0; i < N_KEYS; ++i) { + table.getOrCreate(KEYS[i], CounterEntry::new); + atomicLongMap.put(KEYS[i], new AtomicLong()); + longAdderMap.put(KEYS[i], new LongAdder()); + } + } + } + + /** Per-thread cursor so each thread cycles through keys independently. 
*/ + @State(Scope.Thread) + public static class ThreadState { + int cursor; + + int next() { + int i = cursor; + cursor = (i + 1) & (N_KEYS - 1); + return i; + } + } + + @Benchmark + public long increment_concurrentHashtable(SharedState s, ThreadState t) { + return s.table.get(KEYS[t.next()]).increment(); + } + + @Benchmark + public long increment_atomicLong(SharedState s, ThreadState t) { + return s.atomicLongMap.get(KEYS[t.next()]).incrementAndGet(); + } + + @Benchmark + public void increment_longAdder(SharedState s, ThreadState t) { + s.longAdderMap.get(KEYS[t.next()]).increment(); + } +} diff --git a/internal-api/src/jmh/java/datadog/trace/util/ThreadSafeMapD1Benchmark.java b/internal-api/src/jmh/java/datadog/trace/util/ThreadSafeMapD1Benchmark.java new file mode 100644 index 00000000000..13d2825fdad --- /dev/null +++ b/internal-api/src/jmh/java/datadog/trace/util/ThreadSafeMapD1Benchmark.java @@ -0,0 +1,192 @@ +package datadog.trace.util; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Compares thread-safe map strategies for shared, concurrent single-key lookups. + * + *

See {@link ThreadSafeMapD2Benchmark} for the composite-key variant, which adds the cost of + * hashing two keys and a wrapper object allocation for map-based alternatives. + * + *

The table is shared across all threads ({@link Scope#Benchmark}) and pre-populated before the + * measurement iteration — modelling the steady-state read-mostly pattern that the tracer uses (a + * per-class or per-method instrumentation cache consulted on every invocation). + * + *

Strategies compared: + * + *

    + *
  • {@link ConcurrentHashtable.D1} — lock-free reads, no extra allocation per lookup. + *
  • {@link ConcurrentHashMap} — lock-free reads, fine-grained (per-bin) locking on writes; the + * key is the string itself, no wrapper. + *
  • {@link ConcurrentSkipListMap} — fully lock-free (CAS), but pays tree traversal and {@link + * Comparable} overhead on every operation. + *
  • {@link Collections#synchronizedMap} wrapping {@link HashMap} — global lock on every + * operation. Establishes the coarse-locking baseline. + *
+ * + *

Java 17 results ({@code @Fork(2)}, {@code @Threads(8)}, 64 pre-populated keys): + * + *

{@code
+ * Benchmark                             Score   Units
+ * get_concurrentHashtable               1583   ops/us  (fastest)
+ * get_concurrentHashMap                 1145   ops/us
+ * get_concurrentSkipListMap              170   ops/us
+ * get_synchronizedHashMap                 33   ops/us
+ *
+ * getOrCreate_concurrentHashtable       1450   ops/us  (fastest)
+ * getOrCreate_concurrentHashMap         1125   ops/us
+ * getOrCreate_synchronizedHashMap         31   ops/us
+ * }
+ * + *

Key findings: + * + *

    + *
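  • {@code ConcurrentHashtable} is ~38% faster than {@code ConcurrentHashMap} on {@code get} + * (1583 vs 1145 ops/us). CHM has had no segments since Java 8, so the gap is presumably due + * to CHM's more general read path (hash spreading, special-node checks) — not yet confirmed + * by profiling. + *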
  • {@code ConcurrentSkipListMap} is ~7× slower than {@code ConcurrentHashMap} (170 vs 1145 + * ops/us) — skip-list traversal cost is high even under lock-free CAS. + *
  • Synchronized {@code HashMap} is ~47× slower than {@code ConcurrentHashtable}; the global + * lock serializes all 8 threads. + *
  • {@code getOrCreate} is near-identical to {@code get} because all keys are pre-populated — + * the lock branch is never taken during measurement. + *
+ */ +@Fork(2) +@Warmup(iterations = 2) +@Measurement(iterations = 3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(MICROSECONDS) +@Threads(8) +public class ThreadSafeMapD1Benchmark { + + static final int N_KEYS = 64; + static final int CAPACITY = 128; + + static final String[] KEYS = new String[N_KEYS]; + + static { + for (int i = 0; i < N_KEYS; ++i) { + KEYS[i] = "key-" + i; + } + } + + static final class D1Entry extends Hashtable.D1.Entry { + final long value; + + D1Entry(String key) { + super(key); + this.value = 1L; + } + } + + /** + * Shared state ({@link Scope#Benchmark}): one instance of each map across all threads, modelling + * a shared instrumentation cache. + */ + @State(Scope.Benchmark) + public static class SharedState { + ConcurrentHashtable.D1 table; + ConcurrentHashMap concurrentHashMap; + ConcurrentSkipListMap skipListMap; + Map synchronizedHashMap; + + @Setup(Level.Iteration) + public void setUp() { + table = new ConcurrentHashtable.D1<>(CAPACITY); + concurrentHashMap = new ConcurrentHashMap<>(CAPACITY); + skipListMap = new ConcurrentSkipListMap<>(); + synchronizedHashMap = Collections.synchronizedMap(new HashMap<>(CAPACITY)); + for (int i = 0; i < N_KEYS; ++i) { + table.getOrCreate(KEYS[i], D1Entry::new); + concurrentHashMap.put(KEYS[i], (long) i); + skipListMap.put(KEYS[i], (long) i); + synchronizedHashMap.put(KEYS[i], (long) i); + } + } + } + + /** Per-thread cursor so each thread cycles through keys independently. 
*/ + @State(Scope.Thread) + public static class ThreadState { + int cursor; + + int next() { + int i = cursor; + cursor = (i + 1) & (N_KEYS - 1); + return i; + } + } + + @Benchmark + public D1Entry get_concurrentHashtable(SharedState s, ThreadState t) { + return s.table.get(KEYS[t.next()]); + } + + @Benchmark + public Long get_concurrentHashMap(SharedState s, ThreadState t) { + return s.concurrentHashMap.get(KEYS[t.next()]); + } + + @Benchmark + public Long get_concurrentSkipListMap(SharedState s, ThreadState t) { + return s.skipListMap.get(KEYS[t.next()]); + } + + @Benchmark + public Long get_synchronizedHashMap(SharedState s, ThreadState t) { + return s.synchronizedHashMap.get(KEYS[t.next()]); + } + + @Benchmark + public D1Entry getOrCreate_concurrentHashtable(SharedState s, ThreadState t) { + return s.table.getOrCreate(KEYS[t.next()], D1Entry::new); + } + + /** + * get-first pattern for CHM — the idiomatic equivalent of D1.getOrCreate on a mostly-populated + * table. + */ + @Benchmark + public Long getOrCreate_concurrentHashMap(SharedState s, ThreadState t) { + String key = KEYS[t.next()]; + Long existing = s.concurrentHashMap.get(key); + if (existing != null) { + return existing; + } + return s.concurrentHashMap.computeIfAbsent(key, k -> 0L); + } + + /** + * get-first pattern for synchronized HashMap. On hit: one lock acquire/release for get. On miss: + * a second synchronized block for the double-checked put. 
+ */ + @Benchmark + public Long getOrCreate_synchronizedHashMap(SharedState s, ThreadState t) { + String key = KEYS[t.next()]; + Long existing = s.synchronizedHashMap.get(key); + if (existing != null) { + return existing; + } + synchronized (s.synchronizedHashMap) { + return s.synchronizedHashMap.computeIfAbsent(key, k -> 0L); + } + } +} diff --git a/internal-api/src/jmh/java/datadog/trace/util/ThreadSafeMapD2Benchmark.java b/internal-api/src/jmh/java/datadog/trace/util/ThreadSafeMapD2Benchmark.java new file mode 100644 index 00000000000..c98501fe820 --- /dev/null +++ b/internal-api/src/jmh/java/datadog/trace/util/ThreadSafeMapD2Benchmark.java @@ -0,0 +1,346 @@ +package datadog.trace.util; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Compares thread-safe map strategies for shared, concurrent composite-key lookups. + * + *

See {@link ThreadSafeMapD1Benchmark} for the single-key variant. + * + *

The table is shared across all threads ({@link Scope#Benchmark}) and pre-populated before the + * measurement iteration — modelling the steady-state read-mostly pattern that the tracer uses (a + * per-class or per-method instrumentation cache consulted on every invocation). + * + *

Strategies compared: + * + *

    + *
  • {@link ConcurrentHashtable.D2} — lock-free reads, no composite key allocation per lookup. + * K2 is {@link Integer} (boxed), so EA may still eliminate the box on hits, but the + * allocation is observable on misses. + *
  • {@link ConcurrentHashtable.Support} (custom entry) — same lock-free read path, but K2 is a + * primitive {@code int} embedded directly in the entry. No boxing at any point; demonstrates + * the flexibility available when {@code D2}'s object-key constraint is too limiting. + *
  • {@link ConcurrentHashMap} — lock-free reads, fine-grained (per-bin) locking on writes; + * allocates a {@link Key2} wrapper per lookup (which holds K2 as a boxed {@link Integer}). + *
  • {@link ConcurrentSkipListMap} — fully lock-free (CAS), but pays tree traversal and {@link + * Comparable} overhead; allocates {@link Key2} per lookup. {@code getOrCreate} uses + * get-then-{@code putIfAbsent} (no native {@code computeIfAbsent}). + *
  • {@link Collections#synchronizedMap} wrapping {@link HashMap} — global lock on every + * operation; allocates {@link Key2} per lookup. Establishes the coarse-locking baseline. + *
+ * + *

Java 17 results ({@code @Fork(2)}, {@code @Threads(8)}, 64 pre-populated keys): + * + *

{@code
+ * Benchmark                              Score   Units
+ * get_concurrentHashtable                1452   ops/us  (tied fastest)
+ * get_support                            1450   ops/us  (primitive int K2)
+ * get_concurrentHashMap                   777   ops/us  (allocates Key2 wrapper)
+ * get_concurrentSkipListMap               146   ops/us
+ * get_synchronizedHashMap                  27   ops/us
+ *
+ * getOrCreate_support                    1379   ops/us  (fastest)
+ * getOrCreate_concurrentHashtable        1119   ops/us
+ * getOrCreate_concurrentHashMap           769   ops/us
+ * getOrCreate_concurrentSkipListMap       151   ops/us
+ * getOrCreate_synchronizedHashMap          28   ops/us
+ * }
+ * + *

Key findings: + * + *

    + *
  • {@code ConcurrentHashtable} and {@code Support} are neck-and-neck on {@code get} (1452 vs + * 1450 ops/us); both avoid the {@link Key2} wrapper allocation that {@code ConcurrentHashMap} + * requires on every lookup. + *
  • {@code ConcurrentHashMap} is ~2× slower than {@code ConcurrentHashtable} on {@code get} + * (777 vs 1452 ops/us) — the {@link Key2} allocation plus two-level hash lookup adds up. + *
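  • {@code Support} shows ~23% higher {@code getOrCreate} throughput than {@code D2} (1379 vs + * 1119 ops/us). All keys are pre-populated, so the locked write path (and its re-check) is + * never taken during measurement, and {@code get} shows no comparable gap; the cause of this + * difference is therefore not established and may be measurement noise. + *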
  • {@code ConcurrentSkipListMap} is ~5× slower than {@code ConcurrentHashMap} due to tree + * traversal; the two-traversal {@code getOrCreate} pattern adds further overhead on misses. + *
  • Synchronized {@code HashMap} is ~50× slower than {@code ConcurrentHashtable}. + *
+ */ +@Fork(2) +@Warmup(iterations = 2) +@Measurement(iterations = 3) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(MICROSECONDS) +@Threads(8) +public class ThreadSafeMapD2Benchmark { + + static final int N_KEYS = 64; + static final int CAPACITY = 128; + + static final String[] SOURCE_K1 = new String[N_KEYS]; + static final Integer[] SOURCE_K2 = new Integer[N_KEYS]; + static final int[] SOURCE_K2_INT = new int[N_KEYS]; + + static { + for (int i = 0; i < N_KEYS; ++i) { + SOURCE_K1[i] = "key-" + i; + SOURCE_K2_INT[i] = i * 31 + 17; + SOURCE_K2[i] = SOURCE_K2_INT[i]; + } + } + + static final class D2Entry extends Hashtable.D2.Entry { + final long value; + + D2Entry(String k1, Integer k2) { + super(k1, k2); + this.value = 1L; + } + } + + /** + * Support-based entry with a primitive {@code int} K2 — no boxing at any point. The hash is + * computed with the same formula as {@link Hashtable.D2.Entry#hash} but avoids the {@link + * Integer#hashCode(int)} boxing path by calling {@link LongHashingUtils} directly. + */ + static final class SupportEntry extends Hashtable.Entry { + final String k1; + final int k2; + final long value; + + SupportEntry(String k1, int k2) { + super(hash(k1, k2)); + this.k1 = k1; + this.k2 = k2; + this.value = 1L; + } + + static long hash(String k1, int k2) { + return LongHashingUtils.hash(k1.hashCode(), Integer.hashCode(k2)); + } + + boolean matches(String k1, int k2) { + return this.k2 == k2 && this.k1.equals(k1); + } + } + + /** Composite key for map-based baselines. 
*/ + static final class Key2 implements Comparable { + final String k1; + final Integer k2; + final int hash; + + Key2(String k1, Integer k2) { + this.k1 = k1; + this.k2 = k2; + this.hash = Objects.hash(k1, k2); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Key2)) { + return false; + } + Key2 other = (Key2) o; + return Objects.equals(k1, other.k1) && Objects.equals(k2, other.k2); + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public int compareTo(Key2 other) { + int c = k1.compareTo(other.k1); + return c != 0 ? c : k2.compareTo(other.k2); + } + } + + /** + * Shared state ({@link Scope#Benchmark}): one instance of each map across all threads, modelling + * a shared instrumentation cache. + */ + @State(Scope.Benchmark) + public static class SharedState { + ConcurrentHashtable.D2 table; + java.util.concurrent.atomic.AtomicReferenceArray supportBuckets; + ConcurrentHashMap concurrentHashMap; + ConcurrentSkipListMap skipListMap; + Map synchronizedHashMap; + + @Setup(Level.Iteration) + public void setUp() { + table = new ConcurrentHashtable.D2<>(CAPACITY); + supportBuckets = + new java.util.concurrent.atomic.AtomicReferenceArray<>( + Hashtable.Support.sizeFor(CAPACITY)); + concurrentHashMap = new ConcurrentHashMap<>(CAPACITY); + skipListMap = new ConcurrentSkipListMap<>(); + synchronizedHashMap = Collections.synchronizedMap(new HashMap<>(CAPACITY)); + for (int i = 0; i < N_KEYS; ++i) { + int k2 = SOURCE_K2[i]; + table.getOrCreate(SOURCE_K1[i], SOURCE_K2[i], D2Entry::new); + // populate support table + SupportEntry se = new SupportEntry(SOURCE_K1[i], k2); + int idx = ConcurrentHashtable.Support.bucketIndex(supportBuckets, se.keyHash); + se.setNext(ConcurrentHashtable.Support.bucket(supportBuckets, idx)); + supportBuckets.set(idx, se); + Key2 key = new Key2(SOURCE_K1[i], SOURCE_K2[i]); + concurrentHashMap.put(key, (long) i); + skipListMap.put(key, (long) i); + synchronizedHashMap.put(key, (long) i); + } + } + } + 
+ /** Per-thread cursor so each thread cycles through keys independently. */ + @State(Scope.Thread) + public static class ThreadState { + int cursor; + + int next() { + int i = cursor; + cursor = (i + 1) & (N_KEYS - 1); + return i; + } + } + + @Benchmark + public D2Entry get_concurrentHashtable(SharedState s, ThreadState t) { + int i = t.next(); + return s.table.get(SOURCE_K1[i], SOURCE_K2[i]); + } + + @Benchmark + public SupportEntry get_support(SharedState s, ThreadState t) { + int i = t.next(); + String k1 = SOURCE_K1[i]; + int k2 = SOURCE_K2_INT[i]; + long keyHash = SupportEntry.hash(k1, k2); + for (SupportEntry e = ConcurrentHashtable.Support.bucket(s.supportBuckets, keyHash); + e != null; + e = e.next()) { + if (e.keyHash == keyHash && e.matches(k1, k2)) { + return e; + } + } + return null; + } + + @Benchmark + public Long get_concurrentHashMap(SharedState s, ThreadState t) { + int i = t.next(); + return s.concurrentHashMap.get(new Key2(SOURCE_K1[i], SOURCE_K2[i])); + } + + @Benchmark + public Long get_concurrentSkipListMap(SharedState s, ThreadState t) { + int i = t.next(); + return s.skipListMap.get(new Key2(SOURCE_K1[i], SOURCE_K2[i])); + } + + @Benchmark + public Long get_synchronizedHashMap(SharedState s, ThreadState t) { + int i = t.next(); + return s.synchronizedHashMap.get(new Key2(SOURCE_K1[i], SOURCE_K2[i])); + } + + @Benchmark + public D2Entry getOrCreate_concurrentHashtable(SharedState s, ThreadState t) { + int i = t.next(); + return s.table.getOrCreate(SOURCE_K1[i], SOURCE_K2[i], D2Entry::new); + } + + @Benchmark + public SupportEntry getOrCreate_support(SharedState s, ThreadState t) { + int i = t.next(); + String k1 = SOURCE_K1[i]; + int k2 = SOURCE_K2_INT[i]; + long keyHash = SupportEntry.hash(k1, k2); + int index = ConcurrentHashtable.Support.bucketIndex(s.supportBuckets, keyHash); + for (SupportEntry e = ConcurrentHashtable.Support.bucket(s.supportBuckets, index); + e != null; + e = e.next()) { + if (e.keyHash == keyHash && e.matches(k1, k2)) 
{ + return e; + } + } + synchronized (s.supportBuckets) { + for (SupportEntry e = ConcurrentHashtable.Support.bucket(s.supportBuckets, index); + e != null; + e = e.next()) { + if (e.keyHash == keyHash && e.matches(k1, k2)) { + return e; + } + } + SupportEntry newEntry = new SupportEntry(k1, k2); + newEntry.setNext(ConcurrentHashtable.Support.bucket(s.supportBuckets, index)); + s.supportBuckets.set(index, newEntry); + return newEntry; + } + } + + /** + * get-first pattern for CHM to avoid capturing-lambda allocation on hits — the idiomatic + * equivalent of D2.getOrCreate on a mostly-populated table. + */ + @Benchmark + public Long getOrCreate_concurrentHashMap(SharedState s, ThreadState t) { + int i = t.next(); + Key2 key = new Key2(SOURCE_K1[i], SOURCE_K2[i]); + Long existing = s.concurrentHashMap.get(key); + if (existing != null) { + return existing; + } + return s.concurrentHashMap.computeIfAbsent(key, k -> 0L); + } + + /** + * get-first pattern for ConcurrentSkipListMap — manual get-then-putIfAbsent since CSLM has no + * computeIfAbsent. Two traversals on miss; one on hit. + */ + @Benchmark + public Long getOrCreate_concurrentSkipListMap(SharedState s, ThreadState t) { + int i = t.next(); + Key2 key = new Key2(SOURCE_K1[i], SOURCE_K2[i]); + Long existing = s.skipListMap.get(key); + if (existing != null) { + return existing; + } + Long prev = s.skipListMap.putIfAbsent(key, 0L); + return prev != null ? prev : 0L; + } + + /** + * get-first pattern for synchronized HashMap. On hit: one lock acquire/release for get. On miss: + * a second synchronized block for the double-checked put. 
+ */ + @Benchmark + public Long getOrCreate_synchronizedHashMap(SharedState s, ThreadState t) { + int i = t.next(); + Key2 key = new Key2(SOURCE_K1[i], SOURCE_K2[i]); + Long existing = s.synchronizedHashMap.get(key); + if (existing != null) { + return existing; + } + synchronized (s.synchronizedHashMap) { + return s.synchronizedHashMap.computeIfAbsent(key, k -> 0L); + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/util/ConcurrentHashtable.java b/internal-api/src/main/java/datadog/trace/util/ConcurrentHashtable.java new file mode 100644 index 00000000000..03db0b86739 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/util/ConcurrentHashtable.java @@ -0,0 +1,261 @@ +package datadog.trace.util; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Concurrent counterpart to {@link Hashtable}. Provides lock-free reads and locked writes for + * {@link D1} (single-key) and {@link D2} (composite-key) tables. + * + *

Like {@link Hashtable}, capacity is fixed at construction and the table does not resize. + * Unlike {@link Hashtable}, all operations are safe for concurrent access without external + * synchronization. + * + *

The primary advantage over {@link java.util.concurrent.ConcurrentHashMap} for composite-key + * use cases is that {@link D2#get(Object, Object)} and {@link D2#getOrCreate(Object, Object, + * BiFunction)} accept key parts directly — no composite key object is allocated for the lookup. + * {@code ConcurrentHashMap} requires a wrapper object whose ownership may transfer to the map on + * insert; escape analysis must conservatively assume the key escapes even on hit paths, preventing + * scalar replacement. + * + *

Memory model. Bucket slots are held in an {@link AtomicReferenceArray}, so each {@link + * #get} begins with a volatile read of the slot. Entries are inserted at the bucket head: the new + * entry's {@code next} pointer is set before the volatile slot write, so any subsequent volatile + * read of that slot carries happens-before over the full chain — chain {@code next} fields do not + * need to be volatile. + */ +public final class ConcurrentHashtable { + private ConcurrentHashtable() {} + + /** + * Single-key concurrent hash table. Lock-free on hit; locked on miss. + * + * @param the key type + * @param the user's {@link Hashtable.D1.Entry D1.Entry<K>} subclass + */ + public static final class D1> { + + private final AtomicReferenceArray buckets; + private final AtomicInteger size = new AtomicInteger(); + + public D1(int capacity) { + this.buckets = new AtomicReferenceArray<>(Hashtable.Support.sizeFor(capacity)); + } + + public int size() { + return size.get(); + } + + public TEntry get(K key) { + long keyHash = Hashtable.D1.Entry.hash(key); + for (TEntry te = Support.bucket(buckets, keyHash); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key)) { + return te; + } + } + return null; + } + + /** + * Returns the entry for {@code key}, creating one via {@code creator} if absent. Lock-free on + * hit; acquires a table-level lock on miss. Re-checks under the lock to avoid duplicate entries + * under concurrent misses. 
+ */ + public TEntry getOrCreate(K key, Function creator) { + long keyHash = Hashtable.D1.Entry.hash(key); + int index = Support.bucketIndex(buckets, keyHash); + for (TEntry te = Support.bucket(buckets, index); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key)) { + return te; + } + } + synchronized (this) { + for (TEntry te = Support.bucket(buckets, index); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key)) { + return te; + } + } + TEntry newEntry = creator.apply(key); + newEntry.setNext(Support.bucket(buckets, index)); + buckets.set(index, newEntry); + size.incrementAndGet(); + return newEntry; + } + } + + public void forEach(Consumer consumer) { + Support.forEach(buckets, consumer); + } + + /** + * Context-passing forEach. Avoids a capturing-lambda allocation — pass a non-capturing {@link + * BiConsumer} (typically a {@code static final}) plus whatever side-band state it needs. + */ + public void forEach(T context, BiConsumer consumer) { + Support.forEach(buckets, context, consumer); + } + } + + /** + * Two-key (composite-key) concurrent hash table. Lock-free on hit; locked on miss. + * + *

Key parts are passed directly to {@link #get} and {@link #getOrCreate}, eliminating the + * per-lookup composite key object allocation that {@code ConcurrentHashMap, V>} + * requires. + * + * @param first key type + * @param second key type + * @param the user's {@link Hashtable.D2.Entry D2.Entry<K1, K2>} subclass + */ + public static final class D2> { + + private final AtomicReferenceArray buckets; + private final AtomicInteger size = new AtomicInteger(); + + public D2(int capacity) { + this.buckets = new AtomicReferenceArray<>(Hashtable.Support.sizeFor(capacity)); + } + + public int size() { + return size.get(); + } + + public TEntry get(K1 key1, K2 key2) { + long keyHash = Hashtable.D2.Entry.hash(key1, key2); + for (TEntry te = Support.bucket(buckets, keyHash); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key1, key2)) { + return te; + } + } + return null; + } + + /** + * Returns the entry for {@code (key1, key2)}, creating one via {@code creator} if absent. + * Lock-free on hit; acquires a table-level lock on miss. Re-checks under the lock to avoid + * duplicate entries under concurrent misses. + * + *

The {@code creator} should build an entry whose {@code keyHash} equals {@link + * Hashtable.D2.Entry#hash(Object, Object) D2.Entry.hash(key1, key2)}. + */ + public TEntry getOrCreate( + K1 key1, K2 key2, BiFunction creator) { + long keyHash = Hashtable.D2.Entry.hash(key1, key2); + int index = Support.bucketIndex(buckets, keyHash); + for (TEntry te = Support.bucket(buckets, index); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key1, key2)) { + return te; + } + } + synchronized (this) { + for (TEntry te = Support.bucket(buckets, index); te != null; te = te.next()) { + if (te.keyHash == keyHash && te.matches(key1, key2)) { + return te; + } + } + TEntry newEntry = creator.apply(key1, key2); + newEntry.setNext(Support.bucket(buckets, index)); + buckets.set(index, newEntry); + size.incrementAndGet(); + return newEntry; + } + } + + public void forEach(Consumer consumer) { + Support.forEach(buckets, consumer); + } + + /** + * Context-passing forEach. Avoids a capturing-lambda allocation — pass a non-capturing {@link + * BiConsumer} (typically a {@code static final}) plus whatever side-band state it needs. + */ + public void forEach(T context, BiConsumer consumer) { + Support.forEach(buckets, context, consumer); + } + } + + /** + * Building blocks for concurrent hash-table operations, mirroring {@link Hashtable.Support}. + * + *

Use {@link D1} or {@link D2} when their object-key constraints are acceptable — they handle + * synchronization internally. Use {@code Support} directly only when you need primitive key + * components or other entry-level flexibility that {@code D1}/{@code D2} cannot provide. + * + *

Synchronization contract. {@link #bucket} performs a volatile read of the bucket slot + * and is safe to call from any thread without a lock — this is the lock-free read path. Writes + * (inserting a new entry) are the caller's responsibility: use the same double-checked locking + * pattern that {@link D1} and {@link D2} use internally — + * + *

    + *
  1. Lock-free pre-check: walk the chain via {@link #bucket}; return if found. + *
  2. Acquire a lock on a stable object owned by the same class that owns the {@code buckets} + * array (typically {@code synchronized (this)}). + *
  3. Re-check under the lock (another thread may have inserted between step 1 and step 2). + *
  4. Build the new entry, set its {@code next} via {@link Hashtable.Entry#setNext}, then write + * it to the bucket with {@link AtomicReferenceArray#set} (volatile write). + *
+ * + * Locking on the {@code AtomicReferenceArray} itself is also valid but no cleaner — pick + * whichever lock object is most natural for the owning class. + * + *

One advantage of using {@code Support} directly over {@link D1}/{@link D2} is that the + * caller controls the lock object, enabling lock striping: shard the lock by bucket index or key + * hash to reduce write-path contention if profiling shows the single table-level lock is a + * bottleneck. + */ + public static final class Support { + private Support() {} + + public static int bucketIndex(AtomicReferenceArray buckets, long keyHash) { + return (int) (keyHash & (buckets.length() - 1)); + } + + /** + * Returns the head entry of the bucket that {@code keyHash} maps to, cast to the caller's + * concrete entry type. The unchecked cast lives here so chain-walk loops at call sites don't + * need to thread a raw {@link Hashtable.Entry} variable through. + */ + @SuppressWarnings("unchecked") + public static TEntry bucket( + AtomicReferenceArray buckets, long keyHash) { + return (TEntry) buckets.get(bucketIndex(buckets, keyHash)); + } + + /** + * Returns the head entry of the bucket at {@code index}, cast to the caller's concrete entry + * type. Use when the bucket index is already computed (e.g. inside {@code getOrCreate} where + * the same index is reused across the lock boundary). 
+ */ + @SuppressWarnings("unchecked") + public static TEntry bucket( + AtomicReferenceArray buckets, int index) { + return (TEntry) buckets.get(index); + } + + @SuppressWarnings("unchecked") + public static void forEach( + AtomicReferenceArray buckets, Consumer consumer) { + for (int i = 0; i < buckets.length(); i++) { + for (TEntry te = (TEntry) buckets.get(i); te != null; te = te.next()) { + consumer.accept(te); + } + } + } + + @SuppressWarnings("unchecked") + public static void forEach( + AtomicReferenceArray buckets, + T context, + BiConsumer consumer) { + for (int i = 0; i < buckets.length(); i++) { + for (TEntry te = (TEntry) buckets.get(i); te != null; te = te.next()) { + consumer.accept(context, te); + } + } + } + } +} diff --git a/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD1Test.java b/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD1Test.java new file mode 100644 index 00000000000..aff0c47537a --- /dev/null +++ b/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD1Test.java @@ -0,0 +1,230 @@ +package datadog.trace.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; + +class ConcurrentHashtableD1Test { + + @Test + void getReturnsMappedEntry() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + StringEntry e = table.getOrCreate("hello", k -> new StringEntry(k, 42)); + assertSame(e, table.get("hello")); + assertNull(table.get("world")); + } + + @Test + void getOrCreateOnMissBuildsEntry() { + ConcurrentHashtable.D1 table = new 
ConcurrentHashtable.D1<>(8); + int[] createCount = {0}; + StringEntry created = + table.getOrCreate( + "a", + k -> { + createCount[0]++; + return new StringEntry(k, 1); + }); + assertNotNull(created); + assertEquals(1, table.size()); + assertEquals(1, createCount[0]); + assertSame(created, table.get("a")); + } + + @Test + void getOrCreateOnHitSkipsCreator() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + StringEntry seeded = table.getOrCreate("a", k -> new StringEntry(k, 100)); + int[] createCount = {0}; + StringEntry got = + table.getOrCreate( + "a", + k -> { + createCount[0]++; + return new StringEntry(k, 999); + }); + assertSame(seeded, got); + assertEquals(1, table.size()); + assertEquals(0, createCount[0]); + } + + @Test + void nullKeyIsSupported() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + StringEntry e = table.getOrCreate(null, k -> new StringEntry(k, 0)); + assertNotNull(e); + assertSame(e, table.get(null)); + } + + @Test + void forEachVisitsAllEntries() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + table.getOrCreate("a", k -> new StringEntry(k, 1)); + table.getOrCreate("b", k -> new StringEntry(k, 2)); + table.getOrCreate("c", k -> new StringEntry(k, 3)); + Set seen = new HashSet<>(); + table.forEach(e -> seen.add(e.key)); + assertEquals(3, seen.size()); + assertTrue(seen.contains("a")); + assertTrue(seen.contains("b")); + assertTrue(seen.contains("c")); + } + + @Test + void forEachWithContextPassesContext() { + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(8); + table.getOrCreate("x", k -> new StringEntry(k, 10)); + table.getOrCreate("y", k -> new StringEntry(k, 20)); + Set seen = new HashSet<>(); + table.forEach(seen, (ctx, e) -> ctx.add(e.key)); + assertEquals(2, seen.size()); + assertTrue(seen.contains("x")); + assertTrue(seen.contains("y")); + } + + @Test + void concurrentGetOrCreateProducesExactlyOneEntry() throws InterruptedException { + ConcurrentHashtable.D1 
table = new ConcurrentHashtable.D1<>(8); + int threads = 16; + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch go = new CountDownLatch(1); + AtomicInteger createCount = new AtomicInteger(); + + Thread[] workers = new Thread[threads]; + for (int i = 0; i < threads; i++) { + workers[i] = + new Thread( + () -> { + ready.countDown(); + try { + go.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + table.getOrCreate( + "shared", + k -> { + createCount.incrementAndGet(); + return new StringEntry(k, 1); + }); + }); + workers[i].start(); + } + ready.await(); + go.countDown(); + for (Thread w : workers) { + w.join(); + } + + assertEquals(1, table.size()); + assertEquals(1, createCount.get()); + } + + @Test + void chainedEntriesInSameBucketAreAllReachable() { + // 2 buckets: keyHash & 1 determines the slot. Hashes 0 and 2 both land in bucket 0. + ConcurrentHashtable.D1 table = new ConcurrentHashtable.D1<>(2); + CollidingKey a = new CollidingKey("a", 0); + CollidingKey b = new CollidingKey("b", 0); // same bucket as a + CollidingKey c = new CollidingKey("c", 2); // 2 & 1 == 0, same bucket + CollidingEntry ea = table.getOrCreate(a, CollidingEntry::new); + CollidingEntry eb = table.getOrCreate(b, CollidingEntry::new); + CollidingEntry ec = table.getOrCreate(c, CollidingEntry::new); + assertEquals(3, table.size()); + assertSame(ea, table.get(a)); + assertSame(eb, table.get(b)); + assertSame(ec, table.get(c)); + assertNull(table.get(new CollidingKey("d", 0))); // same bucket, different label → miss + } + + @Test + void concurrentDistinctKeyInsertionsAreAllRetained() throws InterruptedException { + int threads = 16; + String[] keys = new String[threads]; + for (int i = 0; i < threads; i++) { + keys[i] = "key-" + i; + } + ConcurrentHashtable.D1 table = + new ConcurrentHashtable.D1<>(threads * 2); + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch go = new CountDownLatch(1); + + Thread[] 
workers = new Thread[threads]; + for (int i = 0; i < threads; i++) { + final String key = keys[i]; + workers[i] = + new Thread( + () -> { + ready.countDown(); + try { + go.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + table.getOrCreate(key, k -> new StringEntry(k, 1)); + }); + workers[i].start(); + } + ready.await(); + go.countDown(); + for (Thread w : workers) { + w.join(); + } + + assertEquals(threads, table.size()); + for (String key : keys) { + assertNotNull(table.get(key)); + } + } + + // Reuses Hashtable.D1.Entry — ConcurrentHashtable.D1 accepts any D1.Entry subclass. + private static final class StringEntry extends Hashtable.D1.Entry { + final int value; + + StringEntry(String key, int value) { + super(key); + this.value = value; + } + } + + /** Key with a fixed hashCode to force deterministic bucket placement. */ + private static final class CollidingKey { + final String label; + final int fixedHash; + + CollidingKey(String label, int fixedHash) { + this.label = label; + this.fixedHash = fixedHash; + } + + @Override + public int hashCode() { + return fixedHash; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof CollidingKey)) { + return false; + } + CollidingKey that = (CollidingKey) o; + return fixedHash == that.fixedHash && label.equals(that.label); + } + } + + private static final class CollidingEntry extends Hashtable.D1.Entry { + CollidingEntry(CollidingKey key) { + super(key); + } + } +} diff --git a/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD2Test.java b/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD2Test.java new file mode 100644 index 00000000000..46089bf6563 --- /dev/null +++ b/internal-api/src/test/java/datadog/trace/util/ConcurrentHashtableD2Test.java @@ -0,0 +1,197 @@ +package datadog.trace.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; 
+import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Test; + +class ConcurrentHashtableD2Test { + + @Test + void pairKeysParticipateInIdentity() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + PairEntry ab = table.getOrCreate("a", 1, PairEntry::new); + PairEntry ac = table.getOrCreate("a", 2, PairEntry::new); + PairEntry bb = table.getOrCreate("b", 1, PairEntry::new); + assertEquals(3, table.size()); + assertSame(ab, table.get("a", 1)); + assertSame(ac, table.get("a", 2)); + assertSame(bb, table.get("b", 1)); + assertNull(table.get("a", 3)); + } + + @Test + void getOrCreateOnMissBuildsEntryViaCreator() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + int[] createCount = {0}; + PairEntry created = + table.getOrCreate( + "a", + 1, + (k1, k2) -> { + createCount[0]++; + return new PairEntry(k1, k2); + }); + assertNotNull(created); + assertEquals("a", created.key1); + assertEquals(Integer.valueOf(1), created.key2); + assertEquals(1, table.size()); + assertEquals(1, createCount[0]); + assertSame(created, table.get("a", 1)); + } + + @Test + void getOrCreateOnHitSkipsCreator() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + PairEntry seeded = table.getOrCreate("a", 1, PairEntry::new); + int[] createCount = {0}; + PairEntry got = + table.getOrCreate( + "a", + 1, + (k1, k2) -> { + createCount[0]++; + return new PairEntry(k1, k2); + }); + assertSame(seeded, got); + assertEquals(1, table.size()); + assertEquals(0, createCount[0]); + } + + @Test + void forEachVisitsBothPairs() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + table.getOrCreate("a", 1, PairEntry::new); + table.getOrCreate("b", 2, 
PairEntry::new); + Set seen = new HashSet<>(); + table.forEach(e -> seen.add(e.key1 + ":" + e.key2)); + assertEquals(2, seen.size()); + assertTrue(seen.contains("a:1")); + assertTrue(seen.contains("b:2")); + } + + @Test + void forEachWithContextPassesContextToConsumer() { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + table.getOrCreate("a", 1, PairEntry::new); + table.getOrCreate("b", 2, PairEntry::new); + Set seen = new HashSet<>(); + table.forEach(seen, (ctx, e) -> ctx.add(e.key1 + ":" + e.key2)); + assertEquals(2, seen.size()); + assertTrue(seen.contains("a:1")); + assertTrue(seen.contains("b:2")); + } + + @Test + void concurrentGetOrCreateProducesExactlyOneEntry() throws InterruptedException { + ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(8); + int threads = 16; + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch go = new CountDownLatch(1); + AtomicInteger createCount = new AtomicInteger(); + + Thread[] workers = new Thread[threads]; + for (int i = 0; i < threads; i++) { + workers[i] = + new Thread( + () -> { + ready.countDown(); + try { + go.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + table.getOrCreate( + "shared", + 42, + (k1, k2) -> { + createCount.incrementAndGet(); + return new PairEntry(k1, k2); + }); + }); + workers[i].start(); + } + ready.await(); + go.countDown(); + for (Thread w : workers) { + w.join(); + } + + assertEquals(1, table.size()); + assertEquals(1, createCount.get()); + } + + @Test + void chainedEntriesInSameBucketAreAllReachable() { + // 2 buckets: 4 entries guarantees at least 2 share a bucket by pigeonhole. 
+ ConcurrentHashtable.D2 table = new ConcurrentHashtable.D2<>(2); + PairEntry e1 = table.getOrCreate("a", 1, PairEntry::new); + PairEntry e2 = table.getOrCreate("a", 2, PairEntry::new); + PairEntry e3 = table.getOrCreate("b", 1, PairEntry::new); + PairEntry e4 = table.getOrCreate("b", 2, PairEntry::new); + assertEquals(4, table.size()); + assertSame(e1, table.get("a", 1)); + assertSame(e2, table.get("a", 2)); + assertSame(e3, table.get("b", 1)); + assertSame(e4, table.get("b", 2)); + assertNull(table.get("a", 3)); + } + + @Test + void concurrentDistinctKeyInsertionsAreAllRetained() throws InterruptedException { + int threads = 16; + String[] k1s = new String[threads]; + Integer[] k2s = new Integer[threads]; + for (int i = 0; i < threads; i++) { + k1s[i] = "key-" + i; + k2s[i] = i; + } + ConcurrentHashtable.D2 table = + new ConcurrentHashtable.D2<>(threads * 2); + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch go = new CountDownLatch(1); + + Thread[] workers = new Thread[threads]; + for (int i = 0; i < threads; i++) { + final String k1 = k1s[i]; + final Integer k2 = k2s[i]; + workers[i] = + new Thread( + () -> { + ready.countDown(); + try { + go.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return; + } + table.getOrCreate(k1, k2, PairEntry::new); + }); + workers[i].start(); + } + ready.await(); + go.countDown(); + for (Thread w : workers) { + w.join(); + } + + assertEquals(threads, table.size()); + for (int i = 0; i < threads; i++) { + assertNotNull(table.get(k1s[i], k2s[i])); + } + } + + private static final class PairEntry extends Hashtable.D2.Entry { + PairEntry(String key1, Integer key2) { + super(key1, key2); + } + } +}