周其仁:停止改革,我们将面临三大麻烦

抛开立场观点不谈,且看周小平写一句话能犯多少语病

罗马尼亚的声明:小事件隐藏着大趋势——黑暗中的风:坚持做对的事相信未来的结果

布林肯突访乌克兰,为何选择去吃麦当劳?

中国不再是美国第一大进口国,贸易战殃及纺织业? 美国进一步延长352项中国商品的关税豁免期

生成图片,分享到微信朋友圈

自由微信安卓APP发布,立即下载! | 提交文章网址
查看原文

Kafka 如何设置内外网和 Controller 都进行分流

ImportNew 2022-10-20

The following article is from 石臻臻的杂货铺 Author 石臻臻

1 参数详解


1.1 侦听器列表 listeners


这里配置的监听器底层调用的是:


ServerSocketAdaptor.bind(SocketAddress local)

那么这个说明什么意思呢?


说明你配置的监听器将被用于监听网络请求。简单理解就是你建立监听一个通道,别人能够通过这个通道跟你沟通。所以我们需要设置 IP : Port。


这个属性的格式为:


listeners = listener_name://host_name:port,listener_name2://host_nam2e:port2


1) 可以同时配置多个,并且用逗号隔开;
2) 监听器的名称和端口必须是唯一的,端口相同就冲突了;
3) host_name 如果为空,例如 listeners = ://host_name:port,则会绑定到默认的接口(网卡)。一般情况下是 localhost,底层调用的是:
java.net.InetAddress.getCanonicalHostName()
4) 将 host_name 设置为 0.0.0.0 则会绑定所有的网卡。也就是说不管从哪个网卡进入的请求都会被接受处理。
请注意:假如你设置的是 0.0.0.0,那么 advertised.listeners 必须要设置。因为 advertised.listeners 默认情况下使用的是 listeners 的配置发布到 ZooKeeper 中。发布到 ZooKeeper 中是给其他 Brokers/Clients 来跟你通信的。如果设置 0.0.0.0,谁知道要请求哪个 IP 呢?所以它必须要指定并明确 IP:PORT。具体详情请看下面示例3。

5) listener_name 是监听名,唯一值。它并不是安全协议(大部分人都会搞错)。因为默认的 4 个安全协议已经做好了映射,例如 PLAINTEXT ==> PLAINTEXT。
所以会经常看到下面的配置:


## 这个PLAINTEXT是监听名称,刚好他对应的安全协议就是 PLAINTEXT## 当然这个是可以自定义的, 详细情况 后面的配置listener.security.protocol.maplisteners = PLAINTEXT://your.host.name:9092


6) 可动态配置该属性。


1.2 发布公开的监听器 advertised.listeners


啥叫发布公开的监听器?


就是让 Brokers 和 Clients 们都能够知道的监听器。


你想想看,listeners 是 Broker 用来监听网络请求的。那么其他 Broker 或者客户端想要与它通信,则需要知道具体的 IP:PORT 对吧?


所以,为了让别人知道自己的监听器,那么就需要公开出去。当然这个公开的形式是通过 ZooKeeper 来共享数据。


看看 Broker 到 ZooKeeper 节点 /brokers/{brokerid}/ 下面的信息示例:


{ "features": {}, "listener_security_protocol_map": { "PLAINTEXT": "PLAINTEXT" }, "endpoints": [ "PLAINTEXT://localhost:9092" ], "jmx_port": -1, "port": 9092, "host": "localhost", "version": 5, "timestamp": "1647337490945"}


其中 endpoints 就是我们发布出去的监听器。这个属性的格式为:


advertised.listeners = listener_name://host_name:port,listener_name2://host_nam2e:port2


  • 默认情况下 advertised.listeners 不设置会自动使用 listeners 属性;
  • advertised.listeners 不支持 0.0.0.0 这种形式,所以如果 listeners 属性设置成 0.0.0.0,则必须设置 advertised.listeners 属性,具体请看示例 3。由于 0.0.0.0 是表示的是监听 Broker 上任意网卡,将这个发布出去后别的 Broker 和客户端怎么知道具体的 IP 和端口呢?
  • 可以同时配置多个, 并且用逗号隔开;
  • 可动态配置该属性。


1.3 监听器名称和安全协议之间的映射关系集合 listener.security.protocol.map


listeners=PLAINTEXT://localhost:9092


看看上面的配置,PLANINTEXT 是监听器名称。那么它对应的安全协议是什么呢?


它对应的安全协议是  PLANINTEXT。


为什么呢?


那是因为默认情况下已经有了他们的映射关系。


默认集合

PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

属性格式


监听名称1:安全协议1,监听名称2:安全协议2

现有的安全协议有下面四种,其默认监听器名称映射对应的安全协议情况如下:


  • PLAINTEXT => PLAINTEXT:不需要授权,非加密通道;
  • SSL => SSL:使用 SSL 加密通道;
  • SASL_PLAINTEXT => SASL_PLAINTEXT:使用 SASL 认证非加密通道;
  • SASL_SSL => SASL_SSL:使用 SASL 认证并且 SSL 加密通道。


