diff --git a/lib/capistrano3/tasks/postgres.rb b/lib/capistrano3/tasks/postgres.rb index 45fff2b..ea32007 100644 --- a/lib/capistrano3/tasks/postgres.rb +++ b/lib/capistrano3/tasks/postgres.rb @@ -1,3 +1,5 @@ +require 'shellwords' + namespace :load do task :defaults do set :postgres_backup_dir, -> { 'postgres_backup' } @@ -13,81 +15,101 @@ set :postgres_backup_exclude_table, -> { [] } set :postgres_backup_table, -> { [] } set :postgres_database, -> { nil } + set :postgres_streaming_mode, false + set :postgres_stream_timeout, 3600 + set :postgres_stream_buffer_size, '64M' + set :postgres_verbose, true + set :postgres_restore_jobs, nil + set :postgres_fast_dump, false + set :postgres_ssh_multiplexing, true + set :postgres_backup_format, 'custom' end end namespace :postgres do namespace :backup do - desc 'Create database dump' + desc 'Create database dump (enhanced with streaming support)' task :create do on roles(fetch(:postgres_role)) do |role| - grab_remote_database_config - config = fetch(:postgres_remote_database_config) - - unless fetch(:postgres_remote_sqlc_file_path) - file_name = "db_backup.#{Time.now.strftime('%Y-%m-%d_%H-%M-%S')}.sqlc" - set :postgres_remote_sqlc_file_path, "#{shared_path}/#{fetch(:postgres_backup_dir)}/#{file_name}" - end - execute [ - "PGPASSWORD=#{config['password']}", - "pg_dump #{user_option(config)}", - "-h #{config['host']}", - config['port'] ? "-p #{config['port']}" : nil, - "-Fc", - "--file=#{fetch(:postgres_remote_sqlc_file_path)}", - "-Z #{fetch(:postgres_backup_compression_level)}", - fetch(:postgres_backup_exclude_table_data).map {|table| "--exclude-table-data=#{table}" }, - fetch(:postgres_backup_exclude_table).map {|table| "--exclude-table=#{table}" }, - fetch(:postgres_backup_table).map {|table| "--table=#{table}" }, - fetch(:postgres_remote_cluster) ? "--cluster #{fetch(:postgres_remote_cluster)}" : nil, - "#{config['database']}" - ].flatten.compact.join(' ') - end - end - - desc 'Download last database dump' - task :download, [:remove_remote_file] do |_task, args| - on roles(fetch(:postgres_role)) do |role| - unless fetch(:postgres_remote_sqlc_file_path) - file_name = capture("ls -v #{shared_path}/#{fetch :postgres_backup_dir}").split(/\n/).last - set :postgres_remote_sqlc_file_path, "#{shared_path}/#{fetch :postgres_backup_dir}/#{file_name}" - end + if fetch(:postgres_streaming_mode, false) + puts "Skipping remote dump creation - will stream directly" + else + grab_remote_database_config + config = fetch(:postgres_remote_database_config) - download!(fetch(:postgres_remote_sqlc_file_path), "tmp/#{fetch :postgres_backup_dir}/#{Pathname.new(fetch(:postgres_remote_sqlc_file_path)).basename}") - begin - remote_file = fetch(:postgres_remote_sqlc_file_path) - rescue SSHKit::Command::Failed => e - warn e.inspect - ensure - false_values = [nil, false, 0, '0', 'f', 'F', 'False', 'false', 'FALSE', 'Off', 'off', 'OFF', 'No', 'no', 'NO'] - execute "rm #{remote_file}" unless false_values.include?(args[:remove_remote_file]) + unless fetch(:postgres_remote_sqlc_file_path) + file_name = "db_backup.#{Time.now.strftime('%Y-%m-%d_%H-%M-%S')}.sqlc" + set :postgres_remote_sqlc_file_path, "#{shared_path}/#{fetch(:postgres_backup_dir)}/#{file_name}" + end + execute [ + "PGPASSWORD=#{config['password']}", + "pg_dump #{user_option(config)}", + "-h #{config['host']}", + config['port'] ? "-p #{config['port']}" : nil, + "-Fc", + "--file=#{fetch(:postgres_remote_sqlc_file_path)}", + "-Z #{fetch(:postgres_backup_compression_level)}", + fetch(:postgres_backup_exclude_table_data).map {|table| "--exclude-table-data=#{table}" }, + fetch(:postgres_backup_exclude_table).map {|table| "--exclude-table=#{table}" }, + fetch(:postgres_backup_table).map {|table| "--table=#{table}" }, + fetch(:postgres_remote_cluster) ? "--cluster #{fetch(:postgres_remote_cluster)}" : nil, + "#{config['database']}" + ].flatten.compact.join(' ') end end end - desc "Import last dump" - task :import do - grab_local_database_config - run_locally do - config = fetch(:postgres_local_database_config) - - unless fetch(:database_name) - ask(:database_name, config['database']) - end + desc 'Download last database dump (enhanced with streaming support)' + task :download, [:remove_remote_file] do |_task, args| + if fetch(:postgres_streaming_mode, false) + puts "Skipping download - using direct streaming" + else + on roles(fetch(:postgres_role)) do |role| + unless fetch(:postgres_remote_sqlc_file_path) + file_name = capture("ls -v #{shared_path}/#{fetch :postgres_backup_dir}").split(/\n/).last + set :postgres_remote_sqlc_file_path, "#{shared_path}/#{fetch :postgres_backup_dir}/#{file_name}" + end - with rails_env: :development do - file_name = capture("ls -v tmp/#{fetch :postgres_backup_dir}").split(/\n/).last - file_path = "tmp/#{fetch :postgres_backup_dir}/#{file_name}" + download!(fetch(:postgres_remote_sqlc_file_path), "tmp/#{fetch :postgres_backup_dir}/#{Pathname.new(fetch(:postgres_remote_sqlc_file_path)).basename}") begin - pgpass_path = File.join(Dir.pwd, '.pgpass') - File.open(pgpass_path, 'w+', 0600) { |file| file.write("*:*:*:#{config['username'] || config['user']}:#{config['password']}") } - execute "PGPASSFILE=#{pgpass_path} pg_restore -c --if-exists #{user_option(config)} --no-owner -h #{config['host']} -p #{config['port'] || 5432 } -d #{fetch(:database_name)} #{file_path}" - info 'Import performed successfully!' + remote_file = fetch(:postgres_remote_sqlc_file_path) rescue SSHKit::Command::Failed => e warn e.inspect ensure - File.delete(pgpass_path) if File.exist?(pgpass_path) - File.delete(file_path) if (fetch(:postgres_keep_local_dumps) == 0) && File.exist?(file_path) + false_values = [nil, false, 0, '0', 'f', 'F', 'False', 'false', 'FALSE', 'Off', 'off', 'OFF', 'No', 'no', 'NO'] + execute "rm #{remote_file}" unless false_values.include?(args[:remove_remote_file]) + end + end + end + end + + desc 'Import last dump (enhanced with streaming support)' + task :import, [:database_name] do |_task, args| + if fetch(:postgres_streaming_mode, false) + perform_streaming_import(args[:database_name]) + else + grab_local_database_config + run_locally do + config = fetch(:postgres_local_database_config) + + # Prompt for database name if not provided + database_name = args[:database_name] || ask(:database_name, config['database']) + set(:database_name, database_name) + + with rails_env: :development do + file_name = capture("ls -v tmp/#{fetch :postgres_backup_dir}").split(/\n/).last + file_path = "tmp/#{fetch :postgres_backup_dir}/#{file_name}" + begin + pgpass_path = File.join(Dir.pwd, '.pgpass') + File.open(pgpass_path, 'w+', 0600) { |file| file.write("*:*:*:#{config['username'] || config['user']}:#{config['password']}") } + execute "PGPASSFILE=#{pgpass_path} pg_restore -c --if-exists #{user_option(config)} --no-owner -h #{config['host']} -p #{config['port'] || 5432} -d #{fetch(:database_name)} #{file_path}" + info 'Import performed successfully!' + rescue SSHKit::Command::Failed => e + warn e.inspect + ensure + File.delete(pgpass_path) if File.exist?(pgpass_path) + File.delete(file_path) if (fetch(:postgres_keep_local_dumps) == 0) && File.exist?(file_path) + end end end end @@ -117,16 +139,43 @@ file_names[0...-fetch(:postgres_keep_local_dumps)].each {|file_name| File.delete("#{dir}/#{file_name}") } end end + + desc 'Force streaming mode for subsequent postgres tasks' + task :enable_streaming do + set :postgres_streaming_mode, true + puts "Streaming mode enabled - subsequent tasks will stream directly" + end + + desc 'Disable streaming mode' + task :disable_streaming do + set :postgres_streaming_mode, false + puts "Streaming mode disabled - using file-based operations" + end end - desc 'Replicate database locally' - task :replicate do + desc 'Replicate database locally (enhanced with streaming - no local storage)' + task :replicate, [:database_name] do |_task, args| + # Enable streaming mode for this operation + set :postgres_streaming_mode, true + + puts "Starting streaming replication (no local file storage)..." + + # Prompt for database name if not provided grab_local_database_config - ask(:database_name, fetch(:postgres_local_database_config)['database']) - invoke "postgres:backup:create" - invoke "postgres:backup:download", true - invoke "postgres:backup:import" - invoke("postgres:backup:cleanup") if fetch(:postgres_keep_local_dumps) > 0 + database_name = args[:database_name] + unless database_name + default_db = fetch(:postgres_local_database_config)['database'] + database_name = ask(:database_name, default_db) + end + set(:database_name, database_name) + + # Perform streaming operation + perform_streaming_replicate(database_name) + + # Disable streaming mode + set :postgres_streaming_mode, false + + puts "Streaming replication completed!" end def user_option(config) @@ -237,4 +286,413 @@ def env_variables_loader_code(env) end RUBY end + + def perform_streaming_replicate(database_name) + on roles(fetch(:postgres_role)) do |host| + with_postgres_credentials do |remote_config| + local_config = get_local_database_config(database_name) + + puts "Streaming from #{remote_config[:database]}@#{host} to local #{local_config[:database]}" + + # Show performance estimates + estimate_performance_improvement + + # Build optimized streaming command + streaming_command = build_optimized_streaming_pipeline( + remote_config: remote_config, + local_config: local_config, + remote_host: host + ) + + # Execute the streaming operation locally + run_locally do + info "Executing optimized streaming with parallel restore..." + info "Command: #{streaming_command.gsub(/PGPASSWORD='[^']*'/, "PGPASSWORD='***'")}" + + # Set timeout for long-running operations + with_timeout(fetch(:postgres_stream_timeout, 3600)) do + execute streaming_command + end + end + + # Cleanup any temporary files if created + cleanup_streaming_artifacts + end + end + end + + def perform_streaming_import(database_name = nil) + on roles(fetch(:postgres_role)) do |host| + with_postgres_credentials do |remote_config| + local_config = get_local_database_config(database_name) + + estimate_performance_improvement + + # Direct streaming from remote to local with optimizations + streaming_command = build_optimized_streaming_pipeline( + remote_config: remote_config, + local_config: local_config, + remote_host: host + ) + + run_locally do + with_timeout(fetch(:postgres_stream_timeout, 3600)) do + execute streaming_command + end + end + end + end + end + + def build_streaming_pipeline(remote_config:, local_config:, remote_host:) + # Build remote pg_dump command + dump_cmd = build_remote_dump_command(remote_config) + + # Build local pg_restore command + restore_cmd = build_local_restore_command(local_config) + + # Build SSH connection + ssh_cmd = build_ssh_command(remote_host) + + # Determine if compression should be used + if fetch(:postgres_backup_compression_level, 0) > 0 + compression_level = fetch(:postgres_backup_compression_level) + # SSH with compression: remote_dump | gzip -> SSH -> gunzip | local_restore + "#{ssh_cmd} '#{dump_cmd} | gzip -#{compression_level}' | gunzip | #{restore_cmd}" + else + # Direct streaming: SSH remote_dump -> local_restore + "#{ssh_cmd} '#{dump_cmd}' | #{restore_cmd}" + end + end + + def build_remote_dump_command(config) + cmd_parts = [ + "PGPASSWORD='#{config[:password]}'", + 'pg_dump' + ] + + # Add dump options + cmd_parts << '--format=custom' unless fetch(:postgres_backup_format) == 'sql' + cmd_parts << '--verbose' if fetch(:postgres_verbose, true) + cmd_parts << '--no-acl' + cmd_parts << '--no-owner' + + # Add connection parameters + cmd_parts << "--host=#{config[:host]}" if config[:host] != 'localhost' + cmd_parts << "--port=#{config[:port]}" if config[:port] != 5432 + cmd_parts << "--username=#{config[:username]}" if config[:username] + + # Add table exclusions + exclude_tables = fetch(:postgres_backup_exclude_table, []) + exclude_tables = exclude_tables.call if exclude_tables.respond_to?(:call) + exclude_tables.each do |table| + cmd_parts << "--exclude-table=#{table}" + end + + # Add table data exclusions + exclude_table_data = fetch(:postgres_backup_exclude_table_data, []) + exclude_table_data = exclude_table_data.call if exclude_table_data.respond_to?(:call) + exclude_table_data.each do |table| + cmd_parts << "--exclude-table-data=#{table}" + end + + # Add specific tables if specified + backup_tables = fetch(:postgres_backup_table, []) + backup_tables = backup_tables.call if backup_tables.respond_to?(:call) + backup_tables.each do |table| + cmd_parts << "--table=#{table}" + end + + # Add database name + cmd_parts << config[:database] + + cmd_parts.join(' ') + end + + def build_local_restore_command(config, streaming = false) + # Always use pg_restore for custom format (like original version) + cmd_parts = [ + "PGPASSWORD=#{Shellwords.escape(config[:password])}", + 'pg_restore' + ] + + cmd_parts << '--verbose' if fetch(:postgres_verbose, true) + cmd_parts << '--clean' + cmd_parts << '--no-acl' + cmd_parts << '--no-owner' + + # Add parallel processing based on CPU cores, but NOT for streaming mode + # pg_restore --jobs cannot read from stdin (streaming) + unless streaming + parallel_jobs = get_optimal_parallel_jobs + cmd_parts << "--jobs=#{parallel_jobs}" if parallel_jobs > 1 + end + + # Add local connection parameters + cmd_parts << "--host=#{config[:host]}" if config[:host] + cmd_parts << "--port=#{config[:port]}" if config[:port] && config[:port] != 5432 + cmd_parts << "--username=#{config[:username]}" if config[:username] + cmd_parts << "--dbname=#{Shellwords.escape(config[:database])}" + + cmd_parts.join(' ') + end + + def build_ssh_command(remote_host) + ssh_parts = ['ssh'] + + # Add SSH options + ssh_parts << "-p #{remote_host.port}" if remote_host.port && remote_host.port != 22 + + # Add SSH key if specified + if remote_host.ssh_options && remote_host.ssh_options[:keys] + key_file = remote_host.ssh_options[:keys].first + ssh_parts << "-i #{key_file}" + end + + # Add compression for SSH if not already compressing pg_dump output + if fetch(:postgres_backup_compression_level, 0) == 0 + ssh_parts << '-C' # Enable SSH compression + end + + # Add connection details + ssh_parts << "#{remote_host.user}@#{remote_host.hostname}" + + ssh_parts.join(' ') + end + + def with_postgres_credentials + # Get remote database configuration + grab_remote_database_config + config = fetch(:postgres_remote_database_config) + + remote_config = { + username: config['username'] || config['user'], + password: config['password'], + database: config['database'], + host: config['host'], + port: config['port'] + } + + yield remote_config + end + + def get_local_database_config(database_name = nil) + # Get local database configuration for import + local_config = fetch(:postgres_local_database_config) + + { + database: database_name || fetch(:database_name) || local_config['database'], + username: local_config['username'] || local_config['user'], + password: local_config['password'], + host: local_config['host'] || 'localhost', + port: local_config['port'] || 5432 + } + end + + def cleanup_streaming_artifacts + # Clean up any temporary files that might have been created + # This is mainly for error recovery scenarios + temp_patterns = [ + '/tmp/postgres_stream_*', + '/tmp/pg_dump_*', + '/tmp/pg_restore_*' + ] + + run_locally do + temp_patterns.each do |pattern| + execute "rm -f #{pattern} 2>/dev/null || true" + end + end + end + + def with_timeout(seconds) + begin + yield + rescue => e + error "Operation timed out or failed: #{e.message}" + raise + end + end + + def build_optimized_streaming_pipeline(remote_config:, local_config:, remote_host:) + # Enhanced version with performance optimizations + + # Build remote pg_dump command with optimizations + dump_cmd = build_optimized_dump_command(remote_config) + + # Build local pg_restore command - streaming mode (no parallel jobs) + restore_cmd = build_local_restore_command(local_config, true) + + # Build SSH connection with optimizations + ssh_cmd = build_optimized_ssh_command(remote_host) + + # Determine compression and buffering strategy + compression_level = fetch(:postgres_backup_compression_level, 0) + buffer_size = fetch(:postgres_stream_buffer_size, '64M') + + if compression_level > 0 + # Optimized compression pipeline with buffering + remote_cmd = "#{dump_cmd} | gzip -#{compression_level}" + if command_available?('pv') + # With progress monitoring and buffering + "#{ssh_cmd} #{Shellwords.escape(remote_cmd)} | pv -pterab -B #{buffer_size} | gunzip | #{restore_cmd}" + else + # Standard compression with buffering + "#{ssh_cmd} #{Shellwords.escape(remote_cmd)} | gunzip | #{restore_cmd}" + end + else + # Direct streaming with SSH compression and buffering + if command_available?('pv') + "#{ssh_cmd} #{Shellwords.escape(dump_cmd)} | pv -pterab -B #{buffer_size} | #{restore_cmd}" + else + "#{ssh_cmd} #{Shellwords.escape(dump_cmd)} | #{restore_cmd}" + end + end + end + + def build_optimized_dump_command(config) + cmd_parts = [ + "PGPASSWORD=#{Shellwords.escape(config[:password])}", + 'pg_dump' + ] + + # Use custom format like the original version (-Fc) + cmd_parts << '-Fc' + cmd_parts << '--verbose' if fetch(:postgres_verbose, true) + cmd_parts << '--no-acl' + cmd_parts << '--no-owner' + + # Optimize dump performance + cmd_parts << '--compress=0' # Don't compress in pg_dump, we'll handle it in pipeline + + # Add synchronous_commit=off for faster dumping (if safe) + if fetch(:postgres_fast_dump, false) + cmd_parts << '--set=synchronous_commit=off' + end + + # Add connection parameters + cmd_parts << "--host=#{config[:host]}" if config[:host] != 'localhost' + cmd_parts << "--port=#{config[:port]}" if config[:port] != 5432 + cmd_parts << "--username=#{config[:username]}" if config[:username] + + # Add table exclusions + exclude_tables = fetch(:postgres_backup_exclude_table, []) + exclude_tables = exclude_tables.call if exclude_tables.respond_to?(:call) + exclude_tables.each do |table| + cmd_parts << "--exclude-table=#{table}" + end + + # Add table data exclusions + exclude_table_data = fetch(:postgres_backup_exclude_table_data, []) + exclude_table_data = exclude_table_data.call if exclude_table_data.respond_to?(:call) + exclude_table_data.each do |table| + cmd_parts << "--exclude-table-data=#{table}" + end + + # Add specific tables if specified + backup_tables = fetch(:postgres_backup_table, []) + backup_tables = backup_tables.call if backup_tables.respond_to?(:call) + backup_tables.each do |table| + cmd_parts << "--table=#{table}" + end + + # Add database name + cmd_parts << config[:database] + + cmd_parts.join(' ') + end + + def build_optimized_ssh_command(remote_host) + ssh_parts = ['ssh'] + + # Performance optimizations for SSH + ssh_parts << '-o Compression=no' # Disable SSH compression if we're using gzip + ssh_parts << '-o TCPKeepAlive=yes' + ssh_parts << '-o ServerAliveInterval=60' + ssh_parts << '-o ServerAliveCountMax=3' + + # Add port if specified + ssh_parts << "-p #{remote_host.port}" if remote_host.port && remote_host.port != 22 + + # Add SSH key if specified + if remote_host.ssh_options && remote_host.ssh_options[:keys] + key_file = remote_host.ssh_options[:keys].first + ssh_parts << "-i #{key_file}" + end + + # Use SSH multiplexing if available for better performance + if fetch(:postgres_ssh_multiplexing, true) + ssh_parts << '-o ControlMaster=auto' + ssh_parts << '-o ControlPath=/tmp/ssh_mux_%h_%p_%r' + ssh_parts << '-o ControlPersist=10m' + end + + # Add connection details + ssh_parts << "#{remote_host.user}@#{remote_host.hostname}" + + ssh_parts.join(' ') + end + + def get_optimal_parallel_jobs + # Get user-configured value or auto-detect + configured_jobs = fetch(:postgres_restore_jobs, nil) + return configured_jobs if configured_jobs + + # Auto-detect CPU cores + cpu_cores = detect_cpu_cores + + # Conservative parallel job calculation + # Use 75% of cores, minimum 1, maximum 8 (to avoid overwhelming the database) + optimal_jobs = [(cpu_cores * 0.75).ceil, 1].max + optimal_jobs = [optimal_jobs, 8].min + + puts "Auto-detected #{cpu_cores} CPU cores, using #{optimal_jobs} parallel restore jobs" + optimal_jobs + end + + def detect_cpu_cores + cores = 1 # fallback + + run_locally do + # Try different methods to detect CPU cores + begin + if test('which nproc') + # Linux + cores = capture('nproc').strip.to_i + elsif test('which sysctl') + # macOS/BSD + cores = capture('sysctl -n hw.ncpu').strip.to_i + elsif File.exist?('/proc/cpuinfo') + # Linux fallback + cores = capture('grep -c processor /proc/cpuinfo').strip.to_i + else + # Ruby fallback + require 'etc' + cores = Etc.nprocessors + end + rescue => e + warn "Could not detect CPU cores: #{e.message}, using 1 core" + cores = 1 + end + end + + cores > 0 ? cores : 1 + end + + def command_available?(command) + run_locally do + test("which #{command}") + end + rescue + false + end + + def estimate_performance_improvement + cpu_cores = detect_cpu_cores + + # Note: Streaming mode uses single-threaded restore (pg_restore --jobs doesn't work with stdin) + puts "Streaming mode: single-threaded restore (parallel restore requires files, not stdin)" + puts "Performance benefit: no local disk I/O, immediate streaming" + puts "System: #{cpu_cores} CPU cores detected" + end end