Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 28 additions & 61 deletions custom_components/lock_code_manager/providers/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
and -- once the unified-user migration lands -- Matter and Z-Wave User
Credential CC).

Four tag formats coexist during the migration window:
Three formats are emitted today; a fourth (legacy) is still tolerated on
read for locks whose names were written by older releases.

* Canonical ``lcm:<slot>:<name>`` -- the consolidated format every new
write uses when the lock's ``max_user_name_length`` can fit the full
Expand All @@ -27,13 +28,11 @@
digits, but only encountered on locks with absurdly small name limits
or firmwares that reject even ``lcm<slot>``, so the tradeoff is
accepted.
* Legacy ``[LCM:<slot>] <name>`` -- written by Schlage/Akuvox today.
Still emitted by ``make_legacy_tagged_name`` while those providers
migrate; their reads tolerate it via ``parse_tag_with_rewrite``, which
signals a legacy match so the caller can rewrite the lock-stored name
in place at the next write opportunity. The legacy emitter is the
deprecation marker -- ``_legacy_`` in the name is intentional so
reviewers see at the call site that the writer hasn't migrated yet.
* Legacy ``[LCM:<slot>] <name>`` -- read-only. Older Schlage/Akuvox
releases wrote this format; ``parse_tag`` still recognizes it so the
integration can identify its own users on locks that haven't been
rewritten yet. The next write naturally replaces the lock-stored name
with the canonical format.
"""

from __future__ import annotations
Expand Down Expand Up @@ -65,65 +64,33 @@ def make_compact_tagged_name(slot_num: int) -> str:
return f"lcm{slot_num}"


def make_legacy_tagged_name(slot_num: int, name: str | None = None) -> str:
"""
Return a code name in the legacy ``[LCM:<slot>]`` format.

Deprecated: kept for Schlage/Akuvox until they migrate to the
canonical format with read-time detect-and-rewrite. New call sites
should use ``make_tagged_name``.
"""
base = name or f"Code Slot {slot_num}"
return f"[LCM:{slot_num}] {base}"


def parse_tag(name: str) -> tuple[int | None, str]:
"""
Parse a Lock Code Manager slot tag, tolerant of both formats.

Returns ``(slot_num, friendly_name)`` when either format matches, or
``(None, original_name)`` otherwise. Callers that care about
rewriting legacy tags in place should use ``parse_tag_with_rewrite``
instead.
"""
slot, friendly, _ = parse_tag_with_rewrite(name)
return slot, friendly


def parse_tag_with_rewrite(name: str) -> tuple[int | None, str, bool]:
"""
Parse a slot tag and signal whether it needs format migration.

Returns ``(slot_num, friendly_name, needs_rewrite)``. ``needs_rewrite``
is True only when the legacy ``[LCM:<slot>]`` format matched; the
caller can re-emit via ``make_tagged_name`` on the next write to
migrate the lock-stored name in place. The canonical
``lcm:<slot>:`` format, the slot-only fallback, and untagged names
all return ``needs_rewrite=False``.
Parse a Lock Code Manager slot tag, tolerant of all known formats.

Match priority: canonical, then legacy, then compact, then
slot-only digits. The compact and slot-only branches are
Returns ``(slot_num, friendly_name)`` when any format matches, or
``(None, original_name)`` otherwise. Match priority: canonical,
legacy, compact, slot-only. The compact and slot-only branches are
charset-/length-constrained fallbacks emitted by Matter (and
eventually other providers) when the lock rejects the canonical
name. The display portion is empty for both fallbacks because the
name only carries the slot binding. Bare digits being treated as
a slot tag is intentional but ambiguous with external users whose
names happen to be digit-only -- the ambiguity is the cost of
preserving the slot binding on constrained locks.
name; their display portion is empty because the name only carries
the slot binding. The legacy ``[LCM:<slot>]`` format is read-only
-- nothing emits it anymore -- but is still recognized so older
lock-stored names continue to identify as LCM-owned until the next
write rewrites them in the canonical format. Bare digits being
treated as a slot tag is intentional but ambiguous with external
users whose names happen to be digit-only; the ambiguity is the
cost of preserving the slot binding on constrained locks.
"""
match = _TAG_RE.match(name)
if match:
return int(match.group(1)), match.group(2), False
match = _LEGACY_SLOT_TAG_RE.match(name)
if match:
return int(match.group(1)), match.group(2), True
match = _COMPACT_TAG_RE.match(name)
if match:
return int(match.group(1)), "", False
match = _SLOT_ONLY_RE.match(name)
if match:
return int(match.group(1)), "", False
return None, name, False
if match := _TAG_RE.match(name):
return int(match.group(1)), match.group(2)
if match := _LEGACY_SLOT_TAG_RE.match(name):
return int(match.group(1)), match.group(2)
if match := _COMPACT_TAG_RE.match(name):
return int(match.group(1)), ""
if match := _SLOT_ONLY_RE.match(name):
return int(match.group(1)), ""
return None, name


def parse_slot_num(value: object) -> int | None:
Expand Down
103 changes: 45 additions & 58 deletions custom_components/lock_code_manager/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,55 +484,64 @@ class SlotMetadata:
enabled: bool | None = None


def _build_slot_entities(
ent_reg: er.EntityRegistry, entry_id: str, slot_num: int
) -> SlotEntities:
"""
Build a fully-populated SlotEntities for a given (entry, slot) pair.

