Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 68 additions & 33 deletions tools/normalize_contribution.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ def _load_pack_registry(packs_root: Optional[Path]) -> tuple[dict, dict]:
".secrets-ignore",
"Author_image.png",
"CHANGELOG.md",
# Policy/config files that live inside List directories but are not
# list descriptors or data files — read directly by framework validators.
"shadow_mode_policy.json",
}

# Files in these directories are skipped. They have their own SDK schema
Expand Down Expand Up @@ -333,8 +336,12 @@ def content_type_from_path(path: Path) -> Optional[str]:
if content_dir == "Playbooks" and path.suffix.lower() in (".yml", ".yaml"):
return "playbook"

# Accept both .json and .txt in Lists/ — XSIAM exports can produce either
# Accept both .json and .txt in Lists/ — XSIAM exports can produce either.
# Never process _data.json files — they are the data half of the two-file
# list structure already in the repo, not a new contribution to normalise.
if content_dir == "Lists" and path.suffix.lower() in (".json", ".txt"):
if path.stem.endswith("_data"):
return None
return "list"

if content_dir == "Scripts":
Expand Down Expand Up @@ -1197,6 +1204,59 @@ def process_file(
return True, False

canon = list_canonical_name(path, override_name)

# Determine the target directory — where the canonical files live.
if out_dir is not None:
target_dir = Path(out_dir) / canon
else:
lists_parent = path.parent
if lists_parent.name == canon:
target_dir = lists_parent
else:
p = path.parent
while p != p.parent and p.name != "Lists":
p = p.parent
target_dir = p / canon

data_path = target_dir / f"{canon}_data.json"
desc_path = target_dir / f"{canon}.json"

# ── UPDATE MODE: _data.json already exists ────────────────────────────
# The contributor is updating an existing list. Write the new content
# directly to _data.json and leave the descriptor completely untouched.
# No descriptor modification, no fromVersion injection, no split.
if data_path.exists():
# Check if content actually changed
try:
existing = json.loads(data_path.read_text(encoding="utf-8"))
except Exception:
existing = {}

if existing == data:
print(OK(" ✓ already clean"))
return True, False

changes = ["data file updated (existing list)"]
prefix = "(dry-run) " if dry_run else ""
for c in changes:
print(f" {prefix}{OK('●')} {c}")

if not dry_run:
target_dir.mkdir(parents=True, exist_ok=True)
data_path.write_text(
json.dumps(data, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
print(f" {OK('→')} {data_path} (data)")
if path.resolve() != data_path.resolve():
path.unlink()
print(f" {OK('✗')} removed {path.name} (replaced by canonical data file)")

return True, True

# ── CREATE MODE: new list, no existing _data.json ─────────────────────
# The contributor is adding a brand new list. Create both the descriptor
# and data files from the contribution.
descriptor, changes, data_out = normalize_list(data, canon)

if not changes:
Expand All @@ -1208,47 +1268,22 @@ def process_file(
print(f" {prefix}{OK('●')} {c}")

if not dry_run:
# Determine the target directory.
# Repo structure: Lists/<ListName>/<ListName>.json + <ListName>_data.json
# If the file is already inside a correctly-named subdirectory, use it.
# If the file is at the Lists/ level or has a different name, create
# the subdirectory and write both files there.
if out_dir is not None:
target_dir = Path(out_dir) / canon
else:
# Check if we are already inside Lists/<canon>/
lists_parent = path.parent
if lists_parent.name == canon:
# Already in the right subdirectory
target_dir = lists_parent
else:
# At Lists/ level or wrong directory — create canonical subdir
# Walk up to find the Lists/ directory
p = path.parent
while p != p.parent and p.name != "Lists":
p = p.parent
target_dir = p / canon

target_dir.mkdir(parents=True, exist_ok=True)

# Write the descriptor file: <ListName>.json
desc_path = target_dir / f"{canon}.json"
desc_path.write_text(
json.dumps(descriptor, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
print(f" {OK('→')} {desc_path} (descriptor)")
# Write descriptor only if it doesn't already exist
if not desc_path.exists():
desc_path.write_text(
json.dumps(descriptor, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
print(f" {OK('→')} {desc_path} (descriptor)")

# Write the data file: <ListName>_data.json
data_path = target_dir / f"{canon}_data.json"
data_path.write_text(
json.dumps(data_out, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
print(f" {OK('→')} {data_path} (data)")

# Remove the original file if it is in a different location or has a
# different name — prevents stale files with duplicate identity
if path.resolve() != desc_path.resolve() and path.resolve() != data_path.resolve():
path.unlink()
print(f" {OK('✗')} removed {path.name} (replaced by canonical files)")
Expand Down
Loading