From d0d39e4474d8254f3877988c29a91a192100913b Mon Sep 17 00:00:00 2001 From: Michael Ivertowski Date: Sat, 9 May 2026 13:17:09 +0200 Subject: [PATCH] =?UTF-8?q?release:=20v5.10.0=20=E2=80=94=20group-audit-aw?= =?UTF-8?q?are=20Method-A=20je=5Fnetwork=20export?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds per-entity (entities/{code}/graphs/je_network.{csv,parquet}) and consolidated (consolidated/je_network.{csv,parquet}) accounting- network edge lists to the group-audit pipeline. IC pairs surface via ic_pair_id + ic_partner_entity columns; elimination JEs are flagged is_eliminated=true on the consolidated file with entity_code set to the group_id. * New shared helper datasynth_runtime::je_network::build_je_network_edges reused by single-entity output_writer + the new group emitter. * Single-entity CSV output is byte-identical to v5.9.0 (existing integration test je_network_export still passes). * New aggregate module datasynth-group/src/aggregate/je_network.rs wires into aggregate/driver.rs after eliminations_to_journal_entries. * Tests cover Method A, Cartesian, multi-line skip, IC field surfacing, and per-entity + consolidated file emission. Validated end-to-end via 'datasynth-data group generate' on mini_nestle.yaml: 4 per-entity files, 368 elimination edges, 69,287 consolidated edges, 376 IC-pair edges, coverage 0.9583. vynfi-group-audit-enterprise-2000 regen is the next deliverable (cloud H100 wall-clock + 3 GB upload — code is ready). --- CHANGELOG.md | 64 +++ Cargo.lock | 41 +- Cargo.toml | 36 +- crates/datasynth-group/Cargo.toml | 4 + .../datasynth-group/src/aggregate/driver.rs | 18 + .../src/aggregate/je_network.rs | 513 ++++++++++++++++++ crates/datasynth-group/src/aggregate/mod.rs | 2 + crates/datasynth-group/tests/aggregate_e2e.rs | 89 +++ crates/datasynth-runtime/Cargo.toml | 1 + crates/datasynth-runtime/src/je_network.rs | 282 ++++++++++ crates/datasynth-runtime/src/lib.rs | 1 + crates/datasynth-runtime/src/output_writer.rs | 176 +----- 12 files changed, 1041 insertions(+), 186 deletions(-) create mode 100644 crates/datasynth-group/src/aggregate/je_network.rs create mode 100644 crates/datasynth-runtime/src/je_network.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d693c4a..10c59dd5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,70 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [5.10.0] - 2026-05-09 + +Group-audit-aware Method-A edge-list export — the v5.9.0 single-entity +Method-A network is now extended to the consolidated-group level so +the `vynfi-group-audit-enterprise-2000` archive (and any other +group-audit run) emits per-entity *and* consolidated `je_network` +artefacts. + +### Added — group-audit `je_network` export + +- **Per-entity** `entities/{code}/graphs/je_network.{csv,parquet}` — + the same Method-A edges the single-entity output_writer would + produce, extended with `ic_pair_id` + `ic_partner_entity` columns + so consumers can join the IC postings back into pairs. +- **Consolidated** `consolidated/je_network.{csv,parquet}` — every + entity's edges concatenated, plus the elimination JEs (flagged + `is_eliminated=true`), with `entity_code` as a partition column. +- Schema additions on the consolidated file: `entity_code`, + `ic_pair_id`, `ic_partner_entity`, `is_eliminated`, + `eliminates_ic_pair_id`. Per-entity adds `ic_pair_id` + + `ic_partner_entity` only. +- Both formats use Zstd-compressed parquet (~5× smaller than CSV) + matching the convention from the single-entity dataset. + +### Refactored — shared Method-A helper + +- New `datasynth_runtime::je_network::build_je_network_edges()` — + pure builder reused by `output_writer::write_je_network_csv` and + the new group emitter. Single-entity CSV output is byte-identical + to v5.9.0 (verified by the existing + `tests/je_network_export.rs` integration test). +- New struct `JeNetworkEdge` exposes the IC fields through the + existing edge model so single-entity runs that *do* have IC + postings (multi-company-in-one-shard configs) can still surface + them — though the single-entity CSV writer keeps the v5.8.0 + 13-column schema for backwards compatibility. + +### Validated against `mini_nestle.yaml` + +End-to-end smoke through `datasynth-data group generate` against +`configs/examples/group/mini_nestle.yaml` produced: +- 4 per-entity `je_network.{csv,parquet}` files + (NESTLE_SA / NESTLE_USA / NESTLE_DE / NESTLE_BR) +- 368 elimination edges +- 69,287 consolidated edges +- 376 IC-pair edges (matched seller + buyer sides) with + `ic_pair_id` populated +- Coverage 0.9583 (matched / planned IC pairs) + +`vynfi-group-audit-enterprise-2000` regeneration is the next +deliverable; this release ships the engine changes that make it +possible. + +### Implementation notes + +- Wired into `aggregate/driver.rs` immediately after + `eliminations_to_journal_entries` runs (step 7b) so elimination + edges can be flagged `is_eliminated=true` while the contributing + entity-tagged JEs are still in their pre-TB-rewrite form. +- `eliminates_ic_pair_id` is left empty on v5.10 elimination edges + — the synthetic JEs produced by the elimination factory don't + carry the source `IcPairId` on their headers. Plumbing it + through the elimination → JE conversion can land in v5.10.x. + ## [5.9.0] - 2026-05-08 Customer-feedback follow-up release on top of v5.8.0. Bundles a diff --git a/Cargo.lock b/Cargo.lock index d024bf87..c825fe37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1395,7 +1395,7 @@ checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" [[package]] name = "datasynth-audit-fsm" -version = "5.9.0" +version = "5.10.0" dependencies = [ "arrow", "chrono", @@ -1422,7 +1422,7 @@ dependencies = [ [[package]] name = "datasynth-audit-optimizer" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "datasynth-audit-fsm", @@ -1437,7 +1437,7 @@ dependencies = [ [[package]] name = "datasynth-banking" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "csv", @@ -1457,7 +1457,7 @@ dependencies = [ [[package]] name = "datasynth-cli" -version = "5.9.0" +version = "5.10.0" dependencies = [ "anyhow", "assert_cmd", @@ -1496,7 +1496,7 @@ dependencies = [ [[package]] name = "datasynth-config" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "datasynth-banking", @@ -1511,7 +1511,7 @@ dependencies = [ [[package]] name = "datasynth-core" -version = "5.9.0" +version = "5.10.0" dependencies = [ "candle-core", "candle-nn", @@ -1540,7 +1540,7 @@ dependencies = [ [[package]] name = "datasynth-eval" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "datasynth-config", @@ -1565,7 +1565,7 @@ dependencies = [ [[package]] name = "datasynth-fingerprint" -version = "5.9.0" +version = "5.10.0" dependencies = [ "anyhow", "arrow", @@ -1594,7 +1594,7 @@ dependencies = [ [[package]] name = "datasynth-generators" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "datasynth-config", @@ -1618,7 +1618,7 @@ dependencies = [ [[package]] name = "datasynth-graph" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "datasynth-banking", @@ -1637,8 +1637,9 @@ dependencies = [ [[package]] name = "datasynth-group" -version = "5.9.0" +version = "5.10.0" dependencies = [ + "arrow", "assert_cmd", "blake3", "chrono", @@ -1651,6 +1652,7 @@ dependencies = [ "datasynth-standards", "datasynth-test-utils", "hex", + "parquet", "pretty_assertions", "rand 0.10.1", "rand_chacha 0.10.0", @@ -1660,15 +1662,17 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "smallvec", "strsim", "tempfile", "thiserror 2.0.18", "tracing", + "uuid", ] [[package]] name = "datasynth-ocpm" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "datasynth-core", @@ -1684,7 +1688,7 @@ dependencies = [ [[package]] name = "datasynth-output" -version = "5.9.0" +version = "5.10.0" dependencies = [ "arrow", "chrono", @@ -1708,7 +1712,7 @@ dependencies = [ [[package]] name = "datasynth-runtime" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "crossbeam-channel", @@ -1738,6 +1742,7 @@ dependencies = [ "serde_json", "serde_yaml", "sha2 0.11.0", + "smallvec", "tempfile", "thiserror 2.0.18", "tokio", @@ -1747,7 +1752,7 @@ dependencies = [ [[package]] name = "datasynth-server" -version = "5.9.0" +version = "5.10.0" dependencies = [ "anyhow", "argon2", @@ -1797,7 +1802,7 @@ dependencies = [ [[package]] name = "datasynth-standards" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "datasynth-core", @@ -1813,7 +1818,7 @@ dependencies = [ [[package]] name = "datasynth-test-utils" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "datasynth-banking", @@ -1836,7 +1841,7 @@ dependencies = [ [[package]] name = "datasynth-workspace" -version = "5.9.0" +version = "5.10.0" dependencies = [ "chrono", "criterion", diff --git a/Cargo.toml b/Cargo.toml index 8ddbe644..6680552d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ exclude = ["fuzz", "attic/datasynth-graph-export"] # Root package for workspace-level benchmarks [package] name = "datasynth-workspace" -version = "5.9.0" +version = "5.10.0" edition = "2021" publish = false @@ -44,7 +44,7 @@ tempfile = { workspace = true } serde_json = { workspace = true } [workspace.package] -version = "5.9.0" +version = "5.10.0" edition = "2021" license = "Apache-2.0" rust-version = "1.88" @@ -60,22 +60,22 @@ categories = ["simulation", "command-line-utilities"] # Internal crates - version required for crates.io publishing # Version must match workspace.package.version to prevent cargo from resolving # old incompatible versions during publish verification -datasynth-core = { version = "5.9.0", path = "crates/datasynth-core" } -datasynth-config = { version = "5.9.0", path = "crates/datasynth-config" } -datasynth-generators = { version = "5.9.0", path = "crates/datasynth-generators" } -datasynth-output = { version = "5.9.0", path = "crates/datasynth-output" } -datasynth-runtime = { version = "5.9.0", path = "crates/datasynth-runtime" } -datasynth-graph = { version = "5.9.0", path = "crates/datasynth-graph" } -datasynth-server = { version = "5.9.0", path = "crates/datasynth-server" } -datasynth-test-utils = { version = "5.9.0", path = "crates/datasynth-test-utils" } -datasynth-eval = { version = "5.9.0", path = "crates/datasynth-eval" } -datasynth-ocpm = { version = "5.9.0", path = "crates/datasynth-ocpm" } -datasynth-banking = { version = "5.9.0", path = "crates/datasynth-banking" } -datasynth-fingerprint = { version = "5.9.0", path = "crates/datasynth-fingerprint" } -datasynth-standards = { version = "5.9.0", path = "crates/datasynth-standards" } -datasynth-audit-fsm = { version = "5.9.0", path = "crates/datasynth-audit-fsm" } -datasynth-audit-optimizer = { version = "5.9.0", path = "crates/datasynth-audit-optimizer" } -datasynth-group = { version = "5.9.0", path = "crates/datasynth-group" } +datasynth-core = { version = "5.10.0", path = "crates/datasynth-core" } +datasynth-config = { version = "5.10.0", path = "crates/datasynth-config" } +datasynth-generators = { version = "5.10.0", path = "crates/datasynth-generators" } +datasynth-output = { version = "5.10.0", path = "crates/datasynth-output" } +datasynth-runtime = { version = "5.10.0", path = "crates/datasynth-runtime" } +datasynth-graph = { version = "5.10.0", path = "crates/datasynth-graph" } +datasynth-server = { version = "5.10.0", path = "crates/datasynth-server" } +datasynth-test-utils = { version = "5.10.0", path = "crates/datasynth-test-utils" } +datasynth-eval = { version = "5.10.0", path = "crates/datasynth-eval" } +datasynth-ocpm = { version = "5.10.0", path = "crates/datasynth-ocpm" } +datasynth-banking = { version = "5.10.0", path = "crates/datasynth-banking" } +datasynth-fingerprint = { version = "5.10.0", path = "crates/datasynth-fingerprint" } +datasynth-standards = { version = "5.10.0", path = "crates/datasynth-standards" } +datasynth-audit-fsm = { version = "5.10.0", path = "crates/datasynth-audit-fsm" } +datasynth-audit-optimizer = { version = "5.10.0", path = "crates/datasynth-audit-optimizer" } +datasynth-group = { version = "5.10.0", path = "crates/datasynth-group" } # Serialization serde = { version = "1.0", features = ["derive"] } diff --git a/crates/datasynth-group/Cargo.toml b/crates/datasynth-group/Cargo.toml index 5c1a7bee..f2497f6a 100644 --- a/crates/datasynth-group/Cargo.toml +++ b/crates/datasynth-group/Cargo.toml @@ -33,12 +33,16 @@ blake3 = { workspace = true } hex = { workspace = true } strsim = "0.11" rayon = { workspace = true } +arrow = { workspace = true } +parquet = { workspace = true } [dev-dependencies] datasynth-test-utils = { workspace = true } rust_decimal_macros = { workspace = true } pretty_assertions = "1" tempfile = { workspace = true } +smallvec = { workspace = true } +uuid = { workspace = true } # Subprocess-based determinism harness in `tests/determinism_in_process.rs` # (Task 11.4) drives the `datasynth-data` CLI binary as a subprocess. # `assert_cmd` is used for ergonomics; the binary itself is discovered diff --git a/crates/datasynth-group/src/aggregate/driver.rs b/crates/datasynth-group/src/aggregate/driver.rs index c1548d8b..67d124be 100644 --- a/crates/datasynth-group/src/aggregate/driver.rs +++ b/crates/datasynth-group/src/aggregate/driver.rs @@ -294,6 +294,24 @@ pub fn run_aggregate( // ── 7. Convert to elimination JEs (Task 5.5) ──────────────────────── let elim_jes = eliminations_to_journal_entries(&elim_result); + // ── 7b. v5.10 — emit per-entity + consolidated je_network artefacts. + // Hooks in here (after eliminations land but before TB consolidation + // rewrites contributing_jes) so we can mark elimination edges with + // is_eliminated=true while the original IC pair JEs are still in + // their entity-tagged form. + let je_network_summary = crate::aggregate::je_network::write_je_network_artefacts( + &contributing_jes, + &elim_jes, + out_dir, + )?; + tracing::info!( + "v5.10 je_network: {} per-entity files, {} elim edges, {} consolidated edges -> {:?}", + je_network_summary.per_entity_edge_count.len(), + je_network_summary.elim_edge_count, + je_network_summary.consolidated_edge_count, + je_network_summary.consolidated_csv_path, + ); + // ── 8. Apply eliminations to pre-elim TB (Task 5.6) ───────────────── let post_elim = apply_eliminations_to_tb(&pre_elim, &elim_jes)?; diff --git a/crates/datasynth-group/src/aggregate/je_network.rs b/crates/datasynth-group/src/aggregate/je_network.rs new file mode 100644 index 00000000..7679e3dd --- /dev/null +++ b/crates/datasynth-group/src/aggregate/je_network.rs @@ -0,0 +1,513 @@ +//! v5.10 — group-aware Method-A je_network export. +//! +//! Emits two artefacts during the aggregate phase: +//! +//! 1. **Per-entity** `entities/{code}/graphs/je_network.{csv,parquet}` +//! — the same Method-A edges the single-entity output_writer would +//! produce, extended with `ic_pair_id` + `ic_partner_entity` columns +//! so consumers can join the IC postings back into pairs. +//! +//! 2. **Consolidated** `consolidated/je_network.{csv,parquet}` — every +//! entity's edges concatenated, plus the elimination JEs (flagged +//! `is_eliminated=true`), with `entity_code` as a partition column. +//! +//! Method-A semantics — *one edge per 2-line journal entry, +//! confidence = 1.0* — are inherited verbatim from +//! [`datasynth_runtime::je_network::build_je_network_edges`]. IC pairs +//! naturally produce two edges (one seller-side, one buyer-side) that +//! share the same `ic_pair_id`; the consolidated file is therefore a +//! straight concat of the per-entity edges plus the eliminations. + +use std::fs::File; +use std::io::BufWriter; +use std::path::Path; +use std::sync::Arc; + +use arrow::array::{ArrayRef, BooleanArray, Float64Array, RecordBatch, StringArray}; +use arrow::datatypes::{DataType, Field, Schema}; +use datasynth_config::JeNetworkMethod; +use datasynth_core::models::JournalEntry; +use datasynth_runtime::je_network::{build_je_network_edges, JeNetworkEdge}; +use parquet::arrow::ArrowWriter; +use parquet::basic::{Compression, ZstdLevel}; +use parquet::file::properties::WriterProperties; +use rust_decimal::Decimal; + +use crate::errors::{GroupError, GroupResult}; + +/// Summary statistics returned by [`write_je_network_artefacts`]. +#[derive(Debug, Clone, Default)] +pub struct JeNetworkSummary { + pub per_entity_edge_count: Vec<(String, usize)>, + pub elim_edge_count: usize, + pub consolidated_edge_count: usize, + pub consolidated_csv_path: Option, + pub consolidated_parquet_path: Option, +} + +/// Per-entity CSV header (15 columns — single-entity v5.8 schema + 2 IC fields). +const ENTITY_CSV_HEADER: &str = "edge_id,document_id,posting_date,from_account,to_account,\ +from_line_id,to_line_id,amount,confidence,predecessor_edge_id,\ +business_process,is_fraud,is_anomaly,ic_pair_id,ic_partner_entity"; + +/// Consolidated CSV header (18 columns — entity-scoped + IC + elimination flags). +const CONSOLIDATED_CSV_HEADER: &str = "edge_id,document_id,entity_code,posting_date,\ +from_account,to_account,from_line_id,to_line_id,amount,confidence,\ +predecessor_edge_id,business_process,is_fraud,is_anomaly,\ +ic_pair_id,ic_partner_entity,is_eliminated,eliminates_ic_pair_id"; + +/// Emit per-entity + consolidated je_network artefacts. +/// +/// # Arguments +/// +/// * `contributing_jes` — `(entity_code, JEs)` pairs as walked by +/// `walk_entity_archives` in the aggregate driver. +/// * `elim_jes` — elimination JEs returned by +/// [`crate::aggregate::elimination::eliminations_to_journal_entries`]. +/// These appear only in the consolidated output with +/// `is_eliminated = true`. +/// * `out_dir` — group archive root. +pub fn write_je_network_artefacts( + contributing_jes: &[(String, Vec)], + elim_jes: &[JournalEntry], + out_dir: &Path, +) -> GroupResult { + let mut summary = JeNetworkSummary::default(); + + // Container for the consolidated concat. Each entry is + // (entity_code, edge, is_eliminated, eliminates_ic_pair_id). + let mut all_edges: Vec<(String, JeNetworkEdge, bool, Option)> = + Vec::with_capacity(contributing_jes.iter().map(|(_, j)| j.len()).sum::() * 2); + + // ── Per-entity emit ────────────────────────────────────────────────── + for (entity, jes) in contributing_jes { + let edges = build_je_network_edges(jes, JeNetworkMethod::A); + write_entity_csv(out_dir, entity, &edges)?; + write_entity_parquet(out_dir, entity, &edges)?; + summary + .per_entity_edge_count + .push((entity.clone(), edges.len())); + for e in edges { + all_edges.push((entity.clone(), e, false, None)); + } + } + + // ── Elimination edges ──────────────────────────────────────────────── + let elim_edges = build_je_network_edges(elim_jes, JeNetworkMethod::A); + summary.elim_edge_count = elim_edges.len(); + for (je, e) in elim_jes.iter().zip(elim_edges) { + // The elimination JE's company_code is set to "CONSOLIDATION" by + // the elimination factory (per v5.0 contract); use it as the + // entity code for the consolidated row. + let entity_code = je.header.company_code.clone(); + // No direct ic_pair_id link on the synthetic elim JE — the + // pair-level association lives on the EliminationEntry that + // produced the JE. v5.10 leaves this column empty; future + // releases can plumb it through if needed. + all_edges.push((entity_code, e, true, None)); + } + summary.consolidated_edge_count = all_edges.len(); + + // ── Consolidated emit ──────────────────────────────────────────────── + let consol_dir = out_dir.join("consolidated"); + std::fs::create_dir_all(&consol_dir).map_err(GroupError::Io)?; + + let csv_path = consol_dir.join("je_network.csv"); + write_consolidated_csv(&csv_path, &all_edges)?; + summary.consolidated_csv_path = Some(csv_path); + + let parquet_path = consol_dir.join("je_network.parquet"); + write_consolidated_parquet(&parquet_path, &all_edges)?; + summary.consolidated_parquet_path = Some(parquet_path); + + Ok(summary) +} + +// ─── CSV writers ───────────────────────────────────────────────────────────── + +fn csv_escape(s: &str) -> String { + if s.contains(',') || s.contains('"') || s.contains('\n') { + format!("\"{}\"", s.replace('"', "\"\"")) + } else { + s.to_string() + } +} + +fn write_entity_csv(out_dir: &Path, entity: &str, edges: &[JeNetworkEdge]) -> GroupResult<()> { + let dir = out_dir.join("entities").join(entity).join("graphs"); + std::fs::create_dir_all(&dir).map_err(GroupError::Io)?; + let path = dir.join("je_network.csv"); + let file = File::create(&path).map_err(GroupError::Io)?; + let mut w = BufWriter::with_capacity(256 * 1024, file); + use std::io::Write; + writeln!(w, "{ENTITY_CSV_HEADER}").map_err(GroupError::Io)?; + for e in edges { + write_entity_row(&mut w, e)?; + } + w.flush().map_err(GroupError::Io)?; + Ok(()) +} + +fn write_entity_row(w: &mut W, e: &JeNetworkEdge) -> GroupResult<()> { + writeln!( + w, + "{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}", + csv_escape(&e.edge_id), + csv_escape(&e.document_id.to_string()), + csv_escape(&e.posting_date.to_string()), + csv_escape(&e.from_account), + csv_escape(&e.to_account), + csv_escape(&e.from_line_id), + csv_escape(&e.to_line_id), + e.amount, + e.confidence, + csv_escape(&e.predecessor_edge_id), + csv_escape(&e.business_process), + e.is_fraud, + e.is_anomaly, + csv_escape(e.ic_pair_id.as_deref().unwrap_or("")), + csv_escape(e.ic_partner_entity.as_deref().unwrap_or("")), + ) + .map_err(GroupError::Io)?; + Ok(()) +} + +fn write_consolidated_csv( + path: &Path, + rows: &[(String, JeNetworkEdge, bool, Option)], +) -> GroupResult<()> { + let file = File::create(path).map_err(GroupError::Io)?; + let mut w = BufWriter::with_capacity(1024 * 1024, file); + use std::io::Write; + writeln!(w, "{CONSOLIDATED_CSV_HEADER}").map_err(GroupError::Io)?; + for (entity_code, e, is_elim, elim_pair) in rows { + writeln!( + w, + "{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}", + csv_escape(&e.edge_id), + csv_escape(&e.document_id.to_string()), + csv_escape(entity_code), + csv_escape(&e.posting_date.to_string()), + csv_escape(&e.from_account), + csv_escape(&e.to_account), + csv_escape(&e.from_line_id), + csv_escape(&e.to_line_id), + e.amount, + e.confidence, + csv_escape(&e.predecessor_edge_id), + csv_escape(&e.business_process), + e.is_fraud, + e.is_anomaly, + csv_escape(e.ic_pair_id.as_deref().unwrap_or("")), + csv_escape(e.ic_partner_entity.as_deref().unwrap_or("")), + is_elim, + csv_escape(elim_pair.as_deref().unwrap_or("")), + ) + .map_err(GroupError::Io)?; + } + w.flush().map_err(GroupError::Io)?; + Ok(()) +} + +// ─── Parquet writers ───────────────────────────────────────────────────────── + +fn entity_parquet_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("edge_id", DataType::Utf8, false), + Field::new("document_id", DataType::Utf8, false), + Field::new("posting_date", DataType::Utf8, false), + Field::new("from_account", DataType::Utf8, false), + Field::new("to_account", DataType::Utf8, false), + Field::new("from_line_id", DataType::Utf8, false), + Field::new("to_line_id", DataType::Utf8, false), + // Stored as Utf8 to preserve full Decimal precision (matches + // datasynth-output's parquet-sink convention for monetary + // fields). + Field::new("amount", DataType::Utf8, false), + Field::new("confidence", DataType::Float64, false), + Field::new("predecessor_edge_id", DataType::Utf8, false), + Field::new("business_process", DataType::Utf8, false), + Field::new("is_fraud", DataType::Boolean, false), + Field::new("is_anomaly", DataType::Boolean, false), + Field::new("ic_pair_id", DataType::Utf8, true), + Field::new("ic_partner_entity", DataType::Utf8, true), + ])) +} + +fn consolidated_parquet_schema() -> Arc { + Arc::new(Schema::new(vec![ + Field::new("edge_id", DataType::Utf8, false), + Field::new("document_id", DataType::Utf8, false), + Field::new("entity_code", DataType::Utf8, false), + Field::new("posting_date", DataType::Utf8, false), + Field::new("from_account", DataType::Utf8, false), + Field::new("to_account", DataType::Utf8, false), + Field::new("from_line_id", DataType::Utf8, false), + Field::new("to_line_id", DataType::Utf8, false), + Field::new("amount", DataType::Utf8, false), + Field::new("confidence", DataType::Float64, false), + Field::new("predecessor_edge_id", DataType::Utf8, false), + Field::new("business_process", DataType::Utf8, false), + Field::new("is_fraud", DataType::Boolean, false), + Field::new("is_anomaly", DataType::Boolean, false), + Field::new("ic_pair_id", DataType::Utf8, true), + Field::new("ic_partner_entity", DataType::Utf8, true), + Field::new("is_eliminated", DataType::Boolean, false), + Field::new("eliminates_ic_pair_id", DataType::Utf8, true), + ])) +} + +fn dec_to_str(d: Decimal) -> String { + d.to_string() +} + +fn write_entity_parquet(out_dir: &Path, entity: &str, edges: &[JeNetworkEdge]) -> GroupResult<()> { + let dir = out_dir.join("entities").join(entity).join("graphs"); + std::fs::create_dir_all(&dir).map_err(GroupError::Io)?; + let path = dir.join("je_network.parquet"); + let schema = entity_parquet_schema(); + + let edge_id: Vec<&str> = edges.iter().map(|e| e.edge_id.as_str()).collect(); + let document_id: Vec = edges.iter().map(|e| e.document_id.to_string()).collect(); + let posting_date: Vec = edges.iter().map(|e| e.posting_date.to_string()).collect(); + let from_account: Vec<&str> = edges.iter().map(|e| e.from_account.as_str()).collect(); + let to_account: Vec<&str> = edges.iter().map(|e| e.to_account.as_str()).collect(); + let from_line_id: Vec<&str> = edges.iter().map(|e| e.from_line_id.as_str()).collect(); + let to_line_id: Vec<&str> = edges.iter().map(|e| e.to_line_id.as_str()).collect(); + let amount: Vec = edges.iter().map(|e| dec_to_str(e.amount)).collect(); + let confidence: Vec = edges.iter().map(|e| e.confidence).collect(); + let predecessor: Vec<&str> = edges + .iter() + .map(|e| e.predecessor_edge_id.as_str()) + .collect(); + let bp: Vec<&str> = edges.iter().map(|e| e.business_process.as_str()).collect(); + let is_fraud: Vec = edges.iter().map(|e| e.is_fraud).collect(); + let is_anomaly: Vec = edges.iter().map(|e| e.is_anomaly).collect(); + let ic_pair: Vec> = edges.iter().map(|e| e.ic_pair_id.clone()).collect(); + let ic_partner: Vec> = + edges.iter().map(|e| e.ic_partner_entity.clone()).collect(); + + let columns: Vec = vec![ + Arc::new(StringArray::from(edge_id)), + Arc::new(StringArray::from(document_id)), + Arc::new(StringArray::from(posting_date)), + Arc::new(StringArray::from(from_account)), + Arc::new(StringArray::from(to_account)), + Arc::new(StringArray::from(from_line_id)), + Arc::new(StringArray::from(to_line_id)), + Arc::new(StringArray::from(amount)), + Arc::new(Float64Array::from(confidence)), + Arc::new(StringArray::from(predecessor)), + Arc::new(StringArray::from(bp)), + Arc::new(BooleanArray::from(is_fraud)), + Arc::new(BooleanArray::from(is_anomaly)), + Arc::new(StringArray::from(ic_pair)), + Arc::new(StringArray::from(ic_partner)), + ]; + + let batch = RecordBatch::try_new(Arc::clone(&schema), columns) + .map_err(|e| GroupError::Aggregate(format!("je_network entity arrow build: {e}")))?; + + write_parquet_batch(&path, &schema, &batch) +} + +fn write_consolidated_parquet( + path: &Path, + rows: &[(String, JeNetworkEdge, bool, Option)], +) -> GroupResult<()> { + let schema = consolidated_parquet_schema(); + + let edge_id: Vec<&str> = rows.iter().map(|(_, e, _, _)| e.edge_id.as_str()).collect(); + let document_id: Vec = rows + .iter() + .map(|(_, e, _, _)| e.document_id.to_string()) + .collect(); + let entity_code: Vec<&str> = rows.iter().map(|(c, _, _, _)| c.as_str()).collect(); + let posting_date: Vec = rows + .iter() + .map(|(_, e, _, _)| e.posting_date.to_string()) + .collect(); + let from_account: Vec<&str> = rows + .iter() + .map(|(_, e, _, _)| e.from_account.as_str()) + .collect(); + let to_account: Vec<&str> = rows + .iter() + .map(|(_, e, _, _)| e.to_account.as_str()) + .collect(); + let from_line_id: Vec<&str> = rows + .iter() + .map(|(_, e, _, _)| e.from_line_id.as_str()) + .collect(); + let to_line_id: Vec<&str> = rows + .iter() + .map(|(_, e, _, _)| e.to_line_id.as_str()) + .collect(); + let amount: Vec = rows + .iter() + .map(|(_, e, _, _)| dec_to_str(e.amount)) + .collect(); + let confidence: Vec = rows.iter().map(|(_, e, _, _)| e.confidence).collect(); + let predecessor: Vec<&str> = rows + .iter() + .map(|(_, e, _, _)| e.predecessor_edge_id.as_str()) + .collect(); + let bp: Vec<&str> = rows + .iter() + .map(|(_, e, _, _)| e.business_process.as_str()) + .collect(); + let is_fraud: Vec = rows.iter().map(|(_, e, _, _)| e.is_fraud).collect(); + let is_anomaly: Vec = rows.iter().map(|(_, e, _, _)| e.is_anomaly).collect(); + let ic_pair: Vec> = rows + .iter() + .map(|(_, e, _, _)| e.ic_pair_id.clone()) + .collect(); + let ic_partner: Vec> = rows + .iter() + .map(|(_, e, _, _)| e.ic_partner_entity.clone()) + .collect(); + let is_eliminated: Vec = rows.iter().map(|(_, _, b, _)| *b).collect(); + let eliminates_pair: Vec> = rows.iter().map(|(_, _, _, p)| p.clone()).collect(); + + let columns: Vec = vec![ + Arc::new(StringArray::from(edge_id)), + Arc::new(StringArray::from(document_id)), + Arc::new(StringArray::from(entity_code)), + Arc::new(StringArray::from(posting_date)), + Arc::new(StringArray::from(from_account)), + Arc::new(StringArray::from(to_account)), + Arc::new(StringArray::from(from_line_id)), + Arc::new(StringArray::from(to_line_id)), + Arc::new(StringArray::from(amount)), + Arc::new(Float64Array::from(confidence)), + Arc::new(StringArray::from(predecessor)), + Arc::new(StringArray::from(bp)), + Arc::new(BooleanArray::from(is_fraud)), + Arc::new(BooleanArray::from(is_anomaly)), + Arc::new(StringArray::from(ic_pair)), + Arc::new(StringArray::from(ic_partner)), + Arc::new(BooleanArray::from(is_eliminated)), + Arc::new(StringArray::from(eliminates_pair)), + ]; + + let batch = RecordBatch::try_new(Arc::clone(&schema), columns) + .map_err(|e| GroupError::Aggregate(format!("je_network consolidated arrow build: {e}")))?; + + write_parquet_batch(path, &schema, &batch) +} + +fn write_parquet_batch(path: &Path, schema: &Arc, batch: &RecordBatch) -> GroupResult<()> { + let file = File::create(path).map_err(GroupError::Io)?; + let props = WriterProperties::builder() + .set_compression(Compression::ZSTD(ZstdLevel::default())) + .build(); + let mut writer = ArrowWriter::try_new(file, Arc::clone(schema), Some(props)) + .map_err(|e| GroupError::Aggregate(format!("parquet writer init: {e}")))?; + writer + .write(batch) + .map_err(|e| GroupError::Aggregate(format!("parquet write: {e}")))?; + writer + .close() + .map_err(|e| GroupError::Aggregate(format!("parquet close: {e}")))?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::NaiveDate; + use datasynth_core::models::{ + BusinessProcess, JournalEntry, JournalEntryHeader, JournalEntryLine, + }; + use rust_decimal::Decimal; + use uuid::Uuid; + + fn header_for(doc: Uuid, company: &str) -> JournalEntryHeader { + let mut h = JournalEntryHeader::new( + company.to_string(), + NaiveDate::from_ymd_opt(2024, 6, 15).unwrap(), + ); + h.document_id = doc; + h.business_process = Some(BusinessProcess::P2P); + h + } + + fn line(doc: Uuid, n: u32, account: &str, debit: i64, credit: i64) -> JournalEntryLine { + JournalEntryLine { + document_id: doc, + line_number: n, + gl_account: account.into(), + debit_amount: Decimal::from(debit), + credit_amount: Decimal::from(credit), + ..Default::default() + } + } + + fn two_line_je(company: &str, debit_acc: &str, credit_acc: &str, amt: i64) -> JournalEntry { + let doc = Uuid::new_v4(); + let header = header_for(doc, company); + let lines = smallvec::smallvec![ + line(doc, 1, debit_acc, amt, 0), + line(doc, 2, credit_acc, 0, amt), + ]; + JournalEntry { header, lines } + } + + #[test] + fn writes_per_entity_and_consolidated_files() { + let tmp = tempfile::tempdir().unwrap(); + let contributing = vec![ + ( + "ACME_HQ".to_string(), + vec![ + two_line_je("ACME_HQ", "1000", "2000", 1000), + two_line_je("ACME_HQ", "1000", "4000", 5000), + ], + ), + ( + "ACME_EUR".to_string(), + vec![two_line_je("ACME_EUR", "1100", "4500", 2500)], + ), + ]; + let elim_jes: Vec = vec![two_line_je("CONSOLIDATION", "4500", "1100", 2500)]; + + let summary = write_je_network_artefacts(&contributing, &elim_jes, tmp.path()).unwrap(); + + assert_eq!(summary.per_entity_edge_count.len(), 2); + assert_eq!(summary.elim_edge_count, 1); + assert_eq!(summary.consolidated_edge_count, 4); + + // Per-entity files exist + for entity in ["ACME_HQ", "ACME_EUR"] { + let csv = tmp + .path() + .join("entities") + .join(entity) + .join("graphs") + .join("je_network.csv"); + let pq = tmp + .path() + .join("entities") + .join(entity) + .join("graphs") + .join("je_network.parquet"); + assert!(csv.exists(), "missing {csv:?}"); + assert!(pq.exists(), "missing {pq:?}"); + } + + // Consolidated files exist + let consol_csv = tmp.path().join("consolidated").join("je_network.csv"); + let consol_pq = tmp.path().join("consolidated").join("je_network.parquet"); + assert!(consol_csv.exists()); + assert!(consol_pq.exists()); + + let body = std::fs::read_to_string(&consol_csv).unwrap(); + let lines: Vec<&str> = body.lines().collect(); + assert_eq!(lines.len(), 1 + 4, "header + 4 edges (3 entity + 1 elim)"); + assert!(lines[0].starts_with("edge_id,document_id,entity_code,")); + assert!( + lines.iter().any(|l| l.contains(",true,")), + "at least one row should have is_eliminated=true" + ); + } +} diff --git a/crates/datasynth-group/src/aggregate/mod.rs b/crates/datasynth-group/src/aggregate/mod.rs index 13726258..1791732b 100644 --- a/crates/datasynth-group/src/aggregate/mod.rs +++ b/crates/datasynth-group/src/aggregate/mod.rs @@ -26,6 +26,7 @@ pub mod elimination; pub mod equity_method; pub mod fs; pub mod ic_matcher; +pub mod je_network; pub mod nci; pub mod opening_balance; pub mod post_elim; @@ -65,6 +66,7 @@ pub use fs::{ pub use ic_matcher::{ match_ic_pairs, IcMatchResult, IcMatchedPair, UnmatchedReason, UnmatchedSide, }; +pub use je_network::{write_je_network_artefacts, JeNetworkSummary}; pub use nci::{ compute_nci_rollforward, ingest_opening_nci_balances, write_nci_rollforward, NciInputs, NciRollforward, NCI_ROLLFORWARD_FILENAME, diff --git a/crates/datasynth-group/tests/aggregate_e2e.rs b/crates/datasynth-group/tests/aggregate_e2e.rs index 85894d90..c2e168ab 100644 --- a/crates/datasynth-group/tests/aggregate_e2e.rs +++ b/crates/datasynth-group/tests/aggregate_e2e.rs @@ -371,6 +371,95 @@ fn run_aggregate_writes_full_artifact_set_in_emission_order() { summary.total_nci, diff, ); + + // ── v5.10 je_network artefacts ───────────────────────────────── + // Per-entity files + for entity in ["NESTLE_SA", "NESTLE_USA"] { + let entity_csv = root + .join("entities") + .join(entity) + .join("graphs") + .join("je_network.csv"); + let entity_pq = root + .join("entities") + .join(entity) + .join("graphs") + .join("je_network.parquet"); + assert!( + entity_csv.exists(), + "missing per-entity je_network.csv: {:?}", + entity_csv + ); + assert!( + entity_pq.exists(), + "missing per-entity je_network.parquet: {:?}", + entity_pq + ); + + // Header sanity: 15 columns, ic_pair_id + ic_partner_entity present + let body = std::fs::read_to_string(&entity_csv).expect("read entity csv"); + let header = body.lines().next().expect("non-empty"); + assert!( + header.contains("ic_pair_id") && header.contains("ic_partner_entity"), + "entity csv header missing v5.10 IC columns: {header}", + ); + let line_count = body.lines().count(); + assert!( + line_count >= 2, + "expect at least header + 1 row; got {line_count}" + ); + } + + // Consolidated file + let consol_csv = root.join("consolidated").join("je_network.csv"); + let consol_pq = root.join("consolidated").join("je_network.parquet"); + assert!(consol_csv.exists(), "missing consolidated je_network.csv"); + assert!( + consol_pq.exists(), + "missing consolidated je_network.parquet" + ); + + let consol_body = std::fs::read_to_string(&consol_csv).expect("read consolidated csv"); + let consol_header = consol_body.lines().next().expect("non-empty"); + for col in [ + "edge_id", + "entity_code", + "ic_pair_id", + "ic_partner_entity", + "is_eliminated", + "eliminates_ic_pair_id", + ] { + assert!( + consol_header.contains(col), + "consolidated header missing column `{col}`: {consol_header}", + ); + } + + // At least one row should have is_eliminated=true (Mini-Nestlé has + // matched IC pairs that produce eliminations). + let has_elim = consol_body.lines().skip(1).any(|l| { + // find `,true,` in the right column position is fragile; just + // require *any* row to contain the literal `,true,` substring + // — the only true booleans are is_fraud / is_anomaly / + // is_eliminated and the test fixture sets neither fraud nor + // anomaly, so any `,true,` must be is_eliminated. + l.contains(",true,") + }); + assert!( + has_elim, + "expected at least one is_eliminated=true row in consolidated csv" + ); + + // At least one row should carry an IC pair id (matched seller/buyer). + let has_ic = consol_body.lines().skip(1).any(|l| { + l.split(',') + .nth(14) + .is_some_and(|c| !c.is_empty() && c != "\"\"") + }); + assert!( + has_ic, + "expected at least one row with non-empty ic_pair_id in consolidated csv" + ); } #[test] diff --git a/crates/datasynth-runtime/Cargo.toml b/crates/datasynth-runtime/Cargo.toml index bf4d7602..cfed9f28 100644 --- a/crates/datasynth-runtime/Cargo.toml +++ b/crates/datasynth-runtime/Cargo.toml @@ -56,3 +56,4 @@ reqwest = { workspace = true, optional = true } proptest = { workspace = true } datasynth-test-utils = { workspace = true } tempfile = { workspace = true } +smallvec = { workspace = true } diff --git a/crates/datasynth-runtime/src/je_network.rs b/crates/datasynth-runtime/src/je_network.rs new file mode 100644 index 00000000..56bbf72b --- /dev/null +++ b/crates/datasynth-runtime/src/je_network.rs @@ -0,0 +1,282 @@ +//! Shared Method-A / Method-B/C edge-list builder for the JE network export. +//! +//! v5.10: extracted from [`output_writer::write_je_network_csv`] so the +//! same logic can be reused by the `datasynth-group` aggregate emitter, +//! which builds both per-entity and consolidated edge lists from many +//! per-entity JE batches. +//! +//! The builder is pure — no I/O — and returns a `Vec`. +//! Single-entity callers feed the result into the existing CSV writer +//! (preserving v5.8.0 byte-identical output); group callers serialise +//! the same struct with extra contextual columns (entity_code, +//! is_eliminated, eliminates_ic_pair_id) added at write time. + +use chrono::NaiveDate; +use rust_decimal::Decimal; +use std::collections::HashMap; +use uuid::Uuid; + +use datasynth_config::JeNetworkMethod; +use datasynth_core::models::JournalEntry; + +/// One row of the je_network output. +/// +/// The 13 columns matching v5.8.0 CSV output are at the top; v5.10 +/// added the optional IC fields (Some only on group-context inputs). +#[derive(Debug, Clone)] +pub struct JeNetworkEdge { + pub edge_id: String, + pub document_id: Uuid, + pub posting_date: NaiveDate, + pub from_account: String, + pub to_account: String, + pub from_line_id: String, + pub to_line_id: String, + pub amount: Decimal, + pub confidence: f64, + pub predecessor_edge_id: String, + pub business_process: String, + pub is_fraud: bool, + pub is_anomaly: bool, + /// v5.10 — surfaces `JournalEntryHeader::ic_pair_id` when present. + /// `None` for non-IC postings (and on every edge from a single-entity run). + pub ic_pair_id: Option, + /// v5.10 — surfaces `JournalEntryHeader::ic_partner_entity` when present. + pub ic_partner_entity: Option, +} + +/// Build the Method-A / Method-B/C edge list for a batch of JEs. +/// +/// JEs are processed in the iteration order of the slice; predecessor +/// edge ids are resolved against earlier JEs in the same batch (so the +/// caller must pass JEs that share predecessor chains in the same call). +/// +/// Method semantics — see Ivertowski et al. (2024) Methods A through E: +/// * `JeNetworkMethod::A` — bijective on 2-line entries only; multi-line +/// JEs are skipped. Confidence on every emitted edge = `1.0`. +/// * `JeNetworkMethod::Cartesian` — full Cartesian debit × credit product +/// with proportional amount allocation; confidence = `1 / (n × m)`. +pub fn build_je_network_edges(jes: &[JournalEntry], method: JeNetworkMethod) -> Vec { + let mut edges = Vec::with_capacity(jes.len() * 2); + let mut line_id_to_edge_id: HashMap = HashMap::with_capacity(jes.len() * 2); + + for je in jes { + let h = &je.header; + + let line_ids: Vec = je + .lines + .iter() + .map(|l| { + l.transaction_id.clone().unwrap_or_else(|| { + datasynth_core::models::JournalEntryLine::derive_transaction_id( + l.document_id, + l.line_number, + ) + }) + }) + .collect(); + + let debits: Vec = je + .lines + .iter() + .enumerate() + .filter(|(_, l)| l.debit_amount > Decimal::ZERO) + .map(|(i, _)| i) + .collect(); + let credits: Vec = je + .lines + .iter() + .enumerate() + .filter(|(_, l)| l.credit_amount > Decimal::ZERO) + .map(|(i, _)| i) + .collect(); + if debits.is_empty() || credits.is_empty() { + continue; + } + + if method == JeNetworkMethod::A && !(debits.len() == 1 && credits.len() == 1) { + continue; + } + + let total_debit: Decimal = debits.iter().map(|i| je.lines[*i].debit_amount).sum(); + let total_credit: Decimal = credits.iter().map(|i| je.lines[*i].credit_amount).sum(); + if total_debit.is_zero() || total_credit.is_zero() { + continue; + } + + let confidence: f64 = if debits.len() == 1 && credits.len() == 1 { + 1.0 + } else { + 1.0 / (debits.len() * credits.len()) as f64 + }; + + let bp = h + .business_process + .map(|bp| format!("{bp:?}")) + .unwrap_or_default(); + let ic_pair_id_str = h.ic_pair_id.as_ref().map(|id| id.to_string()); + let ic_partner = h.ic_partner_entity.clone(); + + for &di in &debits { + let debit_line = &je.lines[di]; + let to_line_id = &line_ids[di]; + for &ci in &credits { + let credit_line = &je.lines[ci]; + let from_line_id = &line_ids[ci]; + + // Edge id = UUID v5 of (document_id, debit.line_number, + // credit.line_number). Stable across regenerations. + let mut input = Vec::with_capacity(16 + 8); + input.extend_from_slice(h.document_id.as_bytes()); + input.extend_from_slice(&debit_line.line_number.to_le_bytes()); + input.extend_from_slice(&credit_line.line_number.to_le_bytes()); + let edge_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, &input).to_string(); + + // Proportional allocation matches + // TransactionGraphBuilder::add_journal_entry_debit_credit. + let proportion = (debit_line.debit_amount / total_debit) + * (credit_line.credit_amount / total_credit); + let amount = debit_line.debit_amount * proportion; + + let predecessor_edge_id: String = credit_line + .predecessor_line_id + .as_ref() + .or(debit_line.predecessor_line_id.as_ref()) + .and_then(|tx_id| line_id_to_edge_id.get(tx_id).cloned()) + .unwrap_or_default(); + + edges.push(JeNetworkEdge { + edge_id: edge_id.clone(), + document_id: h.document_id, + posting_date: h.posting_date, + from_account: credit_line.gl_account.clone(), + to_account: debit_line.gl_account.clone(), + from_line_id: from_line_id.clone(), + to_line_id: to_line_id.clone(), + amount, + confidence, + predecessor_edge_id, + business_process: bp.clone(), + is_fraud: h.is_fraud, + is_anomaly: h.is_anomaly, + ic_pair_id: ic_pair_id_str.clone(), + ic_partner_entity: ic_partner.clone(), + }); + + line_id_to_edge_id + .entry(from_line_id.clone()) + .or_insert(edge_id); + } + } + } + + edges +} + +#[cfg(test)] +mod tests { + use super::*; + use datasynth_core::models::{JournalEntry, JournalEntryHeader, JournalEntryLine}; + + fn dec(v: i64) -> Decimal { + Decimal::from(v) + } + + fn header_for(doc: Uuid) -> JournalEntryHeader { + let mut h = JournalEntryHeader::new( + "C001".to_string(), + NaiveDate::from_ymd_opt(2026, 5, 9).expect("2026-05-09 is a valid date"), + ); + h.document_id = doc; + h + } + + fn make_line(doc: Uuid, n: u32, account: &str, debit: i64, credit: i64) -> JournalEntryLine { + JournalEntryLine { + document_id: doc, + line_number: n, + gl_account: account.into(), + debit_amount: dec(debit), + credit_amount: dec(credit), + ..Default::default() + } + } + + fn make_two_line_je(debit_account: &str, credit_account: &str, amount: i64) -> JournalEntry { + let document_id = Uuid::new_v4(); + let header = header_for(document_id); + let lines = smallvec::smallvec![ + make_line(document_id, 1, debit_account, amount, 0), + make_line(document_id, 2, credit_account, 0, amount), + ]; + JournalEntry { header, lines } + } + + #[test] + fn method_a_emits_one_edge_per_two_line_je() { + let jes = vec![ + make_two_line_je("1000", "2000", 1_000), + make_two_line_je("1000", "4000", 5_000), + ]; + let edges = build_je_network_edges(&jes, JeNetworkMethod::A); + assert_eq!(edges.len(), 2, "one edge per 2-line JE"); + for e in &edges { + assert_eq!(e.confidence, 1.0, "Method A confidence is exactly 1.0"); + assert!(e.ic_pair_id.is_none()); + assert!(e.ic_partner_entity.is_none()); + } + } + + #[test] + fn method_a_skips_multi_line_jes() { + let document_id = Uuid::new_v4(); + let header = header_for(document_id); + let lines = smallvec::smallvec![ + make_line(document_id, 1, "1000", 1_000, 0), + make_line(document_id, 2, "1010", 500, 0), + make_line(document_id, 3, "2000", 0, 1_500), + ]; + let je = JournalEntry { header, lines }; + let edges = build_je_network_edges(&[je], JeNetworkMethod::A); + assert_eq!(edges.len(), 0, "3-line JE skipped under Method A"); + } + + #[test] + fn cartesian_emits_n_times_m_edges_per_je() { + let document_id = Uuid::new_v4(); + let header = header_for(document_id); + let lines = smallvec::smallvec![ + make_line(document_id, 1, "D1", 100, 0), + make_line(document_id, 2, "D2", 50, 0), + make_line(document_id, 3, "C1", 0, 80), + make_line(document_id, 4, "C2", 0, 70), + ]; + let je = JournalEntry { header, lines }; + let edges = build_je_network_edges(&[je], JeNetworkMethod::Cartesian); + assert_eq!( + edges.len(), + 4, + "2 debits × 2 credits = 4 edges under Cartesian" + ); + for e in &edges { + assert!((e.confidence - 0.25).abs() < 1e-9, "1/(n*m) = 0.25"); + } + } + + #[test] + fn ic_fields_surface_when_present_on_header() { + let document_id = Uuid::new_v4(); + let mut header = header_for(document_id); + header.ic_partner_entity = Some("ACME_EUR".to_string()); + + let lines = smallvec::smallvec![ + make_line(document_id, 1, "1150", 1000, 0), + make_line(document_id, 2, "4500", 0, 1000), + ]; + let je = JournalEntry { header, lines }; + let edges = build_je_network_edges(&[je], JeNetworkMethod::A); + assert_eq!(edges.len(), 1); + assert_eq!(edges[0].ic_partner_entity, Some("ACME_EUR".to_string())); + // ic_pair_id requires construction via IcPairId — covered in group tests. + } +} diff --git a/crates/datasynth-runtime/src/lib.rs b/crates/datasynth-runtime/src/lib.rs index 8a8a3ae9..ec7ba7ce 100644 --- a/crates/datasynth-runtime/src/lib.rs +++ b/crates/datasynth-runtime/src/lib.rs @@ -25,6 +25,7 @@ pub mod config_mutator; pub mod enhanced_orchestrator; pub mod generation_session; pub mod intervention_manager; +pub mod je_network; pub mod label_export; pub mod lineage; pub mod output_writer; diff --git a/crates/datasynth-runtime/src/output_writer.rs b/crates/datasynth-runtime/src/output_writer.rs index e2d1ef6b..ad0ea2c3 100644 --- a/crates/datasynth-runtime/src/output_writer.rs +++ b/crates/datasynth-runtime/src/output_writer.rs @@ -252,6 +252,11 @@ fn write_journal_entries_csv( /// Ivertowski et al. (2024); for larger JEs it is a Method-B/C /// approximation with proportional amount allocation. /// +/// v5.10: edge construction has been extracted into +/// [`crate::je_network::build_je_network_edges`] so the same logic is +/// reused by the `datasynth-group` aggregate emitter. The CSV format +/// is unchanged — this writer keeps the v5.8.0 13-column schema. +/// /// Joins back to `journal_entries.csv` via: /// - `document_id` → JE-level header /// - `from_line_id` / `to_line_id` → per-line `transaction_id` @@ -261,8 +266,6 @@ fn write_je_network_csv( output_dir: &Path, method: datasynth_config::JeNetworkMethod, ) -> Result<(), Box> { - use rust_decimal::Decimal; - if result.journal_entries.is_empty() { return Ok(()); } @@ -279,159 +282,32 @@ fn write_je_network_csv( predecessor_edge_id,business_process,is_fraud,is_anomaly" )?; - // Build a map line_transaction_id → edge_id of the FIRST out-going - // edge from that line so predecessor_edge_id can be resolved without - // a second pass: if a curr-line's predecessor_line_id points at - // some prev-line's transaction_id, the predecessor edge is the - // first edge in that prev JE that has this line as its credit - // ("from") side. This reasonably maps each line to one canonical - // outgoing edge. - // - // We populate the map during the first pass (write the rows) and - // resolve predecessor_edge_id with the existing entry — JEs are - // emitted in chain order, so the predecessor will already be in - // the map by the time we look it up. - let mut line_id_to_edge_id: std::collections::HashMap = - std::collections::HashMap::with_capacity(result.journal_entries.len() * 2); - - let mut total_edges: usize = 0; - - for je in &result.journal_entries { - let h = &je.header; - // Pre-compute transaction_ids for every line (so we can pair - // them without re-deriving inside the inner loop). - let line_ids: Vec = je - .lines - .iter() - .map(|l| { - l.transaction_id.clone().unwrap_or_else(|| { - datasynth_core::models::JournalEntryLine::derive_transaction_id( - l.document_id, - l.line_number, - ) - }) - }) - .collect(); - - let debits: Vec = je - .lines - .iter() - .enumerate() - .filter(|(_, l)| l.debit_amount > Decimal::ZERO) - .map(|(i, _)| i) - .collect(); - let credits: Vec = je - .lines - .iter() - .enumerate() - .filter(|(_, l)| l.credit_amount > Decimal::ZERO) - .map(|(i, _)| i) - .collect(); - if debits.is_empty() || credits.is_empty() { - continue; - } - - // Method A: bijective on 2-line entries only. Multi-line JEs - // are skipped under this method — see Ivertowski (2024) - // Methods A through E. The full Cartesian product of a - // multi-line consolidation produces O(n × m) edges per JE, - // which dominates total dataset size at scale; users who need - // the multi-line edges should set - // `graph_export.je_network.method: cartesian` (the default). - if method == datasynth_config::JeNetworkMethod::A - && !(debits.len() == 1 && credits.len() == 1) - { - continue; - } - - let total_debit: Decimal = debits.iter().map(|i| je.lines[*i].debit_amount).sum(); - let total_credit: Decimal = credits.iter().map(|i| je.lines[*i].credit_amount).sum(); - if total_debit.is_zero() || total_credit.is_zero() { - continue; - } - - // Confidence per the paper: bijective on 2-line entries - // (Method A), 1/(n*m) approximation otherwise. - let confidence: f64 = if debits.len() == 1 && credits.len() == 1 { - 1.0 - } else { - 1.0 / (debits.len() * credits.len()) as f64 - }; - - let bp = h - .business_process - .map(|bp| format!("{bp:?}")) - .unwrap_or_default(); - let posting_date = h.posting_date.to_string(); - let doc_id = h.document_id.to_string(); - - for &di in &debits { - let debit_line = &je.lines[di]; - let to_line_id = &line_ids[di]; - for &ci in &credits { - let credit_line = &je.lines[ci]; - let from_line_id = &line_ids[ci]; - - // Edge id = UUID v5 of (document_id, debit.line_number, - // credit.line_number). Stable across regenerations. - let mut input = Vec::with_capacity(16 + 8); - input.extend_from_slice(h.document_id.as_bytes()); - input.extend_from_slice(&debit_line.line_number.to_le_bytes()); - input.extend_from_slice(&credit_line.line_number.to_le_bytes()); - let edge_id = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, &input).to_string(); - - // Proportional allocation of the flow's amount (matches - // the existing TransactionGraphBuilder::add_journal_entry - // _debit_credit formula). - let proportion = (debit_line.debit_amount / total_debit) - * (credit_line.credit_amount / total_credit); - let amount = debit_line.debit_amount * proportion; - - // Resolve predecessor: the predecessor_line_id from the - // CREDIT side (it represents the "from" of the next - // edge); fall back to the debit side. Either points at - // a transaction_id whose first outgoing edge id is the - // predecessor edge. - let predecessor_edge_id: String = credit_line - .predecessor_line_id - .as_ref() - .or(debit_line.predecessor_line_id.as_ref()) - .and_then(|tx_id| line_id_to_edge_id.get(tx_id).cloned()) - .unwrap_or_default(); - - writeln!( - w, - "{},{},{},{},{},{},{},{},{},{},{},{},{}", - csv_escape(&edge_id), - csv_escape(&doc_id), - csv_escape(&posting_date), - csv_escape(&credit_line.gl_account), - csv_escape(&debit_line.gl_account), - csv_escape(from_line_id), - csv_escape(to_line_id), - amount, - confidence, - csv_escape(&predecessor_edge_id), - csv_escape(&bp), - h.is_fraud, - h.is_anomaly, - )?; - - // Map the credit line ("from" end) to its first edge — - // this is the canonical outgoing edge subsequent JEs - // can refer to as their predecessor. - line_id_to_edge_id - .entry(from_line_id.clone()) - .or_insert_with(|| edge_id.clone()); - total_edges += 1; - } - } + let edges = crate::je_network::build_je_network_edges(&result.journal_entries, method); + + for e in &edges { + writeln!( + w, + "{},{},{},{},{},{},{},{},{},{},{},{},{}", + csv_escape(&e.edge_id), + csv_escape(&e.document_id.to_string()), + csv_escape(&e.posting_date.to_string()), + csv_escape(&e.from_account), + csv_escape(&e.to_account), + csv_escape(&e.from_line_id), + csv_escape(&e.to_line_id), + e.amount, + e.confidence, + csv_escape(&e.predecessor_edge_id), + csv_escape(&e.business_process), + e.is_fraud, + e.is_anomaly, + )?; } w.flush()?; info!( " JE network CSV written: {} edges from {} entries -> {}", - total_edges, + edges.len(), result.journal_entries.len(), path.display() );