查看原文
其他

VictorialMetrics存储原理之索引存储格式

阳明 k8s技术圈 2022-11-21

前文我们介绍了当插入数据的时候会先去添加索引数据,索引构建完成后又是如何去持久化数据的呢?保存的数据又是怎样的格式呢?本节我们将对此进行详细讲解。

添加索引数据

索引构建完成后会调用 AddItems 函数将索引添加到 Table 中去:

// lib/mergeset/table.go
// AddItems 添加指定的 items 到 table 中去
func (tb *Table) AddItems(items [][]byte) error {  
   if err := tb.rawItems.addItems(tb, items); err != nil {  
      return fmt.Errorf("cannot insert data into %q: %w", tb.path, err)  
   }  
   return nil  
}

Table 的结构如下所示:

// lib/mergeset/table.go
// Table 代表 mergeset table.  
type Table struct {  
   activeMerges   uint64  
   mergesCount    uint64  
   itemsMerged    uint64  
   assistedMerges uint64  

   // merge 索引
   mergeIdx uint64  
   // 路径
   path string  

   // flush回调
   flushCallback         func()  
   flushCallbackWorkerWG sync.WaitGroup  
   needFlushCallbackCall uint32  
   // 在将指定项的整个块刷新到持久存储之前,在合并期间调用的回调
   prepareBlock PrepareBlockCallback  

   // parts 列表
   partsLock sync.Mutex  
   parts     []*partWrapper  
  
   // rawItems 包含最近添加的尚未转换为 parts 的数据
   // 出于性能原因,未在搜索中使用 rawItems
   rawItems rawItemsShards  
  
   snapshotLock sync.RWMutex  
  
   flockF *os.File  
  
   stopCh chan struct{}  
  
   partMergersWG syncwg.WaitGroup  
   rawItemsFlusherWG sync.WaitGroup  
   convertersWG sync.WaitGroup  
   rawItemsPendingFlushesWG syncwg.WaitGroup  
}

一个索引 Table 就对应着一个 indexDB,也就是数据目录 indexdb 下面的文件夹:

其中核心的是 partsrawItems 两个属性。

  • parts 主要是存储 merge 后的 blocks,一个 part 与文件系统上的一个目录对应,比如上图中的 24_1_16F4A862471C1DC9 目录就是一个 part
  • rawItems 是用于预处理 Items 的,是一个 rawItemsShards 对象。

rawItemsShards 结构体定义如下所示:

// lib/mergeset/table.go
type rawItemsShards struct {  
   shardIdx uint32  
  
   // 在多 cpu 系统上添加 rows 数据时,shards 分片可以减少锁竞争 
   shards []rawItemsShard  
}

// 每个 table 的 rawItems 分片数 
var rawItemsShardsPerTable = cgroup.AvailableCPUs()  

// 每个分片最大的Block数
const maxBlocksPerShard = 512

// 当在打开Table的时候就会调用该函数进行初始化
func (riss *rawItemsShards) init() {  
   riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)  
}

// 添加 items 元素
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error {  
   n := atomic.AddUint32(&riss.shardIdx, 1)  
   shards := riss.shards  
   idx := n % uint32(len(shards))  
   shard := &shards[idx]  
   return shard.addItems(tb, items)  
}

rawItemsShards 其实就是加了一个分片功能用于保存索引数据,addItems 函数就是将要添加的数据添加到对应的分片上去,最终执行的逻辑是 shard.addItems

// lib/mergeset/table.go
type rawItemsShard struct {  
   mu            sync.Mutex  
   ibs           []*inmemoryBlock  
   lastFlushTime uint64  
}  

// 添加items元素
func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {  
   var err error  
   var blocksToFlush []*inmemoryBlock  
  
   ris.mu.Lock()  
   ibs := ris.ibs  
   if len(ibs) == 0 {  
      ib := getInmemoryBlock()  
      ibs = append(ibs, ib)  
      ris.ibs = ibs  
   }  
   // 取最后一个内存块
   ib := ibs[len(ibs)-1]  
   for _, item := range items { 
      // 添加索引item到内存块 
      if !ib.Add(item) {  // 超过了内存块大小
         // 重新获取一个内存块,此时肯定为空
         ib = getInmemoryBlock()  
         // 重新添加
         if !ib.Add(item) {  
            putInmemoryBlock(ib)  
            err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item))  
            break  
         }  
         ibs = append(ibs, ib)  
         ris.ibs = ibs  
      }  
   }  
   // 超过了每个分片的最大内存块的数量
   if len(ibs) >= maxBlocksPerShard {  
      // 将内存块放到待刷新的内存块列表中去
      blocksToFlush = append(blocksToFlush, ibs...)  
      // 释放前面的内存块资源
      for i := range ibs {  
         ibs[i] = nil  
      }  
      ris.ibs = ibs[:0]  
      ris.lastFlushTime = fasttime.UnixTimestamp()  
   }  
   ris.mu.Unlock()  
   // 执行merge合并操作
   tb.mergeRawItemsBlocks(blocksToFlush, false)  
   return err  
}

