diff --git a/include/config/guc.h b/include/config/guc.h index eb0d4a9..a728ce9 100644 --- a/include/config/guc.h +++ b/include/config/guc.h @@ -33,6 +33,7 @@ extern int psch_min_duration_us; extern int psch_normalize_cache_max; extern double psch_sample_rate; extern bool psch_otel_arrow_passthrough; +extern bool psch_use_unified_arrow_exporter; extern int psch_otel_max_block_bytes; extern char* psch_extra_attributes; extern char* psch_debug_arrow_dump_dir; diff --git a/src/config/guc.c b/src/config/guc.c index 97b5473..46f9641 100644 --- a/src/config/guc.c +++ b/src/config/guc.c @@ -35,6 +35,7 @@ int psch_min_duration_us = 0; int psch_normalize_cache_max = 32768; double psch_sample_rate = 1.0; bool psch_otel_arrow_passthrough = false; +bool psch_use_unified_arrow_exporter = false; int psch_otel_max_block_bytes = 3 * 1024 * 1024; // 3 MiB (max: 16 MiB) char* psch_extra_attributes = NULL; char* psch_debug_arrow_dump_dir = NULL; @@ -356,6 +357,23 @@ void PschInitGuc(void) { 0, NULL, NULL, NULL); + DefineCustomBoolVariable( + "pg_stat_ch.use_unified_arrow_exporter", + "Use the StatsExporter-implementing Arrow exporter instead of arrow_batch.cc.", + "When enabled together with use_otel and otel_arrow_passthrough, the bgworker " + "builds Arrow IPC via the typed StatsExporter column interface, writing the " + "events_raw schema (typed integer ids, no sprintf decimal-string encoding). " + "When off, the legacy arrow_batch.cc path runs and produces the query_logs_arrow " + "wire shape. Default off; flip on to opt a producer into the new exporter. " + "PGC_POSTMASTER: the bgworker picks the exporter implementation at init time, " + "so a runtime change would mismatch the per-cycle dispatcher (Arrow batches " + "would silently drop, since OTelArrowExporter does not implement SendArrowBatch).", + &psch_use_unified_arrow_exporter, + false, + PGC_POSTMASTER, + 0, + NULL, NULL, NULL); + DefineCustomIntVariable( "pg_stat_ch.otel_max_block_bytes", "Maximum Arrow batch size in bytes per OTLP request.", diff --git a/src/export/otel_arrow_exporter.cc b/src/export/otel_arrow_exporter.cc new file mode 100644 index 0000000..38400b1 --- /dev/null +++ b/src/export/otel_arrow_exporter.cc @@ -0,0 +1,595 @@ +// pg_stat_ch OTel/Arrow unified exporter. +// +// Adapts the StatsExporter column-factory API into a typed Arrow column set, +// serializes the assembled RecordBatch as ZSTD-compressed Arrow IPC, and +// ships it through a held OTelExporter via SendArrowBatch. Wire shape +// targets events_raw (the unified schema authored in PR #99), not the legacy +// query_logs_arrow table. The legacy arrow_batch.cc path stays alive for +// the latter; this exporter is selected via +// pg_stat_ch.use_unified_arrow_exporter (default off). +// +// Composition over inheritance: we hold (not extend) OTelExporter so we +// reuse its gRPC connection lifecycle and Arrow-over-OTel wire format +// without inheriting its per-row LogRecord state machine, which is unused +// on the Arrow path. + +extern "C" { +#include "postgres.h" + +#include "utils/timestamp.h" +} + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "pg_stat_ch/pg_stat_ch.h" +#include "config/guc.h" +#include "export/exporter_interface.h" +#include "export/otel_arrow_exporter.h" +#include "export/otel_exporter.h" + +namespace { + +using DictBuilder = arrow::StringDictionary32Builder; + +// PostgreSQL epoch is 2000-01-01, Unix epoch is 1970-01-01. Difference is +// 946684800 seconds = 946684800000000 microseconds. +constexpr int64_t kPostgresEpochOffsetUs = 946684800000000LL; + +// Optional debug capture: when psch_debug_arrow_dump_dir is set, write each +// IPC batch to disk before shipping. Used by t/026_arrow_dump and the new +// end-to-end test to assert on Arrow output without standing up an OTel +// collector. +// +// Intentionally a near-duplicate of stats_exporter.cc:MaybeDumpArrowBatch. +// Both copies die when arrow_batch.cc is retired after the +// use_unified_arrow_exporter cutover; extracting a shared helper now would +// create a header just to delete it later. +void MaybeDumpArrowBatch(const uint8_t* data, size_t len) { + if (data == nullptr || len == 0 || psch_debug_arrow_dump_dir == nullptr || + *psch_debug_arrow_dump_dir == '\0') { + return; + } + const auto unix_now_ns = + static_cast((GetCurrentTimestamp() + kPostgresEpochOffsetUs) * 1000LL); + char path[MAXPGPATH]; + char tmp_path[MAXPGPATH]; + snprintf(path, sizeof(path), "%s/arrow_%llu.ipc", psch_debug_arrow_dump_dir, unix_now_ns); + snprintf(tmp_path, sizeof(tmp_path), "%s/arrow_%llu.ipc.tmp", psch_debug_arrow_dump_dir, + unix_now_ns); + + FILE* file = fopen(tmp_path, "wb"); + if (file == nullptr) { + elog(WARNING, "pg_stat_ch: failed to open Arrow dump file '%s'", tmp_path); + return; + } + const size_t written = fwrite(data, 1, len, file); + fclose(file); + if (written != len) { + remove(tmp_path); + elog(WARNING, "pg_stat_ch: short Arrow dump write to '%s' (%zu/%zu bytes)", tmp_path, written, + len); + return; + } + if (rename(tmp_path, path) != 0) { + remove(tmp_path); + elog(WARNING, "pg_stat_ch: failed to finalize Arrow dump file '%s'", path); + } +} + +// events_raw declares ts as DateTime64(6, 'UTC') (microsecond precision). +// Match it on the wire: any other precision risks silent truncation or a +// CH type-mismatch on insert. +std::shared_ptr TimestampType() { + static const auto kType = arrow::timestamp(arrow::TimeUnit::MICRO, "UTC"); + return kType; +} + +std::shared_ptr DictUtf8Type() { + static const auto kType = arrow::dictionary(arrow::int32(), arrow::utf8()); + return kType; +} + +void LogArrowFailure(const char* context, const arrow::Status& status) { + elog(WARNING, "pg_stat_ch: %s: %s", context, status.ToString().c_str()); +} + +// One column slot: a typed Arrow builder kept alive by the exporter (the +// Column wrapper handed to the caller holds only a non-owning pointer +// into this slot, so it can safely drop out of scope before CommitBatch). +struct ArrowSlot { + std::string name; + std::shared_ptr field; + std::shared_ptr builder; +}; + +// Parse "key1:val1;key2:val2" into a flat list. First match wins on +// duplicate keys (Get linear-scans from the front). Empty input -> empty list. +class ExtraAttrs { + public: + explicit ExtraAttrs(const char* raw) { + if (raw == nullptr) { + return; + } + std::string_view input(raw); + while (!input.empty()) { + const size_t delim = input.find(';'); + const std::string_view token = + (delim == std::string_view::npos) ? input : input.substr(0, delim); + const size_t sep = token.find(':'); + if (sep != std::string_view::npos) { + attrs_.emplace_back(std::string(token.substr(0, sep)), std::string(token.substr(sep + 1))); + } + if (delim == std::string_view::npos) { + break; + } + input.remove_prefix(delim + 1); + } + } + + std::string Get(std::string_view key) const { + for (const auto& [k, v] : attrs_) { + if (k == key) { + return v; + } + } + return {}; + } + + private: + std::vector> attrs_; +}; + +// --------------------------------------------------------------------------- + +class OTelArrowExporter : public StatsExporter { + public: + OTelArrowExporter() : inner_(MakeOpenTelemetryExporter()) {} + + // -- Low-cardinality columns -- + shared_ptr> StatLCString(string_view n) final { return MakeDictStr(n); } + shared_ptr> StatLCUInt8(string_view n) final { + return MakeNum(n, arrow::uint8()); + } + shared_ptr> StatLCInt16(string_view n) final { + return MakeNum(n, arrow::int16()); + } + shared_ptr> StatLCInt32(string_view n) final { + return MakeNum(n, arrow::int32()); + } + + // -- High-cardinality columns -- + shared_ptr> StatHCString(string_view n) final { return MakeUtf8Sv(n); } + shared_ptr> StatHCInt64(string_view n) final { + return MakeNum(n, arrow::int64()); + } + shared_ptr> StatHCUInt64(string_view n) final { + return MakeNum(n, arrow::uint64()); + } + + shared_ptr> StatTimestamp(string_view n) final { return MakeTimestamp(n); } + + // -- Semantic columns -- + shared_ptr> DbNameColumn() final { return MakeDictStr("db_name"); } + shared_ptr> DbUserColumn() final { return MakeDictStr("db_user"); } + shared_ptr> DbDurationColumn() final { + return MakeNum("duration_us", arrow::uint64()); + } + shared_ptr> DbOperationColumn() final { return MakeDictStr("db_operation"); } + shared_ptr> DbQueryTextColumn() final { return MakeUtf8Sv("query_text"); } + + // -- Lifecycle -- + void BeginBatch() final; + void BeginRow() final; + bool CommitBatch() final; + + // Transport pass-through to the inner OTelExporter. + bool EstablishNewConnection() final { return inner_->EstablishNewConnection(); } + bool IsConnected() const final { return inner_->IsConnected(); } + int NumConsecutiveFailures() const final { return inner_->NumConsecutiveFailures(); } + void ResetFailures() final { inner_->ResetFailures(); } + // Report rows from THIS batch (sum across mid-batch flushes + the final + // flush), not the inner exporter's cumulative count. + // BeginBatch resets exported_in_batch_; each successful Flush adds the + // chunk's row count. The inner's exported_count_ accumulates across + // batches because we never call inner_->BeginBatch() (the inner's per-record + // LogRecord state machine is unused on this path). PschExportBatch reads + // NumExported() once per successful CommitBatch and fetch_adds it into + // shared stats, so a cumulative value here would cause quadratic + // over-reporting. + int NumExported() const final { return exported_in_batch_; } + + private: + // -- Column wrappers --------------------------------------------------- + // Nested so they can inherit from StatsExporter::Column (protected in + // the base; visible to derived classes and their nested types). Each one + // Append-forwards to a single typed builder and bumps the exporter's + // bytes_estimate_ for mid-batch flush bookkeeping. Crunch() is unused on + // this path — builders are finished together in Flush() (invoked + // mid-batch when bytes_estimate_ crosses max_block_bytes_, and once at + // CommitBatch for the residual chunk). + + // Variable-length columns over-estimate offset overhead at 4 bytes/value + // (32-bit offsets), ignoring the dictionary's shared backing store. For LC + // columns this overshoots when values repeat — fine; bytes_estimate_ is a + // budget threshold, not a wire-size predictor, and over-counting just + // means flushing slightly earlier than strictly necessary. + static constexpr size_t kVarLenOffsetBytes = 4; + + template + class ArrowNumColumn : public Column { + public: + ArrowNumColumn(BuilderT* builder, size_t* bytes) : builder_(builder), bytes_(bytes) {} + void Append(const T& v) final { + const arrow::Status s = builder_->Append(static_cast(v)); + if (!s.ok()) { + LogArrowFailure("Arrow numeric append", s); + return; + } + *bytes_ += sizeof(typename BuilderT::value_type); + } + void Crunch() final {} + + private: + BuilderT* const builder_; + size_t* const bytes_; + }; + + class ArrowDictStrColumn : public Column { + public: + ArrowDictStrColumn(DictBuilder* builder, size_t* bytes) : builder_(builder), bytes_(bytes) {} + void Append(const std::string& v) final { + const arrow::Status s = builder_->Append(v.data(), static_cast(v.size())); + if (!s.ok()) { + LogArrowFailure("Arrow dict str append", s); + return; + } + *bytes_ += v.size() + kVarLenOffsetBytes; + } + void Crunch() final {} + + private: + DictBuilder* const builder_; + size_t* const bytes_; + }; + + class ArrowDictSvColumn : public Column { + public: + ArrowDictSvColumn(DictBuilder* builder, size_t* bytes) : builder_(builder), bytes_(bytes) {} + void Append(const std::string_view& v) final { + const arrow::Status s = builder_->Append(v.data(), static_cast(v.size())); + if (!s.ok()) { + LogArrowFailure("Arrow dict sv append", s); + return; + } + *bytes_ += v.size() + kVarLenOffsetBytes; + } + void Crunch() final {} + + private: + DictBuilder* const builder_; + size_t* const bytes_; + }; + + class ArrowUtf8SvColumn : public Column { + public: + ArrowUtf8SvColumn(arrow::StringBuilder* builder, size_t* bytes) + : builder_(builder), bytes_(bytes) {} + void Append(const std::string_view& v) final { + const arrow::Status s = builder_->Append(v.data(), static_cast(v.size())); + if (!s.ok()) { + LogArrowFailure("Arrow utf8 sv append", s); + return; + } + *bytes_ += v.size() + kVarLenOffsetBytes; + } + void Crunch() final {} + + private: + arrow::StringBuilder* const builder_; + size_t* const bytes_; + }; + + class ArrowTimestampColumn : public Column { + public: + ArrowTimestampColumn(arrow::TimestampBuilder* builder, size_t* bytes) + : builder_(builder), bytes_(bytes) {} + // Caller passes Unix microseconds; the column is MICRO-precision, so + // append directly with no conversion. + void Append(const int64_t& v) final { + const arrow::Status s = builder_->Append(v); + if (!s.ok()) { + LogArrowFailure("Arrow timestamp append", s); + return; + } + *bytes_ += sizeof(int64_t); + } + void Crunch() final {} + + private: + arrow::TimestampBuilder* const builder_; + size_t* const bytes_; + }; + + // Schema-side helpers: create a builder, register a slot, return a wrapper. + template + shared_ptr> MakeNum(string_view name, std::shared_ptr dtype) { + auto builder = std::make_shared(); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), std::move(dtype)), std::move(builder)}); + return std::make_shared>(raw, &bytes_estimate_); + } + shared_ptr> MakeDictStr(string_view name) { + auto builder = std::make_shared(); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), DictUtf8Type()), std::move(builder)}); + return std::make_shared(raw, &bytes_estimate_); + } + shared_ptr> MakeDictSv(string_view name) { + auto builder = std::make_shared(); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), DictUtf8Type()), std::move(builder)}); + return std::make_shared(raw, &bytes_estimate_); + } + shared_ptr> MakeUtf8Sv(string_view name) { + auto builder = std::make_shared(); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), arrow::utf8()), std::move(builder)}); + return std::make_shared(raw, &bytes_estimate_); + } + shared_ptr> MakeTimestamp(string_view name) { + auto builder = + std::make_shared(TimestampType(), arrow::default_memory_pool()); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), TimestampType()), std::move(builder)}); + return std::make_shared(raw, &bytes_estimate_); + } + + // Builds the IPC stream from the current builder state, ships it via the + // inner exporter, and resets per-flush counters. After successful + // ArrayBuilder::Finish each builder is reset to empty (Arrow contract), so + // mid-batch chunks reuse the same slot vector and column wrappers transparently. + bool Flush(); + + // ---------------------------------------------------------------- + // Columns the events_raw schema declares that the StatsExporter caller + // does not explicitly populate. The exporter synthesizes them on + // BeginRow so stats_exporter.cc's column-emission loop doesn't have to + // know about them: + // + // - 8 envelope columns + read_replica_type: per-process constants from + // pg_stat_ch.extra_attributes (or "none" default for read_replica_type + // per clickgres-platform's convention). + // - service_version: PG_STAT_CH_VERSION macro, not from extra_attributes. + // + // parent_query_id is *not* synthesized: PR #95 will add the column to + // events_raw and wire PschEvent::parent_query_id through the interface + // (StatHCInt64). Sending it before then would either fail the insert + // (column doesn't exist in events_raw on main yet) or rely on CH's + // unknown-column drop behavior — both worse than just not sending it. + // ---------------------------------------------------------------- + void RegisterEnvelopeColumns(); + + std::unique_ptr inner_; + std::vector slots_; + // Rows accumulated in the current chunk (resets after each Flush). The + // total rows exported across all chunks in the active batch live in + // exported_in_batch_ below. + int row_count_ = 0; + int exported_in_batch_ = 0; + // Conservative running estimate of how many bytes the current chunk would + // produce. Bumped by each column wrapper's Append; sampled at row boundary + // to decide whether to flush mid-batch. + size_t bytes_estimate_ = 0; + size_t max_block_bytes_ = 0; + // Sticky: any Flush failure inside the batch poisons CommitBatch so the + // dispatcher doesn't see partial rows charged twice or a half-shipped batch + // counted as success. + bool batch_failed_ = false; + + // Synthesized columns (populated implicitly in BeginRow). + shared_ptr> inst_ubid_; + shared_ptr> srv_ubid_; + shared_ptr> srv_role_; + shared_ptr> region_; + shared_ptr> cell_; + shared_ptr> svc_ver_; + shared_ptr> host_id_; + shared_ptr> pod_name_; + shared_ptr> read_replica_type_; + + // Cached for per-row appends. + std::string instance_ubid_val_; + std::string server_ubid_val_; + std::string server_role_val_; + std::string region_val_; + std::string cell_val_; + std::string service_version_val_; + std::string host_id_val_; + std::string pod_name_val_; + std::string read_replica_type_val_; +}; + +void OTelArrowExporter::RegisterEnvelopeColumns() { + // OTel resource attributes from psch_extra_attributes. + inst_ubid_ = MakeUtf8Sv("instance_ubid"); + srv_ubid_ = MakeUtf8Sv("server_ubid"); + srv_role_ = MakeDictSv("server_role"); + read_replica_type_ = MakeDictSv("read_replica_type"); + region_ = MakeDictSv("region"); + cell_ = MakeDictSv("cell"); + svc_ver_ = MakeDictSv("service_version"); + host_id_ = MakeUtf8Sv("host_id"); + pod_name_ = MakeUtf8Sv("pod_name"); + + const ExtraAttrs attrs(psch_extra_attributes); + instance_ubid_val_ = attrs.Get("instance_ubid"); + server_ubid_val_ = attrs.Get("server_ubid"); + server_role_val_ = attrs.Get("server_role"); + region_val_ = attrs.Get("region"); + cell_val_ = attrs.Get("cell"); + host_id_val_ = attrs.Get("host_id"); + pod_name_val_ = attrs.Get("pod_name"); + + // events_raw declares read_replica_type DEFAULT 'none'; if extra_attributes + // didn't supply one, send 'none' explicitly so the wire shape matches the + // schema regardless of CH's default-fill behavior for Arrow inserts. + read_replica_type_val_ = attrs.Get("read_replica_type"); + if (read_replica_type_val_.empty()) { + read_replica_type_val_ = "none"; + } + + // service_version is a compile-time pg_stat_ch identifier, not a runtime + // resource attribute — keep it pinned to the macro the rest of the + // codebase already uses. + service_version_val_ = PG_STAT_CH_VERSION; +} + +void OTelArrowExporter::BeginBatch() { + slots_.clear(); + row_count_ = 0; + exported_in_batch_ = 0; + bytes_estimate_ = 0; + batch_failed_ = false; + // Match ExportEventsAsArrowInternal's floor so a misconfigured tiny GUC + // value can't degrade into a per-row flush loop. + max_block_bytes_ = std::max(65536, static_cast(psch_otel_max_block_bytes)); + RegisterEnvelopeColumns(); +} + +void OTelArrowExporter::BeginRow() { + if (batch_failed_) { + return; + } + // Mid-batch flush at the row boundary: only safe between rows (column + // builders are row-aligned), and only meaningful once at least one row is + // already accumulated. + if (row_count_ > 0 && bytes_estimate_ >= max_block_bytes_) { + if (!Flush()) { + batch_failed_ = true; + return; + } + } + ++row_count_; + // Synthesized columns fire here so the call site doesn't need to know + // about them. + inst_ubid_->Append(instance_ubid_val_); + srv_ubid_->Append(server_ubid_val_); + srv_role_->Append(server_role_val_); + read_replica_type_->Append(read_replica_type_val_); + region_->Append(region_val_); + cell_->Append(cell_val_); + svc_ver_->Append(service_version_val_); + host_id_->Append(host_id_val_); + pod_name_->Append(pod_name_val_); +} + +bool OTelArrowExporter::CommitBatch() { + if (batch_failed_) { + return false; + } + if (slots_.empty() || row_count_ == 0) { + return true; + } + return Flush(); +} + +bool OTelArrowExporter::Flush() { + std::vector> fields; + fields.reserve(slots_.size()); + std::vector> arrays; + arrays.reserve(slots_.size()); + + for (auto& slot : slots_) { + fields.push_back(slot.field); + std::shared_ptr array; + // arrow::ArrayBuilder::Finish is virtual; DictBuilder's override returns + // a DictionaryArray downcast to Array. The call also resets the builder + // back to an empty state, so the next chunk in this same batch can + // reuse it without re-registering slots or invalidating the column + // shared_ptrs the dispatcher is still holding. + const arrow::Status status = slot.builder->Finish(&array); + if (!status.ok()) { + LogArrowFailure(("Arrow finish " + slot.name).c_str(), status); + return false; + } + arrays.push_back(std::move(array)); + } + + auto schema = arrow::schema(std::move(fields)); + auto record_batch = arrow::RecordBatch::Make(std::move(schema), row_count_, std::move(arrays)); + + auto out_stream_result = arrow::io::BufferOutputStream::Create(); + if (!out_stream_result.ok()) { + LogArrowFailure("Arrow IPC stream create", out_stream_result.status()); + return false; + } + auto out_stream = *out_stream_result; + + arrow::ipc::IpcWriteOptions write_opts = arrow::ipc::IpcWriteOptions::Defaults(); + auto codec_result = arrow::util::Codec::Create(arrow::Compression::ZSTD); + if (!codec_result.ok()) { + LogArrowFailure("Arrow ZSTD codec create", codec_result.status()); + return false; + } + write_opts.codec = std::shared_ptr(std::move(*codec_result)); + + auto writer_result = arrow::ipc::MakeStreamWriter(out_stream, record_batch->schema(), write_opts); + if (!writer_result.ok()) { + LogArrowFailure("Arrow stream writer create", writer_result.status()); + return false; + } + auto writer = *writer_result; + if (auto s = writer->WriteRecordBatch(*record_batch); !s.ok()) { + LogArrowFailure("Arrow WriteRecordBatch", s); + return false; + } + if (auto s = writer->Close(); !s.ok()) { + LogArrowFailure("Arrow writer close", s); + return false; + } + + auto buf_result = out_stream->Finish(); + if (!buf_result.ok()) { + LogArrowFailure("Arrow IPC finalize", buf_result.status()); + return false; + } + auto buf = *buf_result; + const auto buf_len = static_cast(buf->size()); + + MaybeDumpArrowBatch(buf->data(), buf_len); + if (!inner_->SendArrowBatch(buf->data(), buf_len, row_count_)) { + return false; + } + // Builders are already reset by Finish above; clear our row-side state so + // continued appends from the dispatcher start fresh. + exported_in_batch_ += row_count_; + row_count_ = 0; + bytes_estimate_ = 0; + return true; +} + +} // namespace + +std::unique_ptr MakeUnifiedArrowExporter() { + return std::make_unique(); +} diff --git a/src/export/otel_arrow_exporter.h b/src/export/otel_arrow_exporter.h new file mode 100644 index 0000000..9c98a90 --- /dev/null +++ b/src/export/otel_arrow_exporter.h @@ -0,0 +1,21 @@ +#ifndef PG_STAT_CH_SRC_EXPORT_OTEL_ARROW_EXPORTER_H_ +#define PG_STAT_CH_SRC_EXPORT_OTEL_ARROW_EXPORTER_H_ + +#include "export/exporter_interface.h" + +#include + +// Build an Arrow-IPC-emitting StatsExporter that ships through a held +// OTelExporter for transport. Implements the StatLC/StatHC interface end-to- +// end with typed Arrow column wrappers (LC -> StringDictionary32Builder, +// HC -> plain typed builder), so column type choices stay in one place +// instead of being duplicated across the column-emission path and a +// separate IPC builder. +// +// Gated by pg_stat_ch.use_unified_arrow_exporter (default off). The legacy +// arrow_batch.cc path stays alive when the GUC is off — wire shape there +// targets clickgres-platform's query_logs_arrow, which is a different +// table from events_raw and retains the sprintf-decimal id encoding. +std::unique_ptr MakeUnifiedArrowExporter(); + +#endif // PG_STAT_CH_SRC_EXPORT_OTEL_ARROW_EXPORTER_H_ diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index f586238..624af55 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -17,6 +17,7 @@ extern "C" { #include "export/arrow_batch.h" #include "export/clickhouse_exporter.h" #include "export/exporter_interface.h" +#include "export/otel_arrow_exporter.h" #include "export/otel_exporter.h" #include "export/stats_exporter.h" #include "queue/event.h" @@ -410,7 +411,11 @@ bool PschExporterInit(void) { std::set_terminate(PschTerminateHandler); try { - if (psch_use_otel) { + if (psch_use_otel && psch_otel_arrow_passthrough && psch_use_unified_arrow_exporter) { + // New unified path: typed Arrow column wrappers driving the + // StatsExporter interface end-to-end, writing the events_raw schema. + g_exporter.exporter = MakeUnifiedArrowExporter(); + } else if (psch_use_otel) { g_exporter.exporter = MakeOpenTelemetryExporter(); } else { g_exporter.exporter = MakeClickHouseExporter(); @@ -450,8 +455,15 @@ int PschExportBatch(void) { return 0; } - if (psch_use_otel && psch_otel_arrow_passthrough) { - elog(DEBUG1, "pg_stat_ch: exporting batch of %zu events as Arrow IPC", events.size()); + // Legacy Arrow path bypasses the StatsExporter column interface entirely + // — ArrowBatchBuilder owns its own column shape, the exporter is reached + // only for SendArrowBatch (transport). The new unified exporter goes + // through the interface like CH-native and OTel column-emission do, so + // when its GUC is on we skip the bypass and fall through to + // ExportEventStats. + if (psch_use_otel && psch_otel_arrow_passthrough && !psch_use_unified_arrow_exporter) { + elog(DEBUG1, "pg_stat_ch: exporting batch of %zu events as Arrow IPC (legacy)", + events.size()); return ExportEventsAsArrow(events, exporter); } diff --git a/t/036_unified_arrow_e2e.pl b/t/036_unified_arrow_e2e.pl new file mode 100644 index 0000000..c0e9cf8 --- /dev/null +++ b/t/036_unified_arrow_e2e.pl @@ -0,0 +1,190 @@ +#!/usr/bin/env perl +# Test: end-to-end round-trip of the new OTelArrowExporter into events_raw. +# +# Closes the test gap that's existed since the Arrow path went live: nothing +# currently proves that pg_stat_ch's Arrow IPC output can actually be ingested +# by ClickHouse against the unified events_raw schema. t/026_arrow_dump asserts +# on the IPC schema shape via pyarrow but never pushes the bytes into CH. +# +# Flow: +# 1. Spin up a node with use_unified_arrow_exporter=on + +# debug_arrow_dump_dir set. The OTel endpoint points at a non-existent +# collector — the bgworker's gRPC send fails, but MaybeDumpArrowBatch +# fires BEFORE the send, so IPC files still land on disk regardless. +# 2. Run a known set of queries to populate the queue. +# 3. Wait for IPC files to appear in the dump dir. +# 4. TRUNCATE pg_stat_ch.events_raw. +# 5. For each IPC file: curl POST it to CH as +# INSERT INTO pg_stat_ch.events_raw FORMAT ArrowStream. +# 6. Assert: row count matches what we sent, column types are what +# events_raw declares (no silent string-to-Int coercion), known queries +# land with the right db_name/db_operation/query_text values. + +use strict; +use warnings; +use lib 't'; +use File::Temp qw(tempdir); +use File::Basename; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use psch; + +if (!psch_clickhouse_available()) { + plan skip_all => 'Docker not available, skipping end-to-end Arrow test'; +} + +my $ch_check = `curl -s 'http://localhost:18123/' --data 'SELECT 1' 2>/dev/null`; +if ($ch_check !~ /^1/) { + plan skip_all => 'ClickHouse container not running'; +} + +# Verify the events_raw schema is present (applied by the goose migration +# step in CI). If we hit an empty CH or a different schema, fail loudly +# rather than silently inserting into nothing. +my $schema_check = psch_query_clickhouse( + "SELECT count() FROM system.columns WHERE database = 'pg_stat_ch' AND table = 'events_raw'"); +chomp $schema_check; +if ($schema_check eq '' || $schema_check < 50) { + plan skip_all => "events_raw not initialized in CH (got $schema_check columns); " . + "run goose migrations against schema/migrations/ first"; +} + +# Pre-flight: ensure events_raw is empty so we don't measure leftover rows. +psch_query_clickhouse('TRUNCATE TABLE pg_stat_ch.events_raw'); + +my $dump_dir = tempdir('psch_unified_arrow_e2e_XXXX', TMPDIR => 1, CLEANUP => 1); + +my $node = PostgreSQL::Test::Cluster->new('unified_arrow_e2e'); +$node->init(); +$node->append_conf('postgresql.conf', qq{ +shared_preload_libraries = 'pg_stat_ch' +pg_stat_ch.enabled = on +pg_stat_ch.queue_capacity = 65536 +pg_stat_ch.flush_interval_ms = 100 +pg_stat_ch.batch_max = 100 +pg_stat_ch.use_otel = on +pg_stat_ch.otel_endpoint = 'localhost:14317' +pg_stat_ch.otel_arrow_passthrough = on +pg_stat_ch.use_unified_arrow_exporter = on +pg_stat_ch.debug_arrow_dump_dir = '$dump_dir' +pg_stat_ch.hostname = 'unified-arrow-e2e-host' +pg_stat_ch.extra_attributes = 'instance_ubid:test-instance;server_role:primary;region:test-region;cell:test-cell;read_replica_type:none' +}); +$node->start(); +$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_ch'); + +# ---------------------------------------------------------------------------- +# Run a deliberately-shaped workload so the assertions can pin specific rows. +# ---------------------------------------------------------------------------- +psch_reset_stats($node); + +$node->safe_psql('postgres', 'SELECT 42 AS marker_select'); +$node->safe_psql('postgres', 'CREATE TABLE unified_e2e_t (id int)'); +$node->safe_psql('postgres', 'INSERT INTO unified_e2e_t VALUES (1), (2), (3)'); +$node->safe_psql('postgres', 'SELECT count(*) FROM unified_e2e_t'); +$node->safe_psql('postgres', 'DROP TABLE unified_e2e_t'); + +# Force a flush so the bgworker drains the queue promptly. +$node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); + +# ---------------------------------------------------------------------------- +# Wait for IPC files in the dump dir. The bgworker writes them out before +# the gRPC SendArrowBatch call, so we see them even though OTel send fails. +# ---------------------------------------------------------------------------- +my @ipc_files; +my $deadline = time() + 10; +while (time() < $deadline) { + @ipc_files = glob("$dump_dir/*.ipc"); + last if @ipc_files > 0; + select(undef, undef, undef, 0.2); +} + +cmp_ok(scalar @ipc_files, '>=', 1, "Unified exporter produced at least one IPC dump file"); + +# Each file is non-empty (ZSTD-compressed Arrow IPC has some envelope bytes +# even for zero rows; for our payload it should be hundreds of bytes). +for my $f (@ipc_files) { + cmp_ok(-s $f, '>', 0, basename($f) . " is non-empty"); +} + +# ---------------------------------------------------------------------------- +# Post each IPC file to CH as INSERT INTO ... FORMAT ArrowStream and verify +# CH accepts the wire format. A 200 response means ArrowStream parsed the +# IPC, found events_raw columns by name, and inserted at least one row. +# Any column type mismatch (e.g. if the producer regressed to writing +# query_id as String) would surface here as a 4xx with a clear error. +# ---------------------------------------------------------------------------- +my $insert_url = + 'http://localhost:18123/?query=' . + 'INSERT%20INTO%20pg_stat_ch.events_raw%20FORMAT%20ArrowStream'; + +my $total_posted = 0; +for my $f (@ipc_files) { + my $err_file = "$f.err"; + # --fail-with-body makes HTTP 4xx/5xx responses from CH surface as a + # non-zero curl exit code (default curl returns 0 on any completed + # HTTP transfer regardless of status). The subsequent SELECT count() + # assertion would catch ingestion failure too, but failing here gives + # a sharper error message at the point the bug actually happened. + my $rc = system("curl -sS --fail-with-body -X POST " . + "-H 'Content-Type: application/octet-stream' " . + "--data-binary \@$f '$insert_url' 2>$err_file >/dev/null"); + my $stderr = -s $err_file ? do { + open my $fh, '<', $err_file; local $/; <$fh> + } : ''; + is($rc, 0, basename($f) . " accepted by CH ArrowStream parser" . + ($stderr ? " (stderr: $stderr)" : "")); + ++$total_posted; +} +cmp_ok($total_posted, '>=', 1, "Posted at least one IPC file to CH"); + +# ---------------------------------------------------------------------------- +# Verify: rows landed, with the right typing, populated by our known workload. +# ---------------------------------------------------------------------------- +my $rowcount = psch_query_clickhouse('SELECT count() FROM pg_stat_ch.events_raw'); +chomp $rowcount; +cmp_ok($rowcount, '>=', 5, "events_raw has at least the 5 workload queries (got $rowcount)"); + +# Column types: pull system.columns and assert no surprises. If the new +# exporter regressed to the legacy string-encoded id columns, query_id / +# pid would come back as String here. +my $types = psch_query_clickhouse( + "SELECT name, type FROM system.columns " . + "WHERE database = 'pg_stat_ch' AND table = 'events_raw' " . + "AND name IN ('query_id', 'pid', 'err_elevel', " . + "'parallel_workers_planned', 'duration_us', 'shared_blks_hit') " . + "ORDER BY name FORMAT TSV"); + +# system.columns reflects the schema's declared types, not the inserted bytes. +# But a wire-format mismatch on insert would have failed the curl above +# already (CH refuses ArrowStream inserts when an Arrow column's logical +# type can't be cast to the target schema's column type). +like($types, qr/^duration_us\tUInt64\b/m, 'duration_us declared UInt64'); +like($types, qr/^err_elevel\tUInt8\b/m, 'err_elevel declared UInt8'); +like($types, qr/^parallel_workers_planned\tInt16\b/m, 'parallel_workers_planned declared Int16'); +like($types, qr/^pid\tInt32\b/m, 'pid declared Int32'); +like($types, qr/^query_id\tInt64\b/m, 'query_id declared Int64'); +like($types, qr/^shared_blks_hit\tInt64\b/m, 'shared_blks_hit declared Int64'); + +# Known-row assertions: pinpoint the SELECT 42 AS marker_select event. +my $marker = psch_query_clickhouse( + "SELECT db_name, db_operation, query_text FROM pg_stat_ch.events_raw " . + "WHERE query_text LIKE '%marker_select%' AND query_text NOT LIKE '%pg_stat_ch%' " . + "ORDER BY ts DESC LIMIT 1 FORMAT TSV"); +chomp $marker; +like($marker, qr/^postgres\tSELECT\t/, "marker SELECT landed with db_name=postgres, db_operation=SELECT"); +like($marker, qr/marker_select/, "marker SELECT preserved query_text"); + +# Envelope columns from extra_attributes were threaded through. +my $envelope = psch_query_clickhouse( + "SELECT DISTINCT instance_ubid, server_role, region, cell, read_replica_type " . + "FROM pg_stat_ch.events_raw WHERE instance_ubid != '' LIMIT 1 FORMAT TSV"); +chomp $envelope; +is($envelope, "test-instance\tprimary\ttest-region\ttest-cell\tnone", + "envelope columns populated from extra_attributes"); + +$node->stop(); +done_testing(); diff --git a/test/regression/expected/guc.out b/test/regression/expected/guc.out index 0707840..51fc35d 100644 --- a/test/regression/expected/guc.out +++ b/test/regression/expected/guc.out @@ -32,7 +32,8 @@ SELECT name FROM pg_settings WHERE name LIKE 'pg_stat_ch.%' ORDER BY name COLLAT pg_stat_ch.sample_rate pg_stat_ch.string_area_size pg_stat_ch.use_otel -(29 rows) + pg_stat_ch.use_unified_arrow_exporter +(30 rows) SHOW pg_stat_ch.enabled; pg_stat_ch.enabled