Delta Lake 快速入门-PySpark 版
在昨天的文章使用Azure Databricks、Delta Lake 和 Azure Data Lake Storage 简化您的 Lakehouse 架构中提到了开源组件 Delta Lake, 在开源或者商业的数据湖或者沧湖一体架构里都离不开 Delta Lake,那么 Delta Lake 到底何方神圣?在开始快速入门之前,先让我们看看官方怎么定义 Delta Lake 的。
在使用的企业列表
Delta Lake 是什么
Delta Lake 是一个开源存储层,它为数据湖带来了可靠性。Delta Lake 提供 ACID 事务、可扩展的元数据处理,并统一流数据和批数据处理。Delta Lake 运行在您现有的 data Lake 之上,并且完全兼容 Apache Spark API。
具体来说,Delta Lake 具有以下特点:
Spark 上的 ACID 事务控制:
数据湖通常具有多个同时读取和写入数据的数据管道,并且由于缺乏事务,数据工程师必须经过繁琐的过程才能确保数据完整性。Delta Lake 将 ACID 事务带入您的数据湖。它提供了可序列化性,最强的隔离级别。
可伸缩的元数据处理:
在大数据中,甚至元数据本身也可以是大数据。Delta Lake 将元数据像数据一样对待,利用 Spark 的分布式处理能力来处理其所有元数据。这样,Delta Lake 可以轻松处理具有数十亿个分区和文件的 PB 级表。
数据版本控制:
Delta Lake 提供了数据快照,使开发人员可以访问和还原到较早版本的数据以进行审核,回滚或重现实验。
开放的数据格式:
Delta Lake 中的所有数据均以
Apache Parquet
格式存储,从而使 Delta Lake 能够利用 Parquet 固有的高效压缩和编码方案。统一的批流处理:
Delta Lake 中的表既是批处理表,又是流计算的 source 和 sink。流数据提取,批处理历史回填和交互式查询都可以直接使用它。
Schema 执行
Delta Lake 提供了指定和执行模式的功能。这有助于确保数据类型正确并且存在必需的列,从而防止不良数据导致数据损坏。
Schema 演化:
大数据在不断变化。Delta Lake 使您可以更改可自动应用的表结构,而无需繁琐的 DDL。
审核历史记录:
Delta Lake 事务日志记录有关数据所做的每项更改的详细信息,从而提供对更改的完整审核跟踪。
更新和删除:
Delta Lake 支持 Python、SparkSQL、Scala 和 Java API 进行合并,更新和删除数据集。
100%兼容
Apache Spark
API:开发人员可以将 Delta Lake 与现有的数据管道一起使用,而无需进行任何更改,因为它与常用的大数据处理引擎 Spark 完全兼容。
一行代码搞定 PySpark 环境
安装 anaconda 或者 miniconda 后,在虚拟环境里执行如下命令,
conda install -c conda-forge openjdk pyspark -y
该命令相当于同时安装了 Zulu OpenJDK 11、Spark3 和 PySpark。
如果你目前安装的 PySpark 是 2.x,记得更新到最新版本,
conda upgrade -c conda-forge -y pyspark
Delta Lake 入门
该入门和 PySpark 入门非常接近,只是额外引入了 Delta Lake 包
初始化 SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import *
spark = SparkSession.builder.appName("MyApp") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
创建 Delta table
代码上和写 Spark DataFrame 没有区别,只是把csv
、json
、parquet
等格式替换成了delta
。
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
读取 Delta table
读取也是标准 Spark API,指定格式为 delta。
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
+---+
| id|
+---+
| 4|
| 3|
| 0|
| 1|
| 2|
+---+
更新 Delta table
Delta Lake 支持使用标准 DataFrame API 修改表的多种操作。
覆盖
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
+---+
| id|
+---+
| 9|
| 6|
| 8|
| 7|
| 5|
+---+
update(更新)
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
# 更新,如果值为偶数则加100
deltaTable.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
+---+
| id|
+---+
| 5|
| 9|
|106|
|108|
| 7|
+---+
删除
# 删除值为偶数的行
deltaTable.delete(condition = expr("id % 2 == 0"))
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
+---+
| id|
+---+
| 5|
| 9|
| 7|
+---+
Upsert(Merge)
# Upsert (merge) 新数据
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
.merge(
newData.alias("newData"),
"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
deltaTable.toDF().show()
+---+
| id|
+---+
| 1|
| 0|
| 2|
| 10|
| 18|
| 13|
| 11|
| 17|
| 3|
| 8|
| 14|
| 5|
| 6|
| 19|
| 7|
| 4|
| 9|
| 12|
| 15|
| 16|
+---+
读取历史版本数据(时间旅行)
可以使用versionAsOf
参数查询 Delta table 等历史快照
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()
+---+
| id|
+---+
| 4|
| 3|
| 0|
| 1|
| 2|
+---+
流数据写入 Delta table
可以使用Structured Streaming
写入 Delta table。Delta Lake 事务日志保证只处理一次,即使有其他流或对表并发运行的批查询。默认情况下,流以 append 模式运行,向表中添加新的记录:
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta")\
.option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
在流运行时,您可以使用之前的命令来读取表
如果要停止流,可以运行stream.stop()
。
从 Delta table 读取变化的流
在将流写入 Delta table 时,还可以从该表读取作为流源。例如,您可以启动另一个流查询,打印出对 Delta table 所做的所有更改。您可以通过提供startingVersion
或startingTimestamp
选项来指定从该版本开始进行更改的结构化流应该从哪个版本开始。
stream2 = spark.readStream.format("delta")\
.load("/tmp/delta-table").writeStream.format("console").start()
关闭 spark 实例
spark.stop()
欢迎关注公众号
有兴趣加群讨论数据挖掘和分析的朋友可以加我微信(witwall),暗号:入群