// lib/mergeset/encoding.go
// 内存中的一个Block块结构
type inmemoryBlock struct {  
   commonPrefix []byte  
   data         []byte  // 用来存储数据
   items        []Item  // 用来存储每个item数据的起始偏移量
}

// Item 表示用于存储在 mergeset 中的单个 item 数据
type Item struct {  
   // 数据的开始偏移量
   Start uint32  
   // 数据的结束偏移量
   End uint32  
}

// maxInmemoryBlockSize 是 memoryblock.data 的最大值。
//  
// 它必须适合 CPU 缓存大小,即当前 CPU 的缓存大小为64kb。
const maxInmemoryBlockSize = 64 * 1024

// Add 将 x 添加到内存卡 ib 的末尾
//  
// 如果由于块大小限制,x 未添加到 ib,则返回 false
func (ib *inmemoryBlock) Add(x []byte) bool {  
   data := ib.data  
   // 操过块大小限制了
   if len(x)+len(data) > maxInmemoryBlockSize {  
      return false  
   }  
   if cap(data) == 0 {  
      // 预分配 data 和 items 以减少内存分配
      data = make([]byte0, maxInmemoryBlockSize)  
      ib.items = make([]Item, 0512)  
   }  
   dataLen := len(data)  
   data = append(data, x...)  // 将 x 添加到 data
   ib.items = append(ib.items, Item{  // 更新 items
      Start: uint32(dataLen),  
      End:   uint32(len(data)),  
   })  
   ib.data = data  
   return true  
}

rawItemsShard 表示保存索引数据的一个分片,里面其实就是一个 inmemoryBlock 的内存块切片,每个分片最多有 512 个内存块,每个内存块占用 64KB 的容量,当每个分片中的内存块数量超过最大数量(512)会去将内存块数据刷新为 Part

如果分片中的内存块数量没超过上限,则会通过一个任务去定时(1s)将 rawItem 数据刷新(转换)为 Part,以便它们对搜索可见。

// lib/mergeset/table.go
// 将最近的 rawItem 刷新(转换)为 Part,以便它们对搜索可见。
const rawItemsFlushInterval = time.Second

// 启动 rawItems Flusher 任务
func (tb *Table) startRawItemsFlusher() {  
   tb.rawItemsFlusherWG.Add(1)  
   go func() {  
      tb.rawItemsFlusher()  
      tb.rawItemsFlusherWG.Done()  
   }()  
}  
  
func (tb *Table) rawItemsFlusher() {  
   ticker := time.NewTicker(rawItemsFlushInterval)  
   defer ticker.Stop()  
   for {  
      select {  
      case <-tb.stopCh:  
         return  
      case <-ticker.C:  
         tb.flushRawItems(false)  
      }  
   }  
}

合并内存数据

将内存块数据转换为 Part 都是通过 mergeRawItemsBlocks 函数去实现的。

// lib/mergeset/table.go

// 一次合并的默认 parts 数
//  
// 这个数字是根据经验得出的,它提供了尽可能低的开销
// 有关详细信息,请参阅 appendPartsToMerge test
const defaultPartsToMerge = 15

