From 40d8c511c1895f987cee59d0227e253752abc278 Mon Sep 17 00:00:00 2001 From: Ivan Dimov <78815270+idimov-keeper@users.noreply.github.com> Date: Fri, 20 Mar 2026 10:43:43 -0500 Subject: [PATCH] Improved lookup for launch credentials with SSH keys only --- keepercommander/commands/pam_launch/launch.py | 35 ++++--- keepercommander/commands/ssh_agent.py | 94 +++++++++++++++---- 2 files changed, 98 insertions(+), 31 deletions(-) diff --git a/keepercommander/commands/pam_launch/launch.py b/keepercommander/commands/pam_launch/launch.py index 056a329cf..1b6b5cba2 100644 --- a/keepercommander/commands/pam_launch/launch.py +++ b/keepercommander/commands/pam_launch/launch.py @@ -49,6 +49,7 @@ ) from ..pam.gateway_helper import get_all_gateways from ..pam.router_helper import router_get_connected_gateways +from ..ssh_agent import try_extract_private_key from ... import api, vault from ...subfolder import try_resolve_path from ...error import CommandError @@ -165,12 +166,14 @@ def _get_host_port_from_record(record: Any) -> Tuple[Optional[str], Optional[int return candidates[0] -def _record_has_credentials(record: Any) -> bool: +def _record_has_credentials(record: Any, params: Optional['KeeperParams'] = None) -> bool: """ - Return True if the record has exactly one non-empty login field and exactly one non-empty - password field (value[0] != ''). Searches both fields[] and custom[]. + Return True if the record has exactly one non-empty login field and at least one of: + - exactly one non-empty password field (fields[] and custom[]), or + - a usable SSH private key (same discovery as the launch path: keyPair, notes, custom fields, + attachments), when ``params`` is given so attachments can be resolved. - Raises CommandError if multiple non-empty fields of the same type are found (ambiguous). + Raises CommandError if multiple non-empty login or password fields are found (ambiguous). """ if not record: return False @@ -197,10 +200,16 @@ def _count_nonempty(field_type: str) -> int: raise CommandError('pam launch', f'Record has {password_count} non-empty password fields (expected exactly one). ' 'Clear the extra password field before launching.') - if password_count == 0: - return False + if password_count == 1: + return True + + # No password: SSH (and similar) may authenticate with a private key only. + if password_count == 0 and params is not None: + key_result = try_extract_private_key(params, record) + if key_result and key_result[0]: + return True - return True + return False def _record_has_host_port(record: Any) -> bool: @@ -615,9 +624,10 @@ def execute(self, params: KeeperParams, **kwargs): cred_record = vault.KeeperRecord.load(params, launch_credential_uid) if not cred_record: raise CommandError('pam launch', f'Credential record not found: {launch_credential_uid}') - if not _record_has_credentials(cred_record): + if not _record_has_credentials(cred_record, params): raise CommandError('pam launch', - f'Credential record {launch_credential_uid} must have non-empty login and password fields.') + f'Credential record {launch_credential_uid} must have non-empty login and ' + 'password, or login with an SSH private key.') if allow_supply_host: # allowSupplyHost mode: host comes from -H/-hr (CLI) or from the --credential record. @@ -671,12 +681,13 @@ def execute(self, params: KeeperParams, **kwargs): raise CommandError('pam launch', f'No hostname configured for record {record_uid}.') - # No CLI options at all -> validate DAG-linked credential has login + password + # No CLI options at all -> validate DAG-linked credential has login + password or SSH key if dag_linked_uid: dag_cred_record = vault.KeeperRecord.load(params, dag_linked_uid) - if dag_cred_record and not _record_has_credentials(dag_cred_record): + if dag_cred_record and not _record_has_credentials(dag_cred_record, params): raise CommandError('pam launch', - f'Linked credential record {dag_linked_uid} has empty login or password. ' + f'Linked credential record {dag_linked_uid} has no usable auth ' + '(need login and password, or login and SSH private key). ' 'Configure valid credentials or use --credential to override.') elif not allow_supply_user and not allow_supply_host: raise CommandError('pam launch', diff --git a/keepercommander/commands/ssh_agent.py b/keepercommander/commands/ssh_agent.py index 74fdfb009..d7d53691d 100644 --- a/keepercommander/commands/ssh_agent.py +++ b/keepercommander/commands/ssh_agent.py @@ -179,13 +179,59 @@ def is_private_key_name(name): # type: (str) -> bool return False KEY_SIZE_MIN = 119 # Smallest possible size for ed25519 private key in PKCS#8 format +# PEM bodies for large RSA keys (8192+) exceed 4K; keep a generous cap for sanity. KEY_SIZE_MAX = 4000 +PEM_TEXT_MAX = 256 * 1024 + + +def _normalize_typed_field_label(label): + # type: (Any) -> str + if not label or not isinstance(label, str): + return '' + return ''.join(c.lower() for c in label if c.isalnum()) + + +# Vault/PAM pamUser often stores PEM in a secret field labeled privatePEMKey (or similar). +_PEM_SECRET_FIELD_LABELS = frozenset({ + 'privatepemkey', + 'sshprivatekey', + 'sshkeypem', +}) + + +def _coerce_str_field_value(value): + # type: (Any) -> Optional[str] + if value is None: + return None + if isinstance(value, bytes): + try: + return value.decode('utf-8') + except Exception: + return None + if isinstance(value, str): + return value + return None + + def is_valid_key_value(value): return isinstance(value, str) and KEY_SIZE_MIN <= len(value) < KEY_SIZE_MAX + +def _is_plausible_pem_private_key_blob(text): + # type: (Optional[str]) -> bool + text = _coerce_str_field_value(text) + if not text: + return False + text = text.strip() + if len(text) < KEY_SIZE_MIN or len(text) > PEM_TEXT_MAX: + return False + header, _, _ = text.partition('\n') + return bool(is_private_key(header)) + + def is_valid_key_file(file): try: - return KEY_SIZE_MIN <= file.size < KEY_SIZE_MAX + return KEY_SIZE_MIN <= file.size < PEM_TEXT_MAX except: return False @@ -211,31 +257,41 @@ def try_extract_private_key(params, record_or_uid): if key_pair: private_key = key_pair.get('privateKey') + # Explicit PEM secret fields (pamUser template: type secret, label privatePEMKey, etc.) + if not private_key and isinstance(record, vault.TypedRecord): + for fld in itertools.chain(record.fields, record.custom): + if _normalize_typed_field_label(getattr(fld, 'label', None)) not in _PEM_SECRET_FIELD_LABELS: + continue + candidate = _coerce_str_field_value(fld.get_default_value()) + if _is_plausible_pem_private_key_blob(candidate): + private_key = candidate.strip() + break + # check notes field if not private_key: if isinstance(record, (vault.PasswordRecord, vault.TypedRecord)): - if is_valid_key_value(record.notes): - header, _, _ = record.notes.partition('\n') - if is_private_key(header): - private_key = record.notes + notes = getattr(record, 'notes', None) + if _is_plausible_pem_private_key_blob(notes): + private_key = notes.strip() - # check custom fields + # check typed fields / custom (text, multiline, secret, note) if not private_key: if isinstance(record, vault.TypedRecord): - try_values = (x.get_default_value() for x in itertools.chain(record.fields, record.custom) if x.type in ('text', 'multiline', 'secret', 'note')) - for value in (x for x in try_values if x): - if is_valid_key_value(value): - header, _, _ = value.partition('\n') - if is_private_key(header): - private_key = value - break + for x in itertools.chain(record.fields, record.custom): + if x.type not in ('text', 'multiline', 'secret', 'note'): + continue + candidate = _coerce_str_field_value(x.get_default_value()) + if _is_plausible_pem_private_key_blob(candidate): + private_key = candidate.strip() + break elif isinstance(record, vault.PasswordRecord): - for value in (x.value for x in record.custom if x.value): - if is_valid_key_value(value): - header, _, _ = value.partition('\n') - if is_private_key(header): - private_key = value - break + for cf in record.custom: + if not cf.value: + continue + candidate = _coerce_str_field_value(cf.value[0] if isinstance(cf.value, list) and cf.value else cf.value) + if _is_plausible_pem_private_key_blob(candidate): + private_key = candidate.strip() + break # check for a single attachment if not private_key: