Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 10 additions & 32 deletions src/Disks/ObjectStorages/VFS/JSONSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Common/Exception.h>

#include <Disks/ObjectStorages/VFS/JSONUtils.h>
#include <Disks/ObjectStorages/VFS/VFSLog.h>
#include <IO/ReadHelpers.h>
#include <Poco/JSON/JSON.h>
Expand Down Expand Up @@ -29,15 +30,15 @@ class JSONSerializer
Poco::JSON::Object object;

serializeToJSON(event, object);
String str = to_string(object);
String str = JSONUtils::to_string(object);

return {str.begin(), str.end()};
}

static VFSEvent deserialize(std::span<char> buffer)
{
String str(buffer.data(), buffer.size());
auto object = from_string(std::move(str));
auto object = JSONUtils::from_string(std::move(str));

if (!object)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Cannot parse VFS log item buffer as JSON");
Expand All @@ -49,29 +50,6 @@ class JSONSerializer
}

private:
template <typename Value>
static Value getOrThrow(const Poco::JSON::Object & object, const String & key)
{
if (!object.has(key))
throw Exception(ErrorCodes::CORRUPTED_DATA, "Key {} is not found in VFS log item", key);
return object.getValue<Value>(key);
}

static String to_string(const Poco::JSON::Object & object)
{
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(object, oss);

return oss.str();
}

static Poco::JSON::Object::Ptr from_string(const String & json_str)
{
Poco::JSON::Parser parser;
return parser.parse(json_str).extract<Poco::JSON::Object::Ptr>();
}

static void serializeToJSON(const VFSEvent & event, Poco::JSON::Object & object)
{
object.set("remote_path", event.remote_path);
Expand All @@ -97,20 +75,20 @@ class JSONSerializer

static void deserializeFromJSON(VFSEvent & event, const Poco::JSON::Object & json)
{
event.remote_path = getOrThrow<String>(json, "remote_path");
event.local_path = getOrThrow<String>(json, "local_path");
event.action = static_cast<VFSAction>(getOrThrow<std::underlying_type_t<VFSAction>>(json, "action"));
event.timestamp = getOrThrow<UInt64>(json, "timestamp");
event.remote_path = JSONUtils::getOrThrow<String>(json, "remote_path");
event.local_path = JSONUtils::getOrThrow<String>(json, "local_path");
event.action = static_cast<VFSAction>(JSONUtils::getOrThrow<std::underlying_type_t<VFSAction>>(json, "action"));
event.timestamp = JSONUtils::getOrThrow<UInt64>(json, "timestamp");

if (auto orig_wal = json.getObject("orig_wal"); orig_wal)
deserializeFromJSON(event.orig_wal.emplace(), *orig_wal);
}

static void deserializeFromJSON(WALInfo & orig_wal, const Poco::JSON::Object & json)
{
orig_wal.id = parseFromString<UUID>(getOrThrow<String>(json, "id"));
orig_wal.index = getOrThrow<UInt64>(json, "index");
orig_wal.replica = getOrThrow<String>(json, "replica");
orig_wal.id = parseFromString<UUID>(JSONUtils::getOrThrow<String>(json, "id"));
orig_wal.index = JSONUtils::getOrThrow<UInt64>(json, "index");
orig_wal.replica = JSONUtils::getOrThrow<String>(json, "replica");
}
};

Expand Down
42 changes: 42 additions & 0 deletions src/Disks/ObjectStorages/VFS/JSONUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include <Common/Exception.h>

#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/Stringifier.h>


namespace DB
{
namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
}
/// TODO: Do refactoring: we can derive from POCO::Json::Object
/// and we should replace Poco::JSON::Object to Poco::JSON::Object::Ptr
struct JSONUtils
{
template <typename Value>
static Value getOrThrow(const Poco::JSON::Object & object, const String & key)
{
if (!object.has(key))
throw Exception(ErrorCodes::CORRUPTED_DATA, "Key {} is not found in JSON object", key);
return object.getValue<Value>(key);
}

static String to_string(const Poco::JSON::Object & object)
{
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(object, oss);

return oss.str();
}

static Poco::JSON::Object::Ptr from_string(const String & json_str)
{
Poco::JSON::Parser parser;
return parser.parse(json_str).extract<Poco::JSON::Object::Ptr>();
}
};
}
6 changes: 3 additions & 3 deletions src/Disks/ObjectStorages/VFS/VFSGarbageCollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ void VFSGarbageCollector::updateSnapshot()
}
LOG_DEBUG(log, "Merge snapshot with {} entries from wal.", wal_items_batch.size());

auto new_snaphot_meta = vfs_shapshot_data.mergeWithWals(std::move(wal_items_batch), snapshot_meta);
auto new_snapshot_meta = vfs_shapshot_data.mergeWithWals(std::move(wal_items_batch), snapshot_meta);

updateShapshotMetadata(new_snaphot_meta, snapshot_meta.znode_version);
updateShapshotMetadata(new_snapshot_meta, snapshot_meta.znode_version);
wal.dropUpTo(wal_items_batch.back().wal.index + 1);

