diff --git a/src/buildcompiler/planning/__init__.py b/src/buildcompiler/planning/__init__.py index 5e19692..5dd1576 100644 --- a/src/buildcompiler/planning/__init__.py +++ b/src/buildcompiler/planning/__init__.py @@ -1 +1,6 @@ -"""Package scaffolding for clean architecture.""" +"""Planning package exports.""" + +from .full_build_planner import FullBuildPlanner +from .models import BuildPlan, UnsupportedPlanningRecord + +__all__ = ["BuildPlan", "UnsupportedPlanningRecord", "FullBuildPlanner"] diff --git a/src/buildcompiler/planning/classifier.py b/src/buildcompiler/planning/classifier.py new file mode 100644 index 0000000..5d666ae --- /dev/null +++ b/src/buildcompiler/planning/classifier.py @@ -0,0 +1,112 @@ +"""Design classification helpers for planning.""" + +from __future__ import annotations + +import re +from collections import Counter + +import sbol2 + +from buildcompiler.domain import BuildRequest, BuildStage, DesignKind +from buildcompiler.planning.models import UnsupportedPlanningRecord +from buildcompiler.planning.validation import classify_part_role + +RECOMMENDED_LVL1_PARTS = ("promoter", "rbs", "cds", "terminator") + + +def _stable_slug(identity: str) -> str: + return re.sub(r"[^a-z0-9]+", "-", identity.lower()).strip("-")[-48:] + + +def request_id_for( + stage: BuildStage, + source_identity: str, + source_display_id: str | None, + *, + variant_index: int | None = None, +) -> str: + base = source_display_id or _stable_slug(source_identity) + rid = f"{stage.value}:{base}" + if variant_index is not None: + rid = f"{rid}:v{variant_index}" + return rid + + +def classify_non_combinatorial( + design: object, +) -> BuildRequest | UnsupportedPlanningRecord: + if isinstance(design, sbol2.ModuleDefinition): + return BuildRequest( + request_id_for(BuildStage.ASSEMBLY_LVL2, design.identity, design.displayId), + BuildStage.ASSEMBLY_LVL2, + design.identity, + design.displayId, + DesignKind.MODULE_DEFINITION, + ) + + if isinstance(design, sbol2.ComponentDefinition): + count = len(design.components) + if count > 1: + observed_roles: list[str] = [] + for component in design.components: + target = ( + component.doc.find(component.definition) + if getattr(component, "doc", None) is not None + else None + ) + if isinstance(target, sbol2.ComponentDefinition): + role = classify_part_role(target) + if role is not None: + observed_roles.append(role) + + counts = Counter(observed_roles) + missing = [role for role in RECOMMENDED_LVL1_PARTS if counts[role] != 1] + has_role_evidence = len(observed_roles) > 0 + if count != 4 or (has_role_evidence and missing): + return UnsupportedPlanningRecord( + design.identity, + design.displayId, + DesignKind.COMPONENT_DEFINITION, + "Warning: Level-1 planning expects exactly four parts (promoter, RBS, CDS, terminator).", + { + "component_count": count, + "observed_role_counts": { + role: counts.get(role, 0) + for role in RECOMMENDED_LVL1_PARTS + }, + "suggested_parts": list(RECOMMENDED_LVL1_PARTS), + }, + ) + return BuildRequest( + request_id_for( + BuildStage.ASSEMBLY_LVL1, design.identity, design.displayId + ), + BuildStage.ASSEMBLY_LVL1, + design.identity, + design.displayId, + DesignKind.COMPONENT_DEFINITION, + ) + if count <= 1 and classify_part_role(design) is not None: + return BuildRequest( + request_id_for( + BuildStage.DOMESTICATION, design.identity, design.displayId + ), + BuildStage.DOMESTICATION, + design.identity, + design.displayId, + DesignKind.COMPONENT_DEFINITION, + ) + return UnsupportedPlanningRecord( + design.identity, + design.displayId, + DesignKind.COMPONENT_DEFINITION, + "ComponentDefinition is single/empty with unsupported role.", + {"component_count": count, "roles": list(design.roles)}, + ) + + return UnsupportedPlanningRecord( + getattr(design, "identity", str(design)), + getattr(design, "displayId", None), + DesignKind.UNSUPPORTED, + f"Unsupported design type: {type(design).__name__}", + ) diff --git a/src/buildcompiler/planning/combinatorial.py b/src/buildcompiler/planning/combinatorial.py new file mode 100644 index 0000000..a94a8bc --- /dev/null +++ b/src/buildcompiler/planning/combinatorial.py @@ -0,0 +1,113 @@ +from __future__ import annotations +import itertools +import sbol2 +from buildcompiler.api import BuildOptions +from buildcompiler.domain import BuildRequest, BuildStage, BuildWarning, DesignKind +from buildcompiler.planning.classifier import request_id_for +from buildcompiler.planning.models import UnsupportedPlanningRecord +from buildcompiler.planning.validation import ROLE_TO_NAME + + +def _collect_variant_sets(derivation): + variables = list(derivation.variableComponents) + variables.sort( + key=lambda variable: (str(getattr(variable, "variable", "")), variable.identity) + ) + return variables, [sorted(list(vc.variants), key=str) for vc in variables] + + +def expand_combinatorial_derivation( + derivation: sbol2.CombinatorialDerivation, *, options: BuildOptions +): + warnings = [] + unsupported = [] + requests = [] + variables, variant_sets = _collect_variant_sets(derivation) + if not variant_sets or any(len(v) == 0 for v in variant_sets): + unsupported.append( + UnsupportedPlanningRecord( + derivation.identity, + derivation.displayId, + DesignKind.COMBINATORIAL_DERIVATION, + "Variable component has no listed variants.", + ) + ) + return requests, unsupported, warnings + total = 1 + for v in variant_sets: + total *= len(v) + if ( + total > options.planning.combinatorial.max_variants + and not options.planning.combinatorial.allow_large_expansion + ): + warnings.append( + BuildWarning( + "planning.combinatorial.expansion_blocked", + "Combinatorial expansion exceeds max_variants and is blocked.", + BuildStage.ASSEMBLY_LVL1, + derivation.identity, + { + "variant_count": total, + "max_variants": options.planning.combinatorial.max_variants, + }, + ) + ) + unsupported.append( + UnsupportedPlanningRecord( + derivation.identity, + derivation.displayId, + DesignKind.COMBINATORIAL_DERIVATION, + "Combinatorial expansion blocked by max_variants.", + {"variant_count": total}, + ) + ) + return requests, unsupported, warnings + doc = derivation.doc + valid = 0 + for idx, chosen in enumerate(itertools.product(*variant_sets)): + roles = [] + for pid in chosen: + obj = doc.find(pid) if doc is not None else None + if isinstance(obj, sbol2.ComponentDefinition): + matching = [ROLE_TO_NAME[r] for r in obj.roles if r in ROLE_TO_NAME] + if len(matching) == 1: + roles.append(matching[0]) + if sorted(roles) != ["cds", "promoter", "rbs", "terminator"]: + warnings.append( + BuildWarning( + "planning.combinatorial.invalid_variant", + "Skipping invalid combinatorial variant.", + BuildStage.ASSEMBLY_LVL1, + derivation.identity, + {"variant_index": idx}, + ) + ) + continue + requests.append( + BuildRequest( + request_id_for( + BuildStage.ASSEMBLY_LVL1, + derivation.identity, + derivation.displayId, + variant_index=idx, + ), + BuildStage.ASSEMBLY_LVL1, + derivation.identity, + derivation.displayId, + DesignKind.COMBINATORIAL_DERIVATION, + parent_group=derivation.identity, + variant_index=idx, + constraints={"part_order": list(chosen)}, + ) + ) + valid += 1 + if valid == 0: + unsupported.append( + UnsupportedPlanningRecord( + derivation.identity, + derivation.displayId, + DesignKind.COMBINATORIAL_DERIVATION, + "All combinatorial variants were invalid.", + ) + ) + return requests, unsupported, warnings diff --git a/src/buildcompiler/planning/full_build_planner.py b/src/buildcompiler/planning/full_build_planner.py new file mode 100644 index 0000000..a4189fa --- /dev/null +++ b/src/buildcompiler/planning/full_build_planner.py @@ -0,0 +1,49 @@ +"""Pure full-build planner orchestration.""" + +from __future__ import annotations + +from collections.abc import Iterable + +import sbol2 + +from buildcompiler.api import BuildOptions +from buildcompiler.planning.classifier import classify_non_combinatorial +from buildcompiler.planning.combinatorial import expand_combinatorial_derivation +from buildcompiler.planning.models import BuildPlan, UnsupportedPlanningRecord +from buildcompiler.sbol import SbolResolver + + +class FullBuildPlanner: + def __init__( + self, + *, + options: BuildOptions | None = None, + resolver: SbolResolver | None = None, + ): + self.options = options or BuildOptions() + self.resolver = resolver + + def plan( + self, abstract_designs: Iterable[object], *, options: BuildOptions | None = None + ) -> BuildPlan: + active = options or self.options + out = BuildPlan() + for design in abstract_designs: + if isinstance(design, sbol2.CombinatorialDerivation): + reqs, unsupported, warnings = expand_combinatorial_derivation( + design, options=active + ) + out.lvl1_requests.extend(reqs) + out.unsupported.extend(unsupported) + out.warnings.extend(warnings) + continue + classified = classify_non_combinatorial(design) + if isinstance(classified, UnsupportedPlanningRecord): + out.unsupported.append(classified) + elif classified.stage.value == "assembly_lvl2": + out.lvl2_requests.append(classified) + elif classified.stage.value == "assembly_lvl1": + out.lvl1_requests.append(classified) + else: + out.domestication_requests.append(classified) + return out diff --git a/src/buildcompiler/planning/models.py b/src/buildcompiler/planning/models.py new file mode 100644 index 0000000..d4782c7 --- /dev/null +++ b/src/buildcompiler/planning/models.py @@ -0,0 +1,24 @@ +"""Planning contracts for full-build planning output.""" + +from dataclasses import dataclass, field +from typing import Any + +from buildcompiler.domain import BuildRequest, BuildWarning, DesignKind + + +@dataclass +class UnsupportedPlanningRecord: + source_identity: str + source_display_id: str | None + source_kind: DesignKind + reason: str + metadata: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class BuildPlan: + lvl2_requests: list[BuildRequest] = field(default_factory=list) + lvl1_requests: list[BuildRequest] = field(default_factory=list) + domestication_requests: list[BuildRequest] = field(default_factory=list) + unsupported: list[UnsupportedPlanningRecord] = field(default_factory=list) + warnings: list[BuildWarning] = field(default_factory=list) diff --git a/src/buildcompiler/planning/validation.py b/src/buildcompiler/planning/validation.py new file mode 100644 index 0000000..b00c70e --- /dev/null +++ b/src/buildcompiler/planning/validation.py @@ -0,0 +1,140 @@ +"""Deterministic validation helpers for level-1 designs.""" + +from __future__ import annotations + +from collections import Counter + +import sbol2 + +from buildcompiler.constants import PART_ROLES +from buildcompiler.domain import BuildStage, BuildWarning + +ROLE_TO_NAME = { + "http://identifiers.org/so/SO:0000167": "promoter", + "http://identifiers.org/so/SO:0000139": "rbs", + "http://identifiers.org/so/SO:0000316": "cds", + "http://identifiers.org/so/SO:0000141": "terminator", +} +CANONICAL_ROLE_ORDER = ["promoter", "rbs", "cds", "terminator"] + + +def classify_part_role(component_definition: sbol2.ComponentDefinition) -> str | None: + matches = sorted( + ROLE_TO_NAME[role] for role in component_definition.roles if role in PART_ROLES + ) + if len(matches) != 1: + return None + return matches[0] + + +def validate_lvl1_cardinality( + component_definition: sbol2.ComponentDefinition, +) -> tuple[bool, list[BuildWarning]]: + warnings: list[BuildWarning] = [] + if len(component_definition.components) != 4: + warnings.append( + BuildWarning( + code="lvl1.variable_length", + message="Level-1 v1 requires exactly four components.", + stage=BuildStage.ASSEMBLY_LVL1, + source_identity=component_definition.identity, + ) + ) + return False, warnings + + roles = [] + for comp in component_definition.components: + target = ( + comp.doc.find(comp.definition) + if getattr(comp, "doc", None) is not None + else None + ) + if not isinstance(target, sbol2.ComponentDefinition): + continue + role = classify_part_role(target) + if role is not None: + roles.append(role) + + counts = Counter(roles) + ok = True + for required in CANONICAL_ROLE_ORDER: + if counts[required] != 1: + ok = False + warnings.append( + BuildWarning( + code="lvl1.invalid_cardinality", + message=f"Expected exactly one {required}, found {counts[required]}.", + stage=BuildStage.ASSEMBLY_LVL1, + source_identity=component_definition.identity, + metadata={"role": required, "count": counts[required]}, + ) + ) + return ok, warnings + + +def ordered_lvl1_parts( + component_definition: sbol2.ComponentDefinition, +) -> tuple[list[str], list[BuildWarning]]: + warnings: list[BuildWarning] = [] + role_to_identity: dict[str, str] = {} + comp_by_identity: dict[str, sbol2.Component] = {} + for comp in component_definition.components: + target = ( + comp.doc.find(comp.definition) + if getattr(comp, "doc", None) is not None + else None + ) + if isinstance(target, sbol2.ComponentDefinition): + role = classify_part_role(target) + if role: + role_to_identity[role] = target.identity + comp_by_identity[target.identity] = comp + + if len(role_to_identity) < 4: + return [ + role_to_identity[r] for r in CANONICAL_ROLE_ORDER if r in role_to_identity + ], warnings + + try: + ordered_components = list(component_definition.getInSequentialOrder()) + except Exception: + ordered_components = [] + + if len(ordered_components) == 4: + ordered_part_ids = [] + ordered_roles = [] + for comp in ordered_components: + target = ( + comp.doc.find(comp.definition) + if getattr(comp, "doc", None) is not None + else None + ) + if isinstance(target, sbol2.ComponentDefinition): + role = classify_part_role(target) + if role: + ordered_roles.append(role) + ordered_part_ids.append(target.identity) + if len(ordered_part_ids) == 4 and set(ordered_roles) == set( + CANONICAL_ROLE_ORDER + ): + if ordered_roles != CANONICAL_ROLE_ORDER: + warnings.append( + BuildWarning( + code="lvl1.non_canonical_order", + message="SBOL order is non-canonical and will be preserved.", + stage=BuildStage.ASSEMBLY_LVL1, + source_identity=component_definition.identity, + metadata={"ordered_roles": ordered_roles}, + ) + ) + return ordered_part_ids, warnings + + warnings.append( + BuildWarning( + code="lvl1.ambiguous_order", + message="SBOL order unavailable or ambiguous; using canonical order.", + stage=BuildStage.ASSEMBLY_LVL1, + source_identity=component_definition.identity, + ) + ) + return [role_to_identity[role] for role in CANONICAL_ROLE_ORDER], warnings diff --git a/tests/unit/planning/test_classifier.py b/tests/unit/planning/test_classifier.py new file mode 100644 index 0000000..961e915 --- /dev/null +++ b/tests/unit/planning/test_classifier.py @@ -0,0 +1,55 @@ +import sbol2 + +from buildcompiler.domain import BuildStage +from buildcompiler.planning.classifier import classify_non_combinatorial, request_id_for +from buildcompiler.planning.models import UnsupportedPlanningRecord + + +def test_classifier_maps_module_and_components(): + sbol2.setHomespace("https://example.org") + md = sbol2.ModuleDefinition("https://example.org/mod") + out = classify_non_combinatorial(md) + assert out.stage == BuildStage.ASSEMBLY_LVL2 + + er = sbol2.ComponentDefinition("https://example.org/er") + p = sbol2.ComponentDefinition("https://example.org/p") + p.roles = ["http://identifiers.org/so/SO:0000167"] + r = sbol2.ComponentDefinition("https://example.org/r") + r.roles = ["http://identifiers.org/so/SO:0000139"] + c = sbol2.ComponentDefinition("https://example.org/c") + c.roles = ["http://identifiers.org/so/SO:0000316"] + t = sbol2.ComponentDefinition("https://example.org/t") + t.roles = ["http://identifiers.org/so/SO:0000141"] + er.components.create("c1").definition = p.identity + er.components.create("c2").definition = r.identity + er.components.create("c3").definition = c.identity + er.components.create("c4").definition = t.identity + out2 = classify_non_combinatorial(er) + assert out2.stage == BuildStage.ASSEMBLY_LVL1 + + +def test_classifier_warns_for_invalid_lvl1_part_mix(): + design = sbol2.ComponentDefinition("https://example.org/invalid") + p = sbol2.ComponentDefinition("https://example.org/p2") + p.roles = ["http://identifiers.org/so/SO:0000167"] + design.components.create("c1").definition = p.identity + design.components.create("c2").definition = p.identity + + out = classify_non_combinatorial(design) + assert isinstance(out, UnsupportedPlanningRecord) + assert "promoter" in out.reason.lower() + + +def test_classifier_domestication_and_unsupported_and_deterministic_id(): + part = sbol2.ComponentDefinition("https://example.org/part") + part.roles = ["http://identifiers.org/so/SO:0000139"] + out = classify_non_combinatorial(part) + assert out.stage == BuildStage.DOMESTICATION + + unknown = sbol2.ComponentDefinition("https://example.org/u") + bad = classify_non_combinatorial(unknown) + assert isinstance(bad, UnsupportedPlanningRecord) + + rid1 = request_id_for(BuildStage.ASSEMBLY_LVL1, "https://example.org/A", "A") + rid2 = request_id_for(BuildStage.ASSEMBLY_LVL1, "https://example.org/A", "A") + assert rid1 == rid2 diff --git a/tests/unit/planning/test_combinatorial.py b/tests/unit/planning/test_combinatorial.py new file mode 100644 index 0000000..ea54215 --- /dev/null +++ b/tests/unit/planning/test_combinatorial.py @@ -0,0 +1,84 @@ +import sbol2 +from buildcompiler.api import BuildOptions +from buildcompiler.planning.combinatorial import expand_combinatorial_derivation + +ROLE_URIS = [ + "http://identifiers.org/so/SO:0000167", + "http://identifiers.org/so/SO:0000139", + "http://identifiers.org/so/SO:0000316", + "http://identifiers.org/so/SO:0000141", +] + + +def _build_comb(valid=True): + sbol2.setHomespace("https://example.org") + doc = sbol2.Document() + template = sbol2.ComponentDefinition("https://example.org/template") + doc.add(template) + comb = sbol2.CombinatorialDerivation("https://example.org/comb", template.identity) + doc.add(comb) + roles = ROLE_URIS if valid else ROLE_URIS[:3] + for i, role in enumerate(roles): + vc = comb.variableComponents.create(f"https://example.org/var{i}") + p = sbol2.ComponentDefinition(f"https://example.org/v{i}") + p.roles = [role] + doc.add(p) + vc.variants = [p.identity] + return comb + + +def test_expansion_and_blocking_behaviors(): + comb = _build_comb(valid=True) + reqs, unsupported, warnings = expand_combinatorial_derivation( + comb, options=BuildOptions() + ) + assert len(reqs) == 1 and unsupported == [] and reqs[0].variant_index == 0 + limited = BuildOptions() + limited.planning.combinatorial.max_variants = 0 + reqs2, unsupported2, warnings2 = expand_combinatorial_derivation( + comb, options=limited + ) + assert reqs2 == [] and unsupported2 + assert any(w.code == "planning.combinatorial.expansion_blocked" for w in warnings2) + invalid = _build_comb(valid=False) + reqs3, unsupported3, warnings3 = expand_combinatorial_derivation( + invalid, options=BuildOptions() + ) + assert reqs3 == [] and unsupported3 + assert any(w.code == "planning.combinatorial.invalid_variant" for w in warnings3) + + +def test_part_order_follows_template_sequence_not_variable_ids(): + sbol2.setHomespace("https://example.org") + doc = sbol2.Document() + template = sbol2.ComponentDefinition("https://example.org/template_seq") + doc.add(template) + template_components = [] + for idx in range(4): + component = template.components.create(f"t_component_{idx}") + component.definition = f"https://example.org/template_part_{idx}" + template_components.append(component) + comb = sbol2.CombinatorialDerivation( + "https://example.org/comb_seq", template.identity + ) + doc.add(comb) + variable_specs = [ + ("var_z", template_components[0], ROLE_URIS[0], "p0"), + ("var_y", template_components[1], ROLE_URIS[1], "p1"), + ("var_x", template_components[2], ROLE_URIS[2], "p2"), + ("var_w", template_components[3], ROLE_URIS[3], "p3"), + ] + expected_order = [] + for var_id, template_component, role, part_id in variable_specs: + vc = comb.variableComponents.create(var_id) + vc.variable = template_component.identity + part = sbol2.ComponentDefinition(f"https://example.org/{part_id}") + part.roles = [role] + doc.add(part) + expected_order.append(part.identity) + vc.variants = [part.identity] + reqs, unsupported, warnings = expand_combinatorial_derivation( + comb, options=BuildOptions() + ) + assert len(reqs) == 1 and unsupported == [] + assert reqs[0].constraints["part_order"] == expected_order diff --git a/tests/unit/planning/test_full_build_planner.py b/tests/unit/planning/test_full_build_planner.py new file mode 100644 index 0000000..4fb66a5 --- /dev/null +++ b/tests/unit/planning/test_full_build_planner.py @@ -0,0 +1,31 @@ +import sbol2 + +from buildcompiler.planning import FullBuildPlanner + + +def test_planner_mixed_inputs_returns_queues_and_warnings(): + sbol2.setHomespace("https://example.org") + mod = sbol2.ModuleDefinition("https://example.org/mod") + multi = sbol2.ComponentDefinition("https://example.org/multi") + p = sbol2.ComponentDefinition("https://example.org/p") + p.roles = ["http://identifiers.org/so/SO:0000167"] + r = sbol2.ComponentDefinition("https://example.org/r") + r.roles = ["http://identifiers.org/so/SO:0000139"] + c = sbol2.ComponentDefinition("https://example.org/c") + c.roles = ["http://identifiers.org/so/SO:0000316"] + t = sbol2.ComponentDefinition("https://example.org/t") + t.roles = ["http://identifiers.org/so/SO:0000141"] + multi.components.create("c1").definition = p.identity + multi.components.create("c2").definition = r.identity + multi.components.create("c3").definition = c.identity + multi.components.create("c4").definition = t.identity + dom = sbol2.ComponentDefinition("https://example.org/dom") + dom.roles = ["http://identifiers.org/so/SO:0000139"] + + planner = FullBuildPlanner() + plan = planner.plan([mod, multi, dom, object()]) + + assert len(plan.lvl2_requests) == 1 + assert len(plan.lvl1_requests) == 1 + assert len(plan.domestication_requests) == 1 + assert len(plan.unsupported) == 1 diff --git a/tests/unit/planning/test_validation.py b/tests/unit/planning/test_validation.py new file mode 100644 index 0000000..32b53ef --- /dev/null +++ b/tests/unit/planning/test_validation.py @@ -0,0 +1,49 @@ +import sbol2 +from buildcompiler.planning.validation import ( + ordered_lvl1_parts, + validate_lvl1_cardinality, +) + +ROLE_URIS = [ + "http://identifiers.org/so/SO:0000167", + "http://identifiers.org/so/SO:0000139", + "http://identifiers.org/so/SO:0000316", + "http://identifiers.org/so/SO:0000141", +] + + +def _mk_part(i, role, doc): + p = sbol2.ComponentDefinition(f"https://example.org/p{i}") + p.roles = [role] + doc.add(p) + return p + + +def _mk_lvl1(parts, doc): + d = sbol2.ComponentDefinition("https://example.org/lvl1") + for i, p in enumerate(parts): + d.components.create(f"c{i}").definition = p.identity + doc.add(d) + return d + + +def test_validate_cardinality_and_order_fallback_and_warning(): + sbol2.setHomespace("https://example.org") + doc = sbol2.Document() + parts = [_mk_part(i, r, doc) for i, r in enumerate(ROLE_URIS)] + lvl1 = _mk_lvl1(parts, doc) + ok, warnings = validate_lvl1_cardinality(lvl1) + assert ok is True + assert warnings == [] + ordered, ow = ordered_lvl1_parts(lvl1) + assert len(ordered) == 4 + + +def test_validate_missing_or_duplicate_role_fails(): + sbol2.setHomespace("https://example.org") + doc = sbol2.Document() + p = _mk_part(0, ROLE_URIS[0], doc) + lvl1 = _mk_lvl1([p, p, p, p], doc) + ok, warnings = validate_lvl1_cardinality(lvl1) + assert ok is False + assert any(w.code == "lvl1.invalid_cardinality" for w in warnings)