diff --git a/js/app/packages/service-clients/service-cognition/generated/client.ts b/js/app/packages/service-clients/service-cognition/generated/client.ts index e8b67cc0bd..6ae28060da 100644 --- a/js/app/packages/service-clients/service-cognition/generated/client.ts +++ b/js/app/packages/service-clients/service-cognition/generated/client.ts @@ -1362,10 +1362,10 @@ export const startMcpAuth = async ( }; /** - * Returns the current memory if one exists. If the memory is stale or missing, -a background generation is triggered and the endpoint returns the stale -memory (200) or 404 if none exists yet. - * @summary Get the authenticated user's latest memory. + * Returns whichever memories currently exist. If either memory is stale or +missing, a background generation for it is triggered and the endpoint +returns the stale values (200), or 404 if neither exists yet. + * @summary Get the authenticated user's latest personal and team memories. */ export type getMemoryHandlerResponse200 = { data: MemoryResponse; diff --git a/js/app/packages/service-clients/service-cognition/generated/schemas/index.ts b/js/app/packages/service-clients/service-cognition/generated/schemas/index.ts index d058d3e6c1..cd7a724d24 100644 --- a/js/app/packages/service-clients/service-cognition/generated/schemas/index.ts +++ b/js/app/packages/service-clients/service-cognition/generated/schemas/index.ts @@ -115,6 +115,8 @@ export * from './jwtPayload'; export * from './mcpAuthCallbackParams'; export * from './memoryErrorBody'; export * from './memoryResponse'; +export * from './memoryResponseMemory'; +export * from './memoryResponseTeamMemory'; export * from './messageWithAttachments'; export * from './newAttachment'; export * from './newChatMessage'; diff --git a/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponse.ts b/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponse.ts index 5d14e7deb7..e20d06de9f 100644 --- a/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponse.ts +++ b/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponse.ts @@ -4,11 +4,16 @@ * Document Cognition Service * OpenAPI spec version: 1.0.0 */ +import type { MemoryResponseMemory } from './memoryResponseMemory'; +import type { MemoryResponseTeamMemory } from './memoryResponseTeamMemory'; /** - * The user's latest memory. + * The user's latest memories. */ export interface MemoryResponse { - /** The generated memory text. */ - memory: string; + /** The user's personal memory, if one has been generated. */ + memory?: MemoryResponseMemory; + /** The latest memory of the user's team, if the user belongs to a team +and a team memory has been generated. */ + team_memory?: MemoryResponseTeamMemory; } diff --git a/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponseMemory.ts b/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponseMemory.ts new file mode 100644 index 0000000000..c29575d26f --- /dev/null +++ b/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponseMemory.ts @@ -0,0 +1,11 @@ +/** + * Generated by orval v7.21.0 🍺 + * Do not edit manually. + * Document Cognition Service + * OpenAPI spec version: 1.0.0 + */ + +/** + * The user's personal memory, if one has been generated. + */ +export type MemoryResponseMemory = string | null; diff --git a/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponseTeamMemory.ts b/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponseTeamMemory.ts new file mode 100644 index 0000000000..3cd5ebd122 --- /dev/null +++ b/js/app/packages/service-clients/service-cognition/generated/schemas/memoryResponseTeamMemory.ts @@ -0,0 +1,12 @@ +/** + * Generated by orval v7.21.0 🍺 + * Do not edit manually. + * Document Cognition Service + * OpenAPI spec version: 1.0.0 + */ + +/** + * The latest memory of the user's team, if the user belongs to a team +and a team memory has been generated. + */ +export type MemoryResponseTeamMemory = string | null; diff --git a/js/app/packages/service-clients/service-cognition/openapi.json b/js/app/packages/service-clients/service-cognition/openapi.json index e514f792a8..da5c9c3239 100644 --- a/js/app/packages/service-clients/service-cognition/openapi.json +++ b/js/app/packages/service-clients/service-cognition/openapi.json @@ -1166,12 +1166,12 @@ "/memory": { "get": { "tags": ["memory"], - "summary": "Get the authenticated user's latest memory.", - "description": "Returns the current memory if one exists. If the memory is stale or missing,\na background generation is triggered and the endpoint returns the stale\nmemory (200) or 404 if none exists yet.", + "summary": "Get the authenticated user's latest personal and team memories.", + "description": "Returns whichever memories currently exist. If either memory is stale or\nmissing, a background generation for it is triggered and the endpoint\nreturns the stale values (200), or 404 if neither exists yet.", "operationId": "get_memory_handler", "responses": { "200": { - "description": "Latest memory for the user", + "description": "Latest personal and team memories for the user", "content": { "application/json": { "schema": { @@ -1181,7 +1181,7 @@ } }, "404": { - "description": "No memory exists for this user yet (generation triggered)" + "description": "No memory exists for this user or their team yet (generation triggered)" }, "500": { "description": "Internal server error", @@ -2784,12 +2784,15 @@ }, "MemoryResponse": { "type": "object", - "description": "The user's latest memory.", - "required": ["memory"], + "description": "The user's latest memories.", "properties": { "memory": { - "type": "string", - "description": "The generated memory text." + "type": ["string", "null"], + "description": "The user's personal memory, if one has been generated." + }, + "team_memory": { + "type": ["string", "null"], + "description": "The latest memory of the user's team, if the user belongs to a team\nand a team memory has been generated." } } }, diff --git a/rust/cloud-storage/.sqlx/query-2bacd87def214ac03a40e3b5858a67bc5e7f96ca1d145e65f0c6e3f2787cf227.json b/rust/cloud-storage/.sqlx/query-2bacd87def214ac03a40e3b5858a67bc5e7f96ca1d145e65f0c6e3f2787cf227.json new file mode 100644 index 0000000000..6d5f0a65b9 --- /dev/null +++ b/rust/cloud-storage/.sqlx/query-2bacd87def214ac03a40e3b5858a67bc5e7f96ca1d145e65f0c6e3f2787cf227.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT memory, updated_at as \"updated_at!\"\n FROM memory\n WHERE team_id = $1\n ORDER BY updated_at DESC\n LIMIT 1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "memory", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "updated_at!", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "2bacd87def214ac03a40e3b5858a67bc5e7f96ca1d145e65f0c6e3f2787cf227" +} diff --git a/rust/cloud-storage/.sqlx/query-b101ce64403b1af489e1cafdd07cbfc71039effb4d2869606534765bce7b17fc.json b/rust/cloud-storage/.sqlx/query-5a427312ded69610ede4679d54cefaf14c6da39d8e7a2ade3b370343002be82c.json similarity index 63% rename from rust/cloud-storage/.sqlx/query-b101ce64403b1af489e1cafdd07cbfc71039effb4d2869606534765bce7b17fc.json rename to rust/cloud-storage/.sqlx/query-5a427312ded69610ede4679d54cefaf14c6da39d8e7a2ade3b370343002be82c.json index 3cf399f742..6a5026c87a 100644 --- a/rust/cloud-storage/.sqlx/query-b101ce64403b1af489e1cafdd07cbfc71039effb4d2869606534765bce7b17fc.json +++ b/rust/cloud-storage/.sqlx/query-5a427312ded69610ede4679d54cefaf14c6da39d8e7a2ade3b370343002be82c.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO memory (id, user_id, memory)\n VALUES ($1, $2, $3)\n ON CONFLICT (user_id) DO UPDATE\n SET memory = EXCLUDED.memory,\n updated_at = NOW()\n RETURNING id\n ", + "query": "\n INSERT INTO memory (id, user_id, memory)\n VALUES ($1, $2, $3)\n ON CONFLICT (user_id) WHERE user_id IS NOT NULL DO UPDATE\n SET memory = EXCLUDED.memory,\n updated_at = NOW()\n RETURNING id\n ", "describe": { "columns": [ { @@ -20,5 +20,5 @@ false ] }, - "hash": "b101ce64403b1af489e1cafdd07cbfc71039effb4d2869606534765bce7b17fc" + "hash": "5a427312ded69610ede4679d54cefaf14c6da39d8e7a2ade3b370343002be82c" } diff --git a/rust/cloud-storage/.sqlx/query-91bfe5ec5b5f8eadaa5a149987c59081daf7cd5fb66aa4d8299f39c96e402d80.json b/rust/cloud-storage/.sqlx/query-91bfe5ec5b5f8eadaa5a149987c59081daf7cd5fb66aa4d8299f39c96e402d80.json new file mode 100644 index 0000000000..b887c7eb0f --- /dev/null +++ b/rust/cloud-storage/.sqlx/query-91bfe5ec5b5f8eadaa5a149987c59081daf7cd5fb66aa4d8299f39c96e402d80.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT t.name,\n ARRAY_REMOVE(ARRAY_AGG(tu.user_id), NULL) as \"member_ids!\"\n FROM team t\n LEFT JOIN team_user tu ON tu.team_id = t.id\n WHERE t.id = $1\n GROUP BY t.id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "name", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "member_ids!", + "type_info": "TextArray" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + null + ] + }, + "hash": "91bfe5ec5b5f8eadaa5a149987c59081daf7cd5fb66aa4d8299f39c96e402d80" +} diff --git a/rust/cloud-storage/.sqlx/query-bdbe32af4a2c3138e6853ec1d541abbce728b93591e34fb09d9ce8bfa19729c1.json b/rust/cloud-storage/.sqlx/query-bdbe32af4a2c3138e6853ec1d541abbce728b93591e34fb09d9ce8bfa19729c1.json new file mode 100644 index 0000000000..09e06e9d1c --- /dev/null +++ b/rust/cloud-storage/.sqlx/query-bdbe32af4a2c3138e6853ec1d541abbce728b93591e34fb09d9ce8bfa19729c1.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO memory (id, team_id, memory)\n VALUES ($1, $2, $3)\n ON CONFLICT (team_id) WHERE team_id IS NOT NULL DO UPDATE\n SET memory = EXCLUDED.memory,\n updated_at = NOW()\n RETURNING id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Uuid", + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "bdbe32af4a2c3138e6853ec1d541abbce728b93591e34fb09d9ce8bfa19729c1" +} diff --git a/rust/cloud-storage/.sqlx/query-d932a832244139cad77a3dcfb0b11ff406ea348014419a970a7a4ea5b7a455b0.json b/rust/cloud-storage/.sqlx/query-d932a832244139cad77a3dcfb0b11ff406ea348014419a970a7a4ea5b7a455b0.json new file mode 100644 index 0000000000..94cf279a56 --- /dev/null +++ b/rust/cloud-storage/.sqlx/query-d932a832244139cad77a3dcfb0b11ff406ea348014419a970a7a4ea5b7a455b0.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT memory\n FROM memory\n WHERE id = $1 AND team_id = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "memory", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Uuid" + ] + }, + "nullable": [ + false + ] + }, + "hash": "d932a832244139cad77a3dcfb0b11ff406ea348014419a970a7a4ea5b7a455b0" +} diff --git a/rust/cloud-storage/.sqlx/query-ea3b56b003d40d52eb5fe2e247fa38e7cfbcff3def9db49e5d9bad83207a3a48.json b/rust/cloud-storage/.sqlx/query-ea3b56b003d40d52eb5fe2e247fa38e7cfbcff3def9db49e5d9bad83207a3a48.json new file mode 100644 index 0000000000..9bf47379af --- /dev/null +++ b/rust/cloud-storage/.sqlx/query-ea3b56b003d40d52eb5fe2e247fa38e7cfbcff3def9db49e5d9bad83207a3a48.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT pg_try_advisory_lock(hashtextextended('team_memory:' || $1, 0)) as \"locked!\"", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "locked!", + "type_info": "Bool" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + null + ] + }, + "hash": "ea3b56b003d40d52eb5fe2e247fa38e7cfbcff3def9db49e5d9bad83207a3a48" +} diff --git a/rust/cloud-storage/document_cognition_service/src/api/context/mod.rs b/rust/cloud-storage/document_cognition_service/src/api/context/mod.rs index d7b2cef35b..75dcd2a73a 100644 --- a/rust/cloud-storage/document_cognition_service/src/api/context/mod.rs +++ b/rust/cloud-storage/document_cognition_service/src/api/context/mod.rs @@ -51,8 +51,10 @@ mod test; pub use test::test_api_context; pub(crate) type NotificationIngressType = SqsNotificationIngress; -pub type DcsMemoryService = - memory::domain::service::MemoryServiceImpl; +pub type DcsMemoryService = memory::domain::service::MemoryServiceImpl< + memory::outbound::pg_memory_repo::PgMemoryRepo, + memory::outbound::pg_team_memory_repo::PgTeamMemoryRepo, +>; /// Concrete MCP router state for DCS. pub type DcsMcpRouterState = mcp_client::inbound::McpRouterState< diff --git a/rust/cloud-storage/document_cognition_service/src/api/context/test.rs b/rust/cloud-storage/document_cognition_service/src/api/context/test.rs index bae5de0480..e7c15063a4 100644 --- a/rust/cloud-storage/document_cognition_service/src/api/context/test.rs +++ b/rust/cloud-storage/document_cognition_service/src/api/context/test.rs @@ -335,9 +335,12 @@ pub async fn test_api_context(pool: sqlx::Pool) -> std::sync::Ar Arc::new(all_tools.prompt.to_string()); let memory_repo = memory::outbound::pg_memory_repo::PgMemoryRepo::new(pool.clone()); + let team_memory_repo = + memory::outbound::pg_team_memory_repo::PgTeamMemoryRepo::new(pool.clone()); let memory_service = Arc::new(memory::domain::service::MemoryServiceImpl::new( pool.clone(), memory_repo, + team_memory_repo, tool_service_context.clone(), all_tools, )); diff --git a/rust/cloud-storage/document_cognition_service/src/api/stream/chat_message/mod.rs b/rust/cloud-storage/document_cognition_service/src/api/stream/chat_message/mod.rs index eca85e96df..10c7b6e64d 100644 --- a/rust/cloud-storage/document_cognition_service/src/api/stream/chat_message/mod.rs +++ b/rust/cloud-storage/document_cognition_service/src/api/stream/chat_message/mod.rs @@ -280,14 +280,14 @@ async fn send_chat_message_inner( .filter_map(|r| r.parts) .collect(); - // Fetch user memory (triggers background generation if stale/missing) - let user_memory = ctx + // Fetch the user's personal and team memories (triggers background + // generation of whichever is stale or missing) + let memories = ctx .memory_service .get_or_generate_memory((*user_id).clone()) .await - .inspect_err(|e| tracing::error!(error = ?e, "failed to fetch user memory")) - .ok() - .flatten(); + .inspect_err(|e| tracing::error!(error = ?e, "failed to fetch memories")) + .unwrap_or_default(); // Build the chat messages let tools_prompt = choose_tools_prompt(&payload, &*ctx.all_tools_prompt); @@ -321,11 +321,16 @@ async fn send_chat_message_inner( .as_deref() .unwrap_or_default(); let mut prompt = format!("{}\n{}", tools_prompt, additional); - if let Some(memory) = user_memory.as_deref() { + if let Some(memory) = memories.user.as_deref() { prompt.push_str("\n\n\n"); prompt.push_str(memory); prompt.push_str("\n"); } + if let Some(memory) = memories.team.as_deref() { + prompt.push_str("\n\n\n"); + prompt.push_str(memory); + prompt.push_str("\n"); + } prompt }; diff --git a/rust/cloud-storage/document_cognition_service/src/main.rs b/rust/cloud-storage/document_cognition_service/src/main.rs index ce2eabeaba..56b18e249d 100644 --- a/rust/cloud-storage/document_cognition_service/src/main.rs +++ b/rust/cloud-storage/document_cognition_service/src/main.rs @@ -391,11 +391,13 @@ async fn main() -> anyhow::Result<()> { let all_tools_prompt: Arc = Arc::new(all_tools.prompt.to_string()); - // Build memory service + // Build memory service (personal + team memory) let memory_repo = memory::outbound::pg_memory_repo::PgMemoryRepo::new(db.clone()); + let team_memory_repo = memory::outbound::pg_team_memory_repo::PgTeamMemoryRepo::new(db.clone()); let memory_service = Arc::new(memory::domain::service::MemoryServiceImpl::new( db.clone(), memory_repo, + team_memory_repo, tool_service_context.clone(), all_tools, )); diff --git a/rust/cloud-storage/macro_db_client/migrations/20260610212946_add_team_id_to_memory_table.sql b/rust/cloud-storage/macro_db_client/migrations/20260610212946_add_team_id_to_memory_table.sql new file mode 100644 index 0000000000..0f4b910966 --- /dev/null +++ b/rust/cloud-storage/macro_db_client/migrations/20260610212946_add_team_id_to_memory_table.sql @@ -0,0 +1,11 @@ +-- Team-level memory lives in the existing memory table: each row is scoped to +-- either a user (personal memory) or a team (team memory), exactly one of the two. +ALTER TABLE memory ALTER COLUMN user_id DROP NOT NULL; +ALTER TABLE memory ADD COLUMN team_id UUID REFERENCES team (id) ON DELETE CASCADE; +ALTER TABLE memory ADD CONSTRAINT memory_user_or_team CHECK (num_nonnulls(user_id, team_id) = 1); + +-- Replace the plain unique constraint with partial unique indexes so both +-- scopes keep one-row-per-owner upsert semantics. +ALTER TABLE memory DROP CONSTRAINT memory_user_id_unique; +CREATE UNIQUE INDEX memory_user_id_unique ON memory (user_id) WHERE user_id IS NOT NULL; +CREATE UNIQUE INDEX memory_team_id_unique ON memory (team_id) WHERE team_id IS NOT NULL; diff --git a/rust/cloud-storage/memory/src/domain/mod.rs b/rust/cloud-storage/memory/src/domain/mod.rs index fa3d23c5b1..833997d6c7 100644 --- a/rust/cloud-storage/memory/src/domain/mod.rs +++ b/rust/cloud-storage/memory/src/domain/mod.rs @@ -1,4 +1,7 @@ pub mod ports; pub mod service; -pub use ports::{Memory, MemoryError, MemoryRecord, MemoryRepo, MemoryService, Result}; +pub use ports::{ + Memories, Memory, MemoryError, MemoryRecord, MemoryRepo, MemoryService, Result, TeamMemoryRepo, + TeamOverview, +}; diff --git a/rust/cloud-storage/memory/src/domain/ports.rs b/rust/cloud-storage/memory/src/domain/ports.rs index 45f87ef05d..80149935f4 100644 --- a/rust/cloud-storage/memory/src/domain/ports.rs +++ b/rust/cloud-storage/memory/src/domain/ports.rs @@ -48,9 +48,63 @@ pub trait MemoryRepo: Send + Sync + 'static { ) -> impl Future> + Send; } +/// The memories available to a user: their own and their team's. +#[derive(Debug, Default)] +pub struct Memories { + /// The user's personal memory. + pub user: Option, + /// The latest memory of the team the user belongs to, if any. + pub team: Option, +} + pub trait MemoryService: Send + Sync + 'static { + /// Get the user's personal memory and the memory of their team, triggering + /// background regeneration of whichever is stale or missing. fn get_or_generate_memory( &self, user: MacroUserIdStr<'static>, - ) -> impl Future>> + Send; + ) -> impl Future> + Send; +} + +/// A snapshot of team data used to ground team memory generation. +#[derive(Debug)] +pub struct TeamOverview { + /// The team's display name. + pub name: String, + /// Macro user ids of the team's members. + pub member_ids: Vec, +} + +/// Persistence for team-scoped memory. +/// +/// Team memory lives in the same `memory` table as personal memory but is +/// keyed by `team_id` rather than `user_id`. +pub trait TeamMemoryRepo: Send + Sync + 'static { + /// Upsert the team's memory, returning the row id. + fn save_team_memory( + &self, + memory: &Memory, + team_id: Uuid, + ) -> impl Future> + Send; + /// Fetch the team's latest memory, or `None` if it has none yet. + fn get_latest_team_memory( + &self, + team_id: Uuid, + ) -> impl Future>> + Send; + /// Fetch a specific team memory row by id, scoped to the team. + fn get_team_memory_by_id( + &self, + team_id: Uuid, + id: Uuid, + ) -> impl Future> + Send; + /// Resolve the team the user belongs to, if any. + fn get_user_team_id( + &self, + user: MacroUserIdStr, + ) -> impl Future>> + Send; + /// Fetch the team's name and member list, or `None` if the team does not exist. + fn get_team_overview( + &self, + team_id: Uuid, + ) -> impl Future>> + Send; } diff --git a/rust/cloud-storage/memory/src/domain/service.rs b/rust/cloud-storage/memory/src/domain/service.rs index fac4a1cf2b..1c0bf7b519 100644 --- a/rust/cloud-storage/memory/src/domain/service.rs +++ b/rust/cloud-storage/memory/src/domain/service.rs @@ -1,3 +1,5 @@ +mod team; + use super::ports::*; use agent::types::{ChatMessage, ChatMessageContent, Role}; use agent::{AgentLoop, AgentModel, StreamPart}; @@ -67,23 +69,26 @@ struct MemoryJudgement { reason: String, } -pub struct MemoryServiceImpl { +pub struct MemoryServiceImpl { db: sqlx::PgPool, memory_repo: Rpo, + team_memory_repo: TRpo, tool_context: ToolServiceContext, tools: ToolSetWithPrompt, } -impl MemoryServiceImpl { +impl MemoryServiceImpl { pub fn new( db: sqlx::PgPool, memory_repo: Rpo, + team_memory_repo: TRpo, tool_context: ToolServiceContext, tools: ToolSetWithPrompt, ) -> Self { Self { db, memory_repo, + team_memory_repo, tool_context, tools, } @@ -93,14 +98,50 @@ impl MemoryServiceImpl { /// Default max age for memory freshness (1 day). const MAX_AGE: std::time::Duration = std::time::Duration::from_hours(24); -impl MemoryService for MemoryServiceImpl +impl MemoryService for MemoryServiceImpl where Rpo: MemoryRepo, + TRpo: TeamMemoryRepo, { #[tracing::instrument(skip(self), err)] async fn get_or_generate_memory( &self, user: macro_user_id::user_id::MacroUserIdStr<'static>, + ) -> super::Result { + let (user_memory, team_memory) = tokio::join!( + self.get_or_generate_user_memory(user.clone()), + self.get_or_generate_team_memory(user) + ); + + // Return whichever scope(s) resolved: a transient failure in one must + // not discard the other's result. Only surface an error if both failed. + match (user_memory, team_memory) { + (Ok(user), Ok(team)) => Ok(Memories { user, team }), + (Ok(user), Err(e)) => { + tracing::error!(error = ?e, "failed to load team memory"); + Ok(Memories { user, team: None }) + } + (Err(e), Ok(team)) => { + tracing::error!(error = ?e, "failed to load user memory"); + Ok(Memories { user: None, team }) + } + (Err(user_err), Err(team_err)) => { + tracing::error!(error = ?team_err, "failed to load team memory"); + Err(user_err) + } + } + } +} + +impl MemoryServiceImpl +where + Rpo: MemoryRepo, + TRpo: TeamMemoryRepo, +{ + #[tracing::instrument(skip(self), err)] + async fn get_or_generate_user_memory( + &self, + user: macro_user_id::user_id::MacroUserIdStr<'static>, ) -> super::Result> { let record = self.memory_repo.get_latest_memory(user.clone()).await?; @@ -122,8 +163,10 @@ where Box::new(self.tools.prompt.to_string()); tokio::spawn(async move { let repo = crate::outbound::pg_memory_repo::PgMemoryRepo::new(pool.clone()); + let team_repo = + crate::outbound::pg_team_memory_repo::PgTeamMemoryRepo::new(pool.clone()); let tools = ToolSetWithPrompt { toolset, prompt }; - let svc = MemoryServiceImpl::new(pool, repo, tool_context, tools); + let svc = MemoryServiceImpl::new(pool, repo, team_repo, tool_context, tools); match svc.generate_memory(user.clone(), previous_memory).await { Ok(_) => tracing::info!(%user, "memory generated"), Err(MemoryError::Rejected(reason)) => { @@ -136,12 +179,7 @@ where Ok(record.map(|r| r.memory)) } -} -impl MemoryServiceImpl -where - Rpo: MemoryRepo, -{ #[tracing::instrument(skip(self), err)] async fn generate_memory( &self, @@ -193,7 +231,7 @@ where } // 2nd pass: judge the memory quality - judge_memory(&memory).await?; + judge_memory(JUDGE_PROMPT, &memory).await?; self.memory_repo.save_memory(&memory, user).await?; Ok(memory) @@ -218,8 +256,8 @@ fn build_generation_system_prompt( prompt } -#[tracing::instrument(skip(memory), err)] -async fn judge_memory(memory: &str) -> super::Result<()> { +#[tracing::instrument(skip(judge_prompt, memory), err)] +pub(crate) async fn judge_memory(judge_prompt: &str, memory: &str) -> super::Result<()> { let user_message = format!( "Evaluate this memory and respond with ONLY a JSON object \ (no markdown, no code fences):\n\ @@ -227,7 +265,7 @@ async fn judge_memory(memory: &str) -> super::Result<()> { ---\n\n{memory}" ); - let response = agent::complete(JUDGE_MODEL, JUDGE_PROMPT, &user_message) + let response = agent::complete(JUDGE_MODEL, judge_prompt, &user_message) .await .map_err(|e| anyhow::anyhow!(e))?; diff --git a/rust/cloud-storage/memory/src/domain/service/team.rs b/rust/cloud-storage/memory/src/domain/service/team.rs new file mode 100644 index 0000000000..baf5505c3f --- /dev/null +++ b/rust/cloud-storage/memory/src/domain/service/team.rs @@ -0,0 +1,282 @@ +#[cfg(test)] +mod test; + +use super::{GENERATION_MODEL, MAX_AGE, MemoryServiceImpl, judge_memory}; +use crate::domain::ports::*; +use agent::types::{ChatMessage, ChatMessageContent, Role}; +use agent::{AgentLoop, StreamPart}; +use ai_tools::ToolSetWithPrompt; +use chrono::Utc; +use futures::stream::StreamExt; +use macro_env::Environment; +use macro_user_id::user_id::MacroUserIdStr; +use macro_uuid::Uuid; +use std::sync::Arc; + +static GENERATE_TEAM_MEMORY_PROMPT: &str = "\ +Use tool calls to research the team identified in the system prompt: what the \ +team does, who is on it, and what it is working on. Look at shared projects, \ +documents, channels, and emails, and search for content created by team members. + +Then generate a ~1000-3000 word memory about the team that will be prepended to \ +future prompts for every member of the team. Focus on: +- What the team/company does and who its customers are +- Team members, their roles, and areas of ownership +- Current projects, priorities, and deadlines +- Shared domain knowledge, terminology, and conventions +- Recurring processes and how the team communicates and works together + +Only include team-level context that is useful to every member of the team. \ +Do not include personal details about individual members beyond their role on \ +the team. + +If a previous team memory is provided in the system prompt, use it as the \ +baseline for the new memory. Preserve still-accurate durable facts, verify and \ +update it with fresh tool research, add important new context, and remove \ +obsolete or unsupported details. + +Don't include things that would make sense to find via tool search at runtime. \ +Focus on context that is useful as permanent background knowledge. + +CRITICAL: Your response must contain ONLY the memory text. \ +No preamble, no postscript, no commentary, no \"Let me...\", no \"Here is...\". \ +Do not narrate your research process. Do not address the user. \ +Just output the raw memory text starting with the first substantive line."; + +static TEAM_JUDGE_PROMPT: &str = "\ +You are a strict quality judge for AI-generated team memory profiles. + +A \"team memory\" is a ~1000-3000 word summary of a team prepended to future AI \ +prompts for every member of the team. A good team memory is built from rich data: \ +documents the team wrote, projects it manages, emails and channel messages between \ +members, and search results showing the team's work. + +REJECT if ANY of the following are true: +- The memory is based on insufficient data (e.g. only a handful of chat titles, \ + no documents, no projects, no emails). A memory built from a nearly empty \ + workspace is useless speculation. +- It is mostly guesswork or hedged inferences (\"likely\", \"suggests\", \"may\") \ + rather than concrete facts derived from actual content. +- It is under ~500 words of substantive content. +- It lacks specific details about the team's actual work, projects, customers, \ + or processes. +- It is a profile of a single member rather than the team as a whole. +- It contains narration about the research process (\"I found...\", \"The workspace has...\"). + +ACCEPT only if the memory contains concrete, specific, actionable context derived \ +from substantial workspace data (documents, code, projects, emails, messages) that \ +would meaningfully improve future AI interactions for every member of the team."; + +impl MemoryServiceImpl +where + Rpo: MemoryRepo, + TRpo: TeamMemoryRepo, +{ + /// Get the latest memory for the user's team, if the user belongs to one, + /// triggering background regeneration when it is stale or missing. + #[tracing::instrument(skip(self), err)] + pub(super) async fn get_or_generate_team_memory( + &self, + user: MacroUserIdStr<'static>, + ) -> crate::domain::Result> { + let Some(team_id) = self.team_memory_repo.get_user_team_id(user.clone()).await? else { + return Ok(None); + }; + + let record = self + .team_memory_repo + .get_latest_team_memory(team_id) + .await?; + + let needs_generation = match &record { + Some(r) => { + let age = Utc::now() - r.updated_at; + age > chrono::Duration::from_std(MAX_AGE).unwrap_or(chrono::TimeDelta::MAX) + } + None => true, + }; + + let env = Environment::new_or_prod(); + if needs_generation && !matches!(env, Environment::Local) { + let pool = self.db.clone(); + let tool_context = self.tool_context.clone(); + let toolset = self.tools.toolset.clone(); + let prompt: Box = + Box::new(self.tools.prompt.to_string()); + tokio::spawn(async move { + // Unlike personal memory, every member of a (possibly large) + // team can trigger a regeneration while one is already in + // flight, so generations are serialized cross-instance via a + // Postgres advisory lock. + let _lock = match try_acquire_generation_lock(&pool, team_id).await { + Ok(Some(lock)) => lock, + Ok(None) => { + tracing::debug!(%team_id, "team memory generation already in progress"); + return; + } + Err(e) => { + tracing::error!(error = ?e, %team_id, "failed to acquire team memory generation lock"); + return; + } + }; + let repo = crate::outbound::pg_memory_repo::PgMemoryRepo::new(pool.clone()); + let team_repo = + crate::outbound::pg_team_memory_repo::PgTeamMemoryRepo::new(pool.clone()); + + // Re-check under the lock: another instance may have refreshed + // the row between our staleness check and acquiring the lock, so + // the lock deduplicates rather than merely serializing. Also read + // `previous_memory` here so we diff against the freshest baseline. + let latest = match team_repo.get_latest_team_memory(team_id).await { + Ok(latest) => latest, + Err(e) => { + tracing::error!(error = ?e, %team_id, "failed to reload latest team memory"); + return; + } + }; + let still_needs_generation = match &latest { + Some(r) => { + let age = Utc::now() - r.updated_at; + age > chrono::Duration::from_std(MAX_AGE).unwrap_or(chrono::TimeDelta::MAX) + } + None => true, + }; + if !still_needs_generation { + tracing::debug!(%team_id, "team memory already refreshed; skipping"); + return; + } + let previous_memory = latest.map(|r| r.memory); + + let tools = ToolSetWithPrompt { toolset, prompt }; + let svc = MemoryServiceImpl::new(pool, repo, team_repo, tool_context, tools); + match svc + .generate_team_memory(team_id, user.clone(), previous_memory) + .await + { + Ok(_) => tracing::info!(%team_id, "team memory generated"), + Err(MemoryError::Rejected(reason)) => { + tracing::warn!(%team_id, %reason, "team memory rejected by judge") + } + Err(e) => { + tracing::error!(%team_id, error = ?e, "team memory generation failed") + } + } + }); + } + + Ok(record.map(|r| r.memory)) + } + + /// Generate a fresh team memory by researching the team's workspace with + /// the access of the `user` who triggered the refresh. + #[tracing::instrument(skip(self), err)] + async fn generate_team_memory( + &self, + team_id: Uuid, + user: MacroUserIdStr<'static>, + previous_memory: Option, + ) -> crate::domain::Result { + let overview = self + .team_memory_repo + .get_team_overview(team_id) + .await? + .ok_or_else(|| anyhow::anyhow!("team {team_id} does not exist"))?; + + let system_prompt = build_team_generation_system_prompt( + &self.tools.prompt, + &user, + team_id, + &overview, + &Utc::now().to_rfc2822(), + previous_memory.as_deref(), + ); + + let agent_loop = AgentLoop::new().with_model(GENERATION_MODEL); + let toolset: Arc + Send + Sync> = + self.tools.toolset.clone() as _; + let mut session = agent_loop + .session( + toolset, + Arc::new(self.tool_context.clone()), + &system_prompt, + user.clone(), + ) + .await; + + let user_msg = ChatMessage { + content: ChatMessageContent::Text(GENERATE_TEAM_MEMORY_PROMPT.to_string()), + role: Role::User, + attachments: None, + }; + let rig_messages = agent::to_rig_messages(&[user_msg]); + + let mut content = String::new(); + { + let mut stream = session.send_message(rig_messages).await?; + + while let Some(next) = stream.next().await { + let part = next?; + if let StreamPart::Content(text) = part { + content.push_str(&text); + } + } + } + + let memory = content.trim().to_string(); + if memory.is_empty() { + return Err(MemoryError::NoGeneration); + } + + // 2nd pass: judge the memory quality + judge_memory(TEAM_JUDGE_PROMPT, &memory).await?; + + self.team_memory_repo + .save_team_memory(&memory, team_id) + .await?; + Ok(memory) + } +} + +/// Try to take the cross-instance generation lock for a team. +/// +/// Returns a connection holding a Postgres advisory lock, or `None` when +/// another generation for the same team already holds it. The connection is +/// detached from the pool so that dropping it closes the session, which +/// releases the lock even if the generation task panics. +async fn try_acquire_generation_lock( + pool: &sqlx::PgPool, + team_id: Uuid, +) -> crate::domain::Result> { + let mut conn = pool.acquire().await?.detach(); + let locked = sqlx::query_scalar!( + r#"SELECT pg_try_advisory_lock(hashtextextended('team_memory:' || $1, 0)) as "locked!""#, + team_id.to_string() + ) + .fetch_one(&mut conn) + .await?; + + Ok(locked.then_some(conn)) +} + +fn build_team_generation_system_prompt( + base_prompt: impl std::fmt::Display, + user: &MacroUserIdStr<'_>, + team_id: Uuid, + overview: &TeamOverview, + datetime: &str, + previous_memory: Option<&str>, +) -> String { + let members = overview.member_ids.join(", "); + let mut prompt = format!( + "{base_prompt}\n{user:?}\n{team_id}\n{name}\n{members}\n{datetime}", + name = overview.name, + ); + + if let Some(memory) = previous_memory { + prompt.push_str("\n\n"); + prompt.push_str(memory); + prompt.push_str("\n"); + } + + prompt +} diff --git a/rust/cloud-storage/memory/src/domain/service/team/test.rs b/rust/cloud-storage/memory/src/domain/service/team/test.rs new file mode 100644 index 0000000000..402ba836bd --- /dev/null +++ b/rust/cloud-storage/memory/src/domain/service/team/test.rs @@ -0,0 +1,72 @@ +use super::*; + +fn user_id(value: &str) -> MacroUserIdStr<'static> { + MacroUserIdStr::try_from(value.to_string()).expect("valid macro user id") +} + +fn overview() -> TeamOverview { + TeamOverview { + name: "Acme Engineering".to_string(), + member_ids: vec![ + "macro|alice@acme.com".to_string(), + "macro|bob@acme.com".to_string(), + ], + } +} + +#[test] +fn team_generation_system_prompt_includes_team_context() { + let user = user_id("macro|memory-test@example.com"); + let team_id = macro_uuid::generate_uuid_v7(); + let prompt = build_team_generation_system_prompt( + "base tools prompt", + &user, + team_id, + &overview(), + "Mon, 08 Jun 2026 12:00:00 +0000", + None, + ); + + assert!(prompt.contains("base tools prompt")); + assert!(prompt.contains("macro|memory-test@example.com")); + assert!(prompt.contains(&format!("{team_id}"))); + assert!(prompt.contains("Acme Engineering")); + assert!( + prompt.contains("macro|alice@acme.com, macro|bob@acme.com") + ); + assert!(prompt.contains("Mon, 08 Jun 2026 12:00:00 +0000")); +} + +#[test] +fn team_generation_system_prompt_includes_previous_memory_when_present() { + let user = user_id("macro|memory-test@example.com"); + let prompt = build_team_generation_system_prompt( + "base tools prompt", + &user, + macro_uuid::generate_uuid_v7(), + &overview(), + "Mon, 08 Jun 2026 12:00:00 +0000", + Some("previous durable team facts"), + ); + + assert!( + prompt.contains( + "\nprevious durable team facts\n" + ) + ); +} + +#[test] +fn team_generation_system_prompt_omits_previous_memory_when_absent() { + let user = user_id("macro|memory-test@example.com"); + let prompt = build_team_generation_system_prompt( + "base tools prompt", + &user, + macro_uuid::generate_uuid_v7(), + &overview(), + "Mon, 08 Jun 2026 12:00:00 +0000", + None, + ); + + assert!(!prompt.contains("")); +} diff --git a/rust/cloud-storage/memory/src/inbound/axum_router.rs b/rust/cloud-storage/memory/src/inbound/axum_router.rs index 50398e9d26..2527ff48e1 100644 --- a/rust/cloud-storage/memory/src/inbound/axum_router.rs +++ b/rust/cloud-storage/memory/src/inbound/axum_router.rs @@ -11,11 +11,14 @@ use serde::Serialize; use std::sync::Arc; use utoipa::ToSchema; -/// The user's latest memory. +/// The user's latest memories. #[derive(Serialize, ToSchema)] pub struct MemoryResponse { - /// The generated memory text. - pub memory: String, + /// The user's personal memory, if one has been generated. + pub memory: Option, + /// The latest memory of the user's team, if the user belongs to a team + /// and a team memory has been generated. + pub team_memory: Option, } #[derive(Serialize, ToSchema)] @@ -34,17 +37,17 @@ where .with_state(service) } -/// Get the authenticated user's latest memory. +/// Get the authenticated user's latest personal and team memories. /// -/// Returns the current memory if one exists. If the memory is stale or missing, -/// a background generation is triggered and the endpoint returns the stale -/// memory (200) or 404 if none exists yet. +/// Returns whichever memories currently exist. If either memory is stale or +/// missing, a background generation for it is triggered and the endpoint +/// returns the stale values (200), or 404 if neither exists yet. #[utoipa::path( get, path = "/memory", responses( - (status = 200, description = "Latest memory for the user", body = MemoryResponse), - (status = 404, description = "No memory exists for this user yet (generation triggered)"), + (status = 200, description = "Latest personal and team memories for the user", body = MemoryResponse), + (status = 404, description = "No memory exists for this user or their team yet (generation triggered)"), (status = 500, description = "Internal server error", body = MemoryErrorBody), ), tag = "memory" @@ -55,8 +58,14 @@ pub async fn get_memory_handler( user: MacroUserExtractor, ) -> Response { match service.get_or_generate_memory(user.macro_user_id).await { - Ok(Some(memory)) => Json(MemoryResponse { memory }).into_response(), - Ok(None) => StatusCode::NOT_FOUND.into_response(), + Ok(memories) if memories.user.is_none() && memories.team.is_none() => { + StatusCode::NOT_FOUND.into_response() + } + Ok(memories) => Json(MemoryResponse { + memory: memories.user, + team_memory: memories.team, + }) + .into_response(), Err(e) => { tracing::error!(error = ?e, "failed to get memory"); ( diff --git a/rust/cloud-storage/memory/src/main.rs b/rust/cloud-storage/memory/src/main.rs index d8f4a7854f..312719987b 100644 --- a/rust/cloud-storage/memory/src/main.rs +++ b/rust/cloud-storage/memory/src/main.rs @@ -6,6 +6,7 @@ use macro_user_id::user_id::MacroUserIdStr; use memory::config::Config; use memory::domain::{MemoryService, service::MemoryServiceImpl}; use memory::outbound::pg_memory_repo::PgMemoryRepo; +use memory::outbound::pg_team_memory_repo::PgTeamMemoryRepo; use sqlx::postgres::PgPoolOptions; #[tokio::main] @@ -22,16 +23,23 @@ async fn main() -> anyhow::Result<()> { let tool_context = build_tool_service_context_from_env(pool.clone()).await?; let tools = all_tools(); let memory_repo = PgMemoryRepo::new(pool.clone()); - let memory_service = MemoryServiceImpl::new(pool, memory_repo, tool_context, tools); + let team_memory_repo = PgTeamMemoryRepo::new(pool.clone()); + let memory_service = + MemoryServiceImpl::new(pool, memory_repo, team_memory_repo, tool_context, tools); let user = MacroUserIdStr::try_from(config.user_id.clone()) .context("USER_ID must be a valid Macro user id")?; tracing::info!("Generating memory for {user}..."); - match memory_service.get_or_generate_memory(user).await? { + let memories = memory_service.get_or_generate_memory(user).await?; + match memories.user { Some(memory) => println!("{memory}"), None => println!("No memory yet, generation triggered in background"), } + match memories.team { + Some(memory) => println!("\n\n{memory}\n"), + None => println!("No team memory yet (no team, or generation triggered in background)"), + } Ok(()) } diff --git a/rust/cloud-storage/memory/src/outbound.rs b/rust/cloud-storage/memory/src/outbound.rs index 599b0bd1fd..44b5a28c41 100644 --- a/rust/cloud-storage/memory/src/outbound.rs +++ b/rust/cloud-storage/memory/src/outbound.rs @@ -1 +1,2 @@ pub mod pg_memory_repo; +pub mod pg_team_memory_repo; diff --git a/rust/cloud-storage/memory/src/outbound/pg_memory_repo.rs b/rust/cloud-storage/memory/src/outbound/pg_memory_repo.rs index a110f981b2..f87ed176f5 100644 --- a/rust/cloud-storage/memory/src/outbound/pg_memory_repo.rs +++ b/rust/cloud-storage/memory/src/outbound/pg_memory_repo.rs @@ -23,7 +23,7 @@ impl MemoryRepo for PgMemoryRepo { r#" INSERT INTO memory (id, user_id, memory) VALUES ($1, $2, $3) - ON CONFLICT (user_id) DO UPDATE + ON CONFLICT (user_id) WHERE user_id IS NOT NULL DO UPDATE SET memory = EXCLUDED.memory, updated_at = NOW() RETURNING id diff --git a/rust/cloud-storage/memory/src/outbound/pg_team_memory_repo.rs b/rust/cloud-storage/memory/src/outbound/pg_team_memory_repo.rs new file mode 100644 index 0000000000..2bfef6eee0 --- /dev/null +++ b/rust/cloud-storage/memory/src/outbound/pg_team_memory_repo.rs @@ -0,0 +1,124 @@ +#[cfg(test)] +mod test; + +use crate::domain::{ + Memory, Result, TeamMemoryRepo, + ports::{MemoryRecord, TeamOverview}, +}; +use macro_user_id::user_id::MacroUserIdStr; +use macro_uuid::Uuid; +use sqlx::PgPool; + +/// Postgres-backed [`TeamMemoryRepo`] over the shared `memory` table. +pub struct PgTeamMemoryRepo { + inner: PgPool, +} + +impl PgTeamMemoryRepo { + /// Create a new repository backed by the given pool. + pub fn new(inner: PgPool) -> Self { + PgTeamMemoryRepo { inner } + } +} + +impl TeamMemoryRepo for PgTeamMemoryRepo { + async fn save_team_memory(&self, memory: &Memory, team_id: Uuid) -> Result { + let id = macro_uuid::generate_uuid_v7(); + let row = sqlx::query!( + r#" + INSERT INTO memory (id, team_id, memory) + VALUES ($1, $2, $3) + ON CONFLICT (team_id) WHERE team_id IS NOT NULL DO UPDATE + SET memory = EXCLUDED.memory, + updated_at = NOW() + RETURNING id + "#, + id, + team_id, + memory, + ) + .fetch_one(&self.inner) + .await?; + + Ok(row.id) + } + + async fn get_latest_team_memory(&self, team_id: Uuid) -> Result> { + let row = sqlx::query!( + r#" + SELECT memory, updated_at as "updated_at!" + FROM memory + WHERE team_id = $1 + ORDER BY updated_at DESC + LIMIT 1 + "#, + team_id, + ) + .fetch_optional(&self.inner) + .await?; + + Ok(row.map(|r| MemoryRecord { + memory: r.memory, + updated_at: r.updated_at, + })) + } + + async fn get_team_memory_by_id(&self, team_id: Uuid, id: Uuid) -> Result { + let row = sqlx::query!( + r#" + SELECT memory + FROM memory + WHERE id = $1 AND team_id = $2 + "#, + id, + team_id, + ) + .fetch_optional(&self.inner) + .await? + .ok_or(crate::domain::MemoryError::NoGeneration)?; + + Ok(row.memory) + } + + async fn get_user_team_id(&self, user: MacroUserIdStr<'_>) -> Result> { + // Users are expected to belong to at most one team. Mirror + // entity_access::get_user_team: if `team_user` defensively returns + // multiple rows, the team where the user holds the strongest role + // wins (Postgres orders the enum `member < admin < owner`). + let row = sqlx::query_scalar!( + r#" + SELECT team_id + FROM team_user + WHERE user_id = $1 + ORDER BY team_role DESC + LIMIT 1 + "#, + user.as_ref(), + ) + .fetch_optional(&self.inner) + .await?; + + Ok(row) + } + + async fn get_team_overview(&self, team_id: Uuid) -> Result> { + let row = sqlx::query!( + r#" + SELECT t.name, + ARRAY_REMOVE(ARRAY_AGG(tu.user_id), NULL) as "member_ids!" + FROM team t + LEFT JOIN team_user tu ON tu.team_id = t.id + WHERE t.id = $1 + GROUP BY t.id + "#, + team_id, + ) + .fetch_optional(&self.inner) + .await?; + + Ok(row.map(|r| TeamOverview { + name: r.name, + member_ids: r.member_ids, + })) + } +} diff --git a/rust/cloud-storage/memory/src/outbound/pg_team_memory_repo/test.rs b/rust/cloud-storage/memory/src/outbound/pg_team_memory_repo/test.rs new file mode 100644 index 0000000000..8eab16634f --- /dev/null +++ b/rust/cloud-storage/memory/src/outbound/pg_team_memory_repo/test.rs @@ -0,0 +1,254 @@ +use super::PgTeamMemoryRepo; +use crate::domain::{MemoryError, MemoryRepo, TeamMemoryRepo}; +use crate::outbound::pg_memory_repo::PgMemoryRepo; +use macro_db_migrator::MACRO_DB_MIGRATIONS; +use macro_user_id::user_id::MacroUserIdStr; +use macro_uuid::Uuid; +use sqlx::{Pool, Postgres}; + +/// Insert the `macro_user` + `"User"` rows a `macro|` user id depends on. +async fn create_user(pool: &Pool, user_id: &str) { + let macro_user_id = macro_uuid::generate_uuid_v7(); + let email = user_id.strip_prefix("macro|").unwrap_or(user_id); + sqlx::query!( + "INSERT INTO macro_user (id, username, email, stripe_customer_id) VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING", + macro_user_id, + email, + email, + format!("cus_{email}"), + ) + .execute(pool) + .await + .unwrap(); + sqlx::query!( + r#"INSERT INTO "User" (id, email, macro_user_id) VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING"#, + user_id, + email, + macro_user_id, + ) + .execute(pool) + .await + .unwrap(); +} + +async fn create_team(pool: &Pool, name: &str, owner_id: &str) -> Uuid { + create_user(pool, owner_id).await; + let team_id = macro_uuid::generate_uuid_v7(); + sqlx::query!( + "INSERT INTO team (id, name, owner_id) VALUES ($1, $2, $3)", + team_id, + name, + owner_id, + ) + .execute(pool) + .await + .unwrap(); + team_id +} + +async fn add_member(pool: &Pool, team_id: Uuid, user_id: &str, role: &str) { + create_user(pool, user_id).await; + sqlx::query!( + "INSERT INTO team_user (user_id, team_id, team_role) VALUES ($1, $2, ($3::text)::team_role)", + user_id, + team_id, + role, + ) + .execute(pool) + .await + .unwrap(); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn save_and_get_by_id(pool: Pool) { + let team_id = create_team(&pool, "acme", "macro|owner@example.com").await; + let repo = PgTeamMemoryRepo::new(pool); + let memory_text = "Team builds cloud infra for enterprise customers".to_string(); + + let id = repo.save_team_memory(&memory_text, team_id).await.unwrap(); + let fetched = repo.get_team_memory_by_id(team_id, id).await.unwrap(); + + assert_eq!(fetched, memory_text); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn get_latest_returns_most_recent(pool: Pool) { + let team_id = create_team(&pool, "acme", "macro|owner@example.com").await; + let repo = PgTeamMemoryRepo::new(pool); + + repo.save_team_memory(&"first memory".to_string(), team_id) + .await + .unwrap(); + repo.save_team_memory(&"second memory".to_string(), team_id) + .await + .unwrap(); + + let record = repo.get_latest_team_memory(team_id).await.unwrap().unwrap(); + assert_eq!(record.memory, "second memory"); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn get_latest_no_memories_returns_none(pool: Pool) { + let team_id = create_team(&pool, "acme", "macro|owner@example.com").await; + let repo = PgTeamMemoryRepo::new(pool); + + let result = repo.get_latest_team_memory(team_id).await.unwrap(); + assert!(result.is_none()); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn get_by_id_wrong_team_returns_error(pool: Pool) { + let team_a = create_team(&pool, "team-a", "macro|owner-a@example.com").await; + let team_b = create_team(&pool, "team-b", "macro|owner-b@example.com").await; + let repo = PgTeamMemoryRepo::new(pool); + + let id = repo + .save_team_memory(&"team a memory".to_string(), team_a) + .await + .unwrap(); + + let result = repo.get_team_memory_by_id(team_b, id).await; + assert!(matches!(result, Err(MemoryError::NoGeneration))); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn memories_are_scoped_to_team(pool: Pool) { + let team_a = create_team(&pool, "team-a", "macro|owner-a@example.com").await; + let team_b = create_team(&pool, "team-b", "macro|owner-b@example.com").await; + let repo = PgTeamMemoryRepo::new(pool); + + repo.save_team_memory(&"team a memory".to_string(), team_a) + .await + .unwrap(); + repo.save_team_memory(&"team b memory".to_string(), team_b) + .await + .unwrap(); + + let latest_a = repo.get_latest_team_memory(team_a).await.unwrap().unwrap(); + let latest_b = repo.get_latest_team_memory(team_b).await.unwrap().unwrap(); + + assert_eq!(latest_a.memory, "team a memory"); + assert_eq!(latest_b.memory, "team b memory"); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn team_and_user_memories_coexist_in_shared_table(pool: Pool) { + let team_id = create_team(&pool, "acme", "macro|owner@example.com").await; + let team_repo = PgTeamMemoryRepo::new(pool.clone()); + let user_repo = PgMemoryRepo::new(pool); + let user = MacroUserIdStr::parse_from_str("macro|owner@example.com").unwrap(); + + user_repo + .save_memory(&"personal memory".to_string(), user.clone()) + .await + .unwrap(); + team_repo + .save_team_memory(&"team memory".to_string(), team_id) + .await + .unwrap(); + + // Upserts stay scoped: refreshing one must not clobber the other. + user_repo + .save_memory(&"fresh personal memory".to_string(), user.clone()) + .await + .unwrap(); + team_repo + .save_team_memory(&"fresh team memory".to_string(), team_id) + .await + .unwrap(); + + let personal = user_repo.get_latest_memory(user).await.unwrap().unwrap(); + let team = team_repo + .get_latest_team_memory(team_id) + .await + .unwrap() + .unwrap(); + assert_eq!(personal.memory, "fresh personal memory"); + assert_eq!(team.memory, "fresh team memory"); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn deleting_team_deletes_its_memory(pool: Pool) { + let team_id = create_team(&pool, "acme", "macro|owner@example.com").await; + let repo = PgTeamMemoryRepo::new(pool.clone()); + + repo.save_team_memory(&"a memory".to_string(), team_id) + .await + .unwrap(); + + sqlx::query!("DELETE FROM team WHERE id = $1", team_id) + .execute(&pool) + .await + .unwrap(); + + let result = repo.get_latest_team_memory(team_id).await.unwrap(); + assert!(result.is_none()); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn get_user_team_id_returns_membership(pool: Pool) { + let team_id = create_team(&pool, "acme", "macro|owner@example.com").await; + add_member(&pool, team_id, "macro|member@example.com", "member").await; + let repo = PgTeamMemoryRepo::new(pool); + + let user = MacroUserIdStr::parse_from_str("macro|member@example.com").unwrap(); + let result = repo.get_user_team_id(user).await.unwrap(); + + assert_eq!(result, Some(team_id)); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn get_user_team_id_returns_none_without_membership(pool: Pool) { + let repo = PgTeamMemoryRepo::new(pool); + + let user = MacroUserIdStr::parse_from_str("macro|loner@example.com").unwrap(); + let result = repo.get_user_team_id(user).await.unwrap(); + + assert_eq!(result, None); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn get_team_overview_returns_name_and_members(pool: Pool) { + let team_id = create_team(&pool, "acme", "macro|owner@example.com").await; + add_member(&pool, team_id, "macro|owner@example.com", "owner").await; + add_member(&pool, team_id, "macro|member@example.com", "member").await; + let repo = PgTeamMemoryRepo::new(pool); + + let overview = repo.get_team_overview(team_id).await.unwrap().unwrap(); + + assert_eq!(overview.name, "acme"); + let mut members = overview.member_ids; + members.sort(); + assert_eq!( + members, + vec![ + "macro|member@example.com".to_string(), + "macro|owner@example.com".to_string(), + ] + ); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn get_team_overview_with_no_members_returns_empty_list(pool: Pool) { + let team_id = create_team(&pool, "acme", "macro|owner@example.com").await; + let repo = PgTeamMemoryRepo::new(pool); + + let overview = repo.get_team_overview(team_id).await.unwrap().unwrap(); + + assert_eq!(overview.name, "acme"); + assert!(overview.member_ids.is_empty()); +} + +#[sqlx::test(migrator = "MACRO_DB_MIGRATIONS")] +async fn get_team_overview_missing_team_returns_none(pool: Pool) { + let repo = PgTeamMemoryRepo::new(pool); + + let overview = repo + .get_team_overview(macro_uuid::generate_uuid_v7()) + .await + .unwrap(); + + assert!(overview.is_none()); +} diff --git a/rust/cloud-storage/scheduled_action/src/outbound/inprocess_executor/agent_task.rs b/rust/cloud-storage/scheduled_action/src/outbound/inprocess_executor/agent_task.rs index 5e0a9730ef..eba2208da9 100644 --- a/rust/cloud-storage/scheduled_action/src/outbound/inprocess_executor/agent_task.rs +++ b/rust/cloud-storage/scheduled_action/src/outbound/inprocess_executor/agent_task.rs @@ -11,9 +11,10 @@ use chat::domain::ports::ChatRepo; use chat::outbound::postgres::PgChatRepo; use futures::StreamExt; use macro_db_client::dcs::create_chat_message::create_chat_message; -use memory::domain::MemoryService; use memory::domain::service::MemoryServiceImpl; +use memory::domain::{Memories, MemoryService}; use memory::outbound::pg_memory_repo::PgMemoryRepo; +use memory::outbound::pg_team_memory_repo::PgTeamMemoryRepo; use model::chat::NewChatMessage; use notification::domain::service::SqsNotificationIngress; use notification::outbound::queue::SqsQueue; @@ -56,11 +57,11 @@ pub async fn run_agent_task( Ok(()) } -async fn fetch_user_memory( +async fn fetch_memories( db: &PgPool, tool_context: &ToolServiceContext, owner: ¯o_user_id::user_id::MacroUserIdStr<'static>, -) -> Option { +) -> Memories { let tools = all_tools(); let tools = ToolSetWithPrompt { toolset: tools.toolset, @@ -69,14 +70,15 @@ async fn fetch_user_memory( let memory_service = MemoryServiceImpl::new( db.clone(), PgMemoryRepo::new(db.clone()), + PgTeamMemoryRepo::new(db.clone()), tool_context.clone(), tools, ); match memory_service.get_or_generate_memory(owner.clone()).await { - Ok(memory) => memory, + Ok(memories) => memories, Err(e) => { - tracing::warn!(error=?e, %owner, "failed to fetch user memory; running without it"); - None + tracing::warn!(error=?e, %owner, "failed to fetch memories; running without them"); + Memories::default() } } } @@ -119,14 +121,16 @@ async fn run_tool_loop( agent_task: &AgentTask, ) -> Result> { let tools = all_tools(); - let user_memory = fetch_user_memory(db, tool_context, &action.owner).await; - let system_prompt = match user_memory { - Some(memory) => format!( - "{}\n{}\n\n{}\n\n{}", - tools.prompt, SCHEDULED_AGENT_PROMPT, memory, agent_task.prompt - ), - None => format!("{}\n{}", tools.prompt, agent_task.prompt), - }; + let memories = fetch_memories(db, tool_context, &action.owner).await; + // The guardrail prompt must always be present, regardless of which memories exist. + let mut system_prompt = format!("{}\n{}", tools.prompt, SCHEDULED_AGENT_PROMPT); + if let Some(memory) = memories.user { + system_prompt.push_str(&format!("\n\n{memory}\n")); + } + if let Some(memory) = memories.team { + system_prompt.push_str(&format!("\n\n{memory}\n")); + } + system_prompt.push_str(&format!("\n{}", agent_task.prompt)); let toolset: Arc + Send + Sync> = tools.toolset; let agent_loop = AgentLoop::new().with_model(agent_task.model);