Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 159 additions & 17 deletions lib/mu-indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include <unordered_map>
#include <unordered_set>
#include <chrono>
#include <string_view>
#include <ranges>
using namespace std::chrono_literals;

#include "mu-store.hh"
Expand All @@ -45,7 +47,8 @@ struct IndexState {
enum State { Idle,
Scanning,
Finishing,
Cleaning
Cleaning,
Aborting,
};
static const char* name(State s) {
switch (s) {
Expand All @@ -57,6 +60,8 @@ struct IndexState {
return "finishing";
case Cleaning:
return "cleaning";
case Aborting:
return "aborting";
default:
return "<error>";
}
Expand Down Expand Up @@ -109,7 +114,8 @@ struct Indexer::Private {

bool add_message(const std::string& path);

bool cleanup();
void cleanup_from_scratch();
void cleanup_incremental();
bool start(const Indexer::Config& conf, bool block);
bool stop();

Expand Down Expand Up @@ -137,10 +143,16 @@ struct Indexer::Private {
Progress progress_{};
IndexState state_{};
std::mutex lock_, w_lock_;
Option<time_t> started_;
std::atomic<time_t> completed_{};
bool was_empty_{};

uint64_t last_index_{};

// pathnames we've seen traversing the maildir hierarchy. entries
// with a trailing slash are maildir directories we've skipped as
// up-to-date.
std::vector<std::string> seen_maildir_paths_;
};

bool
Expand Down Expand Up @@ -168,6 +180,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
htype == Scanner::HandleType::EnterNewCur) {
mu_debug("skip {} (seems up-to-date: {:%FT%T} >= {:%FT%T})",
fullpath, mu_time(dirstamp_), mu_time(statbuf->st_ctime));
seen_maildir_paths_.emplace_back(fullpath + "/");
return false;
}

Expand All @@ -184,6 +197,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
auto noupdate = ::access((fullpath + "/.noupdate").c_str(), F_OK) == 0;
if (noupdate) {
mu_debug("skip {} (has .noupdate)", fullpath);
seen_maildir_paths_.emplace_back(fullpath + "/");
return false;
}
}
Expand All @@ -198,6 +212,8 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,

case Scanner::HandleType::File: {
++progress_.checked;
seen_maildir_paths_.push_back(fullpath);

if (conf_.lazy_check && static_cast<uint64_t>(statbuf->st_ctime) < last_index_) {
// in lazy mode, ignore the file if it has not changed
// since the last indexing op.
Expand Down Expand Up @@ -261,7 +277,7 @@ Indexer::Private::handle_item(WorkItem&& item)
++progress_.updated;
} break;
case WorkItem::Type::Dir:
store_.set_dirstamp(item.full_path, ::time(NULL));
store_.set_dirstamp(item.full_path, started_.value());
break;
default:
g_warn_if_reached();
Expand All @@ -272,12 +288,11 @@ Indexer::Private::handle_item(WorkItem&& item)
}
}

bool
Indexer::Private::cleanup()
void
Indexer::Private::cleanup_from_scratch()
{
mu_debug("starting cleanup");
mu_debug("starting cleanup without using scan results");

size_t n{};
std::vector<Store::Id> orphans; // store messages without files.

using DirFiles = std::unordered_set<std::string>;
Expand Down Expand Up @@ -309,7 +324,6 @@ Indexer::Private::cleanup()
};

store_.for_each_message_path([&](Store::Id id, const std::string& path) {
++n;
if (!is_file_present(path)) {
mu_debug("cannot read {} (id={}); queuing for removal from store",
path, id);
Expand All @@ -326,14 +340,100 @@ Indexer::Private::cleanup()
store_.remove_messages(orphans);
progress_.removed += orphans.size();
}
}

void
Indexer::Private::cleanup_incremental()
{
mu_debug("starting cleanup after scan");

// Sort the seen paths into the same order Xapian will give its terms to us
std::vector<std::string> fs_terms;
fs_terms.reserve(seen_maildir_paths_.size());
for (std::string_view fullpath : seen_maildir_paths_)
fs_terms.emplace_back(field_from_id(Field::Id::Path).xapian_term(fullpath));
std::ranges::sort(fs_terms);

// Discard duplicates from fs_terms in case two paths collided to one term, e.g. over
// case. That shouldn't happen, but be correct-ish if it does.
auto [new_end, old_end] = std::ranges::unique(fs_terms);
if (new_end != old_end) {
mu_warning("collisions under term normalization: using regular cleanup");
cleanup_from_scratch();
return;
}

return true;
fs_terms.erase(new_end, old_end);
seen_maildir_paths_.clear();

// Walk through all the path terms. If the DB has a path term that we didn't see in our
// filesystem walk above, add it the orphans list for removal from the DB. If we were in
// lazy scan mode, we may have skipped some directories entirely: these are represented by
// entries in fs_terms with a trailing slash. When we see one, we deem all DB entries
// that have the skip entry as a prefix as present.

size_t fs_terms_pos = 0;
auto current_fs_term = [&]() -> std::string_view {
if (fs_terms_pos < fs_terms.size())
return fs_terms[fs_terms_pos];
// N.B. '~' compares greater than the start of any field shortcut, so use it as an
// after-the-end sentinel.
return "~";
};

std::vector<std::string> orphan_terms;

auto handle_db_term = [&](std::string_view db_term) {
for (;;) {
std::string_view fs_term = current_fs_term();
bool is_wildcard = fs_term.ends_with('/');
if (is_wildcard && db_term.starts_with(fs_term)) {
return true;
}
int cmp = db_term.compare(fs_term);
if (cmp < 0) {
mu_debug("orphan in db={} but not fs={}", db_term, fs_term);
orphan_terms.emplace_back(db_term);
return true;
}

if (cmp == 0) {
++fs_terms_pos;
return true;
}

++fs_terms_pos;
// FS has an entry not in the DB. If not a directory, we should have
// indexed it.
if (!is_wildcard)
mu_warning("unexpectedly unindexed message: {}", fs_term);
}
};

store_.for_each_term(Field::Id::Path, handle_db_term);

if (orphan_terms.empty())
mu_debug("nothing to clean up");
else {
size_t nr_removed = store_.remove_messages_by_term(
orphan_terms, [&](size_t nr_cleaned_up_so_far) {
progress_.removed = nr_cleaned_up_so_far;
});
mu_debug("removing {} stale message(s) from store", nr_removed);
progress_.removed = nr_removed;
if (nr_removed != orphan_terms.size())
mu_warning("should have removed {} messages but actually removed {}",
orphan_terms.size(), nr_removed);
}
}

void
Indexer::Private::scan_worker()
{
progress_.reset();
seen_maildir_paths_.clear();
started_ = time(NULL);

if (conf_.scan) {
mu_debug("starting scanner");
if (!scanner_.start()) { // blocks.
Expand All @@ -344,21 +444,61 @@ Indexer::Private::scan_worker()
mu_debug("scanner finished");
}

// and let the worker finish their work.
state_.change_to(IndexState::Finishing);
enum class CleanupKind {
None,
FromScratch,
Incremental,
};

CleanupKind cleanup_kind = CleanupKind::Incremental;
bool aborted = state_ == IndexState::Aborting;

if (cleanup_kind >= CleanupKind::None && !conf_.cleanup) {
mu_debug("cleanup: not running as requested");
cleanup_kind = CleanupKind::None;
}

if (cleanup_kind > CleanupKind::None && aborted) {
mu_debug("cleanup: disabling because indexer aborted");
cleanup_kind = CleanupKind::None;
}

if (cleanup_kind >= CleanupKind::Incremental &&
g_getenv("MU_NO_INCREMENTAL_CLEANUP")) {
mu_debug("cleanup: not using incremental: MU_NO_INCREMENTAL_CLEANUP in environ");
cleanup_kind = CleanupKind::FromScratch;
}

if (cleanup_kind >= CleanupKind::Incremental && !conf_.scan) {
mu_debug("cleanup: not using incremental: scan not done");
cleanup_kind = CleanupKind::FromScratch;
}

state_.change_to(IndexState::Cleaning);

switch (cleanup_kind) {
case CleanupKind::None:
break;
case CleanupKind::FromScratch:
cleanup_from_scratch();
break;
case CleanupKind::Incremental:
cleanup_incremental();
break;
}

if (conf_.cleanup) {
mu_debug("starting cleanup");
state_.change_to(IndexState::Cleaning);
cleanup();
mu_debug("cleanup finished");
aborted = state_ == IndexState::Aborting;
if (!aborted) {
// Store started time, not ending time, so that next time we run we know to scan
// anything that appeared during our scan.
store_.config().set<Mu::Config::Id::LastIndex>(started_.value());
}

completed_ = ::time({});
// attempt to commit to disk.
store_.xapian_db().request_commit(true);
store_.config().set<Mu::Config::Id::LastIndex>(completed_);
state_.change_to(IndexState::Idle);
started_ = Nothing;
}

bool
Expand Down Expand Up @@ -399,6 +539,8 @@ Indexer::Private::start(const Indexer::Config& conf, bool block)
bool
Indexer::Private::stop()
{
if (state_ != IndexState::Idle)
state_.change_to(IndexState::Aborting);
scanner_.stop();

if (scanner_worker_.joinable())
Expand Down
5 changes: 5 additions & 0 deletions lib/mu-labels-cache.hh
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public:
*/
Result<void> restore(const Store& store);

/**
* @return whether the labels map is non-empty
*/
bool empty() const { return label_map_.empty(); }

private:
/**
* Deserialize the cache into a Map
Expand Down
69 changes: 61 additions & 8 deletions lib/mu-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <chrono>
#include <mutex>
#include <algorithm>
#include <array>
#include <cstdlib>
#include <stdexcept>
Expand Down Expand Up @@ -130,6 +131,9 @@ struct Store::Private {
Option<const std::string&> target_mdir,
Option<Flags> new_flags,
MoveOptions opts);

bool remove_message_by_id_unlocked(Store::Id id);

XapianDb xapian_db_;
Config config_;
ContactsCache contacts_cache_;
Expand Down Expand Up @@ -430,17 +434,66 @@ Store::remove_messages(const std::vector<Store::Id>& ids)

xapian_db().request_transaction();

for (auto&& id : ids) {
if (const auto msg = priv_->find_message_unlocked(id); !msg) {
mu_warning("cannot find document {} for deletion", id);
} else {
for (auto&& label: msg->labels())
priv_->labels_cache_.decrease(label);
xapian_db().delete_document(id);
}
for (auto&& id : ids)
priv_->remove_message_by_id_unlocked(id);

xapian_db().request_commit(true/*force*/);
}

bool
Store::Private::remove_message_by_id_unlocked(Store::Id id)
{
if (auto msg = labels_cache_.empty() ? Nothing : find_message_unlocked(id); msg)
for (auto&& label: msg->labels())
labels_cache_.decrease(label);

if (!xapian_db_.delete_document(id)) {
mu_warning("cannot find document {} for deletion", id);
return false;
}

return true;
}

size_t
Store::remove_messages_by_term(std::span<const std::string> terms,
std::function<void (size_t)> progress_fn)
{
std::unique_lock lock{priv_->lock_};
size_t nr_removed = 0;
std::vector<Xapian::Query> qvec;
std::vector<Store::Id> ids_to_remove;

xapian_db().request_transaction();

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This loop could use a comment, i.e., a few lines on what we're doing here.

while (!terms.empty()) {
auto chunk = terms.subspan(0, std::min<size_t>(terms.size(), 1024));
terms = terms.subspan(chunk.size());
qvec.clear();
qvec.reserve(chunk.size());
std::ranges::copy(chunk, std::back_inserter(qvec));
auto enq = xapian_db().enquire();
enq.set_weighting_scheme(Xapian::BoolWeight()); // No score
enq.set_docid_order(Xapian::Enquire::ASCENDING);
enq.set_query(Xapian::Query{Xapian::Query::OP_OR, qvec.begin(), qvec.end()});
auto mset = enq.get_mset(0, xapian_db().size());
// Work around Xapian MSetIterator trait mismatch:
// https://trac.xapian.org/ticket/850
ids_to_remove.reserve(ids_to_remove.size() + mset.size());
for (auto it = mset.begin(); it != mset.end(); ++it)
ids_to_remove.push_back(*it);
}

// Sort the IDs to remove to make Xapian tree traversal easier
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To what extent are we depending on Xapian implementation details here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Xapian contractually iterates over terms in ascending byte-lexicographic order, so it's only natural to suppose that its storage engines in general will get the best locality accessing terms in this order. We're allowed to delete in any order we want, so if we have to choose an order, this one seems reasonable. There's no functional dependency on Xapian internals --- and sadly, Xapian has no public bulk delete API, AFAIK

https://xapian.org/docs/apidoc/html/classXapian_1_1Database.html#abf8de9d7fe351a347e7fa9af605a71bb

std::ranges::sort(ids_to_remove);
for (Id id : ids_to_remove) {
nr_removed += priv_->remove_message_by_id_unlocked(id);
if (nr_removed % 500 == 0)
progress_fn(nr_removed);
}

xapian_db().request_commit(true/*force*/);
return nr_removed;
}

Option<Message>
Expand Down
Loading
Loading