A look at mkAssignments in Storm Nimbus



This article walks through the mkAssignments method of Storm's Nimbus.



    void doRebalance(String topoId, StormBase stormBase) throws Exception {
        RebalanceOptions rbo = stormBase.get_topology_action_options().get_rebalance_options();
        StormBase updated = new StormBase();

        if (rbo.is_set_num_executors()) {

        if (rbo.is_set_num_workers()) {
        stormClusterState.updateStorm(topoId, updated);
        updateBlobStore(topoId, rbo, ServerUtils.principalNameToSubject(rbo.get_principal()));
        idToExecutors.getAndUpdate(new Dissoc<>(topoId)); // remove the executors cache to let it recompute.

    private void mkAssignments() throws Exception {
  • stormClusterState.updateStorm(topoId, updated) is called here to update the data in ZooKeeper
  • Both doRebalance and the no-argument mkAssignments() go on to call mkAssignments(String scratchTopoId)

mkAssignments(String scratchTopoId)


    private void mkAssignments(String scratchTopoId) throws Exception {
        try {
            if (!isReadyForMKAssignments()) {
            // get existing assignment (just the topologyToExecutorToNodePort map) -> default to {}
            // filter out ones which have a executor timeout
            // figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors
            // should be in each slot (e.g., 4, 4, 4, 5)
            // only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
            // edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be
            // reassigned to. worst comes to worse the executor will timeout and won't assign here next time around

            IStormClusterState state = stormClusterState;
            //read all the topologies
            Map<String, StormBase> bases;
            Map<String, TopologyDetails> tds = new HashMap<>();
            synchronized (submitLock) {
                // should promote: only fetch storm bases of topologies that need scheduling.
                bases = state.topologyBases();

                for (Iterator<Entry<String, StormBase>> it = bases.entrySet().iterator(); it.hasNext(); ) {
                    Entry<String, StormBase> entry = it.next();
                    String id = entry.getKey();
                    try {
                        tds.put(id, readTopologyDetails(id, entry.getValue()));
                    } catch (KeyNotFoundException e) {
                        //A race happened and it is probably not running
            List<String> assignedTopologyIds = state.assignments(null);
            Map<String, Assignment> existingAssignments = new HashMap<>();
            for (String id : assignedTopologyIds) {
                //for the topology which wants rebalance (specified by the scratchTopoId)
                // we exclude its assignment, meaning that all the slots occupied by its assignment
                // will be treated as free slot in the scheduler code.
                if (!id.equals(scratchTopoId)) {
                    Assignment currentAssignment = state.assignmentInfo(id, null);
                    if (!currentAssignment.is_set_owner()) {
                        TopologyDetails td = tds.get(id);
                        if (td != null) {
                            state.setAssignment(id, currentAssignment, td.getConf());
                    existingAssignments.put(id, currentAssignment);

            // make the new assignments for topologies
            lockingMkAssignments(existingAssignments, bases, scratchTopoId, assignedTopologyIds, state, tds);
        } catch (Exception e) {
            throw e;
  • readTopologyDetails reads the TopologyDetails for each topology
  • The existing assignments are obtained through state.assignments(Runnable callback) and state.assignmentInfo(String stormId, Runnable callback); the topology named by scratchTopoId is skipped so that its slots are treated as free
  • lockingMkAssignments is then called to make the new assignments for the topologies
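
The exclusion of the rebalancing topology can be sketched in isolation. This is a minimal illustration, not Nimbus code: assignments are plain strings, and the class and method names (ScratchFilterSketch, existingAssignments) are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ScratchFilterSketch {
    // Hypothetical stand-in: values are plain strings, not Storm's Assignment type.
    static Map<String, String> existingAssignments(List<String> assignedIds,
                                                   Map<String, String> stored,
                                                   String scratchTopoId) {
        Map<String, String> existing = new HashMap<>();
        for (String id : assignedIds) {
            // The topology being rebalanced is skipped, so its slots look free.
            // With scratchTopoId == null nothing is skipped.
            if (!id.equals(scratchTopoId)) {
                existing.put(id, stored.get(id));
            }
        }
        return existing;
    }

    public static void main(String[] args) {
        Map<String, String> stored = new HashMap<>();
        stored.put("topo-a", "assignment-a");
        stored.put("topo-b", "assignment-b");
        // Only topo-a remains, because topo-b is the scratch topology.
        System.out.println(existingAssignments(List.of("topo-a", "topo-b"), stored, "topo-b"));
    }
}
```

Passing null as scratchTopoId keeps every topology, which corresponds to a regular scheduling round rather than a rebalance.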



    private void lockingMkAssignments(Map<String, Assignment> existingAssignments, Map<String, StormBase> bases,
                                      String scratchTopoId, List<String> assignedTopologyIds, IStormClusterState state,
                                      Map<String, TopologyDetails> tds) throws Exception {
        Topologies topologies = new Topologies(tds);

        synchronized (schedLock) {
            Map<String, SchedulerAssignment> newSchedulerAssignments =
                    computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);

            Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
                    computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
            Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
            int nowSecs = Time.currentTimeSecs();
            Map<String, SupervisorDetails> basicSupervisorDetailsMap = basicSupervisorDetailsMap(state);
            //construct the final Assignments by adding start-times etc into it
            Map<String, Assignment> newAssignments = new HashMap<>();

            //tasks figure out what tasks to talk to by looking at topology at runtime
            // only log/set when there's been a change to the assignment
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                String topoId = entry.getKey();
                Assignment assignment = entry.getValue();
                Assignment existingAssignment = existingAssignments.get(topoId);
                TopologyDetails td = topologies.getById(topoId);
                if (assignment.equals(existingAssignment)) {
                    LOG.debug("Assignment for {} hasn't changed", topoId);
                } else {
                    LOG.info("Setting new assignment for topology id {}: {}", topoId, assignment);
                    state.setAssignment(topoId, assignment, td.getConf());

            //grouping assignment by node to see the nodes diff, then notify nodes/supervisors to synchronize its owned assignment
            //because the number of existing assignments is small for every scheduling round,
            //we expect to notify supervisors at almost the same time
            Map<String, String> totalAssignmentsChangedNodes = new HashMap<>();
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                String topoId = entry.getKey();
                Assignment assignment = entry.getValue();
                Assignment existingAssignment = existingAssignments.get(topoId);
                totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
            notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,

            Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
                String topoId = entry.getKey();
                Assignment assignment = entry.getValue();
                Assignment existingAssignment = existingAssignments.get(topoId);
                if (existingAssignment == null) {
                    existingAssignment = new Assignment();
                    existingAssignment.set_executor_node_port(new HashMap<>());
                    existingAssignment.set_executor_start_time_secs(new HashMap<>());
                Set<WorkerSlot> newSlots = newlyAddedSlots(existingAssignment, assignment);
                addedSlots.put(topoId, newSlots);
            inimbus.assignSlots(topologies, addedSlots);
  • computeNewSchedulerAssignments is called first to calculate newSchedulerAssignments
  • newAssignments is then traversed; if an assignment has changed, state.setAssignment(topoId, assignment, td.getConf()) writes it to ZooKeeper
  • totalAssignmentsChangedNodes is then calculated and passed to notifySupervisorsAssignments, which notifies the supervisors through AssignmentDistributionService
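
The "only write when something changed" step can be illustrated with a small sketch. It assumes assignments are comparable by equals, as in the excerpt above; the types are simplified to strings and the names (ChangedAssignmentsSketch, changedTopologies) are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

final class ChangedAssignmentsSketch {
    // Returns the topology ids whose new assignment differs from the stored one.
    // A topology with no stored assignment counts as changed.
    static List<String> changedTopologies(Map<String, String> existing,
                                          Map<String, String> updated) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> entry : updated.entrySet()) {
            if (!Objects.equals(entry.getValue(), existing.get(entry.getKey()))) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> existing = new TreeMap<>(Map.of("topo-a", "v1", "topo-b", "v1"));
        Map<String, String> updated = new TreeMap<>(Map.of("topo-a", "v1", "topo-b", "v2", "topo-c", "v1"));
        // Only these topologies would need a write to the state store.
        System.out.println(changedTopologies(existing, updated));
    }
}
```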



    private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<String, Assignment> existingAssignments,
                                                                            Topologies topologies, Map<String, StormBase> bases,
                                                                            String scratchTopologyId)
        throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {

        Map<String, Set<List<Integer>>> topoToExec = computeTopologyToExecutors(bases);

        Set<String> zkHeartbeatTopologies = topologies.getTopologies().stream()
                                                      .filter(topo -> !supportRpcHeartbeat(topo))

        updateAllHeartbeats(existingAssignments, topoToExec, zkHeartbeatTopologies);

        Map<String, Set<List<Integer>>> topoToAliveExecutors = computeTopologyToAliveExecutors(existingAssignments, topologies,
                                                                                               topoToExec, scratchTopologyId);
        Map<String, Set<Long>> supervisorToDeadPorts = computeSupervisorToDeadPorts(existingAssignments, topoToExec,
        Map<String, SchedulerAssignmentImpl> topoToSchedAssignment = computeTopologyToSchedulerAssignment(existingAssignments,
        Set<String> missingAssignmentTopologies = new HashSet<>();
        for (TopologyDetails topo : topologies.getTopologies()) {
            String id = topo.getId();
            Set<List<Integer>> allExecs = topoToExec.get(id);
            Set<List<Integer>> aliveExecs = topoToAliveExecutors.get(id);
            int numDesiredWorkers = topo.getNumWorkers();
            int numAssignedWorkers = numUsedWorkers(topoToSchedAssignment.get(id));
            if (allExecs == null || allExecs.isEmpty() || !allExecs.equals(aliveExecs) || numDesiredWorkers > numAssignedWorkers) {
                //We have something to schedule...
        Map<String, SupervisorDetails> supervisors =
            readAllSupervisorDetails(supervisorToDeadPorts, topologies, missingAssignmentTopologies);
        Cluster cluster = new Cluster(inimbus, resourceMetrics, supervisors, topoToSchedAssignment, topologies, conf);

        scheduler.schedule(topologies, cluster);
        //Get and set the start time before getting current time in order to avoid potential race with the longest-scheduling-time-ms gauge

        return cluster.getAssignments();
  • computeTopologyToExecutors returns Map<String, Set<List<Integer>>> topoToExec; the key is the topologyId and each List<Integer> in the value set is an executor id
  • computeTopologyToAliveExecutors returns Map<String, Set<List<Integer>>> topoToAliveExecutors, with the same key and value structure
  • computeSupervisorToDeadPorts returns Map<String, Set<Long>> supervisorToDeadPorts; the key is the supervisorId and each Long in the value set is a port
  • computeTopologyToSchedulerAssignment returns Map<String, SchedulerAssignmentImpl> topoToSchedAssignment; the key is the topologyId and the value is a SchedulerAssignmentImpl
  • The numbers of workers and executors required by each topology's configuration are then compared with the existing assignments to work out missingAssignmentTopologies
  • readAllSupervisorDetails returns Map<String, SupervisorDetails> supervisors, that is, the live supervisors that can be assigned to
  • Finally a Cluster is created and scheduler.schedule(topologies, cluster) performs the scheduling
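
The condition that decides whether a topology still has something to schedule can be pulled out as a small predicate. This is a sketch under simplifying assumptions: executor ids are plain integers instead of List<Integer>, and NeedsSchedulingSketch is a hypothetical name.

```java
import java.util.Set;

final class NeedsSchedulingSketch {
    // Mirrors the shape of the condition in the excerpt: schedule when there are
    // no known executors, when some executors are not alive, or when fewer
    // workers are assigned than the topology asks for.
    static boolean needsScheduling(Set<Integer> allExecs, Set<Integer> aliveExecs,
                                   int desiredWorkers, int assignedWorkers) {
        return allExecs == null
            || allExecs.isEmpty()
            || !allExecs.equals(aliveExecs)
            || desiredWorkers > assignedWorkers;
    }

    public static void main(String[] args) {
        System.out.println(needsScheduling(Set.of(1, 2, 3), Set.of(1, 2, 3), 2, 2)); // false
        System.out.println(needsScheduling(Set.of(1, 2, 3), Set.of(1, 2), 2, 2));    // true
    }
}
```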



    public void schedule(Topologies topologies, Cluster cluster) {
        defaultSchedule(topologies, cluster);

    public static void defaultSchedule(Topologies topologies, Cluster cluster) {
        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
            List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
            Set<ExecutorDetails> allExecutors = topology.getExecutors();

            Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned =
                EvenScheduler.getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
            Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
            for (List<ExecutorDetails> list : aliveAssigned.values()) {

            Set<WorkerSlot> canReassignSlots = slotsCanReassign(cluster, aliveAssigned.keySet());
            int totalSlotsToUse = Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size());

            Set<WorkerSlot> badSlots = null;
            if (totalSlotsToUse > aliveAssigned.size() || !allExecutors.equals(aliveExecutors)) {
                badSlots = badSlots(aliveAssigned, allExecutors.size(), totalSlotsToUse);
            if (badSlots != null) {

            EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster);
  • The loop traverses cluster.needsSchedulingTopologies(), that is, the TopologyDetails that need to be scheduled
  • cluster.getAvailableSlots() returns the available slots
  • EvenScheduler.getAliveAssignedWorkerSlotExecutors returns aliveAssigned, and slotsCanReassign derives canReassignSlots from its key set
  • Math.min(topology.getNumWorkers(), canReassignSlots.size() + availableSlots.size()) gives totalSlotsToUse, after which any badSlots are released
  • Finally, EvenScheduler.scheduleTopologiesEvenly(new Topologies(topology), cluster) performs the allocation
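
The arithmetic behind totalSlotsToUse, and the even split hinted at by the "(e.g., 4, 4, 4, 5)" comment in mkAssignments, can be worked through with a short sketch. The evenSplit helper is an assumption made for illustration and is not taken from the Storm source; the body of badSlots is not shown in this article.

```java
import java.util.Arrays;

final class SlotMathSketch {
    // Same formula as in defaultSchedule: never use more slots than the topology
    // asks for, and never more than are reassignable plus free.
    static int totalSlotsToUse(int numWorkers, int reassignableSlots, int availableSlots) {
        return Math.min(numWorkers, reassignableSlots + availableSlots);
    }

    // Hypothetical helper: splits executors across slots as evenly as possible,
    // with the larger shares placed last. Requires slots > 0.
    static int[] evenSplit(int executors, int slots) {
        int[] counts = new int[slots];
        int base = executors / slots;
        int remainder = executors % slots;
        for (int i = 0; i < slots; i++) {
            counts[i] = base + (i >= slots - remainder ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(totalSlotsToUse(4, 1, 2));            // 3
        System.out.println(Arrays.toString(evenSplit(17, 4)));   // [4, 4, 4, 5]
    }
}
```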



    public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
        for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
            String topologyId = topology.getId();
            Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
            Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);

            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : nodePortToExecutors.entrySet()) {
                WorkerSlot nodePort = entry.getKey();
                List<ExecutorDetails> executors = entry.getValue();
                cluster.assign(nodePort, topologyId, executors);
  • scheduleTopology(topology, cluster) calculates newAssignment, which Utils.reverseMap converts to Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors
  • cluster.assign(nodePort, topologyId, executors) then performs the assignment
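
What the map inversion does can be shown with a generic stand-in. The reverse method below is a sketch written for this article, not the body of Storm's Utils.reverseMap; executors and slots are plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReverseMapSketch {
    // Turns executor -> slot into slot -> list of executors.
    static <K, V> Map<V, List<K>> reverse(Map<K, V> map) {
        Map<V, List<K>> reversed = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : map.entrySet()) {
            reversed.computeIfAbsent(entry.getValue(), v -> new ArrayList<>())
                    .add(entry.getKey());
        }
        return reversed;
    }

    public static void main(String[] args) {
        Map<String, String> executorToSlot = new LinkedHashMap<>();
        executorToSlot.put("exec-1", "slot-A");
        executorToSlot.put("exec-2", "slot-B");
        executorToSlot.put("exec-3", "slot-A");
        // prints {slot-A=[exec-1, exec-3], slot-B=[exec-2]}
        System.out.println(reverse(executorToSlot));
    }
}
```

Each entry of the reversed map then corresponds to one cluster.assign call: one slot and all the executors placed on it.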



    private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) {
        List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
        Set<ExecutorDetails> allExecutors = topology.getExecutors();
        Map<WorkerSlot, List<ExecutorDetails>> aliveAssigned = getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
        int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size());

        List<WorkerSlot> sortedList = sortSlots(availableSlots);
        if (sortedList == null) {
            LOG.error("No available slots for topology: {}", topology.getName());
            return new HashMap<ExecutorDetails, WorkerSlot>();

        //allow requesting slots number bigger than available slots
        int toIndex = (totalSlotsToUse - aliveAssigned.size())
                      > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
        List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);

        Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
        for (List<ExecutorDetails> list : aliveAssigned.values()) {
        Set<ExecutorDetails> reassignExecutors = Sets.difference(allExecutors, aliveExecutors);

        Map<ExecutorDetails, WorkerSlot> reassignment = new HashMap<ExecutorDetails, WorkerSlot>();
        if (reassignSlots.size() == 0) {
            return reassignment;

        List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>(reassignExecutors);
        Collections.sort(executors, new Comparator<ExecutorDetails>() {
            public int compare(ExecutorDetails o1, ExecutorDetails o2) {
                return o1.getStartTask() - o2.getStartTask();

        for (int i = 0; i < executors.size(); i++) {
            reassignment.put(executors.get(i), reassignSlots.get(i % reassignSlots.size()));

        if (reassignment.size() != 0) {
            LOG.debug("Available slots: {}", availableSlots.toString());
        return reassignment;
  • List<WorkerSlot> reassignSlots is calculated from availableSlots
  • Sets.difference(allExecutors, aliveExecutors) gives reassignExecutors, which are then sorted by startTask
  • Finally, the sorted executors are traversed and each one is given the slot reassignSlots.get(i % reassignSlots.size())
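
The modulo-based round robin can be tried out on plain values. In this sketch an executor is represented only by its start task and a slot by a string; RoundRobinSketch and assign are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoundRobinSketch {
    // Sorts executors by start task, then deals them out to slots in turn.
    static Map<Integer, String> assign(List<Integer> executorStartTasks, List<String> slots) {
        Map<Integer, String> reassignment = new LinkedHashMap<>();
        if (slots.isEmpty()) {
            return reassignment; // nothing to assign to
        }
        List<Integer> sorted = new ArrayList<>(executorStartTasks);
        Collections.sort(sorted);
        for (int i = 0; i < sorted.size(); i++) {
            reassignment.put(sorted.get(i), slots.get(i % slots.size()));
        }
        return reassignment;
    }

    public static void main(String[] args) {
        // prints {1=slot-A, 3=slot-B, 5=slot-A, 7=slot-B, 9=slot-A}
        System.out.println(assign(List.of(5, 1, 3, 7, 9), List.of("slot-A", "slot-B")));
    }
}
```

Because the index wraps with i % slots.size(), slot loads differ by at most one executor.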



public class ExecutorDetails {
    public final int startTask;
    public final int endTask;


    public boolean equals(Object other) {
        if (!(other instanceof ExecutorDetails)) {
            return false;

        ExecutorDetails executor = (ExecutorDetails) other;
        return (this.startTask == executor.startTask) && (this.endTask == executor.endTask);
  • Although ExecutorDetails is named after executors, what it encapsulates is the startTask and endTask information
  • Scheduling is really the process of assigning the tasks delimited by startTask and endTask to a slot for processing
  • Note that equals is overridden here to compare startTask and endTask, which the earlier set comparison operations rely on
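
Why value-based equals matters for the set operations can be seen in a small sketch. Range is a hypothetical stand-in for ExecutorDetails; hashCode is overridden alongside equals here because Java's hash-based sets require the two to agree.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class ExecutorRangeSketch {
    // Hypothetical stand-in for ExecutorDetails: a [startTask, endTask] range.
    static final class Range {
        final int startTask;
        final int endTask;

        Range(int startTask, int endTask) {
            this.startTask = startTask;
            this.endTask = endTask;
        }

        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Range)) {
                return false;
            }
            Range range = (Range) other;
            return startTask == range.startTask && endTask == range.endTask;
        }

        @Override
        public int hashCode() {
            return Objects.hash(startTask, endTask);
        }
    }

    // Executors that are in "all" but not in "alive" still need a slot.
    static Set<Range> difference(Set<Range> all, Set<Range> alive) {
        Set<Range> result = new HashSet<>(all);
        result.removeAll(alive);
        return result;
    }

    public static void main(String[] args) {
        Set<Range> all = Set.of(new Range(1, 1), new Range(2, 2), new Range(3, 4));
        Set<Range> alive = Set.of(new Range(1, 1), new Range(2, 2));
        System.out.println(difference(all, alive).size()); // 1
    }
}
```

Without the overrides, two separately constructed ranges with the same tasks would be treated as different elements, and the difference would contain every executor.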



    /**
     * Assign the slot to the executors for this topology.
     *
     * @throws RuntimeException if the specified slot is already occupied.
     */
    public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
        if (isSlotOccupied(slot)) {
            throw new RuntimeException(
                "slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");

        TopologyDetails td = topologies.getById(topologyId);
        if (td == null) {
            throw new IllegalArgumentException(
                "Trying to schedule for topo "
                + topologyId
                + " but that is not a known topology "
                + topologies.getAllIds());
        WorkerResources resources = calculateWorkerResources(td, executors);
        SchedulerAssignmentImpl assignment = assignments.get(topologyId);
        if (assignment == null) {
            assignment = new SchedulerAssignmentImpl(topologyId);
            assignments.put(topologyId, assignment);
        } else {
            for (ExecutorDetails executor : executors) {
                if (assignment.isExecutorAssigned(executor)) {
                    throw new RuntimeException(
                        "Attempting to assign executor: "
                        + executor
                        + " of topology: "
                        + topologyId
                        + " to workerslot: "
                        + slot
                        + ". The executor is already assigned to workerslot: "
                        + assignment.getExecutorToSlot().get(executor)
                        + ". The executor must unassigned before it can be assigned to another slot!");

        assignment.assign(slot, executors, resources);
        String nodeId = slot.getNodeId();
        double sharedOffHeapMemory = calculateSharedOffHeapMemory(nodeId, assignment);
        assignment.setTotalSharedOffHeapMemory(nodeId, sharedOffHeapMemory);
        updateCachesForWorkerSlot(slot, resources, sharedOffHeapMemory);
  • The work is mainly delegated to assignment.assign(slot, executors, resources)



    /**
     * Assign the slot to executors.
     */
    public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors, WorkerResources slotResources) {
        assert slot != null;
        for (ExecutorDetails executor : executors) {
            this.executorToSlot.put(executor, slot);
        slotToExecutors.computeIfAbsent(slot, MAKE_LIST)
        if (slotResources != null) {
            resources.put(slot, slotResources);
        } else {
  • Assigns the slot to each ExecutorDetails (its startTask and endTask)
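
The bookkeeping in this method keeps two views of the same assignment: executor to slot, and slot to executors. A minimal sketch of that pattern follows, with strings in place of WorkerSlot and ExecutorDetails and with made-up class and accessor names; resource tracking is left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TwoWayAssignmentSketch {
    private final Map<String, String> executorToSlot = new HashMap<>();
    private final Map<String, List<String>> slotToExecutors = new HashMap<>();

    // Records the assignment in both directions so either lookup is cheap.
    void assign(String slot, Collection<String> executors) {
        for (String executor : executors) {
            executorToSlot.put(executor, slot);
        }
        slotToExecutors.computeIfAbsent(slot, s -> new ArrayList<>()).addAll(executors);
    }

    String slotOf(String executor) {
        return executorToSlot.get(executor);
    }

    List<String> executorsOn(String slot) {
        return slotToExecutors.getOrDefault(slot, Collections.emptyList());
    }

    public static void main(String[] args) {
        TwoWayAssignmentSketch assignment = new TwoWayAssignmentSketch();
        assignment.assign("slot-A", List.of("exec-1", "exec-2"));
        System.out.println(assignment.slotOf("exec-2"));      // slot-A
        System.out.println(assignment.executorsOn("slot-A")); // [exec-1, exec-2]
    }
}
```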


  • Nimbus.mkAssignments is responsible for scheduling and assigning tasks. It mainly calculates newSchedulerAssignments (Map<String, SchedulerAssignment>, keyed by topologyId) by calling scheduler.schedule(topologies, cluster)
  • EvenScheduler.scheduleTopology is the core of the scheduling. It allocates slots to the ExecutorDetails that need them, using a round-robin strategy: each reassignExecutor is given reassignSlots.get(i % reassignSlots.size())
  • ExecutorDetails has two attributes, startTask and endTask; the assignment process really assigns a slot to these tasks
  • After the allocation is completed, lockingMkAssignments performs two operations. One is state.setAssignment(topoId, assignment, td.getConf()), which stores the changed assignment in ZooKeeper; the other is notifySupervisorsAssignments, which notifies the supervisors, each of which updates its local memory on receipt