From fa6fb681562954eb7a1470b0b121528e9ba4194d Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Fri, 13 Feb 2026 18:39:19 -0500 Subject: [PATCH 1/7] feat: Support Arize for Telemtry --- .gitignore | 5 +- .specify/memory/constitution.md | 164 ++++++++++++ .vscode/launch.json | 3 +- .vscode/settings.json | 19 +- .../trace_calls_with_open_telemetry.md | 12 +- .../Qtype Server/add_feedback_buttons.md | 4 +- .../trace_with_arize.qtype.yaml | 56 ++++ ...ype.yaml => trace_with_phoenix.qtype.yaml} | 0 pyproject.toml | 3 +- qtype/dsl/model.py | 2 +- qtype/interpreter/feedback_api.py | 77 +++++- qtype/interpreter/telemetry.py | 115 ++++++++- qtype/semantic/model.py | 2 +- schema/qtype.schema.json | 3 +- tests/interpreter/test_telemetry_arize.py | 243 ++++++++++++++++++ tests/semantic/test_feedback_validation.py | 41 +++ uv.lock | 61 +++++ 17 files changed, 791 insertions(+), 19 deletions(-) create mode 100644 .specify/memory/constitution.md create mode 100644 examples/observability_debugging/trace_with_arize.qtype.yaml rename examples/observability_debugging/{trace_with_opentelemetry.qtype.yaml => trace_with_phoenix.qtype.yaml} (100%) create mode 100644 tests/interpreter/test_telemetry_arize.py diff --git a/.gitignore b/.gitignore index 7a9ad3ca..20d5fdb4 100644 --- a/.gitignore +++ b/.gitignore @@ -225,4 +225,7 @@ ignored qdrant_storage -.qtype-cache \ No newline at end of file +.qtype-cache + +# SSL certificates (combined bundle with corporate certs) +certs/*.pem \ No newline at end of file diff --git a/.specify/memory/constitution.md b/.specify/memory/constitution.md new file mode 100644 index 00000000..07849eb3 --- /dev/null +++ b/.specify/memory/constitution.md @@ -0,0 +1,164 @@ + + +# QType Constitution + +## Core Principles + +### I. Code Quality + +All code MUST pass the project's automated quality gates before +merge. This is non-negotiable. + +- Every function, class, and module MUST have type hints and + docstrings. +- All Python code MUST pass `ruff`, `ty`, and `isort` checks with + zero violations. +- Logging MUST be used instead of print statements for all + debug, info, and error output. +- Catch specific exceptions — bare `except` clauses are forbidden. +- Tests MUST accompany new functionality; untested code is + incomplete code. + +**Rationale**: Automated enforcement eliminates style debates and +catches defects early. Consistent quality makes the codebase +navigable by any contributor. + +### II. Clean Design Patterns & Abstractions + +Architecture MUST follow the established layered dependency flow: +CLI → Application → Interpreter → Semantic → DSL → Base. + +- Each layer MUST only import from layers below it; never upward + or sideways. +- Classes and functions MUST have a single, clear responsibility. +- Use composition over inheritance; prefer small, focused + interfaces. +- Data structures MUST use Pydantic `BaseModel` for validation + and serialization. +- Shared utilities belong in the Base layer; domain logic belongs + in its respective layer. + +**Rationale**: Strict layering prevents entanglement and makes each +component independently testable and replaceable. + +### III. Concise, Minimal & Readable Code + +Code MUST communicate intent clearly with the least amount of +syntax necessary. + +- Prefer explicit over implicit — but never verbose over concise. +- Use f-strings, comprehensions, and pattern matching where they + improve clarity. +- Lines MUST NOT exceed 79 characters unless breaking would harm + readability. +- Functions SHOULD fit on a single screen (~30 lines); extract + when they grow beyond that. +- Comments MUST explain *why*, not *what* — the code itself MUST + explain the what. +- Remove dead code immediately; do not comment it out. + +**Rationale**: Code is read far more often than it is written. +Minimalism reduces cognitive load and accelerates onboarding. + +### IV. Async-First Execution + +All I/O-bound operations MUST use asynchronous execution via +`async`/`await`. + +- Network calls, file I/O, and external process invocations MUST + be async. +- Use `asyncio` primitives (`gather`, `TaskGroup`, semaphores) + for concurrency control. +- CPU-bound work SHOULD remain synchronous unless it blocks an + event loop, in which case offload to a thread/process executor. +- Async functions MUST NOT call blocking synchronous I/O without + wrapping in `asyncio.to_thread`. +- Prefer structured concurrency (`TaskGroup`) over bare + `create_task` for proper error propagation. + +**Rationale**: QType orchestrates LLM calls, tool invocations, and +document retrieval — all inherently I/O-bound. Async execution +maximizes throughput without thread complexity. + +### V. Simplicity — YAGNI, DRY, No Over-Engineering + +Build only what is needed *right now*. Eliminate duplication. +Resist speculative abstractions. + +- Do NOT add code, parameters, or abstractions for hypothetical + future requirements (YAGNI). +- Extract shared logic into a single source of truth when it + appears in two or more places (DRY). +- Prefer flat, straightforward control flow over clever patterns. +- Optimization MUST be driven by measured bottlenecks, never by + assumption. +- If a simpler solution solves the problem, it is the correct + solution — complexity MUST be justified. + +**Rationale**: Over-engineering is the leading cause of +unmaintainable code. Simplicity keeps velocity high and defect +rates low. + +## Technology Stack & Standards + +- **Language**: Python 3.10 — use all 3.10 features (union `|` + types, `match`/`case`, built-in generics). +- **Package manager**: `uv` — all commands run via `uv run`. +- **Linting**: `ruff` (line-length 79, target py310). +- **Type checking**: `ty` with strict annotations. +- **Import sorting**: `isort` with standard/third-party/local + grouping. +- **Data models**: Pydantic `BaseModel` for all structured data. +- **UI framework**: React + TypeScript with shadcn component + library (functional components, hooks only). +- **Testing**: `pytest` via `uv run pytest`. + +## Development Workflow + +- All changes MUST be validated against `ruff`, `ty`, and `isort` + before committing. +- Use feature branches; merge only after CI passes. +- Code review MUST verify adherence to these principles. +- Follow the project's copilot-instructions for detailed style + rules (`.github/copilot-instructions.md`). +- Use `logging` for runtime diagnostics; structured logs + preferred. + +## Governance + +This constitution is the authoritative source of engineering +standards for QType. It supersedes informal conventions and +personal preferences. + +- **Amendments** require documentation of the change, rationale, + and a version bump to this file. +- **Versioning** follows semantic versioning: + - MAJOR: principle removal or incompatible redefinition. + - MINOR: new principle or materially expanded guidance. + - PATCH: clarifications, wording, or typo fixes. +- **Compliance** is verified during code review; every PR MUST + be checked against these principles. +- **Complexity justification**: any deviation from Principle V + MUST be documented inline with a rationale. + +**Version**: 1.0.0 | **Ratified**: 2026-02-13 | **Last Amended**: 2026-02-13 diff --git a/.vscode/launch.json b/.vscode/launch.json index fa478206..d42f8511 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,7 +11,8 @@ "cwd": "${workspaceFolder}", "justMyCode": false, "env": { - "PYTHONPATH": "${workspaceFolder}" + "PYTHONPATH": "${workspaceFolder}", + "GRPC_DEFAULT_SSL_ROOTS_FILE_PATH": "${workspaceFolder}/certs/combined-cacerts.pem" }, "envFile": "${workspaceFolder}/.env" }, diff --git a/.vscode/settings.json b/.vscode/settings.json index c1082dad..978c35f7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,8 +5,10 @@ } ], "yaml.schemaStore.enable": false, - "yaml.schemas" : { - "./schema/qtype.schema.json": ["*.qtype.yaml"] + "yaml.schemas": { + "./schema/qtype.schema.json": [ + "*.qtype.yaml" + ] }, "python.testing.pytestArgs": [ "tests" @@ -19,5 +21,16 @@ "envManager": "ms-python.python:venv", "packageManager": "ms-python.python:pip" } - ] + ], + "chat.promptFilesRecommendations": { + "speckit.constitution": true, + "speckit.specify": true, + "speckit.plan": true, + "speckit.tasks": true, + "speckit.implement": true + }, + "chat.tools.terminal.autoApprove": { + ".specify/scripts/bash/": true, + ".specify/scripts/powershell/": true + } } diff --git a/docs/How To/Observability & Debugging/trace_calls_with_open_telemetry.md b/docs/How To/Observability & Debugging/trace_calls_with_open_telemetry.md index ceb8339a..3f07a187 100644 --- a/docs/How To/Observability & Debugging/trace_calls_with_open_telemetry.md +++ b/docs/How To/Observability & Debugging/trace_calls_with_open_telemetry.md @@ -15,7 +15,7 @@ telemetry: - **telemetry**: Top-level application configuration for observability - **id**: Unique identifier for the telemetry sink -- **provider**: Telemetry backend (`Phoenix` or `Langfuse`) +- **provider**: Telemetry backend (`Phoenix`, `Arize`, or `Langfuse`) - **endpoint**: URL where OpenTelemetry traces are sent ### Starting Phoenix @@ -31,17 +31,23 @@ Phoenix will start on `http://localhost:6006` where you can view traces and span ## Complete Example ```yaml ---8<-- "../examples/observability_debugging/trace_with_opentelemetry.qtype.yaml" +--8<-- "../examples/observability_debugging/trace_with_phoenix.qtype.yaml" ``` Run the example: ```bash -qtype run examples/observability_debugging/trace_with_opentelemetry.qtype.yaml --text "I love this product!" +qtype run examples/observability_debugging/trace_with_phoenix.qtype.yaml --text "I love this product!" ``` Then open `http://localhost:6006` in your browser to see the traced execution. +## Complete Example for Arize Cloud + +```yaml +--8<-- "../examples/observability_debugging/trace_with_arize.qtype.yaml" +``` + ## See Also - [Application Reference](../../components/Application.md) diff --git a/docs/How To/Qtype Server/add_feedback_buttons.md b/docs/How To/Qtype Server/add_feedback_buttons.md index 39d45495..89e94d56 100644 --- a/docs/How To/Qtype Server/add_feedback_buttons.md +++ b/docs/How To/Qtype Server/add_feedback_buttons.md @@ -2,6 +2,8 @@ Collect user feedback (thumbs, ratings, or categories) directly in the QType UI by adding a `feedback` block to your flow. Feedback submission requires `telemetry` to be enabled so QType can attach the feedback to traces/spans. +> **Note**: Feedback is only supported for **Conversational** interfaces and flows accessed via the **REST endpoint** in the UI. Feedback is not available for **Complete** interfaces. + ### QType YAML ```yaml @@ -22,7 +24,7 @@ telemetry: ### Explanation -- **flows[].feedback**: Enables a feedback widget on the flow’s outputs in the UI. +- **flows[].feedback**: Enables a feedback widget on the flow’s outputs in the UI.- **flows[].interface.type**: Must be `Conversational` for feedback to work in the streaming UI. For flows without an interface (or with `Complete` interface), use the REST endpoint tab in the UI instead of the streaming tab. - **feedback.type**: Feedback widget type: `thumbs`, `rating`, or `category`. - **feedback.explanation**: If `true`, prompts the user for an optional text explanation along with their feedback. - **rating.scale**: For `rating` feedback, sets the maximum score (typically `5` or `10`). diff --git a/examples/observability_debugging/trace_with_arize.qtype.yaml b/examples/observability_debugging/trace_with_arize.qtype.yaml new file mode 100644 index 00000000..802af1b2 --- /dev/null +++ b/examples/observability_debugging/trace_with_arize.qtype.yaml @@ -0,0 +1,56 @@ +id: arize_trace_example +description: Example of tracing QType application calls with OpenTelemetry to Arize Cloud + +auths: + - id: arize-auth + type: api_key + api_key: ${ARIZE_API_KEY} + + - id: bedrock-auth + type: aws + region: us-east-1 + +models: + - type: Model + id: nova + provider: aws-bedrock + model_id: amazon.nova-lite-v1:0 + auth: bedrock-auth + inference_params: + temperature: 0.7 + max_tokens: 512 + +flows: + - type: Flow + id: classify_text + feedback: + type: thumbs + explanation: true + variables: + - id: text + type: text + - id: response + type: text + inputs: + - text + outputs: + - response + steps: + - id: classify + type: LLMInference + model: nova + system_message: "Classify the following text as positive, negative, or neutral. Respond with only one word." + inputs: + - text + outputs: + - response + +telemetry: + id: arize-telemetry + provider: Arize + endpoint: https://otlp.arize.com/v1/traces + auth: arize-auth + args: + space_id: ${ARIZE_SPACE_ID} + project_name: qtype-classify-example + diff --git a/examples/observability_debugging/trace_with_opentelemetry.qtype.yaml b/examples/observability_debugging/trace_with_phoenix.qtype.yaml similarity index 100% rename from examples/observability_debugging/trace_with_opentelemetry.qtype.yaml rename to examples/observability_debugging/trace_with_phoenix.qtype.yaml diff --git a/pyproject.toml b/pyproject.toml index fbe32fdb..d3ef4332 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,9 @@ Homepage = "https://github.com/bazaarvoice/qtype" [project.optional-dependencies] interpreter = [ "aiostream>=0.7.1", + "arize>=8.0.0", "arize-phoenix-otel>=0.12.1", + "arize-otel>=0.11.0", "boto3>=1.34.0", "datasets>=4.4.1", "diskcache>=5.6.3", @@ -101,7 +103,6 @@ docs = [ "mkdocs>=1.5.0", "mkdocs-material>=9.0.0", ] - [tool.uv] # Install dev dependencies by default when running uv sync default-groups = ["dev"] diff --git a/qtype/dsl/model.py b/qtype/dsl/model.py index 6c4679ed..140468eb 100644 --- a/qtype/dsl/model.py +++ b/qtype/dsl/model.py @@ -1003,7 +1003,7 @@ class TelemetrySink(StrictBaseModel): id: str = Field( ..., description="Unique ID of the telemetry sink configuration." ) - provider: Literal["Phoenix", "Langfuse"] = "Phoenix" + provider: Literal["Phoenix", "Langfuse", "Arize"] = "Phoenix" auth: Reference[AuthProviderType] | str | None = Field( default=None, description="AuthorizationProvider used to authenticate telemetry data transmission.", diff --git a/qtype/interpreter/feedback_api.py b/qtype/interpreter/feedback_api.py index 62255903..e284da2f 100644 --- a/qtype/interpreter/feedback_api.py +++ b/qtype/interpreter/feedback_api.py @@ -1,14 +1,16 @@ from __future__ import annotations import logging +from dataclasses import dataclass from enum import Enum -from typing import Annotated, Any, Literal +from typing import Annotated, Any, Literal, cast from urllib.parse import urlparse from fastapi import FastAPI, HTTPException, status from pydantic import BaseModel, Field from qtype.interpreter.base.secrets import SecretManagerBase +from qtype.interpreter.telemetry import _resolve_arize_credentials from qtype.semantic.model import TelemetrySink logger = logging.getLogger(__name__) @@ -19,6 +21,25 @@ class TelemetryProvider(str, Enum): PHOENIX = "Phoenix" LANGFUSE = "Langfuse" + ARIZE = "Arize" + + +@dataclass +class ArizeClientWrapper: + """Encapsulates Arize client with resolved authentication and project context. + + This wrapper combines the Arize API client with the space and project + identifiers required for feedback submission operations. + + Attributes: + client: Authenticated Arize API client instance. + space_id: Arize workspace/space identifier. + project_name: Project name for organizing telemetry data. + """ + + client: Any # ArizeClient type - keep Any to avoid requiring arize at import time + space_id: str + project_name: str def _format_feedback_label(feedback: FeedbackData) -> str: @@ -123,6 +144,19 @@ def _create_telemetry_client( "Feedback endpoint will not be created." ) return None + elif telemetry.provider == TelemetryProvider.ARIZE.value: + # Resolve credentials using shared helper + space_id, project_name, api_key = _resolve_arize_credentials( + telemetry, None, secret_manager + ) + + from arize import ArizeClient + + return ArizeClientWrapper( + client=ArizeClient(api_key=api_key), + space_id=space_id, + project_name=project_name, + ) else: logger.warning( f"Feedback endpoint not created: unsupported telemetry " @@ -200,10 +234,51 @@ async def submit_feedback(request: FeedbackRequest) -> FeedbackResponse: "Langfuse feedback not yet implemented" ) + elif telemetry.provider == TelemetryProvider.ARIZE.value: + import pandas as pd + + label = _format_feedback_label(request.feedback) + explanation = request.feedback.explanation + + # Calculate score based on feedback type + score = None + if isinstance(request.feedback, ThumbsFeedbackData): + score = 1.0 if request.feedback.value else 0.0 + elif isinstance(request.feedback, RatingFeedbackData): + score = float(request.feedback.score) + + # Build annotation DataFrame + data: dict[str, list[Any]] = { + "context.span_id": [request.span_id], + "annotation.user_feedback.label": [label], + "annotation.user_feedback.updated_by": ["human"], + } + if score is not None: + data["annotation.user_feedback.score"] = [score] + if explanation: + data["annotation.notes"] = [explanation] + + df = pd.DataFrame(data) + # Cast since we're in Arize provider branch where client is ArizeClientWrapper + arize_wrapper = cast(ArizeClientWrapper, client) + arize_wrapper.client.spans.update_annotations( + space_id=arize_wrapper.space_id, + project_name=arize_wrapper.project_name, + dataframe=df, + validate=True, + ) + + logger.info( + "Feedback submitted to Arize for " + f"span {request.span_id}: " + f"{request.feedback.type} = {label}" + ) + return FeedbackResponse() except Exception as e: logger.error(f"Failed to submit feedback: {e}", exc_info=True) + raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to submit feedback.", diff --git a/qtype/interpreter/telemetry.py b/qtype/interpreter/telemetry.py index 596268b2..c1ad00c3 100644 --- a/qtype/interpreter/telemetry.py +++ b/qtype/interpreter/telemetry.py @@ -1,6 +1,8 @@ from __future__ import annotations import base64 +import logging +from typing import cast from openinference.instrumentation.llama_index import LlamaIndexInstrumentor from opentelemetry import trace @@ -11,8 +13,11 @@ from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor +from qtype.interpreter.auth.generic import auth from qtype.interpreter.base.secrets import SecretManagerBase -from qtype.semantic.model import TelemetrySink +from qtype.semantic.model import APIKeyAuthProvider, TelemetrySink + +logger = logging.getLogger(__name__) def _setup_langfuse_otel( @@ -83,6 +88,103 @@ def _setup_langfuse_otel( return tracer_provider +def _resolve_arize_credentials( + sink: TelemetrySink, + project_id: str | None, + secret_manager: SecretManagerBase, +) -> tuple[str, str, str]: + """Resolve Arize credentials from telemetry sink configuration. + + Args: + sink: TelemetrySink with Arize configuration. + project_id: Default project identifier if not in args. + secret_manager: For resolving secret references. + + Returns: + Tuple of (space_id, project_name, api_key). + + Raises: + ValueError: If required credentials are missing or invalid. + """ + if not sink.auth: + raise ValueError( + f"Arize telemetry sink '{sink.id}' requires " + "'auth' field with an APIKeyAuthProvider." + ) + space_id = sink.args.get("space_id", None) + if not space_id: + raise ValueError( + f"Arize telemetry sink '{sink.id}' requires 'space_id' in args." + ) + + # Cast types since resolve_secrets_in_dict returns str (not SecretReference) + space_id = cast(str, space_id) + project_name = project_id or sink.args.get("project_name", sink.id) + + # Resolve API key from auth provider + with auth(sink.auth, secret_manager) as provider: + if not isinstance(provider, APIKeyAuthProvider): + raise ValueError( + f"Arize telemetry sink '{sink.id}' requires " + f"APIKeyAuthProvider, got {type(provider).__name__}" + ) + # Cast since auth() context manager resolves SecretReferences to str + api_key = cast(str, provider.api_key) + + return space_id, project_name, api_key + + +def _setup_arize_otel( + sink: TelemetrySink, + project_id: str, + secret_manager: SecretManagerBase, +) -> TracerProvider: + """Initialize and register Arize Cloud as an OTEL trace exporter. + + Uses the ``arize.otel.register()`` helper with HTTP transport. + + Args: + sink: TelemetrySink with Arize endpoint and credentials. + project_id: Project identifier for grouping traces. + secret_manager: For resolving secret references in args. + + Returns: + Configured OpenTelemetry TracerProvider. + + Raises: + ValueError: If required credentials are missing or invalid. + """ + # Resolve credentials + space_id, project_name, api_key = _resolve_arize_credentials( + sink, project_id, secret_manager + ) + + # Resolve endpoint + config = {"endpoint": sink.endpoint} + resolved = secret_manager.resolve_secrets_in_dict( + config, f"telemetry sink '{sink.id}'" + ) + endpoint = resolved.get("endpoint", "") + if not endpoint: + msg = f"Arize telemetry sink '{sink.id}' requires an 'endpoint' field." + raise ValueError(msg) + + # Use HTTP transport (avoids gRPC TLS certificate issues) + # Import arize here to allow GRPC env var to be set before pyarrow loads + from arize.otel import register as arize_register + from arize.otel.otel import Transport + + tracer_provider = arize_register( + space_id=space_id, + api_key=api_key, + project_name=project_name, + endpoint=endpoint, + transport=Transport.HTTP, + ) + + return tracer_provider + + def register( telemetry: TelemetrySink, secret_manager: SecretManagerBase, @@ -108,12 +210,9 @@ def register( TracerProvider instance for managing telemetry lifecycle. Note: - Supports Phoenix and Langfuse telemetry providers. + Supports Phoenix, Langfuse, and Arize telemetry providers. Phoenix is the default. """ - - # Only llama_index and phoenix are supported for now - project_id = project_id if project_id else telemetry.id if telemetry.provider == "Phoenix": @@ -135,6 +234,12 @@ def register( secret_manager=secret_manager, context=f"telemetry sink '{telemetry.id}'", ) + elif telemetry.provider == "Arize": + tracer_provider = _setup_arize_otel( + sink=telemetry, + project_id=project_id, + secret_manager=secret_manager, + ) else: raise ValueError( f"Unsupported telemetry provider: {telemetry.provider}" diff --git a/qtype/semantic/model.py b/qtype/semantic/model.py index f6b50657..5258593d 100644 --- a/qtype/semantic/model.py +++ b/qtype/semantic/model.py @@ -297,7 +297,7 @@ class TelemetrySink(BaseModel): id: str = Field( ..., description="Unique ID of the telemetry sink configuration." ) - provider: Literal["Phoenix", "Langfuse"] = Field("Phoenix") + provider: Literal["Phoenix", "Langfuse", "Arize"] = Field("Phoenix") auth: AuthorizationProvider | None = Field( None, description="AuthorizationProvider used to authenticate telemetry data transmission.", diff --git a/schema/qtype.schema.json b/schema/qtype.schema.json index 89ed7f86..41d1fc49 100644 --- a/schema/qtype.schema.json +++ b/schema/qtype.schema.json @@ -3689,7 +3689,8 @@ "default": "Phoenix", "enum": [ "Phoenix", - "Langfuse" + "Langfuse", + "Arize" ], "title": "Provider", "type": "string" diff --git a/tests/interpreter/test_telemetry_arize.py b/tests/interpreter/test_telemetry_arize.py new file mode 100644 index 00000000..ae90a66a --- /dev/null +++ b/tests/interpreter/test_telemetry_arize.py @@ -0,0 +1,243 @@ +"""Unit tests for Arize Cloud telemetry integration.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from qtype.interpreter.base.secrets import NoOpSecretManager +from qtype.interpreter.feedback_api import ( + ArizeClientWrapper, + FeedbackRequest, + ThumbsFeedbackData, + _create_telemetry_client, +) +from qtype.semantic.model import APIKeyAuthProvider, TelemetrySink + + +# Shared fixtures +@pytest.fixture +def secret_manager(): + """Reusable NoOp secret manager.""" + return NoOpSecretManager() + + +@pytest.fixture +def arize_auth(): + """Standard Arize auth configuration.""" + return APIKeyAuthProvider( + id="arize-auth", + type="api_key", + api_key="test-api-key", + ) + + +@pytest.fixture +def arize_sink(arize_auth): + """Standard Arize telemetry sink.""" + return TelemetrySink( + id="arize-sink", + provider="Arize", + endpoint="https://otlp.arize.com/v1", + auth=arize_auth, + args={ + "space_id": "test-space", + "project_name": "test-project", + }, + ) + + +class TestSetupArizeOtel: + """Tests for _setup_arize_otel() function.""" + + @patch("arize.otel.register") + @patch("qtype.interpreter.telemetry._resolve_arize_credentials") + def test_calls_arize_register_with_resolved_credentials( + self, + mock_resolve_creds, + mock_arize_register, + arize_sink, + secret_manager, + ): + """Test arize.otel.register is called with resolved credentials.""" + from qtype.interpreter.telemetry import _setup_arize_otel + + mock_resolve_creds.return_value = ( + "test-space-id", + "my-project", + "resolved-key", + ) + mock_tracer = MagicMock() + mock_arize_register.return_value = mock_tracer + + result = _setup_arize_otel( + sink=arize_sink, + project_id="my-project", + secret_manager=secret_manager, + ) + + from arize.otel.otel import Transport + + mock_arize_register.assert_called_once_with( + space_id="test-space-id", + api_key="resolved-key", + project_name="my-project", + endpoint="https://otlp.arize.com/v1", + transport=Transport.HTTP, + ) + assert result is mock_tracer + + @pytest.mark.parametrize( + "missing_field,args,auth", + [ + ( + "space_id", + {}, + APIKeyAuthProvider(id="a", type="api_key", api_key="key"), + ), + ("auth", {"space_id": "test"}, None), + ], + ) + def test_missing_required_field_raises_value_error( + self, missing_field, args, auth, secret_manager + ): + """Test missing required fields raise ValueError.""" + from qtype.interpreter.telemetry import _resolve_arize_credentials + + sink = TelemetrySink( + id="arize-sink", + provider="Arize", + endpoint="https://otlp.arize.com/v1", + auth=auth, + args=args, + ) + + with pytest.raises(ValueError, match=missing_field): + _resolve_arize_credentials( + sink=sink, + project_id="proj", + secret_manager=secret_manager, + ) + + +class TestRegisterArize: + """Tests for the Arize branch in register().""" + + @patch("qtype.interpreter.telemetry._setup_arize_otel") + @patch("qtype.interpreter.telemetry.LlamaIndexInstrumentor") + def test_register_dispatches_to_arize_setup( + self, mock_instrumentor_cls, mock_setup, arize_sink, secret_manager + ): + """Test register() dispatches to _setup_arize_otel.""" + from qtype.interpreter.telemetry import register + + mock_tracer = MagicMock() + mock_setup.return_value = mock_tracer + + result = register(arize_sink, secret_manager, project_id="proj") + + mock_setup.assert_called_once_with( + sink=arize_sink, + project_id="proj", + secret_manager=secret_manager, + ) + assert result is mock_tracer + mock_instrumentor_cls().instrument.assert_called_once_with( + tracer_provider=mock_tracer + ) + + +class TestCreateTelemetryClientArize: + """Tests for _create_telemetry_client() Arize branch.""" + + @patch("arize.ArizeClient") + @patch("qtype.interpreter.feedback_api._resolve_arize_credentials") + def test_returns_arize_client_wrapper( + self, + mock_resolve_creds, + mock_arize_client_cls, + arize_sink, + secret_manager, + ): + """Test Arize branch returns ArizeClientWrapper.""" + mock_resolve_creds.return_value = ( + "test-space", + "test-project", + "resolved-key", + ) + + mock_client = MagicMock() + mock_arize_client_cls.return_value = mock_client + + result = _create_telemetry_client(arize_sink, secret_manager) + + assert isinstance(result, ArizeClientWrapper) + assert result.client is mock_client + assert result.space_id == "test-space" + assert result.project_name == "test-project" + mock_arize_client_cls.assert_called_once_with(api_key="resolved-key") + + def test_missing_auth_raises_value_error(self, secret_manager): + """Test missing auth raises ValueError.""" + sink = TelemetrySink( + id="arize-sink", + provider="Arize", + endpoint="https://otlp.arize.com/v1", + auth=None, + args={"space_id": "test-space"}, + ) + with pytest.raises(ValueError, match="auth"): + _create_telemetry_client(sink, secret_manager) + + +class TestSubmitFeedbackArize: + """Tests for submit_feedback() Arize branch.""" + + @patch("qtype.interpreter.feedback_api._create_telemetry_client") + def test_submit_thumbs_feedback_calls_update_annotations( + self, mock_create_client, arize_sink, secret_manager + ): + """Test thumbs feedback calls update_annotations correctly.""" + import asyncio + + from qtype.interpreter.feedback_api import create_feedback_endpoint + + mock_arize_client = MagicMock() + wrapper = ArizeClientWrapper( + client=mock_arize_client, + space_id="test-space", + project_name="test-project", + ) + mock_create_client.return_value = wrapper + + from fastapi import FastAPI + + app = FastAPI() + create_feedback_endpoint(app, arize_sink, secret_manager) + + # Find feedback route + feedback_route = next( + r + for r in app.routes + if hasattr(r, "path") and r.path == "/feedback" + ) + + request = FeedbackRequest( + span_id="span-123", + trace_id="trace-456", + feedback=ThumbsFeedbackData( + value=True, explanation="Great response" + ), + ) + + result = asyncio.get_event_loop().run_until_complete( + feedback_route.endpoint(request) + ) + + assert result.status == "success" + mock_arize_client.spans.update_annotations.assert_called_once() + call_kwargs = mock_arize_client.spans.update_annotations.call_args[1] + assert call_kwargs["space_id"] == "test-space" + assert call_kwargs["project_name"] == "test-project" + assert call_kwargs["validate"] is True diff --git a/tests/semantic/test_feedback_validation.py b/tests/semantic/test_feedback_validation.py index 56a02a19..53cdd979 100644 --- a/tests/semantic/test_feedback_validation.py +++ b/tests/semantic/test_feedback_validation.py @@ -114,3 +114,44 @@ def test_flow_without_feedback(self, tmp_path): app, _ = loader.load(str(yaml_file)) flow = app.flows[0] assert flow.feedback is None + + def test_arize_provider_with_feedback_loads_correctly(self, tmp_path): + """Test that Arize provider with feedback configuration + loads and validates.""" + yaml_content = """ +id: test_app + +auths: + - id: arize-auth + type: api_key + api_key: test-api-key + +flows: + - id: test_flow + feedback: + type: thumbs + explanation: true + steps: + - type: Echo + id: echo1 + +telemetry: + id: arize-telemetry + provider: Arize + endpoint: https://otlp.arize.com/v1 + auth: arize-auth + args: + space_id: test-space-id + project_name: test-project +""" + yaml_file = tmp_path / "test.yaml" + yaml_file.write_text(yaml_content) + + app, _ = loader.load(str(yaml_file)) + assert app.telemetry is not None + assert app.telemetry.provider == "Arize" + assert app.telemetry.args["space_id"] == "test-space-id" + assert app.telemetry.args["project_name"] == "test-project" + flow = app.flows[0] + assert flow.feedback is not None + assert flow.feedback.type == "thumbs" diff --git a/uv.lock b/uv.lock index 83944ac9..630dfe4e 100644 --- a/uv.lock +++ b/uv.lock @@ -345,6 +345,51 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/15/b3/9b1a8074496371342ec1e796a96f99c82c945a339cd81a8e73de28b4cf9e/anyio-4.11.0-py3-none-any.whl", hash = "sha256:0287e96f4d26d4149305414d4e3bc32f0dcd0862365a4bddea19d7a1ec38c4fc", size = 109097, upload-time = "2025-09-23T09:19:10.601Z" }, ] +[[package]] +name = "arize" +version = "8.2.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, + { name = "numpy", version = "2.3.4", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, + { name = "openinference-semantic-conventions" }, + { name = "opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-sdk" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "pandas" }, + { name = "protobuf" }, + { name = "pyarrow" }, + { name = "pydantic" }, + { name = "python-dateutil" }, + { name = "requests" }, + { name = "requests-futures" }, + { name = "tqdm" }, + { name = "typing-extensions" }, + { name = "urllib3" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/35/80/cdc6fce57e4176cdcb101d1814be6598e2e6d5040265319a5946c52515b2/arize-8.2.1.tar.gz", hash = "sha256:ff6794c5732819709617f19cc25cbedce5f2a6901eea14e6f82e1e8a3e692dd5", size = 262597, upload-time = "2026-02-13T19:40:24.325Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/25/bf/2c314ca44e16aad6debef13cf0bdb27c0ff3253f9f363fdfb535283aa493/arize-8.2.1-py3-none-any.whl", hash = "sha256:2b3132bf95e597f8fa23c8d432506ae043d8d466d84a003ca87ed33a9b009638", size = 425148, upload-time = "2026-02-13T19:40:22.636Z" }, +] + +[[package]] +name = "arize-otel" +version = "0.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "openinference-instrumentation" }, + { name = "openinference-semantic-conventions" }, + { name = "opentelemetry-exporter-otlp" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/0a/43/feecfd74cf03cd9f73bfa43155d8edb4ee38e880e7cd6ecc4277f2f4143c/arize_otel-0.11.0.tar.gz", hash = "sha256:cf0729b7c236c51c11e7054b3f96e7deadf6a813386a35391a670c64a63a09bf", size = 15801, upload-time = "2025-10-30T05:53:07.279Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/55/8b/8f5426c47eee5e4f3acaf8a8037dbe5a8d503c739e9327be8d699e6631a5/arize_otel-0.11.0-py3-none-any.whl", hash = "sha256:48e7e4c5b30d53a89c86085bd00a98077bad766ccde79ca906daa78eb17b93d2", size = 16839, upload-time = "2025-10-30T05:53:05.763Z" }, +] + [[package]] name = "arize-phoenix" version = "12.35.0" @@ -5648,6 +5693,8 @@ dependencies = [ [package.optional-dependencies] interpreter = [ { name = "aiostream" }, + { name = "arize" }, + { name = "arize-otel" }, { name = "arize-phoenix-otel" }, { name = "boto3" }, { name = "datasets" }, @@ -5723,6 +5770,8 @@ docs = [ [package.metadata] requires-dist = [ { name = "aiostream", marker = "extra == 'interpreter'", specifier = ">=0.7.1" }, + { name = "arize", marker = "extra == 'interpreter'", specifier = ">=8.0.0" }, + { name = "arize-otel", marker = "extra == 'interpreter'", specifier = ">=0.11.0" }, { name = "arize-phoenix-otel", marker = "extra == 'interpreter'", specifier = ">=0.12.1" }, { name = "boto3", marker = "extra == 'interpreter'", specifier = ">=1.34.0" }, { name = "cachetools", specifier = ">=6.2.1" }, @@ -5962,6 +6011,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" }, ] +[[package]] +name = "requests-futures" +version = "1.0.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/88/f8/175b823241536ba09da033850d66194c372c65c38804847ac9cef0239542/requests_futures-1.0.2.tar.gz", hash = "sha256:6b7eb57940336e800faebc3dab506360edec9478f7b22dc570858ad3aa7458da", size = 10356, upload-time = "2024-11-15T22:14:51.988Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/91/23/7c1096731c15c83826cb0dd42078b561a838aed44c36f370aeb815168106/requests_futures-1.0.2-py2.py3-none-any.whl", hash = "sha256:a3534af7c2bf670cd7aa730716e9e7d4386497554f87792be7514063b8912897", size = 7671, upload-time = "2024-11-15T22:14:50.255Z" }, +] + [[package]] name = "rfc3339-validator" version = "0.1.4" From b72e8c02b2059528300701df8cf0833b02139624 Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Mon, 16 Feb 2026 17:30:50 -0500 Subject: [PATCH 2/7] remove useless test --- tests/semantic/test_feedback_validation.py | 157 --------------------- 1 file changed, 157 deletions(-) delete mode 100644 tests/semantic/test_feedback_validation.py diff --git a/tests/semantic/test_feedback_validation.py b/tests/semantic/test_feedback_validation.py deleted file mode 100644 index 53cdd979..00000000 --- a/tests/semantic/test_feedback_validation.py +++ /dev/null @@ -1,157 +0,0 @@ -"""Tests for semantic validation of feedback configurations.""" - -from __future__ import annotations - -from qtype.semantic import loader - - -class TestFeedbackSemanticValidation: - """Test semantic validation rules for feedback.""" - - def test_thumbs_feedback_loads_correctly(self, tmp_path): - """Test that thumbs feedback configuration loads and validates.""" - yaml_content = """ -id: test_app -flows: - - id: test_flow - feedback: - type: thumbs - explanation: true - steps: - - type: Echo - id: echo1 - -telemetry: - id: test_telemetry - provider: Phoenix - endpoint: http://localhost:6006/v1/traces -""" - yaml_file = tmp_path / "test.yaml" - yaml_file.write_text(yaml_content) - - app, _ = loader.load(str(yaml_file)) - flow = app.flows[0] - assert flow.feedback is not None - assert flow.feedback.type == "thumbs" - assert flow.feedback.explanation is True - - def test_rating_feedback_loads_correctly(self, tmp_path): - """Test that rating feedback configuration loads and validates.""" - yaml_content = """ -id: test_app -flows: - - id: test_flow - feedback: - type: rating - scale: 10 - explanation: false - steps: - - type: Echo - id: echo1 - -telemetry: - id: test_telemetry - provider: Phoenix - endpoint: http://localhost:6006/v1/traces -""" - yaml_file = tmp_path / "test.yaml" - yaml_file.write_text(yaml_content) - - app, _ = loader.load(str(yaml_file)) - flow = app.flows[0] - assert flow.feedback is not None - assert flow.feedback.type == "rating" - assert flow.feedback.scale == 10 - assert flow.feedback.explanation is False - - def test_category_feedback_loads_correctly(self, tmp_path): - """Test that category feedback configuration loads and validates.""" - yaml_content = """ -id: test_app -flows: - - id: test_flow - feedback: - type: category - categories: - - accurate - - helpful - - creative - allow_multiple: true - explanation: true - steps: - - type: Echo - id: echo1 - -telemetry: - id: test_telemetry - provider: Phoenix - endpoint: http://localhost:6006/v1/traces -""" - yaml_file = tmp_path / "test.yaml" - yaml_file.write_text(yaml_content) - - app, _ = loader.load(str(yaml_file)) - flow = app.flows[0] - assert flow.feedback is not None - assert flow.feedback.type == "category" - assert flow.feedback.categories == ["accurate", "helpful", "creative"] - assert flow.feedback.allow_multiple is True - assert flow.feedback.explanation is True - - def test_flow_without_feedback(self, tmp_path): - """Test that flows work without feedback configuration.""" - yaml_content = """ -id: test_app -flows: - - id: test_flow - steps: - - type: Echo - id: echo1 -""" - yaml_file = tmp_path / "test.yaml" - yaml_file.write_text(yaml_content) - - app, _ = loader.load(str(yaml_file)) - flow = app.flows[0] - assert flow.feedback is None - - def test_arize_provider_with_feedback_loads_correctly(self, tmp_path): - """Test that Arize provider with feedback configuration - loads and validates.""" - yaml_content = """ -id: test_app - -auths: - - id: arize-auth - type: api_key - api_key: test-api-key - -flows: - - id: test_flow - feedback: - type: thumbs - explanation: true - steps: - - type: Echo - id: echo1 - -telemetry: - id: arize-telemetry - provider: Arize - endpoint: https://otlp.arize.com/v1 - auth: arize-auth - args: - space_id: test-space-id - project_name: test-project -""" - yaml_file = tmp_path / "test.yaml" - yaml_file.write_text(yaml_content) - - app, _ = loader.load(str(yaml_file)) - assert app.telemetry is not None - assert app.telemetry.provider == "Arize" - assert app.telemetry.args["space_id"] == "test-space-id" - assert app.telemetry.args["project_name"] == "test-project" - flow = app.flows[0] - assert flow.feedback is not None - assert flow.feedback.type == "thumbs" From ee1d24da94a641bc2cca27537d62d04cdd347b61 Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Tue, 17 Feb 2026 07:19:49 -0500 Subject: [PATCH 3/7] remove speckit --- .vscode/settings.json | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 978c35f7..c1082dad 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,10 +5,8 @@ } ], "yaml.schemaStore.enable": false, - "yaml.schemas": { - "./schema/qtype.schema.json": [ - "*.qtype.yaml" - ] + "yaml.schemas" : { + "./schema/qtype.schema.json": ["*.qtype.yaml"] }, "python.testing.pytestArgs": [ "tests" @@ -21,16 +19,5 @@ "envManager": "ms-python.python:venv", "packageManager": "ms-python.python:pip" } - ], - "chat.promptFilesRecommendations": { - "speckit.constitution": true, - "speckit.specify": true, - "speckit.plan": true, - "speckit.tasks": true, - "speckit.implement": true - }, - "chat.tools.terminal.autoApprove": { - ".specify/scripts/bash/": true, - ".specify/scripts/powershell/": true - } + ] } From 47e3ddb89810fb46e9901f4ae3f782806987e5a1 Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Tue, 17 Feb 2026 07:21:02 -0500 Subject: [PATCH 4/7] fix doc --- docs/How To/Qtype Server/add_feedback_buttons.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/How To/Qtype Server/add_feedback_buttons.md b/docs/How To/Qtype Server/add_feedback_buttons.md index 89e94d56..e47d6415 100644 --- a/docs/How To/Qtype Server/add_feedback_buttons.md +++ b/docs/How To/Qtype Server/add_feedback_buttons.md @@ -24,7 +24,8 @@ telemetry: ### Explanation -- **flows[].feedback**: Enables a feedback widget on the flow’s outputs in the UI.- **flows[].interface.type**: Must be `Conversational` for feedback to work in the streaming UI. For flows without an interface (or with `Complete` interface), use the REST endpoint tab in the UI instead of the streaming tab. +- **flows[].feedback**: Enables a feedback widget on the flow’s outputs in the UI. +- **flows[].interface.type**: Must be `Conversational` for feedback to work in the streaming UI. For flows without an interface (or with `Complete` interface), use the REST endpoint tab in the UI instead of the streaming tab. - **feedback.type**: Feedback widget type: `thumbs`, `rating`, or `category`. - **feedback.explanation**: If `true`, prompts the user for an optional text explanation along with their feedback. - **rating.scale**: For `rating` feedback, sets the maximum score (typically `5` or `10`). From 7a620ad633fbf9cc049c43a0811af4b489788032 Mon Sep 17 00:00:00 2001 From: lou-k Date: Thu, 19 Feb 2026 13:42:59 -0500 Subject: [PATCH 5/7] Clean up arize telemetry --- .vscode/launch.json | 1 - qtype/interpreter/base/base_step_executor.py | 9 +++- qtype/interpreter/feedback_api.py | 10 ++-- qtype/interpreter/flow.py | 40 ++++++++++++++++ qtype/interpreter/telemetry.py | 14 ++++-- qtype/interpreter/types.py | 10 ++-- uv.lock | 50 ++++++++++++++------ 7 files changed, 106 insertions(+), 28 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index d42f8511..947a81da 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -12,7 +12,6 @@ "justMyCode": false, "env": { "PYTHONPATH": "${workspaceFolder}", - "GRPC_DEFAULT_SSL_ROOTS_FILE_PATH": "${workspaceFolder}/certs/combined-cacerts.pem" }, "envFile": "${workspaceFolder}/.env" }, diff --git a/qtype/interpreter/base/base_step_executor.py b/qtype/interpreter/base/base_step_executor.py index 2743461f..fd0af88d 100644 --- a/qtype/interpreter/base/base_step_executor.py +++ b/qtype/interpreter/base/base_step_executor.py @@ -406,11 +406,16 @@ async def _process_message_with_telemetry( "error": str(output_msg.error), }, ) + from opentelemetry.trace import get_current_span + + current_span = get_current_span() + current_span_context= current_span.get_span_context() + # Enrich with process_message span for feedback tracking span_context = span.get_span_context() output_msg = output_msg.with_telemetry_metadata( - span_id=format(span_context.span_id, "016x"), - trace_id=format(span_context.trace_id, "032x"), + span_id=current_span_context.span_id, + trace_id=current_span_context.trace_id, ) yield output_msg diff --git a/qtype/interpreter/feedback_api.py b/qtype/interpreter/feedback_api.py index e284da2f..38422acc 100644 --- a/qtype/interpreter/feedback_api.py +++ b/qtype/interpreter/feedback_api.py @@ -147,7 +147,7 @@ def _create_telemetry_client( elif telemetry.provider == TelemetryProvider.ARIZE.value: # Resolve credentials using shared helper space_id, project_name, api_key = _resolve_arize_credentials( - telemetry, None, secret_manager + telemetry, project_id=telemetry.id, secret_manager=secret_manager ) from arize import ArizeClient @@ -214,6 +214,7 @@ async def submit_feedback(request: FeedbackRequest) -> FeedbackResponse: elif isinstance(request.feedback, RatingFeedbackData): score = float(request.feedback.score) + # Span ID is already in hex format from metadata client.spans.add_span_annotation( span_id=request.span_id, annotation_name="user_feedback", @@ -224,8 +225,8 @@ async def submit_feedback(request: FeedbackRequest) -> FeedbackResponse: ) logger.info( - f"Feedback submitted to Phoenix for span {request.span_id}: " - f"{request.feedback.type} = {label}" + f"Feedback submitted to Phoenix for span " + f"{request.span_id}: {request.feedback.type} = {label}" ) elif telemetry.provider == TelemetryProvider.LANGFUSE.value: @@ -259,9 +260,10 @@ async def submit_feedback(request: FeedbackRequest) -> FeedbackResponse: data["annotation.notes"] = [explanation] df = pd.DataFrame(data) + # Cast since we're in Arize provider branch where client is ArizeClientWrapper arize_wrapper = cast(ArizeClientWrapper, client) - arize_wrapper.client.spans.update_annotations( + response = arize_wrapper.client.spans.update_annotations( space_id=arize_wrapper.space_id, project_name=arize_wrapper.project_name, dataframe=df, diff --git a/qtype/interpreter/flow.py b/qtype/interpreter/flow.py index 3deb8e34..3ca1ae17 100644 --- a/qtype/interpreter/flow.py +++ b/qtype/interpreter/flow.py @@ -87,7 +87,11 @@ async def run_flow( # Only attach if span is recording (i.e., real tracer is configured) ctx = trace.set_span_in_context(span) token = otel_context.attach(ctx) if span.is_recording() else None + from opentelemetry.trace import format_span_id, format_trace_id + flow_span_context = span.get_span_context() + flow_span_id = format_span_id(flow_span_context.span_id) + flow_trace_id = format_trace_id(flow_span_context.trace_id) try: # 1. Get the execution plan is just the steps in order execution_plan = flow.steps @@ -147,6 +151,29 @@ async def list_stream(): # Close the progress bars if any if progress_callback is not None: progress_callback.close() + + # Override metadata with flow-level span ID for feedback tracking + # Arize stores the root CHAIN span, not individual step spans + if span.is_recording(): + + flow_span_context = span.get_span_context() + flow_span_id = format_span_id(flow_span_context.span_id) + flow_trace_id = format_trace_id(flow_span_context.trace_id) + + # Update all final messages with flow-level span metadata + final_results = [ + msg.model_copy( + update={ + "metadata": { + **msg.metadata, + "span_id": flow_span_id, + "trace_id": flow_trace_id, + } + } + ) + for msg in final_results + ] + # Record flow completion metrics span.set_attribute("flow.output_count", len(final_results)) error_count = sum(1 for msg in final_results if msg.is_failed()) @@ -197,4 +224,17 @@ async def list_stream(): # Only detach if we successfully attached (span was recording) if token is not None: otel_context.detach(token) + + # Check if we should flush BEFORE ending the span + should_flush = span.is_recording() span.end() + + # Force flush spans to ensure they're sent to telemetry backend + # immediately (critical for feedback submission that needs span IDs) + if should_flush: + tracer_provider = trace.get_tracer_provider() + flush_success = tracer_provider.force_flush(timeout_millis=5000) + if not flush_success: + logger.warning( + "Telemetry flush timed out or failed after flow execution" + ) diff --git a/qtype/interpreter/telemetry.py b/qtype/interpreter/telemetry.py index c1ad00c3..f845585b 100644 --- a/qtype/interpreter/telemetry.py +++ b/qtype/interpreter/telemetry.py @@ -90,7 +90,7 @@ def _setup_langfuse_otel( def _resolve_arize_credentials( sink: TelemetrySink, - project_id: str | None, + project_id: str, secret_manager: SecretManagerBase, ) -> tuple[str, str, str]: """Resolve Arize credentials from telemetry sink configuration. @@ -119,7 +119,8 @@ def _resolve_arize_credentials( # Cast types since resolve_secrets_in_dict returns str (not SecretReference) space_id = cast(str, space_id) - project_name = project_id or sink.args.get("project_name", sink.id) + # Args project_name takes precedence over project_id parameter + project_name = sink.args.get("project_name", project_id) # Resolve API key from auth provider with auth(sink.auth, secret_manager) as provider: @@ -174,12 +175,19 @@ def _setup_arize_otel( from arize.otel import register as arize_register from arize.otel.otel import Transport + if "transport" in sink.args: + # convert the string to Transport enum if provided + transport_str = sink.args["transport"].upper() + transport = Transport[transport_str] + else: + transport = Transport.GRPC + tracer_provider = arize_register( space_id=space_id, api_key=api_key, project_name=project_name, endpoint=endpoint, - transport=Transport.HTTP, + transport=transport ) return tracer_provider diff --git a/qtype/interpreter/types.py b/qtype/interpreter/types.py index 185ff588..6d125d3c 100644 --- a/qtype/interpreter/types.py +++ b/qtype/interpreter/types.py @@ -356,7 +356,7 @@ def is_failed(self) -> bool: return self.error is not None def with_telemetry_metadata( - self, span_id: str, trace_id: str + self, span_id: int, trace_id: int ) -> "FlowMessage": """Create a copy with telemetry metadata added. @@ -365,12 +365,14 @@ def with_telemetry_metadata( trace_id: OpenTelemetry trace ID (32 hex chars) Returns: - New FlowMessage with telemetry metadata + New FlowMessage with telemetry metadata (IDs as hex strings) """ + from opentelemetry.trace import format_span_id, format_trace_id + updated_metadata = { **self.metadata, - "span_id": span_id, - "trace_id": trace_id, + "span_id": format_span_id(span_id), + "trace_id": format_trace_id(trace_id), } return self.model_copy(update={"metadata": updated_metadata}) diff --git a/uv.lock b/uv.lock index 630dfe4e..54aaddd9 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", @@ -392,7 +392,7 @@ wheels = [ [[package]] name = "arize-phoenix" -version = "12.35.0" +version = "13.0.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aioitertools" }, @@ -410,15 +410,17 @@ dependencies = [ { name = "httpx" }, { name = "jinja2" }, { name = "jmespath" }, + { name = "jsonpath-ng" }, + { name = "jsonschema" }, { name = "ldap3" }, { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "numpy", version = "2.3.4", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "openinference-instrumentation" }, + { name = "openinference-instrumentation-openai" }, { name = "openinference-semantic-conventions" }, { name = "opentelemetry-exporter-otlp" }, { name = "opentelemetry-proto" }, { name = "opentelemetry-sdk" }, - { name = "opentelemetry-semantic-conventions" }, { name = "orjson" }, { name = "pandas" }, { name = "prometheus-client" }, @@ -426,8 +428,10 @@ dependencies = [ { name = "psutil" }, { name = "pyarrow" }, { name = "pydantic" }, + { name = "pystache" }, { name = "python-dateutil" }, { name = "python-multipart" }, + { name = "pyyaml" }, { name = "scikit-learn" }, { name = "scipy", version = "1.15.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "scipy", version = "1.16.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, @@ -440,14 +444,14 @@ dependencies = [ { name = "uvicorn" }, { name = "wrapt" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/5c/7f/e2a298879e5b40efc8860c480dd9f79fb59e1495729828244235e4682266/arize_phoenix-12.35.0.tar.gz", hash = "sha256:0833c4b478ebb7b02d7f74f04f6bbe6d68ed7cebf2d22baffc5eb4aaf4358002", size = 2394470, upload-time = "2026-02-09T23:27:56.696Z" } +sdist = { url = "https://files.pythonhosted.org/packages/76/b4/d9bdda048156ff494529ee8245ad2d144d75e95ab79e8167984ec528aae2/arize_phoenix-13.0.3.tar.gz", hash = "sha256:98d195d7fb1cea438362926a15774f4b02a737007ce601b05338984888ac80fe", size = 758364, upload-time = "2026-02-14T00:10:18.326Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/d4/c4/3c1c839297a6b4e967d49cb4139454f329a6453589b04b457b1a95a665b9/arize_phoenix-12.35.0-py3-none-any.whl", hash = "sha256:2dc5b214d82be151718b8089440f83ad4c8e59d9097e3459b458e071bf655995", size = 2620357, upload-time = "2026-02-09T23:27:54.712Z" }, + { url = "https://files.pythonhosted.org/packages/3d/bd/70482a0e928af2bd3a4bf975057cbd8c8d0443baf8f484d687af1488677d/arize_phoenix-13.0.3-py3-none-any.whl", hash = "sha256:994141d86ed03b9648c1936d88d89d8e2c97c50ad1c943d04a0dea7f854a87a9", size = 2823254, upload-time = "2026-02-14T00:10:16.11Z" }, ] [[package]] name = "arize-phoenix-client" -version = "1.28.1" +version = "1.29.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "httpx" }, @@ -458,9 +462,9 @@ dependencies = [ { name = "tqdm" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/89/a8/b15e630f68bc1d65de8756f18e44e518f9d3df985f659c85ca7bad5ad0f9/arize_phoenix_client-1.28.1.tar.gz", hash = "sha256:382ac12e635a7b9c1bcbf793b100fcc5a7f3cec84702c3ba6088f1ffbdc9abda", size = 143651, upload-time = "2026-02-09T23:01:44.582Z" } +sdist = { url = "https://files.pythonhosted.org/packages/18/b0/162fd719f6aba63a6cdf0a81d160467ee462db3677ff9c6a386c147270d9/arize_phoenix_client-1.29.0.tar.gz", hash = "sha256:f7734b1c1c485f3c130773cde3766507887d648695e9b20b921227ba2423be5e", size = 143650, upload-time = "2026-02-13T21:02:06.458Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/be/47/03c280e8b723eac3469f747bfdaa95c3d678a4d52472ac3bcb7f59d7eb73/arize_phoenix_client-1.28.1-py3-none-any.whl", hash = "sha256:14e4002815b45ce975e4d1af119c6f8aabd2e7068e62d012fcc3528af62aad3d", size = 149693, upload-time = "2026-02-09T23:01:43.114Z" }, + { url = "https://files.pythonhosted.org/packages/91/a3/9ecacbae1b8cdc413c2ca411f81fcebfe38259a622443295e00e3c2bd297/arize_phoenix_client-1.29.0-py3-none-any.whl", hash = "sha256:72ebaafc7920629f8d39beab839c90f2ab8f331f719110aaccbe0b1337c6c5f5", size = 149691, upload-time = "2026-02-13T21:02:04.789Z" }, ] [[package]] @@ -4075,7 +4079,7 @@ wheels = [ [[package]] name = "openinference-instrumentation" -version = "0.1.42" +version = "0.1.44" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "openinference-semantic-conventions" }, @@ -4083,9 +4087,9 @@ dependencies = [ { name = "opentelemetry-sdk" }, { name = "wrapt" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/00/d0/b19061a21fd6127d2857c77744a36073bba9c1502d1d5e8517b708eb8b7c/openinference_instrumentation-0.1.42.tar.gz", hash = "sha256:2275babc34022e151b5492cfba41d3b12e28377f8e08cb45e5d64fe2d9d7fe37", size = 23954, upload-time = "2025-11-05T01:37:46.869Z" } +sdist = { url = "https://files.pythonhosted.org/packages/41/d9/c0d3040c0b5dc2b97ad20c35fb3fc1e3f2006bb4b08741ff325efcf3a96a/openinference_instrumentation-0.1.44.tar.gz", hash = "sha256:141953d2da33d54d428dfba2bfebb27ce0517dc43d52e1449a09db72ec7d318e", size = 23959, upload-time = "2026-02-01T01:45:55.88Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/c3/71/43ee4616fc95dbd2f560550f199c6652a5eb93f84e8aa0039bc95c19cfe0/openinference_instrumentation-0.1.42-py3-none-any.whl", hash = "sha256:e7521ff90833ef7cc65db526a2f59b76a496180abeaaee30ec6abbbc0b43f8ec", size = 30086, upload-time = "2025-11-05T01:37:43.866Z" }, + { url = "https://files.pythonhosted.org/packages/5e/6d/6a19587b26ffa273eb27ba7dd2482013afe3b47c8d9f1f39295216975f9f/openinference_instrumentation-0.1.44-py3-none-any.whl", hash = "sha256:86b2a8931e0f39ecfb739901f8987c654961da03baf3cfa5d5b4f45a96897b2d", size = 30093, upload-time = "2026-02-01T01:45:54.932Z" }, ] [[package]] @@ -4106,13 +4110,31 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5e/3f/36816c214bfdc066133354291ca324a862c9310dc844a136abfe9741c9ab/openinference_instrumentation_llama_index-4.3.8-py3-none-any.whl", hash = "sha256:e7a16881ba124079980508d8f6b20d5b23ba15f13a976e4dd68336c036351cfb", size = 28866, upload-time = "2025-10-23T20:55:32.578Z" }, ] +[[package]] +name = "openinference-instrumentation-openai" +version = "0.1.41" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "openinference-instrumentation" }, + { name = "openinference-semantic-conventions" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "typing-extensions" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/66/06/77b2fe7171336f71313936daf1b644a9968da85ff0b473a03ca05cc3d5c1/openinference_instrumentation_openai-0.1.41.tar.gz", hash = "sha256:ef4db680986a613b1639720f9beaa315c9e388c20bc985dbbbdf0f4df007c6e9", size = 22848, upload-time = "2025-12-04T19:58:35.349Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a1/db/48f1f540d335f98fa67891e9c25ad56020be7e7b2c0d4fd5014875fe5ddf/openinference_instrumentation_openai-0.1.41-py3-none-any.whl", hash = "sha256:6fad453446835e51333b660882eacababbf1052689ca53cba444a7d97fa2e910", size = 30273, upload-time = "2025-12-04T19:58:34.17Z" }, +] + [[package]] name = "openinference-semantic-conventions" -version = "0.1.25" +version = "0.1.26" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/0b/68/81c8a0b90334ff11e4f285e4934c57f30bea3ef0c0b9f99b65e7b80fae3b/openinference_semantic_conventions-0.1.25.tar.gz", hash = "sha256:f0a8c2cfbd00195d1f362b4803518341e80867d446c2959bf1743f1894fce31d", size = 12767, upload-time = "2025-11-05T01:37:45.89Z" } +sdist = { url = "https://files.pythonhosted.org/packages/5a/91/f67c1971deaf5b75dea84731393bca2042ff4a46acae9a727dfe267dd568/openinference_semantic_conventions-0.1.26.tar.gz", hash = "sha256:34dae06b40743fb7b846a36fd402810a554b2ec4ee96b9dd8b820663aee4a1f1", size = 12782, upload-time = "2026-02-01T01:09:46.095Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fd/3d/dd14ee2eb8a3f3054249562e76b253a1545c76adbbfd43a294f71acde5c3/openinference_semantic_conventions-0.1.25-py3-none-any.whl", hash = "sha256:3814240f3bd61f05d9562b761de70ee793d55b03bca1634edf57d7a2735af238", size = 10395, upload-time = "2025-11-05T01:37:43.697Z" }, + { url = "https://files.pythonhosted.org/packages/6b/ca/bb4b9cbd96f72600abec5280cf8ed67bcd849ed19b8bec919aec97adb61c/openinference_semantic_conventions-0.1.26-py3-none-any.whl", hash = "sha256:35b4f487d18ac7d016125c428c0d950dd290e18dafb99787880a9b2e05745f42", size = 10401, upload-time = "2026-02-01T01:09:44.781Z" }, ] [[package]] From 6d747ef2bd4ebaff14f4b95e50aabe7b9eac3fbb Mon Sep 17 00:00:00 2001 From: lou-k Date: Thu, 19 Feb 2026 13:50:29 -0500 Subject: [PATCH 6/7] fix error check --- qtype/interpreter/feedback_api.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/qtype/interpreter/feedback_api.py b/qtype/interpreter/feedback_api.py index 38422acc..5b9f245d 100644 --- a/qtype/interpreter/feedback_api.py +++ b/qtype/interpreter/feedback_api.py @@ -270,6 +270,9 @@ async def submit_feedback(request: FeedbackRequest) -> FeedbackResponse: validate=True, ) + if 'records_updated' not in response or int(response['records_updated']) < 1: + raise Exception(f"Arize API did not update any records: {response}") + logger.info( "Feedback submitted to Arize for " f"span {request.span_id}: " From 046277b4f0c6249f767eb697feabfb90af2b04d1 Mon Sep 17 00:00:00 2001 From: Lou Kratz <219901029+loukratz-bv@users.noreply.github.com> Date: Thu, 19 Feb 2026 13:52:03 -0500 Subject: [PATCH 7/7] fix unit tests --- qtype/interpreter/base/base_step_executor.py | 8 ++------ qtype/interpreter/feedback_api.py | 9 +++++++-- qtype/interpreter/flow.py | 1 - qtype/interpreter/telemetry.py | 6 +++--- tests/interpreter/test_telemetry_arize.py | 14 +++++++++++++- 5 files changed, 25 insertions(+), 13 deletions(-) diff --git a/qtype/interpreter/base/base_step_executor.py b/qtype/interpreter/base/base_step_executor.py index fd0af88d..39d5ed0a 100644 --- a/qtype/interpreter/base/base_step_executor.py +++ b/qtype/interpreter/base/base_step_executor.py @@ -406,16 +406,12 @@ async def _process_message_with_telemetry( "error": str(output_msg.error), }, ) - from opentelemetry.trace import get_current_span - - current_span = get_current_span() - current_span_context= current_span.get_span_context() # Enrich with process_message span for feedback tracking span_context = span.get_span_context() output_msg = output_msg.with_telemetry_metadata( - span_id=current_span_context.span_id, - trace_id=current_span_context.trace_id, + span_id=span_context.span_id, + trace_id=span_context.trace_id, ) yield output_msg diff --git a/qtype/interpreter/feedback_api.py b/qtype/interpreter/feedback_api.py index 5b9f245d..0081a4de 100644 --- a/qtype/interpreter/feedback_api.py +++ b/qtype/interpreter/feedback_api.py @@ -270,8 +270,13 @@ async def submit_feedback(request: FeedbackRequest) -> FeedbackResponse: validate=True, ) - if 'records_updated' not in response or int(response['records_updated']) < 1: - raise Exception(f"Arize API did not update any records: {response}") + if ( + "records_updated" not in response + or int(response["records_updated"]) < 1 + ): + raise Exception( + f"Arize API did not update any records: {response}" + ) logger.info( "Feedback submitted to Arize for " diff --git a/qtype/interpreter/flow.py b/qtype/interpreter/flow.py index 3ca1ae17..448dabf1 100644 --- a/qtype/interpreter/flow.py +++ b/qtype/interpreter/flow.py @@ -155,7 +155,6 @@ async def list_stream(): # Override metadata with flow-level span ID for feedback tracking # Arize stores the root CHAIN span, not individual step spans if span.is_recording(): - flow_span_context = span.get_span_context() flow_span_id = format_span_id(flow_span_context.span_id) flow_trace_id = format_trace_id(flow_span_context.trace_id) diff --git a/qtype/interpreter/telemetry.py b/qtype/interpreter/telemetry.py index f845585b..78c62bc7 100644 --- a/qtype/interpreter/telemetry.py +++ b/qtype/interpreter/telemetry.py @@ -142,7 +142,8 @@ def _setup_arize_otel( ) -> TracerProvider: """Initialize and register Arize Cloud as an OTEL trace exporter. - Uses the ``arize.otel.register()`` helper with HTTP transport. + Uses the ``arize.otel.register()`` helper. Defaults to gRPC transport; + override by setting ``transport`` in ``sink.args``. Args: sink: TelemetrySink with Arize endpoint and credentials. @@ -170,7 +171,6 @@ def _setup_arize_otel( msg = f"Arize telemetry sink '{sink.id}' requires an 'endpoint' field." raise ValueError(msg) - # Use HTTP transport (avoids gRPC TLS certificate issues) # Import arize here to allow GRPC env var to be set before pyarrow loads from arize.otel import register as arize_register from arize.otel.otel import Transport @@ -187,7 +187,7 @@ def _setup_arize_otel( api_key=api_key, project_name=project_name, endpoint=endpoint, - transport=transport + transport=transport, ) return tracer_provider diff --git a/tests/interpreter/test_telemetry_arize.py b/tests/interpreter/test_telemetry_arize.py index ae90a66a..5cf49bb2 100644 --- a/tests/interpreter/test_telemetry_arize.py +++ b/tests/interpreter/test_telemetry_arize.py @@ -84,7 +84,7 @@ def test_calls_arize_register_with_resolved_credentials( api_key="resolved-key", project_name="my-project", endpoint="https://otlp.arize.com/v1", - transport=Transport.HTTP, + transport=Transport.GRPC, ) assert result is mock_tracer @@ -210,6 +210,9 @@ def test_submit_thumbs_feedback_calls_update_annotations( project_name="test-project", ) mock_create_client.return_value = wrapper + mock_arize_client.spans.update_annotations.return_value = { + "records_updated": 1 + } from fastapi import FastAPI @@ -241,3 +244,12 @@ def test_submit_thumbs_feedback_calls_update_annotations( assert call_kwargs["space_id"] == "test-space" assert call_kwargs["project_name"] == "test-project" assert call_kwargs["validate"] is True + + # Verify DataFrame structure matches Arize schema + df = call_kwargs["dataframe"] + assert "context.span_id" in df.columns + assert df["context.span_id"].iloc[0] == "span-123" + assert df["annotation.user_feedback.label"].iloc[0] == "👍" + assert df["annotation.user_feedback.score"].iloc[0] == 1.0 + # trace_id should NOT be in the DataFrame per Arize schema + assert "context.trace_id" not in df.columns