- Introduction & Core Concepts
- Basic Usage Patterns
- Real-World Examples
- Advanced Features Deep Dive
- Practical Patterns & Best Practices
- Integration Examples
When working with concurrent stream processing and channel operations in Effect, you often need fine-grained control over how child executors behave during merge operations. Traditional approaches lead to rigid execution models that can't adapt to different workloads:
// Traditional approach - fixed behavior
const mergeStreams = Stream.mergeAll(streams, { concurrency: 4 })
// No control over when to continue, yield, or close individual streams
// All streams follow the same execution pattern
// Can lead to resource contention or inefficient schedulingThis approach leads to:
- Inflexible Scheduling - No control over when child streams should yield execution
- Resource Waste - Cannot close streams early when conditions are met
- Poor Load Balancing - No way to dynamically adjust execution priority
- Limited Customization - Cannot implement custom merge strategies
ChildExecutorDecision provides granular control over child executor behavior in concurrent channel operations, enabling dynamic scheduling decisions based on runtime conditions.
import { ChildExecutorDecision, Channel, Option } from "effect"
// Fine-grained control over child executor behavior
const customMergeLogic = (emittedValue: unknown): ChildExecutorDecision.ChildExecutorDecision => {
if (shouldContinueProcessing(emittedValue)) {
return ChildExecutorDecision.Continue()
}
if (shouldYieldToOthers(emittedValue)) {
return ChildExecutorDecision.Yield()
}
return ChildExecutorDecision.Close(emittedValue)
}Continue: Instructs the child executor to keep processing the current substream without yielding execution to other concurrent streams.
Yield: Tells the child executor to pause processing and allow other concurrent streams to execute, implementing cooperative multitasking.
Close: Terminates the current substream with a final value and transfers execution to the next available substream.
import { ChildExecutorDecision } from "effect"
// Create basic decisions
const continueDecision = ChildExecutorDecision.Continue()
const yieldDecision = ChildExecutorDecision.Yield()
const closeDecision = ChildExecutorDecision.Close("final-value")
// Type guards for decision checking
const decision: ChildExecutorDecision.ChildExecutorDecision = ChildExecutorDecision.Continue()
if (ChildExecutorDecision.isContinue(decision)) {
console.log("Decision is to continue processing")
}
if (ChildExecutorDecision.isYield(decision)) {
console.log("Decision is to yield execution")
}
if (ChildExecutorDecision.isClose(decision)) {
console.log("Decision is to close with value:", decision.value)
}import { ChildExecutorDecision } from "effect"
const handleDecision = (decision: ChildExecutorDecision.ChildExecutorDecision): string => {
return ChildExecutorDecision.match(decision, {
onContinue: () => "Continue processing current stream",
onYield: () => "Yield to allow other streams to process",
onClose: (value) => `Close stream with final value: ${value}`
})
}
// Usage
const decision = ChildExecutorDecision.Yield()
const result = handleDecision(decision) // "Yield to allow other streams to process"import { ChildExecutorDecision, Option } from "effect"
const makeDecisionBasedOnCondition = (
condition: Option.Option<boolean>
): ChildExecutorDecision.ChildExecutorDecision => {
return Option.match(condition, {
onNone: () => ChildExecutorDecision.Yield(),
onSome: (hasData) => hasData
? ChildExecutorDecision.Continue()
: ChildExecutorDecision.Close("no-data")
})
}This example shows how to implement intelligent load balancing for concurrent stream processing based on system load.
import {
ChildExecutorDecision,
Channel,
Effect,
Stream,
Ref,
Schedule,
pipe
} from "effect"
interface ProcessingMetrics {
readonly processed: number
readonly errors: number
readonly load: number
}
const createLoadBalancedProcessor = Effect.gen(function* () {
const metrics = yield* Ref.make<ProcessingMetrics>({
processed: 0,
errors: 0,
load: 0.0
})
const updateLoad = Effect.gen(function* () {
const currentMetrics = yield* Ref.get(metrics)
const newLoad = calculateSystemLoad(currentMetrics)
yield* Ref.update(metrics, m => ({ ...m, load: newLoad }))
})
const makeProcessingDecision = (item: unknown): Effect.Effect<ChildExecutorDecision.ChildExecutorDecision> => {
return Effect.gen(function* () {
const currentMetrics = yield* Ref.get(metrics)
// High load - yield to other streams
if (currentMetrics.load > 0.8) {
return ChildExecutorDecision.Yield()
}
// Error threshold reached - close this stream
if (currentMetrics.errors > 10) {
return ChildExecutorDecision.Close(`Too many errors: ${currentMetrics.errors}`)
}
// Normal processing - continue
yield* Ref.update(metrics, m => ({ ...m, processed: m.processed + 1 }))
return ChildExecutorDecision.Continue()
})
}
const processStream = (items: ReadonlyArray<unknown>) => {
return Stream.fromIterable(items).pipe(
Stream.mapEffect(processItem),
Stream.tap(() => updateLoad)
)
}
return { processStream, makeProcessingDecision, metrics }
})
const calculateSystemLoad = (metrics: ProcessingMetrics): number => {
// Simulate load calculation based on processed items and errors
return Math.min(1.0, (metrics.processed * 0.01) + (metrics.errors * 0.1))
}
const processItem = (item: unknown): Effect.Effect<string> => {
return Effect.gen(function* () {
// Simulate processing with potential errors
if (Math.random() < 0.1) {
yield* Effect.fail(`Processing failed for item: ${item}`)
}
return `Processed: ${item}`
})
}This example demonstrates how to use ChildExecutorDecision to implement priority-based processing where high-priority streams get more execution time.
import {
ChildExecutorDecision,
Channel,
Effect,
Stream,
Queue,
Ref,
pipe
} from "effect"
interface PriorityItem {
readonly data: unknown
readonly priority: number // 1-10, 10 being highest
readonly streamId: string
}
const createPriorityProcessor = Effect.gen(function* () {
const streamPriorities = yield* Ref.make<Map<string, number>>(new Map())
const executionCounts = yield* Ref.make<Map<string, number>>(new Map())
const updateStreamPriority = (streamId: string, priority: number) => {
return Ref.update(streamPriorities, map => new Map(map).set(streamId, priority))
}
const makePriorityDecision = (
item: PriorityItem
): Effect.Effect<ChildExecutorDecision.ChildExecutorDecision> => {
return Effect.gen(function* () {
const priorities = yield* Ref.get(streamPriorities)
const counts = yield* Ref.get(executionCounts)
const currentPriority = priorities.get(item.streamId) ?? 1
const currentCount = counts.get(item.streamId) ?? 0
// Update execution count
yield* Ref.update(executionCounts, map =>
new Map(map).set(item.streamId, currentCount + 1)
)
// High priority items get more continuous processing
if (currentPriority >= 8) {
return ChildExecutorDecision.Continue()
}
// Medium priority - yield after processing several items
if (currentPriority >= 5 && currentCount % 3 === 0) {
return ChildExecutorDecision.Yield()
}
// Low priority - yield frequently to allow others
if (currentPriority < 5 && currentCount % 2 === 0) {
return ChildExecutorDecision.Yield()
}
// Special condition - close if item indicates completion
if (isCompletionMarker(item.data)) {
return ChildExecutorDecision.Close(`Stream ${item.streamId} completed`)
}
return ChildExecutorDecision.Continue()
})
}
const createPriorityStream = (
items: ReadonlyArray<PriorityItem>
): Stream.Stream<PriorityItem> => {
return Stream.fromIterable(items).pipe(
Stream.tap(item => updateStreamPriority(item.streamId, item.priority))
)
}
return {
createPriorityStream,
makePriorityDecision,
updateStreamPriority,
streamPriorities,
executionCounts
}
})
const isCompletionMarker = (data: unknown): boolean => {
return typeof data === 'string' && data.includes('COMPLETE')
}
// Usage example
const runPriorityProcessing = Effect.gen(function* () {
const processor = yield* createPriorityProcessor
const highPriorityItems: ReadonlyArray<PriorityItem> = [
{ data: "urgent-task-1", priority: 9, streamId: "urgent" },
{ data: "urgent-task-2", priority: 9, streamId: "urgent" },
{ data: "COMPLETE", priority: 9, streamId: "urgent" }
]
const lowPriorityItems: ReadonlyArray<PriorityItem> = [
{ data: "background-task-1", priority: 3, streamId: "background" },
{ data: "background-task-2", priority: 3, streamId: "background" },
{ data: "background-task-3", priority: 3, streamId: "background" }
]
const urgentStream = processor.createPriorityStream(highPriorityItems)
const backgroundStream = processor.createPriorityStream(lowPriorityItems)
return Stream.mergeAll([urgentStream, backgroundStream], {
concurrency: 2,
bufferSize: 10
}).pipe(
Stream.runCollect
)
})This example shows how to use ChildExecutorDecision to manage resource consumption and implement backpressure in stream processing.
import {
ChildExecutorDecision,
Channel,
Effect,
Stream,
Ref,
Clock,
Schedule,
pipe
} from "effect"
interface ResourceManager {
readonly memoryUsage: Ref.Ref<number>
readonly cpuUsage: Ref.Ref<number>
readonly activeConnections: Ref.Ref<number>
readonly maxMemory: number
readonly maxCpu: number
readonly maxConnections: number
}
const createResourceManager = (
maxMemory: number = 1000,
maxCpu: number = 80,
maxConnections: number = 100
): Effect.Effect<ResourceManager> => {
return Effect.gen(function* () {
const memoryUsage = yield* Ref.make(0)
const cpuUsage = yield* Ref.make(0)
const activeConnections = yield* Ref.make(0)
return {
memoryUsage,
cpuUsage,
activeConnections,
maxMemory,
maxCpu,
maxConnections
}
})
}
const makeResourceAwareDecision = (
resourceManager: ResourceManager,
itemSize: number
): Effect.Effect<ChildExecutorDecision.ChildExecutorDecision> => {
return Effect.gen(function* () {
const memory = yield* Ref.get(resourceManager.memoryUsage)
const cpu = yield* Ref.get(resourceManager.cpuUsage)
const connections = yield* Ref.get(resourceManager.activeConnections)
// Critical resource usage - close stream to prevent system overload
if (memory > resourceManager.maxMemory * 0.95) {
return ChildExecutorDecision.Close("Memory limit exceeded")
}
if (cpu > resourceManager.maxCpu * 0.95) {
return ChildExecutorDecision.Close("CPU limit exceeded")
}
if (connections > resourceManager.maxConnections * 0.95) {
return ChildExecutorDecision.Close("Connection limit exceeded")
}
// High resource usage - yield to reduce load
if (memory > resourceManager.maxMemory * 0.8 ||
cpu > resourceManager.maxCpu * 0.8 ||
connections > resourceManager.maxConnections * 0.8) {
return ChildExecutorDecision.Yield()
}
// Normal usage - continue processing
yield* Ref.update(resourceManager.memoryUsage, m => m + itemSize)
yield* Ref.update(resourceManager.activeConnections, c => c + 1)
return ChildExecutorDecision.Continue()
})
}
const simulateResourceUsage = (
resourceManager: ResourceManager
): Effect.Effect<void> => {
return Effect.gen(function* () {
// Simulate fluctuating CPU usage
const newCpu = Math.random() * 100
yield* Ref.set(resourceManager.cpuUsage, newCpu)
// Simulate memory cleanup
yield* Ref.update(resourceManager.memoryUsage, m => Math.max(0, m - 10))
// Simulate connection cleanup
yield* Ref.update(resourceManager.activeConnections, c => Math.max(0, c - 1))
})
}
// Usage with concurrent streams
const runResourceAwareProcessing = Effect.gen(function* () {
const resourceManager = yield* createResourceManager(500, 70, 50)
const createManagedStream = (items: ReadonlyArray<unknown>, itemSize: number) => {
return Stream.fromIterable(items).pipe(
Stream.mapEffect(item =>
Effect.gen(function* () {
const decision = yield* makeResourceAwareDecision(resourceManager, itemSize)
yield* simulateResourceUsage(resourceManager)
return { item, decision }
})
)
)
}
const largeItemStream = createManagedStream(
["large-file-1.zip", "large-file-2.zip", "large-file-3.zip"],
100
)
const smallItemStream = createManagedStream(
["small-file-1.txt", "small-file-2.txt", "small-file-3.txt"],
10
)
return Stream.mergeAll([largeItemStream, smallItemStream], {
concurrency: 2
}).pipe(
Stream.take(10),
Stream.runCollect
)
})Pattern matching allows you to handle different decision types with sophisticated branching logic.
import { ChildExecutorDecision, pipe } from "effect"
const analyzeDecision = (decision: ChildExecutorDecision.ChildExecutorDecision) => {
return pipe(
decision,
ChildExecutorDecision.match({
onContinue: () => ({
action: "continue",
shouldLog: true,
nextStep: "process-next-item"
}),
onYield: () => ({
action: "yield",
shouldLog: false,
nextStep: "schedule-later"
}),
onClose: (value) => ({
action: "close",
shouldLog: true,
nextStep: "cleanup",
finalValue: value
})
})
)
}import { ChildExecutorDecision, Effect, Logger, Metric, pipe } from "effect"
interface DecisionMetrics {
readonly continueCount: Metric.Counter
readonly yieldCount: Metric.Counter
readonly closeCount: Metric.Counter
}
const createDecisionHandler = (metrics: DecisionMetrics) => {
return (decision: ChildExecutorDecision.ChildExecutorDecision) => {
return pipe(
decision,
ChildExecutorDecision.match({
onContinue: () => Effect.gen(function* () {
yield* Metric.increment(metrics.continueCount)
yield* Logger.info("Stream continuing execution")
return "CONTINUE"
}),
onYield: () => Effect.gen(function* () {
yield* Metric.increment(metrics.yieldCount)
yield* Logger.debug("Stream yielding execution")
return "YIELD"
}),
onClose: (value) => Effect.gen(function* () {
yield* Metric.increment(metrics.closeCount)
yield* Logger.info(`Stream closing with value: ${value}`)
return `CLOSE:${value}`
})
})
)
}
}import { ChildExecutorDecision, Effect, Array as Arr, pipe } from "effect"
const combineDecisions = (
decisions: ReadonlyArray<ChildExecutorDecision.ChildExecutorDecision>
): ChildExecutorDecision.ChildExecutorDecision => {
const hasContinue = decisions.some(ChildExecutorDecision.isContinue)
const hasClose = decisions.some(ChildExecutorDecision.isClose)
const hasYield = decisions.some(ChildExecutorDecision.isYield)
// Priority: Close > Continue > Yield
if (hasClose) {
const closeDecision = decisions.find(ChildExecutorDecision.isClose)!
return ChildExecutorDecision.Close(closeDecision.value)
}
if (hasContinue) {
return ChildExecutorDecision.Continue()
}
return ChildExecutorDecision.Yield()
}
const applyDecisionPolicy = (
decisions: ReadonlyArray<ChildExecutorDecision.ChildExecutorDecision>,
policy: "majority" | "priority" | "unanimous"
): ChildExecutorDecision.ChildExecutorDecision => {
switch (policy) {
case "majority":
return getMajorityDecision(decisions)
case "priority":
return combineDecisions(decisions)
case "unanimous":
return getUnanimousDecision(decisions)
}
}
const getMajorityDecision = (
decisions: ReadonlyArray<ChildExecutorDecision.ChildExecutorDecision>
): ChildExecutorDecision.ChildExecutorDecision => {
const counts = {
continue: decisions.filter(ChildExecutorDecision.isContinue).length,
yield: decisions.filter(ChildExecutorDecision.isYield).length,
close: decisions.filter(ChildExecutorDecision.isClose).length
}
const max = Math.max(counts.continue, counts.yield, counts.close)
if (counts.close === max) {
const closeDecision = decisions.find(ChildExecutorDecision.isClose)!
return ChildExecutorDecision.Close(closeDecision.value)
}
if (counts.continue === max) {
return ChildExecutorDecision.Continue()
}
return ChildExecutorDecision.Yield()
}
const getUnanimousDecision = (
decisions: ReadonlyArray<ChildExecutorDecision.ChildExecutorDecision>
): ChildExecutorDecision.ChildExecutorDecision => {
if (decisions.length === 0) {
return ChildExecutorDecision.Yield()
}
const first = decisions[0]
const allSame = decisions.every(decision => {
if (ChildExecutorDecision.isContinue(first) && ChildExecutorDecision.isContinue(decision)) {
return true
}
if (ChildExecutorDecision.isYield(first) && ChildExecutorDecision.isYield(decision)) {
return true
}
if (ChildExecutorDecision.isClose(first) && ChildExecutorDecision.isClose(decision)) {
return decision.value === first.value
}
return false
})
return allSame ? first : ChildExecutorDecision.Yield()
}Implement stateful decision making that adapts based on historical execution patterns.
import { ChildExecutorDecision, Effect, Ref, Array as Arr, pipe } from "effect"
interface DecisionHistory {
readonly decisions: ReadonlyArray<ChildExecutorDecision.ChildExecutorDecision>
readonly timestamps: ReadonlyArray<number>
readonly maxHistory: number
}
const createStatefulDecisionMaker = Effect.gen(function* () {
const history = yield* Ref.make<DecisionHistory>({
decisions: [],
timestamps: [],
maxHistory: 100
})
const recordDecision = (decision: ChildExecutorDecision.ChildExecutorDecision) => {
return Effect.gen(function* () {
const now = Date.now()
yield* Ref.update(history, h => ({
...h,
decisions: Arr.append(h.decisions, decision).slice(-h.maxHistory),
timestamps: Arr.append(h.timestamps, now).slice(-h.maxHistory)
}))
})
}
const makeAdaptiveDecision = (
currentCondition: unknown
): Effect.Effect<ChildExecutorDecision.ChildExecutorDecision> => {
return Effect.gen(function* () {
const h = yield* Ref.get(history)
const recentDecisions = h.decisions.slice(-10)
const yieldRatio = recentDecisions.filter(ChildExecutorDecision.isYield).length / recentDecisions.length
const continueRatio = recentDecisions.filter(ChildExecutorDecision.isContinue).length / recentDecisions.length
let decision: ChildExecutorDecision.ChildExecutorDecision
// Adaptive logic based on recent patterns
if (yieldRatio > 0.7) {
// Too much yielding, try to continue more
decision = ChildExecutorDecision.Continue()
} else if (continueRatio > 0.8) {
// Too much continuing, balance with yielding
decision = ChildExecutorDecision.Yield()
} else {
// Balanced approach based on current condition
decision = makeBasicDecision(currentCondition)
}
yield* recordDecision(decision)
return decision
})
}
const getDecisionStats = Effect.gen(function* () {
const h = yield* Ref.get(history)
const total = h.decisions.length
if (total === 0) {
return { continue: 0, yield: 0, close: 0, total: 0 }
}
return {
continue: h.decisions.filter(ChildExecutorDecision.isContinue).length / total,
yield: h.decisions.filter(ChildExecutorDecision.isYield).length / total,
close: h.decisions.filter(ChildExecutorDecision.isClose).length / total,
total
}
})
return { makeAdaptiveDecision, getDecisionStats, recordDecision }
})
const makeBasicDecision = (condition: unknown): ChildExecutorDecision.ChildExecutorDecision => {
if (typeof condition === 'string' && condition.includes('error')) {
return ChildExecutorDecision.Close(condition)
}
if (typeof condition === 'number' && condition > 0.5) {
return ChildExecutorDecision.Continue()
}
return ChildExecutorDecision.Yield()
}Create reusable decision factories for common scenarios.
import { ChildExecutorDecision, Effect, pipe } from "effect"
interface DecisionFactoryConfig {
readonly continueThreshold: number
readonly yieldThreshold: number
readonly closeOnError: boolean
readonly maxProcessingTime: number
}
const createDecisionFactory = (config: DecisionFactoryConfig) => {
return {
forLoad: (currentLoad: number): ChildExecutorDecision.ChildExecutorDecision => {
if (currentLoad > config.continueThreshold) {
return ChildExecutorDecision.Yield()
}
return ChildExecutorDecision.Continue()
},
forError: (error: unknown): ChildExecutorDecision.ChildExecutorDecision => {
if (config.closeOnError) {
return ChildExecutorDecision.Close(`Error: ${error}`)
}
return ChildExecutorDecision.Yield()
},
forTime: (processingTime: number): ChildExecutorDecision.ChildExecutorDecision => {
if (processingTime > config.maxProcessingTime) {
return ChildExecutorDecision.Close(`Timeout after ${processingTime}ms`)
}
if (processingTime > config.maxProcessingTime * 0.8) {
return ChildExecutorDecision.Yield()
}
return ChildExecutorDecision.Continue()
},
composite: (
load: number,
error: unknown | null,
time: number
): ChildExecutorDecision.ChildExecutorDecision => {
if (error && config.closeOnError) {
return ChildExecutorDecision.Close(`Error: ${error}`)
}
if (time > config.maxProcessingTime) {
return ChildExecutorDecision.Close(`Timeout after ${time}ms`)
}
if (load > config.continueThreshold || time > config.maxProcessingTime * 0.8) {
return ChildExecutorDecision.Yield()
}
return ChildExecutorDecision.Continue()
}
}
}
// Usage
const factory = createDecisionFactory({
continueThreshold: 0.7,
yieldThreshold: 0.5,
closeOnError: true,
maxProcessingTime: 5000
})
const decision = factory.composite(0.8, null, 1000) // Returns YieldChain multiple decision criteria together for complex logic.
import { ChildExecutorDecision, Effect, Option, pipe } from "effect"
type DecisionCriteria<T> = (input: T) => Option.Option<ChildExecutorDecision.ChildExecutorDecision>
const createDecisionChain = <T>(...criteria: ReadonlyArray<DecisionCriteria<T>>) => {
return (input: T): ChildExecutorDecision.ChildExecutorDecision => {
for (const criterion of criteria) {
const decision = criterion(input)
if (Option.isSome(decision)) {
return decision.value
}
}
// Default decision if no criteria match
return ChildExecutorDecision.Continue()
}
}
// Example criteria
const errorCriterion: DecisionCriteria<{ error?: string }> = (input) => {
return input.error
? Option.some(ChildExecutorDecision.Close(`Error: ${input.error}`))
: Option.none()
}
const loadCriterion: DecisionCriteria<{ load: number }> = (input) => {
return input.load > 0.8
? Option.some(ChildExecutorDecision.Yield())
: Option.none()
}
const timeCriterion: DecisionCriteria<{ processingTime: number }> = (input) => {
return input.processingTime > 10000
? Option.some(ChildExecutorDecision.Close(`Timeout: ${input.processingTime}ms`))
: Option.none()
}
// Usage
const decisionChain = createDecisionChain(
errorCriterion,
timeCriterion,
loadCriterion
)
const decision = decisionChain({
error: undefined,
load: 0.9,
processingTime: 5000
}) // Returns Yield due to high loadimport {
ChildExecutorDecision,
Channel,
Effect,
Stream,
UpstreamPullStrategy,
UpstreamPullRequest,
Option,
pipe
} from "effect"
const createCustomMergeChannel = <A>(
decisionMaker: (item: A) => ChildExecutorDecision.ChildExecutorDecision
) => {
return <InElem, OutErr, InErr, OutDone, InDone, Env>(
source: Channel.Channel<A, InElem, OutErr, InErr, OutDone, InDone, Env>
) => {
return Channel.concatMapWithCustom(
source,
(item: A) => Channel.write(item),
(done1: OutDone, done2: OutDone) => done1, // combineInners
(done1: OutDone, done2: OutDone) => done1, // combineAll
(pullRequest: UpstreamPullRequest.UpstreamPullRequest<A>) =>
UpstreamPullRequest.match(pullRequest, {
onPulled: () => UpstreamPullStrategy.PullAfterNext(Option.none()),
onNoUpstream: () => UpstreamPullStrategy.PullAfterAllEnqueued(Option.none())
}),
decisionMaker // This is where ChildExecutorDecision is used
)
}
}
// Usage with Stream
const createSmartMergeStream = <A>(
streams: ReadonlyArray<Stream.Stream<A>>,
decisionMaker: (item: A) => ChildExecutorDecision.ChildExecutorDecision
) => {
const channels = streams.map(stream => Stream.toChannel(stream))
const mergedChannel = channels.reduce((acc, channel) =>
Channel.mergeWith(acc, channel, {
onSelfDone: (exit) => Channel.fromExit(exit),
onOtherDone: (exit) => Channel.fromExit(exit)
})
)
return Channel.toStream(createCustomMergeChannel(decisionMaker)(mergedChannel))
}import {
ChildExecutorDecision,
Effect,
TestContext,
it,
expect,
pipe
} from "effect"
const testDecisionMaking = it.effect("should make correct decisions based on input", () =>
Effect.gen(function* () {
// Test Continue decision
const continueDecision = ChildExecutorDecision.Continue()
expect(ChildExecutorDecision.isContinue(continueDecision)).toBe(true)
expect(ChildExecutorDecision.isYield(continueDecision)).toBe(false)
expect(ChildExecutorDecision.isClose(continueDecision)).toBe(false)
// Test Yield decision
const yieldDecision = ChildExecutorDecision.Yield()
expect(ChildExecutorDecision.isYield(yieldDecision)).toBe(true)
expect(ChildExecutorDecision.isContinue(yieldDecision)).toBe(false)
expect(ChildExecutorDecision.isClose(yieldDecision)).toBe(false)
// Test Close decision
const closeDecision = ChildExecutorDecision.Close("test-value")
expect(ChildExecutorDecision.isClose(closeDecision)).toBe(true)
expect(ChildExecutorDecision.isContinue(closeDecision)).toBe(false)
expect(ChildExecutorDecision.isYield(closeDecision)).toBe(false)
// Test pattern matching
const matchResult = ChildExecutorDecision.match(closeDecision, {
onContinue: () => "continue",
onYield: () => "yield",
onClose: (value) => `close-${value}`
})
expect(matchResult).toBe("close-test-value")
})
)
// Mock decision maker for testing
const createMockDecisionMaker = (decisions: ReadonlyArray<ChildExecutorDecision.ChildExecutorDecision>) => {
let index = 0
return (_item: unknown): ChildExecutorDecision.ChildExecutorDecision => {
const decision = decisions[index % decisions.length]
index++
return decision
}
}
const testDecisionSequence = it.effect("should handle decision sequences correctly", () =>
Effect.gen(function* () {
const decisions = [
ChildExecutorDecision.Continue(),
ChildExecutorDecision.Yield(),
ChildExecutorDecision.Close("done")
]
const mockDecisionMaker = createMockDecisionMaker(decisions)
const results = [
mockDecisionMaker("item1"),
mockDecisionMaker("item2"),
mockDecisionMaker("item3")
]
expect(ChildExecutorDecision.isContinue(results[0])).toBe(true)
expect(ChildExecutorDecision.isYield(results[1])).toBe(true)
expect(ChildExecutorDecision.isClose(results[2])).toBe(true)
})
)
// Property-based testing with fast-check
import { Arbitrary, fc } from "effect"
const ChildExecutorDecisionArbitrary = fc.oneof(
fc.constant(ChildExecutorDecision.Continue()),
fc.constant(ChildExecutorDecision.Yield()),
fc.string().map(s => ChildExecutorDecision.Close(s))
)
const testDecisionProperties = it.effect("decision pattern matching should be exhaustive", () =>
Effect.gen(function* () {
yield* Arbitrary.check(ChildExecutorDecisionArbitrary, (decision) => {
const result = ChildExecutorDecision.match(decision, {
onContinue: () => "continue",
onYield: () => "yield",
onClose: (value) => `close-${value}`
})
// Should always return a string result
return typeof result === 'string' && result.length > 0
})
})
)ChildExecutorDecision provides fine-grained control over concurrent stream execution, enabling adaptive load balancing, priority-based processing, and resource-aware scheduling for Effect applications.
Key benefits:
- Granular Control: Make precise decisions about stream execution flow at the individual item level
- Dynamic Adaptation: Adjust processing behavior based on runtime conditions, load, and system state
- Resource Management: Implement sophisticated backpressure and resource-aware processing strategies
- Composable Logic: Chain and combine decision criteria for complex, maintainable execution policies
ChildExecutorDecision is essential when building high-performance concurrent applications that need to dynamically balance throughput, resource usage, and responsiveness based on changing conditions.