【源码系列】Sentinel 的核心设计与代码实现

Sentinel 是一个面向分布式系统的流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来保护分布式架构下的微服务;下面将讲述一下我对 Sentinel 的核心设计思想的简单理解

1. Sentinel 核心设计

1.1 基本概念

Sentinel 以流量作为切入点进行控制保护,所谓流量可以浅显的认为外部服务调用过来的请求或是对外部服务的请求;而被 Sentinel 保护的部分被定义成资源,资源可以是一个方法、一段代码;对于需要被保护的资源都有一套保护规则,保护规则定义了资源相关指标达到标准之后则开启相应的保护策略(限流、降级、熔断);其中,资源相关指标(单位时间内的QPS、平均RT、异常数等)需要被统计记录起来,以此为指标进行判断触发条件。

1.2 资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class ResourceWrapper {
// 资源名
protected final String name;
// 外部服务调用过来的请求(IN)还是对外部服务的请求(OUT)
protected final EntryType entryType;
// 资源类型,例如 dubbo rpc 或者 web http
protected final int resourceType;
}

public enum EntryType {
/**
* Inbound traffic
*/
IN,
/**
* Outbound traffic
*/
OUT;
}

ResourceWrapper 对象示例是一个唯一标识 ID

1.2 流量定义

Sentinel 以 context 来代表链路上下文,维护整条链路中的所有 EntryEntry 则包含着资源相关的信息,例如资源信息(ResourceWrapper)、统计数据(Node) 、限流/降级/熔断判断链路(ProcessorSlot)。

A. Context

Sentinel 的 Context 是存放 ThreadLocal 中,借助 ThreadLocal 的特性,保证每个线程的资源统计数据互不干扰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ContextUtil {

/**
* Store the context in ThreadLocal for easy access.
*/
private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
}

public static Context getContext() {
return contextHolder.get();
}

// 退出的时候,手动设置 value 为 null 避免内存泄漏
public static void exit() {
Context context = contextHolder.get();
if (context != null && context.getCurEntry() == null) {
contextHolder.set(null);
}
}

B. Entry

context 代表是调用链路的上下文,Entry 则代表调用链路中某一个被保护的资源调用,一个 context 可能包含多个 Entry; 原因是一个调用链路可能会调用多个资源,例如:
服务对外提供接口 A, A 内部实现依赖服务 1 和服务 2 提供的接口,如果我们分别对服务 1 和服务 2 进行限流,那么 A 的调用链路中会包含两个 Entry,分别代表对服务 1 和服务 2 的调用;如下所示

1
2
3
context
|—— Entry1
|—— Entry2
1
2
3
4
5
6
7
8
9
10
11
12
13
// Entry 部分源码
public abstract class Entry implements AutoCloseable {
// 资源相关的统计数据
private Node curNode;
// 调用来源相关的统计数据
private Node originNode;
// 资源
protected final ResourceWrapper resourceWrapper;
// 处理链
protected ProcessorSlot<Object> chain;
// 链路上下文
protected Context context;
}

C. Node

记录资源相关的统计数据

1
2
3
4
5
6
// 部分源码
public interface Node extends OccupySupport, DebugSupport {
long totalRequest();
long totalPass();
long totalSuccess();
}

Node 是一个接口,定义了基本的统计数据口径,包含 DefaultNodeClusterNodeStatisticNodeEntranceNode 四个实现类;

StatisticNode

Node 的具体实现类,底层是通过滑动窗口对相关调用信息进行记录

1
2
3
4
5
6
7
8
public class StatisticNode implements Node {
// 秒级滑动窗口
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
// 分钟级滑动窗口
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
// CAS + cell 数组提升并发性能
private LongAdder curThreadNum = new LongAdder();
}

滑动窗口是 Sentinel 统计的核心设计,Sentinel 通过滑动窗口对资源调用信息进行统计,以秒级、分钟级为单位进行统计,为后续的限流、降级、熔断提供数据支撑;我们稍微展开一下 Sentinel 是如何实现的滑动窗口。

滑动窗口

滑动窗口的核心概念就是:将固定单位时间划分若干个小单位时间,每一个小单位时间就是一个窗口;在 Sentinel 中,资源的调用相关的统计信息(调用成功、调用失败、调用总数等数据信息)就是存放在窗口中;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Sentinel 滑动窗口核心定义
public abstract class LeapArray<T> {
// 窗口长度
protected int windowLengthInMs;
// 窗口数量
protected int sampleCount;
// 滑动窗口总长度(单位: ms) = 窗口长度 * 窗口数量
protected int intervalInMs;
// 滑动窗口总长度(单位: s)
private double intervalInSecond;
// 窗口数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
// 可重入锁
private final ReentrantLock updateLock = new ReentrantLock();
}

LeapArray 定义了滑动窗口的基本信息,窗口个数、长度;可以看到 Sentinel 采用数组的形式来实现滑动窗口,每一个元素为 WindowWrap 类型

