Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions backend/app/database/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ def db_create_images_table() -> None:
"""
)

# Ensure Memories feature columns exist on older databases
cursor.execute("PRAGMA table_info(images)")
existing_columns = {row[1] for row in cursor.fetchall()}

for column_name, column_type in [
("latitude", "REAL"),
("longitude", "REAL"),
("captured_at", "DATETIME"),
]:
if column_name not in existing_columns:
cursor.execute(f"ALTER TABLE images ADD COLUMN {column_name} {column_type}")

# Create indexes for Memories feature queries
cursor.execute("CREATE INDEX IF NOT EXISTS ix_images_latitude ON images(latitude)")
cursor.execute(
Expand Down Expand Up @@ -249,6 +261,30 @@ def db_get_all_images(tagged: Union[bool, None] = None) -> List[dict]:
conn.close()


def db_get_image_path_by_id(image_id: ImageId) -> Optional[ImagePath]:
"""
Get the filesystem path for a single image by its ID.

Args:
image_id: ID of the image to look up

Returns:
The image path as a string, or None if the image is not found or an error occurs.
"""
conn = _connect()
cursor = conn.cursor()

try:
cursor.execute("SELECT path FROM images WHERE id = ?", (image_id,))
row = cursor.fetchone()
return row[0] if row else None
except Exception as e:
logger.error(f"Error getting image path for id {image_id}: {e}")
return None
finally:
conn.close()


def db_get_untagged_images() -> List[UntaggedImageRecord]:
"""
Find all images that need AI tagging.
Expand Down
173 changes: 171 additions & 2 deletions backend/app/routes/images.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
from fastapi import APIRouter, HTTPException, Query, status
from typing import List, Optional
from app.database.images import db_get_all_images
from app.schemas.images import ErrorResponse
from app.database.images import db_get_all_images, db_get_image_path_by_id
from app.schemas.images import ErrorResponse, RenameImageRequest, RenameImageResponse
from app.utils.images import image_util_parse_metadata
from pydantic import BaseModel
from app.database.images import db_toggle_image_favourite_status
Expand Down Expand Up @@ -128,3 +129,171 @@ class ImageInfoResponse(BaseModel):
isTagged: bool
isFavourite: bool
tags: Optional[List[str]] = None


@router.put(
"/rename-image",
response_model=RenameImageResponse,
responses={code: {"model": ErrorResponse} for code in [400, 404, 500]},
)
def rename_image(request: RenameImageRequest):
"""
Rename an image file on disk based on its image ID.

The database record is updated separately by the sync microservice.
"""
try:
image_id = request.image_id.strip()
raw_name = request.rename
new_name = raw_name.strip()

if not image_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
success=False,
error="Validation Error",
message="Image ID cannot be empty",
).model_dump(),
)

if not new_name:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
success=False,
error="Validation Error",
message="New image name cannot be empty",
).model_dump(),
)

# Disallow filesystem separators and common invalid characters (especially on Windows),
# plus a few extra characters that are prone to shell/filesystem issues.
invalid_chars = set('<>:"/\\\\|?*!^')
if any(ch in invalid_chars for ch in new_name):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
success=False,
error="Validation Error",
message="New image name contains invalid characters.",
).model_dump(),
)

# Additional Windows-safe validations
# Reject names that are '.' or '..' or that consist only of dots
if all(ch == "." for ch in new_name):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
success=False,
error="Validation Error",
message="New image name cannot be '.' , '..' or consist only of dots.",
).model_dump(),
)

# Reject names that end with a dot or a space (Windows trims these)
if raw_name and (raw_name.endswith(".") or raw_name.endswith(" ")):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
success=False,
error="Validation Error",
message="New image name cannot end with a dot or space.",
).model_dump(),
)

# Reject names whose base stem matches Windows reserved device names
reserved_device_names = {
"CON",
"PRN",
"NUL",
"AUX",
*[f"COM{i}" for i in range(1, 10)],
*[f"LPT{i}" for i in range(1, 10)],
}
base_stem = new_name.split(".", 1)[0]
if base_stem and base_stem.upper() in reserved_device_names:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
success=False,
error="Validation Error",
message=(
"New image name cannot use a reserved Windows device name "
"(CON, PRN, NUL, AUX, COM1–COM9, LPT1–LPT9)."
),
).model_dump(),
)

image_path = db_get_image_path_by_id(image_id)
if not image_path:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ErrorResponse(
success=False,
error="Image Not Found",
message=f"Image with ID '{image_id}' does not exist.",
).model_dump(),
)

folder_path = os.path.dirname(image_path)
extension = os.path.splitext(image_path)[1]
new_file_path = os.path.join(folder_path, new_name + extension)

# Atomically reserve the target path to avoid TOCTOU between existence
# checks and rename operations.
placeholder_created = False
try:
try:
fd = os.open(
new_file_path,
os.O_CREAT | os.O_EXCL | os.O_WRONLY,
)
os.close(fd)
placeholder_created = True
except FileExistsError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ErrorResponse(
success=False,
error="File Exists",
message="A file with the new name already exists.",
).model_dump(),
)

# Perform the actual rename now that the name is reserved.
os.rename(image_path, new_file_path)
except HTTPException:
# Bubble up HTTP errors (e.g., file already exists).
raise
except Exception as e:
# Clean up the placeholder if the rename failed unexpectedly.
if placeholder_created:
try:
os.unlink(new_file_path)
except OSError as cleanup_err:
logger.error(
f"Failed to clean up placeholder file '{new_file_path}': {cleanup_err}"
)
# Re-raise for the outer exception handler to translate.
raise e

return RenameImageResponse(
success=True,
message=f"Successfully renamed image to '{new_name}{extension}'",
)

except HTTPException as e:
# Re-raise HTTPExceptions to preserve their status codes and details.
raise e
except Exception as e:
logger.error(f"Error renaming image: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=ErrorResponse(
success=False,
error="Internal server error",
message=f"Unable to rename image: {str(e)}",
).model_dump(),
)
14 changes: 14 additions & 0 deletions backend/app/schemas/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,17 @@ class DeleteThumbnailsResponse(BaseModel):
class GetThumbnailPathResponse(BaseModel):
success: bool
thumbnailPath: str


class RenameImageRequest(BaseModel):
"""Request model for renaming an image."""

image_id: str
rename: str


class RenameImageResponse(BaseModel):
"""Response model for image rename operations."""

success: bool
message: str
Loading