diff --git a/src/commands/upload/main.rs b/src/commands/upload/main.rs index 3b3c800..7c58661 100644 --- a/src/commands/upload/main.rs +++ b/src/commands/upload/main.rs @@ -1,6 +1,7 @@ use clap::{Args, Subcommand}; use regex::Regex; use std::fs::File; +use std::collections::HashMap; use std::io::Read; use std::path::PathBuf; use std::sync::Arc; @@ -100,6 +101,15 @@ pub struct UploadCmdArgs { #[arg(long, default_value_t = false)] pub disable_description_generation: bool, + + #[arg(long, default_value_t = false)] + pub show_status_until_done: bool, + + #[arg(long, default_value_t = false)] + pub show_status_until_analysed: bool, + + #[arg(long, default_value_t = false)] + pub show_status_until_transcoded: bool, } #[derive(Clone)] @@ -116,6 +126,20 @@ enum DownscaleWork { Passthrough(PathBuf), } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum StatusWaitMode { + Done, + Analysed, + Transcoded, +} + +#[derive(Clone, Debug, Default)] +struct AssetTaskProgress { + analyze_asset: Option<(String, f64)>, + downscaling: Option<(String, f64)>, + deep_analyze: Option<(String, f64)>, +} + fn parse_generate_proxy(s: &str) -> Result { match s.trim() { "360" => Ok(GenerateProxy::Variant360), @@ -260,6 +284,30 @@ fn run_recreate_filesystem(args: RecreateFilesystemArgs) -> Result<(), String> { } fn run_upload(args: UploadCmdArgs) -> Result<(), String> { + let script_start = std::time::Instant::now(); + let active_status_flags = [ + args.show_status_until_done, + args.show_status_until_analysed, + args.show_status_until_transcoded, + ] + .into_iter() + .filter(|v| *v) + .count(); + if active_status_flags > 1 { + return Err( + "Use only one of --show-status-until-done, --show-status-until-analysed, --show-status-until-transcoded" + .to_string(), + ); + } + let status_wait_mode = if args.show_status_until_done { + Some(StatusWaitMode::Done) + } else if args.show_status_until_analysed { + Some(StatusWaitMode::Analysed) + } else if args.show_status_until_transcoded { + Some(StatusWaitMode::Transcoded) + } else { + None + }; let base_dir = PathBuf::from(&args.path); if !base_dir.exists() { return Err(format!("path not found: {}", base_dir.display())); @@ -384,7 +432,7 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> { } let work_items: Vec = build_downscale_work(&original_files)?; output::info(format!("{} file(s) in downscale queue", work_items.len())); - return run_two_queue_pipeline( + let uploaded_asset_ids = run_two_queue_pipeline( work_items, &base_dir, &args, @@ -394,6 +442,20 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> { &user_id, &upload_request_id, ); + let uploaded_asset_ids = uploaded_asset_ids?; + if let Some(mode) = status_wait_mode { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("failed to start runtime: {}", e))?; + rt.block_on(wait_for_asset_processing_status( + &cfg, + &api_key, + bearer_header_for_auth.as_deref(), + &uploaded_asset_ids, + mode, + script_start, + ))?; + } + return Ok(()); } let mut files_to_upload: Vec = Vec::new(); @@ -456,7 +518,7 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> { if let Err(ref e) = upload_result { let _ = progress_handle.add_error(format!("Upload failed: {}", e)); } - upload_result?; + let uploaded_asset_ids = upload_result?; let _ = progress_handle.add_success("All uploads completed"); @@ -466,6 +528,18 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> { // Add empty line after progress display println!(); + if let Some(mode) = status_wait_mode { + wait_for_asset_processing_status( + &cfg, + &api_key, + bearer_header.as_deref(), + &uploaded_asset_ids, + mode, + script_start, + ) + .await?; + } + Ok(()) }) } @@ -602,7 +676,7 @@ fn run_two_queue_pipeline( bearer_opt: Option<&str>, user_id: &str, upload_request_id: &str, -) -> Result<(), String> { +) -> Result, String> { let (upload_tx, mut upload_rx) = tokio_mpsc::channel::(64); let mut progress = TwoQueueProgress::new()?; @@ -679,6 +753,8 @@ fn run_two_queue_pipeline( }; drop(upload_tx); + let uploaded_asset_ids: Arc>> = Arc::new(std::sync::Mutex::new(Vec::new())); + let uploaded_asset_ids_consumer = Arc::clone(&uploaded_asset_ids); let consumer = async move { while let Some(file_info) = upload_rx.recv().await { progress_handle.decrement_upload_queued(); @@ -735,6 +811,9 @@ fn run_two_queue_pipeline( e )); } + if let Ok(mut guard) = uploaded_asset_ids_consumer.lock() { + guard.push(upload_resp.asset_id.clone()); + } // Call preprocess as soon as this upload finishes let _ = progress_handle.add_info("Triggering preprocessing..."); let mut preproc_req = ProcessAssetsRequest::new( @@ -771,17 +850,21 @@ fn run_two_queue_pipeline( }; let join_result = tokio::try_join!(producer, consumer); - Ok::<_, String>((render_handle, progress, join_result)) + let collected_ids = uploaded_asset_ids + .lock() + .map(|ids| ids.clone()) + .unwrap_or_default(); + Ok::<_, String>((render_handle, progress, join_result, collected_ids)) }); - let (render_handle, mut progress, join_result) = block_result?; + let (render_handle, mut progress, join_result, collected_ids) = block_result?; rt.block_on(TwoQueueProgress::stop_render_loop(render_handle)); let _ = progress.finish(); progress.print_messages_to_stderr(); println!(); join_result?; - Ok(()) + Ok(collected_ids) } fn build_single_upload_request( @@ -963,7 +1046,7 @@ async fn upload_with_per_file_presigned( bearer_opt: Option<&str>, disable_description_generation: bool, generate_proxy: Option<&Vec>, -) -> Result<(), String> { +) -> Result, String> { let http = Arc::new( reqwest::Client::builder() .timeout(std::time::Duration::from_secs(60)) @@ -973,6 +1056,7 @@ async fn upload_with_per_file_presigned( let semaphore = Arc::new(Semaphore::new(max_concurrent)); let mut upload_tasks = Vec::new(); + let uploaded_asset_ids: Arc>> = Arc::new(std::sync::Mutex::new(Vec::new())); let cfg = cfg.clone(); let api_key = api_key.to_string(); let bearer_opt = bearer_opt.map(String::from); @@ -995,6 +1079,7 @@ async fn upload_with_per_file_presigned( let upload_request_id_clone = upload_request_id.clone(); let user_id_clone = user_id.clone(); let generate_proxy_clone = generate_proxy.clone(); + let uploaded_asset_ids_clone = Arc::clone(&uploaded_asset_ids); let task_id = i; let task = tokio::spawn(async move { @@ -1049,6 +1134,9 @@ async fn upload_with_per_file_presigned( let _ = progress_handle_clone.finish_task(task_id, success); if success { + if let Ok(mut guard) = uploaded_asset_ids_clone.lock() { + guard.push(upload_resp.asset_id.clone()); + } let mut preproc_req = ProcessAssetsRequest::new( vec![upload_resp], None::, @@ -1090,6 +1178,275 @@ async fn upload_with_per_file_presigned( .map_err(|e| format!("upload failed: {}", e))?; } + Ok(uploaded_asset_ids + .lock() + .map(|ids| ids.clone()) + .unwrap_or_default()) +} + +fn normalize_task_type(task_type: &str) -> Option<&'static str> { + let t = task_type.trim().to_ascii_lowercase(); + if t == "analyze asset" { + Some("analyze asset") + } else if t == "downscaling" { + Some("downscaling") + } else if t == "deep analyze" { + Some("deep analyze") + } else { + None + } +} + +fn is_terminal_status(status: &str) -> bool { + let s = status.trim().to_ascii_lowercase(); + matches!( + s.as_str(), + "success" | "succeeded" | "done" | "completed" | "error" | "failed" | "cancelled" + ) +} + +fn needs_task_for_mode(mode: StatusWaitMode, task_type: &str) -> bool { + match mode { + StatusWaitMode::Done => matches!(task_type, "analyze asset" | "downscaling" | "deep analyze"), + StatusWaitMode::Analysed => matches!(task_type, "analyze asset" | "deep analyze"), + StatusWaitMode::Transcoded => task_type == "downscaling", + } +} + +fn all_done_for_mode(mode: StatusWaitMode, progress: &AssetTaskProgress) -> bool { + let check = |entry: &Option<(String, f64)>| -> bool { + entry.as_ref().map(|(status, _)| is_terminal_status(status)).unwrap_or(false) + }; + match mode { + StatusWaitMode::Done => check(&progress.analyze_asset) && check(&progress.downscaling) && check(&progress.deep_analyze), + StatusWaitMode::Analysed => check(&progress.analyze_asset) && check(&progress.deep_analyze), + StatusWaitMode::Transcoded => check(&progress.downscaling), + } +} + +fn is_error_status(status: &str) -> bool { + let s = status.trim().to_ascii_lowercase(); + matches!(s.as_str(), "error" | "failed") +} + +fn has_error_for_mode(mode: StatusWaitMode, progress: &AssetTaskProgress) -> bool { + let has_error = |entry: &Option<(String, f64)>| -> bool { + entry + .as_ref() + .map(|(status, _)| is_error_status(status)) + .unwrap_or(false) + }; + match mode { + StatusWaitMode::Done => { + has_error(&progress.analyze_asset) + || has_error(&progress.downscaling) + || has_error(&progress.deep_analyze) + } + StatusWaitMode::Analysed => has_error(&progress.analyze_asset) || has_error(&progress.deep_analyze), + StatusWaitMode::Transcoded => has_error(&progress.downscaling), + } +} + +fn asset_done_for_mode(mode: StatusWaitMode, progress: &AssetTaskProgress) -> bool { + // If one watched task errors for this asset, stop waiting for other tasks of this asset. + has_error_for_mode(mode, progress) || all_done_for_mode(mode, progress) +} + +fn task_progress_to_percent(progress: f64) -> f64 { + if (0.0..=1.0).contains(&progress) { + progress * 100.0 + } else { + progress.clamp(0.0, 100.0) + } +} + +fn render_status_row( + asset_id: &str, + progress: &AssetTaskProgress, +) -> String { + let analyze = progress + .analyze_asset + .as_ref() + .map(|(s, p)| format!("{}:{:.0}%", s, task_progress_to_percent(*p))) + .unwrap_or_else(|| "pending".to_string()); + let downscaling = progress + .downscaling + .as_ref() + .map(|(s, p)| format!("{}:{:.0}%", s, task_progress_to_percent(*p))) + .unwrap_or_else(|| "pending".to_string()); + let deep_analyze = progress + .deep_analyze + .as_ref() + .map(|(s, p)| format!("{}:{:.0}%", s, task_progress_to_percent(*p))) + .unwrap_or_else(|| "pending".to_string()); + let asset_display = if asset_id.len() > 20 { + format!("{}...{}", &asset_id[..8], &asset_id[asset_id.len().saturating_sub(8)..]) + } else { + asset_id.to_string() + }; + format!( + "asset_id={} | analyze asset={} | downscaling={} | deep analyze={}", + asset_display, analyze, downscaling, deep_analyze + ) +} + +fn render_status_rows( + ordered_asset_ids: &[String], + progress_by_asset: &HashMap, +) -> Vec { + let mut rows = Vec::with_capacity(ordered_asset_ids.len()); + for asset_id in ordered_asset_ids { + if let Some(progress) = progress_by_asset.get(asset_id) { + rows.push(render_status_row(asset_id, progress)); + } + } + rows +} + +async fn wait_for_asset_processing_status( + cfg: &Configuration, + api_key: &str, + bearer_opt: Option<&str>, + uploaded_asset_ids: &[String], + mode: StatusWaitMode, + script_start: std::time::Instant, +) -> Result<(), String> { + if uploaded_asset_ids.is_empty() { + output::info("No uploaded asset_id found; skipping task polling"); + return Ok(()); + } + let mut progress_by_asset: HashMap = uploaded_asset_ids + .iter() + .cloned() + .map(|id| (id, AssetTaskProgress::default())) + .collect(); + + output::info("Polling /users/tasks every 2s for processing status..."); + let mut status_progress = + crate::tui::InlineProgress::new("Processing Uploaded Assets", uploaded_asset_ids.len())?; + let status_handle = status_progress.clone_handle(); + let _ = status_handle.set_show_elapsed(false); + for (task_id, asset_id) in uploaded_asset_ids.iter().enumerate() { + let _ = status_handle.start_task(task_id, asset_id.clone(), 100); + } + let status_render_handle = status_progress.start_render_loop(status_handle.clone()); + loop { + let finish_before_seconds = (script_start.elapsed().as_secs() as i32) + 60; + let tasks = api::get_tasks_users_tasks_get( + cfg, + Some(finish_before_seconds), + Some(api_key), + bearer_opt, + ) + .await + .map_err(|e| format!("failed to get tasks: {}", e))?; + + for task in tasks { + let Some(normalized_type) = normalize_task_type(&task.task_type) else { + continue; + }; + if !needs_task_for_mode(mode, normalized_type) { + continue; + } + for asset_id in task.asset_ids { + let Some(entry) = progress_by_asset.get_mut(&asset_id) else { + continue; + }; + let pair = (task.status.clone(), task.progress); + match normalized_type { + "analyze asset" => entry.analyze_asset = Some(pair), + "downscaling" => entry.downscaling = Some(pair), + "deep analyze" => entry.deep_analyze = Some(pair), + _ => {} + } + } + } + + let rendered_rows = render_status_rows(uploaded_asset_ids, &progress_by_asset); + for (task_id, asset_id) in uploaded_asset_ids.iter().enumerate() { + if let Some(asset_progress) = progress_by_asset.get(asset_id) { + let row = rendered_rows + .get(task_id) + .cloned() + .unwrap_or_else(|| render_status_row(asset_id, asset_progress)); + let _ = status_handle.set_task_label(task_id, row); + let pct = match mode { + StatusWaitMode::Done => { + let mut sum = 0.0; + let mut count = 0.0; + if let Some((_, p)) = asset_progress.analyze_asset.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if let Some((_, p)) = asset_progress.downscaling.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if let Some((_, p)) = asset_progress.deep_analyze.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if count > 0.0 { sum / count } else { 0.0 } + } + StatusWaitMode::Analysed => { + let mut sum = 0.0; + let mut count = 0.0; + if let Some((_, p)) = asset_progress.analyze_asset.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if let Some((_, p)) = asset_progress.deep_analyze.as_ref() { + sum += task_progress_to_percent(*p); + count += 1.0; + } + if count > 0.0 { sum / count } else { 0.0 } + } + StatusWaitMode::Transcoded => asset_progress + .downscaling + .as_ref() + .map(|(_, p)| task_progress_to_percent(*p)) + .unwrap_or(0.0), + }; + let _ = status_handle.set_task_progress_pct(task_id, pct); + } + } + + let all_done = uploaded_asset_ids.iter().all(|asset_id| { + progress_by_asset + .get(asset_id) + .map(|p| asset_done_for_mode(mode, p)) + .unwrap_or(false) + }); + + if all_done { + for (task_id, asset_id) in uploaded_asset_ids.iter().enumerate() { + let has_error = progress_by_asset + .get(asset_id) + .map(|p| has_error_for_mode(mode, p)) + .unwrap_or(false); + let _ = status_handle.finish_task(task_id, !has_error); + if let Some(asset_progress) = progress_by_asset.get(asset_id) { + if let Some((status, _)) = asset_progress.downscaling.as_ref() { + if status.eq_ignore_ascii_case("error") + || status.eq_ignore_ascii_case("failed") + { + let _ = status_handle.add_warning(format!( + "asset_id={} reached terminal status with downscaling={}", + asset_id, status + )); + } + } + } + } + crate::tui::InlineProgress::stop_render_loop(status_render_handle).await; + status_progress.finish()?; + println!(); + output::success("All watched assets reached terminal state for selected tasks"); + break; + } + + sleep(Duration::from_secs(2)).await; + } Ok(()) } diff --git a/src/tui/inline_progress.rs b/src/tui/inline_progress.rs index 33b1394..f6ef6b5 100644 --- a/src/tui/inline_progress.rs +++ b/src/tui/inline_progress.rs @@ -29,6 +29,7 @@ struct TaskProgress { total_bytes: u64, uploaded_bytes: u64, completed: bool, + failed: bool, } #[derive(Clone)] @@ -49,6 +50,7 @@ pub(crate) struct ProgressState { title: String, total_tasks: usize, completed: usize, + show_elapsed: bool, in_progress: BTreeMap, recent_messages: Vec, max_messages: usize, @@ -70,6 +72,7 @@ impl InlineProgress { title: title.into(), total_tasks, completed: 0, + show_elapsed: true, in_progress: BTreeMap::new(), recent_messages: Vec::new(), max_messages: 5, @@ -166,6 +169,7 @@ impl ProgressHandle { total_bytes, uploaded_bytes: 0, completed: false, + failed: false, }, ); Ok(()) @@ -182,15 +186,42 @@ impl ProgressHandle { Ok(()) } + pub fn set_task_label(&self, task_id: TaskId, label: impl Into) -> Result<(), String> { + let mut state = self.state.lock().unwrap(); + if let Some(task) = state.in_progress.get_mut(&task_id) { + task.label = label.into(); + } + Ok(()) + } + + pub fn set_task_progress_pct(&self, task_id: TaskId, progress_pct: f64) -> Result<(), String> { + let mut state = self.state.lock().unwrap(); + if let Some(task) = state.in_progress.get_mut(&task_id) { + let clamped = progress_pct.clamp(0.0, 100.0); + task.progress = clamped; + if task.total_bytes > 0 { + task.uploaded_bytes = ((clamped / 100.0) * task.total_bytes as f64) as u64; + } + } + Ok(()) + } + + pub fn set_show_elapsed(&self, show_elapsed: bool) -> Result<(), String> { + let mut state = self.state.lock().unwrap(); + state.show_elapsed = show_elapsed; + Ok(()) + } + pub fn finish_task(&self, task_id: TaskId, success: bool) -> Result<(), String> { let mut state = self.state.lock().unwrap(); if let Some(task) = state.in_progress.get_mut(&task_id) { + task.completed = true; + task.failed = !success; if success { - task.completed = true; task.progress = 100.0; state.completed += 1; - // Don't add to messages - task stays visible in the list with completion status } + // Don't add to messages - task stays visible in the list with completion status } Ok(()) } @@ -253,9 +284,9 @@ pub fn draw_ui_internal(frame: &mut Frame, state: &ProgressState) { let bottom_area = areas[2]; let horizontal_areas = Layout::horizontal([ - Constraint::Percentage(40), + Constraint::Percentage(72), Constraint::Length(3), - Constraint::Percentage(57), + Constraint::Percentage(25), ]) .split(middle_area); let list_area = horizontal_areas[0]; @@ -304,22 +335,28 @@ pub fn draw_ui_internal(frame: &mut Frame, state: &ProgressState) { let items: Vec = tasks_to_show .iter() .map(|(_, task)| { - let elapsed_ms = task.started_at.elapsed().as_millis(); - let label = truncate_string(&task.label, 28); - let (icon, color) = if task.completed { + let label = truncate_string(&task.label, list_area.width.saturating_sub(4) as usize); + let (icon, color) = if task.failed { + ("✗", Color::Red) + } else if task.completed { ("✓", Color::Green) } else { ("●", Color::LightGreen) }; - ListItem::new(Line::from(vec![ + let mut spans = vec![ Span::raw(format!("{} ", icon)), Span::styled( label, Style::default().fg(color).add_modifier(Modifier::BOLD), ), - Span::raw(format!(" ({}ms)", elapsed_ms)), - ])) + ]; + if state.show_elapsed { + let elapsed_ms = task.started_at.elapsed().as_millis(); + spans.push(Span::raw(format!(" ({}ms)", elapsed_ms))); + } + + ListItem::new(Line::from(spans)) }) .collect(); @@ -344,7 +381,9 @@ pub fn draw_ui_internal(frame: &mut Frame, state: &ProgressState) { } for (i, (_, task)) in tasks_to_show.iter().enumerate() { - let gauge_color = if task.completed { + let gauge_color = if task.failed { + Color::Red + } else if task.completed { Color::Green } else { Color::Yellow