From b2b90e85de84fbcf8cfdc77726a43552f283ba08 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 17:08:14 +0000 Subject: [PATCH 1/5] feat(email): add Outlook provider model + Microsoft Graph client Foundational layer for Outlook email support, mirroring the Gmail integration: - models_email: add UserProvider::Outlook and a new outlook module with Microsoft Graph resource models (message, mailFolder, attachment), delta-query models (the Outlook analogue of Gmail history), and change- notification/subscription models (the analogue of Gmail watch + Pub/Sub). - outlook_client: new crate wrapping the Microsoft Graph mail API, mirroring gmail_client's surface -- fetch/send messages, conversations (threads), delta sync, mail folders mapped to system labels, attachments, and webhook subscriptions. Includes a pure, unit-tested Graph->service message mapper. - email / email_db_client: thread UserProvider::Outlook through the domain and DB layers; add a forward-only migration adding OUTLOOK to the email_user_provider_enum Postgres enum. Verified: models_email + outlook_client compile with 15 passing unit tests; email and email_db_client compile. --- rust/cloud-storage/Cargo.lock | 18 + rust/cloud-storage/Cargo.toml | 1 + .../email/src/domain/models/link.rs | 2 + .../src/outbound/email_pg_repo/db_types.rs | 1 + .../email/src/outbound/email_pg_repo/link.rs | 2 + .../email_db_client/src/links/types.rs | 4 + ...260609170618_add_outlook_user_provider.sql | 8 + .../models_email/src/email/api/link.rs | 3 + .../models_email/src/email/service/link.rs | 2 + .../models_email/src/gmail/inbox_sync.rs | 15 + rust/cloud-storage/models_email/src/lib.rs | 1 + .../models_email/src/outlook/delta.rs | 121 ++++++ .../models_email/src/outlook/error.rs | 66 ++++ .../models_email/src/outlook/mod.rs | 350 ++++++++++++++++++ .../models_email/src/outlook/subscription.rs | 125 +++++++ rust/cloud-storage/outlook_client/Cargo.toml | 22 ++ .../outlook_client/src/attachments.rs | 33 ++ .../outlook_client/src/convert.rs | 317 ++++++++++++++++ .../cloud-storage/outlook_client/src/delta.rs | 62 ++++ .../outlook_client/src/folders.rs | 126 +++++++ rust/cloud-storage/outlook_client/src/lib.rs | 317 ++++++++++++++++ .../outlook_client/src/messages.rs | 174 +++++++++ .../outlook_client/src/profile.rs | 19 + .../outlook_client/src/subscriptions.rs | 98 +++++ .../outlook_client/src/threads.rs | 36 ++ 25 files changed, 1923 insertions(+) create mode 100644 rust/cloud-storage/macro_db_client/migrations/20260609170618_add_outlook_user_provider.sql create mode 100644 rust/cloud-storage/models_email/src/outlook/delta.rs create mode 100644 rust/cloud-storage/models_email/src/outlook/error.rs create mode 100644 rust/cloud-storage/models_email/src/outlook/mod.rs create mode 100644 rust/cloud-storage/models_email/src/outlook/subscription.rs create mode 100644 rust/cloud-storage/outlook_client/Cargo.toml create mode 100644 rust/cloud-storage/outlook_client/src/attachments.rs create mode 100644 rust/cloud-storage/outlook_client/src/convert.rs create mode 100644 rust/cloud-storage/outlook_client/src/delta.rs create mode 100644 rust/cloud-storage/outlook_client/src/folders.rs create mode 100644 rust/cloud-storage/outlook_client/src/lib.rs create mode 100644 rust/cloud-storage/outlook_client/src/messages.rs create mode 100644 rust/cloud-storage/outlook_client/src/profile.rs create mode 100644 rust/cloud-storage/outlook_client/src/subscriptions.rs create mode 100644 rust/cloud-storage/outlook_client/src/threads.rs diff --git a/rust/cloud-storage/Cargo.lock b/rust/cloud-storage/Cargo.lock index 4b26d299a3..d2152d2224 100644 --- a/rust/cloud-storage/Cargo.lock +++ b/rust/cloud-storage/Cargo.lock @@ -8261,6 +8261,24 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "outlook_client" +version = "0.1.0" +dependencies = [ + "anyhow", + "base64 0.22.1", + "chrono", + "macro_uuid", + "mockall", + "models_email", + "reqwest 0.13.4", + "serde", + "serde_json", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "outref" version = "0.5.2" diff --git a/rust/cloud-storage/Cargo.toml b/rust/cloud-storage/Cargo.toml index 2a16e91730..4c956054ff 100644 --- a/rust/cloud-storage/Cargo.toml +++ b/rust/cloud-storage/Cargo.toml @@ -37,6 +37,7 @@ members = [ "memory", "notification_sandbox", "notification_service", + "outlook_client", "organization_retention_handler", "organization_retention_trigger", "properties_service", diff --git a/rust/cloud-storage/email/src/domain/models/link.rs b/rust/cloud-storage/email/src/domain/models/link.rs index fc025bc3bd..2d80af351f 100644 --- a/rust/cloud-storage/email/src/domain/models/link.rs +++ b/rust/cloud-storage/email/src/domain/models/link.rs @@ -6,12 +6,14 @@ use uuid::Uuid; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum UserProvider { Gmail, + Outlook, } impl UserProvider { pub fn as_str(&self) -> &'static str { match self { UserProvider::Gmail => "GMAIL", + UserProvider::Outlook => "OUTLOOK", } } } diff --git a/rust/cloud-storage/email/src/outbound/email_pg_repo/db_types.rs b/rust/cloud-storage/email/src/outbound/email_pg_repo/db_types.rs index 055642f33a..757ec1d1c2 100644 --- a/rust/cloud-storage/email/src/outbound/email_pg_repo/db_types.rs +++ b/rust/cloud-storage/email/src/outbound/email_pg_repo/db_types.rs @@ -150,6 +150,7 @@ impl ThreadPreviewCursorDbRow { #[dg(forward = crate::domain::models::UserProvider)] pub enum DbUserProvider { Gmail, + Outlook, } #[derive(Debug, Clone)] diff --git a/rust/cloud-storage/email/src/outbound/email_pg_repo/link.rs b/rust/cloud-storage/email/src/outbound/email_pg_repo/link.rs index f96826c4f8..1fcf5f63a5 100644 --- a/rust/cloud-storage/email/src/outbound/email_pg_repo/link.rs +++ b/rust/cloud-storage/email/src/outbound/email_pg_repo/link.rs @@ -14,6 +14,7 @@ pub(super) async fn link_by_fusionauth_and_macro_id( ) -> Result, sqlx::Error> { let provider: DbUserProvider = match provider { UserProvider::Gmail => DbUserProvider::Gmail, + UserProvider::Outlook => DbUserProvider::Outlook, }; let db_link = sqlx::query_as!( @@ -47,6 +48,7 @@ pub(super) async fn link_by_fusionauth_email_provider( ) -> Result, sqlx::Error> { let provider: DbUserProvider = match provider { UserProvider::Gmail => DbUserProvider::Gmail, + UserProvider::Outlook => DbUserProvider::Outlook, }; let db_link = sqlx::query_as!( diff --git a/rust/cloud-storage/email_db_client/src/links/types.rs b/rust/cloud-storage/email_db_client/src/links/types.rs index 8f65e69093..dca2141289 100644 --- a/rust/cloud-storage/email_db_client/src/links/types.rs +++ b/rust/cloud-storage/email_db_client/src/links/types.rs @@ -9,12 +9,14 @@ use sqlx::{Type, types::Uuid}; #[dg(backward = models_email::email::service::link::UserProvider)] pub enum DbUserProvider { Gmail, + Outlook, } impl DbUserProvider { pub fn as_str(&self) -> &'static str { match self { DbUserProvider::Gmail => "GMAIL", + DbUserProvider::Outlook => "OUTLOOK", } } } @@ -45,6 +47,7 @@ impl From for DbLink { .to_string(), provider: match service_link.provider { models_email::service::link::UserProvider::Gmail => DbUserProvider::Gmail, + models_email::service::link::UserProvider::Outlook => DbUserProvider::Outlook, }, is_sync_active: service_link.is_sync_active, created_at: service_link.created_at, @@ -74,6 +77,7 @@ impl TryFrom for models_email::email::service::link::Link { email_address: EmailStr::try_from(email_address)?, provider: match provider { DbUserProvider::Gmail => UserProvider::Gmail, + DbUserProvider::Outlook => UserProvider::Outlook, }, is_sync_active, created_at, diff --git a/rust/cloud-storage/macro_db_client/migrations/20260609170618_add_outlook_user_provider.sql b/rust/cloud-storage/macro_db_client/migrations/20260609170618_add_outlook_user_provider.sql new file mode 100644 index 0000000000..058dad8e02 --- /dev/null +++ b/rust/cloud-storage/macro_db_client/migrations/20260609170618_add_outlook_user_provider.sql @@ -0,0 +1,8 @@ +-- Add OUTLOOK as a supported email provider so Outlook (Microsoft Graph) +-- inboxes can be linked alongside Gmail. +-- +-- Forward-only: Postgres cannot remove a value from an enum type without +-- rewriting it, so there is no accompanying down migration. ADD VALUE IF NOT +-- EXISTS is idempotent and (on Postgres 12+) safe inside the migration +-- transaction as long as the new value is not used in the same transaction. +ALTER TYPE email_user_provider_enum ADD VALUE IF NOT EXISTS 'OUTLOOK'; diff --git a/rust/cloud-storage/models_email/src/email/api/link.rs b/rust/cloud-storage/models_email/src/email/api/link.rs index d5b9ea176c..d9b0b0db0f 100644 --- a/rust/cloud-storage/models_email/src/email/api/link.rs +++ b/rust/cloud-storage/models_email/src/email/api/link.rs @@ -43,12 +43,14 @@ impl SyncStatus { #[serde(rename_all = "UPPERCASE")] pub enum UserProvider { Gmail, + Outlook, } impl UserProvider { pub fn as_str(&self) -> &'static str { match self { UserProvider::Gmail => "GMAIL", + UserProvider::Outlook => "OUTLOOK", } } } @@ -63,6 +65,7 @@ impl From for UserProvider { fn from(provider: crate::email::service::link::UserProvider) -> Self { match provider { crate::email::service::link::UserProvider::Gmail => UserProvider::Gmail, + crate::email::service::link::UserProvider::Outlook => UserProvider::Outlook, } } } diff --git a/rust/cloud-storage/models_email/src/email/service/link.rs b/rust/cloud-storage/models_email/src/email/service/link.rs index c1b9891ba0..4533bfff56 100644 --- a/rust/cloud-storage/models_email/src/email/service/link.rs +++ b/rust/cloud-storage/models_email/src/email/service/link.rs @@ -21,12 +21,14 @@ pub struct Link { #[derive(Debug, Clone, Copy, ToSchema, Serialize, Deserialize, PartialEq, Eq)] pub enum UserProvider { Gmail, + Outlook, } impl UserProvider { pub fn as_str(&self) -> &'static str { match self { UserProvider::Gmail => "GMAIL", + UserProvider::Outlook => "OUTLOOK", } } } diff --git a/rust/cloud-storage/models_email/src/gmail/inbox_sync.rs b/rust/cloud-storage/models_email/src/gmail/inbox_sync.rs index 6a9083e177..f19d4cd8b6 100644 --- a/rust/cloud-storage/models_email/src/gmail/inbox_sync.rs +++ b/rust/cloud-storage/models_email/src/gmail/inbox_sync.rs @@ -61,6 +61,10 @@ pub enum InboxSyncOperation { // The original message we get from gmail when there is a change to the user's inbox. // Contains the new history_id for the user's inbox. GmailMessage(GmailMessagePayload), + // The notification we get from Microsoft Graph when there is a change to an + // Outlook user's inbox. Unlike Gmail there is no history id; the worker runs + // an incremental delta sync from the persisted delta link for the link. + OutlookNotification(OutlookNotificationPayload), // Operation to upsert a message UpsertMessage(UpsertMessagePayload), // Operation to delete a message @@ -74,6 +78,17 @@ pub struct GmailMessagePayload { pub history_id: u64, } +/// Payload for an Outlook change notification queued from the Graph webhook. +/// We coalesce the notification batch down to the affected subscription; the +/// worker then runs a delta sync from the persisted delta link rather than +/// acting on a single message id, which keeps us resilient to dropped or merged +/// notifications. +#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)] +pub struct OutlookNotificationPayload { + /// The Graph subscription id that produced the notification. + pub subscription_id: String, +} + #[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)] pub struct UpsertMessagePayload { pub provider_message_id: String, diff --git a/rust/cloud-storage/models_email/src/lib.rs b/rust/cloud-storage/models_email/src/lib.rs index 38b79b413d..955cc63fcf 100644 --- a/rust/cloud-storage/models_email/src/lib.rs +++ b/rust/cloud-storage/models_email/src/lib.rs @@ -2,3 +2,4 @@ pub mod email; pub use email::{api, db, service}; pub mod gmail; +pub mod outlook; diff --git a/rust/cloud-storage/models_email/src/outlook/delta.rs b/rust/cloud-storage/models_email/src/outlook/delta.rs new file mode 100644 index 0000000000..8c7579165f --- /dev/null +++ b/rust/cloud-storage/models_email/src/outlook/delta.rs @@ -0,0 +1,121 @@ +//! Microsoft Graph delta-query models. +//! +//! Outlook does not expose a Gmail-style monotonic `historyId`. Instead, change +//! tracking is done with [delta queries]: an initial sync returns a page of +//! messages plus an opaque `@odata.deltaLink`; calling that link later returns +//! only the items that changed since, again ending in a fresh `@odata.deltaLink`. +//! +//! This is the Outlook analogue of [`crate::gmail::history`]. The persisted +//! delta link plays the role of the persisted `historyId`. +//! +//! [delta queries]: https://learn.microsoft.com/en-us/graph/delta-query-messages + +use super::MessageResource; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use uuid::Uuid; + +/// Annotation present on a delta item that represents a removed (deleted) message. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Removed { + /// `"deleted"` for hard deletes, `"changed"` for soft deletes/moves. + #[serde(default)] + pub reason: Option, +} + +/// A single entry in a delta page. Either a (partial) message that was created +/// or updated, or a `@removed` marker carrying just the id. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DeltaItem { + /// Present only when the item was removed. + #[serde(rename = "@removed", default)] + pub removed: Option, + /// The message fields. For `@removed` items only `id` is meaningful; every + /// other field falls back to its default. + #[serde(flatten)] + pub message: MessageResource, +} + +impl DeltaItem { + /// Whether this entry represents a removed (deleted) message. + pub fn is_removed(&self) -> bool { + self.removed.is_some() + } +} + +/// One page of a delta response. Exactly one of `next_link` / `delta_link` is +/// typically set: `next_link` while more pages remain, `delta_link` on the last +/// page. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DeltaResponse { + #[serde(default)] + pub value: Vec, + /// Opaque URL to fetch the next page of the current sync. + #[serde(rename = "@odata.nextLink", default)] + pub next_link: Option, + /// Opaque URL to persist and re-use for the next incremental sync. + #[serde(rename = "@odata.deltaLink", default)] + pub delta_link: Option, +} + +/// The curated set of changes derived from walking every page of a delta sync. +/// +/// Outlook analogue of [`crate::gmail::history::InboxChanges`]. Because Graph +/// returns the full updated message body (not just label deltas), there is no +/// separate "labels to update" bucket — an updated message is simply upserted. +#[derive(Debug, Deserialize, Serialize, Clone, Default)] +pub struct DeltaChanges { + /// Provider message ids that are new or were updated and need upserting. + pub message_ids_to_upsert: HashSet, + /// Provider message ids that were removed and need deleting locally. + pub message_ids_to_delete: HashSet, + /// The `@odata.deltaLink` to persist for the next incremental sync. + pub delta_link: Option, +} + +/// Database representation of a stored Outlook delta link for a link/folder. +/// +/// Outlook analogue of [`crate::gmail::history::GmailHistoryDb`]. We track the +/// delta link per (link, folder) since Graph delta is scoped to a single folder. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OutlookDeltaDb { + pub link_id: Uuid, + /// The mail folder this delta link tracks (typically the well-known inbox id). + pub folder_id: String, + pub delta_link: String, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_mixed_delta_page() { + let json = r#"{ + "value": [ + { "id": "msg-new", "subject": "New", "isRead": false }, + { "id": "msg-gone", "@removed": { "reason": "deleted" } } + ], + "@odata.deltaLink": "https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages/delta?$deltatoken=abc" + }"#; + + let page: DeltaResponse = serde_json::from_str(json).unwrap(); + assert_eq!(page.value.len(), 2); + + let new = &page.value[0]; + assert!(!new.is_removed()); + assert_eq!(new.message.id, "msg-new"); + assert_eq!(new.message.subject.as_deref(), Some("New")); + + let gone = &page.value[1]; + assert!(gone.is_removed()); + assert_eq!(gone.message.id, "msg-gone"); + assert_eq!(gone.removed.as_ref().unwrap().reason.as_deref(), Some("deleted")); + + assert!(page.delta_link.is_some()); + assert!(page.next_link.is_none()); + } +} diff --git a/rust/cloud-storage/models_email/src/outlook/error.rs b/rust/cloud-storage/models_email/src/outlook/error.rs new file mode 100644 index 0000000000..bd9f1f6182 --- /dev/null +++ b/rust/cloud-storage/models_email/src/outlook/error.rs @@ -0,0 +1,66 @@ +//! Error type for the Microsoft Graph (Outlook) client. +//! +//! Mirrors [`crate::gmail::error::GmailError`] so that call sites in the email +//! service can map provider errors to HTTP status codes uniformly across +//! providers. + +use thiserror::Error; + +/// Errors returned by the Outlook (Microsoft Graph) client. +#[derive(Error, Debug)] +pub enum OutlookError { + /// Graph throttling: HTTP 429. Honour the `Retry-After` header when retrying. + #[error("API Rate Limit Exceeded (429)")] + RateLimitExceeded, + + /// The access token is invalid or expired: HTTP 401. + #[error("Unauthorized: The access token is invalid or expired (401)")] + Unauthorized, + + /// The token lacks the required Graph scope/permission: HTTP 403. + #[error("Forbidden: Insufficient permissions (403)")] + Forbidden, + + /// Graph returned a 5xx. + #[error("Server Error ({0}): {1}")] + ServerError(u16, String), + + /// The underlying HTTP request could not be sent. + #[error("HTTP Request Error: {0}")] + HttpRequest(String), + + /// Graph returned a non-success status we don't special-case. + #[error("API Error: {0}")] + ApiError(String), + + /// Failed to read or decode the response body. + #[error("Failed to read response body: {0}")] + BodyReadError(String), + + /// The requested resource was a duplicate / already exists: HTTP 409. + #[error("Conflict: {0}")] + Conflict(String), + + /// The requested resource does not exist: HTTP 404. + #[error("Not found: {0}")] + NotFound(String), + + /// Catch-all internal error. + #[error("Internal Error: {0}")] + GenericError(String), +} + +impl OutlookError { + /// Map a non-success HTTP status plus body into the appropriate variant. + pub fn from_status(status: u16, body: String) -> Self { + match status { + 401 => OutlookError::Unauthorized, + 403 => OutlookError::Forbidden, + 404 => OutlookError::NotFound(body), + 409 => OutlookError::Conflict(body), + 429 => OutlookError::RateLimitExceeded, + s if s >= 500 => OutlookError::ServerError(s, body), + s => OutlookError::ApiError(format!("Graph API error {s}: {body}")), + } + } +} diff --git a/rust/cloud-storage/models_email/src/outlook/mod.rs b/rust/cloud-storage/models_email/src/outlook/mod.rs new file mode 100644 index 0000000000..337a2f235a --- /dev/null +++ b/rust/cloud-storage/models_email/src/outlook/mod.rs @@ -0,0 +1,350 @@ +//! Microsoft Graph (Outlook) API resource models. +//! +//! These mirror the shape of the structs in [`crate::gmail`] but model the +//! Microsoft Graph `message`, `mailFolder`, and related resources. Callers in +//! the `outlook_client` crate map these raw resources onto the provider-agnostic +//! service-layer structs in [`crate::email::service`]. +//! +//! Graph reference: + +pub mod delta; +pub mod error; +pub mod subscription; + +use serde::{Deserialize, Serialize}; + +/// A Microsoft Graph `emailAddress` complex type: `{ "name": ..., "address": ... }`. +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct EmailAddress { + #[serde(default)] + pub name: Option, + #[serde(default)] + pub address: Option, +} + +/// A Microsoft Graph `recipient` complex type, wrapping an [`EmailAddress`]. +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct Recipient { + #[serde(rename = "emailAddress", default)] + pub email_address: EmailAddress, +} + +/// The body of a message. `content_type` is either `"html"` or `"text"`. +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct ItemBody { + #[serde(rename = "contentType", default)] + pub content_type: String, + #[serde(default)] + pub content: String, +} + +/// A single internet message header (only populated when explicitly `$select`ed). +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct InternetMessageHeader { + pub name: String, + pub value: String, +} + +/// The follow-up flag on a message. We treat `flagStatus == "flagged"` as the +/// equivalent of a Gmail "starred" message. +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct FollowupFlag { + #[serde(rename = "flagStatus", default)] + pub flag_status: Option, +} + +/// A Microsoft Graph `message` resource. +/// +/// Fields are intentionally permissive (`Option` / `#[serde(default)]`) because +/// the exact set returned depends on the `$select` clause used by the caller. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MessageResource { + pub id: String, + /// Server-assigned id that changes whenever the message is modified. Useful + /// for detecting whether a cached copy is stale. + #[serde(rename = "changeKey", default)] + pub change_key: Option, + /// Groups messages into a conversation; the Outlook analogue of a Gmail thread id. + #[serde(rename = "conversationId", default)] + pub conversation_id: Option, + /// The globally-unique RFC 5322 `Message-ID` header value. + #[serde(rename = "internetMessageId", default)] + pub internet_message_id: Option, + #[serde(default)] + pub subject: Option, + /// Short preview of the body, used as the snippet. + #[serde(rename = "bodyPreview", default)] + pub body_preview: Option, + #[serde(default)] + pub body: Option, + #[serde(default)] + pub from: Option, + #[serde(rename = "toRecipients", default)] + pub to_recipients: Vec, + #[serde(rename = "ccRecipients", default)] + pub cc_recipients: Vec, + #[serde(rename = "bccRecipients", default)] + pub bcc_recipients: Vec, + #[serde(rename = "replyTo", default)] + pub reply_to: Vec, + #[serde(rename = "receivedDateTime", default)] + pub received_date_time: Option, + #[serde(rename = "sentDateTime", default)] + pub sent_date_time: Option, + #[serde(rename = "isRead", default)] + pub is_read: bool, + #[serde(rename = "isDraft", default)] + pub is_draft: bool, + #[serde(rename = "hasAttachments", default)] + pub has_attachments: bool, + /// Approximate size of the message in bytes. + #[serde(default)] + pub size: Option, + /// The id of the mail folder the message currently lives in. + #[serde(rename = "parentFolderId", default)] + pub parent_folder_id: Option, + /// User-applied categories — the closest Outlook analogue to Gmail user labels. + #[serde(default)] + pub categories: Vec, + #[serde(default)] + pub flag: Option, + #[serde(rename = "internetMessageHeaders", default)] + pub internet_message_headers: Vec, +} + +/// Minimal message projection (`$select=id,conversationId,parentFolderId,isRead`) +/// used where we only need ids / folder placement, analogous to Gmail's +/// `MinimalMessageResource`. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MinimalMessageResource { + pub id: String, + #[serde(rename = "conversationId", default)] + pub conversation_id: Option, + #[serde(rename = "parentFolderId", default)] + pub parent_folder_id: Option, + #[serde(rename = "isRead", default)] + pub is_read: bool, +} + +/// A page of messages returned by a list endpoint +/// (`GET /me/messages`, `GET /me/mailFolders/{id}/messages`, ...). +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MessageListResponse { + #[serde(default)] + pub value: Vec, + /// Opaque URL for the next page, if any. + #[serde(rename = "@odata.nextLink", default)] + pub next_link: Option, +} + +// -- Folders -- + +/// A Microsoft Graph `mailFolder` resource. The Outlook analogue of a Gmail label. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MailFolder { + pub id: String, + #[serde(rename = "displayName", default)] + pub display_name: Option, + /// Stable, locale-independent name for built-in folders, e.g. `"inbox"`, + /// `"sentitems"`, `"drafts"`, `"junkemail"`, `"deleteditems"`. Only present + /// when explicitly `$select`ed (`wellKnownName`). + #[serde(rename = "wellKnownName", default)] + pub well_known_name: Option, + #[serde(rename = "parentFolderId", default)] + pub parent_folder_id: Option, + #[serde(rename = "totalItemCount", default)] + pub total_item_count: Option, + #[serde(rename = "unreadItemCount", default)] + pub unread_item_count: Option, +} + +/// Response wrapper for `GET /me/mailFolders`. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MailFolderListResponse { + #[serde(default)] + pub value: Vec, + #[serde(rename = "@odata.nextLink", default)] + pub next_link: Option, +} + +// -- Attachments -- + +/// A Microsoft Graph `fileAttachment` (the `@odata.type` we support). Other +/// attachment kinds (item/reference) are ignored by the client. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct FileAttachment { + pub id: String, + #[serde(default)] + pub name: Option, + #[serde(rename = "contentType", default)] + pub content_type: Option, + #[serde(default)] + pub size: i64, + #[serde(rename = "isInline", default)] + pub is_inline: bool, + #[serde(rename = "contentId", default)] + pub content_id: Option, + /// Base64-encoded attachment bytes (standard base64, not URL-safe). + #[serde(rename = "contentBytes", default)] + pub content_bytes: Option, +} + +/// Response wrapper for `GET /me/messages/{id}/attachments`. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct AttachmentListResponse { + #[serde(default)] + pub value: Vec, +} + +// -- Profile -- + +/// Subset of the Graph `user` resource returned by `GET /me`. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct UserResource { + pub id: String, + /// Primary SMTP address. May be null for accounts without a mailbox. + #[serde(default)] + pub mail: Option, + #[serde(rename = "userPrincipalName", default)] + pub user_principal_name: Option, + #[serde(rename = "displayName", default)] + pub display_name: Option, +} + +// -- Sending -- + +/// Request body for `POST /me/sendMail`. +#[derive(Serialize, Debug, Clone)] +pub struct SendMailRequest { + pub message: OutgoingMessage, + #[serde(rename = "saveToSentItems")] + pub save_to_sent_items: bool, +} + +/// The message object embedded in a [`SendMailRequest`] or used to create a draft. +#[derive(Serialize, Debug, Clone, Default)] +pub struct OutgoingMessage { + pub subject: String, + pub body: ItemBody, + #[serde(rename = "toRecipients")] + pub to_recipients: Vec, + #[serde(rename = "ccRecipients", skip_serializing_if = "Vec::is_empty")] + pub cc_recipients: Vec, + #[serde(rename = "bccRecipients", skip_serializing_if = "Vec::is_empty")] + pub bcc_recipients: Vec, + #[serde(rename = "attachments", skip_serializing_if = "Vec::is_empty")] + pub attachments: Vec, + /// Custom internet message headers (e.g. `In-Reply-To`, `References`). Header + /// names sent via Graph must be prefixed with `x-` unless they are standard. + #[serde(rename = "internetMessageHeaders", skip_serializing_if = "Vec::is_empty")] + pub internet_message_headers: Vec, +} + +/// A `#microsoft.graph.fileAttachment` to send with an [`OutgoingMessage`]. +#[derive(Serialize, Debug, Clone)] +pub struct OutgoingAttachment { + #[serde(rename = "@odata.type")] + pub odata_type: String, + pub name: String, + #[serde(rename = "contentType")] + pub content_type: String, + /// Base64-encoded bytes (standard base64). + #[serde(rename = "contentBytes")] + pub content_bytes: String, +} + +impl OutgoingAttachment { + /// Build a file attachment from raw bytes, base64-encoding the content. + pub fn file(name: String, content_type: String, content_bytes: String) -> Self { + Self { + odata_type: "#microsoft.graph.fileAttachment".to_string(), + name, + content_type, + content_bytes, + } + } +} + +/// Helpers for mapping Outlook well-known folders onto the provider-agnostic +/// system labels in [`crate::email::service::label::system_labels`]. +pub mod well_known_folder { + use crate::email::service::label::system_labels; + + /// Map a Graph `wellKnownName` (e.g. `"inbox"`, `"sentitems"`) onto a + /// provider-agnostic system label id, if one exists. + /// + /// Returns `None` for user-created folders and for well-known folders that + /// have no system-label analogue (e.g. `"archive"`, `"outbox"`). + pub fn to_system_label(well_known_name: &str) -> Option<&'static str> { + match well_known_name.to_ascii_lowercase().as_str() { + "inbox" => Some(system_labels::INBOX), + "sentitems" => Some(system_labels::SENT), + "drafts" => Some(system_labels::DRAFT), + "junkemail" => Some(system_labels::SPAM), + "deleteditems" => Some(system_labels::TRASH), + _ => None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::email::service::label::system_labels; + + #[test] + fn deserializes_graph_message() { + let json = r#"{ + "id": "AAMkAGI2", + "changeKey": "CQAAABYA", + "conversationId": "AAQkAGI2", + "internetMessageId": "", + "subject": "Hello", + "bodyPreview": "preview text", + "body": { "contentType": "html", "content": "

hi

" }, + "from": { "emailAddress": { "name": "Alice", "address": "alice@contoso.com" } }, + "toRecipients": [ { "emailAddress": { "name": "Bob", "address": "bob@contoso.com" } } ], + "receivedDateTime": "2026-06-01T12:00:00Z", + "sentDateTime": "2026-06-01T11:59:00Z", + "isRead": false, + "isDraft": false, + "hasAttachments": true, + "parentFolderId": "inboxfolderid", + "categories": ["Work"], + "flag": { "flagStatus": "flagged" } + }"#; + + let msg: MessageResource = serde_json::from_str(json).unwrap(); + assert_eq!(msg.id, "AAMkAGI2"); + assert_eq!(msg.conversation_id.as_deref(), Some("AAQkAGI2")); + assert_eq!(msg.internet_message_id.as_deref(), Some("")); + assert!(!msg.is_read); + assert!(msg.has_attachments); + assert_eq!(msg.from.unwrap().email_address.address.as_deref(), Some("alice@contoso.com")); + assert_eq!(msg.to_recipients.len(), 1); + assert_eq!(msg.categories, vec!["Work".to_string()]); + assert_eq!(msg.flag.unwrap().flag_status.as_deref(), Some("flagged")); + } + + #[test] + fn deserializes_message_list_with_next_link() { + let json = r#"{ + "value": [ { "id": "1" }, { "id": "2" } ], + "@odata.nextLink": "https://graph.microsoft.com/v1.0/me/messages?$skip=2" + }"#; + let list: MessageListResponse = serde_json::from_str(json).unwrap(); + assert_eq!(list.value.len(), 2); + assert!(list.next_link.is_some()); + } + + #[test] + fn maps_well_known_folders_to_system_labels() { + assert_eq!(well_known_folder::to_system_label("inbox"), Some(system_labels::INBOX)); + assert_eq!(well_known_folder::to_system_label("SentItems"), Some(system_labels::SENT)); + assert_eq!(well_known_folder::to_system_label("drafts"), Some(system_labels::DRAFT)); + assert_eq!(well_known_folder::to_system_label("junkemail"), Some(system_labels::SPAM)); + assert_eq!(well_known_folder::to_system_label("deleteditems"), Some(system_labels::TRASH)); + assert_eq!(well_known_folder::to_system_label("archive"), None); + assert_eq!(well_known_folder::to_system_label("MyCustomFolder"), None); + } +} diff --git a/rust/cloud-storage/models_email/src/outlook/subscription.rs b/rust/cloud-storage/models_email/src/outlook/subscription.rs new file mode 100644 index 0000000000..7345a8a797 --- /dev/null +++ b/rust/cloud-storage/models_email/src/outlook/subscription.rs @@ -0,0 +1,125 @@ +//! Microsoft Graph change-notification (webhook) models. +//! +//! This is the Outlook analogue of the Gmail `watch` + Pub/Sub push flow +//! ([`crate::gmail::WatchRequest`] / [`crate::gmail::inbox_sync`]). +//! +//! Unlike Gmail (which authenticates push messages with a Google-signed JWT), +//! Graph uses two mechanisms: +//! 1. A **validation handshake**: when a subscription is created Graph sends a +//! request with a `validationToken` query param that the endpoint must echo +//! back as `text/plain` within 10 seconds. +//! 2. A per-notification **`clientState`** secret that we set at creation time +//! and verify on every notification. +//! +//! Reference: + +use serde::{Deserialize, Serialize}; + +/// Request body for `POST /subscriptions`. +#[derive(Serialize, Debug, Clone)] +pub struct CreateSubscriptionRequest { + /// Comma-separated list of change types, e.g. `"created,updated,deleted"`. + #[serde(rename = "changeType")] + pub change_type: String, + /// HTTPS endpoint Graph will POST notifications to. + #[serde(rename = "notificationUrl")] + pub notification_url: String, + /// The resource to watch, e.g. `"/me/mailFolders('inbox')/messages"`. + pub resource: String, + /// ISO-8601 UTC expiry. For message resources the max is ~3 days out. + #[serde(rename = "expirationDateTime")] + pub expiration_date_time: String, + /// Opaque secret echoed back in every notification for verification. + #[serde(rename = "clientState")] + pub client_state: String, +} + +/// Request body for renewing a subscription (`PATCH /subscriptions/{id}`). +#[derive(Serialize, Debug, Clone)] +pub struct RenewSubscriptionRequest { + #[serde(rename = "expirationDateTime")] + pub expiration_date_time: String, +} + +/// The `subscription` resource returned by Graph on create / renew. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Subscription { + pub id: String, + #[serde(rename = "expirationDateTime", default)] + pub expiration_date_time: Option, + #[serde(default)] + pub resource: Option, + #[serde(rename = "changeType", default)] + pub change_type: Option, +} + +/// A batch of change notifications POSTed to our webhook. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ChangeNotificationCollection { + #[serde(default)] + pub value: Vec, +} + +/// A single change notification. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ChangeNotification { + /// The id of the subscription that produced this notification. + #[serde(rename = "subscriptionId")] + pub subscription_id: String, + /// The secret we provided at subscription-creation time. Must be verified. + #[serde(rename = "clientState", default)] + pub client_state: Option, + /// `"created"`, `"updated"`, or `"deleted"`. + #[serde(rename = "changeType")] + pub change_type: String, + /// The resource path that changed, e.g. `"Users/{uid}/Messages/{mid}"`. + #[serde(default)] + pub resource: Option, + /// Lightweight identifier for the changed resource. + #[serde(rename = "resourceData", default)] + pub resource_data: Option, +} + +/// The `resourceData` of a change notification — enough to fetch the full item. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ResourceData { + /// The provider id of the changed message. + #[serde(default)] + pub id: Option, + #[serde(rename = "@odata.type", default)] + pub odata_type: Option, + #[serde(rename = "@odata.id", default)] + pub odata_id: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_change_notification_collection() { + let json = r##"{ + "value": [ + { + "subscriptionId": "sub-123", + "clientState": "secretvalue", + "changeType": "created", + "resource": "Users/abc/Messages/msg-1", + "resourceData": { + "id": "msg-1", + "@odata.type": "#Microsoft.Graph.Message", + "@odata.id": "Users/abc/Messages/msg-1" + } + } + ] + }"##; + + let parsed: ChangeNotificationCollection = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.value.len(), 1); + let n = &parsed.value[0]; + assert_eq!(n.subscription_id, "sub-123"); + assert_eq!(n.client_state.as_deref(), Some("secretvalue")); + assert_eq!(n.change_type, "created"); + assert_eq!(n.resource_data.as_ref().unwrap().id.as_deref(), Some("msg-1")); + } +} diff --git a/rust/cloud-storage/outlook_client/Cargo.toml b/rust/cloud-storage/outlook_client/Cargo.toml new file mode 100644 index 0000000000..bcc8e2ef17 --- /dev/null +++ b/rust/cloud-storage/outlook_client/Cargo.toml @@ -0,0 +1,22 @@ +[package] +edition = "2024" +name = "outlook_client" +publish = false +version = "0.1.0" + +[features] +outlook_test = [] + +[dependencies] +anyhow = { workspace = true } +base64 = { workspace = true } +chrono = { workspace = true } +macro_uuid = { path = "../macro_uuid" } +mockall = { workspace = true } +models_email = { path = "../models_email" } +reqwest = { workspace = true, features = ["stream"] } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } diff --git a/rust/cloud-storage/outlook_client/src/attachments.rs b/rust/cloud-storage/outlook_client/src/attachments.rs new file mode 100644 index 0000000000..89f5a547dd --- /dev/null +++ b/rust/cloud-storage/outlook_client/src/attachments.rs @@ -0,0 +1,33 @@ +//! Attachment download from Microsoft Graph. + +use crate::OutlookClient; +use base64::Engine; +use base64::engine::general_purpose::STANDARD; +use models_email::outlook::error::OutlookError; +use models_email::outlook::FileAttachment; + +/// Download and decode the bytes of a single file attachment. +/// +/// Graph returns file-attachment content as standard (not URL-safe) base64 in +/// the `contentBytes` field. +pub(crate) async fn get_attachment_data( + client: &OutlookClient, + access_token: &str, + message_id: &str, + attachment_id: &str, +) -> Result, OutlookError> { + let url = format!( + "{}/me/messages/{}/attachments/{}", + client.base_url, message_id, attachment_id + ); + + let attachment: FileAttachment = client.graph_get(access_token, &url).await?; + + let content = attachment.content_bytes.ok_or_else(|| { + OutlookError::BodyReadError("attachment response missing contentBytes".to_string()) + })?; + + STANDARD + .decode(content) + .map_err(|e| OutlookError::BodyReadError(format!("failed to decode attachment base64: {e}"))) +} diff --git a/rust/cloud-storage/outlook_client/src/convert.rs b/rust/cloud-storage/outlook_client/src/convert.rs new file mode 100644 index 0000000000..90fb96f660 --- /dev/null +++ b/rust/cloud-storage/outlook_client/src/convert.rs @@ -0,0 +1,317 @@ +//! Pure mappers from Microsoft Graph resources to the provider-agnostic +//! service-layer structs in `models_email::email::service`. +//! +//! This is the Outlook analogue of `email_service::convert`. Because Graph +//! returns already-structured JSON (rather than a raw MIME payload like Gmail), +//! the mapping is straightforward and has no I/O, which makes it easy to unit +//! test in isolation. + +use chrono::{DateTime, Utc}; +use macro_uuid::generate_uuid_v7; +use models_email::email::service; +use models_email::email::service::label::system_labels; +use models_email::outlook::{MessageResource, Recipient}; +use uuid::Uuid; + +/// Map a Graph [`Recipient`] to a service [`ContactInfo`](service::address::ContactInfo). +/// +/// Recipients with no address are dropped by the plural helper; this returns +/// `None` in that case so callers can filter. +fn recipient_to_contact(recipient: &Recipient) -> Option { + let email = recipient.email_address.address.as_ref()?; + if email.is_empty() { + return None; + } + Some(service::address::ContactInfo { + email: email.to_lowercase(), + name: recipient.email_address.name.clone(), + photo_url: None, + }) +} + +fn recipients_to_contacts(recipients: &[Recipient]) -> Vec { + recipients.iter().filter_map(recipient_to_contact).collect() +} + +/// Parse an ISO-8601 / RFC-3339 timestamp as returned by Graph +/// (e.g. `"2026-06-01T12:00:00Z"`). +fn parse_graph_datetime(value: &Option) -> Option> { + let s = value.as_ref()?; + DateTime::parse_from_rfc3339(s) + .ok() + .map(|dt| dt.with_timezone(&Utc)) +} + +/// Build the provider-agnostic label set for a message. +/// +/// Outlook has no per-message label list like Gmail. We synthesize one from: +/// - read state (`UNREAD` when the message is unread), +/// - the follow-up flag (`STARRED` when flagged), +/// - draft state (`DRAFT`), +/// - the system label for the message's parent folder, if the caller resolved +/// one (e.g. `INBOX`, `SENT`, `SPAM`, `TRASH`), +/// - each user `category` as a user label (its name doubles as its id). +fn build_labels( + message: &MessageResource, + link_id: Uuid, + folder_system_label: Option<&str>, +) -> Vec { + let mut provider_label_ids: Vec = Vec::new(); + + if !message.is_read { + provider_label_ids.push(system_labels::UNREAD.to_string()); + } + if message + .flag + .as_ref() + .and_then(|f| f.flag_status.as_deref()) + .is_some_and(|s| s.eq_ignore_ascii_case("flagged")) + { + provider_label_ids.push(system_labels::STARRED.to_string()); + } + if message.is_draft { + provider_label_ids.push(system_labels::DRAFT.to_string()); + } + if let Some(system_label) = folder_system_label { + provider_label_ids.push(system_label.to_string()); + } + for category in &message.categories { + provider_label_ids.push(category.clone()); + } + + provider_label_ids + .into_iter() + .map(|id| service::label::Label { + id: None, + link_id, + provider_label_id: id, + name: None, + created_at: Default::default(), + message_list_visibility: None, + label_list_visibility: None, + type_: None, + }) + .collect() +} + +/// Map a Graph [`MessageResource`] to a service-layer [`Message`](service::message::Message). +/// +/// `folder_system_label` is the system label the caller resolved for the +/// message's `parentFolderId` (via a folder lookup), if any. Attachment bytes +/// are fetched separately, so only `has_attachments` is populated here. +#[tracing::instrument(skip(message), fields(message_id = %message.id), level = "debug")] +pub fn map_message_resource_to_service( + message: MessageResource, + link_id: Uuid, + folder_system_label: Option<&str>, +) -> service::message::Message { + let internal_date_ts = parse_graph_datetime(&message.received_date_time); + let sent_at = parse_graph_datetime(&message.sent_date_time).or(internal_date_ts); + + let is_sent = folder_system_label == Some(system_labels::SENT); + let labels = build_labels(&message, link_id, folder_system_label); + + let (body_text, body_html_sanitized) = match &message.body { + Some(body) if body.content_type.eq_ignore_ascii_case("html") => { + (None, Some(body.content.clone())) + } + Some(body) => (Some(body.content.clone()), None), + None => (None, None), + }; + + let headers_json = if message.internet_message_headers.is_empty() { + None + } else { + serde_json::to_value( + message + .internet_message_headers + .iter() + .map(|h| serde_json::json!({ "name": h.name, "value": h.value })) + .collect::>(), + ) + .ok() + }; + + let from = message.from.as_ref().and_then(recipient_to_contact); + + service::message::Message { + db_id: generate_uuid_v7(), + provider_id: Some(message.id), + thread_db_id: generate_uuid_v7(), + provider_thread_id: message.conversation_id, + replying_to_id: None, + global_id: message.internet_message_id, + link_id, + subject: message.subject, + snippet: message.body_preview, + // Outlook has no monotonic per-message history id; delta links live on + // the link, not the message. + provider_history_id: None, + internal_date_ts, + sent_at, + size_estimate: message.size, + is_read: message.is_read, + is_starred: message + .flag + .as_ref() + .and_then(|f| f.flag_status.as_deref()) + .is_some_and(|s| s.eq_ignore_ascii_case("flagged")), + is_sent, + is_draft: message.is_draft, + scheduled_send_time: None, + has_attachments: message.has_attachments, + from, + to: recipients_to_contacts(&message.to_recipients), + cc: recipients_to_contacts(&message.cc_recipients), + bcc: recipients_to_contacts(&message.bcc_recipients), + labels, + body_text, + body_html_sanitized, + body_macro: None, + attachments: Vec::new(), + attachments_draft: Vec::new(), + attachments_forwarded: Vec::new(), + headers_json, + created_at: Utc::now(), + updated_at: Utc::now(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use models_email::outlook::{EmailAddress, FollowupFlag, ItemBody}; + + fn sample_message() -> MessageResource { + MessageResource { + id: "msg-1".to_string(), + change_key: Some("ck".to_string()), + conversation_id: Some("conv-1".to_string()), + internet_message_id: Some("".to_string()), + subject: Some("Hello".to_string()), + body_preview: Some("preview".to_string()), + body: Some(ItemBody { + content_type: "html".to_string(), + content: "

hi

".to_string(), + }), + from: Some(Recipient { + email_address: EmailAddress { + name: Some("Alice".to_string()), + address: Some("Alice@Contoso.com".to_string()), + }, + }), + to_recipients: vec![Recipient { + email_address: EmailAddress { + name: Some("Bob".to_string()), + address: Some("bob@contoso.com".to_string()), + }, + }], + cc_recipients: vec![], + bcc_recipients: vec![], + reply_to: vec![], + received_date_time: Some("2026-06-01T12:00:00Z".to_string()), + sent_date_time: Some("2026-06-01T11:59:00Z".to_string()), + is_read: false, + is_draft: false, + has_attachments: true, + size: Some(2048), + parent_folder_id: Some("inbox-folder".to_string()), + categories: vec!["Work".to_string()], + flag: Some(FollowupFlag { + flag_status: Some("flagged".to_string()), + }), + internet_message_headers: vec![], + } + } + + #[test] + fn maps_core_fields() { + let msg = map_message_resource_to_service( + sample_message(), + Uuid::nil(), + Some(system_labels::INBOX), + ); + + assert_eq!(msg.provider_id.as_deref(), Some("msg-1")); + assert_eq!(msg.provider_thread_id.as_deref(), Some("conv-1")); + assert_eq!(msg.global_id.as_deref(), Some("")); + assert_eq!(msg.subject.as_deref(), Some("Hello")); + assert_eq!(msg.snippet.as_deref(), Some("preview")); + assert_eq!(msg.size_estimate, Some(2048)); + assert!(msg.has_attachments); + assert!(!msg.is_read); + assert!(msg.is_starred); + assert!(!msg.is_sent); + } + + #[test] + fn lowercases_addresses_and_keeps_names() { + let msg = map_message_resource_to_service(sample_message(), Uuid::nil(), None); + let from = msg.from.unwrap(); + assert_eq!(from.email, "alice@contoso.com"); + assert_eq!(from.name.as_deref(), Some("Alice")); + assert_eq!(msg.to.len(), 1); + assert_eq!(msg.to[0].email, "bob@contoso.com"); + } + + #[test] + fn html_body_goes_to_sanitized_text_body_empty() { + let msg = map_message_resource_to_service(sample_message(), Uuid::nil(), None); + assert_eq!(msg.body_html_sanitized.as_deref(), Some("

hi

")); + assert!(msg.body_text.is_none()); + } + + #[test] + fn text_body_goes_to_body_text() { + let mut raw = sample_message(); + raw.body = Some(ItemBody { + content_type: "text".to_string(), + content: "plain text".to_string(), + }); + let msg = map_message_resource_to_service(raw, Uuid::nil(), None); + assert_eq!(msg.body_text.as_deref(), Some("plain text")); + assert!(msg.body_html_sanitized.is_none()); + } + + #[test] + fn synthesizes_labels_from_flags_folder_and_categories() { + let msg = map_message_resource_to_service( + sample_message(), + Uuid::nil(), + Some(system_labels::INBOX), + ); + let ids: Vec<&str> = msg + .labels + .iter() + .map(|l| l.provider_label_id.as_str()) + .collect(); + assert!(ids.contains(&system_labels::UNREAD)); + assert!(ids.contains(&system_labels::STARRED)); + assert!(ids.contains(&system_labels::INBOX)); + assert!(ids.contains(&"Work")); + assert!(!ids.contains(&system_labels::DRAFT)); + } + + #[test] + fn sent_folder_marks_message_sent() { + let msg = map_message_resource_to_service( + sample_message(), + Uuid::nil(), + Some(system_labels::SENT), + ); + assert!(msg.is_sent); + } + + #[test] + fn parses_timestamps() { + let msg = map_message_resource_to_service(sample_message(), Uuid::nil(), None); + assert_eq!( + msg.internal_date_ts.unwrap().to_rfc3339(), + "2026-06-01T12:00:00+00:00" + ); + assert_eq!( + msg.sent_at.unwrap().to_rfc3339(), + "2026-06-01T11:59:00+00:00" + ); + } +} diff --git a/rust/cloud-storage/outlook_client/src/delta.rs b/rust/cloud-storage/outlook_client/src/delta.rs new file mode 100644 index 0000000000..db08967c1d --- /dev/null +++ b/rust/cloud-storage/outlook_client/src/delta.rs @@ -0,0 +1,62 @@ +//! Delta-query sync against Microsoft Graph. +//! +//! Drives the [delta query] loop: walk every `@odata.nextLink` page, accumulate +//! created/updated vs removed message ids, and capture the terminal +//! `@odata.deltaLink` to persist for the next incremental sync. This is the +//! Outlook analogue of `gmail_client::history`. +//! +//! [delta query]: https://learn.microsoft.com/en-us/graph/delta-query-messages + +use crate::OutlookClient; +use models_email::outlook::delta::{DeltaChanges, DeltaResponse}; +use models_email::outlook::error::OutlookError; + +/// Build the URL that starts a fresh delta enumeration of a folder. +/// +/// We only `$select` ids/state here: a delta page can be large, and the worker +/// re-fetches full bodies for the ids it decides to upsert. `parentFolderId` is +/// included so the worker can resolve the message's folder → system label. +pub(crate) fn initial_delta_url(base_url: &str, folder_id: &str) -> String { + format!( + "{}/me/mailFolders/{}/messages/delta?$select=id,conversationId,parentFolderId,isRead", + base_url, folder_id + ) +} + +/// Walk a delta sync to completion starting from `start_url` (either an initial +/// delta URL or a persisted `@odata.deltaLink`) and return the aggregated changes. +pub(crate) async fn run_delta( + client: &OutlookClient, + access_token: &str, + start_url: &str, +) -> Result { + let mut changes = DeltaChanges::default(); + let mut next_url = Some(start_url.to_string()); + + while let Some(url) = next_url.take() { + let page: DeltaResponse = client.graph_get(access_token, &url).await?; + + for item in page.value { + let id = item.message.id.clone(); + if id.is_empty() { + continue; + } + if item.is_removed() { + // A message can be both updated and later removed within one + // sync window; deletion wins. + changes.message_ids_to_upsert.remove(&id); + changes.message_ids_to_delete.insert(id); + } else if !changes.message_ids_to_delete.contains(&id) { + changes.message_ids_to_upsert.insert(id); + } + } + + if let Some(delta_link) = page.delta_link { + changes.delta_link = Some(delta_link); + break; + } + next_url = page.next_link; + } + + Ok(changes) +} diff --git a/rust/cloud-storage/outlook_client/src/folders.rs b/rust/cloud-storage/outlook_client/src/folders.rs new file mode 100644 index 0000000000..b7a17a88fa --- /dev/null +++ b/rust/cloud-storage/outlook_client/src/folders.rs @@ -0,0 +1,126 @@ +//! Mail-folder operations against Microsoft Graph. +//! +//! Outlook mail folders are the closest analogue to Gmail labels for the +//! purposes of our label model: a message lives in exactly one folder, and we +//! map the well-known folders (`inbox`, `sentitems`, ...) onto the +//! provider-agnostic system labels. + +use crate::OutlookClient; +use chrono::Utc; +use models_email::email::service::label::{ + Label, LabelListVisibility, LabelType, MessageListVisibility, +}; +use models_email::outlook::error::OutlookError; +use models_email::outlook::well_known_folder; +use models_email::outlook::{MailFolder, MailFolderListResponse}; +use uuid::Uuid; + +const FOLDER_SELECT: &str = + "id,displayName,wellKnownName,parentFolderId,totalItemCount,unreadItemCount"; +const PAGE_SIZE: u32 = 100; + +/// List the user's mail folders, following pagination. Hidden folders are +/// included so well-known system folders are always present. +pub(crate) async fn list_folders( + client: &OutlookClient, + access_token: &str, +) -> Result, OutlookError> { + let mut next_url = Some(format!( + "{}/me/mailFolders?$select={}&$top={}&includeHiddenFolders=true", + client.base_url, FOLDER_SELECT, PAGE_SIZE + )); + + let mut folders = Vec::new(); + while let Some(url) = next_url.take() { + let page: MailFolderListResponse = client.graph_get(access_token, &url).await?; + folders.extend(page.value); + next_url = page.next_link; + } + + Ok(folders) +} + +/// List the user's folders mapped to service labels, ready to persist. +/// +/// A well-known folder is emitted as a `System` label whose `provider_label_id` +/// is the provider-agnostic system label (e.g. `INBOX`); a user folder is +/// emitted as a `User` label keyed by its Graph folder id. +pub(crate) async fn fetch_user_labels( + client: &OutlookClient, + access_token: &str, + link_id: Uuid, +) -> Result, OutlookError> { + let folders = list_folders(client, access_token).await?; + Ok(folders + .into_iter() + .map(|folder| folder_to_label(folder, link_id)) + .collect()) +} + +fn folder_to_label(folder: MailFolder, link_id: Uuid) -> Label { + let system_label = folder + .well_known_name + .as_deref() + .and_then(well_known_folder::to_system_label); + + let (provider_label_id, name, type_) = match system_label { + Some(system) => ( + system.to_string(), + folder.display_name, + LabelType::System, + ), + None => ( + folder.id, + folder.display_name, + LabelType::User, + ), + }; + + Label { + id: None, + link_id, + provider_label_id, + name, + created_at: Utc::now(), + message_list_visibility: Some(MessageListVisibility::Show), + label_list_visibility: Some(LabelListVisibility::LabelShow), + type_: Some(type_), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use models_email::email::service::label::system_labels; + + #[test] + fn well_known_folder_maps_to_system_label() { + let folder = MailFolder { + id: "raw-inbox-id".to_string(), + display_name: Some("Inbox".to_string()), + well_known_name: Some("inbox".to_string()), + parent_folder_id: None, + total_item_count: Some(10), + unread_item_count: Some(2), + }; + let label = folder_to_label(folder, Uuid::nil()); + assert_eq!(label.provider_label_id, system_labels::INBOX); + assert_eq!(label.type_, Some(LabelType::System)); + assert_eq!(label.name.as_deref(), Some("Inbox")); + } + + #[test] + fn user_folder_keeps_graph_id() { + let folder = MailFolder { + id: "AAMkUserFolderId".to_string(), + display_name: Some("Receipts".to_string()), + well_known_name: None, + parent_folder_id: Some("parent".to_string()), + total_item_count: None, + unread_item_count: None, + }; + let label = folder_to_label(folder, Uuid::nil()); + assert_eq!(label.provider_label_id, "AAMkUserFolderId"); + assert_eq!(label.type_, Some(LabelType::User)); + } +} diff --git a/rust/cloud-storage/outlook_client/src/lib.rs b/rust/cloud-storage/outlook_client/src/lib.rs new file mode 100644 index 0000000000..21bab42a8a --- /dev/null +++ b/rust/cloud-storage/outlook_client/src/lib.rs @@ -0,0 +1,317 @@ +//! A thin client over the [Microsoft Graph] mail API, used to integrate Outlook +//! inboxes into the email service. +//! +//! This is the Outlook analogue of the `gmail_client` crate: it exposes the same +//! kinds of operations (fetch messages/threads, incremental sync, manage +//! folders/labels, send, attachments, push-notification subscriptions) but +//! speaks Graph instead of the Gmail REST API. Where it's natural, methods +//! return the provider-agnostic structs from `models_email::email::service` so +//! the rest of the service can treat providers uniformly; lower-level methods +//! return the raw Graph resources from [`models_email::outlook`]. +//! +//! Authentication is delegated to the caller: every request takes an OAuth 2.0 +//! access token (minted by the authentication service from the user's linked +//! Microsoft refresh token). +//! +//! [Microsoft Graph]: https://learn.microsoft.com/en-us/graph/api/resources/mail-api-overview + +pub mod convert; + +pub(crate) mod attachments; +pub(crate) mod delta; +pub(crate) mod folders; +pub(crate) mod messages; +pub(crate) mod profile; +pub(crate) mod subscriptions; +pub(crate) mod threads; + +#[allow(unused_imports)] +use mockall::automock; +use models_email::email::service; +use models_email::email::service::address::ContactInfo; +use models_email::email::service::message; +pub use models_email::outlook::error::OutlookError; +use models_email::outlook::{ + MailFolder, MessageResource, delta::DeltaChanges, subscription::Subscription, +}; +use serde::de::DeserializeOwned; + +/// Default Microsoft Graph v1.0 base URL. +const DEFAULT_GRAPH_BASE_URL: &str = "https://graph.microsoft.com/v1.0"; + +/// Client for the Microsoft Graph mail API. +#[derive(Clone, Debug)] +pub struct OutlookClient { + /// The inner HTTP client used to make requests. + inner: reqwest::Client, + /// The base url for the Graph API (e.g. `https://graph.microsoft.com/v1.0`). + base_url: String, + /// The HTTPS endpoint Graph posts change notifications to when we create a + /// subscription on a user's mailbox. + notification_url: String, + /// Opaque secret we set as the subscription `clientState` and verify on every + /// incoming notification. + client_state: String, +} + +impl OutlookClient { + /// Create a new client. + /// + /// * `notification_url` — the public HTTPS URL Graph should deliver change + /// notifications to (our webhook). + /// * `client_state` — a secret used to authenticate incoming notifications. + pub fn new(notification_url: String, client_state: String) -> Self { + Self { + inner: reqwest::Client::new(), + base_url: DEFAULT_GRAPH_BASE_URL.to_string(), + notification_url, + client_state, + } + } + + /// Override the Graph base url (used in tests against a mock server). + pub fn with_base_url(mut self, base_url: String) -> Self { + self.base_url = base_url; + self + } + + // ---- shared request helpers --------------------------------------------- + + /// Issue an authenticated `GET` and deserialize the JSON body, mapping + /// non-success statuses to [`OutlookError`]. + pub(crate) async fn graph_get( + &self, + access_token: &str, + url: &str, + ) -> Result { + let response = self + .inner + .get(url) + .bearer_auth(access_token) + .send() + .await + .map_err(|e| OutlookError::HttpRequest(e.to_string()))?; + + Self::deserialize_success(response).await + } + + /// Like [`graph_get`](Self::graph_get) but maps a `404` to `Ok(None)`. + pub(crate) async fn graph_get_opt( + &self, + access_token: &str, + url: &str, + ) -> Result, OutlookError> { + match self.graph_get::(access_token, url).await { + Ok(value) => Ok(Some(value)), + Err(OutlookError::NotFound(_)) => Ok(None), + Err(e) => Err(e), + } + } + + /// Read a response, returning the deserialized body on success or the + /// appropriate [`OutlookError`] otherwise. + pub(crate) async fn deserialize_success( + response: reqwest::Response, + ) -> Result { + let status = response.status(); + if status.is_success() { + response + .json::() + .await + .map_err(|e| OutlookError::BodyReadError(e.to_string())) + } else { + let body = response + .text() + .await + .unwrap_or_else(|_| "Failed to read error body".to_string()); + Err(OutlookError::from_status(status.as_u16(), body)) + } + } + + /// Ensure a response was a success, discarding its body. + pub(crate) async fn ensure_success(response: reqwest::Response) -> Result<(), OutlookError> { + let status = response.status(); + if status.is_success() { + Ok(()) + } else { + let body = response + .text() + .await + .unwrap_or_else(|_| "Failed to read error body".to_string()); + Err(OutlookError::from_status(status.as_u16(), body)) + } + } + + // ---- messages ----------------------------------------------------------- + + /// Fetch a single message by its Graph id, returning the raw resource. + /// Returns `None` if the message no longer exists. + #[tracing::instrument(skip(self, access_token), err)] + pub async fn get_message( + &self, + access_token: &str, + message_id: &str, + ) -> Result, OutlookError> { + messages::get_message(self, access_token, message_id).await + } + + /// Send a message. When `parent_provider_message_id` is set the message is + /// sent as a reply within the existing conversation (preserving threading); + /// otherwise it's sent as a brand new message. On success `message.provider_id` + /// and `message.provider_thread_id` are populated. + #[tracing::instrument( + skip(self, access_token, message, from_contact), + fields(link_id = %message.link_id), + err + )] + pub async fn send_message( + &self, + access_token: &str, + message: &mut message::MessageToSend, + from_contact: &ContactInfo, + parent_provider_message_id: Option, + ) -> Result<(), OutlookError> { + messages::send_message(self, access_token, message, from_contact, parent_provider_message_id) + .await + } + + // ---- threads (conversations) -------------------------------------------- + + /// Fetch every message belonging to a conversation (the Outlook analogue of + /// a Gmail thread), ordered oldest-first. + #[tracing::instrument(skip(self, access_token), err)] + pub async fn get_conversation_messages( + &self, + access_token: &str, + conversation_id: &str, + ) -> Result, OutlookError> { + threads::get_conversation_messages(self, access_token, conversation_id).await + } + + // ---- incremental sync (delta) ------------------------------------------- + + /// Begin a delta sync of a mail folder, returning the changes seen so far + /// plus a fresh `@odata.deltaLink` to persist. Pass the well-known inbox id + /// (or any folder id) to scope the sync. + #[tracing::instrument(skip(self, access_token), err)] + pub async fn initial_delta( + &self, + access_token: &str, + folder_id: &str, + ) -> Result { + delta::run_delta(self, access_token, &delta::initial_delta_url(&self.base_url, folder_id)) + .await + } + + /// Continue a delta sync from a previously persisted `@odata.deltaLink`. + #[tracing::instrument(skip(self, access_token, delta_link), err)] + pub async fn delta_from_link( + &self, + access_token: &str, + delta_link: &str, + ) -> Result { + delta::run_delta(self, access_token, delta_link).await + } + + // ---- folders (labels) --------------------------------------------------- + + /// List the user's mail folders (the Outlook analogue of Gmail labels). + #[tracing::instrument(skip(self, access_token), err)] + pub async fn list_folders( + &self, + access_token: &str, + ) -> Result, OutlookError> { + folders::list_folders(self, access_token).await + } + + /// List the user's mail folders mapped to service labels, ready to persist. + #[tracing::instrument(skip(self, access_token), err)] + pub async fn fetch_user_labels( + &self, + access_token: &str, + link_id: uuid::Uuid, + ) -> Result, OutlookError> { + folders::fetch_user_labels(self, access_token, link_id).await + } + + // ---- attachments -------------------------------------------------------- + + /// Download the bytes of a file attachment. + #[tracing::instrument(skip(self, access_token), err)] + pub async fn get_attachment_data( + &self, + access_token: &str, + message_id: &str, + attachment_id: &str, + ) -> Result, OutlookError> { + attachments::get_attachment_data(self, access_token, message_id, attachment_id).await + } + + // ---- subscriptions (push notifications) --------------------------------- + + /// Create a change-notification subscription on the user's inbox (the + /// Outlook analogue of registering a Gmail watch). Returns the created + /// subscription, whose id should be persisted for renewal/teardown. + #[tracing::instrument(skip(self, access_token), err)] + pub async fn create_subscription( + &self, + access_token: &str, + ) -> Result { + subscriptions::create_subscription(self, access_token).await + } + + /// Renew an existing subscription's expiry (subscriptions are short-lived + /// and must be renewed periodically). + #[tracing::instrument(skip(self, access_token), err)] + pub async fn renew_subscription( + &self, + access_token: &str, + subscription_id: &str, + ) -> Result { + subscriptions::renew_subscription(self, access_token, subscription_id).await + } + + /// Delete a subscription (the Outlook analogue of stopping a Gmail watch). + #[tracing::instrument(skip(self, access_token), err)] + pub async fn delete_subscription( + &self, + access_token: &str, + subscription_id: &str, + ) -> Result<(), OutlookError> { + subscriptions::delete_subscription(self, access_token, subscription_id).await + } + + /// Verify that a notification's `clientState` matches the secret we set when + /// creating the subscription. Returns `true` when it matches. + /// + /// This is the Outlook analogue of verifying the Google-signed JWT on Gmail + /// Pub/Sub pushes. + pub fn verify_client_state(&self, client_state: Option<&str>) -> bool { + client_state == Some(self.client_state.as_str()) + } + + // ---- profile ------------------------------------------------------------ + + /// Fetch the signed-in user's profile (`GET /me`), used to resolve the + /// primary SMTP address of a freshly-linked inbox. + #[tracing::instrument(skip(self, access_token), err)] + pub async fn get_profile( + &self, + access_token: &str, + ) -> Result { + profile::get_profile(self, access_token).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn verify_client_state_matches_secret() { + let client = OutlookClient::new("https://example.com/webhook".into(), "s3cr3t".into()); + assert!(client.verify_client_state(Some("s3cr3t"))); + assert!(!client.verify_client_state(Some("wrong"))); + assert!(!client.verify_client_state(None)); + } +} diff --git a/rust/cloud-storage/outlook_client/src/messages.rs b/rust/cloud-storage/outlook_client/src/messages.rs new file mode 100644 index 0000000000..49517dbb04 --- /dev/null +++ b/rust/cloud-storage/outlook_client/src/messages.rs @@ -0,0 +1,174 @@ +//! Message fetch and send operations against Microsoft Graph. + +use crate::OutlookClient; +use base64::Engine; +use base64::engine::general_purpose::STANDARD; +use models_email::email::service::address::ContactInfo; +use models_email::email::service::message::MessageToSend; +use models_email::outlook::error::OutlookError; +use models_email::outlook::{ + EmailAddress, ItemBody, MessageResource, OutgoingAttachment, OutgoingMessage, Recipient, +}; + +/// Fetch a single message by id. Returns `None` if it no longer exists. +pub(crate) async fn get_message( + client: &OutlookClient, + access_token: &str, + message_id: &str, +) -> Result, OutlookError> { + // `$select` the fields our convert layer needs; request internet headers so + // we can persist them (and recover Reply-To etc.) like we do for Gmail. + let url = format!( + "{}/me/messages/{}?$select={}", + client.base_url, + message_id, + MESSAGE_SELECT + ); + client.graph_get_opt(access_token, &url).await +} + +/// The set of message fields we project. Keep in sync with what +/// [`crate::convert::map_message_resource_to_service`] reads. +pub(crate) const MESSAGE_SELECT: &str = "id,changeKey,conversationId,internetMessageId,subject,bodyPreview,body,from,toRecipients,ccRecipients,bccRecipients,replyTo,receivedDateTime,sentDateTime,isRead,isDraft,hasAttachments,parentFolderId,categories,flag,internetMessageHeaders"; + +fn contact_to_recipient(contact: &ContactInfo) -> Recipient { + Recipient { + email_address: EmailAddress { + name: contact.name.clone(), + address: Some(contact.email.clone()), + }, + } +} + +fn contacts_to_recipients(contacts: &Option>) -> Vec { + contacts + .as_ref() + .map(|cs| cs.iter().map(contact_to_recipient).collect()) + .unwrap_or_default() +} + +/// Build the Graph message body to send from a [`MessageToSend`]. HTML is +/// preferred when present (matching how the composer produces content). +fn build_outgoing_message(message: &MessageToSend) -> OutgoingMessage { + let body = if let Some(html) = &message.body_html { + ItemBody { + content_type: "html".to_string(), + content: html.clone(), + } + } else { + ItemBody { + content_type: "text".to_string(), + content: message.body_text.clone().unwrap_or_default(), + } + }; + + let attachments = message + .attachments + .as_ref() + .map(|atts| { + atts.iter() + .map(|att| { + OutgoingAttachment::file( + att.file_name.clone(), + att.content_type.clone(), + STANDARD.encode(&att.data), + ) + }) + .collect() + }) + .unwrap_or_default(); + + OutgoingMessage { + subject: message.subject.clone(), + body, + to_recipients: contacts_to_recipients(&message.to), + cc_recipients: contacts_to_recipients(&message.cc), + bcc_recipients: contacts_to_recipients(&message.bcc), + attachments, + internet_message_headers: Vec::new(), + } +} + +/// Send a message. +/// +/// Unlike Gmail's `messages.send` (which returns the created id + thread id), +/// Graph's `sendMail` returns nothing. To still capture the provider ids — and +/// to thread replies correctly — we create a draft first, then send it: +/// +/// * **reply** (`parent_provider_message_id` set): `createReply` produces a draft +/// already attached to the conversation; we patch it with the composed +/// content and recipients, then send. +/// * **new message**: create a fresh draft, then send. +/// +/// On success `message.provider_id` / `message.provider_thread_id` are set from +/// the draft. Note Graph may re-id the message when it lands in Sent Items; the +/// subsequent delta sync reconciles the canonical copy. +pub(crate) async fn send_message( + client: &OutlookClient, + access_token: &str, + message: &mut MessageToSend, + from_contact: &ContactInfo, + parent_provider_message_id: Option, +) -> Result<(), OutlookError> { + let _ = from_contact; // Graph derives From from the authenticated mailbox. + let outgoing = build_outgoing_message(message); + + let draft = if let Some(parent_id) = parent_provider_message_id { + // Create a reply draft attached to the existing conversation. + let create_reply_url = format!("{}/me/messages/{}/createReply", client.base_url, parent_id); + let draft: MessageResource = { + let resp = client + .inner + .post(&create_reply_url) + .bearer_auth(access_token) + .header("Content-Length", "0") + .send() + .await + .map_err(|e| OutlookError::HttpRequest(e.to_string()))?; + OutlookClient::deserialize_success(resp).await? + }; + + // Overwrite the auto-generated reply with the composed content. + let patch_url = format!("{}/me/messages/{}", client.base_url, draft.id); + let resp = client + .inner + .patch(&patch_url) + .bearer_auth(access_token) + .json(&outgoing) + .send() + .await + .map_err(|e| OutlookError::HttpRequest(e.to_string()))?; + OutlookClient::deserialize_success::(resp).await? + } else { + // Create a standalone draft. + let create_url = format!("{}/me/messages", client.base_url); + let resp = client + .inner + .post(&create_url) + .bearer_auth(access_token) + .json(&outgoing) + .send() + .await + .map_err(|e| OutlookError::HttpRequest(e.to_string()))?; + OutlookClient::deserialize_success::(resp).await? + }; + + // Send the draft. + let send_url = format!("{}/me/messages/{}/send", client.base_url, draft.id); + let resp = client + .inner + .post(&send_url) + .bearer_auth(access_token) + .header("Content-Length", "0") + .send() + .await + .map_err(|e| OutlookError::HttpRequest(e.to_string()))?; + OutlookClient::ensure_success(resp).await?; + + message.provider_id = Some(draft.id); + if let Some(conversation_id) = draft.conversation_id { + message.provider_thread_id = Some(conversation_id); + } + + Ok(()) +} diff --git a/rust/cloud-storage/outlook_client/src/profile.rs b/rust/cloud-storage/outlook_client/src/profile.rs new file mode 100644 index 0000000000..903b42677a --- /dev/null +++ b/rust/cloud-storage/outlook_client/src/profile.rs @@ -0,0 +1,19 @@ +//! Profile lookup against Microsoft Graph (`GET /me`). + +use crate::OutlookClient; +use models_email::outlook::UserResource; +use models_email::outlook::error::OutlookError; + +/// Fetch the signed-in user's profile. Used to resolve the primary SMTP address +/// of a freshly-linked Outlook inbox (the analogue of Gmail's +/// `users.getProfile`). +pub(crate) async fn get_profile( + client: &OutlookClient, + access_token: &str, +) -> Result { + let url = format!( + "{}/me?$select=id,mail,userPrincipalName,displayName", + client.base_url + ); + client.graph_get(access_token, &url).await +} diff --git a/rust/cloud-storage/outlook_client/src/subscriptions.rs b/rust/cloud-storage/outlook_client/src/subscriptions.rs new file mode 100644 index 0000000000..5814619c06 --- /dev/null +++ b/rust/cloud-storage/outlook_client/src/subscriptions.rs @@ -0,0 +1,98 @@ +//! Change-notification subscription management against Microsoft Graph. +//! +//! This is the Outlook analogue of `gmail_client::watch`. Graph subscriptions on +//! message resources are short-lived (max ~3 days) and must be renewed +//! periodically; the email service schedules renewals before expiry. + +use crate::OutlookClient; +use chrono::{Duration, Utc}; +use models_email::outlook::error::OutlookError; +use models_email::outlook::subscription::{ + CreateSubscriptionRequest, RenewSubscriptionRequest, Subscription, +}; + +/// How far in the future to set a subscription's expiry. Graph caps message +/// subscriptions at ~4230 minutes; we use 2 days and renew well before then. +const SUBSCRIPTION_TTL_MINUTES: i64 = 2 * 24 * 60; + +/// The mailbox resource we watch. Scoped to the inbox to mirror Gmail's +/// inbox-focused watch. +const WATCHED_RESOURCE: &str = "/me/mailFolders('inbox')/messages"; + +fn expiration_timestamp() -> String { + (Utc::now() + Duration::minutes(SUBSCRIPTION_TTL_MINUTES)).to_rfc3339() +} + +/// Create a subscription delivering inbox change notifications to our webhook. +pub(crate) async fn create_subscription( + client: &OutlookClient, + access_token: &str, +) -> Result { + let url = format!("{}/subscriptions", client.base_url); + + let body = CreateSubscriptionRequest { + change_type: "created,updated,deleted".to_string(), + notification_url: client.notification_url.clone(), + resource: WATCHED_RESOURCE.to_string(), + expiration_date_time: expiration_timestamp(), + client_state: client.client_state.clone(), + }; + + let resp = client + .inner + .post(&url) + .bearer_auth(access_token) + .json(&body) + .send() + .await + .map_err(|e| OutlookError::HttpRequest(e.to_string()))?; + + OutlookClient::deserialize_success(resp).await +} + +/// Renew a subscription, pushing its expiry out by the standard TTL. +pub(crate) async fn renew_subscription( + client: &OutlookClient, + access_token: &str, + subscription_id: &str, +) -> Result { + let url = format!("{}/subscriptions/{}", client.base_url, subscription_id); + + let body = RenewSubscriptionRequest { + expiration_date_time: expiration_timestamp(), + }; + + let resp = client + .inner + .patch(&url) + .bearer_auth(access_token) + .json(&body) + .send() + .await + .map_err(|e| OutlookError::HttpRequest(e.to_string()))?; + + OutlookClient::deserialize_success(resp).await +} + +/// Delete a subscription, stopping notifications. +pub(crate) async fn delete_subscription( + client: &OutlookClient, + access_token: &str, + subscription_id: &str, +) -> Result<(), OutlookError> { + let url = format!("{}/subscriptions/{}", client.base_url, subscription_id); + + let resp = client + .inner + .delete(&url) + .bearer_auth(access_token) + .send() + .await + .map_err(|e| OutlookError::HttpRequest(e.to_string()))?; + + // A subscription that's already gone is fine — treat 404 as success. + match OutlookClient::ensure_success(resp).await { + Ok(()) | Err(OutlookError::NotFound(_)) => Ok(()), + Err(e) => Err(e), + } +} diff --git a/rust/cloud-storage/outlook_client/src/threads.rs b/rust/cloud-storage/outlook_client/src/threads.rs new file mode 100644 index 0000000000..a6d36cf4b1 --- /dev/null +++ b/rust/cloud-storage/outlook_client/src/threads.rs @@ -0,0 +1,36 @@ +//! Conversation (thread) operations against Microsoft Graph. +//! +//! Outlook groups related messages by `conversationId`. There is no first-class +//! "thread" resource as in Gmail, so we reconstruct a thread by listing all +//! messages that share a conversation id. + +use crate::OutlookClient; +use crate::messages::MESSAGE_SELECT; +use models_email::outlook::error::OutlookError; +use models_email::outlook::{MessageListResponse, MessageResource}; + +/// Maximum messages per page (Graph caps `$top` at 1000 for messages). +const PAGE_SIZE: u32 = 100; + +/// Fetch every message in a conversation, oldest first, following pagination. +pub(crate) async fn get_conversation_messages( + client: &OutlookClient, + access_token: &str, + conversation_id: &str, +) -> Result, OutlookError> { + // Single-quotes inside an OData string literal are escaped by doubling them. + let escaped = conversation_id.replace('\'', "''"); + let mut next_url = Some(format!( + "{}/me/messages?$filter=conversationId eq '{}'&$orderby=receivedDateTime asc&$top={}&$select={}", + client.base_url, escaped, PAGE_SIZE, MESSAGE_SELECT + )); + + let mut messages = Vec::new(); + while let Some(url) = next_url.take() { + let page: MessageListResponse = client.graph_get(access_token, &url).await?; + messages.extend(page.value); + next_url = page.next_link; + } + + Ok(messages) +} From 92bac601a9c839b995966f572ee0f1f73b8676d1 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 17:16:43 +0000 Subject: [PATCH 2/5] feat(auth): scaffold Outlook OAuth linking + token issuance Mirror the Gmail FusionAuth linking flow for Microsoft (Outlook): - fusionauth: add a microsoft module (refresh/exchange/parse) and Microsoft client credentials on FusionAuthClient via a with_microsoft_credentials builder, so the three existing FusionAuthClient::new call sites are untouched. - model + authentication_service_client: add MicrosoftAccessToken and get_microsoft_access_token, mirroring the Google equivalents. - authentication_service: - POST /link/outlook + GET /link/outlook/status (Microsoft authorize URL). - oauth2 /microsoft/callback wired into the shared callback handler; links the Microsoft IdP to the user and stashes the linked email for /email/init. - GET /internal/microsoft_access_token: looks up the user's microsoft_outlook IdP link and refreshes the stored refresh token into a Graph access token. - config: optional MICROSOFT_CLIENT_ID / MICROSOFT_CLIENT_SECRET_KEY (default empty so the service still boots when Outlook is unconfigured). Verified: fusionauth (with new microsoft test), model, authentication_service_client, and authentication_service all compile. --- .../api/internal/microsoft_access_token.rs | 121 ++++++++ .../src/api/internal/mod.rs | 5 + .../src/api/link/mod.rs | 6 + .../src/api/link/outlook.rs | 234 +++++++++++++++ .../src/api/oauth2/microsoft.rs | 204 +++++++++++++ .../src/api/oauth2/mod.rs | 2 + .../authentication_service/src/config.rs | 9 + .../authentication_service/src/main.rs | 20 +- .../authentication_service_client/src/lib.rs | 1 + .../src/microsoft_access_token.rs | 67 +++++ rust/cloud-storage/fusionauth/src/lib.rs | 28 ++ .../fusionauth/src/microsoft/mod.rs | 54 ++++ .../fusionauth/src/microsoft/oauth.rs | 277 ++++++++++++++++++ .../src/authentication/microsoft_token.rs | 8 + .../model/src/authentication/mod.rs | 1 + 15 files changed, 1036 insertions(+), 1 deletion(-) create mode 100644 rust/cloud-storage/authentication_service/src/api/internal/microsoft_access_token.rs create mode 100644 rust/cloud-storage/authentication_service/src/api/link/outlook.rs create mode 100644 rust/cloud-storage/authentication_service/src/api/oauth2/microsoft.rs create mode 100644 rust/cloud-storage/authentication_service_client/src/microsoft_access_token.rs create mode 100644 rust/cloud-storage/fusionauth/src/microsoft/mod.rs create mode 100644 rust/cloud-storage/fusionauth/src/microsoft/oauth.rs create mode 100644 rust/cloud-storage/model/src/authentication/microsoft_token.rs diff --git a/rust/cloud-storage/authentication_service/src/api/internal/microsoft_access_token.rs b/rust/cloud-storage/authentication_service/src/api/internal/microsoft_access_token.rs new file mode 100644 index 0000000000..7d96ec8e6f --- /dev/null +++ b/rust/cloud-storage/authentication_service/src/api/internal/microsoft_access_token.rs @@ -0,0 +1,121 @@ +use axum::{ + Json, + extract::{self, State}, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use fusionauth::FusionAuthClient; +use fusionauth::error::FusionAuthClientError; +use macro_middleware::auth::internal_access::ValidInternalKey; +use model::authentication::microsoft_token::MicrosoftAccessToken; +use model::response::ErrorResponse; +use std::sync::Arc; + +/// FusionAuth identity-provider name for the Microsoft (Outlook) IdP. Mirrors +/// the `google_gmail` name used for Gmail. +pub(crate) const OUTLOOK_IDENTITY_PROVIDER_NAME: &str = "microsoft_outlook"; + +#[derive(serde::Deserialize, Debug)] +pub struct MicrosoftAccessTokenParams { + fusionauth_user_id: String, + /// The linked Microsoft account's email — what FusionAuth stores as + /// `display_name` on the IdP link. Discriminates one Microsoft account from + /// another when the FA user has multiple Microsoft IdP links. + email: String, +} + +/// Gets a Microsoft (Outlook) access token for the linked account. Mirrors the +/// Gmail `google_access_token` handler. +#[tracing::instrument(skip(auth_client, _internal_access))] +pub async fn handler( + State(auth_client): State>, + _internal_access: ValidInternalKey, + extract::Query(params): extract::Query, +) -> Result { + get_access_token(auth_client, ¶ms, OUTLOOK_IDENTITY_PROVIDER_NAME).await +} + +/// Fetches an access token for a user from the Microsoft identity provider by +/// looking up their IdP link and refreshing the stored refresh token. +#[tracing::instrument(skip(auth_client))] +async fn get_access_token( + auth_client: Arc, + params: &MicrosoftAccessTokenParams, + identity_provider_name: &str, +) -> Result { + let fusionauth_user_id = params.fusionauth_user_id.as_str(); + let email = params.email.as_str(); + + // get identity provider id + let idp_id = auth_client + .get_identity_provider_id_by_name(identity_provider_name) + .await + .map_err(|e| { + tracing::error!(error=?e, "unable to find idp id for {}", identity_provider_name); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + message: "unable to find idp".into(), + }), + ) + .into_response() + })?; + + // get refresh token via link + let links = auth_client + .get_links(fusionauth_user_id, Some(idp_id.clone())) + .await + .map_err(|e| { + tracing::error!(error=?e, "error fetching links for userid {} and idp id {}", fusionauth_user_id, idp_id.as_str()); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + message: "unable to fetch links".into(), + }), + ) + .into_response() + })?; + + // a fusionauth user can have multiple links to the same identity provider with different email + // addresses, but can only have one link with a given email + let link = links + .into_iter() + .find(|l| l.display_name.as_str() == email) + .ok_or_else(|| { + tracing::error!( + "link not found for user id {} and idp id {}", + fusionauth_user_id, + idp_id.as_str() + ); + ( + StatusCode::NOT_FOUND, + Json(ErrorResponse { + message: format!("No {} link found for this user", identity_provider_name) + .into(), + }), + ) + .into_response() + })?; + + // get access token using refresh token + let token_response = auth_client + .refresh_microsoft_token(link.token.as_str()) + .await + .map_err(|e| { + tracing::error!(error=?e, "error fetching microsoft access token for userid {}", fusionauth_user_id); + let status_code = match &e { + FusionAuthClientError::InvalidGrant => StatusCode::FORBIDDEN, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }; + let message = format!("unable to fetch {} access token", identity_provider_name); + (status_code, Json(ErrorResponse { message: message.into() })).into_response() + })?; + + Ok(( + StatusCode::OK, + Json(MicrosoftAccessToken { + access_token: token_response.access_token, + }), + ) + .into_response()) +} diff --git a/rust/cloud-storage/authentication_service/src/api/internal/mod.rs b/rust/cloud-storage/authentication_service/src/api/internal/mod.rs index 3f8403e340..ea0e7fcff3 100644 --- a/rust/cloud-storage/authentication_service/src/api/internal/mod.rs +++ b/rust/cloud-storage/authentication_service/src/api/internal/mod.rs @@ -9,12 +9,17 @@ use super::user::post_get_names; // needs to be public in api crate for swagger mod google_access_token; +mod microsoft_access_token; mod post_get_existing_users; mod remove_link; pub fn router() -> Router { Router::new() .route("/google_access_token", get(google_access_token::handler)) + .route( + "/microsoft_access_token", + get(microsoft_access_token::handler), + ) .route("/get_names", post(post_get_names::handler_internal)) .route("/get_existing_users", get(post_get_existing_users::handler)) .route("/remove_link", delete(remove_link::handler)) diff --git a/rust/cloud-storage/authentication_service/src/api/link/mod.rs b/rust/cloud-storage/authentication_service/src/api/link/mod.rs index 592b123575..33550fff49 100644 --- a/rust/cloud-storage/authentication_service/src/api/link/mod.rs +++ b/rust/cloud-storage/authentication_service/src/api/link/mod.rs @@ -6,6 +6,7 @@ use axum::{ pub(in crate::api) mod create_in_progress_link; pub(in crate::api) mod github; pub(in crate::api) mod gmail; +pub(in crate::api) mod outlook; /// The link router /// We ensure the user is logged in with the `macro_middleware::auth::decode_jwt::handler`. @@ -20,4 +21,9 @@ pub fn router(_state: ApiContext) -> Router { ) .route("/gmail", post(gmail::init_gmail_link_handler)) .route("/gmail/status", get(gmail::check_gmail_link_status_handler)) + .route("/outlook", post(outlook::init_outlook_link_handler)) + .route( + "/outlook/status", + get(outlook::check_outlook_link_status_handler), + ) } diff --git a/rust/cloud-storage/authentication_service/src/api/link/outlook.rs b/rust/cloud-storage/authentication_service/src/api/link/outlook.rs new file mode 100644 index 0000000000..47ee5df35a --- /dev/null +++ b/rust/cloud-storage/authentication_service/src/api/link/outlook.rs @@ -0,0 +1,234 @@ +use anyhow::Context; +use axum::{ + Json, + extract::{Query, State}, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use macro_middleware::tracking::ClientIp; +use model::response::ErrorResponse; +use model_user::axum_extractor::MacroUserExtractor; +use serde_utils::urlencode::UrlEncoded; +use url::Url; + +use crate::api::{ + context::ApiContext, link::github::REAUTHENTICATION_REQUIRED_MESSAGE, oauth2::OAuthState, +}; + +/// Microsoft identity platform v2.0 authorize endpoint. `common` supports both +/// work/school (Entra) and personal Microsoft accounts. +const MICROSOFT_AUTHORIZATION_URL: &str = + "https://login.microsoftonline.com/common/oauth2/v2.0/authorize"; +/// FusionAuth identity-provider name for the Microsoft (Outlook) IdP. Mirrors +/// the `google_gmail` name used for Gmail. +const OUTLOOK_IDENTITY_PROVIDER_NAME: &str = "microsoft_outlook"; +/// Delegated Graph scopes. `offline_access` is required to receive a refresh +/// token; the Mail/User scopes back the mailbox operations `outlook_client` +/// performs. Must stay in sync with `fusionauth::microsoft::oauth`. +const OUTLOOK_SCOPES: &str = "offline_access openid profile email https://graph.microsoft.com/Mail.ReadWrite https://graph.microsoft.com/Mail.Send https://graph.microsoft.com/User.Read"; + +#[derive(serde::Deserialize, serde::Serialize, Debug, utoipa::ToSchema)] +pub struct InitOutlookLinkResponse { + /// The OAuth authorization URL to redirect the user to + pub authorization_url: String, + /// The link ID for tracking the OAuth flow + pub link_id: uuid::Uuid, +} + +/// Error type for init Outlook operations +#[derive(thiserror::Error, Debug)] +pub enum InitOutlookLinkError { + /// Too many in-progress links + #[error("too many in progress links")] + TooManyInProgressLinks, + /// Internal error + #[error("internal error occurred")] + InternalError(#[from] anyhow::Error), + /// The identity provider was not found + #[error("identity provider not found")] + IdentityProviderNotFound, +} + +impl IntoResponse for InitOutlookLinkError { + fn into_response(self) -> Response { + let message = self.to_string(); + let status_code: StatusCode = match &self { + InitOutlookLinkError::TooManyInProgressLinks => StatusCode::TOO_MANY_REQUESTS, + InitOutlookLinkError::InternalError(_) + | InitOutlookLinkError::IdentityProviderNotFound => { + StatusCode::INTERNAL_SERVER_ERROR + } + }; + + ( + status_code, + Json(ErrorResponse { + message: message.into(), + }), + ) + .into_response() + } +} + +#[derive(Debug, serde::Deserialize)] +pub(crate) struct InitOutlookLinkQueryParams { + /// Once the frontend is updated to NOT 2x urlencode this then this should be + /// changed to `Option` + original_url: Option>, +} + +/// Initiates an Outlook link for a user. +#[utoipa::path( + post, + operation_id = "init_outlook_link", + path = "/link/outlook", + params( + ("original_url" = String, Query, description = "**OPTIONAL**. The original url to redirect to.") + ), + responses( + (status = 200, body=InitOutlookLinkResponse), + (status = 400, body=ErrorResponse), + (status = 429, body=ErrorResponse), + (status = 401, body=ErrorResponse), + (status = 500, body=ErrorResponse), + ) + )] +#[tracing::instrument(skip(ctx, ip_context, user_context), fields(client_ip=%ip_context, user_id=%user_context.user_context.user_id, fusion_user_id=%user_context.user_context.fusion_user_id), err)] +pub async fn init_outlook_link_handler( + State(ctx): State, + query: Query, + ip_context: ClientIp, + user_context: MacroUserExtractor, +) -> Result, InitOutlookLinkError> { + let Query(InitOutlookLinkQueryParams { original_url }) = query; + + let count = + macro_db_client::in_progress_user_link::count_existing_in_progress_user_links_for_user( + &ctx.db, + &user_context.user_context.fusion_user_id, + ) + .await?; + + if count >= 5 { + return Err(InitOutlookLinkError::TooManyInProgressLinks); + } + + let link_id = macro_db_client::in_progress_user_link::create_in_progress_user_link( + &ctx.db, + &user_context.user_context.fusion_user_id, + ) + .await?; + + let outlook_idp_id = ctx + .auth_client + .get_identity_provider_id_by_name(OUTLOOK_IDENTITY_PROVIDER_NAME) + .await + .map_err(|_| InitOutlookLinkError::IdentityProviderNotFound)?; + + let state = OAuthState { + identity_provider_id: outlook_idp_id, + link_id: Some(link_id), + original_url: original_url.map(|x| x.0.to_string()), + is_mobile: None, + }; + + let redirect_uri = crate::api::oauth2::format_redirect_uri("microsoft"); + let state_str = serde_json::to_string(&state).context("failed to serialize OAuth state")?; + + let mut authorization_url = Url::parse(MICROSOFT_AUTHORIZATION_URL) + .context("invalid Microsoft authorization URL")?; + authorization_url + .query_pairs_mut() + .append_pair("client_id", ctx.auth_client.microsoft_client_id()) + .append_pair("redirect_uri", &redirect_uri) + .append_pair("response_type", "code") + .append_pair("response_mode", "query") + .append_pair("scope", OUTLOOK_SCOPES) + .append_pair("state", &state_str) + // Force the consent screen so a refresh token is always issued. + .append_pair("prompt", "consent"); + + Ok(Json(InitOutlookLinkResponse { + authorization_url: authorization_url.to_string(), + link_id, + })) +} + +#[derive(serde::Deserialize, serde::Serialize, Debug, utoipa::ToSchema)] +pub struct OutlookLinkStatusResponse { + /// Whether the user must reauthenticate their Outlook link. + pub reauthentication_required: bool, +} + +#[derive(thiserror::Error, Debug)] +pub enum OutlookLinkStatusError { + #[error("reauthentication required")] + ReauthenticationRequired, + #[error("internal")] + Internal(#[from] anyhow::Error), +} + +impl IntoResponse for OutlookLinkStatusError { + fn into_response(self) -> Response { + match &self { + OutlookLinkStatusError::ReauthenticationRequired => ( + StatusCode::PRECONDITION_REQUIRED, + Json(ErrorResponse { + message: REAUTHENTICATION_REQUIRED_MESSAGE.into(), + }), + ), + OutlookLinkStatusError::Internal(_) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse { + message: "internal error occurred".into(), + }), + ), + } + .into_response() + } +} + +/// Checks whether the authenticated user's Outlook link is valid. +#[utoipa::path( + get, + operation_id = "check_outlook_link_status", + path = "/link/outlook/status", + responses( + (status = 200, body=OutlookLinkStatusResponse), + (status = 401, body=ErrorResponse), + (status = 404, body=ErrorResponse), + (status = 428, body=ErrorResponse), + (status = 500, body=ErrorResponse), + ) + )] +#[tracing::instrument(skip(ctx, ip_context, user_context), fields(client_ip=%ip_context, user_id=%user_context.macro_user_id), err)] +pub async fn check_outlook_link_status_handler( + State(ctx): State, + ip_context: ClientIp, + user_context: MacroUserExtractor, +) -> Result, OutlookLinkStatusError> { + // Check if the user has an email link in db + if macro_db_client::email::check_user_email_link(&ctx.db, &user_context.macro_user_id) + .await + .map_err(OutlookLinkStatusError::Internal)? + { + let links = ctx + .auth_client + .get_links(&user_context.user_context.fusion_user_id, None) + .await + .map_err(|e| OutlookLinkStatusError::Internal(e.into()))?; + + let has_outlook_link = links + .iter() + .any(|l| l.identity_provider_name.eq(OUTLOOK_IDENTITY_PROVIDER_NAME)); + + // If no, return 428 + if !has_outlook_link { + return Err(OutlookLinkStatusError::ReauthenticationRequired); + } + } + + Ok(Json(OutlookLinkStatusResponse { + reauthentication_required: false, + })) +} diff --git a/rust/cloud-storage/authentication_service/src/api/oauth2/microsoft.rs b/rust/cloud-storage/authentication_service/src/api/oauth2/microsoft.rs new file mode 100644 index 0000000000..5a8e8ec914 --- /dev/null +++ b/rust/cloud-storage/authentication_service/src/api/oauth2/microsoft.rs @@ -0,0 +1,204 @@ +use anyhow::Context; +use email_validator::normalize_email; +use std::borrow::Cow; + +use axum::{ + Json, + response::{IntoResponse, Redirect, Response}, +}; +use model::response::ErrorResponse; +use reqwest::StatusCode; +use tower_cookies::Cookies; +use url::Url; + +use crate::api::{ + context::ApiContext, + oauth2::{ + OAuthState, format_redirect_uri, + login::{self}, + }, +}; +use fusionauth::error::FusionAuthClientError; +use fusionauth::identity_provider::{IdentityProviderLink, LinkUserRequest}; + +async fn link_user( + ctx: &ApiContext, + identity_provider_id: &str, + code: &str, + link_id: &uuid::Uuid, +) -> Result<(), (StatusCode, String)> { + let macro_user_id = + macro_db_client::in_progress_user_link::get_macro_user_id_by_link_id(&ctx.db, link_id) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let token_response = ctx + .auth_client + .exchange_microsoft_code_for_tokens(code, &format_redirect_uri("microsoft")) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("unable to exchange code for tokens {e}"), + ) + })?; + + let user_info = ctx + .auth_client + .parse_microsoft_id_token(&token_response.id_token) + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("unable to decode id token {e}"), + ) + })?; + + // Work/school accounts may only populate `preferred_username`; personal + // accounts populate `email`. `resolved_email` picks whichever is present. + let raw_email = user_info.resolved_email().ok_or_else(|| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + "microsoft id token did not contain an email".to_string(), + ) + })?; + + // Microsoft's immutable object id (`oid`) is the stable per-user identifier; + // fall back to the token subject if it's absent. + let idp_user_id = user_info + .oid + .as_deref() + .or(user_info.sub.as_deref()) + .ok_or_else(|| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + "microsoft id token did not contain a subject".to_string(), + ) + })?; + + let user_info_email = normalize_email(raw_email) + .context("email should be normalizable") + .map_err(|_| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("unable to normalize email {raw_email}"), + ) + })?; + + // Same three terminal cases as the Gmail flow: fresh link, idempotent + // relink, or cross-account add. /email/init re-derives ownership downstream. + match ctx + .auth_client + .link_user(LinkUserRequest { + identity_provider_link: IdentityProviderLink { + display_name: user_info_email.clone(), + identity_provider_id: Cow::Borrowed(identity_provider_id), + identity_provider_user_id: Cow::Borrowed(idp_user_id), + user_id: Cow::Borrowed(¯o_user_id.to_string()), + token: Cow::Borrowed(&token_response.refresh_token), + }, + }) + .await + { + Ok(()) => {} + Err(FusionAuthClientError::IdentityProviderLinkAlreadyExists) => { + tracing::info!( + fusion_user_id = %macro_user_id, + linked_email = %user_info_email, + "outlook idp link already exists, skipping creation" + ); + } + Err(e) => { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + format!("unable to link user {e}"), + )); + } + } + + // Stash the linked email on the in_progress_user_link row so /email/init can + // pick it up; the row is consumed and deleted by /email/init. + macro_db_client::in_progress_user_link::set_linked_email(&ctx.db, link_id, &user_info_email) + .await + .map_err(|e| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("unable to record linked email on in_progress_user_link {e}"), + ) + })?; + + Ok(()) +} + +pub(in crate::api::oauth2) async fn handler( + ctx: &ApiContext, + cookies: Cookies, + code: &str, + state: &OAuthState, +) -> Result { + // If the link id is provided the user is already logged in; we only need to + // attach the Outlook IdP link, not complete a login. + if let Some(link_id) = state.link_id.as_ref() { + let link_result = link_user(ctx, &state.identity_provider_id, code, link_id).await; + + if link_result.is_err() { + // Best-effort cleanup so a failed attempt doesn't burn an in-flight slot. + macro_db_client::in_progress_user_link::delete_in_progress_user_link(&ctx.db, link_id) + .await + .inspect_err(|e| { + tracing::warn!( + error=?e, + ?link_id, + "failed to clean up in_progress_user_link after link_user error" + ); + }) + .ok(); + } + + link_result.map_err(|(status_code, error)| { + tracing::error!(error=?error, "unable to link user"); + ( + status_code, + Json(ErrorResponse { + message: error.into(), + }), + ) + .into_response() + })?; + + if let Some(original_url) = &state.original_url { + let decoded = urlencoding::decode(original_url).map_err(|e| { + tracing::error!(error=?e, "unable to decode original url"); + ( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + message: "unable to decode original url".into(), + }), + ) + .into_response() + })?; + + let mut url: Url = decoded + .parse() + .inspect_err(|e| tracing::error!(error=?e, "unable to parse string to url")) + .map_err(|_| { + ( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + message: "unable to parse to original url".into(), + }), + ) + .into_response() + })?; + + url.query_pairs_mut() + .append_pair("link_id", &link_id.to_string()); + + return Ok(Redirect::to(url.as_str()).into_response()); + } + + return Ok(StatusCode::OK.into_response()); + } + + // No link required: complete the standard IdP login. + login::handler(ctx, cookies, code, "microsoft", state).await +} diff --git a/rust/cloud-storage/authentication_service/src/api/oauth2/mod.rs b/rust/cloud-storage/authentication_service/src/api/oauth2/mod.rs index 9c98c0bd22..d938a67388 100644 --- a/rust/cloud-storage/authentication_service/src/api/oauth2/mod.rs +++ b/rust/cloud-storage/authentication_service/src/api/oauth2/mod.rs @@ -5,6 +5,7 @@ use tower_cookies::CookieManagerLayer; mod github; mod google; mod login; +mod microsoft; pub fn router() -> Router { Router::new().route( @@ -120,6 +121,7 @@ pub(in crate::api) async fn handler( match provider.as_str() { "google" => google::handler(&ctx, cookies, &code, &state).await, + "microsoft" => microsoft::handler(&ctx, cookies, &code, &state).await, "github" => github::handler(&ctx, cookies, &code, &state) .await .map(|r| r.into_response()) diff --git a/rust/cloud-storage/authentication_service/src/config.rs b/rust/cloud-storage/authentication_service/src/config.rs index dc19a6d468..13d7b41bd7 100644 --- a/rust/cloud-storage/authentication_service/src/config.rs +++ b/rust/cloud-storage/authentication_service/src/config.rs @@ -43,6 +43,15 @@ pub struct Config { /// Google client secret key pub google_client_secret_key: String, + /// Microsoft (Outlook) client id. Empty when Outlook linking is not + /// configured for the environment. + #[macro_config_default(String::new())] + pub microsoft_client_id: String, + /// Microsoft (Outlook) client secret key (secrets-manager name). Empty when + /// Outlook linking is not configured for the environment. + #[macro_config_default(String::new())] + pub microsoft_client_secret_key: String, + /// Stripe secret key pub stripe_secret_key: String, diff --git a/rust/cloud-storage/authentication_service/src/main.rs b/rust/cloud-storage/authentication_service/src/main.rs index 88f4b3ff44..7a0f062b16 100644 --- a/rust/cloud-storage/authentication_service/src/main.rs +++ b/rust/cloud-storage/authentication_service/src/main.rs @@ -142,6 +142,23 @@ async fn main() -> anyhow::Result<()> { .to_string(), }; + // Outlook is optional: only resolve the Microsoft client secret when the + // client id and secret key are configured for this environment. + let microsoft_client_secret = if config.microsoft_client_id.is_empty() + || config.microsoft_client_secret_key.is_empty() + { + String::new() + } else { + match config.environment { + Environment::Local => config.microsoft_client_secret_key.clone(), + _ => secretsmanager_client + .get_secret_value(&config.microsoft_client_secret_key) + .await + .context("unable to get microsoft client secret")? + .to_string(), + } + }; + let auth_client = fusionauth::FusionAuthClient::new( config.fusionauth_tenant_id, fusionauth_api_key, @@ -151,7 +168,8 @@ async fn main() -> anyhow::Result<()> { config.fusionauth_oauth_redirect_uri.clone(), config.google_client_id.clone(), google_client_secret, - ); + ) + .with_microsoft_credentials(config.microsoft_client_id.clone(), microsoft_client_secret); tracing::trace!("initialized auth client"); let document_storage_service_client = DocumentStorageServiceClient::new( diff --git a/rust/cloud-storage/authentication_service_client/src/lib.rs b/rust/cloud-storage/authentication_service_client/src/lib.rs index 70afd6fcfa..6432bccd8b 100644 --- a/rust/cloud-storage/authentication_service_client/src/lib.rs +++ b/rust/cloud-storage/authentication_service_client/src/lib.rs @@ -1,5 +1,6 @@ pub mod error; pub mod google_access_token; +pub mod microsoft_access_token; pub mod unlink; pub mod users; diff --git a/rust/cloud-storage/authentication_service_client/src/microsoft_access_token.rs b/rust/cloud-storage/authentication_service_client/src/microsoft_access_token.rs new file mode 100644 index 0000000000..a3f5241026 --- /dev/null +++ b/rust/cloud-storage/authentication_service_client/src/microsoft_access_token.rs @@ -0,0 +1,67 @@ +use crate::AuthServiceClient; +use crate::error::{AuthServiceClientError, GenericErrorResponse}; +use model::authentication::microsoft_token::MicrosoftAccessToken; + +impl AuthServiceClient { + /// Gets the Microsoft (Outlook) access token for the given fusionauth user and + /// linked email. Mirrors [`AuthServiceClient::get_google_access_token`]. + /// + /// `email` corresponds to the `display_name` on the FusionAuth IdP link — i.e. + /// the linked Microsoft account's email — which is the discriminator when one + /// FA user has multiple IdP links (multi-inbox). + #[tracing::instrument(skip(self))] + pub async fn get_microsoft_access_token( + &self, + fusionauth_user_id: &str, + email: &str, + ) -> Result { + let res = self + .client + .get(format!("{}/internal/microsoft_access_token", self.url)) + .query(&[("fusionauth_user_id", fusionauth_user_id)]) + .query(&[("email", email)]) + .send() + .await + .map_err(|e| AuthServiceClientError::RequestBuildError { + details: e.to_string(), + })?; + + match res.status() { + reqwest::StatusCode::OK => { + tracing::trace!("user microsoft access token retrieved"); + let result = res.json::().await.map_err(|e| { + AuthServiceClientError::Generic(GenericErrorResponse { + message: e.to_string(), + }) + })?; + + Ok(result) + } + reqwest::StatusCode::UNAUTHORIZED => Err(AuthServiceClientError::Unauthorized), + reqwest::StatusCode::FORBIDDEN => Err(AuthServiceClientError::Forbidden), + reqwest::StatusCode::NOT_FOUND => Err(AuthServiceClientError::NotFound), + reqwest::StatusCode::INTERNAL_SERVER_ERROR => { + let error_message = res.text().await.map_err(|e| { + AuthServiceClientError::Generic(GenericErrorResponse { + message: e.to_string(), + }) + })?; + + Err(AuthServiceClientError::InternalServerError { + details: error_message, + }) + } + _ => { + let body = res.text().await.map_err(|e| { + AuthServiceClientError::Generic(GenericErrorResponse { + message: e.to_string(), + }) + })?; + + Err(AuthServiceClientError::Generic(GenericErrorResponse { + message: body, + })) + } + } + } +} diff --git a/rust/cloud-storage/fusionauth/src/lib.rs b/rust/cloud-storage/fusionauth/src/lib.rs index 5d48fc303d..e3f5db5f46 100644 --- a/rust/cloud-storage/fusionauth/src/lib.rs +++ b/rust/cloud-storage/fusionauth/src/lib.rs @@ -7,6 +7,8 @@ pub mod apple; pub mod error; /// Google identity provider and OAuth. pub mod google; +/// Microsoft (Outlook) identity provider and OAuth. +pub mod microsoft; /// Identity provider management (links, login, lookup, search, unlink). pub mod identity_provider; /// JWT refresh operations. @@ -108,6 +110,12 @@ pub struct FusionAuthClient { google_client_id: String, /// The client secret for Google identity provider google_client_secret: String, + /// The client ID for the Microsoft (Outlook) identity provider. Empty when + /// Outlook linking is not configured. + microsoft_client_id: String, + /// The client secret for the Microsoft (Outlook) identity provider. Empty + /// when Outlook linking is not configured. + microsoft_client_secret: String, } impl FusionAuthClient { @@ -135,14 +143,34 @@ impl FusionAuthClient { unauth_client, google_client_id, google_client_secret, + microsoft_client_id: String::new(), + microsoft_client_secret: String::new(), } } + /// Sets the Microsoft (Outlook) OAuth client credentials, enabling Outlook + /// linking. Returns `self` for builder-style chaining so existing call sites + /// that don't use Outlook are unaffected. + pub fn with_microsoft_credentials( + mut self, + client_id: String, + client_secret: String, + ) -> Self { + self.microsoft_client_id = client_id; + self.microsoft_client_secret = client_secret; + self + } + /// Returns the Google OAuth client ID. pub fn google_client_id(&self) -> &str { &self.google_client_id } + /// Returns the Microsoft (Outlook) OAuth client ID. + pub fn microsoft_client_id(&self) -> &str { + &self.microsoft_client_id + } + /// Constructs the oauth2 authorize url for the given idp /// If login_hint is provided, it will be used as the login_hint parameter. This is used to /// ensure users are correctly redirected for domain specific SSO diff --git a/rust/cloud-storage/fusionauth/src/microsoft/mod.rs b/rust/cloud-storage/fusionauth/src/microsoft/mod.rs new file mode 100644 index 0000000000..04382e0dc2 --- /dev/null +++ b/rust/cloud-storage/fusionauth/src/microsoft/mod.rs @@ -0,0 +1,54 @@ +use crate::{ + FusionAuthClient, Result, + error::{FusionAuthClientError, GenericErrorResponse}, +}; + +/// Microsoft (Outlook) OAuth token operations. +pub mod oauth; + +impl FusionAuthClient { + /// Refreshes a Microsoft OAuth2 access token using a refresh token. + #[tracing::instrument(skip(self, refresh_token), fields(client_id=%self.microsoft_client_id))] + pub async fn refresh_microsoft_token( + &self, + refresh_token: &str, + ) -> Result { + oauth::refresh_microsoft_token( + &self.unauth_client, + &self.microsoft_client_id, + &self.microsoft_client_secret, + refresh_token, + ) + .await + } + + /// Exchanges a Microsoft authorization code for tokens. + #[tracing::instrument(skip(self, code, redirect_uri))] + pub async fn exchange_microsoft_code_for_tokens( + &self, + code: &str, + redirect_uri: &str, + ) -> Result { + oauth::exchange_code_for_tokens( + &self.unauth_client, + &self.microsoft_client_id, + &self.microsoft_client_secret, + redirect_uri, + code, + ) + .await + } + + /// Parses and decodes a Microsoft ID token to extract user info. + #[tracing::instrument(skip(self, id_token))] + pub fn parse_microsoft_id_token(&self, id_token: &str) -> Result { + let result = oauth::decode_microsoft_id_token(id_token).map_err(|e| { + tracing::error!(error=?e, "unable to parse microsoft id token"); + FusionAuthClientError::Generic(GenericErrorResponse { + message: e.to_string(), + }) + })?; + + Ok(result) + } +} diff --git a/rust/cloud-storage/fusionauth/src/microsoft/oauth.rs b/rust/cloud-storage/fusionauth/src/microsoft/oauth.rs new file mode 100644 index 0000000000..16b05f6276 --- /dev/null +++ b/rust/cloud-storage/fusionauth/src/microsoft/oauth.rs @@ -0,0 +1,277 @@ +use std::{collections::HashMap, time::Duration}; + +use crate::{ + Result, UnauthedClient, + error::{FusionAuthClientError, GenericErrorResponse}, +}; +use base64::{Engine, engine::general_purpose}; + +/// Microsoft identity platform v2.0 token endpoint. `common` supports both work/ +/// school (Entra) accounts and personal Microsoft accounts. +const MICROSOFT_TOKEN_URL: &str = "https://login.microsoftonline.com/common/oauth2/v2.0/token"; + +/// Scopes requested when refreshing/exchanging tokens. `offline_access` is +/// required to obtain a refresh token; the Mail/User scopes back the mailbox +/// operations the `outlook_client` performs. +const MICROSOFT_SCOPES: &str = + "offline_access openid profile email https://graph.microsoft.com/Mail.ReadWrite https://graph.microsoft.com/Mail.Send https://graph.microsoft.com/User.Read"; + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +/// Response from refreshing a Microsoft OAuth2 access token. +pub struct MicrosoftTokenResponse { + /// The access token. + pub access_token: String, + /// The number of seconds until the token expires. + #[serde(default)] + pub expires_in: u64, + /// The scope of the token. + #[serde(default)] + pub scope: String, + /// The type of token. + #[serde(default)] + pub token_type: String, + /// The ID token, if openid scope was granted. + #[serde(default)] + pub id_token: Option, + /// A rotated refresh token, if one was returned. + #[serde(default)] + pub refresh_token: Option, +} + +#[derive(serde::Serialize, Debug)] +struct RefreshRequest<'a> { + client_id: &'a str, + client_secret: &'a str, + refresh_token: &'a str, + grant_type: &'a str, + scope: &'a str, +} + +/// Refreshes a Microsoft OAuth2 access token using a refresh token. +/// See +pub(crate) async fn refresh_microsoft_token( + client: &UnauthedClient, + client_id: &str, + client_secret: &str, + refresh_token: &str, +) -> Result { + let token_request = RefreshRequest { + client_id, + client_secret, + refresh_token, + grant_type: "refresh_token", + scope: MICROSOFT_SCOPES, + }; + + let res = client + .client() + .post(MICROSOFT_TOKEN_URL) + // The Microsoft token endpoint expects application/x-www-form-urlencoded. + .form(&token_request) + .timeout(Duration::from_secs(30)) + .send() + .await + .map_err(|e| { + tracing::error!(error=?e, "failed to send microsoft access token request"); + FusionAuthClientError::Generic(GenericErrorResponse { + message: e.to_string(), + }) + })?; + + match res.status() { + reqwest::StatusCode::OK => { + let response = res.json::().await.map_err(|e| { + tracing::error!(error=?e, "unable to parse microsoft token response"); + FusionAuthClientError::Generic(GenericErrorResponse { + message: e.to_string(), + }) + })?; + + tracing::debug!( + expires_in=?response.expires_in, + scope=?response.scope, + token_type=?response.token_type, + "successfully refreshed Microsoft access token" + ); + + Ok(response) + } + status => { + let error_text = res + .text() + .await + .unwrap_or_else(|_| "Unable to read error response".to_string()); + tracing::error!( + status=?status, + error=?error_text, + "failed to refresh Microsoft access token" + ); + + // When the user revokes Macro's access (or the refresh token is + // otherwise no longer valid) Microsoft returns invalid_grant, which + // we surface so callers can tear the link down — mirroring Google. + if error_text.contains("invalid_grant") { + return Err(FusionAuthClientError::InvalidGrant); + } + + Err(FusionAuthClientError::Generic(GenericErrorResponse { + message: format!( + "Microsoft token refresh failed with status {}: {}", + status, error_text + ), + })) + } + } +} + +#[derive(Debug, serde::Deserialize)] +/// Response from exchanging a Microsoft authorization code for tokens. +pub struct MicrosoftExchangeTokenResponse { + /// The refresh token. + pub refresh_token: String, + /// The ID token. + pub id_token: String, +} + +#[derive(Debug, serde::Serialize)] +struct TokenExchangeRequest<'a> { + client_id: &'a str, + client_secret: &'a str, + code: &'a str, + grant_type: &'a str, + redirect_uri: &'a str, + scope: &'a str, +} + +/// Exchanges a Microsoft authorization code for tokens. +pub(crate) async fn exchange_code_for_tokens( + client: &UnauthedClient, + client_id: &str, + client_secret: &str, + redirect_uri: &str, + code: &str, +) -> Result { + let token_request = TokenExchangeRequest { + client_id, + client_secret, + code, + grant_type: "authorization_code", + redirect_uri, + scope: MICROSOFT_SCOPES, + }; + + let response = client + .client() + .post(MICROSOFT_TOKEN_URL) + .form(&token_request) + .timeout(Duration::from_secs(30)) + .send() + .await + .map_err(|e| { + tracing::error!(error=?e, "failed to send microsoft token request"); + FusionAuthClientError::Generic(GenericErrorResponse { + message: e.to_string(), + }) + })?; + + let status = response.status(); + + if !status.is_success() { + let error_body = response.text().await.map_err(|e| { + tracing::error!(error=?e, "failed to get error body"); + FusionAuthClientError::Generic(GenericErrorResponse { + message: e.to_string(), + }) + })?; + + tracing::error!(status=?status, body=?error_body, "microsoft token exchange failed"); + return Err(FusionAuthClientError::Generic(GenericErrorResponse { + message: format!( + "microsoft token exchange failed with status {}: {}", + status, error_body + ), + })); + } + + let token_response: MicrosoftExchangeTokenResponse = response.json().await.map_err(|e| { + tracing::error!(error=?e, "failed to parse microsoft token response"); + FusionAuthClientError::Generic(GenericErrorResponse { + message: e.to_string(), + }) + })?; + + Ok(token_response) +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +/// User info extracted from a Microsoft ID token. +pub struct MicrosoftUserInfo { + /// The immutable Microsoft object id for the user, when present. + #[serde(default)] + pub oid: Option, + /// The token subject. + #[serde(default)] + pub sub: Option, + /// The user's email address. Personal accounts populate `email`; work/school + /// accounts often only populate `preferred_username`, so callers should fall + /// back to that. + #[serde(default)] + pub email: Option, + /// The username, frequently the UPN / email for work accounts. + #[serde(default)] + pub preferred_username: Option, + /// Additional claims from the ID token. + #[serde(flatten)] + pub other: HashMap, +} + +impl MicrosoftUserInfo { + /// Best-effort resolution of the account email, preferring `email` and + /// falling back to `preferred_username`. + pub fn resolved_email(&self) -> Option<&str> { + self.email + .as_deref() + .or(self.preferred_username.as_deref()) + } +} + +/// Decodes a Microsoft ID token without signature verification. +pub(crate) fn decode_microsoft_id_token(id_token: &str) -> anyhow::Result { + let parts: Vec<&str> = id_token.split('.').collect(); + if parts.len() != 3 { + anyhow::bail!("invalid jwt format") + } + + let payload = parts[1]; + let decoded_bytes = general_purpose::URL_SAFE_NO_PAD.decode(payload)?; + let decoded_str = String::from_utf8(decoded_bytes)?; + let claims: MicrosoftUserInfo = serde_json::from_str(&decoded_str)?; + + Ok(claims) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn resolved_email_prefers_email_then_upn() { + let info = MicrosoftUserInfo { + oid: None, + sub: None, + email: Some("a@contoso.com".to_string()), + preferred_username: Some("upn@contoso.com".to_string()), + other: HashMap::new(), + }; + assert_eq!(info.resolved_email(), Some("a@contoso.com")); + + let info_upn_only = MicrosoftUserInfo { + oid: None, + sub: None, + email: None, + preferred_username: Some("upn@contoso.com".to_string()), + other: HashMap::new(), + }; + assert_eq!(info_upn_only.resolved_email(), Some("upn@contoso.com")); + } +} diff --git a/rust/cloud-storage/model/src/authentication/microsoft_token.rs b/rust/cloud-storage/model/src/authentication/microsoft_token.rs new file mode 100644 index 0000000000..c6e05a99b7 --- /dev/null +++ b/rust/cloud-storage/model/src/authentication/microsoft_token.rs @@ -0,0 +1,8 @@ +/// An access token minted for a linked Microsoft (Outlook) account, returned by +/// the authentication service's internal token endpoint. Mirrors +/// [`super::google_token::GoogleAccessToken`]. +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct MicrosoftAccessToken { + /// The user's Microsoft Graph access token. + pub access_token: String, +} diff --git a/rust/cloud-storage/model/src/authentication/mod.rs b/rust/cloud-storage/model/src/authentication/mod.rs index bcc1a08820..75395fdfad 100644 --- a/rust/cloud-storage/model/src/authentication/mod.rs +++ b/rust/cloud-storage/model/src/authentication/mod.rs @@ -1,5 +1,6 @@ pub mod error; pub mod google_token; +pub mod microsoft_token; pub mod login; pub mod permission; pub mod user; From 48a7964ab2b691bc71e07baadaca6dfb33f81f68 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 17:27:53 +0000 Subject: [PATCH 3/5] feat(email_service): wire Outlook client, webhook ingress + delta sync - Add outlook_client to ApiContext and PubSubContext (and both construction sites); optional OUTLOOK_NOTIFICATION_URL / OUTLOOK_CLIENT_STATE config that default empty so the service boots when Outlook is unconfigured. - util/outlook/auth: fetch Microsoft access tokens via the auth service, with the same revoked-access -> enqueue-link-deletion behavior as Gmail. - api/outlook/webhook: Graph change-notification endpoint handling the validation-token handshake and per-notification clientState verification (the analogue of verifying Gmail's Google-signed JWT), nested at /outlook/webhook. - inbox_sync: new OutlookNotification operation that runs the read side of a delta sync end-to-end (token -> folders -> delta -> per-message fetch + convert to the provider-agnostic Message), wired into the inbox-sync process match. Follow-ons (deliberately deferred, marked with TODOs): persisting the subscription->link mapping so the webhook can enqueue, persisting the @odata.deltaLink for incremental sync, and reusing a provider-agnostic inbound upsert to store converted messages. Verified: email_service and the dependent email handlers compile. --- rust/cloud-storage/Cargo.lock | 1 + rust/cloud-storage/email_service/Cargo.toml | 1 + .../email_service/src/api/context.rs | 1 + .../email_service/src/api/mod.rs | 2 + .../email_service/src/api/outlook/mod.rs | 11 ++ .../email_service/src/api/outlook/webhook.rs | 106 ++++++++++++++ .../src/bin/pubsub_workers/pubsub_workers.rs | 9 ++ .../cloud-storage/email_service/src/config.rs | 19 +++ rust/cloud-storage/email_service/src/main.rs | 7 + .../src/pubsub/backfill/worker.rs | 3 + .../email_service/src/pubsub/context.rs | 2 + .../src/pubsub/inbox_sync/operations/mod.rs | 1 + .../operations/outlook_notification.rs | 137 ++++++++++++++++++ .../src/pubsub/inbox_sync/process.rs | 7 + .../src/pubsub/inbox_sync/worker.rs | 2 + .../email_service/src/util/mod.rs | 1 + .../email_service/src/util/outlook/auth.rs | 71 +++++++++ .../email_service/src/util/outlook/mod.rs | 3 + 18 files changed, 384 insertions(+) create mode 100644 rust/cloud-storage/email_service/src/api/outlook/mod.rs create mode 100644 rust/cloud-storage/email_service/src/api/outlook/webhook.rs create mode 100644 rust/cloud-storage/email_service/src/pubsub/inbox_sync/operations/outlook_notification.rs create mode 100644 rust/cloud-storage/email_service/src/util/outlook/auth.rs create mode 100644 rust/cloud-storage/email_service/src/util/outlook/mod.rs diff --git a/rust/cloud-storage/Cargo.lock b/rust/cloud-storage/Cargo.lock index d2152d2224..5720a1fa30 100644 --- a/rust/cloud-storage/Cargo.lock +++ b/rust/cloud-storage/Cargo.lock @@ -4076,6 +4076,7 @@ dependencies = [ "models_properties", "notification", "once_cell", + "outlook_client", "redis", "regex", "reqwest 0.13.4", diff --git a/rust/cloud-storage/email_service/Cargo.toml b/rust/cloud-storage/email_service/Cargo.toml index 1e5bbc9727..a48689371d 100644 --- a/rust/cloud-storage/email_service/Cargo.toml +++ b/rust/cloud-storage/email_service/Cargo.toml @@ -121,6 +121,7 @@ models_opensearch = { path = "../models_opensearch" } models_pagination = { path = "../models_pagination", features = ["axum"] } models_permissions = { path = "../models_permissions" } once_cell = { workspace = true } +outlook_client = { path = "../outlook_client" } models_properties = { path = "../models_properties" } redis = { workspace = true, features = [ "cluster-async", diff --git a/rust/cloud-storage/email_service/src/api/context.rs b/rust/cloud-storage/email_service/src/api/context.rs index 02a496ee6a..b4eae58ca0 100644 --- a/rust/cloud-storage/email_service/src/api/context.rs +++ b/rust/cloud-storage/email_service/src/api/context.rs @@ -36,6 +36,7 @@ pub(crate) struct ApiContext { pub db: sqlx::Pool, pub auth_service_client: Arc, pub gmail_client: Arc, + pub outlook_client: Arc, pub redis_client: Arc, pub sqs_client: Arc, pub s3_client: Arc, diff --git a/rust/cloud-storage/email_service/src/api/mod.rs b/rust/cloud-storage/email_service/src/api/mod.rs index 89ca61fa12..d5c1e16b69 100644 --- a/rust/cloud-storage/email_service/src/api/mod.rs +++ b/rust/cloud-storage/email_service/src/api/mod.rs @@ -14,6 +14,7 @@ mod email; // Misc pub(crate) mod context; pub(crate) mod gmail; +pub(crate) mod outlook; mod internal; mod middleware; pub(crate) mod swagger; @@ -56,6 +57,7 @@ fn api_router(state: ApiContext) -> Router { )), ) .nest("/gmail", gmail::router()) + .nest("/outlook", outlook::router()) .nest( "/internal", internal::router().layer( diff --git a/rust/cloud-storage/email_service/src/api/outlook/mod.rs b/rust/cloud-storage/email_service/src/api/outlook/mod.rs new file mode 100644 index 0000000000..4a90af7517 --- /dev/null +++ b/rust/cloud-storage/email_service/src/api/outlook/mod.rs @@ -0,0 +1,11 @@ +//! Outlook (Microsoft Graph) change-notification webhook. + +use crate::api::context::ApiContext; +use axum::{Router, routing::post}; + +pub(crate) mod webhook; + +/// Router for the Outlook webhook, nested under `/outlook` in the API. +pub fn router() -> Router { + Router::new().route("/webhook", post(webhook::webhook_handler)) +} diff --git a/rust/cloud-storage/email_service/src/api/outlook/webhook.rs b/rust/cloud-storage/email_service/src/api/outlook/webhook.rs new file mode 100644 index 0000000000..bbb4a2a2ad --- /dev/null +++ b/rust/cloud-storage/email_service/src/api/outlook/webhook.rs @@ -0,0 +1,106 @@ +//! Webhook that Microsoft Graph hits when an Outlook user's inbox changes. +//! +//! This is the Outlook analogue of [`crate::api::gmail::webhook`]. Two request +//! shapes arrive at this endpoint: +//! +//! 1. **Validation handshake** — when a subscription is created Graph issues a +//! request carrying a `validationToken` query parameter. We must echo it back +//! verbatim as `text/plain` with a 2xx within 10 seconds, otherwise the +//! subscription is rejected. +//! 2. **Change notifications** — a JSON batch of changes. We verify each +//! notification's `clientState` against our secret (the analogue of verifying +//! Gmail's Google-signed JWT) and then enqueue a delta-sync job. +//! +//! Reference: + +use crate::api::context::ApiContext; +use axum::extract::{Query, State}; +use axum::http::{StatusCode, header}; +use axum::response::{IntoResponse, Json, Response}; +use model::response::ErrorResponse; +use models_email::outlook::subscription::ChangeNotificationCollection; +use serde::Deserialize; +use std::collections::HashSet; + +#[derive(Debug, Deserialize)] +pub struct WebhookQuery { + /// Present only on the subscription validation handshake. + #[serde(rename = "validationToken")] + validation_token: Option, +} + +/// Handles both the Graph validation handshake and change notifications. +pub async fn webhook_handler( + State(ctx): State, + Query(query): Query, + body: String, +) -> Result { + // 1. Validation handshake: echo the token back as text/plain. + if let Some(token) = query.validation_token { + tracing::info!("responding to outlook subscription validation handshake"); + return Ok(( + StatusCode::OK, + [(header::CONTENT_TYPE, "text/plain")], + token, + ) + .into_response()); + } + + // 2. Change notification batch. + let notifications: ChangeNotificationCollection = serde_json::from_str(&body).map_err(|e| { + tracing::error!(error=?e, "failed to deserialize outlook change notification"); + ( + StatusCode::BAD_REQUEST, + Json(ErrorResponse { + message: "invalid change notification payload".into(), + }), + ) + .into_response() + })?; + + // Verify clientState on every notification before acting on any of them. + // A mismatch means the notification did not originate from a subscription we + // created, so we reject the whole batch. + for notification in ¬ifications.value { + if !ctx + .outlook_client + .verify_client_state(notification.client_state.as_deref()) + { + tracing::warn!( + subscription_id = %notification.subscription_id, + "outlook notification failed clientState verification" + ); + return Err(( + StatusCode::UNAUTHORIZED, + Json(ErrorResponse { + message: "invalid client state".into(), + }), + ) + .into_response()); + } + } + + // Coalesce to unique subscriptions: a single delta sync per affected + // subscription subsumes every individual message change in the batch. + let subscription_ids: HashSet<&str> = notifications + .value + .iter() + .map(|n| n.subscription_id.as_str()) + .collect(); + + for subscription_id in subscription_ids { + // TODO(outlook): resolve the link for this subscription and enqueue an + // `InboxSyncOperation::OutlookNotification` onto the inbox-sync queue + // (reusing `enqueue_gmail_inbox_sync_notification`). The subscription -> + // link mapping is persisted when the subscription is created during inbox + // init; that persistence (a small `email_outlook_subscriptions` table + + // db_client lookups) is the next increment. Until then we log so the + // ingress and clientState verification can be exercised end-to-end. + tracing::info!( + subscription_id = %subscription_id, + "received verified outlook change notification (delta sync enqueue pending subscription->link persistence)" + ); + } + + Ok((StatusCode::ACCEPTED, Json(StatusCode::ACCEPTED.as_u16())).into_response()) +} diff --git a/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs b/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs index 3dc73ebe11..b389936300 100644 --- a/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs +++ b/rust/cloud-storage/email_service/src/bin/pubsub_workers/pubsub_workers.rs @@ -188,6 +188,11 @@ async fn main() -> anyhow::Result<()> { let gmail_client = gmail_client::GmailClient::new(config.gmail_gcp_queue.clone()); + let outlook_client = outlook_client::OutlookClient::new( + config.outlook_notification_url.clone(), + config.outlook_client_state.clone(), + ); + let redis_inner_client = redis::Client::open(config.redis_uri.as_str()) .inspect(|client| { client @@ -300,6 +305,7 @@ async fn main() -> anyhow::Result<()> { let sqs_client_inbox_sync = sqs_client.clone(); let contacts_ingress_inbox_sync = contacts_ingress.clone(); let gmail_client_inbox_sync = gmail_client.clone(); + let outlook_client_inbox_sync = outlook_client.clone(); let auth_service_client_inbox_sync = auth_service_client.clone(); let redis_client_inbox_sync = redis_client.clone(); let notification_ingress_service_inbox_sync = notification_ingress_service.clone(); @@ -315,6 +321,7 @@ async fn main() -> anyhow::Result<()> { sqs_client_inbox_sync, contacts_ingress_inbox_sync, gmail_client_inbox_sync, + outlook_client_inbox_sync, auth_service_client_inbox_sync, redis_client_inbox_sync, notification_ingress_service_inbox_sync, @@ -340,6 +347,7 @@ async fn main() -> anyhow::Result<()> { let sqs_client_inbox_sync = sqs_client.clone(); let contacts_ingress_inbox_sync = contacts_ingress.clone(); let gmail_client_inbox_sync = gmail_client.clone(); + let outlook_client_inbox_sync = outlook_client.clone(); let auth_service_client_inbox_sync = auth_service_client.clone(); let redis_client_inbox_sync = redis_client.clone(); let notification_ingress_service_inbox_sync = notification_ingress_service.clone(); @@ -355,6 +363,7 @@ async fn main() -> anyhow::Result<()> { sqs_client_inbox_sync, contacts_ingress_inbox_sync, gmail_client_inbox_sync, + outlook_client_inbox_sync, auth_service_client_inbox_sync, redis_client_inbox_sync, notification_ingress_service_inbox_sync, diff --git a/rust/cloud-storage/email_service/src/config.rs b/rust/cloud-storage/email_service/src/config.rs index 28becc2b09..424aa324e8 100644 --- a/rust/cloud-storage/email_service/src/config.rs +++ b/rust/cloud-storage/email_service/src/config.rs @@ -42,6 +42,16 @@ pub struct Config { /// The GCP queue name that has the subscription that hits our webhook endpoint pub gmail_gcp_queue: String, + /// The public HTTPS URL Microsoft Graph delivers Outlook change + /// notifications to (our `/outlook/webhook` endpoint). Empty when Outlook + /// support is not configured for the environment. + pub outlook_notification_url: String, + + /// The opaque secret set as the Graph subscription `clientState` and verified + /// on every incoming Outlook notification. Empty when Outlook support is not + /// configured for the environment. + pub outlook_client_state: String, + /// The SQS queue name for notification-service pub notification_queue: String, @@ -198,6 +208,13 @@ impl Config { let gmail_gcp_queue = std::env::var("GMAIL_GCP_QUEUE").context("GMAIL_GCP_QUEUE must be provided")?; + // Outlook support is optional; these default to empty when unset so the + // service still boots in environments without Outlook configured. + let outlook_notification_url = + std::env::var("OUTLOOK_NOTIFICATION_URL").unwrap_or_default(); + + let outlook_client_state = std::env::var("OUTLOOK_CLIENT_STATE").unwrap_or_default(); + let notification_queue = std::env::var("NOTIFICATION_QUEUE").context("NOTIFICATION_QUEUE must be provided")?; @@ -355,6 +372,8 @@ impl Config { gmail_ops_retry_queue, search_event_queue, gmail_gcp_queue, + outlook_notification_url, + outlook_client_state, notification_queue, backfill_queue, sfs_uploader_queue, diff --git a/rust/cloud-storage/email_service/src/main.rs b/rust/cloud-storage/email_service/src/main.rs index fbed8fe0c4..e6dfc8514c 100644 --- a/rust/cloud-storage/email_service/src/main.rs +++ b/rust/cloud-storage/email_service/src/main.rs @@ -88,6 +88,11 @@ async fn main() -> anyhow::Result<()> { let gmail_client = gmail_client::GmailClient::new(config.gmail_gcp_queue.clone()); + let outlook_client = outlook_client::OutlookClient::new( + config.outlook_notification_url.clone(), + config.outlook_client_state.clone(), + ); + let redis_inner_client = redis::Client::open(config.redis_uri.as_str()) .inspect(|client| { client @@ -129,6 +134,7 @@ async fn main() -> anyhow::Result<()> { let sqs_client = Arc::new(sqs_client); let gmail_client = Arc::new(gmail_client); + let outlook_client = Arc::new(outlook_client); // HTTP API only reads CRM rows — populate runs in the pubsub // worker. The no-op resolver makes the unused branch explicit and // keeps reqwest/scraper out of this binary. @@ -169,6 +175,7 @@ async fn main() -> anyhow::Result<()> { sqs_client, sfs_client: Arc::new(sfs_client), gmail_client: gmail_client.clone(), + outlook_client: outlook_client.clone(), s3_client: Arc::new(s3_client), dss_client: Arc::new(dss_client), system_properties_service, diff --git a/rust/cloud-storage/email_service/src/pubsub/backfill/worker.rs b/rust/cloud-storage/email_service/src/pubsub/backfill/worker.rs index d91f93a231..7c74f279db 100644 --- a/rust/cloud-storage/email_service/src/pubsub/backfill/worker.rs +++ b/rust/cloud-storage/email_service/src/pubsub/backfill/worker.rs @@ -35,6 +35,9 @@ pub async fn run_worker( sqs_client, contacts_ingress, gmail_client, + // Backfill is Gmail-only today; supply a disabled Outlook client so the + // shared context type is satisfied without requiring Outlook config here. + outlook_client: outlook_client::OutlookClient::new(String::new(), String::new()), auth_service_client, redis_client, notification_ingress_service, diff --git a/rust/cloud-storage/email_service/src/pubsub/context.rs b/rust/cloud-storage/email_service/src/pubsub/context.rs index 9562b5605c..2b42a0738a 100644 --- a/rust/cloud-storage/email_service/src/pubsub/context.rs +++ b/rust/cloud-storage/email_service/src/pubsub/context.rs @@ -11,6 +11,7 @@ use crm::outbound::companies_repo::CompaniesRepositoryImpl; use crm::outbound::unfurl_resolver::UnfurlCompanyMetadataResolver; use document_storage_service_client::DocumentStorageServiceClient; use gmail_client::GmailClient; +use outlook_client::OutlookClient; use notification::domain::service::SqsNotificationIngress; use notification::outbound::queue::SqsQueue; use sqlx::PgPool; @@ -61,6 +62,7 @@ pub struct PubSubContext { pub sqs_client: sqs_client::SQS, pub contacts_ingress: Arc>, pub gmail_client: GmailClient, + pub outlook_client: OutlookClient, pub auth_service_client: AuthServiceClient, pub redis_client: RedisClient, pub notification_ingress_service: Arc, diff --git a/rust/cloud-storage/email_service/src/pubsub/inbox_sync/operations/mod.rs b/rust/cloud-storage/email_service/src/pubsub/inbox_sync/operations/mod.rs index 694cca1d37..f3d9645fd9 100644 --- a/rust/cloud-storage/email_service/src/pubsub/inbox_sync/operations/mod.rs +++ b/rust/cloud-storage/email_service/src/pubsub/inbox_sync/operations/mod.rs @@ -1,5 +1,6 @@ pub(in crate::pubsub) mod delete_message; pub(in crate::pubsub) mod gmail_message; +pub(in crate::pubsub) mod outlook_notification; pub(in crate::pubsub) mod shared; pub(in crate::pubsub) mod update_labels; pub(in crate::pubsub) mod upsert_message; diff --git a/rust/cloud-storage/email_service/src/pubsub/inbox_sync/operations/outlook_notification.rs b/rust/cloud-storage/email_service/src/pubsub/inbox_sync/operations/outlook_notification.rs new file mode 100644 index 0000000000..16f09e3cb0 --- /dev/null +++ b/rust/cloud-storage/email_service/src/pubsub/inbox_sync/operations/outlook_notification.rs @@ -0,0 +1,137 @@ +//! Handles an Outlook change notification by running a Microsoft Graph delta +//! sync for the affected link. +//! +//! This is the Outlook analogue of +//! [`crate::pubsub::inbox_sync::operations::gmail_message`]: where the Gmail +//! handler walks the inbox *history* from a stored `historyId`, this handler +//! walks a *delta query* and reconciles the changed messages. +//! +//! Status: this performs the full read side of the sync — token fetch, folder → +//! system-label resolution, delta enumeration, and per-message fetch + convert +//! to the provider-agnostic [`Message`](models_email::service::message::Message). +//! Persisting the converted messages reuses the inbound upsert pipeline, which +//! is currently Gmail-specific; extracting a provider-agnostic upsert is the +//! next increment (see the TODO below). Likewise, the persisted delta link that +//! makes the sync incremental is part of that increment — until then we do a +//! bounded initial delta of the inbox each time, which is correct but not +//! minimal. + +use crate::pubsub::context::PubSubContext; +use models_email::gmail::inbox_sync::OutlookNotificationPayload; +use models_email::outlook::well_known_folder; +use models_email::service::link; +use models_email::service::pubsub::{DetailedError, FailureReason, ProcessingError}; +use std::collections::HashMap; + +#[tracing::instrument(skip(ctx))] +pub async fn outlook_notification( + ctx: &PubSubContext, + link: &link::Link, + payload: &OutlookNotificationPayload, +) -> Result<(), ProcessingError> { + let access_token = crate::util::outlook::auth::fetch_token_or_delete_on_revocation( + link, + &ctx.auth_service_client, + &ctx.sqs_client, + ) + .await + .map_err(|e| { + ProcessingError::NonRetryable(DetailedError { + reason: FailureReason::AccessTokenFetchFailed, + source: e.context("Failed to fetch Outlook access token"), + }) + })?; + + // Resolve folders so we can map each message's parent folder to a system + // label (INBOX / SENT / SPAM / TRASH / DRAFT). + let folders = ctx + .outlook_client + .list_folders(&access_token) + .await + .map_err(|e| { + ProcessingError::Retryable(DetailedError { + reason: FailureReason::GmailApiFailed, + source: anyhow::Error::new(e).context("Failed to list Outlook folders"), + }) + })?; + + let folder_labels: HashMap = folders + .iter() + .filter_map(|f| { + let label = f + .well_known_name + .as_deref() + .and_then(well_known_folder::to_system_label)?; + Some((f.id.clone(), label)) + }) + .collect(); + + let inbox_folder_id = folders + .iter() + .find(|f| { + f.well_known_name + .as_deref() + .is_some_and(|w| w.eq_ignore_ascii_case("inbox")) + }) + .map(|f| f.id.clone()) + .unwrap_or_else(|| "inbox".to_string()); + + // TODO(outlook): load the persisted `@odata.deltaLink` for this link/folder + // and call `delta_from_link` for a true incremental sync; persist the + // returned link afterwards. Until that store exists we run an initial delta. + let changes = ctx + .outlook_client + .initial_delta(&access_token, &inbox_folder_id) + .await + .map_err(|e| { + ProcessingError::Retryable(DetailedError { + reason: FailureReason::GmailApiFailed, + source: anyhow::Error::new(e).context("Failed to run Outlook delta sync"), + }) + })?; + + tracing::info!( + link_id = %link.id, + subscription_id = %payload.subscription_id, + to_upsert = changes.message_ids_to_upsert.len(), + to_delete = changes.message_ids_to_delete.len(), + "outlook delta sync computed changes" + ); + + // Fetch + convert each changed message. This exercises the full client → + // convert path; persistence is the next increment. + for message_id in &changes.message_ids_to_upsert { + let Some(resource) = ctx + .outlook_client + .get_message(&access_token, message_id) + .await + .map_err(|e| { + ProcessingError::Retryable(DetailedError { + reason: FailureReason::GmailApiFailed, + source: anyhow::Error::new(e).context("Failed to fetch Outlook message"), + }) + })? + else { + continue; + }; + + let folder_label = resource + .parent_folder_id + .as_deref() + .and_then(|fid| folder_labels.get(fid).copied()); + + let _message = + outlook_client::convert::map_message_resource_to_service(resource, link.id, folder_label); + + // TODO(outlook): persist `_message` via a provider-agnostic inbound + // upsert (extracted from `operations::upsert_message`), then notify + // search and run the CRM/notification fan-out as the Gmail path does. + tracing::debug!( + link_id = %link.id, + provider_message_id = %message_id, + "converted outlook message (persistence pending)" + ); + } + + Ok(()) +} diff --git a/rust/cloud-storage/email_service/src/pubsub/inbox_sync/process.rs b/rust/cloud-storage/email_service/src/pubsub/inbox_sync/process.rs index fdb14f6d4e..9ea695ab1e 100644 --- a/rust/cloud-storage/email_service/src/pubsub/inbox_sync/process.rs +++ b/rust/cloud-storage/email_service/src/pubsub/inbox_sync/process.rs @@ -2,6 +2,7 @@ use crate::pubsub::context::PubSubContext; use crate::pubsub::inbox_sync::error_handlers::prefix_error_source; use crate::pubsub::inbox_sync::operations::delete_message::delete_message; use crate::pubsub::inbox_sync::operations::gmail_message::gmail_message; +use crate::pubsub::inbox_sync::operations::outlook_notification::outlook_notification; use crate::pubsub::inbox_sync::operations::update_labels::update_labels; use crate::pubsub::inbox_sync::operations::upsert_message::upsert_message; use crate::util::redis::rate_limit::RateLimitArgs; @@ -88,6 +89,12 @@ async fn inner_process_message( .map_err(|e| prefix_error_source(e, "gmail_message"))?; tracing::debug!("Successfully processed gmail message operation"); } + InboxSyncOperation::OutlookNotification(payload) => { + outlook_notification(ctx, &link, payload) + .await + .map_err(|e| prefix_error_source(e, "outlook_notification"))?; + tracing::debug!("Successfully processed outlook notification operation"); + } InboxSyncOperation::UpsertMessage(payload) => { upsert_message(ctx, &link, payload) .await diff --git a/rust/cloud-storage/email_service/src/pubsub/inbox_sync/worker.rs b/rust/cloud-storage/email_service/src/pubsub/inbox_sync/worker.rs index f5b1564610..8cd282fd06 100644 --- a/rust/cloud-storage/email_service/src/pubsub/inbox_sync/worker.rs +++ b/rust/cloud-storage/email_service/src/pubsub/inbox_sync/worker.rs @@ -19,6 +19,7 @@ pub async fn run_worker( sqs_client: sqs_client::SQS, contacts_ingress: Arc>, gmail_client: gmail_client::GmailClient, + outlook_client: outlook_client::OutlookClient, auth_service_client: AuthServiceClient, redis_client: RedisClient, notification_ingress_service: Arc, @@ -36,6 +37,7 @@ pub async fn run_worker( sqs_client, contacts_ingress, gmail_client, + outlook_client, auth_service_client, redis_client, notification_ingress_service, diff --git a/rust/cloud-storage/email_service/src/util/mod.rs b/rust/cloud-storage/email_service/src/util/mod.rs index 3e3aff2840..e42a176f67 100644 --- a/rust/cloud-storage/email_service/src/util/mod.rs +++ b/rust/cloud-storage/email_service/src/util/mod.rs @@ -1,4 +1,5 @@ pub mod gmail; +pub mod outlook; pub mod process_pre_insert; pub mod redis; pub mod sync_contacts; diff --git a/rust/cloud-storage/email_service/src/util/outlook/auth.rs b/rust/cloud-storage/email_service/src/util/outlook/auth.rs new file mode 100644 index 0000000000..04af84389e --- /dev/null +++ b/rust/cloud-storage/email_service/src/util/outlook/auth.rs @@ -0,0 +1,71 @@ +//! Outlook access-token retrieval, mirroring [`crate::util::gmail::auth`]. +//! +//! Tokens are minted by the authentication service from the user's linked +//! Microsoft refresh token (`/internal/microsoft_access_token`). When the auth +//! service reports the link was revoked (HTTP 403 / `invalid_grant`) we enqueue +//! a link deletion, exactly as the Gmail path does. + +use authentication_service_client::{AuthServiceClient, error::AuthServiceClientError}; +use models_email::email::service::link::Link; +use models_email::email::service::pubsub::{DeletionReason, LinkManagerMessage}; +use sqs_client::SQS; + +/// Fetches an Outlook access token for the link, triggering link deletion if the +/// user revoked Macro's access. Intended for pubsub handlers; API handlers can +/// call [`fetch_outlook_access_token_from_link`] directly. +#[tracing::instrument(skip(auth_service_client, sqs_client))] +pub async fn fetch_token_or_delete_on_revocation( + link: &Link, + auth_service_client: &AuthServiceClient, + sqs_client: &SQS, +) -> anyhow::Result { + match fetch_outlook_access_token_from_link(link, auth_service_client).await { + Ok(token) => Ok(token), + Err(e) if is_forbidden_error(&e) => { + tracing::warn!( + link_id = %link.id, + fusionauth_user_id = %link.fusionauth_user_id, + "User revoked access to Outlook - enqueueing link deletion" + ); + + sqs_client + .enqueue_link_manager_notification(LinkManagerMessage::DeleteLink { + link_id: link.id, + deletion_reason: DeletionReason::AccessRevoked, + }) + .await + .inspect_err(|e| { + tracing::error!(error=?e, link_id=%link.id, "Failed to enqueue link deletion after detecting revoked Outlook access"); + }) + .ok(); + + Err(e) + } + Err(e) => Err(e), + } +} + +/// Checks whether an error chain contains a Forbidden error from the auth service. +fn is_forbidden_error(e: &anyhow::Error) -> bool { + e.chain().any(|cause| { + cause + .downcast_ref::() + .map(|e| matches!(e, AuthServiceClientError::Forbidden)) + .unwrap_or(false) + }) +} + +/// Fetches an Outlook (Microsoft Graph) access token for the given link from the +/// authentication service. +/// +/// TODO(outlook): add Redis caching with a provider-scoped `TokenCacheKey`, to +/// reach parity with the Gmail token path (`email::outbound::fetch_gmail_access_token`). +pub async fn fetch_outlook_access_token_from_link( + link: &Link, + auth_service_client: &AuthServiceClient, +) -> anyhow::Result { + let token = auth_service_client + .get_microsoft_access_token(&link.fusionauth_user_id, link.email_address.0.as_ref()) + .await?; + Ok(token.access_token) +} diff --git a/rust/cloud-storage/email_service/src/util/outlook/mod.rs b/rust/cloud-storage/email_service/src/util/outlook/mod.rs new file mode 100644 index 0000000000..3675128368 --- /dev/null +++ b/rust/cloud-storage/email_service/src/util/outlook/mod.rs @@ -0,0 +1,3 @@ +//! Outlook (Microsoft Graph) helpers for the email service. + +pub mod auth; From 4916d2a1d2e2865da1f6c326628da7d22d29f138 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 17:31:58 +0000 Subject: [PATCH 4/5] feat(app): add "Add Outlook inbox" connect flow Mirror the Gmail multi-inbox connect flow for Outlook in the settings UI: - service-auth client: initOutlookLink() hits POST /link/outlook and returns the Microsoft OAuth authorization URL (response shape inlined to avoid depending on a regenerated schema). - queries/auth: useInitOutlookLink mutation, mirroring useInitGmailLink. - settings/Account: an "Add Outlook inbox" button next to the Gmail one that starts the link flow and redirects to the authorization URL; the existing inbox-link callback then provisions the inbox (provider-agnostic). Verified: app package typechecks (tsc, 0 errors) and Biome passes on the changed files. --- .../app/component/settings/Account.tsx | 28 +++++++++++++++++-- js/app/packages/queries/auth/index.ts | 1 + js/app/packages/queries/auth/outlook-link.ts | 16 +++++++++++ .../service-clients/service-auth/client.ts | 21 ++++++++++++++ 4 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 js/app/packages/queries/auth/outlook-link.ts diff --git a/js/app/packages/app/component/settings/Account.tsx b/js/app/packages/app/component/settings/Account.tsx index 32fd12aa15..27fcab322a 100644 --- a/js/app/packages/app/component/settings/Account.tsx +++ b/js/app/packages/app/component/settings/Account.tsx @@ -58,7 +58,7 @@ import PaywallTeamMemberView from '../paywall/PaywallTeamMemberView'; import PaywallTeamOwnerView from '../paywall/PaywallTeamOwnerView'; import { ROUTER_BASE_CONCAT } from '@app/constants/routerBase'; import { useEmailLinks, useEmailLinksStatus } from '@core/email-link'; -import { useInitGmailLink } from '@queries/auth'; +import { useInitGmailLink, useInitOutlookLink } from '@queries/auth'; import { useRemoveInboxMutation } from '@queries/email/link'; import { type SupportedNotificationSettings, @@ -358,6 +358,17 @@ export function Account() { } }; + const initOutlookLink = useInitOutlookLink(); + const handleAddOutlookInbox = async () => { + const callbackUrl = `${window.location.origin}${ROUTER_BASE_CONCAT}inbox-link-callback`; + const result = await initOutlookLink.mutateAsync(callbackUrl); + if (result.isOk()) { + window.location.href = result.value.authorization_url; + } else { + toast.failure('Failed to start Outlook link flow'); + } + }; + const handleResyncInbox = async (linkId: string) => { setResyncingIds((prev) => new Set(prev).add(linkId)); await resyncInbox(linkId).match( @@ -653,17 +664,28 @@ export function Account() { } > - + + + + diff --git a/js/app/packages/queries/auth/index.ts b/js/app/packages/queries/auth/index.ts index 69aa772879..b852789f1e 100644 --- a/js/app/packages/queries/auth/index.ts +++ b/js/app/packages/queries/auth/index.ts @@ -8,6 +8,7 @@ export { useInitGmailLink } from './gmail-link'; export { authKeys } from './keys'; export { useSendMobileWelcomeEmail } from './mobile-welcome-email'; export {} from './mutations'; +export { useInitOutlookLink } from './outlook-link'; export type { UserInfoData } from './user-info'; export { normalizeUserNameQueryId, diff --git a/js/app/packages/queries/auth/outlook-link.ts b/js/app/packages/queries/auth/outlook-link.ts new file mode 100644 index 0000000000..0c1d16b118 --- /dev/null +++ b/js/app/packages/queries/auth/outlook-link.ts @@ -0,0 +1,16 @@ +import { authServiceClient } from '@service-auth/client'; +import { useMutation } from '@tanstack/solid-query'; + +/** + * Mutation that asks auth-service for the Microsoft OAuth authorization URL for + * adding an Outlook inbox to the already-authenticated user. Callers consume the + * `authorization_url` and navigate the browser to it. Mirrors + * {@link useInitGmailLink}. + */ +export function useInitOutlookLink() { + return useMutation(() => ({ + mutationFn: async (originalUrl: string) => { + return authServiceClient.initOutlookLink(originalUrl); + }, + })); +} diff --git a/js/app/packages/service-clients/service-auth/client.ts b/js/app/packages/service-clients/service-auth/client.ts index 31c554c640..5a09e2502c 100644 --- a/js/app/packages/service-clients/service-auth/client.ts +++ b/js/app/packages/service-clients/service-auth/client.ts @@ -572,6 +572,27 @@ export const authServiceClient = { ).map((result) => result); }, + /** + * Initializes an Outlook (Microsoft) account link for the already-authenticated + * user (multi-inbox flow). Mirrors {@link initGmailLink}: returns the OAuth + * authorization URL to redirect the browser to. After Microsoft consent, the + * user is redirected back to `originalUrl` with `?link_id=` appended; the + * frontend then calls `emailClient.init({ linkId })` to provision the inbox. + * + * The response shape matches `InitGmailLinkResponse`; it's inlined here so the + * client doesn't depend on a regenerated `InitOutlookLinkResponse` schema. + */ + async initOutlookLink(originalUrl?: string) { + const url = originalUrl + ? `${authHost}/link/outlook?original_url=${encodeURIComponent(originalUrl)}` + : `${authHost}/link/outlook`; + return ( + await fetchWithAuth<{ authorization_url: string; link_id: string }>(url, { + method: 'POST', + }) + ).map((result) => result); + }, + /** * Deletes a github link for a user * NOTE: this does not delete the github application from being installed on a teams repository From c02dad8e51a74dc62b857bc40c479d158a712b7d Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 17:33:52 +0000 Subject: [PATCH 5/5] chore(infra): wire optional Outlook env vars into email-service stack Add OUTLOOK_NOTIFICATION_URL and OUTLOOK_CLIENT_STATE to the email-service task environment via optional config (config.get, empty default) so deploys keep working before the Outlook integration is provisioned. The service treats empty values as "Outlook disabled". Provisioning follow-ups (separate, as discussed): the Azure app registration + FusionAuth Microsoft IdP, the MICROSOFT_CLIENT_ID / MICROSOFT_CLIENT_SECRET_KEY for the auth service (Doppler-managed there), and sourcing OUTLOOK_CLIENT_STATE from Secrets Manager. --- infra/stacks/email-service/index.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/infra/stacks/email-service/index.ts b/infra/stacks/email-service/index.ts index ddd8eed201..e226c05e7b 100644 --- a/infra/stacks/email-service/index.ts +++ b/infra/stacks/email-service/index.ts @@ -85,6 +85,15 @@ const GMAIL_GCP_QUEUE = aws.secretsmanager .getSecretVersionOutput({ secretId: gmailGcpQueue }) .apply((secret) => secret.secretString); +// Optional Outlook (Microsoft Graph) webhook configuration. Empty until the +// Outlook integration is provisioned; the email service treats empty values as +// "Outlook disabled" and still boots. When provisioning, point +// OUTLOOK_NOTIFICATION_URL at the public /outlook/webhook endpoint and source +// OUTLOOK_CLIENT_STATE from Secrets Manager (the Graph subscription clientState +// secret). +const OUTLOOK_NOTIFICATION_URL = config.get(`outlook_notification_url`) ?? ''; +const OUTLOOK_CLIENT_STATE = config.get(`outlook_client_state`) ?? ''; + const jwtSecretKeyArn: pulumi.Output = aws.secretsmanager .getSecretVersionOutput({ secretId: JWT_SECRET_KEY }) .apply((secret) => secret.arn); @@ -438,6 +447,14 @@ const containerEnvVars = [ name: 'GMAIL_GCP_QUEUE', value: pulumi.interpolate`${GMAIL_GCP_QUEUE}`, }, + { + name: 'OUTLOOK_NOTIFICATION_URL', + value: OUTLOOK_NOTIFICATION_URL, + }, + { + name: 'OUTLOOK_CLIENT_STATE', + value: OUTLOOK_CLIENT_STATE, + }, { name: 'NOTIFICATION_QUEUE', value: pulumi.interpolate`${notificationIngressQueueName}`,