Flink 寫(xiě)入數據到 Kafka
前言
通過(guò)Flink官網(wǎng)可以看到Flink里面就默認支持了不少sink,比如也支持Kafka sink connector(FlinkKafkaProducer),那么這篇文章我們就來(lái)看看如何將數據寫(xiě)入到Kafka。
準備
Flink里面支持Kafka 0.8、0.9、0.10、0.11.
這里我們需要安裝下Kafka,請對應添加對應的Flink Kafka connector依賴(lài)的版本,這里我們使用的是0.11 版本:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
目前我們先看下本地Kafka是否有這個(gè)student-write topic呢?需要執行下這個(gè)命令:
./kafka-topics.sh --list --zookeeper localhost:2181
執行結果:
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true
__consumer_offsets
metric
student
如果等下我們的程序運行起來(lái)后,再次執行這個(gè)命令出現student-write topic,那么證明我的程序確實(shí)起作用了,已經(jīng)將其他集群的Kafka數據寫(xiě)入到本地Kafka了。
代碼
package com.thinker.kafka;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.*;
import java.util.Properties;
/**
* @author zeekling [lingzhaohui@zeekling.cn]
* @version 1.0
* @apiNote
* @since 2020-05-14
*/
public class FlinkSinkToKafka {
private static final String READ_TOPIC = "student-write";
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "student-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
READ_TOPIC, //這個(gè) kafka topic 需要和上面的工具類(lèi)的 topic 一致
new SimpleStringSchema(),
props)).setParallelism(1);
student.print();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "student-write");
student.addSink(new FlinkKafkaProducer011<String>(
"student-write",
new SimpleStringSchema(),
properties
)).name("flink-connectors-kafka").setParallelism(1);
student.print();
env.execute("flink learning connectors kafka");
}
}
運行程序
將下面列舉出來(lái)的包拷貝到flink對應的目錄下面,并且重啟flink。
執行下面命令提交flink任務(wù)
./bin/flink run -c com.thinker.kafka.FlinkSinkToKafka ~/project/flink-test/target/flink-test-1.0-SNAPSHOT.jar
提交成功后執行下面命令:
/kafka-topics.sh --list --zookeeper localhost:2181
執行結果:

0 評論