From 315f604cc1900f6dfa39ec2c397750f1aab0791a Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Thu, 24 Feb 2022 21:08:31 -0800 Subject: [PATCH 01/17] feature - add statsd sink Signed-off-by: Elliot Fehr --- shard.lock | 3 +++ shard.yml | 2 ++ spec/amqproxy_spec.cr | 4 ++-- spec/spec_helper.cr | 1 + src/amqproxy.cr | 8 +++++++- src/amqproxy/metrics_client.cr | 32 ++++++++++++++++++++++++++++++++ src/amqproxy/pool.cr | 3 ++- src/amqproxy/server.cr | 8 ++++++-- 8 files changed, 55 insertions(+), 6 deletions(-) create mode 100644 src/amqproxy/metrics_client.cr diff --git a/shard.lock b/shard.lock index 0f59051..c54a78b 100644 --- a/shard.lock +++ b/shard.lock @@ -12,3 +12,6 @@ shards: git: https://github.com/84codes/logger.cr.git version: 1.0.2 + statsd: + git: https://github.com/miketheman/statsd.cr.git + version: 0.3.0 diff --git a/shard.yml b/shard.yml index 2e6c175..69dc6b7 100644 --- a/shard.yml +++ b/shard.yml @@ -13,6 +13,8 @@ dependencies: github: cloudamqp/amq-protocol.cr logger: github: 84codes/logger.cr + statsd: + github: miketheman/statsd.cr development_dependencies: amqp-client: diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index f5f7477..e4a9a68 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -2,7 +2,7 @@ require "./spec_helper" describe AMQProxy::Server do it "keeps connections open" do - s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, AMQProxy::DummyMetricsClient.new, Logger::DEBUG) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield @@ -22,7 +22,7 @@ describe AMQProxy::Server do end it "can reconnect if upstream closes" do - s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, AMQProxy::DummyMetricsClient.new, Logger::DEBUG) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 61ae0b0..4072128 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -1,6 +1,7 @@ require "spec" require "../src/amqproxy/server" require "../src/amqproxy/version" +require "../src/amqproxy/metrics_client" require "amqp-client" MAYBE_SUDO = ENV.has_key?("NO_SUDO") ? "" : "sudo " diff --git a/src/amqproxy.cr b/src/amqproxy.cr index e0939d0..0a3a735 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -1,5 +1,6 @@ require "./amqproxy/version" require "./amqproxy/server" +require "./amqproxy/metrics_client" require "option_parser" require "uri" require "ini" @@ -41,6 +42,8 @@ class AMQProxy::CLI end def run + statsd_host = "" + statsd_port = 8125 p = OptionParser.parse do |parser| parser.banner = "Usage: amqproxy [options] [amqp upstream url]" parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| @@ -54,6 +57,8 @@ class AMQProxy::CLI parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } + parser.on("--statsd-ip=STATSD_IP", "StatsD IP to send metrics to (default disabled)") { |v| statsd_host = v } + parser.on("--statsd-port=STATSD_PORT", "StatsD port to send metrics to (default is 8125)") { |v| statsd_port = v.to_i } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } end @@ -71,7 +76,8 @@ class AMQProxy::CLI port = u.port || default_port tls = u.scheme == "amqps" - server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout) + metrics_client = statsd_host.empty? ? DummyMetricsClient.new : StatsdClient.new(statsd_host, statsd_port) + server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout, metrics_client) first_shutdown = true shutdown = ->(_s : Signal) do diff --git a/src/amqproxy/metrics_client.cr b/src/amqproxy/metrics_client.cr new file mode 100644 index 0000000..d9453a2 --- /dev/null +++ b/src/amqproxy/metrics_client.cr @@ -0,0 +1,32 @@ +require "statsd" + +module AMQProxy + abstract class MetricsClient + abstract def increment(metric_name, sample_rate = nil, tags = nil) + abstract def gauge(metric_name, value, tags = nil) + end + + class DummyMetricsClient < MetricsClient + def increment(metric_name, sample_rate = nil, tags = nil) + end + + def gauge(metric_name, value, tags = nil) + end + end + + class StatsdClient < MetricsClient + PREFIX = "amqproxy" + + def initialize(statsd_host : String, statsd_port : Int32) + @client = Statsd::Client.new statsd_host, statsd_port + end + + def increment(metric_name, sample_rate = nil, tags = nil) + @client.increment("#{PREFIX}.#{metric_name}", sample_rate, tags) + end + + def gauge(metric_name, value, tags = nil) + @client.gauge("#{PREFIX}.#{metric_name}", value, tags) + end + end +end diff --git a/src/amqproxy/pool.cr b/src/amqproxy/pool.cr index c054a6a..a86ff98 100644 --- a/src/amqproxy/pool.cr +++ b/src/amqproxy/pool.cr @@ -2,7 +2,7 @@ module AMQProxy class Pool getter :size - def initialize(@host : String, @port : Int32, @tls : Bool, @log : Logger, @idle_connection_timeout : Int32) + def initialize(@host : String, @port : Int32, @tls : Bool, @metrics_client : MetricsClient, @log : Logger, @idle_connection_timeout : Int32) @pools = Hash(Tuple(String, String, String), Deque(Upstream)).new do |h, k| h[k] = Deque(Upstream).new end @@ -76,6 +76,7 @@ module AMQProxy end end end + @metrics_client.gauge("connections.upstream.total", @size) end end end diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 47d3d3e..b614e95 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -7,7 +7,7 @@ require "./upstream" module AMQProxy class Server - def initialize(upstream_host, upstream_port, upstream_tls, log_level = Logger::INFO, idle_connection_timeout = 5) + def initialize(upstream_host, upstream_port, upstream_tls, log_level = Logger::INFO, idle_connection_timeout = 5, @metrics_client : MetricsClient = DummyMetricsClient.new) @log = Logger.new(STDOUT) @log.level = log_level journald = @@ -24,7 +24,7 @@ module AMQProxy io << message end @clients = Array(Client).new - @pool = Pool.new(upstream_host, upstream_port, upstream_tls, @log, idle_connection_timeout) + @pool = Pool.new(upstream_host, upstream_port, upstream_tls, @metrics_client, @log, idle_connection_timeout) @log.info "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" end @@ -69,6 +69,7 @@ module AMQProxy active_client(c) do @pool.borrow(user, password, vhost) do |u| # print "\r#{@clients.size} clients\t\t #{@pool.size} upstreams" + @metrics_client.gauge("connections.client.total", client_connections) u.current_client = c c.read_loop(u) ensure @@ -77,11 +78,13 @@ module AMQProxy end rescue ex : Upstream::AccessError @log.error { "Access refused for user '#{user}' to vhost '#{vhost}', reason: #{ex.message}" } + @metrics_client.increment("connections.upstream.error.count", 1, tags: ["error:access_refused"]) close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "ACCESS_REFUSED - #{ex.message}", 0_u16, 0_u16) close.to_io socket, IO::ByteFormat::NetworkEndian socket.flush rescue ex : Upstream::Error @log.error { "Upstream error for user '#{user}' to vhost '#{vhost}': #{ex.inspect} (cause: #{ex.cause.inspect})" } + @metrics_client.increment("connections.upstream.error.count", 1, tags: ["error:upstream_error"]) close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "UPSTREAM_ERROR", 0_u16, 0_u16) close.to_io socket, IO::ByteFormat::NetworkEndian socket.flush @@ -91,6 +94,7 @@ module AMQProxy ensure @log.debug { "Client disconnected: #{remote_address}" } socket.close rescue nil + @metrics_client.gauge("connections.client.total", client_connections) # print "\r#{@clients.size} clients\t\t #{@pool.size} upstreams" end From 1c17d6b9218eb32173df034092346c56427965f7 Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Fri, 25 Feb 2022 08:02:21 -0800 Subject: [PATCH 02/17] use a DNS name instead of an IP Signed-off-by: Elliot Fehr --- src/amqproxy/metrics_client.cr | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/amqproxy/metrics_client.cr b/src/amqproxy/metrics_client.cr index d9453a2..1b7f9df 100644 --- a/src/amqproxy/metrics_client.cr +++ b/src/amqproxy/metrics_client.cr @@ -1,4 +1,5 @@ require "statsd" +require "socket" module AMQProxy abstract class MetricsClient @@ -18,7 +19,8 @@ module AMQProxy PREFIX = "amqproxy" def initialize(statsd_host : String, statsd_port : Int32) - @client = Statsd::Client.new statsd_host, statsd_port + host = resolve(statsd_host, statsd_port) + @client = Statsd::Client.new(host, statsd_port) end def increment(metric_name, sample_rate = nil, tags = nil) @@ -28,5 +30,15 @@ module AMQProxy def gauge(metric_name, value, tags = nil) @client.gauge("#{PREFIX}.#{metric_name}", value, tags) end + + private def resolve(host : String, port : Int32) + addr = "" + addr_info = Socket::Addrinfo.resolve(host, port, Socket::Family::INET, Socket::Type::STREAM, Socket::Protocol::IP) + addr_info.each { |a| + addr = a.ip_address.to_s.rchop(":#{port}") + break + } + return addr + end end end From fbf1342091f9e6f4064edcc4a2ebb6ba25d2a490 Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Wed, 16 Mar 2022 11:43:10 -0700 Subject: [PATCH 03/17] add test for statsd metrics Signed-off-by: Elliot Fehr --- spec/amqproxy_spec.cr | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index e4a9a68..87d81b3 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -1,6 +1,41 @@ require "./spec_helper" +private def with_server(& : UDPSocket ->) + server = UDPSocket.new + server.bind("localhost", 1234) + yield server +ensure + server.close if server +end + describe AMQProxy::Server do + describe "statsd" do + it "sends client connections statsd metrics" do + with_server do |statsd_server| + statsd_client = AMQProxy::StatsdClient.new("localhost", 1234) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, statsd_client, Logger::DEBUG) + begin + spawn { s.listen("127.0.0.1", 5673) } + Fiber.yield + 10.times do + AMQP::Client.start("amqp://localhost:5673") do |conn| + conn.channel + + expected_message = "amqproxy.connections.client.total:1|g" + statsd_server.gets(expected_message.bytesize).should eq expected_message + end + + expected_message = "amqproxy.connections.client.total:0|g" + statsd_server.gets(expected_message.bytesize).should eq expected_message + sleep 0.1 + end + ensure + s.close + end + end + end + end + it "keeps connections open" do s = AMQProxy::Server.new("127.0.0.1", 5672, false, AMQProxy::DummyMetricsClient.new, Logger::DEBUG) begin From ef69f588a2a83815ccdf8f5ed4d3a5ee239c0616 Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Thu, 17 Mar 2022 13:24:23 -0700 Subject: [PATCH 04/17] fix lint warning Signed-off-by: Elliot Fehr --- src/amqproxy/metrics_client.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqproxy/metrics_client.cr b/src/amqproxy/metrics_client.cr index 1b7f9df..2b7445c 100644 --- a/src/amqproxy/metrics_client.cr +++ b/src/amqproxy/metrics_client.cr @@ -38,7 +38,7 @@ module AMQProxy addr = a.ip_address.to_s.rchop(":#{port}") break } - return addr + addr end end end From cc8a9e53486f3083f0eafc6f3742bfcfb73fea11 Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Wed, 11 May 2022 12:50:10 -0700 Subject: [PATCH 05/17] bugfix variable type + import Signed-off-by: Elliot Fehr --- src/amqproxy.cr | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/amqproxy.cr b/src/amqproxy.cr index 0a3a735..9d2af65 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -12,6 +12,8 @@ class AMQProxy::CLI @log_level : Logger::Severity = Logger::INFO @idle_connection_timeout = 5 @upstream = ENV["AMQP_URL"]? + @statsd_host = "" + @statsd_port = 8125 def parse_config(path) INI.parse(File.read(path)).each do |name, section| @@ -34,6 +36,14 @@ class AMQProxy::CLI else raise "Unsupported config #{name}/#{key}" end end + when "statsd" + section.each do |key, value| + case key + when "host" then @statsd_host = value + when "port" then @statsd_port = value.to_i + else raise "Unsupported config #{name}/#{key}" + end + end else raise "Unsupported config section #{name}" end end @@ -42,8 +52,6 @@ class AMQProxy::CLI end def run - statsd_host = "" - statsd_port = 8125 p = OptionParser.parse do |parser| parser.banner = "Usage: amqproxy [options] [amqp upstream url]" parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| @@ -57,8 +65,8 @@ class AMQProxy::CLI parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } - parser.on("--statsd-ip=STATSD_IP", "StatsD IP to send metrics to (default disabled)") { |v| statsd_host = v } - parser.on("--statsd-port=STATSD_PORT", "StatsD port to send metrics to (default is 8125)") { |v| statsd_port = v.to_i } + parser.on("--statsd-ip=STATSD_IP", "StatsD IP to send metrics to (default disabled)") { |v| @statsd_host = v } + parser.on("--statsd-port=STATSD_PORT", "StatsD port to send metrics to (default is 8125)") { |v| @statsd_port = v.to_i } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } end @@ -76,7 +84,7 @@ class AMQProxy::CLI port = u.port || default_port tls = u.scheme == "amqps" - metrics_client = statsd_host.empty? ? DummyMetricsClient.new : StatsdClient.new(statsd_host, statsd_port) + metrics_client = @statsd_host.empty? ? DummyMetricsClient.new : StatsdClient.new(@statsd_host, @statsd_port) server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout, metrics_client) first_shutdown = true From d7a92db6423f7ce4ae22ea0699ced91bfab35d4b Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Wed, 11 May 2022 15:37:30 -0700 Subject: [PATCH 06/17] add stats to calculate rate of connection operations Signed-off-by: Elliot Fehr --- src/amqproxy/pool.cr | 2 ++ src/amqproxy/server.cr | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/amqproxy/pool.cr b/src/amqproxy/pool.cr index a86ff98..f477e45 100644 --- a/src/amqproxy/pool.cr +++ b/src/amqproxy/pool.cr @@ -17,6 +17,7 @@ module AMQProxy if c.nil? || c.closed? @size += 1 c = Upstream.new(@host, @port, @tls, @log).connect(user, password, vhost) + @metrics_client.increment("connections.upstream.created", 1) end c end @@ -64,6 +65,7 @@ module AMQProxy @size -= 1 begin u.close "Pooled connection closed due to inactivity" + @metrics_client.increment("connections.upstream.closed", 1) rescue ex @log.error "Problem closing upstream: #{ex.inspect}" end diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index b614e95..d572e87 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -70,6 +70,7 @@ module AMQProxy @pool.borrow(user, password, vhost) do |u| # print "\r#{@clients.size} clients\t\t #{@pool.size} upstreams" @metrics_client.gauge("connections.client.total", client_connections) + @metrics_client.increment("connections.client.created", 1) u.current_client = c c.read_loop(u) ensure @@ -95,6 +96,7 @@ module AMQProxy @log.debug { "Client disconnected: #{remote_address}" } socket.close rescue nil @metrics_client.gauge("connections.client.total", client_connections) + @metrics_client.increment("connections.client.disconnected", 1) # print "\r#{@clients.size} clients\t\t #{@pool.size} upstreams" end From 4d8c0d8e5939a94660f87e67ce8cfd1d07f76b16 Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Wed, 11 May 2022 17:09:16 -0700 Subject: [PATCH 07/17] fix statsd tests Signed-off-by: Elliot Fehr --- spec/amqproxy_spec.cr | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index 87d81b3..c4f78ed 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -17,16 +17,31 @@ describe AMQProxy::Server do begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield + i = 0 10.times do + i = i + 1 AMQP::Client.start("amqp://localhost:5673") do |conn| conn.channel + # This is also testing that connection pooling is working + if i == 1 + expected_message = "amqproxy.connections.upstream.created:1|c|@1" + statsd_server.gets(expected_message.bytesize).should eq expected_message + end + expected_message = "amqproxy.connections.client.total:1|g" statsd_server.gets(expected_message.bytesize).should eq expected_message + + expected_message = "amqproxy.connections.client.created:1|c|@1" + statsd_server.gets(expected_message.bytesize).should eq expected_message end expected_message = "amqproxy.connections.client.total:0|g" statsd_server.gets(expected_message.bytesize).should eq expected_message + + expected_message = "amqproxy.connections.client.disconnected:1|c|@1" + statsd_server.gets(expected_message.bytesize).should eq expected_message + sleep 0.1 end ensure From ab38f8944052a177abd1d5fc6673601cd5318e1a Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Wed, 1 Jun 2022 10:25:37 -0700 Subject: [PATCH 08/17] init logger outside of server and print statsd sink on startup Signed-off-by: Elliot Fehr --- src/amqproxy.cr | 1 - src/amqproxy/metrics_client.cr | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/amqproxy.cr b/src/amqproxy.cr index 9d2af65..ac8a60b 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -86,7 +86,6 @@ class AMQProxy::CLI metrics_client = @statsd_host.empty? ? DummyMetricsClient.new : StatsdClient.new(@statsd_host, @statsd_port) server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout, metrics_client) - first_shutdown = true shutdown = ->(_s : Signal) do if first_shutdown diff --git a/src/amqproxy/metrics_client.cr b/src/amqproxy/metrics_client.cr index 2b7445c..6cddb11 100644 --- a/src/amqproxy/metrics_client.cr +++ b/src/amqproxy/metrics_client.cr @@ -18,9 +18,10 @@ module AMQProxy class StatsdClient < MetricsClient PREFIX = "amqproxy" - def initialize(statsd_host : String, statsd_port : Int32) + def initialize(log : Logger, statsd_host : String, statsd_port : Int32) host = resolve(statsd_host, statsd_port) @client = Statsd::Client.new(host, statsd_port) + log.info "Statsd sink configured at: #{statsd_host}:#{statsd_port}" end def increment(metric_name, sample_rate = nil, tags = nil) From 1940c766f2fbcd66b8a19438c2eef6cc92f541db Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Wed, 1 Jun 2022 10:32:52 -0700 Subject: [PATCH 09/17] fixing tests Signed-off-by: Elliot Fehr --- spec/amqproxy_spec.cr | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index c4f78ed..da2d1b8 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -12,8 +12,10 @@ describe AMQProxy::Server do describe "statsd" do it "sends client connections statsd metrics" do with_server do |statsd_server| - statsd_client = AMQProxy::StatsdClient.new("localhost", 1234) - s = AMQProxy::Server.new("127.0.0.1", 5672, false, statsd_client, Logger::DEBUG) + logger = Logger.new(STDOUT) + logger.level = Logger::DEBUG + statsd_client = AMQProxy::StatsdClient.new(logger, "localhost", 1234) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, statsd_client, logger) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield @@ -52,7 +54,9 @@ describe AMQProxy::Server do end it "keeps connections open" do - s = AMQProxy::Server.new("127.0.0.1", 5672, false, AMQProxy::DummyMetricsClient.new, Logger::DEBUG) + logger = Logger.new(STDOUT) + logger.level = Logger::DEBUG + s = AMQProxy::Server.new("127.0.0.1", 5672, false, AMQProxy::DummyMetricsClient.new, logger) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield @@ -72,7 +76,9 @@ describe AMQProxy::Server do end it "can reconnect if upstream closes" do - s = AMQProxy::Server.new("127.0.0.1", 5672, false, AMQProxy::DummyMetricsClient.new, Logger::DEBUG) + logger = Logger.new(STDOUT) + logger.level = Logger::DEBUG + s = AMQProxy::Server.new("127.0.0.1", 5672, false, AMQProxy::DummyMetricsClient.new, logger) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield From 722255ed084062f5c0fc93c8fceac1b1277d5aa0 Mon Sep 17 00:00:00 2001 From: Elliot Fehr Date: Wed, 8 Jun 2022 09:09:50 -0700 Subject: [PATCH 10/17] default value for metrics client + move to last argument Signed-off-by: Elliot Fehr --- src/amqproxy/pool.cr | 2 +- src/amqproxy/server.cr | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/amqproxy/pool.cr b/src/amqproxy/pool.cr index f477e45..97e0597 100644 --- a/src/amqproxy/pool.cr +++ b/src/amqproxy/pool.cr @@ -2,7 +2,7 @@ module AMQProxy class Pool getter :size - def initialize(@host : String, @port : Int32, @tls : Bool, @metrics_client : MetricsClient, @log : Logger, @idle_connection_timeout : Int32) + def initialize(@host : String, @port : Int32, @tls : Bool, @log : Logger, @idle_connection_timeout : Int32, @metrics_client : MetricsClient = DummyMetricsClient.new) @pools = Hash(Tuple(String, String, String), Deque(Upstream)).new do |h, k| h[k] = Deque(Upstream).new end diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index d572e87..93748fd 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -24,7 +24,7 @@ module AMQProxy io << message end @clients = Array(Client).new - @pool = Pool.new(upstream_host, upstream_port, upstream_tls, @metrics_client, @log, idle_connection_timeout) + @pool = Pool.new(upstream_host, upstream_port, upstream_tls, @log, idle_connection_timeout, @metrics_client) @log.info "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" end From efc18c35d3656c8253b5d01c2ce548f186661f2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 2 Aug 2022 19:17:02 +0200 Subject: [PATCH 11/17] updated statsd dependency --- shard.lock | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/shard.lock b/shard.lock index c54a78b..f54eef5 100644 --- a/shard.lock +++ b/shard.lock @@ -14,4 +14,5 @@ shards: statsd: git: https://github.com/miketheman/statsd.cr.git - version: 0.3.0 + version: 0.5.0 + From b0f743cd2b155162b7ae601c5b1920acef67bb7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 2 Aug 2022 19:27:28 +0200 Subject: [PATCH 12/17] rm logger from metrics client --- src/amqproxy/metrics_client.cr | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/amqproxy/metrics_client.cr b/src/amqproxy/metrics_client.cr index 6cddb11..2b7445c 100644 --- a/src/amqproxy/metrics_client.cr +++ b/src/amqproxy/metrics_client.cr @@ -18,10 +18,9 @@ module AMQProxy class StatsdClient < MetricsClient PREFIX = "amqproxy" - def initialize(log : Logger, statsd_host : String, statsd_port : Int32) + def initialize(statsd_host : String, statsd_port : Int32) host = resolve(statsd_host, statsd_port) @client = Statsd::Client.new(host, statsd_port) - log.info "Statsd sink configured at: #{statsd_host}:#{statsd_port}" end def increment(metric_name, sample_rate = nil, tags = nil) From 73b52e59ea7da180543b87531cf801be248752af Mon Sep 17 00:00:00 2001 From: Gabe Williamson Date: Thu, 10 Aug 2023 15:39:53 -0500 Subject: [PATCH 13/17] Default to localhost to trigger a ci build --- src/amqproxy.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqproxy.cr b/src/amqproxy.cr index 47eb42a..f32c751 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -12,7 +12,7 @@ class AMQProxy::CLI @log_level : Logger::Severity = Logger::INFO @idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i @upstream = ENV["AMQP_URL"]? - @statsd_host = "" + @statsd_host = "localhost" @statsd_port = 8125 def parse_config(path) From 5b90b85821648a046540b9394d61d542ecc0c35f Mon Sep 17 00:00:00 2001 From: Gabe Williamson Date: Thu, 10 Aug 2023 15:41:45 -0500 Subject: [PATCH 14/17] flip to an environment variable --- src/amqproxy.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqproxy.cr b/src/amqproxy.cr index f32c751..0793f11 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -12,7 +12,7 @@ class AMQProxy::CLI @log_level : Logger::Severity = Logger::INFO @idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i @upstream = ENV["AMQP_URL"]? - @statsd_host = "localhost" + @statsd_host = ENV["STATSD_HOST"]? @statsd_port = 8125 def parse_config(path) From a725986d8a46401754a4c930891746990e8f1d47 Mon Sep 17 00:00:00 2001 From: Gabe Williamson Date: Thu, 10 Aug 2023 15:49:48 -0500 Subject: [PATCH 15/17] fix the failing test --- spec/amqproxy_spec.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index da2d1b8..3afa7d4 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -14,7 +14,7 @@ describe AMQProxy::Server do with_server do |statsd_server| logger = Logger.new(STDOUT) logger.level = Logger::DEBUG - statsd_client = AMQProxy::StatsdClient.new(logger, "localhost", 1234) + statsd_client = AMQProxy::StatsdClient.new("localhost", 1234) s = AMQProxy::Server.new("127.0.0.1", 5672, false, statsd_client, logger) begin spawn { s.listen("127.0.0.1", 5673) } From f9670ffb9dc9ee5013f7146b6607de69c6b86df0 Mon Sep 17 00:00:00 2001 From: Gabe Williamson Date: Thu, 10 Aug 2023 16:02:00 -0500 Subject: [PATCH 16/17] fix dumb merge --- src/amqproxy.cr | 2 +- src/amqproxy/pool.cr | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/amqproxy.cr b/src/amqproxy.cr index 0793f11..961661f 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -12,7 +12,7 @@ class AMQProxy::CLI @log_level : Logger::Severity = Logger::INFO @idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i @upstream = ENV["AMQP_URL"]? - @statsd_host = ENV["STATSD_HOST"]? + @statsd_host = ENV["STATSD_HOST"]? || "" @statsd_port = 8125 def parse_config(path) diff --git a/src/amqproxy/pool.cr b/src/amqproxy/pool.cr index 86b418c..e24eff5 100644 --- a/src/amqproxy/pool.cr +++ b/src/amqproxy/pool.cr @@ -5,13 +5,13 @@ module AMQProxy getter :size @tls_ctx : OpenSSL::SSL::Context::Client? - def initialize(@host : String, @port : Int32, @tls : Bool, @log : Logger, @idle_connection_timeout : Int32, @metrics_client : MetricsClient = DummyMetricsClient.new) + def initialize(@host : String, @port : Int32, tls : Bool, @log : Logger, @idle_connection_timeout : Int32, @metrics_client : MetricsClient = DummyMetricsClient.new) @pools = Hash(Tuple(String, String, String), Deque(Upstream)).new do |h, k| h[k] = Deque(Upstream).new end @lock = Mutex.new @size = 0 - @tls_ctx = OpenSSL::SSL::Context::Client.new if @tls + @tls_ctx = OpenSSL::SSL::Context::Client.new if tls spawn shrink_pool_loop, name: "shrink pool loop" end @@ -21,7 +21,7 @@ module AMQProxy if c.nil? || c.closed? c = Upstream.new(@host, @port, @tls_ctx, @log).connect(user, password, vhost) @size += 1 - c = Upstream.new(@host, @port, @tls, @log).connect(user, password, vhost) + c = Upstream.new(@host, @port, @tls_ctx, @log).connect(user, password, vhost) @metrics_client.increment("connections.upstream.created", 1) end c.current_client = client From 70d7f89a89ed694c2aade4b1c64f54689ddbe178 Mon Sep 17 00:00:00 2001 From: Gabe Williamson Date: Thu, 10 Aug 2023 16:27:30 -0500 Subject: [PATCH 17/17] finish fixing the weirdly merged tests? --- spec/amqproxy_spec.cr | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index 3afa7d4..877312a 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -15,7 +15,7 @@ describe AMQProxy::Server do logger = Logger.new(STDOUT) logger.level = Logger::DEBUG statsd_client = AMQProxy::StatsdClient.new("localhost", 1234) - s = AMQProxy::Server.new("127.0.0.1", 5672, false, statsd_client, logger) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, statsd_client) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield @@ -47,7 +47,7 @@ describe AMQProxy::Server do sleep 0.1 end ensure - s.close + s.stop_accepting_clients end end end @@ -56,7 +56,8 @@ describe AMQProxy::Server do it "keeps connections open" do logger = Logger.new(STDOUT) logger.level = Logger::DEBUG - s = AMQProxy::Server.new("127.0.0.1", 5672, false, AMQProxy::DummyMetricsClient.new, logger) + + s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, AMQProxy::DummyMetricsClient.new) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield @@ -78,7 +79,7 @@ describe AMQProxy::Server do it "can reconnect if upstream closes" do logger = Logger.new(STDOUT) logger.level = Logger::DEBUG - s = AMQProxy::Server.new("127.0.0.1", 5672, false, AMQProxy::DummyMetricsClient.new, logger) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, AMQProxy::DummyMetricsClient.new) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield @@ -102,7 +103,7 @@ describe AMQProxy::Server do it "responds to upstream heartbeats" do system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1).' > /dev/null").should be_true - s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, AMQProxy::DummyMetricsClient.new) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield