Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions keepercommander/commands/pam_launch/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
)
from ..pam.gateway_helper import get_all_gateways
from ..pam.router_helper import router_get_connected_gateways
from ..ssh_agent import try_extract_private_key
from ... import api, vault
from ...subfolder import try_resolve_path
from ...error import CommandError
Expand Down Expand Up @@ -165,12 +166,14 @@ def _get_host_port_from_record(record: Any) -> Tuple[Optional[str], Optional[int
return candidates[0]


def _record_has_credentials(record: Any) -> bool:
def _record_has_credentials(record: Any, params: Optional['KeeperParams'] = None) -> bool:
"""
Return True if the record has exactly one non-empty login field and exactly one non-empty
password field (value[0] != ''). Searches both fields[] and custom[].
Return True if the record has exactly one non-empty login field and at least one of:
- exactly one non-empty password field (fields[] and custom[]), or
- a usable SSH private key (same discovery as the launch path: keyPair, notes, custom fields,
attachments), when ``params`` is given so attachments can be resolved.

Raises CommandError if multiple non-empty fields of the same type are found (ambiguous).
Raises CommandError if multiple non-empty login or password fields are found (ambiguous).
"""
if not record:
return False
Expand All @@ -197,10 +200,16 @@ def _count_nonempty(field_type: str) -> int:
raise CommandError('pam launch',
f'Record has {password_count} non-empty password fields (expected exactly one). '
'Clear the extra password field before launching.')
if password_count == 0:
return False
if password_count == 1:
return True

# No password: SSH (and similar) may authenticate with a private key only.
if password_count == 0 and params is not None:
key_result = try_extract_private_key(params, record)
if key_result and key_result[0]:
return True

return True
return False


def _record_has_host_port(record: Any) -> bool:
Expand Down Expand Up @@ -615,9 +624,10 @@ def execute(self, params: KeeperParams, **kwargs):
cred_record = vault.KeeperRecord.load(params, launch_credential_uid)
if not cred_record:
raise CommandError('pam launch', f'Credential record not found: {launch_credential_uid}')
if not _record_has_credentials(cred_record):
if not _record_has_credentials(cred_record, params):
raise CommandError('pam launch',
f'Credential record {launch_credential_uid} must have non-empty login and password fields.')
f'Credential record {launch_credential_uid} must have non-empty login and '
'password, or login with an SSH private key.')

if allow_supply_host:
# allowSupplyHost mode: host comes from -H/-hr (CLI) or from the --credential record.
Expand Down Expand Up @@ -671,12 +681,13 @@ def execute(self, params: KeeperParams, **kwargs):
raise CommandError('pam launch',
f'No hostname configured for record {record_uid}.')

# No CLI options at all -> validate DAG-linked credential has login + password
# No CLI options at all -> validate DAG-linked credential has login + password or SSH key
if dag_linked_uid:
dag_cred_record = vault.KeeperRecord.load(params, dag_linked_uid)
if dag_cred_record and not _record_has_credentials(dag_cred_record):
if dag_cred_record and not _record_has_credentials(dag_cred_record, params):
raise CommandError('pam launch',
f'Linked credential record {dag_linked_uid} has empty login or password. '
f'Linked credential record {dag_linked_uid} has no usable auth '
'(need login and password, or login and SSH private key). '
'Configure valid credentials or use --credential to override.')
elif not allow_supply_user and not allow_supply_host:
raise CommandError('pam launch',
Expand Down
94 changes: 75 additions & 19 deletions keepercommander/commands/ssh_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,59 @@ def is_private_key_name(name): # type: (str) -> bool
return False

KEY_SIZE_MIN = 119 # Smallest possible size for ed25519 private key in PKCS#8 format
# PEM bodies for large RSA keys (8192+) exceed 4K; keep a generous cap for sanity.
KEY_SIZE_MAX = 4000
PEM_TEXT_MAX = 256 * 1024


def _normalize_typed_field_label(label):
# type: (Any) -> str
if not label or not isinstance(label, str):
return ''
return ''.join(c.lower() for c in label if c.isalnum())


# Vault/PAM pamUser often stores PEM in a secret field labeled privatePEMKey (or similar).
_PEM_SECRET_FIELD_LABELS = frozenset({
'privatepemkey',
'sshprivatekey',
'sshkeypem',
})


def _coerce_str_field_value(value):
# type: (Any) -> Optional[str]
if value is None:
return None
if isinstance(value, bytes):
try:
return value.decode('utf-8')
except Exception:
return None
if isinstance(value, str):
return value
return None


def is_valid_key_value(value):
return isinstance(value, str) and KEY_SIZE_MIN <= len(value) < KEY_SIZE_MAX


def _is_plausible_pem_private_key_blob(text):
# type: (Optional[str]) -> bool
text = _coerce_str_field_value(text)
if not text:
return False
text = text.strip()
if len(text) < KEY_SIZE_MIN or len(text) > PEM_TEXT_MAX:
return False
header, _, _ = text.partition('\n')
return bool(is_private_key(header))


def is_valid_key_file(file):
try:
return KEY_SIZE_MIN <= file.size < KEY_SIZE_MAX
return KEY_SIZE_MIN <= file.size < PEM_TEXT_MAX
except:
return False

Expand All @@ -211,31 +257,41 @@ def try_extract_private_key(params, record_or_uid):
if key_pair:
private_key = key_pair.get('privateKey')

# Explicit PEM secret fields (pamUser template: type secret, label privatePEMKey, etc.)
if not private_key and isinstance(record, vault.TypedRecord):
for fld in itertools.chain(record.fields, record.custom):
if _normalize_typed_field_label(getattr(fld, 'label', None)) not in _PEM_SECRET_FIELD_LABELS:
continue
candidate = _coerce_str_field_value(fld.get_default_value())
if _is_plausible_pem_private_key_blob(candidate):
private_key = candidate.strip()
break

# check notes field
if not private_key:
if isinstance(record, (vault.PasswordRecord, vault.TypedRecord)):
if is_valid_key_value(record.notes):
header, _, _ = record.notes.partition('\n')
if is_private_key(header):
private_key = record.notes
notes = getattr(record, 'notes', None)
if _is_plausible_pem_private_key_blob(notes):
private_key = notes.strip()

# check custom fields
# check typed fields / custom (text, multiline, secret, note)
if not private_key:
if isinstance(record, vault.TypedRecord):
try_values = (x.get_default_value() for x in itertools.chain(record.fields, record.custom) if x.type in ('text', 'multiline', 'secret', 'note'))
for value in (x for x in try_values if x):
if is_valid_key_value(value):
header, _, _ = value.partition('\n')
if is_private_key(header):
private_key = value
break
for x in itertools.chain(record.fields, record.custom):
if x.type not in ('text', 'multiline', 'secret', 'note'):
continue
candidate = _coerce_str_field_value(x.get_default_value())
if _is_plausible_pem_private_key_blob(candidate):
private_key = candidate.strip()
break
elif isinstance(record, vault.PasswordRecord):
for value in (x.value for x in record.custom if x.value):
if is_valid_key_value(value):
header, _, _ = value.partition('\n')
if is_private_key(header):
private_key = value
break
for cf in record.custom:
if not cf.value:
continue
candidate = _coerce_str_field_value(cf.value[0] if isinstance(cf.value, list) and cf.value else cf.value)
if _is_plausible_pem_private_key_blob(candidate):
private_key = candidate.strip()
break

# check for a single attachment
if not private_key:
Expand Down
Loading