如何使用 Kubernetes 部署 Flink 应用
关键词:Flink
概述
编译并打包 Flink 脚本 Jar 文件;
构建 Docker 容器镜像,添加 Flink 运行时库和上述 Jar 包;
使用 Kubernetes Job 部署 Flink JobManager 组件;
使用 Kubernetes Service 将 JobManager 服务端口开放到集群中;
使用 Kubernetes Deployment 部署 Flink TaskManager;
配置 Flink JobManager 高可用,需使用 ZooKeeper 和 HDFS;
借助 Flink SavePoint 机制来停止和恢复脚本。
Kubernetes 实验环境
如果手边没有 K8s 实验环境,我们可以用 Minikube 快速搭建一个,以 MacOS 系统为例:安装 VirtualBox,Minikube 将在虚拟机中启动 K8s 集群;
下载 Minikube 程序,权限修改为可运行,并加入到 PATH 环境变量中;
执行 minikube start,该命令会下载虚拟机镜像,安装 kubelet 和 kubeadm 程序,并构建一个完整的 K8s 集群。如果你在访问网络时遇到问题,可以配置一个代理,并告知 Minikube 使用它;
下载并安装 kubectl 程序,Minikube 已经将该命令指向虚拟机中的 K8s 集群了,所以可以直接运行 kubectl get pods -A 来显示当前正在运行的 K8s Pods:
NAMESPACE NAME READY STATUS RESTARTS AGE
kube-system kube-apiserver-minikube 1/1 Running 0 16m
kube-system etcd-minikube 1/1 Running 0 15m
kube-system coredns-5c98db65d4-d4t2h 1/1 Running 0 17m
Flink实时处理脚本示例
我们可以编写一个简单的实时处理脚本,该脚本会从某个端口中读取文本,分割为单词,并且每 5 秒钟打印一次每个单词出现的次数。以下代码是从 Flink 官方文档 上获取来的,完整的示例项目可以到 GitHub 上查看。DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("192.168.99.1", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
构建 Docker 容器镜像
Flink 提供了一个官方的容器镜像,可以从 DockerHub 上下载。我们将以这个镜像为基础,构建独立的脚本镜像,将打包好的 Jar 文件放置进去。此外,新版 Flink 已将 Hadoop 依赖从官方发行版中剥离,因此我们在打镜像时也需要包含进去。将 OpenJDK 1.8 作为基础镜像;
下载并安装 Flink 至 /opt/flink 目录中;
添加 flink 用户和组;
指定入口文件,不过我们会在 K8s 配置中覆盖此项。
FROM openjdk:8-jre
ENV FLINK_HOME=/opt/flink
WORKDIR $FLINK_HOME
RUN useradd flink && \
wget -O flink.tgz "$FLINK_TGZ_URL" && \
tar -xf flink.tgz
ENTRYPOINT ["/docker-entrypoint.sh"]
FROM flink:1.8.1-scala_2.12
ARG hadoop_jar
ARG job_jar
COPY --chown=flink:flink $hadoop_jar $job_jar $FLINK_HOME/lib/
USER flink
$ brew install docker
$ eval $(minikube docker-env)
$ cd /path/to/Dockerfile
$ cp /path/to/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar hadoop.jar
$ cp /path/to/flink-on-kubernetes-0.0.1-SNAPSHOT-jar-with-dependencies.jar job.jar
$ docker build --build-arg hadoop_jar=hadoop.jar --build-arg job_jar=job.jar --tag flink-on-kubernetes:0.0.1 .
$ docker image ls
REPOSITORY TAG IMAGE ID CREATED SIZE
flink-on-kubernetes 0.0.1 505d2f11cc57 10 seconds ago 618MB
部署 JobManager
首先,我们通过创建 Kubernetes Job 对象来部署 Flink JobManager。Job 和 Deployment 是 K8s 中两种不同的管理方式,他们都可以通过启动和维护多个 Pod 来执行任务。不同的是,Job 会在 Pod 执行完成后自动退出,而 Deployment 则会不断重启 Pod,直到手工删除。Pod 成功与否是通过命令行返回状态判断的,如果异常退出,Job 也会负责重启它。因此,Job 更适合用来部署 Flink 应用,当我们手工关闭一个 Flink 脚本时,K8s 就不会错误地重新启动它。以下是 jobmanager.yml 配置文件:apiVersion: batch/v1
kind: Job
metadata:
name: ${JOB}-jobmanager
spec:
template:
metadata:
labels:
app: flink
instance: ${JOB}-jobmanager
spec:
restartPolicy: OnFailure
containers:
- name: jobmanager
image: flink-on-kubernetes:0.0.1
command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground",
"-Djobmanager.rpc.address=${JOB}-jobmanager",
"-Dparallelism.default=1",
"-Dblob.server.port=6124",
"-Dqueryable-state.server.ports=6125"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob
- containerPort: 6125
name: query
- containerPort: 8081
name: ui
${JOB} 变量可以使用 envsubst 命令来替换,这样同一份配置文件就能够为多个脚本使用了;
容器的入口修改为了 standalone-job.sh,这是 Flink 的官方脚本,会以前台模式启动 JobManager,扫描类加载路径中的 Main-Class 作为脚本入口,我们也可以使用 -j 参数来指定完整的类名。之后,这个脚本会被自动提交到集群中。
JobManager 的 RPC 地址修改为了 Kubernetes Service 的名称,我们将在下文创建。集群中的其他组件将通过这个名称来访问 JobManager。
Flink Blob Server & Queryable State Server 的端口号默认是随机的,为了方便将其开放到集群中,我们修改为了固定端口。
kubectl
命令创建对象,并查看状态:$ export JOB=flink-on-kubernetes
$ envsubst <jobmanager.yml | kubectl create -f -
$ kubectl get pod
NAME READY STATUS RESTARTS AGE
flink-on-kubernetes-jobmanager-kc4kq 1/1 Running 0 2m26s
service.yml
apiVersion: v1
kind: Service
metadata:
name: ${JOB}-jobmanager
spec:
selector:
app: flink
instance: ${JOB}-jobmanager
type: NodePort
ports:
- name: rpc
port: 6123
- name: blob
port: 6124
- name: query
port: 6125
- name: ui
port: 8081
type: NodePort
是必要的,因为通过这项配置,我们可以在 K8s 集群之外访问 JobManager UI 和 RESTful API。$ envsubst <service.yml | kubectl create -f -
$ kubectl get service
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
flink-on-kubernetes-jobmanager NodePort 10.109.78.143 <none> 6123:31476/TCP,6124:32268/TCP,6125:31602/TCP,8081:31254/TCP 15m
$ minikube service $JOB-jobmanager --urlhttp://192.168.99.108:31476http://192.168.99.108:32268http://192.168.99.108:31602http://192.168.99.108:31254
部署 TaskManager
taskmanager.ymlapiVersion: apps/v1
kind: Deployment
metadata:
name: ${JOB}-taskmanager
spec:
selector:
matchLabels:
app: flink
instance: ${JOB}-taskmanager
replicas: 1
template:
metadata:
labels:
app: flink
instance: ${JOB}-taskmanager
spec:
containers:
- name: taskmanager
image: flink-on-kubernetes:0.0.1
command: ["/opt/flink/bin/taskmanager.sh"]
args: ["start-foreground", "-Djobmanager.rpc.address=${JOB}-jobmanager"]
$ nc -lk 9999
hello world
hello flink
$ kubectl logs -f -l instance=$JOB-taskmanager
(hello,2)
(flink,1)
(world,1)
开启高可用模式
可用性方面,上述配置中的 TaskManager 如果发生故障退出,K8s 会自动进行重启,Flink 会从上一个 Checkpoint 中恢复工作。但是,JobManager 仍然存在单点问题,因此需要开启 HA 模式,配合 ZooKeeper 和分布式文件系统(如 HDFS)来实现 JobManager 的高可用。在独立集群中,我们需要运行多个 JobManager,作为主备服务器。然而在 K8s 模式下,我们只需开启一个 JobManager,当其异常退出后,K8s 会负责重启,新的 JobManager 将从 ZooKeeper 和 HDFS 中读取最近的工作状态,自动恢复运行。command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground",
"-Djobmanager.rpc.address=${JOB}-jobmanager",
"-Dparallelism.default=1",
"-Dblob.server.port=6124",
"-Dqueryable-state.server.ports=6125",
"-Dhigh-availability=zookeeper",
"-Dhigh-availability.zookeeper.quorum=192.168.99.1:2181",
"-Dhigh-availability.zookeeper.path.root=/flink",
"-Dhigh-availability.cluster-id=/${JOB}",
"-Dhigh-availability.storageDir=hdfs://192.168.99.1:9000/flink/recovery",
"-Dhigh-availability.jobmanager.port=6123",
]
command: ["/opt/flink/bin/taskmanager.sh"]
args: ["start-foreground",
"-Dhigh-availability=zookeeper",
"-Dhigh-availability.zookeeper.quorum=192.168.99.1:2181",
"-Dhigh-availability.zookeeper.path.root=/flink",
"-Dhigh-availability.cluster-id=/${JOB}",
"-Dhigh-availability.storageDir=hdfs://192.168.99.1:9000/flink/recovery",
]
准备好 ZooKeeper 和 HDFS 测试环境,该配置中使用的是宿主机上的 2181 和 9000 端口;
Flink 集群基本信息会存储在 ZooKeeper 的 /flink/${JOB} 目录下;
Checkpoint 数据会存储在 HDFS 的 /flink/recovery 目录下。使用前,请先确保 Flink 有权限访问 HDFS 的 /flink 目录;
jobmanager.rpc.address 选项从 TaskManager 的启动命令中去除了,是因为在 HA 模式下,TaskManager 会通过访问 ZooKeeper 来获取到当前 JobManager 的连接信息。需要注意的是,HA 模式下的 JobManager RPC 端口默认是随机的,我们需要使用 high-availability.jobmanager.port 配置项将其固定下来,方便在 K8s Service 中开放。
管理Flink脚本
我们可以通过 RESTful API 来与 Flink 集群交互,其端口号默认与 Dashboard UI 一致。在宿主机上安装 Flink 命令行工具,传入-m
参数来指定目标集群:$ bin/flink list -m 192.168.99.108:30206
------------------ Running/Restarting Jobs -------------------
24.08.2019 12:50:28 : 00000000000000000000000000000000 : Window WordCount (RUNNING)
--------------------------------------------------------------
00000000000000000000000000000000
,我们可以使用这个 ID 来手工停止脚本,并生成一个 SavePoint 快照:$ bin/flink cancel -m 192.168.99.108:30206 -s hdfs://192.168.99.1:9000/flink/savepoints/ 00000000000000000000000000000000
Cancelled job 00000000000000000000000000000000. Savepoint stored in hdfs://192.168.99.1:9000/flink/savepoints/savepoint-000000-f776c8e50a0c.
$ kubectl get job
NAME COMPLETIONS DURATION AGE
flink-on-kubernetes-jobmanager 1/1 4m40s 7m14s
$ kubectl delete job $JOB-jobmanager
$ kubectl delete deployment $JOB-taskmanager
--fromSavepoint
参数:command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground",
...
"--fromSavepoint", "${SAVEPOINT}",
]
$ export SAVEPOINT=hdfs://192.168.99.1:9000/flink/savepoints/savepoint-000000-f776c8e50a0c
$ envsubst <jobmanager-savepoint.yml | kubectl create -f -
扩容
有两种方式可以对 Flink 脚本进行扩容。第一种方式是用上文提到的 SavePoint 机制:手动关闭脚本,并使用新的 replicas 和 parallelism.default 参数进行重启;另一种方式则是使用 flink modify 命令行工具,该工具的工作机理和人工操作类似,也是先用 SavePoint 停止脚本,然后以新的并发度启动。在使用第二种方式前,我们需要在启动命令中指定默认的 SavePoint 路径:command: ["/opt/flink/bin/standalone-job.sh"]
args: ["start-foreground",
...
"-Dstate.savepoints.dir=hdfs://192.168.99.1:9000/flink/savepoints/",
]
kubectl scale
命令调整 TaskManager 的个数;$ kubectl scale --replicas=2 deployment/$JOB-taskmanager
deployment.extensions/flink-on-kubernetes-taskmanager scaled
flink modify
调整脚本并发度:$ bin/flink modify 755877434b676ce9dae5cfb533ed7f33 -m 192.168.99.108:30206 -p 2
Modify job 755877434b676ce9dae5cfb533ed7f33.
Rescaled job 755877434b676ce9dae5cfb533ed7f33. Its new parallelism is 2.
flink modify
命令来对 HA 模式下的 Flink 集群进行扩容,因此还请使用人工的方式操作。 Flink 将原生支持 Kubernetes
Flink 有着非常活跃的开源社区,他们不断改进自身设计(FLIP-6),以适应现今的云原生环境。他们也注意到了 Kubernetes 的蓬勃发展,对 K8s 集群的原生支持也在开发中。我们知道,Flink 可以直接运行在 YARN 或 Mesos 资源管理框架上。以 YARN 为例,Flink 首先启动一个 ApplicationMaster,作为 JobManager,分析提交的脚本需要多少资源,并主动向 YARN ResourceManager 申请,开启对应的 TaskManager。当脚本的并行度改变后,Flink 会自动新增或释放 TaskManager 容器,达到扩容缩容的目的。这种主动管理资源的模式,社区正在开发针对 Kubernetes 的版本(FLINK-9953),今后我们便可以使用简单的命令来将 Flink 部署到 K8s 上了。作者:薄荷脑https://blog.csdn.net/zjerryj/article/details/100063858
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html
https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/
https://jobs.zalando.com/tech/blog/running-apache-flink-on-kubernetes/
https://www.slideshare.net/tillrohrmann/redesigning-apache-flinks-distributed-architecture-flink-forward-2017
https://www.slideshare.net/tillrohrmann/future-of-apache-flink-deployments-containers-kubernetes-and-more-flink-forward-2019-sf
END
关注我
公众号(zhisheng)里回复 面经、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章
Flink 实战
1、《从0到1学习Flink》—— Apache Flink 介绍
2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门
3、《从0到1学习Flink》—— Flink 配置文件详解
4、《从0到1学习Flink》—— Data Source 介绍
5、《从0到1学习Flink》—— 如何自定义 Data Source ?
6、《从0到1学习Flink》—— Data Sink 介绍
7、《从0到1学习Flink》—— 如何自定义 Data Sink ?
8、《从0到1学习Flink》—— Flink Data transformation(转换)
9、《从0到1学习Flink》—— 介绍 Flink 中的 Stream Windows
10、《从0到1学习Flink》—— Flink 中的几种 Time 详解
11、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 ElasticSearch
12、《从0到1学习Flink》—— Flink 项目如何运行?
13、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 Kafka
14、《从0到1学习Flink》—— Flink JobManager 高可用性配置
15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍
16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL
17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ
18、《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了
19、大数据“重磅炸弹”——实时计算框架 Flink
20、《Flink 源码解析》—— 源码编译运行
21、为什么说流处理即未来?
22、OPPO数据中台之基石:基于Flink SQL构建实数据仓库
23、流计算框架 Flink 与 Storm 的性能对比
24、Flink状态管理和容错机制介绍
25、原理解析 | Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理
26、Apache Flink 是如何管理好内存的?
27、《从0到1学习Flink》——Flink 中这样管理配置,你知道?
28、《从0到1学习Flink》——Flink 不可以连续 Split(分流)?
29、Flink 从0到1学习—— 分享四本 Flink 的书和二十多篇 Paper 论文
30、360深度实践:Flink与Storm协议级对比
31、Apache Flink 1.9 重大特性提前解读
32、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了
33、美团点评基于 Flink 的实时数仓建设实践
34、Flink 灵魂两百问,这谁顶得住?
35、一文搞懂 Flink 的 Exactly Once 和 At Least Once
36、你公司到底需不需要引入实时计算引擎?
37、Flink 从0到1学习 —— 如何使用 Side Output 来分流?
38、一文让你彻底了解大数据实时计算引擎 Flink
39、基于 Flink 实现的商品实时推荐系统(附源码)
40、如何使用 Flink 每天实时处理百亿条日志?
41、Flink 在趣头条的应用与实践
42、Flink Connector 深度解析
43、滴滴实时计算发展之路及平台架构实践
44、Flink Back Pressure(背压)是怎么实现的?有什么绝妙之处?
45、Flink 实战 | 贝壳找房基于Flink的实时平台建设