Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/fusion-docs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
on:
push:
tags:
- 'v*.*.*'
release:
types: [published]

name: Generate Fusion docs

Expand Down Expand Up @@ -36,6 +35,6 @@ jobs:

- name: Upload release asset
run: |
gh release upload ${{ github.ref_name }} fusion-docs.zip
gh release upload ${{ github.event.release.tag_name }} fusion-docs.zip
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
61 changes: 57 additions & 4 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
name: Publish packages

on:
push:
tags:
- 'v*-rc*'
- 'v*-test*'
- 'v*-alpha*'
- 'v*-beta*'
release:
types: [published]
workflow_dispatch:
Expand Down Expand Up @@ -69,6 +75,33 @@ jobs:
steps:
- uses: actions/checkout@v3

- name: Patch version for pre-release tag
if: github.event_name == 'push'
shell: python
run: |
import re, os, pathlib
tag = os.environ["GITHUB_REF_NAME"].lstrip("v")
patterns = [
(r"^(\d+\.\d+\.\d+)-test(\d+)$", r"\1.dev\2"),
(r"^(\d+\.\d+\.\d+)-alpha(\d+)$", r"\1a\2"),
(r"^(\d+\.\d+\.\d+)-beta(\d+)$", r"\1b\2"),
(r"^(\d+\.\d+\.\d+)-rc(\d+)$", r"\1rc\2"),
]
version = None
for pattern, repl in patterns:
m = re.match(pattern, tag)
if m:
version = re.sub(pattern, repl, tag)
break
if version is None:
print("No pre-release suffix, keeping version as-is")
else:
print(f"Patching version to: {version}")
p = pathlib.Path("pyproject.toml")
content = p.read_text()
content = re.sub(r'^version = ".*"', f'version = "{version}"', content, count=1, flags=re.MULTILINE)
p.write_text(content)

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
Expand Down Expand Up @@ -157,15 +190,17 @@ jobs:
runs-on: ubuntu-latest

permissions:
id-token: write # Required for OIDC trusted publishing
actions: read # Required for actions/download-artifact
contents: read # Required for repository access
id-token: write # Required for OIDC trusted publishing
actions: read # Required for actions/download-artifact
contents: write # Required for gh release create/upload

environment:
name: publish
url: https://pypi.org/p/singlestoredb

steps:
- uses: actions/checkout@v3

- name: Download Linux wheels and sdist
uses: actions/download-artifact@v4
with:
Expand All @@ -184,8 +219,26 @@ jobs:
name: artifacts-macOS
path: dist

