Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/fusion-docs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
on:
push:
tags:
- 'v*.*.*'
release:
types: [published]

name: Generate Fusion docs

Expand Down Expand Up @@ -36,6 +35,6 @@ jobs:

- name: Upload release asset
run: |
gh release upload ${{ github.ref_name }} fusion-docs.zip
gh release upload ${{ github.event.release.tag_name }} fusion-docs.zip
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
34 changes: 30 additions & 4 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
name: Publish packages

on:
push:
tags:
- 'v*-rc*'
- 'v*-test*'
- 'v*-alpha*'
- 'v*-beta*'
release:
types: [published]
workflow_dispatch:
Expand Down Expand Up @@ -157,15 +163,17 @@ jobs:
runs-on: ubuntu-latest

permissions:
id-token: write # Required for OIDC trusted publishing
actions: read # Required for actions/download-artifact
contents: read # Required for repository access
id-token: write # Required for OIDC trusted publishing
actions: read # Required for actions/download-artifact
contents: write # Required for gh release create/upload

environment:
name: publish
url: https://pypi.org/p/singlestoredb

steps:
- uses: actions/checkout@v3

- name: Download Linux wheels and sdist
uses: actions/download-artifact@v4
with:
Expand All @@ -184,8 +192,26 @@ jobs:
name: artifacts-macOS
path: dist

