Category: kafka

maven

<!-- log4j2 kafka appender -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
    <exclusions>
        <!-- exclude the outdated logging dependencies -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
</dependency>
<!-- log4j2 async -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.0</version>
</dependency>

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn" name="MyApp" packages="">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT" ignoreExceptions="false">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] ..
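
The log4j2.xml above is cut off before the Kafka appender element itself; a minimal sketch of log4j2's built-in Kafka appender (topic name and broker address are placeholders):

<!-- sketch only: topic and bootstrap.servers are placeholders -->
<Kafka name="Kafka" topic="app-log">
    <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5p %c - %m%n"/>
    <Property name="bootstrap.servers">localhost:9092</Property>
</Kafka>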

Read more

Brief introduction

Starting with Kafka 0.10, consuming a topic no longer requires the ZooKeeper address; the broker address alone is enough.

maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>

Consuming

/**
 * java -jar -Dfile.encoding=utf-8 \
 * kafka-consumer-0.0.1-SNAPSHOT.jar \
 * localhost:9092 topicname groupid earliest
 */
@SpringBootApplication
public class KafkaConsumerApplication {

    private static volatile boolean isStop = false;

    public static void main(String[] ..
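
The class body is cut off above; a minimal sketch of the subscribe-and-poll loop such a main method typically drives, using the placeholder arguments from the usage comment (broker, topic, group id, offset reset):

// classes below come from org.apache.kafka.clients.consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "groupid");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topicname"));
while (!isStop) {
    // poll blocks for up to the given timeout in milliseconds
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
    }
}
consumer.close();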

Read more

maven

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Camden.SR6</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Producer configuration

server:
  port: 8081
spring:
  application:
    name: output-demo
  cloud:
    instance-count: 1
    instance-index: 0
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2182
          auto-add-partitions: true
          auto-create-topics: true
          min-partition-count: 1
      bindings:
        output:
          destination: event-demo
          content-type: text/plain
          producer:
            partitionCount: 1

Java code

@EnableBinding(Source.class) ..
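
The Java code is truncated right after @EnableBinding(Source.class); a minimal sketch of how such a source typically publishes over the output binding (class and method names here are illustrative):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
public class EventProducer {

    @Autowired
    private Source source;

    // publishes a plain-text payload to the 'output' binding, i.e. the event-demo destination above
    public void publish(String payload) {
        source.output().send(MessageBuilder.withPayload(payload).build());
    }
}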

Read more

Preface

This article briefly introduces some of the configuration properties of Spring-Cloud-Stream-Binder-Kafka.

maven

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>1.0.3.RELEASE</version>
</dependency>

Stream properties

spring-cloud-stream-1.0.3.RELEASE-sources.jar!/org/springframework/cloud/stream/config/ChannelBindingServiceProperties.java

spring:
  cloud:
    stream:
      instanceIndex: 0 ## the INSTANCE_INDEX environment variable is also supported
      ## The instance index of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka
      instanceCount: 1
      ## The number of deployed instances of an ..
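
As a concrete illustration of the two properties above, a two-instance deployment gives each instance the same instanceCount and a distinct instanceIndex (values are illustrative):

# instance 0 of 2; the second instance would set instanceIndex: 1,
# or override it per process via the INSTANCE_INDEX environment variable
spring:
  cloud:
    stream:
      instanceCount: 2
      instanceIndex: 0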

Read more

Preface

Here is a brief demonstration of how to consume a topic with the Kafka 0.8 client.

maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

Initialize the client

Properties props = new Properties();
props.put("zookeeper.connect", zk);
// props.put("auto.offset.reset", "smallest");
props.put("group.id", group);
props.put("zookeeper.session.timeout.ms", "10000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.commit.interval.ms", "10000");
// sets the timeout of ConsumerIterator's hasNext; if unset, hasNext blocks forever until a new message arrives
props.put("consumer.timeout.ms", "10000");
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> ..
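
The topicCountMap is cut off above; with the 0.8 high-level consumer, the remaining steps are creating the message streams and iterating over one of them, roughly as follows (a sketch; error handling trimmed to the timeout case):

// classes below come from kafka.consumer and kafka.message
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1); // one stream (consumer thread) for this topic
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        consumerConnector.createMessageStreams(topicCountMap);
ConsumerIterator<byte[], byte[]> it = streams.get(topic).get(0).iterator();
try {
    // hasNext blocks for up to consumer.timeout.ms (set above), then throws
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> msg = it.next();
        System.out.println(new String(msg.message()));
    }
} catch (ConsumerTimeoutException e) {
    // no new message arrived within consumer.timeout.ms
}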

Read more

Preface

This article briefly introduces an example of the Kafka 0.8 client producer.

maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.2</version>
</dependency>

Initialization

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddr);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Prepare the message

String dataKey = UUID.randomUUID().toString();
String dataValue = UUID.randomUUID().toString();
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
        topic, dataKey, ..
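
The ProducerRecord construction is cut off above; the send itself is asynchronous, and the result is usually handled in a callback (a sketch; the callback body is illustrative):

producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // e.g. a retriable network error or RecordTooLargeException
            exception.printStackTrace();
        } else {
            System.out.println("sent to partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
});
producer.close();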

Read more

Preface

This article briefly analyzes the exception handling of the Java producer in Kafka 0.8.2.2.

Overview

Kafka's Java producer sends asynchronously, mainly in these steps:

- the record is appended to the RecordAccumulator
- the Sender takes RecordBatches out of the RecordAccumulator and hands them to the client for sending
- the NetworkClient talks to the broker and sends the RecordBatch out

Exceptions can arise at each of these steps. ..
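
Because the send is asynchronous, exceptions surface in two places: thrown directly from send() while appending to the RecordAccumulator, or delivered later through the returned Future (or a Callback). A minimal sketch of checking both paths, assuming a producer and record as in the previous posts:

try {
    // errors from the Sender/NetworkClient stages come back through the Future
    Future<RecordMetadata> future = producer.send(record);
    future.get(); // blocks; send-side errors are rethrown wrapped in ExecutionException
} catch (ExecutionException e) {
    e.getCause().printStackTrace(); // e.g. a TimeoutException from an expired batch
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
} catch (KafkaException e) {
    // thrown synchronously while appending to the RecordAccumulator, e.g. serialization failures
    e.printStackTrace();
}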

Read more

Preface

This article mainly explains several configuration parameters of the Kafka producer.

List of parameters and importance

static {
    config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, BOOSTRAP_SERVERS_DOC)
                            .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                            .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                            .define(ACKS_CONFIG, Type.STRING, "1", in(Arrays.asList("all", "-1", "0", "1")), Importance.HIGH, ACKS_DOC)
                            .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, ..
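
Read as client code, the high-importance definitions above correspond to these Properties, shown here with the defaults from the snippet (bootstrap.servers has no default, so the address is a placeholder):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Type.LIST, no default: must be supplied
props.put("buffer.memory", "33554432");           // default 32 * 1024 * 1024, at least 0
props.put("retries", "0");                        // default 0, between 0 and Integer.MAX_VALUE
props.put("acks", "1");                           // one of all, -1, 0, 1
props.put("compression.type", "none");            // default none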

Read more

Preface

This article mainly talks about Kafka's group coordinator. Kafka 0.9.0 introduced the new consumer config, which uses bootstrap.servers to replace the zookeeper.connect of earlier versions, mainly to gradually weaken the dependence on ZooKeeper and hide it behind the broker.

group coordinator

Using bootstrap.servers to replace ..
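
The switch shows up directly in the consumer properties: the old consumer locates the cluster through ZooKeeper, while the new one bootstraps from a broker, which then points it at its group coordinator:

// old consumer (0.8 and earlier)
props.put("zookeeper.connect", "localhost:2181");

// new consumer (0.9+); the group coordinator is discovered via the brokers
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "demo-group");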

Read more

Preface

This article briefly introduces how to run Kafka and kafka-manager with Docker.

Images

docker pull dockerkafka/zookeeper
docker pull dockerkafka/kafka
docker pull dockerkafka/kafka-manager

Startup

docker run -it --rm --name kafkadocker_zookeeper_1 -p 2181:2181 dockerkafka/zookeeper
docker run -it --rm --name kafkadocker_kafka_1 --link kafkadocker_zookeeper_1:zookeeper -p 9092:9092 dockerkafka/kafka
docker run -it --rm --link kafkadocker_zookeeper_1:zookeeper --link kafkadocker_kafka_1:kafka -p 9000:9000 ..
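
Once ZooKeeper and Kafka are up, kafka-manager still has to be pointed at ZooKeeper; with kafka-manager images this is commonly done through an environment variable (whether this particular image reads ZK_HOSTS is an assumption):

# assumes the image reads ZK_HOSTS; the alias 'zookeeper' comes from the --link above
docker run -it --rm --link kafkadocker_zookeeper_1:zookeeper --link kafkadocker_kafka_1:kafka \
  -p 9000:9000 -e ZK_HOSTS=zookeeper:2181 dockerkafka/kafka-manager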

Read more