Skip to content

Commit 352da65

Browse files
Further optimize Bunny::Channel#basic_publish_batch
From 14% to 23% depending on the message size. 1 kib and 4 kiB workloads benefit the most.
1 parent fbcf3b0 commit 352da65

5 files changed

Lines changed: 124 additions & 11 deletions

File tree

ChangeLog.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ GitHub issue: [#700](https://github.com/ruby-amqp/bunny/issues/700).
7171

7272
### Significant Publisher Performance Improvements
7373

74-
**Performance** (100K messages, with [amq-protocol `2.4.0`](https://github.com/ruby-amqp/amq-protocol/releases/tag/v2.4.0) or later)
74+
Publisher performance improvements (100K messages, with [amq-protocol `2.4.0`](https://github.com/ruby-amqp/amq-protocol/releases/tag/v2.4.0) or later)
7575
with automatic publisher confirm tracking enabled (documented below):
7676

7777
| Approach | Throughput | vs 2.x confirms |
@@ -90,6 +90,9 @@ connection flow control on the RabbitMQ end).
9090
To migrate from `2.x`, simply replace `Channel#confirm_select` calls with with `Channel#confirm_select(tracking: true)`.
9191
That's it.
9292

93+
In addition, `Bunny::Channel#basic_publish_batch` benefits further from the write hot path optimizations
94+
that do not benefit `Bunny::Channel#basic_publish` much.
95+
9396
### Publisher Confirm Tracking
9497

9598
Bunny now supports [publisher confirm](https://www.rabbitmq.com/docs/publishers#data-safety)

benchmarks/throughput.rb

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
#!/usr/bin/env ruby
2+
3+
require "bunny"
4+
5+
MESSAGE_SIZES = [
6+
["12 B", 12, 100_000],
7+
["1 KiB", 1_024, 100_000],
8+
["4 KiB", 4_096, 100_000],
9+
["16 KiB", 16_384, 100_000],
10+
]
11+
12+
PREFETCH = 500
13+
MULTI_ACK = 100
14+
BATCH_SIZE = 500
15+
16+
puts
17+
puts "-" * 72
18+
puts "Bunny #{Bunny::VERSION} on #{RUBY_DESCRIPTION}"
19+
puts "-" * 72
20+
21+
def run_benchmark(label, size, count, use_batch: false)
22+
payload = "x" * size
23+
24+
pub_conn = Bunny.new
25+
pub_conn.start
26+
con_conn = Bunny.new
27+
con_conn.start
28+
29+
pub_ch = pub_conn.create_channel
30+
con_ch = con_conn.create_channel
31+
con_ch.prefetch(PREFETCH)
32+
33+
q = con_ch.queue("bunny.bench", auto_delete: true)
34+
q.purge
35+
36+
received = 0
37+
done = Queue.new
38+
39+
q.subscribe(manual_ack: true) do |delivery_info, _properties, _body|
40+
received += 1
41+
if (received % MULTI_ACK).zero?
42+
con_ch.basic_ack(delivery_info.delivery_tag, true)
43+
end
44+
done.push(true) if received == count
45+
end
46+
47+
t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC)
48+
49+
if use_batch
50+
payloads = Array.new(BATCH_SIZE, payload)
51+
(count / BATCH_SIZE).times { pub_ch.basic_publish_batch(payloads, "", q.name, persistent: false) }
52+
remainder = count % BATCH_SIZE
53+
if remainder > 0
54+
pub_ch.basic_publish_batch(Array.new(remainder, payload), "", q.name, persistent: false)
55+
end
56+
else
57+
count.times { pub_ch.basic_publish(payload, "", q.name, persistent: false) }
58+
end
59+
done.pop
60+
61+
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0
62+
rate = (count / elapsed).round(0)
63+
mb_sec = (count * size / elapsed / 1_048_576.0).round(1)
64+
65+
pub_conn.close
66+
con_conn.close
67+
68+
[label, count, rate, mb_sec]
69+
end
70+
71+
puts
72+
puts "## basic_publish (per-message)"
73+
puts
74+
75+
results = []
76+
MESSAGE_SIZES.each do |label, size, count|
77+
r = run_benchmark(label, size, count)
78+
results << r
79+
printf "%-12s %6dK msgs %8d msg/sec %8.1f MB/sec\n", r[0], r[1] / 1000, r[2], r[3]
80+
end
81+
82+
puts
83+
puts "## basic_publish_batch (#{BATCH_SIZE} msgs/batch)"
84+
puts
85+
86+
batch_results = []
87+
MESSAGE_SIZES.each do |label, size, count|
88+
r = run_benchmark(label, size, count, use_batch: true)
89+
batch_results << r
90+
printf "%-12s %6dK msgs %8d msg/sec %8.1f MB/sec\n", r[0], r[1] / 1000, r[2], r[3]
91+
end
92+
93+
puts
94+
puts "-" * 72
95+
puts "| Workload | basic_publish | batch (#{BATCH_SIZE}) |"
96+
puts "|----------|--------:|-------:|"
97+
results.zip(batch_results).each do |r, br|
98+
puts "| #{r[1] / 1000}K x #{r[0]} | #{r[2]} msg/sec | #{br[2]} msg/sec |"
99+
end
100+
puts "-" * 72

lib/bunny/channel.rb

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ def open
261261
@connection.open_channel(self)
262262
# clear last channel error
263263
@last_channel_error = nil
264+
@frame_max = @connection.frame_max
264265

265266
@status = :open
266267

@@ -712,7 +713,7 @@ def basic_publish(payload, exchange, routing_key, opts = {})
712713
routing_key,
713714
opts[:mandatory],
714715
false,
715-
@connection.frame_max)
716+
@frame_max)
716717
@connection.send_frameset(frames, self)
717718

718719
wait_for_publish_confirm(seq_no, continuation) if continuation
@@ -781,7 +782,8 @@ def basic_publish_batch(payloads, exchange, routing_key, opts = {})
781782
end
782783
end
783784

784-
# Send all frames (outside the mutex)
785+
# Encode all messages into a single buffer and write once
786+
data = +""
785787
payloads.each do |payload|
786788
frames = AMQ::Protocol::Basic::Publish.encode(@id,
787789
payload,
@@ -790,9 +792,10 @@ def basic_publish_batch(payloads, exchange, routing_key, opts = {})
790792
routing_key,
791793
opts[:mandatory],
792794
false,
793-
@connection.frame_max)
794-
@connection.send_frameset(frames, self)
795+
@frame_max)
796+
frames.each { |frame| data << frame.encode }
795797
end
798+
@connection.send_raw_without_timeout(data, self)
796799

797800
self
798801
end

lib/bunny/session.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1474,7 +1474,8 @@ def send_frameset_without_timeout(frames, channel)
14741474
# locking. See a note about "single frame" methods in a comment in `send_frameset`. MK.
14751475
channel.synchronize do
14761476
if open?
1477-
frames.each { |frame| self.send_frame_without_timeout(frame, false) }
1477+
data = frames.reduce(+"") { |acc, frame| acc << frame.encode }
1478+
@transport.write_without_timeout(data)
14781479
signal_activity!
14791480
else
14801481
raise ConnectionClosedError.new(frames)
@@ -1490,10 +1491,14 @@ def send_raw_without_timeout(data, channel)
14901491
# If we synchronize on the channel, however, this is both thread safe and pretty fine-grained
14911492
# locking. Note that "single frame" methods do not need this kind of synchronization. MK.
14921493
channel.synchronize do
1493-
@transport.write(data)
1494-
signal_activity!
1494+
if open?
1495+
@transport.write(data)
1496+
signal_activity!
1497+
else
1498+
raise ConnectionClosedError.new("pre-encoded data (#{data.bytesize} bytes)")
1499+
end
14951500
end
1496-
end # send_frameset_without_timeout(frames)
1501+
end # send_raw_without_timeout(data)
14971502

14981503
# @return [String]
14991504
# @api public

lib/bunny/transport.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,10 @@ def write(data)
180180
# Writes data to the socket without timeout checks
181181
def write_without_timeout(data, raise_exceptions = false)
182182
begin
183-
@writes_mutex.synchronize { @socket.write(data) }
184-
@socket.flush
183+
@writes_mutex.synchronize do
184+
@socket.write(data)
185+
@socket.flush
186+
end
185187
rescue SystemCallError, Bunny::ConnectionError, IOError => e
186188
close
187189
raise e if raise_exceptions

0 commit comments

Comments
 (0)