Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions edg/abstract_parts/BaseIoControllerWrapped.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from typing import *

from ..electronics_interfaces import *
from .IoController import BaseIoController


class BaseIoControllerWrapped(BaseIoController):
"""Base class for IoController wrapped blocks, particularly footprints that are used
with an outer WrapperSubboardBlock to implement e.g. a dev board or module around a modeling subcircuit.

Provides some utility functions to remap pin assignments from the model to the footprint.

In this class, pin_assigns is treated as the model's pin assigns and internally remapped.
"""

@staticmethod
def _remap_pin_assigns_list(
remapping: Dict[str, str],
pin_assigns: List[str],
*,
invert_remapping: bool = False,
) -> Dict[str, str]:
"""Given a remapping dict and a list of pin assigns, returns the mapping as a dict with the remapping applied.
If invert_remapping is True, the remapping dict is inverted before applying.
Assigns not present in the remapping dict are passed unchanged, eg for non-pin-assigns like bundle containers.
"""
if invert_remapping:
remapping = {v: k for k, v in remapping.items()}

remapped_assigns = {}
for assign in pin_assigns:
name, pindef = assign.split("=")
pin = pindef.split(",")[0].strip() # take the first (gpio name) if multiple
remapped_pin = remapping.get(pin)
if remapped_pin is not None:
remapped_assigns[name.strip()] = remapped_pin
else:
remapped_assigns[name.strip()] = pindef # pass unmodified if not remappable, eg bundle containers
return remapped_assigns

def _remap_to_footprint_pinning(
self, pin_assigns: Dict[str, str], valid_pins: Iterable[str]
) -> Dict[str, HasPassivePort]:
"""Given the pin assigns in a dict form as port name -> footprint pin, eg from _remap_pin_assigns_list,
returns the footprint-compatible form as footprint pin -> port object.

This simplified pin assignment tool requires all pins to be assigned.
It does not automatically assign unassigned pins, that is assumed to have happened at a higher level."""
pinning: Dict[str, HasPassivePort] = {}
seen_names: Set[str] = set()

def remap_port_recursive(port: Port, prefix: str = "") -> None:
"""Remaps a port, recursively for bundles"""
if isinstance(port, HasPassivePort):
pin = pin_assigns.get(prefix)
assert pin is not None, f"pin {prefix} not assigned"
assert pin in valid_pins, f"pin {pin} not in valid pins {valid_pins}"
pinning[pin] = port

for subport_name, subport in port._ports.items():
remap_port_recursive(subport, f"{prefix}.{subport_name}")

for io_port in self._io_ports:
if isinstance(io_port, Vector):
io_port.defined()
for subport_name in self.get(io_port.requested()):
assert subport_name not in seen_names, f"duplicate pin name {subport_name}"
seen_names.add(subport_name)
subport = io_port.append_elt(io_port._tpe.empty(), subport_name)
remap_port_recursive(subport, subport_name)
elif isinstance(io_port, Port):
if self.get(io_port.is_connected()):
port_name = io_port._name_from(self)
assert port_name not in seen_names, f"duplicate pin name {port_name}"
seen_names.add(port_name)
remap_port_recursive(io_port, port_name)
else:
raise NotImplementedError(f"unknown port type {io_port}")

return pinning

@staticmethod
def _remap_assigns_to_value(assigns: Dict[str, str]) -> List[str]:
"""Given a dict of pin assigns from _remap_pinning_assigns, returns a list of assign strings
for use in self.actual_pin_assigns"""
return [f"{name}={assign}" for name, assign in assigns.items()]
46 changes: 46 additions & 0 deletions edg/abstract_parts/IoController.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,52 @@ def _export_tap_ios_inner(self, inner: "BaseIoController") -> None:
inner_io = inner_ios_by_type[self_io_type]
self.export_tap(self_io, inner_io)

def _generator_param_all_ios(self) -> None:
"""Declares all BaseIoController IOs as generator params.
This must be a GeneratorBlock."""
assert isinstance(self, GeneratorBlock)
for io_port in self._io_ports:
if isinstance(io_port, Vector):
self.generator_param(io_port.requested())
elif isinstance(io_port, Port):
self.generator_param(io_port.is_connected())
else:
raise NotImplementedError(f"unknown port type {io_port}")

