From 101b73219d09914b614057bdfbdfee2efe9ff7b3 Mon Sep 17 00:00:00 2001 From: Adam <13007539+MrgSub@users.noreply.github.com> Date: Tue, 22 Jul 2025 13:53:00 -0700 Subject: [PATCH 1/3] Clean up code and improve thread processing workflow (#1797) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Clean up Gmail thread processing and improve error handling ## Description This PR improves the Gmail thread processing workflow by: 1. Removing unnecessary console logs and debug statements 2. Adding proper type annotations instead of using `any` 3. Removing unused PartyKit request routing code 4. Enhancing thread processing by tracking both changed and added threads separately 5. Adding JSDoc comments to key workflow functions 6. Implementing thread syncing for newly added threads 7. Fixing a non-null assertion by removing the `!` operator ## Type of Change - [x] 🐛 Bug fix (non-breaking change which fixes an issue) - [x] ⚡ Performance improvement ## Areas Affected - [x] Email Integration (Gmail, IMAP, etc.) - [x] Data Storage/Management ## Testing Done - [x] Manual testing performed ## Checklist - [x] I have performed a self-review of my code - [x] My code follows the project's style guidelines - [x] I have commented my code, particularly in complex areas - [x] My changes generate no new warnings ## Additional Notes The thread processing logic now distinguishes between threads that were changed and threads that were newly added, allowing for more efficient processing. Added JSDoc comments to improve code documentation and maintainability. --- _By submitting this pull request, I confirm that my contribution is made under the terms of the project's license._ ## Summary by CodeRabbit * **Bug Fixes** * Improved handling of thread synchronization in workflows, ensuring newly added threads are properly synced before processing. * Reduced unnecessary debug logging for cleaner output. * **Documentation** * Added JSDoc comments to key workflow functions for better clarity. * **Refactor** * Enhanced type safety in workflow parameters and return types. * Simplified and reordered import statements for improved code organization. --- apps/server/src/main.ts | 11 +--- apps/server/src/pipelines.effect.ts | 86 ++++++++++++++++++++++----- apps/server/src/routes/agent/index.ts | 13 ++-- 3 files changed, 77 insertions(+), 33 deletions(-) diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index ea45b65a52..19886f2aa5 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -691,7 +691,7 @@ export default class extends WorkerEntrypoint { await env.thread_queue.send({ providerId, historyId: body.historyId, - subscriptionName: subHeader!, + subscriptionName: subHeader, }); } catch (error) { console.error('Error sending to thread queue', error, { @@ -705,12 +705,6 @@ export default class extends WorkerEntrypoint { }); async fetch(request: Request): Promise { - if (request.url.includes('/zero/durable-mailbox')) { - const res = await routePartykitRequest(request, env as unknown as Record, { - prefix: 'zero', - }); - if (res) return res; - } return this.app.fetch(request, this.env, this.ctx); } @@ -723,8 +717,6 @@ export default class extends WorkerEntrypoint { batch.messages.map(async (msg: Message) => { const connectionId = msg.body.connectionId; const providerId = msg.body.providerId; - console.log('connectionId', connectionId); - console.log('providerId', providerId); try { await enableBrainFunction({ id: connectionId, providerId }); } catch (error) { @@ -742,7 +734,6 @@ export default class extends WorkerEntrypoint { return; } case batch.queue.startsWith('thread-queue'): { - console.log('batch', batch); try { await Promise.all( batch.messages.map(async (msg: Message) => { diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts index 1fcc4b44f0..669add8fe8 100644 --- a/apps/server/src/pipelines.effect.ts +++ b/apps/server/src/pipelines.effect.ts @@ -65,7 +65,7 @@ type MainWorkflowError = const validateArguments = ( params: MainWorkflowParams, - serviceAccount: any, + serviceAccount: { project_id: string }, ): Effect.Effect => Effect.gen(function* () { yield* Console.log('[MAIN_WORKFLOW] Validating arguments'); @@ -87,6 +87,12 @@ const validateArguments = ( const override = false; +/** + * This function runs the main workflow. The main workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. + * It validates the subscription name and extracts the connection ID. + * @param params + * @returns + */ export const runMainWorkflow = ( params: MainWorkflowParams, ): Effect.Effect => @@ -265,36 +271,82 @@ export const runZeroWorkflow = ( }); // Extract thread IDs from history - const threadIds = new Set(); + const threadsChanged = new Set(); + const threadsAdded = new Set(); history.forEach((historyItem) => { if (historyItem.messagesAdded) { historyItem.messagesAdded.forEach((messageAdded) => { if (messageAdded.message?.threadId) { - threadIds.add(messageAdded.message.threadId); + threadsChanged.add(messageAdded.message.threadId); + threadsAdded.add(messageAdded.message.threadId); } }); } if (historyItem.labelsAdded) { historyItem.labelsAdded.forEach((labelAdded) => { if (labelAdded.message?.threadId) { - threadIds.add(labelAdded.message.threadId); + threadsChanged.add(labelAdded.message.threadId); } }); } if (historyItem.labelsRemoved) { historyItem.labelsRemoved.forEach((labelRemoved) => { if (labelRemoved.message?.threadId) { - threadIds.add(labelRemoved.message.threadId); + threadsChanged.add(labelRemoved.message.threadId); } }); } }); - yield* Console.log('[ZERO_WORKFLOW] Found unique thread IDs:', Array.from(threadIds)); + yield* Console.log( + '[ZERO_WORKFLOW] Found unique thread IDs:', + Array.from(threadsChanged), + Array.from(threadsAdded), + ); + + if (threadsAdded.size > 0) { + const threadWorkflowParams = Array.from(threadsAdded); + + // Sync threads with proper error handling - use allSuccesses to collect successful syncs + const syncResults = yield* Effect.allSuccesses( + threadWorkflowParams.map((threadId) => + Effect.tryPromise({ + try: async () => { + const result = await agent.syncThread({ threadId }); + console.log(`[ZERO_WORKFLOW] Successfully synced thread ${threadId}`); + return { threadId, result }; + }, + catch: (error) => { + console.error(`[ZERO_WORKFLOW] Failed to sync thread ${threadId}:`, error); + // Let this effect fail so allSuccesses will exclude it + throw new Error( + `Failed to sync thread ${threadId}: ${error instanceof Error ? error.message : String(error)}`, + ); + }, + }), + ), + { concurrency: 1 }, // Limit concurrency to avoid rate limits + ); + + const syncedCount = syncResults.length; + const failedCount = threadWorkflowParams.length - syncedCount; + + if (failedCount > 0) { + yield* Console.log( + `[ZERO_WORKFLOW] Warning: ${failedCount}/${threadWorkflowParams.length} thread syncs failed. Successfully synced: ${syncedCount}`, + ); + // Continue with processing - sync failures shouldn't stop the entire workflow + // The thread processing will continue with whatever data is available + } else { + yield* Console.log(`[ZERO_WORKFLOW] Successfully synced all ${syncedCount} threads`); + } + + yield* Console.log('[ZERO_WORKFLOW] Synced threads:', syncResults); + } // Process all threads concurrently using Effect.all - if (threadIds.size > 0) { - const threadWorkflowParams = Array.from(threadIds).map((threadId) => ({ + if (threadsChanged.size > 0) { + const threadWorkflowParams = Array.from(threadsChanged).map((threadId) => ({ connectionId, threadId, providerId: foundConnection.providerId, @@ -339,7 +391,7 @@ export const runZeroWorkflow = ( ); }), ), - { concurrency: 1 }, // Process up to 5 threads concurrently + { concurrency: 1, discard: true }, // Process up to 5 threads concurrently ); yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length); @@ -407,6 +459,11 @@ type ThreadWorkflowError = | { _tag: 'GmailApiError'; error: unknown } | { _tag: 'VectorizationError'; error: unknown }; +/** + * Runs the main workflow for processing a thread. The workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. + * @param params + * @returns + */ export const runThreadWorkflow = ( params: ThreadWorkflowParams, ): Effect.Effect => @@ -613,12 +670,9 @@ export const runThreadWorkflow = ( { role: 'system', content: SummarizeMessagePrompt }, { role: 'user', content: prompt }, ]; - const response: any = await env.AI.run( - '@cf/meta/llama-4-scout-17b-16e-instruct', - { - messages, - }, - ); + const response = await env.AI.run('@cf/meta/llama-4-scout-17b-16e-instruct', { + messages, + }); console.log( `[THREAD_WORKFLOW] Summary generated for message ${message.id}:`, response, @@ -696,7 +750,7 @@ export const runThreadWorkflow = ( return null; } console.log('[THREAD_WORKFLOW] Found existing thread summary'); - return threadSummary[0].metadata as any; + return threadSummary[0].metadata as { summary: string; lastMsg: string }; }, catch: (error) => ({ _tag: 'VectorizationError' as const, error }), }); diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index 5790deab17..f773ba3905 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -15,11 +15,11 @@ */ import { - type StreamTextOnFinishCallback, - createDataStreamResponse, - streamText, appendResponseMessages, + createDataStreamResponse, generateText, + streamText, + type StreamTextOnFinishCallback, } from 'ai'; import { IncomingMessageType, @@ -30,10 +30,10 @@ import { import { EPrompts, type IOutgoingMessage, - type ParsedMessage, type ISnoozeBatch, + type ParsedMessage, } from '../../types'; -import type { MailManager, IGetThreadResponse, IGetThreadsResponse } from '../../lib/driver/types'; +import type { IGetThreadResponse, IGetThreadsResponse, MailManager } from '../../lib/driver/types'; import { DurableObjectOAuthClientProvider } from 'agents/mcp/do-oauth-client-provider'; import { AiChatPrompt, GmailSearchAssistantSystemPrompt } from '../../lib/prompts'; import { connectionToDriver, getZeroSocketAgent } from '../../lib/server-utils'; @@ -852,8 +852,7 @@ export class ZeroDriver extends AIChatAgent { `; if (!result || result.length === 0) { - const res = await this.queue('syncThread', { threadId: id }); - console.log('res', res); + await this.queue('syncThread', { threadId: id }); return { messages: [], latest: undefined, From 6c76d3d44d544e6e7ea0568668366c13c6724192 Mon Sep 17 00:00:00 2001 From: Adam <13007539+MrgSub@users.noreply.github.com> Date: Tue, 22 Jul 2025 14:18:31 -0700 Subject: [PATCH 2/3] Implement thread deletion functionality (#1799) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Implement Thread Deletion Functionality ## Description This PR implements thread deletion functionality, allowing users to permanently delete threads from the bin. The implementation includes: 1. Replacing the optimistic delete with a real deletion using TRPC mutation 2. Enabling the previously disabled delete button in the thread context menu 3. Adding server-side support for thread deletion from the database 4. Adding folder name normalization to handle 'bin' vs 'trash' naming differences ## Type of Change - [x] ✨ New feature (non-breaking change which adds functionality) - [x] 🐛 Bug fix (non-breaking change which fixes an issue) ## Areas Affected - [x] Email Integration (Gmail, IMAP, etc.) - [x] User Interface/Experience - [x] Data Storage/Management ## Testing Done - [x] Manual testing performed ## Checklist - [x] I have performed a self-review of my code - [x] My changes generate no new warnings ## Additional Notes The implementation now properly deletes threads from the database when the user selects the delete option from the context menu. The UI provides toast notifications to indicate the deletion status. _By submitting this pull request, I confirm that my contribution is made under the terms of the project's license._ --- .../components/context/thread-context.tsx | 53 +++++++++---------- apps/server/src/routes/agent/index.ts | 32 +++++++---- apps/server/src/routes/agent/rpc.ts | 4 +- 3 files changed, 52 insertions(+), 37 deletions(-) diff --git a/apps/mail/components/context/thread-context.tsx b/apps/mail/components/context/thread-context.tsx index 6d48164abb..76a70c9d63 100644 --- a/apps/mail/components/context/thread-context.tsx +++ b/apps/mail/components/context/thread-context.tsx @@ -25,10 +25,13 @@ import { } from 'lucide-react'; import { useOptimisticThreadState } from '@/components/mail/optimistic-thread-state'; import { useOptimisticActions } from '@/hooks/use-optimistic-actions'; +import { ExclamationCircle, Mail, Clock } from '../icons/icons'; +import { SnoozeDialog } from '@/components/mail/snooze-dialog'; import { type ThreadDestination } from '@/lib/thread-actions'; import { useThread, useThreads } from '@/hooks/use-threads'; -import { ExclamationCircle, Mail, Clock } from '../icons/icons'; import { useMemo, type ReactNode, useState } from 'react'; +import { useTRPC } from '@/providers/query-provider'; +import { useMutation } from '@tanstack/react-query'; import { useLabels } from '@/hooks/use-labels'; import { FOLDERS, LABELS } from '@/lib/utils'; import { useMail } from '../mail/use-mail'; @@ -37,7 +40,6 @@ import { m } from '@/paraglide/messages'; import { useParams } from 'react-router'; import { useQueryState } from 'nuqs'; import { toast } from 'sonner'; -import { SnoozeDialog } from '@/components/mail/snooze-dialog'; interface EmailAction { id: string; @@ -93,10 +95,14 @@ const LabelsList = ({ threadId, bulkSelected }: { threadId: string; bulkSelected {labels .filter((label) => label.id) .map((label) => { - let isChecked = label.id ? thread!.labels?.some((l) => l.id === label.id) ?? false : false; + let isChecked = label.id + ? (thread!.labels?.some((l) => l.id === label.id) ?? false) + : false; if (rightClickedThreadOptimisticState.optimisticLabels) { - if (rightClickedThreadOptimisticState.optimisticLabels.addedLabelIds.includes(label.id)) { + if ( + rightClickedThreadOptimisticState.optimisticLabels.addedLabelIds.includes(label.id) + ) { isChecked = true; } else if ( rightClickedThreadOptimisticState.optimisticLabels.removedLabelIds.includes(label.id) @@ -141,16 +147,18 @@ export function ThreadContextMenu({ const { data: threadData } = useThread(threadId); const [, setActiveReplyId] = useQueryState('activeReplyId'); const optimisticState = useOptimisticThreadState(threadId); + const trpc = useTRPC(); const { optimisticMoveThreadsTo, optimisticToggleStar, optimisticToggleImportant, optimisticMarkAsRead, optimisticMarkAsUnread, - optimisticDeleteThreads, + // optimisticDeleteThreads, optimisticSnooze, optimisticUnsnooze, } = useOptimisticActions(); + const { mutateAsync: deleteThread } = useMutation(trpc.mail.delete.mutationOptions()); const { isUnread, isStarred, isImportant } = useMemo(() => { const unread = threadData?.hasUnread ?? false; @@ -305,18 +313,18 @@ export function ThreadContextMenu({ const handleDelete = () => () => { const targets = mail.bulkSelected.length ? mail.bulkSelected : [threadId]; - // Use optimistic update with undo functionality - optimisticDeleteThreads(targets, currentFolder); - - // Clear bulk selection after action - if (mail.bulkSelected.length) { - setMail((prev) => ({ ...prev, bulkSelected: [] })); - } - - // Navigation removed to prevent route change on current thread action - // if (!mail.bulkSelected.length && threadId) { - // navigate(`/mail/${currentFolder}`); - // } + toast.promise( + Promise.all( + targets.map(async (id) => { + return deleteThread({ id }); + }), + ), + { + loading: 'Deleting...', + success: 'Deleted', + error: 'Failed to delete', + }, + ); }; const getActions = useMemo(() => { @@ -353,7 +361,6 @@ export function ThreadContextMenu({ label: m['common.mail.deleteFromBin'](), icon: , action: handleDelete(), - disabled: true, }, ]; } @@ -493,15 +500,7 @@ export function ThreadContextMenu({ disabled: false, }, ], - [ - isUnread, - isImportant, - isStarred, - m, - handleReadUnread, - handleToggleImportant, - handleFavorites, - ], + [isUnread, isImportant, isStarred, m, handleReadUnread, handleToggleImportant, handleFavorites], ); const renderAction = (action: EmailAction) => { diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index f773ba3905..00c30145e0 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -367,6 +367,16 @@ export class ZeroDriver extends AIChatAgent { DROP TABLE IF EXISTS threads;`; } + async deleteThread(id: string) { + void this.sql` + DELETE FROM threads WHERE thread_id = ${id}; + `; + this.agent?.broadcastChatMessage({ + type: OutgoingMessageType.Mail_List, + folder: 'bin', + }); + } + async syncThread({ threadId }: { threadId: string }) { if (this.name === 'general') return; if (!this.driver) { @@ -384,7 +394,7 @@ export class ZeroDriver extends AIChatAgent { } this.syncThreadsInProgress.set(threadId, true); - console.log('Server: syncThread called for thread', threadId); + // console.log('Server: syncThread called for thread', threadId); try { const threadData = await this.getWithRetry(threadId); const latest = threadData.latest; @@ -432,10 +442,10 @@ export class ZeroDriver extends AIChatAgent { threadId, }); this.syncThreadsInProgress.delete(threadId); - console.log('Server: syncThread result', { - threadId, - labels: threadData.labels, - }); + // console.log('Server: syncThread result', { + // threadId, + // labels: threadData.labels, + // }); return { success: true, threadId, threadData }; } else { this.syncThreadsInProgress.delete(threadId); @@ -656,6 +666,11 @@ export class ZeroDriver extends AIChatAgent { }; } + normalizeFolderName(folderName: string) { + if (folderName === 'bin') return 'trash'; + return folderName; + } + async getThreadsFromDB(params: { labelIds?: string[]; folder?: string; @@ -663,15 +678,14 @@ export class ZeroDriver extends AIChatAgent { maxResults?: number; pageToken?: string; }): Promise { - const { labelIds = [], folder, q, maxResults = 50, pageToken } = params; + const { labelIds = [], q, maxResults = 50, pageToken } = params; + let folder = params.folder ?? 'inbox'; try { + folder = this.normalizeFolderName(folder); const folderThreadCount = (await this.count()).find((c) => c.label === folder)?.count; const currentThreadCount = await this.getThreadCount(); - console.log('folderThreadCount', folderThreadCount, folder); - console.log('currentThreadCount', currentThreadCount); - if (folderThreadCount && folderThreadCount > currentThreadCount && folder) { this.ctx.waitUntil(this.syncThreads(folder)); } diff --git a/apps/server/src/routes/agent/rpc.ts b/apps/server/src/routes/agent/rpc.ts index c7f279ccd8..b36c0e1e12 100644 --- a/apps/server/src/routes/agent/rpc.ts +++ b/apps/server/src/routes/agent/rpc.ts @@ -161,7 +161,9 @@ export class DriverRpcDO extends RpcTarget { } async delete(id: string) { - return await this.mainDo.delete(id); + const result = await this.mainDo.delete(id); + await this.mainDo.deleteThread(id); + return result; } async deleteAllSpam() { From 28f730bdcb8a15df3900b8551062e4322b0757be Mon Sep 17 00:00:00 2001 From: Adam <13007539+MrgSub@users.noreply.github.com> Date: Tue, 22 Jul 2025 14:27:29 -0700 Subject: [PATCH 3/3] Remove unused partykit import (#1800) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # READ CAREFULLY THEN REMOVE Remove bullet points that are not relevant. PLEASE REFRAIN FROM USING AI TO WRITE YOUR CODE AND PR DESCRIPTION. IF YOU DO USE AI TO WRITE YOUR CODE PLEASE PROVIDE A DESCRIPTION AND REVIEW IT CAREFULLY. MAKE SURE YOU UNDERSTAND THE CODE YOU ARE SUBMITTING USING AI. - Pull requests that do not follow these guidelines will be closed without review or comment. - If you use AI to write your PR description your pr will be close without review or comment. - If you are unsure about anything, feel free to ask for clarification. ## Description Please provide a clear description of your changes. --- ## Type of Change Please delete options that are not relevant. - [ ] 🐛 Bug fix (non-breaking change which fixes an issue) - [ ] ✨ New feature (non-breaking change which adds functionality) - [ ] 💥 Breaking change (fix or feature with breaking changes) - [ ] 📝 Documentation update - [ ] 🎨 UI/UX improvement - [ ] 🔒 Security enhancement - [ ] ⚡ Performance improvement ## Areas Affected Please check all that apply: - [ ] Email Integration (Gmail, IMAP, etc.) - [ ] User Interface/Experience - [ ] Authentication/Authorization - [ ] Data Storage/Management - [ ] API Endpoints - [ ] Documentation - [ ] Testing Infrastructure - [ ] Development Workflow - [ ] Deployment/Infrastructure ## Testing Done Describe the tests you've done: - [ ] Unit tests added/updated - [ ] Integration tests added/updated - [ ] Manual testing performed - [ ] Cross-browser testing (if UI changes) - [ ] Mobile responsiveness verified (if UI changes) ## Security Considerations For changes involving data or authentication: - [ ] No sensitive data is exposed - [ ] Authentication checks are in place - [ ] Input validation is implemented - [ ] Rate limiting is considered (if applicable) ## Checklist - [ ] I have read the [CONTRIBUTING](https://github.com/Mail-0/Zero/blob/staging/.github/CONTRIBUTING.md) document - [ ] My code follows the project's style guidelines - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in complex areas - [ ] I have updated the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix/feature works - [ ] All tests pass locally - [ ] Any dependent changes are merged and published ## Additional Notes Add any other context about the pull request here. ## Screenshots/Recordings Add screenshots or recordings here if applicable. --- _By submitting this pull request, I confirm that my contribution is made under the terms of the project's license._ --- ## Summary by cubic Removed an unused import of routePartykitRequest from partyserver in main.ts to clean up the code. --- apps/server/src/main.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 19886f2aa5..798fdb8ea1 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -24,7 +24,6 @@ import { ZeroAgent, ZeroDriver } from './routes/agent'; import { contextStorage } from 'hono/context-storage'; import { defaultUserSettings } from './lib/schemas'; import { createLocalJWKSet, jwtVerify } from 'jose'; -import { routePartykitRequest } from 'partyserver'; import { getZeroAgent } from './lib/server-utils'; import { enableBrainFunction } from './lib/brain'; import { trpcServer } from '@hono/trpc-server';