Skip to content

feat: add opt-in Dynamic Instrumentation#761

Open
srprash wants to merge 15 commits into
mainfrom
feat/dynamic-instrumentation
Open

feat: add opt-in Dynamic Instrumentation#761
srprash wants to merge 15 commits into
mainfrom
feat/dynamic-instrumentation

Conversation

@srprash
Copy link
Copy Markdown
Contributor

@srprash srprash commented Jun 4, 2026

Description

Adds Dynamic Instrumentation (DI) to the ADOT Python distro — an opt-in feature that periodically polls instrumentation configurations from a control plane, applies runtime breakpoints/probes to a running application, captures runtime snapshots, and exports them as OTLP log records.

DI is disabled by default and only activates when OTEL_AWS_DYNAMIC_INSTRUMENTATION_ENABLED=true. Initialization is wrapped so any failure is a no-op that never affects the host application.

Changes

  • New debugger package: configuration poller, instrumentation manager, snapshot capture/serialization, OTLP emitter, and status reporter.
  • Two instrumentation engines: bytecode injection (Python 3.9–3.11) and sys.monitoring (Python 3.12+).
  • Wired into the distro via a guarded initialize_debugger() call in AwsOpenTelemetryDistro._configure() (default off).
  • python-dateutil and bytecode (3.9–3.11 only, via environment marker) added as core dependencies so a plain pip install aws-opentelemetry-distro yields a fully working feature with no extras.
  • DI unit tests plus Docker-based contract tests (a di-flask sample app and a mock control-plane API); the mock collector gains an OTLP-logs gRPC/HTTP path so contract tests can assert on captured snapshots.

Testing

  • tox -e 3.11-test-aws-opentelemetry-distro and tox -e 3.12-test-aws-opentelemetry-distro pass (full distro suite + coverage gate), exercising both engines.
  • DI contract tests: all 24 pass (function-level, line-level, probe, hit-limit, capture-limit, and shared-function coexistence scenarios).
  • Lint (black/isort/flake8/pylint), codespell, and a default-off / opt-in smoke check all pass.

Default behavior

No change for existing users: DI is off unless explicitly enabled. When enabled, snapshots are exported to the OTLP logs endpoint (OTEL_AWS_OTLP_LOGS_ENDPOINT).

Add Dynamic Instrumentation (DI) to the ADOT Python distro: an opt-in
feature that polls instrumentation configurations from a control plane,
applies runtime breakpoints/probes to a running application, captures
runtime snapshots, and exports them as OTLP log records.

DI is disabled by default and only activates when the user sets
OTEL_AWS_DYNAMIC_INSTRUMENTATION_ENABLED=true. Initialization is wrapped
so any failure is a no-op that never affects the host application.

Changes:
- Add the debugger package (config poller, instrumentation manager,
  snapshot capture/serialization, OTLP emitter, status reporter) with
  a bytecode-injection engine (Python 3.9-3.11) and a sys.monitoring
  engine (Python 3.12+).
- Wire DI into the distro: guarded initialize_debugger() call in
  AwsOpenTelemetryDistro._configure(), default off.
- Add python-dateutil and bytecode (3.9-3.11 only) as core dependencies
  so a plain `pip install aws-opentelemetry-distro` yields a fully
  working feature with no extras.
- Add DI unit tests and Docker-based contract tests, including a
  di-flask sample app and mock control-plane API.
- Extend the mock collector with an OTLP-logs gRPC/HTTP path so contract
  tests can assert on captured snapshots.

Notes on the bytecode engine:
- Only remove the injected handler from a module's globals once no other
  instrumented function in that module still references it, preventing a
  NameError in sibling user functions.
- Reuse an existing sys.monitoring tool-id registration that the engine
  already owns (e.g. inherited across fork) instead of failing, so line
  breakpoints keep working in prefork (gunicorn/uWSGI) workers.
@srprash srprash requested a review from a team as a code owner June 4, 2026 18:37
Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review: Dynamic Instrumentation (PR 761) -- BUGS: (1) _data_models.py: datetime.fromtimestamp(expires_at_value) creates a naive local-time datetime, but created_at correctly uses tz=timezone.utc. This causes expiration checks to be off by the UTC offset. Fix: add tz=timezone.utc. (2) instrumentation_manager.py: increment_hit_count performs an O(n) linear scan over all active functions on every breakpoint hit while holding the RLock. (3) instrumentation_manager.py: Dict[str, any] should be Dict[str, Any] (capital A) on apply_configuration return type. -- SECURITY: (4) No sensitive data redaction in captured variables (_function_wrapper.py). Function args, return values, and locals are emitted as OTLP logs with no mechanism to skip passwords/tokens/PII. (5) _failed_configs dict can grow without bound if the control plane sends new broken configs with unique IDs. -- CONCURRENCY: (6) _snapshot_otlp_emitter.py: Double-checked locking reads self._event_logger outside the lock. Safe under CPython GIL but fragile under free-threaded Python (PEP 703). -- CODE QUALITY: (7) _debugger_client.py: _poll_probes_loop and _poll_breakpoints_loop are nearly identical. Extract a shared helper. (8) _variable_serializer.py: Marked DEPRECATED yet introduced as new code. Remove if no external consumers. -- See inline comments for details.

