Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<!-- Compile libs -->
<fastjson.version>1.2.83_noneautotype</fastjson.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
<caffeine.version>2.9.3</caffeine.version>

<!-- Test libs -->
<junit.version>4.12</junit.version>
Expand Down Expand Up @@ -219,6 +220,11 @@
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

/**
* We use TokenCacheNodeManager to store the tokenId, whose the underlying storage structure
* is ConcurrentLinkedHashMap, Its storage node is TokenCacheNode. In order to operate the nowCalls value when
* is Caffeine, Its storage node is TokenCacheNode. In order to operate the nowCalls value when
* the expired tokenId is deleted regularly, we need to store the flowId in TokenCacheNode.
*
* @author yunfeiyanggzq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;

import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.expire.RegularExpireStrategy;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.slots.statistic.cache.CaffeineCacheMapWrapper;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.googlecode.concurrentlinkedhashmap.Weighers;

import java.util.Set;

/**
* @author yunfeiyanggzq
*/
public class TokenCacheNodeManager {
private static ConcurrentLinkedHashMap<Long, TokenCacheNode> TOKEN_CACHE_NODE_MAP;
private static CacheMap<Long, TokenCacheNode> TOKEN_CACHE_NODE_MAP;


private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
Expand All @@ -40,20 +40,15 @@ public static void prepare(int concurrencyLevel, int maximumWeightedCapacity) {
AssertUtil.isTrue(concurrencyLevel > 0, "concurrencyLevel must be positive");
AssertUtil.isTrue(maximumWeightedCapacity > 0, "maximumWeightedCapacity must be positive");

TOKEN_CACHE_NODE_MAP = new ConcurrentLinkedHashMap.Builder<Long, TokenCacheNode>()
.concurrencyLevel(concurrencyLevel)
.maximumWeightedCapacity(maximumWeightedCapacity)
.weigher(Weighers.singleton())
.build();
TOKEN_CACHE_NODE_MAP = new CaffeineCacheMapWrapper<>(maximumWeightedCapacity);
// Start the task of regularly clearing expired keys
RegularExpireStrategy strategy = new RegularExpireStrategy(TOKEN_CACHE_NODE_MAP);
strategy.startClearTaskRegularly();
}


public static TokenCacheNode getTokenCacheNode(long tokenId) {
//use getQuietly to prevent disorder
return TOKEN_CACHE_NODE_MAP.getQuietly(tokenId);
return TOKEN_CACHE_NODE_MAP.get(tokenId);
}

public static void putTokenCacheNode(long tokenId, TokenCacheNode cacheNode) {
Expand All @@ -69,7 +64,7 @@ public static TokenCacheNode removeTokenCacheNode(long tokenId) {
}

public static int getSize() {
return TOKEN_CACHE_NODE_MAP.size();
return (int) TOKEN_CACHE_NODE_MAP.size();
}

public static Set<Long> getCacheKeySet() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import com.alibaba.csp.sentinel.cluster.server.connection.ConnectionManager;
import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.util.AssertUtil;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -62,14 +62,14 @@ public class RegularExpireStrategy implements ExpireStrategy {
/**
* the local cache of tokenId
*/
private ConcurrentLinkedHashMap<Long, TokenCacheNode> localCache;
private CacheMap<Long, TokenCacheNode> localCache;

@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("regular clear expired token thread", true));


public RegularExpireStrategy(ConcurrentLinkedHashMap<Long, TokenCacheNode> localCache) {
public RegularExpireStrategy(CacheMap<Long, TokenCacheNode> localCache) {
AssertUtil.isTrue(localCache != null, " local cache can't be null");
this.localCache = localCache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.alibaba.csp.sentinel.slots.statistic.base.LeapArray;
import com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.slots.statistic.cache.CaffeineCacheMapWrapper;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
Expand All @@ -38,7 +38,7 @@ public ClusterParameterLeapArray(int sampleCount, int intervalInMs, int maxCapac

@Override
public CacheMap<Object, C> newEmptyBucket(long timeMillis) {
return new ConcurrentLinkedHashMapWrapper<>(maxCapacity);
return new CaffeineCacheMapWrapper<>(maxCapacity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent;

import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNode;
import com.alibaba.csp.sentinel.cluster.flow.statistic.concurrent.TokenCacheNodeManager;
import com.alibaba.csp.sentinel.slots.block.ClusterRuleConstant;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig;
Expand Down
5 changes: 2 additions & 3 deletions sentinel-extension/sentinel-parameter-flow-control/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
</dependency>

<dependency>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
<version>1.4.2</version>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import com.alibaba.csp.sentinel.log.RecordLog;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.slots.statistic.cache.CaffeineCacheMapWrapper;

/**
* Metrics for frequent ("hot spot") parameters.
Expand Down Expand Up @@ -97,7 +97,7 @@ public void initialize(ParamFlowRule rule) {
synchronized (lock) {
if (ruleTimeCounters.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
ruleTimeCounters.put(rule, new CaffeineCacheMapWrapper<>(size));
}
}
}
Expand All @@ -106,7 +106,7 @@ public void initialize(ParamFlowRule rule) {
synchronized (lock) {
if (ruleTokenCounter.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
ruleTokenCounter.put(rule, new CaffeineCacheMapWrapper<>(size));
}
}
}
Expand All @@ -115,7 +115,7 @@ public void initialize(ParamFlowRule rule) {
synchronized (lock) {
if (threadCountMap.get(rule.getParamIdx()) == null) {
threadCountMap.put(rule.getParamIdx(),
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY));
new CaffeineCacheMapWrapper<>(THREAD_COUNT_MAX_CAPACITY));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ public interface CacheMap<K, V> {
void clear();

Set<K> keySet(boolean ascending);

default Set<K> keySet() {
return keySet(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,36 @@
*/
package com.alibaba.csp.sentinel.slots.statistic.cache;

import java.util.Set;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.googlecode.concurrentlinkedhashmap.Weighers;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* A {@link ConcurrentLinkedHashMap} wrapper for the universal {@link CacheMap}.
* A {@link Cache} wrapper for the universal {@link CacheMap}.
*
* @author shaoqiangyan
*
* @author Eric Zhao
* @since 0.2.0
*/
public class ConcurrentLinkedHashMapWrapper<T, R> implements CacheMap<T, R> {
public class CaffeineCacheMapWrapper<T, R> implements CacheMap<T, R> {

private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

private final ConcurrentLinkedHashMap<T, R> map;
private final Cache<T, R> map;

public ConcurrentLinkedHashMapWrapper(long size) {
public CaffeineCacheMapWrapper(long size) {
if (size <= 0) {
throw new IllegalArgumentException("Cache max capacity should be positive: " + size);
}
this.map = new ConcurrentLinkedHashMap.Builder<T, R>()
.concurrencyLevel(DEFAULT_CONCURRENCY_LEVEL)
.maximumWeightedCapacity(size)
.weigher(Weighers.singleton())
.build();
this.map = Caffeine.newBuilder()
.maximumSize(size)
.build();
}

public ConcurrentLinkedHashMapWrapper(ConcurrentLinkedHashMap<T, R> map) {
public CaffeineCacheMapWrapper(Cache<T, R> map) {
if (map == null) {
throw new IllegalArgumentException("Invalid map instance");
}
Expand All @@ -52,45 +53,52 @@ public ConcurrentLinkedHashMapWrapper(ConcurrentLinkedHashMap<T, R> map) {

@Override
public boolean containsKey(T key) {
return map.containsKey(key);
return asMap().containsKey(key);
}

@Override
public R get(T key) {
return map.get(key);
return map.getIfPresent(key);
}

@Override
public R remove(T key) {
return map.remove(key);
return asMap().remove(key);
}

@Override
public R put(T key, R value) {
return map.put(key, value);
return asMap().put(key, value);
}

@Override
public R putIfAbsent(T key, R value) {
return map.putIfAbsent(key, value);
return asMap().putIfAbsent(key, value);
}

@Override
public long size() {
return map.weightedSize();
return asMap().size();
}

@Override
public void clear() {
map.clear();
map.invalidateAll();
}

@Override
@SuppressWarnings("unchecked")
public Set<T> keySet(boolean ascending) {
Comparator<? super T> comparator;
if (ascending) {
return map.ascendingKeySet();
comparator = (Comparator<? super T>) Comparator.naturalOrder();
} else {
return map.descendingKeySet();
comparator = (Comparator<? super T>) Comparator.reverseOrder();
}
return asMap().keySet().stream().sorted(comparator).collect(Collectors.toCollection(LinkedHashSet::new));
}

private ConcurrentMap<T, R> asMap() {
return map.asMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.alibaba.csp.sentinel.slots.block.flow.param.RollingParamEvent;
import com.alibaba.csp.sentinel.slots.statistic.cache.CacheMap;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.slots.statistic.cache.CaffeineCacheMapWrapper;
import com.alibaba.csp.sentinel.util.AssertUtil;

/**
Expand All @@ -43,7 +43,7 @@ public ParamMapBucket(int capacity) {
RollingParamEvent[] events = RollingParamEvent.values();
this.data = new CacheMap[events.length];
for (RollingParamEvent event : events) {
data[event.ordinal()] = new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(capacity);
data[event.ordinal()] = new CaffeineCacheMapWrapper<>(capacity);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.slotchain.StringResourceWrapper;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.statistic.cache.ConcurrentLinkedHashMapWrapper;
import com.alibaba.csp.sentinel.slots.statistic.cache.CaffeineCacheMapWrapper;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.junit.After;
import org.junit.Before;
Expand All @@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -89,7 +88,7 @@ public void testSingleValueCheckQpsWithExceptionItems() throws InterruptedExcept

ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTimeCounterMap().put(rule, new CaffeineCacheMapWrapper<>(4000));

assertTrue(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueA));
assertFalse(ParamFlowChecker.passSingleValueCheck(resourceWrapper, rule, 1, valueB));
Expand Down Expand Up @@ -157,8 +156,8 @@ public void testPassLocalCheckForCollection() throws InterruptedException {
List<String> list = Arrays.asList(v1, v2, v3);
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTimeCounterMap().put(rule, new CaffeineCacheMapWrapper<>(4000));
metric.getRuleTokenCounterMap().put(rule, new CaffeineCacheMapWrapper<>(4000));

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, list));
Expand All @@ -180,7 +179,7 @@ public void testPassLocalCheckForArray() throws InterruptedException {
Object arr = new String[]{v1, v2, v3};
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTimeCounterMap().put(rule, new CaffeineCacheMapWrapper<>(4000));

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, arr));
Expand Down Expand Up @@ -214,8 +213,8 @@ public Object paramFlowKey() {
Object[] args = new Object[]{new User(1, "Bob", "Hangzhou"), 10, "Demo"};
ParameterMetric metric = new ParameterMetric();
ParameterMetricStorage.getMetricsMap().put(resourceWrapper.getName(), metric);
metric.getRuleTimeCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTokenCounterMap().put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(4000));
metric.getRuleTimeCounterMap().put(rule, new CaffeineCacheMapWrapper<>(4000));
metric.getRuleTokenCounterMap().put(rule, new CaffeineCacheMapWrapper<>(4000));

assertTrue(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, args));
assertFalse(ParamFlowChecker.passCheck(resourceWrapper, rule, 1, args));
Expand Down
Loading