其他
Spark离线开发框架设计与实现
一、背景
二、框架设计
2.1 基础框架
2.2 可扩展工具
2.3 应用程序
2.3.1 SQL应用
<?xml version="1.0" encoding="UTF-8"?>
<project name="test">
<class>com.way.app.instance.SqlExecutor</class>
<path>sql文件路径</path>
<!-- sparksession conf -->
<conf>
<spark.executor.memory>1G</spark.executor.memory>
<spark.executor.cores>2</spark.executor.cores>
<spark.driver.memory>1G</spark.driver.memory>
<spark.executor.instances>20</spark.executor.instances>
</conf>
</project>
2.3.2 Java应用
<?xml version="1.0" encoding="UTF-8"?>
<project name="ecommerce_dwd_hanwuji_click_incr_day_domain">
<class>com.way.app.instance.ecommerce.Test</class>
<input>
<type>table</type>
<sql>select
clk_url,
clk_num
from test_table
where event_day='{DATE}'
and click_pv > 0
and is_ubs_spam=0
</sql>
</input>
<output>
<type>afs_kp</type>
<path>test/event_day={DATE}</path>
</output>
<conf>
<spark.executor.memory>2G</spark.executor.memory>
<spark.executor.cores>2</spark.executor.cores>
<spark.driver.memory>2G</spark.driver.memory>
<spark.executor.instances>10</spark.executor.instances>
</conf>
</project>
package com.way.app.instance.ecommerce;
import com.way.app.Application;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import java.util.Map;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
public class Test extends Application {
@Override
public void run() {
// 输入
Map<String, String> input = (Map<String, String>) property.get("input");
Dataset<Row> ds = sparkSession.sql(getInput(input)).toDF("url", "num");
// 逻辑处理(简单的筛选出url带有部分站点的日志)
JavaRDD<String> outRdd = ds.filter((FilterFunction<Row>) row -> {
String url = row.getAs("url").toString();
return url.contains(".jd.com")
|| url.contains(".suning.com")
|| url.contains("pin.suning.com")
|| url.contains(".taobao.com")
|| url.contains("detail.tmall.hk")
|| url.contains(".amazon.cn")
|| url.contains(".kongfz.com")
|| url.contains(".gome.com.cn")
|| url.contains(".kaola.com")
|| url.contains(".dangdang.com")
|| url.contains("aisite.wejianzhan.com")
|| url.contains("w.weipaitang.com");
})
.toJavaRDD()
.map(row -> row.mkString("\001"));
// 输出
Map<String, String> output = (Map<String, String>) property.get("output");
outRdd.saveAsTextFile(getOutPut(output));
}
}
2.3.3 数据回溯应用
2.3.3.1 方案设计
在client提交application,首先client向RS申请启动ApplicationMaster RS先随机找到一台NodeManager启动ApplicationMaster ApplicationMaster向RS申请启动Executor的资源 RS返回一批资源给ApplicationMaster ApplicationMaster连接Executor 各个Executor反向注册给ApplicationMaster ApplicationMaster发送task、监控task执行,回收结果
执行环境准备 开始执行代码 释放资源
2.3.3.2 功能介绍
断点续回
回溯顺序
并行回溯
2.3.3.3 创建一个回溯任务
class参数为回溯应用的唯一类,必填参数,所有回溯任务无需变化。 type参数为回溯应用类型,默认为sql,若应用类型为java,则type值应为java类名。 path参数为回溯代码路径,必填参数,无默认值,通常与例行任务代码相同,无需修改。 limitdate参数为回溯的截止日期,必填参数,无默认值。 startdate参数为回溯开始日期,必填参数,无默认值,若任务进入断点续回或开启并行回溯时,则该参数无效。 order参数为回溯顺序,默认为倒序。当值为1时为正序,为值为-1时为倒序。 distance参数为回溯步长,框架默认为串行回溯,但也支持并行回溯,该参数主要用于支持并行回溯,当该参数存在且值不为-1时,回溯开始日期取值为基准日期。如启动两个并行任务,任务的执行范围为基准日期至基准日期加步长或limitdate,若基准日期加步长后日期大于limitdate,则是取limitdate,否则反之。 file参数为回溯日志文件,必填参数,无默认值,用于记录已回溯成功的日期,当失败再次重启任务时,startdate会以日志文件中日期的下一个日期为准。 conf参数与其他应用相同,为本次回溯任务的资源占用配置。
<?xml version="1.0" encoding="UTF-8"?>
<project name="ecommerce_ads_others_order_retain_incr_day">
<class>com.way.app.instance.ecommerce.Huisu</class>
<type>sql</type>
<path>/sql/ecommerce/ecommerce_ads_others_order_retain_incr_day.sql</path>
<limitdate>20220404</limitdate>
<startdate>20210101</startdate>
<order>1</order>
<distance>-1</distance>
<file>/user/ecommerce_ads_others_order_retain_incr_day/process</file>
<conf>
<spark.executor.memory>1G</spark.executor.memory>
<spark.executor.cores>2</spark.executor.cores>
<spark.executor.instances>30</spark.executor.instances>
<spark.yarn.maxAppAttempts>1</spark.yarn.maxAppAttempts>
</conf>
</project>
三、使用方式
3.1 使用介绍
3.2 使用对比
四、展望
---------- END ----------