11import re
2+ import uuid
3+ from collections .abc import AsyncGenerator
4+ from contextlib import asynccontextmanager
25from typing import Dict , List , Optional
36
47import sqlalchemy .exc
5- from sqlalchemy import delete , select , update
8+ from sqlalchemy import select
69from sqlalchemy .ext .asyncio import AsyncSession
710
811from dstack ._internal .core .errors import (
1114 ServerClientError ,
1215)
1316from dstack ._internal .core .models .secrets import Secret
14- from dstack ._internal .server .models import DecryptedString , ProjectModel , SecretModel
15- from dstack ._internal .utils .logging import get_logger
16-
17- logger = get_logger (__name__ )
18-
17+ from dstack ._internal .server .db import get_db
18+ from dstack ._internal .server .models import DecryptedString , ProjectModel , SecretModel , UserModel
19+ from dstack ._internal .server .services import events
20+ from dstack ._internal .server .services .locking import get_locker
1921
2022_SECRET_NAME_REGEX = "^[A-Za-z0-9-_]{1,200}$"
2123_SECRET_VALUE_MAX_LENGTH = 5000
@@ -57,6 +59,7 @@ async def create_or_update_secret(
5759 project : ProjectModel ,
5860 name : str ,
5961 value : str ,
62+ user : UserModel ,
6063) -> Secret :
6164 _validate_secret (name = name , value = value )
6265 try :
@@ -65,13 +68,15 @@ async def create_or_update_secret(
6568 project = project ,
6669 name = name ,
6770 value = value ,
71+ user = user ,
6872 )
6973 except ResourceExistsError :
7074 secret_model = await update_secret (
7175 session = session ,
7276 project = project ,
7377 name = name ,
7478 value = value ,
79+ user = user ,
7580 )
7681 return secret_model_to_secret (secret_model , include_value = True )
7782
@@ -80,26 +85,24 @@ async def delete_secrets(
8085 session : AsyncSession ,
8186 project : ProjectModel ,
8287 names : List [str ],
88+ user : UserModel ,
8389):
84- existing_secrets_query = await session .execute (
85- select (SecretModel ).where (
86- SecretModel .project_id == project .id ,
87- SecretModel .name .in_ (names ),
88- )
89- )
90- existing_names = [s .name for s in existing_secrets_query .scalars ().all ()]
91- missing_names = set (names ) - set (existing_names )
92- if missing_names :
93- raise ResourceNotExistsError (f"Secrets not found: { ', ' .join (missing_names )} " )
94-
95- await session .execute (
96- delete (SecretModel ).where (
97- SecretModel .project_id == project .id ,
98- SecretModel .name .in_ (names ),
99- )
100- )
101- await session .commit ()
102- logger .info ("Deleted secrets %s in project %s" , names , project .name )
90+ async with get_project_secret_models_by_name_for_update (
91+ session = session , project = project , names = names
92+ ) as secret_models :
93+ existing_names = [s .name for s in secret_models ]
94+ missing_names = set (names ) - set (existing_names )
95+ if missing_names :
96+ raise ResourceNotExistsError (f"Secrets not found: { ', ' .join (missing_names )} " )
97+ for secret_model in secret_models :
98+ await session .delete (secret_model )
99+ events .emit (
100+ session ,
101+ "Secret deleted" ,
102+ actor = events .UserActor .from_user (user ),
103+ targets = [events .Target .from_model (secret_model )],
104+ )
105+ await session .commit ()
103106
104107
105108def secret_model_to_secret (secret_model : SecretModel , include_value : bool = False ) -> Secret :
@@ -142,20 +145,60 @@ async def get_project_secret_model_by_name(
142145 return res .scalar_one_or_none ()
143146
144147
148+ @asynccontextmanager
149+ async def get_project_secret_models_by_name_for_update (
150+ session : AsyncSession , project : ProjectModel , names : list [str ]
151+ ) -> AsyncGenerator [list [SecretModel ], None ]:
152+ """
153+ Fetch secrets from the database and lock them for update.
154+
155+ **NOTE**: commit changes to the database before exiting from this context manager,
156+ so that in-memory locks are only released after commit.
157+ """
158+ filters = [
159+ SecretModel .project_id == project .id ,
160+ SecretModel .name .in_ (names ),
161+ ]
162+ res = await session .execute (select (SecretModel .id ).where (* filters ))
163+ secret_ids = res .scalars ().all ()
164+ if not secret_ids :
165+ yield []
166+ else :
167+ async with get_locker (get_db ().dialect_name ).lock_ctx (
168+ SecretModel .__tablename__ , sorted (secret_ids )
169+ ):
170+ # Refetch after lock
171+ res = await session .execute (
172+ select (SecretModel )
173+ .where (SecretModel .id .in_ (secret_ids ), * filters )
174+ .with_for_update (key_share = True )
175+ .order_by (SecretModel .id ) # take locks in order
176+ )
177+ yield list (res .scalars ().all ())
178+
179+
145180async def create_secret (
146181 session : AsyncSession ,
147182 project : ProjectModel ,
148183 name : str ,
149184 value : str ,
185+ user : UserModel ,
150186) -> SecretModel :
151187 secret_model = SecretModel (
188+ id = uuid .uuid4 (),
152189 project_id = project .id ,
153190 name = name ,
154191 value = DecryptedString (plaintext = value ),
155192 )
156193 try :
157194 async with session .begin_nested ():
158195 session .add (secret_model )
196+ events .emit (
197+ session ,
198+ "Secret created" ,
199+ actor = events .UserActor .from_user (user ),
200+ targets = [events .Target .from_model (secret_model )],
201+ )
159202 except sqlalchemy .exc .IntegrityError :
160203 raise ResourceExistsError ()
161204 await session .commit ()
@@ -167,25 +210,23 @@ async def update_secret(
167210 project : ProjectModel ,
168211 name : str ,
169212 value : str ,
213+ user : UserModel ,
170214) -> SecretModel :
171- await session .execute (
172- update (SecretModel )
173- .where (
174- SecretModel .project_id == project .id ,
175- SecretModel .name == name ,
176- )
177- .values (
178- value = DecryptedString (plaintext = value ),
179- )
180- )
181- await session .commit ()
182- secret_model = await get_project_secret_model_by_name (
183- session = session ,
184- project = project ,
185- name = name ,
186- )
187- if secret_model is None :
188- raise ResourceNotExistsError ()
215+ async with get_project_secret_models_by_name_for_update (
216+ session = session , project = project , names = [name ]
217+ ) as secret_models :
218+ if not secret_models :
219+ raise ResourceNotExistsError ()
220+ secret_model = secret_models [0 ]
221+ if secret_model .value .get_plaintext_or_error () != value :
222+ secret_model .value = DecryptedString (plaintext = value )
223+ events .emit (
224+ session ,
225+ "Secret updated" ,
226+ actor = events .UserActor .from_user (user ),
227+ targets = [events .Target .from_model (secret_model )],
228+ )
229+ await session .commit ()
189230 return secret_model
190231
191232
0 commit comments