Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ export * from './refreshEmailEventOneOfFive';
export * from './refreshEmailEventOneOfFiveEvent';
export * from './refreshEmailEventOneOfNine';
export * from './refreshEmailEventOneOfNineEvent';
export * from './refreshEmailEventOneOfOneone';
export * from './refreshEmailEventOneOfOneoneEvent';
export * from './refreshEmailEventOneOfSeven';
export * from './refreshEmailEventOneOfSevenEvent';
export * from './refreshEmailEventOneOfThree';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import type { RefreshEmailEventOneOf } from './refreshEmailEventOneOf';
import type { RefreshEmailEventOneOfFive } from './refreshEmailEventOneOfFive';
import type { RefreshEmailEventOneOfNine } from './refreshEmailEventOneOfNine';
import type { RefreshEmailEventOneOfOneone } from './refreshEmailEventOneOfOneone';
import type { RefreshEmailEventOneOfSeven } from './refreshEmailEventOneOfSeven';
import type { RefreshEmailEventOneOfThree } from './refreshEmailEventOneOfThree';

Expand All @@ -19,4 +20,5 @@ export type RefreshEmailEvent =
| RefreshEmailEventOneOfThree
| RefreshEmailEventOneOfFive
| RefreshEmailEventOneOfSeven
| RefreshEmailEventOneOfNine;
| RefreshEmailEventOneOfNine
| RefreshEmailEventOneOfOneone;
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Generated by orval v7.21.0 🍺
* Do not edit manually.
* email_service
* OpenAPI spec version: 0.1.0
*/
import type { RefreshEmailEventOneOfOneoneEvent } from './refreshEmailEventOneOfOneoneEvent';

