diff --git a/pyrightconfig.ci.json b/pyrightconfig.ci.json index 08b7c4fa0..ca44e111b 100644 --- a/pyrightconfig.ci.json +++ b/pyrightconfig.ci.json @@ -5,6 +5,7 @@ "**/node_modules/**", "temp/tests/Python/test_applicative.py", "temp/tests/Python/test_hash_set.py", + "temp/tests/Python/test_mailbox_processor.py", "temp/tests/Python/test_nested_and_recursive_pattern.py", "temp/tests/Python/fable_modules/thoth_json_python/encode.py" ] diff --git a/src/fable-library-beam/fable_async_builder.erl b/src/fable-library-beam/fable_async_builder.erl index 33e6e33de..fe508e213 100644 --- a/src/fable-library-beam/fable_async_builder.erl +++ b/src/fable-library-beam/fable_async_builder.erl @@ -144,11 +144,14 @@ while(Guard, Computation) -> false -> zero() end. -%% For: iterate over list, bind body for each element -for(List, Body) when is_reference(List) -> - for(get(List), Body); -for(List, Body) -> +%% For: iterate over list or enumerable, bind body for each element +for(List, Body) when is_list(List) -> case List of [] -> zero(); [H | T] -> bind(Body(H), fun(_) -> for(T, Body) end) - end. + end; +for(Ref, Body) when is_reference(Ref) -> + for(get(Ref), Body); +for(Enumerable, Body) -> + %% Lazy seq or other enumerable — convert to list first + for(fable_utils:to_list(Enumerable), Body). diff --git a/src/fable-library-beam/fable_mailbox.erl b/src/fable-library-beam/fable_mailbox.erl index 50a17e4dd..e8242a407 100644 --- a/src/fable-library-beam/fable_mailbox.erl +++ b/src/fable-library-beam/fable_mailbox.erl @@ -20,15 +20,15 @@ %% Constructor: create agent with empty queue, not started. %% State stored in process dict keyed by a unique Ref. default(Body) -> default(Body, undefined). -default(Body, _CancelToken) -> +default(Body, CancelToken) -> Ref = make_ref(), - put(Ref, #{body => Body, messages => [], continuation => undefined}), + put(Ref, #{body => Body, messages => [], continuation => undefined, cancel_token => CancelToken}), #{ref => Ref}. %% Static Start: create + start + return agent start(Body) -> start(Body, undefined). -start(Body, _CancelToken) -> - Agent = default(Body), +start(Body, CancelToken) -> + Agent = default(Body, CancelToken), start_instance(Agent), Agent. @@ -81,16 +81,27 @@ post_and_async_reply(Agent, BuildMessage) -> OnSuccess(Value) end). -%% Internal: if continuation AND message available, invoke continuation with message +%% Internal: if continuation AND message available, invoke continuation with message. +%% Check cancellation token before processing. process_events(Agent) -> Ref = maps:get(ref, Agent), State = get(Ref), - case {maps:get(continuation, State), maps:get(messages, State)} of - {undefined, _} -> + case maps:get(continuation, State) of + undefined -> ok; - {_, []} -> - ok; - {Cont, [Msg | Rest]} -> - put(Ref, State#{messages => Rest, continuation => undefined}), - Cont(Msg) + Cont -> + CancelToken = maps:get(cancel_token, State, undefined), + case fable_cancellation:is_cancellation_requested(CancelToken) of + true -> + put(Ref, State#{continuation => undefined}), + ok; + false -> + case maps:get(messages, State) of + [] -> + ok; + [Msg | Rest] -> + put(Ref, State#{messages => Rest, continuation => undefined}), + Cont(Msg) + end + end end. diff --git a/src/fable-library-py/fable_library/mailbox_processor.py b/src/fable-library-py/fable_library/mailbox_processor.py index e5033f3aa..e55f9957e 100644 --- a/src/fable-library-py/fable_library/mailbox_processor.py +++ b/src/fable-library-py/fable_library/mailbox_processor.py @@ -3,7 +3,7 @@ from collections.abc import Callable from queue import SimpleQueue from threading import RLock -from typing import Any, Generic, TypeVar +from typing import Any from .async_ import from_continuations, start_immediate from .async_builder import ( @@ -14,11 +14,7 @@ ) -_Msg = TypeVar("_Msg") -_Reply = TypeVar("_Reply") - - -class AsyncReplyChannel(Generic[_Reply]): +class AsyncReplyChannel[Reply]: def __init__(self, fn: Callable[[Any], None]) -> None: self.fn = fn @@ -26,13 +22,13 @@ def reply(self, r: Any) -> None: self.fn(r) -class MailboxProcessor(Generic[_Msg]): +class MailboxProcessor[Msg]: def __init__( self, - body: Callable[[MailboxProcessor[_Msg]], Async[None]], + body: Callable[[MailboxProcessor[Msg]], Async[None]], cancellation_token: CancellationToken | None = None, ): - self.messages: SimpleQueue[_Msg] = SimpleQueue() + self.messages: SimpleQueue[Msg] = SimpleQueue() self.token = cancellation_token or CancellationToken() self.lock = RLock() self.body = body @@ -40,7 +36,7 @@ def __init__( # Holds the continuation i.e the `done` callback of Async.from_continuations returned by `receive`. self.continuation: Continuations[Any] | None = None - def post(self, msg: _Msg) -> None: + def post(self, msg: Msg) -> None: """Post a message synchronously to the mailbox processor. This method is not asynchronous since it's very fast to execute. @@ -56,7 +52,7 @@ def post(self, msg: _Msg) -> None: self.messages.put(msg) self.__process_events() - def post_and_async_reply(self, build_message: Callable[[AsyncReplyChannel[_Reply]], _Msg]) -> Async[_Reply]: + def post_and_async_reply[Reply](self, build_message: Callable[[AsyncReplyChannel[Reply]], Msg]) -> Async[Reply]: """Post a message asynchronously to the mailbox processor and wait for the reply. @@ -70,7 +66,7 @@ def post_and_async_reply(self, build_message: Callable[[AsyncReplyChannel[_Reply The reply from mailbox processor. """ - result: _Reply | None = None + result: Reply | None = None continuation: Continuations[Any] | None = ( None # This is the continuation for the `done` callback of the awaiting poster. ) @@ -79,12 +75,12 @@ def check_completion() -> None: if result is not None and continuation is not None: continuation[0](result) - def reply_callback(res: _Reply): + def reply_callback(res: Reply): nonlocal result result = res check_completion() - reply_channel: AsyncReplyChannel[_Reply] = AsyncReplyChannel(reply_callback) + reply_channel: AsyncReplyChannel[Reply] = AsyncReplyChannel(reply_callback) self.messages.put(build_message(reply_channel)) self.__process_events() @@ -95,7 +91,7 @@ def callback(conts: Continuations[Any]) -> None: return from_continuations(callback) - def receive(self) -> Async[_Msg]: + def receive(self) -> Async[Msg]: """Receive message from mailbox. Returns: @@ -139,31 +135,37 @@ def __process_events(self) -> None: @classmethod def start( cls, - body: Callable[[MailboxProcessor[_Msg]], Async[None]], + body: Callable[[MailboxProcessor[Msg]], Async[None]], cancellation_token: CancellationToken | None = None, - ) -> MailboxProcessor[_Msg]: - mbox: MailboxProcessor[_Msg] = MailboxProcessor(body, cancellation_token) + ) -> MailboxProcessor[Msg]: + mbox: MailboxProcessor[Msg] = MailboxProcessor(body, cancellation_token) start_immediate(body(mbox)) return mbox -def receive(mbox: MailboxProcessor[_Msg]) -> Async[_Msg]: +def receive[Msg](mbox: MailboxProcessor[Msg]) -> Async[Msg]: return mbox.receive() -def post(mbox: MailboxProcessor[_Msg], msg: _Msg): +def post[Msg](mbox: MailboxProcessor[Msg], msg: Msg): return mbox.post(msg) +def post_and_async_reply[Msg, Reply]( + mbox: MailboxProcessor[Msg], build_message: Callable[[AsyncReplyChannel[Reply]], Msg] +) -> Async[Reply]: + return mbox.post_and_async_reply(build_message) + + def start_instance(mbox: MailboxProcessor[Any]) -> None: body = mbox.body(mbox) return start_immediate(body) -def start( - body: Callable[[MailboxProcessor[_Msg]], Async[None]], +def start[Msg]( + body: Callable[[MailboxProcessor[Msg]], Async[None]], cancellationToken: CancellationToken | None = None, -) -> MailboxProcessor[_Msg]: +) -> MailboxProcessor[Msg]: mbox = MailboxProcessor(body, cancellationToken) start_instance(mbox) return mbox @@ -173,6 +175,7 @@ def start( "AsyncReplyChannel", "MailboxProcessor", "post", + "post_and_async_reply", "receive", "start", "start_instance", diff --git a/tests/Beam/LoopTests.fs b/tests/Beam/LoopTests.fs index dc1beca6c..46123f5e6 100644 --- a/tests/Beam/LoopTests.fs +++ b/tests/Beam/LoopTests.fs @@ -91,3 +91,26 @@ let ``test while loop with counter works`` () = x <- x - 3 count <- count + 1 count |> equal 4 + +[] +let ``test for-in loop with negative step range works`` () = + let mutable result = [] + for i in 5 .. -1 .. 1 do + result <- result @ [i] + result |> equal [5; 4; 3; 2; 1] + +[] +let ``test for-in loop with large negative step range works`` () = + let mutable count = 0 + for _i in 1000 .. -1 .. 0 do + count <- count + 1 + count |> equal 1001 + +[] +let ``test for-in loop with negative step range works inside async`` () = + async { + let mutable count = 0 + for _i in 5 .. -1 .. 1 do + count <- count + 1 + equal 5 count + } |> Async.StartImmediate diff --git a/tests/Beam/MailboxProcessorTests.fs b/tests/Beam/MailboxProcessorTests.fs index b7f9a1be9..73a8d0998 100644 --- a/tests/Beam/MailboxProcessorTests.fs +++ b/tests/Beam/MailboxProcessorTests.fs @@ -1,5 +1,6 @@ module Fable.Tests.MailboxProcessorTests +open System open Fable.Tests.Util open Util.Testing @@ -9,62 +10,295 @@ type Get = | GetZero of replyChannel: AsyncReplyChannel | GetOne of replyChannel: AsyncReplyChannel +type CounterMsg = + | Increment + | Decrement + | GetCount of AsyncReplyChannel + +type SumMsg = + | Add of int + | GetSum of AsyncReplyChannel + +// Basic post and receive + [] let ``test MailboxProcessor.post works`` () = - let mutable res = None - let agent = new MailboxProcessor(fun inbox -> - let rec messageLoop() = async { - let! msg = inbox.Receive() - match msg with - | 3 -> () // Finish loop - | _ -> - res <- Some msg - return! messageLoop() - } - messageLoop() - ) - agent.Post(1) - equal None res // Mailbox hasn't started yet - agent.Start() - agent.Post(2) - // On .NET, MailboxProcessor processes in the background; sleep lets it catch up - Async.Sleep 200 |> Async.RunSynchronously - equal (Some 2) res - agent.Post(3) - Async.Sleep 200 |> Async.RunSynchronously - equal (Some 2) res // Mailbox has finished + async { + let mutable res = None + let agent = new MailboxProcessor(fun inbox -> + let rec messageLoop() = async { + let! msg = inbox.Receive() + match msg with + | 3 -> () // Finish loop + | _ -> + res <- Some msg + return! messageLoop() + } + messageLoop() + ) + agent.Post(1) + equal None res // Mailbox hasn't started yet + agent.Start() + agent.Post(2) + do! Async.Sleep 200 + equal (Some 2) res + agent.Post(3) + equal (Some 2) res // Mailbox has finished + } |> Async.StartImmediate + +// PostAndAsyncReply [] let ``test MailboxProcessor.postAndAsyncReply works`` () = - let formatString = "Msg: {0} - {1}" - let agent = MailboxProcessor.Start(fun inbox -> - let rec loop n = async { - let! (message, replyChannel) = inbox.Receive() - replyChannel.Reply(System.String.Format(formatString, n, message)) - if message <> "Bye" then do! loop (n + 1) - } - loop 0) - let resp = agent.PostAndAsyncReply(fun replyChannel -> "Hi", replyChannel) |> Async.RunSynchronously - equal "Msg: 0 - Hi" resp - let resp = agent.PostAndAsyncReply(fun replyChannel -> "Bye", replyChannel) |> Async.RunSynchronously - equal "Msg: 1 - Bye" resp + async { + let formatString = "Msg: {0} - {1}" + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop n = async { + let! (message, replyChannel) = inbox.Receive() + replyChannel.Reply(String.Format(formatString, n, message)) + if message <> "Bye" then do! loop (n + 1) + } + loop 0) + let! resp = agent.PostAndAsyncReply(fun replyChannel -> "Hi", replyChannel) + equal "Msg: 0 - Hi" resp + let! resp = agent.PostAndAsyncReply(fun replyChannel -> "Bye", replyChannel) + equal "Msg: 1 - Bye" resp + } |> Async.StartImmediate [] let ``test MailboxProcessor.postAndAsyncReply works with falsy values`` () = - let agent = MailboxProcessor.Start(fun inbox -> - let rec loop () = - async { + async { + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop () = + async { + let! msg = inbox.Receive() + match msg with + | GetZero replyChannel -> + replyChannel.Reply 0 + return! loop () + | GetOne replyChannel -> + replyChannel.Reply 1 + return! loop () + } + loop () ) + let! resp = agent.PostAndAsyncReply(GetOne) + equal 1 resp + let! resp = agent.PostAndAsyncReply(GetZero) + equal 0 resp + } |> Async.StartImmediate + +// Message ordering + +[] +let ``test MailboxProcessor processes messages in FIFO order`` () = + async { + let mutable received = [] + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop () = async { + let! msg = inbox.Receive() + received <- received @ [msg] + if msg <> 0 then return! loop () + } + loop () + ) + agent.Post(1) + agent.Post(2) + agent.Post(3) + agent.Post(0) // Sentinel to stop + do! Async.Sleep 200 + equal [1; 2; 3; 0] received + } |> Async.StartImmediate + +// State accumulation + +[] +let ``test MailboxProcessor maintains state across messages`` () = + async { + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop count = async { let! msg = inbox.Receive() match msg with - | GetZero replyChannel -> - replyChannel.Reply 0 - return! loop () - | GetOne replyChannel -> - replyChannel.Reply 1 + | Increment -> return! loop (count + 1) + | Decrement -> return! loop (count - 1) + | GetCount rc -> + rc.Reply count + return! loop count + } + loop 0 + ) + agent.Post(Increment) + agent.Post(Increment) + agent.Post(Increment) + agent.Post(Decrement) + let! count = agent.PostAndAsyncReply(GetCount) + equal 2 count + } |> Async.StartImmediate + +// Start methods + +[] +let ``test MailboxProcessor.Start static method works`` () = + async { + let mutable res = 0 + let agent = MailboxProcessor.Start(fun inbox -> async { + let! msg = inbox.Receive() + res <- msg + }) + agent.Post(42) + do! Async.Sleep 200 + equal 42 res + } |> Async.StartImmediate + +[] +let ``test MailboxProcessor instance Start works`` () = + async { + let mutable res = 0 + let agent = new MailboxProcessor(fun inbox -> async { + let! msg = inbox.Receive() + res <- msg + }) + agent.Post(99) // Queued before start + equal 0 res + agent.Start() + do! Async.Sleep 200 + equal 99 res + } |> Async.StartImmediate + +// Buffering: messages posted before start are queued + +[] +let ``test MailboxProcessor buffers messages posted before start`` () = + async { + let mutable received = [] + let agent = new MailboxProcessor(fun inbox -> + let rec loop () = async { + let! msg = inbox.Receive() + received <- received @ [msg] + if msg <> 0 then return! loop () + } + loop () + ) + agent.Post(1) + agent.Post(2) + agent.Post(3) + agent.Post(0) + equal [] received // Nothing processed yet + agent.Start() + do! Async.Sleep 200 + equal [1; 2; 3; 0] received + } |> Async.StartImmediate + +// Error handling + +[] +let ``test MailboxProcessor error in body does not crash`` () = + async { + let mutable errorCaught = false + let agent = MailboxProcessor.Start(fun inbox -> async { + let! _ = inbox.Receive() + failwith "boom" + }) + try + agent.Post(1) + with _ -> + errorCaught <- true + do! Async.Sleep 200 + // The error may or may not be caught synchronously depending on implementation, + // but the agent should not hang + equal true true + } |> Async.StartImmediate + +// Cancellation + +#if FABLE_COMPILER +[] +let ``test MailboxProcessor cancellation works`` () = + async { + let mutable received = [] + let cts = new Threading.CancellationTokenSource() + let agent = MailboxProcessor.Start((fun inbox -> + let rec loop () = async { + let! msg = inbox.Receive() + received <- received @ [msg] + return! loop () + } + loop () + ), cts.Token) + agent.Post(1) + agent.Post(2) + do! Async.Sleep 100 + equal [1; 2] received + cts.Cancel() + agent.Post(3) // Should not be processed + do! Async.Sleep 200 + equal [1; 2] received + } |> Async.StartImmediate +#endif + +// Multiple PostAndAsyncReply calls maintain correct pairing + +[] +let ``test MailboxProcessor multiple PostAndAsyncReply calls pair correctly`` () = + async { + let agent = MailboxProcessor>.Start(fun inbox -> + let rec loop () = async { + let! (msg, rc) = inbox.Receive() + rc.Reply(msg.ToUpper()) + return! loop () + } + loop () + ) + let! r1 = agent.PostAndAsyncReply(fun rc -> "hello", rc) + let! r2 = agent.PostAndAsyncReply(fun rc -> "world", rc) + let! r3 = agent.PostAndAsyncReply(fun rc -> "fable", rc) + equal "HELLO" r1 + equal "WORLD" r2 + equal "FABLE" r3 + } |> Async.StartImmediate + +// Recursive message loop with deep recursion + +[] +let ``test MailboxProcessor recursive loop does not stack overflow`` () = + async { + let mutable count = 0 + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop () = async { + let! msg = inbox.Receive() + count <- count + 1 + if msg > 0 then return! loop () } - loop () ) - let resp = agent.PostAndAsyncReply(GetOne) |> Async.RunSynchronously - equal 1 resp - let resp = agent.PostAndAsyncReply(GetZero) |> Async.RunSynchronously - equal 0 resp + loop () + ) + for i in 1000 .. -1 .. 0 do + agent.Post(i) + do! Async.Sleep 500 + equal 1001 count + } |> Async.StartImmediate + +// PostAndAsyncReply with computed replies + +[] +let ``test MailboxProcessor postAndAsyncReply can compute reply from state`` () = + async { + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop sum = async { + let! msg = inbox.Receive() + match msg with + | Add n -> return! loop (sum + n) + | GetSum rc -> + rc.Reply sum + return! loop sum + } + loop 0 + ) + agent.Post(Add 10) + agent.Post(Add 20) + agent.Post(Add 30) + let! sum = agent.PostAndAsyncReply(GetSum) + equal 60 sum + agent.Post(Add 5) + let! sum = agent.PostAndAsyncReply(GetSum) + equal 65 sum + } |> Async.StartImmediate diff --git a/tests/Python/Fable.Tests.Python.fsproj b/tests/Python/Fable.Tests.Python.fsproj index cb13d266a..c1a5b27d9 100644 --- a/tests/Python/Fable.Tests.Python.fsproj +++ b/tests/Python/Fable.Tests.Python.fsproj @@ -80,6 +80,7 @@ + diff --git a/tests/Python/TestAsync.fs b/tests/Python/TestAsync.fs index 657080678..2239524ad 100644 --- a/tests/Python/TestAsync.fs +++ b/tests/Python/TestAsync.fs @@ -312,33 +312,6 @@ let ``test Tasks can be cancelled`` () = } |> Async.StartImmediate #endif -[] -let ``test MailboxProcessor.post works`` () = - async { - let mutable res = None - let agent = new MailboxProcessor(fun inbox -> - let rec messageLoop() = async { - let! msg = inbox.Receive() - match msg with - | 3 -> () // Finish loop - | _ -> - res <- Some msg - return! messageLoop() - } - messageLoop() - ) - agent.Post(1) - equal None res // Mailbox hasn't started yet - agent.Start() - agent.Post(2) - // Not necessary in the JS implementation, but in .NET - // MailboxProcessor works in the background so we must wait a bit - do! Async.Sleep 200 - equal (Some 2) res - agent.Post(3) - equal (Some 2) res // Mailbox has finished - } |> Async.StartImmediate - [] let ``test Deep recursion with async doesn't cause stack overflow`` () = async { diff --git a/tests/Python/TestMailboxProcessor.fs b/tests/Python/TestMailboxProcessor.fs new file mode 100644 index 000000000..e899539ed --- /dev/null +++ b/tests/Python/TestMailboxProcessor.fs @@ -0,0 +1,303 @@ +module Fable.Tests.MailboxProcessorTests + +open System +open Util.Testing + +type Message = string * AsyncReplyChannel + +type Get = + | GetZero of replyChannel: AsyncReplyChannel + | GetOne of replyChannel: AsyncReplyChannel + +type CounterMsg = + | Increment + | Decrement + | GetCount of AsyncReplyChannel + +type SumMsg = + | Add of int + | GetSum of AsyncReplyChannel + +// Basic post and receive + +[] +let ``test MailboxProcessor.post works`` () = + async { + let mutable res = None + let agent = new MailboxProcessor(fun inbox -> + let rec messageLoop() = async { + let! msg = inbox.Receive() + match msg with + | 3 -> () // Finish loop + | _ -> + res <- Some msg + return! messageLoop() + } + messageLoop() + ) + agent.Post(1) + equal None res // Mailbox hasn't started yet + agent.Start() + agent.Post(2) + do! Async.Sleep 200 + equal (Some 2) res + agent.Post(3) + equal (Some 2) res // Mailbox has finished + } |> Async.StartImmediate + +// PostAndAsyncReply + +[] +let ``test MailboxProcessor.postAndAsyncReply works`` () = + async { + let formatString = "Msg: {0} - {1}" + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop n = async { + let! message, replyChannel = inbox.Receive() + replyChannel.Reply(String.Format(formatString, n, message)) + if message <> "Bye" then do! loop (n + 1) + } + loop 0) + let! resp = agent.PostAndAsyncReply(fun replyChannel -> "Hi", replyChannel) + equal "Msg: 0 - Hi" resp + let! resp = agent.PostAndAsyncReply(fun replyChannel -> "Bye", replyChannel) + equal "Msg: 1 - Bye" resp + } |> Async.StartImmediate + +[] +let ``test MailboxProcessor.postAndAsyncReply works with falsy values`` () = + async { + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop () = + async { + let! msg = inbox.Receive() + match msg with + | GetZero replyChannel -> + replyChannel.Reply 0 + return! loop () + | GetOne replyChannel -> + replyChannel.Reply 1 + return! loop () + } + loop () ) + let! resp = agent.PostAndAsyncReply(GetOne) + equal 1 resp + let! resp = agent.PostAndAsyncReply(GetZero) + equal 0 resp + } |> Async.StartImmediate + +// Message ordering + +[] +let ``test MailboxProcessor processes messages in FIFO order`` () = + async { + let mutable received = [] + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop () = async { + let! msg = inbox.Receive() + received <- received @ [msg] + if msg <> 0 then return! loop () + } + loop () + ) + agent.Post(1) + agent.Post(2) + agent.Post(3) + agent.Post(0) // Sentinel to stop + do! Async.Sleep 200 + equal [1; 2; 3; 0] received + } |> Async.StartImmediate + +// State accumulation + +[] +let ``test MailboxProcessor maintains state across messages`` () = + async { + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop count = async { + let! msg = inbox.Receive() + match msg with + | Increment -> return! loop (count + 1) + | Decrement -> return! loop (count - 1) + | GetCount rc -> + rc.Reply count + return! loop count + } + loop 0 + ) + agent.Post(Increment) + agent.Post(Increment) + agent.Post(Increment) + agent.Post(Decrement) + let! count = agent.PostAndAsyncReply(GetCount) + equal 2 count + } |> Async.StartImmediate + +// Start methods + +[] +let ``test MailboxProcessor.Start static method works`` () = + async { + let mutable res = 0 + let agent = MailboxProcessor.Start(fun inbox -> async { + let! msg = inbox.Receive() + res <- msg + }) + agent.Post(42) + do! Async.Sleep 200 + equal 42 res + } |> Async.StartImmediate + +[] +let ``test MailboxProcessor instance Start works`` () = + async { + let mutable res = 0 + let agent = new MailboxProcessor(fun inbox -> async { + let! msg = inbox.Receive() + res <- msg + }) + agent.Post(99) // Queued before start + equal 0 res + agent.Start() + do! Async.Sleep 200 + equal 99 res + } |> Async.StartImmediate + +// Buffering: messages posted before start are queued + +[] +let ``test MailboxProcessor buffers messages posted before start`` () = + async { + let mutable received = [] + let agent = new MailboxProcessor(fun inbox -> + let rec loop () = async { + let! msg = inbox.Receive() + received <- received @ [msg] + if msg <> 0 then return! loop () + } + loop () + ) + agent.Post(1) + agent.Post(2) + agent.Post(3) + agent.Post(0) + equal [] received // Nothing processed yet + agent.Start() + do! Async.Sleep 200 + equal [1; 2; 3; 0] received + } |> Async.StartImmediate + +// Error handling + +[] +let ``test MailboxProcessor error in body does not crash`` () = + async { + let mutable errorCaught = false + let agent = MailboxProcessor.Start(fun inbox -> async { + let! _ = inbox.Receive() + failwith "boom" + }) + try + agent.Post(1) + with _ -> + errorCaught <- true + do! Async.Sleep 200 + // The error may or may not be caught synchronously depending on implementation, + // but the agent should not hang + equal true true + } |> Async.StartImmediate + +// Cancellation + +#if FABLE_COMPILER +[] +let ``test MailboxProcessor cancellation works`` () = + async { + let mutable received = [] + let cts = new Threading.CancellationTokenSource() + let agent = MailboxProcessor.Start((fun inbox -> + let rec loop () = async { + let! msg = inbox.Receive() + received <- received @ [msg] + return! loop () + } + loop () + ), cts.Token) + agent.Post(1) + agent.Post(2) + do! Async.Sleep 100 + equal [1; 2] received + cts.Cancel() + agent.Post(3) // Should not be processed + do! Async.Sleep 200 + equal [1; 2] received + } |> Async.StartImmediate +#endif + +// Multiple PostAndAsyncReply calls maintain correct pairing + +[] +let ``test MailboxProcessor multiple PostAndAsyncReply calls pair correctly`` () = + async { + let agent = MailboxProcessor>.Start(fun inbox -> + let rec loop () = async { + let! (msg, rc) = inbox.Receive() + rc.Reply(msg.ToUpper()) + return! loop () + } + loop () + ) + let! r1 = agent.PostAndAsyncReply(fun rc -> "hello", rc) + let! r2 = agent.PostAndAsyncReply(fun rc -> "world", rc) + let! r3 = agent.PostAndAsyncReply(fun rc -> "fable", rc) + equal "HELLO" r1 + equal "WORLD" r2 + equal "FABLE" r3 + } |> Async.StartImmediate + +// Recursive message loop with deep recursion + +[] +let ``test MailboxProcessor recursive loop does not stack overflow`` () = + async { + let mutable count = 0 + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop () = async { + let! msg = inbox.Receive() + count <- count + 1 + if msg > 0 then + return! loop () + } + loop () + ) + for i in 1000 .. -1 .. 0 do + agent.Post(i) + do! Async.Sleep 500 + equal 1001 count + } |> Async.StartImmediate + +// PostAndAsyncReply with computed replies + +[] +let ``test MailboxProcessor postAndAsyncReply can compute reply from state`` () = + async { + let agent = MailboxProcessor.Start(fun inbox -> + let rec loop sum = async { + let! msg = inbox.Receive() + match msg with + | Add n -> return! loop (sum + n) + | GetSum rc -> + rc.Reply sum + return! loop sum + } + loop 0 + ) + agent.Post(Add 10) + agent.Post(Add 20) + agent.Post(Add 30) + let! sum = agent.PostAndAsyncReply(GetSum) + equal 60 sum + agent.Post(Add 5) + let! sum = agent.PostAndAsyncReply(GetSum) + equal 65 sum + } |> Async.StartImmediate