Customizing the processor for Kafka Streams


Preface

This article analyzes the KStreamBuilder of Kafka Streams and gives an example of how to build a custom processor for Kafka Streams.

Example

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("demo-topic");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
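
The props used above are not shown; here is a minimal sketch of what they could look like, assuming a local broker on localhost:9092 and string keys/values (the application id is made up):

Properties props = new Properties();
// application id and broker address are assumptions for a local setup
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());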

The topology is hidden inside KStreamBuilder.

KStreamBuilder

kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/kstream/KStreamBuilder.java

public class KStreamBuilder extends TopologyBuilder {

    public <K, V> KStream<K, V> stream(final String... topics) {
        return stream(null, null, null, topics);
    }
    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
                                       final Serde<K> keySerde,
                                       final Serde<V> valSerde,
                                       final String... topics) {
        final String name = newName(KStreamImpl.SOURCE_NAME);

        addSource(offsetReset, name,  keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);

        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
    }
}

The addSource call here invokes a method inherited from TopologyBuilder.

TopologyBuilder

kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/processor/TopologyBuilder.java

public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) {
        if (topics.length == 0) {
            throw new TopologyBuilderException("You must provide at least one topic");
        }
        Objects.requireNonNull(name, "name must not be null");
        if (nodeFactories.containsKey(name))
            throw new TopologyBuilderException("Processor " + name + " is already added.");

        for (String topic : topics) {
            Objects.requireNonNull(topic, "topic names cannot be null");
            validateTopicNotAlreadyRegistered(topic);
            maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
            sourceTopicNames.add(topic);
        }

        nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
        nodeToSourceTopics.put(name, Arrays.asList(topics));
        nodeGrouper.add(name);

        return this;
    }
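
You can also call addSource yourself instead of going through the DSL; a minimal sketch, where the node name "SOURCE" and the topic are made up for illustration:

TopologyBuilder builder = new TopologyBuilder();
// explicit deserializers instead of the serdes configured in props
builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "demo-topic");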

Processor Topology

(figure: Stream Processing Topology)

Does this topology look familiar? Storm has topologies too:

        TopologyBuilder builder = new TopologyBuilder();
        // parallelism: 10
        builder.setSpout("spout", new TestWordSpout(), 10);
        builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word"));
        builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count");

        String topologyName = "DemoTopology";
        Config conf = new Config();
        conf.setDebug(true);

        try {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Thread.sleep(60 * 1000);
            cluster.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }

A processor topology defines how data flows between the processing units (called processors in Kafka Streams); in other words, it defines the processing logic applied to the data.
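
Expressed with the low-level API, such a topology is just sources, processors, and sinks wired together; a minimal sketch, where the node names, topics, and MyProcessorSupplier are hypothetical:

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "input-topic");                            // data enters here
builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");  // processing unit
builder.addSink("SINK", "output-topic", "PROCESS");                    // data leaves here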

See kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/kstream/internals/KStreamImpl.java

@Override
    public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String name = topology.newName(FILTER_NAME);

        topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
    }

    @Override
    public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        String name = topology.newName(FILTER_NAME);

        topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
    }

    @Override
    public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return new KStreamImpl<>(topology, internalSelectKey(mapper), sourceNodes, true);
    }
    @Override
    public <K1, V1> KStream<K1, V1> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = topology.newName(MAP_NAME);

        topology.addProcessor(name, new KStreamMap<K, V, K1, V1>(mapper), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes, true);
    }


    @Override
    public <V1> KStream<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = topology.newName(MAPVALUES_NAME);

        topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
    }
    
     @Override
    public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = topology.newName(FLATMAP_NAME);

        topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes, true);
    }

    @Override
    public <V1> KStream<K, V1> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        String name = topology.newName(FLATMAPVALUES_NAME);

        topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);

        return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
    }
    
    @Override
    public void foreach(ForeachAction<? super K, ? super V> action) {
        Objects.requireNonNull(action, "action can't be null");
        String name = topology.newName(FOREACH_NAME);

        topology.addProcessor(name, new KStreamForeach<>(action), this.name);
    }

You can see that each of these streaming operations adds a processor to the topology.
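
For example, a short DSL chain like the one below registers one processor node per operation (the topic names are made up for illustration):

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("demo-topic");
source.filter((key, value) -> value != null)    // adds a KStreamFilter node
      .mapValues(String::toUpperCase)           // adds a KStreamMapValues node
      .to("demo-output");                       // adds a sink node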

ProcessorSupplier

kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/processor/ProcessorSupplier.java

The concrete streaming processors are implementation classes of this ProcessorSupplier, for example kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/kstream/internals/KStreamFilter.java

class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {

    private final Predicate<K, V> predicate;
    private final boolean filterNot;

    public KStreamFilter(Predicate<K, V> predicate, boolean filterNot) {
        this.predicate = predicate;
        this.filterNot = filterNot;
    }

    @Override
    public Processor<K, V> get() {
        return new KStreamFilterProcessor();
    }

    private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
        @Override
        public void process(K key, V value) {
            if (filterNot ^ predicate.test(key, value)) {
                context().forward(key, value);
            }
        }
    }
}
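
Note the XOR in process: with filterNot == false a record is forwarded when the predicate holds, and with filterNot == true when it does not. So the two DSL calls behave as exact opposites (the predicate here is a made-up example):

// given a KStream<String, String> named source:
source.filter((key, value) -> value.startsWith("a"));    // forwards values starting with "a"
source.filterNot((key, value) -> value.startsWith("a")); // forwards everything else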

Word count processor example

public class WordCountProcessorSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new WordCountProcessor();
    }

    private class WordCountProcessor implements Processor<String, String>{

        private ProcessorContext context;

        private KeyValueStore<String, Integer> kvStore;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(1000); // request punctuate() roughly every 1000ms (stream time)
            this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
        }

        @Override
        public void process(String key, String value) {
            String[] words = value.toLowerCase(Locale.getDefault()).split(" ");
            for (String word : words) {
                Integer oldValue = this.kvStore.get(word);
                if (oldValue == null) {
                    this.kvStore.put(word, 1);
                } else {
                    this.kvStore.put(word, oldValue + 1);
                }
            }
        }

        /**
         * Perform any periodic operations
         * @param timestamp
         */
        @Override
        public void punctuate(long timestamp) {
            try (KeyValueIterator<String, Integer> itr = this.kvStore.all()) {
                while (itr.hasNext()) {
                    KeyValue<String, Integer> entry = itr.next();
                    System.out.println("[" + entry.key + ", " + entry.value + "]");
                    context.forward(entry.key, entry.value.toString());
                }
                context.commit();
            }
        }

        @Override
        public void close() {
            this.kvStore.close();
        }
    }
}

Configuration

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
        // Note: To re-run the demo, you need to use the offset reset tool:
        // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "wc-input");
        builder.addProcessor("PROCESSOR1", new WordCountProcessorSupplier(), "SOURCE");
        builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(),
                "PROCESSOR1");
//        builder.addSink("SINK", OUTPUT_TOPIC_NAME, "PROCESSOR1");


        KafkaStreams streams = new KafkaStreams(builder, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            e.printStackTrace();
        }
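
To actually see the output below, something has to write lines into wc-input; a minimal sketch with the plain producer API (the broker address and the message content are assumptions):

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.send(new ProducerRecord<>("wc-input", "hello world hello kafka"));
producer.close();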

Sample output

[aaa, 2]
[bb, 1]
[ccc, 1]
[ddd, 1]
[hello, 2]
[kafka, 1]
[world, 2]
2017-10-16 22:33:01.835  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-16 22:33:08.775  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-16 22:33:08.776  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-16 22:33:08.776  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-16 22:33:08.776  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_2
[aaa, 1]
2017-10-16 22:33:11.621  INFO   --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_2
[bb, 1]