- name: Create GitHub Release (test tag)
if: github.event_name == 'push'
env:
GH_TOKEN: ${{ github.token }}
run: |
gh release create "${{ github.ref_name }}" \
--prerelease \
--title "${{ github.ref_name }}" \
--notes "" \
dist/*

- name: Upload assets to existing Release
if: github.event_name == 'release'
env:
GH_TOKEN: ${{ github.token }}
run: |
gh release upload "${{ github.event.release.tag_name }}" dist/* --clobber

- name: Publish to PyPI
if: ${{ github.event_name == 'release' || github.event.inputs.publish_pypi == 'true' }}
if: ${{ github.event_name == 'release' || (github.event_name == 'workflow_dispatch' && github.event.inputs.publish_pypi == 'true') }}
uses: pypa/gh-action-pypi-publish@release/v1

# - name: Publish Conda package
Expand Down
51 changes: 30 additions & 21 deletions singlestoredb/apps/_python_udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
if typing.TYPE_CHECKING:
from ._uvicorn_util import AwaitableUvicornServer

# Keep track of currently running server
# Keep track of currently running server and app
_running_server: 'typing.Optional[AwaitableUvicornServer]' = None
_running_app: typing.Optional[Application] = None

# Maximum number of UDFs allowed
MAX_UDFS_LIMIT = 10
Expand All @@ -21,7 +22,7 @@ async def run_udf_app(
log_level: str = 'error',
kill_existing_app_server: bool = True,
) -> UdfConnectionInfo:
global _running_server
global _running_server, _running_app
from ._uvicorn_util import AwaitableUvicornServer

try:
Expand All @@ -38,6 +39,9 @@ async def run_udf_app(
if _running_server is not None:
await _running_server.shutdown()
_running_server = None
if _running_app is not None:
_running_app.shutdown()
_running_app = None
Comment thread
cursor[bot] marked this conversation as resolved.

# Kill if any other process is occupying the port
kill_process_by_port(app_config.listen_port)
Expand All @@ -54,27 +58,32 @@ async def run_udf_app(
log_level=log_level,
)

if not app.endpoints:
raise ValueError('You must define at least one function.')
if len(app.endpoints) > MAX_UDFS_LIMIT:
raise ValueError(
f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.',
)

config = uvicorn.Config(
app,
host='0.0.0.0',
port=app_config.listen_port,
log_config=app.get_uvicorn_log_config(),
)
try:
if not app.endpoints:
raise ValueError('You must define at least one function.')
if len(app.endpoints) > MAX_UDFS_LIMIT:
raise ValueError(
f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.',
)

# Register the functions only if the app is running interactively.
if app_config.running_interactively:
app.register_functions(replace=True)
config = uvicorn.Config(
app,
host='0.0.0.0',
port=app_config.listen_port,
log_config=app.get_uvicorn_log_config(),
)

_running_server = AwaitableUvicornServer(config)
asyncio.create_task(_running_server.serve())
await _running_server.wait_for_startup()
# Register the functions only if the app is running interactively.
if app_config.running_interactively:
app.register_functions(replace=True)

_running_app = app
_running_server = AwaitableUvicornServer(config)
asyncio.create_task(_running_server.serve())
await _running_server.wait_for_startup()
except Exception:
app.shutdown()
raise

print(f'Python UDF registered at {base_url}')

Expand Down
60 changes: 52 additions & 8 deletions singlestoredb/functions/ext/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"""
import argparse
import asyncio
import concurrent.futures
import contextvars
import dataclasses
import datetime
Expand Down Expand Up @@ -1000,6 +1001,9 @@ def __init__(
self.log_level = log_level
self.disable_metrics = disable_metrics

self._udf_loop: Optional[asyncio.AbstractEventLoop] = None
self._udf_thread: Optional[threading.Thread] = None

# Configure logging
self._configure_logging()

Expand Down Expand Up @@ -1033,6 +1037,27 @@ def _configure_logging(self) -> None:
# Prevent propagation to avoid duplicate or differently formatted messages
self.logger.propagate = False

def _get_udf_loop(self) -> asyncio.AbstractEventLoop:
"""Get or create the dedicated UDF event loop."""
if self._udf_loop is None:
self._udf_loop = asyncio.new_event_loop()
self._udf_thread = threading.Thread(
target=self._udf_loop.run_forever,
daemon=True,
name='async-udf-loop',
)
self._udf_thread.start()
return self._udf_loop

def shutdown(self) -> None:
"""Shut down the dedicated UDF event loop."""
if self._udf_loop is not None:
self._udf_loop.call_soon_threadsafe(self._udf_loop.stop)
if self._udf_thread is not None:
self._udf_thread.join(timeout=5)
Comment thread
kesmit13 marked this conversation as resolved.
self._udf_loop = None
self._udf_thread = None

def get_uvicorn_log_config(self) -> Dict[str, Any]:
"""
Create uvicorn log config that matches the Application's logging format.
Expand Down Expand Up @@ -1181,6 +1206,7 @@ async def __call__(
try:
all_tasks = []
result = []
udf_future: 'Optional[concurrent.futures.Future[Any]]' = None

cancel_event = threading.Event()

Expand All @@ -1189,15 +1215,23 @@ async def __call__(
func_info['colspec'], b''.join(data),
)

func_task = asyncio.create_task(
func(cancel_event, call_timer, *inputs)
if func_info['is_async']
else to_thread(
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
func_task: 'asyncio.Task[Any]'
if func_info['is_async']:
udf_future = asyncio.run_coroutine_threadsafe(
func(cancel_event, call_timer, *inputs),
self._get_udf_loop(),
)
func_task = asyncio.ensure_future(
asyncio.wrap_future(udf_future),
)
Comment thread
cursor[bot] marked this conversation as resolved.
Comment thread
cursor[bot] marked this conversation as resolved.
else:
func_task = asyncio.create_task(
to_thread(
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
),
),
),
)
)
disconnect_task = asyncio.create_task(
asyncio.sleep(int(1e9))
if ignore_cancel else cancel_on_disconnect(receive),
Expand All @@ -1214,16 +1248,23 @@ async def __call__(
)

await cancel_all_tasks(pending)
if func_task in pending and udf_future is not None:
cancel_event.set()
udf_future.cancel()

for task in done:
if task is disconnect_task:
cancel_event.set()
if udf_future is not None:
udf_future.cancel()
raise asyncio.CancelledError(
'Function call was cancelled by client disconnect',
)

elif task is timeout_task:
cancel_event.set()
if udf_future is not None:
udf_future.cancel()
raise asyncio.TimeoutError(
'Function call was cancelled due to timeout',
)
Expand Down Expand Up @@ -1286,6 +1327,9 @@ async def __call__(
await send(self.error_response_dict)

finally:
if udf_future is not None:
cancel_event.set()
udf_future.cancel()
Comment thread
cursor[bot] marked this conversation as resolved.
await cancel_all_tasks(all_tasks)

# Handle api reflection
Expand Down
34 changes: 18 additions & 16 deletions singlestoredb/tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
try:
import pandas as pd
has_pandas = True
_pd_str_dtype = str(pd.DataFrame({'a': ['x']}).dtypes['a'])
except ImportError:
has_pandas = False
_pd_str_dtype = 'object'


class TestConnection(unittest.TestCase):
Expand Down Expand Up @@ -1124,21 +1126,21 @@ def test_alltypes_pandas(self):
('timestamp', 'datetime64[us]'),
('timestamp_6', 'datetime64[us]'),
('year', 'float64'),
('char_100', 'object'),
('char_100', _pd_str_dtype),
('binary_100', 'object'),
('varchar_200', 'object'),
('varchar_200', _pd_str_dtype),
('varbinary_200', 'object'),
('longtext', 'object'),
('mediumtext', 'object'),
('text', 'object'),
('tinytext', 'object'),
('longtext', _pd_str_dtype),
('mediumtext', _pd_str_dtype),
('text', _pd_str_dtype),
('tinytext', _pd_str_dtype),
('longblob', 'object'),
('mediumblob', 'object'),
('blob', 'object'),
('tinyblob', 'object'),
('json', 'object'),
('enum', 'object'),
('set', 'object'),
('enum', _pd_str_dtype),
('set', _pd_str_dtype),
('bit', 'object'),
]

Expand Down Expand Up @@ -1266,21 +1268,21 @@ def test_alltypes_no_nulls_pandas(self):
('timestamp', 'datetime64[us]'),
('timestamp_6', 'datetime64[us]'),
('year', 'int16'),
('char_100', 'object'),
('char_100', _pd_str_dtype),
('binary_100', 'object'),
('varchar_200', 'object'),
('varchar_200', _pd_str_dtype),
('varbinary_200', 'object'),
('longtext', 'object'),
('mediumtext', 'object'),
('text', 'object'),
('tinytext', 'object'),
('longtext', _pd_str_dtype),
('mediumtext', _pd_str_dtype),
('text', _pd_str_dtype),
('tinytext', _pd_str_dtype),
('longblob', 'object'),
('mediumblob', 'object'),
('blob', 'object'),
('tinyblob', 'object'),
('json', 'object'),
('enum', 'object'),
('set', 'object'),
('enum', _pd_str_dtype),
('set', _pd_str_dtype),
('bit', 'object'),
]

Expand Down
Loading