日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受

Kafka API操作實(shí)踐

Producer API

消息發(fā)送流程

Kafka的Producer發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送的過(guò)程中,涉及到了兩個(gè)線(xiàn)程——main線(xiàn)程和Sender線(xiàn)程,以及一個(gè)線(xiàn)程共享變量——RecordAccumulator。main線(xiàn)程將消息發(fā)送給RecordAccumulator,Sender線(xiàn)程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker。

0af4aa5621a31cb07fce248512ca1f3a.png

batch.size:只有數據積累到batch.size之后,sender才會(huì )發(fā)送數據。
linger.ms:如果數據遲遲未達到batch.size,sender等待linger.time之后就會(huì )發(fā)送數據。

異步發(fā)送API

導入依賴(lài)

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.1'

編寫(xiě)代碼 需要用到的類(lèi):
KafkaProducer:需要創(chuàng )建一個(gè)生產(chǎn)者對象,用來(lái)發(fā)送數據
ProducerConfig:獲取所需的一系列配置參數
ProducerRecord:每條數據都要封裝成一個(gè)ProducerRecord對象

不帶回調函數的API

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");//kafka集群,broker-list
props.put("acks", "all");
props.put("retries", 1);//重試次數
props.put("batch.size", 16384);//批次大小
props.put("linger.ms", 1);//等待時(shí)間
props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區大小
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
       producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
 }
producer.close();

帶回調函數的API
回調函數會(huì )在producer收到ack時(shí)調用,為異步調用,該方法有兩個(gè)參數,分別是RecordMetadata和Exception,如果Exception為null,說(shuō)明消息發(fā)送成功,如果Exception不為null,說(shuō)明消息發(fā)送失敗。
注意:消息發(fā)送失敗會(huì )自動(dòng)重試,不需要我們在回調函數中手動(dòng)重試。

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時(shí)間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)), new Callback() {

                //回調函數,該方法會(huì )在Producer收到ack時(shí)調用,為異步調用
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("success->" + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();

同步發(fā)送API
同步發(fā)送的意思就是,一條消息發(fā)送之后,會(huì )阻塞當前線(xiàn)程,直至返回ack。
? 由于send方法返回的是一個(gè)Future對象,根據Futrue對象的特點(diǎn),我們也可以實(shí)現同步發(fā)送的效果,只需在調用Future對象的get方發(fā)即可。

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");//kafka集群,broker-list
        props.put("acks", "all");
        props.put("retries", 1);//重試次數
        props.put("batch.size", 16384);//批次大小
        props.put("linger.ms", 1);//等待時(shí)間
        props.put("buffer.memory", 33554432);//RecordAccumulator緩沖區大小
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i))).get();
        }
        producer.close();

Consumer API

Consumer消費數據時(shí)的可靠性是很容易保證的,因為數據在Kafka中是持久化的,故不用擔心數據丟失問(wèn)題。 由于consumer在消費過(guò)程中可能會(huì )出現斷電宕機等故障,consumer恢復后,需要從故障前的位置的繼續消費,所以consumer需要實(shí)時(shí)記錄自己消費到了哪個(gè)offset,以便故障恢復后繼續消費。 所以offset的維護是Consumer消費數據是必須考慮的問(wèn)題。

提交offset

導入依賴(lài)

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.4.1'

編寫(xiě)代碼 需要用到的類(lèi):
KafkaConsumer:需要創(chuàng )建一個(gè)消費者對象,用來(lái)消費數據
ConsumerConfig:獲取所需的一系列配置參數
ConsuemrRecord:每條數據都要封裝成一個(gè)ConsumerRecord對象

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");//消費者組,只要group.id相同,就屬于同一個(gè)消費者組
        props.put("enable.auto.commit", "false");//自動(dòng)提交offset:true 表示自動(dòng)提交
//        props.put("auto.commit.interval.ms", "1000"); 
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
            consumer.commitSync();
        }

代碼分析:
? 手動(dòng)提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點(diǎn)是,都會(huì )將本次poll的一批數據最高的偏移量提交;不同點(diǎn)是,commitSync會(huì )失敗重試,一直到提交成功(如果由于不可恢復原因導致,也會(huì )提交失?。?;而commitAsync則沒(méi)有失敗重試機制,故有可能提交失敗。

數據重復消費問(wèn)題

9d11860a0c4f515ecbb923d0246e0258.png

自動(dòng)提交offset

為了使我們能夠專(zhuān)注于自己的業(yè)務(wù)邏輯,Kafka提供了自動(dòng)提交offset的功能。
自動(dòng)提交offset的相關(guān)參數:
enable.auto.commit:是否開(kāi)啟自動(dòng)提交offset功能
auto.commit.interval.ms:自動(dòng)提交offset的時(shí)間間隔

測試

在kafka上啟動(dòng)消費者,然后運行客戶(hù)端java程序。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first



標 題:《Kafka API操作實(shí)踐
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

    評論
    0 評論
avatar

取消
日本乱偷中文字幕,美女脱内衣18禁免费看,亚洲国产精品丝袜在线观看,18女人腿打开无遮挡,廖承宇chinese野战做受