Skip to content

Commit

Permalink
Fix a leak of open Blob files
Browse files Browse the repository at this point in the history
Summary: ...

Fixes facebook#13066

Important follow-up (FIXME): The added check discovered some apparent
cases of leaked (into table_cache) SST file readers that would stick
around until DB::Close(). Need to enable that check, diagnose, and fix.

Test Plan: added a check that is called during DB::Close in ASAN builds
(to minimize paying the cost in all unit tests). Without the fix, the
check failed in at least these tests:

```
db_blob_basic_test DBBlobBasicTest.DynamicallyWarmCacheDuringFlush
db_blob_compaction_test DBBlobCompactionTest.CompactionReadaheadMerge
db_blob_compaction_test DBBlobCompactionTest.MergeBlobWithBase
db_blob_compaction_test DBBlobCompactionTest.CompactionDoNotFillCache
db_blob_compaction_test DBBlobCompactionTest.SkipUntilFilter
db_blob_compaction_test DBBlobCompactionTest.CompactionFilter
db_blob_compaction_test DBBlobCompactionTest.CompactionReadaheadFilter
db_blob_compaction_test DBBlobCompactionTest.CompactionReadaheadGarbageCollection
```
  • Loading branch information
pdillinger committed Oct 31, 2024
1 parent 9ad772e commit ccc95f7
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 10 deletions.
10 changes: 10 additions & 0 deletions db/blob/blob_file_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Status BlobFileCache::GetBlobFileReader(
assert(blob_file_reader);
assert(blob_file_reader->IsEmpty());

// NOTE: sharing same Cache with table_cache
const Slice key = GetSliceForKey(&blob_file_number);

assert(cache_);
Expand Down Expand Up @@ -98,4 +99,13 @@ Status BlobFileCache::GetBlobFileReader(
return Status::OK();
}

void BlobFileCache::Evict(uint64_t blob_file_number) {
// NOTE: sharing same Cache with table_cache
const Slice key = GetSliceForKey(&blob_file_number);

assert(cache_);

cache_.get()->Erase(key);
}

} // namespace ROCKSDB_NAMESPACE
9 changes: 9 additions & 0 deletions db/blob/blob_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ class BlobFileCache {
uint64_t blob_file_number,
CacheHandleGuard<BlobFileReader>* blob_file_reader);

// Called when a blob file is obsolete to ensure it is removed from the cache
// to avoid effectively leaking the open file and associated memory
void Evict(uint64_t blob_file_number);

// Used to identify cache entries for blob files (not normally useful).
// Returns CacheInterface::GetBasicHelper(); code that iterates a Cache shared
// with TableCache can compare an entry's helper pointer against this value to
// tell blob file entries apart from other entries (as done in
// DBImpl::TEST_VerifyNoObsoleteFilesCached).
static const Cache::CacheItemHelper* GetHelper() {
return CacheInterface::GetBasicHelper();
}

private:
using CacheInterface =
BasicTypedCacheInterface<BlobFileReader, CacheEntryRole::kMisc>;
Expand Down
1 change: 1 addition & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ class ColumnFamilyData {
SequenceNumber earliest_seq);

// Accessors for this column family's table cache, blob file cache and blob
// source. Each returns the raw pointer obtained from the corresponding
// member's get().
TableCache* table_cache() const { return table_cache_.get(); }
// Exposed so that code outside this class (e.g. the blob file metadata deleter
// in VersionBuilder) can evict obsolete blob files from the cache.
BlobFileCache* blob_file_cache() const { return blob_file_cache_.get(); }
BlobSource* blob_source() const { return blob_source_.get(); }

