diff --git a/apps/mail/components/mail/reply-composer.tsx b/apps/mail/components/mail/reply-composer.tsx index d230a2f832..8c9245fbbd 100644 --- a/apps/mail/components/mail/reply-composer.tsx +++ b/apps/mail/components/mail/reply-composer.tsx @@ -242,10 +242,10 @@ export default function ReplyCompose({ messageId }: ReplyComposeProps) { if (!mode || !emailData) return null; return ( -
+
{ setMode(null); diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 798fdb8ea1..55e1297133 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -711,51 +711,43 @@ export default class extends WorkerEntrypoint { switch (true) { case batch.queue.startsWith('subscribe-queue'): { console.log('batch', batch); - try { - await Promise.all( - batch.messages.map(async (msg: Message) => { - const connectionId = msg.body.connectionId; - const providerId = msg.body.providerId; - try { - await enableBrainFunction({ id: connectionId, providerId }); - } catch (error) { - console.error( - `Failed to enable brain function for connection ${connectionId}:`, - error, - ); - } - }), - ); - console.log('[SUBSCRIBE_QUEUE] batch done'); - } finally { - batch.ackAll(); - } + await Promise.all( + batch.messages.map(async (msg: Message) => { + const connectionId = msg.body.connectionId; + const providerId = msg.body.providerId; + try { + await enableBrainFunction({ id: connectionId, providerId }); + } catch (error) { + console.error( + `Failed to enable brain function for connection ${connectionId}:`, + error, + ); + } + }), + ); + console.log('[SUBSCRIBE_QUEUE] batch done'); return; } case batch.queue.startsWith('thread-queue'): { - try { - await Promise.all( - batch.messages.map(async (msg: Message) => { - const providerId = msg.body.providerId; - const historyId = msg.body.historyId; - const subscriptionName = msg.body.subscriptionName; - const workflow = runWorkflow(EWorkflowType.MAIN, { - providerId, - historyId, - subscriptionName, - }); - - try { - const result = await Effect.runPromise(workflow); - console.log('[THREAD_QUEUE] result', result); - } catch (error) { - console.error('Error running workflow', error); - } - }), - ); - } finally { - batch.ackAll(); - } + await Promise.all( + batch.messages.map(async (msg: Message) => { + const providerId = msg.body.providerId; + const historyId = msg.body.historyId; + const subscriptionName = msg.body.subscriptionName; + const workflow = runWorkflow(EWorkflowType.MAIN, { + providerId, + historyId, + subscriptionName, + }); + + try { + const result = await Effect.runPromise(workflow); + console.log('[THREAD_QUEUE] result', result); + } catch (error) { + console.error('Error running workflow', error); + } + }), + ); break; } } diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts index 669add8fe8..b32e75083c 100644 --- a/apps/server/src/pipelines.effect.ts +++ b/apps/server/src/pipelines.effect.ts @@ -23,9 +23,9 @@ import { analyzeEmailIntent, } from './thread-workflow-utils'; import { defaultLabels, EPrompts, EProviders, type ParsedMessage, type Sender } from './types'; +import { EWorkflowType, getPromptName, runWorkflow } from './pipelines'; import { getZeroAgent } from './lib/server-utils'; import { type gmail_v1 } from '@googleapis/gmail'; -import { getPromptName } from './pipelines'; import { env } from 'cloudflare:workers'; import { connection } from './db/schema'; import { Effect, Console } from 'effect'; @@ -140,7 +140,7 @@ export const runMainWorkflow = ( nextHistoryId: historyId, }; - const result = yield* runZeroWorkflow(zeroWorkflowParams).pipe( + const result = yield* runWorkflow(EWorkflowType.ZERO, zeroWorkflowParams).pipe( Effect.mapError( (error): MainWorkflowError => ({ _tag: 'WorkflowCreationFailed' as const, error }), ), @@ -185,16 +185,22 @@ export const runZeroWorkflow = ( const { connectionId, historyId, nextHistoryId } = params; const historyProcessingKey = `history_${connectionId}__${historyId}`; - const isProcessing = yield* Effect.tryPromise({ - try: () => env.gmail_processing_threads.get(historyProcessingKey), + + // Atomic lock acquisition to prevent race conditions + const lockAcquired = yield* Effect.tryPromise({ + try: async () => { + const response = await env.gmail_processing_threads.put(historyProcessingKey, 'true', { + expirationTtl: 3600, + }); + return response !== null; // null means key already existed + }, catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), }); - if (isProcessing === 'true') { + if (!lockAcquired) { yield* Console.log('[ZERO_WORKFLOW] History already being processed:', { connectionId, historyId, - processingStatus: isProcessing, }); return yield* Effect.fail({ _tag: 'HistoryAlreadyProcessing' as const, @@ -203,12 +209,10 @@ export const runZeroWorkflow = ( }); } - yield* Effect.tryPromise({ - try: () => - env.gmail_processing_threads.put(historyProcessingKey, 'true', { expirationTtl: 3600 }), - catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - }); - yield* Console.log('[ZERO_WORKFLOW] Set processing flag for history:', historyProcessingKey); + yield* Console.log( + '[ZERO_WORKFLOW] Acquired processing lock for history:', + historyProcessingKey, + ); const { db, conn } = createDb(env.HYPERDRIVE.connectionString); @@ -257,11 +261,6 @@ export const runZeroWorkflow = ( catch: (error) => ({ _tag: 'GmailApiError' as const, error }), }); - if (!history.length) { - yield* Console.log('[ZERO_WORKFLOW] No history found, skipping'); - return 'No history found'; - } - yield* Effect.tryPromise({ try: () => { console.log('[ZERO_WORKFLOW] Updating next history ID:', nextHistoryId); @@ -270,6 +269,11 @@ export const runZeroWorkflow = ( catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), }); + if (!history.length) { + yield* Console.log('[ZERO_WORKFLOW] No history found, skipping'); + return 'No history found'; + } + // Extract thread IDs from history const threadsChanged = new Set(); const threadsAdded = new Set(); @@ -277,7 +281,7 @@ export const runZeroWorkflow = ( if (historyItem.messagesAdded) { historyItem.messagesAdded.forEach((messageAdded) => { if (messageAdded.message?.threadId) { - threadsChanged.add(messageAdded.message.threadId); + // threadsChanged.add(messageAdded.message.threadId); threadsAdded.add(messageAdded.message.threadId); } }); @@ -285,14 +289,14 @@ export const runZeroWorkflow = ( if (historyItem.labelsAdded) { historyItem.labelsAdded.forEach((labelAdded) => { if (labelAdded.message?.threadId) { - threadsChanged.add(labelAdded.message.threadId); + // threadsChanged.add(labelAdded.message.threadId); } }); } if (historyItem.labelsRemoved) { historyItem.labelsRemoved.forEach((labelRemoved) => { if (labelRemoved.message?.threadId) { - threadsChanged.add(labelRemoved.message.threadId); + // threadsChanged.add(labelRemoved.message.threadId); } }); } @@ -355,6 +359,17 @@ export const runZeroWorkflow = ( const threadResults = yield* Effect.all( threadWorkflowParams.map((params) => Effect.gen(function* () { + // Check if thread is already processing + const isProcessing = yield* Effect.tryPromise({ + try: () => env.gmail_processing_threads.get(params.threadId.toString()), + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }); + + if (isProcessing === 'true') { + yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId); + return 'Thread already processing'; + } + // Set processing flag for thread yield* Effect.tryPromise({ try: () => { @@ -369,19 +384,8 @@ export const runZeroWorkflow = ( catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), }); - // Check if thread is already processing - const isProcessing = yield* Effect.tryPromise({ - try: () => env.gmail_processing_threads.get(params.threadId.toString()), - catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - }); - - if (isProcessing === 'true') { - yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId); - return 'Thread already processing'; - } - // Run the thread workflow - return yield* runThreadWorkflow(params).pipe( + return yield* runWorkflow(EWorkflowType.THREAD, params).pipe( Effect.mapError( (error): ZeroWorkflowError => ({ _tag: 'WorkflowCreationFailed' as const, @@ -394,19 +398,22 @@ export const runZeroWorkflow = ( { concurrency: 1, discard: true }, // Process up to 5 threads concurrently ); - yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length); + yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults); } else { yield* Console.log('[ZERO_WORKFLOW] No threads to process'); } - // // Clean up processing flag - // yield* Effect.tryPromise({ - // try: () => { - // console.log('[ZERO_WORKFLOW] Clearing processing flag for history:', historyProcessingKey); - // return env.gmail_processing_threads.delete(historyProcessingKey); - // }, - // catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - // }).pipe(Effect.orElse(() => Effect.succeed(null))); + // Clean up processing flag + yield* Effect.tryPromise({ + try: () => { + console.log( + '[ZERO_WORKFLOW] Clearing processing flag for history:', + historyProcessingKey, + ); + return env.gmail_processing_threads.delete(historyProcessingKey); + }, + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }).pipe(Effect.orElse(() => Effect.succeed(null))); yield* Console.log('[ZERO_WORKFLOW] Processing complete'); return 'Zero workflow completed successfully'; @@ -887,8 +894,18 @@ export const runThreadWorkflow = ( add: labelsToAdd, remove: labelsToRemove, }); - await agent.modifyLabels([threadId.toString()], labelsToAdd, labelsToRemove); - await agent.syncThread({ threadId: threadId.toString() }); + await agent.modifyThreadLabelsInDB( + threadId.toString(), + labelsToAdd, + labelsToRemove, + ); + await agent.modifyLabels( + [threadId.toString()], + labelsToAdd, + labelsToRemove, + true, + ); + // await agent.syncThread({ threadId: threadId.toString() }); console.log('[THREAD_WORKFLOW] Successfully modified thread labels'); } else { console.log('[THREAD_WORKFLOW] No label changes needed - labels already match'); diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index 00c30145e0..6ed51c800a 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -847,6 +847,116 @@ export class ZeroDriver extends AIChatAgent { } } + async modifyThreadLabelsByName( + threadId: string, + addLabelNames: string[], + removeLabelNames: string[], + ) { + try { + if (!this.driver) { + throw new Error('No driver available'); + } + + // Get all user labels to map names to IDs + const userLabels = await this.getUserLabels(); + const labelMap = new Map(userLabels.map((label) => [label.name.toLowerCase(), label.id])); + + // Convert label names to IDs + const addLabelIds: string[] = []; + const removeLabelIds: string[] = []; + + // Process add labels + for (const labelName of addLabelNames) { + const labelId = labelMap.get(labelName.toLowerCase()); + if (labelId) { + addLabelIds.push(labelId); + } else { + console.warn(`Label "${labelName}" not found in user labels`); + } + } + + // Process remove labels + for (const labelName of removeLabelNames) { + const labelId = labelMap.get(labelName.toLowerCase()); + if (labelId) { + removeLabelIds.push(labelId); + } else { + console.warn(`Label "${labelName}" not found in user labels`); + } + } + + // Call the existing function with IDs + return await this.modifyThreadLabelsInDB(threadId, addLabelIds, removeLabelIds); + } catch (error) { + console.error('Failed to modify thread labels by name:', error); + throw error; + } + } + + async modifyThreadLabelsInDB(threadId: string, addLabels: string[], removeLabels: string[]) { + try { + // Get current labels + const result = this.sql` + SELECT latest_label_ids + FROM threads + WHERE id = ${threadId} + LIMIT 1 + `; + + if (!result || result.length === 0) { + throw new Error(`Thread ${threadId} not found in database`); + } + + let currentLabels: string[]; + try { + currentLabels = JSON.parse(result[0].latest_label_ids || '[]') as string[]; + } catch (error) { + console.error(`Invalid JSON in latest_label_ids for thread ${threadId}:`, error); + currentLabels = []; + } + + // Apply label modifications + let updatedLabels = [...currentLabels]; + + // Remove labels + if (removeLabels.length > 0) { + updatedLabels = updatedLabels.filter((label) => !removeLabels.includes(label)); + } + + // Add labels (avoid duplicates) + if (addLabels.length > 0) { + for (const label of addLabels) { + if (!updatedLabels.includes(label)) { + updatedLabels.push(label); + } + } + } + + // Update the database + void this.sql` + UPDATE threads + SET latest_label_ids = ${JSON.stringify(updatedLabels)}, + updated_at = CURRENT_TIMESTAMP + WHERE id = ${threadId} + `; + + await this.agent?.broadcastChatMessage({ + type: OutgoingMessageType.Mail_Get, + threadId, + }); + + return { + success: true, + threadId, + previousLabels: currentLabels, + updatedLabels, + }; + } catch (error) { + console.error('Failed to modify thread labels in database:', error); + throw error; + } + } + async getThreadFromDB(id: string): Promise { try { const result = this.sql` diff --git a/apps/server/src/routes/agent/rpc.ts b/apps/server/src/routes/agent/rpc.ts index b36c0e1e12..aa77790a8c 100644 --- a/apps/server/src/routes/agent/rpc.ts +++ b/apps/server/src/routes/agent/rpc.ts @@ -100,9 +100,15 @@ export class DriverRpcDO extends RpcTarget { return result; } - async modifyLabels(threadIds: string[], addLabelIds: string[], removeLabelIds: string[]) { + async modifyLabels( + threadIds: string[], + addLabelIds: string[], + removeLabelIds: string[], + skipSync: boolean = false, + ) { const result = await this.mainDo.modifyLabels(threadIds, addLabelIds, removeLabelIds); - await Promise.all(threadIds.map((id) => this.mainDo.syncThread({ threadId: id }))); + if (!skipSync) + await Promise.all(threadIds.map((id) => this.mainDo.syncThread({ threadId: id }))); return result; } @@ -160,6 +166,22 @@ export class DriverRpcDO extends RpcTarget { return await this.mainDo.create(data); } + async modifyThreadLabelsByName( + threadId: string, + addLabelNames: string[], + removeLabelNames: string[], + ) { + return await this.mainDo.modifyThreadLabelsByName(threadId, addLabelNames, removeLabelNames); + } + + async modifyThreadLabelsInDB( + threadId: string, + addLabelNames: string[], + removeLabelNames: string[], + ) { + return await this.mainDo.modifyThreadLabelsInDB(threadId, addLabelNames, removeLabelNames); + } + async delete(id: string) { const result = await this.mainDo.delete(id); await this.mainDo.deleteThread(id); diff --git a/apps/server/wrangler.jsonc b/apps/server/wrangler.jsonc index ae0cff83f5..6ae420f957 100644 --- a/apps/server/wrangler.jsonc +++ b/apps/server/wrangler.jsonc @@ -114,7 +114,7 @@ "DROP_AGENT_TABLES": "false", "THREAD_SYNC_MAX_COUNT": "5", "THREAD_SYNC_LOOP": "false", - "DISABLE_WORKFLOWS": "true", + "DISABLE_WORKFLOWS": "false", "AUTORAG_ID": "", }, "kv_namespaces": [ diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 4fedeb9312..2d9ae21b38 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -5,8 +5,10 @@ let args = []; if (process.argv.slice(2).length === 0) { intro(`Welcome to the Nizzy CLI`); + const user = process.env.USER || process.env.USERNAME || 'there'; + const command = await select({ - message: `Hey ${process.env.USER}, what do you want to do?`, + message: `Hey ${user}, what do you want to do?`, options: Object.values(commands).map((command) => ({ label: command.description, value: command.id,