diff --git a/CMakeLists.txt b/CMakeLists.txt index dfe23a6..feff092 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -145,6 +145,14 @@ if(LANTERN_BUILD_TESTS) ) add_test(NAME lantern_client_pending COMMAND lantern_client_pending_test) + add_executable(lantern_client_gossip_test tests/unit/test_client_gossip.c) + target_link_libraries(lantern_client_gossip_test PRIVATE lantern) + add_test(NAME lantern_client_gossip COMMAND lantern_client_gossip_test) + + add_executable(lantern_genesis_anchor_test tests/unit/test_genesis_anchor.c) + target_link_libraries(lantern_genesis_anchor_test PRIVATE lantern) + add_test(NAME lantern_genesis_anchor COMMAND lantern_genesis_anchor_test) + add_executable(lantern_genesis_bootstrap_test tests/unit/test_genesis_bootstrap.c) target_link_libraries(lantern_genesis_bootstrap_test PRIVATE lantern) target_compile_definitions( @@ -335,6 +343,7 @@ if(LANTERN_BUILD_TESTS) set(_lantern_ctest_targets lantern_client_vote lantern_client_pending + lantern_genesis_anchor lantern_genesis_bootstrap lantern_validator_selection lantern_rlp diff --git a/include/lantern/core/client.h b/include/lantern/core/client.h index 7f645be..7001763 100644 --- a/include/lantern/core/client.h +++ b/include/lantern/core/client.h @@ -64,6 +64,7 @@ struct lantern_client_options { const char *validator_registry_path; const char *nodes_path; const char *genesis_state_path; + bool use_genesis_state; const char *validator_config_path; const char *node_id; const char *node_key_hex; @@ -83,13 +84,14 @@ struct lantern_client_options { struct libp2p_subscription; struct libp2p_protocol_server; struct lantern_peer_status_entry; +struct lantern_active_blocks_request; struct lantern_pending_block { LanternSignedBlock block; LanternRoot root; LanternRoot parent_root; char peer_text[128]; bool parent_requested; - uint32_t parent_request_failures; + uint64_t parent_requested_ms; uint64_t received_ms; uint32_t backfill_depth; }; @@ -114,6 +116,14 @@ struct lantern_pending_block_list { struct lantern_pending_parent_index parent_index; }; +struct lantern_active_blocks_request { + uint64_t request_id; + char peer_id[128]; + uint64_t started_ms; + uint64_t deadline_ms; + bool timeout_recorded; +}; + struct lantern_agg_proof_cache_entry { LanternRoot data_root; LanternAggregatedSignatureProof proof; @@ -237,6 +247,10 @@ struct lantern_client { struct lantern_peer_status_entry *peer_status_entries; size_t peer_status_count; size_t peer_status_capacity; + struct lantern_active_blocks_request *active_blocks_requests; + size_t active_blocks_request_count; + size_t active_blocks_request_capacity; + uint64_t next_blocks_request_id; pthread_mutex_t status_lock; bool status_lock_initialized; bool debug_disable_block_requests; @@ -314,6 +328,13 @@ int lantern_client_debug_record_vote( const LanternSignedVote *vote, const char *peer_id_text); +int lantern_client_debug_gossip_block( + struct lantern_client *client, + const LanternSignedBlock *block); +int lantern_client_debug_gossip_vote( + struct lantern_client *client, + const LanternSignedVote *vote); + int lantern_client_debug_import_block( struct lantern_client *client, const LanternSignedBlock *block, diff --git a/src/core/client.c b/src/core/client.c index 0904736..5fc5b85 100644 --- a/src/core/client.c +++ b/src/core/client.c @@ -225,7 +225,8 @@ void lantern_client_options_init(struct lantern_client_options *options) options->genesis_config_path = LANTERN_DEFAULT_GENESIS_CONFIG; options->validator_registry_path = LANTERN_DEFAULT_VALIDATOR_REGISTRY; options->nodes_path = LANTERN_DEFAULT_NODES_FILE; - options->genesis_state_path = LANTERN_DEFAULT_GENESIS_STATE; + options->genesis_state_path = NULL; + options->use_genesis_state = false; options->validator_config_path = LANTERN_DEFAULT_VALIDATOR_CONFIG; options->node_id = LANTERN_DEFAULT_NODE_ID; options->node_key_hex = NULL; @@ -788,43 +789,11 @@ static bool client_try_genesis_from_pubkeys(struct lantern_client *client) } -/** - * @brief Attempt genesis creation from an SSZ state snapshot. - * - * Decodes the serialized genesis state if provided in the configuration. - * - * @param client Client with loaded genesis artifacts - * - * @return true on success, false if snapshot is missing or decode fails - * - * @note Thread safety: Must run before concurrent access to the state. - */ -static bool client_try_genesis_from_ssz(struct lantern_client *client) -{ - if (!client->genesis.state_bytes || client->genesis.state_size == 0) - { - return false; - } - - if (lantern_ssz_decode_state( - &client->state, - client->genesis.state_bytes, - client->genesis.state_size) - != 0) - { - return false; - } - - client->genesis_fallback_used = false; - return true; -} - - /** * @brief Attempt genesis creation from the validator registry file. * * Builds the genesis state using pubkeys sourced from the registry when the - * explicit pubkey array or SSZ snapshot is unavailable. + * explicit pubkey array is unavailable. * * @param client Client with loaded genesis registry * @@ -1143,8 +1112,9 @@ static lantern_client_error client_finalize_genesis_state(struct lantern_client /** * @brief Build genesis state using the available artifact priority order. * - * Tries embedded pubkeys first, then SSZ snapshot, and finally the validator - * registry. On success, finalizes validator vote structures. + * Tries embedded pubkeys first and then the validator registry. Lantern no + * longer decodes local genesis.ssz for bootstrap so replay/state roots remain + * deterministic from config/registry inputs. * * @param client Client being initialized * @@ -1160,11 +1130,6 @@ static lantern_client_error client_generate_state_from_genesis(struct lantern_cl return client_finalize_genesis_state(client); } - if (client_try_genesis_from_ssz(client)) - { - return client_finalize_genesis_state(client); - } - if (client_try_genesis_from_registry(client)) { return client_finalize_genesis_state(client); @@ -1746,6 +1711,11 @@ static void shutdown_peer_tracking(struct lantern_client *client) client->peer_status_entries = NULL; client->peer_status_count = 0; client->peer_status_capacity = 0; + free(client->active_blocks_requests); + client->active_blocks_requests = NULL; + client->active_blocks_request_count = 0; + client->active_blocks_request_capacity = 0; + client->next_blocks_request_id = 0; pthread_mutex_unlock(&client->status_lock); } else @@ -1754,6 +1724,11 @@ static void shutdown_peer_tracking(struct lantern_client *client) client->peer_status_entries = NULL; client->peer_status_count = 0; client->peer_status_capacity = 0; + free(client->active_blocks_requests); + client->active_blocks_requests = NULL; + client->active_blocks_request_count = 0; + client->active_blocks_request_capacity = 0; + client->next_blocks_request_id = 0; } pthread_mutex_destroy(&client->status_lock); client->status_lock_initialized = false; @@ -1764,6 +1739,11 @@ static void shutdown_peer_tracking(struct lantern_client *client) client->peer_status_entries = NULL; client->peer_status_count = 0; client->peer_status_capacity = 0; + free(client->active_blocks_requests); + client->active_blocks_requests = NULL; + client->active_blocks_request_count = 0; + client->active_blocks_request_capacity = 0; + client->next_blocks_request_id = 0; } if (client->peer_vote_lock_initialized) diff --git a/src/core/client_debug.c b/src/core/client_debug.c index 7392ef4..e33c2a9 100644 --- a/src/core/client_debug.c +++ b/src/core/client_debug.c @@ -49,6 +49,28 @@ int lantern_client_debug_record_vote( return LANTERN_CLIENT_OK; } +int lantern_client_debug_gossip_block( + struct lantern_client *client, + const LanternSignedBlock *block) +{ + if (!client || !block) + { + return LANTERN_CLIENT_ERR_INVALID_PARAM; + } + return gossip_block_handler(block, NULL, client); +} + +int lantern_client_debug_gossip_vote( + struct lantern_client *client, + const LanternSignedVote *vote) +{ + if (!client || !vote) + { + return LANTERN_CLIENT_ERR_INVALID_PARAM; + } + return gossip_vote_handler(vote, NULL, client); +} + /** * Debug API: Import a block for testing. diff --git a/src/core/client_init.c b/src/core/client_init.c index e8c1aab..b4577d8 100644 --- a/src/core/client_init.c +++ b/src/core/client_init.c @@ -78,10 +78,6 @@ int copy_genesis_paths( { return -1; } - if (set_owned_string(&paths->state_path, options->genesis_state_path) != 0) - { - return -1; - } if (set_owned_string(&paths->validator_config_path, options->validator_config_path) != 0) { return -1; diff --git a/src/core/client_network_internal.h b/src/core/client_network_internal.h index 805f37e..0640dee 100644 --- a/src/core/client_network_internal.h +++ b/src/core/client_network_internal.h @@ -46,7 +46,7 @@ extern "C" { #define LANTERN_PEER_DIAL_INTERVAL_SECONDS 5u /** Maximum concurrent blocks requests per peer */ -#define LANTERN_MAX_BLOCKS_REQUESTS_PER_PEER 2u +#define LANTERN_MAX_BLOCKS_REQUESTS_PER_PEER 1u /** Peer dial timeout in milliseconds */ #define LANTERN_PEER_DIAL_TIMEOUT_MS 4000 @@ -85,10 +85,8 @@ struct lantern_peer_status_entry LanternStatusMessage status; /**< Latest status message from peer */ bool has_status; /**< True if status has been received */ uint64_t last_status_ms; /**< Timestamp of last status message */ - uint32_t blocks_requests_inflight; /**< Count of in-flight block requests */ bool status_request_inflight; /**< True if status request is pending */ bool reqresp_legacy_len; /**< True if peer uses legacy reqresp length framing */ - uint64_t last_blocks_request_ms; /**< Timestamp of last blocks request */ uint32_t consecutive_blocks_failures; /**< Count of consecutive request failures */ uint32_t outstanding_status_requests; /**< Number of outstanding status requests */ uint32_t consecutive_ping_failures; /**< Count of consecutive ping failures */ @@ -102,6 +100,7 @@ struct lantern_peer_status_entry struct block_request_ctx { struct lantern_client *client; /**< Client instance */ + uint64_t request_id; /**< Internal request tracking ID */ peer_id_t peer_id; /**< Peer ID structure */ char peer_text[128]; /**< Peer ID as text */ LanternRoot *roots; /**< Roots being requested */ diff --git a/src/core/client_pending.c b/src/core/client_pending.c index 17797f7..a0dc298 100644 --- a/src/core/client_pending.c +++ b/src/core/client_pending.c @@ -774,7 +774,7 @@ struct lantern_pending_block *pending_block_list_append( entry->parent_root = *parent_root; entry->peer_text[0] = '\0'; entry->parent_requested = false; - entry->parent_request_failures = 0; + entry->parent_requested_ms = 0; entry->received_ms = monotonic_millis(); entry->backfill_depth = backfill_depth; diff --git a/src/core/client_reqresp.c b/src/core/client_reqresp.c index 9f6a2bf..703c630 100644 --- a/src/core/client_reqresp.c +++ b/src/core/client_reqresp.c @@ -119,19 +119,18 @@ static void lantern_client_peer_status_update( uint64_t local_slot); static bool lantern_client_update_blocks_request_tracking( struct lantern_client *client, + uint64_t request_id, const char *peer_id, enum lantern_blocks_request_outcome outcome, uint32_t *out_failure_count, - bool *out_entry_found); + bool *out_entry_found, + char *out_effective_peer_id, + size_t out_effective_peer_id_len); static const char *lantern_blocks_request_outcome_text(enum lantern_blocks_request_outcome outcome); static void lantern_client_handle_pending_parent_request_result( struct lantern_client *client, const LanternRoot *request_roots, - size_t root_count, - enum lantern_blocks_request_outcome outcome); -static void lantern_client_request_status_after_blocks_success( - struct lantern_client *client, - const char *peer_id); + size_t root_count); /* ============================================================================ @@ -860,28 +859,36 @@ static void lantern_client_peer_status_update( * @brief Update blocks request tracking state for a peer. * * @param client Client instance + * @param request_id Internal request tracking ID (0 for untracked) * @param peer_id Peer ID string * @param outcome Request outcome * @param out_failure_count Output consecutive failure count * @param out_entry_found Output whether peer entry was found + * @param out_effective_peer_id Output peer ID used for logging/follow-up + * @param out_effective_peer_id_len Output buffer length * @return true if tracking was updated (status_lock acquired), false otherwise * * @note Thread safety: This function acquires status_lock */ static bool lantern_client_update_blocks_request_tracking( struct lantern_client *client, + uint64_t request_id, const char *peer_id, enum lantern_blocks_request_outcome outcome, uint32_t *out_failure_count, - bool *out_entry_found) + bool *out_entry_found, + char *out_effective_peer_id, + size_t out_effective_peer_id_len) { - if (!client || !peer_id || !out_failure_count || !out_entry_found) + if (!client || !out_failure_count || !out_entry_found || !out_effective_peer_id + || out_effective_peer_id_len == 0) { return false; } *out_failure_count = 0; *out_entry_found = false; + out_effective_peer_id[0] = '\0'; const size_t peer_cap = sizeof(((struct lantern_peer_status_entry *)0)->peer_id); if (pthread_mutex_lock(&client->status_lock) != 0) @@ -889,15 +896,50 @@ static bool lantern_client_update_blocks_request_tracking( return false; } - for (size_t i = 0; i < client->peer_status_count; ++i) + bool tracking_matched = request_id == 0u; + if (request_id != 0u && client->active_blocks_request_count > 0) { - struct lantern_peer_status_entry *entry = &client->peer_status_entries[i]; - if (strncmp(entry->peer_id, peer_id, peer_cap) == 0) + for (size_t i = 0; i < client->active_blocks_request_count; ++i) { - if (entry->blocks_requests_inflight > 0) + struct lantern_active_blocks_request *request = &client->active_blocks_requests[i]; + if (request->request_id != request_id) + { + continue; + } + copy_peer_id_text(request->peer_id, out_effective_peer_id, out_effective_peer_id_len); + size_t last = client->active_blocks_request_count - 1u; + if (i != last) { - entry->blocks_requests_inflight -= 1u; + client->active_blocks_requests[i] = client->active_blocks_requests[last]; } + client->active_blocks_request_count = last; + tracking_matched = true; + break; + } + } + + if (out_effective_peer_id[0] == '\0') + { + copy_peer_id_text(peer_id, out_effective_peer_id, out_effective_peer_id_len); + } + + if (out_effective_peer_id[0] == '\0') + { + pthread_mutex_unlock(&client->status_lock); + return true; + } + + if (!tracking_matched) + { + pthread_mutex_unlock(&client->status_lock); + return true; + } + + for (size_t i = 0; i < client->peer_status_count; ++i) + { + struct lantern_peer_status_entry *entry = &client->peer_status_entries[i]; + if (strncmp(entry->peer_id, out_effective_peer_id, peer_cap) == 0) + { switch (outcome) { case LANTERN_BLOCKS_REQUEST_SUCCESS: @@ -916,18 +958,10 @@ static bool lantern_client_update_blocks_request_tracking( } break; case LANTERN_BLOCKS_REQUEST_ABORTED: - if (entry->blocks_requests_inflight == 0) - { - entry->last_blocks_request_ms = 0; - } break; default: break; } - if (outcome != LANTERN_BLOCKS_REQUEST_ABORTED && entry->last_blocks_request_ms == 0) - { - entry->last_blocks_request_ms = monotonic_millis(); - } *out_failure_count = entry->consecutive_blocks_failures; *out_entry_found = true; break; @@ -975,8 +1009,7 @@ static const char *lantern_blocks_request_outcome_text(enum lantern_blocks_reque static void lantern_client_handle_pending_parent_request_result( struct lantern_client *client, const LanternRoot *request_roots, - size_t root_count, - enum lantern_blocks_request_outcome outcome) + size_t root_count) { if (!client || !request_roots || root_count == 0) { @@ -989,10 +1022,9 @@ static void lantern_client_handle_pending_parent_request_result( return; } - bool track_empty = outcome == LANTERN_BLOCKS_REQUEST_EMPTY; struct lantern_pending_block_list *list = &client->pending_blocks; - for (size_t i = list->length; i-- > 0;) + for (size_t i = 0; i < list->length; ++i) { struct lantern_pending_block *entry = &list->items[i]; bool matches = false; @@ -1014,70 +1046,13 @@ static void lantern_client_handle_pending_parent_request_result( } entry->parent_requested = false; - if (!track_empty) - { - continue; - } - - if (entry->parent_request_failures < UINT32_MAX) - { - entry->parent_request_failures += 1u; - } - if (entry->parent_request_failures < LANTERN_MAX_PENDING_PARENT_EMPTY_RESPONSES) - { - continue; - } - - char root_hex[ROOT_HEX_BUFFER_LEN]; - char parent_hex[ROOT_HEX_BUFFER_LEN]; - format_root_hex(&entry->root, root_hex, sizeof(root_hex)); - format_root_hex(&entry->parent_root, parent_hex, sizeof(parent_hex)); - struct lantern_log_metadata meta = { - .validator = client->node_id, - .peer = entry->peer_text[0] ? entry->peer_text : NULL, - }; - lantern_log_warn( - "sync", - &meta, - "dropping pending block after empty parent fetches root=%s parent=%s attempts=%" PRIu32, - root_hex[0] ? root_hex : "0x0", - parent_hex[0] ? parent_hex : "0x0", - entry->parent_request_failures); - pending_block_list_remove(list, i); + entry->parent_requested_ms = 0; } lantern_client_unlock_pending(client, locked); } -/** - * @brief Issue a follow-up status request after successful blocks fetch. - * - * @param client Client instance - * @param peer_id Peer ID string - * - * @note Thread safety: This function is thread-safe - */ -static void lantern_client_request_status_after_blocks_success( - struct lantern_client *client, - const char *peer_id) -{ - if (!client || !peer_id || peer_id[0] == '\0') - { - return; - } - - peer_id_t parsed_peer = {0}; - bool parsed = peer_id_create_from_string(peer_id, &parsed_peer) == PEER_ID_SUCCESS; - - request_status_now(client, parsed ? &parsed_peer : NULL, peer_id); - - if (parsed) - { - peer_id_destroy(&parsed_peer); - } -} - /** * Build a status message for reqresp protocol. * @@ -1431,12 +1406,12 @@ static void lantern_client_adopt_peer_genesis( * @spec subspecs/networking/reqresp - Request lifecycle * * Updates peer tracking state after a blocks_by_root request completes: - * - Decrements request-in-flight counter + * - Finalizes active request registry entry (when tracked) * - Updates consecutive failure counter * - Clears parent_requested flag on pending blocks - * - Triggers follow-up status request on success * * @param client Client instance + * @param request_id Internal request tracking ID (0 for untracked completion) * @param peer_id Peer ID string * @param request_roots Roots that were requested * @param root_count Number of requested roots @@ -1444,29 +1419,37 @@ static void lantern_client_adopt_peer_genesis( * * @note Thread safety: This function acquires status_lock and pending_lock */ -void lantern_client_on_blocks_request_complete_batch( +void lantern_client_on_blocks_request_complete_batch_with_id( struct lantern_client *client, + uint64_t request_id, const char *peer_id, const LanternRoot *request_roots, size_t root_count, enum lantern_blocks_request_outcome outcome) { - if (!client || !peer_id || !client->status_lock_initialized) + if (!client || !client->status_lock_initialized) { return; } uint32_t failure_count = 0; bool entry_found = false; + char effective_peer[sizeof(((struct lantern_peer_status_entry *)0)->peer_id)]; + effective_peer[0] = '\0'; if (!lantern_client_update_blocks_request_tracking( client, + request_id, peer_id, outcome, &failure_count, - &entry_found)) + &entry_found, + effective_peer, + sizeof(effective_peer))) { return; } + const char *peer_for_logs = effective_peer[0] ? effective_peer : peer_id; + const LanternRoot *first_root = NULL; if (request_roots && root_count > 0) { @@ -1486,7 +1469,7 @@ void lantern_client_on_blocks_request_complete_batch( "reqresp", &(const struct lantern_log_metadata){ .validator = client->node_id, - .peer = peer_id}, + .peer = peer_for_logs && peer_for_logs[0] ? peer_for_logs : NULL}, "blocks_by_root complete outcome=%s roots=%zu first_root=%s entry_found=%s " "consecutive_failures=%" PRIu32, outcome_text, @@ -1501,7 +1484,7 @@ void lantern_client_on_blocks_request_complete_batch( "reqresp", &(const struct lantern_log_metadata){ .validator = client->node_id, - .peer = peer_id}, + .peer = peer_for_logs && peer_for_logs[0] ? peer_for_logs : NULL}, "blocks_by_root complete outcome=%s roots=%zu first_root=%s entry_found=%s " "consecutive_failures=%" PRIu32, outcome_text, @@ -1514,19 +1497,45 @@ void lantern_client_on_blocks_request_complete_batch( lantern_client_handle_pending_parent_request_result( client, request_roots, - root_count, - outcome); + root_count); - if (outcome == LANTERN_BLOCKS_REQUEST_SUCCESS && peer_id && peer_id[0] != '\0') + if (outcome == LANTERN_BLOCKS_REQUEST_SUCCESS && peer_for_logs && peer_for_logs[0] != '\0') { lantern_client_request_pending_parent_after_blocks( client, - peer_id, + peer_for_logs, + first_root); + } + else if (first_root && outcome != LANTERN_BLOCKS_REQUEST_SUCCESS) + { + /* + * On failures, immediately rescan pending parents without pinning to + * the previous peer. This lets peer selection rotate away from a peer + * that repeatedly times out or lacks the requested branch. + */ + lantern_client_request_pending_parent_after_blocks( + client, + NULL, first_root); - lantern_client_request_status_after_blocks_success(client, peer_id); } } +void lantern_client_on_blocks_request_complete_batch( + struct lantern_client *client, + const char *peer_id, + const LanternRoot *request_roots, + size_t root_count, + enum lantern_blocks_request_outcome outcome) +{ + lantern_client_on_blocks_request_complete_batch_with_id( + client, + 0u, + peer_id, + request_roots, + root_count, + outcome); +} + /** * Handle completion of a blocks request (single root). * diff --git a/src/core/client_reqresp_blocks.c b/src/core/client_reqresp_blocks.c index f058578..b661853 100644 --- a/src/core/client_reqresp_blocks.c +++ b/src/core/client_reqresp_blocks.c @@ -57,6 +57,7 @@ static bool lantern_client_process_stream_block_chunk( size_t chunk_len, const struct lantern_log_metadata *meta, bool *saw_block); +static void close_and_free_stream(libp2p_stream_t *stream); static void *block_request_worker(void *arg); static void block_request_on_open(libp2p_stream_t *stream, void *user_data, int err); static int schedule_blocks_request_batch( @@ -64,7 +65,8 @@ static int schedule_blocks_request_batch( const char *peer_id_text, const LanternRoot *roots, const uint32_t *depths, - size_t root_count); + size_t root_count, + uint64_t request_id); /* ============================================================================ @@ -90,6 +92,16 @@ static void block_request_ctx_free(struct block_request_ctx *ctx) free(ctx); } +static void close_and_free_stream(libp2p_stream_t *stream) +{ + if (!stream) + { + return; + } + (void)libp2p_stream_close(stream); + libp2p_stream_free(stream); +} + /* ============================================================================ * Block Chunk Processing * ============================================================================ */ @@ -297,7 +309,7 @@ static void *block_request_worker(void *arg) { if (stream) { - libp2p_stream_free(stream); + close_and_free_stream(stream); } block_request_ctx_free(ctx); return NULL; @@ -484,6 +496,18 @@ static void *block_request_worker(void *arg) goto cleanup; } + /* Half-close write side so responder can finalize req/resp stream lifecycle. */ + int shutdown_rc = libp2p_stream_shutdown_write(stream); + if (shutdown_rc != 0) + { + lantern_log_error( + "reqresp", + &meta, + "failed to half-close blocks_by_root stream rc=%d", + shutdown_rc); + goto cleanup; + } + struct lantern_reqresp_service *service = ctx->client ? &ctx->client->reqresp : NULL; bool saw_block = false; @@ -586,11 +610,12 @@ static void *block_request_worker(void *arg) free(payload); free(raw_request); lantern_blocks_by_root_request_reset(&request); - libp2p_stream_free(stream); + close_and_free_stream(stream); if (ctx->client) { - lantern_client_on_blocks_request_complete_batch( + lantern_client_on_blocks_request_complete_batch_with_id( ctx->client, + ctx->request_id, ctx->peer_text, ctx->roots, ctx->root_count, @@ -627,7 +652,7 @@ static void block_request_on_open(libp2p_stream_t *stream, void *user_data, int { if (stream) { - libp2p_stream_free(stream); + close_and_free_stream(stream); } return; } @@ -662,13 +687,14 @@ static void block_request_on_open(libp2p_stream_t *stream, void *user_data, int root_hex[0] ? root_hex : "0x0"); if (stream) { - libp2p_stream_free(stream); + close_and_free_stream(stream); stream = NULL; } if (ctx->client) { - lantern_client_on_blocks_request_complete_batch( + lantern_client_on_blocks_request_complete_batch_with_id( ctx->client, + ctx->request_id, ctx->peer_text, ctx->roots, ctx->root_count, @@ -686,11 +712,12 @@ static void block_request_on_open(libp2p_stream_t *stream, void *user_data, int &meta, "failed to allocate worker for %s stream", ctx->protocol_id); - libp2p_stream_free(stream); + close_and_free_stream(stream); if (ctx->client) { - lantern_client_on_blocks_request_complete_batch( + lantern_client_on_blocks_request_complete_batch_with_id( ctx->client, + ctx->request_id, ctx->peer_text, ctx->roots, ctx->root_count, @@ -710,11 +737,12 @@ static void block_request_on_open(libp2p_stream_t *stream, void *user_data, int &meta, "failed to spawn blocks_by_root worker"); free(worker); - libp2p_stream_free(stream); + close_and_free_stream(stream); if (ctx->client) { - lantern_client_on_blocks_request_complete_batch( + lantern_client_on_blocks_request_complete_batch_with_id( ctx->client, + ctx->request_id, ctx->peer_text, ctx->roots, ctx->root_count, @@ -741,7 +769,8 @@ static int schedule_blocks_request_batch( const char *peer_id_text, const LanternRoot *roots, const uint32_t *depths, - size_t root_count) + size_t root_count, + uint64_t request_id) { if (!client || !peer_id_text || !roots || root_count == 0) { @@ -771,8 +800,9 @@ static int schedule_blocks_request_batch( .validator = client->node_id, .peer = peer_id_text}, "skipping blocks_by_root dial for test run"); - lantern_client_on_blocks_request_complete_batch( + lantern_client_on_blocks_request_complete_batch_with_id( client, + request_id, peer_id_text, roots, root_count, @@ -799,6 +829,7 @@ static int schedule_blocks_request_batch( } ctx->client = client; + ctx->request_id = request_id; ctx->root_count = root_count; ctx->protocol_id = LANTERN_BLOCKS_BY_ROOT_PROTOCOL_ID; strncpy(ctx->peer_text, peer_id_text, sizeof(ctx->peer_text) - 1); @@ -886,9 +917,10 @@ int lantern_client_schedule_blocks_request_batch( const char *peer_id_text, const LanternRoot *roots, const uint32_t *depths, - size_t root_count) + size_t root_count, + uint64_t request_id) { - return schedule_blocks_request_batch(client, peer_id_text, roots, depths, root_count); + return schedule_blocks_request_batch(client, peer_id_text, roots, depths, root_count, request_id); } /** @@ -911,12 +943,13 @@ int lantern_client_schedule_blocks_request( struct lantern_client *client, const char *peer_id_text, const LanternRoot *root, - uint32_t backfill_depth) + uint32_t backfill_depth, + uint64_t request_id) { if (!root) { return LANTERN_CLIENT_ERR_INVALID_PARAM; } const uint32_t depth = backfill_depth; - return schedule_blocks_request_batch(client, peer_id_text, root, &depth, 1u); + return schedule_blocks_request_batch(client, peer_id_text, root, &depth, 1u, request_id); } diff --git a/src/core/client_reqresp_stream.c b/src/core/client_reqresp_stream.c index a04a867..f9879de 100644 --- a/src/core/client_reqresp_stream.c +++ b/src/core/client_reqresp_stream.c @@ -1510,9 +1510,63 @@ int stream_write_all( } return LANTERN_REQRESP_ERR_INVALID_PARAM; } + + char peer_text[LANTERN_REQRESP_PEER_TEXT_BYTES]; + struct lantern_log_metadata meta; + init_peer_log_metadata(stream, peer_text, sizeof(peer_text), &meta); + + uint64_t deadline_ms = UINT64_MAX; + uint64_t now_ms = reqresp_now_ms(); + uint64_t stall_timeout_ms = lantern_reqresp_stall_timeout_ms(); + if (stall_timeout_ms > 0u + && now_ms != 0u + && now_ms <= UINT64_MAX - stall_timeout_ms) + { + deadline_ms = now_ms + stall_timeout_ms; + } + size_t offset = 0; while (offset < length) { + if (deadline_ms != UINT64_MAX) + { + now_ms = reqresp_now_ms(); + if (now_ms != 0u && now_ms >= deadline_ms) + { + if (out_err) + { + *out_err = LIBP2P_ERR_TIMEOUT; + } + lantern_log_warn( + "reqresp", + &meta, + "stream write timed out bytes_written=%zu total_bytes=%zu", + offset, + length); + return LANTERN_REQRESP_ERR_STREAM_WRITE; + } + + uint64_t remaining_ms = (now_ms == 0u) ? stall_timeout_ms : (deadline_ms - now_ms); + if (remaining_ms == 0u) + { + remaining_ms = 1u; + } + int deadline_rc = libp2p_stream_set_deadline(stream, remaining_ms); + if (deadline_rc != 0) + { + if (out_err) + { + *out_err = (ssize_t)deadline_rc; + } + lantern_log_warn( + "reqresp", + &meta, + "failed to set write deadline err=%d", + deadline_rc); + return LANTERN_REQRESP_ERR_STREAM_WRITE; + } + } + ssize_t written = libp2p_stream_write(stream, data + offset, length - offset); if (written > 0) { diff --git a/src/core/client_services_internal.h b/src/core/client_services_internal.h index c14a037..ce2f71b 100644 --- a/src/core/client_services_internal.h +++ b/src/core/client_services_internal.h @@ -449,6 +449,20 @@ void lantern_client_on_blocks_request_complete_batch( size_t root_count, enum lantern_blocks_request_outcome outcome); +/** + * Handle completion of a tracked blocks request batch. + * + * Same as lantern_client_on_blocks_request_complete_batch(), but includes + * the internal request tracking ID used by the active request registry. + */ +void lantern_client_on_blocks_request_complete_batch_with_id( + struct lantern_client *client, + uint64_t request_id, + const char *peer_id, + const LanternRoot *request_roots, + size_t root_count, + enum lantern_blocks_request_outcome outcome); + /** * Handle completion of a blocks request (single root). * @@ -512,6 +526,7 @@ int lantern_reqresp_read_response_chunk( * @param roots Block roots to request * @param depths Backfill depth per root (may be NULL for zeros) * @param root_count Number of roots + * @param request_id Internal request tracking ID (0 disables tracking) * @return 0 on success * @return LANTERN_CLIENT_ERR_INVALID_PARAM if parameters are invalid, the peer ID is invalid, or any root is zero * @return LANTERN_CLIENT_ERR_ALLOC if allocation fails @@ -524,7 +539,8 @@ int lantern_client_schedule_blocks_request_batch( const char *peer_id_text, const LanternRoot *roots, const uint32_t *depths, - size_t root_count); + size_t root_count, + uint64_t request_id); /** * Schedule a single-root blocks_by_root request to a peer. @@ -535,6 +551,7 @@ int lantern_client_schedule_blocks_request_batch( * @param peer_id_text Peer ID string * @param root Block root to request * @param backfill_depth Backfill depth for the requested root + * @param request_id Internal request tracking ID (0 disables tracking) * @return 0 on success * @return LANTERN_CLIENT_ERR_INVALID_PARAM if parameters are invalid, the peer ID is invalid, or the root is zero * @return LANTERN_CLIENT_ERR_ALLOC if allocation fails @@ -546,13 +563,15 @@ int lantern_client_schedule_blocks_request( struct lantern_client *client, const char *peer_id_text, const LanternRoot *root, - uint32_t backfill_depth); + uint32_t backfill_depth, + uint64_t request_id); /** * Write all bytes to a stream. * - * Retries on AGAIN/TIMEOUT errors until all bytes are written. + * Retries on AGAIN/TIMEOUT errors until all bytes are written or the + * reqresp stall timeout window elapses. * * @param stream libp2p stream * @param data Data to write diff --git a/src/core/client_sync.c b/src/core/client_sync.c index 53a13ed..83a3788 100644 --- a/src/core/client_sync.c +++ b/src/core/client_sync.c @@ -169,14 +169,40 @@ static const char *peer_id_to_text(const peer_id_t *from, char *out, size_t out_ return out[0] ? out : NULL; } - -static bool client_accepts_gossip(const struct lantern_client *client) +/** + * Identify a pristine genesis-style state snapshot. + * + * This is used to preserve deterministic genesis anchor hashing when loading + * older snapshots that eagerly persisted latest_block_header.state_root. + */ +static bool state_has_genesis_shape(const LanternState *state) { - if (!client) + if (!state) + { + return false; + } + if (state->slot != 0 + || state->latest_block_header.slot != 0 + || state->latest_block_header.proposer_index != 0 + || state->latest_justified.slot != 0 + || state->latest_finalized.slot != 0) + { + return false; + } + if (state->historical_block_hashes.length != 0 + || state->justified_slots.bit_length != 0 + || state->justification_roots.length != 0 + || state->justification_validators.bit_length != 0) { return false; } - return client->sync_state != LANTERN_SYNC_STATE_IDLE; + if (!lantern_root_is_zero(&state->latest_block_header.parent_root) + || !lantern_root_is_zero(&state->latest_justified.root) + || !lantern_root_is_zero(&state->latest_finalized.root)) + { + return false; + } + return true; } @@ -194,7 +220,6 @@ static bool client_accepts_gossip(const struct lantern_client *client) * @param context Client instance * @return 0 on success * @return LANTERN_CLIENT_ERR_INVALID_PARAM if block or context is NULL - * @return LANTERN_CLIENT_ERR_IGNORED if gossip is ignored due to IDLE sync state * * @note Thread safety: This function is thread-safe */ @@ -212,18 +237,6 @@ int gossip_block_handler( char peer_text[PEER_TEXT_BUFFER_LEN]; const char *peer_id_text = peer_id_to_text(from, peer_text, sizeof(peer_text)); - if (!client_accepts_gossip(client)) - { - lantern_log_debug( - "gossip", - &(const struct lantern_log_metadata){ - .validator = client->node_id, - .peer = peer_id_text && *peer_id_text ? peer_id_text : NULL, - }, - "ignored block gossip in IDLE sync state"); - return LANTERN_CLIENT_ERR_IGNORED; - } - lantern_client_record_block(client, block, NULL, peer_id_text, "gossip", 0, false); return LANTERN_CLIENT_OK; } @@ -243,7 +256,6 @@ int gossip_block_handler( * @param context Client instance * @return 0 on success * @return LANTERN_CLIENT_ERR_INVALID_PARAM if vote or context is NULL - * @return LANTERN_CLIENT_ERR_IGNORED if gossip is ignored due to IDLE sync state * * @note Thread safety: This function is thread-safe */ @@ -261,18 +273,6 @@ int gossip_vote_handler( char peer_text[PEER_TEXT_BUFFER_LEN]; const char *peer_id_text = peer_id_to_text(from, peer_text, sizeof(peer_text)); - if (!client_accepts_gossip(client)) - { - lantern_log_debug( - "gossip", - &(const struct lantern_log_metadata){ - .validator = client->node_id, - .peer = peer_id_text && *peer_id_text ? peer_id_text : NULL, - }, - "ignored vote gossip in IDLE sync state"); - return LANTERN_CLIENT_ERR_IGNORED; - } - lantern_client_note_vote_delivery(client, peer_id_text, vote); lantern_client_record_vote(client, vote, peer_id_text); return LANTERN_CLIENT_OK; @@ -399,12 +399,34 @@ static int compute_fork_choice_anchor_roots( return LANTERN_CLIENT_ERR_INVALID_PARAM; } - if (lantern_hash_tree_root_state(&client->state, out_state_root) != 0) + const LanternState *state_for_hash = &client->state; + LanternState normalized_state = client->state; + bool normalized_genesis_snapshot = false; + if (state_has_genesis_shape(&client->state) + && !lantern_root_is_zero(&client->state.latest_block_header.state_root)) + { + memset( + normalized_state.latest_block_header.state_root.bytes, + 0, + sizeof(normalized_state.latest_block_header.state_root.bytes)); + state_for_hash = &normalized_state; + normalized_genesis_snapshot = true; + } + + if (lantern_hash_tree_root_state(state_for_hash, out_state_root) != 0) { lantern_log_error("forkchoice", meta, "failed to hash anchor state"); return LANTERN_CLIENT_ERR_RUNTIME; } + if (normalized_genesis_snapshot) + { + lantern_log_warn( + "forkchoice", + meta, + "normalizing persisted genesis header state_root to compute canonical anchor"); + } + *out_anchor_header = client->state.latest_block_header; out_anchor_header->state_root = *out_state_root; @@ -528,20 +550,6 @@ int initialize_fork_choice(struct lantern_client *client) &anchor_header.body_root, anchor_header.slot); - /* Also update the state's header state_root for subsequent state transitions */ - if (memcmp( - client->state.latest_block_header.state_root.bytes, - anchor_state_root.bytes, - LANTERN_ROOT_SIZE) - != 0) - { - client->state.latest_block_header.state_root = anchor_state_root; - lantern_log_debug( - "forkchoice", - &meta, - "updated genesis header state_root"); - } - LanternBlock anchor; memset(&anchor, 0, sizeof(anchor)); anchor.slot = client->state.latest_block_header.slot; @@ -1047,6 +1055,159 @@ void lantern_client_pending_remove_by_root(struct lantern_client *client, const lantern_client_unlock_pending(client, locked); } +static uint32_t active_blocks_requests_for_peer_locked( + const struct lantern_client *client, + const char *peer_id) +{ + if (!client || !peer_id || peer_id[0] == '\0') + { + return 0u; + } + + uint32_t count = 0u; + for (size_t i = 0; i < client->active_blocks_request_count; ++i) + { + const struct lantern_active_blocks_request *request = &client->active_blocks_requests[i]; + if (request->peer_id[0] == '\0') + { + continue; + } + if (strncmp(request->peer_id, peer_id, sizeof(request->peer_id)) == 0) + { + if (count < UINT32_MAX) + { + count += 1u; + } + } + } + return count; +} + +static bool reserve_active_blocks_request_locked( + struct lantern_client *client, + const char *peer_id, + uint64_t now_ms, + uint64_t timeout_ms, + uint64_t *out_request_id) +{ + if (!client || !peer_id || peer_id[0] == '\0' || !out_request_id) + { + return false; + } + + if (client->active_blocks_request_count >= client->active_blocks_request_capacity) + { + size_t new_capacity = client->active_blocks_request_capacity == 0 + ? 8u + : client->active_blocks_request_capacity * 2u; + if (new_capacity <= client->active_blocks_request_capacity + || new_capacity > (SIZE_MAX / sizeof(*client->active_blocks_requests))) + { + return false; + } + struct lantern_active_blocks_request *grown = realloc( + client->active_blocks_requests, + new_capacity * sizeof(*client->active_blocks_requests)); + if (!grown) + { + return false; + } + client->active_blocks_requests = grown; + client->active_blocks_request_capacity = new_capacity; + } + + if (client->next_blocks_request_id == 0u) + { + client->next_blocks_request_id = 1u; + } + uint64_t request_id = client->next_blocks_request_id; + client->next_blocks_request_id += 1u; + if (client->next_blocks_request_id == 0u) + { + client->next_blocks_request_id = 1u; + } + + uint64_t deadline_ms = UINT64_MAX; + if (timeout_ms < UINT64_MAX - now_ms) + { + deadline_ms = now_ms + timeout_ms; + } + + struct lantern_active_blocks_request *entry = + &client->active_blocks_requests[client->active_blocks_request_count]; + memset(entry, 0, sizeof(*entry)); + entry->request_id = request_id; + entry->started_ms = now_ms; + entry->deadline_ms = deadline_ms; + strncpy(entry->peer_id, peer_id, sizeof(entry->peer_id) - 1u); + entry->peer_id[sizeof(entry->peer_id) - 1u] = '\0'; + client->active_blocks_request_count += 1u; + *out_request_id = request_id; + return true; +} + +static void sweep_expired_active_blocks_requests_locked( + struct lantern_client *client, + uint64_t now_ms) +{ + if (!client || client->active_blocks_request_count == 0) + { + return; + } + + for (size_t i = 0; i < client->active_blocks_request_count;) + { + struct lantern_active_blocks_request *request = &client->active_blocks_requests[i]; + if (request->deadline_ms != UINT64_MAX && now_ms >= request->deadline_ms) + { + uint64_t age_ms = now_ms >= request->started_ms ? now_ms - request->started_ms : 0u; + if (!request->timeout_recorded) + { + request->timeout_recorded = true; + if (LANTERN_BLOCKS_REQUEST_HARD_TIMEOUT_MS < UINT64_MAX - now_ms) + { + request->deadline_ms = now_ms + LANTERN_BLOCKS_REQUEST_HARD_TIMEOUT_MS; + } + else + { + request->deadline_ms = UINT64_MAX; + } + + lantern_log_warn( + "reqresp", + &(const struct lantern_log_metadata){ + .validator = client->node_id, + .peer = request->peer_id[0] ? request->peer_id : NULL}, + "blocks_by_root request timed out request_id=%" PRIu64 + " age_ms=%" PRIu64 " keeping inflight slot until completion", + request->request_id, + age_ms); + i += 1u; + continue; + } + + lantern_log_warn( + "reqresp", + &(const struct lantern_log_metadata){ + .validator = client->node_id, + .peer = request->peer_id[0] ? request->peer_id : NULL}, + "blocks_by_root request hard timeout request_id=%" PRIu64 + " age_ms=%" PRIu64 " releasing inflight slot", + request->request_id, + age_ms); + + size_t last = client->active_blocks_request_count - 1u; + if (i != last) + { + client->active_blocks_requests[i] = client->active_blocks_requests[last]; + } + client->active_blocks_request_count = last; + continue; + } + i += 1u; + } +} + /** * Enqueue a pending block for later processing. @@ -1128,6 +1289,7 @@ static bool try_schedule_blocks_request_batch( } uint64_t now_ms = monotonic_millis(); + sweep_expired_active_blocks_requests_locked(client, now_ms); struct lantern_peer_status_entry *entry = NULL; const size_t peer_cap = sizeof(((struct lantern_peer_status_entry *)0)->peer_id); char selected_peer[PEER_TEXT_BUFFER_LEN]; @@ -1139,9 +1301,10 @@ static bool try_schedule_blocks_request_batch( if (entry) { bool connected = lantern_client_is_peer_connected(client, peer_text); + uint32_t inflight = active_blocks_requests_for_peer_locked(client, peer_text); bool inflight_full = - entry->blocks_requests_inflight >= LANTERN_MAX_BLOCKS_REQUESTS_PER_PEER; - if (!connected || inflight_full) + inflight >= LANTERN_MAX_BLOCKS_REQUESTS_PER_PEER; + if (!connected) { uint64_t age_ms = 0; if (entry->last_status_ms != 0 && now_ms >= entry->last_status_ms) @@ -1156,18 +1319,36 @@ static bool try_schedule_blocks_request_batch( "blocks_by_root requested peer not eligible connected=%s inflight=%" PRIu32 " failures=%" PRIu32 " has_status=%s status_age_ms=%" PRIu64, connected ? "true" : "false", - entry->blocks_requests_inflight, + inflight, entry->consecutive_blocks_failures, entry->has_status ? "true" : "false", age_ms); entry = NULL; } + else if (inflight_full) + { + /* Keep parent-chase continuity on the same peer. Falling back to a + * different peer while the preferred one is still streaming tends to + * produce alternating success/empty responses and slows convergence. */ + lantern_log_debug( + "reqresp", + &(const struct lantern_log_metadata){ + .validator = client->node_id, + .peer = peer_text}, + "blocks_by_root preferred peer busy inflight=%" PRIu32 + " max=%" PRIu32 " roots=%zu; deferring request", + inflight, + LANTERN_MAX_BLOCKS_REQUESTS_PER_PEER, + root_count); + pthread_mutex_unlock(&client->status_lock); + return false; + } } } struct lantern_peer_status_entry *best_entry = entry; bool best_fresh = false; - uint32_t best_inflight = entry ? entry->blocks_requests_inflight : 0; + uint32_t best_inflight = entry ? active_blocks_requests_for_peer_locked(client, entry->peer_id) : 0u; uint32_t best_failures = entry ? entry->consecutive_blocks_failures : 0; uint64_t best_status_ms = entry ? entry->last_status_ms : 0; if (entry && entry->has_status @@ -1192,7 +1373,9 @@ static bool try_schedule_blocks_request_batch( { continue; } - if (candidate->blocks_requests_inflight >= LANTERN_MAX_BLOCKS_REQUESTS_PER_PEER) + uint32_t candidate_inflight = + active_blocks_requests_for_peer_locked(client, candidate->peer_id); + if (candidate_inflight >= LANTERN_MAX_BLOCKS_REQUESTS_PER_PEER) { continue; } @@ -1217,7 +1400,7 @@ static bool try_schedule_blocks_request_batch( { best_entry = candidate; best_fresh = fresh; - best_inflight = candidate->blocks_requests_inflight; + best_inflight = candidate_inflight; best_failures = candidate->consecutive_blocks_failures; best_status_ms = candidate->last_status_ms; continue; @@ -1227,7 +1410,7 @@ static bool try_schedule_blocks_request_batch( { best_entry = candidate; best_fresh = fresh; - best_inflight = candidate->blocks_requests_inflight; + best_inflight = candidate_inflight; best_failures = candidate->consecutive_blocks_failures; best_status_ms = candidate->last_status_ms; continue; @@ -1241,22 +1424,22 @@ static bool try_schedule_blocks_request_batch( { best_entry = candidate; best_fresh = true; - best_inflight = candidate->blocks_requests_inflight; + best_inflight = candidate_inflight; best_failures = candidate->consecutive_blocks_failures; best_status_ms = candidate->last_status_ms; continue; } if (fresh == best_fresh) { - if (candidate->blocks_requests_inflight < best_inflight) + if (candidate_inflight < best_inflight) { best_entry = candidate; - best_inflight = candidate->blocks_requests_inflight; + best_inflight = candidate_inflight; best_failures = candidate->consecutive_blocks_failures; best_status_ms = candidate->last_status_ms; continue; } - if (candidate->blocks_requests_inflight == best_inflight) + if (candidate_inflight == best_inflight) { if (candidate->last_status_ms > best_status_ms) { @@ -1283,7 +1466,9 @@ static bool try_schedule_blocks_request_batch( { continue; } - if (candidate->blocks_requests_inflight >= LANTERN_MAX_BLOCKS_REQUESTS_PER_PEER) + uint32_t candidate_inflight = + active_blocks_requests_for_peer_locked(client, candidate->peer_id); + if (candidate_inflight >= LANTERN_MAX_BLOCKS_REQUESTS_PER_PEER) { inflight_full_entries += 1u; } @@ -1333,9 +1518,23 @@ static bool try_schedule_blocks_request_batch( } strncpy(selected_peer, entry->peer_id, copy_cap - 1u); selected_peer[copy_cap - 1u] = '\0'; - - entry->blocks_requests_inflight += 1u; - entry->last_blocks_request_ms = now_ms; + uint64_t request_id = 0u; + if (!reserve_active_blocks_request_locked( + client, + selected_peer, + now_ms, + LANTERN_BLOCKS_REQUEST_TIMEOUT_MS, + &request_id)) + { + lantern_log_warn( + "reqresp", + &(const struct lantern_log_metadata){ + .validator = client->node_id, + .peer = selected_peer[0] ? selected_peer : NULL}, + "blocks_by_root request skipped: unable to reserve request tracking entry"); + pthread_mutex_unlock(&client->status_lock); + return false; + } pthread_mutex_unlock(&client->status_lock); lantern_log_debug( @@ -1343,7 +1542,8 @@ static bool try_schedule_blocks_request_batch( &(const struct lantern_log_metadata){ .validator = client->node_id, .peer = selected_peer[0] ? selected_peer : NULL}, - "blocks_by_root scheduling roots=%zu first_root=%s depth_min=%" PRIu32 " depth_max=%" PRIu32, + "blocks_by_root scheduling request_id=%" PRIu64 " roots=%zu first_root=%s depth_min=%" PRIu32 " depth_max=%" PRIu32, + request_id, root_count, first_root_hex[0] ? first_root_hex : "0x0", min_depth == UINT32_MAX ? 0u : min_depth, @@ -1354,7 +1554,8 @@ static bool try_schedule_blocks_request_batch( selected_peer, roots, depths, - root_count) + root_count, + request_id) != 0) { lantern_log_warn( @@ -1364,8 +1565,9 @@ static bool try_schedule_blocks_request_batch( .peer = selected_peer}, "blocks_by_root request scheduling failed roots=%zu", root_count); - lantern_client_on_blocks_request_complete_batch( + lantern_client_on_blocks_request_complete_batch_with_id( client, + request_id, selected_peer, roots, root_count, @@ -1430,18 +1632,36 @@ static void mark_pending_parent_requested( return; } + uint64_t request_ms = requested ? monotonic_millis() : 0u; + for (size_t i = 0; i < client->pending_blocks.length; ++i) { struct lantern_pending_block *entry = &client->pending_blocks.items[i]; if (memcmp(entry->parent_root.bytes, parent_root->bytes, LANTERN_ROOT_SIZE) == 0) { entry->parent_requested = requested; + entry->parent_requested_ms = request_ms; } } lantern_client_unlock_pending(client, locked); } +static bool pending_parent_request_is_stale( + const struct lantern_pending_block *entry, + uint64_t now_ms) +{ + if (!entry || !entry->parent_requested) + { + return false; + } + if (entry->parent_requested_ms == 0) + { + return true; + } + return now_ms >= entry->parent_requested_ms + LANTERN_PENDING_PARENT_REQUEST_STALE_MS; +} + struct pending_parent_candidate { LanternRoot child_root; @@ -1450,6 +1670,25 @@ struct pending_parent_candidate bool parent_cached; }; +static int pending_parent_candidate_compare(const void *left, const void *right) +{ + const struct pending_parent_candidate *left_entry = left; + const struct pending_parent_candidate *right_entry = right; + + if (left_entry->request_depth > right_entry->request_depth) + { + return -1; + } + if (left_entry->request_depth < right_entry->request_depth) + { + return 1; + } + return memcmp( + left_entry->parent_root.bytes, + right_entry->parent_root.bytes, + LANTERN_ROOT_SIZE); +} + struct pending_child_replay { LanternSignedBlock block; @@ -1490,6 +1729,8 @@ void lantern_client_request_pending_parent_after_blocks( bool has_requested_root = false; bool prefer_requested_root = false; bool requested_parent_requested = false; + bool requested_parent_stale = false; + uint64_t now_ms = monotonic_millis(); struct pending_parent_candidate requested_candidate = {0}; if (request_root && !lantern_root_is_zero(request_root)) { @@ -1528,6 +1769,11 @@ void lantern_client_request_pending_parent_after_blocks( &entry->parent_root) != NULL; requested_parent_requested = entry->parent_requested; + requested_parent_stale = pending_parent_request_is_stale(entry, now_ms); + if (requested_parent_stale) + { + requested_parent_requested = false; + } if (!requested_candidate.parent_cached && !requested_parent_requested) { prefer_requested_root = true; @@ -1539,7 +1785,7 @@ void lantern_client_request_pending_parent_after_blocks( for (size_t i = 0; i < client->pending_blocks.length; ++i) { struct lantern_pending_block *entry = &client->pending_blocks.items[i]; - if (entry->parent_requested) + if (entry->parent_requested && !pending_parent_request_is_stale(entry, now_ms)) { continue; } @@ -1586,11 +1832,12 @@ void lantern_client_request_pending_parent_after_blocks( lantern_log_debug( "sync", &(const struct lantern_log_metadata){.validator = client->node_id}, - "pending parent scan requested_root=%s candidates=%zu prefer_requested=%s requested_parent=%s", + "pending parent scan requested_root=%s candidates=%zu prefer_requested=%s requested_parent=%s requested_parent_stale=%s", has_requested_root ? (requested_hex[0] ? requested_hex : "0x0") : "-", candidate_count, prefer_requested_root ? "true" : "false", - requested_parent_requested ? "true" : "false"); + requested_parent_requested ? "true" : "false", + requested_parent_stale ? "true" : "false"); if (candidate_count == 0 && peer_text && *peer_text) { @@ -1602,7 +1849,7 @@ void lantern_client_request_pending_parent_after_blocks( for (size_t i = 0; i < client->pending_blocks.length; ++i) { struct lantern_pending_block *entry = &client->pending_blocks.items[i]; - if (entry->parent_requested) + if (entry->parent_requested && !pending_parent_request_is_stale(entry, now_ms)) { continue; } @@ -1640,6 +1887,17 @@ void lantern_client_request_pending_parent_after_blocks( lantern_client_unlock_pending(client, locked); } + if (candidate_count > 1u) + { + /* Prioritize deepest missing ancestors first so backfill converges to a + * known anchor quickly instead of diffusing requests across shallow tips. */ + qsort( + candidates, + candidate_count, + sizeof(candidates[0]), + pending_parent_candidate_compare); + } + LanternRoot request_roots[LANTERN_MAX_BLOCKS_PER_REQUEST]; uint32_t request_depths[LANTERN_MAX_BLOCKS_PER_REQUEST]; size_t request_count = 0; @@ -1777,7 +2035,6 @@ void lantern_client_enqueue_pending_block( } } existing->received_ms = monotonic_millis(); - existing->parent_request_failures = 0; size_t pending_len = list->length; bool parent_requested = existing->parent_requested; lantern_client_unlock_pending(client, locked); @@ -1817,14 +2074,58 @@ void lantern_client_enqueue_pending_block( if (list->length >= LANTERN_PENDING_BLOCK_LIMIT && list->length > 0) { - char dropped_hex[ROOT_HEX_BUFFER_LEN]; - format_root_hex(&list->items[0].root, dropped_hex, sizeof(dropped_hex)); - lantern_log_warn( - "state", - &(const struct lantern_log_metadata){.validator = client->node_id}, - "pending block queue full; dropping oldest root=%s", - dropped_hex[0] ? dropped_hex : "0x0"); - pending_block_list_remove(list, 0); + if (client->sync_state != LANTERN_SYNC_STATE_IDLE) + { + size_t shallowest_index = 0u; + uint32_t shallowest_depth = list->items[0].backfill_depth; + for (size_t i = 1; i < list->length; ++i) + { + if (list->items[i].backfill_depth < shallowest_depth) + { + shallowest_depth = list->items[i].backfill_depth; + shallowest_index = i; + } + } + + if (backfill_depth <= shallowest_depth) + { + char dropped_hex[ROOT_HEX_BUFFER_LEN]; + format_root_hex(&block_root_local, dropped_hex, sizeof(dropped_hex)); + lantern_log_warn( + "state", + &(const struct lantern_log_metadata){.validator = client->node_id}, + "pending block queue full while syncing; dropping shallow incoming root=%s depth=%" PRIu32 + " shallowest_depth=%" PRIu32, + dropped_hex[0] ? dropped_hex : "0x0", + backfill_depth, + shallowest_depth); + lantern_client_unlock_pending(client, locked); + return; + } + + char evicted_hex[ROOT_HEX_BUFFER_LEN]; + format_root_hex(&list->items[shallowest_index].root, evicted_hex, sizeof(evicted_hex)); + lantern_log_warn( + "state", + &(const struct lantern_log_metadata){.validator = client->node_id}, + "pending block queue full while syncing; evicting shallow root=%s depth=%" PRIu32 + " for incoming depth=%" PRIu32, + evicted_hex[0] ? evicted_hex : "0x0", + shallowest_depth, + backfill_depth); + pending_block_list_remove(list, shallowest_index); + } + else + { + char dropped_hex[ROOT_HEX_BUFFER_LEN]; + format_root_hex(&list->items[0].root, dropped_hex, sizeof(dropped_hex)); + lantern_log_warn( + "state", + &(const struct lantern_log_metadata){.validator = client->node_id}, + "pending block queue full; dropping oldest root=%s", + dropped_hex[0] ? dropped_hex : "0x0"); + pending_block_list_remove(list, 0); + } } struct lantern_pending_block *entry = pending_block_list_append( diff --git a/src/core/client_sync_blocks.c b/src/core/client_sync_blocks.c index 9a188e2..acf0c75 100644 --- a/src/core/client_sync_blocks.c +++ b/src/core/client_sync_blocks.c @@ -639,35 +639,18 @@ static bool build_root_chain_locked( return out_chain->length > 0; } -static bool init_replay_state(const struct lantern_client *client, LanternState *out_state) +static bool resolve_replay_validator_pubkeys( + const struct lantern_client *client, + const struct lantern_log_metadata *meta, + const uint8_t **out_pubkeys, + size_t *out_validator_count, + bool *out_allocated_pubkeys) { - if (!client || !out_state) + if (!client || !out_pubkeys || !out_validator_count || !out_allocated_pubkeys) { return false; } - struct lantern_log_metadata meta = {.validator = client->node_id}; - lantern_state_init(out_state); - - if (client->genesis.state_bytes && client->genesis.state_size > 0) - { - if (lantern_ssz_decode_state( - out_state, - client->genesis.state_bytes, - client->genesis.state_size) - == 0) - { - return true; - } - lantern_log_warn( - "state", - &meta, - "init_replay_state failed to decode genesis state size=%zu", - client->genesis.state_size); - lantern_state_reset(out_state); - lantern_state_init(out_state); - } - const struct lantern_chain_config *config = &client->genesis.chain_config; size_t validator_count = config->validator_pubkeys_count; const uint8_t *pubkeys = config->validator_pubkeys; @@ -682,10 +665,9 @@ static bool init_replay_state(const struct lantern_client *client, LanternState { lantern_log_warn( "state", - &meta, + meta, "init_replay_state registry pubkey size overflow count=%zu", registry_count); - lantern_state_reset(out_state); return false; } @@ -696,10 +678,9 @@ static bool init_replay_state(const struct lantern_client *client, LanternState { lantern_log_warn( "state", - &meta, + meta, "init_replay_state failed to allocate registry pubkey buffer len=%zu", pubkeys_len); - lantern_state_reset(out_state); return false; } bool pubkey_ok = true; @@ -731,10 +712,9 @@ static bool init_replay_state(const struct lantern_client *client, LanternState { lantern_log_warn( "state", - &meta, + meta, "init_replay_state failed to populate registry pubkeys"); free(buffer); - lantern_state_reset(out_state); return false; } pubkeys = buffer; @@ -747,10 +727,9 @@ static bool init_replay_state(const struct lantern_client *client, LanternState { lantern_log_warn( "state", - &meta, + meta, "init_replay_state state pubkey size overflow count=%zu", state_count); - lantern_state_reset(out_state); return false; } validator_count = state_count; @@ -760,10 +739,9 @@ static bool init_replay_state(const struct lantern_client *client, LanternState { lantern_log_warn( "state", - &meta, + meta, "init_replay_state failed to allocate state pubkey buffer len=%zu", pubkeys_len); - lantern_state_reset(out_state); return false; } for (size_t i = 0; i < validator_count; ++i) @@ -779,9 +757,8 @@ static bool init_replay_state(const struct lantern_client *client, LanternState { lantern_log_warn( "state", - &meta, + meta, "init_replay_state missing validator pubkeys"); - lantern_state_reset(out_state); return false; } } @@ -794,12 +771,51 @@ static bool init_replay_state(const struct lantern_client *client, LanternState } lantern_log_warn( "state", - &meta, + meta, "init_replay_state invalid validator_count=0"); + return false; + } + + *out_pubkeys = pubkeys; + *out_validator_count = validator_count; + *out_allocated_pubkeys = allocated_pubkeys; + return true; +} + +static bool init_replay_state(const struct lantern_client *client, LanternState *out_state) +{ + if (!client || !out_state) + { + return false; + } + + struct lantern_log_metadata meta = {.validator = client->node_id}; + lantern_state_init(out_state); + + if (client->genesis.state_bytes && client->genesis.state_size > 0) + { + lantern_log_debug( + "state", + &meta, + "init_replay_state ignoring local genesis state bytes size=%zu", + client->genesis.state_size); + } + + const uint8_t *pubkeys = NULL; + size_t validator_count = 0; + bool allocated_pubkeys = false; + if (!resolve_replay_validator_pubkeys( + client, + &meta, + &pubkeys, + &validator_count, + &allocated_pubkeys)) + { lantern_state_reset(out_state); return false; } + const struct lantern_chain_config *config = &client->genesis.chain_config; if (lantern_state_generate_genesis( out_state, config->genesis_time, diff --git a/src/core/client_sync_internal.h b/src/core/client_sync_internal.h index 38db3a4..ccced25 100644 --- a/src/core/client_sync_internal.h +++ b/src/core/client_sync_internal.h @@ -48,10 +48,32 @@ typedef struct peer_id peer_id_t; /** Maximum roots per blocks_by_root request */ #define LANTERN_MAX_BLOCKS_PER_REQUEST 10u -/** Maximum backfill depth when requesting parents */ -#define LANTERN_MAX_BACKFILL_DEPTH LANTERN_PENDING_BLOCK_LIMIT -/** Maximum consecutive empty parent fetches before dropping pending blocks */ -#define LANTERN_MAX_PENDING_PARENT_EMPTY_RESPONSES 8u +/** + * Maximum parent depth for ancestor backfill requests. + * + * Keep this independent from the in-memory pending queue limit so a fresh + * node can backfill deep historical ancestors without requiring a huge + * pending list in RAM. + */ +#define LANTERN_MAX_BACKFILL_DEPTH 65535u +/** Retry parent fetches if a scheduled request does not complete within this window. */ +#define LANTERN_PENDING_PARENT_REQUEST_STALE_MS 15000u +/** + * Timeout used by outbound blocks-by-root request tracking. + * + * Requests that exceed this duration are expired from the active request registry + * and treated as failed for peer scoring. + */ +#define LANTERN_BLOCKS_REQUEST_TIMEOUT_MS 12000u +/** + * Hard cap for request tracking entry lifetime after soft timeout. + * + * Requests remain marked inflight after the first timeout signal so peer-side + * stream parsing can still complete without opening parallel replacement + * streams. If a request never completes, this hard timeout eventually releases + * the slot. + */ +#define LANTERN_BLOCKS_REQUEST_HARD_TIMEOUT_MS 60000u /* ============================================================================ diff --git a/src/core/main.c b/src/core/main.c index 2e5a3b7..701c1fd 100644 --- a/src/core/main.c +++ b/src/core/main.c @@ -28,6 +28,7 @@ enum { OPT_VALIDATOR_REGISTRY, OPT_NODES_PATH, OPT_GENESIS_STATE, + OPT_USE_GENESIS_STATE, OPT_VALIDATOR_CONFIG, OPT_NODE_ID, OPT_NODE_KEY, @@ -205,6 +206,9 @@ static lantern_client_error apply_option( case OPT_GENESIS_STATE: options->genesis_state_path = optarg; return LANTERN_CLIENT_OK; + case OPT_USE_GENESIS_STATE: + options->use_genesis_state = true; + return LANTERN_CLIENT_OK; case OPT_VALIDATOR_CONFIG: options->validator_config_path = optarg; return LANTERN_CLIENT_OK; @@ -418,6 +422,7 @@ static lantern_client_error parse_arguments( {"validator-registry-path", required_argument, NULL, OPT_VALIDATOR_REGISTRY}, {"nodes-path", required_argument, NULL, OPT_NODES_PATH}, {"genesis-state", required_argument, NULL, OPT_GENESIS_STATE}, + {"use-genesis-state", no_argument, NULL, OPT_USE_GENESIS_STATE}, {"validator-config", required_argument, NULL, OPT_VALIDATOR_CONFIG}, {"node-id", required_argument, NULL, OPT_NODE_ID}, {"node-key", required_argument, NULL, OPT_NODE_KEY}, @@ -451,6 +456,16 @@ static lantern_client_error parse_arguments( } } + if (options->use_genesis_state || options->genesis_state_path) + { + lantern_log_warn( + "cli", + &(const struct lantern_log_metadata){.validator = options->node_id}, + "ignoring --genesis-state/--use-genesis-state; Lantern derives genesis from config/registry"); + options->use_genesis_state = false; + options->genesis_state_path = NULL; + } + if (options->node_key_hex && options->node_key_path) { lantern_log_error( @@ -673,7 +688,11 @@ static void print_usage_paths(void) lantern_log_info( "main", NULL, - " --genesis-state PATH Path to genesis.ssz"); + " --genesis-state PATH Deprecated; ignored"); + lantern_log_info( + "main", + NULL, + " --use-genesis-state Deprecated; ignored"); lantern_log_info( "main", NULL, diff --git a/src/genesis/genesis.c b/src/genesis/genesis.c index d368855..aedd07b 100644 --- a/src/genesis/genesis.c +++ b/src/genesis/genesis.c @@ -175,7 +175,6 @@ int lantern_genesis_load( if (!paths->config_path || !paths->validator_registry_path || !paths->nodes_path - || !paths->state_path || !paths->validator_config_path) { lantern_log_error("genesis", NULL, "missing required genesis path"); @@ -227,18 +226,21 @@ int lantern_genesis_load( goto error; } - result = genesis_read_state_blob( - paths->state_path, - &artifacts->state_bytes, - &artifacts->state_size); - if (result != LANTERN_GENESIS_OK) + if (paths->state_path && paths->state_path[0] != '\0') { - lantern_log_error( - "genesis", - NULL, - "failed to read genesis state at %s", - paths->state_path); - goto error; + result = genesis_read_state_blob( + paths->state_path, + &artifacts->state_bytes, + &artifacts->state_size); + if (result != LANTERN_GENESIS_OK) + { + lantern_log_error( + "genesis", + NULL, + "failed to read genesis state at %s", + paths->state_path); + goto error; + } } return LANTERN_GENESIS_OK; diff --git a/src/networking/messages.c b/src/networking/messages.c index c75540b..381c4ab 100644 --- a/src/networking/messages.c +++ b/src/networking/messages.c @@ -272,16 +272,18 @@ int lantern_network_blocks_by_root_request_encode( return -1; } size_t roots_bytes = req->roots.length * LANTERN_ROOT_SIZE; - if (out_len < roots_bytes) { + size_t total_len = roots_bytes; + if (out_len < total_len) { return -1; } + /* leanSpec: BlocksByRootRequest is SSZList[Bytes32], encoded as packed roots bytes. */ if (roots_bytes > 0) { if (!req->roots.items) { return -1; } memcpy(out, req->roots.items, roots_bytes); } - *written = roots_bytes; + *written = total_len; return 0; } @@ -331,12 +333,7 @@ int lantern_network_blocks_by_root_request_decode( return -1; } - /* Canonical SSZ list encoding: raw concatenated roots. */ - if (data_len % LANTERN_ROOT_SIZE == 0) { - return decode_blocks_by_root_list(req, data, data_len); - } - - /* Zeam-compatible SSZ container encoding: 4-byte offset + list payload. */ + /* Canonical SSZ container encoding: 4-byte offset + list payload. */ if (data_len >= 4) { uint32_t offset = (uint32_t)data[0] | ((uint32_t)data[1] << 8) @@ -349,6 +346,11 @@ int lantern_network_blocks_by_root_request_decode( } } } + + /* Legacy compatibility: some older peers encoded only the raw list bytes. */ + if (data_len % LANTERN_ROOT_SIZE == 0) { + return decode_blocks_by_root_list(req, data, data_len); + } return -1; } diff --git a/src/storage/storage.c b/src/storage/storage.c index 60f969d..77a345e 100644 --- a/src/storage/storage.c +++ b/src/storage/storage.c @@ -70,6 +70,13 @@ struct lantern_storage_checkpoint_record { LanternCheckpoint finalized; }; +/* + * Internal helpers for filesystem/path handling, SSZ size estimation, and + * atomic file reads/writes. + * + * Public API: see include/lantern/storage/storage.h + */ + static int ensure_directory(const char *path) { if (!path) { return -1; @@ -94,14 +101,14 @@ static int join_path(const char *base, const char *leaf, char **out_path) { if (!base || !leaf || !out_path) { return -1; } - size_t base_len = strlen(base); - size_t leaf_len = strlen(leaf); - int needs_sep = 0; + const size_t base_len = strlen(base); + const size_t leaf_len = strlen(leaf); + bool needs_sep = false; if (base_len > 0) { - char tail = base[base_len - 1]; + const char tail = base[base_len - 1]; needs_sep = (tail != '/' && tail != '\\'); } - size_t total = base_len + (needs_sep ? 1 : 0) + leaf_len + 1; + const size_t total = base_len + (needs_sep ? 1u : 0u) + leaf_len + 1u; char *buffer = malloc(total); if (!buffer) { return -1; @@ -331,60 +338,59 @@ static int write_atomic_file(const char *path, const uint8_t *data, size_t data_ if (!path || !data || data_len == 0) { return -1; } - size_t path_len = strlen(path); - char *tmp_path = malloc(path_len + 5); + int rc = -1; + FILE *fp = NULL; + char *tmp_path = NULL; + + const size_t path_len = strlen(path); + tmp_path = malloc(path_len + sizeof(".tmp")); if (!tmp_path) { - return -1; + goto cleanup; } memcpy(tmp_path, path, path_len); - memcpy(tmp_path + path_len, ".tmp", 4); - tmp_path[path_len + 4] = '\0'; + memcpy(tmp_path + path_len, ".tmp", sizeof(".tmp")); - FILE *fp = fopen(tmp_path, "wb"); + fp = fopen(tmp_path, "wb"); if (!fp) { - free(tmp_path); - return -1; + goto cleanup; } - size_t written = fwrite(data, 1, data_len, fp); + const size_t written = fwrite(data, 1u, data_len, fp); if (written != data_len) { - fclose(fp); - free(tmp_path); - return -1; + goto cleanup; } if (fflush(fp) != 0) { - fclose(fp); - free(tmp_path); - return -1; + goto cleanup; } #if defined(_WIN32) if (_commit(_fileno(fp)) != 0) { - fclose(fp); - free(tmp_path); - return -1; + goto cleanup; } #else if (fsync(fileno(fp)) != 0) { - fclose(fp); - free(tmp_path); - return -1; + goto cleanup; } #endif - if (fclose(fp) != 0) { - free(tmp_path); - return -1; + const int close_rc = fclose(fp); + fp = NULL; + if (close_rc != 0) { + goto cleanup; } #if defined(_WIN32) if (remove(path) != 0 && errno != ENOENT) { - free(tmp_path); - return -1; + goto cleanup; } #endif if (rename(tmp_path, path) != 0) { - free(tmp_path); - return -1; + goto cleanup; + } + rc = 0; + +cleanup: + if (fp) { + fclose(fp); } free(tmp_path); - return 0; + return rc; } static int read_file_buffer(const char *path, uint8_t **out_data, size_t *out_len) { @@ -395,55 +401,63 @@ static int read_file_buffer(const char *path, uint8_t **out_data, size_t *out_le if (!fp) { return (errno == ENOENT) ? 1 : -1; } + int rc = -1; + uint8_t *buffer = NULL; + if (fseek(fp, 0, SEEK_END) != 0) { - fclose(fp); - return -1; + goto cleanup; } - long file_size = ftell(fp); + const long file_size = ftell(fp); if (file_size < 0) { - fclose(fp); - return -1; + goto cleanup; } if (fseek(fp, 0, SEEK_SET) != 0) { - fclose(fp); - return -1; + goto cleanup; } if (file_size == 0) { - fclose(fp); - return 1; + rc = 1; + goto cleanup; } - uint8_t *buffer = malloc((size_t)file_size); + buffer = malloc((size_t)file_size); if (!buffer) { - fclose(fp); - return -1; + goto cleanup; } - size_t read = fread(buffer, 1, (size_t)file_size, fp); - fclose(fp); + const size_t read = fread(buffer, 1u, (size_t)file_size, fp); if (read != (size_t)file_size) { - free(buffer); - return -1; + goto cleanup; } + *out_data = buffer; *out_len = (size_t)file_size; - return 0; + buffer = NULL; + rc = 0; + +cleanup: + fclose(fp); + free(buffer); + return rc; } static int write_state_meta(const char *data_dir, const LanternState *state) { if (!data_dir || !state) { return -1; } - struct lantern_storage_state_meta meta = { + int rc = -1; + char *meta_path = NULL; + + const struct lantern_storage_state_meta meta = { .version = LANTERN_STORAGE_STATE_META_VERSION, .reserved = 0, .historical_roots_offset = state->historical_roots_offset, .justified_slots_offset = state->justified_slots_offset, }; - char *meta_path = NULL; if (join_path(data_dir, LANTERN_STORAGE_STATE_META_FILE, &meta_path) != 0) { - return -1; + goto cleanup; } - int rc = write_atomic_file(meta_path, (const uint8_t *)&meta, sizeof(meta)); - free(meta_path); + rc = write_atomic_file(meta_path, (const uint8_t *)&meta, sizeof(meta)); + +cleanup: + free_path(meta_path); return rc; } @@ -464,28 +478,34 @@ static int read_state_meta(const char *data_dir, struct lantern_storage_state_me if (!data_dir || !meta) { return -1; } + int rc = -1; char *meta_path = NULL; - if (join_path(data_dir, LANTERN_STORAGE_STATE_META_FILE, &meta_path) != 0) { - return -1; - } uint8_t *buffer = NULL; size_t len = 0; - int rc = read_file_buffer(meta_path, &buffer, &len); - free(meta_path); + + if (join_path(data_dir, LANTERN_STORAGE_STATE_META_FILE, &meta_path) != 0) { + goto cleanup; + } + rc = read_file_buffer(meta_path, &buffer, &len); if (rc != 0) { - free(buffer); - return rc; + goto cleanup; } if (len != sizeof(*meta)) { - free(buffer); - return -1; + rc = -1; + goto cleanup; } memcpy(meta, buffer, sizeof(*meta)); - free(buffer); if (meta->version != LANTERN_STORAGE_STATE_META_VERSION) { - return -1; + rc = -1; + goto cleanup; } - return 0; + + rc = 0; + +cleanup: + free_path(meta_path); + free(buffer); + return rc; } static int build_blocks_dir(const char *data_dir, char **out_path) { @@ -510,113 +530,155 @@ static int build_slot_index_dir(const char *data_dir, char **out_path) { return rc; } +/** + * Ensure all storage directories exist under `data_dir`. + * + * @param data_dir Base directory path. + * @return 0 on success. + * @return -1 on invalid parameters or filesystem errors. + */ int lantern_storage_prepare(const char *data_dir) { if (!data_dir) { return -1; } + + int rc = -1; + char *blocks_dir = NULL; + char *states_dir = NULL; + char *indices_dir = NULL; + char *slot_dir = NULL; + if (ensure_directory(data_dir) != 0) { - return -1; + goto cleanup; } - char *blocks_dir = NULL; if (build_blocks_dir(data_dir, &blocks_dir) != 0) { - return -1; + goto cleanup; } - int rc = ensure_directory(blocks_dir); - free_path(blocks_dir); - if (rc != 0) { - return rc; + if (ensure_directory(blocks_dir) != 0) { + goto cleanup; } - char *states_dir = NULL; if (build_states_dir(data_dir, &states_dir) != 0) { - return -1; + goto cleanup; } - rc = ensure_directory(states_dir); - free_path(states_dir); - if (rc != 0) { - return rc; + if (ensure_directory(states_dir) != 0) { + goto cleanup; } - char *indices_dir = NULL; if (build_indices_dir(data_dir, &indices_dir) != 0) { - return -1; + goto cleanup; } - rc = ensure_directory(indices_dir); - free_path(indices_dir); - if (rc != 0) { - return rc; + if (ensure_directory(indices_dir) != 0) { + goto cleanup; } - char *slot_dir = NULL; if (build_slot_index_dir(data_dir, &slot_dir) != 0) { - return -1; + goto cleanup; + } + if (ensure_directory(slot_dir) != 0) { + goto cleanup; } - rc = ensure_directory(slot_dir); + + rc = 0; + +cleanup: + free_path(blocks_dir); + free_path(states_dir); + free_path(indices_dir); free_path(slot_dir); return rc; } +/** + * Persist `state` under `data_dir` using SSZ (`state.ssz`) plus `state.meta`. + * + * @param data_dir Base directory path. + * @param state State to persist. + * @return 0 on success. + * @return -1 on invalid parameters, encoding failure, or filesystem errors. + */ int lantern_storage_save_state(const char *data_dir, const LanternState *state) { if (!data_dir || !state || state->config.num_validators == 0) { return -1; } - size_t encoded_size = state_encoded_size(state); + int rc = -1; + uint8_t *buffer = NULL; + char *state_path = NULL; + + const size_t encoded_size = state_encoded_size(state); if (encoded_size == 0) { - return -1; + goto cleanup; } - uint8_t *buffer = malloc(encoded_size); + + buffer = malloc(encoded_size); if (!buffer) { - return -1; + goto cleanup; } size_t written = 0; if (lantern_ssz_encode_state(state, buffer, encoded_size, &written) != 0 || written != encoded_size) { - free(buffer); - return -1; + goto cleanup; } - char *state_path = NULL; if (join_path(data_dir, LANTERN_STORAGE_STATE_FILE, &state_path) != 0) { - free(buffer); - return -1; + goto cleanup; } - int rc = write_atomic_file(state_path, buffer, written); - free(state_path); - free(buffer); + rc = write_atomic_file(state_path, buffer, written); if (rc != 0) { - return rc; + goto cleanup; } - return write_state_meta(data_dir, state); + + rc = write_state_meta(data_dir, state); + +cleanup: + free_path(state_path); + free(buffer); + return rc; } +/** + * Load a persisted state from `data_dir/state.ssz`. + * + * On success, the contents of `state` are replaced. + * + * @param data_dir Base directory path. + * @param state Output state (replaced on success). + * @return 0 on success. + * @return 1 if the state file is missing or empty. + * @return -1 on invalid parameters or decode/validation errors. + */ int lantern_storage_load_state(const char *data_dir, LanternState *state) { if (!data_dir || !state) { return -1; } + + int rc = -1; char *state_path = NULL; - if (join_path(data_dir, LANTERN_STORAGE_STATE_FILE, &state_path) != 0) { - return -1; - } uint8_t *data = NULL; size_t data_len = 0; - int read_rc = read_file_buffer(state_path, &data, &data_len); - free(state_path); - if (read_rc != 0) { - return read_rc; - } + LanternState decoded; lantern_state_init(&decoded); + bool decoded_owned = true; + + if (join_path(data_dir, LANTERN_STORAGE_STATE_FILE, &state_path) != 0) { + goto cleanup; + } + rc = read_file_buffer(state_path, &data, &data_len); + if (rc != 0) { + goto cleanup; + } if (lantern_ssz_decode_state(&decoded, data, data_len) != 0) { - free(data); - lantern_state_reset(&decoded); - return -1; + rc = -1; + goto cleanup; } free(data); + data = NULL; if (decoded.config.num_validators == 0) { - lantern_state_reset(&decoded); - return -1; + rc = -1; + goto cleanup; } if (lantern_state_prepare_validator_votes(&decoded, decoded.config.num_validators) != 0) { - lantern_state_reset(&decoded); - return -1; + rc = -1; + goto cleanup; } struct lantern_storage_state_meta meta; - int meta_rc = read_state_meta(data_dir, &meta); + const int meta_rc = read_state_meta(data_dir, &meta); if (meta_rc == 0) { decoded.historical_roots_offset = meta.historical_roots_offset; decoded.justified_slots_offset = meta.justified_slots_offset; @@ -625,21 +687,44 @@ int lantern_storage_load_state(const char *data_dir, LanternState *state) { decoded.justified_slots_offset = decoded.latest_finalized.slot == UINT64_MAX ? 0u : (decoded.latest_finalized.slot + 1u); } else { - lantern_state_reset(&decoded); - return -1; + rc = -1; + goto cleanup; } + lantern_state_reset(state); *state = decoded; - return 0; + decoded_owned = false; + rc = 0; + +cleanup: + free_path(state_path); + free(data); + if (decoded_owned) { + lantern_state_reset(&decoded); + } + return rc; } +/** + * Persist all present validator votes to `data_dir/votes.bin`. + * + * @param data_dir Base directory path. + * @param state State containing validator votes. + * @return 0 on success. + * @return -1 on invalid parameters, encoding failure, or filesystem errors. + */ int lantern_storage_save_votes(const char *data_dir, const LanternState *state) { if (!data_dir || !state || state->config.num_validators == 0) { return -1; } - size_t capacity = lantern_state_validator_capacity(state); + + int rc = -1; + uint8_t *buffer = NULL; + char *votes_path = NULL; + + const size_t capacity = lantern_state_validator_capacity(state); if (capacity == 0) { - return -1; + goto cleanup; } size_t present = 0; for (size_t i = 0; i < capacity; ++i) { @@ -647,18 +732,17 @@ int lantern_storage_save_votes(const char *data_dir, const LanternState *state) present++; } } - struct lantern_storage_votes_header header; - memset(&header, 0, sizeof(header)); + struct lantern_storage_votes_header header = {0}; memcpy(header.magic, LANTERN_STORAGE_VOTES_MAGIC, sizeof(header.magic)); header.version = LANTERN_STORAGE_VOTES_VERSION; header.validator_count = capacity; header.record_count = present; - size_t payload_size = present * (sizeof(uint64_t) + LANTERN_SIGNED_VOTE_SSZ_SIZE); - size_t total_size = sizeof(header) + payload_size; - uint8_t *buffer = malloc(total_size); + const size_t payload_size = present * (sizeof(uint64_t) + LANTERN_SIGNED_VOTE_SSZ_SIZE); + const size_t total_size = sizeof(header) + payload_size; + buffer = malloc(total_size); if (!buffer) { - return -1; + goto cleanup; } uint8_t *cursor = buffer; memcpy(cursor, &header, sizeof(header)); @@ -668,17 +752,15 @@ int lantern_storage_save_votes(const char *data_dir, const LanternState *state) if (!lantern_state_validator_has_vote(state, i)) { continue; } - uint64_t validator_index = (uint64_t)i; - uint8_t index_bytes[sizeof(uint64_t)]; - for (size_t b = 0; b < sizeof(uint64_t); ++b) { - index_bytes[b] = (uint8_t)((validator_index >> (8u * b)) & 0xFFu); + const uint64_t validator_index = (uint64_t)i; + for (size_t b = 0; b < sizeof(validator_index); ++b) { + cursor[b] = (uint8_t)((validator_index >> (8u * b)) & 0xFFu); } - memcpy(cursor, index_bytes, sizeof(index_bytes)); - cursor += sizeof(index_bytes); + cursor += sizeof(validator_index); + LanternSignedVote signed_vote; if (lantern_state_get_signed_validator_vote(state, i, &signed_vote) != 0) { - free(buffer); - return -1; + goto cleanup; } size_t vote_written = 0; if (lantern_ssz_encode_signed_vote( @@ -688,47 +770,57 @@ int lantern_storage_save_votes(const char *data_dir, const LanternState *state) &vote_written) != 0 || vote_written != LANTERN_SIGNED_VOTE_SSZ_SIZE) { - free(buffer); - return -1; + goto cleanup; } cursor += LANTERN_SIGNED_VOTE_SSZ_SIZE; } - char *votes_path = NULL; if (join_path(data_dir, LANTERN_STORAGE_VOTES_FILE, &votes_path) != 0) { - free(buffer); - return -1; + goto cleanup; } - int rc = write_atomic_file(votes_path, buffer, total_size); - free(votes_path); + rc = write_atomic_file(votes_path, buffer, total_size); + +cleanup: + free_path(votes_path); free(buffer); return rc; } +/** + * Load persisted validator votes from `data_dir/votes.bin` into `state`. + * + * @param data_dir Base directory path. + * @param state State to populate with loaded votes. + * @return 0 on success. + * @return 1 if the votes file is missing or empty. + * @return -1 on invalid parameters or decode/validation errors. + */ int lantern_storage_load_votes(const char *data_dir, LanternState *state) { if (!data_dir || !state) { return -1; } + + int rc = -1; char *votes_path = NULL; - if (join_path(data_dir, LANTERN_STORAGE_VOTES_FILE, &votes_path) != 0) { - return -1; - } uint8_t *data = NULL; size_t data_len = 0; - int read_rc = read_file_buffer(votes_path, &data, &data_len); - free(votes_path); - if (read_rc != 0) { - return read_rc; + + if (join_path(data_dir, LANTERN_STORAGE_VOTES_FILE, &votes_path) != 0) { + goto cleanup; + } + rc = read_file_buffer(votes_path, &data, &data_len); + if (rc != 0) { + goto cleanup; } if (data_len < sizeof(struct lantern_storage_votes_header)) { - free(data); - return -1; + rc = -1; + goto cleanup; } struct lantern_storage_votes_header header; memcpy(&header, data, sizeof(header)); if (memcmp(header.magic, LANTERN_STORAGE_VOTES_MAGIC, sizeof(header.magic)) != 0) { - free(data); - return -1; + rc = -1; + goto cleanup; } bool has_signatures = false; size_t signed_vote_size = 0; @@ -741,25 +833,25 @@ int lantern_storage_load_votes(const char *data_dir, LanternState *state) { has_signatures = true; signed_vote_size = LANTERN_SIGNED_VOTE_SSZ_SIZE; } else { - free(data); - return -1; + rc = -1; + goto cleanup; } if (header.validator_count == 0) { - free(data); - return -1; + rc = -1; + goto cleanup; } if (state->config.num_validators == 0) { state->config.num_validators = header.validator_count; } if (state->config.num_validators != header.validator_count) { - free(data); - return -1; + rc = -1; + goto cleanup; } if (lantern_state_prepare_validator_votes(state, state->config.num_validators) != 0) { - free(data); - return -1; + rc = -1; + goto cleanup; } - size_t capacity = lantern_state_validator_capacity(state); + const size_t capacity = lantern_state_validator_capacity(state); for (size_t i = 0; i < capacity; ++i) { lantern_state_clear_validator_vote(state, i); } @@ -770,92 +862,110 @@ int lantern_storage_load_votes(const char *data_dir, LanternState *state) { const size_t encoded_vote_size = has_signatures ? signed_vote_size : LANTERN_VOTE_SSZ_SIZE; while (records_read < header.record_count) { if (remaining < sizeof(uint64_t) + encoded_vote_size) { - free(data); - return -1; + rc = -1; + goto cleanup; } uint64_t validator_index = 0; - for (size_t b = 0; b < sizeof(uint64_t); ++b) { + for (size_t b = 0; b < sizeof(validator_index); ++b) { validator_index |= ((uint64_t)cursor[b]) << (8u * b); } - cursor += sizeof(uint64_t); - remaining -= sizeof(uint64_t); + cursor += sizeof(validator_index); + remaining -= sizeof(validator_index); if (validator_index >= state->validator_votes_len) { - free(data); - return -1; + rc = -1; + goto cleanup; } if (has_signatures) { LanternSignedVote signed_vote; memset(&signed_vote, 0, sizeof(signed_vote)); if (lantern_ssz_decode_signed_vote(&signed_vote, cursor, signed_vote_size) != 0) { - free(data); - return -1; + rc = -1; + goto cleanup; } cursor += signed_vote_size; remaining -= signed_vote_size; if (lantern_state_set_signed_validator_vote(state, (size_t)validator_index, &signed_vote) != 0) { - free(data); - return -1; + rc = -1; + goto cleanup; } } else { LanternVote vote; memset(&vote, 0, sizeof(vote)); if (lantern_ssz_decode_vote(&vote, cursor, LANTERN_VOTE_SSZ_SIZE) != 0) { - free(data); - return -1; + rc = -1; + goto cleanup; } cursor += LANTERN_VOTE_SSZ_SIZE; remaining -= LANTERN_VOTE_SSZ_SIZE; if (lantern_state_set_validator_vote(state, (size_t)validator_index, &vote) != 0) { - free(data); - return -1; + rc = -1; + goto cleanup; } } records_read++; } + + rc = 0; + +cleanup: + free_path(votes_path); free(data); - return 0; + return rc; } +/** + * Store a signed block under `data_dir/blocks/.ssz`. + * + * The operation is idempotent: if the block already exists on disk, this + * function returns success without modifying it. + * + * @param data_dir Base directory path. + * @param block Block to persist. + * @return 0 on success. + * @return -1 on invalid parameters, encoding failure, or filesystem errors. + */ int lantern_storage_store_block(const char *data_dir, const LanternSignedBlock *block) { if (!data_dir || !block) { return -1; } + + int rc = -1; char *blocks_dir = NULL; + char *block_path = NULL; + uint8_t *buffer = NULL; + if (build_blocks_dir(data_dir, &blocks_dir) != 0) { - return -1; + goto cleanup; } if (ensure_directory(blocks_dir) != 0) { - free_path(blocks_dir); - return -1; + goto cleanup; } + LanternRoot root; if (lantern_hash_tree_root_block(&block->message.block, &root) != 0) { - free_path(blocks_dir); - return -1; + goto cleanup; } char root_hex[2u * LANTERN_ROOT_SIZE + 1u]; if (lantern_bytes_to_hex(root.bytes, LANTERN_ROOT_SIZE, root_hex, sizeof(root_hex), 0) != 0) { - free_path(blocks_dir); - return -1; + goto cleanup; } char filename[sizeof(root_hex) + 4]; - int written = snprintf(filename, sizeof(filename), "%s.ssz", root_hex); + const int written = snprintf(filename, sizeof(filename), "%s.ssz", root_hex); if (written < 0 || (size_t)written >= sizeof(filename)) { - free_path(blocks_dir); - return -1; + goto cleanup; } - char *block_path = NULL; + if (join_path(blocks_dir, filename, &block_path) != 0) { - free_path(blocks_dir); - return -1; + goto cleanup; } - free_path(blocks_dir); + struct stat st; if (stat(block_path, &st) == 0) { - free_path(block_path); - return 0; + rc = 0; + goto cleanup; } - size_t encoded_size = signed_block_encoded_size(block); + + const size_t encoded_size = signed_block_encoded_size(block); if (encoded_size == 0) { lantern_log_warn( "storage", @@ -865,16 +975,14 @@ int lantern_storage_store_block(const char *data_dir, const LanternSignedBlock * block->message.block.body.attestations.length, block->message.block.body.legacy_plain_attestation_layout ? "true" : "false", block->signatures.attestation_signatures.length); - free_path(block_path); - return -1; + goto cleanup; } - uint8_t *buffer = malloc(encoded_size); + buffer = malloc(encoded_size); if (!buffer) { - free_path(block_path); - return -1; + goto cleanup; } size_t written_size = 0; - int encode_rc = lantern_ssz_encode_signed_block(block, buffer, encoded_size, &written_size); + const int encode_rc = lantern_ssz_encode_signed_block(block, buffer, encoded_size, &written_size); if (encode_rc != 0 || written_size == 0 || written_size > encoded_size) { @@ -886,16 +994,27 @@ int lantern_storage_store_block(const char *data_dir, const LanternSignedBlock * encode_rc, encoded_size, written_size); - free(buffer); - free_path(block_path); - return -1; + goto cleanup; } - int rc = write_atomic_file(block_path, buffer, written_size); + + rc = write_atomic_file(block_path, buffer, written_size); + +cleanup: free(buffer); free_path(block_path); + free_path(blocks_dir); return rc; } +/** + * Persist `state` under `data_dir/states` using the given `root` as filename. + * + * @param data_dir Base directory path. + * @param root Root used to build the on-disk filename. + * @param state State to persist. + * @return 0 on success. + * @return -1 on invalid parameters, encoding failure, or filesystem errors. + */ int lantern_storage_store_state_for_root( const char *data_dir, const LanternRoot *root, @@ -903,75 +1022,79 @@ int lantern_storage_store_state_for_root( if (!data_dir || !root || !state || state->config.num_validators == 0) { return -1; } - size_t encoded_size = state_encoded_size(state); + + int rc = -1; + uint8_t *buffer = NULL; + char *states_dir = NULL; + char *state_path = NULL; + char *meta_path = NULL; + + const size_t encoded_size = state_encoded_size(state); if (encoded_size == 0) { - return -1; + goto cleanup; } - uint8_t *buffer = malloc(encoded_size); + buffer = malloc(encoded_size); if (!buffer) { - return -1; + goto cleanup; } size_t written = 0; if (lantern_ssz_encode_state(state, buffer, encoded_size, &written) != 0 || written != encoded_size) { - free(buffer); - return -1; + goto cleanup; } - char *states_dir = NULL; if (build_states_dir(data_dir, &states_dir) != 0) { - free(buffer); - return -1; + goto cleanup; } if (ensure_directory(states_dir) != 0) { - free_path(states_dir); - free(buffer); - return -1; + goto cleanup; } char root_hex[2u * LANTERN_ROOT_SIZE + 1u]; if (lantern_bytes_to_hex(root->bytes, LANTERN_ROOT_SIZE, root_hex, sizeof(root_hex), 0) != 0) { - free_path(states_dir); - free(buffer); - return -1; + goto cleanup; } char filename[sizeof(root_hex) + 4]; - int name_written = snprintf(filename, sizeof(filename), "%s.ssz", root_hex); + const int name_written = snprintf(filename, sizeof(filename), "%s.ssz", root_hex); if (name_written < 0 || (size_t)name_written >= sizeof(filename)) { - free_path(states_dir); - free(buffer); - return -1; + goto cleanup; } - char *state_path = NULL; if (join_path(states_dir, filename, &state_path) != 0) { - free_path(states_dir); - free(buffer); - return -1; + goto cleanup; } - int rc = write_atomic_file(state_path, buffer, written); - free(buffer); + rc = write_atomic_file(state_path, buffer, written); if (rc != 0) { - free_path(state_path); - free_path(states_dir); - return rc; + goto cleanup; } + char meta_name[sizeof(root_hex) + 6]; - int meta_written = snprintf(meta_name, sizeof(meta_name), "%s.meta", root_hex); + const int meta_written = snprintf(meta_name, sizeof(meta_name), "%s.meta", root_hex); if (meta_written < 0 || (size_t)meta_written >= sizeof(meta_name)) { - free_path(state_path); - free_path(states_dir); - return -1; + rc = -1; + goto cleanup; } - char *meta_path = NULL; if (join_path(states_dir, meta_name, &meta_path) != 0) { - free_path(state_path); - free_path(states_dir); - return -1; + rc = -1; + goto cleanup; } - int meta_rc = write_state_meta_path(meta_path, state); + rc = write_state_meta_path(meta_path, state); + +cleanup: free_path(meta_path); free_path(state_path); free_path(states_dir); - return meta_rc; + free(buffer); + return rc; } +/** + * Load the raw persisted SSZ bytes for a state stored under `data_dir/states`. + * + * @param data_dir Base directory path. + * @param root Root used to build the on-disk filename. + * @param out_data Output buffer (caller must free) on success. + * @param out_len Output length on success. + * @return 0 on success. + * @return 1 if the state file is missing or empty. + * @return -1 on invalid parameters or filesystem errors. + */ int lantern_storage_load_state_bytes_for_root( const char *data_dir, const LanternRoot *root, @@ -983,44 +1106,62 @@ int lantern_storage_load_state_bytes_for_root( *out_data = NULL; *out_len = 0; + int rc = -1; char *states_dir = NULL; + char *state_path = NULL; + uint8_t *data = NULL; + size_t len = 0; + if (build_states_dir(data_dir, &states_dir) != 0) { - return -1; + goto cleanup; } char root_hex[2u * LANTERN_ROOT_SIZE + 1u]; if (lantern_bytes_to_hex(root->bytes, LANTERN_ROOT_SIZE, root_hex, sizeof(root_hex), 0) != 0) { - free_path(states_dir); - return -1; + goto cleanup; } char filename[sizeof(root_hex) + 4]; - int name_written = snprintf(filename, sizeof(filename), "%s.ssz", root_hex); + const int name_written = snprintf(filename, sizeof(filename), "%s.ssz", root_hex); if (name_written < 0 || (size_t)name_written >= sizeof(filename)) { - free_path(states_dir); - return -1; + goto cleanup; } - char *state_path = NULL; if (join_path(states_dir, filename, &state_path) != 0) { - free_path(states_dir); - return -1; + goto cleanup; } - free_path(states_dir); - uint8_t *data = NULL; - size_t len = 0; - int read_rc = read_file_buffer(state_path, &data, &len); - free_path(state_path); - if (read_rc != 0) { + rc = read_file_buffer(state_path, &data, &len); + if (rc != 0) { if (data) { free(data); + data = NULL; } - return read_rc > 0 ? 1 : -1; + rc = (rc > 0) ? 1 : -1; + goto cleanup; } + *out_data = data; *out_len = len; - return 0; + data = NULL; + rc = 0; + +cleanup: + free_path(state_path); + free_path(states_dir); + free(data); + return rc; } +/** + * Persist the mapping from a slot number to its block root on disk. + * + * Writes `root` into `data_dir/indices/slots/.root` using an + * atomic write so readers never see a partial file. + * + * @param data_dir Base storage directory. + * @param slot Slot number to index. + * @param root 32-byte block root for the slot. + * @return 0 on success, -1 on error. + */ int lantern_storage_store_slot_root( const char *data_dir, uint64_t slot, @@ -1028,31 +1169,46 @@ int lantern_storage_store_slot_root( if (!data_dir || !root) { return -1; } + + int rc = -1; char *slot_dir = NULL; + char *slot_path = NULL; + if (build_slot_index_dir(data_dir, &slot_dir) != 0) { - return -1; + goto cleanup; } if (ensure_directory(slot_dir) != 0) { - free_path(slot_dir); - return -1; + goto cleanup; } + char filename[64]; - int written = snprintf(filename, sizeof(filename), "%" PRIu64 ".root", slot); + const int written = snprintf(filename, sizeof(filename), "%" PRIu64 ".root", slot); if (written < 0 || (size_t)written >= sizeof(filename)) { - free_path(slot_dir); - return -1; + goto cleanup; } - char *slot_path = NULL; if (join_path(slot_dir, filename, &slot_path) != 0) { - free_path(slot_dir); - return -1; + goto cleanup; } - free_path(slot_dir); - int rc = write_atomic_file(slot_path, root->bytes, LANTERN_ROOT_SIZE); + + rc = write_atomic_file(slot_path, root->bytes, LANTERN_ROOT_SIZE); + +cleanup: free_path(slot_path); + free_path(slot_dir); return rc; } +/** + * Persist the current head slot and root so the node can resume after restart. + * + * Writes a compact `{slot, root}` record into `data_dir/indices/head.bin` + * using an atomic write. + * + * @param data_dir Base storage directory. + * @param slot Head slot number. + * @param root 32-byte head block root. + * @return 0 on success, -1 on error. + */ int lantern_storage_store_head_root( const char *data_dir, uint64_t slot, @@ -1060,29 +1216,44 @@ int lantern_storage_store_head_root( if (!data_dir || !root) { return -1; } + + int rc = -1; char *indices_dir = NULL; + char *head_path = NULL; + if (build_indices_dir(data_dir, &indices_dir) != 0) { - return -1; + goto cleanup; } if (ensure_directory(indices_dir) != 0) { - free_path(indices_dir); - return -1; + goto cleanup; } - char *head_path = NULL; if (join_path(indices_dir, LANTERN_STORAGE_HEAD_FILE, &head_path) != 0) { - free_path(indices_dir); - return -1; + goto cleanup; } - free_path(indices_dir); - struct lantern_storage_head_record record = { + + const struct lantern_storage_head_record record = { .slot = slot, .root = *root, }; - int rc = write_atomic_file(head_path, (const uint8_t *)&record, sizeof(record)); + rc = write_atomic_file(head_path, (const uint8_t *)&record, sizeof(record)); + +cleanup: free_path(head_path); + free_path(indices_dir); return rc; } +/** + * Persist justified and finalized checkpoint data to disk. + * + * Writes both checkpoints as a single record into + * `data_dir/indices/checkpoints.bin` using an atomic write. + * + * @param data_dir Base storage directory. + * @param justified Justified checkpoint to store. + * @param finalized Finalized checkpoint to store. + * @return 0 on success, -1 on error. + */ int lantern_storage_store_checkpoints( const char *data_dir, const LanternCheckpoint *justified, @@ -1090,32 +1261,49 @@ int lantern_storage_store_checkpoints( if (!data_dir || !justified || !finalized) { return -1; } + + int rc = -1; char *indices_dir = NULL; + char *checkpoint_path = NULL; + if (build_indices_dir(data_dir, &indices_dir) != 0) { - return -1; + goto cleanup; } if (ensure_directory(indices_dir) != 0) { - free_path(indices_dir); - return -1; + goto cleanup; } - char *checkpoint_path = NULL; if (join_path(indices_dir, LANTERN_STORAGE_CHECKPOINTS_FILE, &checkpoint_path) != 0) { - free_path(indices_dir); - return -1; + goto cleanup; } - free_path(indices_dir); - struct lantern_storage_checkpoint_record record = { + + const struct lantern_storage_checkpoint_record record = { .justified = *justified, .finalized = *finalized, }; - int rc = write_atomic_file( + rc = write_atomic_file( checkpoint_path, (const uint8_t *)&record, sizeof(record)); + +cleanup: free_path(checkpoint_path); + free_path(indices_dir); return rc; } +/** + * Collect signed blocks from disk that match the given set of roots. + * + * For each root in `roots`, looks up `data_dir/blocks/.ssz`, decodes + * the block, verifies its hash-tree-root matches the requested root, and + * appends it to `out_blocks`. Missing blocks are silently skipped. + * + * @param data_dir Base storage directory. + * @param roots Array of block roots to look up. + * @param root_count Number of entries in `roots`. + * @param out_blocks Output collection (resized to 0 on entry, filled on success). + * @return 0 on success, -1 on error. + */ int lantern_storage_collect_blocks( const char *data_dir, const LanternRoot *roots, @@ -1127,27 +1315,28 @@ int lantern_storage_collect_blocks( if (lantern_blocks_by_root_response_resize(out_blocks, 0) != 0) { return -1; } + + int rc = -1; char *blocks_dir = NULL; + if (build_blocks_dir(data_dir, &blocks_dir) != 0) { - return -1; + goto cleanup; } - struct lantern_log_metadata meta = {0}; + + const struct lantern_log_metadata meta = {0}; for (size_t i = 0; i < root_count; ++i) { char root_hex[2u * LANTERN_ROOT_SIZE + 1u]; if (lantern_bytes_to_hex(roots[i].bytes, LANTERN_ROOT_SIZE, root_hex, sizeof(root_hex), 0) != 0) { - free_path(blocks_dir); - return -1; + goto cleanup; } char filename[sizeof(root_hex) + 4]; - int wrote = snprintf(filename, sizeof(filename), "%s.ssz", root_hex); + const int wrote = snprintf(filename, sizeof(filename), "%s.ssz", root_hex); if (wrote < 0 || (size_t)wrote >= sizeof(filename)) { - free_path(blocks_dir); - return -1; + goto cleanup; } char *block_path = NULL; if (join_path(blocks_dir, filename, &block_path) != 0) { - free_path(blocks_dir); - return -1; + goto cleanup; } lantern_log_trace( "storage", @@ -1155,9 +1344,10 @@ int lantern_storage_collect_blocks( "collect_blocks search root=%s path=%s", root_hex, block_path ? block_path : "null"); + uint8_t *data = NULL; size_t data_len = 0; - int read_rc = read_file_buffer(block_path, &data, &data_len); + const int read_rc = read_file_buffer(block_path, &data, &data_len); free_path(block_path); if (read_rc != 0) { lantern_log_debug( @@ -1168,28 +1358,25 @@ int lantern_storage_collect_blocks( read_rc); continue; } - size_t current = out_blocks->length; + + const size_t current = out_blocks->length; if (lantern_blocks_by_root_response_resize(out_blocks, current + 1) != 0) { free(data); - free_path(blocks_dir); - return -1; + goto cleanup; } LanternSignedBlock *dest = &out_blocks->blocks[current]; if (lantern_ssz_decode_signed_block(dest, data, data_len) != 0) { free(data); - free_path(blocks_dir); - return -1; + goto cleanup; } LanternRoot computed; if (lantern_hash_tree_root_block(&dest->message.block, &computed) != 0) { free(data); - free_path(blocks_dir); - return -1; + goto cleanup; } if (memcmp(computed.bytes, roots[i].bytes, LANTERN_ROOT_SIZE) != 0) { free(data); - free_path(blocks_dir); - return -1; + goto cleanup; } lantern_log_trace( "storage", @@ -1200,10 +1387,29 @@ int lantern_storage_collect_blocks( dest->message.block.body.attestations.length); free(data); } + rc = 0; + +cleanup: free_path(blocks_dir); - return 0; + return rc; } +/** + * Iterate over every persisted block in the blocks directory. + * + * Opens `data_dir/blocks/`, reads each `.ssz` file, decodes the signed + * block, computes its hash-tree-root, and calls `visitor` with the block, + * root, and caller-supplied `context`. Iteration stops early if the + * visitor returns non-zero (its return value is propagated). + * + * @param data_dir Base storage directory. + * @param visitor Callback invoked for each block. + * @param context Opaque pointer forwarded to the visitor. + * @return 0 on success (all blocks visited). + * @return 1 if the blocks directory does not exist. + * @return -1 on I/O or decoding errors; positive visitor return values + * are forwarded as-is. + */ int lantern_storage_iterate_blocks( const char *data_dir, lantern_storage_block_visitor_fn visitor, @@ -1211,22 +1417,27 @@ int lantern_storage_iterate_blocks( if (!data_dir || !visitor) { return -1; } + + int rc = -1; char *blocks_dir = NULL; + DIR *dir = NULL; + if (build_blocks_dir(data_dir, &blocks_dir) != 0) { - return -1; + goto cleanup; } - DIR *dir = opendir(blocks_dir); + dir = opendir(blocks_dir); if (!dir) { - free_path(blocks_dir); - return (errno == ENOENT) ? 1 : -1; + rc = (errno == ENOENT) ? 1 : -1; + goto cleanup; } + + rc = 0; struct dirent *entry = NULL; - int rc = 0; while ((entry = readdir(dir)) != NULL) { if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) { continue; } - size_t len = strlen(entry->d_name); + const size_t len = strlen(entry->d_name); if (len < 5 || strcmp(entry->d_name + len - 4, ".ssz") != 0) { continue; } @@ -1237,7 +1448,7 @@ int lantern_storage_iterate_blocks( } uint8_t *data = NULL; size_t data_len = 0; - int read_rc = read_file_buffer(block_path, &data, &data_len); + const int read_rc = read_file_buffer(block_path, &data, &data_len); free_path(block_path); if (read_rc != 0) { if (read_rc == 1) { @@ -1261,7 +1472,7 @@ int lantern_storage_iterate_blocks( rc = -1; break; } - int visit_rc = visitor(&block, &root, context); + const int visit_rc = visitor(&block, &root, context); lantern_signed_block_with_attestation_reset(&block); free(data); if (visit_rc != 0) { @@ -1269,7 +1480,11 @@ int lantern_storage_iterate_blocks( break; } } - closedir(dir); + +cleanup: + if (dir) { + closedir(dir); + } free_path(blocks_dir); return rc; } diff --git a/tests/unit/test_client_gossip.c b/tests/unit/test_client_gossip.c new file mode 100644 index 0000000..3f4bdd8 --- /dev/null +++ b/tests/unit/test_client_gossip.c @@ -0,0 +1,50 @@ +#include +#include + +#include "lantern/core/client.h" + +static int test_idle_gossip_not_ignored(void) +{ + struct lantern_client client; + memset(&client, 0, sizeof(client)); + client.node_id = "test_idle_gossip"; + client.sync_state = LANTERN_SYNC_STATE_IDLE; + + LanternSignedBlock block; + memset(&block, 0, sizeof(block)); + lantern_block_body_init(&block.message.body); + block.message.slot = 1; + + int block_rc = lantern_client_debug_gossip_block(&client, &block); + lantern_block_body_reset(&block.message.body); + if (block_rc != LANTERN_CLIENT_OK) + { + fprintf(stderr, "idle block gossip was not accepted rc=%d\n", block_rc); + return 1; + } + + LanternSignedVote vote; + memset(&vote, 0, sizeof(vote)); + vote.data.validator_id = 0; + vote.data.slot = 1; + + int vote_rc = lantern_client_debug_gossip_vote(&client, &vote); + if (vote_rc != LANTERN_CLIENT_OK) + { + fprintf(stderr, "idle vote gossip was not accepted rc=%d\n", vote_rc); + return 1; + } + + return 0; +} + +int main(void) +{ + if (test_idle_gossip_not_ignored() != 0) + { + return 1; + } + + puts("lantern_client_gossip_test OK"); + return 0; +} diff --git a/tests/unit/test_client_pending.c b/tests/unit/test_client_pending.c index d90c019..70831ad 100644 --- a/tests/unit/test_client_pending.c +++ b/tests/unit/test_client_pending.c @@ -249,6 +249,119 @@ static int test_pending_block_queue(void) { return rc; } +static int test_pending_block_queue_sync_drops_incoming(void) { + struct lantern_client client; + memset(&client, 0, sizeof(client)); + client.node_id = "test_pending_sync_queue"; + client.sync_state = LANTERN_SYNC_STATE_SYNCING; + client.sync_in_progress = true; + client.debug_disable_block_requests = true; + + if (pthread_mutex_init(&client.pending_lock, NULL) != 0) { + fprintf(stderr, "failed to initialize pending mutex\n"); + return 1; + } + client.pending_lock_initialized = true; + lantern_client_debug_pending_reset(&client); + + LanternRoot oldest_root; + LanternRoot latest_root; + LanternRoot extra_root; + LanternRoot parent_root; + client_test_fill_root_with_index(&oldest_root, 0); + client_test_fill_root_with_index(&latest_root, 0); + + int rc = 0; + for (size_t i = 0; i < LANTERN_PENDING_BLOCK_LIMIT; ++i) { + LanternSignedBlock block; + memset(&block, 0, sizeof(block)); + lantern_block_body_init(&block.message.body); + block.message.slot = 100 + i; + + LanternRoot block_root; + client_test_fill_root_with_index(&block_root, 10000u + (uint32_t)i); + client_test_fill_root_with_index(&parent_root, 20000u + (uint32_t)i); + if (i == 0) { + oldest_root = block_root; + } + if (i + 1u == LANTERN_PENDING_BLOCK_LIMIT) { + latest_root = block_root; + } + + if (lantern_client_debug_enqueue_pending_block( + &client, + &block, + &block_root, + &parent_root, + NULL) + != 0) { + fprintf(stderr, "failed to enqueue pending block %zu\n", i); + lantern_block_body_reset(&block.message.body); + rc = 1; + goto cleanup; + } + + lantern_block_body_reset(&block.message.body); + } + + if (lantern_client_pending_block_count(&client) != LANTERN_PENDING_BLOCK_LIMIT) { + fprintf(stderr, "pending queue count mismatch after fill in sync mode\n"); + rc = 1; + goto cleanup; + } + + LanternSignedBlock extra_block; + memset(&extra_block, 0, sizeof(extra_block)); + lantern_block_body_init(&extra_block.message.body); + extra_block.message.slot = 999999; + client_test_fill_root_with_index(&extra_root, 900000u); + client_test_fill_root_with_index(&parent_root, 910000u); + + if (lantern_client_debug_enqueue_pending_block( + &client, + &extra_block, + &extra_root, + &parent_root, + NULL) + != 0) { + fprintf(stderr, "failed to enqueue overflow pending block in sync mode\n"); + lantern_block_body_reset(&extra_block.message.body); + rc = 1; + goto cleanup; + } + lantern_block_body_reset(&extra_block.message.body); + + if (lantern_client_pending_block_count(&client) != LANTERN_PENDING_BLOCK_LIMIT) { + fprintf(stderr, "pending queue count changed after overflow enqueue in sync mode\n"); + rc = 1; + goto cleanup; + } + + if (!client_test_pending_contains_root(&client, &oldest_root)) { + fprintf(stderr, "oldest pending block was unexpectedly evicted in sync mode\n"); + rc = 1; + goto cleanup; + } + if (!client_test_pending_contains_root(&client, &latest_root)) { + fprintf(stderr, "latest accepted pending block missing in sync mode\n"); + rc = 1; + goto cleanup; + } + if (client_test_pending_contains_root(&client, &extra_root)) { + fprintf(stderr, "overflow pending block should have been dropped in sync mode\n"); + rc = 1; + goto cleanup; + } + +cleanup: + lantern_client_debug_pending_reset(&client); + if (client.pending_lock_initialized) { + pthread_mutex_destroy(&client.pending_lock); + client.pending_lock_initialized = false; + } + return rc; +} + static int test_import_block_parent_mismatch(void) { struct lantern_client client; memset(&client, 0, sizeof(client)); @@ -462,6 +575,9 @@ int main(void) { if (test_pending_block_queue() != 0) { return 1; } + if (test_pending_block_queue_sync_drops_incoming() != 0) { + return 1; + } if (test_import_block_parent_mismatch() != 0) { return 1; } diff --git a/tests/unit/test_genesis_anchor.c b/tests/unit/test_genesis_anchor.c new file mode 100644 index 0000000..4c05ec1 --- /dev/null +++ b/tests/unit/test_genesis_anchor.c @@ -0,0 +1,119 @@ +#include +#include +#include + +#include "lantern/consensus/hash.h" +#include "lantern/consensus/state.h" +#include "lantern/core/client.h" + +#include "../../src/core/client_sync_internal.h" + +static void fill_pubkeys(uint8_t *pubkeys, size_t count) +{ + if (!pubkeys) + { + return; + } + for (size_t i = 0; i < count; ++i) + { + for (size_t j = 0; j < LANTERN_VALIDATOR_PUBKEY_SIZE; ++j) + { + pubkeys[(i * LANTERN_VALIDATOR_PUBKEY_SIZE) + j] = (uint8_t)(((i + 1u) * 31u) + j); + } + } +} + +static int roots_equal(const LanternRoot *left, const LanternRoot *right) +{ + if (!left || !right) + { + return 0; + } + return memcmp(left->bytes, right->bytes, LANTERN_ROOT_SIZE) == 0; +} + +int main(void) +{ + struct lantern_client client; + memset(&client, 0, sizeof(client)); + client.node_id = "genesis_anchor_regression"; + client.has_state = true; + lantern_state_init(&client.state); + + if (lantern_state_generate_genesis(&client.state, UINT64_C(1761717362), 3u) != 0) + { + fprintf(stderr, "failed to generate genesis state\n"); + return 1; + } + + uint8_t pubkeys[3u * LANTERN_VALIDATOR_PUBKEY_SIZE]; + fill_pubkeys(pubkeys, 3u); + if (lantern_state_set_validator_pubkeys(&client.state, pubkeys, 3u) != 0) + { + fprintf(stderr, "failed to set validator pubkeys\n"); + lantern_state_reset(&client.state); + return 1; + } + + LanternRoot canonical_state_root; + if (lantern_hash_tree_root_state(&client.state, &canonical_state_root) != 0) + { + fprintf(stderr, "failed to hash canonical genesis state\n"); + lantern_state_reset(&client.state); + return 1; + } + + LanternBlockHeader expected_anchor_header = client.state.latest_block_header; + expected_anchor_header.state_root = canonical_state_root; + + LanternRoot expected_anchor_root; + if (lantern_hash_tree_root_block_header(&expected_anchor_header, &expected_anchor_root) != 0) + { + fprintf(stderr, "failed to hash expected anchor header\n"); + lantern_state_reset(&client.state); + return 1; + } + + /* + * Simulate previously persisted bootstrap snapshots where genesis header + * state_root was eagerly populated before restart. + */ + client.state.latest_block_header.state_root = canonical_state_root; + + if (initialize_fork_choice(&client) != LANTERN_CLIENT_OK) + { + fprintf(stderr, "initialize_fork_choice failed\n"); + lantern_state_reset(&client.state); + lantern_fork_choice_reset(&client.fork_choice); + return 1; + } + + LanternRoot actual_head; + if (lantern_fork_choice_current_head(&client.fork_choice, &actual_head) != 0) + { + fprintf(stderr, "failed to read fork choice head\n"); + lantern_state_reset(&client.state); + lantern_fork_choice_reset(&client.fork_choice); + return 1; + } + + if (!roots_equal(&actual_head, &expected_anchor_root)) + { + fprintf(stderr, "fork choice anchor mismatch for persisted genesis snapshot\n"); + lantern_state_reset(&client.state); + lantern_fork_choice_reset(&client.fork_choice); + return 1; + } + + if (!roots_equal(&client.state.latest_block_header.state_root, &canonical_state_root)) + { + fprintf(stderr, "initialize_fork_choice unexpectedly rewrote genesis header state_root\n"); + lantern_state_reset(&client.state); + lantern_fork_choice_reset(&client.fork_choice); + return 1; + } + + lantern_state_reset(&client.state); + lantern_fork_choice_reset(&client.fork_choice); + return 0; +} diff --git a/tests/unit/test_networking_messages.c b/tests/unit/test_networking_messages.c index b898f32..b66818a 100644 --- a/tests/unit/test_networking_messages.c +++ b/tests/unit/test_networking_messages.c @@ -1622,6 +1622,11 @@ static void test_blocks_by_root_request(void) { check_zero(lantern_network_blocks_by_root_request_encode(&req, encoded, sizeof(encoded), &written), "request encode"); size_t expected_written = req.roots.length * LANTERN_ROOT_SIZE; CHECK(written == expected_written); + CHECK(memcmp( + encoded, + req.roots.items, + req.roots.length * LANTERN_ROOT_SIZE) + == 0); LanternBlocksByRootRequest decoded; lantern_blocks_by_root_request_init(&decoded); @@ -1629,6 +1634,28 @@ static void test_blocks_by_root_request(void) { CHECK(decoded.roots.length == req.roots.length); CHECK(memcmp(decoded.roots.items[1].bytes, req.roots.items[1].bytes, LANTERN_ROOT_SIZE) == 0); + /* Legacy compatibility: decode old container form [offset=4][roots...]. */ + uint8_t legacy_encoded[132]; + legacy_encoded[0] = 4u; + legacy_encoded[1] = 0u; + legacy_encoded[2] = 0u; + legacy_encoded[3] = 0u; + memcpy( + legacy_encoded + sizeof(uint32_t), + encoded, + expected_written); + + LanternBlocksByRootRequest legacy_decoded; + lantern_blocks_by_root_request_init(&legacy_decoded); + check_zero( + lantern_network_blocks_by_root_request_decode( + &legacy_decoded, + legacy_encoded, + sizeof(uint32_t) + expected_written), + "request decode legacy container"); + CHECK(legacy_decoded.roots.length == req.roots.length); + CHECK(memcmp(legacy_decoded.roots.items[0].bytes, req.roots.items[0].bytes, LANTERN_ROOT_SIZE) == 0); + uint8_t compressed[256]; size_t compressed_len = 0; size_t request_raw_len = 0; @@ -1650,6 +1677,7 @@ static void test_blocks_by_root_request(void) { lantern_blocks_by_root_request_reset(&req); lantern_blocks_by_root_request_reset(&decoded); + lantern_blocks_by_root_request_reset(&legacy_decoded); lantern_blocks_by_root_request_reset(&snappy_decoded); }