diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/_utils.py b/tests/_utils.py new file mode 100644 index 00000000..71480921 --- /dev/null +++ b/tests/_utils.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import asyncio +import inspect +import time +from typing import TYPE_CHECKING, TypeVar, cast, overload + +from crawlee._utils.crypto import crypto_random_object_id + +from apify._crypto import _load_public_key, load_private_key + +if TYPE_CHECKING: + from collections.abc import Awaitable, Callable + +T = TypeVar('T') + + +async def maybe_await(value: Awaitable[T] | T) -> T: + """Await `value` if it is awaitable, otherwise return it unchanged. + + Lets `poll_until_condition` accept both sync and async callables. + """ + if inspect.isawaitable(value): + return await cast('Awaitable[T]', value) + return cast('T', value) + + +@overload +async def poll_until_condition( + fn: Callable[[], Awaitable[T]], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., + backoff_factor: float = ..., +) -> T: ... +@overload +async def poll_until_condition( + fn: Callable[[], T], + condition: Callable[[T], bool] = ..., + *, + timeout: float = ..., + poll_interval: float = ..., + backoff_factor: float = ..., +) -> T: ... +async def poll_until_condition( + fn: Callable[[], Awaitable[T] | T], + condition: Callable[[T], bool] = bool, + *, + timeout: float = 5, + poll_interval: float = 1, + backoff_factor: float = 1, +) -> T: + """Poll `fn` until `condition(result)` is True or the timeout expires. + + Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed. + Returns the last polled result regardless of whether the condition was met, so the caller can run its own + assertion. The default condition checks for a truthy result. Pass `timeout=0` to call `fn` exactly once. + + Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent state (e.g. a freshly + created resource appearing in a listing) that may take a variable amount of time to propagate. For highly + variable wait times (e.g. an Actor run container starting up), pass `backoff_factor` > 1 to multiply the + interval after each poll, covering a long timeout with few calls. + """ + deadline = time.monotonic() + timeout + delay = poll_interval + result = await maybe_await(fn()) + while not condition(result): + remaining = deadline - time.monotonic() + if remaining <= 0: + break + await asyncio.sleep(min(delay, remaining)) + delay *= backoff_factor + result = await maybe_await(fn()) + return result + + +def generate_unique_resource_name(label: str) -> str: + """Generates a unique resource name, which will contain the given label.""" + name_template = 'python-sdk-tests-{}-generated-{}' + template_length = len(name_template.format('', '')) + api_name_limit = 63 + generated_random_id_length = 8 + label_length_limit = api_name_limit - template_length - generated_random_id_length + + label = label.replace('_', '-') + assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}' + + return name_template.format(label, crypto_random_object_id(generated_random_id_length)) + + +# RSA test key material shared across crypto-related tests. +# NOTE: Uses the same keys as in: +# https://github.com/apify/apify-shared-js/blob/master/test/crypto.test.ts +PRIVATE_KEY_PEM_BASE64 = 'LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpQcm9jLVR5cGU6IDQsRU5DUllQVEVECkRFSy1JbmZvOiBERVMtRURFMy1DQkMsNTM1QURERjIzNUQ4QkFGOQoKMXFWUzl0S0FhdkVhVUVFMktESnpjM3plMk1lZkc1dmVEd2o1UVJ0ZkRaMXdWNS9VZmIvcU5sVThTSjlNaGhKaQp6RFdrWExueUUzSW0vcEtITVZkS0czYWZkcFRtcis2TmtidXptd0dVMk0vSWpzRjRJZlpad0lGbGJoY09jUnp4CmZmWVIvTlVyaHNrS1RpNGhGV0lBUDlLb3Z6VDhPSzNZY3h6eVZQWUxYNGVWbWt3UmZzeWkwUU5Xb0tGT3d0ZC8KNm9HYzFnd2piRjI5ZDNnUThZQjFGWmRLa1AyMTJGbkt1cTIrUWgvbE1zTUZrTHlTQTRLTGJ3ZG1RSXExbE1QUwpjbUNtZnppV3J1MlBtNEZoM0dmWlQyaE1JWHlIRFdEVzlDTkxKaERodExOZ2RRamFBUFpVT1E4V2hwSkE5MS9vCjJLZzZ3MDd5Z2RCcVd5dTZrc0pXcjNpZ1JpUEJ5QmVNWEpEZU5HY3NhaUZ3Q2c5eFlja1VORXR3NS90WlRsTjIKSEdZV0NpVU5Ed0F2WllMUHR1SHpIOFRFMGxsZm5HR0VuVC9QQlp1UHV4andlZlRleE1mdzFpbGJRU3lkcy9HMgpOOUlKKzkydms0N0ZXR2NOdGh1Q3lCbklva0NpZ0c1ZlBlV2IwQTdpdjk0UGtwRTRJZ3plc0hGQ0ZFQWoxWldLCnpQdFRBQlkwZlJrUzBNc3UwMHYxOXloTTUrdFUwYkVCZWo2eWpzWHRoYzlwS01hcUNIZWlQTC9TSHRkaWsxNVMKQmU4Sml4dVJxZitUeGlYWWVuNTg2aDlzTFpEYzA3cGpkUGp2NVNYRnBYQjhIMlVxQ0tZY2p4R3RvQWpTV0pjWApMNHc3RHNEby80bVg1N0htR09iamlCN1ZyOGhVWEJDdFh2V0dmQXlmcEFZNS9vOXowdm4zREcxaDc1NVVwdDluCkF2MFZrbm9qcmJVYjM1ZlJuU1lYTVltS01LSnpNRlMrdmFvRlpwV0ZjTG10cFRWSWNzc0JGUEYyZEo3V1c0WHMKK0d2Vkl2eFl3S2wyZzFPTE1TTXRZa09vekdlblBXTzdIdU0yMUVKVGIvbHNEZ25GaTkrYWRGZHBLY3R2cm0zdgpmbW1HeG5pRmhLU05GU0xtNms5YStHL2pjK3NVQVBhb2FZNEQ3NHVGajh0WGp0eThFUHdRRGxVUGRVZld3SE9PClF3bVgyMys1REh4V0VoQy91Tm8yNHNNY2ZkQzFGZUpBV281bUNuVU5vUVVmMStNRDVhMzNJdDhhMmlrNUkxUWoKeSs1WGpRaG0xd3RBMWhWTWE4aUxBR0toT09lcFRuK1VBZHpyS0hvNjVtYzNKbGgvSFJDUXJabnVxWkErK0F2WgpjeWU0dWZGWC8xdmRQSTdLb2Q0MEdDM2dlQnhweFFNYnp1OFNUcGpOcElJRkJvRVc5dFRhemUzeHZXWnV6dDc0CnFjZS8xWURuUHBLeW5lM0xGMk94VWoyYWVYUW5YQkpYcGhTZTBVTGJMcWJtUll4bjJKWkl1d09RNHV5dm94NjUKdG9TWGNac054dUs4QTErZXNXR3JSN3pVc0djdU9QQTFERE9Ja2JjcGtmRUxMNjk4RTJRckdqTU9JWnhrcWdxZQoySE5VNktWRmV2NzdZeEJDbm1VcVdXZEhYMjcyU2NPMUYzdWpUdFVnRVBNWGN0aEdBckYzTWxEaUw1Q0k0RkhqCnhHc3pVemxzalRQTmpiY2MzdUE2MjVZS3VVZEI2c1h1Rk5NUHk5UDgwTzBpRWJGTXl3MWxmN2VpdFhvaUUxWVoKc3NhMDVxTUx4M3pPUXZTLzFDdFpqaFp4cVJMRW5pQ3NWa2JVRlVYclpodEU4dG94bGpWSUtpQ25qbitORmtqdwo2bTZ1anpBSytZZHd2Nk5WMFB4S0gwUk5NYVhwb1lmQk1oUmZ3dGlaS3V3Y2hyRFB5UEhBQ2J3WXNZOXdtUE9rCnpwdDNxWi9JdDVYTmVqNDI0RzAzcGpMbk1sd1B1T1VzYmFQUWQ2VHU4TFhsckZReUVjTXJDNHdjUTA1SzFVN3kKM1NNN3RFaTlnbjV3RjY1YVI5eEFBR0grTUtMMk5WNnQrUmlTazJVaWs1clNmeDE4Mk9wYmpSQ2grdmQ4UXhJdwotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQo=' # noqa: E501 +PRIVATE_KEY_PASSWORD = 'pwd1234' +PUBLIC_KEY_PEM_BASE64 = 'LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0dis3NlNXbklhOFFKWC94RUQxRQpYdnBBQmE3ajBnQnVYenJNUU5adjhtTW1RU0t2VUF0TmpOL2xacUZpQ0haZUQxU2VDcGV1MnFHTm5XbGRxNkhUCnh5cXJpTVZEbFNKaFBNT09QSENISVNVdFI4Tk5lR1Y1MU0wYkxJcENabHcyTU9GUjdqdENWejVqZFRpZ1NvYTIKQWxrRUlRZWQ4UVlDKzk1aGJoOHk5bGcwQ0JxdEdWN1FvMFZQR2xKQ0hGaWNuaWxLVFFZay9MZzkwWVFnUElPbwozbUppeFl5bWFGNmlMZTVXNzg1M0VHWUVFVWdlWmNaZFNjaGVBMEdBMGpRSFVTdnYvMEZjay9adkZNZURJOTVsCmJVQ0JoQjFDbFg4OG4wZUhzUmdWZE5vK0NLMDI4T2IvZTZTK1JLK09VaHlFRVdPTi90alVMdGhJdTJkQWtGcmkKOFFJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==' # noqa: E501 +PRIVATE_KEY = load_private_key(PRIVATE_KEY_PEM_BASE64, PRIVATE_KEY_PASSWORD) +PUBLIC_KEY = _load_public_key(PUBLIC_KEY_PEM_BASE64) diff --git a/tests/e2e/_utils.py b/tests/e2e/_utils.py deleted file mode 100644 index b5323272..00000000 --- a/tests/e2e/_utils.py +++ /dev/null @@ -1,17 +0,0 @@ -from __future__ import annotations - -from crawlee._utils.crypto import crypto_random_object_id - - -def generate_unique_resource_name(label: str) -> str: - """Generates a unique resource name, which will contain the given label.""" - name_template = 'python-sdk-tests-{}-generated-{}' - template_length = len(name_template.format('', '')) - api_name_limit = 63 - generated_random_id_length = 8 - label_length_limit = api_name_limit - template_length - generated_random_id_length - - label = label.replace('_', '-') - assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}' - - return name_template.format(label, crypto_random_object_id(generated_random_id_length)) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index bdcc9883..df44a233 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -17,7 +17,7 @@ from crawlee import service_locator import apify._actor -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name from apify._models import ActorRun from apify.storage_clients._apify._alias_resolving import AliasResolver diff --git a/tests/e2e/test_actor_api_helpers.py b/tests/e2e/test_actor_api_helpers.py index 3747dd3b..b978e4d3 100644 --- a/tests/e2e/test_actor_api_helpers.py +++ b/tests/e2e/test_actor_api_helpers.py @@ -7,7 +7,7 @@ from apify_shared.consts import ActorPermissionLevel from crawlee._utils.crypto import crypto_random_object_id -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name, poll_until_condition from apify import Actor from apify._models import ActorRun @@ -393,6 +393,7 @@ async def test_actor_adds_webhook_and_receives_event( ) -> None: async def main_server() -> None: import os + import time from http.server import BaseHTTPRequestHandler, HTTPServer from apify_shared.consts import ActorEnvVars @@ -419,12 +420,19 @@ def do_POST(self) -> None: container_port = int(os.getenv(ActorEnvVars.WEB_SERVER_PORT, '')) with HTTPServer(('', container_port), WebhookHandler) as server: await Actor.set_value('INITIALIZED', value=True) - while not webhook_body: + # Bound the wait so that a webhook that never fires (e.g. one that did not propagate before the + # client run finished) surfaces as an empty WEBHOOK_BODY in the test instead of blocking here + # until the run times out. + server.timeout = 5 + deadline = time.monotonic() + 300 + while not webhook_body and time.monotonic() < deadline: server.handle_request() await Actor.set_value('WEBHOOK_BODY', webhook_body) async def main_client() -> None: + import asyncio + from apify import Webhook, WebhookEventType async with Actor: @@ -438,6 +446,12 @@ async def main_client() -> None: ) ) + # Keep the run alive for a moment after registering the webhook. Without this, the run finishes + # just milliseconds later and the platform may process the run-succeeded event before the freshly + # added ad-hoc webhook has propagated, in which case the webhook never fires and the server Actor + # waits until it times out. + await asyncio.sleep(5) + server_actor, client_actor = await asyncio.gather( make_actor(label='add-webhook-server', main_func=main_server), make_actor(label='add-webhook-client', main_func=main_client), @@ -446,10 +460,15 @@ async def main_client() -> None: server_actor_run = await server_actor.start() server_actor_container_url = server_actor_run['containerUrl'] - server_actor_initialized = await server_actor.last_run().key_value_store().get_record('INITIALIZED') - while not server_actor_initialized: - server_actor_initialized = await server_actor.last_run().key_value_store().get_record('INITIALIZED') - await asyncio.sleep(1) + # Wait for the server Actor's container to start up and bind its HTTP server. The startup time is highly + # variable (image pull, container creation), so poll with a growing interval instead of a fixed sleep. + server_actor_initialized = await poll_until_condition( + lambda: server_actor.last_run().key_value_store().get_record('INITIALIZED'), + timeout=300, + poll_interval=1, + backoff_factor=1.5, + ) + assert server_actor_initialized is not None, 'The server Actor did not initialize in time.' ac_run_result = await run_actor( client_actor, @@ -465,7 +484,7 @@ async def main_client() -> None: webhook_body_record = await server_actor.last_run().key_value_store().get_record('WEBHOOK_BODY') assert webhook_body_record is not None - assert webhook_body_record['value'] != '' + assert webhook_body_record['value'] != '', 'The ad-hoc webhook never fired (it likely did not propagate in time).' parsed_webhook_body = json.loads(webhook_body_record['value']) assert parsed_webhook_body['eventData']['actorId'] == ac_run_result.act_id diff --git a/tests/e2e/test_actor_charge.py b/tests/e2e/test_actor_charge.py index f8b1f393..649d141d 100644 --- a/tests/e2e/test_actor_charge.py +++ b/tests/e2e/test_actor_charge.py @@ -1,25 +1,29 @@ from __future__ import annotations -import asyncio from decimal import Decimal +from functools import partial from typing import TYPE_CHECKING import pytest_asyncio from apify_shared.consts import ActorJobStatus +from .._utils import poll_until_condition from apify import Actor from apify._models import ActorRun if TYPE_CHECKING: - from collections.abc import Iterable - from apify_client import ApifyClientAsync from apify_client.clients import ActorClientAsync from .conftest import MakeActorFunction, RunActorFunction +async def _get_run(apify_client_async: ApifyClientAsync, run_id: str) -> ActorRun: + """Fetch the current state of the given run from the platform.""" + return ActorRun.model_validate(await apify_client_async.run(run_id).get()) + + @pytest_asyncio.fixture(scope='module', loop_scope='module') async def ppe_push_data_actor_build(make_actor: MakeActorFunction) -> str: async def main() -> None: @@ -112,13 +116,6 @@ async def ppe_actor( return apify_client_async.actor(ppe_actor_build) -def retry_counter(total_attempts: int) -> Iterable[tuple[bool, int]]: - for retry in range(total_attempts - 1): - yield False, retry - - yield True, total_attempts - 1 - - async def test_actor_charge_basic( ppe_actor: ActorClientAsync, run_actor: RunActorFunction, @@ -126,19 +123,16 @@ async def test_actor_charge_basic( ) -> None: run = await run_actor(ppe_actor) - # Refetch until the platform gets its act together - for is_last_attempt, _ in retry_counter(30): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) + # Refetch until the charged event counts propagate on the platform. + run = await poll_until_condition( + partial(_get_run, apify_client_async, run.id), + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 4}, + timeout=30, + poll_interval=1, + ) - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == {'foobar': 4} - break - except AssertionError: - if is_last_attempt: - raise + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 4} async def test_actor_charge_limit( @@ -148,19 +142,16 @@ async def test_actor_charge_limit( ) -> None: run = await run_actor(ppe_actor, max_total_charge_usd=Decimal('0.2')) - # Refetch until the platform gets its act together - for is_last_attempt, _ in retry_counter(30): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) + # Refetch until the charged event counts propagate on the platform. + run = await poll_until_condition( + partial(_get_run, apify_client_async, run.id), + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == {'foobar': 2}, + timeout=30, + poll_interval=1, + ) - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == {'foobar': 2} - break - except AssertionError: - if is_last_attempt: - raise + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == {'foobar': 2} async def test_actor_push_data_charges_both_events( @@ -171,24 +162,23 @@ async def test_actor_push_data_charges_both_events( """Test that push_data charges both the explicit event and the synthetic apify-default-dataset-item event.""" run = await run_actor(ppe_push_data_actor) - # Use a longer retry window (120 attempts x 1 s) for synthetic events like `apify-default-dataset-item`: - # the platform computes them from dataset writes asynchronously, so they propagate more slowly than - # explicit charges (which are reflected immediately via the charge endpoint). - for is_last_attempt, _ in retry_counter(120): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) - - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == { - 'push-item': 5, - 'apify-default-dataset-item': 5, - } - break - except AssertionError: - if is_last_attempt: - raise + expected_counts = { + 'push-item': 5, + 'apify-default-dataset-item': 5, + } + + # Use a longer timeout for synthetic events like `apify-default-dataset-item`: the platform computes them + # from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are + # reflected immediately via the charge endpoint). + run = await poll_until_condition( + partial(_get_run, apify_client_async, run.id), + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts, + timeout=120, + poll_interval=1, + ) + + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == expected_counts async def test_actor_push_data_combined_budget_limit( @@ -202,21 +192,20 @@ async def test_actor_push_data_combined_budget_limit( """ run = await run_actor(ppe_push_data_actor, max_total_charge_usd=Decimal('0.20')) - # Use a longer retry window (120 attempts x 1 s) for synthetic events like `apify-default-dataset-item`: - # the platform computes them from dataset writes asynchronously, so they propagate more slowly than - # explicit charges (which are reflected immediately via the charge endpoint). - for is_last_attempt, _ in retry_counter(120): - await asyncio.sleep(1) - updated_run = await apify_client_async.run(run.id).get() - run = ActorRun.model_validate(updated_run) - - try: - assert run.status == ActorJobStatus.SUCCEEDED - assert run.charged_event_counts == { - 'push-item': 2, - 'apify-default-dataset-item': 2, - } - break - except AssertionError: - if is_last_attempt: - raise + expected_counts = { + 'push-item': 2, + 'apify-default-dataset-item': 2, + } + + # Use a longer timeout for synthetic events like `apify-default-dataset-item`: the platform computes them + # from dataset writes asynchronously, so they propagate more slowly than explicit charges (which are + # reflected immediately via the charge endpoint). + run = await poll_until_condition( + partial(_get_run, apify_client_async, run.id), + lambda r: r.status == ActorJobStatus.SUCCEEDED and r.charged_event_counts == expected_counts, + timeout=120, + poll_interval=1, + ) + + assert run.status == ActorJobStatus.SUCCEEDED + assert run.charged_event_counts == expected_counts diff --git a/tests/integration/_utils.py b/tests/integration/_utils.py deleted file mode 100644 index 3fb26bbd..00000000 --- a/tests/integration/_utils.py +++ /dev/null @@ -1,89 +0,0 @@ -from __future__ import annotations - -import asyncio -import time -from typing import TYPE_CHECKING, Literal, TypeVar - -from crawlee._utils.crypto import crypto_random_object_id - -from apify import Actor - -if TYPE_CHECKING: - from collections.abc import Awaitable, Callable - -T = TypeVar('T') - - -async def call_with_exp_backoff( - fn: Callable[[], Awaitable[T]], - *, - rq_access_mode: Literal['single', 'shared'], - max_retries: int = 5, -) -> T | None: - """Call an async callable with exponential backoff retries until it returns a truthy value. - - In shared request queue mode, there is a propagation delay before newly added, reclaimed, or handled requests - become visible in the API (see https://github.com/apify/apify-sdk-python/issues/808). This helper retries with - exponential backoff to handle that delay in integration tests. - - When `rq_access_mode` is `'single'`, the function is called once without retries. - """ - if rq_access_mode == 'single': - return await fn() - - if rq_access_mode == 'shared': - result = None - - for attempt in range(max_retries): - result = await fn() - - if result: - return result - - delay = 2**attempt - Actor.log.info(f'{fn} returned {result!r}, retrying in {delay}s (attempt {attempt + 1}/{max_retries})') - await asyncio.sleep(delay) - - return result - - raise ValueError(f'Invalid rq_access_mode: {rq_access_mode}') - - -async def poll_until_condition( - fn: Callable[[], Awaitable[T]], - condition: Callable[[T], bool], - *, - timeout: float = 60, - poll_interval: float = 5, -) -> T: - """Poll `fn` until `condition(result)` is True or the timeout expires. - - Polls `fn` at `poll_interval`-second intervals until `condition` is satisfied or `timeout` seconds have elapsed. - Returns the last polled result regardless of whether the condition was met. - - Use this instead of a fixed `asyncio.sleep` when waiting for eventually-consistent API state (e.g. request queue - stats) that may take a variable amount of time to propagate. - """ - deadline = time.monotonic() + timeout - result = await fn() - while not condition(result): - remaining = deadline - time.monotonic() - if remaining <= 0: - break - await asyncio.sleep(min(poll_interval, remaining)) - result = await fn() - return result - - -def generate_unique_resource_name(label: str) -> str: - """Generates a unique resource name, which will contain the given label.""" - name_template = 'python-sdk-tests-{}-generated-{}' - template_length = len(name_template.format('', '')) - api_name_limit = 63 - generated_random_id_length = 8 - label_length_limit = api_name_limit - template_length - generated_random_id_length - - label = label.replace('_', '-') - assert len(label) <= label_length_limit, f'Max label length is {label_length_limit}, but got {len(label)}' - - return name_template.format(label, crypto_random_object_id(generated_random_id_length)) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 30aa077d..05335cdc 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -94,6 +94,23 @@ async def request_queue_apify( await rq.drop() +@pytest.fixture +def rq_access_mode(request: pytest.FixtureRequest) -> str: + """Return the access mode (`single` or `shared`) of the parametrized `request_queue_apify` fixture.""" + return request.node.callspec.params.get('request_queue_apify') + + +@pytest.fixture +def rq_poll_timeout(rq_access_mode: str) -> int: + """Return the `poll_until_condition` timeout matching the `request_queue_apify` access mode. + + In single mode, reads are immediately consistent, so the caller should poll exactly once (`timeout=0`). In + shared mode, there is a propagation delay between operations, so reads are retried for up to 30 seconds. + See https://github.com/apify/apify-sdk-python/issues/808. + """ + return 0 if rq_access_mode == 'single' else 30 + + @pytest.fixture(autouse=True) def _isolate_test_environment(prepare_test_env: Callable[[], None]) -> None: """Isolate the testing environment by resetting global state before each test. diff --git a/tests/integration/test_dataset.py b/tests/integration/test_dataset.py index 5a5d1b92..75a80695 100644 --- a/tests/integration/test_dataset.py +++ b/tests/integration/test_dataset.py @@ -6,7 +6,7 @@ from apify_shared.consts import ApifyEnvVars -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name from apify import Actor from apify.storage_clients import ApifyStorageClient from apify.storages import Dataset diff --git a/tests/integration/test_key_value_store.py b/tests/integration/test_key_value_store.py index 912ecfb0..2ea4672d 100644 --- a/tests/integration/test_key_value_store.py +++ b/tests/integration/test_key_value_store.py @@ -7,7 +7,7 @@ from apify_shared.consts import ApifyEnvVars from crawlee import service_locator -from ._utils import generate_unique_resource_name +from .._utils import generate_unique_resource_name from apify import Actor from apify.storage_clients import ApifyStorageClient from apify.storage_clients._apify._alias_resolving import AliasResolver diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py index 26b46113..240547fd 100644 --- a/tests/integration/test_request_queue.py +++ b/tests/integration/test_request_queue.py @@ -12,7 +12,7 @@ from crawlee import service_locator from crawlee.crawlers import BasicCrawler -from ._utils import call_with_exp_backoff, generate_unique_resource_name, poll_until_condition +from .._utils import generate_unique_resource_name, poll_until_condition from apify import Actor, Request from apify.storage_clients import ApifyStorageClient from apify.storage_clients._apify import ApifyRequestQueueClient @@ -25,10 +25,12 @@ from apify.storage_clients._apify._models import ApifyRequestQueueMetadata -# In shared mode, there is a propagation delay between operations so we use test helper -# `call_with_exp_backoff` for exponential backoff. The delay also means that the relative order of requests -# added or reclaimed close together is not guaranteed, so order-sensitive tests wait for propagation and -# relax exact-order assertions in shared mode. See https://github.com/apify/apify-sdk-python/issues/808. +# In shared mode, there is a propagation delay between operations, so we retry reads with the test helper +# `poll_until_condition` (exponential backoff). In single mode reads are immediately consistent, so we call once. +# The mode-appropriate timeout is provided by the `rq_poll_timeout` fixture (see conftest). The delay also means +# that the relative order of requests added or reclaimed close together is not guaranteed, so order-sensitive +# tests wait for propagation and relax exact-order assertions in shared mode. +# See https://github.com/apify/apify-sdk-python/issues/808. # How long to wait in shared mode for forefront operations to propagate to the queue head before fetching. _SHARED_MODE_PROPAGATION_DELAY = 10 @@ -36,10 +38,9 @@ async def test_add_and_fetch_requests( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test basic functionality of adding and fetching requests.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') desired_request_count = 100 Actor.log.info('Opening request queue...') @@ -51,7 +52,7 @@ async def test_add_and_fetch_requests( await rq.add_request(f'https://example.com/{i}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): Actor.log.info('Fetching next request...') queue_operation_info = await rq.mark_request_as_handled(next_request) assert queue_operation_info is not None, f'queue_operation_info={queue_operation_info}' @@ -65,16 +66,15 @@ async def test_add_and_fetch_requests( f'desired_request_count={desired_request_count}', ) Actor.log.info('Waiting for queue to be finished...') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_add_requests_in_batches( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test adding multiple requests in a single batch operation.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') desired_request_count = 100 rq = request_queue_apify @@ -86,7 +86,7 @@ async def test_add_requests_in_batches( Actor.log.info(f'Added {desired_request_count} requests in batch, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}...') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -100,16 +100,15 @@ async def test_add_requests_in_batches( f'handled_request_count={handled_request_count}', f'desired_request_count={desired_request_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_add_non_unique_requests_in_batch( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test adding requests with duplicate unique keys in batch.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') desired_request_count = 100 rq = request_queue_apify @@ -125,7 +124,7 @@ async def test_add_non_unique_requests_in_batch( Actor.log.info(f'Added {desired_request_count} requests with duplicate unique keys, total in queue: {total_count}') handled_request_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): if handled_request_count % 20 == 0: Actor.log.info(f'Processing request {handled_request_count + 1}: {next_request.url}') queue_operation_info = await rq.mark_request_as_handled(next_request) @@ -140,17 +139,17 @@ async def test_add_non_unique_requests_in_batch( f'handled_request_count={handled_request_count}', f'expected_count={expected_count}', ) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) Actor.log.info(f'Processed {handled_request_count}/{expected_count} requests, finished: {is_finished}') assert is_finished is True, f'is_finished={is_finished}' async def test_forefront_requests_ordering( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, + rq_access_mode: str, ) -> None: """Test that forefront requests are processed before regular requests.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -174,7 +173,7 @@ async def test_forefront_requests_ordering( # Fetch requests and verify order. fetched_urls = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): Actor.log.info(f'Fetched request: {next_request.url}') fetched_urls.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -208,10 +207,9 @@ async def test_forefront_requests_ordering( async def test_request_unique_key_behavior( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test behavior of custom unique keys.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -241,7 +239,7 @@ async def test_request_unique_key_behavior( # Only 2 requests should be fetchable. fetched_count = 0 fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): fetched_count += 1 fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -259,10 +257,9 @@ async def test_request_unique_key_behavior( async def test_request_reclaim_functionality( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test request reclaiming for failed processing.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -272,7 +269,7 @@ async def test_request_reclaim_functionality( Actor.log.info('Added test request') # Fetch and reclaim the request. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert fetched_request is not None Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -285,7 +282,12 @@ async def test_request_reclaim_functionality( # Should be able to fetch the same request again. A reclaimed request may take a moment to reappear # at the queue head (eventually-consistent API state), even in single mode, so poll until it does. - request2 = await poll_until_condition(rq.fetch_next_request, lambda result: result is not None) + request2 = await poll_until_condition( + rq.fetch_next_request, + lambda result: result is not None, + timeout=60, + poll_interval=5, + ) assert request2 is not None assert request2.url == fetched_request.url @@ -294,18 +296,17 @@ async def test_request_reclaim_functionality( # Mark as handled this time await rq.mark_request_as_handled(request2) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True async def test_request_reclaim_with_forefront( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, + rq_access_mode: str, ) -> None: """Test reclaiming requests to the front of the queue.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') - rq = request_queue_apify Actor.log.info('Request queue opened') @@ -316,7 +317,7 @@ async def test_request_reclaim_with_forefront( Actor.log.info('Added 3 requests') # Fetch first request. - first_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + first_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert first_request is not None Actor.log.info(f'Fetched first request: {first_request.url}') @@ -331,7 +332,12 @@ async def test_request_reclaim_with_forefront( # The reclaimed request should be fetched first again. A reclaimed request may take a moment to reappear # at the queue head (eventually-consistent API state), even in single mode, so poll until it does. - next_request = await poll_until_condition(rq.fetch_next_request, lambda result: result is not None) + next_request = await poll_until_condition( + rq.fetch_next_request, + lambda result: result is not None, + timeout=60, + poll_interval=5, + ) assert next_request is not None assert next_request.url == first_request.url @@ -351,10 +357,9 @@ async def test_request_reclaim_with_forefront( async def test_complex_request_objects( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test handling complex Request objects with various properties.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -371,7 +376,7 @@ async def test_complex_request_objects( Actor.log.info(f'Added complex request: {complex_request.url} with method {complex_request.method}') # Fetch and verify all properties are preserved. - fetched_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + fetched_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert fetched_request is not None, f'fetched_request={fetched_request}' Actor.log.info(f'Fetched request: {fetched_request.url}') @@ -398,10 +403,9 @@ async def test_complex_request_objects( async def test_get_request_by_unique_key( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test retrieving specific requests by their unique_key.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -412,9 +416,10 @@ async def test_get_request_by_unique_key( request_unique_key = add_result.unique_key Actor.log.info(f'Request added with unique_key: {request_unique_key}') - retrieved_request = await call_with_exp_backoff( + retrieved_request = await poll_until_condition( lambda: rq.get_request(request_unique_key), - rq_access_mode=rq_access_mode, + timeout=rq_poll_timeout, + backoff_factor=2, ) assert retrieved_request is not None, f'retrieved_request={retrieved_request}' assert retrieved_request.url == 'https://example.com/test', f'retrieved_request.url={retrieved_request.url}' @@ -429,10 +434,9 @@ async def test_get_request_by_unique_key( async def test_metadata_tracking( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test request queue metadata and counts.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -457,7 +461,7 @@ async def test_metadata_tracking( # Process some requests. for _ in range(3): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) if next_request: await rq.mark_request_as_handled(next_request) @@ -473,10 +477,9 @@ async def test_metadata_tracking( async def test_batch_operations_performance( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test batch operations vs individual operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -498,7 +501,7 @@ async def test_batch_operations_performance( # Process all requests. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): processed_count += 1 await rq.mark_request_as_handled(next_request) if processed_count >= 50: # Safety break @@ -507,17 +510,16 @@ async def test_batch_operations_performance( Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 50, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_state_consistency( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test queue state consistency during concurrent operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -535,7 +537,7 @@ async def test_state_consistency( reclaimed_requests = [] for i in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) if next_request: if i % 2 == 0: # Process even indices await rq.mark_request_as_handled(next_request) @@ -562,31 +564,30 @@ async def test_state_consistency( # Process remaining requests. remaining_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): remaining_count += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_count} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' -async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pytest.FixtureRequest) -> None: +async def test_empty_rq_behavior(request_queue_apify: RequestQueue, rq_poll_timeout: int) -> None: """Test behavior with empty queues.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') # Test empty queue operations - is_empty = await call_with_exp_backoff(rq.is_empty, rq_access_mode=rq_access_mode) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_empty = await poll_until_condition(rq.is_empty, timeout=rq_poll_timeout, backoff_factor=2) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) Actor.log.info(f'Empty queue - is_empty: {is_empty}, is_finished: {is_finished}') assert is_empty is True, f'is_empty={is_empty}' assert is_finished is True, f'is_finished={is_finished}' # Fetch from empty queue - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) Actor.log.info(f'Fetch result from empty queue: {next_request}') assert next_request is None, f'request={next_request}' @@ -605,10 +606,9 @@ async def test_empty_rq_behavior(request_queue_apify: RequestQueue, request: pyt async def test_large_batch_operations( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test handling large batches of requests.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -628,23 +628,22 @@ async def test_large_batch_operations( # Process all in chunks to test performance. processed_count = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): await rq.mark_request_as_handled(next_request) processed_count += 1 Actor.log.info(f'Processing completed. Total processed: {processed_count}') assert processed_count == 500, f'processed_count={processed_count}' - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished is True, f'is_finished={is_finished}' async def test_mixed_string_and_request_objects( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test adding both string URLs and Request objects.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -673,7 +672,7 @@ async def test_mixed_string_and_request_objects( # Fetch and verify all types work. fetched_requests = [] - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): fetched_requests.append(next_request) await rq.mark_request_as_handled(next_request) @@ -693,10 +692,9 @@ async def test_mixed_string_and_request_objects( async def test_persistence_across_operations( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test that queue state persists across different operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') # Open queue and add some requests rq = request_queue_apify @@ -713,7 +711,7 @@ async def test_persistence_across_operations( # Process some requests. processed_count = 0 for _ in range(5): - next_request = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + next_request = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) if next_request: await rq.mark_request_as_handled(next_request) processed_count += 1 @@ -738,12 +736,12 @@ async def test_persistence_across_operations( # Process remaining. remaining_processed = 0 - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): remaining_processed += 1 await rq.mark_request_as_handled(next_request) Actor.log.info(f'Processed {remaining_processed} remaining requests') - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) final_total = await rq.get_total_count() final_handled = await rq.get_handled_count() @@ -775,7 +773,12 @@ async def test_request_deduplication_edge_cases(request_queue_apify: RequestQueu for url, expected_duplicate in urls_and_deduplication_expectations: # In shared mode, `add_request` may transiently return None until the operation propagates, # so poll with backoff until it returns a result. - result = await poll_until_condition(lambda url=url: rq.add_request(url), lambda result: result is not None) + result = await poll_until_condition( + lambda url=url: rq.add_request(url), + lambda result: result is not None, + timeout=60, + poll_interval=5, + ) assert result is not None results.append(result.was_already_present) @@ -809,10 +812,10 @@ async def test_request_deduplication_edge_cases(request_queue_apify: RequestQueu async def test_request_ordering_with_mixed_operations( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, + rq_access_mode: str, ) -> None: """Test request ordering with mixed add/reclaim operations.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify Actor.log.info('Request queue opened') @@ -823,7 +826,7 @@ async def test_request_ordering_with_mixed_operations( Actor.log.info('Added initial requests') # Fetch one and reclaim to forefront. - request1 = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request1 = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert request1 is not None, f'request1={request1}' if rq_access_mode == 'shared': # In shared mode, the relative order of requests added close together is not guaranteed (see the note at @@ -848,7 +851,7 @@ async def test_request_ordering_with_mixed_operations( # Fetch all requests and verify forefront behavior. urls_ordered = list[str]() - while next_request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode): + while next_request := await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2): urls_ordered.append(next_request.url) await rq.mark_request_as_handled(next_request) @@ -914,7 +917,12 @@ async def test_request_queue_metadata_another_client( await api_client.add_request(Request.from_url('http://example.com/1').model_dump(by_alias=True, exclude={'id'})) # Poll until the API has propagated the metadata change. - metadata = await poll_until_condition(rq.get_metadata, lambda m: m.total_request_count >= 1) + metadata = await poll_until_condition( + rq.get_metadata, + lambda m: m.total_request_count >= 1, + timeout=60, + poll_interval=5, + ) assert metadata.total_request_count == 1 @@ -1015,6 +1023,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.write_count >= expected_write_count, + timeout=60, + poll_interval=5, ) Actor.log.info(f'{metadata.stats=}') assert metadata.stats.write_count == expected_write_count @@ -1043,6 +1053,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.write_count >= len(requests), + timeout=60, + poll_interval=5, ) stats_before = metadata.stats Actor.log.info(stats_before) @@ -1057,6 +1069,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.read_count - stats_before.read_count >= len(requests), + timeout=60, + poll_interval=5, ) stats_after = metadata.stats Actor.log.info(stats_after) @@ -1084,6 +1098,8 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: apify_metadata = await poll_until_condition( _get_rq_metadata, lambda m: m.stats.write_count >= add_request_count, + timeout=60, + poll_interval=5, ) assert hasattr(apify_metadata, 'stats') @@ -1092,10 +1108,9 @@ async def _get_rq_metadata() -> ApifyRequestQueueMetadata: async def test_rq_long_url( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test handling of requests with long URLs and extended unique keys.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') rq = request_queue_apify long_url_request = Request.from_url( 'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1', @@ -1109,24 +1124,23 @@ async def test_rq_long_url( assert processed_request is not None assert processed_request.id == request_id - request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert request_obtained is not None await rq.mark_request_as_handled(request_obtained) - is_finished = await call_with_exp_backoff(rq.is_finished, rq_access_mode=rq_access_mode) + is_finished = await poll_until_condition(rq.is_finished, timeout=rq_poll_timeout, backoff_factor=2) assert is_finished async def test_pre_existing_request_with_user_data( request_queue_apify: RequestQueue, apify_client_async: ApifyClientAsync, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: """Test that pre-existing requests with user data are fully fetched. list_head does not return user data, so we need to test that fetching unknown requests is not relying on it.""" - rq_access_mode = request.node.callspec.params.get('request_queue_apify') custom_data = {'key': 'value'} rq = request_queue_apify @@ -1140,7 +1154,7 @@ async def test_pre_existing_request_with_user_data( await rq_client.add_request(req.model_dump(by_alias=True)) # Fetch the request by the client under test. - request_obtained = await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode=rq_access_mode) + request_obtained = await poll_until_condition(rq.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2) assert request_obtained is not None # Test that custom_data is preserved in user_data (custom_data should be subset of obtained user_data) assert custom_data.items() <= request_obtained.user_data.items() @@ -1167,21 +1181,22 @@ async def test_force_cloud( async def test_request_queue_is_finished( request_queue_apify: RequestQueue, - request: pytest.FixtureRequest, + rq_poll_timeout: int, ) -> None: - rq_access_mode = request.node.callspec.params.get('request_queue_apify') await request_queue_apify.add_request(Request.from_url('http://example.com')) assert not await request_queue_apify.is_finished() - fetched = await call_with_exp_backoff(request_queue_apify.fetch_next_request, rq_access_mode=rq_access_mode) + fetched = await poll_until_condition( + request_queue_apify.fetch_next_request, timeout=rq_poll_timeout, backoff_factor=2 + ) assert fetched is not None assert not await request_queue_apify.is_finished(), ( 'RequestQueue should not be finished unless the request is marked as handled.' ) await request_queue_apify.mark_request_as_handled(fetched) - assert await call_with_exp_backoff(request_queue_apify.is_finished, rq_access_mode=rq_access_mode) + assert await poll_until_condition(request_queue_apify.is_finished, timeout=rq_poll_timeout, backoff_factor=2) async def test_request_queue_deduplication_unprocessed_requests( @@ -1232,6 +1247,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 1, + timeout=60, + poll_interval=5, ) Actor.log.info(stats_after) @@ -1340,6 +1357,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 1, + timeout=60, + poll_interval=5, ) assert (stats_after['writeCount'] - stats_before['writeCount']) == 1 @@ -1372,6 +1391,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= 2, + timeout=60, + poll_interval=5, ) assert (stats_after['writeCount'] - stats_before['writeCount']) == 2 @@ -1410,6 +1431,8 @@ async def _get_rq_stats() -> dict: stats_after = await poll_until_condition( _get_rq_stats, lambda s: s.get('writeCount', 0) - stats_before.get('writeCount', 0) >= len(requests), + timeout=60, + poll_interval=5, ) assert (stats_after['writeCount'] - stats_before['writeCount']) == len(requests) @@ -1467,7 +1490,7 @@ async def worker() -> int: assert total_after_workers == 20 remaining_count = 0 - while request := await call_with_exp_backoff(rq.fetch_next_request, rq_access_mode='shared'): + while request := await poll_until_condition(rq.fetch_next_request, timeout=30, backoff_factor=2): remaining_count += 1 await rq.mark_request_as_handled(request) diff --git a/tests/unit/actor/test_actor_key_value_store.py b/tests/unit/actor/test_actor_key_value_store.py index 581d775d..8e8e6589 100644 --- a/tests/unit/actor/test_actor_key_value_store.py +++ b/tests/unit/actor/test_actor_key_value_store.py @@ -1,13 +1,11 @@ from __future__ import annotations -import asyncio - import pytest from apify_shared.consts import ApifyEnvVars from crawlee._utils.file import json_dumps -from ..test_crypto import PRIVATE_KEY_PASSWORD, PRIVATE_KEY_PEM_BASE64, PUBLIC_KEY +from ..._utils import PRIVATE_KEY_PASSWORD, PRIVATE_KEY_PEM_BASE64, PUBLIC_KEY, poll_until_condition from apify import Actor from apify._consts import ENCRYPTED_JSON_VALUE_PREFIX, ENCRYPTED_STRING_VALUE_PREFIX from apify._crypto import public_encrypt @@ -119,10 +117,13 @@ async def test_use_state(monkeypatch: pytest.MonkeyPatch) -> None: state['state'] = 'first_state' - await asyncio.sleep(0.2) # Wait for the state to be persisted - + # Wait for the state to be persisted (the persist interval is 100 ms). kvs = await actor.open_key_value_store() - stored_state = await kvs.get_value('APIFY_GLOBAL_STATE') + stored_state = await poll_until_condition( + lambda: kvs.get_value('APIFY_GLOBAL_STATE'), + lambda value: value == {'state': 'first_state'}, + poll_interval=0.05, + ) assert stored_state == {'state': 'first_state'} state['state'] = 'finished_state' @@ -142,10 +143,13 @@ async def test_use_state_non_default(monkeypatch: pytest.MonkeyPatch) -> None: state['state'] = 'first_state' - await asyncio.sleep(0.2) # Wait for the state to be persisted - + # Wait for the state to be persisted (the persist interval is 100 ms). kvs = await actor.open_key_value_store(name='custom-kvs') - stored_state = await kvs.get_value('custom_state_key') + stored_state = await poll_until_condition( + lambda: kvs.get_value('custom_state_key'), + lambda value: value == {'state': 'first_state'}, + poll_interval=0.05, + ) assert stored_state == {'state': 'first_state'} state['state'] = 'finished_state' diff --git a/tests/unit/actor/test_actor_lifecycle.py b/tests/unit/actor/test_actor_lifecycle.py index 03fdd00e..c93b27a0 100644 --- a/tests/unit/actor/test_actor_lifecycle.py +++ b/tests/unit/actor/test_actor_lifecycle.py @@ -11,6 +11,7 @@ from apify_shared.consts import ActorExitCodes, ApifyEnvVars from crawlee.events._types import Event +from ..._utils import poll_until_condition from apify import Actor if TYPE_CHECKING: @@ -199,7 +200,8 @@ def on_event(event_type: Event) -> Callable: assert actor._active actor.on(Event.PERSIST_STATE, on_event(Event.PERSIST_STATE)) actor.on(Event.SYSTEM_INFO, on_event(Event.SYSTEM_INFO)) - await asyncio.sleep(1) + # Wait until both periodic events are emitted at least once (the emit interval is 100 ms). + await poll_until_condition(lambda: bool(on_persist) and bool(on_system_info), poll_interval=0.05) on_persist_count = len(on_persist) on_system_info_count = len(on_system_info) diff --git a/tests/unit/events/test_apify_event_manager.py b/tests/unit/events/test_apify_event_manager.py index eb8dd375..33e14e87 100644 --- a/tests/unit/events/test_apify_event_manager.py +++ b/tests/unit/events/test_apify_event_manager.py @@ -17,6 +17,7 @@ from apify_shared.consts import ActorEnvVars from crawlee.events._types import Event +from ..._utils import poll_until_condition from apify import Configuration from apify.events import ApifyEventManager from apify.events._types import SystemInfoEventData @@ -91,7 +92,7 @@ def event_handler(data: Any) -> None: # Test adding the handler event_manager.on(event=Event.SYSTEM_INFO, listener=handler_system_info) event_manager.emit(event=Event.SYSTEM_INFO, event_data=dummy_system_info) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: bool(event_calls[Event.SYSTEM_INFO]), poll_interval=0.05) assert event_calls[Event.SYSTEM_INFO] == [(None, dummy_system_info)] event_calls[Event.SYSTEM_INFO].clear() @@ -114,7 +115,7 @@ def event_handler(data: Any) -> None: # Test that they all work event_manager.emit(event=Event.PERSIST_STATE, event_data=dummy_persist_state) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: len(event_calls[Event.PERSIST_STATE]) >= 3, poll_interval=0.05) assert set(event_calls[Event.PERSIST_STATE]) == { (1, dummy_persist_state), (2, dummy_persist_state), @@ -125,7 +126,7 @@ def event_handler(data: Any) -> None: # Test that if you remove one, the others stay event_manager.off(event=Event.PERSIST_STATE, listener=handler_persist_state_3) event_manager.emit(event=Event.PERSIST_STATE, event_data=dummy_persist_state) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: len(event_calls[Event.PERSIST_STATE]) >= 2, poll_interval=0.05) assert set(event_calls[Event.PERSIST_STATE]) == { (1, dummy_persist_state), (2, dummy_persist_state), @@ -211,7 +212,7 @@ def listener(data: Any) -> None: # Test sending event with data await send_platform_event(Event.SYSTEM_INFO, dummy_system_info) - await asyncio.sleep(0.1) + await poll_until_condition(lambda: len(event_calls) == 1, poll_interval=0.05) assert len(event_calls) == 1 assert event_calls[0] is not None assert event_calls[0]['cpuInfo']['usedRatio'] == 0.0845549815498155 @@ -230,7 +231,8 @@ async def handler(_data: Any) -> None: persist_state_counter += 1 event_manager.on(event=Event.PERSIST_STATE, listener=handler) - await asyncio.sleep(1.5) + # Wait until at least one PERSIST_STATE event is handled (the persist interval is 500 ms). + await poll_until_condition(lambda: persist_state_counter > 0, poll_interval=0.05) first_count = persist_state_counter assert first_count > 0 @@ -275,7 +277,7 @@ async def test_unknown_event_is_logged(monkeypatch: pytest.MonkeyPatch, caplog: # Send an unknown event unknown_message = json.dumps({'name': 'totallyNewEvent2099', 'data': {'foo': 'bar'}}) websockets.broadcast(connected_ws_clients, unknown_message) - await asyncio.sleep(0.2) + await poll_until_condition(lambda: 'Unknown message received' in caplog.text, poll_interval=0.05) assert 'Unknown message received' in caplog.text assert 'totallyNewEvent2099' in caplog.text @@ -307,7 +309,10 @@ def migrating_listener(data: Any) -> None: # Send migrating event migrating_message = json.dumps({'name': 'migrating'}) websockets.broadcast(connected_ws_clients, migrating_message) - await asyncio.sleep(0.2) + await poll_until_condition( + lambda: bool(migrating_calls) and any(getattr(c, 'is_migrating', False) for c in persist_calls), + poll_interval=0.05, + ) assert len(migrating_calls) == 1 # MIGRATING should also trigger a PERSIST_STATE with is_migrating=True @@ -361,6 +366,6 @@ async def test_malformed_message_logs_exception( # Send malformed message websockets.broadcast(connected_ws_clients, 'this is not valid json{{{') - await asyncio.sleep(0.2) + await poll_until_condition(lambda: 'Cannot parse Actor event' in caplog.text, poll_interval=0.05) assert 'Cannot parse Actor event' in caplog.text diff --git a/tests/unit/test_crypto.py b/tests/unit/test_crypto.py index 88dbc03c..c1e283f2 100644 --- a/tests/unit/test_crypto.py +++ b/tests/unit/test_crypto.py @@ -4,6 +4,7 @@ import pytest +from .._utils import PRIVATE_KEY, PUBLIC_KEY from apify._crypto import ( _load_public_key, create_hmac_signature, @@ -14,14 +15,6 @@ public_encrypt, ) -# NOTE: Uses the same keys as in: -# https://github.com/apify/apify-shared-js/blob/master/test/crypto.test.ts -PRIVATE_KEY_PEM_BASE64 = 'LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpQcm9jLVR5cGU6IDQsRU5DUllQVEVECkRFSy1JbmZvOiBERVMtRURFMy1DQkMsNTM1QURERjIzNUQ4QkFGOQoKMXFWUzl0S0FhdkVhVUVFMktESnpjM3plMk1lZkc1dmVEd2o1UVJ0ZkRaMXdWNS9VZmIvcU5sVThTSjlNaGhKaQp6RFdrWExueUUzSW0vcEtITVZkS0czYWZkcFRtcis2TmtidXptd0dVMk0vSWpzRjRJZlpad0lGbGJoY09jUnp4CmZmWVIvTlVyaHNrS1RpNGhGV0lBUDlLb3Z6VDhPSzNZY3h6eVZQWUxYNGVWbWt3UmZzeWkwUU5Xb0tGT3d0ZC8KNm9HYzFnd2piRjI5ZDNnUThZQjFGWmRLa1AyMTJGbkt1cTIrUWgvbE1zTUZrTHlTQTRLTGJ3ZG1RSXExbE1QUwpjbUNtZnppV3J1MlBtNEZoM0dmWlQyaE1JWHlIRFdEVzlDTkxKaERodExOZ2RRamFBUFpVT1E4V2hwSkE5MS9vCjJLZzZ3MDd5Z2RCcVd5dTZrc0pXcjNpZ1JpUEJ5QmVNWEpEZU5HY3NhaUZ3Q2c5eFlja1VORXR3NS90WlRsTjIKSEdZV0NpVU5Ed0F2WllMUHR1SHpIOFRFMGxsZm5HR0VuVC9QQlp1UHV4andlZlRleE1mdzFpbGJRU3lkcy9HMgpOOUlKKzkydms0N0ZXR2NOdGh1Q3lCbklva0NpZ0c1ZlBlV2IwQTdpdjk0UGtwRTRJZ3plc0hGQ0ZFQWoxWldLCnpQdFRBQlkwZlJrUzBNc3UwMHYxOXloTTUrdFUwYkVCZWo2eWpzWHRoYzlwS01hcUNIZWlQTC9TSHRkaWsxNVMKQmU4Sml4dVJxZitUeGlYWWVuNTg2aDlzTFpEYzA3cGpkUGp2NVNYRnBYQjhIMlVxQ0tZY2p4R3RvQWpTV0pjWApMNHc3RHNEby80bVg1N0htR09iamlCN1ZyOGhVWEJDdFh2V0dmQXlmcEFZNS9vOXowdm4zREcxaDc1NVVwdDluCkF2MFZrbm9qcmJVYjM1ZlJuU1lYTVltS01LSnpNRlMrdmFvRlpwV0ZjTG10cFRWSWNzc0JGUEYyZEo3V1c0WHMKK0d2Vkl2eFl3S2wyZzFPTE1TTXRZa09vekdlblBXTzdIdU0yMUVKVGIvbHNEZ25GaTkrYWRGZHBLY3R2cm0zdgpmbW1HeG5pRmhLU05GU0xtNms5YStHL2pjK3NVQVBhb2FZNEQ3NHVGajh0WGp0eThFUHdRRGxVUGRVZld3SE9PClF3bVgyMys1REh4V0VoQy91Tm8yNHNNY2ZkQzFGZUpBV281bUNuVU5vUVVmMStNRDVhMzNJdDhhMmlrNUkxUWoKeSs1WGpRaG0xd3RBMWhWTWE4aUxBR0toT09lcFRuK1VBZHpyS0hvNjVtYzNKbGgvSFJDUXJabnVxWkErK0F2WgpjeWU0dWZGWC8xdmRQSTdLb2Q0MEdDM2dlQnhweFFNYnp1OFNUcGpOcElJRkJvRVc5dFRhemUzeHZXWnV6dDc0CnFjZS8xWURuUHBLeW5lM0xGMk94VWoyYWVYUW5YQkpYcGhTZTBVTGJMcWJtUll4bjJKWkl1d09RNHV5dm94NjUKdG9TWGNac054dUs4QTErZXNXR3JSN3pVc0djdU9QQTFERE9Ja2JjcGtmRUxMNjk4RTJRckdqTU9JWnhrcWdxZQoySE5VNktWRmV2NzdZeEJDbm1VcVdXZEhYMjcyU2NPMUYzdWpUdFVnRVBNWGN0aEdBckYzTWxEaUw1Q0k0RkhqCnhHc3pVemxzalRQTmpiY2MzdUE2MjVZS3VVZEI2c1h1Rk5NUHk5UDgwTzBpRWJGTXl3MWxmN2VpdFhvaUUxWVoKc3NhMDVxTUx4M3pPUXZTLzFDdFpqaFp4cVJMRW5pQ3NWa2JVRlVYclpodEU4dG94bGpWSUtpQ25qbitORmtqdwo2bTZ1anpBSytZZHd2Nk5WMFB4S0gwUk5NYVhwb1lmQk1oUmZ3dGlaS3V3Y2hyRFB5UEhBQ2J3WXNZOXdtUE9rCnpwdDNxWi9JdDVYTmVqNDI0RzAzcGpMbk1sd1B1T1VzYmFQUWQ2VHU4TFhsckZReUVjTXJDNHdjUTA1SzFVN3kKM1NNN3RFaTlnbjV3RjY1YVI5eEFBR0grTUtMMk5WNnQrUmlTazJVaWs1clNmeDE4Mk9wYmpSQ2grdmQ4UXhJdwotLS0tLUVORCBSU0EgUFJJVkFURSBLRVktLS0tLQo=' # noqa: E501 -PRIVATE_KEY_PASSWORD = 'pwd1234' -PUBLIC_KEY_PEM_BASE64 = 'LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0dis3NlNXbklhOFFKWC94RUQxRQpYdnBBQmE3ajBnQnVYenJNUU5adjhtTW1RU0t2VUF0TmpOL2xacUZpQ0haZUQxU2VDcGV1MnFHTm5XbGRxNkhUCnh5cXJpTVZEbFNKaFBNT09QSENISVNVdFI4Tk5lR1Y1MU0wYkxJcENabHcyTU9GUjdqdENWejVqZFRpZ1NvYTIKQWxrRUlRZWQ4UVlDKzk1aGJoOHk5bGcwQ0JxdEdWN1FvMFZQR2xKQ0hGaWNuaWxLVFFZay9MZzkwWVFnUElPbwozbUppeFl5bWFGNmlMZTVXNzg1M0VHWUVFVWdlWmNaZFNjaGVBMEdBMGpRSFVTdnYvMEZjay9adkZNZURJOTVsCmJVQ0JoQjFDbFg4OG4wZUhzUmdWZE5vK0NLMDI4T2IvZTZTK1JLK09VaHlFRVdPTi90alVMdGhJdTJkQWtGcmkKOFFJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==' # noqa: E501 -PRIVATE_KEY = load_private_key(PRIVATE_KEY_PEM_BASE64, PRIVATE_KEY_PASSWORD) -PUBLIC_KEY = _load_public_key(PUBLIC_KEY_PEM_BASE64) - def test_encrypt_decrypt_various_strings() -> None: for value in [