From fd028a62365cd9bd2116c4e942d1b682c55194aa Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Fri, 18 Jul 2025 10:08:37 -0500 Subject: [PATCH 01/26] Add initial logical replication handling --- spec/pg/replication_spec.cr | 110 +++++++++++ src/pg.cr | 4 + src/pg/connection.cr | 10 +- src/pg/replication.cr | 353 ++++++++++++++++++++++++++++++++++++ src/pq/connection.cr | 53 +++--- src/pq/conninfo.cr | 10 +- 6 files changed, 510 insertions(+), 30 deletions(-) create mode 100644 spec/pg/replication_spec.cr create mode 100644 src/pg/replication.cr diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr new file mode 100644 index 00000000..533c7819 --- /dev/null +++ b/spec/pg/replication_spec.cr @@ -0,0 +1,110 @@ +require "../spec_helper" + +struct TestMessageHandler + include PG::Replication::Handler + + def received(msg : PG::Replication::Begin) + pp begin: msg + end + + def received(msg : PG::Replication::Message) + pp message: msg + end + + def received(msg : PG::Replication::Commit) + pp commit: msg + end + + def received(msg : PG::Replication::Origin) + end + + def received(msg : PG::Replication::Relation) + pp relation: msg + end + + def received(msg : PG::Replication::Type) + end + + def received(msg : PG::Replication::Insert) + pp insert: msg + end + + def received(msg : PG::Replication::Update) + pp update: msg + end + + def received(msg : PG::Replication::Delete) + pp delete: msg + end + + def received(msg : PG::Replication::Truncate) + end + + def received(msg : PG::Replication::StreamStart) + end + + def received(msg : PG::Replication::StreamStop) + end + + def received(msg : PG::Replication::StreamCommit) + end + + def received(msg : PG::Replication::StreamAbort) + end + + def received(msg : PG::Replication::BeginPrepare) + end + + def received(msg : PG::Replication::Prepare) + end + + def received(msg : PG::Replication::CommitPrepared) + end + + def received(msg : PG::Replication::RollbackPrepared) + end + + def received(msg : 
PG::Replication::StreamPrepare) + end + + def received(msg : PG::Replication::TupleData) + end +end + +describe PG::Replication do + it "consumes the WAL" do + publication_name = "test_publication_#{Random::Secure.hex}" + slot_name = "test_slot_#{Random::Secure.hex}" + table_name = "test_table_#{Random::Secure.hex}" + handler = TestMessageHandler.new + PG_DB.exec "CREATE PUBLICATION #{publication_name} FOR ALL TABLES" + PG_DB.exec "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slot_name + subscriber = PG.connect_replication(DB_URL, handler: handler, publication_name: publication_name, slot_name: slot_name) + sleep 100.milliseconds + PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" + PG_DB.exec <<-SQL + CREATE TABLE #{table_name}( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + string TEXT, + number INT8 + ) + SQL + id = PG_DB.query_one <<-SQL, as: UUID + INSERT INTO #{table_name} (string, number) + VALUES ('foo', 1) + RETURNING id + SQL + + PG_DB.exec "UPDATE #{table_name} SET string = 'bar' WHERE id = $1", id + PG_DB.exec "UPDATE #{table_name} SET number = 2 WHERE id = $1", id + + sleep 100.milliseconds + + pp handler + ensure + subscriber.try &.close + PG_DB.exec "SELECT pg_drop_replication_slot($1)", slot_name + PG_DB.exec "DROP PUBLICATION IF EXISTS #{publication_name}" + PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" + end +end diff --git a/src/pg.cr b/src/pg.cr index 112b5d59..812d4621 100644 --- a/src/pg.cr +++ b/src/pg.cr @@ -31,6 +31,10 @@ module PG ListenConnection.new(url, channels, blocking, &blk) end + def self.connect_replication(url, *, handler, publication_name, slot_name) + Replication::Connection.new(url, handler, publication_name: publication_name, slot_name: slot_name) + end + class ListenConnection @conn : PG::Connection diff --git a/src/pg/connection.cr b/src/pg/connection.cr index 1f22202a..44654331 100644 --- a/src/pg/connection.cr +++ b/src/pg/connection.cr @@ -17,7 +17,7 @@ module PG super(options) begin - 
@connection.connect + @connection.connect(replication: @connection.conninfo.replication) rescue ex raise DB::ConnectionRefused.new(cause: ex) end @@ -95,6 +95,14 @@ module PG end end + protected def listen_replication(publication_name : String, slot_name : String, blocking : Bool = false, &block : Replication::Frame ->) + if blocking + @connection.start_replication_frame_loop(publication_name, slot_name, &block) + else + spawn { @connection.start_replication_frame_loop(publication_name, slot_name, &block) } + end + end + def version vers = connection.server_parameters["server_version"].partition(' ').first.split('.').map(&.to_i) {major: vers[0], minor: vers[1], patch: vers[2]? || 0} diff --git a/src/pg/replication.cr b/src/pg/replication.cr new file mode 100644 index 00000000..e8043abd --- /dev/null +++ b/src/pg/replication.cr @@ -0,0 +1,353 @@ +module PG::Replication + module Handler + def received(frame) + end + end + + class Connection + getter handler : Handler + getter publication_name : String + getter slot_name : String + + def initialize(uri : URI | String, @handler, *, @publication_name, @slot_name, blocking : Bool = false) + if uri.is_a? String + uri = URI.parse(uri) + else + uri = uri.dup + end + query_params = uri.query_params + query_params["replication"] = "database" + uri.query_params = query_params + @conn = DB.connect(uri).as(PG::Connection) + @conn.listen_replication( + publication_name: publication_name, + slot_name: slot_name, + blocking: blocking, + ) do |frame| + received frame + end + end + + def received(frame : CopyBoth) + end + + def received(frame : CopyData) + received frame.data + end + + def received(data : XLogData) + handler.received data.message + end + + def received(keepalive : KeepAlive) + if keepalive.response_expected? 
+ # TODO: Actually respond + end + end + + def close + @conn.close + end + end + + abstract struct Frame + def self.from_io(io : IO) : self + case byte = io.read_byte + when nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'W' + CopyBoth.new(io) + when 'd' + CopyData.new(io) + else + raise Error.new("Unexpected byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + end + + struct CopyBoth < Frame + getter format : Format + getter column_formats : Array(Int16) + + def initialize(io : IO) + size = io.read_bytes(Int32, IO::ByteFormat::NetworkEndian) + sized = IO::Sized.new(io, size - 4) + @format = Format.new(sized.read_bytes(Int8, IO::ByteFormat::NetworkEndian)) + column_count = sized.read_bytes(Int16, IO::ByteFormat::NetworkEndian) + @column_formats = Array.new(column_count) do + sized.read_bytes(Int16, IO::ByteFormat::NetworkEndian) + end + sized.close + end + + enum Format : Int8 + Text = 0 + Binary = 1 + end + end + + struct CopyData < Frame + getter data : XLogData | KeepAlive + + def initialize(io : IO) + size = io.read_bytes(Int32, IO::ByteFormat::NetworkEndian) + case byte = io.read_byte + when Nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'w' + @data = XLogData.new(io) + when 'k' + @data = KeepAlive.new(io) + else + raise Error.new("Unexpected CopyData byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + end + + struct XLogData + getter start : Int64 + getter end : Int64 + getter timestamp : Time + getter message : WALMessage + + def initialize(io : IO) + @start = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + @end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + @timestamp = TimeParser.call(io) + @message = WALMessage.new(io) + end + end + + struct KeepAlive + getter end : Int64 + getter timestamp : Time + getter? 
response_expected : Bool + + def initialize(io : IO) + @end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + @timestamp = TimeParser.call(io) + @response_expected = io.read_bytes(Int8, IO::ByteFormat::NetworkEndian) == 1 + end + end + + abstract struct WALMessage + def self.new(io : IO) : self + case byte = io.read_byte + when Nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'B' + Begin.new(io) + when 'C' + Commit.new(io) + when 'R' + Relation.new(io) + when 'I' + Insert.new(io) + when 'U' + Update.new(io) + else + raise Error.new("Unexpected WAL message byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + + protected def read(io : IO, int : Int.class) + io.read_bytes int, IO::ByteFormat::NetworkEndian + end + + protected def read_string(io : IO) : String + io.read_line('\0', chomp: true) + end + + protected def read_bytes(io : IO) : Bytes + bytes = Bytes.new(read(io, Int32)) + io.read_fully bytes + bytes + end + + protected def read_time(io : IO) : Time + TimeParser.call(io) + end + + protected def read_tuple_data(io) : TupleData + column_count = read(io, Int16) + Array.new(column_count) do + case byte = io.read_byte + when Nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'n' + nil + when 'u' + UnchangedTOASTValue.new + when 't' + read_string(io) + when 'b' + read_bytes(io) + else + raise Error.new("Unexpected TupleData byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + end + + alias TupleData = Array(UnchangedTOASTValue | Bytes | String | Nil) + record UnchangedTOASTValue + end + + struct Begin < WALMessage + getter final_lsn : Int64 + getter timestamp : Time + getter transaction_id : Int32 + + def initialize(io : IO) + @final_lsn = read(io, Int64) + @timestamp = TimeParser.call(io) + @transaction_id = read(io, Int32) + end + end + + struct Message < WALMessage + end + + struct Commit < WALMessage + getter flags : Int8 + getter begin_lsn : Int64 + getter end_lsn : 
Int64 + getter timestamp : Time + + def initialize(io : IO) + @flags = read(io, Int8) + @begin_lsn = read(io, Int64) + @end_lsn = read(io, Int64) + @timestamp = read_time(io) + end + end + + struct Origin < WALMessage + end + + struct Relation < WALMessage + getter transaction_id : Int32 + getter oid : Int32 + getter namespace : String + getter name : String + getter replica_identity : Int8 + getter columns : Array(Column) + + def initialize(io : IO) + @transaction_id = read(io, Int32) + @oid = read(io, Int32) + @namespace = read_string(io) + @name = read_string(io) + @replica_identity = read(io, Int8) + column_count = read(io, Int16) + @columns = Array.new(column_count) do + Column.new( + flags: read(io, Int8), + name: read_string(io), + oid: read(io, Int32), + type_modifier: read(io, Int32), + ) + end + end + + record Column, + flags : Int8, + name : String, + oid : Int32, + type_modifier : Int32 + end + + struct Type < WALMessage + end + + struct Insert < WALMessage + # getter transaction_id : Int32 + getter oid : Int32 + getter tuple_data : TupleData + + def initialize(io : IO) + # Only included in WAL protocol v2+ + # @transaction_id = read(io, Int32) + @oid = read(io, Int32) + # This is an 'N' indicating a new tuple + io.read_byte + @tuple_data = read_tuple_data(io) + end + end + + struct Update < WALMessage + getter oid : Int32 + getter key_tuple_data : TupleData? + getter old_tuple_data : TupleData? + getter new_tuple_data : TupleData + + def initialize(io : IO) + @oid = read(io, Int32) + submessage_type = read(io, UInt8) + case submessage_type + when 'K' + @key_tuple_data = read_tuple_data(io) + when 'O' + @old_tuple_data = read_tuple_data(io) + when 'N' + new_tuple_data = read_tuple_data(io) + end + + # If either 'K' or 'O' were specified above, then the next value is our + # new tuple. Otherwise, that was our new tuple, so we just assign it. 
+ if new_tuple_data + @new_tuple_data = new_tuple_data + else + case byte = read(io, UInt8) + when 'N' + @new_tuple_data = read_tuple_data(io) + else + raise Error.new("Expected new TupleData byte marker, got: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + end + end + + struct Delete < WALMessage + end + + struct Truncate < WALMessage + end + + struct StreamStart < WALMessage + end + + struct StreamStop < WALMessage + end + + struct StreamCommit < WALMessage + end + + struct StreamAbort < WALMessage + end + + struct BeginPrepare < WALMessage + end + + struct Prepare < WALMessage + end + + struct CommitPrepared < WALMessage + end + + struct RollbackPrepared < WALMessage + end + + struct StreamPrepare < WALMessage + end + + struct TupleData < WALMessage + end + + private module TimeParser + extend self + + def call(io : IO) : Time + Time.utc(2000, 1, 1) + io.read_bytes(Int64, IO::ByteFormat::NetworkEndian).microseconds + end + end +end diff --git a/src/pq/connection.cr b/src/pq/connection.cr index c226ddd4..8c64f51b 100644 --- a/src/pq/connection.cr +++ b/src/pq/connection.cr @@ -15,6 +15,7 @@ module PQ class Connection getter soc : UNIXSocket | TCPSocket | OpenSSL::SSL::Socket::Client getter server_parameters = Hash(String, String).new + getter conninfo : ConnInfo property notice_handler = Proc(Notice, Void).new { } property notification_handler = Proc(Notification, Void).new { } @mutex = Mutex.new @@ -144,7 +145,7 @@ module PQ soc.skip(count) end - def startup(args) + def startup(args : Array(String)) len = args.reduce(0) { |acc, arg| acc + arg.size + 1 } write_i32 len + 8 + 1 write_i32 0x30000 @@ -188,6 +189,17 @@ module PQ end end + def start_replication_frame_loop(publication_name : String, slot_name : String, &block : PG::Replication::Frame ->) + command = "START_REPLICATION SLOT #{slot_name} LOGICAL 0/0 (proto_version '1', binary 'true', publication_names '#{publication_name}')" + send_query_message command + loop do + break if soc.closed? 
+ block.call PG::Replication::Frame.from_io(soc) + rescue e : IO::Error + raise e unless soc.closed? + end + end + private def read_one_frame(frame_type) size = read_i32 slice = read_bytes(size - 4) @@ -195,38 +207,26 @@ module PQ end private def handle_async_frames(frame) - if frame.is_a?(Frame::ErrorResponse) - handle_error frame - true - elsif frame.is_a?(Frame::NotificationResponse) - handle_notification frame - true - elsif frame.is_a?(Frame::NoticeResponse) - handle_notice frame - true - elsif frame.is_a?(Frame::ParameterStatus) - handle_parameter frame - true - else - false - end + false end - private def handle_error(error_frame : Frame::ErrorResponse) + private def handle_async_frames(error_frame : Frame::ErrorResponse) expect_frame Frame::ReadyForQuery if @established notice_handler.call(error_frame.as_notice) raise PQError.new(error_frame.fields) end - private def handle_notice(frame : Frame::NoticeResponse) - notice_handler.call(frame.as_notice) + private def handle_async_frames(frame : Frame::NotificationResponse) + notification_handler.call(frame.as_notification) + true end - private def handle_notification(frame : Frame::NotificationResponse) - notification_handler.call(frame.as_notification) + private def handle_async_frames(frame : Frame::NoticeResponse) + notice_handler.call(frame.as_notice) + true end - private def handle_parameter(frame : Frame::ParameterStatus) + private def handle_async_frames(frame : Frame::ParameterStatus) @server_parameters[frame.key] = frame.value case frame.key when "client_encoding" @@ -239,18 +239,21 @@ module PQ raise ConnectionError.new( "Only on is supported for integer_datetimes, got: #{frame.value.inspect}") end - else - # ignore end + + true end - def connect + def connect(*, replication : String? 
= nil) startup_args = [ "user", @conninfo.user, "database", @conninfo.database, "application_name", @conninfo.application_name, "client_encoding", "utf8", ] + if replication + startup_args << "replication" << replication + end startup startup_args diff --git a/src/pq/conninfo.cr b/src/pq/conninfo.cr index 6e1b9223..a5d77ce6 100644 --- a/src/pq/conninfo.cr +++ b/src/pq/conninfo.cr @@ -38,10 +38,12 @@ module PQ # The application name. Optional (defaults to "crystal"). getter application_name : String + getter replication : String? + getter auth_methods : Array(String) = %w[scram-sha-256-plus scram-sha-256 md5] # Create a new ConnInfo from all parts - def initialize(host : String? = nil, database : String? = nil, user : String? = nil, password : String? = nil, port : Int | String? = nil, sslmode : String | Symbol? = nil, application_name : String? = nil) + def initialize(host : String? = nil, database : String? = nil, user : String? = nil, password : String? = nil, port : Int | String? = nil, sslmode : String | Symbol? = nil, application_name : String? = nil, @replication = nil) @host = default_host host db = default_database database @database = db.lchop('/') @@ -77,7 +79,7 @@ module PQ def initialize(uri : URI) params = URI::Params.parse(uri.query.to_s) hostname = uri.hostname.presence || params.fetch("host", "") - initialize(hostname, uri.path, uri.user, uri.password, uri.port, :prefer, params.fetch("application_name", nil)) + initialize(hostname, uri.path, uri.user, uri.password, uri.port, :prefer, params.fetch("application_name", nil), params["replication"]?) if q = uri.query HTTP::Params.parse(q) do |key, value| handle_sslparam(key, value) @@ -89,10 +91,10 @@ module PQ # # Valid keys match Postgres "conninfo" keys and are `"host"`, `"dbname"`, # `"user"`, `"password"`, `"port"`, `"sslmode"`, `"sslcert"`, `"sslkey"`, - # `"sslrootcert"` and `"application_name"`. + # `"sslrootcert"`, `"application_name"`, and `"replication"`. 
def initialize(params : Hash) initialize(params["host"]?, params["dbname"]?, params["user"]?, - params["password"]?, params["port"]?, params["sslmode"]?, params["application_name"]?) + params["password"]?, params["port"]?, params["sslmode"]?, params["application_name"]?, params["replication"]?) params.each do |key, value| handle_sslparam(key, value) end From b119fda895c25f75526088a495fbd222b077114a Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Fri, 18 Jul 2025 17:27:50 -0500 Subject: [PATCH 02/26] Remove misguided method overload --- spec/pg/replication_spec.cr | 3 --- src/pg/replication.cr | 3 --- 2 files changed, 6 deletions(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index 533c7819..a45aeea5 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -66,9 +66,6 @@ struct TestMessageHandler def received(msg : PG::Replication::StreamPrepare) end - - def received(msg : PG::Replication::TupleData) - end end describe PG::Replication do diff --git a/src/pg/replication.cr b/src/pg/replication.cr index e8043abd..7358f156 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -340,9 +340,6 @@ module PG::Replication struct StreamPrepare < WALMessage end - struct TupleData < WALMessage - end - private module TimeParser extend self From a13473af22b3cc4f1d0597a0a4f224ffb7bc178b Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Fri, 18 Jul 2025 17:28:14 -0500 Subject: [PATCH 03/26] Drop all replication slots after the spec This way, if the replication slot wasn't created, or if another stuck around from a previous run, we don't run into misleading errors. 
--- spec/pg/replication_spec.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index a45aeea5..6d6a55ce 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -100,7 +100,7 @@ describe PG::Replication do pp handler ensure subscriber.try &.close - PG_DB.exec "SELECT pg_drop_replication_slot($1)", slot_name + PG_DB.exec "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE 'test_slot_%'" PG_DB.exec "DROP PUBLICATION IF EXISTS #{publication_name}" PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" end From 79acb66cb2e73d2207fbd5e88f23c6faf169bff6 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Fri, 18 Jul 2025 17:46:23 -0500 Subject: [PATCH 04/26] Only run replication spec if wal_level=logical --- spec/pg/replication_spec.cr | 76 +++++++++++++++++++------------------ 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index 6d6a55ce..1614f3c8 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -68,40 +68,44 @@ struct TestMessageHandler end end -describe PG::Replication do - it "consumes the WAL" do - publication_name = "test_publication_#{Random::Secure.hex}" - slot_name = "test_slot_#{Random::Secure.hex}" - table_name = "test_table_#{Random::Secure.hex}" - handler = TestMessageHandler.new - PG_DB.exec "CREATE PUBLICATION #{publication_name} FOR ALL TABLES" - PG_DB.exec "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slot_name - subscriber = PG.connect_replication(DB_URL, handler: handler, publication_name: publication_name, slot_name: slot_name) - sleep 100.milliseconds - PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" - PG_DB.exec <<-SQL - CREATE TABLE #{table_name}( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - string TEXT, - number INT8 - ) - SQL - id = PG_DB.query_one <<-SQL, as: UUID - INSERT INTO #{table_name} 
(string, number) - VALUES ('foo', 1) - RETURNING id - SQL - - PG_DB.exec "UPDATE #{table_name} SET string = 'bar' WHERE id = $1", id - PG_DB.exec "UPDATE #{table_name} SET number = 2 WHERE id = $1", id - - sleep 100.milliseconds - - pp handler - ensure - subscriber.try &.close - PG_DB.exec "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE 'test_slot_%'" - PG_DB.exec "DROP PUBLICATION IF EXISTS #{publication_name}" - PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" - end +if PG_DB.query_one("SHOW wal_level", as: String) == "logical" + describe PG::Replication do + it "consumes the WAL" do + publication_name = "test_publication_#{Random::Secure.hex}" + slot_name = "test_slot_#{Random::Secure.hex}" + table_name = "test_table_#{Random::Secure.hex}" + handler = TestMessageHandler.new + PG_DB.exec "CREATE PUBLICATION #{publication_name} FOR ALL TABLES" + PG_DB.exec "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slot_name + subscriber = PG.connect_replication(DB_URL, handler: handler, publication_name: publication_name, slot_name: slot_name) + sleep 100.milliseconds + PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" + PG_DB.exec <<-SQL + CREATE TABLE #{table_name}( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + string TEXT, + number INT8 + ) + SQL + id = PG_DB.query_one <<-SQL, as: UUID + INSERT INTO #{table_name} (string, number) + VALUES ('foo', 1) + RETURNING id + SQL + + PG_DB.exec "UPDATE #{table_name} SET string = 'bar' WHERE id = $1", id + PG_DB.exec "UPDATE #{table_name} SET number = 2 WHERE id = $1", id + + sleep 100.milliseconds + + pp handler + ensure + subscriber.try &.close + PG_DB.exec "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE 'test_slot_%'" + PG_DB.exec "DROP PUBLICATION IF EXISTS #{publication_name}" + PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" + end + end +else + Log.warn { "Skipping #{__FILE__}, set wal_level=logical in postgresql.conf to 
enable" } end From a592c2e48fbb6815a991cc92a3958536e9f522c5 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Sun, 20 Jul 2025 18:33:40 -0500 Subject: [PATCH 05/26] Add Delete and Type events This commit also comments out some logical WAL messages that aren't supported with proto_version 1. --- spec/pg/replication_spec.cr | 144 ++++++++++++++++++++++++++++++------ src/pg/replication.cr | 114 ++++++++++++++++++++++------ src/pq/connection.cr | 2 + 3 files changed, 215 insertions(+), 45 deletions(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index 1614f3c8..0d8aa050 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -1,70 +1,155 @@ require "../spec_helper" -struct TestMessageHandler +class TestMessageHandler include PG::Replication::Handler + getter relations : Hash(Int32, PG::Replication::Relation) = Hash(Int32, PG::Replication::Relation).new + getter transaction : Transaction? + # : Hash(Int32, Hash(Bytes, PG::Replication::TupleData)) + getter data = Hash(Int32, Hash(Bytes, PG::Replication::WALMessage::TupleData)).new { |h, k| + h[k] = {} of Bytes => PG::Replication::WALMessage::TupleData + } + def received(msg : PG::Replication::Begin) - pp begin: msg + if transaction + raise "We are already running a transaction" + end + + @transaction = Transaction.new( + id: msg.transaction_id, + final_lsn: msg.final_lsn, + timestamp: msg.timestamp, + ) end def received(msg : PG::Replication::Message) - pp message: msg end def received(msg : PG::Replication::Commit) - pp commit: msg + # pp committing: transaction + transaction!.events.each do |event| + if relation = relations[event.oid] + # There can be multiple parts of the primary key + case event.type + in .insert?, .update? + if (key = relation.columns.map_with_index { |column, index| index if column.flags.key? }.compact).any? + data[event.oid][Slice.join(key.map { |key_part_index| make_key(event.tuple_data[key_part_index]) })] = event.tuple_data + else + # What do? 
+ end + in .delete? + if (key = relation.columns.map_with_index { |column, index| index if column.flags.key? }.compact).any? + data[event.oid].delete Slice.join(key.map { |key_part_index| make_key(event.tuple_data[key_part_index]) }) + else + # What do? + end + end + end + end + @transaction = nil end def received(msg : PG::Replication::Origin) end def received(msg : PG::Replication::Relation) - pp relation: msg + relations[msg.oid] = msg end def received(msg : PG::Replication::Type) end def received(msg : PG::Replication::Insert) - pp insert: msg + transaction!.insert oid: msg.oid, tuple_data: msg.tuple_data end def received(msg : PG::Replication::Update) - pp update: msg + transaction!.update oid: msg.oid, tuple_data: msg.new_tuple_data end def received(msg : PG::Replication::Delete) - pp delete: msg + transaction!.delete oid: msg.oid, tuple_data: {msg.key_tuple_data, msg.key_tuple_data}.first.not_nil! end def received(msg : PG::Replication::Truncate) end - def received(msg : PG::Replication::StreamStart) - end + # Requires proto_version >= 2 + # def received(msg : PG::Replication::StreamStart) + # end - def received(msg : PG::Replication::StreamStop) - end + # def received(msg : PG::Replication::StreamStop) + # end - def received(msg : PG::Replication::StreamCommit) - end + # def received(msg : PG::Replication::StreamCommit) + # end - def received(msg : PG::Replication::StreamAbort) - end + # def received(msg : PG::Replication::StreamAbort) + # end + + # Requires proto_version >= 3 + # def received(msg : PG::Replication::BeginPrepare) + # end + + # def received(msg : PG::Replication::Prepare) + # end + + # def received(msg : PG::Replication::CommitPrepared) + # end - def received(msg : PG::Replication::BeginPrepare) + # def received(msg : PG::Replication::RollbackPrepared) + # end + + # def received(msg : PG::Replication::StreamPrepare) + # end + + private def transaction! + transaction.not_nil! 
end - def received(msg : PG::Replication::Prepare) + private def make_key(value : String | Bytes) : Bytes + value.to_slice end - def received(msg : PG::Replication::CommitPrepared) + private def make_key(value : PG::Replication::WALMessage::UnchangedTOASTValue) : Bytes + raise ArgumentError.new("Using a TOASTed value in a primary key is unsupported") end - def received(msg : PG::Replication::RollbackPrepared) + private def make_key(value : Nil) : Bytes + Bytes.empty end - def received(msg : PG::Replication::StreamPrepare) + class Transaction + getter id : Int32 + getter final_lsn : Int64 + getter timestamp : Time + getter events : Array(Event) + + def initialize(@id, @final_lsn, @timestamp, @events = [] of Event) + end + + def insert(oid : Int32, tuple_data : PG::Replication::WALMessage::TupleData) + events << Event.new(oid, tuple_data, :insert) + end + + def update(oid : Int32, tuple_data : PG::Replication::WALMessage::TupleData) + events << Event.new(oid, tuple_data, :update) + end + + def delete(oid : Int32, tuple_data : PG::Replication::WALMessage::TupleData) + events << Event.new(oid, tuple_data, :delete) + end + + record Event, + oid : Int32, + tuple_data : PG::Replication::WALMessage::TupleData, + type : Type do + enum Type + Insert + Update + Delete + end + end end end @@ -74,11 +159,14 @@ if PG_DB.query_one("SHOW wal_level", as: String) == "logical" publication_name = "test_publication_#{Random::Secure.hex}" slot_name = "test_slot_#{Random::Secure.hex}" table_name = "test_table_#{Random::Secure.hex}" + type_name = "my_enum_#{Random::Secure.hex}" handler = TestMessageHandler.new PG_DB.exec "CREATE PUBLICATION #{publication_name} FOR ALL TABLES" PG_DB.exec "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slot_name subscriber = PG.connect_replication(DB_URL, handler: handler, publication_name: publication_name, slot_name: slot_name) sleep 100.milliseconds + PG_DB.exec "DROP TYPE IF EXISTS #{type_name}" + PG_DB.exec "CREATE TYPE #{type_name} AS 
ENUM ('foo', 'bar', 'baz')" PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" PG_DB.exec <<-SQL CREATE TABLE #{table_name}( @@ -95,15 +183,25 @@ if PG_DB.query_one("SHOW wal_level", as: String) == "logical" PG_DB.exec "UPDATE #{table_name} SET string = 'bar' WHERE id = $1", id PG_DB.exec "UPDATE #{table_name} SET number = 2 WHERE id = $1", id + PG_DB.exec "DELETE FROM #{table_name} WHERE id = $1", id - sleep 100.milliseconds + begin + PG_DB.transaction do |txn| + txn.connection.exec "INSERT INTO #{table_name} (string, number) VALUES ('bar', 42)" + raise "hell" + end + rescue + # this is expected + end - pp handler + sleep 100.milliseconds ensure subscriber.try &.close PG_DB.exec "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE 'test_slot_%'" PG_DB.exec "DROP PUBLICATION IF EXISTS #{publication_name}" PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" + PG_DB.exec "DROP TYPE IF EXISTS #{type_name}" + pp handler end end else diff --git a/src/pg/replication.cr b/src/pg/replication.cr index 7358f156..7d5f5f58 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -132,6 +132,10 @@ module PG::Replication abstract struct WALMessage def self.new(io : IO) : self + {% if @type != PG::Replication::WALMessage %} + {% raise "Must implement #{@type}#initialize(io : IO)" %} + {% end %} + case byte = io.read_byte when Nil raise IO::EOFError.new("Connection was unexpectedly terminated") @@ -145,6 +149,10 @@ module PG::Replication Insert.new(io) when 'U' Update.new(io) + when 'D' + Delete.new(io) + when 'T' + Type.new(io) else raise Error.new("Unexpected WAL message byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") end @@ -225,7 +233,8 @@ module PG::Replication end struct Relation < WALMessage - getter transaction_id : Int32 + # Requires proto_version >= 2 + # getter transaction_id : Int32 getter oid : Int32 getter namespace : String getter name : String @@ -233,7 +242,7 @@ module PG::Replication getter columns : Array(Column) 
def initialize(io : IO) - @transaction_id = read(io, Int32) + # @transaction_id = read(io, Int32) @oid = read(io, Int32) @namespace = read_string(io) @name = read_string(io) @@ -241,7 +250,7 @@ module PG::Replication column_count = read(io, Int16) @columns = Array.new(column_count) do Column.new( - flags: read(io, Int8), + flags: Flags.new(read(io, Int8)), name: read_string(io), oid: read(io, Int32), type_modifier: read(io, Int32), @@ -250,13 +259,27 @@ module PG::Replication end record Column, - flags : Int8, + flags : Flags, name : String, oid : Int32, type_modifier : Int32 + @[::Flags] + enum Flags + Key = 1 + end end struct Type < WALMessage + # getter transaction_id : Int32 + getter oid : Int32 + getter namespace : String + getter data_type : String + + def initialize(io : IO) + @oid = read(io, Int32) + @namespace = read_string(io) + @data_type = read_string(io) + end end struct Insert < WALMessage @@ -308,37 +331,84 @@ module PG::Replication end struct Delete < WALMessage + # getter transaction_id : Int32 + getter oid : Int32 + getter key_tuple_data : TupleData? + getter old_tuple_data : TupleData? 
+ + def initialize(io : IO) + # @transaction_id = read(io, Int32) # Requires proto_version >= 2 + @oid = read(io, Int32) + case byte = io.read_byte + when nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'K' + @key_tuple_data = read_tuple_data(io) + when 'O' + @old_tuple_data = read_tuple_data(io) + else + raise Error.new("Expected new TupleData byte marker, got: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end end struct Truncate < WALMessage - end + # transaction_id requires proto_version >= 2 + # getter transaction_id : Int32 + getter options : Options + getter relation_oids : Array(Int32) - struct StreamStart < WALMessage - end + def initialize(io : IO) + # @transaction_id = read(io, Int32) # Requires proto_version >= 2 + relation_count = read(io, Int32) + @options = Options.new(read(io, Int8)) + @relation_oids = Array.new(relation_count) do + read(io, Int32) + end + end - struct StreamStop < WALMessage + @[Flags] + enum Options : Int8 + CASCADE = 1 + RESTART_IDENTITY = 2 + end end - struct StreamCommit < WALMessage - end + # # StreamStart requires proto_version >= 2 + # struct StreamStart < WALMessage + # end - struct StreamAbort < WALMessage - end + # # StreamStop requires proto_version >= 2 + # struct StreamStop < WALMessage + # end - struct BeginPrepare < WALMessage - end + # # StreamCommit requires proto_version >= 2 + # struct StreamCommit < WALMessage + # end - struct Prepare < WALMessage - end + # # StreamAbort requires proto_version >= 2 + # struct StreamAbort < WALMessage + # end - struct CommitPrepared < WALMessage - end + # # BeginPrepare requires proto_version >=3 + # struct BeginPrepare < WALMessage + # end - struct RollbackPrepared < WALMessage - end + # # Prepare requires proto_version >=3 + # struct Prepare < WALMessage + # end - struct StreamPrepare < WALMessage - end + # # CommitPrepared requires proto_version >=3 + # struct CommitPrepared < WALMessage + # end + + # # RollbackPrepared requires proto_version 
>=3 + # struct RollbackPrepared < WALMessage + # end + + # # StreamPrepare requires proto_version >=3 + # struct StreamPrepare < WALMessage + # end private module TimeParser extend self diff --git a/src/pq/connection.cr b/src/pq/connection.cr index 8c64f51b..3deb4f23 100644 --- a/src/pq/connection.cr +++ b/src/pq/connection.cr @@ -197,6 +197,8 @@ module PQ block.call PG::Replication::Frame.from_io(soc) rescue e : IO::Error raise e unless soc.closed? + rescue e + Log.error(exception: e) end end From 23f7e8150e4485214df0395f0fbd08829fbe58ac Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Sun, 20 Jul 2025 20:19:07 -0500 Subject: [PATCH 06/26] Use more compatible `Log.error` overload Block-less `Log.error(exception : Exception)` was only introduced in recent versions of Crystal. This shard supports versions earlier than that, so we need to use the more compatible overload of the method. --- src/pq/connection.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pq/connection.cr b/src/pq/connection.cr index 3deb4f23..845cce5e 100644 --- a/src/pq/connection.cr +++ b/src/pq/connection.cr @@ -198,7 +198,7 @@ module PQ rescue e : IO::Error raise e unless soc.closed? 
rescue e - Log.error(exception: e) + Log.error(exception: e) { } end end From 2be23a44405da44149dd95711d9f07b8fb221517 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Sun, 20 Jul 2025 20:20:25 -0500 Subject: [PATCH 07/26] Add floats to the spec --- spec/pg/replication_spec.cr | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index 0d8aa050..8793afd6 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -172,22 +172,28 @@ if PG_DB.query_one("SHOW wal_level", as: String) == "logical" CREATE TABLE #{table_name}( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), string TEXT, - number INT8 + int INT8, + float FLOAT8 ) SQL id = PG_DB.query_one <<-SQL, as: UUID - INSERT INTO #{table_name} (string, number) - VALUES ('foo', 1) + INSERT INTO #{table_name} (string, int, float) + VALUES ('foo', 1, 12.34) + RETURNING id + SQL + PG_DB.exec <<-SQL + INSERT INTO #{table_name} (string, int, float) + VALUES ('bar', 2, 56.78) RETURNING id SQL PG_DB.exec "UPDATE #{table_name} SET string = 'bar' WHERE id = $1", id - PG_DB.exec "UPDATE #{table_name} SET number = 2 WHERE id = $1", id + PG_DB.exec "UPDATE #{table_name} SET int = 2 WHERE id = $1", id PG_DB.exec "DELETE FROM #{table_name} WHERE id = $1", id begin PG_DB.transaction do |txn| - txn.connection.exec "INSERT INTO #{table_name} (string, number) VALUES ('bar', 42)" + txn.connection.exec "INSERT INTO #{table_name} (string, int) VALUES ('bar', 42)" raise "hell" end rescue From 29d6040d92fe1909b0819f4cc1d825a3eb9b5721 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Sun, 20 Jul 2025 20:28:00 -0500 Subject: [PATCH 08/26] Add TODOs for later `proto_version`s --- spec/pg/replication_spec.cr | 2 ++ src/pg/replication.cr | 2 ++ 2 files changed, 4 insertions(+) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index 8793afd6..c699e7ef 100644 --- a/spec/pg/replication_spec.cr +++ 
b/spec/pg/replication_spec.cr @@ -74,6 +74,8 @@ class TestMessageHandler def received(msg : PG::Replication::Truncate) end + # TODO: Uncomment the methods below in order to test higher `proto_version`s + # Requires proto_version >= 2 # def received(msg : PG::Replication::StreamStart) # end diff --git a/src/pg/replication.cr b/src/pg/replication.cr index 7d5f5f58..d9dfa924 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -374,6 +374,8 @@ module PG::Replication end end + # TODO: Uncomment the methods below in order to test higher `proto_version`s + # # StreamStart requires proto_version >= 2 # struct StreamStart < WALMessage # end From 6c7ca4c465bc3bf9936b35d5883f4fe427048e73 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 23 Jul 2025 18:16:42 -0500 Subject: [PATCH 09/26] Fix type subscriptions --- spec/pg/replication_spec.cr | 62 ++++++++++++++++++++++++++++++++++++- src/pg/replication.cr | 2 +- 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index c699e7ef..4b820e76 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -4,6 +4,7 @@ class TestMessageHandler include PG::Replication::Handler getter relations : Hash(Int32, PG::Replication::Relation) = Hash(Int32, PG::Replication::Relation).new + getter types = Hash(Int32, PG::Replication::Type).new getter transaction : Transaction? 
# : Hash(Int32, Hash(Bytes, PG::Replication::TupleData)) getter data = Hash(Int32, Hash(Bytes, PG::Replication::WALMessage::TupleData)).new { |h, k| @@ -56,7 +57,8 @@ class TestMessageHandler relations[msg.oid] = msg end - def received(msg : PG::Replication::Type) + def received(type : PG::Replication::Type) + types[type.oid] = type end def received(msg : PG::Replication::Insert) @@ -211,7 +213,65 @@ if PG_DB.query_one("SHOW wal_level", as: String) == "logical" PG_DB.exec "DROP TYPE IF EXISTS #{type_name}" pp handler end + + it_consumes_wal "new types" do |handler, context| + PG_DB.exec "CREATE TYPE #{context.type_name} AS ENUM ('one', 'two', 'three')" + # The type isn't sent until there's a table that uses it + PG_DB.exec "CREATE TABLE #{context.table_name} (id UUID PRIMARY KEY DEFAULT gen_random_uuid(), thing #{context.type_name})" + # ... and the table isn't sent until there's a record in it + PG_DB.exec "INSERT INTO #{context.table_name} (thing) VALUES ('three')" + + # Wait for the insert to propagate + wait_for { handler.types.any? 
} + + handler.types.first.last.data_type.should eq context.type_name + end end else Log.warn { "Skipping #{__FILE__}, set wal_level=logical in postgresql.conf to enable" } end + +private def it_consumes_wal(name : String, **options, &block : TestMessageHandler, Context ->) + it "consumes #{name} from the WAL", **options do + context = Context.new + handler = TestMessageHandler.new(PG_DB) + PG_DB.exec "CREATE PUBLICATION #{context.publication_name} FOR ALL TABLES" + PG_DB.exec "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", context.slot_name + subscriber = PG.connect_replication DB_URL, + handler: handler, + publication_name: context.publication_name, + slot_name: context.slot_name + + begin + block.call handler, context + ensure + subscriber.try &.close + PG_DB.exec "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE 'test_slot_%'" + PG_DB.query_each "SELECT pubname::text FROM pg_publication_tables WHERE schemaname = 'public' and pubname::text LIKE 'test_publication_%' GROUP BY 1" do |rs| + PG_DB.exec "DROP PUBLICATION IF EXISTS #{rs.read(String)}" + end + PG_DB.query_each "SELECT tablename::text FROM pg_tables WHERE schemaname = 'public' and tablename LIKE 'test_table_%'" do |rs| + PG_DB.exec "DROP TABLE IF EXISTS #{rs.read(String)}" + end + PG_DB.query_each "SELECT typname::text FROM pg_type WHERE typname LIKE 'test_type_%'" do |rs| + PG_DB.exec "DROP TYPE IF EXISTS #{rs.read(String)}" + end + end + end +end + +private record Context, + table_name : String = "test_table_#{Random::Secure.hex}", + publication_name : String = "test_publication_#{Random::Secure.hex}", + slot_name : String = "test_slot_#{Random::Secure.hex}", + type_name : String = "test_type_#{Random::Secure.hex}" + +private def wait_for(condition = "the block to return truthy", timeout : Time::Span = 2.seconds, &) + start = Time.monotonic + until yield + if Time.monotonic - start > 2.seconds + raise "Timed out waiting for #{condition}" + end + 
sleep 1.millisecond + end +end diff --git a/src/pg/replication.cr b/src/pg/replication.cr index d9dfa924..446d2c10 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -151,7 +151,7 @@ module PG::Replication Update.new(io) when 'D' Delete.new(io) - when 'T' + when 'Y' Type.new(io) else raise Error.new("Unexpected WAL message byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") From b1c8209c8de4e4f68bf3c49b8e23310e20812cdb Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 23 Jul 2025 18:18:34 -0500 Subject: [PATCH 10/26] Add specific specs for Insert, Update, and Delete --- spec/pg/replication_spec.cr | 87 ++++++++++++++++++++++++++++++++----- 1 file changed, 76 insertions(+), 11 deletions(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index 4b820e76..9033848d 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -6,7 +6,6 @@ class TestMessageHandler getter relations : Hash(Int32, PG::Replication::Relation) = Hash(Int32, PG::Replication::Relation).new getter types = Hash(Int32, PG::Replication::Type).new getter transaction : Transaction? 
- # : Hash(Int32, Hash(Bytes, PG::Replication::TupleData)) getter data = Hash(Int32, Hash(Bytes, PG::Replication::WALMessage::TupleData)).new { |h, k| h[k] = {} of Bytes => PG::Replication::WALMessage::TupleData } @@ -27,7 +26,6 @@ class TestMessageHandler end def received(msg : PG::Replication::Commit) - # pp committing: transaction transaction!.events.each do |event| if relation = relations[event.oid] # There can be multiple parts of the primary key @@ -66,11 +64,13 @@ class TestMessageHandler end def received(msg : PG::Replication::Update) - transaction!.update oid: msg.oid, tuple_data: msg.new_tuple_data + transaction!.update oid: msg.oid, tuple_data: msg.new_tuple_data, + old_tuple_data: msg.old_tuple_data, + key_tuple_data: msg.key_tuple_data end def received(msg : PG::Replication::Delete) - transaction!.delete oid: msg.oid, tuple_data: {msg.key_tuple_data, msg.key_tuple_data}.first.not_nil! + transaction!.delete oid: msg.oid, tuple_data: {msg.key_tuple_data, msg.old_tuple_data}.first.not_nil! end def received(msg : PG::Replication::Truncate) @@ -129,6 +129,8 @@ class TestMessageHandler getter timestamp : Time getter events : Array(Event) + alias TupleData = PG::Replication::WALMessage::TupleData + def initialize(@id, @final_lsn, @timestamp, @events = [] of Event) end @@ -136,8 +138,8 @@ class TestMessageHandler events << Event.new(oid, tuple_data, :insert) end - def update(oid : Int32, tuple_data : PG::Replication::WALMessage::TupleData) - events << Event.new(oid, tuple_data, :update) + def update(oid : Int32, tuple_data : TupleData, old_tuple_data : TupleData?, key_tuple_data : TupleData?) 
+ events << Event.new(oid, tuple_data, :update, old_tuple_data: old_tuple_data, key_tuple_data: key_tuple_data) end def delete(oid : Int32, tuple_data : PG::Replication::WALMessage::TupleData) @@ -146,8 +148,10 @@ class TestMessageHandler record Event, oid : Int32, - tuple_data : PG::Replication::WALMessage::TupleData, - type : Type do + tuple_data : TupleData, + type : Type, + old_tuple_data : TupleData? = nil, + key_tuple_data : TupleData? = nil do enum Type Insert Update @@ -168,7 +172,7 @@ if PG_DB.query_one("SHOW wal_level", as: String) == "logical" PG_DB.exec "CREATE PUBLICATION #{publication_name} FOR ALL TABLES" PG_DB.exec "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slot_name subscriber = PG.connect_replication(DB_URL, handler: handler, publication_name: publication_name, slot_name: slot_name) - sleep 100.milliseconds + sleep 10.milliseconds PG_DB.exec "DROP TYPE IF EXISTS #{type_name}" PG_DB.exec "CREATE TYPE #{type_name} AS ENUM ('foo', 'bar', 'baz')" PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" @@ -197,6 +201,7 @@ if PG_DB.query_one("SHOW wal_level", as: String) == "logical" begin PG_DB.transaction do |txn| + # This won't be reported to our replication handler txn.connection.exec "INSERT INTO #{table_name} (string, int) VALUES ('bar', 42)" raise "hell" end @@ -204,14 +209,74 @@ if PG_DB.query_one("SHOW wal_level", as: String) == "logical" # this is expected end - sleep 100.milliseconds + sleep 10.milliseconds ensure subscriber.try &.close PG_DB.exec "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE 'test_slot_%'" PG_DB.exec "DROP PUBLICATION IF EXISTS #{publication_name}" PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" PG_DB.exec "DROP TYPE IF EXISTS #{type_name}" - pp handler + end + + it_consumes_wal "new relations" do |handler, context| + PG_DB.exec "CREATE TABLE #{context.table_name} (id UUID PRIMARY KEY, string TEXT)" + # Apparently the Relation message isn't sent until after data is 
inserted + # into the table. Without this insert, the `wait_for` call times out. + PG_DB.exec "INSERT INTO #{context.table_name} (id, string) VALUES ($1, $2)", UUID.v7, "yep" + + wait_for { handler.relations.any? } + oid, relation = handler.relations.first + + relation.namespace.should eq "public" + relation.name.should eq context.table_name + # The `id` column must be indicated as part of the relation's primary key + relation.columns + .find! { |column| column.name == "id" } + .flags.key?.should eq true + end + + it_consumes_wal "inserts" do |handler, context| + id = UUID.v7 + string = "asdf" + PG_DB.exec "CREATE TABLE #{context.table_name} (id UUID PRIMARY KEY, string TEXT)" + PG_DB.exec "INSERT INTO #{context.table_name} (id, string) VALUES ($1, $2)", id, string + + wait_for { handler.data.any?(&.last.any?) } + + _, records = handler.data.first + pk, tuple = records.first + pk.should eq id.bytes.to_slice + tuple[1].should eq string.to_slice + end + + it_consumes_wal "updates" do |handler, context| + id = UUID.v7 + string = "omg" + PG_DB.exec "CREATE TABLE #{context.table_name} (id UUID PRIMARY KEY, string TEXT)" + PG_DB.exec "INSERT INTO #{context.table_name} (id, string) VALUES ($1, $2)", id, string + PG_DB.exec "UPDATE #{context.table_name} SET string = 'lol' WHERE id = $1", id + + # Wait for at least the insert to propagate + wait_for { handler.data.any?(&.last.any?) 
} + # Give the update just a little longer to come in + wait_for "record to be updated" do + _, records = handler.data.first + _, tuple = records.first + tuple[1] == "lol".to_slice + end + end + + it_consumes_wal "deletes" do |handler, context| + id = UUID.v7 + string = "omg" + PG_DB.exec "CREATE TABLE #{context.table_name} (id UUID PRIMARY KEY, string TEXT)" + PG_DB.exec "INSERT INTO #{context.table_name} (id, string) VALUES ($1, $2)", id, string + PG_DB.exec "DELETE FROM #{context.table_name} WHERE id = $1", id + + # Wait for at least the insert to propagate + wait_for { handler.data.any?(&.last.any?) } + # Give the delete just a little longer to come in + wait_for "data to be deleted" { handler.data.first.last.none? } end it_consumes_wal "new types" do |handler, context| From 89ffa31741695493f48c79246b2beb71f5be74a1 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 23 Jul 2025 18:20:18 -0500 Subject: [PATCH 11/26] Handle keep-alives --- src/pg/replication.cr | 118 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 106 insertions(+), 12 deletions(-) diff --git a/src/pg/replication.cr b/src/pg/replication.cr index 446d2c10..907201f9 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -8,6 +8,7 @@ module PG::Replication getter handler : Handler getter publication_name : String getter slot_name : String + @latest_wal = 0i64 def initialize(uri : URI | String, @handler, *, @publication_name, @slot_name, blocking : Bool = false) if uri.is_a? String @@ -26,6 +27,13 @@ module PG::Replication ) do |frame| received frame end + + spawn do + loop do + sleep 10.seconds + send_keepalive + end + end end def received(frame : CopyBoth) @@ -40,14 +48,43 @@ module PG::Replication end def received(keepalive : KeepAlive) - if keepalive.response_expected? - # TODO: Actually respond - end + @latest_wal = keepalive.wal_end + send_keepalive if keepalive.response_expected? 
+ end + + # This shouldn't ever be received, but it can be represented in memory so we + # need to include it for completeness. + def received(response : KeepAliveResponse) + raise NotImplementedError.new("KeepAliveResponses are intended to be sent, not received") end def close + send_keepalive @conn.close end + + def send_keepalive + write { send_keepalive! } + end + + private def send_keepalive! : Nil + CopyData.new( + StandbyStatusUpdate.new( + last_wal_byte_received: @latest_wal, + last_wal_byte_flushed: @latest_wal, + last_wal_byte_applied: @latest_wal, + ) + ) + .to_io @conn.connection.soc + + @conn.connection.soc.flush + end + + private def write(&) + write_mutex.synchronize { yield } + end + + getter write_mutex = Mutex.new end abstract struct Frame @@ -87,7 +124,7 @@ module PG::Replication end struct CopyData < Frame - getter data : XLogData | KeepAlive + getter data : XLogData | KeepAlive | KeepAliveResponse def initialize(io : IO) size = io.read_bytes(Int32, IO::ByteFormat::NetworkEndian) @@ -102,32 +139,85 @@ module PG::Replication raise Error.new("Unexpected CopyData byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") end end + + def initialize(@data) + end + + def to_io(io : IO) : Nil + buffer = IO::Memory.new + io << 'd' + payload = IO::Memory.new.tap { |buf| data.to_io buf }.to_slice + io.write_bytes payload.bytesize + 4, IO::ByteFormat::NetworkEndian + io.write payload + end end struct XLogData - getter start : Int64 - getter end : Int64 + getter wal_start : Int64 + getter wal_end : Int64 getter timestamp : Time getter message : WALMessage def initialize(io : IO) - @start = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) - @end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + @wal_start = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + @wal_end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) @timestamp = TimeParser.call(io) @message = WALMessage.new(io) end + + def to_io(io : IO) : Nil + raise 
NotImplementedError.new("XLogData messages are meant to be sent by the server, not received by the client") + end end struct KeepAlive - getter end : Int64 + getter wal_end : Int64 getter timestamp : Time getter? response_expected : Bool def initialize(io : IO) - @end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + @wal_end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) @timestamp = TimeParser.call(io) @response_expected = io.read_bytes(Int8, IO::ByteFormat::NetworkEndian) == 1 end + + def to_io(io : IO) : Nil + raise NotImplementedError.new("KeepAlives are intended to be received from the server, not sent to it") + end + end + + abstract struct KeepAliveResponse + end + + struct StandbyStatusUpdate < KeepAliveResponse + getter last_wal_byte_received : Int64 + getter last_wal_byte_flushed : Int64 + getter last_wal_byte_applied : Int64 + getter timestamp : Time + getter? response_expected : Bool + + def initialize( + @last_wal_byte_received, + @last_wal_byte_flushed, + @last_wal_byte_applied, + *, + @timestamp = Time.utc, + @response_expected = false, + ) + end + + def to_io(io : IO) : Nil + io << 'r' + write io, last_wal_byte_received + write io, last_wal_byte_flushed + write io, last_wal_byte_applied + write io, (@timestamp - Time.utc(2000, 1, 1)).total_microseconds.to_i64 + write io, response_expected? ? 
1u8 : 0u8 + end + + private def write(io, value) + io.write_bytes value, IO::ByteFormat::NetworkEndian + end end abstract struct WALMessage @@ -374,7 +464,7 @@ module PG::Replication end end - # TODO: Uncomment the methods below in order to test higher `proto_version`s + # TODO: Uncomment the methods below in order to support higher `proto_version`s # # StreamStart requires proto_version >= 2 # struct StreamStart < WALMessage @@ -416,7 +506,11 @@ module PG::Replication extend self def call(io : IO) : Time - Time.utc(2000, 1, 1) + io.read_bytes(Int64, IO::ByteFormat::NetworkEndian).microseconds + call io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + end + + def call(microseconds : Int64) + Time.utc(2000, 1, 1) + microseconds.microseconds end end end From b79af9c2f05d6f65f7577a432296c9ea352e5f41 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 23 Jul 2025 18:23:42 -0500 Subject: [PATCH 12/26] Fix spec This was leftover from an experiment I thought I'd removed --- spec/pg/replication_spec.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index 9033848d..66cf926e 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -299,7 +299,7 @@ end private def it_consumes_wal(name : String, **options, &block : TestMessageHandler, Context ->) it "consumes #{name} from the WAL", **options do context = Context.new - handler = TestMessageHandler.new(PG_DB) + handler = TestMessageHandler.new PG_DB.exec "CREATE PUBLICATION #{context.publication_name} FOR ALL TABLES" PG_DB.exec "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", context.slot_name subscriber = PG.connect_replication DB_URL, From b6cb95359c4ba23f245445f8c256c3c0a332d834 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 23 Jul 2025 18:24:10 -0500 Subject: [PATCH 13/26] Remove superfluous spec This is now broken down into multiple specs that test individual parts of this functionality. 
--- spec/pg/replication_spec.cr | 55 ------------------------------------- 1 file changed, 55 deletions(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index 66cf926e..9b983e29 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -163,61 +163,6 @@ end if PG_DB.query_one("SHOW wal_level", as: String) == "logical" describe PG::Replication do - it "consumes the WAL" do - publication_name = "test_publication_#{Random::Secure.hex}" - slot_name = "test_slot_#{Random::Secure.hex}" - table_name = "test_table_#{Random::Secure.hex}" - type_name = "my_enum_#{Random::Secure.hex}" - handler = TestMessageHandler.new - PG_DB.exec "CREATE PUBLICATION #{publication_name} FOR ALL TABLES" - PG_DB.exec "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slot_name - subscriber = PG.connect_replication(DB_URL, handler: handler, publication_name: publication_name, slot_name: slot_name) - sleep 10.milliseconds - PG_DB.exec "DROP TYPE IF EXISTS #{type_name}" - PG_DB.exec "CREATE TYPE #{type_name} AS ENUM ('foo', 'bar', 'baz')" - PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" - PG_DB.exec <<-SQL - CREATE TABLE #{table_name}( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - string TEXT, - int INT8, - float FLOAT8 - ) - SQL - id = PG_DB.query_one <<-SQL, as: UUID - INSERT INTO #{table_name} (string, int, float) - VALUES ('foo', 1, 12.34) - RETURNING id - SQL - PG_DB.exec <<-SQL - INSERT INTO #{table_name} (string, int, float) - VALUES ('bar', 2, 56.78) - RETURNING id - SQL - - PG_DB.exec "UPDATE #{table_name} SET string = 'bar' WHERE id = $1", id - PG_DB.exec "UPDATE #{table_name} SET int = 2 WHERE id = $1", id - PG_DB.exec "DELETE FROM #{table_name} WHERE id = $1", id - - begin - PG_DB.transaction do |txn| - # This won't be reported to our replication handler - txn.connection.exec "INSERT INTO #{table_name} (string, int) VALUES ('bar', 42)" - raise "hell" - end - rescue - # this is expected - end - - sleep 10.milliseconds - 
ensure - subscriber.try &.close - PG_DB.exec "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE 'test_slot_%'" - PG_DB.exec "DROP PUBLICATION IF EXISTS #{publication_name}" - PG_DB.exec "DROP TABLE IF EXISTS #{table_name}" - PG_DB.exec "DROP TYPE IF EXISTS #{type_name}" - end - it_consumes_wal "new relations" do |handler, context| PG_DB.exec "CREATE TABLE #{context.table_name} (id UUID PRIMARY KEY, string TEXT)" # Apparently the Relation message isn't sent until after data is inserted From a1afa15f4976b13dc6dcc00bf7c4625b2145b842 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 23 Jul 2025 23:30:32 -0500 Subject: [PATCH 14/26] Add test for schema changes --- spec/pg/replication_spec.cr | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index 9b983e29..dd59b536 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -236,6 +236,20 @@ if PG_DB.query_one("SHOW wal_level", as: String) == "logical" handler.types.first.last.data_type.should eq context.type_name end + + it_consumes_wal "schema changes" do |handler, context| + PG_DB.exec "CREATE TABLE #{context.table_name} (id UUID PRIMARY KEY DEFAULT gen_random_uuid())" + PG_DB.exec "INSERT INTO #{context.table_name} (id) VALUES ($1)", UUID.v7 + wait_for { handler.relations.any? 
} + # Make sure we get that the table has 1 column before proceeding + wait_for { handler.relations.first.last.columns.size == 1 } + + PG_DB.exec "ALTER TABLE #{context.table_name} ADD COLUMN string TEXT" + PG_DB.exec "INSERT INTO #{context.table_name} (id, string) VALUES ($1, $2)", UUID.v7, "my string" + + # Now the table has 2 columns + wait_for { handler.relations.first.last.columns.size == 2 } + end end else Log.warn { "Skipping #{__FILE__}, set wal_level=logical in postgresql.conf to enable" } From cae35b62862fafa9a6de0cff5974fa5c6386747a Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Thu, 24 Jul 2025 19:48:29 -0500 Subject: [PATCH 15/26] Handle Truncation, Message, and Origin WALMessages I don't know how to test Message and Origin yet, but they need to be there to at least consume the bytes off the wire. --- spec/pg/replication_spec.cr | 12 ++++++++++++ src/pg/replication.cr | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index dd59b536..f0bb1890 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -5,6 +5,7 @@ class TestMessageHandler getter relations : Hash(Int32, PG::Replication::Relation) = Hash(Int32, PG::Replication::Relation).new getter types = Hash(Int32, PG::Replication::Type).new + getter truncations = [] of PG::Replication::Truncate getter transaction : Transaction? 
getter data = Hash(Int32, Hash(Bytes, PG::Replication::WALMessage::TupleData)).new { |h, k| h[k] = {} of Bytes => PG::Replication::WALMessage::TupleData @@ -74,6 +75,7 @@ class TestMessageHandler end def received(msg : PG::Replication::Truncate) + truncations << msg end # TODO: Uncomment the methods below in order to test higher `proto_version`s @@ -250,6 +252,16 @@ if PG_DB.query_one("SHOW wal_level", as: String) == "logical" # Now the table has 2 columns wait_for { handler.relations.first.last.columns.size == 2 } end + + it_consumes_wal "truncations" do |handler, context| + PG_DB.exec "CREATE TABLE #{context.table_name} (id UUID PRIMARY KEY DEFAULT gen_random_uuid())" + PG_DB.exec "INSERT INTO #{context.table_name} (id) VALUES ($1)", UUID.v7 + wait_for { handler.relations.any? } + + PG_DB.exec "TRUNCATE #{context.table_name}" + + wait_for { handler.truncations.any? } + end end else Log.warn { "Skipping #{__FILE__}, set wal_level=logical in postgresql.conf to enable" } diff --git a/src/pg/replication.cr b/src/pg/replication.cr index 907201f9..91ce3163 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -241,8 +241,14 @@ module PG::Replication Update.new(io) when 'D' Delete.new(io) + when 'T' + Truncate.new(io) when 'Y' Type.new(io) + when 'M' + Message.new(io) + when 'O' + Origin.new(io) else raise Error.new("Unexpected WAL message byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") end @@ -303,6 +309,25 @@ module PG::Replication end struct Message < WALMessage + # Requires proto_version >= 2 + # getter transaction_id : Int32 + getter flags : Flags + getter lsn : Int64 + getter prefix : String + getter content : Bytes + + def initialize(io : IO) + # @transaction_id = read(io, Int32) # Requires proto_version >= 2 + @flags = Flags.new(read(io, Int8)) + @lsn = read(io, Int64) + @prefix = read_string(io) + @content = read_bytes(io) + end + + @[::Flags] + enum Flags : Int8 + Transactional + end end struct Commit < WALMessage @@ -320,6 +345,13 @@ 
module PG::Replication end struct Origin < WALMessage + getter lsn : Int64 + getter name : String + + def initialize(io : IO) + @lsn = read(io, Int64) + @name = read_string(io) + end end struct Relation < WALMessage @@ -459,12 +491,13 @@ module PG::Replication @[Flags] enum Options : Int8 + NONE = 0 CASCADE = 1 RESTART_IDENTITY = 2 end end - # TODO: Uncomment the methods below in order to support higher `proto_version`s + # TODO: Uncomment the types below in order to support higher `proto_version`s # # StreamStart requires proto_version >= 2 # struct StreamStart < WALMessage From 3813a07c34289846fa4ae4d68b0ffed8ef40b2ba Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Thu, 24 Jul 2025 20:54:55 -0500 Subject: [PATCH 16/26] Make the handler update last bytes flushed/applied --- spec/pg/replication_spec.cr | 6 +++ src/pg/replication.cr | 77 +++++++++++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 12 deletions(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index f0bb1890..b16b82b8 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -11,6 +11,12 @@ class TestMessageHandler h[k] = {} of Bytes => PG::Replication::WALMessage::TupleData } + def received(msg : PG::Replication::XLogData, connection : PG::Replication::Connection, &) + yield + connection.last_wal_byte_flushed = msg.wal_end + connection.last_wal_byte_applied = msg.wal_end + end + def received(msg : PG::Replication::Begin) if transaction raise "We are already running a transaction" diff --git a/src/pg/replication.cr b/src/pg/replication.cr index 91ce3163..bdf6d007 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -1,5 +1,31 @@ module PG::Replication module Handler + # This method must be defined in order to tell the `Connection` how much of + # the WAL has been flushed and applied. 
+ # + # ``` + # connection = PG.listen_replication db_url, + # handler: MyHandler.new, + # publication_name: "my_publication", + # slot_name: "my_replication_slot" + # + # class MyHandler + # include PG::Replication::Handler + # + # @last_wal_byte_flushed = 0i64 + # @last_wal_byte_applied = 0i64 + # + # def received(msg : PG::Replication::XLogData, connection : PG::Replication::Connection, &) + # yield + # connection.last_wal_byte_flushed = msg.wal_end + # if msg.data.is_a? PG::Replication::Commit + # connection.last_wal_byte_applied = msg.wal_end + # end + # end + # end + # ``` + abstract def received(msg : PG::Replication::XLogData, connection : PG::Replication::Connection) + def received(frame) end end @@ -8,8 +34,12 @@ module PG::Replication getter handler : Handler getter publication_name : String getter slot_name : String - @latest_wal = 0i64 + getter last_wal_byte_received = 0i64 + property last_wal_byte_flushed = 0i64 + property last_wal_byte_applied = 0i64 + getter? closed = false + # :nodoc: def initialize(uri : URI | String, @handler, *, @publication_name, @slot_name, blocking : Bool = false) if uri.is_a? String uri = URI.parse(uri) @@ -29,26 +59,37 @@ module PG::Replication end spawn do - loop do + while closed? sleep 10.seconds - send_keepalive + begin + send_keepalive + rescue ex : IO::Error + break if closed? + raise ex + end end end end + # :nodoc: def received(frame : CopyBoth) end + # :nodoc: def received(frame : CopyData) received frame.data end + # Handle the `XLogData` message that wraps `WALMessage`s def received(data : XLogData) - handler.received data.message + @last_wal_byte_received = data.wal_end + handler.received data, self do + handler.received data.message + end end + # :nodoc: def received(keepalive : KeepAlive) - @latest_wal = keepalive.wal_end send_keepalive if keepalive.response_expected? end @@ -59,8 +100,13 @@ module PG::Replication end def close + return if closed? 
+ # We attempt to send off one last keepalive to let the server know where + # we left off. send_keepalive @conn.close + ensure + @closed = true end def send_keepalive @@ -70,21 +116,28 @@ module PG::Replication private def send_keepalive! : Nil CopyData.new( StandbyStatusUpdate.new( - last_wal_byte_received: @latest_wal, - last_wal_byte_flushed: @latest_wal, - last_wal_byte_applied: @latest_wal, + last_wal_byte_received: last_wal_byte_received, + last_wal_byte_flushed: last_wal_byte_flushed, + last_wal_byte_applied: last_wal_byte_applied, ) - ) - .to_io @conn.connection.soc + ).to_io socket - @conn.connection.soc.flush + flush end private def write(&) write_mutex.synchronize { yield } end - getter write_mutex = Mutex.new + private getter write_mutex = Mutex.new + + private def flush + socket.flush + end + + private def socket + @conn.connection.soc + end end abstract struct Frame From ef0eee16249b54ee436bcc84cdd818016a7458a1 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Thu, 24 Jul 2025 20:56:37 -0500 Subject: [PATCH 17/26] Clarify comment --- src/pg/replication.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pg/replication.cr b/src/pg/replication.cr index bdf6d007..df25a690 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -550,7 +550,7 @@ module PG::Replication end end - # TODO: Uncomment the types below in order to support higher `proto_version`s + # TODO: Implement the types below in order to support higher `proto_version`s # # StreamStart requires proto_version >= 2 # struct StreamStart < WALMessage From d85e11b4df51cf4810ed01000150dd8f2191e040 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Thu, 24 Jul 2025 20:58:26 -0500 Subject: [PATCH 18/26] Fix abstract method signature The method yields, so the abstract method needs to have the `&` there. 
--- src/pg/replication.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pg/replication.cr b/src/pg/replication.cr index df25a690..d072391d 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -24,7 +24,7 @@ module PG::Replication # end # end # ``` - abstract def received(msg : PG::Replication::XLogData, connection : PG::Replication::Connection) + abstract def received(msg : PG::Replication::XLogData, connection : PG::Replication::Connection, &) def received(frame) end From 39a8ac8868f115fbb0bea34273b54ef79e41f3be Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Thu, 24 Jul 2025 22:09:09 -0500 Subject: [PATCH 19/26] Rename variable This was misnamed before. An XLogData *has* a message, but it is not itself a message. --- spec/pg/replication_spec.cr | 6 +++--- src/pg/replication.cr | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index b16b82b8..d44991ce 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -11,10 +11,10 @@ class TestMessageHandler h[k] = {} of Bytes => PG::Replication::WALMessage::TupleData } - def received(msg : PG::Replication::XLogData, connection : PG::Replication::Connection, &) + def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection, &) yield - connection.last_wal_byte_flushed = msg.wal_end - connection.last_wal_byte_applied = msg.wal_end + connection.last_wal_byte_flushed = data.wal_end + connection.last_wal_byte_applied = data.wal_end end def received(msg : PG::Replication::Begin) diff --git a/src/pg/replication.cr b/src/pg/replication.cr index d072391d..9383808f 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -24,7 +24,7 @@ module PG::Replication # end # end # ``` - abstract def received(msg : PG::Replication::XLogData, connection : PG::Replication::Connection, &) + abstract def received(data : PG::Replication::XLogData, connection : 
PG::Replication::Connection, &) def received(frame) end From c97e1f81033dd0879356c991c502a3213707ff20 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Tue, 19 Aug 2025 17:07:55 -0500 Subject: [PATCH 20/26] Extract replication messages to their own files --- spec/pg/replication_spec.cr | 6 +- src/pg/record.cr | 43 ++ src/pg/replication.cr | 430 +------------------- src/pg/replication/begin.cr | 16 + src/pg/replication/commit.cr | 17 + src/pg/replication/copy_both.cr | 24 ++ src/pg/replication/copy_data.cr | 34 ++ src/pg/replication/delete.cr | 25 ++ src/pg/replication/error_frame.cr | 36 ++ src/pg/replication/frame.cr | 26 ++ src/pg/replication/insert.cr | 18 + src/pg/replication/keep_alive.cr | 19 + src/pg/replication/keep_alive_response.cr | 4 + src/pg/replication/message.cr | 25 ++ src/pg/replication/origin.cr | 13 + src/pg/replication/read.cr | 23 ++ src/pg/replication/relation.cr | 40 ++ src/pg/replication/standby_status_update.cr | 34 ++ src/pg/replication/time_parser.cr | 13 + src/pg/replication/truncate.cr | 26 ++ src/pg/replication/type.cr | 16 + src/pg/replication/update.cr | 36 ++ src/pg/replication/wal_message.cr | 74 ++++ src/pg/replication/x_log_data.cr | 22 + 24 files changed, 595 insertions(+), 425 deletions(-) create mode 100644 src/pg/record.cr create mode 100644 src/pg/replication/begin.cr create mode 100644 src/pg/replication/commit.cr create mode 100644 src/pg/replication/copy_both.cr create mode 100644 src/pg/replication/copy_data.cr create mode 100644 src/pg/replication/delete.cr create mode 100644 src/pg/replication/error_frame.cr create mode 100644 src/pg/replication/frame.cr create mode 100644 src/pg/replication/insert.cr create mode 100644 src/pg/replication/keep_alive.cr create mode 100644 src/pg/replication/keep_alive_response.cr create mode 100644 src/pg/replication/message.cr create mode 100644 src/pg/replication/origin.cr create mode 100644 src/pg/replication/read.cr create mode 100644 src/pg/replication/relation.cr create mode 
100644 src/pg/replication/standby_status_update.cr create mode 100644 src/pg/replication/time_parser.cr create mode 100644 src/pg/replication/truncate.cr create mode 100644 src/pg/replication/type.cr create mode 100644 src/pg/replication/update.cr create mode 100644 src/pg/replication/wal_message.cr create mode 100644 src/pg/replication/x_log_data.cr diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index d44991ce..ed3ea930 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -289,8 +289,8 @@ private def it_consumes_wal(name : String, **options, &block : TestMessageHandle ensure subscriber.try &.close PG_DB.exec "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name LIKE 'test_slot_%'" - PG_DB.query_each "SELECT pubname::text FROM pg_publication_tables WHERE schemaname = 'public' and pubname::text LIKE 'test_publication_%' GROUP BY 1" do |rs| - PG_DB.exec "DROP PUBLICATION IF EXISTS #{rs.read(String)}" + PG_DB.query_each "SELECT DISTINCT pubname::text FROM pg_publication_tables WHERE schemaname = 'public' and pubname::text LIKE 'test_publication_%'" do |rs| + PG_DB.exec "DROP PUBLICATION #{rs.read(String)}" end PG_DB.query_each "SELECT tablename::text FROM pg_tables WHERE schemaname = 'public' and tablename LIKE 'test_table_%'" do |rs| PG_DB.exec "DROP TABLE IF EXISTS #{rs.read(String)}" @@ -314,6 +314,6 @@ private def wait_for(condition = "the block to return truthy", timeout : Time::S if Time.monotonic - start > 2.seconds raise "Timed out waiting for #{condition}" end - sleep 1.millisecond + sleep 5.milliseconds end end diff --git a/src/pg/record.cr b/src/pg/record.cr new file mode 100644 index 00000000..30435f4f --- /dev/null +++ b/src/pg/record.cr @@ -0,0 +1,43 @@ +struct PG::Record(*T) + getter data : T + + def self.read_from(reader : Reader) + new reader.read({{T.map(&.instance)}}) + end + + def initialize(@data) + end + + struct Reader + getter bytes : Bytes + getter size : Int32 + 
getter connection : Connection + + def initialize(@bytes, @size, @connection) + end + + def read(types : Tuple(*T)) forall T + io = ResultSet::Buffer.new(IO::Memory.new(@bytes), @bytes.size, @connection) + + {% begin %} + { + {% for type in T %} + read({{type}}, io).as({{type.instance}}), + {% end %} + } + {% end %} + end + + private def read(type : T.class, io : IO) : T forall T + oid = io.read_bytes(Int32, IO::ByteFormat::BigEndian) + size = io.read_bytes(Int32, IO::ByteFormat::BigEndian) + Decoders.from_oid(oid).decode(io, size, oid) + end + end +end + +class DB::ResultSet + def read(type : PG::Record.class) + type.read_from read(PG::Record::Reader) + end +end diff --git a/src/pg/replication.cr b/src/pg/replication.cr index 9383808f..a70a8b65 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -1,3 +1,8 @@ +require "./replication/frame" +require "./replication/x_log_data" +require "./replication/copy_data" +require "./replication/standby_status_update" + module PG::Replication module Handler # This method must be defined in order to tell the `Connection` how much of @@ -80,6 +85,9 @@ module PG::Replication received frame.data end + def received(frame : ErrorFrame) + end + # Handle the `XLogData` message that wraps `WALMessage`s def received(data : XLogData) @last_wal_byte_received = data.wal_end @@ -140,416 +148,6 @@ module PG::Replication end end - abstract struct Frame - def self.from_io(io : IO) : self - case byte = io.read_byte - when nil - raise IO::EOFError.new("Connection was unexpectedly terminated") - when 'W' - CopyBoth.new(io) - when 'd' - CopyData.new(io) - else - raise Error.new("Unexpected byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") - end - end - end - - struct CopyBoth < Frame - getter format : Format - getter column_formats : Array(Int16) - - def initialize(io : IO) - size = io.read_bytes(Int32, IO::ByteFormat::NetworkEndian) - sized = IO::Sized.new(io, size - 4) - @format = Format.new(sized.read_bytes(Int8, 
IO::ByteFormat::NetworkEndian)) - column_count = sized.read_bytes(Int16, IO::ByteFormat::NetworkEndian) - @column_formats = Array.new(column_count) do - sized.read_bytes(Int16, IO::ByteFormat::NetworkEndian) - end - sized.close - end - - enum Format : Int8 - Text = 0 - Binary = 1 - end - end - - struct CopyData < Frame - getter data : XLogData | KeepAlive | KeepAliveResponse - - def initialize(io : IO) - size = io.read_bytes(Int32, IO::ByteFormat::NetworkEndian) - case byte = io.read_byte - when Nil - raise IO::EOFError.new("Connection was unexpectedly terminated") - when 'w' - @data = XLogData.new(io) - when 'k' - @data = KeepAlive.new(io) - else - raise Error.new("Unexpected CopyData byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") - end - end - - def initialize(@data) - end - - def to_io(io : IO) : Nil - buffer = IO::Memory.new - io << 'd' - payload = IO::Memory.new.tap { |buf| data.to_io buf }.to_slice - io.write_bytes payload.bytesize + 4, IO::ByteFormat::NetworkEndian - io.write payload - end - end - - struct XLogData - getter wal_start : Int64 - getter wal_end : Int64 - getter timestamp : Time - getter message : WALMessage - - def initialize(io : IO) - @wal_start = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) - @wal_end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) - @timestamp = TimeParser.call(io) - @message = WALMessage.new(io) - end - - def to_io(io : IO) : Nil - raise NotImplementedError.new("XLogData messages are meant to be sent by the server, not received by the client") - end - end - - struct KeepAlive - getter wal_end : Int64 - getter timestamp : Time - getter? 
response_expected : Bool - - def initialize(io : IO) - @wal_end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) - @timestamp = TimeParser.call(io) - @response_expected = io.read_bytes(Int8, IO::ByteFormat::NetworkEndian) == 1 - end - - def to_io(io : IO) : Nil - raise NotImplementedError.new("KeepAlives are intended to be received from the server, not sent to it") - end - end - - abstract struct KeepAliveResponse - end - - struct StandbyStatusUpdate < KeepAliveResponse - getter last_wal_byte_received : Int64 - getter last_wal_byte_flushed : Int64 - getter last_wal_byte_applied : Int64 - getter timestamp : Time - getter? response_expected : Bool - - def initialize( - @last_wal_byte_received, - @last_wal_byte_flushed, - @last_wal_byte_applied, - *, - @timestamp = Time.utc, - @response_expected = false, - ) - end - - def to_io(io : IO) : Nil - io << 'r' - write io, last_wal_byte_received - write io, last_wal_byte_flushed - write io, last_wal_byte_applied - write io, (@timestamp - Time.utc(2000, 1, 1)).total_microseconds.to_i64 - write io, response_expected? ? 
1u8 : 0u8 - end - - private def write(io, value) - io.write_bytes value, IO::ByteFormat::NetworkEndian - end - end - - abstract struct WALMessage - def self.new(io : IO) : self - {% if @type != PG::Replication::WALMessage %} - {% raise "Must implement #{@type}#initialize(io : IO)" %} - {% end %} - - case byte = io.read_byte - when Nil - raise IO::EOFError.new("Connection was unexpectedly terminated") - when 'B' - Begin.new(io) - when 'C' - Commit.new(io) - when 'R' - Relation.new(io) - when 'I' - Insert.new(io) - when 'U' - Update.new(io) - when 'D' - Delete.new(io) - when 'T' - Truncate.new(io) - when 'Y' - Type.new(io) - when 'M' - Message.new(io) - when 'O' - Origin.new(io) - else - raise Error.new("Unexpected WAL message byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") - end - end - - protected def read(io : IO, int : Int.class) - io.read_bytes int, IO::ByteFormat::NetworkEndian - end - - protected def read_string(io : IO) : String - io.read_line('\0', chomp: true) - end - - protected def read_bytes(io : IO) : Bytes - bytes = Bytes.new(read(io, Int32)) - io.read_fully bytes - bytes - end - - protected def read_time(io : IO) : Time - TimeParser.call(io) - end - - protected def read_tuple_data(io) : TupleData - column_count = read(io, Int16) - Array.new(column_count) do - case byte = io.read_byte - when Nil - raise IO::EOFError.new("Connection was unexpectedly terminated") - when 'n' - nil - when 'u' - UnchangedTOASTValue.new - when 't' - read_string(io) - when 'b' - read_bytes(io) - else - raise Error.new("Unexpected TupleData byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") - end - end - end - - alias TupleData = Array(UnchangedTOASTValue | Bytes | String | Nil) - record UnchangedTOASTValue - end - - struct Begin < WALMessage - getter final_lsn : Int64 - getter timestamp : Time - getter transaction_id : Int32 - - def initialize(io : IO) - @final_lsn = read(io, Int64) - @timestamp = TimeParser.call(io) - @transaction_id = read(io, Int32) - end - 
end - - struct Message < WALMessage - # Requires proto_version >= 2 - # getter transaction_id : Int32 - getter flags : Flags - getter lsn : Int64 - getter prefix : String - getter content : Bytes - - def initialize(io : IO) - # @transaction_id = read(io, Int32) # Requires proto_version >= 2 - @flags = Flags.new(read(io, Int8)) - @lsn = read(io, Int64) - @prefix = read_string(io) - @content = read_bytes(io) - end - - @[::Flags] - enum Flags : Int8 - Transactional - end - end - - struct Commit < WALMessage - getter flags : Int8 - getter begin_lsn : Int64 - getter end_lsn : Int64 - getter timestamp : Time - - def initialize(io : IO) - @flags = read(io, Int8) - @begin_lsn = read(io, Int64) - @end_lsn = read(io, Int64) - @timestamp = read_time(io) - end - end - - struct Origin < WALMessage - getter lsn : Int64 - getter name : String - - def initialize(io : IO) - @lsn = read(io, Int64) - @name = read_string(io) - end - end - - struct Relation < WALMessage - # Requires proto_version >= 2 - # getter transaction_id : Int32 - getter oid : Int32 - getter namespace : String - getter name : String - getter replica_identity : Int8 - getter columns : Array(Column) - - def initialize(io : IO) - # @transaction_id = read(io, Int32) - @oid = read(io, Int32) - @namespace = read_string(io) - @name = read_string(io) - @replica_identity = read(io, Int8) - column_count = read(io, Int16) - @columns = Array.new(column_count) do - Column.new( - flags: Flags.new(read(io, Int8)), - name: read_string(io), - oid: read(io, Int32), - type_modifier: read(io, Int32), - ) - end - end - - record Column, - flags : Flags, - name : String, - oid : Int32, - type_modifier : Int32 - @[::Flags] - enum Flags - Key = 1 - end - end - - struct Type < WALMessage - # getter transaction_id : Int32 - getter oid : Int32 - getter namespace : String - getter data_type : String - - def initialize(io : IO) - @oid = read(io, Int32) - @namespace = read_string(io) - @data_type = read_string(io) - end - end - - struct Insert 
< WALMessage - # getter transaction_id : Int32 - getter oid : Int32 - getter tuple_data : TupleData - - def initialize(io : IO) - # Only included in WAL protocol v2+ - # @transaction_id = read(io, Int32) - @oid = read(io, Int32) - # This is an 'N' indicating a new tuple - io.read_byte - @tuple_data = read_tuple_data(io) - end - end - - struct Update < WALMessage - getter oid : Int32 - getter key_tuple_data : TupleData? - getter old_tuple_data : TupleData? - getter new_tuple_data : TupleData - - def initialize(io : IO) - @oid = read(io, Int32) - submessage_type = read(io, UInt8) - case submessage_type - when 'K' - @key_tuple_data = read_tuple_data(io) - when 'O' - @old_tuple_data = read_tuple_data(io) - when 'N' - new_tuple_data = read_tuple_data(io) - end - - # If either 'K' or 'O' were specified above, then the next value is our - # new tuple. Otherwise, that was our new tuple, so we just assign it. - if new_tuple_data - @new_tuple_data = new_tuple_data - else - case byte = read(io, UInt8) - when 'N' - @new_tuple_data = read_tuple_data(io) - else - raise Error.new("Expected new TupleData byte marker, got: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") - end - end - end - end - - struct Delete < WALMessage - # getter transaction_id : Int32 - getter oid : Int32 - getter key_tuple_data : TupleData? - getter old_tuple_data : TupleData? 
- - def initialize(io : IO) - # @transaction_id = read(io, Int32) # Requires proto_version >= 2 - @oid = read(io, Int32) - case byte = io.read_byte - when nil - raise IO::EOFError.new("Connection was unexpectedly terminated") - when 'K' - @key_tuple_data = read_tuple_data(io) - when 'O' - @old_tuple_data = read_tuple_data(io) - else - raise Error.new("Expected new TupleData byte marker, got: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") - end - end - end - - struct Truncate < WALMessage - # transaction_id requires proto_version >= 2 - # getter transaction_id : Int32 - getter options : Options - getter relation_oids : Array(Int32) - - def initialize(io : IO) - # @transaction_id = read(io, Int32) # Requires proto_version >= 2 - relation_count = read(io, Int32) - @options = Options.new(read(io, Int8)) - @relation_oids = Array.new(relation_count) do - read(io, Int32) - end - end - - @[Flags] - enum Options : Int8 - NONE = 0 - CASCADE = 1 - RESTART_IDENTITY = 2 - end - end - # TODO: Implement the types below in order to support higher `proto_version`s # # StreamStart requires proto_version >= 2 @@ -587,16 +185,4 @@ module PG::Replication # # StreamPrepare requires proto_version >=3 # struct StreamPrepare < WALMessage # end - - private module TimeParser - extend self - - def call(io : IO) : Time - call io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) - end - - def call(microseconds : Int64) - Time.utc(2000, 1, 1) + microseconds.microseconds - end - end end diff --git a/src/pg/replication/begin.cr b/src/pg/replication/begin.cr new file mode 100644 index 00000000..a04ba651 --- /dev/null +++ b/src/pg/replication/begin.cr @@ -0,0 +1,16 @@ +require "./wal_message" +require "./time_parser" + +module PG::Replication + struct Begin < WALMessage + getter final_lsn : Int64 + getter timestamp : Time + getter transaction_id : Int32 + + def initialize(io : IO) + @final_lsn = read(io, Int64) + @timestamp = TimeParser.call(io) + @transaction_id = read(io, Int32) + end + end +end diff 
--git a/src/pg/replication/commit.cr b/src/pg/replication/commit.cr new file mode 100644 index 00000000..6c78ba81 --- /dev/null +++ b/src/pg/replication/commit.cr @@ -0,0 +1,17 @@ +require "./wal_message" + +module PG::Replication + struct Commit < WALMessage + getter flags : Int8 + getter begin_lsn : Int64 + getter end_lsn : Int64 + getter timestamp : Time + + def initialize(io : IO) + @flags = read(io, Int8) + @begin_lsn = read(io, Int64) + @end_lsn = read(io, Int64) + @timestamp = read_time(io) + end + end +end diff --git a/src/pg/replication/copy_both.cr b/src/pg/replication/copy_both.cr new file mode 100644 index 00000000..63fd8e93 --- /dev/null +++ b/src/pg/replication/copy_both.cr @@ -0,0 +1,24 @@ +require "./frame" + +module PG::Replication + struct CopyBoth < Frame + getter format : Format + getter column_formats : Array(Int16) + + def initialize(io : IO) + size = io.read_bytes(Int32, IO::ByteFormat::NetworkEndian) + sized = IO::Sized.new(io, size - 4) + @format = Format.new(sized.read_bytes(Int8, IO::ByteFormat::NetworkEndian)) + column_count = sized.read_bytes(Int16, IO::ByteFormat::NetworkEndian) + @column_formats = Array.new(column_count) do + sized.read_bytes(Int16, IO::ByteFormat::NetworkEndian) + end + sized.close + end + + enum Format : Int8 + Text = 0 + Binary = 1 + end + end +end diff --git a/src/pg/replication/copy_data.cr b/src/pg/replication/copy_data.cr new file mode 100644 index 00000000..43548894 --- /dev/null +++ b/src/pg/replication/copy_data.cr @@ -0,0 +1,34 @@ +module PG::Replication + struct CopyData < Frame + getter data : XLogData | KeepAlive | KeepAliveResponse + + def initialize(io : IO) + size = io.read_bytes(Int32, IO::ByteFormat::NetworkEndian) + case byte = io.read_byte + when Nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'w' + @data = XLogData.new(io) + when 'k' + @data = KeepAlive.new(io) + else + raise Error.new("Unexpected CopyData byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end 
+ end + + def initialize(@data) + end + + def to_io(io : IO) : Nil + buffer = IO::Memory.new + io << 'd' + payload = IO::Memory.new.tap { |buf| data.to_io buf }.to_slice + io.write_bytes payload.bytesize + 4, IO::ByteFormat::NetworkEndian + io.write payload + end + end +end + +require "./x_log_data" +require "./keep_alive" +require "./keep_alive_response" diff --git a/src/pg/replication/delete.cr b/src/pg/replication/delete.cr new file mode 100644 index 00000000..03a4ec08 --- /dev/null +++ b/src/pg/replication/delete.cr @@ -0,0 +1,25 @@ +require "./wal_message" + +module PG::Replication + struct Delete < WALMessage + # getter transaction_id : Int32 + getter oid : Int32 + getter key_tuple_data : TupleData? + getter old_tuple_data : TupleData? + + def initialize(io : IO) + # @transaction_id = read(io, Int32) # Requires proto_version >= 2 + @oid = read(io, Int32) + case byte = io.read_byte + when nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'K' + @key_tuple_data = read_tuple_data(io) + when 'O' + @old_tuple_data = read_tuple_data(io) + else + raise Error.new("Expected new TupleData byte marker, got: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + end +end diff --git a/src/pg/replication/error_frame.cr b/src/pg/replication/error_frame.cr new file mode 100644 index 00000000..a4449b03 --- /dev/null +++ b/src/pg/replication/error_frame.cr @@ -0,0 +1,36 @@ +module PG::Replication + struct ErrorFrame < Frame + getter severity : Severity? + getter message : String? + getter detail : String? + getter hint : String? 
+ getter misc = [] of {Char, String} + + def initialize(io : IO) + size = read(io, Int32) + pp size: size + loop do + case byte = read(io, UInt8) + when 0 then return + when 'S' then @severity = Severity.parse(read_string(io)) + when 'M' then @message = read_string(io) + when 'D' then @detail = read_string(io) + when 'H' then @hint = read_string(io) + else + @misc << {byte.chr, read_string(io)} + end + end + end + end + + enum Severity + LOG + INFO + DEBUG + NOTICE + WARNING + ERROR + FATAL + PANIC + end +end diff --git a/src/pg/replication/frame.cr b/src/pg/replication/frame.cr new file mode 100644 index 00000000..994f145e --- /dev/null +++ b/src/pg/replication/frame.cr @@ -0,0 +1,26 @@ +require "./read" + +module PG::Replication + abstract struct Frame + include Read + + def self.from_io(io : IO) : self + case byte = io.read_byte + when nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'W' + CopyBoth.new(io) + when 'd' + CopyData.new(io) + when 'E' + ErrorFrame.new(io) + else + raise Error.new("Unexpected byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + end +end + +require "./copy_both" +require "./copy_data" +require "./error_frame" diff --git a/src/pg/replication/insert.cr b/src/pg/replication/insert.cr new file mode 100644 index 00000000..81b6afd3 --- /dev/null +++ b/src/pg/replication/insert.cr @@ -0,0 +1,18 @@ +require "./wal_message" + +module PG::Replication + struct Insert < WALMessage + # getter transaction_id : Int32 + getter oid : Int32 + getter tuple_data : TupleData + + def initialize(io : IO) + # Only included in WAL protocol v2+ + # @transaction_id = read(io, Int32) + @oid = read(io, Int32) + # This is an 'N' indicating a new tuple + io.read_byte + @tuple_data = read_tuple_data(io) + end + end +end diff --git a/src/pg/replication/keep_alive.cr b/src/pg/replication/keep_alive.cr new file mode 100644 index 00000000..cd3a9025 --- /dev/null +++ b/src/pg/replication/keep_alive.cr @@ -0,0 +1,19 @@ +require 
"./time_parser" + +module PG::Replication + struct KeepAlive + getter wal_end : Int64 + getter timestamp : Time + getter? response_expected : Bool + + def initialize(io : IO) + @wal_end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + @timestamp = TimeParser.call(io) + @response_expected = io.read_bytes(Int8, IO::ByteFormat::NetworkEndian) == 1 + end + + def to_io(io : IO) : Nil + raise NotImplementedError.new("KeepAlives are intended to be received from the server, not sent to it") + end + end +end diff --git a/src/pg/replication/keep_alive_response.cr b/src/pg/replication/keep_alive_response.cr new file mode 100644 index 00000000..a40af1f4 --- /dev/null +++ b/src/pg/replication/keep_alive_response.cr @@ -0,0 +1,4 @@ +module PG::Replication + abstract struct KeepAliveResponse + end +end diff --git a/src/pg/replication/message.cr b/src/pg/replication/message.cr new file mode 100644 index 00000000..25527415 --- /dev/null +++ b/src/pg/replication/message.cr @@ -0,0 +1,25 @@ +require "./wal_message" + +module PG::Replication + struct Message < WALMessage + # Requires proto_version >= 2 + # getter transaction_id : Int32 + getter flags : Flags + getter lsn : Int64 + getter prefix : String + getter content : Bytes + + def initialize(io : IO) + # @transaction_id = read(io, Int32) # Requires proto_version >= 2 + @flags = Flags.new(read(io, Int8)) + @lsn = read(io, Int64) + @prefix = read_string(io) + @content = read_bytes(io) + end + + @[::Flags] + enum Flags : Int8 + Transactional + end + end +end diff --git a/src/pg/replication/origin.cr b/src/pg/replication/origin.cr new file mode 100644 index 00000000..1761ba2a --- /dev/null +++ b/src/pg/replication/origin.cr @@ -0,0 +1,13 @@ +require "./wal_message" + +module PG::Replication + struct Origin < WALMessage + getter lsn : Int64 + getter name : String + + def initialize(io : IO) + @lsn = read(io, Int64) + @name = read_string(io) + end + end +end diff --git a/src/pg/replication/read.cr b/src/pg/replication/read.cr 
new file mode 100644 index 00000000..216f7b4e --- /dev/null +++ b/src/pg/replication/read.cr @@ -0,0 +1,23 @@ +require "./time_parser" + +module PG::Replication + module Read + protected def read(io : IO, int : Int.class) + io.read_bytes int, IO::ByteFormat::NetworkEndian + end + + protected def read_string(io : IO) : String + io.read_line('\0', chomp: true) + end + + protected def read_bytes(io : IO) : Bytes + bytes = Bytes.new(read(io, Int32)) + io.read_fully bytes + bytes + end + + protected def read_time(io : IO) : Time + TimeParser.call(io) + end + end +end diff --git a/src/pg/replication/relation.cr b/src/pg/replication/relation.cr new file mode 100644 index 00000000..d5d7be8a --- /dev/null +++ b/src/pg/replication/relation.cr @@ -0,0 +1,40 @@ +require "./wal_message" + +module PG::Replication + struct Relation < WALMessage + # Requires proto_version >= 2 + # getter transaction_id : Int32 + getter oid : Int32 + getter namespace : String + getter name : String + getter replica_identity : Int8 + getter columns : Array(Column) + + def initialize(io : IO) + # @transaction_id = read(io, Int32) + @oid = read(io, Int32) + @namespace = read_string(io) + @name = read_string(io) + @replica_identity = read(io, Int8) + column_count = read(io, Int16) + @columns = Array.new(column_count) do + Column.new( + flags: Flags.new(read(io, Int8)), + name: read_string(io), + oid: read(io, Int32), + type_modifier: read(io, Int32), + ) + end + end + + record Column, + flags : Flags, + name : String, + oid : Int32, + type_modifier : Int32 + @[::Flags] + enum Flags + Key = 1 + end + end +end diff --git a/src/pg/replication/standby_status_update.cr b/src/pg/replication/standby_status_update.cr new file mode 100644 index 00000000..b8bc6af2 --- /dev/null +++ b/src/pg/replication/standby_status_update.cr @@ -0,0 +1,34 @@ +require "./keep_alive_response" + +module PG::Replication + struct StandbyStatusUpdate < KeepAliveResponse + getter last_wal_byte_received : Int64 + getter 
last_wal_byte_flushed : Int64 + getter last_wal_byte_applied : Int64 + getter timestamp : Time + getter? response_expected : Bool + + def initialize( + @last_wal_byte_received, + @last_wal_byte_flushed, + @last_wal_byte_applied, + *, + @timestamp = Time.utc, + @response_expected = false, + ) + end + + def to_io(io : IO) : Nil + io << 'r' + write io, last_wal_byte_received + write io, last_wal_byte_flushed + write io, last_wal_byte_applied + write io, (@timestamp - Time.utc(2000, 1, 1)).total_microseconds.to_i64 + write io, response_expected? ? 1u8 : 0u8 + end + + private def write(io, value) + io.write_bytes value, IO::ByteFormat::NetworkEndian + end + end +end diff --git a/src/pg/replication/time_parser.cr b/src/pg/replication/time_parser.cr new file mode 100644 index 00000000..c0f12efe --- /dev/null +++ b/src/pg/replication/time_parser.cr @@ -0,0 +1,13 @@ +module PG::Replication + private module TimeParser + extend self + + def call(io : IO) : Time + call io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + end + + def call(microseconds : Int64) + Time.utc(2000, 1, 1) + microseconds.microseconds + end + end +end diff --git a/src/pg/replication/truncate.cr b/src/pg/replication/truncate.cr new file mode 100644 index 00000000..cdfb890b --- /dev/null +++ b/src/pg/replication/truncate.cr @@ -0,0 +1,26 @@ +require "./wal_message" + +module PG::Replication + struct Truncate < WALMessage + # transaction_id requires proto_version >= 2 + # getter transaction_id : Int32 + getter options : Options + getter relation_oids : Array(Int32) + + def initialize(io : IO) + # @transaction_id = read(io, Int32) # Requires proto_version >= 2 + relation_count = read(io, Int32) + @options = Options.new(read(io, Int8)) + @relation_oids = Array.new(relation_count) do + read(io, Int32) + end + end + + @[Flags] + enum Options : Int8 + NONE = 0 + CASCADE = 1 + RESTART_IDENTITY = 2 + end + end +end diff --git a/src/pg/replication/type.cr b/src/pg/replication/type.cr new file mode 100644 index 
00000000..bc62f2d0 --- /dev/null +++ b/src/pg/replication/type.cr @@ -0,0 +1,16 @@ +require "./wal_message" + +module PG::Replication + struct Type < WALMessage + # getter transaction_id : Int32 + getter oid : Int32 + getter namespace : String + getter data_type : String + + def initialize(io : IO) + @oid = read(io, Int32) + @namespace = read_string(io) + @data_type = read_string(io) + end + end +end diff --git a/src/pg/replication/update.cr b/src/pg/replication/update.cr new file mode 100644 index 00000000..213be7ef --- /dev/null +++ b/src/pg/replication/update.cr @@ -0,0 +1,36 @@ +require "./wal_message" + +module PG::Replication + struct Update < WALMessage + getter oid : Int32 + getter key_tuple_data : TupleData? + getter old_tuple_data : TupleData? + getter new_tuple_data : TupleData + + def initialize(io : IO) + @oid = read(io, Int32) + submessage_type = read(io, UInt8) + case submessage_type + when 'K' + @key_tuple_data = read_tuple_data(io) + when 'O' + @old_tuple_data = read_tuple_data(io) + when 'N' + new_tuple_data = read_tuple_data(io) + end + + # If either 'K' or 'O' were specified above, then the next value is our + # new tuple. Otherwise, that was our new tuple, so we just assign it. 
+ if new_tuple_data + @new_tuple_data = new_tuple_data + else + case byte = read(io, UInt8) + when 'N' + @new_tuple_data = read_tuple_data(io) + else + raise Error.new("Expected new TupleData byte marker, got: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + end + end +end diff --git a/src/pg/replication/wal_message.cr b/src/pg/replication/wal_message.cr new file mode 100644 index 00000000..bb1495c8 --- /dev/null +++ b/src/pg/replication/wal_message.cr @@ -0,0 +1,74 @@ +require "./read" + +module PG::Replication + abstract struct WALMessage + include Read + + def self.new(io : IO) : self + {% if @type != PG::Replication::WALMessage %} + {% raise "Must implement #{@type}#initialize(io : IO)" %} + {% end %} + + case byte = io.read_byte + when Nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'B' + Begin.new(io) + when 'C' + Commit.new(io) + when 'R' + Relation.new(io) + when 'I' + Insert.new(io) + when 'U' + Update.new(io) + when 'D' + Delete.new(io) + when 'T' + Truncate.new(io) + when 'Y' + Type.new(io) + when 'M' + Message.new(io) + when 'O' + Origin.new(io) + else + raise Error.new("Unexpected WAL message byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + + protected def read_tuple_data(io) : TupleData + column_count = read(io, Int16) + Array.new(column_count) do + case byte = io.read_byte + when Nil + raise IO::EOFError.new("Connection was unexpectedly terminated") + when 'n' + nil + when 'u' + UnchangedTOASTValue.new + when 't' + read_string(io) + when 'b' + read_bytes(io) + else + raise Error.new("Unexpected TupleData byte marker: 0x#{byte.to_s(16)} (#{byte.chr.inspect})") + end + end + end + + alias TupleData = Array(UnchangedTOASTValue | Bytes | String | Nil) + record UnchangedTOASTValue + end +end + +require "./begin" +require "./commit" +require "./relation" +require "./insert" +require "./update" +require "./delete" +require "./truncate" +require "./type" +require "./message" +require "./origin" diff 
--git a/src/pg/replication/x_log_data.cr b/src/pg/replication/x_log_data.cr new file mode 100644 index 00000000..57e2e390 --- /dev/null +++ b/src/pg/replication/x_log_data.cr @@ -0,0 +1,22 @@ +require "./wal_message" +require "./time_parser" + +module PG::Replication + struct XLogData + getter wal_start : Int64 + getter wal_end : Int64 + getter timestamp : Time + getter message : WALMessage + + def initialize(io : IO) + @wal_start = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + @wal_end = io.read_bytes(Int64, IO::ByteFormat::NetworkEndian) + @timestamp = TimeParser.call(io) + @message = WALMessage.new(io) + end + + def to_io(io : IO) : Nil + raise NotImplementedError.new("XLogData messages are intended to be received from the server, not sent to it") + end + end +end From 50cdcb54865e1491e5d4a1898e7b803d2d93f14b Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Mon, 27 Oct 2025 00:03:45 -0400 Subject: [PATCH 21/26] Correct doc comments The method signatures were updated, but the comments still referenced the previous method signatures. --- src/pg/replication.cr | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/pg/replication.cr b/src/pg/replication.cr index a70a8b65..c2a4c03f 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -17,20 +17,36 @@ module PG::Replication # class MyHandler # include PG::Replication::Handler # - # @last_wal_byte_flushed = 0i64 - # @last_wal_byte_applied = 0i64 - # - # def received(msg : PG::Replication::XLogData, connection : PG::Replication::Connection, &) + # def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection, &) # yield - # connection.last_wal_byte_flushed = msg.wal_end - # if msg.data.is_a? PG::Replication::Commit - # connection.last_wal_byte_applied = msg.wal_end + # connection.last_wal_byte_flushed = data.wal_end + # if data.message.is_a? 
PG::Replication::Commit + # connection.last_wal_byte_applied = data.wal_end # end # end # end # ``` abstract def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection, &) + # Override this method with any of the `WALMessage` subclasses to handle + # receiving replication messages of that type. + # + # ``` + # class MyHandler + # include PG::Replication::Handler + # + # # Using some hypothetical Kafka client for CDC + # def initialize(@kafka : Kafka::Client) + # end + # + # def received(insert : PG::Replication::Insert) + # @kafka.publisher.publish({ + # oid: insert.oid, + # data: insert.tuple_data, + # }.to_msgpack) + # end + # end + # ``` def received(frame) end end From 766fc62cc9976f0b157eb0528bf88b5c17df57c6 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Fri, 7 Nov 2025 20:07:16 -0500 Subject: [PATCH 22/26] Fix keepalive responder Had the loop condition inverted by mistake. --- src/pg/replication.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pg/replication.cr b/src/pg/replication.cr index c2a4c03f..20f50284 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -80,7 +80,7 @@ module PG::Replication end spawn do - while closed? + until closed? 
sleep 10.seconds begin send_keepalive From ff3fd1e27a8177f74da574026f0d6e15a5621ee2 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Thu, 5 Feb 2026 13:48:00 -0500 Subject: [PATCH 23/26] Fix comment --- spec/pg/replication_spec.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/pg/replication_spec.cr b/spec/pg/replication_spec.cr index ed3ea930..9115bf9c 100644 --- a/spec/pg/replication_spec.cr +++ b/spec/pg/replication_spec.cr @@ -84,7 +84,7 @@ class TestMessageHandler truncations << msg end - # TODO: Uncomment the methods below in order to test higher `proto_version`s + # TODO: Implement the methods below in order to test higher `proto_version`s # Requires proto_version >= 2 # def received(msg : PG::Replication::StreamStart) From aad0585ac40d0245fa10099a349abd1c23e91541 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 27 May 2026 14:45:25 -0400 Subject: [PATCH 24/26] Handle disconnections from the server If we get too far behind on processing data from the server, it could disconnect us. This commit handles that disconnection by reconnecting and picking up where we left off. 
--- src/pg/connection.cr | 6 +-- src/pg/replication.cr | 97 ++++++++++++++++++++++++++++++++++++------- src/pq/connection.cr | 17 ++++---- 3 files changed, 96 insertions(+), 24 deletions(-) diff --git a/src/pg/connection.cr b/src/pg/connection.cr index 44654331..bc9fad0c 100644 --- a/src/pg/connection.cr +++ b/src/pg/connection.cr @@ -95,11 +95,11 @@ module PG end end - protected def listen_replication(publication_name : String, slot_name : String, blocking : Bool = false, &block : Replication::Frame ->) + protected def listen_replication(publication_name : String, slot_name : String, start_lsn : Int64 = 0i64, blocking : Bool = false, &block : Replication::Frame ->) if blocking - @connection.start_replication_frame_loop(publication_name, slot_name, &block) + @connection.start_replication_frame_loop(publication_name, slot_name, start_lsn, &block) else - spawn { @connection.start_replication_frame_loop(publication_name, slot_name, &block) } + spawn { @connection.start_replication_frame_loop(publication_name, slot_name, start_lsn, &block) } end end diff --git a/src/pg/replication.cr b/src/pg/replication.cr index 20f50284..d05d653b 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -55,6 +55,7 @@ module PG::Replication getter handler : Handler getter publication_name : String getter slot_name : String + @uri : URI getter last_wal_byte_received = 0i64 property last_wal_byte_flushed = 0i64 property last_wal_byte_applied = 0i64 @@ -70,28 +71,92 @@ module PG::Replication query_params = uri.query_params query_params["replication"] = "database" uri.query_params = query_params - @conn = DB.connect(uri).as(PG::Connection) - @conn.listen_replication( - publication_name: publication_name, - slot_name: slot_name, - blocking: blocking, - ) do |frame| - received frame - end + @uri = uri + + @conn = DB.connect(@uri).as(PG::Connection) + # Periodically tell the server how far we've gotten so it can release WAL + # segments we've already processed and keep the 
connection alive. spawn do until closed? sleep 10.seconds begin send_keepalive - rescue ex : IO::Error - break if closed? - raise ex + rescue IO::Error + # The connection dropped out from under us. `run_loop` handles + # reconnection, so just skip this round. + end + end + end + + if blocking + run_loop + else + spawn { run_loop } + end + end + + # Consume the replication stream, reconnecting (and resuming from the last + # flushed WAL position) whenever the connection drops, until `#close` is + # called. + private def run_loop + backoff = base_reconnect_backoff + until closed? + begin + @conn.listen_replication( + publication_name: publication_name, + slot_name: slot_name, + start_lsn: last_wal_byte_flushed, + blocking: true, + ) do |frame| + received frame + end + # `listen_replication` only returns when the socket is closed. If we + # didn't close it ourselves, treat it as a drop and reconnect. + break if closed? + rescue ex : IO::Error + break if closed? + Log.warn(exception: ex) { "Replication connection lost; reconnecting" } + rescue ex + break if closed? + Log.error(exception: ex) { "Unexpected error in replication loop; reconnecting" } + end + + # The connection dropped. Re-establish it, backing off if the server + # isn't ready yet. + until closed? + sleep backoff + break if closed? + begin + reconnect + backoff = base_reconnect_backoff + break + rescue ex + backoff = {backoff * 2, max_reconnect_backoff}.min + Log.error(exception: ex) { "Failed to reconnect replication; retrying in #{backoff}" } end end end end + private def reconnect + write_mutex.synchronize do + begin + @conn.close + rescue + end + @conn = DB.connect(@uri).as(PG::Connection) + end + end + + private def base_reconnect_backoff + 1.second + end + + private def max_reconnect_backoff + 30.seconds + end + # :nodoc: def received(frame : CopyBoth) end @@ -125,12 +190,16 @@ module PG::Replication def close return if closed? 
+ # Flag the shutdown first so `run_loop` treats the imminent socket close + # as deliberate rather than a dropped connection to reconnect. + @closed = true # We attempt to send off one last keepalive to let the server know where # we left off. - send_keepalive + begin + send_keepalive + rescue IO::Error + end @conn.close - ensure - @closed = true end def send_keepalive diff --git a/src/pq/connection.cr b/src/pq/connection.cr index 3d6ca425..a43b634a 100644 --- a/src/pq/connection.cr +++ b/src/pq/connection.cr @@ -191,16 +191,19 @@ module PQ end end - def start_replication_frame_loop(publication_name : String, slot_name : String, &block : PG::Replication::Frame ->) - command = "START_REPLICATION SLOT #{slot_name} LOGICAL 0/0 (proto_version '1', binary 'true', publication_names '#{publication_name}')" + def start_replication_frame_loop(publication_name : String, slot_name : String, start_lsn : Int64 = 0i64, &block : PG::Replication::Frame ->) + lsn = "%X/%X" % {start_lsn >> 32, start_lsn & 0xFFFF_FFFF} + command = "START_REPLICATION SLOT #{slot_name} LOGICAL #{lsn} (proto_version '1', binary 'true', publication_names '#{publication_name}')" send_query_message command loop do break if soc.closed? - block.call PG::Replication::Frame.from_io(soc) - rescue e : IO::Error - raise e unless soc.closed? - rescue e - Log.error(exception: e) { } + begin + block.call PG::Replication::Frame.from_io(soc) + rescue e : IO::Error + soc.closed? ? break : raise e + rescue e + Log.error(exception: e) { } + end end end From de7771ffc65bb2ad39777903ebd98a8127abc887 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 27 May 2026 22:36:50 -0400 Subject: [PATCH 25/26] Make keepalive backstop interval configurable It was hardcoded to 10 seconds before because that is quite comfortably lower than the Postgres default `wal_sender_timeout` of 60 seconds, but some Postgres installations configure that setting to lower than that. For example, EDB's `cloudnative-pg` sets it to 5 seconds. 
If we fall behind on the WAL stream and don't process the `KeepAlive` message from the server within that time, Postgres will disconnect us. Making this configurable allows apps to set it to whatever their server requires. For example, with `cloudnative-pg`, you'd set it to 2-3 seconds. --- src/pg.cr | 4 ++-- src/pg/replication.cr | 17 +++++++++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/pg.cr b/src/pg.cr index 812d4621..78ef47e9 100644 --- a/src/pg.cr +++ b/src/pg.cr @@ -31,8 +31,8 @@ module PG ListenConnection.new(url, channels, blocking, &blk) end - def self.connect_replication(url, *, handler, publication_name, slot_name) - Replication::Connection.new(url, handler, publication_name: publication_name, slot_name: slot_name) + def self.connect_replication(url, *, handler, publication_name, slot_name, keepalive_interval : Time::Span = 10.seconds) + Replication::Connection.new(url, handler, publication_name: publication_name, slot_name: slot_name, keepalive_interval: keepalive_interval) end class ListenConnection diff --git a/src/pg/replication.cr b/src/pg/replication.cr index d05d653b..a2de446f 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -56,13 +56,26 @@ module PG::Replication getter publication_name : String getter slot_name : String @uri : URI + # How often the backstop fiber sends a standby status update to keep the + # server from tripping its `wal_sender_timeout`. Ideally set to less than + # half of that timeout to avoid being disconnected by the Postgres server. + # Defaults to 10 seconds. + getter keepalive_interval : Time::Span getter last_wal_byte_received = 0i64 property last_wal_byte_flushed = 0i64 property last_wal_byte_applied = 0i64 getter? 
closed = false # :nodoc: - def initialize(uri : URI | String, @handler, *, @publication_name, @slot_name, blocking : Bool = false) + def initialize( + uri : URI | String, + @handler, + *, + @publication_name, + @slot_name, + @keepalive_interval = 10.seconds, + blocking : Bool = false + ) if uri.is_a? String uri = URI.parse(uri) else @@ -79,7 +92,7 @@ module PG::Replication # segments we've already processed and keep the connection alive. spawn do until closed? - sleep 10.seconds + sleep keepalive_interval begin send_keepalive rescue IO::Error From 57ebb1eadf00a033280dddd78468944856621964 Mon Sep 17 00:00:00 2001 From: Jamie Gaskins Date: Wed, 27 May 2026 22:48:47 -0400 Subject: [PATCH 26/26] Fix formatting --- src/pg/replication.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pg/replication.cr b/src/pg/replication.cr index a2de446f..5b543257 100644 --- a/src/pg/replication.cr +++ b/src/pg/replication.cr @@ -74,7 +74,7 @@ module PG::Replication @publication_name, @slot_name, @keepalive_interval = 10.seconds, - blocking : Bool = false + blocking : Bool = false, ) if uri.is_a? String uri = URI.parse(uri)