Sentinel 是一个面向分布式系统的流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来保护分布式架构下的微服务;下面将讲述一下我对 Sentinel 的核心设计思想的简单理解
1. Sentinel 核心设计 1.1 基本概念 Sentinel 以流量作为切入点进行控制保护,所谓流量 可以浅显的认为外部服务调用过来的请求或是对外部服务的请求;而被 Sentinel 保护的部分被定义成资源 ,资源可以是一个方法、一段代码;对于需要被保护的资源都有一套保护规则 ,保护规则定义了资源相关指标达到标准之后则开启相应的保护策略(限流、降级、熔断);其中,资源相关指标(单位时间内的QPS、平均RT、异常数等)需要被统计记录起来,以此为指标进行判断触发条件。
1.2 资源 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public abstract class ResourceWrapper { protected final String name; protected final EntryType entryType; protected final int resourceType; } public enum EntryType { IN, OUT; }
ResourceWrapper 对象示例是一个唯一标识 ID
1.2 流量定义 Sentinel 以 context
来代表链路上下文,维护整条链路中的所有 Entry
, Entry
则包含着资源相关的信息,例如资源信息(ResourceWrapper
)、统计数据(Node
) 、限流/降级/熔断判断链路(ProcessorSlot
)。
A. Context Sentinel 的 Context 是存放 ThreadLocal
中,借助 ThreadLocal
的特性,保证每个线程的资源统计数据互不干扰
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ContextUtil { private static ThreadLocal<Context> contextHolder = new ThreadLocal <>(); } public static Context getContext () { return contextHolder.get(); } public static void exit () { Context context = contextHolder.get(); if (context != null && context.getCurEntry() == null ) { contextHolder.set(null ); } }
B. Entry context
代表是调用链路的上下文,Entry
则代表调用链路中某一个被保护的资源调用,一个 context
可能包含多个 Entry
; 原因是一个调用链路可能会调用多个资源,例如: 服务对外提供接口 A, A 内部实现依赖服务 1 和服务 2 提供的接口,如果我们分别对服务 1 和服务 2 进行限流,那么 A 的调用链路中会包含两个 Entry
,分别代表对服务 1 和服务 2 的调用;如下所示
1 2 3 context |—— Entry1 |—— Entry2
1 2 3 4 5 6 7 8 9 10 11 12 13 public abstract class Entry implements AutoCloseable { private Node curNode; private Node originNode; protected final ResourceWrapper resourceWrapper; protected ProcessorSlot<Object> chain; protected Context context; }
C. Node 记录资源相关的统计数据
1 2 3 4 5 6 public interface Node extends OccupySupport , DebugSupport { long totalRequest () ; long totalPass () ; long totalSuccess () ; }
Node 是一个接口,定义了基本的统计数据口径,包含 DefaultNode
、ClusterNode
、StatisticNode
、EntranceNode
四个实现类;
StatisticNode Node 的具体实现类,底层是通过滑动窗口 对相关调用信息进行记录
1 2 3 4 5 6 7 8 public class StatisticNode implements Node { private transient volatile Metric rollingCounterInSecond = new ArrayMetric (SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); private transient Metric rollingCounterInMinute = new ArrayMetric (60 , 60 * 1000 , false ); private LongAdder curThreadNum = new LongAdder (); }
滑动窗口是 Sentinel 统计的核心设计,Sentinel 通过滑动窗口对资源调用信息进行统计,以秒级、分钟级为单位进行统计,为后续的限流、降级、熔断提供数据支撑;我们稍微展开一下 Sentinel 是如何实现的滑动窗口。
滑动窗口 滑动窗口的核心概念就是:将固定单位时间划分若干个小单位时间,每一个小单位时间就是一个窗口;在 Sentinel 中,资源的调用相关的统计信息(调用成功、调用失败、调用总数等数据信息)就是存放在窗口中;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public abstract class LeapArray <T> { protected int windowLengthInMs; protected int sampleCount; protected int intervalInMs; private double intervalInSecond; protected final AtomicReferenceArray<WindowWrap<T>> array; private final ReentrantLock updateLock = new ReentrantLock (); }
LeapArray 定义了滑动窗口的基本信息,窗口个数、长度;可以看到 Sentinel 采用数组的形式来实现滑动窗口,每一个元素为 WindowWrap
类型
1 2 3 4 5 6 7 8 public class WindowWrap <T> { private final long windowLengthInMs; private long windowStart; private T value; }
WindowWrap
的 value 在 Sentinel 中资源的调用统计信息,MetricBucket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class MetricBucket { private final LongAdder[] counters; private volatile long minRt; public MetricBucket () { MetricEvent[] events = MetricEvent.values(); this .counters = new LongAdder [events.length]; for (MetricEvent event : events) { counters[event.ordinal()] = new LongAdder (); } initMinRt(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public enum MetricEvent { PASS, BLOCK, EXCEPTION, SUCCESS, RT, OCCUPIED_PASS }
可以看到 Sentinel 采用了非常巧妙的方式来存储不同类型的统计信息,通过 MetricEvent
枚举作为下标,从 counters
获取到对应的 LongAdder
。Sentinel 以流量作为切入点,其统计信息操作肯定是非常频繁的,在并发场景下,如何保证性能? Sentinel 通过引入 LongAdder
进行记录;LongAdder
是 JUC 下的一个包,其核心思想是通过 CAS + cell 数组提升并发性能,1. 通过 CAS 保证操作的原子性 2. 在存在并发的情况下,分散请求到不同的 cell CAS 更新以减少并发冲突。
当请求到来时,Sentinel 首先会先计算当前请求落在哪个小窗口中,获取到对应的 WindowWrap
,然后通过 MetricBucket
的 add()
方法来记录对应的统计信息,计算落在哪个小窗口中的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 public WindowWrap<T> currentWindow (long timeMillis) { if (timeMillis < 0 ) { return null ; } int idx = calculateTimeIdx(timeMillis); long windowStart = calculateWindowStart(timeMillis); while (true ) { WindowWrap<T> old = array.get(idx); if (old == null ) { WindowWrap<T> window = new WindowWrap <T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null , window)) { return window; } else { Thread.yield (); } } else if (windowStart == old.windowStart()) { return old; } else if (windowStart > old.windowStart()) { if (updateLock.tryLock()) { try { return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { Thread.yield (); } } else if (windowStart < old.windowStart()) { return new WindowWrap <T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
非常通俗易懂;Sentinel 采用的是循环数组,通过 hash 计算下标循环利用空间;
DefaultNode DefaultNode 是 StatisticNode 的子类,DefaultNode 拥有资源信息(resourceHolder)
1 2 3 4 5 6 7 8 public class DefaultNode extends StatisticNode { private ResourceWrapper id; private volatile Set<Node> childList = new HashSet <>(); private ClusterNode clusterNode; }
childList
存储下级节点,形成资源调用链表,例如
1 2 3 4 5 EntranceNode(入口: /api) └── DefaultNode(资源A) ├── DefaultNode(资源B,上下文:资源A调用) └── DefaultNode(资源C,上下文:资源A调用) └── DefaultNode(资源D,上下文:资源C调用)
每一个节点都能够单独其所在上下文context
的调用情况
ClusterNode ClusterNode 是 StatisticNode 的子类,同一个资源可能同时有多个调用链路,例如 context1、content2 同时使用了资源,两个上下文中分别会有 DefaultNode
记录其相关信息,此时为了方便统计整体资源的调用数据;就需要遍历资源所有的 DefaultNode
,无疑这是非常损耗性能的,所以 Sentinel 定义了 ClusterNode,ClusterNode 存储了所有 DefaultNode 的统计信息,通过 ClusterNode
统计资源调用情况,而不需要遍历所有 DefaultNode
;
1 2 3 4 public class ClusterNode extends StatisticNode { private final String name; private final int resourceType; }
1 2 3 4 5 6 7 8 9 10 11 12 13 public class DefaultNode extends StatisticNode { @Override public void increaseBlockQps (int count) { super .increaseBlockQps(count); this .clusterNode.increaseBlockQps(count); } @Override public void increaseExceptionQps (int count) { super .increaseExceptionQps(count); this .clusterNode.increaseExceptionQps(count); } }
可以看到 DefaultNode
在 increase
的时候,会对 clusterNode
也进行 increase
EntranceNode EntranceNode
是一个特殊的 DefaultNode
,其作为调用链路的入口节点,子节点为 DefaultNode
; 主要是用来记录流量入口,使 Sentinel 能够做到入口级别操作(统计/流控/其他)
1 2 3 4 5 6 public class EntranceNode extends DefaultNode { public EntranceNode (ResourceWrapper id, ClusterNode clusterNode) { super (id, clusterNode); } }
特别的, Sentinel 定义了 Root EntranceNode
类型,作为 Sentinel 的入口节点,所有资源调用链路都从该节点开始;
1 2 3 4 5 public final static DefaultNode ROOT = new EntranceNode (new StringResourceWrapper (ROOT_ID, EntryType.IN), new ClusterNode (ROOT_ID, ResourceTypeConstants.COMMON));
D. 小结 上述 context
、Entry
、Node
都是 Sentinel 中非常重要的概念,Sentinel 使用 context
、Entry
、Node
来实现对流量的定义、信息统计,为后续的操作提供数据支撑。
context
: 线程维度的调用上下文,存储当前调用链的入口节点、调用来源等信息,绑定在 ThreadLocal
Entry
: 资源的调用上下文,包含资源名称、创建时间等元数据,每次资源调用都会创建新 Entry
Node
: 记录资源调用数据的统计节点(如QPS、RT),底层实现是滑动窗口
1.3 限流/降级/熔断的实现 上面解析了 Sentinel 是如何对流量进行定义,统计相关信息,下面我们看看 Sentinel 是如何对请求进行封装、统计,然后实现对流量的限流/降级/熔断等功能。
ProcessorSlot Sentinel 定义了 ProcessorSlot
接口,是 Sentinel 实现限流、降级、熔断等功能的核心;从功能分类来看,有两类 ProcessorSlot
:
构建资源统计信息 slot
实现具体功能的 slot、例如:限流/降级/熔断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public interface ProcessorSlot <T> { void entry (Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable; void fireEntry (Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable; void exit (Context context, ResourceWrapper resourceWrapper, int count, Object... args) ; void fireExit (Context context, ResourceWrapper resourceWrapper, int count, Object... args) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public abstract class AbstractLinkedProcessorSlot <T> implements ProcessorSlot <T> { private AbstractLinkedProcessorSlot<?> next = null ; @Override public void fireEntry (Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (next != null ) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } @SuppressWarnings("unchecked") void transformEntry (Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } @Override public void fireExit (Context context, ResourceWrapper resourceWrapper, int count, Object... args) { if (next != null ) { next.exit(context, resourceWrapper, count, args); } } public AbstractLinkedProcessorSlot<?> getNext() { return next; } public void setNext (AbstractLinkedProcessorSlot<?> next) { this .next = next; } }
AbstractLinkedProcessorSlot
抽象类封装了ProcessorSlot
公共的行为。
ProcessorSlotChain 1 2 3 4 public abstract class ProcessorSlotChain extends AbstractLinkedProcessorSlot <Object> { public abstract void addFirst (AbstractLinkedProcessorSlot<?> protocolProcessor) ; public abstract void addLast (AbstractLinkedProcessorSlot<?> protocolProcessor) ; }
ProcessorSlotChain
继承了 AbstractLinkedProcessorSlot
,实现了 addFirst
、addLast
方法,分别用于添加 ProcessorSlot
到链表头、链表尾;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot <Object>() { @Override public void entry (Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super .fireEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit (Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super .fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; @Override public void addFirst (AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; } } @Override public void addLast (AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; } @Override public void setNext (AbstractLinkedProcessorSlot<?> next) { addLast(next); } @Override public AbstractLinkedProcessorSlot<?> getNext() { return first.getNext(); } @Override public void entry (Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { first.transformEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit (Context context, ResourceWrapper resourceWrapper, int count, Object... args) { first.exit(context, resourceWrapper, count, args); } }
DefaultProcessorSlotChain
为 Sentinel 责任链的默认实现,实现方式为单向链表
ProcessorSlotChain 的构建时机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Spi(isDefault = true) public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build () { ProcessorSlotChain chain = new DefaultProcessorSlotChain (); List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); for (ProcessorSlot slot : sortedSlotList) { if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain" ); continue ; } chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; } }
可以看到是 spi 的形式加载 ProcessorSlot
,提升了灵活性
1 2 3 4 5 6 7 8 9 10 # Sentinel default ProcessorSlots com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot com.alibaba.csp.sentinel.slots.logger.LogSlot com.alibaba.csp.sentinel.slots.statistic.StatisticSlot com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot com.alibaba.csp.sentinel.slots.system.SystemSlot com.alibaba.csp.sentinel.slots.block.flow.FlowSlot com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot com.alibaba.csp.sentinel.slots.block.degrade.DefaultCircuitBreakerSlot
上述是默认的 ProcessorSlotChain
的创建顺序
NodeSelectorSlot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class NodeSelectorSlot extends AbstractLinkedProcessorSlot <Object> { private volatile Map<String, DefaultNode> map = new HashMap <String, DefaultNode>(10 ); public void entry (Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { DefaultNode node = map.get(context.getName()); if (node == null ) { synchronized (this ) { node = map.get(context.getName()); if (node == null ) { node = new DefaultNode (resourceWrapper, null ); HashMap<String, DefaultNode> cacheMap = new HashMap <String, DefaultNode>(map.size()); cacheMap.putAll(map); cacheMap.put(context.getName(), node); map = cacheMap; ((DefaultNode) context.getLastNode()).addChild(node); } } } context.setCurNode(node); fireEntry(context, resourceWrapper, node, count, prioritized, args); } @Override public void exit (Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } }
NodeSelectorSlot
的职责是根据 context
构建 DefaultNode
, 请求进入到 NodeSelectorSlot
后,会根据 context
获取到资源的 DefaultNode
并且向后传递
ClusterBuilderSlot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot <DefaultNode> { private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap <>(); @Override public void entry (Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { if (clusterNode == null ) { synchronized (lock) { if (clusterNode == null ) { clusterNode = new ClusterNode (resourceWrapper.getName(), resourceWrapper.getResourceType()); HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap <>(Math.max(clusterNodeMap.size(), 16 )); newMap.putAll(clusterNodeMap); newMap.put(node.getId(), clusterNode); clusterNodeMap = newMap; } } } node.setClusterNode(clusterNode); if (!"" .equals(context.getOrigin())) { Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin()); context.getCurEntry().setOriginNode(originNode); } fireEntry(context, resourceWrapper, node, count, prioritized, args); } }
ClusterBuilderSlot
的指责则是创建 ClusterNode
往后传递,非常容易理解
StatisticSlot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 public class StatisticSlot extends AbstractLinkedProcessorSlot <DefaultNode> { @Override public void entry (Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { fireEntry(context, resourceWrapper, node, count, prioritized, args); node.increaseThreadNum(); node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null ) { context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) { node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() != null ) { context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() == EntryType.IN) { Constants.ENTRY_NODE.increaseThreadNum(); } for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (BlockException e) { context.getCurEntry().setBlockError(e); node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() != null ) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { Constants.ENTRY_NODE.increaseBlockQps(count); } for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e; } catch (Throwable e) { context.getCurEntry().setError(e); throw e; } } @Override public void exit (Context context, ResourceWrapper resourceWrapper, int count, Object... args) { Node node = context.getCurNode(); if (context.getCurEntry().getBlockError() == null ) { long completeStatTime = TimeUtil.currentTimeMillis(); context.getCurEntry().setCompleteTimestamp(completeStatTime); long rt = completeStatTime - context.getCurEntry().getCreateTimestamp(); Throwable error = context.getCurEntry().getError(); recordCompleteFor(node, count, rt, error); recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error); if (resourceWrapper.getEntryType() == EntryType.IN) { recordCompleteFor(Constants.ENTRY_NODE, count, rt, error); } } Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks(); for (ProcessorSlotExitCallback handler : exitCallbacks) { handler.onExit(context, resourceWrapper, count, args); } fireExit(context, resourceWrapper, count, args); } }
StatisticSlot
是 Sentinel ProcessorSlotChain
的核心,包含了数据信息统计,判断是否被流控,以及完成一些回调钩子 ProcessorSlotEntryCallback
。
可以看到 StatisticSlot
作为 Sentinel ProcessorSlotChain
的核心 slot,起着承上启下的作用,在 StatisticSlot
前面的 processorSlot
是构建/获取 DefaultNode
、ClusterNode
,为后面的 processorSlot
的限流/降级/熔断提供数据支撑;真正决定流量是否通行的由 StatisticSlot
之后的 processSlot
决定。