“Node.js Design Patterns”: Advanced Asynchronous Recipes

  Front end, javascript, node.js

This series of articles is a translation of, with reading notes on, Node.js Design Patterns, Second Edition, serialized and kept up to date on GitHub.


Advanced Asynchronous Recipes

Almost all the design patterns we have seen so far can be considered generic and applicable to many different areas of an application. There is, however, a set of more specific patterns focused on solving well-defined problems; we can call these recipes. Just as in real-life cooking, we have a set of well-defined steps that, when followed, lead to the expected result. Of course, this doesn't mean we cannot use some creativity to customize a recipe to the taste of our guests, but the outline of the procedure is what matters. In this chapter, we present some popular recipes for solving specific problems we encounter in our everyday Node.js development. These recipes include the following:

  • Requiring asynchronously initialized modules
  • Batching and caching asynchronous operations to improve performance in busy, highly concurrent applications
  • Running synchronous CPU-bound operations, which can block the event loop and cripple Node.js's ability to handle concurrent requests

Requiring asynchronously initialized modules

In Chapter 2, Node.js Essential Patterns, when we discussed the fundamental properties of the Node.js module system, we mentioned that require() is synchronous and that module.exports cannot be set asynchronously.

This is one of the main reasons why synchronous APIs exist in the core modules and in many npm packages: they are offered as a convenient alternative, meant mainly for initialization tasks rather than as a replacement for asynchronous APIs.

Unfortunately, this is not always possible. A synchronous API might not always be available, especially for components that use the network during their initialization phase, for example, to perform a handshake protocol or to retrieve configuration parameters. This is the case for many database drivers and for clients of middleware systems such as message queues.

Widely applicable solutions

Let's take an example: a module called db, which connects to a remote database. The db module can accept requests only after the connection and the handshake with the server have been completed. In this scenario, we usually have two options:

  • Make sure the module is initialized before using it, and wait for its initialization otherwise. This check has to be repeated every time we want to invoke an operation on the asynchronous module:
const db = require('aDb'); //The async module
module.exports = function findAll(type, callback) {
  if (db.connected) { //is it initialized?
    runFind();
  } else {
    db.once('connected', runFind);
  }

  function runFind() {
    db.findAll(type, callback);
  };
};
  • Use Dependency Injection (DI) instead of requiring the asynchronous module directly. This way, we can delay the initialization of some modules until their asynchronous dependencies are fully initialized. This technique shifts the complexity of managing module initialization to another component, usually the parent module. In the following example, this component is app.js:
// Module app.js
const db = require('aDb'); // aDb is an asynchronous module
const findAllFactory = require('./findAll');
db.on('connected', function() {
  const findAll = findAllFactory(db);
  // From here on, asynchronous operations can be executed
});


// Module findAll.js
module.exports = db => {
  // db is guaranteed to be initialized at this point
  return function findAll(type, callback) {
    db.findAll(type, callback);
  };
};
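To see the first option in isolation, here is a minimal, self-contained sketch. The fake `db` object (built on `EventEmitter`, with a `connected` flag, a `'connected'` event and a `findAll()` method) is a hypothetical stand-in for the `aDb` module above, not a real driver API.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for the 'aDb' module: "connects" after a short delay
function createFakeDb(connectDelayMs) {
  const db = new EventEmitter();
  db.connected = false;
  setTimeout(() => {
    db.connected = true;
    db.emit('connected');
  }, connectDelayMs);
  // Pretend query that only works once the connection is established
  db.findAll = (type, callback) => {
    if (!db.connected) {
      return process.nextTick(() => callback(new Error('Not connected')));
    }
    process.nextTick(() => callback(null, [`${type}-1`, `${type}-2`]));
  };
  return db;
}

// Option 1: every operation checks the state and waits if necessary
function makeFindAll(db) {
  return function findAll(type, callback) {
    const runFind = () => db.findAll(type, callback);
    if (db.connected) {
      runFind();
    } else {
      db.once('connected', runFind);
    }
  };
}

const db = createFakeDb(50);
const findAll = makeFindAll(db);
// Invoked before the connection exists, yet it runs only after 'connected'
findAll('book', (err, rows) => console.log(err, rows));
```

The check-and-wait logic has to live in every function that touches `db`, which is why this option scales poorly as the number of asynchronous dependencies grows.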

We can see that the first option becomes highly undesirable if the number of asynchronous dependencies involved is large.

DI is not always ideal either, as we saw in Chapter 7, Wiring Modules. In big projects, it can quickly become overly complicated, especially if done manually and with asynchronously initialized modules. These problems are mitigated if we use a DI container designed to support asynchronously initialized modules.

However, as we will see, there is a third option that allows us to easily isolate a module from the initialization state of its dependencies.

Preinitialization queues

A simple pattern to decouple a module from the initialization state of a dependency combines a queue with the Command pattern. The idea is to save all the operations a module receives while it is not yet initialized, and then execute them as soon as all the initialization steps have been completed.
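The core idea fits in a few lines. In this sketch, the class name `PreInitQueue` and its `run()`/`markReady()` methods are hypothetical, chosen only for illustration, and each command is represented as a plain closure.

```javascript
// Generic preinitialization queue: commands are stored while the
// dependency is initializing and replayed, in order, once it is ready.
class PreInitQueue {
  constructor() {
    this.ready = false;
    this.pending = [];   // queued commands (closures)
  }

  // Run the command now if ready, otherwise store it for later
  run(command) {
    if (this.ready) {
      command();
    } else {
      this.pending.push(command);
    }
  }

  // Mark the dependency as initialized and flush the queue
  markReady() {
    this.ready = true;
    const commands = this.pending;
    this.pending = [];
    commands.forEach(command => command());
  }
}

const queue = new PreInitQueue();
const log = [];
queue.run(() => log.push('first'));
queue.run(() => log.push('second'));
console.log(log.length);   // 0: nothing has run yet
queue.markReady();
queue.run(() => log.push('third'));
console.log(log);          // [ 'first', 'second', 'third' ]
```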

Implementing an asynchronously initialized module

To demonstrate this simple yet effective technique, let's build a small application. First, create an asynchronously initialized module in a file called asyncModule.js:

const asyncModule = module.exports;

asyncModule.initialized = false;
asyncModule.initialize = callback => {
  setTimeout(() => {
    asyncModule.initialized = true;
    callback();
  }, 10000);
};

asyncModule.tellMeSomething = callback => {
  process.nextTick(() => {
    if(!asyncModule.initialized) {
      return callback(
        new Error('I don\'t have anything to say right now')
      );
    }
    callback(null, 'Current time is: ' + new Date());
  });
};

In the preceding code, asyncModule demonstrates the typical shape of an asynchronously initialized module. It has an initialize() method that, after a delay of 10 seconds, sets the initialized flag to true and notifies its callback (10 seconds is a long time for a real application, but here it helps to highlight any race condition).

The other method, tellMeSomething(), returns the current time; if the module is not yet initialized, it passes an error to its callback instead.
The next step is to create another module that depends on the service we just created. Let's implement a simple HTTP request handler in a file called routes.js:

const asyncModule = require('./asyncModule');

module.exports.say = (req, res) => {
  asyncModule.tellMeSomething((err, something) => {
    if(err) {
      res.writeHead(500);
      return res.end('Error:' + err.message);
    }
    res.writeHead(200);
    res.end('I say: ' + something);
  });
};

The handler invokes the tellMeSomething() method of asyncModule and then writes its result into the HTTP response. As we can see, we are not performing any check on the initialization state of asyncModule, which, as we can imagine, can lead to problems.

Now, let's create the app.js module, which uses the core http module to create a very basic HTTP server:

const http = require('http');
const routes = require('./routes');
const asyncModule = require('./asyncModule');

asyncModule.initialize(() => {
  console.log('Async module initialized');
});

http.createServer((req, res) => {
  if (req.method === 'GET' && req.url === '/say') {
    return routes.say(req, res);
  }
  res.writeHead(404);
  res.end('Not found');
}).listen(8000, () => console.log('Started'));

The preceding module is the entry point of our application; all it does is trigger the initialization of asyncModule and create an HTTP server that uses the request handler we created earlier (routes.say()) to respond to network requests.

We can now try to start our server by executing the app.js module as usual.

Once the server is started, we can point a browser at http://localhost:8000/say and see what comes back from asyncModule.
As expected, if we send the request right after the server starts, the result is an error, as follows:

Error:I don't have anything to say right now

This means that asyncModule is not yet initialized, but we still try to use it, so an error is returned. Once the asynchronous module has finished initializing, the same request succeeds.

Depending on the implementation details of the asynchronously initialized module, we might be lucky and simply receive an error, or we might even lose important information or crash the entire application. In general, the situation we just described must always be avoided.
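The race can be reproduced without an HTTP server. The sketch below inlines a stand-in for asyncModule.js and shortens the delay to 100 ms, an assumption made only so the example finishes quickly.

```javascript
// Stand-in for asyncModule.js with a 100 ms delay instead of 10 seconds
const asyncModule = {
  initialized: false,
  initialize(callback) {
    setTimeout(() => {
      asyncModule.initialized = true;
      callback();
    }, 100);
  },
  tellMeSomething(callback) {
    process.nextTick(() => {
      if (!asyncModule.initialized) {
        return callback(new Error('I don\'t have anything to say right now'));
      }
      callback(null, 'Current time is: ' + new Date());
    });
  }
};

const results = [];
asyncModule.initialize(() => {
  // After initialization, the same call succeeds
  asyncModule.tellMeSomething(err => {
    results.push(err ? 'error' : 'ok');
    console.log(results); // [ 'error', 'ok' ]
  });
});

// Called immediately, before initialization has completed: this fails
asyncModule.tellMeSomething(err => {
  results.push(err ? 'error' : 'ok');
});
```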

Most of the time, a few failing requests might not be a concern, or initialization might be so fast that in practice the problem never shows up. However, for high-load applications and for cloud servers designed to autoscale, the situation is completely different.

Wrapping the module with a preinitialization queue

To make our server robust, we will now refactor it by applying the pattern described at the beginning of this section. We will queue every operation invoked on asyncModule while it is not yet initialized, and flush the queue as soon as the module is ready to process them. This is a great application of the State pattern! We need two states: one that queues all the operations while the module is not yet initialized, and another that simply delegates each method to the original asyncModule once initialization is complete.

Often, we cannot modify the code of the asynchronous module; therefore, to add our queuing layer, we need to create a proxy around the original asyncModule.

Next, create a new file called asyncModuleWrapper.js and build it piece by piece. The first thing we need is an object that delegates every operation to the currently active state:

const asyncModule = require('./asyncModule');

const asyncModuleWrapper = module.exports;
asyncModuleWrapper.initialized = false;
// Rest parameters are required here: arrow functions have no `arguments`
// object of their own (in a CommonJS module, `arguments` would refer to
// the module wrapper's arguments instead)
asyncModuleWrapper.initialize = (...args) => {
  activeState.initialize(...args);
};
asyncModuleWrapper.tellMeSomething = (...args) => {
  activeState.tellMeSomething(...args);
};

In the preceding code, asyncModuleWrapper simply delegates each of its methods to activeState. Let's see what the two states look like.

We begin with notInitializedState, the state in which the module has not been initialized yet:

// State used while the module has not been initialized yet
let pending = [];
const notInitializedState = {

  initialize: function(callback) {
    asyncModule.initialize(function() {
      asyncModuleWrapper.initialized = true;
      activeState = initializedState;

      // Replay the operations queued during initialization
      pending.forEach(function(req) {
        asyncModule[req.method].apply(null, req.args);
      });
      pending = [];

      callback();
    });
  },

  tellMeSomething: function(callback) {
    // Store the invocation as a Command object instead of executing it
    return pending.push({
      method: 'tellMeSomething',
      args: arguments
    });
  }

};

When the initialize() method is invoked, we trigger the initialization of the original asyncModule, providing a callback function as a parameter. This lets our wrapper know when the original module has been initialized; at that point, it:

  1. Sets its initialized flag and assigns initializedState to activeState, marking the end of the preinitialization phase.
  2. Executes all the commands previously stored in the pending queue, then empties it.
  3. Invokes the original callback.

Because the module is not yet initialized in this state, the tellMeSomething() method simply creates a new Command object and adds it to the queue of pending operations.

At this point, the pattern should be clear: while the original asyncModule is not yet initialized, our proxy adds every request it receives to the preinitialization queue. Then, when we are notified that initialization is complete, we execute all the queued operations and switch the internal state to initializedState. Let's now look at the definition of this final state:

let initializedState = asyncModule;

Unsurprisingly, the initializedState object is nothing more than a reference to the original asyncModule! In fact, once initialization is complete, we can safely route any request directly to the original module.

Finally, we set the initial state, which is of course notInitializedState:

let activeState = notInitializedState;
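Putting the pieces together, the sketch below condenses asyncModuleWrapper.js into one self-contained file. It assumes an inline stand-in for asyncModule with a 100 ms delay (instead of a separate module with a 10-second delay) and uses rest parameters instead of `arguments`; in the real project, the wrapper would `require('./asyncModule')` and be exported.

```javascript
// Stand-in for the original asyncModule (100 ms instead of 10 s)
const asyncModule = {
  initialized: false,
  initialize: callback => {
    setTimeout(() => {
      asyncModule.initialized = true;
      callback();
    }, 100);
  },
  tellMeSomething: callback => {
    process.nextTick(() => {
      if (!asyncModule.initialized) {
        return callback(new Error('I don\'t have anything to say right now'));
      }
      callback(null, 'Current time is: ' + new Date());
    });
  }
};

// State used while the module is not yet initialized: queue every call
let pending = [];
const notInitializedState = {
  initialize(callback) {
    asyncModule.initialize(() => {
      asyncModuleWrapper.initialized = true;
      activeState = initializedState;            // switch state first
      pending.forEach(req => asyncModule[req.method](...req.args));
      pending = [];
      callback();
    });
  },
  tellMeSomething(...args) {
    pending.push({ method: 'tellMeSomething', args });
  }
};

// Once initialized, calls go straight to the original module
const initializedState = asyncModule;
let activeState = notInitializedState;

// The proxy: delegates every method to the currently active state
const asyncModuleWrapper = {
  initialized: false,
  initialize: (...args) => activeState.initialize(...args),
  tellMeSomething: (...args) => activeState.tellMeSomething(...args)
};

const replies = [];
// Issued before initialization: queued instead of failing
asyncModuleWrapper.tellMeSomething(err => replies.push(err ? 'error' : 'ok'));
asyncModuleWrapper.initialize(() => console.log('Async module initialized'));
```

The early call no longer fails: it waits in `pending` and is replayed against the original module right after the state switch.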

We can now try to start our test server again, but first, let's not forget to replace the references to the original asyncModule with our new asyncModuleWrapper object; this has to be done in both the app.js and routes.js modules.

After doing so, if we send a request to the server again, we will see that while asyncModule is not yet initialized, the requests no longer fail; instead, they hang until initialization is complete and only then are actually executed. This is clearly a much more robust behavior.

In practice, a request sent right after startup simply waits: the server does not return a response until the asynchronous module has finished loading.

Pattern: If a module needs to be initialized asynchronously, queue every operation invoked before the module is fully initialized, and flush the queue once initialization completes.
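The same pattern can also be applied generically with an ES2015 `Proxy`, so that no method has to be wrapped by hand. This is an alternative technique, not the State-based implementation shown above; the helper name `withPreInitQueue` and its `initialize(done)` parameter are hypothetical, and initialization is started as soon as the wrapper is created.

```javascript
// Wrap any object whose initialization completes asynchronously.
// `initialize` is a hypothetical function with the shape (done) => void.
function withPreInitQueue(target, initialize) {
  let ready = false;
  const pending = [];

  initialize(() => {
    ready = true;
    // Replay queued calls in the order they were received
    pending.splice(0).forEach(({ name, args }) => target[name](...args));
  });

  return new Proxy(target, {
    get(obj, name) {
      const value = obj[name];
      if (typeof value !== 'function') {
        return value;
      }
      // Forward directly once ready, otherwise queue the invocation
      return (...args) => {
        if (ready) {
          return value.apply(obj, args);
        }
        pending.push({ name, args });
      };
    }
  });
}

// Usage with a hypothetical service that needs 50 ms to become ready
const service = {
  ready: false,
  greet(name, cb) {
    process.nextTick(() => cb(this.ready ? null : new Error('not ready'), `hello ${name}`));
  }
};
const wrapped = withPreInitQueue(service, done => {
  setTimeout(() => { service.ready = true; done(); }, 50);
});

const greetings = [];
wrapped.greet('alice', (err, msg) => greetings.push(err ? 'error' : msg));
```

Note that a queued call returns `undefined`, so this generic version only suits callback-style methods, like the ones in this chapter.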

Now, our server can start accepting requests immediately after startup, with the guarantee that none of these requests will fail because of the initialization state of its modules. We obtained this result without using DI and without verbose, error-prone checks on the state of the asynchronous module.

In the wild

The pattern we just presented is used by many database drivers and ORM libraries. The most notable is Mongoose, an ORM for MongoDB. With Mongoose, it is not necessary to wait for the database connection to open before sending queries, because each operation is queued and executed later, once the connection with the database is fully established. This clearly improves the usability of its API.

Take a look at the Mongoose source code to see how each method of the native collection is proxied to add a preinitialization queue. This is the code fragment that implements the pattern: https://github.com/Automattic …

for (var i in Collection.prototype) {
  (function(i){
    NativeCollection.prototype[i] = function () {
      if (this.buffer) {
        // In Mongoose, while buffering is enabled, the operation is simply added to the queue and executed later
        this.addQueue(i, arguments);
        return;
      }

      var collection = this.collection
        , args = arguments
        , self = this
        , debug = self.conn.base.options.debug;

      if (debug) {
        if ('function' === typeof debug) {
          debug.apply(debug
            , [self.name, i].concat(utils.args(args, 0, args.length-1)));
        } else {
          console.error('\x1B[0;36mMongoose:\x1B[0m %s.%s(%s) %s %s %s'
            , self.name
            , i
            , print(args[0])
            , print(args[1])
            , print(args[2])
            , print(args[3]))
        }
      }

      return collection[i].apply(collection, args);
    };
  })(i);
}

Asynchronous batch processing and caching

Caching plays a critical role in high-load applications, and it is used almost everywhere on the web, from static resources such as web pages, images, and stylesheets to pure data such as the results of database queries. In this section, we will learn how caching applies to asynchronous operations and how a high request throughput can be turned to our advantage.

Implementing a server with no caching or batching

Before we start, let's implement a small server that we will use as a baseline to measure the benefits of the caching and batching techniques we are going to apply.

Let's consider a web server that manages the sales of an e-commerce company; in particular, we want to query it for the sum of all the transactions for a particular type of merchandise. For this purpose, we will use LevelUP again, for its simplicity and flexibility. The data model is a simple list of transactions stored in the sales sublevel (a section of the database), in the following format:

transactionId {amount, item}

The key is represented by transactionId, and the value is a JSON object containing an amount field, which indicates the amount of the sale, and an item field, which indicates the type of item.
The data to process is really basic, so let's implement the API straight away in a file named totalSales.js, as follows:

const level = require('level');
const sublevel = require('level-sublevel');

const db = sublevel(level('example-db', {valueEncoding: 'json'}));
const salesDb = db.sublevel('sales');

module.exports = function totalSales(item, callback) {
  console.log('totalSales() invoked');
  let sum = 0;
  salesDb.createValueStream()  // [1]
    .on('data', data => {
      if(!item || data.item === item) {  // [2]
        sum += data.amount;
      }
    })
    .on('end', () => {
      callback(null, sum);  // [3]
    });
};

The core of the module is the totalSales function, which is also its only exported API; this is how it works:

  1. We create a stream from the salesDb sublevel, which contains the sales transactions. The stream pulls all the entries from the database.
  2. We listen for the data event, which delivers each sale transaction as it is read from the database stream. We add the amount of the current entry to the running sum only if its item matches the one requested (or if no item is provided at all, which lets us compute the total of all transactions regardless of type).
  3. Finally, when the end event fires, we invoke callback() with the final sum as the result.
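To experiment with the same logic without LevelDB or the populate_db.js script, the following hypothetical stand-in streams records from an in-memory array instead of the sales sublevel. It assumes a Node.js version that provides `stream.Readable.from()` (12.3 or later); the sample data is made up.

```javascript
const { Readable } = require('stream');

// Hypothetical in-memory sample data replacing the LevelDB 'sales' sublevel
const sales = [
  { amount: 10, item: 'book' },
  { amount: 5, item: 'pen' },
  { amount: 20, item: 'book' }
];

// Same logic as totalSales.js, but streaming from an array
function totalSales(item, callback) {
  let sum = 0;
  Readable.from(sales)                  // object-mode stream of records
    .on('data', data => {
      if (!item || data.item === item) {
        sum += data.amount;
      }
    })
    .on('end', () => callback(null, sum))
    .on('error', callback);
}

totalSales('book', (err, sum) => console.log(err, sum)); // null 30
```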

The simple query we built is definitely not the best in terms of performance. Ideally, in a real-world application, we would use an index, or even an incremental map/reduce, to reduce the cost of computing the sum in real time. For our example, however, a slow query is actually better, because it highlights the advantages of the patterns we are going to analyze.

To finalize the total sales application, we only need to expose the totalSales API through an HTTP server; so, the next step is to build one (the app.js file):

const http = require('http');
const url = require('url');
const totalSales = require('./totalSales');

http.createServer((req, res) => {
  const query = url.parse(req.url, true).query;
  totalSales(query.item, (err, sum) => {
    res.writeHead(200);
    res.end(`Total sales for item ${query.item} is ${sum}`);
  });
}).listen(8000, () => console.log('Started'));

The server we created is very minimal; we only need it to expose the totalSales API.
Before we start the server for the first time, we need to populate the database with some sample data. We can do this with the populate_db.js script, which creates 100K random sales transactions in the database.
Okay! Now everything is ready. Let's start the server as usual by executing the following command:

node app

Then, query the HTTP endpoint at the following URL:

http://localhost:8000/?item=book

However, to get a better idea of the performance of our server, we need more than a single request. So, we use a script called loadTest.js, which sends requests at intervals of 200 ms. It is already configured to hit the URL of the server, so to run it, execute the following command:

node loadTest

We will see that the 20 requests take a while to complete. Take note of the total execution time of the test, because we are now going to apply our optimizations and measure how much time we can save.
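The source of loadTest.js is not reproduced here. The following is only a plausible sketch, assuming the script simply fires a fixed number of GET requests at a fixed interval and measures the total elapsed time; the function name `loadTest` and its parameters are hypothetical.

```javascript
const http = require('http');

// Hypothetical load tester: fires `count` GET requests at `url`, one every
// `intervalMs`, and reports the elapsed time once every response has ended.
function loadTest(url, count, intervalMs, done) {
  const start = Date.now();
  const statuses = [];
  let completed = 0;
  let failed = false;

  for (let i = 0; i < count; i++) {
    setTimeout(() => {
      http.get(url, res => {
        statuses.push(res.statusCode);
        res.resume();                       // consume the body
        res.on('end', () => {
          completed++;
          if (completed === count && !failed) {
            done(null, { elapsedMs: Date.now() - start, statuses });
          }
        });
      }).on('error', err => {
        if (!failed) {
          failed = true;                    // report only the first error
          done(err);
        }
      });
    }, i * intervalMs);
  }
}

// Usage, assuming the server from app.js is listening on port 8000:
// loadTest('http://localhost:8000/?item=book', 20, 200, (err, stats) => {
//   if (err) return console.error(err);
//   console.log(`All requests completed in ${stats.elapsedMs} ms`);
// });
```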

Batch asynchronous request

When dealing with asynchronous operations, the most basic level of caching can be achieved by batching together a set of invocations to the same API. The idea is very simple: if we invoke an asynchronous function while another identical invocation is still pending, we can attach the callback to the operation already running instead of creating a brand new request. Take a look at the following figure:

The previous figure shows two clients (they can be two different machines or two different web requests) invoking the same asynchronous operation with exactly the same input. Of course, the natural way to picture this situation is with the two clients starting two separate operations, which complete at two different moments, as shown in the previous figure. Now consider the next scenario, depicted in the following figure:

The preceding figure shows how two requests that invoke the same API with the same input can be batched, in other words, attached to the same running operation. By doing this, when the operation completes, both clients are notified. This is a simple yet extremely powerful way to reduce the load of an application without having to deal with more complex caching mechanisms, which usually require adequate memory management and invalidation strategies.

Batching requests in the web server for e-commerce sales

Let's now add a batching layer on top of our totalSales API. The pattern we are going to use is very simple: if another identical request is already pending when the API is invoked, we add the callback to a queue. When the asynchronous operation completes, all the callbacks in its queue are invoked at once.

Now, let's see how this translates into code. Create a new module named totalSalesBatch.js, where we implement a batching layer on top of the original totalSales API:

const totalSales = require('./totalSales');

const queues = {};
module.exports = function totalSalesBatch(item, callback) {
  if(queues[item]) {  // [1]
    console.log('Batching operation');
    return queues[item].push(callback);
  }
  
  queues[item] = [callback];  // [2]
  totalSales(item, (err, res) => {
    const queue = queues[item];  // [3]
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};

The totalSalesBatch() function is a proxy for the original totalSales() API, and it works as follows:

  1. If a queue already exists for the requested item, a request for that particular item is already running. In this case, all we have to do is push the callback onto the existing queue and return immediately. Nothing else is required.
  2. If no queue exists for the item, we have to create a new request. To do this, we create a new queue for that specific item, initialize it with the current callback, and then invoke the original totalSales() API.
  3. When the original totalSales() request completes, our callback is executed: we iterate over all the callbacks added to the queue for that item and invoke each of them with the result of the operation.

The behavior of totalSalesBatch() is identical to that of the original totalSales() API, with the difference that multiple calls with the same input are now batched together, saving time and resources.
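The effect of batching is easy to observe by counting invocations of the underlying API. In this sketch, `totalSales()` is a hypothetical slow stand-in that always reports 42 for `'book'`; the batching function mirrors totalSalesBatch.js.

```javascript
// Hypothetical slow totalSales() stand-in that counts its invocations
let invocations = 0;
function totalSales(item, callback) {
  invocations++;
  setTimeout(() => callback(null, item === 'book' ? 42 : 0), 50);
}

// Batching layer, same logic as totalSalesBatch.js
const queues = {};
function totalSalesBatch(item, callback) {
  if (queues[item]) {
    return queues[item].push(callback);   // join the pending operation
  }
  queues[item] = [callback];
  totalSales(item, (err, res) => {
    const queue = queues[item];
    queues[item] = null;                  // the next call starts a new batch
    queue.forEach(cb => cb(err, res));
  });
}

// Five concurrent requests for the same item share one underlying call
const sums = [];
for (let i = 0; i < 5; i++) {
  totalSalesBatch('book', (err, sum) => {
    sums.push(sum);
    if (sums.length === 5) {
      console.log(invocations, sums); // 1 [ 42, 42, 42, 42, 42 ]
    }
  });
}
```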

Curious to see the performance gain compared with the raw, non-batched version of the totalSales() API? Let's replace the totalSales module used by the HTTP server with the one we just created, modifying the app.js file as follows:

//const totalSales = require('./totalSales');
const totalSales = require('./totalSalesBatch');
http.createServer(function(req, res) {
// ...
});

If we now restart the server and run the load test again, the first thing we see is that the requests are returned in batches.

Besides this, we should observe a considerable reduction in the total time required by the requests; the test should be at least four times faster than the original one performed against the plain totalSales() API!

This is a stunning result, confirming that a simple batching layer alone can yield a huge performance boost. Compared with a full-fledged caching mechanism, it adds little complexity, because there is no invalidation strategy to worry about.

The batching pattern reaches its full potential in high-load applications and with slow APIs, because it is precisely in these circumstances that a large number of requests can be batched together.

Caching asynchronous requests

One of the problems with the batching pattern is that the faster the API, the fewer batched requests we get. One could argue that if an API is already fast, there is no point in optimizing it; however, it still contributes to the resource load of an application, and added up, this can still have an impact. Also, sometimes we can safely assume that the result of an API invocation will not change often; in that case, simple request batching does not give the best possible performance. In all these circumstances, the best candidate to reduce the load of an application and increase its responsiveness is a more aggressive caching pattern.

The idea is simple: as soon as a request completes, we store its result in a cache, which can be a variable, an entry in a database, or a specialized caching server. Hence, the next time the API is invoked, the result can be retrieved immediately from the cache instead of spawning another request.

The idea of caching should not be new to an experienced developer, but what makes this pattern different in asynchronous programming is that it should be combined with request batching to be optimal. The reason is that multiple requests might run concurrently while the cache is not yet set, and when they complete, the cache would be set multiple times, wasting resources.

Based on these assumptions, the final structure of the asynchronous request caching pattern is shown in the following figure:

The preceding figure shows the two phases of the asynchronous caching algorithm:

  1. The first phase is identical to the batching pattern: any request received while the cache is not set is batched together. When the request completes, the cache is set, once.
  2. Once the cache is set, any subsequent request is served directly from it.

Another crucial detail to consider is Zalgo, the unpredictable synchronous/asynchronous behavior we saw in action in Chapter 2, Node.js Essential Patterns. As we are dealing with asynchronous APIs, we must make sure that cached values are always returned asynchronously, even if accessing the cache involves only a synchronous operation.
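The difference is visible in the order of execution. The two hypothetical functions below read from the same pre-populated cache: `getSumUnsafe()` calls back synchronously on a hit (Zalgo), while `getSumSafe()` always defers the callback with `process.nextTick()`.

```javascript
// Pre-populated cache for the demo (hypothetical data)
const cache = new Map([['book', 42]]);

// Inconsistent: synchronous on a cache hit, asynchronous on a miss (Zalgo)
function getSumUnsafe(item, callback) {
  if (cache.has(item)) {
    return callback(null, cache.get(item));
  }
  setTimeout(() => callback(null, 0), 10);
}

// Consistent: the callback is always deferred, even on a cache hit
function getSumSafe(item, callback) {
  if (cache.has(item)) {
    return process.nextTick(() => callback(null, cache.get(item)));
  }
  setTimeout(() => callback(null, 0), 10);
}

const order = [];
getSumUnsafe('book', () => order.push('unsafe callback'));
order.push('after unsafe call');
getSumSafe('book', () => order.push('safe callback'));
order.push('after safe call');
// Final order: unsafe callback, after unsafe call, after safe call, safe callback
```

With the unsafe version, the caller cannot know whether code placed after the call runs before or after its callback; the safe version always behaves like a cache miss from the caller's point of view.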

Caching requests in the web server for e-commerce sales

To demonstrate the benefits of the asynchronous caching pattern, let's now apply what we've learned to the totalSales() API.

As in the request-batching example, we create a proxy for the original API whose only purpose is to add a caching layer.

Let's then create a new module named totalSalesCache.js containing the following code:

const totalSales = require('./totalSales');

const queues = {};
const cache = {};

module.exports = function totalSalesCache(item, callback) {
  const cached = cache[item];
  if (cached !== undefined) {  // a cached sum of 0 is still a valid hit
    console.log('Cache hit');
    return process.nextTick(callback.bind(null, null, cached));
  }
  
  if (queues[item]) {
    console.log('Batching operation');
    return queues[item].push(callback);
  }
  
  queues[item] = [callback];
  totalSales(item, (err, res) => {
    if (!err) {
      cache[item] = res;
      setTimeout(() => {
        delete cache[item];
      }, 30 * 1000); //30 seconds expiry
    }
    
    const queue = queues[item];
    queues[item] = null;
    queue.forEach(cb => cb(err, res));
  });
};

We should immediately recognize that much of the preceding code is identical to our asynchronous batching code. In fact, the only differences are the following:

  • The first thing we do when the API is invoked is check whether the cache is set. If so, we immediately return the cached value through callback(), deferring the call with process.nextTick() so that the callback is always invoked asynchronously, even on a cache hit (which avoids Zalgo).
  • Execution then continues in batching mode, but this time, when the original API completes successfully, we also save the result into the cache. In addition, we set a timeout that invalidates the cache entry after 30 seconds. A simple but effective technique!
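Both phases, plus expiry, can be observed with a counting stand-in. This sketch keeps the structure of totalSalesCache.js but, as assumptions for the demo, uses `Map`s (so a cached sum of 0 or an item named like an `Object.prototype` property behaves correctly), a hypothetical `createTotalSalesCache(ttlMs)` factory, and a 100 ms expiry instead of 30 seconds.

```javascript
// Hypothetical totalSales() stand-in that counts its invocations
let invocations = 0;
function totalSales(item, callback) {
  invocations++;
  setTimeout(() => callback(null, 0), 20); // a sum of 0 is a valid result
}

// Same structure as totalSalesCache.js, with Maps and a configurable expiry
function createTotalSalesCache(ttlMs) {
  const queues = new Map();
  const cache = new Map();
  return function totalSalesCache(item, callback) {
    if (cache.has(item)) {                        // phase 2: cache hit
      const cached = cache.get(item);
      return process.nextTick(() => callback(null, cached));
    }
    if (queues.has(item)) {                       // phase 1: batch
      return queues.get(item).push(callback);
    }
    queues.set(item, [callback]);
    totalSales(item, (err, res) => {
      if (!err) {
        cache.set(item, res);
        setTimeout(() => cache.delete(item), ttlMs);
      }
      const queue = queues.get(item);
      queues.delete(item);
      queue.forEach(cb => cb(err, res));
    });
  };
}

const cachedTotalSales = createTotalSalesCache(100);
const noop = () => {};
cachedTotalSales('pen', noop);                          // miss: starts the operation
cachedTotalSales('pen', noop);                          // batched with the first call
setTimeout(() => cachedTotalSales('pen', noop), 50);    // served from the cache
setTimeout(() => cachedTotalSales('pen', noop), 200);   // entry expired: new operation
```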

Now, we are ready to try the totalSalesCache module we just created. First, update the app.js module as follows:

// const totalSales = require('./totalSales');
// const totalSales = require('./totalSalesBatch');
const totalSales = require('./totalSalesCache');
http.createServer(function(req, res) {
  // ...
});

Now, restart the server and profile it using the loadTest.js script, as we did in the previous example. With the default test parameters, we should see a clear reduction in execution time compared with simple batching. Of course, this depends heavily on many factors, for example, the number of requests received and the delay between one request and the next. The advantages of caching over plain batching become much more substantial when the number of requests is high and spans a longer period of time.

Memoization is the practice of caching the result of a function invocation. In npm, we can find many packages implementing asynchronous memoization; one of the most popular is memoizee.

Notes about implementing caching mechanisms

We must remember that in real-life applications, we might want to use more advanced invalidation techniques and storage mechanisms. This might be necessary for the following reasons:

  • A large number of cached values might easily consume a lot of memory. In this case, a least recently used (LRU) algorithm can be applied to keep memory utilization constant.
  • When the application is distributed across multiple processes, using a simple variable for the cache might result in different results being returned by each server instance. If that is undesirable for the particular application we are implementing, the solution is to use a shared store for the cache. Popular solutions are Redis and Memcached.
  • Manual cache invalidation, as opposed to timed expiry, can enable a longer-living cache while still providing more up-to-date data, but of course, it is much more complex to manage.
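To illustrate the LRU idea from the first bullet, here is a minimal sketch that relies on a `Map` preserving insertion order. The class name `LruCache` is hypothetical; for production use, an established npm package such as lru-cache is a more robust choice.

```javascript
// Minimal LRU cache: a Map iterates keys in insertion order, so the first
// key is always the least recently used one.
class LruCache {
  constructor(maxEntries) {
    this.maxEntries = maxEntries;
    this.entries = new Map();
  }

  get(key) {
    if (!this.entries.has(key)) {
      return undefined;
    }
    const value = this.entries.get(key);
    // Re-insert to mark the entry as the most recently used
    this.entries.delete(key);
    this.entries.set(key, value);
    return value;
  }

  set(key, value) {
    this.entries.delete(key);
    this.entries.set(key, value);
    if (this.entries.size > this.maxEntries) {
      // Evict the least recently used entry (the oldest key)
      const oldestKey = this.entries.keys().next().value;
      this.entries.delete(oldestKey);
    }
  }
}

const lru = new LruCache(2);
lru.set('book', 42);
lru.set('pen', 5);
lru.get('book');          // 'book' becomes the most recently used
lru.set('lamp', 7);       // evicts 'pen'
console.log([...lru.entries.keys()]); // [ 'book', 'lamp' ]
```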

Batching and caching with promises

In Chapter 4, Asynchronous Control Flow Patterns with ES2015 and Beyond, we saw how promises can greatly simplify our asynchronous code, but they become even more useful when dealing with batching and caching.

Promises have two properties that we can exploit for asynchronous batching and caching:

  • Multiple then() listeners can be attached to the same promise.
  • A then() listener is guaranteed to be invoked at most once, and it works even if it is attached after the promise has already been resolved. Moreover, then() listeners are always invoked asynchronously.

In short, the first property is exactly what we need for batching requests, while the second means that a promise already acts as a cache for its resolved value and offers a natural mechanism for returning a cached value asynchronously.
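Both properties can be checked directly with a single promise. This small sketch uses only standard `Promise` behavior; the values and labels are arbitrary.

```javascript
const promise = new Promise(resolve => setTimeout(() => resolve(42), 10));
const received = [];

// 1. Several listeners can share the same promise (batching)
promise.then(value => received.push(`first: ${value}`));
promise.then(value => received.push(`second: ${value}`));

setTimeout(() => {
  // 2. A listener attached after resolution still gets the value (caching)...
  promise.then(value => received.push(`late: ${value}`));
  // ...but always asynchronously: this synchronous push happens first
  received.push('sync code after late then()');
}, 50);
```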

Let's move on to the code: we can try to create a module that adds batching and caching to totalSales() using promises. Create a new module named totalSalesPromises.js:

const pify = require('pify');  // [1]
const totalSales = pify(require('./totalSales'));

const cache = {};
module.exports = function totalSalesPromises(item) {
  if (cache[item]) {  // [2]
    return cache[item];
  }

  cache[item] = totalSales(item)  // [3]
    .then(res => {  // [4]
      setTimeout(() => {delete cache[item]}, 30 * 1000); //30 seconds expiry
      return res;
    })
    .catch(err => {  // [5]
      delete cache[item];
      throw err;
    });
  return cache[item];  // [6]
};

PromiseIt is really good. The following is a functional description of the above functions:

  1. First, we require a module called pify, which allows us to apply promisification to the totalSales() module. After this, totalSales() returns an ES2015-compliant Promise instance instead of accepting a callback as an argument.
  2. When totalSalesPromises() is invoked, we check whether a Promise for the given item type already exists in the cache. If it does, we return that Promise instance directly.
  3. If there is no Promise in the cache for the given item type, we create one by invoking the original (promisified) totalSales() API.
  4. When the Promise resolves, we schedule the removal of the cache entry (after 30 seconds), and we return res to propagate the result of the operation to the application.
  5. If the Promise rejects, we immediately reset the cache and throw the error again to propagate it through the Promise chain, so any other consumer attached to the same Promise also receives the error.
  6. Finally, we return the Promise instance we just created or retrieved from the cache.
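To see the batching and caching behavior without the rest of the chapter's code, here is a self-contained sketch of the same wrapper. It makes two assumptions: the real totalSales() module is replaced by a hypothetical fakeTotalSales() that just counts its invocations, and Node's built-in util.promisify() is used in place of pify. The expiry timer is also unref()'d so that it does not keep a short script alive for 30 seconds:

```javascript
const { promisify } = require('util');

// Hypothetical stand-in for the callback-based totalSales() module:
// it counts its invocations and answers after a short delay.
let invocations = 0;
function fakeTotalSales(item, callback) {
  invocations++;
  setTimeout(() => callback(null, item.length * 10), 20);
}

// util.promisify() used here in place of pify.
const totalSales = promisify(fakeTotalSales);
const cache = {};
const CACHE_TTL_MS = 30 * 1000;

function totalSalesPromises(item) {
  if (cache[item]) {
    return cache[item]; // batched or cached: reuse the same Promise
  }
  cache[item] = totalSales(item)
    .then(res => {
      // unref() so the expiry timer alone does not keep the process alive
      setTimeout(() => { delete cache[item]; }, CACHE_TTL_MS).unref();
      return res;
    })
    .catch(err => {
      delete cache[item]; // do not cache failures
      throw err;
    });
  return cache[item];
}

// Two concurrent calls for the same item share one underlying invocation.
Promise.all([totalSalesPromises('book'), totalSalesPromises('book')])
  .then(([a, b]) => console.log(a, b, 'invocations:', invocations)); // 40 40 invocations: 1
```

Both concurrent calls are served by a single invocation of the underlying API, and any call for the same item made within the next 30 seconds gets the same, already resolved, Promise.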

The result is simple and intuitive, and, more importantly, we achieved both batching and caching using Promises.
To try out the totalSalesPromises() function, we need to slightly adapt the app.js module, because the API now uses Promises instead of callbacks. Let's create a new application module called appPromises.js:

const http = require('http');
const url = require('url');
const totalSales = require('./totalSalesPromises');

http.createServer(function(req, res) {
  const query = url.parse(req.url, true).query;
  totalSales(query.item).then(function(sum) {
    res.writeHead(200);
    res.end(`Total sales for item ${query.item} is ${sum}`);
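  }).catch(function(err) {
    // without this, a rejected Promise would leave the request hanging
    res.writeHead(500);
    res.end(`Error: ${err.message}`);
  });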
}).listen(8000, function() {console.log('Started')});

Its implementation is almost identical to the original application module, except that we now use the Promise-based version of the batching/caching wrapper; therefore, the way we invoke it is also slightly different.

Run the following command to start this new version of the server:

node appPromises

Running CPU-bound tasks

Although the totalSales() operation above consumes a lot of system resources, it does not affect the server's ability to handle concurrent requests. What we learned about the event loop in Chapter1-Welcome to the Node.js Platform explains this behavior: invoking an asynchronous operation causes the stack to unwind back to the event loop, leaving it free to handle other requests.

However, what happens when we run a long, synchronous task that never gives control back to the event loop?

This kind of task is known as CPU-bound, because its main characteristic is heavy CPU utilization rather than heavy I/O.
Let's look at an example to see how these types of tasks behave in Node.js.

Solving the Sum of Subsets Problem

Now let's experiment with a computation that is expensive in terms of CPU: the subset sum problem. The task is to determine whether an array of integers contains a subset whose elements add up to 0. For example, given the array [1, 2, -4, 5, -3] as input, the subsets that satisfy the problem are [1, 2, -3] and [2, -4, 5, -3].

The simplest algorithm checks every possible combination of elements, of any size, and has a cost of O(2^n); in other words, its running time grows exponentially with the size of the input. This means that checking all the combinations of 20 integers requires up to 1,048,576 iterations, which is more than enough to test our assumptions. Of course, a solution might be found well before all the combinations are checked; so, to make things harder, we will consider the following variation of the problem: given a set of integers, we want to calculate all the possible combinations whose sum is equal to a given arbitrary integer.
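To get a feel for the exponential cost, here is a brute-force sketch that tries every non-empty subset using a bitmask (bit i selects element i, so n elements give 2^n - 1 candidates) and keeps those whose sum equals the target. It only illustrates the O(2^n) search; findSubsetsWithSum() is a hypothetical name, and it is not the recursive algorithm built in the rest of this section:

```javascript
// Brute-force subset sum: try every non-empty subset using a bitmask.
// Bit i of `mask` decides whether set[i] belongs to the current subset.
function findSubsetsWithSum(set, target) {
  const matches = [];
  const total = 2 ** set.length; // number of subsets, including the empty one
  for (let mask = 1; mask < total; mask++) {
    const subset = set.filter((_, i) => (mask >> i) & 1);
    const sum = subset.reduce((acc, n) => acc + n, 0);
    if (sum === target) {
      matches.push(subset);
    }
  }
  return { matches, checked: total - 1 };
}

const result = findSubsetsWithSum([1, 2, -4, 5, -3], 0);
console.log(result.checked); // 31 non-empty subsets for 5 elements
console.log(result.matches); // [ [ 1, 2, -3 ], [ 2, -4, 5, -3 ] ]
```

With 20 elements, the same loop would have to examine 1,048,575 non-empty subsets.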

const EventEmitter = require('events').EventEmitter;

class SubsetSum extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }
  // ... _combine(), _processSubset() and start() follow
}

module.exports = SubsetSum;

The SubsetSum class extends EventEmitter; this allows us to emit an event every time we find a new subset matching the sum received as input. As we will see, this gives us a lot of flexibility.

The code above is the beginning of a new module called subsetSum.js, which declares the SubsetSum class.

Next, let's see how the class generates all the possible combinations of subsets:

_combine(set, subset) {
  for(let i = 0; i < set.length; i++) {
    let newSubset = subset.concat(set[i]);
    this._combine(set.slice(i + 1), newSubset);
    this._processSubset(newSubset);
  }
}

Without going into too much detail about the algorithm, there are two important things to notice:

  • The _combine() method is completely synchronous: it recursively generates every possible subset without ever giving control back to the event loop. If we think about it, this is perfectly normal for an algorithm that does not require any I/O.
  • Every time a new combination is generated, we pass it to the _processSubset() method for further processing.

The _processSubset() method is responsible for verifying that the sum of the elements of the given subset is equal to the number we are looking for:

_processSubset(subset) {
  console.log('Subset', ++this.totalSubsets, subset);
  const res = subset.reduce((prev, item) => (prev + item), 0);
  // == rather than ===: sum arrives as a string when it comes from the query string
  if (res == this.sum) {
    this.emit('match', subset);
  }
}

In short, _processSubset() applies a reduce operation to the subset to calculate the sum of its elements. Then, when the resulting sum is equal to the one we are looking for (this.sum), it emits a match event.

Finally, the start() method puts all the previous pieces together and runs the algorithm:

start() {
  this._combine(this.set, []);
  this.emit('end');
}

The start() method triggers the algorithm by invoking _combine(), and finally emits an end event, signaling that all the combinations were checked and any possible match has already been emitted. This works because _combine() is synchronous: the end event is emitted as soon as the function returns, which means that all the combinations have been calculated.
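For reference, here is a sketch of the snippets above assembled into one runnable class (the per-subset console.log() progress output is left out to keep the output short), plus a small driver using the example input from earlier. Because start() is synchronous, the end event has already fired by the time start() returns:

```javascript
const EventEmitter = require('events').EventEmitter;

class SubsetSum extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.totalSubsets = 0;
  }

  // Recursively generates every non-empty subset, fully synchronously.
  _combine(set, subset) {
    for (let i = 0; i < set.length; i++) {
      const newSubset = subset.concat(set[i]);
      this._combine(set.slice(i + 1), newSubset);
      this._processSubset(newSubset);
    }
  }

  // Emits 'match' when the subset adds up to the requested sum.
  _processSubset(subset) {
    this.totalSubsets++;
    const res = subset.reduce((prev, item) => prev + item, 0);
    if (res == this.sum) { // loose equality: sum may be a query-string value
      this.emit('match', subset);
    }
  }

  start() {
    this._combine(this.set, []);
    this.emit('end');
  }
}

const subsetSum = new SubsetSum(0, [1, 2, -4, 5, -3]);
const matches = [];
let ended = false;
subsetSum.on('match', match => matches.push(match));
subsetSum.on('end', () => { ended = true; });
subsetSum.start();
console.log(ended, subsetSum.totalSubsets, matches);
// true 31 [ [ 1, 2, -3 ], [ 2, -4, 5, -3 ] ]
```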

Next, we need to expose the algorithm over the network, and a simple HTTP server is enough for the job. In particular, we want an endpoint that responds to requests in the format /subsetSum?data=<Array>&sum=<Integer>, running the SubsetSum algorithm with the given array and sum.

Let's implement this simple server in a module called app.js:

const http = require('http');
const SubsetSum = require('./subsetSum');

http.createServer((req, res) => {
  const url = require('url').parse(req.url, true);
  if(url.pathname === '/subsetSum') {
    const data = JSON.parse(url.query.data);
    res.writeHead(200);
    const subsetSum = new SubsetSum(url.query.sum, data);
    subsetSum.on('match', match => {
      res.write('Match: ' + JSON.stringify(match) + '\n');
    });
    subsetSum.on('end', () => res.end());
    subsetSum.start();
  } else {
    res.writeHead(200);
    res.end('I\'m alive!\n');
  }
}).listen(8000, () => console.log('Started'));

Because the SubsetSum instance returns its results through events, we can stream the matching subsets to the client as soon as the algorithm generates them. Another detail to notice is that our server responds with the text I'm alive! every time we hit a URL other than /subsetSum. We will use this to check whether our server is still responsive, as we will see in a moment.
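One detail of app.js is easy to miss: every value parsed from the query string is a string, including sum. This is presumably why _processSubset() compares with == rather than ===. A quick check, encoding the parameters the way curl's --data-urlencode would (the input values are only an example):

```javascript
const url = require('url');

// Encode the parameters the way curl --data-urlencode would.
const qs = new URLSearchParams({ data: '[1,2,-4,5,-3]', sum: '0' }).toString();
const parsed = url.parse('/subsetSum?' + qs, true); // same call style as app.js

const data = JSON.parse(parsed.query.data); // a real array: [1, 2, -4, 5, -3]
const sum = parsed.query.sum;               // still a string: '0'

console.log(typeof sum);         // prints: string
console.log(1 + 2 + -3 == sum);  // prints: true  ('0' is coerced to 0)
console.log(1 + 2 + -3 === sum); // prints: false (number vs string)
```

Converting the parameter once, for example with Number(url.query.sum), would be an alternative that allows a strict comparison.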

Let's start the server:

node app

Once the server is started, we are ready to send our first request. Let's try it with a set of 17 random numbers, which results in 131,071 combinations, enough to keep our server busy for a while:

curl -G http://localhost:8000/subsetSum --data-urlencode "data=[116,119,101,101,-116,109,101,-105,-102,117,-115,-97,119,-116,-104,-105,115]" --data-urlencode "sum=0"

Now, if we try the following command in another terminal while the first request is still running, we will spot a huge problem:

curl -G http://localhost:8000

We will see that this last request hangs until the subset sum algorithm of the first request has finished: the server is not responding! This is exactly what we expected. The Node.js event loop runs in a single thread, and if this thread is blocked by a long synchronous computation, it cannot execute even one more cycle to respond with a simple I'm alive!.
Clearly, this code does not work for an application meant to serve multiple requests at the same time.
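The same blocking effect can be reproduced without an HTTP server. In this sketch, a timer is scheduled to fire as soon as possible, but a synchronous busy loop (a hypothetical stand-in for the subset sum computation) holds the only JavaScript thread for about 200 milliseconds, so the timer can only run once the loop returns:

```javascript
// Synchronous busy-wait: a stand-in for a long CPU-bound computation.
function busyFor(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // spin without ever yielding to the event loop
  }
}

const scheduledAt = Date.now();
let observedDelay = null;

// Asks to run "as soon as possible" (0 ms timer)...
setTimeout(() => {
  observedDelay = Date.now() - scheduledAt;
  console.log(`timer fired after ${observedDelay} ms`);
}, 0);

// ...but the event loop cannot run it until this call returns.
busyFor(200);
```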

But don't despair of Node.js; we can tackle this situation in several ways. Let's analyze the two most common ones:

Use setImmediate

Usually, a CPU-bound algorithm is built upon a set of steps: a set of recursive invocations, a loop, or any variation/combination of these. So, a simple solution to our problem is to give control back to the event loop after each of these steps completes (or after a certain number of them). This way, any pending I/O can still be processed by the event loop in the intervals where the long-running algorithm yields the CPU. A simple way to achieve this is to schedule the next step of the algorithm to run after any pending I/O requests. This sounds like the perfect use case for the setImmediate() method (an API we already introduced in Chapter2-Node.js Essential Patterns).

Pattern: interleave the execution of a long-running synchronous task with setImmediate().

Interleaving the steps of the subset sum algorithm

Now let's see how this pattern applies to the subset sum algorithm. All we have to do is slightly modify the subsetSum.js module. For convenience, we will create a new module called subsetSumDefer.js, taking the code of the original SubsetSum class as a starting point.
The first change is to add a new method called _combineInterleaved(), which is the core of the pattern we are implementing:

_combineInterleaved(set, subset) {
  this.runningCombine++;
  setImmediate(() => {
    this._combine(set, subset);
    if(--this.runningCombine === 0) {
      this.emit('end');
    }
  });
}

As we can see, all we have to do is defer the invocation of the original (synchronous) _combine() method with setImmediate(). However, it now becomes more difficult to know when the algorithm has finished generating all the combinations, because the algorithm is no longer synchronous.

To fix this, we keep track of all the running instances of the _combine() method, using a pattern very similar to the asynchronous parallel execution we saw in Chapter3-Asynchronous Control Flow Patterns with Callbacks. When all the instances of _combine() have finished running, we emit the end event, notifying any listener that the process has completed.

To complete the refactoring of the subset sum algorithm, we need a couple more changes. First, we replace the recursive step in the _combine() method with its deferred counterpart:

_combine(set, subset) {
  for(let i = 0; i < set.length; i++) {
    let newSubset = subset.concat(set[i]);
    this._combineInterleaved(set.slice(i + 1), newSubset);
    this._processSubset(newSubset);
  }
}

With this change, we make sure that each step of the algorithm is queued in the event loop using setImmediate(), and is therefore executed after any pending I/O request instead of running synchronously.

The other small tweak is to the start() method:

start() {
  this.runningCombine = 0;
  this._combineInterleaved(this.set, []);
}

In the preceding code, we initialize the number of running instances of the _combine() method to 0. We also replace the call to _combine() with a call to _combineInterleaved() and remove the emission of the end event, because this is now handled asynchronously in _combineInterleaved().
With this last change, our subset sum algorithm should now be able to run its potentially CPU-intensive code in steps, interleaved with intervals in which the event loop can process any other pending request, so it no longer blocks.
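Putting the changes together, subsetSumDefer.js could look like the following sketch (assembled from the snippets above, without the progress logging). The driver schedules an unrelated setImmediate() callback, a stand-in for another pending request, right after start(); it runs before the algorithm has found any match, which is the interleaving we were after:

```javascript
const EventEmitter = require('events').EventEmitter;

class SubsetSumDefer extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
    this.runningCombine = 0;
  }

  // Defers one recursion step with setImmediate() and counts running steps,
  // emitting 'end' when the last one completes.
  _combineInterleaved(set, subset) {
    this.runningCombine++;
    setImmediate(() => {
      this._combine(set, subset);
      if (--this.runningCombine === 0) {
        this.emit('end');
      }
    });
  }

  _combine(set, subset) {
    for (let i = 0; i < set.length; i++) {
      const newSubset = subset.concat(set[i]);
      this._combineInterleaved(set.slice(i + 1), newSubset);
      this._processSubset(newSubset);
    }
  }

  _processSubset(subset) {
    const res = subset.reduce((prev, item) => prev + item, 0);
    if (res == this.sum) {
      this.emit('match', subset);
    }
  }

  start() {
    this.runningCombine = 0;
    this._combineInterleaved(this.set, []);
  }
}

const events = [];
const subsetSum = new SubsetSumDefer(0, [1, 2, -4, 5, -3]);
subsetSum.on('match', match => events.push(`match ${JSON.stringify(match)}`));
subsetSum.on('end', () => events.push('end'));
subsetSum.start();
setImmediate(() => events.push('other callback')); // e.g. another pending request
events.push('start() returned');
```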

The last missing piece is updating the app.js module so that it uses the new version of SubsetSum:

const http = require('http');
// const SubsetSum = require('./subsetSum');
const SubsetSum = require('./subsetSumDefer');
http.createServer(function(req, res) {
  // ...
})

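Let's start the server in the same way as before and repeat the two requests. This time, the CPU-bound task runs in interleaved steps and no longer blocks the event loop, so the second request returns its I'm alive! response immediately, even while the subset sum task is still running.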

Considerations on the interleaving pattern

As we saw, running a CPU-bound task while preserving the responsiveness of an application is not that complicated: it just requires using setImmediate() to schedule the next step of the algorithm after any pending I/O. However, this is not the most efficient pattern. Deferring the next step of a task introduces a small overhead that, multiplied by the number of steps the algorithm has to run, can have a significant impact. This is usually the last thing we want for a CPU-bound task, especially if we have to return the result directly to the user, which should happen within a reasonable amount of time. A possible way to mitigate the problem is to use setImmediate() only after a certain number of steps, instead of using it at every single step. But this still does not solve the root of the problem.
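The mitigation mentioned above, yielding only every N steps, can be sketched with a small generic helper. The name runInChunks() and the chunk size are assumptions for illustration; the point is that the cost of setImmediate() is paid once per chunk rather than once per step, while pending I/O still gets a turn between chunks:

```javascript
// Hypothetical helper: runs step(i) for i = 0 .. total - 1, yielding to the
// event loop with setImmediate() after every chunkSize steps.
function runInChunks(total, chunkSize, step, done) {
  let i = 0;
  let yields = 0;
  function nextChunk() {
    const end = Math.min(i + chunkSize, total);
    for (; i < end; i++) {
      step(i);
    }
    if (i < total) {
      yields++;
      setImmediate(nextChunk); // pending I/O gets a turn before the next chunk
    } else {
      done(yields);
    }
  }
  setImmediate(nextChunk); // start asynchronously so done() is never called synchronously
}

let sum = 0;
let result = null;
runInChunks(1000000, 10000, i => { sum += i; }, yields => {
  result = { sum, yields };
  console.log(`sum=${sum}, yielded ${yields} times`); // sum=499999500000, yielded 99 times
});
```

Here one million steps run in 100 chunks with 99 yields in between, instead of one million deferrals. The chunk size is a trade-off between throughput and how long each chunk keeps the event loop busy.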

Also, bear in mind that this does not mean the interleaving pattern should be avoided at all costs. Looking at the bigger picture, a synchronous task does not have to be extremely long and complex to cause trouble: in a busy server, even a task that blocks the event loop for 200 milliseconds can create undesirable delays. In situations where the task is executed sporadically or in the background and does not have to run for too long, interleaving its execution with setImmediate() is probably the simplest and most effective way to avoid blocking the event loop.

process.nextTick() cannot be used to interleave a long-running task. As we saw in Chapter1-Welcome to the Node.js Platform, nextTick() schedules a task before any pending I/O, so calling it repeatedly eventually causes I/O starvation. You can verify this yourself by replacing setImmediate() with process.nextTick() in the previous sample.
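A minimal sketch of the starvation effect: a setImmediate() callback is scheduled first, then a task re-queues itself 1,000 times with process.nextTick(). Since the nextTick queue is drained completely before the event loop moves on, the immediate callback only runs after all 1,000 steps:

```javascript
let steps = 0;
let stepsSeenByImmediate = null;

// Scheduled first, but it has to wait until the event loop can move on.
setImmediate(() => {
  stepsSeenByImmediate = steps;
  console.log(`setImmediate ran after ${steps} nextTick steps`); // ...after 1000 nextTick steps
});

// A "long-running task" that re-queues each step with process.nextTick().
function step() {
  steps++;
  if (steps < 1000) {
    process.nextTick(step);
  }
}
process.nextTick(step);
```

If step() were re-queued with setImmediate() instead, the event loop would get a chance to run other callbacks and I/O between steps.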

Use multiple processes

Interleaving is not the only way to run CPU-bound tasks; another pattern for preventing the event loop from blocking is using child processes. We already know that Node.js gives its best with I/O-intensive applications, such as web servers, because its asynchronous architecture allows us to optimize resource utilization.

Therefore, the best way to keep an application responsive is to avoid running expensive CPU-bound tasks in the context of the main application and to run them in a separate process instead. This has three main advantages:

  • The synchronous task can run at full speed, without the need to interleave its steps.
  • Working with processes in Node.js is simple, probably easier than modifying an algorithm to use setImmediate(), and multiple processes allow us to easily use multiple processors without the need to scale the main application itself.
  • If we really need maximum performance, the external process could be written in a lower-level language, such as good old C.

Node.js has an ample API for interacting with external processes; we can find everything we need in the child_process module. Moreover, when the external process is just another Node.js program, connecting it to the main application is extremely easy, and we don't even feel like we are running something outside the local application. This is possible thanks to the child_process.fork() function, which creates a new child Node.js process and also automatically creates a communication channel with it, allowing us to exchange information through an interface very similar to an EventEmitter. Let's see how to use this feature to refactor our subset sum server.
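A minimal sketch of that channel in action: the parent forks a child, sends it a message with send(), and receives the reply through a regular message event. The child program and the temp-file location are assumptions made only to keep the example self-contained; execArgv is set to an empty array so the child does not inherit the parent's command-line flags:

```javascript
const { fork } = require('child_process');
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical child program, written to a temp file only to keep this
// sketch self-contained; normally it would be a regular module on disk.
const childFile = path.join(os.tmpdir(), `fork-demo-child-${process.pid}.js`);
fs.writeFileSync(childFile, `
process.on('message', msg => {
  process.send({ doubled: msg.value * 2 }); // reply over the IPC channel
});
`);

// fork() starts a new Node.js process and sets up the IPC channel for us.
const child = fork(childFile, [], { execArgv: [] });

let reply = null;
child.on('message', msg => { // EventEmitter-style interface
  reply = msg;
  console.log('parent received', msg); // parent received { doubled: 42 }
  child.disconnect(); // closing the channel lets the idle child exit
});
child.on('exit', () => fs.unlinkSync(childFile));

child.send({ value: 21 });
```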

Delegate subset summation tasks to other processes

The goal of refactoring the SubsetSum task is to create a separate child process responsible for the CPU-bound processing, leaving the server's event loop free to handle requests coming from the network. This is the plan:

  1. We will create a new module named processPool.js that allows us to create a pool of running processes. Starting a new process is expensive and takes time, so keeping processes constantly running and ready to handle requests saves us both time and CPU cycles. The pool also helps us limit the number of processes running at the same time, to avoid exposing the application to denial-of-service (DoS) attacks.
  2. Next, we will create a module called subsetSumFork.js, responsible for abstracting a SubsetSum task running in a child process. Its role is to communicate with the child process and expose the results of the task as if they were coming from the current application.
  3. Finally, we need a worker (our child process), a new Node.js program whose only job is to run the subset sum algorithm and forward its results to the parent process.

A denial-of-service (DoS) attack is an attempt to make a machine or network resource unavailable to its intended users, for example by temporarily or indefinitely interrupting or suspending the services of a host connected to the Internet.

Implementing a process pool

Let's start by building the processPool.js module piece by piece:

const fork = require('child_process').fork;

class ProcessPool {
  constructor(file, poolMax) {
    this.file = file;
    this.poolMax = poolMax;
    this.pool = [];
    this.active = [];
    this.waiting = [];
  }
  // ... acquire() and release() follow
}

module.exports = ProcessPool;

In the first part of the module, we import the child_process.fork() function, which we will use to create new processes. Then we define the ProcessPool constructor, which accepts a file parameter, representing the Node.js program to run, and poolMax, the maximum number of running instances in the pool. We then define three instance variables:

  • pool is the set of running processes that are idle and ready to be used.
  • active contains the list of processes currently being used.
  • waiting is a queue of callbacks for all the requests that could not be fulfilled immediately because no process was available.

The next piece of the ProcessPool class is the acquire() method, which is responsible for returning a process ready to be used:

acquire(callback) {
  let worker;
  if(this.pool.length > 0) {  // [1]
    worker = this.pool.pop();
    this.active.push(worker);
    return process.nextTick(callback.bind(null, null, worker));
  }

  if(this.active.length >= this.poolMax) {  // [2]
    return this.waiting.push(callback);
  }

  worker = fork(this.file);  // [3]
  this.active.push(worker);
  process.nextTick(callback.bind(null, null, worker));
}

The logic of the function is as follows:

  1. If there is a process in the pool ready to be used, we simply move it to the active list and then invoke the callback asynchronously (with process.nextTick()).
  2. If there are no idle processes in the pool and we have already reached the maximum number of running processes, we have to wait for one to become available. We do this by queuing the current callback in the waiting list.
  3. If we have not yet reached the maximum number of running processes, we create a new one using child_process.fork(), add it to the active list, and then invoke the callback.

The last method of the ProcessPool class is release(), whose purpose is to put a process back into the pool:

release(worker) {
  if(this.waiting.length > 0) {  // [1]
    const waitingCallback = this.waiting.shift();
    // hand the worker straight to the waiting task; it stays in the active list
    return waitingCallback(null, worker);
  }
  this.active = this.active.filter(w => worker !==  w);  // [2]
  this.pool.push(worker);
}

The previous code is also very simple, and its explanation is as follows:

  • If there is a task in the waiting list, we simply reassign the worker being released to it, passing the worker to the task's callback.
  • Otherwise, if no task is waiting, we remove the worker from the active list and put it back into the pool.
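To exercise the acquire()/release() logic without spawning real processes, here is a sketch of the same pool with the fork() call replaced by an injected factory (the createWorker parameter and the PoolSketch name are hypothetical). Its release() returns right after handing the worker to a waiting task, so that worker stays in the active list instead of also going back to the idle pool. With poolMax set to 1, the second task waits until the first one releases the only worker:

```javascript
// Same acquire()/release() logic as ProcessPool, but workers come from an
// injected factory (hypothetical createWorker) instead of fork().
class PoolSketch {
  constructor(createWorker, poolMax) {
    this.createWorker = createWorker;
    this.poolMax = poolMax;
    this.pool = [];    // idle workers
    this.active = [];  // workers currently assigned to a task
    this.waiting = []; // callbacks waiting for a worker
  }

  acquire(callback) {
    let worker;
    if (this.pool.length > 0) {
      worker = this.pool.pop();
      this.active.push(worker);
      return process.nextTick(callback, null, worker);
    }
    if (this.active.length >= this.poolMax) {
      return this.waiting.push(callback);
    }
    worker = this.createWorker();
    this.active.push(worker);
    process.nextTick(callback, null, worker);
  }

  release(worker) {
    if (this.waiting.length > 0) {
      const waitingCallback = this.waiting.shift();
      return waitingCallback(null, worker); // worker stays in `active`
    }
    this.active = this.active.filter(w => w !== worker);
    this.pool.push(worker);
  }
}

let created = 0;
const pool = new PoolSketch(() => ({ id: ++created }), 1);
const log = [];
pool.acquire((err, w) => {
  log.push(`task A got worker ${w.id}`);
  setImmediate(() => pool.release(w)); // pretend the task takes a while
});
pool.acquire((err, w) => {
  log.push(`task B got worker ${w.id}`);
  pool.release(w);
});
```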

As we can see, processes are never stopped, only reassigned, which saves time by not restarting a process at each request. However, it is important to observe that this might not always be the best choice; it greatly depends on the requirements of our application. Possible tweaks for reducing long-term memory usage and adding robustness to the process pool are:

  • Terminate idle processes to free memory after a certain period of inactivity.
  • Add a mechanism to kill unresponsive processes or restart those that have crashed.
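The first tweak could look like the following sketch. scheduleIdleKill() is a hypothetical helper that works on any object exposing a pool array like ProcessPool's; real workers returned by fork() are ChildProcess instances with a kill() method, while the demo below uses plain objects as fakes:

```javascript
// Hypothetical helper: after idleMs of inactivity, remove an idle worker
// from the pool and terminate it to free its memory.
function scheduleIdleKill(pool, worker, idleMs) {
  const timer = setTimeout(() => {
    const index = pool.pool.indexOf(worker);
    if (index !== -1) {            // still idle: nobody acquired it meanwhile
      pool.pool.splice(index, 1);
      worker.kill();               // ChildProcess#kill() sends SIGTERM by default
    }
  }, idleMs);
  timer.unref(); // do not keep the process alive just for this timer
  return timer;
}

// Demo with fake workers instead of real child processes.
const makeFakeWorker = () => ({ killed: false, kill() { this.killed = true; } });
const fakePool = { pool: [] };
const idleWorker = makeFakeWorker();
const busyWorker = makeFakeWorker();
fakePool.pool.push(idleWorker, busyWorker);
scheduleIdleKill(fakePool, idleWorker, 20);
scheduleIdleKill(fakePool, busyWorker, 20);
fakePool.pool.pop(); // busyWorker is acquired again before the timeout expires
```

In a real pool, one would call it whenever release() puts a worker back into pool, and clear the pending timer with clearTimeout() when acquire() hands that worker out again.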

Communicating with a child process

Now that our ProcessPool class is ready, we can use it to implement the SubsetSumFork wrapper, whose role is to communicate with the worker and expose the results it produces. As we already mentioned, starting a process with child_process.fork() also gives us a simple message-based communication channel, so let's see how this works by implementing the subsetSumFork.js module:

const EventEmitter = require('events').EventEmitter;
const ProcessPool = require('./processPool');
const workers = new ProcessPool(__dirname + '/subsetSumWorker.js', 2);

class SubsetSumFork extends EventEmitter {
  constructor(sum, set) {
    super();
    this.sum = sum;
    this.set = set;
  }

  start() {
    workers.acquire((err, worker) => {  // [1]
      worker.send({sum: this.sum, set: this.set});

      const onMessage = msg => {
        if (msg.event === 'end') {  // [3]
          worker.removeListener('message', onMessage);
          workers.release(worker);
        }

        this.emit(msg.event, msg.data);  // [4]
      };

      worker.on('message', onMessage);  // [2]
    });
  }
}

module.exports = SubsetSumFork;

The first thing to notice is that we create a ProcessPool instance, using the subsetSumWorker.js file (resolved relative to __dirname) as the program for the child workers. We also set the maximum capacity of the pool to 2.

Also worth noting is that we try to keep the same public API as the original SubsetSum class. In fact, SubsetSumFork is a subclass of EventEmitter whose constructor accepts sum and set, while the start() method triggers the execution of the algorithm, which this time runs in a separate process. This is what happens when start() is invoked:

  1. We try to acquire a new child process from the pool. As soon as we get one, we use the worker handle to send the child process a message containing the input of the job to run (sum and set). The send() API is automatically provided by Node.js to all processes started with child_process.fork(); this is essentially the communication channel we were talking about.
  2. We then start listening for any message returned by the worker process, using the on() method to attach a new listener (this is also part of the communication channel provided by all processes started with child_process.fork()).
  3. In the listener, we first check whether we received an end event, which means that the SubsetSum task has finished; in that case, we remove the onMessage listener and release the worker, putting it back into the pool so it can serve the next task.
  4. The worker produces messages in the format {event, data}, which allows us to seamlessly re-emit any event produced by the child process from the SubsetSumFork instance itself.
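The re-emission logic of start() can be tried in isolation by letting a plain EventEmitter play the part of the worker. The fakeWorker object and the released flag are stand-ins for the forked process and for workers.release(worker):

```javascript
const EventEmitter = require('events');

// Stand-ins: fakeWorker plays the forked child, facade plays SubsetSumFork.
const fakeWorker = new EventEmitter();
const facade = new EventEmitter();
let released = false;

const onMessage = msg => {
  if (msg.event === 'end') {
    fakeWorker.removeListener('message', onMessage);
    released = true; // in SubsetSumFork: workers.release(worker)
  }
  facade.emit(msg.event, msg.data); // re-emit {event, data} as a local event
};
fakeWorker.on('message', onMessage);

const received = [];
facade.on('match', data => received.push(data));
facade.on('end', () => received.push('end'));

fakeWorker.emit('message', { event: 'match', data: [1, 2, -3] });
fakeWorker.emit('message', { event: 'end' });
fakeWorker.emit('message', { event: 'match', data: [9] }); // ignored: listener removed
```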

That's it for the SubsetSumFork module; now let's implement the worker application.

Communicating with the parent process

Now let's create the subsetSumWorker.js module, our worker application. The entire content of this module will run in a separate process:

const SubsetSum = require('./subsetSum');

process.on('message', msg => {  // [1]
  const subsetSum = new SubsetSum(msg.sum, msg.set);
  
  subsetSum.on('match', data => {  // [2]
    process.send({event: 'match', data: data});
  });
  
  subsetSum.on('end', data => {
    process.send({event: 'end', data: data});
  });
  
  subsetSum.start();
});

Because our handler runs in a separate process, we don't have to worry about this kind of CPU-bound task blocking the event loop: all HTTP requests will continue to be handled by the event loop of the main application without interruption.

When the worker is started as a child process, this is what happens:

  1. The worker immediately starts listening for messages coming from the parent process, which is easily done with process.on(). The only message we expect from the parent is the one providing the input to a new SubsetSum task. As soon as such a message is received, we create a new instance of the SubsetSum class and register listeners for the match and end events. Finally, we start the computation with subsetSum.start().
  2. Every time the subset sum algorithm emits an event, we wrap it in an object with the format {event, data} and send it to the parent process. These messages are then handled by the subsetSumFork.js module, as we saw in the previous section.

Note: when the child process is not a Node.js program, the communication channel just described is not available. In this situation, we can still establish an interface for parent-child communication by implementing our own protocol on top of the standard input and standard output streams that the child exposes to the parent process.
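A minimal sketch of such a hand-rolled protocol, using newline-delimited JSON over the child's stdin and stdout. The message format is an assumption (any framing both sides agree on would do), and the child is written in Node.js only so that the example runs anywhere; the parent side would be the same for a program written in any other language:

```javascript
const { spawn } = require('child_process');
const readline = require('readline');

// Child program: one JSON object per line on stdin,
// one JSON reply per line on stdout (assumed framing).
const childSource = `
const rl = require('readline').createInterface({ input: process.stdin });
rl.on('line', line => {
  const msg = JSON.parse(line);
  process.stdout.write(JSON.stringify({ event: 'end', data: msg.value * 2 }) + '\\n');
});
`;

const child = spawn(process.execPath, ['-e', childSource], {
  stdio: ['pipe', 'pipe', 'inherit'], // stdin and stdout become our channel
});

const replies = [];
readline.createInterface({ input: child.stdout }).on('line', line => {
  replies.push(JSON.parse(line));
  child.stdin.end(); // no more requests: closing stdin lets the child exit
});

child.stdin.write(JSON.stringify({ value: 21 }) + '\n');
```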

Considerations on the multiprocess pattern

To try the new version of the subset sum algorithm, we only need to replace the module required by the HTTP server (file app.js) with subsetSumFork, in the same way we switched to subsetSumDefer earlier, and restart the server.

Even more interesting, we can try to start two subsetSum tasks at the same time and see them take advantage of a multi-core CPU. If instead we try to run three subsetSum tasks concurrently, the last one to start will hang. This is not because the event loop of the main process is blocked, but because we set a concurrency limit of two processes for the subsetSum task.

As we saw, the multiprocess pattern is more powerful and flexible than the interleaving pattern. However, it is still not scalable, because the amount of resources offered by a single machine remains a hard limit. In that case, the answer is to distribute the load across multiple machines.

It is worth mentioning that threads can be an alternative to processes when running CPU-bound tasks. Currently, a few npm packages expose an API for working with threads to userland modules; one of the most popular is webworker-threads. However, even though threads are more lightweight, full-fledged processes offer more flexibility and better isolation in case of problems such as freezing or crashing.

Summary

This chapter covers the following three points:

  • Asynchronously initialized modules
  • Batching and caching asynchronous operations in Node.js applications
  • Running CPU-bound tasks with interleaving (setImmediate()) or multiple processes