From f8a432b52c16a370f1bf2ff207025b16e911864d Mon Sep 17 00:00:00 2001 From: Maksym Davydov Date: Tue, 19 Sep 2017 13:12:39 +0200 Subject: [PATCH 1/2] Fixed consumer getting stuck in acquire() after item is explicitly ignored Without a fix, if we encounter this scenario, we will get an infinite loop in acquire(): 1. start consuming 2. explicitly ignore() any item 3. continue consuming 4. wait till reset() is called (queue is exhausted or consume() called again) 4. try consume more items 5. acquire() receives already ignored item 6. item is ignored but $self->{last_id} is not updated 7. we skip item and loop back to #5, where process same item again Fix just updates $self->{last_id} when we skip an ignored item. This logic is similar to original logic in Data::Consumer::MySQL. --- lib/Data/Consumer/MySQL2.pm | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/Data/Consumer/MySQL2.pm b/lib/Data/Consumer/MySQL2.pm index 7deaa62..acc3d2d 100644 --- a/lib/Data/Consumer/MySQL2.pm +++ b/lib/Data/Consumer/MySQL2.pm @@ -323,7 +323,10 @@ sub acquire { $self->debug_warn( 5, "last_id was $self->{last_id}"); my ($id)= $dbh->selectrow_array( $self->{select_sql}, undef, $self->{last_id}, @{ $self->{select_args} || [] } ); if ( defined $id ) { - next if $self->is_ignored($id); + if ( $self->is_ignored($id) ) { + $self->{last_id}= $id; + next; + } my ($got_id) = $dbh->selectrow_array( $self->{check_sql}, undef, $id, @{ $self->{check_args} || [] } ); if ( not defined $got_id) { $self->debug_warn(5, "race condition avoided for '$id', check_sql and select_sql did not line up!"); From 0644a2fb11ade2017e4cd5e56bcbceff8eac8306 Mon Sep 17 00:00:00 2001 From: Maksym Davydov Date: Tue, 19 Sep 2017 13:41:41 +0200 Subject: [PATCH 2/2] Added test case for infinite loop in acquire() after item is explicitely ignored --- t/01-mysql.t | 6 ++++++ t/07-mysql-ignore.t | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 t/07-mysql-ignore.t diff --git a/t/01-mysql.t b/t/01-mysql.t index 6549609..11ece0d 100644 --- a/t/01-mysql.t +++ b/t/01-mysql.t @@ -7,6 +7,7 @@ use DBI; my $debug = @ARGV ? shift : $ENV{TEST_DEBUG}; our @fake_error; our @expect_fail; +our %ignored; our %process_state; our @connect_args; our $table; @@ -122,6 +123,11 @@ $consumer->consume(sub { my ($consumer,$id,$dbh) = @_; $debug and $consumer->debug_warn(0,"*** processing '$id'"); $debug and $consumer->debug_warn(0,$id,Dumper($dbh->selectrow_arrayref("select IS_USED_LOCK(CONCAT_WS('=','$0-$table',$id))"))); + if($ignored{$id}) { + $debug and $consumer->debug_warn(0,"* ignoring '$id' as requested\n"); + $consumer->ignore(); + return; + } sleep(1); $dbh->do("UPDATE `$table` SET `n` = `n` + 1 WHERE `id` = ?", undef, $id); diff --git a/t/07-mysql-ignore.t b/t/07-mysql-ignore.t new file mode 100644 index 0000000..173f6e6 --- /dev/null +++ b/t/07-mysql-ignore.t @@ -0,0 +1,26 @@ +use strict; +use warnings; +use Cwd; +our %process_state = ( + unprocessed => 94, + processed => 10, + working => 123, + failed => 666, +); + +our $object='Data::Consumer'; +our @expect_fail=([3,0,94]); +our %ignored=(3=>1); + +%ignored = %ignored; #silence warnings on 5.6.2 +@expect_fail = @expect_fail; #silence warnings on 5.6.2 +$object = $object; #silence warnings on 5.6.2 +%process_state = %process_state; # silence warnings on 5.6.2 + +my $file='t/01-mysql.t'; +my $res = do $file; +if (!defined $res) { + die "Error executing '$file': ",$@||$!,"\nCwd=". cwd(),"\n"; +} + +