From fa30aa7f24353268d9f9673353eaac3a813f4229 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:07:34 -0500 Subject: [PATCH 01/12] Fix async UDF event loop starvation under heavy load in Jupyter Async UDFs were running directly in uvicorn's event loop via asyncio.create_task, competing with connection handling under heavy concurrent load. This caused unresponsiveness when running from Jupyter notebooks where the event loop is shared. The fix introduces a dedicated event loop in a background thread for async UDF execution. Coroutines are submitted via run_coroutine_threadsafe() and awaited from the server loop, isolating UDF work from HTTP I/O while preserving cooperative async scheduling between UDFs. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 38 +++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index d9090b38f..468c3de91 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1000,6 +1000,15 @@ def __init__( self.log_level = log_level self.disable_metrics = disable_metrics + # Dedicated event loop for async UDF execution, isolated from the server loop + self._udf_loop = asyncio.new_event_loop() + self._udf_thread = threading.Thread( + target=self._udf_loop.run_forever, + daemon=True, + name='async-udf-loop', + ) + self._udf_thread.start() + # Configure logging self._configure_logging() @@ -1033,6 +1042,11 @@ def _configure_logging(self) -> None: # Prevent propagation to avoid duplicate or differently formatted messages self.logger.propagate = False + def shutdown(self) -> None: + """Shut down the dedicated UDF event loop.""" + self._udf_loop.call_soon_threadsafe(self._udf_loop.stop) + self._udf_thread.join(timeout=5) + def get_uvicorn_log_config(self) -> Dict[str, Any]: """ Create uvicorn log config that matches the Application's logging format. @@ -1189,15 +1203,23 @@ async def __call__( func_info['colspec'], b''.join(data), ) - func_task = asyncio.create_task( - func(cancel_event, call_timer, *inputs) - if func_info['is_async'] - else to_thread( - lambda: asyncio.run( - func(cancel_event, call_timer, *inputs), + func_task: 'asyncio.Task[Any]' + if func_info['is_async']: + future = asyncio.run_coroutine_threadsafe( + func(cancel_event, call_timer, *inputs), + self._udf_loop, + ) + func_task = asyncio.create_task( + asyncio.wrap_future(future), # type: ignore[arg-type] + ) + else: + func_task = asyncio.create_task( + to_thread( + lambda: asyncio.run( + func(cancel_event, call_timer, *inputs), + ), ), - ), - ) + ) disconnect_task = asyncio.create_task( asyncio.sleep(int(1e9)) if ignore_cancel else cancel_on_disconnect(receive), From c0d280a985f03b0f396bcf7e225a90eac4f0773b Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:11:01 -0500 Subject: [PATCH 02/12] Ensure proper cancellation of async UDFs in dedicated loop Cancel the concurrent.futures.Future in the UDF loop on disconnect/timeout so the coroutine is interrupted promptly, not just at the next cancel_on_event row check. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 468c3de91..5be68f6c4 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -24,6 +24,7 @@ """ import argparse import asyncio +import concurrent.futures import contextvars import dataclasses import datetime @@ -1204,13 +1205,14 @@ async def __call__( ) func_task: 'asyncio.Task[Any]' + udf_future: 'Optional[concurrent.futures.Future[Any]]' = None if func_info['is_async']: - future = asyncio.run_coroutine_threadsafe( + udf_future = asyncio.run_coroutine_threadsafe( func(cancel_event, call_timer, *inputs), self._udf_loop, ) func_task = asyncio.create_task( - asyncio.wrap_future(future), # type: ignore[arg-type] + asyncio.wrap_future(udf_future), # type: ignore[arg-type] ) else: func_task = asyncio.create_task( @@ -1240,12 +1242,16 @@ async def __call__( for task in done: if task is disconnect_task: cancel_event.set() + if udf_future is not None: + udf_future.cancel() raise asyncio.CancelledError( 'Function call was cancelled by client disconnect', ) elif task is timeout_task: cancel_event.set() + if udf_future is not None: + udf_future.cancel() raise asyncio.TimeoutError( 'Function call was cancelled due to timeout', ) From 6f617d749cc1349ddb65607ccc2dc7b441307cb6 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:19:46 -0500 Subject: [PATCH 03/12] Fix create_task expecting coroutine, use ensure_future for wrapped future asyncio.create_task() requires a coroutine but asyncio.wrap_future() returns a Future. Use asyncio.ensure_future() which accepts both. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 5be68f6c4..40520e9bc 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1211,8 +1211,8 @@ async def __call__( func(cancel_event, call_timer, *inputs), self._udf_loop, ) - func_task = asyncio.create_task( - asyncio.wrap_future(udf_future), # type: ignore[arg-type] + func_task = asyncio.ensure_future( + asyncio.wrap_future(udf_future), ) else: func_task = asyncio.create_task( From efee0a2e4fe424d28c05dd219b197152c7faf919 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:29:05 -0500 Subject: [PATCH 04/12] Fix pandas dtype assertions for newer pandas StringDtype Newer pandas versions use StringDtype ('str') instead of 'object' for string columns. Detect the actual dtype at import time and use it in test assertions. Binary columns remain 'object'. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/tests/test_connection.py | 34 ++++++++++++++------------ 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/singlestoredb/tests/test_connection.py b/singlestoredb/tests/test_connection.py index ee392d06a..2ae5cf1d2 100755 --- a/singlestoredb/tests/test_connection.py +++ b/singlestoredb/tests/test_connection.py @@ -22,8 +22,10 @@ try: import pandas as pd has_pandas = True + _pd_str_dtype = str(pd.DataFrame({'a': ['x']}).dtypes['a']) except ImportError: has_pandas = False + _pd_str_dtype = 'object' class TestConnection(unittest.TestCase): @@ -1124,21 +1126,21 @@ def test_alltypes_pandas(self): ('timestamp', 'datetime64[us]'), ('timestamp_6', 'datetime64[us]'), ('year', 'float64'), - ('char_100', 'object'), + ('char_100', _pd_str_dtype), ('binary_100', 'object'), - ('varchar_200', 'object'), + ('varchar_200', _pd_str_dtype), ('varbinary_200', 'object'), - ('longtext', 'object'), - ('mediumtext', 'object'), - ('text', 'object'), - ('tinytext', 'object'), + ('longtext', _pd_str_dtype), + ('mediumtext', _pd_str_dtype), + ('text', _pd_str_dtype), + ('tinytext', _pd_str_dtype), ('longblob', 'object'), ('mediumblob', 'object'), ('blob', 'object'), ('tinyblob', 'object'), ('json', 'object'), - ('enum', 'object'), - ('set', 'object'), + ('enum', _pd_str_dtype), + ('set', _pd_str_dtype), ('bit', 'object'), ] @@ -1266,21 +1268,21 @@ def test_alltypes_no_nulls_pandas(self): ('timestamp', 'datetime64[us]'), ('timestamp_6', 'datetime64[us]'), ('year', 'int16'), - ('char_100', 'object'), + ('char_100', _pd_str_dtype), ('binary_100', 'object'), - ('varchar_200', 'object'), + ('varchar_200', _pd_str_dtype), ('varbinary_200', 'object'), - ('longtext', 'object'), - ('mediumtext', 'object'), - ('text', 'object'), - ('tinytext', 'object'), + ('longtext', _pd_str_dtype), + ('mediumtext', _pd_str_dtype), + ('text', _pd_str_dtype), + ('tinytext', _pd_str_dtype), ('longblob', 'object'), ('mediumblob', 'object'), ('blob', 'object'), ('tinyblob', 'object'), ('json', 'object'), - ('enum', 'object'), - ('set', 'object'), + ('enum', _pd_str_dtype), + ('set', _pd_str_dtype), ('bit', 'object'), ] From e4cc4c4f61819b3325618c12c7804fac6f3a9450 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 10:43:14 -0500 Subject: [PATCH 05/12] Call Application.shutdown() when replacing UDF server Prevents UDF event loop thread leaks when run_udf_app() is called repeatedly in Jupyter notebooks. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/apps/_python_udfs.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py index e45718dec..7835307cb 100644 --- a/singlestoredb/apps/_python_udfs.py +++ b/singlestoredb/apps/_python_udfs.py @@ -10,8 +10,9 @@ if typing.TYPE_CHECKING: from ._uvicorn_util import AwaitableUvicornServer -# Keep track of currently running server +# Keep track of currently running server and app _running_server: 'typing.Optional[AwaitableUvicornServer]' = None +_running_app: typing.Optional[Application] = None # Maximum number of UDFs allowed MAX_UDFS_LIMIT = 10 @@ -21,7 +22,7 @@ async def run_udf_app( log_level: str = 'error', kill_existing_app_server: bool = True, ) -> UdfConnectionInfo: - global _running_server + global _running_server, _running_app from ._uvicorn_util import AwaitableUvicornServer try: @@ -38,6 +39,9 @@ async def run_udf_app( if _running_server is not None: await _running_server.shutdown() _running_server = None + if _running_app is not None: + _running_app.shutdown() + _running_app = None # Kill if any other process is occupying the port kill_process_by_port(app_config.listen_port) @@ -72,6 +76,7 @@ async def run_udf_app( if app_config.running_interactively: app.register_functions(replace=True) + _running_app = app _running_server = AwaitableUvicornServer(config) asyncio.create_task(_running_server.serve()) await _running_server.wait_for_startup() From a2087db3e240f8a03f29bc9649c433be5957b70d Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 11:01:30 -0500 Subject: [PATCH 06/12] Add pre-release tag builds with GitHub Release assets Tags matching v*-rc*, v*-test*, v*-alpha*, v*-beta* now trigger the full wheel build pipeline and create a pre-release GitHub Release with all wheels attached. Production releases also attach wheels to the existing release before publishing to PyPI. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/publish.yml | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index fe684d0c7..a77917107 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -7,6 +7,12 @@ name: Publish packages on: + push: + tags: + - 'v*-rc*' + - 'v*-test*' + - 'v*-alpha*' + - 'v*-beta*' release: types: [published] workflow_dispatch: @@ -157,9 +163,9 @@ jobs: runs-on: ubuntu-latest permissions: - id-token: write # Required for OIDC trusted publishing - actions: read # Required for actions/download-artifact - contents: read # Required for repository access + id-token: write # Required for OIDC trusted publishing + actions: read # Required for actions/download-artifact + contents: write # Required for gh release create/upload environment: name: publish @@ -184,8 +190,26 @@ jobs: name: artifacts-macOS path: dist + - name: Create GitHub Release (test tag) + if: github.event_name == 'push' + env: + GH_TOKEN: ${{ github.token }} + run: | + gh release create "${{ github.ref_name }}" \ + --prerelease \ + --generate-notes \ + --title "${{ github.ref_name }}" \ + dist/* + + - name: Upload assets to existing Release + if: github.event_name == 'release' + env: + GH_TOKEN: ${{ github.token }} + run: | + gh release upload "${{ github.event.release.tag_name }}" dist/* --clobber + - name: Publish to PyPI - if: ${{ github.event_name == 'release' || github.event.inputs.publish_pypi == 'true' }} + if: ${{ github.event_name == 'release' || (github.event_name == 'workflow_dispatch' && github.event.inputs.publish_pypi == 'true') }} uses: pypa/gh-action-pypi-publish@release/v1 # - name: Publish Conda package From 0d8ef25c05e64b261294b4551c96ca046689d6a1 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 11:23:28 -0500 Subject: [PATCH 07/12] Propagate cancellation to UDF loop and prevent thread leak on failure - Cancel udf_future when func_task is in pending set after asyncio.wait - Cancel udf_future in finally block to ensure cleanup on any exit path - Wrap post-construction code in try/except to call app.shutdown() if validation, config, or registration fails after Application is created Co-Authored-By: Claude Opus 4.6 --- singlestoredb/apps/_python_udfs.py | 44 ++++++++++++++++------------- singlestoredb/functions/ext/asgi.py | 6 ++++ 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/singlestoredb/apps/_python_udfs.py b/singlestoredb/apps/_python_udfs.py index 7835307cb..1039565c4 100644 --- a/singlestoredb/apps/_python_udfs.py +++ b/singlestoredb/apps/_python_udfs.py @@ -58,28 +58,32 @@ async def run_udf_app( log_level=log_level, ) - if not app.endpoints: - raise ValueError('You must define at least one function.') - if len(app.endpoints) > MAX_UDFS_LIMIT: - raise ValueError( - f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.', - ) - - config = uvicorn.Config( - app, - host='0.0.0.0', - port=app_config.listen_port, - log_config=app.get_uvicorn_log_config(), - ) + try: + if not app.endpoints: + raise ValueError('You must define at least one function.') + if len(app.endpoints) > MAX_UDFS_LIMIT: + raise ValueError( + f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.', + ) - # Register the functions only if the app is running interactively. - if app_config.running_interactively: - app.register_functions(replace=True) + config = uvicorn.Config( + app, + host='0.0.0.0', + port=app_config.listen_port, + log_config=app.get_uvicorn_log_config(), + ) - _running_app = app - _running_server = AwaitableUvicornServer(config) - asyncio.create_task(_running_server.serve()) - await _running_server.wait_for_startup() + # Register the functions only if the app is running interactively. + if app_config.running_interactively: + app.register_functions(replace=True) + + _running_app = app + _running_server = AwaitableUvicornServer(config) + asyncio.create_task(_running_server.serve()) + await _running_server.wait_for_startup() + except Exception: + app.shutdown() + raise print(f'Python UDF registered at {base_url}') diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 40520e9bc..8f4535d4d 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1238,6 +1238,9 @@ async def __call__( ) await cancel_all_tasks(pending) + if func_task in pending and udf_future is not None: + cancel_event.set() + udf_future.cancel() for task in done: if task is disconnect_task: @@ -1314,6 +1317,9 @@ async def __call__( await send(self.error_response_dict) finally: + if udf_future is not None: + cancel_event.set() + udf_future.cancel() await cancel_all_tasks(all_tasks) # Handle api reflection From 72b2e3e1ac1efd98350f4ec7f3c81adb404c2a0e Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 11:26:50 -0500 Subject: [PATCH 08/12] Remove checkout and --generate-notes from publish job gh release create doesn't need a git repo when not generating notes. Use --notes "" for empty release body with just assets attached. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/publish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index a77917107..c2165da7e 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -197,8 +197,8 @@ jobs: run: | gh release create "${{ github.ref_name }}" \ --prerelease \ - --generate-notes \ --title "${{ github.ref_name }}" \ + --notes "" \ dist/* - name: Upload assets to existing Release From 35e92d5329fc328379acdc8efd0a3d8bd8659ac0 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 12:00:30 -0500 Subject: [PATCH 09/12] Trigger fusion-docs only on release, not pre-release tags Changed from push tag trigger (v*.*.*) to release event so it only runs on published releases, not rc/test/alpha/beta tags. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/fusion-docs.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/fusion-docs.yml b/.github/workflows/fusion-docs.yml index c82840512..75a74ffa6 100644 --- a/.github/workflows/fusion-docs.yml +++ b/.github/workflows/fusion-docs.yml @@ -1,7 +1,6 @@ on: - push: - tags: - - 'v*.*.*' + release: + types: [published] name: Generate Fusion docs @@ -36,6 +35,6 @@ jobs: - name: Upload release asset run: | - gh release upload ${{ github.ref_name }} fusion-docs.zip + gh release upload ${{ github.event.release.tag_name }} fusion-docs.zip env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From b84d9ead3d191e9b7869a0d98ebadd923d77a001 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 12:03:30 -0500 Subject: [PATCH 10/12] Fix udf_future NameError and lazily initialize UDF event loop - Move udf_future initialization before input_handler['load']() to prevent NameError in finally block if parsing raises - Lazily create UDF event loop on first async UDF invocation instead of unconditionally in __init__, avoiding wasted resources for sync-only or metadata-only usage - Guard shutdown() against None loop/thread Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 32 ++++++++++++++++++----------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 8f4535d4d..431f814f7 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1001,14 +1001,8 @@ def __init__( self.log_level = log_level self.disable_metrics = disable_metrics - # Dedicated event loop for async UDF execution, isolated from the server loop - self._udf_loop = asyncio.new_event_loop() - self._udf_thread = threading.Thread( - target=self._udf_loop.run_forever, - daemon=True, - name='async-udf-loop', - ) - self._udf_thread.start() + self._udf_loop: Optional[asyncio.AbstractEventLoop] = None + self._udf_thread: Optional[threading.Thread] = None # Configure logging self._configure_logging() @@ -1043,10 +1037,24 @@ def _configure_logging(self) -> None: # Prevent propagation to avoid duplicate or differently formatted messages self.logger.propagate = False + def _get_udf_loop(self) -> asyncio.AbstractEventLoop: + """Get or create the dedicated UDF event loop.""" + if self._udf_loop is None: + self._udf_loop = asyncio.new_event_loop() + self._udf_thread = threading.Thread( + target=self._udf_loop.run_forever, + daemon=True, + name='async-udf-loop', + ) + self._udf_thread.start() + return self._udf_loop + def shutdown(self) -> None: """Shut down the dedicated UDF event loop.""" - self._udf_loop.call_soon_threadsafe(self._udf_loop.stop) - self._udf_thread.join(timeout=5) + if self._udf_loop is not None: + self._udf_loop.call_soon_threadsafe(self._udf_loop.stop) + if self._udf_thread is not None: + self._udf_thread.join(timeout=5) def get_uvicorn_log_config(self) -> Dict[str, Any]: """ @@ -1196,6 +1204,7 @@ async def __call__( try: all_tasks = [] result = [] + udf_future: 'Optional[concurrent.futures.Future[Any]]' = None cancel_event = threading.Event() @@ -1205,11 +1214,10 @@ async def __call__( ) func_task: 'asyncio.Task[Any]' - udf_future: 'Optional[concurrent.futures.Future[Any]]' = None if func_info['is_async']: udf_future = asyncio.run_coroutine_threadsafe( func(cancel_event, call_timer, *inputs), - self._udf_loop, + self._get_udf_loop(), ) func_task = asyncio.ensure_future( asyncio.wrap_future(udf_future), From 48f148a8eb0b7ad48913a7cbe75904fced2190c4 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 14:08:01 -0500 Subject: [PATCH 11/12] Add checkout to publish job for gh release create gh release create requires a git repo to determine the repository context even without --generate-notes. Co-Authored-By: Claude Opus 4.6 --- .github/workflows/publish.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index c2165da7e..1a0e6d0a6 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -172,6 +172,8 @@ jobs: url: https://pypi.org/p/singlestoredb steps: + - uses: actions/checkout@v3 + - name: Download Linux wheels and sdist uses: actions/download-artifact@v4 with: From 35a66587eb1ef73ac694747cec99366909a2c95d Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Thu, 14 May 2026 15:18:49 -0500 Subject: [PATCH 12/12] Reset UDF loop state in shutdown() to allow safe reuse After stopping the event loop and joining the thread, set both _udf_loop and _udf_thread back to None so that _get_udf_loop() can safely recreate them if called after shutdown. Co-Authored-By: Claude Opus 4.6 --- singlestoredb/functions/ext/asgi.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/singlestoredb/functions/ext/asgi.py b/singlestoredb/functions/ext/asgi.py index 431f814f7..ea4db0bd2 100755 --- a/singlestoredb/functions/ext/asgi.py +++ b/singlestoredb/functions/ext/asgi.py @@ -1055,6 +1055,8 @@ def shutdown(self) -> None: self._udf_loop.call_soon_threadsafe(self._udf_loop.stop) if self._udf_thread is not None: self._udf_thread.join(timeout=5) + self._udf_loop = None + self._udf_thread = None def get_uvicorn_log_config(self) -> Dict[str, Any]: """