diff --git a/src/debug_toolbar/litestar/middleware.py b/src/debug_toolbar/litestar/middleware.py index ff4658b..4bf6d60 100644 --- a/src/debug_toolbar/litestar/middleware.py +++ b/src/debug_toolbar/litestar/middleware.py @@ -2,6 +2,7 @@ from __future__ import annotations +import gzip import logging import re import time @@ -190,20 +191,27 @@ async def _handle_response_body( async def _send_html_response(self, send: Send, context: RequestContext, state: ResponseState) -> None: """Process and send buffered HTML response with toolbar injection.""" full_body = b"".join(state.body_chunks) + content_encoding = state.headers.get("content-encoding", "") try: await self.toolbar.process_response(context) - modified_body = self._inject_toolbar(full_body, context) + modified_body, new_encoding = self._inject_toolbar(full_body, context, content_encoding) server_timing = self.toolbar.get_server_timing_header(context) except Exception: logger.debug("Toolbar processing failed, sending original response", exc_info=True) modified_body = full_body + new_encoding = content_encoding server_timing = None + # Build headers, excluding content-length (recalculated) and content-encoding (may have changed) + excluded_headers = {"content-length", "content-encoding"} new_headers: list[tuple[bytes, bytes]] = [ - (k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() != "content-length" + (k.encode(), v.encode()) for k, v in state.headers.items() if k.lower() not in excluded_headers ] new_headers.append((b"content-length", str(len(modified_body)).encode())) + # Only add content-encoding if we still have one (not stripped due to decompression) + if new_encoding: + new_headers.append((b"content-encoding", new_encoding.encode())) if server_timing: new_headers.append((b"server-timing", server_timing.encode())) @@ -471,20 +479,39 @@ def _populate_routes_metadata(self, request: Request, context: RequestContext) - context.metadata["routes"] = [] context.metadata["matched_route"] = "" - def _inject_toolbar(self, body: bytes, context: RequestContext) -> bytes: + def _inject_toolbar(self, body: bytes, context: RequestContext, content_encoding: str = "") -> tuple[bytes, str]: """Inject the toolbar HTML into the response body. Args: - body: The original response body. + body: The original response body (may be compressed). context: The request context with collected data. + content_encoding: The content-encoding header value (e.g., "gzip"). Returns: - The modified response body with toolbar injected. + Tuple of (modified body, content_encoding to use). + If gzip was decompressed, returns uncompressed body with empty encoding. """ + # Handle gzip-compressed responses + # Track whether we successfully decompressed the body + decompressed = False + encodings = [e.strip() for e in content_encoding.lower().split(",")] if content_encoding else [] + if "gzip" in encodings: + try: + body = gzip.decompress(body) + decompressed = True + except gzip.BadGzipFile: + # Not valid gzip, try to decode as-is + pass + try: html = body.decode("utf-8") except UnicodeDecodeError: - return body + # Can't decode. If we successfully decompressed gzip, return the + # decompressed body with no content-encoding. Otherwise, return + # the body as-is with the original encoding. + if decompressed: + return body, "" + return body, content_encoding toolbar_data = self.toolbar.get_toolbar_data(context) toolbar_html = self._render_toolbar(toolbar_data) @@ -496,7 +523,10 @@ def _inject_toolbar(self, body: bytes, context: RequestContext) -> bytes: pattern = re.compile(re.escape(insert_before), re.IGNORECASE) html = pattern.sub(toolbar_html + insert_before, html, count=1) - return html.encode("utf-8") + # Return body as uncompressed UTF-8 with empty content-encoding. + # This applies to all successful toolbar injections, regardless of whether + # the input was originally compressed (we decompress before processing). + return html.encode("utf-8"), "" def _render_toolbar(self, data: dict[str, Any]) -> str: """Render the toolbar HTML. diff --git a/tests/integration/test_litestar_middleware.py b/tests/integration/test_litestar_middleware.py index 870f59c..8a92396 100644 --- a/tests/integration/test_litestar_middleware.py +++ b/tests/integration/test_litestar_middleware.py @@ -2,11 +2,14 @@ from __future__ import annotations +import gzip + import pytest from litestar.testing import TestClient from debug_toolbar.litestar import DebugToolbarPlugin, LitestarDebugToolbarConfig -from litestar import Litestar, MediaType, get +from litestar import Litestar, MediaType, Response, get +from litestar.status_codes import HTTP_200_OK @get("/", media_type=MediaType.HTML) @@ -298,3 +301,158 @@ async def after_request(response: Response) -> Response: assert response.status_code == 200 assert b"debug-toolbar" in response.content assert hook_state["before"], "before_request hook was not called" + + +class TestGzipCompression: + """Test toolbar injection with gzip-compressed responses.""" + + def test_toolbar_injected_with_gzip_compression(self) -> None: + """Test that toolbar is correctly injected when response is gzip-compressed. + + This tests the fix for the issue where gzip-compressed responses would + fail to have the toolbar injected because the middleware couldn't decode + the compressed bytes as UTF-8. + """ + from litestar.config.compression import CompressionConfig + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[html_handler], + plugins=[DebugToolbarPlugin(config)], + compression_config=CompressionConfig(backend="gzip", minimum_size=1), + debug=True, + ) + with TestClient(app) as client: + # Request with Accept-Encoding to trigger compression + response = client.get("/", headers={"Accept-Encoding": "gzip"}) + assert response.status_code == 200 + # At the TestClient level we see an uncompressed body with the toolbar injected. + assert b"debug-toolbar" in response.content + assert b"" in response.content + + def test_toolbar_injected_without_compression(self) -> None: + """Test that toolbar injection still works without compression.""" + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[html_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + + def test_invalid_gzip_data_with_gzip_header(self) -> None: + """Test handling of invalid gzip data with content-encoding: gzip header. + + When the response claims to be gzipped but contains invalid gzip data, + the middleware should gracefully fall back to treating it as uncompressed. + """ + + @get("/invalid-gzip", media_type=MediaType.HTML) + async def invalid_gzip_handler() -> Response: + """Return invalid gzip data with gzip content-encoding header.""" + # This is not valid gzip data + invalid_gzip = b"This is not gzipped data but pretends to be" + return Response( + content=invalid_gzip, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "gzip"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[invalid_gzip_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/invalid-gzip") + assert response.status_code == 200 + # Should return original content since it couldn't be decompressed + assert b"This is not gzipped data but pretends to be" in response.content + + def test_gzip_decompressed_data_fails_utf8_decoding(self) -> None: + """Test handling of valid gzip data that fails UTF-8 decoding after decompression. + + When gzipped data decompresses successfully but contains non-UTF-8 bytes, + the middleware should return the original compressed data. + """ + + @get("/binary-gzip", media_type=MediaType.HTML) + async def binary_gzip_handler() -> Response: + """Return gzipped binary data that's not valid UTF-8.""" + # Binary data that's not valid UTF-8 + binary_data = b"\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89" + gzipped = gzip.compress(binary_data) + return Response( + content=gzipped, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "gzip"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[binary_gzip_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + response = client.get("/binary-gzip") + assert response.status_code == 200 + # Should return decompressed binary data since UTF-8 decode failed + # The middleware has removed the gzip encoding, so we check for the raw binary content + assert response.content == b"\x80\x81\x82\x83\x84\x85\x86\x87\x88\x89" + + def test_gzip_header_case_insensitive(self) -> None: + """Test that content-encoding header matching is case-insensitive. + + The HTTP spec requires header names to be case-insensitive, so we should + handle various casings of "gzip" (e.g., "GZIP", "Gzip", "GzIp"). + """ + + @get("/gzip-upper", media_type=MediaType.HTML) + async def gzip_upper_handler() -> Response: + """Return gzipped HTML with uppercase GZIP encoding.""" + html = "

Test

" + gzipped = gzip.compress(html.encode("utf-8")) + return Response( + content=gzipped, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "GZIP"}, + ) + + @get("/gzip-mixed", media_type=MediaType.HTML) + async def gzip_mixed_handler() -> Response: + """Return gzipped HTML with mixed case GzIp encoding.""" + html = "

Test

" + gzipped = gzip.compress(html.encode("utf-8")) + return Response( + content=gzipped, + status_code=HTTP_200_OK, + media_type=MediaType.HTML, + headers={"content-encoding": "GzIp"}, + ) + + config = LitestarDebugToolbarConfig(enabled=True) + app = Litestar( + route_handlers=[gzip_upper_handler, gzip_mixed_handler], + plugins=[DebugToolbarPlugin(config)], + debug=True, + ) + with TestClient(app) as client: + # Test uppercase GZIP + response = client.get("/gzip-upper") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + assert b"" in response.content + + # Test mixed case GzIp + response = client.get("/gzip-mixed") + assert response.status_code == 200 + assert b"debug-toolbar" in response.content + assert b"" in response.content