def _generator_pin_dict(self) -> Dict[str, Port]:
"""Returns a dict of pin name to port for all IO ports, recursing into bundles Ports.
This includes both the bundle container Port and their (recursive) contents.
Users of this may want to filter by the port type.

For Vector-typed IO ports, this generates the subports and must be authoritative.
This cannot be used with anything else that generates vector sub-ports.
This must be a GeneratorBlock."""
assert isinstance(self, GeneratorBlock)

pin_dict: Dict[str, Port] = {}

def recurse_port(port: Port, prefix: str = "") -> None:
assert prefix not in pin_dict, f"duplicate pin name {prefix}"
pin_dict[prefix] = port

for subport_name, subport in port._ports.items():
recurse_port(subport, f"{prefix}.{subport_name}")

for io_port in self._io_ports:
if isinstance(io_port, Vector):
io_port.defined()
for subport_name in self.get(io_port.requested()):
subport = io_port.append_elt(io_port._tpe.empty(), subport_name)
recurse_port(subport, subport_name)
elif isinstance(io_port, Port):
if self.get(io_port.is_connected()):
port_name = io_port._name_from(self)
recurse_port(io_port, port_name)
else:
raise NotImplementedError(f"unknown port type {io_port}")

return pin_dict

@staticmethod
def _instantiate_from(
ios: List[BasePort], allocations: List[AllocatedResource]
Expand Down
64 changes: 0 additions & 64 deletions edg/abstract_parts/IoControllerWrapped.py

This file was deleted.

76 changes: 67 additions & 9 deletions edg/abstract_parts/PinMappable.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,24 @@ class BaseDelegatingPinMapResource(BasePinMapResource):


class PinResource(BaseLeafPinMapResource):
"""A resource for a single chip pin, which can be one of several port types (eg, an ADC and DIO sharing a pin)."""
"""A resource for a single chip pin, which can be one of several port types (eg, an ADC and DIO sharing a pin).

def __init__(self, pin: str, name_models: Mapping[str, Union[Passive, HasPassivePort]]):
Generally, this is initially created with pin = pin name (like GPIO0), then remapped so the pin = pin number,
with the pinname inheriting the prior pin name."""

def __init__(
self, pin: str, name_models: Mapping[str, Union[Passive, HasPassivePort]], pinname: Optional[str] = None
):
self.pin = pin
self.name_models = name_models
if pinname is not None:
self.pinname = pinname
else:
self.pinname = pin

@override
def __repr__(self) -> str:
return f"PinResource({self.pin}, {self.name_models})"
return f"PinResource({self.pinname}, {self.pin}, {self.name_models})"

@override
def __eq__(self, other: Any) -> bool:
Expand All @@ -79,14 +88,24 @@ class PeripheralFixedPin(BaseLeafPinMapResource):
"""A resource for a peripheral as a bundle port, where the internal ports are fixed. No allocation happens.
The internal port model must be fully defined here."""

def __init__(self, name: str, port_model: Port, inner_allowed_pins: Dict[str, str]):
def __init__(
self,
name: str,
port_model: Port,
inner_allowed_pins: Dict[str, str],
inner_pinnames: Optional[Dict[str, str]] = None,
):
self.name = name
self.port_model = port_model
self.inner_allowed_pins = inner_allowed_pins
if inner_pinnames is not None:
self.inner_pinnames = inner_pinnames
else:
self.inner_pinnames = inner_allowed_pins

@override
def __repr__(self) -> str:
return f"PeripheralFixedPin({self.name}, {self.port_model.__class__.__name__} {self.inner_allowed_pins})"
return f"PeripheralFixedPin({self.name}, {self.port_model.__class__.__name__} {self.inner_allowed_pins} {self.inner_pinnames})"

@override
def __eq__(self, other: Any) -> bool:
Expand All @@ -96,6 +115,7 @@ def __eq__(self, other: Any) -> bool:
and self.name == other.name
and self.port_model is other.port_model
and self.inner_allowed_pins == other.inner_allowed_pins
and self.inner_pinnames == other.inner_pinnames
)


