From c612c0b81a204e4376bb77389eae8edd7b8d0747 Mon Sep 17 00:00:00 2001 From: anna-singleton-resolver Date: Thu, 5 Mar 2026 14:55:22 +0000 Subject: [PATCH 1/7] feat: configurable e2e test cases --- tests/functional/e2e/test_classify_single.py | 4 ++-- tests/functional/e2e/testcases/parser.py | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/functional/e2e/test_classify_single.py b/tests/functional/e2e/test_classify_single.py index 06d26a4..9af4cb0 100644 --- a/tests/functional/e2e/test_classify_single.py +++ b/tests/functional/e2e/test_classify_single.py @@ -11,10 +11,10 @@ from resolver_athena_client.client.models import ImageData from tests.functional.e2e.testcases.parser import ( AthenaTestCase, - load_test_cases, + load_test_cases_by_env, ) -TEST_CASES = load_test_cases("integrator_sample") +TEST_CASES = load_test_cases_by_env() FP_ERROR_TOLERANCE = 1e-4 diff --git a/tests/functional/e2e/testcases/parser.py b/tests/functional/e2e/testcases/parser.py index 80d7901..fd81217 100644 --- a/tests/functional/e2e/testcases/parser.py +++ b/tests/functional/e2e/testcases/parser.py @@ -1,4 +1,5 @@ import json +import os from pathlib import Path # Path to the shared testcases directory in athena-protobufs @@ -23,6 +24,12 @@ def __init__( self.classification_labels: list[str] = classification_labels +def load_test_cases_by_env() -> list[AthenaTestCase]: + return load_test_cases( + os.getenv("ATHENA_E2E_TESTCASE_DIR", "integrator_sample") + ) + + def load_test_cases(dirname: str = "benign_model") -> list[AthenaTestCase]: with Path.open( Path(TESTCASES_DIR / dirname / "expected_outputs.json"), From 03f62ed34544b8c3f9acdaab1ba076ade4887cf2 Mon Sep 17 00:00:00 2001 From: anna-singleton-resolver Date: Thu, 5 Mar 2026 15:08:51 +0000 Subject: [PATCH 2/7] test: load dotenv before loading e2e test cases like other fixtures --- tests/functional/e2e/testcases/parser.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/functional/e2e/testcases/parser.py b/tests/functional/e2e/testcases/parser.py index fd81217..3d2275a 100644 --- a/tests/functional/e2e/testcases/parser.py +++ b/tests/functional/e2e/testcases/parser.py @@ -2,6 +2,8 @@ import os from pathlib import Path +from dotenv import load_dotenv + # Path to the shared testcases directory in athena-protobufs _REPO_ROOT = Path(__file__).parent.parent.parent.parent.parent TESTCASES_DIR = _REPO_ROOT / "athena-protobufs" / "testcases" @@ -25,6 +27,7 @@ def __init__( def load_test_cases_by_env() -> list[AthenaTestCase]: + _ = load_dotenv() return load_test_cases( os.getenv("ATHENA_E2E_TESTCASE_DIR", "integrator_sample") ) From 541957a8101004b7af6d5d5d6bccbfc561f64b9c Mon Sep 17 00:00:00 2001 From: anna-singleton-resolver Date: Thu, 5 Mar 2026 15:38:51 +0000 Subject: [PATCH 3/7] doc: update docs to include information about e2e testing --- README.md | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9a2d35a..bac9b7c 100644 --- a/README.md +++ b/README.md @@ -162,6 +162,8 @@ ATHENA_NON_EXISTENT_AFFILIATE=non-existent-affiliate-id (default: thisaffiliatedoesnotexist123) - this is used to test error handling. ATHENA_NON_PERMITTED_AFFILIATE=non-permitted-affiliate-id (default: thisaffiliatedoesnothaveathenaenabled) - this is used to test error handling. +ATHENA_E2E_TESTCASE_DIR=test-case-directory (default: integrator_sample) - this is the test case directory to use for the e2e tests. +See E2E Tests section below for more details. ``` Then run the functional tests with: @@ -170,8 +172,18 @@ Then run the functional tests with: pytest -m functional ``` -To exclude the e2e tests, which require usage of the live classifier and -therefore are unsuitable for regular development runs, use: +#### E2E Tests + +The e2e tests assert that the API returns some expected _scores_ rather than +exercising different API paths. As such, they are dependent on the classifier +that you are calling through the API. Right now, there are 2 types of +classifier, benign and live. By default, the tests will run the +`integrator_sample` test set, which uses the live classifier. If you wish to +use the benign classifier instead, you may set the `ATHENA_E2E_TESTCASE_DIR` +environment variable to `benign_model`. + +Alternatively, you may disable these tests altogether, by excluding tests that +have the `e2e` marker, something like this: ```bash pytest -m 'functional and not e2e' From fa14ec997fd990b393aeae69067ccaa8660749c5 Mon Sep 17 00:00:00 2001 From: anna-singleton-resolver Date: Fri, 6 Mar 2026 16:48:46 +0000 Subject: [PATCH 4/7] perf: streaming connection for e2e test cases --- tests/functional/conftest.py | 90 +++++++++++++++++++- tests/functional/e2e/test_classify_single.py | 65 ++++++-------- 2 files changed, 115 insertions(+), 40 deletions(-) diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index 4d58a8c..a00dde4 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -1,19 +1,31 @@ import os import uuid +from asyncio import Future, Queue, Task, create_task +from collections.abc import AsyncIterator +from copy import deepcopy import cv2 as cv import numpy as np import pytest import pytest_asyncio from dotenv import load_dotenv +from grpc.aio import Channel +from resolver_athena_client.client.athena_client import AthenaClient from resolver_athena_client.client.athena_options import AthenaOptions -from resolver_athena_client.client.channel import CredentialHelper +from resolver_athena_client.client.channel import ( + CredentialHelper, + create_channel_with_credentials, +) from resolver_athena_client.client.consts import ( EXPECTED_HEIGHT, EXPECTED_WIDTH, MAX_DEPLOYMENT_ID_LENGTH, ) +from resolver_athena_client.client.models.input_model import ImageData +from resolver_athena_client.generated.athena.models_pb2 import ( + ClassificationOutput, +) def _create_base_test_image_opencv(width: int, height: int) -> np.ndarray: @@ -79,7 +91,7 @@ async def credential_helper() -> CredentialHelper: ) -@pytest.fixture +@pytest.fixture(scope="session") def athena_options() -> AthenaOptions: _ = load_dotenv() host = os.getenv("ATHENA_HOST", "localhost") @@ -99,6 +111,7 @@ def athena_options() -> AthenaOptions: timeout=120.0, # Maximum duration, not forced timeout keepalive_interval=30.0, # Longer intervals for persistent streams affiliate=affiliate, + compression_quality=2, ) @@ -144,3 +157,76 @@ def valid_formatted_image( _ = f.write(image_bytes) return image_bytes + + +class StreamingSender: + """Helper class to provide a single-send-like interface with speed + + The class provides a 'send' method that can be passed an imagedata and will + send it along a stream, and collect all results into an internal buffer. + + The 'send' method will asynchronously wait for the result and return it, + providing an interface that mimics a single request-response call, while + under the hood it is using a streaming connection for speed. + """ + + def __init__(self, grpc_channel: Channel, options: AthenaOptions) -> None: + self._results: list[ClassificationOutput] = [] + self._request_queue: Queue[ImageData] = Queue() + self._pending_results: dict[str, Future[ClassificationOutput]] = {} + + # tests are run in series, so we gain nothing here from waiting for a + # batch that will never fill, so just send it immediately for better + # latency + streaming_options = deepcopy(options) + streaming_options.max_batch_size = 1 + + self._run_task: Task[None] = create_task( + self._run(grpc_channel, streaming_options) + ) + + async def _run(self, grpc_channel: Channel, options: AthenaOptions) -> None: + async with AthenaClient(grpc_channel, options) as client: + generator = self._send_from_queue() + responses = client.classify_images(generator) + async for response in responses: + for output in response.outputs: + if output.correlation_id in self._pending_results: + future = self._pending_results.pop( + output.correlation_id + ) + future.set_result(output) + self._results.append(output) + + async def _send_from_queue(self) -> AsyncIterator[ImageData]: + """Async generator to yield requests from the queue.""" + while True: + if image_data := await self._request_queue.get(): + yield image_data + self._request_queue.task_done() + + async def send(self, image_data: ImageData) -> ClassificationOutput: + """Send an image and wait for the corresponding result.""" + if self._run_task.done(): + self._run_task.result() + + if image_data.correlation_id is None: + image_data.correlation_id = str(uuid.uuid4()) + future: Future[ClassificationOutput] = Future() + self._pending_results[image_data.correlation_id] = future + + await self._request_queue.put(image_data) + + return await future + + +@pytest_asyncio.fixture(scope="session", loop_scope="session") +async def streaming_sender( + athena_options: AthenaOptions, credential_helper: CredentialHelper +) -> StreamingSender: + """Fixture to provide a helper for sending over a streaming connection.""" + # Create gRPC channel with credentials + channel = await create_channel_with_credentials( + athena_options.host, credential_helper + ) + return StreamingSender(channel, athena_options) diff --git a/tests/functional/e2e/test_classify_single.py b/tests/functional/e2e/test_classify_single.py index 9af4cb0..e302747 100644 --- a/tests/functional/e2e/test_classify_single.py +++ b/tests/functional/e2e/test_classify_single.py @@ -2,13 +2,8 @@ import pytest -from resolver_athena_client.client.athena_client import AthenaClient -from resolver_athena_client.client.athena_options import AthenaOptions -from resolver_athena_client.client.channel import ( - CredentialHelper, - create_channel_with_credentials, -) from resolver_athena_client.client.models import ImageData +from tests.functional.conftest import StreamingSender from tests.functional.e2e.testcases.parser import ( AthenaTestCase, load_test_cases_by_env, @@ -19,13 +14,12 @@ FP_ERROR_TOLERANCE = 1e-4 -@pytest.mark.asyncio +@pytest.mark.asyncio(loop_scope="session") @pytest.mark.functional @pytest.mark.e2e @pytest.mark.parametrize("test_case", TEST_CASES, ids=lambda tc: tc.id) -async def test_classify_single( - athena_options: AthenaOptions, - credential_helper: CredentialHelper, +async def test_e2e_case( + streaming_sender: StreamingSender, test_case: AthenaTestCase, ) -> None: """Functional test for ClassifySingle endpoint and API methods. @@ -34,38 +28,33 @@ async def test_classify_single( """ - # Create gRPC channel with credentials - channel = await create_channel_with_credentials( - athena_options.host, credential_helper - ) with Path.open(Path(test_case.filepath), "rb") as f: image_bytes = f.read() - async with AthenaClient(channel, athena_options) as client: - image_data = ImageData(image_bytes) + image_data = ImageData(image_bytes) - # Classify with auto-generated correlation ID - result = await client.classify_single(image_data) + # Classify with auto-generated correlation ID + result = await streaming_sender.send(image_data) - if result.error.code: - msg = f"Image Result Error: {result.error.message}" - pytest.fail(msg) + if result.error.code: + msg = f"Image Result Error: {result.error.message}" + pytest.fail(msg) - actual_output = {c.label: c.weight for c in result.classifications} - assert set(test_case.expected_output.keys()).issubset( - set(actual_output.keys()) - ), ( - "Expected output to contain labels: ", - f"{test_case.expected_output.keys() - actual_output.keys()}", + actual_output = {c.label: c.weight for c in result.classifications} + assert set(test_case.expected_output.keys()).issubset( + set(actual_output.keys()) + ), ( + "Expected output to contain labels: ", + f"{test_case.expected_output.keys() - actual_output.keys()}", + ) + actual_output = {k: actual_output[k] for k in test_case.expected_output} + + for label in test_case.expected_output: + expected = test_case.expected_output[label] + actual = actual_output[label] + diff = abs(expected - actual) + assert diff < FP_ERROR_TOLERANCE, ( + f"Weight for label '{label}' differs by more than " + f"{FP_ERROR_TOLERANCE}: expected={expected}, actual={actual}, " + f"diff={diff}" ) - actual_output = {k: actual_output[k] for k in test_case.expected_output} - - for label in test_case.expected_output: - expected = test_case.expected_output[label] - actual = actual_output[label] - diff = abs(expected - actual) - assert diff < FP_ERROR_TOLERANCE, ( - f"Weight for label '{label}' differs by more than " - f"{FP_ERROR_TOLERANCE}: expected={expected}, actual={actual}, " - f"diff={diff}" - ) From 3cac70bac6423fd179029a6b98ebe1aaa2cb7819 Mon Sep 17 00:00:00 2001 From: anna-singleton-resolver Date: Fri, 6 Mar 2026 17:09:55 +0000 Subject: [PATCH 5/7] test: athena_options is function scoped again --- tests/functional/conftest.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index a00dde4..d1644cb 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -91,8 +91,7 @@ async def credential_helper() -> CredentialHelper: ) -@pytest.fixture(scope="session") -def athena_options() -> AthenaOptions: +def _load_options() -> AthenaOptions: _ = load_dotenv() host = os.getenv("ATHENA_HOST", "localhost") @@ -115,6 +114,11 @@ def athena_options() -> AthenaOptions: ) +@pytest.fixture +def athena_options() -> AthenaOptions: + return _load_options() + + @pytest.fixture(scope="session", params=SUPPORTED_TEST_FORMATS) def valid_formatted_image( request: pytest.FixtureRequest, @@ -222,11 +226,12 @@ async def send(self, image_data: ImageData) -> ClassificationOutput: @pytest_asyncio.fixture(scope="session", loop_scope="session") async def streaming_sender( - athena_options: AthenaOptions, credential_helper: CredentialHelper + credential_helper: CredentialHelper, ) -> StreamingSender: """Fixture to provide a helper for sending over a streaming connection.""" # Create gRPC channel with credentials + opts = _load_options() channel = await create_channel_with_credentials( - athena_options.host, credential_helper + opts.host, credential_helper ) - return StreamingSender(channel, athena_options) + return StreamingSender(channel, opts) From 617a5fe5a1d2eec8211fef1e0fdf808d07dc746d Mon Sep 17 00:00:00 2001 From: anna-singleton-resolver Date: Fri, 6 Mar 2026 17:11:10 +0000 Subject: [PATCH 6/7] test: remove results buffer --- tests/functional/conftest.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index d1644cb..23552b7 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -175,7 +175,6 @@ class StreamingSender: """ def __init__(self, grpc_channel: Channel, options: AthenaOptions) -> None: - self._results: list[ClassificationOutput] = [] self._request_queue: Queue[ImageData] = Queue() self._pending_results: dict[str, Future[ClassificationOutput]] = {} @@ -200,7 +199,6 @@ async def _run(self, grpc_channel: Channel, options: AthenaOptions) -> None: output.correlation_id ) future.set_result(output) - self._results.append(output) async def _send_from_queue(self) -> AsyncIterator[ImageData]: """Async generator to yield requests from the queue.""" From a453b67cb3d6a38cfa8c199e89b29cf546a04eee Mon Sep 17 00:00:00 2001 From: anna-singleton-resolver Date: Fri, 6 Mar 2026 17:13:08 +0000 Subject: [PATCH 7/7] test: always create futures in the correct event loop --- tests/functional/conftest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index 23552b7..2f7d4a6 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -1,3 +1,4 @@ +import asyncio import os import uuid from asyncio import Future, Queue, Task, create_task @@ -214,7 +215,7 @@ async def send(self, image_data: ImageData) -> ClassificationOutput: if image_data.correlation_id is None: image_data.correlation_id = str(uuid.uuid4()) - future: Future[ClassificationOutput] = Future() + future = asyncio.get_event_loop().create_future() self._pending_results[image_data.correlation_id] = future await self._request_queue.put(image_data)