@@ -89,12 +89,10 @@ use std::pin::Pin;
8989use std:: sync:: Arc ;
9090use std:: vec:: IntoIter ;
9191use thiserror:: Error ;
92+ use tokio:: sync:: OnceCell ;
9293use tokio:: {
9394 runtime:: Handle ,
94- sync:: {
95- mpsc:: { unbounded_channel, UnboundedReceiver } ,
96- RwLock ,
97- } ,
95+ sync:: mpsc:: { unbounded_channel, UnboundedReceiver } ,
9896 task:: JoinHandle ,
9997} ;
10098
@@ -230,7 +228,7 @@ impl EventUnsubscriber<'_> {
230228pub struct Program < C > {
231229 program_id : Pubkey ,
232230 cfg : Config < C > ,
233- sub_client : Arc < RwLock < Option < PubsubClient > > > ,
231+ sub_client : OnceCell < Arc < PubsubClient > > ,
234232 #[ cfg( not( feature = "async" ) ) ]
235233 rt : tokio:: runtime:: Runtime ,
236234 internal_rpc_client : AsyncRpcClient ,
@@ -297,20 +295,6 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
297295 } )
298296 }
299297
300- async fn init_sub_client_if_needed ( & self ) -> Result < ( ) , ClientError > {
301- let lock = & self . sub_client ;
302- let mut client = lock. write ( ) . await ;
303-
304- if client. is_none ( ) {
305- let sub_client = PubsubClient :: new ( self . cfg . cluster . ws_url ( ) )
306- . await
307- . map_err ( Box :: new) ?;
308- * client = Some ( sub_client) ;
309- }
310-
311- Ok ( ( ) )
312- }
313-
314298 async fn on_internal < T : anchor_lang:: Event + anchor_lang:: AnchorDeserialize > (
315299 & self ,
316300 mut f : impl FnMut ( & EventContext , T ) + Send + ' static ,
@@ -321,41 +305,45 @@ impl<C: Deref<Target = impl Signer> + Clone> Program<C> {
321305 ) ,
322306 ClientError ,
323307 > {
324- self . init_sub_client_if_needed ( ) . await ?;
308+ let client = self
309+ . sub_client
310+ . get_or_try_init ( || async {
311+ PubsubClient :: new ( self . cfg . cluster . ws_url ( ) )
312+ . await
313+ . map ( Arc :: new)
314+ . map_err ( |e| ClientError :: SolanaClientPubsubError ( Box :: new ( e) ) )
315+ } )
316+ . await ?
317+ . clone ( ) ;
318+
325319 let ( tx, rx) = unbounded_channel :: < _ > ( ) ;
326320 let config = RpcTransactionLogsConfig {
327321 commitment : self . cfg . options ,
328322 } ;
329323 let program_id_str = self . program_id . to_string ( ) ;
330324 let filter = RpcTransactionLogsFilter :: Mentions ( vec ! [ program_id_str. clone( ) ] ) ;
331325
332- let lock = Arc :: clone ( & self . sub_client ) ;
333-
334326 let handle = tokio:: spawn ( async move {
335- if let Some ( ref client) = * lock. read ( ) . await {
336- let ( mut notifications, unsubscribe) = client
337- . logs_subscribe ( filter, config)
338- . await
339- . map_err ( Box :: new) ?;
340-
341- tx. send ( unsubscribe) . map_err ( |e| {
342- ClientError :: SolanaClientPubsubError ( Box :: new (
343- PubsubClientError :: RequestFailed {
344- message : "Unsubscribe failed" . to_string ( ) ,
345- reason : e. to_string ( ) ,
346- } ,
347- ) )
348- } ) ?;
349-
350- while let Some ( logs) = notifications. next ( ) . await {
351- let ctx = EventContext {
352- signature : logs. value . signature . parse ( ) . unwrap ( ) ,
353- slot : logs. context . slot ,
354- } ;
355- let events = parse_logs_response ( logs, & program_id_str) ?;
356- for e in events {
357- f ( & ctx, e) ;
358- }
327+ let ( mut notifications, unsubscribe) = client
328+ . logs_subscribe ( filter, config)
329+ . await
330+ . map_err ( Box :: new) ?;
331+
332+ tx. send ( unsubscribe) . map_err ( |e| {
333+ ClientError :: SolanaClientPubsubError ( Box :: new ( PubsubClientError :: RequestFailed {
334+ message : "Unsubscribe failed" . to_string ( ) ,
335+ reason : e. to_string ( ) ,
336+ } ) )
337+ } ) ?;
338+
339+ while let Some ( logs) = notifications. next ( ) . await {
340+ let ctx = EventContext {
341+ signature : logs. value . signature . parse ( ) . unwrap ( ) ,
342+ slot : logs. context . slot ,
343+ } ;
344+ let events = parse_logs_response ( logs, & program_id_str) ?;
345+ for e in events {
346+ f ( & ctx, e) ;
359347 }
360348 }
361349 Ok :: < ( ) , ClientError > ( ( ) )
@@ -744,7 +732,10 @@ fn parse_logs_response<T: anchor_lang::Event + anchor_lang::AnchorDeserialize>(
744732
745733#[ cfg( test) ]
746734mod tests {
735+ use futures:: { SinkExt , StreamExt } ;
747736 use solana_rpc_client_api:: response:: RpcResponseContext ;
737+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
738+ use tokio_tungstenite:: tungstenite:: Message ;
748739
749740 // Creating a mock struct that implements `anchor_lang::events`
750741 // for type inference in `test_logs`
@@ -916,4 +907,92 @@ mod tests {
916907
917908 Ok ( ( ) )
918909 }
910+
911+ /// Regression test that registering multiple event listeners does not deadlock.
912+ #[ test]
913+ fn multiple_listeners_no_deadlock ( ) {
914+ // Spin up a tiny mock websocket server that responds to `logsSubscribe`
915+ // JSON-RPC requests with a valid subscription id.
916+ let rt = tokio:: runtime:: Builder :: new_multi_thread ( )
917+ . enable_all ( )
918+ . build ( )
919+ . unwrap ( ) ;
920+
921+ let ( addr_tx, addr_rx) = std:: sync:: mpsc:: channel ( ) ;
922+
923+ rt. spawn ( async move {
924+ let listener = tokio:: net:: TcpListener :: bind ( "127.0.0.1:0" ) . await . unwrap ( ) ;
925+ let addr = listener. local_addr ( ) . unwrap ( ) ;
926+ addr_tx. send ( addr) . unwrap ( ) ;
927+
928+ static SUB_ID : AtomicU64 = AtomicU64 :: new ( 0 ) ;
929+
930+ loop {
931+ let ( stream, _) = listener. accept ( ) . await . unwrap ( ) ;
932+ tokio:: spawn ( async move {
933+ let mut ws = tokio_tungstenite:: accept_async ( stream) . await . unwrap ( ) ;
934+ while let Some ( Ok ( Message :: Text ( _) ) ) = ws. next ( ) . await {
935+ let sub_id = SUB_ID . fetch_add ( 1 , Ordering :: Relaxed ) ;
936+ // The PubsubClient sends sequential integer ids starting at 0.
937+ let resp =
938+ format ! ( r#"{{"jsonrpc":"2.0","result":{sub_id},"id":{sub_id}}}"# ) ;
939+ ws. send ( Message :: Text ( resp. into ( ) ) ) . await . unwrap ( ) ;
940+ }
941+ } ) ;
942+ }
943+ } ) ;
944+
945+ let addr = addr_rx. recv ( ) . unwrap ( ) ;
946+ let ws_url = format ! ( "ws://{}" , addr) ;
947+
948+ let client = super :: Client :: new (
949+ super :: Cluster :: Custom ( ws_url. clone ( ) , ws_url) ,
950+ std:: sync:: Arc :: new ( solana_keypair:: Keypair :: new ( ) ) ,
951+ ) ;
952+ let program = client. program ( Pubkey :: new_unique ( ) ) . unwrap ( ) ;
953+
954+ // With the old RwLock-based code, the second call would deadlock.
955+ // Use a timeout to ensure the test fails instead of hanging forever.
956+ let ( done_tx, done_rx) = std:: sync:: mpsc:: channel ( ) ;
957+ let handle = std:: thread:: spawn ( move || {
958+ #[ cfg( not( feature = "async" ) ) ]
959+ {
960+ let _listener1 = program
961+ . on :: < MockEvent > ( |_ctx, _event| { } )
962+ . expect ( "first listener" ) ;
963+
964+ let _listener2 = program
965+ . on :: < MockEvent > ( |_ctx, _event| { } )
966+ . expect ( "second listener" ) ;
967+ }
968+
969+ #[ cfg( feature = "async" ) ]
970+ {
971+ let rt = tokio:: runtime:: Builder :: new_current_thread ( )
972+ . enable_all ( )
973+ . build ( )
974+ . unwrap ( ) ;
975+ rt. block_on ( async {
976+ let _listener1 = program
977+ . on :: < MockEvent > ( |_ctx, _event| { } )
978+ . await
979+ . expect ( "first listener" ) ;
980+
981+ let _listener2 = program
982+ . on :: < MockEvent > ( |_ctx, _event| { } )
983+ . await
984+ . expect ( "second listener" ) ;
985+ } ) ;
986+ }
987+
988+ let _ = done_tx. send ( ( ) ) ;
989+ } ) ;
990+
991+ // If this times out, the deadlock is still present.
992+ done_rx
993+ . recv_timeout ( std:: time:: Duration :: from_secs ( 5 ) )
994+ . expect ( "registering two listeners should not deadlock" ) ;
995+
996+ handle. join ( ) . unwrap ( ) ;
997+ }
919998}
0 commit comments