Hot and Cold Data Separation | Alluxio's Metadata Management Strategy
Author: 林意群
Original article: http://suo.im/5Xcmci
1. Alluxio Overview
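Recently accessed hot metadata is cached in memory. This is the cached layer.
Data that has not been accessed for a long time (also called cold data) is persisted. This is the persisted layer.
Hot data whose access frequency has dropped is promptly evicted from the cache and written out to the backing store.
When data that is not in the cache is accessed, it is read from the backing store and loaded into the cache.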
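The two-layer idea described above can be sketched in a few lines of Java. This is only an illustration, not Alluxio's code: the class `TwoLayerStoreSketch`, the `HashMap` standing in for the backing store, and the simple least-recently-used spill order are all assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative two-layer store: a bounded in-memory cache in front of a backing store. */
final class TwoLayerStoreSketch {
  // Cached layer: access-ordered, so iteration starts at the least recently used entry.
  private final LinkedHashMap<Long, String> mCache = new LinkedHashMap<>(16, 0.75f, true);
  // Persisted layer: a plain map standing in for a real backing store (assumption).
  private final Map<Long, String> mBackingStore = new HashMap<>();
  private final int mMaxCacheSize;

  TwoLayerStoreSketch(int maxCacheSize) {
    mMaxCacheSize = maxCacheSize;
  }

  void put(long id, String value) {
    mCache.put(id, value);
    spillColdEntries();
  }

  String get(long id) {
    String value = mCache.get(id);
    if (value == null) {
      // Cache miss: read from the backing store and load the entry into the cache.
      value = mBackingStore.get(id);
      if (value != null) {
        mCache.put(id, value);
        spillColdEntries();
      }
    }
    return value;
  }

  boolean isCached(long id) {
    return mCache.containsKey(id);
  }

  boolean isPersisted(long id) {
    return mBackingStore.containsKey(id);
  }

  // Writes the coldest entries out to the backing store until the cache fits its bound.
  private void spillColdEntries() {
    Iterator<Map.Entry<Long, String>> it = mCache.entrySet().iterator();
    while (mCache.size() > mMaxCacheSize && it.hasNext()) {
      Map.Entry<Long, String> coldest = it.next();
      mBackingStore.put(coldest.getKey(), coldest.getValue());
      it.remove();
    }
  }

  public static void main(String[] args) {
    TwoLayerStoreSketch store = new TwoLayerStoreSketch(2);
    store.put(1L, "/a");
    store.put(2L, "/b");
    store.put(3L, "/c"); // entry 1 is now the coldest and is spilled
    System.out.println("1 cached? " + store.isCached(1L));
    System.out.println("get(1) = " + store.get(1L));
  }
}
```

Alluxio's real implementation differs in important ways (watermarks, a background eviction thread, dirty tracking), which the code excerpts later in this article show.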
When Does Cleanup Happen?
Caches built with CacheBuilder do not perform cleanup and evict values "automatically," or instantly after a value expires, or anything of the sort. Instead, it performs small amounts of maintenance during write operations, or during occasional read operations if writes are rare.
The reason for this is as follows: if we wanted to perform Cache maintenance continuously, we would need to create a thread, and its operations would be competing with user operations for shared locks. Additionally, some environments restrict the creation of threads, which would make CacheBuilder unusable in that environment.
Instead, we put the choice in your hands. If your cache is high-throughput, then you don't have to worry about performing cache maintenance to clean up expired entries and the like. If your cache does writes only rarely and you don't want cleanup to block cache reads, you may wish to create your own maintenance thread that calls Cache.cleanUp() at regular intervals.
If you want to schedule regular cache maintenance for a cache which only rarely has writes, just schedule the maintenance using ScheduledExecutorService.
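As a rough sketch of the last point in the quoted passage, periodic maintenance can be driven by a `ScheduledExecutorService`. To keep the example free of external dependencies, the `Maintainable` interface below is a hypothetical stand-in for a cache that exposes a `cleanUp()` method; with Guava on the classpath you would pass `cache::cleanUp` instead. The thread name and the period are arbitrary choices.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ScheduledCleanupSketch {
  /** Hypothetical stand-in for a cache that exposes a cleanUp() method. */
  interface Maintainable {
    void cleanUp();
  }

  /** Runs cache.cleanUp() repeatedly with a fixed delay on one daemon thread. */
  static ScheduledExecutorService scheduleCleanup(Maintainable cache, long periodMs) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "cache-maintenance");
      t.setDaemon(true); // do not keep the JVM alive just for maintenance
      return t;
    });
    executor.scheduleWithFixedDelay(cache::cleanUp, periodMs, periodMs, TimeUnit.MILLISECONDS);
    return executor;
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger runs = new AtomicInteger();
    ScheduledExecutorService executor = scheduleCleanup(runs::incrementAndGet, 10);
    Thread.sleep(200);
    executor.shutdownNow();
    System.out.println("cleanUp ran " + runs.get() + " time(s)");
  }
}
```

The caller keeps the returned executor so that the maintenance task can be shut down together with the cache.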
public final class CachingInodeStore implements InodeStore, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(CachingInodeStore.class);
// Backing store: the store that user data is written out to for persistence
private final InodeStore mBackingStore;
private final InodeLockManager mLockManager;
// Cache recently-accessed inodes.
@VisibleForTesting
final InodeCache mInodeCache;
// Cache recently-accessed inode tree edges.
@VisibleForTesting
final EdgeCache mEdgeCache;
@VisibleForTesting
final ListingCache mListingCache;
// Starts true, but becomes permanently false if we ever need to spill metadata to the backing
// store. When true, we can optimize lookups for non-existent inodes because we don't need to
// check the backing store. We can also optimize getChildren by skipping the range query on the
// backing store.
private volatile boolean mBackingStoreEmpty;
...
public CachingInodeStore(InodeStore backingStore, InodeLockManager lockManager) {
mBackingStore = backingStore;
mLockManager = lockManager;
AlluxioConfiguration conf = ServerConfiguration.global();
int maxSize = conf.getInt(PropertyKey.MASTER_METASTORE_INODE_CACHE_MAX_SIZE);
Preconditions.checkState(maxSize > 0,
"Maximum cache size %s must be positive, but is set to %s",
PropertyKey.MASTER_METASTORE_INODE_CACHE_MAX_SIZE.getName(), maxSize);
float highWaterMarkRatio = ConfigurationUtils.checkRatio(conf,
PropertyKey.MASTER_METASTORE_INODE_CACHE_HIGH_WATER_MARK_RATIO);
// Compute the high water mark
int highWaterMark = Math.round(maxSize * highWaterMarkRatio);
float lowWaterMarkRatio = ConfigurationUtils.checkRatio(conf,
PropertyKey.MASTER_METASTORE_INODE_CACHE_LOW_WATER_MARK_RATIO);
Preconditions.checkState(lowWaterMarkRatio <= highWaterMarkRatio,
"low water mark ratio (%s=%s) must not exceed high water mark ratio (%s=%s)",
PropertyKey.MASTER_METASTORE_INODE_CACHE_LOW_WATER_MARK_RATIO.getName(), lowWaterMarkRatio,
PropertyKey.MASTER_METASTORE_INODE_CACHE_HIGH_WATER_MARK_RATIO, highWaterMarkRatio);
// Compute the low water mark
int lowWaterMark = Math.round(maxSize * lowWaterMarkRatio);
mBackingStoreEmpty = true;
CacheConfiguration cacheConf = CacheConfiguration.newBuilder().setMaxSize(maxSize)
.setHighWaterMark(highWaterMark).setLowWaterMark(lowWaterMark)
.setEvictBatchSize(conf.getInt(PropertyKey.MASTER_METASTORE_INODE_CACHE_EVICT_BATCH_SIZE))
.build();
// Pass the cache configuration built above into each cache
mInodeCache = new InodeCache(cacheConf);
mEdgeCache = new EdgeCache(cacheConf);
mListingCache = new ListingCache(cacheConf);
}
class InodeCache extends Cache<Long, MutableInode<?>> {
public InodeCache(CacheConfiguration conf) {
super(conf, "inode-cache", MetricKey.MASTER_INODE_CACHE_SIZE);
}
...
}
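To make the watermark arithmetic in the constructor above concrete, here is a small standalone sketch. The helper `waterMark` and the sample values (a maximum of 1000 entries with ratios 0.85 and 0.8) are assumptions chosen for illustration; only the `Math.round(maxSize * ratio)` step mirrors the excerpt.

```java
final class WaterMarkSketch {
  /** Converts a ratio in [0, 1] into an absolute entry count, rounding to the nearest int. */
  static int waterMark(int maxSize, float ratio) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
    }
    if (ratio < 0f || ratio > 1f) {
      throw new IllegalArgumentException("ratio must be within [0, 1]: " + ratio);
    }
    return Math.round(maxSize * ratio);
  }

  public static void main(String[] args) {
    int maxSize = 1000;                           // hypothetical cache capacity
    int highWaterMark = waterMark(maxSize, 0.85f); // eviction starts above this count
    int lowWaterMark = waterMark(maxSize, 0.8f);   // eviction stops at this count
    System.out.println("high=" + highWaterMark + " low=" + lowWaterMark);
  }
}
```

With these sample values, eviction would begin once the cache holds more than 850 entries and continue until it is back down to 800, leaving headroom below the hard maximum.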
public abstract class Cache<K, V> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(Cache.class);
private final int mMaxSize;
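// High water mark: when the number of cache entries exceeds this value, entries start being written out and evicted
private final int mHighWaterMark;
// Low water mark: the number of entries the cache is reduced to by each eviction pass
private final int mLowWaterMark;
// Batch size for each round of entry eviction
private final int mEvictBatchSize;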
private final String mName;
// The cache map; ConcurrentHashMap is used for thread safety
@VisibleForTesting
final ConcurrentHashMap<K, Entry> mMap;
// TODO(andrew): Support using multiple threads to speed up backing store writes.
// Thread for performing eviction to the backing store.
@VisibleForTesting
// The thread that evicts entries and writes them out
final EvictionThread mEvictionThread;
...
class EvictionThread extends Thread {
@VisibleForTesting
volatile boolean mIsSleeping = true;
// Holds the cache entries selected for eviction
private final List<Entry> mEvictionCandidates = new ArrayList<>(mEvictBatchSize);
private final List<Entry> mDirtyEvictionCandidates = new ArrayList<>(mEvictBatchSize);
private final Logger mCacheFullLogger = new SamplingLogger(LOG, 10L * Constants.SECOND_MS);
...
@Override
public void run() {
while (!Thread.interrupted()) {
// While the number of entries in the map has not exceeded the high water mark, the thread waits
while (!overHighWaterMark()) {
synchronized (mEvictionThread) {
if (!overHighWaterMark()) {
try {
mIsSleeping = true;
mEvictionThread.wait();
mIsSleeping = false;
} catch (InterruptedException e) {
return;
}
}
}
}
if (cacheIsFull()) {
mCacheFullLogger.warn(
"Metastore {} cache is full. Consider increasing the cache size or lowering the "
+ "high water mark. size:{} lowWaterMark:{} highWaterMark:{} maxSize:{}",
mName, mMap.size(), mLowWaterMark, mHighWaterMark, mMaxSize);
}
// Once the entry count exceeds the high water mark, write out and evict entries until the map is down to the low water mark
evictToLowWaterMark();
}
}
}
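The sleep-until-over-the-high-water-mark loop above follows the classic monitor wait/notify pattern. The sketch below reproduces that pattern in isolation, under several simplifying assumptions: the class and field names are invented, a separate monitor object is used instead of the thread itself, and "eviction" simply removes entries rather than writing them to a backing store.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

final class EvictionThreadSketch {
  private final ConcurrentHashMap<Integer, String> mMap = new ConcurrentHashMap<>();
  private final Object mMonitor = new Object();
  private final int mHighWaterMark;
  private final int mLowWaterMark;
  private final Thread mEvictor;

  EvictionThreadSketch(int highWaterMark, int lowWaterMark) {
    mHighWaterMark = highWaterMark;
    mLowWaterMark = lowWaterMark;
    mEvictor = new Thread(this::evictLoop, "eviction-sketch");
    mEvictor.setDaemon(true);
    mEvictor.start();
  }

  void put(int key, String value) {
    mMap.put(key, value);
    // Wake the evictor only when there is work to do.
    if (mMap.size() > mHighWaterMark) {
      synchronized (mMonitor) {
        mMonitor.notifyAll();
      }
    }
  }

  int size() {
    return mMap.size();
  }

  void stop() {
    mEvictor.interrupt();
  }

  private void evictLoop() {
    while (!Thread.currentThread().isInterrupted()) {
      synchronized (mMonitor) {
        // Re-check the condition under the lock so that a wakeup cannot be missed.
        while (mMap.size() <= mHighWaterMark) {
          try {
            mMonitor.wait();
          } catch (InterruptedException e) {
            return;
          }
        }
      }
      // Over the high water mark: remove entries until the low water mark is reached.
      Iterator<Integer> it = mMap.keySet().iterator();
      while (mMap.size() > mLowWaterMark && it.hasNext()) {
        it.next();
        it.remove();
      }
    }
  }

  public static void main(String[] args) {
    EvictionThreadSketch cache = new EvictionThreadSketch(8, 4);
    for (int i = 0; i < 10; i++) {
      cache.put(i, "v" + i);
    }
    long deadline = System.nanoTime() + 5_000_000_000L;
    while (cache.size() > 8 && System.nanoTime() < deadline) {
      Thread.yield();
    }
    System.out.println("size after eviction: " + cache.size());
    cache.stop();
  }
}
```

Checking the condition both before taking the lock (in `put`) and again inside it (in the evictor) keeps the common path cheap while still avoiding lost wakeups.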
private void evictToLowWaterMark() {
long evictionStart = System.nanoTime();
// Number of entries to evict in this pass
int toEvict = mMap.size() - mLowWaterMark;
// Running count of entries evicted so far
int evictionCount = 0;
// Write out and evict entries
while (evictionCount < toEvict) {
if (!mEvictionHead.hasNext()) {
mEvictionHead = mMap.values().iterator();
}
// Walk the map entries and collect the candidates for eviction
fillBatch(toEvict - evictionCount);
// Write out and remove the collected entries
evictionCount += evictBatch();
}
if (evictionCount > 0) {
LOG.debug("{}: Evicted {} entries in {}ms", mName, evictionCount,
(System.nanoTime() - evictionStart) / Constants.MS_NANO);
}
}
private void fillBatch(int count) {
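// Cap the number of entries evicted in a single batch
int targetSize = Math.min(count, mEvictBatchSize);
// Until the candidate list reaches the target size, keep walking the map looking for unreferenced entries
while (mEvictionCandidates.size() < targetSize && mEvictionHead.hasNext()) {
Entry candidate = mEvictionHead.next();
// If the entry has been referenced, reset its flag to false; if the walk reaches it again and it is still unreferenced, it will be collected for eviction.
// Whenever an entry is accessed, its referenced flag is set to true.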
if (candidate.mReferenced) {
candidate.mReferenced = false;
continue;
}
// The entry is already marked as unreferenced, so add it to the eviction candidate list
mEvictionCandidates.add(candidate);
if (candidate.mDirty) {
mDirtyEvictionCandidates.add(candidate);
}
}
}
private int evictBatch() {
int evicted = 0;
if (mEvictionCandidates.isEmpty()) {
return evicted;
}
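// Write out the entries, which fall into two classes:
// If the entry's value matches what is stored in the backing store, it is simply removed from the map.
// If the entry has been updated relative to the backing store, it must first be flushed, and is then removed from the map.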
flushEntries(mDirtyEvictionCandidates);
for (Entry entry : mEvictionCandidates) {
if (evictIfClean(entry)) {
evicted++;
}
}
mEvictionCandidates.clear();
mDirtyEvictionCandidates.clear();
return evicted;
}
The first class only needs to be removed from the cache map.
The second class must be removed from the cache map and also written out to the backing store.
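The candidate selection in `fillBatch` resembles a second-chance (clock-style) sweep, and `evictBatch` then treats clean and dirty entries differently. The sketch below combines both steps in a simplified, single-threaded form. It is an illustration under assumptions, not Alluxio's implementation: the class and method names are invented, each call sweeps from the start of the map instead of keeping a persistent iterator, and a `HashMap` stands in for the backing store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SecondChanceEvictionSketch {
  static final class Entry {
    final String mKey;
    final String mValue;
    boolean mReferenced; // set on access, cleared by the sweep
    boolean mDirty;      // true when the value differs from the backing store

    Entry(String key, String value, boolean referenced, boolean dirty) {
      mKey = key;
      mValue = value;
      mReferenced = referenced;
      mDirty = dirty;
    }
  }

  private final Map<String, Entry> mCache = new LinkedHashMap<>();
  private final Map<String, String> mBackingStore = new HashMap<>();

  void add(String key, String value, boolean referenced, boolean dirty) {
    mCache.put(key, new Entry(key, value, referenced, dirty));
  }

  boolean isCached(String key) {
    return mCache.containsKey(key);
  }

  String persistedValue(String key) {
    return mBackingStore.get(key);
  }

  /** Evicts up to count unreferenced entries and returns how many were evicted. */
  int evict(int count) {
    List<Entry> candidates = new ArrayList<>();
    for (Entry entry : mCache.values()) {
      if (candidates.size() >= count) {
        break;
      }
      if (entry.mReferenced) {
        // Second chance: clear the flag and skip the entry for this sweep.
        entry.mReferenced = false;
        continue;
      }
      candidates.add(entry);
    }
    int evicted = 0;
    for (Entry entry : candidates) {
      if (entry.mDirty) {
        // Second class: flush to the backing store before removal.
        mBackingStore.put(entry.mKey, entry.mValue);
        entry.mDirty = false;
      }
      // First class (or a freshly flushed entry): just drop it from the cache map.
      mCache.remove(entry.mKey);
      evicted++;
    }
    return evicted;
  }

  public static void main(String[] args) {
    SecondChanceEvictionSketch cache = new SecondChanceEvictionSketch();
    cache.add("a", "1", true, false);  // recently referenced, clean
    cache.add("b", "2", false, true);  // unreferenced, dirty
    cache.add("c", "3", false, false); // unreferenced, clean
    System.out.println("evicted: " + cache.evict(2));
    System.out.println("a still cached? " + cache.isCached("a"));
    System.out.println("b persisted as: " + cache.persistedValue("b"));
  }
}
```

An entry that keeps being accessed has its flag set again before the next sweep reaches it, so only entries that stay untouched for a full sweep are evicted.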
protected class Entry {
protected K mKey;
// null value means that the key has been removed from the cache, but still needs to be removed
// from the backing store.
@Nullable
protected V mValue;
// Whether the entry is out of sync with the backing store. If mDirty is true, the entry must be
// flushed to the backing store before it can be evicted.
protected volatile boolean mDirty = true;
...
/**
* Attempts to flush the given entries to the backing store.
*
* The subclass is responsible for setting each candidate's mDirty field to false on success.
*
* @param candidates the candidate entries to flush
*/
protected abstract void flushEntries(List<Entry> candidates);
/**
* Writes a key/value pair to the cache.
*
* @param key the key
* @param value the value
*/
public void put(K key, V value) {
mMap.compute(key, (k, entry) -> {
// Callback hook for put operations
onPut(key, value);
// If the key is not cached and the cache is already full, write directly to the backing store
if (entry == null && cacheIsFull()) {
writeToBackingStore(key, value);
return null;
}
if (entry == null || entry.mValue == null) {
onCacheUpdate(key, value);
return new Entry(key, value);
}
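// Update the entry
entry.mValue = value;
// Mark the entry as referenced, meaning it was accessed recently; the get and remove methods also set this flag to true
entry.mReferenced = true;
// Mark the entry as dirty, meaning its value has been updated since it was loaded from the backing store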
entry.mDirty = true;
return entry;
});
// Then notify the eviction thread so it can decide whether entries need to be evicted; the get and remove methods also call this at the end
wakeEvictionThreadIfNecessary();
}
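The key behavior of `put` above is that a new key bypasses the cache entirely when the cache is full, while an existing key is updated in place and marked dirty. The following is a reduced sketch of that decision, with invented names and with a `ConcurrentHashMap` standing in for the backing store. The callbacks and the eviction-thread wakeup from the excerpt are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class WriteThroughPutSketch {
  static final class Entry {
    volatile String mValue;
    volatile boolean mReferenced = true;
    volatile boolean mDirty = true; // a new entry is not yet in the backing store

    Entry(String value) {
      mValue = value;
    }
  }

  private final ConcurrentHashMap<String, Entry> mCache = new ConcurrentHashMap<>();
  private final Map<String, String> mBackingStore = new ConcurrentHashMap<>();
  private final int mMaxSize;

  WriteThroughPutSketch(int maxSize) {
    mMaxSize = maxSize;
  }

  void put(String key, String value) {
    mCache.compute(key, (k, entry) -> {
      if (entry == null && mCache.size() >= mMaxSize) {
        // Cache is full and the key is new: write straight to the backing store.
        mBackingStore.put(k, value);
        return null; // returning null leaves the key absent from the cache
      }
      if (entry == null) {
        return new Entry(value);
      }
      // Existing entry: update in place, mark as recently used and out of sync.
      entry.mValue = value;
      entry.mReferenced = true;
      entry.mDirty = true;
      return entry;
    });
  }

  String cachedValue(String key) {
    Entry entry = mCache.get(key);
    return entry == null ? null : entry.mValue;
  }

  String persistedValue(String key) {
    return mBackingStore.get(key);
  }

  int cacheSize() {
    return mCache.size();
  }

  public static void main(String[] args) {
    WriteThroughPutSketch cache = new WriteThroughPutSketch(2);
    cache.put("a", "1");
    cache.put("b", "2");
    cache.put("c", "3"); // cache is full, so this goes to the backing store
    cache.put("a", "9"); // existing key, updated in the cache
    System.out.println("cache size: " + cache.cacheSize());
    System.out.println("c persisted as: " + cache.persistedValue("c"));
    System.out.println("a cached as: " + cache.cachedValue("a"));
  }
}
```

Doing the whole decision inside `compute` keeps the check and the update atomic for a given key, which is presumably why the original code is structured the same way.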