likes
comments
collection
share

Next.js 处理流数据实践

作者站长头像
站长
· 阅读数 29

笔者遇到的问题是如何在 pages router 下转发一个AI返回流数据,经过实践得出此篇,

Node.js stream 是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 Node.js 中的 stream 是内置对象,使您能够以小块的形式增量地读取或写入源,而不是一次性将整个数据加载到内存中。

本文就不多介绍,为什么使用 stream 和 node.js stream 相关概念了。主要是在 Next.js(Pages Router 和 App Router)中实践处理流数据。

背景

  • Node.js stream 如何工作
  • Next.js Pages Router,如何处理流数据
  • Next.js App Router,如何处理流数据
  • Next.js 如何使用 SSE(Server-Sent Events
  • 如何使用 Fetch API 处理 stream
  • 以 OpenAI API 为例子,因为它就是通过 SSE 返回的流数据

使用 Fetch API 处理流数据

使用 Fetch API 发出请求时,返回的 Response 对象包含一个 body 属性,这个属性是一个 ReadableStreamReadableStream 表示一个可读的数据流,你可以使用它的 getReader() 方法来获取一个 reader,然后使用这个 reader 的 read() 方法来读取数据。

reader.read() 返回一个 Promise,这个 Promise 的 resolve 值是一个对象,包含两个属性:value 和 donevalue 是读取到的数据块,done 是一个布尔值,如果为 true 则表示数据已经读取完毕。

以下,就是读取二进制数据之后进行解码的一个例子。

const decoder = new TextDecoder();

const response = await fetch('/api/stream');
const reader = response.body.getReader();

let done = false;

while (!done) {
  const { value, done: doneReading } = await reader.read();
  done = doneReading;
  const data = JSON.parse(decoder.decode(value));
  // Do something with data
} 

什么是 SSE?

Server-Sent Events(SSE)是一种用于实现服务器向客户端实时推送数据的Web技术。与传统的轮询和长轮询相比,SSE提供了更高效和实时的数据推送机制。

SSE基于HTTP协议,允许服务器将数据以事件流(Event Stream)的形式发送给客户端。客户端通过建立持久的HTTP连接,并监听事件流,可以实时接收服务器推送的数据。 SSE的主要特点包括:

  1. 简单易用:SSE使用基于文本的数据格式,如纯文本、JSON等,使得数据的发送和解析都相对简单。
  2. 单向通信:SSE支持服务器向客户端的单向通信,服务器可以主动推送数据给客户端,而客户端只能接收数据。
  3. 实时性:SSE建立长时间的连接,使得服务器可以实时地将数据推送给客户端,而无需客户端频繁地发起请求。

Next.js Pages Router 处理流数据

获取到 openai 接口之后进行转发,这里转发的 FastGPT api 和 openai 是一样的,几个关键点。

  1. 首先是使用 fetch 获取 API 返回的流数据
  2. 拿到流数据之后,既然是转发,那么自己 api 也就要通过 sse 返回数据
  3. 需要用 res.setHeader()、res.end()、res.write()

代码用例如下,/pages/api/chat

import { jsonRes } from '@/services/backend/response';
import type { NextApiRequest, NextApiResponse } from 'next';
const fastgpt_url = process.env.FASTGPT_API_URL;
const fastgpt_key = process.env.FASTGPT_API_KEY;

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  try {
    if (!fastgpt_url || !fastgpt_key) {
      return jsonRes(res, { code: 500, data: 'miss fastgppt key' });
    }

    const data = {
      chatId: 'orderId',
      stream: true,
      detail: false,
      messages: [{ role: 'user', content: '一句话简单介绍 next.js' }]
    };

    res.setHeader('Content-Type', 'text/event-stream;charset=utf-8');
    res.setHeader('Cache-Control', 'no-cache, no-transform');

    const response = await fetch(fastgpt_url, {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${fastgpt_key}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify(data)
    });

    const reader = response.body?.getReader();
    if (!reader) return res.end();

    req.socket.on('close', () => {
      res.end();
    });

    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        res.end();
      }
      console.log(value);
      res.write(value);
    }
  } catch (error) {
    res.end();
  }
}

上面写了一个打印 console.log(value); 效果如下图。

Next.js 处理流数据实践

另一个例子,同样的道理。

import { NextApiRequest, NextApiResponse } from 'next';
import EventSource from "eventsource";

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
    res.writeHead(200, {
        Connection: 'keep-alive',
        'Content-Encoding': 'none',
        'Cache-Control': 'no-cache, no-transform',
        'Content-Type': 'text/event-stream',
    });

    const evtSource = new EventSource("[http://localhost:8000/instances/stream](http://localhost:8000/instances/stream)")
    evtSource.onmessage = (e: MessageEvent<any>) => {
        res.write(`event: message\ndata: ${e.data}\n\n`)
    }

    evtSource.onerror = (e: Event) => {
        console.error(e)
        evtSource.close()
    }

    req.socket.on("close", () => {
        evtSource.close()
        res.end()
    })
}

Next.js App Router 处理流数据

很关键的一个不同点如下

// pages router
export default function handler(req: NextApiRequest, res: NextApiResponse) {
  // ...
}
  • app router 的 req 和 res
    • 路由处理程序允许您使用 Web 请求和响应 API 为给定路由创建自定义请求处理程序。
    • Request and Response
// app router
export async function GET() {
  const res = await fetch('https://data.mongodb-api.com/...', {
    headers: {
      'Content-Type': 'application/json',
      'API-Key': process.env.DATA_API_KEY,
    },
  })
  const data = await res.json()
 
  return Response.json({ data })
}

所以,这种情况下处理就比较简单,获取到 openai 的流数据之后,可通过 toReadableStream() 返回一个可读流完成,参考代码如下

import { OpenAI } from 'openai';

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
  baseURL: process.env.OPENAI_BASE_URL
});

export async function POST(req:Request) {
  const { messages } = await req.json();
  const response = await openai.chat.completions.create({
    model: 'gpt-3.5-turbo',
    stream: true,
    messages,
  });

  const stream = response.toReadableStream();

  return new Response(stream);
}

同样的也可以手动实现迭代器,完成这个操作,可查看参考链接第一条。

参考链接

转载自:https://juejin.cn/post/7366071543405756452
评论
请登录