diff --git a/services/git/git_clone_to_efs.py b/services/git/git_clone_to_efs.py index 6482b0d0..e8237011 100644 --- a/services/git/git_clone_to_efs.py +++ b/services/git/git_clone_to_efs.py @@ -1,7 +1,7 @@ import asyncio import os -from services.git.remove_stale_git_locks import remove_stale_git_locks +from services.git.resolve_git_locks import resolve_git_locks from utils.command.run_subprocess_async import run_subprocess_async from utils.error.handle_exceptions import handle_exceptions from utils.logging.logging_config import logger @@ -32,8 +32,8 @@ async def git_clone_to_efs(efs_dir: str, clone_url: str, branch: str): if os.path.exists(efs_git_dir): logger.info("EFS already has .git at %s, ensuring latest", efs_dir) - # Lambda can be killed mid-fetch, leaving lock files on EFS that persist forever - remove_stale_git_locks(efs_git_dir) + # Remove stale locks (>10min, crashed Lambda) and wait for fresh ones (concurrent Lambda) + await resolve_git_locks(efs_git_dir) # EFS persists across Lambda invocations; origin URL may contain expired token # Check if origin remote exists (may be missing if previous clone was interrupted) diff --git a/services/git/git_reset.py b/services/git/git_reset.py index 04167551..b16d6f11 100644 --- a/services/git/git_reset.py +++ b/services/git/git_reset.py @@ -1,34 +1,17 @@ -import asyncio import os -from services.git.remove_stale_git_locks import remove_stale_git_locks +from services.git.resolve_git_locks import resolve_git_locks from utils.command.run_subprocess_async import run_subprocess_async from utils.error.handle_exceptions import handle_exceptions from utils.logging.logging_config import logger -LOCK_POLL_SECONDS = 5 - @handle_exceptions(default_return_value=False, raise_on_error=False) async def git_reset(target_dir: str): git_dir = os.path.join(target_dir, ".git") - # Remove any stale locks left by killed Lambda invocations - remove_stale_git_locks(git_dir) - - # Wait for fresh index.lock — another process is actively writing - lock_file = os.path.join(git_dir, "index.lock") - waited = False - - while os.path.exists(lock_file): - waited = True - logger.info("Waiting for index.lock: %s", lock_file) - await asyncio.sleep(LOCK_POLL_SECONDS) - - # If we waited for another process, it already fetched+reset the same branch - if waited: - logger.info("index.lock released, repo already updated: %s", target_dir) - return True + # Remove stale locks (>10min, crashed Lambda) and wait for fresh ones (concurrent Lambda) + await resolve_git_locks(git_dir) # Reset to FETCH_HEAD after git_fetch has been called returncode, _ = await run_subprocess_async( diff --git a/services/git/remove_stale_git_locks.py b/services/git/remove_stale_git_locks.py deleted file mode 100644 index b4871065..00000000 --- a/services/git/remove_stale_git_locks.py +++ /dev/null @@ -1,36 +0,0 @@ -import os -import time - -from utils.error.handle_exceptions import handle_exceptions -from utils.logging.logging_config import logger - -STALE_LOCK_SECONDS = 600 # 10 minutes — matches Lambda max timeout - - -@handle_exceptions(default_return_value=None, raise_on_error=False) -def remove_stale_git_locks(git_dir: str): - """Remove *.lock files left by killed Lambda invocations. - - EFS persists across invocations, so a Lambda killed mid-fetch/reset leaves - lock files (shallow.lock, index.lock, config.lock, etc.) that block all - subsequent git operations on the same repo. - """ - for root, _, files in os.walk(git_dir): - for fname in files: - if not fname.endswith(".lock"): - continue - - lock_path = os.path.join(root, fname) - try: - age = time.time() - os.path.getmtime(lock_path) - except FileNotFoundError: - continue # Lock was released between walk and stat - - if age > STALE_LOCK_SECONDS: - try: - os.remove(lock_path) - logger.info( - "Removed stale %s (%.0fs old): %s", fname, age, lock_path - ) - except FileNotFoundError: - pass # Lock was released between stat and remove diff --git a/services/git/resolve_git_locks.py b/services/git/resolve_git_locks.py new file mode 100644 index 00000000..42b00f88 --- /dev/null +++ b/services/git/resolve_git_locks.py @@ -0,0 +1,65 @@ +import asyncio +import os +import time + +from utils.error.handle_exceptions import handle_exceptions +from utils.logging.logging_config import logger + +STALE_LOCK_SECONDS = 600 # 10 minutes — matches Lambda max timeout +LOCK_POLL_INTERVAL = 2 # seconds between polls +LOCK_WAIT_TIMEOUT = 60 # max seconds to wait for a fresh lock + + +@handle_exceptions(default_return_value=None, raise_on_error=False) +async def resolve_git_locks(git_dir: str): + """Clear git lock files: remove stale ones immediately, wait for fresh ones. + + - Lock >10min old (crashed Lambda): remove immediately + - Lock fresh (concurrent Lambda): poll every 2s up to 60s, force-remove on timeout + """ + deadline = time.time() + LOCK_WAIT_TIMEOUT + while time.time() < deadline: + has_fresh_locks = False + for root, _, files in os.walk(git_dir): + for fname in files: + if not fname.endswith(".lock"): + continue + + lock_path = os.path.join(root, fname) + try: + age = time.time() - os.path.getmtime(lock_path) + except FileNotFoundError: + continue # Lock was released between walk and stat + + if age > STALE_LOCK_SECONDS: + try: + os.remove(lock_path) + logger.info( + "Removed stale %s (%.0fs old): %s", fname, age, lock_path + ) + except FileNotFoundError: + pass + else: + has_fresh_locks = True + + if not has_fresh_locks: + return + + logger.info("Waiting for fresh git locks in %s", git_dir) + await asyncio.sleep(LOCK_POLL_INTERVAL) + + # Timeout - force remove remaining locks + for root, _, files in os.walk(git_dir): + for fname in files: + if not fname.endswith(".lock"): + continue + lock_path = os.path.join(root, fname) + try: + os.remove(lock_path) + logger.warning( + "Force-removed lock after %ds timeout: %s", + LOCK_WAIT_TIMEOUT, + lock_path, + ) + except FileNotFoundError: + pass diff --git a/services/git/test_git_reset.py b/services/git/test_git_reset.py index 4511eff6..d917a7e3 100644 --- a/services/git/test_git_reset.py +++ b/services/git/test_git_reset.py @@ -10,9 +10,7 @@ async def test_git_reset_success(): with patch( "services.git.git_reset.run_subprocess_async", new_callable=AsyncMock ) as mock_run, patch( - "services.git.git_reset.os.path.exists", return_value=False - ), patch( - "services.git.git_reset.remove_stale_git_locks" + "services.git.git_reset.resolve_git_locks", new_callable=AsyncMock ): mock_run.return_value = (0, "") result = await git_reset("/tmp/repo") @@ -26,9 +24,7 @@ async def test_git_reset_failure(): with patch( "services.git.git_reset.run_subprocess_async", new_callable=AsyncMock ) as mock_run, patch( - "services.git.git_reset.os.path.exists", return_value=False - ), patch( - "services.git.git_reset.remove_stale_git_locks" + "services.git.git_reset.resolve_git_locks", new_callable=AsyncMock ): mock_run.return_value = (1, "fatal: reset failed") result = await git_reset("/tmp/repo") @@ -38,36 +34,13 @@ async def test_git_reset_failure(): @pytest.mark.asyncio -async def test_git_reset_waits_for_fresh_lock_then_skips(): - """Fresh lock disappears after one poll — skip reset since other process did it.""" - exists_calls = [True, False] # Lock exists, then gone - - with patch( - "services.git.git_reset.run_subprocess_async", new_callable=AsyncMock - ) as mock_run, patch( - "services.git.git_reset.os.path.exists", side_effect=exists_calls - ), patch( - "services.git.git_reset.asyncio.sleep", new_callable=AsyncMock - ), patch( - "services.git.git_reset.remove_stale_git_locks" - ): - result = await git_reset("/tmp/repo") - - assert result is True - mock_run.assert_not_called() - - -@pytest.mark.asyncio -async def test_git_reset_calls_remove_stale_git_locks(): - """Verify remove_stale_git_locks is called with the .git directory.""" +async def test_git_reset_calls_resolve_git_locks(): with patch( "services.git.git_reset.run_subprocess_async", new_callable=AsyncMock ) as mock_run, patch( - "services.git.git_reset.os.path.exists", return_value=False - ), patch( - "services.git.git_reset.remove_stale_git_locks" - ) as mock_remove: + "services.git.git_reset.resolve_git_locks", new_callable=AsyncMock + ) as mock_clear: mock_run.return_value = (0, "") await git_reset("/tmp/repo") - mock_remove.assert_called_once_with("/tmp/repo/.git") + mock_clear.assert_called_once_with("/tmp/repo/.git") diff --git a/services/git/test_remove_stale_git_locks.py b/services/git/test_resolve_git_locks.py similarity index 51% rename from services/git/test_remove_stale_git_locks.py rename to services/git/test_resolve_git_locks.py index 2ad00438..177487c4 100644 --- a/services/git/test_remove_stale_git_locks.py +++ b/services/git/test_resolve_git_locks.py @@ -1,14 +1,18 @@ +import asyncio import os import time from unittest.mock import patch -from services.git.remove_stale_git_locks import ( +import pytest + +from services.git.resolve_git_locks import ( STALE_LOCK_SECONDS, - remove_stale_git_locks, + resolve_git_locks, ) -def test_removes_stale_shallow_lock(tmp_path): +@pytest.mark.asyncio +async def test_removes_stale_shallow_lock(tmp_path): git_dir = tmp_path / ".git" git_dir.mkdir() lock_file = git_dir / "shallow.lock" @@ -16,12 +20,13 @@ def test_removes_stale_shallow_lock(tmp_path): stale_mtime = time.time() - STALE_LOCK_SECONDS - 1 os.utime(lock_file, (stale_mtime, stale_mtime)) - remove_stale_git_locks(str(git_dir)) + await resolve_git_locks(str(git_dir)) assert not lock_file.exists() -def test_removes_stale_index_lock(tmp_path): +@pytest.mark.asyncio +async def test_removes_stale_index_lock(tmp_path): git_dir = tmp_path / ".git" git_dir.mkdir() lock_file = git_dir / "index.lock" @@ -29,12 +34,13 @@ def test_removes_stale_index_lock(tmp_path): stale_mtime = time.time() - STALE_LOCK_SECONDS - 1 os.utime(lock_file, (stale_mtime, stale_mtime)) - remove_stale_git_locks(str(git_dir)) + await resolve_git_locks(str(git_dir)) assert not lock_file.exists() -def test_removes_stale_lock_in_subdirectory(tmp_path): +@pytest.mark.asyncio +async def test_removes_stale_lock_in_subdirectory(tmp_path): git_dir = tmp_path / ".git" refs_dir = git_dir / "refs" / "heads" refs_dir.mkdir(parents=True) @@ -43,24 +49,13 @@ def test_removes_stale_lock_in_subdirectory(tmp_path): stale_mtime = time.time() - STALE_LOCK_SECONDS - 1 os.utime(lock_file, (stale_mtime, stale_mtime)) - remove_stale_git_locks(str(git_dir)) + await resolve_git_locks(str(git_dir)) assert not lock_file.exists() -def test_preserves_fresh_lock(tmp_path): - git_dir = tmp_path / ".git" - git_dir.mkdir() - lock_file = git_dir / "shallow.lock" - lock_file.touch() - # Fresh lock — just created, mtime is now - - remove_stale_git_locks(str(git_dir)) - - assert lock_file.exists() - - -def test_preserves_non_lock_files(tmp_path): +@pytest.mark.asyncio +async def test_preserves_non_lock_files(tmp_path): git_dir = tmp_path / ".git" git_dir.mkdir() config = git_dir / "config" @@ -68,39 +63,48 @@ def test_preserves_non_lock_files(tmp_path): stale_mtime = time.time() - STALE_LOCK_SECONDS - 1 os.utime(config, (stale_mtime, stale_mtime)) - remove_stale_git_locks(str(git_dir)) + await resolve_git_locks(str(git_dir)) assert config.exists() -def test_handles_lock_deleted_between_walk_and_stat(tmp_path): +@pytest.mark.asyncio +async def test_no_locks_returns_immediately(tmp_path): + git_dir = tmp_path / ".git" + git_dir.mkdir() + + await resolve_git_locks(str(git_dir)) + + +@pytest.mark.asyncio +async def test_waits_for_fresh_lock_release(tmp_path): git_dir = tmp_path / ".git" git_dir.mkdir() lock_file = git_dir / "shallow.lock" lock_file.touch() - # Simulate lock disappearing between os.walk and os.path.getmtime - original_getmtime = os.path.getmtime + async def remove_lock_after_delay(): + await asyncio.sleep(0.5) + lock_file.unlink() - def getmtime_raises(path): - if str(path).endswith(".lock"): - raise FileNotFoundError - return original_getmtime(path) + asyncio.create_task(remove_lock_after_delay()) - with patch( - "services.git.remove_stale_git_locks.time.time", return_value=time.time() - ): - with patch( - "services.git.remove_stale_git_locks.os.path.getmtime", - side_effect=getmtime_raises, - ): - remove_stale_git_locks(str(git_dir)) - # Should not raise + with patch("services.git.resolve_git_locks.LOCK_POLL_INTERVAL", 0.2): + await resolve_git_locks(str(git_dir)) + + assert not lock_file.exists() -def test_no_error_on_empty_git_dir(tmp_path): +@pytest.mark.asyncio +async def test_force_removes_fresh_lock_on_timeout(tmp_path): git_dir = tmp_path / ".git" git_dir.mkdir() + lock_file = git_dir / "shallow.lock" + lock_file.touch() - remove_stale_git_locks(str(git_dir)) - # Should not raise + with patch("services.git.resolve_git_locks.LOCK_POLL_INTERVAL", 0.1), patch( + "services.git.resolve_git_locks.LOCK_WAIT_TIMEOUT", 0.3 + ): + await resolve_git_locks(str(git_dir)) + + assert not lock_file.exists()