From 340cdae5150d8f20610d0942a7d2d9f2cc0d2474 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Thu, 7 Jan 2021 17:20:06 +0100 Subject: [PATCH 01/38] Update dependency versions, use tokio-native-tls. Update to Rust 2018 and async/await. --- Cargo.toml | 27 +++---- README.md | 27 ++----- examples/follower.rs | 9 +-- src/event.rs | 4 +- src/lib.rs | 152 +++++++++++++--------------------------- tests/changes-stream.rs | 4 +- 6 files changed, 75 insertions(+), 148 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 04ea12039..2de2de854 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,19 +1,22 @@ [package] name = "changes-stream" description = "couchdb follower" -version = "0.1.0" -authors = ["Ashley Williams "] +version = "0.2.0" +authors = [ + "Ashley Williams ", + "René Rössler ", +] license = "MIT" +edition = "2018" [dependencies] -futures = "0.1.10" -tokio-core = "0.1.4" -serde = "0.8" -serde_json = "0.8" -serde_derive = "0.8" +tokio = "1.0" +serde = "1.0" +serde_json = "1.0" +serde_derive = "1.0" +hyper = { version = "0.14", features = ["client", "http1", "stream"] } +hyper-tls = "0.5" +tokio-native-tls = "0.3.0" -[dependencies.hyper] -git = "https://github.com/hyperium/hyper.git" - -[dependencies.hyper-tls] -git = "https://github.com/hyperium/hyper-tls.git" +[dev-dependencies] +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } diff --git a/README.md b/README.md index 43ae7863a..7714bd77b 100644 --- a/README.md +++ b/README.md @@ -7,14 +7,6 @@ an implementation of [`changes-stream`](https://github.com/jcrugzz/changes-strea this code reads in a readable stream from an endpoint and returns each chunk in JSON. -this code works off of the [`tokio` branch] of [`hyper`] to take advantage of new Rust Futures. - -[`tokio` branch]: https://github.com/hyperium/hyper/tree/tokio -[`hyper`]: https:///crates.io/crates/hyper - -`changes-stream-rust` only works on nightly because it uses [`serde`]. - -[`serde`]: https://crates.io/crates/serde ## usage @@ -22,28 +14,21 @@ in your `Cargo.toml`: ```toml [dependencies] -changes-stream = { git = "https://github.com/ashleygwilliams/changes-stream-rust.git" } +changes-stream = "0.2" ``` from [examples/follower.rs](/examples/follower.rs): ```rust -extern crate changes_stream; -extern crate futures; - -use std::io; -use std::io::Write; - use changes_stream::ChangesStream; -fn main() { - let url = "https://replicate.npmjs.com/_changes".to_string(); +#[tokio::main] +async fn main() { + let url = String::from("https://replicate.npmjs.com/_changes"); let mut changes = ChangesStream::new(url); - changes.on(|change| { - io::stdout().write_all(&change).unwrap(); + println!("{}: {}", change.seq, change.id); }); - - changes.run(); + changes.run().await; } ``` diff --git a/examples/follower.rs b/examples/follower.rs index d256463d3..347e02f19 100644 --- a/examples/follower.rs +++ b/examples/follower.rs @@ -1,14 +1,11 @@ -extern crate serde_json; -extern crate changes_stream; -extern crate futures; - use changes_stream::ChangesStream; -fn main() { +#[tokio::main] +async fn main() { let url = String::from("https://replicate.npmjs.com/_changes"); let mut changes = ChangesStream::new(url); changes.on(|change| { println!("{}: {}", change.seq, change.id); }); - changes.run(); + changes.run().await; } diff --git a/src/event.rs b/src/event.rs index a9ab5c3bb..9118906c8 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,8 +1,10 @@ +use serde_derive::{Deserialize, Serialize}; + #[derive(Serialize, Deserialize, Debug)] pub struct Event { pub seq: u64, pub id: String, - pub changes: Vec + pub changes: Vec, } #[derive(Serialize, Deserialize, Debug)] diff --git a/src/lib.rs b/src/lib.rs index ad06e5745..4883f1bb8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,26 +4,15 @@ //! chunked data, upon which you can register multiple handlers, that are //! called on Read of the data chunk. -#[macro_use] -extern crate serde_derive; - -extern crate serde; -extern crate serde_json; -extern crate futures; -extern crate hyper; -extern crate hyper_tls; -extern crate tokio_core; - -use futures::Future; -use futures::stream::Stream; -use hyper::Client; +use hyper::{body::HttpBody, client::Client, Body}; +use hyper_tls::HttpsConnector; use std::cell::RefCell; mod event; use event::Event; -const DELIMITER: &'static str = ",\n"; -const PROLOGUE: &'static str = "{\"results\":["; +const DELIMITER: &str = ",\n"; +const PROLOGUE: &str = "{\"results\":["; /// A structure to generate a readable stream on which you can register handlers. /// @@ -39,8 +28,7 @@ const PROLOGUE: &'static str = "{\"results\":["; /// [`hyper::Chunk`]: ../hyper/struct.Chunk.html pub struct ChangesStream { db: hyper::Uri, - lp: tokio_core::reactor::Core, - handlers: Vec>, + handlers: Vec>, } impl ChangesStream { @@ -50,23 +38,16 @@ impl ChangesStream { /// url of the data you wish to stream. /// /// Every `ChangesStream` struct is initialized with - /// an event loop ([`tokio_core::reactor::Core`]) and an - /// empty vector of handlers. See above for more details. - /// - /// [`tokio_core::reactor::Core`]: ../tokio_core/reactor/struct.Core.html + /// an empty vector of handlers. See above for more details. /// /// For example, to create a new `ChangesStream` struct /// for the npmjs registry, you would write: /// /// ```no_run - /// # extern crate serde_json; - /// # extern crate changes_stream; - /// # extern crate futures; - /// # - /// # /// # use changes_stream::ChangesStream; /// # - /// # fn main() { + /// # #[tokio::main] + /// # async fn main() { /// let url = "https://replicate.npmjs.com/_changes".to_string(); /// let mut changes = ChangesStream::new(url); /// # @@ -75,13 +56,12 @@ impl ChangesStream { /// # println!("{}", data); /// # }); /// # - /// # changes.run(); + /// # changes.run().await; /// # } /// ``` pub fn new(db: String) -> ChangesStream { ChangesStream { db: db.parse().unwrap(), - lp: tokio_core::reactor::Core::new().unwrap(), handlers: vec![], } } @@ -100,13 +80,10 @@ impl ChangesStream { /// out, you would write: /// /// ```no_run - /// # extern crate serde_json; - /// # extern crate changes_stream; - /// # extern crate futures; - /// # /// # use changes_stream::ChangesStream; /// # - /// # fn main() { + /// # #[tokio::main] + /// # async fn main() { /// # let url = "https://replicate.npmjs.com/_changes".to_string(); /// # let mut changes = ChangesStream::new(url); /// # @@ -122,7 +99,7 @@ impl ChangesStream { self.handlers.push(Box::new(handler)); } - /// Runs the `ChangesStream` struct's event loop, `lp`. + /// Runs the `ChangesStream` struct's event loop. /// /// Call this after you have regsitered all handlers using /// `on`. @@ -132,13 +109,10 @@ impl ChangesStream { /// For example: /// /// ```no_run - /// # extern crate serde_json; - /// # extern crate changes_stream; - /// # extern crate futures; - /// # /// # use changes_stream::ChangesStream; /// # - /// # fn main() { + /// # #[tokio::main] + /// # async fn main() { /// # let url = "https://replicate.npmjs.com/_changes".to_string(); /// # let mut changes = ChangesStream::new(url); /// # @@ -150,75 +124,43 @@ impl ChangesStream { /// changes.run(); /// # } /// ``` - pub fn run(mut self) { - let handle = self.lp.handle(); - let client = Client::configure() - // 4 is number of threads to use for dns resolution - .connector(hyper_tls::HttpsConnector::new(4, &handle)) - .build(&handle); - - let handlers = self.handlers; - self.lp - .run(client.get(self.db).and_then(move |res| { - assert!(res.status().is_success()); - - // Buffer up incomplete json lines. - let buffer: Vec = vec![]; - let buffer_cell = RefCell::new(buffer); - - res.body().for_each(move |chunk| { - if chunk.starts_with(PROLOGUE.as_bytes()) { - return Ok(()); - } - let mut source = chunk.to_vec(); - let mut borrowed = buffer_cell.borrow_mut(); - if borrowed.len() > 0 { - source = [borrowed.clone(), chunk.to_vec()].concat(); - borrowed.clear(); - } - if chunk.starts_with(DELIMITER.as_bytes()) { - source = chunk[2..].to_vec(); - } - - match serde_json::from_slice(source.as_slice()) { - Err(_) => { - // We probably have an incomplete chunk of json. Buffer it & move on. - borrowed.append(&mut chunk.to_vec()); - } - Ok(json) => { - let event: Event = json; - for handler in &handlers { - handler(&event); - } - } - } - Ok(()) - }) - })) - .unwrap(); - } - - pub fn run_with(self, mut lp: tokio_core::reactor::Core) { - let handle = self.lp.handle(); - let client = Client::configure() - // 4 is number of threads to use for dns resolution - .connector(hyper_tls::HttpsConnector::new(4, &handle)) - .build(&handle); + pub async fn run(self) { + let client = Client::builder().build::<_, Body>(HttpsConnector::new()); let handlers = self.handlers; - lp - .run(client.get(self.db).and_then(move |res| { - assert!(res.status().is_success()); - - res.body().for_each(move |chunk| { - let event: Event = serde_json::from_slice(&chunk).unwrap(); + let mut res = client.get(self.db).await.unwrap(); + assert!(res.status().is_success()); + + // Buffer up incomplete json lines. + let buffer: Vec = vec![]; + let buffer_cell = RefCell::new(buffer); + + while let Some(Ok(chunk)) = res.body_mut().data().await { + if chunk.starts_with(PROLOGUE.as_bytes()) { + continue; + } + let mut source = chunk.to_vec(); + let mut borrowed = buffer_cell.borrow_mut(); + if borrowed.len() > 0 { + source = [borrowed.clone(), chunk.to_vec()].concat(); + borrowed.clear(); + } + if chunk.starts_with(DELIMITER.as_bytes()) { + source = chunk[2..].to_vec(); + } + + match serde_json::from_slice(source.as_slice()) { + Err(_) => { + // We probably have an incomplete chunk of json. Buffer it & move on. + borrowed.append(&mut chunk.to_vec()); + } + Ok(json) => { + let event: Event = json; for handler in &handlers { handler(&event); } - Ok(()) - }) - })) - .unwrap(); - + } + } + } } } diff --git a/tests/changes-stream.rs b/tests/changes-stream.rs index 7b3564ab3..69d6141a8 100644 --- a/tests/changes-stream.rs +++ b/tests/changes-stream.rs @@ -1,9 +1,7 @@ -extern crate changes_stream; - use changes_stream::ChangesStream; #[test] fn test_new() { let db = String::from("https://replicate.npmjs.com/_changes"); - let mut changes = ChangesStream::new(db); + let _changes = ChangesStream::new(db); } From 9ee9fe33647d3e4213b7d9a39abad2415a1767d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 11 Jan 2021 12:49:28 +0100 Subject: [PATCH 02/38] Switch to reqwest for basic_auth support. Implement futures::Stream instead of handlers logic. Remove test case which didn't test anything. --- Cargo.toml | 6 +- examples/follower.rs | 21 ++-- src/event.rs | 24 ++++- src/lib.rs | 209 +++++++++++++++------------------------- tests/changes-stream.rs | 7 -- 5 files changed, 117 insertions(+), 150 deletions(-) delete mode 100644 tests/changes-stream.rs diff --git a/Cargo.toml b/Cargo.toml index 2de2de854..68850386f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,9 +14,9 @@ tokio = "1.0" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -hyper = { version = "0.14", features = ["client", "http1", "stream"] } -hyper-tls = "0.5" -tokio-native-tls = "0.3.0" +reqwest = { version = "0.11.0", features = ["stream"] } +futures-util = "0.3" +bytes = "1.0.0" [dev-dependencies] tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } diff --git a/examples/follower.rs b/examples/follower.rs index 347e02f19..7b9c1b670 100644 --- a/examples/follower.rs +++ b/examples/follower.rs @@ -1,11 +1,18 @@ -use changes_stream::ChangesStream; +use changes_stream::{ChangesStream, Event}; +use futures_util::stream::StreamExt; #[tokio::main] async fn main() { - let url = String::from("https://replicate.npmjs.com/_changes"); - let mut changes = ChangesStream::new(url); - changes.on(|change| { - println!("{}: {}", change.seq, change.id); - }); - changes.run().await; + let url = "https://replicate.npmjs.com/_changes".to_string(); + let mut changes = ChangesStream::new(url).await; + while let Some(event) = changes.next().await { + match event { + Event::Change(change) => { + println!("{}: {}", change.seq, change.id); + } + Event::Finished(finished) => { + println!("Finished: {}", finished.last_seq); + } + } + } } diff --git a/src/event.rs b/src/event.rs index 9118906c8..12210e685 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,10 +1,30 @@ use serde_derive::{Deserialize, Serialize}; +use serde_json::Value; #[derive(Serialize, Deserialize, Debug)] -pub struct Event { - pub seq: u64, +#[serde(untagged)] +pub enum Event { + Change(ChangeEvent), + Finished(FinishedEvent), +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ChangeEvent { + pub seq: Value, pub id: String, pub changes: Vec, + + #[serde(default)] + pub deleted: bool, + + #[serde(default)] + pub doc: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct FinishedEvent { + pub last_seq: Value, + pub pending: u64, } #[derive(Serialize, Deserialize, Debug)] diff --git a/src/lib.rs b/src/lib.rs index 4883f1bb8..8b9ba4eb3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,34 +1,19 @@ -//! This is documentation for the `changes-stream` crate. -//! -//! The `changes-stream` crate is designed to give you a readable stream of -//! chunked data, upon which you can register multiple handlers, that are -//! called on Read of the data chunk. +//! The `changes-stream` crate is designed to give you a +//! futures::Stream of CouchDB changes stream events. -use hyper::{body::HttpBody, client::Client, Body}; -use hyper_tls::HttpsConnector; -use std::cell::RefCell; +use bytes::Bytes; +use futures_util::stream::Stream; +use std::{pin::Pin, task::Poll}; mod event; -use event::Event; +pub use event::Event; -const DELIMITER: &str = ",\n"; -const PROLOGUE: &str = "{\"results\":["; - -/// A structure to generate a readable stream on which you can register handlers. -/// -/// Internally, the `ChangesStream` struct holds 3 members: -/// -/// | Member | Type | Notes | -/// |-------------|---------------------------------------|-------------------------------------------------------------------------| -/// | `db` | `String` | A url pointing to the data you'd like to stream. | -/// | `lp` | [`tokio_core::reactor::Core`] | The event loop | -/// | `handlers` | `Vec where F: Fn(&`[`hyper::Chunk`]`)` | A vector of handlers to be called on each Chunk from the Stream on Read | -/// -/// [`tokio_core::reactor::Core`]: ../tokio_core/reactor/struct.Core.html -/// [`hyper::Chunk`]: ../hyper/struct.Chunk.html +/// A structure which implements futures::Stream pub struct ChangesStream { - db: hyper::Uri, - handlers: Vec>, + /// for incomplete line chunks + buffer: Vec, + /// Source of http chunks provided by reqwest + source: Pin>>>, } impl ChangesStream { @@ -37,129 +22,91 @@ impl ChangesStream { /// Takes a single argument, `db`, which represents the /// url of the data you wish to stream. /// - /// Every `ChangesStream` struct is initialized with - /// an empty vector of handlers. See above for more details. - /// /// For example, to create a new `ChangesStream` struct /// for the npmjs registry, you would write: /// /// ```no_run - /// # use changes_stream::ChangesStream; + /// # use changes_stream::{ChangesStream, Event}; + /// # use futures_util::stream::StreamExt; /// # /// # #[tokio::main] /// # async fn main() { - /// let url = "https://replicate.npmjs.com/_changes".to_string(); - /// let mut changes = ChangesStream::new(url); - /// # - /// # changes.on(|change| { - /// # let data = serde_json::to_string(change).unwrap(); - /// # println!("{}", data); - /// # }); - /// # - /// # changes.run().await; + /// # let url = "https://replicate.npmjs.com/_changes".to_string(); + /// # let mut changes = ChangesStream::new(url).await; + /// # while let Some(event) = changes.next().await { + /// # match event { + /// # Event::Change(change) => { + /// # println!("{}: {}", change.seq, change.id); + /// # } + /// # Event::Finished(finished) => { + /// # println!("Finished: {}", finished.last_seq); + /// # } + /// # } + /// # } /// # } /// ``` - pub fn new(db: String) -> ChangesStream { + pub async fn new(db: String) -> ChangesStream { + let res = reqwest::get(&db).await.unwrap(); + assert!(res.status().is_success()); + let source = Pin::new(Box::new(res.bytes_stream())); + ChangesStream { - db: db.parse().unwrap(), - handlers: vec![], + buffer: vec![], + source, } } +} - /// Registers a handler. A handler is simply a function - /// you'd like to call on a chunk from the stream at the - /// time the chunk is read. - /// - /// `.on()` takes a single argument, a closure. The - /// closure you pass should take a single [`hyper::Chunk`] - /// as an argument. - /// - /// [`hyper::Chunk`]: ../hyper/struct.Chunk.html - /// - /// For example, to write the data in a chunk to standard - /// out, you would write: - /// - /// ```no_run - /// # use changes_stream::ChangesStream; - /// # - /// # #[tokio::main] - /// # async fn main() { - /// # let url = "https://replicate.npmjs.com/_changes".to_string(); - /// # let mut changes = ChangesStream::new(url); - /// # - /// changes.on(|change| { - /// let data = serde_json::to_string(change).unwrap(); - /// println!("{}", data); - /// }); - /// # - /// # changes.run(); - /// # } - /// ``` - pub fn on(&mut self, handler: F) { - self.handlers.push(Box::new(handler)); - } - - /// Runs the `ChangesStream` struct's event loop. - /// - /// Call this after you have regsitered all handlers using - /// `on`. - /// - /// Takes no arguments. - /// - /// For example: - /// - /// ```no_run - /// # use changes_stream::ChangesStream; - /// # - /// # #[tokio::main] - /// # async fn main() { - /// # let url = "https://replicate.npmjs.com/_changes".to_string(); - /// # let mut changes = ChangesStream::new(url); - /// # - /// # changes.on(|change| { - /// # let data = serde_json::to_string(change).unwrap(); - /// # println!("{}", data); - /// # }); - /// # - /// changes.run(); - /// # } - /// ``` - pub async fn run(self) { - let client = Client::builder().build::<_, Body>(HttpsConnector::new()); - - let handlers = self.handlers; - let mut res = client.get(self.db).await.unwrap(); - assert!(res.status().is_success()); - - // Buffer up incomplete json lines. - let buffer: Vec = vec![]; - let buffer_cell = RefCell::new(buffer); +impl Stream for ChangesStream { + type Item = Event; - while let Some(Ok(chunk)) = res.body_mut().data().await { - if chunk.starts_with(PROLOGUE.as_bytes()) { - continue; - } - let mut source = chunk.to_vec(); - let mut borrowed = buffer_cell.borrow_mut(); - if borrowed.len() > 0 { - source = [borrowed.clone(), chunk.to_vec()].concat(); - borrowed.clear(); - } - if chunk.starts_with(DELIMITER.as_bytes()) { - source = chunk[2..].to_vec(); - } + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + loop { + let line_break_pos = self + .buffer + .iter() + .enumerate() + .find(|(_pos, b)| **b == 0x0A) // search for \n + .map(|(pos, _b)| pos); + if let Some(line_break_pos) = line_break_pos { + let mut line = self.buffer.drain(0..=line_break_pos).collect::>(); - match serde_json::from_slice(source.as_slice()) { - Err(_) => { - // We probably have an incomplete chunk of json. Buffer it & move on. - borrowed.append(&mut chunk.to_vec()); + if line.len() < 15 { + // skip prologue, epilogue and empty lines (continuous mode) + continue; + } + line.remove(line.len() - 1); // remove \n + if line[line.len() - 1] == 0x0D { + // 0x0D is '\r'. CouchDB >= 2.0 sends "\r\n" + line.remove(line.len() - 1); } - Ok(json) => { - let event: Event = json; - for handler in &handlers { - handler(&event); + if line[line.len() - 1] == 0x2C { + // 0x2C is ',' + line.remove(line.len() - 1); // remove , + } + match serde_json::from_slice(line.as_slice()) { + Err(err) => { + panic!( + "Error: {:?} \"{}\"", + err, + std::str::from_utf8(line.as_slice()).unwrap() + ); + } + Ok(json) => { + let event: Event = json; + return Poll::Ready(Some(event)); } } + } else { + match Stream::poll_next(self.source.as_mut(), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(Ok(chunk))) => self.buffer.append(&mut chunk.to_vec()), + Poll::Ready(Some(Err(err))) => panic!("Error getting next chunk: {:?}", err), + }; } } } diff --git a/tests/changes-stream.rs b/tests/changes-stream.rs deleted file mode 100644 index 69d6141a8..000000000 --- a/tests/changes-stream.rs +++ /dev/null @@ -1,7 +0,0 @@ -use changes_stream::ChangesStream; - -#[test] -fn test_new() { - let db = String::from("https://replicate.npmjs.com/_changes"); - let _changes = ChangesStream::new(db); -} From 7d8c887b360b1ee15b94bb5feb3a7e1d6e418407 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 11 Jan 2021 13:07:38 +0100 Subject: [PATCH 03/38] error handling --- examples/follower.rs | 11 ++++------ src/error.rs | 6 ++++++ src/lib.rs | 48 +++++++++++++++++++++++--------------------- 3 files changed, 35 insertions(+), 30 deletions(-) create mode 100644 src/error.rs diff --git a/examples/follower.rs b/examples/follower.rs index 7b9c1b670..5a7a89b70 100644 --- a/examples/follower.rs +++ b/examples/follower.rs @@ -4,15 +4,12 @@ use futures_util::stream::StreamExt; #[tokio::main] async fn main() { let url = "https://replicate.npmjs.com/_changes".to_string(); - let mut changes = ChangesStream::new(url).await; + let mut changes = ChangesStream::new(url).await.unwrap(); while let Some(event) = changes.next().await { match event { - Event::Change(change) => { - println!("{}: {}", change.seq, change.id); - } - Event::Finished(finished) => { - println!("Finished: {}", finished.last_seq); - } + Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id), + Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq), + Err(err) => println!("Error: {:?}", err), } } } diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 000000000..1b2ec5a5b --- /dev/null +++ b/src/error.rs @@ -0,0 +1,6 @@ +#[derive(Debug)] +pub enum Error { + RequestFailed(reqwest::Error), + InvalidStatus(reqwest::StatusCode), + ParsingFailed(serde_json::Error, String), +} diff --git a/src/lib.rs b/src/lib.rs index 8b9ba4eb3..f39edd590 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,9 @@ use bytes::Bytes; use futures_util::stream::Stream; use std::{pin::Pin, task::Poll}; +mod error; mod event; +pub use error::Error; pub use event::Event; /// A structure which implements futures::Stream @@ -32,33 +34,33 @@ impl ChangesStream { /// # #[tokio::main] /// # async fn main() { /// # let url = "https://replicate.npmjs.com/_changes".to_string(); - /// # let mut changes = ChangesStream::new(url).await; + /// # let mut changes = ChangesStream::new(url).await.unwrap(); /// # while let Some(event) = changes.next().await { /// # match event { - /// # Event::Change(change) => { - /// # println!("{}: {}", change.seq, change.id); - /// # } - /// # Event::Finished(finished) => { - /// # println!("Finished: {}", finished.last_seq); - /// # } + /// # Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id), + /// # Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq), + /// # Err(err) => println!("Error: {:?}", err), /// # } /// # } /// # } /// ``` - pub async fn new(db: String) -> ChangesStream { - let res = reqwest::get(&db).await.unwrap(); - assert!(res.status().is_success()); + pub async fn new(db: String) -> Result { + let res = reqwest::get(&db).await.map_err(Error::RequestFailed)?; + let status = res.status(); + if !status.is_success() { + return Err(Error::InvalidStatus(status)); + } let source = Pin::new(Box::new(res.bytes_stream())); - ChangesStream { + Ok(ChangesStream { buffer: vec![], source, - } + }) } } impl Stream for ChangesStream { - type Item = Event; + type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -87,19 +89,19 @@ impl Stream for ChangesStream { // 0x2C is ',' line.remove(line.len() - 1); // remove , } - match serde_json::from_slice(line.as_slice()) { - Err(err) => { - panic!( - "Error: {:?} \"{}\"", - err, - std::str::from_utf8(line.as_slice()).unwrap() - ); - } + + let result = match serde_json::from_slice(line.as_slice()) { + Err(err) => Err(Error::ParsingFailed( + err, + String::from_utf8(line).unwrap_or_default(), + )), Ok(json) => { let event: Event = json; - return Poll::Ready(Some(event)); + Ok(event) } - } + }; + + return Poll::Ready(Some(result)); } else { match Stream::poll_next(self.source.as_mut(), cx) { Poll::Pending => return Poll::Pending, From c53fc1a6bfef8ac8375e571ac5a7675bdd4cad37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 11 Jan 2021 14:18:53 +0100 Subject: [PATCH 04/38] update Readme --- README.md | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 7714bd77b..c21dee2f0 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ an implementation of [`changes-stream`](https://github.com/jcrugzz/changes-stream) in Rust. -this code reads in a readable stream from an endpoint and returns each chunk in JSON. +this code reads in a readable stream from an endpoint, parses each line and returns CouchDB changes events as defined in [lib/event.rs](/lib/event.rs). ## usage @@ -20,15 +20,19 @@ changes-stream = "0.2" from [examples/follower.rs](/examples/follower.rs): ```rust -use changes_stream::ChangesStream; +use changes_stream::{ChangesStream, Event}; +use futures_util::stream::StreamExt; #[tokio::main] async fn main() { - let url = String::from("https://replicate.npmjs.com/_changes"); - let mut changes = ChangesStream::new(url); - changes.on(|change| { - println!("{}: {}", change.seq, change.id); - }); - changes.run().await; + let url = "https://replicate.npmjs.com/_changes".to_string(); + let mut changes = ChangesStream::new(url).await.unwrap(); + while let Some(event) = changes.next().await { + match event { + Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id), + Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq), + Err(err) => println!("Error: {:?}", err), + } + } } ``` From 61a3525e7355115f53f44f0d9afd9d7cf3217bd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 11 Jan 2021 14:20:26 +0100 Subject: [PATCH 05/38] fix path --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c21dee2f0..de3af975e 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ an implementation of [`changes-stream`](https://github.com/jcrugzz/changes-stream) in Rust. -this code reads in a readable stream from an endpoint, parses each line and returns CouchDB changes events as defined in [lib/event.rs](/lib/event.rs). +this code reads in a readable stream from an endpoint, parses each line and returns CouchDB changes events as defined in [src/event.rs](/src/event.rs). ## usage From 2a6b1807921cd5c99b3271792ba7b2c37f7f600a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 11 Jan 2021 22:01:04 +0100 Subject: [PATCH 06/38] add Send to source to make the whole Stream Send --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index f39edd590..48512ddc0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,7 +15,7 @@ pub struct ChangesStream { /// for incomplete line chunks buffer: Vec, /// Source of http chunks provided by reqwest - source: Pin>>>, + source: Pin> + Send>>, } impl ChangesStream { From a38c9f5d057ef097871f56dd059c0f9829386588 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Thu, 14 Jan 2021 13:39:20 +0100 Subject: [PATCH 07/38] make types public --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 48512ddc0..34e3e4b18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ use std::{pin::Pin, task::Poll}; mod error; mod event; pub use error::Error; -pub use event::Event; +pub use event::{Change, ChangeEvent, Event, FinishedEvent}; /// A structure which implements futures::Stream pub struct ChangesStream { From 481158556cfe6f88a5d273bd0db7c45a0e7c6028 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Thu, 14 Jan 2021 14:47:10 +0100 Subject: [PATCH 08/38] make pending optional --- src/event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/event.rs b/src/event.rs index 12210e685..24891c5cf 100644 --- a/src/event.rs +++ b/src/event.rs @@ -24,7 +24,7 @@ pub struct ChangeEvent { #[derive(Serialize, Deserialize, Debug)] pub struct FinishedEvent { pub last_seq: Value, - pub pending: u64, + pub pending: Option, // not available on CouchDB 1.0 } #[derive(Serialize, Deserialize, Debug)] From 607ac0545d78cf3fe4c0d86b14ac7fe03ec3b426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 18 Jan 2021 23:40:35 +0100 Subject: [PATCH 09/38] use Map for doc --- src/event.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/event.rs b/src/event.rs index 24891c5cf..c7a998d7f 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,5 +1,5 @@ use serde_derive::{Deserialize, Serialize}; -use serde_json::Value; +use serde_json::{Map, Value}; #[derive(Serialize, Deserialize, Debug)] #[serde(untagged)] @@ -18,7 +18,7 @@ pub struct ChangeEvent { pub deleted: bool, #[serde(default)] - pub doc: Option, + pub doc: Option>, } #[derive(Serialize, Deserialize, Debug)] From b8df6a9effea2b15addecc26b4b4f7a6eb769523 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Thu, 11 Mar 2021 11:09:55 +0100 Subject: [PATCH 10/38] handle stream error --- Cargo.toml | 5 +++-- src/lib.rs | 6 +++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 68850386f..91f8e5ebe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,9 +14,10 @@ tokio = "1.0" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -reqwest = { version = "0.11.0", features = ["stream"] } +reqwest = { version = "0.11", features = ["stream"] } futures-util = "0.3" -bytes = "1.0.0" +bytes = "1.0" +log = "0.4" [dev-dependencies] tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } diff --git a/src/lib.rs b/src/lib.rs index 34e3e4b18..bb395c244 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ use bytes::Bytes; use futures_util::stream::Stream; +use log::error; use std::{pin::Pin, task::Poll}; mod error; @@ -107,7 +108,10 @@ impl Stream for ChangesStream { Poll::Pending => return Poll::Pending, Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some(Ok(chunk))) => self.buffer.append(&mut chunk.to_vec()), - Poll::Ready(Some(Err(err))) => panic!("Error getting next chunk: {:?}", err), + Poll::Ready(Some(Err(err))) => { + error!("Error getting next chunk: {:?}", err); + return Poll::Ready(None); + } }; } } From bac1567374cc4c7ad5f3bb231d25eaed15f327f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 15 Mar 2021 09:24:07 +0100 Subject: [PATCH 11/38] update dependencies --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 91f8e5ebe..6838c3145 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ license = "MIT" edition = "2018" [dependencies] -tokio = "1.0" +tokio = "1.3" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" @@ -20,4 +20,4 @@ bytes = "1.0" log = "0.4" [dev-dependencies] -tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1.3", features = ["macros", "rt-multi-thread"] } From 9d3b11f44c5621b5163fc2f57cafac4d884da1ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 15 Mar 2021 09:25:43 +0100 Subject: [PATCH 12/38] rename to changes-stream2 --- Cargo.toml | 2 +- README.md | 7 ++----- examples/follower.rs | 2 +- src/lib.rs | 2 +- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6838c3145..3d48a709e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "changes-stream" +name = "changes-stream2" description = "couchdb follower" version = "0.2.0" authors = [ diff --git a/README.md b/README.md index de3af975e..634361f39 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,5 @@ # changes-stream-rust -[![travis badge](https://travis-ci.org/ashleygwilliams/changes-stream-rust.svg?branch=master)](https://travis-ci.org/ashleygwilliams/changes-stream-rust) - - an implementation of [`changes-stream`](https://github.com/jcrugzz/changes-stream) in Rust. this code reads in a readable stream from an endpoint, parses each line and returns CouchDB changes events as defined in [src/event.rs](/src/event.rs). @@ -14,13 +11,13 @@ in your `Cargo.toml`: ```toml [dependencies] -changes-stream = "0.2" +changes-stream2 = "0.2" ``` from [examples/follower.rs](/examples/follower.rs): ```rust -use changes_stream::{ChangesStream, Event}; +use changes_stream2::{ChangesStream, Event}; use futures_util::stream::StreamExt; #[tokio::main] diff --git a/examples/follower.rs b/examples/follower.rs index 5a7a89b70..aa1c377f8 100644 --- a/examples/follower.rs +++ b/examples/follower.rs @@ -1,4 +1,4 @@ -use changes_stream::{ChangesStream, Event}; +use changes_stream2::{ChangesStream, Event}; use futures_util::stream::StreamExt; #[tokio::main] diff --git a/src/lib.rs b/src/lib.rs index bb395c244..b48873b75 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,7 +29,7 @@ impl ChangesStream { /// for the npmjs registry, you would write: /// /// ```no_run - /// # use changes_stream::{ChangesStream, Event}; + /// # use changes_stream2::{ChangesStream, Event}; /// # use futures_util::stream::StreamExt; /// # /// # #[tokio::main] From 7d1d3a56acf0dbd909adb85320bea1b6a4189319 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 15 Mar 2021 09:30:02 +0100 Subject: [PATCH 13/38] add note, that this is a fork --- Cargo.toml | 2 +- README.md | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3d48a709e..d275dfe15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.0" +version = "0.2.1" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/README.md b/README.md index 634361f39..aa2f0bb60 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,10 @@ # changes-stream-rust -an implementation of [`changes-stream`](https://github.com/jcrugzz/changes-stream) in Rust. +Fork of https://github.com/ashleygwilliams/changes-stream-rust / https://crates.io/crates/changes-stream. -this code reads in a readable stream from an endpoint, parses each line and returns CouchDB changes events as defined in [src/event.rs](/src/event.rs). +An implementation of [`changes-stream`](https://github.com/jcrugzz/changes-stream) in Rust. + +This code reads in a readable stream from an endpoint, parses each line and returns CouchDB changes events as defined in [src/event.rs](/src/event.rs). ## usage From 38dd74ec9bd78015a83e40ee060d4b66b1289820 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 23 Nov 2021 11:59:14 +0100 Subject: [PATCH 14/38] add features so we can switch to rustls --- Cargo.toml | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d275dfe15..fce72260c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,12 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.1" +version = "0.2.2" authors = [ "Ashley Williams ", "René Rössler ", ] +repository = "https://github.com/elwerene/changes-stream-rust.git" license = "MIT" edition = "2018" @@ -14,10 +15,16 @@ tokio = "1.3" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -reqwest = { version = "0.11", features = ["stream"] } +reqwest = { version = "0.11", default-features = false, features = ["stream"] } futures-util = "0.3" bytes = "1.0" log = "0.4" [dev-dependencies] tokio = { version = "1.3", features = ["macros", "rt-multi-thread"] } + +[features] + +default = ["native-tls"] +native-tls = ["reqwest/native-tls"] +rustls-tls = ["reqwest/rustls-tls"] From 79cf5a94b01c90f15fa9f65487dd978259fb6ee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Fri, 10 Dec 2021 13:00:30 +0100 Subject: [PATCH 15/38] continue searching for newline characters --- Cargo.toml | 2 +- src/lib.rs | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fce72260c..44e9046d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.2" +version = "0.2.3" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/src/lib.rs b/src/lib.rs index b48873b75..1e55f29e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,8 @@ pub struct ChangesStream { buffer: Vec, /// Source of http chunks provided by reqwest source: Pin> + Send>>, + /// Search pos for newline + newline_search_pos: usize, } impl ChangesStream { @@ -56,6 +58,7 @@ impl ChangesStream { Ok(ChangesStream { buffer: vec![], source, + newline_search_pos: 0, }) } } @@ -71,11 +74,13 @@ impl Stream for ChangesStream { let line_break_pos = self .buffer .iter() + .skip(self.newline_search_pos) .enumerate() .find(|(_pos, b)| **b == 0x0A) // search for \n - .map(|(pos, _b)| pos); + .map(|(pos, _b)| self.newline_search_pos + pos); if let Some(line_break_pos) = line_break_pos { let mut line = self.buffer.drain(0..=line_break_pos).collect::>(); + self.newline_search_pos = 0; if line.len() < 15 { // skip prologue, epilogue and empty lines (continuous mode) @@ -104,6 +109,8 @@ impl Stream for ChangesStream { return Poll::Ready(Some(result)); } else { + self.newline_search_pos = self.buffer.len(); + match Stream::poll_next(self.source.as_mut(), cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => return Poll::Ready(None), From b98bf7b78ceb6ea30d940142ab7a9bf45445d064 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Thu, 6 Jan 2022 14:07:47 +0100 Subject: [PATCH 16/38] add badges --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index aa2f0bb60..47bf20c44 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,11 @@ -# changes-stream-rust +# changes_stream2 + +[![License](https://img.shields.io/badge/license-MIT-blue.svg)]( +https://github.com/elwerene/changes-stream-rust/blob/master/LICENSE) +[![Cargo](https://img.shields.io/crates/v/changes-stream2.svg)]( +https://crates.io/crates/changes-stream2) +[![Documentation](https://docs.rs/changes-stream2/badge.svg)]( +https://docs.rs/changes-stream2) Fork of https://github.com/ashleygwilliams/changes-stream-rust / https://crates.io/crates/changes-stream. From b9c4605cd726a84031d4b4bc069327228a42a000 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 30 Apr 2024 10:17:04 +0200 Subject: [PATCH 17/38] update reqwest, more readable buffer code --- Cargo.toml | 2 +- src/lib.rs | 28 ++++++++++------------------ 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 44e9046d7..acf828d0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ tokio = "1.3" serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -reqwest = { version = "0.11", default-features = false, features = ["stream"] } +reqwest = { version = "0.12", default-features = false, features = ["stream"] } futures-util = "0.3" bytes = "1.0" log = "0.4" diff --git a/src/lib.rs b/src/lib.rs index 1e55f29e1..fa1514aad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -75,9 +75,8 @@ impl Stream for ChangesStream { .buffer .iter() .skip(self.newline_search_pos) - .enumerate() - .find(|(_pos, b)| **b == 0x0A) // search for \n - .map(|(pos, _b)| self.newline_search_pos + pos); + .position(|b| *b == 0x0A) + .map(|pos| self.newline_search_pos + pos); if let Some(line_break_pos) = line_break_pos { let mut line = self.buffer.drain(0..=line_break_pos).collect::>(); self.newline_search_pos = 0; @@ -86,26 +85,19 @@ impl Stream for ChangesStream { // skip prologue, epilogue and empty lines (continuous mode) continue; } - line.remove(line.len() - 1); // remove \n - if line[line.len() - 1] == 0x0D { + line.pop(); // remove \n + if line.last() == Some(&0x0D) { // 0x0D is '\r'. CouchDB >= 2.0 sends "\r\n" - line.remove(line.len() - 1); + line.pop(); } - if line[line.len() - 1] == 0x2C { + if line.last() == Some(&0x2C) { // 0x2C is ',' - line.remove(line.len() - 1); // remove , + line.pop(); // remove , } - let result = match serde_json::from_slice(line.as_slice()) { - Err(err) => Err(Error::ParsingFailed( - err, - String::from_utf8(line).unwrap_or_default(), - )), - Ok(json) => { - let event: Event = json; - Ok(event) - } - }; + let result = serde_json::from_slice(line.as_slice()).map_err(|err| { + Error::ParsingFailed(err, String::from_utf8(line).unwrap_or_default()) + }); return Poll::Ready(Some(result)); } else { From c893fe51e71319b1a22b44cf6ba64b30a8664281 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 30 Apr 2024 11:24:57 +0200 Subject: [PATCH 18/38] optimize away some copy operations --- src/lib.rs | 79 +++++++++++++++++++++++++++++------------------------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index fa1514aad..fc2d205af 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,10 @@ //! The `changes-stream` crate is designed to give you a //! futures::Stream of CouchDB changes stream events. -use bytes::Bytes; +use bytes::{buf::IntoIter, Bytes}; use futures_util::stream::Stream; use log::error; -use std::{pin::Pin, task::Poll}; +use std::{mem::replace, pin::Pin, task::Poll}; mod error; mod event; @@ -13,12 +13,10 @@ pub use event::{Change, ChangeEvent, Event, FinishedEvent}; /// A structure which implements futures::Stream pub struct ChangesStream { - /// for incomplete line chunks - buffer: Vec, /// Source of http chunks provided by reqwest source: Pin> + Send>>, - /// Search pos for newline - newline_search_pos: usize, + /// Buffer of current line and current chunk iterator + buf: (Vec, Option>), } impl ChangesStream { @@ -56,9 +54,8 @@ impl ChangesStream { let source = Pin::new(Box::new(res.bytes_stream())); Ok(ChangesStream { - buffer: vec![], source, - newline_search_pos: 0, + buf: (Vec::new(), None), }) } } @@ -70,48 +67,56 @@ impl Stream for ChangesStream { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - loop { - let line_break_pos = self - .buffer - .iter() - .skip(self.newline_search_pos) - .position(|b| *b == 0x0A) - .map(|pos| self.newline_search_pos + pos); - if let Some(line_break_pos) = line_break_pos { - let mut line = self.buffer.drain(0..=line_break_pos).collect::>(); - self.newline_search_pos = 0; + 'main: loop { + if self.buf.1.is_none() { + match Stream::poll_next(self.source.as_mut(), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(Ok(chunk))) => self.buf.1 = Some(chunk.into_iter()), + Poll::Ready(Some(Err(err))) => { + error!("Error getting next chunk: {:?}", err); + return Poll::Ready(None); + } + } + } else { + let (line, chunk_iter) = &mut self.buf; + let iter = chunk_iter.as_mut().unwrap(); + + loop { + if let Some(byte) = iter.next() { + if byte == 0x0A { + // Found '\n', end of line + break; + } + line.push(byte); + } else { + // We need another chunk to fill the line + *chunk_iter = None; + continue 'main; + } + } - if line.len() < 15 { + let line = replace(line, Vec::with_capacity(line.len() * 2)); + if line.len() < 14 { // skip prologue, epilogue and empty lines (continuous mode) continue; } - line.pop(); // remove \n - if line.last() == Some(&0x0D) { + + let mut len = line.len(); + if line[len - 1] == 0x0D { // 0x0D is '\r'. CouchDB >= 2.0 sends "\r\n" - line.pop(); + len -= 1; } - if line.last() == Some(&0x2C) { + if line[len - 1] == 0x2C { // 0x2C is ',' - line.pop(); // remove , + len -= 1; } - let result = serde_json::from_slice(line.as_slice()).map_err(|err| { + let result = serde_json::from_slice(&line[..len]).map_err(|err| { Error::ParsingFailed(err, String::from_utf8(line).unwrap_or_default()) }); return Poll::Ready(Some(result)); - } else { - self.newline_search_pos = self.buffer.len(); - - match Stream::poll_next(self.source.as_mut(), cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => return Poll::Ready(None), - Poll::Ready(Some(Ok(chunk))) => self.buffer.append(&mut chunk.to_vec()), - Poll::Ready(Some(Err(err))) => { - error!("Error getting next chunk: {:?}", err); - return Poll::Ready(None); - } - }; } } } From 4e11a7afcfce6647ef2dbf06162d7d9339748529 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 30 Apr 2024 11:48:05 +0200 Subject: [PATCH 19/38] update edition, dependency versions and bump version --- Cargo.toml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index acf828d0c..77990e7b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,27 +1,27 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.3" +version = "0.2.4" authors = [ "Ashley Williams ", "René Rössler ", ] repository = "https://github.com/elwerene/changes-stream-rust.git" license = "MIT" -edition = "2018" +edition = "2021" [dependencies] -tokio = "1.3" -serde = "1.0" -serde_json = "1.0" -serde_derive = "1.0" +tokio = "1" +serde = "1" +serde_json = "1" +serde_derive = "1" reqwest = { version = "0.12", default-features = false, features = ["stream"] } futures-util = "0.3" -bytes = "1.0" +bytes = "1" log = "0.4" [dev-dependencies] -tokio = { version = "1.3", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } [features] From 80aafe0632e67bd7b3dbda86456b9b0a655558e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 7 May 2024 11:41:50 +0200 Subject: [PATCH 20/38] support to deserialize a change doc as serde_json::value::RawValue --- Cargo.toml | 16 ++++++++-------- src/event.rs | 7 +++++-- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 77990e7b7..9a14367cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.4" +version = "0.2.5" authors = [ "Ashley Williams ", "René Rössler ", @@ -11,20 +11,20 @@ license = "MIT" edition = "2021" [dependencies] -tokio = "1" -serde = "1" -serde_json = "1" -serde_derive = "1" -reqwest = { version = "0.12", default-features = false, features = ["stream"] } -futures-util = "0.3" bytes = "1" +futures-util = "0.3" +serde_json = "1" log = "0.4" +reqwest = { version = "0.12", default-features = false, features = ["stream"] } +serde = "1" +serde_derive = "1" +tokio = "1" [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } [features] - default = ["native-tls"] +raw_value_doc = ["serde_json/raw_value"] native-tls = ["reqwest/native-tls"] rustls-tls = ["reqwest/rustls-tls"] diff --git a/src/event.rs b/src/event.rs index c7a998d7f..c81bae294 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,5 +1,5 @@ use serde_derive::{Deserialize, Serialize}; -use serde_json::{Map, Value}; +use serde_json::Value; #[derive(Serialize, Deserialize, Debug)] #[serde(untagged)] @@ -18,7 +18,10 @@ pub struct ChangeEvent { pub deleted: bool, #[serde(default)] - pub doc: Option>, + #[cfg(feature = "raw_value_doc")] + pub doc: Option>, + #[cfg(not(feature = "raw_value_doc"))] + pub doc: Option>, } #[derive(Serialize, Deserialize, Debug)] From c02d63b665533b6ff7a6e3e7a6b12582cec17725 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 7 May 2024 12:12:42 +0200 Subject: [PATCH 21/38] add metrics support (optional) --- Cargo.toml | 12 ++++++++++- src/error.rs | 1 + src/lib.rs | 56 ++++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9a14367cf..f6fc03815 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.5" +version = "0.2.6" authors = [ "Ashley Williams ", "René Rössler ", @@ -15,16 +15,26 @@ bytes = "1" futures-util = "0.3" serde_json = "1" log = "0.4" +metrics = { version = "0.22", optional = true } +regex = { version = "1", optional = true } reqwest = { version = "0.12", default-features = false, features = ["stream"] } serde = "1" serde_derive = "1" tokio = "1" +url = "2" [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } [features] default = ["native-tls"] + +# change.doc as serde_json::value::RawValue raw_value_doc = ["serde_json/raw_value"] + +# metrics +metrics = ["dep:metrics", "dep:regex"] + +# tls library selection native-tls = ["reqwest/native-tls"] rustls-tls = ["reqwest/rustls-tls"] diff --git a/src/error.rs b/src/error.rs index 1b2ec5a5b..426d509d1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,5 +1,6 @@ #[derive(Debug)] pub enum Error { + InvalidUrl(url::ParseError), RequestFailed(reqwest::Error), InvalidStatus(reqwest::StatusCode), ParsingFailed(serde_json::Error, String), diff --git a/src/lib.rs b/src/lib.rs index fc2d205af..c2263f09a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,12 @@ pub use event::{Change, ChangeEvent, Event, FinishedEvent}; /// A structure which implements futures::Stream pub struct ChangesStream { + /// metrics bytes counter + #[cfg(feature = "metrics")] + bytes: metrics::Counter, + /// metrics entries counter + #[cfg(feature = "metrics")] + entries: metrics::Counter, /// Source of http chunks provided by reqwest source: Pin> + Send>>, /// Buffer of current line and current chunk iterator @@ -45,15 +51,49 @@ impl ChangesStream { /// # } /// # } /// ``` - pub async fn new(db: String) -> Result { - let res = reqwest::get(&db).await.map_err(Error::RequestFailed)?; + pub async fn new(db: String) -> Result { + let url = url::Url::parse(&db).map_err(Error::InvalidUrl)?; + #[cfg(feature = "metrics")] + let metrics_prefix = regex::Regex::new(r"(?m)[_/]+") + .unwrap() + .replace_all( + &format!("{}_{}", url.host_str().unwrap_or_default(), url.path()), + "_", + ) + .to_string(); + + let res = reqwest::get(url).await.map_err(Error::RequestFailed)?; let status = res.status(); if !status.is_success() { return Err(Error::InvalidStatus(status)); } let source = Pin::new(Box::new(res.bytes_stream())); - Ok(ChangesStream { + #[cfg(feature = "metrics")] + let (bytes, entries) = { + let bytes_name = format!("{}_bytes", metrics_prefix); + let entries_name = format!("{}_entries", metrics_prefix); + metrics::describe_counter!( + bytes_name.clone(), + metrics::Unit::Bytes, + "Changes stream bytes" + ); + metrics::describe_counter!( + entries_name.clone(), + metrics::Unit::Count, + "Changes stream entries" + ); + ( + metrics::counter!(bytes_name), + metrics::counter!(entries_name), + ) + }; + + Ok(Self { + #[cfg(feature = "metrics")] + bytes, + #[cfg(feature = "metrics")] + entries, source, buf: (Vec::new(), None), }) @@ -72,7 +112,12 @@ impl Stream for ChangesStream { match Stream::poll_next(self.source.as_mut(), cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => return Poll::Ready(None), - Poll::Ready(Some(Ok(chunk))) => self.buf.1 = Some(chunk.into_iter()), + Poll::Ready(Some(Ok(chunk))) => { + #[cfg(feature = "metrics")] + self.bytes.increment(chunk.len() as u64); + + self.buf.1 = Some(chunk.into_iter()) + } Poll::Ready(Some(Err(err))) => { error!("Error getting next chunk: {:?}", err); return Poll::Ready(None); @@ -116,6 +161,9 @@ impl Stream for ChangesStream { Error::ParsingFailed(err, String::from_utf8(line).unwrap_or_default()) }); + #[cfg(feature = "metrics")] + self.entries.increment(1); + return Poll::Ready(Some(result)); } } From b09833f8bb9875ebf1a87e2dfb991cfd0fdd59bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 7 May 2024 12:18:02 +0200 Subject: [PATCH 22/38] add features overview to Readme --- Cargo.toml | 2 +- README.md | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index f6fc03815..cb366fbf2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.6" +version = "0.2.7" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/README.md b/README.md index 47bf20c44..0597e29be 100644 --- a/README.md +++ b/README.md @@ -42,3 +42,23 @@ async fn main() { } } ``` + +## features + +### metrics + +Enables metric collection of the changes stream as counter values. The common prefix is generated from the host and path of the url. The metrics are: + * `{prefix}_bytes`: Total bytes read from the changes stream + * `{prefix}_entries`: Total parsed change entries + +### raw_value_doc + +Changes the type of ChangeEvent::Doc from `serde_json::Map` to `serde_json::value::RawValue`. + +### native-tls + +Use the native-tls crate for TLS connections. This is the default. + +### rustls-tls + +Use the rustls crate for TLS connections. \ No newline at end of file From 0c6bb86ede690d82dcf6f5ec260679d6270212a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Wed, 8 May 2024 09:31:41 +0200 Subject: [PATCH 23/38] expect a change event to optimize parsing a bit --- Cargo.toml | 2 +- src/event.rs | 1 - src/lib.rs | 15 ++++++++++++--- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cb366fbf2..6ead3fd75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.7" +version = "0.2.8" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/src/event.rs b/src/event.rs index c81bae294..83824cd40 100644 --- a/src/event.rs +++ b/src/event.rs @@ -2,7 +2,6 @@ use serde_derive::{Deserialize, Serialize}; use serde_json::Value; #[derive(Serialize, Deserialize, Debug)] -#[serde(untagged)] pub enum Event { Change(ChangeEvent), Finished(FinishedEvent), diff --git a/src/lib.rs b/src/lib.rs index c2263f09a..8f5fc3312 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -157,9 +157,18 @@ impl Stream for ChangesStream { len -= 1; } - let result = serde_json::from_slice(&line[..len]).map_err(|err| { - Error::ParsingFailed(err, String::from_utf8(line).unwrap_or_default()) - }); + let result = serde_json::from_slice::(&line[..len]) + .map(Event::Change) + .or_else(|err| { + serde_json::from_slice::(&line[..len]) + .map(Event::Finished) + .map_err(|_err| { + Error::ParsingFailed( + err, + String::from_utf8(line).unwrap_or_default(), + ) + }) + }); #[cfg(feature = "metrics")] self.entries.increment(1); From 0204114224d85676d232cbb1c9684063d85f719b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Wed, 8 May 2024 11:11:18 +0200 Subject: [PATCH 24/38] add _total postfix --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8f5fc3312..b22e285c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,8 +71,8 @@ impl ChangesStream { #[cfg(feature = "metrics")] let (bytes, entries) = { - let bytes_name = format!("{}_bytes", metrics_prefix); - let entries_name = format!("{}_entries", metrics_prefix); + let bytes_name = format!("{}_bytes_total", metrics_prefix); + let entries_name = format!("{}_entries_total", metrics_prefix); metrics::describe_counter!( bytes_name.clone(), metrics::Unit::Bytes, From ed3add64701c145e4d7e2fc00e2eb3361bf2e9ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Wed, 8 May 2024 11:11:28 +0200 Subject: [PATCH 25/38] bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6ead3fd75..a513d792e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.8" +version = "0.2.9" authors = [ "Ashley Williams ", "René Rössler ", From a9c4453c38daf5e283c97bdaf519df246a7d0495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 14 May 2024 10:50:09 +0200 Subject: [PATCH 26/38] change error to use thiserror --- Cargo.toml | 3 ++- src/error.rs | 18 ++++++++++++++---- src/event.rs | 4 ++-- src/lib.rs | 14 ++++++-------- 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a513d792e..e20678baa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.9" +version = "0.2.10" authors = [ "Ashley Williams ", "René Rössler ", @@ -20,6 +20,7 @@ regex = { version = "1", optional = true } reqwest = { version = "0.12", default-features = false, features = ["stream"] } serde = "1" serde_derive = "1" +thiserror = "1" tokio = "1" url = "2" diff --git a/src/error.rs b/src/error.rs index 426d509d1..8abd15eef 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,7 +1,17 @@ -#[derive(Debug)] +use thiserror::Error; + +#[derive(Error, Debug)] pub enum Error { - InvalidUrl(url::ParseError), - RequestFailed(reqwest::Error), + #[error("Could not parse the url")] + InvalidUrl(#[from] url::ParseError), + #[error("Request failed")] + RequestFailed(#[from] reqwest::Error), + #[error("Server answered with non-ok status: {0}")] InvalidStatus(reqwest::StatusCode), - ParsingFailed(serde_json::Error, String), + #[error("Could not parse server response: {json}")] + ParsingFailed { + #[source] + error: serde_json::Error, + json: String, + }, } diff --git a/src/event.rs b/src/event.rs index 83824cd40..c319745c8 100644 --- a/src/event.rs +++ b/src/event.rs @@ -7,7 +7,7 @@ pub enum Event { Finished(FinishedEvent), } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ChangeEvent { pub seq: Value, pub id: String, @@ -29,7 +29,7 @@ pub struct FinishedEvent { pub pending: Option, // not available on CouchDB 1.0 } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Change { pub rev: String, } diff --git a/src/lib.rs b/src/lib.rs index b22e285c9..3088fdaae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,7 +52,7 @@ impl ChangesStream { /// # } /// ``` pub async fn new(db: String) -> Result { - let url = url::Url::parse(&db).map_err(Error::InvalidUrl)?; + let url = url::Url::parse(&db)?; #[cfg(feature = "metrics")] let metrics_prefix = regex::Regex::new(r"(?m)[_/]+") .unwrap() @@ -62,7 +62,7 @@ impl ChangesStream { ) .to_string(); - let res = reqwest::get(url).await.map_err(Error::RequestFailed)?; + let res = reqwest::get(url).await?; let status = res.status(); if !status.is_success() { return Err(Error::InvalidStatus(status)); @@ -159,14 +159,12 @@ impl Stream for ChangesStream { let result = serde_json::from_slice::(&line[..len]) .map(Event::Change) - .or_else(|err| { + .or_else(|error| { serde_json::from_slice::(&line[..len]) .map(Event::Finished) - .map_err(|_err| { - Error::ParsingFailed( - err, - String::from_utf8(line).unwrap_or_default(), - ) + .map_err(|_err| Error::ParsingFailed { + error, + json: String::from_utf8(line).unwrap_or_default(), }) }); From 98ffe1c21173338b1493638cd701f010a03b3d74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 21 May 2024 11:05:29 +0200 Subject: [PATCH 27/38] support for post request bodys to support _selector filters. Switches from get to post which should work the same according to couchdb documentation. --- Cargo.toml | 7 +++++-- src/lib.rs | 41 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e20678baa..7b3dfce5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.10" +version = "0.2.11" authors = [ "Ashley Williams ", "René Rössler ", @@ -17,7 +17,10 @@ serde_json = "1" log = "0.4" metrics = { version = "0.22", optional = true } regex = { version = "1", optional = true } -reqwest = { version = "0.12", default-features = false, features = ["stream"] } +reqwest = { version = "0.12", default-features = false, features = [ + "stream", + "json", +] } serde = "1" serde_derive = "1" thiserror = "1" diff --git a/src/lib.rs b/src/lib.rs index 3088fdaae..3504efe73 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ use bytes::{buf::IntoIter, Bytes}; use futures_util::stream::Stream; use log::error; +use serde_json::Value; use std::{mem::replace, pin::Pin, task::Poll}; mod error; @@ -26,7 +27,7 @@ pub struct ChangesStream { } impl ChangesStream { - /// Constructs a new `ChangesStream` struct + /// Constructs a new `ChangesStream` struct with a post payload /// /// Takes a single argument, `db`, which represents the /// url of the data you wish to stream. @@ -40,8 +41,8 @@ impl ChangesStream { /// # /// # #[tokio::main] /// # async fn main() { - /// # let url = "https://replicate.npmjs.com/_changes".to_string(); - /// # let mut changes = ChangesStream::new(url).await.unwrap(); + /// # let url = "https://replicate.npmjs.com/_changes?filter=_selector".to_string(); + /// # let mut changes = ChangesStream::with_post_payload(url, &serde_json::json!({"selector": { "_id": { "$regex": "^_design/" }}})).await.unwrap(); /// # while let Some(event) = changes.next().await { /// # match event { /// # Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id), @@ -51,7 +52,7 @@ impl ChangesStream { /// # } /// # } /// ``` - pub async fn new(db: String) -> Result { + pub async fn with_post_payload(db: String, payload: &Value) -> Result { let url = url::Url::parse(&db)?; #[cfg(feature = "metrics")] let metrics_prefix = regex::Regex::new(r"(?m)[_/]+") @@ -62,7 +63,8 @@ impl ChangesStream { ) .to_string(); - let res = reqwest::get(url).await?; + let client = reqwest::Client::new(); + let res = client.post(url).json(payload).send().await?; let status = res.status(); if !status.is_success() { return Err(Error::InvalidStatus(status)); @@ -98,6 +100,35 @@ impl ChangesStream { buf: (Vec::new(), None), }) } + + /// Constructs a new `ChangesStream` struct + /// + /// Takes a single argument, `db`, which represents the + /// url of the data you wish to stream. + /// + /// For example, to create a new `ChangesStream` struct + /// for the npmjs registry, you would write: + /// + /// ```no_run + /// # use changes_stream2::{ChangesStream, Event}; + /// # use futures_util::stream::StreamExt; + /// # + /// # #[tokio::main] + /// # async fn main() { + /// # let url = "https://replicate.npmjs.com/_changes".to_string(); + /// # let mut changes = ChangesStream::new(url).await.unwrap(); + /// # while let Some(event) = changes.next().await { + /// # match event { + /// # Ok(Event::Change(change)) => println!("Change ({}): {}", change.seq, change.id), + /// # Ok(Event::Finished(finished)) => println!("Finished: {}", finished.last_seq), + /// # Err(err) => println!("Error: {:?}", err), + /// # } + /// # } + /// # } + /// ``` + pub async fn new(db: String) -> Result { + Self::with_post_payload(db, &serde_json::json!({})).await + } } impl Stream for ChangesStream { From c7324d852ed6688fcf841e037e236576a18fd4b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 21 May 2024 11:07:07 +0200 Subject: [PATCH 28/38] prefix metrics names with "couchdb_changes_" --- Cargo.toml | 2 +- src/lib.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7b3dfce5f..5c4ef68e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.11" +version = "0.2.12" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/src/lib.rs b/src/lib.rs index 3504efe73..9b88de8bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,8 +73,8 @@ impl ChangesStream { #[cfg(feature = "metrics")] let (bytes, entries) = { - let bytes_name = format!("{}_bytes_total", metrics_prefix); - let entries_name = format!("{}_entries_total", metrics_prefix); + let bytes_name = format!("couchdb_changes_{}_bytes_total", metrics_prefix); + let entries_name = format!("couchdb_changes_{}_entries_total", metrics_prefix); metrics::describe_counter!( bytes_name.clone(), metrics::Unit::Bytes, From a1e99b671807a490e1540a88f8e280307a6eeb01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Tue, 21 May 2024 11:10:47 +0200 Subject: [PATCH 29/38] fix Readme --- Cargo.toml | 2 +- README.md | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5c4ef68e2..77f42d073 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.12" +version = "0.2.13" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/README.md b/README.md index 0597e29be..e01455f7a 100644 --- a/README.md +++ b/README.md @@ -47,9 +47,9 @@ async fn main() { ### metrics -Enables metric collection of the changes stream as counter values. The common prefix is generated from the host and path of the url. The metrics are: - * `{prefix}_bytes`: Total bytes read from the changes stream - * `{prefix}_entries`: Total parsed change entries +Enables metrics collection of the changes stream as counter values. The name is generated from the host and path of the url(database name). The metrics are: + * `couchdb_changes_{name}_bytes`: Total bytes read from the changes stream + * `couchdb_changes_{name}_entries`: Total parsed change entries ### raw_value_doc From dfa461d6bfe0d56c59f47c5bfa4f7c68323bec5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Thu, 6 Jun 2024 23:18:25 +0200 Subject: [PATCH 30/38] update metrics --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 77f42d073..a5db08451 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.13" +version = "0.2.14" authors = [ "Ashley Williams ", "René Rössler ", @@ -15,7 +15,7 @@ bytes = "1" futures-util = "0.3" serde_json = "1" log = "0.4" -metrics = { version = "0.22", optional = true } +metrics = { version = "0.23", optional = true } regex = { version = "1", optional = true } reqwest = { version = "0.12", default-features = false, features = [ "stream", From 1c681f6cdb6f39923e968136b73265b8ba26ac45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Fri, 7 Jun 2024 08:49:22 +0200 Subject: [PATCH 31/38] change how metrics are reported --- Cargo.toml | 2 +- src/lib.rs | 18 +++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a5db08451..ac4677014 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.14" +version = "0.2.15" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/src/lib.rs b/src/lib.rs index 9b88de8bf..c8f38708f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,7 @@ impl ChangesStream { pub async fn with_post_payload(db: String, payload: &Value) -> Result { let url = url::Url::parse(&db)?; #[cfg(feature = "metrics")] - let metrics_prefix = regex::Regex::new(r"(?m)[_/]+") + let database = regex::Regex::new(r"(?m)[_/]+") .unwrap() .replace_all( &format!("{}_{}", url.host_str().unwrap_or_default(), url.path()), @@ -73,21 +73,17 @@ impl ChangesStream { #[cfg(feature = "metrics")] let (bytes, entries) = { - let bytes_name = format!("couchdb_changes_{}_bytes_total", metrics_prefix); - let entries_name = format!("couchdb_changes_{}_entries_total", metrics_prefix); + let bytes_name = "couchdb_changes_bytes_total"; + let entries_name = "couchdb_changes_entries_total"; + metrics::describe_counter!(bytes_name, metrics::Unit::Bytes, "Changes stream bytes"); metrics::describe_counter!( - bytes_name.clone(), - metrics::Unit::Bytes, - "Changes stream bytes" - ); - metrics::describe_counter!( - entries_name.clone(), + entries_name, metrics::Unit::Count, "Changes stream entries" ); ( - metrics::counter!(bytes_name), - metrics::counter!(entries_name), + metrics::counter!(bytes_name, "database" => database), + metrics::counter!(entries_name, "database" => database), ) }; From 30f5ab8365fcf4cc5e5ac0770b43cff5a19d7fe3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Fri, 7 Jun 2024 08:51:25 +0200 Subject: [PATCH 32/38] fix metrics --- Cargo.toml | 2 +- src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ac4677014..46654315d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.15" +version = "0.2.16" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/src/lib.rs b/src/lib.rs index c8f38708f..0feaf95bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,7 +82,7 @@ impl ChangesStream { "Changes stream entries" ); ( - metrics::counter!(bytes_name, "database" => database), + metrics::counter!(bytes_name, "database" => database.clone()), metrics::counter!(entries_name, "database" => database), ) }; From a275b572e502773183d1c7eca002a1a0847aa2c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 8 Jul 2024 10:16:34 +0200 Subject: [PATCH 33/38] change error response to include body --- Cargo.toml | 2 +- src/error.rs | 7 +++++-- src/lib.rs | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 46654315d..ccafbd736 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.16" +version = "0.2.17" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/src/error.rs b/src/error.rs index 8abd15eef..a8ff5514e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,8 +6,11 @@ pub enum Error { InvalidUrl(#[from] url::ParseError), #[error("Request failed")] RequestFailed(#[from] reqwest::Error), - #[error("Server answered with non-ok status: {0}")] - InvalidStatus(reqwest::StatusCode), + #[error("Server answered with non-ok status: {status}")] + InvalidResponse { + status: reqwest::StatusCode, + body: String, + }, #[error("Could not parse server response: {json}")] ParsingFailed { #[source] diff --git a/src/lib.rs b/src/lib.rs index 0feaf95bd..02e07545c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,7 +67,8 @@ impl ChangesStream { let res = client.post(url).json(payload).send().await?; let status = res.status(); if !status.is_success() { - return Err(Error::InvalidStatus(status)); + let body = res.text().await.unwrap_or_default(); + return Err(Error::InvalidResponse { status, body }); } let source = Pin::new(Box::new(res.bytes_stream())); From aa8ded5c2ebd2cc75385cffad2d0c3397ddb2fd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Mon, 8 Jul 2024 10:18:13 +0200 Subject: [PATCH 34/38] add body to error message --- Cargo.toml | 2 +- src/error.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ccafbd736..5030983a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.17" +version = "0.2.18" authors = [ "Ashley Williams ", "René Rössler ", diff --git a/src/error.rs b/src/error.rs index a8ff5514e..e0a6a25d2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,7 +6,7 @@ pub enum Error { InvalidUrl(#[from] url::ParseError), #[error("Request failed")] RequestFailed(#[from] reqwest::Error), - #[error("Server answered with non-ok status: {status}")] + #[error("Server answered with non-ok status: {status}. body: {body}")] InvalidResponse { status: reqwest::StatusCode, body: String, From a28febf91b6a2786f1810624f26ab61252d7f181 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Thu, 21 Nov 2024 08:11:55 +0100 Subject: [PATCH 35/38] update dependencies --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5030983a2..43544fe21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ bytes = "1" futures-util = "0.3" serde_json = "1" log = "0.4" -metrics = { version = "0.23", optional = true } +metrics = { version = "0.24", optional = true } regex = { version = "1", optional = true } reqwest = { version = "0.12", default-features = false, features = [ "stream", @@ -23,7 +23,7 @@ reqwest = { version = "0.12", default-features = false, features = [ ] } serde = "1" serde_derive = "1" -thiserror = "1" +thiserror = "2" tokio = "1" url = "2" From 963d89373625e113632a6af3641dbd3895ee39e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Thu, 21 Nov 2024 08:12:43 +0100 Subject: [PATCH 36/38] new version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 43544fe21..ae1eea33c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.18" +version = "0.2.19" authors = [ "Ashley Williams ", "René Rössler ", From 2859b5d50bfe73c5991fe19e0d3989cc1ffcd5f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Fri, 19 Sep 2025 09:37:21 +0200 Subject: [PATCH 37/38] add a default connect and read timeout of 1 minute --- src/lib.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 02e07545c..e3d117791 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ use bytes::{buf::IntoIter, Bytes}; use futures_util::stream::Stream; use log::error; use serde_json::Value; -use std::{mem::replace, pin::Pin, task::Poll}; +use std::{mem::replace, pin::Pin, task::Poll, time::Duration}; mod error; mod event; @@ -63,7 +63,10 @@ impl ChangesStream { ) .to_string(); - let client = reqwest::Client::new(); + let client = reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_secs(60)) + .read_timeout(Duration::from_secs(60)) + .build()?; let res = client.post(url).json(payload).send().await?; let status = res.status(); if !status.is_success() { From 4c72064ab53916c2e853d7e9716b21a305f90eeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20R=C3=B6ssler?= Date: Fri, 19 Sep 2025 09:37:44 +0200 Subject: [PATCH 38/38] bump version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ae1eea33c..cfb6f9840 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "changes-stream2" description = "couchdb follower" -version = "0.2.19" +version = "0.2.20" authors = [ "Ashley Williams ", "René Rössler ",