# my-own-kafka **Repository Path**: Jyokiyi/my-own-kafka ## Basic Information - **Project Name**: my-own-kafka - **Description**: spring kafka消息队列的简单使用 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2018-03-05 - **Last Updated**: 2024-07-16 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # spring-kafka * 本项目使用maven构建 * 版本:kafka_2.11-1.0.0.tgz,springboot2.0,spring-kafka-2.1.4.RELEASE,kafka-clients-1.0.0.jar。 * Linux系统centos7 ## kafka的启动和其他操作 进入kafka的安装目录下运行下面的命令 * 在启动kafka之前,编辑一下config/server.properties文件,使用如下命令: vi config/server.properties 在文件中追加host.name=192.168.198.130(此为Linux服务器的ip,运行时请改成自己的服务器IP地址) * 启动zookeeper服务: nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper.ry.log & * 启动kafka服务: nohup bin/kafka-server-start.sh config/server.properties > server.ry.log & * 当使用java连接kafka时,应该开放kafka所使用的端口--9092,9032,9094,可以在Windows下使用如下命令来检测服务器是否开放了9092端口: telnet 192.168.198.130 9092 (若服务器端口未对外开放,则会出现如下警告和错误 :[Consumer clientId=consumer-2, groupId=myGroup] Connection to node -1 could not be established. Broker may not be available.org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.) * 查看所有的topic: bin/kafka-topics.sh --list --zookeeper localhost:2181 * 查看指定的topic的描述(此处查看名为"test"的主题): bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test * 使用单个分区并且只有一个副本,创建一个名为:"test"的主题: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test * kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到kafka集群。默认情况下,每一行将作为一个单独的消息发送。 运行生产者,然后在控制台中输入一些消息,发送给服务器: bin/kafka-console-producer.sh --broker-list 192.168.198.130:9092 --topic test * kafka还有一个命令行用户,它会将消息转储到标准输出。 bin/kafka-console-consumer.sh --bootstrap-server 192.168.198.130:9092 --topic test --from-beginning * 集群---对于单个broker就是一个大小为1的集群,所以除了启动更多的broker实例之外,没有什么变化。将集群扩展为3个节点(仍在本地机器上)。 首先我们要为每个broker节点创建一个配置文件: cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties 编辑配置文件: config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2 在集群中各个节点拥有唯一的broker.id属性和不变的名字。我们必须重写port端口和日志目录,因为我们将所有这些都运行在同一台机器上。下面运行新增加的节点: nohup bin/kafka-server-start.sh config/server-1.properties >server1.ry.log & nohup bin/kafka-server-start.sh config/server-2.properties >server2.ry.log & 创建一个新的主题my-replicated-topic拥有三个复制因子: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic 查看新主题my-replicated-topicbin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic ![my-replicated-topic](mul-broker.png) 第一行给出了所有分区的摘要,每一个额外的行提供了关于一个分区的信息。因为这个主题只有一个分区,所以只有一行。 "leader"节点负责给定分区的所有读写,每个节点将成为分区随机选择部分的领导者。 "replicas"是所有的节点,为区分复写日志,不管他们是leader或即使他们当前是活跃的。 "isr" 用于设置"同步"replicas. 是replicas列表的子设定,该副本列表当前是活跃的,并被leader所捕获。 ## 类的说明 * com.my.kafka.serer.DataProducer生产者类,此处用了一个定时器做消息的发送处理,每5秒会发送一次消息。 * com.my.kafka.serer.DataConsumer消费者,用来处理生产者生产的消息