diff --git a/instrumentation/opentelemetry-instrumentation-google-genai/.changelog/151.changed b/instrumentation/opentelemetry-instrumentation-google-genai/.changelog/151.changed new file mode 100644 index 0000000..9eb160b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-google-genai/.changelog/151.changed @@ -0,0 +1 @@ +Use `wrapt` instead of `functools.wraps` to monkey patch the SDK. \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-google-genai/README.rst b/instrumentation/opentelemetry-instrumentation-google-genai/README.rst index 1e52df9..1839d30 100644 --- a/instrumentation/opentelemetry-instrumentation-google-genai/README.rst +++ b/instrumentation/opentelemetry-instrumentation-google-genai/README.rst @@ -67,11 +67,11 @@ Limitations *********** When using the Google GenAI SDK with automatic function calling enabled, -the OpenTelemetry instrumentation creates a span only for the top-level -``generate_content`` call. +the OpenTelemetry instrumentation creates an ``execute_tool`` span for each tool call the SDK executes; +these spans are nested under the ``generate_content`` span. -Internal model or tool calls triggered automatically by the SDK are executed -within the SDK internals and are not traced as separate spans. +Only tool calls that the SDK executes are traced. If you disable automatic function calling and instead +do the function calling in your application, our instrumentation cannot trace them. 
Enabling message content diff --git a/instrumentation/opentelemetry-instrumentation-google-genai/TODOS.md b/instrumentation/opentelemetry-instrumentation-google-genai/TODOS.md index 15e4226..b007e33 100644 --- a/instrumentation/opentelemetry-instrumentation-google-genai/TODOS.md +++ b/instrumentation/opentelemetry-instrumentation-google-genai/TODOS.md @@ -10,7 +10,6 @@ Here are some TODO items required to achieve stability for this package: - Including tool invocation information - Emit events for safety ratings when they block responses - Additional cleanup/improvement tasks such as: - - Adoption of 'wrapt' instead of 'functools.wraps' - Bolstering test coverage ## Future diff --git a/instrumentation/opentelemetry-instrumentation-google-genai/pyproject.toml b/instrumentation/opentelemetry-instrumentation-google-genai/pyproject.toml index f2366c2..4a03eaa 100644 --- a/instrumentation/opentelemetry-instrumentation-google-genai/pyproject.toml +++ b/instrumentation/opentelemetry-instrumentation-google-genai/pyproject.toml @@ -43,6 +43,7 @@ dependencies = [ "opentelemetry-instrumentation >=0.61b0, <2", "opentelemetry-semantic-conventions >=0.61b0, <2", "opentelemetry-util-genai >= 0.4b0, <2", + "wrapt >= 1.0.0, < 3.0.0", ] [project.optional-dependencies] diff --git a/instrumentation/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py b/instrumentation/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py index 624d597..d7ceb32 100644 --- a/instrumentation/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py +++ b/instrumentation/opentelemetry-instrumentation-google-genai/src/opentelemetry/instrumentation/google_genai/generate_content.py @@ -1,10 +1,10 @@ # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 -import functools import json import os -from typing import Any, 
AsyncIterator, Awaitable, Iterator, Optional, Union +from collections.abc import Callable +from typing import Any, Optional, Union from google.genai.models import AsyncModels, Models from google.genai.models import t as transformers @@ -17,6 +17,7 @@ Tool, ToolUnionDict, ) +from wrapt import wrap_function_wrapper from opentelemetry import context as context_api from opentelemetry.semconv._incubating.attributes import ( @@ -403,184 +404,278 @@ async def _maybe_get_tool_definitions_async( def _create_instrumented_generate_content( - snapshot: _MethodsSnapshot, telemetry_handler: TelemetryHandler, generate_content_config_key_allowlist: AllowList, ): - wrapped_func = snapshot.generate_content - - @functools.wraps(wrapped_func) def instrumented_generate_content( - self: Models, - *, - model: str, - contents: Union[ContentListUnion, ContentListUnionDict], - config: Optional[GenerateContentConfigOrDict] = None, - **kwargs: Any, - ) -> GenerateContentResponse: - # If we are unable to parse the config, or don't modify it, we pass it through - # as is to the real GenerateContent. This way the real GenerateContent can deal - # with invalid or empty configs as it normally would. - wrapped_config, has_wrapped_tools = _wrapped_config_with_tools( - telemetry_handler, - config, - ) - finish_reasons = [] - with telemetry_handler.inference( - provider=_determine_genai_system(self), - request_model=model, - operation_name="generate_content", - ) as invocation: - _apply_request_attributes( - wrapped_config, - generate_content_config_key_allowlist, - invocation, + wrapped: Callable[..., Any], + instance: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ): + def _execute( + model: str, + contents: Union[ContentListUnion, ContentListUnionDict], + config: Optional[GenerateContentConfigOrDict] = None, + *_args, + **_kwargs, + ): + # If we are unable to parse the config, or don't modify it, we pass it through + # as is to the real GenerateContent. 
This way the real GenerateContent can deal + # with invalid or empty configs as it normally would. + wrapped_config, has_wrapped_tools = _wrapped_config_with_tools( + telemetry_handler, + config, ) - invocation.attributes.update( - _get_extra_generate_content_attributes() - ) - invocation.tool_definitions = _maybe_get_tool_definitions( - wrapped_config - ) - - if telemetry_handler.should_capture_content(): - invocation.input_messages = to_input_messages( - contents=transformers.t_contents(contents) + finish_reasons = [] + with telemetry_handler.inference( + provider=_determine_genai_system(instance), + request_model=model, + operation_name="generate_content", + ) as invocation: + _apply_request_attributes( + wrapped_config, + generate_content_config_key_allowlist, + invocation, ) - if wrapped_config.system_instruction: - invocation.system_instruction = to_system_instructions( - content=transformers.t_contents( - wrapped_config.system_instruction - )[0] - ) - candidates = [] - try: - response = wrapped_func( - self, - model=model, - contents=contents, - config=wrapped_config if has_wrapped_tools else config, - **kwargs, + invocation.attributes.update( + _get_extra_generate_content_attributes() ) - _apply_response_attributes( - response, finish_reasons, invocation + invocation.tool_definitions = _maybe_get_tool_definitions( + wrapped_config ) - if response.candidates: - candidates.extend(response.candidates) - return response - finally: - if telemetry_handler.should_capture_content() and candidates: - invocation.output_messages = to_output_messages( - candidates=candidates + + if telemetry_handler.should_capture_content(): + invocation.input_messages = to_input_messages( + contents=transformers.t_contents(contents) + ) + if wrapped_config.system_instruction: + invocation.system_instruction = to_system_instructions( + content=transformers.t_contents( + wrapped_config.system_instruction + )[0] + ) + candidates = [] + try: + response = wrapped( + model=model, + 
contents=contents, + config=wrapped_config if has_wrapped_tools else config, + *_args, + **_kwargs, + ) + _apply_response_attributes( + response, finish_reasons, invocation ) + if response.candidates: + candidates.extend(response.candidates) + return response + finally: + if ( + telemetry_handler.should_capture_content() + and candidates + ): + invocation.output_messages = to_output_messages( + candidates=candidates + ) + + return _execute(*args, **kwargs) return instrumented_generate_content def _create_instrumented_generate_content_stream( - snapshot: _MethodsSnapshot, telemetry_handler: TelemetryHandler, generate_content_config_key_allowlist: AllowList, ): - wrapped_func = snapshot.generate_content_stream - - @functools.wraps(wrapped_func) def instrumented_generate_content_stream( - self: Models, - *, - model: str, - contents: Union[ContentListUnion, ContentListUnionDict], - config: Optional[GenerateContentConfigOrDict] = None, - **kwargs: Any, - ) -> Iterator[GenerateContentResponse]: - # If we are unable to parse the config, or don't modify it, we pass it through - # as is to the real GenerateContent. This way the real GenerateContent can deal - # with invalid or empty configs as it normally would. 
- wrapped_config, has_wrapped_tools = _wrapped_config_with_tools( - telemetry_handler, - config, - ) - finish_reasons = [] - with telemetry_handler.inference( - provider=_determine_genai_system(self), - request_model=model, - operation_name="generate_content", - ) as invocation: - _apply_request_attributes( - wrapped_config, - generate_content_config_key_allowlist, - invocation, + wrapped: Callable[..., Any], + instance: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ): + def _execute( + model: str, + contents: Union[ContentListUnion, ContentListUnionDict], + config: Optional[GenerateContentConfigOrDict] = None, + *_args, + **_kwargs, + ): + # If we are unable to parse the config, or don't modify it, we pass it through + # as is to the real GenerateContent. This way the real GenerateContent can deal + # with invalid or empty configs as it normally would. + wrapped_config, has_wrapped_tools = _wrapped_config_with_tools( + telemetry_handler, + config, ) - invocation.attributes.update( - _get_extra_generate_content_attributes() - ) - invocation.tool_definitions = _maybe_get_tool_definitions( - wrapped_config - ) - - if telemetry_handler.should_capture_content(): - invocation.input_messages = to_input_messages( - contents=transformers.t_contents(contents) + finish_reasons = [] + with telemetry_handler.inference( + provider=_determine_genai_system(instance), + request_model=model, + operation_name="generate_content", + ) as invocation: + _apply_request_attributes( + wrapped_config, + generate_content_config_key_allowlist, + invocation, ) - if wrapped_config.system_instruction: - invocation.system_instruction = to_system_instructions( - content=transformers.t_contents( - wrapped_config.system_instruction - )[0] - ) - candidates = [] - try: - for resp in wrapped_func( - self, - model=model, - contents=contents, - config=wrapped_config if has_wrapped_tools else config, - **kwargs, - ): - _apply_response_attributes( - resp, finish_reasons, invocation - ) - if 
resp.candidates: - candidates.extend(resp.candidates) - yield resp - finally: - if telemetry_handler.should_capture_content() and candidates: - invocation.output_messages = to_output_messages( - candidates=candidates + invocation.attributes.update( + _get_extra_generate_content_attributes() + ) + invocation.tool_definitions = _maybe_get_tool_definitions( + wrapped_config + ) + + if telemetry_handler.should_capture_content(): + invocation.input_messages = to_input_messages( + contents=transformers.t_contents(contents) ) + if wrapped_config.system_instruction: + invocation.system_instruction = to_system_instructions( + content=transformers.t_contents( + wrapped_config.system_instruction + )[0] + ) + candidates = [] + try: + for resp in wrapped( + model=model, + contents=contents, + config=wrapped_config if has_wrapped_tools else config, + *_args, + **_kwargs, + ): + _apply_response_attributes( + resp, finish_reasons, invocation + ) + if resp.candidates: + candidates.extend(resp.candidates) + yield resp + finally: + if ( + telemetry_handler.should_capture_content() + and candidates + ): + invocation.output_messages = to_output_messages( + candidates=candidates + ) + + return _execute(*args, **kwargs) return instrumented_generate_content_stream def _create_instrumented_async_generate_content( - snapshot: _MethodsSnapshot, telemetry_handler: TelemetryHandler, generate_content_config_key_allowlist: AllowList, ): - wrapped_func = snapshot.async_generate_content - - @functools.wraps(wrapped_func) async def instrumented_generate_content( - self: AsyncModels, - *, - model: str, - contents: Union[ContentListUnion, ContentListUnionDict], - config: Optional[GenerateContentConfigOrDict] = None, - **kwargs: Any, - ) -> GenerateContentResponse: - # If we are unable to parse the config, or don't modify it, we pass it through - # as is to the real GenerateContent. This way the real GenerateContent can deal - # with invalid or empty configs as it normally would. 
- wrapped_config, has_wrapped_tools = _wrapped_config_with_tools( - telemetry_handler, - config, - ) - finish_reasons = [] - with telemetry_handler.inference( - provider=_determine_genai_system(self), - request_model=model, - operation_name="generate_content", - ) as invocation: + wrapped: Callable[..., Any], + instance: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ): + async def _execute( + model: str, + contents: Union[ContentListUnion, ContentListUnionDict], + config: Optional[GenerateContentConfigOrDict] = None, + *_args, + **_kwargs, + ): + # If we are unable to parse the config, or don't modify it, we pass it through + # as is to the real GenerateContent. This way the real GenerateContent can deal + # with invalid or empty configs as it normally would. + wrapped_config, has_wrapped_tools = _wrapped_config_with_tools( + telemetry_handler, + config, + ) + finish_reasons = [] + with telemetry_handler.inference( + provider=_determine_genai_system(instance), + request_model=model, + operation_name="generate_content", + ) as invocation: + invocation.attributes.update( + _get_extra_generate_content_attributes() + ) + _apply_request_attributes( + wrapped_config, + generate_content_config_key_allowlist, + invocation, + ) + invocation.tool_definitions = ( + await _maybe_get_tool_definitions_async(wrapped_config) + ) + + if telemetry_handler.should_capture_content(): + invocation.input_messages = to_input_messages( + contents=transformers.t_contents(contents) + ) + if wrapped_config.system_instruction: + invocation.system_instruction = to_system_instructions( + content=transformers.t_contents( + wrapped_config.system_instruction + )[0] + ) + candidates = [] + try: + response = await wrapped( + model=model, + contents=contents, + config=wrapped_config if has_wrapped_tools else config, + *_args, + **_kwargs, + ) + _apply_response_attributes( + response, finish_reasons, invocation + ) + if response.candidates: + candidates.extend(response.candidates) + return 
response + finally: + if ( + telemetry_handler.should_capture_content() + and candidates + ): + invocation.output_messages = to_output_messages( + candidates=candidates + ) + + return await _execute(*args, **kwargs) + + return instrumented_generate_content + + +def _create_instrumented_async_generate_content_stream( # type: ignore + telemetry_handler: TelemetryHandler, + generate_content_config_key_allowlist: AllowList, +): + async def instrumented_generate_content_stream( + wrapped: Callable[..., Any], + instance: Any, + args: tuple[Any, ...], + kwargs: dict[str, Any], + ): + async def _execute( + model: str, + contents: Union[ContentListUnion, ContentListUnionDict], + config: Optional[GenerateContentConfigOrDict] = None, + *_args, + **_kwargs, + ): + # If we are unable to parse the config, or don't modify it, we pass it through + # as is to the real GenerateContent. This way the real GenerateContent can deal + # with invalid or empty configs as it normally would. + wrapped_config, has_wrapped_tools = _wrapped_config_with_tools( + telemetry_handler, + config, + ) + finish_reasons = [] + invocation = telemetry_handler.inference( + provider=_determine_genai_system(instance), + request_model=model, + operation_name="generate_content", + ) invocation.attributes.update( _get_extra_generate_content_attributes() ) @@ -603,109 +698,45 @@ async def instrumented_generate_content( wrapped_config.system_instruction )[0] ) - candidates = [] - try: - response = await wrapped_func( - self, - model=model, - contents=contents, - config=wrapped_config if has_wrapped_tools else config, - **kwargs, - ) - _apply_response_attributes( - response, finish_reasons, invocation - ) - if response.candidates: - candidates.extend(response.candidates) - return response - finally: - if telemetry_handler.should_capture_content() and candidates: - invocation.output_messages = to_output_messages( - candidates=candidates - ) - - return instrumented_generate_content - -# Disabling type checking 
because this is not yet implemented and tested fully. -def _create_instrumented_async_generate_content_stream( # type: ignore - snapshot: _MethodsSnapshot, - telemetry_handler: TelemetryHandler, - generate_content_config_key_allowlist: AllowList, -): - wrapped_func = snapshot.async_generate_content_stream - - @functools.wraps(wrapped_func) - async def instrumented_generate_content_stream( - self: AsyncModels, - *, - model: str, - contents: Union[ContentListUnion, ContentListUnionDict], - config: Optional[GenerateContentConfigOrDict] = None, - **kwargs: Any, - ) -> Awaitable[AsyncIterator[GenerateContentResponse]]: # type: ignore - # If we are unable to parse the config, or don't modify it, we pass it through - # as is to the real GenerateContent. This way the real GenerateContent can deal - # with invalid or empty configs as it normally would. - wrapped_config, has_wrapped_tools = _wrapped_config_with_tools( - telemetry_handler, - config, - ) - finish_reasons = [] - invocation = telemetry_handler.inference( - provider=_determine_genai_system(self), - request_model=model, - operation_name="generate_content", - ) - invocation.attributes.update(_get_extra_generate_content_attributes()) - _apply_request_attributes( - wrapped_config, generate_content_config_key_allowlist, invocation - ) - invocation.tool_definitions = await _maybe_get_tool_definitions_async( - wrapped_config - ) - - if telemetry_handler.should_capture_content(): - invocation.input_messages = to_input_messages( - contents=transformers.t_contents(contents) - ) - if wrapped_config.system_instruction: - invocation.system_instruction = to_system_instructions( - content=transformers.t_contents( - wrapped_config.system_instruction - )[0] - ) - - async def _response_async_generator_wrapper(): - candidates = [] - try: - async for resp in await wrapped_func( - self, - model=model, - contents=contents, - config=wrapped_config if has_wrapped_tools else config, - **kwargs, - ): - _apply_response_attributes( - resp, 
finish_reasons, invocation - ) - if resp.candidates: - candidates.extend(resp.candidates) - yield resp - if telemetry_handler.should_capture_content() and candidates: - invocation.output_messages = to_output_messages( - candidates=candidates - ) - invocation.stop() - except Exception as exc: - if telemetry_handler.should_capture_content() and candidates: - invocation.output_messages = to_output_messages( - candidates=candidates - ) - invocation.fail(exc) - raise - - return _response_async_generator_wrapper() + async def _response_async_generator_wrapper(): + candidates = [] + try: + async for resp in await wrapped( + model=model, + contents=contents, + config=wrapped_config if has_wrapped_tools else config, + *_args, + **_kwargs, + ): + _apply_response_attributes( + resp, finish_reasons, invocation + ) + if resp.candidates: + candidates.extend(resp.candidates) + yield resp + if ( + telemetry_handler.should_capture_content() + and candidates + ): + invocation.output_messages = to_output_messages( + candidates=candidates + ) + invocation.stop() + except Exception as exc: + if ( + telemetry_handler.should_capture_content() + and candidates + ): + invocation.output_messages = to_output_messages( + candidates=candidates + ) + invocation.fail(exc) + raise + + return _response_async_generator_wrapper() + + return await _execute(*args, **kwargs) return instrumented_generate_content_stream @@ -721,28 +752,36 @@ def instrument_generate_content( ) -> object: os.environ["OTEL_INSTRUMENTATION_GENAI_EMIT_EVENT"] = "true" snapshot = _MethodsSnapshot() - Models.generate_content = _create_instrumented_generate_content( - snapshot, - telemetry_handler, - generate_content_config_key_allowlist, + wrap_function_wrapper( + "google.genai.models", + "Models.generate_content", + _create_instrumented_generate_content( + telemetry_handler, + generate_content_config_key_allowlist, + ), ) - Models.generate_content_stream = ( + wrap_function_wrapper( + "google.genai.models", + 
"Models.generate_content_stream", _create_instrumented_generate_content_stream( - snapshot, telemetry_handler, generate_content_config_key_allowlist, - ) + ), ) - AsyncModels.generate_content = _create_instrumented_async_generate_content( - snapshot, - telemetry_handler, - generate_content_config_key_allowlist, + wrap_function_wrapper( + "google.genai.models", + "AsyncModels.generate_content", + _create_instrumented_async_generate_content( + telemetry_handler, + generate_content_config_key_allowlist, + ), ) - AsyncModels.generate_content_stream = ( + wrap_function_wrapper( + "google.genai.models", + "AsyncModels.generate_content_stream", _create_instrumented_async_generate_content_stream( - snapshot, telemetry_handler, generate_content_config_key_allowlist, - ) + ), ) return snapshot diff --git a/uv.lock b/uv.lock index 1c58bda..dc34cab 100644 --- a/uv.lock +++ b/uv.lock @@ -1287,6 +1287,7 @@ dependencies = [ { name = "opentelemetry-instrumentation" }, { name = "opentelemetry-semantic-conventions" }, { name = "opentelemetry-util-genai" }, + { name = "wrapt" }, ] [package.optional-dependencies] @@ -1301,6 +1302,7 @@ requires-dist = [ { name = "opentelemetry-instrumentation", specifier = ">=0.61b0,<2" }, { name = "opentelemetry-semantic-conventions", specifier = ">=0.61b0,<2" }, { name = "opentelemetry-util-genai", editable = "util/opentelemetry-util-genai" }, + { name = "wrapt", specifier = ">=1.0.0,<3.0.0" }, ] provides-extras = ["instruments"]