From 213de4ab36b0e63468d29e00c536318b9d086e96 Mon Sep 17 00:00:00 2001 From: "pullfrog[bot]" <226033991+pullfrog[bot]@users.noreply.github.com> Date: Thu, 21 May 2026 05:45:03 +0000 Subject: [PATCH] chore: update vendored Effect source --- .../fix-bedrock-thinking-generate-object.md | 5 + .../forward-parent-pointer-discard.md | 5 + .../src/AmazonBedrockLanguageModel.ts | 791 ++++++++++-------- .../test/AmazonBedrockLanguageModel.test.ts | 203 +++++ .../test/ClusterWorkflowEngine.test.ts | 533 +++++++----- .../packages/workflow/src/WorkflowEngine.ts | 265 +++--- 6 files changed, 1135 insertions(+), 667 deletions(-) create mode 100644 repos/effect/.changeset/fix-bedrock-thinking-generate-object.md create mode 100644 repos/effect/.changeset/forward-parent-pointer-discard.md create mode 100644 repos/effect/packages/ai/amazon-bedrock/test/AmazonBedrockLanguageModel.test.ts diff --git a/repos/effect/.changeset/fix-bedrock-thinking-generate-object.md b/repos/effect/.changeset/fix-bedrock-thinking-generate-object.md new file mode 100644 index 0000000..04143c2 --- /dev/null +++ b/repos/effect/.changeset/fix-bedrock-thinking-generate-object.md @@ -0,0 +1,5 @@ +--- +"@effect/ai-amazon-bedrock": patch +--- + +`generateObject` no longer fails when extended thinking is configured via `withConfigOverride`. Anthropic's API rejects requests that set `thinking` in `additionalModelRequestFields` alongside a forced `toolChoice` — which `generateObject` always does. The fix strips `thinking` from `additionalModelRequestFields` in the `json` response format path before the request is sent. diff --git a/repos/effect/.changeset/forward-parent-pointer-discard.md b/repos/effect/.changeset/forward-parent-pointer-discard.md new file mode 100644 index 0000000..b55505e --- /dev/null +++ b/repos/effect/.changeset/forward-parent-pointer-discard.md @@ -0,0 +1,5 @@ +--- +"@effect/workflow": patch +--- + +Forward the parent pointer when spawning a child workflow with `discard: true` diff --git a/repos/effect/packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts b/repos/effect/packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts index dbcadd0..5bd08b3 100644 --- a/repos/effect/packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts +++ b/repos/effect/packages/ai/amazon-bedrock/src/AmazonBedrockLanguageModel.ts @@ -64,9 +64,10 @@ export class Config extends Context.Tag( /** * @since 1.0.0 */ - static readonly getOrUndefined: Effect.Effect = Effect.map( - Effect.context(), - (context) => context.unsafeMap.get(Config.key) + static readonly getOrUndefined: Effect.Effect< + typeof Config.Service | undefined + > = Effect.map(Effect.context(), (context) => + context.unsafeMap.get(Config.key) ) } @@ -78,16 +79,11 @@ export declare namespace Config { * @since 1.0.0 * @category Configuration */ - export interface Service extends - Simplify< - Partial< - Omit< - typeof ConverseRequest.Encoded, - "messages" | "system" | "toolConfig" - > - > + export interface Service extends Simplify< + Partial< + Omit > - {} + > {} } // ============================================================================= @@ -98,61 +94,71 @@ export declare namespace Config { * @since 1.0.0 * @category Provider Options */ -export type AmazonBedrockReasoningInfo = { - readonly type: "thinking" - /** - * Thinking content as an encrypted string, which is used to verify - * that thinking content was indeed generated by Amazon Bedrock's API. - */ - readonly signature: string -} | { - readonly type: "redacted_thinking" - /** - * Thinking content which was flagged by Amazon Bedrock's safety systems, and - * was therefore encrypted. - */ - readonly redactedData: string -} +export type AmazonBedrockReasoningInfo = + | { + readonly type: "thinking" + /** + * Thinking content as an encrypted string, which is used to verify + * that thinking content was indeed generated by Amazon Bedrock's API. + */ + readonly signature: string + } + | { + readonly type: "redacted_thinking" + /** + * Thinking content which was flagged by Amazon Bedrock's safety systems, and + * was therefore encrypted. + */ + readonly redactedData: string + } declare module "@effect/ai/Prompt" { export interface SystemMessageOptions extends ProviderOptions { - readonly bedrock?: { - /** - * Defines a section of content to be cached for reuse in subsequent API - * calls. - */ - readonly cachePoint?: typeof CachePointBlock.Encoded | undefined - } | undefined + readonly bedrock?: + | { + /** + * Defines a section of content to be cached for reuse in subsequent API + * calls. + */ + readonly cachePoint?: typeof CachePointBlock.Encoded | undefined + } + | undefined } export interface UserMessageOptions extends ProviderOptions { - readonly bedrock?: { - /** - * Defines a section of content to be cached for reuse in subsequent API - * calls. - */ - readonly cachePoint?: typeof CachePointBlock.Encoded | undefined - } | undefined + readonly bedrock?: + | { + /** + * Defines a section of content to be cached for reuse in subsequent API + * calls. + */ + readonly cachePoint?: typeof CachePointBlock.Encoded | undefined + } + | undefined } export interface AssistantMessageOptions extends ProviderOptions { - readonly bedrock?: { - /** - * Defines a section of content to be cached for reuse in subsequent API - * calls. - */ - readonly cachePoint?: typeof CachePointBlock.Encoded | undefined - } | undefined + readonly bedrock?: + | { + /** + * Defines a section of content to be cached for reuse in subsequent API + * calls. + */ + readonly cachePoint?: typeof CachePointBlock.Encoded | undefined + } + | undefined } export interface ToolMessageOptions extends ProviderOptions { - readonly bedrock?: { - /** - * Defines a section of content to be cached for reuse in subsequent API - * calls. - */ - readonly cachePoint?: typeof CachePointBlock.Encoded | undefined - } | undefined + readonly bedrock?: + | { + /** + * Defines a section of content to be cached for reuse in subsequent API + * calls. + */ + readonly cachePoint?: typeof CachePointBlock.Encoded | undefined + } + | undefined } export interface ReasoningPartOptions extends ProviderOptions { @@ -166,12 +172,14 @@ declare module "@effect/ai/Response" { } export interface FinishPartMetadata extends ProviderMetadata { - readonly bedrock?: { - readonly trace?: ConverseTrace | undefined - readonly usage: { - readonly cacheWriteInputTokens?: number | undefined - } - } | undefined + readonly bedrock?: + | { + readonly trace?: ConverseTrace | undefined + readonly usage: { + readonly cacheWriteInputTokens?: number | undefined + } + } + | undefined } } @@ -186,83 +194,118 @@ declare module "@effect/ai/Response" { export const model = ( model: (string & {}) | Model, config?: Omit -): AiModel.Model<"amazon-bedrock", LanguageModel.LanguageModel, AmazonBedrockClient> => - AiModel.make("amazon-bedrock", layer({ model, config })) +): AiModel.Model< + "amazon-bedrock", + LanguageModel.LanguageModel, + AmazonBedrockClient +> => AiModel.make("amazon-bedrock", layer({ model, config })) /** * @since 1.0.0 * @category Constructors */ -export const make = Effect.fnUntraced(function*(options: { +export const make = Effect.fnUntraced(function* (options: { readonly model: (string & {}) | Model readonly config?: Omit }) { const client = yield* AmazonBedrockClient - const makeRequest = Effect.fnUntraced( - function*(providerOptions: LanguageModel.ProviderOptions) { - const context = yield* Effect.context() - const config = { modelId: options.model, ...options.config, ...context.unsafeMap.get(Config.key) } - const { messages, system } = yield* prepareMessages(providerOptions) - const { additionalTools, betas, toolConfig } = yield* prepareTools(providerOptions, config) - const responseFormat = providerOptions.responseFormat - const request: typeof ConverseRequest.Encoded = { - ...config, - system, - messages, - // Handle tool configuration - ...(responseFormat.type === "json" - ? { + const makeRequest = Effect.fnUntraced(function* ( + providerOptions: LanguageModel.ProviderOptions + ) { + const context = yield* Effect.context() + const config = { + modelId: options.model, + ...options.config, + ...context.unsafeMap.get(Config.key) + } + const { messages, system } = yield* prepareMessages(providerOptions) + const { additionalTools, betas, toolConfig } = yield* prepareTools( + providerOptions, + config + ) + const responseFormat = providerOptions.responseFormat + + // Anthropic rejects requests that combine extended thinking with forced tool use. + // generateObject always forces toolChoice, so strip "thinking" from the merged + // fields on the json path to prevent a 400 from the Bedrock Converse API. + let requestAdditionalFields: Record | undefined + if (responseFormat.type === "json") { + if ( + Predicate.isNotUndefined(config.additionalModelRequestFields) || + Predicate.isNotUndefined(additionalTools) + ) { + const { thinking: _thinking, ...rest } = { + ...config.additionalModelRequestFields, + ...additionalTools + } + requestAdditionalFields = rest + } + } else if (Predicate.isNotUndefined(additionalTools)) { + requestAdditionalFields = { + ...config.additionalModelRequestFields, + ...additionalTools + } + } + + const request: typeof ConverseRequest.Encoded = { + ...config, + system, + messages, + // Handle tool configuration + ...(responseFormat.type === "json" + ? { toolConfig: { - tools: [{ - toolSpec: { - name: responseFormat.objectName, - description: Tool.getDescriptionFromSchemaAst(responseFormat.schema.ast) ?? - "Respond with a JSON object", - inputSchema: { - json: Tool.getJsonSchemaFromSchemaAst(responseFormat.schema.ast) as any + tools: [ + { + toolSpec: { + name: responseFormat.objectName, + description: + Tool.getDescriptionFromSchemaAst( + responseFormat.schema.ast + ) ?? "Respond with a JSON object", + inputSchema: { + json: Tool.getJsonSchemaFromSchemaAst( + responseFormat.schema.ast + ) as any + } } } - }], + ], toolChoice: { tool: { name: responseFormat.objectName } } } } - : Predicate.isNotUndefined(toolConfig.tools) && toolConfig.tools.length > 0 + : Predicate.isNotUndefined(toolConfig.tools) && + toolConfig.tools.length > 0 ? { toolConfig } : {}), - // Handle additional model request fields - ...(Predicate.isNotUndefined(additionalTools) - ? { - additionalModelRequestFields: { - ...config.additionalModelRequestFields, - ...additionalTools - } - } - : {}) - } - return { betas, request } + // Handle additional model request fields + ...(Predicate.isNotUndefined(requestAdditionalFields) + ? { additionalModelRequestFields: requestAdditionalFields } + : {}) } - ) + return { betas, request } + }) return yield* LanguageModel.make({ - generateText: Effect.fnUntraced( - function*(options) { - const { betas, request } = yield* makeRequest(options) - annotateRequest(options.span, request) - const anthropicBeta = betas.size > 0 ? Array.from(betas).join(",") : undefined - const rawResponse = yield* client.converse({ - params: { "anthropic-beta": anthropicBeta }, - payload: request - }) - annotateResponse(options.span, request, rawResponse) - return yield* makeResponse(request, rawResponse, options) - } - ), + generateText: Effect.fnUntraced(function* (options) { + const { betas, request } = yield* makeRequest(options) + annotateRequest(options.span, request) + const anthropicBeta = + betas.size > 0 ? Array.from(betas).join(",") : undefined + const rawResponse = yield* client.converse({ + params: { "anthropic-beta": anthropicBeta }, + payload: request + }) + annotateResponse(options.span, request, rawResponse) + return yield* makeResponse(request, rawResponse, options) + }), streamText: Effect.fnUntraced( - function*(options) { + function* (options) { const { betas, request } = yield* makeRequest(options) annotateRequest(options.span, request) - const anthropicBeta = betas.size > 0 ? Array.from(betas).join(",") : undefined + const anthropicBeta = + betas.size > 0 ? Array.from(betas).join(",") : undefined const stream = client.converseStream({ params: { "anthropic-beta": anthropicBeta }, payload: request @@ -271,7 +314,9 @@ export const make = Effect.fnUntraced(function*(options: { }, (effect, options) => effect.pipe( - Effect.flatMap(({ request, stream }) => makeStreamResponse(request, stream, options)), + Effect.flatMap(({ request, stream }) => + makeStreamResponse(request, stream, options) + ), Stream.unwrap, Stream.map((response) => { annotateStreamResponse(options.span, response) @@ -290,212 +335,240 @@ export const layer = (options: { readonly model: (string & {}) | Model readonly config?: Omit }): Layer.Layer => - Layer.effect(LanguageModel.LanguageModel, make({ model: options.model, config: options.config })) + Layer.effect( + LanguageModel.LanguageModel, + make({ model: options.model, config: options.config }) + ) /** * @since 1.0.0 * @category Configuration */ export const withConfigOverride: { - (config: Config.Service): (self: Effect.Effect) => Effect.Effect - (self: Effect.Effect, config: Config.Service): Effect.Effect + ( + config: Config.Service + ): (self: Effect.Effect) => Effect.Effect + ( + self: Effect.Effect, + config: Config.Service + ): Effect.Effect } = dual< - (config: Config.Service) => (self: Effect.Effect) => Effect.Effect, - (self: Effect.Effect, config: Config.Service) => Effect.Effect + ( + config: Config.Service + ) => (self: Effect.Effect) => Effect.Effect, + ( + self: Effect.Effect, + config: Config.Service + ) => Effect.Effect >(2, (self, overrides) => - Effect.flatMap( - Config.getOrUndefined, - (config) => Effect.provideService(self, Config, { ...config, ...overrides }) - )) + Effect.flatMap(Config.getOrUndefined, (config) => + Effect.provideService(self, Config, { ...config, ...overrides }) + ) +) // ============================================================================= // Prompt Conversion // ============================================================================= -const prepareMessages: (options: LanguageModel.ProviderOptions) => Effect.Effect<{ - readonly system: ReadonlyArray - readonly messages: ReadonlyArray -}, AiError.AiError> = Effect.fnUntraced( - function*(options) { - const groups = groupMessages(options.prompt) - - const system: Array = [] - const messages: Array = [] - - let documentCounter = 0 - const nextDocumentName = () => `document-${++documentCounter}` - - for (let i = 0; i < groups.length; i++) { - const group = groups[i] - const isLastGroup = i === groups.length - 1 - - switch (group.type) { - case "system": { - if (messages.length > 0) { - return yield* new AiError.MalformedInput({ - module: "AmazonBedrockLanguageModel", - method: "prepareMessages", - description: "Multiple system messages separated by user / assistant messages" - }) - } - for (const message of group.messages) { - system.push({ text: message.content }) - if (Predicate.isNotUndefined(getCachePoint(message))) { - system.push(BEDROCK_CACHE_POINT) - } +const prepareMessages: ( + options: LanguageModel.ProviderOptions +) => Effect.Effect< + { + readonly system: ReadonlyArray + readonly messages: ReadonlyArray + }, + AiError.AiError +> = Effect.fnUntraced(function* (options) { + const groups = groupMessages(options.prompt) + + const system: Array = [] + const messages: Array = [] + + let documentCounter = 0 + const nextDocumentName = () => `document-${++documentCounter}` + + for (let i = 0; i < groups.length; i++) { + const group = groups[i] + const isLastGroup = i === groups.length - 1 + + switch (group.type) { + case "system": { + if (messages.length > 0) { + return yield* new AiError.MalformedInput({ + module: "AmazonBedrockLanguageModel", + method: "prepareMessages", + description: + "Multiple system messages separated by user / assistant messages" + }) + } + for (const message of group.messages) { + system.push({ text: message.content }) + if (Predicate.isNotUndefined(getCachePoint(message))) { + system.push(BEDROCK_CACHE_POINT) } - break } + break + } - case "user": { - const content: Array = [] + case "user": { + const content: Array = [] - for (const message of group.messages) { - switch (message.role) { - case "user": { - for (let j = 0; j < message.content.length; j++) { - const part = message.content[j] + for (const message of group.messages) { + switch (message.role) { + case "user": { + for (let j = 0; j < message.content.length; j++) { + const part = message.content[j] - switch (part.type) { - case "text": { - content.push({ text: part.text }) - break - } + switch (part.type) { + case "text": { + content.push({ text: part.text }) + break + } - case "file": { - if (part.data instanceof URL) { - // TODO(Max): support this - return yield* new AiError.MalformedInput({ - module: "AmazonBedrockLanguageModel", - method: "prepareMessages", - description: "File URL inputs are not supported at this time" - }) - } - if (part.mediaType.startsWith("image/")) { - content.push({ - image: { - format: yield* getImageFormat(part.mediaType), - source: { bytes: convertToBase64(part.data) } - } - }) - } else { - content.push({ - document: { - format: yield* getDocumentFormat(part.mediaType), - name: nextDocumentName(), - source: { bytes: convertToBase64(part.data) } - } - }) - } - break + case "file": { + if (part.data instanceof URL) { + // TODO(Max): support this + return yield* new AiError.MalformedInput({ + module: "AmazonBedrockLanguageModel", + method: "prepareMessages", + description: + "File URL inputs are not supported at this time" + }) + } + if (part.mediaType.startsWith("image/")) { + content.push({ + image: { + format: yield* getImageFormat(part.mediaType), + source: { bytes: convertToBase64(part.data) } + } + }) + } else { + content.push({ + document: { + format: yield* getDocumentFormat(part.mediaType), + name: nextDocumentName(), + source: { bytes: convertToBase64(part.data) } + } + }) } + break } } - break } + break + } - case "tool": { - for (const part of message.content) { - content.push({ - toolResult: { - toolUseId: part.id, - content: [{ text: JSON.stringify(part.result) }] - } - }) - } - break + case "tool": { + for (const part of message.content) { + content.push({ + toolResult: { + toolUseId: part.id, + content: [{ text: JSON.stringify(part.result) }] + } + }) } + break } + } - if (getCachePoint(message)) { - content.push(BEDROCK_CACHE_POINT) - } + if (getCachePoint(message)) { + content.push(BEDROCK_CACHE_POINT) } + } - messages.push({ role: "user", content }) + messages.push({ role: "user", content }) - break - } + break + } - case "assistant": { - const content: Array = [] + case "assistant": { + const content: Array = [] - for (let j = 0; j < group.messages.length; j++) { - const message = group.messages[j] - const isLastMessage = j === group.messages.length - 1 + for (let j = 0; j < group.messages.length; j++) { + const message = group.messages[j] + const isLastMessage = j === group.messages.length - 1 - for (let k = 0; k < message.content.length; k++) { - const part = message.content[k] - const isLastPart = k === message.content.length - 1 + for (let k = 0; k < message.content.length; k++) { + const part = message.content[k] + const isLastPart = k === message.content.length - 1 - switch (part.type) { - case "text": { - // Skip empty text blocks - if (part.text.trim().length === 0) { - break - } - content.push({ - // Amazon Bedrock does not allow trailing whitespace in - // assistant content blocks - text: trimIfLast(isLastGroup, isLastMessage, isLastPart, part.text) - }) + switch (part.type) { + case "text": { + // Skip empty text blocks + if (part.text.trim().length === 0) { break } + content.push({ + // Amazon Bedrock does not allow trailing whitespace in + // assistant content blocks + text: trimIfLast( + isLastGroup, + isLastMessage, + isLastPart, + part.text + ) + }) + break + } - case "reasoning": { - const options = part.options.bedrock - if (Predicate.isNotUndefined(options)) { - if (options.type === "thinking") { - content.push({ - reasoningContent: { - reasoningText: { - // Amazon Bedrock does not allow trailing whitespace in - // assistant content blocks - text: trimIfLast(isLastGroup, isLastMessage, isLastPart, part.text), - signature: options.signature - } + case "reasoning": { + const options = part.options.bedrock + if (Predicate.isNotUndefined(options)) { + if (options.type === "thinking") { + content.push({ + reasoningContent: { + reasoningText: { + // Amazon Bedrock does not allow trailing whitespace in + // assistant content blocks + text: trimIfLast( + isLastGroup, + isLastMessage, + isLastPart, + part.text + ), + signature: options.signature } - }) - } - if (options.type === "redacted_thinking") { - content.push({ - reasoningContent: { - redactedContent: options.redactedData - } - }) - } + } + }) + } + if (options.type === "redacted_thinking") { + content.push({ + reasoningContent: { + redactedContent: options.redactedData + } + }) } - break } + break + } - case "tool-call": { - content.push({ - toolUse: { - toolUseId: part.id, - name: part.name, - input: part.params - } - }) - break - } + case "tool-call": { + content.push({ + toolUse: { + toolUseId: part.id, + name: part.name, + input: part.params + } + }) + break } } + } - if (getCachePoint(message)) { - content.push(BEDROCK_CACHE_POINT) - } + if (getCachePoint(message)) { + content.push(BEDROCK_CACHE_POINT) } + } - messages.push({ role: "assistant", content }) + messages.push({ role: "assistant", content }) - break - } + break } } - - return { system, messages } } -) + + return { system, messages } +}) // ============================================================================= // Response Conversion @@ -509,7 +582,7 @@ const makeResponse: ( Array, never, IdGenerator.IdGenerator -> = Effect.fnUntraced(function*(request, response, options) { +> = Effect.fnUntraced(function* (request, response, options) { const parts: Array = [] parts.push({ @@ -540,8 +613,8 @@ const makeResponse: ( parts.push({ type: "reasoning", text: part.reasoningContent.reasoningText.text, - metadata: Predicate.isNotUndefined(signature) ? - { bedrock: { type: "thinking", signature } } + metadata: Predicate.isNotUndefined(signature) + ? { bedrock: { type: "thinking", signature } } : undefined }) } @@ -569,9 +642,15 @@ const makeResponse: ( text: JSON.stringify(part.toolUse.input) }) } else { - const providerTool = AnthropicTool.getProviderDefinedToolName(part.toolUse.name) - const name = Predicate.isNotUndefined(providerTool) ? providerTool : part.toolUse.name - const providerName = Predicate.isNotUndefined(providerTool) ? part.toolUse.name : undefined + const providerTool = AnthropicTool.getProviderDefinedToolName( + part.toolUse.name + ) + const name = Predicate.isNotUndefined(providerTool) + ? providerTool + : part.toolUse.name + const providerName = Predicate.isNotUndefined(providerTool) + ? part.toolUse.name + : undefined parts.push({ type: "tool-call", id: part.toolUse.toolUseId, @@ -586,7 +665,9 @@ const makeResponse: ( } } - const finishReason = InternalUtilities.resolveFinishReason(response.stopReason) + const finishReason = InternalUtilities.resolveFinishReason( + response.stopReason + ) parts.push({ type: "finish", @@ -616,17 +697,16 @@ const makeStreamResponse: ( Stream.Stream, never, IdGenerator.IdGenerator -> = Effect.fnUntraced( - function*(request, stream, options) { - const contentBlocks: Record< - number, - | { +> = Effect.fnUntraced(function* (request, stream, options) { + const contentBlocks: Record< + number, + | { readonly type: "text" } - | { + | { readonly type: "reasoning" } - | { + | { readonly type: "tool-call" readonly id: string readonly name: string @@ -634,18 +714,19 @@ const makeStreamResponse: ( readonly providerName?: string | undefined readonly providerExecuted: boolean } - > = {} - - let trace: ConverseTrace | undefined = undefined - let cacheWriteInputTokens: number | undefined = undefined - const usage: Mutable = { - inputTokens: undefined, - outputTokens: undefined, - totalTokens: undefined - } + > = {} + + let trace: ConverseTrace | undefined = undefined + let cacheWriteInputTokens: number | undefined = undefined + const usage: Mutable = { + inputTokens: undefined, + outputTokens: undefined, + totalTokens: undefined + } - return stream.pipe( - Stream.mapEffect(Effect.fnUntraced(function*(event) { + return stream.pipe( + Stream.mapEffect( + Effect.fnUntraced(function* (event) { const parts: Array = [] switch (event.type) { @@ -659,7 +740,9 @@ const makeStreamResponse: ( } case "messageStop": { - const reason = InternalUtilities.resolveFinishReason(event.messageStop.stopReason) + const reason = InternalUtilities.resolveFinishReason( + event.messageStop.stopReason + ) parts.push({ type: "finish", reason, @@ -678,9 +761,14 @@ const makeStreamResponse: ( if (Predicate.isNotUndefined(block.start.toolUse)) { const toolUse = block.start.toolUse const toolName = toolUse.name - const providerTool = AnthropicTool.getProviderDefinedToolName(toolName) - const name = Predicate.isNotUndefined(providerTool) ? providerTool : toolName - const providerName = Predicate.isNotUndefined(providerTool) ? toolName : undefined + const providerTool = + AnthropicTool.getProviderDefinedToolName(toolName) + const name = Predicate.isNotUndefined(providerTool) + ? providerTool + : toolName + const providerName = Predicate.isNotUndefined(providerTool) + ? toolName + : undefined contentBlocks[index] = { type: "tool-call", @@ -782,7 +870,10 @@ const makeStreamResponse: ( case "toolUse": { const block = contentBlocks[index] - if (Predicate.isNotUndefined(block) && block.type === "tool-call") { + if ( + Predicate.isNotUndefined(block) && + block.type === "tool-call" + ) { const params = delta.toolUse.input // Only emit tool params delta events for text responses if (options.responseFormat.type === "text") { @@ -842,7 +933,8 @@ const makeStreamResponse: ( new AiError.MalformedOutput({ module: "AmazonBedrockLanguageModel", method: "makeStreamResponse", - description: "Failed to securely parse tool call parameters " + + description: + "Failed to securely parse tool call parameters " + `for tool '${toolName}':\nParameters: ${toolParams}`, cause }) @@ -884,7 +976,11 @@ const makeStreamResponse: ( usage.outputTokens = event.metadata.usage.outputTokens usage.totalTokens = event.metadata.usage.totalTokens usage.cachedInputTokens = event.metadata.usage.cacheReadInputTokens - if (Predicate.isNotUndefined(event.metadata.usage.cacheWriteInputTokens)) { + if ( + Predicate.isNotUndefined( + event.metadata.usage.cacheWriteInputTokens + ) + ) { cacheWriteInputTokens = event.metadata.usage.cacheWriteInputTokens } if (Predicate.isNotUndefined(event.metadata.trace)) { @@ -899,12 +995,18 @@ const makeStreamResponse: ( } case "modelStreamErrorException": { - parts.push({ type: "error", error: event.modelStreamErrorException }) + parts.push({ + type: "error", + error: event.modelStreamErrorException + }) break } case "serviceUnavailableException": { - parts.push({ type: "error", error: event.serviceUnavailableException }) + parts.push({ + type: "error", + error: event.serviceUnavailableException + }) break } @@ -920,11 +1022,11 @@ const makeStreamResponse: ( } return parts - })), - Stream.flattenIterables - ) - } -) + }) + ), + Stream.flattenIterables + ) +}) // ============================================================================= // Tool Calling @@ -933,11 +1035,14 @@ const makeStreamResponse: ( const prepareTools: ( options: LanguageModel.ProviderOptions, config: Config.Service -) => Effect.Effect<{ - readonly betas: ReadonlySet - readonly toolConfig: Partial - readonly additionalTools?: Record | undefined -}, AiError.AiError> = Effect.fnUntraced(function*(options, config) { +) => Effect.Effect< + { + readonly betas: ReadonlySet + readonly toolConfig: Partial + readonly additionalTools?: Record | undefined + }, + AiError.AiError +> = Effect.fnUntraced(function* (options, config) { const betas = new Set() if (options.tools.length === 0) { @@ -1006,7 +1111,11 @@ const prepareTools: ( // The tool choice for Anthropic provider-defined tools is resolved above and // inserted into the additional tool configuration object. let toolChoice: typeof ToolChoice.Encoded | undefined = undefined - if (!hasAnthropicTools && tools.length > 0 && Predicate.isNotUndefined(options.toolChoice)) { + if ( + !hasAnthropicTools && + tools.length > 0 && + Predicate.isNotUndefined(options.toolChoice) + ) { if (options.toolChoice === "none") { tools.length = 0 toolChoice = undefined @@ -1019,13 +1128,13 @@ const prepareTools: ( } else { const allowedTools = new Set(options.toolChoice.oneOf) tools = tools.filter((tool) => allowedTools.has(tool.toolSpec?.name)) - toolChoice = options.toolChoice.mode === "auto" ? { auto: {} } : { any: {} } + toolChoice = + options.toolChoice.mode === "auto" ? { auto: {} } : { any: {} } } } - const toolConfig: Partial = tools.length > 0 - ? { tools, toolChoice } - : {} + const toolConfig: Partial = + tools.length > 0 ? { tools, toolChoice } : {} return { additionalTools, betas, toolConfig } }) @@ -1068,7 +1177,10 @@ const annotateResponse = ( }) } -const annotateStreamResponse = (span: Span, part: Response.StreamPartEncoded) => { +const annotateStreamResponse = ( + span: Span, + part: Response.StreamPartEncoded +) => { if (part.type === "response-metadata") { addGenAIAnnotations(span, { response: { @@ -1094,7 +1206,10 @@ const annotateStreamResponse = (span: Span, part: Response.StreamPartEncoded) => // Utilities // ============================================================================= -type ContentGroup = SystemMessageGroup | AssistantMessageGroup | UserMessageGroup +type ContentGroup = + | SystemMessageGroup + | AssistantMessageGroup + | UserMessageGroup interface SystemMessageGroup { readonly type: "system" @@ -1156,7 +1271,7 @@ const trimIfLast = ( isLastMessage: boolean, isLastPart: boolean, text: string -) => isLastGroup && isLastMessage && isLastPart ? text.trim() : text +) => (isLastGroup && isLastMessage && isLastPart ? text.trim() : text) const getCachePoint = ( part: @@ -1164,7 +1279,8 @@ const getCachePoint = ( | Prompt.UserMessage | Prompt.AssistantMessage | Prompt.ToolMessage -): typeof CachePointBlock.Encoded | undefined => part.options.bedrock?.cachePoint +): typeof CachePointBlock.Encoded | undefined => + part.options.bedrock?.cachePoint const convertToBase64 = (data: string | Uint8Array): string => typeof data === "string" ? data : Encoding.encodeBase64(data) @@ -1173,7 +1289,8 @@ const DOCUMENT_MIME_TYPES: Record = { "application/pdf": "pdf", "text/csv": "csv", "application/msword": "doc", - "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx", + "application/vnd.openxmlformats-officedocument.wordprocessingml.document": + "docx", "application/vnd.ms-excel": "xls", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx", "text/html": "html", @@ -1183,38 +1300,40 @@ const DOCUMENT_MIME_TYPES: Record = { const getDocumentFormat: ( mediaType: string -) => Effect.Effect = Effect.fnUntraced( - function*(mediaType) { +) => Effect.Effect = + Effect.fnUntraced(function* (mediaType) { const format = DOCUMENT_MIME_TYPES[mediaType] if (Predicate.isUndefined(format)) { return yield* new AiError.MalformedInput({ module: "AmazonBedrockLanguageModel", method: "getDocumentFormat", - description: `Unsupported document MIME type: ${mediaType} - expected ` + + description: + `Unsupported document MIME type: ${mediaType} - expected ` + `one of: ${Object.keys(DOCUMENT_MIME_TYPES)}` }) } return format - } -) + }) const getImageFormat: ( mediaType: string -) => Effect.Effect = Effect.fnUntraced( - function*(mediaType) { - const format = ImageFormat.literals.find((format) => mediaType === `image/${format}`) +) => Effect.Effect = + Effect.fnUntraced(function* (mediaType) { + const format = ImageFormat.literals.find( + (format) => mediaType === `image/${format}` + ) if (Predicate.isUndefined(format)) { return yield* new AiError.MalformedInput({ module: "AmazonBedrockLanguageModel", method: "getImageFormat", - description: `Unsupported image MIME type: ${mediaType} - expected ` + + description: + `Unsupported image MIME type: ${mediaType} - expected ` + `one of: ${ImageFormat.literals.map((format) => `image/${format}`).join(",")}` }) } return format - } -) + }) diff --git a/repos/effect/packages/ai/amazon-bedrock/test/AmazonBedrockLanguageModel.test.ts b/repos/effect/packages/ai/amazon-bedrock/test/AmazonBedrockLanguageModel.test.ts new file mode 100644 index 0000000..32f2a8e --- /dev/null +++ b/repos/effect/packages/ai/amazon-bedrock/test/AmazonBedrockLanguageModel.test.ts @@ -0,0 +1,203 @@ +import * as LanguageModel from "@effect/ai/LanguageModel" +import { assert, describe, it } from "@effect/vitest" +import * as Effect from "effect/Effect" +import * as Layer from "effect/Layer" +import * as Schema from "effect/Schema" +import { AmazonBedrockClient } from "../src/AmazonBedrockClient.js" +import * as AmazonBedrockLanguageModel from "../src/AmazonBedrockLanguageModel.js" +import { ConverseResponse } from "../src/AmazonBedrockSchema.js" +import type { ConverseRequest } from "../src/AmazonBedrockSchema.js" + +// --------------------------------------------------------------------------- +// Schema decoders +// --------------------------------------------------------------------------- + +const decodeConverseResponse = Schema.decodeUnknownSync(ConverseResponse) + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +const TestModel = "us.anthropic.claude-3-5-sonnet-20241022-v2:0" as const +const TestObjectName = "MockResult" +const MockResultSchema = Schema.Struct({ field: Schema.String }) + +const textResponse = () => + decodeConverseResponse({ + output: { + message: { + role: "assistant", + content: [{ text: "hello" }] + } + }, + metrics: { latencyMs: 100 }, + stopReason: "end_turn", + usage: { inputTokens: 10, outputTokens: 5, totalTokens: 15 } + }) + +const toolUseResponse = (name: string) => + decodeConverseResponse({ + output: { + message: { + role: "assistant", + content: [ + { + toolUse: { + toolUseId: "test-id", + name, + input: { field: "value" } + } + } + ] + } + }, + metrics: { latencyMs: 100 }, + stopReason: "tool_use", + usage: { inputTokens: 10, outputTokens: 5, totalTokens: 15 } + }) + +const makeCapturingLayer = ( + captured: Array, + response: ConverseResponse +) => + AmazonBedrockLanguageModel.layer({ model: TestModel }).pipe( + Layer.provide( + Layer.succeed(AmazonBedrockClient, { + client: null as any, + streamRequest: null as any, + converse: (opts) => + Effect.sync(() => { + captured.push(opts.payload) + return response + }), + converseStream: null as any + }) + ) + ) + +// --------------------------------------------------------------------------- +// makeRequest — request construction tests +// --------------------------------------------------------------------------- + +describe("AmazonBedrockLanguageModel", () => { + describe("makeRequest / additionalModelRequestFields", () => { + it.effect( + "strips thinking when using generateObject (forced toolChoice.tool)", + () => + Effect.gen(function* () { + const captured: Array = [] + + yield* LanguageModel.generateObject({ + prompt: [], + schema: MockResultSchema, + objectName: TestObjectName + }).pipe( + AmazonBedrockLanguageModel.withConfigOverride({ + additionalModelRequestFields: { + thinking: { type: "enabled", budget_tokens: 5000 } + } + }), + Effect.provide( + makeCapturingLayer(captured, toolUseResponse(TestObjectName)) + ) + ) + + assert.strictEqual(captured.length, 1) + const req = captured[0] + + // Must force tool use for generateObject + assert.deepStrictEqual(req.toolConfig?.toolChoice, { + tool: { name: TestObjectName } + }) + + // Anthropic rejects thinking + forced tool use — must be stripped + assert.isUndefined(req.additionalModelRequestFields?.["thinking"]) + }) + ) + + it.effect( + "preserves thinking when using generateText (no forced toolChoice)", + () => + Effect.gen(function* () { + const captured: Array = [] + + yield* LanguageModel.generateText({ prompt: [] }).pipe( + AmazonBedrockLanguageModel.withConfigOverride({ + additionalModelRequestFields: { + thinking: { type: "enabled", budget_tokens: 5000 } + } + }), + Effect.provide(makeCapturingLayer(captured, textResponse())) + ) + + assert.strictEqual(captured.length, 1) + const req = captured[0] + + // thinking must flow through for non-forced-tool-use requests + assert.deepStrictEqual( + req.additionalModelRequestFields?.["thinking"], + { + type: "enabled", + budget_tokens: 5000 + } + ) + }) + ) + + it.effect( + "does not set additionalModelRequestFields when none configured and using generateObject", + () => + Effect.gen(function* () { + const captured: Array = [] + + yield* LanguageModel.generateObject({ + prompt: [], + schema: MockResultSchema, + objectName: TestObjectName + }).pipe( + Effect.provide( + makeCapturingLayer(captured, toolUseResponse(TestObjectName)) + ) + ) + + assert.strictEqual(captured.length, 1) + // No additionalModelRequestFields should be injected when none was configured + assert.isUndefined(captured[0].additionalModelRequestFields) + }) + ) + + it.effect( + "preserves non-thinking fields in additionalModelRequestFields with generateObject", + () => + Effect.gen(function* () { + const captured: Array = [] + + yield* LanguageModel.generateObject({ + prompt: [], + schema: MockResultSchema, + objectName: TestObjectName + }).pipe( + AmazonBedrockLanguageModel.withConfigOverride({ + additionalModelRequestFields: { + thinking: { type: "enabled", budget_tokens: 5000 }, + someOtherField: "preserved" + } + }), + Effect.provide( + makeCapturingLayer(captured, toolUseResponse(TestObjectName)) + ) + ) + + assert.strictEqual(captured.length, 1) + const req = captured[0] + + // thinking stripped, other fields survive + assert.isUndefined(req.additionalModelRequestFields?.["thinking"]) + assert.strictEqual( + req.additionalModelRequestFields?.["someOtherField"], + "preserved" + ) + }) + ) + }) +}) diff --git a/repos/effect/packages/cluster/test/ClusterWorkflowEngine.test.ts b/repos/effect/packages/cluster/test/ClusterWorkflowEngine.test.ts index dc4a931..293e401 100644 --- a/repos/effect/packages/cluster/test/ClusterWorkflowEngine.test.ts +++ b/repos/effect/packages/cluster/test/ClusterWorkflowEngine.test.ts @@ -1,6 +1,17 @@ -import { ClusterWorkflowEngine, MessageStorage, Runners, Sharding, ShardingConfig } from "@effect/cluster" +import { + ClusterWorkflowEngine, + MessageStorage, + Runners, + Sharding, + ShardingConfig +} from "@effect/cluster" import { assert, describe, expect, it } from "@effect/vitest" -import { Activity, DurableClock, DurableDeferred, Workflow } from "@effect/workflow" +import { + Activity, + DurableClock, + DurableDeferred, + Workflow +} from "@effect/workflow" import { WorkflowInstance } from "@effect/workflow/WorkflowEngine" import { DateTime, Effect, Exit, Fiber, Layer, Schema, TestClock } from "effect" import * as Cause from "effect/Cause" @@ -10,7 +21,7 @@ import * as RunnerStorage from "../src/RunnerStorage.js" describe.concurrent("ClusterWorkflowEngine", () => { it.effect("should run a workflow", () => - Effect.gen(function*() { + Effect.gen(function* () { const sharding = yield* Sharding.Sharding const driver = yield* MessageStorage.MemoryDriver const flags = yield* Flags @@ -47,7 +58,10 @@ describe.concurrent("ClusterWorkflowEngine", () => { // --- resume the workflow using DurableDeferred.done const token = yield* DurableDeferred.token(EmailTrigger).pipe( - Effect.provideService(WorkflowInstance, WorkflowInstance.initial(EmailWorkflow, executionId)) + Effect.provideService( + WorkflowInstance, + WorkflowInstance.initial(EmailWorkflow, executionId) + ) ) yield* DurableDeferred.done(EmailTrigger, { token, @@ -76,11 +90,14 @@ describe.concurrent("ClusterWorkflowEngine", () => { expect(driver.requests.size).toEqual(10) // test poll - expect(yield* EmailWorkflow.poll(executionId)).toEqual(new Workflow.Complete({ exit: Exit.void })) - }).pipe(Effect.provide(TestWorkflowLayer))) + expect(yield* EmailWorkflow.poll(executionId)).toEqual( + new Workflow.Complete({ exit: Exit.void }) + ) + }).pipe(Effect.provide(TestWorkflowLayer)) + ) it.effect("interrupt", () => - Effect.gen(function*() { + Effect.gen(function* () { const sharding = yield* Sharding.Sharding const driver = yield* MessageStorage.MemoryDriver yield* TestClock.adjust(1) @@ -114,10 +131,7 @@ describe.concurrent("ClusterWorkflowEngine", () => { const result = driver.requests.get(envelope.requestId)! const reply = result.replies[0]! - assert( - reply._tag === "WithExit" && - reply.exit._tag === "Success" - ) + assert(reply._tag === "WithExit" && reply.exit._tag === "Success") const value = reply.exit.value as Workflow.ResultEncoded assert(value._tag === "Complete" && value.exit._tag === "Failure") @@ -126,12 +140,11 @@ describe.concurrent("ClusterWorkflowEngine", () => { const flags = yield* Flags assert.isTrue(flags.get("compensation")) - }).pipe( - Effect.provide(TestWorkflowLayer) - )) + }).pipe(Effect.provide(TestWorkflowLayer)) + ) it.effect("Workflow.withCompensation", () => - Effect.gen(function*() { + Effect.gen(function* () { yield* TestClock.adjust(1) const fiber = yield* EmailWorkflow.execute({ @@ -144,16 +157,13 @@ describe.concurrent("ClusterWorkflowEngine", () => { const flags = yield* Flags assert.isTrue(flags.get("compensation")) - const error = yield* Fiber.join(fiber).pipe( - Effect.flip - ) + const error = yield* Fiber.join(fiber).pipe(Effect.flip) expect(error).toBeInstanceOf(SendEmailError) - }).pipe( - Effect.provide(TestWorkflowLayer) - )) + }).pipe(Effect.provide(TestWorkflowLayer)) + ) it.effect("Activity.raceAll", () => - Effect.gen(function*() { + Effect.gen(function* () { const flags = yield* Flags yield* TestClock.adjust(1) @@ -170,10 +180,11 @@ describe.concurrent("ClusterWorkflowEngine", () => { expect(flags.get("interrupt1")).toBeTruthy() expect(flags.get("interrupt2")).toBeTruthy() expect(flags.get("interrupt3")).toBeFalsy() - }).pipe(Effect.provide(TestWorkflowLayer))) + }).pipe(Effect.provide(TestWorkflowLayer)) + ) it.effect("Activity.raceAll durable", () => - Effect.gen(function*() { + Effect.gen(function* () { const sharding = yield* Sharding.Sharding yield* TestClock.adjust(1) @@ -188,10 +199,11 @@ describe.concurrent("ClusterWorkflowEngine", () => { const result = yield* Fiber.join(fiber) expect(result).toEqual("Activity3") - }).pipe(Effect.provide(TestWorkflowLayer))) + }).pipe(Effect.provide(TestWorkflowLayer)) + ) it.effect("nested workflows", () => - Effect.gen(function*() { + Effect.gen(function* () { const flags = yield* Flags const sharding = yield* Sharding.Sharding yield* TestClock.adjust(1) @@ -216,10 +228,11 @@ describe.concurrent("ClusterWorkflowEngine", () => { yield* TestClock.adjust(5000) assert.isTrue(flags.get("parent-end")) assert.isTrue(flags.get("child-end")) - }).pipe(Effect.provide(TestWorkflowLayer))) + }).pipe(Effect.provide(TestWorkflowLayer)) + ) it.effect("SuspendOnFailure", () => - Effect.gen(function*() { + Effect.gen(function* () { const flags = yield* Flags yield* TestClock.adjust(1) @@ -230,10 +243,11 @@ describe.concurrent("ClusterWorkflowEngine", () => { assert.isTrue(flags.get("suspended")) assert.include(flags.get("cause"), "boom") - }).pipe(Effect.provide(TestWorkflowLayer))) + }).pipe(Effect.provide(TestWorkflowLayer)) + ) it.effect("catchAllCause activity", () => - Effect.gen(function*() { + Effect.gen(function* () { const flags = yield* Flags yield* TestClock.adjust(1) @@ -244,7 +258,50 @@ describe.concurrent("ClusterWorkflowEngine", () => { yield* Fiber.join(fiber) assert.isTrue(flags.get("catch")) - }).pipe(Effect.provide(TestWorkflowLayer))) + }).pipe(Effect.provide(TestWorkflowLayer)) + ) + + it.effect( + "forwards parent pointer when spawning a child with discard:true", + () => + Effect.gen(function* () { + const driver = yield* MessageStorage.MemoryDriver + const fiber = yield* DiscardParentWorkflow.execute({ + id: "discard-parent-1" + }).pipe(Effect.fork) + yield* TestClock.adjust(1) + yield* Fiber.join(fiber) + + const findRun = (entityType: string) => + driver.journal.find( + (envelope) => + envelope._tag === "Request" && + envelope.address.entityType === entityType && + envelope.tag === "run" + ) + const parentRun = findRun("Workflow/DiscardParentWorkflow") + const childRun = findRun("Workflow/DiscardChildWorkflow") + assert.exists( + parentRun, + "expected a run envelope for the parent workflow" + ) + assert.exists( + childRun, + "expected a run envelope for the child workflow" + ) + + const childPayload = (childRun as { payload: Record }) + .payload + const parent = childPayload["~@effect/workflow/parent"] as + | { workflowName: string; executionId: string } + | undefined + assert.exists(parent, "child payload should carry the parent pointer") + expect(parent!.workflowName).toEqual("DiscardParentWorkflow") + expect(parent!.executionId).toEqual( + (parentRun as { address: { entityId: string } }).address.entityId + ) + }).pipe(Effect.provide(TestWorkflowLayer)) + ) }) const TestShardingConfig = ShardingConfig.layer({ @@ -264,7 +321,9 @@ const TestWorkflowEngine = ClusterWorkflowEngine.layer.pipe( Layer.provide(TestShardingConfig) ) -class SendEmailError extends Schema.TaggedError("SendEmailError")("SendEmailError", { +class SendEmailError extends Schema.TaggedError( + "SendEmailError" +)("SendEmailError", { message: Schema.String }) {} @@ -284,68 +343,72 @@ class Flags extends Effect.Service()("Flags", { sync: () => new Map() }) {} -const EmailWorkflowLayer = EmailWorkflow.toLayer(Effect.fn(function*(payload) { - const flags = yield* Flags +const EmailWorkflowLayer = EmailWorkflow.toLayer( + Effect.fn(function* (payload) { + const flags = yield* Flags - yield* Effect.addFinalizer(() => - Effect.sync(() => { - flags.set("finalizer", true) - }) - ) - - yield* Activity.make({ - name: "SendEmail", - error: SendEmailError, - execute: Effect.gen(function*() { - const attempt = yield* Activity.CurrentAttempt + yield* Effect.addFinalizer(() => + Effect.sync(() => { + flags.set("finalizer", true) + }) + ) - if (attempt !== 5) { - return yield* new SendEmailError({ - message: `Failed to send email for ${payload.id} on attempt ${attempt}` + yield* Activity.make({ + name: "SendEmail", + error: SendEmailError, + execute: Effect.gen(function* () { + const attempt = yield* Activity.CurrentAttempt + + if (attempt !== 5) { + return yield* new SendEmailError({ + message: `Failed to send email for ${payload.id} on attempt ${attempt}` + }) + } + }) + }).pipe( + EmailWorkflow.withCompensation( + Effect.fnUntraced(function* () { + flags.set("compensation", true) }) - } - }) - }).pipe( - EmailWorkflow.withCompensation(Effect.fnUntraced(function*() { - flags.set("compensation", true) - })), - Activity.retry({ times: 5 }) - ) - - if (payload.to === "compensation") { - return yield* new SendEmailError({ message: `Compensation triggered` }) - } + ), + Activity.retry({ times: 5 }) + ) - const result = yield* Activity.make({ - name: "Sleep", - success: Schema.DateTimeUtc, - execute: Effect.gen(function*() { - // suspended inside Activity - yield* DurableClock.sleep({ - name: "Some sleep", - duration: "10 seconds", - inMemoryThreshold: Duration.zero + if (payload.to === "compensation") { + return yield* new SendEmailError({ message: `Compensation triggered` }) + } + + const result = yield* Activity.make({ + name: "Sleep", + success: Schema.DateTimeUtc, + execute: Effect.gen(function* () { + // suspended inside Activity + yield* DurableClock.sleep({ + name: "Some sleep", + duration: "10 seconds", + inMemoryThreshold: Duration.zero + }) + return yield* DateTime.now }) - return yield* DateTime.now }) + // test serialization from Activity + assert(DateTime.isUtc(result)) + + yield* DurableDeferred.token(EmailTrigger) + // suspended outside Activity + yield* DurableDeferred.await(EmailTrigger).pipe( + Effect.catchAllCause(() => { + flags.set("catchAllCause", true) + return Effect.void + }), + Effect.ensuring( + Effect.sync(() => { + flags.set("ensuring", true) + }) + ) + ) }) - // test serialization from Activity - assert(DateTime.isUtc(result)) - - yield* DurableDeferred.token(EmailTrigger) - // suspended outside Activity - yield* DurableDeferred.await(EmailTrigger).pipe( - Effect.catchAllCause(() => { - flags.set("catchAllCause", true) - return Effect.void - }), - Effect.ensuring(Effect.sync(() => { - flags.set("ensuring", true) - })) - ) -})).pipe( - Layer.provideMerge(Flags.Default) -) +).pipe(Layer.provideMerge(Flags.Default)) const EmailTrigger = DurableDeferred.make("EmailTrigger", { success: Schema.String @@ -360,45 +423,56 @@ const RaceWorkflow = Workflow.make({ idempotencyKey: ({ id }) => id }) -const RaceWorkflowLayer = RaceWorkflow.toLayer(Effect.fnUntraced(function*() { - const flags = yield* Flags +const RaceWorkflowLayer = RaceWorkflow.toLayer( + Effect.fnUntraced(function* () { + const flags = yield* Flags - yield* Effect.addFinalizer(() => - Effect.sync(() => { - flags.set("finalizer", true) - }) - ) + yield* Effect.addFinalizer(() => + Effect.sync(() => { + flags.set("finalizer", true) + }) + ) - return yield* Activity.raceAll("race", [ - Activity.make({ - name: "Activity1", - success: Schema.String, - error: Schema.Never, - execute: Effect.onInterrupt(Effect.delay(Effect.succeed("Activity1"), 1000), () => - Effect.sync(() => { - flags.set("interrupt1", true) - })) - }), - Activity.make({ - name: "Activity2", - success: Schema.String, - error: Schema.Never, - execute: Effect.onInterrupt(Effect.delay(Effect.succeed("Activity2"), 500), () => - Effect.sync(() => { - flags.set("interrupt2", true) - })) - }), - Activity.make({ - name: "Activity3", - success: Schema.String, - error: Schema.Never, - execute: Effect.onInterrupt(Effect.delay(Effect.succeed("Activity3"), 100), () => - Effect.sync(() => { - flags.set("interrupt3", true) - })) - }) - ]) -})) + return yield* Activity.raceAll("race", [ + Activity.make({ + name: "Activity1", + success: Schema.String, + error: Schema.Never, + execute: Effect.onInterrupt( + Effect.delay(Effect.succeed("Activity1"), 1000), + () => + Effect.sync(() => { + flags.set("interrupt1", true) + }) + ) + }), + Activity.make({ + name: "Activity2", + success: Schema.String, + error: Schema.Never, + execute: Effect.onInterrupt( + Effect.delay(Effect.succeed("Activity2"), 500), + () => + Effect.sync(() => { + flags.set("interrupt2", true) + }) + ) + }), + Activity.make({ + name: "Activity3", + success: Schema.String, + error: Schema.Never, + execute: Effect.onInterrupt( + Effect.delay(Effect.succeed("Activity3"), 100), + () => + Effect.sync(() => { + flags.set("interrupt3", true) + }) + ) + }) + ]) + }) +) const DurableRaceWorkflow = Workflow.make({ name: "DurableRaceWorkflow", @@ -409,54 +483,50 @@ const DurableRaceWorkflow = Workflow.make({ idempotencyKey: ({ id }) => id }) -const DurableRaceWorkflowLayer = DurableRaceWorkflow.toLayer(Effect.fnUntraced(function*() { - const flags = yield* Flags +const DurableRaceWorkflowLayer = DurableRaceWorkflow.toLayer( + Effect.fnUntraced(function* () { + const flags = yield* Flags - yield* Effect.addFinalizer(() => - Effect.sync(() => { - flags.set("finalizer", true) - }) - ) + yield* Effect.addFinalizer(() => + Effect.sync(() => { + flags.set("finalizer", true) + }) + ) - return yield* Activity.raceAll("race", [ - Activity.make({ - name: "Activity1", - success: Schema.String, - error: Schema.Never, - execute: DurableClock.sleep({ + return yield* Activity.raceAll("race", [ + Activity.make({ name: "Activity1", - duration: 50000, - inMemoryThreshold: Duration.zero - }).pipe( - Effect.as("Activity1") - ) - }), - Activity.make({ - name: "Activity2", - success: Schema.String, - error: Schema.Never, - execute: DurableClock.sleep({ + success: Schema.String, + error: Schema.Never, + execute: DurableClock.sleep({ + name: "Activity1", + duration: 50000, + inMemoryThreshold: Duration.zero + }).pipe(Effect.as("Activity1")) + }), + Activity.make({ name: "Activity2", - duration: 10000, - inMemoryThreshold: Duration.zero - }).pipe( - Effect.as("Activity2") - ) - }), - Activity.make({ - name: "Activity3", - success: Schema.String, - error: Schema.Never, - execute: DurableClock.sleep({ + success: Schema.String, + error: Schema.Never, + execute: DurableClock.sleep({ + name: "Activity2", + duration: 10000, + inMemoryThreshold: Duration.zero + }).pipe(Effect.as("Activity2")) + }), + Activity.make({ name: "Activity3", - duration: 1000, - inMemoryThreshold: Duration.zero - }).pipe( - Effect.as("Activity3") - ) - }) - ]) -})) + success: Schema.String, + error: Schema.Never, + execute: DurableClock.sleep({ + name: "Activity3", + duration: 1000, + inMemoryThreshold: Duration.zero + }).pipe(Effect.as("Activity3")) + }) + ]) + }) +) const ParentWorkflow = Workflow.make({ name: "ParentWorkflow", @@ -478,25 +548,60 @@ const ChildWorkflow = Workflow.make({ } }) -const ParentWorkflowLayer = ParentWorkflow.toLayer(Effect.fnUntraced(function*({ id }) { - const flags = yield* Flags - const instance = yield* WorkflowInstance - yield* Effect.addFinalizer(() => - Effect.sync(() => { - flags.set("parent-suspended", instance.suspended) - }) - ) - yield* ChildWorkflow.execute({ id }) - flags.set("parent-end", true) -})) +const ParentWorkflowLayer = ParentWorkflow.toLayer( + Effect.fnUntraced(function* ({ id }) { + const flags = yield* Flags + const instance = yield* WorkflowInstance + yield* Effect.addFinalizer(() => + Effect.sync(() => { + flags.set("parent-suspended", instance.suspended) + }) + ) + yield* ChildWorkflow.execute({ id }) + flags.set("parent-end", true) + }) +) const ChildDeferred = DurableDeferred.make("ChildDeferred") -const ChildWorkflowLayer = ChildWorkflow.toLayer(Effect.fnUntraced(function*() { - const flags = yield* Flags - flags.set("child-token", yield* DurableDeferred.token(ChildDeferred)) - yield* DurableDeferred.await(ChildDeferred) - flags.set("child-end", true) -})) +const ChildWorkflowLayer = ChildWorkflow.toLayer( + Effect.fnUntraced(function* () { + const flags = yield* Flags + flags.set("child-token", yield* DurableDeferred.token(ChildDeferred)) + yield* DurableDeferred.await(ChildDeferred) + flags.set("child-end", true) + }) +) + +const DiscardParentWorkflow = Workflow.make({ + name: "DiscardParentWorkflow", + payload: { id: Schema.String }, + idempotencyKey(payload) { + return payload.id + } +}) + +const DiscardChildWorkflow = Workflow.make({ + name: "DiscardChildWorkflow", + payload: { id: Schema.String }, + idempotencyKey(payload) { + return payload.id + } +}) + +const DiscardParentWorkflowLayer = DiscardParentWorkflow.toLayer( + Effect.fnUntraced(function* ({ id }) { + yield* DiscardChildWorkflow.execute( + { id: `${id}-child` }, + { discard: true } + ) + }) +) + +const DiscardChildWorkflowLayer = DiscardChildWorkflow.toLayer( + Effect.fnUntraced(function* () { + return yield* Effect.void + }) +) const SuspendOnFailureWorkflow = Workflow.make({ name: "SuspendOnFailureWorkflow", @@ -508,20 +613,22 @@ const SuspendOnFailureWorkflow = Workflow.make({ } }).annotate(Workflow.SuspendOnFailure, true) -const SuspendOnFailureWorkflowLayer = SuspendOnFailureWorkflow.toLayer(Effect.fnUntraced(function*() { - const flags = yield* Flags - const instance = yield* WorkflowInstance - yield* Effect.addFinalizer(() => - Effect.sync(() => { - flags.set("suspended", instance.suspended) - flags.set("cause", Cause.pretty(instance.cause!)) +const SuspendOnFailureWorkflowLayer = SuspendOnFailureWorkflow.toLayer( + Effect.fnUntraced(function* () { + const flags = yield* Flags + const instance = yield* WorkflowInstance + yield* Effect.addFinalizer(() => + Effect.sync(() => { + flags.set("suspended", instance.suspended) + flags.set("cause", Cause.pretty(instance.cause!)) + }) + ) + yield* Activity.make({ + name: "fail", + execute: Effect.die("boom") }) - ) - yield* Activity.make({ - name: "fail", - execute: Effect.die("boom") }) -})) +) const CatchWorkflow = Workflow.make({ name: "CatchWorkflow", @@ -533,29 +640,33 @@ const CatchWorkflow = Workflow.make({ } }) -const CatchWorkflowLayer = CatchWorkflow.toLayer(Effect.fnUntraced(function*() { - const flags = yield* Flags - yield* Activity.make({ - name: "fail", - execute: Effect.die("boom") - }).pipe( - Effect.catchAllCause((cause) => - Activity.make({ - name: "log", - execute: Effect.suspend(() => { - flags.set("catch", true) - return Effect.log(cause) +const CatchWorkflowLayer = CatchWorkflow.toLayer( + Effect.fnUntraced(function* () { + const flags = yield* Flags + yield* Activity.make({ + name: "fail", + execute: Effect.die("boom") + }).pipe( + Effect.catchAllCause((cause) => + Activity.make({ + name: "log", + execute: Effect.suspend(() => { + flags.set("catch", true) + return Effect.log(cause) + }) }) - }) + ) ) - ) -})) + }) +) const TestWorkflowLayer = EmailWorkflowLayer.pipe( Layer.merge(RaceWorkflowLayer), Layer.merge(DurableRaceWorkflowLayer), Layer.merge(ParentWorkflowLayer), Layer.merge(ChildWorkflowLayer), + Layer.merge(DiscardParentWorkflowLayer), + Layer.merge(DiscardChildWorkflowLayer), Layer.merge(SuspendOnFailureWorkflowLayer), Layer.merge(CatchWorkflowLayer), Layer.provideMerge(Flags.Default), diff --git a/repos/effect/packages/workflow/src/WorkflowEngine.ts b/repos/effect/packages/workflow/src/WorkflowEngine.ts index 5a2b58e..8ebba7e 100644 --- a/repos/effect/packages/workflow/src/WorkflowEngine.ts +++ b/repos/effect/packages/workflow/src/WorkflowEngine.ts @@ -21,7 +21,9 @@ import * as Workflow from "./Workflow.js" * @since 4.0.0 * @category Services */ -export class WorkflowEngine extends Context.Tag("@effect/workflow/WorkflowEngine")< +export class WorkflowEngine extends Context.Tag( + "@effect/workflow/WorkflowEngine" +)< WorkflowEngine, { /** @@ -44,12 +46,12 @@ export class WorkflowEngine extends Context.Tag("@effect/workflow/WorkflowEngine never, | Scope.Scope | Exclude< - R, - | WorkflowEngine - | WorkflowInstance - | Workflow.Execution - | Scope.Scope - > + R, + | WorkflowEngine + | WorkflowInstance + | Workflow.Execution + | Scope.Scope + > | Payload["Context"] | Success["Context"] | Error["Context"] @@ -77,9 +79,7 @@ export class WorkflowEngine extends Context.Tag("@effect/workflow/WorkflowEngine ) => Effect.Effect< Discard extends true ? string : Success["Type"], Error["Type"], - | Payload["Context"] - | Success["Context"] - | Error["Context"] + Payload["Context"] | Success["Context"] | Error["Context"] > /** @@ -128,10 +128,7 @@ export class WorkflowEngine extends Context.Tag("@effect/workflow/WorkflowEngine ) => Effect.Effect< Workflow.Result, never, - | Success["Context"] - | Error["Context"] - | R - | WorkflowInstance + Success["Context"] | Error["Context"] | R | WorkflowInstance > /** @@ -163,11 +160,7 @@ export class WorkflowEngine extends Context.Tag("@effect/workflow/WorkflowEngine readonly deferredName: string readonly exit: Exit.Exit } - ) => Effect.Effect< - void, - never, - Success["Context"] | Error["Context"] - > + ) => Effect.Effect /** * Schedule a wake up for a DurableClock @@ -186,7 +179,9 @@ export class WorkflowEngine extends Context.Tag("@effect/workflow/WorkflowEngine * @since 4.0.0 * @category Services */ -export class WorkflowInstance extends Context.Tag("@effect/workflow/WorkflowEngine/WorkflowInstance")< +export class WorkflowInstance extends Context.Tag( + "@effect/workflow/WorkflowEngine/WorkflowInstance" +)< WorkflowInstance, { /** @@ -283,11 +278,7 @@ export interface Encoded { readonly activityExecute: ( activity: Activity.Any, attempt: number - ) => Effect.Effect< - Workflow.Result, - never, - WorkflowInstance - > + ) => Effect.Effect, never, WorkflowInstance> readonly deferredResult: ( deferred: DurableDeferred.Any ) => Effect.Effect< @@ -316,18 +307,17 @@ export interface Encoded { */ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] => WorkflowEngine.of({ - register: Effect.fnUntraced(function*(workflow, execute) { + register: Effect.fnUntraced(function* (workflow, execute) { const context = yield* Effect.context() yield* options.register(workflow, (payload, executionId) => - Effect.suspend(() => - execute(payload, executionId) - ).pipe( + Effect.suspend(() => execute(payload, executionId)).pipe( Effect.mapInputContext( (input) => Context.merge(context, input) as Context.Context ) - )) + ) + ) }), - execute: Effect.fnUntraced(function*< + execute: Effect.fnUntraced(function* < Name extends string, Payload extends Workflow.AnyStructSchema, Success extends Schema.Schema.Any, @@ -346,7 +336,8 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] => ) { const payload = opts.payload const executionId = opts.executionId - const suspendedRetrySchedule = opts.suspendedRetrySchedule ?? defaultRetrySchedule + const suspendedRetrySchedule = + opts.suspendedRetrySchedule ?? defaultRetrySchedule yield* Effect.annotateCurrentSpan({ executionId }) let result: Workflow.Result | undefined @@ -366,7 +357,8 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] => yield* options.execute(self, { executionId, payload: payload as object, - discard: true + discard: true, + parent: Option.getOrUndefined(parentInstance) }) return executionId } @@ -394,16 +386,22 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] => if (result._tag === "Complete") { return yield* result.exit as Exit.Exit } - sleep ??= (yield* Schedule.driver(suspendedRetrySchedule)).next(void 0).pipe( - Effect.catchAll(() => Effect.dieMessage(`${self.name}.execute: suspendedRetrySchedule exhausted`)) - ) + sleep ??= (yield* Schedule.driver(suspendedRetrySchedule)) + .next(void 0) + .pipe( + Effect.catchAll(() => + Effect.dieMessage( + `${self.name}.execute: suspendedRetrySchedule exhausted` + ) + ) + ) yield* sleep } }), poll: options.poll, interrupt: options.interrupt, resume: options.resume, - activityExecute: Effect.fnUntraced(function*< + activityExecute: Effect.fnUntraced(function* < Success extends Schema.Schema.Any, Error extends Schema.Schema.All, R @@ -417,43 +415,43 @@ export const makeUnsafe = (options: Encoded): WorkflowEngine["Type"] => ) return new Workflow.Complete({ exit }) }), - deferredResult: Effect.fnUntraced( - function*( - deferred: DurableDeferred.DurableDeferred - ) { - const instance = yield* WorkflowInstance - yield* Effect.annotateCurrentSpan({ - executionId: instance.executionId - }) - const exit = yield* options.deferredResult(deferred) - if (exit === undefined) { - return exit - } - return yield* Effect.orDie( - Schema.decodeUnknown(deferred.exitSchema)(exit) - ) as Effect.Effect> + deferredResult: Effect.fnUntraced(function* < + Success extends Schema.Schema.Any, + Error extends Schema.Schema.All + >(deferred: DurableDeferred.DurableDeferred) { + const instance = yield* WorkflowInstance + yield* Effect.annotateCurrentSpan({ + executionId: instance.executionId + }) + const exit = yield* options.deferredResult(deferred) + if (exit === undefined) { + return exit } - ), - deferredDone: Effect.fnUntraced( - function*( - deferred: DurableDeferred.DurableDeferred, - opts: { - readonly workflowName: string - readonly executionId: string - readonly deferredName: string - readonly exit: Exit.Exit - } - ) { - return yield* options.deferredDone({ - workflowName: opts.workflowName, - executionId: opts.executionId, - deferredName: opts.deferredName, - exit: yield* Schema.encode(deferred.exitSchema)( - opts.exit - ) as Effect.Effect> - }) + return yield* Effect.orDie( + Schema.decodeUnknown(deferred.exitSchema)(exit) + ) as Effect.Effect> + }), + deferredDone: Effect.fnUntraced(function* < + Success extends Schema.Schema.Any, + Error extends Schema.Schema.All + >( + deferred: DurableDeferred.DurableDeferred, + opts: { + readonly workflowName: string + readonly executionId: string + readonly deferredName: string + readonly exit: Exit.Exit } - ), + ) { + return yield* options.deferredDone({ + workflowName: opts.workflowName, + executionId: opts.executionId, + deferredName: opts.deferredName, + exit: yield* Schema.encode(deferred.exitSchema)( + opts.exit + ) as Effect.Effect> + }) + }), scheduleClock: options.scheduleClock }) @@ -467,17 +465,20 @@ const defaultRetrySchedule = Schedule.exponential(200, 1.5).pipe( */ export const layerMemory: Layer.Layer = Layer.scoped( WorkflowEngine, - Effect.gen(function*() { + Effect.gen(function* () { const scope = yield* Effect.scope - const workflows = new Map Effect.Effect - readonly scope: Scope.Scope - }>() + const workflows = new Map< + string, + { + readonly workflow: Workflow.Any + readonly execute: ( + payload: object, + executionId: string + ) => Effect.Effect + readonly scope: Scope.Scope + } + >() type ExecutionState = { readonly payload: object @@ -497,7 +498,9 @@ export const layerMemory: Layer.Layer = Layer.scoped( } const activities = new Map() - const resume = Effect.fnUntraced(function*(executionId: string): Effect.fn.Return { + const resume = Effect.fnUntraced(function* ( + executionId: string + ): Effect.fn.Return { const state = executions.get(executionId) if (!state) return const exit = state.fiber?.unsafePoll() @@ -508,28 +511,35 @@ export const layerMemory: Layer.Layer = Layer.scoped( } const entry = workflows.get(state.instance.workflow.name)! - const instance = WorkflowInstance.initial(state.instance.workflow, state.instance.executionId) + const instance = WorkflowInstance.initial( + state.instance.workflow, + state.instance.executionId + ) instance.interrupted = state.instance.interrupted state.instance = instance - state.fiber = yield* state.execute(state.payload, state.instance.executionId).pipe( - Effect.onExit(() => { - if (!instance.interrupted) { - return Effect.void - } - instance.suspended = false - return Effect.withFiberRuntime((fiber) => Effect.interruptible(Fiber.interrupt(fiber))) - }), - Workflow.intoResult, - Effect.provideService(WorkflowInstance, instance), - Effect.provideService(WorkflowEngine, engine), - Effect.tap((result) => { - if (!state.parent || result._tag !== "Complete") { - return Effect.void - } - return Effect.forkIn(resume(state.parent), scope) - }), - Effect.forkIn(entry.scope) - ) + state.fiber = yield* state + .execute(state.payload, state.instance.executionId) + .pipe( + Effect.onExit(() => { + if (!instance.interrupted) { + return Effect.void + } + instance.suspended = false + return Effect.withFiberRuntime((fiber) => + Effect.interruptible(Fiber.interrupt(fiber)) + ) + }), + Workflow.intoResult, + Effect.provideService(WorkflowInstance, instance), + Effect.provideService(WorkflowEngine, engine), + Effect.tap((result) => { + if (!state.parent || result._tag !== "Complete") { + return Effect.void + } + return Effect.forkIn(resume(state.parent), scope) + }), + Effect.forkIn(entry.scope) + ) }) const deferredResults = new Map>() @@ -537,17 +547,19 @@ export const layerMemory: Layer.Layer = Layer.scoped( const clocks = yield* FiberMap.make() const engine = makeUnsafe({ - register: Effect.fnUntraced(function*(workflow, execute) { + register: Effect.fnUntraced(function* (workflow, execute) { workflows.set(workflow.name, { workflow, execute, scope: yield* Effect.scope }) }), - execute: Effect.fnUntraced(function*(workflow, options) { + execute: Effect.fnUntraced(function* (workflow, options) { const entry = workflows.get(workflow.name) if (!entry) { - return yield* Effect.die(`Workflow ${workflow.name} is not registered`) + return yield* Effect.die( + `Workflow ${workflow.name} is not registered` + ) } let state = executions.get(options.executionId) @@ -565,7 +577,7 @@ export const layerMemory: Layer.Layer = Layer.scoped( if (options.discard) return return (yield* Fiber.join(state.fiber!)) as any }), - interrupt: Effect.fnUntraced(function*(_workflow, executionId) { + interrupt: Effect.fnUntraced(function* (_workflow, executionId) { const state = executions.get(executionId) if (!state) return state.instance.interrupted = true @@ -574,13 +586,17 @@ export const layerMemory: Layer.Layer = Layer.scoped( resume(_workflow, executionId) { return resume(executionId) }, - activityExecute: Effect.fnUntraced(function*(activity, attempt) { + activityExecute: Effect.fnUntraced(function* (activity, attempt) { const instance = yield* WorkflowInstance const activityId = `${instance.executionId}/${activity.name}/${attempt}` let state = activities.get(activityId) if (state) { const exit = state.exit - if (exit && exit._tag === "Success" && exit.value._tag === "Suspended") { + if ( + exit && + exit._tag === "Success" && + exit.value._tag === "Suspended" + ) { state.exit = undefined } else if (exit) { return yield* exit @@ -589,7 +605,10 @@ export const layerMemory: Layer.Layer = Layer.scoped( state = { exit: undefined } activities.set(activityId, state) } - const activityInstance = WorkflowInstance.initial(instance.workflow, instance.executionId) + const activityInstance = WorkflowInstance.initial( + instance.workflow, + instance.executionId + ) activityInstance.interrupted = instance.interrupted return yield* activity.executeEncoded.pipe( Workflow.intoResult, @@ -609,7 +628,7 @@ export const layerMemory: Layer.Layer = Layer.scoped( const exit = state.fiber?.unsafePoll() return exit ?? Effect.succeed(undefined) }), - deferredResult: Effect.fnUntraced(function*(deferred) { + deferredResult: Effect.fnUntraced(function* (deferred) { const instance = yield* WorkflowInstance const id = `${instance.executionId}/${deferred.name}` return deferredResults.get(id) @@ -622,16 +641,22 @@ export const layerMemory: Layer.Layer = Layer.scoped( return resume(options.executionId) }), scheduleClock: (workflow, options) => - engine.deferredDone(options.clock.deferred, { - workflowName: workflow.name, - executionId: options.executionId, - deferredName: options.clock.deferred.name, - exit: Exit.void - }).pipe( - Effect.delay(options.clock.duration), - FiberMap.run(clocks, `${options.executionId}/${options.clock.name}`, { onlyIfMissing: true }), - Effect.asVoid - ) + engine + .deferredDone(options.clock.deferred, { + workflowName: workflow.name, + executionId: options.executionId, + deferredName: options.clock.deferred.name, + exit: Exit.void + }) + .pipe( + Effect.delay(options.clock.duration), + FiberMap.run( + clocks, + `${options.executionId}/${options.clock.name}`, + { onlyIfMissing: true } + ), + Effect.asVoid + ) }) return engine