您的位置:首頁>正文

一種基於kafka+storm實現的日誌記錄方法(二)

引言

上一篇分享博文《一種基於kafka+storm實現的日誌記錄方法》,講述了一種基於大資料即時運算實現的日誌記錄方式。 在文中只是提出了一種技術實現思路, 以及整體架構, 並且在我所在的項目中已經進行了實踐, 感興趣的朋友, 可以進一步完善, 比如添加許可權等, 實現一種新日誌平臺的搭建。

博文發佈後, 有網友留言希望公開部分源碼。 今天準備整理下我們已經實現的代碼, 去掉公司業務部分, 做一個簡單share, 以回應網友要求。 本文不再對整體實現流程進行講解, 感興趣的朋友請直接前往上一遍博文。

代碼實現主要分兩部分:第一部分是java用戶端往kafka寫日誌消息(生產者);第二部分是storm消費kafka日誌消息, 歸類, 批量寫入hbase。 從hbase查詢日誌部分比較簡單, 代碼就不提供了。

由於這週末還要準備一個“晉升答辯”, 本次分享只整理出來第一部分“java用戶端往kafka寫日誌消息”。

Java寫日誌消息到kafka

我實現的第一版發送日誌消息到kafka是複用的“點擊流”日誌上報流程, 即用nginx+lua實現的http介面, 往kafka寫消息, 當然也有採用nginx+go語言實現的。 這種方式適用於做頁面埋點, 當使用者流覽頁面產生點擊操作時調用該http介面, 往kafka寫日誌, 當時主要是想通過這種方式實現點擊熱力圖、注意力熱圖等。 這種方式實現的http介面性能相當優異, 而且在支持高併發、高輸送量方面表現優異,

現在在各大電商網站廣泛的運用, 使用者收集使用者的行為資料, 這些都是做大資料計算、分析、以及智慧推薦的基礎。

好吧不扯遠了, 後面有時間再分享下我們做大資料即時計算、以及智慧推薦相關實現。 既然這種nginx+lua+kafka的方式實現的http介面能支援每天海量的“點擊流”日誌上報, 那它同樣能滿足“伺服器”端的日誌記錄, 而且這點日誌量對於該http介面來說簡直毫無壓力。 我的第一版實現很簡單, 直接在java服務端適用httpclient構造http請求, 調用該http介面進行“伺服器”端的日誌上報。 而且正如料想的一樣, 毫無壓力。

這僅僅是我們的第一次嘗試, 但每次列印日誌都需要調用一個http介面, 我還是覺得很彆扭, 而且http介面還是有一定的網路開銷。 既然這種方式可行,

那就可以放棄http介面, 直接在java應用伺服器端直連kafka發送日誌消息, 如果是http介面還有一點網路開銷的話(10ms-50ms), 這種方式對“應用伺服器”來說毫無感知(1-2ms), 這也是我想要的效果, 畢竟只是列印一條日誌。 我把這個想法告訴我的同事“丹哥”(外號甄子丹), 最後把這部分代碼實現做成一個jar包, 在需要採用這種方式列印日誌的系統引入這個jar包, 再做一些配置即可。

核心代碼講解

下面我們來看下該jar包的核心代碼LogCollectorClient類:

@Component public class LogCollectorClient { private static final Log log = LogFactory.getLog(LogCollectorClient.class); //kafka生產者(京東對kafka做了一些簡單封裝, 簡稱JDQ) private JDQProducerClient producer = null; private boolean HASAUTH = false; //每一批日誌量, 批量上報日誌使用 protected int OFFSET = 500; //spring 讀取properties設定檔 @Resource private Environment env; //初始化方法 @PostConstruct private void init { try { //step1:連接kafka許可權驗證, 公司對kafka做的許可權封裝, 可以根據自己公司kafka具體情況調整 Authentication e = new Authentication(env.getProperty("kafka_key"), env.getProperty("test_token"));//開發、測試環境kafka //step2 設置kafka生成者相關配置屬性 Properties pros = new Properties; pros.setProperty("partitioner.class", env.getProperty("partitioner"));//指定分片策略 pros.setProperty("producer.type", env.getProperty("producer.type")); pros.setProperty("compression.codec", env.getProperty("compression.codec")); pros.setProperty("request.required.acks", env.getProperty("request.required.acks")); //step3 初始化kafka生產者用戶端 this.producer = new JDQProducerClient(e, pros); } catch (Exception var4) { log.info("kafaka鑒權初始化失敗!"); } } /** * 上報一條單條日誌 * @param key * @param type * @param logMap * @throws JDQOverSpeedException * @throws JDQException */ public void sendLogInfo(String key, String type, Map logMap) throws JDQOverSpeedException, JDQException { if(StringUtils.isNotBlank(key) && StringUtils.isNotBlank(type) && null != logMap && !logMap.isEmpty) { this.producer.send(new JDQMessage(key + "_" + type, this.assembleJsonStr(key, type, logMap).getBytes)); } } /** * 轉換成json格式上報 * @param key * @param type * @param logMap * @return */ private String assembleJsonStr(String key, String type, Map logMap) { StringBuffer valueStr = new StringBuffer; Iterator logInfo = logMap.entrySet.iterator; while(logInfo.hasNext) { Map.Entry entry = (Map.Entry)logInfo.next; valueStr.append((StringUtils.isNotBlank((String)entry.getKey)?((String)entry.getKey).replaceAll("&", " "):"") + "=").append(StringUtils.isNotBlank((String)entry.getValue)?((String)entry.getValue).replaceAll("&", " "):"").append("&"); } LogAssembleInfo logInfo1 = new LogAssembleInfo("key=" + key + "&type=" + type + "&" + valueStr.toString, DateUtil.getTime); return JsonUtil.write2JsonStr(logInfo1); } @PreDestroy private void destroy { if(null != this.producer) { this.producer.close; } } }

這個類其實很簡單,

說明如下:

1、採用@Component注解, 說明只是一個簡單的spring 單例 bean, spring容器啟動時注入到容器中。

2、@PostConstruct 注解的init方法, bean初始化時, 就會初始化一個kafka生產者物件, 我們公司kafka團隊對kafka做了簡單的封裝 JDQProducerClient本質上對應的是kafka的kafka.javaapi.producer.Producer。 如果你使用的原生kafka, 生產者的初始化方法如下:

public static void main(String[] args) throws Exception { Properties prop = new Properties; prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181"); prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092"); prop.put("serializer.class", StringEncoder.class.getName); Producer producer = new Producer(new ProducerConfig(prop)); int i = 0; while(true){ producer.send(new KeyedMessage("test111", "msg:"+i++)); Thread.sleep(1000); } }

3、關於kafka初始化的相關配置資訊放到一個properties的設定檔中, 通過spring的Environment環境上下文物件的getProperty方法獲取與連接kafka所以的配置。

4、採用@PreDestroy注解的destroy方法, 這裡是應用伺服器tomcat停止之前, 優雅的自動關閉kafka連接。

5、最後來看下日誌上報方法sendLogInfo, 在需要上報日誌的類中注入LogCollectorClient物件即可, 如下:

@Component public class TestService { @Resource private LogCollectorClient log; public void publish{ //省略業務代碼 //開始上報日誌,
日誌內容放到一個map裡 Map param = new HashMap; param.put("time", DateTimeUtils.getDateTime); param.put("logs", "xxx發佈活動"); log.sendLogInfo(pageId, SystemConstant.APP_ID, param); } }

sendLogInfo日誌上報方法需要三個參數:

第一個是查詢key, hbase日誌表中rowkey構成部分。 比如:這裡的pageId, 發佈頁面的id。

第二個是系統id, 用於區分hbase日誌表, 每個系統對應一個固定的常量。

第三個是需要答應的日誌內容, 考慮到列印的日誌可能比較多, 這裡用一個map存放, 也可以改為一個String。

好了, 關於java先kafka上報日誌的核心類LogCollectorClient講解完畢。 正如前面所說, 把LogCollectorClient類打成一個jar包, 在需要日誌列印的應用系統裡引入這個jar包, 以及一個kafka的properties設定檔即可。

當然, 你可以把kafka需要的配置當做常量寫死在jar包中的一個常量類中, 這樣應用系統只需要一個jar包即可。

優化

最後你還可以對上述LogCollectorClient類做一些優化:

1、比如加一個執行緒池, 把上報日誌該為非同步上報, 在執行緒中處理kafka·異常。這樣即便kafka·出現問題時,也不會影響正常業務,唯一影響的就是日誌會丟失。

2、另外如果你的日誌量很大,你還可以採用kafka·的批量上報,當日誌量達到一定條數後 才調用一次producer的sender方法。

3、也許你已經發現了這裡上報的日誌內容是json格式,為了更加高效你改為pb格式。

最後需要說明的是:理論上這種日誌記錄方式可以完全代替傳統的日誌列印到檔的方式,比如Log4j。但是沒有必要,個人覺得一些無關緊要的調試日誌還是使用Log4j,對於一些敏感日誌或者重要的流水日誌,採用這種方式。 Log4j列印日誌更簡單,基於kafka+storm的更加安全、永久存放、日誌更集中(一張hbase表中),二者結合使用天衣無縫。

關於第一部分“java用戶端往kafka寫日誌消息(生產者)”就這裡,由於下周還有一個“晉升答辯”需要準備,自己祝福自己希望這次晉升能成功吧。第二部分“storm消費kafka日誌消息放到hbase”只能緩幾天,才能整理出來啦,忘諒解。

在執行緒中處理kafka·異常。這樣即便kafka·出現問題時,也不會影響正常業務,唯一影響的就是日誌會丟失。

2、另外如果你的日誌量很大,你還可以採用kafka·的批量上報,當日誌量達到一定條數後 才調用一次producer的sender方法。

3、也許你已經發現了這裡上報的日誌內容是json格式,為了更加高效你改為pb格式。

最後需要說明的是:理論上這種日誌記錄方式可以完全代替傳統的日誌列印到檔的方式,比如Log4j。但是沒有必要,個人覺得一些無關緊要的調試日誌還是使用Log4j,對於一些敏感日誌或者重要的流水日誌,採用這種方式。 Log4j列印日誌更簡單,基於kafka+storm的更加安全、永久存放、日誌更集中(一張hbase表中),二者結合使用天衣無縫。

關於第一部分“java用戶端往kafka寫日誌消息(生產者)”就這裡,由於下周還有一個“晉升答辯”需要準備,自己祝福自己希望這次晉升能成功吧。第二部分“storm消費kafka日誌消息放到hbase”只能緩幾天,才能整理出來啦,忘諒解。

同類文章
Next Article
喜欢就按个赞吧!!!
点击关闭提示