Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ env:
WEAVIATE_128: 1.28.16
WEAVIATE_129: 1.29.11
WEAVIATE_130: 1.30.22
WEAVIATE_131: 1.31.20
WEAVIATE_132: 1.32.23
WEAVIATE_133: 1.33.10
WEAVIATE_134: 1.34.5
WEAVIATE_135: 1.35.16-efdedfa
WEAVIATE_136: 1.36.9-d905e6c
WEAVIATE_137: 1.37.0-rc.0-b313954.amd64

WEAVIATE_131: 1.31.22
WEAVIATE_132: 1.32.27
WEAVIATE_133: 1.33.18
WEAVIATE_134: 1.34.19
WEAVIATE_135: 1.35.15
WEAVIATE_136: 1.36.6-8edcf08.amd64
WEAVIATE_137: 1.37.0-dev-29d5c87.amd64

jobs:
lint-and-format:
name: Run Linter and Formatter
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
Expand All @@ -60,6 +60,7 @@ jobs:
type-checking:
name: Run Type Checking
runs-on: ubuntu-latest
timeout-minutes: 5
strategy:
fail-fast: false
matrix:
Expand All @@ -80,6 +81,7 @@ jobs:
unit-tests:
name: Run Unit Tests
runs-on: ubuntu-latest
timeout-minutes: 5
strategy:
fail-fast: false
matrix:
Expand All @@ -104,6 +106,7 @@ jobs:
proto-test:
name: Run importing protos test
runs-on: ubuntu-latest
timeout-minutes: 5
strategy:
fail-fast: false
matrix:
Expand All @@ -124,6 +127,7 @@ jobs:
integration-tests-embedded:
name: Run Integration Tests Embedded
runs-on: ubuntu-latest
timeout-minutes: 30
strategy:
matrix:
version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
Expand Down Expand Up @@ -153,6 +157,7 @@ jobs:
integration-tests:
name: Run Integration Tests
runs-on: ubuntu-latest
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -208,6 +213,7 @@ jobs:
journey-tests:
name: Run Journey Tests
runs-on: ubuntu-latest
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -243,6 +249,7 @@ jobs:
Codecov:
needs: [Unit-Tests, Integration-Tests]
runs-on: ubuntu-latest
timeout-minutes: 5
if: github.ref_name != 'main' && !github.event.pull_request.head.repo.fork
steps:
- uses: actions/checkout@v4
Expand Down Expand Up @@ -273,6 +280,7 @@ jobs:
build-package:
name: Build package
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -297,6 +305,7 @@ jobs:
test-package:
needs: [build-package]
runs-on: ubuntu-latest
timeout-minutes: 30
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -341,6 +350,7 @@ jobs:
name: Build and publish Python 🐍 distributions 📦 to PyPI and TestPyPI
needs: [integration-tests, unit-tests, lint-and-format, type-checking, test-package, proto-test]
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -366,6 +376,7 @@ jobs:
name: Create a GitHub Release on new tags
if: startsWith(github.ref, 'refs/tags')
runs-on: ubuntu-latest
timeout-minutes: 5
needs: [build-and-publish]
steps:
- name: Download build artifact to append to release
Expand Down
59 changes: 59 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import faulthandler
import os
import threading

import pytest

# Watchdog limit applied to tests that carry no explicit @pytest.mark.timeout.
DEFAULT_TIMEOUT = 300  # 5 minutes

# Timer armed for the test currently in flight (one per worker process).
# A single slot suffices because tests run sequentially within a worker;
# pytest_runtest_teardown cancels and clears it after every test.
_timeout_timer: threading.Timer | None = None


def _get_timeout(item: pytest.Item) -> float:
    """Resolve the timeout in seconds for *item*.

    The closest ``timeout`` marker's first positional argument wins;
    otherwise fall back to ``DEFAULT_TIMEOUT``.
    """
    marker = item.get_closest_marker("timeout")
    has_explicit_timeout = marker is not None and bool(marker.args)
    return float(marker.args[0]) if has_explicit_timeout else float(DEFAULT_TIMEOUT)


