日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受

【Flink】【更新中】狀態(tài)后端和checkpoint

狀態(tài)管理

有狀態(tài)的計算是流處理框架要實(shí)現的重要功能,因為稍復雜的流處理場(chǎng)景都需要記錄狀態(tài),然后在新流入數據的基礎上不斷更新?tīng)顟B(tài)。下面的幾個(gè)場(chǎng)景都需要使用流處理的狀態(tài)功能:

  • 數據流中的數據有重復,我們想對重復數據去重,需要記錄哪些數據已經(jīng)流入過(guò)應用,當新數據流入時(shí),根據已流入過(guò)的數據來(lái)判斷去重。
  • 檢查輸入流是否符合某個(gè)特定的模式,需要將之前流入的元素以狀態(tài)的形式緩存下來(lái)。比如,判斷一個(gè)溫度傳感器數據流中的溫度是否在持續上升。
  • 對一個(gè)時(shí)間窗口內的數據進(jìn)行聚合分析,分析一個(gè)小時(shí)內某項指標的75分位或99分位的數值。
  • 雙流Join場(chǎng)景。

Flink的一個(gè)算子有多個(gè)子任務(wù),每個(gè)子任務(wù)分布在不同實(shí)例上,我們可以把狀態(tài)理解為某個(gè)算子子任務(wù)在其當前實(shí)例上的一個(gè)變量,變量記錄了數據流的歷史信息。當新數據流入時(shí),我們可以結合歷史信息來(lái)進(jìn)行計算。

pic

Managed State和Raw State

Flink有兩種基本類(lèi)型的狀態(tài):托管狀態(tài)(Managed State)和原生狀態(tài)(Raw State)。從名稱(chēng)中也能讀出兩者的區別:Managed State是由Flink管理的,Flink幫忙存儲、恢復和優(yōu)化,Raw State是開(kāi)發(fā)者自己管理的,需要自己序列化。

- Managed State Raw State
狀態(tài)管理方式 Flink Running托管,自動(dòng)存儲、自動(dòng)恢復、自動(dòng)伸縮。 用戶(hù)自己管理
狀態(tài)數據結構 Flink提供的常用數據結構,如:ValueState、ListState、MapState等。 Raw State只支持字節,任何上層數據結構需要序列化為字節數組。
使用場(chǎng)景 絕大部分算子 自定義算子

Managed State

對Managed State繼續細分,它又有兩種類(lèi)型:Keyed State和Operator State。

Keyed State

Flink 為每個(gè)鍵值維護一個(gè)狀態(tài)實(shí)例,并將具有相同鍵的所有數據,都分區到同一個(gè)算子任務(wù)中,這個(gè)任務(wù)會(huì )維護和處理這個(gè)key 對應的狀態(tài)。當任務(wù)處理一條數據時(shí),它會(huì )自動(dòng)將狀態(tài)的訪(fǎng)問(wèn)范圍限定為當前數據的 key。因此,具有相同 key 的所有數據都會(huì )訪(fǎng)問(wèn)相同的狀態(tài)。Keyed State 很類(lèi)似于一個(gè)分布式的 key-value map 數據結構,只能用于 KeyedStream( keyBy 算子處理之后)。

Keyed State 示意圖

Keyed State 有五種類(lèi)型:

  • ValueState :值狀態(tài),保存單個(gè)類(lèi)型為 T 的值。
  • ListState :列表狀態(tài),保存一個(gè)類(lèi)型為 T 的列表。
  • MapState :映射狀態(tài),保存 Key-Value 對。
  • ReducingState :聚合狀態(tài)。
  • AggregatingState:聚合狀態(tài)。

Operator State

KeyedState 是在進(jìn)行 KeyBy 之后進(jìn)行狀態(tài)操作時(shí)使用的狀態(tài)類(lèi)型,那么像 Source、Sink算子是不會(huì )進(jìn)行 KeyBy 操作的,當這類(lèi)算子也需要用到狀態(tài),應該怎么操作呢?這時(shí)候就需要使用 Operator State(算子狀態(tài))Operator State 是綁定在 Operator 的并行度實(shí)例上的,也就是說(shuō)一個(gè)并行度一個(gè)狀態(tài)。

