From 59780ef53eaf5a195439709e228b396e66394719 Mon Sep 17 00:00:00 2001 From: Mustafa Zahid <91423500+musti17@users.noreply.github.com> Date: Wed, 23 Jul 2025 03:01:26 +0500 Subject: [PATCH 1/3] fix: undefined name on specific platforms fixed (#1796) Co-authored-by: cubic-dev-ai[bot] <191113872+cubic-dev-ai[bot]@users.noreply.github.com> --- packages/cli/src/cli.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 4fedeb9312..2d9ae21b38 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -5,8 +5,10 @@ let args = []; if (process.argv.slice(2).length === 0) { intro(`Welcome to the Nizzy CLI`); + const user = process.env.USER || process.env.USERNAME || 'there'; + const command = await select({ - message: `Hey ${process.env.USER}, what do you want to do?`, + message: `Hey ${user}, what do you want to do?`, options: Object.values(commands).map((command) => ({ label: command.description, value: command.id, From f1ba92bb1af6192320ec04879f9334a2775009ff Mon Sep 17 00:00:00 2001 From: Nizzy Date: Wed, 23 Jul 2025 11:29:56 -0400 Subject: [PATCH 2/3] overflow reply composer --- apps/mail/components/mail/reply-composer.tsx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/mail/components/mail/reply-composer.tsx b/apps/mail/components/mail/reply-composer.tsx index d230a2f832..8c9245fbbd 100644 --- a/apps/mail/components/mail/reply-composer.tsx +++ b/apps/mail/components/mail/reply-composer.tsx @@ -242,10 +242,10 @@ export default function ReplyCompose({ messageId }: ReplyComposeProps) { if (!mode || !emailData) return null; return ( -
+
{ setMode(null); From f66ba03cc08bb96f3827309425c0c3ed20017641 Mon Sep 17 00:00:00 2001 From: Adam <13007539+MrgSub@users.noreply.github.com> Date: Wed, 23 Jul 2025 08:34:33 -0700 Subject: [PATCH 3/3] Improve thread processing workflow with better locking mechanism (#1803) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description Improved workflow processing reliability and efficiency in the email processing pipeline. Key changes include: 1. Refactored batch processing to properly handle message acknowledgment 2. Implemented atomic locking mechanism to prevent race conditions in history processing 3. Optimized thread label modification with a new DB-first approach 4. Fixed processing order in the Zero workflow to ensure history ID is updated before processing 5. Added thread-level processing checks to avoid duplicate processing 6. Enabled workflows in the development environment ## Type of Change - [x] 🐛 Bug fix (non-breaking change which fixes an issue) - [x] ⚡ Performance improvement ## Areas Affected - [x] Email Integration (Gmail, IMAP, etc.) - [x] Data Storage/Management ## Testing Done - [x] Manual testing performed ## Security Considerations - [x] No sensitive data is exposed ## Checklist - [x] I have performed a self-review of my code - [x] My changes generate no new warnings ## Additional Notes The changes significantly improve the reliability of email processing by preventing race conditions and duplicate processing. The new atomic locking mechanism ensures that each history and thread is processed exactly once, while the DB-first approach to label modification reduces API calls to Gmail. ## Summary by CodeRabbit * **New Features** * Added the ability to modify thread labels by label names or directly in the database, providing more flexible label management options. * **Improvements** * Enhanced label modification methods to allow skipping synchronization for faster updates when needed. * Improved workflow processing reliability through atomic locking to prevent duplicate processing. * Unified workflow execution for better consistency and maintainability. * **Configuration** * Workflows are now enabled by default in the local environment. --- apps/server/src/main.ts | 76 ++++++++---------- apps/server/src/pipelines.effect.ts | 103 ++++++++++++++---------- apps/server/src/routes/agent/index.ts | 110 ++++++++++++++++++++++++++ apps/server/src/routes/agent/rpc.ts | 26 +++++- apps/server/wrangler.jsonc | 2 +- 5 files changed, 229 insertions(+), 88 deletions(-) diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 798fdb8ea1..55e1297133 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -711,51 +711,43 @@ export default class extends WorkerEntrypoint { switch (true) { case batch.queue.startsWith('subscribe-queue'): { console.log('batch', batch); - try { - await Promise.all( - batch.messages.map(async (msg: Message) => { - const connectionId = msg.body.connectionId; - const providerId = msg.body.providerId; - try { - await enableBrainFunction({ id: connectionId, providerId }); - } catch (error) { - console.error( - `Failed to enable brain function for connection ${connectionId}:`, - error, - ); - } - }), - ); - console.log('[SUBSCRIBE_QUEUE] batch done'); - } finally { - batch.ackAll(); - } + await Promise.all( + batch.messages.map(async (msg: Message) => { + const connectionId = msg.body.connectionId; + const providerId = msg.body.providerId; + try { + await enableBrainFunction({ id: connectionId, providerId }); + } catch (error) { + console.error( + `Failed to enable brain function for connection ${connectionId}:`, + error, + ); + } + }), + ); + console.log('[SUBSCRIBE_QUEUE] batch done'); return; } case batch.queue.startsWith('thread-queue'): { - try { - await Promise.all( - batch.messages.map(async (msg: Message) => { - const providerId = msg.body.providerId; - const historyId = msg.body.historyId; - const subscriptionName = msg.body.subscriptionName; - const workflow = runWorkflow(EWorkflowType.MAIN, { - providerId, - historyId, - subscriptionName, - }); - - try { - const result = await Effect.runPromise(workflow); - console.log('[THREAD_QUEUE] result', result); - } catch (error) { - console.error('Error running workflow', error); - } - }), - ); - } finally { - batch.ackAll(); - } + await Promise.all( + batch.messages.map(async (msg: Message) => { + const providerId = msg.body.providerId; + const historyId = msg.body.historyId; + const subscriptionName = msg.body.subscriptionName; + const workflow = runWorkflow(EWorkflowType.MAIN, { + providerId, + historyId, + subscriptionName, + }); + + try { + const result = await Effect.runPromise(workflow); + console.log('[THREAD_QUEUE] result', result); + } catch (error) { + console.error('Error running workflow', error); + } + }), + ); break; } } diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts index 669add8fe8..b32e75083c 100644 --- a/apps/server/src/pipelines.effect.ts +++ b/apps/server/src/pipelines.effect.ts @@ -23,9 +23,9 @@ import { analyzeEmailIntent, } from './thread-workflow-utils'; import { defaultLabels, EPrompts, EProviders, type ParsedMessage, type Sender } from './types'; +import { EWorkflowType, getPromptName, runWorkflow } from './pipelines'; import { getZeroAgent } from './lib/server-utils'; import { type gmail_v1 } from '@googleapis/gmail'; -import { getPromptName } from './pipelines'; import { env } from 'cloudflare:workers'; import { connection } from './db/schema'; import { Effect, Console } from 'effect'; @@ -140,7 +140,7 @@ export const runMainWorkflow = ( nextHistoryId: historyId, }; - const result = yield* runZeroWorkflow(zeroWorkflowParams).pipe( + const result = yield* runWorkflow(EWorkflowType.ZERO, zeroWorkflowParams).pipe( Effect.mapError( (error): MainWorkflowError => ({ _tag: 'WorkflowCreationFailed' as const, error }), ), @@ -185,16 +185,22 @@ export const runZeroWorkflow = ( const { connectionId, historyId, nextHistoryId } = params; const historyProcessingKey = `history_${connectionId}__${historyId}`; - const isProcessing = yield* Effect.tryPromise({ - try: () => env.gmail_processing_threads.get(historyProcessingKey), + + // Atomic lock acquisition to prevent race conditions + const lockAcquired = yield* Effect.tryPromise({ + try: async () => { + const response = await env.gmail_processing_threads.put(historyProcessingKey, 'true', { + expirationTtl: 3600, + }); + return response !== null; // null means key already existed + }, catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), }); - if (isProcessing === 'true') { + if (!lockAcquired) { yield* Console.log('[ZERO_WORKFLOW] History already being processed:', { connectionId, historyId, - processingStatus: isProcessing, }); return yield* Effect.fail({ _tag: 'HistoryAlreadyProcessing' as const, @@ -203,12 +209,10 @@ export const runZeroWorkflow = ( }); } - yield* Effect.tryPromise({ - try: () => - env.gmail_processing_threads.put(historyProcessingKey, 'true', { expirationTtl: 3600 }), - catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - }); - yield* Console.log('[ZERO_WORKFLOW] Set processing flag for history:', historyProcessingKey); + yield* Console.log( + '[ZERO_WORKFLOW] Acquired processing lock for history:', + historyProcessingKey, + ); const { db, conn } = createDb(env.HYPERDRIVE.connectionString); @@ -257,11 +261,6 @@ export const runZeroWorkflow = ( catch: (error) => ({ _tag: 'GmailApiError' as const, error }), }); - if (!history.length) { - yield* Console.log('[ZERO_WORKFLOW] No history found, skipping'); - return 'No history found'; - } - yield* Effect.tryPromise({ try: () => { console.log('[ZERO_WORKFLOW] Updating next history ID:', nextHistoryId); @@ -270,6 +269,11 @@ export const runZeroWorkflow = ( catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), }); + if (!history.length) { + yield* Console.log('[ZERO_WORKFLOW] No history found, skipping'); + return 'No history found'; + } + // Extract thread IDs from history const threadsChanged = new Set(); const threadsAdded = new Set(); @@ -277,7 +281,7 @@ export const runZeroWorkflow = ( if (historyItem.messagesAdded) { historyItem.messagesAdded.forEach((messageAdded) => { if (messageAdded.message?.threadId) { - threadsChanged.add(messageAdded.message.threadId); + // threadsChanged.add(messageAdded.message.threadId); threadsAdded.add(messageAdded.message.threadId); } }); @@ -285,14 +289,14 @@ export const runZeroWorkflow = ( if (historyItem.labelsAdded) { historyItem.labelsAdded.forEach((labelAdded) => { if (labelAdded.message?.threadId) { - threadsChanged.add(labelAdded.message.threadId); + // threadsChanged.add(labelAdded.message.threadId); } }); } if (historyItem.labelsRemoved) { historyItem.labelsRemoved.forEach((labelRemoved) => { if (labelRemoved.message?.threadId) { - threadsChanged.add(labelRemoved.message.threadId); + // threadsChanged.add(labelRemoved.message.threadId); } }); } @@ -355,6 +359,17 @@ export const runZeroWorkflow = ( const threadResults = yield* Effect.all( threadWorkflowParams.map((params) => Effect.gen(function* () { + // Check if thread is already processing + const isProcessing = yield* Effect.tryPromise({ + try: () => env.gmail_processing_threads.get(params.threadId.toString()), + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }); + + if (isProcessing === 'true') { + yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId); + return 'Thread already processing'; + } + // Set processing flag for thread yield* Effect.tryPromise({ try: () => { @@ -369,19 +384,8 @@ export const runZeroWorkflow = ( catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), }); - // Check if thread is already processing - const isProcessing = yield* Effect.tryPromise({ - try: () => env.gmail_processing_threads.get(params.threadId.toString()), - catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - }); - - if (isProcessing === 'true') { - yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId); - return 'Thread already processing'; - } - // Run the thread workflow - return yield* runThreadWorkflow(params).pipe( + return yield* runWorkflow(EWorkflowType.THREAD, params).pipe( Effect.mapError( (error): ZeroWorkflowError => ({ _tag: 'WorkflowCreationFailed' as const, @@ -394,19 +398,22 @@ export const runZeroWorkflow = ( { concurrency: 1, discard: true }, // Process up to 5 threads concurrently ); - yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length); + yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults); } else { yield* Console.log('[ZERO_WORKFLOW] No threads to process'); } - // // Clean up processing flag - // yield* Effect.tryPromise({ - // try: () => { - // console.log('[ZERO_WORKFLOW] Clearing processing flag for history:', historyProcessingKey); - // return env.gmail_processing_threads.delete(historyProcessingKey); - // }, - // catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - // }).pipe(Effect.orElse(() => Effect.succeed(null))); + // Clean up processing flag + yield* Effect.tryPromise({ + try: () => { + console.log( + '[ZERO_WORKFLOW] Clearing processing flag for history:', + historyProcessingKey, + ); + return env.gmail_processing_threads.delete(historyProcessingKey); + }, + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }).pipe(Effect.orElse(() => Effect.succeed(null))); yield* Console.log('[ZERO_WORKFLOW] Processing complete'); return 'Zero workflow completed successfully'; @@ -887,8 +894,18 @@ export const runThreadWorkflow = ( add: labelsToAdd, remove: labelsToRemove, }); - await agent.modifyLabels([threadId.toString()], labelsToAdd, labelsToRemove); - await agent.syncThread({ threadId: threadId.toString() }); + await agent.modifyThreadLabelsInDB( + threadId.toString(), + labelsToAdd, + labelsToRemove, + ); + await agent.modifyLabels( + [threadId.toString()], + labelsToAdd, + labelsToRemove, + true, + ); + // await agent.syncThread({ threadId: threadId.toString() }); console.log('[THREAD_WORKFLOW] Successfully modified thread labels'); } else { console.log('[THREAD_WORKFLOW] No label changes needed - labels already match'); diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index 00c30145e0..6ed51c800a 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -847,6 +847,116 @@ export class ZeroDriver extends AIChatAgent { } } + async modifyThreadLabelsByName( + threadId: string, + addLabelNames: string[], + removeLabelNames: string[], + ) { + try { + if (!this.driver) { + throw new Error('No driver available'); + } + + // Get all user labels to map names to IDs + const userLabels = await this.getUserLabels(); + const labelMap = new Map(userLabels.map((label) => [label.name.toLowerCase(), label.id])); + + // Convert label names to IDs + const addLabelIds: string[] = []; + const removeLabelIds: string[] = []; + + // Process add labels + for (const labelName of addLabelNames) { + const labelId = labelMap.get(labelName.toLowerCase()); + if (labelId) { + addLabelIds.push(labelId); + } else { + console.warn(`Label "${labelName}" not found in user labels`); + } + } + + // Process remove labels + for (const labelName of removeLabelNames) { + const labelId = labelMap.get(labelName.toLowerCase()); + if (labelId) { + removeLabelIds.push(labelId); + } else { + console.warn(`Label "${labelName}" not found in user labels`); + } + } + + // Call the existing function with IDs + return await this.modifyThreadLabelsInDB(threadId, addLabelIds, removeLabelIds); + } catch (error) { + console.error('Failed to modify thread labels by name:', error); + throw error; + } + } + + async modifyThreadLabelsInDB(threadId: string, addLabels: string[], removeLabels: string[]) { + try { + // Get current labels + const result = this.sql` + SELECT latest_label_ids + FROM threads + WHERE id = ${threadId} + LIMIT 1 + `; + + if (!result || result.length === 0) { + throw new Error(`Thread ${threadId} not found in database`); + } + + let currentLabels: string[]; + try { + currentLabels = JSON.parse(result[0].latest_label_ids || '[]') as string[]; + } catch (error) { + console.error(`Invalid JSON in latest_label_ids for thread ${threadId}:`, error); + currentLabels = []; + } + + // Apply label modifications + let updatedLabels = [...currentLabels]; + + // Remove labels + if (removeLabels.length > 0) { + updatedLabels = updatedLabels.filter((label) => !removeLabels.includes(label)); + } + + // Add labels (avoid duplicates) + if (addLabels.length > 0) { + for (const label of addLabels) { + if (!updatedLabels.includes(label)) { + updatedLabels.push(label); + } + } + } + + // Update the database + void this.sql` + UPDATE threads + SET latest_label_ids = ${JSON.stringify(updatedLabels)}, + updated_at = CURRENT_TIMESTAMP + WHERE id = ${threadId} + `; + + await this.agent?.broadcastChatMessage({ + type: OutgoingMessageType.Mail_Get, + threadId, + }); + + return { + success: true, + threadId, + previousLabels: currentLabels, + updatedLabels, + }; + } catch (error) { + console.error('Failed to modify thread labels in database:', error); + throw error; + } + } + async getThreadFromDB(id: string): Promise { try { const result = this.sql` diff --git a/apps/server/src/routes/agent/rpc.ts b/apps/server/src/routes/agent/rpc.ts index b36c0e1e12..aa77790a8c 100644 --- a/apps/server/src/routes/agent/rpc.ts +++ b/apps/server/src/routes/agent/rpc.ts @@ -100,9 +100,15 @@ export class DriverRpcDO extends RpcTarget { return result; } - async modifyLabels(threadIds: string[], addLabelIds: string[], removeLabelIds: string[]) { + async modifyLabels( + threadIds: string[], + addLabelIds: string[], + removeLabelIds: string[], + skipSync: boolean = false, + ) { const result = await this.mainDo.modifyLabels(threadIds, addLabelIds, removeLabelIds); - await Promise.all(threadIds.map((id) => this.mainDo.syncThread({ threadId: id }))); + if (!skipSync) + await Promise.all(threadIds.map((id) => this.mainDo.syncThread({ threadId: id }))); return result; } @@ -160,6 +166,22 @@ export class DriverRpcDO extends RpcTarget { return await this.mainDo.create(data); } + async modifyThreadLabelsByName( + threadId: string, + addLabelNames: string[], + removeLabelNames: string[], + ) { + return await this.mainDo.modifyThreadLabelsByName(threadId, addLabelNames, removeLabelNames); + } + + async modifyThreadLabelsInDB( + threadId: string, + addLabelNames: string[], + removeLabelNames: string[], + ) { + return await this.mainDo.modifyThreadLabelsInDB(threadId, addLabelNames, removeLabelNames); + } + async delete(id: string) { const result = await this.mainDo.delete(id); await this.mainDo.deleteThread(id); diff --git a/apps/server/wrangler.jsonc b/apps/server/wrangler.jsonc index ae0cff83f5..6ae420f957 100644 --- a/apps/server/wrangler.jsonc +++ b/apps/server/wrangler.jsonc @@ -114,7 +114,7 @@ "DROP_AGENT_TABLES": "false", "THREAD_SYNC_MAX_COUNT": "5", "THREAD_SYNC_LOOP": "false", - "DISABLE_WORKFLOWS": "true", + "DISABLE_WORKFLOWS": "false", "AUTORAG_ID": "", }, "kv_namespaces": [