更新時間:2019年11月08日14時53分 來源:傳智播客 瀏覽次數(shù):
一、 Flume攔截器介紹
攔截器是簡單的插件式組件,設(shè)置在source和channel之間。source接收到的時間,在寫入channel之前,攔截器都可以進行轉(zhuǎn)換或者刪除這些事件。每個攔截器只處理同一個source接收到的事件??梢宰远x攔截器。
flume內(nèi)置了很多攔截器,并且會定期的添加一些攔截器,下面我們學(xué)習(xí)flume內(nèi)置的,兩個經(jīng)常使用的攔截器。
1. Timestamp Interceptor(時間戳攔截器)
flume中一個最經(jīng)常使用的攔截器 ,該攔截器的作用是將時間戳插入到flume的事件報頭中。如果不使用任何攔截器,flume接受到的只有message。時間戳攔截器的配置。
參數(shù)默認值描述type類型名稱timestamp,也可以使用類名的全路徑。preserveExisting false 如果設(shè)置為true,若事件中報頭已經(jīng)存在,不會替換時間戳報頭的值。
a1.sources.r1.interceptors = timestamp a1.sources.r1.interceptors.timestamp.type=timestamp a1.sources.r1.interceptors.timestamp.preserveExisting=false
2. Host Interceptor(主機攔截器)
主機攔截器插入服務(wù)器的ip地址或者主機名,agent將這些內(nèi)容插入到事件的報頭中。時間報頭中的key使用hostHeader配置,默認是host。主機攔截器的配置參數(shù) 默認值 描述 type 類型名稱host hostHeader host 事件投的key useIP true 如果設(shè)置為false,host鍵插入主機名 preserveExisting false 如果設(shè)置為true,若事件中報頭已經(jīng)存在,不會替換host報頭的值
a1.sources.r1.interceptors = host a1.sources.r1.interceptors.host.type=host a1.sources.r1.interceptors.host.useIP=false a1.sources.r1.interceptors.timestamp.preserveExisting=true
二、 業(yè)務(wù)需求
使用flume內(nèi)置攔截器完成如下需求:
1. agent配置
# 03 timestamp and host interceptors work before source a1.sources.r1.interceptors = i1 i2 # 兩個interceptor串聯(lián),依次作用于event a1.sources.r1.interceptors.i1.type = timestamp a1.sources.r1.interceptors.i1.preserveExisting = false a1.sources.r1.interceptors.i2.type = host # flume event的頭部將添加 “hostname”:實際主機名 a1.sources.r1.interceptors.i2.hostHeader = hostname # 指定key,value將填充為flume agent所在節(jié)點的主機名 a1.sources.r1.interceptors.i2.useIP = false # IP和主機名,二選一即可 # 04 hdfs sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs: // master:9000 / flume / % Y - % m - % d / # hdfs sink將根據(jù)event header中的時間戳進行替換 # 和hostHeader的值保持一致,hdfs sink將提取event中key為hostnmae的值,基于該值創(chuàng)建文件名前綴 a1.sinks.k1.hdfs.filePrefix = % {hostname} # hdfs sink將根據(jù)event header中的hostnmae對應(yīng)的value進行替換 a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.rollCount = 10 a1.sinks.k1.hdfs.rollSize = 1024000 # channel,memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # bind source,sink to channel a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
三、 驗證攔截器效果
1. 驗證思路
1)先將interceptor作用后的event,通過logger sink打印到console,驗證header是否正常添加
2)修改sink為hdfs, 觀察目錄和文件的名稱是否能夠按照預(yù)期創(chuàng)建(時間戳-目錄,hostname-文件前綴)
2. 驗證過程
1)發(fā)送header為空的http請求,logger sink打印event到終端,觀察event header中是否被添加了timestamp以及hostname
# 01 define agent name, source/sink/channel name a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 02 source,http,jsonhandler a1.sources.r1.type = http a1.sources.r1.bind = node - 1 a1.sources.r1.port = 8888 a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler # 03 timestamp and host interceptors work before source a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = timestamp a1.sources.r1.interceptors.i1.preserveExisting = false a1.sources.r1.interceptors.i2.type = host a1.sources.r1.interceptors.i2.hostHeader = hostname a1.sources.r1.interceptors.i2.useIP = false # 04 hdfs sink a1.sinks.k1.type = logger # channel,memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # bind source,sink to channel a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
啟動flume agent
bin/flume-ng agent -c ./conf -f ./conf/http_sink_logger_source.conf -n a1 -Dflume.root.logger=INFO,console
發(fā)送請求測試:
curl -X POST -d '[{"header":{},"body":"time-host-interceptor001"}]' http://node-1:8888
可以看到終端輸出的event header中已經(jīng)有了攔截器的信息
修改sink為hdfs, 觀察HDFS的目錄名(時間戳)和文件前綴(hostnme)
目錄名被正常替換(基于event header中的時間戳)
文件前綴被正常替換(基于event header中的hostname:實際主機名)
文件內(nèi)容被寫入為event的body
本文來自:傳智播客大數(shù)據(jù)培訓(xùn)學(xué)院