langchain入门3-LCEL核心源码速通
书接上回,在看完了LCEL如何使用之后,让我们再来了解一下它是如何实现的。
管道运算符
在上一篇博客中,我们使用了一个很简单的demo chain:
chain = prompt | model | output_parser
。
我们可以看到这段代码中使用了,运算符|
,熟悉python的同学肯定知道,这里使用了python的magic method,也就是说它一定具有__or__
函数。
prompt | model
就相当于prompt.__or__(model)
。
如果用过agent的朋友,可能也会发现这样的使用方式:
{
"input": lambda x: x["input"],
"agent_scratchpad": lambda x: format_to_openai_function_messages(
x["intermediate_steps"]
),
}
| prompt
| llm_with_tools
| OpenAIFunctionsAgentOutputParser()
这里开头是一个dict,我们知道dict有自己默认的__or__
,用于两个dict的和合并操作,接受参数必须为dict,那它是如何运行的呢。这里实际上,是因为prompt实现了__ror__
,这个magic method支持从右往左的or运算,dict|prompt
,相当于prompt.__ror__(dict)
。
顺着管道符,我们继续深入,就可以发现这一切的核心其实是RunnableSerializble
。
类图
我这里简单画了一下相关的UML类图。
我们平时使用的所有LCEL相关的组件都继承自RunnableSerializable。
RunnableSerializable
RunnableSerializable 分为两部分Runnable
和Serializable
。其中Serializable是继承自Pydantic的BaseModel。(py+pedantic=Pydantic,是非常流行的参数验证框架)Serializable提供了,将Runnable序列化的能力。而Runnable,则是LCEL组件最重要的一个抽象类,包含了最核心的能力。
Runnable
Runnble作为一个抽象类,它有几个重要的抽象方法。
- invoke/ainvoke: 单个输入转为输出。
- batch/abatch:批量转换。
- stream/astream: 单个流式处理。
- astream_log:从输入流流式获取结果与中间步骤。
注意a开头的函数代表具有异步能力。
同时Runnbale也实现了两个重要的magic method ,就是前面说的用于支持管道操作符|
的 __or__
与__ror__
。
def __or__(
self,
other: Union[
Runnable[Any, Other],
Callable[[Any], Other],
Callable[[Iterator[Any]], Iterator[Other]],
Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],
],
) -> RunnableSerializable[Input, Other]:
# coerce_to_runnable是将other强制转换为Runnable
"""Compose this runnable with another object to create a RunnableSequence."""
return RunnableSequence(self, coerce_to_runnable(other))
def __ror__(
self,
other: Union[
Runnable[Other, Any],
Callable[[Other], Any],
Callable[[Iterator[Other]], Iterator[Any]],
Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]],
],
) -> RunnableSerializable[Other, Output]:
"""Compose this runnable with another object to create a RunnableSequence."""
return RunnableSequence(coerce_to_runnable(other), self)
可以发现Runnable之间编排以后,会生成一个RunnableSequence。
RunnableSequence
RunnableSequence 顾名思义就按顺序执行的Runnable。
如果我们运行最终编排好的Chain,例如chain.invoke({"topic": "ice cream"})
,实际上就是执行了RunnableSequence的invoke。那我们先来看看invoke函数。
# config对象,可以设置一些并发数、标签等等配置,默认情况下为空。
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:
from langchain_core.beta.runnables.context import config_with_context
# 根据上下文补充config
config = config_with_context(ensure_config(config), self.steps)
# 创建回调管理器,用于支持运行中产生的各种回调
callback_manager = get_callback_manager_for_config(config)
# 创建运行管理器,用于处理异常重试,结束等情况
run_manager = callback_manager.on_chain_start(
dumpd(self), input, name=config.get("run_name") or self.get_name()
)
# !!关键内容!!
# 调用整个链
try:
# 顺序执行step,每一步的输出,将作为下一步的输入
for i, step in enumerate(self.steps):
input = step.invoke(
input,
# 为下一个step更新config
patch_config(
config, callbacks=run_manager.get_child(f"seq:step:{i+1}")
),
)
# finish the root run
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input)
return cast(Output, input)
我们上面讲过Runnable 规定了多个抽象方法,而RunnableSequence也分别实现了它们,总的来说与invoke类似,各位感兴趣可以自行查看源码。
RunnableParallel
RunnableParallel为LCEL提供了并行执行能力。
同样的让我们来看一下RunnableParallel的invoke,需要注意的一点是RunnableParallel的step与RunnableSequence不同,是一个dict。引用入门2的那篇博客的例子 setup_and_retrieval = RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
。这里context和question会变成两个step。
try:
# 复制steps,处理过程中steps改变,产生问题。
steps = dict(self.steps)
# 根据config中的并发数设置,创建一个 ThreadPoolExecutor
with get_executor_for_config(config) as executor:
# 按顺序提交func,收集futures。
futures = [
executor.submit(
step.invoke,
input,
patch_config(
config,
callbacks=run_manager.get_child(f"map:key:{key}"),
),
)
for key, step in steps.items()
]
# 等待所有结果完成,拼装成dict输出。
output = {key: future.result() for key, future in zip(steps, futures)}
# finish the root run
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(output)
return output
总结
本篇博客,我们简单的讲述了LCEL的管道操作符是如何实现的,以及背后最核心的Runnable抽象类,和它最重要的两个子类RunnableSequence与RunnableParallel。至于其他的实现类例如Prompt相关的、LanguageModel相关的、等其他组件,各位感兴趣可以自己了解一下内部实现。
转载自:https://juejin.cn/post/7328204968636252198