// merge 内存块数据
func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) {  
   if len(ibs) == 0 {  
      return  
   }  
   tb.partMergersWG.Add(1)  
   defer tb.partMergersWG.Done()  
  
   pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)  
   var pwsLock sync.Mutex  
   var wg sync.WaitGroup  
   for len(ibs) > 0 {  
      // 一次最大合并的内存块数量
      n := defaultPartsToMerge  
      if n > len(ibs) {  
         n = len(ibs)  
      }  
      wg.Add(1)  
      go func(ibsPart []*inmemoryBlock) {  
         defer wg.Done()  
         // merge inmemoryBlock
         pw := tb.mergeInmemoryBlocks(ibsPart)  
         if pw == nil {  
            return  
         }  
         pw.isInMerge = true  
         pwsLock.Lock()  
         pws = append(pws, pw)  
         pwsLock.Unlock()  
      }(ibs[:n])  
      ibs = ibs[n:]  
   }  
   wg.Wait()  
   if len(pws) > 0 {  
      if err := tb.mergeParts(pws, niltrue); err != nil {  
         logger.Panicf("FATAL: cannot merge raw parts: %s", err)  
      }  
      if tb.flushCallback != nil {  
         if isFinal {  
            tb.flushCallback()  
         } else {  
            atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 01)  
         }  
      }  
   }  
  
   for {  
      tb.partsLock.Lock()  
      ok := len(tb.parts) <= maxParts  
      tb.partsLock.Unlock()  
      if ok {  
         return  
      }  
  
      // The added part exceeds maxParts count. Assist with merging other parts.  
      //      
      // Prioritize assisted merges over searches.      
      storagepacelimiter.Search.Inc()  
      err := tb.mergeExistingParts(false)  
      storagepacelimiter.Search.Dec()  
      if err == nil {  
         atomic.AddUint64(&tb.assistedMerges, 1)  
         continue  
      }  
      if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {  
         return  
      }  
      logger.Panicf("FATAL: cannot merge small parts: %s", err)  
   }  
}

mergeRawItemsBlocks 函数将指定的内存块进行 merge 合并操作,一次合并最大的内存块数量为 15,然后在独立的 goroutine 中去进行合并操作,使用 mergeInmemoryBlocks 函数。

// lib/mergeset/table.go
// merge InmemoryBlocks
func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {  
   // 将 InmemoryBlock 列表转换成 inmemoryPart 列表 
   // inmemoryPart 表示内存中的Part
   mps := make([]*inmemoryPart, 0len(ibs))  
   for _, ib := range ibs {  
      if len(ib.items) == 0 {  
         continue  
      }  
      mp := getInmemoryPart()  
      mp.Init(ib) // 将inmemoryBlock转换为inmemoryPart
      putInmemoryBlock(ib)  
      mps = append(mps, mp)  
   }  
   if len(mps) == 0 {  
      return nil  
   }  
   if len(mps) == 1 {  
      // 没有要合并的内容。只需返回单个 inmemory part。
      mp := mps[0]  
      p := mp.NewPart()  
      return &partWrapper{  
         p:        p,  
         mp:       mp,  
         refCount: 1,  
      }  
   }  
   defer func() {  
      for _, mp := range mps {  
         putInmemoryPart(mp)  
      }  
   }()  
  
   atomic.AddUint64(&tb.mergesCount, 1)  
   atomic.AddUint64(&tb.activeMerges, 1)  
   defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))  
  
   // 为每个 `inmemoryPart` 构造 `blockStreamReader`, 用于迭代读取 items
   bsrs := make([]*blockStreamReader, 0len(mps))  
   for _, mp := range mps {  
      bsr := getBlockStreamReader()  
      bsr.InitFromInmemoryPart(mp)  
      bsrs = append(bsrs, bsr)  
   }  
  
   // 准备一个 blockStreamWriter 用于合并写入的 part
   bsw := getBlockStreamWriter()  
   // 不要通过 getInmemoryPart() 获取 mpDst,因为与池中的其他条目相比,它的大小可能太大。 
   // 这可能会导致内存使用量增加,因为存在大量的碎片。 
   // 创建一个新的 inmemoryPart,接收合并的数据
   mpDst := &inmemoryPart{}  
   bsw.InitFromInmemoryPart(mpDst)  
  
   // 开始 merge 数据
   // 该 merge 不应该被 stopCh 中断,因为它可能是 stopCh 关闭后的最终结果
   err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged)  
   if err != nil {  
      logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)  
   }  
   putBlockStreamWriter(bsw)  
   for _, bsr := range bsrs {  
      putBlockStreamReader(bsr)  
   }  
  
   p := mpDst.NewPart()  
   return &partWrapper{  
      p:        p,  
      mp:       mpDst,  
      refCount: 1,  
   }  
}

上面的函数会将指定的内存块转换成 partWrapper,该结构就是一个包含 partinmemoryPart 的包装器。

// lib/mergeset/table.go
type partWrapper struct {  
   p *part  
  
   mp *inmemoryPart  
  
   refCount uint64  
  
   isInMerge bool  
}

part 的结构如下所示:

// lib/mergeset/part.go
type part struct {  
   ph partHeader  
  
   path string  
  
   size uint64  
  
   mrs []metaindexRow  
  
   indexFile fs.MustReadAtCloser  
   itemsFile fs.MustReadAtCloser  
   lensFile  fs.MustReadAtCloser  
}

