Data Source 介紹
分類(lèi)
Data Sources 是什么呢?就字面意思其實(shí)就可以知道:數據來(lái)源。
Flink 做為一款流式計算框架,它可用來(lái)做批處理,即處理靜態(tài)的數據集、歷史的數據集;也可以用來(lái)做流處理,即實(shí)時(shí)的處理些實(shí)時(shí)數據流,實(shí)時(shí)的產(chǎn)生數據流結果,只要數據源源不斷的過(guò)來(lái),Flink 就能夠一直計算下去,這個(gè) Data Sources 就是數據的來(lái)源地。
Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction)
來(lái)為你的程序添加數據來(lái)源。
Flink 已經(jīng)提供了若干實(shí)現好了的 source functions,當然你也可以通過(guò)實(shí)現 SourceFunction 來(lái)自定義非并行的 source 或者實(shí)現 ParallelSourceFunction 接口或者擴展 RichParallelSourceFunction 來(lái)自定義并行的 source。
StreamExecutionEnvironment
中可以使用以下幾個(gè)已實(shí)現的 stream sources:
總的來(lái)說(shuō)可以分為下面幾大類(lèi):
基于集合
- fromCollection(Collection) - 從 Java 的 Java.util.Collection 創(chuàng )建數據流。集合中的所有元素類(lèi)型必須相同。
- fromCollection(Iterator, Class) - 從一個(gè)迭代器中創(chuàng )建數據流。Class 指定了該迭代器返回元素的類(lèi)型。
- fromElements(T …) - 從給定的對象序列中創(chuàng )建數據流。所有對象類(lèi)型必須相同。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input = env.fromElements(
new Event(1, "barfoo", 1.0),
new Event(2, "start", 2.0),
new Event(3, "foobar", 3.0),
...
);
- fromParallelCollection(SplittableIterator, Class) - 從一個(gè)迭代器中創(chuàng )建并行數據流。Class 指定了該迭代器返回元素的類(lèi)型。
- generateSequence(from, to) - 創(chuàng )建一個(gè)生成指定區間范圍內的數字序列的并行數據流。
基于文件
- readTextFile(path) - 讀取文本文件,即符合 TextInputFormat 規范的文件,并將其作為字符串返回。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("file:///path/to/file");
- readFile(fileInputFormat, path) - 根據指定的文件輸入格式讀取文件(一次)。
- readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 這是上面兩個(gè)方法內部調用的方法。它根據給定的 fileInputFormat 和讀取路徑讀取文件。根據提供的 watchType,這個(gè) source 可以定期(每隔 interval 毫秒)監測給定路徑的新數據(FileProcessingMode.PROCESS_CONTINUOUSLY),或者處理一次路徑對應文件的數據并退出(FileProcessingMode.PROCESS_ONCE)。你可以通過(guò) pathFilter 進(jìn)一步排除掉需要處理的文件。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
實(shí)現
在具體實(shí)現上,Flink 把文件讀取過(guò)程分為兩個(gè)子任務(wù),即目錄監控和數據讀取。每個(gè)子任務(wù)都由單獨的實(shí)體實(shí)現。目錄監控由單個(gè)非并行(并行度為1)的任務(wù)執行,而數據讀取由并行運行的多個(gè)任務(wù)執行。后者的并行性等于作業(yè)的并行性。單個(gè)目錄監控任務(wù)的作用是掃描目錄(根據 watchType 定期掃描或僅掃描一次),查找要處理的文件并把文件分割成切分片(splits),然后將這些切分片分配給下游 reader。reader 負責讀取數據。每個(gè)切分片只能由一個(gè) reader 讀取,但一個(gè) reader 可以逐個(gè)讀取多個(gè)切分片。
重要注意:
如果 watchType 設置為 FileProcessingMode.PROCESS_CONTINUOUSLY,則當文件被修改時(shí),其內容將被重新處理。這會(huì )打破“exactly-once”語(yǔ)義,因為在文件末尾附加數據將導致其所有內容被重新處理。
如果 watchType 設置為 FileProcessingMode.PROCESS_ONCE,則 source 僅掃描路徑一次然后退出,而不等待 reader 完成文件內容的讀取。當然 reader 會(huì )繼續閱讀,直到讀取所有的文件內容。關(guān)閉 source 后就不會(huì )再有檢查點(diǎn)。這可能導致節點(diǎn)故障后的恢復速度較慢,因為該作業(yè)將從最后一個(gè)檢查點(diǎn)恢復讀取。
基于 Socket
socketTextStream(String hostname, int port) - 從 socket 讀取。元素可以用分隔符切分。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999) // 監聽(tīng) localhost 的 9999 端口過(guò)來(lái)的數據
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
自定義
addSource - 添加一個(gè)新的 source function。例如,你可以 addSource(new FlinkKafkaConsumer011<>(…)) 以從 Apache Kafka 讀取數據
說(shuō)下上面幾種的特點(diǎn)吧:
- 基于集合:有界數據集,更偏向于本地測試用
- 基于文件:適合監聽(tīng)文件修改并讀取其內容
- 基于 Socket:監聽(tīng)主機的 host port,從 Socket 中獲取數據
- 自定義 addSource:大多數的場(chǎng)景數據都是無(wú)界的,會(huì )源源不斷的過(guò)來(lái)。比如去消費 Kafka 某個(gè) topic 上的數據,這時(shí)候就需要用到這個(gè) addSource,可能因為用的比較多的原因吧,Flink 直接提供了 FlinkKafkaConsumer011 等類(lèi)可供你直接使用。你可以去看看 FlinkKafkaConsumerBase 這個(gè)基礎類(lèi),它是 Flink Kafka 消費的最根本的類(lèi)。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<KafkaEvent> input = env
.addSource(
new FlinkKafkaConsumer011<>(
parameterTool.getRequired("input-topic"), //從參數中獲取傳進(jìn)來(lái)的 topic
new KafkaEventSchema(),
parameterTool.getProperties())
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()));
Flink 目前支持如下圖里面常見(jiàn)的 Source:
如果你想自己自定義自己的 Source 呢?
那么你就需要去了解一下 SourceFunction 接口了,它是所有 stream source 的根接口,它繼承自一個(gè)標記接口(空接口)Function。
SourceFunction 定義了兩個(gè)接口方法:
- run : 啟動(dòng)一個(gè) source,即對接一個(gè)外部數據源然后 emit 元素形成 stream(大部分情況下會(huì )通過(guò)在該方法里運行一個(gè) while 循環(huán)的形式來(lái)產(chǎn)生 stream)。
- cancel : 取消一個(gè) source,也即將 run 中的循環(huán) emit 元素的行為終止。
正常情況下,一個(gè) SourceFunction 實(shí)現這兩個(gè)接口方法就可以了。其實(shí)這兩個(gè)接口方法也固定了一種實(shí)現模板。比如,實(shí)現一個(gè) XXXSourceFunction,那么大致的模板是這樣的:(直接拿 FLink 源碼的實(shí)例給你看看)
