OceanBase 源码解读(九):存储层代码解读之「宏块存储格式」
此前,带你读源码第八篇《事务日志的提交和回放》,为大家介绍了日志模块的设计理念和日志的一生。本期“源码解读”由数据库技术专家公祺为大家带来“存储层代码解读之「宏块存储格式」”。
“宏块”是处于 SSTable 和微块之间的数据结构,OceanBase 中的宏块为 2MB 的定长数据块。
众所周知,OceanBase 中微块是读 IO 最小单元,这是因为微块读处在用户请求的关键路径上,为保证快速响应用户的请求,微块不能过大,所以微块的默认大小一般不超过 16KB;而宏块作为写 IO 的最小单元,它的读写不在用户请求的关键路径上,所以就有了 2MB 的宏块,目的是为了最大限度的发挥磁盘的吞吐性能,能快速的做压缩、迁移复制、坏块检查等操作。
宏块的简单结构可以参考下图,详细的宏块格式介绍见下一节:
SSTableData:常规的存放数据的宏块;
LobData:Large Object Data,用来存放数据较大的行数据;
BloomFilterData:带有 bloomfilter 的宏块。
header 中记录元数据:对应 OceanBase 宏块的 header ;
payload 存放的是具体的数据:OceanBase 宏块的 payload 为微块列表;
trailer 中记录的是数据的 index:OceanBase 宏块的 trailer 为微块的 index 信息,即为微块在宏块中的偏移量;
padding 是为了做对齐的:OceanBase 宏块为2MB,不足部分需要做 padding。
common header:宏块的版本、类型、大小、checksum 等信息,见 ObMacroBlockCommonHeader ;
macro block header:记录了宏块数据大小、table_id、partition_id、微块的数量、列数、行数、checksum、加密信息、以及相关 offset 信息,具体可以参考 struct ObSSTableMacroBlockHeader;
column id list:列的 id 列表,OceanBase 数据库表每一列都一个唯一 id;
column type list:每列的类型信息,包括:类型、编码字符集等;
column order list:每列的顺序,可以是 ASC 或 DESC ,宏块中所有微块中的行数据都是按照这个顺序存储;
column checksum list:每列数据的 checksum 信息,用来做列的数据校验。
// src/storage/blocksstable/ob_macro_block.cpp
// 该函数主要给宏块header结构预先指向buffer的不同的offset,后续就不需要再进行序列化操作了,
// 该函数在初始化宏块的时候调用,header成员变量的具体值是在后续的数据写入后指定。
int ObMacroBlock::reserve_header(const ObDataStoreDesc& spec)
{
int ret = OB_SUCCESS;
common_header_.reset();
common_header_.set_attr(ObMacroBlockCommonHeader::SSTableData);
common_header_.set_data_version(spec.data_version_);
common_header_.set_reserved(0);
const int64_t common_header_size = common_header_.get_serialize_size();
// data_的类型是ObSelfBufferWriter,它是一个支持自动扩展的内存buffer
// ObSelfBufferWriter的实现见:src/storage/blocksstable/ob_data_buffer.h
MEMSET(data_.data(), 0, data_.capacity());
// 1. data_的第一部分为ObMacroBlockCommonHeader
if (OB_FAIL(data_.advance(common_header_size))) {
STORAGE_LOG(WARN, "data buffer is not enough for common header.", K(ret), K(common_header_size));
}
if (OB_SUCC(ret)) {
int64_t column_count = spec.row_column_count_;
int64_t rowkey_column_count = spec.rowkey_column_count_;
int64_t column_checksum_size = sizeof(int64_t) * column_count;
int64_t column_id_size = sizeof(uint16_t) * column_count;
int64_t column_type_size = sizeof(ObObjMeta) * column_count;
int64_t column_order_size = sizeof(ObOrderType) * column_count;
int64_t macro_block_header_size = sizeof(ObSSTableMacroBlockHeader);
// 2. data_的第二部分为ObSSTableMacroBlockHeader
header_ = reinterpret_cast<ObSSTableMacroBlockHeader*>(data_.current());
// 3. data_的第三部分为column_ids_
column_ids_ = reinterpret_cast<uint16_t*>(data_.current() + macro_block_header_size);
// 4. data_的第四部分为column_types
column_types_ = reinterpret_cast<ObObjMeta*>(data_.current() + macro_block_header_size + column_id_size);
// 5. data_的第五部分为column_orders_
column_orders_ =
reinterpret_cast<ObOrderType*>(data_.current() + macro_block_header_size + column_id_size + column_type_size);
// 6. data_的第六部分为column_checksum_
column_checksum_ = reinterpret_cast<int64_t*>(
data_.current() + macro_block_header_size + column_id_size + column_type_size + column_order_size);
macro_block_header_size += column_checksum_size + column_id_size + column_type_size + column_order_size;
// for compatibility, fill 0 to checksum and this will be serialized to disk
for (int i = 0; i < column_count; i++) {
column_checksum_[i] = 0;
}
// 7. data_后面的内存空间是给微块预留的
if (OB_FAIL(data_.advance(macro_block_header_size))) {
STORAGE_LOG(WARN, "macro_block_header_size out of data buffer.", K(ret));
} else {
// 初始化header中的成员变量
memset(header_, 0, macro_block_header_size);
header_->header_size_ = static_cast<int32_t>(macro_block_header_size);
header_->version_ = SSTABLE_MACRO_BLOCK_HEADER_VERSION_v3;
header_->magic_ = SSTABLE_DATA_HEADER_MAGIC;
header_->attr_ = 0;
header_->table_id_ = spec.table_id_;
header_->data_version_ = spec.data_version_;
header_->column_count_ = static_cast<int32_t>(column_count);
header_->rowkey_column_count_ = static_cast<int32_t>(rowkey_column_count);
header_->column_index_scale_ = static_cast<int32_t>(spec.column_index_scale_);
header_->row_store_type_ = static_cast<int32_t>(spec.row_store_type_);
header_->micro_block_size_ = static_cast<int32_t>(spec.micro_block_size_);
header_->micro_block_data_offset_ = header_->header_size_ + static_cast<int32_t>(common_header_size);
memset(header_->compressor_name_, 0, OB_MAX_HEADER_COMPRESSOR_NAME_LENGTH);
MEMCPY(header_->compressor_name_, spec.compressor_name_, strlen(spec.compressor_name_));
header_->data_seq_ = 0;
header_->partition_id_ = spec.partition_id_;
// copy column id & type array;
for (int64_t i = 0; i < header_->column_count_; ++i) {
column_ids_[i] = static_cast<int16_t>(spec.column_ids_[i]);
column_types_[i] = spec.column_types_[i];
column_orders_[i] = spec.column_orders_[i];
}
}
}
if (OB_SUCC(ret)) {
// 指定数据在data_中的offset
data_base_offset_ = header_->header_size_ + common_header_size;
}
return ret;
}
// src/storage/blocksstable/ob_micro_block_writer.h
// 下面是微块在内存中以及持久化的存储格式:
// memory
// |- row data buffer
// |- ObMicroBlockHeader
// |- row data
// |- row index buffer
// |- ObRowIndex
//
// build output
// |- compressed data
// |- ObMicroBlockHeader
// |- row data
// |- RowIndex
class ObMicroBlockWriter : public ObIMicroBlockWriter {
public:
virtual int append_row(const storage::ObStoreRow& row) override;
virtual int build_block(char*& buf, int64_t& size) override;
virtual void reuse() override;
virtual int64_t get_block_size() const override;
virtual int64_t get_row_count() const override;
virtual int64_t get_data_size() const override;
virtual int64_t get_column_count() const override;
virtual common::ObString get_last_rowkey() const override;
void reset();
};
微块在宏块中 offset 数组:偏移量的个数为微块数+1,前后两个 offset 的差值是前一个微块的长度;
每个微块的最大的 rowkey 信息( endkey ),包括:endkey 的偏移量和 endkey 的数据。
can_mark_deletion:用来标记这个微块是否可以标记删除;
delta:用来记录微块中真正有效的行数,不包括被标记删除的行数。
// src/storage/blocksstable/ob_micro_block_index_writer.cpp
int ObMicroBlockIndexWriter::add_entry(
const ObString& rowkey, const int64_t data_offset, bool can_mark_deletion, const int32_t delta)
{
int ret = OB_SUCCESS;
int32_t endkey_offset = static_cast<int32_t>(buffer_[ENDKEY_BUFFER_IDX].length());
// 去除了一些参数检查的的代码
if (OB_FAIL(buffer_[INDEX_BUFFER_IDX].write(static_cast<int32_t>(data_offset)))) {
STORAGE_LOG(WARN, "index buffer fail to write data_offset.", K(ret), K(data_offset));
} else if (OB_FAIL(buffer_[INDEX_BUFFER_IDX].write(endkey_offset))) {
STORAGE_LOG(WARN, "index buffer fail to write endkey_offset.", K(ret), K(endkey_offset));
} else if (OB_FAIL(buffer_[ENDKEY_BUFFER_IDX].write(rowkey.ptr(), rowkey.length()))) {
STORAGE_LOG(WARN, "data buffer fail to writer rowkey.", K(ret), K(rowkey));
} else if (is_multi_version_minor_merge_ &&
OB_FAIL(buffer_[MARK_DELETE_BUFFER_IDX].write(static_cast<uint8_t>(can_mark_deletion)))) {
STORAGE_LOG(WARN, "fail to write mark deletion", K(ret), K(can_mark_deletion));
} else if (is_multi_version_minor_merge_ && OB_FAIL(buffer_[DELTA_BUFFER_IDX].write(delta))) {
STORAGE_LOG(WARN, "failed to write delta", K(ret));
} else {
++micro_block_cnt_;
}
return ret;
}
顺序读取:主要用在压缩等场景,顺序读取各个微块数据,进行合并、迁移等处理;
随机读取:主要用在处理用户请求时,根据 rowkey 快速读取对应的微块数据。
使用不固定大小的宏块:这时我们需要记录宏块的 offset 、length 等元信息,对宏块的定位就需要多一个操作,性能会有一定的损失,同时也会提高设计的复杂度;
每个宏块都满载 2MB:仅仅一个 insert 操作,可能会导致满载的宏块分裂成两个。
宏块的操作
// src/storage/blocksstable/ob_store_file_system.h
class ObStorageFile {
public:
...
// 异步读取宏块、微块接口
// 具体读的是微块还是宏块,通过read_info中offset_、size_指定
// 该接口为异步的,数据读取成功后,会通过macro_handle通知调用者
virtual int async_read_block(const ObMacroBlockReadInfo& read_info, ObMacroBlockHandle& macro_handle) = 0;
// 异步写入宏块接口
virtual int async_write_block(const ObMacroBlockWriteInfo& write_info, ObMacroBlockHandle& macro_handle) = 0;
// 同步读写接口,一般是通过上面两个异步接口实现
virtual int write_block(const ObMacroBlockWriteInfo& write_info, ObMacroBlockHandle& macro_handle) = 0;
virtual int read_block(const ObMacroBlockReadInfo& read_info, ObMacroBlockHandle& macro_handle) = 0;
...
};
具体的读写接口的实现是在 ObStorageFile 的派生类 ObLocalStorageFile 中,可以参考下面代码了解其实现:
src/storage/blocksstable/ob_local_file_system.h。
// src/storage/blocksstable/ob_macro_block.h
class ObMacroBlock {
public:
// 初始化宏块结构,主要做一些初始化的操作:
// 1. 调用 reserve_header,将宏块的header映射到buffer中,见本文2.1的代码说明
// 2. 调用 init_row_reader,根据ObRowStoreType初始化行reader
int init(ObDataStoreDesc& spec);
// 将一个微块append到该宏块中,主要就是将序列化好的微块数据copy到该宏块buffer中,
// 并更新该宏块header中的元数据
int write_micro_block(const ObMicroBlockDesc& micro_block_desc, int64_t& data_offset);
// 对已经写满(或者不需要再写)的宏块,进行刷盘操作,具体包括:
// 1. 序列化common header;
// 2. 构建header、trailer中的各种元信息;
// 3. 调用底层ObStorageFile::async_write_block,将数据异步写入到磁盘中;
// 4. 宏块数据写成功后,会通过macro_handle通知上层。
int flush(const int64_t cur_macro_seq, ObMacroBlockHandle& macro_handle, ObMacroBlocksWriteCtx& block_write_ctx);
// 合并两个顺序的宏块,在compaction结束时,检查最后一个不满的宏块能否和上一个宏块合并,
// 如果上一个宏块空间够用,则进行合并,这两个宏块的数据是已经排好序的,
// 这个接口仅被 ObMacroBlockWriter::close 调用。merge函数主要流程如下:
// 1. 再次检查当前宏块空间是否充足,不充足则报错返回;
// 2. 对最后一个宏块的微块index追加到当前的微块index中;
// 3. 将最后一个宏块的微块数据追加到当前微块的buffer中;
// 4. 更新当前宏块header中的元数据。
int merge(const ObMacroBlock& macro_block);
// 和merge接口配合使用,主要是检查当前宏块能否多容纳一个宏块数据
bool can_merge(const ObMacroBlock& macro_block);
// 重置该宏块,主要用于复用该宏块对象
void reset();
...
};
// src/storage/blocksstable/ob_macro_block_writer.cpp
class ObMacroBlockWriter {
public:
// 根据data_store_desc中的table_id、partition_id等信息,打开一个宏块写入器
int open(ObDataStoreDesc& data_store_desc, const ObMacroDataSeq& start_seq,
const ObIArray<ObMacroBlockInfoPair>* lob_blocks = NULL, ObMacroBlockWriter* index_writer = NULL);
// 追加一个宏块,主要会应用在这些场景:
// 1. 在合并时,原来的SSTable的某个宏块没有修改,直接复用到当前SSTable中;
// 2. 并行合并后,也可以用到这个接口,将多个没有重合数据的宏块进行追加。
int append_macro_block(const ObMacroBlockCtx& macro_block_ctx);
// 追加一个微块,和append_macro_block不同的是需要考虑是否存在数据重叠:
// 1. 如果数据不重叠,则将micro_block追加到当前宏块中;
// 2. 如果数据重叠,则需要构建micro_block的reader,将数据按row写到当前宏块中。
int append_micro_block(const ObMicroBlock& micro_block);
// 追加一行数据,会调用ObMicroBlockWriter::append_row
int append_row(const storage::ObStoreRow& row, const bool virtual_append = false);
// 关闭ObMacroBlockWriter,在关闭之前,会尝试将最后两个宏块合并,节省空间,
// 最后将当前最后的宏块flush到磁盘,并等待刷盘成功(wait_io_finish)
int close(storage::ObStoreRow* root = NULL, char* root_buf = NULL);
};
在做 compaction 时,会使用 ObMicroBlockIterator ,读取完整的宏块数据,可以参考 class ObMicroBlockIterator 代码,关于合并的逻辑,后续会有专门的博文来介绍;
在做数据迁移的时候也会涉及到完整宏块的读取,详细逻辑可以参考 class ObMigratePrepareTask 的实现;
ObMacroBlockWriter 将宏块数据写成功后,会根据 MICRO_BLOCK_MERGE_VERIFY_LEVEL 的情况,有可能见宏块数据读出来做检查;
在异步构建 bloomfilter 的时候,也会将宏块数据按序读出来,可以参考代码:ObBloomFilterBuildTask::build_bloom_filter、class ObSSTableRowWholeScanner;
在做坏块检查时,也会有读取全部宏块的情况,具体逻辑参考下面的代码说明:
// src/storage/blocksstable/ob_store_file_system.h
// 坏块检查的定时任务
class ObFileSystemInspectBadBlockTask : public common::ObTimerTask {
public:
// 定时任务基类的任务执行内容接口
// 调用 inspect_bad_block
virtual void runTimerTask();
private:
// 对所有有效的宏块做坏块检查,主要流程为:
// 1. 通过 ObPartitionService 初始化宏块的迭代器
// 2. 根据 macro_iter,可以遍历所有的宏块,
// 调用下面的 check_macro_block 做宏块检查
void inspect_bad_block();
// 做一些参数检查后,对数据宏块做坏块检查
// 通过调用下面 check_data_block 来做数据检查
int check_macro_block(const ObMacroBlockInfoPair& pair, const storage::ObTenantFileKey& file_key);
// 将整个宏块的数据从磁盘读出来
// 使用 ObSSTableMacroBlockChecker::check_data_block 做具体的检查
int check_data_block(const MacroBlockId& macro_id, const blocksstable::ObFullMacroBlockMeta& full_meta,
const storage::ObTenantFileKey& file_key);
bool has_inited();
private:
// 坏块检查任务是每个周期只做一部分,下面的参数记录了断点信息
int64_t last_partition_idx_;
int64_t last_sstable_idx_;
int64_t last_macro_idx_;
// 数据检查工具类,做宏块、微块、列相关的checksum校验
ObSSTableMacroBlockChecker macro_checker_;
};
三、宏块的申请和释放
宏块存储格式的 demo
跨境电商数据融合实践|OceanBase 助力致欧家居打造分布式跨境电商
有奖互动|中国信通院联合 OceanBase 邀您参加《数据库发展研究报告(2022)》调研问卷
征文分享|OceanBase 3.1.2 数据库性能测试探索