查看原文
其他

开放表格格式 — Delta Lake、Iceberg和Hudi

常华Andy Andy730
2025-01-01

什么是表格(Table)格式?

表格格式是一种用于组织数据文件的方式,旨在将数据湖中的数据结构化为类似于数据库的形式。

Apache Hive表格格式是最早且最常用的表格格式之一。

然而,Hive表格格式存在一些固有的问题:
  • 过时的表统计信息(分区生命周期)
  • 用户必须了解数据物理布局
  • 缺乏数据的谱系/历史记录
  • 缺乏模式演变(schema evolution)

Hive是在云计算时代之前编写的,其设计初衷并非针对对象存储进行优化。随着云计算的崛起和新的使用模式的出现,Hive开始面临性能上的挑战。Hive中的元数据表增长迅速,严重影响了其性能表现。

随着数据领域的发展,新的应用场景对数据湖提出了更高的要求。特别是,新系统需要以下数据湖功能作为核心特性:
  • 事务支持(具备ACID属性)
  • 统一的批处理和流处理
  • 数据变更(支持合并或更正延迟的数据)
  • 模式强制执行、演变和版本控制
  • 元数据扩展性
此外,新应用场景还需要支持以下特性:
  • 时间旅行(Time Travel)
  • 并发读写
  • 独立于存储的消费模式
  • 数据质量
  • 可插拔存储
为了解决这些问题,业界开始创建新的开放表格格式(Open Table Formats)。在本文中,我们将重点讨论以下三种表格格式:
  • Delta Table - Databricks
  • Iceberg - Netflix
  • Hudi - Uber

Delta Table

Delta Table是一种带有事务日志的Parquet表格。

Delta日志:这是Delta Table上所有操作的记录日志,包括添加文件、删除文件、更新元数据、设置事务、更改协议以及提交信息等。Delta日志以JSON文件格式存储,用于记录Delta Table在任何给定时间点的状态。

数据文件:这些是包含实际数据的Parquet文件,具有不可变性质。当数据行的内容需要更改时,Delta Lake会将当前文件中的所有行加载到内存中,应用必要的更新,然后将结果写出为新的Parquet文件。随后,这些操作会被记录在Delta日志中,旧的Parquet文件会从当前状态中移除,而新的Parquet文件会被添加到状态中。

如果您正在考虑使用Databricks的产品,Delta Table格式是一个明智的选择。

from pyspark.sql import SparkSessionfrom delta import DeltaTable
spark = SparkSession.builder \ .appName("Delta with PySpark") \ .config('spark.jars.packages', 'io.delta:delta-core_2.12:2.1.1') \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \ .config( "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog", ) \ .config("spark.sql.warehouse.dir", "spark-warehouse") \ .master("local[*]") \ .enableHiveSupport() \ .getOrCreate()
df_sales = spark.read.parquet("dataset/sales.parquet/*parquet")df_sales.write \ .format("delta") \ .mode("overwrite") \ .option("mergeSchema", True) \ .saveAsTable("sales_delta_managed_delta")

dt = DeltaTable.forName(spark, "sales_delta_managed")dt.history().select("version", "timestamp").show(truncate=False)
%%sparksqlselect * from default.sales_delta_managed limit 5;update default.sales_delta_managed set amount = 428 where trx_id = 123
dt.history().select("version", "timestamp").show(truncate=False)


Iceberg

在Iceberg中,元数据的管理涉及到三个关键文件:1. 元数据文件 2. Manifest列表 3. Manifest文件。

Metadata.json - 此文件存储了与表相关的元数据信息。具体包括以下子部分:
  • 快照(Snapshot):它记录了表中所有文件的当前状态的快照列表。其中包含了表模式、分区规范以及清单列表的位置信息。
  • 模式(Schemas):用于跟踪表的结构变化。所有对表模式的修改都会在schemas数组中记录下来。
  • 分区规范(partition-specs):存储了关于如何对数据进行分区的信息。
  • 排序顺序(sort-orders)

清单列表:这是一个文件,其中包含了所有的清单文件及其相关指标(如清单文件所覆盖的分区列的值范围)。它充当了清单与快照之间的桥梁。

清单:这是一个包含了数据文件列表的文件,其中还记录了这些文件的格式、位置以及指标信息(如每列数据的边界,如行数等)。

数据文件:这些是实际存储数据的物理文件。

Iceberg旨在解决与S3对象列表或Hive元数据存储的分区枚举相关的性能瓶颈问题。

Apache Iceberg致力于创建一个可移植的格式层,并提供了一套规范和目录服务的替代方案,以便与Hive等系统更好地集成。

注意:还有一个名为Hiveberg的插件,它允许我们从Hive的元数据存储中读取Iceberg表格,从而实现了两者的互操作性。

