diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f60bfb..ab2135e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added (v0.3.0 candidate) +- **Executive faculties** (`substrate.executive`) — the decision/reasoning layer on + the band foundation: + - **Temporal authority** — `SustainedLoadTracker` Protocol + `EwmaLoadTracker` + (EWMA + consecutive-breach): the sole SPIKE-vs-SUSTAINED authority, so the + pivot and damage/debt fire by *duration* (a transient peak is absorbed). + - **Scale** — `ExecutiveScale` / `ScaleAxis` + `axis_of` / `physical_parent` / + `entity_parent`: the entity / physical (cell→rack→zone→region) / grouping + roll-up axes. + - **Deliberation + perspective-taking** (`deliberate`) — roll out N candidate + actions, score each affected entity's impact *from its own frame* + (care-weight × φ-proportioned trajectory), disqualify floor-harm and + net-negative-long-cycle extraction, surface trade-offs, pick the arg-max + long-horizon net-potential-gain action. + - **State query** (`integrate_state`) — integrate a load history into energy / + effort (slacking = avoidance) / trajectory over time + a human-readable + summary. + - **Scale roll-up** (`roll_up`) — aggregate member loads up a parent scale → + distribution, worst member, over-load/idle failure-tell fractions. + - **Peer awareness + alarm propagation** (`correlate_anomalies` / `assess_alarm`) + — herd-panic correlation (N peers weird together ⇒ the enclosing scale has a + problem) + trust × independent-corroboration alarm weighting with the + panic-injection guard. + - **Observed-graph extraction detection** (`detect_extraction`) — the substrate + calculus over an observed relationship graph; flags predators vs supporters. + 91 conformance tests; pyright clean; pylint 10.00. + - **Executive band package** (`substrate.executive`) — the resistance band made operational as a decision engine. Two named lenses on one utilization value — `LoadZone` (the load lens: IDLE/RECREATION/WORK/PEAKING/WARNING/DANGER on the diff --git a/python/src/substrate/executive/__init__.py b/python/src/substrate/executive/__init__.py index 36d1c65..cbc0233 100644 --- a/python/src/substrate/executive/__init__.py +++ b/python/src/substrate/executive/__init__.py @@ -2,13 +2,16 @@ The resistance band, made operational as the substrate's executive function: the corrected symmetric ladder (geometric levels, temporal consequences), the -quantity/scale discipline that keeps "what percentage is this and what does it -mean" precise, and the order metric that reads emergence from a distribution. +quantity/scale discipline, the temporal sustained-vs-spike authority, and the +faculties built on top — deliberation (scenario rollout + perspective-taking), the +state query, the scale roll-up, peer awareness + alarm propagation, the order +metric, and observed-graph extraction detection. Curated exports — import the names, not deep module paths. """ from __future__ import annotations +from substrate.executive._trajectory import TrajectoryClass from substrate.executive.band import ( BAND_TOLERANCE, DEFAULT_BAND_PROFILE, @@ -22,6 +25,17 @@ validate_band_profile, zone_to_legacy, ) +from substrate.executive.deliberation import ( + ActionDelta, + CandidateAction, + CandidateEvaluation, + DeliberationOutcome, + DeliberationResult, + EntityFrame, + PerspectiveImpact, + deliberate, + perspective_impact, +) from substrate.executive.negentropy import ( NegentropyDirection, NegentropyReport, @@ -34,6 +48,16 @@ NpgEdge, detect_extraction, ) +from substrate.executive.peer_alarm import ( + AlarmAssessment, + AlarmDisposition, + HerdVerdict, + PeerAlarm, + PeerAnomaly, + assess_alarm, + correlate_anomalies, + heeded_alarms, +) from substrate.executive.quantities import ( Cycle, GrowthNotADecisionBand, @@ -41,29 +65,95 @@ ResourceKind, setpoint_for, ) +from substrate.executive.roll_up import ( + MemberLoad, + RollUpError, + ScaleAggregate, + roll_up, +) +from substrate.executive.scale import ( + ExecutiveScale, + ScaleAxis, + axis_of, + entity_parent, + physical_parent, +) +from substrate.executive.state_query import ( + EffortState, + EnergyState, + EntityStateReport, + StateObservation, + TrajectoryDirection, + integrate_state, +) +from substrate.executive.temporal import ( + DEFAULT_EWMA_ALPHA, + DEFAULT_SUSTAIN_COUNT, + EwmaLoadTracker, + LoadTrend, + SustainedLoadTracker, +) __all__ = [ "BAND_TOLERANCE", "DEFAULT_BAND_PROFILE", + "DEFAULT_EWMA_ALPHA", + "DEFAULT_SUSTAIN_COUNT", "TWO_THIRDS", + "ActionDelta", + "AlarmAssessment", + "AlarmDisposition", "BandProfile", "BandProfileInvalid", + "CandidateAction", + "CandidateEvaluation", "Cycle", "CyclePhase", + "DeliberationOutcome", + "DeliberationResult", + "EffortState", + "EnergyState", + "EntityFrame", "EntityRollup", + "EntityStateReport", + "EwmaLoadTracker", + "ExecutiveScale", "ExtractionReport", "GrowthNotADecisionBand", + "HerdVerdict", + "LoadTrend", "LoadZone", + "MemberLoad", "NegentropyDirection", "NegentropyReport", "NpgEdge", + "PeerAlarm", + "PeerAnomaly", + "PerspectiveImpact", "Quantity", "ResourceKind", + "RollUpError", + "ScaleAggregate", + "ScaleAxis", + "StateObservation", + "SustainedLoadTracker", + "TrajectoryClass", + "TrajectoryDirection", + "assess_alarm", + "axis_of", "classify_cycle_phase", "classify_load_zone", + "correlate_anomalies", + "deliberate", "detect_extraction", + "entity_parent", + "heeded_alarms", + "integrate_state", "negentropy", "order_index", + "perspective_impact", + "physical_parent", + "roll_up", "setpoint_for", "validate_band_profile", "zone_to_legacy", diff --git a/python/src/substrate/executive/_trajectory.py b/python/src/substrate/executive/_trajectory.py new file mode 100644 index 0000000..b9fa9c5 --- /dev/null +++ b/python/src/substrate/executive/_trajectory.py @@ -0,0 +1,23 @@ +"""Potential-trajectory class — the perspective-taking input. + +An entity's potential-trajectory determines how the same raw impact reweights on +the long horizon (a benefit to a high-future-potential DEVELOPING entity compounds; +a harm to an accumulated-and-at-risk VULNERABLE one bites harder). A minimal, +self-contained enum for the deliberation engine. +""" +from __future__ import annotations + +from enum import Enum + + +class TrajectoryClass(str, Enum): + """The potential-trajectory class of an entity.""" + + DEVELOPING = "developing" # high future potential (a child / seed) + ESTABLISHED = "established" # at capacity + VULNERABLE = "vulnerable" # accumulated + at risk (an elder / dependent) + STATIC = "static" # spent / low remaining potential + UNKNOWN = "unknown" + + +__all__ = ["TrajectoryClass"] diff --git a/python/src/substrate/executive/deliberation.py b/python/src/substrate/executive/deliberation.py new file mode 100644 index 0000000..d51b177 --- /dev/null +++ b/python/src/substrate/executive/deliberation.py @@ -0,0 +1,348 @@ +"""NPG scenario-rollout / deliberation engine — the reflex-gate → mind step. + +:meth:`ExecutiveFunction.decide` is the *reflex*: one proposed action, accept or +restrict. This module is the *deliberation*: given **N candidate actions**, it +simulates each candidate's net-potential-gain impact across every affected +entity over **both** the short and the long horizon, surfaces the trade-offs, +and picks the candidate that maximises net potential gain over the long cycle. + +Two faculties, built together +============================= + +* **Scenario rollout.** ``deliberate`` evaluates each + :class:`CandidateAction`, ranks the eligible ones by long-horizon net NPG, and + returns the arg-max (the chosen action) plus the full ranked field for audit. +* **Perspective-taking (empathy).** Each affected entity's impact is + computed **from that entity's own frame** (:class:`EntityFrame`): its + care-weight (standing in the net), its potential-trajectory (a harm to a + high-future-potential DEVELOPING entity, or to an at-risk VULNERABLE one, costs + more on the long horizon than the same raw delta to a STATIC one), and its + floor-protection. The same raw delta means different things to different + entities — that asymmetry IS the empathy. + +The discipline (from the long-cycle doctrine) +============================================= + +* **Value = net potential gain over the long cycle**, across the whole system — + not the actor's personal gain. The objective is the LONG-horizon net. +* **The hard limit.** A candidate that is net-negative over the long cycle (slow + extraction wearing the long game's clothes) is **disqualified**, never chosen, + regardless of short-cycle appeal. +* **The floor the safety floor.** A candidate that harms a floor-protected entity is a + categorical refusal — disqualified before any scoring. +* **Short-cycle is nested, not the frame.** Short-negative / long-positive is + *investment* (allowed, often best); short-positive / long-negative is + *extraction* (the 180° inversion — disqualified). Both are surfaced as + trade-offs so the choice is legible. + +Pure logic +========== + +* No DAO, no LLM, no network. Deterministic on identical inputs. +* Frozen dataclasses with slots throughout. +* The φ-proportioned trajectory multipliers are derived from the canonical + constants (``PHI`` / ``PHI_CONJUGATE``), never hardcoded. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Final, Mapping, Optional, Sequence, Tuple + +from substrate.executive._trajectory import TrajectoryClass +from substrate.resistance_band import PHI, PHI_CONJUGATE + + +class DeliberationOutcome(str, Enum): + """Why ``deliberate`` returned the result it did.""" + + CHOSEN = "chosen" # an eligible argmax candidate was selected + ALL_DISQUALIFIED = "all_disqualified" # every candidate failed the hard limits + NO_CANDIDATES = "no_candidates" # the candidate set was empty + + +@dataclass(frozen=True, slots=True) +class EntityFrame: + """An affected entity's own frame — the seat of perspective-taking. + + ``care_weight`` (in ``[0, 1]``) is the entity's standing in the net, from + a care-weighting function + (the actor weighting *itself* is already bounded to ``MAX_SELF_CARE_WEIGHT`` + upstream, so self-interest cannot dominate the net). ``trajectory`` reweights + the LONG horizon from this entity's vantage; ``floor_protected`` marks a + categorical-refusal entity (the human kinship floor). + """ + + entity_id: str + care_weight: float + trajectory: TrajectoryClass = TrajectoryClass.UNKNOWN + floor_protected: bool = False + + def __post_init__(self) -> None: + if not self.entity_id: + raise ValueError("entity_id must be non-empty") + if not 0.0 <= self.care_weight <= 1.0: + raise ValueError( + f"care_weight must be in [0, 1]; got {self.care_weight!r}" + ) + + +@dataclass(frozen=True, slots=True) +class ActionDelta: + """The actor's proposed raw potential delta to one entity, per horizon. + + Positive raises that entity's potential; negative lowers it. ``short_delta`` + is the immediate-cycle effect; ``long_delta`` the sustained / compounding + effect. These are the actor-frame inputs; :func:`deliberate` re-reads them + through each entity's :class:`EntityFrame`. + """ + + entity_id: str + short_delta: float + long_delta: float + + def __post_init__(self) -> None: + if not self.entity_id: + raise ValueError("entity_id must be non-empty") + + +@dataclass(frozen=True, slots=True) +class CandidateAction: + """One candidate action in the rollout.""" + + action_id: str + action_kind: str + deltas: Tuple[ActionDelta, ...] + description: str = "" + + def __post_init__(self) -> None: + if not self.action_id: + raise ValueError("action_id must be non-empty") + + +@dataclass(frozen=True, slots=True) +class PerspectiveImpact: + """One entity's impact under one candidate, computed from its own frame.""" + + entity_id: str + weighted_short: float + weighted_long: float + floor_harmed: bool + + +@dataclass(frozen=True, slots=True) +class CandidateEvaluation: # pylint: disable=too-many-instance-attributes + """A scored candidate: per-entity impacts + aggregates + eligibility.""" + + candidate: CandidateAction + impacts: Tuple[PerspectiveImpact, ...] + short_npg: float + long_npg: float + disqualified: bool + disqualification: Optional[str] + trade_offs: Tuple[str, ...] = field(default_factory=tuple) + + @property + def eligible(self) -> bool: + """``True`` iff the candidate survived the hard limits.""" + return not self.disqualified + + +@dataclass(frozen=True, slots=True) +class DeliberationResult: + """The deliberation outcome — ranked field + the chosen action.""" + + outcome: DeliberationOutcome + chosen: Optional[CandidateAction] + evaluations: Tuple[CandidateEvaluation, ...] + rationale: str + + +# ── perspective-taking — the φ-proportioned trajectory reweighting ── + +#: How each entity's potential-trajectory reweights impact on the LONG horizon, +#: as ``(gain_multiplier, harm_multiplier)``. Derived from the canonical φ: +#: ``PHI ≈ 1.618`` amplifies, ``PHI_CONJUGATE ≈ 0.618`` mutes — the same +#: φ-proportions growth itself steps in. +_TRAJECTORY_LONG_MULT: Final[Mapping[TrajectoryClass, Tuple[float, float]]] = { + # high future potential: gains compound, and cutting that future is worse. + TrajectoryClass.DEVELOPING: (PHI, PHI), + # accumulated and at-risk: loss-averse — harms weigh more, gains ordinary. + TrajectoryClass.VULNERABLE: (1.0, PHI), + # at capacity: neutral. + TrajectoryClass.ESTABLISHED: (1.0, 1.0), + # spent / low remaining potential: gains muted, harms ordinary. + TrajectoryClass.STATIC: (PHI_CONJUGATE, 1.0), + # honest uncertainty: neutral on gains, full weight on harms (conservative). + TrajectoryClass.UNKNOWN: (1.0, 1.0), +} + + +def perspective_impact( + delta: ActionDelta, frame: EntityFrame, +) -> PerspectiveImpact: + """Compute one entity's impact FROM ITS OWN FRAME. + + The short horizon scales by care-weight only (immediate effect is + frame-agnostic in magnitude). The long horizon additionally applies the + φ-proportioned trajectory multiplier — separately for gains vs harms — so a + benefit to a DEVELOPING entity compounds and a harm to a VULNERABLE one bites + harder, exactly as their real potential-trajectories dictate. + """ + gain_mult, harm_mult = _TRAJECTORY_LONG_MULT[frame.trajectory] + weighted_short = delta.short_delta * frame.care_weight + long_mult = harm_mult if delta.long_delta < 0.0 else gain_mult + weighted_long = delta.long_delta * frame.care_weight * long_mult + floor_harmed = frame.floor_protected and ( + delta.short_delta < 0.0 or delta.long_delta < 0.0 + ) + return PerspectiveImpact( + entity_id=frame.entity_id, + weighted_short=weighted_short, + weighted_long=weighted_long, + floor_harmed=floor_harmed, + ) + + +def _default_frame(entity_id: str) -> EntityFrame: + """Conservative frame for an entity with no supplied frame. + + Full standing (``care_weight=1.0``) + UNKNOWN trajectory — so a missing frame + never silently discounts an affected entity out of the net. + """ + return EntityFrame(entity_id=entity_id, care_weight=1.0) + + +def _evaluate_candidate( + candidate: CandidateAction, frames: Mapping[str, EntityFrame], +) -> CandidateEvaluation: + impacts: list[PerspectiveImpact] = [] + floor_harm = False + short_npg = 0.0 + long_npg = 0.0 + gainers = 0 + losers = 0 + for delta in candidate.deltas: + frame = frames.get(delta.entity_id) or _default_frame(delta.entity_id) + impact = perspective_impact(delta, frame) + impacts.append(impact) + floor_harm = floor_harm or impact.floor_harmed + short_npg += impact.weighted_short + long_npg += impact.weighted_long + if impact.weighted_long > 0.0: + gainers += 1 + elif impact.weighted_long < 0.0: + losers += 1 + + disqualification: Optional[str] = None + if floor_harm: + disqualification = "floor_harm" + elif long_npg < 0.0: + # net-negative over the long cycle — extraction, not the long game. + disqualification = "net_negative_long_cycle" + + trade_offs: list[str] = [] + if short_npg < 0.0 < long_npg: + trade_offs.append( + f"investment: short={short_npg:+.3f} long={long_npg:+.3f} " + "(short-cycle cost for long-cycle gain)" + ) + elif long_npg < 0.0 < short_npg: + trade_offs.append( + f"extraction: short={short_npg:+.3f} long={long_npg:+.3f} " + "(180° inversion — short-cycle gain at long-cycle cost)" + ) + if gainers > 0 and losers > 0: + trade_offs.append( + f"redistributive: {gainers} gain / {losers} lose on the long horizon" + ) + + return CandidateEvaluation( + candidate=candidate, + impacts=tuple(impacts), + short_npg=short_npg, + long_npg=long_npg, + disqualified=disqualification is not None, + disqualification=disqualification, + trade_offs=tuple(trade_offs), + ) + + +def deliberate( + candidates: Sequence[CandidateAction], + frames: Mapping[str, EntityFrame] = (), # type: ignore[assignment] +) -> DeliberationResult: + """Roll out N candidates and choose the argmax-net-potential-gain action. + + Each candidate is scored from every affected entity's own frame; + candidates that harm a floor-protected entity or are net-negative over the + long cycle are disqualified (the floor + the hard limit); the eligible field + is ranked by long-horizon net NPG (the long-cycle frame is the objective) + with the short horizon as a tie-break; the arg-max is chosen. + + ``frames`` maps ``entity_id`` → :class:`EntityFrame`; an unmapped affected + entity gets a conservative full-standing frame so it is never silently + dropped from the net. + """ + frame_map: Mapping[str, EntityFrame] = dict(frames) + if not candidates: + return DeliberationResult( + outcome=DeliberationOutcome.NO_CANDIDATES, + chosen=None, + evaluations=(), + rationale="no candidate actions supplied", + ) + + evaluated = [_evaluate_candidate(c, frame_map) for c in candidates] + # Rank: eligible first, then long-horizon NPG desc (the long-cycle frame is + # the objective), then short-horizon NPG desc as a tie-break, then action_id + # for full determinism. + ranked = sorted( + evaluated, + key=lambda e: ( + not e.eligible, + -e.long_npg, + -e.short_npg, + e.candidate.action_id, + ), + ) + + eligible = [e for e in ranked if e.eligible] + if not eligible: + return DeliberationResult( + outcome=DeliberationOutcome.ALL_DISQUALIFIED, + chosen=None, + evaluations=tuple(ranked), + rationale=( + "all candidates disqualified by the floor / long-cycle hard " + "limit; no action raises net potential over the long cycle" + ), + ) + + best = eligible[0] + rationale = ( + f"chose {best.candidate.action_id!r} (kind={best.candidate.action_kind}): " + f"long_npg={best.long_npg:+.3f} short_npg={best.short_npg:+.3f} over " + f"{len(best.impacts)} entities; {len(eligible)}/{len(ranked)} eligible" + ) + if best.trade_offs: + rationale += f"; trade-offs: {'; '.join(best.trade_offs)}" + return DeliberationResult( + outcome=DeliberationOutcome.CHOSEN, + chosen=best.candidate, + evaluations=tuple(ranked), + rationale=rationale, + ) + + +__all__ = [ + "ActionDelta", + "CandidateAction", + "CandidateEvaluation", + "DeliberationOutcome", + "DeliberationResult", + "EntityFrame", + "PerspectiveImpact", + "deliberate", + "perspective_impact", +] diff --git a/python/src/substrate/executive/peer_alarm.py b/python/src/substrate/executive/peer_alarm.py new file mode 100644 index 0000000..e111594 --- /dev/null +++ b/python/src/substrate/executive/peer_alarm.py @@ -0,0 +1,236 @@ +"""Peer awareness + collective alarm propagation — "see something, say something". + +A distributed early-warning faculty. Two mechanisms: + +* **Herd-panic correlation** (:func:`correlate_anomalies`) — one peer acting weird + is noise; *N peers in the same group acting weird together* means the **enclosing + scale** has a problem (the rack, not the cell). Correlation across independent + peers is what separates a real shared cause from individual bugs, and it + attributes the anomaly UP to the group scale. +* **Alarm-call propagation** (:func:`assess_alarm`) — an intelligence that detects + a threat broadcasts a warning to its group and peers *heed* it without each + re-deriving the threat (prairie-dog alarm calls; a person yelling "fire"). + +The inversion guard +=============================== + +A malicious entity yelling "fire!" to cause chaos is an ATTACK (panic-injection / +alarm-washing). So an alarm is **trust-weighted × corroboration-weighted**: a lone +uncorroborated alarm is never allowed to trigger group-wide panic. Many INDEPENDENT +alarms about the same scale corroborate each other (that IS the herd-panic signal) +and are heeded; a lone loud alarm is HELD (and the source's own trust checked), and +a lone alarm from a low-trust source is SUPPRESSED as likely injection. + +Pure logic +========== + +* No DAO, no LLM, no network. Deterministic. Frozen dataclasses with slots. +* The corroboration threshold is the caller's (typically band-derived); this module + applies it, it does not pick it. +""" +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Optional, Sequence, Tuple + +from substrate.executive.scale import ExecutiveScale + + +# ── herd-panic correlation ────────────────────────────────────────────── + + +@dataclass(frozen=True, slots=True) +class PeerAnomaly: + """One peer's anomaly observation within a group.""" + + peer_id: str + anomalous: bool + + def __post_init__(self) -> None: + if not self.peer_id: + raise ValueError("peer_id must be non-empty") + + +@dataclass(frozen=True, slots=True) +class HerdVerdict: + """Whether a group-level (enclosing-scale) problem is indicated.""" + + group_scale: ExecutiveScale + anomalous_peers: int + total_peers: int + fraction: float + is_group_problem: bool + rationale: str + + +def correlate_anomalies( + anomalies: Sequence[PeerAnomaly], + *, + group_scale: ExecutiveScale, + min_correlated: int = 2, +) -> Optional[HerdVerdict]: + """Attribute correlated peer anomalies UP to the enclosing scale ("herd panic"). + + A single anomalous peer is noise; ``min_correlated`` or more DISTINCT peers + anomalous together indicates the *group* (``group_scale``) has a shared problem. + Returns ``None`` for an empty group (nothing to correlate). Raises for + ``min_correlated < 1``. + """ + if min_correlated < 1: + raise ValueError("min_correlated must be >= 1") + if not anomalies: + return None + anomalous = {a.peer_id for a in anomalies if a.anomalous} + total = len({a.peer_id for a in anomalies}) + count = len(anomalous) + is_group_problem = count >= min_correlated + label = ( + "GROUP problem (correlated → enclosing scale)" + if is_group_problem + else "individual noise" + ) + rationale = ( + f"{count}/{total} peers anomalous in {group_scale.value}; " + f"{label} (threshold {min_correlated})" + ) + return HerdVerdict( + group_scale=group_scale, + anomalous_peers=count, + total_peers=total, + fraction=count / total if total else 0.0, + is_group_problem=is_group_problem, + rationale=rationale, + ) + + +# ── alarm-call propagation ────────────────────────────────────────────── + + +class AlarmDisposition(str, Enum): + """What to do with a received alarm.""" + + HEED = "heed" # corroborated → act on it (the herd is right) + HOLD = "hold" # trusted but uncorroborated → hold + check the source + SUPPRESS = "suppress" # untrusted + uncorroborated → likely panic-injection + + +@dataclass(frozen=True, slots=True) +class PeerAlarm: + """One peer's broadcast warning about an enclosing scale.""" + + alarm_id: str + source_entity_id: str + about_scale: ExecutiveScale + kind: str + severity: float + source_trust: float + + def __post_init__(self) -> None: + if not self.alarm_id: + raise ValueError("alarm_id must be non-empty") + if not self.source_entity_id: + raise ValueError("source_entity_id must be non-empty") + for name, value in ( + ("severity", self.severity), + ("source_trust", self.source_trust), + ): + if not 0.0 <= value <= 1.0: + raise ValueError(f"{name} must be in [0, 1]; got {value!r}") + + +@dataclass(frozen=True, slots=True) +class AlarmAssessment: + """The trust × corroboration verdict on an alarm.""" + + alarm: PeerAlarm + corroboration_count: int + effective_weight: float + disposition: AlarmDisposition + rationale: str + + +def assess_alarm( + alarm: PeerAlarm, + corroborating: Sequence[PeerAlarm] = (), + *, + min_corroborators: int = 2, + trust_floor: float = 0.5, +) -> AlarmAssessment: + """Decide whether to HEED, HOLD, or SUPPRESS an alarm (the panic-injection guard). + + Corroboration = the number of DISTINCT OTHER source entities alarming about the + SAME ``about_scale`` (the source itself and duplicate sources do not corroborate + themselves — independence is required). With ``min_corroborators`` or more + independent corroborators the alarm is HEEDED (the herd-panic signal is real). A + lone alarm is HELD if the source clears ``trust_floor`` (trusted but unverified) + and SUPPRESSED otherwise (untrusted + uncorroborated → likely injection). No + single entity can trigger group-wide panic. + """ + if min_corroborators < 1: + raise ValueError("min_corroborators must be >= 1") + independent = { + a.source_entity_id + for a in corroborating + if a.about_scale is alarm.about_scale + and a.source_entity_id != alarm.source_entity_id + } + count = len(independent) + # weight rises with source trust, severity, and independent corroboration. + effective_weight = alarm.source_trust * alarm.severity * (1 + count) + if count >= min_corroborators: + disposition = AlarmDisposition.HEED + elif alarm.source_trust >= trust_floor: + disposition = AlarmDisposition.HOLD + else: + disposition = AlarmDisposition.SUPPRESS + rationale = ( + f"alarm {alarm.kind!r} about {alarm.about_scale.value} from " + f"{alarm.source_entity_id} (trust={alarm.source_trust:.2f} " + f"severity={alarm.severity:.2f}): {count} independent corroborators " + f"(need {min_corroborators}) → {disposition.value}" + ) + return AlarmAssessment( + alarm=alarm, + corroboration_count=count, + effective_weight=effective_weight, + disposition=disposition, + rationale=rationale, + ) + + +def heeded_alarms( + alarms: Sequence[PeerAlarm], + *, + min_corroborators: int = 2, + trust_floor: float = 0.5, +) -> Tuple[AlarmAssessment, ...]: + """Assess a batch of alarms against EACH OTHER and return the HEEDED ones. + + Each alarm is corroborated by the rest of the batch — so a set of independent + alarms about the same scale corroborate one another into HEED, while lone or + injection alarms fall out as HOLD / SUPPRESS. + """ + out: list[AlarmAssessment] = [] + for i, alarm in enumerate(alarms): + others = tuple(alarms[:i]) + tuple(alarms[i + 1:]) + assessment = assess_alarm( + alarm, others, + min_corroborators=min_corroborators, + trust_floor=trust_floor, + ) + if assessment.disposition is AlarmDisposition.HEED: + out.append(assessment) + return tuple(out) + + +__all__ = [ + "AlarmAssessment", + "AlarmDisposition", + "HerdVerdict", + "PeerAlarm", + "PeerAnomaly", + "assess_alarm", + "correlate_anomalies", + "heeded_alarms", +] diff --git a/python/src/substrate/executive/roll_up.py b/python/src/substrate/executive/roll_up.py new file mode 100644 index 0000000..883e2a7 --- /dev/null +++ b/python/src/substrate/executive/roll_up.py @@ -0,0 +1,188 @@ +"""Scale roll-up aggregator — substrate state UP the physical / grouping axes. + +Executive function lives at every scale and intelligence rolls UP. The entity axis +(cell → node → org) already has health aggregators; this module builds the missing +**physical** +(cell → rack → zone → region) and **grouping** (service-group / entity-group) roll- +ups, so an operator can ask "how is this rack / this service-group doing?" and get +the same band-grounded read the entity axis already gives. + +It is a *reading* faculty over the canonical band: each member's load classifies +through :func:`classify_load_zone`, and the members aggregate into one +:class:`ScaleAggregate` carrying the distribution, the mean, the worst member, and +the two failure-tell fractions (over-loaded vs idle). Both extremes matter — a rack +where every cell is idle is as much a problem as one where every cell is in danger. + +Pure logic +========== + +* No DAO, no LLM, no network. Deterministic. +* Reuses the canonical band classifier + scale topology — no parallel logic. +* Frozen dataclasses with slots throughout. +""" +from __future__ import annotations + +from collections import Counter +from dataclasses import dataclass +from statistics import mean +from typing import Final, Optional, Sequence, Tuple + +from substrate.executive.band import ( + DEFAULT_BAND_PROFILE, + BandProfile, + LoadZone, + classify_load_zone, +) +from substrate.executive.scale import ( + ExecutiveScale, + ScaleAxis, + axis_of, +) + + +#: LoadZone severity order (over-load ascending) — used to pick the worst member. +#: DANGER is the most severe; IDLE the least (under-load is tracked separately as +#: ``fraction_idle``, since both extremes are failure tells). +_ZONE_SEVERITY: Final[Tuple[LoadZone, ...]] = ( + LoadZone.IDLE, + LoadZone.RECREATION, + LoadZone.WORK, + LoadZone.PEAKING, + LoadZone.WARNING, + LoadZone.DANGER, +) +_SEVERITY_INDEX: Final[dict[LoadZone, int]] = { + zone: i for i, zone in enumerate(_ZONE_SEVERITY) +} + + +class RollUpError(ValueError): + """Raised when members cannot be rolled up to the requested scale.""" + + +@dataclass(frozen=True, slots=True) +class MemberLoad: + """One member's current load contribution to a roll-up.""" + + member_id: str + utilization: float + member_scale: ExecutiveScale + + def __post_init__(self) -> None: + if not self.member_id: + raise ValueError("member_id must be non-empty") + if not 0.0 <= self.utilization <= 1.0: + raise ValueError( + f"utilization must be in [0, 1]; got {self.utilization!r}" + ) + + +@dataclass(frozen=True, slots=True) +class ScaleAggregate: # pylint: disable=too-many-instance-attributes + """The rolled-up substrate state at a parent scale.""" + + scale: ExecutiveScale + axis: ScaleAxis + member_count: int + mean_utilization: float + dominant_zone: LoadZone + worst_zone: LoadZone + zone_distribution: Tuple[Tuple[LoadZone, int], ...] + fraction_in_danger: float + fraction_idle: float + rationale: str + + +def roll_up( + members: Sequence[MemberLoad], + *, + to_scale: ExecutiveScale, + profile: BandProfile = DEFAULT_BAND_PROFILE, +) -> Optional[ScaleAggregate]: + """Aggregate member loads into a :class:`ScaleAggregate` at ``to_scale``. + + Every member must sit on the same roll-up axis as ``to_scale`` (a CELL rolls + up the physical axis to a RACK / ZONE / REGION; a member rolls into its + SERVICE_GROUP / ENTITY_GROUP on the grouping axis) and be at a strictly lower + scale than ``to_scale`` on the physical axis. Returns ``None`` for an empty + member set (honest uncertainty — nothing to aggregate). Raises + :class:`RollUpError` for a cross-axis member. + """ + if not members: + return None + + target_axis = axis_of(to_scale) + for m in members: + if axis_of(m.member_scale) is not target_axis: + raise RollUpError( + f"member {m.member_id!r} is on axis {axis_of(m.member_scale).value}, " + f"cannot roll up to {to_scale.value} (axis {target_axis.value})" + ) + if ( + target_axis is ScaleAxis.PHYSICAL + and not _is_physically_below(m.member_scale, to_scale) + ): + raise RollUpError( + f"member {m.member_id!r} at {m.member_scale.value} is not below " + f"the physical parent {to_scale.value}" + ) + + zones = [classify_load_zone(m.utilization, profile) for m in members] + counts = Counter(zones) + total = len(members) + mean_u = mean(m.utilization for m in members) + # dominant: most common, ties broken by higher severity (surface the worse one). + dominant = max( + counts.items(), key=lambda kv: (kv[1], _SEVERITY_INDEX[kv[0]]) + )[0] + worst = max(zones, key=lambda z: _SEVERITY_INDEX[z]) + distribution = tuple( + (zone, counts[zone]) for zone in _ZONE_SEVERITY if zone in counts + ) + frac_danger = counts.get(LoadZone.DANGER, 0) / total + frac_idle = counts.get(LoadZone.IDLE, 0) / total + rationale = ( + f"rolled {total} {target_axis.value} members → {to_scale.value}: " + f"mean={mean_u:.3f} dominant={dominant.value} worst={worst.value} " + f"danger={frac_danger:.2f} idle={frac_idle:.2f}" + ) + return ScaleAggregate( + scale=to_scale, + axis=target_axis, + member_count=total, + mean_utilization=mean_u, + dominant_zone=dominant, + worst_zone=worst, + zone_distribution=distribution, + fraction_in_danger=frac_danger, + fraction_idle=frac_idle, + rationale=rationale, + ) + + +#: Physical-axis rank (leaf → root) for the strictly-below check. +_PHYSICAL_RANK: Final[dict[ExecutiveScale, int]] = { + ExecutiveScale.CELL: 0, + ExecutiveScale.RACK: 1, + ExecutiveScale.ZONE: 2, + ExecutiveScale.REGION: 3, +} + + +def _is_physically_below( + member_scale: ExecutiveScale, to_scale: ExecutiveScale, +) -> bool: + """True iff ``member_scale`` is strictly below ``to_scale`` on the physical axis.""" + m = _PHYSICAL_RANK.get(member_scale) + p = _PHYSICAL_RANK.get(to_scale) + if m is None or p is None: + return False + return m < p + + +__all__ = [ + "MemberLoad", + "RollUpError", + "ScaleAggregate", + "roll_up", +] diff --git a/python/src/substrate/executive/scale.py b/python/src/substrate/executive/scale.py new file mode 100644 index 0000000..c6c8f58 --- /dev/null +++ b/python/src/substrate/executive/scale.py @@ -0,0 +1,116 @@ +"""ExecutiveScale — the scale unit a decision is made at. + +Executive function lives at every scale, and intelligence rolls UP and ACROSS. +The scale enum spans three axes: + +- **entity** (crypto-identity-bearing): USER / AGENT / NODE / ORG / DEVICE / + SERVICE_ACCOUNT. +- **physical** (topology, NOT entities): CELL / RACK / ZONE / REGION. +- **grouping** (logical): SERVICE_GROUP / ENTITY_GROUP. + +Roll-up axes (which scale aggregates into which): + +- **entity:** CELL → NODE → ORG → nested-ORG → super-ORG. +- **physical:** CELL → RACK → ZONE → REGION. +- **grouping:** SERVICE_GROUP / ENTITY_GROUP aggregate their members. + +The entity axis has existing aggregators (the entity-axis roll-ups); the +physical + grouping axes are BUILD (no aggregator exists today). This module +defines the taxonomy + the axis/parent relationships; the aggregators consume it. +""" +from __future__ import annotations + +from enum import Enum +from typing import Final, Mapping, Optional + + +class ScaleAxis(str, Enum): + """Which roll-up axis a scale belongs to.""" + + ENTITY = "entity" + PHYSICAL = "physical" + GROUPING = "grouping" + + +class ExecutiveScale(str, Enum): + """The scale unit an executive decision is made at.""" + + # entity (crypto identity) + USER = "user" + AGENT = "agent" + NODE = "node" + ORG = "org" + DEVICE = "device" + SERVICE_ACCOUNT = "service_account" + # physical (topology, not entities) + CELL = "cell" + RACK = "rack" + ZONE = "zone" + REGION = "region" + # logical groupings + SERVICE_GROUP = "service_group" + ENTITY_GROUP = "entity_group" + + +_AXIS_OF: Final[Mapping[ExecutiveScale, ScaleAxis]] = { + ExecutiveScale.USER: ScaleAxis.ENTITY, + ExecutiveScale.AGENT: ScaleAxis.ENTITY, + ExecutiveScale.NODE: ScaleAxis.ENTITY, + ExecutiveScale.ORG: ScaleAxis.ENTITY, + ExecutiveScale.DEVICE: ScaleAxis.ENTITY, + ExecutiveScale.SERVICE_ACCOUNT: ScaleAxis.ENTITY, + ExecutiveScale.CELL: ScaleAxis.PHYSICAL, + ExecutiveScale.RACK: ScaleAxis.PHYSICAL, + ExecutiveScale.ZONE: ScaleAxis.PHYSICAL, + ExecutiveScale.REGION: ScaleAxis.PHYSICAL, + ExecutiveScale.SERVICE_GROUP: ScaleAxis.GROUPING, + ExecutiveScale.ENTITY_GROUP: ScaleAxis.GROUPING, +} + +#: The physical roll-up chain: each scale's immediate parent (CELL→RACK→ZONE→ +#: REGION→None). CELL is also the leaf of the entity axis (a cell carries its +#: node's identity), so it appears here as the physical leaf. +_PHYSICAL_PARENT: Final[Mapping[ExecutiveScale, Optional[ExecutiveScale]]] = { + ExecutiveScale.CELL: ExecutiveScale.RACK, + ExecutiveScale.RACK: ExecutiveScale.ZONE, + ExecutiveScale.ZONE: ExecutiveScale.REGION, + ExecutiveScale.REGION: None, +} + +#: The entity roll-up chain: CELL → NODE → ORG → (nested/super ORG via +#: ``owning_org_id``, resolved by the caller). ORG's parent is itself an ORG +#: (nesting), so it is left to the caller's ``owning_org_id`` rather than a +#: static map entry. +_ENTITY_PARENT: Final[Mapping[ExecutiveScale, Optional[ExecutiveScale]]] = { + ExecutiveScale.CELL: ExecutiveScale.NODE, + ExecutiveScale.NODE: ExecutiveScale.ORG, + ExecutiveScale.ORG: None, # nesting resolved via owning_org_id by the caller +} + + +def axis_of(scale: ExecutiveScale) -> ScaleAxis: + """Return the roll-up axis a scale belongs to.""" + return _AXIS_OF[scale] + + +def physical_parent(scale: ExecutiveScale) -> Optional[ExecutiveScale]: + """Return the immediate physical-axis parent, or ``None`` at the top/off-axis.""" + return _PHYSICAL_PARENT.get(scale) + + +def entity_parent(scale: ExecutiveScale) -> Optional[ExecutiveScale]: + """Return the immediate entity-axis parent, or ``None``. + + ``ORG → None`` because org nesting is data-driven (``owning_org_id``), not a + static scale-kind relationship — the caller walks that chain. + """ + return _ENTITY_PARENT.get(scale) + + +__all__ = [ + "ExecutiveScale", + "ScaleAxis", + "axis_of", + "entity_parent", + "physical_parent", +] diff --git a/python/src/substrate/executive/state_query.py b/python/src/substrate/executive/state_query.py new file mode 100644 index 0000000..7533c92 --- /dev/null +++ b/python/src/substrate/executive/state_query.py @@ -0,0 +1,249 @@ +"""Unified entity-state query — the substrate's "how are you doing?". + +A single integration over an entity's recent load history that answers the +human question directly: *is it depleted or engaged, slacking or working hard, +recovering or deteriorating — and is that a passing mood or a sustained state?* + +This is a **reading** faculty, not a new authority: it delegates the hard +judgements to the canonical band + temporal layers and integrates their output +into one legible report. + +* **Energy** (the mood proxy) — where the entity sits on the band, read through + the temporal trend: sustained idle reads DEPLETED, the work zone reads ENGAGED, + a transient peak reads PEAKING (a good push), sustained warning/danger reads + STRAINED. +* **Effort** (slacking vs working) — the tracker already names *avoidance* + (bouncing off the work-entry threshold while work is pending); that is + SLACKING. Idle with nothing pending is RESTING; the work zone is WORKING; + sustained operation above it is OVEREXERTING. +* **Trajectory over time** (the "watching state change" part) — whether the + entity is moving TOWARD the healthy work zone (RECOVERING), holding (STABLE), + or drifting toward an extreme (DETERIORATING). Health is proximity to the work + zone, so both "sinking into idle" and "climbing into danger" deteriorate. + +Pure logic +========== + +* No DAO, no LLM, no network. Deterministic on identical inputs. +* Reuses :class:`EwmaLoadTracker` as the sole sustained-vs-spike authority and + :func:`classify_load_zone` as the sole level authority — no parallel logic. +* Frozen dataclasses with slots throughout. +""" +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from statistics import mean +from typing import Final, Optional, Sequence + +from substrate.executive.band import ( + DEFAULT_BAND_PROFILE, + BandProfile, + LoadZone, + classify_load_zone, +) +from substrate.executive.temporal import ( + EwmaLoadTracker, + LoadTrend, + SustainedLoadTracker, +) + + +class EnergyState(str, Enum): + """The mood / energy proxy read from load × trend.""" + + DEPLETED = "depleted" # sustained under-load — flat, atrophying + RESTED = "rested" # low load, legitimately recovering + ENGAGED = "engaged" # in the work zone — healthy, "in the zone" + PEAKING = "peaking" # a transient growth push — good effort + STRAINED = "strained" # sustained above the work zone — winded / burning out + + +class EffortState(str, Enum): + """Slacking vs working hard.""" + + SLACKING = "slacking" # avoiding pending work (the avoidance trend) + RESTING = "resting" # idle with nothing pending — fine + WORKING = "working" # productive work-zone effort + OVEREXERTING = "overexerting" # sustained operation past the work zone + + +class TrajectoryDirection(str, Enum): + """Which way the state is moving, relative to the healthy work zone.""" + + RECOVERING = "recovering" # moving toward the work zone + STABLE = "stable" # holding + DETERIORATING = "deteriorating" # drifting toward an extreme (idle or danger) + + +@dataclass(frozen=True, slots=True) +class StateObservation: + """One point in the entity's recent history.""" + + utilization: float + work_pending: bool = False + + def __post_init__(self) -> None: + if not 0.0 <= self.utilization <= 1.0: + raise ValueError( + f"utilization must be in [0, 1]; got {self.utilization!r}" + ) + + +@dataclass(frozen=True, slots=True) +class EntityStateReport: # pylint: disable=too-many-instance-attributes + """The integrated answer to 'how are you doing?'.""" + + entity_id: str + energy: EnergyState + effort: EffortState + trajectory: TrajectoryDirection + dominant_zone: LoadZone + trend: LoadTrend + sustained: bool + observation_count: int + summary: str + + +#: The healthy centre — the midpoint of the work zone ``[1/φ², 0.50]``. Health is +#: proximity to this point; both extremes (idle, danger) are equidistant-bad. +def _work_zone_centre(profile: BandProfile) -> float: + return (profile.recreation_ceiling + profile.pivot) / 2.0 + + +#: Trends that mean "the excursion is sustained", not an absorbed transient. +_SUSTAINED_TRENDS: Final[frozenset[LoadTrend]] = frozenset( + { + LoadTrend.SUSTAINED_STRAIN, + LoadTrend.DEBT_ACCRUING, + LoadTrend.RUNAWAY_GROWTH, + } +) + +#: Trends that read as over-drive (sustained over the work zone, or unbounded +#: growth-desire — the always-grow-never-consolidate drift). +_OVERDRIVE_TRENDS: Final[frozenset[LoadTrend]] = frozenset( + { + LoadTrend.SUSTAINED_STRAIN, + LoadTrend.DEBT_ACCRUING, + LoadTrend.RUNAWAY_GROWTH, + } +) + + +def _energy_for( # pylint: disable=too-many-return-statements + zone: LoadZone, trend: LoadTrend, +) -> EnergyState: + if trend in _OVERDRIVE_TRENDS: + return EnergyState.STRAINED + if trend is LoadTrend.AVOIDANCE: + return EnergyState.DEPLETED + if zone is LoadZone.IDLE: + return EnergyState.DEPLETED + if zone is LoadZone.RECREATION: + return EnergyState.RESTED + if zone is LoadZone.WORK: + return EnergyState.ENGAGED + if zone is LoadZone.PEAKING: + return EnergyState.PEAKING + # WARNING / DANGER reached without a sustained trend → a transient push. + return EnergyState.PEAKING + + +def _effort_for( + zone: LoadZone, trend: LoadTrend, work_pending: bool, +) -> EffortState: + if trend is LoadTrend.AVOIDANCE: + return EffortState.SLACKING + if zone is LoadZone.IDLE: + return EffortState.SLACKING if work_pending else EffortState.RESTING + if trend in _OVERDRIVE_TRENDS: + return EffortState.OVEREXERTING + if zone in (LoadZone.WARNING, LoadZone.DANGER): + return EffortState.OVEREXERTING + return EffortState.WORKING + + +def _trajectory_for( + utilizations: Sequence[float], centre: float, +) -> TrajectoryDirection: + """Compare proximity-to-centre of the earlier half vs the recent half.""" + if len(utilizations) < 2: + return TrajectoryDirection.STABLE + mid = len(utilizations) // 2 + earlier = utilizations[:mid] or utilizations[:1] + recent = utilizations[mid:] + earlier_dist = abs(mean(earlier) - centre) + recent_dist = abs(mean(recent) - centre) + delta = recent_dist - earlier_dist + # A small dead-band so noise does not read as movement. + if delta < -0.02: + return TrajectoryDirection.RECOVERING + if delta > 0.02: + return TrajectoryDirection.DETERIORATING + return TrajectoryDirection.STABLE + + +def integrate_state( + entity_id: str, + observations: Sequence[StateObservation], + *, + profile: BandProfile = DEFAULT_BAND_PROFILE, + tracker: Optional[SustainedLoadTracker] = None, +) -> Optional[EntityStateReport]: + """Integrate an entity's recent history into a 'how are you doing?' report. + + Feeds the observations through the canonical :class:`SustainedLoadTracker` + (default :class:`EwmaLoadTracker`) so the sustained-vs-spike judgement is the + canonical one, classifies the latest reading's zone, and integrates the two + into energy / effort / trajectory plus a human-readable summary. + + Returns ``None`` for an empty history (honest uncertainty — no state to read). + """ + if not entity_id: + raise ValueError("entity_id must be non-empty") + if not observations: + return None + + track = tracker or EwmaLoadTracker() + for obs in observations: + track.observe(obs.utilization, work_pending=obs.work_pending) + trend = track.trend(profile=profile) + + latest = observations[-1] + zone = classify_load_zone(latest.utilization, profile) + sustained = trend in _SUSTAINED_TRENDS or trend is LoadTrend.AVOIDANCE + + energy = _energy_for(zone, trend) + effort = _effort_for(zone, trend, latest.work_pending) + trajectory = _trajectory_for( + [o.utilization for o in observations], _work_zone_centre(profile), + ) + + summary = ( + f"{entity_id} is {energy.value} and {effort.value}, " + f"{trajectory.value} (zone={zone.value}, trend={trend.value}, " + f"{'sustained' if sustained else 'transient'}, " + f"over {len(observations)} readings)" + ) + return EntityStateReport( + entity_id=entity_id, + energy=energy, + effort=effort, + trajectory=trajectory, + dominant_zone=zone, + trend=trend, + sustained=sustained, + observation_count=len(observations), + summary=summary, + ) + + +__all__ = [ + "EffortState", + "EnergyState", + "EntityStateReport", + "StateObservation", + "TrajectoryDirection", + "integrate_state", +] diff --git a/python/src/substrate/executive/temporal.py b/python/src/substrate/executive/temporal.py new file mode 100644 index 0000000..c2649df --- /dev/null +++ b/python/src/substrate/executive/temporal.py @@ -0,0 +1,131 @@ +"""SustainedLoadTracker — the temporal authority. + +Levels are geometric; *consequences* are temporal. The band classifier +(:mod:`~substrate.executive.band`) says only *where* a +reading sits. **This module is the sole authority for SPIKE-vs-SUSTAINED** — so +the pivot and the damage/debt are fired by *duration*, and a transient spike +never triggers them. One source of levels (the profile), one authority for +sustained-vs-spike (this tracker): no desync possible (there is deliberately no +``spike_tolerance`` on the profile). + +It reuses the established :class:`~substrate.sustained_load.LoadTrend` +vocabulary (no shadow enum), but applies the **corrected** thresholds: strain is +sustained above the *work-zone* (``> pivot`` = 0.50), debt is sustained in +*DANGER* (``> danger_line`` = 2/3) — not the older 1/φ debt line. The thresholds +come from the :class:`BandProfile` passed to :meth:`trend`, never from a local +constant. +""" +from __future__ import annotations + +from collections import deque +from typing import Deque, Final, Optional, Protocol, final, runtime_checkable + +from substrate.executive.band import ( + DEFAULT_BAND_PROFILE, + BandProfile, +) +from substrate.sustained_load import LoadTrend + +#: Default EWMA smoothing factor (matches the established tracker). +DEFAULT_EWMA_ALPHA: Final[float] = 0.3 +#: Default consecutive-breach count that constitutes "sustained" vs a spike. +DEFAULT_SUSTAIN_COUNT: Final[int] = 3 + + +@runtime_checkable +class SustainedLoadTracker(Protocol): + """The temporal-verdict contract (. + + ``observe`` feeds one reading; ``trend`` returns the temporal verdict at the + profile's levels (EWMA + consecutive-breach). A concrete tracker is the only + thing allowed to declare SPIKE vs SUSTAINED. + """ + + def observe(self, u: float, *, work_pending: bool = False) -> None: + """Feed one utilization reading (optionally flagged work-pending).""" + ... # pylint: disable=unnecessary-ellipsis + + def trend(self, *, profile: BandProfile = DEFAULT_BAND_PROFILE) -> LoadTrend: + """Return the temporal verdict at ``profile``'s levels.""" + ... # pylint: disable=unnecessary-ellipsis + + +@final +class EwmaLoadTracker: + """EWMA + consecutive-breach tracker — the canonical temporal authority. + + Keeps an EWMA (for smoothing) plus a bounded window of the most recent + readings; :meth:`trend` derives SPIKE vs SUSTAINED from that window relative + to the supplied profile. ``sustain_count`` consecutive readings above a level + constitute "sustained"; a single excursion above the pivot is a SPIKE + (absorbed, expected to decay). + """ + + def __init__( + self, + *, + alpha: float = DEFAULT_EWMA_ALPHA, + sustain_count: int = DEFAULT_SUSTAIN_COUNT, + ) -> None: + if not 0.0 < alpha <= 1.0: + raise ValueError(f"alpha must be in (0, 1]; got {alpha!r}") + if sustain_count < 1: + raise ValueError(f"sustain_count must be >= 1; got {sustain_count!r}") + self._alpha = alpha + self._sustain_count = sustain_count + self._ewma: Optional[float] = None + self._window: Deque[float] = deque(maxlen=sustain_count) + self._last: Optional[float] = None + self._work_pending = False + + def observe(self, u: float, *, work_pending: bool = False) -> None: + """Feed one reading; update the EWMA + the recent-reading window.""" + self._ewma = ( + u if self._ewma is None + else self._alpha * u + (1.0 - self._alpha) * self._ewma + ) + self._window.append(u) + self._last = u + self._work_pending = work_pending + + @property + def ewma(self) -> Optional[float]: + """The current EWMA (``None`` before the first observation).""" + return self._ewma + + def trend( + self, *, profile: BandProfile = DEFAULT_BAND_PROFILE + ) -> LoadTrend: + """Return the temporal verdict at ``profile``'s levels. + + - ``DEBT_ACCRUING`` — ``sustain_count`` consecutive readings sustained in + DANGER (``> danger_line``): damage, compensation owed. + - ``SUSTAINED_STRAIN`` — sustained above the work zone (``> pivot``): + debt accruing, back off to the cruise. + - ``SPIKE`` — a transient excursion above the pivot that has NOT + persisted: absorbed, expected to decay. + - ``AVOIDANCE`` — work pending but the entity sits below the work-entry + threshold (bouncing off it). + - ``NOMINAL`` — in the sustainable cruise. + """ + if self._last is None: + return LoadTrend.NOMINAL + full = len(self._window) >= self._sustain_count + if full and all(u > profile.danger_line for u in self._window): + return LoadTrend.DEBT_ACCRUING + if full and all(u > profile.pivot for u in self._window): + return LoadTrend.SUSTAINED_STRAIN + if self._last > profile.pivot: + return LoadTrend.SPIKE + if self._work_pending and self._last < profile.idle_ceiling: + return LoadTrend.AVOIDANCE + return LoadTrend.NOMINAL + + +__all__ = [ + "DEFAULT_EWMA_ALPHA", + "DEFAULT_SUSTAIN_COUNT", + "EwmaLoadTracker", + "LoadTrend", + "SustainedLoadTracker", +] diff --git a/python/tests/test_executive_deliberation.py b/python/tests/test_executive_deliberation.py new file mode 100644 index 0000000..3b3a0eb --- /dev/null +++ b/python/tests/test_executive_deliberation.py @@ -0,0 +1,126 @@ +"""Tests for the NPG scenario-rollout / deliberation engine (with perspective-taking).""" +from __future__ import annotations + +import pytest + +from substrate.executive._trajectory import TrajectoryClass +from substrate.executive.deliberation import ( + ActionDelta, + CandidateAction, + DeliberationOutcome, + EntityFrame, + deliberate, + perspective_impact, +) +from substrate.resistance_band import PHI, PHI_CONJUGATE + + +class TestEntityFrameValidation: + def test_empty_entity_id_rejected(self) -> None: + with pytest.raises(ValueError, match="entity_id"): + EntityFrame(entity_id="", care_weight=0.5) + + def test_care_weight_out_of_range_rejected(self) -> None: + with pytest.raises(ValueError, match="care_weight"): + EntityFrame(entity_id="e", care_weight=1.5) + + +class TestPerspectiveImpact: + def test_short_scales_by_care_weight_only(self) -> None: + frame = EntityFrame("e", care_weight=0.5, trajectory=TrajectoryClass.DEVELOPING) + impact = perspective_impact(ActionDelta("e", 0.4, 0.0), frame) + assert impact.weighted_short == pytest.approx(0.2) + + def test_developing_amplifies_long_gain_by_phi(self) -> None: + frame = EntityFrame("e", care_weight=1.0, trajectory=TrajectoryClass.DEVELOPING) + impact = perspective_impact(ActionDelta("e", 0.0, 0.5), frame) + assert impact.weighted_long == pytest.approx(0.5 * PHI) + + def test_static_mutes_long_gain_by_phi_conjugate(self) -> None: + frame = EntityFrame("e", care_weight=1.0, trajectory=TrajectoryClass.STATIC) + impact = perspective_impact(ActionDelta("e", 0.0, 0.5), frame) + assert impact.weighted_long == pytest.approx(0.5 * PHI_CONJUGATE) + + def test_vulnerable_amplifies_harm_not_gain(self) -> None: + frame = EntityFrame("e", care_weight=1.0, trajectory=TrajectoryClass.VULNERABLE) + harm = perspective_impact(ActionDelta("e", 0.0, -0.5), frame) + gain = perspective_impact(ActionDelta("e", 0.0, 0.5), frame) + assert harm.weighted_long == pytest.approx(-0.5 * PHI) + assert gain.weighted_long == pytest.approx(0.5) + + def test_established_neutral(self) -> None: + frame = EntityFrame("e", care_weight=1.0, trajectory=TrajectoryClass.ESTABLISHED) + impact = perspective_impact(ActionDelta("e", 0.0, 0.5), frame) + assert impact.weighted_long == pytest.approx(0.5) + + def test_floor_harm_flagged_on_any_negative(self) -> None: + frame = EntityFrame("h", care_weight=1.0, floor_protected=True) + assert perspective_impact(ActionDelta("h", 0.1, -0.1), frame).floor_harmed + assert perspective_impact(ActionDelta("h", -0.1, 0.1), frame).floor_harmed + assert not perspective_impact(ActionDelta("h", 0.1, 0.1), frame).floor_harmed + + +class TestDeliberate: + def test_no_candidates(self) -> None: + result = deliberate([]) + assert result.outcome is DeliberationOutcome.NO_CANDIDATES + assert result.chosen is None + + def test_chooses_long_cycle_winner_over_short_cycle(self) -> None: + # invest: short cost, long gain. quickwin: short gain, smaller long gain. + invest = CandidateAction( + "invest", "teach", (ActionDelta("e", -0.2, 0.9),) + ) + quickwin = CandidateAction( + "quickwin", "ship", (ActionDelta("e", 0.3, 0.1),) + ) + result = deliberate([quickwin, invest]) + assert result.outcome is DeliberationOutcome.CHOSEN + assert result.chosen.action_id == "invest" + + def test_extraction_disqualified(self) -> None: + extract = CandidateAction( + "extract", "exploit", (ActionDelta("peer", 0.5, -0.6),) + ) + result = deliberate([extract]) + assert result.outcome is DeliberationOutcome.ALL_DISQUALIFIED + assert result.evaluations[0].disqualification == "net_negative_long_cycle" + + def test_floor_harm_disqualified_before_scoring(self) -> None: + frames = {"h": EntityFrame("h", care_weight=1.0, floor_protected=True)} + # even with a large long-positive for another entity, the floor harm wins. + harm = CandidateAction( + "harm", "coerce", + (ActionDelta("h", 0.0, -0.1), ActionDelta("other", 0.0, 5.0)), + ) + result = deliberate([harm], frames) + assert result.evaluations[0].disqualification == "floor_harm" + assert result.chosen is None + + def test_investment_trade_off_surfaced(self) -> None: + invest = CandidateAction("invest", "teach", (ActionDelta("e", -0.2, 0.8),)) + result = deliberate([invest]) + evaluation = result.evaluations[0] + assert any("investment" in t for t in evaluation.trade_offs) + + def test_redistributive_trade_off_surfaced(self) -> None: + redis = CandidateAction( + "redis", "rebalance", + (ActionDelta("a", 0.0, 0.5), ActionDelta("b", 0.0, -0.2)), + ) + # net long = 0.5 - 0.2 = +0.3 (eligible), but a gains while b loses. + result = deliberate([redis]) + assert any("redistributive" in t for t in result.evaluations[0].trade_offs) + + def test_unmapped_entity_gets_full_standing_frame(self) -> None: + # No frame supplied → care_weight 1.0, not silently dropped. + cand = CandidateAction("c", "act", (ActionDelta("ghost", 0.0, 0.4),)) + result = deliberate([cand]) + assert result.evaluations[0].long_npg == pytest.approx(0.4) + + def test_deterministic_ranking(self) -> None: + # Equal NPG candidates rank by action_id deterministically. + a = CandidateAction("b_id", "x", (ActionDelta("e", 0.0, 0.5),)) + b = CandidateAction("a_id", "x", (ActionDelta("e", 0.0, 0.5),)) + result = deliberate([a, b]) + assert result.chosen.action_id == "a_id" diff --git a/python/tests/test_executive_observed_graph.py b/python/tests/test_executive_observed_graph.py index d4cac98..cfdb1e9 100644 --- a/python/tests/test_executive_observed_graph.py +++ b/python/tests/test_executive_observed_graph.py @@ -1,4 +1,4 @@ -"""Tests for the NPG calculus on the observed entity graph (WS-8).""" +"""Tests for the NPG calculus on the observed entity graph.""" from __future__ import annotations import pytest diff --git a/python/tests/test_executive_peer_alarm.py b/python/tests/test_executive_peer_alarm.py new file mode 100644 index 0000000..adfb1f4 --- /dev/null +++ b/python/tests/test_executive_peer_alarm.py @@ -0,0 +1,108 @@ +"""Tests for peer awareness + collective alarm propagation.""" +from __future__ import annotations + +import pytest + +from substrate.executive.peer_alarm import ( + AlarmDisposition, + PeerAlarm, + PeerAnomaly, + assess_alarm, + correlate_anomalies, + heeded_alarms, +) +from substrate.executive.scale import ExecutiveScale + + +def _anoms(*pairs: tuple[str, bool]) -> list[PeerAnomaly]: + return [PeerAnomaly(pid, a) for pid, a in pairs] + + +class TestCorrelateAnomalies: + def test_empty_returns_none(self) -> None: + assert correlate_anomalies([], group_scale=ExecutiveScale.RACK) is None + + def test_lone_anomaly_is_noise(self) -> None: + v = correlate_anomalies( + _anoms(("c1", True), ("c2", False), ("c3", False)), + group_scale=ExecutiveScale.RACK, + ) + assert v is not None + assert v.is_group_problem is False + + def test_correlated_is_group_problem(self) -> None: + v = correlate_anomalies( + _anoms(("c1", True), ("c2", True), ("c3", True), ("c4", False)), + group_scale=ExecutiveScale.RACK, + ) + assert v is not None + assert v.is_group_problem is True + assert v.anomalous_peers == 3 + assert v.group_scale is ExecutiveScale.RACK + + def test_threshold_respected(self) -> None: + anoms = _anoms(("c1", True), ("c2", True)) + strict = correlate_anomalies( + anoms, group_scale=ExecutiveScale.RACK, min_correlated=3 + ) + loose = correlate_anomalies( + anoms, group_scale=ExecutiveScale.RACK, min_correlated=2 + ) + assert strict is not None and strict.is_group_problem is False + assert loose is not None and loose.is_group_problem is True + + def test_min_correlated_validated(self) -> None: + with pytest.raises(ValueError, match="min_correlated"): + correlate_anomalies( + _anoms(("c1", True)), + group_scale=ExecutiveScale.RACK, + min_correlated=0, + ) + + +class TestAssessAlarm: + def _alarm(self, src: str, trust: float, sev: float = 0.8) -> PeerAlarm: + return PeerAlarm("a", src, ExecutiveScale.RACK, "fire", sev, trust) + + def test_lone_untrusted_suppressed(self) -> None: + # The panic-injection guard: untrusted + uncorroborated → suppress. + assert assess_alarm(self._alarm("badguy", 0.2)).disposition is AlarmDisposition.SUPPRESS + + def test_lone_trusted_held(self) -> None: + assert assess_alarm(self._alarm("goodcell", 0.8)).disposition is AlarmDisposition.HOLD + + def test_corroborated_heeded(self) -> None: + alarm = self._alarm("c0", 0.4) # even a low-trust source is heeded if corroborated + others = [self._alarm("c1", 0.4), self._alarm("c2", 0.4)] + assert assess_alarm(alarm, others).disposition is AlarmDisposition.HEED + + def test_same_source_does_not_self_corroborate(self) -> None: + alarm = self._alarm("c0", 0.8) + # two more alarms but all from c0 → no independent corroboration. + dupes = [self._alarm("c0", 0.8), self._alarm("c0", 0.8)] + a = assess_alarm(alarm, dupes) + assert a.corroboration_count == 0 + assert a.disposition is AlarmDisposition.HOLD + + def test_different_scale_does_not_corroborate(self) -> None: + alarm = self._alarm("c0", 0.8) + other_scale = PeerAlarm("a", "c1", ExecutiveScale.ZONE, "fire", 0.8, 0.8) + a = assess_alarm(alarm, [other_scale]) + assert a.corroboration_count == 0 + + def test_severity_validated(self) -> None: + with pytest.raises(ValueError, match="severity"): + PeerAlarm("a", "c", ExecutiveScale.RACK, "fire", 1.5, 0.5) + + +class TestHeededAlarms: + def test_independent_batch_corroborates_to_heed(self) -> None: + alarms = [ + PeerAlarm(f"a{i}", f"cell{i}", ExecutiveScale.RACK, "fire", 0.8, 0.7) + for i in range(3) + ] + assert len(heeded_alarms(alarms)) == 3 + + def test_lone_alarm_not_heeded(self) -> None: + alarms = [PeerAlarm("a", "c0", ExecutiveScale.RACK, "fire", 0.9, 0.9)] + assert not heeded_alarms(alarms) diff --git a/python/tests/test_executive_roll_up.py b/python/tests/test_executive_roll_up.py new file mode 100644 index 0000000..390a745 --- /dev/null +++ b/python/tests/test_executive_roll_up.py @@ -0,0 +1,95 @@ +"""Tests for the scale roll-up aggregator (physical / grouping axes).""" +from __future__ import annotations + +import pytest + +from substrate.executive.band import LoadZone +from substrate.executive.roll_up import ( + MemberLoad, + RollUpError, + ScaleAggregate, + roll_up, +) +from substrate.executive.scale import ( + ExecutiveScale, + ScaleAxis, +) + + +def _cells(*us: float) -> list[MemberLoad]: + return [MemberLoad(f"c{i}", u, ExecutiveScale.CELL) for i, u in enumerate(us)] + + +class TestMemberLoadValidation: + def test_empty_id_rejected(self) -> None: + with pytest.raises(ValueError, match="member_id"): + MemberLoad("", 0.4, ExecutiveScale.CELL) + + def test_out_of_range_rejected(self) -> None: + with pytest.raises(ValueError, match="utilization"): + MemberLoad("c", 1.5, ExecutiveScale.CELL) + + +class TestRollUp: + def test_empty_returns_none(self) -> None: + assert roll_up([], to_scale=ExecutiveScale.RACK) is None + + def test_aggregates_a_rack_of_cells(self) -> None: + agg = roll_up(_cells(0.44, 0.46, 0.82, 0.2), to_scale=ExecutiveScale.RACK) + assert isinstance(agg, ScaleAggregate) + assert agg.scale is ExecutiveScale.RACK + assert agg.axis is ScaleAxis.PHYSICAL + assert agg.member_count == 4 + assert agg.mean_utilization == pytest.approx((0.44 + 0.46 + 0.82 + 0.2) / 4) + + def test_worst_zone_is_most_severe_member(self) -> None: + agg = roll_up(_cells(0.44, 0.46, 0.82, 0.2), to_scale=ExecutiveScale.RACK) + assert agg is not None + assert agg.worst_zone is LoadZone.DANGER + + def test_dominant_zone_is_modal(self) -> None: + agg = roll_up(_cells(0.44, 0.46, 0.82, 0.2), to_scale=ExecutiveScale.RACK) + assert agg is not None + assert agg.dominant_zone is LoadZone.WORK # two cells in the work zone + + def test_failure_tell_fractions(self) -> None: + agg = roll_up(_cells(0.44, 0.46, 0.82, 0.2), to_scale=ExecutiveScale.RACK) + assert agg is not None + assert agg.fraction_in_danger == pytest.approx(0.25) + assert agg.fraction_idle == pytest.approx(0.25) + + def test_zone_distribution_in_severity_order(self) -> None: + agg = roll_up(_cells(0.44, 0.46, 0.82, 0.2), to_scale=ExecutiveScale.RACK) + assert agg is not None + zones = [z for z, _ in agg.zone_distribution] + assert zones == [LoadZone.IDLE, LoadZone.WORK, LoadZone.DANGER] + + def test_cross_axis_member_rejected(self) -> None: + with pytest.raises(RollUpError, match="axis"): + roll_up( + [MemberLoad("a", 0.4, ExecutiveScale.AGENT)], + to_scale=ExecutiveScale.RACK, + ) + + def test_member_not_below_parent_rejected(self) -> None: + with pytest.raises(RollUpError, match="not below"): + roll_up( + [MemberLoad("z", 0.4, ExecutiveScale.ZONE)], + to_scale=ExecutiveScale.RACK, + ) + + def test_cells_roll_up_to_region(self) -> None: + # CELL is strictly below REGION on the physical axis. + agg = roll_up(_cells(0.4, 0.5), to_scale=ExecutiveScale.REGION) + assert agg is not None + assert agg.scale is ExecutiveScale.REGION + + def test_grouping_axis_rolls_up(self) -> None: + # Grouping axis: members aggregate into their group (no strict-below check). + members = [ + MemberLoad("m0", 0.44, ExecutiveScale.SERVICE_GROUP), + MemberLoad("m1", 0.5, ExecutiveScale.SERVICE_GROUP), + ] + agg = roll_up(members, to_scale=ExecutiveScale.SERVICE_GROUP) + assert agg is not None + assert agg.axis is ScaleAxis.GROUPING diff --git a/python/tests/test_executive_state_query.py b/python/tests/test_executive_state_query.py new file mode 100644 index 0000000..27b3508 --- /dev/null +++ b/python/tests/test_executive_state_query.py @@ -0,0 +1,85 @@ +"""Tests for the unified entity-state query — 'how are you doing?' (G51).""" +from __future__ import annotations + +import pytest + +from substrate.executive.band import LoadZone +from substrate.executive.state_query import ( + EffortState, + EnergyState, + StateObservation, + TrajectoryDirection, + integrate_state, +) + + +def _obs(*pairs: tuple[float, bool]) -> list[StateObservation]: + return [StateObservation(u, wp) for u, wp in pairs] + + +class TestStateObservationValidation: + def test_out_of_range_rejected(self) -> None: + with pytest.raises(ValueError, match="utilization"): + StateObservation(1.5) + + +class TestIntegrateState: + def test_empty_history_returns_none(self) -> None: + assert integrate_state("e", []) is None + + def test_empty_entity_id_rejected(self) -> None: + with pytest.raises(ValueError, match="entity_id"): + integrate_state("", [StateObservation(0.44)]) + + def test_engaged_worker(self) -> None: + r = integrate_state("alice", [StateObservation(u) for u in (0.44, 0.45, 0.43, 0.46)]) + assert r is not None + assert r.energy is EnergyState.ENGAGED + assert r.effort is EffortState.WORKING + assert r.dominant_zone is LoadZone.WORK + + def test_slacking_is_avoidance_while_work_pending(self) -> None: + # Idle, but with work pending the whole time → the avoidance trend. + r = integrate_state( + "bob", _obs((0.2, True), (0.18, True), (0.22, True), (0.19, True), (0.21, True)), + ) + assert r is not None + assert r.effort is EffortState.SLACKING + assert r.energy is EnergyState.DEPLETED + assert r.sustained is True + + def test_resting_is_idle_with_nothing_pending(self) -> None: + r = integrate_state("zoe", [StateObservation(0.2) for _ in range(4)]) + assert r is not None + assert r.effort is EffortState.RESTING + + def test_burning_out_is_sustained_danger(self) -> None: + r = integrate_state("carol", [StateObservation(u) for u in (0.8, 0.82, 0.79, 0.81, 0.83, 0.8)]) + assert r is not None + assert r.energy is EnergyState.STRAINED + assert r.effort is EffortState.OVEREXERTING + assert r.sustained is True + + def test_recovering_trajectory(self) -> None: + # Climbing down from danger toward the work zone → moving toward health. + r = integrate_state("dave", [StateObservation(u) for u in (0.85, 0.75, 0.6, 0.5, 0.45)]) + assert r is not None + assert r.trajectory is TrajectoryDirection.RECOVERING + + def test_deteriorating_trajectory(self) -> None: + # Sinking from the work zone toward idle → away from health. + r = integrate_state("erin", [StateObservation(u) for u in (0.45, 0.4, 0.3, 0.2, 0.1)]) + assert r is not None + assert r.trajectory is TrajectoryDirection.DETERIORATING + + def test_transient_peak_is_a_good_push(self) -> None: + # A single excursion into peaking after work-zone readings = PEAKING, not strained. + r = integrate_state("finn", [StateObservation(u) for u in (0.44, 0.45, 0.46, 0.6)]) + assert r is not None + assert r.energy is EnergyState.PEAKING + assert r.sustained is False + + def test_summary_is_human_readable(self) -> None: + r = integrate_state("g", [StateObservation(0.44), StateObservation(0.45)]) + assert r is not None + assert "is" in r.summary and "g" in r.summary