例如當消費 kafka 數據的 Kafka Source 并行度為 3 時(shí),默認每個(gè)并行度都是從一個(gè) Kafka 的 topic 的某個(gè)分區中消費數據,而每個(gè) kafka Source 為了保證在極端情況下也不丟失數據,就不能將 partition 對應的 offset 保存到默認的 zookeeper 中,而是需要將這些數據保存在狀態(tài)中,自己來(lái)維護這部分數據。當并行度發(fā)生調整時(shí),需要在 Operator 的并行度上重新分配狀態(tài)。

在流數據開(kāi)發(fā)的大多數場(chǎng)景中,我們都不需要使用 Operator State ,Operator State 的實(shí)現主要是針對一些沒(méi)有 Keyed 操作的 Source 和 Sink 而設計的

Operator State 的作用范圍限定為算子任務(wù)。這意味著(zhù)由同一并行任務(wù)所處理的所有數據都可以訪(fǎng)問(wèn)到相同的狀態(tài),狀態(tài)對于同一任務(wù)而言是共享的。算子狀態(tài)不能由相同或不同算子的另一個(gè)任務(wù)訪(fǎng)問(wèn)。

OperatorState示意圖

Flink 為算子狀態(tài)提供三種基本數據結構:

  • 列表狀態(tài)( List state ):狀態(tài)是一個(gè) 可序列化 對象的集合 List,彼此獨立,方便在改變并發(fā)后進(jìn)行狀態(tài)的重新分派。這些對象是重新分配 non-Keyed State 的最細粒度。根據狀態(tài)的不同訪(fǎng)問(wèn)方式,有如下兩種重新分配的模式:

    • Even-split redistribution: 每個(gè)算子都保存一個(gè)列表形式的狀態(tài)集合,整個(gè)狀態(tài)由所有的列表拼接而成。當作業(yè)恢復或重新分配的時(shí)候,整個(gè)狀態(tài)會(huì )按照算子的并發(fā)度進(jìn)行均勻分配。比如說(shuō),算子 A 的并發(fā)讀為 1,包含兩個(gè)元素 element1element2,當并發(fā)讀增加為 2 時(shí),element1 會(huì )被分到并發(fā) 0 上,element2 則會(huì )被分到并發(fā) 1 上。

      Even-split redistribution圖示

    • Union redistribution: 每個(gè)算子保存一個(gè)列表形式的狀態(tài)集合。整個(gè)狀態(tài)由所有的列表拼接而成。作業(yè)恢復或重新分配時(shí),每個(gè)算子都將獲得所有的狀態(tài)數據。Union redistribution 模式下 checkpoint metadata會(huì )存儲每個(gè)operator 的 subTask 的offset信息。如果List State的基數較大時(shí),不要使用這種方式的redistribution。因為容易引起OOM。

      Union redistribution圖示

    • 調用不同的獲取狀態(tài)對象的接口,會(huì )使用不同的狀態(tài)分配算法。比如 getUnionListState(descriptor) 會(huì )使用 union redistribution 算法, 而 getListState(descriptor) 則簡(jiǎn)單的使用 even-split redistribution 算法。

    • 當初始化好狀態(tài)對象后,我們通過(guò) isRestored() 方法判斷是否從之前的故障中恢復回來(lái),如果該方法返回 true 則表示從故障中進(jìn)行恢復,會(huì )執行接下來(lái)的恢復邏輯。

  • 廣播狀態(tài)( Broadcast state ):如果一個(gè)算子有多項任務(wù),而它的每項任務(wù)狀態(tài)又都相同,那么這種特殊情況最適合應用廣播狀態(tài)。

狀態(tài)后端和checkpoint

  • 狀態(tài)后端是保存到本地的狀態(tài)。
  • checkpoint是將狀態(tài)定時(shí)備份到第三方存儲,比如hdfs,obs上面,方便在作業(yè)重新運行的時(shí)候恢復數據。

pic

狀態(tài)后端相關(guān)配置