当然,也可以自己重新映射监听器名称和安全协议,比如示例 4。


1.4 inter.broker.listener.name


用于 Broker 之间通信的 listener 的名称。如果未设置该参数,则 listener 名称由  security.inter.broker.protocol 定义。security.inter.broker.protocol 默认值是PLAINTEXT。


注意:同时设置这个和 security.inter.broker.protocol 属性是错误的。


默认值:空。不可动态配置。


特别注意:这个属性表示 Broker 之间的网络通信使用的监听器,比如 Broker2Broker。但是,还有一种 Controller2Broker。如果没有配置control.plane.listener.name,那么走的也是 inter.broker.listener.name 监听器。


由于是根据本地配置的监听器名称查找其他 Broker 监听器的 EndPoint,所以一般所有 Broker 的监听器名称都必须一致。否则就找不到具体的 EndPoint,无法正确的发起请求。


1.5 security.inter.broker.protocol


用于在代理之间进行通信的安全协议。有效值为 PLAINTEXTSSLSASL_PLAINTEXTSASL_SSL


注意:同时设置该属性和 inter.broker.listener.name 属性是错误的。


默认值:PLAINTEXT  (纯文本)


注意:这个跟 inter.broker.listener.name 是有区别的。这个配置只有四个选项,是安全协议。而 inter.broker.listener.name 是监听名称。需要通过这个监听名称去找到它映射的安全协议以及 IP:PORT。


如果 inter.broker.listener.name 没有配置,则默认使用 security.inter.broker.protocol 的配置。对 inter.broker.listener.name 而言,最终还是要去找到对应的 IP:PORT。


一般自定义了监听器名称,inter.broker.listener.name 就是必须要设置的,不能使用security.inter.broker.protocol 来代替。


1.6 control.plane.listener.name


用于 Controller 和 Broker 之间通信的监听器名称,Broker 将会使用 control.plane.listener.name 来定位监听器列表中的 EndPoint。


如果未设置,则默认使用 inter.broker.listener.name 来通信,没有专门的链接。


1.7 示例说明


示例 1:绑定一个 IP,客户端使用另外的 IP 访问


让 Broker 监听 localhost:9092,然后客户端访问 Broker 的具体 IP。


listeners=PLAINTEXT://localhost:9092

启动之后查看一下监听情况:


Linux 命令:


netstat -anp |grep 9092

Mac 环境命令:


netstat -AaLlnW



当然,如果你这台机器刚好还是 Controller 的话,除了了 LISTEN 还能看到 ESTABLISHED 状态的连接。因为 Controller 也会给这台 Broker 建立连接发起请求的,比如通知 Broker 更新元信息之类的。




我们使用生产者客户端来生产几条消息。


sh bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092  --topic Topic4## 或者sh bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic Topic4


可以发现正常发送消息。


那么接下来,使用使用具体 IP 发起请求。


sh bin/kafka-console-producer.sh --bootstrap-server 10.xxx.xx.128:9092 --topic Topic4


[2022-03-16 12:59:07,024] WARN [Controller id=1000, targetBrokerId=1000] Connection to node 1000 (/10.xxx.xxx.xx:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)


可以看到,客户端提示说不能跟这个 IP:Port 建立连接。


示例 2:listeners 和 advertised.listeners 配置的 IP 不一样


listeners=PLAINTEXT://xx.xx.xxx.01:9092advertised.listeners=PLAINTEXT:/xx.xx.xxx.02:9092

假设本地监听和发布的监听不一样,那么就会造成其他 Broker 和客户端跟这台 Broker 不能正确的建立链接。


如果当前这台 Broker 刚好还是 Controller,那么他也会对自己建立连接,都是根据 advertised.listeners 的配置来建立的,同样会失败。其他 Broker 也一样。


[2022-03-16 12:59:07,024] WARN [Controller id=1000, targetBrokerId=1000] Connection to node 1000 (/10.xxx.xxx.xx:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

示例 3:listeners 监听任意可用 IP,advertised.listeners 发布指定 IP


在示例 2 中,我们指定 listeners 监听器和 advertised.listeners 发布的监听器不一致会导致异常。那么,我们只需要将监听器的和发布的监听器一致就行了。


当然,我们还可直接设置监听器监听任意可用 IP(该 Broker 上的可用 IP)。


listeners=PLAINTEXT://0.0.0.0:9092

当然,如果只是将 host 设置为 0.0.0.0 那么会报错。


java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address. at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1789)


因为默认情况下不会设置 advertised.listeners,这时会使用 listeners 的属性,然而 advertised.listeners 是不支持 0.0.0.0 的。所以需要指定暴露的监听器,如下:


listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://xx.xx.xx.128:9092

这样子配置就不会报错了,其他 Broker 和客户端会通过 advertised.listeners 发布的监听器来跟该 Broker 建立链接。


注意:这个时候你还可以在这台 Broker 的机器上使用 localhost 来进行访问。比如:


sh bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic Topic4


可以看到也是可以正常发送消息的。


