From 3ed4e990a8a6b446c5ceaad7600250d978540960 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Thu, 12 Mar 2026 01:09:33 +0100 Subject: [PATCH 01/12] docs: update CLAUDE.md for Surface, progressive delivery, and persistence Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 50 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 6b1e010..c44de50 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -19,7 +19,7 @@ Nopea is a deployment tool that builds a knowledge graph from every deployment. mix format && mix compile --warnings-as-errors && mix test # Individual commands -mix test # 280 tests, 0 failures +mix test # 305 tests, 0 failures mix test test/nopea/deploy_test.exs # Single file mix test test/nopea/deploy_test.exs:106 # Single test by line number mix test --exclude integration --exclude cluster # Skip slow tests @@ -35,19 +35,36 @@ Tests exclude `:integration` and `:cluster` tags by default (configured in `test ## DEPLOY PIPELINE ``` -CLI/MCP/API → Deploy.deploy(spec) +CLI/MCP/API → Surface.*() → Deploy.deploy(spec) → ServiceAgent.deploy() # queue/serialize per-service → Deploy.run(spec) # orchestration → Memory.get_deploy_context() # graph query → select_strategy() # direct/canary/blue_green (memory-aware) - → Strategy.Direct.execute() # K8s server-side apply - → Drift.verify_manifest() # post-deploy 3-way diff + → Strategy.Direct.execute() # K8s server-side apply (direct) + → Kulta.RolloutBuilder.build() # Rollout CRD (canary/blue_green) + → Drift.verify_manifest() # post-deploy 3-way diff (direct only) → Memory.record_deploy() # graph update (EWMA, async cast) → Occurrence.build() + persist() # FALSE Protocol ``` **Entry points**: `Deploy.deploy/1` routes through ServiceAgent if the supervisor is running; falls back to `Deploy.run/1` otherwise. Always use `deploy/1` — never call `run/1` directly from external callers. +**Progressive delivery**: Canary/blue_green strategies return `status: :progressing` and start a `Progressive.Monitor` GenServer. The Monitor polls the Kulta Rollout CRD and records the final outcome. Direct deploys return `:completed` or `:failed` immediately. + +--- + +## SURFACE — UNIFIED INTERFACE LAYER + +`Nopea.Surface` is the facade backing CLI, MCP, and HTTP. All user-facing interfaces delegate here. + +``` +CLI (cli.ex) ──→ Surface.*() ──→ Memory / Cache / ServiceAgent / Progressive.Monitor +MCP (mcp.ex) ──→ Surface.*() +HTTP (router.ex) → Surface.*() +``` + +Key design: Surface handles graceful degradation when optional subsystems aren't running (e.g., returns `{:error, :unavailable}` if Cache is down rather than crashing). + --- ## OTP SUPERVISION TREE @@ -62,6 +79,7 @@ Nopea.Application ├── Nopea.Cluster # libcluster (optional, cluster mode) ├── Nopea.Registry / DistributedRegistry # Process registry ├── Nopea.ServiceAgent.Supervisor # DynamicSupervisor for per-service agents +├── Nopea.Progressive.Supervisor # DynamicSupervisor for rollout monitors └── Nopea.API.Router # Plug/Cowboy HTTP (optional) ``` @@ -74,7 +92,7 @@ Most children are optional, controlled by `Application.get_env(:nopea, key)`: | `:enable_metrics` | `true` | TelemetryMetricsPrometheus | | `:enable_cache` | `true` | Nopea.Cache (ETS) | | `:enable_memory` | `true` | Nopea.Memory (knowledge graph) | -| `:enable_deploy_supervisor` | `true` | Registry + ServiceAgent.Supervisor | +| `:enable_deploy_supervisor` | `true` | Registry + ServiceAgent.Supervisor + Progressive.Supervisor | | `:enable_router` | `false` | Nopea.API.Router (HTTP) | | `:cluster_enabled` | `false` | Cluster + DistributedRegistry | | `:cdevents_endpoint` | `nil` | Events.Emitter (started only if set) | @@ -118,7 +136,7 @@ Per-service GenServer that queues and serializes deploys: ## MEMORY SYSTEM -Knowledge graph stored in `Nopea.Memory` GenServer state. +Knowledge graph stored in `Nopea.Memory` GenServer state, persisted to `.nopea/graph.etf`. **Graph nodes**: services, namespaces, errors (kinds: `:concept`, `:error`) **Graph relationships**: `:deployed_to`, `:breaks`, `:deployed_together` @@ -129,6 +147,12 @@ Key API: - `Memory.record_deploy(result)` → ingest into graph (**async cast**) - `Memory.node_count()` / `Memory.relationship_count()` → graph stats (**sync call**) +### Persistence + +Graph persists to `.nopea/graph.etf` as versioned binary (`<<1, rest::binary>>`). +Restore order on startup: ETS snapshot → disk → fresh `Graph.new()`. +Written on every `record_deploy`, hourly decay, and `terminate/2`. + --- ## K8S MOCK PATTERN @@ -224,9 +248,21 @@ Occurrences are structured events generated after every deployment. --- +## PROGRESSIVE DELIVERY + +Canary and blue_green strategies create Kulta Rollout CRDs and return `:progressing`. A `Progressive.Monitor` GenServer per rollout polls the CRD status. + +- **Phases**: `:progressing` → `:promoted` / `:completed` / `:degraded` / `:paused` / `:failed` +- **Terminal**: `:completed`, `:promoted`, `:failed` — Monitor stops and records outcome +- **Poll interval**: 10s, **Max duration**: 1 hour (timeout → `:failed`) +- **Manual control**: `Surface.promote(deploy_id)` / `Surface.rollback(deploy_id)` +- **Registry**: Monitors register as `{:rollout, deploy_id}` in `Nopea.Registry` + +--- + ## MCP SERVER -JSON-RPC 2.0 over stdin/stdout. Tools: `nopea_deploy`, `nopea_context`, `nopea_history`, `nopea_health`, `nopea_explain`. +JSON-RPC 2.0 over stdin/stdout. Tools: `nopea_deploy`, `nopea_context`, `nopea_history`, `nopea_health`, `nopea_explain`, `nopea_services`, `nopea_promote`, `nopea_rollback`. --- From 0d0961f327c9bf09189a7afd8b81935b6c0f592b Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Thu, 12 Mar 2026 23:52:06 +0100 Subject: [PATCH 02/12] fix: record outcome on manual promote/rollback, clarify pipeline diagram - Add record_outcome call to promote and rollback handle_call paths so Memory and Cache are updated on manual intervention - Update CLAUDE.md pipeline diagram to show branching between direct and progressive strategy paths Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 11 +++++++---- lib/nopea/progressive/monitor.ex | 2 ++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index c44de50..e127477 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -40,10 +40,13 @@ CLI/MCP/API → Surface.*() → Deploy.deploy(spec) → Deploy.run(spec) # orchestration → Memory.get_deploy_context() # graph query → select_strategy() # direct/canary/blue_green (memory-aware) - → Strategy.Direct.execute() # K8s server-side apply (direct) - → Kulta.RolloutBuilder.build() # Rollout CRD (canary/blue_green) - → Drift.verify_manifest() # post-deploy 3-way diff (direct only) - → Memory.record_deploy() # graph update (EWMA, async cast) + ├─ :direct → + │ → Strategy.Direct.execute() # K8s server-side apply + │ → Drift.verify_manifest() # post-deploy 3-way diff + │ → Memory.record_deploy() # graph update (EWMA, async cast) + └─ :canary/:blue_green → + → Kulta.RolloutBuilder.build() # Rollout CRD + → Progressive.Monitor.start() # polls CRD → records outcome on terminal → Occurrence.build() + persist() # FALSE Protocol ``` diff --git a/lib/nopea/progressive/monitor.ex b/lib/nopea/progressive/monitor.ex index ec9a77d..a5961fb 100644 --- a/lib/nopea/progressive/monitor.ex +++ b/lib/nopea/progressive/monitor.ex @@ -118,6 +118,7 @@ defmodule Nopea.Progressive.Monitor do state = %{state | rollout: rollout} if terminal?(rollout.phase) do + record_outcome(rollout) {:stop, :normal, {:ok, rollout}, state} else {:reply, {:ok, rollout}, state} @@ -132,6 +133,7 @@ defmodule Nopea.Progressive.Monitor do case do_rollback(state) do {:ok, rollout} -> state = %{state | rollout: rollout} + record_outcome(rollout) {:stop, :normal, {:ok, rollout}, state} {:error, _} = error -> From 0e6bf08215066dd404322ff83338ff58688809ba Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Mon, 16 Mar 2026 01:56:46 +0100 Subject: [PATCH 03/12] fix: replace bare raise with error tuple in Occurrence.build/2 Change Occurrence.build/2 to return {:ok, occ} | {:error, term()} instead of raising on failure. Update deploy.ex caller and test files to handle the new tuple return. Co-Authored-By: Claude Opus 4.6 --- lib/nopea/deploy.ex | 39 ++++++++++++------------ lib/nopea/occurrence.ex | 21 +++++++------ test/nopea/occurrence_log_test.exs | 10 +++---- test/nopea/occurrence_test.exs | 48 +++++++++++++++--------------- 4 files changed, 61 insertions(+), 57 deletions(-) diff --git a/lib/nopea/deploy.ex b/lib/nopea/deploy.ex index 2710c86..5698cc5 100644 --- a/lib/nopea/deploy.ex +++ b/lib/nopea/deploy.ex @@ -231,32 +231,33 @@ defmodule Nopea.Deploy do nil end - occurrence = Nopea.Occurrence.build(occurrence_input, memory_context) - - # Start log emitter and emit key deploy events - occurrence = emit_deploy_logs(occurrence, result) - - # Persist to .nopea/ directory - workdir = File.cwd!() - - case Nopea.Occurrence.persist(occurrence, workdir) do - :ok -> - :ok + case Nopea.Occurrence.build(occurrence_input, memory_context) do + {:ok, occurrence} -> + # Start log emitter and emit key deploy events + occurrence = emit_deploy_logs(occurrence, result) + + # Persist to .nopea/ directory + workdir = File.cwd!() + + case Nopea.Occurrence.persist(occurrence, workdir) do + :ok -> + :ok + + {:error, reason} -> + Logger.error("Failed to persist occurrence", + service: result.service, + deploy_id: result.deploy_id, + error: inspect(reason) + ) + end {:error, reason} -> - Logger.error("Failed to persist occurrence", + Logger.error("Failed to generate occurrence", service: result.service, deploy_id: result.deploy_id, error: inspect(reason) ) end - rescue - error -> - Logger.error("Failed to generate occurrence: #{Exception.message(error)}", - service: result.service, - deploy_id: result.deploy_id, - error: Exception.format(:error, error, __STACKTRACE__) - ) end defp emit_deploy_logs(occurrence, result) do diff --git a/lib/nopea/occurrence.ex b/lib/nopea/occurrence.ex index 49a0239..5196d21 100644 --- a/lib/nopea/occurrence.ex +++ b/lib/nopea/occurrence.ex @@ -24,7 +24,7 @@ defmodule Nopea.Occurrence do Optional second argument provides memory context from the knowledge graph to enrich the reasoning block. """ - @spec build(map(), map() | nil) :: Occurrence.t() + @spec build(map(), map() | nil) :: {:ok, Occurrence.t()} | {:error, term()} def build(result, memory_context \\ nil) do {type_suffix, severity, outcome} = classify(result.status) @@ -33,16 +33,19 @@ defmodule Nopea.Occurrence do outcome: outcome ) do {:ok, occ} -> - occ - |> maybe_set_namespace(result) - |> maybe_add_entities(result) - |> Occurrence.with_data(build_deploy_data(result)) - |> Occurrence.with_history(build_history(result)) - |> maybe_add_error(result) - |> maybe_add_reasoning(result, memory_context) + enriched = + occ + |> maybe_set_namespace(result) + |> maybe_add_entities(result) + |> Occurrence.with_data(build_deploy_data(result)) + |> Occurrence.with_history(build_history(result)) + |> maybe_add_error(result) + |> maybe_add_reasoning(result, memory_context) + + {:ok, enriched} {:error, reason} -> - raise "FalseProtocol.Occurrence.new failed: #{inspect(reason)}" + {:error, {:occurrence_new_failed, reason}} end end diff --git a/test/nopea/occurrence_log_test.exs b/test/nopea/occurrence_log_test.exs index 06c35b8..c9c9092 100644 --- a/test/nopea/occurrence_log_test.exs +++ b/test/nopea/occurrence_log_test.exs @@ -24,13 +24,13 @@ defmodule Nopea.OccurrenceLogTest do describe "start_log_emitter/1" do test "starts a log emitter for the occurrence" do - occ = NopeaOccurrence.build(@result) + {:ok, occ} = NopeaOccurrence.build(@result) emitter = start_emitter(occ) assert is_pid(emitter) end test "emitter uses :both mode" do - occ = NopeaOccurrence.build(@result) + {:ok, occ} = NopeaOccurrence.build(@result) emitter = start_emitter(occ) semantic = %FalseProtocol.Semantic{ @@ -45,7 +45,7 @@ defmodule Nopea.OccurrenceLogTest do end test "emitter sequences entries correctly" do - occ = NopeaOccurrence.build(@result) + {:ok, occ} = NopeaOccurrence.build(@result) emitter = start_emitter(occ) semantic = %FalseProtocol.Semantic{ @@ -67,7 +67,7 @@ defmodule Nopea.OccurrenceLogTest do end test "entries reference the parent occurrence" do - occ = NopeaOccurrence.build(@result) + {:ok, occ} = NopeaOccurrence.build(@result) emitter = start_emitter(occ) semantic = %FalseProtocol.Semantic{ @@ -82,7 +82,7 @@ defmodule Nopea.OccurrenceLogTest do describe "attach_log_ref/2" do test "attaches log_ref with entry count" do - occ = NopeaOccurrence.build(@result) + {:ok, occ} = NopeaOccurrence.build(@result) emitter = start_emitter(occ) semantic = %FalseProtocol.Semantic{ diff --git a/test/nopea/occurrence_test.exs b/test/nopea/occurrence_test.exs index cdec8db..50f4f1a 100644 --- a/test/nopea/occurrence_test.exs +++ b/test/nopea/occurrence_test.exs @@ -39,7 +39,7 @@ defmodule Nopea.OccurrenceTest do describe "build/1 for successful deploys" do test "produces valid FALSE Protocol occurrence struct" do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) assert %FalseProtocol.Occurrence{} = occ assert occ.protocol_version == "1.0" @@ -52,17 +52,17 @@ defmodule Nopea.OccurrenceTest do end test "has no error block on success" do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) assert occ.error == nil end test "has no reasoning block on success" do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) assert occ.reasoning == nil end test "has history block with steps" do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) assert %FalseProtocol.History{} = occ.history assert is_list(occ.history.steps) @@ -75,14 +75,14 @@ defmodule Nopea.OccurrenceTest do end test "includes verification step when verified" do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) actions = Enum.map(occ.history.steps, & &1.action) assert "verify" in actions end test "has deploy_data in data field" do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) assert data = occ.data assert data["service"] == "auth-service" @@ -93,12 +93,12 @@ defmodule Nopea.OccurrenceTest do end test "sets namespace in context" do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) assert occ.context.namespace == "production" end test "builds entities from applied_resources" do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) assert [entity] = occ.context.entities assert entity.type == "Deployment" @@ -112,7 +112,7 @@ defmodule Nopea.OccurrenceTest do describe "build/1 for failed deploys" do test "produces failed occurrence" do - occ = Nopea.Occurrence.build(@failed_result) + {:ok, occ} = Nopea.Occurrence.build(@failed_result) assert occ.type == "deploy.run.failed" assert occ.severity == :error @@ -120,7 +120,7 @@ defmodule Nopea.OccurrenceTest do end test "has error struct with structured details" do - occ = Nopea.Occurrence.build(@failed_result) + {:ok, occ} = Nopea.Occurrence.build(@failed_result) assert %FalseProtocol.Error{} = occ.error assert occ.error.code == "timeout" @@ -129,14 +129,14 @@ defmodule Nopea.OccurrenceTest do end test "error has why_it_matters" do - occ = Nopea.Occurrence.build(@failed_result) + {:ok, occ} = Nopea.Occurrence.build(@failed_result) assert is_binary(occ.error.why_it_matters) assert String.contains?(occ.error.why_it_matters, "production") end test "has reasoning block with low confidence without memory" do - occ = Nopea.Occurrence.build(@failed_result) + {:ok, occ} = Nopea.Occurrence.build(@failed_result) assert %FalseProtocol.Reasoning{} = occ.reasoning assert occ.reasoning.confidence == 0.3 @@ -144,7 +144,7 @@ defmodule Nopea.OccurrenceTest do end test "history steps have action and timestamp" do - occ = Nopea.Occurrence.build(@failed_result) + {:ok, occ} = Nopea.Occurrence.build(@failed_result) [step] = occ.history.steps assert step.action == "apply" @@ -173,7 +173,7 @@ defmodule Nopea.OccurrenceTest do ] } - occ = Nopea.Occurrence.build(@failed_result, memory_context) + {:ok, occ} = Nopea.Occurrence.build(@failed_result, memory_context) assert %FalseProtocol.Reasoning{} = occ.reasoning assert is_binary(occ.reasoning.summary) @@ -194,7 +194,7 @@ defmodule Nopea.OccurrenceTest do recommendations: ["Consider canary deployment."] } - occ = Nopea.Occurrence.build(@failed_result, memory_context) + {:ok, occ} = Nopea.Occurrence.build(@failed_result, memory_context) assert occ.reasoning.explanation =~ "Consider canary deployment." end @@ -203,7 +203,7 @@ defmodule Nopea.OccurrenceTest do describe "build/2 with rolledback status" do test "produces rolledback type with failure outcome" do result = %{@failed_result | status: :rolledback} - occ = Nopea.Occurrence.build(result) + {:ok, occ} = Nopea.Occurrence.build(result) assert occ.type == "deploy.run.rolledback" assert occ.severity == :warning @@ -212,7 +212,7 @@ defmodule Nopea.OccurrenceTest do test "history includes rollback step" do result = %{@failed_result | status: :rolledback} - occ = Nopea.Occurrence.build(result) + {:ok, occ} = Nopea.Occurrence.build(result) actions = Enum.map(occ.history.steps, & &1.action) assert "apply" in actions @@ -221,7 +221,7 @@ defmodule Nopea.OccurrenceTest do test "rollback indicated in deploy data" do result = %{@failed_result | status: :rolledback} - occ = Nopea.Occurrence.build(result) + {:ok, occ} = Nopea.Occurrence.build(result) assert occ.data["service"] == "payment-svc" end @@ -242,7 +242,7 @@ defmodule Nopea.OccurrenceTest do ] } - occ = Nopea.Occurrence.build(result) + {:ok, occ} = Nopea.Occurrence.build(result) assert [entity] = occ.context.entities assert entity.type == "ConfigMap" @@ -268,7 +268,7 @@ defmodule Nopea.OccurrenceTest do ] } - occ = Nopea.Occurrence.build(result) + {:ok, occ} = Nopea.Occurrence.build(result) assert [entity] = occ.context.entities assert entity.name == "valid-svc" @@ -289,7 +289,7 @@ defmodule Nopea.OccurrenceTest do end test "writes occurrence.json to .nopea directory", %{workdir: workdir} do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) assert :ok = Nopea.Occurrence.persist(occ, workdir) json_path = Path.join([workdir, ".nopea", "occurrence.json"]) @@ -302,7 +302,7 @@ defmodule Nopea.OccurrenceTest do end test "writes ETF to occurrences/ directory", %{workdir: workdir} do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) :ok = Nopea.Occurrence.persist(occ, workdir) etf_dir = Path.join([workdir, ".nopea", "occurrences"]) @@ -314,12 +314,12 @@ defmodule Nopea.OccurrenceTest do end test "returns error when workdir is not writable", _context do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) assert {:error, _reason} = Nopea.Occurrence.persist(occ, "/nonexistent/path/that/fails") end test "ETF round-trips to same struct", %{workdir: workdir} do - occ = Nopea.Occurrence.build(@successful_result) + {:ok, occ} = Nopea.Occurrence.build(@successful_result) :ok = Nopea.Occurrence.persist(occ, workdir) etf_dir = Path.join([workdir, ".nopea", "occurrences"]) From 32956658817f6d075d21a70d6d0a5a694500194f Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Mon, 16 Mar 2026 02:07:21 +0100 Subject: [PATCH 04/12] feat: create SYKLI TargetBehaviour with callback specs Co-Authored-By: Claude Opus 4.6 --- lib/nopea/sykli/target.ex | 2 ++ lib/nopea/sykli/target_behaviour.ex | 33 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 lib/nopea/sykli/target_behaviour.ex diff --git a/lib/nopea/sykli/target.ex b/lib/nopea/sykli/target.ex index bc7750d..c8828f0 100644 --- a/lib/nopea/sykli/target.ex +++ b/lib/nopea/sykli/target.ex @@ -21,6 +21,8 @@ defmodule Nopea.SYKLI.Target do strategy: canary """ + @behaviour Nopea.SYKLI.TargetBehaviour + require Logger defmodule State do diff --git a/lib/nopea/sykli/target_behaviour.ex b/lib/nopea/sykli/target_behaviour.ex new file mode 100644 index 0000000..8a812f1 --- /dev/null +++ b/lib/nopea/sykli/target_behaviour.ex @@ -0,0 +1,33 @@ +defmodule Nopea.SYKLI.TargetBehaviour do + @moduledoc """ + Behaviour defining the SYKLI target interface for Nopea deployments. + + Any module implementing this behaviour can serve as a SYKLI pipeline target, + providing deploy capabilities through the standard SYKLI task execution model. + """ + + @doc """ + Returns the target name identifier. + """ + @callback name() :: String.t() + + @doc """ + Checks whether this target is available and returns capability metadata. + """ + @callback available?() :: {:ok, map()} | {:error, term()} + + @doc """ + Initializes the target with the given options, returning target state. + """ + @callback setup(keyword()) :: {:ok, term()} | {:error, term()} + + @doc """ + Tears down the target, releasing any resources held in state. + """ + @callback teardown(term()) :: :ok + + @doc """ + Executes a task against this target with the given state and options. + """ + @callback run_task(map(), term(), keyword()) :: {:ok, term()} | {:error, term()} +end From 15a605f53b871f79da5cd6eca653a2fcc3d68031 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Mon, 16 Mar 2026 14:18:18 +0100 Subject: [PATCH 05/12] feat: add MCP lifecycle handling and fix CLI serve SIGTERM Implement proper shutdown/exit/cancelled notification handling in MCP server. Add System.trap_signal(:sigterm) in CLI serve command for graceful container shutdown. Co-Authored-By: Claude Opus 4.6 --- lib/nopea/cli.ex | 1 + lib/nopea/mcp.ex | 124 ++++++++++++++++++++++++++++++---------- test/nopea/mcp_test.exs | 94 ++++++++++++++++++++++++++++++ 3 files changed, 190 insertions(+), 29 deletions(-) diff --git a/lib/nopea/cli.ex b/lib/nopea/cli.ex index 2fa8e13..063c371 100644 --- a/lib/nopea/cli.ex +++ b/lib/nopea/cli.ex @@ -168,6 +168,7 @@ defmodule Nopea.CLI do {:ok, _apps} -> port = Application.get_env(:nopea, :api_port, 4000) Logger.info("Nopea API listening on port #{port}") + System.trap_signal(:sigterm, fn -> System.stop(0) end) Process.sleep(:infinity) {:error, reason} -> diff --git a/lib/nopea/mcp.ex b/lib/nopea/mcp.ex index d8fd832..5229a68 100644 --- a/lib/nopea/mcp.ex +++ b/lib/nopea/mcp.ex @@ -18,6 +18,13 @@ defmodule Nopea.MCP do ## Protocol JSON-RPC 2.0 over stdin/stdout, newline-delimited. + + ## Lifecycle + + Supports `shutdown` request and `exit` notification per JSON-RPC/MCP spec. + The `shutdown` request returns a null result and sets a shutdown flag. + The `exit` notification halts the process: exit code 0 if shutdown was + received, exit code 1 otherwise. """ require Logger @@ -133,46 +140,89 @@ defmodule Nopea.MCP do } ] + @typedoc "Server state tracking lifecycle flags" + @type state :: %{shutdown_received: boolean()} + + @doc "Returns initial MCP server state." + @spec initial_state() :: state() + def initial_state, do: %{shutdown_received: false} + # Public API - @spec handle_request(map()) :: {:ok, map()} - def handle_request(%{"method" => "initialize", "id" => id}) do - {:ok, - success_response(id, %{ + @doc """ + Handle a JSON-RPC request without state tracking. + + Returns `{:ok, response}` where response may be nil for notifications. + """ + @spec handle_request(map()) :: {:ok, map() | nil} + def handle_request(request) do + {response, _state} = handle_request(request, initial_state()) + {:ok, response} + end + + @doc """ + Handle a JSON-RPC request with state tracking. + + Returns `{response, new_state}` where response may be nil for notifications, + or `:halt` to signal the server should stop (exit notification). + """ + @spec handle_request(map(), state()) :: {map() | nil, state()} | :halt + def handle_request(%{"method" => "initialize", "id" => id}, state) do + {success_response(id, %{ "protocolVersion" => @protocol_version, "serverInfo" => %{"name" => "nopea", "version" => @version}, "capabilities" => %{ "tools" => %{"listChanged" => false} } - })} + }), state} + end + + def handle_request(%{"method" => "shutdown", "id" => id}, state) do + {success_response(id, nil), %{state | shutdown_received: true}} end - def handle_request(%{"method" => "tools/list", "id" => id}) do - {:ok, success_response(id, %{"tools" => @tools})} + def handle_request(%{"method" => "exit"}, %{shutdown_received: true}) do + :halt end - def handle_request(%{"method" => "tools/call", "id" => id, "params" => params}) do + def handle_request(%{"method" => "exit"}, %{shutdown_received: false}) do + :halt + end + + def handle_request(%{"method" => "tools/list", "id" => id}, state) do + {success_response(id, %{"tools" => @tools}), state} + end + + def handle_request(%{"method" => "tools/call", "id" => id, "params" => params}, state) do tool_name = params["name"] arguments = params["arguments"] || %{} case call_tool(tool_name, arguments) do {:ok, text} -> - {:ok, - success_response(id, %{ + {success_response(id, %{ "content" => [%{"type" => "text", "text" => text}] - })} + }), state} {:error, message} -> - {:ok, error_response(id, -32_602, message)} + {error_response(id, -32_602, message), state} end end - def handle_request(%{"method" => "notifications/initialized"}) do - {:ok, nil} + def handle_request(%{"method" => "notifications/initialized"}, state) do + {nil, state} + end + + def handle_request(%{"method" => "notifications/cancelled"} = request, state) do + Logger.info("MCP notification cancelled", + request_id: get_in(request, ["params", "id"]), + reason: get_in(request, ["params", "reason"]) + ) + + {nil, state} end - def handle_request(%{"id" => id}) do - {:ok, error_response(id, -32_601, "Method not found")} + def handle_request(%{"id" => id}, state) do + {error_response(id, -32_601, "Method not found"), state} end @spec encode(map()) :: binary() @@ -187,25 +237,41 @@ defmodule Nopea.MCP do @doc """ Run the MCP server loop reading from stdin, writing to stdout. + + Tracks shutdown state across requests using `Enum.reduce_while/3`. + On `exit` notification after `shutdown`, halts with exit code 0. + On `exit` without prior `shutdown`, halts with exit code 1. """ @spec serve() :: :ok def serve do - IO.stream(:stdio, :line) - |> Stream.each(&handle_line/1) - |> Stream.run() - end + _final_state = + IO.stream(:stdio, :line) + |> Enum.reduce_while(initial_state(), fn line, state -> + case decode(line) do + {:ok, request} -> + dispatch(request, state) + + {:error, _} -> + IO.write(encode(error_response(nil, -32_700, "Parse error"))) + {:cont, state} + end + end) - defp handle_line(line) do - case decode(line) do - {:ok, request} -> dispatch(request) - {:error, _} -> IO.write(encode(error_response(nil, -32_700, "Parse error"))) - end + :ok end - defp dispatch(request) do - case handle_request(request) do - {:ok, nil} -> :ok - {:ok, response} -> IO.write(encode(response)) + defp dispatch(request, state) do + case handle_request(request, state) do + :halt -> + exit_code = if state.shutdown_received, do: 0, else: 1 + System.halt(exit_code) + + {nil, new_state} -> + {:cont, new_state} + + {response, new_state} -> + IO.write(encode(response)) + {:cont, new_state} end end diff --git a/test/nopea/mcp_test.exs b/test/nopea/mcp_test.exs index 62e8f7c..e49b8d7 100644 --- a/test/nopea/mcp_test.exs +++ b/test/nopea/mcp_test.exs @@ -209,6 +209,100 @@ defmodule Nopea.MCPTest do end end + describe "handle_request/2 shutdown" do + test "returns null result and sets shutdown flag" do + state = MCP.initial_state() + request = %{"jsonrpc" => "2.0", "id" => 50, "method" => "shutdown"} + + {response, new_state} = MCP.handle_request(request, state) + + assert response["jsonrpc"] == "2.0" + assert response["id"] == 50 + assert response["result"] == nil + assert new_state.shutdown_received == true + end + + test "shutdown flag starts as false" do + state = MCP.initial_state() + assert state.shutdown_received == false + end + end + + describe "handle_request/2 exit" do + test "returns :halt after shutdown was received" do + state = %{shutdown_received: true} + request = %{"jsonrpc" => "2.0", "method" => "exit"} + + assert :halt = MCP.handle_request(request, state) + end + + test "returns :halt even without prior shutdown" do + state = %{shutdown_received: false} + request = %{"jsonrpc" => "2.0", "method" => "exit"} + + assert :halt = MCP.handle_request(request, state) + end + end + + describe "handle_request/2 notifications/cancelled" do + test "returns nil response and preserves state" do + state = MCP.initial_state() + + request = %{ + "jsonrpc" => "2.0", + "method" => "notifications/cancelled", + "params" => %{"id" => 42, "reason" => "user cancelled"} + } + + {response, new_state} = MCP.handle_request(request, state) + + assert response == nil + assert new_state == state + end + + test "handles missing params gracefully" do + state = MCP.initial_state() + request = %{"jsonrpc" => "2.0", "method" => "notifications/cancelled"} + + {response, new_state} = MCP.handle_request(request, state) + + assert response == nil + assert new_state == state + end + end + + describe "handle_request/2 state passthrough" do + test "initialize preserves state" do + state = %{shutdown_received: false} + request = %{"jsonrpc" => "2.0", "id" => 1, "method" => "initialize"} + + {response, new_state} = MCP.handle_request(request, state) + + assert response["result"]["serverInfo"]["name"] == "nopea" + assert new_state == state + end + + test "tools/list preserves state" do + state = %{shutdown_received: false} + request = %{"jsonrpc" => "2.0", "id" => 2, "method" => "tools/list"} + + {response, new_state} = MCP.handle_request(request, state) + + assert is_list(response["result"]["tools"]) + assert new_state == state + end + + test "notifications/initialized preserves state" do + state = %{shutdown_received: true} + request = %{"jsonrpc" => "2.0", "method" => "notifications/initialized"} + + {response, new_state} = MCP.handle_request(request, state) + + assert response == nil + assert new_state.shutdown_received == true + end + end + describe "encode/decode" do test "round-trips through JSON" do request = %{ From bc1c81cb67220b5517d171b0ce8a85a81de053d0 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Mon, 16 Mar 2026 15:12:43 +0100 Subject: [PATCH 06/12] docs: add cluster config keys to runtime.exs and CLAUDE.md Read all cluster-related config keys from environment variables in the prod block of runtime.exs: NOPEA_CLUSTER_STRATEGY, NOPEA_CLUSTER_SERVICE, NOPEA_CLUSTER_APP_NAME, NOPEA_POD_NAMESPACE, NOPEA_CLUSTER_POLLING_INTERVAL, NOPEA_CLUSTER_GOSSIP_PORT, NOPEA_CLUSTER_GOSSIP_SECRET, NOPEA_CLUSTER_HOSTS. Update the configuration table in CLAUDE.md with all cluster keys. Also fix corrupted literal \n in config/config.exs logger metadata. Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 8 ++++++++ config/config.exs | 3 ++- config/runtime.exs | 39 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index e127477..e932e8e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -100,6 +100,14 @@ Most children are optional, controlled by `Application.get_env(:nopea, key)`: | `:cluster_enabled` | `false` | Cluster + DistributedRegistry | | `:cdevents_endpoint` | `nil` | Events.Emitter (started only if set) | | `:canary_threshold` | `0.15` | Failure confidence for auto-canary | +| `:cluster_strategy` | `:kubernetes_dns` | libcluster strategy (`:kubernetes_dns`, `:gossip`, `:epmd`) | +| `:cluster_service` | `"nopea-headless"` | K8s headless service for DNS discovery | +| `:cluster_app_name` | `"nopea"` | Erlang application name for DNS discovery | +| `:pod_namespace` | `"default"` | Kubernetes namespace for DNS discovery | +| `:cluster_polling_interval` | `5_000` | DNS polling interval in ms | +| `:cluster_gossip_port` | `45_892` | UDP port for gossip strategy | +| `:cluster_gossip_secret` | `nil` | Shared secret for gossip authentication | +| `:cluster_hosts` | `[]` | Node list for EPMD strategy (atom list) | --- diff --git a/config/config.exs b/config/config.exs index 6ec2e5d..b365597 100644 --- a/config/config.exs +++ b/config/config.exs @@ -30,7 +30,8 @@ config :logger, :default_formatter, :type, :endpoint, :path, - :manifest_count + :manifest_count, + :request_id ] import_config "#{config_env()}.exs" diff --git a/config/runtime.exs b/config/runtime.exs index e1c471f..69e16ac 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -3,8 +3,47 @@ import Config if config_env() == :prod do cluster_enabled = System.get_env("NOPEA_CLUSTER_ENABLED", "false") == "true" + cluster_strategy = + case System.get_env("NOPEA_CLUSTER_STRATEGY") do + "kubernetes_dns" -> :kubernetes_dns + "gossip" -> :gossip + "epmd" -> :epmd + _ -> :kubernetes_dns + end + + cluster_hosts = + case System.get_env("NOPEA_CLUSTER_HOSTS") do + nil -> + [] + + hosts -> + hosts + |> String.split(",", trim: true) + |> Enum.map(&String.to_atom(String.trim(&1))) + end + + cluster_polling_interval = + case System.get_env("NOPEA_CLUSTER_POLLING_INTERVAL") do + nil -> 5_000 + val -> String.to_integer(val) + end + + cluster_gossip_port = + case System.get_env("NOPEA_CLUSTER_GOSSIP_PORT") do + nil -> 45_892 + val -> String.to_integer(val) + end + config :nopea, cluster_enabled: cluster_enabled, + cluster_strategy: cluster_strategy, + cluster_service: System.get_env("NOPEA_CLUSTER_SERVICE", "nopea-headless"), + cluster_app_name: System.get_env("NOPEA_CLUSTER_APP_NAME", "nopea"), + pod_namespace: System.get_env("NOPEA_POD_NAMESPACE", "default"), + cluster_polling_interval: cluster_polling_interval, + cluster_gossip_port: cluster_gossip_port, + cluster_gossip_secret: System.get_env("NOPEA_CLUSTER_GOSSIP_SECRET"), + cluster_hosts: cluster_hosts, enable_router: System.get_env("NOPEA_ENABLE_ROUTER", "true") == "true", api_port: String.to_integer(System.get_env("NOPEA_API_PORT", "4000")) end From c1c7af25faccc9eb9d447a7bf7a3332f853f4f2b Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Tue, 17 Mar 2026 00:42:32 +0100 Subject: [PATCH 07/12] feat: add HTTP API authentication plug Create Nopea.API.AuthPlug that checks x-api-key header against the configured NOPEA_API_KEY. Skip auth for /health and /ready paths. When no key is configured (nil), all requests pass through (dev mode). Wire plug into router between Plug.Parsers and :match. Co-Authored-By: Claude Opus 4.6 --- config/runtime.exs | 1 + lib/nopea/api/auth_plug.ex | 59 +++++++++++++++++++ lib/nopea/api/router.ex | 1 + test/nopea/api/auth_plug_test.exs | 95 +++++++++++++++++++++++++++++++ 4 files changed, 156 insertions(+) create mode 100644 lib/nopea/api/auth_plug.ex create mode 100644 test/nopea/api/auth_plug_test.exs diff --git a/config/runtime.exs b/config/runtime.exs index 69e16ac..3daa461 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -45,5 +45,6 @@ if config_env() == :prod do cluster_gossip_secret: System.get_env("NOPEA_CLUSTER_GOSSIP_SECRET"), cluster_hosts: cluster_hosts, enable_router: System.get_env("NOPEA_ENABLE_ROUTER", "true") == "true", + api_key: System.get_env("NOPEA_API_KEY"), api_port: String.to_integer(System.get_env("NOPEA_API_PORT", "4000")) end diff --git a/lib/nopea/api/auth_plug.ex b/lib/nopea/api/auth_plug.ex new file mode 100644 index 0000000..90c2ec2 --- /dev/null +++ b/lib/nopea/api/auth_plug.ex @@ -0,0 +1,59 @@ +defmodule Nopea.API.AuthPlug do + @moduledoc """ + Plug that authenticates API requests using an `x-api-key` header. + + ## Behaviour + + - If no API key is configured (nil), all requests pass through (dev mode). + - If a key is configured, requests must include a matching `x-api-key` header. + - `/health` and `/ready` paths are always allowed without authentication. + - Returns 401 Unauthorized for missing or invalid keys. + """ + + @behaviour Plug + + import Plug.Conn + + @skip_paths ["/health", "/ready"] + + @impl true + @spec init(keyword()) :: keyword() + def init(opts), do: opts + + @impl true + @spec call(Plug.Conn.t(), keyword()) :: Plug.Conn.t() + def call(conn, _opts) do + if skip_auth?(conn) do + conn + else + case configured_key() do + nil -> + conn + + expected_key -> + verify_key(conn, expected_key) + end + end + end + + defp skip_auth?(conn) do + conn.request_path in @skip_paths + end + + defp configured_key do + Application.get_env(:nopea, :api_key) + end + + defp verify_key(conn, expected_key) do + case get_req_header(conn, "x-api-key") do + [^expected_key] -> + conn + + _ -> + conn + |> put_resp_content_type("application/json") + |> send_resp(401, Jason.encode!(%{error: "unauthorized"})) + |> halt() + end + end +end diff --git a/lib/nopea/api/router.ex b/lib/nopea/api/router.ex index 220a5c9..5a38e54 100644 --- a/lib/nopea/api/router.ex +++ b/lib/nopea/api/router.ex @@ -14,6 +14,7 @@ defmodule Nopea.API.Router do json_decoder: Jason ) + plug(Nopea.API.AuthPlug) plug(:match) plug(:dispatch) diff --git a/test/nopea/api/auth_plug_test.exs b/test/nopea/api/auth_plug_test.exs new file mode 100644 index 0000000..82ef8db --- /dev/null +++ b/test/nopea/api/auth_plug_test.exs @@ -0,0 +1,95 @@ +defmodule Nopea.API.AuthPlugTest do + use ExUnit.Case, async: true + import Plug.Test + import Plug.Conn + + alias Nopea.API.AuthPlug + + setup do + # Ensure no api_key is set by default (dev mode) + original = Application.get_env(:nopea, :api_key) + on_exit(fn -> Application.put_env(:nopea, :api_key, original) end) + :ok + end + + describe "dev mode (no key configured)" do + test "allows requests without x-api-key header" do + Application.put_env(:nopea, :api_key, nil) + + conn = + conn(:get, "/api/services") + |> AuthPlug.call(AuthPlug.init([])) + + refute conn.halted + end + + test "allows requests to /api/deploy without key" do + Application.put_env(:nopea, :api_key, nil) + + conn = + conn(:post, "/api/deploy") + |> AuthPlug.call(AuthPlug.init([])) + + refute conn.halted + end + end + + describe "with key configured" do + test "allows requests with valid x-api-key" do + Application.put_env(:nopea, :api_key, "test-secret-key") + + conn = + conn(:get, "/api/services") + |> put_req_header("x-api-key", "test-secret-key") + |> AuthPlug.call(AuthPlug.init([])) + + refute conn.halted + end + + test "rejects requests with invalid x-api-key" do + Application.put_env(:nopea, :api_key, "test-secret-key") + + conn = + conn(:get, "/api/services") + |> put_req_header("x-api-key", "wrong-key") + |> AuthPlug.call(AuthPlug.init([])) + + assert conn.halted + assert conn.status == 401 + assert Jason.decode!(conn.resp_body) == %{"error" => "unauthorized"} + end + + test "rejects requests with missing x-api-key" do + Application.put_env(:nopea, :api_key, "test-secret-key") + + conn = + conn(:get, "/api/services") + |> AuthPlug.call(AuthPlug.init([])) + + assert conn.halted + assert conn.status == 401 + end + end + + describe "skipped paths" do + test "allows /health without key" do + Application.put_env(:nopea, :api_key, "test-secret-key") + + conn = + conn(:get, "/health") + |> AuthPlug.call(AuthPlug.init([])) + + refute conn.halted + end + + test "allows /ready without key" do + Application.put_env(:nopea, :api_key, "test-secret-key") + + conn = + conn(:get, "/ready") + |> AuthPlug.call(AuthPlug.init([])) + + refute conn.halted + end + end +end From 4509bcb13c5c07731453009d976e89394273e942 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Tue, 17 Mar 2026 02:24:35 +0100 Subject: [PATCH 08/12] feat: start DistributedSupervisor when clustering is enabled Add add_distributed_supervisor_child/2 helper to the application startup pipeline, placed after the registry child and before the service_agent child. When cluster_enabled is true, Nopea.DistributedSupervisor (Horde) is added to the supervision tree for cross-node process distribution. Also fix async test races in auth_plug_test and router_test by making them non-async since they modify Application config. Co-Authored-By: Claude Opus 4.6 --- lib/nopea/application.ex | 6 ++++++ test/nopea/api/auth_plug_test.exs | 2 +- test/nopea/api/router_test.exs | 6 +++++- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/nopea/application.ex b/lib/nopea/application.ex index e9e7f34..cb26b28 100644 --- a/lib/nopea/application.ex +++ b/lib/nopea/application.ex @@ -28,6 +28,7 @@ defmodule Nopea.Application do |> add_memory_child() |> add_cluster_child(cluster_enabled) |> add_registry_child(cluster_enabled) + |> add_distributed_supervisor_child(cluster_enabled) |> add_service_agent_child() |> add_progressive_child() |> add_router_child() @@ -82,6 +83,11 @@ defmodule Nopea.Application do end end + defp add_distributed_supervisor_child(children, false), do: children + + defp add_distributed_supervisor_child(children, true), + do: children ++ [Nopea.DistributedSupervisor] + defp add_service_agent_child(children) do if Application.get_env(:nopea, :enable_deploy_supervisor, true), do: children ++ [Nopea.ServiceAgent.Supervisor], diff --git a/test/nopea/api/auth_plug_test.exs b/test/nopea/api/auth_plug_test.exs index 82ef8db..3fcd5af 100644 --- a/test/nopea/api/auth_plug_test.exs +++ b/test/nopea/api/auth_plug_test.exs @@ -1,5 +1,5 @@ defmodule Nopea.API.AuthPlugTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false import Plug.Test import Plug.Conn diff --git a/test/nopea/api/router_test.exs b/test/nopea/api/router_test.exs index 1c5b0a4..1659407 100644 --- a/test/nopea/api/router_test.exs +++ b/test/nopea/api/router_test.exs @@ -1,5 +1,5 @@ defmodule Nopea.API.RouterTest do - use ExUnit.Case, async: true + use ExUnit.Case, async: false import Plug.Test import Plug.Conn @@ -12,6 +12,10 @@ defmodule Nopea.API.RouterTest do setup do Mox.stub_with(Nopea.K8sMock, Nopea.K8s) + # Ensure auth plug is in dev mode (no key required) + original_key = Application.get_env(:nopea, :api_key) + Application.put_env(:nopea, :api_key, nil) + on_exit(fn -> Application.put_env(:nopea, :api_key, original_key) end) :ok end From db891c61a893b653d924c4575f22efbaff6fa032 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Wed, 18 Mar 2026 00:57:55 +0100 Subject: [PATCH 09/12] fix: wire up dead Prometheus metrics with telemetry emissions 5 of 6 metrics defined in metrics.ex never fired because no code emitted the corresponding telemetry events. Add emissions for: - [:nopea, :deploy, :stop] via new emit_deploy_complete/2 in Metrics - [:nopea, :memory, :query, :stop] with duration in Memory.get_deploy_context - [:nopea, :verify, :drift] with count in Drift.verify_manifest - [:nopea, :deploys, :active] with count in ServiceAgent.health Also capture and thread the metrics start_time through deploy.ex so emit_deploy_complete fires on both success and progressive delivery paths. Co-Authored-By: Claude Opus 4.6 --- lib/nopea/deploy.ex | 27 +++++++++++++++++++------- lib/nopea/drift.ex | 21 ++++++++++++++------ lib/nopea/memory.ex | 9 +++++++++ lib/nopea/metrics.ex | 13 +++++++++++++ lib/nopea/service_agent.ex | 39 ++++++++++++++++++++++++-------------- 5 files changed, 82 insertions(+), 27 deletions(-) diff --git a/lib/nopea/deploy.ex b/lib/nopea/deploy.ex index 5698cc5..a9e683d 100644 --- a/lib/nopea/deploy.ex +++ b/lib/nopea/deploy.ex @@ -52,7 +52,7 @@ defmodule Nopea.Deploy do ) # 3. Emit start event - emit_start(spec, deploy_id, strategy) + metrics_start_time = emit_start(spec, deploy_id, strategy) # 4. Execute case execute_strategy(strategy, spec) do @@ -61,6 +61,11 @@ defmodule Nopea.Deploy do result = Result.progressing(deploy_id, spec, strategy, applied, duration_ms) start_progressive_monitor(deploy_id, spec, strategy) + Nopea.Metrics.emit_deploy_complete(metrics_start_time, %{ + service: spec.service, + strategy: strategy + }) + Logger.info("Deploy progressing", service: spec.service, deploy_id: deploy_id, @@ -79,7 +84,7 @@ defmodule Nopea.Deploy do # 6. Record success result = Result.success(deploy_id, spec, strategy, applied, duration_ms, verified) record_outcome(result, context) - emit_complete(spec, deploy_id, strategy, duration_ms, verified) + emit_complete(spec, deploy_id, strategy, duration_ms, verified, metrics_start_time) Logger.info("Deploy completed", service: spec.service, @@ -94,7 +99,7 @@ defmodule Nopea.Deploy do duration_ms = duration_ms(start_time) result = Result.failure(deploy_id, spec, strategy, reason, duration_ms) record_outcome(result, context) - emit_failure(spec, deploy_id, strategy, reason, duration_ms, start_time) + emit_failure(spec, deploy_id, strategy, reason, duration_ms, metrics_start_time) Logger.error("Deploy failed", service: spec.service, @@ -388,7 +393,8 @@ defmodule Nopea.Deploy do end defp emit_start(spec, deploy_id, strategy) do - Nopea.Metrics.emit_deploy_start(%{service: spec.service, strategy: strategy}) + metrics_start_time = + Nopea.Metrics.emit_deploy_start(%{service: spec.service, strategy: strategy}) if emitter_running?() do event = @@ -401,9 +407,16 @@ defmodule Nopea.Deploy do Nopea.Events.Emitter.emit(Nopea.Events.Emitter, event) end + + metrics_start_time end - defp emit_complete(spec, deploy_id, strategy, duration_ms, verified) do + defp emit_complete(spec, deploy_id, strategy, duration_ms, verified, metrics_start_time) do + Nopea.Metrics.emit_deploy_complete(metrics_start_time, %{ + service: spec.service, + strategy: strategy + }) + if emitter_running?() do event = Nopea.Events.deploy_completed(spec.service, %{ @@ -418,8 +431,8 @@ defmodule Nopea.Deploy do end end - defp emit_failure(spec, deploy_id, strategy, reason, duration_ms, start_time) do - Nopea.Metrics.emit_deploy_error(start_time, %{ + defp emit_failure(spec, deploy_id, strategy, reason, duration_ms, metrics_start_time) do + Nopea.Metrics.emit_deploy_error(metrics_start_time, %{ service: spec.service, strategy: strategy, error: reason diff --git a/lib/nopea/drift.ex b/lib/nopea/drift.ex index f4d5915..1fc9063 100644 --- a/lib/nopea/drift.ex +++ b/lib/nopea/drift.ex @@ -85,12 +85,21 @@ defmodule Nopea.Drift do last_applied_result = cache_module.get_last_applied(service, resource_key) live_result = k8s_module.get_resource(api_version, kind, name, namespace) - case {last_applied_result, live_result} do - {{:error, :not_found}, {:error, _}} -> :new_resource - {{:error, :not_found}, {:ok, _live}} -> :needs_apply - {{:ok, _last}, {:error, _}} -> :new_resource - {{:ok, last_applied}, {:ok, live}} -> three_way_diff(last_applied, manifest, live) - end + result = + case {last_applied_result, live_result} do + {{:error, :not_found}, {:error, _}} -> :new_resource + {{:error, :not_found}, {:ok, _live}} -> :needs_apply + {{:ok, _last}, {:error, _}} -> :new_resource + {{:ok, last_applied}, {:ok, live}} -> three_way_diff(last_applied, manifest, live) + end + + :telemetry.execute( + [:nopea, :verify, :drift], + %{count: 1}, + %{service: service} + ) + + result end @spec compute_hash(map()) :: {:ok, String.t()} | {:error, term()} diff --git a/lib/nopea/memory.ex b/lib/nopea/memory.ex index f2923ad..d0473b1 100644 --- a/lib/nopea/memory.ex +++ b/lib/nopea/memory.ex @@ -80,7 +80,16 @@ defmodule Nopea.Memory do end def handle_call({:get_deploy_context, service, namespace}, _from, state) do + query_start = System.monotonic_time() context = Nopea.Memory.Query.deploy_context(state.graph, service, namespace) + duration = System.monotonic_time() - query_start + + :telemetry.execute( + [:nopea, :memory, :query, :stop], + %{duration: duration}, + %{service: service} + ) + {:reply, context, state} end diff --git a/lib/nopea/metrics.ex b/lib/nopea/metrics.ex index 2814ad3..e5fff92 100644 --- a/lib/nopea/metrics.ex +++ b/lib/nopea/metrics.ex @@ -67,6 +67,19 @@ defmodule Nopea.Metrics do start_time end + @spec emit_deploy_complete(integer(), map()) :: :ok + def emit_deploy_complete(start_time, metadata) do + duration = System.monotonic_time() - start_time + + :telemetry.execute( + [:nopea, :deploy, :stop], + %{duration: duration}, + metadata + ) + + :ok + end + @spec emit_deploy_error(integer(), map()) :: :ok def emit_deploy_error(start_time, metadata) do duration = System.monotonic_time() - start_time diff --git a/lib/nopea/service_agent.ex b/lib/nopea/service_agent.ex index 3b08107..c7575d8 100644 --- a/lib/nopea/service_agent.ex +++ b/lib/nopea/service_agent.ex @@ -69,20 +69,31 @@ defmodule Nopea.ServiceAgent do @spec health() :: [map()] def health do if registry_available?() do - Registry.select(Nopea.Registry, [ - {{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]} - ]) - |> Enum.filter(fn - {{:service, _name}, _pid} -> true - _ -> false - end) - |> Enum.map(fn {{:service, name}, pid} -> - try do - GenServer.call(pid, :status, 5_000) - catch - :exit, _ -> %{service: name, status: :unavailable} - end - end) + statuses = + Registry.select(Nopea.Registry, [ + {{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]} + ]) + |> Enum.filter(fn + {{:service, _name}, _pid} -> true + _ -> false + end) + |> Enum.map(fn {{:service, name}, pid} -> + try do + GenServer.call(pid, :status, 5_000) + catch + :exit, _ -> %{service: name, status: :unavailable} + end + end) + + active_count = Enum.count(statuses, fn s -> s[:status] == :deploying end) + + :telemetry.execute( + [:nopea, :deploys, :active], + %{count: active_count}, + %{} + ) + + statuses else [] end From ca42ee5ec16b1ec693232f3220a12368a47565dd Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Wed, 18 Mar 2026 01:18:09 +0100 Subject: [PATCH 10/12] fix: cache applied manifests so drift detection can find last-applied state Cache.put_last_applied/3 existed but was never called after applying manifests, so the three-way diff in drift detection could never find last-applied state and always returned :needs_apply or :new_resource. - Add cache_applied_manifests/2 in deploy.ex that iterates applied manifests and calls Cache.put_last_applied/3 for each one - Call it in both the direct strategy success path (before verify) and the canary/blue_green progressing path - Add public Drift.resource_key/1 delegating to Applier.resource_key/1 Co-Authored-By: Claude Opus 4.6 --- lib/nopea/deploy.ex | 19 +++++++++++++++++-- lib/nopea/drift.ex | 4 ++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/lib/nopea/deploy.ex b/lib/nopea/deploy.ex index a9e683d..83ee5a1 100644 --- a/lib/nopea/deploy.ex +++ b/lib/nopea/deploy.ex @@ -60,6 +60,7 @@ defmodule Nopea.Deploy do duration_ms = duration_ms(start_time) result = Result.progressing(deploy_id, spec, strategy, applied, duration_ms) start_progressive_monitor(deploy_id, spec, strategy) + cache_applied_manifests(spec.service, applied) Nopea.Metrics.emit_deploy_complete(metrics_start_time, %{ service: spec.service, @@ -78,10 +79,13 @@ defmodule Nopea.Deploy do {:ok, applied} -> duration_ms = duration_ms(start_time) - # 5. Verify + # 5. Cache applied manifests for drift detection + cache_applied_manifests(spec.service, applied) + + # 6. Verify verified = verify_deploy(spec, applied) - # 6. Record success + # 7. Record success result = Result.success(deploy_id, spec, strategy, applied, duration_ms, verified) record_outcome(result, context) emit_complete(spec, deploy_id, strategy, duration_ms, verified, metrics_start_time) @@ -497,6 +501,17 @@ defmodule Nopea.Deploy do end end + defp cache_applied_manifests(service, applied) when is_list(applied) do + if Nopea.Cache.available?() do + Enum.each(applied, fn manifest -> + key = Nopea.Drift.resource_key(manifest) + Nopea.Cache.put_last_applied(service, key, manifest) + end) + end + end + + defp cache_applied_manifests(_service, _applied), do: :ok + defp duration_ms(start_time) do System.convert_time_unit(System.monotonic_time() - start_time, :native, :millisecond) end diff --git a/lib/nopea/drift.ex b/lib/nopea/drift.ex index 1fc9063..254872d 100644 --- a/lib/nopea/drift.ex +++ b/lib/nopea/drift.ex @@ -108,6 +108,10 @@ defmodule Nopea.Drift do {:ok, "sha256:#{do_hash(normalized)}"} end + @doc "Returns a cache key string for a manifest, suitable for Cache.put_last_applied/3." + @spec resource_key(map()) :: String.t() + def resource_key(manifest), do: Nopea.Applier.resource_key(manifest) + # Private functions defp strip_status(manifest), do: Map.delete(manifest, "status") From 239873645d89c95fd79547ffa4051a99d9e9247e Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Wed, 18 Mar 2026 02:20:33 +0100 Subject: [PATCH 11/12] refactor: extract Strategy.Canary and Strategy.BlueGreen from inline deploy logic The canary and blue_green strategy logic was inlined in Deploy.execute_strategy/2. Extract into dedicated modules implementing the Strategy behaviour, matching the existing Strategy.Direct pattern. Update the behaviour callback to include the {:ok, {applied, :progressing}} return type for progressive strategies. Co-Authored-By: Claude Opus 4.6 --- lib/nopea/deploy.ex | 15 +---- lib/nopea/strategy.ex | 8 ++- lib/nopea/strategy/blue_green.ex | 33 +++++++++++ lib/nopea/strategy/canary.ex | 33 +++++++++++ test/nopea/strategy/blue_green_test.exs | 74 +++++++++++++++++++++++++ test/nopea/strategy/canary_test.exs | 74 +++++++++++++++++++++++++ 6 files changed, 222 insertions(+), 15 deletions(-) create mode 100644 lib/nopea/strategy/blue_green.ex create mode 100644 lib/nopea/strategy/canary.ex create mode 100644 test/nopea/strategy/blue_green_test.exs create mode 100644 test/nopea/strategy/canary_test.exs diff --git a/lib/nopea/deploy.ex b/lib/nopea/deploy.ex index 83ee5a1..8d811c0 100644 --- a/lib/nopea/deploy.ex +++ b/lib/nopea/deploy.ex @@ -163,19 +163,8 @@ defmodule Nopea.Deploy do end defp execute_strategy(:direct, spec), do: Nopea.Strategy.Direct.execute(spec) - - defp execute_strategy(strategy, spec) when strategy in [:canary, :blue_green] do - case Nopea.Kulta.RolloutBuilder.build(spec, strategy) do - {:ok, rollout} -> - case k8s_module().apply_manifest(rollout, spec.namespace) do - {:ok, applied} -> {:ok, {[applied], :progressing}} - {:error, _} = error -> error - end - - {:error, _} = error -> - error - end - end + defp execute_strategy(:canary, spec), do: Nopea.Strategy.Canary.execute(spec) + defp execute_strategy(:blue_green, spec), do: Nopea.Strategy.BlueGreen.execute(spec) defp k8s_module do Application.get_env(:nopea, :k8s_module, Nopea.K8s) diff --git a/lib/nopea/strategy.ex b/lib/nopea/strategy.ex index 3e76710..cec736b 100644 --- a/lib/nopea/strategy.ex +++ b/lib/nopea/strategy.ex @@ -3,8 +3,12 @@ defmodule Nopea.Strategy do Behaviour for deployment strategies. All strategies take a DeploySpec and return either - {:ok, applied_resources} or {:error, reason}. + {:ok, applied_resources}, {:ok, {applied_resources, :progressing}}, or {:error, reason}. + + Progressive strategies (canary, blue_green) return the :progressing tuple + to indicate that a rollout monitor should be started. """ - @callback execute(Nopea.Deploy.Spec.t()) :: {:ok, [map()]} | {:error, term()} + @callback execute(Nopea.Deploy.Spec.t()) :: + {:ok, [map()]} | {:ok, {[map()], :progressing}} | {:error, term()} end diff --git a/lib/nopea/strategy/blue_green.ex b/lib/nopea/strategy/blue_green.ex new file mode 100644 index 0000000..906bc38 --- /dev/null +++ b/lib/nopea/strategy/blue_green.ex @@ -0,0 +1,33 @@ +defmodule Nopea.Strategy.BlueGreen do + @moduledoc """ + Blue/green deployment strategy. + + Builds a Kulta Rollout CRD with blue/green configuration and applies it. + Returns {:ok, {applied, :progressing}} to signal that a progressive + monitor should be started to track the rollout. + """ + + @behaviour Nopea.Strategy + + require Logger + + @impl true + @spec execute(Nopea.Deploy.Spec.t()) :: + {:ok, {[map()], :progressing}} | {:error, term()} + def execute(%Nopea.Deploy.Spec{} = spec) do + Logger.info("Blue/green deploy", + service: spec.service, + namespace: spec.namespace, + manifest_count: length(spec.manifests) + ) + + with {:ok, rollout} <- Nopea.Kulta.RolloutBuilder.build(spec, :blue_green), + {:ok, applied} <- k8s_module().apply_manifest(rollout, spec.namespace) do + {:ok, {[applied], :progressing}} + end + end + + defp k8s_module do + Application.get_env(:nopea, :k8s_module, Nopea.K8s) + end +end diff --git a/lib/nopea/strategy/canary.ex b/lib/nopea/strategy/canary.ex new file mode 100644 index 0000000..f9ea861 --- /dev/null +++ b/lib/nopea/strategy/canary.ex @@ -0,0 +1,33 @@ +defmodule Nopea.Strategy.Canary do + @moduledoc """ + Canary deployment strategy. + + Builds a Kulta Rollout CRD with canary configuration and applies it. + Returns {:ok, {applied, :progressing}} to signal that a progressive + monitor should be started to track the rollout. + """ + + @behaviour Nopea.Strategy + + require Logger + + @impl true + @spec execute(Nopea.Deploy.Spec.t()) :: + {:ok, {[map()], :progressing}} | {:error, term()} + def execute(%Nopea.Deploy.Spec{} = spec) do + Logger.info("Canary deploy", + service: spec.service, + namespace: spec.namespace, + manifest_count: length(spec.manifests) + ) + + with {:ok, rollout} <- Nopea.Kulta.RolloutBuilder.build(spec, :canary), + {:ok, applied} <- k8s_module().apply_manifest(rollout, spec.namespace) do + {:ok, {[applied], :progressing}} + end + end + + defp k8s_module do + Application.get_env(:nopea, :k8s_module, Nopea.K8s) + end +end diff --git a/test/nopea/strategy/blue_green_test.exs b/test/nopea/strategy/blue_green_test.exs new file mode 100644 index 0000000..53b8d02 --- /dev/null +++ b/test/nopea/strategy/blue_green_test.exs @@ -0,0 +1,74 @@ +defmodule Nopea.Strategy.BlueGreenTest do + use ExUnit.Case, async: true + + import Mox + + alias Nopea.Deploy.Spec + alias Nopea.Strategy.BlueGreen + alias Nopea.Test.Factory + + setup :verify_on_exit! + + setup do + Mox.stub_with(Nopea.K8sMock, Nopea.K8s) + :ok + end + + describe "execute/1" do + test "builds blue_green rollout and returns progressing" do + deployment = Factory.sample_deployment_manifest("api", "production") + + applied_rollout = %{ + "apiVersion" => "kulta.io/v1alpha1", + "kind" => "Rollout", + "metadata" => %{"name" => "api", "namespace" => "production"} + } + + Nopea.K8sMock + |> expect(:apply_manifest, fn rollout, "production" -> + assert rollout["kind"] == "Rollout" + assert rollout["apiVersion"] == "kulta.io/v1alpha1" + assert rollout["metadata"]["name"] == "api" + assert rollout["spec"]["strategy"]["blueGreen"] != nil + {:ok, applied_rollout} + end) + + spec = %Spec{ + service: "api", + namespace: "production", + manifests: [deployment] + } + + assert {:ok, {[^applied_rollout], :progressing}} = BlueGreen.execute(spec) + end + + test "returns error when no deployment manifest found" do + configmap = Factory.sample_configmap_manifest("cfg", "default", %{"key" => "val"}) + + spec = %Spec{ + service: "api", + namespace: "default", + manifests: [configmap] + } + + assert {:error, :no_deployment_found} = BlueGreen.execute(spec) + end + + test "propagates K8s apply errors" do + deployment = Factory.sample_deployment_manifest("api", "default") + + Nopea.K8sMock + |> expect(:apply_manifest, fn _rollout, "default" -> + {:error, :timeout} + end) + + spec = %Spec{ + service: "api", + namespace: "default", + manifests: [deployment] + } + + assert {:error, :timeout} = BlueGreen.execute(spec) + end + end +end diff --git a/test/nopea/strategy/canary_test.exs b/test/nopea/strategy/canary_test.exs new file mode 100644 index 0000000..7cccec7 --- /dev/null +++ b/test/nopea/strategy/canary_test.exs @@ -0,0 +1,74 @@ +defmodule Nopea.Strategy.CanaryTest do + use ExUnit.Case, async: true + + import Mox + + alias Nopea.Deploy.Spec + alias Nopea.Strategy.Canary + alias Nopea.Test.Factory + + setup :verify_on_exit! + + setup do + Mox.stub_with(Nopea.K8sMock, Nopea.K8s) + :ok + end + + describe "execute/1" do + test "builds canary rollout and returns progressing" do + deployment = Factory.sample_deployment_manifest("web", "staging") + + applied_rollout = %{ + "apiVersion" => "kulta.io/v1alpha1", + "kind" => "Rollout", + "metadata" => %{"name" => "web", "namespace" => "staging"} + } + + Nopea.K8sMock + |> expect(:apply_manifest, fn rollout, "staging" -> + assert rollout["kind"] == "Rollout" + assert rollout["apiVersion"] == "kulta.io/v1alpha1" + assert rollout["metadata"]["name"] == "web" + assert rollout["spec"]["strategy"]["canary"] != nil + {:ok, applied_rollout} + end) + + spec = %Spec{ + service: "web", + namespace: "staging", + manifests: [deployment] + } + + assert {:ok, {[^applied_rollout], :progressing}} = Canary.execute(spec) + end + + test "returns error when no deployment manifest found" do + configmap = Factory.sample_configmap_manifest("cfg", "default", %{"key" => "val"}) + + spec = %Spec{ + service: "web", + namespace: "default", + manifests: [configmap] + } + + assert {:error, :no_deployment_found} = Canary.execute(spec) + end + + test "propagates K8s apply errors" do + deployment = Factory.sample_deployment_manifest("web", "default") + + Nopea.K8sMock + |> expect(:apply_manifest, fn _rollout, "default" -> + {:error, :forbidden} + end) + + spec = %Spec{ + service: "web", + namespace: "default", + manifests: [deployment] + } + + assert {:error, :forbidden} = Canary.execute(spec) + end + end +end From 5ef1ed4920f667a23714b30c4954c74d1968d440 Mon Sep 17 00:00:00 2001 From: Yair Etziony Date: Wed, 18 Mar 2026 22:03:20 +0100 Subject: [PATCH 12/12] refactor: replace Process.sleep in tests with proper synchronization Add assert_eventually polling macro in test/support/test_helpers.ex and replace all Process.sleep calls with deterministic alternatives: - Memory barrier (node_count/0 sync call) for async cast flushes - assert_eventually for supervisor restart and state convergence checks - assert_receive with message signaling for deploy start coordination Leaves distributed_registry_test.exs unchanged (intentional polling sleeps). Co-Authored-By: Claude Opus 4.6 --- test/nopea/deploy_integration_test.exs | 4 +- test/nopea/distributed_supervisor_test.exs | 9 ++- test/nopea/events/emitter_test.exs | 12 ++-- .../integration/service_agent_deploy_test.exs | 20 ++++--- test/nopea/memory_test.exs | 13 ++--- test/nopea/service_agent_test.exs | 56 ++++++++++++------- test/support/test_helpers.ex | 35 ++++++++++++ 7 files changed, 104 insertions(+), 45 deletions(-) create mode 100644 test/support/test_helpers.ex diff --git a/test/nopea/deploy_integration_test.exs b/test/nopea/deploy_integration_test.exs index 1bde765..ce69965 100644 --- a/test/nopea/deploy_integration_test.exs +++ b/test/nopea/deploy_integration_test.exs @@ -98,8 +98,8 @@ defmodule Nopea.DeployIntegrationTest do Deploy.run(spec) - # Wait for async cast to complete - Process.sleep(50) + # Flush async cast via sync call (BEAM mailbox FIFO ordering) + _ = Nopea.Memory.node_count() ctx = Nopea.Memory.get_deploy_context("fragile-svc", "prod") assert ctx.known == true diff --git a/test/nopea/distributed_supervisor_test.exs b/test/nopea/distributed_supervisor_test.exs index b746226..1f3dfcd 100644 --- a/test/nopea/distributed_supervisor_test.exs +++ b/test/nopea/distributed_supervisor_test.exs @@ -9,6 +9,8 @@ defmodule Nopea.DistributedSupervisorTest do use ExUnit.Case, async: false + import Nopea.Test.Helpers + alias Nopea.{DistributedRegistry, DistributedSupervisor} @moduletag :distributed @@ -133,9 +135,10 @@ defmodule Nopea.DistributedSupervisorTest do # Process should be dead refute Process.alive?(pid) - # Registry should clean up - Process.sleep(100) - assert {:error, :not_found} = DistributedRegistry.lookup(key) + # Poll until registry cleans up + assert_eventually do + {:error, :not_found} == DistributedRegistry.lookup(key) + end end end diff --git a/test/nopea/events/emitter_test.exs b/test/nopea/events/emitter_test.exs index 02d5279..b32321a 100644 --- a/test/nopea/events/emitter_test.exs +++ b/test/nopea/events/emitter_test.exs @@ -1,6 +1,8 @@ defmodule Nopea.Events.EmitterTest do use ExUnit.Case, async: true + import Nopea.Test.Helpers + alias Nopea.Events alias Nopea.Events.Emitter @@ -124,11 +126,11 @@ defmodule Nopea.Events.EmitterTest do Emitter.emit(pid, event) - # Wait for retries - Process.sleep(100) - - state = Emitter.get_state(pid) - assert state.dropped_count == 1 + # Poll until the emitter has processed all retries and dropped the event + assert_eventually do + state = Emitter.get_state(pid) + state.dropped_count == 1 + end end end diff --git a/test/nopea/integration/service_agent_deploy_test.exs b/test/nopea/integration/service_agent_deploy_test.exs index bfced3a..2553041 100644 --- a/test/nopea/integration/service_agent_deploy_test.exs +++ b/test/nopea/integration/service_agent_deploy_test.exs @@ -8,6 +8,8 @@ defmodule Nopea.Integration.ServiceAgentDeployTest do use ExUnit.Case + import Nopea.Test.Helpers + alias Nopea.ServiceAgent alias Nopea.Deploy alias Nopea.Deploy.Spec @@ -82,7 +84,14 @@ defmodule Nopea.Integration.ServiceAgentDeployTest do pid = ServiceAgent.ensure_started("crash-target") Process.exit(pid, :kill) - Process.sleep(50) + + # Poll until crash-target has been restarted by supervisor + assert_eventually do + case ServiceAgent.status("crash-target") do + {:ok, %{status: :idle, deploy_count: 0}} -> true + _ -> false + end + end # Other services still healthy {:ok, s1} = ServiceAgent.status("stable-1") @@ -92,11 +101,6 @@ defmodule Nopea.Integration.ServiceAgentDeployTest do {:ok, s2} = ServiceAgent.status("stable-2") assert s2.status == :idle assert s2.deploy_count == 1 - - # Crash-target recovered but lost state - {:ok, crashed} = ServiceAgent.status("crash-target") - assert crashed.status == :idle - assert crashed.deploy_count == 0 end end @@ -150,8 +154,8 @@ defmodule Nopea.Integration.ServiceAgentDeployTest do Task.await_many(tasks, 10_000) - # Memory.record_deploy is a cast — give it time - Process.sleep(100) + # Flush async casts via sync call (BEAM mailbox FIFO ordering) + _ = Nopea.Memory.node_count() Enum.each(services, fn svc -> ctx = Nopea.Memory.get_deploy_context(svc, "default") diff --git a/test/nopea/memory_test.exs b/test/nopea/memory_test.exs index ad76a6a..0db2b17 100644 --- a/test/nopea/memory_test.exs +++ b/test/nopea/memory_test.exs @@ -28,8 +28,8 @@ defmodule Nopea.MemoryTest do error: nil }) - # Cast is async, give it a moment - Process.sleep(50) + # Flush async cast via sync call (BEAM mailbox FIFO ordering) + _ = Memory.node_count() ctx = Memory.get_deploy_context("auth-service", "production") assert ctx.service == "auth-service" @@ -45,7 +45,8 @@ defmodule Nopea.MemoryTest do error: {:timeout, "connection refused"} }) - Process.sleep(50) + # Flush async cast via sync call (BEAM mailbox FIFO ordering) + _ = Memory.node_count() ctx = Memory.get_deploy_context("api-gateway", "staging") assert ctx.known == true @@ -73,8 +74,7 @@ defmodule Nopea.MemoryTest do error: nil }) - Process.sleep(50) - + # Flush async cast via sync call (BEAM mailbox FIFO ordering) # service + namespace assert Memory.node_count() == 2 # depends_on @@ -94,9 +94,8 @@ defmodule Nopea.MemoryTest do test "decay message is handled without crash" do # Manually send :decay to trigger the handler send(Process.whereis(Memory), :decay) - Process.sleep(50) - # Should still be alive and functional + # Flush the :decay handle_info via a sync call assert Memory.node_count() == 0 end end diff --git a/test/nopea/service_agent_test.exs b/test/nopea/service_agent_test.exs index 0a635bd..055a9d7 100644 --- a/test/nopea/service_agent_test.exs +++ b/test/nopea/service_agent_test.exs @@ -2,6 +2,7 @@ defmodule Nopea.ServiceAgentTest do use ExUnit.Case import Mox + import Nopea.Test.Helpers alias Nopea.ServiceAgent alias Nopea.Deploy.Spec @@ -125,14 +126,13 @@ defmodule Nopea.ServiceAgentTest do pid = ServiceAgent.ensure_started("crash-svc") Process.exit(pid, :kill) - # Give supervisor time to restart - Process.sleep(50) - - {:ok, status} = ServiceAgent.status("crash-svc") - assert status.service == "crash-svc" - assert status.status == :idle - # State resets on crash — deploy_count back to 0 - assert status.deploy_count == 0 + # Poll until supervisor restarts the agent + assert_eventually do + match?( + {:ok, %{service: "crash-svc", status: :idle, deploy_count: 0}}, + ServiceAgent.status("crash-svc") + ) + end end test "recovers last_result from cache after restart" do @@ -143,26 +143,36 @@ defmodule Nopea.ServiceAgentTest do # Kill and let supervisor restart [{pid, _}] = Registry.lookup(Nopea.Registry, {:service, "cache-svc"}) Process.exit(pid, :kill) - Process.sleep(50) - {:ok, status} = ServiceAgent.status("cache-svc") - assert status.last_result != nil - assert status.last_result.status == :completed + # Poll until restarted and cache is restored + assert_eventually do + case ServiceAgent.status("cache-svc") do + {:ok, %{last_result: %{status: :completed}}} -> true + _ -> false + end + end end end describe "cooldown after crash" do test "delays next queued deploy after worker crash" do - # Make apply_manifests crash to trigger the DOWN handler + test_pid = self() + + # First call signals then crashes; second just crashes Mox.expect(Nopea.K8sMock, :apply_manifests, 2, fn _manifests, _ns -> + send(test_pid, :deploy_started) raise "boom" end) spec = make_spec("cooldown-svc", manifests: [%{"kind" => "Deployment"}]) - # Fire two deploys concurrently — first will crash, second gets queued + # Fire first deploy — it will signal us then crash task1 = Task.async(fn -> ServiceAgent.deploy("cooldown-svc", spec) end) - Process.sleep(10) + + # Wait until the first deploy actually starts executing + assert_receive :deploy_started, 5_000 + + # Now enqueue second deploy task2 = Task.async(fn -> ServiceAgent.deploy("cooldown-svc", spec) end) # First deploy fails from crash @@ -205,8 +215,13 @@ defmodule Nopea.ServiceAgentTest do Task.async(fn -> ServiceAgent.deploy("queue-svc", spec) end) end - # Give queued tasks time to reach the agent - Process.sleep(50) + # Poll until the queue is full + assert_eventually do + case ServiceAgent.status("queue-svc") do + {:ok, %{queue_length: 10}} -> true + _ -> false + end + end # 11th should be rejected immediately with :queue_full overflow_result = ServiceAgent.deploy("queue-svc", spec) @@ -232,10 +247,11 @@ defmodule Nopea.ServiceAgentTest do # Simulate the idle timeout firing send(pid, :idle_timeout) - Process.sleep(50) - # Agent should have stopped - refute Process.alive?(pid) + # Poll until agent stops + assert_eventually do + not Process.alive?(pid) + end end test "idle timeout is rescheduled during deploy" do diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex new file mode 100644 index 0000000..142cdf1 --- /dev/null +++ b/test/support/test_helpers.ex @@ -0,0 +1,35 @@ +defmodule Nopea.Test.Helpers do + @moduledoc """ + Shared test helpers — polling assertions that replace raw `Process.sleep`. + """ + + @doc "Polls until the given block returns a truthy value, with timeout." + defmacro assert_eventually(timeout_ms \\ 1000, do: block) do + quote do + deadline = System.monotonic_time(:millisecond) + unquote(timeout_ms) + + Stream.repeatedly(fn -> + try do + result = unquote(block) + if result, do: {:ok, result}, else: {:retry} + rescue + _ -> {:retry} + catch + _, _ -> {:retry} + end + end) + |> Enum.reduce_while(nil, fn + {:ok, result}, _acc -> + {:halt, result} + + {:retry}, _acc -> + if System.monotonic_time(:millisecond) > deadline do + {:halt, flunk("assert_eventually timed out after #{unquote(timeout_ms)}ms")} + else + Process.sleep(10) + {:cont, nil} + end + end) + end + end +end