From c1ed7fa6b36aa48eb2be97ebac0d108f6884e05b Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 4 Nov 2025 15:43:00 +0000 Subject: [PATCH 1/2] use `shared` mode `ApifyStorageClient` for test_concurrent_processing_simulation --- .../integration/actor/test_actor_request_queue.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/integration/actor/test_actor_request_queue.py b/tests/integration/actor/test_actor_request_queue.py index a7a6059f..cfe3deb1 100644 --- a/tests/integration/actor/test_actor_request_queue.py +++ b/tests/integration/actor/test_actor_request_queue.py @@ -3,8 +3,6 @@ import asyncio from typing import TYPE_CHECKING -import pytest - from .._utils import generate_unique_resource_name from apify import Actor from apify._models import ActorRun @@ -387,9 +385,6 @@ async def main() -> None: assert run_result.status == 'SUCCEEDED' -@pytest.mark.skip( - reason='The Apify RQ client is not resilient to concurrent processing, making this test flaky. See issue #529.' -) async def test_concurrent_processing_simulation( make_actor: MakeActorFunction, run_actor: RunActorFunction, @@ -397,6 +392,16 @@ async def test_concurrent_processing_simulation( """Test simulation of concurrent request processing.""" async def main() -> None: + from crawlee import service_locator + + from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient + + # Use `request_queue_access='shared' for concurrent access` + service_locator.set_storage_client( + SmartApifyStorageClient( + cloud_storage_client=ApifyStorageClient(request_queue_access='shared'), + ) + ) async with Actor: rq = await Actor.open_request_queue() Actor.log.info('Request queue opened') From eeac476e0235f83852dd7ea633018a1a768fd6a6 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Mon, 17 Nov 2025 19:17:57 +0000 Subject: [PATCH 2/2] fix error branch --- tests/integration/actor/test_actor_request_queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/integration/actor/test_actor_request_queue.py b/tests/integration/actor/test_actor_request_queue.py index cfe3deb1..1cc4c543 100644 --- a/tests/integration/actor/test_actor_request_queue.py +++ b/tests/integration/actor/test_actor_request_queue.py @@ -417,18 +417,21 @@ async def main() -> None: # Simulate concurrent workers async def worker() -> int: processed = 0 + request_counter = 0 while request := await rq.fetch_next_request(): # Simulate some work await asyncio.sleep(0.01) # Randomly reclaim some requests (simulate failures) - if processed % 7 == 0 and processed > 0: # Reclaim every 7th request + if request_counter % 5 == 0 and request_counter > 0: # Reclaim every 5th request await rq.reclaim_request(request) else: await rq.mark_request_as_handled(request) processed += 1 + request_counter += 1 + return processed # Run multiple workers concurrently