Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/ci-tap.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,16 @@ jobs:
docker compose -f docker/docker-compose.test.yml logs
exit 1

- name: Install goose
run: |
go install github.com/pressly/goose/v3/cmd/goose@v3.27.1
echo "$(go env GOPATH)/bin" >> "$GITHUB_PATH"
Comment thread
cursor[bot] marked this conversation as resolved.

Comment on lines +104 to +108
- name: Initialize ClickHouse schema
run: |
docker exec psch-clickhouse clickhouse-client --multiquery < docker/init/00-schema.sql
docker exec psch-clickhouse clickhouse-client -q "CREATE DATABASE IF NOT EXISTS pg_stat_ch"
goose -dir schema/migrations \
clickhouse "tcp://localhost:19000?database=pg_stat_ch" up
Comment thread
cursor[bot] marked this conversation as resolved.
Comment on lines +104 to +113

- name: Run TAP tests
run: |
Expand Down
5 changes: 5 additions & 0 deletions docker/init/.gitkeep
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Reserved for legacy docker/quickstart bind mounts. The canonical CH schema
# now lives in schema/migrations/ and is applied via goose (see CI workflow).
# This directory is intentionally empty; the .gitkeep keeps the bind mount
# in docker/docker-compose.test.yml and docker/quickstart/docker-compose.yml
# from failing on a missing host path.
Comment on lines +1 to +5
Comment on lines +1 to +5
9 changes: 9 additions & 0 deletions schema/migrations/00000000000001_bootstrap.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Bootstrap migration: seeds the Goose version table so that subsequent
-- migrations have a baseline. The SELECT 1 statements are intentional
-- no-ops — do not delete this file.

-- +goose Up
SELECT 1;

-- +goose Down
SELECT 1;

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions src/export/clickhouse_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,12 @@ class ClickHouseExporter : public StatsExporter {
return MakeCol<StringCol<string_view>>(name);
}

shared_ptr<Column<string>> DbNameColumn() final { return TagString("db"); }
shared_ptr<Column<string>> DbUserColumn() final { return TagString("username"); }
// Semantic columns
shared_ptr<Column<string>> DbNameColumn() final { return TagString("db_name"); }
shared_ptr<Column<string>> DbUserColumn() final { return TagString("db_user"); }
shared_ptr<Column<uint64_t>> DbDurationColumn() final { return MetricUInt64("duration_us"); }
shared_ptr<Column<string>> DbOperationColumn() final { return TagString("cmd_type"); }
shared_ptr<Column<string_view>> DbQueryTextColumn() final { return RecordString("query"); }
shared_ptr<Column<string>> DbOperationColumn() final { return TagString("db_operation"); }
shared_ptr<Column<string_view>> DbQueryTextColumn() final { return RecordString("query_text"); }