/**
* The self-contact photo for `link_id` finished uploading to static
file storage, so the inbox's derived `photo_url` is now available.
*/
export type RefreshEmailEventOneOfOneone = {
event: RefreshEmailEventOneOfOneoneEvent;
link_id: string;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Generated by orval v7.21.0 🍺
* Do not edit manually.
* email_service
* OpenAPI spec version: 0.1.0
*/

export type RefreshEmailEventOneOfOneoneEvent =
(typeof RefreshEmailEventOneOfOneoneEvent)[keyof typeof RefreshEmailEventOneOfOneoneEvent];

// eslint-disable-next-line @typescript-eslint/no-redeclare
export const RefreshEmailEventOneOfOneoneEvent = {
photo_synced: 'photo_synced',
} as const;
15 changes: 15 additions & 0 deletions js/app/packages/service-clients/service-email/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -4657,6 +4657,21 @@
"format": "uuid"
}
}
},
{
"type": "object",
"description": "The self-contact photo for `link_id` finished uploading to static\nfile storage, so the inbox's derived `photo_url` is now available.",
"required": ["link_id", "event"],
"properties": {
"event": {
"type": "string",
"enum": ["photo_synced"]
},
"link_id": {
"type": "string",
"format": "uuid"
}
}
}
],
"description": "Payload for the `refresh_email` connection gateway event: identifies the\ninbox that changed and the kind of change."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,12 +509,14 @@ async fn main() -> anyhow::Result<()> {
for worker in sfs_uploader_workers {
let db_sfs_uploader = db.clone();
let sfs_client_sfs_uploader = sfs_client.clone();
let connection_gateway_client_sfs_uploader = connection_gateway_client.clone();
// upload user contact images to sfs from contact sync
tokio::spawn(async move {
email_service::pubsub::sfs_uploader::worker::run_worker(
worker,
db_sfs_uploader,
sfs_client_sfs_uploader,
connection_gateway_client_sfs_uploader,
)
.await;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use connection_gateway_client::client::ConnectionGatewayClient;
use sqlx::PgPool;
use static_file_service_client::StaticFileServiceClient;

Expand All @@ -6,4 +7,5 @@ pub struct SFSUploaderContext {
pub db: PgPool,
pub sfs_client: StaticFileServiceClient,
pub sqs_worker: sqs_worker::SQSWorker,
pub connection_gateway_client: ConnectionGatewayClient,
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::pubsub::sfs_uploader::context::SFSUploaderContext;
use crate::pubsub::util::cg_refresh_email;
use crate::util::process_pre_insert::sfs_map::fetch_and_upload_to_sfs;
use anyhow::{Context, anyhow};
use aws_sdk_sqs::types::Message;
use models_email::api::refresh::RefreshEmailEvent;
use models_email::service::pubsub::SFSUploaderMessage;
use sqs_worker::cleanup_message;
use std::collections::HashMap;
use uuid::Uuid;

// upload user photo_url to SFS and add url to database
pub async fn process_message(ctx: SFSUploaderContext, message: &Message) -> anyhow::Result<()> {
Expand Down Expand Up @@ -34,17 +37,53 @@ pub async fn process_message(ctx: SFSUploaderContext, message: &Message) -> anyh
// update contact's photo url to new SFS url and upsert entry in database
contact.sfs_photo_url = Some(sfs_url);

if let Err(err) =
email_db_client::contacts::upsert_sync::upsert_contacts(&ctx.db, &[contact]).await
{
tracing::error!(error = ?err, "Unable to upsert contact");
let link_id = contact.link_id;
let contact_email = contact.email_address.clone();

match email_db_client::contacts::upsert_sync::upsert_contacts(&ctx.db, &[contact]).await {
Ok(_) => notify_if_self_contact(&ctx, link_id, contact_email.as_deref()).await,
Err(err) => tracing::error!(error = ?err, "Unable to upsert contact"),
}

cleanup_message(&ctx.sqs_worker, message).await?;

Ok(())
}

/// Emit `PhotoSynced` only when the uploaded contact is the inbox's own
/// self-contact, i.e. its email matches the link's inbox address. The worker
/// also uploads correspondent and attachment images, which share no email with
/// the inbox and must not signal that the inbox's own photo changed.
async fn notify_if_self_contact(
ctx: &SFSUploaderContext,
link_id: Uuid,
contact_email: Option<&str>,
) {
let Some(contact_email) = contact_email else {
return;
};

let link = match email_db_client::links::get::fetch_link_by_id(&ctx.db, link_id).await {
Ok(Some(link)) => link,
Ok(None) => return,
Err(e) => {
tracing::error!(error = ?e, link_id = %link_id, "Failed to fetch link for photo sync");
return;
}
};

if !contact_email.eq_ignore_ascii_case(link.email_address.0.as_ref()) {
return;
}

cg_refresh_email(
&ctx.connection_gateway_client,
link.macro_id.as_ref(),
RefreshEmailEvent::PhotoSynced { link_id },
)
.await;
}

/// Deserializes the SQS message body into a SfsUploaderMessage struct.
#[tracing::instrument(skip(message))]
fn extract_sfs_upload_notification(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::pubsub::sfs_uploader::context::SFSUploaderContext;
use crate::pubsub::sfs_uploader::process;
use connection_gateway_client::client::ConnectionGatewayClient;
use futures::StreamExt;
use sqlx::PgPool;
use static_file_service_client::StaticFileServiceClient;
Expand All @@ -9,11 +10,13 @@ pub async fn run_worker(
worker: sqs_worker::SQSWorker,
db: PgPool,
sfs_client: StaticFileServiceClient,
connection_gateway_client: ConnectionGatewayClient,
) {
let ctx = SFSUploaderContext {
db,
sfs_client,
sqs_worker: worker.clone(),
connection_gateway_client,
};
loop {
let worker_result = tokio::spawn({
Expand Down
3 changes: 3 additions & 0 deletions rust/cloud-storage/models_email/src/email/api/refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ pub enum RefreshEmailEvent {
DeleteMessage { link_id: Uuid },
/// The inbox `link_id` was removed and its data torn down.
LinkRemoved { link_id: Uuid },
/// The self-contact photo for `link_id` finished uploading to static
/// file storage, so the inbox's derived `photo_url` is now available.
PhotoSynced { link_id: Uuid },
}
Loading