定风波

莫听穿林打叶声,何妨吟啸且徐行。竹杖芒鞋轻胜马,谁怕?一蓑烟雨任平生。

flume 1.9 的使用配置

flume 官网 Welcome to Apache Flume — Apache Flume
flume 的使用文档
Flume 1.9.0 User Guide — Apache Flume

flume 可以把一些文本输入到另外一些文本,并且可以对文本进行一些定制,添加字段,删除字段等。目前有很多是用来做日志收集 ,我这里是收集应用打出的日志(文件),写入到kafka ,在由logstash 读取kafka 中的日志流,写入到es中,最后由kibana 做日志的展示以及查询工作(elk)。
当然,此处的写入到kafka 中的日志是可以通过ksql 进行一些过滤,在进行一些操作,比如上报,日志分析等。

flume 的配置文件可以由三部分组成,输入source ,输出 skins ,缓冲 channel ,source 负责采集,skins 负责输出,channel 作为缓冲,可以调节输入和输出的速度。
flume 提供了很多的组件,也可以进行自定义组件以及一些拦截器。

这里,我们使用最新的flume1.9 (19年才发布release),使用这个版本是因为这个版本的kafka-client 进行了升级,这个版本的kafka可以设置kafka 数据的key 。

另外,flume 的taildir 在读取文件后,不会进行删除操作,我对原来的taildir 进行了拓展,添加了定期删除的任务。(这个很简单,如果有需要可以留言)

flume

下载好flume 后,定义好flume.conf 文件后,通过以下命令进行启动。

 ./flume-ng agent -c ../conf  -f flume.conf  -n a1 -Dflume.root.logger=INFO,console

-c 制定配置文件目录,使用默认目录即可
-f 制定flume 的配置文件,指定agetn 输入(source),输出(skins),以及缓冲(channel)
-n 指定要启动的agent ,agent 为自定义的配置文件中存在的
-Dflume.root.logger=INFO,console 指定日志的级别以及输出的地方(使用docker 启动的时候,必须要让日志输出的console ,因为docker 判断进程有没有运行就是在看有没有日志)

本文中的kafka 为本机跑的docker,怎么用docker 跑kafka 可以参考另外一篇文章
docker环境下Kafka1.1.0 搭建

文章中的配置为测试配置,生产环境请根据硬件设置及kafka相关进行修改 ,channel的size 等。

有一点需要注意的是,taildir 默认只能匹配文件,不能够匹配路径,并且在进行正则表达式匹配文件时,需要以“ . ” 开始进行匹配,如下文具体配置实例

flume 单topic 发送配置文件

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/zrc/soft/source/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/zrc/soft/source/.*-a.*.log

# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic-a
a1.sinks.k1.kafka.bootstrap.servers = 192.168.5.16:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 500
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume 多source 多topic 发送

需要定义多个输入,多个输出,多个channel ,避免数据的重复发送
这个用途在与,我们的日志是根据类型输入到不同的文件中的,比如业务日志,远程调用日志等,会写入到不同的文件,并且我们需要把这个日志写入到不同的kafka topic ,这样才方便后面的工作,所以才有这个配置

a1.sources = r1 r2 
a1.sinks = k1 k2 
a1.channels = c1 c2 

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/zrc/soft/source/taildir_position-a.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/zrc/soft/source/.*-a.*.log

a1.sources.r2.type = TAILDIR
a1.sources.r2.channels = c1
a1.sources.r2.positionFile = /home/zrc/soft/source/taildir_position-b.json
a1.sources.r2.filegroups = f1
a1.sources.r2.filegroups.f1 = /home/zrc/soft/source/.*-b.*.log


# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic-a
a1.sinks.k1.kafka.bootstrap.servers = 192.168.5.16:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

a1.sinks.k2.channel = c2
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic-b
a1.sinks.k2.kafka.bootstrap.servers = 192.168.5.16:9092
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
a1.sinks.k2.kafka.producer.compression.type = snappy

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 500
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 500
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2

新版本的配置

flume 1.9 中可以在source 中指定发送的topic ,只需要在source 中指定header 为topicHeader 即可,配置文件如下
配置多个taildir ,每个对应一个单独的header

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = cn.migu.music.MiguTaildirSource
#a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/zrc/soft/source/taildir_position.json
a1.sources.r1.sourceAliveDays = 2
a1.sources.r1.deleteSource = true
#a1.sources.r1.filegroups = f2
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/zrc/soft/source/.*-api.*.log
a1.sources.r1.headers.f1.topicHeader = topic-api
a1.sources.r1.headers.f1.topic = topic-default
a1.sources.r1.filegroups.f2 = /home/zrc/soft/source/.*-(?!(api)).*.log
a1.sources.r1.headers.f2.topicHeader = topic-default
a1.sources.r1.headers.f2.topic = topic-api

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic-api
a1.sinks.k1.kafka.bootstrap.servers = 172.17.0.1:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sinks.k1.kafka.flumeBatchSize = 500
a1.sinks.k1.kafka.producer.max.request.size = 1000

a1.channels.c1.type = memory
a1.channels.c1.capacity = 150
a1.channels.c1.transactionCapacity = 150
点赞

发表评论

电子邮件地址不会被公开。