void BeginBatch() final {
for (auto& col : columns_)
Expand Down
10 changes: 5 additions & 5 deletions src/export/exporter_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@ class StatsExporter {
// instrument). Pure virtuals enforce explicit handling in every exporter.
// ===========================================================================

// Database name. CH: TagString "db"; OTel semconv: "db.name" tag.
// Database name. CH: TagString "db_name"; OTel semconv: "db.name" tag.
virtual shared_ptr<Column<string>> DbNameColumn() = 0;
// Authenticated user. CH: TagString "username"; OTel semconv: "db.user" tag.
// Authenticated user. CH: TagString "db_user"; OTel semconv: "db.user" tag.
virtual shared_ptr<Column<string>> DbUserColumn() = 0;
// Query duration. Caller appends microseconds. CH: MetricUInt64 "duration_us";
// OTel: converts to seconds, records as Histogram<double> "db.client.operation.duration".
virtual shared_ptr<Column<uint64_t>> DbDurationColumn() = 0;
// SQL command type. CH: RecordString "cmd_type"; OTel: TagString "db.operation.name"
// SQL command type. CH: TagString "db_operation"; OTel: TagString "db.operation.name"
// (used as a dimension on the duration histogram).
virtual shared_ptr<Column<string>> DbOperationColumn() = 0;
// Query text. CH: RecordString "query"; OTel semconv: "db.query.text".
// Query text. CH: RecordString "query_text"; OTel semconv: "db.query.text".
virtual shared_ptr<Column<string_view>> DbQueryTextColumn() = 0;

virtual void BeginBatch() = 0;
Expand Down Expand Up @@ -96,7 +96,7 @@ void RecordExporterFailure(const char* message);
// Expected usage:
// void ProcessBatch(StatsExporter *exporter) {
// exporter->BeginBatch(); // no-op, or ClickHouse column reset
// auto col_user = exporter->TagString("username");
// auto col_user = exporter->TagString("db_user");
// auto col_rows = exporter->MetricUInt64("rows");
//
// for (const auto &ev : events) {
Expand Down
25 changes: 13 additions & 12 deletions src/export/stats_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,15 @@ void ExportEventStatsInternal(const std::vector<PschEvent>& events, StatsExporte

exporter->BeginBatch();

auto col_ts_start = exporter->RecordDateTime("ts_start");
auto col_ts = exporter->RecordDateTime("ts");
auto col_duration_us = exporter->DbDurationColumn();
auto col_db = exporter->DbNameColumn();
auto col_username = exporter->DbUserColumn();
auto col_db_name = exporter->DbNameColumn();
auto col_db_user = exporter->DbUserColumn();
auto col_pid = exporter->RecordInt32("pid");
auto col_query_id = exporter->RecordInt64("query_id");
auto col_cmd_type = exporter->DbOperationColumn();
auto col_db_operation = exporter->DbOperationColumn();
auto col_rows = exporter->MetricUInt64("rows");
auto col_query = exporter->DbQueryTextColumn();
auto col_query_text = exporter->DbQueryTextColumn();

auto col_shared_blks_hit = exporter->MetricInt64("shared_blks_hit");
auto col_shared_blks_read = exporter->MetricInt64("shared_blks_read");
Expand Down Expand Up @@ -278,7 +278,7 @@ void ExportEventStatsInternal(const std::vector<PschEvent>& events, StatsExporte
auto col_parallel_workers_planned = exporter->RecordInt16("parallel_workers_planned");
auto col_parallel_workers_launched = exporter->RecordInt16("parallel_workers_launched");

auto col_err_sqlstate = exporter->MetricFixedString(5, "err_sqlstate");
auto col_err_sqlstate = exporter->TagString("err_sqlstate");
auto col_err_elevel = exporter->RecordUInt8("err_elevel");
auto col_err_message = exporter->RecordString("err_message");

Expand All @@ -288,17 +288,17 @@ void ExportEventStatsInternal(const std::vector<PschEvent>& events, StatsExporte
for (const auto& ev : events) {
exporter->BeginRow();

col_ts_start->Append(ev.ts_start + kPostgresEpochOffsetUs);
col_ts->Append(ev.ts_start + kPostgresEpochOffsetUs);
col_duration_us->Append(ev.duration_us);
col_db->Append(std::string(ev.datname, ev.datname_len));
col_username->Append(std::string(ev.username, ev.username_len));
col_db_name->Append(std::string(ev.datname, ev.datname_len));
col_db_user->Append(std::string(ev.username, ev.username_len));
col_pid->Append(ev.pid);
col_query_id->Append(static_cast<int64_t>(ev.queryid));
col_cmd_type->Append(PschCmdTypeToString(ev.cmd_type));
col_db_operation->Append(PschCmdTypeToString(ev.cmd_type));
col_rows->Append(ev.rows);

auto qlen = ClampFieldLen(ev.query_len, static_cast<uint16>(PSCH_MAX_QUERY_LEN), "query_len");
col_query->Append(std::string(ev.query, qlen));
col_query_text->Append(std::string(ev.query, qlen));

col_shared_blks_hit->Append(ev.shared_blks_hit);
col_shared_blks_read->Append(ev.shared_blks_read);
Expand Down Expand Up @@ -335,7 +335,8 @@ void ExportEventStatsInternal(const std::vector<PschEvent>& events, StatsExporte
col_parallel_workers_planned->Append(ev.parallel_workers_planned);
col_parallel_workers_launched->Append(ev.parallel_workers_launched);

col_err_sqlstate->Append(std::string_view(ev.err_sqlstate, 5));
col_err_sqlstate->Append(
std::string(ev.err_sqlstate, strnlen(ev.err_sqlstate, sizeof(ev.err_sqlstate) - 1)));
col_err_elevel->Append(ev.err_elevel);
auto elen = ClampFieldLen(ev.err_message_len, static_cast<uint16>(PSCH_MAX_ERR_MSG_LEN),
"err_message_len");
Expand Down
12 changes: 6 additions & 6 deletions t/010_clickhouse_export.pl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

# Verify query_text field is populated
my $query_check = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw WHERE query != ''",
"SELECT count() FROM pg_stat_ch.events_raw WHERE query_text != ''",
sub { $_[0] >= 1 },
10
);
Expand Down Expand Up @@ -134,18 +134,18 @@
cmp_ok($duration_check, '>=', 1, 'duration_us is populated');

my $db_check = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw WHERE db = 'postgres'",
"SELECT count() FROM pg_stat_ch.events_raw WHERE db_name = 'postgres'",
sub { $_[0] >= 1 },
10
);
cmp_ok($db_check, '>=', 1, 'db field is populated');
cmp_ok($db_check, '>=', 1, 'db_name field is populated');

my $cmd_type_check = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw WHERE cmd_type != ''",
my $db_operation_check = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw WHERE db_operation != ''",
sub { $_[0] >= 1 },
10
);
cmp_ok($cmd_type_check, '>=', 1, 'cmd_type is populated');
cmp_ok($db_operation_check, '>=', 1, 'db_operation is populated');

# Clean up
$node->safe_psql('postgres', 'DROP TABLE IF EXISTS test_fields');
Expand Down
4 changes: 2 additions & 2 deletions t/012_timing_accuracy.pl
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
# Get timing statistics
my $timing_stats = psch_query_clickhouse(
"SELECT count(), avg(duration_us), min(duration_us), max(duration_us) " .
"FROM pg_stat_ch.events_raw WHERE query LIKE '%pgbench%' OR query LIKE '%UPDATE%'"
"FROM pg_stat_ch.events_raw WHERE query_text LIKE '%pgbench%' OR query_text LIKE '%UPDATE%'"
);
diag("Timing stats: $timing_stats");

Expand Down Expand Up @@ -131,7 +131,7 @@
# Check that we captured a duration close to 100ms
my $duration = psch_query_clickhouse(
"SELECT duration_us FROM pg_stat_ch.events_raw " .
"WHERE query LIKE '%pg_sleep%' ORDER BY ts_start DESC LIMIT 1"
"WHERE query_text LIKE '%pg_sleep%' ORDER BY ts DESC LIMIT 1"
);

if ($duration) {
Expand Down
2 changes: 1 addition & 1 deletion t/013_clickhouse_tls.pl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ sub ch_tls_ready {
"events visible in TLS ClickHouse (got $ch_count)");

my $query_check = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw WHERE query != ''", sub { $_[0] >= 1 }, 10);
"SELECT count() FROM pg_stat_ch.events_raw WHERE query_text != ''", sub { $_[0] >= 1 }, 10);
cmp_ok($query_check, '>=', 1, 'query text captured over TLS');
};

Expand Down
16 changes: 8 additions & 8 deletions t/021_cmd_type_counts.pl
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
plan skip_all => 'ClickHouse container not running';
}

my $node = psch_init_node_with_clickhouse('cmd_type_counts',
my $node = psch_init_node_with_clickhouse('db_operation_counts',
flush_interval_ms => 100,
batch_max => 100
);

# Helper: parse cmd_type counts from ClickHouse
sub get_cmd_type_counts {
# Helper: parse db_operation counts from ClickHouse
sub get_db_operation_counts {
my $result = psch_query_clickhouse(
"SELECT cmd_type, count() FROM pg_stat_ch.events_raw GROUP BY cmd_type FORMAT TabSeparated"
"SELECT db_operation, count() FROM pg_stat_ch.events_raw GROUP BY db_operation FORMAT TabSeparated"
);
Comment on lines +31 to 35
my %counts;
for my $line (split /\n/, $result) {
Expand Down Expand Up @@ -72,7 +72,7 @@ sub get_cmd_type_counts {
$node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');
sleep(2); # Wait for export without polling (polling adds SELECTs)

my %counts = get_cmd_type_counts();
my %counts = get_db_operation_counts();

# Verify exact counts (flush adds 1 SELECT, so expect 4)
is($counts{SELECT} // 0, 4, 'SELECT count = 4 (3 + flush)');
Expand Down Expand Up @@ -109,7 +109,7 @@ sub get_cmd_type_counts {
$node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');
sleep(2); # Wait for export without polling

my %counts = get_cmd_type_counts();
my %counts = get_db_operation_counts();

is($counts{UTILITY} // 0, 8, 'UTILITY count = 8 DDL statements');
is($counts{SELECT} // 0, 1, 'SELECT count = 1 (flush only)');
Expand Down Expand Up @@ -156,7 +156,7 @@ sub get_cmd_type_counts {
$node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');
sleep(2); # Wait for export without polling

my %counts = get_cmd_type_counts();
my %counts = get_db_operation_counts();

# Verify exact counts
# SELECT: 2 queries + 1 flush = 3
Expand Down Expand Up @@ -206,7 +206,7 @@ sub get_cmd_type_counts {
$node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()');
sleep(2); # Wait for export without polling

my %counts = get_cmd_type_counts();
my %counts = get_db_operation_counts();

# All DDL should be UTILITY - exactly 11
is($counts{UTILITY} // 0, 11,
Expand Down
52 changes: 26 additions & 26 deletions t/027_query_normalization.pl
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ sub get_captured_query {
psch_wait_for_export($node, 1, 10);

return psch_wait_for_clickhouse_query(
"SELECT query FROM pg_stat_ch.events_raw " .
"SELECT query_text FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query NOT LIKE '%pg_stat_ch%' " .
"AND query NOT LIKE '%pg_extension%' " .
"AND query != '' " .
"ORDER BY ts_start DESC LIMIT 1",
"AND query_text NOT LIKE '%pg_stat_ch%' " .
"AND query_text NOT LIKE '%pg_extension%' " .
"AND query_text != '' " .
"ORDER BY ts DESC LIMIT 1",
sub { $_[0] ne '' },
10
);
Expand Down Expand Up @@ -193,11 +193,11 @@ sub get_captured_query {
psch_wait_for_export($node, 2, 10);

my $all_queries = psch_wait_for_clickhouse_query(
"SELECT groupArray(query) FROM pg_stat_ch.events_raw " .
"SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%test_norm%' " .
"AND query NOT LIKE '%pg_stat_ch%' " .
"AND query != ''",
"AND query_text LIKE '%test_norm%' " .
"AND query_text NOT LIKE '%pg_stat_ch%' " .
"AND query_text != ''",
sub { $_[0] ne '' },
10
);
Expand Down Expand Up @@ -292,19 +292,19 @@ sub get_captured_query {
$session->quit();

my $error_query = psch_query_clickhouse(
"SELECT query FROM pg_stat_ch.events_raw " .
"SELECT query_text FROM pg_stat_ch.events_raw " .
"WHERE err_message != '' " .
"ORDER BY ts_start DESC LIMIT 1"
"ORDER BY ts DESC LIMIT 1"
);
is($error_query, '', 'Error events do not export query text');

my $q = psch_wait_for_clickhouse_query(
"SELECT query FROM pg_stat_ch.events_raw " .
"SELECT query_text FROM pg_stat_ch.events_raw " .
"WHERE err_message = '' " .
"AND query NOT LIKE '%pg_stat_ch%' " .
"AND query NOT LIKE '%pg_extension%' " .
"AND query != '' " .
"ORDER BY ts_start DESC LIMIT 1",
"AND query_text NOT LIKE '%pg_stat_ch%' " .
"AND query_text NOT LIKE '%pg_extension%' " .
"AND query_text != '' " .
"ORDER BY ts DESC LIMIT 1",
sub { $_[0] ne '' },
10
);
Expand Down Expand Up @@ -388,19 +388,19 @@ sub get_captured_query {
my $nested_count = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%WHERE%' " .
"AND query LIKE '%nested_normalize_same_sql%'",
"AND query_text LIKE '%WHERE%' " .
"AND query_text LIKE '%nested_normalize_same_sql%'",
sub { $_[0] >= 3 },
10
);
cmp_ok($nested_count, '>=', 3,
'Captured recursive nested executions of the same SPI statement');

my $queries = psch_wait_for_clickhouse_query(
"SELECT groupArray(query) FROM pg_stat_ch.events_raw " .
"SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%WHERE%' " .
"AND query LIKE '%nested_normalize_same_sql%'",
"AND query_text LIKE '%WHERE%' " .
"AND query_text LIKE '%nested_normalize_same_sql%'",
sub { $_[0] ne '' },
10
);
Expand Down Expand Up @@ -433,19 +433,19 @@ sub get_captured_query {
my $repeat_count = psch_wait_for_clickhouse_query(
"SELECT count() FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%WHERE%' " .
"AND query LIKE '%nested_normalize_same_sql%'",
"AND query_text LIKE '%WHERE%' " .
"AND query_text LIKE '%nested_normalize_same_sql%'",
sub { $_[0] >= 4 },
10
);
cmp_ok($repeat_count, '>=', 4,
'Captured repeated executions of the same SPI statement in one backend');

my $repeat_queries = psch_wait_for_clickhouse_query(
"SELECT groupArray(query) FROM pg_stat_ch.events_raw " .
"SELECT groupArray(query_text) FROM pg_stat_ch.events_raw " .
"WHERE pid = $pid " .
"AND query LIKE '%WHERE%' " .
"AND query LIKE '%nested_normalize_same_sql%'",
"AND query_text LIKE '%WHERE%' " .
"AND query_text LIKE '%nested_normalize_same_sql%'",
sub { $_[0] ne '' },
10
);
Expand Down
Loading
Loading