
Big Data Ops in Practice, Part 105: Smoothly Migrating AWS Flink to K8s

涤生-树哥 涤生大数据
2024-12-05


1. Flink on K8s

Flink on K8s is an efficient deployment mode for Apache Flink that combines the stream-processing engine with the container orchestrator Kubernetes (K8s), delivering highly available, high-performance, and elastic data processing. Flink's efficient stream and batch processing complements K8s's automated container deployment, scaling, and management. Running a Flink cluster on K8s not only lets you use K8s autoscaling to adjust the number of cluster nodes according to business demand and resource utilization, but also leverages K8s failover and replica mechanisms to improve cluster availability.

As an efficient, flexible, and reliable data-processing solution, Flink on K8s is already widely used in finance, e-commerce, smart manufacturing, IoT, and other domains. It can process real-time data streams to power streaming data warehouses, drive real-time monitoring and alerting, and handle underlying big-data processing. Flink on K8s will continue to play to these strengths and help enterprises with digital transformation and intelligent upgrades.

2. Flink-Operator

Flink-Operator is a tool for automating the deployment lifecycle of Apache Flink. It is built on the Java Operator SDK and integrates natively with Kubernetes. It suits large production environments that need automated management of Flink deployments, helping teams reduce operational cost and improve deployment efficiency and reliability. Its main characteristics:

Automated management: Flink-Operator automates the lifecycle of Flink deployments, reducing the need for manual intervention.

Kubernetes integration: native integration with Kubernetes makes it easy to deploy Flink applications on a Kubernetes cluster.

Customizability: configuration options let users tailor Flink-Operator's behavior to their own needs.

High performance: mechanisms such as operator chaining improve the execution performance of Flink jobs.
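Because the operator works declaratively, a job is described entirely by a FlinkDeployment custom resource. As a rough illustration of that model (the validator below is our own sketch, not part of the operator; full manifests appear later in this article), a resource can be sanity-checked before applying it:

```python
def validate_flink_deployment(cr: dict) -> list:
    """Return a list of problems found in a FlinkDeployment custom resource."""
    problems = []
    if cr.get("apiVersion") != "flink.apache.org/v1beta1":
        problems.append("unexpected apiVersion")
    if cr.get("kind") != "FlinkDeployment":
        problems.append("kind must be FlinkDeployment")
    spec = cr.get("spec", {})
    # Fields an application-mode deployment is expected to carry
    for field in ("image", "flinkVersion", "serviceAccount", "job"):
        if field not in spec:
            problems.append(f"spec.{field} missing")
    return problems

# A minimal, hypothetical resource
cr = {
    "apiVersion": "flink.apache.org/v1beta1",
    "kind": "FlinkDeployment",
    "spec": {
        "image": "flink:1.19",
        "flinkVersion": "v1_19",
        "serviceAccount": "flink",
        "job": {"jarURI": "local:///opt/flink/examples/streaming/StateMachineExample.jar"},
    },
}
print(validate_flink_deployment(cr))  # -> []
```

In practice the operator's admission webhook performs the real validation; a pre-check like this is only useful in CI pipelines before `kubectl apply`.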

Reference: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/helm/

Installation steps: download the resources

```shell
# Install cert-manager
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
# Add the Helm repo
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
# Pull the chart for a local install (optional)
helm pull flink-operator-repo/flink-kubernetes-operator
```

Install

```shell
helm --namespace flink install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# Local install
helm --namespace flink install flink-kubernetes-operator .
# Upgrade
# helm --namespace flink upgrade flink-kubernetes-operator .
# Verify the installation
% kubectl get pods
NAME                                        READY   STATUS    RESTARTS   AGE
flink-kubernetes-operator-d8546dfff-l9t8w   2/2     Running   0          38s
```


3. Demo Verification

3.1 Flink Demo

Run the official demo to verify that the installation succeeded.

1. Download the demo

```shell
wget https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.8/examples/basic.yaml
```

2. The demo manifest

Adjust the version and image fields (the default Flink version is 1.17) and set the namespace.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
  namespace: flink
spec:
  image: flink:1.19
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
```
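As a quick sanity check on this spec: the number of TaskManager pods the cluster spins up follows from the job parallelism and the slots per TaskManager. A minimal sketch of that arithmetic (the helper name `required_taskmanagers` is ours, for illustration):

```python
import math

def required_taskmanagers(parallelism: int, slots_per_tm: int) -> int:
    """TaskManager pods needed to provide `parallelism` task slots."""
    return math.ceil(parallelism / slots_per_tm)

# basic-example: job parallelism 2, taskmanager.numberOfTaskSlots "2"
print(required_taskmanagers(2, 2))  # -> 1 TaskManager pod
```

This matches the single `basic-example-taskmanager-1-1` pod seen in the next step.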

3. Check the pods and services

```shell
maoshu@maoshu-MacBook flink-examples % kubectl get pods
NAME                                        READY   STATUS    RESTARTS   AGE
basic-example-d86f5478-d5bz8                1/1     Running   0          2m4s
basic-example-taskmanager-1-1               1/1     Running   0          101s
flink-kubernetes-operator-d8546dfff-l9t8w   2/2     Running   0          10m
maoshu@maoshu-MacBook flink-examples % kubectl get svc
NAME                             TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
basic-example                    ClusterIP   None            <none>        6123/TCP,6124/TCP   2m8s
basic-example-rest               ClusterIP   172.20.67.39    <none>        8081/TCP            2m8s
flink-operator-webhook-service   ClusterIP   172.20.148.23   <none>        443/TCP             10m
```

Port-forward the service locally to view the web UI:

```shell
% kubectl port-forward svc/basic-example-rest 8081
Forwarding from 127.0.0.1:8081 -> 8081
Forwarding from [::1]:8081 -> 8081
```

Open the web UI to inspect the job.

3.2 Running your own job

1. Build the image

Create a directory for the image build and add a Dockerfile; for example:

```dockerfile
FROM --platform=linux/amd64 flink:1.19.0
ADD WordCount.jar /opt/flink/examples/WordCount.jar
```

We use Flink 1.19.0 and copy our jar into the image. If the job has dependencies, copy them into /opt/flink/lib as well.

Build the image

```shell
docker build -t flink .
```

2. Push the image to ECR

Here we use AWS ECR as the image registry; any equivalent service can be substituted. Add the registry information in AWS ECR, then push the image.

Log in to ECR

```shell
aws ecr get-login-password --region ap-southeast-1 | docker login --username AWS --password-stdin ***.dkr.ecr.ap-southeast-1.amazonaws.com
```

Retag the image built in step 1

```shell
% docker tag flink:latest **.dkr.ecr.ap-southeast-1.amazonaws.com/flink:1.19.0
% docker images | grep flink
```

Push the image to ECR

```shell
docker push ***.dkr.ecr.ap-southeast-1.amazonaws.com/flink:1.19.0
```

3. Create the EKS job

Create the job YAML; the demo in 3.1 can be used as a reference.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-pulsar-demo
  namespace: flink
spec:
  image: ***.dkr.ecr.ap-southeast-1.amazonaws.com/flink:1.19.0
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "4096m"
      cpu: 2
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: local:///opt/flink/examples/WordCount.jar
    parallelism: 2
    upgradeMode: stateless
```

Run the job: kubectl apply -f flink-pulsar-demo.yaml

4. Check the job

Check the job in the web UI, as in 3.1.

4. Migrating the AWS Job

4.1 Updating the image

In the AWS Flink job, look through the configuration to find the S3 location of your jar.

S3 address of the compiled jar: https://f***-ap-southeast-1-***-bucket.s3.ap-southeast-1.amazonaws.com/d***-0.1.1-SNAPSHOT.jar

Download the code package or jar locally, build the image, and push it. The Dockerfile is shown below; note that the dependency jar must be added as well.

```dockerfile
FROM --platform=linux/amd64 flink:1.19.0
# Add the job jar
ADD *******-0.1.1-SNAPSHOT.jar /opt/flink/*******-0.1.1-SNAPSHOT.jar
# Add the dependency jar
ADD aws-kinesisanalytics-runtime-1.2.0.jar /opt/flink/lib/
```

Build the image and push it to ECR

```shell
# Rebuild the image
docker build -t flink:******* .
docker tag flink:******* *****.dkr.ecr.ap-southeast-1.amazonaws.com/flink:*******_v1
docker push *****.dkr.ecr.ap-southeast-1.amazonaws.com/flink:*******_v1
```
4.2 Running and updating the job

A reference YAML for launching the job; set the jar path and version according to step 4.1.

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-**-**-**
  namespace: flink
spec:
  image: *****.dkr.ecr.ap-southeast-1.amazonaws.com/flink:*******_v1
  flinkVersion: v1_19
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "4096m"
      cpu: 2
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
  job:
    jarURI: local:///opt/flink/*******-0.1.1-SNAPSHOT.jar
    parallelism: 2
    upgradeMode: stateless
```

```shell
# Start on EKS
kubectl delete -f flink-demo.yaml
kubectl apply -f flink-demo.yaml
```

Once started, check the job and set up port-forwarding to view it:

```shell
% kubectl get pods
NAME                              READY   STATUS    RESTARTS   AGE
flink-**-**-**-7ffc6bf84f-6d9t8   1/1     Running   0          11m
flink-**-**-**-taskmanager-1-1    1/1     Running   0          11m
flink-**-**-**-taskmanager-1-2    1/1     Running   0          11m
flink-**-**-**-taskmanager-1-3    1/1     Running   0          11m
% kubectl get svc
NAME                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
flink-**-**-**        ClusterIP   None            <none>        6123/TCP,6124/TCP   11m
flink-**-**-**-rest   ClusterIP   172.20.42.151   <none>        8081/TCP            11m
% kubectl port-forward svc/flink-**-**-**-rest 8081
Forwarding from 127.0.0.1:8081 -> 8081
Forwarding from [::1]:8081 -> 8081
```

View the web UI at http://127.0.0.1:8081. A perfect finish.

5. Local Flink Configuration

This section migrates the config settings of the Flink job on AWS.

5.1 ConfigMap

Create a ConfigMap. The program reads its configuration through the kinesisanalytics runtime, whose logic is to read a config.properties file; by default the file's contents look like this:

Sample data:

```json
[
  {
    "PropertyGroupId": "Stream1",
    "PropertyMap": { "Region": "us-east-1" }
  },
  {
    "PropertyGroupId": "Stream2",
    "PropertyMap": { "Region": "us-east-2", "Offset": "latest" }
  }
]
```
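The aws-kinesisanalytics-runtime library resolves these property groups by their PropertyGroupId. The lookup logic can be sketched in plain Python (the helper `get_property_group` is illustrative only, not the library's actual API):

```python
import json

SAMPLE = """
[
  {"PropertyGroupId": "Stream1", "PropertyMap": {"Region": "us-east-1"}},
  {"PropertyGroupId": "Stream2", "PropertyMap": {"Region": "us-east-2", "Offset": "latest"}}
]
"""

def get_property_group(config_text: str, group_id: str) -> dict:
    """Return the PropertyMap of the group with the given PropertyGroupId."""
    for group in json.loads(config_text):
        if group["PropertyGroupId"] == group_id:
            return group["PropertyMap"]
    raise KeyError(group_id)

print(get_property_group(SAMPLE, "Stream2")["Offset"])  # -> latest
```

The same shape carries over to the ConfigMap below: whatever JSON the ConfigMap mounts is what the job reads at startup.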

Accordingly, referring to the AWS Flink job configuration, the migrated ConfigMap looks like this:

```yaml
# ConfigMap for Flink configuration
apiVersion: v1
kind: ConfigMap
metadata:
  name: cp-**-**-config
  namespace: flink
data:
  application_properties.json: |-
    [{"PropertyGroupId":"RealtimeCoupon",
      "PropertyMap":{
        "coupon.job.window-process": "window",
        "dos.coupon.topic": "**-**-**-loadtest",
        ********
        "sink.parallelism": 6,
        "source.parallelism": 8,
        "window.parallelism": 12}
    }]
```
5.2 Job Demo
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: cp-**-**
  namespace: flink
  labels:
    deploy.artifact.io/kind: 'flink'
    deploy.artifact.io/name: 'cp-**-**'
spec:
  image: *****.dkr.ecr.ap-southeast-1.amazonaws.com/flink:uat-cp-**-**-v2
  imagePullPolicy: Always
  flinkVersion: v1_19
  flinkConfiguration:
    jobmanager.memory.process.size: "1g"
    kubernetes.jobmanager.memory.limit-factor: "1.5"
    kubernetes.jobmanager.cpu.amount: "1"
    kubernetes.jobmanager.cpu.limit-factor: "8"
    taskmanager.memory.process.size: "1g"
    kubernetes.taskmanager.memory.limit-factor: "1.33"
    kubernetes.taskmanager.cpu.amount: "1"
    kubernetes.taskmanager.cpu.limit-factor: "8"
    parallelism.default: "4"
    taskmanager.data.port: "12000"
    high-availability.storageDir: s3://flink-**-**/cp-**-**/ha
    jobmanager.archive.fs.dir: s3://flink-**-**/cp-**-**/completed-jobs
    historyserver.archive.fs.dir: s3://flink-**-**/cp-**-**/completed-jobs
    state.backend.type: hashmap
    state.checkpoints.dir: s3://flink-**-**/cp-**-**/checkpoint/
    state.savepoints.dir: s3://flink-**-**/cp-**-**/savepoint/
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
  jobManager:
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: cp-**-**-jm
        namespace: flink
        labels:
          deploy.artifact.io/kind: 'flink'
          deploy.artifact.io/name: 'cp-**-**'
      spec:
        containers:
          # Do not change the main container name
          - name: flink-main-container
            ports:
              - name: rpc
                containerPort: 6123
              - name: blob
                containerPort: 6124
              - name: ui
                containerPort: 8081
              - name: metric
                containerPort: 9249
            env:
              - name: module
                value: 'cp-**-**'
              - name: ENV
                value: 'dev'
              - name: FLINK_VER
                value: '1.19.0'
            readinessProbe:
              httpGet:
                path: /
                port: 8081
              initialDelaySeconds: 30
            volumeMounts:
              - name: cp-**-**-config
                mountPath: /etc/flink
        volumes:
          - name: cp-**-**-config
            configMap:
              name: cp-**-**-config
  taskManager:
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: cp-**-**-tm
        namespace: flink
        labels:
          deploy.artifact.io/kind: 'flink'
          deploy.artifact.io/name: 'cp-**-**'
      spec:
        serviceAccountName: cp-**-**
        containers:
          - name: flink-main-container
            ports:
              - name: data
                containerPort: 6121
              - name: rpc
                containerPort: 6122
              - name: metric
                containerPort: 9249
            env:
              - name: module
                value: 'cp-**-**'
              - name: ENV
                value: 'dev'
              - name: FLINK_VER
                value: '1.19.0'
            volumeMounts:
              - name: cp-**-**-config
                mountPath: /etc/flink
        volumes:
          - name: cp-**-**-config
            configMap:
              name: cp-**-**-config
  job:
    jarURI: local:///opt/flink/*******-0.2.1-snapshot.jar
    parallelism: 4
    allowNonRestoredState: False
    upgradeMode: 'savepoint'
    state: 'running'
```
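The `kubernetes.*.limit-factor` options above set the pod resource limits as a multiple of the requested amount (limit = request × factor). A small sketch of that arithmetic, under that assumption (the helper `pod_limits` is ours):

```python
def pod_limits(request_mem_mb: float, mem_factor: float,
               request_cpu: float, cpu_factor: float) -> tuple:
    """Derive pod resource limits from requests and limit factors."""
    return (request_mem_mb * mem_factor, request_cpu * cpu_factor)

# JobManager above: jobmanager.memory.process.size "1g" with factor 1.5,
# cpu amount 1 with factor 8
print(pod_limits(1024, 1.5, 1, 8))  # -> (1536.0, 8): 1.5 GiB memory limit, 8 CPUs
```

Letting the limit exceed the request like this allows short bursts without permanently reserving the extra capacity on the node.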

6. Monitoring

6.1 Monitoring architecture

Reference:

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/metric_reporters/

6.2 Prerequisites

Grafana and Prometheus are required; everything is deployed through Argo CD, which works very nicely.

Grafana

Default credentials: Username: admin, Password: admin
```shell
kubectl create secret generic grafana --from-literal=username=admin --from-literal=admin-password=admin -n monitor
kubectl create secret generic grafana --from-literal=admin-username=admin --dry-run=client -o yaml | kubectl apply -f - -n monitor

argocd app create grafana \
  --repo https://github.com/***** \
  --path flink/argocd-dos-flink/apps/grafana \
  --dest-server https://kubernetes.default.svc --dest-namespace ${NS}
```

Prometheus

```shell
argocd app create prometheus \
  --repo https://github.com/******* \
  --path flink/argocd-dos-flink/apps/prometheus \
  --dest-server https://kubernetes.default.svc --dest-namespace ${NS}
```
6.3 Updating the Flink image

Add the Prometheus exporter jar to the Flink image and rebuild with the Dockerfile from 4.1:

```dockerfile
FROM --platform=linux/amd64 flink:1.19.0
# Add the job jar
ADD *******-0.1.1-SNAPSHOT.jar /opt/flink/*******-0.1.1-SNAPSHOT.jar
# Dependency: AWS config reader
ADD aws-kinesisanalytics-runtime-1.2.0.jar /opt/flink/lib/
# Dependency: metrics reporter
ADD flink-metrics-prometheus-1.19.1.jar /opt/flink/lib/
```

Add the monitoring parameters to Flink

Add the metric reporter settings in the operator's values.yaml (see the flink-operator chart from section 2).

```yaml
defaultConfiguration:
  # If set to true, creates ConfigMaps/VolumeMounts. If set to false, no configuration will be created.
  # All below fields will be ignored if create is set to false.
  create: true
  # If set to true,
  # (1) loads the built-in default configuration
  # (2) appends the below flink-conf and logging configuration overrides
  # If set to false, loads just the overrides as in (2).
  # This option has no effect if create is set to false.
  append: true
  flink-conf.yaml: |+
    # Flink Config Overrides
    kubernetes.operator.metrics.reporter.slf4j.factory.class: org.apache.flink.metrics.slf4j.Slf4jReporterFactory
    kubernetes.operator.metrics.reporter.slf4j.interval: 5 MINUTE

    kubernetes.operator.reconcile.interval: 15 s
    kubernetes.operator.observer.progress-check.interval: 5 s

    metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
    metrics.reporter.promgateway.hostUrl: http://a41093c9812b64ed7957852531a4ad74-1203801685.ap-southeast-1.elb.amazonaws.com:9091
    metrics.reporter.promgateway.jobName: flink
    metrics.reporter.promgateway.randomJobNameSuffix: true
    metrics.reporter.promgateway.deleteOnShutdown: false
    metrics.reporter.promgateway.interval: 60 SECONDS
```
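The PrometheusPushGatewayReporter configured above periodically pushes metrics to the gateway in Prometheus's text exposition format. A rough sketch of how one such line is rendered (the metric and label names are illustrative, not the reporter's exact output):

```python
def exposition_line(name: str, labels: dict, value: float) -> str:
    """Render one metric in Prometheus text exposition format."""
    label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    return f"{name}{{{label_str}}} {value}"

line = exposition_line("flink_jobmanager_numRunningJobs", {"job_name": "flink"}, 1)
print(line)  # flink_jobmanager_numRunningJobs{job_name="flink"} 1
```

With `randomJobNameSuffix: true`, each reporter instance pushes under a unique job name, so metrics from concurrent JobManagers and TaskManagers do not overwrite one another in the gateway.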

The promgateway hostUrl is the address of the pushgateway service created by the Prometheus installation above.

After updating the configuration, upgrade the flink-operator release:

```shell
helm --namespace flink upgrade flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -f values.yaml
# Or from the local chart:
helm --namespace flink upgrade flink-kubernetes-operator .
```

Then re-run the Flink job from section 5.

6.4 Confirming the pushgateway receives metrics

Open the pushgateway address and check whether the Flink job metrics are being received:

http://a41093c9812b64ed7957852531a4ad74-1203801685.ap-southeast-1.elb.amazonaws.com:9091

6.5 Configuring a Grafana dashboard

Dashboard templates: https://grafana.com/grafana/dashboards/?search=flink

All done.