if isinstance(expires_at_value, (int, float)):
expires_at = datetime.fromtimestamp(expires_at_value)
# Try as ISO 8601 string
elif isinstance(expires_at_value, str):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: datetime.fromtimestamp(expires_at_value) creates a naive datetime in local time. However, the created_at parsing correctly uses tz=timezone.utc. This inconsistency means expiration checks will malfunction on non-UTC servers. Fix: datetime.fromtimestamp(expires_at_value, tz=timezone.utc)

# Remove from active
del self._active_functions[func_key]

logger.debug("Removed function wrapper: %s", func_key)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance: increment_hit_count performs an O(n) linear scan over all breakpoints on every probe hit. For hot functions this could become a bottleneck. Consider using a dict keyed by breakpoint_id for O(1) lookup instead of iterating through the list.

# Prevents retrying the same broken config on every poll cycle.
# Entries are cleared when the config is removed from the incoming list.
self._failed_configs: Dict[str, str] = {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential memory leak: _failed_configs dict grows unbounded. Every failed configuration is added but never removed, even after successful retries or when configurations are deleted upstream. Over long-lived processes this could accumulate significant memory. Consider adding eviction logic or a max size cap.

logger.error("Error cleaning up orphaned states: %s", exception, exc_info=True)

def apply_configuration(self, configs: List[BreakpointConfiguration]) -> Dict[str, any]:
"""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: Return type annotation uses lowercase 'any' (Dict[str, any]) instead of 'Any' from typing. This won't cause a runtime error but is incorrect from a type-checking perspective.

self._lock = threading.Lock()

def _ensure_initialized(self):
"""Lazily initialize the LoggerProvider and EventLogger on first use.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Concurrency: The double-checked locking pattern here is fragile. Under PEP 703 (no-GIL Python), the check-then-act outside the lock is a data race. Even with the GIL, if initialization raises an exception, the half-constructed state may be visible to other threads. Consider using threading.Lock with a single check inside, or lazy_object_proxy / functools.cached_property for safer lazy init.

wrapper_self = self

def sync_wrapper(*args, **kwargs):
"""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security: Captured local variables are serialized and exported without any redaction or filtering. Sensitive data (passwords, tokens, PII) in local variables will be sent to the OTLP endpoint. Consider adding a configurable deny-list of variable names (e.g. password, secret, token, key) or a depth/size limit on captured values to reduce exposure.

)

def _poll_probes_loop(self):
logger.debug("Starting PROBE polling loop for %s : %s", self.client.service_name, self.client.environment)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code duplication: _poll_loop and _poll_loop_degraded are nearly identical methods with the same structure. Consider extracting the shared logic into a single method parameterized by the interval and max-failures threshold, to reduce maintenance burden and risk of the two diverging.

"""
Variable serialization utilities for debugger.

DEPRECATED: This module is superseded by _snapshot_serializer.py.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confusing: This file is marked DEPRECATED in its docstring but is being introduced as brand-new code in this PR. If it is genuinely deprecated in favor of another serializer, consider not adding it at all, or if it must exist as a fallback, clarify what replaces it and under what conditions the deprecated path is used.

…ntation

The public repo's pylint config enforces several refactor/convention checks
that the source did not yet satisfy. Resolve them without changing behavior:

- Remove a redundant inline `import os` (os is already imported at module top).
- Rename single-character loop/exception variables (k/v/i/e) to descriptive
  names for invalid-name compliance.
- Add targeted `# pylint: disable=` annotations for the structural checks
  (too-many-*, no-self-use, broad-exception-caught, import-outside-toplevel,
  no-else-continue) and for the BaseHTTPRequestHandler-mandated do_POST name,
  matching the existing per-occurrence convention used across the repo.
- Apply black formatting to the affected files.

No functional changes; debugger unit tests unchanged (189 passed, 25 skipped).
Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code review findings for the Dynamic Instrumentation PR. Multiple issues identified related to security, bugs, and code quality.

Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review: Dynamic Instrumentation PR

I reviewed the core implementation files in this PR. Below are findings related to bugs, security concerns, and code quality issues.

# TODO: Should refactor and simplify this method in the future and get rid of the disable.
# pylint: disable=too-many-locals,too-many-branches,too-many-statements,too-many-return-statements
@classmethod
def from_api_config(cls, breakpoint_config: Dict[str, Any]) -> Optional["BreakpointConfiguration"]:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (timezone inconsistency): datetime.fromtimestamp(expires_at_value) creates a naive datetime (no timezone info), while created_at parsing a few lines below correctly uses datetime.fromtimestamp(created_at_value, tz=timezone.utc). This inconsistency means:

  1. If expires_at is later compared to a timezone-aware datetime, Python will raise TypeError.
  2. On non-UTC servers, the timestamp will be interpreted in local time, causing breakpoints to expire at the wrong time.

Fix: expires_at = datetime.fromtimestamp(expires_at_value, tz=timezone.utc)

"""Background threads that poll the API for PROBE and BREAKPOINT configurations.

This class manages two independent polling threads:
- PROBE thread: Polls every 10 minutes (configurable)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security concern (SSRF vector): The proxy_url is read from env var OTEL_AWS_DYNAMIC_INSTRUMENTATION_API_URL with no validation, then used directly for HTTP POST requests. While env vars are generally trusted, this URL is concatenated with paths and could potentially be pointed at internal metadata endpoints. Consider validating the URL scheme (only allow http/https) and optionally logging a warning when using a non-default URL.


Thread Safety:
- Uses a single shared lock (_config_lock) for PROBE/BREAKPOINT configuration operations
- Ensures atomic merge-then-apply to prevent partial state
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security (sensitive data in logs): This line logs the full raw API config response at DEBUG level. This could include ARNs, service names, and configuration details. In production environments where debug logging might be accidentally enabled, this creates an information disclosure risk. Consider logging only non-sensitive metadata or removing this log.

if original_size > self.max_collection_size:
cv.truncated = True
cv.size = original_size
return cv
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security concern (information disclosure via variable capture): The serializer captures arbitrary object attributes including __dict__. When DI is enabled, this could inadvertently capture and export sensitive data such as passwords, tokens, API keys, or PII stored in local variables or object fields. While the feature is opt-in, there is no built-in redaction/deny-list mechanism for sensitive field names.

Consider adding a configurable deny-list of field name patterns that are never captured, similar to how other APM tools redact sensitive data by default.

def increment_hit_count(self, breakpoint_key: str) -> bool:
"""
Increment hit count for a breakpoint and check for hit limit and rate limit.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance (hot-path linear scan): increment_hit_count is called on every breakpoint hit (the hot path) and iterates over ALL active functions while holding the RLock. With many instrumented functions, this O(n) linear scan under lock on every request will add measurable latency.

Suggestion: Add a direct lookup dict _breakpoint_key_to_state maintained alongside _active_functions for O(1) lookup.

if emitter and hasattr(emitter, "reset"):
emitter.reset()
set_snapshot_emitter(None)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (potential issue in double-fork): _after_fork_in_child calls initialize_debugger(). Since os.register_at_fork callbacks persist across forks, in a double-fork scenario (master -> worker -> sub-worker), the callback fires again. Consider adding a PID check: if os.getpid() == _initialized_pid: return to prevent redundant re-initialization.

self._init_failed = False
self._lock = threading.Lock()

def _ensure_initialized(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code quality (forward compat): The double-checked locking pattern checks self._event_logger outside the lock. In CPython this is safe due to the GIL, but on free-threaded builds (PEP 703, Python 3.13+) this could be a data race. Consider documenting the GIL dependency or using a single check inside the lock.

func: Function object (not used by sys.monitoring engine, but kept for interface consistency)
"""
if not self._initialized:
logger.warning("SysMonitoringEngine not initialized, cannot disable breakpoints")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code quality (id stability): id(code) is used as dict key but only guarantees uniqueness during the objects lifetime. After disable_breakpoints_for_function clears state and the code object is GCd, a new code object could reuse the same address. Consider documenting this invariant or holding a reference.

main_module = sys.modules.get("__main__")
if main_module is not None:
# Match by __spec__.name (most reliable)
spec = getattr(main_module, "__spec__", None)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (overly broad Flask patching): The fallback name-based Flask patching matches ANY endpoint whose view function has the same __name__. If two different modules both define a function named index registered as Flask routes, this could incorrectly patch the wrong endpoint.

Consider also comparing __module__ to avoid false positive matches.

self._window_start_ns = now_ns
self._capture_count = 1
return True

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (sentinel value conflict): The sentinel 0 for _window_start_ns conflicts with a legitimate now_ns=0 argument. If called explicitly with now_ns=0, the lazy init sets _window_start_ns=0, and the next real call will always roll the window. Consider using None as the uninitialized sentinel instead.

srprash added 2 commits June 4, 2026 13:34
…s reporter

Raise Dynamic Instrumentation unit-test coverage with focused tests that mock
the HTTP session, instrumentation engine, and snapshot emitter (no real network,
threads, or fork):

- test_debugger_client.py: fetch_configuration_by_type (200/404/400/5xx/timeout/
  bad-JSON/pagination), API URL + service/environment resolution, attribute-filter
  matching, config parsing, staleness, degraded-mode logging.
- test_instrumentation_manager.py: apply_configuration add/update/remove/unchanged
  with error isolation, hit-count + rate-limit + maxHits handling, grouping,
  change detection, status reporting, resource building.
- test_function_wrapper.py: function/method discovery and method-type detection,
  sync and async wrapper behavior driven via instrument_function (return passthrough,
  user exceptions re-raised, rate-limit/disabled skip), snapshot building, restore.
- test_status_reporter_logic.py: status queueing/batching, payload build + POST,
  start/stop lifecycle (thread mocked), error handling.
- Plus targeted tests for debugger config parsing, the OTLP snapshot emitter,
  the snapshot serializer object paths, and stack utilities.

Coverage of the debugger package rises from ~54% to ~79% on 3.12; all tests pass
on 3.11 and 3.12 (431 passed, 25 skipped).
…ve engines

Close the remaining Dynamic Instrumentation coverage gaps so the package meets
the distro's 95% line-coverage gate:

- Add unit tests for the previously-uncovered branches in the DI client
  (poller start/stop with threads mocked, service/environment fallbacks,
  HTTP-suppression import fallback), the instrumentation manager (resource
  build, grouping/change-detection/cleanup error paths), the function wrapper
  (wrapper error/edge branches, Flask-patch failures, capture helpers), the
  config data models (from_api_config validation branches), the debugger entry
  module (init/cleanup guards), and the snapshot serializer (timeout/error
  branches). Tests mock the HTTP session, engine, emitter, and threads — no real
  network, threads, or fork.

- Exclude the two line-breakpoint engines from the coverage gate in the
  coverage configs. They are mutually version-exclusive (bytecode on 3.9-3.11,
  sys.monitoring on 3.12+), so one is always dead code under the shared config
  and cannot meet the line threshold. Each is unit-tested on its native Python
  version and both are validated end-to-end by the DI contract tests.

- Mark the long-running daemon polling loops and the real engine-selection
  path with `# pragma: no cover`; these block on threads / construct real
  engines and are covered by the contract tests rather than unit tests.

Debugger-package coverage rises to ~99.6%; full debugger suite passes on
Python 3.11 (512 passed) and 3.12 (516 passed).
Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed core source files for bugs, security issues, and code quality. Inline review comments follow on specific lines.

# Try as Unix timestamp (float/int) first
if isinstance(expires_at_value, (int, float)):
expires_at = datetime.fromtimestamp(expires_at_value)
# Try as ISO 8601 string
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: datetime.fromtimestamp(expires_at_value) creates a naive datetime in the server's local timezone. A few lines below, created_at correctly uses datetime.fromtimestamp(created_at_value, tz=timezone.utc). This inconsistency means expires_at will be off by the UTC offset on non-UTC servers, and naive/aware datetime comparisons will raise TypeError.

Fix: expires_at = datetime.fromtimestamp(expires_at_value, tz=timezone.utc)

timestamp_ms = int(time.time() * 1000)

# Convert duration from nanoseconds to milliseconds per v1 spec
duration_ms = duration_ns / 1_000_000 if duration_ns else None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (type mismatch): Snapshot.duration is annotated as Optional[int] (see _snapshot_models.py line 7: duration: Optional[int] = None), but this expression uses true division (/), which always returns a float in Python 3. This could cause unexpected behavior in downstream serializers or type-checked code.

Fix: use integer division duration_ms = duration_ns // 1_000_000 if duration_ns else None, or change the Snapshot type annotation to Optional[float] if sub-millisecond precision is desired.

# Find the breakpoint state
# TODO: optimize the breakpoint lookup using function name and breakpoint id or something.
# we shouldn't be iterating on functions and breakpoints for every hit and increment.
for bp_set in self._active_functions.values():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance: This linear scan over all active functions runs on every breakpoint hit (hot path). With many instrumented functions, this becomes O(n) per hit. Consider maintaining a direct Dict[str, BreakpointState] lookup (breakpoint_key -> state) alongside _active_functions for O(1) lookups. The TODO above acknowledges this.

stem = os.path.splitext(os.path.basename(main_file))[0]
if stem == module_name:
return main_module
return importlib.import_module(module_name)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security note: This imports arbitrary modules based on names received from the polled API configuration. While this is inherent to the DI feature design, it means the API proxy endpoint is a trust boundary -- a compromised proxy could cause arbitrary module imports. The existing mitigations (opt-in gate, Lambda exclusion) are appropriate, but consider documenting this trust relationship explicitly in the security model.

disabled_entries = []
error_entries = []

for bp_set in self._manager._active_functions.values():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread safety: This iterates over self._manager._active_functions.values() while only holding self._lock (the reporter's own lock), not the manager's _lock. If a configuration poll thread calls apply_configuration concurrently, it could modify _active_functions (adding/removing entries) while this iteration is in progress, causing a RuntimeError: dictionary changed size during iteration. Consider either: (1) acquiring self._manager._lock here, or (2) snapshotting with list(self._manager._active_functions.values()) to iterate a copy.

try:
raw_config = response.json()

logger.debug("Raw Config Response from /ListInstrumentationConfigurations: %s", raw_config)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security (low risk): Logging the entire raw API response at DEBUG level could expose configuration details (e.g., ARNs, location hashes, service names) in log files if DEBUG logging is enabled in production. Consider truncating or summarizing the logged output, or only logging the config count and key identifiers.

"python-dateutil == 2.9.0.post0",
# Used by Dynamic Instrumentation's bytecode-injection engine for line-level instrumentation on
# Python 3.9-3.11 (3.12+ uses the built-in sys.monitoring instead, so the marker excludes it there).
# A core dependency so `pip install aws-opentelemetry-distro` gives a fully working feature with no extras.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bytecode dependency has no version pin. Other dependencies in this file use exact pins (e.g., python-dateutil == 2.9.0.post0). An unpinned dependency could introduce breaking changes on a future bytecode release. Consider pinning to a known-good version or at least a compatible-release constraint (e.g., bytecode ~= 0.16).

CI pins pylint==3.0.2 and runs flake8 before pylint:

- Move the long multi-check pylint disables off the `def` line (they pushed
  the line past 120 chars and tripped flake8 E501) to a disable comment on
  the following line.
- Remove `too-many-positional-arguments` (R0917) from disable comments: that
  check did not exist until pylint 3.3, so pinned pylint 3.0.2 rejects it as
  an unknown option value (W0012). It is not emitted by 3.0.2, so no disable
  is needed.
- Re-sort one test import with the pinned isort.

No behavior change; debugger suite still passes (516 passed, 25 skipped).
return self.instrumentation_type == "PROBE"

@property
def is_temporary(self) -> bool:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (_data_models.py line 225): Naive datetime from fromtimestamp causes timezone-dependent expiration checks

datetime.fromtimestamp(expires_at_value) creates a naive datetime in the system local timezone, while isoparse(...) on line 229 may return a UTC-aware datetime. This inconsistency means expiration comparisons downstream will either raise TypeError (comparing aware vs naive) or silently produce wrong results.

For consistency with created_at parsing on line 240 (which correctly passes tz=timezone.utc), this should be:

expires_at = datetime.fromtimestamp(expires_at_value, tz=timezone.utc)

allowing automatic retry on next call.
"""
# Return cached value if we successfully found it before
if self._cached_environment:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security: Full API response logged at DEBUG level may leak sensitive data

This logs the entire raw API response including LatestConfigurations which may contain breakpoint configurations with location details, ARNs, or other sensitive identifiers. If a user enables DEBUG logging in production (common for troubleshooting), this could expose internal application structure to log aggregation systems.

Consider logging only non-sensitive summary info (e.g., number of configs, Changed flag):

logger.debug("Response: Changed=%s, configs=%d", raw_config.get("Changed"), len(raw_config.get("LatestConfigurations", [])))

logger.error("Error in status report loop: %s", exception, exc_info=True)
logger.debug("Status reporter loop stopped")

def _pull_and_report_statuses(self, is_initial_report: bool = False):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Race condition — iterating _active_functions without acquiring the manager's lock

This method iterates over self._manager._active_functions.values() and mutates state objects (state.hit_in_last_period = False) while only holding the reporter's own self._lock. The manager's _lock (RLock) is what protects _active_functions — iterating and mutating without it is a race condition with the polling threads that call apply_configuration (which adds/removes entries from the same dict).

This could lead to RuntimeError: dictionary changed size during iteration or inconsistent state reads. The loop should also acquire self._manager._lock.

thrown = exception
try:
thrown_caller_stack = traceback.extract_stack()
except Exception:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug/Security: Name-based Flask view_functions patching could patch the wrong endpoint

The fallback from identity-based matching to name-based matching (matching by name) is unsafe. If two different view functions share the same name (e.g., decorated functions, or functions with the same name in different modules registered on the same app), this could incorrectly patch the wrong endpoint.

This could cause DI instrumentation to capture data from an unintended code path. Consider removing the name-based fallback, or at minimum also verifying the module attribute of the matched function.

liustve added 2 commits June 5, 2026 12:43
The name-only fallback for Flask view_functions matched by
__name__ alone, so two view functions named (e.g.) ``index`` from
different modules registered on the same app would all be patched
when DI targeted only one.

Pass the original function's __module__ through to the fallback and
require both name and module to match. functools.wraps (used by OTel's
Flask instrumentation, which is the reason the name fallback exists at
all) preserves __module__, so the OTel-wrapping path keeps working.
When the original has no __module__, fall back to name-only matching
to stay graceful.
_ensure_initialized() did a double-checked-lock fast path: read
self._event_logger / self._init_failed outside the lock, only acquire
the lock to do the actual init. Safe today because the GIL serializes
attribute reads in CPython, but it becomes a data race on free-threaded
Python (PEP 703, 3.13+) where the GIL is gone.

Emission is not a hot path — _ensure_initialized runs once per emitted
snapshot, not per captured value — so the cost of always acquiring the
lock is negligible. Drop the unsafe outer reads.
@liustve
Copy link
Copy Markdown
Contributor

liustve commented Jun 5, 2026

note:
for visibility, the unpinned bytecode dependency at pyproject.toml#L98 does have a near-term compatibility issue:

discussed internally going to handle this in a separate follow-up PR rather than rolling it into this one and leaving this comment to flag it here so the broader DI work doesn't merge with the gap unaddressed.

Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code review completed - see inline comments for findings on bugs, security issues, and code quality.

self._service_name_override = service_name
self._cached_service_name: Optional[str] = None
self._cached_environment: Optional[str] = None
self.proxy_url = api_url or self._get_api_url()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security: SSRF via unvalidated API URL

The API URL is taken directly from the environment variable OTEL_AWS_DYNAMIC_INSTRUMENTATION_API_URL without any validation or sanitization. A malicious environment variable could point to an internal service (e.g., http://169.254.169.254/latest/meta-data/ on AWS) causing Server-Side Request Forgery.

Consider:

  1. Validating the URL scheme is http or https
  2. Optionally restricting to known/expected domains or IP ranges
  3. At minimum, logging a warning when a non-default URL is used

try:
raw_config = response.json()

logger.debug("Raw Config Response from /ListInstrumentationConfigurations: %s", raw_config)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Server-controlled poll interval has no upper bound

The server response's SyncInterval is applied directly to the client's poll interval with only a > 0 check. A malicious or buggy backend could set this to an extremely low value (e.g., 1 second), causing the client to hammer the API and consume excessive resources, or to an extremely high value, effectively disabling polling.

Consider clamping the value to a reasonable range, e.g.:

MIN_POLL_INTERVAL = 10   # seconds
MAX_POLL_INTERVAL = 3600 # 1 hour
next_sync_interval = max(MIN_POLL_INTERVAL, min(MAX_POLL_INTERVAL, next_sync_interval))

if response.status_code == 200:
try:
raw_config = response.json()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security: Sensitive data in debug logs

The full raw API response is logged at DEBUG level. In production environments where debug logging might be enabled for troubleshooting, this could expose sensitive configuration data (e.g., ARNs, internal endpoints, customer-specific instrumentation details) to log aggregation systems.

Consider logging only non-sensitive metadata (e.g., number of configs returned, response status) rather than the full response body.

with self._lock:
if func_id not in self._injection_states:
self._injection_states[func_id] = InjectionState(
original_code=code, function_ref=func, function_key=function_key
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: _breakpoint_handler bound as instance method but injected as plain function

On line 157, self._breakpoint_handler is stored into the function's __globals__ dict. Because it's a bound method reference, this works correctly - the self parameter is already captured in the bound method object.

However, there's a subtle lifetime/GC issue: the bound method holds a strong reference to the BytecodeInjectionEngine instance. If the engine is replaced (e.g., after fork and re-initialization), instrumented functions from the old engine will keep the old engine instance alive via this reference. Consider using a weakref or a module-level dispatch function that looks up the current engine.

common path (already initialized).
"""
with self._lock:
if self._event_logger is not None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Race condition in double-checked locking pattern

The _ensure_initialized method uses double-checked locking but the outer fast-path check (if self._event_logger is not None: return True) is missing. Looking at emit_snapshot, it always calls _ensure_initialized() which acquires the lock every time.

While the current implementation is technically safe because it always locks, the docstring says 'double-checked locking' suggesting the intent was to have a lock-free fast path. If you later add the fast-path optimization, note that Python's memory model (GIL) makes this safe for CPython, but on free-threaded builds (PEP 703), you'd need _event_logger to be set atomically after all initialization is complete. Currently the assignment order is correct (logger_provider -> event_logger last), but this is fragile.

Consider adding a simple boolean flag set last, or document that this relies on CPython's GIL guarantee.

# Check descriptor type
if isinstance(raw_method, staticmethod):
return MethodType.STATIC
if isinstance(raw_method, classmethod):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security: No secret/sensitive data redaction in variable capture

The function wrapper captures arguments and return values of instrumented functions without any filtering for sensitive data. When DI is enabled and configured to capture arguments, values like passwords, API keys, tokens, PII, etc., will be serialized and sent via OTLP logs to the configured endpoint.

Consider:

  1. Adding a configurable deny-list of parameter names to skip (e.g., password, secret, token, key, auth, credential)
  2. Redacting values that match common secret patterns (long base64 strings, JWT tokens, etc.)
  3. At minimum, documenting this risk prominently so users understand that enabling DI with argument capture may expose secrets in their observability pipeline

self._failed_configs: Dict[str, str] = {}

# Thread safety
self._lock = RLock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Potential unbounded memory growth in _failed_configs

The _failed_configs dict stores location_hash -> error_cause for every failed configuration, and entries are only cleaned up when they disappear from the incoming config list (line ~753). However, if the API keeps sending the same broken configs indefinitely (e.g., a misconfiguration that's never fixed), these entries accumulate forever.

More importantly, _preserved_states (line 59) preserves breakpoint states across configuration updates but _cleanup_orphaned_states only cleans states that aren't in active functions. If a function is repeatedly added and removed (flapping config), preserved states can accumulate.

Consider adding a TTL or max-size bound to _failed_configs, and auditing whether _preserved_states can grow unbounded under pathological config update patterns.

line_number = 0

if not isinstance(line_number, int) or line_number < 0:
logger.warning(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Quality: Dead code - unreachable validation after safe_int

The check if not isinstance(line_number, int) or line_number < 0 on line 371 (diff line ~648) is unreachable because safe_int() (called on line 361/diff ~638) always returns an int (either the parsed value or the default 0). The isinstance check will always be True, and line_number < 0 can never happen because safe_int returns the default (0) on failure rather than a negative value.

This is dead code that gives a false sense of additional validation. Consider removing it or replacing with a comment noting the invariant.

"""
Increment hit count for a breakpoint and check for hit limit and rate limit.

This method is called by the instrumentation engine when a breakpoint is hit.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: increment_hit_count iterates all active functions on every breakpoint hit

The increment_hit_count method (called on the hot path for every breakpoint hit) iterates through ALL active functions and their states to find the matching breakpoint_key. With many instrumented functions, this is O(N*M) where N = number of functions and M = average breakpoints per function. This is especially problematic because this runs inside the manager's RLock, blocking all other configuration operations.

The TODO in the code acknowledges this. Consider maintaining a direct lookup dict breakpoint_key -> (bp_set, state) that is updated when functions are added/removed. This would make the hot path O(1) and reduce lock contention.

try:
# Use provided tracer provider or get the global one
if tracer_provider is None and get_tracer_provider is not None:
tracer_provider = get_tracer_provider()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Recursive re-initialization in post-fork handler

The _after_fork_in_child callback calls initialize_debugger() which calls _register_fork_handler() again. While the idempotency guard (_fork_handler_registered) prevents double-registration, there's a subtlety: _fork_handler_registered is a module-level global that survives fork (it's True in the child). This means the fork handler registration is a no-op in children, which is correct.

However, if the child process itself forks (e.g., a gunicorn worker spawning sub-processes), the handler fires in the grandchild but _fork_handler_registered is still True from the original parent, so no handler gets registered for the grandchild's children. This is likely fine for typical deployments but worth documenting.

type_name = type(value).__name__
return CapturedValue(type=type_name, not_captured_reason="timeout")

def _serialize_value( # pylint: disable=too-many-return-statements
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security: Object serialization via __dict__ may leak internal state

The _serialize_object method accesses value.__dict__ to enumerate fields for serialization. This means if a line-level breakpoint captures a local variable that is (for example) an HTTP request object, database connection, or security context, its internal state including credentials, session tokens, connection strings, etc., will be serialized and exported.

While the depth/field limits help bound the exposure, they don't prevent the most sensitive fields from being in the first N fields captured. The capture_locals filter only controls which variables to capture, not which fields within an object to redact.

Consider adding object-type-level redaction (e.g., skip objects whose type matches known sensitive patterns like *Session*, *Credential*, *Auth*).

Black at the project line-length of 120 collapses the rate-limited
debug call introduced in ed30cb1 onto one line. Apply the formatting
to keep `tox -e lint` green.
Copy link
Copy Markdown
Contributor

@github-actions github-actions Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security & Code Quality Review

I've reviewed this PR for bugs, security issues, and code quality. The feature is well-structured with appropriate opt-in guards and error isolation. Below I've posted inline comments on specific issues found. Here's a summary of the most critical ones:

Security Concerns (High)

  1. Arbitrary module import from remote API_resolve_module() calls importlib.import_module() with a module name sourced from API responses. If the control plane is compromised or traffic is intercepted (default endpoint is HTTP), this enables code execution.
  2. Default HTTP endpoint — The default API URL is http://localhost:2000 with no TLS enforcement or validation, making MITM feasible in non-localhost deployments.

Bugs / Race Conditions (Medium)

  1. _status_reporter.py iterates manager's _active_functions without holding the manager's lock — can raise RuntimeError: dictionary changed size during iteration.
  2. _snapshot_otlp_emitter.py reset() is not thread-safe — sets _event_logger = None without the lock, racing with concurrent emit_snapshot calls.
  3. Missing double-checked locking in _ensure_initialized() — always acquires the lock even when already initialized, causing unnecessary contention on the hot path.

DoS / Resource Exhaustion (Medium)

  1. Unbounded _reported_configs set — grows without limit as configs rotate over time.
  2. _failed_configs dict has no size cap — a malicious API flooding unique failing config IDs can grow this without bound.
  3. list(value) materializes entire sets before truncation — should use itertools.islice.
  4. No cap on number of active instrumented functions — API can instruct arbitrarily many instrumentations.

Dependency Issue

  1. bytecode dependency has no version constraint — a breaking release could silently break the bytecode injection engine.

if stem == module_name:
return main_module
return importlib.import_module(module_name)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security: Arbitrary module import from remote API (High)

module_name originates from the API response (BreakpointConfiguration.module) and flows here without any validation or allowlisting. importlib.import_module() executes top-level module code on import, so a compromised or MITMed control plane could achieve remote code execution by specifying a malicious module name.

Consider:

  1. Restricting to modules already in sys.modules (i.e., only instrument already-loaded code).
  2. Validating module names against a pattern allowlist (e.g., no leading underscores, no relative imports).
  3. Requiring HTTPS for the API endpoint to prevent MITM injection of malicious configs.

error_entries = []

for bp_set in self._manager._active_functions.values():
for _, state in bp_set.states.items():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Race condition — iterating manager's internal dict without its lock (Medium)

This iterates self._manager._active_functions.values() while only holding self._lock (the reporter's own lock). The manager's _active_functions dict can be concurrently modified by apply_configuration() (which holds the manager's self._lock, a different lock). This can raise RuntimeError: dictionary changed size during iteration.

Fix: Either expose a thread-safe snapshot method on the manager (e.g., get_active_functions_snapshot() that copies under its own lock), or acquire self._manager._lock here before iterating.

self._lock = threading.Lock()

def _ensure_initialized(self):
"""Lazily initialize the LoggerProvider and EventLogger on first use.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance: Missing fast-path check in double-checked locking (Medium)

The docstring says "double-checked locking" but the implementation always acquires the lock. The classic double-checked locking pattern should check if self._event_logger is not None: return True before acquiring the lock to avoid contention on the hot path (every snapshot emission after initialization):

def _ensure_initialized(self):
    if self._event_logger is not None:  # fast path - no lock
        return True
    with self._lock:
        if self._event_logger is not None:  # re-check under lock
            return True
        ...

This also makes the reset() method unsafe — it sets _event_logger = None without holding the lock, which could race with concurrent reads in the fast path. reset() should acquire the lock too.


# Track reported configurations to avoid duplicate reports
self._reported_configs: Set[str] = set()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource leak: Unbounded _reported_configs set (Medium)

This set grows monotonically — entries are added whenever a config transitions to READY or ERROR but are never removed. In a long-running process with config rotation (old configs removed, new ones added), this set will grow without bound, constituting a slow memory leak.

Consider pruning entries that correspond to configs no longer in _active_functions, or switching to an LRU-bounded structure.

# TODO: optimize the breakpoint lookup using function name and breakpoint id or something.
# we shouldn't be iterating on functions and breakpoints for every hit and increment.
for bp_set in self._active_functions.values():
if breakpoint_key in bp_set.states:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance: O(n) iteration on every breakpoint hit (Medium)

increment_hit_count is called on every breakpoint hit (hot path) and iterates through ALL active functions and their breakpoints to find the matching one. With many active instrumentations, this causes O(n) lock-held time on every single hit.

The TODO comment acknowledges this. Consider maintaining a direct Dict[str, BreakpointState] lookup by breakpoint_key for O(1) access, updated when functions are added/removed.

# Prevents retrying the same broken config on every poll cycle.
# Entries are cleared when the config is removed from the incoming list.
self._failed_configs: Dict[str, str] = {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DoS: Unbounded _failed_configs dict (Medium)

While stale entries are cleaned when their config IDs disappear from the incoming API response, there is no maximum size limit on this dict. If an attacker controls the API and continuously sends new unique config IDs that all fail (different IDs each poll cycle, so stale cleanup doesn't help), this dict grows without bound.

Consider adding a max-size eviction (e.g., drop oldest entries when exceeding a threshold like 1000).

"""Serialize list/tuple/set/frozenset as elements."""
type_name = type(value).__name__
items = list(value)
original_size = len(items)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DoS: Full materialization of sets/frozensets before truncation (Low-Medium)

list(value) materializes the entire collection into memory before the max_collection_size limit is applied. For a set with millions of elements, this creates a full copy before only taking the first N items.

Use itertools.islice to lazily consume only what's needed:

import itertools
items = list(itertools.islice(value, self.max_collection_size))
original_size = len(value)  # len() is O(1) for sets



def initialize_global_manager(tracer_provider=None, service: str = "", environment: str = "") -> InstrumentationManager:
"""Initialize the global InstrumentationManager instance.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: TOCTOU race in initialize_global_manager (Low)

The check-then-assign pattern (if _global_manager_instance is not None ... _global_manager_instance = ...) is not atomic. If two threads call this concurrently during startup, both could pass the None check and create separate instances, with one silently discarded.

This is low severity because it's typically called once during distro _configure(), but the post-fork handler could race with application code that somehow triggers re-initialization. Consider using a module-level lock.

config_data_str = item.get("ConfigurationData")
if config_data_str:
config_item["ConfigurationData"] = json.loads(config_data_str)
# Ensure AttributeFilters is a list or None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Security: No response size limit on API responses (Medium)

The response.json() call deserializes the entire response body without any size check. A single page response could contain an arbitrarily large LatestConfigurations array (the max_pages=3 limit doesn't bound per-page size). A compromised or malicious API endpoint could return a multi-GB JSON response, causing OOM.

Consider:

  1. Setting stream=True on the request and checking Content-Length before reading.
  2. Or using a streaming JSON parser with a byte limit.
  3. At minimum, cap the number of configs per page (e.g., reject responses with more than 100 items).

# Used by Dynamic Instrumentation's bytecode-injection engine for line-level instrumentation on
# Python 3.9-3.11 (3.12+ uses the built-in sys.monitoring instead, so the marker excludes it there).
# A core dependency so `pip install aws-opentelemetry-distro` gives a fully working feature with no extras.
"bytecode; python_version >= '3.9' and python_version < '3.12'",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dependency: bytecode has no version constraint (Low-Medium)

Unlike python-dateutil which is pinned to a specific version, bytecode has no version bounds. The bytecode injection engine relies on specific internal APIs of this library (e.g., Instr, Bytecode, the tuple format for LOAD_GLOBAL in 3.11). A breaking release could silently produce incorrect bytecode, leading to hard-to-debug crashes in user applications.

Consider pinning to a compatible range, e.g.:

"bytecode >= 0.15, < 1.0; python_version >= '3.9' and python_version < '3.12'",

return {"Changed": False, "LatestConfigurations": []}
elif response.status_code >= 500:
logger.error("Server error (%d): %s", response.status_code, response.text)
return None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Info leak: Logging full error response body (Low)

response.text from a server error may contain internal server details (stack traces, internal paths, debugging info). Logging it at ERROR level means it could end up in production log files accessible to broader audiences.

Consider truncating: response.text[:500] or logging at DEBUG level instead.

"""
# Active instrumentation (function_key -> FunctionBreakpointSet)
self._active_functions: Dict[str, FunctionBreakpointSet] = {}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DoS: No upper bound on number of active instrumentations (Medium)

_active_functions has no maximum size limit. A malicious or misconfigured API could return hundreds or thousands of configurations targeting different functions. Each instrumentation allocates wrapper closures, state objects, and (for the bytecode engine) modified code objects. This could exhaust memory or cause severe performance degradation.

Consider adding a configurable cap (e.g., MAX_ACTIVE_INSTRUMENTATIONS = 100) and rejecting configurations beyond that limit with an appropriate error status report.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants