Pregel中的Node对应的类型为PregelNode。对于一个PregelNode对象来说它最核心的部分就是绑定在它上面的一个可执行操作它是抽象类Runnable的子类。在LangChain整个体系中Runnable类型几乎无处不在包括语言模型不论是传统的LLM模型还是Chat模型、实现RAG的Retriever和提示词模板等组件都是一个Runnable对象实际上Pregel本身也是一个Runnable对象。LangChain的“Chain”就是由一组Runnable对象按照指定的顺序构建而成。Runnable如此重要值得单开一个系列进行独立介绍这里我们可用先将它理解成一个可执行对象可用帮助我们执行定义Node的函数。class PregelNode: bound: Runnable[Any, Any]1. 输入PregelNode的channels和triggers字段表示作为输入和触发器的Channel的名称。由于ManagedValue也可用作为输入所以channels字段也可用包含ManagerValue的名称这里我们统称为Channel。同一个Channel可以同时作为输入和触发器出现在这两个字段中。class PregelNode: channels : str | list[str] triggers : list[str]对于单一的输入Channel名称可以设置为字符串也可以封装成列表它们会影响Node处理函数的输入参数传递方式。对于前者字符串对应Channel的值会作为原始参数传递给Node的处理函数后者则会封装成字典进行传递字典的Key为Channel的名称。如下的演示实例体现了这一点。from langgraph.channels import LastValue from langgraph.pregel import Pregel, NodeBuilder from langgraph.pregel._read import PregelNode def build_node(node_name:str)-PregelNode: channel_in f{node_name}_in channel_out f{node_name}_out node (NodeBuilder() .do(lambda args:args) .write_to(channel_out)).build() node.triggers [channel_in] node.channels channel_in if node_name foo else [channel_in] return node app Pregel( nodes {name: build_node(name) for name in [foo,bar]}, channels { foo_in: LastValue(str), bar_in: LastValue(str), foo_out: LastValue(object), bar_out: LastValue(object), }, input_channels[foo_in, bar_in], output_channels[foo_out, bar_out]) result app.invoke(input{foo_in:foobar, bar_in:foobar}) assert result[foo_out] foobar assert result[bar_out] {bar_in: foobar}如果一个Channel被多个Node作为输入只要该Channel被任一Node以列表的形式注册引擎内部的对齐机制将统一使用字典作为所有Node处理函数的输入。比如我们按照如下的方式修改了上面的程序是foo和bar两个Node都使用foobarChannel作为输入该输入Channel在bar中以列表的形式进行了设置以字符串形式设置的fooNode的处理函数的输入参数依然会编程字典。这是一个不为人知的细节。from langgraph.channels import LastValue from langgraph.pregel import Pregel, NodeBuilder from langgraph.pregel._read import PregelNode def build_node(node_name:str)-PregelNode: node (NodeBuilder() .do(lambda args:args) .write_to(node_name)).build() node.triggers [input] node.channels input if node_name foo else [input] return node app Pregel( nodes {name: build_node(name) for name in [foo,bar]}, channels { input: LastValue(str), foo: LastValue(object), bar: LastValue(object), }, input_channels[input], output_channels[foo, bar]) result app.invoke(input{input:foobar}) assert result[foo] {input: foobar} assert result[bar] {input: foobar}2. 输出Node的执行结果依赖于PregelNode的writers字段返回的一组Runnable对象输出到对应的Channel系统默认使用的是一个ChannelWrite。如代码片段所示初始化ChannelWrite对象时需要提供一组ChannelWriteEntry或ChannelWriteTupleEntry来表示针对目标Channel的写入意图。另一个应用了cached_property装饰器的flat_writers返回一组扁平化的Runnable对象以提供性能。class PregelNode: writers : list[Runnable] cached_property def flat_writers(self) - list[Runnable]class ChannelWrite(RunnableCallable): writes: list[ChannelWriteEntry | ChannelWriteTupleEntry | Send] def __init__( self, writes: Sequence[ChannelWriteEntry | ChannelWriteTupleEntry | Send], *, tags: Sequence[str] | None None, )ChannelWriteEntry的channel和value字段分别表示输出Channel名称和值。skip_none字段决定是否需要忽略None值如果指定了mapper字段value还会被它作进一步处理并最终生成输出到Channel的值。一个ChannelWriteEntry对应一个单一Channel的输出而ChannelWriteTupleEntry可以完成针对多Channel的输出。具体来说Node处理函数返回的对象可以绑定在它的value字段上通过mapper提供的可执行对象映射为一个“二元组序列”此二元组的两部分对应输出Channel的名称和值。ChannelWriteTupleEntry的static设置的三元组序列仅作静态分析用可以忽略。class ChannelWriteEntry(NamedTuple): channel: str value: Any PASSTHROUGH skip_none: bool False mapper: Callable | None None class ChannelWriteTupleEntry(NamedTuple): mapper: Callable[[Any], Sequence[tuple[str, Any]] | None] value: Any PASSTHROUGH static: Sequence[tuple[str, Any, str | None]] | None None PASSTHROUGH object()在如下所示的演示实例中我们创建了一个包含唯一Node的Pregel对象并创建了一个包含单一ChannelWrite对象的列表作为该Node的writers字段。此ChannelWrite的writes列表包含两个ChannelWriteEntry和一个ChannelWriteTupleEntry它们分别完成了针对三个Channel的输出。from langgraph.channels import LastValue from langgraph.pregel import Pregel from langgraph.pregel._read import PregelNode from langgraph.pregel._write import ChannelWrite, ChannelWriteEntry, ChannelWriteTupleEntry entry1 ChannelWriteEntry( channelfoo, value 123 ) entry2 ChannelWriteEntry( channelbar, value 456, mapper lambda v: int(v) ) tuple_entry ChannelWriteTupleEntry( value {foo:123, bar:456}, mapper lambda v: [(baz, int(v[foo]) int(v[bar]))] ) node PregelNode( triggers[start], channels[], writers[ChannelWrite(writes[entry1, entry2, tuple_entry])] ) app Pregel( nodes{body: node}, channels{ start: LastValue(None), foo: LastValue(str), bar: LastValue(int), baz: LastValue(int) }, input_channels[start], output_channels[foo, bar, baz], ) result app.invoke(input{start: None}) assert result {foo: 123, bar: 456, baz: 579}对于通过PregelNode对象表示的Node来说其bound字段返回的Runnable对象用于执行处理函数操作执行的结果由writers列表的一组Runnable对象写入相应的Channel这两个核心工作最终会被如下这个node属性合并。对于Pregel来说该属性在功能上基本就代表了整个Node这也是该属性如此命名的原因。class PregelNode: cached_property def node(self) - Runnable[Any, Any] | None3. 输入映射Node基于Channel的输入、触发和输出分别对应channels、triggers和writers字段。如果所有输入Channel读取的原始输入以字典形式表示和提交给处理函数的参数有出入我们还可以利用mapper字段返回的可执行对象Callable[[Any], Any]作进一步映射。class PregelNode: mapper : Callable[[Any], Any] | None如下的演示程序通过Pregel的mapper字段设置为了一个Lambda表达式将提供给Node处理函数处理的字典转换成元组。from langgraph.channels import LastValue from langgraph.pregel import Pregel, NodeBuilder from langgraph.pregel._read import PregelNode from typing import Tuple node: PregelNode (NodeBuilder() .subscribe_to(foo,bar) .do(lambda args:args) .write_to(output)).build() node.mapper lambda args:tuple(args.values()) app Pregel( nodes{body: node}, channels{ foo: LastValue(str), bar: LastValue(str), output: LastValue(Tuple[str,str]), }, input_channels[foo,bar], output_channels[output], ) result app.invoke(input{foo:hello, bar:world}) assert result[output] (hello,world)4. 失败重试Agent中的Node可能会涉及网络传输、数据检索等会导致瞬时错误的操作失败后自动重试机制是确保可靠性的主要手段我们可以利用PregelNode 的retry_policy字段设置相应的重试策略。具体的重试策略通过如下所示的RetryPolicy具名元组表示。RetryPolicy的max_attempts和initial_interval分别表示最大重试次数包含初次调用和第一次重试前的初始等待时间单位为秒。如果没有为Node设置针对性的重试策略Pregel的retry_policy字段设置的重试策略将作为兜底。class PregelNode: retry_policy : Sequence[RetryPolicy] | None class RetryPolicy(NamedTuple): initial_interval: float 0.5 backoff_factor: float 2.0 max_interval: float 128.0 max_attempts: int 3 jitter: bool True retry_on: ( type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool] ) default_retry_on class Pregel( PregelProtocol[StateT, ContextT, InputT, OutputT], Generic[StateT, ContextT, InputT, OutputT]): retry_policy : Sequence[RetryPolicy] ()重试策略采用基于“间隔倍增”的退避机制Back off也就是下次重试等待时间是前一次等待的N倍这个倍数通过backoff_factor字段来提供max_interval字段为等待实现设置了上限。为了防止多个并发Node同时重试而产生“惊群效应”我们可以在重试间隔中添加随机抖动jitter是这一特性的开关。调用失败有很多原因重试在任何错误场景中都有意义我们可以利用retry_on字段设置为重置设置前置条件。该字段的默认值对应如下这个default_retry_on函数。def default_retry_on(exc: Exception) - bool: import httpx import requests if isinstance(exc, ConnectionError): return True if isinstance(exc, httpx.HTTPStatusError): return 500 exc.response.status_code 600 if isinstance(exc, requests.HTTPError): return 500 exc.response.status_code 600 if exc.response else True if isinstance( exc, ( ValueError, TypeError, ArithmeticError, ImportError, LookupError, NameError, SyntaxError, RuntimeError, ReferenceError, StopIteration, StopAsyncIteration, OSError, ), ): return False return True在如下的演示程序中我们定义了一个用于创建Pregel对象的build_pregel函数该函数的max_attempts参数决定组成返回Pregel对象的Node采用的RetryPolicy的重试次数。Node的处理函数是一个由get_handler函数返回的闭包该闭包在前两次执行的时候总是会排除异常。程序反映的情况是重试次数若设置为2调用会抛出异常但是若设置为3会保证成功调用。from langgraph.channels import LastValue from langgraph.types import RetryPolicy from langgraph.pregel import Pregel, NodeBuilder from langgraph.pregel._read import PregelNode from typing import Any def get_handler(): times 0 def handle(args: dict[str, Any]) - str: nonlocal times times 1 if times 3: raise Exception(manually thrown error.) return Success return handle def build_node(max_attempts: int) - PregelNode: node ( NodeBuilder().subscribe_to(start).do(get_handler()).write_to(output) ).build() node.retry_policy [RetryPolicy(max_attemptsmax_attempts)] return node def build_pregel(max_attempts: int) - Pregel: return Pregel( nodes{body: build_node(max_attempts)}, channels{ start: LastValue(None), output: LastValue(str), }, input_channels[start], output_channels[output], ) app build_pregel(max_attempts2) try: app.invoke(input{start: None}) assert False, Expected an exception but none was raised. except Exception as e: assert str(e) manually thrown error. app build_pregel(max_attempts3) result app.invoke(input{start: None}) assert result[output] Success5. 结果缓存如果Node绑定一个相对耗时的计算并且结果完全由给定的输入决定那么针对输入对结果予以缓存无疑是改善时延的好办法。基于结果的缓存可以通过PregelNode 的cache_policy字段返回的缓存策略来控制。缓存策略通过CachePolicy类型表示它具有key_func和ttl两个字段前者提供一个用于解析缓存键字符串或者字节数组的可执行对象后者用于设置缓存过期时间如果没有显示设置意味着永不过期。class PregelNode: cache_policy : CachePolicy | None cached_property def input_cache_key(self) - INPUT_CACHE_KEY_TYPE dataclass(**_DC_KWARGS) class CachePolicy(Generic[KeyFuncT]): key_func: KeyFuncT default_cache_key ttl: int | None None KeyFuncT TypeVar(KeyFuncT, boundCallable[..., str | bytes]) INPUT_CACHE_KEY_TYPE tuple[Callable[..., Any], tuple[str, ...]] def default_cache_key(*args: Any, **kwargs: Any) - str | bytes: return pickle.dumps((_freeze(args), _freeze(kwargs)), protocol5, fix_importsFalse)对结果实施缓存的前提是需要将输入的“指纹”作为缓存键这里的缓存键根据通过INPUT_CACHE_KEY_TYPE定义的二元组进行计算该二元组前半部分提供的可执行对象相当于一个哈希函数能够将原始内容转换成“指纹”后者提供的多元组以路径的方式唯一标识当前的节点。CachePolicy的key_func字段对应的可执行对象将会作为INPUT_CACHE_KEY_TYPE二元组的前半部分从定义可以看出默认设置的default_cache_key函数会利用pickle以序列化的方式将原始输入转换成字节作为指纹。该指纹和二元组后半部分合并称为Node执行结果缓存项的Key。这里之所以需要强制使用字符串或者字节来表示缓存键是因为Pregel针对缓存的实现并不限于内存存储原则上可以与任意的内存数据库进行整合比如redis。缓存存储在Pregel中通过如下这个抽象基类BaseCache表示它定义了一系列的抽象方法完成针对缓存的读取、写入和清除我们可以通过派生此基类实现自定义的缓存存储。开发测试阶段我们经常会使用基于内存存储的InMemoryCache。如果没有对Node的缓存策略作针对性设置在Pregel对象上设置的缓存策略将作为兜底。class BaseCache(ABC, Generic[ValueT]): serde: SerializerProtocol JsonPlusSerializer(pickle_fallbackTrue) def __init__(self, *, serde: SerializerProtocol | None None) - None: self.serde serde or self.serde abstractmethod def get(self, keys: Sequence[FullKey]) - dict[FullKey, ValueT]: abstractmethod async def aget(self, keys: Sequence[FullKey]) - dict[FullKey, ValueT]: abstractmethod def set(self, pairs: Mapping[FullKey, tuple[ValueT, int | None]]) - None: abstractmethod async def aset(self, pairs: Mapping[FullKey, tuple[ValueT, int | None]]) - None: abstractmethod def clear(self, namespaces: Sequence[Namespace] | None None) - None: abstractmethod async def aclear(self, namespaces: Sequence[Namespace] | None None) - None: class Pregel( PregelProtocol[StateT, ContextT, InputT, OutputT], Generic[StateT, ContextT, InputT, OutputT]): cache: BaseCache | None None cache_policy: CachePolicy | None None在如下所示的演示程序中对于创建的Pregel的唯一Node它虽然具有两个输入Channelfoo和bar但是对应的处理函数会将当前时间戳作为返回结果。我们为该Node设置了缓存策略并将过期时间设置为30秒。from langgraph.channels import LastValue from langgraph.types import CachePolicy from langgraph.pregel import Pregel,NodeBuilder import datetime,time from langgraph.cache.memory import InMemoryCache node (NodeBuilder() .subscribe_to(foo,bar) .do(lambda _: datetime.datetime.now()) .write_to(output)).build() node.cache_policy CachePolicy(ttl30) app Pregel( nodes{body: node}, cacheInMemoryCache(), channels{ foo: LastValue(str), bar: LastValue(str), output: LastValue(str), }, input_channels[foo,bar], output_channels[output]) input {foo:abc, bar:xyz} result app.invoke(inputinput) print(f[{datetime.datetime.now()}]{input} - {result[output]}) time.sleep(5) result app.invoke(inputinput) print(f[{datetime.datetime.now()}]{input} - {result[output]}) time.sleep(5) input {foo:xyz, bar:abc} result app.invoke(inputinput) print(f[{datetime.datetime.now()}]{input} - {result[output]})我们以5秒为间隔调用了Pregel对象三次前两次使用相同的输入{foo:abc, bar:xyz}。三次调用的时间戳和输入输出会以如下的形式打印出来我们可以清晰地看到前两次由于提供了相同的参数所以得到了相同的结果很明显第二次得到的是缓存的结果。[2026-01-31 23:51:20.178285]{foo: abc, bar: xyz} - 2026-01-31 23:51:20.177192 [2026-01-31 23:51:25.180527]{foo: abc, bar: xyz} - 2026-01-31 23:51:20.177192 [2026-01-31 23:51:30.183805]{foo: xyz, bar: abc} - 2026-01-31 23:51:30.1824906.补遗前面已经介绍了PregelNode类型的大部分核心成员对于如下几个遗漏的成员我们在这里作一下概况性介绍。tags使我们可以在Node上打上相应的标签而metadata则可以在它上面附加任意的元数据。class PregelNode: tags : Sequence[str] | None metadata : Mapping[str, Any] | None subgraphs : Sequence[PregelProtocol] def copy(self, update: dict[str, Any]) - PregelNode def invoke( self, input: Any, config: RunnableConfig | None None, **kwargs: Any | None, ) - Any async def ainvoke( self, input: Any, config: RunnableConfig | None None, **kwargs: Any | None, ) - Any def stream( self, input: Any, config: RunnableConfig | None None, **kwargs: Any | None, ) - Iterator[Any] async def astream( self, input: Any, config: RunnableConfig | None None, **kwargs: Any | None, ) - AsyncIterator[Any]Node结合边构成了图而图本身也可以作为一个Node参与构建一个更大的图所以图具有一个嵌套的层级结构一个Node可以包含一组子图体现在subgraphs字段上。方法copy对返回自身的一个浅拷贝至于四个方法invoke、ainvoke、stream和astream实现的两种调用模式最终还是通过调用bound字段的Runnable对象的同名方法实现的。我们最后使用最简单的语言对Pregel做一个总结我们可以将 PregelNode 想象成一个智能反应堆其中triggers是点火装置决定什么时候开始channels是原料管道输入数据mapper是入料加工数据预处理bound是核心反应室业务逻辑writers是成品输送带更新状态。7. NodeBuilder为了让大家对表示Node的PregelNode类型有深入地理解在前面的演示中我们大都采用直接对其字段进行设置的方式实际上在真正的开发中基本不会这么做而是选择使用NodeBuilder来构建它后者提供更加精简的API。class NodeBuilder: def subscribe_only( self, channel: str, ) - Self def subscribe_to( self, *channels: str, read: bool True, ) - Self def read_from( self, *channels: str, ) - Self def write_to( self, *channels: str | ChannelWriteEntry, **kwargs: _WriteValue, ) - Self def meta(self, *tags: str, **metadata: Any) - Self def add_retry_policies(self, *policies: RetryPolicy) - Self def add_cache_policy(self, policy: CachePolicy) - Self def build(self) - PregelNode如果构建的Node只需定义一个Channel我们可以调用subscribe_only方法它将以字符串的不是列表形式赋值给channels字段。subscribe_to方法默认会将指定的Channel同时添加到triggers和channels列表中如果将read参数设置为False指定的Channel只会作为输入添加到channels列表中。read_from方法指定的仅仅是输入Channel所以只会添加到channels列表中。如果自行构建PregelNode针对Channel的输出会很麻烦使用NodeBuilder的write_to方法就简单多了我们只需要指定输出Channel的名称列表就可以了。如果需要多输出作更细粒度的控制也可以指定一组ChannelWriteEntry对象。我们也可以利用write_to方法提供的关键字参数针对Channel的写入此时参数名会作为输出Channel的名称其类型_WriteValue定义如下所以我们可以使用兼容的Lambda表达式简单快捷地完成输出。_WriteValue Callable[[Input], Output] | Any