查看原文
其他

Delta Lake | 数据湖的诞生与案例实践

Editor's Note

在 Spark + AI Summit 峰会上,Databricks 和 Linux 基金会共同宣布 Delta Lake 现在由 Linux 基金会托管,将成为数据湖的开发标准。今天将和大家聊一聊 Delta Lake ,我们从 Delta Lake 诞生说起,直到示例实战。

The following article is from DataFlow范式 Author Jason

续 上篇文章:Delta Lake | Apache大神带你了解数据湖,这一篇文章就够了

Delta Lake 诞生背景

大概今年4月份,在 Spark + AI Summit 2019 大会上,Databricks 宣布将 Databricks Runtime 里面的 Delta Lake 基于 Apache License 2.0 协议开源,该项目旨在建立一个开放的标准来管理数据湖中的海量数据。Delta Lake 项目官方地址为 https://delta.io,github代码地址为 https://github.com/delta-io/delta 。

Delta Lake 一开源就备受关注,说明该项目能够解决企业中某些方面的痛点,那是什么呢?

Hadoop 生态圈发展10几年了,很多公司内部的数据架构中都存在或大或小的数据湖,这里笔者没有说数据仓库,因为数据湖和数据仓库还是有差别的。为了防止话题扯的太远,笔者就简单说几句,可能不标准不规范,但是足够让读者明白怎么回事。

  1. 数据湖可以简单理解为一个存储海量的原始数据池,需要很大的存储容量。

  2. 原始数据的目的没有明确定义。由于数据湖主要存储原始和未处理的数据,可以用于任何目的,

  3. 这使其成为人工智能(Al)、机器学习和数据科学的理想选择。



  4. 而数据仓库可能大家更好理解,因为企业中只要使用大数据平台,数据仓库肯定必不可少。

  5. 数据仓库是结构化和已定义数据的存储库,这些数据已经为特定目的进行了处理,

  6. 这使得企业更容易理解和分析数据。但是数据仓库的这一显著优势提供的灵活性很小,

  7. 并且需要大量的劳动力,比如 ETL 处理。

根据上面简单对比,我们知道数据湖是一种存储大量原始数据的存储库和处理引擎,它能够存储大量各种类型的数据,拥有强大的信息处理能力和处理几乎无限的并发任务或工作的能力。但是,现如今,企业中的数据湖大多面临如下的一些问题:

  • 数据湖的读写是不可靠的

    写入数据湖时,写入期间看到垃圾数据。

  • 数据湖中的数据质量很低

    没有任何验证模式和校验数据的机制,导致数据湖的数据质量很差。

  • 随着数据量增长,处理性能越来越差

    处理数据的作业和查询引擎在处理元数据操作上花费大量时间。

  • 数据湖中数据的更新困难

    绝大多数情况下,都是通过覆盖的方式更新数据,但是效率太低,并且难以维护。

为了解决以上的种种问题,Delta Lake 诞生了。

Delta Lake 介绍

关于 Delta Lake 的介绍,借助官网的一句话和一张图:

  1. Delta Lake is an open-source storage layer that brings ACID

  2. transactions to Apache Spark™ and big data workloads.


总结以下2点:

  • 开源的存储层

  • 为 Apache Spark 和大数据 workloads 提供 ACID 事务能力

