Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
9973fb2
chore: add persub limit option to pubsub config
thlorenz Feb 6, 2026
93cf550
feat: initial pool impl using pubsub client trait
thlorenz Feb 6, 2026
0c21aa8
chore: extract pubsub connection to separate module
thlorenz Feb 6, 2026
43d9b7a
test: add mock PubsubConnection and make pool generic
thlorenz Feb 6, 2026
bb2b31d
chore: account sub tests for pool
thlorenz Feb 7, 2026
f220952
test: add comprehensive tests for pubsub pool account and program sub…
thlorenz Feb 7, 2026
50a86bd
chore: fix overkill and incomplete error conversion
thlorenz Feb 7, 2026
4512806
chore: prevent multi connection creation race condition
thlorenz Feb 7, 2026
29158ee
chore: reconnect pubsub pool on recovery
thlorenz Feb 7, 2026
ec5073a
refactor: extract subscribe logic to helper method
thlorenz Feb 7, 2026
a37d798
chore: harden pubsub reconnect to ensure all existing subs are closed
thlorenz Feb 9, 2026
5c4a206
feat: add pubsub_client_connections_gauge metric
thlorenz Feb 9, 2026
5c98860
fix: fmt
thlorenz Feb 9, 2026
0179218
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz Feb 9, 2026
58089c8
tmp: dial down max subs per connection
thlorenz Feb 10, 2026
6bb8e0d
chore: fix unsub bug
thlorenz Feb 10, 2026
02f3dd8
chore: subscriptions fn returns hashset
thlorenz Feb 11, 2026
b8ba3e7
chore: subs union
thlorenz Feb 11, 2026
72ce021
chore: introducing union/intersection
thlorenz Feb 12, 2026
4dd13c0
chore: laser client has access to shared subscriptions
thlorenz Feb 12, 2026
4d823e7
chore: all clients return subs (instead option)
thlorenz Feb 12, 2026
06a4f67
chore: optimize set intersection method for submux
thlorenz Feb 12, 2026
c3d5329
feat: reconciler considers union vs. intersection of subscriptions
thlorenz Feb 12, 2026
230328f
chore: test reconciler
thlorenz Feb 12, 2026
5e1668a
chore: move previously existint reconciler tests to same module
thlorenz Feb 12, 2026
4047bdc
chore: fix bug in reconciler logic
thlorenz Feb 12, 2026
940db38
chore: no more reconciliation outside reconciler
thlorenz Feb 12, 2026
cc75e15
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz Feb 12, 2026
35e31a8
chore: fmt + lint
thlorenz Feb 12, 2026
5253d66
chore: remove subscription_count method
thlorenz Feb 12, 2026
bf4a759
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz Feb 12, 2026
41f2040
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz Feb 13, 2026
b8df74e
Merge branch 'master' into thlorenz/pool+better-reconciler
thlorenz Feb 13, 2026
0e3810d
chore: fmt
thlorenz Feb 13, 2026
46f1a1c
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz Feb 13, 2026
cdee786
fix: lint
thlorenz Feb 13, 2026
2d280a5
fix: fmt
thlorenz Feb 13, 2026
7ec483a
chore: fix coderabbits
thlorenz Feb 13, 2026
d401b8c
chore: fix import
thlorenz Feb 13, 2026
d7bd7a9
chore: remove read/write lock with potential race condition
thlorenz Feb 13, 2026
db8480d
ci: attempt to fix protoc discovery
thlorenz Feb 13, 2026
c41ddb4
tmp: remove non-problemeatic workflows for quicker triaging
thlorenz Feb 13, 2026
2afd9a1
ci: bust cache
thlorenz Feb 13, 2026
486e44d
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz Feb 13, 2026
ec54854
Merge branch 'master' into thlorenz/pool+better-reconciler
thlorenz Feb 13, 2026
e3b3f04
Revert "tmp: remove non-problemeatic workflows for quicker triaging"
thlorenz Feb 13, 2026
0aeb168
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz Feb 13, 2026
d5d5ad9
chore: address @bmuddha nits
thlorenz Feb 17, 2026
41dcf2e
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz Feb 17, 2026
bff0cd2
Merge branch 'master' into thlorenz/pool+better-reconciler
thlorenz Feb 17, 2026
4c276e6
chore: fix duplicate doc
thlorenz Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 68 additions & 33 deletions magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::{
collections::{HashMap, HashSet},
fmt,
pin::Pin,
sync::atomic::{AtomicU16, AtomicU64, Ordering},
sync::{
atomic::{AtomicU16, AtomicU64, Ordering},
Arc,
},
time::Duration,
};

