Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ shards:
git: https://github.com/84codes/logger.cr.git
version: 1.0.2

statsd:
git: https://github.com/miketheman/statsd.cr.git
version: 0.5.0

2 changes: 2 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ dependencies:
github: cloudamqp/amq-protocol.cr
logger:
github: 84codes/logger.cr
statsd:
github: miketheman/statsd.cr

development_dependencies:
amqp-client:
Expand Down
63 changes: 60 additions & 3 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
@@ -1,8 +1,63 @@
require "./spec_helper"

private def with_server(& : UDPSocket ->)
server = UDPSocket.new
server.bind("localhost", 1234)
yield server
ensure
server.close if server
end

describe AMQProxy::Server do
describe "statsd" do
it "sends client connections statsd metrics" do
with_server do |statsd_server|
logger = Logger.new(STDOUT)
logger.level = Logger::DEBUG
statsd_client = AMQProxy::StatsdClient.new("localhost", 1234)
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, statsd_client)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
i = 0
10.times do
i = i + 1
AMQP::Client.start("amqp://localhost:5673") do |conn|
conn.channel

# This is also testing that connection pooling is working
if i == 1
expected_message = "amqproxy.connections.upstream.created:1|c|@1"
statsd_server.gets(expected_message.bytesize).should eq expected_message
end

expected_message = "amqproxy.connections.client.total:1|g"
statsd_server.gets(expected_message.bytesize).should eq expected_message

expected_message = "amqproxy.connections.client.created:1|c|@1"
statsd_server.gets(expected_message.bytesize).should eq expected_message
end

expected_message = "amqproxy.connections.client.total:0|g"
statsd_server.gets(expected_message.bytesize).should eq expected_message

expected_message = "amqproxy.connections.client.disconnected:1|c|@1"
statsd_server.gets(expected_message.bytesize).should eq expected_message

sleep 0.1
end
ensure
s.stop_accepting_clients
end
end
end
end

it "keeps connections open" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
logger = Logger.new(STDOUT)
logger.level = Logger::DEBUG

s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, AMQProxy::DummyMetricsClient.new)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -22,7 +77,9 @@ describe AMQProxy::Server do
end

it "can reconnect if upstream closes" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
logger = Logger.new(STDOUT)
logger.level = Logger::DEBUG
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, AMQProxy::DummyMetricsClient.new)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand All @@ -46,7 +103,7 @@ describe AMQProxy::Server do

it "responds to upstream heartbeats" do
system("#{MAYBE_SUDO}rabbitmqctl eval 'application:set_env(rabbit, heartbeat, 1).' > /dev/null").should be_true
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG, 5, AMQProxy::DummyMetricsClient.new)
begin
spawn { s.listen("127.0.0.1", 5673) }
Fiber.yield
Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "spec"
require "../src/amqproxy/server"
require "../src/amqproxy/version"
require "../src/amqproxy/metrics_client"
require "amqp-client"

MAYBE_SUDO = (ENV.has_key?("NO_SUDO") || `id -u` == "0\n") ? "" : "sudo "
17 changes: 15 additions & 2 deletions src/amqproxy.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require "./amqproxy/version"
require "./amqproxy/server"
require "./amqproxy/metrics_client"
require "option_parser"
require "uri"
require "ini"
Expand All @@ -11,6 +12,8 @@ class AMQProxy::CLI
@log_level : Logger::Severity = Logger::INFO
@idle_connection_timeout : Int32 = ENV.fetch("IDLE_CONNECTION_TIMEOUT", "5").to_i
@upstream = ENV["AMQP_URL"]?
@statsd_host = ENV["STATSD_HOST"]? || ""
@statsd_port = 8125

def parse_config(path)
INI.parse(File.read(path)).each do |name, section|
Expand All @@ -33,6 +36,14 @@ class AMQProxy::CLI
else raise "Unsupported config #{name}/#{key}"
end
end
when "statsd"
section.each do |key, value|
case key
when "host" then @statsd_host = value
when "port" then @statsd_port = value.to_i
else raise "Unsupported config #{name}/#{key}"
end
end
else raise "Unsupported config section #{name}"
end
end
Expand All @@ -54,6 +65,8 @@ class AMQProxy::CLI
parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) }
parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 }
parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 }
parser.on("--statsd-ip=STATSD_IP", "StatsD IP to send metrics to (default disabled)") { |v| @statsd_host = v }
parser.on("--statsd-port=STATSD_PORT", "StatsD port to send metrics to (default is 8125)") { |v| @statsd_port = v.to_i }
parser.invalid_option { |arg| abort "Invalid argument: #{arg}" }
end

Expand All @@ -71,8 +84,8 @@ class AMQProxy::CLI
port = u.port || default_port
tls = u.scheme == "amqps"

server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout)

metrics_client = @statsd_host.empty? ? DummyMetricsClient.new : StatsdClient.new(@statsd_host, @statsd_port)
server = AMQProxy::Server.new(u.host || "", port, tls, @log_level, @idle_connection_timeout, metrics_client)
first_shutdown = true
shutdown = ->(_s : Signal) do
if first_shutdown
Expand Down
44 changes: 44 additions & 0 deletions src/amqproxy/metrics_client.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
require "statsd"
require "socket"

