From 9c0528aee4845b367c42d8bfecb983894df610a0 Mon Sep 17 00:00:00 2001 From: Andrei Kvapil Date: Fri, 12 Jun 2026 22:56:13 +0300 Subject: [PATCH 1/2] fix(rest): absorb informer-cache lag on snapshot reads in the CSI restore hot path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit linstor-csi's CreateVolume-from-snapshot sequence POSTs the snapshot create and immediately GETs it back (size guard), then POSTs snapshot-restore-resource. The create writes straight to the apiserver while the follow-up reads are served from the controller-runtime informer cache, which may not have observed the write yet — the spurious 404 fails the whole CreateVolume (TestGroupJ/CSICreateVolumeFromSnapshot in the release gate). This is the same read-after-write class Bug 124 already mitigated for RG/RD reads on the CreateVolume path; extend the established cache-retry helper family to snapshot reads and use it in the snapshot GET handler and both restore handlers. Co-Authored-By: Claude Signed-off-by: Andrei Kvapil --- pkg/rest/cache_retry.go | 43 ++++++++++++++++++++++++++++++++++++ pkg/rest/snapshot_restore.go | 11 +++++++-- pkg/rest/snapshots.go | 8 ++++++- 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/pkg/rest/cache_retry.go b/pkg/rest/cache_retry.go index c2fed53e..1da7d31f 100644 --- a/pkg/rest/cache_retry.go +++ b/pkg/rest/cache_retry.go @@ -125,3 +125,46 @@ func getRDWithCacheRetry(ctx context.Context, st store.Store, name string) (apiv return apiv1.ResourceDefinition{}, errors.Wrapf(err, "get resource definition %q after %d retries", name, cacheRetryAttempts) } + +// getSnapshotWithCacheRetry returns the Snapshot `snapName` on RD +// `rdName`, retrying on store.ErrNotFound to absorb informer-cache +// lag after a fresh write that landed on a sibling apiserver replica. +// Same semantics as getRGWithCacheRetry. +// +// linstor-csi's CreateVolume-from-snapshot sequence is `POST +// /v1/resource-definitions/{rd}/snapshots`, then an immediate `GET +// /v1/resource-definitions/{rd}/snapshots/{snap}` (size guard), then +// `POST .../snapshot-restore-resource/{snap}`. The follow-up reads +// race the informer cache exactly like the RG/RD reads above — the +// create wrote straight to the apiserver, the read is served from a +// cache that may not have observed the write yet — and a spurious +// 404 fails the whole CreateVolume. +func getSnapshotWithCacheRetry(ctx context.Context, st store.Store, rdName, snapName string) (apiv1.Snapshot, error) { + var ( + snap apiv1.Snapshot + err error + ) + + for attempt := range cacheRetryAttempts { + snap, err = st.Snapshots().Get(ctx, rdName, snapName) + if err == nil { + return snap, nil + } + + if !errors.Is(err, store.ErrNotFound) { + return apiv1.Snapshot{}, errors.Wrapf(err, "get snapshot %s/%s", rdName, snapName) + } + + if attempt == cacheRetryAttempts-1 { + break + } + + select { + case <-ctx.Done(): + return apiv1.Snapshot{}, errors.Wrap(ctx.Err(), "get snapshot: context cancelled") + case <-time.After(cacheRetryDelay): + } + } + + return apiv1.Snapshot{}, errors.Wrapf(err, "get snapshot %s/%s after %d retries", rdName, snapName, cacheRetryAttempts) +} diff --git a/pkg/rest/snapshot_restore.go b/pkg/rest/snapshot_restore.go index c01fb2ba..41dddeec 100644 --- a/pkg/rest/snapshot_restore.go +++ b/pkg/rest/snapshot_restore.go @@ -101,7 +101,10 @@ func (s *Server) handleSnapshotRestoreVolumeDefinition(w http.ResponseWriter, r return } - snap, err := s.Store.Snapshots().Get(r.Context(), srcRD, snapName) + // Cache-retry (Bug 124 class): linstor-csi restores VDs right + // after the snapshot create POST; absorb informer-cache lag on + // the source-snapshot read instead of 404-ing the restore. + snap, err := getSnapshotWithCacheRetry(r.Context(), s.Store, srcRD, snapName) if err != nil { writeStoreError(w, err) @@ -257,7 +260,11 @@ func (s *Server) handleSnapshotRestore(w http.ResponseWriter, r *http.Request) { return } - snap, err := s.Store.Snapshots().Get(r.Context(), srcRD, snapName) + // Cache-retry (Bug 124 class): linstor-csi's CreateVolume-from- + // snapshot POSTs the restore right after the snapshot create; + // absorb informer-cache lag on the source-snapshot read instead + // of 404-ing the restore. + snap, err := getSnapshotWithCacheRetry(r.Context(), s.Store, srcRD, snapName) if err != nil { writeStoreError(w, err) diff --git a/pkg/rest/snapshots.go b/pkg/rest/snapshots.go index 9ca44e0f..f2ae91fc 100644 --- a/pkg/rest/snapshots.go +++ b/pkg/rest/snapshots.go @@ -273,7 +273,13 @@ func (s *Server) handleSnapshotGet(w http.ResponseWriter, r *http.Request) { rd := r.PathValue("rd") snapName := r.PathValue("snap") - snap, err := s.Store.Snapshots().Get(r.Context(), rd, snapName) + // Cache-retry (Bug 124 class): linstor-csi GETs the snapshot + // immediately after the create POST (CreateVolume-from-snapshot + // size guard). The create wrote straight to the apiserver; this + // read is served from the informer cache, which may not have + // observed the write yet — absorb the lag instead of failing + // the whole CreateVolume with a spurious 404. + snap, err := getSnapshotWithCacheRetry(r.Context(), s.Store, rd, snapName) if err != nil { writeStoreError(w, err) From a249784e12d27ded84e4593aa42518b5e352a8ad Mon Sep 17 00:00:00 2001 From: Andrei Kvapil Date: Fri, 12 Jun 2026 22:56:29 +0300 Subject: [PATCH 2/2] test(rest): pin snapshot create-then-immediately-get under cache lag Model the informer-cache read-after-write lag in a SnapshotStore wrapper (a successful Create arms NotFound misses for subsequent Gets of that snapshot) and pin the linstor-csi hot-path contract on the wire: snapshot create POST followed by an immediate GET and an immediate snapshot-restore-resource POST must both succeed once the cache catches up within the retry budget, while a genuinely unknown snapshot still surfaces 404. Co-Authored-By: Claude Signed-off-by: Andrei Kvapil --- pkg/rest/cache_retry_test.go | 241 ++++++++++++++++++++++++++++++++++- 1 file changed, 237 insertions(+), 4 deletions(-) diff --git a/pkg/rest/cache_retry_test.go b/pkg/rest/cache_retry_test.go index d17b0f43..65ead14f 100644 --- a/pkg/rest/cache_retry_test.go +++ b/pkg/rest/cache_retry_test.go @@ -24,6 +24,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "sync" "sync/atomic" "testing" "time" @@ -78,13 +79,64 @@ func (f *flakyRDStore) Get(ctx context.Context, name string) (apiv1.ResourceDefi return f.ResourceDefinitionStore.Get(ctx, name) //nolint:wrapcheck // pass-through } -// flakyStore lets us substitute the RG / RD views with flaky ones -// while everything else keeps using the wrapped InMemory. +// lagSnapshotStore wraps an underlying SnapshotStore and models the +// production read-after-write informer-cache lag: a successful +// Create() arms `missBudget` — the next `missBudget` Get() calls on +// the just-created snapshot return store.ErrNotFound (the cache has +// not observed the apiserver write yet), then reads delegate to the +// real store (the watch event arrived). This is the exact shape +// linstor-csi's CreateVolume-from-snapshot trips over: POST snapshot +// create, then an immediate GET on a replica whose cache trails. +type lagSnapshotStore struct { + store.SnapshotStore + + missBudget int + + mu sync.Mutex + pending map[string]int +} + +func (f *lagSnapshotStore) Create(ctx context.Context, snap *apiv1.Snapshot) error { + err := f.SnapshotStore.Create(ctx, snap) + if err != nil { + return err //nolint:wrapcheck // pass-through + } + + f.mu.Lock() + defer f.mu.Unlock() + + if f.pending == nil { + f.pending = map[string]int{} + } + + f.pending[snap.ResourceName+"/"+snap.Name] = f.missBudget + + return nil +} + +func (f *lagSnapshotStore) Get(ctx context.Context, rdName, snapName string) (apiv1.Snapshot, error) { + f.mu.Lock() + + key := rdName + "/" + snapName + if left := f.pending[key]; left > 0 { + f.pending[key] = left - 1 + f.mu.Unlock() + + return apiv1.Snapshot{}, errors.Wrapf(store.ErrNotFound, "snapshot %q on RD %q", snapName, rdName) + } + f.mu.Unlock() + + return f.SnapshotStore.Get(ctx, rdName, snapName) //nolint:wrapcheck // pass-through +} + +// flakyStore lets us substitute the RG / RD / Snapshot views with +// flaky ones while everything else keeps using the wrapped InMemory. type flakyStore struct { store.Store - rgs *flakyRGStore - rds *flakyRDStore + rgs *flakyRGStore + rds *flakyRDStore + snaps *lagSnapshotStore } func (f *flakyStore) ResourceGroups() store.ResourceGroupStore { @@ -103,6 +155,14 @@ func (f *flakyStore) ResourceDefinitions() store.ResourceDefinitionStore { return f.rds } +func (f *flakyStore) Snapshots() store.SnapshotStore { + if f.snaps == nil { + return f.Store.Snapshots() + } + + return f.snaps +} + func TestGetRGWithCacheRetry_SucceedsAfterCacheMiss(t *testing.T) { t.Parallel() @@ -252,3 +312,176 @@ func TestSpawn_SurvivesCacheMissOnRGGet(t *testing.T) { t.Fatalf("expected at least 2 RG Get attempts, got %d", flaky.rgs.calls.Load()) } } + +func TestGetSnapshotWithCacheRetry_SucceedsAfterCacheMiss(t *testing.T) { + t.Parallel() + + st := store.NewInMemory() + + if err := st.ResourceDefinitions().Create(t.Context(), &apiv1.ResourceDefinition{Name: "pvc-1"}); err != nil { + t.Fatalf("seed RD: %v", err) + } + + flaky := &flakyStore{ + Store: st, + snaps: &lagSnapshotStore{ + SnapshotStore: st.Snapshots(), + missBudget: 2, // two cache misses after the create, then real + }, + } + + err := flaky.Snapshots().Create(t.Context(), + &apiv1.Snapshot{Name: "snap-1", ResourceName: "pvc-1"}) + if err != nil { + t.Fatalf("seed snapshot: %v", err) + } + + snap, err := getSnapshotWithCacheRetry(t.Context(), flaky, "pvc-1", "snap-1") + if err != nil { + t.Fatalf("getSnapshotWithCacheRetry: %v", err) + } + + if snap.Name != "snap-1" || snap.ResourceName != "pvc-1" { + t.Fatalf("got %q on %q, want snap-1 on pvc-1", snap.Name, snap.ResourceName) + } +} + +func TestGetSnapshotWithCacheRetry_RealNotFoundStillSurfaces(t *testing.T) { + t.Parallel() + + st := store.NewInMemory() + + // Snapshot is never created, so every retry returns NotFound — + // the caller's 404 contract (unknown snapshot → 404 envelope) + // must survive the retry wrapper. + start := time.Now() + + _, err := getSnapshotWithCacheRetry(t.Context(), st, "pvc-1", "does-not-exist") + if !errors.Is(err, store.ErrNotFound) { + t.Fatalf("expected ErrNotFound, got %v", err) + } + + minWait := time.Duration(cacheRetryAttempts-1) * cacheRetryDelay + if elapsed := time.Since(start); elapsed < minWait { + t.Fatalf("retry loop returned in %s, expected at least %s", elapsed, minWait) + } +} + +// TestSnapshotGet_SurvivesCacheMissAfterCreate pins the F1 hot path +// end-to-end on the wire: linstor-csi's CreateVolume-from-snapshot +// GETs the snapshot IMMEDIATELY after the create POST (size guard +// before snapshot-restore-resource). With the informer-cached store +// the create writes straight to the apiserver while the follow-up +// read is served from a cache that may not have observed the write +// yet — the GET must absorb that lag instead of 404-ing the whole +// CreateVolume (TestGroupJ/CSICreateVolumeFromSnapshot regression). +func TestSnapshotGet_SurvivesCacheMissAfterCreate(t *testing.T) { + t.Parallel() + + st := store.NewInMemory() + + if err := st.ResourceDefinitions().Create(t.Context(), &apiv1.ResourceDefinition{Name: "pvc-clone-src"}); err != nil { + t.Fatalf("seed RD: %v", err) + } + + flaky := &flakyStore{ + Store: st, + snaps: &lagSnapshotStore{ + SnapshotStore: st.Snapshots(), + missBudget: 1, // first read after create → NotFound, then real + }, + } + + base, stop := startServerWithStore(t, flaky) + defer stop() + + body, err := json.Marshal(apiv1.Snapshot{Name: "snap-1"}) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + createResp := httpPost(t, base+"/v1/resource-definitions/pvc-clone-src/snapshots", body) + defer func() { _ = createResp.Body.Close() }() + + if createResp.StatusCode != http.StatusCreated { + t.Fatalf("create status: got %d, want 201", createResp.StatusCode) + } + + // Immediate follow-up GET — the exact linstor-csi sequence. + getResp := httpGet(t, base+"/v1/resource-definitions/pvc-clone-src/snapshots/snap-1") + defer func() { _ = getResp.Body.Close() }() + + if getResp.StatusCode != http.StatusOK { + t.Fatalf("get-after-create status: got %d, want 200 (cache-lag 404 must be absorbed)", + getResp.StatusCode) + } + + var got apiv1.Snapshot + if err := json.NewDecoder(getResp.Body).Decode(&got); err != nil { + t.Fatalf("decode snapshot: %v", err) + } + + if got.Name != "snap-1" || got.ResourceName != "pvc-clone-src" { + t.Fatalf("got %q on %q, want snap-1 on pvc-clone-src", got.Name, got.ResourceName) + } +} + +// TestSnapshotRestore_SurvivesCacheMissAfterCreate pins the second +// half of the same hot path: `POST .../snapshot-restore-resource/ +// {snap}` right after the snapshot create. In production with N +// apiserver replicas the restore POST can land on a replica whose +// cache has not observed the snapshot yet — the source-snapshot read +// must absorb the lag instead of failing the restore with 404. +func TestSnapshotRestore_SurvivesCacheMissAfterCreate(t *testing.T) { + t.Parallel() + + st := store.NewInMemory() + ctx := t.Context() + + if err := st.ResourceDefinitions().Create(ctx, &apiv1.ResourceDefinition{Name: "pvc-clone-src"}); err != nil { + t.Fatalf("seed RD: %v", err) + } + + // The restore path refuses vol-less snapshots (Bug 151), so the + // seeded source carries one VD. + if err := st.VolumeDefinitions().Create(ctx, "pvc-clone-src", &apiv1.VolumeDefinition{ + VolumeNumber: 0, + SizeKib: 1024, + }); err != nil { + t.Fatalf("seed VD: %v", err) + } + + flaky := &flakyStore{ + Store: st, + snaps: &lagSnapshotStore{ + SnapshotStore: st.Snapshots(), + missBudget: 1, // first read after create → NotFound, then real + }, + } + + base, stop := startServerWithStore(t, flaky) + defer stop() + + body, err := json.Marshal(apiv1.Snapshot{Name: "snap-1"}) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + createResp := httpPost(t, base+"/v1/resource-definitions/pvc-clone-src/snapshots", body) + defer func() { _ = createResp.Body.Close() }() + + if createResp.StatusCode != http.StatusCreated { + t.Fatalf("create status: got %d, want 201", createResp.StatusCode) + } + + restoreBody := []byte(`{"to_resource":"pvc-clone-dest"}`) + + restoreResp := httpPost(t, + base+"/v1/resource-definitions/pvc-clone-src/snapshot-restore-resource/snap-1", restoreBody) + defer func() { _ = restoreResp.Body.Close() }() + + if restoreResp.StatusCode != http.StatusCreated { + t.Fatalf("restore-after-create status: got %d, want 201 (cache-lag 404 must be absorbed)", + restoreResp.StatusCode) + } +}