Talk about Storm’s submitTopology


Preface

This article mainly studies Storm’s submitTopology.

Example log of a topology submission

2018-10-08 17:32:55.738  INFO 2870 --- [           main] org.apache.storm.StormSubmitter          : Generated ZooKeeper secret payload for MD5-digest: -8659577410336375158:-6351873438041855318
2018-10-08 17:32:55.893  INFO 2870 --- [           main] org.apache.storm.utils.NimbusClient      : Found leader nimbus : a391f7a04044:6627
2018-10-08 17:32:56.059  INFO 2870 --- [           main] o.apache.storm.security.auth.AuthUtils   : Got AutoCreds []
2018-10-08 17:32:56.073  INFO 2870 --- [           main] org.apache.storm.utils.NimbusClient      : Found leader nimbus : a391f7a04044:6627
2018-10-08 17:32:56.123  INFO 2870 --- [           main] org.apache.storm.StormSubmitter          : Uploading dependencies - jars...
2018-10-08 17:32:56.125  INFO 2870 --- [           main] org.apache.storm.StormSubmitter          : Uploading dependencies - artifacts...
2018-10-08 17:32:56.125  INFO 2870 --- [           main] org.apache.storm.StormSubmitter          : Dependency Blob keys - jars : [] / artifacts : []
2018-10-08 17:32:56.149  INFO 2870 --- [           main] org.apache.storm.StormSubmitter          : Uploading topology jar /tmp/storm-demo/target/storm-demo-0.0.1-SNAPSHOT.jar to assigned location: /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
2018-10-08 17:32:57.105  INFO 2870 --- [           main] org.apache.storm.StormSubmitter          : Successfully uploaded topology jar to assigned location: /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
2018-10-08 17:32:57.106  INFO 2870 --- [           main] org.apache.storm.StormSubmitter          : Submitting topology DemoTopology in distributed mode with conf {"nimbus.seeds":["192.168.99.100"],"storm.zookeeper.topology.auth.scheme":"digest","topology.workers":1,"storm.zookeeper.port":2181,"nimbus.thrift.port":6627,"storm.zookeeper.topology.auth.payload":"-8659577410336375158:-6351873438041855318","storm.zookeeper.servers":["192.168.99.100"]}
2018-10-08 17:32:58.008  INFO 2870 --- [           main] org.apache.storm.StormSubmitter          : Finished submitting topology: DemoTopology
  • As the log shows, the topology jar is uploaded to nimbus at /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar.
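For reference, here is a minimal sketch of the submitting side that could produce a log like the one above. DemoSpout and DemoBolt are hypothetical placeholders; the conf values mirror the log.

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    public class DemoTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // DemoSpout/DemoBolt are placeholders for real components
            builder.setSpout("spout", new DemoSpout());
            builder.setBolt("bolt", new DemoBolt()).shuffleGrouping("spout");

            Config conf = new Config();
            conf.setNumWorkers(1); // topology.workers=1 in the log above

            // kicks off the StormSubmitter flow analyzed below
            StormSubmitter.submitTopology("DemoTopology", conf, builder.createTopology());
        }
    }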

StormSubmitter

submitTopology

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

    public static void submitTopology(String name, Map stormConf, StormTopology topology)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        submitTopology(name, stormConf, topology, null, null);
    }

    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
             ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        submitTopologyAs(name, stormConf, topology, opts, progressListener, null);
    }

    public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);
        stormConf.putAll(Utils.readCommandLineOpts());
        Map conf = Utils.readStormConfig();
        conf.putAll(stormConf);
        stormConf.putAll(prepareZookeeperAuthentication(conf));

        validateConfs(conf, topology);

        Map<String,String> passedCreds = new HashMap<>();
        if (opts != null) {
            Credentials tmpCreds = opts.get_creds();
            if (tmpCreds != null) {
                passedCreds = tmpCreds.get_creds();
            }
        }
        Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
        if (!fullCreds.isEmpty()) {
            if (opts == null) {
                opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
            }
            opts.set_creds(new Credentials(fullCreds));
        }
        try {
            if (localNimbus!=null) {
                LOG.info("Submitting topology " + name + " in local mode");
                if (opts!=null) {
                    localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    localNimbus.submitTopology(name, stormConf, topology);
                }
                LOG.info("Finished submitting topology: " +  name);
            } else {
                String serConf = JSONValue.toJSONString(stormConf);
                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
                    if (topologyNameExists(name, client)) {
                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                    }

                    // Dependency uploading only makes sense for distributed mode
                    List<String> jarsBlobKeys = Collections.emptyList();
                    List<String> artifactsBlobKeys;

                    DependencyUploader uploader = new DependencyUploader();
                    try {
                        uploader.init();

                        jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);

                        artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
                    } catch (Throwable e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        uploader.deleteBlobs(jarsBlobKeys);
                        uploader.shutdown();
                        throw e;
                    }

                    try {
                        setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
                        submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
                    } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                        // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                        // Note that we don't handle TException to delete jars blobs
                        // because it's safer to leave some blobs instead of topology not running
                        uploader.deleteBlobs(jarsBlobKeys);
                        throw e;
                    } finally {
                        uploader.shutdown();
                    }
                }
            }
        } catch(TException e) {
            throw new RuntimeException(e);
        }
        invokeSubmitterHook(name, asUser, conf, topology);

    }

    private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts,
                                                       ProgressListener progressListener, String asUser, Map conf,
                                                       String serConf, NimbusClient client) throws TException {
        try {
            String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
            LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);

            if (opts != null) {
                client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
            } else {
                // this is for backwards compatibility
                client.getClient().submitTopology(name, jar, serConf, topology);
            }
            LOG.info("Finished submitting topology: {}", name);
        } catch (InvalidTopologyException e) {
            LOG.warn("Topology submission exception: {}", e.get_msg());
            throw e;
        } catch (AlreadyAliveException e) {
            LOG.warn("Topology already alive exception", e);
            throw e;
        }
    }

    public static String submitJarAs(Map conf, String localJar, ProgressListener listener, NimbusClient client) {
        if (localJar == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
        }

        try {
            String uploadLocation = client.getClient().beginFileUpload();
            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
            BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);

            long totalSize = new File(localJar).length();
            if (listener != null) {
                listener.onStart(localJar, uploadLocation, totalSize);
            }

            long bytesUploaded = 0;
            while(true) {
                byte[] toSubmit = is.read();
                bytesUploaded += toSubmit.length;
                if (listener != null) {
                    listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
                }

                if(toSubmit.length==0) break;
                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
            }
            client.getClient().finishFileUpload(uploadLocation);

            if (listener != null) {
                listener.onCompleted(localJar, uploadLocation, totalSize);
            }

            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
            return uploadLocation;
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }
  • The topology is ultimately submitted by the submitTopologyAs method.
  • submitTopologyAs first uploads the dependencies through DependencyUploader, then calls submitTopologyInDistributeMode, which uploads the topology jar via the submitJarAs method.
  • As the earlier log shows, the jar is uploaded to nimbus at /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar.
  • client.getClient().submitTopology then submits the topology metadata itself.
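Note that submitTopology also accepts a ProgressListener, whose onStart/onProgress/onCompleted callbacks submitJarAs invokes during the jar upload, as the code above shows. A minimal sketch of a listener that logs upload progress (the class name is illustrative):

    import org.apache.storm.StormSubmitter;

    public class LoggingProgressListener implements StormSubmitter.ProgressListener {
        @Override
        public void onStart(String srcFile, String targetFile, long totalBytes) {
            System.out.printf("start uploading %s -> %s (%d bytes)%n", srcFile, targetFile, totalBytes);
        }

        @Override
        public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
            System.out.printf("uploaded %d of %d bytes%n", bytesUploaded, totalBytes);
        }

        @Override
        public void onCompleted(String srcFile, String targetFile, long totalBytes) {
            System.out.printf("finished uploading %s%n", srcFile);
        }
    }

It can be passed in via the five-argument overload, e.g. StormSubmitter.submitTopology(name, conf, topology, null, new LoggingProgressListener()).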

uploadDependencyJarsToBlobStore

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

    private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
        LOG.info("Uploading dependencies - jars...");

        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();

        String depJarsProp = System.getProperty("storm.dependency.jars", "");
        List<File> depJars = propertiesParser.parseJarsProperties(depJarsProp);

        try {
            return uploader.uploadFiles(depJars, true);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }
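The jar list comes from the storm.dependency.jars system property, which the storm CLI fills in when a topology is submitted with the --jars option. As a hedged sketch (assuming the comma-separated path format that option produces), it could also be set programmatically before submitting:

    public class DependencyJarsExample {
        public static void main(String[] args) {
            // assumption: a comma-separated list of local jar paths
            System.setProperty("storm.dependency.jars",
                    "/tmp/libs/guava-16.0.1.jar,/tmp/libs/commons-lang3-3.5.jar");
            // ... then call StormSubmitter.submitTopology(...) as usual
        }
    }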

uploadDependencyArtifactsToBlobStore

storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java

    private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
        LOG.info("Uploading dependencies - artifacts...");

        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();

        String depArtifactsProp = System.getProperty("storm.dependency.artifacts", "{}");
        Map<String, File> depArtifacts = propertiesParser.parseArtifactsProperties(depArtifactsProp);

        try {
            return uploader.uploadArtifacts(depArtifacts);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }
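The default value of "{}" suggests this property holds a JSON map; the keys are artifact coordinates (as passed to the storm CLI's --artifacts option) and the values are the resolved local files. A hedged sketch, with placeholder coordinate and path:

    public class DependencyArtifactsExample {
        public static void main(String[] args) {
            // assumption: a JSON map from artifact coordinate to local file path,
            // inferred from the "{}" default above
            System.setProperty("storm.dependency.artifacts",
                    "{\"redis.clients:jedis:2.9.0\":\"/tmp/repo/jedis-2.9.0.jar\"}");
            // ... then submit the topology as usual
        }
    }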

DependencyUploader

storm-core-1.1.0-sources.jar!/org/apache/storm/dependency/DependencyUploader.java

    public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException {
        checkFilesExist(dependencies);

        List<String> keys = new ArrayList<>(dependencies.size());
        try {
            for (File dependency : dependencies) {
                String fileName = dependency.getName();
                String key = BlobStoreUtils.generateDependencyBlobKey(BlobStoreUtils.applyUUIDToFileName(fileName));

                try {
                    uploadDependencyToBlobStore(key, dependency);
                } catch (KeyAlreadyExistsException e) {
                    // it should never happened since we apply UUID
                    throw new RuntimeException(e);
                }

                keys.add(key);
            }
        } catch (Throwable e) {
            if (getBlobStore() != null && cleanupIfFails) {
                deleteBlobs(keys);
            }
            throw new RuntimeException(e);
        }

        return keys;
    }

    public List<String> uploadArtifacts(Map<String, File> artifacts) {
        checkFilesExist(artifacts.values());

        List<String> keys = new ArrayList<>(artifacts.size());
        try {
            for (Map.Entry<String, File> artifactToFile : artifacts.entrySet()) {
                String artifact = artifactToFile.getKey();
                File dependency = artifactToFile.getValue();

                String key = BlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact));
                try {
                    uploadDependencyToBlobStore(key, dependency);
                } catch (KeyAlreadyExistsException e) {
                    // we lose the race, but it doesn't matter
                }

                keys.add(key);
            }
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }

        return keys;
    }

    private boolean uploadDependencyToBlobStore(String key, File dependency)
            throws KeyAlreadyExistsException, AuthorizationException, IOException {

        boolean uploadNew = false;
        try {
            // FIXME: we can filter by listKeys() with local blobstore when STORM-1986 is going to be resolved
            // as a workaround, we call getBlobMeta() for all keys
            getBlobStore().getBlobMeta(key);
        } catch (KeyNotFoundException e) {
            // TODO: do we want to add ACL here?
            AtomicOutputStream blob = getBlobStore()
                    .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>()));
            Files.copy(dependency.toPath(), blob);
            blob.close();

            uploadNew = true;
        }

        return uploadNew;
    }
  • The uploadFiles and uploadArtifacts methods both end up calling uploadDependencyToBlobStore.
  • uploadDependencyToBlobStore writes the file contents to an AtomicOutputStream obtained from the blob store.
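Putting the pieces together, submitTopologyAs drives DependencyUploader roughly as in the condensed sketch below (the jar path is a placeholder; the call sequence mirrors the submitTopologyAs code above):

    import java.io.File;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.apache.storm.dependency.DependencyUploader;

    public class UploaderSketch {
        public static void main(String[] args) throws Throwable {
            DependencyUploader uploader = new DependencyUploader();
            List<String> jarKeys = Collections.emptyList();
            try {
                uploader.init();
                // second argument = cleanupIfFails: delete already-uploaded blobs on failure
                jarKeys = uploader.uploadFiles(
                        Arrays.asList(new File("/tmp/libs/guava-16.0.1.jar")), true);
                System.out.println("dependency blob keys: " + jarKeys);
            } catch (Throwable e) {
                uploader.deleteBlobs(jarKeys); // same cleanup submitTopologyAs performs
                throw e;
            } finally {
                uploader.shutdown();
            }
        }
    }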

NimbusUploadAtomicOutputStream

storm-core-1.1.0-sources.jar!/org/apache/storm/blobstore/NimbusBlobStore.java

    public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
        private String session;
        private int maxChunkSize = 4096;
        private String key;

        public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) {
            this.session = session;
            this.maxChunkSize = bufferSize;
            this.key = key;
        }

        @Override
        public void cancel() throws IOException {
            try {
                synchronized(client) {
                    client.getClient().cancelBlobUpload(session);
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void write(int b) throws IOException {
            try {
                synchronized(client) {
                    client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b}));
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void write(byte []b) throws IOException {
            write(b, 0, b.length);
        }

        @Override
        public void write(byte []b, int offset, int len) throws IOException {
            try {
                int end = offset + len;
                for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
                    int realLen = Math.min(end - realOffset, maxChunkSize);
                    LOG.debug("Writing {} bytes of {} remaining",realLen,(end-realOffset));
                    synchronized(client) {
                        client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen));
                    }
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void close() throws IOException {
            try {
                synchronized(client) {
                    client.getClient().finishBlobUpload(session);
                    client.getClient().createStateInZookeeper(key);
                }
            } catch (TException e) {
                throw new RuntimeException(e);
            }
        }
    }
  • NimbusUploadAtomicOutputStream’s write method uploads the data in chunks through client.getClient().uploadBlobChunk.
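The chunking arithmetic is worth spelling out: with the default maxChunkSize of 4096, a 10000-byte write is split into chunks of 4096, 4096 and 1808 bytes. A standalone sketch of the same loop:

    public class ChunkingSketch {
        public static void main(String[] args) {
            int maxChunkSize = 4096; // NimbusUploadAtomicOutputStream default
            byte[] b = new byte[10000];
            int offset = 0;
            int len = b.length;

            int end = offset + len;
            for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
                int realLen = Math.min(end - realOffset, maxChunkSize);
                // in the real stream this slice goes to uploadBlobChunk(session, ...)
                System.out.printf("chunk [%d, %d) -> %d bytes%n",
                        realOffset, realOffset + realLen, realLen);
            }
            // prints chunks of 4096, 4096 and 1808 bytes
        }
    }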

send&recv

storm-core-1.1.0-sources.jar!/org/apache/storm/generated/Nimbus.java

    public String beginFileUpload() throws AuthorizationException, org.apache.thrift.TException
    {
      send_beginFileUpload();
      return recv_beginFileUpload();
    }

    public void send_beginFileUpload() throws org.apache.thrift.TException
    {
      beginFileUpload_args args = new beginFileUpload_args();
      sendBase("beginFileUpload", args);
    }

    public String recv_beginFileUpload() throws AuthorizationException, org.apache.thrift.TException
    {
      beginFileUpload_result result = new beginFileUpload_result();
      receiveBase(result, "beginFileUpload");
      if (result.is_set_success()) {
        return result.success;
      }
      if (result.aze != null) {
        throw result.aze;
      }
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "beginFileUpload failed: unknown result");
    }

    public void send_finishFileUpload(String location) throws org.apache.thrift.TException
    {
      finishFileUpload_args args = new finishFileUpload_args();
      args.set_location(location);
      sendBase("finishFileUpload", args);
    }

    public void uploadChunk(String location, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException
    {
      send_uploadChunk(location, chunk);
      recv_uploadChunk();
    }

    public void send_uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift.TException
    {
      uploadChunk_args args = new uploadChunk_args();
      args.set_location(location);
      args.set_chunk(chunk);
      sendBase("uploadChunk", args);
    }

    public void recv_uploadChunk() throws AuthorizationException, org.apache.thrift.TException
    {
      uploadChunk_result result = new uploadChunk_result();
      receiveBase(result, "uploadChunk");
      if (result.aze != null) {
        throw result.aze;
      }
      return;
    }

    public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException
    {
      send_submitTopology(name, uploadedJarLocation, jsonConf, topology);
      recv_submitTopology();
    }

    public void send_submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws org.apache.thrift.TException
    {
      submitTopology_args args = new submitTopology_args();
      args.set_name(name);
      args.set_uploadedJarLocation(uploadedJarLocation);
      args.set_jsonConf(jsonConf);
      args.set_topology(topology);
      sendBase("submitTopology", args);
    }

    public void recv_submitTopology() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException
    {
      submitTopology_result result = new submitTopology_result();
      receiveBase(result, "submitTopology");
      if (result.e != null) {
        throw result.e;
      }
      if (result.ite != null) {
        throw result.ite;
      }
      if (result.aze != null) {
        throw result.aze;
      }
      return;
    }

    public void uploadBlobChunk(String session, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException
    {
      send_uploadBlobChunk(session, chunk);
      recv_uploadBlobChunk();
    }

    public void send_uploadBlobChunk(String session, ByteBuffer chunk) throws org.apache.thrift.TException
    {
      uploadBlobChunk_args args = new uploadBlobChunk_args();
      args.set_session(session);
      args.set_chunk(chunk);
      sendBase("uploadBlobChunk", args);
    }

    public void recv_uploadBlobChunk() throws AuthorizationException, org.apache.thrift.TException
    {
      uploadBlobChunk_result result = new uploadBlobChunk_result();
      receiveBase(result, "uploadBlobChunk");
      if (result.aze != null) {
        throw result.aze;
      }
      return;
    }
  • sendBase sends the request and receiveBase receives the response; each synchronous Thrift call is such a pair.
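These generated stubs can also be driven directly through NimbusClient, which is what submitJarAs does under the hood. A hedged sketch of a raw file upload (normally StormSubmitter takes care of this):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.storm.utils.NimbusClient;
    import org.apache.storm.utils.Utils;

    public class ThriftUploadSketch {
        public static void main(String[] args) throws Exception {
            Map conf = Utils.readStormConfig();
            try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
                // each call below is a sendBase/receiveBase pair on the wire
                String location = client.getClient().beginFileUpload();
                client.getClient().uploadChunk(location,
                        ByteBuffer.wrap("demo".getBytes(StandardCharsets.UTF_8)));
                client.getClient().finishFileUpload(location);
                System.out.println("uploaded to " + location);
            }
        }
    }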

Summary

When submitting a topology, Storm first uploads the dependency jars specified by storm.dependency.jars, then the dependency artifacts specified by storm.dependency.artifacts, and finally the topology jar itself. All of these go over the Nimbus Thrift connection, sending requests through sendBase and receiving responses through receiveBase.