1
2
3
4
5
6
7
8
public class WindowWrap<T> {
// 窗口长度
private final long windowLengthInMs;
// 窗口开始时间
private long windowStart;
// 窗口内存放的数据
private T value;
}

WindowWrap 的 value 在 Sentinel 中资源的调用统计信息,MetricBucket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MetricBucket {
// 统计信息数组
private final LongAdder[] counters;
// 最小 rt
private volatile long minRt;

public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public enum MetricEvent {

/**
* Normal pass.
*/
PASS,
/**
* Normal block.
*/
BLOCK,
EXCEPTION,
SUCCESS,
RT,

/**
* Passed in future quota (pre-occupied, since 1.5.0).
*/
OCCUPIED_PASS
}

可以看到 Sentinel 采用了非常巧妙的方式来存储不同类型的统计信息,通过 MetricEvent 枚举作为下标,从 counters 获取到对应的 LongAdder。Sentinel 以流量作为切入点,其统计信息操作肯定是非常频繁的,在并发场景下,如何保证性能? Sentinel 通过引入 LongAdder 进行记录;LongAdder 是 JUC 下的一个包,其核心思想是通过 CAS + cell 数组提升并发性能,1. 通过 CAS 保证操作的原子性 2. 在存在并发的情况下,分散请求到不同的 cell CAS 更新以减少并发冲突。


当请求到来时,Sentinel 首先会先计算当前请求落在哪个小窗口中,获取到对应的 WindowWrap,然后通过 MetricBucketadd() 方法来记录对应的统计信息,计算落在哪个小窗口中的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}

int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);

/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

非常通俗易懂;Sentinel 采用的是循环数组,通过 hash 计算下标循环利用空间;

DefaultNode

DefaultNode 是 StatisticNode 的子类,DefaultNode 拥有资源信息(resourceHolder)

1
2
3
4
5
6
7
8
public class DefaultNode extends StatisticNode {
// 资源
private ResourceWrapper id;
// 下级节点
private volatile Set<Node> childList = new HashSet<>();
// 全局统计信息
private ClusterNode clusterNode;
}

childList 存储下级节点,形成资源调用链表,例如

1
2
3
4
5
EntranceNode(入口: /api)  
└── DefaultNode(资源A)
├── DefaultNode(资源B,上下文:资源A调用)
└── DefaultNode(资源C,上下文:资源A调用)
└── DefaultNode(资源D,上下文:资源C调用)

每一个节点都能够单独其所在上下文context的调用情况

ClusterNode

ClusterNode 是 StatisticNode 的子类,同一个资源可能同时有多个调用链路,例如 context1、content2 同时使用了资源,两个上下文中分别会有 DefaultNode 记录其相关信息,此时为了方便统计整体资源的调用数据;就需要遍历资源所有的 DefaultNode,无疑这是非常损耗性能的,所以 Sentinel 定义了 ClusterNode,ClusterNode 存储了所有 DefaultNode 的统计信息,通过 ClusterNode 统计资源调用情况,而不需要遍历所有 DefaultNode

1
2
3
4
public class ClusterNode extends StatisticNode {
private final String name;
private final int resourceType;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class DefaultNode extends StatisticNode {
// 省略部分源码
@Override
public void increaseBlockQps(int count) {
super.increaseBlockQps(count);
this.clusterNode.increaseBlockQps(count);
}
@Override
public void increaseExceptionQps(int count) {
super.increaseExceptionQps(count);
this.clusterNode.increaseExceptionQps(count);
}
}

可以看到 DefaultNodeincrease 的时候,会对 clusterNode 也进行 increase

EntranceNode

EntranceNode 是一个特殊的 DefaultNode,其作为调用链路的入口节点,子节点为 DefaultNode; 主要是用来记录流量入口,使 Sentinel 能够做到入口级别操作(统计/流控/其他)

1
2
3
4
5
6
public class EntranceNode extends DefaultNode {

public EntranceNode(ResourceWrapper id, ClusterNode clusterNode) {
super(id, clusterNode);
}
}

特别的, Sentinel 定义了 Root EntranceNode 类型,作为 Sentinel 的入口节点,所有资源调用链路都从该节点开始;

1
2
3
4
5
/**
* Global ROOT statistic node that represents the universal parent node.
*/
public final static DefaultNode ROOT = new EntranceNode(new StringResourceWrapper(ROOT_ID, EntryType.IN),
new ClusterNode(ROOT_ID, ResourceTypeConstants.COMMON));

D. 小结

上述 contextEntryNode 都是 Sentinel 中非常重要的概念,Sentinel 使用 contextEntryNode 来实现对流量的定义、信息统计,为后续的操作提供数据支撑。

