-
Notifications
You must be signed in to change notification settings - Fork 1
⚡ Bolt: Avoid thread pool overhead for small rule updates #182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1151,23 +1151,35 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: | |
|
|
||
| # Optimization 3: Parallelize batch processing | ||
| # Using 3 workers to speed up writes without hitting aggressive rate limits. | ||
| with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: | ||
| futures = { | ||
| executor.submit(process_batch, i, batch): i | ||
| for i, batch in enumerate(batches, 1) | ||
| } | ||
| if total_batches == 1: | ||
| # Avoid thread pool overhead for single batch (very common for small folders) | ||
| result = process_batch(1, batches[0]) | ||
| if result: | ||
| successful_batches += 1 | ||
| existing_rules.update(result) | ||
| render_progress_bar( | ||
| successful_batches, | ||
| total_batches, | ||
| f"Folder {sanitize_for_log(folder_name)}", | ||
| ) | ||
| else: | ||
| with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: | ||
| futures = { | ||
| executor.submit(process_batch, i, batch): i | ||
| for i, batch in enumerate(batches, 1) | ||
| } | ||
|
|
||
| for future in concurrent.futures.as_completed(futures): | ||
| result = future.result() | ||
| if result: | ||
| successful_batches += 1 | ||
| existing_rules.update(result) | ||
|
|
||
| render_progress_bar( | ||
| successful_batches, | ||
| total_batches, | ||
| f"Folder {sanitize_for_log(folder_name)}", | ||
| ) | ||
| for future in concurrent.futures.as_completed(futures): | ||
| result = future.result() | ||
| if result: | ||
| successful_batches += 1 | ||
| existing_rules.update(result) | ||
|
|
||
| render_progress_bar( | ||
| successful_batches, | ||
| total_batches, | ||
| f"Folder {sanitize_for_log(folder_name)}", | ||
| ) | ||
|
Comment on lines
+1154
to
+1182
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
While this optimization is a great improvement, it has introduced some code duplication. The logic for processing a batch result (updating counters, updating `existing_rules`, and rendering the progress bar) is now repeated in both the single-batch and the multi-batch code paths. To improve maintainability and adhere to the DRY (Don't Repeat Yourself) principle, you can extract this common logic into a nested helper function. This makes the code cleaner and easier to modify in the future.
def _handle_batch_result(result: Optional[List[str]]):
nonlocal successful_batches
if result:
successful_batches += 1
existing_rules.update(result)
render_progress_bar(
successful_batches,
total_batches,
f"Folder {sanitize_for_log(folder_name)}",
)
if total_batches == 1:
# Avoid thread pool overhead for single batch (very common for small folders)
result = process_batch(1, batches[0])
_handle_batch_result(result)
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = {
executor.submit(process_batch, i, batch): i
for i, batch in enumerate(batches, 1)
}
for future in concurrent.futures.as_completed(futures):
result = future.result()
_handle_batch_result(result) |
||
|
|
||
| if successful_batches == total_batches: | ||
| if USE_COLORS: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| import unittest | ||
Check warning: Code scanning / Pylint (reported by Codacy) — Missing module docstring (Warning, test)
Missing module docstring
Check warning: Code scanning / Pylintpython3 (reported by Codacy) — Missing module docstring (Warning, test)
Missing module docstring
|
||
| from unittest.mock import MagicMock, patch | ||
| import concurrent.futures | ||
| import sys | ||
| import os | ||
|
|
||
| # pytest adds root to sys.path usually, but to be safe we can use relative import or assume root is in path | ||
Check warning: Code scanning / Pylint (reported by Codacy) — Line too long (107/100) (Warning, test)
Line too long (107/100)
Check warning: Code scanning / Pylintpython3 (reported by Codacy) — Line too long (107/100) (Warning, test)
Line too long (107/100)
|
||
| # if running via 'uv run pytest', root is in path. | ||
|
|
||
| import main | ||
|
|
||
| class TestPushRulesPerf(unittest.TestCase): | ||
Check warning: Code scanning / Pylint (reported by Codacy) — Missing class docstring (Warning, test)
Missing class docstring
Check warning: Code scanning / Pylintpython3 (reported by Codacy) — Missing class docstring (Warning, test)
Missing class docstring
Comment on lines
+7
to
+12
|
||
| def setUp(self): | ||
| self.mock_client = MagicMock() | ||
| self.mock_client.post.return_value.status_code = 200 | ||
| self.mock_client.post.return_value.raise_for_status = MagicMock() | ||
|
|
||
| @patch('concurrent.futures.as_completed') | ||
| @patch('concurrent.futures.ThreadPoolExecutor') | ||
| def test_single_batch_avoids_thread_pool(self, mock_executor, mock_as_completed): | ||
| """Test that single batch pushes avoid creating a ThreadPoolExecutor.""" | ||
| # Setup: < 500 rules (BATCH_SIZE is 500) | ||
| rules = [f"rule{i}.com" for i in range(100)] | ||
| existing_rules = set() | ||
|
|
||
| # Mock the executor context manager | ||
| mock_executor_instance = mock_executor.return_value | ||
| mock_executor_instance.__enter__.return_value = mock_executor_instance | ||
|
|
||
| # Mock submit to return a future | ||
| mock_future = MagicMock() | ||
| mock_future.result.return_value = ["rule0.com"] # Dummy result | ||
| mock_executor_instance.submit.return_value = mock_future | ||
|
|
||
| # Mock as_completed to return the future once (so loop runs once) | ||
| mock_as_completed.return_value = [mock_future] | ||
|
|
||
| # Execute | ||
| main.push_rules( | ||
| profile_id="test_profile", | ||
| folder_name="test_folder", | ||
| folder_id="test_id", | ||
| do=0, | ||
| status=1, | ||
| hostnames=rules, | ||
| existing_rules=existing_rules, | ||
| client=self.mock_client | ||
| ) | ||
|
|
||
| # Assert: ThreadPoolExecutor should NOT be called | ||
| mock_executor.assert_not_called() | ||
|
|
||
| # Verify functionality: client.post should be called once (via process_batch inside loop or direct) | ||
Check warning: Code scanning / Pylint (reported by Codacy) — Line too long (107/100) (Warning, test)
Line too long (107/100)
Check warning: Code scanning / Pylintpython3 (reported by Codacy) — Line too long (107/100) (Warning, test)
Line too long (107/100)
|
||
| self.assertEqual(self.mock_client.post.call_count, 1) | ||
|
|
||
| @patch('concurrent.futures.as_completed') | ||
| @patch('concurrent.futures.ThreadPoolExecutor') | ||
| def test_multi_batch_uses_thread_pool(self, mock_executor, mock_as_completed): | ||
| """Test that multiple batch pushes DO create a ThreadPoolExecutor.""" | ||
| # Setup: > 500 rules | ||
| rules = [f"rule{i}.com" for i in range(1000)] | ||
| existing_rules = set() | ||
|
|
||
| # Mock the executor context manager | ||
| mock_executor_instance = mock_executor.return_value | ||
| mock_executor_instance.__enter__.return_value = mock_executor_instance | ||
|
|
||
| # Mock submit to return a future | ||
| mock_future = MagicMock() | ||
| mock_future.result.return_value = ["some_rules"] | ||
| mock_executor_instance.submit.return_value = mock_future | ||
|
|
||
| # Mock as_completed to return 2 futures (2 batches) | ||
| mock_as_completed.return_value = [mock_future, mock_future] | ||
|
|
||
|
Comment on lines
+68
to
+75
|
||
| # Execute | ||
| main.push_rules( | ||
| profile_id="test_profile", | ||
| folder_name="test_folder", | ||
| folder_id="test_id", | ||
| do=0, | ||
| status=1, | ||
| hostnames=rules, | ||
| existing_rules=existing_rules, | ||
| client=self.mock_client | ||
| ) | ||
|
|
||
| # Assert: ThreadPoolExecutor SHOULD be called | ||
| mock_executor.assert_called() | ||
Check notice
Code scanning / Remark-lint (reported by Codacy)
Warn when references to undefined definitions are found. Note