Category : storm

Preface: This article mainly studies Storm's submitTopology.
Submit topology log example
2018-10-08 17:32:55.738 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Generated ZooKeeper secret payload for MD5-digest: -8659577410336375158:-6351873438041855318
2018-10-08 17:32:55.893 INFO 2870 --- [ main] org.apache.storm.utils.NimbusClient : Found leader nimbus : a391f7a04044:6627
2018-10-08 17:32:56.059 INFO 2870 --- [ main] o.apache.storm.security.auth.AuthUtils : Got AutoCreds []
2018-10-08 17:32:56.073 ..

Preface: This article mainly studies the nimbus.seeds parameter of the Storm client.
NIMBUS_SEEDS
storm-core-1.1.0-sources.jar!/org/apache/storm/Config.java
    /**
     * The host that the master server is running on, added only for backward compatibility,
     * the usage deprecated in favor of nimbus.seeds config.
     */
    @Deprecated
    @isString
    public static final String NIMBUS_HOST = "nimbus.host";
    /**
     * List of seed nimbus hosts ..

Preface: This article mainly studies the LeaderElector of Storm Nimbus.
Nimbus
org/apache/storm/daemon/nimbus/Nimbus.java
    public static void main(String[] args) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        launch(new StandaloneINimbus());
    }
    public static Nimbus launch(INimbus inimbus) throws Exception {
        Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(), ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
        boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
        boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
        if (checkAcl) {
            AclEnforcement.verifyAcls(conf, ..

Preface: This article mainly studies the startup of the Storm supervisor.
Supervisor.launch
storm-core-1.2.2-sources.jar!/org/apache/storm/daemon/supervisor/Supervisor.java
    /**
     * Launch the supervisor
     */
    public void launch() throws Exception {
        LOG.info("Starting Supervisor with conf {}", conf);
        String path = ConfigUtils.supervisorTmpDir(conf);
        FileUtils.cleanDirectory(new File(path));
        Localizer localizer = getLocalizer();
        SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, this);
        hb.run();
        // should synchronize supervisor so it doesn't ..

Preface: This article mainly studies the executors and tasks of a Storm worker.
Worker
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
    public static void main(String[] args) throws Exception {
        Preconditions.checkArgument(args.length == 5, "Illegal number of arguments. Expected: 5, Actual: " + args.length);
        String stormId = args[0];
        String assignmentId = args[1];
        String supervisorPort = args[2];
        String portStr = args[3];
        String workerId = args[4];
        ..

Preface: This article mainly studies Storm's GraphiteStormReporter.
GraphiteStormReporter
storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java
    public class GraphiteStormReporter extends ScheduledStormReporter {
        private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);
        public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
        public static final String GRAPHITE_HOST = "graphite.host";
        public static final String GRAPHITE_PORT = "graphite.port";
        public static final String GRAPHITE_TRANSPORT = "graphite.transport";
        @Override
        public void prepare(MetricRegistry ..

Preface: This article mainly studies mkAssignments of Storm Nimbus.
Nimbus.mkAssignments
storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
    void doRebalance(String topoId, StormBase stormBase) throws Exception {
        RebalanceOptions rbo = stormBase.get_topology_action_options().get_rebalance_options();
        StormBase updated = new StormBase();
        updated.set_topology_action_options(null);
        updated.set_component_debug(Collections.emptyMap());
        if (rbo.is_set_num_executors()) {
            updated.set_component_executors(rbo.get_num_executors());
        }
        if (rbo.is_set_num_workers()) {
            updated.set_num_workers(rbo.get_num_workers());
        }
        stormClusterState.updateStorm(topoId, updated);
        updateBlobStore(topoId, rbo, ServerUtils.principalNameToSubject(rbo.get_principal()));
        idToExecutors.getAndUpdate(new Dissoc<>(topoId)); // remove the executors cache to let it ..

Preface: This article mainly studies Storm's AssignmentDistributionService.
AssignmentDistributionService
storm-2.0.0/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
    /**
     * A service for distributing master assignments to supervisors, this service makes the assignments notification
     * asynchronous.
     *
     * <p>We support multiple working threads to distribute assignment, every thread has a queue buffer.
     *
     * <p>Master will shuffle its node request to the queues, if ..

Preface: This article mainly studies Storm's PartialKeyGrouping.
Example
    @Test
    public void testPartialKeyGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        String spoutId = "wordGenerator";
        String counterId = "counter";
        String aggId = "aggregator";
        String intermediateRankerId = "intermediateRanker";
        String totalRankerId = "finalRanker";
        int TOP_N = 5;
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(spoutId, new TestWordSpout(), 5);
        // NOTE: use partialKeyGrouping instead of fieldsGrouping so that load is spread more evenly across the count bolt
        builder.setBolt(counterId, new RollingCountBolt(9, ..
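The note in the excerpt says partialKeyGrouping balances load better than fieldsGrouping. A minimal stand-alone sketch of the underlying idea, assuming the usual "two choices" scheme: each key maps to two candidate tasks and every tuple goes to whichever candidate has received fewer tuples so far. The class `PartialKeySketch`, its hash functions, and the task ids are hypothetical and chosen for brevity; this is not Storm's own implementation.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone model of "two choices" routing; NOT Storm's PartialKeyGrouping class.
class PartialKeySketch {
    private final List<Integer> targetTasks; // downstream task ids (hypothetical values)
    private final long[] sent;               // tuples routed so far, per target index

    PartialKeySketch(List<Integer> targetTasks) {
        if (targetTasks.isEmpty()) {
            throw new IllegalArgumentException("need at least one target task");
        }
        this.targetTasks = targetTasks;
        this.sent = new long[targetTasks.size()];
    }

    // First candidate index for a key (illustrative hash, an assumption of this sketch).
    int firstChoice(Object key) {
        return Math.floorMod(key.hashCode(), targetTasks.size());
    }

    // Second candidate index, derived with a different illustrative hash.
    int secondChoice(Object key) {
        return Math.floorMod(key.hashCode() * 31 + 17, targetTasks.size());
    }

    // Route the key to whichever of its two candidates has received fewer tuples.
    int chooseTask(Object key) {
        int a = firstChoice(key);
        int b = secondChoice(key);
        int chosen = sent[a] <= sent[b] ? a : b;
        sent[chosen]++;
        return targetTasks.get(chosen);
    }

    // Number of tuples routed to the target at the given index.
    long sentTo(int index) {
        return sent[index];
    }

    public static void main(String[] args) {
        PartialKeySketch grouping = new PartialKeySketch(Arrays.asList(10, 11, 12, 13));
        for (int i = 0; i < 6; i++) {
            System.out.println("storm -> task " + grouping.chooseTask("storm"));
        }
    }
}
```

Because one key can now reach two tasks, a downstream step has to merge the partial counts, which is presumably why the example topology declares an aggregator after the counter.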

Preface: This article mainly studies Storm's CustomStreamGrouping.
CustomStreamGrouping
storm-2.0.0/storm-client/src/jvm/org/apache/storm/grouping/CustomStreamGrouping.java
    public interface CustomStreamGrouping extends Serializable {
        /**
         * Tells the stream grouping at runtime the tasks in the target bolt. This information should be used in chooseTasks to determine the
         * target tasks.
         *
         * It also tells the grouping the metadata on the stream this ..

Preface: This article mainly studies Storm's direct grouping.
Direct grouping
Direct grouping is a special grouping in which the upstream producer itself decides which downstream task receives each tuple it sends. Using direct grouping involves the following steps:
1. The upstream component saves the task ID list of the downstream bolt in its prepare method:
    public class ..
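The first step above (keep the downstream task IDs, then pick one per tuple) can be sketched without any Storm dependency. In a real bolt the list would presumably come from `TopologyContext.getComponentTasks(...)` inside prepare, and the chosen ID would be passed to `emitDirect`. The class `DirectGroupingSketch`, the hash-based selection rule, and the task IDs are assumptions of this sketch, not code from the article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-alone model of the producer side of direct grouping (hypothetical class).
class DirectGroupingSketch {
    // Task ids of the downstream bolt, captured once up front (as prepare() would do).
    private final List<Integer> downstreamTaskIds;

    DirectGroupingSketch(List<Integer> downstreamTaskIds) {
        if (downstreamTaskIds.isEmpty()) {
            throw new IllegalArgumentException("downstream bolt has no tasks");
        }
        this.downstreamTaskIds = new ArrayList<>(downstreamTaskIds);
    }

    // The producer decides the receiver: here, a stable hash of the word (an assumption).
    int pickTask(String word) {
        int index = Math.floorMod(word.hashCode(), downstreamTaskIds.size());
        return downstreamTaskIds.get(index);
    }

    public static void main(String[] args) {
        DirectGroupingSketch producer = new DirectGroupingSketch(Arrays.asList(4, 5, 6));
        for (String word : Arrays.asList("a", "b", "c", "a")) {
            // A real bolt would emit the tuple directly to the chosen task id here.
            System.out.println(word + " -> task " + producer.pickTask(word));
        }
    }
}
```

Any selection rule works as long as the producer makes the decision; a hash keeps the same word on the same task, while round-robin would spread tuples evenly.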

Preface: This article mainly studies Storm's tickTuple.
Example
TickWordCountBolt
    public class TickWordCountBolt extends BaseBasicBolt {
        private static final Logger LOGGER = LoggerFactory.getLogger(TickWordCountBolt.class);
        Map<String, Integer> counts = new HashMap<String, Integer>();
        @Override
        public Map<String, Object> getComponentConfiguration() {
            Config conf = new Config();
            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
            return conf;
        }
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            if(TupleUtils.isTick(input)){
                //execute ..
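The excerpt cuts off inside the tick branch, so what the bolt does on a tick is not shown. A minimal stand-alone sketch of one common pattern, assuming the bolt flushes and resets its counts on each tick. The class `TickCountSketch` is hypothetical, and the tick check is modeled on my understanding that Storm marks tick tuples with the `__system` component and the `__tick` stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-alone model of a tick-driven counter; NOT the article's TickWordCountBolt.
class TickCountSketch {
    // Assumed to mirror the ids Storm uses to mark tick tuples.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    private final Map<String, Integer> counts = new HashMap<>();
    private final List<Map<String, Integer>> flushed = new ArrayList<>();

    // A tuple is treated as a tick when both its source and its stream match the system ids.
    static boolean isTick(String sourceComponent, String streamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(streamId);
    }

    // On a tick: snapshot and reset the counts (an assumption). Otherwise: count the word.
    void execute(String sourceComponent, String streamId, String word) {
        if (isTick(sourceComponent, streamId)) {
            flushed.add(new HashMap<>(counts));
            counts.clear();
            return;
        }
        counts.merge(word, 1, Integer::sum);
    }

    // Snapshots taken so far, one per tick.
    List<Map<String, Integer>> flushed() {
        return flushed;
    }

    public static void main(String[] args) {
        TickCountSketch bolt = new TickCountSketch();
        bolt.execute("spout", "default", "storm");
        bolt.execute("spout", "default", "storm");
        bolt.execute(SYSTEM_COMPONENT_ID, SYSTEM_TICK_STREAM_ID, null);
        System.out.println(bolt.flushed());
    }
}
```

With the frequency of 10 set in the excerpt, a real bolt would receive such a tick roughly every ten seconds and could use it to emit or log its aggregate.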