Skip to content
43 changes: 7 additions & 36 deletions include/neug/storages/csr/immutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <stddef.h>
#include <atomic>
#include <limits>
#include <set>
#include <string>
Expand Down Expand Up @@ -58,27 +59,7 @@ class ImmutableCsr : public TypedCsrBase<EDATA_T> {
: 0;
}

size_t edge_num() const override {
size_t ret = 0;
if (!degree_list_buffer_ || !adj_list_buffer_) {
return 0;
}
const nbr_t** adj_lists_ptr =
reinterpret_cast<const nbr_t**>(adj_list_buffer_->GetData());
const int* degree_list_ptr =
reinterpret_cast<const int*>(degree_list_buffer_->GetData());
auto v_cap = size();
for (size_t i = 0; i < v_cap; ++i) {
auto deg = degree_list_ptr[i];
const nbr_t* begin = adj_lists_ptr[i];
for (size_t j = 0; j < deg; ++j) {
if (begin[j].neighbor != std::numeric_limits<vid_t>::max()) {
ret++;
}
}
}
return ret;
}
size_t edge_num() const override { return edge_num_.load(); }

void open(const std::string& name, const std::string& snapshot_dir,
const std::string& work_dir) override;
Expand Down Expand Up @@ -139,6 +120,7 @@ class ImmutableCsr : public TypedCsrBase<EDATA_T> {
std::unique_ptr<IDataContainer> degree_list_buffer_;
std::unique_ptr<IDataContainer> nbr_list_buffer_;
timestamp_t unsorted_since_;
std::atomic<uint64_t> edge_num_{0};
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
};

template <typename EDATA_T>
Expand Down Expand Up @@ -172,21 +154,7 @@ class SingleImmutableCsr : public TypedCsrBase<EDATA_T> {
: 0;
}

size_t edge_num() const override {
size_t ret = 0;
if (!nbr_list_buffer_) {
return 0;
}
auto v_cap = nbr_list_buffer_->GetDataSize() / sizeof(nbr_t);
const auto* nbr_arr =
reinterpret_cast<const nbr_t*>(nbr_list_buffer_->GetData());
for (size_t i = 0; i < v_cap; ++i) {
if (nbr_arr[i].neighbor != std::numeric_limits<vid_t>::max()) {
++ret;
}
}
return ret;
}
size_t edge_num() const override { return edge_num_.load(); }

void open(const std::string& name, const std::string& snapshot_dir,
const std::string& work_dir) override;
Expand Down Expand Up @@ -236,7 +204,10 @@ class SingleImmutableCsr : public TypedCsrBase<EDATA_T> {
}

private:
void load_meta(const std::string& prefix);
void dump_meta(const std::string& prefix) const;
std::unique_ptr<IDataContainer> nbr_list_buffer_;
std::atomic<uint64_t> edge_num_{0};
};

} // namespace neug
36 changes: 9 additions & 27 deletions include/neug/storages/csr/mutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,7 @@ class MutableCsr : public TypedCsrBase<EDATA_T> {

size_t size() const override { return vertex_capacity(); }

size_t edge_num() const override {
size_t res = 0;
auto* adj_lists =
reinterpret_cast<const nbr_t* const*>(adj_list_buffer_->GetData());
auto* degrees =
reinterpret_cast<const std::atomic<int>*>(adj_list_size_->GetData());
for (size_t i = 0; i < vertex_capacity(); ++i) {
auto begin = adj_lists[i];
for (int j = 0; j < degrees[i].load(); ++j) {
if (begin[j].timestamp.load() !=
std::numeric_limits<timestamp_t>::max()) {
res++;
}
}
}
return res;
}
size_t edge_num() const override { return edge_num_.load(); }

void open(const std::string& name, const std::string& snapshot_dir,
const std::string& work_dir) override;
Expand Down Expand Up @@ -156,6 +140,7 @@ class MutableCsr : public TypedCsrBase<EDATA_T> {
nbr.neighbor = dst;
nbr.data = data;
nbr.timestamp.store(ts);
edge_num_.fetch_add(1);
locks_[src].unlock();
return prev_size;
}
Expand Down Expand Up @@ -208,6 +193,7 @@ class MutableCsr : public TypedCsrBase<EDATA_T> {
std::unique_ptr<IDataContainer> adj_list_capacity_;
std::unique_ptr<IDataContainer> nbr_list_;
timestamp_t unsorted_since_;
std::atomic<uint64_t> edge_num_{0};

size_t vertex_capacity() const {
if (!adj_list_size_) {
Expand Down Expand Up @@ -243,16 +229,7 @@ class SingleMutableCsr : public TypedCsrBase<EDATA_T> {

size_t size() const override { return vertex_capacity(); }

size_t edge_num() const override {
size_t cnt = 0;
auto* nbrs = reinterpret_cast<const nbr_t*>(nbr_list_->GetData());
for (size_t i = 0; i < vertex_capacity(); ++i) {
if (nbrs[i].timestamp.load() != std::numeric_limits<timestamp_t>::max()) {
cnt++;
}
}
return cnt;
}
size_t edge_num() const override { return edge_num_.load(); }
Comment thread
qodo-code-review[bot] marked this conversation as resolved.

void open(const std::string& name, const std::string& snapshot_dir,
const std::string& work_dir) override;
Expand Down Expand Up @@ -307,6 +284,7 @@ class SingleMutableCsr : public TypedCsrBase<EDATA_T> {
nbrs[src].data = data;
CHECK_EQ(nbrs[src].timestamp, std::numeric_limits<timestamp_t>::max());
nbrs[src].timestamp.store(ts);
edge_num_.fetch_add(1, std::memory_order_relaxed);
return 0;
}

Expand All @@ -317,7 +295,11 @@ class SingleMutableCsr : public TypedCsrBase<EDATA_T> {
}

private:
void load_meta(const std::string& prefix);
void dump_meta(const std::string& prefix) const;

std::unique_ptr<IDataContainer> nbr_list_;
std::atomic<uint64_t> edge_num_{0};

size_t vertex_capacity() const {
if (!nbr_list_) {
Expand Down
Loading
Loading