Data Sink 介紹
Data sink 有點(diǎn)把數據存儲下來(lái)(落庫)的意思。
如上圖,Source 就是數據的來(lái)源,中間的 Compute 其實(shí)就是 Flink 干的事情,可以做一系列的操作,操作完后就把計算后的數據結果 Sink 到某個(gè)地方。(可以是 MySQL、ElasticSearch、Kafka、Cassandra 等)。這里我說(shuō)下自己目前做告警這塊就是把 Compute 計算后的結果 Sink 直接告警出來(lái)了(發(fā)送告警消息到釘釘群、郵件、短信等),這個(gè) sink 的意思也不一定非得說(shuō)成要把數據存儲到某個(gè)地方去。其實(shí)官網(wǎng)用的 Connector 來(lái)形容要去的地方更合適,這個(gè) Connector 可以有 MySQL、ElasticSearch、Kafka、Cassandra RabbitMQ 等。
Flink Data Sink
前面文章 Data Source 介紹 介紹了 Flink Data Source 有哪些,這里也看看 Flink Data Sink 支持的有哪些。
看下源碼有哪些呢?
可以看到有 Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等 Sink 的方式。
SinkFunction
從上圖可以看到 SinkFunction 接口有 invoke 方法,它有一個(gè) RichSinkFunction 抽象類(lèi)。
上面的那些自帶的 Sink 可以看到都是繼承了 RichSinkFunction 抽象類(lèi),實(shí)現了其中的方法,那么我們要是自己定義自己的 Sink 的話(huà)其實(shí)也是要按照這個(gè)套路來(lái)做的。
這里就拿個(gè)較為簡(jiǎn)單的 PrintSinkFunction 源碼來(lái)講下:
@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L;
private static final boolean STD_OUT = false;
private static final boolean STD_ERR = true;
private boolean target;
private transient PrintStream stream;
private transient String prefix;
/**
* Instantiates a print sink function that prints to standard out.
*/
public PrintSinkFunction() {}
/**
* Instantiates a print sink function that prints to standard out.
*
* @param stdErr True, if the format should print to standard error instead of standard out.
*/
public PrintSinkFunction(boolean stdErr) {
target = stdErr;
}
public void setTargetToStandardOut() {
target = STD_OUT;
}
public void setTargetToStandardErr() {
target = STD_ERR;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
// get the target stream
stream = target == STD_OUT ? System.out : System.err;
// set the prefix if we have a >1 parallelism
prefix = (context.getNumberOfParallelSubtasks() > 1) ?
((context.getIndexOfThisSubtask() + 1) + "> ") : null;
}
@Override
public void invoke(IN record) {
if (prefix != null) {
stream.println(prefix + record.toString());
}
else {
stream.println(record.toString());
}
}
@Override
public void close() {
this.stream = null;
this.prefix = null;
}
@Override
public String toString() {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
}
}
可以看到它就是實(shí)現了 RichSinkFunction 抽象類(lèi),然后實(shí)現了 invoke 方法,這里 invoke 方法就是把記錄打印出來(lái)了就是,沒(méi)做其他的額外操作。
How to use?
SingleOutputStreamOperator.addSink(new PrintSinkFunction<>();
這樣就可以了,如果是其他的 Sink Function 的話(huà)需要換成對應的。
使用這個(gè) Function 其效果就是打印從 Source 過(guò)來(lái)的數據,和直接 Source.print() 效果一樣。
