likes
comments
collection
share

Node Stream的几个应用场景和实践

作者站长头像
站长
· 阅读数 41

在大型复杂的数据应用当中,nodejs中的stream是一个非常重要的概念和技术。这个技术被应用到很多比较底层的应用场景,比如HTTP的服务器和客户端数据处理过程,网络操作,文件存取当中。作为一般的开发者,我们通常不关心其底层的实现和应用,但如果遇到一些比较特殊的场合(比如特大型的数据和文件处理),或者希望让自己的应用程序有更好的适应性,可以提供更好的性能同时资源占用更小,那么笔者认为,了解、掌握和熟悉stream技术,是非常必要的。

笔者在日常的工作和实践中,积累了一些使用stream技术的经验和体会,这里正好就有机会总结一下,在一些实践和场景的应用案例的基础上,结合自己的认知和理解,和大家分享一下使用stream技术的理解和体会。这些场景包括HTTP文件服务、文件上传、文件转换等等。下面分别详细分析和叙述。此处的案例都尽量从比较基础的角度来说明,所以一般不会涉及到第三方功能库,都是nodejs提供的原生功能和类型。

HTTP File Serve, HTTP文件服务

HTTP服务最早的主要功能,其实就是提供静态文件服务。笔者没有仔细研究过经典的静态文件Web服务器的实现方式,但基于HTTP协议的规范和定义,很容易可以合理的猜测其实现原理。就是基于请求路径和Web服务的配置信息,将这些路径解析和映射到本地文件系统的某个路径和文件。然后读取这个文件的内容,作为HTTP的响应内容输出到客户端。

考虑到这一实现的真实场景,开发者不能限定要提供服务的文件的类型、内容和大小,所以必须设计一个高效和通用的文件内容读取和响应转换机制,stream就是这样一个非常合适的技术。

下面,我们通过一个简单的实现代码片段,来直观的了解这一应用模式。

import http from "http";
import fs from "fs";
import { URL } from 'url';

// const http = require("http");
const ifile = new URL('.', import.meta.url).pathname +"linux-software-620x330.jpeg";

const webHandle = (req,res)=>{
    const rstream = fs.createReadStream(ifile.slice(1));

    // status code
    res.statusCode = 200;

    // default 
    res.setHeader('Content-Type', 'image/jpeg');

    rstream
    .on("data", chunk=> res.write(chunk))
    .on("end",()=>{ res.end() });
}

为了方便表述,这里的代码使用nodejs原生实现,但省略了createServer的部分,突出文件处理这一部分的内容。这些核心要点是:

  • 定义一个请求响应的处理方法(WebHandle),作为Web服务创建的回调函数,当Web服务器接收到请求的时候,就会调用这个方法
  • 这个方法有两个参数,分别代表请求对象和响应对象,业务操作基于请求对象中相关的参数,并使用响应对象发送响应信息
  • 在此处可以基于请求的路径,来设计Web请求的路由处理,此处简化为无论请求路径如何,都会响应一个文件的内容
  • 测试文件使用一个jpeg文件,逻辑上可以支持任意类型的文件和数据
  • 当接收网络请求时,不直接读取文件的全部内容,而是基于文件路径,创建一个读取流对象
  • 设置读取流对象的data方法,这个方法会在读取内容时,自动调用执行,并传入本次读取的内容(文件片段)
  • res对象以这个片段作为参数,执行write方法,其实就是传输给客户端
  • 程序会循环读取和处理内容,直到所有文件内容读取完成,这时会触发end方法,在这里可以做一些收尾工作,如执行res.end()结束http响应
  • 作为完整的HTTP规范,应当在实际响应输出之前,设置HTTP状态代码和头信息,包括声明当前内容的格式等内容

和直接将文件读取后,整体的发送到HTTP响应的方式相比,使用读取流操作,每次处理的数据量明显减小很多,并且可以有效控制,这样显然可以极大的降低这一操作过程中的系统资源占用,使服务器可以同时支撑更多的客户端应用,并大大降低服务器崩溃的风险。

HTTP Download,HTTP下载

上一章节是从服务器角度而言,使用stream来优化文件服务功能的基础方法。这一章节,我们来看看作为客户端,如何在下载文件的过程中,也能够从这一技术中获利。

