Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions src/Database/Concerns/HasPending.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,51 @@ public static function pullPending(array $attributes = []): Model&Tenant
*/
public static function pullPendingFromPool(bool $firstOrCreate = false, array $attributes = []): ?Tenant
{
$tenant = DB::transaction(function () use ($attributes): ?Tenant {
/** @var (Model&Tenant)|null $tenant */
$tenant = static::onlyPending()->first();

if ($tenant !== null) {
event(new PullingPendingTenant($tenant));
$tenant->update(array_merge($attributes, [
'pending_since' => null,
]));
// Attempt pulling a pending tenant.
// The loop handles the case where a single tenant is being pulled by multiple processes at the same time.
// If a tenant was pulled by a concurrent process, try pulling the next one in the pool.
while (true) {
/** @var (Model&Tenant)|null $pullCandidate */
$pullCandidate = static::onlyPending()->first();

if ($pullCandidate === null) {
return $firstOrCreate ? static::create($attributes) : null;
}

return $tenant;
});
// Fired before the claim, so it can fire once per attempt, including for a candidate
// that ends up being claimed by a concurrent process (in which case the loop retries).
// PendingTenantPulled (below) fires exactly once, for the pulled tenant.
event(new PullingPendingTenant($pullCandidate));

if ($tenant === null) {
return $firstOrCreate ? static::create($attributes) : null;
}
$tenant = DB::transaction(function () use ($pullCandidate, $attributes): ?Tenant {
$tenantWasPulled = static::onlyPending()
->whereKey($pullCandidate->getKey())
->update([$pullCandidate->getColumnForQuery('pending_since') => null]) > 0;

if (! $tenantWasPulled) {
return null;
}

// The tenant's pending_since was just cleared, and e.g. a PullingPendingTenant listener
// may have made changes to the tenant, so re-fetch it to get it in the correct state.
/** @var Model&Tenant $pulledTenant */
$pulledTenant = static::findOrFail($pullCandidate->getKey());

if (! empty($attributes)) {
$pulledTenant->update($attributes);
}

// Only triggered if a tenant that was pulled from the pool is returned
event(new PendingTenantPulled($tenant));
return $pulledTenant;
});

return $tenant;
if ($tenant === null) {
// If another pull claimed this tenant first, try claiming the next one
continue;
}

event(new PendingTenantPulled($tenant));

return $tenant;
}
}
}
47 changes: 47 additions & 0 deletions tests/PendingTenantsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Illuminate\Database\QueryException;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Schema;
use Illuminate\Support\Str;
Expand Down Expand Up @@ -126,6 +127,52 @@
expect(Tenant::withPending()->get()->count())->toBe(1); // All tenants
});

test('pulling a pending tenant retries when the tenant is claimed concurrently', function () {
Tenant::createPending();
Tenant::createPending();

$stolenId = null;

Event::listen(PullingPendingTenant::class, function (PullingPendingTenant $event) use (&$stolenId) {
if ($stolenId !== null) {
return;
}

$stolenId = $event->tenant->id;

// Steal the tenant like a concurrent process would
Tenant::onlyPending()
->whereKey($event->tenant->id)
->update([$event->tenant->getColumnForQuery('pending_since') => null]);
});

$pulled = Tenant::pullPendingFromPool();

expect($pulled)->not()->toBeNull();
expect($pulled->id)->not()->toBe($stolenId); // Stolen tenant was skipped, the next one was claimed by the pull
expect(Tenant::onlyPending()->count())->toBe(0); // Both tenants claimed
});

test('the pull is rolled back and the tenant stays in the pool if setting attributes fails', function () {
// Pulling a tenant and setting its attributes happen in one transaction,
// so if setting the attributes fails, the whole pull rolls back and the tenant stays in the pool.
Schema::table('tenants', function (Blueprint $table) {
$table->string('slug')->nullable()->unique();
});

Tenant::$extraCustomColumns = ['slug'];

Tenant::create(['slug' => 'taken']);
Tenant::createPending();

// During the pull, set slug to 'taken', which is already used by another tenant to make the attribute update throw
expect(fn () => Tenant::pullPendingFromPool(false, ['slug' => 'taken']))
->toThrow(QueryException::class);

// The pull rolled back, so the tenant is still pending
expect(Tenant::onlyPending()->count())->toBe(1);
});

test('withoutPending chained with where clauses returns correct results', function () {
$tenant = Tenant::create();
$pendingTenant = Tenant::createPending();
Expand Down
Loading