配置名稱(chēng) 默認值 說(shuō)明
state.backend - 建議配置為rocksdb
state.backend.latency-track.keyed-state-enabled false 是否跟蹤keyed state operations的延時(shí),建議不要開(kāi)啟
state.backend.latency-track.sample-interval 100 跟蹤耗時(shí)超過(guò)100ms的operations
state.backend.latency-track.history-size 128 跟蹤耗時(shí)較高operation的個(gè)數
table.exec.state.ttl - 狀態(tài)后端ttl時(shí)間,一般用于join場(chǎng)景下,防止狀態(tài)后端過(guò)大導致作業(yè)失敗

checkpoint 相關(guān)配置

配置名稱(chēng) 默認值 說(shuō)明
execution.checkpointing.interval - checkpoint的觸發(fā)的時(shí)間,每個(gè)一段時(shí)間都會(huì )觸發(fā)checkpoint。建議一般配置為1-10min左右
execution.checkpointing.mode EXACTLY_ONCE EXACTLY_ONCE:保證精確一次; AT_LEAST_ONCE:至少一次。建議EXACTLY_ONCE
state.backend.incremental false 是否開(kāi)啟增量checkpoint,建議開(kāi)啟
execution.checkpointing.timeout 10min checkpoint的超時(shí)時(shí)間,建議設置長(cháng)一點(diǎn),30min左右
execution.checkpointing.unaligned.enabled false 是否啟用非對齊checkpoint,建議不開(kāi)啟
execution.checkpointing.unaligned.forced false 是否強制開(kāi)啟非對齊checkpoint
execution.checkpointing.max-concurrent-checkpoints 1 同時(shí)進(jìn)行checkpoint的最大次數
execution.checkpointing.min-pause 0 兩個(gè)checkpoint之間的最小停頓時(shí)間
execution.checkpointing.tolerable-failed-checkpoints - 可容忍的checkpoint的連續故障數目
execution.checkpointing.aligned-checkpoint-timeout 0 對齊checkpoint超時(shí)時(shí)間
execution.checkpointing.alignment-timeout 0 參考:execution.checkpointing.aligned-checkpoint-timeout (已經(jīng)過(guò)期)
execution.checkpointing.force false 是否強制檢查點(diǎn)(已經(jīng)過(guò)期)
state.checkpoints.num-retained 1 checkpoint 保存個(gè)數
state.backend.async true 是否開(kāi)啟異步checkpoint (已經(jīng)過(guò)期)
state.savepoints.dir - savepoints存儲文件夾
state.checkpoints.dir - checkpoint存儲文件夾
state.storage.fs.memory-threshold 20kb 狀態(tài)文件的最小大小
state.storage.fs.write-buffer-size 4 * 1024 寫(xiě)入文件系統的檢查點(diǎn)流的寫(xiě)入緩沖區的默認大小。

RocksDb相關(guān)配置

配置項名稱(chēng) 默認值 說(shuō)明
state.backend.rocksdb.checkpoint.transfer.thread.num 4 用于上傳和下載文件的線(xiàn)程數目
state.backend.rocksdb.write-batch-size 2mb Rocksdb寫(xiě)入時(shí)消耗的最大內存
state.backend.rocksdb.predefined-options DEFAULT DEFAULT:所有的RocksDb配置都是默認值。 SPINNING_DISK_OPTIMIZED:在寫(xiě)硬盤(pán)的時(shí)候優(yōu)化RocksDb參數 SPINNING_DISK_OPTIMIZED_HIGH_MEM: 在寫(xiě)入常規硬盤(pán)時(shí)優(yōu)化參數,需要消耗更多的內存 FLASH_SSD_OPTIMIZED:在寫(xiě)入ssd閃盤(pán)時(shí)進(jìn)行優(yōu)化。

狀態(tài)后端實(shí)現

StateBackend實(shí)現類(lèi)圖,在1.17版本中,部分狀態(tài)后端已經(jīng)過(guò)期,比如:MemoryStateBackend、RocksDBStateBackend、FsStateBackend等。

pic

去除掉已經(jīng)過(guò)期的狀態(tài)后端剩余的如下所示:

pic

HashMapStateBackend

在TaskManager的內存當中保存作業(yè)的狀態(tài)后端信息,如果一個(gè)TaskManager并行執行多個(gè)任務(wù)時(shí),所有的聚合信息都要保存到當前的TaskManager內存里面。數據主要以Java對象的方式保存在堆內存當中。Key/value 形式的狀態(tài)和窗口算子會(huì )持有一個(gè) hash table,其中存儲著(zhù)狀態(tài)值、觸發(fā)器。

