diff --git a/apps/mail/components/mail/reply-composer.tsx b/apps/mail/components/mail/reply-composer.tsx
index d230a2f832..8c9245fbbd 100644
--- a/apps/mail/components/mail/reply-composer.tsx
+++ b/apps/mail/components/mail/reply-composer.tsx
@@ -242,10 +242,10 @@ export default function ReplyCompose({ messageId }: ReplyComposeProps) {
if (!mode || !emailData) return null;
return (
-
+
{
setMode(null);
diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts
index 798fdb8ea1..55e1297133 100644
--- a/apps/server/src/main.ts
+++ b/apps/server/src/main.ts
@@ -711,51 +711,43 @@ export default class extends WorkerEntrypoint {
switch (true) {
case batch.queue.startsWith('subscribe-queue'): {
console.log('batch', batch);
- try {
- await Promise.all(
- batch.messages.map(async (msg: Message) => {
- const connectionId = msg.body.connectionId;
- const providerId = msg.body.providerId;
- try {
- await enableBrainFunction({ id: connectionId, providerId });
- } catch (error) {
- console.error(
- `Failed to enable brain function for connection ${connectionId}:`,
- error,
- );
- }
- }),
- );
- console.log('[SUBSCRIBE_QUEUE] batch done');
- } finally {
- batch.ackAll();
- }
+ await Promise.all(
+ batch.messages.map(async (msg: Message) => {
+ const connectionId = msg.body.connectionId;
+ const providerId = msg.body.providerId;
+ try {
+ await enableBrainFunction({ id: connectionId, providerId });
+ } catch (error) {
+ console.error(
+ `Failed to enable brain function for connection ${connectionId}:`,
+ error,
+ );
+ }
+ }),
+ );
+ console.log('[SUBSCRIBE_QUEUE] batch done');
return;
}
case batch.queue.startsWith('thread-queue'): {
- try {
- await Promise.all(
- batch.messages.map(async (msg: Message) => {
- const providerId = msg.body.providerId;
- const historyId = msg.body.historyId;
- const subscriptionName = msg.body.subscriptionName;
- const workflow = runWorkflow(EWorkflowType.MAIN, {
- providerId,
- historyId,
- subscriptionName,
- });
-
- try {
- const result = await Effect.runPromise(workflow);
- console.log('[THREAD_QUEUE] result', result);
- } catch (error) {
- console.error('Error running workflow', error);
- }
- }),
- );
- } finally {
- batch.ackAll();
- }
+ await Promise.all(
+ batch.messages.map(async (msg: Message) => {
+ const providerId = msg.body.providerId;
+ const historyId = msg.body.historyId;
+ const subscriptionName = msg.body.subscriptionName;
+ const workflow = runWorkflow(EWorkflowType.MAIN, {
+ providerId,
+ historyId,
+ subscriptionName,
+ });
+
+ try {
+ const result = await Effect.runPromise(workflow);
+ console.log('[THREAD_QUEUE] result', result);
+ } catch (error) {
+ console.error('Error running workflow', error);
+ }
+ }),
+ );
break;
}
}
diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts
index 669add8fe8..b32e75083c 100644
--- a/apps/server/src/pipelines.effect.ts
+++ b/apps/server/src/pipelines.effect.ts
@@ -23,9 +23,9 @@ import {
analyzeEmailIntent,
} from './thread-workflow-utils';
import { defaultLabels, EPrompts, EProviders, type ParsedMessage, type Sender } from './types';
+import { EWorkflowType, getPromptName, runWorkflow } from './pipelines';
import { getZeroAgent } from './lib/server-utils';
import { type gmail_v1 } from '@googleapis/gmail';
-import { getPromptName } from './pipelines';
import { env } from 'cloudflare:workers';
import { connection } from './db/schema';
import { Effect, Console } from 'effect';
@@ -140,7 +140,7 @@ export const runMainWorkflow = (
nextHistoryId: historyId,
};
- const result = yield* runZeroWorkflow(zeroWorkflowParams).pipe(
+ const result = yield* runWorkflow(EWorkflowType.ZERO, zeroWorkflowParams).pipe(
Effect.mapError(
(error): MainWorkflowError => ({ _tag: 'WorkflowCreationFailed' as const, error }),
),
@@ -185,16 +185,22 @@ export const runZeroWorkflow = (
const { connectionId, historyId, nextHistoryId } = params;
const historyProcessingKey = `history_${connectionId}__${historyId}`;
- const isProcessing = yield* Effect.tryPromise({
- try: () => env.gmail_processing_threads.get(historyProcessingKey),
+
+ // Atomic lock acquisition to prevent race conditions
+ const lockAcquired = yield* Effect.tryPromise({
+ try: async () => {
+ const response = await env.gmail_processing_threads.put(historyProcessingKey, 'true', {
+ expirationTtl: 3600,
+ });
+ return response !== null; // null means key already existed
+ },
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});
- if (isProcessing === 'true') {
+ if (!lockAcquired) {
yield* Console.log('[ZERO_WORKFLOW] History already being processed:', {
connectionId,
historyId,
- processingStatus: isProcessing,
});
return yield* Effect.fail({
_tag: 'HistoryAlreadyProcessing' as const,
@@ -203,12 +209,10 @@ export const runZeroWorkflow = (
});
}
- yield* Effect.tryPromise({
- try: () =>
- env.gmail_processing_threads.put(historyProcessingKey, 'true', { expirationTtl: 3600 }),
- catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
- });
- yield* Console.log('[ZERO_WORKFLOW] Set processing flag for history:', historyProcessingKey);
+ yield* Console.log(
+ '[ZERO_WORKFLOW] Acquired processing lock for history:',
+ historyProcessingKey,
+ );
const { db, conn } = createDb(env.HYPERDRIVE.connectionString);
@@ -257,11 +261,6 @@ export const runZeroWorkflow = (
catch: (error) => ({ _tag: 'GmailApiError' as const, error }),
});
- if (!history.length) {
- yield* Console.log('[ZERO_WORKFLOW] No history found, skipping');
- return 'No history found';
- }
-
yield* Effect.tryPromise({
try: () => {
console.log('[ZERO_WORKFLOW] Updating next history ID:', nextHistoryId);
@@ -270,6 +269,11 @@ export const runZeroWorkflow = (
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});
+ if (!history.length) {
+ yield* Console.log('[ZERO_WORKFLOW] No history found, skipping');
+ return 'No history found';
+ }
+
// Extract thread IDs from history
const threadsChanged = new Set();
const threadsAdded = new Set();
@@ -277,7 +281,7 @@ export const runZeroWorkflow = (
if (historyItem.messagesAdded) {
historyItem.messagesAdded.forEach((messageAdded) => {
if (messageAdded.message?.threadId) {
- threadsChanged.add(messageAdded.message.threadId);
+ // threadsChanged.add(messageAdded.message.threadId);
threadsAdded.add(messageAdded.message.threadId);
}
});
@@ -285,14 +289,14 @@ export const runZeroWorkflow = (
if (historyItem.labelsAdded) {
historyItem.labelsAdded.forEach((labelAdded) => {
if (labelAdded.message?.threadId) {
- threadsChanged.add(labelAdded.message.threadId);
+ // threadsChanged.add(labelAdded.message.threadId);
}
});
}
if (historyItem.labelsRemoved) {
historyItem.labelsRemoved.forEach((labelRemoved) => {
if (labelRemoved.message?.threadId) {
- threadsChanged.add(labelRemoved.message.threadId);
+ // threadsChanged.add(labelRemoved.message.threadId);
}
});
}
@@ -355,6 +359,17 @@ export const runZeroWorkflow = (
const threadResults = yield* Effect.all(
threadWorkflowParams.map((params) =>
Effect.gen(function* () {
+ // Check if thread is already processing
+ const isProcessing = yield* Effect.tryPromise({
+ try: () => env.gmail_processing_threads.get(params.threadId.toString()),
+ catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+ });
+
+ if (isProcessing === 'true') {
+ yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId);
+ return 'Thread already processing';
+ }
+
// Set processing flag for thread
yield* Effect.tryPromise({
try: () => {
@@ -369,19 +384,8 @@ export const runZeroWorkflow = (
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});
- // Check if thread is already processing
- const isProcessing = yield* Effect.tryPromise({
- try: () => env.gmail_processing_threads.get(params.threadId.toString()),
- catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
- });
-
- if (isProcessing === 'true') {
- yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId);
- return 'Thread already processing';
- }
-
// Run the thread workflow
- return yield* runThreadWorkflow(params).pipe(
+ return yield* runWorkflow(EWorkflowType.THREAD, params).pipe(
Effect.mapError(
(error): ZeroWorkflowError => ({
_tag: 'WorkflowCreationFailed' as const,
@@ -394,19 +398,22 @@ export const runZeroWorkflow = (
{ concurrency: 1, discard: true }, // Process up to 5 threads concurrently
);
- yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length);
+ yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults);
} else {
yield* Console.log('[ZERO_WORKFLOW] No threads to process');
}
- // // Clean up processing flag
- // yield* Effect.tryPromise({
- // try: () => {
- // console.log('[ZERO_WORKFLOW] Clearing processing flag for history:', historyProcessingKey);
- // return env.gmail_processing_threads.delete(historyProcessingKey);
- // },
- // catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
- // }).pipe(Effect.orElse(() => Effect.succeed(null)));
+ // Clean up processing flag
+ yield* Effect.tryPromise({
+ try: () => {
+ console.log(
+ '[ZERO_WORKFLOW] Clearing processing flag for history:',
+ historyProcessingKey,
+ );
+ return env.gmail_processing_threads.delete(historyProcessingKey);
+ },
+ catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+ }).pipe(Effect.orElse(() => Effect.succeed(null)));
yield* Console.log('[ZERO_WORKFLOW] Processing complete');
return 'Zero workflow completed successfully';
@@ -887,8 +894,18 @@ export const runThreadWorkflow = (
add: labelsToAdd,
remove: labelsToRemove,
});
- await agent.modifyLabels([threadId.toString()], labelsToAdd, labelsToRemove);
- await agent.syncThread({ threadId: threadId.toString() });
+ await agent.modifyThreadLabelsInDB(
+ threadId.toString(),
+ labelsToAdd,
+ labelsToRemove,
+ );
+ await agent.modifyLabels(
+ [threadId.toString()],
+ labelsToAdd,
+ labelsToRemove,
+ true,
+ );
+ // await agent.syncThread({ threadId: threadId.toString() });
console.log('[THREAD_WORKFLOW] Successfully modified thread labels');
} else {
console.log('[THREAD_WORKFLOW] No label changes needed - labels already match');
diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts
index 00c30145e0..6ed51c800a 100644
--- a/apps/server/src/routes/agent/index.ts
+++ b/apps/server/src/routes/agent/index.ts
@@ -847,6 +847,116 @@ export class ZeroDriver extends AIChatAgent {
}
}
+ async modifyThreadLabelsByName(
+ threadId: string,
+ addLabelNames: string[],
+ removeLabelNames: string[],
+ ) {
+ try {
+ if (!this.driver) {
+ throw new Error('No driver available');
+ }
+
+ // Get all user labels to map names to IDs
+ const userLabels = await this.getUserLabels();
+ const labelMap = new Map(userLabels.map((label) => [label.name.toLowerCase(), label.id]));
+
+ // Convert label names to IDs
+ const addLabelIds: string[] = [];
+ const removeLabelIds: string[] = [];
+
+ // Process add labels
+ for (const labelName of addLabelNames) {
+ const labelId = labelMap.get(labelName.toLowerCase());
+ if (labelId) {
+ addLabelIds.push(labelId);
+ } else {
+ console.warn(`Label "${labelName}" not found in user labels`);
+ }
+ }
+
+ // Process remove labels
+ for (const labelName of removeLabelNames) {
+ const labelId = labelMap.get(labelName.toLowerCase());
+ if (labelId) {
+ removeLabelIds.push(labelId);
+ } else {
+ console.warn(`Label "${labelName}" not found in user labels`);
+ }
+ }
+
+ // Call the existing function with IDs
+ return await this.modifyThreadLabelsInDB(threadId, addLabelIds, removeLabelIds);
+ } catch (error) {
+ console.error('Failed to modify thread labels by name:', error);
+ throw error;
+ }
+ }
+
+ async modifyThreadLabelsInDB(threadId: string, addLabels: string[], removeLabels: string[]) {
+ try {
+ // Get current labels
+ const result = this.sql`
+ SELECT latest_label_ids
+ FROM threads
+ WHERE id = ${threadId}
+ LIMIT 1
+ `;
+
+ if (!result || result.length === 0) {
+ throw new Error(`Thread ${threadId} not found in database`);
+ }
+
+ let currentLabels: string[];
+ try {
+ currentLabels = JSON.parse(result[0].latest_label_ids || '[]') as string[];
+ } catch (error) {
+ console.error(`Invalid JSON in latest_label_ids for thread ${threadId}:`, error);
+ currentLabels = [];
+ }
+
+ // Apply label modifications
+ let updatedLabels = [...currentLabels];
+
+ // Remove labels
+ if (removeLabels.length > 0) {
+ updatedLabels = updatedLabels.filter((label) => !removeLabels.includes(label));
+ }
+
+ // Add labels (avoid duplicates)
+ if (addLabels.length > 0) {
+ for (const label of addLabels) {
+ if (!updatedLabels.includes(label)) {
+ updatedLabels.push(label);
+ }
+ }
+ }
+
+ // Update the database
+ void this.sql`
+ UPDATE threads
+ SET latest_label_ids = ${JSON.stringify(updatedLabels)},
+ updated_at = CURRENT_TIMESTAMP
+ WHERE id = ${threadId}
+ `;
+
+ await this.agent?.broadcastChatMessage({
+ type: OutgoingMessageType.Mail_Get,
+ threadId,
+ });
+
+ return {
+ success: true,
+ threadId,
+ previousLabels: currentLabels,
+ updatedLabels,
+ };
+ } catch (error) {
+ console.error('Failed to modify thread labels in database:', error);
+ throw error;
+ }
+ }
+
async getThreadFromDB(id: string): Promise {
try {
const result = this.sql`
diff --git a/apps/server/src/routes/agent/rpc.ts b/apps/server/src/routes/agent/rpc.ts
index b36c0e1e12..aa77790a8c 100644
--- a/apps/server/src/routes/agent/rpc.ts
+++ b/apps/server/src/routes/agent/rpc.ts
@@ -100,9 +100,15 @@ export class DriverRpcDO extends RpcTarget {
return result;
}
- async modifyLabels(threadIds: string[], addLabelIds: string[], removeLabelIds: string[]) {
+ async modifyLabels(
+ threadIds: string[],
+ addLabelIds: string[],
+ removeLabelIds: string[],
+ skipSync: boolean = false,
+ ) {
const result = await this.mainDo.modifyLabels(threadIds, addLabelIds, removeLabelIds);
- await Promise.all(threadIds.map((id) => this.mainDo.syncThread({ threadId: id })));
+ if (!skipSync)
+ await Promise.all(threadIds.map((id) => this.mainDo.syncThread({ threadId: id })));
return result;
}
@@ -160,6 +166,22 @@ export class DriverRpcDO extends RpcTarget {
return await this.mainDo.create(data);
}
+ async modifyThreadLabelsByName(
+ threadId: string,
+ addLabelNames: string[],
+ removeLabelNames: string[],
+ ) {
+ return await this.mainDo.modifyThreadLabelsByName(threadId, addLabelNames, removeLabelNames);
+ }
+
+ async modifyThreadLabelsInDB(
+ threadId: string,
+ addLabelNames: string[],
+ removeLabelNames: string[],
+ ) {
+ return await this.mainDo.modifyThreadLabelsInDB(threadId, addLabelNames, removeLabelNames);
+ }
+
async delete(id: string) {
const result = await this.mainDo.delete(id);
await this.mainDo.deleteThread(id);
diff --git a/apps/server/wrangler.jsonc b/apps/server/wrangler.jsonc
index ae0cff83f5..6ae420f957 100644
--- a/apps/server/wrangler.jsonc
+++ b/apps/server/wrangler.jsonc
@@ -114,7 +114,7 @@
"DROP_AGENT_TABLES": "false",
"THREAD_SYNC_MAX_COUNT": "5",
"THREAD_SYNC_LOOP": "false",
- "DISABLE_WORKFLOWS": "true",
+ "DISABLE_WORKFLOWS": "false",
"AUTORAG_ID": "",
},
"kv_namespaces": [
diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts
index 4fedeb9312..2d9ae21b38 100644
--- a/packages/cli/src/cli.ts
+++ b/packages/cli/src/cli.ts
@@ -5,8 +5,10 @@ let args = [];
if (process.argv.slice(2).length === 0) {
intro(`Welcome to the Nizzy CLI`);
+ const user = process.env.USER || process.env.USERNAME || 'there';
+
const command = await select({
- message: `Hey ${process.env.USER}, what do you want to do?`,
+ message: `Hey ${user}, what do you want to do?`,
options: Object.values(commands).map((command) => ({
label: command.description,
value: command.id,