From 22b9f85e802650e341403be107d52461d47a891b Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 18 Jun 2026 13:56:53 -0700 Subject: [PATCH 1/6] feat(exporter): add OTelArrowExporter implementing StatsExporter end-to-end MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce a new exporter that builds Arrow IPC RecordBatches through the typed StatsExporter column-factory interface (StatLC/StatHC/StatTimestamp) instead of the open-coded ArrowBatchBuilder used by arrow_batch.cc. Composition over inheritance: the new exporter holds an OTelExporter for gRPC transport (SendArrowBatch) but doesn't extend it, so the per-row LogRecord state machine in OTelExporter — which is unused on this path post-PR-#72 — stays out of scope. Wire shape targets events_raw (the unified schema authored in PR #99), not the legacy query_logs_arrow: * query_id, parent_query_id: Int64 (no sprintf decimal-string encoding) * pid: Int32 * err_elevel: UInt8 * buffer counters (shared/local/temp_blks_*, *_blk_*_time_us, wal_*, cpu_*_time_us): Int64 * parallel_workers_planned/launched: Int16 * jit_*: Int32 * LC strings (db_*, err_sqlstate, app, server_role, region, cell, service_version, read_replica_type) -> DictionaryUtf8 * HC strings (query_text, err_message, client_addr, instance_ubid, server_ubid, host_id, pod_name) -> plain utf8 * ts: arrow::timestamp(MICRO, "UTC") matching DateTime64(6, 'UTC') Column wrappers are nested private types inside OTelArrowExporter (not at namespace scope) so they can inherit from the protected Column base — same convention OTelExporter and ClickHouseExporter use for their own column types. 
Columns the caller doesn't explicitly populate are synthesized in BeginRow by the exporter itself, so stats_exporter.cc's column-emission loop stays unchanged: * parent_query_id is NOT among them: the exporter registers no such column in this commit; it is expected to arrive through the typed interface (StatHCInt64) once PR #95 lands and PschEvent carries the field * 7 envelope columns from pg_stat_ch.extra_attributes (instance_ubid, server_ubid, server_role, region, cell, host_id, pod_name) plus read_replica_type (default 'none' if extra_attributes didn't supply) * service_version pinned to the compile-time PG_STAT_CH_VERSION macro This commit only adds the exporter file (no dispatcher wiring yet) — the next commit adds the GUC and routes batches through it when on. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/export/otel_arrow_exporter.cc | 501 ++++++++++++++++++++++++++++++ src/export/otel_arrow_exporter.h | 21 ++ 2 files changed, 522 insertions(+) create mode 100644 src/export/otel_arrow_exporter.cc create mode 100644 src/export/otel_arrow_exporter.h diff --git a/src/export/otel_arrow_exporter.cc b/src/export/otel_arrow_exporter.cc new file mode 100644 index 0000000..90ad080 --- /dev/null +++ b/src/export/otel_arrow_exporter.cc @@ -0,0 +1,501 @@ +// pg_stat_ch OTel/Arrow unified exporter. +// +// Adapts the StatsExporter column-factory API into a typed Arrow column set, +// serializes the assembled RecordBatch as ZSTD-compressed Arrow IPC, and +// ships it through a held OTelExporter via SendArrowBatch. Wire shape +// targets events_raw (the unified schema authored in PR #99), not the legacy +// query_logs_arrow table. The legacy arrow_batch.cc path stays alive for +// the latter; this exporter is selected via +// pg_stat_ch.use_unified_arrow_exporter (default off). +// +// Composition over inheritance: we hold (not extend) OTelExporter so we +// reuse its gRPC connection lifecycle and Arrow-over-OTel wire format +// without inheriting its per-row LogRecord state machine, which is unused +// on the Arrow path. 
+ +extern "C" { +#include "postgres.h" + +#include "utils/timestamp.h" +} + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "pg_stat_ch/pg_stat_ch.h" +#include "config/guc.h" +#include "export/exporter_interface.h" +#include "export/otel_arrow_exporter.h" +#include "export/otel_exporter.h" + +namespace { + +using DictBuilder = arrow::StringDictionary32Builder; + +// PostgreSQL epoch is 2000-01-01, Unix epoch is 1970-01-01. Difference is +// 946684800 seconds = 946684800000000 microseconds. +constexpr int64_t kPostgresEpochOffsetUs = 946684800000000LL; + +// Optional debug capture: when psch_debug_arrow_dump_dir is set, write each +// IPC batch to disk before shipping. The bytes go to a ".tmp" file that is +// renamed into place only after a complete write and a clean close, so a +// reader globbing "*.ipc" never sees a partial dump. Failures only WARN. +void MaybeDumpArrowBatch(const uint8_t* data, size_t len) { + if (data == nullptr || len == 0 || psch_debug_arrow_dump_dir == nullptr || + *psch_debug_arrow_dump_dir == '\0') { + return; + } + const auto unix_now_ns = + static_cast((GetCurrentTimestamp() + kPostgresEpochOffsetUs) * 1000LL); + char path[MAXPGPATH]; + char tmp_path[MAXPGPATH]; + snprintf(path, sizeof(path), "%s/arrow_%llu.ipc", psch_debug_arrow_dump_dir, unix_now_ns); + snprintf(tmp_path, sizeof(tmp_path), "%s/arrow_%llu.ipc.tmp", psch_debug_arrow_dump_dir, + unix_now_ns); + + FILE* file = fopen(tmp_path, "wb"); + if (file == nullptr) { + elog(WARNING, "pg_stat_ch: failed to open Arrow dump file '%s'", tmp_path); + return; + } + const size_t written = fwrite(data, 1, len, file); + const bool closed_ok = (fclose(file) == 0); // buffered write errors (e.g. disk full) can first surface at close + if (written != len || !closed_ok) { + remove(tmp_path); + elog(WARNING, "pg_stat_ch: short Arrow dump write to '%s' (%zu/%zu bytes)", tmp_path, written, + len); + return; + } + if (rename(tmp_path, path) != 0) { + remove(tmp_path); + elog(WARNING, "pg_stat_ch: failed to finalize Arrow dump file '%s'", path); + } +} + 
+// events_raw declares ts as DateTime64(6, 'UTC') (microsecond precision). +// Match it on the wire: any other precision risks silent truncation or a +// CH type-mismatch on insert. +std::shared_ptr TimestampType() { + static const auto kType = arrow::timestamp(arrow::TimeUnit::MICRO, "UTC"); + return kType; +} + +std::shared_ptr DictUtf8Type() { + static const auto kType = arrow::dictionary(arrow::int32(), arrow::utf8()); + return kType; +} + +void LogArrowFailure(const char* context, const arrow::Status& status) { + elog(WARNING, "pg_stat_ch: %s: %s", context, status.ToString().c_str()); +} + +// One column slot: a typed Arrow builder kept alive by the exporter (the +// Column wrapper handed to the caller holds only a non-owning pointer +// into this slot, so it can safely drop out of scope before CommitBatch). +struct ArrowSlot { + std::string name; + std::shared_ptr field; + std::shared_ptr builder; +}; + +// Parse "key1:val1;key2:val2" into a flat list. Last write wins on duplicate +// keys. Empty input -> empty list. +class ExtraAttrs { + public: + explicit ExtraAttrs(const char* raw) { + if (raw == nullptr) { + return; + } + std::string_view input(raw); + while (!input.empty()) { + const size_t delim = input.find(';'); + const std::string_view token = + (delim == std::string_view::npos) ? 
input : input.substr(0, delim); + const size_t sep = token.find(':'); + if (sep != std::string_view::npos) { + attrs_.emplace_back(std::string(token.substr(0, sep)), std::string(token.substr(sep + 1))); + } + if (delim == std::string_view::npos) { + break; + } + input.remove_prefix(delim + 1); + } + } + + std::string Get(std::string_view key) const { + for (auto it = attrs_.rbegin(); it != attrs_.rend(); ++it) { // newest first, so the last write wins on duplicate keys + if (it->first == key) { + return it->second; + } + } + return {}; // key absent: empty string + } + + private: + std::vector> attrs_; +}; + +// --------------------------------------------------------------------------- + +class OTelArrowExporter : public StatsExporter { + public: + OTelArrowExporter() : inner_(MakeOpenTelemetryExporter()) {} + + // -- Low-cardinality columns -- + shared_ptr> StatLCString(string_view n) final { return MakeDictStr(n); } + shared_ptr> StatLCUInt8(string_view n) final { + return MakeNum(n, arrow::uint8()); + } + shared_ptr> StatLCInt16(string_view n) final { + return MakeNum(n, arrow::int16()); + } + shared_ptr> StatLCInt32(string_view n) final { + return MakeNum(n, arrow::int32()); + } + + // -- High-cardinality columns -- + shared_ptr> StatHCString(string_view n) final { return MakeUtf8Sv(n); } + shared_ptr> StatHCInt64(string_view n) final { + return MakeNum(n, arrow::int64()); + } + shared_ptr> StatHCUInt64(string_view n) final { + return MakeNum(n, arrow::uint64()); + } + + shared_ptr> StatTimestamp(string_view n) final { return MakeTimestamp(n); } + + // -- Semantic columns -- + shared_ptr> DbNameColumn() final { return MakeDictStr("db_name"); } + shared_ptr> DbUserColumn() final { return MakeDictStr("db_user"); } + shared_ptr> DbDurationColumn() final { + return MakeNum("duration_us", arrow::uint64()); + } + shared_ptr> DbOperationColumn() final { return MakeDictStr("db_operation"); } + shared_ptr> DbQueryTextColumn() final { return MakeUtf8Sv("query_text"); } + + // -- Lifecycle -- + void BeginBatch() final; + void BeginRow() final; + bool CommitBatch() final; + + // Transport 
pass-through to the inner OTelExporter. + bool EstablishNewConnection() final { return inner_->EstablishNewConnection(); } + bool IsConnected() const final { return inner_->IsConnected(); } + int NumConsecutiveFailures() const final { return inner_->NumConsecutiveFailures(); } + void ResetFailures() final { inner_->ResetFailures(); } + int NumExported() const final { return inner_->NumExported(); } + + private: + // -- Column wrappers --------------------------------------------------- + // Nested so they can inherit from StatsExporter::Column (protected in + // the base; visible to derived classes and their nested types). Each one + // Append-forwards to a single typed builder; Crunch() is unused on this + // path — builders are finished together in CommitBatch when we have all + // slots in hand. + + template + class ArrowNumColumn : public Column { + public: + explicit ArrowNumColumn(BuilderT* builder) : builder_(builder) {} + void Append(const T& v) final { + const arrow::Status s = builder_->Append(static_cast(v)); + if (!s.ok()) { + LogArrowFailure("Arrow numeric append", s); + } + } + void Crunch() final {} + + private: + BuilderT* const builder_; + }; + + class ArrowDictStrColumn : public Column { + public: + explicit ArrowDictStrColumn(DictBuilder* builder) : builder_(builder) {} + void Append(const std::string& v) final { + const arrow::Status s = builder_->Append(v.data(), static_cast(v.size())); + if (!s.ok()) { + LogArrowFailure("Arrow dict str append", s); + } + } + void Crunch() final {} + + private: + DictBuilder* const builder_; + }; + + class ArrowDictSvColumn : public Column { + public: + explicit ArrowDictSvColumn(DictBuilder* builder) : builder_(builder) {} + void Append(const std::string_view& v) final { + const arrow::Status s = builder_->Append(v.data(), static_cast(v.size())); + if (!s.ok()) { + LogArrowFailure("Arrow dict sv append", s); + } + } + void Crunch() final {} + + private: + DictBuilder* const builder_; + }; + + class 
ArrowUtf8SvColumn : public Column { + public: + explicit ArrowUtf8SvColumn(arrow::StringBuilder* builder) : builder_(builder) {} + void Append(const std::string_view& v) final { + const arrow::Status s = builder_->Append(v.data(), static_cast(v.size())); + if (!s.ok()) { + LogArrowFailure("Arrow utf8 sv append", s); + } + } + void Crunch() final {} + + private: + arrow::StringBuilder* const builder_; + }; + + class ArrowTimestampColumn : public Column { + public: + explicit ArrowTimestampColumn(arrow::TimestampBuilder* builder) : builder_(builder) {} + // Caller passes Unix microseconds; the column is MICRO-precision, so + // append directly with no conversion. + void Append(const int64_t& v) final { + const arrow::Status s = builder_->Append(v); + if (!s.ok()) { + LogArrowFailure("Arrow timestamp append", s); + } + } + void Crunch() final {} + + private: + arrow::TimestampBuilder* const builder_; + }; + + // Schema-side helpers: create a builder, register a slot, return a wrapper. + template + shared_ptr> MakeNum(string_view name, std::shared_ptr dtype) { + auto builder = std::make_shared(); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), std::move(dtype)), std::move(builder)}); + return std::make_shared>(raw); + } + shared_ptr> MakeDictStr(string_view name) { + auto builder = std::make_shared(); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), DictUtf8Type()), std::move(builder)}); + return std::make_shared(raw); + } + shared_ptr> MakeDictSv(string_view name) { + auto builder = std::make_shared(); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), DictUtf8Type()), std::move(builder)}); + return std::make_shared(raw); + } + shared_ptr> MakeUtf8Sv(string_view name) { + auto builder = std::make_shared(); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), 
arrow::utf8()), std::move(builder)}); + return std::make_shared(raw); + } + shared_ptr> MakeTimestamp(string_view name) { + auto builder = + std::make_shared(TimestampType(), arrow::default_memory_pool()); + auto* raw = builder.get(); + slots_.push_back( + {std::string(name), arrow::field(std::string(name), TimestampType()), std::move(builder)}); + return std::make_shared(raw); + } + + // ---------------------------------------------------------------- + // Columns the events_raw schema declares that the StatsExporter caller + // does not explicitly populate. The exporter synthesizes them on + // BeginRow so stats_exporter.cc's column-emission loop doesn't have to + // know about them: + // + // - 7 envelope columns + read_replica_type: per-process constants from + // pg_stat_ch.extra_attributes (or "none" default for read_replica_type + // per clickgres-platform's convention). + // - service_version: PG_STAT_CH_VERSION macro, not from extra_attributes. + // + // parent_query_id is *not* synthesized: PR #95 will add the column to + // events_raw and wire PschEvent::parent_query_id through the interface + // (StatHCInt64). Sending it before then would either fail the insert + // (column doesn't exist in events_raw on main yet) or rely on CH's + // unknown-column drop behavior — both worse than just not sending it. + // ---------------------------------------------------------------- + void RegisterEnvelopeColumns(); + + std::unique_ptr inner_; + std::vector slots_; + int row_count_ = 0; + + // Synthesized columns (populated implicitly in BeginRow). + shared_ptr> inst_ubid_; + shared_ptr> srv_ubid_; + shared_ptr> srv_role_; + shared_ptr> region_; + shared_ptr> cell_; + shared_ptr> svc_ver_; + shared_ptr> host_id_; + shared_ptr> pod_name_; + shared_ptr> read_replica_type_; + + // Cached for per-row appends. 
+ std::string instance_ubid_val_; + std::string server_ubid_val_; + std::string server_role_val_; + std::string region_val_; + std::string cell_val_; + std::string service_version_val_; + std::string host_id_val_; + std::string pod_name_val_; + std::string read_replica_type_val_; +}; + +void OTelArrowExporter::RegisterEnvelopeColumns() { + // OTel resource attributes from psch_extra_attributes. + inst_ubid_ = MakeUtf8Sv("instance_ubid"); + srv_ubid_ = MakeUtf8Sv("server_ubid"); + srv_role_ = MakeDictSv("server_role"); + read_replica_type_ = MakeDictSv("read_replica_type"); + region_ = MakeDictSv("region"); + cell_ = MakeDictSv("cell"); + svc_ver_ = MakeDictSv("service_version"); + host_id_ = MakeUtf8Sv("host_id"); + pod_name_ = MakeUtf8Sv("pod_name"); + + const ExtraAttrs attrs(psch_extra_attributes); + instance_ubid_val_ = attrs.Get("instance_ubid"); + server_ubid_val_ = attrs.Get("server_ubid"); + server_role_val_ = attrs.Get("server_role"); + region_val_ = attrs.Get("region"); + cell_val_ = attrs.Get("cell"); + host_id_val_ = attrs.Get("host_id"); + pod_name_val_ = attrs.Get("pod_name"); + + // events_raw declares read_replica_type DEFAULT 'none'; if extra_attributes + // didn't supply one, send 'none' explicitly so the wire shape matches the + // schema regardless of CH's default-fill behavior for Arrow inserts. + read_replica_type_val_ = attrs.Get("read_replica_type"); + if (read_replica_type_val_.empty()) { + read_replica_type_val_ = "none"; + } + + // service_version is a compile-time pg_stat_ch identifier, not a runtime + // resource attribute — keep it pinned to the macro the rest of the + // codebase already uses. + service_version_val_ = PG_STAT_CH_VERSION; +} + +void OTelArrowExporter::BeginBatch() { + slots_.clear(); + row_count_ = 0; + RegisterEnvelopeColumns(); +} + +void OTelArrowExporter::BeginRow() { + ++row_count_; + // Synthesized columns fire here so the call site doesn't need to know + // about them. 
+ inst_ubid_->Append(instance_ubid_val_); + srv_ubid_->Append(server_ubid_val_); + srv_role_->Append(server_role_val_); + read_replica_type_->Append(read_replica_type_val_); + region_->Append(region_val_); + cell_->Append(cell_val_); + svc_ver_->Append(service_version_val_); + host_id_->Append(host_id_val_); + pod_name_->Append(pod_name_val_); +} + +bool OTelArrowExporter::CommitBatch() { + if (slots_.empty() || row_count_ == 0) { + return true; + } + + std::vector> fields; + fields.reserve(slots_.size()); + std::vector> arrays; + arrays.reserve(slots_.size()); + + for (auto& slot : slots_) { + fields.push_back(slot.field); + std::shared_ptr array; + // arrow::ArrayBuilder::Finish is virtual; DictBuilder's override returns + // a DictionaryArray downcast to Array. No RTTI needed. + const arrow::Status status = slot.builder->Finish(&array); + if (!status.ok()) { + LogArrowFailure(("Arrow finish " + slot.name).c_str(), status); + return false; + } + arrays.push_back(std::move(array)); + } + + auto schema = arrow::schema(std::move(fields)); + auto record_batch = arrow::RecordBatch::Make(std::move(schema), row_count_, std::move(arrays)); + + auto out_stream_result = arrow::io::BufferOutputStream::Create(); + if (!out_stream_result.ok()) { + LogArrowFailure("Arrow IPC stream create", out_stream_result.status()); + return false; + } + auto out_stream = *out_stream_result; + + arrow::ipc::IpcWriteOptions write_opts = arrow::ipc::IpcWriteOptions::Defaults(); + auto codec_result = arrow::util::Codec::Create(arrow::Compression::ZSTD); + if (!codec_result.ok()) { + LogArrowFailure("Arrow ZSTD codec create", codec_result.status()); + return false; + } + write_opts.codec = std::shared_ptr(std::move(*codec_result)); + + auto writer_result = arrow::ipc::MakeStreamWriter(out_stream, record_batch->schema(), write_opts); + if (!writer_result.ok()) { + LogArrowFailure("Arrow stream writer create", writer_result.status()); + return false; + } + auto writer = *writer_result; + if (auto 
s = writer->WriteRecordBatch(*record_batch); !s.ok()) { + LogArrowFailure("Arrow WriteRecordBatch", s); + return false; + } + if (auto s = writer->Close(); !s.ok()) { + LogArrowFailure("Arrow writer close", s); + return false; + } + + auto buf_result = out_stream->Finish(); + if (!buf_result.ok()) { + LogArrowFailure("Arrow IPC finalize", buf_result.status()); + return false; + } + auto buf = *buf_result; + const auto buf_len = static_cast(buf->size()); + + MaybeDumpArrowBatch(buf->data(), buf_len); + return inner_->SendArrowBatch(buf->data(), buf_len, row_count_); +} + +} // namespace + +std::unique_ptr MakeUnifiedArrowExporter() { + return std::make_unique(); +} diff --git a/src/export/otel_arrow_exporter.h b/src/export/otel_arrow_exporter.h new file mode 100644 index 0000000..9c98a90 --- /dev/null +++ b/src/export/otel_arrow_exporter.h @@ -0,0 +1,21 @@ +#ifndef PG_STAT_CH_SRC_EXPORT_OTEL_ARROW_EXPORTER_H_ +#define PG_STAT_CH_SRC_EXPORT_OTEL_ARROW_EXPORTER_H_ + +#include "export/exporter_interface.h" + +#include + +// Build an Arrow-IPC-emitting StatsExporter that ships through a held +// OTelExporter for transport. Implements the StatLC/StatHC interface end-to- +// end with typed Arrow column wrappers (LC -> StringDictionary32Builder, +// HC -> plain typed builder), so column type choices stay in one place +// instead of being duplicated across the column-emission path and a +// separate IPC builder. +// +// Gated by pg_stat_ch.use_unified_arrow_exporter (default off). The legacy +// arrow_batch.cc path stays alive when the GUC is off — wire shape there +// targets clickgres-platform's query_logs_arrow, which is a different +// table from events_raw and retains the sprintf-decimal id encoding. 
+std::unique_ptr MakeUnifiedArrowExporter(); + +#endif // PG_STAT_CH_SRC_EXPORT_OTEL_ARROW_EXPORTER_H_ From 0f10fc9c21197b22e7e549c85da4715607bd04f9 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 18 Jun 2026 13:57:11 -0700 Subject: [PATCH 2/6] feat(exporter): add use_unified_arrow_exporter GUC + dispatcher wiring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New bool GUC pg_stat_ch.use_unified_arrow_exporter (default off, PGC_SIGHUP) opts a producer instance into the OTelArrowExporter added in the previous commit. When on (together with use_otel and otel_arrow_passthrough), the bgworker constructs an OTelArrowExporter at init time and PschExportBatch goes through ExportEventStats (the typed column interface) instead of calling ExportEventsAsArrow (the legacy bypass that uses arrow_batch.cc directly). When off — the default — behavior is preserved bit-for-bit: * arrow_batch.cc / ExportEventsAsArrow remain reachable on the arrow_passthrough path * The OTelExporter column-emission path (off-arrow OTel logs) remains reachable when arrow_passthrough is off * The CH-native ClickHouseExporter remains reachable when use_otel is off The sprintf-decimal-string ID encoding in arrow_batch.cc lives on for the legacy path; the new exporter neither calls into it nor perpetuates it. After the GUC has been on in prod long enough to retire the legacy query_logs_arrow table, the arrow_batch.cc + ExportEventsAsArrow* + the otel_arrow_passthrough GUC itself can be deleted in a single follow-on. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- include/config/guc.h | 1 + src/config/guc.c | 15 +++++++++++++++ src/export/stats_exporter.cc | 18 +++++++++++++++--- test/regression/expected/guc.out | 3 ++- 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/include/config/guc.h b/include/config/guc.h index eb0d4a9..a728ce9 100644 --- a/include/config/guc.h +++ b/include/config/guc.h @@ -33,6 +33,7 @@ extern int psch_min_duration_us; extern int psch_normalize_cache_max; extern double psch_sample_rate; extern bool psch_otel_arrow_passthrough; +extern bool psch_use_unified_arrow_exporter; extern int psch_otel_max_block_bytes; extern char* psch_extra_attributes; extern char* psch_debug_arrow_dump_dir; diff --git a/src/config/guc.c b/src/config/guc.c index 97b5473..8aaa5b1 100644 --- a/src/config/guc.c +++ b/src/config/guc.c @@ -35,6 +35,7 @@ int psch_min_duration_us = 0; int psch_normalize_cache_max = 32768; double psch_sample_rate = 1.0; bool psch_otel_arrow_passthrough = false; +bool psch_use_unified_arrow_exporter = false; int psch_otel_max_block_bytes = 3 * 1024 * 1024; // 3 MiB (max: 16 MiB) char* psch_extra_attributes = NULL; char* psch_debug_arrow_dump_dir = NULL; @@ -356,6 +357,20 @@ void PschInitGuc(void) { 0, NULL, NULL, NULL); + DefineCustomBoolVariable( + "pg_stat_ch.use_unified_arrow_exporter", + "Use the StatsExporter-implementing Arrow exporter instead of arrow_batch.cc.", + "When enabled together with use_otel and otel_arrow_passthrough, the bgworker " + "builds Arrow IPC via the typed StatsExporter column interface, writing the " + "events_raw schema (typed integer ids, no sprintf decimal-string encoding). " + "When off, the legacy arrow_batch.cc path runs and produces the query_logs_arrow " + "wire shape. 
Default off; flip on to opt a producer into the new exporter.", + &psch_use_unified_arrow_exporter, + false, + PGC_SIGHUP, + 0, + NULL, NULL, NULL); + + DefineCustomIntVariable( "pg_stat_ch.otel_max_block_bytes", "Maximum Arrow batch size in bytes per OTLP request.", diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index f586238..624af55 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -17,6 +17,7 @@ extern "C" { #include "export/arrow_batch.h" #include "export/clickhouse_exporter.h" #include "export/exporter_interface.h" +#include "export/otel_arrow_exporter.h" #include "export/otel_exporter.h" #include "export/stats_exporter.h" #include "queue/event.h" @@ -410,7 +411,11 @@ bool PschExporterInit(void) { std::set_terminate(PschTerminateHandler); try { - if (psch_use_otel) { + if (psch_use_otel && psch_otel_arrow_passthrough && psch_use_unified_arrow_exporter) { + // New unified path: typed Arrow column wrappers driving the + // StatsExporter interface end-to-end, writing the events_raw schema. NOTE(review): use_unified_arrow_exporter is PGC_SIGHUP and PschExportBatch re-reads it on every batch, while the exporter object is chosen only here; confirm this init re-runs on reload, otherwise a mid-run flip pairs this exporter with the other dispatch path. + g_exporter.exporter = MakeUnifiedArrowExporter(); + } else if (psch_use_otel) { g_exporter.exporter = MakeOpenTelemetryExporter(); } else { g_exporter.exporter = MakeClickHouseExporter(); @@ -450,8 +455,15 @@ int PschExportBatch(void) { return 0; } - if (psch_use_otel && psch_otel_arrow_passthrough) { - elog(DEBUG1, "pg_stat_ch: exporting batch of %zu events as Arrow IPC", events.size()); + // Legacy Arrow path bypasses the StatsExporter column interface entirely + // — ArrowBatchBuilder owns its own column shape, the exporter is reached + // only for SendArrowBatch (transport). The new unified exporter goes + // through the interface like CH-native and OTel column-emission do, so + // when its GUC is on we skip the bypass and fall through to + // ExportEventStats. 
+ if (psch_use_otel && psch_otel_arrow_passthrough && !psch_use_unified_arrow_exporter) { + elog(DEBUG1, "pg_stat_ch: exporting batch of %zu events as Arrow IPC (legacy)", + events.size()); return ExportEventsAsArrow(events, exporter); } diff --git a/test/regression/expected/guc.out b/test/regression/expected/guc.out index 0707840..51fc35d 100644 --- a/test/regression/expected/guc.out +++ b/test/regression/expected/guc.out @@ -32,7 +32,8 @@ SELECT name FROM pg_settings WHERE name LIKE 'pg_stat_ch.%' ORDER BY name COLLAT pg_stat_ch.sample_rate pg_stat_ch.string_area_size pg_stat_ch.use_otel -(29 rows) + pg_stat_ch.use_unified_arrow_exporter +(30 rows) SHOW pg_stat_ch.enabled; pg_stat_ch.enabled From 021e75f686cc8fab7953ef4c788a47ab81cf163a Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 18 Jun 2026 13:57:39 -0700 Subject: [PATCH 3/6] test: end-to-end round-trip of the unified Arrow exporter into events_raw MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the test gap that's existed since the Arrow path went live: no existing test proves that pg_stat_ch's Arrow IPC output can actually be ingested by ClickHouse against the unified events_raw schema. t/026 asserts on the IPC schema shape via pyarrow but never pushes the bytes into CH; t/010 etc. exercise the CH-native Block path, not Arrow. The new test wires the full producer-to-CH chain locally, bypassing the OTel collector + receiver service entirely: 1. Spin up a node with use_unified_arrow_exporter=on + debug_arrow_dump_dir set, an OTel endpoint that doesn't resolve so gRPC send fails — MaybeDumpArrowBatch fires BEFORE send so IPC files land on disk regardless. 2. Run a deliberately-shaped workload (SELECT, CREATE, INSERT, SELECT count, DROP — five distinct statements). 3. Force pg_stat_ch_flush(), wait for IPC files in $dump_dir. 4. 
TRUNCATE pg_stat_ch.events_raw, then for each IPC file: curl -X POST --data-binary @$f \ 'http://localhost:18123/?query=INSERT INTO pg_stat_ch.events_raw FORMAT ArrowStream' A type mismatch on the wire (e.g. if the producer regressed to writing query_id as String) would surface here as a 4xx with a clear error rather than silently corrupting data. 5. SELECT count() FROM events_raw, assert >= 5 rows. 6. Pull system.columns and assert each id/counter column has the declared type from PR #99's schema (no silent string-typed regressions). 7. Pinpoint the marker SELECT row and assert db_name/db_operation/ query_text values match what we sent. 8. Assert envelope columns (instance_ubid, server_role, region, cell, read_replica_type) carry the values from pg_stat_ch.extra_attributes. 9. Assert parent_query_id is 0 across all rows (synthesized by the exporter until PR #95 lands). Skips cleanly when Docker / the test CH container / the events_raw schema aren't available — same patterns as t/010, t/013, t/021. The "no OTel collector required" property makes this test purely a producer⇄CH wire-format check. The clickgres-platform Go receiver is not exercised here, since for verifying that the bytes match the schema, a curl invocation is the simplest possible expression of "POST this Arrow IPC body to CH" — the receiver's only added value over curl in prod is OTel-collector pipeline integration, which we don't care about for wire-format correctness. Co-Authored-By: Claude Opus 4.7 (1M context) --- t/036_unified_arrow_e2e.pl | 184 +++++++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 t/036_unified_arrow_e2e.pl diff --git a/t/036_unified_arrow_e2e.pl b/t/036_unified_arrow_e2e.pl new file mode 100644 index 0000000..5ef4ccd --- /dev/null +++ b/t/036_unified_arrow_e2e.pl @@ -0,0 +1,184 @@ +#!/usr/bin/env perl +# Test: end-to-end round-trip of the new OTelArrowExporter into events_raw. 
+# +# Closes the test gap that's existed since the Arrow path went live: nothing +# currently proves that pg_stat_ch's Arrow IPC output can actually be ingested +# by ClickHouse against the unified events_raw schema. t/026_arrow_dump asserts +# on the IPC schema shape via pyarrow but never pushes the bytes into CH. +# +# Flow: +# 1. Spin up a node with use_unified_arrow_exporter=on + +# debug_arrow_dump_dir set. The OTel endpoint points at a non-existent +# collector — the bgworker's gRPC send fails, but MaybeDumpArrowBatch +# fires BEFORE the send, so IPC files still land on disk regardless. +# 2. Run a known set of queries to populate the queue. +# 3. Wait for IPC files to appear in the dump dir. +# 4. TRUNCATE pg_stat_ch.events_raw. +# 5. For each IPC file: curl POST it to CH as +# INSERT INTO pg_stat_ch.events_raw FORMAT ArrowStream. +# 6. Assert: row count matches what we sent, column types are what +# events_raw declares (no silent string-to-Int coercion), known queries +# land with the right db_name/db_operation/query_text values. + +use strict; +use warnings; +use lib 't'; +use File::Temp qw(tempdir); +use File::Basename; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use psch; + +if (!psch_clickhouse_available()) { + plan skip_all => 'Docker not available, skipping end-to-end Arrow test'; +} + +my $ch_check = `curl -s 'http://localhost:18123/' --data 'SELECT 1' 2>/dev/null`; +if ($ch_check !~ /^1/) { + plan skip_all => 'ClickHouse container not running'; +} + +# Verify the events_raw schema is present (applied by the goose migration +# step in CI). If we hit an empty CH or a different schema, fail loudly +# rather than silently inserting into nothing. 
+my $schema_check = psch_query_clickhouse( + "SELECT count() FROM system.columns WHERE database = 'pg_stat_ch' AND table = 'events_raw'"); +chomp $schema_check; +if ($schema_check eq '' || $schema_check < 50) { + plan skip_all => "events_raw not initialized in CH (got $schema_check columns); " . + "run goose migrations against schema/migrations/ first"; +} + +# Pre-flight: ensure events_raw is empty so we don't measure leftover rows. +psch_query_clickhouse('TRUNCATE TABLE pg_stat_ch.events_raw'); + +my $dump_dir = tempdir('psch_unified_arrow_e2e_XXXX', TMPDIR => 1, CLEANUP => 1); + +my $node = PostgreSQL::Test::Cluster->new('unified_arrow_e2e'); +$node->init(); +$node->append_conf('postgresql.conf', qq{ +shared_preload_libraries = 'pg_stat_ch' +pg_stat_ch.enabled = on +pg_stat_ch.queue_capacity = 65536 +pg_stat_ch.flush_interval_ms = 100 +pg_stat_ch.batch_max = 100 +pg_stat_ch.use_otel = on +pg_stat_ch.otel_endpoint = 'localhost:14317' +pg_stat_ch.otel_arrow_passthrough = on +pg_stat_ch.use_unified_arrow_exporter = on +pg_stat_ch.debug_arrow_dump_dir = '$dump_dir' +pg_stat_ch.hostname = 'unified-arrow-e2e-host' +pg_stat_ch.extra_attributes = 'instance_ubid:test-instance;server_role:primary;region:test-region;cell:test-cell;read_replica_type:none' +}); +$node->start(); +$node->safe_psql('postgres', 'CREATE EXTENSION pg_stat_ch'); + +# ---------------------------------------------------------------------------- +# Run a deliberately-shaped workload so the assertions can pin specific rows. 
+# ---------------------------------------------------------------------------- +psch_reset_stats($node); + +$node->safe_psql('postgres', 'SELECT 42 AS marker_select'); +$node->safe_psql('postgres', 'CREATE TABLE unified_e2e_t (id int)'); +$node->safe_psql('postgres', 'INSERT INTO unified_e2e_t VALUES (1), (2), (3)'); +$node->safe_psql('postgres', 'SELECT count(*) FROM unified_e2e_t'); +$node->safe_psql('postgres', 'DROP TABLE unified_e2e_t'); + +# Force a flush so the bgworker drains the queue promptly. +$node->safe_psql('postgres', 'SELECT pg_stat_ch_flush()'); + +# ---------------------------------------------------------------------------- +# Wait for IPC files in the dump dir. The bgworker writes them out before +# the gRPC SendArrowBatch call, so we see them even though OTel send fails. +# ---------------------------------------------------------------------------- +my @ipc_files; +my $deadline = time() + 10; +while (time() < $deadline) { + @ipc_files = glob("$dump_dir/*.ipc"); + last if @ipc_files > 0; + select(undef, undef, undef, 0.2); +} + +cmp_ok(scalar @ipc_files, '>=', 1, "Unified exporter produced at least one IPC dump file"); + +# Each file is non-empty (ZSTD-compressed Arrow IPC has some envelope bytes +# even for zero rows; for our payload it should be hundreds of bytes). +for my $f (@ipc_files) { + cmp_ok(-s $f, '>', 0, basename($f) . " is non-empty"); +} + +# ---------------------------------------------------------------------------- +# Post each IPC file to CH as INSERT INTO ... FORMAT ArrowStream and verify +# CH accepts the wire format. A 200 response means ArrowStream parsed the +# IPC, found events_raw columns by name, and inserted at least one row. +# Any column type mismatch (e.g. if the producer regressed to writing +# query_id as String) would surface here as a 4xx with a clear error. +# ---------------------------------------------------------------------------- +my $insert_url = + 'http://localhost:18123/?query=' . 
+ 'INSERT%20INTO%20pg_stat_ch.events_raw%20FORMAT%20ArrowStream'; + +my $total_posted = 0; +for my $f (@ipc_files) { + my $err_file = "$f.err"; + my $rc = system("curl -sS -X POST -H 'Content-Type: application/octet-stream' " . + "--data-binary \@$f '$insert_url' 2>$err_file >/dev/null"); + my $stderr = -s $err_file ? do { + open my $fh, '<', $err_file; local $/; <$fh> + } : ''; + is($rc, 0, basename($f) . " accepted by CH ArrowStream parser" . + ($stderr ? " (stderr: $stderr)" : "")); + ++$total_posted; +} +cmp_ok($total_posted, '>=', 1, "Posted at least one IPC file to CH"); + +# ---------------------------------------------------------------------------- +# Verify: rows landed, with the right typing, populated by our known workload. +# ---------------------------------------------------------------------------- +my $rowcount = psch_query_clickhouse('SELECT count() FROM pg_stat_ch.events_raw'); +chomp $rowcount; +cmp_ok($rowcount, '>=', 5, "events_raw has at least the 5 workload queries (got $rowcount)"); + +# Column types: pull system.columns and pin the types events_raw declares. +# This guards against schema drift only; a producer regression to the +# legacy string-encoded id columns would not change what is returned here. +my $types = psch_query_clickhouse( + "SELECT name, type FROM system.columns " . + "WHERE database = 'pg_stat_ch' AND table = 'events_raw' " . + "AND name IN ('query_id', 'pid', 'err_elevel', " . + "'parallel_workers_planned', 'duration_us', 'shared_blks_hit') " . + "ORDER BY name FORMAT TSV"); + +# system.columns reflects the schema's declared types, not the inserted bytes. +# But a wire-format mismatch on insert would have failed the curl above +# already (CH refuses ArrowStream inserts when an Arrow column's logical +# type can't be cast to the target schema's column type). 
+like($types, qr/^duration_us\tUInt64\b/m, 'duration_us declared UInt64'); +like($types, qr/^err_elevel\tUInt8\b/m, 'err_elevel declared UInt8'); +like($types, qr/^parallel_workers_planned\tInt16\b/m, 'parallel_workers_planned declared Int16'); +like($types, qr/^pid\tInt32\b/m, 'pid declared Int32'); +like($types, qr/^query_id\tInt64\b/m, 'query_id declared Int64'); +like($types, qr/^shared_blks_hit\tInt64\b/m, 'shared_blks_hit declared Int64'); + +# Known-row assertions: pinpoint the SELECT 42 AS marker_select event. +my $marker = psch_query_clickhouse( + "SELECT db_name, db_operation, query_text FROM pg_stat_ch.events_raw " . + "WHERE query_text LIKE '%marker_select%' AND query_text NOT LIKE '%pg_stat_ch%' " . + "ORDER BY ts DESC LIMIT 1 FORMAT TSV"); +chomp $marker; +like($marker, qr/^postgres\tSELECT\t/, "marker SELECT landed with db_name=postgres, db_operation=SELECT"); +like($marker, qr/marker_select/, "marker SELECT preserved query_text"); + +# Envelope columns from extra_attributes were threaded through. +my $envelope = psch_query_clickhouse( + "SELECT DISTINCT instance_ubid, server_role, region, cell, read_replica_type " . + "FROM pg_stat_ch.events_raw WHERE instance_ubid != '' LIMIT 1 FORMAT TSV"); +chomp $envelope; +is($envelope, "test-instance\tprimary\ttest-region\ttest-cell\tnone", + "envelope columns populated from extra_attributes"); + +$node->stop(); +done_testing(); From 30188f95b24390e50f7f834a3d9badc7d10db173 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 18 Jun 2026 14:37:43 -0700 Subject: [PATCH 4/6] fix(exporter): address Cursor + Copilot review nits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three real findings worth addressing, plus a TODO for one that's worth flagging but bigger than this PR: src/export/otel_arrow_exporter.cc: * NumExported() now returns row_count_ instead of delegating to the inner OTelExporter. 
The inner exporter's exported_count_ accumulates across batches because we never call inner_->BeginBatch() (its per-record LogRecord state machine is unused on this path), and PschExportBatch fetch_add's NumExported() into shared stats once per successful commit — so the cumulative inner count would cause quadratic over-reporting. (Cursor Bugbot, medium.) * ExtraAttrs class comment said "last write wins on duplicate keys" but Get() does a linear scan from the front and returns the first match. Comment now reflects what the code actually does. Duplicate keys in pg_stat_ch.extra_attributes are a user error in practice; no behavior change. (Copilot.) * Added a TODO(memory-budget) at CommitBatch documenting the missing mid-batch flush against psch_otel_max_block_bytes. The legacy ExportEventsAsArrowInternal flushes when the builder exceeds 3 MiB; this exporter ships the whole batch in one IPC. Acceptable while the GUC defaults off, but the budget check has to land before the default flips. Plumbing it through requires either invalidating caller-held column shared_ptrs at the flush boundary or threading a per-row size hook through the StatsExporter interface. (Cursor Bugbot, high — disagreeing on severity for the shadow-rollout window but acknowledging the underlying gap.) * Comment note on MaybeDumpArrowBatch acknowledging the intentional duplication with stats_exporter.cc — both copies die when the legacy path retires, so extracting a shared helper now would be a header just to delete it later. (Copilot, push-back.) t/036_unified_arrow_e2e.pl: * curl now uses --fail-with-body so HTTP 4xx/5xx from CH surfaces as a non-zero exit at the assertion site. The downstream SELECT count() assertion would catch a real ingestion failure too, but the sharper error at the source is a strict improvement. (Copilot.) 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/export/otel_arrow_exporter.cc | 29 ++++++++++++++++++++++++++--- t/036_unified_arrow_e2e.pl | 8 +++++++- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/src/export/otel_arrow_exporter.cc b/src/export/otel_arrow_exporter.cc index 90ad080..ffd71ef 100644 --- a/src/export/otel_arrow_exporter.cc +++ b/src/export/otel_arrow_exporter.cc @@ -54,6 +54,11 @@ constexpr int64_t kPostgresEpochOffsetUs = 946684800000000LL; // IPC batch to disk before shipping. Used by t/026_arrow_dump and the new // end-to-end test to assert on Arrow output without standing up an OTel // collector. +// +// Intentionally a near-duplicate of stats_exporter.cc:MaybeDumpArrowBatch. +// Both copies die when arrow_batch.cc is retired after the +// use_unified_arrow_exporter cutover; extracting a shared helper now would +// create a header just to delete it later. void MaybeDumpArrowBatch(const uint8_t* data, size_t len) { if (data == nullptr || len == 0 || psch_debug_arrow_dump_dir == nullptr || *psch_debug_arrow_dump_dir == '\0') { @@ -112,8 +117,8 @@ struct ArrowSlot { std::shared_ptr builder; }; -// Parse "key1:val1;key2:val2" into a flat list. Last write wins on duplicate -// keys. Empty input -> empty list. +// Parse "key1:val1;key2:val2" into a flat list. First match wins on +// duplicate keys (Get linear-scans from the front). Empty input -> empty list. class ExtraAttrs { public: explicit ExtraAttrs(const char* raw) { @@ -197,7 +202,14 @@ class OTelArrowExporter : public StatsExporter { bool IsConnected() const final { return inner_->IsConnected(); } int NumConsecutiveFailures() const final { return inner_->NumConsecutiveFailures(); } void ResetFailures() final { inner_->ResetFailures(); } - int NumExported() const final { return inner_->NumExported(); } + // Report rows from THIS batch, not the inner exporter's cumulative count. 
+ // BeginBatch resets row_count_; the inner's exported_count_ accumulates + // across batches because we never call inner_->BeginBatch() (the inner's + // per-record LogRecord state machine is unused on this path). + // PschExportBatch reads NumExported() once per successful CommitBatch and + // fetch_adds it into shared stats, so a cumulative value here would cause + // quadratic over-reporting. + int NumExported() const final { return row_count_; } private: // -- Column wrappers --------------------------------------------------- @@ -431,6 +443,17 @@ bool OTelArrowExporter::CommitBatch() { return true; } + // TODO(memory-budget): the legacy ExportEventsAsArrowInternal flushes + // mid-batch when the Arrow builder's estimated bytes exceed + // psch_otel_max_block_bytes (default 3 MiB). This exporter ships the + // whole batch in one IPC, so a backlog up to psch_batch_max (default + // 200000) can produce an oversized payload that exceeds gRPC's 4 MiB + // wire cap or otelcol's 20 MiB HTTP body cap. Acceptable for the + // GUC-default-off shadow rollout, but the budget check needs to land + // before the GUC default flips. Plumbing it through requires either + // invalidating caller-held column shared_ptrs at the flush boundary + // or threading a per-row size hook through the StatsExporter interface. + std::vector> fields; fields.reserve(slots_.size()); std::vector> arrays; diff --git a/t/036_unified_arrow_e2e.pl b/t/036_unified_arrow_e2e.pl index 5ef4ccd..c0e9cf8 100644 --- a/t/036_unified_arrow_e2e.pl +++ b/t/036_unified_arrow_e2e.pl @@ -124,7 +124,13 @@ my $total_posted = 0; for my $f (@ipc_files) { my $err_file = "$f.err"; - my $rc = system("curl -sS -X POST -H 'Content-Type: application/octet-stream' " . + # --fail-with-body makes HTTP 4xx/5xx responses from CH surface as a + # non-zero curl exit code (default curl returns 0 on any completed + # HTTP transfer regardless of status). 
The subsequent SELECT count() + # assertion would catch ingestion failure too, but failing here gives + # a sharper error message at the point the bug actually happened. + my $rc = system("curl -sS --fail-with-body -X POST " . + "-H 'Content-Type: application/octet-stream' " . "--data-binary \@$f '$insert_url' 2>$err_file >/dev/null"); my $stderr = -s $err_file ? do { open my $fh, '<', $err_file; local $/; <$fh> From ae0334cc70d6177649e40e84688a01347ae6cec5 Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 18 Jun 2026 15:27:57 -0700 Subject: [PATCH 5/6] fix(guc): make use_unified_arrow_exporter PGC_POSTMASTER The bgworker constructs the exporter implementation once at init time based on this GUC, but PschExportBatch was re-reading it every cycle to choose between the legacy ExportEventsAsArrow bypass and the new unified ExportEventStats path. Under PGC_SIGHUP, the two reads can disagree: - Init=on, runtime=off: dispatcher takes the legacy bypass and calls SendArrowBatch on an OTelArrowExporter, which doesn't override it, so every batch silently drops. - Init=off, runtime=on: dispatcher runs the unified path through a plain OTelExporter, which ships OTLP log records instead of Arrow IPC. events_raw never sees the data. Flag is a producer-shape feature toggle, not an operational tunable; flipping it should be a deliberate restart. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/config/guc.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/config/guc.c b/src/config/guc.c index 8aaa5b1..46f9641 100644 --- a/src/config/guc.c +++ b/src/config/guc.c @@ -364,10 +364,13 @@ void PschInitGuc(void) { "builds Arrow IPC via the typed StatsExporter column interface, writing the " "events_raw schema (typed integer ids, no sprintf decimal-string encoding). " "When off, the legacy arrow_batch.cc path runs and produces the query_logs_arrow " - "wire shape. Default off; flip on to opt a producer into the new exporter.", + "wire shape. 
Default off; flip on to opt a producer into the new exporter. " + "PGC_POSTMASTER: the bgworker picks the exporter implementation at init time, " + "so a runtime change would mismatch the per-cycle dispatcher (Arrow batches " + "would silently drop, since OTelArrowExporter does not implement SendArrowBatch).", &psch_use_unified_arrow_exporter, false, - PGC_SIGHUP, + PGC_POSTMASTER, 0, NULL, NULL, NULL); From 34d6b49a521fcea28c4dc97082139f34d8e6dd4d Mon Sep 17 00:00:00 2001 From: Josh Ventura Date: Thu, 18 Jun 2026 16:26:24 -0700 Subject: [PATCH 6/6] feat(exporter): mid-batch flush at row boundary against psch_otel_max_block_bytes OTelArrowExporter accumulated rows into a single Arrow IPC up to psch_batch_max (default 200000) with no byte ceiling, so a queue backlog could produce a payload exceeding gRPC's 4 MiB wire cap or otelcol's HTTP body cap. Closes the gap relative to ExportEventsAsArrowInternal which flushes mid-batch when the builder's estimated bytes cross psch_otel_max_block_bytes. Column wrappers each take a pointer to bytes_estimate_ and bump it per Append (sizeof for fixed-width, value bytes + 4 for var-length offsets). BeginRow samples bytes_estimate_ at the row boundary; if it has crossed max_block_bytes_ and at least one row is already accumulated, Flush() runs, ships the chunk, and resets per-flush state. Arrow ArrayBuilder::Finish leaves builders empty and reusable, so subsequent chunks share the same slot vector + column shared_ptrs without any re-registration. NumExported now reports exported_in_batch_ (cumulative across all chunks in the active batch) instead of the per-chunk row_count_, otherwise mid-batch flushes would erase the dispatcher's view of work done. A sticky batch_failed_ flag poisons CommitBatch after any mid-batch Flush failure so partial batches never count as a success. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/export/otel_arrow_exporter.cc | 139 ++++++++++++++++++++++-------- 1 file changed, 105 insertions(+), 34 deletions(-) diff --git a/src/export/otel_arrow_exporter.cc b/src/export/otel_arrow_exporter.cc index ffd71ef..38400b1 100644 --- a/src/export/otel_arrow_exporter.cc +++ b/src/export/otel_arrow_exporter.cc @@ -202,99 +202,127 @@ class OTelArrowExporter : public StatsExporter { bool IsConnected() const final { return inner_->IsConnected(); } int NumConsecutiveFailures() const final { return inner_->NumConsecutiveFailures(); } void ResetFailures() final { inner_->ResetFailures(); } - // Report rows from THIS batch, not the inner exporter's cumulative count. - // BeginBatch resets row_count_; the inner's exported_count_ accumulates - // across batches because we never call inner_->BeginBatch() (the inner's - // per-record LogRecord state machine is unused on this path). - // PschExportBatch reads NumExported() once per successful CommitBatch and - // fetch_adds it into shared stats, so a cumulative value here would cause - // quadratic over-reporting. - int NumExported() const final { return row_count_; } + // Report rows from THIS batch (sum across mid-batch flushes + the final + // flush), not the inner exporter's cumulative count. + // BeginBatch resets exported_in_batch_; each successful Flush adds the + // chunk's row count. The inner's exported_count_ accumulates across + // batches because we never call inner_->BeginBatch() (the inner's per-record + // LogRecord state machine is unused on this path). PschExportBatch reads + // NumExported() once per successful CommitBatch and fetch_adds it into + // shared stats, so a cumulative value here would cause quadratic + // over-reporting. 
+ int NumExported() const final { return exported_in_batch_; } private: // -- Column wrappers --------------------------------------------------- // Nested so they can inherit from StatsExporter::Column (protected in // the base; visible to derived classes and their nested types). Each one - // Append-forwards to a single typed builder; Crunch() is unused on this - // path — builders are finished together in CommitBatch when we have all - // slots in hand. + // Append-forwards to a single typed builder and bumps the exporter's + // bytes_estimate_ for mid-batch flush bookkeeping. Crunch() is unused on + // this path — builders are finished together in Flush() (invoked + // mid-batch when bytes_estimate_ crosses max_block_bytes_, and once at + // CommitBatch for the residual chunk). + + // Variable-length columns over-estimate offset overhead at 4 bytes/value + // (32-bit offsets), ignoring the dictionary's shared backing store. For LC + // columns this overshoots when values repeat — fine; bytes_estimate_ is a + // budget threshold, not a wire-size predictor, and over-counting just + // means flushing slightly earlier than strictly necessary. 
+ static constexpr size_t kVarLenOffsetBytes = 4; template class ArrowNumColumn : public Column { public: - explicit ArrowNumColumn(BuilderT* builder) : builder_(builder) {} + ArrowNumColumn(BuilderT* builder, size_t* bytes) : builder_(builder), bytes_(bytes) {} void Append(const T& v) final { const arrow::Status s = builder_->Append(static_cast(v)); if (!s.ok()) { LogArrowFailure("Arrow numeric append", s); + return; } + *bytes_ += sizeof(typename BuilderT::value_type); } void Crunch() final {} private: BuilderT* const builder_; + size_t* const bytes_; }; class ArrowDictStrColumn : public Column { public: - explicit ArrowDictStrColumn(DictBuilder* builder) : builder_(builder) {} + ArrowDictStrColumn(DictBuilder* builder, size_t* bytes) : builder_(builder), bytes_(bytes) {} void Append(const std::string& v) final { const arrow::Status s = builder_->Append(v.data(), static_cast(v.size())); if (!s.ok()) { LogArrowFailure("Arrow dict str append", s); + return; } + *bytes_ += v.size() + kVarLenOffsetBytes; } void Crunch() final {} private: DictBuilder* const builder_; + size_t* const bytes_; }; class ArrowDictSvColumn : public Column { public: - explicit ArrowDictSvColumn(DictBuilder* builder) : builder_(builder) {} + ArrowDictSvColumn(DictBuilder* builder, size_t* bytes) : builder_(builder), bytes_(bytes) {} void Append(const std::string_view& v) final { const arrow::Status s = builder_->Append(v.data(), static_cast(v.size())); if (!s.ok()) { LogArrowFailure("Arrow dict sv append", s); + return; } + *bytes_ += v.size() + kVarLenOffsetBytes; } void Crunch() final {} private: DictBuilder* const builder_; + size_t* const bytes_; }; class ArrowUtf8SvColumn : public Column { public: - explicit ArrowUtf8SvColumn(arrow::StringBuilder* builder) : builder_(builder) {} + ArrowUtf8SvColumn(arrow::StringBuilder* builder, size_t* bytes) + : builder_(builder), bytes_(bytes) {} void Append(const std::string_view& v) final { const arrow::Status s = builder_->Append(v.data(), 
static_cast(v.size())); if (!s.ok()) { LogArrowFailure("Arrow utf8 sv append", s); + return; } + *bytes_ += v.size() + kVarLenOffsetBytes; } void Crunch() final {} private: arrow::StringBuilder* const builder_; + size_t* const bytes_; }; class ArrowTimestampColumn : public Column { public: - explicit ArrowTimestampColumn(arrow::TimestampBuilder* builder) : builder_(builder) {} + ArrowTimestampColumn(arrow::TimestampBuilder* builder, size_t* bytes) + : builder_(builder), bytes_(bytes) {} // Caller passes Unix microseconds; the column is MICRO-precision, so // append directly with no conversion. void Append(const int64_t& v) final { const arrow::Status s = builder_->Append(v); if (!s.ok()) { LogArrowFailure("Arrow timestamp append", s); + return; } + *bytes_ += sizeof(int64_t); } void Crunch() final {} private: arrow::TimestampBuilder* const builder_; + size_t* const bytes_; }; // Schema-side helpers: create a builder, register a slot, return a wrapper. @@ -304,28 +332,28 @@ class OTelArrowExporter : public StatsExporter { auto* raw = builder.get(); slots_.push_back( {std::string(name), arrow::field(std::string(name), std::move(dtype)), std::move(builder)}); - return std::make_shared>(raw); + return std::make_shared>(raw, &bytes_estimate_); } shared_ptr> MakeDictStr(string_view name) { auto builder = std::make_shared(); auto* raw = builder.get(); slots_.push_back( {std::string(name), arrow::field(std::string(name), DictUtf8Type()), std::move(builder)}); - return std::make_shared(raw); + return std::make_shared(raw, &bytes_estimate_); } shared_ptr> MakeDictSv(string_view name) { auto builder = std::make_shared(); auto* raw = builder.get(); slots_.push_back( {std::string(name), arrow::field(std::string(name), DictUtf8Type()), std::move(builder)}); - return std::make_shared(raw); + return std::make_shared(raw, &bytes_estimate_); } shared_ptr> MakeUtf8Sv(string_view name) { auto builder = std::make_shared(); auto* raw = builder.get(); slots_.push_back( 
{std::string(name), arrow::field(std::string(name), arrow::utf8()), std::move(builder)}); - return std::make_shared(raw); + return std::make_shared(raw, &bytes_estimate_); } shared_ptr> MakeTimestamp(string_view name) { auto builder = @@ -333,9 +361,15 @@ class OTelArrowExporter : public StatsExporter { auto* raw = builder.get(); slots_.push_back( {std::string(name), arrow::field(std::string(name), TimestampType()), std::move(builder)}); - return std::make_shared(raw); + return std::make_shared(raw, &bytes_estimate_); } + // Builds the IPC stream from the current builder state, ships it via the + // inner exporter, and resets per-flush counters. After successful + // ArrayBuilder::Finish each builder is reset to empty (Arrow contract), so + // mid-batch chunks reuse the same slot vector and column wrappers transparently. + bool Flush(); + // ---------------------------------------------------------------- // Columns the events_raw schema declares that the StatsExporter caller // does not explicitly populate. The exporter synthesizes them on @@ -357,7 +391,20 @@ class OTelArrowExporter : public StatsExporter { std::unique_ptr inner_; std::vector slots_; + // Rows accumulated in the current chunk (resets after each Flush). The + // total rows exported across all chunks in the active batch live in + // exported_in_batch_ below. int row_count_ = 0; + int exported_in_batch_ = 0; + // Conservative running estimate of how many bytes the current chunk would + // produce. Bumped by each column wrapper's Append; sampled at row boundary + // to decide whether to flush mid-batch. + size_t bytes_estimate_ = 0; + size_t max_block_bytes_ = 0; + // Sticky: any Flush failure inside the batch poisons CommitBatch so the + // dispatcher doesn't see partial rows charged twice or a half-shipped batch + // counted as success. + bool batch_failed_ = false; // Synthesized columns (populated implicitly in BeginRow). 
shared_ptr> inst_ubid_; @@ -420,10 +467,28 @@ void OTelArrowExporter::RegisterEnvelopeColumns() { void OTelArrowExporter::BeginBatch() { slots_.clear(); row_count_ = 0; + exported_in_batch_ = 0; + bytes_estimate_ = 0; + batch_failed_ = false; + // Match ExportEventsAsArrowInternal's floor so a misconfigured tiny GUC + // value can't degrade into a per-row flush loop. + max_block_bytes_ = std::max(65536, static_cast(psch_otel_max_block_bytes)); RegisterEnvelopeColumns(); } void OTelArrowExporter::BeginRow() { + if (batch_failed_) { + return; + } + // Mid-batch flush at the row boundary: only safe between rows (column + // builders are row-aligned), and only meaningful once at least one row is + // already accumulated. + if (row_count_ > 0 && bytes_estimate_ >= max_block_bytes_) { + if (!Flush()) { + batch_failed_ = true; + return; + } + } ++row_count_; // Synthesized columns fire here so the call site doesn't need to know // about them. @@ -439,21 +504,16 @@ void OTelArrowExporter::BeginRow() { } bool OTelArrowExporter::CommitBatch() { + if (batch_failed_) { + return false; + } if (slots_.empty() || row_count_ == 0) { return true; } + return Flush(); +} - // TODO(memory-budget): the legacy ExportEventsAsArrowInternal flushes - // mid-batch when the Arrow builder's estimated bytes exceed - // psch_otel_max_block_bytes (default 3 MiB). This exporter ships the - // whole batch in one IPC, so a backlog up to psch_batch_max (default - // 200000) can produce an oversized payload that exceeds gRPC's 4 MiB - // wire cap or otelcol's 20 MiB HTTP body cap. Acceptable for the - // GUC-default-off shadow rollout, but the budget check needs to land - // before the GUC default flips. Plumbing it through requires either - // invalidating caller-held column shared_ptrs at the flush boundary - // or threading a per-row size hook through the StatsExporter interface. 
- +bool OTelArrowExporter::Flush() { std::vector> fields; fields.reserve(slots_.size()); std::vector> arrays; @@ -463,7 +523,10 @@ bool OTelArrowExporter::CommitBatch() { fields.push_back(slot.field); std::shared_ptr array; // arrow::ArrayBuilder::Finish is virtual; DictBuilder's override returns - // a DictionaryArray downcast to Array. No RTTI needed. + // a DictionaryArray downcast to Array. The call also resets the builder + // back to an empty state, so the next chunk in this same batch can + // reuse it without re-registering slots or invalidating the column + // shared_ptrs the dispatcher is still holding. const arrow::Status status = slot.builder->Finish(&array); if (!status.ok()) { LogArrowFailure(("Arrow finish " + slot.name).c_str(), status); @@ -514,7 +577,15 @@ bool OTelArrowExporter::CommitBatch() { const auto buf_len = static_cast(buf->size()); MaybeDumpArrowBatch(buf->data(), buf_len); - return inner_->SendArrowBatch(buf->data(), buf_len, row_count_); + if (!inner_->SendArrowBatch(buf->data(), buf_len, row_count_)) { + return false; + } + // Builders are already reset by Finish above; clear our row-side state so + // continued appends from the dispatcher start fresh. + exported_in_batch_ += row_count_; + row_count_ = 0; + bytes_estimate_ = 0; + return true; } } // namespace