內存當中存儲格式定義如下:

/** So that we can give out state when the user uses the same key. */
private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;

適用場(chǎng)景

  • 有較大 state,較長(cháng) window 和較大 key/value 狀態(tài)的 Job。
  • 所有的高可用場(chǎng)景。

建議同時(shí)將 managed memory 設為0,以保證將最大限度的內存分配給 JVM 上的用戶(hù)代碼。

EmbeddedRocksDBStateBackend

將正在于行的作業(yè)的狀態(tài)保存到RocksDb里面。

創(chuàng )建KeyedStateBackend

  1. 加載RocksDB JNI library相關(guān)Jar包。
  2. 申請RocksDB所需要的內存。核心代碼在SharedResources類(lèi)當中的getOrAllocateSharedResource函數。在申請資源之前會(huì )先加鎖,在加鎖成功會(huì )申請所需要的資源。加鎖代碼如下:
try {
   lock.lockInterruptibly();
} catch (InterruptedException e) {
   Thread.currentThread().interrupt();
   throw new MemoryAllocationException("Interrupted while acquiring memory");
}

在申請資源之前需要根據類(lèi)型判斷是否已經(jīng)申請了資源,如果已經(jīng)申請了資源就不會(huì )重新申請,沒(méi)有則需要申請。申請的代碼如下所示:

private static <T extends AutoCloseable> LeasedResource<T> createResource(
            LongFunctionWithException<T, Exception> initializer, long size) throws Exception {

   final T resource = initializer.apply(size);
   return new LeasedResource<>(resource, size);
}
  1. 創(chuàng )建resourceContainer,包含預先定義好的RocksDB優(yōu)化選項等。
private RocksDBResourceContainer createOptionsAndResourceContainer(
            @Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources,
            @Nullable File instanceBasePath,
            boolean enableStatistics) {

return new RocksDBResourceContainer(
           configurableOptions != null ? configurableOptions : new Configuration(),
           predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT,
           rocksDbOptionsFactory,
           sharedResources,
           instanceBasePath,
           enableStatistics);
  1. 初始化RocksDBKeyedStateBackend,會(huì )從目錄里面加載數據到RocksDB里面。
restoreOperation =
       getRocksDBRestoreOperation(
               keyGroupPrefixBytes,
               cancelStreamRegistry,
               kvStateInformation,
               registeredPQStates,
               ttlCompactFiltersManager);
RocksDBRestoreResult restoreResult = restoreOperation.restore();
db = restoreResult.getDb();
defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();

restoreOperation實(shí)現類(lèi)圖如下所示,主要包含如下的實(shí)現類(lèi)。

pic

RocksDBIncrementalRestoreOperation

主要實(shí)現從增量快照中恢復RocksDB數據。核心函數為restore()。主要區分為:

  • restoreWithRescaling:從多個(gè)增量的狀態(tài)后端恢復,需要進(jìn)行擴縮容。在這個(gè)過(guò)程中會(huì )創(chuàng )建一個(gè)臨時(shí)的RocksDB實(shí)例用于關(guān)key-groups。臨時(shí)RocksDB當中的數據在都會(huì )復制到實(shí)際使用的RocksDB的實(shí)例當中。
  • restoreWithoutRescaling:從單個(gè)遠程的增量狀態(tài)后端恢復,無(wú)需進(jìn)行擴縮容。
if (isRescaling) {
   restoreWithRescaling(restoreStateHandles);
} else {
   restoreWithoutRescaling(theFirstStateHandle);
}

restoreWithRescaling 實(shí)現原理

  1. 選擇最優(yōu)的KeyedStateHandle。
  2. 初始化RocksDB實(shí)例。
  3. 將key-groups從臨時(shí)RocksDB轉換到Base RocksDB數據庫。

restoreWithoutRescaling 實(shí)現原理

RocksDBFullRestoreOperation

RocksDBHeapTimersFullRestoreOperation

RocksDBNoneRestoreOperation

ChangelogStateBackend

DeactivatedChangelogStateBackend



標 題:《【Flink】【更新中】狀態(tài)后端和checkpoint
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    0 評論
avatar

取消
日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受