Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
run: uv run ruff check .

- name: Run tests
run: uv run pytest # --cov-fail-under=70 Temporarily disabled
run: uv run pytest --cov-fail-under=95

- name: Run mypy
run: uv run mypy .
File renamed without changes.
2 changes: 1 addition & 1 deletion app/core/admin/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def docs_link_formatter(model: Any, name: Any) -> Any:
return Markup(', '.join(links))


class VerificationRequestAdmin(ModelView, model=VerificationRequest):
class VerificationRequestAdmin(AdminAccessMixin, model=VerificationRequest):
column_list = [
VerificationRequest.id,
VerificationRequest.user_id,
Expand Down
6 changes: 6 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,16 @@ class Settings(BaseSettings):
max_file_size_bytes: int = Field(alias='MAX_FILE_SIZE_BYTES')
secret_key: str = Field(alias='SECRET_KEY')
debug_mode: bool = Field(default=False, alias='DEBUG_MODE')
unverified_seller_limit: int = 3
verified_seller_limit: int = 100

@computed_field
def database_url(self) -> str:
return f'postgresql+asyncpg://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}'

@computed_field
def database_url_masked(self) -> str:
return f'postgresql+asyncpg://{self.db_user}:****@{self.db_host}:{self.db_port}/{self.db_name}'


settings = Settings()
10 changes: 10 additions & 0 deletions app/core/exception_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
InsufficientInventoryError,
NotFoundError,
PermissionDeniedError,
SellerLimitExceededError,
UserAlreadyExists,
VerificationRequestAlreadyExists,
)
Expand Down Expand Up @@ -76,3 +77,12 @@ async def verification_request_already_exists_handler(
status_code=status.HTTP_400_BAD_REQUEST,
content={'detail': str(exc) or 'Verification request already exists'},
)


async def seller_limit_exceeded_handler(
request: Request, exc: SellerLimitExceededError
) -> JSONResponse:
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={'detail': str(exc) or 'Seller limit exceeded'},
)
7 changes: 7 additions & 0 deletions app/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ class UserAlreadyExists(AppError):
"""User with such email already exists."""


class SellerLimitExceededError(AppError):
"""Seller product listing limit exceeded."""

def __init__(self, message: str = 'Seller product listing limit exceeded'):
super().__init__(message=message)


class CredentialsError(AppError):
"""Invalid credentials."""

Expand Down
8 changes: 4 additions & 4 deletions app/core/s3.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import logging
from collections.abc import AsyncGenerator
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

import aioboto3 # type: ignore
import structlog
from botocore.exceptions import ClientError

from app.core.config import settings

logger = logging.getLogger(__name__)
logger = structlog.get_logger(__name__)

session = aioboto3.Session()


@asynccontextmanager
async def get_s3_client() -> AsyncGenerator[Any, None]:
async def get_s3_client() -> AsyncIterator[Any]:
async with session.client(
's3',
endpoint_url=settings.minio_url,
Expand Down
9 changes: 6 additions & 3 deletions app/core/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ async def check_permission(
def check_ownership(user: User, obj: Any) -> None:
if user.role in (UserRole.ADMIN, UserRole.MODERATOR):
return
if not hasattr(obj, 'owner_id'):
raise ValueError(f'Object {type(obj)} does not have owner_id')
if obj.owner_id != user.id:
owner_attr = 'owner_id'
if not hasattr(obj, 'owner_id') and hasattr(obj, 'user_id'):
owner_attr = 'user_id'
if not hasattr(obj, owner_attr):
raise ValueError(f'Object {type(obj)} does not have owner_id or user_id')
if getattr(obj, owner_attr) != user.id:
raise PermissionDeniedError


Expand Down
6 changes: 6 additions & 0 deletions app/core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
insufficient_inventory_error_handler,
not_found_error_handler,
permission_denied_handler,
seller_limit_exceeded_handler,
user_already_exists_handler,
verification_request_already_exists_handler,
)
Expand All @@ -15,6 +16,7 @@
InsufficientInventoryError,
NotFoundError,
PermissionDeniedError,
SellerLimitExceededError,
UserAlreadyExists,
VerificationRequestAlreadyExists,
)
Expand All @@ -37,3 +39,7 @@ def setup_exception_handlers(app: FastAPI) -> None:
VerificationRequestAlreadyExists,
verification_request_already_exists_handler, # type: ignore[arg-type]
)
app.add_exception_handler(
SellerLimitExceededError,
seller_limit_exceeded_handler, # type: ignore[arg-type]
)
2 changes: 1 addition & 1 deletion app/services/inventory/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
SELLER_DEPENDENCY = Depends(
RoleChecker(
allowed_roles=[UserRole.SELLER, UserRole.SELLER_B2B],
required_verified=True,
required_verified=False,
)
)
ADMIN_DEPENDENCY = Depends(
Expand Down
6 changes: 4 additions & 2 deletions app/services/inventory/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ class ProductCreate(BaseModel):
class ProductUpdate(BaseModel):
name: str | None = None
description: str | None = None
price: Decimal | None = Field(gt=0, description='Price must be greater than 0')
price: Decimal | None = Field(
default=None, gt=0, description='Price must be greater than 0'
)
qty_available: int | None = Field(
ge=0, description='Quantity must be greater than or equal to 0'
default=None, ge=0, description='Quantity must be greater than or equal to 0'
)


Expand Down
20 changes: 19 additions & 1 deletion app/services/inventory/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
ConflictError,
InsufficientInventoryError,
NotFoundError,
SellerLimitExceededError,
)
from app.core.security import check_ownership
from app.services.inventory.models import Product, ProductStatus, Reservation
Expand All @@ -22,10 +23,26 @@
ReservationCreate,
)
from app.services.orders.models import OrderStatus
from app.services.user.models import User
from app.services.user.models import User, UserRole


