无码人妻精品一区二区三18禁,影音先锋男人AV橹橹色,污污污污污污www网站免费,日韩成人av无码一区二区三区,欧美性受xxxx狂喷水

時序數據流經過Kafka隊列時可能產生的亂序原因和解決方法

Tao Liu

2019-10-08 / ,

Kafka作為一個流行的消息隊列,以分布式高性能,高可靠性等特點已經在多種場景下廣泛使用。在工業互聯網、物聯網時序數據存儲的解決方案中也有大量用到。

但在實際部署過程中,可能會因為配置原因導致經過Kafka的數據在接收方產生亂序,給后續處理環節帶來排序等工作,造成不必要的處理開銷,降低系統的處理性能和額外排序的工作。

其實可以通過合理的規劃設計Kafka的配置和方法來避免消息在通過Kafka后亂序的產生,只需要遵循以下原則即可:

對于需要確保順序的一條消息流,發送到同一個partition上去

Kafka可以在一個topic下設置多個partition來實現分布式和負載均衡,由同一consumer group下的不同consumer去消費;這樣的機制能夠支持多線程分布式的處理,帶來高性能,但也帶來了同一消息流走了不同路徑的可能性,如果沒有針對性的規劃,從架構上就無法保證消息的順序。如下圖所示,對于同一個topic的一條消息流,寫入不同的partition,就會產生多條路徑。

時序數據流經過Kafka隊列時可能產生的亂序原因和解決方法 - TDengine Database 時序數據庫

為了確保一條消息流的數據能夠嚴格按照時間順序被消費,則必須遵循一條路徑的原則,這樣才能實現FIFO(First In First Out)。

根據Kafka的文檔描述,把哪條記錄發到哪個partition,是由producer負責:

Producers

Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!

可見,Kafka已考慮到了確保消息順序的需求,提供了接口來實現根據指定的key值發送到同一partition的方法。 可以看看Kafka相關源碼:

class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
  private val random = new java.util.Random

  def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
  }
}

從源碼上來看,Kafka支持通過Key的hash值對partition的數量求余來實現基于Key的分配partition方法。因此我們只要對不同時序消息流,找到他們不同的key,并且這個key是不會發生變化的,那么就能在發送到Kafka的時候,確保每一條消息流發送到同一個partition,走唯一的路徑。因此我們可以通過指定Key的方式,來實現這種嚴格的時序關系。

具體實現方法

在TDengine Database的應用場景下,我們通常會把某一類設備(超級表)劃分為一個topic。對于每個設備,會單獨建表,一個設備產生的數據,會只放到一張表里。對于設備產生的原始數據,就需要在這個數據中找一個能夠代表這個數據的ID,而且不會發生變化的字段,作為Key值,在發送給Kafka時,帶上這個Key值。這樣就能確保該設備的所有數據流經過Kafka時,走唯一的路徑。這個ID或key往往是設備具有唯一性的設備編碼,這個編碼不僅可以作為Kafka的Key,也可以作為TDengine Database里的表名。

具體實現非常簡單,在producer發送數據時,選擇一個key,通過KeyedMessage方法生成消息,然后send。以Java為例,其他語言可以從Kafka文檔中找到相同功能的接口:

 producer.send(new KeyedMessage<String, String>(topic,key,record))

這個接口,可以讓使用者非常方便無需增加代碼的情況下來實現指定每個消息流綁定一個partition的結果。用戶也可以通過自己實現一個partition的算法,來實現更精準的partition分配控制。具體實現可以參考”“中的介紹。