From 57857503995090fa8f76cedfbcbf9ddb69fb9570 Mon Sep 17 00:00:00 2001 From: Adithya Raja Date: Wed, 20 May 2026 18:22:12 +0530 Subject: [PATCH 1/6] fix: add semaphore-bounded fan-out to storage executor downloads Prevents storage_executor (96 workers) from being saturated when multiple users trigger large audio merges simultaneously. Caps concurrent chunk downloads at 20 and precache operations at 20. Fixes #7387 Co-Authored-By: Claude Opus 4.6 --- backend/utils/other/storage.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/backend/utils/other/storage.py b/backend/utils/other/storage.py index 6c97802d6ce..3a96450385b 100644 --- a/backend/utils/other/storage.py +++ b/backend/utils/other/storage.py @@ -3,12 +3,15 @@ import json import os import struct +import threading import wave from typing import List from concurrent.futures import as_completed from utils.executors import postprocess_executor, storage_executor +_STORAGE_FANOUT_SEMAPHORE = threading.Semaphore(20) + import opuslib from google.cloud import storage from google.oauth2 import service_account @@ -760,11 +763,20 @@ def download_single_chunk(timestamp: float) -> tuple[float, bytes | None]: individual_timestamps = [ts for ts in timestamps if round(ts, 3) not in ts_to_batch_path] unique_batch_paths = set(ts_to_batch_path.values()) - # Submit individual chunk downloads via shared storage executor - individual_futures = {storage_executor.submit(download_single_chunk, ts): ts for ts in individual_timestamps} + def _bounded_download_single_chunk(ts): + with _STORAGE_FANOUT_SEMAPHORE: + return download_single_chunk(ts) + + def _bounded_download_and_decode_blob(path): + with _STORAGE_FANOUT_SEMAPHORE: + return _download_and_decode_blob(path) - # Submit batch blob downloads (once per unique path) - batch_futures = {storage_executor.submit(_download_and_decode_blob, path): path for path in unique_batch_paths} + individual_futures = { + storage_executor.submit(_bounded_download_single_chunk, ts): ts for ts in individual_timestamps + } + batch_futures = { + storage_executor.submit(_bounded_download_and_decode_blob, path): path for path in unique_batch_paths + } # Collect individual results for future in as_completed(individual_futures): @@ -994,7 +1006,11 @@ def _cache_single(af): except Exception as e: logger.error(f"[PRECACHE] Error caching audio file {af.get('id')}: {e}") - futures = [storage_executor.submit(_cache_single, af) for af in audio_files] + def _bounded_cache_single(af): + with _STORAGE_FANOUT_SEMAPHORE: + return _cache_single(af) + + futures = [storage_executor.submit(_bounded_cache_single, af) for af in audio_files] for future in as_completed(futures): try: future.result() From 323b63d84b24047a6f6f105e517a61b8d8be6811 Mon Sep 17 00:00:00 2001 From: Adithya Raja Date: Wed, 20 May 2026 18:27:27 +0530 Subject: [PATCH 2/6] fix: bound sync precache fan-out with per-request semaphores Limits concurrent storage_executor submissions for audio precaching to 10 per request, preventing pool saturation during large syncs. Fixes #7387 Co-Authored-By: Claude Opus 4.6 --- backend/routers/sync.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/backend/routers/sync.py b/backend/routers/sync.py index 4cd3e72fe51..5aa0f8f65e5 100644 --- a/backend/routers/sync.py +++ b/backend/routers/sync.py @@ -202,12 +202,16 @@ def precache_conversation_audio_endpoint( if not audio_files: return {"status": "no_audio", "message": "No audio files in conversation"} - # Start background parallel pre-caching for all audio files using storage_executor + _precache_sem = threading.Semaphore(10) + def _precache_all_parallel(): logger.info(f"Pre-caching all {len(audio_files)} audio files for conversation {conversation_id} (parallel)") - futures = [ - submit_with_context(storage_executor, _precache_audio_file, uid, conversation_id, af) for af in audio_files - ] + + def _bounded_precache(af): + with _precache_sem: + return _precache_audio_file(uid, conversation_id, af) + + futures = [submit_with_context(storage_executor, _bounded_precache, af) for af in audio_files] for future in futures: try: future.result() @@ -300,14 +304,15 @@ def get_audio_signed_urls_endpoint( ) uncached_files.append(af) - # Cache remaining files in background if uncached_files: + _uncached_sem = threading.Semaphore(10) def _cache_uncached_parallel(): - futures = [ - submit_with_context(storage_executor, _precache_audio_file, uid, conversation_id, af) - for af in uncached_files - ] + def _bounded_precache(af): + with _uncached_sem: + return _precache_audio_file(uid, conversation_id, af) + + futures = [submit_with_context(storage_executor, _bounded_precache, af) for af in uncached_files] for future in futures: try: future.result() From cf1dd267512773bee71118cea4ca39ade2fec000 Mon Sep 17 00:00:00 2001 From: Adithya Raja Date: Wed, 20 May 2026 18:29:40 +0530 Subject: [PATCH 3/6] fix: bound knowledge graph rebuild fan-out with semaphore Limits concurrent memory processing in rebuild_knowledge_graph to 10, preventing storage_executor saturation when rebuilding large graphs. Fixes #7387 Co-Authored-By: Claude Opus 4.6 --- backend/utils/llm/knowledge_graph.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/backend/utils/llm/knowledge_graph.py b/backend/utils/llm/knowledge_graph.py index 02f13c5653f..05d363c0c5d 100644 --- a/backend/utils/llm/knowledge_graph.py +++ b/backend/utils/llm/knowledge_graph.py @@ -260,7 +260,13 @@ def process_memory(memory): all_nodes = [] all_edges = [] - futures = [storage_executor.submit(process_memory, m) for m in memories] + _kg_sem = threading.Semaphore(10) + + def _bounded_process_memory(m): + with _kg_sem: + return process_memory(m) + + futures = [storage_executor.submit(_bounded_process_memory, m) for m in memories] for future in as_completed(futures): try: result = future.result() From 6a847ad84396cda7eff10ad3de340f96e38ac9d1 Mon Sep 17 00:00:00 2001 From: Adithya Raja Date: Wed, 20 May 2026 18:29:53 +0530 Subject: [PATCH 4/6] test: add unit tests for storage executor fan-out semaphore Validates semaphore concurrency limiting, deadlock prevention, and AST-level presence of _STORAGE_FANOUT_SEMAPHORE in storage module. Co-Authored-By: Claude Opus 4.6 --- .../tests/unit/test_storage_fanout_limit.py | 130 ++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 backend/tests/unit/test_storage_fanout_limit.py diff --git a/backend/tests/unit/test_storage_fanout_limit.py b/backend/tests/unit/test_storage_fanout_limit.py new file mode 100644 index 00000000000..eb42d72d390 --- /dev/null +++ b/backend/tests/unit/test_storage_fanout_limit.py @@ -0,0 +1,130 @@ +"""Tests for bounded fan-out in storage executor submissions (issue #7387). + +Verifies that concurrent storage_executor submissions are capped by +threading.Semaphore to prevent queue spikes from unbounded fan-out. +""" + +import threading +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from unittest.mock import patch, MagicMock + +import pytest + + +class TestStorageFanoutSemaphore: + """Verify that _STORAGE_FANOUT_SEMAPHORE limits concurrent GCS operations.""" + + def test_semaphore_limits_concurrency(self): + """Fan-out of 50 tasks with semaphore(5) should never exceed 5 concurrent.""" + sem = threading.Semaphore(5) + max_concurrent = 0 + current = 0 + lock = threading.Lock() + + def work(i): + nonlocal max_concurrent, current + with sem: + with lock: + current += 1 + if current > max_concurrent: + max_concurrent = current + time.sleep(0.01) + with lock: + current -= 1 + return i + + executor = ThreadPoolExecutor(max_workers=20) + futures = [executor.submit(work, i) for i in range(50)] + for f in as_completed(futures): + f.result() + executor.shutdown(wait=True) + + assert max_concurrent <= 5, f"Max concurrent was {max_concurrent}, expected <= 5" + assert max_concurrent >= 2, "Semaphore should allow some parallelism" + + def test_semaphore_does_not_deadlock(self): + """Nested semaphore acquisition (precache -> chunk download) must not deadlock.""" + outer_sem = threading.Semaphore(3) + inner_sem = threading.Semaphore(3) + results = [] + lock = threading.Lock() + + def inner_work(val): + with inner_sem: + time.sleep(0.005) + return val * 2 + + def outer_work(i): + with outer_sem: + inner_executor = ThreadPoolExecutor(max_workers=2) + futs = [inner_executor.submit(inner_work, j) for j in range(3)] + vals = [f.result(timeout=5) for f in futs] + inner_executor.shutdown(wait=True) + with lock: + results.append(sum(vals)) + return i + + executor = ThreadPoolExecutor(max_workers=10) + futures = [executor.submit(outer_work, i) for i in range(6)] + for f in as_completed(futures): + f.result(timeout=10) + executor.shutdown(wait=True) + + assert len(results) == 6 + assert all(r == 6 for r in results) # 0*2 + 1*2 + 2*2 = 6 + + def test_storage_module_has_semaphore(self): + """The storage module must define a _STORAGE_FANOUT_SEMAPHORE. + + Uses ast parsing to avoid importing heavy native deps (opuslib, GCS). + """ + import ast + import pathlib + + storage_path = pathlib.Path(__file__).resolve().parents[2] / 'utils' / 'other' / 'storage.py' + tree = ast.parse(storage_path.read_text()) + + semaphore_found = False + semaphore_value = None + for node in ast.walk(tree): + if isinstance(node, ast.Assign): + for target in node.targets: + if isinstance(target, ast.Name) and target.id == '_STORAGE_FANOUT_SEMAPHORE': + semaphore_found = True + if isinstance(node.value, ast.Call) and node.value.args: + arg = node.value.args[0] + if isinstance(arg, ast.Constant): + semaphore_value = arg.value + + assert semaphore_found, "_STORAGE_FANOUT_SEMAPHORE not found in storage.py" + assert semaphore_value is not None, "Could not determine semaphore value" + assert 5 <= semaphore_value <= 30, f"Semaphore value {semaphore_value} outside expected range [5, 30]" + + def test_bounded_fan_out_caps_queue_depth(self): + """Simulate the storage pattern: N tasks submitted, only K run concurrently.""" + sem = threading.Semaphore(10) + task_count = 100 + peak_queue = 0 + running = 0 + lock = threading.Lock() + + def bounded_work(i): + nonlocal peak_queue, running + with sem: + with lock: + running += 1 + if running > peak_queue: + peak_queue = running + time.sleep(0.005) + with lock: + running -= 1 + return i + + executor = ThreadPoolExecutor(max_workers=50) + futures = [executor.submit(bounded_work, i) for i in range(task_count)] + results = [f.result() for f in as_completed(futures)] + executor.shutdown(wait=True) + + assert len(results) == task_count + assert peak_queue <= 10, f"Peak concurrent {peak_queue} exceeded semaphore limit 10" From 4a41fb6a2219ae2c90c3cdab4267d8626d196546 Mon Sep 17 00:00:00 2001 From: Adithya Raja Date: Wed, 20 May 2026 18:29:58 +0530 Subject: [PATCH 5/6] chore: add test_storage_fanout_limit to test runner Co-Authored-By: Claude Opus 4.6 --- backend/test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/test.sh b/backend/test.sh index afcc8870be2..014bb025ff8 100755 --- a/backend/test.sh +++ b/backend/test.sh @@ -119,6 +119,7 @@ pytest tests/unit/test_paywall_reconnect_gate.py -v pytest tests/unit/test_trial_metadata.py -v pytest tests/unit/test_vertex_ai_system_role.py -v pytest tests/unit/test_tts.py -v +pytest tests/unit/test_storage_fanout_limit.py -v # Fair-use integration tests (require Redis; skip gracefully if unavailable) if redis-cli ping >/dev/null 2>&1; then From 5484493d40bde43d577840341f80d19d98df6338 Mon Sep 17 00:00:00 2001 From: Adithya Raja Date: Sat, 30 May 2026 20:14:36 +0530 Subject: [PATCH 6/6] Fix Black formatting: remove extra blank lines Co-Authored-By: Claude Opus 4.6 --- backend/routers/sync.py | 1 - backend/utils/llm/knowledge_graph.py | 1 - backend/utils/other/storage.py | 2 -- 3 files changed, 4 deletions(-) diff --git a/backend/routers/sync.py b/backend/routers/sync.py index b83e649e3ae..d6061eb859f 100644 --- a/backend/routers/sync.py +++ b/backend/routers/sync.py @@ -205,7 +205,6 @@ def precache_conversation_audio_endpoint( if not audio_files: return {"status": "no_audio", "message": "No audio files in conversation"} - _precache_sem = threading.Semaphore(10) def _precache_all_parallel(): diff --git a/backend/utils/llm/knowledge_graph.py b/backend/utils/llm/knowledge_graph.py index c5fa0480223..271f4272ea7 100644 --- a/backend/utils/llm/knowledge_graph.py +++ b/backend/utils/llm/knowledge_graph.py @@ -262,7 +262,6 @@ def process_memory(memory): all_nodes = [] all_edges = [] - _kg_sem = threading.Semaphore(10) def _bounded_process_memory(m): diff --git a/backend/utils/other/storage.py b/backend/utils/other/storage.py index be4970e6018..3c893905c04 100644 --- a/backend/utils/other/storage.py +++ b/backend/utils/other/storage.py @@ -798,7 +798,6 @@ def _bounded_download_and_decode_blob(path): # Build unified job list: ('individual', ts) or ('batch', path) jobs = [('individual', ts) for ts in individual_timestamps] + [('batch', p) for p in unique_batch_paths] - def _submit_job(job): kind, key = job _STORAGE_CHUNK_SEM.acquire() @@ -1077,7 +1076,6 @@ def _cache_single(af): except Exception as e: logger.error(f"[PRECACHE] Error caching audio file {af.get('id')}: {e}") - def _bounded_cache_single(af): with _STORAGE_FANOUT_SEMAPHORE: return _cache_single(af)