查看原文
其他

Delta Lake 中使用 Spark SQL DDL 和 DML

alitrack alitrack 2022-10-01

今天的内容可以理解为 Delta Lake 快速入门-Spark SQL 版

我们可以在 Python、Scala 或者 Java 中通过 spark.sql 来执行 SQL,也可以运行 Spark Thrift server(参考Spark Thrift Server 快速入门),直接在 SQL 客户端(如DBeaver)或者 BI(如 Superset、Metabase)里执行 SQL。

测试数据

Lending Club[1]

下载链接:https://pages.databricks.com/rs/094-YMS-629/images/SAISEU19-loan-risks.snappy.parquet

运行 Spark Thrift Server

今天介绍另外一种方法启动 Spark Thrift Server 以及使用 ipython-sql 来运行 SQL

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL") \
    .config("hive.server2.thrift.port""10000")\
    .config("spark.sql.hive.thriftServer.singleSession"True)\
    .enableHiveSupport()\
    .config("spark.jars.packages""io.delta:delta-core_2.12:0.8.0") \
    .config("spark.sql.extensions""io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog""org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

from py4j.java_gateway import java_import

sc = spark.sparkContext
java_import(sc._gateway.jvm, "")
#Start Spark Thrift Server using the jvm and passing the SparkSession
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver \
    .HiveThriftServer2.startWithContext(spark._jwrapped)

上面代码初始化了 SparkSession,并启动了 Spark Thrift Server, 端口 10000。

Delta table 与 Spark SQL

查看数据库

show databases;  -- 返回default
show tables-- 返回空

创建 Delta table

CREATE TABLE loan_risk_delta (
loan_id bigint,
funded_amnt int,
paid_amnt double,
addr_state string
)
USING DELTA
LOCATION '/tmp/loan_risk_delta'

导入数据

从 Parquet 或者 Spark DataFrame 导入

INSERT INTO loan_risk_delta
SELECT * FROM parquet.`SAISEU19-loan-risks.snappy.parquet`;
SELECT COUNT(*) FROM loan_risk_delta;
-- 返回14705

上面 2 条语句也可以合并一次完成,

CREATE TABLE loan_risk_delta1
USING DELTA
LOCATION '/tmp/loan_risk_delta1'
AS SELECT * FROM parquet.`SAISEU19-loan-risks.snappy.parquet`

修改表

CREATE OR REPLACE TABLE loan_risk_delta (
loan_id bigint,
funded_amnt int,
paid_amnt double,
addr_state string
)
USING DELTA
PARTITIONED BY (addr_state)
LOCATION '/tmp/loan_risk_delta'

追加数据

INSERT INTO loan_risk_delta
SELECT * FROM loan_risk_delta1;
SELECT COUNT(*) FROM loan_risk_delta;

-- 返回29410

覆盖

INSERT OVERWRITE loan_risk_delta
SELECT * FROM loan_risk_delta1;
SELECT COUNT(*) FROM loan_risk_delta
-- 返回14705

删除与更新

DELETE FROM loan_risk_delta
WHERE addr_state='WY';

UPDATE loan_risk_delta
SET addr_state = 'West Virginia'
WHERE addr_state = 'WV';

SELECT addr_state, COUNT(*) as cnt
FROM loan_risk_delta
WHERE addr_state in ('WY','West Virginia','WV')
GROUP BY 1;
--返回
addr_state cnt
-------------------
West Virginia 60

Upsert

-- Upsert data to a target Delta
-- table using merge
MERGE INTO loan_risk_delta
USING loan_risk_delta1
ON loan_risk_delta.loan_id = loan_risk_delta1.loan_id
WHEN MATCHED THEN UPDATE
SET loan_risk_delta.addr_state = loan_risk_delta1.addr_state
WHEN NOT MATCHED THEN INSERT
(loan_id, funded_amnt, paid_amnt,addr_state)
VALUES (loan_id, funded_amnt, paid_amnt,addr_state);

SELECT addr_state, COUNT(*) as cnt
FROM loan_risk_delta
WHERE addr_state in ('WY','West Virginia','WV')
GROUP BY 1;

-- 返回
addr_state cnt
WY     31
WV     60

体验批流一体

  • 模拟流数据的函数,
import random
from pyspark.sql.functions import *
from pyspark.sql.types import *

def random_checkpoint_dir(table_path):
    return f"{table_path}/chkpt/{random.randint(0, 10000)}" 

#UDF:生成随机州
states = ["CA""TX""NY""IA"]

@udf(returnType=StringType())
def random_state():
    return str(random.choice(states))

# 随机生成的数据流,并将其追加到delta table
def generate_and_append_data_stream(table_format, table_path):
    stream_data = spark.readStream.format("rate").option("rowsPerSecond"50).load() \
        .withColumn("loan_id"10000 + col("value")) \
        .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) \
        .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) \
        .withColumn("addr_state", random_state()) \
        .select("loan_id""funded_amnt""paid_amnt""addr_state")
    # *********** FIXED THE SCHEMA OF THE GENERATED DATA *************

    query = stream_data.writeStream \
        .format(table_format) \
        .option("checkpointLocation", random_checkpoint_dir(table_path)) \
        .trigger(processingTime="10 seconds") \
        .start(table_path)

    return query
  • 执行两个到 delta table 的流写入,
delta_path = "/tmp/loan_risk_delta"

stream_1 = generate_and_append_data_stream(table_format = "delta", table_path = delta_path)
stream_2 = generate_and_append_data_stream(table_format = "delta", table_path = delta_path)
  • 测试

接下来每次执行,

SELECT COUNT(*) FROM loan_risk_delta

都会发现数据量在增加。

  • 关闭流
import shutil
def stop_all_streams():
    # Stop all the streams
    print("Stopping all streams")
    for s in spark.streams.active:
        s.stop()
    print("Stopped all streams")
    print("Deleting checkpoints")
    shutil.rmtree("/tmp/loan_risk_delta/chkpt/"True)
    print("Deleted checkpoints")

stop_all_streams()

关闭 Spark 和 Spark Thrift Server

spark.stop()

参考资料

[1]

Lending Club: https://www.kaggle.com/wendykan/lending-club-loan-data



欢迎关注公众号

有兴趣加群讨论数据挖掘和分析的朋友可以加我微信(witwall),暗号:入群


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存