Analysis of EventEmitter (Event Publish/Subscribe node)

  javascript, node.js

Let’s start with the asynchronous programming solutions available in Node.js:

  • Event publish/subscribe pattern
  • Promise/deferred pattern
  • Flow-control libraries

Event publish/subscribe mode

The event listener pattern is widely used in asynchronous programming. It is an event-driven form of callback functions and is also called the publish/subscribe pattern.

The main functions implemented include

  • on
  • remove
  • once
  • emit

Without further ado, let’s implement a simple event emitter.

First create an eventEmitter function

/**
 * Constructs an emitter whose listener registry starts out empty.
 */
function EventEmitter() {
  // A dictionary built with Object.create(null) has no prototype chain, so it
  // holds nothing but the event queues stored on it (a plain {} would also
  // expose inherited keys such as "toString").
  const registry = Object.create(null);

  // _events maps each event type to its queue of listener callbacks.
  this._events = registry;
}

Too many listeners consume a lot of memory and may indicate a memory leak, so the number of listeners per event is generally kept to at most 10; beyond that, a warning is issued.
The following are some default settings:

// Default maximum number of listeners per event type.
EventEmitter.defaultMaxListeners = 10;

/**
 * Alias of `on`.
 *
 * Implemented as a delegating function rather than a direct copy of
 * `EventEmitter.prototype.on`: `on` is only assigned to the prototype later,
 * so copying the reference at this point would store `undefined`.
 *
 * @param {string} type - Event type to listen for.
 * @param {Function} cb - Listener callback.
 * @param {boolean} [flag] - When truthy, the listener is added to the head of the queue.
 * @returns {*} Whatever `on` returns.
 */
EventEmitter.prototype.addListener = function (type, cb, flag) {
  return this.on(type, cb, flag);
};

/**
 * Returns the names of the events that currently have a listener queue.
 * @returns {string[]} Own enumerable string keys of `_events`.
 */
EventEmitter.prototype.eventNames = function () {
  return Object.keys(this._events);
};

/**
 * Sets the maximum number of listeners for this emitter.
 * @param {number} n - New per-instance limit.
 */
EventEmitter.prototype.setMaxListeners = function (n) {
  this._count = n;
};

/**
 * Returns the maximum number of listeners for this emitter.
 * @returns {number} The per-instance limit if one was set, otherwise
 *   `EventEmitter.defaultMaxListeners`.
 */
EventEmitter.prototype.getMaxListeners = function () {
  // defaultMaxListeners is a property of the constructor, not of instances,
  // so it must be read from EventEmitter rather than from `this`.
  return this._count !== undefined ? this._count : EventEmitter.defaultMaxListeners;
};

Next is the implementation of the on function

/**
 * Registers `cb` as a listener for `type`.
 *
 * Unless `type` is 'newListener' itself, every 'newListener' callback is first
 * invoked with the event type and the callback being added.
 *
 * @param {string} type - Event type to listen for.
 * @param {Function} cb - Listener callback.
 * @param {boolean} [flag] - When truthy, `cb` is added to the head of the
 *   queue instead of the tail.
 * @returns {EventEmitter} This emitter, to allow chaining.
 * @throws {TypeError} If `cb` is not a function.
 */
EventEmitter.prototype.on = function (type, cb, flag) {
  if (typeof cb !== 'function') {
    throw new TypeError('The listener must be a function');
  }
  // Emitters that never ran the constructor get their registry on first use.
  if (!this._events) {
    this._events = Object.create(null);
  }
  if (type !== 'newListener' && this._events.newListener) {
    // Iterate over a copy so callbacks may add or remove 'newListener' listeners.
    this._events.newListener.slice().forEach((listener) => {
      listener.call(this, type, cb);
    });
  }

  let queue = this._events[type];
  if (!queue) {
    queue = [];
    this._events[type] = queue;
  }
  if (flag) {
    queue.unshift(cb);
  } else {
    queue.push(cb);
  }

  // Warn once per queue when the number of listeners exceeds the limit.
  const max = this.getMaxListeners();
  if (max > 0 && queue.length > max && !queue.warned) {
    queue.warned = true;
    console.warn(
      `Warning: possible memory leak, ${queue.length} "${String(type)}" listeners added (limit is ${max})`
    );
  }
  return this;
};

Explanation:
The on function registers a listener. First, unless the event being registered is newListener itself, every newListener callback is invoked with the event type.
Next, the queue stored under type in the _events object is looked up, and the callback is added to the head or the tail of that queue depending on flag.

Next is the implementation method of once monitoring

/**
 * Registers a listener that runs at most once for `type`.
 *
 * The callback is wrapped in a function that removes itself before calling
 * `cb`, and the wrapper is registered through `on`.
 *
 * @param {string} type - Event type to listen for.
 * @param {Function} cb - Listener callback.
 * @param {boolean} [flag] - Passed through to `on`.
 * @throws {TypeError} If `cb` is not a function.
 */
EventEmitter.prototype.once = function (type, cb, flag) {
  if (typeof cb !== 'function') {
    throw new TypeError('The listener must be a function');
  }
  // Capture the emitter so the wrapper does not depend on how it is called.
  const emitter = this;
  function wrap(...args) {
    // Remove first: the listener is gone even if cb throws or re-emits `type`.
    emitter.removeListener(type, wrap);
    return cb.apply(emitter, args);
  }
  // Keep a reference to the original callback so removeListener(type, cb)
  // can still find the wrapper.
  wrap.listen = cb;
  this.on(type, wrap, flag);
};

Explanation:
once wraps the callback in a function that also removes the listener, and registers that wrapper through on as the new callback.
When the event fires for the first time, the wrapper runs the callback and removes the listener, so the callback is executed only once.

Next is the remove function, which removes a listener for type

/**
 * Removes every occurrence of `cb` from the listener queue of `type`.
 *
 * A queue entry matches when it is `cb` itself or when its `listen` property
 * is `cb` (the wrapper created by `once`).
 *
 * @param {string} type - Event type whose queue is filtered.
 * @param {Function} cb - Listener callback to remove.
 * @returns {EventEmitter} This emitter, to allow chaining.
 */
EventEmitter.prototype.removeListener = function (type, cb) {
  const queue = this._events && this._events[type];
  if (!queue) {
    return this;
  }
  const kept = queue.filter((listener) => listener !== cb && listener.listen !== cb);
  if (kept.length === 0) {
    // Drop empty queues so eventNames() only reports events that have listeners.
    delete this._events[type];
  } else {
    this._events[type] = kept;
  }
  return this;
};

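Explanation:
Pass in the type and the callback to delete, then filter the array stored under type: a listener is filtered out if cb === listener (or if cb === listener.listen, which matches a callback registered through once).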

Delete all

/**
 * Removes listeners in bulk.
 *
 * @param {string} [type] - When given, only the queue of this event type is
 *   removed; when omitted, the whole registry is reset.
 * @returns {EventEmitter} This emitter, to allow chaining.
 */
EventEmitter.prototype.removeAllListener = function (type) {
  if (type === undefined || !this._events) {
    this._events = Object.create(null);
  } else {
    delete this._events[type];
  }
  return this;
};
// Same function under the plural name used by the Node.js events API.
EventEmitter.prototype.removeAllListeners = EventEmitter.prototype.removeAllListener;

Next is the publish function, emit

/**
 * Calls every listener registered for `type`, in queue order, with `this`
 * bound to the emitter and `args` as arguments.
 *
 * @param {string} type - Event type to publish.
 * @param {...*} args - Arguments passed to each listener.
 * @returns {boolean} `true` if the event had listeners, `false` otherwise.
 */
EventEmitter.prototype.emit = function (type, ...args) {
  const queue = this._events && this._events[type];
  if (!queue || queue.length === 0) {
    return false;
  }
  // Iterate over a snapshot: a listener that prepends another listener while
  // the event is being dispatched would otherwise shift the live array and
  // cause an earlier listener to run twice.
  queue.slice().forEach((listener) => {
    listener.apply(this, args);
  });
  return true;
};

Explanation:
This is also straightforward: if _events contains a listener queue for type, every callback in the queue is executed once, with this and the arguments bound through the call function.

Complete code

//EventEmitter.js
 
 
 /**
  * Minimal publish/subscribe event emitter.
  *
  * Listener queues are kept in `this._events`, keyed by event type.
  */
 function EventEmitter() {
   // Object.create(null) yields a dictionary without a prototype chain, so
   // event names such as "toString" never collide with inherited properties.
   this._events = Object.create(null);
 }

 // Default maximum number of listeners per event type.
 EventEmitter.defaultMaxListeners = 10;

 /**
  * Returns the names of the events that currently have a listener queue.
  * @returns {string[]} Own enumerable string keys of the registry.
  */
 EventEmitter.prototype.eventNames = function () {
   return this._events ? Object.keys(this._events) : [];
 };

 /**
  * Sets the maximum number of listeners for this emitter.
  * @param {number} n - New per-instance limit.
  * @returns {EventEmitter} This emitter, to allow chaining.
  */
 EventEmitter.prototype.setMaxListeners = function (n) {
   this._count = n;
   return this;
 };

 /**
  * Returns the maximum number of listeners for this emitter.
  * @returns {number} The per-instance limit if one was set, otherwise
  *   `EventEmitter.defaultMaxListeners`.
  */
 EventEmitter.prototype.getMaxListeners = function () {
   // defaultMaxListeners is a property of the constructor, not of instances,
   // so it must be read from EventEmitter rather than from `this`.
   return this._count !== undefined ? this._count : EventEmitter.defaultMaxListeners;
 };

 /**
  * Registers `cb` as a listener for `type`.
  *
  * Unless `type` is 'newListener' itself, every 'newListener' callback is
  * first invoked with the event type and the callback being added.
  *
  * @param {string} type - Event type to listen for.
  * @param {Function} cb - Listener callback.
  * @param {boolean} [flag] - When truthy, `cb` is added to the head of the
  *   queue instead of the tail.
  * @returns {EventEmitter} This emitter, to allow chaining.
  * @throws {TypeError} If `cb` is not a function.
  */
 EventEmitter.prototype.on = function (type, cb, flag) {
   if (typeof cb !== 'function') {
     throw new TypeError('The listener must be a function');
   }
   // Emitters that never ran the constructor get their registry on first use.
   if (!this._events) {
     this._events = Object.create(null);
   }
   if (type !== 'newListener' && this._events.newListener) {
     // Iterate over a copy so callbacks may add or remove 'newListener' listeners.
     this._events.newListener.slice().forEach((listener) => {
       listener.call(this, type, cb);
     });
   }

   let queue = this._events[type];
   if (!queue) {
     queue = [];
     this._events[type] = queue;
   }
   if (flag) {
     queue.unshift(cb);
   } else {
     queue.push(cb);
   }

   // Warn once per queue when the number of listeners exceeds the limit.
   const max = this.getMaxListeners();
   if (max > 0 && queue.length > max && !queue.warned) {
     queue.warned = true;
     console.warn(
       `Warning: possible memory leak, ${queue.length} "${String(type)}" listeners added (limit is ${max})`
     );
   }
   return this;
 };

 // Alias of `on`. Assigned after `on` is defined; copying the reference any
 // earlier would store `undefined`.
 EventEmitter.prototype.addListener = EventEmitter.prototype.on;

 /**
  * Registers `cb` at the head of the listener queue of `type`.
  * @param {string} type - Event type to listen for.
  * @param {Function} cb - Listener callback.
  * @returns {EventEmitter} This emitter, to allow chaining.
  */
 EventEmitter.prototype.prependListener = function (type, cb) {
   return this.on(type, cb, true);
 };

 /**
  * Registers a one-shot listener at the head of the listener queue of `type`.
  * @param {string} type - Event type to listen for.
  * @param {Function} cb - Listener callback.
  * @returns {EventEmitter} This emitter, to allow chaining.
  */
 EventEmitter.prototype.prependOnceListener = function (type, cb) {
   return this.once(type, cb, true);
 };

 /**
  * Registers a listener that runs at most once for `type`.
  *
  * The callback is wrapped in a function that removes itself before calling
  * `cb`, and the wrapper is registered through `on`.
  *
  * @param {string} type - Event type to listen for.
  * @param {Function} cb - Listener callback.
  * @param {boolean} [flag] - Passed through to `on`.
  * @returns {EventEmitter} This emitter, to allow chaining.
  * @throws {TypeError} If `cb` is not a function.
  */
 EventEmitter.prototype.once = function (type, cb, flag) {
   if (typeof cb !== 'function') {
     throw new TypeError('The listener must be a function');
   }
   // Capture the emitter so the wrapper does not depend on how it is called.
   const emitter = this;
   function wrap(...args) {
     // Remove first: the listener is gone even if cb throws or re-emits `type`.
     emitter.removeListener(type, wrap);
     return cb.apply(emitter, args);
   }
   // Keep a reference to the original callback so removeListener(type, cb)
   // can still find the wrapper.
   wrap.listen = cb;
   return this.on(type, wrap, flag);
 };

 /**
  * Removes every occurrence of `cb` from the listener queue of `type`.
  *
  * A queue entry matches when it is `cb` itself or when its `listen` property
  * is `cb` (the wrapper created by `once`).
  *
  * @param {string} type - Event type whose queue is filtered.
  * @param {Function} cb - Listener callback to remove.
  * @returns {EventEmitter} This emitter, to allow chaining.
  */
 EventEmitter.prototype.removeListener = function (type, cb) {
   const queue = this._events && this._events[type];
   if (!queue) {
     return this;
   }
   const kept = queue.filter((listener) => listener !== cb && listener.listen !== cb);
   if (kept.length === 0) {
     // Drop empty queues so eventNames() only reports events that have listeners.
     delete this._events[type];
   } else {
     this._events[type] = kept;
   }
   return this;
 };

 /**
  * Removes listeners in bulk.
  *
  * @param {string} [type] - When given, only the queue of this event type is
  *   removed; when omitted, the whole registry is reset.
  * @returns {EventEmitter} This emitter, to allow chaining.
  */
 EventEmitter.prototype.removeAllListener = function (type) {
   if (type === undefined || !this._events) {
     this._events = Object.create(null);
   } else {
     delete this._events[type];
   }
   return this;
 };
 // Same function under the plural name used by the Node.js events API.
 EventEmitter.prototype.removeAllListeners = EventEmitter.prototype.removeAllListener;

 /**
  * Returns the listeners registered for `type`.
  *
  * @param {string} type - Event type to inspect.
  * @returns {Function[]} A copy of the queue (empty when there is none), with
  *   `once` wrappers replaced by the callbacks they wrap. Mutating the result
  *   does not affect the emitter.
  */
 EventEmitter.prototype.listeners = function (type) {
   const queue = (this._events && this._events[type]) || [];
   return queue.map((listener) => listener.listen || listener);
 };

 /**
  * Calls every listener registered for `type`, in queue order, with `this`
  * bound to the emitter and `args` as arguments.
  *
  * @param {string} type - Event type to publish.
  * @param {...*} args - Arguments passed to each listener.
  * @returns {boolean} `true` if the event had listeners, `false` otherwise.
  */
 EventEmitter.prototype.emit = function (type, ...args) {
   const queue = this._events && this._events[type];
   if (!queue || queue.length === 0) {
     return false;
   }
   // Iterate over a snapshot: a listener that prepends another listener while
   // the event is being dispatched would otherwise shift the live array and
   // cause an earlier listener to run twice.
   queue.slice().forEach((listener) => {
     listener.apply(this, args);
   });
   return true;
 };
 
 // Expose the EventEmitter constructor as this CommonJS module's export.
 module.exports = EventEmitter;

My blog will soon be synchronized to the Tencent Cloud+ Community; everyone is invited to join: https://cloud.tencent.com/dev …