Nodejs文档中的标准示例代码,包括我们看到的XMLHttpRequest,虽然没有明确的声明,但其底层和基础,就是一个steram的应用场景。下面笔者对这段代码进行了稍微改进,我们可以看得更清楚一点:


const req = http.request(options, (res) => {
  let data =[];
  res
  .on('data', (chunk) => data.push(chunk));
  .on('end', () => {
    console.log('Result:', Buffer.concat(data));
  });
});

req.on('error', (e) => {
  console.error(`problem with request: ${e.message}`);
});

我们可以看到,在默认的http协议中,响应的数据本来就是以数据流的方式,一小片一小片的传输过来的。所以在接受和处理的时候,就需要有对应的机制,即我们需要将这些数据积累下来,最后在响应结束时,再合并起来,以得到最后的结果。

对于一般的结构化对象和小型文本,这样处理是没有问题的。但如果对于文件下载类型的场景,这显然就不是特别合适了,因为文件可能会比较大,很容易就会占用很多内存资源,而且无法预测。这时我们可以考虑引入一个写入文件流,在数据片到达的时候,就直接写入流,这样就不需要最后再来处理,整个过程都是有约束可以控制的。

所以,上述代码可以改进成为下面的示例,来满足文件下载类型HTTP请求的需求。


const req = http.request(options, (res) => {  
  const writeStream = fs.createWriteStream(fname);
  
  res
  .on('data', chunk => writeStream.write(chunk));
  .on('end', () => {
    console.log('Result:', Buffer.concat(data));
    writeStream.close();
  });
});

HTTP Upload, HTTP文件上传

传统的HTTP文件上传是在浏览器器中,通过multipart和form技术实现的。但这部分的内容,不是本文讨论的重点。

我们这里讨论的方案,是以nodejs应用或者其他技术体系作为客户端应用,使用流方式读取文件,并且发起标准HTTP请求并传输二进制数据的模式。相关的参考示例代码如下:

    // create and send block stream
    const readStream = fs.createReadStream(filePath, { start, end });
    
    // block id in header
    let pstr,opt = {
      method: 'POST',
      body: readStream
    };

    fetch(url, opt)
    .then(res=> res.json())
    .then(data=>{
        console.log("uploaded");
    })
    .catch(err=>{
        console.log("Block Send Error:", err, " Waiting Recover....");
        COUNTDOWN = ICOUNT;
    })

打开文件读取流的方式,是nodejs的标准操作。这里的要点,是可以将这个流对象,作为body参数,传到fetch方法当中,它会将其作为流信息进行发送。如果客户端软件不支持这种直接将读取流封装成为POST BODY的方式,那也应当可以通过实现读取流onData事件,在其中调用request对象的write方法,稍微繁琐一点,但应当也能达到相同的目的和效果。

作为一个完整的技术方案一个部分,上面的操作只是在客户端方面的。而作为对应和配套,在服务端,同样可以使用stream技术,来处理请求和接收到的数据。简单的示例代码如下,我们可以看到其实是和HTTP下载的处理方式基本上是一样的。


// recevie block from request 
const uploadHandle = async (req, res)=>{
  let fname = __dirname + "/temp.data";
  const writeStream = fs.createWriteStream(fname);
  
  res
  .on('data', chunk => writeStream.write(chunk));
  .on('end', () => {
    writeStream.close();
  });

};

HTTP Proxy, HTTP代理

开发者在Web应用开发和测试工作中,经常会遇到一些由于如网络拓扑、跨域策略或者安全限制,不方便直接访问Web应用的情况,这时可以使用代理服务器的方式,来临时进行处理。如果自己编写HTTP服务作为一个代理服务器,在传输和中继大量数据的时候,使用流显然是最高效的方式。下面笔者编写了一个非常精简的测试代码,来说明这个问题:


const 
http = require("http"),
host = "127.0.0.1",
port = 8051;

// proxy 
const proxyHandle = (reqp, resp)=>{
    http.get(`http://${host}:${port+1}`, (res) => res.pipe(resp));
}

// backend create, than start proxy
http
.createServer((req,res)=>{  res.end("OK From Backend"); })
.listen({host, port: port + 1},()=> {
    console.log("Back End Start On", port+1);

    http
    .createServer(proxyHandle)
    .listen({host, port},()=> console.log("Proxy Start On", port));
});

