From 924b9eeffb1b8f74778adb1a3276e721d712ed34 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Fri, 6 Mar 2026 01:55:16 +0100 Subject: [PATCH 1/5] =?UTF-8?q?fix:=20resolve=20all=20Credo=20warnings=20?= =?UTF-8?q?=E2=80=94=20zero=20issues=20remaining?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add missing Logger metadata keys to config (count, keys, retry, max_retries, type, endpoint, path, manifest_count) - Extract handle_send_failure/4 from Events.Emitter.handle_info to reduce nesting depth from 3 to 2 - Remove redundant last with clause in Occurrence.persist — the final File.write becomes the with body's return value directly - Replace length(list) >= 1 with list != [] in query_test.exs Co-Authored-By: Claude Opus 4.6 --- config/config.exs | 10 ++++++- lib/nopea/events/emitter.ex | 50 +++++++++++++++++--------------- lib/nopea/occurrence.ex | 12 ++++---- test/nopea/memory/query_test.exs | 4 +-- 4 files changed, 43 insertions(+), 33 deletions(-) diff --git a/config/config.exs b/config/config.exs index 4a3ad5c..6ec2e5d 100644 --- a/config/config.exs +++ b/config/config.exs @@ -22,7 +22,15 @@ config :logger, :default_formatter, :node_count, :relationship_count, :auto_selected, - :verified + :verified, + :count, + :keys, + :retry, + :max_retries, + :type, + :endpoint, + :path, + :manifest_count ] import_config "#{config_env()}.exs" diff --git a/lib/nopea/events/emitter.ex b/lib/nopea/events/emitter.ex index 954f96a..818f0bb 100644 --- a/lib/nopea/events/emitter.ex +++ b/lib/nopea/events/emitter.ex @@ -144,35 +144,39 @@ defmodule Nopea.Events.Emitter do {:noreply, new_state} {:error, reason} -> - new_retry_count = item.retry_count + 1 - - if new_retry_count > state.max_retries do - Logger.warning("CDEvent dropped after max retries", - max_retries: state.max_retries, - error: inspect(reason) - ) - - new_state = %{state | queue: rest_queue, dropped_count: state.dropped_count + 1} - schedule_next_process(new_state, 0) - {:noreply, new_state} - else - Logger.debug("CDEvent send failed, retrying", - retry: new_retry_count, - error: inspect(reason) - ) - - updated_item = %{item | retry_count: new_retry_count} - new_state = %{state | queue: :queue.in_r(updated_item, rest_queue)} - delay = backoff_delay(new_retry_count, state.retry_delay_ms) - schedule_next_process(new_state, delay) - {:noreply, new_state} - end + {:noreply, handle_send_failure(item, reason, rest_queue, state)} end end end # Private Functions + defp handle_send_failure(item, reason, rest_queue, state) do + new_retry_count = item.retry_count + 1 + + if new_retry_count > state.max_retries do + Logger.warning("CDEvent dropped after max retries", + max_retries: state.max_retries, + error: inspect(reason) + ) + + new_state = %{state | queue: rest_queue, dropped_count: state.dropped_count + 1} + schedule_next_process(new_state, 0) + new_state + else + Logger.debug("CDEvent send failed, retrying", + retry: new_retry_count, + error: inspect(reason) + ) + + updated_item = %{item | retry_count: new_retry_count} + new_state = %{state | queue: :queue.in_r(updated_item, rest_queue)} + delay = backoff_delay(new_retry_count, state.retry_delay_ms) + schedule_next_process(new_state, delay) + new_state + end + end + defp maybe_start_processing(%{processing: true} = state), do: state defp maybe_start_processing(state) do diff --git a/lib/nopea/occurrence.ex b/lib/nopea/occurrence.ex index af7c9aa..49a0239 100644 --- a/lib/nopea/occurrence.ex +++ b/lib/nopea/occurrence.ex @@ -72,13 +72,11 @@ defmodule Nopea.Occurrence do with {:ok, json} <- FalseProtocol.JSON.encode(occurrence), :ok <- File.mkdir_p(dir), :ok <- File.write(Path.join(dir, "occurrence.json"), json), - :ok <- File.mkdir_p(etf_dir), - :ok <- - File.write( - Path.join(etf_dir, "#{occurrence.id}.etf"), - :erlang.term_to_binary(occurrence) - ) do - :ok + :ok <- File.mkdir_p(etf_dir) do + File.write( + Path.join(etf_dir, "#{occurrence.id}.etf"), + :erlang.term_to_binary(occurrence) + ) end end diff --git a/test/nopea/memory/query_test.exs b/test/nopea/memory/query_test.exs index 17886e6..2cb4c77 100644 --- a/test/nopea/memory/query_test.exs +++ b/test/nopea/memory/query_test.exs @@ -88,7 +88,7 @@ defmodule Nopea.Memory.QueryTest do ctx = Query.deploy_context(graph, "auth-service", "production") # Should have depends_on relationship to namespace - assert length(ctx.dependencies) >= 1 + assert ctx.dependencies != [] ns_dep = Enum.find(ctx.dependencies, fn d -> String.contains?(d.target, "namespace:") end) assert ns_dep != nil end @@ -193,7 +193,7 @@ defmodule Nopea.Memory.QueryTest do service_id = Identity.compute_id(:concept, "risky-svc") recs = Query.recommendations(graph, service_id) - assert length(recs) >= 1 + assert recs != [] assert Enum.any?(recs, fn r -> String.contains?(r, "Deploy with caution") end) end From ed51905113bdf982ef6e93e1c202e94fb1fa4f6d Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Sun, 8 Mar 2026 02:25:52 +0100 Subject: [PATCH 2/5] feat: extract Nopea.Surface module and align CLI/MCP/HTTP surfaces MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Unified facade backing all user-facing interfaces — CLI, MCP, and HTTP now delegate to Surface for consistent behaviour and graceful degradation when optional subsystems aren't running. - Extract Nopea.Surface with status/context/history/explain/health/services - Rewire CLI to Surface, add explain/health/services/mcp commands - Rewire MCP to Surface, add nopea_services tool (6 tools total) - Add GET /api/status/:service, /api/explain/:service, /api/services - Fix HTTP router to use Deploy.deploy/1 instead of Deploy.run/1 Co-Authored-By: Claude Opus 4.6 --- lib/nopea/api/router.ex | 62 ++++++++++++------ lib/nopea/cli.ex | 122 +++++++++++++++++++++++++++-------- lib/nopea/mcp.ex | 123 ++++++++++++++++++++++++------------ lib/nopea/surface.ex | 123 ++++++++++++++++++++++++++++++++++++ test/nopea/mcp_test.exs | 5 +- test/nopea/surface_test.exs | 85 +++++++++++++++++++++++++ 6 files changed, 432 insertions(+), 88 deletions(-) create mode 100644 lib/nopea/surface.ex create mode 100644 test/nopea/surface_test.exs diff --git a/lib/nopea/api/router.ex b/lib/nopea/api/router.ex index e8cc399..2342432 100644 --- a/lib/nopea/api/router.ex +++ b/lib/nopea/api/router.ex @@ -39,31 +39,53 @@ defmodule Nopea.API.Router do handle_deploy(conn) end + get "/api/status/:service" do + case Nopea.Surface.status(service) do + {:ok, state} -> json(conn, 200, state) + {:error, :not_found} -> json(conn, 404, %{error: "not_found"}) + {:error, :unavailable} -> json(conn, 503, %{error: "unavailable"}) + end + end + get "/api/context/:service" do namespace = conn.params["namespace"] || "default" - - context = - if Process.whereis(Nopea.Memory) do - Nopea.Memory.get_deploy_context(service, namespace) - else - %{known: false} - end - + context = Nopea.Surface.context(service, namespace) json(conn, 200, context) end get "/api/history/:service" do - history = - if Nopea.Cache.available?() do - case Nopea.Cache.get_service_state(service) do - {:ok, state} -> %{service: service, state: state} - {:error, :not_found} -> %{service: service, deployments: []} - end - else - %{service: service, deployments: []} - end - - json(conn, 200, history) + case Nopea.Surface.history(service) do + {:ok, data} -> json(conn, 200, data) + {:error, :not_found} -> json(conn, 200, %{service: service, deployments: []}) + {:error, :unavailable} -> json(conn, 200, %{service: service, deployments: []}) + end + end + + get "/api/explain/:service" do + namespace = conn.params["namespace"] || "default" + explanation = Nopea.Surface.explain(service, namespace) + json(conn, 200, %{service: service, explanation: explanation}) + end + + get "/api/services" do + services = Nopea.Surface.services() + json(conn, 200, %{services: services, count: length(services)}) + end + + post "/api/promote/:deploy_id" do + case Nopea.Surface.promote(deploy_id) do + {:ok, rollout} -> json(conn, 200, Map.from_struct(rollout)) + {:error, :not_found} -> json(conn, 404, %{error: "not_found"}) + {:error, reason} -> json(conn, 500, %{error: inspect(reason)}) + end + end + + post "/api/rollback/:deploy_id" do + case Nopea.Surface.rollback(deploy_id) do + {:ok, rollout} -> json(conn, 200, Map.from_struct(rollout)) + {:error, :not_found} -> json(conn, 404, %{error: "not_found"}) + {:error, reason} -> json(conn, 500, %{error: inspect(reason)}) + end end match _ do @@ -80,7 +102,7 @@ defmodule Nopea.API.Router do strategy: Nopea.Helpers.parse_strategy(params["strategy"]) } - result = Nopea.Deploy.run(spec) + result = Nopea.Deploy.deploy(spec) json(conn, 200, Nopea.Helpers.serialize_deploy_result(result)) _ -> diff --git a/lib/nopea/cli.ex b/lib/nopea/cli.ex index 95d36dc..2fa8e13 100644 --- a/lib/nopea/cli.ex +++ b/lib/nopea/cli.ex @@ -3,12 +3,18 @@ defmodule Nopea.CLI do Escript entry point for Nopea CLI. Commands: - - deploy Deploy manifests to a cluster - - status Show deployment status - - context Show memory context for a service - - history Show deployment history - - memory Show memory graph stats - - serve Start daemon mode (HTTP API) + - deploy Deploy manifests to a cluster + - status Show deployment status + - context Show memory context for a service + - history Show deployment history + - explain Explain strategy selection for a service + - promote Promote an active progressive rollout + - rollback Rollback an active progressive rollout + - health Show system health + - services List known services + - memory Show memory graph stats + - serve Start daemon mode (HTTP API) + - mcp Start MCP server (JSON-RPC over stdin/stdout) """ require Logger @@ -26,17 +32,23 @@ defmodule Nopea.CLI do aliases: [f: :file, s: :service, n: :namespace, j: :json] ) - case args do - ["deploy" | _] -> deploy(opts) - ["status" | rest] -> status(rest, opts) - ["context" | rest] -> context(rest, opts) - ["history" | rest] -> history(rest, opts) - ["memory" | _] -> memory(opts) - ["serve" | _] -> serve(opts) - _ -> usage() - end + dispatch(args, opts) end + defp dispatch(["deploy" | _], opts), do: deploy(opts) + defp dispatch(["status" | rest], opts), do: status(rest, opts) + defp dispatch(["context" | rest], opts), do: context(rest, opts) + defp dispatch(["history" | rest], opts), do: history(rest, opts) + defp dispatch(["explain" | rest], opts), do: explain(rest, opts) + defp dispatch(["health" | _], opts), do: health(opts) + defp dispatch(["services" | _], opts), do: services(opts) + defp dispatch(["memory" | _], opts), do: memory(opts) + defp dispatch(["promote" | rest], opts), do: promote(rest, opts) + defp dispatch(["rollback" | rest], opts), do: do_rollback(rest, opts) + defp dispatch(["serve" | _], opts), do: serve(opts) + defp dispatch(["mcp" | _], _opts), do: mcp() + defp dispatch(_, _opts), do: usage() + defp deploy(opts) do path = Keyword.get(opts, :file) || "." service = Keyword.get(opts, :service) || Path.basename(path) @@ -60,9 +72,10 @@ defmodule Nopea.CLI do error("Usage: nopea status ") end - case Nopea.Cache.get_service_state(service) do + case Nopea.Surface.status(service) do {:ok, state} -> output(state, opts) {:error, :not_found} -> error("Service '#{service}' not found") + {:error, :unavailable} -> error("No status backend available") end end @@ -74,7 +87,7 @@ defmodule Nopea.CLI do error("Usage: nopea context ") end - ctx = Nopea.Memory.get_deploy_context(service, namespace) + ctx = Nopea.Surface.context(service, namespace) output(ctx, opts) end @@ -85,17 +98,66 @@ defmodule Nopea.CLI do error("Usage: nopea history ") end - deploys = Nopea.Cache.list_deployments(service) - output(deploys, opts) + case Nopea.Surface.history(service) do + {:ok, data} -> output(data, opts) + {:error, :not_found} -> error("No history found for '#{service}'") + {:error, :unavailable} -> error("Cache not available") + end end - defp memory(opts) do - stats = %{ - nodes: Nopea.Memory.node_count(), - relationships: Nopea.Memory.relationship_count() - } + defp explain(args, opts) do + service = List.first(args) || Keyword.get(opts, :service) + namespace = Keyword.get(opts, :namespace, "default") + + unless service do + error("Usage: nopea explain ") + end + + result = Nopea.Surface.explain(service, namespace) + output(result, opts) + end + + defp health(opts) do + result = Nopea.Surface.health() + output(result, opts) + end + + defp services(opts) do + result = Nopea.Surface.services() + output(result, opts) + end - output(stats, opts) + defp promote(args, opts) do + deploy_id = List.first(args) + + unless deploy_id do + error("Usage: nopea promote ") + end + + case Nopea.Surface.promote(deploy_id) do + {:ok, rollout} -> output(rollout, opts) + {:error, :not_found} -> error("No active rollout for deploy '#{deploy_id}'") + {:error, reason} -> error("Promote failed: #{inspect(reason)}") + end + end + + defp do_rollback(args, opts) do + deploy_id = List.first(args) + + unless deploy_id do + error("Usage: nopea rollback ") + end + + case Nopea.Surface.rollback(deploy_id) do + {:ok, rollout} -> output(rollout, opts) + {:error, :not_found} -> error("No active rollout for deploy '#{deploy_id}'") + {:error, reason} -> error("Rollback failed: #{inspect(reason)}") + end + end + + defp memory(opts) do + result = Nopea.Surface.health() + output(result.memory, opts) end defp serve(_opts) do @@ -114,6 +176,10 @@ defmodule Nopea.CLI do end end + defp mcp do + Nopea.MCP.serve() + end + defp output(data, opts) do if Keyword.get(opts, :json, false) do IO.puts(Jason.encode!(data, pretty: true)) @@ -136,8 +202,14 @@ defmodule Nopea.CLI do nopea status nopea context [--json] nopea history [--json] + nopea explain [--json] + nopea promote [--json] + nopea rollback [--json] + nopea health [--json] + nopea services [--json] nopea memory [--json] nopea serve + nopea mcp """) end end diff --git a/lib/nopea/mcp.ex b/lib/nopea/mcp.ex index e4a2252..d8fd832 100644 --- a/lib/nopea/mcp.ex +++ b/lib/nopea/mcp.ex @@ -11,7 +11,9 @@ defmodule Nopea.MCP do - `nopea_deploy` — Deploy manifests to a namespace - `nopea_context` — Get memory context for a service - `nopea_history` — Get deployment history + - `nopea_health` — Check health of active service agents - `nopea_explain` — Explain why a strategy was selected + - `nopea_services` — List known services ## Protocol @@ -97,6 +99,37 @@ defmodule Nopea.MCP do }, "required" => ["service"] } + }, + %{ + "name" => "nopea_services", + "description" => "List all known services that have been deployed.", + "inputSchema" => %{ + "type" => "object", + "properties" => %{}, + "required" => [] + } + }, + %{ + "name" => "nopea_promote", + "description" => "Promote an active progressive rollout to completion.", + "inputSchema" => %{ + "type" => "object", + "properties" => %{ + "deploy_id" => %{"type" => "string", "description" => "Deploy ID of the active rollout"} + }, + "required" => ["deploy_id"] + } + }, + %{ + "name" => "nopea_rollback", + "description" => "Rollback an active progressive rollout.", + "inputSchema" => %{ + "type" => "object", + "properties" => %{ + "deploy_id" => %{"type" => "string", "description" => "Deploy ID of the active rollout"} + }, + "required" => ["deploy_id"] + } } ] @@ -176,33 +209,27 @@ defmodule Nopea.MCP do end end - # Tool implementations + # Tool implementations — delegate to Surface defp call_tool("nopea_context", args) do service = args["service"] namespace = args["namespace"] || "default" - - if Process.whereis(Nopea.Memory) do - context = Nopea.Memory.get_deploy_context(service, namespace) - {:ok, Jason.encode!(context, pretty: true)} - else - {:ok, Jason.encode!(%{known: false, message: "Memory not available"})} - end + context = Nopea.Surface.context(service, namespace) + {:ok, Jason.encode!(context, pretty: true)} end defp call_tool("nopea_history", args) do service = args["service"] - if Nopea.Cache.available?() do - case Nopea.Cache.get_service_state(service) do - {:ok, state} -> - {:ok, Jason.encode!(%{service: service, state: state}, pretty: true)} + case Nopea.Surface.history(service) do + {:ok, data} -> + {:ok, Jason.encode!(data, pretty: true)} - {:error, :not_found} -> - {:ok, Jason.encode!(%{service: service, deployments: [], message: "No history found"})} - end - else - {:ok, Jason.encode!(%{service: service, deployments: [], message: "Cache not available"})} + {:error, :not_found} -> + {:ok, Jason.encode!(%{service: service, deployments: [], message: "No history found"})} + + {:error, :unavailable} -> + {:ok, Jason.encode!(%{service: service, deployments: [], message: "Cache not available"})} end end @@ -236,16 +263,19 @@ defmodule Nopea.MCP do defp call_tool("nopea_health", args) do case args["service"] do nil -> - agents = Nopea.ServiceAgent.health() - {:ok, Jason.encode!(%{agents: agents, count: length(agents)}, pretty: true)} + health = Nopea.Surface.health() + {:ok, Jason.encode!(health, pretty: true)} service -> - case Nopea.ServiceAgent.status(service) do + case Nopea.Surface.status(service) do {:ok, status} -> {:ok, Jason.encode!(status, pretty: true)} {:error, :not_found} -> {:ok, Jason.encode!(%{service: service, message: "No active agent"}, pretty: true)} + + {:error, :unavailable} -> + {:ok, Jason.encode!(%{service: service, message: "No active agent"}, pretty: true)} end end end @@ -253,39 +283,48 @@ defmodule Nopea.MCP do defp call_tool("nopea_explain", args) do service = args["service"] namespace = args["namespace"] || "default" + {:ok, Nopea.Surface.explain(service, namespace)} + end - if Process.whereis(Nopea.Memory) do - context = Nopea.Memory.get_deploy_context(service, namespace) - {:ok, explain_strategy(service, namespace, context)} - else - {:ok, "Memory not available. Would use direct strategy by default."} - end + defp call_tool("nopea_services", _args) do + services = Nopea.Surface.services() + {:ok, Jason.encode!(%{services: services, count: length(services)}, pretty: true)} end - defp call_tool(name, _args) do - {:error, "Unknown tool: #{name}"} + defp call_tool("nopea_promote", args) do + deploy_id = args["deploy_id"] + + case Nopea.Surface.promote(deploy_id) do + {:ok, rollout} -> + {:ok, Jason.encode!(Map.from_struct(rollout), pretty: true)} + + {:error, :not_found} -> + {:error, "No active rollout for deploy '#{deploy_id}'"} + + {:error, reason} -> + {:error, "Promote failed: #{inspect(reason)}"} + end end - defp explain_strategy(service, namespace, context) do - cond do - not context.known -> - "No deployment history for #{service}/#{namespace}. Would use direct strategy (default for unknown services)." + defp call_tool("nopea_rollback", args) do + deploy_id = args["deploy_id"] - Enum.any?(context.failure_patterns, fn p -> p.confidence > 0.15 end) -> - patterns = - Enum.map_join(context.failure_patterns, ", ", fn p -> - "#{p.error} (confidence: #{Float.round(p.confidence, 2)})" - end) + case Nopea.Surface.rollback(deploy_id) do + {:ok, rollout} -> + {:ok, Jason.encode!(Map.from_struct(rollout), pretty: true)} - "Failure patterns detected for #{service}/#{namespace}: #{patterns}. " <> - "Use canary or blue_green strategy — Kulta will handle progressive delivery." + {:error, :not_found} -> + {:error, "No active rollout for deploy '#{deploy_id}'"} - true -> - "Would use direct strategy for #{service}/#{namespace}. " <> - "No significant failure patterns detected. Service is known and stable." + {:error, reason} -> + {:error, "Rollback failed: #{inspect(reason)}"} end end + defp call_tool(name, _args) do + {:error, "Unknown tool: #{name}"} + end + defp success_response(id, result) do %{"jsonrpc" => "2.0", "id" => id, "result" => result} end diff --git a/lib/nopea/surface.ex b/lib/nopea/surface.ex new file mode 100644 index 0000000..6d99b7f --- /dev/null +++ b/lib/nopea/surface.ex @@ -0,0 +1,123 @@ +defmodule Nopea.Surface do + @moduledoc """ + Shared domain logic backing CLI, MCP, and HTTP surfaces. + + Every user-facing interface delegates here. This guarantees + consistent behaviour and graceful degradation when optional + subsystems (Memory, Cache, Registry) are not running. + """ + + @spec status(String.t()) :: {:ok, map()} | {:error, :not_found | :unavailable} + def status(service) do + cond do + Process.whereis(Nopea.ServiceAgent.Supervisor) != nil -> + Nopea.ServiceAgent.status(service) + + Nopea.Cache.available?() -> + Nopea.Cache.get_service_state(service) + + true -> + {:error, :unavailable} + end + end + + @spec context(String.t(), String.t()) :: map() + def context(service, namespace \\ "default") do + if Process.whereis(Nopea.Memory) do + Nopea.Memory.get_deploy_context(service, namespace) + else + %{known: false, message: "Memory not available"} + end + end + + @spec history(String.t()) :: {:ok, map()} | {:error, :not_found | :unavailable} + def history(service) do + if Nopea.Cache.available?() do + case Nopea.Cache.get_service_state(service) do + {:ok, state} -> {:ok, %{service: service, state: state}} + {:error, :not_found} -> {:error, :not_found} + end + else + {:error, :unavailable} + end + end + + @spec explain(String.t(), String.t()) :: String.t() + def explain(service, namespace \\ "default") do + if Process.whereis(Nopea.Memory) do + context = Nopea.Memory.get_deploy_context(service, namespace) + explain_strategy(service, namespace, context) + else + "Memory not available. Would use direct strategy by default." + end + end + + @spec health() :: map() + def health do + agents = + if Process.whereis(Nopea.ServiceAgent.Supervisor) != nil do + Nopea.ServiceAgent.health() + else + [] + end + + memory = + if Process.whereis(Nopea.Memory) do + %{ + nodes: Nopea.Memory.node_count(), + relationships: Nopea.Memory.relationship_count() + } + else + %{status: :not_running} + end + + %{agents: agents, agent_count: length(agents), memory: memory} + end + + @spec services() :: [String.t()] + def services do + if Nopea.Cache.available?() do + Nopea.Cache.list_services() + else + [] + end + end + + @spec promote(String.t()) :: {:ok, Nopea.Progressive.Rollout.t()} | {:error, term()} + def promote(deploy_id) do + Nopea.Progressive.Monitor.promote(deploy_id) + end + + @spec rollback(String.t()) :: {:ok, Nopea.Progressive.Rollout.t()} | {:error, term()} + def rollback(deploy_id) do + Nopea.Progressive.Monitor.rollback(deploy_id) + end + + @spec rollout_status(String.t()) :: {:ok, Nopea.Progressive.Rollout.t()} | {:error, :not_found} + def rollout_status(deploy_id) do + Nopea.Progressive.Monitor.status(deploy_id) + end + + # Private + + defp explain_strategy(service, namespace, context) do + cond do + not context.known -> + "No deployment history for #{service}/#{namespace}. " <> + "Would use direct strategy (default for unknown services)." + + Enum.any?(context.failure_patterns, fn p -> p.confidence > 0.15 end) -> + patterns = + Enum.map_join(context.failure_patterns, ", ", fn p -> + "#{p.error} (confidence: #{Float.round(p.confidence, 2)})" + end) + + "Failure patterns detected for #{service}/#{namespace}: #{patterns}. " <> + "Use canary or blue_green strategy — Kulta will handle progressive delivery." + + true -> + "Would use direct strategy for #{service}/#{namespace}. " <> + "No significant failure patterns detected. Service is known and stable." + end + end +end diff --git a/test/nopea/mcp_test.exs b/test/nopea/mcp_test.exs index 02fb8ce..62e8f7c 100644 --- a/test/nopea/mcp_test.exs +++ b/test/nopea/mcp_test.exs @@ -37,12 +37,15 @@ defmodule Nopea.MCPTest do assert is_list(tools) tool_names = Enum.map(tools, & &1["name"]) - assert length(tool_names) == 5 + assert length(tool_names) == 8 assert "nopea_deploy" in tool_names assert "nopea_context" in tool_names assert "nopea_history" in tool_names assert "nopea_health" in tool_names assert "nopea_explain" in tool_names + assert "nopea_services" in tool_names + assert "nopea_promote" in tool_names + assert "nopea_rollback" in tool_names end end diff --git a/test/nopea/surface_test.exs b/test/nopea/surface_test.exs new file mode 100644 index 0000000..a8ebf0a --- /dev/null +++ b/test/nopea/surface_test.exs @@ -0,0 +1,85 @@ +defmodule Nopea.SurfaceTest do + use ExUnit.Case + + import Mox + + alias Nopea.Surface + + setup :verify_on_exit! + + setup do + Mox.stub_with(Nopea.K8sMock, Nopea.K8s) + + Mox.stub(Nopea.K8sMock, :get_resource, fn _, _, _, _ -> + {:error, :not_found} + end) + + start_supervised!({Nopea.Memory, []}) + start_supervised!(Nopea.Cache) + :ok + end + + describe "context/2" do + test "returns context from Memory" do + ctx = Surface.context("test-svc", "default") + assert is_map(ctx) + assert Map.has_key?(ctx, :known) + end + end + + describe "explain/2" do + test "returns explanation string for unknown service" do + result = Surface.explain("unknown-svc", "default") + assert result =~ "No deployment history" + assert result =~ "direct strategy" + end + end + + describe "health/0" do + test "returns health with memory stats" do + result = Surface.health() + assert is_list(result.agents) + assert result.agent_count == 0 + assert is_integer(result.memory.nodes) + assert is_integer(result.memory.relationships) + end + end + + describe "services/0" do + test "returns empty list when no services deployed" do + assert Surface.services() == [] + end + + test "returns services after cache population" do + Nopea.Cache.put_service_state("svc-a", %{status: :completed}) + Nopea.Cache.put_service_state("svc-b", %{status: :completed}) + + services = Surface.services() + assert "svc-a" in services + assert "svc-b" in services + end + end + + describe "status/1" do + test "returns error when service not found" do + assert {:error, _} = Surface.status("nonexistent") + end + + test "returns status from cache" do + Nopea.Cache.put_service_state("cached-svc", %{status: :completed, last_deploy: "abc"}) + assert {:ok, state} = Surface.status("cached-svc") + assert state.status == :completed + end + end + + describe "history/1" do + test "returns error for unknown service" do + assert {:error, :not_found} = Surface.history("unknown") + end + + test "returns history from cache" do + Nopea.Cache.put_service_state("hist-svc", %{status: :completed}) + assert {:ok, %{service: "hist-svc", state: _}} = Surface.history("hist-svc") + end + end +end From 538f17eab0aa420bd476a459efb67f3eeac7866f Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Sun, 8 Mar 2026 02:26:09 +0100 Subject: [PATCH 3/5] feat: persist memory graph to disk (.nopea/graph.etf) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Graph survives restarts — persisted as version-prefixed ETF binary. Restore chain: ETS snapshot → disk → fresh graph. - Add workdir to Memory GenServer state - Persist after record_deploy, decay, and terminate/2 - Version byte prefix (<<1>>) for future format migrations - :erlang.binary_to_term with [:safe] prevents code execution - Update existing tests to use tmp_dir for isolation Co-Authored-By: Claude Opus 4.6 --- lib/nopea/memory.ex | 53 +++++++++++++- test/nopea/application_test.exs | 10 +-- test/nopea/memory_persistence_test.exs | 96 ++++++++++++++++++++++++++ test/nopea/memory_test.exs | 8 ++- 4 files changed, 157 insertions(+), 10 deletions(-) create mode 100644 test/nopea/memory_persistence_test.exs diff --git a/lib/nopea/memory.ex b/lib/nopea/memory.ex index 91eedc4..d04a44d 100644 --- a/lib/nopea/memory.ex +++ b/lib/nopea/memory.ex @@ -9,6 +9,11 @@ defmodule Nopea.Memory do The graph enables context-aware deployments: "auth-service deploys fail when redis is also updating" (0.85 confidence, seen 4 times). + + ## Persistence + + Graph is persisted to `.nopea/graph.etf` alongside ETS snapshots. + On startup, restore order: ETS snapshot → disk → fresh graph. """ use GenServer @@ -16,10 +21,11 @@ defmodule Nopea.Memory do alias Nopea.Graph.Graph - defstruct [:graph, :decay_timer] + defstruct [:graph, :decay_timer, :workdir] @decay_interval_ms :timer.hours(1) @decay_factor 0.98 + @graph_version <<1>> # Client API @@ -56,7 +62,8 @@ defmodule Nopea.Memory do @impl true def init(opts) do - graph = restore_snapshot(opts) || Graph.new() + workdir = Keyword.get(opts, :workdir, File.cwd!()) + graph = restore_snapshot(opts) || restore_from_disk(workdir) || Graph.new() timer = schedule_decay() Logger.info("Memory started", @@ -64,7 +71,7 @@ defmodule Nopea.Memory do relationship_count: Graph.relationship_count(graph) ) - {:ok, %__MODULE__{graph: graph, decay_timer: timer}} + {:ok, %__MODULE__{graph: graph, decay_timer: timer, workdir: workdir}} end @impl true @@ -90,6 +97,7 @@ defmodule Nopea.Memory do try do graph = Nopea.Memory.Ingestor.ingest(state.graph, deploy_result) snapshot_graph(graph) + persist_to_disk(graph, state.workdir) {:noreply, %{state | graph: graph}} rescue error -> @@ -106,12 +114,19 @@ defmodule Nopea.Memory do def handle_info(:decay, state) do graph = Graph.decay_all(state.graph, @decay_factor) timer = schedule_decay() + persist_to_disk(graph, state.workdir) Logger.debug("Memory decay applied", node_count: Graph.node_count(graph)) {:noreply, %{state | graph: graph, decay_timer: timer}} end + @impl true + def terminate(_reason, state) do + persist_to_disk(state.graph, state.workdir) + :ok + end + # Private defp schedule_decay do @@ -144,4 +159,36 @@ defmodule Nopea.Memory do Nopea.Cache.put_graph_snapshot(binary) end end + + defp persist_to_disk(graph, workdir) do + path = graph_path(workdir) + + with :ok <- File.mkdir_p(Path.dirname(path)) do + binary = @graph_version <> :erlang.term_to_binary(graph) + File.write(path, binary) + end + rescue + error -> + Logger.warning("Failed to persist graph to disk", + error: inspect(error) + ) + end + + defp restore_from_disk(workdir) do + path = graph_path(workdir) + + case File.read(path) do + {:ok, <<1, rest::binary>>} -> + :erlang.binary_to_term(rest, [:safe]) + + _ -> + nil + end + rescue + _ -> nil + end + + defp graph_path(workdir) do + Path.join([workdir, ".nopea", "graph.etf"]) + end end diff --git a/test/nopea/application_test.exs b/test/nopea/application_test.exs index 1863bf1..10668f4 100644 --- a/test/nopea/application_test.exs +++ b/test/nopea/application_test.exs @@ -1,22 +1,24 @@ defmodule Nopea.ApplicationTest do use ExUnit.Case, async: false + @moduletag :tmp_dir + describe "supervision tree ordering" do - test "Cache is available before Memory starts and can restore snapshot" do + test "Cache is available before Memory starts and can restore snapshot", %{tmp_dir: tmp_dir} do # Start Cache first (as the fixed supervision tree does) cache_pid = start_supervised!(Nopea.Cache) assert Process.alive?(cache_pid) assert Nopea.Cache.available?() # Now start Memory — it calls restore_snapshot which needs Cache - memory_pid = start_supervised!({Nopea.Memory, []}) + memory_pid = start_supervised!({Nopea.Memory, workdir: tmp_dir}) assert Process.alive?(memory_pid) # Memory should be functional assert Nopea.Memory.node_count() == 0 end - test "Memory can restore snapshot from Cache when started after Cache" do + test "Memory can restore snapshot from Cache when started after Cache", %{tmp_dir: tmp_dir} do start_supervised!(Nopea.Cache) # Store a graph snapshot in cache @@ -29,7 +31,7 @@ defmodule Nopea.ApplicationTest do Nopea.Cache.put_graph_snapshot(binary) # Memory should restore this snapshot on init - start_supervised!({Nopea.Memory, []}) + start_supervised!({Nopea.Memory, workdir: tmp_dir}) assert Nopea.Memory.node_count() == 1 end end diff --git a/test/nopea/memory_persistence_test.exs b/test/nopea/memory_persistence_test.exs new file mode 100644 index 0000000..a89a1a0 --- /dev/null +++ b/test/nopea/memory_persistence_test.exs @@ -0,0 +1,96 @@ +defmodule Nopea.MemoryPersistenceTest do + use ExUnit.Case + + import Mox + + alias Nopea.Memory + alias Nopea.Test.Factory + + @moduletag :tmp_dir + + setup :verify_on_exit! + + setup do + Mox.stub_with(Nopea.K8sMock, Nopea.K8s) + + Mox.stub(Nopea.K8sMock, :get_resource, fn _, _, _, _ -> + {:error, :not_found} + end) + + :ok + end + + describe "disk persistence" do + test "persists graph to .nopea/graph.etf after record_deploy", %{tmp_dir: tmp_dir} do + pid = start_supervised!({Memory, workdir: tmp_dir}) + assert is_pid(pid) + + Memory.record_deploy(Factory.build_result(service: "persist-svc")) + _ = Memory.node_count() + + graph_path = Path.join([tmp_dir, ".nopea", "graph.etf"]) + assert File.exists?(graph_path) + + # Verify file format: version byte + ETF binary + {:ok, <<1, rest::binary>>} = File.read(graph_path) + graph = :erlang.binary_to_term(rest, [:safe]) + assert is_struct(graph) + end + + test "restores graph from disk on restart", %{tmp_dir: tmp_dir} do + # Start Memory, record a deploy, stop it + pid1 = start_supervised!({Memory, workdir: tmp_dir}, id: :mem1) + assert is_pid(pid1) + + Memory.record_deploy(Factory.build_result(service: "restore-svc", namespace: "prod")) + _ = Memory.node_count() + + # Get context before stop + ctx_before = Memory.get_deploy_context("restore-svc", "prod") + assert ctx_before.known == true + + stop_supervised!(:mem1) + + # Restart without ETS (no Cache) — should restore from disk + pid2 = start_supervised!({Memory, workdir: tmp_dir}, id: :mem2) + assert is_pid(pid2) + + ctx_after = Memory.get_deploy_context("restore-svc", "prod") + assert ctx_after.known == true + end + + test "starts with fresh graph when no disk file exists", %{tmp_dir: tmp_dir} do + pid = start_supervised!({Memory, workdir: tmp_dir}) + assert is_pid(pid) + + assert Memory.node_count() == 0 + assert Memory.relationship_count() == 0 + end + + test "handles corrupted disk file gracefully", %{tmp_dir: tmp_dir} do + graph_path = Path.join([tmp_dir, ".nopea", "graph.etf"]) + File.mkdir_p!(Path.dirname(graph_path)) + File.write!(graph_path, "corrupted data") + + pid = start_supervised!({Memory, workdir: tmp_dir}) + assert is_pid(pid) + + # Should start with fresh graph + assert Memory.node_count() == 0 + end + + test "persists after terminate callback", %{tmp_dir: tmp_dir} do + pid = start_supervised!({Memory, workdir: tmp_dir}, id: :mem_term) + assert is_pid(pid) + + Memory.record_deploy(Factory.build_result(service: "term-svc")) + _ = Memory.node_count() + + # Stop gracefully — terminate/2 should persist + stop_supervised!(:mem_term) + + graph_path = Path.join([tmp_dir, ".nopea", "graph.etf"]) + assert File.exists?(graph_path) + end + end +end diff --git a/test/nopea/memory_test.exs b/test/nopea/memory_test.exs index b04b2c8..ad76a6a 100644 --- a/test/nopea/memory_test.exs +++ b/test/nopea/memory_test.exs @@ -3,9 +3,11 @@ defmodule Nopea.MemoryTest do alias Nopea.Memory - setup do - # ULID is started by the application; just ensure Memory is fresh - pid = start_supervised!({Memory, []}) + @moduletag :tmp_dir + + setup %{tmp_dir: tmp_dir} do + # Use tmp_dir as workdir to avoid picking up stale graph files + pid = start_supervised!({Memory, workdir: tmp_dir}) %{pid: pid} end From 9ed2280f7a23416ae2c737109206e51af3fdd426 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Sun, 8 Mar 2026 02:26:24 +0100 Subject: [PATCH 4/5] =?UTF-8?q?feat:=20progressive=20delivery=20=E2=80=94?= =?UTF-8?q?=20canary/blue=5Fgreen=20return=20:progressing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Canary and blue_green deploys now honestly report :progressing instead of :completed. A per-rollout Monitor polls the Kulta Rollout CRD and supports manual promote/rollback from all surfaces. - Add :progressing status to Result, add Result.progressing/5 - Add Progressive.Rollout struct for rollout lifecycle tracking - Add patch_resource to K8s Behaviour for promote/rollback - Implement Progressive.Monitor GenServer (poll/promote/rollback) - Add Progressive.Supervisor (DynamicSupervisor) to OTP tree - Integrate progressive flow into Deploy pipeline - Add promote/rollback to Surface, CLI, MCP, and HTTP - MCP now exposes 8 tools (added nopea_promote, nopea_rollback) Co-Authored-By: Claude Opus 4.6 --- lib/nopea/application.ex | 7 + lib/nopea/deploy.ex | 58 ++++- lib/nopea/deploy/result.ex | 18 +- lib/nopea/k8s.ex | 21 ++ lib/nopea/k8s/behaviour.ex | 8 + lib/nopea/progressive/monitor.ex | 288 ++++++++++++++++++++++++ lib/nopea/progressive/rollout.ex | 50 ++++ lib/nopea/progressive/supervisor.ex | 28 +++ test/nopea/deploy_progressive_test.exs | 116 ++++++++++ test/nopea/deploy_test.exs | 4 +- test/nopea/progressive/monitor_test.exs | 151 +++++++++++++ 11 files changed, 743 insertions(+), 6 deletions(-) create mode 100644 lib/nopea/progressive/monitor.ex create mode 100644 lib/nopea/progressive/rollout.ex create mode 100644 lib/nopea/progressive/supervisor.ex create mode 100644 test/nopea/deploy_progressive_test.exs create mode 100644 test/nopea/progressive/monitor_test.exs diff --git a/lib/nopea/application.ex b/lib/nopea/application.ex index 29b8c8b..e9e7f34 100644 --- a/lib/nopea/application.ex +++ b/lib/nopea/application.ex @@ -29,6 +29,7 @@ defmodule Nopea.Application do |> add_cluster_child(cluster_enabled) |> add_registry_child(cluster_enabled) |> add_service_agent_child() + |> add_progressive_child() |> add_router_child() opts = [strategy: :one_for_one, name: Nopea.AppSupervisor] @@ -87,6 +88,12 @@ defmodule Nopea.Application do else: children end + defp add_progressive_child(children) do + if Application.get_env(:nopea, :enable_deploy_supervisor, true), + do: children ++ [Nopea.Progressive.Supervisor], + else: children + end + defp add_router_child(children) do if Application.get_env(:nopea, :enable_router, false), do: children ++ [Nopea.API.Router], diff --git a/lib/nopea/deploy.ex b/lib/nopea/deploy.ex index 86ee106..3b29a32 100644 --- a/lib/nopea/deploy.ex +++ b/lib/nopea/deploy.ex @@ -56,6 +56,21 @@ defmodule Nopea.Deploy do # 4. Execute case execute_strategy(strategy, spec) do + {:ok, {applied, :progressing}} -> + duration_ms = duration_ms(start_time) + result = Result.progressing(deploy_id, spec, strategy, applied, duration_ms) + record_outcome(result, context) + start_progressive_monitor(deploy_id, spec, strategy) + + Logger.info("Deploy progressing", + service: spec.service, + deploy_id: deploy_id, + strategy: strategy, + duration_ms: duration_ms + ) + + result + {:ok, applied} -> duration_ms = duration_ms(start_time) @@ -100,6 +115,7 @@ defmodule Nopea.Deploy do case result.status do :completed -> {:ok, result} + :progressing -> {:ok, result} :failed -> {:error, result.error} end end @@ -143,9 +159,8 @@ defmodule Nopea.Deploy do defp execute_strategy(strategy, spec) when strategy in [:canary, :blue_green] do case Nopea.Kulta.RolloutBuilder.build(spec, strategy) do {:ok, rollout} -> - k8s_module().apply_manifest(rollout, spec.namespace) - |> case do - {:ok, applied} -> {:ok, [applied]} + case k8s_module().apply_manifest(rollout, spec.namespace) do + {:ok, applied} -> {:ok, {[applied], :progressing}} {:error, _} = error -> error end @@ -317,6 +332,24 @@ defmodule Nopea.Deploy do end end + defp emit_status_log(emitter, %{status: :progressing} = result) do + case FalseProtocol.LogEmitter.info_full( + emitter, + "deploy progressing via #{result.strategy}", + %FalseProtocol.Semantic{ + event: "deploy.apply.progressing", + what_happened: "#{result.service} progressive delivery started", + parameters: %{"strategy" => result.strategy, "duration_ms" => result.duration_ms} + } + ) do + {:ok, _entry} -> + :ok + + {:error, reason} -> + Logger.warning("Failed to emit deploy progressing log", reason: inspect(reason)) + end + end + defp emit_status_log(emitter, %{status: :rolledback} = result) do case FalseProtocol.LogEmitter.emit( emitter, @@ -406,6 +439,25 @@ defmodule Nopea.Deploy do end end + defp start_progressive_monitor(deploy_id, spec, strategy) do + if Process.whereis(Nopea.Progressive.Supervisor) do + case Nopea.Progressive.Supervisor.start_monitor(deploy_id, spec, strategy) do + {:ok, _pid} -> + Logger.info("Progressive monitor started", + deploy_id: deploy_id, + service: spec.service, + strategy: strategy + ) + + {:error, reason} -> + Logger.error("Failed to start progressive monitor", + deploy_id: deploy_id, + error: inspect(reason) + ) + end + end + end + defp emitter_running?, do: Process.whereis(Nopea.Events.Emitter) != nil defp get_concurrent_services(current_service) do diff --git a/lib/nopea/deploy/result.ex b/lib/nopea/deploy/result.ex index 98c6bf5..bcd739b 100644 --- a/lib/nopea/deploy/result.ex +++ b/lib/nopea/deploy/result.ex @@ -12,7 +12,7 @@ defmodule Nopea.Deploy.Result do deploy_id: String.t(), service: String.t(), namespace: String.t(), - status: :completed | :failed | :rolledback, + status: :completed | :failed | :rolledback | :progressing, strategy: strategy(), manifest_count: non_neg_integer(), duration_ms: non_neg_integer(), @@ -76,6 +76,22 @@ defmodule Nopea.Deploy.Result do } end + @spec progressing(String.t(), Nopea.Deploy.Spec.t(), strategy(), [map()], non_neg_integer()) :: + t() + def progressing(deploy_id, spec, strategy, applied, duration_ms) do + %__MODULE__{ + deploy_id: deploy_id, + service: spec.service, + namespace: spec.namespace, + status: :progressing, + strategy: strategy, + manifest_count: length(spec.manifests), + duration_ms: duration_ms, + applied_resources: applied, + timestamp: DateTime.utc_now() + } + end + @spec rolledback(String.t(), Nopea.Deploy.Spec.t(), strategy(), term(), non_neg_integer()) :: t() def rolledback(deploy_id, spec, strategy, error, duration_ms) do diff --git a/lib/nopea/k8s.ex b/lib/nopea/k8s.ex index 3e65e33..1617b23 100644 --- a/lib/nopea/k8s.ex +++ b/lib/nopea/k8s.ex @@ -66,4 +66,25 @@ defmodule Nopea.K8s do end end end + + @impl true + @spec patch_resource(String.t(), String.t(), String.t(), String.t(), map()) :: + {:ok, map()} | {:error, term()} + def patch_resource(api_version, kind, name, namespace, patch) do + with {:ok, conn} <- conn() do + resource = + Map.merge(patch, %{ + "apiVersion" => api_version, + "kind" => kind, + "metadata" => + Map.merge(patch["metadata"] || %{}, %{ + "name" => name, + "namespace" => namespace + }) + }) + + operation = K8s.Client.patch(resource) + K8s.Client.run(conn, operation) + end + end end diff --git a/lib/nopea/k8s/behaviour.ex b/lib/nopea/k8s/behaviour.ex index 9ab3bca..0b29eb9 100644 --- a/lib/nopea/k8s/behaviour.ex +++ b/lib/nopea/k8s/behaviour.ex @@ -26,4 +26,12 @@ defmodule Nopea.K8s.Behaviour do name :: String.t(), namespace :: String.t() ) :: :ok | {:error, term()} + + @callback patch_resource( + api_version :: String.t(), + kind :: String.t(), + name :: String.t(), + namespace :: String.t(), + patch :: map() + ) :: {:ok, map()} | {:error, term()} end diff --git a/lib/nopea/progressive/monitor.ex b/lib/nopea/progressive/monitor.ex new file mode 100644 index 0000000..7d19318 --- /dev/null +++ b/lib/nopea/progressive/monitor.ex @@ -0,0 +1,288 @@ +defmodule Nopea.Progressive.Monitor do + @moduledoc """ + Per-rollout GenServer that monitors a progressive delivery rollout. + + Polls the Kulta Rollout CRD status and updates the rollout phase. + Supports manual promote (advance/complete) and rollback (abort). + Self-terminates on completion, failure, or timeout. + """ + + use GenServer + require Logger + + alias Nopea.Progressive.Rollout + + @poll_interval_ms 10_000 + @max_duration_ms 3_600_000 + + @kulta_api_version "kulta.io/v1alpha1" + @kulta_kind "Rollout" + + defstruct [:rollout, :spec, :poll_timer, :deadline] + + # Client API + + @spec start_link({String.t(), Nopea.Deploy.Spec.t(), :canary | :blue_green}) :: + GenServer.on_start() + def start_link({deploy_id, spec, strategy}) do + GenServer.start_link(__MODULE__, {deploy_id, spec, strategy}, name: via(deploy_id)) + end + + @spec promote(String.t()) :: {:ok, Rollout.t()} | {:error, term()} + def promote(deploy_id) do + GenServer.call(via(deploy_id), :promote) + catch + :exit, _ -> {:error, :not_found} + end + + @spec rollback(String.t()) :: {:ok, Rollout.t()} | {:error, term()} + def rollback(deploy_id) do + GenServer.call(via(deploy_id), :rollback) + catch + :exit, _ -> {:error, :not_found} + end + + @spec status(String.t()) :: {:ok, Rollout.t()} | {:error, :not_found} + def status(deploy_id) do + GenServer.call(via(deploy_id), :status) + catch + :exit, _ -> {:error, :not_found} + end + + @doc false + @spec whereis(String.t()) :: pid() | nil + def whereis(deploy_id) do + case Registry.lookup(Nopea.Registry, {:rollout, deploy_id}) do + [{pid, _}] -> pid + [] -> nil + end + end + + @spec list_active() :: [Rollout.t()] + def list_active do + if Process.whereis(Nopea.Registry) do + Registry.select(Nopea.Registry, [ + {{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]} + ]) + |> Enum.flat_map(fn + {{:rollout, _deploy_id}, pid} -> + try do + case GenServer.call(pid, :status, 5_000) do + {:ok, rollout} -> [rollout] + _ -> [] + end + catch + :exit, _ -> [] + end + + _ -> + [] + end) + else + [] + end + end + + # Server + + @impl true + def init({deploy_id, spec, strategy}) do + rollout = Rollout.new(deploy_id, spec.service, spec.namespace, strategy) + deadline = System.monotonic_time(:millisecond) + @max_duration_ms + + Logger.info("Progressive monitor started", + deploy_id: deploy_id, + service: spec.service, + strategy: strategy + ) + + timer = schedule_poll() + + {:ok, + %__MODULE__{ + rollout: rollout, + spec: spec, + poll_timer: timer, + deadline: deadline + }} + end + + @impl true + def handle_call(:status, _from, state) do + {:reply, {:ok, state.rollout}, state} + end + + def handle_call(:promote, _from, state) do + case do_promote(state) do + {:ok, rollout} -> + state = %{state | rollout: rollout} + + if terminal?(rollout.phase) do + {:stop, :normal, {:ok, rollout}, state} + else + {:reply, {:ok, rollout}, state} + end + + {:error, _} = error -> + {:reply, error, state} + end + end + + def handle_call(:rollback, _from, state) do + case do_rollback(state) do + {:ok, rollout} -> + state = %{state | rollout: rollout} + {:stop, :normal, {:ok, rollout}, state} + + {:error, _} = error -> + {:reply, error, state} + end + end + + @impl true + def handle_info(:poll, state) do + now_ms = System.monotonic_time(:millisecond) + + if now_ms >= state.deadline do + Logger.warning("Progressive rollout timed out", + deploy_id: state.rollout.deploy_id, + service: state.rollout.service + ) + + rollout = update_phase(state.rollout, :failed) + record_outcome(rollout) + {:stop, :normal, %{state | rollout: rollout}} + else + state = poll_rollout_status(state) + + if terminal?(state.rollout.phase) do + record_outcome(state.rollout) + {:stop, :normal, state} + else + timer = schedule_poll() + {:noreply, %{state | poll_timer: timer}} + end + end + end + + # Private + + defp via(deploy_id) do + {:via, Registry, {Nopea.Registry, {:rollout, deploy_id}}} + end + + defp schedule_poll do + Process.send_after(self(), :poll, @poll_interval_ms) + end + + defp poll_rollout_status(state) do + rollout = state.rollout + + case k8s_module().get_resource( + @kulta_api_version, + @kulta_kind, + rollout.service, + rollout.namespace + ) do + {:ok, resource} -> + new_rollout = parse_rollout_status(rollout, resource) + %{state | rollout: new_rollout} + + {:error, reason} -> + Logger.warning("Failed to poll rollout status", + deploy_id: rollout.deploy_id, + error: inspect(reason) + ) + + state + end + end + + @phase_map %{ + "healthy" => :completed, + "completed" => :completed, + "degraded" => :degraded, + "paused" => :paused, + "failed" => :failed + } + + defp parse_rollout_status(rollout, resource) do + status = get_in(resource, ["status"]) || %{} + phase_str = status["phase"] || "Progressing" + phase = Map.get(@phase_map, String.downcase(phase_str), :progressing) + + %{ + rollout + | phase: phase, + current_step: status["currentStepIndex"] || rollout.current_step, + total_steps: status["totalSteps"] || rollout.total_steps, + updated_at: DateTime.utc_now() + } + end + + defp do_promote(state) do + rollout = state.rollout + + patch = %{ + "metadata" => %{ + "annotations" => %{"kulta.io/promote" => "true"} + } + } + + case k8s_module().patch_resource( + @kulta_api_version, + @kulta_kind, + rollout.service, + rollout.namespace, + patch + ) do + {:ok, _} -> + {:ok, update_phase(rollout, :promoted)} + + {:error, _} = error -> + error + end + end + + defp do_rollback(state) do + rollout = state.rollout + + case k8s_module().delete_resource( + @kulta_api_version, + @kulta_kind, + rollout.service, + rollout.namespace + ) do + :ok -> + {:ok, update_phase(rollout, :failed)} + + {:error, _} = error -> + error + end + end + + defp update_phase(rollout, phase) do + %{rollout | phase: phase, updated_at: DateTime.utc_now()} + end + + defp terminal?(phase), do: phase in [:completed, :promoted, :failed] + + defp record_outcome(rollout) do + if Process.whereis(Nopea.Memory) do + status = if rollout.phase in [:completed, :promoted], do: :completed, else: :failed + + Nopea.Memory.record_deploy(%{ + service: rollout.service, + namespace: rollout.namespace, + status: status, + error: if(status == :failed, do: :rollout_failed), + duration_ms: DateTime.diff(rollout.updated_at, rollout.started_at, :millisecond), + concurrent_deploys: [] + }) + end + end + + defp k8s_module do + Application.get_env(:nopea, :k8s_module, Nopea.K8s) + end +end diff --git a/lib/nopea/progressive/rollout.ex b/lib/nopea/progressive/rollout.ex new file mode 100644 index 0000000..5513870 --- /dev/null +++ b/lib/nopea/progressive/rollout.ex @@ -0,0 +1,50 @@ +defmodule Nopea.Progressive.Rollout do + @moduledoc """ + Tracks the state of an active progressive delivery rollout. + + Created when a canary or blue_green deploy starts, updated + as the Monitor polls the Kulta Rollout CRD status. + """ + + @type phase :: :progressing | :promoted | :degraded | :paused | :failed | :completed + + @type t :: %__MODULE__{ + deploy_id: String.t(), + service: String.t(), + namespace: String.t(), + strategy: :canary | :blue_green, + phase: phase(), + current_step: non_neg_integer(), + total_steps: non_neg_integer(), + started_at: DateTime.t(), + updated_at: DateTime.t() + } + + @enforce_keys [:deploy_id, :service, :namespace, :strategy, :phase] + defstruct [ + :deploy_id, + :service, + :namespace, + :strategy, + phase: :progressing, + current_step: 0, + total_steps: 0, + started_at: nil, + updated_at: nil + ] + + @spec new(String.t(), String.t(), String.t(), :canary | :blue_green) :: t() + def new(deploy_id, service, namespace, strategy) do + now = DateTime.utc_now() + + %__MODULE__{ + deploy_id: deploy_id, + service: service, + namespace: namespace, + strategy: strategy, + phase: :progressing, + started_at: now, + updated_at: now + } + end +end diff --git a/lib/nopea/progressive/supervisor.ex b/lib/nopea/progressive/supervisor.ex new file mode 100644 index 0000000..3726c73 --- /dev/null +++ b/lib/nopea/progressive/supervisor.ex @@ -0,0 +1,28 @@ +defmodule Nopea.Progressive.Supervisor do + @moduledoc """ + DynamicSupervisor for Progressive.Monitor processes. + + One Monitor per active rollout, started when a canary or + blue_green deploy returns :progressing. + """ + + use DynamicSupervisor + + def start_link(opts \\ []) do + DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(_opts) do + DynamicSupervisor.init(strategy: :one_for_one) + end + + @spec start_monitor(String.t(), Nopea.Deploy.Spec.t(), :canary | :blue_green) :: + {:ok, pid()} | {:error, term()} + def start_monitor(deploy_id, spec, strategy) do + DynamicSupervisor.start_child( + __MODULE__, + {Nopea.Progressive.Monitor, {deploy_id, spec, strategy}} + ) + end +end diff --git a/test/nopea/deploy_progressive_test.exs b/test/nopea/deploy_progressive_test.exs new file mode 100644 index 0000000..1286b4b --- /dev/null +++ b/test/nopea/deploy_progressive_test.exs @@ -0,0 +1,116 @@ +defmodule Nopea.DeployProgressiveTest do + use ExUnit.Case + + import Mox + + alias Nopea.Deploy + alias Nopea.Deploy.Spec + alias Nopea.Test.Factory + + @moduletag :tmp_dir + + setup :set_mox_global + setup :verify_on_exit! + + setup %{tmp_dir: tmp_dir} do + Mox.stub_with(Nopea.K8sMock, Nopea.K8s) + + Mox.stub(Nopea.K8sMock, :get_resource, fn _, _, _, _ -> + {:error, :not_found} + end) + + Mox.stub(Nopea.K8sMock, :patch_resource, fn _, _, _, _, _ -> + {:ok, %{}} + end) + + start_supervised!({Nopea.Memory, workdir: tmp_dir}) + start_supervised!(Nopea.Cache) + start_supervised!({Registry, keys: :unique, name: Nopea.Registry}) + start_supervised!(Nopea.Progressive.Supervisor) + :ok + end + + describe "canary deploy returns :progressing" do + test "canary deploy returns progressing status and starts monitor" do + deployment = Factory.sample_deployment_manifest("canary-svc", "default") + + Mox.expect(Nopea.K8sMock, :apply_manifest, fn manifest, "default" -> + assert manifest["kind"] == "Rollout" + assert manifest["metadata"]["name"] == "canary-svc" + {:ok, manifest} + end) + + spec = %Spec{ + service: "canary-svc", + namespace: "default", + manifests: [deployment], + strategy: :canary + } + + result = Deploy.run(spec) + + assert result.status == :progressing + assert result.strategy == :canary + assert result.service == "canary-svc" + assert is_binary(result.deploy_id) + + # Monitor should be running + assert {:ok, rollout} = Nopea.Progressive.Monitor.status(result.deploy_id) + assert rollout.phase == :progressing + assert rollout.strategy == :canary + end + end + + describe "blue_green deploy returns :progressing" do + test "blue_green deploy returns progressing status" do + deployment = Factory.sample_deployment_manifest("bg-svc", "default") + + Mox.expect(Nopea.K8sMock, :apply_manifest, fn manifest, "default" -> + assert manifest["kind"] == "Rollout" + {:ok, manifest} + end) + + spec = %Spec{ + service: "bg-svc", + namespace: "default", + manifests: [deployment], + strategy: :blue_green + } + + result = Deploy.run(spec) + + assert result.status == :progressing + assert result.strategy == :blue_green + end + end + + describe "direct deploy still returns :completed" do + test "direct deploy is unaffected" do + spec = %Spec{ + service: "direct-svc", + namespace: "default", + manifests: [], + strategy: :direct + } + + result = Deploy.run(spec) + assert result.status == :completed + assert result.strategy == :direct + end + end + + describe "canary deploy fails without Deployment manifest" do + test "returns :failed with :no_deployment_found" do + spec = %Spec{ + service: "no-deploy-svc", + namespace: "default", + manifests: [Factory.sample_configmap_manifest("cfg", "default")], + strategy: :canary + } + + result = Deploy.run(spec) + assert result.status == :failed + assert result.error == :no_deployment_found + end + end +end diff --git a/test/nopea/deploy_test.exs b/test/nopea/deploy_test.exs index c7f9151..5ce7c59 100644 --- a/test/nopea/deploy_test.exs +++ b/test/nopea/deploy_test.exs @@ -249,7 +249,7 @@ defmodule Nopea.DeployTest do } result = Deploy.run(spec) - assert result.status == :completed + assert result.status == :progressing assert result.strategy == :canary end @@ -271,7 +271,7 @@ defmodule Nopea.DeployTest do } result = Deploy.run(spec) - assert result.status == :completed + assert result.status == :progressing assert result.strategy == :blue_green end diff --git a/test/nopea/progressive/monitor_test.exs b/test/nopea/progressive/monitor_test.exs new file mode 100644 index 0000000..32e4ccd --- /dev/null +++ b/test/nopea/progressive/monitor_test.exs @@ -0,0 +1,151 @@ +defmodule Nopea.Progressive.MonitorTest do + use ExUnit.Case + + import Mox + + alias Nopea.Progressive.Monitor + alias Nopea.Test.Factory + + setup :set_mox_global + setup :verify_on_exit! + + @moduletag :tmp_dir + + setup %{tmp_dir: tmp_dir} do + Mox.stub_with(Nopea.K8sMock, Nopea.K8s) + + Mox.stub(Nopea.K8sMock, :get_resource, fn _, _, _, _ -> + {:error, :not_found} + end) + + Mox.stub(Nopea.K8sMock, :patch_resource, fn _, _, _, _, _ -> + {:ok, %{}} + end) + + Mox.stub(Nopea.K8sMock, :delete_resource, fn _, _, _, _ -> + :ok + end) + + start_supervised!({Registry, keys: :unique, name: Nopea.Registry}) + start_supervised!({Nopea.Memory, workdir: tmp_dir}) + start_supervised!(Nopea.Cache) + :ok + end + + defp build_spec(service \\ "canary-svc") do + Factory.build_spec( + service: service, + namespace: "default", + strategy: :canary, + manifests: [Factory.sample_deployment_manifest(service)] + ) + end + + describe "start_link/1 and status/1" do + test "starts monitor and returns progressing status" do + spec = build_spec() + deploy_id = "test-deploy-001" + + {:ok, _pid} = + start_supervised!({Monitor, {deploy_id, spec, :canary}}, id: :mon1) + |> then(&{:ok, &1}) + + assert {:ok, rollout} = Monitor.status(deploy_id) + assert rollout.deploy_id == deploy_id + assert rollout.service == "canary-svc" + assert rollout.phase == :progressing + assert rollout.strategy == :canary + end + end + + describe "promote/1" do + test "patches rollout and updates phase to promoted" do + Mox.expect(Nopea.K8sMock, :patch_resource, fn + "kulta.io/v1alpha1", "Rollout", "canary-svc", "default", patch -> + assert patch["metadata"]["annotations"]["kulta.io/promote"] == "true" + {:ok, %{"status" => %{"phase" => "Healthy"}}} + end) + + spec = build_spec() + deploy_id = "test-promote-001" + start_supervised!({Monitor, {deploy_id, spec, :canary}}, id: :mon_promote) + + assert {:ok, rollout} = Monitor.promote(deploy_id) + assert rollout.phase == :promoted + end + + test "returns error when patch fails" do + Mox.expect(Nopea.K8sMock, :patch_resource, fn _, _, _, _, _ -> + {:error, :forbidden} + end) + + spec = build_spec() + deploy_id = "test-promote-err" + start_supervised!({Monitor, {deploy_id, spec, :canary}}, id: :mon_promote_err) + + assert {:error, :forbidden} = Monitor.promote(deploy_id) + end + end + + describe "rollback/1" do + test "deletes rollout and updates phase to failed" do + Mox.expect(Nopea.K8sMock, :delete_resource, fn + "kulta.io/v1alpha1", "Rollout", "canary-svc", "default" -> + :ok + end) + + spec = build_spec() + deploy_id = "test-rollback-001" + start_supervised!({Monitor, {deploy_id, spec, :canary}}, id: :mon_rollback) + + assert {:ok, rollout} = Monitor.rollback(deploy_id) + assert rollout.phase == :failed + end + end + + describe "status/1 for nonexistent monitor" do + test "returns not_found" do + assert {:error, :not_found} = Monitor.status("nonexistent-deploy") + end + end + + describe "list_active/0" do + test "returns all active rollouts" do + spec1 = build_spec("svc-a") + spec2 = build_spec("svc-b") + + start_supervised!({Monitor, {"deploy-a", spec1, :canary}}, id: :mon_a) + start_supervised!({Monitor, {"deploy-b", spec2, :blue_green}}, id: :mon_b) + + active = Monitor.list_active() + deploy_ids = Enum.map(active, & &1.deploy_id) + assert "deploy-a" in deploy_ids + assert "deploy-b" in deploy_ids + end + end + + describe "polling" do + test "updates rollout phase from polled status" do + Mox.expect(Nopea.K8sMock, :get_resource, fn + "kulta.io/v1alpha1", "Rollout", "poll-svc", "default" -> + {:ok, + %{"status" => %{"phase" => "Healthy", "currentStepIndex" => 3, "totalSteps" => 4}}} + end) + + spec = build_spec("poll-svc") + deploy_id = "test-poll-001" + start_supervised!({Monitor, {deploy_id, spec, :canary}}, id: :mon_poll) + + # Trigger poll manually + pid = Monitor.whereis(deploy_id) + send(pid, :poll) + + # Wait for the monitor to process poll and terminate + ref = Process.monitor(pid) + assert_receive {:DOWN, ^ref, :process, ^pid, :normal}, 5_000 + + # Monitor should have stopped since "Healthy" is terminal + assert {:error, :not_found} = Monitor.status(deploy_id) + end + end +end From 3d0ea1eb8d6beb06b124edbf9e676b24e21dea87 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Sun, 8 Mar 2026 16:58:42 +0100 Subject: [PATCH 5/5] =?UTF-8?q?fix:=20address=20PR=20review=20=E2=80=94=20?= =?UTF-8?q?remove=20stale=20cache=20writes,=20handle=20disk=20errors,=20sa?= =?UTF-8?q?nitize=20HTTP=20errors?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove record_outcome from :progressing branch to prevent Cache getting stuck at :progressing status - Add Cache update to Monitor.record_outcome so final rollout status is persisted - Chain File.write into with clause in persist_to_disk with proper error logging - Replace inspect(reason) in HTTP error responses with generic "Internal server error" and server-side Logger.error Co-Authored-By: Claude Opus 4.6 --- lib/nopea/api/router.ex | 24 ++++++++++++++++++------ lib/nopea/deploy.ex | 1 - lib/nopea/memory.ex | 14 ++++++++++---- lib/nopea/progressive/monitor.ex | 12 ++++++++++-- 4 files changed, 38 insertions(+), 13 deletions(-) diff --git a/lib/nopea/api/router.ex b/lib/nopea/api/router.ex index 2342432..220a5c9 100644 --- a/lib/nopea/api/router.ex +++ b/lib/nopea/api/router.ex @@ -74,17 +74,29 @@ defmodule Nopea.API.Router do post "/api/promote/:deploy_id" do case Nopea.Surface.promote(deploy_id) do - {:ok, rollout} -> json(conn, 200, Map.from_struct(rollout)) - {:error, :not_found} -> json(conn, 404, %{error: "not_found"}) - {:error, reason} -> json(conn, 500, %{error: inspect(reason)}) + {:ok, rollout} -> + json(conn, 200, Map.from_struct(rollout)) + + {:error, :not_found} -> + json(conn, 404, %{error: "not_found"}) + + {:error, reason} -> + Logger.error("Promote failed", deploy_id: deploy_id, error: inspect(reason)) + json(conn, 500, %{error: "Internal server error"}) end end post "/api/rollback/:deploy_id" do case Nopea.Surface.rollback(deploy_id) do - {:ok, rollout} -> json(conn, 200, Map.from_struct(rollout)) - {:error, :not_found} -> json(conn, 404, %{error: "not_found"}) - {:error, reason} -> json(conn, 500, %{error: inspect(reason)}) + {:ok, rollout} -> + json(conn, 200, Map.from_struct(rollout)) + + {:error, :not_found} -> + json(conn, 404, %{error: "not_found"}) + + {:error, reason} -> + Logger.error("Rollback failed", deploy_id: deploy_id, error: inspect(reason)) + json(conn, 500, %{error: "Internal server error"}) end end diff --git a/lib/nopea/deploy.ex b/lib/nopea/deploy.ex index 3b29a32..2710c86 100644 --- a/lib/nopea/deploy.ex +++ b/lib/nopea/deploy.ex @@ -59,7 +59,6 @@ defmodule Nopea.Deploy do {:ok, {applied, :progressing}} -> duration_ms = duration_ms(start_time) result = Result.progressing(deploy_id, spec, strategy, applied, duration_ms) - record_outcome(result, context) start_progressive_monitor(deploy_id, spec, strategy) Logger.info("Deploy progressing", diff --git a/lib/nopea/memory.ex b/lib/nopea/memory.ex index d04a44d..f2923ad 100644 --- a/lib/nopea/memory.ex +++ b/lib/nopea/memory.ex @@ -162,10 +162,16 @@ defmodule Nopea.Memory do defp persist_to_disk(graph, workdir) do path = graph_path(workdir) - - with :ok <- File.mkdir_p(Path.dirname(path)) do - binary = @graph_version <> :erlang.term_to_binary(graph) - File.write(path, binary) + binary = @graph_version <> :erlang.term_to_binary(graph) + + with :ok <- File.mkdir_p(Path.dirname(path)), + :ok <- File.write(path, binary) do + :ok + else + {:error, reason} -> + Logger.warning("Failed to persist graph to disk", + error: inspect(reason) + ) end rescue error -> diff --git a/lib/nopea/progressive/monitor.ex b/lib/nopea/progressive/monitor.ex index 7d19318..ec9a77d 100644 --- a/lib/nopea/progressive/monitor.ex +++ b/lib/nopea/progressive/monitor.ex @@ -268,9 +268,9 @@ defmodule Nopea.Progressive.Monitor do defp terminal?(phase), do: phase in [:completed, :promoted, :failed] defp record_outcome(rollout) do - if Process.whereis(Nopea.Memory) do - status = if rollout.phase in [:completed, :promoted], do: :completed, else: :failed + status = if rollout.phase in [:completed, :promoted], do: :completed, else: :failed + if Process.whereis(Nopea.Memory) do Nopea.Memory.record_deploy(%{ service: rollout.service, namespace: rollout.namespace, @@ -280,6 +280,14 @@ defmodule Nopea.Progressive.Monitor do concurrent_deploys: [] }) end + + if Nopea.Cache.available?() do + Nopea.Cache.put_service_state(rollout.service, %{ + status: status, + last_deploy: rollout.deploy_id, + last_deploy_at: DateTime.utc_now() + }) + end end defp k8s_module do