diff --git a/.jules/bolt.md b/.jules/bolt.md index f54e01c..b8a2536 100644 --- a/.jules/bolt.md +++ b/.jules/bolt.md @@ -40,6 +40,6 @@ **Learning:** Re-validating resource properties (like DNS/IP) when using *cached content* is pure overhead. If the content is served from memory (proven safe at fetch time), checking the *current* state of the source is disconnected from the data being used. **Action:** When using a multi-stage pipeline (Warmup -> Process), ensure validation state persists alongside the data cache. Avoid clearing validation caches between stages if the data cache is not also cleared. -## 2026-01-28 - [Thread Pool Overhead on Small Batches] +## 2026-01-28 - Thread Pool Overhead on Small Batches **Learning:** Creating a `ThreadPoolExecutor` has measurable overhead (thread creation, context switching). For small tasks (e.g., a single batch of API requests), the overhead of the thread pool can exceed the benefit of parallelization, especially when the task itself is just a single synchronous I/O call. **Action:** Always check if the workload justifies the overhead of a thread pool. For single-item or very small workloads, bypass the pool and execute synchronously. diff --git a/main.py b/main.py index 8caa621..6c945cf 100644 --- a/main.py +++ b/main.py @@ -1149,12 +1149,10 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: log.debug(f"Response content: {sanitize_for_log(e.response.text)}") return None - # Optimization 3: Batch processing with conditional parallelism - # Single batch: run synchronously to avoid thread pool overhead (common for small folders). - # Multiple batches: use up to 3 workers to speed up writes without hitting aggressive rate limits. - if total_batches == 1: - # Avoid thread pool overhead for single batch (very common for small folders) - result = process_batch(1, batches[0]) + # Helper function to handle batch results and update progress + # This avoids code duplication between single-batch and multi-batch paths + def _handle_batch_result(result: Optional[List[str]]): + nonlocal successful_batches if result: successful_batches += 1 existing_rules.update(result) @@ -1163,6 +1161,14 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: total_batches, f"Folder {sanitize_for_log(folder_name)}", ) + + # Optimization 3: Batch processing with conditional parallelism + # Single batch: run synchronously to avoid thread pool overhead (common for small folders). + # Multiple batches: use up to 3 workers to speed up writes without hitting aggressive rate limits. + if total_batches == 1: + # Avoid thread pool overhead for single batch (very common for small folders) + result = process_batch(1, batches[0]) + _handle_batch_result(result) else: with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: futures = { @@ -1172,15 +1178,7 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: for future in concurrent.futures.as_completed(futures): result = future.result() - if result: - successful_batches += 1 - existing_rules.update(result) - - render_progress_bar( - successful_batches, - total_batches, - f"Folder {sanitize_for_log(folder_name)}", - ) + _handle_batch_result(result) if successful_batches == total_batches: if USE_COLORS: diff --git a/tests/test_push_rules_perf.py b/tests/test_push_rules_perf.py index 08a5768..e3d96b6 100644 --- a/tests/test_push_rules_perf.py +++ b/tests/test_push_rules_perf.py @@ -1,12 +1,11 @@ +"""Tests for push_rules performance optimization.""" import unittest from unittest.mock import MagicMock, patch -# pytest adds root to sys.path usually, but to be safe we can use relative import or assume root is in path -# if running via 'uv run pytest', root is in path. - -import main class TestPushRulesPerf(unittest.TestCase): + """Test performance optimizations in push_rules.""" + def setUp(self): self.mock_client = MagicMock() self.mock_client.post.return_value.status_code = 200 @@ -14,8 +13,12 @@ def setUp(self): @patch('concurrent.futures.as_completed') @patch('concurrent.futures.ThreadPoolExecutor') - def test_single_batch_avoids_thread_pool(self, mock_executor, mock_as_completed): + def test_single_batch_avoids_thread_pool( + self, mock_executor, mock_as_completed + ): """Test that single batch pushes avoid creating a ThreadPoolExecutor.""" + import main + # Setup: < 500 rules (BATCH_SIZE is 500) rules = [f"rule{i}.com" for i in range(100)] existing_rules = set() @@ -26,7 +29,7 @@ def test_single_batch_avoids_thread_pool(self, mock_executor, mock_as_completed) # Mock submit to return a future mock_future = MagicMock() - mock_future.result.return_value = ["rule0.com"] # Dummy result + mock_future.result.return_value = ["rule0.com"] # Dummy result mock_executor_instance.submit.return_value = mock_future # Mock as_completed to return the future once (so loop runs once) @@ -47,13 +50,17 @@ def test_single_batch_avoids_thread_pool(self, mock_executor, mock_as_completed) # Assert: ThreadPoolExecutor should NOT be called mock_executor.assert_not_called() - # Verify functionality: client.post should be called once (via process_batch inside loop or direct) + # Verify: client.post should be called once self.assertEqual(self.mock_client.post.call_count, 1) @patch('concurrent.futures.as_completed') @patch('concurrent.futures.ThreadPoolExecutor') - def test_multi_batch_uses_thread_pool(self, mock_executor, mock_as_completed): + def test_multi_batch_uses_thread_pool( + self, mock_executor, mock_as_completed + ): """Test that multiple batch pushes DO create a ThreadPoolExecutor.""" + import main + # Setup: > 500 rules rules = [f"rule{i}.com" for i in range(1000)] existing_rules = set() @@ -62,13 +69,17 @@ def test_multi_batch_uses_thread_pool(self, mock_executor, mock_as_completed): mock_executor_instance = mock_executor.return_value mock_executor_instance.__enter__.return_value = mock_executor_instance - # Mock submit to return a future - mock_future = MagicMock() - mock_future.result.return_value = ["some_rules"] - mock_executor_instance.submit.return_value = mock_future + # Create distinct mock futures for each batch + mock_future1 = MagicMock() + mock_future1.result.return_value = ["some_rules_batch1"] + mock_future2 = MagicMock() + mock_future2.result.return_value = ["some_rules_batch2"] + + # Mock submit to return distinct futures + mock_executor_instance.submit.side_effect = [mock_future1, mock_future2] - # Mock as_completed to return 2 futures (2 batches) - mock_as_completed.return_value = [mock_future, mock_future] + # Mock as_completed to return 2 distinct futures (2 batches) + mock_as_completed.return_value = [mock_future1, mock_future2] # Execute main.push_rules( @@ -84,3 +95,5 @@ def test_multi_batch_uses_thread_pool(self, mock_executor, mock_as_completed): # Assert: ThreadPoolExecutor SHOULD be called mock_executor.assert_called() + # Verify submit was called 2 times (for 2 batches) + self.assertEqual(mock_executor_instance.submit.call_count, 2)