diff --git a/.jules/bolt.md b/.jules/bolt.md index c5f9902..f54e01c 100644 --- a/.jules/bolt.md +++ b/.jules/bolt.md @@ -39,3 +39,7 @@ ## 2026-01-27 - Redundant Validation for Cached Data **Learning:** Re-validating resource properties (like DNS/IP) when using *cached content* is pure overhead. If the content is served from memory (proven safe at fetch time), checking the *current* state of the source is disconnected from the data being used. **Action:** When using a multi-stage pipeline (Warmup -> Process), ensure validation state persists alongside the data cache. Avoid clearing validation caches between stages if the data cache is not also cleared. + +## 2026-01-28 - [Thread Pool Overhead on Small Batches] +**Learning:** Creating a `ThreadPoolExecutor` has measurable overhead (thread creation, context switching). For small tasks (e.g., a single batch of API requests), the overhead of the thread pool can exceed the benefit of parallelization, especially when the task itself is just a single synchronous I/O call. +**Action:** Always check if the workload justifies the overhead of a thread pool. For single-item or very small workloads, bypass the pool and execute synchronously. diff --git a/main.py b/main.py index 4b76614..3fd6189 100644 --- a/main.py +++ b/main.py @@ -1151,23 +1151,35 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: # Optimization 3: Parallelize batch processing # Using 3 workers to speed up writes without hitting aggressive rate limits. - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: - futures = { - executor.submit(process_batch, i, batch): i - for i, batch in enumerate(batches, 1) - } + if total_batches == 1: + # Avoid thread pool overhead for single batch (very common for small folders) + result = process_batch(1, batches[0]) + if result: + successful_batches += 1 + existing_rules.update(result) + render_progress_bar( + successful_batches, + total_batches, + f"Folder {sanitize_for_log(folder_name)}", + ) + else: + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + futures = { + executor.submit(process_batch, i, batch): i + for i, batch in enumerate(batches, 1) + } - for future in concurrent.futures.as_completed(futures): - result = future.result() - if result: - successful_batches += 1 - existing_rules.update(result) - - render_progress_bar( - successful_batches, - total_batches, - f"Folder {sanitize_for_log(folder_name)}", - ) + for future in concurrent.futures.as_completed(futures): + result = future.result() + if result: + successful_batches += 1 + existing_rules.update(result) + + render_progress_bar( + successful_batches, + total_batches, + f"Folder {sanitize_for_log(folder_name)}", + ) if successful_batches == total_batches: if USE_COLORS: diff --git a/tests/test_plan_details.py b/tests/test_plan_details.py index 12cacb2..73d4439 100644 --- a/tests/test_plan_details.py +++ b/tests/test_plan_details.py @@ -2,11 +2,10 @@ from unittest.mock import patch -import main - def test_print_plan_details_no_colors(capsys): """Test print_plan_details output when colors are disabled.""" + import main with patch("main.USE_COLORS", False): plan_entry = { "profile": "test_profile", @@ -29,6 +28,7 @@ def test_print_plan_details_no_colors(capsys): def test_print_plan_details_empty_folders(capsys): """Test print_plan_details with no folders.""" + import main with patch("main.USE_COLORS", False): plan_entry = {"profile": "test_profile", "folders": []} main.print_plan_details(plan_entry) @@ -42,6 +42,7 @@ def test_print_plan_details_empty_folders(capsys): def test_print_plan_details_with_colors(capsys): """Test print_plan_details output when colors are enabled.""" + import main with patch("main.USE_COLORS", True): plan_entry = { "profile": "test_profile", diff --git a/tests/test_push_rules_perf.py b/tests/test_push_rules_perf.py new file mode 100644 index 0000000..c7bf83f --- /dev/null +++ b/tests/test_push_rules_perf.py @@ -0,0 +1,89 @@ +import unittest +from unittest.mock import MagicMock, patch +import concurrent.futures +import sys +import os + +# pytest adds root to sys.path usually, but to be safe we can use relative import or assume root is in path +# if running via 'uv run pytest', root is in path. + +import main + +class TestPushRulesPerf(unittest.TestCase): + def setUp(self): + self.mock_client = MagicMock() + self.mock_client.post.return_value.status_code = 200 + self.mock_client.post.return_value.raise_for_status = MagicMock() + + @patch('concurrent.futures.as_completed') + @patch('concurrent.futures.ThreadPoolExecutor') + def test_single_batch_avoids_thread_pool(self, mock_executor, mock_as_completed): + """Test that single batch pushes avoid creating a ThreadPoolExecutor.""" + # Setup: < 500 rules (BATCH_SIZE is 500) + rules = [f"rule{i}.com" for i in range(100)] + existing_rules = set() + + # Mock the executor context manager + mock_executor_instance = mock_executor.return_value + mock_executor_instance.__enter__.return_value = mock_executor_instance + + # Mock submit to return a future + mock_future = MagicMock() + mock_future.result.return_value = ["rule0.com"] # Dummy result + mock_executor_instance.submit.return_value = mock_future + + # Mock as_completed to return the future once (so loop runs once) + mock_as_completed.return_value = [mock_future] + + # Execute + main.push_rules( + profile_id="test_profile", + folder_name="test_folder", + folder_id="test_id", + do=0, + status=1, + hostnames=rules, + existing_rules=existing_rules, + client=self.mock_client + ) + + # Assert: ThreadPoolExecutor should NOT be called + mock_executor.assert_not_called() + + # Verify functionality: client.post should be called once (via process_batch inside loop or direct) + self.assertEqual(self.mock_client.post.call_count, 1) + + @patch('concurrent.futures.as_completed') + @patch('concurrent.futures.ThreadPoolExecutor') + def test_multi_batch_uses_thread_pool(self, mock_executor, mock_as_completed): + """Test that multiple batch pushes DO create a ThreadPoolExecutor.""" + # Setup: > 500 rules + rules = [f"rule{i}.com" for i in range(1000)] + existing_rules = set() + + # Mock the executor context manager + mock_executor_instance = mock_executor.return_value + mock_executor_instance.__enter__.return_value = mock_executor_instance + + # Mock submit to return a future + mock_future = MagicMock() + mock_future.result.return_value = ["some_rules"] + mock_executor_instance.submit.return_value = mock_future + + # Mock as_completed to return 2 futures (2 batches) + mock_as_completed.return_value = [mock_future, mock_future] + + # Execute + main.push_rules( + profile_id="test_profile", + folder_name="test_folder", + folder_id="test_id", + do=0, + status=1, + hostnames=rules, + existing_rules=existing_rules, + client=self.mock_client + ) + + # Assert: ThreadPoolExecutor SHOULD be called + mock_executor.assert_called()