<input id="0qass"><u id="0qass"></u></input>
  • <input id="0qass"><u id="0qass"></u></input>
  • <menu id="0qass"><u id="0qass"></u></menu>

    中通消息平臺 Kafka 順序消費線程模型的實踐與優化

    各類消息中間件對順序消息實現的做法是將具有順序性的一類消息發往相同的主題分區中,只需要將這類消息設置相同的 Key 即可,而 Kafka 會在任意時刻保證一個消費組同時只能有一個消費者監聽消費,因此可在消費時按分區進行順序消費,保證每個分區的消息具備局部順序性。由于需要確保分區消息的順序性,并不能并發地消費消費,對消費的吞吐量會造成一定的影響。那么,如何在保證消息順序性的前提下,最大限度的提高消費者的消費能力?

    本文將會對 Kafka 消費者拉取消息流程進行深度分析之后,對 Kafka 消費者順序消費線程模型進行一次實踐與優化。

    Kafka 消費者拉取消息流程分析

    在講實現 Kafka 順序消費線程模型之前,我們需要先深入分析 Kafka 消費者的消息拉取機制,只有當你對 Kafka 消費者拉取消息的整個流程有深入的了解之后,你才能夠很好地理解本次線程模型改造的方案。

    我先給大家模擬一下消息拉取的實際現象,這里 max.poll.records = 500。

    1、消息沒有堆積時:

    可以發現,在消息沒有堆積時,消費者拉取時,如果某個分區沒有的消息不足 500 條,會從其他分區湊夠 500 條后再返回。

    2、多個分區都有堆積時:

    在消息有堆積時,可以發現每次返回的都是同一個分區的消息,但經過不斷 debug,消費者在拉取過程中并不是等某個分區消費完沒有堆積了,再拉取下一個分區的消息,而是不斷循環的拉取各個分區的消息,但是這個循環并不是說分區 p0 拉取完 500 條,后面一定會拉取分區 p1 的消息,很有可能后面還會拉取 p0 分區的消息,為了弄明白這種現象,我仔細閱讀了相關源碼。

    org.apache.kafka.clients.consumer.KafkaConsumer#poll

    private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
      try {
        // poll for new data until the timeout expires
        do {
          // 客戶端拉取消息核心邏輯
          final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
          if (!records.isEmpty()) {
            //  在返回數據之前, 發送下次的 fetch 請求, 避免用戶在下次獲取數據時線程阻塞
            if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
              // 調用 ConsumerNetworkClient#poll 方法將 FetchRequest 發送出去。
              client.pollNoWakeup();
            }
            return this.interceptors.onConsume(new ConsumerRecords<>(records));
          }
        } while (timer.notExpired());
        return ConsumerRecords.empty();
      } finally {
        release();
      }
    }
    

    我們使用 Kafka consumer 進行消費的時候通常會給一個時間,比如:

    consumer.poll(Duration.ofMillis(3000));
    

    從以上代碼邏輯可以看出來,用戶給定的這個時間,目的是為了等待消息湊夠 max.poll.records 條消息后再返回,即使消息條數不夠 max.poll.records 消息,時間到了用戶給定的等待時間后,也會返回。

    pollForFetches 方法是客戶端拉取消息核心邏輯,但并不是真正去 broker 中拉取,而是從緩存中去獲取消息。在 pollForFetches 拉取消息后,如果消息不為零,還會調用 fetcher.sendFetches() 與 client.pollNoWakeup(),調用這兩個方法究竟有什么用呢?

    fetcher.sendFetches() 經過源碼閱讀后,得知該方法目的是為了構建拉取請求 FetchRequest 并進行發送,但是這里的發送并不是真正的發送,而是將 FetchRequest 請求對象存放在 unsend 緩存當中,然后會在 ConsumerNetworkClient#poll 方法調用時才會被真正地執行發送。

    fetcher.sendFetches() 在構建 FetchRequest 前,會對當前可拉取分區進行篩選,而這個也是決定多分區拉取消息規律的核心,后面我會講到。

    從 KafkaConsumer#poll 方法源碼可以看出來,其實 Kafka 消費者在拉取消息過程中,有兩條線程在工作,其中用戶主線程調用 pollForFetches 方法從緩存中獲取消息消費,在獲取消息后,會再調用 ConsumerNetworkClient#poll 方法從 Broker 發送拉取請求,然后將拉取到的消息緩存到本地,這里為什么在拉取完消息后,會主動調用 ConsumerNetworkClient#poll 方法呢?我想這里的目的是為了下次 poll 的時候可以立即從緩存中拉取消息。

    pollForFetches 方法會調用 Fetcher#fetchedRecords 方法從緩存中獲取并解析消息:

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
      Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
      int recordsRemaining = maxPollRecords;
      try {
        while (recordsRemaining > 0) {
          // 如果當前獲取消息的 PartitionRecords 為空,或者已經拉取完畢
          // 則需要從 completedFetches 重新獲取 completedFetch 并解析成 PartitionRecords
          if (nextInLineRecords == null || nextInLineRecords.isFetched) {
            // 如果上一個分區緩存中的數據已經拉取完了,直接中斷本次循環拉取,并返回空的消息列表
            // 直至有緩存數據為止
            CompletedFetch completedFetch = completedFetches.peek();
            if (completedFetch == null) break;
            try {
              // CompletedFetch 即拉取消息的本地緩存數據
              // 緩存數據中 CompletedFetch 解析成 PartitionRecords
              nextInLineRecords = parseCompletedFetch(completedFetch);
            } catch (Exception e) {
              // ...
            }
            completedFetches.poll();
          } else {
            // 從分區緩存中獲取指定條數的消息
            List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
            // ...
            fetched.put(partition, records);
            recordsRemaining -= records.size();
          }
        }
      }
    } catch (KafkaException e) {
      // ...
    }
    return fetched;
    }
    

    completedFetches 是拉取到的消息緩存,以上代碼邏輯就是圍繞著如何從 completedFetches 緩存中獲取消息的,從以上代碼邏輯可以看出:

    maxPollRecords 為本次拉取的最大消息數量,該值可通過 max.poll.records 參數配置,默認為 500 條,該方法每次從 completedFetches 中取出一個 CompletedFetch 并解析成可以拉取的 PartitionRecords 對象,即方法中的 nextInLineRecords,請注意,PartitionRecords 中的消息數量可能大與 500 條,因此可能本次可能一次性從 PartitionRecords 獲取 500 條消息后即返回,如果 PartitionRecords 中消息數量不足 500 條,會從 completedFetches 緩存中取出下一個要拉取的分區消息,recordsRemaining 會記錄本次剩余還有多少消息沒拉取,通過循環不斷地從 completedFetches 緩存中取消息,直至 recordsRemaining 為 0。

    以上代碼即可解釋為什么消息有堆積的情況下,每次拉取的消息很大概率是同一個分區的消息,因為緩存 CompletedFetch 緩存中的消息很大概率會多余每次拉取消息數量,Kafka 客戶端每次從 Broker 拉取的消息數據并不是通過 max.poll.records 決定的,該參數僅決定用戶每次從本地緩存中獲取多少條數據,真正決定從 Broker 拉取的消息數據量是通過 fetch.min.bytes、max.partition.fetch.bytes、fetch.max.bytes 等參數決定的。

    我們再想一下,假設某個分區的消息一直都處于堆積狀態,Kafka 會每次都拉取這個分區直至將該分區消費完畢嗎?(根據假設,Kafka 消費者每次都會從這個分區拉取消息,并將消息存到分區關聯的 CompletedFetch 緩存中,根據以上代碼邏輯,nextInLineRecords 一直處于還沒拉取完的狀態,導致每次拉取都會從該分區中拉取消息。)

    答案顯然不會,不信你打開 Kafka-manager 觀察每個分區的消費進度情況,每個分區都會有消費者在消費中。

    那 Kafka 消費者是如何循環地拉取它監聽的分區呢?我們接著往下分析。

    發送拉取請求邏輯:

    org.apache.kafka.clients.consumer.internals.Fetcher#sendFetches

    public synchronized int sendFetches() {
      // 解析本次可拉取的分區
      Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
      for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        final Node fetchTarget = entry.getKey();
        final FetchSessionHandler.FetchRequestData data = entry.getValue();
        // 構建請求對象
        final FetchRequest.Builder request = FetchRequest.Builder
          .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
          .isolationLevel(isolationLevel)
          .setMaxBytes(this.maxBytes)
          .metadata(data.metadata())
          .toForget(data.toForget());
        // 發送請求,但不是真的發送,而是將請求保存在 unsent 中
        client.send(fetchTarget, request)
          .addListener(new RequestFutureListener<ClientResponse>() {
            @Override
            public void onSuccess(ClientResponse resp) {
              synchronized (Fetcher.this) {
    
                // ... ...
    
                // 創建 CompletedFetch, 并緩存到 completedFetches 隊列中
                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                                        resp.requestHeader().apiVersion()));
              }
    
            }
          }
                       // ... ...
                       });
      }
      return fetchRequestMap.size();
    }
    

    以上代碼邏輯很好理解,在發送拉取請求前,先檢查哪些分區可拉取,接著為每個分區構建一個 FetchRequest 對象,FetchRequest 中的 minBytes 和 maxBytes,分別可通過 fetch.min.bytes 和 fetch.max.bytes 參數設置。這也是每次從 Broker 中拉取的消息不一定等于 max.poll.records 的原因。

    prepareFetchRequests 方法會調用 Fetcher#fetchablePartitions 篩選可拉取的分區,我們來看下 Kafka 消費者是如何進行篩選的:

    org.apache.kafka.clients.consumer.internals.Fetcher#fetchablePartitions

    private List<TopicPartition> fetchablePartitions() {
      Set<TopicPartition> exclude = new HashSet<>();
      List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
      if (nextInLineRecords != null && !nextInLineRecords.isFetched) {
        exclude.add(nextInLineRecords.partition);
      }
      for (CompletedFetch completedFetch : completedFetches) {
        exclude.add(completedFetch.partition);
      }
      fetchable.removeAll(exclude);
      return fetchable;
    }
    

    nextInLineRecords 即我們上面提到的根據某個分區緩存 CompletedFetch 解析得到的,如果 nextInLineRecords 中的緩存還沒拉取完,則不從 broker 中拉取消息了,以及如果此時 completedFetches 緩存中存在該分區的緩存,也不進行拉取消息。

    我們可以很清楚的得出結論:

    當緩存中還存在中還存在某個分區的消息數據時,消費者不會繼續對該分區進行拉取請求,直到該分區的本地緩存被消費完,才會繼續發送拉取請求。

    為了更加清晰的表達這段邏輯,我舉個例子并將整個流程用圖表達出來:

    假設某消費者監聽三個分區,每個分區每次從 Broker 中拉取 4 條消息,用戶每次從本地緩存中獲取 2 條消息:

    從以上流程可看出,Kafka 消費者自身已經實現了拉取限流的機制。

    Kafka 順序消費線程模型的實現

    kafka 的消費類 KafkaConsumer 是非線程安全的,因此用戶無法在多線程中共享一個 KafkaConsumer 實例,且 KafkaConsumer 本身并沒有實現多線程消費邏輯,如需多線程消費,還需要用戶自行實現,在這里我會講到 Kafka 兩種多線程消費模型:

    1、每個線程維護一個 KafkaConsumer

    這種消費模型創建多個 KafkaConsumer 對象,每個線程維護一個 KafkaConsumer,從而實現線程隔離消費,由于每個分區同一時刻只能有一個消費者消費,所以這種消費模型天然支持順序消費。

    但是缺點是無法提升單個分區的消費能力,如果一個主題分區數量很多,只能通過增加 KafkaConsumer 實例提高消費能力,這樣一來線程數量過多,導致項目 Socket 連接開銷巨大,項目中一般不用該線程模型去消費。

    2、單 KafkaConsumer 實例 + 多 worker 線程

    這種消費模型獎 KafkaConsumer 實例與消息消費邏輯解耦,我們不需要創建多個 KafkaConsumer 實例就可進行多線程消費,還可根據消費的負載情況動態調整 worker 線程,具有很強的獨立擴展性,在公司內部使用的多線程消費模型就是用的單 KafkaConsumer 實例 + 多 worker 線程模型。但是通常情況下,這種消費模型無法保證消費的順序性。

    那么,如果在使用第二種消費模型的前提下,實現消息順序消費呢?

    接下來我們來看下 ZMS 是怎么實現順序消費線程模型的,目前 ZMS 的順序消費線程模型為每個分區單線程消費模式:

    com.zto.consumer.KafkaConsumerProxy#addUserDefinedProperties

    首先在初始化的時候,會對消費線程池進行初始化,具體是根據 threadsNumMax 的數量創建若干個單個線程的線程池,單個線程的線程池就是為了保證每個分區取模后拿到線程池是串行消費的,但這里創建 threadsNumMax 個線程池是不合理的,后面我會說到。

    com.zto.consumer.KafkaConsumerProxy#submitRecords

    ZMS 會對消息分區進行取模,根據取模后的序號從線程池列表緩存中獲取一個線程池,從而使得相同分區的消息會被分配到相同線程池中執行,對于順序消費來說至關重要,前面我也說了,當用戶配置了順序消費時,每個線程池只會分配一個線程,如果相同分區的消息分配到同一個線程池中執行,也就意味著相同分區的消息會串行執行,實現消息消費的順序性。

    為了保證手動提交位移的正確性,我們必須保證本次拉取的消息消費完之后才會進行位移提交,因此 ZMS 在消費前會創建一個 count 為本次消息數量的 CountDownLatch:

    final CountDownLatch countDownLatch = new CountDownLatch(records.count());
    

    消費邏輯中,在 finally 進行 countDown 操作,最后會在本次消費主線程當中阻塞等待本次消息消費完成:

    com.zto.consumer.KafkaConsumerProxy#submitRecords

    以上就是目前 ZMS 順序消費的線程模型,用圖表示以上代碼邏輯:

    以上,由于某些分區的消息堆積量少于 500 條(Kafka 默認每次從 Broker 拉取 500 條消息),因此會繼續從其它分區湊夠 500 條消息,此時拉取的 500 條消息會包含 3 個分區的消息,ZMS 根據利用分區取模將同一個分區的消息放到指定的線程池中(線程池只有一條線程)進行消費,以上圖來看,總共有 3 條線程在消費本次拉取的 500 條消息。

    那如果每個分區的積壓都超過了 500 條消息呢?這種實際的情況會更加多,因為消息中間件其中一個重要功能就是用于流量削峰,流量洪峰那段時間積壓幾百上千萬條消息還是經常能夠遇到的,那么此時每次拉取的消息中,很大概率就只剩下一個分區了,我用如下圖表示:

    在消息流量大的時候,順序消息消費時卻退化成單線程消費了。

    如何提高 Kafka 順序消費的并發度?

    經過對 ZMS 的消費線程模型以及對 Kafka 消費者拉取消息流程的深入了解之后,我想到了如下幾個方面對 ZMS 的消費線程模型進行優化:

    1、細化消息順序粒度

    之前的做法是將每個分區單獨一條線程消費,無法再繼續在分區之上增加消費能力,我們知道業務方發送順序消息時,會將同一類型具有順序性的消息給一個相同的 Key,以保證這類消息發送到同一個分區進行消費,從而達到消息順序消費的目的,而同一個分區會接收多種類型(即不同 Key)的消息,每次拉取的消息具有很大可能是不同類型的,那么我們就可以將同一個分區的消息,分配一個獨立的線程池,再利用消息 Key 進行取模放入對應的線程中消費,達到并發消費的目的,且不打亂消息的順序性。

    2、細化位移提交粒度

    由于 ZMS 目前是手動提交位移,目前每次拉取消息必須先消費完才能進行位移提交,既然已經對分區消息進行指定的線程池消費了,由于分區之間的位移先后提交不影響,那么我們可以將位移提交交給每個分區進行管理,這樣拉取主線程不必等到是否消費完才進行下一輪的消息拉取。

    3、異步拉取與限流

    異步拉取有個問題,就是如果節點消費跟不上,而拉取消息過多地保存在本地,很可能會造成內存溢出,因此我們需要對消息拉取進行限流,當本地消息緩存量達到一定量時,阻止消息拉取。

    上面在分析 Kafka 消費者拉取消息流程時,我們知道消費者在發送拉取請求時,首先會判斷本地緩存中是否存在該分區的緩存,如果存在,則不發送拉取請求,但由于 ZMS 需要改造成異步拉取的形式,由于 Comsumer#poll 不再等待消息消費完再進行下一輪拉取,因此 Kafka 的本地緩存中幾乎不會存在數據了,導致 Kafka 每次都會發送拉取請求,相當于將 Kafka 的本地緩存放到 ZMS 中,因此我們需要 ZMS 層面上對消息拉取進行限流,Kafka 消費者有兩個方法可以設置訂閱的分區是否可以發送拉取請求:

    // 暫停分區消費(即暫停該分區發送拉取消息請求)
    org.apache.kafka.clients.consumer.KafkaConsumer#pause
    // 恢復分區消費(即恢復該分區發送拉取消息請求)
    org.apache.kafka.clients.consumer.KafkaConsumer#resume
    

    以上兩個方法,其實就是改變了消費者的訂閱分區的狀態值 paused,當 paused = true 時,暫停分區消費,當 paused = false 時,恢復分區消費,這個參數是在哪里使用到呢?上面在分析 Kafka 消費者拉取消息流程時我們有提到發送拉取請求之前,會對可拉取的分區進行篩選,其中一個條件即分區 paused = false:

    org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#isFetchable

    private boolean isFetchable() {
      return !paused && hasValidPosition();
    }
    

    由于 KafkaConsumer 是非線程安全的,如果我們在異步線程 KafkaConsumer 相關的類,會報如下錯誤:

    KafkaConsumer is not safe for multi-threaded access
    

    只需要確保 KafkaConsumer 相關方法在 KafkaConsumer#poll 方法線程中調用即可,具體做法可以設置一個線程安全上下文容器,異步線程操作 KafkaConsumer 相關方法是,只需要將具體的分區放到上下文容器即可,后續統一由 poll 線程執行。

    因此我們只需要利用好這個特性,就可以實現拉取限流,消費者主線程的 Comsumer#poll 方法依然是異步不斷地從緩存中獲取消息,同時不會造成兩次 poll 之間的時間過大導致消費者被踢出消費組。

    以上優化改造的核心是在不打亂消息順序的前提下利用消息 Key 盡可能地并發消費,但如果遇到分區中的消息都是相同 Key,并且在有一定的積壓下每次拉取都是同一個分區的消息時,以上模型可能沒有理想情況下的那么好。這時是否可以將 fetch.max.bytes 與 max.partition.fetch.bytes 參數設置小一點,讓每個分區的本地緩存都不足 500 條,這樣每次 poll 的消息列表都可以包含多個分區的消息了,但這樣又會導致 RPC 請求增多,這就需要針對業務消息大小,對這些參數進行調優。

    以上線程模型,需要增加一個參數 orderlyConsumePartitionParallelism,用于設置分區消費并行度,假設某個消費組被分配 5 個分區進行消費,則每個分區默認啟動一條線程消費,一共 5 * 1 = 5 條消費線程,當 orderlyConsumePartitionParallelism = 3,則每個分區啟動 3 條線程消費,一共 5 * 3 = 15 條消費線程。orderlyConsumePartitionParallelism = 1 時,則說明該分區所有消息都處在順序(串行)消費;當 orderlyConsumePartitionParallelism > 1 時,則根據分區消息的 Key 進行取模分配線程消費,保證不了整個分區順序消費,但保證相同 Key 的消息順序消費。

    注意,當 orderlyConsumePartitionParallelism > 1 時,分區消費線程的有效使用率取決于該分區消息的 Key:

    1、如果該分區所有消息的 Key 都相同,則消費的 Key 取模都分配都同一條線程當中,并行度退化成 orderlyConsumePartitionParallelism = 1;

    2、如果該分區相同 Key 的消息過于集中,會導致每次拉取都是相同 key 的一批消息,同樣并行度退化成 orderlyConsumePartitionParallelism = 1。

    綜合對比:

    優化前,ZMS 可保證整個分區消息的順序性,優化后可根據消息 Key 在分區的基礎上不打亂相同 Key 消息的順序性前提下進行并發消費,有效地提升了單分區的消費吞吐量;優化前,有很大的概率會退化成同一時刻單線程消費,優化后盡可能至少保證每個分區一條線程消費,情況好的時候每個分區可多條線程消費。

    通過以上場景分析,該優化方案不是提高順序消費吞吐量的銀彈,它有很大的局限性,用戶在業務的實現上不能重度依賴順序消費去實現,以免影響業務性能上的需求。

    總結

    通過本文深度分析,我們已經認識到順序消息會給消費吞吐量帶來怎么樣的影響,因此用戶在業務的實現上不能重度依賴順序消費去實現,能避免則避免,如果一定要使用到順序消費,需要知道 Kafka 并不能保證嚴格的順序消費,在消費組重平衡過程中很可能就會將消息的順序性打亂,而且順序消費會影響消費吞吐量,用戶需要權衡這種需求的利弊。

    寫在最后

    我們知道 RocketMQ 本身已經實現了具體的消費線程模型,用戶不需要關心具體實現,只需要實現消息消費邏輯即可,而 Kafka 消息者僅提供 KafkaConsumer#poll 一個方法,消費線程模型的實現則完全交由用戶去實現。

    中通科技正式開源內部的消息 Pass 云平臺化產品 ZMS(ZTO Messaging Service),它可以屏蔽底層消息中間件類型,封裝了包括 Kafka 消費線模型在內的具體實現,提供統一的對外 API,彌補了 Kafka 消費者這部分支持的不足。同時還提供了通過唯一標識動態路由消息,為開發運維人員提供自動化部署運維集群,主題、消費組申請與審批、實時監控、自動告警、容災遷移等功能。

    同時希望更多的開源愛好者加入到該項目中,共同打造一體化的智能消息運維平臺。

    ZMS GitHub 地址:https://github.com/ZTO-Express/zms

    作者簡介

    作者張乘輝,擅長消息中間件技能,負責公司百萬 TPS 級別 Kafka 集群的維護,作者維護的公號「后端進階」不定期分享 Kafka、RocketMQ 系列不講概念直接真刀真槍的實戰總結以及細節上的源碼分析;同時作者也是阿里開源分布式事務框架 Seata Contributor,因此也會分享關于 Seata 的相關知識;當然公號也會分享 WEB 相關知識比如 Spring 全家桶等。內容不一定面面俱到,但一定讓你感受到作者對于技術的追求是認真的!

    公眾號:后端進階

    技術博客:https://objcoding.com/

    GitHub:https://github.com/objcoding/

    公眾號「后端進階」,專注后端技術分享!

    后端進階 CSDN認證博客專家 Java kafka RocketMQ
    作者張乘輝,擅長消息中間件技能,負責公司百萬 TPS 級別 Kafka 集群的維護,微信公眾號「后端進階」不定期分享 Kafka、RocketMQ 系列不講概念直接真刀真槍的實戰總結以及細節上的源碼分析;同時作者也是阿里開源分布式事務框架 Seata Contributor,因此也會不定期分享關于 Seata 的相關知識;當然公號也會不定期發表 WEB 相關知識比如 Spring 全家桶等。不一定面面俱到,但一定讓你感受到作者對于技術的追求是認真的!
    已標記關鍵詞 清除標記
    ??2020 CSDN 皮膚主題: 代碼科技 設計師:Amelia_0503 返回首頁
    多乐彩