一个 part 就是 Table 下面的一个数据目录。

part 中包含一个 partHeader,该属性中包含当前 part 的一些 Meta 信息,一共有多少个 items、有多少 blocks、第一个和最后一个 item,对应着 part 目录下面的 metadata.json 文件。

// lib/mergeset/part_header.go
type partHeader struct {  
   // part 包含的 items 数
   itemsCount uint64  
  
   // part 包含的 blocks 数
   blocksCount uint64  
  
   // part 中的第一个 item
   firstItem []byte  
  
   // part 中的最后一个 item
   lastItem []byte  
}

part 中另外的属性 path 表示当前 part 的路径,size 表示大小,另外三个属性 indexFileitemsFilelensFile 对应中 part 目录下面的三个文件:index.binitems.binlens.bin。此外 part 结构中还有最后一个 mrs 属性,是一个 []metaindexRow

// lib/mergeset/metaindex_row.go

// metaindexRow 描述了一个 blockHeaders 即索引块。 
type metaindexRow struct {  
   // 第一个 block 中的第一个 item 元素
   // 它用于快速查找所需的索引块
   firstItem []byte  
  
   // 块包含的 blockHeaders 的数量
   blockHeadersCount uint32  
  
   // 索引文件中块的偏移量
   indexBlockOffset uint64  
  
   // 索引文件中块的大小
   indexBlockSize uint32  
}

除了 part 之外还有一个内存中的 inmemoryPart 结构,其基本结构和 part 类似,不同的是几个相关的属性不是文件对象,而是 ByteBuffer,因为是内存中的结构。

// lib/mergeset/inmemory_part.go
// 在内存中的 Part 结构
type inmemoryPart struct {  
   // partHeader 记录 itemsCount, blocksCount, firstItem, lastItem 信息, 最后会序列化到 metadata.json
   ph partHeader  
   // 当前 block 的 header 信息,有 commonPrefix, firstItem, marshalType, itemsCount, itemsBlockOffset, lenBlockOffset, itemsBlockSize, lenBlockSize
   bh blockHeader  
   // 当前 block 的 metaindex 信息,存储了当前 blockHeader 的 firstItem, blockHeaderCount, indexBlockOffset, indexBlockSize
   mr metaindexRow  
   
   // 用于序列化后写入内存/磁盘文件使用
   metaindexData bytesutil.ByteBuffer  // -> metaindex.bin
   indexData     bytesutil.ByteBuffer  // -> index.bin
   itemsData     bytesutil.ByteBuffer  // -> items.bin
   lensData      bytesutil.ByteBuffer  // -> lens.bin
}

其他几个属性上面介绍过,blockHeader 结构如下所示,用于记录 block 头信息:

// lib/mergeset/block_header.go
type blockHeader struct {  
   // 块中所有 items 的公用前缀  
   commonPrefix []byte  
  
   // 第一个 item
   firstItem []byte  
  
   // 用于块压缩的 Marshal 类型
   marshalType marshalType  
  
   // 块中的 items 数,不包括第一个 item
   itemsCount uint32  
  
   // items block 的偏移量
   itemsBlockOffset uint64  
  
   // lens block 的偏移量
   lensBlockOffset uint64  
  
   // items block 的大小
   itemsBlockSize uint32  
  
   // lens block 的大小
   lensBlockSize uint32  
}

整个 part 的结构看上去确实比较复杂,为什么需要设计这些属性?核心肯定就是为了快速索引,我们先往下分析,待会再回过头来看。

inmemoryPartpart 读入内存中的结构, 在 inmemoryBlock merge 之前,每个 inmemoryBlock 都会先通过 mp.Init 转换成一个 inmemoryPart 的结构,inmemoryPartmetaindexDataindexDataitemsDatalensData 数据结构与磁盘对应的文件内容一致。

序列化数据

现在我们再回到上面的 mergeInmemoryBlocks 函数,流程如下所示:

  • 1.将所有的 inmemoryBlock 转换为 inmemoryPart 结构
  • 2.为每个 inmemoryPart 构造 blockStreamReader,用于迭代读取 items
  • 3.创建一个新的 inmemoryPart,并构造一个 blockSteamWriter 用于合并写入的数据
  • 4.然后调用 mergeBlockStreams 函数执行真正的 merge 操作

首先通过 Init 函数将 inmemoryBlock 转换为 inmemoryPart 结构。

