11use clap:: { Args , Subcommand } ;
22use regex:: Regex ;
3- use std:: collections:: HashMap ;
43use std:: fs:: File ;
54use std:: io:: Read ;
65use std:: path:: PathBuf ;
@@ -103,6 +102,7 @@ pub struct UploadCmdArgs {
103102 pub disable_description_generation : bool ,
104103}
105104
105+ #[ derive( Clone ) ]
106106struct FileToUpload {
107107 upload_path : PathBuf ,
108108 original_path : PathBuf ,
@@ -415,130 +415,39 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> {
415415 } ) ;
416416 }
417417
418- let mut requests: Vec < AssetUploadRequest > = Vec :: with_capacity ( files_to_upload. len ( ) ) ;
419- let mut related_umids_per_file: Vec < Vec < String > > = Vec :: with_capacity ( files_to_upload. len ( ) ) ;
420- let mut file_upload_ids: Vec < String > = Vec :: with_capacity ( files_to_upload. len ( ) ) ;
421- let mut file_in_app_paths: Vec < String > = Vec :: with_capacity ( files_to_upload. len ( ) ) ;
422- let mut upload_paths: Vec < PathBuf > = Vec :: with_capacity ( files_to_upload. len ( ) ) ;
423- for file_info in & files_to_upload {
424- let content_length = std:: fs:: metadata ( & file_info. upload_path )
425- . map_err ( |e| format ! ( "failed to stat {}: {}" , file_info. upload_path. display( ) , e) ) ?
426- . len ( ) ;
427-
428- let upload_id = Uuid :: new_v4 ( ) . to_string ( ) ;
429- file_upload_ids. push ( upload_id. clone ( ) ) ;
430-
431- output:: info ( format ! (
432- "Build upload request: id={} file={} size={} bytes" ,
433- upload_id,
434- file_info. upload_path. display( ) ,
435- content_length
436- ) ) ;
437-
438- let file_in_app_path = super :: utils:: compute_in_app_path (
439- & file_info. original_path ,
440- & base_dir,
441- & args. in_app_path ,
442- ) ;
443- file_in_app_paths. push ( file_in_app_path. clone ( ) ) ;
444- upload_paths. push ( file_info. upload_path . clone ( ) ) ;
445-
446- let now_secs = std:: time:: SystemTime :: now ( )
447- . duration_since ( std:: time:: UNIX_EPOCH )
448- . unwrap_or_default ( )
449- . as_secs ( ) as i32 ;
450-
451- let file_name_str = file_info
452- . original_path
453- . file_name ( )
454- . unwrap_or_default ( )
455- . to_string_lossy ( )
456- . to_string ( ) ;
457- let umid = extract_media_metadata ( & file_info. original_path )
458- . ok ( ) ;
459- let mut source_info = SourceFileInfo :: new (
460- "__user_upload__" . to_string ( ) ,
461- None ,
462- None ,
463- vec ! [ "__current_user__" . to_string( ) ] ,
464- Some ( now_secs) ,
465- now_secs,
466- vec ! [ file_in_app_path. clone( ) ] ,
467- Some ( file_name_str) ,
468- None ,
469- vec ! [ ] ,
470- ) ;
471-
472- if let Some ( metadata) = umid. as_ref ( ) {
473- if let Some ( umid_value) = metadata. material_package_umid . as_ref ( ) {
474- source_info. capture_device_umid = Some ( Some ( umid_value. clone ( ) ) ) ;
475- }
476- if let Some ( first_with_data) = metadata. file_package_umids . iter ( ) . find ( |u| u. has_data ) {
477- source_info. umid = Some ( Some ( first_with_data. umid . clone ( ) ) ) ;
478- }
479- }
480- if is_mxf_file ( & file_info. original_path ) {
481- if let Ok ( Some ( probe) ) = get_ffprobe_json ( & file_info. original_path ) {
482- if let serde_json:: Value :: Object ( map) = probe {
483- source_info. original_ffprobe_metadata =
484- Some ( Some ( map. into_iter ( ) . collect ( ) ) ) ;
485- }
486- }
487- }
488- related_umids_per_file. push (
489- umid. map ( |m| m. file_package_umids . iter ( ) . map ( |u| u. umid . clone ( ) ) . collect ( ) )
490- . unwrap_or_default ( ) ,
491- ) ;
492-
493- let req = AssetUploadRequest :: new (
494- i32:: try_from ( content_length) . unwrap_or ( i32:: MAX ) ,
495- upload_id,
496- source_info,
497- ) ;
498- requests. push ( req) ;
499- }
500-
501418 // local_encoding false path: default generate_proxy to 720 when omitted
502419 let effective_generate_proxy =
503420 args. generate_proxy . clone ( ) . or_else ( || Some ( vec ! [ GenerateProxy :: Variant720 ] ) ) ;
504421
422+ let base_dir = base_dir. clone ( ) ;
423+ let in_app_path = args. in_app_path . clone ( ) ;
424+
505425 tokio:: runtime:: Runtime :: new ( )
506426 . map_err ( |e| format ! ( "failed to start runtime: {}" , e) ) ?
507427 . block_on ( async move {
508428 let bearer_header = api_config:: get_bearer_header ( args. auth_bearer . clone ( ) ) ;
509429 let mut progress =
510- crate :: tui:: InlineProgress :: new ( "Uploading Files" , upload_paths . len ( ) ) ?;
430+ crate :: tui:: InlineProgress :: new ( "Uploading Files" , files_to_upload . len ( ) ) ?;
511431 let progress_handle = progress. clone_handle ( ) ;
512432 let render_handle = progress. start_render_loop ( progress_handle. clone ( ) ) ;
513433
514434 let _ = progress_handle. add_info ( format ! (
515- "Requesting presigned URLs for {} files..." ,
516- requests. len( )
435+ "One presigned request per file ({} files), {} parallel upload(s)" ,
436+ files_to_upload. len( ) ,
437+ args. parallel_uploads
517438 ) ) ;
518439
519- let responses =
520- request_presigned_urls ( & cfg, & requests, & api_key, bearer_header. as_deref ( ) ) . await ?;
521- let _ =
522- progress_handle. add_info ( format ! ( "Received {} presigned URL(s)" , responses. len( ) ) ) ;
523-
524- let mut id_to_resp: HashMap < String , AssetUploadResponse > = HashMap :: new ( ) ;
525- for r in responses. iter ( ) . cloned ( ) {
526- id_to_resp. insert ( r. upload_id . clone ( ) , r) ;
527- }
528-
529- let upload_result = upload_to_presigned_urls (
530- & upload_paths,
531- & file_upload_ids,
532- & file_in_app_paths,
533- & id_to_resp,
440+ let upload_result = upload_with_per_file_presigned (
441+ & files_to_upload,
442+ & base_dir,
443+ & in_app_path,
534444 & upload_request_id,
535445 & user_id,
536446 args. parallel_uploads ,
537447 & progress_handle,
538448 & cfg,
539449 & api_key,
540450 bearer_header. as_deref ( ) ,
541- & related_umids_per_file,
542451 args. disable_description_generation ,
543452 effective_generate_proxy. as_ref ( ) ,
544453 )
@@ -1040,19 +949,18 @@ async fn request_presigned_urls(
1040949 Err ( m)
1041950}
1042951
1043- async fn upload_to_presigned_urls (
1044- files : & Vec < PathBuf > ,
1045- file_upload_ids : & Vec < String > ,
1046- file_in_app_paths : & Vec < String > ,
1047- id_to_resp : & HashMap < String , AssetUploadResponse > ,
952+ /// Upload files with one presigned URL request per file (queue-style, like encode local).
953+ async fn upload_with_per_file_presigned (
954+ files_to_upload : & [ FileToUpload ] ,
955+ base_dir : & PathBuf ,
956+ in_app_path : & Option < String > ,
1048957 upload_request_id : & str ,
1049958 user_id : & str ,
1050959 max_concurrent : usize ,
1051960 progress_handle : & ProgressHandle ,
1052961 cfg : & Configuration ,
1053962 api_key : & str ,
1054963 bearer_opt : Option < & str > ,
1055- related_umids_per_file : & [ Vec < String > ] ,
1056964 disable_description_generation : bool ,
1057965 generate_proxy : Option < & Vec < GenerateProxy > > ,
1058966) -> Result < ( ) , String > {
@@ -1068,56 +976,67 @@ async fn upload_to_presigned_urls(
1068976 let cfg = cfg. clone ( ) ;
1069977 let api_key = api_key. to_string ( ) ;
1070978 let bearer_opt = bearer_opt. map ( String :: from) ;
979+ let base_dir = base_dir. clone ( ) ;
980+ let in_app_path = in_app_path. clone ( ) ;
981+ let upload_request_id = upload_request_id. to_string ( ) ;
982+ let user_id = user_id. to_string ( ) ;
983+ let generate_proxy = generate_proxy. cloned ( ) ;
1071984
1072- for ( i, file_path) in files. iter ( ) . enumerate ( ) {
1073- let file_path = file_path. clone ( ) ;
1074- let upload_id = file_upload_ids[ i] . clone ( ) ;
1075- let in_app_path = file_in_app_paths[ i] . clone ( ) ;
1076- let upload_resp = id_to_resp
1077- . get ( & upload_id)
1078- . ok_or_else ( || format ! ( "missing presigned url for upload_id {}" , upload_id) ) ?
1079- . clone ( ) ;
1080- let file_related_umids = related_umids_per_file
1081- . get ( i)
1082- . cloned ( )
1083- . unwrap_or_default ( ) ;
985+ for ( i, file_info) in files_to_upload. iter ( ) . enumerate ( ) {
986+ let file_info = file_info. clone ( ) ;
1084987 let http_clone = Arc :: clone ( & http) ;
1085988 let semaphore_clone = Arc :: clone ( & semaphore) ;
1086989 let progress_handle_clone = progress_handle. clone ( ) ;
1087- let user_id = user_id. to_string ( ) ;
1088- let upload_request_id = upload_request_id. to_string ( ) ;
1089- let task_id = i;
1090990 let cfg_clone = cfg. clone ( ) ;
1091991 let api_key_clone = api_key. clone ( ) ;
1092992 let bearer_clone = bearer_opt. clone ( ) ;
1093- let generate_proxy_clone = generate_proxy. cloned ( ) ;
1094-
1095- let file_name = file_path
1096- . file_name ( )
1097- . unwrap_or_default ( )
1098- . to_string_lossy ( )
1099- . to_string ( ) ;
1100- let file_size = std:: fs:: metadata ( & file_path)
1101- . map_err ( |e| format ! ( "failed to stat {}: {}" , file_path. display( ) , e) ) ?
1102- . len ( ) ;
1103-
1104- let _ = progress_handle_clone. start_task ( task_id, file_name. clone ( ) , file_size) ;
993+ let base_dir_clone = base_dir. clone ( ) ;
994+ let in_app_path_clone = in_app_path. clone ( ) ;
995+ let upload_request_id_clone = upload_request_id. clone ( ) ;
996+ let user_id_clone = user_id. clone ( ) ;
997+ let generate_proxy_clone = generate_proxy. clone ( ) ;
998+ let task_id = i;
1105999
11061000 let task = tokio:: spawn ( async move {
11071001 let _permit = semaphore_clone
11081002 . acquire ( )
11091003 . await
11101004 . map_err ( |e| format ! ( "failed to acquire semaphore: {}" , e) ) ?;
11111005
1006+ let ( req, _upload_id, in_app_path_str, file_related_umids) =
1007+ build_single_upload_request ( & file_info, & base_dir_clone, & in_app_path_clone) ?;
1008+
1009+ let file_size = req. content_length . max ( 0 ) as u64 ;
1010+
1011+ let responses = request_presigned_urls (
1012+ & cfg_clone,
1013+ & vec ! [ req] ,
1014+ & api_key_clone,
1015+ bearer_clone. as_deref ( ) ,
1016+ )
1017+ . await ?;
1018+ let upload_resp = responses
1019+ . into_iter ( )
1020+ . next ( )
1021+ . ok_or_else ( || "missing presigned response" . to_string ( ) ) ?;
1022+
1023+ let file_name = file_info
1024+ . upload_path
1025+ . file_name ( )
1026+ . unwrap_or_default ( )
1027+ . to_string_lossy ( )
1028+ . to_string ( ) ;
1029+ let _ = progress_handle_clone. start_task ( task_id, file_name. clone ( ) , file_size) ;
1030+
11121031 let result = upload_single_file (
1113- & file_path ,
1114- & upload_id,
1032+ & file_info . upload_path ,
1033+ & upload_resp . upload_id ,
11151034 & upload_resp,
1116- & in_app_path ,
1117- & upload_request_id ,
1118- & user_id ,
1035+ & in_app_path_str ,
1036+ & upload_request_id_clone ,
1037+ & user_id_clone ,
11191038 task_id,
1120- & http_clone,
1039+ http_clone. as_ref ( ) ,
11211040 & progress_handle_clone,
11221041 file_size,
11231042 & cfg_clone,
@@ -1130,14 +1049,13 @@ async fn upload_to_presigned_urls(
11301049 let _ = progress_handle_clone. finish_task ( task_id, success) ;
11311050
11321051 if success {
1133- // Call preprocess as soon as this upload finishes
11341052 let mut preproc_req = ProcessAssetsRequest :: new (
11351053 vec ! [ upload_resp] ,
11361054 None :: < tellers_api_client:: models:: VersionReference > ,
11371055 ) ;
11381056 preproc_req. generate_time_based_media_description =
11391057 Some ( !disable_description_generation) ;
1140- if let Some ( proxy) = generate_proxy_clone. as_ref ( ) {
1058+ if let Some ( ref proxy) = generate_proxy_clone {
11411059 preproc_req. generate_proxy = Some ( proxy. clone ( ) ) ;
11421060 }
11431061 if !file_related_umids. is_empty ( ) {
0 commit comments