module AMQProxy
abstract class MetricsClient
abstract def increment(metric_name, sample_rate = nil, tags = nil)
abstract def gauge(metric_name, value, tags = nil)
end

class DummyMetricsClient < MetricsClient
def increment(metric_name, sample_rate = nil, tags = nil)
end

def gauge(metric_name, value, tags = nil)
end
end

class StatsdClient < MetricsClient
PREFIX = "amqproxy"

def initialize(statsd_host : String, statsd_port : Int32)
host = resolve(statsd_host, statsd_port)
@client = Statsd::Client.new(host, statsd_port)
end

def increment(metric_name, sample_rate = nil, tags = nil)
@client.increment("#{PREFIX}.#{metric_name}", sample_rate, tags)
end

def gauge(metric_name, value, tags = nil)
@client.gauge("#{PREFIX}.#{metric_name}", value, tags)
end

private def resolve(host : String, port : Int32)
addr = ""
addr_info = Socket::Addrinfo.resolve(host, port, Socket::Family::INET, Socket::Type::STREAM, Socket::Protocol::IP)
addr_info.each { |a|
addr = a.ip_address.to_s.rchop(":#{port}")
break
}
addr
end
end
end
6 changes: 5 additions & 1 deletion src/amqproxy/pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module AMQProxy
getter :size
@tls_ctx : OpenSSL::SSL::Context::Client?

def initialize(@host : String, @port : Int32, tls : Bool, @log : Logger, @idle_connection_timeout : Int32)
def initialize(@host : String, @port : Int32, tls : Bool, @log : Logger, @idle_connection_timeout : Int32, @metrics_client : MetricsClient = DummyMetricsClient.new)
@pools = Hash(Tuple(String, String, String), Deque(Upstream)).new do |h, k|
h[k] = Deque(Upstream).new
end
Expand All @@ -21,6 +21,8 @@ module AMQProxy
if c.nil? || c.closed?
c = Upstream.new(@host, @port, @tls_ctx, @log).connect(user, password, vhost)
@size += 1
c = Upstream.new(@host, @port, @tls_ctx, @log).connect(user, password, vhost)
@metrics_client.increment("connections.upstream.created", 1)
end
c.current_client = client
c
Expand Down Expand Up @@ -70,6 +72,7 @@ module AMQProxy
@size -= 1
begin
u.close "Pooled connection closed due to inactivity"
@metrics_client.increment("connections.upstream.closed", 1)
rescue ex
@log.error "Problem closing upstream: #{ex.inspect}"
end
Expand All @@ -82,6 +85,7 @@ module AMQProxy
end
end
end
@metrics_client.gauge("connections.upstream.total", @size)
end
end
end
Expand Down
11 changes: 9 additions & 2 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require "./upstream"

module AMQProxy
class Server
def initialize(upstream_host, upstream_port, upstream_tls, log_level = Logger::INFO, idle_connection_timeout = 5)
def initialize(upstream_host, upstream_port, upstream_tls, log_level = Logger::INFO, idle_connection_timeout = 5, @metrics_client : MetricsClient = DummyMetricsClient.new)
@log = Logger.new(STDOUT)
@log.level = log_level
journald =
Expand All @@ -25,7 +25,7 @@ module AMQProxy
end
@clients_lock = Mutex.new
@clients = Array(Client).new
@pool = Pool.new(upstream_host, upstream_port, upstream_tls, @log, idle_connection_timeout)
@pool = Pool.new(upstream_host, upstream_port, upstream_tls, @log, idle_connection_timeout, @metrics_client)
@log.info "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}"
end

Expand Down Expand Up @@ -74,15 +74,20 @@ module AMQProxy
active_client(c) do
@pool.borrow(user, password, vhost, c) do |u|
# print "\r#{@clients.size} clients\t\t #{@pool.size} upstreams"
@metrics_client.gauge("connections.client.total", client_connections)
@metrics_client.increment("connections.client.created", 1)
u.current_client = c
c.read_loop(u)
end
rescue ex : Upstream::AccessError
@log.error { "Access refused for user '#{user}' to vhost '#{vhost}', reason: #{ex.message}" }
@metrics_client.increment("connections.upstream.error.count", 1, tags: ["error:access_refused"])
close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "ACCESS_REFUSED - #{ex.message}", 0_u16, 0_u16)
close.to_io socket, IO::ByteFormat::NetworkEndian
socket.flush
rescue ex : Upstream::Error
@log.error { "Upstream error for user '#{user}' to vhost '#{vhost}': #{ex.inspect} (cause: #{ex.cause.inspect})" }
@metrics_client.increment("connections.upstream.error.count", 1, tags: ["error:upstream_error"])
close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "UPSTREAM_ERROR", 0_u16, 0_u16)
close.to_io socket, IO::ByteFormat::NetworkEndian
socket.flush
Expand All @@ -96,6 +101,8 @@ module AMQProxy
ensure
@log.debug { "Client disconnected: #{remote_address}" }
socket.close rescue nil
@metrics_client.gauge("connections.client.total", client_connections)
@metrics_client.increment("connections.client.disconnected", 1)
# print "\r#{@clients.size} clients\t\t #{@pool.size} upstreams"
end

Expand Down