您的位置:首頁>正文

kafka 資料移轉實踐

更多騰訊海量技術文章, 請關注雲+社區:https://cloud.tencent.com/developer

作者:mikealzhou

本文重點介紹kafka的兩類常見資料移轉方式:1、broker內部不同資料盤之間的分區資料移轉;2、不同broker之間的分區資料移轉。

一、broker 內部不同資料盤之間進行分區資料移轉

1.1 背景介紹

最近, 騰訊雲的一個重要客戶發現kafka broker內部的topic分區資料存儲分佈不均勻, 導致部分磁片100%耗盡, 而部分磁片只有40%的消耗量。

分析原因, 發現存在部分topic的分區資料過於集中在某些磁片導致, 比如, 以下截圖顯示的/data5 資料盤。

根據分散式系統的特點, 很容易想到採取資料移轉的辦法, 對broker內部不同資料盤的分區資料進行遷移。 在進行線上集群資料移轉之前, 為了保證生產集群的資料完整和安全, 必須先在測試集群進行測試。

1.2 測試broker內部不同資料盤進行分區資料移轉

1.2.1 建立測試topic並驗證生產和消費正常

我們搭建的測試集群, Kafka 有三個broker, hostname分別為:tbds-172-16-16-11, tbds-172-16-16-12, tbds-172-16-16-16。 每個broker配置了兩塊資料盤,

緩存資料分別存儲在 /data/kafka-logs/ 和 /data1/kafka-logs/。

首先建立測試topic:

./kafka-topics.sh --create --zookeeper tbds-172-16-16-11:2181 --replication-factor 2 --partitions 3 --topic test_topic

然後向topic生產發送500條資料, 發送的時候也同時消費資料。 然後查看topic的分區資料情況:

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNERgroupid1 test_topic 0 172 172 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3groupid1 test_topic 1 156 156 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3groupid1 test_topic 2 172 172 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3

發現test_topic生產和消費資料都正常。

1.2.2 將分區資料在磁片間進行遷移

現在登錄tbds-172-16-16-12這台broker節點, 將test_topic的分區資料目錄 /data1/kafka-logs/test_topic-0/ 移動到 /data/kafka-logs/ :

mv /data1/kafka-logs/test_topic-0/ /data/kafka-logs/

查看 /data/kafka-logs/ 目錄下, 分區test_topic-0 的資料:

1.2.3 再次對測試topic生產和消費資料

再次發送500條資料, 同時消費資料。 然後查看資料情況:

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNERgroupid1 test_topic 0 337 337 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3groupid1 test_topic 1 304 304 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3groupid1 test_topic 2 359 359 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3

再次查看tbds-172-16-16-12 這個broker節點的/data/kafka-logs/test_topic-0/ 分區目錄下的資料:

發現, 從 /data1/kafka-logs/ 移動到 /data/kafka-logs/ 目錄下的分區資料目錄test_topic-0/(也就是編號為0的分區)緩存資料並沒有增加。

因為test_topic每個分區有2個replicas, 因此, 我找到編號為0的另外一個分區replica資料存儲在tbds-172-16-16-16這台broker節點。 登錄tbds-172-16-16-16這個broker節點, 打開編號為0的分區緩存資料目錄, 得到如下資訊:

發現, tbds-172-16-16-16這台broker節點的分區資料目錄test_topic-0/內緩存資料量是增加的, 也就是緩存有再次生產發送的message資料。

由此可見, 經過移動之後的tbds-172-16-16-12這台broker節點的編號為0的分區資料緩存目錄內, 並沒有新增緩存資料。 與之對應的, 沒有做分區資料移動操作的 tbds-172-16-16-16這台broker 節點的編號為0的分區緩存資料目錄內新增再次發送的資料。

是不是意味著不能在broker的磁片間移動分區資料呢?

1.2.4 調用重啟大法:重啟kafka

重啟kafka集群, 重啟完成後, 發現tbds-172-16-16-12這台broker節點的編號為0的分區緩存資料目錄內的資料也增加到正常水準。

表明重啟之後, broker的不同磁片間遷移資料已經生效。

1.2.5 驗證磁片間遷移分區資料生效

再次向test_topic發送500條資料, 同時消費資料, 然後查看資料情況:

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNERgroupid1 test_topic 0 521 521 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3groupid1 test_topic 1 468 468 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3groupid1 test_topic 2 511 511 0 kafka-python-1.3.1_tbds-172-16-16-3/172.16.16.3

查看tbds-172-16-16-12 和 tbds-172-16-16-16 兩個broker節點的test_topic-0分區資料的緩存目錄:

發現兩個replicas完全一樣。

1.3 結論

Kafka broker 內部不同資料盤之間可以自由遷移分區資料目錄。 遷移完成後, 重啟kafka即可生效。

二、不同broker之間傳輸分區資料

當對kafka集群進行擴容之後, 由於新擴容的broker沒有緩存資料, 容易造成系統的資料分佈不均勻。 因此, 需要將原來集群broker的分區資料移轉到新擴容的broker節點。

不同broker之間傳輸分區資料, 可以使用kafka自帶的kafka-reassign-partitions.sh腳本工具實現。

我們在kafka測試集群原有的3台broker基礎上, 擴容1台broker。

2.1 獲取test_topic的分區分佈情況

執行命令:

./kafka-topics.sh --zookeeper 172.16.16.11:2181 --topic test_topic --describe

