資料內(nèi)容:
線上問題及優(yōu)化
1、消息丟失情況:
消息發(fā)送端:
(1)acks=0: 表示producer不需要等待任何broker確認(rèn)收到消息的回復(fù),就可以繼續(xù)發(fā)送下一條消息。性能最高,但是最容易丟消
息。大數(shù)據(jù)統(tǒng)計(jì)報(bào)表場(chǎng)景,對(duì)性能要求很高,對(duì)數(shù)據(jù)丟失不敏感的情況可以用這種。
(2)acks=1: 至少要等待leader已經(jīng)成功將數(shù)據(jù)寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續(xù)發(fā)送下一條消
息。這種情況下,如果follower沒有成功備份數(shù)據(jù),而此時(shí)leader又掛掉,則消息會(huì)丟失。
(3)acks=-1或all: 這意味著leader需要等待所有備份(min.insync.replicas配置的備份個(gè)數(shù))都成功寫入日志,這種策略會(huì)保證只要有一
個(gè)備份存活就不會(huì)丟失數(shù)據(jù)。這是最強(qiáng)的數(shù)據(jù)保證。一般除非是金融級(jí)別,或跟錢打交道的場(chǎng)景才會(huì)使用這種配置。當(dāng)然如果
min.insync.replicas配置的是1則也可能丟消息,跟acks=1情況類似。
消息消費(fèi)端:
如果消費(fèi)這邊配置的是自動(dòng)提交,萬一消費(fèi)到數(shù)據(jù)還沒處理完,就自動(dòng)提交offset了,但是此時(shí)你consumer直接宕機(jī)了,未處理完的數(shù)據(jù)
丟失了,下次也消費(fèi)不到了。
2、消息重復(fù)消費(fèi)
消息發(fā)送端:
發(fā)送消息如果配置了重試機(jī)制,比如網(wǎng)絡(luò)抖動(dòng)時(shí)間過長(zhǎng)導(dǎo)致發(fā)送端發(fā)送超時(shí),實(shí)際broker可能已經(jīng)接收到消息,但發(fā)送方會(huì)重新發(fā)送消息
消息消費(fèi)端:
如果消費(fèi)這邊配置的是自動(dòng)提交,剛拉取了一批數(shù)據(jù)處理了一部分,但還沒來得及提交,服務(wù)掛了,下次重啟又會(huì)拉取相同的一批數(shù)據(jù)重
復(fù)處理
一般消費(fèi)端都是要做消費(fèi)冪等處理的。
3、消息亂序
如果發(fā)送端配置了重試機(jī)制,kafka不會(huì)等之前那條消息完全發(fā)送成功才去發(fā)送下一條消息,這樣可能會(huì)出現(xiàn),發(fā)送了1,2,3條消息,第
一條超時(shí)了,后面兩條發(fā)送成功,再重試發(fā)送第1條消息,這時(shí)消息在broker端的順序就是2,3,1了
所以,是否一定要配置重試要根據(jù)業(yè)務(wù)情況而定。也可以用同步發(fā)送的模式去發(fā)消息,當(dāng)然acks不能設(shè)置為0,這樣也能保證消息從發(fā)送
端到消費(fèi)端全鏈路有序。
kafka保證全鏈路消息順序消費(fèi),需要從發(fā)送端開始,將所有有序消息發(fā)送到同一個(gè)分區(qū),然后用一個(gè)消費(fèi)者去消費(fèi),但是這種性能比較
低,可以在消費(fèi)者端接收到消息后將需要保證順序消費(fèi)的幾條消費(fèi)發(fā)到內(nèi)存隊(duì)列(可以搞多個(gè)),一個(gè)內(nèi)存隊(duì)列開啟一個(gè)線程順序處理消
息。
4、消息積壓
1)線上有時(shí)因?yàn)榘l(fā)送方發(fā)送消息速度過快,或者消費(fèi)方處理消息過慢,可能會(huì)導(dǎo)致broker積壓大量未消費(fèi)消息。
此種情況如果積壓了上百萬未消費(fèi)消息需要緊急處理,可以修改消費(fèi)端程序,讓其將收到的消息快速轉(zhuǎn)發(fā)到其他topic(可以設(shè)置很多分
區(qū)),然后再啟動(dòng)多個(gè)消費(fèi)者同時(shí)消費(fèi)新主題的不同分區(qū)。
2)由于消息數(shù)據(jù)格式變動(dòng)或消費(fèi)者程序有bug,導(dǎo)致消費(fèi)者一直消費(fèi)不成功,也可能導(dǎo)致broker積壓大量未消費(fèi)消息。
此種情況可以將這些消費(fèi)不成功的消息轉(zhuǎn)發(fā)到其它隊(duì)列里去(類似死信隊(duì)列),后面再慢慢分析死信隊(duì)列里的消息處理問題。
5、延時(shí)隊(duì)列
延時(shí)隊(duì)列存儲(chǔ)的對(duì)象是延時(shí)消息。所謂的“延時(shí)消息”是指消息被發(fā)送以后,并不想讓消費(fèi)者立刻獲取,而是等待特定的時(shí)間后,消費(fèi)者
才能獲取這個(gè)消息進(jìn)行消費(fèi),延時(shí)隊(duì)列的使用場(chǎng)景有很多, 比如 :
1)在訂單系統(tǒng)中, 一個(gè)用戶下單之后通常有 30 分鐘的時(shí)間進(jìn)行支付,如果 30 分鐘之內(nèi)沒有支付成功,那么這個(gè)訂單將進(jìn)行異常處理,
這時(shí)就可以使用延時(shí)隊(duì)列來處理這些訂單了。
2)訂單完成1小時(shí)后通知用戶進(jìn)行評(píng)價(jià)。
實(shí)現(xiàn)思路:發(fā)送延時(shí)消息時(shí)先把消息按照不同的延遲時(shí)間段發(fā)送到指定的隊(duì)列中(topic_1s,topic_5s,topic_10s,...topic_2h,這個(gè)一
般不能支持任意時(shí)間段的延時(shí)),然后通過定時(shí)器進(jìn)行輪訓(xùn)消費(fèi)這些topic,查看消息是否到期,如果到期就把這個(gè)消息發(fā)送到具體業(yè)務(wù)處
理的topic中,隊(duì)列中消息越靠前的到期時(shí)間越早,具體來說就是定時(shí)器在一次消費(fèi)過程中,對(duì)消息的發(fā)送時(shí)間做判斷,看下是否延遲到對(duì)
應(yīng)時(shí)間了,如果到了就轉(zhuǎn)發(fā),如果還沒到這一次定時(shí)任務(wù)就可以提前結(jié)束了。
6、消息回溯
如果某段時(shí)間對(duì)已消費(fèi)消息計(jì)算的結(jié)果覺得有問題,可能是由于程序bug導(dǎo)致的計(jì)算錯(cuò)誤,當(dāng)程序bug修復(fù)后,這時(shí)可能需要對(duì)之前已消
費(fèi)的消息重新消費(fèi),可以指定從多久之前的消息回溯消費(fèi),這種可以用consumer的offsetsForTimes、seek等方法指定從某個(gè)offset偏移
的消息開始消費(fèi),參見上節(jié)課的內(nèi)容。
7、分區(qū)數(shù)越多吞吐量越高嗎
可以用kafka壓測(cè)工具自己測(cè)試分區(qū)數(shù)不同,各種情況下的吞吐量
1
# 往test里發(fā)送一百萬消息,每條設(shè)置1KB
2
# throughput 用來進(jìn)行限流控制,當(dāng)設(shè)定的值小于 0 時(shí)不限流,當(dāng)設(shè)定的值大于 0 時(shí),當(dāng)發(fā)送的吞吐量大于該值時(shí)就會(huì)被阻塞一段時(shí)間
3
bin/kafka‐producer‐perf‐test.sh ‐‐topic test ‐‐num‐records 1000000 ‐‐record‐size 1024 ‐‐throughput ‐1
4
‐‐producer‐props bootstrap.servers=192.168.65.60:9092 acks=1