Kafka 如何设置内外网和 Controller 都进行分流
The following article is from 石臻臻的杂货铺 Author 石臻臻
1 参数详解
1.1 侦听器列表 listeners
这里配置的监听器底层调用的是:
ServerSocketAdaptor.bind(SocketAddress local)那么这个说明什么意思呢?
说明你配置的监听器将被用于监听网络请求。简单理解就是你建立监听一个通道,别人能够通过这个通道跟你沟通。所以我们需要设置 IP : Port。
这个属性的格式为:
listeners = listener_name://host_name:port,listener_name2://host_nam2e:port22) 监听器的名称和端口必须是唯一的,端口相同就冲突了;
3) host_name 如果为空,例如 listeners = ://host_name:port,则会绑定到默认的接口(网卡)。一般情况下是 localhost,底层调用的是:
java.net.InetAddress.getCanonicalHostName()4) 将 host_name 设置为 0.0.0.0 则会绑定所有的网卡。也就是说不管从哪个网卡进入的请求都会被接受处理。
请注意:假如你设置的是 0.0.0.0,那么 advertised.listeners 必须要设置。因为 advertised.listeners 默认情况下使用的是 listeners 的配置发布到 ZooKeeper 中。发布到 ZooKeeper 中是给其他 Brokers/Clients 来跟你通信的。如果设置 0.0.0.0,谁知道要请求哪个 IP 呢?所以它必须要指定并明确 IP:PORT。具体详情请看下面示例3。
5) listener_name 是监听名,唯一值。它并不是安全协议(大部分人都会搞错)。因为默认的 4 个安全协议已经做好了映射,例如 PLAINTEXT ==> PLAINTEXT。
所以会经常看到下面的配置:
## 这个PLAINTEXT是监听名称,刚好他对应的安全协议就是 PLAINTEXT## 当然这个是可以自定义的, 详细情况 后面的配置listener.security.protocol.maplisteners = PLAINTEXT://your.host.name:90921.2 发布公开的监听器 advertised.listeners
啥叫发布公开的监听器?
就是让 Brokers 和 Clients 们都能够知道的监听器。
你想想看,listeners 是 Broker 用来监听网络请求的。那么其他 Broker 或者客户端想要与它通信,则需要知道具体的 IP:PORT 对吧?
所以,为了让别人知道自己的监听器,那么就需要公开出去。当然这个公开的形式是通过 ZooKeeper 来共享数据。
看看 Broker 到 ZooKeeper 节点 /brokers/{brokerid}/ 下面的信息示例:
{ "features": {}, "listener_security_protocol_map": { "PLAINTEXT": "PLAINTEXT" }, "endpoints": [ "PLAINTEXT://localhost:9092" ], "jmx_port": -1, "port": 9092, "host": "localhost", "version": 5, "timestamp": "1647337490945"}
其中 endpoints 就是我们发布出去的监听器。这个属性的格式为:
advertised.listeners = listener_name://host_name:port,listener_name2://host_nam2e:port2默认情况下 advertised.listeners 不设置会自动使用 listeners 属性; advertised.listeners 不支持 0.0.0.0 这种形式,所以如果 listeners 属性设置成 0.0.0.0,则必须设置 advertised.listeners 属性,具体请看示例 3。由于 0.0.0.0 是表示的是监听 Broker 上任意网卡,将这个发布出去后别的 Broker 和客户端怎么知道具体的 IP 和端口呢? 可以同时配置多个, 并且用逗号隔开; 可动态配置该属性。
1.3 监听器名称和安全协议之间的映射关系集合 listener.security.protocol.map
listeners=PLAINTEXT://localhost:9092看看上面的配置,PLANINTEXT 是监听器名称。那么它对应的安全协议是什么呢?
它对应的安全协议是 PLANINTEXT。
为什么呢?
那是因为默认情况下已经有了他们的映射关系。
默认集合
PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL属性格式
监听名称1:安全协议1,监听名称2:安全协议2现有的安全协议有下面四种,其默认监听器名称映射对应的安全协议情况如下:
PLAINTEXT => PLAINTEXT:不需要授权,非加密通道; SSL => SSL:使用 SSL 加密通道; SASL_PLAINTEXT => SASL_PLAINTEXT:使用 SASL 认证非加密通道; SASL_SSL => SASL_SSL:使用 SASL 认证并且 SSL 加密通道。
当然,也可以自己重新映射监听器名称和安全协议,比如示例 4。
1.4 inter.broker.listener.name
用于 Broker 之间通信的 listener 的名称。如果未设置该参数,则 listener 名称由 security.inter.broker.protocol 定义。security.inter.broker.protocol 默认值是PLAINTEXT。
注意:同时设置这个和 security.inter.broker.protocol 属性是错误的。
默认值:空。不可动态配置。
特别注意:这个属性表示 Broker 之间的网络通信使用的监听器,比如 Broker2Broker。但是,还有一种 Controller2Broker。如果没有配置control.plane.listener.name,那么走的也是 inter.broker.listener.name 监听器。
由于是根据本地配置的监听器名称查找其他 Broker 监听器的 EndPoint,所以一般所有 Broker 的监听器名称都必须一致。否则就找不到具体的 EndPoint,无法正确的发起请求。
1.5 security.inter.broker.protocol
用于在代理之间进行通信的安全协议。有效值为 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。
注意:同时设置该属性和 inter.broker.listener.name 属性是错误的。
默认值:PLAINTEXT (纯文本)
注意:这个跟 inter.broker.listener.name 是有区别的。这个配置只有四个选项,是安全协议。而 inter.broker.listener.name 是监听名称。需要通过这个监听名称去找到它映射的安全协议以及 IP:PORT。
如果 inter.broker.listener.name 没有配置,则默认使用 security.inter.broker.protocol 的配置。对 inter.broker.listener.name 而言,最终还是要去找到对应的 IP:PORT。
一般自定义了监听器名称,inter.broker.listener.name 就是必须要设置的,不能使用security.inter.broker.protocol 来代替。
1.6 control.plane.listener.name
用于 Controller 和 Broker 之间通信的监听器名称,Broker 将会使用 control.plane.listener.name 来定位监听器列表中的 EndPoint。
如果未设置,则默认使用 inter.broker.listener.name 来通信,没有专门的链接。
1.7 示例说明
示例 1:绑定一个 IP,客户端使用另外的 IP 访问
让 Broker 监听 localhost:9092,然后客户端访问 Broker 的具体 IP。
listeners=PLAINTEXT://localhost:9092启动之后查看一下监听情况:
Linux 命令:
netstat -anp |grep 9092Mac 环境命令:
netstat -AaLlnW当然,如果你这台机器刚好还是 Controller 的话,除了了 LISTEN 还能看到 ESTABLISHED 状态的连接。因为 Controller 也会给这台 Broker 建立连接发起请求的,比如通知 Broker 更新元信息之类的。
我们使用生产者客户端来生产几条消息。
sh bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic Topic4## 或者sh bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic Topic4可以发现正常发送消息。
那么接下来,使用使用具体 IP 发起请求。
sh bin/kafka-console-producer.sh --bootstrap-server 10.xxx.xx.128:9092 --topic Topic4[2022-03-16 12:59:07,024] WARN [Controller id=1000, targetBrokerId=1000] Connection to node 1000 (/10.xxx.xxx.xx:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)可以看到,客户端提示说不能跟这个 IP:Port 建立连接。
示例 2:listeners 和 advertised.listeners 配置的 IP 不一样
listeners=PLAINTEXT://xx.xx.xxx.01:9092advertised.listeners=PLAINTEXT:/xx.xx.xxx.02:9092假设本地监听和发布的监听不一样,那么就会造成其他 Broker 和客户端跟这台 Broker 不能正确的建立链接。
如果当前这台 Broker 刚好还是 Controller,那么他也会对自己建立连接,都是根据 advertised.listeners 的配置来建立的,同样会失败。其他 Broker 也一样。
[2022-03-16 12:59:07,024] WARN [Controller id=1000, targetBrokerId=1000] Connection to node 1000 (/10.xxx.xxx.xx:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)示例 3:listeners 监听任意可用 IP,advertised.listeners 发布指定 IP
在示例 2 中,我们指定 listeners 监听器和 advertised.listeners 发布的监听器不一致会导致异常。那么,我们只需要将监听器的和发布的监听器一致就行了。
当然,我们还可直接设置监听器监听任意可用 IP(该 Broker 上的可用 IP)。
listeners=PLAINTEXT://0.0.0.0:9092当然,如果只是将 host 设置为 0.0.0.0 那么会报错。
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address. at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1789)因为默认情况下不会设置 advertised.listeners,这时会使用 listeners 的属性,然而 advertised.listeners 是不支持 0.0.0.0 的。所以需要指定暴露的监听器,如下:
listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://xx.xx.xx.128:9092这样子配置就不会报错了,其他 Broker 和客户端会通过 advertised.listeners 发布的监听器来跟该 Broker 建立链接。
注意:这个时候你还可以在这台 Broker 的机器上使用 localhost 来进行访问。比如:
sh bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic Topic4可以看到也是可以正常发送消息的。
示例 4:listeners 配置多个监听器,内外网分流
listeners = INSIDE://内网IP:9091,OUTSIDE://外网IP:9092
#把OUTSIDE 的安全协议映射成PLAINTEXT INSIDE也映射成PLAINTEXTlistener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
# Broker之间的连接用 INSIDE 监听器inter.broker.listener.name=INSIDE设置了两个监听器:INSIDE 监听内网 IP,OUTSIDE 监听外网 IP。
因为这个是我们自己定义的监听名称,listener.security.protocol.map默认映射中并没有对应的映射关系 所以我们就需要主动设置这个映射关系 listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:SSL
不然会抛异常。
Caused by: java.lang.IllegalArgumentException: No security protocol defined for listener INSIDE at kafka.cluster.EndPoint$.$anonfun$createEndPoint$2(EndPoint.scala:48)注意:自定义了监听器,则必须要配置 inter.broker.listener.name
确定好内部的 Broker 之间通信的监听器,并确保能够正常访问。这样 Broker 直接就会通过内网互相连接,客户端除了可以通过内网连接(如果在内网环境的话)也可以通过外网连接。
2 几种场景的配置方式
2.1 一台机器部署一套集群
这种场景一般是自己开发测试的时候。比如自己搭建一个集群学习学习,但是又没有那么多机器,那么就可以在一台电脑上部署多个 Broker。
只配置 listeners 属性
listeners = 监听名称://your.host.name:port关于监听名称,默认的映射关系有四种:
PLAINTEXT => PLAINTEXT:不需要授权,非加密通道; SSL => SSL:使用 SSL 加密通道; SASL_PLAINTEXT => SASL_PLAINTEXT:使用 SASL 认证非加密通道; SASL_SSL => SASL_SSL:使用 SASL 认证并且 SSL 加密通道。
简单一点,用 PLAINTEXT 就够了。这里我们可以把 host 给去掉,或者使用 localhost。
listeners = PLAINTEXT://:port或者
listeners = PLAINTEXT://localhost:port如果没有配置 host 会调用 java.net.InetAddress.getCanonicalHostName() 获取本机 host。默认情况下就是 localhost。
这里之所以建议不填写具体的 host,是因为一般自己搭建练习的时候可能网络 IP 会经常变动(例如家里和公司的网络)。如果绑定了具体 IP,每次重启都要更换配置就很麻烦。
可以看看 Broker 启动后注册到 ZooKeeper 中的配置如下:
{ "features": {}, "listener_security_protocol_map": { "PLAINTEXT": "PLAINTEXT" }, "endpoints": [ "PLAINTEXT://localhost:9092" ], "jmx_port": -1, "port": 9092, "host": "localhost", "version": 5, "timestamp": "1647337490945"}这个 endpoints 就是 Broker 注册到 ZooKeeper 的访问地址。如果其他 Broker 或者客户端要跟这台 Broker 发生网络请求话,拿的就是这里面的值。
所以你想想看,如果是不同机器上配置的 host 是 localhost 是不是就访问不了了?
当然,listeners 属性的 host,也可以在 hosts 文件里面配置别的域名。配置域名指向的具体 IP,这样的话那还能奏效。只是每个 Broker 和客户端都要配置 host 比较麻烦,还不如直接配置 IP。
2.2 内网环境多机器部署集群
这种是绝大部分的场景。一般公司部署集群都是在公司内网环境下 Broker 之间和 Broker 与客户端之间都在同一个网络环境,并且安全协议都是直接 PLAINTEXT(明文)或者其他协议。
listeners=PLAINTEXT://ip:port2.3 内网和外网分流
listeners=INTERNAL://内网ip:port1,EXTERNAL://外网ip:port2
#把OUTSIDE 的安全协议映射成PLAINTEXT INSIDE也映射成PLAINTEXTlistener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# Broker之间的连接用 INSIDE 监听器inter.broker.listener.name=INTERNAL配置了两个监听器。每个 Brokerinter.broker.listener.name=INTERNAL 使用内网交流。
其他的客户端,例如 Producer 和 Consumer 请求的时候直接访问外网 IP。
2.4 内网和外网和 Controller 分流
listeners=INTERNAL://内网ip:port1,EXTERNAL://外网ip:port2,CONTROLLER://内网ip:port3,
#把OUTSIDE 的安全协议映射成PLAINTEXT INSIDE也映射成PLAINTEXTlistener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
# Broker之间的连接用 INSIDE 监听器inter.broker.listener.name=INTERNALcontrol.plane.listener.name=CONTROLLER这样配置:
Controller2Broker 或者 Broker2Controller Broker2Broker Clients2Broker
他们都有独立的网络通信线程。
- EOF -
看完本文有收获?请转发分享给更多人
关注「ImportNew」,提升Java技能
点赞和在看就是最大的支持❤️