Expand Down Expand Up @@ -274,7 +294,7 @@ def remap_pins(self, pinmap: Dict[str, str]) -> "PinMapUtil":
def remap_resource(resource: BasePinMapResource) -> Optional[BasePinMapResource]:
if isinstance(resource, PinResource):
if resource.pin in pinmap:
return PinResource(pinmap[resource.pin], resource.name_models)
return PinResource(pinmap[resource.pin], resource.name_models, resource.pinname)
else:
return None
elif isinstance(resource, PeripheralFixedPin):
Expand All @@ -283,7 +303,7 @@ def remap_resource(resource: BasePinMapResource) -> Optional[BasePinMapResource]
for elt_name, elt_pin in resource.inner_allowed_pins.items()
if elt_pin in pinmap
}
return PeripheralFixedPin(resource.name, resource.port_model, remapped_pins)
return PeripheralFixedPin(resource.name, resource.port_model, remapped_pins, resource.inner_pinnames)
elif isinstance(resource, BaseDelegatingPinMapResource):
return resource
else:
Expand All @@ -308,6 +328,44 @@ def resource_pin(resource: BasePinMapResource) -> List[str]:

return PinMapUtil(remapped_resources, self.transforms)

def filter_pins(self, allowed_pins: List[str]) -> "PinMapUtil":
"""Returns a new PinMapUtil with only the specified pins kept.
If the allowed_pins list is empty, returns the input (all pins kept).
allowed_pins may be specified as a pin name or pin number."""

if not allowed_pins:
return self

def filter_resource(resource: BasePinMapResource) -> Optional[BasePinMapResource]:
if isinstance(resource, PinResource):
if resource.pin in allowed_pins or resource.pinname in allowed_pins:
return resource
else:
return None
elif isinstance(resource, PeripheralFixedPin):
filtered_keys = []
for key, pin in resource.inner_allowed_pins.items():
if pin in allowed_pins or resource.inner_pinnames[key] in allowed_pins:
filtered_keys.append(key)
if filtered_keys:
return PeripheralFixedPin(
resource.name,
resource.port_model,
{k: v for k, v in resource.inner_allowed_pins.items() if k in filtered_keys},
{k: v for k, v in resource.inner_pinnames.items() if k in filtered_keys},
)
else:
return None
elif isinstance(resource, BaseDelegatingPinMapResource):
return resource
else:
raise NotImplementedError(f"unknown resource {resource}")

filtered_resources_raw = [filter_resource(resource) for resource in self.resources]
filtered_resources = [resource for resource in filtered_resources_raw if resource is not None]

return PinMapUtil(filtered_resources, self.transforms)

@staticmethod
def _resource_port_types(resource: BasePinMapResource) -> List[Type[Port]]:
if isinstance(resource, PinResource):
Expand Down Expand Up @@ -363,7 +421,7 @@ def try_allocate_resource(
if isinstance(resource, PinResource): # single pin: just assign it
sub_assignments.check_empty()
resource_name, resource_model = resource.get_name_model_for_type(port_type)
allocated_resource = AllocatedResource(resource_model, port_name, resource_name, resource.pin)
allocated_resource = AllocatedResource(resource_model, port_name, resource.pinname, resource.pin)
return allocated_resource
elif isinstance(resource, PeripheralFixedPin): # fixed pin: check user-assignment, or assign first
inner_pin_map: Dict[str, Tuple[str, Optional[str]]] = {}
Expand All @@ -372,7 +430,7 @@ def try_allocate_resource(
if inner_assignment is not None and inner_assignment != inner_pin:
raise BadUserAssignError(f"invalid assignment to {port_name}.{inner_name}: {inner_assignment}")

inner_pin_map[inner_name] = (inner_pin, None)
inner_pin_map[inner_name] = (inner_pin, resource.inner_pinnames[inner_name])
inner_sub_assignments.check_empty()

sub_assignments.check_empty()
Expand Down
2 changes: 1 addition & 1 deletion edg/abstract_parts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@

from .IoController import BaseIoController, IoController, IoControllerPowerRequired, BaseIoControllerPinmapGenerator
from .IoControllerExportable import BaseIoControllerExportable
from .IoControllerWrapped import IoControllerWrapped
from .BaseIoControllerWrapped import BaseIoControllerWrapped
from .IoControllerInterfaceMixins import (
IoControllerSpiPeripheral,
IoControllerI2cTarget,
Expand Down
Loading