// lib/mergeset/inmemory_part.go
// Init 初始化 mp 从 ib. 
func (mp *inmemoryPart) Init(ib *inmemoryBlock) {  
   mp.Reset()  
   
   sb := &storageBlock{}  
   sb.itemsData = mp.itemsData.B[:0]  
   sb.lensData = mp.lensData.B[:0]  
  
   // 使用尽可能小的压缩等级来压缩 inmemoryPart,因为它很快就会被合并到文件 part 去。
   compressLevel := -5  
   // 序列化乱序的数据
   mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)  

   // 获取 partHeader 值
   mp.ph.itemsCount = uint64(len(ib.items))  
   mp.ph.blocksCount = 1  
   mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)  
   mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)  

   // 获取itemsData,更新blockHeader的items偏移和数量
   mp.itemsData.B = sb.itemsData  
   mp.bh.itemsBlockOffset = 0  
   mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))  

   // 获取lensData,更新blockHeader的lens偏移和数量
   mp.lensData.B = sb.lensData  
   mp.bh.lensBlockOffset = 0  
   mp.bh.lensBlockSize = uint32(len(mp.lensData.B))  

   // 获取 indexData,blockHeader序列化的值
   bb := inmemoryPartBytePool.Get()  
   bb.B = mp.bh.Marshal(bb.B[:0])  
   mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, 0)  

   // 获取 metaindexData,metaindexRow序列化的值
   mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)  
   mp.mr.blockHeadersCount = 1  
   mp.mr.indexBlockOffset = 0  
   mp.mr.indexBlockSize = uint32(len(mp.indexData.B))  
   bb.B = mp.mr.Marshal(bb.B[:0])  
   mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, 0)  
   inmemoryPartBytePool.Put(bb)  
}

上面的函数将 inmemoryBlock 转换成 inmemoryPart,首先会通过一个 MarshalUnsortedData 函数来序列化未排序的数据。

// MarshalUnsortedData 序列化未排序的 items 从 ib 到 sb.
//  
// It also:  
// - 将第一个 item 追加到 firstItemDst 并返回结果  
// - 将所有 item 的公共前缀附加到 commonPrefixDst 并返回结果  
// - 返回包含第一个 item 的编码项的数量  
// - 返回用于编码的 marshal 类型  
func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {  
   if !ib.isSorted() {  
      sort.Sort(ib) // 排序  
   }  
   // 更新内存块的公共前缀  
   ib.updateCommonPrefix()  
   // 序列化数据  
   return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)  
}

上面的序列化函数中首先会对未排序的数据进行排序,然后更新内存块的公共前缀:

// lib/mergeset/encoding.go
// 更新公共前缀  
func (ib *inmemoryBlock) updateCommonPrefix() {  
   ib.commonPrefix = ib.commonPrefix[:0]  // 公共前缀
   if len(ib.items) == 0 {  
      return  
   }  
   items := ib.items          // 数据前后位置  
   data := ib.data            // 数据  
   cp := items[0].Bytes(data) // 第一段数据  
   if len(cp) == 0 {  
      return  
   }  
   for _, it := range items[1:] { // 后面的数据  
      // 计算公共前缀的长度  
      cpLen := commonPrefixLen(cp, it.Bytes(data))  
      if cpLen == 0 {  
         return  
      }  
      // 截取公共前缀数据  
      cp = cp[:cpLen]  
   }  
   // 设置内存块的公共前缀  
   ib.commonPrefix = append(ib.commonPrefix[:0], cp...)  
}

公共前缀就是把每段数据包含的共同前缀提取出来,这样存储的时候后面就可以不需要存储共同的部分了,减少存储空间。

公共前缀提取出来后,接下来调用 marshalData 函数去序列化数据。

