Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,3 +573,136 @@ def test_state_uri_passes_through_libpq_forms(config_file, monkeypatch):
monkeypatch.setenv("SRC_PG", raw)
cfg = load(config_file)
assert cfg.state.resolve_postgres_uri(cfg.source) == raw


# --- log_summary ---


def _captured_log_lines(cfg, caplog) -> list[str]:
"""Helper: call cfg.log_summary and return the rendered messages.

log_summary uses lazy % formatting; the rendered text is on record.message,
not record.msg. Filter to the config-summary records by the "config: " prefix
so a stray unrelated log doesn't pollute the assertion set."""
import logging as _logging

log = _logging.getLogger("viaduck.test.config_summary")
with caplog.at_level(_logging.INFO, logger="viaduck.test.config_summary"):
cfg.log_summary(log)
return [r.getMessage() for r in caplog.records if r.getMessage().startswith("config: ")]


def test_log_summary_covers_every_top_level_section(config_file, caplog):
"""Every top-level config section dumps at least one line. Catches the
case where a new section is added to ViaduckConfig but log_summary
isn't extended — the deploy log would silently miss the new values."""
cfg = load(config_file)
lines = _captured_log_lines(cfg, caplog)

# Section coverage: at least one line whose key starts with each section name.
section_prefixes = [
"source.",
"routing.",
"poll.",
"delivery.",
"server.",
"web.",
"instance.",
"state.",
"destinations", # both destinations.count and destinations[i].*
]
for prefix in section_prefixes:
assert any(f"config: {prefix}" in line for line in lines), (
f"no log line for section {prefix!r}; new field added without updating log_summary?"
)


def test_log_summary_one_line_per_field_not_grouped(config_file, caplog):
"""Each leaf field gets its own log line so operators can grep for
individual values (the whole point of the "one line per config" shape).
Defends against a refactor that bundles multiple fields onto one line."""
cfg = load(config_file)
lines = _captured_log_lines(cfg, caplog)

# Spot-check known leaf fields each appear on their own line.
for needle in [
"config: source.name=",
"config: source.table=",
"config: routing.field=",
"config: routing.seed_mode=",
"config: delivery.workers=",
"config: delivery.flush_interval_seconds=",
"config: delivery.flush_max_rows=",
"config: delivery.flush_max_bytes=",
"config: poll.interval_seconds=",
"config: poll.cdc_chunk_snapshots=",
"config: instance.partition.mode=",
"config: state.table=",
"config: state.schema=",
]:
matching = [line for line in lines if line.startswith(needle)]
assert len(matching) == 1, f"expected exactly one line for {needle!r}, got {matching!r}"


def test_log_summary_destination_fields_per_index(config_file, caplog):
"""Each destination's fields appear under destinations[i].* so multi-
destination configs stay disambiguated by index (not name) in the log."""
cfg = load(config_file)
lines = _captured_log_lines(cfg, caplog)

assert "config: destinations.count=1" in lines
assert any("config: destinations[0].id='quacksworth-lake'" == line for line in lines)
assert any("config: destinations[0].routing_value='quacksworth'" == line for line in lines)
assert any("config: destinations[0].append_at_least_once=False" == line for line in lines)


def test_log_summary_logs_append_at_least_once_true(tmp_path: Path, caplog):
"""The flag value flows through verbatim — defends against a "redact bools"
refactor or a hardcoded False that would hide an enabled fast path."""
p = tmp_path / "cfg.yaml"
p.write_text(
MINIMAL_YAML.replace(
" data_path: s3://quacksworth/", " data_path: s3://quacksworth/\n append_at_least_once: true"
)
)
cfg = load(p)
lines = _captured_log_lines(cfg, caplog)
assert any("config: destinations[0].append_at_least_once=True" == line for line in lines)


def test_log_summary_does_not_log_resolved_postgres_uri(config_file, caplog, monkeypatch):
"""Env-var-resolved credentials (DB passwords inside the URI, S3 keys) must
never appear in the log — only env var NAMES. Catches a future refactor
that calls .postgres_uri (the resolved @property) instead of the raw field."""
monkeypatch.setenv("SRC_PG", "postgres:host=src password=SUPER_SECRET_PG_PW")
monkeypatch.setenv("DEST_QW_PG", "postgres:host=qw password=SUPER_SECRET_DEST_PW")
cfg = load(config_file)
lines = _captured_log_lines(cfg, caplog)

joined = "\n".join(lines)
assert "SUPER_SECRET_PG_PW" not in joined, "source resolved postgres URI leaked into the log"
assert "SUPER_SECRET_DEST_PW" not in joined, "destination resolved postgres URI leaked into the log"
# Env var NAMES are safe and SHOULD appear.
assert any("config: source.postgres_uri_env='SRC_PG'" == line for line in lines)
assert any("config: destinations[0].postgres_uri_env='DEST_QW_PG'" == line for line in lines)


