diff --git a/lib/Data/Consumer/MySQL2.pm b/lib/Data/Consumer/MySQL2.pm index 7deaa62..acc3d2d 100644 --- a/lib/Data/Consumer/MySQL2.pm +++ b/lib/Data/Consumer/MySQL2.pm @@ -323,7 +323,10 @@ sub acquire { $self->debug_warn( 5, "last_id was $self->{last_id}"); my ($id)= $dbh->selectrow_array( $self->{select_sql}, undef, $self->{last_id}, @{ $self->{select_args} || [] } ); if ( defined $id ) { - next if $self->is_ignored($id); + if ( $self->is_ignored($id) ) { + $self->{last_id}= $id; + next; + } my ($got_id) = $dbh->selectrow_array( $self->{check_sql}, undef, $id, @{ $self->{check_args} || [] } ); if ( not defined $got_id) { $self->debug_warn(5, "race condition avoided for '$id', check_sql and select_sql did not line up!"); diff --git a/t/01-mysql.t b/t/01-mysql.t index 6549609..11ece0d 100644 --- a/t/01-mysql.t +++ b/t/01-mysql.t @@ -7,6 +7,7 @@ use DBI; my $debug = @ARGV ? shift : $ENV{TEST_DEBUG}; our @fake_error; our @expect_fail; +our %ignored; our %process_state; our @connect_args; our $table; @@ -122,6 +123,11 @@ $consumer->consume(sub { my ($consumer,$id,$dbh) = @_; $debug and $consumer->debug_warn(0,"*** processing '$id'"); $debug and $consumer->debug_warn(0,$id,Dumper($dbh->selectrow_arrayref("select IS_USED_LOCK(CONCAT_WS('=','$0-$table',$id))"))); + if($ignored{$id}) { + $debug and $consumer->debug_warn(0,"* ignoring '$id' as requested\n"); + $consumer->ignore(); + return; + } sleep(1); $dbh->do("UPDATE `$table` SET `n` = `n` + 1 WHERE `id` = ?", undef, $id); diff --git a/t/07-mysql-ignore.t b/t/07-mysql-ignore.t new file mode 100644 index 0000000..173f6e6 --- /dev/null +++ b/t/07-mysql-ignore.t @@ -0,0 +1,26 @@ +use strict; +use warnings; +use Cwd; +our %process_state = ( + unprocessed => 94, + processed => 10, + working => 123, + failed => 666, +); + +our $object='Data::Consumer'; +our @expect_fail=([3,0,94]); +our %ignored=(3=>1); + +%ignored = %ignored; #silence warnings on 5.6.2 +@expect_fail = @expect_fail; #silence warnings on 5.6.2 +$object = $object; #silence warnings on 5.6.2 +%process_state = %process_state; # silence warnings on 5.6.2 + +my $file='t/01-mysql.t'; +my $res = do $file; +if (!defined $res) { + die "Error executing '$file': ",$@||$!,"\nCwd=". cwd(),"\n"; +} + +