  • context: 线程维度的调用上下文,存储当前调用链的入口节点、调用来源等信息,绑定在 ThreadLocal
  • Entry: 资源的调用上下文,包含资源名称、创建时间等元数据,每次资源调用都会创建新 Entry
  • Node: 记录资源调用数据的统计节点(如QPS、RT),底层实现是滑动窗口

1.3 限流/降级/熔断的实现

上面解析了 Sentinel 是如何对流量进行定义,统计相关信息,下面我们看看 Sentinel 是如何对请求进行封装、统计,然后实现对流量的限流/降级/熔断等功能。

ProcessorSlot

Sentinel 定义了 ProcessorSlot 接口,是 Sentinel 实现限流、降级、熔断等功能的核心;从功能分类来看,有两类 ProcessorSlot

  1. 构建资源统计信息 slot
  2. 实现具体功能的 slot、例如:限流/降级/熔断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ProcessorSlot<T> {
// 进入 slot
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;

// 进入下一个 slot
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;

// 退出 slot
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

// 退出下一个 slot
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;

@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}

@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}

@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {
next.exit(context, resourceWrapper, count, args);
}
}

public AbstractLinkedProcessorSlot<?> getNext() {
return next;
}

public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
}

AbstractLinkedProcessorSlot 抽象类封装了ProcessorSlot 公共的行为。

ProcessorSlotChain
1
2
3
4
public abstract class ProcessorSlotChain extends AbstractLinkedProcessorSlot<Object> {
public abstract void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor);
public abstract void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor);
}

ProcessorSlotChain 继承了 AbstractLinkedProcessorSlot,实现了 addFirstaddLast 方法,分别用于添加 ProcessorSlot 到链表头、链表尾;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
// first 空实现,作为 ProcessSlotChain 链的头节点
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
// ProcessSlotChain 链的尾节点
AbstractLinkedProcessorSlot<?> end = first;

@Override
// 将 processSlot 放到责任链的首位
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}

@Override
// 新增 ProcessSlot 到责任链的尾部
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}

@Override
public void setNext(AbstractLinkedProcessorSlot<?> next) {
addLast(next);
}

@Override
public AbstractLinkedProcessorSlot<?> getNext() {
return first.getNext();
}

// 从责任链第一个 ProcessSlot 开始执行 entry 逻辑
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}

// 从责任链第一个 ProcessSlot 开始执行 exit 逻辑
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
first.exit(context, resourceWrapper, count, args);
}
}

DefaultProcessorSlotChain 为 Sentinel 责任链的默认实现,实现方式为单向链表

ProcessorSlotChain 的构建时机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {

@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// spi 的方式加载 ProcessorSlot
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}

chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}

可以看到是 spi 的形式加载 ProcessorSlot,提升了灵活性

1
2
3
4
5
6
7
8
9
10
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
com.alibaba.csp.sentinel.slots.block.degrade.DefaultCircuitBreakerSlot

上述是默认的 ProcessorSlotChain 的创建顺序

NodeSelectorSlot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
// key -> value : context -> defaultNode
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

// entry 逻辑
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 构建 DefaultNode
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// 构建调用树
((DefaultNode) context.getLastNode()).addChild(node);
}

}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}

NodeSelectorSlot 的职责是根据 context 构建 DefaultNode, 请求进入到 NodeSelectorSlot 后,会根据 context 获取到资源的 DefaultNode 并且向后传递

ClusterBuilderSlot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// key -> value: 资源 -> ClusterNode
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);

clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);

if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}

ClusterBuilderSlot 的指责则是创建 ClusterNode 往后传递,非常容易理解

StatisticSlot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// StatisticSlot 先执行后续的 processorSlot 的执行逻辑检查,例如是否需要被限流、降级、熔断
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 如果没有报错,即代表流量放行,记录相关信息到滑动窗口中
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// 流量放行,执行提前注册好的一些回调方法
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// 流量不允许通过
context.getCurEntry().setBlockError(e);
// 记录相关统计信息到滑动窗口中
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// 流量不允许通过,执行提前注册好的一些回调方法
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
// 最后向上抛出异常
throw e;
} catch (Throwable e) {
// 未知错误
context.getCurEntry().setError(e);

throw e;
}
}

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
Node node = context.getCurNode();
if (context.getCurEntry().getBlockError() == null) {
// 记录执行 rt 相关统计信息
long completeStatTime = TimeUtil.currentTimeMillis();
context.getCurEntry().setCompleteTimestamp(completeStatTime);
long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

Throwable error = context.getCurEntry().getError();
recordCompleteFor(node, count, rt, error);
recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
if (resourceWrapper.getEntryType() == EntryType.IN) {
recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
}
}

// 流量执行完毕,执行提前注册好的一些回调方法
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}

fireExit(context, resourceWrapper, count, args);
}
}

StatisticSlot 是 Sentinel ProcessorSlotChain 的核心,包含了数据信息统计,判断是否被流控,以及完成一些回调钩子 ProcessorSlotEntryCallback

可以看到 StatisticSlot 作为 Sentinel ProcessorSlotChain 的核心 slot,起着承上启下的作用,在 StatisticSlot 前面的 processorSlot 是构建/获取 DefaultNodeClusterNode,为后面的 processorSlot 的限流/降级/熔断提供数据支撑;真正决定流量是否通行的由 StatisticSlot 之后的 processSlot 决定。