From 93687a99c0c67e6b67f133e5893fce85171b60bc Mon Sep 17 00:00:00 2001 From: Yves Orton Date: Wed, 1 Nov 2017 21:37:22 +0100 Subject: [PATCH 1/5] Data::Consumer::MySQL2 more generic fix for last_id not updated problem --- lib/Data/Consumer/MySQL2.pm | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/Data/Consumer/MySQL2.pm b/lib/Data/Consumer/MySQL2.pm index 8a76cb5..a74fbe7 100644 --- a/lib/Data/Consumer/MySQL2.pm +++ b/lib/Data/Consumer/MySQL2.pm @@ -322,11 +322,10 @@ sub acquire { while (1) { $self->debug_warn( 5, "last_id was $self->{last_id}"); my ($id)= $dbh->selectrow_array( $self->{select_sql}, undef, $self->{last_id}, @{ $self->{select_args} || [] } ); + $self->{last_id}= $id; if ( defined $id ) { - if ( $self->is_ignored($id) ) { - $self->{last_id}= $id; - next; - } + next if $self->is_ignored($id); + my ($got_id) = $dbh->selectrow_array( $self->{check_sql}, undef, $id, @{ $self->{check_args} || [] } ); if ( not defined $got_id) { $self->debug_warn(5, "race condition avoided for '$id', check_sql and select_sql did not line up!"); @@ -337,7 +336,6 @@ sub acquire { } else { $self->debug_warn( 5, "acquire failed -- resource has been exhausted" ); } - $self->{last_id}= $id; last; } return $self->{last_id}; From d4278269160565945738949a9de01a9a704134e9 Mon Sep 17 00:00:00 2001 From: Vickenty Fesunov Date: Fri, 19 Aug 2016 13:19:33 +0200 Subject: [PATCH 2/5] Don't use get_lock in the default implementation. get_lock() changes behaviour in MySQL 5.7, which in turn causes workers to lock more rows than necessary. This patch changes the behavior of Data::Consumer::MySQL2 to claim rows by doing SELECT followed by UPDATE, instead of two SELECTs (second select was necessary to validate result of GET_LOCK() since it wasn't atomic, and was a major change from Data::Consumer::MySQL). 
SELECT followed by UPDATE allows two claim strategies to be implemented, both relying on implicit database locks: atomic CAS (used by default) and SELECT FOR UPDATE. --- lib/Data/Consumer/MySQL2.pm | 84 +++++++++++-------------------------- 1 file changed, 25 insertions(+), 59 deletions(-) diff --git a/lib/Data/Consumer/MySQL2.pm b/lib/Data/Consumer/MySQL2.pm index a74fbe7..13d933b 100644 --- a/lib/Data/Consumer/MySQL2.pm +++ b/lib/Data/Consumer/MySQL2.pm @@ -41,7 +41,6 @@ $VERSION= '0.17'; table => 'T', id_field= > 'id', flag_field => 'done', - lock_prefix => $worker_name, unprocessed => 0, working => 1, processed => 2, @@ -88,17 +87,6 @@ The column name of the primary key of the table being processed The column name in the table being processed which shows whether an object is processed or not. -=item lock_prefix => 'my-lock-name' - -The prefix to use for the mysql locks. Defaults to C<$0-$table>. - -It is B recommended that end-users of this module explicitly -specify a lock_prefix in production environments. A multi-process -system relying on mutual exclusion B run into problems when -consuming from the same source if $0 and $table are not identical -between workers. Generally, using the name of the consuming module -should suffice (e.g. Your::Data::Consumer::Worker). - =item unprocessed => 0 The value of the C which indicates that an item is not @@ -137,25 +125,18 @@ same time it should ensure that a lock on the id is created. The query will be executed with the id of the last processed item, followed by the arguments provided by the C property. -=item check_sql +=item acquire_sql -=item check_args +=item acquire_args -These arguments are optional, unless you specify C yourself, in which case -it is required you also specify C as well. - -SQL select query which can be executed to verify that the item to be processed still has -the expected flag fields set appropriately. 
- -There is a very annoying and sublte race condition (possibly only in modern MySQL's) which -means that is possible that the query used for C might return an id for a record -which has already been processed. This query is used to avoid that race condition. +These arguments are optional, and will be synthesized from the other values if +not provided. -The query should validate any flag fields or constraints specified in C are -true, it should return only the id of the record to be processed. +SQL update query that claims item in the table. If number of rows updated by +this query is 0, item will be skipped. -The query will be executed with the id of the item to process, followed by the arguments -provided by the C property. +The query will be executed with the id of the item returned by select_sql, +followed by the arguments provided by the C property. =item update_sql @@ -172,7 +153,7 @@ and the id. =item release_args -These arguments are optional, and will be synthesized from the other values if not provided. +These arguments are optional. SQL select query which can be used to clear the currently held lock. @@ -187,7 +168,7 @@ sub new { my $self= $class->SUPER::new(); # let Data::Consumer bless the hash my @bad; - foreach my $opt (qw(unprocessed processed working failed lock_prefix)) { + foreach my $opt (qw(unprocessed processed working failed)) { if (ref $opts{$opt}) { push @bad, "option '$opt' is not allowed to be a ref in DC::MySQL2"; } elsif (!defined $opts{$opt}) { @@ -214,10 +195,6 @@ sub new { $opts{flag_field} ||= 'process_state'; $opts{init_id}= 0 unless exists $opts{init_id}; - if (!$opts{check_sql} and $opts{select_sql}) { - confess "In $class if you specify 'select_sql' you MUST provide 'check_sql' as well!"; - } - unless ( $opts{select_sql} ) { $opts{select_sql}= do { local $_= ' @@ -227,29 +204,29 @@ sub new { WHERE $id_field > ? AND $flag_field = ? 
- AND GET_LOCK( CONCAT_WS("=", ?, $id_field ), 0) != 0 LIMIT 1 '; s/^\s+//mg; s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; $_; }; - $opts{select_args}= [ $opts{unprocessed}, $opts{lock_prefix} ]; + $opts{select_args}= [ $opts{unprocessed} ]; + } - $opts{check_sql}= do { + unless ( $opts{acquire_sql} ) { + $opts{acquire_sql}= do { local $_= ' - SELECT - $id_field - FROM $table + UPDATE $table + SET $flag_field = ? WHERE - $id_field = ? - AND $flag_field = ? + $flag_field = ? + AND $id_field = ? '; s/^\s+//mg; s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; $_; }; - $opts{check_args}= [ $opts{unprocessed} ]; + $opts{acquire_args}= [ $opts{working}, $opts{unprocessed} ]; } $opts{update_sql} ||= do { @@ -263,17 +240,7 @@ sub new { s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; $_; }; - if ( !$opts{release_sql} ) { - $opts{release_sql}= do { - local $_= ' - SELECT RELEASE_LOCK( CONCAT_WS("=", ?, ? ) ) - '; - s/^\s+//mg; - s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; - $_; - }; - $opts{release_args}= [ $opts{lock_prefix} ]; - } + %$self= %opts; return $self; @@ -325,13 +292,12 @@ sub acquire { $self->{last_id}= $id; if ( defined $id ) { next if $self->is_ignored($id); - - my ($got_id) = $dbh->selectrow_array( $self->{check_sql}, undef, $id, @{ $self->{check_args} || [] } ); - if ( not defined $got_id) { - $self->debug_warn(5, "race condition avoided for '$id', check_sql and select_sql did not line up!"); + + my $claimed = $dbh->do( $self->{acquire_sql}, undef, @{ $self->{acquire_args} || [] }, $id ); + if ($claimed != 1) { + $self->debug_warn(5, "failed to claim '$id', acquire_sql updated $claimed rows"); next; } - $self->{last_lock}= $id; $self->debug_warn( 5, "acquired '$id'" ); } else { $self->debug_warn( 5, "acquire failed -- resource has been exhausted" ); @@ -345,7 +311,7 @@ sub acquire { sub release { my $self= shift; - return 0 unless exists $self->{last_lock}; + return 0 unless exists $self->{last_id} && 
defined $self->{release_sql}; my $res= $self->{dbh} From 2f24409563fc70871ae40bf3609b4688dd04b5ab Mon Sep 17 00:00:00 2001 From: Yves Orton Date: Wed, 1 Nov 2017 21:34:15 +0100 Subject: [PATCH 3/5] check for old arguments --- lib/Data/Consumer/MySQL2.pm | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/Data/Consumer/MySQL2.pm b/lib/Data/Consumer/MySQL2.pm index 13d933b..bd9a164 100644 --- a/lib/Data/Consumer/MySQL2.pm +++ b/lib/Data/Consumer/MySQL2.pm @@ -3,7 +3,7 @@ package Data::Consumer::MySQL2; use warnings; use strict; use DBI; -use Carp qw(confess); +use Carp qw(confess cluck); use warnings FATAL => 'all'; use base 'Data::Consumer'; use vars qw/$Debug $VERSION $Cmd $Fail/; @@ -157,7 +157,7 @@ These arguments are optional. SQL select query which can be used to clear the currently held lock. -Will be called with the arguments provided in release_args, plust the id. +Will be called with the arguments provided in release_args, plus the id. =back @@ -195,6 +195,9 @@ sub new { $opts{flag_field} ||= 'process_state'; $opts{init_id}= 0 unless exists $opts{init_id}; + if ($opts{lock_prefix}) { cluck "Ignoring 'lock_prefix' this, version does not use GET_LOCK()" } + if ($opts{check_sql} || $opts{check_args}) { confess "This version does not support 'check_sql', 'check_args'" } + unless ( $opts{select_sql} ) { $opts{select_sql}= do { local $_= ' From f116befcd310dcf2aeccd983ceb5b254174f6c9c Mon Sep 17 00:00:00 2001 From: Yves Orton Date: Wed, 1 Nov 2017 22:08:42 +0100 Subject: [PATCH 4/5] Rework release now that we do not use GET_LOCK() --- Changes | 5 ++++ lib/Data/Consumer.pm | 6 ++-- lib/Data/Consumer/MySQL2.pm | 57 ++++++++++++++++++++++--------------- 3 files changed, 43 insertions(+), 25 deletions(-) diff --git a/Changes b/Changes index e99b524..1b69cb0 100644 --- a/Changes +++ b/Changes @@ -1,5 +1,10 @@ Revision history for Data-Consumer +0.18 ???? 
+ Remove GET_LOCK from MySQL2 - as of 5.7 the GET_LOCK() trick + doesn't work, as a lock request does not free up a previously + held lock. + 0.17 Wed Nov 1 2017 Fix a bug with ignored in MySQL2, thanks to Maksym Davydov diff --git a/lib/Data/Consumer.pm b/lib/Data/Consumer.pm index 840885f..5986901 100644 --- a/lib/Data/Consumer.pm +++ b/lib/Data/Consumer.pm @@ -136,7 +136,8 @@ ignored. =item release -This routine is to release any held locks in the object. +This routine is to release any held locks in the object. It is called at the very end of +processing, and by reset. =item _mark_as @@ -721,7 +722,8 @@ Returns an identifier to be used to identify the item acquired. Release any locks on the currently held item. -Normally there is no need to call this directly. +Normally there is no need to call this directly. It is called by reset() and +at the end of processing. =cut diff --git a/lib/Data/Consumer/MySQL2.pm b/lib/Data/Consumer/MySQL2.pm index bd9a164..558d6c3 100644 --- a/lib/Data/Consumer/MySQL2.pm +++ b/lib/Data/Consumer/MySQL2.pm @@ -26,11 +26,11 @@ Data::Consumer::MySQL2 - Data::Consumer implementation for a mysql database tabl =head1 VERSION -Version 0.17 +Version 0.18 =cut -$VERSION= '0.17'; +$VERSION= '0.18'; =head1 SYNOPSIS @@ -138,6 +138,20 @@ this query is 0, item will be skipped. The query will be executed with the id of the item returned by select_sql, followed by the arguments provided by the C property. +=item release_acquire_sql + +=item release_acquire_args + +These arguments are optional, and will be synthesized from acquire_sql/acquire_args +if not provided. + +SQL update query that releases a claimed item in the table. If the number of rows updated by +this query is not 1, the failure is only reported via debug_warn. + +The query will be executed with the arguments provided by the C<release_acquire_args> property, +followed by the id of the claimed item. When release_acquire_sql is not provided, +release_acquire_args defaults to the *list* reverse of the acquire_args. 
+ =item update_sql =item update_args @@ -149,16 +163,6 @@ SQL update query which can be used to change the status the record being process Will be executed with the arguments provided in update_args followed the new status, and the id. -=item release_sql - -=item release_args - -These arguments are optional. - -SQL select query which can be used to clear the currently held lock. - -Will be called with the arguments provided in release_args, plus the id. - =back =cut @@ -195,8 +199,9 @@ sub new { $opts{flag_field} ||= 'process_state'; $opts{init_id}= 0 unless exists $opts{init_id}; - if ($opts{lock_prefix}) { cluck "Ignoring 'lock_prefix' this, version does not use GET_LOCK()" } + if ($opts{lock_prefix}) { cluck "Ignoring 'lock_prefix' this version does not use GET_LOCK()" } if ($opts{check_sql} || $opts{check_args}) { confess "This version does not support 'check_sql', 'check_args'" } + if ($opts{release_sql} || $opts{release_args}) { confess "This version does not support 'release_sql', 'release_args'" } unless ( $opts{select_sql} ) { $opts{select_sql}= do { @@ -231,6 +236,10 @@ sub new { }; $opts{acquire_args}= [ $opts{working}, $opts{unprocessed} ]; } + unless ( exists $opts{release_acquire_sql} ) { + $opts{release_acquire_sql}= $opts{acquire_sql}; + $opts{release_acquire_args}= [ reverse @{$opts{acquire_args}} ]; + } $opts{update_sql} ||= do { local $_= ' @@ -261,7 +270,7 @@ Returns an identifier to be used to identify the item acquired. =head2 $object->release() -Release any locks on the currently held item. +Release any locks on the currently held item. This is called by reset, and at the end of processing. Normally there is no need to call this directly. 
@@ -314,17 +323,19 @@ sub acquire { sub release { my $self= shift; - return 0 unless exists $self->{last_id} && defined $self->{release_sql}; + my $id= delete $self->{last_id}; + + return 0 unless $self->{release_acquire_sql}; - my $res= - $self->{dbh} - ->do( $self->{release_sql}, undef, @{ $self->{release_args} || [] }, $self->{last_lock} ); - defined $res - or $self->error( "Failed to execute '$self->{release_sql}' with args '$self->{last_lock}': " - . $self->{dbh}->errstr() ); + my $dbh= $self->{dbh}; + + my $released = $dbh->do( $self->{release_acquire_sql}, undef, @{ $self->{release_acquire_args} || [] }, $id ); + if ($released != 1) { + $self->debug_warn(5, "failed to release '$id', release_acquire_sql updated $released rows"); + } else { + $self->debug_warn( 5, "released '$id'" ); + } - $self->debug_warn( 5, "release lock '$self->{last_lock}' status: $res" ); # XXX - delete $self->{last_lock}; return 1; } From 345a8be0b7f5f33cf680f35e4be25c7eea6af934 Mon Sep 17 00:00:00 2001 From: Vickenty Fesunov Date: Mon, 6 Nov 2017 18:07:09 +0100 Subject: [PATCH 5/5] Don't run release if nothing is claimed Avoids stepping into a perl bug where fatal warning in a destructor causes infinite recursion somewhere in perl guts (seems to be fixed in modern versions). --- lib/Data/Consumer/MySQL2.pm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/Data/Consumer/MySQL2.pm b/lib/Data/Consumer/MySQL2.pm index 558d6c3..cb5e0cb 100644 --- a/lib/Data/Consumer/MySQL2.pm +++ b/lib/Data/Consumer/MySQL2.pm @@ -325,7 +325,7 @@ sub release { my $id= delete $self->{last_id}; - return 0 unless $self->{release_acquire_sql}; + return 0 unless defined $id && $self->{release_acquire_sql}; my $dbh= $self->{dbh};