Expand All @@ -23,6 +26,7 @@ use magicblock_metrics::metrics::{
inc_per_program_account_updates_count,
inc_program_subscription_account_updates_count,
};
use parking_lot::RwLock;
use solana_account::Account;
use solana_commitment_config::CommitmentLevel as SolanaCommitmentLevel;
use solana_pubkey::Pubkey;
Expand Down Expand Up @@ -55,6 +59,8 @@ const SLOTS_BETWEEN_ACTIVATIONS: u64 =
SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS / 400;
const MAX_SLOTS_BACKFILL: u64 = 400;

pub type SharedSubscriptions = Arc<RwLock<HashSet<Pubkey>>>;
Comment thread
thlorenz marked this conversation as resolved.

// -----------------
// Slots
// -----------------
Expand Down Expand Up @@ -116,8 +122,10 @@ impl fmt::Display for AccountUpdateSource {
pub struct ChainLaserActor {
/// Configuration used to create the laser client
laser_client_config: LaserstreamConfig,
/// Requested subscriptions, some may not be active yet
subscriptions: HashSet<Pubkey>,
/// Requested subscriptions, some may not be active yet.
/// Shared with ChainLaserClientImpl for sync access to
/// subscription_count and subscriptions_union.
subscriptions: SharedSubscriptions,
/// Pubkeys of currently active subscriptions
active_subscription_pubkeys: HashSet<Pubkey>,
/// Subscriptions that have been activated via the helius provider
Expand Down Expand Up @@ -155,6 +163,7 @@ impl ChainLaserActor {
Self,
mpsc::Sender<ChainPubsubActorMessage>,
mpsc::Receiver<SubscriptionUpdate>,
SharedSubscriptions,
) {
let channel_options = ChannelOptions {
connect_timeout_secs: Some(5),
Expand Down Expand Up @@ -190,17 +199,21 @@ impl ChainLaserActor {
Self,
mpsc::Sender<ChainPubsubActorMessage>,
mpsc::Receiver<SubscriptionUpdate>,
SharedSubscriptions,
) {
let (subscription_updates_sender, subscription_updates_receiver) =
mpsc::channel(SUBSCRIPTION_UPDATE_CHANNEL_SIZE);
let (messages_sender, messages_receiver) =
mpsc::channel(MESSAGE_CHANNEL_SIZE);
let commitment = grpc_commitment_from_solana(commitment);

let subscriptions: SharedSubscriptions = Default::default();
let shared_subscriptions = Arc::clone(&subscriptions);

let me = Self {
laser_client_config,
messages_receiver,
subscriptions: Default::default(),
subscriptions,
active_subscriptions: Default::default(),
active_subscription_pubkeys: Default::default(),
program_subscriptions: Default::default(),
Expand All @@ -212,14 +225,19 @@ impl ChainLaserActor {
rpc_client,
};

(me, messages_sender, subscription_updates_receiver)
(
me,
messages_sender,
subscription_updates_receiver,
shared_subscriptions,
)
}

#[allow(dead_code)]
#[instrument(skip(self), fields(client_id = %self.client_id))]
fn shutdown(&mut self) {
info!("Shutting down laser actor");
self.subscriptions.clear();
self.subscriptions.write().clear();
self.active_subscriptions.clear();
self.active_subscription_pubkeys.clear();
}
Expand Down Expand Up @@ -256,7 +274,7 @@ impl ChainLaserActor {
None => {
debug!("Account subscription stream ended");
Self::signal_connection_issue(
&mut self.subscriptions,
&self.subscriptions,
&mut self.active_subscriptions,
&mut self.active_subscription_pubkeys,
&mut self.program_subscriptions,
Expand All @@ -281,7 +299,7 @@ impl ChainLaserActor {
None => {
debug!("Program subscription stream ended");
Self::signal_connection_issue(
&mut self.subscriptions,
&self.subscriptions,
&mut self.active_subscriptions,
&mut self.active_subscription_pubkeys,
&mut self.program_subscriptions,
Expand Down Expand Up @@ -338,7 +356,7 @@ impl ChainLaserActor {
Shutdown { response } => {
info!(client_id = self.client_id, "Received Shutdown message");
Self::clear_subscriptions(
&mut self.subscriptions,
&self.subscriptions,
&mut self.active_subscriptions,
&mut self.active_subscription_pubkeys,
&mut self.program_subscriptions,
Expand All @@ -360,14 +378,28 @@ impl ChainLaserActor {
pubkey: Pubkey,
sub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
) {
if self.subscriptions.contains(&pubkey) {
debug!(pubkey = %pubkey, "Already subscribed to account");
let inserted = {
// Fast path: check with read lock first
let already_subscribed = {
let subs = self.subscriptions.read();
subs.contains(&pubkey)
};

if already_subscribed {
false
} else {
// Write lock only when we need to modify
let mut subs = self.subscriptions.write();
subs.insert(pubkey);
true
}
};
if !inserted {
trace!(pubkey = %pubkey, "Already subscribed to account");
sub_response.send(Ok(())).unwrap_or_else(|_| {
warn!(pubkey = %pubkey, "Failed to send already subscribed response");
});
} else {
self.subscriptions.insert(pubkey);
// If this is the first sub for the clock sysvar we want to activate it immediately
if self.active_subscriptions.is_empty() {
self.update_active_subscriptions();
}
Expand All @@ -383,7 +415,8 @@ impl ChainLaserActor {
pubkey: &Pubkey,
unsub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
) {
match self.subscriptions.remove(pubkey) {
let removed = self.subscriptions.write().remove(pubkey);
match removed {
true => {
trace!(pubkey = %pubkey, "Unsubscribed from account");
unsub_response.send(Ok(())).unwrap_or_else(|_| {
Expand All @@ -405,25 +438,28 @@ impl ChainLaserActor {
}

fn update_active_subscriptions(&mut self) {
// Check if the active subscriptions match what we already have
let new_pubkeys: HashSet<Pubkey> =
self.subscriptions.iter().copied().collect();
if new_pubkeys == self.active_subscription_pubkeys {
trace!(
count = self.subscriptions.len(),
"Active subscriptions already up to date"
);
return;
}
// Copy subscriptions and release the read lock immediately
let new_pubkeys: HashSet<Pubkey> = {
let subs = self.subscriptions.read();
// Check if the active subscriptions match what we already have
Comment thread
thlorenz marked this conversation as resolved.
if subs.eq(&self.active_subscription_pubkeys) {
trace!(
count = subs.len(),
"Active subscriptions already up to date"
);
return;
}
subs.iter().copied().collect()
};

inc_account_subscription_activations_count(&self.client_id);

let mut new_subs: StreamMap<usize, LaserStream> = StreamMap::new();

// Re-create streams for all subscriptions
let subs = self.subscriptions.iter().collect::<Vec<_>>();
let sub_refs = new_pubkeys.iter().collect::<Vec<_>>();

let chunks = subs
let chunks = sub_refs
.chunks(PER_STREAM_SUBSCRIPTION_LIMIT)
.map(|chunk| chunk.to_vec())
.collect::<Vec<_>>();
Expand All @@ -435,7 +471,7 @@ impl ChainLaserActor {

if tracing::enabled!(tracing::Level::TRACE) {
trace!(
account_count = self.subscriptions.len(),
account_count = new_pubkeys.len(),
chain_slot,
from_slot,
stream_count = chunks.len(),
Expand Down Expand Up @@ -639,7 +675,7 @@ impl ChainLaserActor {

error!(error = ?err, slots = ?self.slots, "Error in {} stream", source);
Self::signal_connection_issue(
&mut self.subscriptions,
&self.subscriptions,
&mut self.active_subscriptions,
&mut self.active_subscription_pubkeys,
&mut self.program_subscriptions,
Expand Down Expand Up @@ -710,12 +746,12 @@ impl ChainLaserActor {
}

fn clear_subscriptions(
subscriptions: &mut HashSet<Pubkey>,
subscriptions: &SharedSubscriptions,
active_subscriptions: &mut StreamMap<usize, LaserStream>,
active_subscription_pubkeys: &mut HashSet<Pubkey>,
program_subscriptions: &mut Option<(HashSet<Pubkey>, LaserStream)>,
) {
subscriptions.clear();
subscriptions.write().clear();
active_subscriptions.clear();
active_subscription_pubkeys.clear();
*program_subscriptions = None;
Expand All @@ -727,7 +763,7 @@ impl ChainLaserActor {
/// we add this as a backup in case it is unable to do so
#[instrument(skip(subscriptions, active_subscriptions, active_subscription_pubkeys, program_subscriptions, abort_sender), fields(client_id = %client_id))]
async fn signal_connection_issue(
subscriptions: &mut HashSet<Pubkey>,
subscriptions: &SharedSubscriptions,
active_subscriptions: &mut StreamMap<usize, LaserStream>,
active_subscription_pubkeys: &mut HashSet<Pubkey>,
program_subscriptions: &mut Option<(HashSet<Pubkey>, LaserStream)>,
Expand Down Expand Up @@ -837,8 +873,7 @@ impl ChainLaserActor {
);
}

if !self.subscriptions.contains(&pubkey) {
// Ignore updates for accounts we are not subscribed to
if !self.subscriptions.read().contains(&pubkey) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::{mpsc, oneshot};
use tracing::*;

use crate::remote_account_provider::{
chain_laser_actor::{ChainLaserActor, Slots},
chain_laser_actor::{ChainLaserActor, SharedSubscriptions, Slots},
chain_rpc_client::ChainRpcClientImpl,
pubsub_common::{ChainPubsubActorMessage, SubscriptionUpdate},
ChainPubsubClient, ReconnectableClient, RemoteAccountProviderError,
Expand Down Expand Up @@ -48,6 +48,8 @@ pub struct ChainLaserClientImpl {
updates: Arc<Mutex<Option<mpsc::Receiver<SubscriptionUpdate>>>>,
/// Channel to send messages to the actor
messages: mpsc::Sender<ChainPubsubActorMessage>,
/// Shared subscriptions with the actor for sync access
subscriptions: SharedSubscriptions,
/// Client identifier
client_id: String,
}
Expand All @@ -62,18 +64,20 @@ impl ChainLaserClientImpl {
slots: Slots,
rpc_client: ChainRpcClientImpl,
) -> Self {
let (actor, messages, updates) = ChainLaserActor::new_from_url(
pubsub_url,
&client_id,
api_key,
commitment,
abort_sender,
slots,
rpc_client,
);
let (actor, messages, updates, subscriptions) =
ChainLaserActor::new_from_url(
pubsub_url,
&client_id,
api_key,
commitment,
abort_sender,
slots,
rpc_client,
);
let client = Self {
updates: Arc::new(Mutex::new(Some(updates))),
messages,
subscriptions,
client_id,
};
tokio::spawn(actor.run());
Expand Down Expand Up @@ -174,15 +178,8 @@ impl ChainPubsubClient for ChainLaserClientImpl {
.expect("ChainLaserClientImpl::take_updates called more than once")
}

async fn subscription_count(
&self,
_exclude: Option<&[Pubkey]>,
) -> Option<(usize, usize)> {
None
}

fn subscriptions(&self) -> Option<Vec<Pubkey>> {
None
fn subscriptions_union(&self) -> HashSet<Pubkey> {
self.subscriptions.read().clone()
}

fn subs_immediately(&self) -> bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
sync::{
atomic::{AtomicBool, AtomicU16, Ordering},
Arc, Mutex,
Expand Down Expand Up @@ -169,26 +169,9 @@ impl ChainPubsubActor {
}
}

pub fn subscription_count(&self, filter: &[Pubkey]) -> usize {
pub fn subscriptions(&self) -> HashSet<Pubkey> {
if !self.is_connected.load(Ordering::SeqCst) {
return 0;
}
let subs = self
.subscriptions
.lock()
.expect("subscriptions lock poisoned");
if filter.is_empty() {
subs.len()
} else {
subs.keys()
.filter(|pubkey| !filter.contains(pubkey))
.count()
}
}

pub fn subscriptions(&self) -> Vec<Pubkey> {
if !self.is_connected.load(Ordering::SeqCst) {
return vec![];
return HashSet::new();
}
let subs = self
.subscriptions
Expand Down
Loading