Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions pkg/rest/cache_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,46 @@ func getRDWithCacheRetry(ctx context.Context, st store.Store, name string) (apiv

return apiv1.ResourceDefinition{}, errors.Wrapf(err, "get resource definition %q after %d retries", name, cacheRetryAttempts)
}

// getSnapshotWithCacheRetry returns the Snapshot `snapName` on RD
// `rdName`, retrying on store.ErrNotFound to absorb informer-cache
// lag after a fresh write that landed on a sibling apiserver replica.
// Same semantics as getRGWithCacheRetry.
//
// linstor-csi's CreateVolume-from-snapshot sequence is `POST
// /v1/resource-definitions/{rd}/snapshots`, then an immediate `GET
// /v1/resource-definitions/{rd}/snapshots/{snap}` (size guard), then
// `POST .../snapshot-restore-resource/{snap}`. The follow-up reads
// race the informer cache exactly like the RG/RD reads above — the
// create wrote straight to the apiserver, the read is served from a
// cache that may not have observed the write yet — and a spurious
// 404 fails the whole CreateVolume.
func getSnapshotWithCacheRetry(ctx context.Context, st store.Store, rdName, snapName string) (apiv1.Snapshot, error) {
var (
snap apiv1.Snapshot
err error
)

for attempt := range cacheRetryAttempts {
snap, err = st.Snapshots().Get(ctx, rdName, snapName)
if err == nil {
return snap, nil
}

if !errors.Is(err, store.ErrNotFound) {
return apiv1.Snapshot{}, errors.Wrapf(err, "get snapshot %s/%s", rdName, snapName)
}

if attempt == cacheRetryAttempts-1 {
break
}

select {
case <-ctx.Done():
return apiv1.Snapshot{}, errors.Wrap(ctx.Err(), "get snapshot: context cancelled")
case <-time.After(cacheRetryDelay):
}
Comment on lines +162 to +166

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using time.After in a loop or a frequently called function can lead to short-term timer leaks because the underlying timer is not garbage collected until it expires, even if the context is cancelled and the select block exits early. Using time.NewTimer and stopping it on context cancellation avoids this resource leak.

		timer := time.NewTimer(cacheRetryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return apiv1.Snapshot{}, errors.Wrap(ctx.Err(), "get snapshot: context cancelled")
		case <-timer.C:
		}

}

return apiv1.Snapshot{}, errors.Wrapf(err, "get snapshot %s/%s after %d retries", rdName, snapName, cacheRetryAttempts)
}
241 changes: 237 additions & 4 deletions pkg/rest/cache_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -78,13 +79,64 @@ func (f *flakyRDStore) Get(ctx context.Context, name string) (apiv1.ResourceDefi
return f.ResourceDefinitionStore.Get(ctx, name) //nolint:wrapcheck // pass-through
}

// flakyStore lets us substitute the RG / RD views with flaky ones
// while everything else keeps using the wrapped InMemory.
// lagSnapshotStore wraps an underlying SnapshotStore and models the
// production read-after-write informer-cache lag: a successful
// Create() arms `missBudget` — the next `missBudget` Get() calls on
// the just-created snapshot return store.ErrNotFound (the cache has
// not observed the apiserver write yet), then reads delegate to the
// real store (the watch event arrived). This is the exact shape
// linstor-csi's CreateVolume-from-snapshot trips over: POST snapshot
// create, then an immediate GET on a replica whose cache trails.
type lagSnapshotStore struct {
store.SnapshotStore

missBudget int

mu sync.Mutex
pending map[string]int
}

func (f *lagSnapshotStore) Create(ctx context.Context, snap *apiv1.Snapshot) error {
err := f.SnapshotStore.Create(ctx, snap)
if err != nil {
return err //nolint:wrapcheck // pass-through
}

f.mu.Lock()
defer f.mu.Unlock()

if f.pending == nil {
f.pending = map[string]int{}
}

f.pending[snap.ResourceName+"/"+snap.Name] = f.missBudget

return nil
}

func (f *lagSnapshotStore) Get(ctx context.Context, rdName, snapName string) (apiv1.Snapshot, error) {
f.mu.Lock()

key := rdName + "/" + snapName
if left := f.pending[key]; left > 0 {
f.pending[key] = left - 1
f.mu.Unlock()

return apiv1.Snapshot{}, errors.Wrapf(store.ErrNotFound, "snapshot %q on RD %q", snapName, rdName)
}
f.mu.Unlock()

return f.SnapshotStore.Get(ctx, rdName, snapName) //nolint:wrapcheck // pass-through
}

// flakyStore lets us substitute the RG / RD / Snapshot views with
// flaky ones while everything else keeps using the wrapped InMemory.
type flakyStore struct {
store.Store

rgs *flakyRGStore
rds *flakyRDStore
rgs *flakyRGStore
rds *flakyRDStore
snaps *lagSnapshotStore
}

