Talk about serialization of storm tuple

  storm

Order

This article mainly studies the serialization of storm tuple.

ExecutorTransfer.tryTransfer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java

// Every executor has an instance of this class
public class ExecutorTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);

    private final WorkerState workerData;
    private final KryoTupleSerializer serializer;
    private final boolean isDebug;
    private int indexingBase = 0;
    private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
    private AtomicReferenceArray<JCQueue> queuesToFlush;
        // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance


    public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
        this.workerData = workerData;
        this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
        this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    }

    //......

    // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
        if (isDebug) {
            LOG.info("TRANSFERRING tuple {}", addressedTuple);
        }

        JCQueue localQueue = getLocalQueue(addressedTuple);
        if (localQueue != null) {
            return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
        }
        return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
    }

    //......
}
  • ExecutorTransfer created KryoTupleSerializer in the constructor
  • Here, it is first determined whether the target address is in the localQueue. if so, localransfer is performed, otherwise, remote transfer is performed.
  • WorkerData.tryTransferRemote was called when remote transfer and serializer was passed.

WorkerState.tryTransferRemote

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java

    /* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
    public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
        return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
    }
  • WorkerState.tryTransferRemote actually uses workertransfer.

workerTransfer.tryTransferRemote

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java

    /* Not a Blocking call. If cannot emit, will add 'tuple' to 'pendingEmits' and return 'false'. 'pendingEmits' can be null */
    public boolean tryTransferRemote(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
        if (pendingEmits != null && !pendingEmits.isEmpty()) {
            pendingEmits.add(addressedTuple);
            return false;
        }

        if (!remoteBackPressureStatus[addressedTuple.dest].get()) {
            TaskMessage tm = new TaskMessage(addressedTuple.getDest(), serializer.serialize(addressedTuple.getTuple()));
            if (transferQueue.tryPublish(tm)) {
                return true;
            }
        } else {
            LOG.debug("Noticed Back Pressure in remote task {}", addressedTuple.dest);
        }
        if (pendingEmits != null) {
            pendingEmits.add(addressedTuple);
        }
        return false;
    }
  • Here you can see that when creating TaskMessage, serializer.serialize (addressed tuple.gettuple ()) was used to serialize the tuple. The serializer is of type ITupleSerializer, and its implementation class is KryoTupleSerializer.

KryoTupleSerializer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java

public class KryoTupleSerializer implements ITupleSerializer {
    KryoValuesSerializer _kryo;
    SerializationFactory.IdDictionary _ids;
    Output _kryoOut;

    public KryoTupleSerializer(final Map<String, Object> conf, final GeneralTopologyContext context) {
        _kryo = new KryoValuesSerializer(conf);
        _kryoOut = new Output(2000, 2000000000);
        _ids = new SerializationFactory.IdDictionary(context.getRawTopology());
    }

