diff --git a/Changes b/Changes index e99b524..1b69cb0 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,10 @@ Revision history for Data-Consumer +0.18 ???? + Remove GET_LOCK from MySQL2 - as of 5.7 the GET_LOCK() trick + doesn't work, as a lock request does not free up a previously + held lock. + 0.17 Wed Nov 1 2017 Fix a bug with ignored in MySQL2, thanks to Maksym Davydov diff --git a/lib/Data/Consumer.pm b/lib/Data/Consumer.pm index 840885f..5986901 100644 --- a/lib/Data/Consumer.pm +++ b/lib/Data/Consumer.pm @@ -136,7 +136,8 @@ ignored. =item release -This routine is to release any held locks in the object. +This routine is to release any held locks in the object. It is called at the very end of +processing, and by reset. =item _mark_as @@ -721,7 +722,8 @@ Returns an identifier to be used to identify the item acquired. Release any locks on the currently held item. -Normally there is no need to call this directly. +Normally there is no need to call this directly. It is called by reset() and +at the end of processing. =cut diff --git a/lib/Data/Consumer/MySQL2.pm b/lib/Data/Consumer/MySQL2.pm index 8a76cb5..cb5e0cb 100644 --- a/lib/Data/Consumer/MySQL2.pm +++ b/lib/Data/Consumer/MySQL2.pm @@ -3,7 +3,7 @@ package Data::Consumer::MySQL2; use warnings; use strict; use DBI; -use Carp qw(confess); +use Carp qw(confess cluck); use warnings FATAL => 'all'; use base 'Data::Consumer'; use vars qw/$Debug $VERSION $Cmd $Fail/; @@ -26,11 +26,11 @@ Data::Consumer::MySQL2 - Data::Consumer implementation for a mysql database tabl =head1 VERSION -Version 0.17 +Version 0.18 =cut -$VERSION= '0.17'; +$VERSION= '0.18'; =head1 SYNOPSIS @@ -41,7 +41,6 @@ $VERSION= '0.17'; table => 'T', id_field= > 'id', flag_field => 'done', - lock_prefix => $worker_name, unprocessed => 0, working => 1, processed => 2, @@ -88,17 +87,6 @@ The column name of the primary key of the table being processed The column name in the table being processed which shows whether an object is processed or not. 
-=item lock_prefix => 'my-lock-name' - -The prefix to use for the mysql locks. Defaults to C<$0-$table>. - -It is B recommended that end-users of this module explicitly -specify a lock_prefix in production environments. A multi-process -system relying on mutual exclusion B run into problems when -consuming from the same source if $0 and $table are not identical -between workers. Generally, using the name of the consuming module -should suffice (e.g. Your::Data::Consumer::Worker). - =item unprocessed => 0 The value of the C which indicates that an item is not @@ -137,25 +125,32 @@ same time it should ensure that a lock on the id is created. The query will be executed with the id of the last processed item, followed by the arguments provided by the C property. -=item check_sql +=item acquire_sql + +=item acquire_args + +These arguments are optional, and will be synthesized from the other values if +not provided. + +SQL update query that claims an item in the table. If the number of rows updated by +this query is not exactly 1, the item will be skipped. -=item check_args +The query will be executed with the arguments provided by the C<acquire_args> property, +followed by the id of the item returned by select_sql. -These arguments are optional, unless you specify C yourself, in which case -it is required you also specify C as well. +=item release_acquire_sql -SQL select query which can be executed to verify that the item to be processed still has -the expected flag fields set appropriately. +=item release_acquire_args -There is a very annoying and sublte race condition (possibly only in modern MySQL's) which -means that is possible that the query used for C might return an id for a record -which has already been processed. This query is used to avoid that race condition. +These arguments are optional, and will be synthesized from acquire_sql/acquire_args +if not provided. 
-The query should validate any flag fields or constraints specified in C are -true, it should return only the id of the record to be processed. +SQL update query that releases a claimed item in the table. If the number of rows updated by +this query is not exactly 1, a debug warning is emitted. -The query will be executed with the id of the item to process, followed by the arguments -provided by the C property. +The query will be executed with the arguments provided by the C<release_acquire_args> +property, followed by the id of the item being released. If the arguments +are not provided then they will be the *list* reverse of the acquire_args. =item update_sql @@ -168,16 +163,6 @@ SQL update query which can be used to change the status the record being process Will be executed with the arguments provided in update_args followed the new status, and the id. -=item release_sql - -=item release_args - -These arguments are optional, and will be synthesized from the other values if not provided. - -SQL select query which can be used to clear the currently held lock. - -Will be called with the arguments provided in release_args, plust the id. 
- =back =cut @@ -187,7 +172,7 @@ sub new { my $self= $class->SUPER::new(); # let Data::Consumer bless the hash my @bad; - foreach my $opt (qw(unprocessed processed working failed lock_prefix)) { + foreach my $opt (qw(unprocessed processed working failed)) { if (ref $opts{$opt}) { push @bad, "option '$opt' is not allowed to be a ref in DC::MySQL2"; } elsif (!defined $opts{$opt}) { @@ -214,9 +199,9 @@ sub new { $opts{flag_field} ||= 'process_state'; $opts{init_id}= 0 unless exists $opts{init_id}; - if (!$opts{check_sql} and $opts{select_sql}) { - confess "In $class if you specify 'select_sql' you MUST provide 'check_sql' as well!"; - } + if ($opts{lock_prefix}) { cluck "Ignoring 'lock_prefix' this version does not use GET_LOCK()" } + if ($opts{check_sql} || $opts{check_args}) { confess "This version does not support 'check_sql', 'check_args'" } + if ($opts{release_sql} || $opts{release_args}) { confess "This version does not support 'release_sql', 'release_args'" } unless ( $opts{select_sql} ) { $opts{select_sql}= do { @@ -227,29 +212,33 @@ sub new { WHERE $id_field > ? AND $flag_field = ? - AND GET_LOCK( CONCAT_WS("=", ?, $id_field ), 0) != 0 LIMIT 1 '; s/^\s+//mg; s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; $_; }; - $opts{select_args}= [ $opts{unprocessed}, $opts{lock_prefix} ]; + $opts{select_args}= [ $opts{unprocessed} ]; + } - $opts{check_sql}= do { + unless ( $opts{acquire_sql} ) { + $opts{acquire_sql}= do { local $_= ' - SELECT - $id_field - FROM $table + UPDATE $table + SET $flag_field = ? WHERE - $id_field = ? - AND $flag_field = ? + $flag_field = ? + AND $id_field = ? 
'; s/^\s+//mg; s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; $_; }; - $opts{check_args}= [ $opts{unprocessed} ]; + $opts{acquire_args}= [ $opts{working}, $opts{unprocessed} ]; + } + unless ( exists $opts{release_acquire_sql} ) { + $opts{release_acquire_sql}= $opts{acquire_sql}; + $opts{release_acquire_args}= [ reverse @{$opts{acquire_args}} ]; } $opts{update_sql} ||= do { @@ -263,17 +252,7 @@ sub new { s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; $_; }; - if ( !$opts{release_sql} ) { - $opts{release_sql}= do { - local $_= ' - SELECT RELEASE_LOCK( CONCAT_WS("=", ?, ? ) ) - '; - s/^\s+//mg; - s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; - $_; - }; - $opts{release_args}= [ $opts{lock_prefix} ]; - } + %$self= %opts; return $self; @@ -291,7 +270,7 @@ Returns an identifier to be used to identify the item acquired. =head2 $object->release() -Release any locks on the currently held item. +Release any locks on the currently held item. This is called by reset, and at the end of processing. Normally there is no need to call this directly. 
@@ -322,22 +301,19 @@ sub acquire { while (1) { $self->debug_warn( 5, "last_id was $self->{last_id}"); my ($id)= $dbh->selectrow_array( $self->{select_sql}, undef, $self->{last_id}, @{ $self->{select_args} || [] } ); + $self->{last_id}= $id; if ( defined $id ) { - if ( $self->is_ignored($id) ) { - $self->{last_id}= $id; - next; - } - my ($got_id) = $dbh->selectrow_array( $self->{check_sql}, undef, $id, @{ $self->{check_args} || [] } ); - if ( not defined $got_id) { - $self->debug_warn(5, "race condition avoided for '$id', check_sql and select_sql did not line up!"); + next if $self->is_ignored($id); + + my $claimed = $dbh->do( $self->{acquire_sql}, undef, @{ $self->{acquire_args} || [] }, $id ); + if ($claimed != 1) { + $self->debug_warn(5, "failed to claim '$id', acquire_sql updated $claimed rows"); next; } - $self->{last_lock}= $id; $self->debug_warn( 5, "acquired '$id'" ); } else { $self->debug_warn( 5, "acquire failed -- resource has been exhausted" ); } - $self->{last_id}= $id; last; } return $self->{last_id}; @@ -347,17 +323,19 @@ sub acquire { sub release { my $self= shift; - return 0 unless exists $self->{last_lock}; + my $id= delete $self->{last_id}; + + return 0 unless defined $id && $self->{release_acquire_sql}; - my $res= - $self->{dbh} - ->do( $self->{release_sql}, undef, @{ $self->{release_args} || [] }, $self->{last_lock} ); - defined $res - or $self->error( "Failed to execute '$self->{release_sql}' with args '$self->{last_lock}': " - . $self->{dbh}->errstr() ); + my $dbh= $self->{dbh}; + + my $released = $dbh->do( $self->{release_acquire_sql}, undef, @{ $self->{release_acquire_args} || [] }, $id ); + if ($released != 1) { + $self->debug_warn(5, "failed to release '$id', release_acquire_sql updated $released rows"); + } else { + $self->debug_warn( 5, "released '$id'" ); + } - $self->debug_warn( 5, "release lock '$self->{last_lock}' status: $res" ); # XXX - delete $self->{last_lock}; return 1; }