From c41f4482d43b7338b239b211ceee0156db25c2d6 Mon Sep 17 00:00:00 2001 From: Vickenty Fesunov Date: Fri, 19 Aug 2016 13:19:33 +0200 Subject: [PATCH] Don't use get_lock in the default implementation. get_lock() changes behaviour in MySQL 5.7, which causes in turn causes workers to lock more rows than necessary. This patch changes the behavior of Data::Consumer::MySQL2 to claim rows by doing SELECT followed by UPDATE, instead of two SELECTs (second select was necessary to validate result of GET_LOCK() since it wasn't atomic, and was a major change from Data::Consumer::MySQL). SELECT followed by UPDATE allows two claim strategies to be implemented, both relying on implicit database locks: atomic CAS (used by default) and SELECT FOR UPDATE. --- lib/Data/Consumer/MySQL2.pm | 82 +++++++++++-------------------------- 1 file changed, 24 insertions(+), 58 deletions(-) diff --git a/lib/Data/Consumer/MySQL2.pm b/lib/Data/Consumer/MySQL2.pm index 7deaa62..f0fa2a3 100644 --- a/lib/Data/Consumer/MySQL2.pm +++ b/lib/Data/Consumer/MySQL2.pm @@ -41,7 +41,6 @@ $VERSION= '0.16'; table => 'T', id_field= > 'id', flag_field => 'done', - lock_prefix => $worker_name, unprocessed => 0, working => 1, processed => 2, @@ -88,17 +87,6 @@ The column name of the primary key of the table being processed The column name in the table being processed which shows whether an object is processed or not. -=item lock_prefix => 'my-lock-name' - -The prefix to use for the mysql locks. Defaults to C<$0-$table>. - -It is B recommended that end-users of this module explicitly -specify a lock_prefix in production environments. A multi-process -system relying on mutual exclusion B run into problems when -consuming from the same source if $0 and $table are not identical -between workers. Generally, using the name of the consuming module -should suffice (e.g. Your::Data::Consumer::Worker). - =item unprocessed => 0 The value of the C which indicates that an item is not @@ -137,25 +125,18 @@ same time it should ensure that a lock on the id is created. The query will be executed with the id of the last processed item, followed by the arguments provided by the C property. -=item check_sql +=item acquire_sql -=item check_args +=item acquire_args -These arguments are optional, unless you specify C yourself, in which case -it is required you also specify C as well. - -SQL select query which can be executed to verify that the item to be processed still has -the expected flag fields set appropriately. - -There is a very annoying and sublte race condition (possibly only in modern MySQL's) which -means that is possible that the query used for C might return an id for a record -which has already been processed. This query is used to avoid that race condition. +These arguments are optional, and will be synthesized from the other values if +not provided. -The query should validate any flag fields or constraints specified in C are -true, it should return only the id of the record to be processed. +SQL update query that claims item in the table. If number of rows updated by +this query is 0, item will be skipped. -The query will be executed with the id of the item to process, followed by the arguments -provided by the C property. +The query will be executed with the id of the item returned by select_sql, +followed by the arguments provided by the C property. =item update_sql @@ -172,7 +153,7 @@ and the id. =item release_args -These arguments are optional, and will be synthesized from the other values if not provided. +These arguments are optional. SQL select query which can be used to clear the currently held lock. @@ -187,7 +168,7 @@ sub new { my $self= $class->SUPER::new(); # let Data::Consumer bless the hash my @bad; - foreach my $opt (qw(unprocessed processed working failed lock_prefix)) { + foreach my $opt (qw(unprocessed processed working failed)) { if (ref $opts{$opt}) { push @bad, "option '$opt' is not allowed to be a ref in DC::MySQL2"; } elsif (!defined $opts{$opt}) { @@ -214,10 +195,6 @@ sub new { $opts{flag_field} ||= 'process_state'; $opts{init_id}= 0 unless exists $opts{init_id}; - if (!$opts{check_sql} and $opts{select_sql}) { - confess "In $class if you specify 'select_sql' you MUST provide 'check_sql' as well!"; - } - unless ( $opts{select_sql} ) { $opts{select_sql}= do { local $_= ' @@ -227,29 +204,29 @@ sub new { WHERE $id_field > ? AND $flag_field = ? - AND GET_LOCK( CONCAT_WS("=", ?, $id_field ), 0) != 0 LIMIT 1 '; s/^\s+//mg; s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; $_; }; - $opts{select_args}= [ $opts{unprocessed}, $opts{lock_prefix} ]; + $opts{select_args}= [ $opts{unprocessed} ]; + } - $opts{check_sql}= do { + unless ( $opts{acquire_sql} ) { + $opts{acquire_sql}= do { local $_= ' - SELECT - $id_field - FROM $table + UPDATE $table + SET $flag_field = ? WHERE - $id_field = ? - AND $flag_field = ? + $flag_field = ? + AND $id_field = ? '; s/^\s+//mg; s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; $_; }; - $opts{check_args}= [ $opts{unprocessed} ]; + $opts{acquire_args}= [ $opts{working}, $opts{unprocessed} ]; } $opts{update_sql} ||= do { @@ -263,17 +240,7 @@ sub new { s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; $_; }; - if ( !$opts{release_sql} ) { - $opts{release_sql}= do { - local $_= ' - SELECT RELEASE_LOCK( CONCAT_WS("=", ?, ? ) ) - '; - s/^\s+//mg; - s/\$(\w+)/$opts{$1} || confess "Option $1 is mandatory"/ge; - $_; - }; - $opts{release_args}= [ $opts{lock_prefix} ]; - } + %$self= %opts; return $self; @@ -324,12 +291,11 @@ sub acquire { my ($id)= $dbh->selectrow_array( $self->{select_sql}, undef, $self->{last_id}, @{ $self->{select_args} || [] } ); if ( defined $id ) { next if $self->is_ignored($id); - my ($got_id) = $dbh->selectrow_array( $self->{check_sql}, undef, $id, @{ $self->{check_args} || [] } ); - if ( not defined $got_id) { - $self->debug_warn(5, "race condition avoided for '$id', check_sql and select_sql did not line up!"); + my $claimed = $dbh->do( $self->{acquire_sql}, undef, @{ $self->{acquire_args} || [] }, $id ); + if ($claimed != 1) { + $self->debug_warn(5, "failed to claim '$id', acquire_sql updated $claimed rows"); next; } - $self->{last_lock}= $id; $self->debug_warn( 5, "acquired '$id'" ); } else { $self->debug_warn( 5, "acquire failed -- resource has been exhausted" ); @@ -344,7 +310,7 @@ sub acquire { sub release { my $self= shift; - return 0 unless exists $self->{last_lock}; + return 0 unless exists $self->{last_id} && defined $self->{release_sql}; my $res= $self->{dbh}