// lib/mergeset/encoding.go
// 前提条件:  
// - ib.items 必须排序  
// - updateCommonPrefix 必须被调用  
// 序列化数据  
func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {  
   ......  
   // 拷贝 inmemoryBlock 数据块的 firstItem(排序后的第一条数据)  
   data := ib.data                      // 内存块数据  
   firstItem := ib.items[0].Bytes(data) // 第一条数据  
   firstItemDst = append(firstItemDst, firstItem...)  
   // 最大公共前缀  
   commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...) 
   // 内存块数据小于2段或(数据大小-公共前缀长度*数据段大小 < 64) 则定义为小块  
   if len(data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 {  
      // 对small block使用普通序列化,因为它更便宜  
      ib.marshalDataPlain(sb)  
      return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain  
   }  
  
   bbItems := bbPool.Get()  
   bItems := bbItems.B[:0// 保存目的 items 数据的内存 buffer  
  
   bbLens := bbPool.Get()  
   bLens := bbLens.B[:0// 保存目的 lens 数据的内存buffer  
  
   // 序列化 items 数据  
   // 第一项数据不需要存储,所以获取的 Uint64s 大小要减1  
   xs := encoding.GetUint64s(len(ib.items) - 1
   defer encoding.PutUint64s(xs)  
  
   cpLen := len(ib.commonPrefix) // 公共前缀的长度  
   prevItem := firstItem[cpLen:] // 第一项数据(排除公共前缀)  
   prevPrefixLen := uint64(0)  
   // 从第二个元素开始遍历(第一个 firstItem 单独存储)  
   for i, it := range ib.items[1:] { 
      // 偏移到公共前缀之后的位置
      it.Start += uint32(cpLen)   
      // Bytes(data) 得到的数据不包含公共前缀的部分                           
      item := it.Bytes(data)  
      // 计算第 N 项和 N-1 项的公共前缀长度                               
      prefixLen := uint64(commonPrefixLen(prevItem, item))   
      // 仅仅只把差异的部分拷贝到目的buffer 
      bItems = append(bItems, item[prefixLen:]...)     
      // 第一次,与0异或,还是等于原值。异或后,两个整数值前面相同的部分都为0了,数值变得更短,能够便于压缩。     
      xLen := prefixLen ^ prevPrefixLen       
      // 上次的除去公共前缀的item                 
      prevItem = item                  
      // 上次计算得到的公共前缀长度                      
      prevPrefixLen = prefixLen                            
  
      xs.A[i] = xLen // 异或后的公共前缀值  
   }

   // 对N-1个长度进行序列化(将uint64数组序列化成byte数组)  
   bLens = encoding.MarshalVarUint64s(bLens, xs.A)                                    
   // 将items数据(只有差异的部分)ZSTD压缩后,写入storageBlock 
   sb.itemsData = encoding.CompressZSTDLevel(sb.itemsData[:0], bItems, compressLevel)  
  
   bbItems.B = bItems  
   bbPool.Put(bbItems)  
  
   // 序列化 lens 数据  
   // 第一项数据大小(排除公共前缀)
   prevItemLen := uint64(len(firstItem) - cpLen)   
   for i, it := range ib.items[1:] {             // 从第二个元素开始遍历 
      // item长度 = End-Start-公共前缀大小   
      itemLen := uint64(int(it.End-it.Start) - cpLen) 
      // 与前面一个元素长度异或 
      xLen := itemLen ^ prevItemLen    
      // 上次去除公共前缀的长度                  
      prevItemLen = itemLen                           
  
      xs.A[i] = xLen // 异或后的元素长度  
   }  
   // 前面记录的是两两相对的长度,这里记录的是数据的真实长度  
   // 长度信息包含两种,相对长度和总长度  
   bLens = encoding.MarshalVarUint64s(bLens, xs.A)   
   // 将lens数据进行ZSTD压缩后,写入storageBlock                                
   sb.lensData = encoding.CompressZSTDLevel(sb.lensData[:0], bLens, compressLevel) 
  
   bbLens.B = bLens  
   bbPool.Put(bbLens)  
  
   // 如果压缩不到90%则选择不压缩  
   if float64(len(sb.itemsData)) > 0.9*float64(len(data)-len(ib.commonPrefix)*len(ib.items)) {  
      // 压缩率不高的时候,选择不压缩  
      ib.marshalDataPlain(sb)  
      return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain  
   }  
  
   // 很好的压缩率  
   return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD  
}

上面的序列化函数看上去比较复杂,实际上核心的一点就是想办法尽可能减少存储空间。首先将数据块的第一个数据拷贝出来放入 firstItemDst,然后后面就从第二个元素开始去循环处理,首先计算第 N 项和 N-1 项的公共前缀长度,然后将差异的数据部分保存起来,为了能够反序列化回数据,还需要将两两之间公共前缀的长度保存下来,为了能够便于压缩,使用异或的方式来计算两两之间的公共前缀长度值。

循环计算后,将保存的两两之间的公共前缀长度进行序列化,下面的函数将一个 uint64 类型的切片转换成字节切片,如果数据小于 128 直接转换即可,如果大于 127 则用一个 7bit 来表示数值的内容,最高位后面的一个字节用来表示长度,这样就可以用变长长度来序列化数值,而不是每个数值都占用固定的长度。

// lib/encoding/int.go
// 将uint64切片转成字节切片
func MarshalVarUint64s(dst []byte, us []uint64) []byte {  
   for _, u := range us {  
      if u < 0x80 { // 小于128,直接加入到 dst,能直接存到 byte 中去  
         // Fast path  
         dst = append(dst, byte(u))  
         continue  
      }  
      for u > 0x7f { // 大于127,则超过的部分保留为 0x80,低位右移7位继续计算  
         dst = append(dst, 0x80|byte(u))  
         u >>= 7  
      }  
      dst = append(dst, byte(u))  
   }  
   return dst  
}

长度数据序列化后,将 items 数据(只有差异的部分)进行 ZSTD 压缩后,写入 storageBlock。

只记录两两之间的公共前缀长度还不够,还需要记录数据的真实长度,最后同样再将 lens 数据进行 ZSTD 压缩后,写入 storageBlock。

如果最后的结果压缩不到 90% 则选择不压缩,不压缩则使用 marshalDataPlain 函数进行序列化:

// lib/mergeset/encoding.go
// 普通序列化数据  
func (ib *inmemoryBlock) marshalDataPlain(sb *storageBlock) {  
   data := ib.data  
  
   // 序列化 items 数据  
   // 不需要序列化第一项数据,因为它会在 marshalData 中返回给调用者。  
   cpLen := len(ib.commonPrefix) // 公共前缀长度  
   b := sb.itemsData[:0]  
   for _, it := range ib.items[1:] { // 第一项之后的数据  
      it.Start += uint32(cpLen)         // 跳过公共前缀  
      b = append(b, it.String(data)...) // 添加移出公共前缀的数据  
   }  
   sb.itemsData = b // itemsData数据  
  
   // 序列化 lens 数据  
   b = sb.lensData[:0]  
   for _, it := range ib.items[1:] { // 第一项之后的数据  
      // 原始的End-Start-公共前缀长度  
      b = encoding.MarshalUint64(b, uint64(int(it.End-it.Start)-cpLen)) 
   }  
   sb.lensData = b  
}

经过上面的序列化过后就可以得到第一个数据、公共前缀、items 个数以及序列化类型,然后将这些数据存入 blockHeader 中去,后面就是一些比较简单的常规操作。

转换成 inmemoryPart 后,再包装成 blockStreamReader,创建一个新的 inmemoryPart,并构造一个 blockSteamWriter 用于合并写入的数据,然后调用 mergeBlockStreams 函数执行真正的 merge 操作。

// lib/mergeset/merge.go
// mergeBlockStreams 合并 bsrs 并将结果写入 bsw
//  
// 也填充了 ph  
//  
// prepareBlock 是可选的 
//  
// 当 stopCh 关闭时,该函数立即返回
//  
// 它还以原子方式将合并的 items 添加到 itemsMerged
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{},  
   itemsMerged *uint64) error
 {  
   // 将多个 blockStreamReader 构造成一个 blockStreamMerger 结构
   bsm := bsmPool.Get().(*blockStreamMerger)  
   if err := bsm.Init(bsrs, prepareBlock); err != nil {  
      return fmt.Errorf("cannot initialize blockStreamMerger: %w", err)  
   }  
   err := bsm.Merge(bsw, ph, stopCh, itemsMerged)  
   bsm.reset()  
   bsmPool.Put(bsm)  
   bsw.MustClose()  
   if err == nil {  
      return nil  
   }  
   return fmt.Errorf("cannot merge %d block streams: %s: %w"len(bsrs), bsrs, err)  
}

首先把多个 blockStreamReader 构造成一个 blockStreamMerger 结构, merger 里面主要是一个 bsrHeap 堆用于维护 bsrs,用于 merge 数据时的排序。首先通过 merger 的 Init 函数构造堆排序的结构,然后核心是调用 merger 的 Merge 函数进行处理。

// lib/mergeset/merge.go
func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error {  
again:  
   if len(bsm.bsrHeap) == 0 {  
      // 将最后的 inmemoryBlock(可能不完整)写入 bsw
      bsm.flushIB(bsw, ph, itemsMerged)  
      return nil  
   }  
  
   select {  
   case <-stopCh:  
      return errForciblyStopped  
   default:  
   }  
   // 取出 blockStreamReader
   bsr := heap.Pop(&bsm.bsrHeap).(*blockStreamReader)  
  
   var nextItem []byte  // 下一个 blockStreamReader
   hasNextItem := false  
   if len(bsm.bsrHeap) > 0 {  
      nextItem = bsm.bsrHeap[0].bh.firstItem  
      hasNextItem = true  
   }  
   items := bsr.Block.items  
   data := bsr.Block.data  
   // 循环所有的 items
   for bsr.blockItemIdx < len(bsr.Block.items) {  
      item := items[bsr.blockItemIdx].Bytes(data)  
      if hasNextItem && string(item) > string(nextItem) {  
         break  
      }  
      // 添加元素
      if !bsm.ib.Add(item) {  
         // bsm.ib 已满,将其刷新到 bsw 并继续
         bsm.flushIB(bsw, ph, itemsMerged)  
         continue  
      }  
      bsr.blockItemIdx++  
   }  
   if bsr.blockItemIdx == len(bsr.Block.items) {  
      // bsr.Block 已完全读取,处理下一个 block
      if bsr.Next() {  
         heap.Push(&bsm.bsrHeap, bsr)  
         goto again  
      }  
      if err := bsr.Error(); err != nil {  
         return fmt.Errorf("cannot read storageBlock: %w", err)  
      }  
      goto again  
   }  

   // bsr.Block 中的下一个 item 超过了 nextItem
   // 调整 bsr.bh.firstItem 并将 bsr 返回到堆
   bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)  
   heap.Push(&bsm.bsrHeap, bsr)  
   goto again  
}

这里主要解决的问题是多个有序的字节数组(inmemoryPart),按照字节序排序,合成一个 inmemoryPart 的过程,在 merge 的过程中,每 64KB 会单独创建一个 blockHeader,用于快速索引该 block 里面的 Items。

持久化数据

最后重复上面的过程,将 ninmemoryBlock 合并成 (n-1)/defaultPartsToMerge+1inmemoryPart,最后再调用 mergeParts 函数完成索引持久化操作,持久化后生成的索引 part,主要包含 metaindex.binindex.binlens.binitems.binmetadata.json 等 5 个文件。

这几个文件的关系如下图所示, metaindex.bin 文件通过 metaindexRow 索引 index.bin 文件,index.bin 文件通过 indexBlock 中的 blockHeader 同时索引 items.bin 文件和 items.bin 文件。

metaindex.bin:文件包含一系列的 metaindexRow 数据,每个 metaindexRow 中包含第一条数据 firstItem、索引块包含的块头部数 blockHeadersCount、索引块偏移 indexBlockOffset 以及索引块大小 indexBlockSize

  • metaindexRow 在文件中按照 firstItem 的大小的字典序排序存储,以支持二分查找
  • metaindex.bin 文件使用 ZSTD 进行压缩
  • metaindex.bin 文件中的内容在 part 打开时,会全部读出加载至内存中,以加速查询过滤
  • metaindexRow 包含的 firstItem 为其索引的 indexBlock 中所有 blockHeader 中字典序最小的 firstItem
  • 查找时根据 firstItem 进行二分检索

index.bin:文件中包含一系列的 indexBlock, 每个 indexBlock 又包含一系列 blockHeader,每个 blockHeader 包含 item 的公共前缀 commonPrefix、第一项数据 firstItemitemsData 的序列化类型 marshalTypeitemsData 包含的 item 数、item 块的偏移 itemsBlockOffset 等内容,就是前面使用将 inmemoryBlock 转换为 inmemoryPart 结构的 Init 函数得到的。

  • 每个 indexBlock 使用 ZSTD 压缩算法进行压缩
  • indexBlock 中查找时,根据 firstItem 进行二分检索 blockHeader

items.bin 文件中,包含一系列的 itemsData, 每个 itemsData 又包含一系列的 Item。

  • itemsData 会视情况而定来是否使用 ZTSD 压缩,当 item 个数小于 2 时,或者 itemsData 的长度小于 64 字节时,不压缩;当 itemsData 使用 ZSTD 压缩后的压缩率大于90%的时候也不压缩
  • 每个 item 在存储时,去掉了 blockHeader 中的公共前缀 commonPrefix 以提高压缩率

lens.bin 文件中,包含一系列的 lensData, 每个 lensData 又包含一系列 8 字节的长度 len, 长度 len 标识 items.bin 文件中对应 item 的长度。在读取或者需要解析 itemsData 中的 item 时,先要读取对应的 lensData 中对应的长度 len。 当 itemsData 进行压缩时,lensData 会先使用异或算法进行压缩,然后再使用 ZSTD 算法进一步压缩。

到这里我们就了解了索引数据是实现和存储原理了,那么真正的指标数据又是如何去存储的呢?未完待续......

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存