Skip to content

Commit c2b2fc6

Browse files
committed
Fix streaming logic and streaming output parsing for httpx and requests instrumentation
1 parent 2804484 commit c2b2fc6

6 files changed

Lines changed: 470 additions & 60 deletions

File tree

netra/instrumentation/httpx/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ def instrumentation_dependencies(self) -> Collection[str]:
2323
return _instruments
2424

2525
def _instrument(self, **kwargs: Any) -> None:
26-
"""Instrument httpx client methods."""
26+
"""Instrument httpx.Client.send and httpx.AsyncClient.send.
27+
28+
Args:
29+
**kwargs: Keyword arguments passed by the instrumentation framework.
30+
tracer_provider: Optional TracerProvider to use for creating spans.
31+
"""
2732
try:
2833
tracer_provider = kwargs.get("tracer_provider")
2934
tracer = get_tracer(__name__, __version__, tracer_provider)
@@ -38,7 +43,11 @@ def _instrument(self, **kwargs: Any) -> None:
3843
logger.error(f"Failed to instrument httpx: {e}")
3944

4045
def _uninstrument(self, **kwargs: Any) -> None:
41-
"""Uninstrument httpx client methods."""
46+
"""Uninstrument httpx.Client.send and httpx.AsyncClient.send.
47+
48+
Args:
49+
**kwargs: Keyword arguments passed by the instrumentation framework.
50+
"""
4251
try:
4352
import httpx
4453

netra/instrumentation/httpx/utils.py

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@
2424

2525

2626
def should_suppress_instrumentation() -> bool:
27-
"""Check if instrumentation should be suppressed."""
27+
"""Check if instrumentation should be suppressed.
28+
29+
Returns:
30+
True if the OpenTelemetry suppression key is active in the current
31+
context, False otherwise.
32+
"""
2833
return context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) is True
2934

3035

@@ -143,6 +148,60 @@ def set_span_output(span: Span, response: httpx.Response) -> None:
143148
logger.error(f"Failed to set output attribute on httpx span: {e}")
144149

145150

151+
def _parse_streaming_body(accumulated: bytes) -> Any:
152+
"""Parse accumulated streaming response bytes into a structured value.
153+
154+
Handles SSE (``data: {...}``), NDJSON, plain concatenated JSON objects,
155+
and falls back to a decoded string or a binary placeholder.
156+
157+
Args:
158+
accumulated: Raw bytes collected from the streaming response chunks.
159+
160+
Returns:
161+
A parsed JSON object or list, a plain string, or a binary-size
162+
placeholder string if the bytes cannot be decoded as UTF-8.
163+
"""
164+
try:
165+
text = accumulated.decode("utf-8")
166+
except UnicodeDecodeError:
167+
return f"<binary content: {len(accumulated)} bytes>"
168+
169+
# SSE: any line starts with "data:"
170+
lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
171+
if any(ln.startswith("data:") for ln in lines):
172+
parsed: List[Any] = []
173+
for ln in lines:
174+
if ln.startswith("data:"):
175+
data = ln[5:].strip()
176+
if data == "[DONE]":
177+
continue
178+
try:
179+
parsed.append(json.loads(data))
180+
except json.JSONDecodeError:
181+
parsed.append(data)
182+
if parsed:
183+
return parsed[0] if len(parsed) == 1 else parsed
184+
185+
# Sequential JSON decoding: handles single JSON, NDJSON, and bare concatenated objects
186+
decoder = json.JSONDecoder()
187+
results: List[Any] = []
188+
idx = 0
189+
stripped = text.strip()
190+
try:
191+
while idx < len(stripped):
192+
obj, end_idx = decoder.raw_decode(stripped, idx)
193+
results.append(obj)
194+
idx = end_idx
195+
while idx < len(stripped) and stripped[idx] in " \t\n\r":
196+
idx += 1
197+
if results and idx == len(stripped):
198+
return results[0] if len(results) == 1 else results
199+
except json.JSONDecodeError:
200+
pass
201+
202+
return text
203+
204+
146205
def set_streaming_span_output(span: Span, response: httpx.Response, chunks: List[bytes]) -> None:
147206
"""Serialize accumulated streaming chunks and set them as the span ``output`` attribute.
148207
@@ -159,15 +218,7 @@ def set_streaming_span_output(span: Span, response: httpx.Response, chunks: List
159218
"headers": _sanitize_headers(response.headers),
160219
}
161220
if chunks:
162-
accumulated = b"".join(chunks)
163-
try:
164-
body: Any = json.loads(accumulated)
165-
except (json.JSONDecodeError, UnicodeDecodeError):
166-
try:
167-
body = accumulated.decode("utf-8")
168-
except UnicodeDecodeError:
169-
body = f"<binary content: {len(accumulated)} bytes>"
170-
output_data["body"] = body
221+
output_data["body"] = _parse_streaming_body(b"".join(chunks))
171222
span.set_attribute("output", json.dumps(output_data))
172223
except Exception as e:
173224
logger.error(f"Failed to set streaming output attribute on httpx span: {e}")

0 commit comments

Comments
 (0)