@@ -98,6 +98,66 @@ async def _get_upload_record(db: AsyncSession, upload_id: str) -> models.UploadR
9898 return record
9999
100100
101+ async def _finalize_upload (
102+ db : AsyncSession ,
103+ record : models .UploadRecord ,
104+ queue : ProcessingQueue | None ,
105+ ) -> models .UploadRecord :
106+ """Validate and finalize an uploaded file after all bytes have been received."""
107+ if record .upload_length is None :
108+ raise HTTPException (status_code = status .HTTP_409_CONFLICT , detail = "Upload length unknown" )
109+
110+ if record .upload_offset < record .upload_length :
111+ raise HTTPException (status_code = status .HTTP_409_CONFLICT , detail = "Upload not finished" )
112+
113+ if record .status in {"completed" , "postprocessing" }:
114+ return record
115+
116+ path = Path (record .storage_path )
117+ if not path .exists ():
118+ raise HTTPException (status_code = status .HTTP_404_NOT_FOUND , detail = "Uploaded file not found" )
119+
120+ try :
121+ actual_mimetype : str = detect_mimetype (path )
122+ except Exception as e :
123+ raise HTTPException (
124+ status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
125+ detail = f"Failed to detect file type: { e } " ,
126+ ) from e
127+
128+ stmt : Select [tuple [models .UploadToken ]] = select (models .UploadToken ).where (models .UploadToken .id == record .token_id )
129+ res : Result [tuple [models .UploadToken ]] = await db .execute (stmt )
130+
131+ if not (token := res .scalar_one_or_none ()):
132+ raise HTTPException (status_code = status .HTTP_404_NOT_FOUND , detail = "Token not found" )
133+
134+ if not mime_allowed (actual_mimetype , token .allowed_mime ):
135+ path .unlink (missing_ok = True )
136+ await db .delete (record )
137+ await db .commit ()
138+ raise HTTPException (
139+ status_code = status .HTTP_415_UNSUPPORTED_MEDIA_TYPE ,
140+ detail = f"Actual file type '{ actual_mimetype } ' does not match allowed types" ,
141+ )
142+
143+ record .mimetype = actual_mimetype
144+
145+ if is_multimedia (actual_mimetype ):
146+ record .status = "postprocessing"
147+ record .completed_at = None
148+ await db .commit ()
149+ await db .refresh (record )
150+ if queue :
151+ await queue .enqueue (record .public_id )
152+ return record
153+
154+ record .status = "completed"
155+ record .completed_at = datetime .now (UTC )
156+ await db .commit ()
157+ await db .refresh (record )
158+ return record
159+
160+
101161@router .post ("/initiate" , response_model = schemas .InitiateUploadResponse , status_code = status .HTTP_201_CREATED , name = "initiate_upload" )
102162async def initiate_upload (
103163 request : Request ,
@@ -207,7 +267,6 @@ async def tus_patch(
207267 upload_id : str ,
208268 request : Request ,
209269 db : Annotated [AsyncSession , Depends (get_db )],
210- queue : Annotated [ProcessingQueue | None , Depends (get_processing_queue )],
211270 upload_offset : Annotated [int , Header (convert_underscores = False , alias = "Upload-Offset" )] = ...,
212271 content_length : Annotated [int | None , Header ()] = None ,
213272 content_type : Annotated [str , Header (convert_underscores = False , alias = "Content-Type" )] = ...,
@@ -219,7 +278,6 @@ async def tus_patch(
219278 upload_id (str): The public ID of the upload.
220279 request (Request): The incoming HTTP request.
221280 db (AsyncSession): Database session.
222- queue (ProcessingQueue | None): The processing queue for post-processing.
223281 upload_offset (int): The current upload offset from the client.
224282 content_length (int | None): The Content-Length header value.
225283 content_type (str): The Content-Type header value.
@@ -268,52 +326,14 @@ async def tus_patch(
268326 if record .upload_offset > record .upload_length :
269327 raise HTTPException (status_code = status .HTTP_413_CONTENT_TOO_LARGE , detail = "Upload exceeds declared length" )
270328
271- if record .upload_offset == record .upload_length :
272- try :
273- actual_mimetype : str = detect_mimetype (path )
274- except Exception as e :
275- raise HTTPException (
276- status_code = status .HTTP_500_INTERNAL_SERVER_ERROR ,
277- detail = f"Failed to detect file type: { e } " ,
278- )
279-
280- stmt : Select [tuple [models .UploadToken ]] = select (models .UploadToken ).where (models .UploadToken .id == record .token_id )
281- res : Result [tuple [models .UploadToken ]] = await db .execute (stmt )
282-
283- if not (token := res .scalar_one_or_none ()):
284- raise HTTPException (status_code = status .HTTP_404_NOT_FOUND , detail = "Token not found" )
285-
286- if not mime_allowed (actual_mimetype , token .allowed_mime ):
287- path .unlink (missing_ok = True )
288- await db .delete (record )
289- await db .commit ()
290- raise HTTPException (
291- status_code = status .HTTP_415_UNSUPPORTED_MEDIA_TYPE ,
292- detail = f"Actual file type '{ actual_mimetype } ' does not match allowed types" ,
293- )
294-
295- record .mimetype = actual_mimetype
296-
297- if is_multimedia (actual_mimetype ):
298- record .status = "postprocessing"
299- await db .commit ()
300- await db .refresh (record )
301- if queue :
302- await queue .enqueue (record .public_id )
303- else :
304- record .status = "completed"
305- record .completed_at = datetime .now (UTC )
306- await db .commit ()
307- await db .refresh (record )
308- else :
309- record .status = "in_progress"
310-
311- try :
312- await db .commit ()
313- await db .refresh (record )
314- except Exception :
315- await db .rollback ()
316- await db .refresh (record )
329+ record .status = "in_progress"
330+
331+ try :
332+ await db .commit ()
333+ await db .refresh (record )
334+ except Exception :
335+ await db .rollback ()
336+ await db .refresh (record )
317337
318338 return Response (
319339 status_code = status .HTTP_204_NO_CONTENT ,
@@ -366,26 +386,32 @@ async def tus_delete(upload_id: str, db: Annotated[AsyncSession, Depends(get_db)
366386
367387
368388@router .post ("/{upload_id}/complete" , response_model = schemas .UploadRecordResponse , name = "mark_complete" )
369- async def mark_complete (upload_id : str , db : Annotated [AsyncSession , Depends (get_db )]) -> models .UploadRecord :
389+ async def mark_complete (
390+ upload_id : str ,
391+ db : Annotated [AsyncSession , Depends (get_db )],
392+ queue : Annotated [ProcessingQueue | None , Depends (get_processing_queue )],
393+ token : Annotated [str , Query (description = "Upload token" )] = ...,
394+ ) -> models .UploadRecord :
370395 """
371396 Mark an upload as complete.
372397
373398 Args:
374399 upload_id (str): The public ID of the upload.
375400 db (AsyncSession): Database session.
401+ queue (ProcessingQueue | None): The processing queue for post-processing.
402+ token (str): The upload token string.
376403
377404 Returns:
378405 UploadRecord: The updated upload record.
379406
380407 """
381408 record : models .UploadRecord = await _get_upload_record (db , upload_id )
382- await _ensure_token (db , token_id = record . token_id , check_remaining = False )
409+ token_row : models . UploadToken = await _ensure_token (db , token_value = token , check_remaining = False )
383410
384- record .status = "completed"
385- record .completed_at = datetime .now (UTC )
386- await db .commit ()
387- await db .refresh (record )
388- return record
411+ if record .token_id != token_row .id :
412+ raise HTTPException (status_code = status .HTTP_403_FORBIDDEN , detail = "Upload does not belong to this token" )
413+
414+ return await _finalize_upload (db , record , queue )
389415
390416
391417@router .delete ("/{upload_id}/cancel" , response_model = dict , name = "cancel_upload" )
0 commit comments