diff --git a/network-tests/test_subaccount_auth.py b/network-tests/test_subaccount_auth.py index ce0a0abe6..4daf5c09f 100644 --- a/network-tests/test_subaccount_auth.py +++ b/network-tests/test_subaccount_auth.py @@ -336,7 +336,7 @@ def test_revoke_subaccount(omq, random_sn, sk, exclude): f"revoked_subaccounts{ts}".encode(), encoder=Base64Encoder ).signature.decode(), } - ).encode(), + ).encode() ], ).get() assert len(r) == 1 @@ -408,7 +408,7 @@ def test_revoke_subaccount(omq, random_sn, sk, exclude): f"revoked_subaccounts{ts}".encode(), encoder=Base64Encoder ).signature.decode(), } - ).encode(), + ).encode() ], ).get() assert len(r) == 1 @@ -416,7 +416,6 @@ def test_revoke_subaccount(omq, random_sn, sk, exclude): assert len(r["revoked_subaccounts"]) == 1 assert r["revoked_subaccounts"][0] == b64(dude_token) - # But the one in the revoked-keys-allowed namespace should work: r = omq.request_future( conn, @@ -511,7 +510,7 @@ def test_revoke_subaccount(omq, random_sn, sk, exclude): f"revoked_subaccounts{ts}".encode(), encoder=Base64Encoder ).signature.decode(), } - ).encode(), + ).encode() ], ).get() assert len(r) == 1 diff --git a/oxenss/common/pubkey.cpp b/oxenss/common/pubkey.cpp index e433b2b5c..db1c2311d 100644 --- a/oxenss/common/pubkey.cpp +++ b/oxenss/common/pubkey.cpp @@ -1,11 +1,23 @@ #include "pubkey.h" #include "mainnet.h" +#include "oxenc/endian.h" #include #include #include namespace oxenss { +uint64_t pubkey_to_swarm_space(const user_pubkey& pk) { + const auto bytes = pk.raw(); + assert(bytes.size() == 32); + + uint64_t res = 0; + for (size_t i = 0; i < bytes.size(); i += 8) + res ^= oxenc::load_big_to_host(bytes.data() + i); + + return res; +} + user_pubkey& user_pubkey::load(std::string_view pk) { if (pk.size() == USER_PUBKEY_SIZE_HEX && oxenc::is_hex(pk)) { uint8_t netid; diff --git a/oxenss/common/pubkey.h b/oxenss/common/pubkey.h index af59b8b6e..e2b19eb98 100644 --- a/oxenss/common/pubkey.h +++ b/oxenss/common/pubkey.h @@ -1,5 +1,6 @@ #pragma once 
+#include #include namespace oxenss { @@ -13,14 +14,14 @@ class user_pubkey { int network_ = -1; std::string pubkey_; - user_pubkey(int network, std::string raw_pk) : network_{network}, pubkey_{std::move(raw_pk)} {} - friend class DatabaseImpl; public: // Default constructor; constructs an invalid pubkey user_pubkey() = default; + user_pubkey(int network, std::string raw_pk) : network_{network}, pubkey_{std::move(raw_pk)} {} + // bool conversion: returns true if this object contains a valid pubkey explicit operator bool() const { return !pubkey_.empty(); } @@ -57,6 +58,10 @@ class user_pubkey { std::string prefixed_raw() const; }; +/// Maps a pubkey into a 64-bit "swarm space" value; the swarm you belong to is whichever one +/// has a swarm id closest to this pubkey-derived value. +uint64_t pubkey_to_swarm_space(const user_pubkey& pk); + } // namespace oxenss namespace std { diff --git a/oxenss/crypto/keys.h b/oxenss/crypto/keys.h index bd0c073f0..d383eb8cd 100644 --- a/oxenss/crypto/keys.h +++ b/oxenss/crypto/keys.h @@ -3,7 +3,6 @@ #include #include #include -#include #include #include diff --git a/oxenss/daemon/command_line.cpp b/oxenss/daemon/command_line.cpp index 540008988..e43896eba 100644 --- a/oxenss/daemon/command_line.cpp +++ b/oxenss/daemon/command_line.cpp @@ -175,6 +175,11 @@ parse_result parse_cli_args(int argc, char* argv[]) { "--force-start", options.force_start, "Ignore the initialisation ready check (primarily for debugging)."); + cli.add_flag( + "--skip-bootstrap-nodes", + options.skip_bootstrap, + "Skip the contacting of bootstrap seed nodes on startup (primarily for private node " + "networks)"); cli.add_option( "--stats-access-key", options.stats_access_keys, diff --git a/oxenss/daemon/command_line.h b/oxenss/daemon/command_line.h index 4dd64b089..2f3285833 100644 --- a/oxenss/daemon/command_line.h +++ b/oxenss/daemon/command_line.h @@ -12,6 +12,7 @@ struct command_line_options { uint16_t https_port = 22021; uint16_t omq_quic_port = 22020; 
std::string oxend_omq_rpc; // Defaults to ipc://$HOME/.oxen/[testnet/]oxend.sock + bool skip_bootstrap = false; bool force_start = false; bool testnet = false; std::string log_level = "info"; diff --git a/oxenss/daemon/oxen-storage.cpp b/oxenss/daemon/oxen-storage.cpp index 6318e1fe5..370880194 100644 --- a/oxenss/daemon/oxen-storage.cpp +++ b/oxenss/daemon/oxen-storage.cpp @@ -153,7 +153,12 @@ int main(int argc, char* argv[]) { auto& oxenmq_server = *oxenmq_server_ptr; snode::ServiceNode service_node{ - l_keys, me, oxenmq_server, options.data_dir, options.force_start}; + l_keys, + me, + oxenmq_server, + options.data_dir, + options.force_start, + options.skip_bootstrap}; rpc::RequestHandler request_handler{service_node, channel_encryption, ed_keys.sec}; diff --git a/oxenss/rpc/client_rpc_endpoints.h b/oxenss/rpc/client_rpc_endpoints.h index cc739c4fb..b5fd87936 100644 --- a/oxenss/rpc/client_rpc_endpoints.h +++ b/oxenss/rpc/client_rpc_endpoints.h @@ -541,9 +541,9 @@ struct delete_before final : recursive { }; /// Updates (shortens) the expiry of all stored messages, and broadcasts the update request to all -/// other swarm members. Note that this will not extend existing expiries, it will only shorten the -/// expiry of any messages that have expiries after the requested value. (To extend expiries of one -/// or more individual messages use the `expire` endpoint). +/// other swarm members. Note that this will not extend existing expiries, it will only shorten the +/// expiry of any messages that have expiries after the requested value. (To extend expiries of one +/// or more individual messages use the `expire_msgs` endpoint). /// /// Takes parameters of: /// - pubkey -- the pubkey whose messages shall have their expiries reduced, in hex (66) or bytes @@ -625,8 +625,8 @@ struct expire_all final : recursive { /// ("expire" || ShortenOrExtend || expiry || messages[0] || ... 
|| messages[N]) /// where `expiry` is the expiry timestamp expressed as a string, for a single expiry, or the /// expiries concatenated together (expiry[0] || expiry[1] || ...) for multiple expiries. -/// `ShortenOrExtend` is string "shorten" if the shorten option is given (and true), "extend" if -/// `extend` is true, and empty otherwise. The signature must be base64 encoded (json) or bytes +/// `ShortenOrExtend` is the string "shorten" if the shorten option is given (and true), "extend" +/// if `extend` is true, and empty otherwise. The signature must be base64 encoded (json) or bytes /// (bt). /// /// Returns dict of: diff --git a/oxenss/rpc/request_handler.cpp b/oxenss/rpc/request_handler.cpp index fdd0d8af6..a83ad5ffa 100644 --- a/oxenss/rpc/request_handler.cpp +++ b/oxenss/rpc/request_handler.cpp @@ -403,13 +403,17 @@ struct swarm_response { bool b64; nlohmann::json result; std::function cb; + std::string cmd; + std::string req_payload; + std::chrono::system_clock::time_point expiry; + int64_t db_req_id{0}; }; // Replies to a swarm request via its callback; sends an http::OK unless all of the // swarm entries returned things with "failed" in them or in the case of a non-recursive request, // the top-level object has a "failed" in it then we send back an INTERNAL_SERVER_ERROR // along with the response. 
-void reply_or_fail(const std::shared_ptr& res) { +static void reply_or_fail(const std::shared_ptr& res) { auto res_code = http::INTERNAL_SERVER_ERROR; if (auto swarm_obj = res->result.find("swarm"); swarm_obj != res->result.end()) { for (const auto& [sn_pkey, obj] : swarm_obj->items()) { @@ -425,49 +429,65 @@ void reply_or_fail(const std::shared_ptr& res) { res->cb(Response{res_code, std::move(res->result)}); } -static void distribute_command( - snode::ServiceNode& sn, - std::shared_ptr& res, - std::string_view cmd, - const rpc::recursive& req) { +SNStorageCCResult interpret_sn_storage_cc_response_parts( + bool success, std::span parts) { + bool good_result = success && parts.size() == 1; + SNStorageCCResult result = {}; + if (good_result) { + result.status = SNStorageCCResultStatus::Good; + } else { + bool timeout = !success; + if (timeout) { + result.status = SNStorageCCResultStatus::Timeout; + } else if (parts.size() == 2) { + result.status = SNStorageCCResultStatus::ErrorCodeReason; + result.error_code = parts[0]; + result.error_reason = parts[1]; + } else { + result.status = SNStorageCCResultStatus::BadPeerResponse; + } + } + return result; +} + +static void distribute_command(snode::ServiceNode& sn, std::shared_ptr& res) { auto peers = sn.swarm().peers(); res->pending += peers.size(); for (auto& peer : peers) { - auto ct = sn.contacts().find(peer); + auto ct = sn.contacts().find(peer.first); if (!ct || !*ct) { log::debug( logcat, "Not distributing {} to swarm peer {}: SN {}", - cmd, - peer, + res->cmd, + peer.first, ct ? 
"is non-contactable" : "not found"); res->pending--; + + res->db_req_id = sn.db->add_retry_request( + peer.first, res->cmd, res->req_payload, res->db_req_id); continue; } + sn.omq_server()->request( ct->pubkey_x25519.view(), "sn.storage_cc", - [res, peer, peer_ed = ct->pubkey_ed25519, cmd](bool success, auto parts) { + [res, peer, peer_ed = ct->pubkey_ed25519, &sn](bool success, auto parts) { json peer_result; - if (!success) - log::warning( - logcat, - "Response timeout from {} for forwarded command {}", - peer, - cmd); - bool good_result = success && parts.size() == 1; - if (good_result) { + SNStorageCCResult store_result = + interpret_sn_storage_cc_response_parts(success, parts); + if (store_result.status == SNStorageCCResultStatus::Good) { try { peer_result = bt_to_json(oxenc::bt_dict_consumer{parts[0]}); } catch (const std::exception& e) { log::warning( logcat, "Received unparsable response to {} from {}: {}", - cmd, - peer, + res->cmd, + peer.first, e.what()); - good_result = false; + store_result.status = SNStorageCCResultStatus::BadPeerResponse; } } @@ -476,15 +496,31 @@ static void distribute_command( // If we're the last response then we reply: bool send_reply = --res->pending == 0; - if (!good_result) { + if (store_result.status != SNStorageCCResultStatus::Good) { peer_result = json{{"failed", true}}; - if (!success) + bool timeout = store_result.status == SNStorageCCResultStatus::Timeout; + if (timeout) { peer_result["timeout"] = true; - else if (parts.size() == 2) { - peer_result["code"] = parts[0]; - peer_result["reason"] = parts[1]; - } else + } else if ( + store_result.status == SNStorageCCResultStatus::ErrorCodeReason) { + peer_result["code"] = store_result.error_code; + peer_result["reason"] = store_result.error_reason; + } else { peer_result["bad_peer_response"] = true; + } + + log::debug( + logcat, + "Failure response from {} for forwarded command {} ({}): <{}>", + peer.first, + res->cmd, + timeout ? 
"will be retried" : "unretryable due to error", + peer_result.dump()); + + if (timeout) { + res->db_req_id = sn.db->add_retry_request( + peer.first, res->cmd, res->req_payload, res->db_req_id); + } } else if (res->b64) { if (auto it = peer_result.find("signature"); it != peer_result.end() && it->is_string()) @@ -492,12 +528,11 @@ static void distribute_command( } res->result["swarm"][peer_ed.hex()] = std::move(peer_result); - if (send_reply) reply_or_fail(res); }, - cmd, - bt_serialize(req.to_bt()), + res->cmd, + res->req_payload, oxenmq::send_option::request_timeout{5s}); } } @@ -509,11 +544,13 @@ std::pair, std::unique_lock> static res->cb = std::move(cb); res->pending = 1; res->b64 = req.b64; + res->cmd = RPC::names()[0]; + res->req_payload = bt_serialize(req.to_bt()); std::unique_lock lock{res->mutex, std::defer_lock}; if (req.recurse) { // Send it off to our peers right away, before we process it ourselves - distribute_command(sn, res, RPC::names()[0], req); + distribute_command(sn, res); lock.lock(); } return {std::move(res), std::move(lock)}; @@ -565,7 +602,7 @@ void RequestHandler::process_client_req(rpc::store&& req, std::function msgs; bool more = false; try { - std::tie(msgs, more) = service_node_.get_db().retrieve( + std::tie(msgs, more) = service_node_.db->retrieve( req.pubkey, req.msg_namespace, req.last_hash.value_or(""), @@ -870,7 +907,7 @@ void RequestHandler::process_client_req( Response{http::NOT_ACCEPTABLE, "delete_all timestamp too far from current time"sv}); } if (!verify_signature( - service_node_.get_db(), + *service_node_.db, req.pubkey, req.pubkey_ed25519, req.subaccount, @@ -896,7 +933,7 @@ void RequestHandler::process_client_req( handle_action_all_ns( mine, "deleted", - service_node_.get_db().delete_all(req.pubkey), + service_node_.db->delete_all(req.pubkey), req.b64, ed25519_sk_, req.pubkey.prefixed_hex(), @@ -906,8 +943,7 @@ void RequestHandler::process_client_req( handle_action_one_ns( mine, "deleted", - 
service_node_.get_db().delete_all( - req.pubkey, std::get(req.msg_namespace)), + service_node_.db->delete_all(req.pubkey, std::get(req.msg_namespace)), req.b64, ed25519_sk_, req.pubkey.prefixed_hex(), @@ -928,7 +964,7 @@ void RequestHandler::process_client_req(rpc::delete_msgs&& req, std::functionresult["swarm"][service_node_.own_address().pubkey_ed25519.hex()] : res->result; - auto deleted = service_node_.get_db().delete_by_hash(req.pubkey, req.messages); + auto deleted = service_node_.db->delete_by_hash(req.pubkey, req.messages); std::sort(deleted.begin(), deleted.end()); auto sig = create_signature(ed25519_sk_, req.pubkey.prefixed_hex(), req.messages, deleted); mine["deleted"] = std::move(deleted); @@ -1002,7 +1038,7 @@ void RequestHandler::process_client_req( } if (!verify_signature( - service_node_.get_db(), + *service_node_.db, req.pubkey, req.pubkey_ed25519, std::nullopt, // no subaccount allowed @@ -1024,7 +1060,7 @@ void RequestHandler::process_client_req( ? res->result["swarm"][service_node_.own_address().pubkey_ed25519.hex()] : res->result; - service_node_.get_db().revoke_subaccounts(req.pubkey, req.revoke); + service_node_.db->revoke_subaccounts(req.pubkey, req.revoke); auto sig = create_signature(ed25519_sk_, req.pubkey.prefixed_hex(), req.timestamp, req.revoke); mine["signature"] = req.b64 ? oxenc::to_base64(sig.begin(), sig.end()) : util::view_guts(sig); if (req.recurse) @@ -1055,7 +1091,7 @@ void RequestHandler::process_client_req( } if (!verify_signature( - service_node_.get_db(), + *service_node_.db, req.pubkey, req.pubkey_ed25519, std::nullopt, // no subaccount allowed @@ -1077,7 +1113,7 @@ void RequestHandler::process_client_req( ? 
res->result["swarm"][service_node_.own_address().pubkey_ed25519.hex()] : res->result; - mine["count"] = service_node_.get_db().unrevoke_subaccounts(req.pubkey, req.unrevoke); + mine["count"] = service_node_.db->unrevoke_subaccounts(req.pubkey, req.unrevoke); auto sig = create_signature(ed25519_sk_, req.pubkey.prefixed_hex(), req.timestamp, req.unrevoke); mine["signature"] = req.b64 ? oxenc::to_base64(sig.begin(), sig.end()) : util::view_guts(sig); @@ -1106,7 +1142,7 @@ void RequestHandler::process_client_req( } if (!verify_signature( - service_node_.get_db(), + *service_node_.db, req.pubkey, std::nullopt, std::nullopt, // no subaccount allowed @@ -1122,7 +1158,7 @@ void RequestHandler::process_client_req( std::vector revoked_subaccounts; try { - revoked_subaccounts = service_node_.get_db().revoked_subaccounts(req.pubkey); + revoked_subaccounts = service_node_.db->revoked_subaccounts(req.pubkey); } catch (const std::exception& e) { auto msg = fmt::format( "Internal Server Error. Could not retrieve revoked_subaccounts for {}", @@ -1165,7 +1201,7 @@ void RequestHandler::process_client_req( } if (!verify_signature( - service_node_.get_db(), + *service_node_.db, req.pubkey, req.pubkey_ed25519, req.subaccount, @@ -1191,7 +1227,7 @@ void RequestHandler::process_client_req( handle_action_all_ns( mine, "deleted", - service_node_.get_db().delete_by_timestamp(req.pubkey, req.before), + service_node_.db->delete_by_timestamp(req.pubkey, req.before), req.b64, ed25519_sk_, req.pubkey.prefixed_hex(), @@ -1201,7 +1237,7 @@ void RequestHandler::process_client_req( handle_action_one_ns( mine, "deleted", - service_node_.get_db().delete_by_timestamp( + service_node_.db->delete_by_timestamp( req.pubkey, std::get(req.msg_namespace), req.before), req.b64, ed25519_sk_, @@ -1232,7 +1268,7 @@ void RequestHandler::process_client_req(rpc::expire_all&& req, std::functionupdate_all_expiries(req.pubkey, req.expiry), req.b64, ed25519_sk_, req.pubkey.prefixed_hex(), @@ -1267,7 +1303,7 @@ void 
RequestHandler::process_client_req(rpc::expire_all&& req, std::functionupdate_all_expiries( req.pubkey, std::get(req.msg_namespace), req.expiry), req.b64, ed25519_sk_, @@ -1309,7 +1345,7 @@ void RequestHandler::process_client_req(rpc::expire_msgs&& req, std::functionresult["swarm"][service_node_.own_address().pubkey_ed25519.hex()] : res->result; - auto updated = service_node_.get_db().update_expiry( + auto updated = service_node_.db->update_expiry( req.pubkey, req.messages, expiry, @@ -1373,7 +1409,7 @@ void RequestHandler::process_client_req(rpc::expire_msgs&& req, std::functionget_expiries(req.pubkey, unchanged_hashes); } std::vector updated_hash; @@ -1439,7 +1475,7 @@ void RequestHandler::process_client_req(rpc::get_expiries&& req, std::functionget_expiries(req.pubkey, req.messages); return cb(Response{http::OK, std::move(res)}); } @@ -1657,7 +1693,7 @@ void RequestHandler::process_client_req( Response RequestHandler::process_retrieve_all() { std::vector msgs; try { - msgs = service_node_.get_db().retrieve_all(); + msgs = service_node_.db->retrieve_all(); } catch (const std::exception& e) { return {http::INTERNAL_SERVER_ERROR, "could not retrieve all messages"s}; } diff --git a/oxenss/rpc/request_handler.h b/oxenss/rpc/request_handler.h index dac913c5c..2eeff3a78 100644 --- a/oxenss/rpc/request_handler.h +++ b/oxenss/rpc/request_handler.h @@ -22,13 +22,6 @@ namespace oxenss::rpc { -// When a storage test returns a "retry" response, we retry again after this interval: -inline constexpr auto TEST_RETRY_INTERVAL = 50ms; - -// If a storage test is still returning "retry" after this long since the initial request then -// we give up and send an error response back to the requestor: -inline constexpr auto TEST_RETRY_PERIOD = 55s; - // Minimum and maximum TTL permitted for storing a new, public message inline constexpr auto TTL_MINIMUM = 10s; inline constexpr auto TTL_MAXIMUM = 14 * 24h; @@ -84,6 +77,21 @@ struct Response { status{status}, body{binary_response}, 
keepalive{keepalive} {} }; +enum class SNStorageCCResultStatus { + Good, + Timeout, + ErrorCodeReason, + BadPeerResponse, +}; + +// Helper struct that stores the decoded response of a 'sn.storage_cc' request to a storage server +// and consequently the possible replies/states that can be returned from this operation. +struct SNStorageCCResult { + SNStorageCCResultStatus status = {}; + std::string_view error_code; + std::string_view error_reason; +}; + // Views the string or string_view body inside a Response. Should only be called when the body // has already been verified to not contain a json object or binary blob. inline std::string_view view_body(const Response& r) { @@ -148,6 +156,11 @@ std::string compute_hash(Func hasher, const T&... args) { /// Computes a message hash using blake2b hash of various messages attributes. std::string computeMessageHash(const user_pubkey& pubkey, namespace_id ns, std::string_view data); +/// Interpret the result of an OMQ request to the 'sn.storage_cc' endpoint, typically for recursive +/// swarm requests. 
+SNStorageCCResult interpret_sn_storage_cc_response_parts( + bool success, std::span parts); + struct OnionRequestMetadata { crypto::x25519_pubkey ephem_key; std::function cb; diff --git a/oxenss/server/omq.cpp b/oxenss/server/omq.cpp index 182612937..975e29c76 100644 --- a/oxenss/server/omq.cpp +++ b/oxenss/server/omq.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -45,7 +46,11 @@ std::string OMQ::peer_lookup(std::string_view pubkey_bin) const { } void OMQ::handle_sn_data_ready(oxenmq::Message& message) { - log::debug(logcat, "[OMQ] handle sn.data_ready from: {}", message.conn.to_string()); + log::debug( + logcat, + "[OMQ] handle sn.data_ready from: {} (parts {})", + message.conn.to_string(), + message.data.size()); auto& xpk_str = message.conn.pubkey(); if (xpk_str.size() != sizeof(crypto::x25519_pubkey)) @@ -56,6 +61,34 @@ void OMQ::handle_sn_data_ready(oxenmq::Message& message) { if (!service_node_->is_swarm_peer(xpk)) return message.send_reply("Swarm mismatch"); + std::optional ct = service_node_->contacts().find(xpk); + if (!ct) + return message.send_reply("Contact info missing"); + + if (ct->version >= snode::SN_DATA_READY_WITH_REQUEST_VERSION) { + if (message.data.empty()) + return message.send_reply("Request payload missing"); + + bool needs_db_dump{false}; + try { + needs_db_dump = snode::deserialise_data_ready_request(message.data[0]); + } catch (const std::exception& e) { + log::info(logcat, "DataReadyRequest deserialization error: {}", e.what()); + return message.send_reply("Request payload malformed."); + } + + if (needs_db_dump) + service_node_->set_member_needs_db_dump(crypto::legacy_pubkey{ct->pubkey_ed25519}); + + if (log::get_level(logcat) <= log::Level::debug) { + log::debug( + logcat, + "sn.data ready processed (edpk: {}, needs db dump: {})", + ct->pubkey_ed25519, + needs_db_dump); + } + } + message.send_reply("OK"); } @@ -264,6 +297,9 @@ OMQ::OMQ( log::debug(logcat,"Received new snode address info from oxend for 
{}", pk.hex()); service_node_->contacts().update(pk, c); + + // Wake up thread in case there are retryable requests blocked on missing contact info + service_node_->retryable_requests_cv.notify_all(); } catch (const std::exception& e) { log::error(logcat, "Received invalid snode address update from oxend: {}", e.what()); } diff --git a/oxenss/server/omq.h b/oxenss/server/omq.h index 8c6fa1b49..2385cf784 100644 --- a/oxenss/server/omq.h +++ b/oxenss/server/omq.h @@ -27,7 +27,6 @@ namespace snode { } // namespace oxenss namespace oxenss::server { - class OMQ : public MQBase { oxenmq::OxenMQ omq_; oxenmq::ConnectionID oxend_conn_; diff --git a/oxenss/snode/network.cpp b/oxenss/snode/network.cpp index d230ce26f..11ad3b2d0 100644 --- a/oxenss/snode/network.cpp +++ b/oxenss/snode/network.cpp @@ -11,15 +11,37 @@ namespace oxenss::snode { Network::Network(oxenmq::OxenMQ& omq) : contacts{omq} {} -uint64_t Network::pubkey_to_swarm_space(const user_pubkey& pk) { - const auto bytes = pk.raw(); - assert(bytes.size() == 32); - - uint64_t res = 0; - for (size_t i = 0; i < bytes.size(); i += 8) - res ^= oxenc::load_big_to_host(bytes.data() + i); +std::pair Network::get_swarm_boundaries(const uint64_t swarm) const { + if (swarms_.size() <= 1) + return {0, 0}; + + const auto it = swarms_.find(swarm); + if (it == swarms_.end()) + throw std::logic_error{"This function should only be called with a current swarm id."}; + + // FIXME: this logic is a little weird, but should work. + uint64_t prev_swarm, next_swarm; + if (it == swarms_.begin()) { + next_swarm = std::next(it)->first; + prev_swarm = std::prev(swarms_.end())->first; + } else { + prev_swarm = std::prev(it)->first; + auto it2 = std::next(it); + if (it2 == swarms_.end()) + it2 = swarms_.begin(); + next_swarm = it2->first; + } - return res; + // now have target swarm id, the one before it, and the one after it + // + // in the event of a distance tie in swarm space (e.g. 
id 1 and 7 with swarm space 4), + // the "right" (next) swarm loses. This means when querying with what we return here, + // we should do x > lower_bound AND x <= upper_bound + auto left_diff = swarm - prev_swarm; + if (left_diff % 2) + left_diff += 1; // round the average up on the left side + auto right_diff = next_swarm - swarm; + return {swarm - (left_diff / 2), swarm + (right_diff / 2)}; } swarms_t::const_iterator Network::_find_swarm_for(const user_pubkey& pk) const { @@ -29,6 +51,10 @@ swarms_t::const_iterator Network::_find_swarm_for(const user_pubkey& pk) const { return swarms_.begin(); const uint64_t swarm_pos = pubkey_to_swarm_space(pk); + return _find_swarm_for_swarm_space(swarm_pos); +} + +swarms_t::const_iterator Network::_find_swarm_for_swarm_space(const swarm_id_t swarm_pos) const { // Find the right boundary, i.e. first swarm with swarm_id >= res auto right_it = swarms_.lower_bound(swarm_pos); @@ -153,4 +179,12 @@ std::shared_ptr> Network::all_nodes_blob() const { return blob; } +std::set Network::get_all_swarm_ids() const { + std::set ret; + + for (const auto& [id, swarm] : swarms_) + ret.emplace(id); + return ret; +} + } // namespace oxenss::snode diff --git a/oxenss/snode/network.h b/oxenss/snode/network.h index 4d0cfe18a..58dcc5972 100644 --- a/oxenss/snode/network.h +++ b/oxenss/snode/network.h @@ -37,7 +37,7 @@ class Network { friend class Swarm; - swarms_t::const_iterator _find_swarm_for(const user_pubkey& pk) const; + friend class ServiceNode; // Cached value of the all_nodes_blob() return value. The cache is cleared whenever swarms or // any contact info changes. @@ -53,6 +53,10 @@ class Network { swarms_t&& new_swarms, const std::map& new_contacts); public: + std::pair get_swarm_boundaries(const uint64_t swarm) const; + swarms_t::const_iterator _find_swarm_for(const user_pubkey& pk) const; + swarms_t::const_iterator _find_swarm_for_swarm_space(const swarm_id_t swarm_pos) const; + /// Constructs a Network object. 
The omq instance will be passed to `contacts` so that any /// x25519 pubkey list changes are automatically propagated to oxenmq for SN authentication. Network(oxenmq::OxenMQ& omq); @@ -60,10 +64,6 @@ class Network { // Holds all current contact information for network nodes. Contacts contacts; - /// Maps a pubkey into a 64-bit "swarm space" value; the swarm you belong to is whichever one - /// has a swarm id closest to this pubkey-derived value. - static uint64_t pubkey_to_swarm_space(const user_pubkey& pk); - // Looks up the swarm for a pubkey and returns the swarm_id. Returns nullopt on error (which // will only happen if there are no swarms at all). std::optional get_swarm_id_for(const user_pubkey& pk) const; @@ -97,6 +97,8 @@ class Network { // This value is cached and recomputed whenever swarms or contact info of any active node // changes. std::shared_ptr> all_nodes_blob() const; + + std::set get_all_swarm_ids() const; }; } // namespace oxenss::snode diff --git a/oxenss/snode/reachability_testing.h b/oxenss/snode/reachability_testing.h index 9a24165d5..eb958f8bb 100644 --- a/oxenss/snode/reachability_testing.h +++ b/oxenss/snode/reachability_testing.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace oxenss::snode { diff --git a/oxenss/snode/service_node.cpp b/oxenss/snode/service_node.cpp index 8cb8d1990..b1377e93d 100644 --- a/oxenss/snode/service_node.cpp +++ b/oxenss/snode/service_node.cpp @@ -2,6 +2,7 @@ #include "serialization.h" #include "sn_test.h" +#include #include #include #include @@ -43,29 +44,73 @@ using MISSING_PUBKEY_THRESHOLD = std::ratio<3, 100>; /// TODO: there should be config.h to store constants like these constexpr auto OXEND_PING_INTERVAL = 30s; +// How often to trigger 'check_new_members' which checks for 'data ready' handshakes from +// swarm members and propagate a DB dump if necessary. 
constexpr auto NEW_SWARM_MEMBER_INTERVAL = 10s; +// TODO: if these *are* going to be named constants rather than just existing in 2 places +// (where this is serialized and where it is deserialized), they should live in the header +// or something. +namespace data_ready_req { + constexpr std::string_view VERSION_KEY = "@"; + constexpr std::string_view STATUS_KEY = "s"; + constexpr std::string_view NEED_DB_DUMP_KEY = "t"; +} // namespace data_ready_req + +std::string serialise_data_ready_request(bool needs_db_dump) { + uint32_t version = 0; + static_assert(data_ready_req::VERSION_KEY < data_ready_req::STATUS_KEY); + static_assert(data_ready_req::STATUS_KEY < data_ready_req::NEED_DB_DUMP_KEY); + + oxenc::bt_dict_producer d; + d.append(data_ready_req::VERSION_KEY, version); + d.append(data_ready_req::NEED_DB_DUMP_KEY, needs_db_dump); + return std::move(d).str(); +} + +bool deserialise_data_ready_request(std::string_view data) { + oxenc::bt_dict_consumer d{data}; + [[maybe_unused]] auto version = d.require(data_ready_req::VERSION_KEY); + return d.require(data_ready_req::NEED_DB_DUMP_KEY); +} + ServiceNode::ServiceNode( const crypto::legacy_keypair& keys, const contact& contact, server::OMQ& omq_server, const std::filesystem::path& db_location, - const bool force_start) : + bool force_start, + bool skip_bootstrap) : force_start_{force_start}, - db_{std::make_unique(db_location)}, + skip_bootstrap_{skip_bootstrap}, + db{std::make_unique(db_location)}, our_keys_{keys}, our_contact_{contact}, network_{*omq_server}, + swarm_{network_, our_keys_.pub, *db}, omq_server_{omq_server}, all_stats_{*omq_server} { mq_servers_.push_back(&omq_server); - log::info(logcat, "Requesting initial swarm state"); + if (auto id = db->get_current_swarm()) { + swarm_.cur_swarm_id_ = *id; + } + + // Check if the DB was empty and remember if so for later when talking to swarm members on + // handshake that we need to request a DB dump from them to populate our DB. 
In the edge case + // where there _are_ 0 messages, this will request a DB dump of 0 messages and essentially + // no-op. + if (db->get_message_count() == 0) { + // The 'cur_swarm_id' might be INVALID_SWARM_ID. This will be the case if the DB was deleted + // (and so the blobs storing our swarms were also deleted). The swarm is then + // bootstrapped to a proper swarm when we process the first handshake from a swarm member. + swarm_.db_was_initially_empty_with_swarm_id = swarm_.cur_swarm_id_; + } omq_server->add_timer( [this] { std::lock_guard l{sn_mutex_}; - db_->clean_expired(); + db->clean_expired(); }, Database::CLEANUP_PERIOD); @@ -86,6 +131,10 @@ ServiceNode::ServiceNode( syncing_ = false; }, 1h); + + // Setup the retryable requests thread + retryable_requests_thread = + std::thread(&ServiceNode::retryable_requests_thread_entry_point, this); } void ServiceNode::on_oxend_connected() { @@ -335,6 +384,8 @@ void ServiceNode::bootstrap_fallback() { void ServiceNode::shutdown() { shutting_down_ = true; + retryable_requests_cv.notify_all(); + retryable_requests_thread.join(); } bool ServiceNode::snode_ready(std::string* reason) { @@ -362,7 +413,7 @@ bool ServiceNode::snode_ready(std::string* reason) { return problems.empty() || force_start_; } -bool ServiceNode::is_swarm_peer(const crypto::x25519_pubkey& xpk) { +std::optional ServiceNode::is_swarm_peer(const crypto::x25519_pubkey& xpk) { return swarm_.is_member(xpk); } @@ -395,8 +446,13 @@ void ServiceNode::record_retrieve_request() { all_stats_.bump_retrieve_requests(); } +struct LookupRetryIndexes { + std::optional retryable_index; + std::optional node_index; +}; + void ServiceNode::check_new_members() { - for (const auto& pk : swarm_.extract_pending_members()) { + for (const auto& pk : swarm_.extract_contact_pending_members()) { auto c = network_.contacts.find(pk); if (!c || !*c) { // We don't have contact info, so don't do anything right now and this will get @@ -416,46 +472,96 @@ void 
ServiceNode::check_new_members() { pk, fmt::join(NEW_SWARM_MEMBER_HANDSHAKE_VERSION, "."), fmt::join(c->version, ".")); - swarm_.set_member_ready(pk); + + std::lock_guard network_lock{network().mut_}; + if (SwarmMemberState* member = swarm_.is_member_locked(pk); member) + member->status = SwarmMemberStatus::Ready; continue; } - log::debug(logcat, "Initiating contact with new swarm member {}", pk); - omq_server_->request( - c->pubkey_x25519.view(), - "sn.data_ready", - [this, pk](bool success, std::vector data) { - if (data.empty()) { - success = false; - data.push_back("Empty reply"s); - } else if (data[0] != "OK"sv) { - success = false; - } - if (!success) { - log::info( - logcat, - "Failed to connect to remote SS {} to initiate new " - "data transfer ({}); will retry soon", - pk, - fmt::join(data, ", ")); - return; + auto on_sn_data_ready_response = [this, pk](bool success, std::vector data) { + if (data.empty()) { + success = false; + data.push_back("Empty reply"s); + } else if (data[0] != "OK"sv) { + success = false; + } + + if (success) { + log::debug( + logcat, + "Successful contact made with swarm member {}, marking as ready", + pk); + } else { + log::info( + logcat, + "Failed to connect to remote SS {} to initiate new " + "data transfer ({}); will retry soon", + pk, + fmt::join(data, ", ")); + } + + // The 'pk' member might not be in the swarm anymore if the request elapsed over a + // period of time where the swarm composition changed. + std::lock_guard network_lock{network().mut_}; + if (SwarmMemberState* member = swarm_.is_member_locked(pk); member) { + // Update the requested DB dump state machine if necessary. + SwarmRequestedDBDump& status = member->our_ss_requested_db_dump; + if (status == SwarmRequestedDBDump::RequestUnderway) { + status = success ? 
SwarmRequestedDBDump::Done + : SwarmRequestedDBDump::NeedsToRequest; + } + + if (success) + member->status = SwarmMemberStatus::Ready; + } + }; + + if (c->version >= SN_DATA_READY_WITH_REQUEST_VERSION) { + // Build 'data ready' request + bool needs_db_dump{false}; + { + std::lock_guard network_lock{network().mut_}; + if (SwarmMemberState* member = swarm_.is_member_locked(pk); member) { + SwarmRequestedDBDump& status = member->our_ss_requested_db_dump; + if (status == SwarmRequestedDBDump::NeedsToRequest) { + status = SwarmRequestedDBDump::RequestUnderway; + needs_db_dump = true; } - log::debug( - logcat, - "Successful contact made with swarm member {}, queuing a message push", - pk); - swarm_.set_member_ready(pk); - }); + } + } + + // Serialise our response and send it off + auto serialised = snode::serialise_data_ready_request(needs_db_dump); + log::debug( + logcat, + "Initiating contact with new swarm member {}{}", + pk, + needs_db_dump ? " (requesting DB dump)" : ""); + omq_server_->request( + c->pubkey_x25519.view(), + "sn.data_ready", + on_sn_data_ready_response, + std::move(serialised)); + } else { + log::debug(logcat, "Initiating contact with new swarm member {}", pk); + omq_server_->request( + c->pubkey_x25519.view(), "sn.data_ready", on_sn_data_ready_response); + } } - if (auto send_now = swarm_.extract_ready_members(); !send_now.empty()) { - auto msgs = db_->retrieve_all(); + if (auto send_now = swarm_.extract_contacts_needing_db_dump(); !send_now.empty()) { log::debug( logcat, - "Initiating swarm message dump ({} message) to new swarm member(s): {}", - msgs.size(), + "Initiating swarm message dump to swarm member(s): {}", fmt::join(send_now, ", ")); - relay_messages(std::move(msgs), send_now); + auto boundaries = network_.get_swarm_boundaries(swarm_.cur_swarm_id_); + db->foreach_swarm_message( + [&send_now, this](const std::vector& messages) { + relay_messages(messages, send_now); + }, + boundaries.first, + boundaries.second); } } @@ -528,7 +634,7 @@ 
bool ServiceNode::process_store( all_stats_.bump_store_requests(); /// store in the database (if not already present) - const auto result = db_->store(msg, expiry); + const auto result = db->store(msg, expiry); if (new_msg) *new_msg = result == StoreResult::New; @@ -540,7 +646,7 @@ bool ServiceNode::process_store( void ServiceNode::save_bulk(const std::vector& msgs) { try { - db_->bulk_store(msgs); + db->bulk_store(msgs); } catch (const std::exception& e) { log::error(logcat, "failed to save batch to the database: {}", e.what()); return; @@ -550,7 +656,7 @@ void ServiceNode::save_bulk(const std::vector& msgs) { } void ServiceNode::on_bootstrap_update(block_update&& bu) { - swarm_.update_swarms(std::move(bu.swarms), bu.contacts); + swarm_.update_swarms(bu.height, std::move(bu.swarms), bu.contacts); target_height_ = std::max(target_height_, bu.height); } @@ -592,7 +698,7 @@ void ServiceNode::on_snodes_update(block_update&& bu) { active_ = true; } - auto events = swarm_.update_swarms(std::move(bu.swarms), bu.contacts); + auto events = swarm_.update_swarms(bu.height, std::move(bu.swarms), bu.contacts); if (const SnodeStatus status = events.our_swarm_id != INVALID_SWARM_ID ? SnodeStatus::ACTIVE : bu.decommed ? SnodeStatus::DECOMMISSIONED @@ -673,6 +779,12 @@ void ServiceNode::update_swarms(std::promise* on_finish) { params.dump()); } +void ServiceNode::set_member_needs_db_dump(const crypto::legacy_pubkey& pk) { + std::lock_guard lock{network().mut_}; // Use the same lock as Swarm member functions + if (SwarmMemberState* state = swarm_.is_member_locked(pk); state) + state->their_ss_needs_db_dump = true; +} + void ServiceNode::process_snodes_update(std::string_view data) { auto maybe_bu = parse_swarm_update(data, our_keys_.pub); @@ -701,8 +813,9 @@ void ServiceNode::process_snodes_update(std::string_view data) { auto [total, contactable] = network_.contacts.counts(); auto missing = total - contactable; - if (total >= (oxenss::is_mainnet ? 
100 : 10) && - missing <= MISSING_PUBKEY_THRESHOLD::num * total / MISSING_PUBKEY_THRESHOLD::den) { + if (skip_bootstrap_ || + (total >= (oxenss::is_mainnet ? 100 : 10) && + missing <= MISSING_PUBKEY_THRESHOLD::num * total / MISSING_PUBKEY_THRESHOLD::den)) { log::info( logcat, "Initialized from oxend with {}/{} contactable service nodes", @@ -963,36 +1076,31 @@ void ServiceNode::report_reachability( void ServiceNode::bootstrap_swarms(const std::set& swarms) const { std::lock_guard guard(sn_mutex_); - if (swarms.empty()) - log::info(logcat, "Bootstrapping all swarms"); - else if (logcat->level() <= log::Level::info) - log::info(logcat, "Bootstrapping swarms: [{}]", fmt::join(swarms, ", ")); - - std::unordered_map pk_swarm_cache; - std::unordered_map> to_relay; + const std::set* swarms_ptr = &swarms; + std::optional> all_swarms; - std::vector all_msgs = db_->retrieve_all(); - log::debug(logcat, "We have {} messages", all_msgs.size()); - for (auto& entry : all_msgs) { - if (!entry.pubkey) { - log::error(logcat, "Invalid pubkey in a message while bootstrapping other nodes"); - continue; + if (swarms.empty()) { + log::info(logcat, "Bootstrapping all swarms"); + all_swarms = network_.get_all_swarm_ids(); + if (all_swarms->empty()) { + log::warning(logcat, "Bootstrapping all swarms, but there are none?"); + return; + } + swarms_ptr = &*all_swarms; + } else if (logcat->level() <= log::Level::info) + log::info(logcat, "Bootstrapping swarms: [{}]", fmt::join(*swarms_ptr, ", ")); + + for (const auto& swarm_id : *swarms_ptr) { + if (auto swarm = network_.get_swarm(swarm_id)) { + auto boundaries = network_.get_swarm_boundaries(swarm_id); + db->foreach_swarm_message( + [&swarm, this](const std::vector& messages) { + relay_messages(messages, *swarm); + }, + boundaries.first, + boundaries.second); } - - auto [it, ins] = pk_swarm_cache.try_emplace(entry.pubkey); - if (ins) - it->second = network_.get_swarm_id_for(entry.pubkey).value_or(INVALID_SWARM_ID); - auto swarm_id = 
it->second; - - if (swarms.empty() || swarms.count(swarm_id)) - to_relay[swarm_id].push_back(std::move(entry)); } - - log::trace(logcat, "Bootstrapping {} swarms", to_relay.size()); - - for (const auto& [swarm_id, items] : to_relay) - if (auto swarm = network_.get_swarm(swarm_id)) - relay_messages(items, *swarm); } void ServiceNode::relay_messages( @@ -1077,7 +1185,7 @@ std::string ServiceNode::get_stats() const { val["height"] = block_height_; val["target_height"] = target_height_; - std::vector counts = db_->get_message_counts(); + std::vector counts = db->get_message_counts(); int64_t total = std::accumulate(counts.begin(), counts.end(), int64_t{0}); counts.erase( @@ -1128,12 +1236,12 @@ std::string ServiceNode::get_stats() const { val["account_msg_mean"] = total / (double)counts.size(); auto& ns_stats = (val["namespace_messages"] = nlohmann::json::object()); - for (auto& [ns, count] : db_->get_namespace_counts()) + for (auto& [ns, count] : db->get_namespace_counts()) ns_stats[fmt::format("{}", ns)] = count; - val["db_used"] = db_->get_used_bytes(); - val["db_total"] = db_->get_total_bytes(); - val["db_max"] = Database::SIZE_LIMIT; + val["dbused"] = db->get_used_bytes(); + val["dbtotal"] = db->get_total_bytes(); + val["dbmax"] = Database::SIZE_LIMIT; return val.dump(); } @@ -1162,9 +1270,9 @@ std::string ServiceNode::get_status_line() const { STORAGE_SERVER_VERSION_STRING, oxenss::is_mainnet ? "" : " (TESTNET)", syncing_ ? 
"; SYNCING" : "", - db_->get_message_count(), - util::get_human_readable_bytes(db_->get_used_bytes()), - db_->get_owner_count(), + db->get_message_count(), + util::get_human_readable_bytes(db->get_used_bytes()), + db->get_owner_count(), stats.client_store_requests, stats.client_retrieve_requests, stats.onion_requests, @@ -1196,4 +1304,67 @@ void ServiceNode::process_push_batch(std::string_view blob, std::string_view sen log::trace(logcat, "Saving all: end"); } +void ServiceNode::check_retry_requests() { + db->remove_expired_retry_requests(); + + db->foreach_ready_retry_request([this](const crypto::legacy_pubkey& key, + const std::string& cmd, + const std::string& payload, + int64_t req_id) { + // FIXME: non-swarm-member retries should be purged automatically + // std::optional is_member = swarm_.is_member(key); + + crypto::x25519_pubkey pubkey_x25519 = {}; + + auto ct = contacts().find(key); + if (ct && *ct) + pubkey_x25519 = ct->pubkey_x25519; + + if (pubkey_x25519) { + auto on_request_done = [this, req_id](bool success, std::vector parts) { + // We cleanup the request in all situations except timeout (timeout + // indicating that the node was non-responsive, maybe offline). In an error + // state we don't know what state the recipient's storage server is in and + // we default to deleting it and ending the retry attempts. + rpc::SNStorageCCResult store_result = + rpc::interpret_sn_storage_cc_response_parts(success, parts); + if (store_result.status != rpc::SNStorageCCResultStatus::Timeout) { + db->remove_node_retry_request(req_id); + } + }; + omq_server()->request( + pubkey_x25519.view(), + "sn.storage_cc", + on_request_done, + cmd, + payload, + oxenmq::send_option::request_timeout{5s}); + } + }); +} + +void ServiceNode::retryable_requests_thread_entry_point() { + while (!shutting_down_) { + // FIXME: is this extra wakeup necessary/useful? 
If a retry is pending, the initial + // request must have timed out (5 seconds), so presumably just checking on retries + // every 5 seconds (maybe slightly more frequently?) should be fine. + // + // At longest, we timeout on the blocking sleep every 5s, or, as soon as someone wakes up + // the thread by notifying the condition var + // - when a new retryable request is added + // - we're shutting down + // - or we know there's an earlier deadline in the list of requests to be retried + // - a node's contact detail was updated + // - a retryable request failed and a new deadline was posted + auto earliest_deadline = std::chrono::steady_clock::now() + 5s; + + std::unique_lock lock{retryable_requests_mutex}; + retryable_requests_cv.wait_until(lock, earliest_deadline); + + if (shutting_down_) + break; + + check_retry_requests(); + } +} } // namespace oxenss::snode diff --git a/oxenss/snode/service_node.h b/oxenss/snode/service_node.h index 3c8d76b74..ee54115e6 100644 --- a/oxenss/snode/service_node.h +++ b/oxenss/snode/service_node.h @@ -57,11 +57,7 @@ inline constexpr hf_revision STORAGE_SERVER_HARDFORK = {19, 6}; // The storage server version at which initial handshaking is supported before attempting a swarm // message transfer. inline constexpr std::array NEW_SWARM_MEMBER_HANDSHAKE_VERSION = {2, 10, 0}; - -class Swarm; - -/// WRONG_REQ - request was ignored as not valid (e.g. 
incorrect tester) -enum class MessageTestStatus { SUCCESS, RETRY, ERROR, WRONG_REQ }; +inline constexpr std::array SN_DATA_READY_WITH_REQUEST_VERSION = {2, 11, 0}; constexpr std::string_view to_string(SnodeStatus status) { switch (status) { @@ -73,27 +69,54 @@ constexpr std::string_view to_string(SnodeStatus status) { return "Unknown"sv; } +enum class RetryReason { + NON_CONTACTABLE, + FAILED_TO_SEND, +}; + +struct RequestRetryEntry { + crypto::legacy_pubkey key; + RetryReason reason; + bool retry_underway; + std::chrono::steady_clock::time_point deadline; + std::chrono::milliseconds next_retry_delay; +}; + +struct RequestRetry { + std::string cmd; + std::string req_payload; + uint64_t hash; + std::chrono::steady_clock::time_point create_time; + std::vector nodes; +}; + /// All service node logic that is not network-specific class ServiceNode { bool syncing_ = true; bool active_ = false; std::atomic got_first_response_ = false; bool force_start_ = false; + bool skip_bootstrap_ = false; std::atomic shutting_down_ = false; hf_revision hardfork_ = {0, 0}; uint64_t block_height_ = 0; uint64_t target_height_ = 0; std::string block_hash_; - std::unique_ptr db_; std::weak_ptr http_; + public: + // bit messy, but Swarm needs db startup version, so db has to init before Swarm + std::unique_ptr db; + + private: SnodeStatus status_ = SnodeStatus::UNKNOWN; const crypto::legacy_keypair our_keys_; const contact our_contact_; Network network_; - Swarm swarm_{network_, our_keys_.pub}; + + Swarm swarm_; server::OMQ& omq_server_; std::vector mq_servers_; @@ -112,6 +135,19 @@ class ServiceNode { mutable std::recursive_mutex sn_mutex_; + // Lock to be taken when interacting with the 'retryable_requests' queue + mutable std::mutex retryable_requests_mutex; + + std::thread retryable_requests_thread; + + // The hash of the last swarms blob that was serialised, used for dirty checks before storing to + // the DB. 
+ uint64_t last_swarms_serialize_hash = 0; + + // The hash of the last retryable requests blob that was serialised, used for dirty checks before + // storing to the DB. + uint64_t last_retryable_serialize_hash = 0; + void send_notifies(message m); // Save multiple messages to the database at once (i.e. in a single transaction) @@ -170,16 +206,15 @@ class ServiceNode { const contact& contact, server::OMQ& omq_server, const std::filesystem::path& db_location, - bool force_start); - - Database& get_db() { return *db_; } - const Database& get_db() const { return *db_; } + bool force_start, + bool skip_bootstrap); const Network& network() { return network_; } const Swarm& swarm() { return swarm_; } Contacts& contacts() { return network_.contacts; } + const Contacts& contacts() const { return network_.contacts; } const contact& own_address() { return our_contact_; } @@ -210,8 +245,9 @@ class ServiceNode { rpc::OnionRequestMetadata&& data, std::function data)> cb) const; - // Returns true if the given x pubkey is recognized as one of our current swarm members - bool is_swarm_peer(const crypto::x25519_pubkey& xpk); + // Returns the peer's state if the given x pubkey is recognized as one of our current swarm + // members + std::optional is_swarm_peer(const crypto::x25519_pubkey& xpk); const hf_revision& hf() const { return hardfork_; } @@ -271,9 +307,24 @@ class ServiceNode { // Called when oxend notifies us of a new block to update swarm info void update_swarms(std::promise* on_completion = nullptr); + // Mark the swarm member identified by 'pk' as needing a dump of the DB. When the 'check new + // members' routine for swarms is periodically executed, swarm members marked with this flag + // will then get the entire DB synchronised to them. No-op if the key does not match anyone in + // the swarm. 
+ void set_member_needs_db_dump(const crypto::legacy_pubkey& pk); + server::OMQ& omq_server() { return omq_server_; } + + std::condition_variable retryable_requests_cv; + + void retryable_requests_thread_entry_point(); + + void check_retry_requests(); }; +// at the moment we only care about the "needs_db_dump" boolean +bool deserialise_data_ready_request(std::string_view data); + } // namespace oxenss::snode template <> diff --git a/oxenss/snode/swarm.cpp b/oxenss/snode/swarm.cpp index e3a604f88..ac4e8258a 100644 --- a/oxenss/snode/swarm.cpp +++ b/oxenss/snode/swarm.cpp @@ -16,7 +16,7 @@ static auto logswarm = log::Cat("swarm"); Swarm::~Swarm() = default; -SwarmEvents Swarm::derive_swarm_events(const swarms_t& swarms) const { +SwarmEvents Swarm::derive_swarm_events(uint64_t height, const swarms_t& swarms) const { SwarmEvents events{}; events.our_swarm_id = INVALID_SWARM_ID; @@ -31,14 +31,27 @@ SwarmEvents Swarm::derive_swarm_events(const swarms_t& swarms) const { const auto& new_swarm = events.our_swarm_id; const auto& old_swarm = cur_swarm_id_; - if (new_swarm == INVALID_SWARM_ID) + if (new_swarm == INVALID_SWARM_ID) { + if (cur_swarm_id_ != INVALID_SWARM_ID) + log::warning( + logswarm, + "Leaving swarm {:#018x}: we are no longer an active Service Node", + cur_swarm_id_); + else + log::debug(logswarm, "Still not an active Service Node"); + // We are not in any swarm (or have been kicked out); nothing to do return events; + } - if (old_swarm == INVALID_SWARM_ID) - // We were previously not in a swarm, which means we just got assigned to one and so we have - // nothing to do (other snodes will also see this and push messages to us). 
+ if (old_swarm == INVALID_SWARM_ID) { + log::info(logcat, "Joined swarm {:#18x} (blk {})", new_swarm, height); + // We were previously not in a swarm, which means we just got assigned to one, we need to + // relay any of our messages belonging to the swarm + events.new_swarm_members = events.our_swarm_members; + events.new_swarm_members.erase(our_pk); return events; + } if (old_swarm != new_swarm) { // Moved to a new swarm @@ -58,6 +71,13 @@ SwarmEvents Swarm::derive_swarm_events(const swarms_t& swarms) const { // |.................########|########!!!!!!!!!!!!!!!!!| events.dissolved = true; } + log::info( + logcat, + "Changed from {:018x} {}to {:018x} (blk {})", + old_swarm, + new_swarm, + height, + events.dissolved ? "(dissolved) " : ""); // If our old swarm is still alive then that means we got moved out of it, and so there's // nothing for us to do because the remaining swarm members will continue to administer the @@ -69,12 +89,9 @@ SwarmEvents Swarm::derive_swarm_events(const swarms_t& swarms) const { /// --- WE are still in the same swarm if we reach here --- /// See if anyone joined our swarm: if so, we need to push messages to them: - std::set_difference( - events.our_swarm_members.begin(), - events.our_swarm_members.end(), - members_.begin(), - members_.end(), - std::inserter(events.new_swarm_members, events.new_swarm_members.end())); + for (auto it : events.our_swarm_members) + if (members_.count(it) == 0) + events.new_swarm_members.insert(it); events.new_swarm_members.erase(our_pk); // See if there are any new swarms, because if there are, we might need to push messages to them @@ -105,48 +122,92 @@ SwarmEvents Swarm::derive_swarm_events(const swarms_t& swarms) const { } SwarmEvents Swarm::update_swarms( - swarms_t&& swarms, const std::map& new_contacts) { + uint64_t height, + swarms_t&& swarms, + const std::map& new_contacts) { std::lock_guard lock{network.mut_}; - auto events = derive_swarm_events(swarms); - - if (events.our_swarm_id == 
INVALID_SWARM_ID) { - if (cur_swarm_id_ != INVALID_SWARM_ID) - log::warning( - logswarm, - "Leaving swarm {:#018x}: we are no longer an active Service Node", - cur_swarm_id_); - else - log::debug(logswarm, "Still not an active Service Node"); - } else { - - if (cur_swarm_id_ == INVALID_SWARM_ID) - log::info(logswarm, "SN now active, joining swarm {:#018x}", events.our_swarm_id); - else if (cur_swarm_id_ != events.our_swarm_id) - log::info( - logswarm, - "SN moving from swarm {:#018x} to swarm {:#018x}", - cur_swarm_id_, - events.our_swarm_id); - - // The following only make sense if we are active, i.e. still in a swarm - - if (events.dissolved) - log::info(logswarm, "Our swarm ({:#018x}) got DISSOLVED!", cur_swarm_id_); + auto events = derive_swarm_events(height, swarms); + if (db_was_initially_empty_with_swarm_id == INVALID_SWARM_ID) + db_was_initially_empty_with_swarm_id = events.our_swarm_id; - for (const auto& pk : events.new_swarm_members) { + if (events.our_swarm_id != INVALID_SWARM_ID) { + for (const auto& pk : events.new_swarm_members) log::info(logswarm, "New SN joining our swarm: {}", pk); - pending_new_members_.emplace(pk, std::chrono::steady_clock::now()); - } for (auto swarm : events.new_swarms) log::info(logswarm, "New network swarm: {}", swarm); - members_ = events.our_swarm_members; + // Remove members that are no longer in the swarm from our runtime state + for (auto it = members_.begin(); it != members_.end();) { + if (events.our_swarm_members.find(it->first) == events.our_swarm_members.end()) + it = members_.erase(it); + else + it++; + } + + // TODO: Remove the versions checks below after everyone migrates their SQL DB to v1. The + // version checks gate the new behaviour where this SS will request a dump of the swarm + // member's DB to synchronise new messages. 
+ // + // When a SS upgrades to this version, their DB is initially set to v0 and all the prior + // active service nodes that upgrade will have the chain synchronised and their SS's sitting + // in the correct swarm. We do _not_ want those storage servers to, on upgrade, request a DB + // dump of all the messages from each swarm peer as they are (presumably) relatively synced. + // + // The SS's on v0 don't persist the swarm state to the DB, so on startup they always + // re-bootstrap the state of their swarms. This populates the new-swarm-members array and + // hence triggers the extraneous swarm dump. + // + // The version gate protects against that happening to all the individual nodes on upgrade. + // Once all v0 SS's upgrade, the DB will be marked v1. From that point, swarms are persisted + // onto disk and so any SS that appears in the new-swarm-members array is _actually_ a new + // SS and we _should_ request a DB dump from them to synchronise messages they might have + // for us. + // + // New incoming nodes in general are going to end up having 0 messages for us if they are + // joining the network for the first time. + // + // If we are joining a swarm, then, all the members of the swarm are in the + // new-swarm-members array and we will request a DB dump from them. + // + // In a swarm dissolving case, then, these new nodes will have a chunk of messages in the + // adjacent message space that belong to this swarm they are merging into. That is handled + // here. + + // Add members from the swarm that are missing from our runtime state and request a DB dump + // from them to ensure we have all the messages they have that we don't. 
+ for (auto it : events.new_swarm_members) { + auto& pair = members_[it]; + if (!did_swarm_space_check && _db.had_swarm_state_on_open()) { + if (pair.our_ss_requested_db_dump == SwarmRequestedDBDump::Nil) + pair.our_ss_requested_db_dump = SwarmRequestedDBDump::NeedsToRequest; + } + } + + did_swarm_space_check = true; + + // If the DB was empty on startup then we mark all swarm members as peers that we need to + // request a DB dump from. Note we only do this if the swarm matches the initial swarm we + // were in when the DB was queried. We might have changed swarms since startup, in which + // case, the above branch will already initiate a DB dump request for us. + // + // This also covers the case where someone drops the messages table and restarts the SS, we + // need to resync all the messages from everyone in the swarm. + if (db_was_initially_empty_with_swarm_id == events.our_swarm_id && + !db_was_initially_empty_handled) { + db_was_initially_empty_handled = true; + for (auto& it : members_) { + if (it.second.our_ss_requested_db_dump == SwarmRequestedDBDump::Nil) { + it.second.our_ss_requested_db_dump = SwarmRequestedDBDump::NeedsToRequest; + } + } + } } cur_swarm_id_ = events.our_swarm_id; + _db.update_current_swarm(cur_swarm_id_); network.update_swarms(std::move(swarms), new_contacts); @@ -158,35 +219,47 @@ bool Swarm::is_pubkey_for_us(const user_pubkey& pk) const { return maybe_swarm && cur_swarm_id_ == *maybe_swarm; } -std::set Swarm::members() const { +std::map Swarm::members() const { std::shared_lock lock{network.mut_}; return members_; } // Returns a copy of all the other members of this swarm, not including this node. 
-std::set Swarm::peers() const { +std::map Swarm::peers() const { auto peers = members(); peers.erase(our_pk); return peers; } -bool Swarm::is_member(const crypto::legacy_pubkey& pk) const { +std::optional Swarm::is_member(const crypto::legacy_pubkey& pk) const { std::shared_lock lock{network.mut_}; - return members_.count(pk); + std::optional result; + if (const auto& it = members_.find(pk); it != members_.end()) + result = it->second; + return result; } -bool Swarm::is_member(const crypto::x25519_pubkey& pk) const { +std::optional Swarm::is_member(const crypto::x25519_pubkey& pk) const { std::shared_lock lock{network.mut_}; + std::optional result; if (auto lpk = network.contacts.lookup(pk)) - return members_.count(*lpk); - return false; + result = is_member(*lpk); + return result; } -bool Swarm::is_member(const crypto::ed25519_pubkey& pk) const { +std::optional Swarm::is_member(const crypto::ed25519_pubkey& pk) const { std::shared_lock lock{network.mut_}; + std::optional result; if (auto lpk = network.contacts.lookup(pk)) - return members_.count(*lpk); - return false; + result = is_member(*lpk); + return result; +} + +SwarmMemberState* Swarm::is_member_locked(const crypto::legacy_pubkey& pk) { + SwarmMemberState* result = nullptr; + if (auto it = members_.find(pk); it != members_.end()) + result = &it->second; + return result; } size_t Swarm::size() const { @@ -194,54 +267,41 @@ size_t Swarm::size() const { return members_.size(); } -std::set Swarm::extract_pending_members() { +std::set Swarm::extract_contact_pending_members() { std::lock_guard lock{network.mut_}; std::set result; auto now = std::chrono::steady_clock::now(); - for (auto it = pending_new_members_.begin(); it != pending_new_members_.end();) { - auto& [pk, when] = *it; - if (!members_.count(pk)) { - // No longer in our swarm - it = pending_new_members_.erase(it); + for (auto it = members_.begin(); it != members_.end(); it++) { + SwarmMemberState& state = it->second; + if (state.status != 
SwarmMemberStatus::ContactDetailsPending) continue; - } - - if (when && *when <= now) { - *when = now + NEW_SWARM_MEMBER_RETRY; + std::chrono::steady_clock::time_point& next_retry = + it->second.check_contact_info_next_retry; + if (now >= next_retry) { + next_retry = now + NEW_SWARM_MEMBER_RETRY; + const crypto::legacy_pubkey& pk = it->first; result.insert(pk); } - ++it; } return result; } -std::set Swarm::extract_ready_members() { +std::set Swarm::extract_contacts_needing_db_dump() { std::lock_guard lock{network.mut_}; std::set result; - for (auto it = pending_new_members_.begin(); it != pending_new_members_.end();) { - auto& [pk, when] = *it; - if (!members_.count(pk)) { - // No longer in our swarm - it = pending_new_members_.erase(it); - } else if (!when) { - // Found one that is marked ready, so steal it: - result.insert(pk); - it = pending_new_members_.erase(it); - } else { - ++it; + for (auto& it : members_) { + if (it.second.status == SwarmMemberStatus::Ready) { + const crypto::legacy_pubkey& pk = it.first; + if (it.second.their_ss_needs_db_dump) { + it.second.their_ss_needs_db_dump = false; + result.insert(pk); + } } } return result; } - -void Swarm::set_member_ready(const crypto::legacy_pubkey& pk) { - std::lock_guard lock{network.mut_}; - if (auto it = pending_new_members_.find(pk); it != pending_new_members_.end()) - it->second = std::nullopt; -} - } // namespace oxenss::snode diff --git a/oxenss/snode/swarm.h b/oxenss/snode/swarm.h index 91dd0fafa..0876ccb8b 100644 --- a/oxenss/snode/swarm.h +++ b/oxenss/snode/swarm.h @@ -2,10 +2,10 @@ #include #include -#include #include "network.h" #include "oxenss/crypto/keys.h" +#include "oxenss/storage/database.hpp" namespace oxenss::snode { @@ -28,66 +28,120 @@ struct SwarmEvents { std::set our_swarm_members; }; +enum struct SwarmMemberStatus { + // Pubkeys of new members into our swarm who we haven't yet established communications with; + // once we do, we push all our swarm's messages to them. 
+ ContactDetailsPending, + Ready, +}; + +enum struct SwarmRequestedDBDump { + Nil, + NeedsToRequest, + RequestUnderway, + Done, +}; + +struct SwarmMemberState { + SwarmMemberStatus status; + + // Flags for if our storage server needs to initiate a request to receive a DB dump from this + // member. 'Nil' if no action is to be taken, otherwise this flag transition from + // 'NeedsToRequest' to 'RequestUnderway' to 'Done' via the outgoing data ready handshake. + SwarmRequestedDBDump our_ss_requested_db_dump; + + // Set if this swarm member has requested a DB dump from us in the data ready handshake. If set + // they are assumed to not have any of the messages for the swarm yet so a full DB dump will be + // initiated for messages we own that belong to the swarm when the 'check new members' routine + // occurs. + bool their_ss_needs_db_dump; + + // The earliest timestamp at which the swarm will check if they have received contact + // information for this member yet and can send them data. Only utilised when status is + // 'ContactDetailsPending' before transitioning to 'ContactDetailsReady' when the contact + // detail has been confirmed. + std::chrono::steady_clock::time_point check_contact_info_next_retry; +}; + // How often we wait, after returning a pending new member, before we return the member again from // `extract_new_members()`. constexpr auto NEW_SWARM_MEMBER_RETRY = 30s; class Swarm { + // Extract relevant information from incoming swarm composition. + SwarmEvents derive_swarm_events(uint64_t height, const swarms_t& swarms) const; + + friend class ServiceNode; + + std::map + members_; // includes `our_pk`, when we are in a swarm. + swarm_id_t cur_swarm_id_ = INVALID_SWARM_ID; - std::set members_; // includes `our_pk`, when we are in a swarm. + // Track which swarm we were set to when we determined that the DB was empty. 
This helps track + // which set of peers we should attempt to request a DB dump from since swarms may change during + // that asynchronous process. If the swarm does change, the act of joining a new swarm triggers + // a DB dump which invalidates the need to request a DB dump from our initial but now, + // irrelevant swarm peers, identified by this swarm ID. + // + // It is important to remember this on startup because if you were active, you may start + // receiving messages before the server contacts peers to request a swarm DB dump to synchronise + // messages which would seed the database and checking this later would fail. + swarm_id_t db_was_initially_empty_with_swarm_id = INVALID_SWARM_ID; - // Pubkeys of new members into our swarm who we haven't yet established communications with; - // once we do, we push all our swarm's messages to them. The value is the earliest timestamp at - // which we should next try contacting them, or nullopt if we have confirmed contact and can now - // send the data. - std::unordered_map> - pending_new_members_; + // Flag that stops the DB initially empty w/ swarm ID from executing more than once. + bool db_was_initially_empty_handled = false; - // Extract relevant information from incoming swarm composition. - SwarmEvents derive_swarm_events(const swarms_t& swarms) const; + Database& _db; - public: - Network& network; - const crypto::legacy_pubkey our_pk; + bool did_swarm_space_check = false; - Swarm(Network& network, const crypto::legacy_pubkey& our_pk) : - network{network}, our_pk{our_pk} {} + public: + Swarm(Network& network, const crypto::legacy_pubkey& our_pk, Database& db) : + _db(db), network{network}, our_pk{our_pk} {} ~Swarm(); + Network& network; + + const crypto::legacy_pubkey our_pk; + /// Update swarm state; this takes care of updating both this swarm itself, and propagates the /// general network swarm changes to the Network object (including contacts) as well. 
SwarmEvents update_swarms( - swarms_t&& swarms, const std::map& new_contacts); + uint64_t height, + swarms_t&& swarms, + const std::map& new_contacts); bool is_pubkey_for_us(const user_pubkey& pk) const; // Returns a copy of all the members of this swarm, including this node. - std::set members() const; + std::map members() const; // Returns a copy of all the other members of this swarm, not including this node. - std::set peers() const; + std::map peers() const; - // Returns true if the given pubkey is recognized as a member of this swarm. - bool is_member(const crypto::legacy_pubkey& pk) const; - bool is_member(const crypto::x25519_pubkey& pk) const; - bool is_member(const crypto::ed25519_pubkey& pk) const; + // Returns the swarm member's state if the given pubkey is recognized as a member of this swarm. + std::optional is_member(const crypto::legacy_pubkey& pk) const; + std::optional is_member(const crypto::x25519_pubkey& pk) const; + std::optional is_member(const crypto::ed25519_pubkey& pk) const; + + // Returns the underlying swarm member's state. Returns a null pointer if 'pk' is not a member + // in your swarm. Caller must hold a lock on the network mutex to call this and the pointer is + // only valid whilst that lock remains held. + SwarmMemberState* is_member_locked(const crypto::legacy_pubkey& pk); // Returns the size of this swarm (including this node). size_t size() const; // Resets the timer and returns the pubkeys of any new swarm members that are due to be - // contacted to push swarm messages to. - std::set extract_pending_members(); - - // Marks a pending member as ready, so that it is returned by the next call to - // `extract_ready_members()`, and is no longer returned by `extract_pending_members()`. - void set_member_ready(const crypto::legacy_pubkey& pk); + // contacted to establish liveness in prep for transitioning to a contact that we can push swarm + // messages to. 
+ std::set extract_contact_pending_members(); - // Extracts any "ready" members (that is, those that were pending and then marked ready with - // `set_member_ready`), returning them and removing them from the pending members list. - std::set extract_ready_members(); + // Returns the pubkeys of any new swarm members that have joined that we now have contact + // details for, mark them as ready and need a dump of the DB. + std::set extract_contacts_needing_db_dump(); swarm_id_t our_swarm_id() const { std::shared_lock lock{network.mut_}; diff --git a/oxenss/storage/database.cpp b/oxenss/storage/database.cpp index 3b91fb286..0b92c0f55 100644 --- a/oxenss/storage/database.cpp +++ b/oxenss/storage/database.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -18,6 +19,9 @@ #include #include #include +#include "oxenc/bt_serialize.h" +#include "oxenc/bt_value.h" +#include "oxenss/crypto/keys.h" #include #include @@ -230,6 +234,34 @@ namespace { } // namespace +user_pubkey load_pubkey(uint8_t type, std::string pk) { + return {type, std::move(pk)}; +} + +void sqlite_swarm_space( + sqlite3_context* sqlite_context, [[maybe_unused]] int argc, sqlite3_value** argv, bool hi) { + assert(argc == 1); + assert(sqlite3_value_bytes(argv[0])); + auto* key_blob = sqlite3_value_blob(argv[0]); + auto pubkey = load_pubkey(0 /* irrelevant */, {reinterpret_cast(key_blob), 32}); + auto swarm_space = pubkey_to_swarm_space(pubkey); + + if (hi) + swarm_space = swarm_space >> 32; + else + swarm_space = swarm_space & 0xffffffff; + + sqlite3_result_int64(sqlite_context, swarm_space); +} + +void sqlite_swarm_space_hi(sqlite3_context* sqlite_context, int argc, sqlite3_value** argv) { + sqlite_swarm_space(sqlite_context, argc, argv, true); +} + +void sqlite_swarm_space_lo(sqlite3_context* sqlite_context, int argc, sqlite3_value** argv) { + sqlite_swarm_space(sqlite_context, argc, argv, false); +} + class DatabaseImpl { public: oxenss::Database& parent; @@ -245,6 
+277,11 @@ class DatabaseImpl { SQLite::OPEN_READWRITE | (initialize ? SQLite::OPEN_CREATE : 0) | SQLite::OPEN_NOMUTEX, SQLite_busy_timeout.count()} { + + // initialize sqlite application-defined functions (must be set up per-connection). + db.createFunction("func_swarm_space_hi", 1, true, nullptr, &sqlite_swarm_space_hi); + db.createFunction("func_swarm_space_lo", 1, true, nullptr, &sqlite_swarm_space_lo); + // Don't fail on these because we can still work even if they fail if (int rc = db.tryExec("PRAGMA journal_mode = WAL"); rc != SQLITE_OK) log::error(logcat, "Failed to set journal mode to WAL: {}", sqlite3_errstr(rc)); @@ -283,6 +320,8 @@ class DatabaseImpl { } void initialize_database() { + parent._had_swarm_state_on_open = db.tableExists("state_kv"); + if (!db.tableExists("owners")) { create_schema(); } @@ -332,8 +371,119 @@ CREATE TRIGGER IF NOT EXISTS revoked_autoclean )"); } - views_triggers_indices(); + if (!parent._had_swarm_state_on_open) { + log::info( + logcat, + "Upgrading database schema: adding swarm space cache, runtime state, " + "retryable requests, and public namespace unique constraint"); + + // swarm space is 64-bit unsigned, which means unfortunately we can't do queries + // on it with arithmetic properly (sqlite INTEGER is 64-bit signed). As such, we + // store it as two separate columns so we can query on it. + // + // The added trigger will automatically populate these two columns on insert, and the + // existing rows will have these columns populated by the UPDATE query after this. 
+ db.exec(R"( +ALTER TABLE owners ADD COLUMN swarm_space_hi INTEGER NOT NULL DEFAULT -1; +ALTER TABLE owners ADD COLUMN swarm_space_lo INTEGER NOT NULL DEFAULT -1; + +CREATE TRIGGER swarm_space_trigger +AFTER INSERT ON owners +FOR EACH ROW +WHEN NEW.swarm_space_hi = -1 OR NEW.swarm_space_lo = -1 +BEGIN + UPDATE owners SET + swarm_space_hi = func_swarm_space_hi(NEW.pubkey), swarm_space_lo = func_swarm_space_lo(NEW.pubkey) + WHERE owners.id = NEW.id; +END; + )"); + + db.exec(R"( +UPDATE owners +SET swarm_space_hi = func_swarm_space_hi(pubkey), +swarm_space_lo = func_swarm_space_lo(pubkey) +WHERE swarm_space_hi = -1; + )"); + + db.exec(R"( +CREATE TABLE retry_requests ( + id INTEGER PRIMARY KEY, + command TEXT NOT NULL, + payload BLOB NOT NULL, + created DOUBLE PRECISION NOT NULL DEFAULT (unixepoch('now', 'subsec')) +); + +CREATE TABLE retry_pubkeys ( + id INTEGER PRIMARY KEY, + pubkey BLOB NOT NULL, + UNIQUE(pubkey) +); +CREATE TABLE retry_node_requests ( + id INTEGER PRIMARY KEY, + rr_id INTEGER NOT NULL REFERENCES retry_requests(id) ON DELETE CASCADE, + pk_id INTEGER NOT NULL REFERENCES retry_pubkeys(id) ON DELETE CASCADE, + next_retry DOUBLE PRECISION NOT NULL, + UNIQUE(rr_id, pk_id) +); + +CREATE INDEX retry_node_requests_pk_idx ON retry_node_requests(pk_id); + +CREATE VIEW retry_node_reqs AS + SELECT retry_node_requests.id AS rr_id, retry_requests.command, retry_requests.payload, + retry_pubkeys.pubkey AS pubkey, next_retry + FROM retry_node_requests + JOIN retry_requests ON retry_node_requests.rr_id = retry_requests.id + JOIN retry_pubkeys ON pk_id = retry_pubkeys.id; + +CREATE TRIGGER retry_node_add +INSTEAD OF INSERT ON retry_node_reqs +BEGIN + -- Allows insertion into the view (with the raw pubkey value) to automatically do the pubkey + -- lookup (with autovivification) for you. 
+ INSERT OR IGNORE INTO retry_pubkeys (pubkey) VALUES (NEW.pubkey); + INSERT INTO retry_node_requests (rr_id, pk_id, next_retry) + VALUES (NEW.rr_id, (SELECT id FROM retry_pubkeys WHERE retry_pubkeys.pubkey = NEW.pubkey), NEW.next_retry); +END; + +CREATE TRIGGER rr_cleanup +AFTER DELETE ON retry_node_requests +BEGIN + -- After deleting a node request record this trigger handles cleaning up any pubkeys or request + -- commands that are no longer referenced. + DELETE FROM retry_pubkeys + WHERE id = OLD.pk_id + AND NOT EXISTS ( + SELECT 1 FROM retry_node_requests WHERE pk_id = OLD.pk_id + ); + DELETE FROM retry_requests + WHERE id = OLD.rr_id + AND NOT EXISTS ( + SELECT 1 FROM retry_node_requests WHERE rr_id = OLD.rr_id + ); +END; + +-- Generic key->value store for the database +-- in future, we may explicitly require TEXT for keys, but arbitrary type for values. +-- store arbitrary persistent state, e.g. which swarm were we in before restart +CREATE TABLE state_kv ( + key TEXT NOT NULL, + value TEXT, + UNIQUE(key) +); + +-- public namespaces are at most used for testing before this migration, so clear them before +-- adding the unique owner/namespace index +DELETE FROM messages WHERE namespace < 0 AND namespace % 20 = -1; + +CREATE UNIQUE INDEX message_outbox_singleton +ON messages(owner, namespace) +WHERE namespace < 0 AND namespace % 20 = -1; + + )"); + } + + views_triggers_indices(); log::info(logcat, "Database setup complete"); } @@ -462,6 +612,9 @@ CREATE INDEX IF NOT EXISTS messages_expiry ON messages(expiry); CREATE INDEX IF NOT EXISTS messages_owner ON messages(owner, namespace, timestamp); CREATE INDEX IF NOT EXISTS messages_hash ON messages(hash); +CREATE INDEX IF NOT EXISTS owners_swarm_hi ON owners(swarm_space_hi); +CREATE INDEX IF NOT EXISTS owners_swarm_lo ON owners(swarm_space_lo); + CREATE VIEW IF NOT EXISTS owned_messages AS SELECT owners.id AS oid, type, pubkey, messages.id AS mid, hash, namespace, timestamp, expiry, data FROM messages JOIN owners 
ON messages.owner = owners.id; @@ -507,8 +660,6 @@ DROP TRIGGER IF EXISTS owned_messages_upsert; auto prepared_get(const std::string& query, const Bind&... bind) { return exec_and_get(prepared_st(query), bind...); } - - user_pubkey load_pubkey(uint8_t type, std::string pk) { return {type, std::move(pk)}; } }; Database::Database(std::filesystem::path db_path) : db_path_{std::move(db_path)} { @@ -623,15 +774,20 @@ int64_t Database::get_used_bytes() { impl->prepared_get("PRAGMA freelist_count") * impl->page_size; } -static std::optional get_message(DatabaseImpl& impl, SQLite::Statement& st) { +std::optional Database::retrieve_by_hash(const std::string& msg_hash) { + auto impl = get_impl(false); + auto st = impl->prepared_st( + "SELECT hash, type, pubkey, namespace, timestamp, expiry, data" + " FROM owned_messages WHERE hash = ?"); + st->bindNoCopy(1, msg_hash); std::optional msg; - while (st.executeStep()) { + while (st->executeStep()) { assert(!msg); auto [hash, otype, opubkey, ns, ts, exp, data] = get( st); msg.emplace( - impl.load_pubkey(otype, std::move(opubkey)), + load_pubkey(otype, std::move(opubkey)), std::move(hash), ns, from_epoch_ms(ts), @@ -641,26 +797,6 @@ static std::optional get_message(DatabaseImpl& impl, SQLite::Statement& return msg; } -std::optional Database::retrieve_random() { - clean_expired(); // *Must* be before the below get_impl because otherwise the read-only impl - // would deadlock with the clean_expired write=true get_impl(). 
- auto impl = get_impl(false); - auto st = impl->prepared_st( - "SELECT hash, type, pubkey, namespace, timestamp, expiry, data" - " FROM owned_messages " - " WHERE mid = (SELECT id FROM messages ORDER BY RANDOM() LIMIT 1)"); - return get_message(*impl, st); -} - -std::optional Database::retrieve_by_hash(const std::string& msg_hash) { - auto impl = get_impl(false); - auto st = impl->prepared_st( - "SELECT hash, type, pubkey, namespace, timestamp, expiry, data" - " FROM owned_messages WHERE hash = ?"); - st->bindNoCopy(1, msg_hash); - return get_message(*impl, st); -} - StoreResult Database::store(const message& msg, std::chrono::system_clock::time_point* expiry) { auto impl = get_impl(true); @@ -679,17 +815,6 @@ StoreResult Database::store(const message& msg, std::chrono::system_clock::time_ owner_id = impl->prepared_get( "INSERT INTO owners (pubkey, type) VALUES (?, ?) RETURNING id", msg.pubkey); - // When storing to a public namespace we clear anything there (except for a duplicate, to - // avoid unnecessary storage churn). - if (is_public_outbox_namespace(msg.msg_namespace)) { - impl->prepared_exec( - "DELETE FROM messages" - " WHERE owner = ? AND namespace = ? 
AND hash != ?", - owner_id, - msg.msg_namespace, - msg.hash); - } - auto new_exp = to_epoch_ms(msg.expiry); if (auto existing = exec_and_maybe_get( @@ -706,15 +831,25 @@ StoreResult Database::store(const message& msg, std::chrono::system_clock::time_ if (expiry) *expiry = from_epoch_ms(exp); } else { - impl->prepared_exec( + auto rows = impl->prepared_exec( "INSERT INTO messages (owner, hash, namespace, timestamp, expiry, data)" - " VALUES (?, ?, ?, ?, ?, ?)", + " VALUES (?, ?, ?, ?, ?, ?)" + " ON CONFLICT (owner, namespace) WHERE namespace < 0 AND namespace % 20 = -1" + " DO UPDATE SET" + " hash = EXCLUDED.hash, timestamp = EXCLUDED.timestamp," + " expiry = EXCLUDED.expiry, data = EXCLUDED.data" + " WHERE EXCLUDED.timestamp > messages.timestamp", owner_id, msg.hash, msg.msg_namespace, to_epoch_ms(msg.timestamp), to_epoch_ms(msg.expiry), blob_binder{msg.data}); + + // did not insert, which means public namespace and not newer + if (rows == 0) + return StoreResult::Obsolete; + ret = StoreResult::New; if (expiry) @@ -768,7 +903,14 @@ void Database::bulk_store(const std::vector& items) { auto insert_message = impl->prepared_st( "INSERT INTO messages (owner, hash, namespace, timestamp, expiry, data)" " VALUES (?, ?, ?, ?, ?, ?)" - " ON CONFLICT DO NOTHING"); + " ON CONFLICT (hash)" + " DO UPDATE SET" + " expiry = MAX(EXCLUDED.expiry, messages.expiry)" + " ON CONFLICT (owner, namespace) WHERE namespace < 0 AND namespace % 20 = -1" + " DO UPDATE SET" + " hash = EXCLUDED.hash, timestamp = EXCLUDED.timestamp," + " expiry = EXCLUDED.expiry, data = EXCLUDED.data" + " WHERE EXCLUDED.timestamp > messages.timestamp"); for (auto& m : items) { if (!m.pubkey) @@ -869,7 +1011,7 @@ std::vector Database::retrieve_all() { get( st); results.emplace_back( - impl->load_pubkey(type, pubkey), + load_pubkey(type, pubkey), std::move(hash), ns, from_epoch_ms(ts), @@ -1191,4 +1333,166 @@ void oxenss::Database::test_suite_block_for(std::chrono::milliseconds duration) 
std::this_thread::sleep_for(duration); } +int64_t Database::add_retry_request( + const crypto::legacy_pubkey& key, + const std::string& cmd, + const std::string& payload, + int64_t req_id) { + auto impl = get_impl(/*write =*/true); + + // insert into request table if not present + if (req_id == 0) { + req_id = impl->prepared_get( + "INSERT INTO retry_requests (command, payload) values (?,?) RETURNING id", + cmd, + payload); + } + + // first retry 5 seconds after insertion, subsequent retries will be 60 seconds after the last. + impl->prepared_exec( + "INSERT INTO retry_node_reqs (rr_id, pubkey, next_retry) VALUES(?, ?, unixepoch('now', " + "'subsec') + 5)", + req_id, + key.str()); + + return req_id; +} + +void Database::foreach_ready_retry_request(std::function< + void(const crypto::legacy_pubkey& key, + const std::string& cmd, + const std::string& payload, + int64_t req_id)> callback) { + auto impl = get_impl(/*write =*/true); + + auto stmt = impl->prepared_st( + "SELECT * from retry_node_reqs WHERE next_retry < unixepoch('now', 'subsec')"); + + using sql_duration = std::chrono::duration>; + double now = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + // retry 60 seconds after this retry. Initial retries are staggered (5sec after timeout), + // but it doesn't seem useful to stagger here. Further, it would be a pain to do so after + // restart. Could update this time if/when the retry fails, but here seems more convenient. 
+ auto next_time = now + 60; + while (stmt->executeStep()) { + auto [req_id, key_str, cmd, payload, next_retry] = + get(stmt); + auto key = crypto::legacy_pubkey::from_bytes(key_str); + impl->prepared_exec("UPDATE retry_node_requests SET next_retry = ?", next_time); + + callback(key, cmd, payload, req_id); + } +} + +int64_t Database::retry_request_count() { + auto impl = get_impl(/*write =*/false); + return impl->prepared_get("SELECT COUNT(*) from retry_node_reqs"); +} + +void Database::foreach_swarm_message( + std::function&)> callback, + uint64_t lower_bound, + uint64_t upper_bound, + bool zero_inclusive) { + + if (lower_bound > upper_bound) { + foreach_swarm_message(callback, lower_bound, std::numeric_limits::max()); + foreach_swarm_message(callback, 0, upper_bound, /*zero_inclusive=*/true); + return; + } + + auto impl = get_impl(/*write =*/false); + + constexpr size_t batch_size = 100; + + std::optional statement; + + // weird case where there exists exactly one swarm, which should be impossible + if (lower_bound == upper_bound) { + statement = SQLite::Statement{ + impl->db, + "SELECT type, pubkey, hash, namespace, timestamp, expiry, data" + " FROM owned_messages ORDER BY mid"}; + } else { + // there's probably a better way to do this, but it should be fine + std::string query = R"( +SELECT type, pubkey, hash, namespace, timestamp, expiry, data +FROM owned_messages +JOIN owners ON oid = id +WHERE + )"; + query += R"( + (owners.swarm_space_hi >{0} ?1 OR (owners.swarm_space_hi == ?1 AND owners.swarm_space_lo >{0} ?2)) + AND + (owners.swarm_space_hi <= ?3 OR (owners.swarm_space_hi == ?3 AND owners.swarm_space_lo <= ?4)) +ORDER BY mid; + )"_format(zero_inclusive ? 
"=" : ""); + + statement = SQLite::Statement{impl->db, query}; + + int pos = 1; + statement->bind(pos++, (int64_t)(lower_bound >> 32)); + statement->bind(pos++, (int64_t)(lower_bound & 0xffffffff)); + statement->bind(pos++, (int64_t)(upper_bound >> 32)); + statement->bind(pos++, (int64_t)(upper_bound & 0xffffffff)); + } + + auto& st = *statement; + std::vector messages; + while (st.executeStep()) { + auto [type, pubkey, hash, ns, ts, exp, data] = + get( + st); + messages.emplace_back( + load_pubkey(type, pubkey), + std::move(hash), + ns, + from_epoch_ms(ts), + from_epoch_ms(exp), + std::move(data)); + if (messages.size() >= batch_size) { + callback(messages); + messages.clear(); + } + } + if (messages.size()) + callback(messages); +} + +void Database::remove_node_retry_request(int64_t req_id) { + auto impl = get_impl(/*write =*/true); + impl->prepared_exec("DELETE FROM retry_node_reqs WHERE id = ?", req_id); +} + +void Database::remove_expired_retry_requests(std::chrono::system_clock::time_point now) { + auto impl = get_impl(/*write =*/true); + + // FIXME: retry requests don't have an expiry, so we need to pick a good expiration time + // for these retries. For now, using 4 hours ago. Tests will pass 4 hours from + // now. 
+ impl->prepared_exec("DELETE FROM retry_requests WHERE created < ?", to_epoch_double(now - 4h)); +} + +void Database::update_current_swarm(uint64_t swarm_id) { + auto as_hex = oxenc::bt_serialize(swarm_id); + auto impl = get_impl(/*write =*/true); + impl->prepared_exec( + "INSERT OR REPLACE INTO state_kv (key, value) VALUES ('swarm_id', ?)", as_hex); +} + +std::optional Database::get_current_swarm() { + auto impl = get_impl(/*write =*/false); + try { + auto as_hex = impl->prepared_get( + "SELECT value FROM state_kv WHERE key = 'swarm_id'"); + return oxenc::bt_deserialize(as_hex); + } catch (const std::exception& e) { + return std::nullopt; + } + return std::nullopt; +} + } // namespace oxenss diff --git a/oxenss/storage/database.hpp b/oxenss/storage/database.hpp index 18c2fa78f..324e9af0f 100644 --- a/oxenss/storage/database.hpp +++ b/oxenss/storage/database.hpp @@ -16,6 +16,7 @@ #include #include #include +#include "oxenss/crypto/keys.h" namespace oxenss { @@ -29,9 +30,17 @@ enum class StoreResult { New, // Message did not exist and was inserted. Extended, // Message existed, but the expiry was extended to match the stored timestamp. Exists, // Message exists and already has an expiry >= the stored one. + Obsolete, // Newer message exists and message type is singleton (e.g. public outbox) Full, // Can't insert right now because the database is full. }; +inline std::atomic tmp_init_db_version = 0; + +enum class BlobType { + Swarms, + RetryableRequests, +}; + // Storage database class. class Database { std::stack> impl_pool_; @@ -49,6 +58,12 @@ class Database { // keep track of db full errors so we don't print them on every store std::atomic db_full_counter = 0; + // True if swarm state was already persisted in the database when it was opened. + // On the first swarm update after startup, this prevents spurious DB dump requests + // to peers who only appear as new members because swarm state was not persisted + // in pre-migration databases. 
+ bool _had_swarm_state_on_open = false; + public: // Recommended period for calling clean_expired() static constexpr auto CLEANUP_PERIOD = 10s; @@ -61,6 +76,8 @@ class Database { ~Database(); + bool had_swarm_state_on_open() const { return _had_swarm_state_on_open; } + // if the database is full then print an error only once ever N errors static constexpr int DB_FULL_FREQUENCY = 100; @@ -100,6 +117,11 @@ class Database { // Retrieves all messages. std::vector retrieve_all(); + enum class GetMessageCount { + All, + Owned, // Only messages that belong to this node's swarm + }; + // Return the total number of messages stored int64_t get_message_count(); @@ -122,9 +144,6 @@ class Database { // bound on actual stored size as there may be partially filled pages. int64_t get_used_bytes(); - // Get random message. Returns nullopt if there are no messages. - std::optional retrieve_random(); - // Get message by `msg_hash`, return true if found. Note that this does *not* filter by // pubkey or namespace! std::optional retrieve_by_hash(const std::string& msg_hash); @@ -210,6 +229,50 @@ class Database { // found are not included). std::map get_expiries( const user_pubkey& pubkey, const std::vector& msg_hashes); + + // Adds a request retry to the database, to be retried later. If req_id is specified, this + // is a subsequent failure on the same request. It's not great to leak database table indices + // into the rest of the code if avoidable, but deduplication would be otherwise tedious. + int64_t add_retry_request( + const crypto::legacy_pubkey& key, + const std::string& cmd, + const std::string& payload, + int64_t req_id = 0); + + // executes the provided callback for each request retry in the database which is ready to retry. + // The table id is provided so the callback can call remove_node_retry_request on success. 
+ void foreach_ready_retry_request(std::function< + void(const crypto::legacy_pubkey& key, + const std::string& cmd, + const std::string& payload, + int64_t req_id)>); + + // This is just for the test suite, as using "ready retry requests" as above would require it + // to take several seconds longer to execute, per call. + int64_t retry_request_count(); + + // executes the provided callback for every swarm message (in batches) for the swarm with the + // given swarm space boundaries. The lower bound is exclusive; the upper inclusive. + // if the lower bound is higher than the upper bound (i.e. overflow wrapping), will be called + // recursively on both sides of the overflow. In this case, zero as the lower bound *will* + // be inclusive + void foreach_swarm_message( + std::function&)> callback, + uint64_t lower_bound, + uint64_t upper_bound, + bool zero_inclusive = false); + + // Remove the specified request retry. This is one node's retry request, not the request + // itself -- if no more nodes need the request retried it will be removed as well. + void remove_node_retry_request(int64_t req_id); + + // the `now` argument here only exists for the test suite; do not use it. 
+ void remove_expired_retry_requests( + std::chrono::system_clock::time_point now = std::chrono::system_clock::now()); + + void update_current_swarm(uint64_t swarm_id); + + std::optional get_current_swarm(); }; } // namespace oxenss diff --git a/oxenss/utils/time.hpp b/oxenss/utils/time.hpp index 59c40b6dc..8a096a8b0 100644 --- a/oxenss/utils/time.hpp +++ b/oxenss/utils/time.hpp @@ -9,6 +9,10 @@ inline int64_t to_epoch_ms(std::chrono::system_clock::time_point t) { return std::chrono::duration_cast(t.time_since_epoch()).count(); } +inline double to_epoch_double(std::chrono::system_clock::time_point t) { + return std::chrono::duration{t.time_since_epoch()}.count(); +} + inline std::chrono::system_clock::time_point from_epoch_ms(int64_t t) { return std::chrono::system_clock::time_point{std::chrono::milliseconds{t}}; } diff --git a/unit_test/CMakeLists.txt b/unit_test/CMakeLists.txt index 79eb1db82..6fc1ad8a4 100644 --- a/unit_test/CMakeLists.txt +++ b/unit_test/CMakeLists.txt @@ -19,3 +19,7 @@ target_link_libraries(Test Catch2::Catch2) target_include_directories(Test PRIVATE ..) + +add_executable(migrate_test migrate_test.cpp) +target_link_libraries(migrate_test PRIVATE common storage utils crypto snode rpc server) +target_include_directories(migrate_test PRIVATE ..) diff --git a/unit_test/create_old_db.py b/unit_test/create_old_db.py new file mode 100644 index 000000000..c2be547e5 --- /dev/null +++ b/unit_test/create_old_db.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +""" +Creates a historical test database for exercising schema migrations. +Run this first, then run migrate_test against the output path. 
+ +Usage: python3 create_old_db.py [schema] [output_dir] + schema : which historical schema to create (default: pre-swarm-space) + output_dir : where to write storage.db (default: /tmp/test_migration_{schema}) + +Available schemas: + + pre-swarm-space (as of v2.11.3) + owners(id INTEGER PK, type INTEGER, pubkey BLOB, UNIQUE(pubkey, type)) + messages(id INTEGER PK, hash TEXT UNIQUE, owner→owners, namespace INTEGER, + timestamp INTEGER, expiry INTEGER, data BLOB) + revoked_subaccounts(owner→owners, token BLOB, timestamp INTEGER) + + post-swarm-space (current) + owners: added swarm_space_hi INTEGER, swarm_space_lo INTEGER + (upper/lower 32-bit halves of pubkey_to_swarm_space(); populated on migration + via custom SQLite functions func_swarm_space_hi/lo registered by C++ at open time) + new trigger: swarm_space_trigger auto-populates these on INSERT + new indices: owners_swarm_hi, owners_swarm_lo + messages: public outbox namespaces (namespace < 0 AND namespace % 20 = -1, i.e. -1,-21,-41,…) + cleared entirely, then UNIQUE INDEX message_outbox_singleton added on + (owner, namespace) — enforces singleton behaviour going forward + new tables: retry_requests(id, command, payload, created) + retry_pubkeys(id, pubkey UNIQUE) + retry_node_requests(id, rr_id→retry_requests, pk_id→retry_pubkeys, + next_retry, UNIQUE(rr_id,pk_id)) + new view+triggers: retry_node_reqs (insert view), retry_node_add, rr_cleanup + new table: state_kv(key TEXT UNIQUE, value TEXT) — generic persistent key/value store +""" + +import sqlite3, os, sys, time + + +def create_pre_swarm_space(c): + c.executescript(""" +CREATE TABLE owners ( + id INTEGER PRIMARY KEY, + type INTEGER NOT NULL, + pubkey BLOB NOT NULL, + UNIQUE(pubkey, type) +); + +CREATE TABLE messages ( + id INTEGER PRIMARY KEY, + hash TEXT NOT NULL, + owner INTEGER NOT NULL REFERENCES owners(id), + namespace INTEGER NOT NULL DEFAULT 0, + timestamp INTEGER NOT NULL, + expiry INTEGER NOT NULL, + data BLOB NOT NULL, + UNIQUE(hash) +); + +CREATE TABLE 
revoked_subaccounts ( + owner INTEGER REFERENCES owners(id) ON DELETE CASCADE, + token BLOB NOT NULL, + timestamp INTEGER NOT NULL DEFAULT (CAST((julianday('now') - 2440587.5)*86400000 AS INTEGER)) +); +""") + + # Pubkeys: 32-byte blobs (type prefix stored separately in the type column). + # swarm_space = XOR of four big-endian uint64 chunks of the pubkey bytes. + pk_100 = bytes(31) + bytes([0x64]) # swarm_space=100 (hi=0, lo=100) + pk_1 = bytes(31) + bytes([0x01]) # swarm_space=1 (hi=0, lo=1) + pk_0 = bytes(32) # swarm_space=0 (hi=0, lo=0) + pk_maxu64 = bytes(24) + bytes([0xff]*8) # swarm_space=UINT64_MAX + + now_ms = int(time.time() * 1000) + future_ms = now_ms + 86400_000 + + def ins_owner(pk, t=5): + c.execute("INSERT INTO owners (type, pubkey) VALUES (?, ?)", (t, pk)) + return c.lastrowid + + def ins_msg(owner_id, ns, h, d=b"data"): + c.execute("INSERT INTO messages (hash, owner, namespace, timestamp, expiry, data)" + " VALUES (?, ?, ?, ?, ?, ?)", (h, owner_id, ns, now_ms, future_ms, d)) + + o1 = ins_owner(pk_100) # swarm_space=100 + o2 = ins_owner(pk_1) # swarm_space=1 + o3 = ins_owner(pk_0) # swarm_space=0 + o4 = ins_owner(pk_maxu64) # swarm_space=UINT64_MAX + + # o1: regular namespaces only — all messages survive migration + ins_msg(o1, 0, "o1_ns0") + ins_msg(o1, 5, "o1_ns5") + + # o2: one public outbox message (ns=-1) — deleted by migration + ins_msg(o2, -1, "o2_ns-1") + + # o3: multiple public outbox messages + non-outbox negative ns + ins_msg(o3, -1, "o3_ns-1_a") # deleted (public outbox) + ins_msg(o3, -1, "o3_ns-1_b") # deleted (public outbox, same ns) + ins_msg(o3, -21, "o3_ns-21") # deleted (also public outbox: -21 % 20 = -1) + ins_msg(o3, -2, "o3_ns-2") # survives (-2 % 20 = -2, not public outbox) + + # o4: mix of outbox and non-outbox + ins_msg(o4, -1, "o4_ns-1_a") # deleted + ins_msg(o4, -1, "o4_ns-1_b") # deleted + ins_msg(o4, 10, "o4_ns10") # survives + + +SCHEMAS = { + 'pre-swarm-space': create_pre_swarm_space, +} + +schema = sys.argv[1] if 
len(sys.argv) > 1 else 'pre-swarm-space' +db_dir = sys.argv[2] if len(sys.argv) > 2 else f'/tmp/test_migration_{schema}' +db_path = os.path.join(db_dir, 'storage.db') + +if schema not in SCHEMAS: + print(f"Unknown schema '{schema}'. Available: {', '.join(SCHEMAS)}") + sys.exit(1) + +os.makedirs(db_dir, exist_ok=True) +if os.path.exists(db_path): + os.remove(db_path) + +conn = sqlite3.connect(db_path) +c = conn.cursor() + +SCHEMAS[schema](c) + +conn.commit() + +print(f"=== PRE-MIGRATION STATE ({schema}) ===") +print(f"owners columns : {[r[1] for r in c.execute('PRAGMA table_info(owners)')]}") +print(f"owners : {c.execute('SELECT id, type FROM owners').fetchall()}") +print(f"messages : {[(r[0], r[1]) for r in c.execute('SELECT namespace, hash FROM messages ORDER BY hash')]}") +print(f"\nDatabase written to: {db_path}") +print(f"Now run: ./migrate_test {db_dir}") + +conn.close() diff --git a/unit_test/migrate_test.cpp b/unit_test/migrate_test.cpp new file mode 100644 index 000000000..e6edf4ab6 --- /dev/null +++ b/unit_test/migrate_test.cpp @@ -0,0 +1,22 @@ +// Standalone migration test: open a pre-existing database, trigger migration, print results. 
+#include +#include +#include + +int main(int argc, char* argv[]) { + if (argc < 2) { + std::cerr << "Usage: migrate_test \n"; + return 1; + } + std::string path = argv[1]; + std::cout << "Opening: " << path << "\n"; + try { + oxenss::Database db{path}; + std::cout << "had_swarm_state_on_open: " << db.had_swarm_state_on_open() << "\n"; + std::cout << "Migration complete.\n"; + } catch (const std::exception& e) { + std::cerr << "Error: " << e.what() << "\n"; + return 1; + } + return 0; +} diff --git a/unit_test/serialization.cpp b/unit_test/serialization.cpp index ce250bc56..7e96f6a42 100644 --- a/unit_test/serialization.cpp +++ b/unit_test/serialization.cpp @@ -1,9 +1,11 @@ #include #include #include +#include #include +#include #include #include @@ -64,9 +66,42 @@ TEST_CASE("v1 serialization - batch serialization", "[serialization]") { auto first = serialized.front(); const size_t num_messages = (SERIALIZATION_BATCH_SIZE / (serialized.front().size() - 2)) + 1; msgs = {num_messages, msgs.front()}; - serialized = serialize_messages(msgs.begin(), msgs.end(), 1); + serialized = serialize_messages(msgs.begin(), msgs.end(), SERIALIZATION_VERSION_BT); CHECK(serialized.size() == 1); msgs.push_back(msgs.front()); - serialized = serialize_messages(msgs.begin(), msgs.end(), 1); + serialized = serialize_messages(msgs.begin(), msgs.end(), SERIALIZATION_VERSION_BT); CHECK(serialized.size() == 2); } + +TEST_CASE("v1 serialization - message payload 100MiB", "[serialization]") { + oxenss::user_pubkey pub_key; + REQUIRE(pub_key.load("054368520005786b249bcd461d28f75e560ea794014eeb17fcf6003f37d876783e"s)); + + const std::chrono::system_clock::time_point timestamp{1'622'576'077s}; + oxenss::message base_msg{ + pub_key, + "hash", + oxenss::namespace_id::Default, + timestamp, + timestamp + 24h, + std::string(1 * 1024 * 1024 /*1MiB*/, 'x')}; + std::vector msg_list(100, base_msg); // 100 MiB total + + auto begin = std::chrono::high_resolution_clock::now(); + auto serialized = + 
serialize_messages(msg_list.begin(), msg_list.end(), SERIALIZATION_VERSION_BT); + auto elapsed = std::chrono::high_resolution_clock::now() - begin; + + size_t total_bytes = msg_list.size() * base_msg.data.size(); + std::string total_bytes_str = oxenss::util::get_human_readable_bytes(total_bytes); + double total_gbs = static_cast(total_bytes) / (1024 * 1024 * 1024); + double gbs_per_s = + total_gbs / std::chrono::duration_cast(elapsed).count(); + + fmt::println( + "Messages: {}; Size: {}; Elapsed: {}; Rate: {:.2f} GiB/s", + msg_list.size(), + oxenss::util::get_human_readable_bytes(total_bytes), + std::chrono::duration_cast(elapsed), + gbs_per_s); +} diff --git a/unit_test/storage.cpp b/unit_test/storage.cpp index 96460481b..3ca4b1edf 100644 --- a/unit_test/storage.cpp +++ b/unit_test/storage.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -10,14 +11,19 @@ #include #include +#include "oxenss/utils/time.hpp" using namespace oxenss; using namespace std::literals; struct StorageDeleter { + bool delete_it = true; StorageDeleter() { std::filesystem::remove("storage.db"); } - ~StorageDeleter() { std::filesystem::remove("storage.db"); } + ~StorageDeleter() { + if (delete_it) + std::filesystem::remove("storage.db"); + } }; TEST_CASE("storage - database file creation", "[storage]") { @@ -427,3 +433,60 @@ TEST_CASE("storage - connection pool", "[storage][pool]") { // returned to the pool: CHECK(oxenss::TestSuiteHacks::db_pool_size(storage) == 1 + n_blocked_threads); } + +TEST_CASE("storage - current swarm", "[storage]") { + StorageDeleter fixture; + + Database storage{"."}; + + // new db has no current swarm + CHECK(storage.get_current_swarm() == std::nullopt); + + storage.update_current_swarm(12345); + + CHECK(*(storage.get_current_swarm()) == 12345); +} + +TEST_CASE("storage - retry requests", "[storage]") { + StorageDeleter fixture; + fixture.delete_it = false; + oxenss::crypto::legacy_pubkey pubkey, pubkey2; + 
pubkey.load_from_hex("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"); + pubkey2.load_from_hex("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcde0"); + + Database storage{"."}; + + // first row id = 1 + CHECK(storage.add_retry_request(pubkey, "foo", "bar") == 1); + + // unique on retry id and pubkey + CHECK_THROWS(storage.add_retry_request(pubkey, "foo", "bar", 1)); + // this will silently fail, but also should never be done. + CHECK_THROWS(storage.add_retry_request(pubkey, "foo", "bar", 2)); + + auto req_count = storage.retry_request_count(); + CHECK(req_count == 1); + + storage.add_retry_request(pubkey2, "foo", "bar"); + req_count = storage.retry_request_count(); + CHECK(req_count == 2); + + storage.add_retry_request(pubkey, "bits", "bits"); + req_count = storage.retry_request_count(); + CHECK(req_count == 3); + + std::this_thread::sleep_for(500ms); + // FIXME: "expiry" is currently 4h, this is incredibly arbitrary and should be considered + // further. 
+ auto the_future = std::chrono::system_clock::now() + 4h; + + std::this_thread::sleep_for( + 500ms); // the following insert should *not* be considered "expired" + CHECK_NOTHROW(storage.add_retry_request(pubkey, "fools", "barred") == 4); + req_count = storage.retry_request_count(); + CHECK(req_count == 4); + // remove expired, pretending it's 4h (minus the sleep) from now + CHECK_NOTHROW(storage.remove_expired_retry_requests(the_future)); + req_count = storage.retry_request_count(); + CHECK(req_count == 1); +} diff --git a/unit_test/swarm.cpp b/unit_test/swarm.cpp index b0cf30f22..55c809055 100644 --- a/unit_test/swarm.cpp +++ b/unit_test/swarm.cpp @@ -13,47 +13,54 @@ using namespace std::literals; using ip_ports = std::tuple; +using oxenss::snode::INVALID_SWARM_ID; using oxenss::snode::Network; using oxenss::snode::Swarm; TEST_CASE("swarm - pubkey to swarm space", "[swarm]") { oxenss::user_pubkey pk; REQUIRE(pk.load("053506f4a71324b7dd114eddbf4e311f39dde243e1f2cb97c40db1961f70ebaae8")); - CHECK(Network::pubkey_to_swarm_space(pk) == 17589930838143112648ULL); + CHECK(oxenss::pubkey_to_swarm_space(pk) == 17589930838143112648ULL); REQUIRE(pk.load("05cf27da303a50ac8c4b2d43d27259505c9bcd73fc21cf2a57902c3d050730b604")); - CHECK(Network::pubkey_to_swarm_space(pk) == 10370619079776428163ULL); + CHECK(oxenss::pubkey_to_swarm_space(pk) == 10370619079776428163ULL); REQUIRE(pk.load("03d3511706b8b34f6e8411bf07bd22ba6b2435ca56846fbccf6eb1e166a6cd15cc")); - CHECK(Network::pubkey_to_swarm_space(pk) == 2144983569669512198ULL); + CHECK(oxenss::pubkey_to_swarm_space(pk) == 2144983569669512198ULL); REQUIRE(pk.load("ff0f06693428fca9102a451e3f28d9cc743d8ea60a89ab6aa69eb119470c11cbd3")); - CHECK(Network::pubkey_to_swarm_space(pk) == 9690840703409570833ULL); + CHECK(oxenss::pubkey_to_swarm_space(pk) == 9690840703409570833ULL); REQUIRE(pk.load("05ffba630924aa1224bb930dde21c0d11bf004608f2812217f8ac812d6c7e3ad48")); - CHECK(Network::pubkey_to_swarm_space(pk) == 4532060000165252872ULL); + 
CHECK(oxenss::pubkey_to_swarm_space(pk) == 4532060000165252872ULL); REQUIRE(pk.load("05eeeeeeeeeeeeeeee777777777777777711111111111111118888888888888888")); - CHECK(Network::pubkey_to_swarm_space(pk) == 0); + CHECK(oxenss::pubkey_to_swarm_space(pk) == 0); REQUIRE(pk.load("050123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")); - CHECK(Network::pubkey_to_swarm_space(pk) == 0); + CHECK(oxenss::pubkey_to_swarm_space(pk) == 0); REQUIRE(pk.load("05fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe")); - CHECK(Network::pubkey_to_swarm_space(pk) == 1); + CHECK(oxenss::pubkey_to_swarm_space(pk) == 1); REQUIRE(pk.load("05ffffffffffffffffffffffffffffffffffffffffffffffff7fffffffffffffff")); - CHECK(Network::pubkey_to_swarm_space(pk) == 1ULL << 63); + CHECK(oxenss::pubkey_to_swarm_space(pk) == 1ULL << 63); REQUIRE(pk.load("05000000000000000000000000000000000000000000000000ffffffffffffffff")); - CHECK(Network::pubkey_to_swarm_space(pk) == (uint64_t)-1); + CHECK(oxenss::pubkey_to_swarm_space(pk) == INVALID_SWARM_ID); REQUIRE(pk.load("050000000000000000000000000000000000000000000000000123456789abcdef")); - CHECK(Network::pubkey_to_swarm_space(pk) == 0x0123456789abcdefULL); + CHECK(oxenss::pubkey_to_swarm_space(pk) == 0x0123456789abcdefULL); } +struct StorageDeleter { + StorageDeleter() { std::filesystem::remove("storage.db"); } + ~StorageDeleter() { std::filesystem::remove("storage.db"); } +}; + TEST_CASE("service nodes - pubkey to swarm id") { const auto fake_pk = oxenss::crypto::legacy_pubkey::from_hex( "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"); oxenmq::OxenMQ omq; Network network{omq}; - Swarm swarm{network, fake_pk}; + oxenss::Database db{"."}; // unused here, but required by Swarm + Swarm swarm{network, fake_pk, db}; using oxenss::snode::swarms_t; swarms_t swarms; for (oxenss::snode::swarm_id_t s : {100, 200, 300, 399, 498, 596, 694}) swarms[s]; - swarm.update_swarms(swarms_t{swarms}, {}); + swarm.update_swarms(0, 
swarms_t{swarms}, {}); oxenss::user_pubkey pk; @@ -82,7 +89,8 @@ TEST_CASE("service nodes - pubkey to swarm id") { REQUIRE(pk.load("05000000000000000000000000000000000000000000000000a000000000000000")); CHECK(network.get_swarm_id_for(pk).value() == 100); - // This is the invalid swarm id for swarms, but should still work for a client + // A pubkey whose swarm space == INVALID_SWARM_ID is not a valid swarm id, but *is* a valid + // swarm space value REQUIRE(pk.load("05000000000000000000000000000000000000000000000000ffffffffffffffff")); CHECK(network.get_swarm_id_for(pk).value() == 100); @@ -147,7 +155,7 @@ TEST_CASE("service nodes - pubkey to swarm id") { // *sigh*). oxenss::snode::swarm_id_t wrapped_swarm = (uint64_t)-20; swarms[wrapped_swarm]; - swarm.update_swarms(swarms_t{swarms}, {}); + swarm.update_swarms(0, swarms_t{swarms}, {}); REQUIRE(pk.load("050000000000000000000000000000000000000000000000000000000000000027")); CHECK(network.get_swarm_id_for(pk).value() == swarms.rbegin()->first); REQUIRE(pk.load("050000000000000000000000000000000000000000000000000000000000000028")); @@ -155,14 +163,162 @@ TEST_CASE("service nodes - pubkey to swarm id") { REQUIRE(pk.load("050000000000000000000000000000000000000000000000000000000000000029")); CHECK(network.get_swarm_id_for(pk).value() == swarms.begin()->first); - // The code used to have a broken edge case if we have a swarm at zero and a client at max-u64 - // because of an overflow in how the distance is calculated (the first swarm will be calculated - // as max-u64 away, rather than 1 away), and so the id always maps to the highest swarm (even - // though 0xfff...fe maps to the lowest swarm; the first check here, then, would fail. + // The code used to have a broken edge case if we have a swarm at zero and a client at + // INVALID_SWARM_ID (UINT64_MAX) because of an overflow in how the distance is calculated (the + // first swarm will be calculated as UINT64_MAX away (i.e. 
-1), rather than 1 away), and so the + // id always maps to the highest swarm (even though 0xfff...fe maps to the lowest swarm); the + // first check here, then, would fail. swarms[0]; - swarm.update_swarms(swarms_t{swarms}, {}); + swarm.update_swarms(0, swarms_t{swarms}, {}); REQUIRE(pk.load("05000000000000000000000000000000000000000000000000ffffffffffffffff")); CHECK(network.get_swarm_id_for(pk).value() == 0); REQUIRE(pk.load("05000000000000000000000000000000000000000000000000fffffffffffffffe")); CHECK(network.get_swarm_id_for(pk).value() == 0); } + +TEST_CASE( + "service nodes - swarm id to swarm space, boundaries near 0 and INVALID_SWARM_ID", + "[swarm]") { + const auto fake_pk = oxenss::crypto::legacy_pubkey::from_hex( + "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"); + oxenmq::OxenMQ omq; + Network network{omq}; + oxenss::Database db{"."}; + Swarm swarm{network, fake_pk, db}; + + using oxenss::snode::swarm_id_t; + using oxenss::snode::swarms_t; + + // INVALID_SWARM_ID (UINT64_MAX) cannot be a swarm id; INVALID_SWARM_ID - 1 can. + const swarm_id_t near_max = INVALID_SWARM_ID - 1; + swarms_t swarms; + for (swarm_id_t s : {swarm_id_t{1}, swarm_id_t{100}, swarm_id_t{694}, near_max}) + swarms[s]; + swarm.update_swarms(0, swarms_t{swarms}, {}); + + // swarm 1 (prev=near_max, next=100): + // left_diff = 3 (odd, rounded up to 4); lo = 1 - 2 = INVALID_SWARM_ID (UINT64_MAX) + // right_diff = 99; hi = 50 + // lo == INVALID_SWARM_ID (UINT64_MAX), which is a valid swarm space position. + // Range wraps: (INVALID_SWARM_ID, 50] = [0, 50] since nothing exceeds INVALID_SWARM_ID. 
+ auto b_1 = network.get_swarm_boundaries(1); + REQUIRE(b_1.first == INVALID_SWARM_ID); + REQUIRE(b_1.second == 50); + + // swarm 100 (prev=1, next=694): + // left_diff = 99 (odd, rounded up to 100); lo = 50; right_diff = 594; hi = 397 + auto b_100 = network.get_swarm_boundaries(100); + REQUIRE(b_100.first == 50); + REQUIRE(b_100.second == 397); + + // swarm 694 (prev=100, next=near_max): + // left_diff = 594 (even); lo = 397 + // right_diff = near_max - 694 (large, even); hi = 0x800000000000015A + auto b_694 = network.get_swarm_boundaries(694); + REQUIRE(b_694.first == 397); + REQUIRE(b_694.second == 0x800000000000015AULL); + + // swarm near_max (prev=694, next=1): + // left_diff = near_max - 694 (large, even); lo = 0x800000000000015A + // right_diff = 3; hi = near_max + 1 = INVALID_SWARM_ID (UINT64_MAX) + // hi lands exactly on INVALID_SWARM_ID (UINT64_MAX): a valid swarm space position even + // though it cannot be a swarm id. near_max owns INVALID_SWARM_ID as a swarm space position. + auto b_near_max = network.get_swarm_boundaries(near_max); + REQUIRE(b_near_max.first == 0x800000000000015AULL); + REQUIRE(b_near_max.second == INVALID_SWARM_ID); + + // Shared boundaries + REQUIRE(b_694.second == b_near_max.first); // 0x800000000000015A + REQUIRE(b_near_max.second == + b_1.first); // INVALID_SWARM_ID (UINT64_MAX): near_max's hi == swarm 1's lo +} + +TEST_CASE( + "service nodes - swarm id to swarm space, minimal 0 and INVALID_SWARM_ID - 1", "[swarm]") { + const auto fake_pk = oxenss::crypto::legacy_pubkey::from_hex( + "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"); + oxenmq::OxenMQ omq; + Network network{omq}; + oxenss::Database db{"."}; + Swarm swarm{network, fake_pk, db}; + + using oxenss::snode::swarm_id_t; + using oxenss::snode::swarms_t; + + swarms_t swarms; + for (swarm_id_t s : {swarm_id_t{0}, INVALID_SWARM_ID - 1}) + swarms[s]; + swarm.update_swarms(0, swarms_t{swarms}, {}); + + // Two swarms split the space exactly in half. 
+ // swarm 0: left_diff=2 (even); lo = 0 - 1 = INVALID_SWARM_ID (UINT64_MAX); + // hi = 0x7FFFFFFFFFFFFFFF. + // swarm INVALID_SWARM_ID - 1: lo = 0x7FFFFFFFFFFFFFFF; hi = (INVALID_SWARM_ID - 1) + 1, i.e. + // INVALID_SWARM_ID, which is a valid swarm space position, owned here by this swarm. + auto b_0 = network.get_swarm_boundaries(0); + REQUIRE(b_0.first == INVALID_SWARM_ID); + REQUIRE(b_0.second == 0x7FFFFFFFFFFFFFFFULL); + + auto b_max = network.get_swarm_boundaries(INVALID_SWARM_ID - 1); + REQUIRE(b_max.first == 0x7FFFFFFFFFFFFFFFULL); + REQUIRE(b_max.second == INVALID_SWARM_ID); + + REQUIRE(b_0.second == b_max.first); // shared midpoint + REQUIRE(b_max.second == b_0.first); // shared boundary at INVALID_SWARM_ID (UINT64_MAX) +} + +// A round-trip test against "service nodes - pubkey to swarm id" is not needed here. +// Both get_swarm_boundaries and _find_swarm_for_swarm_space use consistent uint64_t modular +// arithmetic, and the wrapping range case (lo > hi, crossing INVALID_SWARM_ID (UINT64_MAX)) is +// already exercised by swarm 100's boundaries below. INVALID_SWARM_ID is a valid swarm space +// position but is assumed (and enforced elsewhere) to never be a swarm id, so no additional edge +// cases exist.
+TEST_CASE("service nodes - swarm id to swarm space (pubkey range)") {
+    // Verifies the pair returned by network.get_swarm_boundaries() for every swarm id in a small
+    // fixed swarm set, including the boundary that 694 and 100 share across the uint64 wraparound.
+    const auto fake_pk = oxenss::crypto::legacy_pubkey::from_hex(
+            "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef");
+    oxenmq::OxenMQ omq;
+    Network network{omq};
+    oxenss::Database db{"."};  // unused here, but required by Swarm
+    Swarm swarm{network, fake_pk, db};
+
+    using oxenss::snode::swarms_t;
+    swarms_t swarms;
+    for (oxenss::snode::swarm_id_t s : {100, 200, 300, 399, 498, 596, 694})
+        swarms[s];
+    swarm.update_swarms(0, swarms_t{swarms}, {});
+
+    // 694 is the last element, so its successor wraps around to 100 and the two share a boundary:
+    // right_diff = 100 - 694 (uint64 wraparound) = 0xFFFFFFFFFFFFFDAE; hi = 694 + 0x7FFFFFFFFFFFFED7
+    constexpr uint64_t wrap_boundary = 0x800000000000018DULL;
+
+    auto boundaries = network.get_swarm_boundaries(200);
+    REQUIRE(boundaries.first == 150);
+    REQUIRE(boundaries.second == 250);  // swarm 300 loses this tie; 250 is inclusive for swarm 200
+
+    boundaries = network.get_swarm_boundaries(300);
+    REQUIRE(boundaries.first == 250);
+    REQUIRE(boundaries.second == 349);
+
+    boundaries = network.get_swarm_boundaries(399);
+    REQUIRE(boundaries.first == 349);
+    REQUIRE(boundaries.second == 448);
+
+    boundaries = network.get_swarm_boundaries(498);
+    REQUIRE(boundaries.first == 448);   // left_diff=99 (odd, rounded up to 100), so lo = 498 - 50
+    REQUIRE(boundaries.second == 547);  // right_diff=98, so hi = 498 + 49
+
+    boundaries = network.get_swarm_boundaries(596);
+    REQUIRE(boundaries.first == 547);
+    REQUIRE(boundaries.second == 645);
+
+    const auto boundaries_100 = network.get_swarm_boundaries(100);
+    REQUIRE(boundaries_100.first == wrap_boundary);  // lowest swarm: lower bound is the wrap point
+    REQUIRE(boundaries_100.second == 150);
+
+    const auto boundaries_694 = network.get_swarm_boundaries(694);
+    REQUIRE(boundaries_694.first == 645);
+    REQUIRE(boundaries_694.second == wrap_boundary);  // highest swarm: upper bound is the wrap point
+    REQUIRE(boundaries_694.second == boundaries_100.first);  // must be the very same value
+}