diff --git a/apps/mail/components/create/email-composer.tsx b/apps/mail/components/create/email-composer.tsx index 75b37c99c8..54b1d98ff6 100644 --- a/apps/mail/components/create/email-composer.tsx +++ b/apps/mail/components/create/email-composer.tsx @@ -709,7 +709,7 @@ export function EmailComposer({ className, )} > -
+
{/* To, Cc, Bcc */}
diff --git a/apps/mail/components/mail/reply-composer.tsx b/apps/mail/components/mail/reply-composer.tsx index 8c9245fbbd..ea35fb4427 100644 --- a/apps/mail/components/mail/reply-composer.tsx +++ b/apps/mail/components/mail/reply-composer.tsx @@ -242,10 +242,10 @@ export default function ReplyCompose({ messageId }: ReplyComposeProps) { if (!mode || !emailData) return null; return ( -
+
{ setMode(null); diff --git a/apps/mail/components/party.tsx b/apps/mail/components/party.tsx index bd8e0c92a8..a297cf8b5f 100644 --- a/apps/mail/components/party.tsx +++ b/apps/mail/components/party.tsx @@ -45,7 +45,6 @@ export const NotificationProvider = () => { queryClient.invalidateQueries({ queryKey: trpc.mail.get.queryKey({ id: threadId }), }); - console.log('invalidated mail get', threadId); } else if (type === IncomingMessageType.Mail_List) { const { folder } = JSON.parse(message.data); queryClient.invalidateQueries({ diff --git a/apps/server/src/lib/sequential-thinking.ts b/apps/server/src/lib/sequential-thinking.ts new file mode 100644 index 0000000000..0c68fe91c2 --- /dev/null +++ b/apps/server/src/lib/sequential-thinking.ts @@ -0,0 +1,286 @@ +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import type { env } from 'cloudflare:workers'; +import { McpAgent } from 'agents/mcp'; + +interface ThoughtData { + thought: string; + thoughtNumber: number; + totalThoughts: number; + isRevision?: boolean; + revisesThought?: number; + branchFromThought?: number; + branchId?: string; + needsMoreThoughts?: boolean; + nextThoughtNeeded: boolean; +} + +interface SequentialThinkingParams { + thought: string; + nextThoughtNeeded: boolean; + thoughtNumber: number; + totalThoughts: number; + isRevision?: boolean; + revisesThought?: number; + branchFromThought?: number; + branchId?: string; + needsMoreThoughts?: boolean; +} + +export class SequentialThinkingProcessor { + private thoughtHistory: ThoughtData[] = []; + private branches: Record = {}; + private disableThoughtLogging: boolean; + + constructor() { + this.disableThoughtLogging = false; // Enable logging by default in Zero + } + + private validateThoughtData(input: SequentialThinkingParams): ThoughtData { + if (!input.thought || typeof input.thought !== 'string') { + throw new Error('Invalid thought: must be a string'); + } + if (!input.thoughtNumber || typeof input.thoughtNumber !== 'number') { + throw new Error('Invalid thoughtNumber: must be a number'); + } + if (!input.totalThoughts || typeof input.totalThoughts !== 'number') { + throw new Error('Invalid totalThoughts: must be a number'); + } + if (typeof input.nextThoughtNeeded !== 'boolean') { + throw new Error('Invalid nextThoughtNeeded: must be a boolean'); + } + + return { + thought: input.thought, + thoughtNumber: input.thoughtNumber, + totalThoughts: input.totalThoughts, + nextThoughtNeeded: input.nextThoughtNeeded, + isRevision: input.isRevision, + revisesThought: input.revisesThought, + branchFromThought: input.branchFromThought, + branchId: input.branchId, + needsMoreThoughts: input.needsMoreThoughts, + }; + } + + private formatThought(thoughtData: ThoughtData): string { + const { + thoughtNumber, + totalThoughts, + thought, + isRevision, + revisesThought, + branchFromThought, + branchId, + } = thoughtData; + + let prefix = ''; + let context = ''; + + if (isRevision) { + prefix = '🔄 Revision'; + context = ` (revising thought ${revisesThought})`; + } else if (branchFromThought) { + prefix = '🌿 Branch'; + context = ` (from thought ${branchFromThought}, ID: ${branchId})`; + } else { + prefix = '💭 Thought'; + context = ''; + } + + const header = `${prefix} ${thoughtNumber}/${totalThoughts}${context}`; + const border = '─'.repeat(Math.max(header.length, thought.length) + 4); + + return ` +┌${border}┐ +│ ${header} │ +├${border}┤ +│ ${thought.padEnd(border.length - 2)} │ +└${border}┘`; + } + + public processThought(input: SequentialThinkingParams) { + try { + const validatedInput = this.validateThoughtData(input); + + if (validatedInput.thoughtNumber > validatedInput.totalThoughts) { + validatedInput.totalThoughts = validatedInput.thoughtNumber; + } + + this.thoughtHistory.push(validatedInput); + + if (validatedInput.branchFromThought && validatedInput.branchId) { + if (!this.branches[validatedInput.branchId]) { + this.branches[validatedInput.branchId] = []; + } + this.branches[validatedInput.branchId].push(validatedInput); + } + + if (!this.disableThoughtLogging) { + const formattedThought = this.formatThought(validatedInput); + console.log(formattedThought); // Use console.log instead of console.error + } + + return { + content: [ + { + type: 'text' as const, + text: JSON.stringify( + { + thoughtNumber: validatedInput.thoughtNumber, + totalThoughts: validatedInput.totalThoughts, + nextThoughtNeeded: validatedInput.nextThoughtNeeded, + branches: Object.keys(this.branches), + thoughtHistoryLength: this.thoughtHistory.length, + }, + null, + 2, + ), + }, + ], + }; + } catch (error) { + return { + content: [ + { + type: 'text' as const, + text: JSON.stringify( + { + error: error instanceof Error ? error.message : String(error), + status: 'failed', + }, + null, + 2, + ), + }, + ], + isError: true, + }; + } + } + + public getThoughtHistory(): ThoughtData[] { + return this.thoughtHistory; + } + + public getBranches(): Record { + return this.branches; + } + + public reset(): void { + this.thoughtHistory = []; + this.branches = {}; + } +} + +export class ThinkingMCP extends McpAgent, { userId: string }> { + thinkingServer = new SequentialThinkingProcessor(); + server = new McpServer({ + name: 'thinking-mcp', + version: '1.0.0', + description: 'Thinking MCP', + }); + + async init(): Promise { + this.server.tool('Test', () => { + return { + content: [{ type: 'text' as const, text: 'Hello World' }], + }; + }); + + console.log('Here!'); + + // this.server.registerTool( + // 'sequentialthinking', + // { + // description: `A detailed tool for dynamic and reflective problem-solving through thoughts. + // This tool helps analyze problems through a flexible thinking process that can adapt and evolve. + // Each thought can build on, question, or revise previous insights as understanding deepens. + + // When to use this tool: + // - Breaking down complex problems into steps + // - Planning and design with room for revision + // - Analysis that might need course correction + // - Problems where the full scope might not be clear initially + // - Problems that require a multi-step solution + // - Tasks that need to maintain context over multiple steps + // - Situations where irrelevant information needs to be filtered out + + // Key features: + // - You can adjust total_thoughts up or down as you progress + // - You can question or revise previous thoughts + // - You can add more thoughts even after reaching what seemed like the end + // - You can express uncertainty and explore alternative approaches + // - Not every thought needs to build linearly - you can branch or backtrack + // - Generates a solution hypothesis + // - Verifies the hypothesis based on the Chain of Thought steps + // - Repeats the process until satisfied + // - Provides a correct answer + + // Parameters explained: + // - thought: Your current thinking step, which can include: + // * Regular analytical steps + // * Revisions of previous thoughts + // * Questions about previous decisions + // * Realizations about needing more analysis + // * Changes in approach + // * Hypothesis generation + // * Hypothesis verification + // - next_thought_needed: True if you need more thinking, even if at what seemed like the end + // - thought_number: Current number in sequence (can go beyond initial total if needed) + // - total_thoughts: Current estimate of thoughts needed (can be adjusted up/down) + // - is_revision: A boolean indicating if this thought revises previous thinking + // - revises_thought: If is_revision is true, which thought number is being reconsidered + // - branch_from_thought: If branching, which thought number is the branching point + // - branch_id: Identifier for the current branch (if any) + // - needs_more_thoughts: If reaching end but realizing more thoughts needed + + // You should: + // 1. Start with an initial estimate of needed thoughts, but be ready to adjust + // 2. Feel free to question or revise previous thoughts + // 3. Don't hesitate to add more thoughts if needed, even at the "end" + // 4. Express uncertainty when present + // 5. Mark thoughts that revise previous thinking or branch into new paths + // 6. Ignore information that is irrelevant to the current step + // 7. Generate a solution hypothesis when appropriate + // 8. Verify the hypothesis based on the Chain of Thought steps + // 9. Repeat the process until satisfied with the solution + // 10. Provide a single, ideally correct answer as the final output + // 11. Only set next_thought_needed to false when truly done and a satisfactory answer is reached`, + // inputSchema: { + // thought: z.string().describe('Your current thinking step'), + // nextThoughtNeeded: z.boolean().describe('Whether another thought step is needed'), + // thoughtNumber: z.number().int().min(1).describe('Current thought number'), + // totalThoughts: z.number().int().min(1).describe('Estimated total thoughts needed'), + // isRevision: z.boolean().optional().describe('Whether this revises previous thinking'), + // revisesThought: z + // .number() + // .int() + // .min(1) + // .optional() + // .describe('Which thought is being reconsidered'), + // branchFromThought: z + // .number() + // .int() + // .min(1) + // .optional() + // .describe('Branching point thought number'), + // branchId: z.string().optional().describe('Branch identifier'), + // needsMoreThoughts: z.boolean().optional().describe('If more thoughts are needed'), + // }, + // }, + // (params) => { + // return this.thinkingServer.processThought({ + // thought: params.thought, + // nextThoughtNeeded: params.nextThoughtNeeded, + // thoughtNumber: params.thoughtNumber, + // totalThoughts: params.totalThoughts, + // isRevision: params.isRevision, + // revisesThought: params.revisesThought, + // branchFromThought: params.branchFromThought, + // branchId: params.branchId, + // needsMoreThoughts: params.needsMoreThoughts, + // }); + // }, + // ); + } +} diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index 6ed51c800a..da0156728e 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -1065,7 +1065,7 @@ export class ZeroAgent extends AIChatAgent { private chatMessageAbortControllers: Map = new Map(); async registerZeroMCP() { - await this.mcp.connect(env.VITE_PUBLIC_BACKEND_URL + '/sse', { + await this.mcp.connect(env.VITE_PUBLIC_BACKEND_URL + '/sse?mcpId=zero-mcp', { transport: { authProvider: new DurableObjectOAuthClientProvider( this.ctx.storage, @@ -1076,8 +1076,20 @@ export class ZeroAgent extends AIChatAgent { }); } + async registerThinkingMCP() { + await this.mcp.connect(env.VITE_PUBLIC_BACKEND_URL + '/sse?mcpId=thinking-mcp', { + transport: { + authProvider: new DurableObjectOAuthClientProvider( + this.ctx.storage, + 'thinking-mcp', + env.VITE_PUBLIC_BACKEND_URL, + ), + }, + }); + } + onStart(): void | Promise { - // this.registerZeroMCP(); + // this.registerThinkingMCP(); } private getDataStreamResponse( @@ -1091,11 +1103,14 @@ export class ZeroAgent extends AIChatAgent { if (this.name === 'general') return; const connectionId = this.name; const orchestrator = new ToolOrchestrator(dataStream, connectionId); - // const mcpTools = await this.mcp.unstable_getAITools(); + + // const mcpTools = this.mcp.unstable_getAITools(); const rawTools = { ...(await authTools(connectionId)), + // ...mcpTools, }; + const tools = orchestrator.processTools(rawTools); const processedMessages = await processToolCalls( { @@ -1106,8 +1121,13 @@ export class ZeroAgent extends AIChatAgent { {}, ); + const model = + env.USE_OPENAI === 'true' + ? openai(env.OPENAI_MODEL || 'gpt-4o') + : anthropic(env.OPENAI_MODEL || 'claude-3-7-sonnet-20250219'); + const result = streamText({ - model: anthropic(env.OPENAI_MODEL || 'claude-3-5-haiku-latest'), + model, maxSteps: 10, messages: processedMessages, tools, diff --git a/apps/server/src/trpc/trpc.ts b/apps/server/src/trpc/trpc.ts index 5f60481bef..9732bf9948 100644 --- a/apps/server/src/trpc/trpc.ts +++ b/apps/server/src/trpc/trpc.ts @@ -40,37 +40,47 @@ export const activeConnectionProcedure = privateProcedure.use(async ({ ctx, next } }); +const permissionErrors = ['precondition check', 'insufficient permission', 'invalid credentials']; + export const activeDriverProcedure = activeConnectionProcedure.use(async ({ ctx, next }) => { const { activeConnection, sessionUser } = ctx; const res = await next({ ctx: { ...ctx } }); - // This is for when the user has not granted the required scopes for GMail - if (!res.ok && res.error.message === 'Precondition check failed.') { - throw new TRPCError({ - code: 'BAD_REQUEST', - message: 'Required scopes missing', - cause: res.error, - }); - } - - if (!res.ok && res.error.message === 'invalid_grant') { - // Remove the access token and refresh token - const db = await getZeroDB(sessionUser.id); - await db.updateConnection(activeConnection.id, { - accessToken: null, - refreshToken: null, - }); + if (!res.ok) { + const errorMessage = res.error.message.toLowerCase(); - ctx.c.header( - 'X-Zero-Redirect', - `/settings/connections?disconnectedConnectionId=${activeConnection.id}`, + const isPermissionError = permissionErrors.some((errorType) => + errorMessage.includes(errorType), ); - throw new TRPCError({ - code: 'UNAUTHORIZED', - message: 'Connection expired. Please reconnect.', - cause: res.error, - }); + if (isPermissionError) { + throw new TRPCError({ + code: 'BAD_REQUEST', + message: 'Required scopes missing', + cause: res.error, + }); + } + + // Handle token expiration/refresh issues + if (errorMessage.includes('invalid_grant')) { + // Remove the access token and refresh token + const db = await getZeroDB(sessionUser.id); + await db.updateConnection(activeConnection.id, { + accessToken: null, + refreshToken: null, + }); + + ctx.c.header( + 'X-Zero-Redirect', + `/settings/connections?disconnectedConnectionId=${activeConnection.id}`, + ); + + throw new TRPCError({ + code: 'UNAUTHORIZED', + message: 'Connection expired. Please reconnect.', + cause: res.error, + }); + } } return res; diff --git a/apps/server/wrangler.jsonc b/apps/server/wrangler.jsonc index 6ae420f957..46d73b88d5 100644 --- a/apps/server/wrangler.jsonc +++ b/apps/server/wrangler.jsonc @@ -116,6 +116,7 @@ "THREAD_SYNC_LOOP": "false", "DISABLE_WORKFLOWS": "false", "AUTORAG_ID": "", + "USE_OPENAI": "true", }, "kv_namespaces": [ {