def pytest_runtest_setup(item: pytest.Item) -> None:
    """Start a watchdog timer that dumps all thread stack traces on timeout.

    Unlike pytest-timeout, this does NOT raise KeyboardInterrupt (which crashes
    xdist workers and corrupts asyncio event loops). Instead it:
    1. Writes the test name + all thread tracebacks directly to fd 2 (stderr).
       With --capture=sys in pytest.ini, fd 2 is the real stderr (not captured),
       so the output goes directly to the CI log even under xdist.
    2. Calls os._exit(1) to terminate the worker process.

    xdist will report 'node down: Not properly terminated' which is expected —
    the diagnostic output will already be in the CI logs above that message.

    Args:
        item: the test about to run. Its closest ``timeout`` marker (if any)
            overrides ``DEFAULT_TIMEOUT``; a timeout <= 0 disables the
            watchdog for this test.
    """
    global _timeout_timer
    # Defensive: if a previous timer was never cancelled (e.g. teardown was
    # skipped by an internal error or interrupt), cancel it now so it cannot
    # fire later and kill the worker in the middle of an unrelated test.
    if _timeout_timer is not None:
        _timeout_timer.cancel()
        _timeout_timer = None

    timeout = _get_timeout(item)
    if timeout <= 0:
        return

    def _on_timeout() -> None:
        banner = "=" * 70
        # os.write on fd 2 bypasses sys.stderr, which pytest may have captured.
        os.write(2, f"\n\n{banner}\n".encode())
        os.write(2, f"TIMEOUT: {item.nodeid} exceeded {timeout}s\n".encode())
        os.write(2, f"{banner}\n\n".encode())
        # faulthandler needs a file object — wrap a dup of fd 2 to avoid closing it
        with os.fdopen(os.dup(2), "w") as f:
            faulthandler.dump_traceback(file=f)
            f.flush()
        os.write(2, f"\n{banner}\n\n".encode())
        os._exit(1)

    _timeout_timer = threading.Timer(timeout, _on_timeout)
    _timeout_timer.daemon = True
    _timeout_timer.start()


def pytest_runtest_teardown(item: pytest.Item, nextitem: pytest.Item | None) -> None:
    """Stop and discard the watchdog timer once the test has finished."""
    global _timeout_timer
    timer, _timeout_timer = _timeout_timer, None
    if timer is not None:
        timer.cancel()
4 changes: 4 additions & 0 deletions integration/test_batch_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ def test_add_ref_batch_with_tenant(client_factory: ClientFactory) -> None:
assert ret_obj.references["test"].objects[0].uuid == obj[0]


@pytest.mark.timeout(600)
@pytest.mark.parametrize(
"batching_method",
[
Expand Down Expand Up @@ -717,6 +718,7 @@ def test_non_existant_collection(client_factory: ClientFactory) -> None:
# not, so we do not check for errors here


@pytest.mark.timeout(600)
def test_number_of_stored_results_in_batch(client_factory: ClientFactory) -> None:
client, name = client_factory()
with client.batch.dynamic() as batch:
Expand Down Expand Up @@ -816,6 +818,7 @@ def test_references_with_to_uuids(client_factory: ClientFactory) -> None:


@pytest.mark.asyncio
@pytest.mark.timeout(600)
async def test_add_one_hundred_thousand_objects_async_client(
async_client_factory: AsyncClientFactory,
) -> None:
Expand Down Expand Up @@ -846,6 +849,7 @@ async def test_add_one_hundred_thousand_objects_async_client(
await client.collections.delete(name)


@pytest.mark.timeout(600)
def test_add_one_hundred_thousand_objects_sync_client(
client_factory: ClientFactory,
) -> None:
Expand Down
3 changes: 3 additions & 0 deletions integration/test_collection_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ def test_non_existant_collection(collection_factory_get: CollectionFactoryGet) -


@pytest.mark.asyncio
@pytest.mark.timeout(600)
async def test_batch_one_hundred_thousand_objects_async_collection(
batch_collection_async: BatchCollectionAsync,
) -> None:
Expand Down Expand Up @@ -298,6 +299,7 @@ async def test_batch_one_hundred_thousand_objects_async_collection(


@pytest.mark.asyncio
@pytest.mark.timeout(600)
async def test_ingest_one_hundred_thousand_data_objects_async(
batch_collection_async: BatchCollectionAsync,
) -> None:
Expand All @@ -319,6 +321,7 @@ async def test_ingest_one_hundred_thousand_data_objects_async(
assert len(results.errors) == 0, [obj.message for obj in results.errors.values()]


@pytest.mark.timeout(600)
def test_ingest_one_hundred_thousand_data_objects(
batch_collection: BatchCollection,
) -> None:
Expand Down
48 changes: 47 additions & 1 deletion mock_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import time
from concurrent import futures
from typing import Generator, Mapping
from typing import AsyncGenerator, Generator, Mapping

import grpc
import pytest
Expand Down Expand Up @@ -141,6 +141,16 @@ def weaviate_client(
client.close()


@pytest.fixture(scope="function")
async def weaviate_client_async(
    weaviate_mock: HTTPServer, start_grpc_server: grpc.Server
) -> AsyncGenerator[weaviate.WeaviateAsyncClient, None]:
    """Yield a connected async Weaviate client pointed at the mock HTTP/gRPC servers."""
    async_client = weaviate.use_async_with_local(
        host=MOCK_IP, port=MOCK_PORT, grpc_port=MOCK_PORT_GRPC
    )
    await async_client.connect()
    yield async_client
    await async_client.close()


@pytest.fixture(scope="function")
def weaviate_timeouts_client(
weaviate_timeouts_mock: HTTPServer, start_grpc_server: grpc.Server
Expand Down Expand Up @@ -368,3 +378,39 @@ def forbidden(
service = MockForbiddenWeaviateService()
weaviate_pb2_grpc.add_WeaviateServicer_to_server(service, start_grpc_server)
return weaviate_client.collections.use("ForbiddenCollection")


class MockWeaviateService(weaviate_pb2_grpc.WeaviateServicer):
    """Servicer whose BatchStream never replies — used to exercise client-side cancellation."""

    def BatchStream(
        self,
        request_iterator: Generator[batch_pb2.BatchStreamRequest, None, None],
        context: grpc.ServicerContext,
    ) -> Generator[batch_pb2.BatchStreamReply, None, None]:
        # Poll until the client cancels or its deadline lapses, then surface
        # the end of the call as a DEADLINE_EXCEEDED error.
        while context.is_active():
            time.sleep(0.1)
        raise grpc.RpcError(grpc.StatusCode.DEADLINE_EXCEEDED, "Deadline exceeded")


@pytest.fixture(scope="function")
def stream_cancel(
    weaviate_client: weaviate.WeaviateClient,
    weaviate_mock: HTTPServer,
    start_grpc_server: grpc.Server,
) -> Generator[weaviate.collections.Collection, None, None]:
    """Yield a collection whose BatchStream never answers, with a 1s insert timeout."""
    collection_name = "StreamCancelCollection"
    # Responding 404 on the schema fetch skips __create_batch_reset vectorizer logic.
    weaviate_mock.expect_request(f"/v1/schema/{collection_name}").respond_with_response(
        Response(status=404)
    )
    weaviate_pb2_grpc.add_WeaviateServicer_to_server(MockWeaviateService(), start_grpc_server)
    short_timeout_client = weaviate.connect_to_local(
        host=MOCK_IP,
        port=MOCK_PORT,
        grpc_port=MOCK_PORT_GRPC,
        additional_config=weaviate.classes.init.AdditionalConfig(
            timeout=weaviate.classes.init.Timeout(insert=1)
        ),
    )
    yield short_timeout_client.collections.use(collection_name)
    short_timeout_client.close()
3 changes: 2 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[pytest]
addopts = -m 'not profiling' --benchmark-skip -l
addopts = -m 'not profiling' --benchmark-skip -l --capture=sys --max-worker-restart=3
markers =
profiling: marks tests that can be profiled
timeout: marks tests with a custom timeout in seconds (default: 300)
asyncio_default_fixture_loop_scope = function
90 changes: 90 additions & 0 deletions test/test_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""Tests for the custom per-test timeout mechanism in conftest.py.

Uses subprocess because the timeout mechanism calls os._exit(1).
"""

import subprocess
import sys
import textwrap
from pathlib import Path

PROJECT_ROOT = Path(__file__).parent.parent


def _run_pytest(tmp_path: Path, test_code: str, *extra_args: str) -> subprocess.CompletedProcess:
    """Run pytest in a subprocess with a copy of our timeout conftest.

    The scratch directory is seeded with the project's conftest.py, a minimal
    pytest.ini, and *test_code* as ``test_it.py``; pytest runs under xdist.
    """
    (tmp_path / "conftest.py").write_text((PROJECT_ROOT / "conftest.py").read_text())
    (tmp_path / "pytest.ini").write_text(
        "[pytest]\naddopts = --capture=sys --max-worker-restart=0\nmarkers =\n timeout: custom timeout\n"
    )
    (tmp_path / "test_it.py").write_text(textwrap.dedent(test_code))
    cmd = [
        sys.executable,
        "-m",
        "pytest",
        "-v",
        "-n",
        "auto",
        "--dist",
        "loadgroup",
        "test_it.py",
        *extra_args,
    ]
    return subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        timeout=60,
        cwd=str(tmp_path),
    )


def test_timeout_prints_test_name_and_stacktrace(tmp_path: Path) -> None:
    """A hanging test is killed and its nodeid plus tracebacks reach stderr."""
    hanging_test = """\
        import time
        import pytest

        @pytest.mark.timeout(2)
        def test_hangs():
            time.sleep(999)
        """
    result = _run_pytest(tmp_path, hanging_test)
    assert "TIMEOUT: test_it.py::test_hangs exceeded 2.0s" in result.stderr
    assert "test_hangs" in result.stderr
    assert result.returncode != 0


def test_fast_test_not_killed(tmp_path: Path) -> None:
    """A test that finishes well within its timeout completes untouched."""
    quick_test = """\
        import pytest

        @pytest.mark.timeout(10)
        def test_fast():
            assert True
        """
    result = _run_pytest(tmp_path, quick_test)
    assert "TIMEOUT" not in result.stderr
    assert result.returncode == 0


def test_timeout_with_passing_and_hanging_test(tmp_path: Path) -> None:
    """A hang in one worker is reported even when another test passes."""
    mixed_tests = """\
        import time
        import pytest

        @pytest.mark.timeout(2)
        def test_hangs_in_worker():
            time.sleep(999)

        def test_passes():
            assert True
        """
    result = _run_pytest(tmp_path, mixed_tests)
    assert "TIMEOUT: test_it.py::test_hangs_in_worker exceeded 2.0s" in result.stderr
    assert "test_hangs_in_worker" in result.stderr
    assert result.returncode != 0
Loading
Loading