这段代码创建了两个Web服务,一个是真正的后端业务服务(端口是8052),另一个是前端代理服务,给客户端提供服务(端口是8051),对于客户端而言,Web应用就是运行在前端的8501端口上,而后端服务是隐藏的。当客户端发起请求时,代理不进行真正的业务处理,而是直接导向到后端业务服务,并使用pipe方法,将后端的响应输出到前端,实际上就是将后端服务响应的读取流,连接到代理服务的写入流,从而完成了请求响应和数据流量转发的工作。

当然,以上的代码,主要是为了说明在编写代理服务应用的过程中,我们可以非常方便的使用nodejs提供的数据流特性和功能,并真正生产级别的代理服务,还需要考虑很多其他的问题,如考虑不同的HTTP方法、请求头、状态处理、错误处理等等,所以以上代码仅用作参考。

File Read Line, 读取和处理行

前面我们其实已经探讨了使用流技术来处理文件读写的操作。在实际的应用场景中,除了简单的处理文件数据流之外,还有一个比较常见的模式是处理文本行。典型的就是从一个CSV文件,或者其他具有某种格式规范的文本文件中,读取数据,然后简单处理后,写入数据库的操作。

这些文本文件一般是按照行来进行组织的,一行就是一条记录的内容,如果没有流处理或者行处理的方式的话,我们可能需要将文件先整体加载,然后使用行分隔符将内容转换成为数组,这样,有遇到了以前整体数据处理资源占用和不可控制的问题。所以,整体思路还是应该使用流处理的方式,只不过这里的数据片的方式,变成了明确的一行而已。

nodejs中,提供了相关的处理方法readline,笔者猜想,其技术基础还是基于流处理,只不过增加了使用换行符作为数据切片和处理的依据,就是在抽象的流数据处理模式上,增加了“文本换行”这一常用的业务逻辑而已。本质上仍然属于数据流技术的范畴。

关于readline,我们来看看nodejs的官方示例:


const fs = require('node:fs');
const readline = require('node:readline');

async function processLineByLine() {
  const fileStream = fs.createReadStream('input.txt');

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });
  // Note: we use the crlfDelay option to recognize all instances of CR LF
  // ('\r\n') in input.txt as a single line break.

  rl.on('line', (line) => {
     console.log(`Line from file: ${line}`);
  }); 

}

processLineByLine();

我们可以看到,示例中先创建了一个读取流,然后这个流就可以作为readline的接口参数,另一个参数是可选的行分隔符,来创建一个realine实例。这个实例,可以处理标准的文件读取流,转换成为一个行的流。随后,就像使用标准流一样,可以通过监听和实现line事件,来实现每一行内容的操作。从而我们可以看到readline就是对应readstream的一个更高级别的封装,来更好的适应实际业务需求的技术实现。

File Merge,文件合并

这里简单重复一下。需求就是有一个文件名称的列表和对应的本地文件,使用nodejs和stream技术编程,来快速高效的将这些文件按顺序合并成为一个单一的文件。

相关核心参考代码如下:


// write file stream
const wstream = fs.createWriteStream(fileName);

for (const block of blocks) {
    await new Promise((rv,rj)=>{
        const rstream = fs.createReadStream(block);
        console.log("merge:", block);

        rstream
        .on("data", chunk=> wstream.write(chunk))
        .on("end", ()=>{  rv(); rstream.close(); });
    });
};
// close stream
wstream.emit("end");

这里的核心概念和方法就是,默认情况下,readStream是一个异步操作,需要使用promise对这个操作进行封装,将多个异步操作转换成为同步的流读取操作。然后所有的读取流产生的数据片段,都会写入同一个写入流当中。这样就实现了串行化的读取流和流写入操作方式。

小结

本文以各种可能涉及到的应用需求和场景为例,探讨了nodejs stream技术的各种使用的方式,和相关的操作过程以及示例代码,来帮助读者能够更直观的体会和理解stream这种技术,并帮助他们在各种业务场景中合理的进行运用。

转载自:https://juejin.cn/post/7352333464241668150
评论
请登录