ManagedValue——一种特殊的只读虚拟通道
如果我们仔细查看Pregel类的定义可以看出其channels字段返回一个字典字典的值的类型联合了BaseChannel和ManagedValueSpec两种类型前者是Channel的基类后者就是ManagedValue类的别名。class Pregel( PregelProtocol[StateT, ContextT, InputT, OutputT], Generic[StateT, ContextT, InputT, OutputT]): channels : dict[str, BaseChannel | ManagedValueSpec] ManagedValueSpec type[ManagedValue]如果说Channel存储的是的业务状态那么ManagedValue传递的就是Pregel这个执行引擎的运行时状态。一般来说ManagedValue自身不负责存储状态其提供的值可以实时计算得出所以它不参与基于Checkpoint的持久化。从如下所示的代码片段可以看出ManagedValue仅仅定义了一个唯一的静态抽象方法get返回对应的值由于作为输入的PregelScratchpad对象提供的信息有限所以ManagedValue能够发挥的空间其实很有限在大部分情况下用不到它。class ManagedValue(ABC, Generic[V]): staticmethod abstractmethod def get(scratchpad: PregelScratchpad) - V: ...1. PregelScratchpadManagedValue提供的值是通过其get方法根据PregelScratchpad对象计算所得。当确定后续待执行的Node后引擎会为每个Node创建一个任务每个任务都会附加一个PregelScratchpad对象。PregelScratchpad的step和stop字段就返回当前Superstep的序号和针对迭代的限制最大超步数其它字段与持久化有关。dataclasses.dataclass(**_DC_KWARGS) class PregelScratchpad: step : int stop : int call_counter : Callable[[], int] interrupt_counter : Callable[[], int] get_null_resume : Callable[[bool], Any] resume : list[Any] subgraph_counter : Callable[[], int]PregelScratchpad的call_counter、interrupt_counter和subgraph_counter字段以闭包的形式返回一个计数器。call_counter计数器用于为当前Superstep内产生的所有任务分配唯一的内部序列号。1.1 Resume Value和中断计数器interrupt_counter、get_null_resume和resume字段与Pregel基于 “中断Interrupt/恢复Resume” 的执行方式有关。假设Pregel的对应一个需要人工介入的多级审批流程在每次需要以人工介入的方式收集审批者决定的时候流程进入一个中断当前的状态被持久化。当审批决定给出后流程以 “恢复” 的形式开始执行中断时持久化的快照被提取出来 “恢复现场” 审批决定以Resume Value的形式提供给引擎。为了匹配多个中断点与对应的Resume Value后者会按照顺序被持久化并在恢复执行的时候连同当前提供的Resume Value一并填充到PregelScratchpad的resume列表中。恢复执行做不到在中断点出开始执行它总是从头执行Node的处理函数所以定义幂等Node应该成为Agent编程的 “金科玉律”。由于PregelScratchpad的resume字段会按照中断的顺序存放Resume Value所以在恢复执行的时候每遇到一个中断引擎可以利用interrupt_counter字段返回的计数器作为位置索引从resume列表中将匹配的Resume Value提取出来。如果提取的Resume Value为None或者计数器返回的索引越界get_null_resume字段提供的回调就会执行。这个回调函数具有一个bool类型的参数is_called调用时该参数被设置为True表示该中断确实被触发了但没有对应的数据。这会消耗掉这个中断位确保流程不至于永远得不到恢复。1.2 子图调用计数器如果说interrupt_counter计数器旨在解决每次中断与提供的Resume Value的匹配问题那么subgraph_counter计数器解决的每次“子图调用”与对应Pregel实例的匹配问题。如果站在“图”的视角每个Pregel对象就是由多个Node组成的图而Pregel也可以作为一个Node出现在另一个Pregel构建的图中两个Pregel之间就称为了“父子”关系子Pregel构建的图就是“子图”针对它的调用就是子图调用。虽然在同一个图中每个Pregel会独自完成自身的持久化。在恢复执行场景中引擎会率先加载作为“根”的Pregel对应的Checkpoint来恢复现场。当遇到“子图”形式调用另一个Pregel时引擎会加载对应的Checkpoint来恢复子图在中断那个时间点的状态。现在问题来了在子Pregel众多持久化的Checkpoint中怎么知道该加载哪一个呢这个问题本质上是如何解决作为子图执行的Pregel在执行持久化时如何将生成的Checkpoint与当前执行上下文进行匹配的问题这个问题是利用Checkpoint命名空间来解决的。Node是以任务的形式被执行的每个任务具有唯一的ID并且在恢复时保持不变如果命名空间由执行链路上每个任务的节点名称任务ID组成那么子图的Checkpoint就能利用此命名空间关联起来。但是问题还是没有完全解决如果同一个任务涉及针对同一子图的多次调用如命名空间只包含基于任务的执行路径此时两个子图会共享相同的命名空间具体对应哪个Checkpoint依然无法解决。因此若涉及同一个Node针对同一个Pregel对象的多次调用持久化这个Pregel的Checkpoint的命名空间还应该包含调用顺序。Checkpoint的命名空间的规则可以通过如下这个演示实例来证实。如代码片段所示我们创建了一个由单一Node组成的Pregel对象sub_graph命名为 “baz” 的Node在执行的时候会从当前的RunnableConfig配置中提取并输出当前的Checkpoint命名空间。from langgraph.pregel import Pregel, NodeBuilder from langgraph.channels import LastValue from langgraph.checkpoint.memory import InMemorySaver from langgraph.pregel._write import ChannelWrite, ChannelWriteTupleEntry from langgraph.types import RunnableConfig from typing import Any def handle(args:dict[str,Any], config:RunnableConfig)-None: print(config[configurable][checkpoint_ns]) sub_node (NodeBuilder() .subscribe_to(start) .do(handle)) sub_graph Pregel( nodes{baz: sub_node}, channels{ start: LastValue(None), }, input_channels[start], output_channels[]) def handle1(args:dict[str,Any])-None: sub_graph.invoke(input{start: None}) def handle2(args:dict[str,Any])-str: sub_graph.invoke(input{start: None}) sub_graph.invoke(input{start: None}) foo (NodeBuilder() .subscribe_to(foo) .do(handle1) .write_to(barNone)) bar (NodeBuilder() .subscribe_to(bar) .do(handle2)) graph Pregel( nodes{foo: foo, bar: bar}, channels{ foo: LastValue(None), bar: LastValue(str), }, input_channels[foo], output_channels[], checkpointer InMemorySaver()) config {configurable: {thread_id: 123}} graph.invoke(input{foo: None}, configconfig)在另一个Pregel中我们为它设置了两个先后执行的Nodefoo和bar前者调用sub_graph一次后者调用两次。针对三次调用sub_graph为自身持久化设置的Checkpoint命名会以如下的形式输出可以看出命名空间同时体现了调用链路和次序。foo:36817c76-c3f7-643f-7924-0d29b39f469a|baz:311cc911-96a0-56b6-225b-28e4cece7cd9 bar:97be6a71-1b71-7364-e691-a122cfef1a92|baz:789287de-869f-42b8-dd03-7518820daaa6 bar:97be6a71-1b71-7364-e691-a122cfef1a92|1|baz:dd1ddd1b-fc62-b46a-c2ec-6a1d8344b793基于Pregel“中断/恢复”的执行方式让我们对Pregel实例会有特别的理解。我们习惯了将一个通过调用某个类构造函数创建的对象视为该类型的一个实例但是在Node的处理函数中即使针对同一Pregel实例的连续两次调用都有可能出现中断一旦恢复执行后一个实例就有可能使根据另一个Checkpoint的状态创建的它自然也就不是原来的那个实例了。在不断的“中断/恢复”执行流程中所谓Pregel实例有时候表示成对应的Checkpoint可能更准确。对于同一个节点任务来说如果涉及针对同一个子Pregel的多次调用从第二次调用开始对方持久化生成的Checkpoint会将调用次序包含在命名空间中。与之相对的在恢复执行的时候也需要根据当前的执行上下文提供包含此序号的命名空间采用加载对应的Checkpoint并最终恢复对应的Pregel对象PregelScratchpad的subgraph_counter字段返回的计数器就是为了提供这个序号。2. 两个原生的ManagedValue由于ManagedValue所能提供的值是根据PregelScratchpad计算生成而后者可用的唯有表示当前和最大Superstep序号的step和stop字段所以我们采用ManagedValue的应用场景其实很窄。我从只找到如下两个原生的ManagedValue类型它们都定义在langgraph.managed.is_last_step这个包中。其中一个IsLastStepManager用于判断是否为最后一个Superstep而RemainingStepsManager则用来确定余下的Superstep数。具体的实现非常简单仅仅是针对PregelScratchpad的step和stop字段的简单运算而已。class IsLastStepManager(ManagedValue[bool]): staticmethod def get(scratchpad: PregelScratchpad) - bool: return scratchpad.step scratchpad.stop - 1 class RemainingStepsManager(ManagedValue[int]): staticmethod def get(scratchpad: PregelScratchpad) - int: return scratchpad.stop - scratchpad.step由于ManagedValue属于一个计算属性所以它只能作为Node的输入。它可以被视为一种虚拟的ChannelNode针对ManagedValue和常规Channel的读取方式完全一致。在创建Pregel对象时所用到的ManagedValue需要在channels字段中显式声明但是不能将其添加到输入和输出Channel列表中。如下的实例演示了RemainingStepsManager的使用方式创建的Pregel由两个先后执行的Node构成foo和bar它们会将命名为remaining_steps的ManagedValue作为输入并将其分别输出到remaining_steps_after_foo和remaining_steps_after_bar这两个Channel中分别表示在这两个Node完成执行后所剩的Superstep数。from langgraph.pregel import Pregel, NodeBuilder from langgraph.managed.is_last_step import RemainingStepsManager from langgraph.channels import LastValue foo (NodeBuilder() .subscribe_to(foo) .read_from(remaining_steps) .do(lambda args: args[remaining_steps]) .write_to(remaining_steps_after_foo lambda args:args, barNone)) bar (NodeBuilder() .subscribe_to(bar) .read_from(remaining_steps) .do(lambda args: args[remaining_steps]) .write_to(remaining_steps_after_bar)) app Pregel( nodes{foo:foo, bar:bar}, channels{ foo: LastValue(None), bar:LastValue(None), remaining_steps_after_foo: LastValue(int), remaining_steps_after_bar:LastValue(int), remaining_steps: RemainingStepsManager, }, input_channels[foo], output_channels[remaining_steps_after_foo, remaining_steps_after_bar]) config {recursion_limit: 10} result app.invoke({foo:None}, configconfig) assert result[remaining_steps_after_foo] 10 assert result[remaining_steps_after_bar] 9在根据两个Node创建Pregel对象时我们将针对命名为remaining_steps的ManagedValue的声明添加到channels字段中对应的类型被设置为RemainingStepsManager。由于在调用Pregel对象时利用RunnableConfig配置将Superstep迭代限制为10所以先后执行的两个Node后剩余步数分别为10和9。