查看原文
其他

就Spark中cache和checkpoint,我和字节面试官battle了10分钟,面试过了

涤生-健哥 涤生大数据
2024-12-05

有个面字节的同学,说被面试官就cache和checkpoint问题两人battle是十几分钟,相谈甚欢,后来过了面试;

Spark 的一个重要部分是能够在计算过程中将内容存储在内存中。这是一个巧妙的技巧,我们可以将其用作加快对常用查询表或数据片段的访问速度的一种方式。这也非常适合在相同数据上一遍又一遍地工作的迭代算法。虽然许多人认为这是解决速度问题的灵丹妙药,但其他概念(如数据分区、集群、存储桶)最终可能会对作业的执行产生比缓存更大的影响。

尽管 Spark 提供的计算速度比传统的 Map Reduce 作业快 100 倍,但如果没有将作业设计为重用重复计算,则在处理数十亿或数万亿数据时,我们就会看到性能明显得下降。因此,我们可能需要查看各个阶段并使用优化技术作为提高性能的方法之一。

在一遍又一遍地重用相同数据集的应用程序中,最有用的优化之一是缓存和持久化。缓存是将RDD或者 DataFrame 或者 Table 的数据放置在集群中Executor之间的内存或者是磁盘临时存储中,并加快后续读取速度。

1.cache缓存

cache() 方法用于缓存转换的中间结果,以便在缓存的 RDD

或者DataFrame 上运行的其他转换将执行得更快。缓存转换结果是提高长时间运行的 Spark 应用程序/作业性能的优化技巧之一。如下:

spark.read.schema(emp_schema).csv("\path\to\emp\emp.csv")df.cache()df = df.withColumn("increment", lit(1000))df.show()

切记:cache在RDD中默认的缓存级别是MEMORY_ONLY,而在DataFrame中默认的缓存级别是MEMORY_AND_DISK。

在调用 cache() 函数后,数据没有任何反应,但缓存管理器通过添加新运算符来更新查询计划。调用 Spark Action算子时会发生实际缓存

它适用于惰性评估的概念,即除非我们要求采取Action,否则什么都不会实现。当我们调用该操作时,它从读取文件开始,我们可以看到它被缓存,然后数据被缓存在内存中,接下来它进行一些转换,然后执行该操作。在第二次转换的情况下,它会从缓存中加载数据,而不会转到源来获取数据。

Spark 会自动监视每个节点上的缓存使用情况,并以最近最少使用的 (LRU) 方式删除旧数据分区。因此,最近使用的最少将首先从缓存中删除。 

2.Persist

Persist 类似于 Cache,唯一的区别是它可以接受参数,这也是可选的。如果没有给出参数,则默认情况下在DataFrame中会将其保存到MEMORY_AND_DISK存储级别。程序会在结束的时候删除内存或者磁盘上缓存的数据,当然我们也可以显式删除缓存数据,我们使用 API unpersist。

df.unpersist()

在缓存和持久化的情况下,血缘保持不变,这意味着它们是容错的,这意味着如果数据集的任何分区丢失,它将使用创建它的原始转换自动重新计算。

在 Spark 中处理数据时,通常需要重用某个数据集。另外我们需要注意的是请务必小心使用缓存,因为它本身就是一项昂贵的操作。例如,如果只使用一次数据集,则拉取和缓存的成本大于原始数据拉取的成本。缓存数据后,Catalyst Optimizer 将仅返回到缓存数据的位置。因此滥用缓存功能有时会导致更多的性能问题。它妨碍了 Catalyst Optimizer,削弱了谓词下推。它们不了解填充缓存所需的时间与从缓存中提取的读取操作所需的时间。

3.CheckPoint

检查点是将 RDD 保存到分布式文件系统的行为,以便将来对此 RDD 的引用指向分布式文件系统中的数据,而不是从其原始源重新计算 RDD。这与缓存类似,只是它不存储在内存或者磁盘中,只存储在分布式文件系统中。

检查点的操作一般的流程如下:

I、设置检查点目录

II、创建指定数据集a的DataFrame

III、创建指定数据集b的DataFrame

IV、将a和b Dataframe 连接起来,然后指向join_df

V、设置检查点操作

spark.sparkContext.setCheckpointDir("\path\to\checkpoint\dir") // Step 1var emp_df = spark.read.schema(emp_schema).csv("\path\to\emp\emp.csv") //Step 2var dept_df = spark.read.schema(dept_schema).csv("\path\to\dept\dept.csv") //Step 3var join_df = emp_df.join(dept_df, Seq("deptid"), "left") // Step 4join_df = join_df.checkpoint() // Step 5join_df.show() // Step 6


我们可以通过Spark UI,了解带检查点和不带检查点之间的区别。

没有检查点:只创建了一个作业。逻辑计划是实现目标所需的完整计划。如下所示的 DAG 具有完整的流程,其中数据加载到两个 DataFrame 中,并在 DataFrame 之间进行联接。

