
A Hands-On Data Development Optimization Case: Join Data Skew

涤生-莫哥 涤生大数据
2024-12-05

This article shares an optimization case that comes up again and again in day-to-day work: Join data skew.

Problem description

The full coupon-collection table took more than 5 hours to run. We checked the run time of the map, reduce, and join stages separately and found that most of the time was spent in the join stage. Sorting the run instances of all join jobs by duration, the job was stuck on two instances under one particular join whose run time and data volume were roughly ten times those of the other instances, so we were fairly confident the join was hitting data skew.

Requirement

This is a mid-layer table with many downstream tasks depending on it, so the job had to be optimized to guarantee its delivery time.

Step 1: Locate the skewed columns

Tracing where the skew occurred, we found that in these two joins the skew was mainly caused by joining on venture and promotion_id.

Step 2: Locate the skewed table

Since the dim table is a dimension table, its promotion_id distribution cannot be skewed, so the skew can only come from dwd_xxx_ug_buyer_resource_df. We checked that table's data distribution by promotion_id:

SELECT  venture
       ,promotion_id
       ,COUNT(*) AS collect_cnt
FROM    dwd_xxx_ug_buyer_resource_df
WHERE   ds = '20230101'
GROUP BY venture, promotion_id;

The result showed that for venture = 'ID', promotion_id 207010821100001 and 207018017000001 (free-shipping vouchers) had collection counts in the hundreds of millions, while the other promotion_id values were at the tens-of-millions level, a roughly 10x gap that matched the symptom located earlier.
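
To quantify the gap rather than eyeballing raw counts, a follow-up query along these lines can help; this is a sketch against the same table and partition as above, and the cnt_share column name is ours:

-- Rank (venture, promotion_id) keys by row count and by their share within each venture,
-- so skew candidates surface at the top of the result.
SELECT  venture
       ,promotion_id
       ,collect_cnt
       ,ROUND(collect_cnt / SUM(collect_cnt) OVER (PARTITION BY venture), 4) AS cnt_share
FROM (
        SELECT  venture
               ,promotion_id
               ,COUNT(*) AS collect_cnt
        FROM    dwd_xxx_ug_buyer_resource_df
        WHERE   ds = '20230101'
        GROUP BY venture, promotion_id
     ) t
ORDER BY collect_cnt DESC
LIMIT 100;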

Step 3: Split out the skewed keys, compute them separately, and UNION ALL the results back together

Investigation showed, however, that the platform already implements this kind of skew handling; simply specifying the skewed keys with a SKEWJOIN hint achieves the same effect:

/*+ skewjoin(coll(promotion_id,venture)(('207010821100001','ID'), ('207018017000001','ID'))) */
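
For engines without a SKEWJOIN hint, the manual version of Step 3 looks roughly like the sketch below: route the hot (promotion_id, venture) pairs into their own branch, where the filtered dimension side is small enough to broadcast, and UNION ALL it with the non-skewed branch. The table and column names here are simplified from the case above, so treat them as illustrative:

-- Branch 1: skewed keys only; the dim side filtered to the hot keys is tiny, so broadcast it.
SELECT /*+ MAPJOIN(p) */ c.*, p.coupon_value
FROM   dwd_xxx_ug_buyer_resource_df c
JOIN (
        SELECT promotion_id, venture, coupon_value
        FROM   dim_xxx_promotion
        WHERE  venture = 'ID'
        AND    promotion_id IN ('207010821100001', '207018017000001')
     ) p
  ON   c.promotion_id = p.promotion_id
  AND  c.venture      = p.venture
WHERE  c.ds = '${bizdate}'
AND    c.venture = 'ID'
AND    c.promotion_id IN ('207010821100001', '207018017000001')

UNION ALL

-- Branch 2: everything else goes through the normal shuffle join.
SELECT c.*, p.coupon_value
FROM   dwd_xxx_ug_buyer_resource_df c
JOIN   dim_xxx_promotion p
  ON   c.promotion_id = p.promotion_id
  AND  c.venture      = p.venture
WHERE  c.ds = '${bizdate}'
AND    NOT (c.venture = 'ID' AND c.promotion_id IN ('207010821100001', '207018017000001'));

Both branches must produce the same column list for the UNION ALL to be valid; the point of the split is that the hot keys never go through the shuffle that was overloading two reducers.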


Code example (non-essential columns trimmed):

INSERT OVERWRITE TABLE dwd_xxx_promotion_collect_df PARTITION (ds = '${bizdate}', venture, collect_mon)
SELECT /*+ skewjoin(coll(promotion_id,venture)(('207010821100001','ID'), ('207018017000001','ID'))), MAPJOIN(EXCHANGE,chl,t_channel,t_tag,t_voucher_pool) */
        coll.id AS id, coll.STATUS,
        COALESCE(lpi.gmt_create, coll.gmt_created) gmt_created,
        coll.activity_id, coll.collect_src, coll.buyer_id, coll.consume_type, coll.consume_id,
        sg_udf:decimalisation(COALESCE(lpi.collect_amount, coll.discountValue, coll.p_decreaseMoney), coll.venture) discountValue_local,
        promotion.coupon_value coupon_value_local,
        COALESCE(sg_udf:decimalisation(COALESCE(lpi.collect_amount, coll.discountValue, coll.p_decreaseMoney), coll.venture), promotion.coupon_value) collect_value_local,
        TO_CHAR(FROM_UNIXTIME(CAST(COALESCE(lpi.gmt_create, coll.gmt_created) / 1000 AS BIGINT)), 'yyyymmdd') collect_date,
        TO_CHAR(sg_udf.epoch_to_timezone(COALESCE(lpi.gmt_create, coll.gmt_created), coll.venture), 'yyyymmdd') collect_local_date,
        -- t2.available_value,
        coll.feature,
        promotion.product_code,
        COALESCE(promotion.product_sub_code, 'unknown') product_sub_code,
        promotion.features, promotion.sponsor,
        snap.id snap_id,
        -- order id:
        CASE WHEN snap.out_id IS NOT NULL THEN snap.out_id
             WHEN LENGTH(coll.consume_id) > 0 THEN coll.consume_id
             ELSE NULL
        END AS out_id,
        promotion.coupon_value * EXCHANGE.to_usd AS coupon_value_usd,
        COALESCE(sg_udf:decimalisation(COALESCE(lpi.collect_amount, coll.discountValue, coll.p_decreaseMoney), coll.venture), promotion.coupon_value) * EXCHANGE.to_usd AS collect_value_usd,
        chl.channel_name,
        promotion.created_by, promotion.create_email, promotion.create_by_no, promotion.created_by_origin,
        COALESCE(promotion.purpose_id, 'unknown') purpose_id,
        COALESCE(promotion.purpose_name, 'unknown') purpose_name,
        coll.operation_parameter, coll.gmt_modified,
        COALESCE(vulcan.parent_plan_id, vulcan.plan_id) AS plan_id,
        COALESCE(coll.tag_id, vulcan.tag_id) tag_id,
        COALESCE(t_tag.tag_name, vulcan.tag_name) tag_name,
        vulcan.module_id, vulcan.module_name, vulcan.page_id, vulcan.page_name,
        COALESCE(coll.scene_id, vulcan.channel_id) scene_id,
        COALESCE(t_channel.scene_name, vulcan.channel_name) scene_name,
        vulcan.plan_name,
        coll.start_time,
        TO_CHAR(sg_udf.epoch_to_timezone(coll.start_time, coll.venture), 'yyyymmdd') start_local_date,
        coll.end_time,
        TO_CHAR(sg_udf.epoch_to_timezone(coll.end_time, coll.venture), 'yyyymmdd') end_local_date,
        coll.total_count, coll.available_count, coll.in_use_count, coll.used_count, coll.invalid_count,
        coll.campaign_id, coll.gcp_page_id, coll.gcp_module_id,
        t_tag.tag_union_id,
        -- benefit upgrade: new columns --
        promotion.biz_type, promotion.benefit_type, promotion.benefit_level,
        COALESCE(coll.pool_id, vulcan.pool_id) pool_id,
        t_voucher_pool.name AS pool_name,
        sg_udf:decimalisation(COALESCE(coll.p_amountat, 0), coll.venture) * EXCHANGE.to_usd AS min_spend_usd,
        COALESCE(coll.p_amountat, 0) AS min_spend,
        sg_udf:decimalisation(COALESCE(coll.p_maxdiscountfee, 0), coll.venture) * EXCHANGE.to_usd AS cap_value_usd,
        COALESCE(coll.p_maxdiscountfee, 0) cap_value,
        promotion.tool_code,
        benefit_group_id,
        coll.venture,
        SUBSTR(TO_CHAR(sg_udf.epoch_to_timezone(COALESCE(lpi.gmt_create, coll.gmt_created), coll.venture), 'yyyymmdd'), 1, 6) collect_mon
FROM (
        SELECT *
        FROM   xxx_mkt.dwd_xxx_buyer_resource_df coll -- full collection table
     ) coll
JOIN (
        SELECT CAST(promotion_id AS STRING) AS promotion_id, ds, venture
        FROM   xxx_mkt.dim_xxx_promotion
        WHERE  ds = MAX_PT('xxx_mkt.dim_xxx_promotion')
     ) promotion
  ON  coll.activity_id = promotion.promotion_id
  AND coll.venture = promotion.venture
LEFT JOIN t_will_iump_benefit_snapshot snap
  ON  coll.consume_id = snap.id
  AND coll.venture = snap.venture
LEFT JOIN t_vulcan vulcan
  ON  coll.id = vulcan.ref_record_id
  AND coll.buyer_id = vulcan.user_id
  AND coll.venture = vulcan.venture
LEFT JOIN (
        SELECT venture, to_usd
        FROM   xxx_cdm.dim_lzd_exchange_rate
        WHERE  ds = '${bizdate}'
     ) EXCHANGE
  ON coll.venture = EXCHANGE.venture
LEFT JOIN (
        SELECT channel_id, channel_name
        FROM   xxx_mkt.dim_lzd_promotion_channel
     ) chl
  ON COALESCE(coll.channel_id, promotion.channel_id) = chl.channel_id
LEFT JOIN (
        SELECT id AS scene_id, channel_name AS scene_name
        FROM   xxx_tech.xxx_stars_sg_s_stars_channel_new
        WHERE  ds = MAX_PT('xxx_tech.xxx_stars_sg_s_stars_channel_new')
        AND    venture = 'ALL'
        AND    env = 'PRD'
     ) t_channel
  ON COALESCE(coll.scene_id, vulcan.channel_id) = t_channel.scene_id
LEFT JOIN t_tag t_tag
  ON  COALESCE(coll.tag_id, vulcan.tag_id) = t_tag.tag_id
  AND coll.venture = t_tag.venture
LEFT JOIN t_voucher_pool_dim t_voucher_pool
  ON  COALESCE(coll.pool_id, vulcan.pool_id) = t_voucher_pool.id
  AND coll.venture = t_voucher_pool.venture;

Result: runtime dropped from the original 5 to 6 hours to 21 minutes when the cluster is idle, and about 1.5 hours during peak time.

Optimization point:

Handle the join logic for the skewed keys of the large-table-to-large-table join separately.

Lessons learned:

When data skew happens in a large-table-to-small-table join, it can be fixed at any stage by map-joining (broadcasting) the small table. If the small table is not small enough, i.e. it exceeds 8096 MB (the maximum setting recommended officially), or if the skew occurs in a large-table-to-large-table join, then the only option is the approach above: isolate the skewed keys (for example with SKEWJOIN) and handle them separately.
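
For reference, the small-table case mentioned above is usually just a one-line hint. This is a minimal sketch using the MaxCompute/Hive-style MAPJOIN hint; the table and column names are illustrative, not from this case:

-- Broadcast the small dimension table to every mapper, so the join key never goes
-- through a shuffle and cannot skew. Only works while the small table fits in memory.
SELECT /*+ MAPJOIN(dim) */
       fact.order_id, fact.buyer_id, dim.channel_name
FROM   dwd_fact_orders_df fact
LEFT JOIN dim_channel dim
  ON   fact.channel_id = dim.channel_id
WHERE  fact.ds = '${bizdate}';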



