A look at topic partitions and topicCountMap in Kafka 0.8


Preface

This article looks at the relationship between topicCountMap and the number of partitions of a topic when using the Kafka 0.8 API.

Partition

Physically, a topic is divided into one or more partitions. Each partition corresponds to a directory on disk, and all message and index files of that partition are stored in it.

Partition and consumer

  • If there are more consumers than partitions, the extra consumers are wasted: Kafka's design does not allow concurrent consumption of a single partition within a consumer group, so the number of consumers should not exceed the number of partitions.
  • If there are fewer consumers than partitions, one consumer handles several partitions. The number of consumers and the number of partitions should be chosen together, otherwise data will be pulled from the partitions unevenly. Ideally the number of partitions is an integer multiple of the number of consumers, which is why the partition count matters. A count such as 24, which has many divisors, makes it easy to pick a consumer count.
  • If a consumer reads from several partitions, the overall order of the data is not guaranteed. Kafka only guarantees order within a single partition; across partitions, the order you see depends on the order in which they are read.
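To make the "integer multiple" advice concrete, the sketch below lists the consumer counts that split a given number of partitions evenly. The class and method names are hypothetical helpers written for this illustration; they are not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka.
final class EvenConsumerCounts {

    // Returns every consumer count that divides the partition count exactly,
    // i.e. every count for which each consumer owns the same number of partitions.
    static List<Integer> evenConsumerCounts(int partitions) {
        List<Integer> counts = new ArrayList<>();
        for (int consumers = 1; consumers <= partitions; consumers++) {
            if (partitions % consumers == 0) {
                counts.add(consumers);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(evenConsumerCounts(24)); // [1, 2, 3, 4, 6, 8, 12, 24]
        System.out.println(evenConsumerCounts(7));  // [1, 7]
    }
}
```

With 24 partitions there are eight even choices, while a prime count such as 7 only balances with 1 or 7 consumers.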

When a Kafka producer sends a message that has a key, the key is hashed and the message is sent to the partition the hash selects. If there is no key, the partition is chosen by an incrementing counter, that is, round-robin.
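The rule above can be sketched in a few lines. This is a simplified illustration, not Kafka's partitioner: the class name is hypothetical, and `Arrays.hashCode` is only a stand-in for the hash function the real client uses.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified illustration of "hash the key, otherwise use a counter".
// Hypothetical class; Arrays.hashCode stands in for the client's real hash.
final class PartitionChooser {

    private final AtomicInteger counter = new AtomicInteger(0);

    int choose(byte[] key, int numPartitions) {
        if (key != null) {
            // Same key -> same hash -> same partition. Mask the sign bit so
            // the modulo result is never negative.
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        // No key: walk through the partitions in turn.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionChooser chooser = new PartitionChooser();
        byte[] key = "order-42".getBytes();
        // A keyed message always lands on the same partition.
        System.out.println(chooser.choose(key, 6) == chooser.choose(key, 6)); // true
        // Keyless messages cycle through the partitions.
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.choose(null, 3)); // 0, 1, 2, 0
        }
    }
}
```

This is also why identical keys defeat scaling out: every message with the same key maps to one partition, and therefore to one consumer thread.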

rebalance

If consumers, brokers, or partitions are added or removed, a rebalance is triggered, and the partitions assigned to each consumer may change afterwards.

For example, if one consumer is removed, the rebalance remaps the partitions across the remaining consumers.
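As an illustration of how the mapping shifts, the sketch below hands out contiguous partition ranges to consumers, in the spirit of a range-style assignment, and prints the result before and after one consumer leaves. The class, the method, and the `consumer-N` labels are hypothetical; real assignment is performed by the consumer client during the rebalance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a range-style assignment; not Kafka's own code.
final class RangeAssignmentSketch {

    // Partitions owned by the consumer at position `index` (0-based) when
    // `partitions` partitions are split into contiguous ranges.
    static List<Integer> assign(int partitions, int consumers, int index) {
        int perConsumer = partitions / consumers;
        int withExtra = partitions % consumers; // the first `withExtra` consumers get one more
        int start = perConsumer * index + Math.min(index, withExtra);
        int count = perConsumer + (index < withExtra ? 1 : 0);
        List<Integer> owned = new ArrayList<>();
        for (int p = start; p < start + count; p++) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // 6 partitions: first with 3 consumers, then after one consumer is removed.
        for (int consumers = 3; consumers >= 2; consumers--) {
            System.out.println(consumers + " consumers:");
            for (int i = 0; i < consumers; i++) {
                System.out.println("  consumer-" + i + " -> " + assign(6, consumers, i));
            }
        }
    }
}
```

With three consumers each one owns two partitions ([0, 1], [2, 3], [4, 5]); once one is removed, the two survivors own three each ([0, 1, 2], [3, 4, 5]).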

topicCountMap

topicCountMap tells Kafka how many threads the consumer will use to consume each topic. The key of topicCountMap is the topic name, and the value is the number of threads for that topic.

Suppose a topic has 6 partitions and two consumers are started. If the topicCount of each consumer is 3, you will observe that every consuming thread of each consumer is running.
If the topicCount of each consumer is raised to 4, all 4 threads are running in the consumer started first, only 2 threads are running in the consumer started later, and its other 2 threads are blocked.

In other words, the actual number of consuming threads = number of consumer instances * topicCount per consumer. If this value > the number of partitions, the surplus threads are redundant and stay blocked.
If this value <= the number of partitions, all consuming threads are consuming.
Therefore, when consumers are deployed in a distributed manner, keep number of consumer instances * topicCount per consumer <= the number of partitions of the topic.
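The arithmetic can be checked with a small helper. This is only a sketch of the rule stated above; the class and method names are hypothetical and nothing here calls Kafka.

```java
// Hypothetical helper that applies the rule:
// total threads = consumer instances * topicCount per instance,
// and at most one thread can consume each partition.
final class ThreadUsage {

    static int activeThreads(int instances, int topicCount, int partitions) {
        return Math.min(instances * topicCount, partitions);
    }

    static int idleThreads(int instances, int topicCount, int partitions) {
        return Math.max(instances * topicCount - partitions, 0);
    }

    public static void main(String[] args) {
        // 2 instances * 3 threads on 6 partitions: every thread is busy.
        System.out.println(activeThreads(2, 3, 6) + " active, " + idleThreads(2, 3, 6) + " idle"); // 6 active, 0 idle
        // 2 instances * 4 threads on 6 partitions: 2 threads have nothing to read.
        System.out.println(activeThreads(2, 4, 6) + " active, " + idleThreads(2, 4, 6) + " idle"); // 6 active, 2 idle
    }
}
```

The second case matches the observation above: 8 threads compete for 6 partitions, so 2 of them stay blocked.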

Code example

  • Create topic
sh kafka-topics.sh --create --topic topic20170921 --replication-factor 1 --partitions 6 --zookeeper localhost:2181
  • View consumer group
sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-group-0921
  • consumer
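import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class NativeConsumer {

    // Runs one task per KafkaStream, so 10 is the upper bound on consuming threads per instance
    ExecutorService pool = Executors.newFixedThreadPool(10);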

    public void exec(String topic,String zk,int consumerCount,String group) throws UnsupportedEncodingException {
        Properties props = new Properties();
        props.put("zookeeper.connect", zk);
//        props.put("auto.offset.reset","smallest");
        props.put("group.id",group);
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "10000");
        // Partition assignment strategy of the high-level consumer ("range" or "roundrobin")
        props.put("partition.assignment.strategy", "range");
        ConsumerConfig consumerConfig =  new kafka.consumer.ConsumerConfig(props);
        ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
        // topicCountMap: topic name -> number of streams (consuming threads) requested for that topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, consumerCount);
        // Returns consumerCount streams for the topic; each stream is drained by its own thread below
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);
        consumerMap.get(topic).stream().forEach(stream -> {

            pool.submit(new Runnable() {
                @Override
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        System.out.println(Thread.currentThread().getName()+":"+new String(it.next().message()));
                    }
                }
            });

        });
    }
}
  • producer
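import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

// Assumption: SerializationUtils comes from Apache Commons Lang 3
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NativeProducer {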

    public void produce(String topic,String brokerAddr) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerAddr);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            int totalCountOfSendedMessages = 0;
            long totalSendTime = 0;

            long timeOfLastUpdate = 0;
            int countOfMessagesInSec = 0;

            for(int i=0;i<1000000;i++){
                // TODO: keys must differ, otherwise every message goes to the same partition and the consumers cannot scale out
                byte[] dataKey = SerializationUtils.serialize(UUID.randomUUID().toString());
                byte[] dataValue = SerializationUtils.serialize(UUID.randomUUID().toString());

                ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
                        topic,
                        dataKey,
                        dataValue
                );

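                long sendingStartTime = System.currentTimeMillis();
                // Synchronous send: block until the broker acknowledges the record
                producer.send(producerRecord).get();
                long currentTime = System.currentTimeMillis();
                // Throttle after taking the timestamp so the sleep is not counted as send time
                Thread.sleep(100);

                long sendTime = currentTime - sendingStartTime;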

                totalSendTime += sendTime;

                totalCountOfSendedMessages++;
                countOfMessagesInSec++;
                if (currentTime - timeOfLastUpdate > TimeUnit.SECONDS.toMillis(1)) {
                    // Cast before dividing; otherwise integer division truncates the average
                    System.out.println("Average send time: " +
                            ((double) totalSendTime / totalCountOfSendedMessages) + " ms.");
                    System.out.println("Count of messages in second: " + countOfMessagesInSec);

                    timeOfLastUpdate = currentTime;
                    countOfMessagesInSec = 0;
                }
            }

        }
    }
}
  • test
    String zkAddr = "localhost:2181";
    String topic = "topic20170921"; // created above with 6 partitions
    String brokerAddr = "localhost:9092";
    String group = "test-group-0921";

    @Test
    public void testConsumer1() throws InterruptedException {
        NativeConsumer nativeConsumer = new NativeConsumer();
        try {
            nativeConsumer.exec(topic,zkAddr,4,group);
        } catch (UnsupportedEncodingException e1) {
            e1.printStackTrace();
        }
        Thread.sleep(100000);
    }

    @Test
    public void testConsumer2() throws InterruptedException {
        NativeConsumer nativeConsumer = new NativeConsumer();
        try {
            nativeConsumer.exec(topic,zkAddr,4,group);
        } catch (UnsupportedEncodingException e1) {
            e1.printStackTrace();
        }
        Thread.sleep(100000);
    }

    @Test
    public void testProducer() throws UnsupportedEncodingException, InterruptedException {
        NativeProducer producer = new NativeProducer();
        try {
            producer.produce(topic,brokerAddr);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
