The following article is from 石臻臻的杂货铺 Author 石臻臻
CSDN博客之星Top5、Kafka Contributor、LogiKM PMC、华为云MVP。进滴滴技术交流群, 不同技术专家轮流值班, 本号分享Java/大数据/中间件等领域干货和视频
1 参数详解
这里配置的监听器底层调用的是:
ServerSocketAdaptor.bind(SocketAddress local)那么这个说明什么意思呢?
说明你配置的监听器将被用于监听网络请求。简单理解就是你建立监听一个通道,别人能够通过这个通道跟你沟通。所以我们需要设置 IP : Port。
这个属性的格式为:
listeners = listener_name://host_name:port,listener_name2://host_nam2e:port2java.net.InetAddress.getCanonicalHostName()# 这个PLAINTEXT是监听名称,刚好他对应的安全协议就是 PLAINTEXT# 当然这个是可以自定义的, 详细情况 后面的配置listener.security.protocol.maplisteners = PLAINTEXT://your.host.name:9092
啥叫发布公开的监听器?
就是让 Brokers 和 Clients 们都能够知道的监听器。
你想想看,listeners 是 Broker 用来监听网络请求的。那么其他 Broker 或者客户端想要与它通信,则需要知道具体的 IP:PORT 对吧?
所以,为了让别人知道自己的监听器,那么就需要公开出去。当然这个公开的形式是通过 ZooKeeper 来共享数据。
看看 Broker 到 ZooKeeper 节点 /brokers/{brokerid}/ 下面的信息示例:
{"features": {},"listener_security_protocol_map": {"PLAINTEXT": "PLAINTEXT"},"endpoints": ["PLAINTEXT://localhost:9092"],"jmx_port": -1,"port": 9092,"host": "localhost","version": 5,"timestamp": "1647337490945"}
其中 endpoints 就是我们发布出去的监听器。这个属性的格式为:
advertised.listeners = listener_name://host_name:port,listener_name2://host_nam2e:port2listeners=PLAINTEXT://localhost:9092看看上面的配置,PLANINTEXT 是监听器名称。那么它对应的安全协议是什么呢?
它对应的安全协议是 PLANINTEXT。
为什么呢?
那是因为默认情况下已经有了他们的映射关系。
默认集合
PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL属性格式
监听名称1:安全协议1,监听名称2:安全协议2现有的安全协议有下面四种,其默认监听器名称映射对应的安全协议情况如下:
当然,也可以自己重新映射监听器名称和安全协议,比如示例 4。
用于 Broker 之间通信的 listener 的名称。如果未设置该参数,则 listener 名称由 security.inter.broker.protocol 定义。security.inter.broker.protocol 默认值是PLAINTEXT。
注意:同时设置这个和 security.inter.broker.protocol 属性是错误的。
默认值:空。不可动态配置。
特别注意:这个属性表示 Broker 之间的网络通信使用的监听器,比如 Broker2Broker。但是,还有一种 Controller2Broker。如果没有配置control.plane.listener.name,那么走的也是 inter.broker.listener.name 监听器。
由于是根据本地配置的监听器名称查找其他 Broker 监听器的 EndPoint,所以一般所有 Broker 的监听器名称都必须一致。否则就找不到具体的 EndPoint,无法正确的发起请求。
用于在代理之间进行通信的安全协议。有效值为 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。
注意:同时设置该属性和 inter.broker.listener.name 属性是错误的。
默认值:PLAINTEXT (纯文本)
注意:这个跟 inter.broker.listener.name 是有区别的。这个配置只有四个选项,是安全协议。而 inter.broker.listener.name 是监听名称。需要通过这个监听名称去找到它映射的安全协议以及 IP:PORT。
如果 inter.broker.listener.name 没有配置,则默认使用 security.inter.broker.protocol 的配置。对 inter.broker.listener.name 而言,最终还是要去找到对应的 IP:PORT。
一般自定义了监听器名称,inter.broker.listener.name 就是必须要设置的,不能使用security.inter.broker.protocol 来代替。
用于 Controller 和 Broker 之间通信的监听器名称,Broker 将会使用 control.plane.listener.name 来定位监听器列表中的 EndPoint。
如果未设置,则默认使用 inter.broker.listener.name 来通信,没有专门的链接。
让 Broker 监听 localhost:9092,然后客户端访问 Broker 的具体 IP。
listeners=PLAINTEXT://localhost:9092启动之后查看一下监听情况:
Linux 命令:
netstat -anp |grep 9092Mac 环境命令:
netstat -AaLlnW当然,如果你这台机器刚好还是 Controller 的话,除了了 LISTEN 还能看到 ESTABLISHED 状态的连接。因为 Controller 也会给这台 Broker 建立连接发起请求的,比如通知 Broker 更新元信息之类的。
我们使用生产者客户端来生产几条消息。
sh bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic Topic4## 或者sh bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic Topic4
可以发现正常发送消息。
那么接下来,使用使用具体 IP 发起请求。
sh bin/kafka-console-producer.sh --bootstrap-server 10.xxx.xx.128:9092 --topic Topic4[2022-03-16 12:59:07,024] WARN [Controller id=1000, targetBrokerId=1000] Connection to node 1000(/10.xxx.xxx.xx:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
可以看到,客户端提示说不能跟这个 IP:Port 建立连接。
listeners=PLAINTEXT://xx.xx.xxx.01:9092advertised.listeners=PLAINTEXT:/xx.xx.xxx.02:9092
假设本地监听和发布的监听不一样,那么就会造成其他 Broker 和客户端跟这台 Broker 不能正确的建立链接。
如果当前这台 Broker 刚好还是 Controller,那么他也会对自己建立连接,都是根据 advertised.listeners 的配置来建立的,同样会失败。其他 Broker 也一样。
[2022-03-16 12:59:07,024] WARN [Controller id=1000, targetBrokerId=1000] Connection to node 1000(/10.xxx.xxx.xx:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
在示例 2 中,我们指定 listeners 监听器和 advertised.listeners 发布的监听器不一致会导致异常。那么,我们只需要将监听器的和发布的监听器一致就行了。
当然,我们还可直接设置监听器监听任意可用 IP(该 Broker 上的可用 IP)。
listeners=PLAINTEXT://0.0.0.0:9092当然,如果只是将 host 设置为 0.0.0.0 那么会报错。
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1789)
因为默认情况下不会设置 advertised.listeners,这时会使用 listeners 的属性,然而 advertised.listeners 是不支持 0.0.0.0 的。所以需要指定暴露的监听器,如下:
listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://xx.xx.xx.128:9092
这样子配置就不会报错了,其他 Broker 和客户端会通过 advertised.listeners 发布的监听器来跟该 Broker 建立链接。
注意:这个时候你还可以在这台 Broker 的机器上使用 localhost 来进行访问。比如:
sh bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic Topic4可以看到也是可以正常发送消息的。
listeners = INSIDE://内网IP:9091,OUTSIDE://外网IP:9092#把OUTSIDE 的安全协议映射成PLAINTEXT INSIDE也映射成PLAINTEXTlistener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT# Broker之间的连接用 INSIDE 监听器inter.broker.listener.name=INSIDE
设置了两个监听器:INSIDE 监听内网 IP,OUTSIDE 监听外网 IP。
因为这个是我们自己定义的监听名称,listener.security.protocol.map默认映射中并没有对应的映射关系 所以我们就需要主动设置这个映射关系 listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:SSL
不然会抛异常。
Caused by: java.lang.IllegalArgumentException: No security protocol defined for listener INSIDEat kafka.cluster.EndPoint$.$anonfun$createEndPoint$2(EndPoint.scala:48)
注意:自定义了监听器,则必须要配置 inter.broker.listener.name
确定好内部的 Broker 之间通信的监听器,并确保能够正常访问。这样 Broker 直接就会通过内网互相连接,客户端除了可以通过内网连接(如果在内网环境的话)也可以通过外网连接。
这种场景一般是自己开发测试的时候。比如自己搭建一个集群学习学习,但是又没有那么多机器,那么就可以在一台电脑上部署多个 Broker。
只配置 listeners 属性
listeners = 监听名称://your.host.name:port关于监听名称,默认的映射关系有四种:
简单一点,用 PLAINTEXT 就够了。这里我们可以把 host 给去掉,或者使用 localhost。
listeners = PLAINTEXT://:port或者
listeners = PLAINTEXT://localhost:port如果没有配置 host 会调用 java.net.InetAddress.getCanonicalHostName() 获取本机 host。默认情况下就是 localhost。
这里之所以建议不填写具体的 host,是因为一般自己搭建练习的时候可能网络 IP 会经常变动(例如家里和公司的网络)。如果绑定了具体 IP,每次重启都要更换配置就很麻烦。
可以看看 Broker 启动后注册到 ZooKeeper 中的配置如下:
{"features": {},"listener_security_protocol_map": {"PLAINTEXT": "PLAINTEXT"},"endpoints": ["PLAINTEXT://localhost:9092"],"jmx_port": -1,"port": 9092,"host": "localhost","version": 5,"timestamp": "1647337490945"}
这个 endpoints 就是 Broker 注册到 ZooKeeper 的访问地址。如果其他 Broker 或者客户端要跟这台 Broker 发生网络请求话,拿的就是这里面的值。
所以你想想看,如果是不同机器上配置的 host 是 localhost 是不是就访问不了了?
当然,listeners 属性的 host,也可以在 hosts 文件里面配置别的域名。配置域名指向的具体 IP,这样的话那还能奏效。只是每个 Broker 和客户端都要配置 host 比较麻烦,还不如直接配置 IP。
这种是绝大部分的场景。一般公司部署集群都是在公司内网环境下 Broker 之间和 Broker 与客户端之间都在同一个网络环境,并且安全协议都是直接 PLAINTEXT(明文)或者其他协议。
listeners=PLAINTEXT://ip:port2.3 内网和外网分流
listeners=INTERNAL://内网ip:port1,EXTERNAL://外网ip:port2#把OUTSIDE 的安全协议映射成PLAINTEXT INSIDE也映射成PLAINTEXTlistener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT# Broker之间的连接用 INSIDE 监听器inter.broker.listener.name=INTERNAL
配置了两个监听器。每个 Brokerinter.broker.listener.name=INTERNAL 使用内网交流。
其他的客户端,例如 Producer 和 Consumer 请求的时候直接访问外网 IP。
2.4 内网和外网和 Controller 分流
listeners=INTERNAL://内网ip:port1,EXTERNAL://外网ip:port2,CONTROLLER://内网ip:port3,#把OUTSIDE 的安全协议映射成PLAINTEXT INSIDE也映射成PLAINTEXTlistener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT# Broker之间的连接用 INSIDE 监听器inter.broker.listener.name=INTERNALcontrol.plane.listener.name=CONTROLLER
这样配置:
他们都有独立的网络通信线程。
- EOF -
看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
点赞和在看就是最大的支持❤️