Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions driver/nodeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
// clean up after ourselves if provisioning fails.
// this is required because if publishing never succeeds, unpublish is not
// called which leaves files around (and we may continue to renew if so).
//
// managed tracks whether THIS call started managing the volume. We must
// only tear down state that we ourselves created; blindly calling
// UnmanageVolume/RemoveVolume when a concurrent request is already managing
// the volume would destroy that goroutine's in-progress issuance.
managed := false
success := false
defer func() {
if !success {
if !success && managed {
ns.manager.UnmanageVolume(req.GetVolumeId())
_ = ns.mounter.Unmount(req.GetTargetPath())
_ = ns.store.RemoveVolume(req.GetVolumeId())
Expand Down Expand Up @@ -98,17 +104,28 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
isReadyToRequest, reason := ns.manager.IsVolumeReadyToRequest(req.GetVolumeId())
if isReadyToRequest {
log.V(4).Info("Waiting for certificate to be issued...")
if _, err := ns.manager.ManageVolumeImmediate(ctx, req.GetVolumeId()); err != nil {
var err error
managed, err = ns.manager.ManageVolumeImmediate(ctx, req.GetVolumeId())
if err != nil {
return nil, err
}
if !managed {
// A concurrent NodePublishVolume is already managing this volume.
// Return Aborted so the kubelet retries; proceeding to mount would
// give the pod an empty volume with no certificate.
return nil, status.Error(codes.Aborted, "volume is already being provisioned by another request, retry later")
}
log.Info("Volume registered for management")
} else {
if ns.continueOnNotReady {
log.V(4).Info("Skipping waiting for certificate to be issued")
ns.manager.ManageVolume(req.GetVolumeId())
log.V(4).Info("Volume registered for management")
if ns.manager.ManageVolume(req.GetVolumeId()) {
managed = true
log.V(4).Info("Volume registered for management")
}
} else {
log.Info("Unable to request a certificate right now, will be retried", "reason", reason)
_ = ns.store.RemoveVolume(req.GetVolumeId())
return nil, fmt.Errorf("volume is not yet ready to be setup, will be retried: %s", reason)
}
}
Expand Down
113 changes: 113 additions & 0 deletions test/integration/ready_to_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"os"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -272,3 +273,115 @@ func TestFailsIfNotReadyToRequest_ContinueOnNotReadyDisabled(t *testing.T) {
t.Errorf("failed to wait for storage backend to return NotFound: %v", err)
}
}

// TestConcurrentPublishForSameVolumeReturnsAbortedWithoutCrossCleanup verifies
// that when two NodePublishVolume calls race on the same volume ID, the loser
// receives codes.Aborted and — crucially — does not tear down the metadata
// owned by the in-flight winner.
func TestConcurrentPublishForSameVolumeReturnsAbortedWithoutCrossCleanup(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	store := storage.NewMemoryFS()

	// releaseIssuance holds the first call inside issuance until we are done
	// asserting; issuanceEntered signals (exactly once) that it got there.
	releaseIssuance := make(chan struct{})
	issuanceEntered := make(chan struct{})
	var signalOnce sync.Once

	_, cl, stop := testdriver.Run(t, testdriver.Options{
		Store:          store,
		ReadyToRequest: manager.AlwaysReadyToRequest,
		GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
			signalOnce.Do(func() {
				close(issuanceEntered)
			})
			<-releaseIssuance
			return nil, fmt.Errorf("forced issuance failure")
		},
	})
	defer stop()

	// Both requests target the same volume ID but distinct target paths, as a
	// kubelet retry (or duplicate publish) would.
	newRequest := func() *csi.NodePublishVolumeRequest {
		return &csi.NodePublishVolumeRequest{
			VolumeId: "test-vol",
			VolumeContext: map[string]string{
				"csi.storage.k8s.io/ephemeral":        "true",
				"csi.storage.k8s.io/pod.name":         "the-pod-name",
				"csi.storage.k8s.io/pod.namespace":    "the-pod-namespace",
			},
			TargetPath: t.TempDir(),
			Readonly:   true,
		}
	}
	firstReq := newRequest()
	secondReq := newRequest()

	firstResult := make(chan error, 1)
	go func() {
		_, err := cl.NodePublishVolume(ctx, firstReq)
		firstResult <- err
	}()

	// Don't fire the racing call until the first one is provably blocked
	// inside issuance.
	select {
	case <-issuanceEntered:
	case <-ctx.Done():
		t.Fatalf("timed out waiting for first publish call to enter issuance")
	}

	_, err := cl.NodePublishVolume(ctx, secondReq)
	if status.Code(err) != codes.Aborted {
		t.Fatalf("unexpected error code from second concurrent publish call: %v", err)
	}

	// The aborted call must not have destroyed the winner's metadata.
	if _, err := store.ReadMetadata("test-vol"); err != nil {
		t.Fatalf("expected metadata to remain while first call is still in progress, got: %v", err)
	}

	// Let the first call finish; its forced issuance failure should surface.
	close(releaseIssuance)
	if err := <-firstResult; err == nil {
		t.Fatalf("expected first publish call to fail once unblocked")
	}
}

// TestImmediateManagementErrorStillCleansUpOwnedState verifies that when a
// publish call itself started managing the volume and issuance then fails,
// the deferred cleanup still removes the volume's on-disk state.
func TestImmediateManagementErrorStillCleansUpOwnedState(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	store := storage.NewMemoryFS()

	// Every issuance attempt fails, so NodePublishVolume can never succeed.
	_, cl, stop := testdriver.Run(t, testdriver.Options{
		Store:          store,
		ReadyToRequest: manager.AlwaysReadyToRequest,
		GeneratePrivateKey: func(meta metadata.Metadata) (crypto.PrivateKey, error) {
			return nil, fmt.Errorf("forced issuance failure")
		},
	})
	defer stop()

	req := &csi.NodePublishVolumeRequest{
		VolumeId: "test-vol",
		VolumeContext: map[string]string{
			"csi.storage.k8s.io/ephemeral":        "true",
			"csi.storage.k8s.io/pod.name":         "the-pod-name",
			"csi.storage.k8s.io/pod.namespace":    "the-pod-namespace",
		},
		TargetPath: t.TempDir(),
		Readonly:   true,
	}
	if _, err := cl.NodePublishVolume(ctx, req); err == nil {
		t.Fatalf("expected publish call to fail")
	}

	// Cleanup is asynchronous relative to the RPC returning, so poll until
	// the store reports the volume's files are gone.
	poll := func(ctx context.Context) (bool, error) {
		_, err := store.ReadFiles("test-vol")
		return errors.Is(err, storage.ErrNotFound), nil
	}
	if err := wait.PollUntilContextCancel(ctx, time.Millisecond*50, true, poll); err != nil {
		t.Fatalf("expected volume data to be cleaned up after failure: %v", err)
	}
}