From 69c2776df6f6fca1f6e30368e674a117d5559386 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 20 May 2026 11:56:39 -0400 Subject: [PATCH 1/9] schema: unify events_raw column names and types with prod Arrow path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In-place patch of docker/init/00-schema.sql, the CH-native exporter, and the TAP tests so the docker quickstart schema aligns with what prod actually writes to (datagres_otel.query_logs_arrow in clickgres-platform). This is the pre-cutover unification: pg_stat_ch's CH-native path was previously isolated from prod, and the two schemas had drifted apart on both column naming and types. Column renames (prod-side naming wins; closer to OTel semantic conventions and minimizes downstream churn): ts_start -> ts db -> db_name username -> db_user cmd_type -> db_operation query -> query_text Type fix: err_sqlstate FixedString(5) -> LowCardinality(String) FixedString does not round-trip through Arrow IPC cleanly, and ~270 SQLSTATE codes are dictionary-friendly. The CH-native exporter is updated to write the column via TagString (clickhouse-cpp's ColumnString -> CH LowCardinality(String) is fine on the wire). Envelope columns added (with DEFAULT '' so the CH-native exporter, which does not yet emit these, continues to insert successfully): instance_ubid, server_ubid, server_role, region, cell, service_version, host_id, pod_name Engine/partitioning aligned with prod: ORDER BY ts -> ORDER BY (instance_ubid, ts) (tenant locality) TTL added: toDate(ts) + INTERVAL 180 DAY SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1 Materialized views (events_recent_1h, query_stats_5m, db_app_user_1m, errors_recent) updated to reference the new column names and to include instance_ubid in their ORDER BY / GROUP BY / SELECT projections so they remain consistent with the events_raw partitioning strategy. 
Test fixtures updated to query the new column names: t/010_clickhouse_export.pl, t/012_timing_accuracy.pl, t/013_clickhouse_tls.pl, t/021_cmd_type_counts.pl, t/027_query_normalization.pl, t/031_normalize_cache.pl, t/034_query_intern_oom_export.pl parent_query_id is intentionally NOT included here — it's the subject of PR #95 (parent-query-id-surgical) and lands as its own follow-up migration after this PR. Validated end-to-end: docker/init/00-schema.sql applies cleanly on clickhouse/clickhouse-server:26.1 (the version pinned in docker/docker-compose.test.yml); INSERTs that omit the envelope columns fill them via DEFAULT ''; all 4 MVs build. CI will run the TAP suite. Co-Authored-By: Claude Opus 4.7 (1M context) --- docker/init/00-schema.sql | 127 ++++++++++++++++++------------ src/export/clickhouse_exporter.cc | 9 ++- src/export/exporter_interface.h | 10 +-- src/export/stats_exporter.cc | 9 ++- t/010_clickhouse_export.pl | 12 +-- t/012_timing_accuracy.pl | 4 +- t/013_clickhouse_tls.pl | 2 +- t/021_cmd_type_counts.pl | 4 +- t/027_query_normalization.pl | 52 ++++++------ t/031_normalize_cache.pl | 28 +++---- t/034_query_intern_oom_export.pl | 20 ++--- 11 files changed, 152 insertions(+), 125 deletions(-) diff --git a/docker/init/00-schema.sql b/docker/init/00-schema.sql index 4b15ef2..eee768a 100644 --- a/docker/init/00-schema.sql +++ b/docker/init/00-schema.sql @@ -32,7 +32,8 @@ DROP TABLE IF EXISTS pg_stat_ch.events_raw; -- exported in batches by the pg_stat_ch background worker. -- -- Partitioned by date for efficient time-range queries and data retention. --- Ordered by ts_start for efficient global time-range scans. +-- Ordered by (instance_ubid, ts) to keep tenant data colocated and bound +-- per-tenant time-range scans. 
-- -- ============================================================================ @@ -41,23 +42,23 @@ CREATE TABLE pg_stat_ch.events_raw -- ======================================================================== -- Core identity and timing -- ======================================================================== - ts_start DateTime64(6, 'UTC') COMMENT 'Query execution start timestamp (UTC). Used for time-range filtering and partitioning.', + ts DateTime64(6, 'UTC') COMMENT 'Query execution start timestamp (UTC). Used for time-range filtering and partitioning.', duration_us UInt64 COMMENT 'Total query execution time in microseconds. HIGH: slow query, investigate EXPLAIN. LOW: fast query. Compare with p95/p99 from query_stats_5m to identify outliers.', - db LowCardinality(String) COMMENT 'PostgreSQL database name. Use for multi-tenant filtering and per-database load analysis.', + db_name LowCardinality(String) COMMENT 'PostgreSQL database name. Use for multi-tenant filtering and per-database load analysis.', - username LowCardinality(String) COMMENT 'PostgreSQL user/role name. Useful for auditing and per-user resource tracking.', + db_user LowCardinality(String) COMMENT 'PostgreSQL user/role name. Useful for auditing and per-user resource tracking.', pid Int32 COMMENT 'PostgreSQL backend process ID. Correlate with pg_stat_activity for session debugging.', query_id Int64 COMMENT '64-bit hash identifying normalized queries. Queries differing only in constants share the same query_id. Use for aggregating statistics across similar queries.', - cmd_type LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. Use for workload characterization (read-heavy vs write-heavy).', + db_operation LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. 
Use for workload characterization (read-heavy vs write-heavy).', rows UInt64 COMMENT 'Rows returned (SELECT) or affected (INSERT/UPDATE/DELETE). HIGH: large result sets or bulk operations. LOW: point queries. Watch for unexpected HIGH values indicating missing WHERE clauses.', - query String COMMENT 'Full SQL query text (may be truncated). Used for debugging and query analysis.', + query_text String COMMENT 'Full SQL query text (may be truncated). Used for debugging and query analysis.', -- ======================================================================== -- Shared buffer metrics (main buffer cache) @@ -176,7 +177,7 @@ CREATE TABLE pg_stat_ch.events_raw -- Captured via emit_log_hook when a query produces an error or warning. -- Useful for error tracking, debugging, and monitoring error rates. -- ======================================================================== - err_sqlstate FixedString(5) COMMENT 'SQL standard 5-character error code. Examples: 42P01=undefined_table, 23505=unique_violation, 42601=syntax_error, 57014=query_canceled. See PostgreSQL error codes appendix.', + err_sqlstate LowCardinality(String) COMMENT 'SQL standard 5-character error code. Examples: 42P01=undefined_table, 23505=unique_violation, 42601=syntax_error, 57014=query_canceled. See PostgreSQL error codes appendix.', err_elevel UInt8 COMMENT 'Error severity level. 0=none (success), 19=WARNING, 21=ERROR, 22=FATAL, 23=PANIC. Filter err_elevel>=21 for actual errors. WARNING (19) indicates potential issues.', @@ -189,11 +190,30 @@ CREATE TABLE pg_stat_ch.events_raw -- ======================================================================== app LowCardinality(String) COMMENT 'Client application_name. Set via connection string or SET application_name. Use for identifying load sources: "pgAdmin", "myapp-api", "pg_dump", etc.', - client_addr String COMMENT 'Client IP address. Useful for geographic analysis, debugging connection issues, and identifying load sources by host.' 
+ client_addr String COMMENT 'Client IP address. Useful for geographic analysis, debugging connection issues, and identifying load sources by host.', + + -- ======================================================================== + -- OTel resource attributes (envelope) + -- ======================================================================== + -- Populated from psch_extra_attributes (pg_stat_ch GUC). One row carries + -- the resource context for the emitting Postgres instance. Default to '' + -- so the CH-native exporter (which does not yet emit these) inserts + -- successfully against this schema. + -- ======================================================================== + instance_ubid String DEFAULT '' COMMENT 'Ubicloud ID of the emitting Postgres instance. String (opaque, not LC) — cardinality scales with active customer count.', + server_ubid String DEFAULT '' COMMENT 'Ubicloud ID of the underlying server. String (opaque).', + server_role LowCardinality(String) DEFAULT '' COMMENT 'Server role within the HA pair. ~2 values: "primary", "standby".', + region LowCardinality(String) DEFAULT '' COMMENT 'Cloud region (e.g. "us-east-1"). ~tens of values.', + cell LowCardinality(String) DEFAULT '' COMMENT 'Cell (sharded deployment unit) within the region. ~hundreds of values.', + service_version LowCardinality(String) DEFAULT '' COMMENT 'pg_stat_ch extension version.', + host_id String DEFAULT '' COMMENT 'Physical host identifier. String (opaque).', + pod_name String DEFAULT '' COMMENT 'Kubernetes pod name. String (opaque).' 
) ENGINE = MergeTree -PARTITION BY toDate(ts_start) -ORDER BY ts_start; +PARTITION BY toDate(ts) +ORDER BY (instance_ubid, ts) +TTL toDate(ts) + INTERVAL 180 DAY +SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; -- ============================================================================ @@ -213,10 +233,10 @@ ORDER BY ts_start; -- - events_raw can have longer retention (days/weeks) -- -- EXAMPLE QUERY: --- SELECT ts_start, db, duration_us/1000 AS ms, substring(query, 1, 100) +-- SELECT ts, db_name, duration_us/1000 AS ms, substring(query_text, 1, 100) -- FROM pg_stat_ch.events_recent_1h --- WHERE ts_start > now() - INTERVAL 5 MINUTE --- ORDER BY ts_start DESC +-- WHERE ts > now() - INTERVAL 5 MINUTE +-- ORDER BY ts DESC -- LIMIT 50; -- -- ============================================================================ @@ -225,9 +245,9 @@ DROP TABLE IF EXISTS pg_stat_ch.events_recent_1h; CREATE MATERIALIZED VIEW pg_stat_ch.events_recent_1h ENGINE = MergeTree -PARTITION BY toDate(ts_start) -ORDER BY ts_start -TTL toDateTime(ts_start) + INTERVAL 1 HOUR DELETE +PARTITION BY toDate(ts) +ORDER BY (instance_ubid, ts) +TTL toDateTime(ts) + INTERVAL 1 HOUR DELETE AS SELECT * FROM pg_stat_ch.events_raw; @@ -260,14 +280,14 @@ FROM pg_stat_ch.events_raw; -- -- SELECT -- query_id, --- cmd_type, +-- db_operation, -- countMerge(calls_state) AS calls, -- round(sumMerge(duration_sum_state) / countMerge(calls_state) / 1000, 2) AS avg_ms, -- round(quantilesTDigestMerge(0.95, 0.99)(duration_q_state)[1] / 1000, 2) AS p95_ms, -- round(quantilesTDigestMerge(0.95, 0.99)(duration_q_state)[2] / 1000, 2) AS p99_ms -- FROM pg_stat_ch.query_stats_5m -- WHERE bucket >= now() - INTERVAL 1 HOUR --- GROUP BY query_id, cmd_type +-- GROUP BY query_id, db_operation -- ORDER BY p99_ms DESC -- LIMIT 10; -- @@ -278,9 +298,10 @@ DROP TABLE IF EXISTS pg_stat_ch.query_stats_5m; CREATE MATERIALIZED VIEW pg_stat_ch.query_stats_5m ( bucket DateTime COMMENT '5-minute time bucket start', - db 
LowCardinality(String) COMMENT 'Database name', + instance_ubid String COMMENT 'Emitting instance ID — first ORDER BY key to keep tenant data colocated.', + db_name LowCardinality(String) COMMENT 'Database name', query_id Int64 COMMENT 'Normalized query identifier', - cmd_type LowCardinality(String) COMMENT 'Command type (SELECT, INSERT, etc.)', + db_operation LowCardinality(String) COMMENT 'Command type (SELECT, INSERT, etc.)', calls_state AggregateFunction(count) COMMENT 'Call count state. Finalize with countMerge().', duration_sum_state AggregateFunction(sum, UInt64) COMMENT 'Total duration state. Finalize with sumMerge().', @@ -294,13 +315,14 @@ CREATE MATERIALIZED VIEW pg_stat_ch.query_stats_5m ) ENGINE = AggregatingMergeTree PARTITION BY toYYYYMMDD(bucket) -ORDER BY (bucket, db, query_id, cmd_type) +ORDER BY (instance_ubid, bucket, db_name, query_id, db_operation) AS SELECT - toStartOfInterval(toDateTime(ts_start), INTERVAL 5 MINUTE) AS bucket, - db, + toStartOfInterval(toDateTime(ts), INTERVAL 5 MINUTE) AS bucket, + instance_ubid, + db_name, query_id, - cmd_type, + db_operation, countState() AS calls_state, sumState(duration_us) AS duration_sum_state, @@ -312,7 +334,7 @@ SELECT sumState(shared_blks_hit) AS shared_hit_sum_state, sumState(shared_blks_read) AS shared_read_sum_state FROM pg_stat_ch.events_raw -GROUP BY bucket, db, query_id, cmd_type; +GROUP BY bucket, instance_ubid, db_name, query_id, db_operation; -- ============================================================================ @@ -343,14 +365,14 @@ GROUP BY bucket, db, query_id, cmd_type; -- EXAMPLE: Error rate by database and user -- -- SELECT --- db, --- username, +-- db_name, +-- db_user, -- countMerge(calls_state) AS queries, -- sumMerge(errors_sum_state) AS errors, -- round(100 * sumMerge(errors_sum_state) / countMerge(calls_state), 2) AS error_pct -- FROM pg_stat_ch.db_app_user_1m -- WHERE bucket >= now() - INTERVAL 1 HOUR --- GROUP BY db, username +-- GROUP BY db_name, db_user -- HAVING 
errors > 0 -- ORDER BY error_pct DESC; -- @@ -361,10 +383,11 @@ DROP TABLE IF EXISTS pg_stat_ch.db_app_user_1m; CREATE MATERIALIZED VIEW pg_stat_ch.db_app_user_1m ( bucket DateTime COMMENT '1-minute time bucket start', - db LowCardinality(String) COMMENT 'Database name', + instance_ubid String COMMENT 'Emitting instance ID — first ORDER BY key for tenant locality.', + db_name LowCardinality(String) COMMENT 'Database name', app LowCardinality(String) COMMENT 'Application name', - username LowCardinality(String) COMMENT 'PostgreSQL username', - cmd_type LowCardinality(String) COMMENT 'Command type', + db_user LowCardinality(String) COMMENT 'PostgreSQL user/role name', + db_operation LowCardinality(String) COMMENT 'Command type', calls_state AggregateFunction(count) COMMENT 'Query count state. Finalize with countMerge().', duration_sum_state AggregateFunction(sum, UInt64) COMMENT 'Total duration state (μs). Finalize with sumMerge().', @@ -373,21 +396,22 @@ CREATE MATERIALIZED VIEW pg_stat_ch.db_app_user_1m ) ENGINE = AggregatingMergeTree PARTITION BY toYYYYMMDD(bucket) -ORDER BY (bucket, db, app, username, cmd_type) +ORDER BY (instance_ubid, bucket, db_name, app, db_user, db_operation) AS SELECT - toStartOfMinute(toDateTime(ts_start)) AS bucket, - db, + toStartOfMinute(toDateTime(ts)) AS bucket, + instance_ubid, + db_name, app, - username, - cmd_type, + db_user, + db_operation, countState() AS calls_state, sumState(duration_us) AS duration_sum_state, quantilesTDigestState(0.95, 0.99)(duration_us) AS duration_q_state, sumState(toUInt64(err_elevel > 0)) AS errors_sum_state FROM pg_stat_ch.events_raw -GROUP BY bucket, db, app, username, cmd_type; +GROUP BY bucket, instance_ubid, db_name, app, db_user, db_operation; -- ============================================================================ @@ -409,16 +433,16 @@ GROUP BY bucket, db, app, username, cmd_type; -- EXAMPLE: Recent errors with query context -- -- SELECT --- ts_start, --- db, --- username, +-- ts, +-- 
db_name, +-- db_user, -- app, -- err_sqlstate, -- err_message, --- substring(query, 1, 200) AS query_preview +-- substring(query_text, 1, 200) AS query_preview -- FROM pg_stat_ch.errors_recent --- WHERE ts_start > now() - INTERVAL 1 HOUR --- ORDER BY ts_start DESC +-- WHERE ts > now() - INTERVAL 1 HOUR +-- ORDER BY ts DESC -- LIMIT 100; -- -- EXAMPLE: Error breakdown by SQLSTATE @@ -429,7 +453,7 @@ GROUP BY bucket, db, app, username, cmd_type; -- uniq(query_id) AS unique_queries, -- any(err_message) AS sample_message -- FROM pg_stat_ch.errors_recent --- WHERE ts_start > now() - INTERVAL 24 HOUR +-- WHERE ts > now() - INTERVAL 24 HOUR -- GROUP BY err_sqlstate -- ORDER BY occurrences DESC; -- @@ -450,14 +474,15 @@ DROP TABLE IF EXISTS pg_stat_ch.errors_recent; CREATE MATERIALIZED VIEW pg_stat_ch.errors_recent ENGINE = MergeTree -PARTITION BY toDate(ts_start) -ORDER BY ts_start -TTL toDateTime(ts_start) + INTERVAL 7 DAY DELETE +PARTITION BY toDate(ts) +ORDER BY (instance_ubid, ts) +TTL toDateTime(ts) + INTERVAL 7 DAY DELETE AS SELECT - ts_start, - db, - username, + ts, + instance_ubid, + db_name, + db_user, app, client_addr, pid, @@ -465,6 +490,6 @@ SELECT err_sqlstate, err_elevel, err_message, - query + query_text FROM pg_stat_ch.events_raw WHERE err_elevel > 0; diff --git a/src/export/clickhouse_exporter.cc b/src/export/clickhouse_exporter.cc index aaaf2f8..cbdb04b 100644 --- a/src/export/clickhouse_exporter.cc +++ b/src/export/clickhouse_exporter.cc @@ -202,11 +202,12 @@ class ClickHouseExporter : public StatsExporter { return MakeCol>(name); } - shared_ptr> DbNameColumn() final { return TagString("db"); } - shared_ptr> DbUserColumn() final { return TagString("username"); } + // Semantic columns + shared_ptr> DbNameColumn() final { return TagString("db_name"); } + shared_ptr> DbUserColumn() final { return TagString("db_user"); } shared_ptr> DbDurationColumn() final { return MetricUInt64("duration_us"); } - shared_ptr> DbOperationColumn() final { return 
TagString("cmd_type"); } - shared_ptr> DbQueryTextColumn() final { return RecordString("query"); } + shared_ptr> DbOperationColumn() final { return TagString("db_operation"); } + shared_ptr> DbQueryTextColumn() final { return RecordString("query_text"); } void BeginBatch() final { for (auto& col : columns_) diff --git a/src/export/exporter_interface.h b/src/export/exporter_interface.h index 1dad609..afbc8c9 100644 --- a/src/export/exporter_interface.h +++ b/src/export/exporter_interface.h @@ -55,17 +55,17 @@ class StatsExporter { // instrument). Pure virtuals enforce explicit handling in every exporter. // =========================================================================== - // Database name. CH: TagString "db"; OTel semconv: "db.name" tag. + // Database name. CH: TagString "db_name"; OTel semconv: "db.name" tag. virtual shared_ptr> DbNameColumn() = 0; - // Authenticated user. CH: TagString "username"; OTel semconv: "db.user" tag. + // Authenticated user. CH: TagString "db_user"; OTel semconv: "db.user" tag. virtual shared_ptr> DbUserColumn() = 0; // Query duration. Caller appends microseconds. CH: MetricUInt64 "duration_us"; // OTel: converts to seconds, records as Histogram "db.client.operation.duration". virtual shared_ptr> DbDurationColumn() = 0; - // SQL command type. CH: RecordString "cmd_type"; OTel: TagString "db.operation.name" + // SQL command type. CH: TagString "db_operation"; OTel: TagString "db.operation.name" // (used as a dimension on the duration histogram). virtual shared_ptr> DbOperationColumn() = 0; - // Query text. CH: RecordString "query"; OTel semconv: "db.query.text". + // Query text. CH: RecordString "query_text"; OTel semconv: "db.query.text". 
virtual shared_ptr> DbQueryTextColumn() = 0; virtual void BeginBatch() = 0; @@ -96,7 +96,7 @@ void RecordExporterFailure(const char* message); // Expected usage: // void ProcessBatch(StatsExporter *exporter) { // exporter->BeginBatch(); // no op or ClickHouse column reset -// auto col_user = exporter->TagString("username"); +// auto col_user = exporter->TagString("db_user"); // auto col_rows = exporter->MetricUInt64("rows"); // // for (const auto &ev : events) { diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index f4f2f06..25de31d 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -233,7 +233,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte exporter->BeginBatch(); - auto col_ts_start = exporter->RecordDateTime("ts_start"); + auto col_ts = exporter->RecordDateTime("ts"); auto col_duration_us = exporter->DbDurationColumn(); auto col_db = exporter->DbNameColumn(); auto col_username = exporter->DbUserColumn(); @@ -278,7 +278,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte auto col_parallel_workers_planned = exporter->RecordInt16("parallel_workers_planned"); auto col_parallel_workers_launched = exporter->RecordInt16("parallel_workers_launched"); - auto col_err_sqlstate = exporter->MetricFixedString(5, "err_sqlstate"); + auto col_err_sqlstate = exporter->TagString("err_sqlstate"); auto col_err_elevel = exporter->RecordUInt8("err_elevel"); auto col_err_message = exporter->RecordString("err_message"); @@ -288,7 +288,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte for (const auto& ev : events) { exporter->BeginRow(); - col_ts_start->Append(ev.ts_start + kPostgresEpochOffsetUs); + col_ts->Append(ev.ts_start + kPostgresEpochOffsetUs); col_duration_us->Append(ev.duration_us); col_db->Append(std::string(ev.datname, ev.datname_len)); col_username->Append(std::string(ev.username, ev.username_len)); @@ -335,7 +335,8 @@ void 
ExportEventStatsInternal(const std::vector& events, StatsExporte col_parallel_workers_planned->Append(ev.parallel_workers_planned); col_parallel_workers_launched->Append(ev.parallel_workers_launched); - col_err_sqlstate->Append(std::string_view(ev.err_sqlstate, 5)); + col_err_sqlstate->Append( + std::string(ev.err_sqlstate, strnlen(ev.err_sqlstate, sizeof(ev.err_sqlstate)))); col_err_elevel->Append(ev.err_elevel); auto elen = ClampFieldLen(ev.err_message_len, static_cast(PSCH_MAX_ERR_MSG_LEN), "err_message_len"); diff --git a/t/010_clickhouse_export.pl b/t/010_clickhouse_export.pl index fd2768b..64a4bd4 100644 --- a/t/010_clickhouse_export.pl +++ b/t/010_clickhouse_export.pl @@ -58,7 +58,7 @@ # Verify query field is populated my $query_check = psch_wait_for_clickhouse_query( - "SELECT count() FROM pg_stat_ch.events_raw WHERE query != ''", + "SELECT count() FROM pg_stat_ch.events_raw WHERE query_text != ''", sub { $_[0] >= 1 }, 10 ); @@ -134,18 +134,18 @@ cmp_ok($duration_check, '>=', 1, 'duration_us is populated'); my $db_check = psch_wait_for_clickhouse_query( - "SELECT count() FROM pg_stat_ch.events_raw WHERE db = 'postgres'", + "SELECT count() FROM pg_stat_ch.events_raw WHERE db_name = 'postgres'", sub { $_[0] >= 1 }, 10 ); - cmp_ok($db_check, '>=', 1, 'db field is populated'); + cmp_ok($db_check, '>=', 1, 'db_name field is populated'); - my $cmd_type_check = psch_wait_for_clickhouse_query( - "SELECT count() FROM pg_stat_ch.events_raw WHERE cmd_type != ''", + my $db_operation_check = psch_wait_for_clickhouse_query( + "SELECT count() FROM pg_stat_ch.events_raw WHERE db_operation != ''", sub { $_[0] >= 1 }, 10 ); - cmp_ok($cmd_type_check, '>=', 1, 'cmd_type is populated'); + cmp_ok($db_operation_check, '>=', 1, 'db_operation is populated'); # Clean up $node->safe_psql('postgres', 'DROP TABLE IF EXISTS test_fields'); diff --git a/t/012_timing_accuracy.pl b/t/012_timing_accuracy.pl index d615f09..d306caa 100644 --- a/t/012_timing_accuracy.pl +++ 
b/t/012_timing_accuracy.pl @@ -101,7 +101,7 @@ # Get timing statistics my $timing_stats = psch_query_clickhouse( "SELECT count(), avg(duration_us), min(duration_us), max(duration_us) " . - "FROM pg_stat_ch.events_raw WHERE query LIKE '%pgbench%' OR query LIKE '%UPDATE%'" + "FROM pg_stat_ch.events_raw WHERE query_text LIKE '%pgbench%' OR query_text LIKE '%UPDATE%'" ); diag("Timing stats: $timing_stats"); @@ -131,7 +131,7 @@ # Check that we captured a duration close to 100ms my $duration = psch_query_clickhouse( "SELECT duration_us FROM pg_stat_ch.events_raw " . - "WHERE query LIKE '%pg_sleep%' ORDER BY ts_start DESC LIMIT 1" + "WHERE query_text LIKE '%pg_sleep%' ORDER BY ts DESC LIMIT 1" ); if ($duration) { diff --git a/t/013_clickhouse_tls.pl b/t/013_clickhouse_tls.pl index d8a0951..18f4cde 100644 --- a/t/013_clickhouse_tls.pl +++ b/t/013_clickhouse_tls.pl @@ -63,7 +63,7 @@ sub ch_tls_ready { "events visible in TLS ClickHouse (got $ch_count)"); my $query_check = psch_wait_for_clickhouse_query( - "SELECT count() FROM pg_stat_ch.events_raw WHERE query != ''", sub { $_[0] >= 1 }, 10); + "SELECT count() FROM pg_stat_ch.events_raw WHERE query_text != ''", sub { $_[0] >= 1 }, 10); cmp_ok($query_check, '>=', 1, 'query text captured over TLS'); }; diff --git a/t/021_cmd_type_counts.pl b/t/021_cmd_type_counts.pl index aa475dd..cccf9ac 100755 --- a/t/021_cmd_type_counts.pl +++ b/t/021_cmd_type_counts.pl @@ -28,10 +28,10 @@ batch_max => 100 ); -# Helper: parse cmd_type counts from ClickHouse +# Helper: parse db_operation counts from ClickHouse sub get_cmd_type_counts { my $result = psch_query_clickhouse( - "SELECT cmd_type, count() FROM pg_stat_ch.events_raw GROUP BY cmd_type FORMAT TabSeparated" + "SELECT db_operation, count() FROM pg_stat_ch.events_raw GROUP BY db_operation FORMAT TabSeparated" ); my %counts; for my $line (split /\n/, $result) { diff --git a/t/027_query_normalization.pl b/t/027_query_normalization.pl index 2c4bac6..a2702f2 100644 --- 
a/t/027_query_normalization.pl +++ b/t/027_query_normalization.pl @@ -53,12 +53,12 @@ sub get_captured_query { psch_wait_for_export($node, 1, 10); return psch_wait_for_clickhouse_query( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query NOT LIKE '%pg_extension%' " . - "AND query != '' " . - "ORDER BY ts_start DESC LIMIT 1", + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text NOT LIKE '%pg_extension%' " . + "AND query_text != '' " . + "ORDER BY ts DESC LIMIT 1", sub { $_[0] ne '' }, 10 ); @@ -193,11 +193,11 @@ sub get_captured_query { psch_wait_for_export($node, 2, 10); my $all_queries = psch_wait_for_clickhouse_query( - "SELECT groupArray(query) FROM pg_stat_ch.events_raw " . + "SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%test_norm%' " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query != ''", + "AND query_text LIKE '%test_norm%' " . + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text != ''", sub { $_[0] ne '' }, 10 ); @@ -292,19 +292,19 @@ sub get_captured_query { $session->quit(); my $error_query = psch_query_clickhouse( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE err_message != '' " . - "ORDER BY ts_start DESC LIMIT 1" + "ORDER BY ts DESC LIMIT 1" ); is($error_query, '', 'Error events do not export query text'); my $q = psch_wait_for_clickhouse_query( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE err_message = '' " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query NOT LIKE '%pg_extension%' " . - "AND query != '' " . - "ORDER BY ts_start DESC LIMIT 1", + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text NOT LIKE '%pg_extension%' " . + "AND query_text != '' " . 
+ "ORDER BY ts DESC LIMIT 1", sub { $_[0] ne '' }, 10 ); @@ -388,8 +388,8 @@ sub get_captured_query { my $nested_count = psch_wait_for_clickhouse_query( "SELECT count() FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%WHERE%' " . - "AND query LIKE '%nested_normalize_same_sql%'", + "AND query_text LIKE '%WHERE%' " . + "AND query_text LIKE '%nested_normalize_same_sql%'", sub { $_[0] >= 3 }, 10 ); @@ -397,10 +397,10 @@ sub get_captured_query { 'Captured recursive nested executions of the same SPI statement'); my $queries = psch_wait_for_clickhouse_query( - "SELECT groupArray(query) FROM pg_stat_ch.events_raw " . + "SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%WHERE%' " . - "AND query LIKE '%nested_normalize_same_sql%'", + "AND query_text LIKE '%WHERE%' " . + "AND query_text LIKE '%nested_normalize_same_sql%'", sub { $_[0] ne '' }, 10 ); @@ -433,8 +433,8 @@ sub get_captured_query { my $repeat_count = psch_wait_for_clickhouse_query( "SELECT count() FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%WHERE%' " . - "AND query LIKE '%nested_normalize_same_sql%'", + "AND query_text LIKE '%WHERE%' " . + "AND query_text LIKE '%nested_normalize_same_sql%'", sub { $_[0] >= 4 }, 10 ); @@ -442,10 +442,10 @@ sub get_captured_query { 'Captured repeated executions of the same SPI statement in one backend'); my $repeat_queries = psch_wait_for_clickhouse_query( - "SELECT groupArray(query) FROM pg_stat_ch.events_raw " . + "SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%WHERE%' " . - "AND query LIKE '%nested_normalize_same_sql%'", + "AND query_text LIKE '%WHERE%' " . 
+ "AND query_text LIKE '%nested_normalize_same_sql%'", sub { $_[0] ne '' }, 10 ); diff --git a/t/031_normalize_cache.pl b/t/031_normalize_cache.pl index 68842b1..ea9bd11 100644 --- a/t/031_normalize_cache.pl +++ b/t/031_normalize_cache.pl @@ -85,11 +85,11 @@ # Collect all exported query texts from this backend. my $all_queries = psch_wait_for_clickhouse_query( - "SELECT groupArray(query) FROM pg_stat_ch.events_raw " . + "SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query NOT LIKE '%pg_extension%' " . - "AND query != ''", + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text NOT LIKE '%pg_extension%' " . + "AND query_text != ''", sub { $_[0] ne '' }, 10 ); @@ -107,9 +107,9 @@ my $count = psch_wait_for_clickhouse_query( "SELECT count() FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query NOT LIKE '%pg_stat_ch%' " . - "AND query NOT LIKE '%pg_extension%' " . - "AND query != ''", + "AND query_text NOT LIKE '%pg_stat_ch%' " . + "AND query_text NOT LIKE '%pg_extension%' " . + "AND query_text != ''", sub { $_[0] >= scalar(@queries) }, 10 ); @@ -160,10 +160,10 @@ psch_wait_for_export($node, 1, 10); my $exported_query = psch_wait_for_clickhouse_query( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE 'SELECT%pg_class%' " . - "ORDER BY ts_start DESC LIMIT 1", + "AND query_text LIKE 'SELECT%pg_class%' " . + "ORDER BY ts DESC LIMIT 1", sub { $_[0] ne '' }, 10 ); @@ -236,10 +236,10 @@ psch_wait_for_export($node, 1, 10); my $first = psch_wait_for_clickhouse_query( - "SELECT query FROM pg_stat_ch.events_raw " . + "SELECT query_text FROM pg_stat_ch.events_raw " . "WHERE pid = $pid " . - "AND query LIKE '%pg_class%' " . - "AND query LIKE '%\$1%' " . + "AND query_text LIKE '%pg_class%' " . + "AND query_text LIKE '%\$1%' " . 
"LIMIT 1", sub { $_[0] ne '' }, 10 @@ -281,7 +281,7 @@ my $empty_count = psch_query_clickhouse( "SELECT count() FROM pg_stat_ch.events_raw " . - "WHERE pid = $pid AND query = ''" + "WHERE pid = $pid AND query_text = ''" ); cmp_ok($empty_count, '>=', 1, 'Evicted cache entry produces empty query text on re-EXECUTE'); diff --git a/t/034_query_intern_oom_export.pl b/t/034_query_intern_oom_export.pl index 9faeab2..facf8d0 100644 --- a/t/034_query_intern_oom_export.pl +++ b/t/034_query_intern_oom_export.pl @@ -18,7 +18,7 @@ # 2. Send the same many-distinct-query workload as 033. # 3. After the workload, force-flush and wait for the drain to complete. # 4. Ask ClickHouse for the slice of rows where the interner failed -# (query = '') and assert those rows carry duration_us, db, cmd_type, +# (query_text = '') and assert those rows carry duration_us, db_name, db_operation, # and pid — the contract that "numeric telemetry is preserved on intern # failure" actually holds end-to-end. @@ -144,17 +144,17 @@ # Contract under test: # - intern_failed > 0 (we *did* drop some text; this proves the OOM path # reached the consumer, not just the in-process counter). -# - every intern_failed row still carries duration_us, db, and cmd_type — -# the numeric/identity telemetry the customer relies on for slow-query -# analysis even when SQL text is unavailable. +# - every intern_failed row still carries duration_us, db_name, and +# db_operation — the numeric/identity telemetry the customer relies on +# for slow-query analysis even when SQL text is unavailable. # --------------------------------------------------------------------------- my $intern_failed_rows = psch_wait_for_clickhouse_query( "SELECT count() FROM pg_stat_ch.events_raw " . 
- "WHERE pid = $pid AND query = ''", + "WHERE pid = $pid AND query_text = ''", sub { $_[0] >= 1 }, 15); cmp_ok($intern_failed_rows, '>=', 1, - "ClickHouse has rows with empty query (got $intern_failed_rows)"); + "ClickHouse has rows with empty query_text (got $intern_failed_rows)"); # Among the intern_failed rows, count how many have duration_us > 0. # We expect ALL of them to — a SELECT can complete in microseconds but never @@ -164,10 +164,10 @@ # equality with the total count of empty-query rows. my $intern_failed_with_metrics = psch_query_clickhouse( "SELECT count() FROM pg_stat_ch.events_raw " . - "WHERE pid = $pid AND query = '' AND duration_us > 0 " . - "AND db = 'postgres' AND cmd_type != ''"); + "WHERE pid = $pid AND query_text = '' AND duration_us > 0 " . + "AND db_name = 'postgres' AND db_operation != ''"); cmp_ok($intern_failed_with_metrics, '>=', $intern_failed_rows, - "every empty-query row carries duration_us, db, and cmd_type " . + "every empty-query_text row carries duration_us, db_name, and db_operation " . "($intern_failed_with_metrics of $intern_failed_rows)"); # Sanity: there should also be at least *some* rows whose query text did @@ -175,7 +175,7 @@ # otherwise-working system, we're testing total failure. my $intern_ok_rows = psch_query_clickhouse( "SELECT count() FROM pg_stat_ch.events_raw " . - "WHERE pid = $pid AND query != ''"); + "WHERE pid = $pid AND query_text != ''"); cmp_ok($intern_ok_rows, '>=', 1, "ClickHouse has at least one row with non-empty query " . "(intern was working before pool filled; got $intern_ok_rows)"); From 033804d3d272d74440266006f26a71e99a1458b1 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 20 May 2026 12:52:13 -0400 Subject: [PATCH 2/9] schema: move docker init schema into Goose migrations directory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mechanical move + goose annotations. 
The pg_stat_ch ClickHouse schema that was previously the docker quickstart init script becomes the first real Goose migration under schema/migrations/, matching clickgres-platform's runner layout (pressly/goose v3, DialectClickHouse, embed.FS). Changes to the content of the moved file: * Header banner rewritten from "CANONICAL SCHEMA REFERENCE / single source of truth / dual role as docker init" to "initial migration" framing. * Added -- +goose Up / -- +goose Down section markers. * Each CREATE DATABASE / CREATE TABLE / CREATE MATERIALIZED VIEW wrapped in -- +goose StatementBegin / StatementEnd so goose's parser handles the multi-statement bodies correctly. * Removed the pre-CREATE "DROP TABLE IF EXISTS X" idioms — those existed to make the docker init script idempotent on container restart, but goose tracks state via goose_db_version. Drops now live exclusively in the -- +goose Down section in reverse dependency order. The schema content itself (column names, types, MV definitions, ORDER BY / TTL / SETTINGS) is unchanged from the previous commit. Git rename detection should follow docker/init/00-schema.sql -> schema/migrations/20260519000001_create_initial_schema.sql. Also adds schema/migrations/00000000000001_bootstrap.sql, a no-op SELECT 1 migration required by goose to seed the goose_db_version table (copied verbatim from clickgres-platform's bootstrap). Validated end-to-end against clickhouse/clickhouse-server:26.1: pressly goose v3.27.1 `up` and `reset` round-trip cleanly. All 51 columns and 4 MVs land with the expected types. Note: this leaves docker/init/ empty. The docker-compose mounts will need updating in a follow-on PR to point at schema/migrations/ (which requires a small shim to invoke goose-up at container start, since clickhouse-server's docker entrypoint cannot parse goose's StatementBegin/End directives directly). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- docker/init/.gitkeep | 5 ++ .../migrations/00000000000001_bootstrap.sql | 9 +++ .../20260519000001_create_initial_schema.sql | 62 ++++++++++++------- 3 files changed, 54 insertions(+), 22 deletions(-) create mode 100644 docker/init/.gitkeep create mode 100644 schema/migrations/00000000000001_bootstrap.sql rename docker/init/00-schema.sql => schema/migrations/20260519000001_create_initial_schema.sql (96%) diff --git a/docker/init/.gitkeep b/docker/init/.gitkeep new file mode 100644 index 0000000..3fb9a7d --- /dev/null +++ b/docker/init/.gitkeep @@ -0,0 +1,5 @@ +# Reserved for legacy docker/quickstart bind mounts. The canonical CH schema +# now lives in schema/migrations/ and is applied via goose (see CI workflow). +# This directory is intentionally empty; the .gitkeep keeps the bind mount +# in docker/docker-compose.test.yml and docker/quickstart/docker-compose.yml +# from failing on a missing host path. diff --git a/schema/migrations/00000000000001_bootstrap.sql b/schema/migrations/00000000000001_bootstrap.sql new file mode 100644 index 0000000..1b3cdc5 --- /dev/null +++ b/schema/migrations/00000000000001_bootstrap.sql @@ -0,0 +1,9 @@ +-- Bootstrap migration: seeds the Goose version table so that subsequent +-- migrations have a baseline. The SELECT 1 statements are intentional +-- no-ops — do not delete this file. 
+ +-- +goose Up +SELECT 1; + +-- +goose Down +SELECT 1; diff --git a/docker/init/00-schema.sql b/schema/migrations/20260519000001_create_initial_schema.sql similarity index 96% rename from docker/init/00-schema.sql rename to schema/migrations/20260519000001_create_initial_schema.sql index eee768a..a56e7d7 100644 --- a/docker/init/00-schema.sql +++ b/schema/migrations/20260519000001_create_initial_schema.sql @@ -1,18 +1,10 @@ -- ============================================================================ --- ClickHouse schema for pg_stat_ch events +-- Initial pg_stat_ch ClickHouse schema: events_raw + 4 aggregation MVs. -- ============================================================================ -- --- CANONICAL SCHEMA REFERENCE --- --- This file is the single source of truth for the pg_stat_ch ClickHouse schema. --- It serves a dual role: --- 1. Docker init script (applied automatically by docker-compose) --- 2. Schema documentation (column comments, MV explanations) --- --- For production deployments: clickhouse-client < docker/init/00-schema.sql --- For documentation: see docs/clickhouse.md --- --- ============================================================================ +-- This is the first real migration on top of bootstrap; everything in here is +-- authored together pre-GA, applied together, and not subject to historical +-- evolution. Subsequent schema changes get their own timestamped files. -- -- This schema is designed for the pg_stat_ch PostgreSQL extension which exports -- raw query execution telemetry to ClickHouse. 
All aggregation (p50/p95/p99, @@ -20,9 +12,11 @@ -- -- ============================================================================ -CREATE DATABASE IF NOT EXISTS pg_stat_ch; +-- +goose Up -DROP TABLE IF EXISTS pg_stat_ch.events_raw; +-- +goose StatementBegin +CREATE DATABASE IF NOT EXISTS pg_stat_ch; +-- +goose StatementEnd -- ============================================================================ -- events_raw: Raw query execution events @@ -37,6 +31,7 @@ DROP TABLE IF EXISTS pg_stat_ch.events_raw; -- -- ============================================================================ +-- +goose StatementBegin CREATE TABLE pg_stat_ch.events_raw ( -- ======================================================================== @@ -214,6 +209,7 @@ PARTITION BY toDate(ts) ORDER BY (instance_ubid, ts) TTL toDate(ts) + INTERVAL 180 DAY SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; +-- +goose StatementEnd -- ============================================================================ @@ -241,8 +237,7 @@ SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1; -- -- ============================================================================ -DROP TABLE IF EXISTS pg_stat_ch.events_recent_1h; - +-- +goose StatementBegin CREATE MATERIALIZED VIEW pg_stat_ch.events_recent_1h ENGINE = MergeTree PARTITION BY toDate(ts) @@ -251,6 +246,7 @@ TTL toDateTime(ts) + INTERVAL 1 HOUR DELETE AS SELECT * FROM pg_stat_ch.events_raw; +-- +goose StatementEnd -- ============================================================================ @@ -293,8 +289,7 @@ FROM pg_stat_ch.events_raw; -- -- ============================================================================ -DROP TABLE IF EXISTS pg_stat_ch.query_stats_5m; - +-- +goose StatementBegin CREATE MATERIALIZED VIEW pg_stat_ch.query_stats_5m ( bucket DateTime COMMENT '5-minute time bucket start', @@ -335,6 +330,7 @@ SELECT sumState(shared_blks_read) AS shared_read_sum_state FROM pg_stat_ch.events_raw GROUP BY bucket, 
instance_ubid, db_name, query_id, db_operation; +-- +goose StatementEnd -- ============================================================================ @@ -378,8 +374,7 @@ GROUP BY bucket, instance_ubid, db_name, query_id, db_operation; -- -- ============================================================================ -DROP TABLE IF EXISTS pg_stat_ch.db_app_user_1m; - +-- +goose StatementBegin CREATE MATERIALIZED VIEW pg_stat_ch.db_app_user_1m ( bucket DateTime COMMENT '1-minute time bucket start', @@ -412,6 +407,7 @@ SELECT sumState(toUInt64(err_elevel > 0)) AS errors_sum_state FROM pg_stat_ch.events_raw GROUP BY bucket, instance_ubid, db_name, app, db_user, db_operation; +-- +goose StatementEnd -- ============================================================================ @@ -470,8 +466,7 @@ GROUP BY bucket, instance_ubid, db_name, app, db_user, db_operation; -- -- ============================================================================ -DROP TABLE IF EXISTS pg_stat_ch.errors_recent; - +-- +goose StatementBegin CREATE MATERIALIZED VIEW pg_stat_ch.errors_recent ENGINE = MergeTree PARTITION BY toDate(ts) @@ -493,3 +488,26 @@ SELECT query_text FROM pg_stat_ch.events_raw WHERE err_elevel > 0; +-- +goose StatementEnd + +-- +goose Down + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.errors_recent; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.db_app_user_1m; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.query_stats_5m; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.events_recent_1h; +-- +goose StatementEnd + +-- +goose StatementBegin +DROP TABLE IF EXISTS pg_stat_ch.events_raw; +-- +goose StatementEnd From 857400ccc9add1cd310bcd33bd2875ac8ae95e41 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 20 May 2026 13:42:18 -0400 Subject: [PATCH 3/9] ci(tap): apply schema/migrations via goose instead of raw clickhouse-client The previous 
"Initialize ClickHouse schema" step ran `clickhouse-client --multiquery < docker/init/00-schema.sql`. That file moved in the previous commit; pointing the step at the new location without further changes would not work because clickhouse-client cannot parse goose -- +goose Up/Down/StatementBegin/End directives, and would execute the Down section's DROP statements right after the Up section's CREATEs. Switch the step to install pressly/goose v3.27.1 (~5 sec on Ubuntu CI runners which have Go preinstalled) and apply the migrations from schema/migrations/ via `goose ... up` against the running CH container. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci-tap.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-tap.yml b/.github/workflows/ci-tap.yml index e6e96ae..47f2d0b 100644 --- a/.github/workflows/ci-tap.yml +++ b/.github/workflows/ci-tap.yml @@ -101,9 +101,14 @@ jobs: docker compose -f docker/docker-compose.test.yml logs exit 1 + - name: Install goose + run: go install github.com/pressly/goose/v3/cmd/goose@v3.27.1 + - name: Initialize ClickHouse schema run: | - docker exec psch-clickhouse clickhouse-client --multiquery < docker/init/00-schema.sql + docker exec psch-clickhouse clickhouse-client -q "CREATE DATABASE IF NOT EXISTS pg_stat_ch" + "$HOME/go/bin/goose" -dir schema/migrations \ + clickhouse "tcp://localhost:19000?database=pg_stat_ch" up - name: Run TAP tests run: | From 57bf2078509e0d236ca999ddb50a5b93d933fb74 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Mon, 8 Jun 2026 18:22:01 -0400 Subject: [PATCH 4/9] schema: add read_replica_type column to events_raw MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-add the column being introduced on both sides of the Arrow pipeline to keep the unified events_raw schema wire-compatible once the cutover happens: * pg_stat_ch PR #107 — adds read_replica_type to the Arrow IPC output in arrow_batch.cc 
(dictionary-encoded, populated from the pg_stat_ch.extra_attributes GUC). * clickgres-platform PR #448 — adds the matching ALTER TABLE on query_logs_arrow (LowCardinality(String) DEFAULT 'none' AFTER server_role) and promotes read-replica traffic into query_logs via a widened MV filter. Mirroring the same type, default, and position here means the eventual cutover from query_logs_arrow to events_raw needs zero further schema changes — and lets PR #107 rebase onto unified_schema without having to amend the schema migration. Co-Authored-By: Claude Opus 4.7 (1M context) --- schema/migrations/20260519000001_create_initial_schema.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/schema/migrations/20260519000001_create_initial_schema.sql b/schema/migrations/20260519000001_create_initial_schema.sql index a56e7d7..f3b0504 100644 --- a/schema/migrations/20260519000001_create_initial_schema.sql +++ b/schema/migrations/20260519000001_create_initial_schema.sql @@ -198,6 +198,7 @@ CREATE TABLE pg_stat_ch.events_raw instance_ubid String DEFAULT '' COMMENT 'Ubicloud ID of the emitting Postgres instance. String (opaque, not LC) — cardinality scales with active customer count.', server_ubid String DEFAULT '' COMMENT 'Ubicloud ID of the underlying server. String (opaque).', server_role LowCardinality(String) DEFAULT '' COMMENT 'Server role within the HA pair. ~2 values: "primary", "standby".', + read_replica_type LowCardinality(String) DEFAULT 'none' COMMENT 'Read-replica classification. ~few values: "regional", "none". Lets downstream promote read-replica traffic that would otherwise report server_role=standby. Default ''none'' matches clickgres-platform query_logs_arrow.', region LowCardinality(String) DEFAULT '' COMMENT 'Cloud region (e.g. "us-east-1"). ~tens of values.', cell LowCardinality(String) DEFAULT '' COMMENT 'Cell (sharded deployment unit) within the region. 
~hundreds of values.', service_version LowCardinality(String) DEFAULT '' COMMENT 'pg_stat_ch extension version.', From db35cfc00f061a4528e216fe5c3ebe3804e6f80c Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Tue, 9 Jun 2026 18:51:35 -0400 Subject: [PATCH 5/9] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- t/034_query_intern_oom_export.pl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/034_query_intern_oom_export.pl b/t/034_query_intern_oom_export.pl index facf8d0..5c761e8 100644 --- a/t/034_query_intern_oom_export.pl +++ b/t/034_query_intern_oom_export.pl @@ -177,7 +177,7 @@ "SELECT count() FROM pg_stat_ch.events_raw " . "WHERE pid = $pid AND query_text != ''"); cmp_ok($intern_ok_rows, '>=', 1, - "ClickHouse has at least one row with non-empty query " . + "ClickHouse has at least one row with non-empty query_text " . "(intern was working before pool filled; got $intern_ok_rows)"); $node->stop(); From b758c9acb26651bc22a6d12ad5cafd7bf6914bf8 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Tue, 9 Jun 2026 18:51:42 -0400 Subject: [PATCH 6/9] ci(tap): resolve goose via $(go env GOPATH)/bin instead of hardcoding $HOME/go/bin Per Copilot review on #99: $HOME/go/bin assumes GOPATH=$HOME/go, which is the Go default but not guaranteed (CI runners can configure GOPATH elsewhere, and GOBIN can override the install location entirely). Resolve the actual path via `go env GOPATH` and add it to $GITHUB_PATH so subsequent steps can just call `goose` without a path prefix. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci-tap.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-tap.yml b/.github/workflows/ci-tap.yml index 47f2d0b..ac0d626 100644 --- a/.github/workflows/ci-tap.yml +++ b/.github/workflows/ci-tap.yml @@ -102,12 +102,14 @@ jobs: exit 1 - name: Install goose - run: go install github.com/pressly/goose/v3/cmd/goose@v3.27.1 + run: | + go install github.com/pressly/goose/v3/cmd/goose@v3.27.1 + echo "$(go env GOPATH)/bin" >> "$GITHUB_PATH" - name: Initialize ClickHouse schema run: | docker exec psch-clickhouse clickhouse-client -q "CREATE DATABASE IF NOT EXISTS pg_stat_ch" - "$HOME/go/bin/goose" -dir schema/migrations \ + goose -dir schema/migrations \ clickhouse "tcp://localhost:19000?database=pg_stat_ch" up - name: Run TAP tests From 12ab775f4e003865f37516975bad93b1bc6edd3b Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Tue, 9 Jun 2026 18:59:51 -0400 Subject: [PATCH 7/9] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- src/export/stats_exporter.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index 25de31d..aedd100 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -335,8 +335,8 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte col_parallel_workers_planned->Append(ev.parallel_workers_planned); col_parallel_workers_launched->Append(ev.parallel_workers_launched); - col_err_sqlstate->Append( - std::string(ev.err_sqlstate, strnlen(ev.err_sqlstate, sizeof(ev.err_sqlstate)))); + col_err_sqlstate->Append(std::string( + ev.err_sqlstate, strnlen(ev.err_sqlstate, sizeof(ev.err_sqlstate) - 1))); col_err_elevel->Append(ev.err_elevel); auto elen = ClampFieldLen(ev.err_message_len, static_cast(PSCH_MAX_ERR_MSG_LEN), "err_message_len"); From 
b8282dc6bf64a3a0433e60aabe0d2267e5906b75 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Tue, 9 Jun 2026 19:01:20 -0400 Subject: [PATCH 8/9] chore: rename leftover cmd_type/db/username/query identifiers Per Copilot review on #99: identifiers carrying the old column names through Perl helpers and C++ locals were misleading after the column renames in commit 1. Updating them so failure messages and stack traces reflect what the code now actually does. t/021_cmd_type_counts.pl: sub get_cmd_type_counts -> sub get_db_operation_counts (+ 4 call sites) node name 'cmd_type_counts' -> 'db_operation_counts' src/export/stats_exporter.cc: col_db -> col_db_name col_username -> col_db_user col_cmd_type -> col_db_operation col_query -> col_query_text File name t/021_cmd_type_counts.pl is left as-is (per earlier decision - file rename is more churn than it's worth pre-GA; the TAP summary report will read fine). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/export/stats_exporter.cc | 16 ++++++++-------- t/021_cmd_type_counts.pl | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index aedd100..5811a8e 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -235,13 +235,13 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte auto col_ts = exporter->RecordDateTime("ts"); auto col_duration_us = exporter->DbDurationColumn(); - auto col_db = exporter->DbNameColumn(); - auto col_username = exporter->DbUserColumn(); + auto col_db_name = exporter->DbNameColumn(); + auto col_db_user = exporter->DbUserColumn(); auto col_pid = exporter->RecordInt32("pid"); auto col_query_id = exporter->RecordInt64("query_id"); - auto col_cmd_type = exporter->DbOperationColumn(); + auto col_db_operation = exporter->DbOperationColumn(); auto col_rows = exporter->MetricUInt64("rows"); - auto col_query = exporter->DbQueryTextColumn(); + auto col_query_text = 
exporter->DbQueryTextColumn(); auto col_shared_blks_hit = exporter->MetricInt64("shared_blks_hit"); auto col_shared_blks_read = exporter->MetricInt64("shared_blks_read"); @@ -290,15 +290,15 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte col_ts->Append(ev.ts_start + kPostgresEpochOffsetUs); col_duration_us->Append(ev.duration_us); - col_db->Append(std::string(ev.datname, ev.datname_len)); - col_username->Append(std::string(ev.username, ev.username_len)); + col_db_name->Append(std::string(ev.datname, ev.datname_len)); + col_db_user->Append(std::string(ev.username, ev.username_len)); col_pid->Append(ev.pid); col_query_id->Append(static_cast(ev.queryid)); - col_cmd_type->Append(PschCmdTypeToString(ev.cmd_type)); + col_db_operation->Append(PschCmdTypeToString(ev.cmd_type)); col_rows->Append(ev.rows); auto qlen = ClampFieldLen(ev.query_len, static_cast(PSCH_MAX_QUERY_LEN), "query_len"); - col_query->Append(std::string(ev.query, qlen)); + col_query_text->Append(std::string(ev.query, qlen)); col_shared_blks_hit->Append(ev.shared_blks_hit); col_shared_blks_read->Append(ev.shared_blks_read); diff --git a/t/021_cmd_type_counts.pl b/t/021_cmd_type_counts.pl index cccf9ac..0ffc6e9 100755 --- a/t/021_cmd_type_counts.pl +++ b/t/021_cmd_type_counts.pl @@ -23,13 +23,13 @@ plan skip_all => 'ClickHouse container not running'; } -my $node = psch_init_node_with_clickhouse('cmd_type_counts', +my $node = psch_init_node_with_clickhouse('db_operation_counts', flush_interval_ms => 100, batch_max => 100 ); # Helper: parse db_operation counts from ClickHouse -sub get_cmd_type_counts { +sub get_db_operation_counts { my $result = psch_query_clickhouse( "SELECT db_operation, count() FROM pg_stat_ch.events_raw GROUP BY db_operation FORMAT TabSeparated" ); @@ -72,7 +72,7 @@ sub get_cmd_type_counts { $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); sleep(2); # Wait for export without polling (polling adds SELECTs) - my %counts = get_cmd_type_counts(); + my 
%counts = get_db_operation_counts(); # Verify exact counts (flush adds 1 SELECT, so expect 4) is($counts{SELECT} // 0, 4, 'SELECT count = 4 (3 + flush)'); @@ -109,7 +109,7 @@ sub get_cmd_type_counts { $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); sleep(2); # Wait for export without polling - my %counts = get_cmd_type_counts(); + my %counts = get_db_operation_counts(); is($counts{UTILITY} // 0, 8, 'UTILITY count = 8 DDL statements'); is($counts{SELECT} // 0, 1, 'SELECT count = 1 (flush only)'); @@ -156,7 +156,7 @@ sub get_cmd_type_counts { $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); sleep(2); # Wait for export without polling - my %counts = get_cmd_type_counts(); + my %counts = get_db_operation_counts(); # Verify exact counts # SELECT: 2 queries + 1 flush = 3 @@ -206,7 +206,7 @@ sub get_cmd_type_counts { $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); sleep(2); # Wait for export without polling - my %counts = get_cmd_type_counts(); + my %counts = get_db_operation_counts(); # All DDL should be UTILITY - exactly 11 is($counts{UTILITY} // 0, 11, From 5934589d260fba7cfdd0035d1af55288ae7e6536 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Tue, 9 Jun 2026 19:05:21 -0400 Subject: [PATCH 9/9] chore: restore clang-format-clean wrap on col_err_sqlstate Append Copilot's autofix in 12ab775 shifted the line break from before std::string( to after the opening paren while changing strnlen(..., sizeof(...) - 1). The new wrap position trips clang-format on the column-limit rule. Move the break back to where it was originally (before std::string), keep the sizeof - 1 fix. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/export/stats_exporter.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index 5811a8e..2a2899d 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -335,8 +335,8 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte col_parallel_workers_planned->Append(ev.parallel_workers_planned); col_parallel_workers_launched->Append(ev.parallel_workers_launched); - col_err_sqlstate->Append(std::string( - ev.err_sqlstate, strnlen(ev.err_sqlstate, sizeof(ev.err_sqlstate) - 1))); + col_err_sqlstate->Append( + std::string(ev.err_sqlstate, strnlen(ev.err_sqlstate, sizeof(ev.err_sqlstate) - 1))); col_err_elevel->Append(ev.err_elevel); auto elen = ClampFieldLen(ev.err_message_len, static_cast(PSCH_MAX_ERR_MSG_LEN), "err_message_len");