    public byte[] serialize(Tuple tuple) {
        try {

            _kryoOut.clear();
            _kryoOut.writeInt(tuple.getSourceTask(), true);
            _kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
            tuple.getMessageId().serialize(_kryoOut);
            _kryo.serializeInto(tuple.getValues(), _kryoOut);
            return _kryoOut.toBytes();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    //    public long crc32(Tuple tuple) {
    //        try {
    //            CRC32OutputStream hasher = new CRC32OutputStream();
    //            _kryo.serializeInto(tuple.getValues(), hasher);
    //            return hasher.getValue();
    //        } catch (IOException e) {
    //            throw new RuntimeException(e);
    //        }
    //    }
}
  • KryoTupleSerializer created KryoValuesSerializer and called _ kryo.serializeinto (tuple.getvalues (), _ kryout) when serialize tuple.

KryoValuesSerializer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java

public class KryoValuesSerializer {
    Kryo _kryo;
    ListDelegate _delegate;
    Output _kryoOut;

    public KryoValuesSerializer(Map<String, Object> conf) {
        _kryo = SerializationFactory.getKryo(conf);
        _delegate = new ListDelegate();
        _kryoOut = new Output(2000, 2000000000);
    }

    public void serializeInto(List<Object> values, Output out) {
        // this ensures that list of values is always written the same way, regardless
        // of whether it's a java collection or one of clojure's persistent collections 
        // (which have different serializers)
        // Doing this lets us deserialize as ArrayList and avoid writing the class here
        _delegate.setDelegate(values);
        _kryo.writeObject(out, _delegate);
    }

    public byte[] serialize(List<Object> values) {
        _kryoOut.clear();
        serializeInto(values, _kryoOut);
        return _kryoOut.toBytes();
    }

    public byte[] serializeObject(Object obj) {
        _kryoOut.clear();
        _kryo.writeClassAndObject(_kryoOut, obj);
        return _kryoOut.toBytes();
    }
}
  • KryoValuesSerializer calls serializationfactory.getkryo (conf) method to create _kryo in the constructor
  • The _delegate here uses a ListDelegate (Use it to wrap List<Object> values), _ kryout is new Output(2000, 2000000000)
  • The serialize method calls the serializeInto method, which finally calls the native _kryo.writeObject method for serialization.

SerializationFactory.getKryo

storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java

    public static Kryo getKryo(Map<String, Object> conf) {
        IKryoFactory kryoFactory = (IKryoFactory) ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
        Kryo k = kryoFactory.getKryo(conf);
        k.register(byte[].class);

        /* tuple payload serializer is specified via configuration */
        String payloadSerializerName = (String) conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
        try {
            Class serializerClass = Class.forName(payloadSerializerName);
            Serializer serializer = resolveSerializerInstance(k, ListDelegate.class, serializerClass, conf);
            k.register(ListDelegate.class, serializer);
        } catch (ClassNotFoundException ex) {
            throw new RuntimeException(ex);
        }

        k.register(ArrayList.class, new ArrayListSerializer());
        k.register(HashMap.class, new HashMapSerializer());
        k.register(HashSet.class, new HashSetSerializer());
        k.register(BigInteger.class, new BigIntegerSerializer());
        k.register(TransactionAttempt.class);
        k.register(Values.class);
        k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
        k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
        k.register(ConsList.class);
        k.register(BackPressureStatus.class);

        synchronized (loader) {
            for (SerializationRegister sr : loader) {
                try {
                    sr.register(k);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        kryoFactory.preRegister(k, conf);

        boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);

        register(k, conf.get(Config.TOPOLOGY_KRYO_REGISTER), conf, skipMissing);

        kryoFactory.postRegister(k, conf);

        if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) {
            for (String klassName : (List<String>) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)) {
                try {
                    Class klass = Class.forName(klassName);
                    IKryoDecorator decorator = (IKryoDecorator) klass.newInstance();
                    decorator.decorate(k);
                } catch (ClassNotFoundException e) {
                    if (skipMissing) {
                        LOG.info("Could not find kryo decorator named " + klassName + ". Skipping registration...");
                    } else {
                        throw new RuntimeException(e);
                    }
                } catch (InstantiationException e) {
                    throw new RuntimeException(e);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        kryoFactory.postDecorate(k, conf);

        return k;
    }

    public static void register(Kryo k, Object kryoRegistrations, Map<String, Object> conf, boolean skipMissing) {
        Map<String, String> registrations = normalizeKryoRegister(kryoRegistrations);
        for (Map.Entry<String, String> entry : registrations.entrySet()) {
            String serializerClassName = entry.getValue();
            try {
                Class klass = Class.forName(entry.getKey());
                Class serializerClass = null;
                if (serializerClassName != null) {
                    serializerClass = Class.forName(serializerClassName);
                }
                if (serializerClass == null) {
                    k.register(klass);
                } else {
                    k.register(klass, resolveSerializerInstance(k, klass, serializerClass, conf));
                }
            } catch (ClassNotFoundException e) {
                if (skipMissing) {
                    LOG.info("Could not find serialization or class for " + serializerClassName + ". Skipping registration...");
                } else {
                    throw new RuntimeException(e);
                }
            }
        }
    }
  • The static method of SerializationFactory.getKryo first creates IKryoFactory according to Config.TOPOLOGY_KRYO_FACTORY, and the default is org.Apache.store.serialization.defaultkryofactory.
  • After that, Kryo was created by IKryoFactory.getKryo. After that, Kryo was configured in a series of ways. Byte[].class, ListDelegate.class, ArrayList.class, HashMap.class, HashSet.class, BigInteger.class, TransactionAttempt.class, Values.class, org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class、org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class、ConsList.class、BackPressureStatus.class
  • Class is the container of payload, using config.topology _ tuple _ serializer (Topology.tuple.serializer, default is org.Apache.storm.serialization.types.listdelegatesterializer) to serialize the configured class
  • Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS(The default is false.), when kryo cannot find the configured class corresponding serializers to serialize, whether to throw an exception or skip registration directly;
  • Finally, through config.topology _ kryo _ decorators (topology.kryo.decoratorsLoad a custom serialization

DefaultKryoFactory

storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/DefaultKryoFactory.java

public class DefaultKryoFactory implements IKryoFactory {

    @Override
    public Kryo getKryo(Map<String, Object> conf) {
        KryoSerializableDefault k = new KryoSerializableDefault();
        k.setRegistrationRequired(!((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)));
        k.setReferences(false);
        return k;
    }

    @Override
    public void preRegister(Kryo k, Map<String, Object> conf) {
    }

    public void postRegister(Kryo k, Map<String, Object> conf) {
        ((KryoSerializableDefault) k).overrideDefault(true);
    }

    @Override
    public void postDecorate(Kryo k, Map<String, Object> conf) {
    }

    public static class KryoSerializableDefault extends Kryo {
        boolean _override = false;

        public void overrideDefault(boolean value) {
            _override = value;
        }

        @Override
        public Serializer getDefaultSerializer(Class type) {
            if (_override) {
                return new SerializableSerializer();
            } else {
                return super.getDefaultSerializer(type);
            }
        }
    }
}
  • Here read config.topology _ fall _ back _ on _ java _ serialization (topology.fall.back.on.java.serialization), the default value is true, then registrationRequired is set to false here, that is, serialization does not require the class to be in the registered list.

Kryo

kryo-4.0.2-sources.jar! /com/esotericsoftware/kryo/Kryo.java

    /** If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is automatically registered
     * using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
     * @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true.
     * @see ClassResolver#getRegistration(Class) */
    public Registration getRegistration (Class type) {
        if (type == null) throw new IllegalArgumentException("type cannot be null.");

        Registration registration = classResolver.getRegistration(type);
        if (registration == null) {
            if (Proxy.isProxyClass(type)) {
                // If a Proxy class, treat it like an InvocationHandler because the concrete class for a proxy is generated.
                registration = getRegistration(InvocationHandler.class);
            } else if (!type.isEnum() && Enum.class.isAssignableFrom(type) && !Enum.class.equals(type)) {
                // This handles an enum value that is an inner class. Eg: enum A {b{}};
                registration = getRegistration(type.getEnclosingClass());
            } else if (EnumSet.class.isAssignableFrom(type)) {
                registration = classResolver.getRegistration(EnumSet.class);
            } else if (isClosure(type)) {
                registration = classResolver.getRegistration(ClosureSerializer.Closure.class);
            }
            if (registration == null) {
                if (registrationRequired) {
                    throw new IllegalArgumentException(unregisteredClassMessage(type));
                }
                if (warnUnregisteredClasses) {
                    warn(unregisteredClassMessage(type));
                }
                registration = classResolver.registerImplicit(type);
            }
        }
        return registration;
    }

    /** Registers the class using the lowest, next available integer ID and the {@link Kryo#getDefaultSerializer(Class) default
     * serializer}. If the class is already registered, no change will be made and the existing registration will be returned.
     * Registering a primitive also affects the corresponding primitive wrapper.
     * <p>
     * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
     * using this method. The order must be the same at deserialization as it was for serialization. */
    public Registration register (Class type) {
        Registration registration = classResolver.getRegistration(type);
        if (registration != null) return registration;
        return register(type, getDefaultSerializer(type));
    }

    /** Returns the best matching serializer for a class. This method can be overridden to implement custom logic to choose a
     * serializer. */
    public Serializer getDefaultSerializer (Class type) {
        if (type == null) throw new IllegalArgumentException("type cannot be null.");

        final Serializer serializerForAnnotation = getDefaultSerializerForAnnotatedType(type);
        if (serializerForAnnotation != null) return serializerForAnnotation;

        for (int i = 0, n = defaultSerializers.size(); i < n; i++) {
            DefaultSerializerEntry entry = defaultSerializers.get(i);
            if (entry.type.isAssignableFrom(type)) {
                Serializer defaultSerializer = entry.serializerFactory.makeSerializer(this, type);
                return defaultSerializer;
            }
        }

        return newDefaultSerializer(type);
    }

    /** Called by {@link #getDefaultSerializer(Class)} when no default serializers matched the type. Subclasses can override this
     * method to customize behavior. The default implementation calls {@link SerializerFactory#makeSerializer(Kryo, Class)} using
     * the {@link #setDefaultSerializer(Class) default serializer}. */
    protected Serializer newDefaultSerializer (Class type) {
        return defaultSerializer.makeSerializer(this, type);
    }

    /** Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already
     * registered, the existing entry is updated with the new serializer. Registering a primitive also affects the corresponding
     * primitive wrapper.
     * <p>
     * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
     * using this method. The order must be the same at deserialization as it was for serialization. */
    public Registration register (Class type, Serializer serializer) {
        Registration registration = classResolver.getRegistration(type);
        if (registration != null) {
            registration.setSerializer(serializer);
            return registration;
        }
        return classResolver.register(new Registration(type, serializer, getNextRegistrationId()));
    }

    /** Returns the lowest, next available integer ID. */
    public int getNextRegistrationId () {
        while (nextRegisterID != -2) {
            if (classResolver.getRegistration(nextRegisterID) == null) return nextRegisterID;
            nextRegisterID++;
        }
        throw new KryoException("No registration IDs are available.");
    }
  • Kryo’s getRegistration method will determine registrationRequired when class is not registered. if true, it will throw IllegalArgumentException; ; If false, call classResolver.registerImplicit for implicit registration, and if warnUnregisteredClasses is true, the warning message will be printed.
  • If no Serializer is specified in Kryo’s register method, it will get the most matching Serializer through getDefaultSerializer. if there is no match from the already registered defaultSerializers, it will call newDefaultSerializer to create one. there may be an exception that cannot be created, and an IllegalArgumentException will be thrown.
  • The last step of the register (classtype, serializer serializer) method is to call the classresolver. register (Registration) method. if there is no Registration, here is a new one. at the same time, the registration is assigned an id through getNextRegistrationId.

DefaultClassResolver.register

kryo-4.0.2-sources.jar! /com/esotericsoftware/kryo/util/DefaultClassResolver.java

    static public final byte NAME = -1;

    protected final IntMap<Registration> idToRegistration = new IntMap();
    protected final ObjectMap<Class, Registration> classToRegistration = new ObjectMap();
    protected IdentityObjectIntMap<Class> classToNameId;

    public Registration registerImplicit (Class type) {
        return register(new Registration(type, kryo.getDefaultSerializer(type), NAME));
    }

    public Registration register (Registration registration) {
        if (registration == null) throw new IllegalArgumentException("registration cannot be null.");
        if (registration.getId() != NAME) {
            if (TRACE) {
                trace("kryo", "Register class ID " + registration.getId() + ": " + className(registration.getType()) + " ("
                    + registration.getSerializer().getClass().getName() + ")");
            }
            idToRegistration.put(registration.getId(), registration);
        } else if (TRACE) {
            trace("kryo", "Register class name: " + className(registration.getType()) + " ("
                + registration.getSerializer().getClass().getName() + ")");
        }
        classToRegistration.put(registration.getType(), registration);
        if (registration.getType().isPrimitive()) classToRegistration.put(getWrapperClass(registration.getType()), registration);
        return registration;
    }

    public Registration writeClass (Output output, Class type) {
        if (type == null) {
            if (TRACE || (DEBUG && kryo.getDepth() == 1)) log("Write", null);
            output.writeVarInt(Kryo.NULL, true);
            return null;
        }
        Registration registration = kryo.getRegistration(type);
        if (registration.getId() == NAME)
            writeName(output, type, registration);
        else {
            if (TRACE) trace("kryo", "Write class " + registration.getId() + ": " + className(type));
            output.writeVarInt(registration.getId() + 2, true);
        }
        return registration;
    }

    protected void writeName (Output output, Class type, Registration registration) {
        output.writeVarInt(NAME + 2, true);
        if (classToNameId != null) {
            int nameId = classToNameId.get(type, -1);
            if (nameId != -1) {
                if (TRACE) trace("kryo", "Write class name reference " + nameId + ": " + className(type));
                output.writeVarInt(nameId, true);
                return;
            }
        }
        // Only write the class name the first time encountered in object graph.
        if (TRACE) trace("kryo", "Write class name: " + className(type));
        int nameId = nextNameId++;
        if (classToNameId == null) classToNameId = new IdentityObjectIntMap();
        classToNameId.put(type, nameId);
        output.writeVarInt(nameId, true);
        output.writeString(type.getName());
    }

    public void reset () {
        if (!kryo.isRegistrationRequired()) {
            if (classToNameId != null) classToNameId.clear(2048);
            if (nameIdToClass != null) nameIdToClass.clear();
            nextNameId = 0;
        }
    }
  • The id of the registration is determined in the defaultlastsresolver. registration method. if it is NAME (It is denoted here by -1.) is registered to objectmap < class, registration > classtorregistration, and if there is an id other than NAME, it is registered to intmap < registration > idoregistration
  • As mentioned earlier, if the registrationRequired is false, the classResolver.registerImplicit is called for implicit registration. here, we can see that the id of the registration registered by registerImplicit is NAME.
  • Whether the id of registration is NAME or not is embodied in writeClass (If there are not only the basic types but also unregistered classes in the fields of the class to be serialized, the writeClass method here will be called), it can be seen from the code that if it is NAME, then writeName; is used; If it is not NAME, it will directly use Output. WriteVarint; (Registration. GetID ()+2, true) to write int; The writeName method generates a NAMEid for name’s class the first time it meets it, then puts it into identityobjectintmap < class > classtoneid, writes int, and then writes class.getName. the second time it meets the class, because nameId already exists in classtoneid, it writes int directly; However, the reset method of DefaultClassResolver will call ClassToneId. Clear (2048) when registrationRequired is false to clear or resize. Once this method is called at this time, ClassToneId may not be used to replace className with id for serialization next time.

Kryo.writeObject

kryo-4.0.2-sources.jar! /com/esotericsoftware/kryo/Kryo.java

    /** Writes an object using the registered serializer. */
    public void writeObject (Output output, Object object) {
        if (output == null) throw new IllegalArgumentException("output cannot be null.");
        if (object == null) throw new IllegalArgumentException("object cannot be null.");
        beginObject();
        try {
            if (references && writeReferenceOrNull(output, object, false)) {
                getRegistration(object.getClass()).getSerializer().setGenerics(this, null);
                return;
            }
            if (TRACE || (DEBUG && depth == 1)) log("Write", object);
            getRegistration(object.getClass()).getSerializer().write(this, output, object);
        } finally {
            if (--depth == 0 && autoReset) reset();
        }
    }

    /** Resets unregistered class names, references to previously serialized or deserialized objects, and the
     * {@link #getGraphContext() graph context}. If {@link #setAutoReset(boolean) auto reset} is true, this method is called
     * automatically when an object graph has been completely serialized or deserialized. If overridden, the super method must be
     * called. */
    public void reset () {
        depth = 0;
        if (graphContext != null) graphContext.clear();
        classResolver.reset();
        if (references) {
            referenceResolver.reset();
            readObject = null;
        }

        copyDepth = 0;
        if (originalToCopy != null) originalToCopy.clear(2048);

        if (TRACE) trace("kryo", "Object graph complete.");
    }
  • Note here that the writeObject method judges that if depth is 0 and autoreset is true, the Reset method will be called when finally. The reset method calls classResolver.reset (), clearing nameIdToClass and classToNameId (classToNameId.clear(2048))

Summary

  • Storm uses kryo by default to serialize tuple. Storm has additionally registered byte[].class, ListDelegate.class, ArrayList.class, HashMap.class, HashSet.class, BigInteger.class, TransactionAttempt.class, Values.class, Org.apache.storm.metric.api.imetriconsumer.datapoint.class, org.apache.storm.metric.api.imetriconsumer.taskinfo.class, ConsList.class, BackPressureStatus.class, etc
  • Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION(topology.fall.back.on.java.serialization) If true, kryo. SetRegistrationRequired (False), that is, if a class is not registered in Kryo, no exception will be thrown; This name may be ambiguous (Not using java's own serialization mechanism to fallback), it is actually to express whether to fallback for the class that has not been registered. if not, the exception will be thrown directly. if fallback, the implicit registration will be carried out. on the premise that classtoneid will not be reset, className will be used for serialization for the first time, and an id will be assigned to write classtoneid. for the second time, the id obtained from classtoneid will be directly used, which is equivalent to the effect of manual registration.
  • Config.TOPOLOGY_TUPLE_SERIALIZER(Topology.tuple.serializer, default is org.Apache.storm.serialization.types.listdelegatesterializerThe serialization class used to configure tuple’s payload
  • Config.TOPOLOGY_KRYO_DECORATORS(topology.kryo.decorators) is used to load the custom serialization. you can register an IKryoDecorator directly through Config.registerDecorator and register the class to serialize for Kyro in the decorate method.
  • Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS(The default is false.This attribute is easily associated with config.topology _ fall _ back _ on _ java _ serialization (topology.fall.back.on.java.serialization) are confused, the former is the attribute of storm itself while the latter is the attribute of kryo wrapped by storm (registrationRequired); Config. topology _ skip _ missing _ kryo _ registers is configured to skip or throw an exception if storm cannot load the user-defined IKryoDecorator class under the scenario with custom config. topology _ kryo _ decorators
  • If Kryo’s registrationRequired is false, unregistered class will automatically be implicitly registered (Register to classToNameId), only use className when serializing for the first time, and then replace it with id to save space; However, it should be noted that if Kryo’s autoReset is true, classToNameId will be reset, so implicit registration cannot always be serialized using id instead of className when it is not the first time to encounter unregistered class.

doc