Kafka
[TOC]
Kafka
By:weimenghua
Date:2022.03.30
Description:Kafka 消息队列
参考资料
Kafka 教程
一、环境搭建
- 下载及解压 kafka
https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
tar -xzvf kafka_2.12-2.4.0.tgz
- 配置 kafka
kafka 的配置文件在 config/server.properties 文件中
broker.id 是 kafka broker 的编号,集群里每个 broker 的 id 需不同:
broker.id=0
listeners 是监听地址,需要提供外网服务的话,要设置本地的 IP 地址: listeners=PLAINTEXT://127.0.0.1:9092
kafka 运行日志存放的路径:
log.dirs=/tmp/kafka_2.10-0.8.2.2/logs
zookeeper 配置:
zookeeper.connect=127.0.0.1:2181,1127.0.0..2:2181,127.0.0.3:2181
- 启动及停止 kafka
3.1 启动 kafka
bin/kafka-server-start.sh -daemon config/server.properties
-daemon 参数会将任务转入后台运行,输出日志信息将写入日志文件,日志文件在执行命令的目录下的 logs 目录中 kafkaServer.out,结尾输同 started 说明启动成功。
也可以用 jps 命令,看有没有 kafka 的进程
3.2 停止 kafka
bin/kafka-server-stop.sh config/server.properties
二、Kafka Topic
查看 topic 列表
./kafka-topics.sh --list --zookeeper 127.0.0.1:2181
查看特定主题的详细信息
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic testTopic
创建 topic
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 3 --topic topicTest
发送消息
ip 地址应用本地 ip,使用127.0.0.1会报错
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topicTest
接收消息
在2.5.0版本之前只支持 --broker-list;在2.5.0版本之后支持 --bootstrap-server,如果版本老于2.2,应该用--zookeeper;可使用127.0.0.1
./kafka-console-consumer.sh --broker-list 127.0.0.1:9092 --topic topicTest --from-beginning
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic topicTest --from-beginning
删除 topic
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic testTopic
K8s Kafka
进入 pod
kubectl --insecure-skip-tls-verify exec -it -n <namespace> kafka-0 -- /bin/bash
获取 ClusterIP
kubectl --insecure-skip-tls-verify get services -n <namespace> |grep kafka
查看配置
find / -name server.properties
cat /opt/kafka/config/server.properties
Kafka 的命令行工具,查看 topics
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <ClusterIP>:9092 --list
kubectl --insecure-skip-tls-verify exec -it kafka-0 -n <namespace> -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server <ClusterIP>:9092 --list
查看某个 topic 信息
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <ClusterIP>:9092 --describe --topic <topic_name>
查看某个 topic 内容
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <ClusterIP>:9092 --topic <topic_name> --from-beginning
创建 topic
kubectl --insecure-skip-tls-verify exec -it kafka-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server <ClusterIP>:9092 --topic my-topic --create --partitions 3 --replication-factor 3
发送消息,在终端中逐行输入要发送的消息,每条消息按 Enter 键发送
kubectl --insecure-skip-tls-verify exec -it kafka-0 -n <namespace> -- /opt/kafka/bin/kafka-console-producer.sh --broker-list <ClusterIP>:9092 --topic my-topic