**底細**
日志體制接入的日志種類多、形式復雜多樣,主流的有以下幾種日志:
– Filebeat采集到的文即日志,形式多樣
– Winbeat采集到的操縱體制日志
– 器材上報到Logstash的syslog日志
– 接入到Kafka的業務日志
以上通過不同種類渠道接入的日志,存在2個重要的疑問:
– 形式不統一、不規范、尺度化不夠
– 如何從各類日志中提掏出用戶關懷的指標,發掘更多的業務代價
為了解決上面2個疑問,我們基于Flink和Drools條例引擎做了即時的日志處置辦事。
**體制條理**
條理對照簡樸,條理圖如下:
各類日志都是通過Kafka匯總,做日志中轉。
Flink花費Kafka的數據,同時通過API調用拉取Drools條例引擎,對日志做分析處置后,將分析后的數據儲備到Elasticsearch中,用于日志的搜索和解析等業務。
為了監控日志分析的即時狀態,Flink會將日志處置的統計數據,如每分鐘處置的日志量,每種日志從各個機械IP來的日志量運彩 兩邊都買寫到Redis中,用于監控統計。
**模塊介紹**
體制項目起名為Eagle。
eagle-api:基于Spring Boot,作為Drools條例引擎的寫入和讀取API辦事。
eagle-mon:通用類模塊。
eagle-log:基于Flink的日志處置辦事。
焦點講一下eagle-log:
**對接kafka、ES和Redis**
對接Kafka和ES都對照簡樸,用的官方的connector(flink-connector-k運彩 會員代碼afka-0.10和flink-connector-elasticsearch6),詳見代碼。
對接Redis,最開端用的是org.apache.bahir提供的redis connector,后來發明敏捷度不夠,就採用了Jedis。
在將統計數據寫入redis的時候,最開端用的keyby分組后緩存了分組數據,在sink中做統計處置后寫入,參考代碼如下:
“`
String name = “redis-agg-log”;
DataStream>> keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex())
.timeWindo(Time.seconds(indoTime)).trigger(ne CountTriggerWithTimeout(indoCount, TimeCharacteristic.ProcessingTime))
.process(ne ProcessWindoFunction>, String, TimeWindo>() {
Overre
public vo process(String s, Context context, Iterable運彩 一直輸 iterable, Collector>> collector) {
ArrayList logs = Lists.neArrayList(iterable);
if (logs.size() > 0) {
collector.collect(ne Tuple2(s, logs));
}
}
}).setParallelism(redisSinkParallelism).name(name).u(name);
“`
后來發明這樣做對內存耗損對照大,實在不需求緩存整個分組的原始數據,只需求一個統計數據就OK了,優化后:
“`
String name = “redis-agg-log”;
DataStream keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex())
.timeWindo(Time.seconds(indoTime))
.trigger(ne CountTriggerWithTimeout(indoCount, TimeCharacteristic.ProcessingTime))
.aggregate(ne LogStatAggregateFunction(), ne LogStatWindoFunction())
.setParallelism(redisSinkParallelism).name(name).u(name);
“`
這里使用了Flink的聚合函數和Accumulator,通過Flink的agg操作做統計,減輕了內存消耗的壓力。
**使用Broadcast廣播Drools規則引擎**
1、Drools規則流通過broadcast map state廣播出去。
2、Kafka的數據流connect規則流處理日志。
“`
廣播規則流
en場中賽事v.addSource(ne RuleSourceFunction(ruleUrl)).name(ruleName).u(ruleName).setParallelism(1)
.broadcast(ruleStateDescriptor);
Kafka數據流
FlinkKafkaConsumer010 source = ne FlinkKafkaConsumer010(kafkaTopic, ne LogSchema(), properties);
env.addSource(source).name(kafkaTopic).u(kafkaTopic).setParallelism(kafkaParallelism);
數據流connect條例流處置日志
BroadcastConnectedStream connectedStreams = dataSource.connect(ruleSource);
connectedStreams.process(ne LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).u(name);
“`
具運彩 獨贏意思體細節參考開源代碼。
**小結**
本體制提供了一個基于Flink的即時數據處置參考,對接了Kafka、Redis和Elasticsearch,通過可部署的Drools條例引擎,將數據處置邏輯部署化和動態化。
對于處置后的數據,也可以對接到其他sink,為其他各類業務平臺提供數據的分析、清洗和尺度化辦事。
> 云棲號在線上課每日都有產物專業專家分享!
> 課程地址:syqh.aliyun.live
> 當即參加社群,與專家面臨面,及時了解課程最新動態!
> 云棲號在線上課 社群sc.tb.cnF3.Z8gvnK
原文發行時間:2024-07-
本文作者: aoxiang
本文來自:,了解關連信息可以注目dockone