diff --git a/Cargo.lock b/Cargo.lock index c382aa3d..00feebe1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -205,7 +205,7 @@ dependencies = [ "rand 0.8.5", "regex", "ring", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "rustls-pki-types", "rustls-webpki 0.102.8", "serde", @@ -650,6 +650,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-link", ] @@ -790,6 +791,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1724,6 +1735,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" + [[package]] name = "hybrid-array" version = "0.4.7" @@ -1766,6 +1783,7 @@ dependencies = [ "hyper", "hyper-util", "rustls", + "rustls-native-certs 0.8.3", "rustls-pki-types", "tokio", "tokio-rustls", @@ -1999,6 +2017,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.17" @@ -2444,6 +2471,22 @@ dependencies = [ "uuid", ] +[[package]] +name = "nvisy-object" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes", + "derive_more", + "futures", + "object_store", + "schemars", + "serde", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "nvisy-postgres" version = 
"0.1.0" @@ -2554,6 +2597,44 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbfbfff40aeccab00ec8a910b57ca8ecf4319b335c542f2edcd19dd25a1e2a00" +dependencies = [ + "async-trait", + "base64", + "bytes", + "chrono", + "form_urlencoded", + "futures", + "http", + "http-body-util", + "httparse", + "humantime", + "hyper", + "itertools 0.14.0", + "md-5", + "parking_lot", + "percent-encoding", + "quick-xml", + "rand 0.9.2", + "reqwest", + "ring", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", + "walkdir", + "wasm-bindgen-futures", + "web-time", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -2578,6 +2659,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-probe" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" + [[package]] name = "parking_lot" version = "0.12.5" @@ -2875,6 +2962,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quick-xml" +version = "0.38.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66c2058c55a409d601666cffe35f04333cf1013010882cec174a7467cd4e21c" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quinn" version = "0.11.9" @@ -3089,6 +3186,8 @@ dependencies = [ "base64", "bytes", "futures-core", + "futures-util", + "h2", "http", "http-body", "http-body-util", @@ -3101,6 +3200,7 @@ dependencies = [ "pin-project-lite", "quinn", "rustls", + "rustls-native-certs 0.8.3", "rustls-pki-types", "serde", "serde_json", @@ -3108,12 +3208,14 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-rustls", + "tokio-util", "tower", "tower-http", 
"tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 1.0.5", ] @@ -3211,11 +3313,23 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" dependencies = [ - "openssl-probe", + "openssl-probe 0.1.6", "rustls-pemfile", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 2.11.1", +] + +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe 0.2.1", + "rustls-pki-types", + "schannel", + "security-framework 3.5.1", ] [[package]] @@ -3271,6 +3385,15 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -3336,7 +3459,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags", - "core-foundation", + "core-foundation 0.9.4", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" +dependencies = [ + "bitflags", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -4301,6 +4437,16 @@ version = "0.9.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -4433,6 +4579,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.244.0" @@ -4494,6 +4653,15 @@ dependencies = [ "web-sys", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -4971,7 +5139,7 @@ dependencies = [ "chrono", "derive_builder", "fancy-regex", - "itertools", + "itertools 0.13.0", "lazy_static", "regex", "serde", diff --git a/Cargo.toml b/Cargo.toml index f56f59f7..ceb91319 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ resolver = "2" members = [ "./crates/nvisy-cli", "./crates/nvisy-nats", + "./crates/nvisy-object", "./crates/nvisy-postgres", "./crates/nvisy-server", "./crates/nvisy-webhook", @@ -30,6 +31,7 @@ documentation = "https://docs.rs/nvisy-server" # Internal crates nvisy-nats = { path = "./crates/nvisy-nats", version = "0.1.0" } +nvisy-object = { path = "./crates/nvisy-object", version = "0.1.0" } nvisy-postgres = { path = "./crates/nvisy-postgres", version = "0.1.0" } nvisy-server = { path = "./crates/nvisy-server", version = 
"0.1.0" } nvisy-webhook = { path = "./crates/nvisy-webhook", version = "0.1.0" } @@ -95,6 +97,9 @@ pgtrgm = { version = "0.4", features = [] } # Messaging and object storage (NATS) async-nats = { version = "0.46", features = [] } +# Cloud object storage +object_store = { version = "0.12", features = [] } + # Observability tracing = { version = "0.1", features = [] } tracing-subscriber = { version = "0.3", features = [] } diff --git a/README.md b/README.md index 4ea965e6..fdd31ee0 100644 --- a/README.md +++ b/README.md @@ -2,24 +2,23 @@ [![Build](https://img.shields.io/github/actions/workflow/status/nvisycom/server/build.yml?branch=main&label=build%20%26%20test&style=flat-square)](https://github.com/nvisycom/server/actions/workflows/build.yml) -Open-source ETL platform for building intelligent data pipelines with pluggable -sources, AI-powered transforms, and configurable sinks. +Open-source multimodal redaction API. Detect and redact PII and sensitive data +across documents, images, audio, and video. 
## Features -- **Workflow Pipelines** — Declarative DAG-based workflows compiled to optimized execution graphs -- **Pluggable Providers** — Uniform interface for databases, object stores, vector DBs, and more -- **AI-Native Transforms** — Extraction, enrichment, embedding, entity resolution, and analysis as pipeline nodes -- **Resumable Streaming** — Incremental processing with per-item pagination context -- **Encrypted Connections** — Workspace-isolated credential encryption with HKDF-derived keys -- **Interactive Docs** — Auto-generated OpenAPI with Scalar UI +- **Multimodal Redaction:** Detect and remove sensitive data across PDFs, images, audio, and video +- **AI-Powered Detection:** LLM-driven PII and entity recognition with configurable redaction policies +- **Workspace Isolation:** Multi-tenant workspaces with HKDF-derived credential encryption +- **Real-Time Collaboration:** WebSocket and NATS pub/sub for live document editing +- **Interactive Docs:** Auto-generated OpenAPI with Scalar UI ## Quick Start The fastest way to get started is with [Nvisy Cloud](https://nvisy.com). -To run locally, see [`docker/`](docker/) for development and production compose -files, infrastructure requirements, and configuration reference. +For self-hosted deployments, refer to [`docker/`](docker/) for compose files and +infrastructure requirements, and [`.env.example`](.env.example) for configuration. 
## Documentation @@ -36,7 +35,7 @@ Apache 2.0 License, see [LICENSE.txt](LICENSE.txt) ## Support -- **Documentation**: [docs.nvisy.com](https://docs.nvisy.com) -- **Issues**: [GitHub Issues](https://github.com/nvisycom/server/issues) -- **Email**: [support@nvisy.com](mailto:support@nvisy.com) -- **API Status**: [nvisy.openstatus.dev](https://nvisy.openstatus.dev) +- **Documentation:** [docs.nvisy.com](https://docs.nvisy.com) +- **Issues:** [GitHub Issues](https://github.com/nvisycom/server/issues) +- **Email:** [support@nvisy.com](mailto:support@nvisy.com) +- **API Status:** [nvisy.openstatus.dev](https://nvisy.openstatus.dev) diff --git a/crates/nvisy-cli/README.md b/crates/nvisy-cli/README.md index 3afecd6e..4f4b0e89 100644 --- a/crates/nvisy-cli/README.md +++ b/crates/nvisy-cli/README.md @@ -6,22 +6,22 @@ Command-line interface and HTTP server for the Nvisy platform. ## Features -- **Server Lifecycle** - Startup, graceful shutdown, and health monitoring -- **Flexible Configuration** - CLI arguments and environment variables -- **TLS Support** - HTTPS with rustls (optional) -- **AI Backends** - Pluggable providers for embeddings, OCR, and VLM +- **Server Lifecycle:** Startup, graceful shutdown, and health monitoring +- **Flexible Configuration:** CLI arguments and environment variables +- **TLS Support:** HTTPS with rustls (optional) +- **AI Backends:** Pluggable providers for embeddings, OCR, and VLM ## Key Dependencies -- `clap` - Command line argument parser with derive macros -- `axum` - Web framework for HTTP server -- `tokio` - Async runtime for concurrent operations -- `tracing` - Structured logging and diagnostics +- `clap`: Command line argument parser with derive macros +- `axum`: Web framework for HTTP server +- `tokio`: Async runtime for concurrent operations +- `tracing`: Structured logging and diagnostics ## Optional Features -- **tls** - HTTPS support with rustls -- **dotenv** - Load configuration from `.env` files +- **tls:** HTTPS support with 
rustls +- **dotenv:** Load configuration from `.env` files ## Changelog @@ -29,11 +29,11 @@ See [CHANGELOG.md](../../CHANGELOG.md) for release notes and version history. ## License -Apache 2.0 License - see [LICENSE.txt](../../LICENSE.txt) +Apache 2.0 License, see [LICENSE.txt](../../LICENSE.txt) ## Support -- **Documentation**: [docs.nvisy.com](https://docs.nvisy.com) -- **Issues**: [GitHub Issues](https://github.com/nvisycom/server/issues) -- **Email**: [support@nvisy.com](mailto:support@nvisy.com) -- **API Status**: [nvisy.openstatus.dev](https://nvisy.openstatus.dev) +- **Documentation:** [docs.nvisy.com](https://docs.nvisy.com) +- **Issues:** [GitHub Issues](https://github.com/nvisycom/server/issues) +- **Email:** [support@nvisy.com](mailto:support@nvisy.com) +- **API Status:** [nvisy.openstatus.dev](https://nvisy.openstatus.dev) diff --git a/crates/nvisy-nats/README.md b/crates/nvisy-nats/README.md index d6966d2b..e0941b24 100644 --- a/crates/nvisy-nats/README.md +++ b/crates/nvisy-nats/README.md @@ -7,28 +7,28 @@ support and unified streaming infrastructure. 
## Features -- **Type-Safe Operations** - Generic KV store with compile-time type safety -- **Unified Streaming** - Jobs and real-time updates use the same stream +- **Type-Safe Operations:** Generic KV store with compile-time type safety +- **Unified Streaming:** Jobs and real-time updates use the same stream infrastructure -- **Object Storage** - File and binary data storage using NATS JetStream -- **Job Processing** - Distributed background job queue with retry logic -- **Connection Management** - Automatic reconnection with exponential backoff -- **Error Handling** - Comprehensive error types with retry classification +- **Object Storage:** File and binary data storage using NATS JetStream +- **Job Processing:** Distributed background job queue with retry logic +- **Connection Management:** Automatic reconnection with exponential backoff +- **Error Handling:** Comprehensive error types with retry classification ## Key Dependencies -- `async-nats` - High-performance async NATS client with JetStream support -- `tokio` - Async runtime for connection management and streaming -- `serde` - Type-safe serialization for message payloads +- `async-nats`: High-performance async NATS client with JetStream support +- `tokio`: Async runtime for connection management and streaming +- `serde`: Type-safe serialization for message payloads ## Architecture The crate provides specialized modules for common NATS use cases: -- **Client** - Connection management and configuration -- **KV** - Type-safe Key-Value operations for sessions and caching -- **Object** - Object storage for files and binary data via JetStream -- **Stream** - Unified real-time updates and distributed job processing +- **Client:** Connection management and configuration +- **KV:** Type-safe Key-Value operations for sessions and caching +- **Object:** Object storage for files and binary data via JetStream +- **Stream:** Unified real-time updates and distributed job processing All modules maintain type safety 
through generic parameters and provide access to the underlying NATS client for extensibility. @@ -39,11 +39,11 @@ See [CHANGELOG.md](../../CHANGELOG.md) for release notes and version history. ## License -Apache 2.0 License - see [LICENSE.txt](../../LICENSE.txt) +Apache 2.0 License, see [LICENSE.txt](../../LICENSE.txt) ## Support -- **Documentation**: [docs.nvisy.com](https://docs.nvisy.com) -- **Issues**: [GitHub Issues](https://github.com/nvisycom/server/issues) -- **Email**: [support@nvisy.com](mailto:support@nvisy.com) -- **API Status**: [nvisy.openstatus.dev](https://nvisy.openstatus.dev) +- **Documentation:** [docs.nvisy.com](https://docs.nvisy.com) +- **Issues:** [GitHub Issues](https://github.com/nvisycom/server/issues) +- **Email:** [support@nvisy.com](mailto:support@nvisy.com) +- **API Status:** [nvisy.openstatus.dev](https://nvisy.openstatus.dev) diff --git a/crates/nvisy-object/Cargo.toml b/crates/nvisy-object/Cargo.toml new file mode 100644 index 00000000..da217de1 --- /dev/null +++ b/crates/nvisy-object/Cargo.toml @@ -0,0 +1,52 @@ +[package] +name = "nvisy-object" +description = "Object store providers and streams (S3, Azure, GCS) for Nvisy" +readme = "./README.md" +keywords = ["nvisy", "object-store", "s3", "storage"] +categories = ["filesystem"] + +version = { workspace = true } +rust-version = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +publish = { workspace = true } + +authors = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +documentation = { workspace = true } + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[features] +schema = ["dep:schemars"] + +[dependencies] +# (De)serialization +serde = { workspace = true, features = ["derive"] } + +# Async runtime +tokio = { workspace = true, features = ["sync"] } +async-trait = { workspace = true, features = [] } +futures = { workspace = true, features = [] } + +# Derive macros 
+derive_more = { workspace = true, features = ["deref"] } + +# Primitive datatypes +bytes = { workspace = true, features = [] } +uuid = { workspace = true, features = [] } + +# Cloud object storage (S3, Azure Blob, GCS) +object_store = { workspace = true, features = ["aws", "azure", "gcp"] } + +# JSON Schema (optional) +schemars = { workspace = true, features = [], optional = true } + +# Observability +tracing = { workspace = true, features = [] } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/crates/nvisy-object/README.md b/crates/nvisy-object/README.md new file mode 100644 index 00000000..0e2cfce8 --- /dev/null +++ b/crates/nvisy-object/README.md @@ -0,0 +1,43 @@ +# nvisy-object + +[![Build](https://img.shields.io/github/actions/workflow/status/nvisycom/server/build.yml?branch=main&label=build%20%26%20test&style=flat-square)](https://github.com/nvisycom/server/actions/workflows/build.yml) + +Object store providers and streaming read/write interfaces for the Nvisy +platform. Supports S3, Azure Blob Storage, and Google Cloud Storage. 
+ +## Features + +- **Provider Abstraction:** Unified trait for credential verification and client construction +- **Streaming I/O:** Source and target stream traits for pipeline integration +- **Multi-Cloud:** S3, Azure Blob Storage, and Google Cloud Storage backends +- **Content Tracking:** UUIDv7-based content source identifiers with content-type metadata + +## Key Dependencies + +- `object_store`: Cloud-agnostic object storage (S3, Azure, GCS) +- `tokio`: Async runtime for streaming operations +- `serde`: Type-safe serialization for credentials and parameters + +## Architecture + +The crate provides specialized modules for object storage: + +- **Client:** Unified `ObjectStoreClient` wrapping `Arc` +- **Providers:** Factory trait with S3, Azure, and GCS implementations +- **Streams:** Source/target stream traits with object store adapters +- **Types:** Self-contained `Error`, `ContentData`, and `ContentSource` types + +## Changelog + +See [CHANGELOG.md](../../CHANGELOG.md) for release notes and version history. + +## License + +Apache 2.0 License, see [LICENSE.txt](../../LICENSE.txt) + +## Support + +- **Documentation:** [docs.nvisy.com](https://docs.nvisy.com) +- **Issues:** [GitHub Issues](https://github.com/nvisycom/server/issues) +- **Email:** [support@nvisy.com](mailto:support@nvisy.com) +- **API Status:** [nvisy.openstatus.dev](https://nvisy.openstatus.dev) diff --git a/crates/nvisy-object/src/client/get_output.rs b/crates/nvisy-object/src/client/get_output.rs new file mode 100644 index 00000000..6546f2a0 --- /dev/null +++ b/crates/nvisy-object/src/client/get_output.rs @@ -0,0 +1,15 @@ +//! Result type for [`ObjectStoreClient::get`](super::ObjectStoreClient::get). + +use bytes::Bytes; +use object_store::ObjectMeta; + +/// Result of a successful [`ObjectStoreClient::get`](super::ObjectStoreClient::get) call. +#[derive(Debug)] +pub struct GetOutput { + /// Raw bytes of the retrieved object. 
+ pub data: Bytes, + /// MIME content-type, if the backend provides one. + pub content_type: Option, + /// Object metadata (size, etag, last_modified, location). + pub meta: ObjectMeta, +} diff --git a/crates/nvisy-object/src/client/mod.rs b/crates/nvisy-object/src/client/mod.rs new file mode 100644 index 00000000..9a79fb8c --- /dev/null +++ b/crates/nvisy-object/src/client/mod.rs @@ -0,0 +1,320 @@ +//! Unified object-store client backed by [`object_store::ObjectStore`]. +//! +//! [`ObjectStoreClient`] is a thin, cloneable wrapper around +//! `Arc` that provides convenience methods for the most +//! common operations. Every public method is instrumented with +//! [`tracing`] for observability. + +use std::sync::Arc; + +use bytes::Bytes; +use futures::TryStreamExt; +use futures::stream::BoxStream; +use object_store::path::Path; +use object_store::{ObjectMeta, ObjectStore, PutMode, PutOptions, PutPayload}; + +use crate::types::Error; + +mod get_output; +mod put_output; + +pub use get_output::GetOutput; +pub use put_output::PutOutput; + +/// Cloneable handle to any [`ObjectStore`] backend (S3, Azure, GCS, ...). +/// +/// All methods accept human-readable string keys and convert them to +/// [`object_store::path::Path`] internally. +#[derive(Clone, Debug)] +pub struct ObjectStoreClient(pub Arc); + +impl ObjectStoreClient { + /// Wrap a concrete [`ObjectStore`] implementation. + pub fn new(store: impl ObjectStore) -> Self { + Self(Arc::new(store)) + } + + /// Verify that the backing store is reachable. + /// + /// Issues a HEAD for a probe key — a not-found response is treated as + /// success (the bucket/container exists), any other error is propagated. + #[tracing::instrument(name = "object.verify", skip(self))] + pub async fn verify_reachable(&self) -> Result<(), Error> { + let path = Path::from("_nvisy_verify_probe"); + match self.0.head(&path).await { + Ok(_) => Ok(()), + Err(object_store::Error::NotFound { .. 
}) => Ok(()), + Err(e) => Err(from_object_store(e)), + } + } + + /// List object keys under `prefix`. + /// + /// Returns all matching keys in a single `Vec`. For lazy iteration, + /// use [`list_stream`](Self::list_stream) instead. + #[tracing::instrument(name = "object.list", skip(self), fields(prefix))] + pub async fn list(&self, prefix: &str) -> Result, Error> { + let prefix = if prefix.is_empty() { + None + } else { + Some(Path::from(prefix)) + }; + self.0 + .list(prefix.as_ref()) + .try_collect() + .await + .map_err(from_object_store) + } + + /// Lazily stream object metadata under `prefix`. + #[tracing::instrument(name = "object.list_stream", skip(self), fields(prefix))] + pub fn list_stream(&self, prefix: &str) -> BoxStream<'_, Result> { + let prefix = if prefix.is_empty() { + None + } else { + Some(Path::from(prefix)) + }; + Box::pin(self.0.list(prefix.as_ref()).map_err(from_object_store)) + } + + /// Retrieve the raw bytes, content-type, and metadata stored at `key`. + #[tracing::instrument(name = "object.get", skip(self), fields(key))] + pub async fn get(&self, key: &str) -> Result { + let path = Path::from(key); + let result = self.0.get(&path).await.map_err(from_object_store)?; + let meta = result.meta.clone(); + let content_type = result + .attributes + .get(&object_store::Attribute::ContentType) + .map(|v| v.to_string()); + let data = result.bytes().await.map_err(from_object_store)?; + Ok(GetOutput { + data, + content_type, + meta, + }) + } + + /// Upload `data` to `key`, optionally setting the content-type. + pub async fn put( + &self, + key: &str, + data: Bytes, + content_type: Option<&str>, + ) -> Result { + self.put_opts(key, data, PutMode::Overwrite, content_type) + .await + } + + /// Upload `data` to `key` with the specified [`PutMode`]. 
+ #[tracing::instrument(name = "object.put_opts", skip(self, data), fields(key, size = data.len()))] + pub async fn put_opts( + &self, + key: &str, + data: Bytes, + mode: PutMode, + content_type: Option<&str>, + ) -> Result { + let path = Path::from(key); + let payload = PutPayload::from(data); + let mut opts = PutOptions { + mode, + ..Default::default() + }; + if let Some(ct) = content_type { + opts.attributes + .insert(object_store::Attribute::ContentType, ct.to_string().into()); + } + let result = self + .0 + .put_opts(&path, payload, opts) + .await + .map_err(from_object_store)?; + Ok(result.into()) + } + + /// Get object metadata without downloading the body. + #[tracing::instrument(name = "object.head", skip(self), fields(key))] + pub async fn head(&self, key: &str) -> Result { + let path = Path::from(key); + self.0.head(&path).await.map_err(from_object_store) + } + + /// Delete the object at `key`. + #[tracing::instrument(name = "object.delete", skip(self), fields(key))] + pub async fn delete(&self, key: &str) -> Result<(), Error> { + let path = Path::from(key); + self.0.delete(&path).await.map_err(from_object_store) + } + + /// Copy an object from `src` to `dst` within the same store. + #[tracing::instrument(name = "object.copy", skip(self), fields(src, dst))] + pub async fn copy(&self, src: &str, dst: &str) -> Result<(), Error> { + let from = Path::from(src); + let to = Path::from(dst); + self.0.copy(&from, &to).await.map_err(from_object_store) + } +} + +/// Convert an [`object_store::Error`] into a crate [`Error`]. +fn from_object_store(err: object_store::Error) -> Error { + let retryable = !matches!( + err, + object_store::Error::NotFound { .. } + | object_store::Error::PermissionDenied { .. } + | object_store::Error::Unauthenticated { .. } + | object_store::Error::AlreadyExists { .. } + | object_store::Error::Precondition { .. 
} + ); + Error::runtime(err.to_string(), "object-store", retryable).with_source(err) +} + +#[cfg(test)] +mod tests { + use object_store::memory::InMemory; + + use super::*; + + fn test_client() -> ObjectStoreClient { + ObjectStoreClient::new(InMemory::new()) + } + + #[tokio::test] + async fn put_and_get() { + let client = test_client(); + let data = Bytes::from("hello world"); + client + .put("test.txt", data.clone(), Some("text/plain")) + .await + .unwrap(); + + let result = client.get("test.txt").await.unwrap(); + assert_eq!(result.data, data); + assert_eq!(result.content_type.as_deref(), Some("text/plain")); + } + + #[tokio::test] + async fn get_returns_meta() { + let client = test_client(); + let data = Bytes::from("abc"); + client.put("meta.bin", data, None).await.unwrap(); + + let result = client.get("meta.bin").await.unwrap(); + assert_eq!(result.meta.size as usize, 3); + assert_eq!(result.meta.location, Path::from("meta.bin")); + } + + #[tokio::test] + async fn put_returns_result() { + let client = test_client(); + let result = client + .put("etag.bin", Bytes::from("x"), None) + .await + .unwrap(); + assert!(result.e_tag.is_some()); + } + + #[tokio::test] + async fn head() { + let client = test_client(); + client + .put("head.bin", Bytes::from("data"), None) + .await + .unwrap(); + + let meta = client.head("head.bin").await.unwrap(); + assert_eq!(meta.size, 4); + assert_eq!(meta.location, Path::from("head.bin")); + } + + #[tokio::test] + async fn head_not_found() { + let client = test_client(); + let err = client.head("missing").await.unwrap_err(); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn delete() { + let client = test_client(); + client.put("del.bin", Bytes::from("x"), None).await.unwrap(); + client.delete("del.bin").await.unwrap(); + + assert!(client.get("del.bin").await.is_err()); + } + + #[tokio::test] + async fn copy() { + let client = test_client(); + let data = Bytes::from("copy me"); + client.put("src.bin", data.clone(), 
None).await.unwrap(); + client.copy("src.bin", "dst.bin").await.unwrap(); + + let result = client.get("dst.bin").await.unwrap(); + assert_eq!(result.data, data); + } + + #[tokio::test] + async fn list() { + let client = test_client(); + for i in 0..3 { + client + .put( + &format!("dir/file{i}.txt"), + Bytes::from(format!("{i}")), + None, + ) + .await + .unwrap(); + } + + let items = client.list("dir/").await.unwrap(); + assert_eq!(items.len(), 3); + } + + #[tokio::test] + async fn list_stream() { + use futures::StreamExt; + let client = test_client(); + for i in 0..3 { + client + .put( + &format!("stream/f{i}.bin"), + Bytes::from(format!("{i}")), + None, + ) + .await + .unwrap(); + } + + let items: Vec<_> = client + .list_stream("stream/") + .collect::>() + .await + .into_iter() + .collect::, _>>() + .unwrap(); + assert_eq!(items.len(), 3); + } + + #[tokio::test] + async fn put_create_only() { + let client = test_client(); + client + .put_opts("unique.bin", Bytes::from("first"), PutMode::Create, None) + .await + .unwrap(); + + let err = client + .put_opts("unique.bin", Bytes::from("second"), PutMode::Create, None) + .await + .unwrap_err(); + assert!(!err.is_retryable()); + } + + #[tokio::test] + async fn verify_reachable() { + let client = test_client(); + client.verify_reachable().await.unwrap(); + } +} diff --git a/crates/nvisy-object/src/client/put_output.rs b/crates/nvisy-object/src/client/put_output.rs new file mode 100644 index 00000000..2550eeea --- /dev/null +++ b/crates/nvisy-object/src/client/put_output.rs @@ -0,0 +1,20 @@ +//! Result type for [`ObjectStoreClient::put`](super::ObjectStoreClient::put) and +//! [`ObjectStoreClient::put_opts`](super::ObjectStoreClient::put_opts). + +/// Result of a successful put operation. +#[derive(Debug)] +pub struct PutOutput { + /// Unique identifier for the newly created object, if the backend provides one. + pub e_tag: Option, + /// A version indicator for the newly created object, if the backend provides one. 
+ pub version: Option, +} + +impl From for PutOutput { + fn from(r: object_store::PutResult) -> Self { + Self { + e_tag: r.e_tag, + version: r.version, + } + } +} diff --git a/crates/nvisy-object/src/lib.rs b/crates/nvisy-object/src/lib.rs new file mode 100644 index 00000000..d48db5b3 --- /dev/null +++ b/crates/nvisy-object/src/lib.rs @@ -0,0 +1,14 @@ +#![forbid(unsafe_code)] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![doc = include_str!("../README.md")] + +pub mod client; +/// Client trait and object storage providers. +pub mod providers; +/// Streaming traits and object store adapters. +pub mod streams; +/// Inlined types (Error, ContentData, ContentSource). +pub mod types; + +#[doc(hidden)] +pub mod prelude; diff --git a/crates/nvisy-object/src/prelude.rs b/crates/nvisy-object/src/prelude.rs new file mode 100644 index 00000000..6c4d51a7 --- /dev/null +++ b/crates/nvisy-object/src/prelude.rs @@ -0,0 +1,6 @@ +//! Convenience re-exports. + +pub use crate::client::{GetOutput, ObjectStoreClient, PutOutput}; +pub use crate::providers::{AzureProvider, Client, GcsProvider, S3Provider}; +pub use crate::streams::{ObjectReadStream, ObjectWriteStream, StreamSource, StreamTarget}; +pub use crate::types::{ContentData, ContentSource, Error}; diff --git a/crates/nvisy-object/src/providers/azure.rs b/crates/nvisy-object/src/providers/azure.rs new file mode 100644 index 00000000..c6633a79 --- /dev/null +++ b/crates/nvisy-object/src/providers/azure.rs @@ -0,0 +1,76 @@ +//! Azure Blob Storage provider using [`object_store::azure::MicrosoftAzureBuilder`]. + +use derive_more::Deref; +use object_store::azure::MicrosoftAzureBuilder; +#[cfg(feature = "schema")] +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use super::Client; +use crate::client::ObjectStoreClient; +use crate::types::Error; + +/// Typed credentials for Azure Blob Storage. 
+#[derive(Debug, Deserialize, Serialize)] +#[cfg_attr(feature = "schema", derive(JsonSchema))] +#[serde(rename_all = "camelCase")] +pub struct AzureCredentials { + /// Azure storage container name. + pub container: String, + /// Azure storage account name. + pub account_name: String, + /// Storage account access key. + #[serde(default)] + pub access_key: Option, + /// Shared Access Signature token. + #[serde(default)] + pub sas_token: Option, + /// Custom endpoint URL (for Azure Stack or Azurite). + #[serde(default)] + pub endpoint: Option, +} + +/// Azure Blob Storage-backed object storage client. +#[derive(Deref)] +pub struct AzureProvider(ObjectStoreClient); + +impl Client for AzureProvider { + type Credentials = AzureCredentials; + + const ID: &str = "azure"; + + async fn connect(creds: &Self::Credentials) -> Result { + let mut builder = MicrosoftAzureBuilder::new() + .with_container_name(&creds.container) + .with_account(&creds.account_name); + + if let Some(key) = &creds.access_key { + builder = builder.with_access_key(key); + } + + if let Some(sas) = &creds.sas_token { + let pairs: Vec<(String, String)> = sas + .trim_start_matches('?') + .split('&') + .filter_map(|pair| { + let mut parts = pair.splitn(2, '='); + Some(( + parts.next()?.to_string(), + parts.next().unwrap_or("").to_string(), + )) + }) + .collect(); + builder = builder.with_sas_authorization(pairs); + } + + if let Some(endpoint) = &creds.endpoint { + builder = builder.with_endpoint(endpoint.clone()); + } + + let store = builder + .build() + .map_err(|e| Error::connection(e.to_string(), Self::ID, true))?; + + Ok(Self(ObjectStoreClient::new(store))) + } +} diff --git a/crates/nvisy-object/src/providers/gcs.rs b/crates/nvisy-object/src/providers/gcs.rs new file mode 100644 index 00000000..dab11d71 --- /dev/null +++ b/crates/nvisy-object/src/providers/gcs.rs @@ -0,0 +1,54 @@ +//! Google Cloud Storage provider using [`object_store::gcp::GoogleCloudStorageBuilder`]. 
+
+use derive_more::Deref;
+use object_store::gcp::GoogleCloudStorageBuilder;
+#[cfg(feature = "schema")]
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+
+use super::Client;
+use crate::client::ObjectStoreClient;
+use crate::types::Error;
+
+/// Typed credentials for Google Cloud Storage.
+#[derive(Debug, Deserialize, Serialize)]
+#[cfg_attr(feature = "schema", derive(JsonSchema))]
+#[serde(rename_all = "camelCase")]
+pub struct GcsCredentials {
+    /// GCS bucket name.
+    pub bucket: String,
+    /// Path to a JSON service account key file.
+    #[serde(default)]
+    pub service_account_key: Option<String>,
+    /// Custom endpoint URL (for testing with a fake GCS server).
+    #[serde(default)]
+    pub endpoint: Option<String>,
+}
+
+/// Google Cloud Storage-backed object storage client.
+#[derive(Deref)]
+pub struct GcsProvider(ObjectStoreClient);
+
+impl Client for GcsProvider {
+    type Credentials = GcsCredentials;
+
+    const ID: &str = "gcs";
+
+    async fn connect(creds: &Self::Credentials) -> Result<Self, Error> {
+        let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(&creds.bucket);
+
+        if let Some(key_path) = &creds.service_account_key {
+            builder = builder.with_service_account_key(key_path);
+        }
+
+        if let Some(endpoint) = &creds.endpoint {
+            builder = builder.with_url(endpoint);
+        }
+
+        let store = builder
+            .build()
+            .map_err(|e| Error::connection(e.to_string(), Self::ID, true))?;
+
+        Ok(Self(ObjectStoreClient::new(store)))
+    }
+}
diff --git a/crates/nvisy-object/src/providers/mod.rs b/crates/nvisy-object/src/providers/mod.rs
new file mode 100644
index 00000000..5b2ea704
--- /dev/null
+++ b/crates/nvisy-object/src/providers/mod.rs
@@ -0,0 +1,11 @@
+//! Client trait and object storage providers.
+
+mod azure;
+mod gcs;
+mod provider;
+mod s3;
+
+pub use azure::{AzureCredentials, AzureProvider};
+pub use gcs::{GcsCredentials, GcsProvider};
+pub use provider::Client;
+pub use s3::{S3Credentials, S3Provider};
diff --git a/crates/nvisy-object/src/providers/provider.rs b/crates/nvisy-object/src/providers/provider.rs
new file mode 100644
index 00000000..7a0b0714
--- /dev/null
+++ b/crates/nvisy-object/src/providers/provider.rs
@@ -0,0 +1,38 @@
+//! Client trait for object storage providers.
+
+use std::ops::Deref;
+
+use serde::de::DeserializeOwned;
+
+use crate::client::ObjectStoreClient;
+use crate::types::Error;
+
+/// Authenticated connection to an object storage backend.
+///
+/// Implementations are newtype wrappers around [`ObjectStoreClient`] that
+/// handle credential validation and client construction for a specific
+/// provider (e.g. S3, Azure, GCS).
+pub trait Client: Deref<Target = ObjectStoreClient> + Send + Sync + 'static {
+    /// Strongly-typed credentials for this provider.
+    type Credentials: DeserializeOwned + Send;
+
+    /// Unique identifier (e.g. "s3", "azure").
+    const ID: &str;
+
+    /// Verify that the backing store is reachable.
+    fn verify(&self) -> impl Future<Output = Result<(), Error>> + Send {
+        self.verify_reachable()
+    }
+
+    /// Create a connected client from credentials.
+    fn connect(creds: &Self::Credentials) -> impl Future<Output = Result<Self, Error>> + Send
+    where
+        Self: Sized;
+
+    /// Optional async cleanup when the connection is released.
+    ///
+    /// The default implementation is a no-op.
+    fn disconnect(&self) -> impl Future<Output = ()> + Send {
+        async {}
+    }
+}
diff --git a/crates/nvisy-object/src/providers/s3.rs b/crates/nvisy-object/src/providers/s3.rs
new file mode 100644
index 00000000..9fdc1a29
--- /dev/null
+++ b/crates/nvisy-object/src/providers/s3.rs
@@ -0,0 +1,83 @@
+//! S3-compatible provider using [`object_store::aws::AmazonS3Builder`].
+//!
+//! Works with AWS S3, MinIO, and any S3-compatible service.
+
+use derive_more::Deref;
+use object_store::aws::AmazonS3Builder;
+#[cfg(feature = "schema")]
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+
+use super::Client;
+use crate::client::ObjectStoreClient;
+use crate::types::Error;
+
+/// Typed credentials for S3-compatible provider.
+#[derive(Debug, Deserialize, Serialize)]
+#[cfg_attr(feature = "schema", derive(JsonSchema))]
+#[serde(rename_all = "camelCase")]
+pub struct S3Credentials {
+    /// S3 bucket name.
+    pub bucket: String,
+    /// AWS region (defaults to `us-east-1`).
+    #[serde(default = "default_region")]
+    pub region: String,
+    /// Endpoint URL (e.g. `http://localhost:9000` for MinIO).
+    /// Required for non-AWS S3-compatible services.
+    #[serde(default)]
+    pub endpoint: Option<String>,
+    /// Access key ID for static credentials.
+    #[serde(default)]
+    pub access_key_id: Option<String>,
+    /// Secret access key for static credentials.
+    #[serde(default)]
+    pub secret_access_key: Option<String>,
+    /// Session token for temporary credentials.
+    #[serde(default)]
+    pub session_token: Option<String>,
+}
+
+fn default_region() -> String {
+    "us-east-1".to_string()
+}
+
+/// S3-backed object storage client.
+#[derive(Deref)]
+pub struct S3Provider(ObjectStoreClient);
+
+impl Client for S3Provider {
+    type Credentials = S3Credentials;
+
+    const ID: &str = "s3";
+
+    async fn connect(creds: &Self::Credentials) -> Result<Self, Error> {
+        let mut builder = AmazonS3Builder::new()
+            .with_bucket_name(&creds.bucket)
+            .with_region(&creds.region);
+
+        if let Some(endpoint) = &creds.endpoint {
+            builder = builder.with_endpoint(endpoint);
+            if endpoint.starts_with("http://") {
+                builder = builder.with_allow_http(true);
+            }
+        }
+
+        if let Some(access_key) = &creds.access_key_id {
+            builder = builder.with_access_key_id(access_key);
+        }
+
+        if let Some(secret_key) = &creds.secret_access_key {
+            builder = builder.with_secret_access_key(secret_key);
+        }
+
+        if let Some(token) = &creds.session_token {
+            builder = builder.with_token(token);
+        }
+
+        let store = builder
+            .build()
+            .map_err(|e| Error::connection(e.to_string(), Self::ID, true))?;
+
+        Ok(Self(ObjectStoreClient::new(store)))
+    }
+}
diff --git a/crates/nvisy-object/src/streams/mod.rs b/crates/nvisy-object/src/streams/mod.rs
new file mode 100644
index 00000000..38aa936c
--- /dev/null
+++ b/crates/nvisy-object/src/streams/mod.rs
@@ -0,0 +1,11 @@
+//! Streaming traits and object store adapters.
+
+mod read_object;
+mod source_stream;
+mod target_stream;
+mod write_object;
+
+pub use read_object::{ObjectReadParams, ObjectReadStream};
+pub use source_stream::StreamSource;
+pub use target_stream::StreamTarget;
+pub use write_object::{ObjectWriteParams, ObjectWriteStream};
diff --git a/crates/nvisy-object/src/streams/read_object.rs b/crates/nvisy-object/src/streams/read_object.rs
new file mode 100644
index 00000000..0a22e6d8
--- /dev/null
+++ b/crates/nvisy-object/src/streams/read_object.rs
@@ -0,0 +1,155 @@
+//! Streaming reader that pulls objects from a cloud object store.
+
+use futures::StreamExt;
+use serde::Deserialize;
+use tokio::sync::mpsc;
+
+use super::StreamSource;
+use crate::client::ObjectStoreClient;
+use crate::types::{ContentData, ContentSource, Error};
+
+/// Typed parameters for [`ObjectReadStream`].
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ObjectReadParams {
+    /// Object key prefix to filter by.
+    #[serde(default)]
+    pub prefix: String,
+    /// Skip objects whose size exceeds this limit (in bytes).
+    #[serde(default)]
+    pub max_size: Option<u64>,
+}
+
+/// A [`StreamSource`] that lists and fetches objects from a cloud object store,
+/// emitting each object as a [`ContentData`] onto the output channel.
+pub struct ObjectReadStream;
+
+#[async_trait::async_trait]
+impl StreamSource for ObjectReadStream {
+    type Client = ObjectStoreClient;
+    type Params = ObjectReadParams;
+
+    fn id(&self) -> &str {
+        "read"
+    }
+
+    #[tracing::instrument(name = "object.read", skip_all, fields(prefix = %params.prefix, count))]
+    async fn read(
+        &self,
+        output: mpsc::Sender<ContentData>,
+        params: Self::Params,
+        client: Self::Client,
+    ) -> Result<u64, Error> {
+        let mut stream = client.list_stream(&params.prefix);
+        let mut total = 0u64;
+
+        while let Some(result) = stream.next().await {
+            let meta = result?;
+            let key = meta.location.as_ref();
+
+            if let Some(max) = params.max_size
+                && meta.size > max
+            {
+                tracing::debug!(
+                    key,
+                    size = meta.size,
+                    max_size = max,
+                    "skipping oversized object"
+                );
+                continue;
+            }
+
+            let source = ContentSource::new();
+            tracing::debug!(key, source_id = %source, "fetching object");
+
+            let result = client.get(key).await?;
+
+            let mut content = ContentData::new(source, result.data);
+            if let Some(ct) = result.content_type {
+                content = content.with_content_type(ct);
+            }
+
+            total += 1;
+            if output.send(content).await.is_err() {
+                break;
+            }
+        }
+
+        tracing::Span::current().record("count", total);
+        Ok(total)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes::Bytes;
+    use
object_store::memory::InMemory; + + use super::*; + + fn test_client() -> ObjectStoreClient { + ObjectStoreClient::new(InMemory::new()) + } + + #[tokio::test] + async fn read_emits_all_objects() { + let client = test_client(); + for i in 0..3 { + client + .put( + &format!("data/file{i}.txt"), + Bytes::from(format!("content-{i}")), + Some("text/plain"), + ) + .await + .unwrap(); + } + + let (tx, mut rx) = mpsc::channel(16); + let stream = ObjectReadStream; + let params = ObjectReadParams { + prefix: "data/".to_string(), + max_size: None, + }; + + let count = stream.read(tx, params, client).await.unwrap(); + assert_eq!(count, 3); + + let mut items = Vec::new(); + while let Some(item) = rx.recv().await { + items.push(item); + } + assert_eq!(items.len(), 3); + } + + #[tokio::test] + async fn read_max_size_filter() { + let client = test_client(); + client + .put("filter/small.bin", Bytes::from("hi"), None) + .await + .unwrap(); + client + .put( + "filter/big.bin", + Bytes::from("this is a much bigger payload"), + None, + ) + .await + .unwrap(); + + let (tx, mut rx) = mpsc::channel(16); + let stream = ObjectReadStream; + let params = ObjectReadParams { + prefix: "filter/".to_string(), + max_size: Some(10), + }; + + let count = stream.read(tx, params, client).await.unwrap(); + assert_eq!(count, 1); + + let item = rx.recv().await.unwrap(); + assert_eq!(item.as_bytes(), b"hi"); + assert!(rx.recv().await.is_none()); + } +} diff --git a/crates/nvisy-object/src/streams/source_stream.rs b/crates/nvisy-object/src/streams/source_stream.rs new file mode 100644 index 00000000..cdc6fdcd --- /dev/null +++ b/crates/nvisy-object/src/streams/source_stream.rs @@ -0,0 +1,33 @@ +//! Streaming source trait for pipeline input. +//! +//! [`StreamSource`] reads content from an external system into the pipeline. 
+
+use serde::de::DeserializeOwned;
+use tokio::sync::mpsc;
+
+use crate::types::{ContentData, Error};
+
+/// A source stream that reads content from an external system into the pipeline.
+///
+/// Implementations connect to a storage backend (e.g. S3, local filesystem)
+/// and emit content data into the pipeline's input channel.
+#[async_trait::async_trait]
+pub trait StreamSource: Send + Sync + 'static {
+    /// Strongly-typed parameters for this stream source.
+    type Params: DeserializeOwned + Send;
+    /// The client type this stream requires.
+    type Client: Send + 'static;
+
+    /// Unique identifier for this stream source (e.g. `"read"`).
+    fn id(&self) -> &str;
+
+    /// Read content from the external system and send it to `output`.
+    ///
+    /// Returns the number of items read.
+    async fn read(
+        &self,
+        output: mpsc::Sender<ContentData>,
+        params: Self::Params,
+        client: Self::Client,
+    ) -> Result<u64, Error>;
+}
diff --git a/crates/nvisy-object/src/streams/target_stream.rs b/crates/nvisy-object/src/streams/target_stream.rs
new file mode 100644
index 00000000..9e941401
--- /dev/null
+++ b/crates/nvisy-object/src/streams/target_stream.rs
@@ -0,0 +1,33 @@
+//! Streaming target trait for pipeline output.
+//!
+//! [`StreamTarget`] writes processed content back to an external system.
+
+use serde::de::DeserializeOwned;
+use tokio::sync::mpsc;
+
+use crate::types::{ContentData, Error};
+
+/// A target stream that writes content from the pipeline to an external system.
+///
+/// Implementations receive processed content data from the pipeline and persist
+/// it to a storage backend.
+#[async_trait::async_trait]
+pub trait StreamTarget: Send + Sync + 'static {
+    /// Strongly-typed parameters for this stream target.
+    type Params: DeserializeOwned + Send;
+    /// The client type this stream requires.
+    type Client: Send + 'static;
+
+    /// Unique identifier for this stream target (e.g. `"write"`).
+    fn id(&self) -> &str;
+
+    /// Receive content from `input` and write it to the external system.
+    ///
+    /// Returns the number of items written.
+    async fn write(
+        &self,
+        input: mpsc::Receiver<ContentData>,
+        params: Self::Params,
+        client: Self::Client,
+    ) -> Result<u64, Error>;
+}
diff --git a/crates/nvisy-object/src/streams/write_object.rs b/crates/nvisy-object/src/streams/write_object.rs
new file mode 100644
index 00000000..3eb80a02
--- /dev/null
+++ b/crates/nvisy-object/src/streams/write_object.rs
@@ -0,0 +1,137 @@
+//! Streaming writer that uploads content to a cloud object store.
+
+use object_store::PutMode;
+use serde::Deserialize;
+use tokio::sync::mpsc;
+
+use super::StreamTarget;
+use crate::client::ObjectStoreClient;
+use crate::types::{ContentData, Error};
+
+/// Typed parameters for [`ObjectWriteStream`].
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ObjectWriteParams {
+    /// Key prefix prepended to each content source UUID.
+    #[serde(default)]
+    pub prefix: String,
+    /// When `true`, uses `PutMode::Create` so that writing to an existing
+    /// key fails with an error.
+    #[serde(default)]
+    pub create_only: bool,
+}
+
+/// A [`StreamTarget`] that receives [`ContentData`] from the input channel and
+/// uploads each one to a cloud object store.
+pub struct ObjectWriteStream;
+
+#[async_trait::async_trait]
+impl StreamTarget for ObjectWriteStream {
+    type Client = ObjectStoreClient;
+    type Params = ObjectWriteParams;
+
+    fn id(&self) -> &str {
+        "write"
+    }
+
+    #[tracing::instrument(name = "object.write", skip_all, fields(prefix = %params.prefix, count))]
+    async fn write(
+        &self,
+        mut input: mpsc::Receiver<ContentData>,
+        params: Self::Params,
+        client: Self::Client,
+    ) -> Result<u64, Error> {
+        let prefix = &params.prefix;
+        let mut total = 0u64;
+
+        while let Some(content) = input.recv().await {
+            let source_id = content.content_source.to_string();
+            let key = if prefix.is_empty() {
+                source_id
+            } else {
+                format!("{prefix}{source_id}")
+            };
+
+            let mode = if params.create_only {
+                PutMode::Create
+            } else {
+                PutMode::Overwrite
+            };
+            client
+                .put_opts(&key, content.to_bytes(), mode, content.content_type())
+                .await?;
+
+            total += 1;
+        }
+
+        tracing::Span::current().record("count", total);
+        Ok(total)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes::Bytes;
+    use object_store::memory::InMemory;
+
+    use super::*;
+    use crate::types::{ContentData, ContentSource};
+
+    fn test_client() -> ObjectStoreClient {
+        ObjectStoreClient::new(InMemory::new())
+    }
+
+    #[tokio::test]
+    async fn write_uploads_all() {
+        let client = test_client();
+        let (tx, rx) = mpsc::channel(16);
+
+        let sources: Vec<ContentSource> = (0..3).map(|_| ContentSource::new()).collect();
+        for (i, src) in sources.iter().enumerate() {
+            let content = ContentData::new(*src, Bytes::from(format!("payload-{i}")));
+            tx.send(content).await.unwrap();
+        }
+        drop(tx);
+
+        let stream = ObjectWriteStream;
+        let params = ObjectWriteParams {
+            prefix: "out/".to_string(),
+            create_only: false,
+        };
+
+        let count = stream.write(rx, params, client.clone()).await.unwrap();
+        assert_eq!(count, 3);
+
+        // Verify all objects were stored
+        let items = client.list("out/").await.unwrap();
+        assert_eq!(items.len(), 3);
+    }
+
+    #[tokio::test]
+    async fn write_create_only() {
+        let client = test_client();
+
+        // Pre-populate an object at a known key
+        let source = ContentSource::new();
+        let key = format!("prefix/{source}");
+        client
+            .put(&key, Bytes::from("existing"), None)
+            .await
+            .unwrap();
+
+        // Try to write the same key with create_only
+        let (tx, rx) = mpsc::channel(1);
+        let content = ContentData::new(source, Bytes::from("new"));
+        tx.send(content).await.unwrap();
+        drop(tx);
+
+        let stream = ObjectWriteStream;
+        let params = ObjectWriteParams {
+            prefix: "prefix/".to_string(),
+            create_only: true,
+        };
+
+        let result = stream.write(rx, params, client).await;
+        assert!(result.is_err());
+    }
+}
diff --git a/crates/nvisy-object/src/types/content_data.rs b/crates/nvisy-object/src/types/content_data.rs
new file mode 100644
index 00000000..690de74d
--- /dev/null
+++ b/crates/nvisy-object/src/types/content_data.rs
@@ -0,0 +1,46 @@
+//! Content payload with source identity and optional content-type.
+
+use bytes::Bytes;
+
+use super::ContentSource;
+
+/// A blob of content together with its [`ContentSource`] identity and an
+/// optional MIME content-type.
+pub struct ContentData {
+    /// The unique source identifier for this content.
+    pub content_source: ContentSource,
+    data: Bytes,
+    content_type: Option<String>,
+}
+
+impl ContentData {
+    /// Create a new [`ContentData`] from a source and raw bytes.
+    pub fn new(source: ContentSource, data: Bytes) -> Self {
+        Self {
+            content_source: source,
+            data,
+            content_type: None,
+        }
+    }
+
+    /// Attach a content-type (MIME) to this content.
+    pub fn with_content_type(mut self, ct: impl Into<String>) -> Self {
+        self.content_type = Some(ct.into());
+        self
+    }
+
+    /// Return the content-type, if set.
+    pub fn content_type(&self) -> Option<&str> {
+        self.content_type.as_deref()
+    }
+
+    /// Return a clone of the underlying [`Bytes`].
+    pub fn to_bytes(&self) -> Bytes {
+        self.data.clone()
+    }
+
+    /// Return a byte-slice view of the content.
+    pub fn as_bytes(&self) -> &[u8] {
+        &self.data
+    }
+}
diff --git a/crates/nvisy-object/src/types/content_source.rs b/crates/nvisy-object/src/types/content_source.rs
new file mode 100644
index 00000000..b2df1a63
--- /dev/null
+++ b/crates/nvisy-object/src/types/content_source.rs
@@ -0,0 +1,30 @@
+//! Unique content source identifier backed by UUIDv7.
+
+use std::fmt;
+
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+/// Opaque identifier for a piece of content, backed by a UUIDv7.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[serde(transparent)]
+pub struct ContentSource(Uuid);
+
+impl ContentSource {
+    /// Generate a new time-ordered content source id (UUIDv7).
+    pub fn new() -> Self {
+        Self(Uuid::now_v7())
+    }
+}
+
+impl Default for ContentSource {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl fmt::Display for ContentSource {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.0.fmt(f)
+    }
+}
diff --git a/crates/nvisy-object/src/types/error.rs b/crates/nvisy-object/src/types/error.rs
new file mode 100644
index 00000000..666f5ad0
--- /dev/null
+++ b/crates/nvisy-object/src/types/error.rs
@@ -0,0 +1,68 @@
+//! Minimal error type for object-store operations.
+
+use std::fmt;
+
+type BoxedError = Box<dyn std::error::Error + Send + Sync>;
+
+/// A lightweight error carrying a message, an optional source, and a
+/// retryable flag.
+pub struct Error {
+    message: String,
+    source: Option<BoxedError>,
+    retryable: bool,
+}
+
+impl Error {
+    /// Create a runtime error formatted as `[{label}] {msg}`.
+    pub fn runtime(msg: impl fmt::Display, label: &str, retryable: bool) -> Self {
+        Self {
+            message: format!("[{label}] {msg}"),
+            source: None,
+            retryable,
+        }
+    }
+
+    /// Create a connection error formatted as `[{label}] {msg}`.
+    pub fn connection(msg: impl fmt::Display, label: &str, retryable: bool) -> Self {
+        Self {
+            message: format!("[{label}] {msg}"),
+            source: None,
+            retryable,
+        }
+    }
+
+    /// Attach a source error.
+ pub fn with_source(mut self, source: impl std::error::Error + Send + Sync + 'static) -> Self { + self.source = Some(Box::new(source)); + self + } + + /// Whether the caller should retry this operation. + pub fn is_retryable(&self) -> bool { + self.retryable + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.message) + } +} + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Error") + .field("message", &self.message) + .field("retryable", &self.retryable) + .field("source", &self.source) + .finish() + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.source + .as_deref() + .map(|e| e as &(dyn std::error::Error + 'static)) + } +} diff --git a/crates/nvisy-object/src/types/mod.rs b/crates/nvisy-object/src/types/mod.rs new file mode 100644 index 00000000..604084e8 --- /dev/null +++ b/crates/nvisy-object/src/types/mod.rs @@ -0,0 +1,9 @@ +//! Inlined types from `nvisy-core` to keep this crate self-contained. + +pub mod content_data; +pub mod content_source; +pub mod error; + +pub use content_data::ContentData; +pub use content_source::ContentSource; +pub use error::Error; diff --git a/crates/nvisy-postgres/README.md b/crates/nvisy-postgres/README.md index 98c78517..84f7d498 100644 --- a/crates/nvisy-postgres/README.md +++ b/crates/nvisy-postgres/README.md @@ -7,18 +7,18 @@ pooling and embedded migrations. 
## Features -- **Async Connection Pooling** - High-performance connection management with +- **Async Connection Pooling:** High-performance connection management with Deadpool -- **Type-Safe Queries** - Compile-time SQL validation with Diesel ORM -- **Embedded Migrations** - Automatic schema management with rollback support -- **Error Handling** - Comprehensive database error types with context -- **Production Ready** - Health checks and connection monitoring +- **Type-Safe Queries:** Compile-time SQL validation with Diesel ORM +- **Embedded Migrations:** Automatic schema management with rollback support +- **Error Handling:** Comprehensive database error types with context +- **Production Ready:** Health checks and connection monitoring ## Key Dependencies -- `diesel` - Safe, extensible ORM and query builder for Rust -- `diesel-async` - Async support for Diesel with PostgreSQL -- `deadpool` - Async connection pooling for high-concurrency workloads +- `diesel`: Safe, extensible ORM and query builder for Rust +- `diesel-async`: Async support for Diesel with PostgreSQL +- `deadpool`: Async connection pooling for high-concurrency workloads ## Schema Management @@ -37,11 +37,11 @@ See [CHANGELOG.md](../../CHANGELOG.md) for release notes and version history. 
## License -Apache 2.0 License - see [LICENSE.txt](../../LICENSE.txt) +Apache 2.0 License, see [LICENSE.txt](../../LICENSE.txt) ## Support -- **Documentation**: [docs.nvisy.com](https://docs.nvisy.com) -- **Issues**: [GitHub Issues](https://github.com/nvisycom/server/issues) -- **Email**: [support@nvisy.com](mailto:support@nvisy.com) -- **API Status**: [nvisy.openstatus.dev](https://nvisy.openstatus.dev) +- **Documentation:** [docs.nvisy.com](https://docs.nvisy.com) +- **Issues:** [GitHub Issues](https://github.com/nvisycom/server/issues) +- **Email:** [support@nvisy.com](mailto:support@nvisy.com) +- **API Status:** [nvisy.openstatus.dev](https://nvisy.openstatus.dev) diff --git a/crates/nvisy-server/README.md b/crates/nvisy-server/README.md index 7d896199..0b94cd76 100644 --- a/crates/nvisy-server/README.md +++ b/crates/nvisy-server/README.md @@ -7,27 +7,27 @@ built with Axum and Tokio. ## Features -- **Async HTTP Server** - Built with Axum web framework on Tokio runtime -- **JWT Authentication** - Stateless authentication with session management -- **OpenAPI Documentation** - Auto-generated Swagger and Scalar UI -- **Type-Safe Validation** - Comprehensive request/response validation -- **Middleware Stack** - CORS, security headers, and request logging -- **Service Integration** - PostgreSQL, MinIO, OpenRouter, and NATS clients +- **Async HTTP Server:** Built with Axum web framework on Tokio runtime +- **JWT Authentication:** Stateless authentication with session management +- **OpenAPI Documentation:** Auto-generated Swagger and Scalar UI +- **Type-Safe Validation:** Comprehensive request/response validation +- **Middleware Stack:** CORS, security headers, and request logging +- **Service Integration:** PostgreSQL, MinIO, OpenRouter, and NATS clients ## Key Dependencies -- `axum` - Modern web framework with excellent async performance -- `tokio` - Async runtime for concurrent request handling -- `tower` - Middleware ecosystem for HTTP services -- `utoipa` - 
OpenAPI documentation generation +- `axum`: Modern web framework with excellent async performance +- `tokio`: Async runtime for concurrent request handling +- `tower`: Middleware ecosystem for HTTP services +- `utoipa`: OpenAPI documentation generation ## API Documentation When running, the server exposes interactive documentation at: -- **Swagger UI**: `/api/swagger` -- **Scalar UI**: `/api/scalar` -- **OpenAPI JSON**: `/api/openapi.json` +- **Swagger UI:** `/api/swagger` +- **Scalar UI:** `/api/scalar` +- **OpenAPI JSON:** `/api/openapi.json` ## Changelog @@ -35,11 +35,11 @@ See [CHANGELOG.md](../../CHANGELOG.md) for release notes and version history. ## License -Apache 2.0 License - see [LICENSE.txt](../../LICENSE.txt) +Apache 2.0 License, see [LICENSE.txt](../../LICENSE.txt) ## Support -- **Documentation**: [docs.nvisy.com](https://docs.nvisy.com) -- **Issues**: [GitHub Issues](https://github.com/nvisycom/server/issues) -- **Email**: [support@nvisy.com](mailto:support@nvisy.com) -- **API Status**: [nvisy.openstatus.dev](https://nvisy.openstatus.dev) +- **Documentation:** [docs.nvisy.com](https://docs.nvisy.com) +- **Issues:** [GitHub Issues](https://github.com/nvisycom/server/issues) +- **Email:** [support@nvisy.com](mailto:support@nvisy.com) +- **API Status:** [nvisy.openstatus.dev](https://nvisy.openstatus.dev) diff --git a/crates/nvisy-server/src/service/mod.rs b/crates/nvisy-server/src/service/mod.rs index c17add3d..5da98a67 100644 --- a/crates/nvisy-server/src/service/mod.rs +++ b/crates/nvisy-server/src/service/mod.rs @@ -24,8 +24,8 @@ pub use crate::service::webhook::WebhookEmitter; /// Used for the [`State`] extraction (dependency injection). 
/// /// [`State`]: axum::extract::State -#[must_use = "state does nothing unless you use it"] #[derive(Clone)] +#[must_use = "state does nothing unless you use it"] pub struct ServiceState { // External services: pub postgres: PgClient, diff --git a/crates/nvisy-webhook/README.md b/crates/nvisy-webhook/README.md index c54f20b9..d43a966d 100644 --- a/crates/nvisy-webhook/README.md +++ b/crates/nvisy-webhook/README.md @@ -30,11 +30,11 @@ See [CHANGELOG.md](../../CHANGELOG.md) for release notes and version history. ## License -Apache 2.0 License - see [LICENSE.txt](../../LICENSE.txt) +Apache 2.0 License, see [LICENSE.txt](../../LICENSE.txt) ## Support -- **Documentation**: [docs.nvisy.com](https://docs.nvisy.com) -- **Issues**: [GitHub Issues](https://github.com/nvisycom/server/issues) -- **Email**: [support@nvisy.com](mailto:support@nvisy.com) -- **API Status**: [nvisy.openstatus.dev](https://nvisy.openstatus.dev) +- **Documentation:** [docs.nvisy.com](https://docs.nvisy.com) +- **Issues:** [GitHub Issues](https://github.com/nvisycom/server/issues) +- **Email:** [support@nvisy.com](mailto:support@nvisy.com) +- **API Status:** [nvisy.openstatus.dev](https://nvisy.openstatus.dev) diff --git a/migrations/README.md b/migrations/README.md index 907915d3..fa04c4be 100644 --- a/migrations/README.md +++ b/migrations/README.md @@ -27,32 +27,32 @@ Each migration includes a comprehensive down migration that: ### 1. 
Naming Conventions -- **Tables**: `snake_case`, descriptive nouns -- **Columns**: `snake_case`, clear and concise -- **Indexes**: `table_purpose_idx` format -- **Constraints**: `table_column_constraint_type` format -- **Functions**: `snake_case` with descriptive verbs -- **Enums**: `UPPER_CASE` with descriptive names +- **Tables:** `snake_case`, descriptive nouns +- **Columns:** `snake_case`, clear and concise +- **Indexes:** `table_purpose_idx` format +- **Constraints:** `table_column_constraint_type` format +- **Functions:** `snake_case` with descriptive verbs +- **Enums:** `UPPER_CASE` with descriptive names ### 2. Data Types and Sizing -- **UUIDs**: Primary keys for external references -- **BIGSERIAL**: Internal sequential IDs where needed -- **TEXT**: Variable length strings with CHECK constraints for limits -- **JSONB**: Structured data with size limits -- **TIMESTAMPTZ**: All timestamps with timezone awareness -- **DECIMAL**: Precise numeric values for financial data +- **UUIDs:** Primary keys for external references +- **BIGSERIAL:** Internal sequential IDs where needed +- **TEXT:** Variable length strings with CHECK constraints for limits +- **JSONB:** Structured data with size limits +- **TIMESTAMPTZ:** All timestamps with timezone awareness +- **DECIMAL:** Precise numeric values for financial data ### 3. Relationship Management -- **Foreign Keys**: Always include appropriate CASCADE/SET NULL rules -- **Self-referencing**: Support for hierarchical structures where needed -- **Many-to-many**: Explicit junction tables with additional metadata -- **Soft relationships**: References that survive deletions where appropriate +- **Foreign Keys:** Always include appropriate CASCADE/SET NULL rules +- **Self-referencing:** Support for hierarchical structures where needed +- **Many-to-many:** Explicit junction tables with additional metadata +- **Soft relationships:** References that survive deletions where appropriate ### 4. 
Error Handling and Resilience -- **Graceful failures**: Functions that handle errors appropriately -- **Transaction safety**: All operations designed for ACID compliance -- **Rollback support**: Complete down migrations for all changes -- **Data preservation**: Soft deletion patterns to prevent data loss +- **Graceful failures:** Functions that handle errors appropriately +- **Transaction safety:** All operations designed for ACID compliance +- **Rollback support:** Complete down migrations for all changes +- **Data preservation:** Soft deletion patterns to prevent data loss