我们再来看一下 Delta Lake 特性:

  • 支持 ACID 事务

    数据湖通常有多个数据管道提供并发读取和写入数据,但是由于缺少事务,数据工程师必须通过冗长的过程来确保数据的完整性。幸好,Delta Lake 为你的数据湖带来了ACID 事务。Delta Lake 还提供强大的可序列化隔离级别,这是隔离级别中最强的级别。

  • 可扩展的元数据处理

    在大数据中,甚至元数据本身也可以是“大数据”。Delta Lake 将表或目录的元数据信息存储在事务日志中,而不是 Metastore 中。Delta Lake 对待元数据就像对待数据一样,利用 Spark 的分布式处理能力来处理它的所有元数据。因此,Delta Lake 可以轻松地处理PB级的表和数十亿个分区和文件。

  • 时间旅行(数据版本)

    这里名字起的很好听,其实就是数据版本。Delta Lake 提供了数据快照(snapshot),使开发人员能够访问和恢复到早期版本的数据,以便进行审计、回滚或重现实验。当文件被修改文件时,Delta Lake 会创建较新版本的文件并保留旧版本的文件。当用户想要读取旧版本的表或目录时,他们可以在 Apache Spark 的读取 API 中提供时间戳或版本号,Delta Lake 根据事务日志中的信息构建该时间戳或版本的完整快照。这允许用户重现之前的数据,并在需要时将表还原为旧版本的数据。

  • 数据存储格式采用开源的开放格式

    Delta Lake 中的所有数据都以 Apache Parquet 格式存储,这使得 Delta Lake 能够利用 Parquet 特有的高效压缩和编码方案。

  • 统一流和批处理 Source 和 Sink

    Delta Lake 中的表既是批处理表,也是流 source 和 sink。除批处理写入外,Delta Lake 还可用作 Apache Spark structured streaming 的高效流式 sink。结合 ACID 事务和可扩展的元数据处理,高效的流式 sink 现在可以实现大量近实时分析用例,而无需同时维护复杂的流式传输和批处理管道。

  • Schema Enforcement

    Delta Lake 自动验证正在被写的 DataFrame 模式是否与表的模式兼容。表中存在但 DataFrame 中不存在的列会被设置为 null;如果 DataFrame 中有额外的列在表中不存在,那么该操作将抛出异常。如果没有 Schema enforcement ,单个列中可能会有不同的数据类型混在一起,对我们的数据可靠性造成了损害。例如,如果我们不小心把 StringType 类型的数据引入了一个 FloatType 数据类型的列,可能会无意中使机器学习模型无法读取列,破坏数据管道。

  • Schema Evolution

    大数据在不断变化,Delta Lake 可以让你能够对可自动应用的表 Schema 进行更改,而不需要繁琐的 DDL。Delta Lake 具有可以显式添加新列的 DDL 和自动更新 Schema 的能力。

  • 审计历史

    Delta Lake 事务日志详细记录了对数据所做的每个更改,提供了对这些更改的完整审计跟踪。

  • 更新和删除

    Delta Lake 支持使用 Scala/Java API 来 merge、update 和 delete 数据集。这使得数据工程师可以轻松地在数据湖中 insert/update 和 delete 记录。由于 Delta Lake 以文件级粒度跟踪和修改数据,因此它比读取和覆盖整个分区或表更有效。

  • 100% 兼容 Apache Spark API

    Delta Lake 目前需要 Apache Spark 2.4.2 版本。开发人员可以将 Delta Lake 与他们现有的数据管道一起使用,只需要做很少的更改,因为它与常用的大数据处理引擎 Spark 完全兼容,后面笔者会介绍一些示例。

好了,以上就是 DELTA LAKE 的十宗罪,哦不,是十大特性。

另外社区还在开发一些特性,比如支持 Apache Hive & Presto 和提高 Delta Lake 数据质量等。

关于 Delta Lake 的介绍,最后再总结一下:Delta Lake 定位是一个存储层,为 Apache Spark 和大数据 workloads 提供 ACID 事务能力,其通过写和快照隔离之间的乐观并发控制,在写入数据期间提供一致性的读取,从而为构建在 HDFS 和云存储上的数据湖带来可靠性。Delta Lake 还提供内置数据版本控制,以便轻松安全地回滚。

其实 Delta Lake 内部细节东西很多,比如时间旅行、事务日志、更新删除实现、底层存储等,随着笔者一步步实战,慢慢深入细节。接下来我们先抛开这些理论,带着大家实战操作。

Delta Lake 使用

截至目前,Apache Spark 已经发布了 Delta Lake 0.4.0,主要支持 DML 的 Python API、将 Parquet 表转换成 Delta Lake 表 以及部分 SQL 功能。

Spark 环境

笔者 Spark 版本为 2.4.4,指明delta版本后运行 spark-shell:

  1. spark-shell --packages io.delta:delta-core_2.11:0.4.0

  2. Spark context Web UI available at http://zhangyun:4040

  3. Spark context available as 'sc' (master = local[*], app id = local-1571560290657).

  4. Spark session available as 'spark'.

  5. Welcome to

  6. ____ __

  7. / __/__ ___ _____/ /__

  8. _\ \/ _ \/ _ `/ __/ '_/

  9. /___/ .__/\_,_/_/ /_/\_\ version 2.4.4

  10. /_/


  11. Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_211)

  12. Type in expressions to have them evaluated.

  13. Type :help for more information.


  14. scala>

创建表

为了创建 Delta Lake 表,在 DataFrame 指定写入数据的格式为 delta

  1. val data = spark.range(0, 5)

  2. data.write.format("delta").save("/tmp/delta-table")

查看 HDFS 文件:

  1. $ hdfs dfs -ls /tmp/delta-table

  2. Found 5 items

  3. drwxr-xr-x - zhangyun supergroup 0 2019-10-20 16:36 /tmp/delta-table/_delta_log

  4. -rw-r--r-- 1 zhangyun supergroup 429 2019-10-20 16:36 /tmp/delta-table/part-00000-a547aff2-9cd5-4e2a-a3ec-8370308324c0-c000.snappy.parquet

  5. -rw-r--r-- 1 zhangyun supergroup 429 2019-10-20 16:36 /tmp/delta-table/part-00001-c0f2f984-2d0d-494a-8772-5e8a34894eea-c000.snappy.parquet

  6. -rw-r--r-- 1 zhangyun supergroup 429 2019-10-20 16:36 /tmp/delta-table/part-00002-34622b20-ff8f-403b-bf08-3c109e7121ba-c000.snappy.parquet

  7. -rw-r--r-- 1 zhangyun supergroup 437 2019-10-20 16:36 /tmp/delta-table/part-00003-2c963cec-8507-49a2-bff9-23fa49200c02-c000.snappy.parquet


读取数据

  1. val df = spark.read.format("delta").load("/tmp/delta-table")

  2. df.show()

输出结果为:

  1. +---+

  2. | id|

  3. +---+

  4. | 3|

  5. | 4|

  6. | 2|

  7. | 0|

  8. | 1|

  9. +---+

更新数据

  1. val data = spark.range(5, 10)

  2. data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

  3. df.show()

输出结果为:

  1. +---+

  2. | id|

  3. +---+

  4. | 8|

  5. | 9|

  6. | 7|

  7. | 5|

  8. | 6|

  9. +---+

查看 HDFS 数据文件:

  1. $ hdfs dfs -ls /tmp/delta-table

  2. Found 9 items

  3. drwxr-xr-x - zhangyun supergroup 0 2019-10-20 16:54 /tmp/delta-table/_delta_log

  4. -rw-r--r-- 1 zhangyun supergroup 429 2019-10-20 16:54 /tmp/delta-table/part-00000-33de6d09-b71f-4c0d-9c5c-df9781c4fce1-c000.snappy.parquet

  5. -rw-r--r-- 1 zhangyun supergroup 429 2019-10-20 16:36 /tmp/delta-table/part-00000-a547aff2-9cd5-4e2a-a3ec-8370308324c0-c000.snappy.parquet

  6. -rw-r--r-- 1 zhangyun supergroup 429 2019-10-20 16:54 /tmp/delta-table/part-00001-5e51eedb-4a4f-433a-824b-c99df2276857-c000.snappy.parquet

  7. -rw-r--r-- 1 zhangyun supergroup 429 2019-10-20 16:36 /tmp/delta-table/part-00001-c0f2f984-2d0d-494a-8772-5e8a34894eea-c000.snappy.parquet

  8. -rw-r--r-- 1 zhangyun supergroup 429 2019-10-20 16:36 /tmp/delta-table/part-00002-34622b20-ff8f-403b-bf08-3c109e7121ba-c000.snappy.parquet

  9. -rw-r--r-- 1 zhangyun supergroup 429 2019-10-20 16:54 /tmp/delta-table/part-00002-468a086f-2dbb-49ae-8f8c-9ef8da418573-c000.snappy.parquet

  10. -rw-r--r-- 1 zhangyun supergroup 437 2019-10-20 16:54 /tmp/delta-table/part-00003-2c3cd1ba-f66b-4431-8338-20c561f9ce31-c000.snappy.parquet

  11. -rw-r--r-- 1 zhangyun supergroup 437 2019-10-20 16:36 /tmp/delta-table/part-00003-2c963cec-8507-49a2-bff9-23fa49200c02-c000.snappy.parquet

  12. zhangyun:Spark zhangyun$


更新符合条件的数据

上面的更新是覆盖更新,会更新所有数据,下面将介绍更新符合条件的数据:

  1. import io.delta.tables._

  2. import org.apache.spark.sql.functions._


  3. val deltaTable = DeltaTable.forPath("/tmp/delta-table")


  4. // Update every even value by adding 100 to it

  5. deltaTable.update(

  6. condition = expr("id % 2 == 0"),

  7. set = Map("id" -> expr("id + 100")))


  8. // Delete every even value

  9. deltaTable.delete(condition = expr("id % 2 == 0"))


  10. // Upsert (merge) new data

  11. val newData = spark.range(0, 20).toDF


  12. deltaTable.as("oldData")

  13. .merge(

  14. newData.as("newData"),

  15. "oldData.id = newData.id")

  16. .whenMatched

  17. .update(Map("id" -> col("newData.id")))

  18. .whenNotMatched

  19. .insert(Map("id" -> col("newData.id")))

  20. .execute()


  21. deltaTable.toDF.show()

输出结果为:

  1. scala> deltaTable.toDF.show()

  2. +---+

  3. | id|

  4. +---+

  5. | 4|

  6. | 8|

  7. | 15|

  8. | 13|

  9. | 0|

  10. | 16|

  11. | 14|

  12. | 18|

  13. | 6|

  14. | 7|

  15. | 10|

  16. | 3|

  17. | 1|

  18. | 12|

  19. | 11|

  20. | 19|

  21. | 5|

  22. | 2|

  23. | 17|

  24. | 9|

  25. +---+

为了便于分析,笔者把输出结果排序:

  1. scala> deltaTable.toDF.sort("id").show()

  2. +---+

  3. | id|

  4. +---+

  5. | 0|

  6. | 1|

  7. | 2|

  8. | 3|

  9. | 4|

  10. | 5|

  11. | 6|

  12. | 7|

  13. | 8|

  14. | 9|

  15. | 10|

  16. | 11|

  17. | 12|

  18. | 13|

  19. | 14|

  20. | 15|

  21. | 16|

  22. | 17|

  23. | 18|

  24. | 19|

  25. +---+

分析如下:

  1. // Update every even value by adding 100 to it

  2. deltaTable.update(

  3. condition = expr("id % 2 == 0"),

  4. set = Map("id" -> expr("id + 100")))

deltaTable 原始数据为:

  1. +---+

  2. | id|

  3. +---+

  4. | 8|

  5. | 9|

  6. | 6|

  7. | 7|

  8. | 5|

  9. +---+

对于原始数据符合 id % 2 == 0 条件的数据(6,8符合),则加上100, 该操作执行后,结果为

  1. +---+

  2. | id|

  3. +---+

  4. |108|

  5. | 9|

  6. |106|

  7. | 7|

  8. | 5|

  9. +---+

接着执行:

  1. // Delete every even value

  2. deltaTable.delete(condition = expr("id % 2 == 0"))

删除符合 id % 2 == 0 条件的数据(106,108符合),执行结果如下:

  1. +---+

  2. | id|

  3. +---+

  4. | 9|

  5. | 7|

  6. | 5|

  7. +---+

然后定义新的数据 newData(0到19共20个数字):

  1. // Upsert (merge) new data

  2. val newData = spark.range(0, 20).toDF

最后执行:

  1. deltaTable.as("oldData")

  2. .merge(

  3. newData.as("newData"),

  4. "oldData.id = newData.id")

  5. .whenMatched

  6. .update(Map("id" -> col("newData.id")))

  7. .whenNotMatched

  8. .insert(Map("id" -> col("newData.id")))

  9. .execute()

这个操作的含义是,如果 newData 和 oldData 匹配到相同数据,则用 newData 更新,否则 插入 newData 数据,类似 upsert 插入或更新功能。

因为 oldData 数据为5、7、9,而 newData 数据为 0、1、2、...、17、18、19,newData 包含了 oldData 的数据,所以结果数据也为 newData 的数据。

使用时间旅行(Time Travel)访问历史版本的数据

查询最原始未被更新的数据(排序输出):

  1. val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")

  2. df.sort("id").show()

结果为:

  1. +---+

  2. | id|

  3. +---+

  4. | 0|

  5. | 1|

  6. | 2|

  7. | 3|

  8. | 4|

  9. +---+

笔者再查看版本为1的数据:

  1. scala> val df = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta-table")

  2. df: org.apache.spark.sql.DataFrame = [id: bigint]


  3. scala> df.sort("id").show()

  4. +---+

  5. | id|

  6. +---+

  7. | 5|

  8. | 6|

  9. | 7|

  10. | 8|

  11. | 9|

  12. +---+

写实时流式数据到表中

我们可以使用Spark Structured Streaming来写入数据到 Delta Lake 表。Delta Lake 事务日志保证只进行一次处理,即使有其他流或批量查询并发地运行在该表上。默认情况下,流以追加模式运行,向表添加新的数据记录。

这里为了观察结果,需要打开两个 spark-shell 客户端 A 和 B。spark-shell 客户端 A 执行:

  1. val streamingDf = spark.readStream.format("rate").load()

  2. val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

该流式程序一直运行,直到手工cancel掉。

从表中读取实时流式数据

当流正在向 Delta Lake 表写入数据时,可以从该表作为流 source 进行读取。

spark-shell 客户端 B 执行如下操作,打印对 Delta Lake 表所做的所有更改:

  1. val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()

输出结果:

  1. -------------------------------------------

  2. Batch: 11

  3. -------------------------------------------

  4. +---+

  5. | id|

  6. +---+

  7. |288|

  8. |292|

  9. |289|

  10. |293|

  11. |290|

  12. |294|

  13. |291|

  14. +---+


  15. -------------------------------------------

  16. Batch: 12

  17. -------------------------------------------

  18. +---+

  19. | id|

  20. +---+

  21. |295|

  22. |299|

  23. |296|

  24. |300|

  25. |297|

  26. |298|

  27. +---+


  28. -------------------------------------------

  29. Batch: 13

  30. -------------------------------------------

  31. +---+

  32. | id|

  33. +---+

  34. |301|

  35. |305|

  36. |302|

  37. |306|

  38. |303|

  39. |307|

  40. |304|

  41. +---+


  42. ......

到此,将 Delta Lake 常见的操作演示了一遍,当然也只是一些基本功能。

总结

笔者通过对 Delta Lake诞生背景、Delta Lake 项目本身的介绍以及示例演示,希望大家对该项目有更多的了解,加入到该项目生态中,以及将 Delta Lake 应用到企业级生产案例中。

至此,我们也可以对比一下数据湖、数据仓库、数据中台,简明扼要概括为:

  • 1)数据湖:   无为而治,目标AI

  • 2)数据仓库:分而治之,目标BI

  • 3)数据中台:一统天下,目标组织架构




欢迎加入数仓BI技术交流群。进群方式:请加同学微信(微信号:iom1128),回复:交流,审核通过 会自动拉你进群。


今日荐文

点击下方文字即可阅读

  1. Delta Lake | Apache大神带你了解数据湖

  2. 百万年薪挖了个P8程序员,难道是“水货”?

  3. 记一次字节跳动面试经历

  4. 58同城实时数仓架构与实践

  5. 数据湖?可否取代数据仓库?

  6. Flink:常见问题与排查思路

  7. 互联网版《哪吒2》

  8. 招聘贴 | 京东大数据、数仓招聘

  9. 小米 MySQL实时同步到数据仓库

  10. 为什么 30 岁的工程师容易跳槽?



觉得内容不错的话 请分享到朋友圈哦~

▼ 福利时刻 ▼ 


01. 后台回复「经典」,即可领取大数据数仓经典书籍。

02. 后台回复「加群」,或添加小助微信IDiom1128  拉您入群或领取资料。


技术大佬们在等你,各种资源定期分享~

Q: 关于大数据,你还想了解什么?

欢迎留言区与大家分享

觉得不错,请把这篇文章分享给你的朋友哦

入群请联系小助手:iom1128『紫霞仙子』

更多精彩,请戳"阅读原文"到"数仓之路"查看

更多精彩,请戳"阅读原文"到"数据分析"查看

 

 

关注不迷路~ 各种福利、资源定期分享

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存