Both the per-lock iterator (``_get_slot_entity_ids``) and the
per-slot subscription resolver go through this primitive so the
``unique_id`` formula and the field list live in exactly one place.
"""

def _id(domain: str, key: str) -> str | None:
return ent_reg.async_get_entity_id(
domain, DOMAIN, _slot_unique_id(entry_id, slot_num, key)
)

return SlotEntities(
slot_num=slot_num,
config_entry_id=entry_id,
name_entity_id=_id(TEXT_DOMAIN, CONF_NAME),
pin_entity_id=_id(TEXT_DOMAIN, CONF_PIN),
enabled_entity_id=_id(SWITCH_DOMAIN, CONF_ENABLED),
active_entity_id=_id(BINARY_SENSOR_DOMAIN, ATTR_ACTIVE),
event_entity_id=_id(EVENT_DOMAIN, EVENT_PIN_USED),
)


def _get_slot_entity_ids(
hass: HomeAssistant, lock_entity_id: str
) -> dict[int, SlotEntities]:
"""Return a dict of slot number to SlotEntities for the four primary per-slot entities."""
slot_entities: dict[int, SlotEntities] = {}
"""Return a dict of slot number to SlotEntities for every slot LCM manages on a lock."""
ent_reg = er.async_get(hass)

for entry in hass.config_entries.async_entries(DOMAIN):
config = get_entry_config(entry)
if not config.has_lock(lock_entity_id):
continue

for slot_int in config.slots:

def _entity_id(
domain: str,
key: str,
entry_id: str = entry.entry_id,
slot: int = slot_int,
) -> str | None:
return ent_reg.async_get_entity_id(
domain, DOMAIN, _slot_unique_id(entry_id, slot, key)
)

slot_entities[slot_int] = SlotEntities(
slot_num=slot_int,
config_entry_id=entry.entry_id,
name_entity_id=_entity_id(TEXT_DOMAIN, CONF_NAME),
pin_entity_id=_entity_id(TEXT_DOMAIN, CONF_PIN),
active_entity_id=_entity_id(BINARY_SENSOR_DOMAIN, ATTR_ACTIVE),
enabled_entity_id=_entity_id(SWITCH_DOMAIN, CONF_ENABLED),
)

return slot_entities
return {
slot_int: _build_slot_entities(ent_reg, entry.entry_id, slot_int)
for entry in hass.config_entries.async_entries(DOMAIN)
if get_entry_config(entry).has_lock(lock_entity_id)
for slot_int in get_entry_config(entry).slots
}


def _get_slot_metadata(
hass: HomeAssistant, lock_entity_id: str
hass: HomeAssistant, slot_entity_ids: dict[int, SlotEntities]
) -> dict[int, SlotMetadata]:
"""Return a dict of slot number to SlotMetadata for all slots LCM manages on a lock."""
slot_entities = _get_slot_entity_ids(hass, lock_entity_id)
"""
Derive SlotMetadata for each slot from pre-resolved SlotEntities.

Callers already need ``slot_entity_ids`` for other purposes, so
passing it in avoids re-walking the entity registry just to rebuild
the same dict.
"""
return {
slot_num: SlotMetadata(
name=_get_text_state(hass, ids.name_entity_id),
configured_pin=_get_text_state(hass, ids.pin_entity_id),
active=_get_bool_state(hass, ids.active_entity_id),
enabled=_get_bool_state(hass, ids.enabled_entity_id),
)
for slot_num, ids in slot_entities.items()
for slot_num, ids in slot_entity_ids.items()
}


Expand Down Expand Up @@ -568,8 +577,8 @@ def _serialize_lock_coordinator(
coordinator = lock.coordinator
data = coordinator.data if coordinator is not None else {}
managed_slots = get_managed_slots(hass, lock.lock.entity_id)
slot_metadata = _get_slot_metadata(hass, lock.lock.entity_id)
slot_entity_ids = _get_slot_entity_ids(hass, lock.lock.entity_id)
slot_metadata = _get_slot_metadata(hass, slot_entity_ids)

slots = []
for slot, code in sorted(data.items()):
Expand Down Expand Up @@ -679,28 +688,6 @@ def _unsub_all() -> None:
_send_update()


def _get_slot_entity_data(
hass: HomeAssistant, config_entry: ConfigEntry, slot_num: int
) -> SlotEntities:
"""Get entity IDs for a specific slot."""
ent_reg = er.async_get(hass)
entry_id = config_entry.entry_id

def _get_entity_id(domain: str, key: str) -> str | None:
return ent_reg.async_get_entity_id(
domain, DOMAIN, _slot_unique_id(entry_id, slot_num, key)
)

return SlotEntities(
slot_num=slot_num,
name_entity_id=_get_entity_id(TEXT_DOMAIN, CONF_NAME),
pin_entity_id=_get_entity_id(TEXT_DOMAIN, CONF_PIN),
enabled_entity_id=_get_entity_id(SWITCH_DOMAIN, CONF_ENABLED),
active_entity_id=_get_entity_id(BINARY_SENSOR_DOMAIN, ATTR_ACTIVE),
event_entity_id=_get_entity_id(EVENT_DOMAIN, EVENT_PIN_USED),
)


def _get_slot_in_sync_entity_ids(
hass: HomeAssistant, config_entry: ConfigEntry, slot_num: int
) -> dict[str, str]:
Expand Down Expand Up @@ -1030,7 +1017,7 @@ async def subscribe_code_slot(
def _resolve_entity_ids() -> tuple[SlotEntities, dict[str, str], str | None]:
"""Resolve current entity IDs for this slot from the entity registry."""
return (
_get_slot_entity_data(hass, config_entry, slot_num),
_build_slot_entities(er.async_get(hass), config_entry.entry_id, slot_num),
_get_slot_in_sync_entity_ids(hass, config_entry, slot_num),
_get_slot_condition_entity_id(config_entry, slot_num),
)
Expand Down
Loading
Loading