From 6f0601533f67b9a4e41a3fdb9e1889394d4f2874 Mon Sep 17 00:00:00 2001 From: Nima Gardideh Date: Wed, 10 Jul 2013 11:34:21 -0400 Subject: [PATCH 1/5] Initial attempt at creating the s3 store. Creating tests now. --- .rbenv-version | 1 + Gemfile | 4 +- Gemfile.lock | 14 +++- config.yml | 14 +++- lib/batsd.rb | 5 +- lib/batsd/deleter.rb | 8 +- lib/batsd/filestore.rb | 21 +++++ lib/batsd/{ => filestore}/diskstore.rb | 22 +---- lib/batsd/filestore/s3.rb | 111 +++++++++++++++++++++++++ lib/batsd/handlers/counter.rb | 17 ++-- lib/batsd/handlers/gauge.rb | 8 +- lib/batsd/handlers/timer.rb | 19 +++-- lib/batsd/server.rb | 16 ++-- lib/batsd/truncator.rb | 12 +-- 14 files changed, 208 insertions(+), 64 deletions(-) create mode 100644 lib/batsd/filestore.rb rename lib/batsd/{ => filestore}/diskstore.rb (85%) create mode 100644 lib/batsd/filestore/s3.rb diff --git a/.rbenv-version b/.rbenv-version index 2eec2d9d5..cf5ef83b0 100644 --- a/.rbenv-version +++ b/.rbenv-version @@ -1 +1,2 @@ +1.9.3-p194 jruby-1.7.0-dev diff --git a/Gemfile b/Gemfile index 4070b5383..be17f3620 100644 --- a/Gemfile +++ b/Gemfile @@ -1,7 +1,9 @@ -source :rubygems +source 'https://rubygems.org' gem 'eventmachine', '~>1.0.0.rc.4' gem 'redis', "~> 3.0.2" +gem 'aws-s3', "~> 0.6.3" + if RUBY_PLATFORM == 'java' gem 'json-jruby' else diff --git a/Gemfile.lock b/Gemfile.lock index eae434999..69be65678 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,17 +1,22 @@ GEM - remote: http://rubygems.org/ + remote: https://rubygems.org/ specs: + aws-s3 (0.6.3) + builder + mime-types + xml-simple + builder (3.0.4) eventmachine (1.0.0.rc.4) eventmachine (1.0.0.rc.4-java) json (1.5.0) json (1.5.0-java) - json-jruby (1.5.0-java) - json (= 1.5.0) metaclass (0.0.1) + mime-types (1.23) mocha (0.11.3) metaclass (~> 0.0.1) rake (0.9.2.2) redis (3.0.2) + xml-simple (1.1.2) yard (0.8.2) PLATFORMS @@ -19,8 +24,9 @@ PLATFORMS ruby DEPENDENCIES + aws-s3 (~> 0.6.3) eventmachine (~> 1.0.0.rc.4) - json-jruby + json mocha rake redis 
(~> 3.0.2) diff --git a/config.yml b/config.yml index 2e0bdc847..7519695b3 100644 --- a/config.yml +++ b/config.yml @@ -1,11 +1,23 @@ bind: 0.0.0.0 port: 8125 -root: tmp/statsd + +file_store: 'diskstore' + +s3: + access_key: 'access key goes here' + secret_access_key: 'secret acecss key goes here' + bucket: 'bucket name goes here' + +diskstore: + root: tmp/statsd + redis: host: 127.0.0.1 port: 6379 + retentions: 10: 360 #1 hour 60: 10080 #1 week 600: 52594 #1 year + autotruncate: false diff --git a/lib/batsd.rb b/lib/batsd.rb index 208179abf..8803e5ab7 100644 --- a/lib/batsd.rb +++ b/lib/batsd.rb @@ -1,10 +1,13 @@ require 'benchmark' require 'eventmachine' require 'redis' +require 'aws/s3' require 'core-ext/array' -require 'batsd/diskstore' +require 'batsd/file_store' +require 'batsd/file_store/diskstore' +require 'batsd/file_store/s3store' require 'batsd/redis' require 'batsd/threadpool' require 'batsd/receiver' diff --git a/lib/batsd/deleter.rb b/lib/batsd/deleter.rb index 722a25843..6943e1f9c 100644 --- a/lib/batsd/deleter.rb +++ b/lib/batsd/deleter.rb @@ -6,13 +6,13 @@ class Deleter # Create a new deleter # - # * Establish the diskstore that will be used + # * Establish the filestore that will be used # * Establish the redis connection that will be needed # def initialize(options={}) @options = options - @redis = Batsd::Redis.new(options ) - @diskstore = Batsd::Diskstore.new(options[:root]) + @redis = Batsd::Redis.new(options) + @filestore = Batsd::Filestore.init(options) end def delete(statistic) @@ -37,7 +37,7 @@ def delete(statistic) # other retentions retentions.each do |retention| key = "#{statistic}:#{retention}" - @diskstore.delete(@diskstore.build_filename(key), :delete_empty_dirs => true) + @filestore.delete(@filestore.build_filename(key), delete_empty_dirs: true) end end deletions diff --git a/lib/batsd/filestore.rb b/lib/batsd/filestore.rb new file mode 100644 index 000000000..2fc61de10 --- /dev/null +++ b/lib/batsd/filestore.rb @@ -0,0 +1,21 @@ 
+module Batsd + + # A wrapper around all file stores (Diskstore and S3), includes commonality and control functions + class Filestore + # Creates an instance of the right child depending on configuration + def self.init(options) + case Batsd::Server.config[:filestore].downcase + when 'diskstore' then + Batsd::Diskstore.new(Batsd::Server.config) + when 's3' then + Batsd::S3.new(Batsd::Server.config) + end + end + + def build_filename(statistic, root='') + return unless statistic + file_hash = Digest::MD5.hexdigest(statistic) + File.join(@root, file_hash[0,2], file_hash[2,2], file_hash) + end + end +end diff --git a/lib/batsd/diskstore.rb b/lib/batsd/filestore/diskstore.rb similarity index 85% rename from lib/batsd/diskstore.rb rename to lib/batsd/filestore/diskstore.rb index 55dbdda4d..e470d39d4 100644 --- a/lib/batsd/diskstore.rb +++ b/lib/batsd/filestore/diskstore.rb @@ -1,26 +1,12 @@ -require 'digest' require 'fileutils' module Batsd # Handles disk operations -- writing, truncating, and reading - class Diskstore + class Diskstore < Filestore # Create a new diskstore object - def initialize(root) - @root = root - end - - # Calculate the filename that will be used to store the - # metric to disk. 
- # - # Filenames are MD5 hashes of the statistic name, including any - # aggregation-based suffix, and are stored in two levels of nested - # directories (e.g., /00/01/0001s0d03dd0s030d03d) - # - def build_filename(statistic) - return unless statistic - file_hash = Digest::MD5.hexdigest(statistic) - File.join(@root, file_hash[0,2], file_hash[2,2], file_hash) + def initialize(options) + @root = options[:diskstore][:root] end # Append a value to a file @@ -49,7 +35,7 @@ def append_value_to_file(filename, value, attempts=0) # def read(statistic, start_ts, end_ts) datapoints = [] - filename = build_filename(statistic) + filename = build_filename(statistic, @root) begin File.open(filename, 'r') do |file| while (line = file.gets) diff --git a/lib/batsd/filestore/s3.rb b/lib/batsd/filestore/s3.rb new file mode 100644 index 000000000..21101a803 --- /dev/null +++ b/lib/batsd/filestore/s3.rb @@ -0,0 +1,111 @@ +module Batsd + # Handles writing, truncating, and reading on AWS S3 buckets + # Meant to be a replacement for Batsd::Diskstore for usage on heroku + class S3 < Filestore + S3_FOLDER_EXT = '_$folder$' + + def initialize(options) + @credentials = options[:s3] + @bucket = options[:s3][:bucket] + end + + # Fetch a file from our AWS S3 + def fetch_file(filename) + data = AWS::S3::S3Object.value(filename, @bucket) if AWS::S3::S3Object.exists?(filename, @bucket) + + data || '' + end + + + # Store a file to AWS S3 + # folders and file types are handled by the gem + + def store_file(filename, file_data) + AWS::S3::S3Object.store filename, StringIO.new(file_data), @bucket, access: :authenticated_read + end + + # Append a value to a file + # + # Open the file in append mode (creating directories needed along + # the way), write the value and a newline, and close the file again. 
+ # + def append_value_to_file(filename, value, attempts=0) + establish_connection + + file_data = fetch_file(filename) + "#{value}\n" + + store_file filename, file_data + rescue Exception => e + puts "Encountered an error trying to store to #{filename}: #{e} #{e.message} #{e.backtrace if ENV["VERBOSE"]}" + if attempts < 2 + puts "Retrying #{filename} for the #{attempts+1} time" + append_value_to_file(filename, value, attempts+1) + end + end + + # Reads the set of values in the range desired from file + # + # Reads until it reaches end_ts or the end of the file. Returns an array + # of {timestamp: ts, value: v} hashes. + # + def read(statistic, start_ts, end_ts) + establish_connection + + datapoints = [] + filename = build_filename statistic + + begin + file_data = fetch_file + file_data.split("\n").each do |line| + ts, value = line.split + if ts >= start_ts && ts <= end_ts + datapoints << { timestamp: ts.to_i, value: value } + end + end + rescue Exception => e + puts "Encountered an error trying to read #{filename}: #{e} #{e.message} #{e.backtrace if ENV["VERBOSE"]}" + end + + datapoints + end + + # Truncates a file by rewriting to a temp file everything after the since + # timestamp that is provided. The temp file is then renamed to the + # original. + # + def truncate(filename, since) + establish_connection + + puts "Truncating #{filename} since #{since}" if ENV["VVERBOSE"] + + truncated_file_data = '' + old_file_data = fetch_file(filename) + + old_file_data.split("\n").each do |line| + truncated_file_data += line if(line.split[0] >= since rescue true) + end + + store_file filename, truncated_file_data + rescue Exception => e + puts "Encountered an error trying to truncate #{filename}: #{e} #{e.message} #{e.backtrace if ENV["VERBOSE"]}" + end + + # Deletes a file, if it exists. + # If :delete_empty_dirs is true, empty directories will be deleted too.
+ # TODO: Support for :delete_empty_dirs + # + def delete(filename, options={}) + establish_connection + + AWS::S3::S3Object.find(filename, @bucket).delete if AWS::S3::S3Object.exists? filename, @bucket + rescue Exception => e + puts "Encountered an error trying to delete #{filename}: #{e} #{e.message} #{e.backtrace if ENV["VERBOSE"]}" + end + + def establish_connection + unless AWS::S3::Base.connected? + AWS::S3::Base.establish_connection! access_key_id: @credentials[:access_key], secret_access_key: @credentials[:secret_access_key] + end + end + end +end diff --git a/lib/batsd/handlers/counter.rb b/lib/batsd/handlers/counter.rb index 1b621e848..90336af29 100644 --- a/lib/batsd/handlers/counter.rb +++ b/lib/batsd/handlers/counter.rb @@ -10,17 +10,18 @@ class Handler::Counter < Handler # Set up a new handler to handle counters # # * Set up a redis client - # * Set up a diskstore client to write aggregates to disk + # * Set up a filestore client to write aggregates to disk # * Initialize last flush timers to now # def initialize(options) - @redis = Batsd::Redis.new(options) - @diskstore = Batsd::Diskstore.new(options[:root]) - @counters = @active_counters = {} - @retentions = options[:retentions].keys - @flush_interval = @retentions.first now = Time.now.to_i - @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l } + + @redis = Batsd::Redis.new(options) + @filestore = Batsd::Filestore.init(options) + @counters = @active_counters = {} + @retentions = options[:retentions].keys + @flush_interval = @retentions.first + @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l } super end @@ -101,7 +102,7 @@ def flush value = @redis.get_and_clear_key(key) if value value = "#{ts} #{value}" - @diskstore.append_value_to_file(@diskstore.build_filename(key), value) + @filestore.append_value_to_file(@filestore.build_filename(key), value) end end end diff --git a/lib/batsd/handlers/gauge.rb b/lib/batsd/handlers/gauge.rb index 4aa448425..c42dba4f3 100644 --- 
a/lib/batsd/handlers/gauge.rb +++ b/lib/batsd/handlers/gauge.rb @@ -12,11 +12,11 @@ class Handler::Gauge < Handler # Set up a new handler to handle gauges # # * Set up a redis client - # * Set up a diskstore client to write aggregates to disk + # * Set up a filestore client to write aggregates to disk # def initialize(options) - @redis = Batsd::Redis.new(options) - @diskstore = Batsd::Diskstore.new(options[:root]) + @redis = Batsd::Redis.new(options) + @filestore = Batsd::Fielstore.init(options) super end @@ -34,7 +34,7 @@ def handle(key, value, sample_rate) end value = "#{timestamp} #{value}" key = "gauges:#{key}" - @diskstore.append_value_to_file(@diskstore.build_filename(key), value) + @filestore.append_value_to_file(@filestore.build_filename(key), value) @redis.add_datapoint key end end diff --git a/lib/batsd/handlers/timer.rb b/lib/batsd/handlers/timer.rb index a375d73fd..38174de9d 100644 --- a/lib/batsd/handlers/timer.rb +++ b/lib/batsd/handlers/timer.rb @@ -10,18 +10,19 @@ class Handler::Timer < Handler # Set up a new handler to handle timers # # * Set up a redis client - # * Set up a diskstore client to write aggregates to disk + # * Set up a filestore client to write aggregates to disk # * Initialize last flush timers to now # def initialize(options) - @redis = Batsd::Redis.new(options) - @diskstore = Batsd::Diskstore.new(options[:root]) - @retentions = options[:retentions].keys - @flush_interval = @retentions.first - @active_timers = {} - @timers = {} now = Time.now.to_i - @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l } + + @redis = Batsd::Redis.new(options) + @filestore = Batsd::Filestore.init(options) + @retentions = options[:retentions].keys + @flush_interval = @retentions.first + @active_timers = {} + @timers = {} + @last_flushes = @retentions.inject({}){|l, r| l[r] = now; l } @fast_threadpool = Threadpool.new((options[:threadpool_size] || 100)/2) super end @@ -107,7 +108,7 @@ def flush name = aggregation end val = (count > 1 ? 
values.send(aggregation.to_sym) : values.first) - @diskstore.append_value_to_file(@diskstore.build_filename("#{key}:#{name}:#{retention}"), "#{timestamp} #{val}") + @filestore.append_value_to_file(@filestore.build_filename("#{key}:#{name}:#{retention}"), "#{timestamp} #{val}") end end end diff --git a/lib/batsd/server.rb b/lib/batsd/server.rb index e7b0e3250..31eefe105 100644 --- a/lib/batsd/server.rb +++ b/lib/batsd/server.rb @@ -11,19 +11,19 @@ def self.config def self.config=(config) @config=config end - - # Set up a redis and diskstore instance per connection + + # Set up a redis and filestore instance per connection # so they don't step on each other. Since redis commands # are happening in a deferrable, intentionally not using EM-redis def post_init puts "batsd server ready and waiting on #{Batsd::Server.config[:port]} to ship data upon request\n" @redis = Batsd::Redis.new(Batsd::Server.config) - @diskstore = Batsd::Diskstore.new(Batsd::Server.config[:root]) + @filestore = Batsd::Filestore.init(Batsd::Server.config) end - + def unbind @redis.client.quit - end + end # Handle a command received over the server port and return # the datapoints, values, or a PONG as requested. 
@@ -43,7 +43,7 @@ def receive_data(msg) datapoints, interval = [], 0 if metric.match(/^gauge/) - datapoints = @diskstore.read(metric, begin_time, end_time) + datapoints = @filestore.read(metric, begin_time, end_time) else Batsd::Server.config[:retentions].each_with_index do |retention, index| if (index != Batsd::Server.config[:retentions].count - 1) && (Time.now.to_i - (retention[0] * retention[1]) > begin_time.to_i) @@ -55,7 +55,7 @@ def receive_data(msg) datapoints = @redis.values_from_zset(metric, begin_time, end_time) break else - datapoints = @diskstore.read("#{metric}:#{retention[0]}", begin_time, end_time) + datapoints = @filestore.read("#{metric}:#{retention[0]}", begin_time, end_time) break end end @@ -77,7 +77,7 @@ def receive_data(msg) end end end - + # Bind to port+2 and serve up data over TCP. Offers access to # both the set of datapoints and the values as JSON arrays. class Daemon diff --git a/lib/batsd/truncator.rb b/lib/batsd/truncator.rb index 2dd6f08d7..e1aea8954 100644 --- a/lib/batsd/truncator.rb +++ b/lib/batsd/truncator.rb @@ -6,14 +6,14 @@ class Truncator # Create a new truncator # - # * Establish the diskstore that will be used + # * Establish the filestore that will be used # * Establish the redis connection that will be needed # def initialize(options={}) - @options = options + @options = options @retentions = options[:retentions].keys - @redis = Batsd::Redis.new(options ) - @diskstore = Batsd::Diskstore.new(options[:root]) + @redis = Batsd::Redis.new(options ) + @filestore = Batsd::Filestore.init(options) @threadpool = Threadpool.new(options[:truncate_threadpool_size] || 10) end @@ -46,10 +46,10 @@ def run(retention) else # Stored on disk keys.each_slice(100) do |keys| - @threadpool.queue @diskstore, keys, retention, min_ts do |diskstore, keys, retention, min_ts| + @threadpool.queue @filestore, keys, retention, min_ts do |filestore, keys, retention, min_ts| keys.each do |key| key = "#{key}:#{retention}" - 
diskstore.truncate(diskstore.build_filename(key), min_ts.to_s) + filestore.truncate(filestore.build_filename(key), min_ts.to_s) end end end From 962c5c42a1b40a402dfed8a16f10ef60e04b6035 Mon Sep 17 00:00:00 2001 From: Nima Gardideh Date: Wed, 10 Jul 2013 12:58:17 -0400 Subject: [PATCH 2/5] Added test and fixed a few issues with all filestores. --- config.yml | 2 +- lib/batsd.rb | 6 +-- lib/batsd/filestore.rb | 19 ++++++-- lib/batsd/filestore/diskstore.rb | 2 +- lib/batsd/filestore/s3.rb | 8 ++-- lib/batsd/handlers/gauge.rb | 2 +- lib/batsd/server.rb | 2 +- lib/batsd/truncator.rb | 2 +- test/unit/diskstore_test.rb | 2 +- test/unit/filestore_test.rb | 40 ++++++++++++++++ test/unit/s3_test.rb | 79 ++++++++++++++++++++++++++++++++ 11 files changed, 147 insertions(+), 17 deletions(-) create mode 100644 test/unit/filestore_test.rb create mode 100644 test/unit/s3_test.rb diff --git a/config.yml b/config.yml index 7519695b3..ec0478714 100644 --- a/config.yml +++ b/config.yml @@ -1,7 +1,7 @@ bind: 0.0.0.0 port: 8125 -file_store: 'diskstore' +filestore: 'diskstore' s3: access_key: 'access key goes here' diff --git a/lib/batsd.rb b/lib/batsd.rb index 8803e5ab7..9142cc262 100644 --- a/lib/batsd.rb +++ b/lib/batsd.rb @@ -5,9 +5,9 @@ require 'core-ext/array' -require 'batsd/file_store' -require 'batsd/file_store/diskstore' -require 'batsd/file_store/s3store' +require 'batsd/filestore' +require 'batsd/filestore/diskstore' +require 'batsd/filestore/s3' require 'batsd/redis' require 'batsd/threadpool' require 'batsd/receiver' diff --git a/lib/batsd/filestore.rb b/lib/batsd/filestore.rb index 2fc61de10..f734a0e44 100644 --- a/lib/batsd/filestore.rb +++ b/lib/batsd/filestore.rb @@ -2,20 +2,29 @@ module Batsd # A wrapper around all file stores (Diskstore and S3), includes commonality and control functions class Filestore + attr_accessor :root + # Creates an instance of the right child depending on configuration def self.init(options) - case Batsd::Server.config[:filestore].downcase + 
case options[:filestore].downcase when 'diskstore' then - Batsd::Diskstore.new(Batsd::Server.config) + Batsd::Diskstore.new(options) when 's3' then - Batsd::S3.new(Batsd::Server.config) + Batsd::S3.new(options) end end - def build_filename(statistic, root='') + def build_filename(statistic) return unless statistic + paths = [] file_hash = Digest::MD5.hexdigest(statistic) - File.join(@root, file_hash[0,2], file_hash[2,2], file_hash) + + paths << @root if @root + paths << file_hash[0,2] + paths << file_hash[2,2] + paths << file_hash + + File.join(paths) end end end diff --git a/lib/batsd/filestore/diskstore.rb b/lib/batsd/filestore/diskstore.rb index e470d39d4..f3590268a 100644 --- a/lib/batsd/filestore/diskstore.rb +++ b/lib/batsd/filestore/diskstore.rb @@ -35,7 +35,7 @@ def append_value_to_file(filename, value, attempts=0) # def read(statistic, start_ts, end_ts) datapoints = [] - filename = build_filename(statistic, @root) + filename = build_filename(statistic) begin File.open(filename, 'r') do |file| while (line = file.gets) diff --git a/lib/batsd/filestore/s3.rb b/lib/batsd/filestore/s3.rb index 21101a803..45020f25a 100644 --- a/lib/batsd/filestore/s3.rb +++ b/lib/batsd/filestore/s3.rb @@ -9,7 +9,7 @@ def initialize(options) @bucket = options[:s3][:bucket] end - # Fetch a file from our AWS S3 + # Fetch a file from AWS S3 def fetch_file(filename) data = AWS::S3::S3Object.value(filename, @bucket) if AWS::S3::S3Object.exists?(filename, @bucket) @@ -55,7 +55,7 @@ def read(statistic, start_ts, end_ts) filename = build_filename statistic begin - file_data = fetch_file + file_data = fetch_file(filename) file_data.split("\n").each do |line| ts, value = line.split if ts >= start_ts && ts <= end_ts @@ -82,10 +82,12 @@ def truncate(filename, since) old_file_data = fetch_file(filename) old_file_data.split("\n").each do |line| - truncated_file_data += line if(line.split[0] >= since rescue true) + truncated_file_data += "#{line}\n" if(line.split[0] >= since rescue true) end 
store_file filename, truncated_file_data + + truncated_file_data rescue Exception => e puts "Encountered an error trying to truncate #{filename}: #{e} #{e.message} #{e.backtrace if ENV["VERBOSE"]}" end diff --git a/lib/batsd/handlers/gauge.rb b/lib/batsd/handlers/gauge.rb index c42dba4f3..741ac3389 100644 --- a/lib/batsd/handlers/gauge.rb +++ b/lib/batsd/handlers/gauge.rb @@ -16,7 +16,7 @@ class Handler::Gauge < Handler # def initialize(options) @redis = Batsd::Redis.new(options) - @filestore = Batsd::Fielstore.init(options) + @filestore = Batsd::Filestore.init(options) super end diff --git a/lib/batsd/server.rb b/lib/batsd/server.rb index 31eefe105..871c9ed79 100644 --- a/lib/batsd/server.rb +++ b/lib/batsd/server.rb @@ -18,7 +18,7 @@ def self.config=(config) def post_init puts "batsd server ready and waiting on #{Batsd::Server.config[:port]} to ship data upon request\n" @redis = Batsd::Redis.new(Batsd::Server.config) - @filestore = Batsd::Filestore.init(Batsd::Server.config) + @filestore = Batsd::Filestore.init(options) end def unbind diff --git a/lib/batsd/truncator.rb b/lib/batsd/truncator.rb index e1aea8954..aa4fca384 100644 --- a/lib/batsd/truncator.rb +++ b/lib/batsd/truncator.rb @@ -12,7 +12,7 @@ class Truncator def initialize(options={}) @options = options @retentions = options[:retentions].keys - @redis = Batsd::Redis.new(options ) + @redis = Batsd::Redis.new(options) @filestore = Batsd::Filestore.init(options) @threadpool = Threadpool.new(options[:truncate_threadpool_size] || 10) end diff --git a/test/unit/diskstore_test.rb b/test/unit/diskstore_test.rb index 56e690e05..13c48f532 100644 --- a/test/unit/diskstore_test.rb +++ b/test/unit/diskstore_test.rb @@ -2,7 +2,7 @@ class DiskstoreTest < Test::Unit::TestCase def setup - @diskstore = Batsd::Diskstore.new("test/data") + @diskstore = Batsd::Diskstore.new({diskstore: {root: "test/data"} }) @statistic = "counters:test_counter:60" end diff --git a/test/unit/filestore_test.rb b/test/unit/filestore_test.rb 
new file mode 100644 index 000000000..d2f4acd50 --- /dev/null +++ b/test/unit/filestore_test.rb @@ -0,0 +1,40 @@ +require 'test_helper' +class FilestoreTest < Test::Unit::TestCase + + def setup + @filestore = Batsd::Filestore.new() + @statistic = "counters:test_counter:60" + end + + def test_filename_calculation_with_root + @filestore.root = 'test/data' + assert_equal "test/data/37/2a/372a5d5450ef177a737f6a92c0246436", @filestore.build_filename(@statistic) + end + + def test_filename_calculation_without_root # happens with S3 + @filestore.root = nil + assert_equal "37/2a/372a5d5450ef177a737f6a92c0246436", @filestore.build_filename(@statistic) + end + + def test_init_diskstore + options = { + filestore: 'diskstore', + diskstore: { + root: 'test/data' + } + } + + assert_equal Batsd::Filestore.init(options).class, Batsd::Diskstore + end + + def test_init_s3 + options = { + filestore: 's3', + s3: { + bucket: 'bucket' + } + } + + assert_equal Batsd::Filestore.init(options).class, Batsd::S3 + end +end diff --git a/test/unit/s3_test.rb b/test/unit/s3_test.rb new file mode 100644 index 000000000..28881c4fc --- /dev/null +++ b/test/unit/s3_test.rb @@ -0,0 +1,79 @@ +require 'test_helper' +require 'mocha/setup' + +class S3Test < Test::Unit::TestCase + + def setup + Batsd::S3.any_instance.stubs(:establish_connection).returns(true) + @bucket = 'fake-bucket' + + @s3 = Batsd::S3.new({ + s3: { + bucket: @bucket + } + }) + + @statistic = "counters:test_counter:60" + end + + def teardown + #FileUtils.rm("test/data/37/2a/372a5d5450ef177a737f6a92c0246436") rescue nil + end + + def test_store_writes_to_file + now = Time.now.to_i + value = "#{now} #{12}" + + @s3.expects(:fetch_file).with('37/2a/372a5d5450ef177a737f6a92c0246436').returns('') + @s3.expects(:store_file).with('37/2a/372a5d5450ef177a737f6a92c0246436', "#{value}\n") + @s3.append_value_to_file(@s3.build_filename(@statistic), value) + end + + def test_read_reads_from_file + now = Time.now.to_i - 50 + fake_file = '' + 
(1..50).each do |i| + fake_file += "#{now + i} #{i}\n" + end + + @s3.expects(:fetch_file).with(@s3.build_filename(@statistic)).returns(fake_file).at_least_once + + full_result = @s3.read(@statistic, now.to_s, (now + 50).to_s) + assert_equal 50, full_result.length + assert_equal 25, full_result[24][:value].to_f + partial_result = @s3.read(@statistic, (now+25).to_s, (now + 35).to_s) + assert_equal 11, partial_result.length + assert_equal 27, partial_result[2][:value].to_f + end + + def test_truncate_cleans_up_file + now = Time.now.to_i - 50 + fake_file = '' + (1..50).each do |i| + fake_file += "#{now + i} #{i}\n" + end + + @s3.expects(:fetch_file).with(@s3.build_filename(@statistic)).returns(fake_file).at_least_once + @s3.expects(:store_file).with(@s3.build_filename(@statistic), anything).at_least_once + + assert_equal 50, @s3.read(@statistic, now.to_s, (now + 50).to_s).length + + updated_file = @s3.truncate(@s3.build_filename(@statistic), (now+25).to_s) + + @s3.expects(:fetch_file).with(@s3.build_filename(@statistic)).returns(updated_file).at_least_once + + assert_equal 26, @s3.read(@statistic, now.to_s, (now + 50).to_s).length + end + + def test_delete_unlinks_file + s3_object = stub + s3_object.expects(:delete).returns(true) + + AWS::S3::S3Object.expects(:exists?).with(@s3.build_filename(@statistic), @bucket).returns(true) + AWS::S3::S3Object.expects(:find).with(@s3.build_filename(@statistic), @bucket).returns(s3_object) + + @s3.delete(@s3.build_filename(@statistic)) + end + + +end From c514cbf00782ecdb7d263716edd3d2531e875618 Mon Sep 17 00:00:00 2001 From: Nima Gardideh Date: Wed, 10 Jul 2013 15:05:42 -0400 Subject: [PATCH 3/5] Added backward compatibility support for the config file. 
--- config.yml | 8 ++++---- lib/batsd/filestore.rb | 15 ++++++++++----- lib/batsd/filestore/diskstore.rb | 1 - lib/batsd/filestore/s3.rb | 9 +++------ lib/batsd/server.rb | 2 +- 5 files changed, 18 insertions(+), 17 deletions(-) diff --git a/config.yml b/config.yml index ec0478714..0ebce16d2 100644 --- a/config.yml +++ b/config.yml @@ -4,12 +4,12 @@ port: 8125 filestore: 'diskstore' s3: - access_key: 'access key goes here' - secret_access_key: 'secret acecss key goes here' - bucket: 'bucket name goes here' + :access_key: 'access key goes here' + :secret_access_key: 'secret key goes here' + :bucket: 'bucket name goes here' diskstore: - root: tmp/statsd + :root: tmp/statsd redis: host: 127.0.0.1 diff --git a/lib/batsd/filestore.rb b/lib/batsd/filestore.rb index f734a0e44..788ed1360 100644 --- a/lib/batsd/filestore.rb +++ b/lib/batsd/filestore.rb @@ -6,11 +6,16 @@ class Filestore # Creates an instance of the right child depending on configuration def self.init(options) - case options[:filestore].downcase - when 'diskstore' then - Batsd::Diskstore.new(options) - when 's3' then - Batsd::S3.new(options) + + unless options[:filestore] + Batsd::Diskstore.new diskstore: { root: (options[:root] || options[:diskstore][:root]) } + else + case options[:filestore].downcase + when 'diskstore' then + Batsd::Diskstore.new(options) + when 's3' then + Batsd::S3.new(options) + end end end diff --git a/lib/batsd/filestore/diskstore.rb b/lib/batsd/filestore/diskstore.rb index f3590268a..e9ee9d1c9 100644 --- a/lib/batsd/filestore/diskstore.rb +++ b/lib/batsd/filestore/diskstore.rb @@ -1,5 +1,4 @@ require 'fileutils' - module Batsd # Handles disk operations -- writing, truncating, and reading class Diskstore < Filestore diff --git a/lib/batsd/filestore/s3.rb b/lib/batsd/filestore/s3.rb index 45020f25a..8b8959517 100644 --- a/lib/batsd/filestore/s3.rb +++ b/lib/batsd/filestore/s3.rb @@ -11,6 +11,7 @@ def initialize(options) # Fetch a file from AWS S3 def fetch_file(filename) + 
establish_connection data = AWS::S3::S3Object.value(filename, @bucket) if AWS::S3::S3Object.exists?(filename, @bucket) data || '' @@ -21,6 +22,8 @@ def fetch_file(filename) # folders and file types are handled by the gem def store_file(filename, file_data) + establish_connection + AWS::S3::S3Object.store filename, StringIO.new(file_data), @bucket, access: :authenticated_read end @@ -30,8 +33,6 @@ def store_file(filename, file_data) # the way), write the value and a newline, and close the file again. # def append_value_to_file(filename, value, attempts=0) - establish_connection - file_data = fetch_file(filename) + "#{value}\n" store_file filename, file_data @@ -49,8 +50,6 @@ def append_value_to_file(filename, value, attempts=0) # of {timestamp: ts, value: v} hashes. # def read(statistic, start_ts, end_ts) - establish_connection - datapoints = [] filename = build_filename statistic @@ -74,8 +73,6 @@ def read(statistic, start_ts, end_ts) # original. # def truncate(filename, since) - establish_connection - puts "Truncating #{filename} since #{since}" if ENV["VVERBOSE"] truncated_file_data = '' diff --git a/lib/batsd/server.rb b/lib/batsd/server.rb index 871c9ed79..31eefe105 100644 --- a/lib/batsd/server.rb +++ b/lib/batsd/server.rb @@ -18,7 +18,7 @@ def self.config=(config) def post_init puts "batsd server ready and waiting on #{Batsd::Server.config[:port]} to ship data upon request\n" @redis = Batsd::Redis.new(Batsd::Server.config) - @filestore = Batsd::Filestore.init(options) + @filestore = Batsd::Filestore.init(Batsd::Server.config) end def unbind From 3cfc79a9cad5b37e883a938fcc6322f5b641ac41 Mon Sep 17 00:00:00 2001 From: Nima Gardideh Date: Wed, 10 Jul 2013 15:11:10 -0400 Subject: [PATCH 4/5] Changed Readme.md to include information about S3. 
--- README.md | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 98750173c..a19416f11 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,10 @@ implementation](https://github.com/etsy/statsd), which they described in a [blog post](http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/). Batsd differs from etsy's statsd implementation primarily in how it stores data --- data is stored to a combination of Redis and flat files on disk. You can -read more about persistence in [About: -Persistence](http://noahhl.github.com/batsd/doc/file.persistence.html). +-- data is stored to a combination of Redis and flat files on disk (or AWS S3). You can +read more about persistence in [About: Persistence](http://noahhl.github.com/batsd/doc/file.persistence.html). -Batsd grew out of usage at [37signals](http://37signals.com), where it has been -used for the last year. An [earlier form](https://github.com/noahhl/statsd-server) was -inspired by [quasor](https://github.com/quasor/statsd). +Batsd grew out of usage at [37signals](http://37signals.com), where it has been used for the last year. An [earlier form](https://github.com/noahhl/statsd-server) was inspired by [quasor](https://github.com/quasor/statsd). ### Documentation: @@ -50,11 +47,26 @@ Example config.yml # Where to store data. Data at the first retention level is stored # in redis; further data retentions are stored on disk - # Root path to store disk aggregations - root: /statsd redis: :host: 127.0.0.1 :port: 6379 + + # Which file store to use. Available are: diskstore and s3 + filestore: 'diskstore' + + + # Diskstore configuration. 
+ #This is only if you have filestore set to diskstore + diskstore: + # Root path to store disk aggregations + :root: tmp/statsd + + # S3 authentication and storage information + # This is only if you have filestore set to s3 + s3: + :access_key: 'access key goes here' + :secret_access_key: 'secret key goes here' + :bucket: 'bucket name goes here' # Configure how much data to retain at what intervals # Key is seconds, value is number of measurements at that From 00b1eb5b10fa6530626ba4aa9cd2fa7ecd9de672 Mon Sep 17 00:00:00 2001 From: Nima Gardideh Date: Wed, 10 Jul 2013 15:12:44 -0400 Subject: [PATCH 5/5] Fixed formatting on Readme.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a19416f11..52fe6ff77 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ Example config.yml # Diskstore configuration. - #This is only if you have filestore set to diskstore + # This is only if you have filestore set to diskstore diskstore: # Root path to store disk aggregations :root: tmp/statsd