diff --git a/shard.lock b/shard.lock index a40fbef..7e50842 100644 --- a/shard.lock +++ b/shard.lock @@ -16,3 +16,7 @@ shards: git: https://github.com/84codes/logger.cr.git version: 1.0.2 + statsd: + git: https://github.com/miketheman/statsd.cr.git + version: 0.5.0 + diff --git a/shard.yml b/shard.yml index 0114066..de6c715 100644 --- a/shard.yml +++ b/shard.yml @@ -13,6 +13,8 @@ dependencies: github: cloudamqp/amq-protocol.cr logger: github: 84codes/logger.cr + statsd: + github: miketheman/statsd.cr development_dependencies: amqp-client: diff --git a/spec/amqproxy_spec.cr b/spec/amqproxy_spec.cr index f5f7477..877312a 100644 --- a/spec/amqproxy_spec.cr +++ b/spec/amqproxy_spec.cr @@ -1,8 +1,63 @@ require "./spec_helper" +private def with_server(& : UDPSocket ->) + server = UDPSocket.new + server.bind("localhost", 1234) + yield server +ensure + server.close if server +end + describe AMQProxy::Server do + describe "statsd" do + it "sends client connections statsd metrics" do + with_server do |statsd_server| + logger = Logger.new(STDOUT) + logger.level = Logger::DEBUG + statsd_client = AMQProxy::StatsdClient.new("localhost", 1234) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, statsd_client) + begin + spawn { s.listen("127.0.0.1", 5673) } + Fiber.yield + i = 0 + 10.times do + i = i + 1 + AMQP::Client.start("amqp://localhost:5673") do |conn| + conn.channel + + # This is also testing that connection pooling is working + if i == 1 + expected_message = "amqproxy.connections.upstream.created:1|c|@1" + statsd_server.gets(expected_message.bytesize).should eq expected_message + end + + expected_message = "amqproxy.connections.client.total:1|g" + statsd_server.gets(expected_message.bytesize).should eq expected_message + + expected_message = "amqproxy.connections.client.created:1|c|@1" + statsd_server.gets(expected_message.bytesize).should eq expected_message + end + + expected_message = "amqproxy.connections.client.total:0|g" + statsd_server.gets(expected_message.bytesize).should eq expected_message + + expected_message = "amqproxy.connections.client.disconnected:1|c|@1" + statsd_server.gets(expected_message.bytesize).should eq expected_message + + sleep 0.1 + end + ensure + s.stop_accepting_clients + end + end + end + end + it "keeps connections open" do - s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG) + logger = Logger.new(STDOUT) + logger.level = Logger::DEBUG + + s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, AMQProxy::DummyMetricsClient.new) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield @@ -22,7 +77,9 @@ describe AMQProxy::Server do end it "can reconnect if upstream closes" do - s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG) + logger = Logger.new(STDOUT) + logger.level = Logger::DEBUG + s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, AMQProxy::DummyMetricsClient.new) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield @@ -46,7 +103,7 @@ describe AMQProxy::Server do it "responds to upstream heartbeats" do system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1).' > /dev/null").should be_true - s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG) + s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, AMQProxy::DummyMetricsClient.new) begin spawn { s.listen("127.0.0.1", 5673) } Fiber.yield diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 924f505..ba6d964 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -1,6 +1,7 @@ require "spec" require "../src/amqproxy/server" require "../src/amqproxy/version" +require "../src/amqproxy/metrics_client" require "amqp-client" MAYBE_SUDO = (ENV.has_key?("NO_SUDO") || `id -u` == "0\n") ? "" : "sudo " diff --git a/src/amqproxy.cr b/src/amqproxy.cr index ee113ad..961661f 100644 --- a/src/amqproxy.cr +++ b/src/amqproxy.cr @@ -1,5 +1,6 @@ require "./amqproxy/version" require "./amqproxy/server" +require "./amqproxy/metrics_client" require "option_parser" require "uri" require "ini" @@ -11,6 +12,8 @@ class AMQProxy::CLI @log_level : Logger::Severity = Logger::INFO @idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i @upstream = ENV["AMQP_URL"]? + @statsd_host = ENV["STATSD_HOST"]? || "" + @statsd_port = 8125 def parse_config(path) INI.parse(File.read(path)).each do |name, section| @@ -33,6 +36,14 @@ class AMQProxy::CLI else raise "Unsupported config #{name}/#{key}" end end + when "statsd" + section.each do |key, value| + case key + when "host" then @statsd_host = value + when "port" then @statsd_port = value.to_i + else raise "Unsupported config #{name}/#{key}" + end + end else raise "Unsupported config section #{name}" end end @@ -54,6 +65,8 @@ class AMQProxy::CLI parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) } parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } + parser.on("--statsd-ip=STATSD_IP", "StatsD IP to send metrics to (default disabled)") { |v| @statsd_host = v } + parser.on("--statsd-port=STATSD_PORT", "StatsD port to send metrics to (default is 8125)") { |v| @statsd_port = v.to_i } parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } end @@ -71,8 +84,8 @@ class AMQProxy::CLI port = u.port || default_port tls = u.scheme == "amqps" - server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout) - + metrics_client = @statsd_host.empty? ? DummyMetricsClient.new : StatsdClient.new(@statsd_host, @statsd_port) + server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout, metrics_client) first_shutdown = true shutdown = ->(_s : Signal) do if first_shutdown diff --git a/src/amqproxy/metrics_client.cr b/src/amqproxy/metrics_client.cr new file mode 100644 index 0000000..2b7445c --- /dev/null +++ b/src/amqproxy/metrics_client.cr @@ -0,0 +1,44 @@ +require "statsd" +require "socket" + +module AMQProxy + abstract class MetricsClient + abstract def increment(metric_name, sample_rate = nil, tags = nil) + abstract def gauge(metric_name, value, tags = nil) + end + + class DummyMetricsClient < MetricsClient + def increment(metric_name, sample_rate = nil, tags = nil) + end + + def gauge(metric_name, value, tags = nil) + end + end + + class StatsdClient < MetricsClient + PREFIX = "amqproxy" + + def initialize(statsd_host : String, statsd_port : Int32) + host = resolve(statsd_host, statsd_port) + @client = Statsd::Client.new(host, statsd_port) + end + + def increment(metric_name, sample_rate = nil, tags = nil) + @client.increment("#{PREFIX}.#{metric_name}", sample_rate, tags) + end + + def gauge(metric_name, value, tags = nil) + @client.gauge("#{PREFIX}.#{metric_name}", value, tags) + end + + private def resolve(host : String, port : Int32) + addr = "" + addr_info = Socket::Addrinfo.resolve(host, port, Socket::Family::INET, Socket::Type::STREAM, Socket::Protocol::IP) + addr_info.each { |a| + addr = a.ip_address.to_s.rchop(":#{port}") + break + } + addr + end + end +end diff --git a/src/amqproxy/pool.cr b/src/amqproxy/pool.cr index 71671ff..e24eff5 100644 --- a/src/amqproxy/pool.cr +++ b/src/amqproxy/pool.cr @@ -5,7 +5,7 @@ module AMQProxy getter :size @tls_ctx : OpenSSL::SSL::Context::Client? - def initialize(@host : String, @port : Int32, tls : Bool, @log : Logger, @idle_connection_timeout : Int32) + def initialize(@host : String, @port : Int32, tls : Bool, @log : Logger, @idle_connection_timeout : Int32, @metrics_client : MetricsClient = DummyMetricsClient.new) @pools = Hash(Tuple(String, String, String), Deque(Upstream)).new do |h, k| h[k] = Deque(Upstream).new end @@ -21,6 +21,8 @@ module AMQProxy if c.nil? || c.closed? c = Upstream.new(@host, @port, @tls_ctx, @log).connect(user, password, vhost) @size += 1 + c = Upstream.new(@host, @port, @tls_ctx, @log).connect(user, password, vhost) + @metrics_client.increment("connections.upstream.created", 1) end c.current_client = client c @@ -70,6 +72,7 @@ module AMQProxy @size -= 1 begin u.close "Pooled connection closed due to inactivity" + @metrics_client.increment("connections.upstream.closed", 1) rescue ex @log.error "Problem closing upstream: #{ex.inspect}" end @@ -82,6 +85,7 @@ module AMQProxy end end end + @metrics_client.gauge("connections.upstream.total", @size) end end end diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index c5aacbc..0593948 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -7,7 +7,7 @@ require "./upstream" module AMQProxy class Server - def initialize(upstream_host, upstream_port, upstream_tls, log_level = Logger::INFO, idle_connection_timeout = 5) + def initialize(upstream_host, upstream_port, upstream_tls, log_level = Logger::INFO, idle_connection_timeout = 5, @metrics_client : MetricsClient = DummyMetricsClient.new) @log = Logger.new(STDOUT) @log.level = log_level journald = @@ -25,7 +25,7 @@ module AMQProxy end @clients_lock = Mutex.new @clients = Array(Client).new - @pool = Pool.new(upstream_host, upstream_port, upstream_tls, @log, idle_connection_timeout) + @pool = Pool.new(upstream_host, upstream_port, upstream_tls, @log, idle_connection_timeout, @metrics_client) @log.info "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" end @@ -74,15 +74,20 @@ module AMQProxy active_client(c) do @pool.borrow(user, password, vhost, c) do |u| # print "\r#{@clients.size} clients\t\t #{@pool.size} upstreams" + @metrics_client.gauge("connections.client.total", client_connections) + @metrics_client.increment("connections.client.created", 1) + u.current_client = c c.read_loop(u) end rescue ex : Upstream::AccessError @log.error { "Access refused for user '#{user}' to vhost '#{vhost}', reason: #{ex.message}" } + @metrics_client.increment("connections.upstream.error.count", 1, tags: ["error:access_refused"]) close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "ACCESS_REFUSED - #{ex.message}", 0_u16, 0_u16) close.to_io socket, IO::ByteFormat::NetworkEndian socket.flush rescue ex : Upstream::Error @log.error { "Upstream error for user '#{user}' to vhost '#{vhost}': #{ex.inspect} (cause: #{ex.cause.inspect})" } + @metrics_client.increment("connections.upstream.error.count", 1, tags: ["error:upstream_error"]) close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "UPSTREAM_ERROR", 0_u16, 0_u16) close.to_io socket, IO::ByteFormat::NetworkEndian socket.flush @@ -96,6 +101,8 @@ module AMQProxy ensure @log.debug { "Client disconnected: #{remote_address}" } socket.close rescue nil + @metrics_client.gauge("connections.client.total", client_connections) + @metrics_client.increment("connections.client.disconnected", 1) # print "\r#{@clients.size} clients\t\t #{@pool.size} upstreams" end