Preface
This article briefly introduces the join operation in Kafka Streams.
Join
A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely.
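To make the windowing idea concrete, here is a minimal plain-Java model of a windowed inner join. It is only a sketch of the semantics, not Kafka Streams code: the class WindowedJoinSketch, the Rec type, and the timestamps are hypothetical names and values chosen for illustration. Two records are joined only when they share a key and their timestamps are no more than the window size apart, so anything older than the window never needs to be kept.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a windowed inner join; this is NOT the Kafka Streams API. */
final class WindowedJoinSketch {

    /** A keyed record with an event timestamp in milliseconds (hypothetical type). */
    static final class Rec {
        final String key;
        final String value;
        final long timestampMs;

        Rec(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    /** Pairs records that share a key and whose timestamps differ by at most windowMs. */
    static List<String> join(List<Rec> left, List<Rec> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    out.add(l.key + " , " + l.value + "--" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = new ArrayList<>();
        left.add(new Rec("a", "1,a", 0L));
        left.add(new Rec("b", "2,b", 1_000L));

        List<Rec> right = new ArrayList<>();
        right.add(new Rec("a", "a,hello", 10_000L)); // 10 s after the left "a": inside the window
        right.add(new Rec("b", "b,world", 60_000L)); // 59 s after the left "b": outside the window

        // With a 30 s window only key "a" matches; prints: a , 1,a--a,hello
        for (String line : join(left, right, 30_000L)) {
            System.out.println(line);
        }
    }
}
```

A real stream processor does not scan both inputs like this; it buffers recent records per key and drops them once they fall out of the window, which is what keeps the state bounded.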
Example
// KStreamBuilder is the builder class used by Kafka Streams before 1.0.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> left = builder.stream("intpu-left");
KStream<String, String> right = builder.stream("intpu-right");
KStream<String, String> all = left
        // left values look like "1,a": use the second field as the join key
        .selectKey((key, value) -> value.split(",")[1])
        .join(
                // right values look like "a,hello": use the first field as the join key
                right.selectKey((key, value) -> value.split(",")[0]),
                // combine the two matching values into one output value
                (value1, value2) -> value1 + "--" + value2,
                // match records whose timestamps are at most 30 seconds apart
                JoinWindows.of(30000));
all.print();
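Because the join matches records by key, the streams usually need to be re-keyed first. Here selectKey extracts the shared field from each value: the second field on the left side and the first field on the right side.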
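The re-keying step can be checked in isolation. The sketch below applies the same two split expressions used in the selectKey calls to one sample value from each side; the class RekeySketch and its method names are hypothetical helpers, not part of Kafka Streams.

```java
/** Applies the example's key-extraction logic to plain strings; not Kafka Streams code. */
final class RekeySketch {

    /** Join key for a left-topic value such as "1,a": the second comma-separated field. */
    static String leftKey(String value) {
        return value.split(",")[1];
    }

    /** Join key for a right-topic value such as "a,hello": the first comma-separated field. */
    static String rightKey(String value) {
        return value.split(",")[0];
    }

    public static void main(String[] args) {
        String l = leftKey("1,a");
        String r = rightKey("a,hello");
        // Both sides map to the same key, so the two records can be joined.
        System.out.println(l + " " + r + " " + l.equals(r)); // prints: a a true
    }
}
```

Note that a value without a comma would make leftKey throw an ArrayIndexOutOfBoundsException, so real input may need validation before re-keying.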
Test
sh bin/kafka-topics.sh --create --topic intpu-left --replication-factor 1 --partitions 3 --zookeeper localhost:2181
sh bin/kafka-topics.sh --create --topic intpu-right --replication-factor 1 --partitions 3 --zookeeper localhost:2181
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-left
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-right
Sample input for the left topic (intpu-left):
1,a
2,b
3,c
3,c
4,d
1,a
2,b
3,c
1,a
2,b
3,c
4,e
5,h
6,f
7,g
Sample input for the right topic (intpu-right):
a,hello
b,world
c,hehehe
c,aaa
d,eee
a,cccc
b,aaaaaa
c,332435
a,dddd
b,2324
c,ddddd
e,23453
h,2222222
f,0o0o0o0
g,ssss
Sample output
[KSTREAM-MERGE-0000000014]: a , 1,a--a,dddd
[KSTREAM-MERGE-0000000014]: b , 2,b--b,2324
2017-10-17 22:17:34.578 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-17 22:17:34.578 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-17 22:17:34.585 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_1
Join types
The example above uses an inner join (join); left join (leftJoin) and outer join (outerJoin) are also available. To capture records that found no match within the time window, use an outer join, store those results separately, and then filter them against the records that did match.
Sample output (outer join)
[KSTREAM-MERGE-0000000014]: f , null--f,ddddddd
[KSTREAM-MERGE-0000000014]: f , 4,f--f,ddddddd
2017-10-17 22:31:12.530 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-17 22:31:12.530 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_0
2017-10-17 22:31:12.531 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_2
2017-10-17 22:31:12.533 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_1
2017-10-17 22:31:12.533 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_0
2017-10-17 22:31:12.539 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 1_2
2017-10-17 22:31:12.540 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_1
2017-10-17 22:31:12.541 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 2_2
[KSTREAM-MERGE-0000000014]: g , 5,g--null
[KSTREAM-MERGE-0000000014]: h , 6,h--null
[KSTREAM-MERGE-0000000014]: h , 6,h--h,ddddddd
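The "store, then filter" step for outer-join results can be sketched in plain Java using the five result lines shown above. This is an illustrative model only: OuterJoinFilterSketch and JoinResult are hypothetical names, and the filter assumes that a key counts as matched as soon as any full match for it has been seen, which ignores the time window.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of post-filtering outer-join results; NOT Kafka Streams code. */
final class OuterJoinFilterSketch {

    /** One join result; a null value marks the side that had no match (hypothetical type). */
    static final class JoinResult {
        final String key;
        final String leftValue;
        final String rightValue;

        JoinResult(String key, String leftValue, String rightValue) {
            this.key = key;
            this.leftValue = leftValue;
            this.rightValue = rightValue;
        }

        boolean isFullMatch() {
            return leftValue != null && rightValue != null;
        }

        @Override
        public String toString() {
            return key + " , " + leftValue + "--" + rightValue;
        }
    }

    /** Keeps only the null-padded results whose key never produced a full match. */
    static List<JoinResult> unmatchedOnly(List<JoinResult> results) {
        // First pass: remember every key that was matched on both sides.
        Set<String> matchedKeys = new HashSet<>();
        for (JoinResult r : results) {
            if (r.isFullMatch()) {
                matchedKeys.add(r.key);
            }
        }
        // Second pass: keep null-padded results for keys that never matched.
        List<JoinResult> out = new ArrayList<>();
        for (JoinResult r : results) {
            if (!r.isFullMatch() && !matchedKeys.contains(r.key)) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<JoinResult> results = new ArrayList<>();
        results.add(new JoinResult("f", null, "f,ddddddd"));
        results.add(new JoinResult("f", "4,f", "f,ddddddd"));
        results.add(new JoinResult("g", "5,g", null));
        results.add(new JoinResult("h", "6,h", null));
        results.add(new JoinResult("h", "6,h", "h,ddddddd"));

        // Keys f and h were eventually matched, so only g remains; prints: g , 5,g--null
        for (JoinResult r : unmatchedOnly(results)) {
            System.out.println(r);
        }
    }
}
```

In a running application the stored results would more likely live in a state store or an external table, with the filter applied after the join window has closed.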
Summary
Kafka Streams joins are well suited to matching records from different data sources in real time.