Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 62 additions & 144 deletions src/commands/upload/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use clap::{Args, Subcommand};
use regex::Regex;
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
Expand Down Expand Up @@ -103,6 +102,7 @@ pub struct UploadCmdArgs {
pub disable_description_generation: bool,
}

#[derive(Clone)]
struct FileToUpload {
upload_path: PathBuf,
original_path: PathBuf,
Expand Down Expand Up @@ -415,130 +415,39 @@ fn run_upload(args: UploadCmdArgs) -> Result<(), String> {
});
}

let mut requests: Vec<AssetUploadRequest> = Vec::with_capacity(files_to_upload.len());
let mut related_umids_per_file: Vec<Vec<String>> = Vec::with_capacity(files_to_upload.len());
let mut file_upload_ids: Vec<String> = Vec::with_capacity(files_to_upload.len());
let mut file_in_app_paths: Vec<String> = Vec::with_capacity(files_to_upload.len());
let mut upload_paths: Vec<PathBuf> = Vec::with_capacity(files_to_upload.len());
for file_info in &files_to_upload {
let content_length = std::fs::metadata(&file_info.upload_path)
.map_err(|e| format!("failed to stat {}: {}", file_info.upload_path.display(), e))?
.len();

let upload_id = Uuid::new_v4().to_string();
file_upload_ids.push(upload_id.clone());

output::info(format!(
"Build upload request: id={} file={} size={} bytes",
upload_id,
file_info.upload_path.display(),
content_length
));

let file_in_app_path = super::utils::compute_in_app_path(
&file_info.original_path,
&base_dir,
&args.in_app_path,
);
file_in_app_paths.push(file_in_app_path.clone());
upload_paths.push(file_info.upload_path.clone());

let now_secs = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i32;

let file_name_str = file_info
.original_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string();
let umid = extract_media_metadata(&file_info.original_path)
.ok();
let mut source_info = SourceFileInfo::new(
"__user_upload__".to_string(),
None,
None,
vec!["__current_user__".to_string()],
Some(now_secs),
now_secs,
vec![file_in_app_path.clone()],
Some(file_name_str),
None,
vec![],
);

if let Some(metadata) = umid.as_ref() {
if let Some(umid_value) = metadata.material_package_umid.as_ref() {
source_info.capture_device_umid = Some(Some(umid_value.clone()));
}
if let Some(first_with_data) = metadata.file_package_umids.iter().find(|u| u.has_data) {
source_info.umid = Some(Some(first_with_data.umid.clone()));
}
}
if is_mxf_file(&file_info.original_path) {
if let Ok(Some(probe)) = get_ffprobe_json(&file_info.original_path) {
if let serde_json::Value::Object(map) = probe {
source_info.original_ffprobe_metadata =
Some(Some(map.into_iter().collect()));
}
}
}
related_umids_per_file.push(
umid.map(|m| m.file_package_umids.iter().map(|u| u.umid.clone()).collect())
.unwrap_or_default(),
);

let req = AssetUploadRequest::new(
i32::try_from(content_length).unwrap_or(i32::MAX),
upload_id,
source_info,
);
requests.push(req);
}

// local_encoding false path: default generate_proxy to 720 when omitted
let effective_generate_proxy =
args.generate_proxy.clone().or_else(|| Some(vec![GenerateProxy::Variant720]));

let base_dir = base_dir.clone();
let in_app_path = args.in_app_path.clone();

