Skip to content

chore: improve subscription reconciler#945

Open
thlorenz wants to merge 28 commits intothlorenz/websocket-pool-conectionsfrom
thlorenz/pool+better-reconciler
Open

chore: improve subscription reconciler#945
thlorenz wants to merge 28 commits intothlorenz/websocket-pool-conectionsfrom
thlorenz/pool+better-reconciler

Conversation

@thlorenz
Copy link
Collaborator

@thlorenz thlorenz commented Feb 12, 2026

Summary

Improve subscription reconciliation across multiple pubsub clients.

CLOSES: #941
CLOSES: #910
CLOSES: #903

Details

subscription_reconciler

  • Enhanced to consider both union and intersection of subscriptions across multiple clients
  • Now returns the total subscription count (LRU + never evicted)
  • Improved reconciliation logic to handle cases where not all clients are subscribed to all accounts
  • Logs subscription state mismatches with trace-then-warn pattern for operational visibility

PubsubConnection Trait & Implementations

  • Added subscriptions_union() and subscriptions_intersection() methods to trait
  • Implemented in all clients (laser, pubsub, mock)
  • Added pubsub_connection module with generic PubsubConnection trait for pool abstraction

submux/SubmuxClient Changes

  • Removed subscription_count()
  • Added subscriptions_union() - computes union across all inner clients
  • Added subscriptions_intersection() - efficient multi-set intersection using smallest-set-first iteration
  • Removes dependency on get_subscriptions() helper

Metrics

  • Added pubsub_client_connections_gauge to track number of pooled websocket connections per client

Summary by CodeRabbit

  • New Features

    • Shared, thread-safe subscription state accessible across components; union/intersection subscription queries.
  • Bug Fixes

    • Deterministic reconciliation: automatic resubscribe/unsubscribe, preserves never-evicted accounts, reduces unnecessary work.
  • API Changes

    • Replaced count/list/optional subscription APIs with set-based methods; client startup now exposes shared subscription handle; cache pubkeys now return sets.
  • Error Handling

    • Added an out-of-sync subscriptions error variant.
  • Tests

    • Extensive unit tests covering reconciliation and subscription scenarios.
  • Chores

    • Cleanup of CI workflows and build steps.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 12, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Refactors subscription management to use a shared, thread-safe set (pub type SharedSubscriptions = Arc<RwLock<HashSet>>), surfaces that handle from ChainLaserActor to clients, replaces per-client subscription_count/subscriptions APIs with set-based subscriptions_union/subscriptions_intersection, and implements a union/intersection-based reconcile_subscriptions that resubscribes missing LRU items and unsubscribes extra pubsub items. Multiple modules and public signatures were updated and unit tests for the reconciler were added or adapted.

Assessment against linked issues

