HBase Bulkload 实践探讨
点击关注“有赞coder”
获取更多技术干货哦~
一、 背景
二、 Bulkload 技术简介
Extract,异构数据源数据导入到 HDFS 之上。 Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。 Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。
三、 实践用法介绍
3.1 MR
编写 mapper 类,该类最核心的工作是将 HDFS 上的其他数据格式转换成 HBase 的 Put 对象与对应的 rowkey。 编写 MapReduce Job 类,核心点需要通过 HFileOutputFormat 2配置 job。 运行 MR 任务。
3.2 Hive SQL
可以直接从 Hive 导出数据到 HBase,不用关心底层文件存储的格式信息。
SQL 实现数据清洗与转换,表达能力强。
流程复杂,需要很多前置工作 底层依然是 MR 任务,由于前置工作整体执行时间可能会更长 SQL 产出任务在指定 HFile 产出的分区之后可能会因为在某个分区 Hive 表没有数据而失败
3.3 Spark Bulkload
优点:
比 MR 执行的快。 可以借助 Spark SQL 完成从 Hive 的数据抽取与过滤。 Spark 社区更加活跃,问题更容易找到解决方案。
四、 有赞 Bulkload 方式演进
4.1 Hive SQL 方案
我们在开始引入的 DataX 可以做Hive等异构数据源导入到 HBase 的方案来解决数据迁移,而随着业务发展越来越多的业务比如算法的很多任务,他们会首先经过迭代计算将数据生成到 Hive 表里,为了可以实时读取这部分数据需要将数据导入到 HBase 线上集群,而这部分数据量级一般都在亿级别,此时再用 DataX 就不那么合适,所以为了解决 Hive -> HBase Bulkload 这个路径,我们研发了Hive SQL 的方案,此方案的执行流程图如下:
流程解析必要的前置工作,检查hive表的读取权限,传参的表字段是否有差错。
获取 HBase 表的 Region 边界点,用于再之后 SQL 生成 HFile 时按照 Region 的区间,可以通过简单的 java 程序去读取表的信息实现。这一步至关重要,如果不按照表的分区生成 HFile,那么再最后的 Load 阶段,我们可以看到有大量的时间耗费在HFile split 的过程。
创建 Hive 表用来生成分区数据,注意,这里需要指定表的 location 属性,用于存放接下来要生成的 lst 分区文件。
生成分区文件。这一步稍微复杂,我们分流程叙述。
我们将第 3 步生成分区表标记为表 A ,将第2步生成的分区数据通过 Hive SQL 插入到一张临时表 A' 里,这两张表都只有一个字段 rowkey,类型为 String。
通过 insert overwrite table 命令将 A' 表中的数据插入到 A 中,此时在表 A 的 location 目录下可以生成若干分区文件。
把这些分区文件通过 cp -f 命令拷贝到 location 目录下的 xx.lst 文件中,这一步是必要的整合过程。
指定 reduce task 的个数为分区的个数。 指定 hive.mapred.partitioner 为 org.apache.hadoop.mapred.lib.TotalOrderPartitioner。 指定 total.order.partitioner.natural.order 为 false。 指定 mapreduce.totalorderpartitioner.path 为 location下的 xx.lst。 指定 hive.hbase.generatehfiles 为 true。 可以指定压缩属性为 lzo hfile.compression=lzo 以上的配置可以通过 set 命令在Hive执行命令前生效。 生成 HFile 的伪代码是 select [columns] from hive_table where 过滤条件 cluster by rowkey。必须通过 cluster by 做全局排序,order by 这里不生效。
exportHADOOP_CLASSPATH=`hbase classpath`
我们可以看出该实现流程比较复杂,为了能达到按照 Region 分区生成 HFile,达到比较好的 Bulkload 效率,我们做了额外很多工作。但这个版本在后续生产中给我们埋下了一个坑。看下面这个例子,假设我们获取的 Region 分区是 [1,3,5,7],如果 Hive 表里 rowkey 在 [3,5] 这个范围内没有数据,那么第 6 步 生成 HFile 就会报 FileNotFound 的异常,而这样的情况发生概率并不低。为了解决这个问题,我们在获取 Region 边界之后额外引入的一步,用来做 Region 边界的裁剪。简单就说就是先根据边界去并发的扫 Hive 表,如果这个边界内 Hive 表没有数据,那么就合并该边界。以上面的例子来说,如果 [3,5] 边界没有数据,最后生成的边界为 [1,3,7],此时还要考虑 3,7 这个区间内是否有数据,如果没有就合并成 [1,7],以此类推。虽然可以解决这个异常,同时并发去扫描 Hive 数据来节约时间,但是这一检验的步骤仍然十分缓慢,所以这个方案在后来的迭代中彻底废弃掉。但这个方案并不是一无是处,我们通过 SQL 就可以生成 HFile,这显然是充满诱惑力的,没有特殊要求的可以只导出部分字段,那么在平台上配置下字段即可,有特殊的要求可以通过写 SQL 的 where 语句做过滤,这显然比 MR 更加灵活(前提是做好这些前置工作)。
4.2 Spark Bulkload
获取 Table 的 Region 信息。
根据配置生成 SQL 并通过 Spark SQL 生成 Dataset。
创建一个 Partitioner 对象,getPartition 根据 rowkey 找到所属的分区的 index,这步比较关键。
创建一个 Comparator,比较 rowkey 以及 cf:quafilier,下文会详细介绍。
从 SQL 中一条条读取数据并根据逻辑过滤,返回一个
List<Tuple2<Tuple2<ImmutableBytesWritable,byte[]>,KeyValue>>
列表。调用 flatMapToPair 方法处理第 5 步生成的列表。
调用 repartitionAndSortWithinPartitions 方法将传入 3,4 步创建好的 Partitioner 与 Comparator 对象,并调用 mapToPair 方法转为
Tuple2<ImmutableBytesWritable,KeyValue>
对象。调用 saveAsNewAPIHadoopFile 方法 保存为 HFile 文件。
1,2 两步骤略,第 3 步创建一个 Partitioner 的目的是为了第 7 步通过调用 repartitionAndSortWithinPartitions 来根据 table regions 的范围分区,同时一个分区对应 Spark 的一个 executor,简单来说让每一个分区数据有序,同时并发的处理多个分区可以增加处理效率,如果不做分区只做 sortBykey() 也可以实现,但是执行时间会极长。
List<Tuple2<ImmutableBytesWritable,KeyValue>>
列表,列表里保证 KeyValue 是按照列族,标识符排序好,但是在调用 repartitionAndSortWithinPartitions 方法之后,排序由于 shuffle 的原因重新变为乱序,最后的结果是笔者总会看到 rowkey 确实排序好了,但是依旧因为列族与标识符没排序好而抛出的 "Added a key not lexically larger than previous" IOException。这块笔者请教的负责维护 Spark 的同事,证实了当前调用场景下 shuffle 会影响排序的事实。为了解决这个问题,我们实现的比较器不仅可以比较 rowkey,同时在 rowkey 相等的时候比较列族与标识符,保证 rowkey,列族,标识符三者按照此顺序关系一定有序。这也就是为什么第 5 步返回的是
List<Tuple2<Tuple2<ImmutableBytesWritable,byte[]>,KeyValue>>
这样一个列表,返回这样的列表我们才可以按照上述所说进行排序,排序代码见第五节。第 7,8 步是将排好序的数据写到 HDFS 生成 HFile 文件,具体的代码详见第五节示例代码。
4.3 Spark Bulkload常见错误解析
调用 saveAsNewAPIHadoopFile 方法抛出 "Added a key not lexically larger than previous" 的异常是因为排序问题导致,上文已经做了详细介绍。
ImmutableBytesWritable 无法序列化的异常。通过如下 java 代码设置即可(读者可以用 scala 实现)
sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");sparkConf.registerKryoClasses(newClass[]{org.apache.hadoop.hbase.io.ImmutableBytesWritable.class});
比较器无法序列化的异常。让比较器实现 Serializable 接口即可。
driver 中初始化的对象 于在 RDD 的 action 或者 transformation 中无法获取的异常,需要做 broadcast。
笔者还遇到因为 Spark 使用的 HBase 版本 jar 包冲突的问题,可以通过 Spark 命令中指定上传特定版本 jar 包覆盖的版本解决,具体命令在第五节给出。
可以通过动态设置 executor 个数来优化整体任务执行效率。
五、核心示例代码
执行命令如下。
$SPARK_HOME/bin/spark-submit --master yarn --conf spark.executor.extraClassPath=hbase-hadoop-compat-1.2.6.jar:hbase-server-1.2.6.jar:hbase-common-1.2.6.jar:hbase-client-1.2.6.jar--conf spark.driver.extraClassPath=hbase-hadoop-compat-1.2.6.jar:hbase-server-1.2.6.jar:hbase-client-1.2.6.jar:hbase-common-1.2.6.jar--conf spark.dynamicAllocation.maxExecutors=100--conf spark.dynamicAllocation.minExecutors=5--conf spark.dynamicAllocation.enabled=true--files /opt/hive/conf/hive-site.xml,/opt/hbase/conf/hbase-site.xml,$hive_schema_file,$hbase_schema_file,/opt/hbase_bulkload/config.properties --executor-memory 8G--driver-memory 8G--name spark-hbase-bulkload-$hbase_table --class com.youzan.bigdata.BulkLoad2HBase--queue realtime.data_platform --jars $HBASE_HOME/lib/hbase-hadoop-compat-1.2.6.jar,/opt/hbase/lib/hbase-server-1.2.6.jar,/opt/hbase/lib/hbase-client-1.2.6.jar,/opt/hbase/lib/hbase-common-1.2.6.jar--deploy-mode cluster /opt/hbase_bulkload/spark-bulkload-hbase-1.0.0-SNAPSHOT-jar-with-dependencies.jar $hive_table $hbase_table $hbase_cluster $hive_schema_file $hbase_schema_file $bulkload_files_path
入参以及上传的文件可以忽略,读者根据自己的逻辑设计入参。--conf spark.dynamicAllocation.enabled=true 可以根据当前 Region 分区自动调节执行的 executor 个数。上传的 jar 包为了解决版本冲突,通过 spark.executor.extraClassPath 与 spark.driver.extraClassPath 来配置。
Partitioner 代码如下 注意对key对象的处理,它是一个 Tuple2<<ImmutableBytesWritable,byte[]>,KeyValue>>
类型的对象。
public class RegionPartitioner extends Partitioner {
private List<HBaseTableInfo.RegionInfo> regionInfos = new LinkedList<>();
public RegionPartitioner(List<HBaseTableInfo.RegionInfo> regionInfos){
this.regionInfos = regionInfos;
}
@Override public int numPartitions() {
return this.regionInfos.size();
}
@Override public int getPartition(Object key) {
if(key instanceof ImmutableBytesWritable){
for(int i=0;i<regionInfos.size();i++){
if(regionInfos.get(i).containsRowkey(((ImmutableBytesWritable) key).get())){
return i;
}
}
}else if(key instanceof Tuple2){
if((ImmutableBytesWritable)((Tuple2) key)._1() instanceof ImmutableBytesWritable){
ImmutableBytesWritable rowkey = (ImmutableBytesWritable)((Tuple2) key)._1();
for(int i=0;i<regionInfos.size();i++){
if(regionInfos.get(i).containsRowkey(rowkey.get())){
return i;
}
}
}
}
return 0;
}
public static RegionPartitioner getRegionPartitioner(List<HBaseTableInfo.RegionInfo> regionInfos){
Collections.sort(regionInfos);
return new RegionPartitioner(regionInfos);
}
}
Comparator 代码如下。先比较 rowkey ,再比较列族+标识符。
public class KeyQualifierComparator implements Comparator<Tuple2<ImmutableBytesWritable,byte[]>>,Serializable {
@Override public int compare(Tuple2<ImmutableBytesWritable, byte[]> o1, Tuple2<ImmutableBytesWritable, byte[]> o2) {
if(o1._1().compareTo(o2._1()) == 0){
return Bytes.compareTo(o1._2(),o2._2());
}else{
return o1._1().compareTo(o2._1());
}
}
}
driver 端配置如下。
Job job = Job.getInstance();
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
//适配 hbase-1.3.0 的逻辑,这里需要zk传入job的configuration里,否则 HFileOutputFormat2.configureIncrementalLoad 会无法获取zk连接
job.getConfiguration().set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
Connection connection = ConnectionFactory.createConnection(hbaseConf);
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hbaseTable));
Table table = connection.getTable(TableName.valueOf(hbaseTable));
HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
将 SQL 返回的 Dataset 转为 List<Tuple2<Tuple2<ImmutableBytesWritable,byte[]>,KeyValue>>
对象的逻辑
public static List<Tuple2<Tuple2<ImmutableBytesWritable,byte[]>, KeyValue>> generateHBaseRowWithQualifier(Row sqlRow, List<String> columns) {
List<Tuple2<Tuple2<ImmutableBytesWritable,byte[]>, KeyValue>> result = new LinkedList<>();
//这里读者需要处理rowkey的位置,本源码这里略过很多处理逻辑,这不是重点
byte[] rowkey = Bytes.toBytes((String) sqlRow.get(0));
StructField[] fields = sqlRow.schema().fields();
Set<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
for (int i = 1; i < sqlRow.size(); i++) {
//Rowkey必须是String类型
//hbase columns 格式为 cf:qualifier;type 这部分也是业务代码处理逻辑,读者需要自己根据业务逻辑实现
String family = columns.get(i - 1).split(";")[0].split(":")[0];
String qualifier = columns.get(i - 1).split(";")[0].split(":")[1];
String type = columns.get(i - 1).split(";")[1];
DataType dt = fields[i].dataType();
//convertValueToByteArray 方法将hive中的数据按照用户指定的数据类型做转化,这里就不再给出了。
KeyValue value =
new KeyValue(rowkey, Bytes.toBytes(family), Bytes.toBytes(qualifier), convertValueToByteArray(dt, sqlRow.get(i),type));
map.add(value);
}
for(KeyValue kv:map){
result.add(new Tuple2(new Tuple2<>(new ImmutableBytesWritable(rowkey),kv.getQualifier()), kv));
}
return result;
}
流程如下。
List<HBaseTableInfo.RegionInfo> regionInfos = admin.getTableRegions(TableName.valueOf(hbaseTable)).stream()
.map(HRegionInfo ->{
return new HBaseTableInfo.RegionInfo(hbaseTable,HRegionInfo.getStartKey(),HRegionInfo.getEndKey());
}).collect(Collectors.toList());
RegionPartitioner regionPartitioner = RegionPartitioner.getRegionPartitioner(regionInfos);
KeyQualifierComparator comparator = new KeyQualifierComparator();
rows.javaRDD().flatMapToPair(row -> HBaseTableUtil.generateHBaseRowWithQualifier(row,bcHbaseColumns.value()).iterator())
.repartitionAndSortWithinPartitions(regionPartitioner,comparator)
.mapToPair(combinekey -> {
return new Tuple2(combinekey._1()._1(),combinekey._2());
}).saveAsNewAPIHadoopFile(bulkloadFilePath,ImmutableBytesWritable.class,
KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());
六、总结
经过最开始复杂的 Hive SQL 方式到后来的 Spark Bulkload 方式,离线数据批量导入到 HBase 的方案在有赞的实践就介绍到这里了。将来我们会针对性的对 Spark Bulkload 方案做进一步的优化,比如支持指定时间戳,增加更丰富的可配置的过滤器等等。同时希望 Spark Bulkload 方案能切实帮助读者解决生产上遇到的数据迁移问题,避免踩类似的坑。
最后打个小广告,有赞数据中台团队,主要负责有赞的数据基础组件,数据开发平台,数据资产治理平台等多个数据产品,提供一站式数据解决方案,欢迎感兴趣加入的小伙伴联系 zhaoyuan@youzan.com。
Vol.247
活动预告
Youzan PaaS Innovative Meetup 第二期来啦~
这次邀请了腾讯云和蚂蚁金服的技术讲师来分享最近热门的Service Mesh主题干货,持续致力于打造一个优秀活跃的杭州PaaS技术交流圈。
这次我们也将继续保持 PIM 小规模的深度交流氛围, 欢迎加入。