From f4449b5faf778b9eece7ebba37583b0659561fd3 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Wed, 18 Mar 2026 14:40:26 -0700 Subject: [PATCH 01/17] fix: defer Django middleware injection when settings are not yet configured --- .../instrumentation/django/instrumentation.py | 49 +++++++++++++++++-- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/drift/instrumentation/django/instrumentation.py b/drift/instrumentation/django/instrumentation.py index e7c7f87..911f7aa 100644 --- a/drift/instrumentation/django/instrumentation.py +++ b/drift/instrumentation/django/instrumentation.py @@ -49,23 +49,34 @@ def _resolve_http_transforms( @override def patch(self, module: ModuleType) -> None: """Patch Django by injecting middleware.""" + if not self._try_inject_middleware(): + # Settings not configured yet — defer injection until django.setup() runs + self._defer_middleware_injection() + + def _try_inject_middleware(self) -> bool: + """Attempt to inject DriftMiddleware into Django settings. + + Returns: + True if middleware was injected (or already present), False if + settings are not yet configured and injection should be deferred. + """ global _middleware_injected if _middleware_injected: logger.debug("Middleware already injected, skipping") - return + return True try: from django.conf import settings if not settings.configured: - logger.warning("Django settings not configured, cannot inject middleware") - return + logger.debug("Django settings not configured yet, will defer middleware injection") + return False middleware_setting = self._get_middleware_setting(settings) if not middleware_setting: logger.warning("Could not find middleware setting, cannot inject") - return + return True # Don't retry — this won't change current_middleware = list(getattr(settings, middleware_setting, [])) @@ -73,7 +84,7 @@ def patch(self, module: ModuleType) -> None: if middleware_path in current_middleware: logger.debug("DriftMiddleware already in settings, skipping injection") _middleware_injected = True - return + return True # Insert at position 0 to capture all requests current_middleware.insert(0, middleware_path) @@ -89,11 +100,39 @@ def patch(self, module: ModuleType) -> None: self._force_database_reconnect() print("Django instrumentation applied") + return True except ImportError as e: logger.warning(f"Could not import Django settings: {e}") + return True # Don't retry on import errors except Exception as e: logger.error(f"Failed to inject middleware: {e}", exc_info=True) + return True # Don't retry on unexpected errors + + def _defer_middleware_injection(self) -> None: + """Monkey-patch django.setup() to inject middleware after settings are configured. + + When TuskDrift.initialize() runs before DJANGO_SETTINGS_MODULE is set + (common in manage.py where the SDK init is the first import), Django + settings aren't available yet. This defers injection to run after + django.setup() completes, which is when settings are guaranteed to be + configured. + """ + import django + + original_setup = django.setup + + def patched_setup(*args, **kwargs): + # Restore original setup first to avoid re-entrance + django.setup = original_setup + # Run the original django.setup() + result = original_setup(*args, **kwargs) + # Now settings are configured — inject middleware + self._try_inject_middleware() + return result + + django.setup = patched_setup + logger.debug("Deferred middleware injection to django.setup()") def _force_database_reconnect(self) -> None: """Force Django to close and recreate database connections.""" From b694610e88d9eacb00377a00b31b395fb6831fc8 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Wed, 18 Mar 2026 15:52:35 -0700 Subject: [PATCH 02/17] auto mark app as ready --- drift/core/drift_sdk.py | 2 -- drift/instrumentation/django/middleware.py | 3 +++ drift/instrumentation/fastapi/instrumentation.py | 3 +++ drift/instrumentation/wsgi/handler.py | 3 +++ 4 files changed, 9 insertions(+), 2 deletions(-) diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index 2f83e6b..cf75f54 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -646,8 +646,6 @@ def mark_app_as_ready(self) -> None: if self._td_span_processor: self._td_span_processor.update_app_ready(True) - logger.debug("Application marked as ready") - if self.mode == TuskDriftMode.REPLAY: logger.debug("Replay mode active - ready to serve mocked responses") elif self.mode == TuskDriftMode.RECORD: diff --git a/drift/instrumentation/django/middleware.py b/drift/instrumentation/django/middleware.py index 2e571c6..670679d 100644 --- a/drift/instrumentation/django/middleware.py +++ b/drift/instrumentation/django/middleware.py @@ -66,6 +66,9 @@ def __call__(self, request: HttpRequest) -> HttpResponse: if sdk.mode == TuskDriftMode.DISABLED: return self.get_response(request) + if not sdk.app_ready: + sdk.mark_app_as_ready() + # REPLAY mode - handle trace ID extraction and context setup if sdk.mode == TuskDriftMode.REPLAY: return self._handle_replay_request(request, sdk) diff --git a/drift/instrumentation/fastapi/instrumentation.py b/drift/instrumentation/fastapi/instrumentation.py index eaa125f..d280544 100644 --- a/drift/instrumentation/fastapi/instrumentation.py +++ b/drift/instrumentation/fastapi/instrumentation.py @@ -398,6 +398,9 @@ async def _handle_request( if sdk.mode == TuskDriftMode.DISABLED: return await original_call(app, scope, receive, send) + if not sdk.app_ready: + sdk.mark_app_as_ready() + # REPLAY mode - handle trace ID extraction and context setup if sdk.mode == TuskDriftMode.REPLAY: return await _handle_replay_request( diff --git a/drift/instrumentation/wsgi/handler.py b/drift/instrumentation/wsgi/handler.py index 23a682a..e5f4aed 100644 --- a/drift/instrumentation/wsgi/handler.py +++ b/drift/instrumentation/wsgi/handler.py @@ -92,6 +92,9 @@ def original_call() -> Iterable[bytes]: if sdk.mode == TuskDriftMode.DISABLED: return original_call() + if not sdk.app_ready: + sdk.mark_app_as_ready() + # REPLAY mode: requires trace ID header if sdk.mode == TuskDriftMode.REPLAY: return _handle_replay_request( From 38d378fbb1935cab7f3a373a14b4e4e50cd337fc Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Thu, 19 Mar 2026 10:42:53 -0700 Subject: [PATCH 03/17] Skip non-HTTP schemes in urllib instrumentation --- .../instrumentation/urllib/instrumentation.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/drift/instrumentation/urllib/instrumentation.py b/drift/instrumentation/urllib/instrumentation.py index 66bbccf..1a9d63f 100644 --- a/drift/instrumentation/urllib/instrumentation.py +++ b/drift/instrumentation/urllib/instrumentation.py @@ -338,15 +338,18 @@ def patched_open(opener_self, fullurl, data=None, timeout=_GLOBAL_DEFAULT_TIMEOU if sdk.mode == TuskDriftMode.DISABLED: return original_open(opener_self, fullurl, data, timeout) - # Set calling_library_context to suppress socket instrumentation warnings - # context_token = calling_library_context.set("urllib") - try: - # Extract URL for default response handler - if isinstance(fullurl, str): - url = fullurl - else: - url = fullurl.full_url + # Extract URL early so we can check the scheme + if isinstance(fullurl, str): + url = fullurl + else: + url = fullurl.full_url + # Only instrument HTTP/HTTPS requests; pass through file://, data://, ftp://, etc. + parsed = urlparse(url) + if parsed.scheme not in ("http", "https"): + return original_open(opener_self, fullurl, data, timeout) + + try: def original_call(): return original_open(opener_self, fullurl, data, timeout) From dcbdd10a59caceaa7a6d474bd57f973eeca473e5 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Thu, 19 Mar 2026 15:00:52 -0700 Subject: [PATCH 04/17] Fix urllib3 instrumentation for botocore/boto3 --- .../urllib3/e2e-tests/.tusk/config.yaml | 1 - .../urllib3/e2e-tests/src/app.py | 58 ++++--- .../urllib3/e2e-tests/src/test_requests.py | 22 +-- .../urllib3/instrumentation.py | 143 +++++++++++++----- drift/instrumentation/urllib3/notes.md | 86 +++-------- 5 files changed, 180 insertions(+), 130 deletions(-) diff --git a/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml b/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml index 9f66127..29df570 100644 --- a/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml +++ b/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml @@ -25,4 +25,3 @@ recording: replay: enable_telemetry: false - diff --git a/drift/instrumentation/urllib3/e2e-tests/src/app.py b/drift/instrumentation/urllib3/e2e-tests/src/app.py index 041078e..765c043 100644 --- a/drift/instrumentation/urllib3/e2e-tests/src/app.py +++ b/drift/instrumentation/urllib3/e2e-tests/src/app.py @@ -1,6 +1,7 @@ """Flask test app for e2e tests - urllib3 instrumentation testing.""" import json +import zlib import urllib3 from flask import Flask, jsonify, request @@ -438,15 +439,14 @@ def test_requests_lib(): # ============================================================================= -@app.route("/test/bug/preload-content-false", methods=["GET"]) -def test_bug_preload_content_false(): - """CONFIRMED BUG: preload_content=False parameter breaks response reading. +@app.route("/test/preload-content-false-read", methods=["GET"]) +def test_preload_content_false_read(): + """Test preload_content=False with manual read(). - When preload_content=False, the response body is not preloaded into memory. - The instrumentation reads .data in _finalize_span which consumes the body - before the application can read it. - - Root cause: instrumentation.py line 839 accesses response.data unconditionally + This is the pattern botocore/boto3 uses: request with preload_content=False, + then call response.read() to get the body. The instrumentation must buffer + the body in _fp (BytesIO) during recording so both the span capture and the + caller's read() work correctly. """ try: response = http.request( @@ -454,7 +454,6 @@ def test_bug_preload_content_false(): "https://jsonplaceholder.typicode.com/posts/21", preload_content=False, ) - # Manually read the data after the response data_bytes = response.read() response.release_conn() data = json.loads(data_bytes.decode("utf-8")) @@ -463,14 +462,40 @@ def test_bug_preload_content_false(): return jsonify({"error": str(e)}), 500 -@app.route("/test/bug/streaming-response", methods=["GET"]) -def test_bug_streaming_response(): - """CONFIRMED BUG: Streaming response body is consumed before iteration. +@app.route("/test/preload-content-false-crc32", methods=["GET"]) +def test_preload_content_false_crc32(): + """Test preload_content=False with CRC32 checksum validation. + + Mimics botocore's DynamoDB flow: read the body via read(), then validate + the CRC32 checksum against a header value. This failed before the fix + because the mock response's BytesIO was exhausted by preload_content=True, + causing read() to return b"" and CRC32 to be 0. + """ + try: + response = http.request( + "GET", + "https://jsonplaceholder.typicode.com/posts/22", + preload_content=False, + ) + body = response.read() + response.release_conn() + + if not body: + return jsonify({"error": "Empty body from read()"}), 500 + + actual_crc32 = zlib.crc32(body) & 0xFFFFFFFF + data = json.loads(body.decode("utf-8")) + return jsonify({**data, "crc32": actual_crc32, "body_length": len(body)}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + - When using response.stream() to iterate over chunks, the instrumentation - has already consumed the body by accessing response.data in _finalize_span. +@app.route("/test/preload-content-false-stream", methods=["GET"]) +def test_preload_content_false_stream(): + """Test preload_content=False with chunked stream() reading. - Root cause: Same as preload-content-false - instrumentation.py line 839 + The instrumentation buffers the body into a BytesIO, so subsequent + stream() calls read from that BytesIO in chunks as normal. """ try: response = http.request( @@ -479,9 +504,8 @@ def test_bug_streaming_response(): preload_content=False, ) - # Try to read response in chunks using stream() chunks = [] - for chunk in response.stream(32): # Read 32 bytes at a time + for chunk in response.stream(32): chunks.append(chunk) response.release_conn() diff --git a/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py b/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py index dbb8f37..94f0e5d 100644 --- a/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py +++ b/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py @@ -96,17 +96,17 @@ make_request("GET", "/test/requests-lib") # ========================================================================== - # Note: Bug detection tests for preload_content=False and streaming responses - # are NOT included in e2e tests because these patterns are incompatible with - # replay mode - we can't capture the response body without consuming the stream. - # - # The instrumentation now correctly handles these patterns by NOT capturing - # the response body, which allows the application to read/stream normally. - # However, this means there's no body to replay in REPLAY mode. - # - # To verify the fix works, run the manual test script in RECORD mode: - # curl http://localhost:8000/test/bug/preload-content-false - # curl http://localhost:8000/test/bug/streaming-response + # preload_content=False Tests (botocore/boto3 pattern) # ========================================================================== + print("\n--- preload_content=False Tests ---\n") + + # read() after preload_content=False (core botocore pattern) + make_request("GET", "/test/preload-content-false-read") + + # read() + CRC32 checksum validation (DynamoDB pattern) + make_request("GET", "/test/preload-content-false-crc32") + + # stream() after preload_content=False + make_request("GET", "/test/preload-content-false-stream") print_request_summary() diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py index 927711e..8f78815 100644 --- a/drift/instrumentation/urllib3/instrumentation.py +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -1,6 +1,7 @@ from __future__ import annotations import base64 +import io import json import logging from typing import Any @@ -60,6 +61,21 @@ def __init__(self, message: str, method: str, url: str): } +def _normalize_headers(headers: dict) -> dict[str, str]: + """Ensure all header keys and values are str, not bytes. + + Some HTTP libraries (e.g. botocore/urllib3 combinations) may produce + headers with bytes keys or values. This normalizes them for JSON + serialization. + """ + result = {} + for k, v in headers.items(): + key = k.decode("utf-8", errors="replace") if isinstance(k, bytes) else str(k) + val = v.decode("utf-8", errors="replace") if isinstance(v, bytes) else str(v) + result[key] = val + return result + + class Urllib3Instrumentation(InstrumentationBase): """Instrumentation for the urllib3 HTTP client library. @@ -587,21 +603,29 @@ def _encode_body_to_base64(self, body_data: Any) -> tuple[str | None, int]: return base64_body, len(body_bytes) + # Spans exceeding 1 MB are blocked at export time (MAX_SPAN_SIZE_BYTES), + # so there's no value in reading more than that from the socket. + _MAX_CAPTURE_BYTES = 1 * 1024 * 1024 + def _get_response_body_safely(self, response: Any) -> bytes | None: - """Get response body without consuming the stream for non-preloaded responses. + """Get response body, buffering it in-place when preload_content=False. + + When preload_content=True (the default), urllib3 already read the body + into ``_body`` during construction — we just return it. - urllib3's .data property will read from the underlying stream if the response - was created with preload_content=False. This breaks applications that: - - Use preload_content=False and call response.read() manually - - Use response.stream() to iterate over chunks - - Use any streaming pattern to process large responses + When preload_content=False (used by botocore/boto3), the body is still + on the socket. We read the raw bytes from ``_fp``, swap ``_fp`` with a + BytesIO so the caller's subsequent ``read()`` pipeline (content + decoding, connection release, CRC32 validation, etc.) processes the + exact same bytes as if we were never here. - This method checks whether it's safe to access the body without consuming - the stream, and returns None if the body cannot be safely captured. + We must NOT call ``response.read()`` because urllib3's ``read()`` + consumes ``_fp`` and does **not** fall back to ``_body`` on subsequent + calls — only the ``.data`` property does. Botocore uses ``read()``, + so a second call would return ``b""`` and break checksum validation. - IMPORTANT: We must NOT use hasattr(response, "data") because in Python, - hasattr() calls getattr() internally, which would trigger the .data - property getter and consume the stream! + A size guard (``_MAX_CAPTURE_BYTES``) prevents us from pulling a + multi-GB streaming download into memory. Args: response: urllib3 HTTPResponse object @@ -609,37 +633,70 @@ def _get_response_body_safely(self, response: Any) -> bytes | None: Returns: Response body as bytes, or None if body cannot be safely captured """ - # Check if this is a urllib3 HTTPResponse by checking for _body attribute - # Note: We use getattr with a sentinel to avoid triggering any property getters _sentinel = object() body = getattr(response, "_body", _sentinel) if body is _sentinel: - # Not a urllib3 HTTPResponse or doesn't have _body return b"" - # Check if body was already preloaded/cached + # preload_content=True path — body was already read during construction if body is not None: - # Body was already read and cached, safe to return - # Ensure it's bytes (urllib3's _body should always be bytes when set) return body if isinstance(body, bytes) else b"" - # Check if the stream was already fully consumed (fp is None or closed) fp = getattr(response, "_fp", None) if fp is None: - # Stream was consumed, body should be in _body (but it might be None) return b"" - # Check if the file pointer is closed if hasattr(fp, "closed") and fp.closed: return b"" - # At this point, the stream is still open and body hasn't been read - # This means preload_content=False was used (otherwise the body would - # have been read during __init__) - # - # We skip capturing the body to avoid breaking the application - return None + # preload_content=False — stream is open, body hasn't been read yet. + # Check Content-Length to avoid buffering very large responses. + content_length = self._get_content_length(response) + if content_length is not None and content_length > self._MAX_CAPTURE_BYTES: + logger.debug( + "Skipping response body capture (Content-Length %d exceeds %d)", + content_length, + self._MAX_CAPTURE_BYTES, + ) + return None + + try: + # fp.read() consumes the original socket stream — it can't be read + # again. We replace _fp with a new BytesIO (position at byte 0) + # so the caller's subsequent read() gets the full body. + raw_data = fp.read() + if isinstance(raw_data, bytes) and len(raw_data) > self._MAX_CAPTURE_BYTES: + # Streamed response without Content-Length that turned out large. + # Too late to un-read, but we already have the data in memory — + # put it back for the caller but don't record it. + response._fp = io.BytesIO(raw_data) + return None + response._fp = io.BytesIO(raw_data) + return raw_data if isinstance(raw_data, bytes) else b"" + except Exception: + logger.debug( + "Failed to buffer response body for instrumentation, " + "response body will not be captured for replay", + exc_info=True, + ) + return None + + @staticmethod + def _get_content_length(response: Any) -> int | None: + """Extract Content-Length from response headers, if present.""" + headers = getattr(response, "headers", None) + if not headers: + return None + val = None + if hasattr(headers, "get"): + val = headers.get("content-length") or headers.get("Content-Length") + if val is None: + return None + try: + return int(val) + except (ValueError, TypeError): + return None def _get_decoded_type_from_content_type(self, content_type: str | None) -> DecodedType | None: """Determine decoded type from Content-Type header. Extracts @@ -655,6 +712,9 @@ def _get_decoded_type_from_content_type(self, content_type: str | None) -> Decod if not content_type: return None + if isinstance(content_type, bytes): + content_type = content_type.decode("utf-8", errors="replace") + main_type = content_type.lower().split(";")[0].strip() CONTENT_TYPE_MAP = { @@ -674,7 +734,10 @@ def _get_content_type_header(self, headers: dict | None) -> str | None: if not headers: return None for key, value in headers.items(): - if key.lower() == "content-type": + k = key.decode("utf-8", errors="replace") if isinstance(key, bytes) else key + if k.lower() == "content-type": + if isinstance(value, bytes): + return value.decode("utf-8", errors="replace") return value return None @@ -815,17 +878,24 @@ def _create_mock_response(self, urllib3_module: Any, mock_data: dict[str, Any], final_url = mock_data.get("finalUrl") or url + headers["Content-Length"] = str(len(content)) + + # preload_content must be False so the BytesIO stays unread in _fp. + # urllib3's read() always reads from _fp — it does NOT check _body. + # With preload_content=True the constructor would exhaust the BytesIO, + # and callers like botocore that use read() would get b"". + + # Callers that use .data instead (e.g. requests library) are also fine: + # .data calls read(cache_content=True) when _body is None, which reads + # from the fresh BytesIO and caches the result in _body. response = urllib3_module.HTTPResponse( body=BytesIO(content), headers=headers, status=status_code, - preload_content=True, + preload_content=False, request_url=final_url, ) - # Read the content to make it available via response.data - response.read() - logger.debug(f"Created mock urllib3 response: {status_code} for {url}") return response @@ -860,7 +930,7 @@ def _finalize_span( if parsed_url.query: params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()} - headers_dict = dict(headers) if headers else {} + headers_dict = _normalize_headers(dict(headers)) if headers else {} body_base64 = None body_size = 0 @@ -898,7 +968,7 @@ def _finalize_span( } status = SpanStatus(code=StatusCode.ERROR, message=str(error)) elif response: - response_headers = dict(response.headers) if hasattr(response, "headers") else {} + response_headers = _normalize_headers(dict(response.headers)) if hasattr(response, "headers") else {} response_body_size = 0 try: @@ -909,13 +979,8 @@ def _finalize_span( if response_bytes is not None: response_body_base64, response_body_size = self._encode_body_to_base64(response_bytes) else: - # Response body not captured (likely preload_content=False or streaming) response_body_base64 = None response_body_size = 0 - logger.warning( - f"Response body not captured for {method} {url} - request used " - f"preload_content=False or streaming. Replay may return an empty body." - ) except Exception: response_body_base64 = None response_body_size = 0 @@ -1023,5 +1088,5 @@ def _finalize_span( span.set_status(Status(OTelStatusCode.OK)) except Exception as e: - logger.error(f"Error finalizing span for {method} {url}: {e}") + logger.error(f"Error finalizing span for {method} {url}: {e}", exc_info=True) span.set_status(Status(OTelStatusCode.ERROR, str(e))) diff --git a/drift/instrumentation/urllib3/notes.md b/drift/instrumentation/urllib3/notes.md index 384dede..8f308d4 100644 --- a/drift/instrumentation/urllib3/notes.md +++ b/drift/instrumentation/urllib3/notes.md @@ -9,87 +9,49 @@ In urllib3, when making HTTP requests: - **`preload_content=True` (default)**: Response body is immediately read into memory and cached in `response._body` -- **`preload_content=False`**: Response body is NOT automatically read; the underlying stream remains open for the application to read manually via `response.read()` or `response.stream()` +- **`preload_content=False`**: Response body is NOT automatically read; the underlying stream (`_fp`) remains open for the caller to read manually via `response.read()` or `response.stream()` -This is commonly used for: +`preload_content=False` is commonly used by: +- **botocore/boto3** — always uses this to control the read lifecycle, validate CRC32 checksums, and handle retries - Large file downloads (avoid loading GB into memory) - Streaming responses (process chunks as they arrive) -- Memory-efficient processing -### Bug Found & Fixed +### How It Works (Recording) -**Problem:** The instrumentation broke applications using `preload_content=False` because accessing `response.data` in `_finalize_span()` consumed the response stream before the application could read it. +When recording a `preload_content=False` response, the instrumentation captures the body in `_get_response_body_safely()` by: -**Root Cause:** The original code used `hasattr(response, "data")` to check for the attribute. However, Python's `hasattr()` internally calls `getattr()`, which triggers property getters. urllib3's `.data` is a property that reads the entire body when accessed, consuming the stream. +1. Reading the raw bytes from `response._fp` (the underlying socket/`http.client.HTTPResponse`) +2. Replacing `response._fp` with an `io.BytesIO` containing those same bytes +3. Returning the bytes for inclusion in the span's `outputValue.body` -**Fix:** Check for the internal `_body` attribute directly using `getattr(response, "_body", sentinel)` instead of `hasattr(response, "data")`. +This is safe because `_finalize_span` runs in the `finally` block **before** the response is returned to the caller. The caller then reads from the BytesIO exactly as it would have read from the socket — urllib3's `read()` pipeline (content decoding, connection release, etc.) processes the same bytes. -**Logging**: Instead of silently skipping body capture, the SDK will emit a warning in application logs to inform users that replay may not work correctly for this request. Example: +**Important**: We read from `_fp` directly rather than calling `response.read()` because urllib3's `read()` consumes `_fp` and does **not** check `_body` on subsequent calls. Only the `.data` property checks `_body`. Since botocore uses `read()`, a second `read()` call would return `b""` and break CRC32 checksum validation. -```text -[TuskDrift] Response body not captured for GET https://example.com/api - request used preload_content=False or streaming. Replay may return an empty body. -``` +A size guard (`_MAX_CAPTURE_BYTES = 1 MB`) prevents buffering very large responses into memory. Spans exceeding 1 MB are blocked at export time anyway (`MAX_SPAN_SIZE_BYTES`), so there's no value in reading more than that from the socket. -### Trade-off: RECORD vs REPLAY +### How It Works (Replay) -For `preload_content=False` responses, we face an inherent trade-off: +When replaying, `_create_mock_response()` constructs a urllib3 `HTTPResponse` with `preload_content=False` and a `BytesIO` containing the recorded body. This ensures: -| Mode | Behavior | -|------|----------| -| **RECORD** | ✅ Works correctly - we skip capturing the body, so the application can read/stream normally | -| **REPLAY** | ⚠️ Response body will be empty - we didn't capture it, so there's nothing to return | +- Callers using `read()` (botocore) read from the fresh BytesIO and get the body +- Callers using `.data` (requests library) trigger `read(cache_content=True)`, which reads from the BytesIO and caches in `_body` +- Callers using `stream()` read from the BytesIO in chunks -This is intentional. We prioritize not breaking the application during RECORD over capturing data for REPLAY. - -### Mock Matching Impact - -Mock matching is not affected because the CLI's matching algorithm is entirely input-based. It matches on request URL, method, headers, body (InputValue), and uses InputValueHash, InputSchemaHash, and similarity scoring on InputValue. OutputValue (response) is not used for matching. - -Mock **response** is affected: - -- The matched span's `OutputValue.body` will be empty/missing -- Status code and headers are still captured correctly -- Application may fail if it expects actual response data +`preload_content` must be `False` for mock responses — with `True`, the constructor exhausts the BytesIO during initialization, and subsequent `read()` calls return `b""`. ### Code Location -The safe body retrieval logic is in `_get_response_body_safely()` method in [`instrumentation.py`](./instrumentation.py): - -```python -def _get_response_body_safely(self, response: Any) -> bytes | None: - # Use getattr with a sentinel to avoid triggering property getters - _sentinel = object() - body = getattr(response, "_body", _sentinel) - - if body is _sentinel: - return b"" # Not a urllib3 HTTPResponse - - if body is not None: - return body # Body already cached, safe to return - - # Body hasn't been read yet (preload_content=False) - # Skip to avoid consuming the stream - return None -``` +The safe body retrieval logic is in `_get_response_body_safely()` in [`instrumentation.py`](./instrumentation.py). +Mock response construction is in `_create_mock_response()` in the same file. ### Test Coverage -The e2e tests include endpoints for `preload_content=False` and streaming responses (`/test/bug/preload-content-false`, `/test/bug/streaming-response`), but these are excluded from the automated REPLAY test suite since they're incompatible with replay. They can be tested manually to verify RECORD mode doesn't break the application. - -### Future Considerations - -**Potential improvements to support replay for streaming responses:** - -1. **Response wrapper approach**: Create a wrapper around the urllib3 response that intercepts `read()` and `stream()` calls, capturing bytes as the application reads them. After the application finishes reading, store the accumulated body in the span. This would require: - - Implementing a custom response wrapper class - - Deferring span finalization until the response is fully consumed - - Handling edge cases like partial reads, connection errors mid-stream - -2. **Post-read body capture**: After the application calls `response.read()`, the body becomes cached in `response._body`. We could potentially capture it at that point. However, this requires knowing *when* the application has finished reading, which is non-trivial. +The e2e tests include endpoints for the `preload_content=False` pattern: -This may not be worth implementing at the moment because: +- `/test/preload-content-false-read` — manual `read()` (botocore pattern) +- `/test/preload-content-false-crc32` — `read()` + CRC32 checksum validation (DynamoDB pattern) +- `/test/preload-content-false-stream` — chunked `stream()` reading -- Applications using `preload_content=False` are typically downloading large files or streaming data - scenarios that aren't good candidates for unit test mocking anyway -- The complexity of implementing response wrappers could introduce subtle bugs -- Most API calls use the default `preload_content=True` and work correctly +These are included in the automated REPLAY test suite. From 2b63fa743911772399fa1e2575ce01fb20acabdc Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Thu, 19 Mar 2026 15:07:27 -0700 Subject: [PATCH 05/17] address code review comment --- drift/instrumentation/django/instrumentation.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/drift/instrumentation/django/instrumentation.py b/drift/instrumentation/django/instrumentation.py index 911f7aa..d234f85 100644 --- a/drift/instrumentation/django/instrumentation.py +++ b/drift/instrumentation/django/instrumentation.py @@ -123,13 +123,12 @@ def _defer_middleware_injection(self) -> None: original_setup = django.setup def patched_setup(*args, **kwargs): - # Restore original setup first to avoid re-entrance - django.setup = original_setup - # Run the original django.setup() - result = original_setup(*args, **kwargs) - # Now settings are configured — inject middleware - self._try_inject_middleware() - return result + try: + result = original_setup(*args, **kwargs) + self._try_inject_middleware() + return result + finally: + django.setup = original_setup django.setup = patched_setup logger.debug("Deferred middleware injection to django.setup()") From 1ef5d7bdade5094ca4ee11cb38357f54e7c6390b Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Thu, 19 Mar 2026 15:09:33 -0700 Subject: [PATCH 06/17] fix lint --- drift/instrumentation/urllib/instrumentation.py | 1 + drift/instrumentation/urllib3/instrumentation.py | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/drift/instrumentation/urllib/instrumentation.py b/drift/instrumentation/urllib/instrumentation.py index 1a9d63f..a0ea610 100644 --- a/drift/instrumentation/urllib/instrumentation.py +++ b/drift/instrumentation/urllib/instrumentation.py @@ -350,6 +350,7 @@ def patched_open(opener_self, fullurl, data=None, timeout=_GLOBAL_DEFAULT_TIMEOU return original_open(opener_self, fullurl, data, timeout) try: + def original_call(): return original_open(opener_self, fullurl, data, timeout) diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py index 8f78815..fef1ece 100644 --- a/drift/instrumentation/urllib3/instrumentation.py +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -676,8 +676,7 @@ def _get_response_body_safely(self, response: Any) -> bytes | None: return raw_data if isinstance(raw_data, bytes) else b"" except Exception: logger.debug( - "Failed to buffer response body for instrumentation, " - "response body will not be captured for replay", + "Failed to buffer response body for instrumentation, response body will not be captured for replay", exc_info=True, ) return None From 79606430538960e88af2dc0c1e97fe243f2e485d Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Thu, 19 Mar 2026 16:13:24 -0700 Subject: [PATCH 07/17] normalize all headers --- drift/instrumentation/urllib3/instrumentation.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py index fef1ece..d0cd838 100644 --- a/drift/instrumentation/urllib3/instrumentation.py +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -479,6 +479,7 @@ def _handle_record_urlopen( headers = kw.get("headers") or {} if isinstance(headers, (list, tuple)): headers = dict(headers) + headers = _normalize_headers(headers) if headers else {} if self._transform_engine and self._transform_engine.should_drop_outbound_request( method.upper(), url, headers ): @@ -540,7 +541,7 @@ def _handle_record_connection_pool_urlopen( try: with SpanUtils.with_span(span_info): # Check drop transforms before making the request - headers_dict = dict(headers) if headers else {} + headers_dict = _normalize_headers(dict(headers)) if headers else {} if self._transform_engine and self._transform_engine.should_drop_outbound_request( method.upper(), full_url, headers_dict ): @@ -773,7 +774,7 @@ def _try_get_mock( encoded_fields = urlencode(fields) body_base64, body_size = self._encode_body_to_base64(encoded_fields) - headers_dict = dict(headers) if headers else {} + headers_dict = _normalize_headers(dict(headers)) if headers else {} raw_input_value = { "method": method.upper(), From dae0f852406bc48ad682e280aa275e516f1f92b8 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Thu, 19 Mar 2026 16:59:13 -0700 Subject: [PATCH 08/17] decompress gzip response bodies during replay for preload_content=False recording + filter non tcp logs --- .../e2e_common/mock_upstream/mock_server.py | 18 ++++++ .../instrumentation/socket/instrumentation.py | 21 ++++++- .../urllib3/e2e-tests/src/app.py | 30 ++++++++++ .../urllib3/e2e-tests/src/test_requests.py | 3 + .../urllib3/instrumentation.py | 60 ++++++++++++++++--- 5 files changed, 120 insertions(+), 12 deletions(-) diff --git a/drift/instrumentation/e2e_common/mock_upstream/mock_server.py b/drift/instrumentation/e2e_common/mock_upstream/mock_server.py index be8dcbf..b7e4cc3 100644 --- a/drift/instrumentation/e2e_common/mock_upstream/mock_server.py +++ b/drift/instrumentation/e2e_common/mock_upstream/mock_server.py @@ -3,6 +3,7 @@ from __future__ import annotations +import gzip import json import os from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer @@ -19,6 +20,17 @@ def _json(handler: BaseHTTPRequestHandler, payload: Any, status: int = 200): handler.wfile.write(body) +def _json_gzip(handler: BaseHTTPRequestHandler, payload: Any, status: int = 200): + """Serve JSON compressed with gzip, setting Content-Encoding: gzip.""" + body = gzip.compress(json.dumps(payload).encode("utf-8")) + handler.send_response(status) + handler.send_header("Content-Type", "application/json") + handler.send_header("Content-Encoding", "gzip") + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + def _text(handler: BaseHTTPRequestHandler, payload: str, status: int = 200): body = payload.encode("utf-8") handler.send_response(status) @@ -147,6 +159,12 @@ def do_GET(self): }, ) + if path == "/gzip": + return _json_gzip( + self, + {"gzipped": True, "method": "GET", "origin": "mock"}, + ) + if path in {"/json", "/json/"}: return _json( self, diff --git a/drift/instrumentation/socket/instrumentation.py b/drift/instrumentation/socket/instrumentation.py index 3cbd3b1..a021576 100644 --- a/drift/instrumentation/socket/instrumentation.py +++ b/drift/instrumentation/socket/instrumentation.py @@ -87,6 +87,10 @@ def patch(self, module: ModuleType) -> None: logger.warning("[SocketInstrumentation] socket.socket class not found") return + import socket as _socket_module + + SOCK_STREAM = _socket_module.SOCK_STREAM + # Store original methods exactly like the working test pattern original_connect = socket_class.connect original_send = socket_class.send @@ -94,12 +98,23 @@ def patch(self, module: ModuleType) -> None: instrumentation = self + def _is_tcp(sock: Any) -> bool: + """Only TCP (SOCK_STREAM) sockets carry HTTP traffic. + + UDP sockets (StatsD, DNS, etc.) are fire-and-forget side-effects + that don't need record/replay instrumentation. + """ + try: + return sock.type & SOCK_STREAM != 0 + except Exception: + return False + # Patch connect - always track and detect def patched_connect(self: Any, *args: Any, **kwargs: Any) -> Any: """Patched socket.connect method.""" - # Track this socket as an outbound socket - instrumentation._outbound_sockets.add(self) - instrumentation._handle_socket_call("connect", self) + if _is_tcp(self): + instrumentation._outbound_sockets.add(self) + instrumentation._handle_socket_call("connect", self) return original_connect(self, *args, **kwargs) # Patch send - only detect if socket is tracked (outbound) diff --git a/drift/instrumentation/urllib3/e2e-tests/src/app.py b/drift/instrumentation/urllib3/e2e-tests/src/app.py index 765c043..0edcbcd 100644 --- a/drift/instrumentation/urllib3/e2e-tests/src/app.py +++ b/drift/instrumentation/urllib3/e2e-tests/src/app.py @@ -516,6 +516,36 @@ def test_preload_content_false_stream(): return jsonify({"error": str(e)}), 500 +@app.route("/test/preload-content-false-gzip", methods=["GET"]) +def test_preload_content_false_gzip(): + """Test preload_content=False with gzip-compressed response. + + Requests a gzip-encoded response and reads it via read(). During + recording, _get_response_body_safely captures the raw (compressed) bytes + from the socket. During replay, _create_mock_response must decompress + them before serving so the caller gets plain JSON — otherwise the mock + would return compressed bytes with no Content-Encoding header and the + caller would get garbled data. + """ + try: + response = http.request( + "GET", + "https://httpbin.org/gzip", + preload_content=False, + headers={"Accept-Encoding": "gzip"}, + ) + data_bytes = response.read() + response.release_conn() + + if not data_bytes: + return jsonify({"error": "Empty body from read()"}), 500 + + data = json.loads(data_bytes.decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + if __name__ == "__main__": sdk.mark_app_as_ready() app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py b/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py index 94f0e5d..1ab1b14 100644 --- a/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py +++ b/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py @@ -109,4 +109,7 @@ # stream() after preload_content=False make_request("GET", "/test/preload-content-false-stream") + # read() after preload_content=False with gzip-compressed response + make_request("GET", "/test/preload-content-false-gzip") + print_request_summary() diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py index d0cd838..cc97383 100644 --- a/drift/instrumentation/urllib3/instrumentation.py +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -834,6 +834,33 @@ def _try_get_mock( logger.error(f"Error getting mock for {method} {url}: {e}") return None + @staticmethod + def _decompress(data: bytes, encoding: str) -> bytes: + """Decompress response body bytes. Returns original data on failure.""" + import gzip + import zlib + + if encoding == "gzip" or encoding == "x-gzip": + return gzip.decompress(data) + if encoding == "deflate": + try: + return zlib.decompress(data) + except zlib.error: + return zlib.decompress(data, -zlib.MAX_WBITS) + if encoding == "br": + try: + import brotli + return brotli.decompress(data) + except ImportError: + pass + if encoding == "zstd": + try: + import zstandard + return zstandard.ZstdDecompressor().decompress(data) + except ImportError: + pass + return data + def _create_mock_response(self, urllib3_module: Any, mock_data: dict[str, Any], url: str) -> Any: """Create a mocked urllib3.HTTPResponse object. @@ -850,15 +877,6 @@ def _create_mock_response(self, urllib3_module: Any, mock_data: dict[str, Any], status_code = mock_data.get("statusCode", 200) headers = dict(mock_data.get("headers", {})) - # Remove content-encoding and transfer-encoding headers since the body - # was already decompressed when recorded - headers_to_remove = [] - for key in headers: - if key.lower() in ("content-encoding", "transfer-encoding"): - headers_to_remove.append(key) - for key in headers_to_remove: - del headers[key] - # Get body - decode from base64 if needed body = mock_data.get("body", "") content = b"" @@ -876,6 +894,30 @@ def _create_mock_response(self, urllib3_module: Any, mock_data: dict[str, Any], else: content = json.dumps(body).encode("utf-8") + # The recorded body may be compressed (preload_content=False records + # raw bytes from the socket) or already decompressed (preload_content=True + # records _body which urllib3 already decoded). We try to decompress + # so the mock always serves plain content. Then we strip the encoding + # headers so urllib3's decoder is a no-op. + content_encoding = None + for key in headers: + if key.lower() == "content-encoding": + content_encoding = headers[key].lower() + break + + if content_encoding and content_encoding != "identity": + try: + content = self._decompress(content, content_encoding) + except Exception: + pass + + headers_to_remove = [] + for key in headers: + if key.lower() in ("content-encoding", "transfer-encoding"): + headers_to_remove.append(key) + for key in headers_to_remove: + del headers[key] + final_url = mock_data.get("finalUrl") or url headers["Content-Length"] = str(len(content)) From b702a78b78d512c2e4931ba8a8497e3290b94091 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Thu, 19 Mar 2026 17:23:57 -0700 Subject: [PATCH 09/17] decompress during recording --- .../urllib3/instrumentation.py | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py index cc97383..ef790c8 100644 --- a/drift/instrumentation/urllib3/instrumentation.py +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -894,23 +894,8 @@ def _create_mock_response(self, urllib3_module: Any, mock_data: dict[str, Any], else: content = json.dumps(body).encode("utf-8") - # The recorded body may be compressed (preload_content=False records - # raw bytes from the socket) or already decompressed (preload_content=True - # records _body which urllib3 already decoded). We try to decompress - # so the mock always serves plain content. Then we strip the encoding - # headers so urllib3's decoder is a no-op. - content_encoding = None - for key in headers: - if key.lower() == "content-encoding": - content_encoding = headers[key].lower() - break - - if content_encoding and content_encoding != "identity": - try: - content = self._decompress(content, content_encoding) - except Exception: - pass - + # Strip encoding headers — the body was already decompressed at + # recording time (in _finalize_span), so the decoder should be a no-op. headers_to_remove = [] for key in headers: if key.lower() in ("content-encoding", "transfer-encoding"): @@ -1014,11 +999,19 @@ def _finalize_span( response_body_size = 0 try: - # Get response content safely without consuming the stream - # urllib3's .data property will read from the stream if not preloaded, - # which would break applications using preload_content=False or response.stream() response_bytes = self._get_response_body_safely(response) if response_bytes is not None: + # For preload_content=False, response_bytes are raw from + # the socket and may still be gzip/deflate compressed. + # Decompress so the span always stores plain content + # (matches preload_content=True where _body is already + # decompressed by urllib3). + resp_encoding = response_headers.get("Content-Encoding", "").lower() + if resp_encoding and resp_encoding != "identity": + try: + response_bytes = self._decompress(response_bytes, resp_encoding) + except Exception: + pass response_body_base64, response_body_size = self._encode_body_to_base64(response_bytes) else: response_body_base64 = None From 1801362d58e16212c39a673cf1e7e8333286dc03 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Thu, 19 Mar 2026 17:25:34 -0700 Subject: [PATCH 10/17] fix is tcp check --- drift/instrumentation/socket/instrumentation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drift/instrumentation/socket/instrumentation.py b/drift/instrumentation/socket/instrumentation.py index a021576..7f64891 100644 --- a/drift/instrumentation/socket/instrumentation.py +++ b/drift/instrumentation/socket/instrumentation.py @@ -105,7 +105,7 @@ def _is_tcp(sock: Any) -> bool: that don't need record/replay instrumentation. """ try: - return sock.type & SOCK_STREAM != 0 + return (sock.type & SOCK_STREAM) != 0 except Exception: return False From 3a3c13ca24f48e692b99e6795df06f4829e7954a Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Thu, 19 Mar 2026 17:42:55 -0700 Subject: [PATCH 11/17] address lowercase comment --- drift/instrumentation/urllib3/instrumentation.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py index ef790c8..c4407db 100644 --- a/drift/instrumentation/urllib3/instrumentation.py +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -850,12 +850,14 @@ def _decompress(data: bytes, encoding: str) -> bytes: if encoding == "br": try: import brotli + return brotli.decompress(data) except ImportError: pass if encoding == "zstd": try: import zstandard + return zstandard.ZstdDecompressor().decompress(data) except ImportError: pass @@ -1006,7 +1008,11 @@ def _finalize_span( # Decompress so the span always stores plain content # (matches preload_content=True where _body is already # decompressed by urllib3). - resp_encoding = response_headers.get("Content-Encoding", "").lower() + resp_encoding = "" + for hdr_key, hdr_val in response_headers.items(): + if hdr_key.lower() == "content-encoding": + resp_encoding = hdr_val.lower() + break if resp_encoding and resp_encoding != "identity": try: response_bytes = self._decompress(response_bytes, resp_encoding) From d29bcf14de1c7d0de12c6459b77892008007477d Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 20 Mar 2026 10:43:30 -0700 Subject: [PATCH 12/17] fix: preserve raw compressed bytes in urllib3 replay for preload_content=False responses --- E2E_TESTING_GUIDE.md | 8 +-- .../urllib3/e2e-tests/.tusk/config.yaml | 2 + .../urllib3/e2e-tests/src/app.py | 42 ++++++++++++++ .../urllib3/e2e-tests/src/test_requests.py | 8 +++ .../urllib3/instrumentation.py | 57 +++++++++++-------- 5 files changed, 90 insertions(+), 27 deletions(-) diff --git a/E2E_TESTING_GUIDE.md b/E2E_TESTING_GUIDE.md index c1a413b..e7ecd5c 100644 --- a/E2E_TESTING_GUIDE.md +++ b/E2E_TESTING_GUIDE.md @@ -143,7 +143,7 @@ pkill -f "python src/app.py" Run the Tusk CLI to replay the recorded traces: ```bash -TUSK_ANALYTICS_DISABLED=1 tusk run --print --output-format "json" --enable-service-logs +TUSK_ANALYTICS_DISABLED=1 tusk drift run --print --output-format "json" --enable-service-logs ``` **Flags explained:** @@ -155,7 +155,7 @@ TUSK_ANALYTICS_DISABLED=1 tusk run --print --output-format "json" --enable-servi To see all available flags, run: ```bash -tusk run --help +tusk drift run --help ``` **Interpreting Results:** @@ -235,7 +235,7 @@ The actual test orchestration happens inside the container via `entrypoint.py`, 2. Starts app in RECORD mode 3. Executes test requests 4. Stops app, verifies traces -5. Runs `tusk run` CLI +5. Runs `tusk drift run` CLI 6. Checks for socket instrumentation warnings 7. Returns exit code @@ -334,7 +334,7 @@ TUSK_DRIFT_MODE=RECORD python src/app.py python src/test_requests.py # Inside container: Run Tusk CLI tests -TUSK_ANALYTICS_DISABLED=1 tusk run --print --output-format "json" --enable-service-logs +TUSK_ANALYTICS_DISABLED=1 tusk drift run --print --output-format "json" --enable-service-logs # View traces cat .tusk/traces/*.jsonl | python -m json.tool diff --git a/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml b/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml index 29df570..d314d88 100644 --- a/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml +++ b/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml @@ -25,3 +25,5 @@ recording: replay: enable_telemetry: false + sandbox: + mode: off diff --git a/drift/instrumentation/urllib3/e2e-tests/src/app.py b/drift/instrumentation/urllib3/e2e-tests/src/app.py index 0edcbcd..71eb258 100644 --- a/drift/instrumentation/urllib3/e2e-tests/src/app.py +++ b/drift/instrumentation/urllib3/e2e-tests/src/app.py @@ -546,6 +546,48 @@ def test_preload_content_false_gzip(): return jsonify({"error": str(e)}), 500 +@app.route("/test/preload-false-decode-content-false-gzip", methods=["GET"]) +def test_preload_false_decode_content_false_gzip(): + """Test preload_content=False + decode_content=False with gzip response. + """ + import gzip as gzip_mod + + try: + response = http.request( + "GET", + "https://httpbin.org/gzip", + preload_content=False, + decode_content=False, + headers={"Accept-Encoding": "gzip"}, + ) + raw_bytes = response.read(decode_content=False) + response.release_conn() + + if not raw_bytes: + return jsonify({"error": "Empty body from read()"}), 500 + + # The caller expects compressed bytes and decompresses manually + try: + decompressed = gzip_mod.decompress(raw_bytes) + data = json.loads(decompressed.decode("utf-8")) + return jsonify({ + "raw_bytes_length": len(raw_bytes), + "decompressed_length": len(decompressed), + "was_compressed": len(raw_bytes) != len(decompressed), + "data": data, + }) + except gzip_mod.BadGzipFile: + # If we get here during replay, it means the bytes were already + # decompressed - this is the bug + return jsonify({ + "error": "BadGzipFile - bytes were not compressed as expected", + "raw_bytes_length": len(raw_bytes), + "raw_bytes_preview": raw_bytes[:100].decode("utf-8", errors="replace"), + }), 500 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + if __name__ == "__main__": sdk.mark_app_as_ready() app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py b/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py index 1ab1b14..e499d1c 100644 --- a/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py +++ b/drift/instrumentation/urllib3/e2e-tests/src/test_requests.py @@ -112,4 +112,12 @@ # read() after preload_content=False with gzip-compressed response make_request("GET", "/test/preload-content-false-gzip") + # ========================================================================== + # preload_content=False Edge Case Tests (bug detection) + # ========================================================================== + print("\n--- preload_content=False Edge Case Tests ---\n") + + # decode_content=False + gzip (caller wants raw compressed bytes) + make_request("GET", "/test/preload-false-decode-content-false-gzip") + print_request_summary() diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py index c4407db..d5b43b7 100644 --- a/drift/instrumentation/urllib3/instrumentation.py +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -896,14 +896,19 @@ def _create_mock_response(self, urllib3_module: Any, mock_data: dict[str, Any], else: content = json.dumps(body).encode("utf-8") - # Strip encoding headers — the body was already decompressed at - # recording time (in _finalize_span), so the decoder should be a no-op. - headers_to_remove = [] - for key in headers: - if key.lower() in ("content-encoding", "transfer-encoding"): - headers_to_remove.append(key) - for key in headers_to_remove: - del headers[key] + # When bodyIsRawEncoded is true, the stored bytes are still compressed + # (preload_content=False during recording) — keep encoding headers so + # urllib3's read pipeline can decompress on demand. Otherwise the body + # was already decompressed at recording time (preload_content=True), so + # strip encoding headers to avoid double-decompression. + body_is_raw_encoded = mock_data.get("bodyIsRawEncoded", False) + if not body_is_raw_encoded: + headers_to_remove = [] + for key in headers: + if key.lower() in ("content-encoding", "transfer-encoding"): + headers_to_remove.append(key) + for key in headers_to_remove: + del headers[key] final_url = mock_data.get("finalUrl") or url @@ -1003,21 +1008,25 @@ def _finalize_span( try: response_bytes = self._get_response_body_safely(response) if response_bytes is not None: - # For preload_content=False, response_bytes are raw from - # the socket and may still be gzip/deflate compressed. - # Decompress so the span always stores plain content - # (matches preload_content=True where _body is already - # decompressed by urllib3). - resp_encoding = "" - for hdr_key, hdr_val in response_headers.items(): - if hdr_key.lower() == "content-encoding": - resp_encoding = hdr_val.lower() - break - if resp_encoding and resp_encoding != "identity": - try: - response_bytes = self._decompress(response_bytes, resp_encoding) - except Exception: - pass + # Determine if the stored bytes are still raw-encoded + # (compressed). When preload_content=True, urllib3 + # already decompressed into _body so the bytes are + # plain. When preload_content=False, _body is None + # and the bytes came straight from _fp — still + # compressed. We store a flag so replay can decide + # whether to preserve Content-Encoding headers. + body_is_raw_encoded = False + _sentinel = object() + _body_val = getattr(response, "_body", _sentinel) + if _body_val is None: + # preload_content=False — bytes from _fp, possibly compressed + resp_encoding = "" + for hdr_key, hdr_val in response_headers.items(): + if hdr_key.lower() == "content-encoding": + resp_encoding = hdr_val.lower() + break + if resp_encoding and resp_encoding != "identity": + body_is_raw_encoded = True response_body_base64, response_body_size = self._encode_body_to_base64(response_bytes) else: response_body_base64 = None @@ -1039,6 +1048,8 @@ def _finalize_span( if response_body_base64 is not None: output_value["body"] = response_body_base64 output_value["bodySize"] = response_body_size + if body_is_raw_encoded: + output_value["bodyIsRawEncoded"] = True if status_code >= 400: status = SpanStatus( From f6807b537b3e7e92795e34771389109324476436 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 20 Mar 2026 11:22:24 -0700 Subject: [PATCH 13/17] remove sandbox off --- drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml b/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml index d314d88..29df570 100644 --- a/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml +++ b/drift/instrumentation/urllib3/e2e-tests/.tusk/config.yaml @@ -25,5 +25,3 @@ recording: replay: enable_telemetry: false - sandbox: - mode: off From 0414130849f076bf0f5e99da34cf5c689f17c50e Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 20 Mar 2026 11:39:53 -0700 Subject: [PATCH 14/17] fix lint --- drift/core/drift_sdk.py | 12 ++++---- .../instrumentation/django/instrumentation.py | 2 +- .../urllib3/e2e-tests/src/app.py | 29 ++++++++++--------- pyproject.toml | 2 ++ 4 files changed, 25 insertions(+), 20 deletions(-) diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index cf75f54..67927f4 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -443,7 +443,7 @@ def _init_auto_instrumentations(self) -> None: pass try: - import httpx # type: ignore[unresolved-import] + import httpx from ..instrumentation.httpx import HttpxInstrumentation @@ -473,7 +473,7 @@ def _init_auto_instrumentations(self) -> None: pass try: - import sqlalchemy # type: ignore[unresolved-import] + import sqlalchemy from ..instrumentation.sqlalchemy import SqlAlchemyInstrumentation @@ -490,7 +490,7 @@ def _init_auto_instrumentations(self) -> None: # Try psycopg2 first try: - import psycopg2 # type: ignore[unresolved-import] + import psycopg2 from ..instrumentation.psycopg2 import Psycopg2Instrumentation @@ -502,7 +502,7 @@ def _init_auto_instrumentations(self) -> None: # Try psycopg (v3) try: - import psycopg # type: ignore[unresolved-import] + import psycopg from ..instrumentation.psycopg import PsycopgInstrumentation @@ -518,7 +518,7 @@ def _init_auto_instrumentations(self) -> None: logger.debug("Both psycopg2 and psycopg available - instrumented both") try: - import redis # type: ignore[unresolved-import] + import redis from ..instrumentation.redis import RedisInstrumentation @@ -528,7 +528,7 @@ def _init_auto_instrumentations(self) -> None: pass try: - import grpc # type: ignore[unresolved-import] + import grpc from ..instrumentation.grpc import GrpcInstrumentation diff --git a/drift/instrumentation/django/instrumentation.py b/drift/instrumentation/django/instrumentation.py index d234f85..5ef0c18 100644 --- a/drift/instrumentation/django/instrumentation.py +++ b/drift/instrumentation/django/instrumentation.py @@ -130,7 +130,7 @@ def patched_setup(*args, **kwargs): finally: django.setup = original_setup - django.setup = patched_setup + django.setup = patched_setup # ty: ignore[invalid-assignment] logger.debug("Deferred middleware injection to django.setup()") def _force_database_reconnect(self) -> None: diff --git a/drift/instrumentation/urllib3/e2e-tests/src/app.py b/drift/instrumentation/urllib3/e2e-tests/src/app.py index 71eb258..9a8fb82 100644 --- a/drift/instrumentation/urllib3/e2e-tests/src/app.py +++ b/drift/instrumentation/urllib3/e2e-tests/src/app.py @@ -548,8 +548,7 @@ def test_preload_content_false_gzip(): @app.route("/test/preload-false-decode-content-false-gzip", methods=["GET"]) def test_preload_false_decode_content_false_gzip(): - """Test preload_content=False + decode_content=False with gzip response. - """ + """Test preload_content=False + decode_content=False with gzip response.""" import gzip as gzip_mod try: @@ -570,20 +569,24 @@ def test_preload_false_decode_content_false_gzip(): try: decompressed = gzip_mod.decompress(raw_bytes) data = json.loads(decompressed.decode("utf-8")) - return jsonify({ - "raw_bytes_length": len(raw_bytes), - "decompressed_length": len(decompressed), - "was_compressed": len(raw_bytes) != len(decompressed), - "data": data, - }) + return jsonify( + { + "raw_bytes_length": len(raw_bytes), + "decompressed_length": len(decompressed), + "was_compressed": len(raw_bytes) != len(decompressed), + "data": data, + } + ) except gzip_mod.BadGzipFile: # If we get here during replay, it means the bytes were already # decompressed - this is the bug - return jsonify({ - "error": "BadGzipFile - bytes were not compressed as expected", - "raw_bytes_length": len(raw_bytes), - "raw_bytes_preview": raw_bytes[:100].decode("utf-8", errors="replace"), - }), 500 + return jsonify( + { + "error": "BadGzipFile - bytes were not compressed as expected", + "raw_bytes_length": len(raw_bytes), + "raw_bytes_preview": raw_bytes[:100].decode("utf-8", errors="replace"), + } + ), 500 except Exception as e: return jsonify({"error": str(e)}), 500 diff --git a/pyproject.toml b/pyproject.toml index 26b1051..974f534 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -133,6 +133,7 @@ exclude = ["**/e2e-tests/**", "**/stack-tests/**"] [[tool.ty.overrides]] # Disable unresolved-import errors for instrumentation files with optional dependencies include = [ + "drift/core/**", "drift/instrumentation/django/**", "drift/instrumentation/grpc/**", "drift/instrumentation/psycopg/**", @@ -141,6 +142,7 @@ include = [ "drift/instrumentation/pyjwt/**", "drift/instrumentation/redis/**", "drift/instrumentation/kinde/**", + "drift/instrumentation/urllib3/**", "drift/instrumentation/http/transform_engine.py", ] From 9d57cdc0f06ceead509f1164a67050298c8c4784 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 20 Mar 2026 15:03:33 -0700 Subject: [PATCH 15/17] record date, time for psycopg --- .../psycopg/e2e-tests/src/app.py | 38 ++++++++--- .../psycopg/instrumentation.py | 4 +- .../psycopg2/e2e-tests/src/app.py | 64 +++++++++++++++++++ .../psycopg2/e2e-tests/src/test_requests.py | 3 + .../psycopg2/instrumentation.py | 3 +- drift/instrumentation/utils/psycopg_utils.py | 60 +++++++++++++++++ drift/instrumentation/utils/serialization.py | 6 +- 7 files changed, 167 insertions(+), 11 deletions(-) diff --git a/drift/instrumentation/psycopg/e2e-tests/src/app.py b/drift/instrumentation/psycopg/e2e-tests/src/app.py index 8daa179..e1914f8 100644 --- a/drift/instrumentation/psycopg/e2e-tests/src/app.py +++ b/drift/instrumentation/psycopg/e2e-tests/src/app.py @@ -957,12 +957,16 @@ def test_decimal_types(): @app.route("/test/date-time-types") def test_date_time_types(): - """Test date/time types.""" + """Test date/time types are preserved as proper Python objects during replay. + + Verifies that DATE columns come back as datetime.date, TIME columns as + datetime.time, and INTERVAL columns as datetime.timedelta — not plain strings. + Also exercises datetime.combine() which fails if date/time are strings. + """ try: - from datetime import date, time, timedelta + from datetime import date, datetime, time, timedelta with psycopg.connect(get_conn_string()) as conn, conn.cursor() as cur: - # Create temp table with date/time columns cur.execute(""" CREATE TEMP TABLE datetime_test ( id INT, @@ -972,23 +976,41 @@ def test_date_time_types(): ) """) - # Insert date/time data cur.execute( "INSERT INTO datetime_test VALUES (%s, %s, %s, %s)", (1, date(1990, 5, 15), time(8, 30, 0), timedelta(hours=2, minutes=30)), ) - # Query back cur.execute("SELECT * FROM datetime_test WHERE id = 1") row = cur.fetchone() conn.commit() + birth_date = row[1] + wake_time = row[2] + duration = row[3] + + type_checks = { + "birth_date_is_date": isinstance(birth_date, date) and not isinstance(birth_date, datetime), + "wake_time_is_time": isinstance(wake_time, time), + "duration_is_timedelta": isinstance(duration, timedelta), + } + + # Exercise datetime.combine() — this is the exact operation that fails + # when date/time values are returned as strings during replay + combined = datetime.combine(birth_date, wake_time) + type_checks["combine_works"] = isinstance(combined, datetime) + type_checks["combine_value"] = combined.isoformat() + + all_types_correct = all(v for k, v in type_checks.items() if k != "combine_value") + return jsonify( { "id": row[0], - "birth_date": str(row[1]) if row[1] else None, - "wake_time": str(row[2]) if row[2] else None, - "duration": str(row[3]) if row[3] else None, + "birth_date": str(birth_date), + "wake_time": str(wake_time), + "duration": str(duration), + "type_checks": type_checks, + "all_types_correct": all_types_correct, } ) except Exception as e: diff --git a/drift/instrumentation/psycopg/instrumentation.py b/drift/instrumentation/psycopg/instrumentation.py index 0d356ad..b79fa41 100644 --- a/drift/instrumentation/psycopg/instrumentation.py +++ b/drift/instrumentation/psycopg/instrumentation.py @@ -25,7 +25,7 @@ ) from ..base import InstrumentationBase from ..sqlalchemy.context import sqlalchemy_execution_active_context, sqlalchemy_replay_mock_context -from ..utils.psycopg_utils import deserialize_db_value, restore_row_integer_types +from ..utils.psycopg_utils import deserialize_db_value, restore_row_date_types, restore_row_integer_types from ..utils.serialization import serialize_value from .mocks import MockConnection, MockCopy from .wrappers import TracedCopyWrapper @@ -1829,6 +1829,7 @@ def _mock_execute_with_data(self, cursor: Any, mock_data: dict[str, Any], is_asy # Deserialize datetime strings back to datetime objects for consistent Flask serialization mock_rows = [deserialize_db_value(row) for row in mock_rows] mock_rows = [restore_row_integer_types(row, description_data) for row in mock_rows] + mock_rows = [restore_row_date_types(row, description_data) for row in mock_rows] cursor._mock_rows = mock_rows # pyright: ignore[reportAttributeAccessIssue] cursor._mock_index = 0 # pyright: ignore[reportAttributeAccessIssue] @@ -1898,6 +1899,7 @@ def _mock_executemany_returning_with_data(self, cursor: Any, mock_data: dict[str mock_rows = result_set.get("rows", []) mock_rows = [deserialize_db_value(row) for row in mock_rows] mock_rows = [restore_row_integer_types(row, description_data) for row in mock_rows] + mock_rows = [restore_row_date_types(row, description_data) for row in mock_rows] cursor._mock_result_sets.append( # pyright: ignore[reportAttributeAccessIssue] { diff --git a/drift/instrumentation/psycopg2/e2e-tests/src/app.py b/drift/instrumentation/psycopg2/e2e-tests/src/app.py index ecd57b3..c55692d 100644 --- a/drift/instrumentation/psycopg2/e2e-tests/src/app.py +++ b/drift/instrumentation/psycopg2/e2e-tests/src/app.py @@ -242,6 +242,70 @@ def db_register_jsonb(): return jsonify({"error": str(e), "error_type": type(e).__name__}), 500 +@app.route("/test/date-time-types") +def test_date_time_types(): + """Test date/time types are preserved as proper Python objects during replay. + + Verifies that DATE columns come back as datetime.date, TIME columns as + datetime.time, and INTERVAL columns as datetime.timedelta — not plain strings. + Also exercises datetime.combine() which fails if date/time are strings. + """ + try: + from datetime import date, datetime, time, timedelta + + conn = psycopg2.connect(get_conn_string()) + cur = conn.cursor() + + cur.execute(""" + CREATE TEMP TABLE datetime_test ( + id INT, + birth_date DATE, + wake_time TIME, + duration INTERVAL + ) + """) + + cur.execute( + "INSERT INTO datetime_test VALUES (%s, %s, %s, %s)", + (1, date(1990, 5, 15), time(8, 30, 0), timedelta(hours=2, minutes=30)), + ) + + cur.execute("SELECT * FROM datetime_test WHERE id = 1") + row = cur.fetchone() + conn.commit() + cur.close() + conn.close() + + birth_date = row[1] + wake_time = row[2] + duration = row[3] + + type_checks = { + "birth_date_is_date": isinstance(birth_date, date) and not isinstance(birth_date, datetime), + "wake_time_is_time": isinstance(wake_time, time), + "duration_is_timedelta": isinstance(duration, timedelta), + } + + combined = datetime.combine(birth_date, wake_time) + type_checks["combine_works"] = isinstance(combined, datetime) + type_checks["combine_value"] = combined.isoformat() + + all_types_correct = all(v for k, v in type_checks.items() if k != "combine_value") + + return jsonify( + { + "id": row[0], + "birth_date": str(birth_date), + "wake_time": str(wake_time), + "duration": str(duration), + "type_checks": type_checks, + "all_types_correct": all_types_correct, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + if __name__ == "__main__": sdk.mark_app_as_ready() app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/psycopg2/e2e-tests/src/test_requests.py b/drift/instrumentation/psycopg2/e2e-tests/src/test_requests.py index 3166b91..14ffae8 100644 --- a/drift/instrumentation/psycopg2/e2e-tests/src/test_requests.py +++ b/drift/instrumentation/psycopg2/e2e-tests/src/test_requests.py @@ -54,4 +54,7 @@ if user_id: make_request("DELETE", f"/db/delete/{user_id}") + # Date/time type preservation test + make_request("GET", "/test/date-time-types") + print_request_summary() diff --git a/drift/instrumentation/psycopg2/instrumentation.py b/drift/instrumentation/psycopg2/instrumentation.py index 1e4576a..cc722db 100644 --- a/drift/instrumentation/psycopg2/instrumentation.py +++ b/drift/instrumentation/psycopg2/instrumentation.py @@ -30,7 +30,7 @@ ) from ..base import InstrumentationBase from ..sqlalchemy.context import sqlalchemy_execution_active_context, sqlalchemy_replay_mock_context -from ..utils.psycopg_utils import deserialize_db_value, restore_row_integer_types +from ..utils.psycopg_utils import deserialize_db_value, restore_row_date_types, restore_row_integer_types from ..utils.serialization import serialize_value logger = logging.getLogger(__name__) @@ -950,6 +950,7 @@ def _mock_execute_with_data(self, cursor: Any, mock_data: dict[str, Any]) -> Non # Deserialize datetime strings back to datetime objects for consistent Flask/Django serialization mock_rows = [deserialize_db_value(row) for row in mock_rows] mock_rows = [restore_row_integer_types(row, description_data) for row in mock_rows] + mock_rows = [restore_row_date_types(row, description_data) for row in mock_rows] # Check if this is a dict-cursor (like RealDictCursor) # First check if cursor has _is_dict_cursor attribute (set by InstrumentedConnection.cursor()) diff --git a/drift/instrumentation/utils/psycopg_utils.py b/drift/instrumentation/utils/psycopg_utils.py index 731996d..5871fb4 100644 --- a/drift/instrumentation/utils/psycopg_utils.py +++ b/drift/instrumentation/utils/psycopg_utils.py @@ -19,6 +19,14 @@ 28, # XID } +# PostgreSQL date/time type OIDs for backward-compatible deserialization +# of recordings that stored date/time values as plain ISO strings +POSTGRES_DATE_TYPE_CODE = 1082 # DATE +POSTGRES_TIME_TYPE_CODES = { + 1083, # TIME WITHOUT TIME ZONE + 1266, # TIME WITH TIME ZONE +} + # Try to import psycopg Range type for deserialization support try: from psycopg.types.range import Range as PsycopgRange # type: ignore[import-untyped] @@ -60,6 +68,12 @@ def deserialize_db_value(val: Any) -> Any: # Check for timedelta tagged structure if "__timedelta__" in val and len(val) == 1: return dt.timedelta(seconds=val["__timedelta__"]) + # Check for date tagged structure + if "__date__" in val and len(val) == 1: + return dt.date.fromisoformat(val["__date__"]) + # Check for time tagged structure + if "__time__" in val and len(val) == 1: + return dt.time.fromisoformat(val["__time__"]) # Check for Range tagged structure (psycopg Range types) if "__range__" in val and len(val) == 1: range_data = val["__range__"] @@ -154,3 +168,49 @@ def restore_row_integer_types( else: result.append(value) return result + + +def restore_row_date_types( + row: list[Any] | dict[str, Any], description: list[dict[str, Any]] | None +) -> list[Any] | dict[str, Any]: + """Restore date/time types for database row values using column metadata. + + Backward compatibility for recordings made before tagged serialization was added. + Older recordings stored datetime.date as "YYYY-MM-DD" and datetime.time as "HH:MM:SS" + plain strings. This function uses column type_code from the cursor description to + convert those strings back to proper datetime.date / datetime.time objects. + + New recordings use tagged format ({"__date__": ...}, {"__time__": ...}) which are + handled by deserialize_db_value, so this function is a no-op for those values. + """ + if not description or not row: + return row + + def _convert(value: Any, type_code: int | None) -> Any: + if not isinstance(value, str): + return value + if type_code == POSTGRES_DATE_TYPE_CODE: + try: + return dt.date.fromisoformat(value) + except ValueError: + return value + if type_code in POSTGRES_TIME_TYPE_CODES: + try: + return dt.time.fromisoformat(value) + except ValueError: + return value + return value + + if isinstance(row, dict): + type_code_by_name: dict[str, int | None] = {} + for col in description: + if isinstance(col, dict): + col_name = col.get("name") + if col_name: + type_code_by_name[col_name] = col.get("type_code") + return {key: _convert(value, type_code_by_name.get(key)) for key, value in row.items()} + + return [ + _convert(value, description[i].get("type_code") if i < len(description) and isinstance(description[i], dict) else None) + for i, value in enumerate(row) + ] diff --git a/drift/instrumentation/utils/serialization.py b/drift/instrumentation/utils/serialization.py index e9cee40..fab4276 100644 --- a/drift/instrumentation/utils/serialization.py +++ b/drift/instrumentation/utils/serialization.py @@ -51,8 +51,12 @@ def serialize_value(val: Any) -> Any: Returns: A JSON-serializable version of the value. """ - if isinstance(val, (datetime.datetime, datetime.date, datetime.time)): + if isinstance(val, datetime.datetime): return val.isoformat() + elif isinstance(val, datetime.date): + return {"__date__": val.isoformat()} + elif isinstance(val, datetime.time): + return {"__time__": val.isoformat()} elif isinstance(val, datetime.timedelta): # Serialize timedelta as total seconds for consistent hashing return {"__timedelta__": val.total_seconds()} From 3cd74b6bd90ee1fe6c268e0206cebe85bd6af17a Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 20 Mar 2026 16:51:01 -0700 Subject: [PATCH 16/17] fix: handle psycopg2 adapter types in serialize_value to prevent empty spans --- drift/instrumentation/utils/serialization.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/drift/instrumentation/utils/serialization.py b/drift/instrumentation/utils/serialization.py index fab4276..f28e3a8 100644 --- a/drift/instrumentation/utils/serialization.py +++ b/drift/instrumentation/utils/serialization.py @@ -91,6 +91,12 @@ def serialize_value(val: Any) -> Any: # Serialize ipaddress types to string for inet/cidr PostgreSQL columns # These are returned by psycopg when querying inet and cidr columns return str(val) + elif hasattr(val, "getquoted"): + if hasattr(val, "adapted"): + return serialize_value(val.adapted) + elif hasattr(val, "addr"): + return serialize_value(val.addr) + return str(val) elif isinstance(val, memoryview): # Convert memoryview to bytes first, then serialize return _serialize_bytes(bytes(val)) From d0f1e109bc6ea7a99476c190d77a5b19cc684d50 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 20 Mar 2026 16:51:37 -0700 Subject: [PATCH 17/17] fix lint --- drift/instrumentation/utils/psycopg_utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/drift/instrumentation/utils/psycopg_utils.py b/drift/instrumentation/utils/psycopg_utils.py index 5871fb4..e031939 100644 --- a/drift/instrumentation/utils/psycopg_utils.py +++ b/drift/instrumentation/utils/psycopg_utils.py @@ -211,6 +211,9 @@ def _convert(value: Any, type_code: int | None) -> Any: return {key: _convert(value, type_code_by_name.get(key)) for key, value in row.items()} return [ - _convert(value, description[i].get("type_code") if i < len(description) and isinstance(description[i], dict) else None) + _convert( + value, + description[i].get("type_code") if i < len(description) and isinstance(description[i], dict) else None, + ) for i, value in enumerate(row) ]