44
55from sqlalchemy import and_ , func , or_ , select
66from sqlalchemy .ext .asyncio import AsyncSession
7- from sqlalchemy .orm import aliased , contains_eager , joinedload , load_only
7+ from sqlalchemy .orm import aliased , contains_eager , joinedload , load_only , with_loader_criteria
88
99import dstack ._internal .server .services .services .autoscalers as autoscalers
1010from dstack ._internal .core .errors import ServerError
@@ -111,7 +111,15 @@ async def _process_next_run():
111111 ),
112112 ),
113113 )
114- .options (joinedload (RunModel .jobs ).load_only (JobModel .id ))
114+ .options (
115+ joinedload (RunModel .jobs ).load_only (JobModel .id ),
116+ # No need to lock finished jobs
117+ with_loader_criteria (
118+ JobModel ,
119+ JobModel .status .not_in (JobStatus .finished_statuses ()),
120+ include_aliases = True ,
121+ ),
122+ )
115123 .options (load_only (RunModel .id ))
116124 .order_by (RunModel .last_processed_at .asc ())
117125 .limit (1 )
@@ -126,7 +134,14 @@ async def _process_next_run():
126134 JobModel .run_id == run_model .id ,
127135 JobModel .id .not_in (job_lockset ),
128136 )
129- .options (load_only (JobModel .id ))
137+ .options (
138+ load_only (JobModel .id ),
139+ with_loader_criteria (
140+ JobModel ,
141+ JobModel .status .not_in (JobStatus .finished_statuses ()),
142+ include_aliases = True ,
143+ ),
144+ )
130145 .order_by (JobModel .id ) # take locks in order
131146 .with_for_update (skip_locked = True , key_share = True )
132147 )
0 commit comments