likes
comments
collection
share

什么是流式传输 ?

作者站长头像
站长
· 阅读数 21

当下 chatGPT 是最热门的一款型语言模型,可以用于自然语言处理任务,如对话生成、文本摘要、机器翻译等。ChatGPT的特点是可以根据输入文本动态生成输出,因此采用流式传输的方式,一边生成一边输出结果,无需等待结果的生成。

什么是流式传输?

流式传输是一种处理数据的有效方式,它可以实现高效的数据处理和传输,特别适用于大型数据或网络通信场景。

在流式传输中,数据可以被视为连续的数据流,而不是一个完整的数据集,在不等待整个数据集加载完毕的情况下可以对数据进行处理或传输。每个数据块都可以立即进行处理,并且在处理一个数据块时,不需要等待前面或后面的数据块。

将数据分成小块并逐个处理,从而实现较低的内存消耗和更快的处理速度,且无需一次性加载整个数据集到内存中。

流式传输的使用场景:

  1. 大型数据处理:当处理大型数据集时,流式传输可以避免将整个数据集加载到内存中,从而减少内存占用并提高性能。

  2. 网络通信:在网络通信中,流式传输可以逐个块地发送和接收数据,从而提供更高的响应速度和更好的带宽利用率。

  3. 实时数据处理:对于实时数据流,流式传输可以立即处理新到达的数据块,并实时输出结果,无需等待数据的完整响应,例如聊天应用程序、股票报价等。

  4. 文件传输:在文件传输中,流式传输可以逐个块地读取和写入文件,而不需要一次性加载整个文件到内存中,从而适用于处理大型文件。

流式传输的优点:

  • 内存效率:流式传输允许逐个块地处理数据,无需一次性加载整个数据集到内存中。这对于处理大型文件或网络通信非常有用。
  • 响应速度:数据块在流式传输期间可以即时处理,而不需要等待整个数据集加载完毕。这使得能够更快地响应请求或处理数据。
  • 可扩展性:由于流式传输以块的形式处理数据,因此可以轻松地处理任意大小的数据。无论数据有多大,都不会受到内存限制的影响。

node 中的流式传输

在 node.js 中,流式传输是一种处理数据的有效方式,它可以实现高效的数据处理和传输,特别适用于大型数据或网络通信场景。通过使用流(Stream),你可以逐个块地处理数据,而无需一次性加载整个数据集到内存中。

node 为我们提供了四种类型的流:

  1. 可读流 Readable Stream:用于从数据源(如文件、接口请求等)读取数据。
  2. 可写流 Writable Stream:用于向目标(如文件、网络响应等)写入数据。
  3. 双工流 Duplex Stream:既可以读取数据,也可以写入数据。
  4. 转换流 Transform:可以在读取和写入数据时对数据进行转换或修改。

这些流可以通过使用 node.js 内置的模块(如 fs、http、stream 等)进行创建和操作。

在这些流对象中,数据是通过事件驱动的方式进行处理的,所有的流对象都用 on绑定事件,并触发。当数据源产生新的数据时,流对象会触发相应的事件,并执行绑定在该事件上的回调函数来处理数据。

  • 当流对象接收到新的数据时 会触发 data 事件
  • 当数据源没有数据时触发 end 事件,此时表示数据流已经结束
  • 当在数据流中发生错误时触发 error
  • 当流对象被关闭时会触发 close 事件
  • 如果想要控制读取的速度,可以用 pause 事件暂停,而 resume 事件可以用于恢复数据的读取

流式文件读写

const fs = require('fs');

// 创建可读流读取文件
const readableStream = fs.createReadStream('input.txt');

// 创建可写流将内容写入文件
const writableStream = fs.createWriteStream('output.txt');

// 监听可读流的 'data' 事件,读取数据块并写入到可写流中
readableStream.on('data', (chunk) => {
  writableStream.write(chunk);
});

// 监听可读流的 'end' 事件,表示文件读取完成
readableStream.on('end', () => {
  writableStream.end();
});

通过监听可读流的 'data' 事件,可以获取到每个数据块(chunk),然后将其写入到可写流中。

这样,文件的读取和写入操作就是以数据块为单位逐个进行的,而不是一次性加载整个文件

在实际的应用中,你可以根据需求使用流来进行数据处理、转换、过滤等操作。你还可以使用流来处理网络通信、处理大型文件、实现数据流水线等场景,以提高性能和效率。

流不仅仅支持文件的读写,还可以通过流式传输完成接口请求数据的传输。

请求的流式响应

当我们在使用 node 作为服务器时,也可以通过流式传输完成接口的响应,这种方式被称为流式响应或流式输出。

使用流式响应可以提供更高的响应速度和更低的内存占用,在请求接口时,服务端不会一次性将完整的内容发送给客户端,而是将数据分块生成,并逐个块地发送。

以下是一个 node 示例,展示了如何使用流式响应来输出数据:

import express from "express";
const app = express();
//  node 服务端接口路径
app.post("/chatStream", async (request, response) => {
  console.log(request.body)
  // 设置返回的响应头为流式传输
  response.setHeader('Content-type', 'application/octet-stream');
  
  const data = '你好啊';
  const interval = setInterval(() => {
    response.write(data);
  }, 3000);

  setTimeout(() => {
    clearInterval(interval);
    response.write('有什么可以帮你的吗');
    response.end();
  }, 5000);

  // 关闭输出流
  response.end();
  
});

app.listen(port, () => {
  console.log(`listening on port ${port}`);
});

当客户端接收到流式响应时,它可以逐个块地处理数据,而不需要等待整个响应内容到达。

客户端获解析数据流

const response = await fetch("http://localhost:8000/chatStream", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Accept: "*,application/json",
  },
  body: JSON.stringify({
    chats,
  }),
});
// 创建了一个 TextDecoder 对象,主要用于将二进制数据解码为 UTF-8 格式(中文)字符串
const encode = new TextDecoder("utf-8");
// 读取响应数据流
const reader = response.body?.getReader() as any;
let answer = "";
const flag = true;
while (flag) {
  const { done, value } = await reader.read();
  console.log(value);
  if (done) {
    setCursor(false);
    break;
  }
  // 获取响应流中的数据,并将其解码为字符串
  const text = encode.decode(value);
  // 凭借每次获取到的字符串
  answer += text;
  console.log(answer);
  msgs[msgs.length - 1].content = answer;
  console.log(msgs);
  setChats(msgs);
}

观察接口的返回值

什么是流式传输 ?

response 获取到的二进制数据流,我们需要通过解码将流式数据转为中文字符串用于页面的展示

什么是流式传输 ?

response.write

在 node 中,response.write 和 response.end 是用于处理 HTTP 响应的方法。

  1. response.write(chunk, [encoding], [callback]) 用于向响应流中写入数据块。

    • chunk:要写入的数据块,可以是字符串或 Buffer 对象。
    • encoding(可选):指定 chunk 的字符编码,默认为 UTF-8。
    • callback(可选):在写入完成后调用的回调函数。

response.write 方法可以被多次调用,每次调用写入一个数据块。如果指定了字符编码,chunk 参数必须是字符串,否则可以是 Buffer 对象。

  1. response.end([data], [encoding], [callback]) 用于结束响应流并发送响应给客户端。

    • data(可选):要作为最后一个数据块发送的内容,可以是字符串或 Buffer 对象。
    • encoding(可选):指定 data 的字符编码,默认为 UTF-8。
    • callback(可选):在响应结束后调用的回调函数。

response.end 方法会将响应标记为已完成,并将其发送给客户端。如果提供了 data 参数,则会将其作为最后一个数据块发送。结束响应后,不能再调用 response.write。

这两个方法通常在处理 HTTP 请求时用于向客户端发送响应。

使用 response.write 可以逐个数据块地发送响应内容,而 response.end 则为结束响应并发送最后的数据块。

const http = require('http');

const server = http.createServer((request, response) => {
  response.statusCode = 200;
  response.setHeader('Content-Type', 'text/plain');

  response.write('Hello, ');
  response.write('World!\n');

  response.end('End of response.');
});

server.listen(8000, () => {
  console.log('Server is running on port 8000');
});

当有请求进入创建的 HTTP 服务器时,在回调函数中处理请求和响应。

通过调用 response.write,可以将数据块逐个写入响应流,而 response.end 则用于结束响应并发送最后的数据块。

请注意,在实际的应用中,你可能会根据具体的需求和业务逻辑进行更复杂的响应处理。以上示例仅用于演示基本的使用方法。

pipe 实现数据流传输

在 node 中,pipe 是一个用于流数据处理的方法,它能够将可读流的数据输出到可写流,从而实现将数据从一个流传输到另一个流的功能。

通过使用 pipe 方法,可以简化数据传输的代码逻辑,不需要手动处理数据的读取和写入。

pipe 方法会自动处理数据的读取和写入,并且在适当的时机进行流控制,确保数据以适当的速率传输,避免内存溢出或数据丢失的问题。

pipe 的使用 :

const fs = require('fs');

const readableStream = fs.createReadStream('input.txt'); // 创建可读流
const writableStream = fs.createWriteStream('output.txt'); // 创建可写流

readableStream.pipe(writableStream); // 将可读流中的数据传输到可写流中

通过调用 pipe 能够自动处理数据的读取和写入,

同时 pipe 方法在 node.js 中还可以用于连接多个流,以便将数据从一个流传输到另一个流,形成数据处理管道。这样可以将数据处理的逻辑分解为多个模块,使代码更加模块化并易于后续维护。

总结

流式传输通过分块处理或传输数据,提供了更高的性能、更低的内存占用以及更好的响应速度。

流式传输不仅限于数据的输入和输出,还可以应用于数据转换、数据过滤、数据压缩等各种场景,以提供更灵活和高效的数据处理能力。

流式传输我就先介绍到这里,而这个知识点在后续也将会有大用途哦 😜 !!!

参考

  1. 模拟ChatGPT的流式输出
  2. node 官方文档