diff --git a/Cargo.lock b/Cargo.lock index 643461a..5d7536d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -385,7 +385,7 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "esi" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "atoi", "base64", @@ -407,7 +407,7 @@ dependencies = [ [[package]] name = "esi_example_advanced_error_handling" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -417,7 +417,7 @@ dependencies = [ [[package]] name = "esi_example_minimal" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -427,7 +427,7 @@ dependencies = [ [[package]] name = "esi_example_variants" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -437,7 +437,7 @@ dependencies = [ [[package]] name = "esi_try_example" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -447,7 +447,7 @@ dependencies = [ [[package]] name = "esi_vars_example" -version = "0.7.0-beta.4" +version = "0.7.0" dependencies = [ "env_logger", "esi", @@ -457,9 +457,9 @@ dependencies = [ [[package]] name = "fastly" -version = "0.11.13" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f767502306f09f6dcb76302d09cd2ea8542e228d5f155166f0c2da925e16c61" +checksum = "16393f187c703d5460d095201e194940a190479cd5a45aa7e324e8c97f4a3df4" dependencies = [ "anyhow", "bytes", @@ -485,9 +485,9 @@ dependencies = [ [[package]] name = "fastly-macros" -version = "0.11.13" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51ae08eeeb5ed0c1a8b454fc89dca0e316e13b7889e81fc9a435503c1e84a2d7" +checksum = "4e11b9b78e4d8d0fab4b9d7d8ba289c30d62d641e649e89153bc4d5446c88db2" dependencies = [ "proc-macro2", "quote", @@ -496,9 +496,9 @@ dependencies = [ [[package]] name = "fastly-shared" -version = "0.11.13" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d64ed1bba12ca45d1a2a80c2c55d903297adb3eeb4edc9d327c1d51ee709d404" +checksum = "e4ca5a9664c64b9f85188426aa1598e9885d6dbb247d6155fd9ebe043b551800" dependencies = [ "bitflags 1.3.2", "http", @@ -506,14 +506,15 @@ dependencies = [ [[package]] name = "fastly-sys" -version = "0.11.13" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1b82ebd99583740a074d8962ca75d7d17065b185a94e4919c3a3f2193268b6" +checksum = "d5dacc6ac7a7400e0b38757f48fbf1db09971812ef3dbb1f1a90a50746df662f" dependencies = [ "bitflags 1.3.2", "fastly-shared", + "http", "wasip2", - "wit-bindgen 0.46.0", + "wit-bindgen", ] [[package]] @@ -1367,7 +1368,7 @@ version = "1.0.2+wasi-0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" dependencies = [ - "wit-bindgen 0.51.0", + "wit-bindgen", ] [[package]] @@ -1376,7 +1377,7 @@ version = "0.4.0+wasi-0.3.0-rc-2026-01-06" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" dependencies = [ - "wit-bindgen 0.51.0", + "wit-bindgen", ] [[package]] @@ -1535,15 +1536,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "wit-bindgen" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" -dependencies = [ - "bitflags 2.11.1", -] - [[package]] name = "wit-bindgen" version = "0.51.0" diff --git a/Cargo.toml b/Cargo.toml index cbc9eae..a57ca1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ resolver = "2" [workspace.package] -version = "0.7.0-beta.4" +version = "0.7.0" authors = [ "Kailan Blanks ", "Vadim Getmanshchuk ", diff --git a/esi/Cargo.toml b/esi/Cargo.toml index 90adf4d..076e2c1 100644 --- a/esi/Cargo.toml +++ b/esi/Cargo.toml @@ -13,7 +13,7 @@ expose-internals = [] [dependencies] thiserror = "2.0.6" -fastly = "^0.11" +fastly = "^0.12" log = "^0.4" regex = "1.11.1" html-escape = "0.2.13" diff --git a/esi/src/lib.rs b/esi/src/lib.rs index 43032b4..cde5c70 100644 --- a/esi/src/lib.rs +++ b/esi/src/lib.rs @@ -797,17 +797,24 @@ impl Processor { match parse_result { Ok((remaining, elements)) => { - let mut handler = DocumentHandler { - processor: self, - output: output_writer, - dispatch_fragment_request: dispatcher, - fragment_response_handler: process_fragment_response, - }; - for element in elements { - handler.process(&element)?; - handler.process_queue()?; + { + let mut handler = DocumentHandler { + processor: self, + output: output_writer, + dispatch_fragment_request: dispatcher, + fragment_response_handler: process_fragment_response, + }; + for element in elements { + handler.process(&element)?; + handler.process_queue()?; + } } + // Flush any content written during this parse batch so it + // reaches the client immediately (progressive streaming) + // rather than buffering until drain_queue completes. + output_writer.flush()?; + if eof { // Nothing left to read — we're done break; @@ -1151,14 +1158,41 @@ impl Processor { ready => { // CompletedRequest or NoContent: process now. fragment.pending_fragment = ready; - let mut slot_buf = Vec::new(); - self.process_include( - *fragment, - &mut slot_buf, - dispatch_fragment_request, - process_fragment_response, - )?; - buf[slot] = Some(Bytes::from(slot_buf)); + + if slot == next_out { + // Head-of-line: stream directly to the + // client rather than buffering. + self.process_include( + *fragment, + output_writer, + dispatch_fragment_request, + process_fragment_response, + )?; + buf[slot] = Some(Bytes::new()); + next_out += 1; + + // Flush any subsequent ready slots. + while next_out < buf.len() { + match &buf[next_out] { + Some(bytes) => { + output_writer.write_all(bytes)?; + buf[next_out] = Some(Bytes::new()); + next_out += 1; + } + None => break, + } + } + output_writer.flush()?; + } else { + let mut slot_buf = Vec::new(); + self.process_include( + *fragment, + &mut slot_buf, + dispatch_fragment_request, + process_fragment_response, + )?; + buf[slot] = Some(Bytes::from(slot_buf)); + } // dca="esi" may push new items onto self.queue; // the outer while picks them up next iteration. } @@ -1325,6 +1359,11 @@ impl Processor { } } + // Flush written content to the client before potentially blocking + // on select(). Without this, data sits in the writer's buffer + // while we wait for slow includes, defeating streaming. + output_writer.flush()?; + // ------------------------------------------------------------------ // Step 3: done when nothing is pending. // ------------------------------------------------------------------ @@ -1399,16 +1438,47 @@ impl Processor { // ------------------------------------------------------- None => { fragment.pending_fragment = completed_content; - let mut slot_buf = Vec::new(); - self.process_include( - *fragment, - &mut slot_buf, - dispatch_fragment_request, - process_fragment_response, - )?; - buf[buf_slot] = Some(Bytes::from(slot_buf)); - // dca="esi" may push new QueuedElements onto self.queue. - // Loop back to Step 1 to assign them slots. + + if buf_slot == next_out { + // Head-of-line: write directly to the streaming + // output instead of buffering into a slot. This + // lets nested dca="esi" fragments stream + // incrementally — their isolated drain_queue + // writes (and flushes) to the real client writer + // rather than to a Vec that only surfaces later. + self.process_include( + *fragment, + output_writer, + dispatch_fragment_request, + process_fragment_response, + )?; + buf[buf_slot] = Some(Bytes::new()); // mark consumed + next_out += 1; + + // Advance past any subsequent ready slots that + // can now be flushed contiguously. + while next_out < buf.len() { + match &buf[next_out] { + Some(bytes) => { + output_writer.write_all(bytes)?; + buf[next_out] = Some(Bytes::new()); + next_out += 1; + } + None => break, + } + } + output_writer.flush()?; + } else { + let mut slot_buf = Vec::new(); + self.process_include( + *fragment, + &mut slot_buf, + dispatch_fragment_request, + process_fragment_response, + )?; + buf[buf_slot] = Some(Bytes::from(slot_buf)); + } + // Loop back to Step 1 to pick up any new queue items. } // ------------------------------------------------------- diff --git a/esi/tests/esi_tests.rs b/esi/tests/esi_tests.rs index 27bdfd5..cdb44fc 100644 --- a/esi/tests/esi_tests.rs +++ b/esi/tests/esi_tests.rs @@ -202,11 +202,8 @@ fn test_esi_choose_compatibility_not_equal() { // Test for nested variable expansion - INVALID ESI SYNTAX // The construct $($(outer){param}) is NOT valid Akamai ESI syntax. // Akamai's ESI does not support nested variable expansion like this. -// This test was checking that it doesn't work, but the syntax is so invalid -// that different parsers may handle it differently (error vs. pass-through). #[test] -#[ignore] // Invalid ESI syntax - $($(var){key}) is not supported by Akamai ESI spec -fn test_nested_subfields() { +fn test_nested_subfields_is_invalid() { let input = r#" @@ -214,11 +211,10 @@ fn test_nested_subfields() { "#; let req = Request::get("http://example.com?param=value"); - let result = process_esi_document(input, req).expect("Processing should succeed"); - assert_ne!( - result.trim(), - "value", - "Nested variable expansion is not valid ESI syntax and should not work" + let result = process_esi_document(input, req); + assert!( + result.is_err(), + "Nested variable expansion $($(var){{key}}) is not valid ESI syntax and should fail" ); } diff --git a/esi/tests/eval_tests.rs b/esi/tests/eval_tests.rs index c570d3d..b6e19fe 100644 --- a/esi/tests/eval_tests.rs +++ b/esi/tests/eval_tests.rs @@ -1,5 +1,5 @@ use esi::{Configuration, Processor}; -use fastly::{Request, Response}; +use fastly::{Backend, Request, Response}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; @@ -418,3 +418,155 @@ fn test_triple_nested_dca_esi_document_order() -> esi::Result<()> { ); Ok(()) } + +/// Writer that snapshots its buffer on every flush(), so we can check +/// what data was available to the client at each flush point. +struct FlushTrackingWriter { + data: Vec, + /// Snapshots of `data` taken at each flush() call. + flush_snapshots: Vec>, +} + +impl FlushTrackingWriter { + fn new() -> Self { + Self { + data: Vec::new(), + flush_snapshots: Vec::new(), + } + } +} + +impl std::io::Write for FlushTrackingWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.data.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.flush_snapshots.push(self.data.clone()); + Ok(()) + } +} + +/// Regression: content before an include should be flushed, not held +/// until the include resolves. Without flush() the StreamingBody buffers +/// everything and the client sees nothing until processing finishes. +#[test] +fn test_streaming_flush_before_pending_include() -> esi::Result<()> { + let input = r#" +
Header
+ +
Footer
+"#; + + let dispatcher = + |_req: Request, _maxwait: Option| -> esi::Result { + Ok(esi::PendingFragmentContent::CompletedRequest(Box::new( + Response::from_body("FRAGMENT"), + ))) + }; + + let reader = std::io::BufReader::new(std::io::Cursor::new(input.as_bytes())); + let mut writer = FlushTrackingWriter::new(); + let mut processor = Processor::new(None, Configuration::default()); + processor.process_stream(reader, &mut writer, Some(&dispatcher), None)?; + + let result = String::from_utf8(writer.data).unwrap(); + assert!( + result.contains("
Header
"), + "Should contain header" + ); + assert!(result.contains("FRAGMENT"), "Should contain fragment"); + assert!( + result.contains("
Footer
"), + "Should contain footer" + ); + + assert!( + !writer.flush_snapshots.is_empty(), + "expected at least one flush during processing" + ); + + // The content before the include must already be present at the first flush. + let first_flush = String::from_utf8_lossy(&writer.flush_snapshots[0]); + assert!( + first_flush.contains("
Header
"), + "First flush should contain content before the include. Got: {first_flush}" + ); + + Ok(()) +} + +/// End-to-end streaming test with a real slow backend (httpbin.org/delay/1). +/// +/// The outer doc has fast content before a dca="esi" include whose fragment +/// itself contains a slow nested include. We verify that the fast content +/// is flushed to the client *before* the slow include resolves — i.e. the +/// stream doesn't stall at the outermost include boundary. +/// +/// Requires network; ~1s. Run with: `cargo test -- --ignored` +#[test] +#[ignore] +fn test_nested_dca_esi_real_async_streaming() -> esi::Result<()> { + let input = r#"

Title

+ +
End
"#; + + let dispatcher = + |req: Request, _maxwait: Option| -> esi::Result { + let url = req.get_url_str().to_string(); + if url.contains("example.com/parent") { + // Immediate response; body has a nested include that hits a slow endpoint + Ok(esi::PendingFragmentContent::CompletedRequest(Box::new( + Response::from_body( + "

Fast content

\n", + ), + ))) + } else { + // Actually hit httpbin — this is the slow request + let backend = Backend::builder("httpbin.org", "httpbin.org") + .enable_ssl() + .sni_hostname("httpbin.org") + .finish() + .map_err(|e| { + esi::ESIError::FragmentRequestError(format!( + "failed to create httpbin backend: {e}" + )) + })?; + let pending = req.send_async(backend)?; + Ok(esi::PendingFragmentContent::PendingRequest(Box::new( + pending, + ))) + } + }; + + let reader = std::io::BufReader::new(std::io::Cursor::new(input.as_bytes())); + let mut writer = FlushTrackingWriter::new(); + let mut processor = Processor::new(None, Configuration::default()); + processor.process_stream(reader, &mut writer, Some(&dispatcher), None)?; + + let result = String::from_utf8(writer.data).unwrap(); + + assert!(result.contains("

Title

")); + assert!(result.contains("

Fast content

")); + assert!(result.contains("
End
")); + + // The key check: some flush snapshot must contain the fast content but + // NOT the footer. The footer can't appear until the slow nested include + // finishes, so if we see this split it means the stream didn't stall. + let has_incremental_flush = writer.flush_snapshots.iter().any(|snap| { + let s = String::from_utf8_lossy(snap); + s.contains("

Fast content

") && !s.contains("
End
") + }); + assert!( + has_incremental_flush, + "fast content should be flushed before the footer; snapshots: {:?}", + writer + .flush_snapshots + .iter() + .map(|s| String::from_utf8_lossy(s).to_string()) + .collect::>() + ); + + Ok(()) +} diff --git a/examples/esi_example_advanced_error_handling/Cargo.toml b/examples/esi_example_advanced_error_handling/Cargo.toml index 6886b4c..01bdea1 100644 --- a/examples/esi_example_advanced_error_handling/Cargo.toml +++ b/examples/esi_example_advanced_error_handling/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true publish = false [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "^0.11" diff --git a/examples/esi_example_minimal/Cargo.toml b/examples/esi_example_minimal/Cargo.toml index 1d7f6ee..2d3ce28 100644 --- a/examples/esi_example_minimal/Cargo.toml +++ b/examples/esi_example_minimal/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true publish = false [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "^0.11" diff --git a/examples/esi_example_variants/Cargo.toml b/examples/esi_example_variants/Cargo.toml index dff42b7..1910404 100644 --- a/examples/esi_example_variants/Cargo.toml +++ b/examples/esi_example_variants/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true publish = false [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "^0.11" diff --git a/examples/esi_try_example/Cargo.toml b/examples/esi_try_example/Cargo.toml index 642a01c..136b6ca 100644 --- a/examples/esi_try_example/Cargo.toml +++ b/examples/esi_try_example/Cargo.toml @@ -10,7 +10,7 @@ publish = false debug = 1 [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "0.11.3" diff --git a/examples/esi_vars_example/Cargo.toml b/examples/esi_vars_example/Cargo.toml index b3d99b1..e5d3a3f 100644 --- a/examples/esi_vars_example/Cargo.toml +++ b/examples/esi_vars_example/Cargo.toml @@ -7,7 +7,7 @@ edition.workspace = true publish = false [dependencies] -fastly = "^0.11" +fastly = "^0.12" esi = { path = "../../esi" } log = "^0.4" env_logger = "^0.11" diff --git a/rust-toolchain b/rust-toolchain index 82e24bf..55f6ae9 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.90.0 +1.95.0