日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受

Data Sink 介紹

Data sink 有點(diǎn)把數據存儲下來(lái)(落庫)的意思。

735d8ad5f81eea372f9ec6d847c01ee1.jpg

如上圖,Source 就是數據的來(lái)源,中間的 Compute 其實(shí)就是 Flink 干的事情,可以做一系列的操作,操作完后就把計算后的數據結果 Sink 到某個(gè)地方。(可以是 MySQL、ElasticSearch、Kafka、Cassandra 等)。這里我說(shuō)下自己目前做告警這塊就是把 Compute 計算后的結果 Sink 直接告警出來(lái)了(發(fā)送告警消息到釘釘群、郵件、短信等),這個(gè) sink 的意思也不一定非得說(shuō)成要把數據存儲到某個(gè)地方去。其實(shí)官網(wǎng)用的 Connector 來(lái)形容要去的地方更合適,這個(gè) Connector 可以有 MySQL、ElasticSearch、Kafka、Cassandra RabbitMQ 等。

Flink Data Sink

前面文章 Data Source 介紹 介紹了 Flink Data Source 有哪些,這里也看看 Flink Data Sink 支持的有哪些。

63cd34dfdbaa7b4b080e11f8dd204d3f.jpg

看下源碼有哪些呢?

e0204a474154c290ed2632c251cff2b6.jpg

可以看到有 Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等 Sink 的方式。

SinkFunction

745d781aedd5602b86acaa668efe02d3.png

從上圖可以看到 SinkFunction 接口有 invoke 方法,它有一個(gè) RichSinkFunction 抽象類(lèi)。

上面的那些自帶的 Sink 可以看到都是繼承了 RichSinkFunction 抽象類(lèi),實(shí)現了其中的方法,那么我們要是自己定義自己的 Sink 的話(huà)其實(shí)也是要按照這個(gè)套路來(lái)做的。

這里就拿個(gè)較為簡(jiǎn)單的 PrintSinkFunction 源碼來(lái)講下:

@PublicEvolving
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
   private static final long serialVersionUID = 1L;

   private static final boolean STD_OUT = false;
   private static final boolean STD_ERR = true;

   private boolean target;
   private transient PrintStream stream;
   private transient String prefix;

   /**
    * Instantiates a print sink function that prints to standard out.
    */
   public PrintSinkFunction() {}

   /**
    * Instantiates a print sink function that prints to standard out.
    *
    * @param stdErr True, if the format should print to standard error instead of standard out.
    */
   public PrintSinkFunction(boolean stdErr) {
       target = stdErr;
   }

   public void setTargetToStandardOut() {
       target = STD_OUT;
   }

   public void setTargetToStandardErr() {
       target = STD_ERR;
   }

   @Override
   public void open(Configuration parameters) throws Exception {
       super.open(parameters);
       StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
       // get the target stream
       stream = target == STD_OUT ? System.out : System.err;

       // set the prefix if we have a >1 parallelism
       prefix = (context.getNumberOfParallelSubtasks() > 1) ?
               ((context.getIndexOfThisSubtask() + 1) + "> ") : null;
   }

   @Override
   public void invoke(IN record) {
       if (prefix != null) {
           stream.println(prefix + record.toString());
       }
       else {
           stream.println(record.toString());
       }
   }

   @Override
   public void close() {
       this.stream = null;
       this.prefix = null;
   }

   @Override
   public String toString() {
       return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
   }
}

可以看到它就是實(shí)現了 RichSinkFunction 抽象類(lèi),然后實(shí)現了 invoke 方法,這里 invoke 方法就是把記錄打印出來(lái)了就是,沒(méi)做其他的額外操作。

How to use?

SingleOutputStreamOperator.addSink(new PrintSinkFunction<>();

這樣就可以了,如果是其他的 Sink Function 的話(huà)需要換成對應的。

使用這個(gè) Function 其效果就是打印從 Source 過(guò)來(lái)的數據,和直接 Source.print() 效果一樣。

3a4dff87d0758a076e47d813a24d22a5.png



標 題:《Data Sink 介紹
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    0 評論
avatar

取消
日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受