查看原文
其他

大数据运维实战101:Spark作业的监控与深度诊断

涤生-强哥 涤生大数据
2024-12-05

Apache Spark是一个强大的大数据处理框架,广泛应用于数据分析、机器学习和实时数据处理等领域。然而,随着数据规模的扩大和作业复杂性的增加,如何有效地监控和诊断Spark作业的资源使用状态变得尤为重要。本文将探讨Spark作业的常用监控工具、任务的资源状态诊断。 

一、Spark作业监控的重要性

在生产环境中,Spark作业的性能直接影响到数据处理的效率和业务决策的及时性。通过监控作业的运行状态,我们可以:

  1. 及时发现问题:监控可以帮助我们在问题发生的第一时间得到预警,避免业务中断。

  2. 优化资源利用:通过分析作业的资源消耗情况,合理调配集群资源,提高资源的使用效率。

  3. 改进作业性能:通过性能监控与分析,识别瓶颈并进行优化,提升作业的执行速度。

二、Spark常用的作业监控工具

1. Spark Web USpark自带的Web UI是最基本的监控工具。它提供了作业的详细运行信息,包括:

  • 作业状态:展示各个作业的状态(RUNNING、SUCCEEDED、FAILED等)。

  • 任务详情:可以查看每个任务的执行时间、Shuffle读写、GC时间等信息。

  • 阶段信息:展示每个阶段的执行时间和数据传输情况。

2. Ganglia

Ganglia是一个开源的分布式监控系统,能够监控Spark集群的整体健康状况。通过在Spark集群中集成Ganglia,可以获得系统级别的指标,如CPU使用率、内存使用情况、网络流量等。

3. Prometheus + Grafana

Prometheus是一种开源的监控与报警系统,Grafana则是一个可视化工具。将Spark与Prometheus结合使用,可以实时监控Spark作业的性能指标,并通过Grafana进行可视化展示,从而直观地了解作业的运行状态。

这里开源的推荐大家使用graphite_exporter 来收集 spark的实时作业状态,后端存储的话可以结合promethues + VictoriaMetrics。

启用 spark 的 metrics 配置:

# Enable JvmSource for instance master, worker, driver and executormaster.source.jvm.class=org.apache.spark.metrics.source.JvmSourceworker.source.jvm.class=org.apache.spark.metrics.source.JvmSourcedriver.source.jvm.class=org.apache.spark.metrics.source.JvmSourceexecutor.source.jvm.class=org.apache.spark.metrics.source.JvmSource# Sink to graphite*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink*.sink.graphite.protocol=tcp*.sink.graphite.host=172.16.32.1*.sink.graphite.port=9109*.sink.graphite.period=10*.sink.graphite.unit=seconds

promethues 后端存储配置为VictoriaMetrics

remote_write: - url: http://10.84.1.xxx:6480/insert/0/prometheus - url: http://10.84.1.xxx:6480/insert/0/prometheus - url: http://10.84.1.xxx:6480/insert/0/prometheus
write_relabel_configs: - source_labels: [__name__] regex: (spark_app_jvm_memory_usage|spark_app_block_manager|spark_app_executor_diskBytesSpilled_metrics|spark_app_executor_memoryBytesSpilled_metrics|spark_app_filesystem_usage|spark_app_filesystem_usage|spark_app_executor_tasks|spark_app_executor_shuffleBytesWritten_metrics|spark_app_executor_shuffleLocalBytesRead_metrics|spark_app_executor_shuffleLocalBlocksFetched_metrics|spark_app_executor_shuffleFetchWaitTime_metrics|spark_app_executor_shuffleRecordsRead_metrics|spark_app_executor_shuffleRecordsWritten_metrics|spark_app_executor_shuffleRemoteBlocksFetched_metrics|spark_app_executor_shuffleTotalBytesRead_metrics|spark_app_executor_shuffleWriteTime_metrics) action: keep

grafana 前端部分实时任务指标的展示图:

三、常见的性能和资源使用问题

在使用 spark作业时,我们可能会遇到以下常见的性能问题:

1. 任务执行时间过长

任务执行时间过长可能由多种原因引起,包括数据倾斜、资源不足、Shuffle操作过多等。

2. 内存溢出

当Spark作业处理的数据量超过了可用内存时,可能会导致内存溢出错误(OutOfMemoryError)。这通常发生在大数据量的Shuffle操作或缓存数据时。

3. GC(垃圾回收)时间过长

长时间的GC会影响作业的执行效率。我们可以通过监控GC时间来判断是否需要调整内存配置或优化代码。

4. 数据倾斜

数据倾斜是指某些任务处理的数据量远大于其他任务,这会导致某些任务执行时间过长。此时,可以考虑重新划分数据或者使用salting等技术来减轻倾斜。

5.资源浪费

在提交Spark作业时,用户可能会过度分配资源,例如为每个作业分配过多的executor或cores。这种情况会导致资源闲置,未能有效利用集群的计算能力。


四、深度诊断方法

1. 日志分析

Spark提供了丰富的日志信息,通过分析这些日志,我们可以定位到具体的错误和性能瓶颈。可以使用工具如Log4j或ELK Stack(Elasticsearch, Logstash, Kibana)进行集中化日志管理和分析。

2. 使用Spark的事件日志

Spark可以记录事件日志(Event Log),它记录了作业的各种事件,如作业开始、阶段开始、任务开始、任务失败、任务完成等。通过设置spark.eventLog.enabledtrue,可以将事件日志输出到指定位置。通过查看事件日志,我们可以重现作业的执行过程,并找到性能瓶颈。

