From 7d8dd7142598c947be975202fdcc2e333e5166c5 Mon Sep 17 00:00:00 2001 From: bjay kamwa watanabe Date: Thu, 26 Mar 2026 11:17:39 +0100 Subject: [PATCH] Clean up ui for simple upload --- src/commands/upload/main.rs | 206 +++++++++++------------------------- 1 file changed, 62 insertions(+), 144 deletions(-) diff --git a/src/commands/upload/main.rs b/src/commands/upload/main.rs index 65b7435..3b3c800 100644 --- a/src/commands/upload/main.rs +++ b/src/commands/upload/main.rs @@ -1,6 +1,5 @@ use clap::{Args, Subcommand}; use regex::Regex; -use std::collections::HashMap; use std::fs::File; use std::io::Read; use std::path::PathBuf; @@ -103,6 +102,7 @@ pub struct UploadCmdArgs { pub disable_description_generation: bool, } +#[derive(Clone)] struct FileToUpload { upload_path: PathBuf, original_path: PathBuf, @@ -415,122 +415,32 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> { }); } - let mut requests: Vec = Vec::with_capacity(files_to_upload.len()); - let mut related_umids_per_file: Vec> = Vec::with_capacity(files_to_upload.len()); - let mut file_upload_ids: Vec = Vec::with_capacity(files_to_upload.len()); - let mut file_in_app_paths: Vec = Vec::with_capacity(files_to_upload.len()); - let mut upload_paths: Vec = Vec::with_capacity(files_to_upload.len()); - for file_info in &files_to_upload { - let content_length = std::fs::metadata(&file_info.upload_path) - .map_err(|e| format!("failed to stat {}: {}", file_info.upload_path.display(), e))? - .len(); - - let upload_id = Uuid::new_v4().to_string(); - file_upload_ids.push(upload_id.clone()); - - output::info(format!( - "Build upload request: id={} file={} size={} bytes", - upload_id, - file_info.upload_path.display(), - content_length - )); - - let file_in_app_path = super::utils::compute_in_app_path( - &file_info.original_path, - &base_dir, - &args.in_app_path, - ); - file_in_app_paths.push(file_in_app_path.clone()); - upload_paths.push(file_info.upload_path.clone()); - - let now_secs = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs() as i32; - - let file_name_str = file_info - .original_path - .file_name() - .unwrap_or_default() - .to_string_lossy() - .to_string(); - let umid = extract_media_metadata(&file_info.original_path) - .ok(); - let mut source_info = SourceFileInfo::new( - "__user_upload__".to_string(), - None, - None, - vec!["__current_user__".to_string()], - Some(now_secs), - now_secs, - vec![file_in_app_path.clone()], - Some(file_name_str), - None, - vec![], - ); - - if let Some(metadata) = umid.as_ref() { - if let Some(umid_value) = metadata.material_package_umid.as_ref() { - source_info.capture_device_umid = Some(Some(umid_value.clone())); - } - if let Some(first_with_data) = metadata.file_package_umids.iter().find(|u| u.has_data) { - source_info.umid = Some(Some(first_with_data.umid.clone())); - } - } - if is_mxf_file(&file_info.original_path) { - if let Ok(Some(probe)) = get_ffprobe_json(&file_info.original_path) { - if let serde_json::Value::Object(map) = probe { - source_info.original_ffprobe_metadata = - Some(Some(map.into_iter().collect())); - } - } - } - related_umids_per_file.push( - umid.map(|m| m.file_package_umids.iter().map(|u| u.umid.clone()).collect()) - .unwrap_or_default(), - ); - - let req = AssetUploadRequest::new( - i32::try_from(content_length).unwrap_or(i32::MAX), - upload_id, - source_info, - ); - requests.push(req); - } - // local_encoding false path: default generate_proxy to 720 when omitted let effective_generate_proxy = args.generate_proxy.clone().or_else(|| Some(vec![GenerateProxy::Variant720])); + let base_dir = base_dir.clone(); + let in_app_path = args.in_app_path.clone(); + tokio::runtime::Runtime::new() .map_err(|e| format!("failed to start runtime: {}", e))? .block_on(async move { let bearer_header = api_config::get_bearer_header(args.auth_bearer.clone()); let mut progress = - crate::tui::InlineProgress::new("Uploading Files", upload_paths.len())?; + crate::tui::InlineProgress::new("Uploading Files", files_to_upload.len())?; let progress_handle = progress.clone_handle(); let render_handle = progress.start_render_loop(progress_handle.clone()); let _ = progress_handle.add_info(format!( - "Requesting presigned URLs for {} files...", - requests.len() + "One presigned request per file ({} files), {} parallel upload(s)", + files_to_upload.len(), + args.parallel_uploads )); - let responses = - request_presigned_urls(&cfg, &requests, &api_key, bearer_header.as_deref()).await?; - let _ = - progress_handle.add_info(format!("Received {} presigned URL(s)", responses.len())); - - let mut id_to_resp: HashMap = HashMap::new(); - for r in responses.iter().cloned() { - id_to_resp.insert(r.upload_id.clone(), r); - } - - let upload_result = upload_to_presigned_urls( - &upload_paths, - &file_upload_ids, - &file_in_app_paths, - &id_to_resp, + let upload_result = upload_with_per_file_presigned( + &files_to_upload, + &base_dir, + &in_app_path, &upload_request_id, &user_id, args.parallel_uploads, @@ -538,7 +448,6 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> { &cfg, &api_key, bearer_header.as_deref(), - &related_umids_per_file, args.disable_description_generation, effective_generate_proxy.as_ref(), ) @@ -1040,11 +949,11 @@ async fn request_presigned_urls( Err(m) } -async fn upload_to_presigned_urls( - files: &Vec, - file_upload_ids: &Vec, - file_in_app_paths: &Vec, - id_to_resp: &HashMap, +/// Upload files with one presigned URL request per file (queue-style, like encode local). +async fn upload_with_per_file_presigned( + files_to_upload: &[FileToUpload], + base_dir: &PathBuf, + in_app_path: &Option, upload_request_id: &str, user_id: &str, max_concurrent: usize, @@ -1052,7 +961,6 @@ async fn upload_to_presigned_urls( cfg: &Configuration, api_key: &str, bearer_opt: Option<&str>, - related_umids_per_file: &[Vec], disable_description_generation: bool, generate_proxy: Option<&Vec>, ) -> Result<(), String> { @@ -1068,40 +976,26 @@ async fn upload_to_presigned_urls( let cfg = cfg.clone(); let api_key = api_key.to_string(); let bearer_opt = bearer_opt.map(String::from); + let base_dir = base_dir.clone(); + let in_app_path = in_app_path.clone(); + let upload_request_id = upload_request_id.to_string(); + let user_id = user_id.to_string(); + let generate_proxy = generate_proxy.cloned(); - for (i, file_path) in files.iter().enumerate() { - let file_path = file_path.clone(); - let upload_id = file_upload_ids[i].clone(); - let in_app_path = file_in_app_paths[i].clone(); - let upload_resp = id_to_resp - .get(&upload_id) - .ok_or_else(|| format!("missing presigned url for upload_id {}", upload_id))? - .clone(); - let file_related_umids = related_umids_per_file - .get(i) - .cloned() - .unwrap_or_default(); + for (i, file_info) in files_to_upload.iter().enumerate() { + let file_info = file_info.clone(); let http_clone = Arc::clone(&http); let semaphore_clone = Arc::clone(&semaphore); let progress_handle_clone = progress_handle.clone(); - let user_id = user_id.to_string(); - let upload_request_id = upload_request_id.to_string(); - let task_id = i; let cfg_clone = cfg.clone(); let api_key_clone = api_key.clone(); let bearer_clone = bearer_opt.clone(); - let generate_proxy_clone = generate_proxy.cloned(); - - let file_name = file_path - .file_name() - .unwrap_or_default() - .to_string_lossy() - .to_string(); - let file_size = std::fs::metadata(&file_path) - .map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))? - .len(); - - let _ = progress_handle_clone.start_task(task_id, file_name.clone(), file_size); + let base_dir_clone = base_dir.clone(); + let in_app_path_clone = in_app_path.clone(); + let upload_request_id_clone = upload_request_id.clone(); + let user_id_clone = user_id.clone(); + let generate_proxy_clone = generate_proxy.clone(); + let task_id = i; let task = tokio::spawn(async move { let _permit = semaphore_clone @@ -1109,15 +1003,40 @@ async fn upload_to_presigned_urls( .await .map_err(|e| format!("failed to acquire semaphore: {}", e))?; + let (req, _upload_id, in_app_path_str, file_related_umids) = + build_single_upload_request(&file_info, &base_dir_clone, &in_app_path_clone)?; + + let file_size = req.content_length.max(0) as u64; + + let responses = request_presigned_urls( + &cfg_clone, + &vec![req], + &api_key_clone, + bearer_clone.as_deref(), + ) + .await?; + let upload_resp = responses + .into_iter() + .next() + .ok_or_else(|| "missing presigned response".to_string())?; + + let file_name = file_info + .upload_path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .to_string(); + let _ = progress_handle_clone.start_task(task_id, file_name.clone(), file_size); + let result = upload_single_file( - &file_path, - &upload_id, + &file_info.upload_path, + &upload_resp.upload_id, &upload_resp, - &in_app_path, - &upload_request_id, - &user_id, + &in_app_path_str, + &upload_request_id_clone, + &user_id_clone, task_id, - &http_clone, + http_clone.as_ref(), &progress_handle_clone, file_size, &cfg_clone, @@ -1130,14 +1049,13 @@ async fn upload_to_presigned_urls( let _ = progress_handle_clone.finish_task(task_id, success); if success { - // Call preprocess as soon as this upload finishes let mut preproc_req = ProcessAssetsRequest::new( vec![upload_resp], None::, ); preproc_req.generate_time_based_media_description = Some(!disable_description_generation); - if let Some(proxy) = generate_proxy_clone.as_ref() { + if let Some(ref proxy) = generate_proxy_clone { preproc_req.generate_proxy = Some(proxy.clone()); } if !file_related_umids.is_empty() {