
HBase Source Code Series: BufferedMutator Demo and Source Walkthrough

2017-07-17 浪尖 Spark学习技巧

I. Overview

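BufferedMutator writes data to a single HBase table in asynchronous batches, much like HTable. You obtain an instance from a Connection. MapReduce jobs are a good use case: they benefit from batching but have no natural flush point. BufferedMutator accepts puts from the job and submits them in batches based on a heuristic such as the accumulated size of the buffered puts. Because the batches are submitted asynchronously, the M/R logic can continue without being blocked by batch submission. A MapReduce batch job has one BufferedMutator per thread. A single BufferedMutator can also be used effectively in high-volume online systems to batch puts into an HBase table.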

II. Usage demo

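import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.Bytes;

// TABLE (a TableName), FAMILY (a byte[]) and LOG are assumed to be fields of the enclosing class.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zookeeperHost");
// Invoked when a batch still fails after all retries are exhausted.
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
  @Override
  public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
    for (int i = 0; i < e.getNumExceptions(); i++) {
      LOG.info("Failed to send put " + e.getRow(i) + ".");
    }
  }
};
BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
    .listener(listener)
    .writeBufferSize(123123L);
// try-with-resources closes the mutator (flushing buffered puts) and then the
// connection, even when mutate throws.
try (Connection conn = ConnectionFactory.createConnection(conf);
     BufferedMutator mutator = conn.getBufferedMutator(params)) {
  Put p = new Put(Bytes.toBytes("someRow"));
  p.addColumn(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value"));
  mutator.mutate(p);
} catch (IOException e1) {
  e1.printStackTrace();
}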

 

III. Source code walkthrough

1. Main classes

BufferedMutatorParams

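The parameters needed to instantiate a BufferedMutator.

The main parameters are TableName (the table name), writeBufferSize (the write buffer size), maxKeyValueSize (the maximum key-value size), ExecutorService (the thread pool used for execution), and ExceptionListener (a listener for exceptions raised by the BufferedMutator).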

BufferedMutatorImpl

Used to talk to an HBase table, similar to HTable, but meant for batched, asynchronous puts. Instances are obtained through HConnectionImplementation, via the following method:

public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
  if (params.getTableName() == null) {
    throw new IllegalArgumentException("TableName cannot be null.");
  }
  if (params.getPool() == null) {
    params.pool(HTable.getDefaultExecutor(getConfiguration()));
  }
  if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
    params.writeBufferSize(connectionConfig.getWriteBufferSize());
  }
  if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
    params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
  }
  return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
}
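The pattern in getBufferedMutator, where any parameter left at a sentinel value is filled in from connection-level configuration, can be illustrated with a small stand-alone model. This is a toy sketch and not HBase code: ToyParams, ToyConnection, the UNSET value of -1, and the default numbers are all assumptions chosen for illustration.

```java
// Toy model of "fill unset parameters from connection defaults"; not HBase code.
class ToyParams {
    static final long UNSET = -1; // assumed sentinel meaning "the caller did not set this"
    long writeBufferSize = UNSET;
    long maxKeyValueSize = UNSET;

    ToyParams writeBufferSize(long v) { writeBufferSize = v; return this; }
    ToyParams maxKeyValueSize(long v) { maxKeyValueSize = v; return this; }
}

class ToyConnection {
    // Hypothetical connection-level defaults.
    final long defaultWriteBufferSize = 2L * 1024 * 1024;
    final long defaultMaxKeyValueSize = 10L * 1024 * 1024;

    // Explicit caller settings win; only fields still at UNSET take the defaults.
    ToyParams resolve(ToyParams p) {
        if (p.writeBufferSize == ToyParams.UNSET) {
            p.writeBufferSize(defaultWriteBufferSize);
        }
        if (p.maxKeyValueSize == ToyParams.UNSET) {
            p.maxKeyValueSize(defaultMaxKeyValueSize);
        }
        return p;
    }
}
```

The practical consequence is that callers only need to set the values they care about; everything else follows the connection's configuration.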

AsyncProcess

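AsyncProcess maintains a thread pool internally. Our operations are wrapped in Runnables and handed to the pool for execution. Submission is asynchronous until the number of in-flight tasks reaches the maximum, at which point the call to submit blocks.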
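The "asynchronous until the maximum number of tasks is reached" behaviour can be modelled with a semaphore in front of a thread pool. The following is a minimal stand-alone sketch, not the AsyncProcess implementation; BoundedSubmitter and its counters are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model: submission returns immediately while a slot is free and
// blocks the caller once maxTasks are in flight. Not HBase code.
class BoundedSubmitter {
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final Semaphore slots;
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxObserved = new AtomicInteger();
    final AtomicInteger completed = new AtomicInteger();

    BoundedSubmitter(int maxTasks) {
        slots = new Semaphore(maxTasks);
    }

    void submit(Runnable op) {
        slots.acquireUninterruptibly(); // blocks only when all slots are taken
        pool.execute(() -> {
            int now = inFlight.incrementAndGet();
            maxObserved.accumulateAndGet(now, Math::max);
            try {
                op.run();
            } finally {
                inFlight.decrementAndGet();
                completed.incrementAndGet();
                slots.release(); // free the slot for the next submission
            }
        });
    }

    void shutdownAndWait() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With a limit of 2, the third submit call waits until one of the first two tasks finishes, which is how back-pressure reaches the writer.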

HConnectionImplementation

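A connection to a cluster. It knows how to find the master and locate regions on the cluster, keeps a cache of locations, and knows how to recalibrate that location information after regions move.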

2. Code flow

A) Building a BufferedMutator

1) First, create an HBase Configuration

Configuration conf =  HBaseConfiguration.create();
 conf.set("hbase.zookeeper.quorum", "zookeeperHost");

2) Next, build the BufferedMutatorParams

final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
  @Override
  public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
    for (int i = 0; i < e.getNumExceptions(); i++) {
      LOG.info("Failed to send put " + e.getRow(i) + ".");
    }
  }
};
BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
    .listener(listener);
params.writeBufferSize(123123L);

 

3) Then create the Connection

Connection conn = ConnectionFactory.createConnection(conf);

4) Finally, obtain the BufferedMutator

BufferedMutator mutator = conn.getBufferedMutator(params);

 

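B) Sending data

1) Build a Put or a List<Put>

2) Call BufferedMutator.mutate

3) Flush to HBase.

A flush happens in one of three ways:

First, by calling BufferedMutator.flush explicitly

Second, by calling BufferedMutator.close when you are done sending

Third, automatically, when the current buffered size exceeds the configured write buffer size: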

while (undealtMutationCount.get() != 0
    && currentWriteBufferSize.get() > writeBufferSize) {
  backgroundFlushCommits(false);
}

All three paths end up calling backgroundFlushCommits.
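The three triggers and their shared flush path can be sketched with a small in-memory model. This is an illustration only, not the BufferedMutatorImpl code: ToyBufferedWriter and flushedBatchSizes are hypothetical names, and the "flush" here merely records the batch instead of sending RPCs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the three flush triggers converging on one method; not HBase code.
class ToyBufferedWriter implements AutoCloseable {
    private final long writeBufferSize; // threshold in bytes
    private final List<byte[]> buffer = new ArrayList<>();
    private long currentSize = 0;
    // Records the size of every flushed batch so the behaviour can be inspected.
    final List<Integer> flushedBatchSizes = new ArrayList<>();

    ToyBufferedWriter(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    void mutate(byte[] payload) {
        buffer.add(payload);
        currentSize += payload.length;
        // Automatic trigger: buffered bytes exceed the configured threshold.
        if (currentSize > writeBufferSize) {
            doFlush();
        }
    }

    // Explicit trigger.
    void flush() {
        doFlush();
    }

    // Closing flushes whatever is still buffered.
    @Override
    public void close() {
        doFlush();
    }

    // Single shared flush path.
    private void doFlush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushedBatchSizes.add(buffer.size());
        buffer.clear();
        currentSize = 0;
    }
}
```

Note that the threshold is compared against bytes rather than the number of puts, so a few large puts flush sooner than many small ones.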

4) The RPC path

The entry point is backgroundFlushCommits. ap is an instance of AsyncProcess.

ap.submit(tableName, taker, true, null, false);

submit first builds a HashMap keyed by server, so that the actions destined for each server can be looked up by that server

// maps each server to the actions that will be sent to it
Map<ServerName, MultiAction<Row>> actionsByServer =
    new HashMap<ServerName, MultiAction<Row>>();

 

Look up the region locations for the row; all replicas of the region are included

RegionLocations locs = connection.locateRegion(
    tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);

 

Take the default region location. At this point a region yields only the single location identified by the default replica id.

loc = locs.getDefaultRegionLocation();

 

Convert the Row operation into an Action and add it to actionsByServer

// wrap the Row operation in an Action
Action<Row> action = new Action<Row>(r, ++posInList);
setNonce(ng, r, action);
retainedActions.add(action);
// TODO: replica-get is not supported on this path
byte[] regionName = loc.getRegionInfo().getRegionName();
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
it.remove();
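The locate-then-group step above can be modelled without HBase. The sketch below is a toy: ToyRouter and its region table are hypothetical, and it assumes the usual range-partitioning rule that a row belongs to the region with the greatest start key not exceeding the row.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of routing rows to servers and batching them per server; not HBase code.
class ToyRouter {
    // Hypothetical region table: region start key -> hosting server.
    private final TreeMap<String, String> regionStartToServer = new TreeMap<>();

    ToyRouter addRegion(String startKey, String server) {
        regionStartToServer.put(startKey, server);
        return this;
    }

    // A row belongs to the region with the greatest start key <= row.
    // Assumes the first region was registered with the empty start key "".
    String locate(String row) {
        return regionStartToServer.floorEntry(row).getValue();
    }

    // One pass over the rows yields one batch per server.
    Map<String, List<String>> groupByServer(List<String> rows) {
        Map<String, List<String>> actionsByServer = new HashMap<>();
        for (String row : rows) {
            actionsByServer.computeIfAbsent(locate(row), s -> new ArrayList<>()).add(row);
        }
        return actionsByServer;
    }
}
```

Grouping by server rather than by region means each server receives a single request per round, no matter how many of its regions are touched.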

Next, AsyncProcess.submitMultiActions obtains an AsyncRequestFutureImpl<CResult> and calls sendMultiAction on it:

.sendMultiAction(actionsByServer, 1, null, false);

 

Internally it walks the map server by server, takes each server's MultiAction, and builds Runnables from it

for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
  ServerName server = e.getKey();
  MultiAction<Row> multiAction = e.getValue();

  Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
      numAttempt);
  // make sure we correctly count the number of runnables before we try to reuse the send
  // thread, in case we had to split the request into different runnables because of backoff
  if (runnables.size() > actionsRemaining) {
    actionsRemaining = runnables.size();
  }

Then it iterates over the Runnables and executes them

for (Runnable runnable : runnables) {
  if ((--actionsRemaining == 0) && reuseThread
      && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) {
    runnable.run();
  } else {
    try {
      pool.submit(runnable);
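The excerpt shows a small optimisation: when thread reuse is allowed, the last runnable is executed on the calling thread instead of being handed to the pool. A simplified stand-alone sketch of that idea follows. ToyDispatcher and runAndCollectThreadNames are hypothetical names, and the retry-count condition from the excerpt is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of "run the last task inline, pool the rest"; not HBase code.
class ToyDispatcher {
    static void dispatch(List<Runnable> runnables, ExecutorService pool, boolean reuseThread) {
        int remaining = runnables.size();
        for (Runnable r : runnables) {
            if (--remaining == 0 && reuseThread) {
                r.run(); // last one: run on the caller's thread
            } else {
                pool.submit(r); // all others: run asynchronously
            }
        }
    }

    // Helper for experimenting: each task records the name of the thread it ran on.
    static List<String> runAndCollectThreadNames(int n, boolean reuseThread) {
        List<String> names = Collections.synchronizedList(new ArrayList<>());
        List<Runnable> runnables = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            runnables.add(() -> names.add(Thread.currentThread().getName()));
        }
        ExecutorService pool = Executors.newFixedThreadPool(2);
        dispatch(runnables, pool, reuseThread);
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return names;
    }
}
```

In the submit path described in this article, sendMultiAction is called with false as its last argument, so every runnable would go to the pool in that case.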

 

5) Building the Runnable and its run method

The main entry point is getNewMultiActionRunnable

List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
for (DelayingRunner runner : actions.values()) {
  incTaskCounters(runner.getActions().getRegions(), server);
  String traceText = "AsyncProcess.sendMultiAction";
  Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
  // use a delay runner only if we need to sleep for some time
  if (runner.getSleepTime() > 0) {
    runner.setRunner(runnable);
    traceText = "AsyncProcess.clientBackoff.sendMultiAction";
    runnable = runner;
    if (connection.getConnectionMetrics() != null) {
      connection.getConnectionMetrics().incrDelayRunners();
      connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
    }
  } else {
    if (connection.getConnectionMetrics() != null) {
      connection.getConnectionMetrics().incrNormalRunners();
    }
  }
  runnable = Trace.wrap(traceText, runnable);
  toReturn.add(runnable);

Step into SingleServerRequestRunnable and look at its run method

// setup the callable based on the actions, if we don't have one already from the request
if (callable == null) {
  callable = createCallable(server, tableName, multiAction);
}
RpcRetryingCaller<MultiResponse> caller = createCaller(callable, rpcTimeout);
try {
  if (callsInProgress != null) {
    callsInProgress.add(callable);
  }
  res = caller.callWithoutRetries(callable, operationTimeout);

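RpcRetryingCaller then invokes the call method of MultiServerCallable, which builds the request and issues the RPC. From there control passes to the server side, namely the multi method of RSRpcServices.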

responseProto = getStub().multi(controller, requestProto);

 

C) Handling on the HRegionServer side

RSRpcServices is the server side; the implementation relevant to this article is RSRpcServices.multi.

if (request.hasCondition()) {
  Condition condition = request.getCondition();
  byte[] row = condition.getRow().toByteArray();
  byte[] family = condition.getFamily().toByteArray();
  byte[] qualifier = condition.getQualifier().toByteArray();
  CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
  ByteArrayComparable comparator =
      ProtobufUtil.toComparator(condition.getComparator());
  processed = checkAndRowMutate(region, regionAction.getActionList(),
        cellScanner, row, family, qualifier, compareOp,
        comparator, regionActionResultBuilder);
} else {
  mutateRows(region, regionAction.getActionList(), cellScanner,
      regionActionResultBuilder);
  processed = Boolean.TRUE;
}

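Depending on whether the request carries a condition, execution goes into checkAndRowMutate or mutateRows.

Each action is then handled according to its mutation type, after which the mutation is actually executed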

MutationType type = action.getMutation().getMutateType();
  if (rm == null) {
    rm = new RowMutations(action.getMutation().getRow().toByteArray());
  }
  switch (type) {
    case PUT:
      rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
      break;
    case DELETE:
      rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
      break;
    default:
      throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
  }
  // To unify the response format with doNonAtomicRegionMutation and read through client's
  // AsyncProcess we have to add an empty result instance per operation
  resultOrExceptionOrBuilder.clear();
  resultOrExceptionOrBuilder.setIndex(i++);
  builder.addResultOrException(
      resultOrExceptionOrBuilder.build());
}
region.mutateRow(rm);
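The shape of this loop, collecting only PUT and DELETE into a single row-level batch and rejecting any other type before anything is applied, can be shown in isolation. The sketch below is a toy with hypothetical names (ToyRowMutations and its string-based ops); it throws IllegalArgumentException where the real code throws DoNotRetryIOException.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "atomic put and/or delete only"; not HBase code.
class ToyRowMutations {
    enum MutationType { PUT, DELETE, APPEND, INCREMENT }

    final String row;
    final List<String> ops = new ArrayList<>();

    ToyRowMutations(String row) {
        this.row = row;
    }

    // Accepts PUT and DELETE for one atomic apply; rejects everything else up front.
    void add(MutationType type, String detail) {
        switch (type) {
            case PUT:
            case DELETE:
                ops.add(type + ":" + detail);
                break;
            default:
                throw new IllegalArgumentException(
                    "Atomic put and/or delete only, not " + type.name());
        }
    }
}
```

Validating while the batch is assembled means an unsupported operation fails the whole request before any row is modified.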

HRegion.mutateRow then leads to HRegion.mutateRowsWithLocks:

public void mutateRowsWithLocks(Collection<Mutation> mutations,
    Collection<byte[]> rowsToLock) throws IOException {
  mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
}

public void mutateRowsWithLocks(Collection<Mutation> mutations,
    Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
  MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
  processRowsWithLocks(proc, -1, nonceGroup, nonce);
}

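You can read the rest of the processing yourself; the comments in the source are clear and well organized.

IV. Summary

The HBase Java client API offers three ways to perform writes:

1. HTablePool

For the source, see HBase: The Definitive Guide.

2. HConnection

With this approach you have to implement a thread pool yourself.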

Connection conn = ConnectionFactory.createConnection(conf);
TableName tabName=  TableName.valueOf("tableName");
Table table=conn.getTable(tabName);

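3. BufferedMutator

This is the recommended way to do puts.

It gives you batched, asynchronous puts. MapReduce jobs already use it, so what are you waiting for?

See the demo above.

That wraps up the two articles on client-side read and write operations. Next up is the implementation and source-level principles of building secondary indexes with HBase MapReduce, followed by the implementation of the administrative operations.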

 

Follow the 浪尖 public account to study the Kafka, HBase, and Spark source code together.

 


