Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

from sqlalchemy import and_, not_, or_, select
from sqlalchemy import and_, func, not_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import contains_eager, joinedload, load_only, noload, selectinload

Expand Down Expand Up @@ -365,6 +365,10 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
project=project,
run=run,
)
# FIXME: Fleet is not locked which may lead to duplicate instance_num.
# This is currently hard to fix without locking the fleet for entire provisioning duration.
# Processing should be done in multiple steps so that
# InstanceModel is created before provisioning.
instance_num = await _get_next_instance_num(
session=session,
fleet_model=fleet_model,
Expand Down Expand Up @@ -773,25 +777,11 @@ def _create_fleet_model_for_job(


async def _get_next_instance_num(session: AsyncSession, fleet_model: FleetModel) -> int:
if len(fleet_model.instances) == 0:
# No instances means the fleet is not in the db yet, so don't lock.
return 0
async with get_locker(get_db().dialect_name).lock_ctx(
FleetModel.__tablename__, [fleet_model.id]
):
fleet_model = (
(
await session.execute(
select(FleetModel)
.where(FleetModel.id == fleet_model.id)
.options(joinedload(FleetModel.instances))
.execution_options(populate_existing=True)
)
)
.unique()
.scalar_one()
)
return len(fleet_model.instances)
res = await session.execute(
select(func.count(InstanceModel.id)).where(InstanceModel.fleet_id == fleet_model.id)
)
instance_count = res.scalar_one()
return instance_count


def _create_instance_model_for_job(
Expand Down
Loading