diff --git a/src/infra/network/library.rs b/src/infra/network/library.rs index 770a4d80..a20b16b4 100644 --- a/src/infra/network/library.rs +++ b/src/infra/network/library.rs @@ -304,7 +304,12 @@ impl LibraryNetwork for Network { } #[cfg(feature = "streaming")] - let folder_nodes = fetch_rootlist_folders(&self.streaming_player).await; + let streaming_player = { + let app = self.app.lock().await; + app.streaming_player.clone() + }; + #[cfg(feature = "streaming")] + let folder_nodes = fetch_rootlist_folders(streaming_player).await; #[cfg(not(feature = "streaming"))] let folder_nodes: Option> = None; @@ -860,9 +865,9 @@ mod tests { #[cfg(feature = "streaming")] async fn fetch_rootlist_folders( - streaming_player: &Option>, + streaming_player: Option>, ) -> Option> { - let player = streaming_player.as_ref()?; + let player = streaming_player?; let session = player.session(); let bytes = match session.spclient().get_rootlist(0, Some(100_000)).await { diff --git a/src/infra/network/mod.rs b/src/infra/network/mod.rs index 425179f9..3fc21d8b 100644 --- a/src/infra/network/mod.rs +++ b/src/infra/network/mod.rs @@ -28,9 +28,6 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::Mutex; -#[cfg(feature = "streaming")] -use crate::infra::player::StreamingPlayer; - // Re-export traits use self::library::LibraryNetwork; use self::metadata::MetadataNetwork; @@ -151,8 +148,6 @@ pub struct Network { pub small_search_limit: u32, pub client_config: ClientConfig, pub app: Arc>, - #[cfg(feature = "streaming")] - pub streaming_player: Option>, pub party_connection: Option, pub party_incoming_rx: Option>, } @@ -163,7 +158,6 @@ impl Network { spotify: AuthCodePkceSpotify, client_config: ClientConfig, app: &Arc>, - streaming_player: Option>, ) -> Self { Network { spotify, @@ -171,7 +165,6 @@ impl Network { small_search_limit: 4, client_config, app: Arc::clone(app), - streaming_player, party_connection: None, party_incoming_rx: None, } diff --git a/src/infra/network/playback.rs b/src/infra/network/playback.rs index 12113d90..89dffd91 100644 --- a/src/infra/network/playback.rs +++ b/src/infra/network/playback.rs @@ -7,13 +7,15 @@ use chrono::TimeDelta; use rspotify::model::{ enums::RepeatState, idtypes::{PlayContextId, PlayableId}, - PlayableItem, + Offset, PlayableItem, }; use rspotify::prelude::*; use std::time::{Duration, Instant}; #[cfg(feature = "streaming")] use librespot_connect::{LoadRequest, LoadRequestOptions, PlayingTrack}; +#[cfg(feature = "streaming")] +use std::sync::Arc; const MAX_API_PLAYBACK_URIS: usize = 100; @@ -69,26 +71,43 @@ fn trim_api_playback_uris( (trimmed_uris, Some(selected_index - start)) } +fn api_playback_offset( + context_uris: Option<&[PlayableId<'static>]>, + offset: Option, +) -> Option { + if let Some(first_uri) = context_uris.and_then(|uris| uris.first()) { + return Some(Offset::Uri(first_uri.uri())); + } + + offset.map(|index| Offset::Position(ChronoDuration::milliseconds(index as i64))) +} + +/// Get the currently active streaming player, if any. +/// Note: This logic is duplicated in `main.rs` as `active_streaming_player()`. +/// Both are identical; the difference is input type (Network vs. App Arc). +/// A future refactor could consolidate to a shared location like `src/core/app.rs`. +#[cfg(feature = "streaming")] +async fn current_streaming_player( + network: &Network, +) -> Option> { + let app = network.app.lock().await; + app.streaming_player.clone() +} + #[cfg(feature = "streaming")] async fn is_native_streaming_active_for_playback(network: &Network) -> bool { - let player_connected = network - .streaming_player - .as_ref() - .is_some_and(|p| p.is_connected()); + let app = network.app.lock().await; + let streaming_player = app.streaming_player.clone(); + let player_connected = streaming_player.as_ref().is_some_and(|p| p.is_connected()); if !player_connected { return false; } - // Get native device name once (no lock needed) - let native_device_name = network - .streaming_player + let native_device_name = streaming_player .as_ref() .map(|p| p.device_name().to_lowercase()); - // Single lock acquisition - check all conditions in one go - let app = network.app.lock().await; - // If no context yet (e.g., at startup), use the app state flag which is // set when the native streaming device is activated/selected. let Some(ref ctx) = app.current_playback_context else { @@ -116,25 +135,21 @@ async fn is_native_streaming_active_for_playback(network: &Network) -> bool { false } -#[cfg(feature = "streaming")] -fn is_native_streaming_active(network: &Network) -> bool { - network - .streaming_player - .as_ref() - .is_some_and(|p| p.is_connected()) -} - impl PlaybackNetwork for Network { async fn get_current_playback(&mut self) { // When using native streaming, the Spotify API returns stale server-side state // that doesn't reflect recent local changes (volume, shuffle, repeat, play/pause). // We need to preserve these local states and restore them after getting the API response. #[cfg(feature = "streaming")] + let streaming_player = current_streaming_player(self).await; + #[cfg(feature = "streaming")] + // Check if native streaming is active by examining the pre-fetched player + // (avoids redundant lock call from is_native_streaming_active) let local_state: Option<(Option, bool, rspotify::model::RepeatState, Option)> = - if is_native_streaming_active(self) { + if streaming_player.as_ref().is_some_and(|p| p.is_connected()) { let app = self.app.lock().await; if let Some(ref ctx) = app.current_playback_context { - let volume = self.streaming_player.as_ref().map(|p| p.get_volume()); + let volume = streaming_player.as_ref().map(|p| p.get_volume()); Some(( volume, ctx.shuffle_state, @@ -164,7 +179,7 @@ impl PlaybackNetwork for Network { // Detect whether the native spotatui streaming device is the active Spotify device. #[cfg(feature = "streaming")] - let is_native_device = self.streaming_player.as_ref().is_some_and(|p| { + let is_native_device = streaming_player.as_ref().is_some_and(|p| { if let (Some(current_id), Some(native_id)) = (c.device.id.as_ref(), app.native_device_id.as_ref()) { @@ -236,7 +251,7 @@ impl PlaybackNetwork for Network { if local_state.is_none() && is_native_device { c.shuffle_state = app.user_config.behavior.shuffle_enabled; // Proactively set native shuffle on first load to keep backend in sync - if let Some(ref player) = self.streaming_player { + if let Some(ref player) = streaming_player { let _ = player.set_shuffle(app.user_config.behavior.shuffle_enabled); } } @@ -386,7 +401,7 @@ impl PlaybackNetwork for Network { // Check if we should use native streaming for playback #[cfg(feature = "streaming")] if is_native_streaming_active_for_playback(self).await { - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { let activation_time = Instant::now(); let should_transfer = { let app = self.app.lock().await; @@ -481,31 +496,32 @@ impl PlaybackNetwork for Network { } } - let offset_struct = - offset.map(|o| rspotify::model::Offset::Position(ChronoDuration::milliseconds(o as i64))); - - let result = if let Some(context) = context_id { - self - .spotify - .start_context_playback( - context, - None, // device_id - offset_struct, - None, // position - ) - .await - } else if let Some(track_uris) = uris { - self - .spotify - .start_uris_playback( - track_uris, - None, // device_id - offset_struct, - None, // position - ) - .await - } else { - self.spotify.resume_playback(None, None).await + let result = match (context_id, uris) { + (Some(context), track_uris) => { + let offset_struct = api_playback_offset(track_uris.as_deref(), offset); + self + .spotify + .start_context_playback( + context, + None, // device_id + offset_struct, + None, // position + ) + .await + } + (None, Some(track_uris)) => { + let offset_struct = api_playback_offset(None, offset); + self + .spotify + .start_uris_playback( + track_uris, + None, // device_id + offset_struct, + None, // position + ) + .await + } + (None, None) => self.spotify.resume_playback(None, None).await, }; match result { @@ -533,7 +549,7 @@ impl PlaybackNetwork for Network { // Check if using native streaming #[cfg(feature = "streaming")] if is_native_streaming_active_for_playback(self).await { - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { player.pause(); // Update UI state immediately let mut app = self.app.lock().await; @@ -561,7 +577,7 @@ impl PlaybackNetwork for Network { async fn next_track(&mut self) { #[cfg(feature = "streaming")] if is_native_streaming_active_for_playback(self).await { - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { player.next(); return; } @@ -576,7 +592,7 @@ impl PlaybackNetwork for Network { async fn previous_track(&mut self) { #[cfg(feature = "streaming")] if is_native_streaming_active_for_playback(self).await { - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { player.prev(); return; } @@ -591,7 +607,7 @@ impl PlaybackNetwork for Network { async fn force_previous_track(&mut self) { #[cfg(feature = "streaming")] if is_native_streaming_active_for_playback(self).await { - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { player.prev(); tokio::time::sleep(std::time::Duration::from_millis(500)).await; player.prev(); @@ -619,7 +635,7 @@ impl PlaybackNetwork for Network { async fn seek(&mut self, position_ms: u32) { #[cfg(feature = "streaming")] if is_native_streaming_active_for_playback(self).await { - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { player.seek(position_ms); return; } @@ -638,7 +654,7 @@ impl PlaybackNetwork for Network { async fn shuffle(&mut self, shuffle_state: bool) { #[cfg(feature = "streaming")] if is_native_streaming_active_for_playback(self).await { - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { let _ = player.set_shuffle(shuffle_state); let mut app = self.app.lock().await; if let Some(ctx) = &mut app.current_playback_context { @@ -665,7 +681,7 @@ impl PlaybackNetwork for Network { async fn repeat(&mut self, repeat_state: RepeatState) { #[cfg(feature = "streaming")] if is_native_streaming_active_for_playback(self).await { - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { let _ = player.set_repeat(repeat_state); let mut app = self.app.lock().await; if let Some(ctx) = &mut app.current_playback_context { @@ -692,7 +708,7 @@ impl PlaybackNetwork for Network { async fn change_volume(&mut self, volume: u8) { #[cfg(feature = "streaming")] if is_native_streaming_active_for_playback(self).await { - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { player.set_volume(volume); let mut app = self.app.lock().await; if let Some(ctx) = &mut app.current_playback_context { @@ -719,7 +735,8 @@ impl PlaybackNetwork for Network { async fn transfert_playback_to_device(&mut self, device_id: String, persist_device_id: bool) { #[cfg(feature = "streaming")] { - let is_native_transfer = if let Some(ref player) = self.streaming_player { + let streaming_player = current_streaming_player(self).await; + let is_native_transfer = if let Some(ref player) = streaming_player { let native_name = player.device_name().to_lowercase(); let app = self.app.lock().await; let matches_cached_device = app.devices.as_ref().is_some_and(|payload| { @@ -734,7 +751,7 @@ impl PlaybackNetwork for Network { }; if is_native_transfer { - if let Some(ref player) = self.streaming_player { + if let Some(ref player) = streaming_player { let _ = player.transfer(None); player.activate(); let mut app = self.app.lock().await; @@ -772,7 +789,7 @@ impl PlaybackNetwork for Network { async fn auto_select_streaming_device(&mut self, device_name: String, persist_device_id: bool) { tokio::time::sleep(Duration::from_millis(200)).await; - if let Some(ref player) = self.streaming_player { + if let Some(player) = current_streaming_player(self).await { let activation_time = Instant::now(); let should_transfer = { let app = self.app.lock().await; @@ -949,4 +966,41 @@ mod tests { assert_eq!(offset, Some(99)); assert_eq!(trimmed[offset.unwrap()].uri(), uris[149].uri()); } + + #[test] + fn api_playback_offset_uses_track_uri_for_context_playback() { + let uris = vec![ + playable_track("0000000000000000000001"), + playable_track("0000000000000000000002"), + ]; + + let offset = api_playback_offset(Some(&uris), Some(1)); + + assert_eq!( + offset, + Some(Offset::Uri( + "spotify:track:0000000000000000000001".to_string() + )) + ); + } + + #[test] + fn api_playback_offset_uses_position_for_uri_list_playback() { + let offset = api_playback_offset(None, Some(1)); + + assert_eq!( + offset, + Some(Offset::Position(ChronoDuration::milliseconds(1))) + ); + } + + #[test] + fn api_playback_offset_falls_back_to_position_when_context_has_no_uri() { + let offset = api_playback_offset(None, Some(3)); + + assert_eq!( + offset, + Some(Offset::Position(ChronoDuration::milliseconds(3))) + ); + } } diff --git a/src/infra/player/streaming.rs b/src/infra/player/streaming.rs index 2348b094..b25da3f7 100644 --- a/src/infra/player/streaming.rs +++ b/src/infra/player/streaming.rs @@ -25,6 +25,7 @@ use log::{error, info, warn}; use std::any::Any; use std::panic::{catch_unwind, AssertUnwindSafe}; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::sync::Mutex; use tokio::time::{timeout, Duration}; @@ -175,6 +176,31 @@ fn request_streaming_oauth_credentials() -> Result { Ok(Credentials::with_access_token(token.access_token)) } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum StreamingAuthMode { + /// Default startup mode: use cache first, then fall back to browser OAuth. + InteractiveIfNeeded, + /// Recovery mode: only use cached credentials; never open a browser. + CacheOnly, +} + +fn resolve_streaming_credentials( + cache: &Cache, + auth_mode: StreamingAuthMode, +) -> Result<(Credentials, bool)> { + if let Some(cached_creds) = cache.credentials() { + info!("Using cached streaming credentials"); + return Ok((cached_creds, true)); + } + + match auth_mode { + StreamingAuthMode::InteractiveIfNeeded => Ok((request_streaming_oauth_credentials()?, false)), + StreamingAuthMode::CacheOnly => Err(anyhow!( + "No cached streaming credentials found (cache-only recovery mode)" + )), + } +} + fn clear_cached_streaming_credentials(cache_path: &Option) { let Some(credentials_path) = cache_path .as_ref() @@ -255,6 +281,7 @@ pub struct StreamingPlayer { config: StreamingConfig, #[allow(dead_code)] state: Arc>, + spirc_alive: Arc, } #[allow(dead_code)] @@ -274,6 +301,25 @@ impl StreamingPlayer { /// * `redirect_uri` - OAuth redirect URI (must match Spotify app settings) /// * `config` - Streaming configuration options pub async fn new(_client_id: &str, _redirect_uri: &str, config: StreamingConfig) -> Result { + Self::new_with_auth_mode(config, StreamingAuthMode::InteractiveIfNeeded).await + } + + /// Create a new streaming player using ONLY cached credentials. + /// + /// This path is intended for runtime recovery flows where opening a browser + /// would be disruptive. + pub async fn new_cache_only( + _client_id: &str, + _redirect_uri: &str, + config: StreamingConfig, + ) -> Result { + Self::new_with_auth_mode(config, StreamingAuthMode::CacheOnly).await + } + + async fn new_with_auth_mode( + config: StreamingConfig, + auth_mode: StreamingAuthMode, + ) -> Result { // Set up cache paths let cache_path = config.cache_path.clone().or_else(get_default_cache_path); let audio_cache_path = if config.audio_cache { @@ -292,14 +338,9 @@ impl StreamingPlayer { let cache = Cache::new(cache_path.clone(), None, audio_cache_path, None)?; - // Try to get credentials from cache first + // Try to get credentials from cache first, then optionally fall back to OAuth. let (mut credentials, mut used_cached_credentials) = - if let Some(cached_creds) = cache.credentials() { - info!("Using cached streaming credentials"); - (cached_creds, true) - } else { - (request_streaming_oauth_credentials()?, false) - }; + resolve_streaming_credentials(&cache, auth_mode)?; // Create session configuration using spotify-player's client_id let session_config = SessionConfig { @@ -393,11 +434,12 @@ impl StreamingPlayer { match timeout(Duration::from_secs(init_timeout_secs), spirc_new).await { Ok(Ok(result)) => break result, Ok(Err(e)) - if should_retry_with_fresh_credentials( - true, - used_cached_credentials, - retried_with_fresh_credentials, - ) => + if matches!(auth_mode, StreamingAuthMode::InteractiveIfNeeded) + && should_retry_with_fresh_credentials( + true, + used_cached_credentials, + retried_with_fresh_credentials, + ) => { warn!( "Cached streaming credentials failed ({:?}); retrying with a fresh OAuth login", @@ -425,8 +467,15 @@ impl StreamingPlayer { } }; - // Spawn the Spirc task to run in the background - tokio::spawn(spirc_task); + // Track the Spirc runtime lifecycle so liveness checks can detect dead + // Connect sessions even if the player thread is still running. + let spirc_alive = Arc::new(AtomicBool::new(true)); + let spirc_alive_for_task = Arc::clone(&spirc_alive); + let spirc_handle = tokio::spawn(spirc_task); + tokio::spawn(async move { + let _ = spirc_handle.await; + spirc_alive_for_task.store(false, Ordering::Relaxed); + }); info!("Streaming connection established!"); @@ -437,6 +486,7 @@ impl StreamingPlayer { mixer, config, state: Arc::new(Mutex::new(PlayerState::default())), + spirc_alive, }) } @@ -447,7 +497,9 @@ impl StreamingPlayer { /// Check if the session is connected pub fn is_connected(&self) -> bool { - !self.player.is_invalid() + self.spirc_alive.load(Ordering::Relaxed) + && !self.session.is_invalid() + && !self.player.is_invalid() } /// Play a track by its Spotify URI (e.g., "spotify:track:xxxx") @@ -586,7 +638,7 @@ impl StreamingPlayer { /// Check if the player is invalid (e.g., session disconnected) pub fn is_invalid(&self) -> bool { - self.player.is_invalid() + !self.is_connected() } /// Activate the device (make it the active playback device) @@ -607,6 +659,7 @@ impl StreamingPlayer { /// Shutdown the player pub fn shutdown(&self) { + self.spirc_alive.store(false, Ordering::Relaxed); let _ = self.spirc.shutdown(); } diff --git a/src/main.rs b/src/main.rs index 3218fae1..f28a3c1c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1161,7 +1161,7 @@ of the app. Beware that this comes at a CPU cost!", // Save, because we checked if the subcommand is present at runtime let m = matches.subcommand_matches(cmd).unwrap(); #[cfg(feature = "streaming")] - let network = Network::new(spotify, client_config, &app, None); // CLI doesn't use streaming + let network = Network::new(spotify, client_config, &app); // CLI doesn't use streaming #[cfg(not(feature = "streaming"))] let network = Network::new(spotify, client_config, &app); println!( @@ -1265,9 +1265,7 @@ of the app. Beware that this comes at a CPU cost!", app_mut.streaming_player = streaming_player.clone(); } - // Clone streaming player and device name for use in network spawn - #[cfg(feature = "streaming")] - let streaming_player_clone = streaming_player.clone(); + // Clone the device name for startup device selection in the network task. #[cfg(feature = "streaming")] let streaming_device_name = streaming_player .as_ref() @@ -1294,6 +1292,9 @@ of the app. Beware that this comes at a CPU cost!", let shared_position_for_mpris = Arc::clone(&shared_position); #[cfg(all(feature = "macos-media", target_os = "macos"))] let shared_is_playing_for_macos = Arc::clone(&shared_is_playing); + #[cfg(feature = "streaming")] + let (streaming_recovery_tx, mut streaming_recovery_rx) = + tokio::sync::mpsc::unbounded_channel::(); // Initialize MPRIS D-Bus integration for desktop media control // This registers spotatui as a controllable media player on the session bus @@ -1389,14 +1390,9 @@ of the app. Beware that this comes at a CPU cost!", #[cfg(all(feature = "macos-media", target_os = "macos"))] if let Some(ref macos_media) = macos_media_manager { if let Some(event_rx) = macos_media.take_event_rx() { - let streaming_player_for_macos = streaming_player.clone(); + let app_for_macos = Arc::clone(&app); tokio::spawn(async move { - handle_macos_media_events( - event_rx, - streaming_player_for_macos, - shared_is_playing_for_macos, - ) - .await; + handle_macos_media_events(event_rx, app_for_macos, shared_is_playing_for_macos).await; }); } } @@ -1435,21 +1431,97 @@ of the app. Beware that this comes at a CPU cost!", // Spawn player event listener (updates app state from native player events) #[cfg(feature = "streaming")] if let Some(ref player) = streaming_player { - let event_rx = player.get_event_channel(); - let app_for_events = Arc::clone(&app); - info!("spawning native player event handler"); + spawn_player_event_handler(PlayerEventContext { + player: Arc::clone(player), + app: Arc::clone(&app), + shared_position: shared_position_for_events, + shared_is_playing: shared_is_playing_for_events, + recovery_tx: streaming_recovery_tx.clone(), + #[cfg(all(feature = "mpris", target_os = "linux"))] + mpris_manager: mpris_for_events, + #[cfg(all(feature = "macos-media", target_os = "macos"))] + macos_media_manager: macos_media_for_events, + }); + } + + #[cfg(feature = "streaming")] + { + let app_for_recovery = Arc::clone(&app); + let shared_position_for_recovery = Arc::clone(&shared_position); + let shared_is_playing_for_recovery = Arc::clone(&shared_is_playing); + let recovery_tx = streaming_recovery_tx.clone(); + let recovery_client_config = client_config.clone(); + let recovery_redirect_uri = selected_redirect_uri.clone(); + #[cfg(all(feature = "mpris", target_os = "linux"))] + let mpris_for_recovery = mpris_manager.clone(); + #[cfg(all(feature = "macos-media", target_os = "macos"))] + let macos_media_for_recovery = macos_media_manager.clone(); + tokio::spawn(async move { - handle_player_events( - event_rx, - app_for_events, - shared_position_for_events, - shared_is_playing_for_events, - #[cfg(all(feature = "mpris", target_os = "linux"))] - mpris_for_events, - #[cfg(all(feature = "macos-media", target_os = "macos"))] - macos_media_for_events, - ) - .await; + while let Some(mut request) = streaming_recovery_rx.recv().await { + while let Ok(next_request) = streaming_recovery_rx.try_recv() { + request.reselect_device |= next_request.reselect_device; + } + + if active_streaming_player(&app_for_recovery).await.is_some() { + continue; + } + + let initial_volume = { + let app = app_for_recovery.lock().await; + app.user_config.behavior.volume_percent + }; + + let streaming_config = player::StreamingConfig { + device_name: recovery_client_config.streaming_device_name.clone(), + bitrate: recovery_client_config.streaming_bitrate, + audio_cache: recovery_client_config.streaming_audio_cache, + cache_path: player::get_default_cache_path(), + initial_volume, + }; + + info!("attempting native streaming recovery"); + + match player::StreamingPlayer::new_cache_only( + &recovery_client_config.client_id, + &recovery_redirect_uri, + streaming_config, + ) + .await + { + Ok(recovered_player) => { + let recovered_player = Arc::new(recovered_player); + { + let mut app = app_for_recovery.lock().await; + app.streaming_player = Some(Arc::clone(&recovered_player)); + app.set_status_message("Native streaming recovered.", 6); + if request.reselect_device { + app.dispatch(IoEvent::AutoSelectStreamingDevice( + recovery_client_config.streaming_device_name.clone(), + false, + )); + } + } + + spawn_player_event_handler(PlayerEventContext { + player: recovered_player, + app: Arc::clone(&app_for_recovery), + shared_position: Arc::clone(&shared_position_for_recovery), + shared_is_playing: Arc::clone(&shared_is_playing_for_recovery), + recovery_tx: recovery_tx.clone(), + #[cfg(all(feature = "mpris", target_os = "linux"))] + mpris_manager: mpris_for_recovery.clone(), + #[cfg(all(feature = "macos-media", target_os = "macos"))] + macos_media_manager: macos_media_for_recovery.clone(), + }); + } + Err(e) => { + info!("native streaming recovery failed: {}", e); + let mut app = app_for_recovery.lock().await; + app.set_status_message(format!("Native recovery failed: {}", e), 8); + } + } + } }); } @@ -1457,7 +1529,7 @@ of the app. Beware that this comes at a CPU cost!", info!("spawning spotify network event handler"); tokio::spawn(async move { #[cfg(feature = "streaming")] - let mut network = Network::new(spotify, client_config, &app, streaming_player_clone); + let mut network = Network::new(spotify, client_config, &app); #[cfg(not(feature = "streaming"))] let mut network = Network::new(spotify, client_config, &app); @@ -1589,9 +1661,11 @@ async fn start_tokio(io_rx: std::sync::mpsc::Receiver, network: &mut Ne #[cfg(feature = "streaming")] async fn handle_player_events( mut event_rx: librespot_playback::player::PlayerEventChannel, + player: Arc, app: Arc>, shared_position: Arc, shared_is_playing: Arc, + recovery_tx: tokio::sync::mpsc::UnboundedSender, #[cfg(all(feature = "mpris", target_os = "linux"))] mpris_manager: Option< Arc, >, @@ -1604,6 +1678,10 @@ async fn handle_player_events( use std::sync::atomic::Ordering; while let Some(event) = event_rx.recv().await { + if !is_current_streaming_player(&app, &player).await { + continue; + } + // Use try_lock() to avoid blocking when the UI thread is busy // If we can't get the lock, skip this update - the UI will catch up on the next tick match event { @@ -1892,11 +1970,181 @@ async fn handle_player_events( macos_media.set_position(position_ms as u64); } } + PlayerEvent::SessionDisconnected { .. } => { + #[cfg(all(feature = "mpris", target_os = "linux"))] + if let Some(ref mpris) = mpris_manager { + mpris.set_stopped(); + } + + #[cfg(all(feature = "macos-media", target_os = "macos"))] + if let Some(ref macos_media) = macos_media_manager { + macos_media.set_stopped(); + } + + if let Some(request) = disconnect_streaming_player( + &app, + &player, + &shared_position, + &shared_is_playing, + "Native streaming disconnected; attempting recovery.", + ) + .await + { + let _ = recovery_tx.send(request); + } + return; + } _ => { // Ignore other events } } } + + if let Some(request) = disconnect_streaming_player( + &app, + &player, + &shared_position, + &shared_is_playing, + "Native streaming stopped; attempting recovery.", + ) + .await + { + let _ = recovery_tx.send(request); + } +} + +#[cfg(feature = "streaming")] +#[derive(Clone, Copy, Default)] +struct StreamingRecoveryRequest { + reselect_device: bool, +} + +/// Bundled context for player event handling tasks. +/// Groups all shared state and managers needed by event handlers. +#[cfg(feature = "streaming")] +struct PlayerEventContext { + player: Arc, + app: Arc>, + shared_position: Arc, + shared_is_playing: Arc, + recovery_tx: tokio::sync::mpsc::UnboundedSender, + #[cfg(all(feature = "mpris", target_os = "linux"))] + mpris_manager: Option>, + #[cfg(all(feature = "macos-media", target_os = "macos"))] + macos_media_manager: Option>, +} + +/// Get the currently active streaming player (if any). +/// This is logically identical to `current_streaming_player` in src/infra/network/playback.rs. +/// The difference: this function takes `&Arc>` directly (used in main.rs event handlers), +/// while `current_streaming_player` takes `&Network` (used in playback API code). +/// Future refactor: consolidate to a shared location like `src/core/app.rs`. +#[cfg(feature = "streaming")] +async fn active_streaming_player(app: &Arc>) -> Option> { + let app_lock = app.lock().await; + app_lock.streaming_player.clone() +} + +#[cfg(feature = "streaming")] +async fn is_current_streaming_player( + app: &Arc>, + player: &Arc, +) -> bool { + // Pointer identity determines whether an event belongs to the active player. + // Do not reject disconnected players here: SessionDisconnected still needs to + // reach the handler so the recovery path can run. + let app_lock = app.lock().await; + app_lock + .streaming_player + .as_ref() + .is_some_and(|current| Arc::ptr_eq(current, player)) +} + +#[cfg(feature = "streaming")] +fn current_playback_matches_native(app: &App, player: &player::StreamingPlayer) -> bool { + let Some(ctx) = app.current_playback_context.as_ref() else { + return app.is_streaming_active; + }; + + if let Some(native_id) = app.native_device_id.as_ref() { + if ctx.device.id.as_ref() == Some(native_id) { + return true; + } + } + + ctx.device.name.eq_ignore_ascii_case(player.device_name()) +} + +#[cfg(feature = "streaming")] +async fn disconnect_streaming_player( + app: &Arc>, + player: &Arc, + shared_position: &Arc, + shared_is_playing: &Arc, + status_message: &str, +) -> Option { + let mut app_lock = app.lock().await; + let Some(current_player) = app_lock.streaming_player.as_ref() else { + return None; + }; + if !Arc::ptr_eq(current_player, player) { + return None; + } + + let reselect_device = current_playback_matches_native(&app_lock, player); + + app_lock.streaming_player = None; + app_lock.is_streaming_active = false; + app_lock.native_activation_pending = false; + app_lock.native_device_id = None; + app_lock.native_is_playing = Some(false); + app_lock.native_track_info = None; + app_lock.song_progress_ms = 0; + app_lock.last_track_id = None; + app_lock.last_device_activation = None; + app_lock.seek_ms = None; + if reselect_device { + app_lock.current_playback_context = None; + } + app_lock.set_status_message(status_message, 8); + app_lock.dispatch(IoEvent::GetCurrentPlayback); + + shared_position.store(0, Ordering::Relaxed); + shared_is_playing.store(false, Ordering::Relaxed); + + Some(StreamingRecoveryRequest { reselect_device }) +} + +#[cfg(feature = "streaming")] +fn spawn_player_event_handler(ctx: PlayerEventContext) { + let event_rx = ctx.player.get_event_channel(); + info!("spawning native player event handler"); + + let player = ctx.player.clone(); + let app = Arc::clone(&ctx.app); + let shared_position = Arc::clone(&ctx.shared_position); + let shared_is_playing = Arc::clone(&ctx.shared_is_playing); + let recovery_tx = ctx.recovery_tx.clone(); + #[cfg(all(feature = "mpris", target_os = "linux"))] + let mpris_manager = ctx.mpris_manager.clone(); + #[cfg(all(feature = "macos-media", target_os = "macos"))] + let macos_media_manager = ctx.macos_media_manager.clone(); + + tokio::spawn(async move { + handle_player_events( + event_rx, + player, + app, + shared_position, + shared_is_playing, + recovery_tx, + #[cfg(all(feature = "mpris", target_os = "linux"))] + mpris_manager, + #[cfg(all(feature = "macos-media", target_os = "macos"))] + macos_media_manager, + ) + .await; + }); } /// Handle MPRIS events from external clients (media keys, playerctl, etc.) @@ -2098,18 +2346,17 @@ async fn handle_mpris_events( #[cfg(all(feature = "macos-media", target_os = "macos"))] async fn handle_macos_media_events( mut event_rx: tokio::sync::mpsc::UnboundedReceiver, - streaming_player: Option>, + app: Arc>, shared_is_playing: Arc, ) { use macos_media::MacMediaEvent; use std::sync::atomic::Ordering; - let Some(player) = streaming_player else { - // No streaming player, nothing to control - return; - }; - while let Some(event) = event_rx.recv().await { + let Some(player) = active_streaming_player(&app).await else { + continue; + }; + match event { MacMediaEvent::PlayPause => { // Toggle based on atomic state (lock-free, always up-to-date)