Skip to content

Commit 6c68c8b

Browse files
authored
test: deflake integration tests by polling instead of fixed sleeps (#844)
## Summary Started as a fix for [this flaky CI run](https://github.com/apify/apify-client-python/actions/runs/26818252886/job/79065522664) of `test_request_queue_unlock_requests[sync]` (`assert 2 == 3` on `unlocked_count`, caused by replication lag between `list_and_lock_head` and `unlock_requests`), and grew into deflaking the integration test suite as a whole. **Changes:** - Add a `poll_until_condition` helper to `tests/integration/_utils.py`: polls a sync-or-async callable at a constant interval until a condition holds or a wall-clock timeout expires. An optional `backoff_factor` multiplies the interval after each poll, for highly variable waits (e.g. Actor run container startup) where a growing delay covers a long timeout with few calls. - Fix the flaky unlock test by polling `list_head` until the locked IDs disappear from the queue head before unlocking. - Replace all hand-rolled `for _ in range(5): sleep(1); read; break` polling loops (10×) and single fixed `sleep(1)` waits (26×) across the request queue, dataset, key-value store, and run integration tests with `poll_until_condition`. This makes the tests both faster on the happy path (no unconditional sleep) and more robust under load (polls until the timeout instead of hoping 1 s is enough). - Generalize `maybe_await` to accept any awaitable. The three `iterate_keys` sleeps in the KVS tests are intentionally left as-is: draining an iterator per attempt wants attempt-count semantics (like `collect_iterate_until_present`), not a wall-clock deadline. Follow-up to #786.
1 parent f72b087 commit 6c68c8b

19 files changed

Lines changed: 375 additions & 273 deletions

tests/__init__.py

Whitespace-only changes.
Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import inspect
45
import secrets
56
import string
67
import time
78
from collections.abc import AsyncIterator, Iterator
89
from dataclasses import dataclass
9-
from typing import TYPE_CHECKING, Any, Protocol, TypeVar, overload
10+
from typing import TYPE_CHECKING, Any, Protocol, TypeVar, cast, overload
1011

1112
import pytest
1213

1314
if TYPE_CHECKING:
14-
from collections.abc import Callable, Coroutine
15+
from collections.abc import Awaitable, Callable
1516

1617
# Environment variable names for test configuration
1718
TOKEN_ENV_VAR = 'APIFY_TEST_USER_API_TOKEN'
@@ -92,22 +93,14 @@ def get_random_resource_name(label: str) -> str:
9293
return name_template.format(label, get_crypto_random_object_id(random_id_length))
9394

9495

95-
@overload
96-
async def maybe_await(value: Coroutine[Any, Any, T]) -> T: ...
97-
98-
99-
@overload
100-
async def maybe_await(value: T) -> T: ...
101-
102-
103-
async def maybe_await(value: T | Coroutine[Any, Any, T]) -> T:
104-
"""Await coroutines, pass through other values.
96+
async def maybe_await(value: Awaitable[T] | T) -> T:
97+
"""Await `value` if it is awaitable, otherwise return it unchanged.
10598
10699
Enables unified test code for both sync and async clients:
107100
result = await maybe_await(client.datasets().list())
108101
"""
109-
if hasattr(value, '__await__'):
110-
return await value # ty: ignore[invalid-await]
102+
if inspect.isawaitable(value):
103+
return await cast('Awaitable[T]', value)
111104
return value
112105

113106

@@ -119,6 +112,56 @@ async def maybe_sleep(seconds: float, *, is_async: bool) -> None:
119112
time.sleep(seconds) # noqa: ASYNC251
120113

121114

115+
@overload
116+
async def poll_until_condition(
117+
fn: Callable[[], Awaitable[T]],
118+
condition: Callable[[T], bool] = ...,
119+
*,
120+
timeout: float = ...,
121+
poll_interval: float = ...,
122+
backoff_factor: float = ...,
123+
) -> T: ...
124+
@overload
125+
async def poll_until_condition(
126+
fn: Callable[[], T],
127+
condition: Callable[[T], bool] = ...,
128+
*,
129+
timeout: float = ...,
130+
poll_interval: float = ...,
131+
backoff_factor: float = ...,
132+
) -> T: ...
133+
async def poll_until_condition(
134+
fn: Callable[[], Awaitable[T] | T],
135+
condition: Callable[[T], bool] = bool,
136+
*,
137+
timeout: float = 5,
138+
poll_interval: float = 1,
139+
backoff_factor: float = 1,
140+
) -> T:
141+
"""Poll `fn` until `condition(result)` is True or the timeout expires.
142+
143+
Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed.
144+
Returns the last polled result regardless of whether the condition was met, so the caller can run its own
145+
assertion. The default condition checks for a truthy result.
146+
147+
Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. a freshly
148+
created resource appearing in a listing) that may take a variable amount of time to propagate. For highly
149+
variable wait times (e.g. an Actor run container starting up), pass `backoff_factor` > 1 to multiply the
150+
interval after each poll, covering a long timeout with few calls.
151+
"""
152+
deadline = time.monotonic() + timeout
153+
delay = poll_interval
154+
result = await maybe_await(fn())
155+
while not condition(result):
156+
remaining = deadline - time.monotonic()
157+
if remaining <= 0:
158+
break
159+
await asyncio.sleep(min(delay, remaining))
160+
delay *= backoff_factor
161+
result = await maybe_await(fn())
162+
return result
163+
164+
122165
async def collect_iterate_until_present(
123166
iterator_factory: Callable[[], Iterator[_HasIdT] | AsyncIterator[_HasIdT]],
124167
expected_ids: set[str],
@@ -132,7 +175,7 @@ async def collect_iterate_until_present(
132175
133176
Handles eventual consistency on listing endpoints: under parallel load a freshly
134177
created resource may not appear in the listing for a short window. Each attempt
135-
builds a fresh iterator via `iterator_factory`, drains it, and breaks early once
178+
builds a fresh iterator via `iterator_factory`, drains it, and stops early once
136179
`expected_ids` is a subset of the collected items' `.id` values. The most recent
137180
collection is returned regardless of whether the condition was met, so the caller
138181
can run its own assertion with a helpful failure message.
@@ -142,18 +185,16 @@ async def collect_iterate_until_present(
142185
expected_ids: IDs that must all appear in the collected items.
143186
item_type: Asserted to match the runtime type of each yielded item.
144187
is_async: Whether the iterator is async (and so are sleeps).
145-
max_attempts: Maximum number of polling rounds.
146-
interval: Seconds to sleep before each attempt.
188+
max_attempts: Maximum number of polling rounds, guaranteed regardless of how long each drain takes.
189+
interval: Seconds to sleep between attempts.
147190
148191
Returns:
149192
The most recently collected items.
150193
"""
151-
collected: list[_HasIdT] = []
152-
for attempt in range(max_attempts):
153-
if attempt > 0:
154-
await maybe_sleep(interval, is_async=is_async)
194+
195+
async def drain() -> list[_HasIdT]:
155196
iterator = iterator_factory()
156-
collected = []
197+
collected: list[_HasIdT] = []
157198
if is_async:
158199
assert isinstance(iterator, AsyncIterator)
159200
async for item in iterator:
@@ -164,8 +205,16 @@ async def collect_iterate_until_present(
164205
for item in iterator:
165206
assert isinstance(item, item_type)
166207
collected.append(item)
208+
return collected
209+
210+
# Loop on attempt count rather than a wall-clock deadline: drains take HTTP time, and charging it
211+
# against a deadline would mean fewer retries under load — exactly when they are needed most.
212+
collected = await drain()
213+
for _ in range(max_attempts - 1):
167214
if expected_ids.issubset(item.id for item in collected):
168215
break
216+
await maybe_sleep(interval, is_async=is_async)
217+
collected = await drain()
169218
return collected
170219

171220

tests/integration/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import pytest
88

9-
from ._utils import (
9+
from .._utils import (
1010
API_URL_ENV_VAR,
1111
TOKEN_ENV_VAR,
1212
TOKEN_ENV_VAR_2,

tests/integration/test_actor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from datetime import UTC, datetime, timedelta
77
from typing import TYPE_CHECKING
88

9-
from ._utils import get_random_resource_name, maybe_await
9+
from .._utils import get_random_resource_name, maybe_await
1010
from apify_client._models import (
1111
Actor,
1212
ActorChargeEvent,

tests/integration/test_actor_env_var.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from collections.abc import AsyncIterator, Iterator
66
from typing import TYPE_CHECKING
77

8-
from ._utils import get_random_resource_name, maybe_await
8+
from .._utils import get_random_resource_name, maybe_await
99
from apify_client._models import Actor, EnvVar, ListOfEnvVars
1010

1111
if TYPE_CHECKING:

tests/integration/test_actor_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from collections.abc import AsyncIterator, Iterator
66
from typing import TYPE_CHECKING
77

8-
from ._utils import get_random_resource_name, maybe_await
8+
from .._utils import get_random_resource_name, maybe_await
99
from apify_client._models import Actor, ListOfVersions, Version
1010

1111
if TYPE_CHECKING:

tests/integration/test_apify_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from typing import TYPE_CHECKING
66

7-
from ._utils import maybe_await
7+
from .._utils import maybe_await
88
from apify_client._models import UserPrivateInfo, UserPublicInfo
99

1010
if TYPE_CHECKING:

tests/integration/test_build.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from datetime import timedelta
77
from typing import TYPE_CHECKING
88

9-
from ._utils import get_random_resource_name, maybe_await
9+
from .._utils import get_random_resource_name, maybe_await
1010
from apify_client._models import Actor, Build, BuildShort, ListOfBuilds
1111

1212
if TYPE_CHECKING:

0 commit comments

Comments
 (0)