LOG_DEBUG(log, "Snapshot update finished with new shapshot key {}", new_snaphot_meta.object_storage_key);
LOG_DEBUG(log, "Snapshot update finished with new shapshot key {}", new_snapshot_meta.object_storage_key);
}
}
4 changes: 4 additions & 0 deletions src/Disks/ObjectStorages/VFS/VFSLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ struct VFSLogItem
VFSEvent event;
WALInfo wal;

UUID getDestinationWalId() const { return ((event.action == VFSAction::REQUEST) ? event.orig_wal->id : wal.id); }
UInt64 getDestinationWalIndex() const { return ((event.action == VFSAction::REQUEST) ? event.orig_wal->index : wal.index); }
String getDestinationReplicaName() const { return ((event.action == VFSAction::REQUEST) ? event.orig_wal->replica : wal.replica); }

bool operator==(const VFSLogItem &) const = default;
};

Expand Down
78 changes: 54 additions & 24 deletions src/Disks/ObjectStorages/VFS/VFSShapshotMetadata.h
Original file line number Diff line number Diff line change
@@ -1,73 +1,103 @@
#pragma once

#include <Disks/ObjectStorages/VFS/JSONUtils.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>

#include <fmt/chrono.h>

#include <unordered_map>

namespace DB
{

using ProcessedLogsIndicesMap = std::unordered_map<UUID, UInt64>;

struct SnapshotMetadata
{
uint64_t metadata_version;
UInt64 metadata_version;
String object_storage_key;
uint64_t total_size;
int32_t znode_version;
bool is_initial_snaphot;
UInt64 total_size;
Int32 znode_version;
bool is_initial_snapshot;
ProcessedLogsIndicesMap processed_logs_indices;

SnapshotMetadata(
uint64_t metadata_version_ = 0ull,
UInt64 metadata_version_ = 0ull,
const String & object_storage_key_ = "",
uint64_t total_size_ = 0ull,
int32_t znode_version_ = -1,
UInt64 total_size_ = 0ull,
Int32 znode_version_ = -1,
bool is_initial_ = false)
: metadata_version(metadata_version_)
, object_storage_key(object_storage_key_)
, total_size(total_size_)
, znode_version(znode_version_)
, is_initial_snaphot(is_initial_)
, is_initial_snapshot(is_initial_)
{
}

SnapshotMetadata(
const String & object_storage_key_,
uint64_t metadata_version_ = 0ull,
uint64_t total_size_ = 0ull,
int32_t znode_version_ = -1,
UInt64 metadata_version_ = 0ull,
UInt64 total_size_ = 0ull,
Int32 znode_version_ = -1,
bool is_initial_ = false)
: metadata_version(metadata_version_)
, object_storage_key(object_storage_key_)
, total_size(total_size_)
, znode_version(znode_version_)
, is_initial_snaphot(is_initial_)
, is_initial_snapshot(is_initial_)
{
}

void update(const String & new_snapshot_key) { object_storage_key = new_snapshot_key; }

String serialize() const { return fmt::format("{} {} {} ", metadata_version, object_storage_key, total_size); }
String serialize() const
{
Poco::JSON::Object::Ptr root(new Poco::JSON::Object());

root->set("metadata_version", metadata_version);
root->set("object_storage_key", object_storage_key);
root->set("total_size", total_size);
root->set("znode_version", znode_version);

Poco::JSON::Object::Ptr logs_indices_object(new Poco::JSON::Object());
root->set("processed_logs_indices", logs_indices_object);

static SnapshotMetadata deserialize(const String & str, int32_t znode_version)
for (const auto & [wal_id, index] : processed_logs_indices)
{
logs_indices_object->set(toString(wal_id), index);
}
return JSONUtils::to_string(*root);
}

static SnapshotMetadata deserialize(const String & str, Int32 znode_version)
{
SnapshotMetadata result;
result.znode_version = znode_version;
/// In case of initial snaphot, the content will be empty.

/// In case of initial snapshot, the content will be empty.
if (str.empty())
{
result.is_initial_snaphot = true;
result.is_initial_snapshot = true;
return result;
}
ReadBufferFromString rb(str);
auto json_object = JSONUtils::from_string(str);

readIntTextUnsafe(result.metadata_version, rb);
checkChar(' ', rb);
readStringUntilWhitespace(result.object_storage_key, rb);
checkChar(' ', rb);
readIntTextUnsafe(result.total_size, rb);
result.is_initial_snaphot = false;
result.is_initial_snapshot = false;
result.metadata_version = JSONUtils::getOrThrow<UInt64>(*json_object, "metadata_version");
result.object_storage_key = JSONUtils::getOrThrow<String>(*json_object, "object_storage_key");
result.total_size = JSONUtils::getOrThrow<UInt64>(*json_object, "total_size");
auto logs_indicies_object = json_object->getObject("processed_logs_indices");
if (!logs_indicies_object)
{
throw Exception(ErrorCodes::CORRUPTED_DATA, "Key processed_log_indices is not found in JSON with metadata for snapshot");
}
for (auto it = logs_indicies_object->begin(); it != logs_indicies_object->end(); ++it)
{
result.processed_logs_indices.insert({parse<UUID>(it->first), it->second});
}
return result;
}
};
Expand Down
Loading