On the executors and tasks of a Storm worker

Preface

This article looks at how a Storm worker creates and runs its executors and tasks.

Worker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java

    public static void main(String[] args) throws Exception {
        Preconditions.checkArgument(args.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + args.length);
        String stormId = args[0];
        String assignmentId = args[1];
        String supervisorPort = args[2];
        String portStr = args[3];
        String workerId = args[4];
        Map<String, Object> conf = ConfigUtils.readStormConfig();
        Utils.setupDefaultUncaughtExceptionHandler();
        StormCommon.validateDistributedMode(conf);
        Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(supervisorPort),
                                   Integer.parseInt(portStr), workerId);
        worker.start();
        Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
    }
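  • The main method expects five arguments (topology id, assignment id, supervisor port, worker port, worker id), creates a Worker from them, and then calls start.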

Worker.start

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java

    public void start() throws Exception {
        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
                 ConfigUtils.maskPasswords(conf));
        // because in local mode, its not a separate
        // process. supervisor will register it in this case
        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
        if (!ConfigUtils.isLocalMode(conf)) {
            // Distributed mode
            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
            String pid = Utils.processPid();
            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
                                        Charset.forName("UTF-8"));
        }
        final Map<String, Object> topologyConf =
            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
        ClusterStateContext csContext = new ClusterStateContext(DaemonType.WORKER, topologyConf);
        IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
        IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);

        StormMetricRegistry.start(conf, DaemonType.WORKER);

        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
        Map<String, String> initCreds = new HashMap<>();
        if (initialCredentials != null) {
            initCreds.putAll(initialCredentials.get_creds());
        }
        autoCreds = ClientAuthUtils.getAutoCredentials(topologyConf);
        subject = ClientAuthUtils.populateSubject(null, autoCreds, initCreds);

        Subject.doAs(subject, (PrivilegedExceptionAction<Object>)
            () -> loadWorker(topologyConf, stateStorage, stormClusterState, initCreds, initialCredentials)
        );

    }
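  • In distributed mode, start first writes the worker's pid files. It then builds the cluster state and the credentials, and calls loadWorker inside Subject.doAs.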

Worker.loadWorker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java

    private AtomicReference<List<IRunningExecutor>> executorsAtom;

    private Object loadWorker(Map<String, Object> topologyConf, IStateStorage stateStorage, IStormClusterState stormClusterState,
                              Map<String, String> initCreds, Credentials initialCredentials)
        throws Exception {
        workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorPort, port, workerId,
                                      topologyConf, stateStorage, stormClusterState, autoCreds);

        // Heartbeat here so that worker process dies if this fails
        // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
        // that worker is running and moves on
        doHeartBeat();

        executorsAtom = new AtomicReference<>(null);

        // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
        // to the supervisor
        workerState.heartbeatTimer
            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
                try {
                    doHeartBeat();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

        workerState.executorHeartbeatTimer
            .scheduleRecurring(0, (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS),
                               Worker.this::doExecutorHeartbeats);

        workerState.registerCallbacks();

        workerState.refreshConnections(null);

        workerState.activateWorkerWhenAllConnectionsReady();

        workerState.refreshStormActive(null);

        workerState.runWorkerStartHooks();

        List<Executor> execs = new ArrayList<>();
        for (List<Long> e : workerState.getLocalExecutors()) {
            if (ConfigUtils.isLocalMode(topologyConf)) {
                Executor executor = LocalExecutor.mkExecutor(workerState, e, initCreds);
                execs.add(executor);
                for (int i = 0; i < executor.getTaskIds().size(); ++i) {
                    workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
                }
            } else {
                Executor executor = Executor.mkExecutor(workerState, e, initCreds);
                for (int i = 0; i < executor.getTaskIds().size(); ++i) {
                    workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
                }
                execs.add(executor);
            }
        }

        List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
        for (Executor executor : execs) {
            newExecutors.add(executor.execute());
        }
        executorsAtom.set(newExecutors);

        //......

        setupFlushTupleTimer(topologyConf, newExecutors);
        setupBackPressureCheckTimer(topologyConf);

        LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(topologyConf));
        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
        return this;
    }
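  • loadWorker obtains the set of executor ids (each one a List<Long>) through workerState.getLocalExecutors().
  • It then creates one Executor per id through Executor.mkExecutor (LocalExecutor.mkExecutor in local mode) and registers the executor's receive queue in workerState.localReceiveQueues under each of its task ids. Calling execute() on each executor returns an ExecutorShutdown, and the resulting list is stored in the AtomicReference<List<IRunningExecutor>> executorsAtom.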

WorkerState.getLocalExecutors

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java

    // local executors and localTaskIds running in this worker
    final Set<List<Long>> localExecutors;

    public Set<List<Long>> getLocalExecutors() {
        return localExecutors;
    }

    public WorkerState(Map<String, Object> conf, IContext mqContext, String topologyId, String assignmentId,
                       int supervisorPort, int port, String workerId, Map<String, Object> topologyConf, IStateStorage stateStorage,
                       IStormClusterState stormClusterState, Collection<IAutoCredentials> autoCredentials) throws IOException,
        InvalidTopologyException {
        this.autoCredentials = autoCredentials;
        this.conf = conf;
        this.localExecutors = new HashSet<>(readWorkerExecutors(stormClusterState, topologyId, assignmentId, port));
        //......
    }

    private List<List<Long>> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
                                                 int port) {
        LOG.info("Reading assignments");
        List<List<Long>> executorsAssignedToThisWorker = new ArrayList<>();
        executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
        Map<List<Long>, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
        for (Map.Entry<List<Long>, NodeInfo> entry : executorToNodePort.entrySet()) {
            NodeInfo nodeInfo = entry.getValue();
            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
                executorsAssignedToThisWorker.add(entry.getKey());
            }
        }
        return executorsAssignedToThisWorker;
    }

    private Assignment getLocalAssignment(Map<String, Object> conf, IStormClusterState stormClusterState, String topologyId) {
        if (!ConfigUtils.isLocalMode(conf)) {
            try (SupervisorClient supervisorClient = SupervisorClient.getConfiguredClient(conf, Utils.hostname(),
                                                                                          supervisorPort)) {
                Assignment assignment = supervisorClient.getClient().getLocalAssignmentForStorm(topologyId);
                return assignment;
            } catch (Throwable tr1) {
                //if any error/exception thrown, fetch it from zookeeper
                return stormClusterState.remoteAssignmentInfo(topologyId, null);
            }
        } else {
            return stormClusterState.remoteAssignmentInfo(topologyId, null);
        }
    }
  • In its constructor, WorkerState obtains the ids of the executors running in this worker through readWorkerExecutors.
  • readWorkerExecutors gets the Assignment through getLocalAssignment and then the Map<List<Long>, NodeInfo> executorToNodePort through get_executor_node_port. It always adds Constants.SYSTEM_EXECUTOR_ID and keeps only the entries whose node equals assignmentId and whose port equals this worker's port.
  • In distributed mode, getLocalAssignment gets the Assignment through supervisorClient.getClient().getLocalAssignmentForStorm(topologyId); if that throws, it falls back to reading the assignment from ZooKeeper through stormClusterState.remoteAssignmentInfo. In local mode it reads from ZooKeeper directly.
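
The filtering step in readWorkerExecutors can be sketched in plain Java without any Storm dependency. This is only an illustration: NodePort is a hypothetical stand-in for Storm's NodeInfo, and the value used for SYSTEM_EXECUTOR_ID is an assumption, not something shown in the source above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LocalExecutorFilter {
    /** Hypothetical stand-in for Storm's NodeInfo: a supervisor node id plus one port. */
    static final class NodePort {
        final String node;
        final long port;

        NodePort(String node, long port) {
            this.node = node;
            this.port = port;
        }
    }

    /** Placeholder for Constants.SYSTEM_EXECUTOR_ID; the value [-1, -1] is an assumption. */
    static final List<Long> SYSTEM_EXECUTOR_ID = Arrays.asList(-1L, -1L);

    /** Keeps the system executor plus every executor assigned to (assignmentId, port). */
    static List<List<Long>> executorsFor(Map<List<Long>, NodePort> executorToNodePort,
                                         String assignmentId, long port) {
        List<List<Long>> mine = new ArrayList<>();
        mine.add(SYSTEM_EXECUTOR_ID);
        for (Map.Entry<List<Long>, NodePort> e : executorToNodePort.entrySet()) {
            NodePort np = e.getValue();
            if (np.node.equals(assignmentId) && np.port == port) {
                mine.add(e.getKey());
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        Map<List<Long>, NodePort> assignment = new LinkedHashMap<>();
        assignment.put(Arrays.asList(1L, 2L), new NodePort("node-a", 6700));
        assignment.put(Arrays.asList(3L, 4L), new NodePort("node-a", 6701));
        assignment.put(Arrays.asList(5L, 6L), new NodePort("node-b", 6700));
        // Only the executor on node-a:6700 is kept, after the system executor.
        System.out.println(executorsFor(assignment, "node-a", 6700));
    }
}
```

The point of the sketch is that a worker is identified by the pair (assignment id, port), so two workers on the same node never claim the same executor.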

StormClusterStateImpl.remoteAssignmentInfo

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java

    public Assignment remoteAssignmentInfo(String stormId, Runnable callback) {
        if (callback != null) {
            assignmentInfoCallback.put(stormId, callback);
        }
        byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
        return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
    }
  • Given the topology id, remoteAssignmentInfo builds the path with ClusterUtils.assignmentPath and reads the data at that path from ZooKeeper.
  • The data is stored serialized with Thrift and is deserialized into an Assignment when it is read.

ClusterUtils.assignmentPath

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java

    public static final String ZK_SEPERATOR = "/";

    public static final String ASSIGNMENTS_ROOT = "assignments";

    public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT;

    public static String assignmentPath(String id) {
        return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id;
    }
  • The path is /assignments/{topologyId}, for example /assignments/demotopology-1-1539163962.

Executor.mkExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
        Executor executor;

        WorkerTopologyContext workerTopologyContext = workerState.getWorkerTopologyContext();
        List<Integer> taskIds = StormCommon.executorIdToTasks(executorId);
        String componentId = workerTopologyContext.getComponentId(taskIds.get(0));

        String type = getExecutorType(workerTopologyContext, componentId);
        if (ClientStatsUtil.SPOUT.equals(type)) {
            executor = new SpoutExecutor(workerState, executorId, credentials);
        } else {
            executor = new BoltExecutor(workerState, executorId, credentials);
        }

        int minId = Integer.MAX_VALUE;
        Map<Integer, Task> idToTask = new HashMap<>();
        for (Integer taskId : taskIds) {
            minId = Math.min(minId, taskId);
            try {
                Task task = new Task(executor, taskId);
                idToTask.put(taskId, task);
            } catch (IOException ex) {
                throw Utils.wrapInRuntime(ex);
            }
        }

        executor.idToTaskBase = minId;
        executor.idToTask = Utils.convertToArray(idToTask, minId);
        return executor;
    }
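  • mkExecutor creates a SpoutExecutor or a BoltExecutor, depending on the component type.
  • It then creates a Task for each task id derived from the executor id (StormCommon.executorIdToTasks) and binds the tasks to the executor. The smallest task id is kept as idToTaskBase and passed to Utils.convertToArray together with the map.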
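
The relationship between an executor id and its task ids can be sketched as follows. The sketch assumes that an executor id is a pair [firstTaskId, lastTaskId] that expands to an inclusive range, and that the task list is indexed by taskId minus the smallest id. Both are readings of the code above rather than confirmed details, and toOffsetList is a hypothetical helper that stores labels instead of real Task objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ExecutorTaskLayout {
    /** Expands an executor id [first, last] into the inclusive range of task ids (assumed layout). */
    static List<Integer> executorIdToTasks(List<Long> executorId) {
        int first = executorId.get(0).intValue();
        int last = executorId.get(1).intValue();
        List<Integer> taskIds = new ArrayList<>();
        for (int t = first; t <= last; t++) {
            taskIds.add(t);
        }
        return taskIds;
    }

    /** Stores a label for each task at index (taskId - base), so lookup by id is an index access. */
    static List<String> toOffsetList(List<Integer> taskIds, int base) {
        int max = base;
        for (int id : taskIds) {
            max = Math.max(max, id);
        }
        List<String> slots = new ArrayList<>(Collections.nCopies(max - base + 1, (String) null));
        for (int id : taskIds) {
            slots.set(id - base, "task-" + id);
        }
        return slots;
    }

    public static void main(String[] args) {
        List<Integer> taskIds = executorIdToTasks(Arrays.asList(4L, 6L));
        int base = Collections.min(taskIds);
        List<String> slots = toOffsetList(taskIds, base);
        // Task 5 lives at index 5 - base.
        System.out.println(taskIds + " -> " + slots.get(5 - base));
    }
}
```

Indexing by offset avoids a hash lookup on every tuple, which matters because the lookup happens once per delivered tuple.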

Executor.execute

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    /**
     * separated from mkExecutor in order to replace executor transfer in executor data for testing.
     */
    public ExecutorShutdown execute() throws Exception {
        LOG.info("Loading executor tasks " + componentId + ":" + executorId);

        String handlerName = componentId + "-executor" + executorId;
        Utils.SmartThread handler =
            Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);

        LOG.info("Finished loading executor " + componentId + ":" + executorId);
        return new ExecutorShutdown(this, Lists.newArrayList(handler), idToTask, receiveQueue);
    }
  • execute uses Utils.asyncLoop to create a Utils.SmartThread. Because startImmediately is true, the thread is already started when asyncLoop returns.

Utils.asyncLoop

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/Utils.java

    /**
     * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous
     * call.
     *
     * The given afn may be a callable that returns the number of seconds to sleep, or it may be a Callable that returns another Callable
     * that in turn returns the number of seconds to sleep. In the latter case isFactory.
     *
     * @param afn              the code to call on each iteration
     * @param isDaemon         whether the new thread should be a daemon thread
     * @param eh               code to call when afn throws an exception
     * @param priority         the new thread's priority
     * @param isFactory        whether afn returns a callable instead of sleep seconds
     * @param startImmediately whether to start the thread before returning
     * @param threadName       a suffix to be appended to the thread name
     * @return the newly created thread
     *
     * @see Thread
     */
    public static SmartThread asyncLoop(final Callable afn, boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
                                        int priority, final boolean isFactory, boolean startImmediately,
                                        String threadName) {
        SmartThread thread = new SmartThread(new Runnable() {
            public void run() {
                try {
                    final Callable<Long> fn = isFactory ? (Callable<Long>) afn.call() : afn;
                    while (true) {
                        if (Thread.interrupted()) {
                            throw new InterruptedException();
                        }
                        final Long s = fn.call();
                        if (s == null) { // then stop running it
                            break;
                        }
                        if (s > 0) {
                            Time.sleep(s);
                        }
                    }
                } catch (Throwable t) {
                    if (Utils.exceptionCauseIsInstanceOf(
                        InterruptedException.class, t)) {
                        LOG.info("Async loop interrupted!");
                        return;
                    }
                    LOG.error("Async loop died!", t);
                    throw new RuntimeException(t);
                }
            }
        });
        if (eh != null) {
            thread.setUncaughtExceptionHandler(eh);
        } else {
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                public void uncaughtException(Thread t, Throwable e) {
                    LOG.error("Async loop died!", e);
                    Utils.exitProcess(1, "Async loop died!");
                }
            });
        }
        thread.setDaemon(isDaemon);
        thread.setPriority(priority);
        if (threadName != null && !threadName.isEmpty()) {
            thread.setName(thread.getName() + "-" + threadName);
        }
        if (startImmediately) {
            thread.start();
        }
        return thread;
    }
  • The run method calls fn.call() in a loop. Since execute passes isFactory as true, fn is the Callable returned by Executor.call(), so the loop effectively runs Executor.call().call().
  • That callable mainly calls receiveQueue.consume.
  • In addition to calling receiveQueue.consume, SpoutExecutor.call also calls spouts.get(j).nextTuple().
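
The looping contract of asyncLoop can be reduced to a few lines. The following is a simplified sketch, not Storm's implementation: it drops the factory indirection, the daemon and priority settings, and the uncaught-exception handler, and it treats a positive return value as milliseconds to sleep (an assumption). The names asyncLoop and runCounted here are local to the sketch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncLoopSketch {
    /** Starts a thread that calls fn until it returns null or the thread is interrupted. */
    static Thread asyncLoop(Callable<Long> fn, String name) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Long sleepMs = fn.call();
                    if (sleepMs == null) {
                        break; // null means "stop looping"
                    }
                    if (sleepMs > 0) {
                        Thread.sleep(sleepMs);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted during sleep or inside fn: leave the loop quietly.
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, name);
        t.start();
        return t;
    }

    /** Runs a loop whose body stops itself on the limit-th call; returns how often it was called. */
    static int runCounted(int limit) {
        AtomicInteger calls = new AtomicInteger();
        Thread t = asyncLoop(() -> calls.incrementAndGet() < limit ? 0L : null, "demo-executor");
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("loop body ran " + runCounted(3) + " times");
    }
}
```

An executor's callable would normally return 0 so that the loop spins without sleeping; returning null is the cooperative way to end the thread.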

receiveQueue.consume

storm-2.0.0/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java

    /**
     * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
     */
    public int consume(JCQueue.Consumer consumer) {
        return consume(consumer, continueRunning);
    }

    /**
     * Non blocking. Returns immediately if Q is empty. Runs till Q is empty OR exitCond.keepRunning() return false. Returns number of
     * elements consumed from Q
     */
    public int consume(JCQueue.Consumer consumer, ExitCondition exitCond) {
        try {
            return consumeImpl(consumer, exitCond);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q
     *
     * @param consumer
     * @param exitCond
     */
    private int consumeImpl(Consumer consumer, ExitCondition exitCond) throws InterruptedException {
        int drainCount = 0;
        while (exitCond.keepRunning()) {
            Object tuple = recvQueue.poll();
            if (tuple == null) {
                break;
            }
            consumer.accept(tuple);
            ++drainCount;
        }

        int overflowDrainCount = 0;
        int limit = overflowQ.size();
        while (exitCond.keepRunning() && (overflowDrainCount < limit)) { // 2nd cond prevents staying stuck with consuming overflow
            Object tuple = overflowQ.poll();
            ++overflowDrainCount;
            consumer.accept(tuple);
        }
        int total = drainCount + overflowDrainCount;
        if (total > 0) {
            consumer.flush();
        }
        return total;
    }
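  • consume mainly calls the Consumer's accept method for each element polled from recvQueue and then from overflowQ. If anything was consumed, it calls the Consumer's flush method once at the end.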
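
The non-blocking drain pattern used by consumeImpl can be illustrated with a plain java.util.Queue. This sketch leaves out the overflow queue, the exit condition, and the flush call, and DrainSketch is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

final class DrainSketch {
    /** Polls until the queue is empty, hands each element to the consumer, returns the count. */
    static <T> int consume(Queue<T> queue, Consumer<T> consumer) {
        int drained = 0;
        T item;
        while ((item = queue.poll()) != null) { // poll returns null on empty, so this never blocks
            consumer.accept(item);
            drained++;
        }
        return drained;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("t1", "t2", "t3"));
        List<String> seen = new ArrayList<>();
        int n = consume(queue, seen::add);
        System.out.println(n + " consumed: " + seen);
    }
}
```

Because the call returns immediately on an empty queue, the caller (the async loop) decides whether to spin, sleep, or do other work such as calling nextTuple on a spout.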

Task

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java

public class Task {

    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    private final TaskMetrics taskMetrics;
    private Executor executor;
    private WorkerState workerData;
    private TopologyContext systemTopologyContext;
    private TopologyContext userTopologyContext;
    private WorkerTopologyContext workerTopologyContext;
    private Integer taskId;
    private String componentId;
    private Object taskObject; // Spout/Bolt object
    private Map<String, Object> topoConf;
    private BooleanSupplier emitSampler;
    private CommonStats executorStats;
    private Map<String, Map<String, LoadAwareCustomStreamGrouping>> streamComponentToGrouper;
    private HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> streamToGroupers;
    private boolean debug;

    public Task(Executor executor, Integer taskId) throws IOException {
        this.taskId = taskId;
        this.executor = executor;
        this.workerData = executor.getWorkerData();
        this.topoConf = executor.getTopoConf();
        this.componentId = executor.getComponentId();
        this.streamComponentToGrouper = executor.getStreamToComponentToGrouper();
        this.streamToGroupers = getGroupersPerStream(streamComponentToGrouper);
        this.executorStats = executor.getStats();
        this.workerTopologyContext = executor.getWorkerTopologyContext();
        this.emitSampler = ConfigUtils.mkStatsSampler(topoConf);
        this.systemTopologyContext = mkTopologyContext(workerData.getSystemTopology());
        this.userTopologyContext = mkTopologyContext(workerData.getTopology());
        this.taskObject = mkTaskObject();
        this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG);
        this.addTaskHooks();
        this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId);
    }

    //......
}

Executor.accept

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/Executor.java

    @Override
    public void accept(Object event) {
        AddressedTuple addressedTuple = (AddressedTuple) event;
        int taskId = addressedTuple.getDest();

        TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
        if (isDebug) {
            LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
        }

        try {
            if (taskId != AddressedTuple.BROADCAST_DEST) {
                tupleActionFn(taskId, tuple);
            } else {
                for (Integer t : taskIds) {
                    tupleActionFn(t, tuple);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
  • accept calls tupleActionFn for the destination task, or once for every task id of the executor when the destination is AddressedTuple.BROADCAST_DEST.
  • BoltExecutor.tupleActionFn mainly obtains the boltObject from the task and then calls boltObject.execute(tuple).
  • SpoutExecutor.tupleActionFn mainly takes the TupleInfo out of the RotatingMap<Long, TupleInfo> pending and then acks or fails it.
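
The routing decision inside accept can be sketched on its own. The constant BROADCAST below is a hypothetical marker standing in for AddressedTuple.BROADCAST_DEST (its value here is an assumption), and targets is an illustrative helper that returns the task ids that would receive the tuple instead of invoking tupleActionFn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class DispatchSketch {
    /** Hypothetical marker for "deliver to every local task"; the value is an assumption. */
    static final int BROADCAST = -2;

    /** Returns the task ids that would receive a tuple addressed to dest. */
    static List<Integer> targets(int dest, List<Integer> localTaskIds) {
        if (dest != BROADCAST) {
            return Collections.singletonList(dest); // addressed to exactly one task
        }
        return new ArrayList<>(localTaskIds); // broadcast: every task of this executor
    }

    public static void main(String[] args) {
        List<Integer> taskIds = Arrays.asList(4, 5, 6);
        System.out.println(targets(5, taskIds));
        System.out.println(targets(BROADCAST, taskIds));
    }
}
```

The credentialsChanged method shown in the ExecutorShutdown section is one producer of broadcast tuples: it publishes a single tuple that every task of the executor must see.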

ExecutorShutdown

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorShutdown.java

public class ExecutorShutdown implements Shutdownable, IRunningExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class);

    private final Executor executor;
    private final List<Utils.SmartThread> threads;
    private final ArrayList<Task> taskDatas;
    private final JCQueue receiveQueue;

    //......

    @Override
    public void credentialsChanged(Credentials credentials) {
        TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), new Values(credentials),
                                        Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID,
                                        Constants.CREDENTIALS_CHANGED_STREAM_ID);
        AddressedTuple addressedTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
        try {
            executor.getReceiveQueue().publish(addressedTuple);
            executor.getReceiveQueue().flush();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void loadChanged(LoadMapping loadMapping) {
        executor.reflectNewLoadMapping(loadMapping);
    }

    @Override
    public JCQueue getReceiveQueue() {
        return receiveQueue;
    }

    @Override
    public boolean publishFlushTuple() {
        return executor.publishFlushTuple();
    }

    @Override
    public void shutdown() {
        try {
            LOG.info("Shutting down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
            executor.getReceiveQueue().close();
            for (Utils.SmartThread t : threads) {
                t.interrupt();
            }
            for (Utils.SmartThread t : threads) {
                LOG.debug("Executor " + executor.getComponentId() + ":" + executor.getExecutorId() + " joining thread " + t.getName());
                t.join();
            }
            executor.getStats().cleanupStats();
            for (Task task : taskDatas) {
                if (task == null) {
                    continue;
                }
                TopologyContext userContext = task.getUserContext();
                for (ITaskHook hook : userContext.getHooks()) {
                    hook.cleanup();
                }
            }
            executor.getStormClusterState().disconnect();
            if (executor.getOpenOrPrepareWasCalled().get()) {
                for (Task task : taskDatas) {
                    if (task == null) {
                        continue;
                    }
                    Object object = task.getTaskObject();
                    if (object instanceof ISpout) {
                        ((ISpout) object).close();
                    } else if (object instanceof IBolt) {
                        ((IBolt) object).cleanup();
                    } else {
                        LOG.error("unknown component object");
                    }
                }
            }
            LOG.info("Shut down executor " + executor.getComponentId() + ":" + executor.getExecutorId());
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}
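  • ExecutorShutdown mainly wraps the shutdown process: it closes the receive queue, interrupts and joins the executor threads, cleans up stats and task hooks, disconnects from the cluster state, and finally calls close on spouts or cleanup on bolts if open or prepare had been called.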

Summary

  • After the worker starts, it reads its assignment. In distributed mode it first asks the supervisor through SupervisorClient and falls back to ZooKeeper on error; the ZooKeeper path is /assignments/{topologyId}, for example /assignments/demotopology-1-1539163962.
  • From the assignment it gets the Map<List<Long>, NodeInfo> executorToNodePort, keeps the executors assigned to this worker, and creates each of them through Executor.mkExecutor.
  • While creating an Executor, mkExecutor derives the task ids from the executor id (StormCommon.executorIdToTasks), creates a Task for each id, and binds the tasks to the executor.
  • The executor's execute method is then called. It starts a Utils.SmartThread that loops over Executor.call().call(). That callable mainly calls receiveQueue.consume; SpoutExecutor.call additionally calls spouts.get(j).nextTuple().
  • receiveQueue.consume mainly calls Executor.accept, which calls tupleActionFn for the destination task, or for every task of the executor when the tuple is a broadcast. BoltExecutor.tupleActionFn mainly obtains the boltObject from the task and then calls boltObject.execute(tuple); SpoutExecutor.tupleActionFn mainly takes the TupleInfo out of the RotatingMap<Long, TupleInfo> pending and then acks or fails it.
  • A worker can be understood as a process, an executor as a thread within that process, and a task as an instance of a spout or bolt. By default, one executor runs one task.
  • A topology can be scaled out by adding workers or executors; this process is called rebalance. The task is the unit of work that gets moved: tasks are transferred from the executors of heavily loaded workers to the executors of new workers. (The rebalance command can only adjust the number of workers and executors; it cannot change the number of tasks.)
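
To make the last point concrete, the sketch below splits a fixed set of task ids into contiguous executor ranges. It illustrates why the number of tasks bounds the number of executors and why changing the executor count only regroups the same tasks. The even-split rule and the name TaskPartitionSketch are assumptions made for illustration; this is not presented as the scheduler's actual algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TaskPartitionSketch {
    /** Splits task ids 1..numTasks into numExecutors contiguous [first, last] ranges. */
    static List<List<Long>> partition(int numTasks, int numExecutors) {
        if (numExecutors < 1 || numExecutors > numTasks) {
            throw new IllegalArgumentException("need 1 <= executors <= tasks");
        }
        List<List<Long>> executors = new ArrayList<>();
        int base = numTasks / numExecutors;  // minimum tasks per executor
        int extra = numTasks % numExecutors; // the first 'extra' executors get one more task
        long next = 1;
        for (int i = 0; i < numExecutors; i++) {
            int size = base + (i < extra ? 1 : 0);
            executors.add(Arrays.asList(next, next + size - 1));
            next += size;
        }
        return executors;
    }

    public static void main(String[] args) {
        // The same four tasks, grouped into two executors and then into four.
        System.out.println(partition(4, 2));
        System.out.println(partition(4, 4));
    }
}
```

With four tasks, going from two executors to four changes only how the tasks are grouped into threads; the task ids themselves stay the same.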