def test_log_summary_does_not_log_resolved_s3_credentials(tmp_path, caplog, monkeypatch):
"""Properties dicts hold env var NAMES (s3_access_key_id_env: MY_KEY_ENV),
never the resolved credentials. The resolved_properties() @property
resolves env values; log_summary must dump the raw properties dict only."""
p = tmp_path / "cfg.yaml"
p.write_text(
MINIMAL_YAML
+ "\ndefaults:\n properties:\n s3_access_key_id_env: 'S3_KEY'\n s3_secret_access_key_env: 'S3_SECRET'\n"
)
monkeypatch.setenv("S3_KEY", "AKIA_RESOLVED_KEY_ID_SHOULD_NOT_APPEAR")
monkeypatch.setenv("S3_SECRET", "RESOLVED_S3_SECRET_SHOULD_NOT_APPEAR")
cfg = load(p)
lines = _captured_log_lines(cfg, caplog)

joined = "\n".join(lines)
assert "AKIA_RESOLVED_KEY_ID_SHOULD_NOT_APPEAR" not in joined
assert "RESOLVED_S3_SECRET_SHOULD_NOT_APPEAR" not in joined
# The env-var names themselves are fine — they're just identifiers.
assert any("s3_access_key_id_env" in line and "S3_KEY" in line for line in lines)
60 changes: 60 additions & 0 deletions viaduck/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import hashlib
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
Expand Down Expand Up @@ -303,6 +304,65 @@ def destination_by_id(self, dest_id: str) -> DestinationConfig:
return d
raise ConfigError(f"Unknown destination ID: {dest_id!r}")

def log_summary(self, log: logging.Logger) -> None:
"""Emit one INFO log line per leaf config field. Each value is
independently greppable from the deploy log — operators can answer
"what flush_interval_seconds did this pod start with?" or "is the
fast path on for team-2?" without re-reading the values yaml or
execing into the pod.

Resolved secrets (postgres connection strings with credentials, S3
access keys) are never logged. Only raw dataclass fields are dumped,
so a `*_env` field holds the env var NAME — safe — while the
`postgres_uri` @property (which resolves the env var) is not touched.
`properties` dicts contain the same `*_env` references (YAML literal
env var names), so they're safe too.
"""
log.info("config: source.name=%r", self.source.name)
log.info("config: source.postgres_uri_env=%r", self.source.postgres_uri_env)
log.info("config: source.data_path=%r", self.source.data_path)
log.info("config: source.table=%r", self.source.table)
log.info("config: source.properties=%r", self.source.properties)

log.info("config: routing.field=%r", self.routing.field)
log.info("config: routing.key_columns=%r", self.routing.key_columns)
log.info("config: routing.seed_mode=%r", self.routing.seed_mode)
log.info("config: routing.seed_truncate=%s", self.routing.seed_truncate)

log.info("config: poll.interval_seconds=%s", self.poll.interval_seconds)
log.info("config: poll.cdc_chunk_snapshots=%d", self.poll.cdc_chunk_snapshots)

log.info("config: delivery.workers=%d", self.delivery.workers)
log.info("config: delivery.flush_interval_seconds=%s", self.delivery.flush_interval_seconds)
log.info("config: delivery.flush_max_rows=%d", self.delivery.flush_max_rows)
log.info("config: delivery.flush_max_bytes=%d", self.delivery.flush_max_bytes)
log.info("config: delivery.buffer_total_max_bytes=%d", self.delivery.buffer_total_max_bytes)
log.info("config: delivery.pool_max_open=%d", self.delivery.pool_max_open)

log.info("config: server.port=%d", self.server.port)
log.info("config: web.enabled=%s", self.web.enabled)

log.info("config: instance.id=%r", self.instance.id)
log.info("config: instance.partition.mode=%r", self.instance.partition.mode)
log.info("config: instance.partition.include=%r", self.instance.partition.include)
log.info("config: instance.partition.total=%d", self.instance.partition.total)
log.info("config: instance.partition.ordinal=%d", self.instance.partition.ordinal)

log.info("config: state.table=%r", self.state.table)
log.info("config: state.schema=%r", self.state.schema)
log.info("config: state.postgres_uri_env=%r", self.state.postgres_uri_env)

log.info("config: destinations.count=%d", len(self.destinations))
for i, d in enumerate(self.destinations):
log.info("config: destinations[%d].id=%r", i, d.id)
log.info("config: destinations[%d].routing_value=%r", i, d.routing_value)
log.info("config: destinations[%d].name=%r", i, d.name)
log.info("config: destinations[%d].postgres_uri_env=%r", i, d.postgres_uri_env)
log.info("config: destinations[%d].data_path=%r", i, d.data_path)
log.info("config: destinations[%d].table=%r", i, d.table)
log.info("config: destinations[%d].properties=%r", i, d.properties)
log.info("config: destinations[%d].append_at_least_once=%s", i, d.append_at_least_once)

def assigned_destination_ids(self) -> list[str]:
"""Return destination IDs assigned to this instance based on partition config."""
all_ids = [d.id for d in self.destinations]
Expand Down
6 changes: 6 additions & 0 deletions viaduck/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,12 @@ def run(cfg: config.ViaduckConfig) -> None:
"""Main poll loop."""
metrics.init(cfg.pipeline_name)

# One INFO line per leaf config field, before any external connections.
# Lets ops grep the deploy log for individual values without re-reading
# the rendered yaml or execing into the pod. Resolved secrets (postgres
# URIs, S3 creds) are never logged — only env var names.
cfg.log_summary(log)

from viaduck import server

http = server.start(cfg.server.port, web_enabled=cfg.web.enabled)
Expand Down
Loading