From 18cb5a8f68f584239ff654a3bf66ecf1d139bdf0 Mon Sep 17 00:00:00 2001 From: taran Date: Mon, 8 Jun 2026 22:53:38 +0200 Subject: [PATCH] fix(tiering): honor tiered_retention policies in the flag phase add_tiered_retention_policy writes policy_type='tiered_retention', but the flag/tier phase only matched 'tiering' -- so the tiering job reported 'Found 0 table(s)' and chunks were never flagged 'tiered' (only the drop horizon worked). - tiering_job.py + _get_chunks_to_tier: match policy_type IN ('tiering','tiered_retention') and read the horizon from 'after' or 'tier_after'. - add_tiered_retention_policy: install the write-tracking trigger, backfill last_write_lsn on existing active chunks, and set tiering_enabled = TRUE -- without these tier_chunk treats last_write_lsn as NULL and defers forever. Release 0.1.2. --- CHANGELOG.md | 8 ++++++++ VERSION | 2 +- databricks/workflows/tiering_job.py | 4 ++-- sql/00_version.sql | 2 +- sql/05_tiering.sql | 9 +++++++-- sql/06_retention.sql | 17 +++++++++++++++++ 6 files changed, 36 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b3718ff..e81c3db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,14 @@ Format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), version ## [Unreleased] +## [0.1.2] - 2026-06-08 + +### Fixed + +- **`add_tiered_retention_policy` now actually tiers.** The flag/tier phase only recognized `tiering` policies, so a combined `tiered_retention` policy was never flagged (`Found 0 table(s) with tiering policies`) — only its drop horizon worked. Fixed across the flag path: the tiering job and `_get_chunks_to_tier` now match `policy_type IN ('tiering', 'tiered_retention')` and read the horizon from `after` or `tier_after`. `add_tiered_retention_policy` also installs the write-tracking trigger, backfills `last_write_lsn` on existing chunks, and sets `tiering_enabled = TRUE` — without these, `tier_chunk` deferred every chunk. + +Upgrade by reinstalling `dist/lakets.sql` and redeploying the maintenance jobs (the fix spans both SQL and `tiering_job.py`). + ## [0.1.1] - 2026-06-08 ### Fixed diff --git a/VERSION b/VERSION index 17e51c3..d917d3e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.1.1 +0.1.2 diff --git a/databricks/workflows/tiering_job.py b/databricks/workflows/tiering_job.py index 622329b..48bd30e 100644 --- a/databricks/workflows/tiering_job.py +++ b/databricks/workflows/tiering_job.py @@ -44,10 +44,10 @@ def run(project_name: str) -> int: deferred = 0 with lakebase_cursor(project_name) as cur: tables = fetch_all(cur, """ - SELECT hr.id, hr.schema_name, hr.table_name + SELECT DISTINCT hr.id, hr.schema_name, hr.table_name FROM lakets._chronotable_registry hr JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id - WHERE pr.policy_type = 'tiering' AND pr.enabled = TRUE + WHERE pr.policy_type IN ('tiering', 'tiered_retention') AND pr.enabled = TRUE """) logger.info("Found %d table(s) with tiering policies", len(tables)) diff --git a/sql/00_version.sql b/sql/00_version.sql index 76e5e88..dc749d4 100644 --- a/sql/00_version.sql +++ b/sql/00_version.sql @@ -22,7 +22,7 @@ SET client_min_messages TO NOTICE; DO $$ DECLARE v_installed TEXT; - v_incoming TEXT := coalesce(nullif('__LAKETS_VERSION__', '__LAKE' || 'TS_VERSION__'), '0.1.1'); + v_incoming TEXT := coalesce(nullif('__LAKETS_VERSION__', '__LAKE' || 'TS_VERSION__'), '0.1.2'); v_installed_parts INT[]; v_incoming_parts INT[]; BEGIN diff --git a/sql/05_tiering.sql b/sql/05_tiering.sql index 50b93c8..5189657 100644 --- a/sql/05_tiering.sql +++ b/sql/05_tiering.sql @@ -288,13 +288,18 @@ DECLARE v_after INTERVAL; v_shadow TEXT; BEGIN - SELECT hr.id, (pr.config->>'after')::INTERVAL, hr.shadow_table_name + -- The flag/tier horizon comes from a 'tiering' policy ('after') or the + -- tier_after leg of a combined 'tiered_retention' policy. Both are validated + -- and flagged here; 'tiered_retention' also owns the later drop at drop_after. + SELECT hr.id, + COALESCE(pr.config->>'after', pr.config->>'tier_after')::INTERVAL, + hr.shadow_table_name INTO v_chronotable_id, v_after, v_shadow FROM lakets._chronotable_registry hr JOIN lakets._policy_registry pr ON hr.id = pr.chronotable_id WHERE hr.schema_name = p_schema_name AND hr.table_name = p_table_name - AND pr.policy_type = 'tiering' + AND pr.policy_type IN ('tiering', 'tiered_retention') AND pr.enabled = TRUE; IF NOT FOUND THEN diff --git a/sql/06_retention.sql b/sql/06_retention.sql index 3ce0090..e8389b8 100644 --- a/sql/06_retention.sql +++ b/sql/06_retention.sql @@ -100,6 +100,23 @@ BEGIN ), TRUE) RETURNING id INTO v_policy_id; + UPDATE lakets._chronotable_registry + SET tiering_enabled = TRUE + WHERE id = v_chronotable_id; + + -- The tiering job flags chunks 'tiered' at tier_after, gated on each chunk's + -- last_write_lsn being durable in UC. Install write-tracking and backfill the + -- current WAL head onto existing active chunks (same as add_tiering_policy), + -- or tier_chunk treats a NULL last_write_lsn as "cannot prove durable" and + -- defers every chunk forever. + PERFORM lakets._install_tiering_write_tracking(p_schema_name, p_table_name); + + UPDATE lakets._chunk_metadata + SET last_write_lsn = pg_current_wal_lsn() + WHERE chronotable_id = v_chronotable_id + AND status = 'active' + AND last_write_lsn IS NULL; + RETURN v_policy_id; END; $$;