func (f *flakyStore) ResourceGroups() store.ResourceGroupStore {
Expand All @@ -103,6 +155,14 @@ func (f *flakyStore) ResourceDefinitions() store.ResourceDefinitionStore {
return f.rds
}

func (f *flakyStore) Snapshots() store.SnapshotStore {
if f.snaps == nil {
return f.Store.Snapshots()
}

return f.snaps
}

func TestGetRGWithCacheRetry_SucceedsAfterCacheMiss(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -252,3 +312,176 @@ func TestSpawn_SurvivesCacheMissOnRGGet(t *testing.T) {
t.Fatalf("expected at least 2 RG Get attempts, got %d", flaky.rgs.calls.Load())
}
}

func TestGetSnapshotWithCacheRetry_SucceedsAfterCacheMiss(t *testing.T) {
t.Parallel()

st := store.NewInMemory()

if err := st.ResourceDefinitions().Create(t.Context(), &apiv1.ResourceDefinition{Name: "pvc-1"}); err != nil {
t.Fatalf("seed RD: %v", err)
}

flaky := &flakyStore{
Store: st,
snaps: &lagSnapshotStore{
SnapshotStore: st.Snapshots(),
missBudget: 2, // two cache misses after the create, then real
},
}

err := flaky.Snapshots().Create(t.Context(),
&apiv1.Snapshot{Name: "snap-1", ResourceName: "pvc-1"})
if err != nil {
t.Fatalf("seed snapshot: %v", err)
}

snap, err := getSnapshotWithCacheRetry(t.Context(), flaky, "pvc-1", "snap-1")
if err != nil {
t.Fatalf("getSnapshotWithCacheRetry: %v", err)
}

if snap.Name != "snap-1" || snap.ResourceName != "pvc-1" {
t.Fatalf("got %q on %q, want snap-1 on pvc-1", snap.Name, snap.ResourceName)
}
}

func TestGetSnapshotWithCacheRetry_RealNotFoundStillSurfaces(t *testing.T) {
t.Parallel()

st := store.NewInMemory()

// Snapshot is never created, so every retry returns NotFound —
// the caller's 404 contract (unknown snapshot → 404 envelope)
// must survive the retry wrapper.
start := time.Now()

_, err := getSnapshotWithCacheRetry(t.Context(), st, "pvc-1", "does-not-exist")
if !errors.Is(err, store.ErrNotFound) {
t.Fatalf("expected ErrNotFound, got %v", err)
}

minWait := time.Duration(cacheRetryAttempts-1) * cacheRetryDelay
if elapsed := time.Since(start); elapsed < minWait {
t.Fatalf("retry loop returned in %s, expected at least %s", elapsed, minWait)
}
}

// TestSnapshotGet_SurvivesCacheMissAfterCreate pins the F1 hot path
// end-to-end on the wire: linstor-csi's CreateVolume-from-snapshot
// GETs the snapshot IMMEDIATELY after the create POST (size guard
// before snapshot-restore-resource). With the informer-cached store
// the create writes straight to the apiserver while the follow-up
// read is served from a cache that may not have observed the write
// yet — the GET must absorb that lag instead of 404-ing the whole
// CreateVolume (TestGroupJ/CSICreateVolumeFromSnapshot regression).
func TestSnapshotGet_SurvivesCacheMissAfterCreate(t *testing.T) {
t.Parallel()

st := store.NewInMemory()

if err := st.ResourceDefinitions().Create(t.Context(), &apiv1.ResourceDefinition{Name: "pvc-clone-src"}); err != nil {
t.Fatalf("seed RD: %v", err)
}

flaky := &flakyStore{
Store: st,
snaps: &lagSnapshotStore{
SnapshotStore: st.Snapshots(),
missBudget: 1, // first read after create → NotFound, then real
},
}

base, stop := startServerWithStore(t, flaky)
defer stop()

body, err := json.Marshal(apiv1.Snapshot{Name: "snap-1"})
if err != nil {
t.Fatalf("marshal: %v", err)
}

createResp := httpPost(t, base+"/v1/resource-definitions/pvc-clone-src/snapshots", body)
defer func() { _ = createResp.Body.Close() }()

if createResp.StatusCode != http.StatusCreated {
t.Fatalf("create status: got %d, want 201", createResp.StatusCode)
}

// Immediate follow-up GET — the exact linstor-csi sequence.
getResp := httpGet(t, base+"/v1/resource-definitions/pvc-clone-src/snapshots/snap-1")
defer func() { _ = getResp.Body.Close() }()

if getResp.StatusCode != http.StatusOK {
t.Fatalf("get-after-create status: got %d, want 200 (cache-lag 404 must be absorbed)",
getResp.StatusCode)
}

var got apiv1.Snapshot
if err := json.NewDecoder(getResp.Body).Decode(&got); err != nil {
t.Fatalf("decode snapshot: %v", err)
}

if got.Name != "snap-1" || got.ResourceName != "pvc-clone-src" {
t.Fatalf("got %q on %q, want snap-1 on pvc-clone-src", got.Name, got.ResourceName)
}
}

// TestSnapshotRestore_SurvivesCacheMissAfterCreate pins the second
// half of the same hot path: `POST .../snapshot-restore-resource/
// {snap}` right after the snapshot create. In production with N
// apiserver replicas the restore POST can land on a replica whose
// cache has not observed the snapshot yet — the source-snapshot read
// must absorb the lag instead of failing the restore with 404.
func TestSnapshotRestore_SurvivesCacheMissAfterCreate(t *testing.T) {
t.Parallel()

st := store.NewInMemory()
ctx := t.Context()

if err := st.ResourceDefinitions().Create(ctx, &apiv1.ResourceDefinition{Name: "pvc-clone-src"}); err != nil {
t.Fatalf("seed RD: %v", err)
}

// The restore path refuses vol-less snapshots (Bug 151), so the
// seeded source carries one VD.
if err := st.VolumeDefinitions().Create(ctx, "pvc-clone-src", &apiv1.VolumeDefinition{
VolumeNumber: 0,
SizeKib: 1024,
}); err != nil {
t.Fatalf("seed VD: %v", err)
}

flaky := &flakyStore{
Store: st,
snaps: &lagSnapshotStore{
SnapshotStore: st.Snapshots(),
missBudget: 1, // first read after create → NotFound, then real
},
}

base, stop := startServerWithStore(t, flaky)
defer stop()

body, err := json.Marshal(apiv1.Snapshot{Name: "snap-1"})
if err != nil {
t.Fatalf("marshal: %v", err)
}

createResp := httpPost(t, base+"/v1/resource-definitions/pvc-clone-src/snapshots", body)
defer func() { _ = createResp.Body.Close() }()

if createResp.StatusCode != http.StatusCreated {
t.Fatalf("create status: got %d, want 201", createResp.StatusCode)
}

restoreBody := []byte(`{"to_resource":"pvc-clone-dest"}`)

restoreResp := httpPost(t,
base+"/v1/resource-definitions/pvc-clone-src/snapshot-restore-resource/snap-1", restoreBody)
defer func() { _ = restoreResp.Body.Close() }()

if restoreResp.StatusCode != http.StatusCreated {
t.Fatalf("restore-after-create status: got %d, want 201 (cache-lag 404 must be absorbed)",
restoreResp.StatusCode)
}
}
11 changes: 9 additions & 2 deletions pkg/rest/snapshot_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ func (s *Server) handleSnapshotRestoreVolumeDefinition(w http.ResponseWriter, r
return
}

snap, err := s.Store.Snapshots().Get(r.Context(), srcRD, snapName)
// Cache-retry (Bug 124 class): linstor-csi restores VDs right
// after the snapshot create POST; absorb informer-cache lag on
// the source-snapshot read instead of 404-ing the restore.
snap, err := getSnapshotWithCacheRetry(r.Context(), s.Store, srcRD, snapName)
if err != nil {
writeStoreError(w, err)

Expand Down Expand Up @@ -257,7 +260,11 @@ func (s *Server) handleSnapshotRestore(w http.ResponseWriter, r *http.Request) {
return
}

snap, err := s.Store.Snapshots().Get(r.Context(), srcRD, snapName)
// Cache-retry (Bug 124 class): linstor-csi's CreateVolume-from-
// snapshot POSTs the restore right after the snapshot create;
// absorb informer-cache lag on the source-snapshot read instead
// of 404-ing the restore.
snap, err := getSnapshotWithCacheRetry(r.Context(), s.Store, srcRD, snapName)
if err != nil {
writeStoreError(w, err)

Expand Down
8 changes: 7 additions & 1 deletion pkg/rest/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,13 @@ func (s *Server) handleSnapshotGet(w http.ResponseWriter, r *http.Request) {
rd := r.PathValue("rd")
snapName := r.PathValue("snap")

snap, err := s.Store.Snapshots().Get(r.Context(), rd, snapName)
// Cache-retry (Bug 124 class): linstor-csi GETs the snapshot
// immediately after the create POST (CreateVolume-from-snapshot
// size guard). The create wrote straight to the apiserver; this
// read is served from the informer cache, which may not have
// observed the write yet — absorb the lag instead of failing
// the whole CreateVolume with a spurious 404.
snap, err := getSnapshotWithCacheRetry(r.Context(), s.Store, rd, snapName)
if err != nil {
writeStoreError(w, err)

Expand Down