TIPS:我们如果利用这种方式,只需要获取事件日志,通过一定的规则解析数据即可,Event Log 的解析实现我门往期的文章有介绍,大家可以参考:《一站式解析Spark 日志 ,助力开发Spark任务调优诊断》https://mp.weixin.qq.com/s/hGkwHdHoUTRQCyKO7lWK3A 

TIPS:spark 单个 executor 的内存使用状态,在Event Log并不记录的,所以我们要获取这部分的数据有两种途径。

1.通过开启 spark driver 和 executor 的 GC 日志,这样每次作业结束的时候,都会在spark driver 和 executor 的运行日志中输出,我们可以分析log 中的 GC 日志来获取内存的使用状态。

开启 GC 日志的方式:

--conf "spark.driver.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \

Driver 的 GC 日志样例:

Heap PSYoungGen total 578560K, used 171814K [0x00000000d5580000, 0x0000000100000000, 0x0000000100000000) eden space 493568K, 17% used [0x00000000d5580000,0x00000000daa4b6a0,0x00000000f3780000) from space 84992K, 99% used [0x00000000f3780000,0x00000000f8a7e458,0x00000000f8a80000) to space 120320K, 0% used [0x00000000f8a80000,0x00000000f8a80000,0x0000000100000000) ParOldGen total 1398272K, used 265270K [0x0000000080000000, 0x00000000d5580000, 0x00000000d5580000) object space 1398272K, 18% used [0x0000000080000000,0x000000009030d920,0x00000000d5580000) Metaspace used 126488K, capacity 138102K, committed 138240K, reserved 1169408K class space used 16855K, capacity 17632K, committed 17664K, reserved 1048576K

2.通过上面提到的graphite_exporter接出来的 metrics 指标,然后通过 promethues 提供的接口查询。

实现代码案例:

#promethues 接口查询def query_range_prometheus_api(query, start_time, end_time, step): url = "http://10.90.50.93:60080/api/v1/query_range" params = {'query': query, 'start': start_time, 'end': end_time, 'step': step} response = requests.get(url, params=params) if response.status_code == 200: return response.json() else: raise Exception(f"Failed to query Prometheus API. Status code: {response.status_code}")


def get_app_memory_usage(application, type, startedTime, finished_time, executortype): """ 获取spark 任务的内存使用情况 """ # 格式化时间为ISO 8601格式 start_time = datetime.utcfromtimestamp(startedTime / 1000.0).isoformat() + 'Z' # 将 finished_time 转换为 UTC 时间 end_time = datetime.utcfromtimestamp(finished_time / 1000.0)
# 延后 10 分钟 end_time += timedelta(minutes=10)
# 转换为 ISO 格式并添加 'Z' end_time_iso = end_time.isoformat() + 'Z'
if executortype == 0: query = 'spark_app_jvm_memory_usage{application="' + application + '",executor_id="driver",mem_type="heap",qty="' + type + '"}' else: query = 'spark_app_jvm_memory_usage{application="' + application + '", executor_id!="driver", mem_type="heap",qty="' + type + '"}'
result = query_range_prometheus_api(query, start_time, end_time_iso, '15s') # 初始化最大值 max_value = -1
# 遍历结果 for result in result['data']['result']: values = result['values'] for timestamp, value in values: value_float = float(value) # 将字符串值转换为浮点数 if value_float > max_value: max_value = value_float return max_value

下面是实现后的一些指标样例展示:

3.开源工具推荐

OPPO开源的一款Compass,整体功能相比较完善,目前已经支持的覆盖了 MR、Spark、Flink 三种类型的作业分析,其中解析的核心也相对全面,使用成本也比较低,大家可以尝试用用。

项目地址:https://github.com/cubefs/compass

下面是部分功能的展示:

 

五、总结

Spark作业的监控与深度诊断是确保大数据处理高效且稳定运行的重要环节。借助合适的监控工具和诊断方法,我们可以及时发现和解决性能问题,不断优化作业执行效率。通过不断的实践与总结,提升团队的监控能力,将有助于更好地利用Spark这一强大的工具,推动业务的发展。希望本文能为您在Spark作业监控与诊断方面提供一些参考和帮助。


涤生大数据往期精彩推荐


1.涤生大数据教学集群的首次运维现场复现

2.涤生大数据HDFS小文件治理总结

3.运维实战:DolphinScheduler 生产环境升级

4.运维实战:Ambari开发手册-DolphinScheduler集成实操

5.大数据运维实战之Ambari修护Hive无法更换tez引擎

6.大数据平台实践之CDH6.2.1+spark3.3.0+kyuubi-1.6.0

7.运维实战:CDH6.3.2编译集成Flink

8.运维实战100:CDH5.16.2升级至CDH6.3.2

9.CDH集成的kerberos迁移实战(原创干货)

10.CDH启用kerberos 高可用运维实战

11.Hbase 迁移小结:从实践中总结出的最佳迁移策略

12.WebHDFS Rest API 企业实战:大佬手把手带你堵住漏洞,企业实例解析

13.解析线上HBase集群CPU飙高的原因与解决方案

14.大数据实战:Ambari开发手册之OpenTSDB快速集成技巧

15.迁移策略:CDH 集群整体平缓迁移的最佳实践

16.你知道HDFS 节点内数据(磁盘间)是怎么均衡的吗?

17.关于列式存储你可能不知道的事儿

18.一站式解析Spark 日志 ,助力开发Spark任务调优诊断

19.大数据运维实战:Presto如何自定义日志插件实现日志采集存储?


继续滑动看下一个
涤生大数据
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存