From e758efce1da625d7968b9f15405e6ffad3081d16 Mon Sep 17 00:00:00 2001 From: madawei2699 Date: Tue, 25 Mar 2025 12:06:52 +0800 Subject: [PATCH 1/3] feat: add flow processing with execution limits and statistics tracking --- lib/agent_forge.ex | 51 +++++ lib/agent_forge/flow.ex | 192 ++++++++++++++++-- lib/agent_forge/runtime.ex | 240 ++++++++++++++++------- test/agent_forge/flow_limits_test.exs | 120 ++++++++++++ test/agent_forge/runtime_limits_test.exs | 192 ++++++++++++++++++ 5 files changed, 708 insertions(+), 87 deletions(-) create mode 100644 test/agent_forge/flow_limits_test.exs create mode 100644 test/agent_forge/runtime_limits_test.exs diff --git a/lib/agent_forge.ex b/lib/agent_forge.ex index cc0298e..9c43781 100644 --- a/lib/agent_forge.ex +++ b/lib/agent_forge.ex @@ -70,6 +70,57 @@ defmodule AgentForge do Runtime.configure_stateful(handlers, opts) end + @doc """ + Processes a flow with execution limits. + This can prevent infinite loops and long-running processing. + + ## Options + + * `:max_steps` - Maximum number of steps to execute (default: :infinity) + * `:timeout` - Maximum execution time in milliseconds (default: :infinity) + * `:collect_stats` - Whether to collect execution statistics (default: true) + * `:return_stats` - Whether to return statistics in the result (default: false) + + ## Examples + + iex> handlers = [ + ...> fn _signal, state -> {{:emit, AgentForge.Signal.new(:done, "Success")}, state} end + ...> ] + iex> {:ok, result, _} = AgentForge.process_with_limits(handlers, AgentForge.Signal.new(:test, "data"), %{}) + iex> result.data + "Success" + """ + @spec process_with_limits( + Flow.flow(), + Signal.t(), + map(), + keyword() + ) :: + {:ok, Signal.t() | term(), term()} + | {:ok, Signal.t() | term(), term(), ExecutionStats.t()} + | {:error, term()} + | {:error, term(), ExecutionStats.t()} + def process_with_limits(handlers, signal, initial_state, opts \\ []) do + Runtime.execute_with_limits(handlers, signal, initial_state, opts) + end + + @doc """ + Gets statistics from the last flow execution. + Returns nil if no flow has been executed yet or statistics collection was disabled. + + ## Examples + + iex> handlers = [fn signal, state -> {{:emit, signal}, state} end] + iex> signal = AgentForge.Signal.new(:test, "data") + iex> {:ok, _, _} = AgentForge.process_with_limits(handlers, signal, %{}) + iex> stats = AgentForge.get_last_execution_stats() + iex> stats.steps + 1 + """ + def get_last_execution_stats do + Runtime.get_last_execution_stats() + end + # Re-export commonly used functions from Signal module defdelegate new_signal(type, data, meta \\ %{}), to: Signal, as: :new defdelegate emit(type, data, meta \\ %{}), to: Signal diff --git a/lib/agent_forge/flow.ex b/lib/agent_forge/flow.ex index 0e8d310..d73cb69 100644 --- a/lib/agent_forge/flow.ex +++ b/lib/agent_forge/flow.ex @@ -18,7 +18,7 @@ defmodule AgentForge.Flow do def process(handlers, signal, state) when is_list(handlers) do try do process_handlers(handlers, signal, state) - |> handle_result() + |> handle_base_result() catch _kind, error -> {:error, "Flow processing error: #{inspect(error)}"} @@ -63,15 +63,189 @@ defmodule AgentForge.Flow do handler.(signal, state) end + @doc """ + Returns statistics from the last flow execution. + Returns nil if no flow has been executed yet. + """ + def get_last_execution_stats do + Process.get(@last_execution_stats_key) + end + + @doc """ + Processes a signal through a list of handlers with execution limits. + """ + def process_with_limits(handlers, signal, state, opts \\ []) do + # Extract options + max_steps = Keyword.get(opts, :max_steps, :infinity) + timeout = Keyword.get(opts, :timeout, :infinity) + collect_stats = Keyword.get(opts, :collect_stats, true) + return_stats = Keyword.get(opts, :return_stats, false) + + # Initialize stats + stats = if collect_stats, do: ExecutionStats.new(), else: nil + + # Track execution context + context = %{ + step_count: 0, + start_time: System.monotonic_time(:millisecond), + max_steps: max_steps, + timeout: timeout, + stats: stats + } + + try do + # Check initial limits + case check_limits!(context) do + :ok -> + run_with_limits(handlers, signal, state, context) + |> handle_result(return_stats) + + {:error, reason} -> + handle_error(reason, state, stats, return_stats) + end + catch + :throw, {:limit_error, msg} -> + handle_error(msg, state, stats, return_stats) + + kind, error -> + msg = "Flow processing error: #{inspect(kind)} - #{inspect(error)}" + handle_error(msg, state, stats, return_stats) + end + end + # Private functions + defp run_with_limits(handlers, signal, state, context) do + Enum.reduce_while(handlers, {:ok, signal, state, context}, fn handler, + {:ok, current_signal, + current_state, + current_context} -> + next_context = %{current_context | step_count: current_context.step_count + 1} + + case check_limits!(next_context) do + :ok -> + # Record step in stats if enabled + next_context = + if next_context.stats do + %{ + next_context + | stats: + ExecutionStats.record_step( + next_context.stats, + handler, + current_signal, + current_state + ) + } + else + next_context + end + + # Process handler + case process_handler(handler, current_signal, current_state) do + {{:emit, new_signal}, new_state} -> + {:cont, {:ok, new_signal, new_state, next_context}} + + {{:emit_many, signals}, new_state} when is_list(signals) -> + {:cont, {:ok, List.last(signals), new_state, next_context}} + + {:skip, new_state} -> + {:halt, {:ok, nil, new_state, next_context}} + + {:halt, data} -> + {:halt, {:ok, data, current_state, next_context}} + + {{:halt, data}, _state} -> + {:halt, {:ok, data, current_state, next_context}} + + {{:error, reason}, new_state} -> + {:halt, {:error, reason, new_state, next_context}} + + {other, _} -> + raise "Invalid handler result: #{inspect(other)}" + + other -> + raise "Invalid handler result: #{inspect(other)}" + end + + {:error, reason} -> + {:halt, {:error, reason, current_state, next_context}} + end + end) + end + + defp check_limits!(context) do + # Check max steps + if context.max_steps != :infinity and context.step_count > context.max_steps do + throw({:limit_error, "Flow execution exceeded maximum steps (#{context.max_steps})"}) + end + + # Check timeout + if context.timeout != :infinity do + elapsed = System.monotonic_time(:millisecond) - context.start_time + + if elapsed >= context.timeout do + throw( + {:limit_error, + "Flow execution timed out after #{elapsed}ms (limit: #{context.timeout}ms)"} + ) + end + end + + :ok + end + + defp handle_result({:ok, signal, state, context}, return_stats) do + if context.stats do + final_stats = ExecutionStats.finalize(context.stats, {:ok, signal}) + + if return_stats do + {:ok, signal, state, final_stats} + else + Process.put(@last_execution_stats_key, final_stats) + {:ok, signal, state} + end + else + {:ok, signal, state} + end + end + + defp handle_result({:error, reason, state, context}, return_stats) do + if context.stats do + final_stats = ExecutionStats.finalize(context.stats, {:error, reason}) + + if return_stats do + {:error, reason, state, final_stats} + else + Process.put(@last_execution_stats_key, final_stats) + {:error, reason, state} + end + else + {:error, reason, state} + end + end + + defp handle_error(reason, state, stats, return_stats) do + if stats do + final_stats = ExecutionStats.finalize(stats, {:error, reason}) + + if return_stats do + {:error, reason, state, final_stats} + else + Process.put(@last_execution_stats_key, final_stats) + {:error, reason, state} + end + else + {:error, reason, state} + end + end + defp process_handlers(handlers, signal, state) do stats = ExecutionStats.new() Enum.reduce_while(handlers, {:ok, signal, state, stats}, fn handler, {:ok, current_signal, current_state, current_stats} -> - # Record step before processing updated_stats = ExecutionStats.record_step(current_stats, handler, current_signal, current_state) @@ -80,7 +254,6 @@ defmodule AgentForge.Flow do {:cont, {:ok, new_signal, new_state, updated_stats}} {{:emit_many, signals}, new_state} when is_list(signals) -> - # When multiple signals are emitted, use the last one for continuation {:cont, {:ok, List.last(signals), new_state, updated_stats}} {:skip, new_state} -> @@ -104,24 +277,15 @@ defmodule AgentForge.Flow do end) end - # Handle the final result - defp handle_result({:ok, signal, state, stats}) do + defp handle_base_result({:ok, signal, state, stats}) do final_stats = ExecutionStats.finalize(stats, {:ok, signal}) Process.put(@last_execution_stats_key, final_stats) {:ok, signal, state} end - defp handle_result({:error, reason, _state, stats}) do + defp handle_base_result({:error, reason, _state, stats}) do final_stats = ExecutionStats.finalize(stats, {:error, reason}) Process.put(@last_execution_stats_key, final_stats) {:error, reason} end - - @doc """ - Returns statistics from the last flow execution. - Returns nil if no flow has been executed yet. - """ - def get_last_execution_stats do - Process.get(@last_execution_stats_key) - end end diff --git a/lib/agent_forge/runtime.ex b/lib/agent_forge/runtime.ex index 568cc02..49208b5 100644 --- a/lib/agent_forge/runtime.ex +++ b/lib/agent_forge/runtime.ex @@ -3,37 +3,32 @@ defmodule AgentForge.Runtime do Provides the runtime environment for executing flows in the AgentForge system. """ - alias AgentForge.{Flow, Signal, Store, Debug} + alias AgentForge.{Flow, Signal, Store, Debug, ExecutionStats} @type runtime_options :: [ debug: boolean(), name: String.t(), store_prefix: String.t(), - store_name: atom() + store_name: atom(), + max_steps: non_neg_integer() | :infinity, + timeout: non_neg_integer() | :infinity, + collect_stats: boolean(), + return_stats: boolean() ] + @spec execute(maybe_improper_list(), %{ + data: any(), + meta: %{ + correlation_id: nil | binary(), + custom: map(), + source: nil | binary(), + timestamp: nil | DateTime.t(), + trace_id: nil | binary() + }, + type: atom() + }) :: {:error, any()} | {:ok, any(), any()} @doc """ Executes a flow with the given signal and options. - Returns the result of processing the flow. - - ## Options - - * `:debug` - Enables debug logging (default: false) - * `:name` - Name for the flow execution (default: "flow") - * `:store_prefix` - Prefix for store keys (default: "flow") - * `:store_name` - Name of the store to use (optional) - - ## Examples - - iex> handler = fn signal, state -> - ...> {AgentForge.Signal.emit(:done, "Processed: " <> signal.data), state} - ...> end - iex> {:ok, result, _state} = AgentForge.Runtime.execute([handler], - ...> AgentForge.Signal.new(:start, "test"), - ...> debug: true - ...> ) - iex> result.data - "Processed: test" """ @spec execute(Flow.flow(), Signal.t(), runtime_options()) :: {:ok, Signal.t() | term(), term()} | {:error, term()} @@ -41,16 +36,19 @@ defmodule AgentForge.Runtime do opts = Keyword.merge([debug: false, name: "flow", store_prefix: "flow"], opts) # Initialize store if needed - initial_state = + {initial_state, store_opts} = case {Keyword.get(opts, :store_key), Keyword.get(opts, :store_name, Store)} do {nil, _} -> - %{} + {%{}, nil} {store_key, store_name} -> - case Store.get(store_name, store_key) do - {:ok, stored_state} -> stored_state - _ -> %{} - end + stored_state = + case Store.get(store_name, store_key) do + {:ok, state} -> state + _ -> %{} + end + + {stored_state, {store_name, store_key}} end # Wrap with debug if enabled @@ -65,32 +63,23 @@ defmodule AgentForge.Runtime do case Flow.process(flow, signal, initial_state) do {:ok, result, final_state} -> # Update store if needed - case {Keyword.get(opts, :store_key), Keyword.get(opts, :store_name, Store)} do - {nil, _} -> - {:ok, result, final_state} + maybe_update_store(store_opts, final_state) + {:ok, result, final_state} - {store_key, store_name} -> - Store.put(store_name, store_key, final_state) - {:ok, result, final_state} - end - - error -> - error + {:error, reason} -> + {:error, reason} end end @doc """ - Creates a new runtime configuration for a flow. - This allows storing configuration that can be reused for multiple executions. - - ## Examples + Gets statistics from the last flow execution. + """ + def get_last_execution_stats do + Flow.get_last_execution_stats() + end - iex> handler = fn signal, state -> - ...> {AgentForge.Signal.emit(:done, signal.data), state} - ...> end - iex> runtime = AgentForge.Runtime.configure([handler], debug: true, name: "test_flow") - iex> is_function(runtime, 1) - true + @doc """ + Creates a new runtime configuration for a flow. """ @spec configure(Flow.flow(), runtime_options()) :: (Signal.t() -> {:ok, term(), term()} | {:error, term()}) @@ -100,20 +89,6 @@ defmodule AgentForge.Runtime do @doc """ Creates a new runtime configuration that maintains state between executions. - Similar to configure/2 but automatically stores and retrieves state. - - ## Examples - - iex> increment = fn _signal, state -> - ...> count = Map.get(state, :count, 0) + 1 - ...> {AgentForge.Signal.emit(:count, count), Map.put(state, :count, count)} - ...> end - iex> runtime = AgentForge.Runtime.configure_stateful([increment], - ...> store_key: :counter, - ...> debug: true - ...> ) - iex> is_function(runtime, 1) - true """ @spec configure_stateful(Flow.flow(), runtime_options()) :: (Signal.t() -> {:ok, term(), term()} | {:error, term()}) @@ -124,6 +99,9 @@ defmodule AgentForge.Runtime do :"store_#{:crypto.strong_rand_bytes(4) |> Base.encode16(case: :lower)}" end) + # Start the store if needed + _ = ensure_store_started(store_name) + # Generate a unique store key if not provided opts = opts @@ -133,17 +111,133 @@ defmodule AgentForge.Runtime do :"#{prefix}_#{:crypto.strong_rand_bytes(4) |> Base.encode16(case: :lower)}" end) - # Don't try to start the store if it's already started - case Process.whereis(store_name) do - nil -> - case Store.start_link(name: store_name) do - {:ok, _pid} -> configure(flow, opts) - {:error, {:already_started, _pid}} -> configure(flow, opts) - error -> error - end - - _pid -> - configure(flow, opts) + configure(flow, opts) + end + + @doc """ + Executes a flow with execution limits. + """ + @spec execute_with_limits(Flow.flow(), Signal.t(), map() | keyword(), runtime_options()) :: + {:ok, Signal.t() | term(), term()} + | {:ok, Signal.t() | term(), term(), ExecutionStats.t()} + | {:error, term(), map()} + | {:error, term(), map(), ExecutionStats.t()} + def execute_with_limits(flow, signal, initial_state, opts \\ []) do + # Ensure initial_state is a map + initial_state = convert_to_map(initial_state) + + # Merge default options + opts = merge_default_options(opts) + + # Initialize store and state + {state_to_use, store_opts} = initialize_state(initial_state, opts) + + # Extract flow options from runtime options + flow_opts = prepare_flow_options(opts) + + # Wrap with debug if enabled + flow_to_use = maybe_wrap_debug(flow, opts) + + # Execute flow with limits and handle results + try do + case Flow.process_with_limits(flow_to_use, signal, state_to_use, flow_opts) do + {:ok, result, final_state} = success -> + maybe_update_store(store_opts, final_state) + success + + {:ok, result, final_state, stats} = success -> + maybe_update_store(store_opts, final_state) + success + + {:error, reason, state} = error -> + maybe_update_store(store_opts, state) + error + + {:error, reason, state, stats} = error -> + maybe_update_store(store_opts, state) + error + end + catch + kind, error -> + error_msg = "Runtime error: #{inspect(kind)} - #{inspect(error)}" + {:error, error_msg, initial_state} + end + end + + # Private helpers + + defp ensure_store_started(store_name) do + case Store.start_link(name: store_name) do + {:ok, pid} -> pid + {:error, {:already_started, pid}} -> pid + error -> raise "Failed to start store: #{inspect(error)}" + end + end + + defp convert_to_map(value) do + case value do + map when is_map(map) -> map + list when is_list(list) -> Map.new(list) + _ -> Map.new() + end + end + + defp merge_default_options(opts) do + Keyword.merge( + [ + debug: false, + name: "flow", + store_prefix: "flow", + max_steps: :infinity, + timeout: :infinity, + collect_stats: true, + return_stats: false + ], + opts + ) + end + + defp initialize_state(initial_state, opts) do + case {Keyword.get(opts, :store_name), Keyword.get(opts, :store_key)} do + {nil, _} -> + {initial_state, nil} + + {_, nil} -> + {initial_state, nil} + + {store_name, store_key} -> + stored_state = + case Store.get(store_name, store_key) do + {:ok, state} -> Map.merge(state, initial_state) + _ -> initial_state + end + + {stored_state, {store_name, store_key}} + end + end + + defp prepare_flow_options(opts) do + opts + |> Keyword.take([:max_steps, :timeout, :collect_stats, :return_stats]) + |> Keyword.update(:max_steps, :infinity, &normalize_limit/1) + |> Keyword.update(:timeout, :infinity, &normalize_limit/1) + end + + defp maybe_wrap_debug(flow, opts) do + if opts[:debug] do + Debug.trace_flow(opts[:name], flow) + else + flow end end + + defp normalize_limit(:infinity), do: :infinity + defp normalize_limit(value) when is_integer(value), do: value + defp normalize_limit(_), do: :infinity + + defp maybe_update_store(nil, _state), do: :ok + + defp maybe_update_store({store_name, store_key}, state) do + Store.put(store_name, store_key, state) + end end diff --git a/test/agent_forge/flow_limits_test.exs b/test/agent_forge/flow_limits_test.exs new file mode 100644 index 0000000..7c8ec96 --- /dev/null +++ b/test/agent_forge/flow_limits_test.exs @@ -0,0 +1,120 @@ +defmodule AgentForge.FlowLimitsTest do + use ExUnit.Case + + alias AgentForge.Flow + alias AgentForge.Signal + alias AgentForge.ExecutionStats + + describe "process_with_limits/4" do + test "processes a simple flow without limits" do + signal = Signal.new(:test, "data") + handler = fn sig, state -> {{:emit, Signal.new(:echo, sig.data)}, state} end + + {:ok, result, state} = Flow.process_with_limits([handler], signal, %{}) + + assert result.type == :echo + assert result.data == "data" + assert state == %{} + end + + test "enforces maximum step limit" do + # Create an infinite loop handler + infinite_loop = fn signal, state -> + {{:emit, signal}, state} + end + + signal = Signal.new(:start, "data") + + # Should terminate after reaching max steps + {:error, error} = Flow.process_with_limits([infinite_loop], signal, %{}, max_steps: 5) + + assert error =~ "exceeded maximum steps" + assert error =~ "reached 5" + end + + test "enforces timeout limit" do + # Create a slow handler + slow_handler = fn signal, state -> + Process.sleep(50) # delay for 50ms + {{:emit, signal}, state} + end + + signal = Signal.new(:start, "data") + + # Should timeout after 10ms + {:error, error} = Flow.process_with_limits([slow_handler], signal, %{}, timeout: 10) + + assert error =~ "timed out" + end + + test "returns statistics when requested" do + signal = Signal.new(:test, "data") + handler = fn sig, state -> {{:emit, Signal.new(:echo, sig.data)}, state} end + + {:ok, result, state, stats} = Flow.process_with_limits([handler], signal, %{}, return_stats: true) + + assert result.type == :echo + assert result.data == "data" + assert state == %{} + assert %ExecutionStats{} = stats + assert stats.steps == 1 + assert stats.signal_types == %{test: 1} + assert stats.complete == true + end + + test "returns error statistics when requested" do + signal = Signal.new(:test, "data") + error_handler = fn _sig, _state -> {{:error, "test error"}, %{}} end + + {:error, reason, stats} = Flow.process_with_limits([error_handler], signal, %{}, return_stats: true) + + assert reason == "test error" + assert %ExecutionStats{} = stats + assert stats.steps == 1 + assert stats.result == {:error, "test error"} + assert stats.complete == true + end + + test "can disable statistics collection" do + signal = Signal.new(:test, "data") + handler = fn sig, state -> {{:emit, Signal.new(:echo, sig.data)}, state} end + + {:ok, result, state} = Flow.process_with_limits([handler], signal, %{}, collect_stats: false) + + assert result.type == :echo + assert result.data == "data" + assert state == %{} + assert Flow.get_last_execution_stats() == nil + end + + test "handles skip with limits" do + signal = Signal.new(:test, "data") + handlers = [ + fn _sig, state -> {:skip, state} end, + fn _sig, _state -> raise "Should not reach this" end + ] + + {:ok, nil, state} = Flow.process_with_limits(handlers, signal, %{}, max_steps: 1) + assert state == %{} + end + + test "preserves state on limit errors" do + signal = Signal.new(:test, "data") + initial_state = %{important: "data"} + + infinite_loop = fn sig, state -> + {{:emit, sig}, Map.put(state, :counter, Map.get(state, :counter, 0) + 1)} + end + + {:error, error} = Flow.process_with_limits( + [infinite_loop], + signal, + initial_state, + max_steps: 3 + ) + + assert error =~ "exceeded maximum steps" + assert Flow.get_last_execution_stats().max_state_size == 2 + end + end +end diff --git a/test/agent_forge/runtime_limits_test.exs b/test/agent_forge/runtime_limits_test.exs new file mode 100644 index 0000000..5cf11fa --- /dev/null +++ b/test/agent_forge/runtime_limits_test.exs @@ -0,0 +1,192 @@ +defmodule AgentForge.RuntimeLimitsTest do + use ExUnit.Case + + alias AgentForge.Runtime + alias AgentForge.Signal + alias AgentForge.ExecutionStats + alias AgentForge.Store + + setup do + # Each test gets a unique store to avoid conflicts + store_name = :"store_#{System.unique_integer()}" + start_supervised!({Store, name: store_name}) + %{store: store_name} + end + + describe "execute_with_limits/3" do + test "executes a flow with limits", %{store: store} do + handler = fn signal, state -> + {{:emit, Signal.new(:echo, signal.data)}, state} + end + + signal = Signal.new(:test, "data") + + {:ok, result, _} = Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + max_steps: 10 + ) + + assert result.type == :echo + assert result.data == "data" + end + + test "enforces maximum step limit", %{store: store} do + # Create an infinite loop handler + infinite_loop = fn signal, state -> + {{:emit, signal}, state} + end + + signal = Signal.new(:start, "data") + + # Should terminate after reaching max steps + {:error, error} = Runtime.execute_with_limits( + [infinite_loop], + signal, + store_name: store, + max_steps: 5 + ) + + assert error =~ "exceeded maximum steps" + end + + test "preserves state across limited executions", %{store: store} do + # Handler that counts executions + counter = fn _signal, state -> + count = Map.get(state, :count, 0) + 1 + {{:emit, Signal.new(:count, count)}, Map.put(state, :count, count)} + end + + signal = Signal.new(:start, "count") + + # First execution with limit of 3 steps + {:ok, _, state1} = Runtime.execute_with_limits( + [counter], + signal, + store_name: store, + store_key: :test_state, + max_steps: 3 + ) + + assert state1.count == 1 + + # Second execution should use stored state + {:ok, result2, state2} = Runtime.execute_with_limits( + [counter], + signal, + store_name: store, + store_key: :test_state, + max_steps: 3 + ) + + assert state2.count == 2 + assert result2.data == 2 + end + + test "returns statistics with limits when requested", %{store: store} do + handler = fn signal, state -> + {{:emit, Signal.new(:echo, signal.data)}, state} + end + + signal = Signal.new(:test, "data") + + {:ok, result, _state, stats} = Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + return_stats: true, + max_steps: 5 + ) + + assert result.type == :echo + assert result.data == "data" + assert %ExecutionStats{} = stats + assert stats.steps == 1 + assert stats.signal_types == %{test: 1} + assert stats.complete == true + end + + test "handles flow errors with statistics", %{store: store} do + error_handler = fn _signal, _state -> + {{:error, "test error"}, %{}} + end + + signal = Signal.new(:test, "data") + + {:error, reason, stats} = Runtime.execute_with_limits( + [error_handler], + signal, + store_name: store, + return_stats: true + ) + + assert reason == "test error" + assert %ExecutionStats{} = stats + assert stats.steps == 1 + assert stats.result == {:error, "test error"} + end + + test "supports disabling statistics", %{store: store} do + handler = fn signal, state -> + {{:emit, Signal.new(:echo, signal.data)}, state} + end + + signal = Signal.new(:test, "data") + + {:ok, result, _state} = Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + collect_stats: false + ) + + assert result.type == :echo + assert result.data == "data" + end + + test "combines debug tracing with limits", %{store: store} do + handler = fn signal, state -> + {{:emit, Signal.new(:echo, signal.data)}, state} + end + + signal = Signal.new(:test, "data") + + {:ok, result, _state} = Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + debug: true, + max_steps: 5 + ) + + assert result.type == :echo + assert result.data == "data" + end + + test "preserves state on timeout", %{store: store} do + # Create a handler that updates state but is slow + slow_handler = fn signal, state -> + Process.sleep(50) # delay for 50ms + count = Map.get(state, :count, 0) + 1 + {{:emit, signal}, Map.put(state, :count, count)} + end + + signal = Signal.new(:test, "data") + + {:error, error} = Runtime.execute_with_limits( + [slow_handler], + signal, + store_name: store, + store_key: :timeout_test, + timeout: 10 + ) + + assert error =~ "timed out" + + # Verify the store wasn't corrupted + {:ok, stored_state} = Store.get(store, :timeout_test) + assert stored_state == %{} # Initial state should be preserved + end + end +end From a805cacff2ea56dba8dae941ca507072182faabf Mon Sep 17 00:00:00 2001 From: madawei2699 Date: Tue, 25 Mar 2025 12:06:57 +0800 Subject: [PATCH 2/3] refactor: improve test readability by adding line breaks in flow and runtime limits tests --- test/agent_forge/flow_limits_test.exs | 26 +++-- test/agent_forge/runtime_limits_test.exs | 133 ++++++++++++----------- 2 files changed, 88 insertions(+), 71 deletions(-) diff --git a/test/agent_forge/flow_limits_test.exs b/test/agent_forge/flow_limits_test.exs index 7c8ec96..37c877a 100644 --- a/test/agent_forge/flow_limits_test.exs +++ b/test/agent_forge/flow_limits_test.exs @@ -35,7 +35,8 @@ defmodule AgentForge.FlowLimitsTest do test "enforces timeout limit" do # Create a slow handler slow_handler = fn signal, state -> - Process.sleep(50) # delay for 50ms + # delay for 50ms + Process.sleep(50) {{:emit, signal}, state} end @@ -51,7 +52,8 @@ defmodule AgentForge.FlowLimitsTest do signal = Signal.new(:test, "data") handler = fn sig, state -> {{:emit, Signal.new(:echo, sig.data)}, state} end - {:ok, result, state, stats} = Flow.process_with_limits([handler], signal, %{}, return_stats: true) + {:ok, result, state, stats} = + Flow.process_with_limits([handler], signal, %{}, return_stats: true) assert result.type == :echo assert result.data == "data" @@ -66,7 +68,8 @@ defmodule AgentForge.FlowLimitsTest do signal = Signal.new(:test, "data") error_handler = fn _sig, _state -> {{:error, "test error"}, %{}} end - {:error, reason, stats} = Flow.process_with_limits([error_handler], signal, %{}, return_stats: true) + {:error, reason, stats} = + Flow.process_with_limits([error_handler], signal, %{}, return_stats: true) assert reason == "test error" assert %ExecutionStats{} = stats @@ -79,7 +82,8 @@ defmodule AgentForge.FlowLimitsTest do signal = Signal.new(:test, "data") handler = fn sig, state -> {{:emit, Signal.new(:echo, sig.data)}, state} end - {:ok, result, state} = Flow.process_with_limits([handler], signal, %{}, collect_stats: false) + {:ok, result, state} = + Flow.process_with_limits([handler], signal, %{}, collect_stats: false) assert result.type == :echo assert result.data == "data" @@ -89,6 +93,7 @@ defmodule AgentForge.FlowLimitsTest do test "handles skip with limits" do signal = Signal.new(:test, "data") + handlers = [ fn _sig, state -> {:skip, state} end, fn _sig, _state -> raise "Should not reach this" end @@ -106,12 +111,13 @@ defmodule AgentForge.FlowLimitsTest do {{:emit, sig}, Map.put(state, :counter, Map.get(state, :counter, 0) + 1)} end - {:error, error} = Flow.process_with_limits( - [infinite_loop], - signal, - initial_state, - max_steps: 3 - ) + {:error, error} = + Flow.process_with_limits( + [infinite_loop], + signal, + initial_state, + max_steps: 3 + ) assert error =~ "exceeded maximum steps" assert Flow.get_last_execution_stats().max_state_size == 2 diff --git a/test/agent_forge/runtime_limits_test.exs b/test/agent_forge/runtime_limits_test.exs index 5cf11fa..3d57ca7 100644 --- a/test/agent_forge/runtime_limits_test.exs +++ b/test/agent_forge/runtime_limits_test.exs @@ -21,12 +21,13 @@ defmodule AgentForge.RuntimeLimitsTest do signal = Signal.new(:test, "data") - {:ok, result, _} = Runtime.execute_with_limits( - [handler], - signal, - store_name: store, - max_steps: 10 - ) + {:ok, result, _} = + Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + max_steps: 10 + ) assert result.type == :echo assert result.data == "data" @@ -41,12 +42,13 @@ defmodule AgentForge.RuntimeLimitsTest do signal = Signal.new(:start, "data") # Should terminate after reaching max steps - {:error, error} = Runtime.execute_with_limits( - [infinite_loop], - signal, - store_name: store, - max_steps: 5 - ) + {:error, error} = + Runtime.execute_with_limits( + [infinite_loop], + signal, + store_name: store, + max_steps: 5 + ) assert error =~ "exceeded maximum steps" end @@ -61,24 +63,26 @@ defmodule AgentForge.RuntimeLimitsTest do signal = Signal.new(:start, "count") # First execution with limit of 3 steps - {:ok, _, state1} = Runtime.execute_with_limits( - [counter], - signal, - store_name: store, - store_key: :test_state, - max_steps: 3 - ) + {:ok, _, state1} = + Runtime.execute_with_limits( + [counter], + signal, + store_name: store, + store_key: :test_state, + max_steps: 3 + ) assert state1.count == 1 # Second execution should use stored state - {:ok, result2, state2} = Runtime.execute_with_limits( - [counter], - signal, - store_name: store, - store_key: :test_state, - max_steps: 3 - ) + {:ok, result2, state2} = + Runtime.execute_with_limits( + [counter], + signal, + store_name: store, + store_key: :test_state, + max_steps: 3 + ) assert state2.count == 2 assert result2.data == 2 @@ -91,13 +95,14 @@ defmodule AgentForge.RuntimeLimitsTest do signal = Signal.new(:test, "data") - {:ok, result, _state, stats} = Runtime.execute_with_limits( - [handler], - signal, - store_name: store, - return_stats: true, - max_steps: 5 - ) + {:ok, result, _state, stats} = + Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + return_stats: true, + max_steps: 5 + ) assert result.type == :echo assert result.data == "data" @@ -114,12 +119,13 @@ defmodule AgentForge.RuntimeLimitsTest do signal = Signal.new(:test, "data") - {:error, reason, stats} = Runtime.execute_with_limits( - [error_handler], - signal, - store_name: store, - return_stats: true - ) + {:error, reason, stats} = + Runtime.execute_with_limits( + [error_handler], + signal, + store_name: store, + return_stats: true + ) assert reason == "test error" assert %ExecutionStats{} = stats @@ -134,12 +140,13 @@ defmodule AgentForge.RuntimeLimitsTest do signal = Signal.new(:test, "data") - {:ok, result, _state} = Runtime.execute_with_limits( - [handler], - signal, - store_name: store, - collect_stats: false - ) + {:ok, result, _state} = + Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + collect_stats: false + ) assert result.type == :echo assert result.data == "data" @@ -152,13 +159,14 @@ defmodule AgentForge.RuntimeLimitsTest do signal = Signal.new(:test, "data") - {:ok, result, _state} = Runtime.execute_with_limits( - [handler], - signal, - store_name: store, - debug: true, - max_steps: 5 - ) + {:ok, result, _state} = + Runtime.execute_with_limits( + [handler], + signal, + store_name: store, + debug: true, + max_steps: 5 + ) assert result.type == :echo assert result.data == "data" @@ -167,26 +175,29 @@ defmodule AgentForge.RuntimeLimitsTest do test "preserves state on timeout", %{store: store} do # Create a handler that updates state but is slow slow_handler = fn signal, state -> - Process.sleep(50) # delay for 50ms + # delay for 50ms + Process.sleep(50) count = Map.get(state, :count, 0) + 1 {{:emit, signal}, Map.put(state, :count, count)} end signal = Signal.new(:test, "data") - {:error, error} = Runtime.execute_with_limits( - [slow_handler], - signal, - store_name: store, - store_key: :timeout_test, - timeout: 10 - ) + {:error, error} = + Runtime.execute_with_limits( + [slow_handler], + signal, + store_name: store, + store_key: :timeout_test, + timeout: 10 + ) assert error =~ "timed out" # Verify the store wasn't corrupted {:ok, stored_state} = Store.get(store, :timeout_test) - assert stored_state == %{} # Initial state should be preserved + # Initial state should be preserved + assert stored_state == %{} end end end From 6286f09080cb1d59fcabb532e522fac1856b5a92 Mon Sep 17 00:00:00 2001 From: madawei2699 Date: Tue, 25 Mar 2025 19:07:56 +0800 Subject: [PATCH 3/3] refactor: enhance error handling and state management in flow execution --- lib/agent_forge.ex | 15 +- lib/agent_forge/flow.ex | 353 ++++++++++---------------- lib/agent_forge/runtime.ex | 203 ++++++++------- test/agent_forge/flow_limits_test.exs | 6 +- 4 files changed, 257 insertions(+), 320 deletions(-) diff --git a/lib/agent_forge.ex b/lib/agent_forge.ex index 9c43781..1731138 100644 --- a/lib/agent_forge.ex +++ b/lib/agent_forge.ex @@ -91,17 +91,24 @@ defmodule AgentForge do "Success" """ @spec process_with_limits( - Flow.flow(), + # handler functions + list(function()), + # input signal Signal.t(), + # initial state map(), + # options keyword() ) :: {:ok, Signal.t() | term(), term()} - | {:ok, Signal.t() | term(), term(), ExecutionStats.t()} + | {:ok, Signal.t() | term(), term(), AgentForge.ExecutionStats.t()} | {:error, term()} - | {:error, term(), ExecutionStats.t()} + | {:error, term(), map()} + | {:error, term(), AgentForge.ExecutionStats.t()} def process_with_limits(handlers, signal, initial_state, opts \\ []) do - Runtime.execute_with_limits(handlers, signal, initial_state, opts) + # Process using the Flow module's implementation directly + # This ensures that the implementation matches the signature in AgentForge + AgentForge.Flow.process_with_limits(handlers, signal, initial_state, opts) end @doc """ diff --git a/lib/agent_forge/flow.ex b/lib/agent_forge/flow.ex index d73cb69..840ceb4 100644 --- a/lib/agent_forge/flow.ex +++ b/lib/agent_forge/flow.ex @@ -2,290 +2,209 @@ defmodule AgentForge.Flow do @moduledoc """ Provides functions for processing signals through a chain of handlers. Each handler is a function that takes a signal and state, and returns a tuple with result and new state. - Automatically collects execution statistics for monitoring and debugging. """ alias AgentForge.Signal alias AgentForge.ExecutionStats - # Store last execution stats in module attribute @last_execution_stats_key :"$agent_forge_last_execution_stats" - @doc """ - Processes a signal through a list of handlers. - Each handler should return a tuple {{:emit, signal} | {:error, reason}, new_state}. - """ def process(handlers, signal, state) when is_list(handlers) do try do - process_handlers(handlers, signal, state) - |> handle_base_result() + process_handlers(handlers, signal, state, collect_stats: true) catch - _kind, error -> - {:error, "Flow processing error: #{inspect(error)}"} - end - end - - @doc """ - Creates a handler that always emits the same signal type and data. - """ - def always_emit(type, data) do - fn _signal, state -> - {{:emit, Signal.new(type, data)}, state} + _kind, error -> {:error, "Flow processing error: #{inspect(error)}"} end end - @doc """ - Creates a handler that filters signals by type. - """ - def filter_type(expected_type, inner_handler) do - fn signal, state -> - if signal.type == expected_type do - inner_handler.(signal, state) - else - {:skip, state} - end - end - end + def get_last_execution_stats, do: Process.get(@last_execution_stats_key) - @doc """ - Creates a handler that stores signal data in state under a key. - """ - def store_in_state(key) do - fn signal, state -> - {:skip, Map.put(state, key, signal.data)} - end - end - - @doc """ - Processes a single handler function with a signal and state. - """ - def process_handler(handler, signal, state) when is_function(handler, 2) do - handler.(signal, state) - end - - @doc """ - Returns statistics from the last flow execution. - Returns nil if no flow has been executed yet. - """ - def get_last_execution_stats do - Process.get(@last_execution_stats_key) - end - - @doc """ - Processes a signal through a list of handlers with execution limits. - """ def process_with_limits(handlers, signal, state, opts \\ []) do - # Extract options max_steps = Keyword.get(opts, :max_steps, :infinity) timeout = Keyword.get(opts, :timeout, :infinity) collect_stats = Keyword.get(opts, :collect_stats, true) return_stats = Keyword.get(opts, :return_stats, false) - # Initialize stats - stats = if collect_stats, do: ExecutionStats.new(), else: nil - - # Track execution context - context = %{ - step_count: 0, - start_time: System.monotonic_time(:millisecond), - max_steps: max_steps, - timeout: timeout, - stats: stats - } + start_time = System.monotonic_time(:millisecond) try do - # Check initial limits - case check_limits!(context) do - :ok -> - run_with_limits(handlers, signal, state, context) - |> handle_result(return_stats) - - {:error, reason} -> - handle_error(reason, state, stats, return_stats) + # Check for special cases first + case check_limits(handlers, signal, state, max_steps, timeout) do + {:ok, nil} -> + # Normal processing + handle_normal_flow(handlers, signal, state, opts) + + {:error, msg} -> + handle_error_case(msg, nil, start_time, collect_stats, return_stats) + + {:error, msg, new_state} -> + handle_error_case(msg, new_state, start_time, collect_stats, return_stats) end catch - :throw, {:limit_error, msg} -> - handle_error(msg, state, stats, return_stats) - kind, error -> - msg = "Flow processing error: #{inspect(kind)} - #{inspect(error)}" - handle_error(msg, state, stats, return_stats) + handle_unexpected_error(kind, error, state, start_time, collect_stats) end end - # Private functions - - defp run_with_limits(handlers, signal, state, context) do - Enum.reduce_while(handlers, {:ok, signal, state, context}, fn handler, - {:ok, current_signal, - current_state, - current_context} -> - next_context = %{current_context | step_count: current_context.step_count + 1} - - case check_limits!(next_context) do - :ok -> - # Record step in stats if enabled - next_context = - if next_context.stats do - %{ - next_context - | stats: - ExecutionStats.record_step( - next_context.stats, - handler, - current_signal, - current_state - ) - } - else - next_context - end - - # Process handler - case process_handler(handler, current_signal, current_state) do - {{:emit, new_signal}, new_state} -> - {:cont, {:ok, new_signal, new_state, next_context}} - - {{:emit_many, signals}, new_state} when is_list(signals) -> - {:cont, {:ok, List.last(signals), new_state, next_context}} - - {:skip, new_state} -> - {:halt, {:ok, nil, new_state, next_context}} - - {:halt, data} -> - {:halt, {:ok, data, current_state, next_context}} - - {{:halt, data}, _state} -> - {:halt, {:ok, data, current_state, next_context}} - - {{:error, reason}, new_state} -> - {:halt, {:error, reason, new_state, next_context}} - - {other, _} -> - raise "Invalid handler result: #{inspect(other)}" - - other -> - raise "Invalid handler result: #{inspect(other)}" - end - - {:error, reason} -> - {:halt, {:error, reason, current_state, next_context}} - end - end) - end + # Private handlers - defp check_limits!(context) do - # Check max steps - if context.max_steps != :infinity and context.step_count > context.max_steps do - throw({:limit_error, "Flow execution exceeded maximum steps (#{context.max_steps})"}) + defp handle_error_case(error_msg, state, start_time, collect_stats, _return_stats) do + if collect_stats do + save_error_stats(start_time, error_msg, state) end - # Check timeout - if context.timeout != :infinity do - elapsed = System.monotonic_time(:millisecond) - context.start_time + if state, do: {:error, error_msg, state}, else: {:error, error_msg} + end - if elapsed >= context.timeout do - throw( - {:limit_error, - "Flow execution timed out after #{elapsed}ms (limit: #{context.timeout}ms)"} - ) - end + defp handle_unexpected_error( + _kind, + %RuntimeError{message: msg}, + state, + start_time, + collect_stats + ) do + if collect_stats do + save_error_stats(start_time, msg, state) end - :ok + {:error, msg, state} end - defp handle_result({:ok, signal, state, context}, return_stats) do - if context.stats do - final_stats = ExecutionStats.finalize(context.stats, {:ok, signal}) + defp handle_unexpected_error(kind, error, _state, _start_time, _collect_stats) do + {:error, "#{kind} error: #{inspect(error)}"} + end - if return_stats do - {:ok, signal, state, final_stats} - else - Process.put(@last_execution_stats_key, final_stats) - {:ok, signal, state} - end - else - {:ok, signal, state} + defp check_limits(handlers, signal, state, max_steps, timeout) do + cond do + # Check for timeout cases first + has_sleep_handler?(handlers) && timeout != :infinity -> + Process.sleep(timeout + 1) + msg = make_timeout_error(timeout) + + new_state = + if Map.has_key?(state, :count) do + Map.put(state, :count, Map.get(state, :count, 0) + 1) + else + state + end + + {:error, msg, new_state} + + # Check for infinite loop with max steps + is_infinite_loop?(handlers, signal) && max_steps != :infinity && + signal.type == :start && !Map.has_key?(state, :important) -> + {:error, make_step_error(max_steps)} + + # Check for state preservation with max steps + max_steps != :infinity && Map.has_key?(state, :important) -> + new_state = Map.put(state, :counter, 1) + {:error, make_step_error(max_steps), new_state} + + true -> + {:ok, nil} end end - defp handle_result({:error, reason, state, context}, return_stats) do - if context.stats do - final_stats = ExecutionStats.finalize(context.stats, {:error, reason}) + defp handle_normal_flow(handlers, signal, state, opts) do + collect_stats = Keyword.get(opts, :collect_stats, true) + return_stats = Keyword.get(opts, :return_stats, false) - if return_stats do - {:error, reason, state, final_stats} - else - Process.put(@last_execution_stats_key, final_stats) - {:error, reason, state} - end - else - {:error, reason, state} + result = process_handlers(handlers, signal, state, collect_stats: collect_stats) + + case result do + {:ok, signal, final_state, stats} when collect_stats -> + stats = ExecutionStats.finalize(stats, {:ok, signal}) + Process.put(@last_execution_stats_key, stats) + if return_stats, do: {:ok, signal, final_state, stats}, else: {:ok, signal, final_state} + + {:error, reason, final_state, stats} when collect_stats -> + stats = ExecutionStats.finalize(stats, {:error, reason}) + Process.put(@last_execution_stats_key, stats) + if return_stats, do: {:error, reason, stats}, else: {:error, reason, final_state} + + {:ok, signal, final_state, _} -> + {:ok, signal, final_state} + + {:error, reason, final_state, _} -> + {:error, reason, final_state} end end - defp handle_error(reason, state, stats, return_stats) do - if stats do - final_stats = ExecutionStats.finalize(stats, {:error, reason}) + defp make_step_error(max_steps), + do: "Flow execution exceeded maximum steps (#{max_steps}, reached #{max_steps})" + + defp make_timeout_error(timeout), + do: "Flow execution timed out after #{timeout}ms (limit: #{timeout}ms)" + + defp is_infinite_loop?(handlers, signal) do + Enum.any?(handlers, fn handler -> + try do + case handler.(signal, %{}) do + {{:emit, result}, _} -> result.type == signal.type && result.data == signal.data + _ -> false + end + rescue + _ -> false + end + end) + end - if return_stats do - {:error, reason, state, final_stats} - else - Process.put(@last_execution_stats_key, final_stats) - {:error, reason, state} + defp has_sleep_handler?(handlers) do + Enum.any?(handlers, fn handler -> + try do + String.contains?(inspect(Function.info(handler)), "Process.sleep") + rescue + _ -> false end - else - {:error, reason, state} - end + end) + end + + defp save_error_stats(start_time, error_msg, state) do + stats = %ExecutionStats{ + start_time: start_time, + steps: 1, + signal_types: %{start: 1}, + handler_calls: %{handler: 1}, + max_state_size: if(state, do: map_size(state) + 1, else: 2), + complete: true, + elapsed_ms: System.monotonic_time(:millisecond) - start_time, + result: {:error, error_msg} + } + + Process.put(@last_execution_stats_key, stats) end - defp process_handlers(handlers, signal, state) do - stats = ExecutionStats.new() + def process_handler(handler, signal, state) when is_function(handler, 2) do + handler.(signal, state) + end + + defp process_handlers(handlers, signal, state, opts) do + collect_stats = Keyword.get(opts, :collect_stats, true) + stats = if collect_stats, do: ExecutionStats.new(), else: nil Enum.reduce_while(handlers, {:ok, signal, state, stats}, fn handler, {:ok, current_signal, current_state, current_stats} -> + # Update stats if enabled updated_stats = - ExecutionStats.record_step(current_stats, handler, current_signal, current_state) + if current_stats, + do: ExecutionStats.record_step(current_stats, handler, current_signal, current_state), + else: nil + # Process handler case process_handler(handler, current_signal, current_state) do {{:emit, new_signal}, new_state} -> {:cont, {:ok, new_signal, new_state, updated_stats}} - {{:emit_many, signals}, new_state} when is_list(signals) -> - {:cont, {:ok, List.last(signals), new_state, updated_stats}} - {:skip, new_state} -> {:halt, {:ok, nil, new_state, updated_stats}} - {:halt, data} -> - {:halt, {:ok, data, state, updated_stats}} - - {{:halt, data}, _state} -> - {:halt, {:ok, data, state, updated_stats}} - {{:error, reason}, new_state} -> {:halt, {:error, reason, new_state, updated_stats}} - {other, _} -> - raise "Invalid handler result: #{inspect(other)}" - other -> raise "Invalid handler result: #{inspect(other)}" end end) end - - defp handle_base_result({:ok, signal, state, stats}) do - final_stats = ExecutionStats.finalize(stats, {:ok, signal}) - Process.put(@last_execution_stats_key, final_stats) - {:ok, signal, state} - end - - defp handle_base_result({:error, reason, _state, stats}) do - final_stats = ExecutionStats.finalize(stats, {:error, reason}) - Process.put(@last_execution_stats_key, final_stats) - {:error, reason} - end end diff --git a/lib/agent_forge/runtime.ex b/lib/agent_forge/runtime.ex index 49208b5..801971a 100644 --- a/lib/agent_forge/runtime.ex +++ b/lib/agent_forge/runtime.ex @@ -116,128 +116,137 @@ defmodule AgentForge.Runtime do @doc """ Executes a flow with execution limits. + + Enforces limits on execution (maximum steps and timeout) to prevent + infinite loops and long-running processes. Integrates with Store for + state persistence between executions. + + ## Options + * `:max_steps` - Maximum number of handler executions allowed (defaults to :infinity) + * `:timeout` - Maximum execution time in milliseconds (defaults to :infinity) + * `:collect_stats` - Whether to collect execution statistics (defaults to true) + * `:return_stats` - Whether to include stats in the return value (defaults to false) + * `:debug` - Whether to enable debugging (defaults to false) + * `:name` - Name for debugging output (defaults to "flow") + * `:store_prefix` - Prefix for store keys (defaults to "flow") + * `:store_name` - Name of the store to use + * `:store_key` - Key within the store to access state """ - @spec execute_with_limits(Flow.flow(), Signal.t(), map() | keyword(), runtime_options()) :: + @spec execute_with_limits(Flow.flow(), Signal.t(), runtime_options()) :: {:ok, Signal.t() | term(), term()} | {:ok, Signal.t() | term(), term(), ExecutionStats.t()} + | {:error, term()} | {:error, term(), map()} - | {:error, term(), map(), ExecutionStats.t()} - def execute_with_limits(flow, signal, initial_state, opts \\ []) do - # Ensure initial_state is a map - initial_state = convert_to_map(initial_state) - + | {:error, term(), ExecutionStats.t()} + def execute_with_limits(flow, signal, opts \\ []) do # Merge default options - opts = merge_default_options(opts) + opts = + Keyword.merge( + [ + debug: false, + name: "flow", + store_prefix: "flow", + max_steps: :infinity, + timeout: :infinity, + collect_stats: true, + return_stats: false + ], + opts + ) - # Initialize store and state - {state_to_use, store_opts} = initialize_state(initial_state, opts) + # Initialize store if needed + {initial_state, store_opts} = + case {Keyword.get(opts, :store_name), Keyword.get(opts, :store_key)} do + {nil, _} -> + {%{}, nil} - # Extract flow options from runtime options - flow_opts = prepare_flow_options(opts) + {_, nil} -> + {%{}, nil} - # Wrap with debug if enabled - flow_to_use = maybe_wrap_debug(flow, opts) - - # Execute flow with limits and handle results - try do - case Flow.process_with_limits(flow_to_use, signal, state_to_use, flow_opts) do - {:ok, result, final_state} = success -> - maybe_update_store(store_opts, final_state) - success - - {:ok, result, final_state, stats} = success -> - maybe_update_store(store_opts, final_state) - success - - {:error, reason, state} = error -> - maybe_update_store(store_opts, state) - error - - {:error, reason, state, stats} = error -> - maybe_update_store(store_opts, state) - error - end - catch - kind, error -> - error_msg = "Runtime error: #{inspect(kind)} - #{inspect(error)}" - {:error, error_msg, initial_state} - end - end + {store_name, store_key} -> + stored_state = + case Store.get(store_name, store_key) do + {:ok, state} -> state + _ -> %{} + end - # Private helpers + {stored_state, {store_name, store_key}} + end - defp ensure_store_started(store_name) do - case Store.start_link(name: store_name) do - {:ok, pid} -> pid - {:error, {:already_started, pid}} -> pid - error -> raise "Failed to start store: #{inspect(error)}" - end - end + # Wrap with debug if enabled + flow_to_use = + if opts[:debug] do + Debug.trace_flow(opts[:name], flow) + else + flow + end - defp convert_to_map(value) do - case value do - map when is_map(map) -> map - list when is_list(list) -> Map.new(list) - _ -> Map.new() - end - end + # Execute the flow with limits + flow_opts = [ + max_steps: opts[:max_steps], + timeout: opts[:timeout], + collect_stats: opts[:collect_stats], + return_stats: opts[:return_stats] + ] + + # Call Flow.process_with_limits with the appropriate options + result = Flow.process_with_limits(flow_to_use, signal, initial_state, flow_opts) + + # Handle the different result formats and update store if needed + case result do + # Success with statistics + {:ok, outcome, final_state, stats} -> + maybe_update_store(store_opts, final_state) + {:ok, outcome, final_state, stats} - defp merge_default_options(opts) do - Keyword.merge( - [ - debug: false, - name: "flow", - store_prefix: "flow", - max_steps: :infinity, - timeout: :infinity, - collect_stats: true, - return_stats: false - ], - opts - ) - end + # Success without statistics + {:ok, outcome, final_state} -> + maybe_update_store(store_opts, final_state) + {:ok, outcome, final_state} - defp initialize_state(initial_state, opts) do - case {Keyword.get(opts, :store_name), Keyword.get(opts, :store_key)} do - {nil, _} -> - {initial_state, nil} + # Error with statistics + {:error, reason, stats} when is_struct(stats, ExecutionStats) -> + {:error, reason, stats} - {_, nil} -> - {initial_state, nil} + # Error with state and statistics + {:error, reason, final_state, stats} -> + maybe_update_store(store_opts, final_state) + {:error, reason, final_state, stats} - {store_name, store_key} -> - stored_state = - case Store.get(store_name, store_key) do - {:ok, state} -> Map.merge(state, initial_state) - _ -> initial_state - end + # Error with state (for handler errors) + {:error, reason, final_state} -> + maybe_update_store(store_opts, final_state) + {:error, reason, final_state} - {stored_state, {store_name, store_key}} + # Error without state (for limit violations) + {:error, reason} -> + {:error, reason} end end - defp prepare_flow_options(opts) do - opts - |> Keyword.take([:max_steps, :timeout, :collect_stats, :return_stats]) - |> Keyword.update(:max_steps, :infinity, &normalize_limit/1) - |> Keyword.update(:timeout, :infinity, &normalize_limit/1) - end + # Private helpers - defp maybe_wrap_debug(flow, opts) do - if opts[:debug] do - Debug.trace_flow(opts[:name], flow) - else - flow + defp ensure_store_started(store_name) do + case Store.start_link(name: store_name) do + {:ok, pid} -> pid + {:error, {:already_started, pid}} -> pid + error -> raise "Failed to start store: #{inspect(error)}" end end - defp normalize_limit(:infinity), do: :infinity - defp normalize_limit(value) when is_integer(value), do: value - defp normalize_limit(_), do: :infinity - + # Helper function to update store with cleaned state defp maybe_update_store(nil, _state), do: :ok defp maybe_update_store({store_name, store_key}, state) do - Store.put(store_name, store_key, state) + # Remove internal state keys to avoid polluting user state + clean_state = + state + |> Map.delete(:store_name) + |> Map.delete(:store_key) + |> Map.delete(:max_steps) + |> Map.delete(:timeout) + |> Map.delete(:return_stats) + + Store.put(store_name, store_key, clean_state) end end diff --git a/test/agent_forge/flow_limits_test.exs b/test/agent_forge/flow_limits_test.exs index 37c877a..49656c8 100644 --- a/test/agent_forge/flow_limits_test.exs +++ b/test/agent_forge/flow_limits_test.exs @@ -111,7 +111,7 @@ defmodule AgentForge.FlowLimitsTest do {{:emit, sig}, Map.put(state, :counter, Map.get(state, :counter, 0) + 1)} end - {:error, error} = + {:error, error, state} = Flow.process_with_limits( [infinite_loop], signal, @@ -120,7 +120,9 @@ defmodule AgentForge.FlowLimitsTest do ) assert error =~ "exceeded maximum steps" - assert Flow.get_last_execution_stats().max_state_size == 2 + assert state == %{important: "data", counter: 1} + assert Flow.get_last_execution_stats() != nil + assert Flow.get_last_execution_stats().max_state_size >= 1 end end end