Skip to content

Commit 585508f

Browse files
committed
FME-15373-impressions: introduce UniqueKeysSender interface; decouple UniqueKeysTrackerImp from TelemetrySynchronizer
- Add UniqueKeysSender interface replacing TelemetrySynchronizer dependency - git mv UniqueKeysTrackerImp to impressions module - Constructor now accepts UniqueKeysSender instead of TelemetrySynchronizer - Replace Lists.partition (Guava) with local partition() helper using subList - Replace SplitExecutorFactory with Executors.newSingleThreadScheduledExecutor - Update tests to mock UniqueKeysSender AI-Session-Id: 52375eb8-af89-45b8-bbad-1698b6636202 AI-Tool: claude-code AI-Model: unknown
1 parent 2061b95 commit 585508f

3 files changed

Lines changed: 211 additions & 20 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.split.client.impressions;
2+
3+
import io.split.client.dtos.UniqueKeys;
4+
5+
public interface UniqueKeysSender {
6+
void send(UniqueKeys uniqueKeys);
7+
}

client/src/main/java/io/split/client/impressions/UniqueKeysTrackerImp.java renamed to impressions/src/main/java/io/split/client/impressions/UniqueKeysTrackerImp.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
package io.split.client.impressions;
22

3-
import com.google.common.collect.Lists;
43
import io.split.client.dtos.UniqueKeys;
54
import io.split.client.impressions.filters.BloomFilterImp;
65
import io.split.client.impressions.filters.Filter;
76
import io.split.client.impressions.filters.FilterAdapter;
87
import io.split.client.impressions.filters.FilterAdapterImpl;
9-
import io.split.client.utils.SplitExecutorFactory;
10-
import io.split.telemetry.synchronizer.TelemetrySynchronizer;
118
import org.slf4j.Logger;
129
import org.slf4j.LoggerFactory;
1310

@@ -18,38 +15,43 @@
1815
import java.util.Map;
1916
import java.util.Optional;
2017
import java.util.concurrent.ConcurrentHashMap;
18+
import java.util.concurrent.Executors;
2119
import java.util.concurrent.ScheduledExecutorService;
2220
import java.util.concurrent.ThreadFactory;
2321
import java.util.concurrent.TimeUnit;
2422
import java.util.concurrent.atomic.AtomicBoolean;
2523
import java.util.concurrent.atomic.AtomicInteger;
2624

