chore: improve subscription reconciler#945
chore: improve subscription reconciler#945thlorenz wants to merge 28 commits intothlorenz/websocket-pool-conectionsfrom
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughRefactors subscription management to use a shared, thread-safe set (pub type SharedSubscriptions = Arc<RwLock<HashSet>>), surfaces that handle from ChainLaserActor to clients, replaces per-client subscription_count/subscriptions APIs with set-based subscriptions_union/subscriptions_intersection, and implements a union/intersection-based reconcile_subscriptions that resubscribes missing LRU items and unsubscribes extra pubsub items. Multiple modules and public signatures were updated and unit tests for the reconciler were added or adapted. Assessment against linked issues
Out-of-scope changes
Suggested reviewers
✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Amp-Thread-ID: https://ampcode.com/threads/T-019c5080-01ee-71fa-aa34-97264f3a0900 Co-authored-by: Amp <amp@ampcode.com>
…better-reconciler
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
magicblock-chainlink/src/remote_account_provider/lru_cache.rs (1)
127-137: 🧹 Nitpick | 🔵 Trivial
pubkeys()andsubscribed_accounts()have identical implementations.Both methods lock the same
subscribed_accountsmutex, iterate the LRU, and collect intoHashSet<Pubkey>. Consider having one delegate to the other.♻️ Suggested deduplication
pub fn pubkeys(&self) -> HashSet<Pubkey> { - let subs = self.subscribed_accounts.lock(); - subs.iter().map(|(k, _)| *k).collect() + self.subscribed_accounts() } } impl SubscribedAccountsTracker for AccountsLruCache { fn subscribed_accounts(&self) -> HashSet<Pubkey> { let subs = self.subscribed_accounts.lock(); subs.iter().map(|(k, _)| *k).collect() } }
🤖 Fix all issues with AI agents
In `@magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs`:
- Around line 418-428: The two-step read-then-write in remove_sub creates a
benign TOCTOU and can return AccountSubscriptionDoesNotExist incorrectly;
simplify by acquiring a write lock on self.subscriptions immediately in
remove_sub and call remove(pubkey) directly (remove the initial read() and the
exists/if branch) so the removal and check are atomic under the write lock;
update logic that matches on removed to behave the same after this change.
In `@magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs`:
- Around line 47-55: Remove the leftover TODO/debug marker from the doc comment
for subscription_count and replace it with a clear description of what's being
counted; update the comment on the async fn subscription_count(&self, exclude:
Option<&[Pubkey]>) -> (usize, usize) to explain precisely that the first value
is the total number of subscriptions and the second is the number after
excluding any pubkeys provided in exclude, and delete the `TODO: @@@ what is it
recommended to count` line so the documentation is clean and production-ready.
In `@magicblock-chainlink/src/remote_account_provider/subscription_reconciler.rs`:
- Around line 156-158: Update the unclear inline comment in
subscription_reconciler.rs to the suggested clearer wording: replace the
sentence "Pubsubs should be subscribed to all accounts in LRU accounts no
tracked by it since they are never evicted" with something like "Pubsubs should
be subscribed to all accounts in LRU, plus accounts not tracked by LRU since
they are never evicted." Locate and edit the comment near the reconciliation
logic in the subscription reconciler (around the block describing that
reconciling worked and subscribed accounts are up to date) so the intent is
unambiguous.
- Around line 156-160: The current return value optimistically reports lru_count
+ never_evicted.len() even when resubscriptions logged as failures (the log
around the resubscribe failure at lines ~131-133), so update the function in
subscription_reconciler.rs to compute the actual subscribed count by tracking
resubscription failures and subtracting them (e.g., maintain a
failed_resub_count and return lru_count + never_evicted.len() -
failed_resub_count), or alternatively change the function signature/docs to
explicitly state it returns the target count (and adjust callers such as
set_monitored_accounts_count in mod.rs accordingly). Ensure you reference the
resubscribe call where failures are logged and update the return or docstring to
make the behavior correct and unambiguous.
In `@magicblock-chainlink/src/submux/mod.rs`:
- Around line 884-905: The call to .unwrap() in subscriptions_intersection is
asserted-safe due to the early return when sets.is_empty(), but per guidelines
replace .unwrap() with .expect() and include an explicit invariant
justification; update the code in subscriptions_intersection where you obtain
smallest (the result of sets.iter().min_by_key(|s| s.len())) to use
.expect("sets is non-empty: early return above ensures at least one set") (or
similar wording) so the safety assumption is documented inline.
magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs
Outdated
Show resolved
Hide resolved
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
Outdated
Show resolved
Hide resolved
magicblock-chainlink/src/remote_account_provider/subscription_reconciler.rs
Outdated
Show resolved
Hide resolved
magicblock-chainlink/src/remote_account_provider/subscription_reconciler.rs
Show resolved
Hide resolved
Manual Deploy AvailableYou can trigger a manual deploy of this PR branch to testnet: Alternative: Comment
Comment updated automatically when the PR is synchronized. |
This reverts commit c41ddb4.
…better-reconciler * thlorenz/websocket-pool-conections:
bmuddha
left a comment
There was a problem hiding this comment.
LGTM overall, but I strongly feel that we should use those super expensive operations on collections sparingly (ideally make them redundant altogether). May be we can tackle it during the refactoring.
| SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS / 400; | ||
| const MAX_SLOTS_BACKFILL: u64 = 400; | ||
|
|
||
| pub type SharedSubscriptions = Arc<RwLock<HashSet<Pubkey>>>; |
There was a problem hiding this comment.
using scc::HashSet would eliminate the use reliance on locks.
| // Copy subscriptions and release the read lock immediately | ||
| let new_pubkeys: HashSet<Pubkey> = { | ||
| let subs = self.subscriptions.read(); | ||
| // Check if the active subscriptions match what we already have |
There was a problem hiding this comment.
not sure how this is going to be true in general, when each stream (ws or grpc) only handles a subset of global subscriptions
| } | ||
|
|
||
| fn subscriptions(&self) -> Option<Vec<Pubkey>> { | ||
| /// Returns the subscriptions of a client or the union of subscriptions |
There was a problem hiding this comment.
nit: duplicating the same doc defined on trait is redundant.
| fn subscriptions_union(&self) -> HashSet<Pubkey> { | ||
| let subs = self.subscribed_pubkeys.lock(); | ||
| Some(subs.iter().copied().collect()) | ||
| subs.iter().copied().collect() |
There was a problem hiding this comment.
these expensive collection manipulation do not scale well. If we refactor this code, may be we should focus on designing it in such a way that makes reconciliation unnecessary, i.e. every pool will always have the entire global set of subscriptions present, either active or inflight (in the process of subscribing).
Summary
Improve subscription reconciliation across multiple pubsub clients.
CLOSES: #941
CLOSES: #910
CLOSES: #903
Details
subscription_reconciler
PubsubConnection Trait & Implementations
subscriptions_union()andsubscriptions_intersection()methods to traitpubsub_connectionmodule with genericPubsubConnectiontrait for pool abstractionsubmux/SubmuxClient Changes
subscription_count()subscriptions_union()- computes union across all inner clientssubscriptions_intersection()- efficient multi-set intersection using smallest-set-first iterationget_subscriptions()helperMetrics
pubsub_client_connections_gaugeto track number of pooled websocket connections per clientSummary by CodeRabbit
New Features
Bug Fixes
API Changes
Error Handling
Tests
Chores