可以得到test_topic的3個分區(每個分區有2份replicas)在三個broker節點的分佈情況:

Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 1002 Replicas: 1002,1001 Isr: 1002,1001Topic: test_topic Partition: 1 Leader: 1003 Replicas: 1003,1002 Isr: 1003,1002Topic: test_topic Partition: 2 Leader: 1001 Replicas: 1001,1003 Isr: 1001,1003

2.2 獲取topic重新分區的配額檔

編寫分配腳本:move_kafka_topic.json內容如下:

{"topics": [{"topic":"test_topic"}], "version": 1}

執行分配計畫生成腳本:

./kafka-reassign-partitions.sh --zookeeper tbds-172-16-16-11:2181 --topics-to-move-json-file /tmp/move_kafka_topic.json --broker-list "1001,1002,1003,1004" --generate

命令裡面的broker-list填寫kafka集群4個broker的id。 不同kafka集群, 因為部署方式不一樣,

選擇的broker id也不一樣。 我們的測試集群broker id是1001,1002,1003,1004。 讀者需要根據自己的kafka集群設置的broker id填寫。

執行命令之後, 得到以下結果:

Current partition replica assignment #當前分區的副本分配{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1002,1001]},{"topic":"test_topic","partition":2,"replicas":[1001,1003]},{"topic":"test_topic","partition":1,"replicas":[1003,1002]}]}Proposed partition reassignment configuration #建議的分區配置{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1001,1002]},{"topic":"test_topic","partition":2,"replicas":[1003,1004]},{"topic":"test_topic","partition":1,"replicas":[1002,1003]}]}

Proposed partition reassignment configuration 後是根據命令列的指定的broker list生成的分區分配計畫json格式。 將 Proposed partition reassignment configuration的配置複製保存到一個檔中 move_kafka_topic_result.json:

{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1001,1002]},{"topic":"test_topic","partition":2,"replicas":[1003,1004]},{"topic":"test_topic","partition":1,"replicas":[1002,1003]}]}

2.3 對topic分區資料進行重新分佈

執行重新分配命令:

./kafka-reassign-partitions.sh --zookeeper tbds-172-16-16-11:2181 --reassignment-json-file /tmp/move_kafka_topic_result.json --execute

得到如下結果:

Current partition replica assignment{"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1002,1001]},{"topic":"test_topic","partition":2,"replicas":[1001,1003]},{"topic":"test_topic","partition":1,"replicas":[1003,1002]}]}Save this to use as the --reassignment-json-file option during rollbackSuccessfully started reassignment of partitions {"version":1,"partitions":[{"topic":"test_topic","partition":0,"replicas":[1001,1002]},{"topic":"test_topic","partition":2,"replicas":[1003,1004]},{"topic":"test_topic","partition":1,"replicas":[1002,1003]}]}

從返回結果來看, 分區資料重新分佈任務已經啟動成功。

2.4 查看分區資料重新分佈進度

檢查分配的狀態, 執行命令:

./kafka-reassign-partitions.sh --zookeeper tbds-172-16-16-11:2181 --reassignment-json-file /tmp/move_kafka_topic_result.json --verify

得到結果:

Status of partition reassignment:Reassignment of partition [test_topic,0] completed successfullyReassignment of partition [test_topic,2] completed successfullyReassignment of partition [test_topic,1] completed successfully

表明分區資料重新分步任務已經完成。

2.5 再次獲取test_topic的分區分佈情況

再次查看各個分區的分佈情況, 執行命令:

./kafka-topics.sh --zookeeper 172.16.16.11:2181 --topic test_topic --describe

得到返回結果:

Topic:test_topic PartitionCount:3 ReplicationFactor:2 Configs:Topic: test_topic Partition: 0 Leader: 1002 Replicas: 1001,1002 Isr: 1002,1001Topic: test_topic Partition: 1 Leader: 1003 Replicas: 1002,1003 Isr: 1003,1002Topic: test_topic Partition: 2 Leader: 1003 Replicas: 1003,1004 Isr: 1003,1004

從結果看出, test_topic的分區資料已經由原來的3個broker, 重新分佈到4個broker。

三、測試結論

Kafka broker 內部不同資料盤之間可以自由遷移分區資料目錄。 遷移完成後,重啟kafka即可生效;

Kafka 不同broker之前可以遷移資料,使用kafka自帶的kafka-reassign-partitions.sh腳本工具實現。

四、修復客戶的kafka集群故障

我們採用本文測試的方法,對該客戶的Kafka集群進行broker節點內部不同磁片間的資料移轉,對多個topic均進行了資料移轉,最終實現磁片間的資料緩存分佈均勻化。

同時,我們又對客戶的kafka集群進行擴容,擴容之後採用本文描述的不同broker之間遷移分區資料方法,對多個topic均進行了資料移轉,保證新擴容節點也有緩存資料,原來的broker節點存儲壓力減小。

遷移完成後,重啟kafka即可生效;

Kafka 不同broker之前可以遷移資料,使用kafka自帶的kafka-reassign-partitions.sh腳本工具實現。

四、修復客戶的kafka集群故障

我們採用本文測試的方法,對該客戶的Kafka集群進行broker節點內部不同磁片間的資料移轉,對多個topic均進行了資料移轉,最終實現磁片間的資料緩存分佈均勻化。

同時,我們又對客戶的kafka集群進行擴容,擴容之後採用本文描述的不同broker之間遷移分區資料方法,對多個topic均進行了資料移轉,保證新擴容節點也有緩存資料,原來的broker節點存儲壓力減小。

同類文章
Next Article
喜欢就按个赞吧!!!
点击关闭提示