- name: Create GitHub Release (test tag)
if: github.event_name == 'push'
env:
GH_TOKEN: ${{ github.token }}
run: |
gh release create "${{ github.ref_name }}" \
--prerelease \
--title "${{ github.ref_name }}" \
--notes "" \
dist/*

- name: Upload assets to existing Release
if: github.event_name == 'release'
env:
GH_TOKEN: ${{ github.token }}
run: |
gh release upload "${{ github.event.release.tag_name }}" dist/* --clobber

- name: Publish to PyPI
if: ${{ github.event_name == 'release' || github.event.inputs.publish_pypi == 'true' }}
if: ${{ github.event_name == 'release' || (github.event_name == 'workflow_dispatch' && github.event.inputs.publish_pypi == 'true') }}
uses: pypa/gh-action-pypi-publish@release/v1

# - name: Publish Conda package
Expand Down
64 changes: 59 additions & 5 deletions singlestoredb/functions/ext/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,56 @@ async def to_thread(
return await loop.run_in_executor(None, func_call)


async def _poll_cancel(cancel_event: threading.Event) -> None:
while not cancel_event.is_set():
await asyncio.sleep(0.1)


async def _cancellable_run(
cancel_event: threading.Event,
coro: Any,
) -> Any:
task = asyncio.create_task(coro)
cancel_check = asyncio.create_task(_poll_cancel(cancel_event))
done, pending = await asyncio.wait(
[task, cancel_check], return_when=asyncio.FIRST_COMPLETED,
)
for p in pending:
p.cancel()
if cancel_check in done:
task.cancel()
raise asyncio.CancelledError()
return task.result()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successful UDF result discarded when cancel races completion

Medium Severity

_cancellable_run unconditionally prioritizes cancellation over a successful result when both tasks complete in the same event loop iteration. asyncio.wait(return_when=FIRST_COMPLETED) can return both task and cancel_check in done if they finish simultaneously. Because if cancel_check in done is checked without first checking whether task also succeeded, a completed UDF result is silently discarded and replaced with CancelledError. Checking task in done first (and returning its result) would preserve the successfully-computed value.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 2f819f5. Configure here.



def _run_with_graceful_shutdown(coro: Any) -> Any:
"""Run a coroutine in a new event loop, draining callbacks before close.

Unlike asyncio.run(), this prevents 'Event loop is closed' errors from
libraries (httpx/anyio) that schedule cleanup callbacks during teardown.
"""
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro)
finally:
try:
pending = asyncio.all_tasks(loop)
if pending:
for task in pending:
task.cancel()
loop.run_until_complete(
asyncio.gather(*pending, return_exceptions=True),
)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
loop.call_soon(loop.stop)
loop.run_forever()
asyncio.set_event_loop(None)
loop.close()


# Use negative values to indicate unsigned ints / binary data / usec time precision
rowdat_1_type_map = {
'bool': ft.LONGLONG,
Expand Down Expand Up @@ -1190,11 +1240,12 @@ async def __call__(
)

func_task = asyncio.create_task(
func(cancel_event, call_timer, *inputs)
if func_info['is_async']
else to_thread(
Comment thread
cursor[bot] marked this conversation as resolved.
lambda: asyncio.run(
func(cancel_event, call_timer, *inputs),
to_thread(
lambda: _run_with_graceful_shutdown(
_cancellable_run(
cancel_event,
func(cancel_event, call_timer, *inputs),
),
),
),
)
Expand All @@ -1214,6 +1265,8 @@ async def __call__(
)

await cancel_all_tasks(pending)
if func_task in pending:
cancel_event.set()

for task in done:
if task is disconnect_task:
Expand Down Expand Up @@ -1286,6 +1339,7 @@ async def __call__(
await send(self.error_response_dict)

finally:
cancel_event.set()
await cancel_all_tasks(all_tasks)

# Handle api reflection
Expand Down
34 changes: 18 additions & 16 deletions singlestoredb/tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
try:
import pandas as pd
has_pandas = True
_pd_str_dtype = str(pd.DataFrame({'a': ['x']}).dtypes['a'])
except ImportError:
has_pandas = False
_pd_str_dtype = 'object'


class TestConnection(unittest.TestCase):
Expand Down Expand Up @@ -1124,21 +1126,21 @@ def test_alltypes_pandas(self):
('timestamp', 'datetime64[us]'),
('timestamp_6', 'datetime64[us]'),
('year', 'float64'),
('char_100', 'object'),
('char_100', _pd_str_dtype),
('binary_100', 'object'),
('varchar_200', 'object'),
('varchar_200', _pd_str_dtype),
('varbinary_200', 'object'),
('longtext', 'object'),
('mediumtext', 'object'),
('text', 'object'),
('tinytext', 'object'),
('longtext', _pd_str_dtype),
('mediumtext', _pd_str_dtype),
('text', _pd_str_dtype),
('tinytext', _pd_str_dtype),
('longblob', 'object'),
('mediumblob', 'object'),
('blob', 'object'),
('tinyblob', 'object'),
('json', 'object'),
('enum', 'object'),
('set', 'object'),
('enum', _pd_str_dtype),
('set', _pd_str_dtype),
('bit', 'object'),
]

Expand Down Expand Up @@ -1266,21 +1268,21 @@ def test_alltypes_no_nulls_pandas(self):
('timestamp', 'datetime64[us]'),
('timestamp_6', 'datetime64[us]'),
('year', 'int16'),
('char_100', 'object'),
('char_100', _pd_str_dtype),
('binary_100', 'object'),
('varchar_200', 'object'),
('varchar_200', _pd_str_dtype),
('varbinary_200', 'object'),
('longtext', 'object'),
('mediumtext', 'object'),
('text', 'object'),
('tinytext', 'object'),
('longtext', _pd_str_dtype),
('mediumtext', _pd_str_dtype),
('text', _pd_str_dtype),
('tinytext', _pd_str_dtype),
('longblob', 'object'),
('mediumblob', 'object'),
('blob', 'object'),
('tinyblob', 'object'),
('json', 'object'),
('enum', 'object'),
('set', 'object'),
('enum', _pd_str_dtype),
('set', _pd_str_dtype),
('bit', 'object'),
]

Expand Down
Loading
Loading