使用检查点在调用检查点时创建单独的作业。此函数将 RDD 具体化并存储到我们在代码中指定的检查点目录中。一旦 DataFrame 被检查点,后续读取将从检查点位置进行。我们可以明显的看到逻辑计划,与上述逻辑计划相比,它被截断了。在这种情况下,DAG 也有所不同。在这里,我们可以看到起点变成了它被检查的点

检查点用于截断此数据集的逻辑计划,这在计划可能呈指数增长的迭代算法中特别有用。

那么我们什么时候去设置检查点呢每次需要缓存计算分区时,都会将其缓存到内存中。但是,Checkpoint 不遵循相同的原则。相反,它会等到作业结束,然后启动另一个作业来完成检查点。需要检查点的 RDD 将计算两次;因此,建议在 rdd.checkpoint() 之前执行 rdd.cache()。在这种情况下,第二个作业不会重新计算 RDD。相反,它只会读取缓存。

而在persist / cache 在检查点中断血缘时保持血缘完整。即使从缓存中获取数据,也会保留血缘。这意味着如果缓存中的某些分区丢失,则可以从头开始重新计算数据。在第二种情况下,血缘在检查点之后完全丢失,并且不再携带重建它所需的信息。

4.缓存和检查点之间的区别

缓存和检查点之间存在显著差异。缓存将 RDD 具体化并将其保存在内存(和/或磁盘)中。但是RDD的沿袭(计算链)(即生成RDD的操作序列)将被记住,因此,如果出现节点故障并且缓存的RDD的一部分丢失,则可以重新生成它们。但是,checkpoint 将 RDD 保存到 HDFS 文件中,实际上会完全忘记血缘。这样可以截断长血缘,并将数据可靠地保存在 HDFS 中,HDFS 自然可以通过复制实现容错。

此外,rdd.persist(StorageLevel.DISK_ONLY) 也不同于检查点。通过前者可以将RDD分区持久化到磁盘上,这些分区由blockManager管理。一旦驱动程序完成,这意味着 CoarseGrainedExecutorBackend 所在的线程停止,blockManager 将停止,缓存到磁盘的 RDD 将被删除(blockManager 使用的本地文件将被删除)。但是检查点会将 RDD 保存到 HDFS 或本地目录。如果不手动删除,它们将始终位于磁盘上,因此下一个驱动程序可以使用它们。

这里要记住的一点是,当Hadoop MapReduce执行一个作业时,它会在每个任务和每个作业的末尾保持持久化数据(写入HDFS)。执行任务时,它会在内存和磁盘之间来回切换。Hadoop的问题在于,如果发生任何错误,则需要重新执行任务,例如,因错误而停止的shuffle将只有一半的数据保留在磁盘上,然后将重新计算持久化的数据,以便下一次运行shuffle。Spark 的优点是,当发生错误时,下一次运行将从检查点读取数据,但缺点是检查点需要执行两次作业。

缓存比检查点更加有用,因为实际的工作中我们可能会有大量可用内存来存储 RDD 或DataFrame。

另外缓存将保留转换的结果,以便在 RDD 或 DataFrame 上应用其他转换时不必再次重新计算这些转换,当应用缓存时,Spark 会存储所应用转换的历史记录,并在内存不足的情况下重新计算它们,但是当应用检查点 Spark 时,Spark 会丢弃所有转换,并将最终的 DataFrame 永久存储到 HDFS 中。

检查点的主要问题是将数据存储到 HDFS 中,这比缓存慢很多很多。检查点主要用于跨多个批处理合并数据的有状态转换。在此类转换中,生成的 RDD 依赖于前批次的 RDD,这会导致依赖链的长度随时间不断增加,并避免恢复时间的无限增加。

涤生大数据往期精彩推荐

1.企业数仓DQC数据质量管理实践篇

2.企业数据治理实战总结--数仓面试必备

3.OneData理论案例实战—企业级数仓业务过程

4.中大厂数仓模型规范与度量指标有哪些?

5.手把手教你搭建用户画像系统(入门篇上)

6.手把手教你搭建用户画像系统(入门篇下)

7.SQL优化之诊断篇:快速定位生产性能问题实践

8.SQL之优化篇:一文搞懂如何优化线上任务性能,增效降本!

9.新能源趋势下一个简单的数仓项目,助力理解数仓模型

10.基于FlinkSQL +Hbase在O2O场景营销域实时数仓的实践

11.开发实战角度:distinct实现原理及具体优化总结

12.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)

13.涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(二)

14.5分钟了解实时车联网,车联网(IoV)OLAP 解决方案是怎样的?

15.企业级Apache Kafka集群策略:Kakfa最佳实践总结

16.玩转Spark小文件合并与文件读写提交机制

17.一文详解Spark内存模型原理,面试轻松搞定

18.大厂8年老司机漫谈数仓架构

19.用户画像之银行信贷场景的标签体系实践

20.Clickhouse+Bitmap实现用户画像中海量用户的圈选


继续滑动看下一个
涤生大数据
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存