1. 两种任务创建方式我们将根据Node针对Channel的订阅来驱动任务执行的模式称为 “Pull模式”与之相对的则是解决“__pregel_tasks”这个Channel实现的“Push模式”。具体来说这是一个关闭“累积模式”的Topic类型的Channel它存储的“Topic”体现为具有如下定义的Send对象。当某个Node执行之后可以像这个Channel中写入一个Send来驱动某个Node在下一Superstep中执行。除了利用Send对象的node字段指定待执行的Node名称外还可以利用arg字段提供输入参数。class Send: node: str arg: Any def __init__(self, /, node: str, arg: Any) - None由于关闭了“累积模式”在Topic类型Channel中写入的内容只会在下一个Superstep中生效并且“阅后即焚”。对于执行引擎来说这个名为“__pregel_tasks”的Channel存储的就是下一Superstep以“Push模式”驱动执行的任务列表两者完美契合。2. 确认__pregel_tasks通道的存在__pregel_tasks通道的存在可以通过如下的演示实例来验证。如代码片段所示在采用常规方式将Pregel对象创建出来后我们根据Channel名称从它的channels字段中将此Channel提取出来。断言揭示了该Channel自身的类型、存储的数据类型和“累积模式”开关。from langgraph.channels import LastValue, Topic from langgraph.pregel import Pregel, NodeBuilder from langgraph.types import Send, Sequence node ( NodeBuilder() .subscribe_only(input_channel) .do(lambda args: args) .write_to(output_channel) ) app Pregel( nodes{node: node}, channels{input_channel: LastValue(str), output_channel: LastValue(str)}, input_channels[input_channel], output_channels[output_channel], ) tasks: Topic[Send] app.channels[__pregel_tasks] assert isinstance(tasks, Topic) assert tasks.ValueType Sequence[Send] assert tasks.accumulate False3. 被保护起来的通道虽然“__pregel_tasks”就是一个普通的Topic类型的Channel但是它并未开发对外部使用Pregel把它“保护”的非常好。我们不能声明一个与之同名的Channel否则就会像如下的方式一样抛出一个ValueError并提示“Channel __pregel_tasks is reserved and cannot be used in the graph.”。from langgraph.channels import Topic from langgraph.pregel import Pregel, NodeBuilder from langgraph.types import Send, Sequence try: app Pregel( nodes{node: NodeBuilder().subscribe_only(__pregel_tasks)}, channels{__pregel_tasks: Topic[Sequence[Send]]}, input_channels[input_channel], output_channels[output_channel], ) assert False, Expected an error due to reserved channel name except Exception as e: assert isinstance(e, ValueError) assert str(e) Channel __pregel_tasks is reserved and cannot be used in the graph.我们也不能采用常规的方式将向其发送Send对象。比如在如下的演示程序中节点foo试图向此Channel发送一个驱节点bar执行的Send对象最终抛出一个InvalidUpdateError异常并提示“Cannot write to the reserved channel TASKS”。除此之外由于Pregel在利用它将基于“PUSH模式”的任务创建出来后就会将其清空所以我们也无法读取其中的任务。from langgraph.channels import LastValue from langgraph.pregel import Pregel, NodeBuilder from langgraph.types import Send from langgraph.errors import InvalidUpdateError foo (NodeBuilder() .subscribe_to(start,read False) .do(lambda _: Send(nodebar, argfoobar)) .write_to(__pregel_tasks)) bar (NodeBuilder() .do(lambda args:args) .write_to(output)) app Pregel( nodes{ foo: foo, bar: bar}, channels{ start: LastValue(str), output: LastValue(str), }, input_channels[start], output_channels[output]) try: app.invoke({start: None}) assert False, Should have raised InvalidUpdateError except Exception as e: assert isinstance(e, InvalidUpdateError) assert str(e) Cannot write to the reserved channel TASKS4. 唯一的解决方案我们能够想到的常规方法针对此Channel的写入基本都绕不开引擎针对它的保护机制。我们将在下部分介绍Pregel另一个核心组成部分NodeNode会利用ChannelWriter对象实现针对Channel的写入我们可以将针对Channel的写入意图封装成ChannelWriteTupleEntry并以此来创建ChannelWriter这应该是唯一能够“欺骗”引擎写入验证的唯一手段。如代码片段所示率先执行的节点foo会返回一个驱动节点bar指定的Send对象为了将它写入“__pregel_tasks”我们创建了一个ChannelWriter针对该Channel的写入定义在ChannelWriteTupleEntry对象中具体体现在调用构造函数指定的mapper参数上它提供一个映射将Node的执行结果转成成Channel名称和值的映射关系。from langgraph.pregel import Pregel, NodeBuilder from langgraph.channels import LastValue from langgraph.pregel._read import PregelNode from langgraph.pregel._write import ChannelWrite, ChannelWriteTupleEntry from langgraph.types import Send foo: PregelNode (NodeBuilder() .subscribe_to(foo) .do(lambda _: Send(nodebar, argfoo)) ).build() entry ChannelWriteTupleEntry(mapper lambda args: [(__pregel_tasks, args)]) foo.writers.append(ChannelWrite(writes[entry])) bar (NodeBuilder() .do(lambda args: fbar is triggered by {args}.) .write_to(output)) app Pregel( nodes{foo: foo, bar: bar}, channels{ foo: LastValue(None), output: LastValue(str), }, input_channels[foo], output_channels[output], ) result app.invoke(input{foo: None}) assert result {output: bar is triggered by foo.}