Asynchronous Control Flow Patterns with Callbacks in Node.js Design Patterns

  javascript, node.js

This series of articles is a translation of, and reading notes on, Node.js Design Patterns, Second Edition, serialized and updated on GitHub. Synchronized translation link.

Welcome to follow my column; subsequent blog posts will also be published there:

Asynchronous Control Flow Patterns with Callbacks

For developers accustomed to a synchronous programming style, a platform such as Node.js, where continuation-passing style (CPS) and asynchronous APIs are the standard, can be difficult to get used to. Writing asynchronous code is a different experience, especially where control flow is concerned: asynchronous code can make it hard to predict the order in which statements are executed in Node.js. Tasks such as reading a set of files, executing a series of operations in sequence, or waiting for a group of operations to complete require the developer to adopt new approaches and techniques, to avoid ending up with inefficient and unmaintainable code. One common mistake is callback hell: the amount of nesting grows sharply and the code becomes unreadable, making even simple programs hard to read and maintain. In this chapter, we will see how to avoid callback hell and write clean, manageable asynchronous code by applying some discipline and some common patterns. We will also see how control flow libraries such as async can greatly simplify these problems, improving the readability of our code and making it easier to maintain.

Difficulties in Asynchronous Programming

It is undoubtedly easy to lose control of asynchronous code in JavaScript. Closures and the in-place definition of anonymous functions give developers a smooth programming experience that doesn’t require manually managing and jumping between the points of an asynchronous operation. This is in line with the KISS principle: it is simple, it keeps the code in the place where it is used, and it gets the job done quickly. Unfortunately, nesting callbacks comes at the cost of modularity, reusability, and maintainability; it inflates the size of each function and leads to poor code structure. Most of the time, creating closures is not functionally needed, so it is more a matter of discipline than a problem inherent to asynchronous programming. Recognizing that callback nesting is making our code unwieldy, and then taking the appropriate measures with the most adequate solution, is what distinguishes a novice from an expert.

Creating a simple web crawler

To explain the problem, we will create a simple web crawler: a command-line application that takes a URL as input and downloads its contents into a local file. In the code that follows, we rely on two npm packages, request and mkdirp.

In addition, we will also reference a local module named ./utilities.
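
The implementation of ./utilities is not shown in this article. As a rough, hypothetical sketch (the real module from the book’s code repository differs and also provides getPageLinks(), used later in this chapter), urlToFilename() might look like this:

const urlParse = require('url').parse;
const path = require('path');

// Hypothetical sketch: map a URL to a local file path, e.g.
// "http://www.example.com/about" -> "www.example.com/about.html"
module.exports.urlToFilename = function(url) {
  const parsedUrl = urlParse(url);
  const pathname = parsedUrl.pathname === '/' ? '/index' : parsedUrl.pathname;
  let filename = path.join(parsedUrl.hostname, pathname);
  if (!path.extname(filename)) {
    filename += '.html';
  }
  return filename;
};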

The core functionality of our application is contained in a module named spider.js. As shown below, we first load the dependencies we need:

const request = require('request');
const fs = require('fs');
const mkdirp = require('mkdirp');
const path = require('path');
const utilities = require('./utilities');

Next, we create a new function named spider(), which takes the URL as an argument and invokes a callback function when the download process completes.

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  fs.exists(filename, exists => {
    if (!exists) {
      console.log(`Downloading ${url}`);
      request(url, (err, response, body) => {
        if (err) {
          callback(err);
        } else {
          mkdirp(path.dirname(filename), err => {
            if (err) {
              callback(err);
            } else {
              fs.writeFile(filename, body, err => {
                if (err) {
                  callback(err);
                } else {
                  callback(null, filename, true);
                }
              });
            }
          });
        }
      });
    } else {
      callback(null, filename, false);
    }
  });
}

The preceding function performs the following tasks:

  • Checks whether the URL has already been downloaded, by verifying that the corresponding file was not already created:

fs.exists(filename, exists => ...

  • If the file is not found, the URL is downloaded using the following line of code:

request(url, (err, response, body) => ...

  • Then, we make sure the directory that will contain the file exists:

mkdirp(path.dirname(filename), err => ...

  • Finally, we write the body of the HTTP response to the filesystem:

fs.writeFile(filename, body, err => ...

To complete our web crawler application, we just need to invoke the spider() function, providing a URL as input (in our case, we read it from the command-line arguments):

spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of "${filename}"`);
  } else {
    console.log(`"${filename}" was already downloaded`);
  }
});

Now, let’s try to run our web crawler application. But first, make sure the utilities.js module exists and that all the dependencies listed in package.json have been installed in your project:

npm install

After that, we can execute the crawler module to download a web page with a command like the following:

node spider http://www.example.com

Our web crawler application requires that the URL always include a protocol (for example, http://). Also, do not expect HTML links to be rewritten or resources such as images to be downloaded, as this is only a simple example to demonstrate how asynchronous programming works.

Callback hell

Looking at our spider() function, we can notice that even though the algorithm we implemented is really simple, the resulting code has several levels of indentation and is hard to read. Implementing a similar function with a blocking, synchronous API would be straightforward, and there would be little chance of making it look so wrong. However, using asynchronous CPS is another story, and making poor use of closures can lead to incredibly unreadable code.

The situation where an abundance of closures and in-place callback definitions transforms the code into an unreadable, unmanageable blob is known as callback hell. It is one of the most widely recognized and severe anti-patterns in Node.js, and in JavaScript in general. The typical structure of code affected by this problem looks as follows:

asyncFoo(err => {
  asyncBar(err => {
    asyncFooBar(err => {
      //...
    });
  });
});

We can see how code written in this way takes the shape of a pyramid; due to the deep nesting it is hard to read, which is why it is also known as the “pyramid of doom”.

The most obvious problem with code such as the preceding snippet is its poor readability. Because the nesting is so deep, it is almost impossible to keep track of where one callback ends and another begins.

Another problem is caused by the overlapping of the variable names used in each scope. Often, we have to use similar, or even identical, names to describe the content of a variable. The best example is the error argument received by each callback. Some people try to use variations of the same name to differentiate the object in each scope, for example, error, err, err1, err2, and so on. Others prefer to shadow the variable defined in the outer scope by always using the same name, for example, err. Both options are far from perfect, causing confusion and increasing the probability of introducing bugs.

Also, we have to keep in mind that, although closures come at a small price in terms of performance and memory consumption, they can create memory leaks that are not easy to identify: we should not forget that any context variable referenced by an active closure will not be released by the garbage collector.

For a good introduction to how closures work in V8, refer to Vyacheslav Egorov’s blog post on the topic.

If we look at our spider() function, we can clearly see that it is a typical callback hell scenario and exhibits all the problems we just described. That is exactly what the patterns and techniques we will learn in this chapter are meant to fix.

Using plain JavaScript

Now that we have met our first example of callback hell, we know what we should definitely avoid; however, that is not the only concern when writing asynchronous code. In fact, there are several situations where controlling the flow of a set of asynchronous tasks requires the use of specific patterns and techniques, especially if we are only using plain JavaScript without the aid of any external library. For example, iterating over a collection by applying an asynchronous operation in sequence is not as easy as invoking forEach() over an array; it actually requires a technique similar to recursion.

In this section, we will learn not only how to avoid callback hell, but also how to implement some of the most common control flow patterns using only plain JavaScript.

Criteria for callback functions

The first rule to keep in mind when writing asynchronous code is not to abuse closures when defining callbacks. Doing so can be tempting, as it does not require any additional thinking about problems such as modularization and reusability; however, we have seen how this practice can do more harm than good. Most of the time, fixing the callback hell problem does not require any library, fancy technique, or change of paradigm; it just requires some common sense.

Here are some basic principles that can help us keep the nesting level low and improve the organization of our code:

  • Exit the outer function as soon as possible. Depending on the context, use return, continue, or break to exit the current code block immediately, instead of writing (and nesting) complete if...else statements. This will help keep our code shallow.
  • Create named functions for callbacks, keeping them out of closures and passing intermediate results as arguments. Named functions also look better in stack traces (see the short sketch after this list).
  • Modularize the code as much as possible, splitting it into smaller, reusable functions whenever possible.
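
As a quick illustration of the second principle, the following sketch replaces an in-place closure with a named function; onFileRead is a name we introduce just for this example:

const fs = require('fs');

// In-place closure: harder to reuse, anonymous in stack traces
fs.readFile('data.txt', 'utf8', (err, data) => {
  if (err) {
    return console.error(err);
  }
  console.log(data);
});

// Named callback: reusable and visible by name in stack traces
function onFileRead(err, data) {
  if (err) {
    return console.error(err);
  }
  console.log(data);
}
fs.readFile('data.txt', 'utf8', onFileRead);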

Applying the callback principles

To demonstrate the principles above, let’s apply them to refactor our web crawler application.

As the first step, we can refactor our error-checking pattern by removing the else statement. This is made possible by returning from the function as soon as we receive an error. So, instead of code such as the following:

if (err) {
  callback(err);
} else {
  // code to execute when there are no errors
}

we can improve the organization of our code by writing the following instead:

if (err) {
  return callback(err);
}
// code to execute when there are no errors

With this simple trick, we immediately reduce the nesting level of our functions; it is very easy to do and does not require any complex refactoring.

A common mistake when applying the optimization we just described is forgetting to terminate the function after the callback is invoked, that is, omitting the return. For error-handling scenarios, the following code is a typical source of bugs:

if (err) {
  callback(err);
}
// code to execute when there are no errors

In this example, the execution of the function continues even after the callback is invoked; the return statement is therefore necessary to avoid this situation. Also note that it does not really matter what output the function returns: the real result (or error) is produced asynchronously and passed to the callback, and the return value of an asynchronous function is usually ignored. This property allows us to write shortcuts such as the following:

return callback(...);

Otherwise, we would have to split the statement in two:

callback(...);
return;

As the next step in refactoring our spider() function, we can try to identify reusable pieces of code. For example, the functionality that writes a given string to a file can easily be factored out into a separate function:

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

Following the same principle, we can create a generic function named download(), which takes a URL and a filename as input and downloads the contents of the URL into the given file. Internally, we can use the saveFile() function we created earlier:

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  });
}

Finally, we modify our spider() function:

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  fs.exists(filename, exists => {
    if (exists) {
      return callback(null, filename, false);
    }
    download(url, filename, err => {
      if (err) {
        return callback(err);
      }
      callback(null, filename, true);
    });
  });
}

The functionality and the interface of the spider() function remain exactly the same; only the organization of the code has changed. By applying the basic principles above, we were able to drastically reduce the nesting of our code while increasing its reusability and testability. In fact, we could consider exporting saveFile() and download(), so that we can reuse them in other modules. This would also make it easier to test their functionality.
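
For instance, a possible way to export them (our assumption; the original module keeps these helpers private to spider.js):

// Hypothetical exports at the bottom of spider.js:
module.exports.saveFile = saveFile;
module.exports.download = download;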

The refactoring we carried out in this section clearly demonstrates that, most of the time, all we need is some discipline to make sure we do not abuse closures and anonymous functions. It works brilliantly, requires minimal effort, and uses only plain JavaScript.

Sequential execution

We now begin our exploration of asynchronous control flow patterns, starting by analyzing the sequential execution flow.

Executing a set of tasks in sequence means running them one at a time, one after the other. The order of execution matters and must be preserved, because the result of one task in the list may affect the execution of the next. The following figure illustrates this concept:

There are different variations of this flow:

  • Executing a set of known tasks in sequence, without chaining or propagating results
  • Using the output of a task as the input for the next (also known as chain, pipeline, or waterfall)
  • Iterating over a collection while running an asynchronous task on each element, one after the other

Sequential execution, despite being trivial when implemented using blocking APIs, is usually the main cause of the callback hell problem when asynchronous CPS is used.

Executing a known set of tasks in sequence

We already encountered sequential execution while implementing the spider() function in the previous section. By applying the rules we studied there, we were able to keep sequential execution under control. Generalizing that code, we can express the pattern as follows:

function task1(callback) {
  asyncOperation(() => {
    task2(callback);
  });
}

function task2(callback) {
  asyncOperation(() => {
    task3(callback);
  });
}

function task3(callback) {
  asyncOperation(() => {
    callback(); //finally executes the callback
  });
}

task1(() => {
  //executed when task1, task2 and task3 are completed
  console.log('tasks 1, 2 and 3 executed');
});

The preceding pattern shows how each task invokes the next upon completion of a generic asynchronous operation. The pattern puts the emphasis on the modularization of tasks, showing how closures are not always necessary for handling asynchronous code.

Sequential iteration

The pattern we just described works perfectly if we know in advance what and how many tasks are to be executed. It allows us to hardcode the invocation of the next task in the sequence; but what happens if we want to execute an asynchronous operation for each item in a collection? In such a case, we can no longer hardcode the task sequence; instead, we have to build it dynamically.

Web crawler version 2

To show an example of sequential iteration, let’s introduce a new feature into the web crawler application. We now want to download all the links contained in a web page recursively. To do that, we are going to extract all the links from the page and then trigger our web crawler on each of them, recursively and in sequence.

The first step is modifying our spider() function so that it triggers a recursive download of all the links of a page by invoking a function named spiderLinks().

Also, instead of checking whether the file already exists, we now try to read it and start spidering its links; this way, we are able to resume interrupted downloads. As a final change, we pass a new parameter, nesting, which helps us limit the recursion depth. The resulting code is as follows:

function spider(url, nesting, callback) {
  const filename = utilities.urlToFilename(url);
  fs.readFile(filename, 'utf8', (err, body) => {
    if (err) {
      if (err.code !== 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }
    spiderLinks(url, body, nesting, callback);
  });
}

Crawling links

Now we can create the core of this new version of the web crawler application, the spiderLinks() function, which downloads all the links of an HTML page using a sequential asynchronous iteration algorithm. Pay attention to the way we define it in the following code block:

function spiderLinks(currentUrl, body, nesting, callback) {
  if(nesting === 0) {
    return process.nextTick(callback);
  }

  let links = utilities.getPageLinks(currentUrl, body); //[1]
  function iterate(index) { //[2]
    if(index === links.length) {
      return callback();
    }

    spider(links[index], nesting - 1, function(err) { //[3]
      if(err) {
        return callback(err);
      }
      iterate(index + 1);
    });
  }
  iterate(0); //[4]
}

The important steps to understand in this new function are as follows:

  1. We obtain the list of all the links contained in the page using the utilities.getPageLinks() function. This function returns only the links pointing to the same hostname.
  2. We iterate over the links using a local function called iterate(), which takes the index of the next link to analyze. In this function, the first thing we do is check whether the index is equal to the length of the links array; if so, we immediately invoke the callback() function, as it means we have processed all the items.
  3. At this point, everything is ready for processing the link, so we invoke the spider() function recursively, decreasing the nesting level.
  4. As the last, and most important, step of the spiderLinks() function, we bootstrap the iteration by invoking iterate(0).

The algorithm we just presented allows us to iterate over an array by executing an asynchronous operation in sequence, which in our case is the spider() function.

We can now try this new version of the web crawler application and watch it recursively download all the links of a web page, one after the other. To interrupt the process, which can take a while if the number of links is large, remember that we can always use Ctrl + C. If we then decide to resume it, we can do so by launching the web crawler application with the same URL.

Now that our web crawler application can potentially trigger the download of an entire website, please consider using it carefully. For example, do not set a high nesting level and do not leave the crawler running for more than a few seconds. It is not polite to overload a server with thousands of requests, and in some circumstances it can also be considered illegal. Consider the consequences!

The iteration pattern

The code of the spiderLinks() function we just showed is a clear example of how it is possible to iterate over a collection while applying an asynchronous operation. We can also notice that it is a pattern that can be adapted to any other situation where we need to iterate asynchronously, in sequence, over the elements of a collection or, in general, over a list of tasks. The pattern can be generalized as follows:

function iterate(index) {
  if (index === tasks.length) {
    return finish();
  }
  const task = tasks[index];
  task(function() {
    iterate(index + 1);
  });
}

function finish() {
  // operations to run when the iteration completes
}

iterate(0);

It is important to notice that if task() is a synchronous operation, these types of algorithms become truly recursive. In such a case, the call stack may overflow.
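
One possible guard, shown here as a sketch of our own variation, is to defer each step with process.nextTick() so that the call stack unwinds between iterations even when task() completes synchronously:

function iterate(index) {
  if (index === tasks.length) {
    return finish();
  }
  const task = tasks[index];
  // schedule the invocation on the next tick: the current stack frame
  // returns before the task runs, so deep recursion cannot build up
  process.nextTick(() => {
    task(() => iterate(index + 1));
  });
}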

The pattern we just presented is very powerful, as it can adapt to several situations. For example, we can map the values of an array, or we can pass the result of one iteration to the next to implement a reduce algorithm; we can quit the loop prematurely if a particular condition is met, or we can even iterate over an infinite number of elements.

We could also choose to generalize the solution even further, wrapping it in a function with a signature such as the following:

iterateSeries(collection, iteratorCallback, finalCallback);

This can be achieved by creating a function called iterateSeries(), which invokes iteratorCallback for each item in the collection, making sure to move on to the next element when the current task completes, and invoking finalCallback when the iteration ends.
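
A minimal sketch of such a helper, built directly on the iteration pattern above (only the signature comes from the text; the body is our assumption):

function iterateSeries(collection, iteratorCallback, finalCallback) {
  function iterate(index) {
    if (index === collection.length) {
      return finalCallback();
    }
    // invoke the user-provided task for the current item
    iteratorCallback(collection[index], err => {
      if (err) {
        return finalCallback(err);
      }
      iterate(index + 1); // move on to the next element
    });
  }
  iterate(0);
}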

Parallel execution

In some situations, the order of execution of a set of asynchronous tasks is not important, and all we want is to be notified when all of the running tasks are complete. Such cases are better handled using a parallel execution flow, as shown in the following figure:

This may sound strange if we consider that Node.js is single-threaded, but if we remember what we discussed in Chapter 1, we realize that even though we have just one thread, we can still achieve concurrency, thanks to the non-blocking nature of Node.js. In fact, the word parallel is used improperly in this case, as it does not mean that the tasks run simultaneously, but rather that their execution is carried out by underlying, non-blocking APIs and interleaved by the event loop.

As we know, a task gives control back to the event loop when it requests a new asynchronous operation, allowing the event loop to execute another task. The proper name for this kind of flow is concurrency, but for simplicity we will still use the term parallel.

The following figure shows how two asynchronous tasks can run in parallel in a Node.js program:

In the figure, we have a Main function that executes two asynchronous tasks:

  1. The Main function triggers the execution of Task 1 and Task 2. As they start asynchronous operations, the two functions return immediately, giving control back to Main; the event loop will notify them when their asynchronous operations complete.
  2. When the asynchronous operation of Task 1 completes, the event loop gives it control. When Task 1 completes its internal synchronous processing, it notifies the Main function.
  3. When the asynchronous operation of Task 2 completes, the event loop gives it control. When Task 2 completes, the Main function is notified once more. At this point, the Main function knows that both Task 1 and Task 2 are complete, so it can continue its execution or return the results of the operations to another callback.

In short, this means that in Node.js we can only execute asynchronous operations in parallel, because their concurrency is handled internally by the non-blocking APIs. In Node.js, synchronous (blocking) operations cannot run concurrently unless their execution is interleaved with asynchronous operations, or deferred with setTimeout() or setImmediate(). We will see this in more detail in Chapter 9.
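
To illustrate that last point, here is a rough sketch (our own example, not from the book; doSyncWork() is a hypothetical blocking step) of how a synchronous, CPU-bound loop can be interleaved with other tasks by deferring each step with setImmediate():

function processChunks(chunks, done) {
  function processNext(index) {
    if (index === chunks.length) {
      return done();
    }
    doSyncWork(chunks[index]); // hypothetical synchronous, CPU-bound step
    // defer the next step so other tasks queued on the event loop can run
    setImmediate(() => processNext(index + 1));
  }
  processNext(0);
}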

Web crawler version 3

Our web crawler application seems a perfect candidate for applying parallel asynchronous operations. So far, the application executes the recursive download of the linked pages in sequence. The performance is therefore not optimal, and it is easy to improve it.

To do that, we just need to modify the spiderLinks() function to make sure that all the spider() tasks are started at once, and that the final callback is invoked only when all of them have completed their execution. So let’s modify spiderLinks() as follows:

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  let completed = 0,
    hasErrors = false;

  function done(err) {
    if (err) {
      hasErrors = true;
      return callback(err);
    }
    if (++completed === links.length && !hasErrors) {
      return callback();
    }
  }
  links.forEach(link => {
    spider(link, nesting - 1, done);
  });
}

What changed in the preceding code? Now, all the spider() tasks are started at once. This is made possible by simply iterating over the links array and starting each task without waiting for the previous one to finish:

links.forEach(link => {
  spider(link, nesting - 1, done);
});

The trick to making our application wait for all the tasks to complete is to provide the spider() function with a special callback, which we call done(). The done() function increments a counter each time a spider task completes; when the number of completed downloads reaches the size of the links array, the final callback is invoked:

function done(err) {
  if (err) {
    hasErrors = true;
    return callback(err);
  }
  if (++completed === links.length && !hasErrors) {
    callback();
  }
}

With this change in place, if we now try to run our crawler against a web page, we will notice a huge improvement in the speed of the overall process, as every download is carried out in parallel, without waiting for the previous link to be processed.

The pattern

Also, for the parallel execution flow, we can extract our solution into a reusable pattern that adapts to different situations and improves code reuse. We can represent a generic version of the pattern with the following code:

const tasks = [ /* ... */ ];
let completed = 0;
tasks.forEach(task => {
  task(() => {
    if (++completed === tasks.length) {
      finish();
    }
  });
});

function finish() {
  // invoked after all the tasks have completed
}

With small modifications, we can adapt the pattern to accumulate the results of each task into a collection, to filter or map the elements of an array, or to invoke the finish() callback as soon as one task, or a given number of tasks, has completed.

Note: this pattern, in which a set of asynchronous tasks is executed in parallel without limits and we wait for all of them to complete before invoking a callback, works by counting the number of tasks that have finished executing; it is known as unlimited parallel execution.
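
As an example of the first variation mentioned above, here is a hedged sketch that accumulates each task’s result into an array, assuming every task follows the Node.js convention and invokes its callback as callback(err, result):

const tasks = [ /* ... */ ];
const results = [];
let completed = 0;
tasks.forEach((task, index) => {
  task((err, result) => {
    if (err) {
      return finish(err);
    }
    results[index] = result; // keep results aligned with the task order
    if (++completed === tasks.length) {
      finish(null, results);
    }
  });
});

function finish(err, results) {
  // invoked with the collected results (or the first error)
}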

Fixing race conditions with concurrent tasks

Running a set of tasks in parallel can cause issues when blocking I/O is used in combination with multiple threads. However, we have just seen that in Node.js the situation is totally different: running multiple asynchronous tasks in parallel is straightforward and cheap in terms of resources. This is one of the most important strengths of Node.js; it makes parallelization a common practice rather than a complex technique.

Another important characteristic of the concurrency model of Node.js is the way we deal with task synchronization and race conditions. In multithreaded programming, this is usually done using constructs such as locks, mutexes, semaphores, and monitors, and it can be one of the most complex aspects of parallelization in multithreaded languages, with a considerable impact on performance. In Node.js, we usually don’t need any fancy synchronization mechanism, as everything runs on a single thread! However, this doesn’t mean that we can’t have race conditions; on the contrary, they can be quite common. The root of the problem is the delay between the invocation of an asynchronous operation and the notification of its result. For a concrete example, we can refer again to our web crawler application, in particular the last version we created, which actually contains a race condition.

The problem lies in the spider() function, where we check whether a file already exists before starting to download the corresponding URL:

function spider(url, nesting, callback) {
  const filename = utilities.urlToFilename(url);
  fs.readFile(filename, 'utf8', function(err, body) {
    if(err) {
      if(err.code !== 'ENOENT') {
        return callback(err);
      }

      return download(url, filename, function(err, body) {
        if(err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

The problem is that two spider tasks operating on the same URL might invoke fs.readFile() on the same file before one of the two tasks completes its download and creates the file, causing both tasks to start downloading. This situation is shown in the following figure:

The figure shows how Task 1 and Task 2 are interleaved in the single thread of Node.js, and how an asynchronous operation can actually introduce a race condition. In our case, the two spider tasks end up downloading the same file.
How can we fix that? The answer is much simpler than we might think. In fact, all we need is a variable to mutually exclude multiple spider() tasks running on the same URL. This can be achieved with code such as the following:

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }
  spidering.set(url, true);
  // ...
}

Limited parallel execution

Often, spawning parallel tasks without controlling their number leads to excessive load. Imagine having thousands of files to read, URLs to access, or database queries to run in parallel. A common problem in such situations is running out of resources; for example, an application might use up all of the file descriptors available to it by trying to open too many files at once. In a web application, it may also create a vulnerability that can be exploited by denial-of-service (DoS) attacks. In all such cases, it is a good practice to limit the number of tasks that can run at the same time. This way, we can add some predictability to the load on the server and ensure that our application does not run out of resources. The following figure describes a situation in which five tasks run in parallel with a concurrency limit of two:

From the figure, it should be clear how our algorithm works:

  1. Initially, we spawn as many tasks as we can without exceeding the concurrency limit.
  2. Then, every time a task completes, we spawn one or more tasks until we reach the limit again.

Limiting concurrency

We now present a pattern to execute a given set of tasks in parallel with limited concurrency:

const tasks = [ /* ... */ ];
let concurrency = 2, running = 0, completed = 0, index = 0;

function next() {
  while (running < concurrency && index < tasks.length) {
    const task = tasks[index++];
    task(() => {
      completed++, running--;
      if (completed === tasks.length) {
        return finish();
      }
      next();
    });
    running++;
  }
}
next();

function finish() {
  // all the tasks have completed
}

This algorithm can be considered a mixture of sequential and parallel execution. In fact, we might notice similarities with both of the patterns we presented earlier:

  1. We have an iterator function, which we call next(), with an inner loop that spawns, in parallel, as many tasks as possible while staying within the concurrency limit.
  2. The callback we pass to each task checks whether all the tasks in the list have completed. If there are still tasks to run, it invokes next() to spawn the next one.

Globally limiting concurrency

Our web crawler application is a perfect candidate for applying what we have just learned about limiting the concurrency of a set of tasks. In fact, to avoid spidering thousands of links at the same time, we can enforce a limit on the number of concurrent downloads.

Versions of Node.js before 0.11 already limited the number of concurrent HTTP connections per host to 5. This could, however, be changed to accommodate our needs; see the official documentation at http://nodejs.org/docs/v0.10… (the http.Agent maxSockets option) for more details. Starting from Node.js 0.11, there is no default limit on the number of concurrent connections.
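
As an illustration (a sketch relevant mainly to those older Node.js versions), the per-host limit could be raised through the global agent:

const http = require('http');

// On Node.js < 0.11 the default was 5; on modern versions it is Infinity.
http.globalAgent.maxSockets = 10;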

We could apply the pattern we just learned to our spiderLinks() function, but what we would obtain is only a limit on the concurrency of the set of links found in a single page. If we chose, for example, a concurrency of 2, we would download at most two links in parallel for each page. However, since each of those downloads can in turn spawn another two downloads, the recursion means the total amount of concurrency is not really limited at all.

Using a queue

What we really want, then, is to limit the global number of download operations that can run in parallel. We could slightly modify the pattern shown before, but we prefer to leave that as an exercise, because we want to take this opportunity to introduce another mechanism that makes use of queues to limit the concurrency of multiple tasks. Let’s see how this works.

We are now going to implement a simple class named TaskQueue, which combines a queue with the algorithm we presented before. We create a new module named taskQueue.js:

class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }
  pushTask(task) {
    this.queue.push(task);
    this.next();
  }
  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
};

The constructor of this class takes, as input, only the concurrency limit; besides that, it initializes the variables running and queue. The former is a counter used to keep track of all the running tasks, while the latter is the array that will be used as a queue to store the pending tasks.

The pushTask() method simply adds a new task to the queue and then bootstraps the execution of the worker by invoking this.next().

The next() method spawns a set of tasks from the queue, ensuring that the concurrency limit is not exceeded.

We might notice some similarities between this method and the pattern that limits concurrency we presented earlier. It essentially starts as many tasks from the queue as possible, without exceeding the concurrency limit. When each task completes, it updates the count of running tasks and then starts another task by invoking next() again. An interesting property of the TaskQueue class is that it allows us to dynamically add new items to the queue. The other advantage is that now we have a central entity responsible for limiting the concurrency of our tasks, which can be shared across all the instances of a function’s execution. In our case, that function is spider(), as we will see in a moment.
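
Before we use it in the crawler, here is a minimal usage sketch. It assumes taskQueue.js exports the class with module.exports = TaskQueue, which is not shown in the listing above:

const TaskQueue = require('./taskQueue');
const queue = new TaskQueue(2); // run at most 2 tasks at a time

for (let i = 0; i < 5; i++) {
  queue.pushTask(done => {
    // simulate an asynchronous operation with setTimeout()
    setTimeout(() => {
      console.log(`task ${i} completed`);
      done(); // tell the queue it can start the next task
    }, 100);
  });
}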

Web crawler version 4

Now that we have our generic queue to execute tasks in a limited parallel flow, let’s use it straightaway in our web crawler application. We first load the new dependency and create a new instance of the TaskQueue class, setting the concurrency limit to 2:

const TaskQueue = require('./taskQueue');
const downloadQueue = new TaskQueue(2);

Next, we update the spiderLinks() function using the newly created downloadQueue:

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  let completed = 0,
    hasErrors = false;
  links.forEach(link => {
    downloadQueue.pushTask(done => {
      spider(link, nesting - 1, err => {
        if (err) {
          hasErrors = true;
          return callback(err);
        }
        if (++completed === links.length && !hasErrors) {
          callback();
        }
        done();
      });
    });
  });
}

This new implementation of the function is extremely easy, and it is very similar to the unlimited parallel execution algorithm we presented earlier in this chapter. This is because we delegate the concurrency control to the TaskQueue object; the only thing we have to do is check whether all the tasks are complete. The interesting part of the preceding code is how our tasks are defined:

  • We run the spider() function by providing a custom callback.
  • In the callback, we check whether all the tasks related to this spiderLinks() execution are completed. When this condition is true, we invoke the final callback of the spiderLinks() function.
  • At the end of a task, we invoke the done() callback so that the queue can continue its execution.

After applying these small changes, we can now try to run the web crawler application again. This time, we should notice that no more than two downloads will be active at the same time.

The async library

If we take a look for a moment at every control flow pattern we have analyzed so far, we can see that they could be used as a base to build reusable and more generic solutions. For example, we could wrap the unlimited parallel execution algorithm into a function that accepts a list of tasks, runs them in parallel, and invokes a given callback when all of them are complete. This way of wrapping control flow algorithms into reusable functions can lead to a more declarative and expressive way of defining asynchronous control flows, and that is exactly what async does. The async library is a very popular solution, in Node.js and JavaScript in general, for dealing with asynchronous code. It offers a set of functions that greatly simplify the execution of a set of tasks in different configurations, and it also provides useful helpers for dealing with collections asynchronously. Even though there are several other libraries with a similar goal, async is a de facto standard in Node.js due to its popularity.

Sequential execution

The async library can greatly help us when implementing complex asynchronous control flows, but one difficulty is choosing the right helper for the problem at hand. For instance, for the case of sequential execution, there are around 20 different functions to choose from, including eachSeries(), mapSeries(), filterSeries(), rejectSeries(), reduce(), reduceRight(), detectSeries(), concatSeries(), series(), whilst(), doWhilst(), until(), doUntil(), forever(), waterfall(), compose(), seq(), applyEachSeries(), iterator(), and timesSeries().

Choosing the right function is an important step toward writing more compact and readable code, but it also requires some experience and practice. In our examples, we are going to cover only a few of these functions, but they will still provide a solid base for understanding and using the rest of the library effectively.

Now, to show by example how async works, we will apply it to our web crawler application. We start directly from version 2, the one that downloads all the links recursively, in sequence.

But first of all, let’s make sure the async library is installed in our current project:

npm install async

Then, we load the new dependency from the spider.js module:

const async = require('async');

Sequential execution of a known set of tasks

Let’s first modify the download() function. As shown below, it does the following three things, in sequence:

  1. Downloads the contents of a URL.
  2. Creates a new directory if it does not exist yet.
  3. Saves the contents of the URL into a file.

The ideal helper for executing a set of tasks in sequence is async.series():

async.series(tasks, [callback])

async.series() takes a list of tasks and a callback function, invoked after all the tasks have completed, as arguments. Each task is just a function that accepts a callback, which must be invoked when the task completes its execution:

function task(callback) {}

The nice thing about async is that it uses the same callback conventions as Node.js, and it automatically handles error propagation. So, if any of the tasks invokes its callback with an error, async will skip the remaining tasks in the list and jump directly to the final callback.

With this in mind, let’s see how we can modify the download() function above using async:

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  let body;
  async.series([
    callback => {
      request(url, (err, response, resBody) => {
        if (err) {
          return callback(err);
        }
        body = resBody;
        callback();
      });
    },
    mkdirp.bind(null, path.dirname(filename)),
    callback => {
      fs.writeFile(filename, body, callback);
    }
  ], err => {
    if (err) {
      return callback(err);
    }
    console.log(`Downloaded and saved: ${url}`);
    callback(null, body);
  });
}

Compared to the callback hell version of this code, using async lets us better organize our asynchronous tasks. The callbacks are no longer nested, because we only have to provide a flat list of tasks, usually one for each asynchronous operation, which async will then execute in sequence:

  1. First, we download the contents of the URL. We save the response body into a closure variable (body) so that it can be shared with the other tasks.
  2. We create and save the directory that will contain the downloaded page. We do this by executing the mkdirp() function bound to the path of the directory to create. This way, we save a few lines of code and increase readability.
  3. Finally, we write the contents of the downloaded URL to a file. In this case, we could not perform a partial application (as we did in the second task), because the variable body is only available after the download task in the series completes. However, we can still save some lines of code by exploiting the automatic error management of async and simply passing the callback of the task directly to the fs.writeFile() function.
  4. After all the tasks are complete, the final callback of async.series() is invoked. In our case, we simply do some error management and then return the body variable to the callback of the download() function.

For the above case, a possible alternative to async.series() is async.waterfall(), which still executes the tasks in sequence, but also provides the output of each task as the input of the next. In our situation, we could use this feature to propagate the body variable until the end of the sequence.
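
As a sketch of that alternative (our own adaptation, not code from the book), download() could be rewritten with async.waterfall() as follows:

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  async.waterfall([
    // the first task receives only a callback; each subsequent task
    // receives the values passed by the previous one, plus a callback
    done => request(url, (err, response, body) => done(err, body)),
    (body, done) => mkdirp(path.dirname(filename), err => done(err, body)),
    (body, done) => fs.writeFile(filename, body, err => done(err, body))
  ], (err, body) => {
    if (err) {
      return callback(err);
    }
    console.log(`Downloaded and saved: ${url}`);
    callback(null, body);
  });
}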

Sequential iteration

Earlier in this section, we saw how to execute a set of known tasks in sequence, using async.series() in the example above. We could use the same function to implement the spiderLinks() function of web crawler version 2; however, async offers a more appropriate API for the specific situation in which we have to iterate over a collection: async.eachSeries(). Let’s use it to reimplement our spiderLinks() function (version 2, serial download), as follows:

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  async.eachSeries(links, (link, callback) => {
    spider(link, nesting - 1, callback);
  }, callback);
}

If we compare this code, which uses async, with the version of the same function implemented with plain JavaScript patterns, we will notice the big advantage async gives us in terms of code organization and readability.

Parallel execution

async does not lack functions for handling parallel flows; among them, we can find each(), map(), filter(), reject(), detect(), some(), every(), concat(), parallel(), applyEach(), and times(). They follow the same logic as the functions we have already seen for sequential execution, with the difference that the tasks provided are executed in parallel.

To demonstrate this, we can try to apply one of these functions to implement version 3 of our web crawler application, the one performing the downloads using an unlimited parallel flow.

If we remember the code we used earlier to implement the sequential version of the spiderLinks() function, adapting it to work in parallel is a trivial task:

function spiderLinks(currentUrl, body, nesting, callback) {
  // ...
  async.each(links, (link, callback) => {
    spider(link, nesting - 1, callback);
  }, callback);
}

This function is exactly the same as the one we used for the sequential download, but it uses async.each() instead of async.eachSeries(). This clearly demonstrates the power of abstracting asynchronous flows with a library such as async: the code is no longer bound to a particular execution flow, there is no code specifically written for that, and most of it is just application logic.

Limited parallel execution

If you are wondering whether async can also be used to limit the concurrency of parallel tasks, the answer is yes, it can! There are a few functions we can use for that, namely eachLimit(), mapLimit(), parallelLimit(), queue(), and cargo().

Let’s try to use one of them to implement version 4 of the web crawler application, the one that executes the download of links in parallel with limited concurrency. Fortunately, async has async.queue(), which works in a way similar to the TaskQueue class we created earlier in this chapter. The async.queue() function creates a new queue, which uses a worker() function to execute a set of tasks with a specified concurrency limit:

const q = async.queue(worker, concurrency);

The worker() function receives, as input, the task to run and a callback function to invoke when the task completes:

function worker(task, callback);

We should note that task, in this example, can be of any type, not just a function. In fact, it is the responsibility of the worker to handle the task in the most appropriate way. New tasks can be added to the queue with q.push(task, callback). The callback associated with a task must be invoked by the worker after the task has been processed.

Now, let’s modify our code once more to implement a globally limited parallel execution flow using async.queue(). First, we need to create a new queue:

const downloadQueue = async.queue((taskData, callback) => {
  spider(taskData.link, taskData.nesting - 1, callback);
}, 2);

The code is very simple: we create a new queue with a concurrency limit of 2, whose worker simply invokes our spider() function with the data associated with a task. Next, we implement the spiderLinks() function:

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  let completed = 0,
    hasErrors = false;
  links.forEach(function(link) {
    const taskData = {
      link: link,
      nesting: nesting
    };
    downloadQueue.push(taskData, err => {
      if (err) {
        hasErrors = true;
        return callback(err);
      }
      if (++completed === links.length && !hasErrors) {
        callback();
      }
    });
  });
}

The previous code should look very familiar, as it is almost identical to the version that implemented the same flow using the TaskQueue object. Also, in this case, the important part to analyze is where a new task is pushed into the queue. At that point, we make sure to pass a callback that lets us check whether all the download tasks for the current page are completed, and eventually invoke the final callback.

Thanks to async.queue(), we were able to easily replicate the functionality of our TaskQueue object, which again proves that with async we can avoid writing asynchronous control flow patterns from scratch, reducing our effort and keeping the amount of code more concise.

Summary

At the beginning of this chapter, we stated that programming in Node.js can be tough because of its asynchronous nature, especially for people used to developing on other platforms. However, in this chapter we showed how asynchronous APIs can be tamed, starting from plain JavaScript, which laid the foundation for analyzing more complex techniques. We then saw that, in addition to offering a programming style for every taste, the tools at our disposal are indeed varied and provide good solutions to most of our problems. For example, we can choose the async library to simplify the most common flows.

There are also more advanced techniques, such as promises and generator functions, which will be the focus of the next chapter. When you are familiar with all of these techniques, you will be able to choose the best solution for your needs, or use several of them together in the same project.