diff --git a/driver/nodeserver.go b/driver/nodeserver.go
index baed4ba..8c7affb 100644
--- a/driver/nodeserver.go
+++ b/driver/nodeserver.go
@@ -56,9 +56,15 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	// clean up after ourselves if provisioning fails.
 	// this is required because if publishing never succeeds, unpublish is not
 	// called which leaves files around (and we may continue to renew if so).
+	//
+	// managed tracks whether THIS call started managing the volume. We must
+	// only tear down state that we ourselves created; blindly calling
+	// UnmanageVolume/RemoveVolume when a concurrent request is already managing
+	// the volume would destroy that goroutine's in-progress issuance.
+	managed := false
 	success := false
 	defer func() {
-		if !success {
+		if !success && managed {
 			ns.manager.UnmanageVolume(req.GetVolumeId())
 			_ = ns.mounter.Unmount(req.GetTargetPath())
 			_ = ns.store.RemoveVolume(req.GetVolumeId())
@@ -98,17 +104,28 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	isReadyToRequest, reason := ns.manager.IsVolumeReadyToRequest(req.GetVolumeId())
 	if isReadyToRequest {
 		log.V(4).Info("Waiting for certificate to be issued...")
-		if _, err := ns.manager.ManageVolumeImmediate(ctx, req.GetVolumeId()); err != nil {
+		var err error
+		managed, err = ns.manager.ManageVolumeImmediate(ctx, req.GetVolumeId())
+		if err != nil {
 			return nil, err
 		}
+		if !managed {
+			// A concurrent NodePublishVolume is already managing this volume.
+			// Return Aborted so the kubelet retries; proceeding to mount would
+			// give the pod an empty volume with no certificate.
+			return nil, status.Error(codes.Aborted, "volume is already being provisioned by another request, retry later")
+		}
 		log.Info("Volume registered for management")
 	} else {
 		if ns.continueOnNotReady {
 			log.V(4).Info("Skipping waiting for certificate to be issued")
-			ns.manager.ManageVolume(req.GetVolumeId())
-			log.V(4).Info("Volume registered for management")
+			if ns.manager.ManageVolume(req.GetVolumeId()) {
+				managed = true
+				log.V(4).Info("Volume registered for management")
+			}
 		} else {
 			log.Info("Unable to request a certificate right now, will be retried", "reason", reason)
+			_ = ns.store.RemoveVolume(req.GetVolumeId())
 			return nil, fmt.Errorf("volume is not yet ready to be setup, will be retried: %s", reason)
 		}
 	}
diff --git a/test/integration/ready_to_request_test.go b/test/integration/ready_to_request_test.go
index 57b1d7b..47ed433 100644
--- a/test/integration/ready_to_request_test.go
+++ b/test/integration/ready_to_request_test.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"os"
 	"reflect"
+	"sync"
 	"testing"
 	"time"
 
@@ -272,3 +273,115 @@ func TestFailsIfNotReadyToRequest_ContinueOnNotReadyDisabled(t *testing.T) {
 		t.Errorf("failed to wait for storage backend to return NotFound: %v", err)
 	}
 }
+
+func TestConcurrentPublishForSameVolumeReturnsAbortedWithoutCrossCleanup(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	defer cancel()
+
+	store := storage.NewMemoryFS()
+
+	blockFirstIssue := make(chan struct{})
+	firstIssueStarted := make(chan struct{})
+	var firstIssueStartOnce sync.Once
+
+	_, cl, stop := testdriver.Run(t, testdriver.Options{
+		Store:          store,
+		ReadyToRequest: manager.AlwaysReadyToRequest,
+		GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
+			firstIssueStartOnce.Do(func() {
+				close(firstIssueStarted)
+			})
+			<-blockFirstIssue
+			return nil, fmt.Errorf("forced issuance failure")
+		},
+	})
+	defer stop()
+
+	request1 := &csi.NodePublishVolumeRequest{
+		VolumeId: "test-vol",
+		VolumeContext: map[string]string{
+			"csi.storage.k8s.io/ephemeral":     "true",
+			"csi.storage.k8s.io/pod.name":      "the-pod-name",
+			"csi.storage.k8s.io/pod.namespace": "the-pod-namespace",
+		},
+		TargetPath: t.TempDir(),
+		Readonly:   true,
+	}
+
+	request2 := &csi.NodePublishVolumeRequest{
+		VolumeId: "test-vol",
+		VolumeContext: map[string]string{
+			"csi.storage.k8s.io/ephemeral":     "true",
+			"csi.storage.k8s.io/pod.name":      "the-pod-name",
+			"csi.storage.k8s.io/pod.namespace": "the-pod-namespace",
+		},
+		TargetPath: t.TempDir(),
+		Readonly:   true,
+	}
+
+	firstErr := make(chan error, 1)
+	go func() {
+		_, err := cl.NodePublishVolume(ctx, request1)
+		firstErr <- err
+	}()
+
+	select {
+	case <-firstIssueStarted:
+	case <-ctx.Done():
+		t.Fatalf("timed out waiting for first publish call to enter issuance")
+	}
+
+	_, err := cl.NodePublishVolume(ctx, request2)
+	if status.Code(err) != codes.Aborted {
+		t.Fatalf("unexpected error code from second concurrent publish call: %v", err)
+	}
+
+	if _, err := store.ReadMetadata("test-vol"); err != nil {
+		t.Fatalf("expected metadata to remain while first call is still in progress, got: %v", err)
+	}
+
+	close(blockFirstIssue)
+	if err := <-firstErr; err == nil {
+		t.Fatalf("expected first publish call to fail once unblocked")
+	}
+}
+
+func TestImmediateManagementErrorStillCleansUpOwnedState(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+
+	store := storage.NewMemoryFS()
+
+	_, cl, stop := testdriver.Run(t, testdriver.Options{
+		Store:          store,
+		ReadyToRequest: manager.AlwaysReadyToRequest,
+		GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
+			return nil, fmt.Errorf("forced issuance failure")
+		},
+	})
+	defer stop()
+
+	_, err := cl.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
+		VolumeId: "test-vol",
+		VolumeContext: map[string]string{
+			"csi.storage.k8s.io/ephemeral":     "true",
+			"csi.storage.k8s.io/pod.name":      "the-pod-name",
+			"csi.storage.k8s.io/pod.namespace": "the-pod-namespace",
+		},
+		TargetPath: t.TempDir(),
+		Readonly:   true,
+	})
+	if err == nil {
+		t.Fatalf("expected publish call to fail")
+	}
+
+	if err := wait.PollUntilContextCancel(ctx, time.Millisecond*50, true, func(ctx context.Context) (bool, error) {
+		_, err := store.ReadFiles("test-vol")
+		if errors.Is(err, storage.ErrNotFound) {
+			return true, nil
+		}
+		return false, nil
+	}); err != nil {
+		t.Fatalf("expected volume data to be cleaned up after failure: %v", err)
+	}
+}