class InventoryService:
@staticmethod
async def _check_seller_limit(session: AsyncSession, user: User) -> None:
"""Check if the user has exceeded their product listing limit."""
if user.role in (UserRole.ADMIN, UserRole.MODERATOR):
return
query = select(Product).where(Product.owner_id == user.id)
result = await session.execute(query)
product_count = len(result.scalars().all())
limit = settings.unverified_seller_limit
if user.role in (UserRole.SELLER, UserRole.SELLER_B2B) and user.is_verified:
limit = settings.verified_seller_limit
if product_count >= limit:
raise SellerLimitExceededError(
f'Limit of {limit} products reached for your account type'
)

@staticmethod
async def _get_product(
session: AsyncSession,
Expand Down Expand Up @@ -99,6 +116,7 @@ async def create_product(
product_data: ProductCreate,
current_user: User,
) -> Product:
await InventoryService._check_seller_limit(session, current_user)
new_product = Product(**product_data.model_dump())
new_product.owner_id = owner_id
session.add(new_product)
Expand Down
50 changes: 31 additions & 19 deletions app/services/inventory/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,36 @@ async def release_expired_reservations(ctx: dict) -> None:
expired_ids = expired_reservations.scalars().all()
if not expired_ids:
return
logger.info(f'Found {len(expired_ids)} expired reservations. Processing...')
logger.info('processing expired reservations', count=len(expired_ids))
for res_id in expired_ids:
res_result = await session.execute(
select(Reservation).with_for_update().where(Reservation.id == res_id)
)
reservation = res_result.scalar_one_or_none()
if reservation is None or reservation.status != OrderStatus.PENDING:
try:
async with session.begin_nested():
res_result = await session.execute(
select(Reservation)
.with_for_update()
.where(Reservation.id == res_id)
)
reservation = res_result.scalar_one_or_none()
if reservation is None or reservation.status != OrderStatus.PENDING:
continue
prod_result = await session.execute(
select(Product)
.with_for_update()
.where(Product.id == reservation.product_id)
)
product = prod_result.scalar_one_or_none()
if product:
product.qty_available += reservation.qty_reserved
reservation.status = OrderStatus.EXPIRED
if reservation.order_id is not None:
await cancel_order_by_system(session, reservation.order_id)
await session.commit()
logger.info('released reservation', reservation_id=res_id)
except Exception as e:
await session.rollback()
logger.error(
'failed to release reservation',
reservation_id=res_id,
error=str(e),
)
continue
prod_result = await session.execute(
select(Product)
.with_for_update()
.where(Product.id == reservation.product_id)
)
product = prod_result.scalar_one_or_none()
if product:
product.qty_available += reservation.qty_reserved
reservation.status = OrderStatus.EXPIRED
if reservation.order_id is not None:
await cancel_order_by_system(session, reservation.order_id)
await session.commit()
logger.info(f'Released reservation {res_id}')
1 change: 1 addition & 0 deletions app/services/media/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ImageStatus(StrEnum):
PENDING = 'pending'
ACTIVE = 'active'
INACTIVE = 'inactive'
FAILED = 'failed'


class ProductImage(Base):
Expand Down
45 changes: 30 additions & 15 deletions app/services/media/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ async def get_secure_file_path(
)
v_req = result_v.scalar_one_or_none()
if v_req and v_req.docs_url and doc_key:
return str(v_req.docs_url.get(doc_key))
doc_val = v_req.docs_url.get(doc_key)
return str(doc_val) if doc_val else None
elif target_type == 'product_image':
result_i = await session.execute(
select(ProductImage).where(ProductImage.id == target_id)
Expand Down Expand Up @@ -110,10 +111,12 @@ async def handle_minio_webhook(
event: MinioWebhookEvent,
arq_redis: Any = None,
) -> None:
"""Processes MinIO S3:ObjectCreated events with idempotency and robust errors."""
if not event.records:
return
for record in event.records:
if not record.event_name.startswith('s3:ObjectCreated:'):
logger.debug('skipping non-create event', event_name=record.event_name)
continue
object_key = unquote(record.s3.object.key)
result = await session.execute(
Expand All @@ -122,17 +125,29 @@ async def handle_minio_webhook(
.where(ProductImage.file_path == object_key)
)
image = result.scalar_one_or_none()
if image is not None and image.status == ImageStatus.PENDING:
# Point 3: Enqueue background sanitization task
if arq_redis:
await arq_redis.enqueue_job(
'sanitize_and_activate_image_task',
image_id=image.id,
bucket=settings.minio_bucket_name,
object_key=object_key,
)
logger.info('enqueued image sanitization task', key=object_key)
else:
logger.warning('arq_redis pool not provided to webhook handler')
else:
logger.warning('image not found or not in pending status')
if image is None:
logger.warning('image record not found in DB for S3 event', key=object_key)
continue
if image.status != ImageStatus.PENDING:
logger.info(
'skipping webhook: image not in pending status',
key=object_key,
current_status=image.status,
)
continue
if not arq_redis:
logger.error(
'CRITICAL: arq_redis pool missing, cannot enqueue sanitization',
key=object_key,
)
continue
await arq_redis.enqueue_job(
'sanitize_and_activate_image_task',
image_id=image.id,
bucket=settings.minio_bucket_name,
object_key=object_key,
)
logger.info(
'enqueued image sanitization task', image_id=image.id, key=object_key
)
await session.commit()
Loading