likes
comments
collection
share

langchain入门3-LCEL核心源码速通

作者站长头像
站长
· 阅读数 8

书接上回,在看完了LCEL如何使用之后,让我们再来了解一下它是如何实现的。

管道运算符

在上一篇博客中,我们使用了一个很简单的demo chain: chain = prompt | model | output_parser。 我们可以看到这段代码中使用了,运算符|,熟悉python的同学肯定知道,这里使用了python的magic method,也就是说它一定具有__or__函数。 prompt | model就相当于prompt.__or__(model)。 如果用过agent的朋友,可能也会发现这样的使用方式:

    {  
        "input": lambda x: x["input"],  
        "agent_scratchpad": lambda x: format_to_openai_function_messages(  
            x["intermediate_steps"]  
        ),  
    }  
    | prompt  
    | llm_with_tools  
    | OpenAIFunctionsAgentOutputParser()

这里开头是一个dict,我们知道dict有自己默认的__or__,用于两个dict的和合并操作,接受参数必须为dict,那它是如何运行的呢。这里实际上,是因为prompt实现了__ror__,这个magic method支持从右往左的or运算,dict|prompt,相当于prompt.__ror__(dict)

顺着管道符,我们继续深入,就可以发现这一切的核心其实是RunnableSerializble

类图

我这里简单画了一下相关的UML类图。

Runnable
String name
invoke/ainvoke()
batch/abatch()
stream/astream()
astream_log()
__or__() : RunnableSequence
__ror__() : RunnableSequence
RunnableSerializable
Serializable
baseModel
RunnableSequence
+invoke()
..()
RunnableParallel
+invoke()
..()
BasePromptTemplate
+invoke()
..()
BaseLanguageModel
generate_prompt()
get_num_tokens()
BaseChatModel
+invoke()
..()
Etc

我们平时使用的所有LCEL相关的组件都继承自RunnableSerializable。

RunnableSerializable

RunnableSerializable 分为两部分RunnableSerializable。其中Serializable是继承自Pydantic的BaseModel。(py+pedantic=Pydantic,是非常流行的参数验证框架)Serializable提供了,将Runnable序列化的能力。而Runnable,则是LCEL组件最重要的一个抽象类,包含了最核心的能力。

Runnable

Runnble作为一个抽象类,它有几个重要的抽象方法。

  • invoke/ainvoke: 单个输入转为输出。
  • batch/abatch:批量转换。
  • stream/astream: 单个流式处理。
  • astream_log:从输入流流式获取结果与中间步骤。

注意a开头的函数代表具有异步能力。

同时Runnbale也实现了两个重要的magic method ,就是前面说的用于支持管道操作符|__or____ror__

def __or__(  
    self,  
    other: Union[  
        Runnable[Any, Other],  
        Callable[[Any], Other],  
        Callable[[Iterator[Any]], Iterator[Other]],  
        Mapping[str, Union[Runnable[Any, Other], Callable[[Any], Other], Any]],  
    ],  
) -> RunnableSerializable[Input, Other]: 
	# coerce_to_runnable是将other强制转换为Runnable
    """Compose this runnable with another object to create a RunnableSequence."""  
    return RunnableSequence(self, coerce_to_runnable(other))  
  
def __ror__(  
    self,  
    other: Union[  
        Runnable[Other, Any],  
        Callable[[Other], Any],  
        Callable[[Iterator[Other]], Iterator[Any]],  
        Mapping[str, Union[Runnable[Other, Any], Callable[[Other], Any], Any]],  
    ],  
) -> RunnableSerializable[Other, Output]:  
    """Compose this runnable with another object to create a RunnableSequence."""  
    return RunnableSequence(coerce_to_runnable(other), self)

可以发现Runnable之间编排以后,会生成一个RunnableSequence。

RunnableSequence

RunnableSequence 顾名思义就按顺序执行的Runnable。

如果我们运行最终编排好的Chain,例如chain.invoke({"topic": "ice cream"}),实际上就是执行了RunnableSequence的invoke。那我们先来看看invoke函数。

# config对象,可以设置一些并发数、标签等等配置,默认情况下为空。
def invoke(self, input: Input, config: Optional[RunnableConfig] = None) -> Output:  
    from langchain_core.beta.runnables.context import config_with_context  
  
    # 根据上下文补充config
    config = config_with_context(ensure_config(config), self.steps)  
    # 创建回调管理器,用于支持运行中产生的各种回调
    callback_manager = get_callback_manager_for_config(config)  
    # 创建运行管理器,用于处理异常重试,结束等情况
    run_manager = callback_manager.on_chain_start(  
        dumpd(self), input, name=config.get("run_name") or self.get_name()  
    )  
	# !!关键内容!!
    # 调用整个链
    try:  
	    # 顺序执行step,每一步的输出,将作为下一步的输入
        for i, step in enumerate(self.steps):  
            input = step.invoke(  
                input,  
                # 为下一个step更新config 
                patch_config(  
                    config, callbacks=run_manager.get_child(f"seq:step:{i+1}")  
                ),  
            )  
    # finish the root run  
    except BaseException as e:  
        run_manager.on_chain_error(e)  
        raise  
    else:  
        run_manager.on_chain_end(input)  
        return cast(Output, input)

我们上面讲过Runnable 规定了多个抽象方法,而RunnableSequence也分别实现了它们,总的来说与invoke类似,各位感兴趣可以自行查看源码。

RunnableParallel

RunnableParallel为LCEL提供了并行执行能力。 同样的让我们来看一下RunnableParallel的invoke,需要注意的一点是RunnableParallel的step与RunnableSequence不同,是一个dict。引用入门2的那篇博客的例子 setup_and_retrieval = RunnableParallel({"context": retriever, "question": RunnablePassthrough()})。这里context和question会变成两个step。

try:  
    # 复制steps,处理过程中steps改变,产生问题。
    steps = dict(self.steps)
    # 根据config中的并发数设置,创建一个 ThreadPoolExecutor 
    with get_executor_for_config(config) as executor:
	    # 按顺序提交func,收集futures。  
        futures = [  
            executor.submit(  
                step.invoke,  
                input,  
                patch_config(  
                    config,  
                    callbacks=run_manager.get_child(f"map:key:{key}"),  
                ),  
            )  
            for key, step in steps.items()  
        ]
        # 等待所有结果完成,拼装成dict输出。 
        output = {key: future.result() for key, future in zip(steps, futures)}  
# finish the root run  
except BaseException as e:  
    run_manager.on_chain_error(e)  
    raise  
else:  
    run_manager.on_chain_end(output)  
    return output

总结

本篇博客,我们简单的讲述了LCEL的管道操作符是如何实现的,以及背后最核心的Runnable抽象类,和它最重要的两个子类RunnableSequence与RunnableParallel。至于其他的实现类例如Prompt相关的、LanguageModel相关的、等其他组件,各位感兴趣可以自己了解一下内部实现。