LangChain Streaming: Token Usage Statistics
Tags: python/langchain
Introduction
When developing with LangChain, we often need to monitor and tally token consumption in order to optimize resource usage and evaluate the cost efficiency of API calls. The LangChain framework offers rich functionality and flexible configuration, but in some cases, notably when using ChatOpenAI's streaming methods or constructing the model with stream=True, the framework itself does not report how many tokens were consumed.

In that situation, we can fall back on OpenAI's tokenizer library, tiktoken, to count tokens ourselves. With tiktoken we can compute exactly how many tokens each request uses, and thus manage and forecast service costs more reliably.

For models that are not from OpenAI, however, tiktoken no longer applies. We then need other solutions, such as implementing custom token-counting logic or finding a third-party library to do the job.
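As a concrete illustration, here is a minimal sketch of counting tokens with tiktoken (`pip install tiktoken`; the model name and prompt are placeholders):

```python
import tiktoken

# Pick the encoding that matches the target model; the model name is illustrative.
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
prompt = "How many tokens does this prompt use?"
print(len(encoding.encode(prompt)))  # number of tokens in the prompt
```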
Versions
```
python = "3.10"
langchain = "~=0.1.x"
openai = "~=1.13.3"
langchain-openai = "~=0.1.1"
```
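For a plain pip setup, an equivalent install might look like this (rendering the loose ~=0.1.x constraint as ~=0.1.0):

```bash
pip install "langchain~=0.1.0" "openai~=1.13.3" "langchain-openai~=0.1.1"
```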
Custom class (ChatOpenAIImpl)
Subclass `ChatOpenAI` from `langchain_openai` and override the `_stream` and `_astream` methods, adding the following code to each:

```python
if usage := choice.get("usage"):
    generation_info["token_usage"] = usage
```
The complete code is as follows:
```python
from typing import List, Optional, Any, Iterator, AsyncIterator

from langchain_community.chat_models.openai import _convert_delta_to_message_chunk
from langchain_core.callbacks import CallbackManagerForLLMRun, AsyncCallbackManagerForLLMRun
from langchain_core.messages import BaseMessage, AIMessageChunk
from langchain_core.outputs import ChatGenerationChunk
from langchain_openai import ChatOpenAI


class ChatOpenAIImpl(ChatOpenAI):
    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}
        default_chunk_class = AIMessageChunk
        for chunk in self.client.create(messages=message_dicts, **params):
            default_chunk_class, chunk = self.__build_chunk(chunk, default_chunk_class)
            if chunk is None:
                continue
            if run_manager:
                logprobs = chunk.generation_info.get("logprobs") if chunk.generation_info else None
                run_manager.on_llm_new_token(chunk.text, chunk=chunk, logprobs=logprobs)
            yield chunk

    async def _astream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {**params, **kwargs, "stream": True}
        default_chunk_class = AIMessageChunk
        async for chunk in await self.async_client.create(
            messages=message_dicts, **params
        ):
            default_chunk_class, chunk = self.__build_chunk(chunk, default_chunk_class)
            if chunk is None:
                continue
            if run_manager:
                logprobs = chunk.generation_info.get("logprobs") if chunk.generation_info else None
                await run_manager.on_llm_new_token(
                    token=chunk.text, chunk=chunk, logprobs=logprobs
                )
            yield chunk

    @staticmethod
    def __build_chunk(chunk, default_chunk_class):
        # Normalize the raw SDK chunk to a dict.
        if not isinstance(chunk, dict):
            chunk = chunk.model_dump()
        if len(chunk["choices"]) == 0:
            return None, None
        choice = chunk["choices"][0]
        chunk = _convert_delta_to_message_chunk(
            choice["delta"], default_chunk_class
        )
        generation_info = {}
        if finish_reason := choice.get("finish_reason"):
            generation_info["finish_reason"] = finish_reason
        logprobs = choice.get("logprobs")
        if logprobs:
            generation_info["logprobs"] = logprobs
        # Key addition: propagate the provider's usage payload, if present.
        if usage := choice.get("usage"):
            generation_info["token_usage"] = usage
        default_chunk_class = chunk.__class__
        return default_chunk_class, ChatGenerationChunk(
            message=chunk, generation_info=generation_info or None
        )
```
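As a quick sanity check, here is a minimal sketch that calls the private `_stream` method directly and inspects each chunk's generation_info. The model name, endpoint, and key are placeholders for any OpenAI-compatible provider that includes a usage field on streamed choices:

```python
from langchain_core.messages import HumanMessage

# Placeholder endpoint/model/key: any OpenAI-compatible provider that
# returns "usage" on streamed choices should behave the same way.
llm = ChatOpenAIImpl(
    model_name="moonshot-v1-8k",
    openai_api_base="https://api.moonshot.cn/v1",
    openai_api_key="sk-...",
)

for gen_chunk in llm._stream([HumanMessage(content="Hello")]):
    info = gen_chunk.generation_info or {}
    if "token_usage" in info:
        # e.g. {"prompt_tokens": ..., "completion_tokens": ..., "total_tokens": ...}
        print(info["token_usage"])
```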
With the code above, token usage becomes available in streaming mode.

Note: the token usage above is stored on the chunk's generation_info attribute, whereas for models created with stream=False the usage is placed on the llm_output attribute of the ChatResult. So the two cases must be distinguished at the call site:
```python
# llm must be constructed with ChatOpenAIImpl
chain = LLMChain(prompt=prompt, llm=llm, verbose=True)
rs = chain.generate([args])
if chat_content.stream:
    # streaming: token usage lives on generation_info
    token_usage = rs.generations[0][0].generation_info["token_usage"]
else:
    # stream=False: token usage lives on llm_output
    token_usage = rs.llm_output["token_usage"]
```
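The usage payload normally follows OpenAI's shape (prompt_tokens, completion_tokens, total_tokens), so either branch can feed cost accounting. A minimal sketch of turning it into a cost estimate follows; the per-1K prices are made-up placeholders, not real pricing:

```python
# Hypothetical per-1K-token prices; substitute your model's actual pricing.
PROMPT_PRICE_PER_1K = 0.001
COMPLETION_PRICE_PER_1K = 0.002

def estimate_cost(token_usage: dict) -> float:
    # token_usage typically looks like:
    # {"prompt_tokens": 12, "completion_tokens": 34, "total_tokens": 46}
    prompt = token_usage.get("prompt_tokens", 0)
    completion = token_usage.get("completion_tokens", 0)
    return prompt / 1000 * PROMPT_PRICE_PER_1K + completion / 1000 * COMPLETION_PRICE_PER_1K
```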
Callback (AsyncIteratorCallbackHandlerImpl)
The approach above covers the case where no callback is used. In practice, an asynchronous callback (e.g. AsyncIteratorCallbackHandler) is usually attached when constructing the model. To capture token usage there, implement the relevant method on the callback class; as before, this means subclassing AsyncIteratorCallbackHandler or AsyncCallbackHandler:
```python
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator, Dict, List, Literal, Union, cast

from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.outputs import LLMResult


class AsyncIteratorCallbackHandlerImpl(AsyncCallbackHandler):
    queue: asyncio.Queue[str]
    done: asyncio.Event
    token_usage: dict

    @property
    def always_verbose(self) -> bool:
        return True

    def __init__(self) -> None:
        self.queue = asyncio.Queue()
        self.done = asyncio.Event()
        self.token_usage = {}  # filled in once a chunk carries usage data

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        self.done.clear()

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        # Record token usage from the chunk's generation_info.
        if chunk := kwargs.get("chunk"):
            if gi := chunk.generation_info:
                if usage := gi.get("token_usage"):
                    self.token_usage = usage
        if token is not None and token != "":
            self.queue.put_nowait(token)

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        self.done.set()

    async def on_llm_error(self, error: BaseException, **kwargs: Any) -> None:
        self.done.set()

    async def aiter(self) -> AsyncIterator[str]:
        while not self.queue.empty() or not self.done.is_set():
            # Wait for either a new token or the done signal, whichever comes first.
            done, other = await asyncio.wait(
                [
                    asyncio.ensure_future(self.queue.get()),
                    asyncio.ensure_future(self.done.wait()),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )
            if other:
                other.pop().cancel()
            token_or_done = cast(Union[str, Literal[True]], done.pop().result())
            if token_or_done is True:
                break
            yield token_or_done
```
When using AsyncIteratorCallbackHandlerImpl, the corresponding token consumption can be read from the token_usage attribute on the callback.
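A minimal end-to-end sketch of this pattern follows; the construction arguments are assumptions carried over from the earlier examples, and the llm must be the ChatOpenAIImpl subclass from above so that each chunk actually carries generation_info:

```python
import asyncio

from langchain_core.messages import HumanMessage

async def main() -> None:
    callback = AsyncIteratorCallbackHandlerImpl()
    # llm must be the ChatOpenAIImpl subclass; endpoint/key args omitted here.
    llm = ChatOpenAIImpl(streaming=True, callbacks=[callback])
    # Run generation in the background while draining tokens from the callback.
    task = asyncio.create_task(llm.agenerate([[HumanMessage(content="Hello")]]))
    async for token in callback.aiter():
        print(token, end="", flush=True)
    await task
    print("\ntoken_usage:", callback.token_usage)

asyncio.run(main())
```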
With the methods above, token usage can be obtained in LangChain's streaming mode when generating LLM chat content via the generate method.

Note: when executing a chain in LangChain, only the generate method exposes the token-usage JSON; run and __call__ do not. At the time of writing, OpenAI's streaming API no longer returns token consumption in the stream (newer API versions can opt back in via the stream_options parameter, which is beyond the versions pinned above), but providers such as Moonshot AI (月之暗面) still return it.
If you run into problems, feel free to reply in the comments.