Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 70 additions & 2 deletions keepercommander/commands/security_audit.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import argparse
import base64
import datetime
import json
import logging
import os
from json import JSONDecodeError
from typing import Dict, List, Optional, Any

from cryptography.hazmat.primitives.asymmetric import rsa, ec

from .helpers.enterprise import get_enterprise_key, try_enterprise_decrypt
from .. import api, crypto, utils
from ..error import CommandError
from ..breachwatch import BreachWatch
from .base import GroupCommand, field_to_title, dump_report_data, report_output_parser
from .enterprise_common import EnterpriseCommand
Expand All @@ -30,6 +33,8 @@ def register_command_info(aliases, command_info):

report_parser = argparse.ArgumentParser(prog='security-audit-report', description='Run a security audit report.',
parents=[report_output_parser])
report_parser.add_argument('--siem', dest='siem', action='store_true',
help='output in SIEM-ready NDJSON format (one event per line)')
report_parser.add_argument('--syntax-help', dest='syntax_help', action='store_true', help='display help')
node_filter_help = 'name(s) or UID(s) of node(s) to filter results of the report by'
report_parser.add_argument('-n', '--node', action='append', help=node_filter_help)
Expand Down Expand Up @@ -480,6 +485,10 @@ def decrypt_security_data(sec_data, key_type):
if save_report:
self.save_updated_security_reports(params, updated_security_reports)

# Reject incompatible flag combinations early
if kwargs.get('siem') and record_details:
raise CommandError('security-audit', '--siem and --record-details cannot be used together')

if record_details:
if not has_incremental_data:
logging.warning('No incremental security data available for record detail output.')
Expand All @@ -492,12 +501,19 @@ def decrypt_security_data(sec_data, key_type):
fields = ('email', 'name', 'sync_pending', 'at_risk', 'passed', 'ignored') if show_breachwatch else \
('email', 'name', 'sync_pending', 'weak', 'fair', 'medium', 'strong', 'reused', 'unique', 'securityScore',
'twoFactorChannel', 'node')
field_descriptions = fields

report_title = f'Security Audit Report{" (BreachWatch)" if show_breachwatch else ""}'

# SIEM-ready NDJSON output
if kwargs.get('siem'):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets silently ignored when --record-details is set, because the method returns from the record-detail branch before it ever reaches the SIEM branch. If we want to support that combination, it needs handling there; otherwise we should reject the flag combo explicitly instead of accepting it and returning the normal record-detail report.

if fmt != 'table':
logging.warning('--siem produces NDJSON output; ignoring --format %s', fmt)
return self._export_siem(params, rows, fields, show_breachwatch, out, report_title)

field_descriptions = fields
if fmt == 'table':
field_descriptions = (field_to_title(x) for x in fields)

report_title = f'Security Audit Report{" (BreachWatch)" if show_breachwatch else ""}'
table = []
for raw in rows:
row = []
Expand All @@ -506,6 +522,58 @@ def decrypt_security_data(sec_data, key_type):
table.append(row)
return dump_report_data(table, field_descriptions, fmt=fmt, filename=out, title=report_title)

@staticmethod
def _export_siem(params, rows, fields, show_breachwatch, filename, title):
"""Export security audit as NDJSON (one JSON event per line).

Each line is a self-contained JSON object that SIEMs can ingest
independently. Compatible with Splunk HEC, Elastic Filebeat,
Datadog log pipelines, and any NDJSON consumer.
"""
now = datetime.datetime.now(tz=datetime.timezone.utc).isoformat(timespec='seconds')
server = params.server if hasattr(params, 'server') else ''
ndjson_lines = []

for raw in rows:
user_data = {f: raw.get(f) for f in fields}

# Collect risk factors from the data — no hardcoded thresholds
risk_factors = []
if not show_breachwatch:
if raw.get('weak', 0) > 0:
risk_factors.append('weak_passwords')
if raw.get('reused', 0) > 0:
risk_factors.append('reused_passwords')
two_fa = raw.get('twoFactorChannel', '')
if not two_fa or two_fa == 'Off':
risk_factors.append('no_2fa')
else:
if raw.get('at_risk', 0) > 0:
risk_factors.append('breach_exposure')

event = {
'event_type': 'keeper.security_audit',
'timestamp': now,
'source': server,
'user': user_data,
'risk_factors': risk_factors,
}
if not show_breachwatch:
event['security_score'] = raw.get('securityScore', 0)
ndjson_lines.append(json.dumps(event, default=str))

report = '\n'.join(ndjson_lines) + '\n' if ndjson_lines else ''

if filename:
if not os.path.splitext(filename)[1]:
Copy link
Contributor

@aaunario-keeper aaunario-keeper Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This silently bypasses the shared --format / report-output path. Once --siem is set, fmt no longer governs behavior, and --output is honored even though the shared parser help says output is ignored for table mode. The main issue here is the lack of validation or explicit contract for how --siem interacts with --format and the normal report-output path. I would either validate the combination and fail closed, or make the override explicit in the CLI contract.

filename += '.ndjson'
logging.info('SIEM report path: %s', os.path.abspath(filename))
fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, 'w') as f:
f.write(report)

Check failure

Code scanning / CodeQL

Clear-text storage of sensitive information

This expression stores [sensitive data (password)](1) as clear text.
else:
return report

def get_updated_security_report_row(self, sr, rsa_key, ec_key, last_saved_data):
# type: (APIRequest_pb2.SecurityReport, rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey, Dict[str, int]) -> Dict[str, int]

Expand Down
Loading