什么是流式传输 ?
当下 chatGPT 是最热门的一款型语言模型,可以用于自然语言处理任务,如对话生成、文本摘要、机器翻译等。ChatGPT的特点是可以根据输入文本动态生成输出,因此采用流式传输的方式,一边生成一边输出结果,无需等待结果的生成。
什么是流式传输?
流式传输是一种处理数据的有效方式,它可以实现高效的数据处理和传输,特别适用于大型数据或网络通信场景。
在流式传输中,数据可以被视为连续的数据流,而不是一个完整的数据集,在不等待整个数据集加载完毕的情况下可以对数据进行处理或传输。每个数据块都可以立即进行处理,并且在处理一个数据块时,不需要等待前面或后面的数据块。
将数据分成小块并逐个处理,从而实现较低的内存消耗和更快的处理速度,且无需一次性加载整个数据集到内存中。
流式传输的使用场景:
-
大型数据处理:当处理大型数据集时,流式传输可以避免将整个数据集加载到内存中,从而减少内存占用并提高性能。
-
网络通信:在网络通信中,流式传输可以逐个块地发送和接收数据,从而提供更高的响应速度和更好的带宽利用率。
-
实时数据处理:对于实时数据流,流式传输可以立即处理新到达的数据块,并实时输出结果,无需等待数据的完整响应,例如聊天应用程序、股票报价等。
-
文件传输:在文件传输中,流式传输可以逐个块地读取和写入文件,而不需要一次性加载整个文件到内存中,从而适用于处理大型文件。
流式传输的优点:
- 内存效率:流式传输允许逐个块地处理数据,无需一次性加载整个数据集到内存中。这对于处理大型文件或网络通信非常有用。
- 响应速度:数据块在流式传输期间可以即时处理,而不需要等待整个数据集加载完毕。这使得能够更快地响应请求或处理数据。
- 可扩展性:由于流式传输以块的形式处理数据,因此可以轻松地处理任意大小的数据。无论数据有多大,都不会受到内存限制的影响。
node 中的流式传输
在 node.js 中,流式传输是一种处理数据的有效方式,它可以实现高效的数据处理和传输,特别适用于大型数据或网络通信场景。通过使用流(Stream),你可以逐个块地处理数据,而无需一次性加载整个数据集到内存中。
node 为我们提供了四种类型的流:
- 可读流 Readable Stream:用于从数据源(如文件、接口请求等)读取数据。
- 可写流 Writable Stream:用于向目标(如文件、网络响应等)写入数据。
- 双工流 Duplex Stream:既可以读取数据,也可以写入数据。
- 转换流 Transform:可以在读取和写入数据时对数据进行转换或修改。
这些流可以通过使用 node.js 内置的模块(如 fs、http、stream 等)进行创建和操作。
在这些流对象中,数据是通过事件驱动的方式进行处理的,所有的流对象都用 on绑定事件,并触发。当数据源产生新的数据时,流对象会触发相应的事件,并执行绑定在该事件上的回调函数来处理数据。
- 当流对象接收到新的数据时 会触发 data 事件
- 当数据源没有数据时触发 end 事件,此时表示数据流已经结束
- 当在数据流中发生错误时触发 error
- 当流对象被关闭时会触发 close 事件
- 如果想要控制读取的速度,可以用 pause 事件暂停,而 resume 事件可以用于恢复数据的读取
流式文件读写
const fs = require('fs');
// 创建可读流读取文件
const readableStream = fs.createReadStream('input.txt');
// 创建可写流将内容写入文件
const writableStream = fs.createWriteStream('output.txt');
// 监听可读流的 'data' 事件,读取数据块并写入到可写流中
readableStream.on('data', (chunk) => {
writableStream.write(chunk);
});
// 监听可读流的 'end' 事件,表示文件读取完成
readableStream.on('end', () => {
writableStream.end();
});
通过监听可读流的 'data' 事件,可以获取到每个数据块(chunk),然后将其写入到可写流中。
这样,文件的读取和写入操作就是以数据块为单位逐个进行的,而不是一次性加载整个文件
在实际的应用中,你可以根据需求使用流来进行数据处理、转换、过滤等操作。你还可以使用流来处理网络通信、处理大型文件、实现数据流水线等场景,以提高性能和效率。
流不仅仅支持文件的读写,还可以通过流式传输完成接口请求数据的传输。
请求的流式响应
当我们在使用 node 作为服务器时,也可以通过流式传输完成接口的响应,这种方式被称为流式响应或流式输出。
使用流式响应可以提供更高的响应速度和更低的内存占用,在请求接口时,服务端不会一次性将完整的内容发送给客户端,而是将数据分块生成,并逐个块地发送。
以下是一个 node 示例,展示了如何使用流式响应来输出数据:
import express from "express";
const app = express();
// node 服务端接口路径
app.post("/chatStream", async (request, response) => {
console.log(request.body)
// 设置返回的响应头为流式传输
response.setHeader('Content-type', 'application/octet-stream');
const data = '你好啊';
const interval = setInterval(() => {
response.write(data);
}, 3000);
setTimeout(() => {
clearInterval(interval);
response.write('有什么可以帮你的吗');
response.end();
}, 5000);
// 关闭输出流
response.end();
});
app.listen(port, () => {
console.log(`listening on port ${port}`);
});
当客户端接收到流式响应时,它可以逐个块地处理数据,而不需要等待整个响应内容到达。
客户端获解析数据流
const response = await fetch("http://localhost:8000/chatStream", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "*,application/json",
},
body: JSON.stringify({
chats,
}),
});
// 创建了一个 TextDecoder 对象,主要用于将二进制数据解码为 UTF-8 格式(中文)字符串
const encode = new TextDecoder("utf-8");
// 读取响应数据流
const reader = response.body?.getReader() as any;
let answer = "";
const flag = true;
while (flag) {
const { done, value } = await reader.read();
console.log(value);
if (done) {
setCursor(false);
break;
}
// 获取响应流中的数据,并将其解码为字符串
const text = encode.decode(value);
// 凭借每次获取到的字符串
answer += text;
console.log(answer);
msgs[msgs.length - 1].content = answer;
console.log(msgs);
setChats(msgs);
}
观察接口的返回值
response 获取到的二进制数据流,我们需要通过解码将流式数据转为中文字符串用于页面的展示
response.write
在 node 中,response.write 和 response.end 是用于处理 HTTP 响应的方法。
-
response.write(chunk, [encoding], [callback]) 用于向响应流中写入数据块。
- chunk:要写入的数据块,可以是字符串或 Buffer 对象。
- encoding(可选):指定 chunk 的字符编码,默认为 UTF-8。
- callback(可选):在写入完成后调用的回调函数。
response.write 方法可以被多次调用,每次调用写入一个数据块。如果指定了字符编码,chunk 参数必须是字符串,否则可以是 Buffer 对象。
-
response.end([data], [encoding], [callback]) 用于结束响应流并发送响应给客户端。
- data(可选):要作为最后一个数据块发送的内容,可以是字符串或 Buffer 对象。
- encoding(可选):指定 data 的字符编码,默认为 UTF-8。
- callback(可选):在响应结束后调用的回调函数。
response.end 方法会将响应标记为已完成,并将其发送给客户端。如果提供了 data 参数,则会将其作为最后一个数据块发送。结束响应后,不能再调用 response.write。
这两个方法通常在处理 HTTP 请求时用于向客户端发送响应。
使用 response.write 可以逐个数据块地发送响应内容,而 response.end 则为结束响应并发送最后的数据块。
const http = require('http');
const server = http.createServer((request, response) => {
response.statusCode = 200;
response.setHeader('Content-Type', 'text/plain');
response.write('Hello, ');
response.write('World!\n');
response.end('End of response.');
});
server.listen(8000, () => {
console.log('Server is running on port 8000');
});
当有请求进入创建的 HTTP 服务器时,在回调函数中处理请求和响应。
通过调用 response.write,可以将数据块逐个写入响应流,而 response.end 则用于结束响应并发送最后的数据块。
请注意,在实际的应用中,你可能会根据具体的需求和业务逻辑进行更复杂的响应处理。以上示例仅用于演示基本的使用方法。
pipe 实现数据流传输
在 node 中,pipe 是一个用于流数据处理的方法,它能够将可读流的数据输出到可写流,从而实现将数据从一个流传输到另一个流的功能。
通过使用 pipe 方法,可以简化数据传输的代码逻辑,不需要手动处理数据的读取和写入。
pipe 方法会自动处理数据的读取和写入,并且在适当的时机进行流控制,确保数据以适当的速率传输,避免内存溢出或数据丢失的问题。
pipe 的使用 :
const fs = require('fs');
const readableStream = fs.createReadStream('input.txt'); // 创建可读流
const writableStream = fs.createWriteStream('output.txt'); // 创建可写流
readableStream.pipe(writableStream); // 将可读流中的数据传输到可写流中
通过调用 pipe 能够自动处理数据的读取和写入,
同时 pipe 方法在 node.js 中还可以用于连接多个流,以便将数据从一个流传输到另一个流,形成数据处理管道。这样可以将数据处理的逻辑分解为多个模块,使代码更加模块化并易于后续维护。
总结
流式传输通过分块处理或传输数据,提供了更高的性能、更低的内存占用以及更好的响应速度。
流式传输不仅限于数据的输入和输出,还可以应用于数据转换、数据过滤、数据压缩等各种场景,以提供更灵活和高效的数据处理能力。
流式传输我就先介绍到这里,而这个知识点在后续也将会有大用途哦 😜 !!!
参考
转载自:https://juejin.cn/post/7241514309716803644