diff --git a/.rbenv-version b/.rbenv-version index 2eec2d9d5..cf5ef83b0 100644 --- a/.rbenv-version +++ b/.rbenv-version @@ -1 +1,2 @@ +1.9.3-p194 jruby-1.7.0-dev diff --git a/Gemfile b/Gemfile index 4070b5383..be17f3620 100644 --- a/Gemfile +++ b/Gemfile @@ -1,7 +1,9 @@ -source :rubygems +source 'https://rubygems.org' gem 'eventmachine', '~>1.0.0.rc.4' gem 'redis', "~> 3.0.2" +gem 'aws-s3', "~> 0.6.3" + if RUBY_PLATFORM == 'java' gem 'json-jruby' else diff --git a/Gemfile.lock b/Gemfile.lock index eae434999..69be65678 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,17 +1,22 @@ GEM - remote: http://rubygems.org/ + remote: https://rubygems.org/ specs: + aws-s3 (0.6.3) + builder + mime-types + xml-simple + builder (3.0.4) eventmachine (1.0.0.rc.4) eventmachine (1.0.0.rc.4-java) json (1.5.0) json (1.5.0-java) - json-jruby (1.5.0-java) - json (= 1.5.0) metaclass (0.0.1) + mime-types (1.23) mocha (0.11.3) metaclass (~> 0.0.1) rake (0.9.2.2) redis (3.0.2) + xml-simple (1.1.2) yard (0.8.2) PLATFORMS @@ -19,8 +24,9 @@ PLATFORMS ruby DEPENDENCIES + aws-s3 (~> 0.6.3) eventmachine (~> 1.0.0.rc.4) - json-jruby + json mocha rake redis (~> 3.0.2) diff --git a/README.md b/README.md index 98750173c..52fe6ff77 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,10 @@ implementation](https://github.com/etsy/statsd), which they described in a [blog post](http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/). Batsd differs from etsy's statsd implementation primarily in how it stores data --- data is stored to a combination of Redis and flat files on disk. You can -read more about persistence in [About: -Persistence](http://noahhl.github.com/batsd/doc/file.persistence.html). +-- data is stored to a combination of Redis and flat files on disk (or AWS S3). You can +read more about persistence in [About: Persistence](http://noahhl.github.com/batsd/doc/file.persistence.html). 
-Batsd grew out of usage at [37signals](http://37signals.com), where it has been -used for the last year. An [earlier form](https://github.com/noahhl/statsd-server) was -inspired by [quasor](https://github.com/quasor/statsd). +Batsd grew out of usage at [37signals](http://37signals.com), where it has been used for the last year. An [earlier form](https://github.com/noahhl/statsd-server) was inspired by [quasor](https://github.com/quasor/statsd). ### Documentation: @@ -50,11 +47,26 @@ Example config.yml # Where to store data. Data at the first retention level is stored # in redis; further data retentions are stored on disk - # Root path to store disk aggregations - root: /statsd redis: :host: 127.0.0.1 :port: 6379 + + # Which file store to use. Available are: diskstore and s3 + filestore: 'diskstore' + + + # Diskstore configuration. + # This is only if you have filestore set to diskstore + diskstore: + # Root path to store disk aggregations + :root: tmp/statsd + + # S3 authentication and storage information + # This is only if you have filestore set to s3 + s3: + :access_key: 'access key goes here' + :secret_access_key: 'secret key goes here' + :bucket: 'bucket name goes here' # Configure how much data to retain at what intervals # Key is seconds, value is number of measurements at that diff --git a/config.yml b/config.yml index 2e0bdc847..0ebce16d2 100644 --- a/config.yml +++ b/config.yml @@ -1,11 +1,23 @@ bind: 0.0.0.0 port: 8125 -root: tmp/statsd + +filestore: 'diskstore' + +s3: + :access_key: 'access key goes here' + :secret_access_key: 'secret key goes here' + :bucket: 'bucket name goes here' + +diskstore: + :root: tmp/statsd + redis: host: 127.0.0.1 port: 6379 + retentions: 10: 360 #1 hour 60: 10080 #1 week 600: 52594 #1 year + autotruncate: false diff --git a/lib/batsd.rb b/lib/batsd.rb index 208179abf..9142cc262 100644 --- a/lib/batsd.rb +++ b/lib/batsd.rb @@ -1,10 +1,13 @@ require 'benchmark' require 'eventmachine' require 'redis' +require 'aws/s3' require 
'core-ext/array' -require 'batsd/diskstore' +require 'batsd/filestore' +require 'batsd/filestore/diskstore' +require 'batsd/filestore/s3' require 'batsd/redis' require 'batsd/threadpool' require 'batsd/receiver' diff --git a/lib/batsd/deleter.rb b/lib/batsd/deleter.rb index 722a25843..6943e1f9c 100644 --- a/lib/batsd/deleter.rb +++ b/lib/batsd/deleter.rb @@ -6,13 +6,13 @@ class Deleter # Create a new deleter # - # * Establish the diskstore that will be used + # * Establish the filestore that will be used # * Establish the redis connection that will be needed # def initialize(options={}) @options = options - @redis = Batsd::Redis.new(options ) - @diskstore = Batsd::Diskstore.new(options[:root]) + @redis = Batsd::Redis.new(options) + @filestore = Batsd::Filestore.init(options) end def delete(statistic) @@ -37,7 +37,7 @@ def delete(statistic) # other retentions retentions.each do |retention| key = "#{statistic}:#{retention}" - @diskstore.delete(@diskstore.build_filename(key), :delete_empty_dirs => true) + @filestore.delete(@filestore.build_filename(key), delete_empty_dirs: true) end end deletions diff --git a/lib/batsd/filestore.rb b/lib/batsd/filestore.rb new file mode 100644 index 000000000..788ed1360 --- /dev/null +++ b/lib/batsd/filestore.rb @@ -0,0 +1,35 @@ +module Batsd + + # A wrapper around all file stores (Diskstore and S3), includes commonality and control functions + class Filestore + attr_accessor :root + + # Creates an instance of the right child depending on configuration + def self.init(options) + + unless options[:filestore] + Batsd::Diskstore.new diskstore: { root: (options[:root] || options[:diskstore][:root]) } + else + case options[:filestore].downcase + when 'diskstore' then + Batsd::Diskstore.new(options) + when 's3' then + Batsd::S3.new(options) + end + end + end + + def build_filename(statistic) + return unless statistic + paths = [] + file_hash = Digest::MD5.hexdigest(statistic) + + paths << @root if @root + paths << file_hash[0,2] + paths << 
file_hash[2,2] + paths << file_hash + + File.join(paths) + end + end +end diff --git a/lib/batsd/diskstore.rb b/lib/batsd/filestore/diskstore.rb similarity index 86% rename from lib/batsd/diskstore.rb rename to lib/batsd/filestore/diskstore.rb index 55dbdda4d..e9ee9d1c9 100644 --- a/lib/batsd/diskstore.rb +++ b/lib/batsd/filestore/diskstore.rb @@ -1,26 +1,11 @@ -require 'digest' require 'fileutils' - module Batsd # Handles disk operations -- writing, truncating, and reading - class Diskstore + class Diskstore < Filestore # Create a new diskstore object - def initialize(root) - @root = root - end - - # Calculate the filename that will be used to store the - # metric to disk. - # - # Filenames are MD5 hashes of the statistic name, including any - # aggregation-based suffix, and are stored in two levels of nested - # directories (e.g., /00/01/0001s0d03dd0s030d03d) - # - def build_filename(statistic) - return unless statistic - file_hash = Digest::MD5.hexdigest(statistic) - File.join(@root, file_hash[0,2], file_hash[2,2], file_hash) + def initialize(options) + @root = options[:diskstore][:root] end # Append a value to a file diff --git a/lib/batsd/filestore/s3.rb b/lib/batsd/filestore/s3.rb new file mode 100644 index 000000000..8b8959517 --- /dev/null +++ b/lib/batsd/filestore/s3.rb @@ -0,0 +1,110 @@ +module Batsd + # Handles writing, truncating, and reading on AWS S3 buckets + # Meant to be a replacement for Batsd::Diskstore for usage on heroku + class S3 < Filestore + S3_FOLDER_EXT = '_$folder$' + + def initialize(options) + @credentials = options[:s3] + @bucket = options[:s3][:bucket] + end + + # Fetch a file from AWS S3 + def fetch_file(filename) + establish_connection + data = AWS::S3::S3Object.value(filename, @bucket) if AWS::S3::S3Object.exists?(filename, @bucket) + + data || '' + end + + + # Store a file to AWS S3 + # folders and file types are handled by the gem + + def store_file(filename, file_data) + establish_connection + + AWS::S3::S3Object.store 
filename, StringIO.new(file_data), @bucket, access: :authenticated_read + end + + # Append a value to a file + # + # Open the file in append mode (creating directories needed along + # the way), write the value and a newline, and close the file again. + # + def append_value_to_file(filename, value, attempts=0) + file_data = fetch_file(filename) + "#{value}\n" + + store_file filename, file_data + rescue Exception => e + puts "Encountered an error trying to store to #{filename}: #{e} #{e.message} #{e.backtrace if ENV["VERBOSE"]}" + if attempts < 2 + puts "Retrying #{filename} for the #{attempts+1} time" + append_value_to_file(filename, value, attempts+1) + end + end + + # Reads the set of values in the range desired from file + # + # Reads until it reaches end_ts or the end of the file. Returns an array + # of {timestamp: ts, value: v} hashes. + # + def read(statistic, start_ts, end_ts) + datapoints = [] + filename = build_filename statistic + + begin + file_data = fetch_file(filename) + file_data.split("\n").each do |line| + ts, value = line.split + if ts >= start_ts && ts <= end_ts + datapoints << { timestamp: ts.to_i, value: value } + end + end + rescue Exception => e + puts "Encountered an error trying to read #{filename}: #{e} #{e.message} #{e.backtrace if ENV["VERBOSE"]}" + end + + datapoints + end + + # Truncates a file by rewriting to a temp file everything after the since + # timestamp that is provided. The temp file is then renamed to the + # original. 
+ # + def truncate(filename, since) + puts "Truncating #{filename} since #{since}" if ENV["VVERBOSE"] + + truncated_file_data = '' + old_file_data = fetch_file(filename) + + old_file_data.split("\n").each do |line| + truncated_file_data += "#{line}\n" if(line.split[0] >= since rescue true) + end + + store_file filename, truncated_file_data + + truncated_file_data + rescue Exception => e + puts "Encountered an error trying to truncate #{filename}: #{e} #{e.message} #{e.backtrace if ENV["VERBOSE"]}" + end + + # Deletes a file, if it exists. + # If :delete_empty_dirs is true, empty directories will be deleted too. + # TODO: Support for :delete_empty_dirs + # + def delete(filename, options={}) + establish_connection + + AWS::S3::S3Object.find(filename, @bucket).delete if AWS::S3::S3Object.exists? filename, @bucket + rescue Exception => e + puts "Encountered an error trying to delete #{filename}: #{e} #{e.message} #{e.backtrace if ENV["VERBOSE"]}" + end + + def establish_connection + unless AWS::S3::Base.connected? + AWS::S3::Base.establish_connection! 
access_key_id: @credentials[:access_key], secret_access_key: @credentials[:secret_access_key] + end + end + end +end diff --git a/lib/batsd/handlers/counter.rb b/lib/batsd/handlers/counter.rb index 1b621e848..90336af29 100644 --- a/lib/batsd/handlers/counter.rb +++ b/lib/batsd/handlers/counter.rb @@ -10,17 +10,18 @@ class Handler::Counter < Handler # Set up a new handler to handle counters # # * Set up a redis client - # * Set up a diskstore client to write aggregates to disk + # * Set up a filestore client to write aggregates to disk # * Initialize last flush timers to now # def initialize(options) - @redis = Batsd::Redis.new(options) - @diskstore = Batsd::Diskstore.new(options[:root]) - @counters = @active_counters = {} - @retentions = options[:retentions].keys - @flush_interval = @retentions.first now = Time.now.to_i - @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l } + + @redis = Batsd::Redis.new(options) + @filestore = Batsd::Filestore.init(options) + @counters = @active_counters = {} + @retentions = options[:retentions].keys + @flush_interval = @retentions.first + @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l } super end @@ -101,7 +102,7 @@ def flush value = @redis.get_and_clear_key(key) if value value = "#{ts} #{value}" - @diskstore.append_value_to_file(@diskstore.build_filename(key), value) + @filestore.append_value_to_file(@filestore.build_filename(key), value) end end end diff --git a/lib/batsd/handlers/gauge.rb b/lib/batsd/handlers/gauge.rb index 4aa448425..741ac3389 100644 --- a/lib/batsd/handlers/gauge.rb +++ b/lib/batsd/handlers/gauge.rb @@ -12,11 +12,11 @@ class Handler::Gauge < Handler # Set up a new handler to handle gauges # # * Set up a redis client - # * Set up a diskstore client to write aggregates to disk + # * Set up a filestore client to write aggregates to disk # def initialize(options) - @redis = Batsd::Redis.new(options) - @diskstore = Batsd::Diskstore.new(options[:root]) + @redis = Batsd::Redis.new(options) + 
@filestore = Batsd::Filestore.init(options) super end @@ -34,7 +34,7 @@ def handle(key, value, sample_rate) end value = "#{timestamp} #{value}" key = "gauges:#{key}" - @diskstore.append_value_to_file(@diskstore.build_filename(key), value) + @filestore.append_value_to_file(@filestore.build_filename(key), value) @redis.add_datapoint key end end diff --git a/lib/batsd/handlers/timer.rb b/lib/batsd/handlers/timer.rb index a375d73fd..38174de9d 100644 --- a/lib/batsd/handlers/timer.rb +++ b/lib/batsd/handlers/timer.rb @@ -10,18 +10,19 @@ class Handler::Timer < Handler # Set up a new handler to handle timers # # * Set up a redis client - # * Set up a diskstore client to write aggregates to disk + # * Set up a filestore client to write aggregates to disk # * Initialize last flush timers to now # def initialize(options) - @redis = Batsd::Redis.new(options) - @diskstore = Batsd::Diskstore.new(options[:root]) - @retentions = options[:retentions].keys - @flush_interval = @retentions.first - @active_timers = {} - @timers = {} now = Time.now.to_i - @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l } + + @redis = Batsd::Redis.new(options) + @filestore = Batsd::Filestore.init(options) + @retentions = options[:retentions].keys + @flush_interval = @retentions.first + @active_timers = {} + @timers = {} + @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l } @fast_threadpool = Threadpool.new((options[:threadpool_size] || 100)/2) super end @@ -107,7 +108,7 @@ def flush name = aggregation end val = (count > 1 ? 
values.send(aggregation.to_sym) : values.first) - @diskstore.append_value_to_file(@diskstore.build_filename("#{key}:#{name}:#{retention}"), "#{timestamp} #{val}") + @filestore.append_value_to_file(@filestore.build_filename("#{key}:#{name}:#{retention}"), "#{timestamp} #{val}") end end end diff --git a/lib/batsd/server.rb b/lib/batsd/server.rb index e7b0e3250..31eefe105 100644 --- a/lib/batsd/server.rb +++ b/lib/batsd/server.rb @@ -11,19 +11,19 @@ def self.config def self.config=(config) @config=config end - - # Set up a redis and diskstore instance per connection + + # Set up a redis and filestore instance per connection # so they don't step on each other. Since redis commands # are happening in a deferrable, intentionally not using EM-redis def post_init puts "batsd server ready and waiting on #{Batsd::Server.config[:port]} to ship data upon request\n" @redis = Batsd::Redis.new(Batsd::Server.config) - @diskstore = Batsd::Diskstore.new(Batsd::Server.config[:root]) + @filestore = Batsd::Filestore.init(Batsd::Server.config) end - + def unbind @redis.client.quit - end + end # Handle a command received over the server port and return # the datapoints, values, or a PONG as requested. 
@@ -43,7 +43,7 @@ def receive_data(msg) datapoints, interval = [], 0 if metric.match(/^gauge/) - datapoints = @diskstore.read(metric, begin_time, end_time) + datapoints = @filestore.read(metric, begin_time, end_time) else Batsd::Server.config[:retentions].each_with_index do |retention, index| if (index != Batsd::Server.config[:retentions].count - 1) && (Time.now.to_i - (retention[0] * retention[1]) > begin_time.to_i) @@ -55,7 +55,7 @@ def receive_data(msg) datapoints = @redis.values_from_zset(metric, begin_time, end_time) break else - datapoints = @diskstore.read("#{metric}:#{retention[0]}", begin_time, end_time) + datapoints = @filestore.read("#{metric}:#{retention[0]}", begin_time, end_time) break end end @@ -77,7 +77,7 @@ def receive_data(msg) end end end - + # Bind to port+2 and serve up data over TCP. Offers access to # both the set of datapoints and the values as JSON arrays. class Daemon diff --git a/lib/batsd/truncator.rb b/lib/batsd/truncator.rb index 2dd6f08d7..aa4fca384 100644 --- a/lib/batsd/truncator.rb +++ b/lib/batsd/truncator.rb @@ -6,14 +6,14 @@ class Truncator # Create a new truncator # - # * Establish the diskstore that will be used + # * Establish the filestore that will be used # * Establish the redis connection that will be needed # def initialize(options={}) - @options = options + @options = options @retentions = options[:retentions].keys - @redis = Batsd::Redis.new(options ) - @diskstore = Batsd::Diskstore.new(options[:root]) + @redis = Batsd::Redis.new(options) + @filestore = Batsd::Filestore.init(options) @threadpool = Threadpool.new(options[:truncate_threadpool_size] || 10) end @@ -46,10 +46,10 @@ def run(retention) else # Stored on disk keys.each_slice(100) do |keys| - @threadpool.queue @diskstore, keys, retention, min_ts do |diskstore, keys, retention, min_ts| + @threadpool.queue @filestore, keys, retention, min_ts do |filestore, keys, retention, min_ts| keys.each do |key| key = "#{key}:#{retention}" - 
diskstore.truncate(diskstore.build_filename(key), min_ts.to_s) + filestore.truncate(filestore.build_filename(key), min_ts.to_s) end end end diff --git a/test/unit/diskstore_test.rb b/test/unit/diskstore_test.rb index 56e690e05..13c48f532 100644 --- a/test/unit/diskstore_test.rb +++ b/test/unit/diskstore_test.rb @@ -2,7 +2,7 @@ class DiskstoreTest < Test::Unit::TestCase def setup - @diskstore = Batsd::Diskstore.new("test/data") + @diskstore = Batsd::Diskstore.new({diskstore: {root: "test/data"} }) @statistic = "counters:test_counter:60" end diff --git a/test/unit/filestore_test.rb b/test/unit/filestore_test.rb new file mode 100644 index 000000000..d2f4acd50 --- /dev/null +++ b/test/unit/filestore_test.rb @@ -0,0 +1,40 @@ +require 'test_helper' +class FilestoreTest < Test::Unit::TestCase + + def setup + @filestore = Batsd::Filestore.new() + @statistic = "counters:test_counter:60" + end + + def test_filename_calculation_with_root + @filestore.root = 'test/data' + assert_equal "test/data/37/2a/372a5d5450ef177a737f6a92c0246436", @filestore.build_filename(@statistic) + end + + def test_filename_calculation_without_root # happens with S3 + @filestore.root = nil + assert_equal "37/2a/372a5d5450ef177a737f6a92c0246436", @filestore.build_filename(@statistic) + end + + def test_init_diskstore + options = { + filestore: 'diskstore', + diskstore: { + root: 'test/data' + } + } + + assert_equal Batsd::Filestore.init(options).class, Batsd::Diskstore + end + + def test_init_s3 + options = { + filestore: 's3', + s3: { + bucket: 'bucket' + } + } + + assert_equal Batsd::Filestore.init(options).class, Batsd::S3 + end +end diff --git a/test/unit/s3_test.rb b/test/unit/s3_test.rb new file mode 100644 index 000000000..28881c4fc --- /dev/null +++ b/test/unit/s3_test.rb @@ -0,0 +1,79 @@ +require 'test_helper' +require 'mocha/setup' + +class S3Test < Test::Unit::TestCase + + def setup + Batsd::S3.any_instance.stubs(:establish_connection).returns(true) + @bucket = 'fake-bucket' + + @s3 = 
Batsd::S3.new({ + s3: { + bucket: @bucket + } + }) + + @statistic = "counters:test_counter:60" + end + + def teardown + #FileUtils.rm("test/data/37/2a/372a5d5450ef177a737f6a92c0246436") rescue nil + end + + def test_store_writes_to_file + now = Time.now.to_i + value = "#{now} #{12}" + + @s3.expects(:fetch_file).with('37/2a/372a5d5450ef177a737f6a92c0246436').returns('') + @s3.expects(:store_file).with('37/2a/372a5d5450ef177a737f6a92c0246436', "#{value}\n") + @s3.append_value_to_file(@s3.build_filename(@statistic), value) + end + + def test_read_reads_from_file + now = Time.now.to_i - 50 + fake_file = '' + (1..50).each do |i| + fake_file += "#{now + i} #{i}\n" + end + + @s3.expects(:fetch_file).with(@s3.build_filename(@statistic)).returns(fake_file).at_least_once + + full_result = @s3.read(@statistic, now.to_s, (now + 50).to_s) + assert_equal 50, full_result.length + assert_equal 25, full_result[24][:value].to_f + partial_result = @s3.read(@statistic, (now+25).to_s, (now + 35).to_s) + assert_equal 11, partial_result.length + assert_equal 27, partial_result[2][:value].to_f + end + + def test_truncate_cleans_up_file + now = Time.now.to_i - 50 + fake_file = '' + (1..50).each do |i| + fake_file += "#{now + i} #{i}\n" + end + + @s3.expects(:fetch_file).with(@s3.build_filename(@statistic)).returns(fake_file).at_least_once + @s3.expects(:store_file).with(@s3.build_filename(@statistic), anything).at_least_once + + assert_equal 50, @s3.read(@statistic, now.to_s, (now + 50).to_s).length + + updated_file = @s3.truncate(@s3.build_filename(@statistic), (now+25).to_s) + + @s3.expects(:fetch_file).with(@s3.build_filename(@statistic)).returns(updated_file).at_least_once + + assert_equal 26, @s3.read(@statistic, now.to_s, (now + 50).to_s).length + end + + def test_delete_unlinks_file + s3_object = stub + s3_object.expects(:delete).returns(true) + + AWS::S3::S3Object.expects(:exists?).with(@s3.build_filename(@statistic), @bucket).returns(true) + 
AWS::S3::S3Object.expects(:find).with(@s3.build_filename(@statistic), @bucket).returns(s3_object) + + @s3.delete(@s3.build_filename(@statistic)) + end + + +end