示例 4listeners 配置多个监听器,内外网分流


listeners = INSIDE://内网IP:9091,OUTSIDE://外网IP:9092
#把OUTSIDE 的安全协议映射成PLAINTEXT INSIDE也映射成PLAINTEXTlistener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
# Broker之间的连接用 INSIDE 监听器inter.broker.listener.name=INSIDE


设置了两个监听器:INSIDE 监听内网 IP,OUTSIDE 监听外网 IP。


因为这个是我们自己定义的监听名称,listener.security.protocol.map默认映射中并没有对应的映射关系 所以我们就需要主动设置这个映射关系 listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:SSL

不然会抛异常。


Caused by: java.lang.IllegalArgumentException: No security protocol defined for listener INSIDE at kafka.cluster.EndPoint$.$anonfun$createEndPoint$2(EndPoint.scala:48)

注意:自定义了监听器,则必须要配置 inter.broker.listener.name 

确定好内部的 Broker 之间通信的监听器,并确保能够正常访问。这样 Broker 直接就会通过内网互相连接,客户端除了可以通过内网连接(如果在内网环境的话)也可以通过外网连接。


2 几种场景的配置方式


2.1 一台机器部署一套集群


这种场景一般是自己开发测试的时候。比如自己搭建一个集群学习学习,但是又没有那么多机器,那么就可以在一台电脑上部署多个 Broker。


只配置 listeners 属性


listeners = 监听名称://your.host.name:port


关于监听名称,默认的映射关系有四种:


  • PLAINTEXT => PLAINTEXT:不需要授权,非加密通道;
  • SSL => SSL:使用 SSL 加密通道;
  • SASL_PLAINTEXT => SASL_PLAINTEXT:使用 SASL 认证非加密通道;
  • SASL_SSL => SASL_SSL:使用 SASL 认证并且 SSL 加密通道。


简单一点,用 PLAINTEXT 就够了。这里我们可以把 host 给去掉,或者使用 localhost。


listeners = PLAINTEXT://:port


或者


listeners = PLAINTEXT://localhost:port


如果没有配置 host 会调用 java.net.InetAddress.getCanonicalHostName() 获取本机 host。默认情况下就是 localhost。


这里之所以建议不填写具体的 host,是因为一般自己搭建练习的时候可能网络 IP 会经常变动(例如家里和公司的网络)。如果绑定了具体 IP,每次重启都要更换配置就很麻烦。


可以看看 Broker 启动后注册到 ZooKeeper 中的配置如下:


{ "features": {}, "listener_security_protocol_map": { "PLAINTEXT": "PLAINTEXT" }, "endpoints": [ "PLAINTEXT://localhost:9092" ], "jmx_port": -1, "port": 9092, "host": "localhost", "version": 5, "timestamp": "1647337490945"}

这个 endpoints 就是 Broker 注册到 ZooKeeper 的访问地址。如果其他 Broker 或者客户端要跟这台 Broker 发生网络请求话,拿的就是这里面的值。


所以你想想看,如果是不同机器上配置的 host 是 localhost 是不是就访问不了了?


当然,listeners 属性的 host,也可以在 hosts 文件里面配置别的域名。配置域名指向的具体 IP,这样的话那还能奏效。只是每个 Broker 和客户端都要配置 host 比较麻烦,还不如直接配置 IP。


2.2 内网环境多机器部署集群


这种是绝大部分的场景。一般公司部署集群都是在公司内网环境下 Broker 之间和 Broker 与客户端之间都在同一个网络环境,并且安全协议都是直接 PLAINTEXT(明文)或者其他协议。


listeners=PLAINTEXT://ip:port


2.3 内网和外网分流


listeners=INTERNAL://内网ip:port1,EXTERNAL://外网ip:port2
#把OUTSIDE 的安全协议映射成PLAINTEXT INSIDE也映射成PLAINTEXTlistener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
# Broker之间的连接用 INSIDE 监听器inter.broker.listener.name=INTERNAL

配置了两个监听器。每个 Brokerinter.broker.listener.name=INTERNAL 使用内网交流。


其他的客户端,例如 Producer 和 Consumer 请求的时候直接访问外网 IP。


2.4 内网和外网和 Controller 分流


listeners=INTERNAL://内网ip:port1,EXTERNAL://外网ip:port2,CONTROLLER://内网ip:port3,
#把OUTSIDE 的安全协议映射成PLAINTEXT INSIDE也映射成PLAINTEXTlistener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
# Broker之间的连接用 INSIDE 监听器inter.broker.listener.name=INTERNALcontrol.plane.listener.name=CONTROLLER

这样配置:


  • Controller2Broker 或者 Broker2Controller
  • Broker2Broker
  • Clients2Broker


他们都有独立的网络通信线程。


- EOF -

推荐阅读  点击标题可跳转

1、图解 Kafka Producer 内存池架构设计

2、刨根问底,Kafka消息中间件到底会不会丢消息

3、八大步骤带你深度剖析 Kafka 生产级容量评估方案


看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

点赞和在看就是最大的支持❤️


文章有问题?点此查看未经处理的缓存