A Real-World Data Development Optimization Case: Join Data Skew
This article shares an optimization case that comes up frequently in day-to-day work: join data skew.
Problem description
The coupon-collection full table was taking more than 5 hours to run. We checked the runtimes of the map, reduce, and join stages separately and found that most of the time was spent in the join stage. Sorting all join job instances by runtime showed the job was stuck on two instances under one particular join; their runtime and data volume were tens of times those of the other instances, so we could be fairly confident that data skew was occurring in that join.
Requirement
Because this is a middle-layer table with many downstream tasks depending on it, the job had to be optimized to guarantee its delivery time.
Step 1: locate the skewed fields
Tracing the skew back to the joins involved, we found that the two problematic joins both keyed on venture and promotion_id, and that these join keys were what caused the data skew.
Step 2: locate the skewed table
Since the dim table is a dimension table, its promotion_id distribution cannot be skewed; the skew can only come from dwd_xxx_ug_buyer_resource_df, so we checked that table's data distribution by promotion_id:
SELECT
    venture,
    promotion_id,
    COUNT(*) AS collect_cnt
FROM
    dwd_xxx_ug_buyer_resource_df
WHERE
    ds = '20230101'
GROUP BY
    venture,
    promotion_id
The result showed that for venture = ID, promotion_id = 207010821100001 and 207018017000001 (free-shipping vouchers) had collection counts in the hundreds of millions, while every other promotion_id was at the tens-of-millions level, a gap of 10x or more, which matched the symptom identified earlier.
Step 3: split the skewed keys out, compute them separately, then UNION ALL the results
The classic fix is to handle the skewed keys in a dedicated join and UNION ALL the two result sets back together (a manual sketch of this pattern follows the hint below). Investigation showed, however, that the platform already implements this approach natively: it is enough to specify the skewed keys with a SKEWJOIN hint.
/*+ skewjoin(coll(promotion_id,venture)(('207010821100001','ID'), ('207018017000001','ID'))) */
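For comparison, here is a minimal sketch of the manual split-and-UNION ALL approach that the SKEWJOIN hint automates. Only the skewed join is shown, with trimmed column lists; the table and column names follow this article's example but are illustrative:
-- 1) The skewed keys: after filtering to the two hot values, the promotion side is tiny,
--    so it can be broadcast with MAPJOIN and never goes through a shuffle.
SELECT /*+ MAPJOIN(p) */ c.buyer_id, c.promotion_id, c.venture, p.coupon_value
FROM dwd_xxx_ug_buyer_resource_df c
JOIN dim_xxx_promotion p
ON c.promotion_id = p.promotion_id
AND c.venture = p.venture
WHERE c.venture = 'ID'
AND c.promotion_id IN ('207010821100001', '207018017000001')

UNION ALL

-- 2) All remaining keys: the regular shuffle join, now free of the hot keys.
SELECT c.buyer_id, c.promotion_id, c.venture, p.coupon_value
FROM dwd_xxx_ug_buyer_resource_df c
JOIN dim_xxx_promotion p
ON c.promotion_id = p.promotion_id
AND c.venture = p.venture
WHERE NOT (c.venture = 'ID'
AND c.promotion_id IN ('207010821100001', '207018017000001'))
;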
Code example (some unimportant fields have been trimmed):
INSERT OVERWRITE TABLE dwd_xxx_promotion_collect_df PARTITION(ds='${bizdate}',venture,collect_mon)
SELECT /*+ skewjoin(coll(promotion_id,venture)(('207010821100001','ID'), ('207018017000001','ID'))),MAPJOIN(EXCHANGE,chl,t_channel,t_tag,t_voucher_pool) */
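-- SKEWJOIN tells the optimizer to handle the two hot (promotion_id, venture) key values separately,
-- so they no longer pile up on a single reduce instance;
-- MAPJOIN broadcasts the listed small dimension tables so those joins skip the shuffle entirely.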
coll.id as id
,coll.STATUS
,COALESCE(lpi.gmt_create,coll.gmt_created) gmt_created
,coll.activity_id
,coll.collect_src
,coll.buyer_id
,coll.consume_type
,coll.consume_id
,sg_udf:decimalisation(
coalesce(lpi.collect_amount,coll.discountValue,coll.p_decreaseMoney)
,coll.venture
) discountValue_local
,promotion.coupon_value coupon_value_local
,COALESCE(
sg_udf:decimalisation(
coalesce(lpi.collect_amount,coll.discountValue,coll.p_decreaseMoney)
,coll.venture
)
,promotion.coupon_value
) collect_value_local
,TO_CHAR(
FROM_UNIXTIME(
CAST( COALESCE(lpi.gmt_create,coll.gmt_created) / 1000 AS BIGINT )
)
,'yyyymmdd'
) collect_date
,TO_CHAR(
sg_udf.epoch_to_timezone(COALESCE(lpi.gmt_create,coll.gmt_created),coll.venture)
,'yyyymmdd'
) collect_local_date -- ,t2.available_value
,coll.feature
,promotion.product_code
,coalesce(promotion.product_sub_code,'unknown') product_sub_code
,promotion.features
,promotion.sponsor
,snap.id snap_id
--order id:
,CASE WHEN snap.out_id IS NOT NULL THEN snap.out_id
WHEN LENGTH(coll.consume_id) > 0 THEN coll.consume_id
ELSE NULL
END AS out_id
,promotion.coupon_value * EXCHANGE.to_usd AS coupon_value_usd
,COALESCE(
sg_udf:decimalisation(
coalesce(lpi.collect_amount,coll.discountValue,coll.p_decreaseMoney)
,coll.venture
)
,promotion.coupon_value
) * EXCHANGE.to_usd AS collect_value_usd
,chl.channel_name
,promotion.created_by
,promotion.create_email
,promotion.create_by_no
,promotion.created_by_origin
,COALESCE(promotion.purpose_id,'unknown') purpose_id
,COALESCE(promotion.purpose_name,'unknown') purpose_name
,coll.operation_parameter
,coll.gmt_modified
,COALESCE(vulcan.parent_plan_id,vulcan.plan_id) as plan_id
,COALESCE(coll.tag_id,vulcan.tag_id) tag_id
,COALESCE(t_tag.tag_name,vulcan.tag_name) tag_name
,vulcan.module_id
,vulcan.module_name
,vulcan.page_id
,vulcan.page_name
,COALESCE(coll.scene_id,vulcan.channel_id) scene_id
,COALESCE(t_channel.scene_name,vulcan.channel_name) scene_name
,vulcan.plan_name
,coll.start_time
,TO_CHAR(
sg_udf.epoch_to_timezone(coll.start_time,coll.venture)
,'yyyymmdd'
) start_local_date
,coll.end_time
,TO_CHAR(
sg_udf.epoch_to_timezone(coll.end_time,coll.venture)
,'yyyymmdd'
) end_local_date
,coll.total_count
,coll.available_count
,coll.in_use_count
,coll.used_count
,coll.invalid_count
,coll.campaign_id
,coll.gcp_page_id
,coll.gcp_module_id
,t_tag.tag_union_id
-- benefit upgrade: newly added fields --
,promotion.biz_type
,promotion.benefit_type
,promotion.benefit_level
,COALESCE(coll.pool_id,vulcan.pool_id) pool_id
,t_voucher_pool.name as pool_name
,sg_udf:decimalisation(
coalesce(coll.p_amountat,0)
,coll.venture
) * EXCHANGE.to_usd AS min_spend_usd
,coalesce(coll.p_amountat,0) AS min_spend
,sg_udf:decimalisation(
coalesce(coll.p_maxdiscountfee,0)
,coll.venture
) * EXCHANGE.to_usd AS cap_value_usd
,coalesce(coll.p_maxdiscountfee,0) cap_value
,promotion.tool_code
,benefit_group_id
,coll.venture
,SUBSTR(TO_CHAR(
sg_udf.epoch_to_timezone(COALESCE(lpi.gmt_create,coll.gmt_created),coll.venture)
,'yyyymmdd'
),1,6) collect_mon
FROM (
SELECT *
FROM xxx_mkt.dwd_xxx_buyer_resource_df coll -- full coupon-collection table
) coll
JOIN (
SELECT
CAST(promotion_id as STRING ) AS promotion_id,
ds,
venture
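-- NOTE: other promotion columns referenced above (coupon_value, channel_id, product_code, etc.)
-- have been trimmed from this subquery for brevity, per the field trimming mentioned before the example.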
FROM xxx_mkt.dim_xxx_promotion
WHERE ds = MAX_PT('xxx_mkt.dim_xxx_promotion')
) promotion
ON coll.activity_id = promotion.promotion_id
AND coll.venture = promotion.venture
LEFT JOIN t_will_iump_benefit_snapshot snap
ON coll.consume_id = snap.id
AND coll.venture = snap.venture
LEFT JOIN t_vulcan vulcan
on coll.id=vulcan.ref_record_id
and coll.buyer_id=vulcan.user_id
and coll.venture=vulcan.venture
LEFT JOIN (
SELECT venture
,to_usd
FROM xxx_cdm.dim_lzd_exchange_rate
WHERE ds = '${bizdate}'
) EXCHANGE
ON coll.venture = EXCHANGE.venture
LEFT JOIN (
SELECT channel_id
,channel_name
FROM xxx_mkt.dim_lzd_promotion_channel
) chl
ON coalesce(coll.channel_id,promotion.channel_id) = chl.channel_id
LEFT JOIN (
SELECT id AS scene_id
,channel_name AS scene_name
FROM xxx_tech.xxx_stars_sg_s_stars_channel_new
WHERE ds = MAX_PT('xxx_tech.xxx_stars_sg_s_stars_channel_new')
AND venture = 'ALL'
AND env = 'PRD'
) t_channel
ON COALESCE(coll.scene_id,vulcan.channel_id) = t_channel.scene_id
LEFT JOIN t_tag t_tag
ON COALESCE(coll.tag_id,vulcan.tag_id) = t_tag.tag_id
AND coll.venture = t_tag.venture
LEFT JOIN t_voucher_pool_dim t_voucher_pool
ON COALESCE(coll.pool_id,vulcan.pool_id) = t_voucher_pool.id
AND coll.venture = t_voucher_pool.venture
;
Optimization result: runtime dropped from the original 5-6 hours to 21 minutes when the cluster is idle, and about 1.5 hours at peak times.
Optimization point:
Handle the join logic for the skewed keys of the big-table-to-big-table join separately.
Lessons learned:
When data skew occurs in a big-table-to-small-table join, it can be solved with a map join (broadcasting the small table), no matter which stage it occurs in. If the small table is not small enough, i.e. it exceeds 8096M (the officially recommended maximum setting), or the skew occurs in a big-table-to-big-table join, then only the approach described above (isolating the skewed keys) works.
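For the big-table-to-small-table case, here is a minimal sketch of the broadcast approach using the MAPJOIN hint; the table names are illustrative, not from the job above:
-- Broadcast the small dimension table to every mapper so the join happens map-side
-- and the hot keys never go through a shuffle.
SELECT /*+ MAPJOIN(dim) */
    fact.buyer_id,
    fact.promotion_id,
    dim.coupon_value
FROM big_fact_table fact
JOIN small_dim_table dim
ON fact.promotion_id = dim.promotion_id
AND fact.venture = dim.venture
;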