# my-own-kafka
**Repository Path**: Jyokiyi/my-own-kafka
## Basic Information
- **Project Name**: my-own-kafka
- **Description**: spring kafka消息队列的简单使用
- **Primary Language**: Java
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2018-03-05
- **Last Updated**: 2024-07-16
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# spring-kafka
* 本项目使用maven构建
* 版本:kafka_2.11-1.0.0.tgz,springboot2.0,spring-kafka-2.1.4.RELEASE,kafka-clients-1.0.0.jar。
* Linux系统centos7
## kafka的启动和其他操作
进入kafka的安装目录下运行下面的命令
* 在启动kafka之前,编辑一下config/server.properties文件,使用如下命令:
vi config/server.properties
在文件中追加host.name=192.168.198.130(此为Linux服务器的ip,运行时请改成自己的服务器IP地址)
* 启动zookeeper服务:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.ry.log &
* 启动kafka服务:
nohup bin/kafka-server-start.sh config/server.properties > server.ry.log &
* 当使用java连接kafka时,应该开放kafka所使用的端口--9092,9032,9094,可以在Windows下使用如下命令来检测服务器是否开放了9092端口:
telnet 192.168.198.130 9092
(若服务器端口未对外开放,则会出现如下警告和错误 :[Consumer clientId=consumer-2, groupId=myGroup] Connection to node -1 could not be established. Broker may not be available.org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.)
* 查看所有的topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
* 查看指定的topic的描述(此处查看名为"test"的主题):
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
* 使用单个分区并且只有一个副本,创建一个名为:"test"的主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
* kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到kafka集群。默认情况下,每一行将作为一个单独的消息发送。
运行生产者,然后在控制台中输入一些消息,发送给服务器:
bin/kafka-console-producer.sh --broker-list 192.168.198.130:9092 --topic test
* kafka还有一个命令行用户,它会将消息转储到标准输出。
bin/kafka-console-consumer.sh --bootstrap-server 192.168.198.130:9092 --topic test --from-beginning
* 集群---对于单个broker就是一个大小为1的集群,所以除了启动更多的broker实例之外,没有什么变化。将集群扩展为3个节点(仍在本地机器上)。
首先我们要为每个broker节点创建一个配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
编辑配置文件:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
在集群中各个节点拥有唯一的broker.id
属性和不变的名字。我们必须重写port端口和日志目录,因为我们将所有这些都运行在同一台机器上。下面运行新增加的节点:
nohup bin/kafka-server-start.sh config/server-1.properties >server1.ry.log &
nohup bin/kafka-server-start.sh config/server-2.properties >server2.ry.log &
创建一个新的主题my-replicated-topic
拥有三个复制因子:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看新主题my-replicated-topic
:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

第一行给出了所有分区的摘要,每一个额外的行提供了关于一个分区的信息。因为这个主题只有一个分区,所以只有一行。
"leader"节点负责给定分区的所有读写,每个节点将成为分区随机选择部分的领导者。
"replicas"是所有的节点,为区分复写日志,不管他们是leader或即使他们当前是活跃的。
"isr" 用于设置"同步"replicas. 是replicas列表的子设定,该副本列表当前是活跃的,并被leader所捕获。
## 类的说明
* com.my.kafka.serer.DataProducer生产者类,此处用了一个定时器做消息的发送处理,每5秒会发送一次消息。
* com.my.kafka.serer.DataConsumer消费者,用来处理生产者生产的消息