Objective Addressed Explanation
Implement union and intersection and use them in reconciler (reconcile: union, intersection, unsubscribe union not in LRU, resub LRU not in intersection) [#941]
Replace per-client subscription_count/subscriptions APIs with set-based union/intersection (add methods in submux and propagate) [#941]
Reduce excessive/resubscribe-to-many behavior (correct reconciliation behavior observed in logs) [#910]
Ensure subscriptions added during resubscription/reconnect are handled (detect and resubscribe subs added during resubscribe window) [#903] Reconciler performs a single-pass reconcile and returns a subscribed count; no explicit post-resubscribe re-check or replay loop is present to capture concurrent additions.

Out-of-scope changes

Code Change Explanation
Added error variant AccountSubscriptionsOutOfSync(String) (magicblock-chainlink/src/remote_account_provider/errors.rs) New enum variant is not referenced in reconciliation changes and is unrelated to the union/intersection reconciliation objective.
Removed verbose diagnostics and many in-module tests in active-subscriptions updater (magicblock-chainlink/src/remote_account_provider/mod.rs) Deletion of extensive logging/tests is a behavioral/test-scope change not required by the reconciler objective; it reduces local diagnostics coverage rather than altering reconciliation logic.

Suggested reviewers

  • bmuddha
  • GabrielePicco
  • Dodecahedr0x
✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch thlorenz/pool+better-reconciler

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
magicblock-chainlink/src/remote_account_provider/lru_cache.rs (1)

127-137: 🧹 Nitpick | 🔵 Trivial

pubkeys() and subscribed_accounts() have identical implementations.

Both methods lock the same subscribed_accounts mutex, iterate the LRU, and collect into HashSet<Pubkey>. Consider having one delegate to the other.

♻️ Suggested deduplication
     pub fn pubkeys(&self) -> HashSet<Pubkey> {
-        let subs = self.subscribed_accounts.lock();
-        subs.iter().map(|(k, _)| *k).collect()
+        self.subscribed_accounts()
     }
 }
 
 impl SubscribedAccountsTracker for AccountsLruCache {
     fn subscribed_accounts(&self) -> HashSet<Pubkey> {
         let subs = self.subscribed_accounts.lock();
         subs.iter().map(|(k, _)| *k).collect()
     }
 }
🤖 Fix all issues with AI agents
In `@magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs`:
- Around line 418-428: The two-step read-then-write in remove_sub creates a
benign TOCTOU and can return AccountSubscriptionDoesNotExist incorrectly;
simplify by acquiring a write lock on self.subscriptions immediately in
remove_sub and call remove(pubkey) directly (remove the initial read() and the
exists/if branch) so the removal and check are atomic under the write lock;
update logic that matches on removed to behave the same after this change.

In `@magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs`:
- Around line 47-55: Remove the leftover TODO/debug marker from the doc comment
for subscription_count and replace it with a clear description of what's being
counted; update the comment on the async fn subscription_count(&self, exclude:
Option<&[Pubkey]>) -> (usize, usize) to explain precisely that the first value
is the total number of subscriptions and the second is the number after
excluding any pubkeys provided in exclude, and delete the `TODO: @@@ what is it
recommended to count` line so the documentation is clean and production-ready.

In `@magicblock-chainlink/src/remote_account_provider/subscription_reconciler.rs`:
- Around line 156-158: Update the unclear inline comment in
subscription_reconciler.rs to the suggested clearer wording: replace the
sentence "Pubsubs should be subscribed to all accounts in LRU accounts no
tracked by it since they are never evicted" with something like "Pubsubs should
be subscribed to all accounts in LRU, plus accounts not tracked by LRU since
they are never evicted." Locate and edit the comment near the reconciliation
logic in the subscription reconciler (around the block describing that
reconciling worked and subscribed accounts are up to date) so the intent is
unambiguous.
- Around line 156-160: The current return value optimistically reports lru_count
+ never_evicted.len() even when resubscriptions logged as failures (the log
around the resubscribe failure at lines ~131-133), so update the function in
subscription_reconciler.rs to compute the actual subscribed count by tracking
resubscription failures and subtracting them (e.g., maintain a
failed_resub_count and return lru_count + never_evicted.len() -
failed_resub_count), or alternatively change the function signature/docs to
explicitly state it returns the target count (and adjust callers such as
set_monitored_accounts_count in mod.rs accordingly). Ensure you reference the
resubscribe call where failures are logged and update the return or docstring to
make the behavior correct and unambiguous.

In `@magicblock-chainlink/src/submux/mod.rs`:
- Around line 884-905: The call to .unwrap() in subscriptions_intersection is
asserted-safe due to the early return when sets.is_empty(), but per guidelines
replace .unwrap() with .expect() and include an explicit invariant
justification; update the code in subscriptions_intersection where you obtain
smallest (the result of sets.iter().min_by_key(|s| s.len())) to use
.expect("sets is non-empty: early return above ensures at least one set") (or
similar wording) so the safety assumption is documented inline.

@thlorenz thlorenz changed the base branch from thlorenz/websocket-pool-conections to master February 13, 2026 07:52
@github-actions
Copy link

github-actions bot commented Feb 13, 2026

Manual Deploy Available

You can trigger a manual deploy of this PR branch to testnet:

Deploy to Testnet 🚀

Alternative: Comment /deploy on this PR to trigger deployment directly.

⚠️ Note: Manual deploy requires authorization. Only authorized users can trigger deployments.

Comment updated automatically when the PR is synchronized.

@thlorenz thlorenz changed the base branch from master to thlorenz/websocket-pool-conections February 13, 2026 07:58
Copy link
Collaborator

@bmuddha bmuddha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, but I strongly feel that we should use those super expensive operations on collections sparingly (ideally make them redundant altogether). May be we can tackle it during the refactoring.

SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS / 400;
const MAX_SLOTS_BACKFILL: u64 = 400;

pub type SharedSubscriptions = Arc<RwLock<HashSet<Pubkey>>>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using scc::HashSet would eliminate the use reliance on locks.

// Copy subscriptions and release the read lock immediately
let new_pubkeys: HashSet<Pubkey> = {
let subs = self.subscriptions.read();
// Check if the active subscriptions match what we already have
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure how this is going to be true in general, when each stream (ws or grpc) only handles a subset of global subscriptions

}

fn subscriptions(&self) -> Option<Vec<Pubkey>> {
/// Returns the subscriptions of a client or the union of subscriptions
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: duplicating the same doc defined on trait is redundant.

fn subscriptions_union(&self) -> HashSet<Pubkey> {
let subs = self.subscribed_pubkeys.lock();
Some(subs.iter().copied().collect())
subs.iter().copied().collect()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these expensive collection manipulation do not scale well. If we refactor this code, may be we should focus on designing it in such a way that makes reconciliation unnecessary, i.e. every pool will always have the entire global set of subscriptions present, either active or inflight (in the process of subscribing).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

2 participants