From 6d172394a2acf9986c8ec4119b09d861f503e60c Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 15 Jun 2026 22:39:03 -0400 Subject: [PATCH] refactor: prune dead provider helpers and collapse SlotEntities builders * `providers/_util.py`: delete `make_legacy_tagged_name` (zero production callers) and inline `parse_tag_with_rewrite` into `parse_tag` (single caller, vestigial `needs_rewrite` signal). The legacy `[LCM:]` format is now read-only -- `parse_tag` still recognizes it so older lock-stored names continue to identify as LCM-owned until the next write rewrites them. * `websocket.py`: extract `_build_slot_entities()` so the per-lock iterator (`_get_slot_entity_ids`) and the per-slot subscription resolver share one unique_id formula and one `SlotEntities` shape. `_get_slot_metadata()` now takes pre-resolved entities, removing the duplicate registry walk in `_serialize_lock_coordinator`. Drops the thin `_get_slot_entity_data` wrapper -- its only caller now calls `_build_slot_entities` directly. * Tests: port the rich match-priority cases from `TestParseTagWithRewrite` onto `TestParseTag` (adapted to 2-tuple); switch `test_websocket` mocks from `_get_slot_entity_data` to `_build_slot_entities`. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../lock_code_manager/providers/_util.py | 89 +++++--------- .../lock_code_manager/websocket.py | 103 ++++++++--------- tests/providers/test_util.py | 109 ++++++------------ tests/test_websocket.py | 14 ++- 4 files changed, 114 insertions(+), 201 deletions(-) diff --git a/custom_components/lock_code_manager/providers/_util.py b/custom_components/lock_code_manager/providers/_util.py index 48a382411..ebcaa9170 100644 --- a/custom_components/lock_code_manager/providers/_util.py +++ b/custom_components/lock_code_manager/providers/_util.py @@ -7,7 +7,8 @@ and -- once the unified-user migration lands -- Matter and Z-Wave User Credential CC). -Four tag formats coexist during the migration window: +Three formats are emitted today; a fourth (legacy) is still tolerated on +read for locks whose names were written by older releases. * Canonical ``lcm::`` -- the consolidated format every new write uses when the lock's ``max_user_name_length`` can fit the full @@ -27,13 +28,11 @@ digits, but only encountered on locks with absurdly small name limits or firmwares that reject even ``lcm``, so the tradeoff is accepted. -* Legacy ``[LCM:] `` -- written by Schlage/Akuvox today. - Still emitted by ``make_legacy_tagged_name`` while those providers - migrate; their reads tolerate it via ``parse_tag_with_rewrite``, which - signals a legacy match so the caller can rewrite the lock-stored name - in place at the next write opportunity. The legacy emitter is the - deprecation marker -- ``_legacy_`` in the name is intentional so - reviewers see at the call site that the writer hasn't migrated yet. +* Legacy ``[LCM:] `` -- read-only. Older Schlage/Akuvox + releases wrote this format; ``parse_tag`` still recognizes it so the + integration can identify its own users on locks that haven't been + rewritten yet. The next write naturally replaces the lock-stored name + with the canonical format. """ from __future__ import annotations @@ -65,65 +64,33 @@ def make_compact_tagged_name(slot_num: int) -> str: return f"lcm{slot_num}" -def make_legacy_tagged_name(slot_num: int, name: str | None = None) -> str: - """ - Return a code name in the legacy ``[LCM:]`` format. - - Deprecated: kept for Schlage/Akuvox until they migrate to the - canonical format with read-time detect-and-rewrite. New call sites - should use ``make_tagged_name``. - """ - base = name or f"Code Slot {slot_num}" - return f"[LCM:{slot_num}] {base}" - - def parse_tag(name: str) -> tuple[int | None, str]: """ - Parse a Lock Code Manager slot tag, tolerant of both formats. - - Returns ``(slot_num, friendly_name)`` when either format matches, or - ``(None, original_name)`` otherwise. Callers that care about - rewriting legacy tags in place should use ``parse_tag_with_rewrite`` - instead. - """ - slot, friendly, _ = parse_tag_with_rewrite(name) - return slot, friendly - - -def parse_tag_with_rewrite(name: str) -> tuple[int | None, str, bool]: - """ - Parse a slot tag and signal whether it needs format migration. - - Returns ``(slot_num, friendly_name, needs_rewrite)``. ``needs_rewrite`` - is True only when the legacy ``[LCM:]`` format matched; the - caller can re-emit via ``make_tagged_name`` on the next write to - migrate the lock-stored name in place. The canonical - ``lcm::`` format, the slot-only fallback, and untagged names - all return ``needs_rewrite=False``. + Parse a Lock Code Manager slot tag, tolerant of all known formats. - Match priority: canonical, then legacy, then compact, then - slot-only digits. The compact and slot-only branches are + Returns ``(slot_num, friendly_name)`` when any format matches, or + ``(None, original_name)`` otherwise. Match priority: canonical, + legacy, compact, slot-only. The compact and slot-only branches are charset-/length-constrained fallbacks emitted by Matter (and eventually other providers) when the lock rejects the canonical - name. The display portion is empty for both fallbacks because the - name only carries the slot binding. Bare digits being treated as - a slot tag is intentional but ambiguous with external users whose - names happen to be digit-only -- the ambiguity is the cost of - preserving the slot binding on constrained locks. + name; their display portion is empty because the name only carries + the slot binding. The legacy ``[LCM:]`` format is read-only + -- nothing emits it anymore -- but is still recognized so older + lock-stored names continue to identify as LCM-owned until the next + write rewrites them in the canonical format. Bare digits being + treated as a slot tag is intentional but ambiguous with external + users whose names happen to be digit-only; the ambiguity is the + cost of preserving the slot binding on constrained locks. """ - match = _TAG_RE.match(name) - if match: - return int(match.group(1)), match.group(2), False - match = _LEGACY_SLOT_TAG_RE.match(name) - if match: - return int(match.group(1)), match.group(2), True - match = _COMPACT_TAG_RE.match(name) - if match: - return int(match.group(1)), "", False - match = _SLOT_ONLY_RE.match(name) - if match: - return int(match.group(1)), "", False - return None, name, False + if match := _TAG_RE.match(name): + return int(match.group(1)), match.group(2) + if match := _LEGACY_SLOT_TAG_RE.match(name): + return int(match.group(1)), match.group(2) + if match := _COMPACT_TAG_RE.match(name): + return int(match.group(1)), "" + if match := _SLOT_ONLY_RE.match(name): + return int(match.group(1)), "" + return None, name def parse_slot_num(value: object) -> int | None: diff --git a/custom_components/lock_code_manager/websocket.py b/custom_components/lock_code_manager/websocket.py index ad99cc440..00b2d60fb 100644 --- a/custom_components/lock_code_manager/websocket.py +++ b/custom_components/lock_code_manager/websocket.py @@ -484,47 +484,56 @@ class SlotMetadata: enabled: bool | None = None +def _build_slot_entities( + ent_reg: er.EntityRegistry, entry_id: str, slot_num: int +) -> SlotEntities: + """ + Build a fully-populated SlotEntities for a given (entry, slot) pair. + + Both the per-lock iterator (``_get_slot_entity_ids``) and the + per-slot subscription resolver go through this primitive so the + ``unique_id`` formula and the field list live in exactly one place. + """ + + def _id(domain: str, key: str) -> str | None: + return ent_reg.async_get_entity_id( + domain, DOMAIN, _slot_unique_id(entry_id, slot_num, key) + ) + + return SlotEntities( + slot_num=slot_num, + config_entry_id=entry_id, + name_entity_id=_id(TEXT_DOMAIN, CONF_NAME), + pin_entity_id=_id(TEXT_DOMAIN, CONF_PIN), + enabled_entity_id=_id(SWITCH_DOMAIN, CONF_ENABLED), + active_entity_id=_id(BINARY_SENSOR_DOMAIN, ATTR_ACTIVE), + event_entity_id=_id(EVENT_DOMAIN, EVENT_PIN_USED), + ) + + def _get_slot_entity_ids( hass: HomeAssistant, lock_entity_id: str ) -> dict[int, SlotEntities]: - """Return a dict of slot number to SlotEntities for the four primary per-slot entities.""" - slot_entities: dict[int, SlotEntities] = {} + """Return a dict of slot number to SlotEntities for every slot LCM manages on a lock.""" ent_reg = er.async_get(hass) - - for entry in hass.config_entries.async_entries(DOMAIN): - config = get_entry_config(entry) - if not config.has_lock(lock_entity_id): - continue - - for slot_int in config.slots: - - def _entity_id( - domain: str, - key: str, - entry_id: str = entry.entry_id, - slot: int = slot_int, - ) -> str | None: - return ent_reg.async_get_entity_id( - domain, DOMAIN, _slot_unique_id(entry_id, slot, key) - ) - - slot_entities[slot_int] = SlotEntities( - slot_num=slot_int, - config_entry_id=entry.entry_id, - name_entity_id=_entity_id(TEXT_DOMAIN, CONF_NAME), - pin_entity_id=_entity_id(TEXT_DOMAIN, CONF_PIN), - active_entity_id=_entity_id(BINARY_SENSOR_DOMAIN, ATTR_ACTIVE), - enabled_entity_id=_entity_id(SWITCH_DOMAIN, CONF_ENABLED), - ) - - return slot_entities + return { + slot_int: _build_slot_entities(ent_reg, entry.entry_id, slot_int) + for entry in hass.config_entries.async_entries(DOMAIN) + if get_entry_config(entry).has_lock(lock_entity_id) + for slot_int in get_entry_config(entry).slots + } def _get_slot_metadata( - hass: HomeAssistant, lock_entity_id: str + hass: HomeAssistant, slot_entity_ids: dict[int, SlotEntities] ) -> dict[int, SlotMetadata]: - """Return a dict of slot number to SlotMetadata for all slots LCM manages on a lock.""" - slot_entities = _get_slot_entity_ids(hass, lock_entity_id) + """ + Derive SlotMetadata for each slot from pre-resolved SlotEntities. + + Callers already need ``slot_entity_ids`` for other purposes, so + passing it in avoids re-walking the entity registry just to rebuild + the same dict. + """ return { slot_num: SlotMetadata( name=_get_text_state(hass, ids.name_entity_id), @@ -532,7 +541,7 @@ def _get_slot_metadata( active=_get_bool_state(hass, ids.active_entity_id), enabled=_get_bool_state(hass, ids.enabled_entity_id), ) - for slot_num, ids in slot_entities.items() + for slot_num, ids in slot_entity_ids.items() } @@ -568,8 +577,8 @@ def _serialize_lock_coordinator( coordinator = lock.coordinator data = coordinator.data if coordinator is not None else {} managed_slots = get_managed_slots(hass, lock.lock.entity_id) - slot_metadata = _get_slot_metadata(hass, lock.lock.entity_id) slot_entity_ids = _get_slot_entity_ids(hass, lock.lock.entity_id) + slot_metadata = _get_slot_metadata(hass, slot_entity_ids) slots = [] for slot, code in sorted(data.items()): @@ -679,28 +688,6 @@ def _unsub_all() -> None: _send_update() -def _get_slot_entity_data( - hass: HomeAssistant, config_entry: ConfigEntry, slot_num: int -) -> SlotEntities: - """Get entity IDs for a specific slot.""" - ent_reg = er.async_get(hass) - entry_id = config_entry.entry_id - - def _get_entity_id(domain: str, key: str) -> str | None: - return ent_reg.async_get_entity_id( - domain, DOMAIN, _slot_unique_id(entry_id, slot_num, key) - ) - - return SlotEntities( - slot_num=slot_num, - name_entity_id=_get_entity_id(TEXT_DOMAIN, CONF_NAME), - pin_entity_id=_get_entity_id(TEXT_DOMAIN, CONF_PIN), - enabled_entity_id=_get_entity_id(SWITCH_DOMAIN, CONF_ENABLED), - active_entity_id=_get_entity_id(BINARY_SENSOR_DOMAIN, ATTR_ACTIVE), - event_entity_id=_get_entity_id(EVENT_DOMAIN, EVENT_PIN_USED), - ) - - def _get_slot_in_sync_entity_ids( hass: HomeAssistant, config_entry: ConfigEntry, slot_num: int ) -> dict[str, str]: @@ -1030,7 +1017,7 @@ async def subscribe_code_slot( def _resolve_entity_ids() -> tuple[SlotEntities, dict[str, str], str | None]: """Resolve current entity IDs for this slot from the entity registry.""" return ( - _get_slot_entity_data(hass, config_entry, slot_num), + _build_slot_entities(er.async_get(hass), config_entry.entry_id, slot_num), _get_slot_in_sync_entity_ids(hass, config_entry, slot_num), _get_slot_condition_entity_id(config_entry, slot_num), ) diff --git a/tests/providers/test_util.py b/tests/providers/test_util.py index d80f2a897..7c25992e4 100644 --- a/tests/providers/test_util.py +++ b/tests/providers/test_util.py @@ -6,11 +6,9 @@ from custom_components.lock_code_manager.providers._util import ( make_compact_tagged_name, - make_legacy_tagged_name, make_tagged_name, parse_slot_num, parse_tag, - parse_tag_with_rewrite, ) @@ -46,99 +44,76 @@ def test_make_compact_tagged_name(self, slot: int, expected: str) -> None: assert make_compact_tagged_name(slot) == expected -class TestMakeLegacyTaggedName: - """Deprecated ``[LCM:]`` builder; preserved for Schlage/Akuvox writes.""" - - @pytest.mark.parametrize( - ("slot", "name", "expected"), - [ - pytest.param(1, "Guest", "[LCM:1] Guest", id="with-name"), - pytest.param(5, None, "[LCM:5] Code Slot 5", id="default-name"), - pytest.param( - 99, "Family Member", "[LCM:99] Family Member", id="multi-word" - ), - ], - ) - def test_make_legacy_tagged_name( - self, slot: int, name: str | None, expected: str - ) -> None: - assert make_legacy_tagged_name(slot, name) == expected - - -class TestParseTagWithRewrite: - """Tolerant parser that accepts both formats and flags legacy matches.""" +class TestParseTag: + """Tolerant parser that accepts every format LCM has ever written.""" @pytest.mark.parametrize( ("input_name", "expected"), [ - # New format -- no rewrite needed. - pytest.param("lcm:1:Guest", (1, "Guest", False), id="new-simple"), - pytest.param("lcm:255:Family", (255, "Family", False), id="new-large-slot"), + # Canonical ``lcm::``. + pytest.param("lcm:1:Guest", (1, "Guest"), id="canonical-simple"), + pytest.param("lcm:255:Family", (255, "Family"), id="canonical-large-slot"), pytest.param( - "lcm:7:Bob Jones", (7, "Bob Jones", False), id="new-multi-word" + "lcm:7:Bob Jones", (7, "Bob Jones"), id="canonical-multi-word" ), - pytest.param("lcm:5:", (5, "", False), id="new-empty-display"), + pytest.param("lcm:5:", (5, ""), id="canonical-empty-display"), pytest.param( "lcm:5: Alice", - (5, "Alice", False), - id="new-trims-whitespace-after-colon", + (5, "Alice"), + id="canonical-trims-whitespace-after-colon", ), pytest.param( "lcm:5: Alice", - (5, "Alice", False), - id="new-trims-multiple-whitespace-after-colon", + (5, "Alice"), + id="canonical-trims-multiple-whitespace-after-colon", ), pytest.param( "lcm:5:lcm6:nested", - (5, "lcm6:nested", False), - id="new-display-looks-like-tag", - ), - # Legacy format -- rewrite needed. - pytest.param("[LCM:1] Guest", (1, "Guest", True), id="legacy-simple"), - pytest.param( - "[LCM:99] Family", (99, "Family", True), id="legacy-large-slot" + (5, "lcm6:nested"), + id="canonical-display-looks-like-tag", ), + # Legacy ``[LCM:] `` -- read-only. + pytest.param("[LCM:1] Guest", (1, "Guest"), id="legacy-simple"), + pytest.param("[LCM:99] Family", (99, "Family"), id="legacy-large-slot"), pytest.param( - "[LCM:5]Tight", (5, "Tight", True), id="legacy-no-space-after-bracket" + "[LCM:5]Tight", (5, "Tight"), id="legacy-no-space-after-bracket" ), # Compact fallback -- written when the lock firmware # rejects the colons in the canonical prefix. - pytest.param("lcm1", (1, "", False), id="compact-single-digit"), - pytest.param("lcm42", (42, "", False), id="compact-multi-digit"), - pytest.param("lcm255", (255, "", False), id="compact-max-slot"), + pytest.param("lcm1", (1, ""), id="compact-single-digit"), + pytest.param("lcm42", (42, ""), id="compact-multi-digit"), + pytest.param("lcm255", (255, ""), id="compact-max-slot"), # Slot-only fallback -- written when neither canonical nor # compact tier survives the lock's constraints. - pytest.param("5", (5, "", False), id="slot-only-single-digit"), - pytest.param("255", (255, "", False), id="slot-only-multi-digit"), - # Untagged or malformed -- no match, no rewrite. - pytest.param("Guest Code", (None, "Guest Code", False), id="untagged"), - pytest.param("", (None, "", False), id="empty-string"), + pytest.param("5", (5, ""), id="slot-only-single-digit"), + pytest.param("255", (255, ""), id="slot-only-multi-digit"), + # Untagged or malformed -- no match. + pytest.param("Guest Code", (None, "Guest Code"), id="untagged"), + pytest.param("", (None, ""), id="empty-string"), pytest.param( "lcm 5:Alice", - (None, "lcm 5:Alice", False), + (None, "lcm 5:Alice"), id="space-after-prefix-rejected", ), pytest.param( "lcm-5:Alice", - (None, "lcm-5:Alice", False), + (None, "lcm-5:Alice"), id="negative-slot-rejected", ), pytest.param( ":lcm5:Alice", - (None, ":lcm5:Alice", False), + (None, ":lcm5:Alice"), id="leading-colon-rejected", ), pytest.param( "LCM5:Alice", - (None, "LCM5:Alice", False), + (None, "LCM5:Alice"), id="uppercase-prefix-rejected", ), ], ) - def test_parse_tag_with_rewrite( - self, input_name: str, expected: tuple[int | None, str, bool] - ) -> None: - assert parse_tag_with_rewrite(input_name) == expected + def test_parse_tag(self, input_name: str, expected: tuple[int | None, str]) -> None: + assert parse_tag(input_name) == expected def test_compact_regex_does_not_match_canonical(self) -> None: """Anchored regexes prevent the compact arm from claiming a canonical match. @@ -149,7 +124,7 @@ def test_compact_regex_does_not_match_canonical(self) -> None: edit that relaxes the anchor (or drops it entirely) gets caught. """ - assert parse_tag_with_rewrite("lcm:5:Alice") == (5, "Alice", False) + assert parse_tag("lcm:5:Alice") == (5, "Alice") def test_canonical_prefix_wins_when_display_contains_legacy_text(self) -> None: """The canonical prefix is consumed first; the display is taken verbatim. @@ -162,25 +137,7 @@ def test_canonical_prefix_wins_when_display_contains_legacy_text(self) -> None: (including the literal ``[LCM:6] x``) is returned as the friendly name without further parsing. """ - assert parse_tag_with_rewrite("lcm:5:[LCM:6] x") == (5, "[LCM:6] x", False) - - -class TestParseTag: - """Thin 2-tuple wrapper -- drops the rewrite flag.""" - - @pytest.mark.parametrize( - ("input_name", "expected"), - [ - pytest.param("lcm:5:Alice", (5, "Alice"), id="new-format"), - pytest.param("[LCM:5] Alice", (5, "Alice"), id="legacy-format"), - pytest.param("lcm42", (42, ""), id="compact"), - pytest.param("42", (42, ""), id="slot-only"), - pytest.param("Alice", (None, "Alice"), id="untagged"), - pytest.param("", (None, ""), id="empty"), - ], - ) - def test_parse_tag(self, input_name: str, expected: tuple[int | None, str]) -> None: - assert parse_tag(input_name) == expected + assert parse_tag("lcm:5:[LCM:6] x") == (5, "[LCM:6] x") class TestParseSlotNum: diff --git a/tests/test_websocket.py b/tests/test_websocket.py index 546e59edb..313d2c79e 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -69,13 +69,13 @@ from custom_components.lock_code_manager.providers import BaseLock from custom_components.lock_code_manager.websocket import ( SlotEntities, + _build_slot_entities, _find_config_entry_by_title, _get_bool_state, _get_condition_entity_data, _get_last_changed, _get_next_calendar_event, _get_slot_condition_entity_id, - _get_slot_entity_data, _get_slot_state_entity_ids, _get_text_state, _serialize_slot, @@ -2568,7 +2568,9 @@ async def test_subscribe_code_slot_entity_tracking_refreshes_on_update( ws_client = await hass_ws_client(hass) # Get real entity data for the initial call - real_entity_data = _get_slot_entity_data(hass, lock_code_manager_config_entry, 1) + real_entity_data = _build_slot_entities( + er.async_get(hass), lock_code_manager_config_entry.entry_id, 1 + ) # Create a synthetic new entity that will appear on subsequent calls new_entity_id = "text.mock_title_code_slot_1_extra" @@ -2577,7 +2579,7 @@ async def test_subscribe_code_slot_entity_tracking_refreshes_on_update( counter = {"calls": 0} - def _mock_get_slot_entity_data(hass_arg, config_entry_arg, slot_num_arg): + def _mock_build_slot_entities(ent_reg_arg, entry_id_arg, slot_num_arg): """Return growing entity data to simulate entities appearing.""" counter["calls"] += 1 if counter["calls"] <= 1: @@ -2594,8 +2596,8 @@ def _mock_get_slot_entity_data(hass_arg, config_entry_arg, slot_num_arg): ) with patch( - "custom_components.lock_code_manager.websocket._get_slot_entity_data", - side_effect=_mock_get_slot_entity_data, + "custom_components.lock_code_manager.websocket._build_slot_entities", + side_effect=_mock_build_slot_entities, ): await ws_client.send_json( { @@ -2815,7 +2817,7 @@ async def test_subscribe_code_slot_unsub_all_with_empty_state_ref( with ( patch( - "custom_components.lock_code_manager.websocket._get_slot_entity_data", + "custom_components.lock_code_manager.websocket._build_slot_entities", return_value=empty_entity_data, ), patch(