// See documentation in compaction_picker.h
Expand Down
6 changes: 4 additions & 2 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -659,15 +659,17 @@ Status DBImpl::CloseHelper() {
// We need to release them before the block cache is destroyed. The block
// cache may be destroyed inside versions_.reset(), when column family data
// list is destroyed, so leaving handles in table cache after
// versions_.reset() may cause issues.
// Here we clean all unreferenced handles in table cache.
// versions_.reset() may cause issues. Here we clean all unreferenced handles
// in table cache, and (for certain builds/conditions) assert that no obsolete
// files are hanging around unreferenced (leak) in the table/blob file cache.
// Now we assume all user queries have finished, so only version set itself
// can possibly hold the blocks from block cache. After releasing unreferenced
// handles here, only handles held by version set left and inside
// versions_.reset(), we will release them. There, we need to make sure every
// time a handle is released, we erase it from the cache too. By doing that,
// we can guarantee that after versions_.reset(), table cache is empty
// so the cache can be safely destroyed.
TEST_VerifyNoObsoleteFilesCached(/*db_mutex_already_held=*/true);
table_cache_->EraseUnRefEntries();

for (auto& txn_entry : recovered_transactions_) {
Expand Down
7 changes: 6 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1239,9 +1239,14 @@ class DBImpl : public DB {
// Debug-build wrapper (declared inside the #ifndef NDEBUG section) that
// forwards `db_options` to ValidateOptions() and returns its Status unchanged.
static Status TEST_ValidateOptions(const DBOptions& db_options) {
return ValidateOptions(db_options);
}

#endif // NDEBUG

// In certain configurations, verify that the table/blob file cache only
// contains entries for live files, to check for effective leaks of open
// files. This can only be called when purging of obsolete files has
// "settled," such as during parts of DB Close().
//
// db_mutex_already_held: true if the caller already holds the DB mutex;
// otherwise the implementation acquires it for the duration of the check.
//
// The out-of-line definition lives in db_impl_debug.cc, which is compiled
// only when NDEBUG is not defined. Because this function is called
// unconditionally (e.g. from CloseHelper()), builds with NDEBUG defined get
// an inline no-op here so that they still link.
#ifndef NDEBUG
void TEST_VerifyNoObsoleteFilesCached(bool db_mutex_already_held) const;
#else
void TEST_VerifyNoObsoleteFilesCached(bool /*db_mutex_already_held*/) const {}
#endif  // NDEBUG

// persist stats to column family "_persistent_stats"
void PersistStats();

Expand Down
49 changes: 49 additions & 0 deletions db/db_impl/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#ifndef NDEBUG

#include "db/blob/blob_file_cache.h"
#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
Expand Down Expand Up @@ -321,5 +322,53 @@ size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const {
InstrumentedMutexLock l(&const_cast<DBImpl*>(this)->stats_history_mutex_);
return EstimateInMemoryStatsHistorySize();
}

void DBImpl::TEST_VerifyNoObsoleteFilesCached(
bool db_mutex_already_held) const {
// This check is somewhat expensive and obscure to make a part of every
// unit test in every build variety. Thus, we only enable it for ASAN builds,
// and of course only DEBUG builds.
#ifndef NDEBUG
if (!kMustFreeHeapAllocations) {
return;
}

std::optional<InstrumentedMutexLock> l;
if (db_mutex_already_held) {
mutex_.AssertHeld();
} else {
l.emplace(&mutex_);
}

std::vector<uint64_t> live_table_files;
std::vector<uint64_t> live_blob_files;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->current()->AddLiveFiles(&live_table_files, &live_blob_files);
}

std::set<uint64_t> live_files;
live_files.insert(live_table_files.begin(), live_table_files.end());
live_files.insert(live_blob_files.begin(), live_blob_files.end());

auto fn = [&](const Slice& key, Cache::ObjectPtr, size_t,
const Cache::CacheItemHelper* helper) {
if (helper != BlobFileCache::GetHelper()) {
// Skip non-blob files for now
// FIXME: diagnose and fix the leaks of obsolete SST files revealed in
// unit tests.
return;
}
// See TableCache and BlobFileCache
assert(key.size() == sizeof(uint64_t));
uint64_t file_number;
GetUnaligned(reinterpret_cast<const uint64_t*>(key.data()), &file_number);
assert(live_files.find(file_number) != live_files.end());
};
table_cache_->ApplyToAllEntries(fn, {});
#endif // !NDEBUG
}
} // namespace ROCKSDB_NAMESPACE
#endif // NDEBUG
2 changes: 2 additions & 0 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ Status TableCache::GetTableReader(
}

// Looks up the entry keyed by `file_number` in `cache` and returns whatever
// handle Cache::Lookup() yields for that key.
Cache::Handle* TableCache::Lookup(Cache* cache, uint64_t file_number) {
  // NOTE: sharing same Cache with BlobFileCache
  return cache->Lookup(GetSliceForFileNumber(&file_number));
}
Expand All @@ -178,6 +179,7 @@ Status TableCache::FindTable(
size_t max_file_size_for_l0_meta_pin, Temperature file_temperature) {
PERF_TIMER_GUARD_WITH_CLOCK(find_table_nanos, ioptions_.clock);
uint64_t number = file_meta.fd.GetNumber();
// NOTE: sharing same Cache with BlobFileCache
Slice key = GetSliceForFileNumber(&number);
*handle = cache_.Lookup(key);
TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0",
Expand Down
12 changes: 6 additions & 6 deletions db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <vector>

#include "cache/cache_reservation_manager.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_meta.h"
#include "db/dbformat.h"
#include "db/internal_stats.h"
Expand Down Expand Up @@ -744,19 +745,18 @@ class VersionBuilder::Rep {
return Status::Corruption("VersionBuilder", oss.str());
}

// Note: we use C++11 for now but in C++14, this could be done in a more
// elegant way using generalized lambda capture.
VersionSet* const vs = version_set_;
const ImmutableCFOptions* const ioptions = ioptions_;

auto deleter = [vs, ioptions](SharedBlobFileMetaData* shared_meta) {
auto deleter = [vs = version_set_, ioptions = ioptions_,
bc = cfd_ ? cfd_->blob_file_cache()
: nullptr](SharedBlobFileMetaData* shared_meta) {
if (vs) {
assert(ioptions);
assert(!ioptions->cf_paths.empty());
assert(shared_meta);
assert(bc);

vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(),
ioptions->cf_paths.front().path);
bc->Evict(shared_meta->GetBlobFileNumber());
}

delete shared_meta;
Expand Down
1 change: 0 additions & 1 deletion db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -1526,7 +1526,6 @@ class VersionSet {
void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);

// Records a blob file (number and directory path) in obsolete_blob_files_.
// `path` is taken by value and moved into the new entry.
void AddObsoleteBlobFile(uint64_t blob_file_number, std::string path) {
// NOTE: this does not touch BlobFileCache. Eviction of the cached reader is
// done by the caller (see the SharedBlobFileMetaData deleter in
// VersionBuilder, which calls BlobFileCache::Evict() right after this).
obsolete_blob_files_.emplace_back(blob_file_number, std::move(path));
}

Expand Down

0 comments on commit ccc95f7

Please sign in to comment.