From 9206712f7cb07590ea92b6a84e20999ac22e539d Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Fri, 30 Dec 2016 12:48:46 -0800 Subject: [PATCH 01/11] Add support for streaming stdout/stderr from Child invocations --- lib/posix/spawn/child.rb | 27 +++++++++++++++++++++++++-- test/test_child.rb | 30 ++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/lib/posix/spawn/child.rb b/lib/posix/spawn/child.rb index 98e3ae2..70973e2 100644 --- a/lib/posix/spawn/child.rb +++ b/lib/posix/spawn/child.rb @@ -95,6 +95,13 @@ def initialize(*args) @options[:pgroup] = true end @options.delete(:chdir) if @options[:chdir].nil? + @streaming = false + if streams = @options.delete(:streams) + @stdout_block = streams[:stdout] + @stderr_block = streams[:stderr] + + @streaming = !!@stdout_block || !!@stderr_block + end exec! if !@options.delete(:noexec) end @@ -244,14 +251,30 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) # read from stdout and stderr streams ready[0].each do |fd| - buf = (fd == stdout) ? @out : @err + chunk = nil begin - buf << fd.readpartial(BUFSIZE) + chunk = fd.readpartial(BUFSIZE) rescue Errno::EAGAIN, Errno::EINTR rescue EOFError readers.delete(fd) fd.close end + + if chunk + if fd == stdout + if @streaming && @stdout_block + @stdout_block.call(chunk) + else + @out << chunk + end + else + if @streaming && @stderr_block + @stderr_block.call(chunk) + else + @err << chunk + end + end + end end # keep tabs on the total amount of time we've spent here diff --git a/test/test_child.rb b/test/test_child.rb index e869c64..3362518 100644 --- a/test/test_child.rb +++ b/test/test_child.rb @@ -224,6 +224,36 @@ def test_utf8_input_long assert p.success? end + def test_streaming_stdout + stdout_buf = "" + stdout_stream = Proc.new do |chunk| + stdout_buf << chunk + end + + input = "hello!" + p = Child.new('cat', :input => input, :streams => { + :stdout => stdout_stream + }) + + assert p.success? 
+ assert_equal input, stdout_buf + end + + def test_streaming_stderr + stderr_buf = "" + stderr_stream = Proc.new do |chunk| + stderr_buf << chunk + end + + input = "hello!" + p = Child.new('ls', '-?', :input => input, :streams => { + :stderr => stderr_stream + }) + + refute p.success? + refute stderr_buf.empty? + end + ## # Assertion Helpers From 0c02f3337575a9fcdd1ba6135882afcbb0aaf290 Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Fri, 30 Dec 2016 13:21:14 -0800 Subject: [PATCH 02/11] allow aborting early while streaming --- lib/posix/spawn.rb | 4 ++++ lib/posix/spawn/child.rb | 9 +++++++-- test/test_child.rb | 33 +++++++++++++++++++++++++++++++-- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/lib/posix/spawn.rb b/lib/posix/spawn.rb index ac01638..10f55bc 100644 --- a/lib/posix/spawn.rb +++ b/lib/posix/spawn.rb @@ -337,6 +337,10 @@ class MaximumOutputExceeded < StandardError class TimeoutExceeded < StandardError end + # Exception raised when output streaming is aborted early. 
+ class Aborted < StandardError + end + private # Turns the various varargs incantations supported by Process::spawn into a diff --git a/lib/posix/spawn/child.rb b/lib/posix/spawn/child.rb index 70973e2..87b6bf3 100644 --- a/lib/posix/spawn/child.rb +++ b/lib/posix/spawn/child.rb @@ -260,21 +260,26 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) fd.close end + abort = false if chunk if fd == stdout if @streaming && @stdout_block - @stdout_block.call(chunk) + abort = !!@stdout_block.call(chunk) else @out << chunk end else if @streaming && @stderr_block - @stderr_block.call(chunk) + abort = !!@stderr_block.call(chunk) else @err << chunk end end end + + if @streaming && abort + raise Aborted + end end # keep tabs on the total amount of time we've spent here diff --git a/test/test_child.rb b/test/test_child.rb index 3362518..14118fb 100644 --- a/test/test_child.rb +++ b/test/test_child.rb @@ -228,6 +228,8 @@ def test_streaming_stdout stdout_buf = "" stdout_stream = Proc.new do |chunk| stdout_buf << chunk + + false end input = "hello!" @@ -243,10 +245,11 @@ def test_streaming_stderr stderr_buf = "" stderr_stream = Proc.new do |chunk| stderr_buf << chunk + + false end - input = "hello!" - p = Child.new('ls', '-?', :input => input, :streams => { + p = Child.new('ls', '-?', :streams => { :stderr => stderr_stream }) @@ -254,6 +257,32 @@ def test_streaming_stderr refute stderr_buf.empty? end + def test_streaming_stdout_aborted + stdout_stream = Proc.new do |chunk| + true + end + + input = "hello!" + assert_raises POSIX::Spawn::Aborted do + p = Child.new('cat', :input => input, :streams => { + :stdout => stdout_stream + }) + end + end + + def test_streaming_stderr_aborted + stderr_stream = Proc.new do |chunk| + true + end + + input = "hello!" 
+ assert_raises POSIX::Spawn::Aborted do + p = Child.new('ls', '-?', :streams => { + :stderr => stderr_stream + }) + end + end + ## # Assertion Helpers From b6e35f8fde34fcb13c888bb47007f57dddfa2219 Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Tue, 3 Jan 2017 09:16:48 -0800 Subject: [PATCH 03/11] Switch to a writer interface from using a proc This requires that the stdout and stderr stream objects passed respond to `#write` and `#string` methods. --- lib/posix/spawn/child.rb | 50 ++++++++++++++++++---------------------- test/test_child.rb | 30 +++++++++--------------- 2 files changed, 34 insertions(+), 46 deletions(-) diff --git a/lib/posix/spawn/child.rb b/lib/posix/spawn/child.rb index 87b6bf3..d7c99fb 100644 --- a/lib/posix/spawn/child.rb +++ b/lib/posix/spawn/child.rb @@ -95,13 +95,15 @@ def initialize(*args) @options[:pgroup] = true end @options.delete(:chdir) if @options[:chdir].nil? - @streaming = false - if streams = @options.delete(:streams) - @stdout_block = streams[:stdout] - @stderr_block = streams[:stderr] - @streaming = !!@stdout_block || !!@stderr_block + @stdout_buffer = StringIO.new + @stderr_buffer = StringIO.new + + if streams = @options.delete(:streams) + @stdout_buffer = streams[:stdout] if streams[:stdout] + @stderr_buffer = streams[:stderr] if streams[:stderr] end + exec! if !@options.delete(:noexec) end @@ -131,10 +133,14 @@ def self.build(*args) end # All data written to the child process's stdout stream as a String. - attr_reader :out + def out + @stdout_buffer.string + end # All data written to the child process's stderr stream as a String. - attr_reader :err + def err + @stderr_buffer.string + end # A Process::Status object with information on how the child exited. attr_reader :status @@ -203,16 +209,16 @@ def exec! # exceeds the amount specified by the max argument. 
def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) max = nil if max && max <= 0 - @out, @err = '', '' # force all string and IO encodings to BINARY under 1.9 for now - if @out.respond_to?(:force_encoding) and stdin.respond_to?(:set_encoding) + if @stdout_buffer.respond_to?(:set_encoding) + bin_encoding = Encoding::BINARY [stdin, stdout, stderr].each do |fd| - fd.set_encoding('BINARY', 'BINARY') + fd.set_encoding(bin_encoding, bin_encoding) end - @out.force_encoding('BINARY') - @err.force_encoding('BINARY') - input = input.dup.force_encoding('BINARY') if input + @stdout_buffer.set_encoding(bin_encoding) + @stderr_buffer.set_encoding(bin_encoding) + input = input.dup.force_encoding(bin_encoding) if input end timeout = nil if timeout && timeout <= 0.0 @@ -263,21 +269,13 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) abort = false if chunk if fd == stdout - if @streaming && @stdout_block - abort = !!@stdout_block.call(chunk) - else - @out << chunk - end + abort = (@stdout_buffer.write(chunk) == 0) else - if @streaming && @stderr_block - abort = !!@stderr_block.call(chunk) - else - @err << chunk - end + abort = (@stderr_buffer.write(chunk) == 0) end end - if @streaming && abort + if abort raise Aborted end end @@ -290,12 +288,10 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) end # maybe we've hit our max output - if max && ready[0].any? && (@out.size + @err.size) > max + if max && ready[0].any? && (@stdout_buffer.size + @stderr_buffer.size) > max raise MaximumOutputExceeded end end - - [@out, @err] end # Wait for the child process to exit diff --git a/test/test_child.rb b/test/test_child.rb index 14118fb..4fcaffe 100644 --- a/test/test_child.rb +++ b/test/test_child.rb @@ -131,7 +131,7 @@ def test_max_with_stubborn_child_pgroup_kill def test_max_with_partial_output p = Child.build('yes', :max => 100_000) - assert_nil p.out + assert_empty p.out assert_raises MaximumOutputExceeded do p.exec! 
end @@ -225,12 +225,7 @@ def test_utf8_input_long end def test_streaming_stdout - stdout_buf = "" - stdout_stream = Proc.new do |chunk| - stdout_buf << chunk - - false - end + stdout_stream = StringIO.new input = "hello!" p = Child.new('cat', :input => input, :streams => { @@ -238,28 +233,24 @@ def test_streaming_stdout }) assert p.success? - assert_equal input, stdout_buf + assert_equal input, stdout_stream.string end def test_streaming_stderr - stderr_buf = "" - stderr_stream = Proc.new do |chunk| - stderr_buf << chunk - - false - end + stderr_stream = StringIO.new p = Child.new('ls', '-?', :streams => { :stderr => stderr_stream }) refute p.success? - refute stderr_buf.empty? + assert stderr_stream.size > 0 end def test_streaming_stdout_aborted - stdout_stream = Proc.new do |chunk| - true + stdout_stream = StringIO.new + def stdout_stream.write(chunk) + 0 end input = "hello!" @@ -271,8 +262,9 @@ def test_streaming_stdout_aborted end def test_streaming_stderr_aborted - stderr_stream = Proc.new do |chunk| - true + stderr_stream = StringIO.new + def stderr_stream.write(chunk) + 0 end input = "hello!" From 254704aaad32206cee2304b5dd72a53fb129904d Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Tue, 3 Jan 2017 21:15:49 -0800 Subject: [PATCH 04/11] go back to proc-based streams, cleanup a little --- lib/posix/spawn/child.rb | 62 +++++++++++++++++++++++----------------- test/test_child.rb | 24 +++++++++------- 2 files changed, 49 insertions(+), 37 deletions(-) diff --git a/lib/posix/spawn/child.rb b/lib/posix/spawn/child.rb index d7c99fb..51cb5ee 100644 --- a/lib/posix/spawn/child.rb +++ b/lib/posix/spawn/child.rb @@ -96,12 +96,24 @@ def initialize(*args) end @options.delete(:chdir) if @options[:chdir].nil? 
- @stdout_buffer = StringIO.new - @stderr_buffer = StringIO.new + @out, @err = "", "" + + @stdout_stream = Proc.new do |chunk| + @out << chunk + end + + @stderr_stream = Proc.new do |chunk| + @err << chunk + end if streams = @options.delete(:streams) - @stdout_buffer = streams[:stdout] if streams[:stdout] - @stderr_buffer = streams[:stderr] if streams[:stderr] + if streams[:stdout] + @stdout_stream = streams[:stdout] + end + + if streams[:stderr] + @stderr_stream = streams[:stderr] + end end exec! if !@options.delete(:noexec) @@ -133,14 +145,10 @@ def self.build(*args) end # All data written to the child process's stdout stream as a String. - def out - @stdout_buffer.string - end + attr_reader :out # All data written to the child process's stderr stream as a String. - def err - @stderr_buffer.string - end + attr_reader :err # A Process::Status object with information on how the child exited. attr_reader :status @@ -191,6 +199,11 @@ def exec! # Maximum buffer size for reading BUFSIZE = (32 * 1024) + @@encoding_aware = "".respond_to?(:force_encoding) + def encoding_aware? + @@encoding_aware + end + # Start a select loop writing any input on the child's stdin and reading # any output from the child's stdout or stderr. # @@ -209,15 +222,16 @@ def exec! # exceeds the amount specified by the max argument. def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) max = nil if max && max <= 0 + @out, @err = "", "" # force all string and IO encodings to BINARY under 1.9 for now - if @stdout_buffer.respond_to?(:set_encoding) + if encoding_aware? 
bin_encoding = Encoding::BINARY [stdin, stdout, stderr].each do |fd| fd.set_encoding(bin_encoding, bin_encoding) end - @stdout_buffer.set_encoding(bin_encoding) - @stderr_buffer.set_encoding(bin_encoding) + @out.force_encoding(bin_encoding) + @err.force_encoding(bin_encoding) input = input.dup.force_encoding(bin_encoding) if input end @@ -236,6 +250,9 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) slice_method = input.respond_to?(:byteslice) ? :byteslice : :slice t = timeout + streams = {stdout => @stdout_stream, stderr => @stderr_stream} + + bytes_seen = 0 while readers.any? || writers.any? ready = IO.select(readers, writers, readers + writers, t) raise TimeoutExceeded if ready.nil? @@ -260,24 +277,15 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) chunk = nil begin chunk = fd.readpartial(BUFSIZE) + + raise Aborted unless streams[fd].call(chunk) + + bytes_seen += chunk.bytesize rescue Errno::EAGAIN, Errno::EINTR rescue EOFError readers.delete(fd) fd.close end - - abort = false - if chunk - if fd == stdout - abort = (@stdout_buffer.write(chunk) == 0) - else - abort = (@stderr_buffer.write(chunk) == 0) - end - end - - if abort - raise Aborted - end end # keep tabs on the total amount of time we've spent here @@ -288,7 +296,7 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) end # maybe we've hit our max output - if max && ready[0].any? && (@stdout_buffer.size + @stderr_buffer.size) > max + if max && ready[0].any? && bytes_seen > max raise MaximumOutputExceeded end end diff --git a/test/test_child.rb b/test/test_child.rb index 4fcaffe..fba8f66 100644 --- a/test/test_child.rb +++ b/test/test_child.rb @@ -225,7 +225,10 @@ def test_utf8_input_long end def test_streaming_stdout - stdout_stream = StringIO.new + stdout_buff = "" + stdout_stream = Proc.new do |chunk| + stdout_buff << chunk + end input = "hello!" 
p = Child.new('cat', :input => input, :streams => { @@ -233,24 +236,26 @@ def test_streaming_stdout }) assert p.success? - assert_equal input, stdout_stream.string + assert_equal input, stdout_buff end def test_streaming_stderr - stderr_stream = StringIO.new + stderr_buff = "" + stderr_stream = Proc.new do |chunk| + stderr_buff << chunk + end p = Child.new('ls', '-?', :streams => { :stderr => stderr_stream }) refute p.success? - assert stderr_stream.size > 0 + assert stderr_buff.size > 0 end def test_streaming_stdout_aborted - stdout_stream = StringIO.new - def stdout_stream.write(chunk) - 0 + stdout_stream = Proc.new do |chunk| + false end input = "hello!" @@ -262,9 +267,8 @@ def stdout_stream.write(chunk) end def test_streaming_stderr_aborted - stderr_stream = StringIO.new - def stderr_stream.write(chunk) - 0 + stderr_stream = Proc.new do |chunk| + false end input = "hello!" From 25dc138f7e8719e6181096a3d9fd5a480ad17938 Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Tue, 3 Jan 2017 21:24:58 -0800 Subject: [PATCH 05/11] be explicit with default streams --- lib/posix/spawn/child.rb | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/posix/spawn/child.rb b/lib/posix/spawn/child.rb index 51cb5ee..d1f4e43 100644 --- a/lib/posix/spawn/child.rb +++ b/lib/posix/spawn/child.rb @@ -100,10 +100,14 @@ def initialize(*args) @stdout_stream = Proc.new do |chunk| @out << chunk + + true end @stderr_stream = Proc.new do |chunk| @err << chunk + + true end if streams = @options.delete(:streams) From 54647e65c70fceac31fd58756989b6033b4f6a47 Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Tue, 3 Jan 2017 21:26:13 -0800 Subject: [PATCH 06/11] just to keep the diff smaller --- lib/posix/spawn/child.rb | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/posix/spawn/child.rb b/lib/posix/spawn/child.rb index d1f4e43..a0d5c0a 100644 --- a/lib/posix/spawn/child.rb +++ b/lib/posix/spawn/child.rb @@ -226,17 +226,16 @@ def encoding_aware? 
# exceeds the amount specified by the max argument. def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) max = nil if max && max <= 0 - @out, @err = "", "" + @out, @err = '', '' # force all string and IO encodings to BINARY under 1.9 for now if encoding_aware? - bin_encoding = Encoding::BINARY [stdin, stdout, stderr].each do |fd| - fd.set_encoding(bin_encoding, bin_encoding) + fd.set_encoding('BINARY', 'BINARY') end - @out.force_encoding(bin_encoding) - @err.force_encoding(bin_encoding) - input = input.dup.force_encoding(bin_encoding) if input + @out.force_encoding('BINARY') + @err.force_encoding('BINARY') + input = input.dup.force_encoding('BINARY') if input end timeout = nil if timeout && timeout <= 0.0 From f263cfcc4fa575e1f4d707fb8f3b3e6915915d7c Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Tue, 3 Jan 2017 21:27:07 -0800 Subject: [PATCH 07/11] a couple of other reverts --- lib/posix/spawn/child.rb | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/posix/spawn/child.rb b/lib/posix/spawn/child.rb index a0d5c0a..2d617ea 100644 --- a/lib/posix/spawn/child.rb +++ b/lib/posix/spawn/child.rb @@ -203,11 +203,6 @@ def exec! # Maximum buffer size for reading BUFSIZE = (32 * 1024) - @@encoding_aware = "".respond_to?(:force_encoding) - def encoding_aware? - @@encoding_aware - end - # Start a select loop writing any input on the child's stdin and reading # any output from the child's stdout or stderr. # @@ -229,7 +224,7 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) @out, @err = '', '' # force all string and IO encodings to BINARY under 1.9 for now - if encoding_aware? 
+ if @out.respond_to?(:force_encoding) and stdin.respond_to?(:set_encoding) [stdin, stdout, stderr].each do |fd| fd.set_encoding('BINARY', 'BINARY') end From ac327f00536db4fe883b01e7ca33ed806d45ac77 Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Tue, 3 Jan 2017 21:48:57 -0800 Subject: [PATCH 08/11] reuse a buffer for reading chunks from streams --- lib/posix/spawn/child.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/posix/spawn/child.rb b/lib/posix/spawn/child.rb index 2d617ea..0140583 100644 --- a/lib/posix/spawn/child.rb +++ b/lib/posix/spawn/child.rb @@ -251,6 +251,7 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) streams = {stdout => @stdout_stream, stderr => @stderr_stream} bytes_seen = 0 + chunk_buffer = "" while readers.any? || writers.any? ready = IO.select(readers, writers, readers + writers, t) raise TimeoutExceeded if ready.nil? @@ -272,13 +273,12 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil) # read from stdout and stderr streams ready[0].each do |fd| - chunk = nil begin - chunk = fd.readpartial(BUFSIZE) + fd.readpartial(BUFSIZE, chunk_buffer) - raise Aborted unless streams[fd].call(chunk) + raise Aborted unless streams[fd].call(chunk_buffer) - bytes_seen += chunk.bytesize + bytes_seen += chunk_buffer.bytesize rescue Errno::EAGAIN, Errno::EINTR rescue EOFError readers.delete(fd) From 4bf5d80fb2aae4f774bd477b002d407af2e417f8 Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Tue, 3 Jan 2017 23:44:16 -0800 Subject: [PATCH 09/11] test crossing default buffer boundary --- test/test_child.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/test_child.rb b/test/test_child.rb index fba8f66..fd46c18 100644 --- a/test/test_child.rb +++ b/test/test_child.rb @@ -279,6 +279,24 @@ def test_streaming_stderr_aborted end end + def test_streaming_stdout_over_default_bufsize_boundary + stdout_buff = "" + chunk_count = 0 + stdout_stream = Proc.new do |chunk| + 
chunk_count += 1 + stdout_buff << chunk + end + + limit = Child::BUFSIZE * 2 + + begin + Child.new('yes', :streams => {:stdout => stdout_stream}, :max => limit) + rescue POSIX::Spawn::MaximumOutputExceeded + end + + assert chunk_count > 1 + end + ## # Assertion Helpers From 90acd3013ac71ccc39affa76221c00f831028472 Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Wed, 4 Jan 2017 09:57:42 -0800 Subject: [PATCH 10/11] use assert_raises --- test/test_child.rb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/test_child.rb b/test/test_child.rb index fd46c18..8ecaa9c 100644 --- a/test/test_child.rb +++ b/test/test_child.rb @@ -289,9 +289,8 @@ def test_streaming_stdout_over_default_bufsize_boundary limit = Child::BUFSIZE * 2 - begin + assert_raises POSIX::Spawn::MaximumOutputExceeded do Child.new('yes', :streams => {:stdout => stdout_stream}, :max => limit) - rescue POSIX::Spawn::MaximumOutputExceeded end assert chunk_count > 1 From c83a7ef38a1860068b56106de6570a3319ce042a Mon Sep 17 00:00:00 2001 From: Brian Lopez Date: Wed, 4 Jan 2017 16:18:39 -0800 Subject: [PATCH 11/11] add failing test making sure we only ever receive :max bytes --- test/test_child.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/test/test_child.rb b/test/test_child.rb index 8ecaa9c..e09a458 100644 --- a/test/test_child.rb +++ b/test/test_child.rb @@ -293,6 +293,7 @@ def test_streaming_stdout_over_default_bufsize_boundary Child.new('yes', :streams => {:stdout => stdout_stream}, :max => limit) end + assert_equal limit, stdout_buff.bytesize assert chunk_count > 1 end