diff --git a/apps/mail/components/context/thread-context.tsx b/apps/mail/components/context/thread-context.tsx index 6d48164abb..76a70c9d63 100644 --- a/apps/mail/components/context/thread-context.tsx +++ b/apps/mail/components/context/thread-context.tsx @@ -25,10 +25,13 @@ import { } from 'lucide-react'; import { useOptimisticThreadState } from '@/components/mail/optimistic-thread-state'; import { useOptimisticActions } from '@/hooks/use-optimistic-actions'; +import { ExclamationCircle, Mail, Clock } from '../icons/icons'; +import { SnoozeDialog } from '@/components/mail/snooze-dialog'; import { type ThreadDestination } from '@/lib/thread-actions'; import { useThread, useThreads } from '@/hooks/use-threads'; -import { ExclamationCircle, Mail, Clock } from '../icons/icons'; import { useMemo, type ReactNode, useState } from 'react'; +import { useTRPC } from '@/providers/query-provider'; +import { useMutation } from '@tanstack/react-query'; import { useLabels } from '@/hooks/use-labels'; import { FOLDERS, LABELS } from '@/lib/utils'; import { useMail } from '../mail/use-mail'; @@ -37,7 +40,6 @@ import { m } from '@/paraglide/messages'; import { useParams } from 'react-router'; import { useQueryState } from 'nuqs'; import { toast } from 'sonner'; -import { SnoozeDialog } from '@/components/mail/snooze-dialog'; interface EmailAction { id: string; @@ -93,10 +95,14 @@ const LabelsList = ({ threadId, bulkSelected }: { threadId: string; bulkSelected {labels .filter((label) => label.id) .map((label) => { - let isChecked = label.id ? thread!.labels?.some((l) => l.id === label.id) ?? false : false; + let isChecked = label.id + ? (thread!.labels?.some((l) => l.id === label.id) ?? false) + : false; if (rightClickedThreadOptimisticState.optimisticLabels) { - if (rightClickedThreadOptimisticState.optimisticLabels.addedLabelIds.includes(label.id)) { + if ( + rightClickedThreadOptimisticState.optimisticLabels.addedLabelIds.includes(label.id) + ) { isChecked = true; } else if ( rightClickedThreadOptimisticState.optimisticLabels.removedLabelIds.includes(label.id) @@ -141,16 +147,18 @@ export function ThreadContextMenu({ const { data: threadData } = useThread(threadId); const [, setActiveReplyId] = useQueryState('activeReplyId'); const optimisticState = useOptimisticThreadState(threadId); + const trpc = useTRPC(); const { optimisticMoveThreadsTo, optimisticToggleStar, optimisticToggleImportant, optimisticMarkAsRead, optimisticMarkAsUnread, - optimisticDeleteThreads, + // optimisticDeleteThreads, optimisticSnooze, optimisticUnsnooze, } = useOptimisticActions(); + const { mutateAsync: deleteThread } = useMutation(trpc.mail.delete.mutationOptions()); const { isUnread, isStarred, isImportant } = useMemo(() => { const unread = threadData?.hasUnread ?? false; @@ -305,18 +313,18 @@ export function ThreadContextMenu({ const handleDelete = () => () => { const targets = mail.bulkSelected.length ? mail.bulkSelected : [threadId]; - // Use optimistic update with undo functionality - optimisticDeleteThreads(targets, currentFolder); - - // Clear bulk selection after action - if (mail.bulkSelected.length) { - setMail((prev) => ({ ...prev, bulkSelected: [] })); - } - - // Navigation removed to prevent route change on current thread action - // if (!mail.bulkSelected.length && threadId) { - // navigate(`/mail/${currentFolder}`); - // } + toast.promise( + Promise.all( + targets.map(async (id) => { + return deleteThread({ id }); + }), + ), + { + loading: 'Deleting...', + success: 'Deleted', + error: 'Failed to delete', + }, + ); }; const getActions = useMemo(() => { @@ -353,7 +361,6 @@ export function ThreadContextMenu({ label: m['common.mail.deleteFromBin'](), icon: , action: handleDelete(), - disabled: true, }, ]; } @@ -493,15 +500,7 @@ export function ThreadContextMenu({ disabled: false, }, ], - [ - isUnread, - isImportant, - isStarred, - m, - handleReadUnread, - handleToggleImportant, - handleFavorites, - ], + [isUnread, isImportant, isStarred, m, handleReadUnread, handleToggleImportant, handleFavorites], ); const renderAction = (action: EmailAction) => { diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index ea45b65a52..798fdb8ea1 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -24,7 +24,6 @@ import { ZeroAgent, ZeroDriver } from './routes/agent'; import { contextStorage } from 'hono/context-storage'; import { defaultUserSettings } from './lib/schemas'; import { createLocalJWKSet, jwtVerify } from 'jose'; -import { routePartykitRequest } from 'partyserver'; import { getZeroAgent } from './lib/server-utils'; import { enableBrainFunction } from './lib/brain'; import { trpcServer } from '@hono/trpc-server'; @@ -691,7 +690,7 @@ export default class extends WorkerEntrypoint { await env.thread_queue.send({ providerId, historyId: body.historyId, - subscriptionName: subHeader!, + subscriptionName: subHeader, }); } catch (error) { console.error('Error sending to thread queue', error, { @@ -705,12 +704,6 @@ export default class extends WorkerEntrypoint { }); async fetch(request: Request): Promise { - if (request.url.includes('/zero/durable-mailbox')) { - const res = await routePartykitRequest(request, env as unknown as Record, { - prefix: 'zero', - }); - if (res) return res; - } return this.app.fetch(request, this.env, this.ctx); } @@ -723,8 +716,6 @@ export default class extends WorkerEntrypoint { batch.messages.map(async (msg: Message) => { const connectionId = msg.body.connectionId; const providerId = msg.body.providerId; - console.log('connectionId', connectionId); - console.log('providerId', providerId); try { await enableBrainFunction({ id: connectionId, providerId }); } catch (error) { @@ -742,7 +733,6 @@ export default class extends WorkerEntrypoint { return; } case batch.queue.startsWith('thread-queue'): { - console.log('batch', batch); try { await Promise.all( batch.messages.map(async (msg: Message) => { diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts index 1fcc4b44f0..669add8fe8 100644 --- a/apps/server/src/pipelines.effect.ts +++ b/apps/server/src/pipelines.effect.ts @@ -65,7 +65,7 @@ type MainWorkflowError = const validateArguments = ( params: MainWorkflowParams, - serviceAccount: any, + serviceAccount: { project_id: string }, ): Effect.Effect => Effect.gen(function* () { yield* Console.log('[MAIN_WORKFLOW] Validating arguments'); @@ -87,6 +87,12 @@ const validateArguments = ( const override = false; +/** + * This function runs the main workflow. The main workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. + * It validates the subscription name and extracts the connection ID. + * @param params + * @returns + */ export const runMainWorkflow = ( params: MainWorkflowParams, ): Effect.Effect => @@ -265,36 +271,82 @@ export const runZeroWorkflow = ( }); // Extract thread IDs from history - const threadIds = new Set(); + const threadsChanged = new Set(); + const threadsAdded = new Set(); history.forEach((historyItem) => { if (historyItem.messagesAdded) { historyItem.messagesAdded.forEach((messageAdded) => { if (messageAdded.message?.threadId) { - threadIds.add(messageAdded.message.threadId); + threadsChanged.add(messageAdded.message.threadId); + threadsAdded.add(messageAdded.message.threadId); } }); } if (historyItem.labelsAdded) { historyItem.labelsAdded.forEach((labelAdded) => { if (labelAdded.message?.threadId) { - threadIds.add(labelAdded.message.threadId); + threadsChanged.add(labelAdded.message.threadId); } }); } if (historyItem.labelsRemoved) { historyItem.labelsRemoved.forEach((labelRemoved) => { if (labelRemoved.message?.threadId) { - threadIds.add(labelRemoved.message.threadId); + threadsChanged.add(labelRemoved.message.threadId); } }); } }); - yield* Console.log('[ZERO_WORKFLOW] Found unique thread IDs:', Array.from(threadIds)); + yield* Console.log( + '[ZERO_WORKFLOW] Found unique thread IDs:', + Array.from(threadsChanged), + Array.from(threadsAdded), + ); + + if (threadsAdded.size > 0) { + const threadWorkflowParams = Array.from(threadsAdded); + + // Sync threads with proper error handling - use allSuccesses to collect successful syncs + const syncResults = yield* Effect.allSuccesses( + threadWorkflowParams.map((threadId) => + Effect.tryPromise({ + try: async () => { + const result = await agent.syncThread({ threadId }); + console.log(`[ZERO_WORKFLOW] Successfully synced thread ${threadId}`); + return { threadId, result }; + }, + catch: (error) => { + console.error(`[ZERO_WORKFLOW] Failed to sync thread ${threadId}:`, error); + // Let this effect fail so allSuccesses will exclude it + throw new Error( + `Failed to sync thread ${threadId}: ${error instanceof Error ? error.message : String(error)}`, + ); + }, + }), + ), + { concurrency: 1 }, // Limit concurrency to avoid rate limits + ); + + const syncedCount = syncResults.length; + const failedCount = threadWorkflowParams.length - syncedCount; + + if (failedCount > 0) { + yield* Console.log( + `[ZERO_WORKFLOW] Warning: ${failedCount}/${threadWorkflowParams.length} thread syncs failed. Successfully synced: ${syncedCount}`, + ); + // Continue with processing - sync failures shouldn't stop the entire workflow + // The thread processing will continue with whatever data is available + } else { + yield* Console.log(`[ZERO_WORKFLOW] Successfully synced all ${syncedCount} threads`); + } + + yield* Console.log('[ZERO_WORKFLOW] Synced threads:', syncResults); + } // Process all threads concurrently using Effect.all - if (threadIds.size > 0) { - const threadWorkflowParams = Array.from(threadIds).map((threadId) => ({ + if (threadsChanged.size > 0) { + const threadWorkflowParams = Array.from(threadsChanged).map((threadId) => ({ connectionId, threadId, providerId: foundConnection.providerId, @@ -339,7 +391,7 @@ export const runZeroWorkflow = ( ); }), ), - { concurrency: 1 }, // Process up to 5 threads concurrently + { concurrency: 1, discard: true }, // Process up to 5 threads concurrently ); yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length); @@ -407,6 +459,11 @@ type ThreadWorkflowError = | { _tag: 'GmailApiError'; error: unknown } | { _tag: 'VectorizationError'; error: unknown }; +/** + * Runs the main workflow for processing a thread. The workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. + * @param params + * @returns + */ export const runThreadWorkflow = ( params: ThreadWorkflowParams, ): Effect.Effect => @@ -613,12 +670,9 @@ export const runThreadWorkflow = ( { role: 'system', content: SummarizeMessagePrompt }, { role: 'user', content: prompt }, ]; - const response: any = await env.AI.run( - '@cf/meta/llama-4-scout-17b-16e-instruct', - { - messages, - }, - ); + const response = await env.AI.run('@cf/meta/llama-4-scout-17b-16e-instruct', { + messages, + }); console.log( `[THREAD_WORKFLOW] Summary generated for message ${message.id}:`, response, @@ -696,7 +750,7 @@ export const runThreadWorkflow = ( return null; } console.log('[THREAD_WORKFLOW] Found existing thread summary'); - return threadSummary[0].metadata as any; + return threadSummary[0].metadata as { summary: string; lastMsg: string }; }, catch: (error) => ({ _tag: 'VectorizationError' as const, error }), }); diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index 5790deab17..00c30145e0 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -15,11 +15,11 @@ */ import { - type StreamTextOnFinishCallback, - createDataStreamResponse, - streamText, appendResponseMessages, + createDataStreamResponse, generateText, + streamText, + type StreamTextOnFinishCallback, } from 'ai'; import { IncomingMessageType, @@ -30,10 +30,10 @@ import { import { EPrompts, type IOutgoingMessage, - type ParsedMessage, type ISnoozeBatch, + type ParsedMessage, } from '../../types'; -import type { MailManager, IGetThreadResponse, IGetThreadsResponse } from '../../lib/driver/types'; +import type { IGetThreadResponse, IGetThreadsResponse, MailManager } from '../../lib/driver/types'; import { DurableObjectOAuthClientProvider } from 'agents/mcp/do-oauth-client-provider'; import { AiChatPrompt, GmailSearchAssistantSystemPrompt } from '../../lib/prompts'; import { connectionToDriver, getZeroSocketAgent } from '../../lib/server-utils'; @@ -367,6 +367,16 @@ export class ZeroDriver extends AIChatAgent { DROP TABLE IF EXISTS threads;`; } + async deleteThread(id: string) { + void this.sql` + DELETE FROM threads WHERE thread_id = ${id}; + `; + this.agent?.broadcastChatMessage({ + type: OutgoingMessageType.Mail_List, + folder: 'bin', + }); + } + async syncThread({ threadId }: { threadId: string }) { if (this.name === 'general') return; if (!this.driver) { @@ -384,7 +394,7 @@ export class ZeroDriver extends AIChatAgent { } this.syncThreadsInProgress.set(threadId, true); - console.log('Server: syncThread called for thread', threadId); + // console.log('Server: syncThread called for thread', threadId); try { const threadData = await this.getWithRetry(threadId); const latest = threadData.latest; @@ -432,10 +442,10 @@ export class ZeroDriver extends AIChatAgent { threadId, }); this.syncThreadsInProgress.delete(threadId); - console.log('Server: syncThread result', { - threadId, - labels: threadData.labels, - }); + // console.log('Server: syncThread result', { + // threadId, + // labels: threadData.labels, + // }); return { success: true, threadId, threadData }; } else { this.syncThreadsInProgress.delete(threadId); @@ -656,6 +666,11 @@ export class ZeroDriver extends AIChatAgent { }; } + normalizeFolderName(folderName: string) { + if (folderName === 'bin') return 'trash'; + return folderName; + } + async getThreadsFromDB(params: { labelIds?: string[]; folder?: string; @@ -663,15 +678,14 @@ export class ZeroDriver extends AIChatAgent { maxResults?: number; pageToken?: string; }): Promise { - const { labelIds = [], folder, q, maxResults = 50, pageToken } = params; + const { labelIds = [], q, maxResults = 50, pageToken } = params; + let folder = params.folder ?? 'inbox'; try { + folder = this.normalizeFolderName(folder); const folderThreadCount = (await this.count()).find((c) => c.label === folder)?.count; const currentThreadCount = await this.getThreadCount(); - console.log('folderThreadCount', folderThreadCount, folder); - console.log('currentThreadCount', currentThreadCount); - if (folderThreadCount && folderThreadCount > currentThreadCount && folder) { this.ctx.waitUntil(this.syncThreads(folder)); } @@ -852,8 +866,7 @@ export class ZeroDriver extends AIChatAgent { `; if (!result || result.length === 0) { - const res = await this.queue('syncThread', { threadId: id }); - console.log('res', res); + await this.queue('syncThread', { threadId: id }); return { messages: [], latest: undefined, diff --git a/apps/server/src/routes/agent/rpc.ts b/apps/server/src/routes/agent/rpc.ts index c7f279ccd8..b36c0e1e12 100644 --- a/apps/server/src/routes/agent/rpc.ts +++ b/apps/server/src/routes/agent/rpc.ts @@ -161,7 +161,9 @@ export class DriverRpcDO extends RpcTarget { } async delete(id: string) { - return await this.mainDo.delete(id); + const result = await this.mainDo.delete(id); + await this.mainDo.deleteThread(id); + return result; } async deleteAllSpam() {