Big Data Ops in Practice, Part 105: Smoothly Migrating AWS Flink to K8s
1. Flink on K8s
Flink on K8s is an efficient deployment mode for Apache Flink that combines the stream-processing engine with the Kubernetes (K8s) container orchestrator to deliver highly available, high-performance, and elastic data processing. Flink, an engine with strong stream and batch processing capabilities, pairs naturally with K8s's automated container deployment, scaling, and management. Running Flink clusters on K8s not only lets you use K8s autoscaling to adjust the number of cluster nodes to business demand and resource utilization, but also improves cluster availability through K8s failover and replica mechanisms.
As an efficient, flexible, and reliable data-processing solution, Flink on K8s is already widely used in finance, e-commerce, smart manufacturing, IoT, and other domains. It can process real-time data streams to power streaming data warehouses, drive real-time monitoring and alerting, and handle underlying big-data processing. In future data-processing scenarios, Flink on K8s will continue to play to these strengths and help enterprises with digital transformation and intelligent upgrades.
2. Flink-Operator
Flink-Operator is a tool for automating the lifecycle management of Apache Flink deployments. It is built on the Java Operator SDK and integrates natively with Kubernetes. It is aimed at large production environments that need automated management of Flink deployments, helping teams reduce operational cost and improve deployment efficiency and reliability. Its main characteristics:
Automated management: Flink-Operator automates the lifecycle of Flink deployments, reducing the need for manual intervention.
Kubernetes-native: native integration with Kubernetes makes it easy to deploy Flink applications on a Kubernetes cluster.
Customizable: configuration options let users tailor the operator's behavior to their needs.
High performance: mechanisms such as operator chaining (Operator Chain) improve the execution performance of Flink jobs.
Reference: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/helm/
Installation steps: download the resources
Shell
# Install cert-manager (required by the operator webhook)
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
# Add the operator Helm repo
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
# Optional: pull the chart for a local install
helm pull flink-operator-repo/flink-kubernetes-operator
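Before installing the operator, it may be worth confirming that cert-manager has come up; a minimal check:
Shell
# cert-manager pods should be Running before the operator install
kubectl get pods -n cert-manager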
Install
Shell
helm --namespace flink install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# Local install (from the pulled chart directory)
helm --namespace flink install flink-kubernetes-operator .
# Upgrade
# helm --namespace flink upgrade flink-kubernetes-operator .
# Check that the installation succeeded
% kubectl get pods
NAME                                        READY   STATUS    RESTARTS   AGE
flink-kubernetes-operator-d8546dfff-l9t8w   2/2     Running   0          38s
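It can also help to confirm that the operator's CRDs were registered; a quick check:
Shell
# CRDs installed by the flink-kubernetes-operator chart
kubectl get crd flinkdeployments.flink.apache.org flinksessionjobs.flink.apache.org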
3. Verifying with the Demo Example
Run the official demo to check whether the installation succeeded.
1. Download the demo example
Shell
wget https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.8/examples/basic.yaml
2. The demo manifest
Adjust the Flink version and image to match (the default is 1.17), and set the namespace:
YAML
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
  namespace: flink
spec:
  image: flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
3. Check the pods and Services
Shell
maoshu@maoshu-MacBook flink-examples % kubectl get pods
NAME                                        READY   STATUS    RESTARTS   AGE
basic-example-d86f5478-d5bz8                1/1     Running   0          2m4s
basic-example-taskmanager-1-1               1/1     Running   0          101s
flink-kubernetes-operator-d8546dfff-l9t8w   2/2     Running   0          10m
maoshu@maoshu-MacBook flink-examples % kubectl get svc
NAME                             TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
basic-example                    ClusterIP   None            <none>        6123/TCP,6124/TCP   2m8s
basic-example-rest               ClusterIP   172.20.67.39    <none>        8081/TCP            2m8s
flink-operator-webhook-service   ClusterIP   172.20.148.23   <none>        443/TCP             10m
Port-forward the Service to the local machine to view the web UI
Shell
% kubectl port-forward svc/basic-example-rest 8081
Forwarding from 127.0.0.1:8081 -> 8081
Forwarding from [::1]:8081 -> 8081
Open the web UI to check the running job.
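Alternatively, the job status can be confirmed through Flink's standard REST API while the port-forward is active:
Shell
# The demo job should be listed with status RUNNING
curl -s http://127.0.0.1:8081/jobs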
1. Build the image
Create a directory for the image build and add a Dockerfile. Example:
Shell
FROM --platform=linux/amd64 flink:1.19.0
ADD WordCount.jar /opt/flink/examples/WordCount.jar
This uses Flink 1.19.0 as the base image and copies our jar into it. If the job has additional dependencies, copy them into /opt/flink/lib as well.
Build the image
Shell
docker build -t flink .
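Optionally, check that the jar actually landed in the image before pushing; a quick sanity check against the tag built above:
Shell
# List the jar inside the freshly built image
docker run --rm flink ls -l /opt/flink/examples/WordCount.jar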
2. Push the image to ECR
Here we use AWS ECR as the image registry; any comparable registry service can be used instead. Add the registry information in AWS ECR, then log in and push.
Log in to ECR
Shell
aws ecr get-login-password --region ap-southeast-1 | docker login --username AWS --password-stdin ***.dkr.ecr.ap-southeast-1.amazonaws.com
Retag the image built in step 1
Shell
% docker tag flink:latest **.dkr.ecr.ap-southeast-1.amazonaws.com/flink:1.19.0
% docker images | grep flink
Push the image to ECR
Shell
docker push ***.dkr.ecr.ap-southeast-1.amazonaws.com/flink:1.19.0
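If needed, the push can be verified with the AWS CLI (the repository here is named flink, matching the image URI above):
Shell
# List the image tags now stored in the ECR repository
aws ecr describe-images --repository-name flink --region ap-southeast-1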
3. Create the EKS job
Create the job's YAML manifest, following the demo example above:
YAML
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-pulsar-demo
  namespace: flink
spec:
  image: ***.dkr.ecr.ap-southeast-1.amazonaws.com/flink:1.19.0
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "4096m"
      cpu: 2
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: local:///opt/flink/examples/WordCount.jar
    parallelism: 2
    upgradeMode: stateless
Run the job: kubectl apply -f flink-pulsar-demo.yaml
4. Check the job
Check the job with kubectl, as shown below.
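A minimal status check, using the FlinkDeployment resource created by the operator:
Shell
# The FlinkDeployment reports job status and lifecycle state
kubectl -n flink get flinkdeployment flink-pulsar-demo
kubectl -n flink get pods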
4. Migrate the AWS Job
In the Flink job on AWS, look up its configuration and find the S3 location of the job jar.
S3 address of the compiled jar: https://f***-ap-southeast-1-***-bucket.s3.ap-southeast-1.amazonaws.com/d***-0.1.1-SNAPSHOT.jar
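The jar can be fetched locally with the AWS CLI, for example (bucket and key below are placeholders standing in for the masked values above):
Shell
# Download the compiled job jar from S3 (placeholder bucket/key)
aws s3 cp s3://<your-bucket>/<your-job>-0.1.1-SNAPSHOT.jar .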
Download that code package or jar locally, build the image, and push it. The Dockerfile is as follows; dependency jars need to be added as well.
Shell
FROM --platform=linux/amd64 flink:1.19.0
# Add the job jar
ADD *******-0.1.1-SNAPSHOT.jar /opt/flink/*******-0.1.1-SNAPSHOT.jar
# Add dependency jars
ADD aws-kinesisanalytics-runtime-1.2.0.jar /opt/flink/lib/
Build the image and push it to ECR
Shell
# Build, retag, and push the updated image
docker build -t flink:******* .
docker tag flink:******* *****.dkr.ecr.ap-southeast-1.amazonaws.com/flink:*******_v1
docker push *****.dkr.ecr.ap-southeast-1.amazonaws.com/flink:*******_v1
Reference YAML for launching the job; the jar path and image tag come from step 4.1:
YAML
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-**-**-**
  namespace: flink
spec:
  image: *****.dkr.ecr.ap-southeast-1.amazonaws.com/flink:*******_v1
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "4096m"
      cpu: 2
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: local:///opt/flink/*******-0.1.1-SNAPSHOT.jar
    parallelism: 2
    upgradeMode: stateless
Shell
# Launch on EKS (remove any previous deployment first)
kubectl delete -f flink-demo.yaml
kubectl apply -f flink-demo.yaml
After startup, check the job and port-forward to view the web UI
Shell
% kubectl get pods
NAME                              READY   STATUS    RESTARTS   AGE
flink-**-**-**-7ffc6bf84f-6d9t8   1/1     Running   0          11m
flink-**-**-**-taskmanager-1-1    1/1     Running   0          11m
flink-**-**-**-taskmanager-1-2    1/1     Running   0          11m
flink-**-**-**-taskmanager-1-3    1/1     Running   0          11m
% kubectl get svc
NAME                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
flink-**-**-**        ClusterIP   None            <none>        6123/TCP,6124/TCP   11m
flink-**-**-**-rest   ClusterIP   172.20.42.151   <none>        8081/TCP            11m
% kubectl port-forward svc/flink-**-**-**-rest 8081
Forwarding from 127.0.0.1:8081 -> 8081
Forwarding from [::1]:8081 -> 8081
Open the web UI at http://127.0.0.1:8081 to confirm the job; the migration is complete.
5. Flink Local Configuration
This section mainly migrates the config settings of the Flink job on AWS.
Create a ConfigMap. The application uses the kinesisanalytics runtime to read its configuration; the read logic loads the config.properties file, whose default content looks like this.
Sample data:
JSON
[
  {
    "PropertyGroupId": "Stream1",
    "PropertyMap": {
      "Region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "Stream2",
    "PropertyMap": {
      "Region": "us-east-2",
      "Offset": "latest"
    }
  }
]
Based on the Flink job configuration on AWS, the migrated ConfigMap looks like this:
YAML
# ConfigMap for Flink configuration
apiVersion: v1
kind: ConfigMap
metadata:
  name: cp-**-**-config
  namespace: flink
data:
  application_properties.json: |-
    [{"PropertyGroupId":"RealtimeCoupon",
      "PropertyMap":{
        "coupon.job.window-process": "window",
        "dos.coupon.topic": "**-**-**-loadtest",
        ********
        "sink.parallelism": 6,
        "source.parallelism": 8,
        "window.parallelism": 12}
    }]
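The same ConfigMap can also be generated from a local file, which keeps the JSON easier to edit (a sketch; the local file name must match the key above):
Shell
# Create or update the ConfigMap from the local application_properties.json
kubectl -n flink create configmap cp-**-**-config \
  --from-file=application_properties.json \
  --dry-run=client -o yaml | kubectl -n flink apply -f -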
YAML
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: cp-**-**
  namespace: flink
  labels:
    deploy.artifact.io/kind: 'flink'
    deploy.artifact.io/name: 'cp-**-**'
spec:
  image: *****.dkr.ecr.ap-southeast-1.amazonaws.com/flink:uat-cp-**-**-v2
  imagePullPolicy: Always
  flinkVersion: v1_19
  flinkConfiguration:
    jobmanager.memory.process.size: "1g"
    kubernetes.jobmanager.memory.limit-factor: "1.5"
    kubernetes.jobmanager.cpu.amount: "1"
    kubernetes.jobmanager.cpu.limit-factor: "8"
    taskmanager.memory.process.size: "1g"
    kubernetes.taskmanager.memory.limit-factor: "1.33"
    kubernetes.taskmanager.cpu.amount: "1"
    kubernetes.taskmanager.cpu.limit-factor: "8"
    parallelism.default: "4"
    taskmanager.data.port: "12000"
    high-availability.storageDir: s3://flink-**-**/cp-**-**/ha
    jobmanager.archive.fs.dir: s3://flink-**-**/cp-**-**/completed-jobs
    historyserver.archive.fs.dir: s3://flink-**-**/cp-**-**/completed-jobs
    state.backend.type: hashmap
    state.checkpoints.dir: s3://flink-**-**/cp-**-**/checkpoint/
    state.savepoints.dir: s3://flink-**-**/cp-**-**/savepoint/
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
  jobManager:
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: cp-**-**-jm
        namespace: flink
        labels:
          deploy.artifact.io/kind: 'flink'
          deploy.artifact.io/name: 'cp-**-**'
      spec:
        containers:
          # Do not change the main container name
          - name: flink-main-container
            ports:
              - name: rpc
                containerPort: 6123
              - name: blob
                containerPort: 6124
              - name: ui
                containerPort: 8081
              - name: metric
                containerPort: 9249
            env:
              - name: module
                value: 'cp-**-**'
              - name: ENV
                value: 'dev'
              - name: FLINK_VER
                value: '1.19.0'
            readinessProbe:
              httpGet:
                path: /
                port: 8081
              initialDelaySeconds: 30
            volumeMounts:
              - name: cp-**-**-config
                mountPath: /etc/flink
        volumes:
          - name: cp-**-**-config
            configMap:
              name: cp-**-**-config
  taskManager:
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: cp-**-**-tm
        namespace: flink
        labels:
          deploy.artifact.io/kind: 'flink'
          deploy.artifact.io/name: 'cp-**-**'
      spec:
        serviceAccountName: cp-**-**
        containers:
          - name: flink-main-container
            ports:
              - name: data
                containerPort: 6121
              - name: rpc
                containerPort: 6122
              - name: metric
                containerPort: 9249
            env:
              - name: module
                value: 'cp-**-**'
              - name: ENV
                value: 'dev'
              - name: FLINK_VER
                value: '1.19.0'
            volumeMounts:
              - name: cp-**-**-config
                mountPath: /etc/flink
        volumes:
          - name: cp-**-**-config
            configMap:
              name: cp-**-**-config
  job:
    jarURI: local:///opt/flink/*******-0.2.1-snapshot.jar
    parallelism: 4
    allowNonRestoredState: False
    upgradeMode: 'savepoint'
    state: 'running'
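Both manifests are applied in the usual way and the operator reconciles the deployment (the file names here are placeholders):
Shell
# Apply the ConfigMap first, then the FlinkDeployment
kubectl apply -f cp-config.yaml
kubectl apply -f cp-deployment.yaml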
6. Monitoring
Reference:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/
This requires Grafana and Prometheus; everything here is deployed through Argo CD, which works very nicely.
Grafana
Default credentials: Username: admin
Password: admin
Shell
kubectl create secret generic grafana --from-literal=username=admin --from-literal=admin-password=admin -n monitor
kubectl create secret generic grafana --from-literal=admin-username=admin --dry-run=client -o yaml | kubectl apply -f - -n monitor
argocd app create grafana \
--repo https://github.com/***** \
--path flink/argocd-dos-flink/apps/grafana \
--dest-server https://kubernetes.default.svc --dest-namespace ${NS}
Prometheus
Shell
argocd app create prometheus \
--repo https://github.com/******* \
--path flink/argocd-dos-flink/apps/prometheus \
--dest-server https://kubernetes.default.svc --dest-namespace ${NS}
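Once both applications are created, their sync status can be checked with the Argo CD CLI (assuming it is logged in to the same Argo CD instance):
Shell
# Apps should report Synced/Healthy; the pods land in the monitor namespace
argocd app list
kubectl -n monitor get pods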
Add the Prometheus exporter to the Flink image and rebuild the Dockerfile, following step 4.1
Shell
FROM --platform=linux/amd64 flink:1.19.0
# Add the job jar
ADD *******-0.1.1-SNAPSHOT.jar /opt/flink/*******-0.1.1-SNAPSHOT.jar
# Dependency jars
# AWS configuration reader
ADD aws-kinesisanalytics-runtime-1.2.0.jar /opt/flink/lib/
# Prometheus metrics reporter
ADD flink-metrics-prometheus-1.19.1.jar /opt/flink/lib/
Add the monitoring settings to Flink
Add the metrics-reporting configuration to the Flink operator's values.yaml (the flink-kubernetes-operator installed in section 2).
YAML
defaultConfiguration:
  # If set to true, creates ConfigMaps/VolumeMounts. If set to false, no configuration will be created.
  # All below fields will be ignored if create is set to false.
  create: true
  # If set to true,
  # (1) loads the built-in default configuration
  # (2) appends the below flink-conf and logging configuration overrides
  # If set to false, loads just the overrides as in (2).
  # This option has no effect, if create is equal to false.
  append: true
  flink-conf.yaml: |+
    # Flink Config Overrides
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE
    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.observer.progress-check.interval: 5 s
    metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
    metrics.reporter.promgateway.hostUrl: http://a41093c9812b64ed7957852531a4ad74-1203801685.ap-southeast-1.elb.amazonaws.com:9091
    metrics.reporter.promgateway.jobName: flink
    metrics.reporter.promgateway.randomJobNameSuffix: true
    metrics.reporter.promgateway.deleteOnShutdown: false
    metrics.reporter.promgateway.interval: 60 SECONDS
The promgateway hostUrl is the Service address from step 6.1.
After updating the configuration, upgrade the flink-operator release
Shell
helm --namespace flink upgrade flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -f values.yaml
helm --namespace flink upgrade flink-kubernetes-operator .
Re-run the Flink job from section 5.
Open the Pushgateway address and check whether the Flink job metrics are arriving:
http://a41093c9812b64ed7957852531a4ad74-1203801685.ap-southeast-1.elb.amazonaws.com:9091
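A quick way to confirm delivery is to query the Pushgateway's metrics endpoint, where pushed Flink metrics appear under the configured jobName:
Shell
# Pushed Flink metrics should show up in the Pushgateway output
curl -s http://a41093c9812b64ed7957852531a4ad74-1203801685.ap-southeast-1.elb.amazonaws.com:9091/metrics | grep -i flink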
Dashboard templates: https://grafana.com/grafana/dashboards/?search=flink
Done.