Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/two_level_iterator.h"
#include "third-party/zenfs/fs/zbd_stat.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/c_style_callback.h"
Expand Down Expand Up @@ -113,6 +114,8 @@

namespace TERARKDB_NAMESPACE {

std::vector<ZoneStat> GetStat(Env* env);

const std::string kDefaultColumnFamilyName("default");
const uint64_t kDumpStatsWaitMicroseconds = 10000;
const std::string kPersistentStatsColumnFamilyName(
Expand Down Expand Up @@ -985,6 +988,163 @@ void DBImpl::ScheduleTtlGC() {
log_buffer_debug.FlushBufferToLog();
}

#ifdef LIBZBD
void DBImpl::ScheduleZNSGC() {
TEST_SYNC_POINT("DBImpl:ScheduleZNSGC");
uint64_t nowSeconds = env_->NowMicros() / 1000U / 1000U;
LogBuffer log_buffer_info(InfoLogLevel::INFO_LEVEL,
immutable_db_options_.info_log.get());
LogBuffer log_buffer_debug(InfoLogLevel::DEBUG_LEVEL,
immutable_db_options_.info_log.get());

chash_set<uint64_t> mark_for_gc;

if (initial_db_options_.zenfs_gc_ratio <= 0.0 ||
initial_db_options_.zenfs_gc_ratio >= 1.0) {
// GC is not enabled
return;
}

// Pick files for GC
auto stat = GetStat(env_);

uint64_t number;
FileType type;

// Merge db paths and column family paths together
chash_set<std::string> db_paths;

// Get column family paths
mutex_.Lock();
for (auto cfd : *versions_->GetColumnFamilySet()) {
for (const auto& path : cfd->ioptions()->db_paths) {
db_paths.emplace(path.path);
}
}
mutex_.Unlock();

// Get database paths
for (const auto& path : immutable_db_options_.db_paths) {
db_paths.emplace(path.path);
}

std::string strip_filename;

for (const auto& zone : stat) {
std::vector<uint64_t> sst_in_zone;
uint64_t written_data = zone.write_position - zone.start_position;
// zone is full
if (written_data == zone.total_capacity) {
uint64_t total_size = 0;
bool ignore_zone = false;
for (const auto& file : zone.files) {
strip_filename.clear();

for (const auto& path : db_paths) {
if (Slice(file.filename).starts_with(path)) {
strip_filename.assign(file.filename, path.length(),
file.filename.length() - path.length());
break;
}
}

if (strip_filename.empty()) {
// This file is not in DB folder.
ignore_zone = true;
break;
}

if (ParseFileName(strip_filename, &number, Slice(), &type)) {
// Is SST file, and is of current TerarkDB instance.
if (type == kTableFile) {
total_size += file.size_in_zone;
sst_in_zone.push_back(number);
} else {
// This zone contains file other than SSTs or files from other
// databases. We ignore the zone for now. When other files (like
// logs) have been deleted, we will come back and recycle this zone.
ignore_zone = true;
break;
}
} else {
// This file is not recognized by TerarkDB (or RocksDB). Even if we
// move the file, the zone may not be reset. Therefore, we simply
// ignore this zone.
ignore_zone = true;
break;
}
}

if (ignore_zone) {
continue;
}

// if data in zone <= (1 - ratio) * total_capacity, recycle the zone
if (total_size <=
(1.0 - initial_db_options_.zenfs_gc_ratio) * written_data) {
for (auto&& file_id : sst_in_zone) {
mark_for_gc.insert(file_id);
}
}
}
}

mutex_.Lock();
for (auto cfd : *versions_->GetColumnFamilySet()) {
uint64_t new_mark_count = 0;
uint64_t old_mark_count = 0;
uint64_t total_count = 0;
if (!cfd->initialized() || cfd->IsDropped()) {
continue;
}
VersionStorageInfo* vstorage = cfd->current()->storage_info();
// Level -1 contains SSTs inside lazy compaction SST index.
// By iterating level -1, we could collect that kind of garbage.
// But we still recommend using ZNS GC without lazy compaction
// enabled.
for (int l = -1; l < vstorage->num_non_empty_levels(); l++) {
for (auto meta : vstorage->LevelFiles(l)) {
if (meta->being_compacted) {
continue;
}
++total_count;
old_mark_count += meta->marked_for_compaction;
TEST_SYNC_POINT("DBImpl:Exist-SST");
if (!meta->marked_for_compaction &&
mark_for_gc.count(meta->fd.GetNumber()) > 0) {
meta->marked_for_compaction = true;
}
if (meta->marked_for_compaction) {
new_mark_count++;
TEST_SYNC_POINT("DBImpl:ScheduleZNSGC-mark");
}
}
}
if (new_mark_count > old_mark_count) {
vstorage->ComputeCompactionScore(*cfd->ioptions(),
*cfd->GetLatestMutableCFOptions());
if (!cfd->queued_for_compaction()) {
AddToCompactionQueue(cfd);
unscheduled_compactions_++;
}
}
if (old_mark_count != 0 && new_mark_count != 0) {
ROCKS_LOG_BUFFER(&log_buffer_info,
"[%s] ZNS GC: SSTs total marked = %" PRIu64
", new marked = %" PRIu64 ", file count: %" PRIu64,
cfd->GetName().c_str(), old_mark_count, new_mark_count,
total_count);
}
}
if (unscheduled_compactions_ > 0) {
MaybeScheduleFlushOrCompaction();
}
mutex_.Unlock();
log_buffer_info.FlushBufferToLog();
log_buffer_debug.FlushBufferToLog();
}
#endif

void DBImpl::DumpStats() {
TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
Expand Down
15 changes: 10 additions & 5 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,11 @@ class DBImpl : public DB {
// Implemented in db_impl_debug.cc

// Compact any files in the named level that overlap [*begin, *end]
Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
ColumnFamilyHandle* column_family = nullptr,
SeparationType separation_type =
kCompactionTransToSeparate,
bool disallow_trivial_move = false);
Status TEST_CompactRange(
int level, const Slice* begin, const Slice* end,
ColumnFamilyHandle* column_family = nullptr,
SeparationType separation_type = kCompactionTransToSeparate,
bool disallow_trivial_move = false);

void TEST_SwitchWAL();

Expand Down Expand Up @@ -806,6 +806,11 @@ class DBImpl : public DB {

void ScheduleTtlGC();

#ifdef LIBZBD
// schedule GC by polling ZNS zone status
void ScheduleZNSGC();
#endif

protected:
Env* const env_;
const std::string dbname_;
Expand Down
16 changes: 16 additions & 0 deletions db/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,22 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
to_delete;
}

// TODO: Workaround for ZNS and Windows.
// In TerarkDB, when a WAL is not needed, it is first deleted, and then
// closed. This means that the underlying FS must support deferred delete.
// In this case, we delete the writer before issuing delete to FS.
if (type == kLogFile) {
auto it =
std::find_if(state.logs_to_free.begin(), state.logs_to_free.end(),
[number](log::Writer* writer) {
return writer->get_log_number() == number;
});
if (it != state.logs_to_free.end()) {
delete *it;
*it = nullptr;
}
}

#ifndef ROCKSDB_LITE
if (type == kLogFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
immutable_db_options_.wal_size_limit_mb > 0)) {
Expand Down
10 changes: 10 additions & 0 deletions db/periodic_work_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ void PeriodicWorkScheduler::Register(DBImpl* dbi,
initial_delay.fetch_add(1) % kDefaultScheduleGCTTLPeriodSec *
kMicrosInSecond,
kDefaultScheduleGCTTLPeriodSec * kMicrosInSecond);
#ifdef LIBZBD
timer->Add([dbi]() { dbi->ScheduleZNSGC(); },
GetTaskName(dbi, "schedule_gc_zns"),
initial_delay.fetch_add(1) % kDefaultScheduleZNSTTLPeriodSec *
kMicrosInSecond,
kDefaultScheduleZNSTTLPeriodSec * kMicrosInSecond);
#endif
}

void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {
Expand All @@ -53,6 +60,9 @@ void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {
timer->Cancel(GetTaskName(dbi, "pst_st"));
timer->Cancel(GetTaskName(dbi, "flush_info_log"));
timer->Cancel(GetTaskName(dbi, "schedule_gc_ttl"));
#ifdef LIBZBD
timer->Cancel(GetTaskName(dbi, "schedule_gc_zns"));
#endif
if (!timer->HasPendingTask()) {
timer->Shutdown();
}
Expand Down
1 change: 1 addition & 0 deletions db/periodic_work_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class PeriodicWorkScheduler {
// log.
static const uint64_t kDefaultFlushInfoLogPeriodSec = 10;
static const uint64_t kDefaultScheduleGCTTLPeriodSec = 10;
static const uint64_t kDefaultScheduleZNSTTLPeriodSec = 10;

protected:
std::unique_ptr<Timer> timer;
Expand Down
31 changes: 23 additions & 8 deletions env/env_zenfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#ifdef LIBZBD
#include "third-party/zenfs/fs/fs_zenfs.h"
#include "third-party/zenfs/fs/zbd_stat.h"
#include "third-party/zenfs/fs/zbd_zenfs.h"

namespace TERARKDB_NAMESPACE {
Expand Down Expand Up @@ -166,9 +167,7 @@ class ZenfsDirectory : public Directory {
explicit ZenfsDirectory(std::unique_ptr<FSDirectory>&& target)
: target_(std::move(target)) {}

Status Fsync() override {
return target_->Fsync(IOOptions(), nullptr);
}
Status Fsync() override { return target_->Fsync(IOOptions(), nullptr); }
size_t GetUniqueId(char* id, size_t max_size) const override {
return target_->GetUniqueId(id, max_size);
}
Expand Down Expand Up @@ -469,14 +468,20 @@ class ZenfsEnv : public EnvWrapper {
target_->SanitizeEnvOptions(env_opts);
}

Status GetZbdDiskSpaceInfo(uint64_t &total_size, uint64_t &avail_size, uint64_t &used_size) {
Status GetZbdDiskSpaceInfo(uint64_t& total_size, uint64_t& avail_size,
uint64_t& used_size) {
auto zbd = dynamic_cast<ZenFS*>(fs_)->GetZonedBlockDevice();
used_size = zbd->GetUsedSpace() + zbd->GetReclaimableSpace();
avail_size = zbd->GetFreeSpace();
total_size = used_size + avail_size;
return Status::OK();
}

std::vector<ZoneStat> GetStat() {
auto zen_fs = dynamic_cast<ZenFS*>(fs_);
return zen_fs->GetStat();
}

private:
Env* target_;
FileSystem* fs_;
Expand All @@ -490,8 +495,16 @@ Status NewZenfsEnv(Env** zenfs_env, const std::string& zdb_path) {
return s;
}

Status GetZbdDiskSpaceInfo(Env* env, uint64_t &total_size, uint64_t &avail_size, uint64_t &used_size) {
return dynamic_cast<ZenfsEnv*>(env)->GetZbdDiskSpaceInfo(total_size, avail_size, used_size);
Status GetZbdDiskSpaceInfo(Env* env, uint64_t& total_size, uint64_t& avail_size,
uint64_t& used_size) {
return dynamic_cast<ZenfsEnv*>(env)->GetZbdDiskSpaceInfo(
total_size, avail_size, used_size);
}

std::vector<ZoneStat> GetStat(Env* env) {
auto zen_env = dynamic_cast<ZenfsEnv*>(env);
if (!zen_env) return {};
return zen_env->GetStat();
}

} // namespace TERARKDB_NAMESPACE
Expand All @@ -505,11 +518,13 @@ Status NewZenfsEnv(Env** zenfs_env, const std::string& zdb_path) {
return Status::NotSupported("ZenFSEnv is not implemented.");
}

Status GetZbdDiskSpaceInfo(Env* env, uint64_t &total_size, uint64_t &avail_size, uint64_t &used_size) {
Status GetZbdDiskSpaceInfo(Env* env, uint64_t& total_size, uint64_t& avail_size,
uint64_t& used_size) {
return Status::NotSupported("GetZbdDiskSpaceInfo is not implemented.");
}

std::vector<ZoneStat> GetStat(Env* env) { return {}; }

} // namespace TERARKDB_NAMESPACE

#endif

Loading