Node.js Design Patterns: Coding with Streams

  javascript, node.js

This series of articles consists of a translation of, and reading notes on, 《Node.js Design Patterns Second Edition》, serialized and updated on GitHub together with links to the synchronized translation.

You are welcome to follow my column; subsequent posts will also be published there:

Coding with Streams

Streams are one of the most important components and patterns of Node.js. There is a motto in the community that goes "stream all the things!", and this alone is enough to describe the role of streams in Node.js. Dominic Tarr, a top contributor to the Node.js community, defines streams as Node.js's best and, at the same time, most misunderstood concept.

There are other reasons that make Node.js streams so attractive; moreover, streams are not just about technical properties such as performance or efficiency. What matters even more is their elegance and the way they fit perfectly into the Node.js design philosophy.

In this chapter, you will learn the following:

  • Why streams are so important in Node.js.
  • How to create and use streams.
  • Streams as a programming paradigm, with applications and power that go well beyond I/O.
  • Piping patterns and how to connect streams in different configurations.

Discovering the importance of Streams

In an event-based platform such as Node.js, the most efficient way to handle I/O is in real time: consume the input as soon as it is available and emit the output as soon as the application produces it.

In this section, we first introduce Node.js streams and their strengths. Keep in mind that this is only an overview, as how to use and compose streams is covered in detail later in this chapter.

Buffering versus streaming

Almost all the asynchronous APIs we have seen so far in this book work in buffer mode. For an input operation, buffer mode collects all the data coming from a resource into a buffer; only once the entire resource has been read is the result passed to a callback. The following figure shows a visual example of this paradigm:

As we can see from the figure, at time t1 some data is received from the resource and saved into the buffer. At time t2, another chunk of data is received, the last one, which completes the read operation; at this point, the contents of the whole buffer are sent to the consumer.

Streams, on the other hand, allow you to process the data as soon as it arrives, as shown in the following figure:

This figure shows how each new chunk of data is received from the resource and provided to the consumer immediately, without waiting for all the data to be collected in a buffer before being processed.

But what is the difference between these two approaches? We can summarize it in two points:

  • Space efficiency
  • Time efficiency

In addition, Node.js streams have another important advantage: composability. Let's now see how these properties affect the way we design and write applications.

Space efficiency

First of all, streams allow us to do things that would be impossible if we buffered the data and processed it all at once. For example, consider having to read a very large file, say hundreds of MB or even several GB. Clearly, using an API that returns one big buffer once the file has been completely read is not a good idea; imagine reading a few of these big files concurrently: our application would easily run out of memory. Besides that, buffers in V8 cannot be bigger than 0x3FFFFFFF bytes (a little less than 1GB), so we might hit a wall well before running out of physical memory.

Compressing files using a buffered API

To make a concrete example, let's consider a simple command-line interface (CLI) application that compresses a file using the Gzip format. Using a buffered API, such an application would be written in Node.js roughly as follows (error handling is omitted for brevity):

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.readFile(file, (err, buffer) => {
  zlib.gzip(buffer, (err, buffer) => {
    fs.writeFile(file + '.gz', buffer, err => {
      console.log('File successfully compressed');
    });
  });
});

Now, we can save the preceding code in a file called gzip.js and then run the following command:

node gzip <path to file>

If we choose a file that is big enough, say greater than 1GB, we will receive an error message saying that the file we are trying to read is bigger than the maximum allowed buffer size, as shown below:

RangeError: File size is greater than possible Buffer:0x3FFFFFFF

Even when we do not hit that limit, reading large files entirely into memory before processing them is considerably slow.

As expected, using a buffer to read big files is clearly the wrong approach.

Compressing files using Streams

The simplest way to fix our Gzip application and make it work with big files is to use a streaming API. Let's see how this can be achieved: let's replace the contents of the module we just created with the following code:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];
fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('File successfully compressed'));

"Is that it?" you may ask. Yes; as we said, thanks to the interface and composability of streams, we can also write code that is more concise, elegant, and compact. We will look at this in more detail later, but for now the important thing to realize is that the program runs smoothly against files of any size, ideally with constant memory utilization. Try it yourself (but consider that compressing a big file may take a while).

Time efficiency

Now let's consider an application that compresses a file and uploads it to a remote HTTP server, which in turn decompresses it and saves it to the file system. If our client were implemented with a buffered API, the upload would start only when the entire file had been read and compressed. Likewise, the decompression on the server would start only when all the data had been received. A better solution to achieve the same result involves the use of streams. On the client machine, streams allow us to compress and send the data chunks as soon as they are read from the file system, while on the server they allow us to decompress every chunk as soon as it is received from the remote peer. Let's demonstrate this by building the application we just mentioned, starting from the server side.

We create a module called gzipReceive.js with the following code:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));

The server receives the data chunks from the network, decompresses them, and saves them as soon as they are received, thanks to Node.js streams.

The client side of our application goes into a module called gzipSend.js and looks as follows:
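A minimal sketch of this client (reconstructed from the encrypted variant shown later in this chapter, minus the encryption step; not the original listing) could look like this:

const fs = require('fs');
const zlib = require('zlib');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

const req = http.request(options, res => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(req)
  .on('finish', () => console.log('File successfully sent'));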

In the preceding code, we again use streams to read the data from the file, and then compress and send each chunk as soon as it is read from the file system.

Now, to run this application, we first start the server with the following command:

node gzipReceive

Then, we can launch the client by specifying the file to send and the address of the server (for example, localhost):

node gzipSend <path to file> localhost

If we choose a file big enough, we will easily appreciate how the data flows from the client to the server. But why exactly is this mode, using streams, more efficient compared to a buffered API? The following figure should give us a hint:

A file is processed through the following stages:

  1. The client reads the data from the file system
  2. The client compresses the data
  3. The client sends the data to the server
  4. The server receives the data
  5. The server decompresses the data
  6. The server writes the data to disk

To complete the processing, each stage must be completed, in order, like in an assembly line, until the end. In the figure, we can see that with a buffered API the process is entirely sequential: to compress the data we first have to wait for the entire file to be read, then to send the data we have to wait for the entire file to be both read and compressed, and so on. Using streams instead, the assembly line is kicked off as soon as we receive the first chunk of data, without waiting for the whole file to be read. And what is even more surprising is that when the next chunk of data is available, there is no need to wait for the previous set of tasks to complete; instead, another assembly line is launched in parallel. This works because every task we execute is asynchronous, so it can be parallelized by Node.js; the only constraint is that the order in which the chunks arrive must be preserved in each stage.

As we can see from the previous figure, the result of using streams is that the whole process takes less time, because we do not have to wait for all the data to be read and processed before starting.

Composability

The code we have seen so far has already shown how streams can be composed using the pipe() method, which allows us to connect different processing units, each responsible for one single functionality (in perfect Node.js style). This is possible because streams have a uniform interface, and they can understand each other in terms of API. The only prerequisite is that the next stream in the pipeline has to support the data type produced by the previous stream, which can be binary, text, or even objects, as we will see in later sections.

To get a proof of the advantages of composing streams, we can try to add an encryption layer to the gzipReceive/gzipSend application that we built previously.
To do this, we only need to update the client by adding another stream to the pipeline; specifically, the stream returned by crypto.createCipher(). The resulting code should look like this:

const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');
const http = require('http');
const path = require('path');

const file = process.argv[2];
const server = process.argv[3];

const options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

const req = http.request(options, res => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_shared_secret'))
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  });

In a similar way, we update the server's code so that it decrypts each data chunk before decompressing it:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');
const crypto = require('crypto');

const server = http.createServer((req, res) => {
  const filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req
    .pipe(crypto.createDecipher('aes192', 'a_shared_secret'))
    .pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {
        'Content-Type': 'text/plain'
      });
      res.end('That\'s it\n');
      console.log(`File saved: ${filename}`);
    });
});

server.listen(3000, () => console.log('Listening'));

crypto is one of the core Node.js modules and provides a range of cryptographic algorithms.

With just a few lines of code, we added an encryption layer to the application: we simply had to combine the streams we already had with an encryption stream. In a similar way, we can add and merge other streams, as if we were playing with building blocks.

Clearly, the main advantage of this approach is reusability, but as the code presented so far shows, streams also enable cleaner, more modular, and more concise code. For these reasons, streams are often used not only to deal with pure I/O, but also as a means to simplify and modularize code.

Getting started with Streams

In the previous section, we learned why streams are so powerful, and also that they are everywhere in Node.js, starting from its core modules. For example, as we have seen, the fs module has createReadStream() for reading from a file and createWriteStream() for writing to a file, the HTTP request and response objects are essentially streams, and the zlib module allows us to compress and decompress data chunks using a streaming API.

Now that we know why streams are so important, let's take a step back and start exploring them in more detail.

Streams structure

Every stream in Node.js is an implementation of one of the four base abstract classes available in the stream core module:

  • stream.Readable
  • stream.Writable
  • stream.Duplex
  • stream.Transform

Each stream class is also an instance of EventEmitter. In fact, streams can produce several types of events, such as end, which is fired when a readable stream has finished reading, or error, which is emitted when a read fails or an exception occurs during processing.

Please note that, for brevity, the examples presented in this chapter often omit proper error handling. However, in a production environment it is always advisable to register an error event listener on every stream.

One of the reasons streams are so flexible is the fact that they can handle not only binary data but almost any JavaScript value. In fact, they support two operating modes:

  • Binary mode: the data is streamed in the form of chunks, such as buffers or strings
  • Object mode: the streaming data is treated as a sequence of discrete objects (allowing us to use almost any JavaScript value)

These two operating modes allow us to use streams not only for I/O, but also as a tool to elegantly compose processing units in a functional fashion, as we will see later in this chapter.
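As a quick illustration (a minimal sketch, not taken from the book, using the simplified constructor form of stream.Readable), here is how the two modes differ in practice:

const stream = require('stream');

// Binary mode: chunks are Buffers (or strings)
const binary = new stream.Readable({
  read() {
    this.push(Buffer.from('some bytes'));
    this.push(null); // end of stream
  }
});

// Object mode: chunks can be any JavaScript value
const objects = new stream.Readable({
  objectMode: true,
  read() {
    this.push({ user: 'alice', action: 'login' });
    this.push(null);
  }
});

binary.on('data', chunk => console.log(Buffer.isBuffer(chunk))); // true
objects.on('data', obj => console.log(typeof obj));              // 'object'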

In this chapter, we mainly use the Node.js stream interface introduced in Node.js 0.11, which is also known as version 3. For further details about the differences from the old interfaces, see the StrongLoop article at https://strongloop.com/strong ….

Readable Streams

A readable stream represents a source of data; in Node.js, it is implemented using the Readable abstract class available in the stream module.

Reading from a stream

There are two ways to receive data from a readable stream: non-flowing mode and flowing mode. Let's analyze these modes in more detail.

The non-flowing mode

The default pattern for reading from a readable stream consists of attaching a listener for the readable event, which signals the availability of new data to read. Then, in a loop, we read the data until the internal buffer is emptied. This can be done using the read() method, which synchronously reads from the internal buffer and returns a Buffer or String object representing the chunk of data. The read() method has the following signature:

readable.read([size]);

Using this approach, the data is imperatively pulled from the stream on demand.

To show how this works, let's create a new module named readStdin.js, which implements a simple program that reads from the standard input (a readable stream) and echoes everything back to the standard output:

process.stdin
  .on('readable', () => {
    let chunk;
    console.log('New data available');
    while ((chunk = process.stdin.read()) !== null) {
      console.log(
        `Chunk read: (${chunk.length}) "${chunk.toString()}"`
      );
    }
  })
  .on('end', () => process.stdout.write('End of stream'));

The read() method is a synchronous operation that pulls a data chunk from the internal buffer of the readable stream. The returned chunk is, by default, a Buffer object if the stream is working in binary mode.

In a readable stream working in binary mode, we can read strings instead of Buffer objects by calling setEncoding(encoding) on the stream and providing a valid encoding format (for example, utf8).

The data is read exclusively from within the readable listener, which is invoked as soon as new data is available. The read() method returns null when there is no more data available in the internal buffer; in such a case, we have to wait for another readable event to be fired, telling us that we can read again, or wait for the end event, which signals the end of the stream. When a stream is working in binary mode, we can also specify that we are interested in reading a specific amount of data by passing a size argument to the read() method. This is particularly useful when implementing network protocols or when parsing specific data formats.

Now we are ready to run the readStdin module and experiment with it. Let's type some characters into the console and then press Enter to see the data echoed back to the standard output. To terminate the stream and thus generate a graceful end event, we need to insert an EOF (end-of-file) character (using Ctrl + Z on Windows or Ctrl + D on Linux).

We can also try to connect our program with other programs; this is possible using the pipe operator (|), which redirects the standard output of one program to the standard input of another. For example, we can run the following command:

cat <path to a file> | node readStdin

This is an amazing demonstration of how the streaming paradigm is a universal interface, which enables our programs to communicate regardless of the language they are written in.

The flowing mode

Another way to read from a stream is to attach a listener to the data event; this switches the stream into flowing mode, where the data is not pulled using read(), but instead is pushed to the data listener as soon as it arrives. For example, the readStdin application that we created earlier would look like this using flowing mode:

process.stdin
  .on('data', chunk => {
    console.log('New data available');
    console.log(
      `Chunk read: (${chunk.length}) "${chunk.toString()}"`
    );
  })
  .on('end', () => process.stdout.write('End of stream'));

Flowing mode is an inheritance of the old version of the stream interface (also known as Streams1), and it offers less flexibility to control the flow of data. With the introduction of the Streams2 interface, flowing mode is no longer the default working mode; to enable it, we need to attach a listener to the data event or explicitly invoke the resume() method. To temporarily stop the stream from emitting data events, we can then invoke the pause() method, causing any incoming data to be cached in the internal buffer.

Calling pause() does not cause the stream to switch back to non-flowing mode.

Implementing Readable Streams

Now that we know how to read from a stream, the next step is to learn how to implement a new Readable stream. To do this, it is necessary to create a new class inheriting from stream.Readable. The concrete stream must provide an implementation of the _read() method, which has the following signature:

readable._read(size)

The internals of the Readable class call the _read() method, which in turn starts filling the internal buffer using push().

Note that read() is the method called by the stream consumers, while _read() is a method implemented by the stream subclasses and should never be called directly. The underscore usually indicates that the method is not public and should not be invoked directly.

To demonstrate how to implement a new Readable stream, we can try to implement a stream that generates random strings. Let's create a new module called randomStream.js that contains the code of our string generator:

const stream = require('stream');
const Chance = require('chance');

const chance = new Chance();

class RandomStream extends stream.Readable {
  constructor(options) {
    super(options);
  }

  _read(size) {
    const chunk = chance.string(); //[1]
    console.log(`Pushing chunk of size: ${chunk.length}`);
    this.push(chunk, 'utf8'); //[2]
    if (chance.bool({
        likelihood: 5
      })) { //[3]
      this.push(null);
    }
  }
}

module.exports = RandomStream;

At the top of the file, we load our dependencies. There is nothing special here, except that we are loading an npm module called chance, a library for generating all sorts of random values, from numbers to strings to entire sentences.

The next step is to create a new class called RandomStream and specify stream.Readable as its parent. In the preceding code, we invoke the constructor of the parent class to initialize its internal state, forwarding the options argument received as input. The possible parameters passed via the options object include the following (a short illustration follows the list):

  • An encoding argument, used to convert buffers into strings (defaults to null)
  • A flag to enable object mode (objectMode, defaults to false)
  • The upper limit of the data stored in the internal buffer, after which no more data should be read from the source (highWaterMark, defaults to 16KB)
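For illustration only, this is how those options could be passed to the RandomStream defined above (the values here are arbitrary):

const randomStream = new RandomStream({
  encoding: 'utf8',    // read() returns strings instead of Buffers
  highWaterMark: 1024  // keep at most about 1KB in the internal buffer
});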

Now let's explain the _read() method that we overrode from stream.Readable:

  • The method generates a random string using chance.
  • It pushes the string into the internal buffer. Note that since we push a String, we also specify the encoding, utf8 (this is not needed if the chunk is simply a binary Buffer).
  • It terminates the stream randomly, with a likelihood of 5%, by pushing null into the internal buffer to indicate EOF, that is, the end of the stream.

We can also see that the size argument given in input to the _read() function is ignored, as it is only advisory. We can simply push all the available data, but if there are multiple pushes inside the same invocation, then we should check whether push() returns false, as this would mean that the internal buffer has reached the highWaterMark limit and we should stop adding more data to it.

That's it for RandomStream; we are now ready to use it. Let's create a module called generateRandom.js, in which we instantiate a new RandomStream object and pull some data from it:

const RandomStream = require('./randomStream');
const randomStream = new RandomStream();

randomStream.on('readable', () => {
  let chunk;
  while ((chunk = randomStream.read()) !== null) {
    console.log(`Chunk received: ${chunk.toString()}`);
  }
});

Now everything is ready to try our new custom stream. Simply execute the generateRandom module as usual and watch a random set of strings flowing on the screen.

Writable Streams

A writable stream represents a data destination; in Node.js, it is implemented using the Writable abstract class, which is available in the stream module.

Writing to a stream

Pushing some data into a writable stream is a straightforward business; all we have to do is use the write() method, which has the following signature:

writable.write(chunk, [encoding], [callback])

The encoding argument is optional and can be specified if chunk is a String (it defaults to utf8 and is ignored if chunk is a Buffer); the callback, also optional, is invoked when the chunk is flushed to the underlying resource.

To signal that no more data will be written to the stream, we have to use the end() method:

writable.end([chunk], [encoding], [callback])

We can provide a final chunk of data through the end() method; in this case, the callback function is equivalent to registering a listener for the finish event, which is fired when all the data written to the stream has been flushed into the underlying resource.

Now, let's show how this works by creating a small HTTP server that outputs a random sequence of strings:

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  }); //[1]
  while (chance.bool({
      likelihood: 95
    })) { //[2]
    res.write(chance.string() + '\n'); //[3]
  }
  res.end('\nThe end...\n'); //[4]
  res.on('finish', () => console.log('All data was sent')); //[5]
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

We created an HTTP server that writes its data into the res object, which is an instance of http.ServerResponse and is also a writable stream. What follows is an explanation of what happens in the preceding code:

  1. We first write the head of the HTTP response. Note that writeHead() is not part of the Writable interface; in fact, it is an auxiliary method exposed by the http.ServerResponse class.
  2. We start a loop that terminates with a likelihood of 5% (the probability of entering the loop body is 95%, as produced by chance.bool()).
  3. Inside the loop, we write a random string into the stream.
  4. Once we are out of the loop, we call end() on the stream, indicating that no more data will be written. We also provide a final string to be written into the stream before it is ended.
  5. Finally, we register a listener for the finish event, which is fired when all the data chunks have been flushed into the underlying socket.

We can save this small module as entropyServer.js and then execute it. To test the server, we can open a browser at the address http://localhost:8080, or use curl from the terminal as follows:

curl localhost:8080

At this point, the server should start sending random strings to the HTTP client that you chose (bear in mind that some browsers might buffer the data, so the streaming behavior might not be apparent).

Back-pressure

Similar to a liquid flowing in a real piping system, Node.js streams can also suffer from bottlenecks, where data is written faster than the stream can consume it. The mechanism to cope with this problem consists of buffering the incoming data; however, if the stream does not give any feedback to the writer, we may end up with more and more data accumulating in the internal buffer, leading to unwanted levels of memory usage.

To prevent this from happening, writable.write() returns false when the internal buffer exceeds the highWaterMark limit. Writable streams have a highWaterMark property, which is the limit of the internal buffer size beyond which write() starts returning false, telling the application that it should stop writing. When the buffer is emptied, the drain event is emitted, communicating that it is safe to start writing again. This mechanism is called back-pressure.

The mechanism described in this section also applies to readable streams. In fact, back-pressure exists in readable streams too, and it is triggered when the push() method invoked inside _read() returns false. However, that is a problem specific to stream implementers, so we will have to deal with it less frequently.
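For completeness, here is a minimal sketch (with a hypothetical NumberStream class, not one of the book's examples) of how a readable stream implementation can respect back-pressure:

const stream = require('stream');

class NumberStream extends stream.Readable {
  constructor(options) {
    super(options);
    this.current = 0;
  }

  _read(size) {
    // Keep pushing until the internal buffer is full (push() returns false)
    // or we run out of data; _read() will be invoked again when the
    // consumer drains the buffer.
    while (this.current < 1000) {
      const backpressure = !this.push(String(this.current++) + '\n');
      if (backpressure) return;
    }
    this.push(null); // no more data
  }
}

new NumberStream().pipe(process.stdout);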

We can quickly demonstrate how to take into account the back-pressure of a writable stream by modifying the entropyServer module that we created before:

const Chance = require('chance');
const chance = new Chance();

require('http').createServer((req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/plain'
  });

  function generateMore() { //[1]
    while (chance.bool({
        likelihood: 95
      })) {
      const shouldContinue = res.write(
        chance.string({
          length: (16 * 1024) - 1
        }) //[2]
      );
      if (!shouldContinue) { //[3]
        console.log('Backpressure');
        return res.once('drain', generateMore);
      }
    }
    res.end('\nThe end...\n', () => console.log('All data was sent'));
  }
  generateMore();
}).listen(8080, () => console.log('Listening on http://localhost:8080'));

The most important steps in the previous code can be summarized as follows:

  1. We encapsulated the main logic in a function called generateMore().
  2. To increase the chances of receiving some back-pressure, we increased the size of each data chunk to 16KB-1 bytes, which is very close to the default highWaterMark limit.
  3. After writing a chunk of data, we check the return value of res.write(); if it is false, it means that the internal buffer is full and we should stop sending more data. In this case, we exit the function and register a listener for the drain event using once(), so that generateMore() is invoked again when the event is fired.

If we now try to run the server again and then generate a client request with curl, there is a good chance that some back-pressure will occur, because the server produces data at a very high rate, faster than the underlying socket can handle.

Implementing Writable Streams

We can implement a new writable stream by inheriting from stream.Writable and providing an implementation for the _write() method. Let's try to build one right away.

Our custom writable stream will receive objects in the following format:

{
  path: <path to a file>
  content: <string or buffer>
}

For each of these objects, our stream has to save the content part into a file created at the given path. We can immediately see that the inputs of our stream are objects and not strings or buffers, which means that our stream has to work in object mode.

Let's call the module toFileStream.js:

const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirp = require('mkdirp');

class ToFileStream extends stream.Writable {
  constructor() {
    super({
      objectMode: true
    });
  }

  _write(chunk, encoding, callback) {
    mkdirp(path.dirname(chunk.path), err => {
      if (err) {
        return callback(err);
      }
      fs.writeFile(chunk.path, chunk.content, callback);
    });
  }
}
module.exports = ToFileStream;

As a first step, we load all the dependencies we need. Note that we require the module mkdirp, which, as you should know from the previous chapters, has to be installed with npm.

We created a new class that extends stream.Writable.

We had to invoke the parent constructor to initialize its internal state; we also provide an options object as an argument, to specify that the stream works in object mode (objectMode: true). Other options accepted by stream.Writable are as follows:

  • highWaterMark (defaults to 16KB): this controls the back-pressure limit.
  • decodeStrings (defaults to true): this enables the automatic decoding of strings into binary buffers before passing them to the _write() method. This option is ignored in object mode.

Finally, we provide an implementation for the _write() method. As you can see, the method accepts a data chunk and an encoding (which makes sense only in binary mode and when the stream option decodeStrings is set to false).

The method also accepts a callback function, which needs to be invoked when the operation completes. It is not necessary to pass the result of the operation, but, if needed, we can still pass an error object, which will cause the stream to emit an error event.

Now, to try the stream we just built, we can create a new module called writeToFile.js and perform some write operations against the stream:

const ToFileStream = require('./toFileStream.js');
const tfs = new ToFileStream();

tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(() => console.log("All files created"));

With this, we created and used our first custom writable stream. Run the new module as usual to check its output; you will see that three new files are created after the execution.

Duplex Streams

A Duplex stream is a stream that is both readable and writable. It is useful when we want to describe an entity that is both a data source and a data destination, such as a socket. Duplex streams inherit the methods of both stream.Readable and stream.Writable, so this is nothing new to us; it means that we can read() or write() data, and listen for both the readable and drain events.

To create a custom Duplex stream, we have to provide an implementation for both _read() and _write(). The options object passed to the Duplex() constructor is internally forwarded to both the Readable and Writable constructors. The options are the same as those discussed in the previous sections, with the addition of allowHalfOpen (defaults to true), which, if set to false, causes the whole stream to end as soon as one of its two sides (Readable or Writable) ends.

To have the Duplex stream work in object mode on one side and binary mode on the other, we need to manually set the following properties inside the stream constructor:

this._writableState.objectMode
this._readableState.objectMode
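As a rough sketch (a hypothetical LineObjectStream class, not from the book), here is a Duplex stream that accepts binary data on its writable side and emits one object per line on its readable side:

const stream = require('stream');

class LineObjectStream extends stream.Duplex {
  constructor(options) {
    super(options);
    this._writableState.objectMode = false; // binary in
    this._readableState.objectMode = true;  // objects out
    this.tail = '';
    // When the writable side ends, flush what is left and end the readable side
    this.on('finish', () => {
      if (this.tail) this.push({ line: this.tail });
      this.push(null);
    });
  }

  _write(chunk, encoding, callback) {
    const lines = (this.tail + chunk.toString()).split('\n');
    this.tail = lines.pop(); // keep the incomplete last line for later
    lines.forEach(line => this.push({ line }));
    callback();
  }

  _read(size) {
    // All the data is pushed from _write(), so there is nothing to do here
  }
}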

Transform Streams

Transform streams are a special kind of Duplex stream, specifically designed to handle data transformations.

In a simple Duplex stream, there is no immediate relationship between the data read from the stream and the data written into it (at least, the stream is agnostic to such a relationship). Think about a TCP socket, which just sends and receives data to and from the remote peer; the socket is not aware of any relationship between the input and the output.

The following figure illustrates the data flow in a Duplex stream:

Transform streams, on the other hand, apply some kind of transformation to each chunk of data that they receive from their writable side, and then make the transformed data available on their readable side.

The following figure shows how the data flows in a Transform stream:

From the outside, the interface of a Transform stream is exactly the same as that of a Duplex stream. However, when we want to build a new Duplex stream we have to provide the _read() and _write() methods, while to implement a new Transform stream we have to fill in another pair of methods: _transform() and _flush().

Let's show how to create a new Transform stream with an example.

Implementing Transform Streams

Let's implement a Transform stream that replaces all the occurrences of a given string. To do this, we have to create a new module named replaceStream.js. Let's jump directly to the implementation:

const stream = require('stream');
const util = require('util');

class ReplaceStream extends stream.Transform {
  constructor(searchString, replaceString) {
    super();
    this.searchString = searchString;
    this.replaceString = replaceString;
    this.tailPiece = '';
  }

  _transform(chunk, encoding, callback) {
    const pieces = (this.tailPiece + chunk)         //[1]
      .split(this.searchString);
    const lastPiece = pieces[pieces.length - 1];
    const tailPieceLen = this.searchString.length - 1;

    this.tailPiece = lastPiece.slice(-tailPieceLen);     //[2]
    pieces[pieces.length - 1] = lastPiece.slice(0,-tailPieceLen);

    this.push(pieces.join(this.replaceString));       //[3]
    callback();
  }

  _flush(callback) {
    this.push(this.tailPiece);
    callback();
  }
}

module.exports = ReplaceStream;

As usual, we start building the module from its dependencies. This time we are not using any third-party module.

Then we created a new class extending the stream.Transform base class. The constructor of the class accepts two arguments: searchString and replaceString. As you can imagine, they allow us to define the text to match and the string to use as a replacement. We also initialize an internal variable, tailPiece, which will be used by the _transform() method.

Now let's analyze the _transform() method, which is the core of our new class. The _transform() method has practically the same signature as the _write() method of a writable stream, but instead of writing the data into an underlying resource, it pushes it into the internal buffer using this.push(), exactly as we would do in the _read() method of a readable stream. This shows how the two sides of a Transform stream are actually connected.

The _transform() method of ReplaceStream implements the core of our algorithm. Searching for and replacing a string in a buffer is an easy task; however, it is a totally different story when the data is streaming, and possible matches might be distributed across multiple chunks. The procedure followed by the code can be explained as follows:

  1. Our algorithm splits the chunk using searchString as a separator, prepending the tailPiece left over from the previous chunk.
  2. Then, it takes the last item of the array generated by the split operation and extracts its last searchString.length - 1 characters. The result is saved into the tailPiece variable, and it will be prepended to the next chunk of data.
  3. Finally, all the pieces resulting from split() are joined together using replaceString as a separator and pushed into the internal buffer.

When the stream ends, we might still have a last tailPiece that was not pushed into the internal buffer. This is exactly what the _flush() method is for; it is invoked just before the stream is ended, and this is where we have one final chance to finalize the stream or push any remaining data before completely ending it.

The _flush() method only takes a callback as an argument, which we must make sure to invoke when all the operations are complete, causing the stream to be terminated. With this, we have completed our ReplaceStream class.

Now it is time to try the new stream. We can create another module, called replaceStreamTest.js, that writes some data and then reads the transformed result:

const ReplaceStream = require('./replaceStream');

const rs = new ReplaceStream('World', 'Node.js');
rs.on('data', chunk => console.log(chunk.toString()));

rs.write('Hello W');
rs.write('orld!');
rs.end();

To make this example a little more interesting, we scatter the search term across two different chunks; then, using flowing mode, we read from the same stream, logging each transformed chunk. Running the preceding program should produce the following output:

Hel
lo Node.js
!

One thing worth mentioning is the fifth type of stream: stream.PassThrough. Unlike the other stream classes we have covered, PassThrough is not abstract and can be instantiated directly without the need to implement any method. It is, in fact, a Transform stream that outputs every data chunk without applying any transformation.
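A minimal sketch of how a PassThrough can be useful, for example to observe how much data flows through a pipeline without altering it:

const PassThrough = require('stream').PassThrough;

const monitor = new PassThrough();
let bytes = 0;
monitor.on('data', chunk => bytes += chunk.length);
monitor.on('finish', () => console.log(bytes + ' bytes written'));

process.stdin.pipe(monitor).pipe(process.stdout);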

Connecting Streams using pipes

The concept of Unix pipes was invented by Douglas McIlroy; they enable the output of a program to be connected to the input of the next. Take a look at the following command:

echo Hello World! | sed s/World/Node.js/g

In the preceding command, echo writes Hello World! to its standard output, which is then redirected to the standard input of the sed command (thanks to the pipe operator |); sed then replaces any occurrence of World with Node.js and prints the result to its standard output (which, this time, is the console).

In a similar way, Node.js streams can be connected together using the pipe() method of a readable stream, which has the following interface:

readable.pipe(writable, [options])

Very intuitively, the pipe() method takes the data emitted by the readable stream and pumps it into the provided writable stream. Also, the writable stream ends automatically when the readable stream emits an end event (unless we specify {end: false} in the options). The pipe() method returns the writable stream passed as an argument, allowing us to create chained invocations if that stream is also readable (such as a Duplex or Transform stream).

Piping two streams together creates suction, which allows the data to flow automatically into the writable stream, so there is no need to call read() or write(); but most importantly, there is no need to control back-pressure any more, because it is taken care of automatically.

To give a quick example (there will be plenty more), we can create a new module called replace.js that takes a text stream from standard input, applies the replace transformation, and then pushes the data back to standard output:

const ReplaceStream = require('./replaceStream');
process.stdin
  .pipe(new ReplaceStream(process.argv[2], process.argv[3]))
  .pipe(process.stdout);

The preceding program pipes the data coming from standard input into a ReplaceStream and then back to standard output. Now, to try this small application, we can exploit a Unix pipe to redirect some data into its standard input, as follows:

echo Hello World! | node replace World Node.js

Running the above program will output the following results:

Hello Node.js

This simple example demonstrates that streams (and in particular text streams) are a universal interface, and that pipes are almost a universal way of composing and connecting all these interfaces.

error events, however, do not propagate automatically through the pipeline. Take, for example, this code fragment:

stream1
  .pipe(stream2)
  .on('error', function() {});

In the preceding chain, we will only catch the errors coming from stream2, which is the stream we attached the error listener to. This means that if we want to catch any error generated from stream1, we have to attach another error listener directly to it. Later we will see a pattern that mitigates this inconvenience (combining streams). Also, we should notice that if the destination stream emits an error, the error is notified to the source stream as well, and the pipeline is interrupted.
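Until we introduce that pattern, a minimal sketch of explicit error handling on every stream of a pipeline (reusing the gzip example from earlier in this chapter) looks like this:

const fs = require('fs');
const zlib = require('zlib');

const file = process.argv[2];

function handleError(err) {
  console.error(err);
  // clean up or destroy the streams here if necessary
}

fs.createReadStream(file)
  .on('error', handleError)
  .pipe(zlib.createGzip())
  .on('error', handleError)
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('error', handleError)
  .on('finish', () => console.log('File successfully compressed'));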

Through and from for working with Streams

So far, the way we have created custom streams does not exactly follow the Node way; in fact, inheriting from a stream base class violates the small surface area principle and requires some boilerplate code. This does not mean that streams are badly designed; in fact, we should not forget that, since streams are part of the Node.js core, they must be as flexible as possible, so that userland modules can extend them for a broad range of purposes.

However, most of the time we do not need all the power and extensibility that prototypal inheritance can give; usually, all we want is a quick and expressive way to define a new stream. The Node.js community has, of course, created a solution for this. A perfect example is through2, a small library that simplifies the creation of Transform streams. With through2, we can create a new Transform stream by invoking a simple function:

const transform = through2([options], [_transform], [_flush]);

Similarly, from2 allows us to easily create a readable stream:

const readable = from2([options], _read);
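To make these signatures concrete, here is a minimal sketch (assuming the through2 and from2 npm packages are installed) of a readable stream of numbers piped into a transform that doubles them:

const from = require('from2');
const through = require('through2');

let i = 0;
const numbers = from((size, next) => {
  i++;
  if (i > 5) return next(null, null); // a null chunk ends the stream
  next(null, i + '\n');
});

const double = through((chunk, enc, callback) => {
  callback(null, String(Number(chunk.toString().trim()) * 2) + '\n');
});

numbers.pipe(double).pipe(process.stdout);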

We will use these small libraries extensively in the rest of the chapter, and their benefits will become clear along the way.

The packages through and from are the original libraries, built on top of Streams1.

Asynchronous Control Flow Based on Streams

From the examples presented so far, it should be clear that streams can be useful not only for handling I/O, but also as an elegant programming pattern for processing any kind of data. The advantages do not end there: streams can also be used to implement asynchronous control flow, as we will see in this section.

Sequential execution

By default, streams handle data in sequence; for example, the _transform() function of a Transform stream is never invoked with the next chunk of data until the callback() of the previous invocation completes. This is an important property of streams, crucial for processing each chunk in the right order, but it can also be exploited to turn streams into an elegant alternative to the traditional control flow patterns.

Some code is always better than a lot of explanation, so let's work on an example to demonstrate how streams can be used to execute asynchronous tasks in sequence. Let's create a function that concatenates a set of files received as input, making sure to honor the order in which they are provided. We create a new module called concatFiles.js, starting from its dependencies:

const fromArray = require('from2-array');
const through = require('through2');
const fs = require('fs');

We will use through2 to simplify the creation of Transform streams, and from2-array to create a readable stream from an array of objects.
Next, we can define the concatFiles() function:

function concatFiles(destination, files, callback) {
  const destStream = fs.createWriteStream(destination);
  fromArray.obj(files)             //[1]
    .pipe(through.obj((file, enc, done) => {   //[2]
      const src = fs.createReadStream(file);
      src.pipe(destStream, {end: false});
      src.on('end', done); //[3]
    }))
    .on('finish', () => {         //[4]
      destStream.end();
      callback();
    });
}

module.exports = concatFiles;

The preceding function implements a sequential iteration over the files array by transforming it into a stream. The procedure followed by the function can be explained as follows:

  1. First, we use from2-array to create a readable stream from the files array.
  2. Next, we use through to create a Transform stream to process each file in the sequence. For each file, we create a readable stream and we pipe it into destStream, which represents the output file. We make sure not to close destStream after the source file finishes reading by specifying {end: false} in the second argument of pipe().
  3. When all the contents of the source file have been piped into destStream, we invoke the done function exposed by through.obj to communicate the completion of the current processing, which is what triggers the processing of the next file.
  4. When all the files have been processed, the finish event is fired; we can finally end destStream and invoke the callback() of concatFiles(), which signals the completion of the whole operation.

We can now try to use the small module we just created. Let's do that in a new module called concat.js:

const concatFiles = require('./concatFiles');

concatFiles(process.argv[2], process.argv.slice(3), () => {
  console.log('Files concatenated successfully');
});

We can now run the preceding program, passing the destination file as the first command-line argument, followed by the list of files to concatenate, for example:

node concat allTogether.txt file1.txt file2.txt

Executing this command creates a new file called allTogether.txt that contains, in order, the contents of file1.txt and file2.txt.

With the concatFiles() function, we were able to obtain the sequential execution of asynchronous operations using only streams. As we saw in Chapter 3, Asynchronous Control Flow Patterns with Callbacks, achieving this in plain JavaScript, or with an external library such as async, would have required the use or implementation of an iterator. We have now provided another option for obtaining the same result; as we can see, it is also very elegant and readable.

Pattern: Using Streams or a combination of Streams, you can easily traverse a set of asynchronous tasks in sequence.

Out-of-order parallel execution

We just saw that streams process each data chunk in sequence, but sometimes this can be a bottleneck, as it does not make the most of the concurrency of Node.js. If we have to execute a slow asynchronous operation for every chunk, it can be advantageous to parallelize the execution of this set of tasks. Of course, this pattern can only be applied if there is no relationship between the chunks, which happens frequently for object-mode streams but very rarely for binary streams.

Note: unordered parallel streams cannot be used when the order in which the data is processed matters.

To parallelize the execution of a Transform stream, we can apply the same patterns of unordered parallel execution that we saw in Chapter 3, Asynchronous Control Flow Patterns with Callbacks, with some adaptations to make them work with streams. Let's see how this works.

Implementing an unordered parallel stream

Let's demonstrate this immediately with an example: we create a module called parallelStream.js and define a generic Transform stream that executes a given transform function in parallel:

const stream = require('stream');

class ParallelStream extends stream.Transform {
  constructor(userTransform) {
    super({objectMode: true});
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
  }

  _transform(chunk, enc, done) {
    this.running++;
    this.userTransform(chunk, enc, this._onComplete.bind(this), this.push.bind(this));
    done();
  }

  _flush(done) {
    if(this.running > 0) {
      this.terminateCallback = done;
    } else {
      done();
    }
  }

  _onComplete(err) {
    this.running--;
    if(err) {
      return this.emit('error', err);
    }
    if(this.running === 0) {
      this.terminateCallback && this.terminateCallback();
    }
  }
}

module.exports = ParallelStream;

Let's analyze this new custom class. As you can see, the constructor accepts a userTransform() function as an argument, which is then saved as an instance variable; we also invoke the parent constructor and, for convenience, we enable object mode by default.

Next, it is the turn of the _transform() method. In this method, we execute the userTransform() function and then increment the count of running tasks; finally, we notify the Transform stream that the current transformation step is complete by invoking done(). The trick for triggering the processing of another item in parallel is exactly this: we are not waiting for the userTransform() function to complete before invoking done(); instead, we invoke it immediately. On the other hand, we provide a special callback to userTransform(), which is the this._onComplete() method; this allows us to get notified when userTransform() completes.

The _flush() method is invoked just before the stream terminates, so if there are tasks still running we can put the release of the finish event on hold by not invoking the done() callback immediately; instead, we assign it to the this.terminateCallback variable. To understand how the stream is then properly terminated, we have to look into the _onComplete() method.

The _onComplete() method is invoked every time an asynchronous task completes. It checks whether there are any more tasks running and, if there are none, it invokes the this.terminateCallback() function (if set), which causes the stream to end, releasing the finish event that was put on hold in the _flush() method.

Using the ParallelStream class we just built, we can easily create a Transform stream that executes its tasks in parallel, but there is a caveat: it does not preserve the order in which the items are received. In fact, asynchronous operations can complete and push data at any time, regardless of when they were started. We can immediately see that this property does not play well with binary streams, where the order of the data usually matters.

Implementing a URL status monitoring application

Now, let's apply our ParallelStream to a concrete example. Imagine that we want to build a simple service to monitor the status of a big list of URLs; imagine that all these URLs are contained in a single file, with one URL per line.

Streams offer a very efficient and elegant solution to this problem, especially if we use the ParallelStream class we just wrote to check the URLs in parallel and out of order.

Next, let's build this simple application in a module called checkUrls.js:

const fs = require('fs');
const split = require('split');
const request = require('request');
const ParallelStream = require('./parallelStream');

fs.createReadStream(process.argv[2])         //[1]
  .pipe(split())                             //[2]
  .pipe(new ParallelStream((url, enc, done, push) => {     //[3]
    if(!url) return done();
    request.head(url, (err, response) => {
      push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))   //[4]
  .on('finish', () => console.log('All urls were checked'))
;

As we can see, with streams our code looks very elegant and straightforward. Let's see how it works:

  1. First, we create a readable stream from the file given as a command-line argument, so that we can read it later.
  2. We pipe the contents of the input file through split, a Transform stream that ensures each line is emitted in a separate chunk.
  3. Then, it is the turn of our ParallelStream to check the URL: we send a HEAD request and wait for the response. When the request returns, we push the result of the check down the stream.
  4. Finally, all the results are piped into a file, results.txt.

We can now run the checkUrls module with a command such as the following:

node checkUrls urlList.txt

Here, the file urlList.txt contains a list of URLs, for example:

  • http://www.mariocasciaro.me/
  • http://loige.co/
  • http://thiswillbedownforsure.com/

When the application finishes running, we can see that a file, results.txt, was created, containing the results of the operation, for example:

  • http://thiswillbedownforsure.com is down
  • http://loige.co is up
  • http://www.mariocasciaro.me is up

The order in which the results are written is very likely to differ from the order in which the URLs were specified in the input file. This is clear evidence that our stream executes its tasks in parallel, without enforcing any order.

Out of curiosity, we might want to try replacing ParallelStream with a normal through2 stream and compare the behavior and performance of the two versions (an exercise you might want to do). Using through2 is slower, because each URL is checked in sequence, but on the other hand the order of the results in the file results.txt is preserved.

Out-of-order limited parallel execution

If we try to run the checkUrls application against a file that contains thousands or millions of URLs, we will surely run into trouble. Our application would create an uncontrolled number of connections all at once, sending a considerable amount of data in parallel, potentially undermining the stability of the application and the availability of the whole system. As we already know, the solution is to limit the load with an out-of-order, limited, parallel execution.

Let's see how this works by creating a limitedParallelStream.js module, which is an adaptation of the parallelStream.js module we created in the previous section.

Let’s look at its constructor:

class LimitedParallelStream extends stream.Transform {
  constructor(concurrency, userTransform) {
    super({objectMode: true});
    this.concurrency = concurrency;
    this.userTransform = userTransform;
    this.running = 0;
    this.terminateCallback = null;
    this.continueCallback = null;
  }
// ...
}

We need a concurrency variable as input to limit the concurrency, and this time we store two callbacks: continueCallback, for any pending _transform invocation, and terminateCallback, for the callback of the _flush() method.
Next, let's look at the _transform() method:

_transform(chunk, enc, done) {
  this.running++;
  this.userTransform(chunk, enc,  this.push.bind(this), this._onComplete.bind(this));
  if(this.running < this.concurrency) {
    done();
  } else {
    this.continueCallback = done;
  }
}

This time, in the _transform() method, we have to check whether we have reached the maximum number of running tasks before invoking done() and triggering the processing of the next item. If we have not reached the limit yet, we can trigger the processing of the next item; if, instead, we have reached it, we simply save the done() callback into the continueCallback variable, so that it can be invoked as soon as a task finishes.

The _flush() method remains exactly the same as in the ParallelStream class, so let's move directly to the implementation of the _onComplete() method:

_onComplete(err) {
  this.running--;
  if(err) {
    return this.emit('error', err);
  }
  const tmpCallback = this.continueCallback;
  this.continueCallback = null;
  tmpCallback && tmpCallback();
  if(this.running === 0) {
    this.terminateCallback && this.terminateCallback();
  }
}

Every time a task completes, we invoke any saved continueCallback(), which causes the stream to unblock, triggering the processing of the next item.

That's it for the limitedParallelStream module. We can now use it in the checkUrls module in place of parallelStream and have the concurrency of our tasks limited to the value we set.
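As a rough sketch (the concurrency value of 4 here is arbitrary), the checkUrls pipeline adapted to LimitedParallelStream could look like this; note that, as shown in its _transform() above, this class passes push before done to the user-supplied function:

const fs = require('fs');
const split = require('split');
const request = require('request');
const LimitedParallelStream = require('./limitedParallelStream');

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(new LimitedParallelStream(4, (url, enc, push, done) => {
    if(!url) return done();
    request.head(url, (err, response) => {
      push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', () => console.log('All urls were checked'));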

Ordered parallel execution

The parallel streams we created earlier may shuffle the order of the emitted data, but there are situations where this is not acceptable; sometimes, in fact, it is necessary to have each chunk emitted in the same order in which it was received. We can still run the transform function in parallel, however; all we have to do is sort the data emitted by each task so that it follows the same order in which the chunks were received.

This technique involves the use of a buffer to reorder the chunks while they are emitted by each running task. For brevity, we are not going to provide an implementation of such a stream, as it would be quite verbose for the scope of this book; what we are going to do instead is reuse one of the packages available on npm that were built for this specific purpose, for example through2-parallel.

We can quickly check the behavior of an ordered parallel execution by modifying our existing checkUrls module. Say that we want our results to be written in the same order as the URLs in the input file, while still executing our checks in parallel. We can do this using through2-parallel:

const fs = require('fs');
const split = require('split');
const request = require('request');
const throughParallel = require('through2-parallel');

fs.createReadStream(process.argv[2])
  .pipe(split())
  .pipe(throughParallel.obj({concurrency: 2}, function (url, enc, done) {
    if(!url) return done();
    request.head(url, (err, response) => {
      this.push(url + ' is ' + (err ? 'down' : 'up') + '\n');
      done();
    });
  }))
  .pipe(fs.createWriteStream('results.txt'))
  .on('finish', () => console.log('All urls were checked'))
;

As we can see, the interface of through2-parallel is very similar to that of through2; the only difference is that through2-parallel also lets us specify a concurrency limit for our transform function. If we try to run this new version of checkUrls, we will see that the results.txt file lists the results in the same order as the URLs appear in the input file.

With this, we conclude our analysis of the use of streams to implement asynchronous control flow; next, we are going to study piping patterns.

Piping patterns

Just like in real life, Node.js streams can also be piped together following different patterns. In fact, we can merge the flow of two different streams into one, split the flow of one stream into two or more pipes, or redirect the flow based on a condition. In this section, we are going to explore the most important piping techniques that can be applied to Node.js streams.

Combined Streams

In this chapter, we have stressed the fact that streams provide a simple infrastructure to modularize and reuse our code, but this leaves out one important piece: what if we want to modularize and reuse an entire pipeline? What if we want to combine multiple streams so that they look like a single stream from the outside? The following figure shows what this means:

From the figure, we can already get a hint of how a combined stream works:

  • When we write into the combined stream, we are actually writing into the first stream of the combination, namely StreamA.
  • When we read from the combined stream, we are actually reading from the last stream of the combination.

A combined stream is usually a Duplex stream, built by connecting the first stream of the combination to its Writable side and the last stream to its Readable side.

To create a Duplex stream out of two different streams, one Writable and one Readable, we can use an npm module such as duplexer2.
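
As a quick, hedged illustration of the idea (assuming the duplexer2 package is installed from npm and that its signature is duplexer2(writable, readable)), the following sketch hides two internally connected PassThrough streams behind a single Duplex stream:

const { PassThrough } = require('stream');
const duplexer2 = require('duplexer2'); // assumed available from npm

// the two ends of our combined stream
const writableSide = new PassThrough();
const readableSide = new PassThrough();

// connect the two ends internally; here we simply uppercase the data
writableSide.on('data', chunk =>
  readableSide.write(chunk.toString().toUpperCase()));
writableSide.on('end', () => readableSide.end());

// from the outside, the combination behaves like a single Duplex stream
const combined = duplexer2(writableSide, readableSide);
combined.on('data', chunk => console.log(chunk.toString())); // "HELLO"
combined.end('hello');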

However, this is not enough. In fact, another important characteristic of a combined stream is that it has to capture the errors emitted by any stream inside the pipeline. As we already said, errors are not automatically propagated down the pipeline; for proper error management we would normally have to attach an error listener to each stream explicitly. But a combined stream is really a black box, which means we have no access to the streams in the middle of the pipeline, so the combined stream also has to act as an aggregator for the errors coming from any unit of the pipeline.

To sum up, a combined stream has two major advantages:

  • Its internal pipeline is a black box, invisible to its users.
  • Error management is simplified, as we don't have to attach an error listener to each stream in the pipeline, but only to the combined stream itself.

Combining streams is a pretty common and generic practice, so if we don't have any particular need, we might just want to reuse an existing solution such as multipipe or combine-stream.

Implementing a combined stream

To illustrate with a simple example, let's consider the case of the following two combined streams:

  • Compress and encrypt data
  • Decompress and decrypt data

Using a library such as multipipe, we can easily build these combined streams by combining some of the streams that we already have available from the core libraries (in a file named combinedStreams.js):

const zlib = require('zlib');
const crypto = require('crypto');
const combine = require('multipipe');
module.exports.compressAndEncrypt = password => {
  return combine(
    zlib.createGzip(),
    crypto.createCipher('aes192', password)
  );
};
module.exports.decryptAndDecompress = password => {
  return combine(
    crypto.createDecipher('aes192', password),
    zlib.createGunzip()
  );
};

We can now use these combined streams as if they were black boxes; for example, to create a small application that archives a file by compressing and encrypting it. Let's do that in a new module named archive.js:

const fs = require('fs');
const compressAndEncryptStream = require('./combinedStreams').compressAndEncrypt;
fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"));

We can further improve the preceding code by building a combined stream out of the pipeline we created, this time not to obtain an opaque black box, but to take advantage of aggregated error handling. In fact, as we already mentioned, writing something like the following would only catch the errors emitted by the last stream of the pipeline:

fs.createReadStream(process.argv[3])
  .pipe(compressAndEncryptStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
  .on('error', function(err) {
    // this only catches errors from the last stream in the pipeline
    console.log(err);
  });

However, by combining all the streams together, we can fix the problem elegantly. The refactored archive.js looks like this:

const combine = require('multipipe');
const fs = require('fs');
const compressAndEncryptStream =
  require('./combinedStreams').compressAndEncrypt;

combine(
  fs.createReadStream(process.argv[3])
    .pipe(compressAndEncryptStream(process.argv[2]))
    .pipe(fs.createWriteStream(process.argv[3] + ".gz.enc"))
).on('error', err => {
  // with a combined stream we catch errors coming from any point of the pipeline
  console.log(err);
});

As we can see, we can now attach an error listener directly to the combined stream, and it will receive any error event emitted by any of its internal streams.
Now, to run the archive module, we simply have to specify a password and a file in the command-line arguments:

node archive mypassword /path/to/a/file.text

With this example, we have clearly demonstrated how important combined streams are; on one side, they allow us to create reusable compositions of streams and, on the other, they simplify the error management of a pipeline.
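
For completeness, here is a minimal sketch of a hypothetical unarchive.js counterpart that uses the decryptAndDecompress stream we exported earlier; the module name and argument order mirror archive.js and are assumptions for illustration:

const fs = require('fs');
const decryptAndDecompressStream =
  require('./combinedStreams').decryptAndDecompress;

// hypothetical usage: node unarchive mypassword /path/to/a/file.text.gz.enc
fs.createReadStream(process.argv[3])
  .pipe(decryptAndDecompressStream(process.argv[2]))
  .pipe(fs.createWriteStream(process.argv[3].replace(/\.gz\.enc$/, '')));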

Forking streams

We can perform a fork of a stream by piping a single Readable stream into multiple Writable streams. This is useful when we want to send the same data to different destinations, for example, two different sockets or two different files. It can also be used when we want to perform different transformations on the same data, or when we want to split the data based on some criteria. As shown in the figure:

Forking a stream in Node.js is a trivial matter; let's see why with an example.

Implementation of a Multiple Checksum Generator

Let's create a small utility that outputs both the sha1 and md5 hashes of a given file. Let's call this new module generateHashes.js and start with the following code:

const fs = require('fs');
const crypto = require('crypto');
const sha1Stream = crypto.createHash('sha1');
sha1Stream.setEncoding('base64');
const md5Stream = crypto.createHash('md5');
md5Stream.setEncoding('base64');

Nothing special so far. The next part of the module is where we create a Readable stream from a file and fork it into two different streams, in order to obtain two other files, one containing the sha1 hash and the other containing the md5 checksum:

const inputFile = process.argv[2];
const inputStream = fs.createReadStream(inputFile);
inputStream
  .pipe(sha1Stream)
  .pipe(fs.createWriteStream(inputFile + '.sha1'));
inputStream
  .pipe(md5Stream)
  .pipe(fs.createWriteStream(inputFile + '.md5'));

This is very simple: the inputStream variable is piped into sha1Stream on one side and into md5Stream on the other. But note a few things that happen behind the scenes:

  • Both md5Stream and sha1Stream will be ended automatically when inputStream ends, unless we specify {end: false} as an option when invoking pipe().
  • The two forks of the stream will receive the same data chunks, so we must be very careful when performing side-effect operations on the data, as that would affect the other branch as well.
  • Back-pressure works out of the box; the flow coming from inputStream will go as fast as the slowest branch of the fork.
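
Assuming the module is saved as generateHashes.js, we should be able to run it as follows (the path is illustrative), which would produce a .sha1 and a .md5 file next to the input file:

node generateHashes /path/to/a/file.txt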

Merging streams

Merging is the opposite of forking and consists of combining a set of Readable streams into a single Writable stream, as shown in the figure:

Merging multiple streams into one is in general a simple operation; however, we have to pay attention to the way we handle the end event, because piping with the automatic end option would cause the destination stream to end as soon as one of the sources ends. This often leads to errors, as the other sources that have not yet ended would continue to write to an already terminated stream. The solution to this problem is to use the option {end: false} when piping multiple sources into a single destination, and to invoke end() on the destination only when all the sources have completed reading.

Creating a tarball from multiple directories

To make a simple example, let's implement a small program that creates a tarball from the contents of two different directories. For this purpose, we are going to introduce two new npm modules:

  • tar: used to create tar archives
  • fstream: a library to create object streams from filesystem files

Let's create a new module named mergeTar.js, starting with the following initialization:

var tar = require('tar');
var fstream = require('fstream');
var path = require('path');
var destination = path.resolve(process.argv[2]);
var sourceA = path.resolve(process.argv[3]);
var sourceB = path.resolve(process.argv[4]);

In the preceding code, we just loaded all the dependencies and initialized the variables that contain the name of the destination file and the two source directories (sourceA and sourceB).

Next, we create the tar stream and pipe it into its destination, a Writable stream:

const pack = tar.Pack();
pack.pipe(fstream.Writer(destination));

Now, it's time to initialize the source streams:

let endCount = 0;

function onEnd() {
  if (++endCount === 2) {
    pack.end();
  }
}

const sourceStreamA = fstream.Reader({
    type: "Directory",
    path: sourceA
  })
  .on('end', onEnd);

const sourceStreamB = fstream.Reader({
    type: "Directory",
    path: sourceB
  })
  .on('end', onEnd);

In the preceding code, we created the streams that read from the two source directories (sourceStreamA and sourceStreamB); then, for each source stream, we attached an end listener, which triggers the end event on the pack stream only when both directories have been read completely.

Finally, it is time to perform the real merge:

sourceStreamA.pipe(pack, {end: false});
sourceStreamB.pipe(pack, {end: false});

We pipe both sources into the pack stream, taking care to disable the automatic triggering of the end event on the destination by setting the option {end: false} in both pipe() invocations.

With this, we have completed our simple TAR utility. We can try it out by providing the destination file as the first command-line argument, followed by the two source directories:

node mergeTar dest.tar /path/to/sourceA /path/to/sourceB

On npm, we can find a few packages that simplify the merging of streams.

Note that the data flowing into the destination stream is randomly intermixed; this is a property that can be acceptable in some types of object streams (as we saw in the previous example), but it is often undesirable when dealing with binary streams.

However, there is one pattern that allows us to merge streams in order; it consists of consuming the source streams one after the other: when the current one ends, the next one starts emitting chunks (it is like concatenating the output of all the sources). On npm, we can find some packages that deal with this situation as well; one of them is multistream.
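
As a hedged sketch of how that could look with the multistream package (assuming it is installed from npm and that its constructor accepts an array of Readable streams), the following concatenates two files in order into a single destination; the file names are illustrative:

const fs = require('fs');
const MultiStream = require('multistream'); // assumed available from npm

const sources = [
  fs.createReadStream('partA.txt'),
  fs.createReadStream('partB.txt')
];

// the sources are consumed one after the other, preserving their order
new MultiStream(sources).pipe(fs.createWriteStream('combined.txt'));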

Multiplexing and demultiplexing

There is a particular variation of the merge pattern in which we don't really want to just join multiple streams together but, instead, use a shared channel to deliver the data of a set of streams. This is conceptually different, because the source streams remain logically separated inside the shared channel, which allows us to split them again once the data reaches the other end of the shared channel. As shown in the figure:

The operation of combining multiple streams so that they can be transmitted over a single stream is called multiplexing, while the opposite operation, namely reconstructing the original streams from the data received over the shared stream, is called demultiplexing. The devices that perform these operations are called a multiplexer and a demultiplexer, respectively. This is a widely studied topic in computer science and telecommunications, as it is one of the foundations of almost any type of communication medium, such as telephony, radio, TV, and of course the Internet itself. For the scope of this book, we will not go much further, as this is a very large topic.

What we want to demonstrate in this section is how to use a shared Node.js stream to transmit multiple logically separated streams, which are then separated again at the other end of the shared stream; in other words, how to implement multiplexing and demultiplexing.

Building a remote logger

Let's use an example to drive our discussion. We want a small program that starts a child process and redirects both its standard output and standard error to a remote server, which in turn saves the two streams in two separate files. So, in this case, the shared medium is a TCP connection, while the two channels to be multiplexed are the stdout and stderr of the child process. We will make use of packet switching, the same technique used by protocols such as IP, TCP, and UDP; it consists of wrapping the data into packets, which allows us to specify various pieces of meta information that are useful for multiplexing, routing, controlling the flow, and checking for corrupted data.

The protocol for our example is very simple: the data is wrapped into packets with the following structure: 1 byte for the channel ID, 4 bytes for the packet length, and then the actual data.

Client side – multiplexing

Let's start from the client side of our application by creating a file called client.js; this is the part of our application that is responsible for starting a child process and multiplexing its streams.

Let's start defining the module; first, we load the dependencies:

const child_process = require('child_process');
const net = require('net');

Then, let's implement the function that performs the multiplexing:

function multiplexChannels(sources, destination) {
  let totalChannels = sources.length;

  for(let i = 0; i < sources.length; i++) {
    sources[i]
      .on('readable', function() { // [1]
        let chunk;
        while ((chunk = this.read()) !== null) {
          const outBuff = Buffer.alloc(1 + 4 + chunk.length); // [2] (Buffer.alloc replaces the deprecated new Buffer)
          outBuff.writeUInt8(i, 0);
          outBuff.writeUInt32BE(chunk.length, 1);
          chunk.copy(outBuff, 5);
          console.log('Sending packet to channel: ' + i);
          destination.write(outBuff); // [3]
        }
      })
      .on('end', () => { //[4]
        if (--totalChannels === 0) {
          destination.end();
        }
      });
  }
}

The multiplexChannels() function takes in the source streams to be multiplexed and the destination channel, and then performs the following steps:

  1. For each source stream, it registers a listener for the readable event, where we read the data from the stream using the non-flowing mode.
  2. When a chunk is read, we wrap it into a packet that contains, in order: 1 byte (UInt8) for the channel ID, 4 bytes (UInt32BE) for the packet size, and then the actual data.
  3. When the packet is ready, we write it into the destination stream.
  4. We register a listener for the end event, so that when all the source streams have ended, we invoke the end() method of the destination stream.

Note that our protocol is able to multiplex up to 256 different source streams, because we only have 1 byte to identify the channel.

The last part of the client is where we connect to the server, start the child process, and multiplex its streams:

const socket = net.connect(3000, () => { // [1]
  const child = child_process.fork( // [2]
    process.argv[2],
    process.argv.slice(3), {
      silent: true
    }
  );
  multiplexChannels([child.stdout, child.stderr], socket); // [3]
});

In this last code fragment, we perform the following operations:

  1. We create a new TCP client connection to the address localhost:3000.
  2. We start the child process by using the first command-line argument as the path, and we provide the rest of the process.argv array as arguments for the child process. We specify the option {silent: true}, so that the child process does not inherit the stdout and stderr of the parent.
  3. Finally, we take the stdout and stderr of the child process and multiplex them into the socket using the multiplexChannels() function.

Server side – demultiplexing

Now we can take care of the server side of the application; let's create it in a module called server.js, where we demultiplex the streams coming from the remote connection and pipe them into two different files.

Let's start by creating a function called demultiplexChannel():

function demultiplexChannel(source, destinations) {
  let currentChannel = null;
  let currentLength = null;
  source
    .on('readable', () => { //[1]
      let chunk;
      if(currentChannel === null) {          //[2]
        chunk = source.read(1);
        currentChannel = chunk && chunk.readUInt8(0);
      }
    
      if(currentLength === null) {          //[3]
        chunk = source.read(4);
        currentLength = chunk && chunk.readUInt32BE(0);
        if(currentLength === null) {
          return;
        }
      }
    
      chunk = source.read(currentLength);        //[4]
      if(chunk === null) {
        return;
      }
    
      console.log('Received packet from: ' + currentChannel);
    
      destinations[currentChannel].write(chunk);      //[5]
      currentChannel = null;
      currentLength = null;
    })
    .on('end', () => {            //[6]
      destinations.forEach(destination => destination.end());
      console.log('Source channel closed');
    })
  ;
}

The preceding code may look complicated, but a careful read shows it is not; thanks to the pull mode of Node.js Readable streams, we can easily implement the demultiplexing of our little protocol as follows:

  1. We start reading from the stream using the non-flowing mode.
  2. First, if we have not read the channel ID yet, we try to read 1 byte from the stream and then convert it into a number.
  3. The next step is to read the length of the data. We need 4 bytes for that, so it is possible (even if unlikely) that there is not enough data in the internal buffer, which would cause the source.read() invocation to return null. In such a case, we simply interrupt the parsing and retry at the next readable event.
  4. When we can finally also read the data size, we know how much data to pull from the internal buffer, so we try to read it all.
  5. When we have read all the data, we can write it into the correct destination channel, making sure to reset the currentChannel and currentLength variables (these will be used to parse the next packet).
  6. Finally, when the source channel ends, we must not forget to invoke the end() method on all the destination streams.

Now that we can demultiplex the source stream, let's put our new function to work:

const net = require('net');
const fs = require('fs');

net.createServer(socket => {
  const stdoutStream = fs.createWriteStream('stdout.log');
  const stderrStream = fs.createWriteStream('stderr.log');
  demultiplexChannel(socket, [stdoutStream, stderrStream]);
})
  .listen(3000, () => console.log('Server started'))
;

In the preceding code, we first start a TCP server on port 3000; then, for each connection we receive, we create two Writable streams pointing to two different files, one for the standard output and the other for the standard error; these are our destination channels. Finally, we use demultiplexChannel() to demultiplex the socket stream into stdoutStream and stderrStream.

Running the mux/demux application

Now we are ready to try our new multiplexing/demultiplexing application, but first, let's create a small Node.js program to produce some sample output; let's call it generateData.js:

console.log("out1");
console.log("out2");
console.error("err1");
console.log("out3");
console.error("err2");

First, let’s start running the server:

node server

And then the client, providing the file that we want to start as a child process:

node client generateData.js

The client runs almost immediately, but at the end of the process, the standard output and standard error of the generateData application have traveled through a single TCP connection and, on the server side, have been demultiplexed into two separate files.

Note that, as we are using child_process.fork(), our client is only able to launch other Node.js modules.

Multiplexing and demultiplexing object streams

The example we just showed demonstrates how to multiplex and demultiplex a binary/text stream, but it is worth mentioning that the same rules apply to object streams as well. The biggest difference is that, when using objects, we already have a way to transmit the data using atomic messages (the objects themselves), so multiplexing is as simple as setting a channelID property on each object, while demultiplexing only requires reading the channelID property and routing each object towards the correct destination stream.
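
To make the idea concrete, here is a minimal sketch built with core object-mode streams; the channelID property and the helper names are assumptions chosen for illustration, not an established API:

const { Transform, Writable } = require('stream');

// mux: tag every object with the channel it belongs to
function createMuxStream(channelID) {
  return new Transform({
    objectMode: true,
    transform(obj, enc, done) {
      done(null, { channelID, payload: obj });
    }
  });
}

// demux: route each tagged object to the destination with the same index
function createDemuxStream(destinations) {
  return new Writable({
    objectMode: true,
    write(packet, enc, done) {
      destinations[packet.channelID].write(packet.payload, done);
    }
  });
}

Several sources, each piped through its own createMuxStream(), could then write into a shared object stream, while a single createDemuxStream() at the other end would untangle the traffic again.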

Another pattern involving the demultiplexing of object streams consists of routing the data coming from a single source towards multiple destinations depending on some condition. With this pattern, we can implement complex flows, such as the one shown in the following figure:

As shown in the figure, the demultiplexer takes a stream of objects representing animals and distributes each of them into the correct destination stream based on the class of the animal: reptiles, amphibians, or mammals.
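
A minimal sketch of such a conditional demultiplexer might look like the following, assuming each animal object carries a class property with one of the three values above (the property name, sample data, and destination streams are all illustrative; Readable.from, available in recent Node.js versions, is used only to feed some sample objects):

const { Readable, Writable } = require('stream');

// one illustrative destination stream per animal class
const makeDest = label => new Writable({
  objectMode: true,
  write(animal, enc, done) {
    console.log(label + ':', animal.name);
    done();
  }
});

const destinations = {
  reptiles: makeDest('reptile'),
  amphibians: makeDest('amphibian'),
  mammals: makeDest('mammal')
};

// demux: route each animal to the stream matching its class
const animalDemux = new Writable({
  objectMode: true,
  write(animal, enc, done) {
    destinations[animal.class].write(animal, done);
  }
});

Readable.from([
  { name: 'gecko', class: 'reptiles' },
  { name: 'frog', class: 'amphibians' },
  { name: 'cat', class: 'mammals' }
]).pipe(animalDemux);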

Summary

In this chapter, we have shed some light on Node.js streams and their use cases, but at the same time this should have opened a door towards a programming paradigm with virtually unlimited possibilities. We have understood why streams are so acclaimed by the Node.js community, and we have mastered their basic functionality, which enables us to do many more interesting things with them. We analyzed some advanced patterns and began to understand how to connect streams in different configurations, mastering the characteristics that make them so versatile and powerful.

If we run into a feature that we cannot implement with a single stream, we can usually obtain it by connecting other streams together; this is a very nice characteristic of Node.js. Streams are useful for dealing with binary data, strings, and objects alike, and each case has its own distinctive traits.

In the next chapter, we will focus on the traditional object-oriented design patterns. Although JavaScript is, to some extent, an object-oriented language, in Node.js functional or hybrid approaches are often preferred. Read the next chapter to find out the answer.