From a974b89ec4163db6754d985587e023723e970c44 Mon Sep 17 00:00:00 2001 From: Amog Iska Date: Sun, 7 Jun 2026 21:33:36 -0700 Subject: [PATCH 1/4] feat(arrow): add read_replica resource attribute column --- src/config/guc.c | 2 +- src/export/arrow_batch.cc | 8 +++++++- t/026_arrow_dump.pl | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/config/guc.c b/src/config/guc.c index 53a9d8f..e21a86d 100644 --- a/src/config/guc.c +++ b/src/config/guc.c @@ -373,7 +373,7 @@ void PschInitGuc(void) { "pg_stat_ch.extra_attributes", "Key-value pairs appended to exported Arrow batches.", "Semicolon-separated k:v pairs for resource columns: " - "'instance_ubid:abc;server_role:primary;region:us-east-1'.", + "'instance_ubid:abc;server_role:primary;read_replica:false;region:us-east-1'.", &psch_extra_attributes, "", PGC_SIGHUP, diff --git a/src/export/arrow_batch.cc b/src/export/arrow_batch.cc index 75d4ebf..02a9df4 100644 --- a/src/export/arrow_batch.cc +++ b/src/export/arrow_batch.cc @@ -193,6 +193,7 @@ struct ArrowBatchBuilder::Impl { arrow::StringBuilder instance_ubid_builder; arrow::StringBuilder server_ubid_builder; DictBuilder server_role_builder; + DictBuilder read_replica_builder; DictBuilder region_builder; DictBuilder cell_builder; DictBuilder service_version_builder; @@ -260,6 +261,7 @@ struct ArrowBatchBuilder::Impl { arrow::field("instance_ubid", arrow::utf8()), arrow::field("server_ubid", arrow::utf8()), arrow::field("server_role", DictionaryUtf8Type()), + arrow::field("read_replica", DictionaryUtf8Type()), arrow::field("region", DictionaryUtf8Type()), arrow::field("cell", DictionaryUtf8Type()), arrow::field("service_version", DictionaryUtf8Type()), @@ -424,6 +426,7 @@ struct ArrowBatchBuilder::Impl { "Arrow instance_ubid append") || !AppendString(&server_ubid_builder, ExtraAttr("server_ubid"), "Arrow server_ubid append") || !AppendString(&server_role_builder, ExtraAttr("server_role"), "Arrow server_role append") || + !AppendString(&read_replica_builder, ExtraAttr("read_replica"), "Arrow read_replica append") || !AppendString(®ion_builder, ExtraAttr("region"), "Arrow region append") || !AppendString(&cell_builder, ExtraAttr("cell"), "Arrow cell append") || !AppendString(&service_version_builder, service_version, "Arrow service_version append") || @@ -436,7 +439,8 @@ struct ArrowBatchBuilder::Impl { kFixedBytesPerRow + db_name.size() + db_user.size() + app.size() + client_addr.size() + query_text.size() + err_message.size() + err_sqlstate.size() + service_version.size() + ExtraAttr("instance_ubid").size() + ExtraAttr("server_ubid").size() + - ExtraAttr("server_role").size() + ExtraAttr("region").size() + ExtraAttr("cell").size() + + ExtraAttr("server_role").size() + ExtraAttr("read_replica").size() + + ExtraAttr("region").size() + ExtraAttr("cell").size() + ExtraAttr("host_id").size() + ExtraAttr("pod_name").size(); ++num_rows; return true; @@ -519,6 +523,7 @@ struct ArrowBatchBuilder::Impl { !add_array(&instance_ubid_builder, "Arrow instance_ubid finish") || !add_array(&server_ubid_builder, "Arrow server_ubid finish") || !add_dict_array(&server_role_builder, "Arrow server_role finish") || + !add_dict_array(&read_replica_builder, "Arrow read_replica finish") || !add_dict_array(®ion_builder, "Arrow region finish") || !add_dict_array(&cell_builder, "Arrow cell finish") || !add_dict_array(&service_version_builder, "Arrow service_version finish") || @@ -625,6 +630,7 @@ struct ArrowBatchBuilder::Impl { instance_ubid_builder.Reset(); server_ubid_builder.Reset(); server_role_builder.ResetFull(); + read_replica_builder.ResetFull(); region_builder.ResetFull(); cell_builder.ResetFull(); service_version_builder.ResetFull(); diff --git a/t/026_arrow_dump.pl b/t/026_arrow_dump.pl index 0d842e7..37b56c6 100644 --- a/t/026_arrow_dump.pl +++ b/t/026_arrow_dump.pl @@ -186,7 +186,7 @@ 'duration_us', 'rows', 'pid', 'query_id', 'shared_blks_hit', 'shared_blks_read', 'wal_records', 'wal_bytes', - 'service_version', 'region', + 'service_version', 'region', 'read_replica', ] missing = [c for c in expected if schema.get_field_index(c) == -1] if missing: From 4d69146ddd1eb99577d2307645795d67fb075938 Mon Sep 17 00:00:00 2001 From: Amog Iska Date: Mon, 8 Jun 2026 08:15:05 -0700 Subject: [PATCH 2/4] test(arrow): assert read_replica is dictionary-encoded --- t/026_arrow_dump.pl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/t/026_arrow_dump.pl b/t/026_arrow_dump.pl index 37b56c6..56af0e6 100644 --- a/t/026_arrow_dump.pl +++ b/t/026_arrow_dump.pl @@ -203,6 +203,11 @@ rows_field = schema.field('rows') assert rows_field.type == pa.uint64(), f"rows type wrong: {rows_field.type}" +# read_replica is a dictionary-encoded column (like server_role/region), not plain utf8. +rr_field = schema.field('read_replica') +assert pa.types.is_dictionary(rr_field.type), f"read_replica not dict-encoded: {rr_field.type}" +assert rr_field.type.value_type == pa.utf8(), f"read_replica dict value type wrong: {rr_field.type.value_type}" + print(f"OK:fields={len(schema)},rows={total_rows}") PYEOF From 0205364ba295a360ca148bf7e5b523f7162bd86b Mon Sep 17 00:00:00 2001 From: Amog Iska Date: Mon, 8 Jun 2026 08:27:57 -0700 Subject: [PATCH 3/4] chore(arrow): clang-format read_replica additions --- src/export/arrow_batch.cc | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/export/arrow_batch.cc b/src/export/arrow_batch.cc index 02a9df4..1923f3a 100644 --- a/src/export/arrow_batch.cc +++ b/src/export/arrow_batch.cc @@ -426,7 +426,8 @@ struct ArrowBatchBuilder::Impl { "Arrow instance_ubid append") || !AppendString(&server_ubid_builder, ExtraAttr("server_ubid"), "Arrow server_ubid append") || !AppendString(&server_role_builder, ExtraAttr("server_role"), "Arrow server_role append") || - !AppendString(&read_replica_builder, ExtraAttr("read_replica"), "Arrow read_replica append") || + !AppendString(&read_replica_builder, ExtraAttr("read_replica"), + "Arrow read_replica append") || !AppendString(®ion_builder, ExtraAttr("region"), "Arrow region append") || !AppendString(&cell_builder, ExtraAttr("cell"), "Arrow cell append") || !AppendString(&service_version_builder, service_version, "Arrow service_version append") || @@ -435,13 +436,13 @@ struct ArrowBatchBuilder::Impl { return false; } - estimated_bytes += - kFixedBytesPerRow + db_name.size() + db_user.size() + app.size() + client_addr.size() + - query_text.size() + err_message.size() + err_sqlstate.size() + service_version.size() + - ExtraAttr("instance_ubid").size() + ExtraAttr("server_ubid").size() + - ExtraAttr("server_role").size() + ExtraAttr("read_replica").size() + - ExtraAttr("region").size() + ExtraAttr("cell").size() + - ExtraAttr("host_id").size() + ExtraAttr("pod_name").size(); + estimated_bytes += kFixedBytesPerRow + db_name.size() + db_user.size() + app.size() + + client_addr.size() + query_text.size() + err_message.size() + + err_sqlstate.size() + service_version.size() + + ExtraAttr("instance_ubid").size() + ExtraAttr("server_ubid").size() + + ExtraAttr("server_role").size() + ExtraAttr("read_replica").size() + + ExtraAttr("region").size() + ExtraAttr("cell").size() + + ExtraAttr("host_id").size() + ExtraAttr("pod_name").size(); ++num_rows; return true; } From 9c4969ea43d4ce5dceb3bdfcfb24ac871a82bb69 Mon Sep 17 00:00:00 2001 From: Amog Iska Date: Mon, 8 Jun 2026 09:36:43 -0700 Subject: [PATCH 4/4] refactor(arrow): rename read_replica column to read_replica_type enum --- src/config/guc.c | 2 +- src/export/arrow_batch.cc | 14 +++++++------- t/026_arrow_dump.pl | 10 +++++----- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/config/guc.c b/src/config/guc.c index e21a86d..97b5473 100644 --- a/src/config/guc.c +++ b/src/config/guc.c @@ -373,7 +373,7 @@ void PschInitGuc(void) { "pg_stat_ch.extra_attributes", "Key-value pairs appended to exported Arrow batches.", "Semicolon-separated k:v pairs for resource columns: " - "'instance_ubid:abc;server_role:primary;read_replica:false;region:us-east-1'.", + "'instance_ubid:abc;server_role:primary;read_replica_type:regional;region:us-east-1'.", &psch_extra_attributes, "", PGC_SIGHUP, diff --git a/src/export/arrow_batch.cc b/src/export/arrow_batch.cc index 1923f3a..63bb650 100644 --- a/src/export/arrow_batch.cc +++ b/src/export/arrow_batch.cc @@ -193,7 +193,7 @@ struct ArrowBatchBuilder::Impl { arrow::StringBuilder instance_ubid_builder; arrow::StringBuilder server_ubid_builder; DictBuilder server_role_builder; - DictBuilder read_replica_builder; + DictBuilder read_replica_type_builder; DictBuilder region_builder; DictBuilder cell_builder; DictBuilder service_version_builder; @@ -261,7 +261,7 @@ struct ArrowBatchBuilder::Impl { arrow::field("instance_ubid", arrow::utf8()), arrow::field("server_ubid", arrow::utf8()), arrow::field("server_role", DictionaryUtf8Type()), - arrow::field("read_replica", DictionaryUtf8Type()), + arrow::field("read_replica_type", DictionaryUtf8Type()), arrow::field("region", DictionaryUtf8Type()), arrow::field("cell", DictionaryUtf8Type()), arrow::field("service_version", DictionaryUtf8Type()), @@ -426,8 +426,8 @@ struct ArrowBatchBuilder::Impl { "Arrow instance_ubid append") || !AppendString(&server_ubid_builder, ExtraAttr("server_ubid"), "Arrow server_ubid append") || !AppendString(&server_role_builder, ExtraAttr("server_role"), "Arrow server_role append") || - !AppendString(&read_replica_builder, ExtraAttr("read_replica"), - "Arrow read_replica append") || + !AppendString(&read_replica_type_builder, ExtraAttr("read_replica_type"), + "Arrow read_replica_type append") || !AppendString(®ion_builder, ExtraAttr("region"), "Arrow region append") || !AppendString(&cell_builder, ExtraAttr("cell"), "Arrow cell append") || !AppendString(&service_version_builder, service_version, "Arrow service_version append") || @@ -440,7 +440,7 @@ struct ArrowBatchBuilder::Impl { client_addr.size() + query_text.size() + err_message.size() + err_sqlstate.size() + service_version.size() + ExtraAttr("instance_ubid").size() + ExtraAttr("server_ubid").size() + - ExtraAttr("server_role").size() + ExtraAttr("read_replica").size() + + ExtraAttr("server_role").size() + ExtraAttr("read_replica_type").size() + ExtraAttr("region").size() + ExtraAttr("cell").size() + ExtraAttr("host_id").size() + ExtraAttr("pod_name").size(); ++num_rows; @@ -524,7 +524,7 @@ struct ArrowBatchBuilder::Impl { !add_array(&instance_ubid_builder, "Arrow instance_ubid finish") || !add_array(&server_ubid_builder, "Arrow server_ubid finish") || !add_dict_array(&server_role_builder, "Arrow server_role finish") || - !add_dict_array(&read_replica_builder, "Arrow read_replica finish") || + !add_dict_array(&read_replica_type_builder, "Arrow read_replica_type finish") || !add_dict_array(®ion_builder, "Arrow region finish") || !add_dict_array(&cell_builder, "Arrow cell finish") || !add_dict_array(&service_version_builder, "Arrow service_version finish") || @@ -631,7 +631,7 @@ struct ArrowBatchBuilder::Impl { instance_ubid_builder.Reset(); server_ubid_builder.Reset(); server_role_builder.ResetFull(); - read_replica_builder.ResetFull(); + read_replica_type_builder.ResetFull(); region_builder.ResetFull(); cell_builder.ResetFull(); service_version_builder.ResetFull(); diff --git a/t/026_arrow_dump.pl b/t/026_arrow_dump.pl index 56af0e6..ca0083d 100644 --- a/t/026_arrow_dump.pl +++ b/t/026_arrow_dump.pl @@ -186,7 +186,7 @@ 'duration_us', 'rows', 'pid', 'query_id', 'shared_blks_hit', 'shared_blks_read', 'wal_records', 'wal_bytes', - 'service_version', 'region', 'read_replica', + 'service_version', 'region', 'read_replica_type', ] missing = [c for c in expected if schema.get_field_index(c) == -1] if missing: @@ -203,10 +203,10 @@ rows_field = schema.field('rows') assert rows_field.type == pa.uint64(), f"rows type wrong: {rows_field.type}" -# read_replica is a dictionary-encoded column (like server_role/region), not plain utf8. -rr_field = schema.field('read_replica') -assert pa.types.is_dictionary(rr_field.type), f"read_replica not dict-encoded: {rr_field.type}" -assert rr_field.type.value_type == pa.utf8(), f"read_replica dict value type wrong: {rr_field.type.value_type}" +# read_replica_type is a dictionary-encoded column (like server_role/region), not plain utf8. +rr_field = schema.field('read_replica_type') +assert pa.types.is_dictionary(rr_field.type), f"read_replica_type not dict-encoded: {rr_field.type}" +assert rr_field.type.value_type == pa.utf8(), f"read_replica_type dict value type wrong: {rr_field.type.value_type}" print(f"OK:fields={len(schema)},rows={total_rows}") PYEOF