From 208db31e5adfc7462a84c2d668aa10f3dacf4974 Mon Sep 17 00:00:00 2001 From: Jamie Quadri Date: Mon, 15 Sep 2025 20:20:31 -0500 Subject: [PATCH 1/3] [fix]: add host-level logic for reentrant event handling --- Workflow/Sources/SubtreeManager.swift | 43 +++++---- Workflow/Sources/WorkflowHost.swift | 89 +++++++++++++++++- Workflow/Tests/TestUtilities.swift | 3 +- Workflow/Tests/WorkflowHostTests.swift | 121 ++++++++++++++++++++++--- 4 files changed, 222 insertions(+), 34 deletions(-) diff --git a/Workflow/Sources/SubtreeManager.swift b/Workflow/Sources/SubtreeManager.swift index cf262e8c3..f471d5436 100644 --- a/Workflow/Sources/SubtreeManager.swift +++ b/Workflow/Sources/SubtreeManager.swift @@ -281,7 +281,10 @@ extension WorkflowNode.SubtreeManager { func makeSink( of actionType: Action.Type ) -> Sink where WorkflowType == Action.WorkflowType { - let reusableSink = sinkStore.findOrCreate(actionType: Action.self) + let reusableSink = sinkStore.findOrCreate( + actionType: Action.self, + onSinkEvent: hostContext.onSinkEvent + ) let sink = Sink { [weak reusableSink] action in WorkflowLogger.logSinkEvent(ref: SignpostRef(), action: action) @@ -320,7 +323,10 @@ extension WorkflowNode.SubtreeManager { self.usedSinks = [:] } - mutating func findOrCreate(actionType: Action.Type) -> ReusableSink { + mutating func findOrCreate( + actionType: Action.Type, + onSinkEvent: @escaping OnSinkEvent + ) -> ReusableSink { let key = ObjectIdentifier(actionType) let reusableSink: ReusableSink @@ -334,7 +340,7 @@ extension WorkflowNode.SubtreeManager { reusableSink = usedSink } else { // Create a new reusable sink. - reusableSink = ReusableSink() + reusableSink = ReusableSink(onSinkEvent: onSinkEvent) } usedSinks[key] = reusableSink @@ -345,30 +351,33 @@ extension WorkflowNode.SubtreeManager { /// Type-erased base class for reusable sinks. fileprivate class AnyReusableSink { + /// The callback to invoke when an event is to be handled. + let onSinkEvent: OnSinkEvent var eventPipe: EventPipe - init() { + init(onSinkEvent: @escaping OnSinkEvent) { + self.onSinkEvent = onSinkEvent self.eventPipe = EventPipe() } } fileprivate final class ReusableSink: AnyReusableSink where Action.WorkflowType == WorkflowType { func handle(action: Action) { - let output = Output.update( - action, - source: .external, - subtreeInvalidated: false // initial state - ) + let perform: () -> Void = { + let output = Output.update( + action, + source: .external, + subtreeInvalidated: false // initial state + ) - if case .pending = eventPipe.validationState { - // Workflow is currently processing an `event`. - // Scheduling it to be processed after. - DispatchQueue.workflowExecution.async { [weak self] in - self?.eventPipe.handle(event: output) - } - return + self.eventPipe.handle(event: output) } - eventPipe.handle(event: output) + + let enqueue: () -> Void = { [weak self] in + self?.handle(action: action) + } + + onSinkEvent(perform, enqueue) } } } diff --git a/Workflow/Sources/WorkflowHost.swift b/Workflow/Sources/WorkflowHost.swift index 3dcc43689..59e34dc35 100644 --- a/Workflow/Sources/WorkflowHost.swift +++ b/Workflow/Sources/WorkflowHost.swift @@ -14,6 +14,7 @@ * limitations under the License. */ +import Dispatch import ReactiveSwift /// Defines a type that receives debug information about a running workflow hierarchy. @@ -50,6 +51,8 @@ public final class WorkflowHost { context.debugger } + let eventHandler: SinkEventHandler + /// Initializes a new host with the given workflow at the root. /// /// - Parameter workflow: The root workflow in the hierarchy @@ -61,6 +64,13 @@ public final class WorkflowHost { observers: [WorkflowObserver] = [], debugger: WorkflowDebugger? = nil ) { + self.eventHandler = SinkEventHandler() + assert( + eventHandler.state == .initializing, + "EventHandler must begin in the `.initializing` state" + ) + defer { eventHandler.state = .ready } + let observer = WorkflowObservation .sharedObserversInterceptor .workflowObservers(for: observers) @@ -69,7 +79,8 @@ public final class WorkflowHost { self.context = HostContext( observer: observer, debugger: debugger, - runtimeConfig: Runtime.configuration + runtimeConfig: Runtime.configuration, + onSinkEvent: eventHandler.makeOnSinkEventCallback() ) self.rootNode = WorkflowNode( @@ -158,14 +169,19 @@ struct HostContext { let debugger: WorkflowDebugger? let runtimeConfig: Runtime.Configuration + /// Event handler to be plumbed through the runtime down to the Sinks + let onSinkEvent: OnSinkEvent + init( observer: WorkflowObserver?, debugger: WorkflowDebugger?, - runtimeConfig: Runtime.Configuration + runtimeConfig: Runtime.Configuration, + onSinkEvent: @escaping OnSinkEvent ) { self.observer = observer self.debugger = debugger self.runtimeConfig = runtimeConfig + self.onSinkEvent = onSinkEvent } } @@ -176,3 +192,72 @@ extension HostContext { debugger != nil ? perform() : nil } } + +// MARK: - EventHandler + +/// Callback signature for the internal `ReusableSink` types to invoke when +/// they receive an event from the 'outside world'. +/// - Parameter perform: The event handler to invoke if the event can be processed immediately. +/// - Parameter enqueue: The event handler to invoke in the future if the event cannot currently be processed. +typealias OnSinkEvent = ( + _ perform: () -> Void, + _ enqueue: @escaping () -> Void +) -> Void + +/// Handles events from 'Sinks' such that runtime-level event handling state is appropriately +/// managed, and attempts to perform reentrant action handling can be detected and dealt with. +final class SinkEventHandler { + enum State { + /// The handler (and related components) are being + /// initialized, and are not yet ready to process events. + /// Attempts to do so in this state will fail with a fatal error. + case initializing + + /// An event is currently being processed. + case processingEvent + + /// Ready to handle an event. + case ready + } + + fileprivate(set) var state: State = .initializing + + /// Synchronously performs or enqueues the specified event handlers based on the current + /// event handler state. + /// - Parameters: + /// - perform: The event handling action to perform immediately if possible. + /// - enqueue: The event handling action to enqueue if the event handler is already processing an event. + func performOrEnqueueEvent( + perform: () -> Void, + enqueue: @escaping () -> Void + ) { + switch state { + case .initializing: + fatalError("Tried to handle event before finishing initialization.") + + case .processingEvent: + DispatchQueue.workflowExecution.async(execute: enqueue) + + case .ready: + state = .processingEvent + defer { state = .ready } + perform() + } + } + + /// Creates the callback that should be invoked by Sinks to handle their event appropriately + /// given the `EventHandler`'s current state. + /// - Returns: The callback that should be invoked. + func makeOnSinkEventCallback() -> OnSinkEvent { + // TODO: do we need the weak ref? + let onSinkEvent: OnSinkEvent = { [weak self] perform, enqueue in + guard let self else { + return // TODO: what's the appropriate handling? + } + + performOrEnqueueEvent(perform: perform, enqueue: enqueue) + } + + return onSinkEvent + } +} diff --git a/Workflow/Tests/TestUtilities.swift b/Workflow/Tests/TestUtilities.swift index ddbf4f782..3bad6c55b 100644 --- a/Workflow/Tests/TestUtilities.swift +++ b/Workflow/Tests/TestUtilities.swift @@ -68,7 +68,8 @@ extension HostContext { HostContext( observer: observer, debugger: debugger, - runtimeConfig: runtimeConfig + runtimeConfig: runtimeConfig, + onSinkEvent: { perform, _ in perform() } ) } } diff --git a/Workflow/Tests/WorkflowHostTests.swift b/Workflow/Tests/WorkflowHostTests.swift index 12b9907b9..060246ad7 100644 --- a/Workflow/Tests/WorkflowHostTests.swift +++ b/Workflow/Tests/WorkflowHostTests.swift @@ -14,7 +14,9 @@ * limitations under the License. */ +import ReactiveSwift import XCTest + @_spi(WorkflowRuntimeConfig) @testable import Workflow final class WorkflowHostTests: XCTestCase { @@ -58,32 +60,84 @@ final class WorkflowHostTests: XCTestCase { final class WorkflowHost_EventEmissionTests: XCTestCase { // Previous versions of Workflow would fatalError under this scenario func test_event_sent_to_invalidated_sink_during_action_handling() { - let root = Parent() - let host = WorkflowHost(workflow: root) + let host = WorkflowHost(workflow: Parent()) + let (lifetime, token) = ReactiveSwift.Lifetime.make() + defer { _ = token } let initialRendering = host.rendering.value var observedRenderCount = 0 XCTAssertEqual(initialRendering.eventCount, 0) - let disposable = host.rendering.signal.observeValues { rendering in - XCTAssertEqual(rendering.eventCount, 1) + host + .rendering + .signal + .take(during: lifetime) + .observeValues { rendering in + XCTAssertEqual(rendering.eventCount, 1) + + // emit another event using an old rendering + // while the first is still being processed, but + // the workflow that handles the event has been + // removed from the tree + if observedRenderCount == 0 { + initialRendering.eventHandler() + } - // emit another event using an old rendering - // while the first is still being processed, but - // the workflow that handles the event has been - // removed from the tree - if observedRenderCount == 0 { - initialRendering.eventHandler() + observedRenderCount += 1 } - observedRenderCount += 1 - } - defer { disposable?.dispose() } - // send an event and cause a re-render initialRendering.eventHandler() XCTAssertEqual(observedRenderCount, 1) + + drainMainQueueBySpinningRunLoop() + + // Ensure the invalidated sink doesn't process the event + let nextRendering = host.rendering.value + XCTAssertEqual(nextRendering.eventCount, 1) + XCTAssertEqual(observedRenderCount, 1) + } + + func test_reentrant_event_during_render() { + let host = WorkflowHost(workflow: ReentrancyWorkflow()) + let (lifetime, token) = ReactiveSwift.Lifetime.make() + defer { _ = token } + let initialRendering = host.rendering.value + + var emitReentrantEvent = false + + let renderExpectation = expectation(description: "render") + renderExpectation.expectedFulfillmentCount = 2 + + host + .rendering + .signal + .take(during: lifetime) + .observeValues { val in + defer { renderExpectation.fulfill() } + defer { emitReentrantEvent = true } + guard !emitReentrantEvent else { return } + + // In a prior implementation, this would check state local + // to the underlying EventPipe and defer event handling + // into the future. If the RunLoop was spun after that + // point, the action could attempt to be handled and an + // we'd hit a trap when sending a sink an action in an + // invalid state. + // + // 'Real world' code could hit this case as there are some + // UI bindings that fire when a rendering/output is updated + // that call into system API that do sometimes spin the + // RunLoop manually (e.g. stuff calling into WebKit). + initialRendering.sink.send(.event) + drainMainQueueBySpinningRunLoop() + } + + // Send an event and cause a re-render + initialRendering.sink.send(.event) + + waitForExpectations(timeout: 1) } } @@ -115,6 +169,35 @@ extension WorkflowHostTests { // MARK: Utility Types +extension WorkflowHost_EventEmissionTests { + struct ReentrancyWorkflow: Workflow { + typealias State = Void + typealias Output = Never + + struct Rendering { + var sink: Sink! + } + + func render(state: Void, context: RenderContext) -> Rendering { + let sink = context.makeSink(of: Action.self) + return Rendering(sink: sink) + } + + enum Action: WorkflowAction { + typealias WorkflowType = ReentrancyWorkflow + + case event + + func apply( + toState state: inout WorkflowType.State, + context: ApplyContext + ) -> WorkflowType.Output? { + nil + } + } + } +} + extension WorkflowHost_EventEmissionTests { struct Parent: Workflow { struct Rendering { @@ -182,3 +265,13 @@ extension WorkflowHost_EventEmissionTests { } } } + +private func drainMainQueueBySpinningRunLoop(timeoutSeconds: UInt = 1) { + var done = false + DispatchQueue.main.async { done = true } + + let deadline = ContinuousClock.now + .seconds(timeoutSeconds) + while !done, ContinuousClock.now < deadline { + RunLoop.current.run(until: .now.addingTimeInterval(0.01)) + } +} From 524840816dc6fb67bcaea3559bd07967b926fd6d Mon Sep 17 00:00:00 2001 From: Jamie Quadri Date: Wed, 24 Sep 2025 17:27:15 -0500 Subject: [PATCH 2/3] [feedback]: renames, refactors, tests --- Workflow/Sources/SubtreeManager.swift | 8 +- Workflow/Sources/WorkflowHost.swift | 92 ++++++++----- Workflow/Tests/SinkEventHandlerTests.swift | 149 +++++++++++++++++++++ Workflow/Tests/TestUtilities.swift | 68 ++++++++++ Workflow/Tests/WorkflowHostTests.swift | 127 ++++++++++++++++-- Workflow/Tests/WorkflowObserverTests.swift | 47 ------- 6 files changed, 394 insertions(+), 97 deletions(-) create mode 100644 Workflow/Tests/SinkEventHandlerTests.swift diff --git a/Workflow/Sources/SubtreeManager.swift b/Workflow/Sources/SubtreeManager.swift index f471d5436..fb87f3bc0 100644 --- a/Workflow/Sources/SubtreeManager.swift +++ b/Workflow/Sources/SubtreeManager.swift @@ -363,7 +363,8 @@ extension WorkflowNode.SubtreeManager { fileprivate final class ReusableSink: AnyReusableSink where Action.WorkflowType == WorkflowType { func handle(action: Action) { - let perform: () -> Void = { + // If we can process now, forward through the `EventPipe` + let immediatePerform: () -> Void = { let output = Output.update( action, source: .external, @@ -373,11 +374,12 @@ extension WorkflowNode.SubtreeManager { self.eventPipe.handle(event: output) } - let enqueue: () -> Void = { [weak self] in + // Otherwise, try to recurse again in the future + let deferredPerform: () -> Void = { [weak self] in self?.handle(action: action) } - onSinkEvent(perform, enqueue) + onSinkEvent(immediatePerform, deferredPerform) } } } diff --git a/Workflow/Sources/WorkflowHost.swift b/Workflow/Sources/WorkflowHost.swift index 59e34dc35..2b1288a9b 100644 --- a/Workflow/Sources/WorkflowHost.swift +++ b/Workflow/Sources/WorkflowHost.swift @@ -51,7 +51,7 @@ public final class WorkflowHost { context.debugger } - let eventHandler: SinkEventHandler + let sinkEventHandler: SinkEventHandler /// Initializes a new host with the given workflow at the root. /// @@ -64,12 +64,8 @@ public final class WorkflowHost { observers: [WorkflowObserver] = [], debugger: WorkflowDebugger? = nil ) { - self.eventHandler = SinkEventHandler() - assert( - eventHandler.state == .initializing, - "EventHandler must begin in the `.initializing` state" - ) - defer { eventHandler.state = .ready } + self.sinkEventHandler = SinkEventHandler() + defer { sinkEventHandler.state = .ready } let observer = WorkflowObservation .sharedObserversInterceptor @@ -80,7 +76,7 @@ public final class WorkflowHost { observer: observer, debugger: debugger, runtimeConfig: Runtime.configuration, - onSinkEvent: eventHandler.makeOnSinkEventCallback() + onSinkEvent: sinkEventHandler.makeOnSinkEventCallback() ) self.rootNode = WorkflowNode( @@ -102,6 +98,12 @@ public final class WorkflowHost { /// Update the input for the workflow. Will cause a render pass. public func update(workflow: WorkflowType) { + sinkEventHandler.withEventHandlingSuspended { + _update(workflow: workflow) + } + } + + private func _update(workflow: WorkflowType) { rootNode.update(workflow: workflow) // Treat the update as an "output" from the workflow originating from an external event to force a render pass. @@ -193,69 +195,85 @@ extension HostContext { } } -// MARK: - EventHandler +// MARK: - SinkEventHandler /// Callback signature for the internal `ReusableSink` types to invoke when /// they receive an event from the 'outside world'. -/// - Parameter perform: The event handler to invoke if the event can be processed immediately. -/// - Parameter enqueue: The event handler to invoke in the future if the event cannot currently be processed. +/// - Parameter immediatePerform: The event handler to invoke if the event can be processed immediately. +/// - Parameter deferredPerform: The event handler to invoke in the future if the event cannot currently be processed. typealias OnSinkEvent = ( - _ perform: () -> Void, - _ enqueue: @escaping () -> Void + _ immediatePerform: () -> Void, + _ deferredPerform: @escaping () -> Void ) -> Void /// Handles events from 'Sinks' such that runtime-level event handling state is appropriately /// managed, and attempts to perform reentrant action handling can be detected and dealt with. final class SinkEventHandler { enum State { - /// The handler (and related components) are being - /// initialized, and are not yet ready to process events. - /// Attempts to do so in this state will fail with a fatal error. - case initializing - - /// An event is currently being processed. - case processingEvent - /// Ready to handle an event. case ready + + /// The event handler is busy. Usually this indicates another event is being + /// processed, but it may also be set when some other condition prevents + /// event handling (e.g. a `WorkflowHost` was told to update its root node). + case busy } - fileprivate(set) var state: State = .initializing + fileprivate(set) var state: State + + init(state: State = .busy) { + self.state = state + } /// Synchronously performs or enqueues the specified event handlers based on the current /// event handler state. /// - Parameters: - /// - perform: The event handling action to perform immediately if possible. - /// - enqueue: The event handling action to enqueue if the event handler is already processing an event. + /// - immediate: The event handling action to perform immediately if possible. + /// - deferred: The event handling action to enqueue if the event handler is already processing an event. func performOrEnqueueEvent( - perform: () -> Void, - enqueue: @escaping () -> Void + immediate: () -> Void, + deferred: @escaping () -> Void ) { switch state { - case .initializing: - fatalError("Tried to handle event before finishing initialization.") + case .ready: + withEventHandlingSuspended(immediate) - case .processingEvent: - DispatchQueue.workflowExecution.async(execute: enqueue) + case .busy: + DispatchQueue.workflowExecution.async(execute: deferred) + } + } + /// Invokes the given closure with event handling explicitly set to the `busy` state, so + /// any incoming events produced while executing the closure's body will be enqueued. + /// - Parameter body: The closure to invoke. + func withEventHandlingSuspended(_ body: () -> Void) { + switch state { case .ready: - state = .processingEvent + state = .busy defer { state = .ready } - perform() + body() + + case .busy: + body() } } /// Creates the callback that should be invoked by Sinks to handle their event appropriately - /// given the `EventHandler`'s current state. + /// given the `SinkEventHandler`'s current state. /// - Returns: The callback that should be invoked. func makeOnSinkEventCallback() -> OnSinkEvent { - // TODO: do we need the weak ref? - let onSinkEvent: OnSinkEvent = { [weak self] perform, enqueue in + // We may not actually need the weak ref, but it's more defensive to keep it. + let onSinkEvent: OnSinkEvent = { [weak self] immediate, deferred in guard let self else { - return // TODO: what's the appropriate handling? + // We just drop the events here. Should we signal this somehow? + // Maybe as a debug-only thing? Or is it just noise? + return } - performOrEnqueueEvent(perform: perform, enqueue: enqueue) + performOrEnqueueEvent( + immediate: immediate, + deferred: deferred + ) } return onSinkEvent diff --git a/Workflow/Tests/SinkEventHandlerTests.swift b/Workflow/Tests/SinkEventHandlerTests.swift new file mode 100644 index 000000000..7ee6c90a0 --- /dev/null +++ b/Workflow/Tests/SinkEventHandlerTests.swift @@ -0,0 +1,149 @@ +/* + * Copyright 2025 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Testing + +@testable import Workflow + +struct SinkEventHandlerTests { + @Test + func initialState() async throws { + let subject = SinkEventHandler() + #expect(subject.state == .busy) + } + + @Test + func stateTransitions() async throws { + let subject = SinkEventHandler(state: .ready) + + #expect(subject.state == .ready) + + var stateDuringPerform: SinkEventHandler.State? + var stateDuringEnqueue: SinkEventHandler.State? + subject.performOrEnqueueEvent { + stateDuringPerform = subject.state + } deferred: { + stateDuringEnqueue = subject.state + } + + #expect(stateDuringPerform == .busy) + #expect(stateDuringEnqueue == nil) + #expect(subject.state == .ready) + } + + // we are asserting things & depending on the main dispatch queue + @MainActor + @Test + func reentrancyHandling() async throws { + let subject = SinkEventHandler(state: .ready) + + var performCount = 0 + var enqueueCount = 0 + let incrementPerform = { performCount += 1 } + let incrementEnqueue = { enqueueCount += 1 } + + subject.performOrEnqueueEvent { + incrementPerform() + subject.performOrEnqueueEvent { + Issue.record("should not perform") + } deferred: { + incrementEnqueue() + } + } deferred: { + Issue.record("should not enqueue") + } + + // should have synchronously performed once and not yet enqueued + #expect(performCount == 1) + #expect(enqueueCount == 0) + + await drainMainQueue() + + // should have invoked the enqueued event + #expect(performCount == 1) + #expect(enqueueCount == 1) + } + + @MainActor + @Test + func callbackReentrancyHandling() async throws { + let subject = SinkEventHandler(state: .ready) + + var performCount = 0 + var enqueueCount = 0 + let incrementPerform = { performCount += 1 } + let incrementEnqueue = { enqueueCount += 1 } + + let callback = subject.makeOnSinkEventCallback() + + callback( /* immediatePerform */ { + incrementPerform() + callback( /* immediatePerform */ { + Issue.record("should not perform") + }, /* deferredPerform */ { + incrementEnqueue() + }) + }, /* deferredPerform */ { + Issue.record("should not enqueue") + }) + + // should have synchronously performed once and not yet enqueued + #expect(performCount == 1) + #expect(enqueueCount == 0) + + await drainMainQueue() + + // should have invoked the enqueued event + #expect(performCount == 1) + #expect(enqueueCount == 1) + } + + @MainActor + @Test + func callbackIgnoresEventAfterDeinit() async throws { + weak var weakRef: SinkEventHandler? + let callback: OnSinkEvent + let subject = SinkEventHandler(state: .ready) + weakRef = subject + callback = consume subject.makeOnSinkEventCallback() + + // should not invoke anything because event handler deinited + callback( /* immediatePerform */ { + Issue.record("should not perform") + }, /* deferredPerform */ { + Issue.record("should not enqueue") + }) + + #expect(weakRef == nil) + } + + @Test + func explicitEventHandlingDisabled() async throws { + let subject = SinkEventHandler(state: .ready) + + #expect(subject.state == .ready) + + subject.withEventHandlingSuspended { + #expect(subject.state == .busy) + + subject.withEventHandlingSuspended { + #expect(subject.state == .busy) + } + } + + #expect(subject.state == .ready) + } +} diff --git a/Workflow/Tests/TestUtilities.swift b/Workflow/Tests/TestUtilities.swift index 3bad6c55b..72998f5bc 100644 --- a/Workflow/Tests/TestUtilities.swift +++ b/Workflow/Tests/TestUtilities.swift @@ -106,3 +106,71 @@ extension Runtime { Runtime._bootstrapConfiguration = .init() } } + +// MARK: - WorkflowObserver + +final class TestObserver: WorkflowObserver { + var onSessionBegan: ((WorkflowSession) -> Void)? + var onSessionEnded: ((WorkflowSession) -> Void)? + /// (Workflow, State, Session) -> Void + var onDidMakeInitialState: ((Any, Any, WorkflowSession) -> Void)? + /// (Workflow, State, Session) -> ((Rendering) -> Void)? + var onWillRender: ((Any, Any, WorkflowSession) -> ((Any) -> Void)?)? + /// (Workflow [old], Workflow [new], State, Session) -> Void + var onDidChange: ((Any, Any, Any, WorkflowSession) -> Void)? + /// (Action, Workflow, Session) -> Void + var onDidReceiveAction: ((Any, Any, WorkflowSession) -> Void)? + /// (Action, Workflow, State, Session) -> ((State, Output?) -> Void)? + var onApplyAction: ((Any, Any, Any, WorkflowSession) -> ((Any, Any) -> Void)?)? + + func sessionDidBegin(_ session: WorkflowSession) { + onSessionBegan?(session) + } + + func sessionDidEnd(_ session: WorkflowSession) { + onSessionEnded?(session) + } + + func workflowDidMakeInitialState( + _ workflow: WorkflowType, + initialState: WorkflowType.State, + session: WorkflowSession + ) where WorkflowType: Workflow { + onDidMakeInitialState?(workflow, initialState, session) + } + + func workflowWillRender(_ workflow: WorkflowType, state: WorkflowType.State, session: WorkflowSession) -> ((WorkflowType.Rendering) -> Void)? where WorkflowType: Workflow { + onWillRender?(workflow, state, session) + } + + func workflowDidChange(from oldWorkflow: WorkflowType, to newWorkflow: WorkflowType, state: WorkflowType.State, session: WorkflowSession) where WorkflowType: Workflow { + onDidChange?(oldWorkflow, newWorkflow, state, session) + } + + func workflowDidReceiveAction(_ action: Action, workflow: Action.WorkflowType, session: WorkflowSession) { + onDidReceiveAction?(action, workflow, session) + } + + func workflowWillApplyAction(_ action: Action, workflow: Action.WorkflowType, state: Action.WorkflowType.State, session: WorkflowSession) -> ((Action.WorkflowType.State, Action.WorkflowType.Output?) -> Void)? { + onApplyAction?(action, workflow, state, session) + } +} + +// MARK: - Generic + +func drainMainQueueBySpinningRunLoop(timeoutSeconds: UInt = 1) { + var done = false + DispatchQueue.main.async { done = true } + + let deadline = ContinuousClock.now + .seconds(timeoutSeconds) + while !done, ContinuousClock.now < deadline { + // Turn one iteration at a time + RunLoop.main.run(until: .now) + } +} + +func drainMainQueue() async { + await withCheckedContinuation { done in + DispatchQueue.main.async { done.resume() } + } +} diff --git a/Workflow/Tests/WorkflowHostTests.swift b/Workflow/Tests/WorkflowHostTests.swift index 060246ad7..b3018defc 100644 --- a/Workflow/Tests/WorkflowHostTests.swift +++ b/Workflow/Tests/WorkflowHostTests.swift @@ -15,6 +15,7 @@ */ import ReactiveSwift +import Testing import XCTest @_spi(WorkflowRuntimeConfig) @testable import Workflow @@ -167,6 +168,122 @@ extension WorkflowHostTests { } } +// MARK: SinkEventHandler + +@MainActor +@Suite +struct WorkflowHost_SinkEventHandlerTests { + @Test + func correctStateAfterInit() { + let workflow = StateTransitioningWorkflow() + let host = WorkflowHost(workflow: workflow) + + #expect(host.sinkEventHandler.state == .ready) + } + + @Test + func enqueuesEventsDuringUpdate() async throws { + let observer = TestObserver() + + var receivedActionCount = 0 + observer.onDidReceiveAction = { _, _, _ in + receivedActionCount += 1 + } + + let host = WorkflowHost( + workflow: StateTransitioningWorkflow(), + observers: [observer] + ) + + let rendering = host.rendering.value + + let eventHandler = host.sinkEventHandler + #expect(eventHandler.state == .ready) + + var handlerStatesDuringUpdate: [SinkEventHandler.State] = [] + observer.onDidChange = { _, _, _, _ in + handlerStatesDuringUpdate.append(eventHandler.state) + } + + var handlerStatesDuringRender: [SinkEventHandler.State] = [] + var emitOnce: (() -> Void)? = { rendering.toggle() } + observer.onWillRender = { _, _, _ in + handlerStatesDuringRender.append(eventHandler.state) + if let emit = emitOnce.take() { + // emit an event once – we expect it to be enqueued + emit() + } + return nil + } + + host.update(workflow: StateTransitioningWorkflow()) + + #expect(handlerStatesDuringUpdate == [.busy]) + #expect(handlerStatesDuringRender == [.busy]) + + // reentrant event should have been sent & queued + #expect(emitOnce == nil) + #expect(receivedActionCount == 0) + + await drainMainQueue() + + // reentrant event should have been handled + #expect(receivedActionCount == 1) + } + + @Test + func enqueuesEventsDuringEventHandling() async throws { + let observer = TestObserver() + + let host = WorkflowHost( + workflow: StateTransitioningWorkflow(), + observers: [observer] + ) + + let rendering = host.rendering.value + let eventHandler = host.sinkEventHandler + + var didEmit = false + let emitActionOnce = { + var emitToggle: (() -> Void)? = { rendering.toggle() } + return { + guard let emit = emitToggle.take() else { return } + didEmit = true + emit() + } + }() + + var receivedActionCount = 0 + var handlerStatesOnExternalAction: [SinkEventHandler.State] = [] + observer.onDidReceiveAction = { _, _, _ in + receivedActionCount += 1 + handlerStatesOnExternalAction.append(eventHandler.state) + } + + // emit a reentrant action during action + observer.onApplyAction = { _, _, _, _ in + emitActionOnce() + return nil + } + + #expect(eventHandler.state == .ready) + + // emit a 'normal' event, which the observer will + // see and emit a second one + rendering.toggle() + + #expect(didEmit == true) + #expect(receivedActionCount == 1) // only the first processed so far + #expect(handlerStatesOnExternalAction == [.busy]) + + await drainMainQueue() + + // reentrant event should have been handled + #expect(receivedActionCount == 2) + #expect(handlerStatesOnExternalAction == [.busy, .busy]) + } +} + // MARK: Utility Types extension WorkflowHost_EventEmissionTests { @@ -265,13 +382,3 @@ extension WorkflowHost_EventEmissionTests { } } } - -private func drainMainQueueBySpinningRunLoop(timeoutSeconds: UInt = 1) { - var done = false - DispatchQueue.main.async { done = true } - - let deadline = ContinuousClock.now + .seconds(timeoutSeconds) - while !done, ContinuousClock.now < deadline { - RunLoop.current.run(until: .now.addingTimeInterval(0.01)) - } -} diff --git a/Workflow/Tests/WorkflowObserverTests.swift b/Workflow/Tests/WorkflowObserverTests.swift index b03765b48..c6f210fe0 100644 --- a/Workflow/Tests/WorkflowObserverTests.swift +++ b/Workflow/Tests/WorkflowObserverTests.swift @@ -609,53 +609,6 @@ extension WorkflowObserverTests { // MARK: - Utilities -private final class TestObserver: WorkflowObserver { - var onSessionBegan: ((WorkflowSession) -> Void)? - var onSessionEnded: ((WorkflowSession) -> Void)? - /// (Workflow, State, Session) -> Void - var onDidMakeInitialState: ((Any, Any, WorkflowSession) -> Void)? - /// (Workflow, State, Session) -> ((Rendering) -> Void)? - var onWillRender: ((Any, Any, WorkflowSession) -> ((Any) -> Void)?)? - /// (Workflow [old], Workflow [new], State, Session) -> Void - var onDidChange: ((Any, Any, Any, WorkflowSession) -> Void)? - /// (Action, Workflow, Session) -> Void - var onDidReceiveAction: ((Any, Any, WorkflowSession) -> Void)? - /// (Action, Workflow, State, Session) -> ((State, Output?) -> Void)? - var onApplyAction: ((Any, Any, Any, WorkflowSession) -> ((Any, Any) -> Void)?)? - - func sessionDidBegin(_ session: WorkflowSession) { - onSessionBegan?(session) - } - - func sessionDidEnd(_ session: WorkflowSession) { - onSessionEnded?(session) - } - - func workflowDidMakeInitialState( - _ workflow: WorkflowType, - initialState: WorkflowType.State, - session: WorkflowSession - ) where WorkflowType: Workflow { - onDidMakeInitialState?(workflow, initialState, session) - } - - func workflowWillRender(_ workflow: WorkflowType, state: WorkflowType.State, session: WorkflowSession) -> ((WorkflowType.Rendering) -> Void)? where WorkflowType: Workflow { - onWillRender?(workflow, state, session) - } - - func workflowDidChange(from oldWorkflow: WorkflowType, to newWorkflow: WorkflowType, state: WorkflowType.State, session: WorkflowSession) where WorkflowType: Workflow { - onDidChange?(oldWorkflow, newWorkflow, state, session) - } - - func workflowDidReceiveAction(_ action: Action, workflow: Action.WorkflowType, session: WorkflowSession) { - onDidReceiveAction?(action, workflow, session) - } - - func workflowWillApplyAction(_ action: Action, workflow: Action.WorkflowType, state: Action.WorkflowType.State, session: WorkflowSession) -> ((Action.WorkflowType.State, Action.WorkflowType.Output?) -> Void)? { - onApplyAction?(action, workflow, state, session) - } -} - private struct Child: Workflow { var prop: String From 1c58ab6ff160edff638e7bb6be8a8a73c08de122 Mon Sep 17 00:00:00 2001 From: Jamie Quadri Date: Thu, 25 Sep 2025 08:44:31 -0500 Subject: [PATCH 3/3] [refactor]: put new event handling behind a runtime config --- Workflow/Sources/RuntimeConfiguration.swift | 4 +++ Workflow/Sources/SubtreeManager.swift | 36 +++++++++++++++++++-- Workflow/Sources/WorkflowHost.swift | 23 ++++++++----- Workflow/Tests/SinkEventHandlerTests.swift | 3 +- Workflow/Tests/WorkflowHostTests.swift | 32 ++++++++++++------ 5 files changed, 77 insertions(+), 21 deletions(-) diff --git a/Workflow/Sources/RuntimeConfiguration.swift b/Workflow/Sources/RuntimeConfiguration.swift index 2e54d0306..217f48dc6 100644 --- a/Workflow/Sources/RuntimeConfiguration.swift +++ b/Workflow/Sources/RuntimeConfiguration.swift @@ -88,6 +88,10 @@ extension Runtime { /// Note: this doesn't control anything yet, but is here as a placeholder public var renderOnlyIfStateChanged: Bool = false + + /// Whether action handling should be delegated to the `SinkEventHandler` type. + /// This is expected to eventually be removed and become the default behavior. + public var useSinkEventHandler: Bool = false } struct BootstrappableConfiguration { diff --git a/Workflow/Sources/SubtreeManager.swift b/Workflow/Sources/SubtreeManager.swift index fb87f3bc0..cf27e62af 100644 --- a/Workflow/Sources/SubtreeManager.swift +++ b/Workflow/Sources/SubtreeManager.swift @@ -325,7 +325,7 @@ extension WorkflowNode.SubtreeManager { mutating func findOrCreate( actionType: Action.Type, - onSinkEvent: @escaping OnSinkEvent + onSinkEvent: OnSinkEvent? ) -> ReusableSink { let key = ObjectIdentifier(actionType) @@ -352,10 +352,10 @@ extension WorkflowNode.SubtreeManager { /// Type-erased base class for reusable sinks. fileprivate class AnyReusableSink { /// The callback to invoke when an event is to be handled. - let onSinkEvent: OnSinkEvent + let onSinkEvent: OnSinkEvent? var eventPipe: EventPipe - init(onSinkEvent: @escaping OnSinkEvent) { + init(onSinkEvent: OnSinkEvent?) { self.onSinkEvent = onSinkEvent self.eventPipe = EventPipe() } @@ -363,6 +363,36 @@ extension WorkflowNode.SubtreeManager { fileprivate final class ReusableSink: AnyReusableSink where Action.WorkflowType == WorkflowType { func handle(action: Action) { + if let onSinkEvent { + handleWithSinkEventHandler(action: action, onSinkEvent: onSinkEvent) + return + } + + // Prior logic + let output = Output.update( + action, + source: .external, + subtreeInvalidated: false // initial state + ) + + if case .pending = eventPipe.validationState { + // Workflow is currently processing an `event`. + // Scheduling it to be processed after. + DispatchQueue.workflowExecution.async { [weak self] in + self?.eventPipe.handle(event: output) + } + return + } + eventPipe.handle(event: output) + } + + private func handleWithSinkEventHandler( + action: Action, + onSinkEvent: OnSinkEvent + ) { + // new `SinkEventHandler` logic + dispatchPrecondition(condition: .onQueue(DispatchQueue.workflowExecution)) + // If we can process now, forward through the `EventPipe` let immediatePerform: () -> Void = { let output = Output.update( diff --git a/Workflow/Sources/WorkflowHost.swift b/Workflow/Sources/WorkflowHost.swift index 2b1288a9b..329cd1573 100644 --- a/Workflow/Sources/WorkflowHost.swift +++ b/Workflow/Sources/WorkflowHost.swift @@ -72,11 +72,14 @@ public final class WorkflowHost { .workflowObservers(for: observers) .chained() + let config = Runtime.configuration + let sinkEventCallback = config.useSinkEventHandler ? sinkEventHandler.makeOnSinkEventCallback() : nil + self.context = HostContext( observer: observer, debugger: debugger, - runtimeConfig: Runtime.configuration, - onSinkEvent: sinkEventHandler.makeOnSinkEventCallback() + runtimeConfig: config, + onSinkEvent: sinkEventCallback ) self.rootNode = WorkflowNode( @@ -98,12 +101,16 @@ public final class WorkflowHost { /// Update the input for the workflow. Will cause a render pass. public func update(workflow: WorkflowType) { - sinkEventHandler.withEventHandlingSuspended { - _update(workflow: workflow) + if context.runtimeConfig.useSinkEventHandler { + sinkEventHandler.withEventHandlingSuspended { + updateRootNode(workflow: workflow) + } + } else { + updateRootNode(workflow: workflow) } } - private func _update(workflow: WorkflowType) { + private func updateRootNode(workflow: WorkflowType) { rootNode.update(workflow: workflow) // Treat the update as an "output" from the workflow originating from an external event to force a render pass. @@ -171,14 +178,14 @@ struct HostContext { let debugger: WorkflowDebugger? let runtimeConfig: Runtime.Configuration - /// Event handler to be plumbed through the runtime down to the Sinks - let onSinkEvent: OnSinkEvent + /// Event handler to be plumbed through the runtime down to the (reusable) Sinks. + let onSinkEvent: OnSinkEvent? init( observer: WorkflowObserver?, debugger: WorkflowDebugger?, runtimeConfig: Runtime.Configuration, - onSinkEvent: @escaping OnSinkEvent + onSinkEvent: OnSinkEvent? ) { self.observer = observer self.debugger = debugger diff --git a/Workflow/Tests/SinkEventHandlerTests.swift b/Workflow/Tests/SinkEventHandlerTests.swift index 7ee6c90a0..68107e4f7 100644 --- a/Workflow/Tests/SinkEventHandlerTests.swift +++ b/Workflow/Tests/SinkEventHandlerTests.swift @@ -118,7 +118,8 @@ struct SinkEventHandlerTests { let callback: OnSinkEvent let subject = SinkEventHandler(state: .ready) weakRef = subject - callback = consume subject.makeOnSinkEventCallback() + callback = subject.makeOnSinkEventCallback() + _ = consume subject // should not invoke anything because event handler deinited callback( /* immediatePerform */ { diff --git a/Workflow/Tests/WorkflowHostTests.swift b/Workflow/Tests/WorkflowHostTests.swift index b3018defc..e324f85c5 100644 --- a/Workflow/Tests/WorkflowHostTests.swift +++ b/Workflow/Tests/WorkflowHostTests.swift @@ -101,7 +101,13 @@ final class WorkflowHost_EventEmissionTests: XCTestCase { } func test_reentrant_event_during_render() { - let host = WorkflowHost(workflow: ReentrancyWorkflow()) + let host = Runtime.withConfiguration { cfg in + // Test will only pass with the 'SinkEventHandler' enabled + cfg.useSinkEventHandler = true + } operation: { + WorkflowHost(workflow: ReentrancyWorkflow()) + } + let (lifetime, token) = ReactiveSwift.Lifetime.make() defer { _ = token } let initialRendering = host.rendering.value @@ -190,10 +196,14 @@ struct WorkflowHost_SinkEventHandlerTests { receivedActionCount += 1 } - let host = WorkflowHost( - workflow: StateTransitioningWorkflow(), - observers: [observer] - ) + let host = Runtime.withConfiguration { cfg in + cfg.useSinkEventHandler = true + } operation: { + WorkflowHost( + workflow: StateTransitioningWorkflow(), + observers: [observer] + ) + } let rendering = host.rendering.value @@ -235,10 +245,14 @@ struct WorkflowHost_SinkEventHandlerTests { func enqueuesEventsDuringEventHandling() async throws { let observer = TestObserver() - let host = WorkflowHost( - workflow: StateTransitioningWorkflow(), - observers: [observer] - ) + let host = Runtime.withConfiguration { cfg in + cfg.useSinkEventHandler = true + } operation: { + WorkflowHost( + workflow: StateTransitioningWorkflow(), + observers: [observer] + ) + } let rendering = host.rendering.value let eventHandler = host.sinkEventHandler