From c9e7bef6bc7ad8684f43aebb8e6ac7f11e550c7d Mon Sep 17 00:00:00 2001 From: Jakob Homan Date: Thu, 18 Jun 2026 14:33:44 -0700 Subject: [PATCH] config: dump every leaf config field to the startup log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the only startup log line was "Viaduck started: source=…, routing_field=…, mode=…, destinations=N, instance=…". Operators investigating production behavior at 3am had no way to confirm what delivery.flush_interval_seconds the pod started with, whether append_at_least_once was actually enabled on team-2, what poll.cdc_chunk_snapshots was set to, etc., without re-reading the rendered yaml or execing into the pod. ViaduckConfig.log_summary(log) emits one INFO line per leaf field across every section (source, routing, poll, delivery, server, web, instance, state, plus destinations[i].* per destination). Each value is independently greppable: "config: delivery.workers=8", "config: destinations[0].append_at_least_once=True", etc. Resolved secrets are never logged. log_summary touches only the raw dataclass fields, so postgres_uri_env shows up (the env var NAME, safe) while the postgres_uri @property (which resolves credentials) does not. Same for properties dicts — they hold env var names by YAML convention, the resolved_properties() @property is what fetches the actual values, and only the raw dict is dumped. Called from main.run() right after metrics.init, before any external connections. The structured "config: =" shape lets existing Loki / log-search queries lock onto specific keys. Tests: - Section coverage gate: every top-level section has at least one line. Catches a new field added to ViaduckConfig without log_summary being extended (the deploy log would silently miss it). - One-line-per-field assertion: exactly one log line per spot-checked leaf field. Defends against a refactor that bundles multiple fields onto one log line. - Per-destination indexing under destinations[i].* so multi-dest configs stay disambiguated. - append_at_least_once: True flows through verbatim. - Resolved postgres URI passwords (env-resolved) never appear in the log; env var NAMES (postgres_uri_env) do. - Resolved S3 access key / secret (from properties.*_env) never appear; the env var names do. --- tests/unit/test_config.py | 133 ++++++++++++++++++++++++++++++++++++++ viaduck/config.py | 60 +++++++++++++++++ viaduck/main.py | 6 ++ 3 files changed, 199 insertions(+) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index ad5ea56..ef0d2ff 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -573,3 +573,136 @@ def test_state_uri_passes_through_libpq_forms(config_file, monkeypatch): monkeypatch.setenv("SRC_PG", raw) cfg = load(config_file) assert cfg.state.resolve_postgres_uri(cfg.source) == raw + + +# --- log_summary --- + + +def _captured_log_lines(cfg, caplog) -> list[str]: + """Helper: call cfg.log_summary and return the rendered messages. + + log_summary uses lazy % formatting; the rendered text is on record.message, + not record.msg. Filter to the config-summary records by the "config: " prefix + so a stray unrelated log doesn't pollute the assertion set.""" + import logging as _logging + + log = _logging.getLogger("viaduck.test.config_summary") + with caplog.at_level(_logging.INFO, logger="viaduck.test.config_summary"): + cfg.log_summary(log) + return [r.getMessage() for r in caplog.records if r.getMessage().startswith("config: ")] + + +def test_log_summary_covers_every_top_level_section(config_file, caplog): + """Every top-level config section dumps at least one line. Catches the + case where a new section is added to ViaduckConfig but log_summary + isn't extended — the deploy log would silently miss the new values.""" + cfg = load(config_file) + lines = _captured_log_lines(cfg, caplog) + + # Section coverage: at least one line whose key starts with each section name. + section_prefixes = [ + "source.", + "routing.", + "poll.", + "delivery.", + "server.", + "web.", + "instance.", + "state.", + "destinations", # both destinations.count and destinations[i].* + ] + for prefix in section_prefixes: + assert any(f"config: {prefix}" in line for line in lines), ( + f"no log line for section {prefix!r}; new field added without updating log_summary?" + ) + + +def test_log_summary_one_line_per_field_not_grouped(config_file, caplog): + """Each leaf field gets its own log line so operators can grep for + individual values (the whole point of the "one line per config" shape). + Defends against a refactor that bundles multiple fields onto one line.""" + cfg = load(config_file) + lines = _captured_log_lines(cfg, caplog) + + # Spot-check known leaf fields each appear on their own line. + for needle in [ + "config: source.name=", + "config: source.table=", + "config: routing.field=", + "config: routing.seed_mode=", + "config: delivery.workers=", + "config: delivery.flush_interval_seconds=", + "config: delivery.flush_max_rows=", + "config: delivery.flush_max_bytes=", + "config: poll.interval_seconds=", + "config: poll.cdc_chunk_snapshots=", + "config: instance.partition.mode=", + "config: state.table=", + "config: state.schema=", + ]: + matching = [line for line in lines if line.startswith(needle)] + assert len(matching) == 1, f"expected exactly one line for {needle!r}, got {matching!r}" + + +def test_log_summary_destination_fields_per_index(config_file, caplog): + """Each destination's fields appear under destinations[i].* so multi- + destination configs stay disambiguated by index (not name) in the log.""" + cfg = load(config_file) + lines = _captured_log_lines(cfg, caplog) + + assert "config: destinations.count=1" in lines + assert any("config: destinations[0].id='quacksworth-lake'" == line for line in lines) + assert any("config: destinations[0].routing_value='quacksworth'" == line for line in lines) + assert any("config: destinations[0].append_at_least_once=False" == line for line in lines) + + +def test_log_summary_logs_append_at_least_once_true(tmp_path: Path, caplog): + """The flag value flows through verbatim — defends against a "redact bools" + refactor or a hardcoded False that would hide an enabled fast path.""" + p = tmp_path / "cfg.yaml" + p.write_text( + MINIMAL_YAML.replace( + " data_path: s3://quacksworth/", " data_path: s3://quacksworth/\n append_at_least_once: true" + ) + ) + cfg = load(p) + lines = _captured_log_lines(cfg, caplog) + assert any("config: destinations[0].append_at_least_once=True" == line for line in lines) + + +def test_log_summary_does_not_log_resolved_postgres_uri(config_file, caplog, monkeypatch): + """Env-var-resolved credentials (DB passwords inside the URI, S3 keys) must + never appear in the log — only env var NAMES. Catches a future refactor + that calls .postgres_uri (the resolved @property) instead of the raw field.""" + monkeypatch.setenv("SRC_PG", "postgres:host=src password=SUPER_SECRET_PG_PW") + monkeypatch.setenv("DEST_QW_PG", "postgres:host=qw password=SUPER_SECRET_DEST_PW") + cfg = load(config_file) + lines = _captured_log_lines(cfg, caplog) + + joined = "\n".join(lines) + assert "SUPER_SECRET_PG_PW" not in joined, "source resolved postgres URI leaked into the log" + assert "SUPER_SECRET_DEST_PW" not in joined, "destination resolved postgres URI leaked into the log" + # Env var NAMES are safe and SHOULD appear. + assert any("config: source.postgres_uri_env='SRC_PG'" == line for line in lines) + assert any("config: destinations[0].postgres_uri_env='DEST_QW_PG'" == line for line in lines) + + +def test_log_summary_does_not_log_resolved_s3_credentials(tmp_path, caplog, monkeypatch): + """Properties dicts hold env var NAMES (s3_access_key_id_env: MY_KEY_ENV), + never the resolved credentials. The resolved_properties() @property + resolves env values; log_summary must dump the raw properties dict only.""" + p = tmp_path / "cfg.yaml" + p.write_text( + MINIMAL_YAML + + "\ndefaults:\n properties:\n s3_access_key_id_env: 'S3_KEY'\n s3_secret_access_key_env: 'S3_SECRET'\n" + ) + monkeypatch.setenv("S3_KEY", "AKIA_RESOLVED_KEY_ID_SHOULD_NOT_APPEAR") + monkeypatch.setenv("S3_SECRET", "RESOLVED_S3_SECRET_SHOULD_NOT_APPEAR") + cfg = load(p) + lines = _captured_log_lines(cfg, caplog) + + joined = "\n".join(lines) + assert "AKIA_RESOLVED_KEY_ID_SHOULD_NOT_APPEAR" not in joined + assert "RESOLVED_S3_SECRET_SHOULD_NOT_APPEAR" not in joined + # The env-var names themselves are fine — they're just identifiers. + assert any("s3_access_key_id_env" in line and "S3_KEY" in line for line in lines) diff --git a/viaduck/config.py b/viaduck/config.py index 4dc53d0..c6cb478 100644 --- a/viaduck/config.py +++ b/viaduck/config.py @@ -17,6 +17,7 @@ from __future__ import annotations import hashlib +import logging import os from dataclasses import dataclass, field from pathlib import Path @@ -303,6 +304,65 @@ def destination_by_id(self, dest_id: str) -> DestinationConfig: return d raise ConfigError(f"Unknown destination ID: {dest_id!r}") + def log_summary(self, log: logging.Logger) -> None: + """Emit one INFO log line per leaf config field. Each value is + independently greppable from the deploy log — operators can answer + "what flush_interval_seconds did this pod start with?" or "is the + fast path on for team-2?" without re-reading the values yaml or + execing into the pod. + + Resolved secrets (postgres connection strings with credentials, S3 + access keys) are never logged. Only raw dataclass fields are dumped, + so a `*_env` field holds the env var NAME — safe — while the + `postgres_uri` @property (which resolves the env var) is not touched. + `properties` dicts contain the same `*_env` references (YAML literal + env var names), so they're safe too. + """ + log.info("config: source.name=%r", self.source.name) + log.info("config: source.postgres_uri_env=%r", self.source.postgres_uri_env) + log.info("config: source.data_path=%r", self.source.data_path) + log.info("config: source.table=%r", self.source.table) + log.info("config: source.properties=%r", self.source.properties) + + log.info("config: routing.field=%r", self.routing.field) + log.info("config: routing.key_columns=%r", self.routing.key_columns) + log.info("config: routing.seed_mode=%r", self.routing.seed_mode) + log.info("config: routing.seed_truncate=%s", self.routing.seed_truncate) + + log.info("config: poll.interval_seconds=%s", self.poll.interval_seconds) + log.info("config: poll.cdc_chunk_snapshots=%d", self.poll.cdc_chunk_snapshots) + + log.info("config: delivery.workers=%d", self.delivery.workers) + log.info("config: delivery.flush_interval_seconds=%s", self.delivery.flush_interval_seconds) + log.info("config: delivery.flush_max_rows=%d", self.delivery.flush_max_rows) + log.info("config: delivery.flush_max_bytes=%d", self.delivery.flush_max_bytes) + log.info("config: delivery.buffer_total_max_bytes=%d", self.delivery.buffer_total_max_bytes) + log.info("config: delivery.pool_max_open=%d", self.delivery.pool_max_open) + + log.info("config: server.port=%d", self.server.port) + log.info("config: web.enabled=%s", self.web.enabled) + + log.info("config: instance.id=%r", self.instance.id) + log.info("config: instance.partition.mode=%r", self.instance.partition.mode) + log.info("config: instance.partition.include=%r", self.instance.partition.include) + log.info("config: instance.partition.total=%d", self.instance.partition.total) + log.info("config: instance.partition.ordinal=%d", self.instance.partition.ordinal) + + log.info("config: state.table=%r", self.state.table) + log.info("config: state.schema=%r", self.state.schema) + log.info("config: state.postgres_uri_env=%r", self.state.postgres_uri_env) + + log.info("config: destinations.count=%d", len(self.destinations)) + for i, d in enumerate(self.destinations): + log.info("config: destinations[%d].id=%r", i, d.id) + log.info("config: destinations[%d].routing_value=%r", i, d.routing_value) + log.info("config: destinations[%d].name=%r", i, d.name) + log.info("config: destinations[%d].postgres_uri_env=%r", i, d.postgres_uri_env) + log.info("config: destinations[%d].data_path=%r", i, d.data_path) + log.info("config: destinations[%d].table=%r", i, d.table) + log.info("config: destinations[%d].properties=%r", i, d.properties) + log.info("config: destinations[%d].append_at_least_once=%s", i, d.append_at_least_once) + def assigned_destination_ids(self) -> list[str]: """Return destination IDs assigned to this instance based on partition config.""" all_ids = [d.id for d in self.destinations] diff --git a/viaduck/main.py b/viaduck/main.py index b1fa423..406514b 100644 --- a/viaduck/main.py +++ b/viaduck/main.py @@ -656,6 +656,12 @@ def run(cfg: config.ViaduckConfig) -> None: """Main poll loop.""" metrics.init(cfg.pipeline_name) + # One INFO line per leaf config field, before any external connections. + # Lets ops grep the deploy log for individual values without re-reading + # the rendered yaml or execing into the pod. Resolved secrets (postgres + # URIs, S3 creds) are never logged — only env var names. + cfg.log_summary(log) + from viaduck import server http = server.start(cfg.server.port, web_enabled=cfg.web.enabled)