LangChain Streaming: Counting Token Usage


Tags: python / langchain

Introduction

When developing with LangChain, we often need to monitor and count token consumption in order to optimize resource usage and evaluate the cost of API calls. The LangChain framework offers rich functionality and flexible configuration, but in some situations, particularly when using ChatOpenAI's streaming methods or when creating the model with stream=True, the framework itself does not report token consumption. In that case we can fall back on OpenAI's tokenizer library to count tokens ourselves: with it we can compute exactly how many tokens each request uses, and thus better manage and predict service costs. For models that are not from OpenAI, however, OpenAI's tokenizer is no longer applicable; there we may need to look for other solutions, such as implementing custom token-counting logic or using a third-party library.
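
For the OpenAI case, counting tokens with the tokenizer library (tiktoken) might look like the minimal sketch below; the model name and sample strings are placeholders for illustration, not taken from this article:

import tiktoken

# Assumed example: count tokens with OpenAI's tiktoken tokenizer.
# "gpt-3.5-turbo" and the sample text are placeholders; use your own model and inputs.
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")

prompt = "What is LangChain?"
completion = "LangChain is a framework for building LLM applications."

prompt_tokens = len(encoding.encode(prompt))
completion_tokens = len(encoding.encode(completion))
print(prompt_tokens, completion_tokens, prompt_tokens + completion_tokens)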

Versions

Python 3.10

langchain = "~=0.1.x"

openai = "~=1.13.3"

langchain-openai = "~=0.1.1"

Custom class (ChatOpenAIImpl)

Subclass ChatOpenAI (from langchain_openai import ChatOpenAI).

Override the _stream and _astream methods, adding the following code in each:

if usage := choice.get("usage"): 
    generation_info["token_usage"] = usage

The complete code is as follows:

from typing import List, Optional, Any, Iterator, AsyncIterator

from langchain_community.chat_models.openai import _convert_delta_to_message_chunk
from langchain_core.callbacks import CallbackManagerForLLMRun, AsyncCallbackManagerForLLMRun
from langchain_core.messages import BaseMessage, AIMessageChunk
from langchain_core.outputs import ChatGenerationChunk
from langchain_openai import ChatOpenAI


class ChatOpenAIImpl(ChatOpenAI):

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}

        default_chunk_class = AIMessageChunk
        for chunk in self.client.create(messages=message_dicts, **params):
            default_chunk_class, chunk = self.__build_chunk(chunk, default_chunk_class)
            if chunk is None:
                continue

            if run_manager:
                logprobs = chunk.generation_info.get("logprobs") if chunk.generation_info else None
                run_manager.on_llm_new_token(chunk.text, chunk=chunk, logprobs=logprobs)
            yield chunk

    async def _astream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}

        default_chunk_class = AIMessageChunk
        async for chunk in await self.async_client.create(
            messages=message_dicts, **params
        ):
            default_chunk_class, chunk = self.__build_chunk(chunk, default_chunk_class)
            if chunk is None:
                continue
            if run_manager:
                logprobs = chunk.generation_info.get("logprobs") if chunk.generation_info else None
                await run_manager.on_llm_new_token(
                    token=chunk.text, chunk=chunk, logprobs=logprobs
                )
            yield chunk

    @staticmethod
    def __build_chunk(chunk, default_chunk_class):
        # Normalize the raw OpenAI response chunk to a dict.
        if not isinstance(chunk, dict):
            chunk = chunk.model_dump()
        if len(chunk["choices"]) == 0:
            return None, None
        choice = chunk["choices"][0]
        chunk = _convert_delta_to_message_chunk(
            choice["delta"], default_chunk_class
        )
        generation_info = {}
        if finish_reason := choice.get("finish_reason"):
            generation_info["finish_reason"] = finish_reason
        logprobs = choice.get("logprobs")
        if logprobs:
            generation_info["logprobs"] = logprobs
        # Attach token usage to generation_info when the provider reports it.
        if usage := choice.get("usage"):
            generation_info["token_usage"] = usage
        default_chunk_class = chunk.__class__
        return default_chunk_class, ChatGenerationChunk(
            message=chunk, generation_info=generation_info or None
        )

With the code above, token consumption can now be obtained in streaming mode.

Note: the token usage above is stored in the chunk's generation_info attribute, whereas for a model created with stream=False the token usage is placed in the llm_output attribute of the ChatResult.

So the two cases need to be handled separately:

# The llm must be built with ChatOpenAIImpl
chain = LLMChain(prompt=prompt, llm=llm, verbose=True)
rs = chain.generate([args])

if chat_content.stream:
    # Streaming: read token usage from generation_info
    token_usage = rs.generations[0][0].generation_info['token_usage']
else:
    # stream=False: read token usage from llm_output
    token_usage = rs.llm_output['token_usage']
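
For context, here is a minimal sketch of how the llm, prompt, and inputs in the snippet above might be built; the model name, prompt template, and input values are assumptions made for this example, not part of the original code:

from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate

# Assumed configuration: model name, streaming flag, and prompt are placeholders.
llm = ChatOpenAIImpl(model="gpt-3.5-turbo", streaming=True)
prompt = PromptTemplate.from_template("Answer the question: {question}")

chain = LLMChain(prompt=prompt, llm=llm, verbose=True)
rs = chain.generate([{"question": "What is LangChain?"}])

# In streaming mode, token usage ends up in generation_info (see the note above).
generation_info = rs.generations[0][0].generation_info or {}
print(generation_info.get("token_usage"))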

Callback (AsyncIteratorCallbackHandlerImpl)

The method above covers the case where no callback is used. In practice, an async callback (e.g. AsyncIteratorCallbackHandler) is usually attached when building the model; to capture token usage in that case, the relevant methods must be implemented on the callback class. As with the approach above, this means subclassing AsyncIteratorCallbackHandler or AsyncCallbackHandler:

from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator, Dict, List, Literal, Union, cast

from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult


class AsyncIteratorCallbackHandlerImpl(AsyncCallbackHandler):

    queue: asyncio.Queue[str]

    done: asyncio.Event
    
    token_usage: dict

    @property
    def always_verbose(self) -> bool:
        return True

    def __init__(self) -> None:
        self.queue = asyncio.Queue()
        self.done = asyncio.Event()
        # Initialize so token_usage is always readable, even if no usage chunk arrives.
        self.token_usage = {}

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        self.done.clear()

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Record token usage when the streamed chunk carries it in generation_info.
        if chunk := kwargs.get("chunk"):
            if gi := chunk.generation_info:
                if usage := gi.get("token_usage"):
                    self.token_usage = usage

        if token is not None and token != "":
            self.queue.put_nowait(token)

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        self.done.set()

    async def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        self.done.set()

    async def aiter(self) -> AsyncIterator[str]:
        while not self.queue.empty() or not self.done.is_set():
            done, other = await asyncio.wait(
                [
                    asyncio.ensure_future(self.queue.get()),
                    asyncio.ensure_future(self.done.wait()),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )

            if other:
                other.pop().cancel()

            token_or_done = cast(Union[str, Literal[True]], done.pop().result())

            if token_or_done is True:
                break

            yield token_or_done

When using AsyncIteratorCallbackHandlerImpl, the corresponding token consumption can then be read from the callback's token_usage attribute.
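
As an illustration, wiring the callback into the model might look like the sketch below; the model name, prompt, and the use of ainvoke here are assumptions made for the example:

import asyncio

from langchain_core.prompts import ChatPromptTemplate


async def main() -> None:
    # Assumed setup: model name and prompt are placeholders.
    callback = AsyncIteratorCallbackHandlerImpl()
    llm = ChatOpenAIImpl(model="gpt-3.5-turbo", streaming=True, callbacks=[callback])
    prompt = ChatPromptTemplate.from_template("Tell me a short fact about {topic}")

    # Run the generation concurrently and consume streamed tokens from the callback.
    task = asyncio.create_task(llm.ainvoke(prompt.format_messages(topic="LangChain")))
    async for token in callback.aiter():
        print(token, end="", flush=True)
    await task

    # Token usage recorded by on_llm_new_token.
    print(callback.token_usage)


asyncio.run(main())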

With the methods above, you can obtain token usage when generating LLM output via the generate method in LangChain's streaming mode.

Note: when generating content through a chain in LangChain, only the generate method can obtain the token usage JSON; methods like run and call cannot. Also, OpenAI's streaming API currently no longer returns token consumption, but Moonshot AI (月之暗面) still does.

If you run into problems, leave a comment below.

Writing this took effort; please give it a like.