09 资源指标数据统计的实现全解析资源指标数据统计StatisticSlotStatisticSlot 才是实现资源各项指标数据统计的 ProcessorSlot它与 NodeSelectorSlot、ClusterBuilderSlot 组成了资源指标数据统计流水线分工明确。首先 NodeSelectorSlot 为资源创建 DefaultNode将 DefaultNode 向下传递ClusterBuilderSlot 负责给资源的 DefaultNode 加工添加 ClusterNode 这个零部件再将 DefaultNode 向下传递给 StatisticSlot如下图所示StatisticSlot 在统计指标数据之前会先调用后续的 ProcessorSlot根据后续 ProcessorSlot 判断是否需要拒绝该请求的结果决定记录哪些指标数据这也是为什么 Sentinel 设计的责任链需要由前一个 ProcessorSlot 在 entry 或者 exit 方法中调用 fireEntry 或者 fireExit 完成调用下一个 ProcessorSlot 的 entry 或 exit 方法而不是使用 for 循环遍历调用 ProcessorSlot 的原因。每个 ProcessorSlot 都有权决定是先等后续的 ProcessorSlot 执行完成再做自己的事情还是先完成自己的事情再让后续 ProcessorSlot 执行与流水线有所区别。StatisticSlot 源码框架如下public class StatisticSlot extends AbstractLinkedProcessorSlotDefaultNode { Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // Do some checking. fireEntry(context, resourceWrapper, node, count, prioritized, args); // ..... } catch (PriorityWaitException ex) { // ..... } catch (BlockException e) { // .... throw e; } catch (Throwable e) { // ..... throw e; } } Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { DefaultNode node (DefaultNode)context.getCurNode(); // .... fireExit(context, resourceWrapper, count); } }entry先调用 fireEntry 方法完成调用后续的 ProcessorSlot#entry 方法根据后续的 ProcessorSlot 是否抛出 BlockException 决定记录哪些指标数据并将资源并行占用的线程数加 1。exit若无任何异常则记录响应成功、请求执行耗时将资源并行占用的线程数减 1。entry 方法第一种情况当后续的 ProcessorSlot 未抛出任何异常时表示不需要拒绝该请求放行当前请求。当请求可正常通过时需要将当前资源并行占用的线程数增加 1、当前时间窗口被放行的请求总数加 1代码如下// Request passed, add thread count and pass count. node.increaseThreadNum(); node.addPassRequest(count);如果调用来源不为空也将调用来源的 StatisticNode 的当前并行占用线程数加 1、当前时间窗口被放行的请求数加 1代码如下if (context.getCurEntry().getOriginNode() ! null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); }如果流量类型为 IN则将资源全局唯一的 ClusterNode 的并行占用线程数、当前时间窗口被放行的请求数都增加 1代码如下if (resourceWrapper.getEntryType() EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); }回调所有 ProcessorSlotEntryCallback#onPass 方法代码如下// Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallbackDefaultNode handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); }可调用 StatisticSlotCallbackRegistry#addEntryCallback 静态方法注册 ProcessorSlotEntryCallbackProcessorSlotEntryCallback 接口的定义如下public interface ProcessorSlotEntryCallbackT { void onPass(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args) throws Exception; void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args); }onPass该方法在请求被放行时被回调执行。onBlocked该方法在请求被拒绝时被回调执行。第二种情况捕获到类型为 PriorityWaitException 的异常。这是特殊情况在需要对请求限流时只有使用默认流量效果控制器才可能会抛出 PriorityWaitException 异常这部分内容将在分析 FlowSlot 的实现源码时再作分析。当捕获到 PriorityWaitException 异常时说明当前请求已经被休眠了一会了但请求还是允许通过的只是不需要为 DefaultNode 记录这个请求的指标数据了只自增当前资源并行占用的线程数同时DefaultNode 也会为 ClusterNode 自增并行占用的线程数。最后也会回调所有 ProcessorSlotEntryCallback#onPass 方法。这部分源码如下。node.increaseThreadNum(); if (context.getCurEntry().getOriginNode() ! null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); } if (resourceWrapper.getEntryType() EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseThreadNum(); } // Handle pass event with registered entry callback handlers. for (ProcessorSlotEntryCallbackDefaultNode handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); }第三种情况捕获到 BlockException 异常BlockException 异常只在需要拒绝请求时抛出。当捕获到 BlockException 异常时将异常记录到调用链路上下文的当前 EntryStatisticSlot 的 exit 方法会用到然后调用 DefaultNode#increaseBlockQps 方法记录当前请求被拒绝将当前时间窗口的 block qps 这项指标数据的值加 1。如果调用来源不为空让调用来源的 StatisticsNode 也记录当前请求被拒绝如果流量类型为 IN则让用于统计所有资源指标数据的 ClusterNode 也记录当前请求被拒绝。这部分的源码如下// Blocked, set block exception to current entry. context.getCurEntry().setError(e); // Add block count. node.increaseBlockQps(count); if (context.getCurEntry().getOriginNode() ! null) { context.getCurEntry().getOriginNode().increaseBlockQps(count); } if (resourceWrapper.getEntryType() EntryType.IN) { // Add count for global inbound entry node for global statistics. Constants.ENTRY_NODE.increaseBlockQps(count); } // Handle block event with registered entry callback handlers. for (ProcessorSlotEntryCallbackDefaultNode handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onBlocked(e, context, resourceWrapper, node, count, args); } throw e;StatisticSlot 捕获 BlockException 异常只是为了收集被拒绝的请求BlockException 异常还是会往上抛出。抛出异常的目的是为了拦住请求让入口处能够执行到 catch 代码块完成请求被拒绝后的服务降级处理。第四种情况捕获到其它异常。其它异常并非指业务异常因为此时业务代码还未执行而业务代码抛出的异常是通过调用 Tracer#trace 方法记录的。当捕获到非 BlockException 异常时除 PriorityWaitException 异常外其它类型的异常都同样处理。让 DefaultNode 记录当前请求异常将当前时间窗口的 exception qps 这项指标数据的值加 1。调用来源的 StatisticsNode、用于统计所有资源指标数据的 ClusterNode 也记录下这个异常。这部分源码如下// Unexpected error, set error to current entry. context.getCurEntry().setError(e); // This should not happen. node.increaseExceptionQps(count); if (context.getCurEntry().getOriginNode() ! null) { context.getCurEntry().getOriginNode().increaseExceptionQps(count); } if (resourceWrapper.getEntryType() EntryType.IN) { Constants.ENTRY_NODE.increaseExceptionQps(count); } throw e;exit 方法exit 方法被调用时要么请求被拒绝要么请求被放行并且已经执行完成所以 exit 方法需要知道当前请求是否正常执行完成这正是 StatisticSlot 在捕获异常时将异常记录到当前 Entry 的原因exit 方法中通过 Context 可获取到当前 CtEntry从当前 CtEntry 可获取 entry 方法中写入的异常。exit 方法源码如下有删减Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { DefaultNode node (DefaultNode)context.getCurNode(); if (context.getCurEntry().getError() null) { // 计算耗时 long rt TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime(); // 记录执行耗时与成功总数 node.addRtAndSuccess(rt, count); if (context.getCurEntry().getOriginNode() ! null) { context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count); } // 自减当前资源占用的线程数 node.decreaseThreadNum(); // origin 不为空 if (context.getCurEntry().getOriginNode() ! null) { context.getCurEntry().getOriginNode().decreaseThreadNum(); } // 流量类型为 in 时 if (resourceWrapper.getEntryType() EntryType.IN) { Constants.ENTRY_NODE.addRtAndSuccess(rt, count); Constants.ENTRY_NODE.decreaseThreadNum(); } } // Handle exit event with registered exit callback handlers. CollectionProcessorSlotExitCallback exitCallbacks StatisticSlotCallbackRegistry.getExitCallbacks(); for (ProcessorSlotExitCallback handler : exitCallbacks) { handler.onExit(context, resourceWrapper, count, args); } fireExit(context, resourceWrapper, count); }exit 方法中通过 Context 可获取当前资源的 DefaultNode如果 entry 方法中未出现异常那么说明请求是正常完成的在请求正常完成情况下需要记录请求的执行耗时以及响应是否成功可将当前时间减去调用链路上当前 Entry 的创建时间作为请求的执行耗时。资源指标数据的记录过程ClusterNode 才是一个资源全局的指标数据统计节点但我们并未在 StatisticSlot#entry 方法与 exit 方法中看到其被使用。因为 ClusterNode 被 ClusterBuilderSlot 交给了 DefaultNode 掌管在 DefaultNode 的相关指标数据收集方法被调用时ClusterNode 的对应方法也会被调用如下代码所示public class DefaultNode extends StatisticNode { ...... private ClusterNode clusterNode; Override public void addPassRequest(int count) { super.addPassRequest(count); this.clusterNode.addPassRequest(count); } }记录某项指标数据指的是针对当前请求记录当前请求的某项指标数据例如请求被放行、请求被拒绝、请求的执行耗时等。假设当前请求被成功处理StatisticSlot 会调用 DefaultNode#addRtAndSuccess 方法记录请求处理成功、并且记录处理请求的耗时DefaultNode 先调用父类的 addRtAndSuccess 方法然后 DefaultNode 会调用 ClusterNode#addRtAndSuccess 方法。ClusterNode 与 DefaultNode 都是 StatisticNode 的子类StatisticNode#addRtAndSuccess 方法源码如下Override public void addRtAndSuccess(long rt, int successCount) { // 秒级滑动窗口 rollingCounterInSecond.addSuccess(successCount); rollingCounterInSecond.addRT(rt); // 分钟级的滑动窗口 rollingCounterInMinute.addSuccess(successCount); rollingCounterInMinute.addRT(rt); }rollingCounterInSecond 是一个秒级的滑动窗口rollingCounterInMinute 是一个分钟级的滑动窗口类型为 ArrayMetric。分钟级的滑动窗口一共有 60 个 MetricBucket每个 MetricBucket 都被 WindowWrap 包装每个 MetricBucket 统计一秒钟内的各项指标数据如下图所示当调用 rollingCounterInMinute#addSuccess 方法时由 ArrayMetric 根据当前时间戳获取当前时间窗口的 MetricBucket再调用 MetricBucket#addSuccess 方法将 success 这项指标的值加上方法参数传递进来的值一般是 1。MetricBucket 使用 LongAdder 记录各项指标数据的值。Sentinel 在 MetricEvent 枚举类中定义了 Sentinel 会收集哪些指标数据MetricEvent 枚举类的源码如下public enum MetricEvent { PASS, BLOCK, EXCEPTION, SUCCESS, RT, OCCUPIED_PASS }pass 指标请求被放行的总数block请求被拒绝的总数exception请求处理异常的总数success请求被处理成功的总数rt被处理成功的请求的总耗时occupied_pass预通过总数前一个时间窗口使用了当前时间窗口的 passQps其它的指标数据都可通过以上这些指标数据计算得出例如平均耗时可根据总耗时除以成功总数计算得出。资源指标数据统计总结一个调用链路上只会创建一个 Context在调用链路的入口创建一个调用链路上第一个被 Sentinel 保护的资源。一个 Context 名称只创建一个 EntranceNode也是在调用链路的入口创建调用 Context#enter 方法时创建。与方法调用的入栈出栈一样一个线程上调用多少次 SphU#entry 方法就会创建多少个 CtEntry前一个 CtEntry 作为当前 CtEntry 的父节点当前 CtEntry 作为前一个 CtEntry 的子节点构成一个双向链表。Context.curEntry 保存的是当前的 CtEntry在调用当前的 CtEntry#exit 方法时由当前 CtEntry 将 Context.curEntry 还原为当前 CtEntry 的父节点 CtEntry。一个调用链路上如果多次调用 SphU#entry 方法传入的资源名称都相同那么只会创建一个 DefaultNode如果资源名称不同会为每个资源名称创建一个 DefaultNode当前 DefaultNode 会作为调用链路上的前一个 DefaultNode 的子节点。一个资源有且只有一个 ProcessorSlotChain一个资源有且只有一个 ClusterNode。一个 ClusterNode 负责统计一个资源的全局指标数据。StatisticSlot 负责记录请求是否被放行、请求是否被拒绝、请求是否处理异常、处理请求的耗时等指标数据在 StatisticSlot 调用 DefaultNode 用于记录某项指标数据的方法时DefaultNode 也会调用 ClusterNode 的相对应方法完成两份指标数据的收集。DefaultNode 统计当前资源的各项指标数据的维度是同一个 Context名称相同而 ClusterNode 统计当前资源各项指标数据的维度是全局。