diff --git a/apps/mail/components/context/thread-context.tsx b/apps/mail/components/context/thread-context.tsx
index 6d48164abb..76a70c9d63 100644
--- a/apps/mail/components/context/thread-context.tsx
+++ b/apps/mail/components/context/thread-context.tsx
@@ -25,10 +25,13 @@ import {
} from 'lucide-react';
import { useOptimisticThreadState } from '@/components/mail/optimistic-thread-state';
import { useOptimisticActions } from '@/hooks/use-optimistic-actions';
+import { ExclamationCircle, Mail, Clock } from '../icons/icons';
+import { SnoozeDialog } from '@/components/mail/snooze-dialog';
import { type ThreadDestination } from '@/lib/thread-actions';
import { useThread, useThreads } from '@/hooks/use-threads';
-import { ExclamationCircle, Mail, Clock } from '../icons/icons';
import { useMemo, type ReactNode, useState } from 'react';
+import { useTRPC } from '@/providers/query-provider';
+import { useMutation } from '@tanstack/react-query';
import { useLabels } from '@/hooks/use-labels';
import { FOLDERS, LABELS } from '@/lib/utils';
import { useMail } from '../mail/use-mail';
@@ -37,7 +40,6 @@ import { m } from '@/paraglide/messages';
import { useParams } from 'react-router';
import { useQueryState } from 'nuqs';
import { toast } from 'sonner';
-import { SnoozeDialog } from '@/components/mail/snooze-dialog';
interface EmailAction {
id: string;
@@ -93,10 +95,14 @@ const LabelsList = ({ threadId, bulkSelected }: { threadId: string; bulkSelected
{labels
.filter((label) => label.id)
.map((label) => {
- let isChecked = label.id ? thread!.labels?.some((l) => l.id === label.id) ?? false : false;
+ let isChecked = label.id
+ ? (thread!.labels?.some((l) => l.id === label.id) ?? false)
+ : false;
if (rightClickedThreadOptimisticState.optimisticLabels) {
- if (rightClickedThreadOptimisticState.optimisticLabels.addedLabelIds.includes(label.id)) {
+ if (
+ rightClickedThreadOptimisticState.optimisticLabels.addedLabelIds.includes(label.id)
+ ) {
isChecked = true;
} else if (
rightClickedThreadOptimisticState.optimisticLabels.removedLabelIds.includes(label.id)
@@ -141,16 +147,18 @@ export function ThreadContextMenu({
const { data: threadData } = useThread(threadId);
const [, setActiveReplyId] = useQueryState('activeReplyId');
const optimisticState = useOptimisticThreadState(threadId);
+ const trpc = useTRPC();
const {
optimisticMoveThreadsTo,
optimisticToggleStar,
optimisticToggleImportant,
optimisticMarkAsRead,
optimisticMarkAsUnread,
- optimisticDeleteThreads,
+ // optimisticDeleteThreads,
optimisticSnooze,
optimisticUnsnooze,
} = useOptimisticActions();
+ const { mutateAsync: deleteThread } = useMutation(trpc.mail.delete.mutationOptions());
const { isUnread, isStarred, isImportant } = useMemo(() => {
const unread = threadData?.hasUnread ?? false;
@@ -305,18 +313,18 @@ export function ThreadContextMenu({
const handleDelete = () => () => {
const targets = mail.bulkSelected.length ? mail.bulkSelected : [threadId];
- // Use optimistic update with undo functionality
- optimisticDeleteThreads(targets, currentFolder);
-
- // Clear bulk selection after action
- if (mail.bulkSelected.length) {
- setMail((prev) => ({ ...prev, bulkSelected: [] }));
- }
-
- // Navigation removed to prevent route change on current thread action
- // if (!mail.bulkSelected.length && threadId) {
- // navigate(`/mail/${currentFolder}`);
- // }
+ toast.promise(
+ Promise.all(
+ targets.map(async (id) => {
+ return deleteThread({ id });
+ }),
+ ),
+ {
+ loading: 'Deleting...',
+ success: 'Deleted',
+ error: 'Failed to delete',
+ },
+ );
};
const getActions = useMemo(() => {
@@ -353,7 +361,6 @@ export function ThreadContextMenu({
label: m['common.mail.deleteFromBin'](),
icon: ,
action: handleDelete(),
- disabled: true,
},
];
}
@@ -493,15 +500,7 @@ export function ThreadContextMenu({
disabled: false,
},
],
- [
- isUnread,
- isImportant,
- isStarred,
- m,
- handleReadUnread,
- handleToggleImportant,
- handleFavorites,
- ],
+ [isUnread, isImportant, isStarred, m, handleReadUnread, handleToggleImportant, handleFavorites],
);
const renderAction = (action: EmailAction) => {
diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts
index ea45b65a52..798fdb8ea1 100644
--- a/apps/server/src/main.ts
+++ b/apps/server/src/main.ts
@@ -24,7 +24,6 @@ import { ZeroAgent, ZeroDriver } from './routes/agent';
import { contextStorage } from 'hono/context-storage';
import { defaultUserSettings } from './lib/schemas';
import { createLocalJWKSet, jwtVerify } from 'jose';
-import { routePartykitRequest } from 'partyserver';
import { getZeroAgent } from './lib/server-utils';
import { enableBrainFunction } from './lib/brain';
import { trpcServer } from '@hono/trpc-server';
@@ -691,7 +690,7 @@ export default class extends WorkerEntrypoint {
await env.thread_queue.send({
providerId,
historyId: body.historyId,
- subscriptionName: subHeader!,
+ subscriptionName: subHeader,
});
} catch (error) {
console.error('Error sending to thread queue', error, {
@@ -705,12 +704,6 @@ export default class extends WorkerEntrypoint {
});
async fetch(request: Request): Promise {
- if (request.url.includes('/zero/durable-mailbox')) {
- const res = await routePartykitRequest(request, env as unknown as Record, {
- prefix: 'zero',
- });
- if (res) return res;
- }
return this.app.fetch(request, this.env, this.ctx);
}
@@ -723,8 +716,6 @@ export default class extends WorkerEntrypoint {
batch.messages.map(async (msg: Message) => {
const connectionId = msg.body.connectionId;
const providerId = msg.body.providerId;
- console.log('connectionId', connectionId);
- console.log('providerId', providerId);
try {
await enableBrainFunction({ id: connectionId, providerId });
} catch (error) {
@@ -742,7 +733,6 @@ export default class extends WorkerEntrypoint {
return;
}
case batch.queue.startsWith('thread-queue'): {
- console.log('batch', batch);
try {
await Promise.all(
batch.messages.map(async (msg: Message) => {
diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts
index 1fcc4b44f0..669add8fe8 100644
--- a/apps/server/src/pipelines.effect.ts
+++ b/apps/server/src/pipelines.effect.ts
@@ -65,7 +65,7 @@ type MainWorkflowError =
const validateArguments = (
params: MainWorkflowParams,
- serviceAccount: any,
+ serviceAccount: { project_id: string },
): Effect.Effect =>
Effect.gen(function* () {
yield* Console.log('[MAIN_WORKFLOW] Validating arguments');
@@ -87,6 +87,12 @@ const validateArguments = (
const override = false;
+/**
+ * This function runs the main workflow. The main workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline.
+ * It validates the subscription name and extracts the connection ID.
+ * @param params
+ * @returns
+ */
export const runMainWorkflow = (
params: MainWorkflowParams,
): Effect.Effect =>
@@ -265,36 +271,82 @@ export const runZeroWorkflow = (
});
// Extract thread IDs from history
- const threadIds = new Set();
+ const threadsChanged = new Set();
+ const threadsAdded = new Set();
history.forEach((historyItem) => {
if (historyItem.messagesAdded) {
historyItem.messagesAdded.forEach((messageAdded) => {
if (messageAdded.message?.threadId) {
- threadIds.add(messageAdded.message.threadId);
+ threadsChanged.add(messageAdded.message.threadId);
+ threadsAdded.add(messageAdded.message.threadId);
}
});
}
if (historyItem.labelsAdded) {
historyItem.labelsAdded.forEach((labelAdded) => {
if (labelAdded.message?.threadId) {
- threadIds.add(labelAdded.message.threadId);
+ threadsChanged.add(labelAdded.message.threadId);
}
});
}
if (historyItem.labelsRemoved) {
historyItem.labelsRemoved.forEach((labelRemoved) => {
if (labelRemoved.message?.threadId) {
- threadIds.add(labelRemoved.message.threadId);
+ threadsChanged.add(labelRemoved.message.threadId);
}
});
}
});
- yield* Console.log('[ZERO_WORKFLOW] Found unique thread IDs:', Array.from(threadIds));
+ yield* Console.log(
+ '[ZERO_WORKFLOW] Found unique thread IDs:',
+ Array.from(threadsChanged),
+ Array.from(threadsAdded),
+ );
+
+ if (threadsAdded.size > 0) {
+ const threadWorkflowParams = Array.from(threadsAdded);
+
+ // Sync threads with proper error handling - use allSuccesses to collect successful syncs
+ const syncResults = yield* Effect.allSuccesses(
+ threadWorkflowParams.map((threadId) =>
+ Effect.tryPromise({
+ try: async () => {
+ const result = await agent.syncThread({ threadId });
+ console.log(`[ZERO_WORKFLOW] Successfully synced thread ${threadId}`);
+ return { threadId, result };
+ },
+ catch: (error) => {
+ console.error(`[ZERO_WORKFLOW] Failed to sync thread ${threadId}:`, error);
+ // Let this effect fail so allSuccesses will exclude it
+ throw new Error(
+ `Failed to sync thread ${threadId}: ${error instanceof Error ? error.message : String(error)}`,
+ );
+ },
+ }),
+ ),
+ { concurrency: 1 }, // Limit concurrency to avoid rate limits
+ );
+
+ const syncedCount = syncResults.length;
+ const failedCount = threadWorkflowParams.length - syncedCount;
+
+ if (failedCount > 0) {
+ yield* Console.log(
+ `[ZERO_WORKFLOW] Warning: ${failedCount}/${threadWorkflowParams.length} thread syncs failed. Successfully synced: ${syncedCount}`,
+ );
+ // Continue with processing - sync failures shouldn't stop the entire workflow
+ // The thread processing will continue with whatever data is available
+ } else {
+ yield* Console.log(`[ZERO_WORKFLOW] Successfully synced all ${syncedCount} threads`);
+ }
+
+ yield* Console.log('[ZERO_WORKFLOW] Synced threads:', syncResults);
+ }
// Process all threads concurrently using Effect.all
- if (threadIds.size > 0) {
- const threadWorkflowParams = Array.from(threadIds).map((threadId) => ({
+ if (threadsChanged.size > 0) {
+ const threadWorkflowParams = Array.from(threadsChanged).map((threadId) => ({
connectionId,
threadId,
providerId: foundConnection.providerId,
@@ -339,7 +391,7 @@ export const runZeroWorkflow = (
);
}),
),
- { concurrency: 1 }, // Process up to 5 threads concurrently
+ { concurrency: 1, discard: true }, // Process up to 5 threads concurrently
);
yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length);
@@ -407,6 +459,11 @@ type ThreadWorkflowError =
| { _tag: 'GmailApiError'; error: unknown }
| { _tag: 'VectorizationError'; error: unknown };
+/**
+ * Runs the main workflow for processing a thread. The workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline.
+ * @param params
+ * @returns
+ */
export const runThreadWorkflow = (
params: ThreadWorkflowParams,
): Effect.Effect =>
@@ -613,12 +670,9 @@ export const runThreadWorkflow = (
{ role: 'system', content: SummarizeMessagePrompt },
{ role: 'user', content: prompt },
];
- const response: any = await env.AI.run(
- '@cf/meta/llama-4-scout-17b-16e-instruct',
- {
- messages,
- },
- );
+ const response = await env.AI.run('@cf/meta/llama-4-scout-17b-16e-instruct', {
+ messages,
+ });
console.log(
`[THREAD_WORKFLOW] Summary generated for message ${message.id}:`,
response,
@@ -696,7 +750,7 @@ export const runThreadWorkflow = (
return null;
}
console.log('[THREAD_WORKFLOW] Found existing thread summary');
- return threadSummary[0].metadata as any;
+ return threadSummary[0].metadata as { summary: string; lastMsg: string };
},
catch: (error) => ({ _tag: 'VectorizationError' as const, error }),
});
diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts
index 5790deab17..00c30145e0 100644
--- a/apps/server/src/routes/agent/index.ts
+++ b/apps/server/src/routes/agent/index.ts
@@ -15,11 +15,11 @@
*/
import {
- type StreamTextOnFinishCallback,
- createDataStreamResponse,
- streamText,
appendResponseMessages,
+ createDataStreamResponse,
generateText,
+ streamText,
+ type StreamTextOnFinishCallback,
} from 'ai';
import {
IncomingMessageType,
@@ -30,10 +30,10 @@ import {
import {
EPrompts,
type IOutgoingMessage,
- type ParsedMessage,
type ISnoozeBatch,
+ type ParsedMessage,
} from '../../types';
-import type { MailManager, IGetThreadResponse, IGetThreadsResponse } from '../../lib/driver/types';
+import type { IGetThreadResponse, IGetThreadsResponse, MailManager } from '../../lib/driver/types';
import { DurableObjectOAuthClientProvider } from 'agents/mcp/do-oauth-client-provider';
import { AiChatPrompt, GmailSearchAssistantSystemPrompt } from '../../lib/prompts';
import { connectionToDriver, getZeroSocketAgent } from '../../lib/server-utils';
@@ -367,6 +367,16 @@ export class ZeroDriver extends AIChatAgent {
DROP TABLE IF EXISTS threads;`;
}
+ async deleteThread(id: string) {
+ void this.sql`
+ DELETE FROM threads WHERE thread_id = ${id};
+ `;
+ this.agent?.broadcastChatMessage({
+ type: OutgoingMessageType.Mail_List,
+ folder: 'bin',
+ });
+ }
+
async syncThread({ threadId }: { threadId: string }) {
if (this.name === 'general') return;
if (!this.driver) {
@@ -384,7 +394,7 @@ export class ZeroDriver extends AIChatAgent {
}
this.syncThreadsInProgress.set(threadId, true);
- console.log('Server: syncThread called for thread', threadId);
+ // console.log('Server: syncThread called for thread', threadId);
try {
const threadData = await this.getWithRetry(threadId);
const latest = threadData.latest;
@@ -432,10 +442,10 @@ export class ZeroDriver extends AIChatAgent {
threadId,
});
this.syncThreadsInProgress.delete(threadId);
- console.log('Server: syncThread result', {
- threadId,
- labels: threadData.labels,
- });
+ // console.log('Server: syncThread result', {
+ // threadId,
+ // labels: threadData.labels,
+ // });
return { success: true, threadId, threadData };
} else {
this.syncThreadsInProgress.delete(threadId);
@@ -656,6 +666,11 @@ export class ZeroDriver extends AIChatAgent {
};
}
+ normalizeFolderName(folderName: string) {
+ if (folderName === 'bin') return 'trash';
+ return folderName;
+ }
+
async getThreadsFromDB(params: {
labelIds?: string[];
folder?: string;
@@ -663,15 +678,14 @@ export class ZeroDriver extends AIChatAgent {
maxResults?: number;
pageToken?: string;
}): Promise {
- const { labelIds = [], folder, q, maxResults = 50, pageToken } = params;
+ const { labelIds = [], q, maxResults = 50, pageToken } = params;
+ let folder = params.folder ?? 'inbox';
try {
+ folder = this.normalizeFolderName(folder);
const folderThreadCount = (await this.count()).find((c) => c.label === folder)?.count;
const currentThreadCount = await this.getThreadCount();
- console.log('folderThreadCount', folderThreadCount, folder);
- console.log('currentThreadCount', currentThreadCount);
-
if (folderThreadCount && folderThreadCount > currentThreadCount && folder) {
this.ctx.waitUntil(this.syncThreads(folder));
}
@@ -852,8 +866,7 @@ export class ZeroDriver extends AIChatAgent {
`;
if (!result || result.length === 0) {
- const res = await this.queue('syncThread', { threadId: id });
- console.log('res', res);
+ await this.queue('syncThread', { threadId: id });
return {
messages: [],
latest: undefined,
diff --git a/apps/server/src/routes/agent/rpc.ts b/apps/server/src/routes/agent/rpc.ts
index c7f279ccd8..b36c0e1e12 100644
--- a/apps/server/src/routes/agent/rpc.ts
+++ b/apps/server/src/routes/agent/rpc.ts
@@ -161,7 +161,9 @@ export class DriverRpcDO extends RpcTarget {
}
async delete(id: string) {
- return await this.mainDo.delete(id);
+ const result = await this.mainDo.delete(id);
+ await this.mainDo.deleteThread(id);
+ return result;
}
async deleteAllSpam() {