Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyrightconfig.ci.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"**/node_modules/**",
"temp/tests/Python/test_applicative.py",
"temp/tests/Python/test_hash_set.py",
"temp/tests/Python/test_mailbox_processor.py",
"temp/tests/Python/test_nested_and_recursive_pattern.py",
"temp/tests/Python/fable_modules/thoth_json_python/encode.py"
]
Expand Down
13 changes: 8 additions & 5 deletions src/fable-library-beam/fable_async_builder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,14 @@ while(Guard, Computation) ->
false -> zero()
end.

%% For: iterate over list, bind body for each element
for(List, Body) when is_reference(List) ->
for(get(List), Body);
for(List, Body) ->
%% For: iterate over list or enumerable, bind body for each element
for(List, Body) when is_list(List) ->
case List of
[] -> zero();
[H | T] -> bind(Body(H), fun(_) -> for(T, Body) end)
end.
end;
for(Ref, Body) when is_reference(Ref) ->
for(get(Ref), Body);
for(Enumerable, Body) ->
%% Lazy seq or other enumerable — convert to list first
for(fable_utils:to_list(Enumerable), Body).
35 changes: 23 additions & 12 deletions src/fable-library-beam/fable_mailbox.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
%% Constructor: create agent with empty queue, not started.
%% State stored in process dict keyed by a unique Ref.
default(Body) -> default(Body, undefined).
default(Body, _CancelToken) ->
default(Body, CancelToken) ->
Ref = make_ref(),
put(Ref, #{body => Body, messages => [], continuation => undefined}),
put(Ref, #{body => Body, messages => [], continuation => undefined, cancel_token => CancelToken}),
#{ref => Ref}.

%% Static Start: create + start + return agent
start(Body) -> start(Body, undefined).
start(Body, _CancelToken) ->
Agent = default(Body),
start(Body, CancelToken) ->
Agent = default(Body, CancelToken),
start_instance(Agent),
Agent.

Expand Down Expand Up @@ -81,16 +81,27 @@ post_and_async_reply(Agent, BuildMessage) ->
OnSuccess(Value)
end).

%% Internal: if continuation AND message available, invoke continuation with message
%% Internal: if continuation AND message available, invoke continuation with message.
%% Check cancellation token before processing.
process_events(Agent) ->
Ref = maps:get(ref, Agent),
State = get(Ref),
case {maps:get(continuation, State), maps:get(messages, State)} of
{undefined, _} ->
case maps:get(continuation, State) of
undefined ->
ok;
{_, []} ->
ok;
{Cont, [Msg | Rest]} ->
put(Ref, State#{messages => Rest, continuation => undefined}),
Cont(Msg)
Cont ->
CancelToken = maps:get(cancel_token, State, undefined),
case fable_cancellation:is_cancellation_requested(CancelToken) of
true ->
put(Ref, State#{continuation => undefined}),
ok;
false ->
case maps:get(messages, State) of
[] ->
ok;
[Msg | Rest] ->
put(Ref, State#{messages => Rest, continuation => undefined}),
Cont(Msg)
end
end
end.
49 changes: 26 additions & 23 deletions src/fable-library-py/fable_library/mailbox_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from collections.abc import Callable
from queue import SimpleQueue
from threading import RLock
from typing import Any, Generic, TypeVar
from typing import Any

from .async_ import from_continuations, start_immediate
from .async_builder import (
Expand All @@ -14,33 +14,29 @@
)


_Msg = TypeVar("_Msg")
_Reply = TypeVar("_Reply")


class AsyncReplyChannel(Generic[_Reply]):
class AsyncReplyChannel[Reply]:
def __init__(self, fn: Callable[[Any], None]) -> None:
self.fn = fn

def reply(self, r: Any) -> None:
self.fn(r)


class MailboxProcessor(Generic[_Msg]):
class MailboxProcessor[Msg]:
def __init__(
self,
body: Callable[[MailboxProcessor[_Msg]], Async[None]],
body: Callable[[MailboxProcessor[Msg]], Async[None]],
cancellation_token: CancellationToken | None = None,
):
self.messages: SimpleQueue[_Msg] = SimpleQueue()
self.messages: SimpleQueue[Msg] = SimpleQueue()
self.token = cancellation_token or CancellationToken()
self.lock = RLock()
self.body = body

# Holds the continuation i.e the `done` callback of Async.from_continuations returned by `receive`.
self.continuation: Continuations[Any] | None = None

def post(self, msg: _Msg) -> None:
def post(self, msg: Msg) -> None:
"""Post a message synchronously to the mailbox processor.

This method is not asynchronous since it's very fast to execute.
Expand All @@ -56,7 +52,7 @@ def post(self, msg: _Msg) -> None:
self.messages.put(msg)
self.__process_events()

def post_and_async_reply(self, build_message: Callable[[AsyncReplyChannel[_Reply]], _Msg]) -> Async[_Reply]:
def post_and_async_reply[Reply](self, build_message: Callable[[AsyncReplyChannel[Reply]], Msg]) -> Async[Reply]:
"""Post a message asynchronously to the mailbox processor and
wait for the reply.

Expand All @@ -70,7 +66,7 @@ def post_and_async_reply(self, build_message: Callable[[AsyncReplyChannel[_Reply
The reply from mailbox processor.
"""

result: _Reply | None = None
result: Reply | None = None
continuation: Continuations[Any] | None = (
None # This is the continuation for the `done` callback of the awaiting poster.
)
Expand All @@ -79,12 +75,12 @@ def check_completion() -> None:
if result is not None and continuation is not None:
continuation[0](result)

def reply_callback(res: _Reply):
def reply_callback(res: Reply):
nonlocal result
result = res
check_completion()

reply_channel: AsyncReplyChannel[_Reply] = AsyncReplyChannel(reply_callback)
reply_channel: AsyncReplyChannel[Reply] = AsyncReplyChannel(reply_callback)
self.messages.put(build_message(reply_channel))
self.__process_events()

Expand All @@ -95,7 +91,7 @@ def callback(conts: Continuations[Any]) -> None:

return from_continuations(callback)

def receive(self) -> Async[_Msg]:
def receive(self) -> Async[Msg]:
"""Receive message from mailbox.

Returns:
Expand Down Expand Up @@ -139,31 +135,37 @@ def __process_events(self) -> None:
@classmethod
def start(
cls,
body: Callable[[MailboxProcessor[_Msg]], Async[None]],
body: Callable[[MailboxProcessor[Msg]], Async[None]],
cancellation_token: CancellationToken | None = None,
) -> MailboxProcessor[_Msg]:
mbox: MailboxProcessor[_Msg] = MailboxProcessor(body, cancellation_token)
) -> MailboxProcessor[Msg]:
mbox: MailboxProcessor[Msg] = MailboxProcessor(body, cancellation_token)
start_immediate(body(mbox))
return mbox


def receive(mbox: MailboxProcessor[_Msg]) -> Async[_Msg]:
def receive[Msg](mbox: MailboxProcessor[Msg]) -> Async[Msg]:
return mbox.receive()


def post(mbox: MailboxProcessor[_Msg], msg: _Msg):
def post[Msg](mbox: MailboxProcessor[Msg], msg: Msg):
return mbox.post(msg)


def post_and_async_reply[Msg, Reply](
mbox: MailboxProcessor[Msg], build_message: Callable[[AsyncReplyChannel[Reply]], Msg]
) -> Async[Reply]:
return mbox.post_and_async_reply(build_message)


def start_instance(mbox: MailboxProcessor[Any]) -> None:
body = mbox.body(mbox)
return start_immediate(body)


def start(
body: Callable[[MailboxProcessor[_Msg]], Async[None]],
def start[Msg](
body: Callable[[MailboxProcessor[Msg]], Async[None]],
cancellationToken: CancellationToken | None = None,
) -> MailboxProcessor[_Msg]:
) -> MailboxProcessor[Msg]:
mbox = MailboxProcessor(body, cancellationToken)
start_instance(mbox)
return mbox
Expand All @@ -173,6 +175,7 @@ def start(
"AsyncReplyChannel",
"MailboxProcessor",
"post",
"post_and_async_reply",
"receive",
"start",
"start_instance",
Expand Down
23 changes: 23 additions & 0 deletions tests/Beam/LoopTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,26 @@ let ``test while loop with counter works`` () =
x <- x - 3
count <- count + 1
count |> equal 4

[<Fact>]
let ``test for-in loop with negative step range works`` () =
let mutable result = []
for i in 5 .. -1 .. 1 do
result <- result @ [i]
result |> equal [5; 4; 3; 2; 1]

[<Fact>]
let ``test for-in loop with large negative step range works`` () =
let mutable count = 0
for _i in 1000 .. -1 .. 0 do
count <- count + 1
count |> equal 1001

[<Fact>]
let ``test for-in loop with negative step range works inside async`` () =
async {
let mutable count = 0
for _i in 5 .. -1 .. 1 do
count <- count + 1
equal 5 count
} |> Async.StartImmediate
Loading
Loading