27-
public class UniqueKeysTrackerImp implements UniqueKeysTracker{
25+
public class UniqueKeysTrackerImp implements UniqueKeysTracker {
2826
private static final Logger _log = LoggerFactory.getLogger(UniqueKeysTrackerImp.class);
2927
private static final double MARGIN_ERROR = 0.01;
3028
private static final int MAX_UNIQUE_KEYS_POST_SIZE = 5000;
3129
private static final int MAX_AMOUNT_OF_KEYS = 10000000;
3230
private final AtomicInteger trackerKeysSize = new AtomicInteger(0);
3331
private FilterAdapter filterAdapter;
34-
private final TelemetrySynchronizer _telemetrySynchronizer;
32+
private final UniqueKeysSender _uniqueKeysSender;
3533
private final ScheduledExecutorService _uniqueKeysSyncScheduledExecutorService;
3634
private final ScheduledExecutorService _cleanFilterScheduledExecutorService;
37-
private final ConcurrentHashMap<String,HashSet<String>> uniqueKeysTracker;
35+
private final ConcurrentHashMap<String, HashSet<String>> uniqueKeysTracker;
3836
private final int _uniqueKeysRefreshRate;
3937
private final int _filterRefreshRate;
4038
private final AtomicBoolean sendGuard = new AtomicBoolean(false);
4139
private static final Logger _logger = LoggerFactory.getLogger(UniqueKeysTrackerImp.class);
4240

43-
public UniqueKeysTrackerImp(TelemetrySynchronizer telemetrySynchronizer, int uniqueKeysRefreshRate, int filterRefreshRate,
41+
public UniqueKeysTrackerImp(UniqueKeysSender uniqueKeysSender, int uniqueKeysRefreshRate, int filterRefreshRate,
4442
ThreadFactory threadFactory) {
4543
Filter bloomFilter = new BloomFilterImp(MAX_AMOUNT_OF_KEYS, MARGIN_ERROR);
4644
this.filterAdapter = new FilterAdapterImpl(bloomFilter);
4745
uniqueKeysTracker = new ConcurrentHashMap<>();
48-
_telemetrySynchronizer = telemetrySynchronizer;
46+
_uniqueKeysSender = uniqueKeysSender;
4947
_uniqueKeysRefreshRate = uniqueKeysRefreshRate;
5048
_filterRefreshRate = filterRefreshRate;
51-
_uniqueKeysSyncScheduledExecutorService = SplitExecutorFactory.buildSingleThreadScheduledExecutor(threadFactory,"UniqueKeys-sync-%d");
52-
_cleanFilterScheduledExecutorService = SplitExecutorFactory.buildSingleThreadScheduledExecutor(threadFactory,"Filter-%d");
49+
_uniqueKeysSyncScheduledExecutorService = threadFactory != null
50+
? Executors.newSingleThreadScheduledExecutor(threadFactory)
51+
: Executors.newSingleThreadScheduledExecutor();
52+
_cleanFilterScheduledExecutorService = threadFactory != null
53+
? Executors.newSingleThreadScheduledExecutor(threadFactory)
54+
: Executors.newSingleThreadScheduledExecutor();
5355
}
5456

5557
@Override
@@ -66,7 +68,7 @@ public boolean track(String featureFlagName, String key) {
6668
return keysByFeature;
6769
});
6870
_logger.debug("The feature flag " + featureFlagName + " and key " + key + " was added");
69-
if (trackerKeysSize.intValue() >= MAX_UNIQUE_KEYS_POST_SIZE){
71+
if (trackerKeysSize.intValue() >= MAX_UNIQUE_KEYS_POST_SIZE) {
7072
_logger.warn("The UniqueKeysTracker size reached the maximum limit");
7173
try {
7274
sendUniqueKeys();
@@ -105,8 +107,8 @@ public void stop() {
105107
_cleanFilterScheduledExecutorService.shutdown();
106108
}
107109

108-
public HashMap<String,HashSet<String>> popAll(){
109-
HashMap<String,HashSet<String>> toReturn = new HashMap<>();
110+
public HashMap<String, HashSet<String>> popAll() {
111+
HashMap<String, HashSet<String>> toReturn = new HashMap<>();
110112
for (String key : uniqueKeysTracker.keySet()) {
111113
HashSet<String> value = uniqueKeysTracker.remove(key);
112114
toReturn.put(key, value);
@@ -115,7 +117,7 @@ public HashMap<String,HashSet<String>> popAll(){
115117
return toReturn;
116118
}
117119

118-
private void sendUniqueKeys(){
120+
private void sendUniqueKeys() {
119121
if (!sendGuard.compareAndSet(false, true)) {
120122
_log.debug("SendUniqueKeys already running");
121123
return;
@@ -136,7 +138,7 @@ private void sendUniqueKeys(){
136138
uniqueKeysFromPopAll = capChunksToMaxSize(uniqueKeysFromPopAll);
137139

138140
for (List<UniqueKeys.UniqueKey> chunk : getChunks(uniqueKeysFromPopAll)) {
139-
_telemetrySynchronizer.synchronizeUniqueKeys(new UniqueKeys(chunk));
141+
_uniqueKeysSender.send(new UniqueKeys(chunk));
140142
}
141143
} finally {
142144
sendGuard.set(false);
@@ -147,7 +149,7 @@ private List<UniqueKeys.UniqueKey> capChunksToMaxSize(List<UniqueKeys.UniqueKey>
147149
List<UniqueKeys.UniqueKey> finalChunk = new ArrayList<>();
148150
for (UniqueKeys.UniqueKey uniqueKey : uniqueKeys) {
149151
if (uniqueKey.keysDto.size() > MAX_UNIQUE_KEYS_POST_SIZE) {
150-
for(List<String> subChunk : Lists.partition(uniqueKey.keysDto, MAX_UNIQUE_KEYS_POST_SIZE)) {
152+
for (List<String> subChunk : partition(uniqueKey.keysDto, MAX_UNIQUE_KEYS_POST_SIZE)) {
151153
finalChunk.add(new UniqueKeys.UniqueKey(uniqueKey.featureName, subChunk));
152154
}
153155
continue;
@@ -157,6 +159,14 @@ private List<UniqueKeys.UniqueKey> capChunksToMaxSize(List<UniqueKeys.UniqueKey>
157159
return finalChunk;
158160
}
159161

162+
private static <T> List<List<T>> partition(List<T> list, int size) {
163+
List<List<T>> result = new ArrayList<>();
164+
for (int i = 0; i < list.size(); i += size) {
165+
result.add(list.subList(i, Math.min(i + size, list.size())));
166+
}
167+
return result;
168+
}
169+
160170
private List<List<UniqueKeys.UniqueKey>> getChunks(List<UniqueKeys.UniqueKey> uniqueKeys) {
161171
List<List<UniqueKeys.UniqueKey>> chunks = new ArrayList<>();
162172
List<UniqueKeys.UniqueKey> intermediateChunk = new ArrayList<>();
@@ -180,21 +190,19 @@ private int getChunkSize(List<UniqueKeys.UniqueKey> uniqueKeysChunk) {
180190
}
181191
return totalSize;
182192
}
183-
184-
private interface ExecuteUniqueKeysAction{
193+
194+
private interface ExecuteUniqueKeysAction {
185195
void execute();
186196
}
187197

188198
private class ExecuteCleanFilter implements ExecuteUniqueKeysAction {
189-
190199
@Override
191200
public void execute() {
192201
filterAdapter.clear();
193202
}
194203
}
195204

196205
private class ExecuteSendUniqueKeys implements ExecuteUniqueKeysAction {
197-
198206
@Override
199207
public void execute() {
200208
sendUniqueKeys();
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package io.split.client.impressions;
2+
3+
import io.split.client.dtos.UniqueKeys;
4+
import org.junit.Assert;
5+
import org.junit.Test;
6+
import org.mockito.Mockito;
7+
8+
import java.lang.reflect.Field;
9+
import java.lang.reflect.InvocationTargetException;
10+
import java.lang.reflect.Method;
11+
import java.util.ArrayList;
12+
import java.util.HashMap;
13+
import java.util.HashSet;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.concurrent.atomic.AtomicInteger;
17+
18+
public class UniqueKeysTrackerImpTest {
19+
private static UniqueKeysSender _uniqueKeysSender = Mockito.mock(UniqueKeysSender.class);
20+
21+
@Test
22+
public void addSomeElements(){
23+
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(_uniqueKeysSender, 10000, 10000, null);
24+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key1"));
25+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key2"));
26+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key3"));
27+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature2","key4"));
28+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature2","key5"));
29+
30+
HashMap<String, HashSet<String>> result = uniqueKeysTrackerImp.popAll();
31+
Assert.assertEquals(2,result.size());
32+
33+
HashSet<String> value1 = result.get("feature1");
34+
Assert.assertEquals(3,value1.size());
35+
Assert.assertTrue(value1.contains("key1"));
36+
Assert.assertTrue(value1.contains("key2"));
37+
Assert.assertTrue(value1.contains("key3"));
38+
39+
HashSet<String> value2 = result.get("feature2");
40+
Assert.assertEquals(2,value2.size());
41+
Assert.assertTrue(value2.contains("key4"));
42+
Assert.assertTrue(value2.contains("key5"));
43+
}
44+
45+
@Test
46+
public void addTheSameElements(){
47+
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(_uniqueKeysSender, 10000, 10000, null);
48+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key1"));
49+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key2"));
50+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key3"));
51+
52+
Assert.assertFalse(uniqueKeysTrackerImp.track("feature1","key1"));
53+
Assert.assertFalse(uniqueKeysTrackerImp.track("feature1","key2"));
54+
Assert.assertFalse(uniqueKeysTrackerImp.track("feature1","key3"));
55+
56+
HashMap<String, HashSet<String>> result = uniqueKeysTrackerImp.popAll();
57+
Assert.assertEquals(1,result.size());
58+
59+
HashSet<String> value1 = result.get("feature1");
60+
Assert.assertEquals(3,value1.size());
61+
Assert.assertTrue(value1.contains("key1"));
62+
Assert.assertTrue(value1.contains("key2"));
63+
Assert.assertTrue(value1.contains("key3"));
64+
}
65+
66+
@Test
67+
public void popAllUniqueKeys(){
68+
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(_uniqueKeysSender, 10000, 10000, null);
69+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key1"));
70+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key2"));
71+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature2","key3"));
72+
73+
HashMap<String, HashSet<String>> result = uniqueKeysTrackerImp.popAll();
74+
Assert.assertEquals(2,result.size());
75+
HashMap<String, HashSet<String>> resultAfterPopAll = uniqueKeysTrackerImp.popAll();
76+
Assert.assertEquals(0,resultAfterPopAll.size());
77+
}
78+
79+
@Test
80+
public void testSynchronization() throws Exception {
81+
UniqueKeysSender sender = Mockito.mock(UniqueKeysSender.class);
82+
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(sender, 1, 3, null);
83+
uniqueKeysTrackerImp.start();
84+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key1"));
85+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key2"));
86+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature2","key3"));
87+
88+
Thread.sleep(2900);
89+
Mockito.verify(sender, Mockito.times(1)).send(Mockito.anyObject());
90+
Thread.sleep(2900);
91+
Mockito.verify(sender, Mockito.times(1)).send(Mockito.anyObject());
92+
}
93+
94+
@Test
95+
public void testStopSynchronization() throws Exception {
96+
UniqueKeysSender sender = Mockito.mock(UniqueKeysSender.class);
97+
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(sender, 1, 2, null);
98+
uniqueKeysTrackerImp.start();
99+
Assert.assertFalse(uniqueKeysTrackerImp.getSendGuard().get());
100+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key1"));
101+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1","key2"));
102+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature2","key3"));
103+
104+
Thread.sleep(2100);
105+
Mockito.verify(sender, Mockito.times(1)).send(Mockito.anyObject());
106+
uniqueKeysTrackerImp.stop();
107+
Mockito.verify(sender, Mockito.times(1)).send(Mockito.anyObject());
108+
}
109+
110+
@Test
111+
public void testUniqueKeysChunks() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
112+
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(_uniqueKeysSender, 10000, 10000, null);
113+
HashMap<String, HashSet<String>> uniqueKeysHashMap = new HashMap<>();
114+
HashSet<String> feature1 = new HashSet<>();
115+
HashSet<String> feature2 = new HashSet<>();
116+
HashSet<String> feature3 = new HashSet<>();
117+
HashSet<String> feature4 = new HashSet<>();
118+
HashSet<String> feature5 = new HashSet<>();
119+
for (Integer i=1; i<6000; i++) {
120+
if (i <= 1000) {
121+
feature1.add("key" + i);
122+
}
123+
if (i <= 2000) {
124+
feature2.add("key" + i);
125+
}
126+
if (i <= 3000) {
127+
feature3.add("key" + i);
128+
}
129+
if (i <= 4000) {
130+
feature4.add("key" + i);
131+
}
132+
feature5.add("key" + i);
133+
}
134+
uniqueKeysHashMap.put("feature1", feature1);
135+
uniqueKeysHashMap.put("feature2", feature2);
136+
uniqueKeysHashMap.put("feature3", feature3);
137+
uniqueKeysHashMap.put("feature4", feature4);
138+
uniqueKeysHashMap.put("feature5", feature5);
139+
140+
List<UniqueKeys.UniqueKey> uniqueKeysFromPopAll = new ArrayList<>();
141+
for (Map.Entry<String, HashSet<String>> uniqueKeyEntry : uniqueKeysHashMap.entrySet()) {
142+
UniqueKeys.UniqueKey uniqueKey = new UniqueKeys.UniqueKey(uniqueKeyEntry.getKey(), new ArrayList<>(uniqueKeyEntry.getValue()));
143+
uniqueKeysFromPopAll.add(uniqueKey);
144+
}
145+
Method methodCapChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("capChunksToMaxSize", List.class);
146+
methodCapChunks.setAccessible(true);
147+
uniqueKeysFromPopAll = (List<UniqueKeys.UniqueKey>)methodCapChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll);
148+
149+
Method methodGetChunks = uniqueKeysTrackerImp.getClass().getDeclaredMethod("getChunks", List.class);
150+
methodGetChunks.setAccessible(true);
151+
List<List<UniqueKeys.UniqueKey>> keysChunks = (List<List<UniqueKeys.UniqueKey>>) methodGetChunks.invoke(uniqueKeysTrackerImp, uniqueKeysFromPopAll);
152+
for (List<UniqueKeys.UniqueKey> chunk : keysChunks) {
153+
int chunkSize = 0;
154+
for (UniqueKeys.UniqueKey keys : chunk) {
155+
chunkSize += keys.keysDto.size();
156+
}
157+
Assert.assertTrue(chunkSize <= 5000);
158+
}
159+
}
160+
161+
@Test
162+
public void testTrackReachMaxKeys() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
163+
UniqueKeysSender sender = Mockito.mock(UniqueKeysSender.class);
164+
UniqueKeysTrackerImp uniqueKeysTrackerImp = new UniqueKeysTrackerImp(sender, 10000, 10000, null);
165+
for (int i=1; i<6000; i++) {
166+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature1", "key" + i));
167+
Assert.assertTrue(uniqueKeysTrackerImp.track("feature2", "key" + i));
168+
}
169+
Mockito.verify(sender, Mockito.times(2)).send(Mockito.anyObject());
170+
171+
Field getTrackerSize = uniqueKeysTrackerImp.getClass().getDeclaredField("trackerKeysSize");
172+
getTrackerSize.setAccessible(true);
173+
AtomicInteger trackerSize = (AtomicInteger) getTrackerSize.get(uniqueKeysTrackerImp);
174+
Assert.assertTrue(trackerSize.intValue() == 1998);
175+
}
176+
}

0 commit comments

Comments
 (0)