tokio::runtime::Runtime::new()
.map_err(|e| format!("failed to start runtime: {}", e))?
.block_on(async move {
let bearer_header = api_config::get_bearer_header(args.auth_bearer.clone());
let mut progress =
crate::tui::InlineProgress::new("Uploading Files", upload_paths.len())?;
crate::tui::InlineProgress::new("Uploading Files", files_to_upload.len())?;
let progress_handle = progress.clone_handle();
let render_handle = progress.start_render_loop(progress_handle.clone());

let _ = progress_handle.add_info(format!(
"Requesting presigned URLs for {} files...",
requests.len()
"One presigned request per file ({} files), {} parallel upload(s)",
files_to_upload.len(),
args.parallel_uploads
));

let responses =
request_presigned_urls(&cfg, &requests, &api_key, bearer_header.as_deref()).await?;
let _ =
progress_handle.add_info(format!("Received {} presigned URL(s)", responses.len()));

let mut id_to_resp: HashMap<String, AssetUploadResponse> = HashMap::new();
for r in responses.iter().cloned() {
id_to_resp.insert(r.upload_id.clone(), r);
}

let upload_result = upload_to_presigned_urls(
&upload_paths,
&file_upload_ids,
&file_in_app_paths,
&id_to_resp,
let upload_result = upload_with_per_file_presigned(
&files_to_upload,
&base_dir,
&in_app_path,
&upload_request_id,
&user_id,
args.parallel_uploads,
&progress_handle,
&cfg,
&api_key,
bearer_header.as_deref(),
&related_umids_per_file,
args.disable_description_generation,
effective_generate_proxy.as_ref(),
)
Expand Down Expand Up @@ -1040,19 +949,18 @@ async fn request_presigned_urls(
Err(m)
}

async fn upload_to_presigned_urls(
files: &Vec<PathBuf>,
file_upload_ids: &Vec<String>,
file_in_app_paths: &Vec<String>,
id_to_resp: &HashMap<String, AssetUploadResponse>,
/// Upload files with one presigned URL request per file (queue-style, like encode local).
async fn upload_with_per_file_presigned(
files_to_upload: &[FileToUpload],
base_dir: &PathBuf,
in_app_path: &Option<String>,
upload_request_id: &str,
user_id: &str,
max_concurrent: usize,
progress_handle: &ProgressHandle,
cfg: &Configuration,
api_key: &str,
bearer_opt: Option<&str>,
related_umids_per_file: &[Vec<String>],
disable_description_generation: bool,
generate_proxy: Option<&Vec<GenerateProxy>>,
) -> Result<(), String> {
Expand All @@ -1068,56 +976,67 @@ async fn upload_to_presigned_urls(
let cfg = cfg.clone();
let api_key = api_key.to_string();
let bearer_opt = bearer_opt.map(String::from);
let base_dir = base_dir.clone();
let in_app_path = in_app_path.clone();
let upload_request_id = upload_request_id.to_string();
let user_id = user_id.to_string();
let generate_proxy = generate_proxy.cloned();

for (i, file_path) in files.iter().enumerate() {
let file_path = file_path.clone();
let upload_id = file_upload_ids[i].clone();
let in_app_path = file_in_app_paths[i].clone();
let upload_resp = id_to_resp
.get(&upload_id)
.ok_or_else(|| format!("missing presigned url for upload_id {}", upload_id))?
.clone();
let file_related_umids = related_umids_per_file
.get(i)
.cloned()
.unwrap_or_default();
for (i, file_info) in files_to_upload.iter().enumerate() {
let file_info = file_info.clone();
let http_clone = Arc::clone(&http);
let semaphore_clone = Arc::clone(&semaphore);
let progress_handle_clone = progress_handle.clone();
let user_id = user_id.to_string();
let upload_request_id = upload_request_id.to_string();
let task_id = i;
let cfg_clone = cfg.clone();
let api_key_clone = api_key.clone();
let bearer_clone = bearer_opt.clone();
let generate_proxy_clone = generate_proxy.cloned();

let file_name = file_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string();
let file_size = std::fs::metadata(&file_path)
.map_err(|e| format!("failed to stat {}: {}", file_path.display(), e))?
.len();

let _ = progress_handle_clone.start_task(task_id, file_name.clone(), file_size);
let base_dir_clone = base_dir.clone();
let in_app_path_clone = in_app_path.clone();
let upload_request_id_clone = upload_request_id.clone();
let user_id_clone = user_id.clone();
let generate_proxy_clone = generate_proxy.clone();
let task_id = i;

let task = tokio::spawn(async move {
let _permit = semaphore_clone
.acquire()
.await
.map_err(|e| format!("failed to acquire semaphore: {}", e))?;

let (req, _upload_id, in_app_path_str, file_related_umids) =
build_single_upload_request(&file_info, &base_dir_clone, &in_app_path_clone)?;

let file_size = req.content_length.max(0) as u64;

let responses = request_presigned_urls(
&cfg_clone,
&vec![req],
&api_key_clone,
bearer_clone.as_deref(),
)
.await?;
let upload_resp = responses
.into_iter()
.next()
.ok_or_else(|| "missing presigned response".to_string())?;

let file_name = file_info
.upload_path
.file_name()
.unwrap_or_default()
.to_string_lossy()
.to_string();
let _ = progress_handle_clone.start_task(task_id, file_name.clone(), file_size);

let result = upload_single_file(
&file_path,
&upload_id,
&file_info.upload_path,
&upload_resp.upload_id,
&upload_resp,
&in_app_path,
&upload_request_id,
&user_id,
&in_app_path_str,
&upload_request_id_clone,
&user_id_clone,
task_id,
&http_clone,
http_clone.as_ref(),
&progress_handle_clone,
file_size,
&cfg_clone,
Expand All @@ -1130,14 +1049,13 @@ async fn upload_to_presigned_urls(
let _ = progress_handle_clone.finish_task(task_id, success);

if success {
// Call preprocess as soon as this upload finishes
let mut preproc_req = ProcessAssetsRequest::new(
vec![upload_resp],
None::<tellers_api_client::models::VersionReference>,
);
preproc_req.generate_time_based_media_description =
Some(!disable_description_generation);
if let Some(proxy) = generate_proxy_clone.as_ref() {
if let Some(ref proxy) = generate_proxy_clone {
preproc_req.generate_proxy = Some(proxy.clone());
}
if !file_related_umids.is_empty() {
Expand Down
Loading