Add thread-safe cache operations and test coverage for cache optimization#138
Conversation
🚫 This pull request was requested to be canceled by Abhi Mehrotra, so it was removed from the merge queue.

Co-authored-by: abhimehro <84992105+abhimehro@users.noreply.github.com>
Co-authored-by: abhimehro <84992105+abhimehro@users.noreply.github.com>
Pull request overview
This PR attempts to add thread-safety to cache operations introduced in PR #137, which optimized sync_profile by skipping DNS validation for cached URLs in concurrent execution contexts. The changes add a threading.Lock to protect cache dictionary operations and include test coverage for the thread-safety improvements.
Changes:
- Added a `threading.Lock` (`_cache_lock`) to protect `_cache` dictionary operations
- Wrapped cache checks and read/write operations in `_gh_get()`, `warm_up_cache()`, and `_fetch_if_valid()` with the lock
- Added `tests/test_cache_optimization.py` with 6 test cases covering cache optimization and thread-safety scenarios
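In outline, the locking pattern the PR describes looks like this (a minimal sketch with hypothetical helper names, not the exact diff from main.py):

```python
import threading

_cache: dict = {}
_cache_lock = threading.Lock()  # guards every read and write of _cache

def cache_get(url):
    """Thread-safe cache read; returns None on a miss."""
    with _cache_lock:
        return _cache.get(url)

def cache_put(url, data):
    """Thread-safe cache write."""
    with _cache_lock:
        _cache[url] = data
```

Any code path that touches `_cache` must go through the lock; a single unguarded access reintroduces the race.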
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| main.py | Added threading import, _cache_lock initialization, and lock protection for cache operations in _gh_get(), warm_up_cache(), and _fetch_if_valid() |
| tests/test_cache_optimization.py | New test file with 6 test cases for cache optimization and thread-safety |
main.py
Outdated
```python
# Check cache with lock to ensure thread-safe read
with _cache_lock:
    if url in _cache:
        return _cache[url]

# Fetch data if not cached
# Explicitly let HTTPError propagate (no need to catch just to re-raise)
with _gh.stream("GET", url) as r:
    r.raise_for_status()

    # 1. Check Content-Length header if present
    cl = r.headers.get("Content-Length")
    if cl:
        try:
            if int(cl) > MAX_RESPONSE_SIZE:
                raise ValueError(
                    f"Response too large from {sanitize_for_log(url)} "
                    f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB) "
                    f"({int(cl) / (1024 * 1024):.2f} MB)"
                )
        except ValueError as e:
            # Only catch the conversion error, let the size error propagate
            if "Response too large" in str(e):
                raise e
            log.warning(
                f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. "
                "Falling back to streaming size check."
            )

    # 2. Stream and check actual size
    chunks = []
    current_size = 0
    for chunk in r.iter_bytes():
        current_size += len(chunk)
        if current_size > MAX_RESPONSE_SIZE:
            raise ValueError(
                f"Response too large from {sanitize_for_log(url)} "
                f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)"
            )
        chunks.append(chunk)

try:
    data = json.loads(b"".join(chunks))
except json.JSONDecodeError as e:
    raise ValueError(
        f"Invalid JSON response from {sanitize_for_log(url)}"
    ) from e

# Store in cache (write operation, needs lock)
with _cache_lock:
    _cache[url] = data

return data
```
Critical race condition: Multiple threads can simultaneously pass the cache check and fetch the same URL, leading to redundant network requests and potential cache corruption.
The current implementation:
- Checks if URL is in cache (with lock)
- Releases lock
- Fetches data (without lock)
- Stores in cache (with lock)
If two threads call _gh_get() for the same uncached URL concurrently:
- Both pass the cache check at line 530
- Both fetch the data independently (lines 535-573)
- Both write to cache (lines 576-577)
This defeats the purpose of caching and can cause race conditions. Consider using a "check-lock-check" pattern or a separate lock per URL to ensure only one thread fetches each URL.
```python
def test_gh_get_thread_safety(self):
    """
    Test that _gh_get handles concurrent access correctly.
    When multiple threads try to fetch the same URL, only one should fetch
    and the rest should get the cached result.
    """
    test_url = "https://example.com/test.json"
    test_data = {"group": {"group": "Test Folder"}, "domains": ["example.com"]}

    class FetchTracker:
        """Track fetch count using a class to avoid closure issues."""
        def __init__(self):
            self.count = 0

    tracker = FetchTracker()

    def mock_stream_get(method, url):
        """Mock the streaming GET request."""
        tracker.count += 1
        mock_response = MagicMock()
        mock_response.raise_for_status = MagicMock()
        mock_response.headers = {"Content-Length": "100"}
        # Return JSON bytes properly
        json_bytes = b'{"group": {"group": "Test Folder"}, "domains": ["example.com"]}'
        mock_response.iter_bytes = MagicMock(return_value=[json_bytes])
        mock_response.__enter__ = MagicMock(return_value=mock_response)
        mock_response.__exit__ = MagicMock(return_value=False)
        return mock_response

    results = []
    errors = []

    def fetch_data():
        try:
            data = main._gh_get(test_url)
            results.append(data)
        except Exception as e:
            errors.append(e)

    with patch.object(main._gh, 'stream', side_effect=mock_stream_get):
        # Spawn multiple threads to fetch the same URL concurrently
        threads = []
        for _ in range(5):
            thread = threading.Thread(target=fetch_data)
            threads.append(thread)
            thread.start()

        # Wait for all threads to complete
        for thread in threads:
            thread.join()

    # Verify no errors occurred
    self.assertEqual(len(errors), 0, f"Errors occurred: {errors}")
    # Verify all threads got results
    self.assertEqual(len(results), 5)
    # All results should be the same
    for result in results:
        self.assertEqual(result, test_data)
```
Test doesn't verify its stated expectation: The test comment (lines 190-192) claims "only one should fetch and the rest should get the cached result," but the test doesn't verify this. The tracker.count is never checked.
With the current _gh_get implementation, all 5 threads will likely fetch simultaneously because they can all pass the cache check before any writes to the cache. The test should assert that tracker.count equals 1, but it currently only verifies that all threads got the same result (which could be true even with 5 fetches).
Add an assertion to verify the expected behavior.
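For illustration, here is the missing assertion in a standalone form, against a hypothetical lock-guarded fetcher (`make_cached_fetcher` is invented for this sketch, not a function in main.py); holding the lock across the fetch is the simplest way to guarantee a single fetch, at the cost of serializing callers:

```python
import threading

def make_cached_fetcher(fetch):
    """Wrap fetch(url) with a lock-guarded cache so concurrent callers
    share one result per URL. Simple but serializes the fetch itself."""
    cache = {}
    lock = threading.Lock()
    def cached_fetch(url):
        with lock:
            if url not in cache:
                cache[url] = fetch(url)
            return cache[url]
    return cached_fetch

# Count real fetches across 5 threads and assert exactly one occurred.
count_lock = threading.Lock()
counts = {"fetches": 0}

def fake_fetch(url):
    with count_lock:
        counts["fetches"] += 1
    return {"url": url}

cached = make_cached_fetcher(fake_fetch)
results = []
threads = [
    threading.Thread(
        target=lambda: results.append(cached("https://example.com/test.json"))
    )
    for _ in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert counts["fetches"] == 1  # the assertion the review asks the test to add
```

In the PR's test, the equivalent line would be an `assertEqual` on `tracker.count` after the threads are joined.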
```python
def test_cache_check_in_fetch_if_valid(self):
    """
    Test the actual _fetch_if_valid logic used in sync_profile.
    This is an integration test that verifies the optimization path.
    """
    test_url = "https://example.com/test.json"
    test_data = {"group": {"group": "Test Folder"}, "domains": ["example.com"]}

    # Pre-populate cache to simulate warm_up_cache
    with main._cache_lock:
        main._cache[test_url] = test_data

    # Mock validate_folder_url to track if it's called
    with patch('main.validate_folder_url') as mock_validate:
        with patch('main._gh_get', return_value=test_data):
            # Simulate the logic in _fetch_if_valid
            with main._cache_lock:
                url_is_cached = test_url in main._cache

            if url_is_cached:
                result = main.fetch_folder_data(test_url)
            else:
                if main.validate_folder_url(test_url):
                    result = main.fetch_folder_data(test_url)
                else:
                    result = None

            # Verify validation was NOT called because URL was cached
            mock_validate.assert_not_called()
            self.assertEqual(result, test_data)
```
Test duplicates logic instead of testing actual implementation: Since _fetch_if_valid is a nested function inside sync_profile (main.py:1052), this test manually reimplements its logic rather than testing it through sync_profile. This creates maintenance burden and could miss integration issues.
Consider either:
- Extracting _fetch_if_valid to module level so it can be tested directly
- Testing this behavior through sync_profile with appropriate mocking
- Adding a comment explaining why the logic is duplicated
The current approach risks the test passing while the actual implementation has bugs.
tests/test_cache_optimization.py
Outdated
```python
def test_non_cached_url_calls_validation(self):
    """
    Test that when a URL is NOT in the cache, validate_folder_url is called.
    This ensures we don't skip validation for non-cached URLs.
    """
    test_url = "https://example.com/test.json"
    test_data = {"group": {"group": "Test Folder"}, "domains": ["example.com"]}

    # Ensure URL is NOT in cache
    self.assertNotIn(test_url, main._cache)

    with patch('main.validate_folder_url', return_value=True):
        with patch('main._gh_get', return_value=test_data):
            # Call the helper function that's used in sync_profile
            # This mimics what happens in _fetch_if_valid
            with main._cache_lock:
                url_in_cache = test_url in main._cache

            if not url_in_cache:
                # Should validate because URL is not cached
                is_valid = main.validate_folder_url(test_url)
                self.assertTrue(is_valid)

                if is_valid:
                    result = main.fetch_folder_data(test_url)
                    self.assertEqual(result, test_data)
```
Test doesn't verify validate_folder_url was called: The test comment (line 55) states it should verify "validate_folder_url is called," but the test manually calls validate_folder_url itself (line 73) rather than verifying that fetch_folder_data or _fetch_if_valid calls it.
This test doesn't actually verify the integration behavior - it just checks that the test code itself calls the validation function. The mock on line 64 is never asserted, so we don't verify that the actual code path calls validation for non-cached URLs.
Add an assertion like mock_validate.assert_called_once_with(test_url) to verify the actual code calls validation, or restructure to test through the real code path.
main.py
Outdated
```python
if url in _cache:
    return fetch_folder_data(url)
```
Potential deadlock: threading.Lock is not reentrant. If a thread holds _cache_lock and calls fetch_folder_data (line 1057), which calls _gh_get (line 699), _gh_get will try to acquire _cache_lock again (line 529), causing a deadlock.
Scenario:
- Thread acquires _cache_lock (line 1055)
- Finds URL in cache (line 1056)
- Calls fetch_folder_data while holding lock (line 1057)
- fetch_folder_data calls _gh_get (main.py:699)
- _gh_get tries to acquire _cache_lock (main.py:529)
- Deadlock - thread waits for lock it already holds
Solution: Use threading.RLock instead of threading.Lock, or restructure to release the lock before calling fetch_folder_data, or read directly from cache without calling fetch_folder_data.
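The difference between the two lock types is easy to demonstrate in isolation (a standalone sketch, independent of main.py):

```python
import threading

# A plain Lock is not reentrant: a second acquire by the owning thread
# would block forever. acquire(blocking=False) lets us observe this safely.
plain = threading.Lock()
with plain:
    reacquired = plain.acquire(blocking=False)
assert reacquired is False  # the same thread cannot take a Lock twice

# An RLock tracks its owner and a recursion count, so nested acquisition
# by the same thread (e.g. _fetch_if_valid -> fetch_folder_data -> _gh_get)
# succeeds instead of deadlocking.
rlock = threading.RLock()

def inner():
    with rlock:           # second acquisition, same thread: OK with RLock
        return "inner ran"

def outer():
    with rlock:           # first acquisition
        return inner()
```

Note that RLock only fixes same-thread reentrancy; it does not make the fetch-while-holding-the-lock pattern any less of a serialization bottleneck.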
Suggested change:

```diff
-if url in _cache:
-    return fetch_folder_data(url)
+cached = _cache.get(url)
+if cached is not None:
+    return cached
```
main.py
Outdated
```diff
 # Optimization: If we already have the content in cache, skip validation
 # because the content was validated at the time of fetch (warm_up_cache).
-if url in _cache:
-    return fetch_folder_data(url)
+with _cache_lock:
+    if url in _cache:
+        return fetch_folder_data(url)
```
Misleading PR description and comment: The PR description claims this creates "atomic check-then-read" but it's actually a deadlock. The code checks cache with lock, then calls fetch_folder_data WHILE HOLDING THE LOCK (line 1057 is inside the 'with _cache_lock:' block on line 1055).
Since fetch_folder_data calls _gh_get, which tries to acquire _cache_lock again, this creates a deadlock. The PR description and code comment (line 1053-1054) are misleading about what this code does.
To actually make this atomic and avoid deadlock, either:
- Read directly from the cache: `return _cache[url]` instead of calling fetch_folder_data
- Use `threading.RLock` (reentrant lock) instead of `Lock`
- Release the lock before calling fetch_folder_data (but this isn't atomic)
tests/test_cache_optimization.py
Outdated
```python
    2. Non-cached URLs still get validated
    3. Cache operations are thread-safe
    """
    import concurrent.futures
```
Import of 'concurrent' is not used.
Suggested change:

```diff
-    import concurrent.futures
```
Hey @copilot, please apply changes based on the comments in this thread. Thanks again for your help with this! I really appreciate it.
…copilot/sub-pr-137
Co-authored-by: abhimehro <84992105+abhimehro@users.noreply.github.com>
Applied all changes from the review feedback. Fixed in commits dea62ff and 1b8c966, covering both the thread-safety issues and the test issues. All 47 tests pass. CodeQL: 0 alerts.
Merged commit 87ca781 into bolt-optimization-dns-cache-15293716235198344605
Plan for Fixing Thread-Safety Issues
- Fix `_gh_get()` using a double-checked locking pattern
- Fix `_fetch_if_valid()` by reading directly from the cache instead of calling fetch_folder_data
- Use `threading.RLock` (reentrant lock) instead of `Lock` to handle nested lock acquisitions safely
- Remove the unused `concurrent` import
- Fix `test_gh_get_thread_safety` to assert the fetch count
- Fix `test_non_cached_url_calls_validation` to properly verify validation is called
- Fix `test_cache_check_in_fetch_if_valid`, which duplicates logic