From d6bde25a99d8e8546c99e4c7842697c5750426fb Mon Sep 17 00:00:00 2001 From: 3rd Iteration Date: Wed, 5 Nov 2025 13:48:18 -0300 Subject: [PATCH] Add datum tool integration and helper utilities --- src/seedsigner/gui/screens/tools_screens.py | 117 ++++ src/seedsigner/helpers/datum.py | 403 ++++++++++++ src/seedsigner/views/tools_views.py | 678 +++++++++++++++++++- tests/test_datum_helper.py | 105 +++ 4 files changed, 1296 insertions(+), 7 deletions(-) create mode 100644 src/seedsigner/helpers/datum.py create mode 100644 tests/test_datum_helper.py diff --git a/src/seedsigner/gui/screens/tools_screens.py b/src/seedsigner/gui/screens/tools_screens.py index 602b6d8f4..9ca58b3de 100644 --- a/src/seedsigner/gui/screens/tools_screens.py +++ b/src/seedsigner/gui/screens/tools_screens.py @@ -5,6 +5,7 @@ from gettext import gettext as _ from typing import Any, List from PIL import Image, ImageDraw +import textwrap from seedsigner.helpers import mnemonic_generation from seedsigner.gui.renderer import Renderer from seedsigner.hardware.camera import Camera @@ -29,6 +30,122 @@ def __post_init__(self): super().__post_init__() +@dataclass +class DatumContentScreen(BaseTopNavScreen): + text: str = "" + title: str = _("Datum contents") + + def __post_init__(self): + if not self.title: + self.title = _("Datum contents") + super().__post_init__() + + self.font = Fonts.get_font(GUIConstants.FIXED_WIDTH_FONT_NAME_JP, GUIConstants.get_body_font_size()) + bbox = self.font.getbbox("M") + self.char_width = max(1, bbox[2] - bbox[0]) + self.line_height = max(1, bbox[3] - bbox[1]) + + available_height = self.canvas_height - self.top_nav.height - GUIConstants.EDGE_PADDING + self.visible_lines = max(1, available_height // self.line_height) + text_width = self.canvas_width - 2 * GUIConstants.EDGE_PADDING + self.chars_per_line = max(1, text_width // self.char_width) + + raw_lines = self.text.splitlines() or [""] + self.lines = [] + for paragraph in raw_lines: + if not paragraph: + self.lines.append("") + continue + wrapped = textwrap.wrap( + paragraph, + width=self.chars_per_line, + break_long_words=False, + replace_whitespace=False, + ) + self.lines.extend(wrapped if wrapped else [""]) + + if not self.lines: + self.lines = [""] + + self.total_lines = len(self.lines) + self.top_index = 0 + + with self.renderer.lock: + self._render_page() + + def _render_page(self): + self.clear_screen() + for component in self.components: + component.render() + + y = self.top_nav.height + GUIConstants.EDGE_PADDING + bottom = self.canvas_height - GUIConstants.EDGE_PADDING + for line in self.lines[self.top_index : self.top_index + self.visible_lines]: + if y + self.line_height > bottom: + break + self.renderer.draw.text( + (GUIConstants.EDGE_PADDING, y), + line, + font=self.font, + fill=GUIConstants.BODY_FONT_COLOR, + ) + y += self.line_height + + if self.total_lines > self.visible_lines: + indicator = f"{self.top_index + 1}-{min(self.top_index + self.visible_lines, self.total_lines)}/{self.total_lines}" + indicator_font = Fonts.get_font(GUIConstants.get_button_font_name(), GUIConstants.get_button_font_size()) + indicator_width = indicator_font.getlength(indicator) + indicator_height = indicator_font.getbbox("A")[3] + self.renderer.draw.text( + ( + self.canvas_width - GUIConstants.EDGE_PADDING - indicator_width, + self.canvas_height - GUIConstants.EDGE_PADDING - indicator_height, + ), + indicator, + font=indicator_font, + fill=GUIConstants.SECONDARY_BODY_FONT_COLOR, + ) + + self.renderer.show_image() + + def _run(self): + while True: + user_input = self.hw_inputs.wait_for(HardwareButtonsConstants.ALL_KEYS) + + with self.renderer.lock: + if user_input in ( + HardwareButtonsConstants.KEY_LEFT, + HardwareButtonsConstants.KEY_UP, + ) and not self.top_nav.is_selected: + self.top_nav.is_selected = True + self.top_nav.render_buttons() + self.renderer.show_image() + continue + + if self.top_nav.is_selected: + if user_input in ( + HardwareButtonsConstants.KEY_DOWN, + HardwareButtonsConstants.KEY_RIGHT, + ): + self.top_nav.is_selected = False + self.top_nav.render_buttons() + self.renderer.show_image() + continue + if user_input == HardwareButtonsConstants.KEY_PRESS: + return RET_CODE__BACK_BUTTON + + if user_input == HardwareButtonsConstants.KEY_UP: + if self.top_index > 0: + self.top_index -= 1 + self._render_page() + elif user_input == HardwareButtonsConstants.KEY_DOWN: + if self.top_index + self.visible_lines < self.total_lines: + self.top_index += 1 + self._render_page() + elif user_input in HardwareButtonsConstants.KEYS__ANYCLICK: + return RET_CODE__BACK_BUTTON + + @dataclass class ToolsImageEntropyLivePreviewScreen(BaseScreen): def __post_init__(self): diff --git a/src/seedsigner/helpers/datum.py b/src/seedsigner/helpers/datum.py new file mode 100644 index 000000000..120acdc62 --- /dev/null +++ b/src/seedsigner/helpers/datum.py @@ -0,0 +1,403 @@ +"""Utilities for the Datum Tool. + +This module adapts the Krux Datum Tool helpers for use in SeedSigner. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, List, Sequence, Tuple, Union + +from binascii import hexlify, unhexlify +import base64 + +from embit import base58 +from embit.bech32 import bech32_decode, Encoding + +from urtypes import bytes as ur_bytes +from urtypes import crypto as ur_crypto + +from seedsigner.helpers import kef + +# Datum type identifiers +DATUM_DESCRIPTOR = "DESC" +DATUM_PSBT = "PSBT" +DATUM_XPUB = "XPUB" +DATUM_ADDRESS = "ADDR" + +# UR types that the Datum tool can decode +DATUM_UR_TYPES = { + DATUM_PSBT: ["crypto-psbt"], +} + +# BBQR type hints preserved from the Krux implementation +DATUM_BBQR_TYPES = { + DATUM_DESCRIPTOR: ["U"], + DATUM_PSBT: ["P"], + DATUM_XPUB: ["U"], + DATUM_ADDRESS: ["U"], +} + +STATIC_QR_MAX_SIZE = 4 # version 5 - 37x37 modules +SUFFICIENT_SAMPLE_SIZE = 512 +SLOW_ENCODING_MAX_SIZE = 2 ** 14 + +DatumData = Union[str, bytes] + + +class DatumError(Exception): + """Base error for datum helper failures.""" + + +class DatumConversionError(DatumError): + """Raised when an encoding conversion fails.""" + + +# --------------------------------------------------------------------------- +# Base encoding helpers +# --------------------------------------------------------------------------- + +B43CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ$*+-./:" + + +def _base43_decode(value: str) -> bytes: + long_value = 0 + power = 1 + for char in reversed(value): + digit = B43CHARS.find(char) + if digit == -1: + raise ValueError(f"forbidden character {char} for base 43") + long_value += digit * power + power *= 43 + result = bytearray() + while long_value >= 256: + long_value, mod = divmod(long_value, 256) + result.append(mod) + if long_value: + result.append(long_value) + for char in value: + if char == B43CHARS[0]: + result.append(0) + else: + break + return bytes(reversed(result)) + + +def _base43_encode(value: bytes) -> str: + long_value = 0 + power = 1 + for char in reversed(value): + long_value += power * char + power <<= 8 + result = bytearray() + while long_value >= 43: + long_value, mod = divmod(long_value, 43) + result.extend(B43CHARS[mod].encode()) + if long_value: + result.extend(B43CHARS[long_value].encode()) + for char in value: + if char == 0: + result.extend(B43CHARS[0].encode()) + else: + break + return bytes(reversed(result)).decode() + + +def base_decode(value: str, base: int) -> bytes: + if not isinstance(value, str): + raise TypeError("Invalid value, expected str") + if value == "": + return b"" + if base == 32: + padding = (-len(value)) % 8 + return base64.b32decode(value + "=" * padding, casefold=False) + if base == 43: + return _base43_decode(value) + if base == 58: + return base58.decode(value) + if base == 64: + return base64.b64decode(value) + raise ValueError(f"Unsupported base: {base}") + + +def base_encode(value: bytes, base: int) -> str: + if not isinstance(value, (bytes, bytearray)): + raise TypeError("Invalid value, expected bytes") + if value == b"": + return "" + if base == 32: + return base64.b32encode(value).decode().rstrip("=") + if base == 43: + return _base43_encode(bytes(value)) + if base == 58: + return base58.encode(bytes(value)) + if base == 64: + return base64.b64encode(value).decode() + raise ValueError(f"Unsupported base: {base}") + + +# --------------------------------------------------------------------------- +# UR helpers +# --------------------------------------------------------------------------- + + +def urobj_to_data(ur_obj: Any) -> DatumData | None: + """Extract raw data from a UR object.""" + + if ur_obj.type == "crypto-bip39": + return ur_crypto.BIP39.from_cbor(ur_obj.cbor).words + if ur_obj.type == "crypto-account": + account = ur_crypto.Account.from_cbor(ur_obj.cbor) + return account.output_descriptors[0].descriptor() + if ur_obj.type == "crypto-output": + return ur_crypto.Output.from_cbor(ur_obj.cbor).descriptor() + if ur_obj.type == "crypto-psbt": + return ur_crypto.PSBT.from_cbor(ur_obj.cbor).data + if ur_obj.type == "bytes": + return ur_bytes.Bytes.from_cbor(ur_obj.cbor).data + return None + + +# --------------------------------------------------------------------------- +# Encoding detection and datum identification +# --------------------------------------------------------------------------- + + +def convert_encoding(contents: DatumData, conversion: Union[str, int]) -> DatumData | None: + """Convert between encodings. + + ``conversion`` mirrors the Krux implementation: + * ``"hex"`` or ``"HEX"`` + * ``32``, ``43``, ``58``, ``64`` + * ``"utf8"`` + * ``"shift_case"`` + """ + + from_bytes = isinstance(contents, (bytes, bytearray)) + + try: + if conversion in (32, 43, 58, 64): + if from_bytes: + return base_encode(bytes(contents), conversion) + return base_decode(contents, conversion) + if conversion == "hex": + if from_bytes: + return hexlify(contents).decode() + return unhexlify(contents) + if conversion == "HEX": + if from_bytes: + return hexlify(contents).decode().upper() + return unhexlify(contents) + if conversion == "utf8": + if from_bytes: + return bytes(contents).decode() + return contents.encode() + if conversion == "shift_case" and isinstance(contents, str): + if contents == contents.lower(): + return contents.upper() + if contents == contents.upper(): + return contents.lower() + except Exception as exc: # pragma: no cover - error mapped to None + raise DatumConversionError(str(exc)) from exc + + raise DatumConversionError(f"Unsupported conversion: {conversion}") + + +def detect_encodings(str_data: str, verify: bool = True) -> List[Union[str, int]]: + if not isinstance(str_data, str): + raise TypeError("detect_encodings() expected str") + + encodings: List[Union[str, int]] = [] + sample = str_data[:SUFFICIENT_SAMPLE_SIZE] + if not sample: + return ["utf8"] + + min_chr = min(sample) + max_chr = max(sample) + length = len(str_data) + + if length % 2 == 0 and "0" <= min_chr: + if max_chr <= "F": + if not verify: + encodings.append("HEX") + else: + try: + unhexlify(str_data) + encodings.append("HEX") + except Exception: + pass + elif max_chr <= "f": + if not verify: + encodings.append("hex") + else: + try: + unhexlify(str_data) + encodings.append("hex") + except Exception: + pass + + if "2" <= min_chr and max_chr <= "Z": + if not verify: + encodings.append(32) + else: + try: + base_decode(str_data, 32) + encodings.append(32) + except Exception: + pass + + if length <= SLOW_ENCODING_MAX_SIZE and "0" <= min_chr: + encoding = None + if max_chr <= "Z": + if not verify: + encoding = Encoding.BECH32 + else: + encoding, _, _ = bech32_decode(str_data) + elif max_chr <= "z": + if not verify: + encoding = Encoding.BECH32 + else: + encoding, _, _ = bech32_decode(str_data) + if encoding == Encoding.BECH32: + encodings.append("BECH32" if max_chr <= "Z" else "bech32") + elif encoding == Encoding.BECH32M: + encodings.append("BECH32M" if max_chr <= "Z" else "bech32m") + + if length <= SLOW_ENCODING_MAX_SIZE and "$" <= min_chr and max_chr <= "Z": + if not verify: + encodings.append(43) + else: + try: + base_decode(str_data, 43) + encodings.append(43) + except Exception: + pass + + if length <= SLOW_ENCODING_MAX_SIZE and "1" <= min_chr <= "z": + if not verify: + encodings.append(58) + else: + try: + base_decode(str_data, 58) + encodings.append(58) + except Exception: + pass + + if "+" <= min_chr and max_chr <= "z": + if not verify: + encodings.append(64) + else: + try: + decoded = base_decode(str_data, 64) + if base_encode(decoded, 64) == str_data: + encodings.append(64) + except Exception: + pass + + if ord(max_chr) <= 127: + encodings.append("ascii") + if 128 <= ord(max_chr) <= 255: + encodings.append("latin-1") + + encodings.append("utf8") + return encodings + + +def identify_datum(data: DatumData, encodings: Sequence[Union[str, int]] | None = None) -> str | None: + if isinstance(data, bytes): + if data[:5] == b"psbt\xff": + return DATUM_PSBT + return None + + if len(data) <= 33: + return None + + encodings = list(encodings) if encodings is not None else detect_encodings(data) + + if data[:1] in "xyzYZtuvUV" and data[1:4] == "pub" and 58 in encodings: + return DATUM_XPUB + if ( + data[:1] == "[" + and "]" in data + and data.split("]")[1][:1] in "xyzYZtuvUV" + and data.split("]")[1][1:4] == "pub" + ): + return DATUM_XPUB + if data.split("(")[0] in ("pkh", "sh", "wpkh", "wsh", "tr"): + return DATUM_DESCRIPTOR + if ( + (data[:1] in ("1", "3", "n", "2", "m") and 58 in encodings) + or ( + data[:4].lower() in ("bc1p", "bc1q", "tb1p", "tb1q") + and any( + isinstance(enc, str) and enc.lower().startswith("bech32") + for enc in encodings + ) + ) + ): + return DATUM_ADDRESS + return None + + +# --------------------------------------------------------------------------- +# Analysis helpers +# --------------------------------------------------------------------------- + + +@dataclass +class DatumAnalysis: + contents: DatumData + about_prefix: str + about: str + encodings: List[Union[str, int]] + datum: str | None + sensitive: bool + + def preview(self) -> str: + if isinstance(self.contents, bytes): + return "0x" + hexlify(self.contents[:SUFFICIENT_SAMPLE_SIZE]).decode() + return self.contents[:SUFFICIENT_SAMPLE_SIZE] + + +def analyze_contents(contents: DatumData) -> DatumAnalysis: + if isinstance(contents, bytes): + about_prefix = "binary:" + about = f"{about_prefix} {len(contents)} bytes" + encodings: List[Union[str, int]] = [] + sensitive = len(contents) in (16, 32) and max(contents) > 127 + datum = identify_datum(contents) + return DatumAnalysis(contents, about_prefix, about, encodings, datum, sensitive) + + encodings = detect_encodings(contents) + about_prefix = "text:" + about = f"{about_prefix} {len(contents)} chars" + words = contents[:256].split() + sensitive = len(words) in (12, 24) + if not sensitive and len(contents) in (12 * 4, 24 * 4): + if all(c in "0123456789" for c in contents): + sensitive = True + datum = identify_datum(contents, encodings) + return DatumAnalysis(contents, about_prefix, about, encodings, datum, sensitive) + + +# --------------------------------------------------------------------------- +# KEF helpers +# --------------------------------------------------------------------------- + + +@dataclass +class KEFMetadata: + label: bytes + version: int + iterations: int + + def display_label(self) -> str: + try: + return self.label.decode() + except UnicodeDecodeError: + return "0x" + hexlify(self.label).decode() + + +def unwrap_kef_envelope(payload: bytes) -> Tuple[KEFMetadata, bytes]: + label, version, iterations, ciphertext = kef.unwrap(payload) + metadata = KEFMetadata(label=label, version=version, iterations=iterations) + return metadata, ciphertext diff --git a/src/seedsigner/views/tools_views.py b/src/seedsigner/views/tools_views.py index c6bc1ab57..2f80a684e 100644 --- a/src/seedsigner/views/tools_views.py +++ b/src/seedsigner/views/tools_views.py @@ -1,11 +1,14 @@ import hashlib +import json import logging import os +import re import time import platform import binascii import subprocess from pathlib import Path +from typing import Optional from embit.util import secp256k1 from embit.psbt import PSBT @@ -26,18 +29,47 @@ ErrorScreen, ) from seedsigner.gui.screens.scan_screens import ScanScreen -from seedsigner.gui.screens.tools_screens import (ToolsCalcFinalWordDoneScreen, ToolsCalcFinalWordFinalizePromptScreen, - ToolsCalcFinalWordScreen, ToolsCoinFlipEntryScreen, ToolsDiceEntropyEntryScreen, ToolsImageEntropyFinalImageScreen, - ToolsImageEntropyLivePreviewScreen, ToolsAddressExplorerAddressTypeScreen, ToolsTextQRTextEntryScreen, ToolsTextQRReviewTextScreen, - ToolsTextQRTranscribeModePromptScreen, ToolsTranscribeTextQRWholeQRScreen, ToolsTranscribeTextQRZoomedInScreen, - ToolsTranscribeTextQRConfirmQRPromptScreen, ToolsCommonFilterScreen) -from seedsigner.helpers import embit_utils, mnemonic_generation +from seedsigner.gui.screens.tools_screens import ( + ToolsCalcFinalWordDoneScreen, + ToolsCalcFinalWordFinalizePromptScreen, + ToolsCalcFinalWordScreen, + ToolsCoinFlipEntryScreen, + ToolsDiceEntropyEntryScreen, + ToolsImageEntropyFinalImageScreen, + ToolsImageEntropyLivePreviewScreen, + ToolsAddressExplorerAddressTypeScreen, + ToolsTextQRTextEntryScreen, + ToolsTextQRReviewTextScreen, + ToolsTextQRTranscribeModePromptScreen, + ToolsTranscribeTextQRWholeQRScreen, + ToolsTranscribeTextQRZoomedInScreen, + ToolsTranscribeTextQRConfirmQRPromptScreen, + ToolsCommonFilterScreen, + DatumContentScreen, +) +from seedsigner.helpers import embit_utils, mnemonic_generation, kef from seedsigner.helpers.iso7816 import format_sw_error from seedsigner.models.decode_qr import DecodeQR from seedsigner.models.encode_qr import GenericStaticQrEncoder -from seedsigner.gui.screens.screen import ButtonOption +from seedsigner.gui.screens.screen import ButtonOption, QRDisplayScreen from seedsigner.models.seed import Seed from seedsigner.models.settings_definition import SettingsConstants +from seedsigner.models.qr_type import QRType +from seedsigner.helpers.qr import QR +from seedsigner.helpers.datum import ( + DATUM_ADDRESS, + DATUM_DESCRIPTOR, + DATUM_PSBT, + DATUM_XPUB, + STATIC_QR_MAX_SIZE, + SLOW_ENCODING_MAX_SIZE, + analyze_contents, + convert_encoding, + DatumConversionError, + urobj_to_data, + unwrap_kef_envelope, + KEFMetadata, +) from seedsigner.views.seed_views import ( SeedDiscardView, SeedFinalizeView, @@ -81,6 +113,7 @@ class ToolsMenuView(View): ADDRESS_EXPLORER = ButtonOption("Address Explorer") VERIFY_ADDRESS = ButtonOption("Verify address") TEXTQRCODE = ButtonOption("Text QR Code") + DATUM_TOOL = ButtonOption("Datum Tool") SMARTCARD = ButtonOption("Smartcard Tools", FontAwesomeIconConstants.LOCK) MICROSD = ButtonOption("MicroSD Tools") GPG = ButtonOption("GPG Tools") @@ -100,6 +133,7 @@ def run(self): self.ADDRESS_EXPLORER, self.VERIFY_ADDRESS, self.TEXTQRCODE, + self.DATUM_TOOL, self.MICROSD, self.GPG, self.CLEAR_DESCRIPTOR, @@ -142,6 +176,9 @@ def run(self): elif button_data[selected_menu_num] == self.TEXTQRCODE: return Destination(ToolsTextQRView) + elif button_data[selected_menu_num] == self.DATUM_TOOL: + return Destination(ToolsDatumMenuView) + elif button_data[selected_menu_num] == self.SMARTCARD: return Destination(ToolsSmartcardMenuView) @@ -958,6 +995,633 @@ def run(self): # Exiting/Cancelling the QR display screen always returns to the list return Destination(ToolsAddressExplorerAddressListView, view_args=dict(is_change=self.is_change, start_index=self.start_index, selected_button_index=self.index - self.start_index, initial_scroll=self.parent_initial_scroll), skip_current_view=True) +"""**************************************************************************** + Datum Tool Views +****************************************************************************""" +class ToolsDatumMenuView(View): + SCAN = ButtonOption(_("Scan QR code")) + MANUAL = ButtonOption(_("Manual input")) + MICROSD = ButtonOption(_("From microSD")) + + def run(self): + button_data = [self.SCAN, self.MANUAL] + button_data.append(self.MICROSD) + + selection = self.run_screen( + ButtonListScreen, + title=_("Datum Tool"), + is_button_text_centered=False, + button_data=button_data, + ) + + if selection == RET_CODE__BACK_BUTTON: + return Destination(BackStackView) + + if button_data[selection] == self.SCAN: + result = self._scan_qr() + elif button_data[selection] == self.MANUAL: + result = self._manual_input() + else: + result = self._load_from_microsd() + + if not result: + return Destination(BackStackView) + + contents, title = result + return Destination( + ToolsDatumActionView, + view_args=dict(contents=contents, title=title), + ) + + def _scan_qr(self): + decoder = DecodeQR() + ScanScreen(decoder=decoder, instructions_text=_("Scan datum QR code")).display() + + self.controller.reset_screensaver_timeout() + time.sleep(0.1) + + if not decoder.is_complete: + if decoder.is_nonUTF8: + self._show_error(_("Non UTF-8 data detected.")) + else: + self._show_error(_("Failed to load")) + return None + + try: + contents = self._extract_decoder_contents(decoder) + except Exception as err: # pragma: no cover - defensive logging + logger.exception("Datum tool failed to parse QR: %s", err) + contents = None + + if contents is None: + self._show_error(_("Failed to load")) + return None + + if isinstance(contents, list): + contents = " ".join(str(item) for item in contents) + if isinstance(contents, dict): + contents = json.dumps(contents, indent=2) + + return contents, _("QR contents") + + def _manual_input(self): + ret = ToolsTextQRTextEntryScreen( + textToEncode="", + title=_("Custom Text"), + ).display() + + if ret.get("is_back_button"): + return None + + text = ret.get("textToEncode", "") + if not text: + self._show_error(_("Failed to load")) + return None + + return text, _("Custom Text") + + def _load_from_microsd(self): + microsd = MicroSD.get_instance() + if not microsd.is_inserted: + self._show_error(_("SD card not detected.")) + return None + + files = self._list_microsd_files() + if not files: + self._show_error(_("No files found on microSD.")) + return None + + options = [ButtonOption(path.name, return_data=path) for path in files] + selection = self.run_screen( + ButtonListScreen, + title=_("Select file"), + is_button_text_centered=False, + button_data=options, + ) + + if selection == RET_CODE__BACK_BUTTON: + return None + + file_path = options[selection].return_data + try: + with open(file_path, "rb") as file_handle: + data = file_handle.read() + except OSError as err: + logger.info("Failed to read microSD file %s: %s", file_path, err) + self._show_error(_("Failed to load")) + return None + + try: + text = data.decode() + if text.endswith("\n"): + text = text[:-1] + contents = text + except UnicodeDecodeError: + contents = data + + return contents, file_path.name + + def _list_microsd_files(self, limit: int = 100): + microsd_dir = MicroSD.get_microsd_dir() + files = sorted( + [path for path in microsd_dir.iterdir() if path.is_file()], + key=lambda path: path.name.lower(), + ) + return files[:limit] + + def _extract_decoder_contents(self, decoder: DecodeQR): + impl = getattr(decoder, "decoder", None) + if impl is None: + return None + + qr_type = decoder.qr_type + if qr_type in ( + QRType.PSBT__UR2, + QRType.OUTPUT__UR, + QRType.ACCOUNT__UR, + QRType.BYTES__UR, + ): + try: + ur_obj = impl.result_message() + except Exception: + ur_obj = None + if ur_obj: + data = urobj_to_data(ur_obj) + if data is not None: + return data + if hasattr(ur_obj, "cbor"): + return ur_obj.cbor + + for attr in ("get_data", "get_text", "get_address", "get_wallet_descriptor"): + if hasattr(impl, attr): + try: + result = getattr(impl, attr)() + except Exception: + continue + if result: + if isinstance(result, dict): + return json.dumps(result, indent=2) + return result + + if hasattr(impl, "get_qr_data"): + try: + qr_data = impl.get_qr_data() + except Exception: + qr_data = None + if qr_data: + if isinstance(qr_data, dict): + return json.dumps(qr_data, indent=2) + return qr_data + + try: + if decoder.is_psbt: + psbt_obj = decoder.get_psbt() + if psbt_obj: + return psbt_obj.serialize() + except Exception: + pass + + if decoder.is_text: + return decoder.get_text() + + if getattr(decoder, "is_address", False): + try: + return impl.get_address() + except Exception: + pass + + return None + + def _show_error(self, message: str): + self.run_screen( + ErrorScreen, + title=_("Error"), + status_headline=None, + text=str(message), + ) + + +class ToolsDatumActionView(View): + def __init__(self, contents, title: str): + super().__init__() + self.contents = contents + self.title = title + self.history: list = [] + self.analysis = analyze_contents(contents) + self.kef_metadata: Optional[KEFMetadata] = None + self._kef_ciphertext: Optional[bytes] = None + + def run(self): + offer_convert = False + show_summary = True + + while True: + self._update_analysis() + + if show_summary: + if self._show_summary() == RET_CODE__BACK_BUTTON: + return Destination(BackStackView) + show_summary = False + + menu_options = self._build_menu(offer_convert) + if not menu_options: + return Destination(BackStackView) + + selection = self.run_screen( + ButtonListScreen, + title=self.title or _("Datum Tool"), + is_button_text_centered=False, + button_data=menu_options, + ) + + if selection == RET_CODE__BACK_BUTTON: + return Destination(BackStackView) + + action = menu_options[selection].return_data + + if action == "info": + show_summary = True + elif action == "view": + self._view_contents() + elif action == "convert": + offer_convert = True + show_summary = True + elif action == "convert_done": + offer_convert = False + elif action == "qr": + self._display_qr() + elif action == "save": + if self._save_to_sd(): + show_summary = True + elif action == "encrypt": + if self._encrypt_contents(): + offer_convert = False + show_summary = True + elif action == "decrypt": + if self._decrypt_kef(): + offer_convert = False + show_summary = True + elif action == "undo": + if self._undo_conversion(): + show_summary = True + elif action in ("hex", "HEX", "utf8", "shift_case", 32, 43, 58, 64): + if self._apply_conversion(action): + show_summary = True + + def _build_menu(self, offer_convert: bool): + options = [] + if offer_convert: + for token in self._get_conversion_tokens(): + options.append(ButtonOption(self._conversion_label(token), return_data=token)) + if self.history: + options.append(ButtonOption(_("Undo last conversion"), return_data="undo")) + options.append(ButtonOption(_("Done converting"), return_data="convert_done")) + return options + + options.append(ButtonOption(_("Show info"), return_data="info")) + options.append(ButtonOption(_("View contents"), return_data="view")) + + if self._get_conversion_tokens(): + options.append(ButtonOption(_("Convert datum"), return_data="convert")) + + if self._can_display_qr(): + options.append(ButtonOption(_("QR code"), return_data="qr")) + + if not self.analysis.sensitive: + options.append(ButtonOption(_("Save to microSD"), return_data="save")) + + if self.kef_metadata and self._kef_ciphertext is not None: + options.append(ButtonOption(_("Decrypt KEF envelope"), return_data="decrypt")) + elif isinstance(self.contents, (bytes, bytearray)): + options.append(ButtonOption(_("Encrypt (KEF)"), return_data="encrypt")) + + return options + + def _get_conversion_tokens(self): + tokens = [] + if isinstance(self.contents, (bytes, bytearray)): + tokens.extend(["hex", "HEX", 32]) + if len(self.contents) <= int(SLOW_ENCODING_MAX_SIZE * 5.42 / 8): + tokens.append(43) + tokens.extend([58, 64, "utf8"]) + else: + encodings = set(self.analysis.encodings) + if "HEX" in encodings: + tokens.append("HEX") + if "hex" in encodings: + tokens.append("hex") + if 32 in encodings: + tokens.append(32) + if 43 in encodings: + tokens.append(43) + if 58 in encodings: + tokens.append(58) + if 64 in encodings: + tokens.append(64) + if "utf8" in encodings: + tokens.append("utf8") + if any(c.isalpha() for c in self.contents): + tokens.append("shift_case") + + seen = [] + for token in tokens: + if token not in seen: + seen.append(token) + return seen + + def _conversion_label(self, token): + if isinstance(self.contents, (bytes, bytearray)): + if token == "hex": + return _("To hex") + if token == "HEX": + return _("To HEX") + if token == "utf8": + return _("To utf8 text") + if token == "shift_case": + return _("Shift case") + return _("To base{}" ).format(token) + + if token == "hex": + return _("From hex") + if token == "HEX": + return _("From HEX") + if token == "utf8": + return _("From utf8") + if token == "shift_case": + return _("Shift case") + return _("From base{}" ).format(token) + + def _apply_conversion(self, token): + try: + new_contents = convert_encoding(self.contents, token) + except DatumConversionError as err: + self._show_error(str(err)) + return False + except Exception as err: # pragma: no cover - defensive logging + logger.exception("Datum conversion failed: %s", err) + self._show_error(_("Failed to convert")) + return False + + if new_contents is None: + self._show_error(_("Failed to convert")) + return False + + self.history.append(self.contents) + self.contents = new_contents + return True + + def _undo_conversion(self): + if not self.history: + self._show_error(_("Nothing to undo")) + return False + self.contents = self.history.pop() + return True + + def _view_contents(self): + if isinstance(self.contents, (bytes, bytearray)): + hex_str = binascii.hexlify(bytes(self.contents)).decode() + formatted = " ".join(hex_str[i : i + 2] for i in range(0, len(hex_str), 2)) + title = _("Hex data") + text = formatted + else: + title = self.title or _("Datum contents") + text = self.contents + + self.run_screen(DatumContentScreen, text=text, title=title) + + def _can_display_qr(self): + data = self.contents + if isinstance(data, bytearray): + data = bytes(data) + + if not isinstance(data, (bytes, str)): + return False + + try: + modules = QR().qrsize(data) + except Exception: + return False + + return modules <= 37 + + def _display_qr(self): + if not self._can_display_qr(): + self._show_error(_("Data is too large to encode as a static QR.")) + return + + data = self.contents if isinstance(self.contents, str) else bytes(self.contents) + + try: + encoder = GenericStaticQrEncoder(data=data) + self.run_screen(QRDisplayScreen, qr_encoder=encoder) + except Exception as err: + logger.exception("Datum QR display failed: %s", err) + self._show_error(_("Failed to encode QR")) + + def _save_to_sd(self): + microsd = MicroSD.get_instance() + if not microsd.is_inserted: + self._show_error(_("SD card not detected.")) + return False + + default_name = (self.title or _("datum")).strip() + default_name = re.sub(r"[^A-Za-z0-9._-]", "_", default_name) or "datum" + ext = "bin" if isinstance(self.contents, (bytes, bytearray)) else "txt" + default_name = f"{default_name}.{ext}" + + ret = ToolsTextQRTextEntryScreen( + textToEncode=default_name, + title=_("Filename"), + ).display() + + if ret.get("is_back_button"): + return False + + filename = ret.get("textToEncode", "").strip() + if not filename: + self._show_error(_("Filename required.")) + return False + + filename = re.sub(r"[^A-Za-z0-9._-]", "_", filename) + if not filename.lower().endswith(f".{ext}"): + filename = f"{filename}.{ext}" + + file_path = MicroSD.get_microsd_dir() / filename + + try: + mode = "wb" if isinstance(self.contents, (bytes, bytearray)) else "w" + with open(file_path, mode) as file_handle: + if mode == "wb": + file_handle.write(bytes(self.contents)) + else: + file_handle.write(self.contents) + except OSError as err: + logger.info("Failed to save datum to %s: %s", file_path, err) + self._show_error(_("Failed to save to microSD.")) + return False + + self._show_success(_("Saved to {}" ).format(file_path.name)) + return True + + def _prompt_encryption_key(self, existing: str = ""): + from seedsigner.gui.screens.scan_screens import ScanTypeEncryptionKeyScreen + + result = self.run_screen(ScanTypeEncryptionKeyScreen, encryptionkey=existing) + encryption_key = result.get("encryptionkey", "") + if "is_back_button" in result: + return None + return encryption_key + + def _prompt_label(self, default: str = ""): + ret = ToolsTextQRTextEntryScreen( + textToEncode=default, + title=_("Label"), + ).display() + + if ret.get("is_back_button"): + return None + + return ret.get("textToEncode", "") + + def _encrypt_contents(self): + if not isinstance(self.contents, (bytes, bytearray)): + self._show_error(_("Convert data to bytes before encrypting.")) + return False + + key = self._prompt_encryption_key() + if key is None: + return False + + label = self._prompt_label(default=self.analysis.datum or (self.title or "")) + if label is None: + return False + + label_bytes = label.encode() if label else b"" + iterations_setting = self.settings.get_value(SettingsConstants.SETTING__ENCRYPTION_ITER) + iterations = iterations_setting * 10000 + mode_name = self.settings.get_value(SettingsConstants.SETTING__ENCRYPTION_MODE) + plaintext = bytes(self.contents) + + try: + version = kef.suggest_versions(plaintext, mode_name)[0] + cipher = kef.Cipher(key, label_bytes, iterations) + ciphertext = cipher.encrypt(plaintext, version) + envelope = kef.wrap(label_bytes, version, iterations, ciphertext) + except Exception as err: + logger.exception("Datum encryption failed: %s", err) + self._show_error(_("Encryption failed.")) + return False + + self.contents = envelope + self.history.clear() + self.kef_metadata = None + self._kef_ciphertext = None + self.title = f"{label} KEF" if label else "KEF" + self._show_success(_("Created KEF envelope.")) + return True + + def _decrypt_kef(self): + if not self.kef_metadata or self._kef_ciphertext is None: + self._show_error(_("No KEF envelope detected.")) + return False + + key = self._prompt_encryption_key() + if key is None: + return False + + try: + cipher = kef.Cipher(key, self.kef_metadata.label, self.kef_metadata.iterations) + plaintext = cipher.decrypt(self._kef_ciphertext, self.kef_metadata.version) + except Exception as err: + logger.info("Datum KEF decrypt failed: %s", err) + self._show_error(_("Failed to decrypt.")) + return False + + label_display = self.kef_metadata.display_label() + self.contents = plaintext + self.history.clear() + self.kef_metadata = None + self._kef_ciphertext = None + if label_display: + self.title = label_display + self._show_success(_("Decrypted KEF envelope.")) + return True + + def _update_analysis(self): + self.analysis = analyze_contents(self.contents) + self.kef_metadata = None + self._kef_ciphertext = None + + if isinstance(self.contents, (bytes, bytearray)): + try: + metadata, ciphertext = unwrap_kef_envelope(bytes(self.contents)) + except Exception: + return + self.kef_metadata = metadata + self._kef_ciphertext = ciphertext + + def _summary_lines(self): + lines = [] + headline = self.title or _("Datum") + lines.append(headline) + lines.append(self.analysis.about) + + if self.analysis.encodings: + enc = ", ".join(str(item) for item in self.analysis.encodings) + lines.append(_("Encodings: {}" ).format(enc)) + + datum_map = { + DATUM_PSBT: _("Detected type: PSBT"), + DATUM_DESCRIPTOR: _("Detected type: Descriptor"), + DATUM_XPUB: _("Detected type: Extended key"), + DATUM_ADDRESS: _("Detected type: Address"), + } + if self.analysis.datum: + lines.append(datum_map.get(self.analysis.datum, _("Detected type: {}" ).format(self.analysis.datum))) + + if self.analysis.sensitive: + lines.append(_("Sensitive data")) + + if self.kef_metadata: + lines.append(_("KEF envelope: {}" ).format(self.kef_metadata.display_label())) + + preview = self.analysis.preview() + if preview: + preview_text = preview.replace("\n", " ") if isinstance(self.contents, str) else preview + if len(preview_text) > 160: + preview_text = preview_text[:160] + "…" + lines.append(_("Preview: {}" ).format(preview_text)) + + return lines + + def _show_summary(self): + return self.run_screen( + LargeIconStatusScreen, + title=self.title or _("Datum Tool"), + status_headline=None, + text="\n".join(self._summary_lines()), + button_data=[ButtonOption(_("Continue"))], + allow_text_overflow=True, + ) + + def _show_error(self, message: str): + self.run_screen( + ErrorScreen, + title=_("Error"), + status_headline=None, + text=str(message), + ) + + def _show_success(self, message: str): + self.run_screen( + LargeIconStatusScreen, + title=_("Success"), + status_headline=None, + text=message, + button_data=[ButtonOption(_("Continue"))], + ) + + """**************************************************************************** Text QR Code Views ****************************************************************************""" diff --git a/tests/test_datum_helper.py b/tests/test_datum_helper.py new file mode 100644 index 000000000..a171ff7cb --- /dev/null +++ b/tests/test_datum_helper.py @@ -0,0 +1,105 @@ +import binascii +import os + +from types import SimpleNamespace +from urtypes.crypto import PSBT as UR_PSBT + +from seedsigner.helpers import kef +from seedsigner.helpers.datum import ( + DATUM_ADDRESS, + DATUM_DESCRIPTOR, + DATUM_PSBT, + DATUM_XPUB, + analyze_contents, + convert_encoding, + detect_encodings, + identify_datum, + unwrap_kef_envelope, + urobj_to_data, +) + + +def test_detect_encodings_hex(): + encodings = detect_encodings("deadbeef") + assert "hex" in encodings + assert "utf8" in encodings + + +def test_detect_encodings_base32(): + data = "MFRGGZDFMZTWQ2LK" + encodings = detect_encodings(data) + assert 32 in encodings + + +def test_convert_encoding_hex_roundtrip(): + original = b"hello world" + encoded = convert_encoding(original, "hex") + assert encoded == binascii.hexlify(original).decode() + decoded = convert_encoding(encoded, "hex") + assert decoded == original + + +def test_convert_encoding_utf8_roundtrip(): + text = "SeedSigner" + data = convert_encoding(text, "utf8") + assert isinstance(data, bytes) + assert convert_encoding(data, "utf8") == text + + +def test_identify_datum_types(): + assert identify_datum(b"psbt\xff\x01") == DATUM_PSBT + + xpub = ( + "xpub661MyMwAqRbcF75hXVh8AyTzw9uQWGWpZ6YG1ChcxrFAuoRE2R3pNu3yqYgnSUZXoqBYwygJyBEt" + "QtdgQXVc3k5ufADG7n2AFDzy83H8dH7" + ) + assert identify_datum(xpub, [58]) == DATUM_XPUB + + descriptor = ( + "wpkh([d34db33f/84h/0h/0h]xpub661MyMwAqRbcF75hXVh8AyTzw9uQWGWpZ6YG1ChcxrFAuoRE2R3p" + "Nu3yqYgnSUZXoqBYwygJyBEtQtdgQXVc3k5ufADG7n2AFDzy83H8dH7/0/*)" + ) + assert identify_datum(descriptor, [58]) == DATUM_DESCRIPTOR + + address = "1BoatSLRHtKNngkdXEeobR76b53LETtpyT" + encodings = detect_encodings(address) + assert identify_datum(address, encodings) == DATUM_ADDRESS + + +def test_analyze_contents_sensitivity(): + mnemonic = " ".join(["zoo"] * 12) + analysis = analyze_contents(mnemonic) + assert analysis.sensitive + + entropy = bytes([200] * 16) + analysis_bytes = analyze_contents(entropy) + assert analysis_bytes.sensitive + + +def test_urobj_to_data_psbt(): + sample = b"psbt\xff\x01\x02" + ur_obj = SimpleNamespace(type="crypto-psbt", cbor=UR_PSBT(sample).to_cbor()) + extracted = urobj_to_data(ur_obj) + assert extracted == sample + + +def test_unwrap_kef_envelope_roundtrip(): + payload = b"hello datum" + label = b"label" + iterations = 10000 + mode_name = "AES-CBC" + + version = kef.suggest_versions(payload, mode_name)[0] + cipher = kef.Cipher("secret", label, iterations) + iv_len = kef.MODE_IVS.get(kef.MODE_NUMBERS[mode_name], 0) + iv = os.urandom(iv_len) if iv_len else b"" + ciphertext = cipher.encrypt(payload, version, iv=iv) + envelope = kef.wrap(label, version, iterations, ciphertext) + + metadata, ciphertext_out = unwrap_kef_envelope(envelope) + assert metadata.label == label + assert metadata.iterations == iterations + assert metadata.version == version + + cipher2 = kef.Cipher("secret", metadata.label, metadata.iterations) + assert cipher2.decrypt(ciphertext_out, metadata.version) == payload