Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@
**Learning:** Re-validating resource properties (like DNS/IP) when using *cached content* is pure overhead. If the content is served from memory (proven safe at fetch time), checking the *current* state of the source is disconnected from the data being used.
**Action:** When using a multi-stage pipeline (Warmup -> Process), ensure validation state persists alongside the data cache. Avoid clearing validation caches between stages if the data cache is not also cleared.

## 2026-01-28 - [Thread Pool Overhead on Small Batches]
## 2026-01-28 - Thread Pool Overhead on Small Batches
**Learning:** Creating a `ThreadPoolExecutor` has measurable overhead (thread creation, context switching). For small tasks (e.g., a single batch of API requests), the overhead of the thread pool can exceed the benefit of parallelization, especially when the task itself is just a single synchronous I/O call.
**Action:** Always check if the workload justifies the overhead of a thread pool. For single-item or very small workloads, bypass the pool and execute synchronously.
28 changes: 13 additions & 15 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,12 +1149,10 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]:
log.debug(f"Response content: {sanitize_for_log(e.response.text)}")
return None

# Optimization 3: Batch processing with conditional parallelism
# Single batch: run synchronously to avoid thread pool overhead (common for small folders).
# Multiple batches: use up to 3 workers to speed up writes without hitting aggressive rate limits.
if total_batches == 1:
# Avoid thread pool overhead for single batch (very common for small folders)
result = process_batch(1, batches[0])
# Helper function to handle batch results and update progress
# This avoids code duplication between single-batch and multi-batch paths
def _handle_batch_result(result: Optional[List[str]]):
nonlocal successful_batches
if result:
successful_batches += 1
existing_rules.update(result)
Expand All @@ -1163,6 +1161,14 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]:
total_batches,
f"Folder {sanitize_for_log(folder_name)}",
)

# Optimization 3: Batch processing with conditional parallelism
# Single batch: run synchronously to avoid thread pool overhead (common for small folders).
# Multiple batches: use up to 3 workers to speed up writes without hitting aggressive rate limits.
if total_batches == 1:
# Avoid thread pool overhead for single batch (very common for small folders)
result = process_batch(1, batches[0])
_handle_batch_result(result)
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = {
Expand All @@ -1172,15 +1178,7 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]:

for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
successful_batches += 1
existing_rules.update(result)

render_progress_bar(
successful_batches,
total_batches,
f"Folder {sanitize_for_log(folder_name)}",
)
_handle_batch_result(result)

if successful_batches == total_batches:
if USE_COLORS:
Expand Down
41 changes: 27 additions & 14 deletions tests/test_push_rules_perf.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
"""Tests for push_rules performance optimization."""
import unittest
from unittest.mock import MagicMock, patch

# pytest adds root to sys.path usually, but to be safe we can use relative import or assume root is in path
# if running via 'uv run pytest', root is in path.

import main

class TestPushRulesPerf(unittest.TestCase):
"""Test performance optimizations in push_rules."""

def setUp(self):
self.mock_client = MagicMock()
self.mock_client.post.return_value.status_code = 200
self.mock_client.post.return_value.raise_for_status = MagicMock()

@patch('concurrent.futures.as_completed')
@patch('concurrent.futures.ThreadPoolExecutor')
def test_single_batch_avoids_thread_pool(self, mock_executor, mock_as_completed):
def test_single_batch_avoids_thread_pool(
self, mock_executor, mock_as_completed
):
"""Test that single batch pushes avoid creating a ThreadPoolExecutor."""
import main

# Setup: < 500 rules (BATCH_SIZE is 500)
rules = [f"rule{i}.com" for i in range(100)]
existing_rules = set()
Expand All @@ -26,7 +29,7 @@ def test_single_batch_avoids_thread_pool(self, mock_executor, mock_as_completed)

# Mock submit to return a future
mock_future = MagicMock()
mock_future.result.return_value = ["rule0.com"] # Dummy result
mock_future.result.return_value = ["rule0.com"] # Dummy result
mock_executor_instance.submit.return_value = mock_future

# Mock as_completed to return the future once (so loop runs once)
Expand All @@ -47,13 +50,17 @@ def test_single_batch_avoids_thread_pool(self, mock_executor, mock_as_completed)
# Assert: ThreadPoolExecutor should NOT be called
mock_executor.assert_not_called()

# Verify functionality: client.post should be called once (via process_batch inside loop or direct)
# Verify: client.post should be called once
self.assertEqual(self.mock_client.post.call_count, 1)

@patch('concurrent.futures.as_completed')
@patch('concurrent.futures.ThreadPoolExecutor')
def test_multi_batch_uses_thread_pool(self, mock_executor, mock_as_completed):
def test_multi_batch_uses_thread_pool(
self, mock_executor, mock_as_completed
):
"""Test that multiple batch pushes DO create a ThreadPoolExecutor."""
import main

# Setup: > 500 rules
rules = [f"rule{i}.com" for i in range(1000)]
existing_rules = set()
Expand All @@ -62,13 +69,17 @@ def test_multi_batch_uses_thread_pool(self, mock_executor, mock_as_completed):
mock_executor_instance = mock_executor.return_value
mock_executor_instance.__enter__.return_value = mock_executor_instance

# Mock submit to return a future
mock_future = MagicMock()
mock_future.result.return_value = ["some_rules"]
mock_executor_instance.submit.return_value = mock_future
# Create distinct mock futures for each batch
mock_future1 = MagicMock()
mock_future1.result.return_value = ["some_rules_batch1"]
mock_future2 = MagicMock()
mock_future2.result.return_value = ["some_rules_batch2"]

# Mock submit to return distinct futures
mock_executor_instance.submit.side_effect = [mock_future1, mock_future2]

# Mock as_completed to return 2 futures (2 batches)
mock_as_completed.return_value = [mock_future, mock_future]
# Mock as_completed to return 2 distinct futures (2 batches)
mock_as_completed.return_value = [mock_future1, mock_future2]

# Execute
main.push_rules(
Expand All @@ -84,3 +95,5 @@ def test_multi_batch_uses_thread_pool(self, mock_executor, mock_as_completed):

# Assert: ThreadPoolExecutor SHOULD be called
mock_executor.assert_called()
# Verify submit was called 2 times (for 2 batches)
self.assertEqual(mock_executor_instance.submit.call_count, 2)
Loading