大数据运维实战106:Celeborn 优化 Spark Shuffle 性能实战
1.背景
在大数据处理的生态中,Apache Spark作为一种强大的数据处理引擎,已经被广泛应用于各种数据分析和计算任务。尽管 Spark 提供了强大的数据处理能力,但在处理大规模 Shuffle 时,尤其是在高负载的环境下,性能瓶颈常常显现。公司每天的 Spark 作业 Shuffle 量达到 1PB ,加上存算一体的模式,现在的节点存储水位普遍高于 60% ,特别是月更任务单个 stage 的Shuffle量达到了 40TB,这种情况对单个作业和集群整体的稳定性和性能都带来了极大的挑战。为了应对这些问题,我们考虑将 Spark 的 Shuffle 机制从 ESS (External Shuffle Service) 迁移到 Celeborn,一个新兴的高效 Shuffle 解决方案。本文将介绍这一迁移过程中的技术细节和实际经验。
2.Celeborn 的选择
2.Celeborn 的选择
Apache Celeborn 是一款高性能的存算分离存储系统,专门设计用于加速 Apache Spark 的数据处理任务。Celeborn 通过优化 Shuffle 过程中的数据管理和传输,显著提升了 Spark 应用程序的性能。以下是 Celeborn 如何优化 Spark Shuffle 的几个关键方面:
1. 存算分离架构
独立存储层:Celeborn 引入了一个独立的存储层,与计算节点解耦。这意味着 Shuffle 数据可以在专用的存储节点上进行管理和持久化,而不是在计算节点上进行,从而减轻计算节点的 I/O 压力。
灵活的扩展性:Celeborn 的存算分离架构使得存储和计算资源可以根据需求独立扩展,从而更好地利用资源。
2. 高效的数据序列化与反序列化
优化的数据格式:Celeborn 使用高效的序列化/反序列化机制,减少数据转换过程中的开销。这有助于提高数据处理的速度。
压缩技术:通过使用高效的压缩算法,Celeborn 减少了网络传输的数据量,从而加快了 Shuffle 过程。
3. 快速的数据读写
快速写入模式:Celeborn 提供了快速写入模式,通过使用 UnsafeRow 数据结构来加速写操作。这种模式通过减少类型检查和动态调度来提高性能。
并发写入:Celeborn 支持并发写入,允许多个任务同时向存储系统写入数据,从而提高写入吞吐量。
4. 智能的数据重分布
细粒度的数据分区:Celeborn 通过对数据进行细粒度的分区,使得数据能够更加均匀地分布在各个计算节点上,避免了数据倾斜问题。
智能调度:Celeborn 的调度机制可以根据实时负载情况动态调整数据的分布,确保计算节点能够高效地获取所需的数据。
5. 减少数据冗余
去重机制:Celeborn 在存储层实现了数据去重机制,避免了不必要的数据复制,减少了存储空间的使用。
增量更新:Celeborn 支持增量更新机制,只存储变化的数据部分,而不是整个数据集,从而减少了存储和传输的开销。
6. 容错与恢复
快速故障恢复:Celeborn 设计了快速的故障恢复机制,能够在数据丢失或节点故障时迅速恢复数据,保证系统的高可用性。
数据一致性:通过分布式事务管理和一致性协议,Celeborn 确保了数据的一致性和完整性。
7. 易用性和集成
无缝集成:Celeborn 能够无缝集成到现有的 Spark 应用程序中,无需对应用程序进行重大更改即可享受性能提升的好处。
丰富的监控工具:Celeborn 提供了丰富的监控工具和仪表板,方便用户监控 Shuffle 过程中的各项指标,及时发现并解决问题。
3.Celeborn 的部署
3.Celeborn 的部署
Celeborn 的架构主要包括以下几个组件:
1. CelebornMaster:
负责管理整个Celeborn集群的资源分配。
使用Raft协议来确保状态的一致性和高可用性。
2. CelebornWorker:
负责处理来自客户端的数据读写请求。
在shuffle操作期间,Worker节点将处理来自各个计算节点的任务请求,并执行数据的合并与持久化。
3. CelebornClient:
运行在计算框架(如Spark或Flink)的任务执行节点上。
作为计算框架与Celeborn之间的桥梁,负责向Celeborn发起shuffle相关的请求。
支持的部署方式:
Celeborn 可以以不同的方式部署,以适应不同的需求和环境:
独立部署:将celeborn master 和 woker 独立部署在一套硬件资源上,这种可以将 shuffer 部分数据的 io 完全解耦出来,保证了 shuffer 过程的稳定性。
混合部署:可以将Celeborn集群与YARN集群混合部署,这种比较常见的是在 nm 节点独立一块或者多块磁盘硬件单独给 celeborn 使用。
Kubernetes环境下的部署:由于Celeborn支持计算与存储分离架构,因此它可以被部署到特殊的硬件环境中,甚至与计算集群分离,这在Kubernetes环境下特别有用。
spark版本的兼容性:
目前Celeborn兼容的Spark版本至少包括2.4。
tips:Celeborn 服务器与各种引擎内的所有客户端兼容。但Celeborn客户端必须与指定引擎的版本一致。例如,如果运行的是 Spark 2.4,则必须使用 -Pspark-2.4 编译 Celeborn 客户端;
4.Spark 作业的配置
修改配置文件:
为了让Spark使用Celeborn作为其shuffle服务,需要修改Spark的配置文件。以下是一些重要的配置项:
spark.shuffle.manager org.apache.spark.shuffle.celeborn.SparkShuffleManager
#这表示Spark将使用Celeborn作为shuffle manager。这意味着shuffle操作将由Celeborn处理,而不是默认的Hadoop Distributed Cache或其他机制。
spark.serializer org.apache.spark.serializer.KryoSerializer
#这里指定了Spark使用Kryo序列化器。Kryo是一个高效、快速的序列化库,通常用于提高Spark应用的性能。
spark.shuffle.service.enabled false
#此配置表明默认情况下不启用shuffle服务。如果设置为true,则表示启用Celeborn Shuffle Service。
spark.celeborn.master.endpoints 10.90.73.52:9097,10.90.73.41:9097,10.90.72.243:9097
#这个配置指定了Celeborn Master的地址列表。这些地址通常是Celeborn集群中Master节点的地址和端口。
spark.celeborn.client.spark.shuffle.writer sort
#该配置表明当写入shuffle数据时,使用排序(sort)的方式。这意味着在写入shuffle数据前,数据会被排序。
spark.celeborn.client.push.replicate.enabled false
#如果设置为true,则表明启用push复制功能。这意味着当数据被推送到一个节点时,也会被复制到其他节点。这里的false意味着禁用此功能。
spark.celeborn.client.spark.fetch.throwsFetchFailure true
#此配置表明当从Celeborn检索数据失败时,是否抛出异常。如果是true,则表示在发生错误时会抛出异常。
spark.sql.adaptive.enabled true
#此配置开启Spark SQL的自适应查询执行(AQE),它允许Spark在运行时调整查询计划以优化性能。
spark.sql.adaptive.skewJoin.enabled true
#这是AQE的一个特性,用于处理倾斜的join操作,即当一个表的某些分区远大于其他分区时,可以更有效地处理join操作。
spark.celeborn.client.push.timeout 480s
#设置push操作的超时时间为480秒(8分钟)。如果在指定时间内push操作没有完成,则会超时。
spark.celeborn.data.io.connectionTimeout 720s
#设置Celeborn数据IO连接的超时时间为720秒(12分钟)。如果连接在指定时间内没有建立,则会超时。
spark.shuffle.sort.initialBufferSize 12288
#定义初始缓冲区大小为12288字节。这是在执行排序时用于内存中数据的初始缓冲区大小。
spark.celeborn.client.spark.push.sort.memory.threshold 128m
#定义在执行push操作时,用于排序的内存阈值为128MB。当超出这个阈值时,可能会触发磁盘排序。
spark.celeborn.rpc.lookupTimeout 360s
#设置RPC查找的超时时间为360秒(6分钟)。这是在尝试查找远程过程调用时等待的最大时间。
支持Spark动态分配
对于 Spark 版本 < 3.5.0,官网提供了一个补丁,使用户能够将 Spark 与 DRA 和 Celeborn 一起使用。
Spark2 的补丁:https://github.com/apache/celeborn/blob/main/assets/spark-patch/Celeborn_Dynamic_Allocation_spark2.patch
5.celeborn 监控
5.celeborn 监控
服务组件监控
Apache Celeborn 提供了多种监控机制,包括通过 Metrics 和 REST API 来监控其组件的状态和性能。以下是如何使用 Metrics 和 REST API 来监控 Celeborn 的步骤简介:
Metrics 监控
Celeborn 支持暴露 Prometheus 格式的 Metrics,这样就可以使用 Prometheus 或其他支持 Prometheus 格式的监控工具来收集和监控 Celeborn 的指标。
配置 Metrics
1. 启用 Metrics和暴露 Metrics 端点:
编辑 CelebornMaster 和 CelebornWorker 的配置文件(如 celeborn-master.conf 和 celeborn-worker.conf)。
确认已启用 Metrics 输出。通常情况下,Celeborn 默认启用了 Metrics 输出。
Celeborn 默认在 /metrics 端点暴露 Metrics 数据。确保 Celeborn 的配置文件中包含以下内容:
# 在 celeborn-master.conf 和 celeborn-worker.conf 中
metrics.reporter=com.celeborn.metrics.prometheus.PrometheusReporter
metrics.reporter.interval=5
• 这里 metrics.reporter.interval 表示 Metrics 报告的间隔时间(单位为秒)。
2.配置 Prometheus
# Prometheus example config
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: "Celeborn"
metrics_path: /metrics/prometheus
scrape_interval: 15s
static_configs:
- targets: [ "master-ip:9098","worker1-ip:9096","worker2-ip:9096","worker3-ip:9096","worker4-ip:9096" ]
3. REST API 监控
Celeborn 提供了 REST API 接口来获取有关集群状态的信息,这些信息可用于监控 Celeborn 的健康状况。
REST API 端点:https://celeborn.apache.org/docs/latest/restapi/#master
日志监控
对Apache Celeborn的日志进行监控是确保系统健康运行的关键部分。Celeborn的日志可以帮助了解系统的运行状况、性能瓶颈以及潜在问题。
使用Filebeat采集数据:
Filebeat 是 主要用于收集、转发文件中的日志数据。使用 Filebeat 可以方便地将 Apache Celeborn 的日志数据集中到 kafka 中,进而写入到Elasticsearch 中,并使用 Kibana 进行可视化分析。
以下是使用 Filebeat 采集 Celeborn 日志的流程架构图:
Filebeat 的配置文件样例:
filebeat.inputs:
- type: log
enabled: true
paths:
- /path/to/celeborn/logs/*.log
output.kafka:
enabled: true
hosts: ["localhost:9092"]
topic_id: "celeborn-logs"
# 设置最大重试次数
max_retries: 5
# 设置重试间隔
retry_backoff: "5s"
# 设置批量发送大小
batch_max_size: 1000
# 设置批量发送最大延迟时间
batch_max_bytes: "10485760" # 10MB
# 设置批量发送超时时间
timeout: "10s"
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
keepfiles: 7
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
host: "localhost:5601"
setup.ilm.enabled: false
使用 log4j2 Kafka Appender 写入 kafka:
使用 Log4j 将日志输出到 Kafka 是一种常见的做法,尤其是在需要实时处理日志数据的情况下。Log4j 2 提供了原生的支持来将日志直接发送到 Kafka。以下是使用 Log4j 2 将日志输出到 Kafka 的流程架构图。
tips:
celeborn release 的时候,里面并没有包含 kafka 的依赖,所以如果需要将日志输出到 kafka ,需要自己build 一下,把 kafka 依赖添加到 pom 里面,并处理可能的依赖冲突问题。
日志输出中默认的不包含主机的 ip 或者主机名信息,需要自己通过env 获取变量传入。
log4j2 的配置文件样例:
<Configuration status="INFO">
<Appenders>
<Console name="stdout" target="SYSTEM_OUT">
<!--
~ In the pattern layout configuration below, we specify an explicit `%ex` conversion
~ pattern for logging Throwables. If this was omitted, then (by default) Log4J would
~ implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
~ class packaging information. That extra information can sometimes add a substantial
~ performance overhead, so we disable it in our default logging config.
-->
<PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/>
</Console>
<RollingRandomAccessFile name="file" fileName="${env:CELEBORN_LOG_DIR}/celeborn.log"
filePattern="${env:CELEBORN_LOG_DIR}/celeborn.log.%d-%i">
<PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} ${env:CELEBORN_HOSTNAME} %p [%t] %c{1}: %m%n%ex"/>
<Policies>
<SizeBasedTriggeringPolicy size="200 MB"/>
</Policies>
<DefaultRolloverStrategy max="7">
<Delete basePath="${env:CELEBORN_LOG_DIR}" maxDepth="1">
<IfFileName glob="celeborn.log*">
<IfAny>
<IfAccumulatedFileSize exceeds="1 GB" />
<IfAccumulatedFileCount exceeds="10" />
</IfAny>
</IfFileName>
</Delete>
</DefaultRolloverStrategy>
</RollingRandomAccessFile>
<Kafka name="Kafka" topic="bg_celeborn_logs" syncSend="true" ignoreExceptions="true">
<JsonLayout properties="false" compact="true" includeTimeMillis="true" charset="UTF-8">
<KeyValuePair key="hostname" value="${env:CELEBORN_HOSTNAME}"/>
</JsonLayout>
<Property name="bootstrap.servers">ds-bigdata-001:9092,ds-bigdata-002:9092,ds-bigdata-003:9092</Property>
</Kafka>
</Appenders>
<Loggers>
<Root level="INFO">
<!--
~ Here are appender templates, keep the appender as your need.
-->
<AppenderRef ref="Kafka"/>
<AppenderRef ref="file"/>
</Root>
<Logger name="org.apache.hadoop.hdfs" level="WARN" additivity="false">
<Appender-ref ref="stdout" level="WARN" />
<Appender-ref ref="file" level="WARN"/>
</Logger>
<Logger name="org.apache.ratis.server.RaftServerConfigKeys" level="WARN" additivity="false">
<Appender-ref ref="stdout" level="WARN" />
<Appender-ref ref="file" level="WARN"/>
</Logger>
</Loggers>
</Configuration>
涤生大数据往期精彩推荐
3.运维实战:DolphinScheduler 生产环境升级
4.运维实战:Ambari开发手册-DolphinScheduler集成实操
5.大数据运维实战之Ambari修护Hive无法更换tez引擎
6.大数据平台实践之CDH6.2.1+spark3.3.0+kyuubi-1.6.0
8.运维实战100:CDH5.16.2升级至CDH6.3.2
12.WebHDFS Rest API 企业实战:大佬手把手带你堵住漏洞,企业实例解析
14.大数据实战:Ambari开发手册之OpenTSDB快速集成技巧
18.一站式解析Spark 日志 ,助力开发Spark任务调优诊断