From aa64f6b4c5fb55027ea7faed40d2a2645ef58c3c Mon Sep 17 00:00:00 2001 From: Vadim Getmanshchuk Date: Tue, 28 Apr 2026 20:04:04 -0500 Subject: [PATCH 1/3] fix: flush output writer during streaming to enable progressive rendering Without explicit flush() calls, content written to the StreamingBody accumulated in the writer's buffer while the processor blocked waiting for pending includes via select(). This meant the client saw nothing until all fragments resolved, defeating the purpose of streaming. Add flush() at two points: - In process_stream, after processing each batch of parsed elements - In drain_queue, after flushing ready slots (Step 2) and before blocking on select() (Step 4) Fixes #44 --- esi/src/lib.rs | 30 ++++++++++----- esi/tests/eval_tests.rs | 83 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 9 deletions(-) diff --git a/esi/src/lib.rs b/esi/src/lib.rs index 43032b4..a9b7657 100644 --- a/esi/src/lib.rs +++ b/esi/src/lib.rs @@ -797,17 +797,24 @@ impl Processor { match parse_result { Ok((remaining, elements)) => { - let mut handler = DocumentHandler { - processor: self, - output: output_writer, - dispatch_fragment_request: dispatcher, - fragment_response_handler: process_fragment_response, - }; - for element in elements { - handler.process(&element)?; - handler.process_queue()?; + { + let mut handler = DocumentHandler { + processor: self, + output: output_writer, + dispatch_fragment_request: dispatcher, + fragment_response_handler: process_fragment_response, + }; + for element in elements { + handler.process(&element)?; + handler.process_queue()?; + } } + // Flush any content written during this parse batch so it + // reaches the client immediately (progressive streaming) + // rather than buffering until drain_queue completes. + output_writer.flush()?; + if eof { // Nothing left to read — we're done break; @@ -1325,6 +1332,11 @@ impl Processor { } } + // Flush written content to the client before potentially blocking + // on select(). Without this, data sits in the writer's buffer + // while we wait for slow includes, defeating streaming. + output_writer.flush()?; + // ------------------------------------------------------------------ // Step 3: done when nothing is pending. // ------------------------------------------------------------------ diff --git a/esi/tests/eval_tests.rs b/esi/tests/eval_tests.rs index c570d3d..73e908e 100644 --- a/esi/tests/eval_tests.rs +++ b/esi/tests/eval_tests.rs @@ -418,3 +418,86 @@ fn test_triple_nested_dca_esi_document_order() -> esi::Result<()> { ); Ok(()) } + +/// A writer that records every flush() call along with the data written so far. +/// This lets us assert that content is flushed at the right points during streaming. +struct FlushTrackingWriter { + data: Vec, + /// Snapshots of `data` taken at each flush() call. + flush_snapshots: Vec>, +} + +impl FlushTrackingWriter { + fn new() -> Self { + Self { + data: Vec::new(), + flush_snapshots: Vec::new(), + } + } +} + +impl std::io::Write for FlushTrackingWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.data.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.flush_snapshots.push(self.data.clone()); + Ok(()) + } +} + +/// Test that process_stream flushes output before blocking on pending includes. +/// +/// Content written before the first include must be flushed so that a streaming +/// client (e.g. Fastly StreamingBody) sends it immediately rather than buffering +/// until all includes have resolved (issue #45 streaming complaint). +#[test] +fn test_streaming_flush_before_pending_include() -> esi::Result<()> { + let input = r#" +
Header
+ +
Footer
+"#; + + let dispatcher = + |_req: Request, _maxwait: Option| -> esi::Result { + Ok(esi::PendingFragmentContent::CompletedRequest(Box::new( + Response::from_body("FRAGMENT"), + ))) + }; + + let reader = std::io::BufReader::new(std::io::Cursor::new(input.as_bytes())); + let mut writer = FlushTrackingWriter::new(); + let mut processor = Processor::new(None, Configuration::default()); + processor.process_stream(reader, &mut writer, Some(&dispatcher), None)?; + + // Final output must be correct + let result = String::from_utf8(writer.data).unwrap(); + assert!( + result.contains("
Header
"), + "Should contain header" + ); + assert!(result.contains("FRAGMENT"), "Should contain fragment"); + assert!( + result.contains("
Footer
"), + "Should contain footer" + ); + + // At least one flush must have occurred BEFORE the final output, + // meaning the first flush snapshot should NOT contain everything. + assert!( + !writer.flush_snapshots.is_empty(), + "flush() should have been called at least once during processing" + ); + + // The first flush should contain content written before the include + let first_flush = String::from_utf8_lossy(&writer.flush_snapshots[0]); + assert!( + first_flush.contains("
Header
"), + "First flush should contain content before the include. Got: {first_flush}" + ); + + Ok(()) +} From 576d2b440510b4a5edabad3f05ee1b5243e54355 Mon Sep 17 00:00:00 2001 From: Vadim Getmanshchuk Date: Wed, 29 Apr 2026 11:52:18 -0500 Subject: [PATCH 2/3] fix: ensure immediate flushing of output during streaming to prevent stalling --- esi/src/lib.rs | 94 ++++++++++++++++++++++++++++++++-------- esi/tests/eval_tests.rs | 95 +++++++++++++++++++++++++++++++++++------ 2 files changed, 158 insertions(+), 31 deletions(-) diff --git a/esi/src/lib.rs b/esi/src/lib.rs index a9b7657..cde5c70 100644 --- a/esi/src/lib.rs +++ b/esi/src/lib.rs @@ -1158,14 +1158,41 @@ impl Processor { ready => { // CompletedRequest or NoContent: process now. fragment.pending_fragment = ready; - let mut slot_buf = Vec::new(); - self.process_include( - *fragment, - &mut slot_buf, - dispatch_fragment_request, - process_fragment_response, - )?; - buf[slot] = Some(Bytes::from(slot_buf)); + + if slot == next_out { + // Head-of-line: stream directly to the + // client rather than buffering. + self.process_include( + *fragment, + output_writer, + dispatch_fragment_request, + process_fragment_response, + )?; + buf[slot] = Some(Bytes::new()); + next_out += 1; + + // Flush any subsequent ready slots. + while next_out < buf.len() { + match &buf[next_out] { + Some(bytes) => { + output_writer.write_all(bytes)?; + buf[next_out] = Some(Bytes::new()); + next_out += 1; + } + None => break, + } + } + output_writer.flush()?; + } else { + let mut slot_buf = Vec::new(); + self.process_include( + *fragment, + &mut slot_buf, + dispatch_fragment_request, + process_fragment_response, + )?; + buf[slot] = Some(Bytes::from(slot_buf)); + } // dca="esi" may push new items onto self.queue; // the outer while picks them up next iteration. } @@ -1411,16 +1438,47 @@ impl Processor { // ------------------------------------------------------- None => { fragment.pending_fragment = completed_content; - let mut slot_buf = Vec::new(); - self.process_include( - *fragment, - &mut slot_buf, - dispatch_fragment_request, - process_fragment_response, - )?; - buf[buf_slot] = Some(Bytes::from(slot_buf)); - // dca="esi" may push new QueuedElements onto self.queue. - // Loop back to Step 1 to assign them slots. + + if buf_slot == next_out { + // Head-of-line: write directly to the streaming + // output instead of buffering into a slot. This + // lets nested dca="esi" fragments stream + // incrementally — their isolated drain_queue + // writes (and flushes) to the real client writer + // rather than to a Vec that only surfaces later. + self.process_include( + *fragment, + output_writer, + dispatch_fragment_request, + process_fragment_response, + )?; + buf[buf_slot] = Some(Bytes::new()); // mark consumed + next_out += 1; + + // Advance past any subsequent ready slots that + // can now be flushed contiguously. + while next_out < buf.len() { + match &buf[next_out] { + Some(bytes) => { + output_writer.write_all(bytes)?; + buf[next_out] = Some(Bytes::new()); + next_out += 1; + } + None => break, + } + } + output_writer.flush()?; + } else { + let mut slot_buf = Vec::new(); + self.process_include( + *fragment, + &mut slot_buf, + dispatch_fragment_request, + process_fragment_response, + )?; + buf[buf_slot] = Some(Bytes::from(slot_buf)); + } + // Loop back to Step 1 to pick up any new queue items. } // ------------------------------------------------------- diff --git a/esi/tests/eval_tests.rs b/esi/tests/eval_tests.rs index 73e908e..b6e19fe 100644 --- a/esi/tests/eval_tests.rs +++ b/esi/tests/eval_tests.rs @@ -1,5 +1,5 @@ use esi::{Configuration, Processor}; -use fastly::{Request, Response}; +use fastly::{Backend, Request, Response}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -419,8 +419,8 @@ fn test_triple_nested_dca_esi_document_order() -> esi::Result<()> { Ok(()) } -/// A writer that records every flush() call along with the data written so far. -/// This lets us assert that content is flushed at the right points during streaming. +/// Writer that snapshots its buffer on every flush(), so we can check +/// what data was available to the client at each flush point. struct FlushTrackingWriter { data: Vec, /// Snapshots of `data` taken at each flush() call. @@ -448,11 +448,9 @@ impl std::io::Write for FlushTrackingWriter { } } -/// Test that process_stream flushes output before blocking on pending includes. -/// -/// Content written before the first include must be flushed so that a streaming -/// client (e.g. Fastly StreamingBody) sends it immediately rather than buffering -/// until all includes have resolved (issue #45 streaming complaint). +/// Regression: content before an include should be flushed, not held +/// until the include resolves. Without flush() the StreamingBody buffers +/// everything and the client sees nothing until processing finishes. #[test] fn test_streaming_flush_before_pending_include() -> esi::Result<()> { let input = r#" @@ -473,7 +471,6 @@ fn test_streaming_flush_before_pending_include() -> esi::Result<()> { let mut processor = Processor::new(None, Configuration::default()); processor.process_stream(reader, &mut writer, Some(&dispatcher), None)?; - // Final output must be correct let result = String::from_utf8(writer.data).unwrap(); assert!( result.contains("
Header
"), @@ -485,14 +482,12 @@ fn test_streaming_flush_before_pending_include() -> esi::Result<()> { "Should contain footer" ); - // At least one flush must have occurred BEFORE the final output, - // meaning the first flush snapshot should NOT contain everything. assert!( !writer.flush_snapshots.is_empty(), - "flush() should have been called at least once during processing" + "expected at least one flush during processing" ); - // The first flush should contain content written before the include + // The content before the include must already be present at the first flush. let first_flush = String::from_utf8_lossy(&writer.flush_snapshots[0]); assert!( first_flush.contains("
Header
"), @@ -501,3 +496,77 @@ fn test_streaming_flush_before_pending_include() -> esi::Result<()> { Ok(()) } + +/// End-to-end streaming test with a real slow backend (httpbin.org/delay/1). +/// +/// The outer doc has fast content before a dca="esi" include whose fragment +/// itself contains a slow nested include. We verify that the fast content +/// is flushed to the client *before* the slow include resolves — i.e. the +/// stream doesn't stall at the outermost include boundary. +/// +/// Requires network; ~1s. Run with: `cargo test -- --ignored` +#[test] +#[ignore] +fn test_nested_dca_esi_real_async_streaming() -> esi::Result<()> { + let input = r#"

Title

+ +
End
"#; + + let dispatcher = + |req: Request, _maxwait: Option| -> esi::Result { + let url = req.get_url_str().to_string(); + if url.contains("example.com/parent") { + // Immediate response; body has a nested include that hits a slow endpoint + Ok(esi::PendingFragmentContent::CompletedRequest(Box::new( + Response::from_body( + "

Fast content

\n", + ), + ))) + } else { + // Actually hit httpbin — this is the slow request + let backend = Backend::builder("httpbin.org", "httpbin.org") + .enable_ssl() + .sni_hostname("httpbin.org") + .finish() + .map_err(|e| { + esi::ESIError::FragmentRequestError(format!( + "failed to create httpbin backend: {e}" + )) + })?; + let pending = req.send_async(backend)?; + Ok(esi::PendingFragmentContent::PendingRequest(Box::new( + pending, + ))) + } + }; + + let reader = std::io::BufReader::new(std::io::Cursor::new(input.as_bytes())); + let mut writer = FlushTrackingWriter::new(); + let mut processor = Processor::new(None, Configuration::default()); + processor.process_stream(reader, &mut writer, Some(&dispatcher), None)?; + + let result = String::from_utf8(writer.data).unwrap(); + + assert!(result.contains("

Title

")); + assert!(result.contains("

Fast content

")); + assert!(result.contains("
End
")); + + // The key check: some flush snapshot must contain the fast content but + // NOT the footer. The footer can't appear until the slow nested include + // finishes, so if we see this split it means the stream didn't stall. + let has_incremental_flush = writer.flush_snapshots.iter().any(|snap| { + let s = String::from_utf8_lossy(snap); + s.contains("

Fast content

") && !s.contains("
End
") + }); + assert!( + has_incremental_flush, + "fast content should be flushed before the footer; snapshots: {:?}", + writer + .flush_snapshots + .iter() + .map(|s| String::from_utf8_lossy(s).to_string()) + .collect::>() + ); + + Ok(()) +} From 151d6fa0d9e3bfe250e056a21bfce9496daa1400 Mon Sep 17 00:00:00 2001 From: Vadim Getmanshchuk Date: Wed, 29 Apr 2026 12:50:37 -0500 Subject: [PATCH 3/3] bump: update version to 0.7.0 and fastly dependency to 0.12 in Cargo.toml and Cargo.lock; update rust toolchain to 1.95.0 --- Cargo.lock | 44 ++++++++----------- Cargo.toml | 2 +- esi/Cargo.toml | 2 +- esi/tests/esi_tests.rs | 14 +++--- .../Cargo.toml | 2 +- examples/esi_example_minimal/Cargo.toml | 2 +- examples/esi_example_variants/Cargo.toml | 2 +- examples/esi_try_example/Cargo.toml | 2 +- examples/esi_vars_example/Cargo.toml | 2 +- rust-toolchain | 2 +- 10 files changed, 31 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 643461a..5d7536d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -385,7 +385,7 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "esi" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "atoi", "base64", @@ -407,7 +407,7 @@ dependencies = [ [[package]] name = "esi_example_advanced_error_handling" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -417,7 +417,7 @@ dependencies = [ [[package]] name = "esi_example_minimal" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -427,7 +427,7 @@ dependencies = [ [[package]] name = "esi_example_variants" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -437,7 +437,7 @@ dependencies = [ [[package]] name = "esi_try_example" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -447,7 +447,7 @@ dependencies = [ [[package]] name = "esi_vars_example" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -457,9 +457,9 @@ dependencies = [ [[package]] name = "fastly" -version = "0.11.13" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f767502306f09f6dcb76302d09cd2ea8542e228d5f155166f0c2da925e16c61" +checksum = "16393f187c703d5460d095201e194940a190479cd5a45aa7e324e8c97f4a3df4" dependencies = [ "anyhow", "bytes", @@ -485,9 +485,9 @@ dependencies = [ [[package]] name = "fastly-macros" -version = "0.11.13" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51ae08eeeb5ed0c1a8b454fc89dca0e316e13b7889e81fc9a435503c1e84a2d7" +checksum = "4e11b9b78e4d8d0fab4b9d7d8ba289c30d62d641e649e89153bc4d5446c88db2" dependencies = [ "proc-macro2", "quote", @@ -496,9 +496,9 @@ dependencies = [ [[package]] name = "fastly-shared" -version = "0.11.13" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d64ed1bba12ca45d1a2a80c2c55d903297adb3eeb4edc9d327c1d51ee709d404" +checksum = "e4ca5a9664c64b9f85188426aa1598e9885d6dbb247d6155fd9ebe043b551800" dependencies = [ "bitflags 1.3.2", "http", @@ -506,14 +506,15 @@ dependencies = [ [[package]] name = "fastly-sys" -version = "0.11.13" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1b82ebd99583740a074d8962ca75d7d17065b185a94e4919c3a3f2193268b6" +checksum = "d5dacc6ac7a7400e0b38757f48fbf1db09971812ef3dbb1f1a90a50746df662f" dependencies = [ "bitflags 1.3.2", "fastly-shared", + "http", "wasip2", - "wit-bindgen 0.46.0", + "wit-bindgen", ] [[package]] @@ -1367,7 +1368,7 @@ version = "1.0.2+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" dependencies = [ - "wit-bindgen 0.51.0", + "wit-bindgen", ] [[package]] @@ -1376,7 +1377,7 @@ version = "0.4.0+wasi-0.3.0-rc-2026-01-06" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" dependencies = [ - "wit-bindgen 0.51.0", + "wit-bindgen", ] [[package]] @@ -1535,15 +1536,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "wit-bindgen" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" -dependencies = [ - "bitflags 2.11.1", -] - [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/Cargo.toml b/Cargo.toml index cbc9eae..a57ca1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ resolver = "2" [workspace.package] -version = "0.7.0-beta.4" +version = "0.7.0" authors = [ "Kailan Blanks ", "Vadim Getmanshchuk ", diff --git a/esi/Cargo.toml b/esi/Cargo.toml index 90adf4d..076e2c1 100644 --- a/esi/Cargo.toml +++ b/esi/Cargo.toml @@ -13,7 +13,7 @@ expose-internals = [] [dependencies] thiserror = "2.0.6" -fastly = "^0.11" +fastly = "^0.12" log = "^0.4" regex = "1.11.1" html-escape = "0.2.13" diff --git a/esi/tests/esi_tests.rs b/esi/tests/esi_tests.rs index 27bdfd5..cdb44fc 100644 --- a/esi/tests/esi_tests.rs +++ b/esi/tests/esi_tests.rs @@ -202,11 +202,8 @@ fn test_esi_choose_compatibility_not_equal() { // Test for nested variable expansion - INVALID ESI SYNTAX // The construct $($(outer){param}) is NOT valid Akamai ESI syntax. // Akamai's ESI does not support nested variable expansion like this. -// This test was checking that it doesn't work, but the syntax is so invalid -// that different parsers may handle it differently (error vs. pass-through). #[test] -#[ignore] // Invalid ESI syntax - $($(var){key}) is not supported by Akamai ESI spec -fn test_nested_subfields() { +fn test_nested_subfields_is_invalid() { let input = r#" @@ -214,11 +211,10 @@ fn test_nested_subfields() { "#; let req = Request::get("http://example.com?param=value"); - let result = process_esi_document(input, req).expect("Processing should succeed"); - assert_ne!( - result.trim(), - "value", - "Nested variable expansion is not valid ESI syntax and should not work" + let result = process_esi_document(input, req); + assert!( + result.is_err(), + "Nested variable expansion $($(var){{key}}) is not valid ESI syntax and should fail" ); } diff --git a/examples/esi_example_advanced_error_handling/Cargo.toml b/examples/esi_example_advanced_error_handling/Cargo.toml index 6886b4c..01bdea1 100644 --- a/examples/esi_example_advanced_error_handling/Cargo.toml +++ b/examples/esi_example_advanced_error_handling/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true publish = false [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "^0.11" diff --git a/examples/esi_example_minimal/Cargo.toml b/examples/esi_example_minimal/Cargo.toml index 1d7f6ee..2d3ce28 100644 --- a/examples/esi_example_minimal/Cargo.toml +++ b/examples/esi_example_minimal/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true publish = false [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "^0.11" diff --git a/examples/esi_example_variants/Cargo.toml b/examples/esi_example_variants/Cargo.toml index dff42b7..1910404 100644 --- a/examples/esi_example_variants/Cargo.toml +++ b/examples/esi_example_variants/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true publish = false [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "^0.11" diff --git a/examples/esi_try_example/Cargo.toml b/examples/esi_try_example/Cargo.toml index 642a01c..136b6ca 100644 --- a/examples/esi_try_example/Cargo.toml +++ b/examples/esi_try_example/Cargo.toml @@ -10,7 +10,7 @@ publish = false debug = 1 [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "0.11.3" diff --git a/examples/esi_vars_example/Cargo.toml b/examples/esi_vars_example/Cargo.toml index b3d99b1..e5d3a3f 100644 --- a/examples/esi_vars_example/Cargo.toml +++ b/examples/esi_vars_example/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true publish = false [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "^0.11" diff --git a/rust-toolchain b/rust-toolchain index 82e24bf..55f6ae9 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.90.0 +1.95.0