from pyspark.sql import SparkSession
spark = SparkSession.builder \ .appName("Icerberg with PySpark") \ .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.0.0") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog",) \ .config("spark.sql.warehouse.dir", "spark-warehouse") \ .config("spark.sql.catalog.spark_catalog.type", "hive") \ .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.demo.warehouse", "path/to/warehouse") .config("spark.sql.catalog.iceberg.type", "hadoop") \ .config("spark.sql.defaultCatalog", "demo") .master("local[*]") \ .enableHiveSupport() \ .getOrCreate()
df_sales = spark.read.parquet("dataset/sales.parquet/*parquet")df_sales.write \ .format("iceberg") \ .mode("overwrite") \ .saveAsTable("sales_delta_managed_iceberg")
与Nessie协作的配置:
org.projectnessie:nessie-spark-extensions-3.3_2.12:0.44.0 # Library for working with Nessie-based catalogs like Dremio Arcticorg.projectnessie.spark.extensions.NessieSparkSessionExtensionsspark.sql.extensions="org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions" \spark.sql.catalog.nessie.uri=$ARCTIC_URI \spark.sql.catalog.nessie.ref=main \spark.sql.catalog.nessie.authentication.type=BEARER \spark.sql.catalog.nessie.authentication.token=$TOKEN \spark.sql.catalog.nessie.catalog-impl=org.apache.iceberg.nessie.NessieCatalog \spark.sql.catalog.nessie.warehouse=$WAREHOUSE \spark.sql.catalog.nessie=org.apache.iceberg.spark.SparkCatalog \spark.sql.catalog.nessie.io-impl=org.apache.iceberg.aws.s3.S3FileIO
Iceberg中的文件格式
  • 数据:Parquet
  • 元数据:JSON
  • 清单列表:Avro
  • 清单:Avro

Iceberg支持ACID特性:

在Iceberg中,当写入操作启动更新时,会创建元数据文件,并尝试通过将元数据文件指针从当前版本交换到新版本来提交更新。但是,如果写入者发现基于其进行更新的快照不再是当前的,则必须根据新版本重试更新。

通过将一个表的元数据文件的原子交换为另一个表提供了可串行化隔离的基础。

读取操作与并发写入操作隔离,并始终使用表数据的已提交快照。


Hudi

索引:记录键和文件组/文件ID之间的映射。

时间轴:记录表上所有操作的事件序列,这些操作发生在不同的时刻。

数据文件:实际的Parquet格式数据文件。

如果数据湖文件需要被多种消费工具访问,并且需要管理可变数据集,Hudi是一个不错的选择。

from pyspark.sql import SparkSession
spark = SparkSession.builder \ .appName("Hudiwith PySpark") \ .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1") \ .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog",) \ .config("spark.sql.warehouse.dir", "spark-warehouse") \ .config("spark.sql.catalog.spark_catalog.type", "hive") \ .master("local[*]") \ .enableHiveSupport() \ .getOrCreate()
df_sales = spark.read.parquet("dataset/sales.parquet/*parquet")df_sales.write \ .format("hudi") \ .mode("overwrite") \ .saveAsTable("sales_delta_managed_hudi")
df = spark.read. \ format("hudi"). \ option("as.of.instant", "2021-07-28"). \ load(basePath)
df_sales.write.format("hudi"). \ options(**hudi_options). \ mode("append"). \ saveAsTable("sales_delta_managed_hudi")
# create streaming dfdf = spark.readStream \ .format("hudi") \ .load(basePath)
# write stream to new hudi tabledf.writeStream.format("hudi") \ .options(**hudi_streaming_options) \ .outputMode("append") \ .option("path", baseStreamingPath) \ .option("checkpointLocation", checkpointLocation) \ .trigger(once=True) \ .start()


总结

Delta Lake在数据湖和数据管道场景中表现卓越,而Iceberg在数据仓库和分析领域具有显著优势。至于Hudi,它在实时数据处理和流式分析场景中展现出了强大的能力。

注意:目前有一些项目,如Delta UniForm或Onetable,正致力于实现Delta Lake、Apache Hudi和Apache Iceberg之间的互操作性。Onetable的方法允许同一份数据在不同系统中进行查询,从而实现了数据的无缝集成。

例如,可以将使用Hudi摄取的表公开为Iceberg和/或Delta Lake表,而无需用户复制或移动底层数据文件。这种方法不仅保留了相似的提交历史,还支持正确的时间点查询,从而提高了数据的一致性和可用性。

开放表格格式(Open Table Formats)实际上是我们数据存储的包装器。它通常通过一系列文件来实现以下功能:
  1. 跟踪表的模式/分区(DDL)更改。
  2. 跟踪表的数据文件及其列统计信息。
  3. 跟踪表上所有的插入/更新/删除(DML)操作。

-----

Source:Amit Singh Rathore; Open Table Formats — Delta, Iceberg & Hudi; Jun 2, 2022


--【本文完】---

近期受欢迎的文章:

  1. 2024年数据平台的十大关注热点

  2. 2024年非结构化数据管理的4大变革方式

  3. 2024年四大数据存储趋势

  4. 2024年数据管理和存储的四大预测

  5. AI数据湖:大数据浪潮与企业技术重塑



更多交流,可添加本人微信

(请附姓名/关注领域)

继续滑动看下一个
Andy730
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存