flume
主要用于数据采集 暂时只做c/s架构的配置 后续可以更改为负载均衡模式等
安装
上传apache-flume-1.7.0-bin.tar.gz 点击这里下载
解压:
tar -zxvf apache-flume-1.7.0-bin.tar.gz
创建目录:
mkdir -p /usr/local/flume
移动flume至/usr/local/flume:
mv apache-flume-1.7.0-bin /usr/local/flume
使用软连接指定java:
ln -snf /usr/local/flume/apache-flume-1.7.0-bin /usr/local/flume/flume
配置c/s架构
a.client配置示例:
vi /usr/local/flume/flume/conf/client.properties
#命名此代理上的组件
client.sources = s1
client.channels = c1
client.sinks = sk1
#配置源 使用类型为linux命令 直接读取nginx日志
client.sources.s1.type=exec
client.sources.s1.command=tail -F /data/logs/nginx/chayu.bet.access.log
#使用在内存中缓存的通道
client.channels.c1.type=memory
#存储在通道中的最大缓存量
client.channels.c1.capacity=10000
#每次最大的交易量
client.channels.c1.transactionCapacity=100
#配置接收器 使用avro协议
client.sinks.sk1.type= avro
#server地址
client.sinks.sk1.hostname=192.168.41.218
#server端口
client.sinks.sk1.port = 44444
#将源和接收器绑定通道
client.sources.s1.channels=c1
client.sinks.sk1.channel=c1
b.server配置示例:
vi/usr/local/flume/flume/conf/server.properties
#命名此代理上的组件
server.sources = s1
server.channels = c1
server.sinks = sk1
#配置源 使用avro接受数据
server.sources.s1.type = avro
#配置监听
server.sources.s1.bind = 0.0.0.0
#配置监听端口
server.sources.s1.port = 44444
#使用在内存中缓存的通道
server.channels.c1.type=memory
#存储在通道中的最大缓存量
server.channels.c1.capacity=10000
#每次最大的交易量
server.channels.c1.transactionCapacity=100
#配置接收器 传递给kafka
client.sinks.sk1.type= org.apache.flume.sink.kafka.KafkaSink
#指定kafka地址
server.sinks.sk1.brokerList=192.168.137.120:9092,192.168.137.130:9092,192.1 68.137.140:9092
#指定topic
server.sinks.sk1.topic=ran
server.sinks.sk1.serializer.class=kafka.serializer.StringEncoder
#将源和接收器绑定通道
server.sources.s1.channels=c1
server.sinks.sk1.channel=c1
最后把命令放到/etc/profie 下即可:
export PATH=/usr/local/flume/flume/bin:$PATH
使用
使用screen -S flume 命令启动一个session
cd /usr/local/flume/flume
./bin/flume-ng agent -n agent -c conf -f conf/test.properties -Dflume.root.logger=INFO,console
-c指定使用使用conf下的配置 -f 指定一个配置文件
值得注意的地方
使用内存在高并发下会丢失数据 故为了保证数据不丢失应使用file类型类似如 下配置:
server.channels.c1.type = file
server.channels.c1.checkpointDir = /data/flume/checkPoint
server.channels.c1.useDualCheckpoints = true
server.channels.c1.backupCheckpointDir = /data/flume/checkPoint2
server.channels.c1.dataDirs = /data/flume/data
server.channels.c1.transactionCapacity = 10000
server.channels.c1.checkpointInterval = 30000
server.channels.c1.maxFileSize = 4292870142
server.channels.c1.minimumRequiredSpace = 524288000
server.channels.c1.capacity = 100000