跳到主要内容

Kafka

[TOC]


Kafka

By:weimenghua
Date:2022.03.30
Description:Kafka 消息队列

参考资料
Kafka 教程

一、环境搭建

  1. 下载及解压 kafka

Kafka 下载地址

https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz

tar -xzvf kafka_2.12-2.4.0.tgz
  1. 配置 kafka

kafka 的配置文件在 config/server.properties 文件中
broker.id 是 kafka broker 的编号,集群里每个 broker 的 id 需不同:
broker.id=0

listeners 是监听地址,需要提供外网服务的话,要设置本地的 IP 地址: listeners=PLAINTEXT://127.0.0.1:9092

kafka 运行日志存放的路径:
log.dirs=/tmp/kafka_2.10-0.8.2.2/logs

zookeeper 配置:
zookeeper.connect=127.0.0.1:2181,1127.0.0..2:2181,127.0.0.3:2181

  1. 启动及停止 kafka

3.1 启动 kafka bin/kafka-server-start.sh -daemon config/server.properties
-daemon 参数会将任务转入后台运行,输出日志信息将写入日志文件,日志文件在执行命令的目录下的 logs 目录中 kafkaServer.out,结尾输同 started 说明启动成功。 也可以用 jps 命令,看有没有 kafka 的进程

3.2 停止 kafka bin/kafka-server-stop.sh config/server.properties

二、Kafka Topic

查看 topic 列表  
./kafka-topics.sh --list --zookeeper 127.0.0.1:2181

查看特定主题的详细信息
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic testTopic

创建 topic
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 3 --topic topicTest

发送消息
ip 地址应用本地 ip,使用127.0.0.1会报错
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topicTest

接收消息
在2.5.0版本之前只支持 --broker-list;在2.5.0版本之后支持 --bootstrap-server,如果版本老于2.2,应该用--zookeeper;可使用127.0.0.1
./kafka-console-consumer.sh --broker-list 127.0.0.1:9092 --topic topicTest --from-beginning
./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic topicTest --from-beginning

删除 topic
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic testTopic

K8s Kafka

进入 pod
kubectl --insecure-skip-tls-verify exec -it -n <namespace> kafka-0 -- /bin/bash

获取 ClusterIP
kubectl --insecure-skip-tls-verify get services -n <namespace> |grep kafka

查看配置
find / -name server.properties
cat /opt/kafka/config/server.properties

Kafka 的命令行工具,查看 topics
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <ClusterIP>:9092 --list
kubectl --insecure-skip-tls-verify exec -it kafka-0 -n <namespace> -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server <ClusterIP>:9092 --list

查看某个 topic 信息
/opt/kafka/bin/kafka-topics.sh --bootstrap-server <ClusterIP>:9092 --describe --topic <topic_name>

查看某个 topic 内容
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server <ClusterIP>:9092 --topic <topic_name> --from-beginning

创建 topic
kubectl --insecure-skip-tls-verify exec -it kafka-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server <ClusterIP>:9092 --topic my-topic --create --partitions 3 --replication-factor 3

发送消息,在终端中逐行输入要发送的消息,每条消息按 Enter 键发送
kubectl --insecure-skip-tls-verify exec -it kafka-0 -n <namespace> -- /opt/kafka/bin/kafka-console-producer.sh --broker-list <ClusterIP>:9092 --topic my-topic