VynFi Python SDK

The official Python client for the VynFi synthetic financial data API. Generate realistic financial datasets -- journal entries, chart of accounts, document flows, banking/AML data, ESG metrics, and more -- for audit analytics, fraud detection, compliance testing, and ML training.

Installation

pip install vynfi

With optional integrations:

pip install vynfi[pandas]     # pandas DataFrame support
pip install vynfi[polars]     # polars DataFrame support
pip install vynfi[all]        # all integrations

Quick Start

from vynfi import VynFi

client = VynFi(api_key="vf_live_...")

# Generate synthetic financial data
job = client.generate(
    tables=[{"name": "journal_entries", "rows": 5000}],
    sector_slug="retail",
)
print(f"Job {job.id} submitted ({job.credits_reserved} credits)")

# Wait for completion (built-in polling)
completed = client.jobs.wait(job.id)

# Download and explore the archive
archive = client.jobs.download_archive(completed.id)
print(archive)  # JobArchive(84 files, 1.5 GB)

# Access specific files
entries = archive.json("journal_entries.json")
print(f"{len(entries)} journal entry documents")

# Or download raw bytes
data = client.jobs.download(completed.id)

See the examples/ directory for 7 Jupyter notebooks and 7 standalone scripts covering audit analytics, fraud detection, document flows, process mining, ESG reporting, and AML compliance testing.

Resources

Catalog & Templates

# Browse available sectors
sectors = client.catalog.list_sectors()
for s in sectors:
    print(f"{s.name}: {s.table_count} tables (quality={s.quality_score})")

# Get sector detail with table schemas
sector = client.catalog.get_sector("retail")
for table in sector.tables:
    print(f"  {table.name}: {len(table.columns)} columns")

# Browse system templates
templates = client.catalog.list_templates(sector="retail")
for t in templates:
    print(f"  {t.name} ({t.framework}, tier={t.min_tier})")

Jobs

# Async generation (large datasets)
job = client.jobs.generate(
    tables=[{"name": "journal_entries", "rows": 50000}],
    sector_slug="retail",
)
completed = client.jobs.wait(job.id)

# Quick synchronous generation (up to 10k rows)
result = client.jobs.generate_quick(
    tables=[{"name": "journal_entries", "rows": 100}],
    sector_slug="retail",
)

# Config-based generation
job = client.jobs.generate_config(
    config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
)

# List and filter jobs
jobs = client.jobs.list(status="completed", limit=10)

# Download specific artifacts
data = client.jobs.download_file(job_id, "journal_entries.json")

# Stream progress via SSE
for event in client.jobs.stream(job.id):
    if event["event"] == "progress":
        print(f"{event['data']['percent']}%")

Saved Configs

# Save a generation config for reuse
cfg = client.configs.create(
    name="Monthly Retail",
    config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
    tags=["recurring", "retail"],
)

# Validate before running
result = client.configs.validate(config={"sector": "retail", "rows": 100})
print(f"Valid: {result.valid}, errors: {len(result.errors)}")

# Estimate cost before running
est = client.configs.estimate_cost(config={"sector": "retail", "rows": 50000})
print(f"Estimated: {est.total_credits} credits")

Multi-Period Sessions

# Create a fiscal-year session
session = client.sessions.create(
    name="FY2026",
    fiscal_year_start="2026-01-01",
    period_length_months=3,
    periods=4,
    generation_config={"sector": "retail", "rows": 10000},
)

# Generate each period sequentially
for _ in range(session.periods_total):
    resp = client.sessions.generate_next(session.id)
    print(f"Period {resp.period_index}: job {resp.job_id}")

What-If Scenarios

# List causal graph templates
templates = client.scenarios.templates()

# Create a scenario
scenario = client.scenarios.create(
    name="Fraud Spike",
    template_id="supply-chain",
    interventions={"fraudRate": 0.05},
    generation_config={"sector": "retail", "rows": 10000},
)

# Run baseline vs counterfactual
scenario = client.scenarios.run(scenario.id)

# Get diff analysis
scenario = client.scenarios.diff(scenario.id)

Job Archives

# Download the output archive with easy file access
archive = client.jobs.download_archive(job_id)

# Explore contents
print(archive.backend)          # "zip" (legacy) or "managed_blob" (TB-scale)
print(archive.files())          # all 80+ files
print(archive.categories())     # ['banking', 'document_flows', 'esg', ...]
print(archive.summary())        # file counts and sizes by category

# Access specific files (lazy fetch via presigned URL for managed_blob)
entries = archive.json("journal_entries.json")
coa = archive.json("chart_of_accounts.json")

# Find files by pattern
banking_files = archive.find("banking/*")
esg_files = archive.find("esg/*")

# Extract everything to disk
archive.extract_to("./output")

Scenario Packs (DataSynth 3.0+)

# List 11 built-in scenario packs
packs = client.scenarios.packs()
for p in packs:
    print(f"{p.category}: {p.name}{p.description}")

# Run a scenario pack
scenario = client.scenarios.create(
    name="Q3 revenue stress test",
    generation_config={
        "sector": "retail", "rows": 10000,
        "scenarios": {
            "enabled": True,
            "packs": ["channel_stuffing"],
            "diffFormats": ["summary", "record_level"],
        },
    },
)
client.scenarios.run(scenario.id)  # spawns baseline + counterfactual
diff = client.scenarios.diff(scenario.id)

AI Tuning & Co-pilot (DataSynth 3.0+, Scale+)

# LLM suggests config improvements based on quality scores
suggestion = client.jobs.tune(job_id, target_scores={"overall": 0.95})
print(suggestion.explanation)
print("Change rows:", suggestion.original_config.get("rows"),
      "->",             suggestion.suggested_config.get("rows"))

# Ask the dashboard co-pilot
reply = client.ai.chat("Which fraud packs give me the best audit training?")
print(reply.reply)

Fingerprint Synthesis (DataSynth 3.0+, Team+)

# Privacy-preserving synthesis from a .dsf fingerprint file
submission = client.fingerprint.synthesize(
    "./my_data.dsf",
    rows=10000,
    backend="statistical",  # or "neural" / "hybrid" (Scale+)
)
job = client.jobs.wait(submission.job_id)

Adversarial Probing (DataSynth 3.0+, Enterprise)

# Probe an ONNX fraud detector for decision-boundary weaknesses
probe = client.adversarial.probe(
    "./my_model.onnx",
    n_probes=10000,
    perturbation_budget=0.05,
    threshold=0.5,
)
# ... wait for probe to complete ...
results = client.adversarial.results(probe.id)
print(f"Mean margin: {results.mean_margin:.3f}")
print(f"Positive rate: {results.positive_rate:.1%}")

Pre-built Analytics (DataSynth 2.3+)

# Get statistical evaluations for a completed job
a = client.jobs.analytics(job_id)

# Benford's Law conformity on amounts
print(f"MAD: {a.benford_analysis.mad:.4f}")
print(f"Conformity: {a.benford_analysis.conformity}")

# Amount distribution statistics
print(f"Skewness: {a.amount_distribution.skewness:.2f}")
print(f"Round number ratio: {a.amount_distribution.round_number_ratio:.2%}")

# Process variants
print(f"Happy path: {a.process_variant_summary.happy_path_concentration:.2%}")

# Banking evaluation (KYC, AML, cross-layer, velocity, false-positive)
print(f"Banking passes: {a.banking_evaluation.passes}")

NDJSON Streaming (Scale tier+)

# Rate-controlled streaming for TB-scale jobs
for envelope in client.jobs.stream_ndjson(job_id, rate=500, progress_interval=1000):
    if envelope.get("type") == "_progress":
        print(f"  {envelope['lines_emitted']:,} lines emitted")
    else:
        # Process each data record
        my_pipeline.send(envelope)
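
`my_pipeline` above is a placeholder for your own sink. As one concrete (hypothetical) example, spooling the data records to a local NDJSON file:

import json

# Write each data envelope to disk, one JSON record per line,
# printing progress markers as they arrive
with open("journal_entries.ndjson", "w", encoding="utf-8") as sink:
    for envelope in client.jobs.stream_ndjson(job_id, rate=500, progress_interval=1000):
        if envelope.get("type") == "_progress":
            print(f"  {envelope['lines_emitted']:,} lines emitted")
        else:
            sink.write(json.dumps(envelope) + "\n")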

Output Mode (DataSynth 2.3+)

# Use native JSON numbers and flat layout to skip conversion boilerplate
job = client.jobs.generate_config(config={
    "sector": "retail",
    "rows": 1000,
    # ...
    "output": {
        "numericMode": "native",   # numbers, not strings
        "exportLayout": "flat",    # one row per line, header merged
    },
})

Storage Quota (TB-scale)

# Validate output size against tier quota before submitting
size = client.configs.estimate_size(config=my_config)
print(f"Estimated: {size.estimated_bytes / 1e9:.1f} GB across {size.estimated_files} files")
print(f"Quota: {size.tier_quota_bytes / 1e9:.0f} GB")
if size.exceeds_quota:
    print(f"WARNING: {size.warning}")
for bucket in size.breakdown:
    print(f"  {bucket.domain}: {bucket.bytes / 1e6:.0f} MB")

Usage & Credits

# Usage summary
usage = client.usage.summary()
print(f"Balance: {usage.balance} credits, burn rate: {usage.burn_rate}/day")

# Daily breakdown
daily = client.usage.daily()
for d in daily.daily:
    print(f"  {d.date}: {d.credits} credits")

# Prepaid credit balance
balance = client.credits.balance()
print(f"Prepaid: {balance.total_prepaid_credits}")

# Purchase credits
resp = client.credits.purchase(pack="10k")
print(f"Checkout: {resp.checkout_url}")

Quality Scores

scores = client.quality.scores()
for s in scores:
    print(f"Job {s.job_id}: overall={s.overall_score:.2f}")

timeline = client.quality.timeline(days=30)

API Keys, Webhooks, Billing, Notifications

# API keys
key = client.api_keys.create(name="CI pipeline", environment="test")
print(f"Key: {key.key}")  # Only shown once!

# Webhooks (a minimal receiver sketch follows this block)
hook = client.webhooks.create(
    url="https://example.com/webhook",
    events=["job.completed", "job.failed"],
)

# Billing
sub = client.billing.subscription()
portal = client.billing.portal()
print(f"Manage billing: {portal.portal_url}")

# Notifications
unread = client.notifications.list(unread=True)
client.notifications.mark_read(all=True)
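
For the webhook created above, here is a minimal receiver sketch. The JSON payload shape (`event` / `data` fields) and the omission of signature verification are assumptions; confirm both against the webhook documentation before relying on them:

from flask import Flask, request

app = Flask(__name__)

@app.route("/webhook", methods=["POST"])
def vynfi_webhook():
    payload = request.get_json(force=True)
    # Field names below are assumed, not a confirmed VynFi contract
    if payload.get("event") == "job.completed":
        print("Job finished:", payload.get("data"))
    elif payload.get("event") == "job.failed":
        print("Job failed:", payload.get("data"))
    return "", 204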

Ecosystem Integrations

pandas

from vynfi.integrations.pandas import (
    job_to_dataframe,
    archive_to_dataframes,
    usage_to_dataframe,
)

# Convert a single file from an archive to a DataFrame
archive = client.jobs.download_archive(job_id)
df = job_to_dataframe(archive.read("journal_entries.json"))

# Convert ALL JSON files in the archive to DataFrames at once
frames = archive_to_dataframes(archive)
# {'journal_entries.json': DataFrame, 'banking/banking_customers.json': DataFrame, ...}

# Usage analytics as a time-indexed DataFrame
usage_df = usage_to_dataframe(client, days=30)
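
With the data in a DataFrame, client-side sanity checks complement the pre-built analytics shown earlier. A sketch of a first-digit (Benford) deviation check, assuming the journal-entries frame has a numeric `amount` column:

import numpy as np
import pandas as pd

amounts = df["amount"].astype(float).abs()
amounts = amounts[amounts > 0]  # zeros have no leading digit
# First significant digit of each amount: x / 10^floor(log10 x)
first_digits = (amounts / 10 ** np.floor(np.log10(amounts))).astype(int)
observed = first_digits.value_counts(normalize=True).reindex(range(1, 10), fill_value=0)
expected = pd.Series(np.log10(1 + 1 / np.arange(1, 10)), index=range(1, 10))
print(f"Benford MAD: {(observed - expected).abs().mean():.4f}")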

polars

from vynfi.integrations.polars import download_frame, usage_to_frame

df = download_frame(client, job_id, "journal_entries.json")
print(df.describe())

Error Handling

from vynfi import (
    VynFi,
    AuthenticationError,
    ForbiddenError,
    InsufficientCreditsError,
    NotFoundError,
    RateLimitError,
    ValidationError,
)

try:
    job = client.generate(tables=[{"name": "journal_entries", "rows": 1000000}])
except InsufficientCreditsError:
    print("Not enough credits")
except RateLimitError:
    print("Too many requests — automatic retry exhausted")
except ValidationError as e:
    print(f"Invalid request: {e}")

Configuration

client = VynFi(
    api_key="vf_live_...",
    base_url="https://api.vynfi.com",  # default
    timeout=30.0,                       # request timeout in seconds
    max_retries=2,                      # automatic retry on 429/5xx
)

# Context manager support
with VynFi(api_key="vf_live_...") as client:
    usage = client.usage.summary()

License

Apache 2.0