From 6a7062ce290cf491f19832c31d277545c1f8699e Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 13 May 2026 11:56:12 -0400 Subject: [PATCH 01/10] feat: emit parent_query_id to link nested SPI queries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each event now carries the queryid of its calling query (0 for top-level). Lets aggregations filter to top-level work with WHERE parent_query_id = 0, avoiding the double-counting that happens when plpgsql functions issue SPI statements that themselves emit events. The Event payload's prior `bool top_level` is replaced in-place by `uint64 parent_query_id`; static asserts in ring_entry.h continue to verify layout equivalence with the on-wire ring slot. Single statics for rusage_start, query_start_ts, and current_query_is_top_level are consolidated into a fixed-size query_stack[16] indexed by nesting_level. The previous single rusage_start was clobbered on nested SPI getrusage() calls, so nested CPU deltas were measuring just the inner portion and the outer's CPU was wrong; per-frame baselines fix that alongside parent linkage. The PG_TRY/nesting_level pattern in ExecutorRun/Finish/ProcessUtility is untouched. Schema gains a parent_query_id UInt64 column; migrations/001_add_parent_query_id.sql covers existing deployments. This supersedes #61 (which mixed the feature with broader refactor churn — vector→array→array+counter iterations, helper extraction, depth-cap retunes). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- docker/init/00-schema.sql | 2 + migrations/001_add_parent_query_id.sql | 12 +++ src/export/stats_exporter.cc | 2 + src/hooks/hooks.c | 103 +++++++++++++++++-------- src/queue/event.h | 6 +- src/queue/ring_entry.h | 2 +- 6 files changed, 90 insertions(+), 37 deletions(-) create mode 100644 migrations/001_add_parent_query_id.sql diff --git a/docker/init/00-schema.sql b/docker/init/00-schema.sql index 4b15ef2..c353915 100644 --- a/docker/init/00-schema.sql +++ b/docker/init/00-schema.sql @@ -53,6 +53,8 @@ CREATE TABLE pg_stat_ch.events_raw query_id Int64 COMMENT '64-bit hash identifying normalized queries. Queries differing only in constants share the same query_id. Use for aggregating statistics across similar queries.', + parent_query_id UInt64 COMMENT 'query_id of the calling query (e.g. the plpgsql function that issued this SPI statement). 0 for top-level queries. Use WHERE parent_query_id = 0 to restrict aggregations to top-level queries and avoid double-counting CPU and duration.', + cmd_type LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. Use for workload characterization (read-heavy vs write-heavy).', rows UInt64 COMMENT 'Rows returned (SELECT) or affected (INSERT/UPDATE/DELETE). HIGH: large result sets or bulk operations. LOW: point queries. Watch for unexpected HIGH values indicating missing WHERE clauses.', diff --git a/migrations/001_add_parent_query_id.sql b/migrations/001_add_parent_query_id.sql new file mode 100644 index 0000000..3b49bf7 --- /dev/null +++ b/migrations/001_add_parent_query_id.sql @@ -0,0 +1,12 @@ +-- Migration: add parent_query_id column +-- +-- Introduced in pg_stat_ch 0.4.x. Each event now carries the query_id of its +-- calling query (e.g. the plpgsql function that issued an SPI statement). +-- Top-level queries emit 0. 
Use WHERE parent_query_id = 0 in aggregations to +-- avoid double-counting CPU and duration across nested calls. +-- +-- Run against your ClickHouse instance before upgrading the extension: +-- clickhouse-client < migrations/001_add_parent_query_id.sql + +ALTER TABLE pg_stat_ch.events_raw + ADD COLUMN IF NOT EXISTS parent_query_id UInt64 DEFAULT 0; diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index ee19661..b2d2270 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -241,6 +241,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte auto col_username = exporter->DbUserColumn(); auto col_pid = exporter->RecordInt32("pid"); auto col_query_id = exporter->RecordInt64("query_id"); + auto col_parent_query_id = exporter->RecordUInt64("parent_query_id"); auto col_cmd_type = exporter->DbOperationColumn(); auto col_rows = exporter->MetricUInt64("rows"); auto col_query = exporter->DbQueryTextColumn(); @@ -296,6 +297,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte col_username->Append(std::string(ev.username, ev.username_len)); col_pid->Append(ev.pid); col_query_id->Append(static_cast(ev.queryid)); + col_parent_query_id->Append(ev.parent_query_id); col_cmd_type->Append(PschCmdTypeToString(ev.cmd_type)); col_rows->Append(ev.rows); diff --git a/src/hooks/hooks.c b/src/hooks/hooks.c index 6d99d3a..8ff75e8 100644 --- a/src/hooks/hooks.c +++ b/src/hooks/hooks.c @@ -51,18 +51,30 @@ static emit_log_hook_type prev_emit_log_hook = NULL; // Track nesting level to identify top-level queries static int nesting_level = 0; -// CPU time tracking via getrusage -static struct rusage rusage_start; +// Per-query frame indexed by nesting_level: each query gets its own rusage +// baseline and start timestamp, and its queryid is stashed so a nested SPI +// call can read its parent. 
Replaces three single statics whose lifetimes +// collapsed under SPI: the outer's rusage baseline was overwritten by the +// inner's getrusage(), so the outer's CPU delta ended up measuring just the +// nested portion. +// +// PSCH_MAX_NESTING_DEPTH is a small fixed cap. Frames at greater depth are +// not written; the corresponding emit at that depth falls back to zero CPU +// and parent_query_id 0. PL/pgSQL recursion is capped well below 16 by +// PostgreSQL itself (max_stack_depth typically yields a much shallower +// effective limit), so the cap is defensive against runaway recursive +// functions rather than a normal-case constraint. +#define PSCH_MAX_NESTING_DEPTH 16 +typedef struct PschQueryFrame { + uint64 queryid; + struct rusage rusage_start; + TimestampTz query_start_ts; +} PschQueryFrame; +static PschQueryFrame query_stack[PSCH_MAX_NESTING_DEPTH]; // Deadlock prevention for emit_log_hook static bool disable_error_capture = false; -// Track whether the current query started at top level -static bool current_query_is_top_level = false; - -// Track query start time for duration calculation -static TimestampTz query_start_ts = 0; - // System initialization flag - set after hooks are installed and shmem is ready static bool system_init = false; @@ -311,18 +323,28 @@ static void InitEventPartial(PschEvent* event) { event->query[0] = '\0'; } -static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, bool top_level, +static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, uint64 parent_query_id, PschCmdType cmd_type) { InitEventPartial(event); event->ts_start = ts_start; event->dbid = MyDatabaseId; event->userid = GetUserId(); event->pid = MyProcPid; - event->top_level = top_level; + event->parent_query_id = parent_query_id; event->cmd_type = cmd_type; ResolveNames(event); } +// Return the queryid of the query at depth `nesting_level - 1` — i.e., the +// caller of whatever is currently emitting. 
0 at top level or when depth +// exceeded PSCH_MAX_NESTING_DEPTH (no frame was written for that slot). +static uint64 GetParentQueryId(void) { + if (nesting_level <= 0 || nesting_level > PSCH_MAX_NESTING_DEPTH) { + return 0; + } + return query_stack[nesting_level - 1].queryid; +} + static void CopyClientContext(PschEvent* event) { event->application_name_len = (uint8)( GetApplicationName(event->application_name, sizeof(event->application_name))); @@ -412,10 +434,9 @@ static void CopyParallelWorkerInfo(PschEvent* event pg_attribute_unused(), #endif } -static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int64 cpu_user_us, - int64 cpu_sys_us) { - InitBaseEvent(event, query_start_ts, current_query_is_top_level, - ConvertCmdType(query_desc->operation)); +static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, TimestampTz start_ts, + uint64 parent_query_id, int64 cpu_user_us, int64 cpu_sys_us) { + InitBaseEvent(event, start_ts, parent_query_id, ConvertCmdType(query_desc->operation)); event->queryid = query_desc->plannedstmt->queryId; event->rows = query_desc->estate->es_processed; event->cpu_user_time_us = cpu_user_us; @@ -432,7 +453,7 @@ static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int CopyIoTiming(event, &query_desc->totaltime->bufusage); CopyWalUsage(event, &query_desc->totaltime->walusage); } else { - event->duration_us = (uint64)(GetCurrentTimestamp() - query_start_ts); + event->duration_us = (uint64)(GetCurrentTimestamp() - start_ts); } CopyJitInstrumentation(event, query_desc); @@ -498,16 +519,17 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { return; } - // Record if this is a top-level query (before nesting_level changes in Run) - if (nesting_level == 0) { - current_query_is_top_level = true; - query_start_ts = GetCurrentTimestamp(); - // Capture CPU time baseline for top-level queries + // Push our frame at the current depth, before ExecutorRun bumps 
nesting_level. + // Each frame holds its own rusage baseline so nested CPU deltas don't ride on + // the outer query's baseline. The queryid is what nested queries will read + // back as their parent_query_id. + if (nesting_level < PSCH_MAX_NESTING_DEPTH) { + PschQueryFrame* frame = &query_stack[nesting_level]; + frame->queryid = query_desc->plannedstmt->queryId; + frame->query_start_ts = GetCurrentTimestamp(); if (psch_enabled) { - getrusage(RUSAGE_SELF, &rusage_start); + getrusage(RUSAGE_SELF, &frame->rusage_start); } - } else { - current_query_is_top_level = false; } if (prev_executor_start != NULL) { @@ -612,6 +634,12 @@ static void PschExecutorEnd(QueryDesc* query_desc) { InstrEndLoop(query_desc->totaltime); } + // Pull our frame back out — same slot ExecutorStart wrote. Null only if + // depth exceeded the cap; in that case start_ts and CPU delta are zero. + const PschQueryFrame* frame = + (nesting_level < PSCH_MAX_NESTING_DEPTH) ? &query_stack[nesting_level] : NULL; + TimestampTz start_ts = frame ? 
frame->query_start_ts : 0; + // Compute duration early for sampling filter uint64 duration_us; if (query_desc->totaltime != NULL) { @@ -621,7 +649,7 @@ static void PschExecutorEnd(QueryDesc* query_desc) { duration_us = (uint64)(query_desc->totaltime->total * 1000000.0); #endif } else { - duration_us = (uint64)(GetCurrentTimestamp() - query_start_ts); + duration_us = (uint64)(GetCurrentTimestamp() - start_ts); } if (!ShouldSampleEvent(duration_us)) { @@ -637,13 +665,14 @@ static void PschExecutorEnd(QueryDesc* query_desc) { int64 cpu_user_us = 0; int64 cpu_sys_us = 0; struct rusage rusage_end; - if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, rusage_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, rusage_start.ru_stime); + if (frame != NULL && getrusage(RUSAGE_SELF, &rusage_end) == 0) { + cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame->rusage_start.ru_utime); + cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime); } PschEvent event; - BuildEventFromQueryDesc(query_desc, &event, cpu_user_us, cpu_sys_us); + BuildEventFromQueryDesc(query_desc, &event, start_ts, GetParentQueryId(), cpu_user_us, + cpu_sys_us); PschEnqueueEvent(&event); if (prev_executor_end != NULL) { @@ -655,10 +684,10 @@ static void PschExecutorEnd(QueryDesc* query_desc) { // Build a PschEvent for utility statements (no QueryDesc available) static void BuildEventForUtility(PschEvent* event, uint64 query_id, TimestampTz start_ts, - uint64 duration_us, bool is_top_level, uint64 rows, + uint64 duration_us, uint64 parent_query_id, uint64 rows, BufferUsage* bufusage, WalUsage* walusage, int64 cpu_user_us, int64 cpu_sys_us) { - InitBaseEvent(event, start_ts, is_top_level, PSCH_CMD_UTILITY); + InitBaseEvent(event, start_ts, parent_query_id, PSCH_CMD_UTILITY); event->queryid = query_id; event->duration_us = duration_us; event->rows = rows; @@ -760,8 +789,16 @@ static void 
PschProcessUtility(PlannedStmt* pstmt, const char* queryString, return; } + // Capture parent before pushing our own frame. + uint64 parent_query_id = GetParentQueryId(); + + // Push our frame so any executor hooks fired from within this utility (e.g. + // the SELECT inside CREATE TABLE AS) read us as their parent_query_id. + if (nesting_level < PSCH_MAX_NESTING_DEPTH) { + query_stack[nesting_level].queryid = pstmt->queryId; + } + // Capture state before execution - bool is_top_level = (nesting_level == 0); TimestampTz start_ts = GetCurrentTimestamp(); uint64 query_id = pstmt->queryId; BufferUsage bufusage_start = pgBufferUsage; @@ -802,7 +839,7 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, } PschEvent event; - BuildEventForUtility(&event, query_id, start_ts, duration_us, is_top_level, + BuildEventForUtility(&event, query_id, start_ts, duration_us, parent_query_id, GetUtilityRowCount(qc), &bufusage_delta, &walusage_delta, cpu_user_us, cpu_sys_us); PschEnqueueEvent(&event); @@ -850,7 +887,7 @@ static bool ShouldCaptureLog(ErrorData* edata) { // message, SQLSTATE, and client/session metadata. static void CaptureLogEvent(ErrorData* edata) { PschEvent event; - InitBaseEvent(&event, GetCurrentTimestamp(), (nesting_level == 0), PSCH_CMD_UNKNOWN); + InitBaseEvent(&event, GetCurrentTimestamp(), GetParentQueryId(), PSCH_CMD_UNKNOWN); UnpackSqlState(edata->sqlerrcode, event.err_sqlstate); event.err_elevel = (uint8)(edata->elevel); diff --git a/src/queue/event.h b/src/queue/event.h index a617915..e3181a5 100644 --- a/src/queue/event.h +++ b/src/queue/event.h @@ -103,9 +103,9 @@ typedef struct PschEvent { char username[64]; // User name (NAMEDATALEN=64, resolved at capture) uint8 username_len; // Actual length int32 pid; // Backend process ID - uint64 queryid; // Query ID (from pg_stat_statements) - bool top_level; // True if this is a top-level query - PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) 
+ uint64 queryid; // Query ID (from pg_stat_statements) + uint64 parent_query_id; // queryid of the calling query (0 if top-level) + PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) // Results uint64 rows; // Number of rows affected/returned diff --git a/src/queue/ring_entry.h b/src/queue/ring_entry.h index faf1c4c..c3ce6dd 100644 --- a/src/queue/ring_entry.h +++ b/src/queue/ring_entry.h @@ -39,7 +39,7 @@ typedef struct PschRingEntry { uint8 username_len; int32 pid; uint64 queryid; - bool top_level; + uint64 parent_query_id; PschCmdType cmd_type; // === Results === From 16510f4dcab9652281b34c6d9811243b7d08f6f1 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 13 May 2026 12:47:31 -0400 Subject: [PATCH 02/10] style: re-align identity-block trailing comments clang-format wants the column-aligned trailing comments above parent_query_id to share its (wider) alignment column. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/queue/event.h | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/queue/event.h b/src/queue/event.h index e3181a5..830fb9f 100644 --- a/src/queue/event.h +++ b/src/queue/event.h @@ -96,13 +96,13 @@ typedef struct PschEvent { uint64 duration_us; // Execution duration in microseconds // Identity - Oid dbid; // Database OID - Oid userid; // User OID - char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) - uint8 datname_len; // Actual length - char username[64]; // User name (NAMEDATALEN=64, resolved at capture) - uint8 username_len; // Actual length - int32 pid; // Backend process ID + Oid dbid; // Database OID + Oid userid; // User OID + char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) + uint8 datname_len; // Actual length + char username[64]; // User name (NAMEDATALEN=64, resolved at capture) + uint8 username_len; // Actual length + int32 pid; // Backend process ID uint64 queryid; // Query ID (from pg_stat_statements) uint64 parent_query_id; // queryid of 
the calling query (0 if top-level) PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) From b9f4e7de94916b9990dd1c065a792f90fd44f428 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 13 May 2026 12:53:55 -0400 Subject: [PATCH 03/10] fix: use Int64 for parent_query_id, guard NULL-frame skew Copilot review on #95: 1. parent_query_id was UInt64 while query_id is Int64; comparisons/joins between the two would require explicit casts in ClickHouse. Switch the ClickHouse column, migration, and exporter to Int64. Keep the in-memory struct field as uint64 (matches PG's queryId type) and cast at append time the same way query_id already does. 2. At nesting_level >= PSCH_MAX_NESTING_DEPTH, the PschExecutorEnd frame pointer is NULL, leaving start_ts at 0 (the PG epoch). Any path that computed GetCurrentTimestamp() - start_ts would yield ~25 years' worth of microseconds. Fall back to GetCurrentTimestamp() so deltas are ~0 instead. In practice query_desc->totaltime supplies a real duration via instrumentation, so the fallback subtraction is only used when totaltime wasn't allocated. Co-Authored-By: Claude Opus 4.7 (1M context) --- docker/init/00-schema.sql | 2 +- migrations/001_add_parent_query_id.sql | 2 +- src/export/stats_exporter.cc | 4 ++-- src/hooks/hooks.c | 8 ++++++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/docker/init/00-schema.sql b/docker/init/00-schema.sql index c353915..542a5f2 100644 --- a/docker/init/00-schema.sql +++ b/docker/init/00-schema.sql @@ -53,7 +53,7 @@ CREATE TABLE pg_stat_ch.events_raw query_id Int64 COMMENT '64-bit hash identifying normalized queries. Queries differing only in constants share the same query_id. Use for aggregating statistics across similar queries.', - parent_query_id UInt64 COMMENT 'query_id of the calling query (e.g. the plpgsql function that issued this SPI statement). 0 for top-level queries. 
Use WHERE parent_query_id = 0 to restrict aggregations to top-level queries and avoid double-counting CPU and duration.', + parent_query_id Int64 COMMENT 'query_id of the calling query (e.g. the plpgsql function that issued this SPI statement). 0 for top-level queries. Use WHERE parent_query_id = 0 to restrict aggregations to top-level queries and avoid double-counting CPU and duration. Signedness matches query_id so the two columns compare/join without explicit casts.', cmd_type LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. Use for workload characterization (read-heavy vs write-heavy).', diff --git a/migrations/001_add_parent_query_id.sql b/migrations/001_add_parent_query_id.sql index 3b49bf7..6ebb46e 100644 --- a/migrations/001_add_parent_query_id.sql +++ b/migrations/001_add_parent_query_id.sql @@ -9,4 +9,4 @@ -- clickhouse-client < migrations/001_add_parent_query_id.sql ALTER TABLE pg_stat_ch.events_raw - ADD COLUMN IF NOT EXISTS parent_query_id UInt64 DEFAULT 0; + ADD COLUMN IF NOT EXISTS parent_query_id Int64 DEFAULT 0; diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index b2d2270..9d99a3a 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -241,7 +241,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte auto col_username = exporter->DbUserColumn(); auto col_pid = exporter->RecordInt32("pid"); auto col_query_id = exporter->RecordInt64("query_id"); - auto col_parent_query_id = exporter->RecordUInt64("parent_query_id"); + auto col_parent_query_id = exporter->RecordInt64("parent_query_id"); auto col_cmd_type = exporter->DbOperationColumn(); auto col_rows = exporter->MetricUInt64("rows"); auto col_query = exporter->DbQueryTextColumn(); @@ -297,7 +297,7 @@ void ExportEventStatsInternal(const std::vector& events, StatsExporte col_username->Append(std::string(ev.username, ev.username_len)); col_pid->Append(ev.pid); 
col_query_id->Append(static_cast(ev.queryid)); - col_parent_query_id->Append(ev.parent_query_id); + col_parent_query_id->Append(static_cast(ev.parent_query_id)); col_cmd_type->Append(PschCmdTypeToString(ev.cmd_type)); col_rows->Append(ev.rows); diff --git a/src/hooks/hooks.c b/src/hooks/hooks.c index 8ff75e8..929190f 100644 --- a/src/hooks/hooks.c +++ b/src/hooks/hooks.c @@ -635,10 +635,14 @@ static void PschExecutorEnd(QueryDesc* query_desc) { } // Pull our frame back out — same slot ExecutorStart wrote. Null only if - // depth exceeded the cap; in that case start_ts and CPU delta are zero. + // depth exceeded the cap; in that case CPU delta stays zero and start_ts + // falls back to "now" so any `GetCurrentTimestamp() - start_ts` path yields + // ~0us rather than ~25 years of µs from subtracting 0 (the PG epoch). In + // practice query_desc->totaltime supplies a real duration from instrumentation, + // so the fallback subtraction is only used when totaltime wasn't allocated. const PschQueryFrame* frame = (nesting_level < PSCH_MAX_NESTING_DEPTH) ? &query_stack[nesting_level] : NULL; - TimestampTz start_ts = frame ? frame->query_start_ts : 0; + TimestampTz start_ts = frame ? frame->query_start_ts : GetCurrentTimestamp(); // Compute duration early for sampling filter uint64 duration_us; From 98ca62ff5a4c0036d6dabef464b60833688b0765 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 13 May 2026 14:11:59 -0400 Subject: [PATCH 04/10] test: TAP test for parent_query_id linkage and log-event off-by-one MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 028_parent_query_id.pl exercises the three semantic invariants by inspecting actual exported rows in ClickHouse: 1. Top-level queries report parent_query_id = 0. 2. Nested SPI queries report parent_query_id = outer caller's query_id (verified by self-join on the events_raw table). 3. 
A log/error event captured during nested execution reports parent_query_id = outer caller (not the running query itself) and a non-zero query_id (the running statement). This catches the CaptureLogEvent off-by-one: nesting_level is bumped inside ExecutorRun, so slot nesting_level - 1 holds the running query and its caller lives at nesting_level - 2. Filters key off distinctive table/function names rather than constants or comments — those survive query normalization, where literals would be replaced with $N placeholders. Co-Authored-By: Claude Opus 4.7 (1M context) --- t/028_parent_query_id.pl | 198 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100644 t/028_parent_query_id.pl diff --git a/t/028_parent_query_id.pl b/t/028_parent_query_id.pl new file mode 100644 index 0000000..80c8ff4 --- /dev/null +++ b/t/028_parent_query_id.pl @@ -0,0 +1,198 @@ +#!/usr/bin/env perl +# Parent query id linkage: +# - top-level queries report parent_query_id = 0 +# - nested SPI queries report parent_query_id = outer's query_id +# - log/error events report queryid = the running query and parent_query_id +# = its caller (catches the CaptureLogEvent off-by-one where reading slot +# nesting_level - 1 returns the *running* query, not its parent) +# +# Filters key off distinctive table/function names — these survive query +# normalization, where string/numeric literals get replaced with $N +# placeholders and would not. + +use strict; +use warnings; +use lib 't'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use psch; + +if (!psch_clickhouse_available()) { + plan skip_all => 'Docker not available, skipping ClickHouse tests'; +} + +my $ch_check = `curl -s 'http://localhost:18123/' --data 'SELECT 1' 2>/dev/null`; +if ($ch_check !~ /^1/) { + plan skip_all => 'ClickHouse container not running. 
Start with: docker compose -f docker/docker-compose.test.yml up -d'; +} + +psch_query_clickhouse("TRUNCATE TABLE IF EXISTS pg_stat_ch.events_raw"); + +my $node = psch_init_node_with_clickhouse('parent_qid', + flush_interval_ms => 100, + batch_max => 100, +); + +# A dedicated table whose name appears verbatim in normalized query text. +$node->safe_psql('postgres', 'CREATE TABLE pqid_top_marker(x int)'); + +# Test 1: Top-level queries report parent_query_id = 0 +subtest 'top-level parent_query_id is 0' => sub { + psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); + psch_reset_stats($node); + + $node->safe_psql('postgres', 'SELECT * FROM pqid_top_marker'); + $node->safe_psql('postgres', 'SELECT count(*) FROM pqid_top_marker'); + $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); + + my $matches = psch_wait_for_clickhouse_query( + "SELECT count() FROM pg_stat_ch.events_raw WHERE query LIKE '%pqid_top_marker%'", + sub { $_[0] >= 2 }, + 10, + ); + cmp_ok($matches, '>=', 2, 'top-level marker rows landed'); + + my $nonzero_parents = psch_query_clickhouse( + "SELECT count() FROM pg_stat_ch.events_raw " + . 
"WHERE query LIKE '%pqid_top_marker%' AND parent_query_id != 0" + ); + is($nonzero_parents, '0', 'top-level queries report parent_query_id = 0'); +}; + +# Test 2: Nested SPI queries report parent_query_id matching the outer's queryid +subtest 'nested SPI parent_query_id links to outer' => sub { + psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); + psch_reset_stats($node); + + $node->safe_psql('postgres', q{ + CREATE TABLE pqid_inner_marker(x int); + INSERT INTO pqid_inner_marker VALUES (1); + CREATE FUNCTION pqid_outer_caller() RETURNS int + LANGUAGE plpgsql AS $$ + DECLARE v int; + BEGIN + SELECT x INTO v FROM pqid_inner_marker; + RETURN v; + END$$; + }); + + psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); + psch_reset_stats($node); + + $node->safe_psql('postgres', 'SELECT pqid_outer_caller()'); + $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); + + # Wait for both rows. + psch_wait_for_clickhouse_query( + "SELECT count() FROM pg_stat_ch.events_raw " + . "WHERE query LIKE '%pqid_outer_caller%' OR query LIKE '%pqid_inner_marker%'", + sub { $_[0] >= 2 }, + 10, + ); + + # Self-join: inner row's parent_query_id must equal outer row's query_id. + my $linked = psch_query_clickhouse(q{ + SELECT count() FROM pg_stat_ch.events_raw inner_q + JOIN pg_stat_ch.events_raw outer_q + ON inner_q.parent_query_id = outer_q.query_id + WHERE inner_q.query LIKE '%pqid_inner_marker%' + AND outer_q.query LIKE '%pqid_outer_caller%' + }); + cmp_ok($linked, '>=', 1, 'nested SPI parent_query_id matches outer query_id'); + + my $orphan = psch_query_clickhouse( + "SELECT count() FROM pg_stat_ch.events_raw " + . "WHERE query LIKE '%pqid_inner_marker%' AND parent_query_id = 0" + ); + is($orphan, '0', 'nested SPI query is not reported as top-level'); + + my $outer_self = psch_query_clickhouse( + "SELECT count() FROM pg_stat_ch.events_raw " + . 
"WHERE query LIKE '%pqid_outer_caller%' AND parent_query_id != 0" + ); + is($outer_self, '0', 'outer call still reports parent_query_id = 0'); +}; + +# Test 3: Error event captured inside a nested SPI query. +# queryid = the running (nested) query's id +# parent_query_id = the outer caller's id +# Catches the CaptureLogEvent off-by-one: reading slot nesting_level - 1 +# returns the running query (itself), not its caller. +subtest 'error inside nested SPI links queryid -> outer' => sub { + psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); + psch_reset_stats($node); + + $node->safe_psql('postgres', q{ + CREATE TABLE pqid_err_inner(x int); + INSERT INTO pqid_err_inner VALUES (0); + CREATE FUNCTION pqid_err_outer() RETURNS int + LANGUAGE plpgsql AS $$ + DECLARE v int; + BEGIN + BEGIN + SELECT 1 / x INTO v FROM pqid_err_inner; + EXCEPTION WHEN division_by_zero THEN + NULL; + END; + RETURN 1; + END$$; + }); + + psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); + psch_reset_stats($node); + + $node->safe_psql('postgres', 'SELECT pqid_err_outer()'); + $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); + + # The error-aborts longjmp means the inner SELECT never reaches its + # ExecutorEnd, so we won't find a query-text row for it. The error event + # itself (cmd_type=UNKNOWN, err_sqlstate=22012 for division_by_zero) is + # what carries the linkage signal. + psch_wait_for_clickhouse_query( + "SELECT count() FROM pg_stat_ch.events_raw " + . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '22012'", + sub { $_[0] >= 1 }, + 10, + ); + + # Before the off-by-one fix, parent_query_id of a log event captured during + # nested execution was the running query's *own* id (slot nesting_level - 1), + # which would not match the outer caller's query_id. After the fix it + # correctly points to the outer (slot nesting_level - 2). 
+ my $err_to_outer = psch_query_clickhouse(q{ + SELECT count() FROM pg_stat_ch.events_raw err_q + JOIN pg_stat_ch.events_raw outer_q + ON err_q.parent_query_id = outer_q.query_id + WHERE err_q.cmd_type = 'UNKNOWN' + AND err_q.err_sqlstate = '22012' + AND outer_q.query LIKE '%pqid_err_outer%' + }); + cmp_ok($err_to_outer, '>=', 1, + 'div-by-zero log event: parent_query_id = outer caller'); + + # Before the fix, queryid was always 0 on log events. After the fix it is + # the currently-running (nested) statement's id, which we don't have a + # textual handle on, but it must at least be non-zero. + my $err_qid_nonzero = psch_query_clickhouse( + "SELECT count() FROM pg_stat_ch.events_raw " + . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '22012' AND query_id != 0" + ); + cmp_ok($err_qid_nonzero, '>=', 1, + 'div-by-zero log event: query_id is the running statement (non-zero)'); + + # And those two must NOT be equal — running query is the child, parent + # is its caller. + my $running_vs_parent = psch_query_clickhouse( + "SELECT count() FROM pg_stat_ch.events_raw " + . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '22012' " + . " AND query_id = parent_query_id AND query_id != 0" + ); + is($running_vs_parent, '0', + 'log event query_id and parent_query_id are distinct (no self-parent)'); +}; + +$node->stop(); +done_testing(); From b82cc74bb294c444e3f003c8410133dbc49114fa Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 13 May 2026 16:02:50 -0400 Subject: [PATCH 05/10] fix: bind nesting_level to frame push/pop; store parent in frame MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CaptureLogEvent off-by-one came from nesting_level moving in places where the frame stack didn't — Run/Finish bumped/decremented it around the body, while the frame slot was written in Start and only read in End. 
Inside the body (where CaptureLogEvent fires) nesting_level was one higher than it was at End, so the same "slot[nesting_level - 1]" expression meant different things in each place. Bind the two together: nesting_level moves exactly where a frame is pushed or popped, and parent_query_id is captured into the frame at push time. Now slot[nesting_level - 1] is the currently active query at every emit site — End, ProcessUtility, CaptureLogEvent — and parent is read directly from frame->parent_query_id without offset arithmetic. Mechanics: - PushQueryFrame in PschExecutorStart bumps nesting_level after writing the slot. PopQueryFrame in PschExecutorEnd decrements. - PschExecutorRun and PschExecutorFinish no longer touch nesting_level on the success path; each just wraps its chain in PG_TRY/PG_CATCH so a longjmp out of the body pops the frame on the unwind and keeps depth balanced. Each PG_CATCH on the unwind path decrements once per level — three-deep nesting with an error at the bottom cascades through three PG_CATCH blocks for three pops, exactly matching three pushes. No subxact callback needed. - PschProcessUtility brackets its body with PG_TRY/PG_FINALLY in the same function, so push at the top and PopQueryFrame in PG_FINALLY balance regardless of how the body exits. ExecuteUtilityWithNesting is gone. - CaptureLogEvent now reads TopQueryFrame() and uses both its queryid (the running query, attached as event.queryid for attribution) and its parent_query_id (the running query's caller, attached as event.parent_query_id with strict semantics — previously this slot was misread as the running query's own id). PeekQueryStack and its offset parameter are gone. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/hooks/hooks.c | 232 ++++++++++++++++++++++++++++------------------ 1 file changed, 143 insertions(+), 89 deletions(-) diff --git a/src/hooks/hooks.c b/src/hooks/hooks.c index 929190f..5dc1245 100644 --- a/src/hooks/hooks.c +++ b/src/hooks/hooks.c @@ -48,29 +48,31 @@ static ExecutorEnd_hook_type prev_executor_end = NULL; static ProcessUtility_hook_type prev_process_utility = NULL; static emit_log_hook_type prev_emit_log_hook = NULL; -// Track nesting level to identify top-level queries -static int nesting_level = 0; - -// Per-query frame indexed by nesting_level: each query gets its own rusage -// baseline and start timestamp, and its queryid is stashed so a nested SPI -// call can read its parent. Replaces three single statics whose lifetimes -// collapsed under SPI: the outer's rusage baseline was overwritten by the -// inner's getrusage(), so the outer's CPU delta ended up measuring just the -// nested portion. +// nesting_level is the current depth of pushed frames: it moves only where +// frames move (PschExecutorStart++ / PschExecutorEnd--, and the same pair +// inside PschProcessUtility around its body). Run/Finish do not touch it. +// Tying push to the same point as the increment means every reader knows +// exactly what each slot represents — no offset reinterpretation by phase. +// +// parent_query_id is captured into the frame at push time, so any reader +// fetches frame->parent_query_id directly regardless of whether it's at an +// emit hook or inside the body via CaptureLogEvent. The top of the stack +// is uniformly slot[nesting_level - 1]. // // PSCH_MAX_NESTING_DEPTH is a small fixed cap. Frames at greater depth are // not written; the corresponding emit at that depth falls back to zero CPU // and parent_query_id 0. 
PL/pgSQL recursion is capped well below 16 by -// PostgreSQL itself (max_stack_depth typically yields a much shallower -// effective limit), so the cap is defensive against runaway recursive -// functions rather than a normal-case constraint. +// PostgreSQL's max_stack_depth, so the cap is defensive against runaway +// recursion rather than a normal-case constraint. #define PSCH_MAX_NESTING_DEPTH 16 typedef struct PschQueryFrame { uint64 queryid; + uint64 parent_query_id; // captured from the previous top at push time struct rusage rusage_start; TimestampTz query_start_ts; } PschQueryFrame; static PschQueryFrame query_stack[PSCH_MAX_NESTING_DEPTH]; +static int nesting_level = 0; // Deadlock prevention for emit_log_hook static bool disable_error_capture = false; @@ -335,14 +337,48 @@ static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, uint64 parent_ ResolveNames(event); } -// Return the queryid of the query at depth `nesting_level - 1` — i.e., the -// caller of whatever is currently emitting. 0 at top level or when depth -// exceeded PSCH_MAX_NESTING_DEPTH (no frame was written for that slot). -static uint64 GetParentQueryId(void) { +// Push a frame for the about-to-execute query, returning the frame pointer +// (NULL if depth has exceeded the cap — we still bump nesting_level so the +// matching pop balances). parent_query_id is captured here from the +// previous top, so every later reader fetches it directly from the frame +// without reinterpreting slot offsets. +static PschQueryFrame* PushQueryFrame(uint64 queryid) { + uint64 parent = (nesting_level > 0 && nesting_level <= PSCH_MAX_NESTING_DEPTH) + ? 
query_stack[nesting_level - 1].queryid + : 0; + PschQueryFrame* frame = NULL; + if (nesting_level < PSCH_MAX_NESTING_DEPTH) { + frame = &query_stack[nesting_level]; + frame->queryid = queryid; + frame->parent_query_id = parent; + frame->query_start_ts = GetCurrentTimestamp(); + getrusage(RUSAGE_SELF, &frame->rusage_start); + } + nesting_level++; + return frame; +} + +// Return a pointer to the top frame (the currently-active query) without +// changing depth. NULL if the stack is empty or depth is past the cap. +static const PschQueryFrame* TopQueryFrame(void) { if (nesting_level <= 0 || nesting_level > PSCH_MAX_NESTING_DEPTH) { - return 0; + return NULL; } - return query_stack[nesting_level - 1].queryid; + return &query_stack[nesting_level - 1]; +} + +// Pop the top frame and return its still-valid data (slots are never zeroed +// — next push at the same depth overwrites). NULL if the stack was empty +// or its matching push had been past the cap. +static const PschQueryFrame* PopQueryFrame(void) { + if (nesting_level <= 0) { + return NULL; + } + nesting_level--; + if (nesting_level >= PSCH_MAX_NESTING_DEPTH) { + return NULL; + } + return &query_stack[nesting_level]; } static void CopyClientContext(PschEvent* event) { @@ -519,24 +555,27 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { return; } - // Push our frame at the current depth, before ExecutorRun bumps nesting_level. - // Each frame holds its own rusage baseline so nested CPU deltas don't ride on - // the outer query's baseline. The queryid is what nested queries will read - // back as their parent_query_id. - if (nesting_level < PSCH_MAX_NESTING_DEPTH) { - PschQueryFrame* frame = &query_stack[nesting_level]; - frame->queryid = query_desc->plannedstmt->queryId; - frame->query_start_ts = GetCurrentTimestamp(); - if (psch_enabled) { - getrusage(RUSAGE_SELF, &frame->rusage_start); + // Push our frame. The matching pop is in PschExecutorEnd. 
If the body + // longjmps before End fires, the PG_CATCH below — and the same shape in + // Run/Finish — pops nesting_level on the unwind path so depth stays + // balanced. See the comment at the top of the file for why push and + // increment are tied together. + PushQueryFrame(query_desc->plannedstmt->queryId); + + PG_TRY(); + { + if (prev_executor_start != NULL) { + prev_executor_start(query_desc, eflags); + } else { + standard_ExecutorStart(query_desc, eflags); } } - - if (prev_executor_start != NULL) { - prev_executor_start(query_desc, eflags); - } else { - standard_ExecutorStart(query_desc, eflags); + PG_CATCH(); + { + PopQueryFrame(); + PG_RE_THROW(); } + PG_END_TRY(); if (psch_enabled && query_desc->plannedstmt->queryId != UINT64CONST(0)) { if (query_desc->totaltime == NULL) { @@ -551,6 +590,10 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { } } +// Run does no nesting_level bookkeeping of its own — the frame was pushed +// in Start and will be popped in End on success. We only wrap the chain +// in PG_TRY/PG_CATCH so that if the body longjmps (the common error path), +// we pop nesting_level on the unwind and stay balanced. #if PG_VERSION_NUM >= 180000 static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint64 count) { #else @@ -558,6 +601,7 @@ static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint bool execute_once) { #endif if (IsParallelWorker()) { + // Parallel workers never pushed, so they don't pop and don't need PG_CATCH. 
#if PG_VERSION_NUM >= 180000 if (prev_executor_run != NULL) { prev_executor_run(query_desc, direction, count); @@ -574,7 +618,6 @@ static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint return; } - nesting_level++; PG_TRY(); { #if PG_VERSION_NUM >= 180000 @@ -591,8 +634,11 @@ static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint } #endif } - PG_FINALLY(); - { nesting_level--; } + PG_CATCH(); + { + PopQueryFrame(); + PG_RE_THROW(); + } PG_END_TRY(); } @@ -606,7 +652,8 @@ static void PschExecutorFinish(QueryDesc* query_desc) { return; } - nesting_level++; + // Finish does no nesting_level bookkeeping of its own; only PG_CATCH-pop + // on error so the unwind stays balanced. PG_TRY(); { if (prev_executor_finish != NULL) { @@ -615,13 +662,32 @@ static void PschExecutorFinish(QueryDesc* query_desc) { standard_ExecutorFinish(query_desc); } } - PG_FINALLY(); - { nesting_level--; } + PG_CATCH(); + { + PopQueryFrame(); + PG_RE_THROW(); + } PG_END_TRY(); } static void PschExecutorEnd(QueryDesc* query_desc) { - if (!psch_enabled || IsParallelWorker() || query_desc->plannedstmt->queryId == UINT64CONST(0)) { + if (IsParallelWorker()) { + // Parallel workers never pushed in Start, so don't pop here. + if (prev_executor_end != NULL) { + prev_executor_end(query_desc); + } else { + standard_ExecutorEnd(query_desc); + } + return; + } + + // Pop the frame Start pushed. Slot data remains valid through the + // returned pointer (slots are never zeroed), so we keep reading from it + // through the rest of this function. Null only if the matching push + // had been past the cap. 
+ const PschQueryFrame* frame = PopQueryFrame(); + + if (!psch_enabled || query_desc->plannedstmt->queryId == UINT64CONST(0)) { if (prev_executor_end != NULL) { prev_executor_end(query_desc); } else { @@ -634,17 +700,14 @@ static void PschExecutorEnd(QueryDesc* query_desc) { InstrEndLoop(query_desc->totaltime); } - // Pull our frame back out — same slot ExecutorStart wrote. Null only if - // depth exceeded the cap; in that case CPU delta stays zero and start_ts - // falls back to "now" so any `GetCurrentTimestamp() - start_ts` path yields - // ~0us rather than ~25 years of µs from subtracting 0 (the PG epoch). In - // practice query_desc->totaltime supplies a real duration from instrumentation, - // so the fallback subtraction is only used when totaltime wasn't allocated. - const PschQueryFrame* frame = - (nesting_level < PSCH_MAX_NESTING_DEPTH) ? &query_stack[nesting_level] : NULL; + // start_ts falls back to "now" so the duration computation below yields + // ~0us if the frame is null, rather than ~25 years from subtracting the + // PG epoch. query_desc->totaltime almost always supplies a real + // duration from instrumentation; the fallback subtraction is only used + // when totaltime wasn't allocated. TimestampTz start_ts = frame ? frame->query_start_ts : GetCurrentTimestamp(); + uint64 parent_query_id = frame ? 
frame->parent_query_id : 0; - // Compute duration early for sampling filter uint64 duration_us; if (query_desc->totaltime != NULL) { #if PG_VERSION_NUM >= 190000 @@ -665,7 +728,6 @@ static void PschExecutorEnd(QueryDesc* query_desc) { return; } - // Compute CPU time delta from getrusage int64 cpu_user_us = 0; int64 cpu_sys_us = 0; struct rusage rusage_end; @@ -675,8 +737,7 @@ static void PschExecutorEnd(QueryDesc* query_desc) { } PschEvent event; - BuildEventFromQueryDesc(query_desc, &event, start_ts, GetParentQueryId(), cpu_user_us, - cpu_sys_us); + BuildEventFromQueryDesc(query_desc, &event, start_ts, parent_query_id, cpu_user_us, cpu_sys_us); PschEnqueueEvent(&event); if (prev_executor_end != NULL) { @@ -761,21 +822,6 @@ static uint64 GetUtilityRowCount(QueryCompletion* qc) { } } -static void ExecuteUtilityWithNesting(PlannedStmt* pstmt, const char* queryString, -#if PG_VERSION_NUM >= 140000 - bool readOnlyTree, -#endif - ProcessUtilityContext context, ParamListInfo params, - QueryEnvironment* queryEnv, DestReceiver* dest, - QueryCompletion* qc) { - nesting_level++; - PG_TRY(); - { CALL_PROCESS_UTILITY(); } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); -} - // ProcessUtility hook - captures DDL and utility statements #if PG_VERSION_NUM >= 140000 static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, bool readOnlyTree, @@ -793,30 +839,24 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, return; } - // Capture parent before pushing our own frame. - uint64 parent_query_id = GetParentQueryId(); + // Push our frame so any executor hooks fired from within this utility + // (e.g. the SELECT inside CREATE TABLE AS) read us as their parent. + // PushQueryFrame captures the rusage baseline and start timestamp into + // the frame; we use those for the emit below. The PG_TRY/PG_FINALLY + // around the body guarantees the pop on both success and longjmp. 
+ const PschQueryFrame* frame = PushQueryFrame(pstmt->queryId); - // Push our frame so any executor hooks fired from within this utility (e.g. - // the SELECT inside CREATE TABLE AS) read us as their parent_query_id. - if (nesting_level < PSCH_MAX_NESTING_DEPTH) { - query_stack[nesting_level].queryid = pstmt->queryId; - } - - // Capture state before execution - TimestampTz start_ts = GetCurrentTimestamp(); uint64 query_id = pstmt->queryId; BufferUsage bufusage_start = pgBufferUsage; WalUsage walusage_start = pgWalUsage; - struct rusage rusage_util_start; - getrusage(RUSAGE_SELF, &rusage_util_start); instr_time start_time; INSTR_TIME_SET_CURRENT(start_time); -#if PG_VERSION_NUM >= 140000 - ExecuteUtilityWithNesting(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, qc); -#else - ExecuteUtilityWithNesting(pstmt, queryString, context, params, queryEnv, dest, qc); -#endif + PG_TRY(); + { CALL_PROCESS_UTILITY(); } + PG_FINALLY(); + { PopQueryFrame(); } + PG_END_TRY(); instr_time duration; INSTR_TIME_SET_CURRENT(duration); @@ -836,12 +876,15 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, int64 cpu_user_us = 0; int64 cpu_sys_us = 0; - struct rusage rusage_util_end; - if (getrusage(RUSAGE_SELF, &rusage_util_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_util_end.ru_utime, rusage_util_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_util_end.ru_stime, rusage_util_start.ru_stime); + struct rusage rusage_end; + if (frame != NULL && getrusage(RUSAGE_SELF, &rusage_end) == 0) { + cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame->rusage_start.ru_utime); + cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime); } + TimestampTz start_ts = frame ? frame->query_start_ts : 0; + uint64 parent_query_id = frame ? 
frame->parent_query_id : 0; + PschEvent event; BuildEventForUtility(&event, query_id, start_ts, duration_us, parent_query_id, GetUtilityRowCount(qc), &bufusage_delta, &walusage_delta, cpu_user_us, @@ -891,7 +934,18 @@ static bool ShouldCaptureLog(ErrorData* edata) { // message, SQLSTATE, and client/session metadata. static void CaptureLogEvent(ErrorData* edata) { PschEvent event; - InitBaseEvent(&event, GetCurrentTimestamp(), GetParentQueryId(), PSCH_CMD_UNKNOWN); + + // Top of the stack is whatever query is currently executing — Start + // pushed it, End/ProcessUtility haven't popped yet because we're still + // inside the body that fired this elog. Attribute this log event to the + // running query (queryid) and inherit its parent_query_id, captured at + // push time. + const PschQueryFrame* top = TopQueryFrame(); + uint64 running_query_id = top ? top->queryid : 0; + uint64 parent_query_id = top ? top->parent_query_id : 0; + + InitBaseEvent(&event, GetCurrentTimestamp(), parent_query_id, PSCH_CMD_UNKNOWN); + event.queryid = running_query_id; UnpackSqlState(edata->sqlerrcode, event.err_sqlstate); event.err_elevel = (uint8)(edata->elevel); From 534abc999505ee967a29bd77f4081f5704f1a37a Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Wed, 13 May 2026 19:14:04 -0400 Subject: [PATCH 06/10] fix: extend Start's PG_TRY to cover InstrAlloc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The totaltime allocation runs after standard_ExecutorStart returns, between our PG_END_TRY and the close of PschExecutorStart. InstrAlloc goes through MemoryContextAllocZero, which can ereport(ERROR) on OOM — that longjmp would unwind past our PG_END_TRY without firing the PG_CATCH, leaving the pushed frame unpopped. Move the allocation inside the existing PG_TRY block so the same PG_CATCH cleans up the frame. No behavior change on the happy path; new robustness against any future code added between the push and the function's return. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/hooks/hooks.c | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/hooks/hooks.c b/src/hooks/hooks.c index 5dc1245..304afbe 100644 --- a/src/hooks/hooks.c +++ b/src/hooks/hooks.c @@ -555,11 +555,10 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { return; } - // Push our frame. The matching pop is in PschExecutorEnd. If the body - // longjmps before End fires, the PG_CATCH below — and the same shape in - // Run/Finish — pops nesting_level on the unwind path so depth stays - // balanced. See the comment at the top of the file for why push and - // increment are tied together. + // Push our frame. The matching pop is in PschExecutorEnd. The PG_TRY + // wraps everything between push and function-return so any longjmp + // before End fires — including from InstrAlloc OOM — pops the frame on + // the unwind. Run/Finish use the same shape for the body. PushQueryFrame(query_desc->plannedstmt->queryId); PG_TRY(); @@ -569,6 +568,18 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { } else { standard_ExecutorStart(query_desc, eflags); } + + if (psch_enabled && query_desc->plannedstmt->queryId != UINT64CONST(0)) { + if (query_desc->totaltime == NULL) { + MemoryContext oldcxt = MemoryContextSwitchTo(query_desc->estate->es_query_cxt); +#if PG_VERSION_NUM < 140000 + query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL); +#else + query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false); +#endif + MemoryContextSwitchTo(oldcxt); + } + } } PG_CATCH(); { @@ -576,18 +587,6 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { PG_RE_THROW(); } PG_END_TRY(); - - if (psch_enabled && query_desc->plannedstmt->queryId != UINT64CONST(0)) { - if (query_desc->totaltime == NULL) { - MemoryContext oldcxt = MemoryContextSwitchTo(query_desc->estate->es_query_cxt); -#if PG_VERSION_NUM < 140000 - query_desc->totaltime = InstrAlloc(1, 
INSTRUMENT_ALL); -#else - query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL, false); -#endif - MemoryContextSwitchTo(oldcxt); - } - } } // Run does no nesting_level bookkeeping of its own — the frame was pushed From 92edd0c405d153b0ca457ec8f69af870446f9f3e Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 14 May 2026 10:31:06 -0400 Subject: [PATCH 07/10] refactor: use nesting_level = -1 as the no-frame resting state The push had been doing more conditional work than necessary to figure out whether a parent existed. Convert nesting_level so that: - -1 is the resting state (no active query), - n in [0, PSCH_MAX_NESTING_DEPTH-1] is the slot index of the currently-active frame on top of the stack, - >= PSCH_MAX_NESTING_DEPTH is the overflow region. Push now increments first and bails on overflow, so the cap check collapses to a single comparison. The parent_query_id lookup still needs one conditional ("do we have a parent at all"), but the previous double-condition (positive AND within cap) is gone. TopQueryFrame / PopQueryFrame benefit too: < 0 doubles as a nullity check. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/hooks/hooks.c | 50 +++++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/src/hooks/hooks.c b/src/hooks/hooks.c index 304afbe..cbcbf89 100644 --- a/src/hooks/hooks.c +++ b/src/hooks/hooks.c @@ -48,16 +48,22 @@ static ExecutorEnd_hook_type prev_executor_end = NULL; static ProcessUtility_hook_type prev_process_utility = NULL; static emit_log_hook_type prev_emit_log_hook = NULL; -// nesting_level is the current depth of pushed frames: it moves only where -// frames move (PschExecutorStart++ / PschExecutorEnd--, and the same pair -// inside PschProcessUtility around its body). Run/Finish do not touch it. -// Tying push to the same point as the increment means every reader knows -// exactly what each slot represents — no offset reinterpretation by phase. 
+// nesting_level is the slot index of the currently-active frame: it moves +// only where frames move (PschExecutorStart++ / PschExecutorEnd--, and the +// same pair inside PschProcessUtility around its body). Run/Finish do not +// touch it. Tying push to the same point as the increment means every +// reader knows exactly what each slot represents — no offset +// reinterpretation by phase. +// +// nesting_level == -1 is the resting state (no query active). Push +// increments before writing; pop reads before decrementing. A negative +// nesting_level doubles as the "no active query" check used by +// TopQueryFrame/PopQueryFrame. // // parent_query_id is captured into the frame at push time, so any reader // fetches frame->parent_query_id directly regardless of whether it's at an // emit hook or inside the body via CaptureLogEvent. The top of the stack -// is uniformly slot[nesting_level - 1]. +// is uniformly slot[nesting_level]. // // PSCH_MAX_NESTING_DEPTH is a small fixed cap. Frames at greater depth are // not written; the corresponding emit at that depth falls back to zero CPU @@ -72,7 +78,7 @@ typedef struct PschQueryFrame { TimestampTz query_start_ts; } PschQueryFrame; static PschQueryFrame query_stack[PSCH_MAX_NESTING_DEPTH]; -static int nesting_level = 0; +static int nesting_level = -1; // Deadlock prevention for emit_log_hook static bool disable_error_capture = false; @@ -343,42 +349,40 @@ static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, uint64 parent_ // previous top, so every later reader fetches it directly from the frame // without reinterpreting slot offsets. static PschQueryFrame* PushQueryFrame(uint64 queryid) { - uint64 parent = (nesting_level > 0 && nesting_level <= PSCH_MAX_NESTING_DEPTH) - ? 
query_stack[nesting_level - 1].queryid - : 0; - PschQueryFrame* frame = NULL; - if (nesting_level < PSCH_MAX_NESTING_DEPTH) { - frame = &query_stack[nesting_level]; - frame->queryid = queryid; - frame->parent_query_id = parent; - frame->query_start_ts = GetCurrentTimestamp(); - getrusage(RUSAGE_SELF, &frame->rusage_start); - } nesting_level++; + if (nesting_level >= PSCH_MAX_NESTING_DEPTH) { + return NULL; + } + PschQueryFrame* frame = &query_stack[nesting_level]; + frame->queryid = queryid; + frame->parent_query_id = (nesting_level > 0) ? query_stack[nesting_level - 1].queryid : 0; + frame->query_start_ts = GetCurrentTimestamp(); + getrusage(RUSAGE_SELF, &frame->rusage_start); return frame; } // Return a pointer to the top frame (the currently-active query) without // changing depth. NULL if the stack is empty or depth is past the cap. static const PschQueryFrame* TopQueryFrame(void) { - if (nesting_level <= 0 || nesting_level > PSCH_MAX_NESTING_DEPTH) { + if (nesting_level < 0 || nesting_level >= PSCH_MAX_NESTING_DEPTH) { return NULL; } - return &query_stack[nesting_level - 1]; + return &query_stack[nesting_level]; } // Pop the top frame and return its still-valid data (slots are never zeroed // — next push at the same depth overwrites). NULL if the stack was empty // or its matching push had been past the cap. 
static const PschQueryFrame* PopQueryFrame(void) { - if (nesting_level <= 0) { + if (nesting_level < 0) { return NULL; } + int top = nesting_level; nesting_level--; - if (nesting_level >= PSCH_MAX_NESTING_DEPTH) { + if (top >= PSCH_MAX_NESTING_DEPTH) { return NULL; } - return &query_stack[nesting_level]; + return &query_stack[top]; } static void CopyClientContext(PschEvent* event) { From c632cdcf42822760b85efbfdb0d77835e8171478 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 14 May 2026 10:38:29 -0400 Subject: [PATCH 08/10] fix: surface depth-cap overflow and default start_ts to now MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two cleanups around the runaway-nesting case: - PushQueryFrame now emits a WARNING when it returns NULL because the cap was exceeded. Previously the overflow was silent — telemetry just went missing for the frame. WARNING is loud enough to notice if it happens in practice but doesn't fail the query. - PschProcessUtility's start_ts fallback was still 0 (PG epoch) when frame was NULL. Match PschExecutorEnd by falling back to GetCurrentTimestamp() so the emitted event carries an approximately correct ts_start rather than 1970. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/hooks/hooks.c | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/hooks/hooks.c b/src/hooks/hooks.c index cbcbf89..8619625 100644 --- a/src/hooks/hooks.c +++ b/src/hooks/hooks.c @@ -351,6 +351,14 @@ static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, uint64 parent_ static PschQueryFrame* PushQueryFrame(uint64 queryid) { nesting_level++; if (nesting_level >= PSCH_MAX_NESTING_DEPTH) { + // Runaway nesting — log once per overflowing push so it's visible if it + // happens in practice, but stay non-fatal so the query itself still + // runs. The pop on this push is still balanced (nesting_level keeps + // incrementing past the cap). 
+ elog(WARNING, + "pg_stat_ch: query nesting depth %d exceeds cap %d; CPU and " + "parent_query_id telemetry will be missing for this frame", + nesting_level, PSCH_MAX_NESTING_DEPTH); return NULL; } PschQueryFrame* frame = &query_stack[nesting_level]; @@ -885,7 +893,10 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime); } - TimestampTz start_ts = frame ? frame->query_start_ts : 0; + // start_ts falls back to "now" so ts_start in the emitted event is at + // least approximately correct rather than the PG epoch. Matches the + // same fallback in PschExecutorEnd. + TimestampTz start_ts = frame ? frame->query_start_ts : GetCurrentTimestamp(); uint64 parent_query_id = frame ? frame->parent_query_id : 0; PschEvent event; From 28e079ef21e308c4623647c9378636700d56aec1 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 14 May 2026 16:27:05 -0400 Subject: [PATCH 09/10] fix(arrow): ship parent_query_id in the Arrow IPC schema Earlier commits on this branch added parent_query_id to PschEvent and to the ClickHouse-native exporter, plus the ClickHouse schema migration, but arrow_batch.cc keeps its own hard-coded column list and was missed. The result: the production export path (OTel + Arrow IPC) silently dropped parent_query_id on every event, so downstream consumers saw 0 across the board. Caught via the OTel/Arrow quickstart-validate harness (see PR #96). Local script flips from 4/8 passing to all-green once this lands. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/export/arrow_batch.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/export/arrow_batch.cc b/src/export/arrow_batch.cc index 75d4ebf..acb611c 100644 --- a/src/export/arrow_batch.cc +++ b/src/export/arrow_batch.cc @@ -149,6 +149,7 @@ struct ArrowBatchBuilder::Impl { arrow::StringBuilder trace_id_builder; arrow::StringBuilder span_id_builder; DictBuilder query_id_builder; + DictBuilder parent_query_id_builder; DictBuilder db_name_builder; DictBuilder db_user_builder; DictBuilder db_operation_builder; @@ -216,6 +217,7 @@ struct ArrowBatchBuilder::Impl { arrow::field("trace_id", arrow::utf8()), arrow::field("span_id", arrow::utf8()), arrow::field("query_id", DictionaryUtf8Type()), + arrow::field("parent_query_id", DictionaryUtf8Type()), arrow::field("db_name", DictionaryUtf8Type()), arrow::field("db_user", DictionaryUtf8Type()), arrow::field("db_operation", DictionaryUtf8Type()), @@ -309,10 +311,15 @@ struct ArrowBatchBuilder::Impl { char queryid_buf[24]; snprintf(queryid_buf, sizeof(queryid_buf), "%" PRIu64, static_cast<uint64_t>(event.queryid)); + char parent_query_id_buf[24]; + snprintf(parent_query_id_buf, sizeof(parent_query_id_buf), "%" PRIu64, + static_cast<uint64_t>(event.parent_query_id)); char pid_buf[12]; snprintf(pid_buf, sizeof(pid_buf), "%d", event.pid); if (!AppendString(&query_id_builder, queryid_buf, "Arrow query_id append") || + !AppendString(&parent_query_id_builder, parent_query_id_buf, + "Arrow parent_query_id append") || !AppendString(&db_name_builder, db_name, "Arrow db_name append") || !AppendString(&db_user_builder, db_user, "Arrow db_user append") || !AppendString(&db_operation_builder, PschCmdTypeToString(event.cmd_type), @@ -475,6 +482,7 @@ struct ArrowBatchBuilder::Impl { !add_array(&trace_id_builder, "Arrow trace_id finish") || !add_array(&span_id_builder, "Arrow span_id finish") || !add_dict_array(&query_id_builder, "Arrow query_id finish") || + !add_dict_array(&parent_query_id_builder,
"Arrow parent_query_id finish") || !add_dict_array(&db_name_builder, "Arrow db_name finish") || !add_dict_array(&db_user_builder, "Arrow db_user finish") || !add_dict_array(&db_operation_builder, "Arrow db_operation finish") || @@ -581,6 +589,7 @@ struct ArrowBatchBuilder::Impl { trace_id_builder.Reset(); span_id_builder.Reset(); query_id_builder.ResetFull(); + parent_query_id_builder.ResetFull(); db_name_builder.ResetFull(); db_user_builder.ResetFull(); db_operation_builder.ResetFull(); From f52d76c6af1966ee8c1e23b2cb51cc789dd22e2d Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 14 May 2026 17:20:03 -0400 Subject: [PATCH 10/10] test: add t/029 for the Arrow/OTel path; keep t/028 on ClickHouse MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit t/028 already exercises parent_query_id linkage through the ClickHouse-native exporter. Add t/029 to exercise the same invariants through the Arrow/OTel export path (the production pathway), using pg_stat_ch.debug_arrow_dump_dir to capture each Arrow IPC batch to disk before the gRPC send (which we point at a non-existent collector so it fails harmlessly). Same trick as t/026_arrow_dump. t/029 guards specifically against the earlier surgical-PR regression where arrow_batch.cc was missing parent_query_id entirely (the ClickHouse-native exporter had it, so t/028 alone would have let the gap through). Test 3 in both files uses RAISE WARNING inside a nested SPI call to exercise CaptureLogEvent's queryid/parent_query_id assignment. We cannot use a caught ERROR here: errfinish PG_RE_THROWs at elog.c:539 for ERROR-level events without calling EmitErrorReport, so emit_log_hook only fires later from PostgresMain's top-level catch (after all frames have been popped via PG_CATCH unwinding) — or never at all, for errors caught by a plpgsql EXCEPTION block. 
WARNING-level events go through EmitErrorReport directly, so our hook fires while the inner SPI's frame is still on the stack, which is the only scenario where CaptureLogEvent's slot choice is observable. The assertions distinguish "inner SPI queryid" from "outer caller queryid" so a regression that attributes the log event to the wrong slot (the off-by-one we're guarding against) would be caught. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/ci-tap.yml | 7 + t/028_parent_query_id.pl | 106 ++++++++------- t/029_parent_query_id_arrow.pl | 240 +++++++++++++++++++++++++++++++++ 3 files changed, 303 insertions(+), 50 deletions(-) create mode 100644 t/029_parent_query_id_arrow.pl diff --git a/.github/workflows/ci-tap.yml b/.github/workflows/ci-tap.yml index 718888b..8e60aca 100644 --- a/.github/workflows/ci-tap.yml +++ b/.github/workflows/ci-tap.yml @@ -42,6 +42,13 @@ jobs: - uses: ./.github/actions/setup-vcpkg + - name: Install uv + # uv is used by Arrow-IPC TAP tests (t/026, t/029) to run inline + # pyarrow scripts that decode the debug_arrow_dump_dir output. + uses: astral-sh/setup-uv@v6 + with: + version: "latest" + - name: Cache PostgreSQL build id: cache-pg uses: actions/cache@v4 diff --git a/t/028_parent_query_id.pl b/t/028_parent_query_id.pl index 80c8ff4..ca4d9f5 100644 --- a/t/028_parent_query_id.pl +++ b/t/028_parent_query_id.pl @@ -2,9 +2,16 @@ # Parent query id linkage: # - top-level queries report parent_query_id = 0 # - nested SPI queries report parent_query_id = outer's query_id -# - log/error events report queryid = the running query and parent_query_id -# = its caller (catches the CaptureLogEvent off-by-one where reading slot -# nesting_level - 1 returns the *running* query, not its parent) +# - log events captured while a nested SPI query is on the stack report +# queryid = the running (inner) statement and parent_query_id = its +# outer caller — catches the CaptureLogEvent off-by-one.
+# NOTE: we use RAISE WARNING for this rather than a caught ERROR. +# emit_log_hook does not fire from errfinish for ERROR-level events +# (errfinish PG_RE_THROWs without calling EmitErrorReport; emit_log_hook +# only fires from PostgresMain's top-level catch, after all frames have +# been popped — or never, for caught-in-EXCEPTION errors). WARNING +# goes through EmitErrorReport directly so the frame stack is intact +# when our hook runs. # # Filters key off distinctive table/function names — these survive query # normalization, where string/numeric literals get replaced with $N @@ -116,81 +123,80 @@ is($outer_self, '0', 'outer call still reports parent_query_id = 0'); }; -# Test 3: Error event captured inside a nested SPI query. +# Test 3: Log event captured inside a nested SPI query. # queryid = the running (nested) query's id # parent_query_id = the outer caller's id -# Catches the CaptureLogEvent off-by-one: reading slot nesting_level - 1 -# returns the running query (itself), not its caller. -subtest 'error inside nested SPI links queryid -> outer' => sub { - psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); - psch_reset_stats($node); - +# Catches the CaptureLogEvent off-by-one — the old code read the wrong +# slot and would have attributed the warning to the outer caller (or +# emitted query_id = 0) instead of the inner SPI statement. 
+subtest 'log event inside nested SPI links queryid -> outer' => sub { $node->safe_psql('postgres', q{ - CREATE TABLE pqid_err_inner(x int); - INSERT INTO pqid_err_inner VALUES (0); - CREATE FUNCTION pqid_err_outer() RETURNS int + CREATE TABLE pqid_warn_tbl(x int); + INSERT INTO pqid_warn_tbl VALUES (1); + CREATE FUNCTION pqid_emit_warn(x int) RETURNS int + LANGUAGE plpgsql AS $$ + BEGIN + RAISE WARNING 'pqid_warn_marker' USING ERRCODE = '01001'; + RETURN x; + END$$; + CREATE FUNCTION pqid_warn_outer() RETURNS int LANGUAGE plpgsql AS $$ DECLARE v int; BEGIN - BEGIN - SELECT 1 / x INTO v FROM pqid_err_inner; - EXCEPTION WHEN division_by_zero THEN - NULL; - END; - RETURN 1; + SELECT pqid_emit_warn(x) INTO v FROM pqid_warn_tbl; + RETURN v; END$$; }); psch_query_clickhouse("TRUNCATE TABLE pg_stat_ch.events_raw"); psch_reset_stats($node); - $node->safe_psql('postgres', 'SELECT pqid_err_outer()'); + $node->safe_psql('postgres', 'SELECT pqid_warn_outer()'); $node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); - # The error-aborts longjmp means the inner SELECT never reaches its - # ExecutorEnd, so we won't find a query-text row for it. The error event - # itself (cmd_type=UNKNOWN, err_sqlstate=22012 for division_by_zero) is - # what carries the linkage signal. + # The log event itself carries no query text (CaptureLogEvent leaves + # query empty) — identify it via cmd_type=UNKNOWN + err_sqlstate=01001 + # (our RAISE WARNING's custom code). psch_wait_for_clickhouse_query( "SELECT count() FROM pg_stat_ch.events_raw " - . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '22012'", + . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '01001'", sub { $_[0] >= 1 }, 10, ); - # Before the off-by-one fix, parent_query_id of a log event captured during - # nested execution was the running query's *own* id (slot nesting_level - 1), - # which would not match the outer caller's query_id. After the fix it - # correctly points to the outer (slot nesting_level - 2). 
- my $err_to_outer = psch_query_clickhouse(q{ - SELECT count() FROM pg_stat_ch.events_raw err_q + # parent_query_id of the warning must equal the outer caller's query_id. + my $warn_to_outer = psch_query_clickhouse(q{ + SELECT count() FROM pg_stat_ch.events_raw warn_q JOIN pg_stat_ch.events_raw outer_q - ON err_q.parent_query_id = outer_q.query_id - WHERE err_q.cmd_type = 'UNKNOWN' - AND err_q.err_sqlstate = '22012' - AND outer_q.query LIKE '%pqid_err_outer%' + ON warn_q.parent_query_id = outer_q.query_id + WHERE warn_q.cmd_type = 'UNKNOWN' + AND warn_q.err_sqlstate = '01001' + AND outer_q.query LIKE '%pqid_warn_outer%' }); - cmp_ok($err_to_outer, '>=', 1, - 'div-by-zero log event: parent_query_id = outer caller'); - - # Before the fix, queryid was always 0 on log events. After the fix it is - # the currently-running (nested) statement's id, which we don't have a - # textual handle on, but it must at least be non-zero. - my $err_qid_nonzero = psch_query_clickhouse( - "SELECT count() FROM pg_stat_ch.events_raw " - . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '22012' AND query_id != 0" - ); - cmp_ok($err_qid_nonzero, '>=', 1, - 'div-by-zero log event: query_id is the running statement (non-zero)'); + cmp_ok($warn_to_outer, '>=', 1, + 'warning log event: parent_query_id = outer caller'); + + # query_id of the warning must equal the inner SPI statement's query_id + # (the running query), NOT the outer caller's query_id. + my $warn_to_inner = psch_query_clickhouse(q{ + SELECT count() FROM pg_stat_ch.events_raw warn_q + JOIN pg_stat_ch.events_raw inner_q + ON warn_q.query_id = inner_q.query_id + WHERE warn_q.cmd_type = 'UNKNOWN' + AND warn_q.err_sqlstate = '01001' + AND inner_q.query LIKE '%pqid_emit_warn%' + }); + cmp_ok($warn_to_inner, '>=', 1, + 'warning log event: query_id = inner SPI statement (the running query)'); # And those two must NOT be equal — running query is the child, parent # is its caller. 
- my $running_vs_parent = psch_query_clickhouse( + my $self_parent = psch_query_clickhouse( "SELECT count() FROM pg_stat_ch.events_raw " - . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '22012' " + . "WHERE cmd_type = 'UNKNOWN' AND err_sqlstate = '01001' " . " AND query_id = parent_query_id AND query_id != 0" ); - is($running_vs_parent, '0', + is($self_parent, '0', 'log event query_id and parent_query_id are distinct (no self-parent)'); }; diff --git a/t/029_parent_query_id_arrow.pl b/t/029_parent_query_id_arrow.pl new file mode 100644 index 0000000..9e765d1 --- /dev/null +++ b/t/029_parent_query_id_arrow.pl @@ -0,0 +1,240 @@ +#!/usr/bin/env perl +# Parent query id linkage, verified through the OTel/Arrow export path +# (the production export pathway). No ClickHouse or OTel collector is +# needed: pg_stat_ch.debug_arrow_dump_dir captures each Arrow IPC batch +# to disk before the gRPC send (which we deliberately point at a +# non-existent collector so it fails harmlessly). See t/026_arrow_dump.pl +# for the same trick. +# +# What this test guards: +# 1. The Arrow IPC schema actually contains parent_query_id. An earlier +# version of #95 added the field to PschEvent and the +# ClickHouse-native exporter but missed arrow_batch.cc. That gap +# escaped because the only existing parent_query_id test +# (t/028, ClickHouse-native) didn't exercise the Arrow path. +# 2. Top-level queries report parent_query_id = 0. +# 3. Nested SPI queries report parent_query_id matching the outer's +# query_id. +# 4. Log events captured by emit_log_hook while a nested SPI query is on +# the stack carry query_id of the running (inner) statement and +# parent_query_id of the outer caller — catches the CaptureLogEvent +# off-by-one (would otherwise read the wrong slot). 
We use RAISE +# WARNING for this: emit_log_hook does not fire from errfinish for +# ERROR-level events (errfinish PG_RE_THROWs without calling +# EmitErrorReport; emit_log_hook only fires later in PostgresMain's +# top-level catch, after all frames have been popped — or never, for +# caught-in-EXCEPTION errors). WARNING goes through EmitErrorReport +# directly so the frame stack is intact when our hook runs. + +use strict; +use warnings; +use lib 't'; +use File::Temp qw(tempdir); + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use psch; + +if (system("uv --version >/dev/null 2>&1") != 0) { + plan skip_all => 'uv not installed (needed for inline pyarrow validation)'; +} + +my $dump_dir = tempdir('psch_pqid_arrow_XXXX', TMPDIR => 1, CLEANUP => 1); + +my $node = PostgreSQL::Test::Cluster->new('pqid_arrow'); +$node->init(); +$node->append_conf('postgresql.conf', qq{ +shared_preload_libraries = 'pg_stat_ch' +pg_stat_ch.enabled = on +pg_stat_ch.queue_capacity = 65536 +pg_stat_ch.flush_interval_ms = 100 +pg_stat_ch.batch_max = 100 +pg_stat_ch.use_otel = on +pg_stat_ch.otel_endpoint = 'localhost:14317' +pg_stat_ch.otel_arrow_passthrough = on +pg_stat_ch.debug_arrow_dump_dir = '$dump_dir' +pg_stat_ch.hostname = 'test-pqid-arrow-host' +}); +$node->start(); +$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_ch'); + +# Fixtures. Distinctive table/function names so we can filter on them +# in the Arrow dumps — names survive query normalization where literals +# do not. 
+$node->safe_psql('postgres', q{ + CREATE TABLE pqid_top_marker(x int); + + CREATE TABLE pqid_inner_marker(x int); + INSERT INTO pqid_inner_marker VALUES (1); + CREATE FUNCTION pqid_outer_caller() RETURNS int + LANGUAGE plpgsql AS $$ + DECLARE v int; + BEGIN + SELECT x INTO v FROM pqid_inner_marker; + RETURN v; + END$$; + + CREATE TABLE pqid_warn_tbl(x int); + INSERT INTO pqid_warn_tbl VALUES (1); + CREATE FUNCTION pqid_emit_warn(x int) RETURNS int + LANGUAGE plpgsql AS $$ + BEGIN + RAISE WARNING 'pqid_warn_marker' USING ERRCODE = '01001'; + RETURN x; + END$$; + CREATE FUNCTION pqid_warn_outer() RETURNS int + LANGUAGE plpgsql AS $$ + DECLARE v int; + BEGIN + SELECT pqid_emit_warn(x) INTO v FROM pqid_warn_tbl; + RETURN v; + END$$; +}); + +# Wait briefly for setup events to flush, then clear so the test queries +# we drive next are the only ones in the dump set. +$node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); +select(undef, undef, undef, 0.5); +unlink glob("$dump_dir/*.ipc"); +psch_reset_stats($node); + +# Drive the test queries. +$node->safe_psql('postgres', q{ + SELECT * FROM pqid_top_marker; + SELECT count(*) FROM pqid_top_marker; + SELECT pqid_outer_caller(); + SELECT pqid_warn_outer(); + SELECT pg_stat_ch_flush(); +}); + +# Wait for dump files. +my @ipc_files; +my $deadline = time() + 10; +while (time() < $deadline) { + @ipc_files = glob("$dump_dir/*.ipc"); + last if @ipc_files > 0; + select(undef, undef, undef, 0.2); +} +cmp_ok(scalar @ipc_files, '>=', 1, 'arrow IPC dumps were produced') + or BAIL_OUT('No Arrow IPC dumps; cannot verify parent_query_id'); + +# Inline pyarrow validator: parses all .ipc files, computes the +# assertions, and emits one KEY=VALUE line per result. Keeping the +# logic in one script avoids parsing Arrow IPC from Perl directly. 
+my $py = <<'PYEOF'; +# /// script +# requires-python = ">=3.10" +# dependencies = ["pyarrow"] +# /// +import glob, os, sys +import pyarrow.ipc as ipc + +dump_dir = sys.argv[1] +rows = [] +schema_has_pqid = False +for p in sorted(glob.glob(os.path.join(dump_dir, "*.ipc"))): + with open(p, "rb") as f: + reader = ipc.open_stream(f) + if reader.schema.get_field_index("parent_query_id") != -1: + schema_has_pqid = True + rows.extend(reader.read_all().to_pylist()) + +print(f"schema_has_parent_query_id={'1' if schema_has_pqid else '0'}") + +def has(text): + return [r for r in rows if text in (r.get("query_text") or "")] + +# Arrow dict-encodes query_id and parent_query_id as decimal *strings*; +# "0" means top-level / no parent. Compare against strings, not ints. +def is_zero(v): + return v is None or v == "" or v == "0" + +# Test 1: top-level +top = has("pqid_top_marker") +print(f"top_rows={len(top)}") +print(f"top_nonzero_parent={sum(1 for r in top if not is_zero(r.get('parent_query_id')))}") + +# Test 2: nested SPI. Filter down to the SELECT op so that setup CREATE +# TABLE / INSERT / DROP utility statements (which also mention the table) +# don't pollute the "no orphans" check. +outer = [r for r in has("pqid_outer_caller") if r.get("db_operation") == "SELECT"] +inner = [r for r in has("pqid_inner_marker") if r.get("db_operation") == "SELECT"] +outer_qids = {o.get('query_id') for o in outer if not is_zero(o.get('query_id'))} +inner_linked = sum(1 for r in inner if r.get('parent_query_id') in outer_qids) +print(f"outer_rows={len(outer)}") +print(f"inner_rows={len(inner)}") +print(f"inner_linked_to_outer={inner_linked}") +print(f"inner_zero_parent={sum(1 for r in inner if is_zero(r.get('parent_query_id')))}") +print(f"outer_nonzero_parent={sum(1 for r in outer if not is_zero(r.get('parent_query_id')))}") + +# Test 3: log event captured inside nested SPI. 
The warning event itself +# carries no query_text (CaptureLogEvent leaves it empty), so we identify +# it via err_sqlstate '01001' (our RAISE WARNING's custom code). +warn_outer = [r for r in has("pqid_warn_outer") if r.get("db_operation") == "SELECT"] +warn_inner = [r for r in has("pqid_emit_warn") if r.get("db_operation") == "SELECT"] +warn_outer_qids = {o.get('query_id') for o in warn_outer if not is_zero(o.get('query_id'))} +warn_inner_qids = {i.get('query_id') for i in warn_inner if not is_zero(i.get('query_id'))} +warn = [r for r in rows + if r.get("err_sqlstate") == "01001" and r.get("db_operation") in (None, "", "UNKNOWN")] +warn_linked_to_outer = sum(1 for r in warn + if r.get('parent_query_id') in warn_outer_qids) +warn_qid_is_inner = sum(1 for r in warn if r.get('query_id') in warn_inner_qids) +warn_qid_is_outer = sum(1 for r in warn if r.get('query_id') in warn_outer_qids) +warn_self_parent = sum(1 for r in warn + if not is_zero(r.get('query_id')) + and r.get('query_id') == r.get('parent_query_id')) +print(f"warn_rows={len(warn)}") +print(f"warn_linked_to_outer={warn_linked_to_outer}") +print(f"warn_qid_is_inner={warn_qid_is_inner}") +print(f"warn_qid_is_outer={warn_qid_is_outer}") +print(f"warn_self_parent={warn_self_parent}") +PYEOF + +my $script_path = "$dump_dir/_validate.py"; +open(my $fh, '>', $script_path) or die "Cannot write $script_path: $!"; +print $fh $py; +close $fh; + +my $raw = `uv run --quiet '$script_path' '$dump_dir' 2>&1`; +my %r; +for my $line (split /\n/, $raw) { + $r{$1} = $2 if $line =~ /^(\w+)=(.*)$/; +} +diag("pyarrow stdout:\n$raw") if $ENV{TEST_VERBOSE}; + +# Regression check: the schema itself must include parent_query_id. 
+is($r{schema_has_parent_query_id}, '1', + 'arrow_batch.cc schema includes parent_query_id column'); + +subtest 'top-level parent_query_id is 0' => sub { + cmp_ok($r{top_rows}, '>=', 1, 'top-level marker rows landed'); + is($r{top_nonzero_parent}, '0', 'top-level rows report parent_query_id = 0'); +}; + +subtest 'nested SPI parent_query_id links to outer' => sub { + cmp_ok($r{outer_rows}, '>=', 1, 'outer rows landed'); + cmp_ok($r{inner_rows}, '>=', 1, 'inner rows landed'); + cmp_ok($r{inner_linked_to_outer}, '>=', 1, + 'inner row joins outer via parent_query_id'); + is($r{inner_zero_parent}, '0', + 'nested SPI is not reported as top-level'); + is($r{outer_nonzero_parent}, '0', + 'outer call still reports parent_query_id = 0'); +}; + +subtest 'log event inside nested SPI links queryid -> outer' => sub { + cmp_ok($r{warn_rows}, '>=', 1, 'RAISE WARNING log event landed'); + cmp_ok($r{warn_linked_to_outer}, '>=', 1, + "log event parent_query_id = outer caller's query_id"); + cmp_ok($r{warn_qid_is_inner}, '>=', 1, + 'log event query_id = inner SPI statement (the running query)'); + is($r{warn_qid_is_outer}, '0', + 'log event query_id is NOT the outer caller (off-by-one regression)'); + is($r{warn_self_parent}, '0', + 'log event query_id != parent_query_id (no self-parent)'); +}; + +$node->stop(); +done_testing();