kafka

消息队列

安装

上传kafka_2.11-0.10.2.0.tgz 点击这里下载

(kafka 使用3台机器做集群 主要是配置的区别)

解压:

    tar -zxvf kafka_2.11-0.10.2.0.tgz

创建目录:

    mkdir -p /usr/local/kafka

移动jdk至/usr/local/kafka:

    mv kafka_2.11-0.10.2.0  /usr/local/kafka

使用软连接指定kafka:

    ln -snf /usr/local/kafka/kafka_2.11-0.10.2.0  /usr/local/kafka/kafka

配置

(应使用3台做集群 不同的机器需要不同的配置会指出)

vi /usr/local/kafka/kafka/config/server.properties

#每一个broker在集群中的唯一标识 做集群每一个的都需要不一样
broker.id=120
#配置监听主要为生产者地址
listeners=PLAINTEXT://192.168.137.130:9092
#网络处理线程数 默认即可
num.network.threads=3
#I/O处理线程数默认即可
num.io.threads=8
#发送缓存区buffer大小 数据不是已下载发送是先缓冲到指定值之后发送
socket.send.buffer.bytes=102400
#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
#这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能  超过java的堆栈大小
socket.request.max.bytes=104857600
#存放消息的目录 可为多个 以,分割
log.dirs=/data/kafka
#默认的分区数 一个topic默认1个分区数
num.partitions=1
#每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=1
#默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
#这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,    kafka会新起一个文件
log.segment.bytes=1073741824
 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.retention.check.interval.ms=300000
#配置zookeeper管理地址
zookeeper.connect=192.168.137.120:2181,192.168.137.130:2181,192.168.137.140:2181
#配置zookeeper连接超时时间
zookeeper.connection.timeout.ms=6000

简而言之只需要broker.id配置不相同即可

最后把命令放到/etc/profie 下即可:

export PATH=/usr/local/kafka/kafka/bin:$PATH

使用

启动:

kafka-server-start.sh -daemon server.properties

关闭:

kafka-server-stop.sh

查看topic:

kafka-topics.sh --list --zookeeper localhost:2181

查看topic状态:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic test