From 8eb0697a8c43aabb081fa2f1216dfc677f10e5a1 Mon Sep 17 00:00:00 2001 From: Ricardo Maraschini Date: Thu, 23 Apr 2026 13:41:27 +0200 Subject: [PATCH 1/3] OCPBUGS-78480: add watchlist new semantic support to project watcher support watch list by handling sendInitialEvents option, which sends initial state followed by a bookmark with initial-events-end annotation when enabled. --- .../apiserver/registry/project/proxy/proxy.go | 15 +++- pkg/project/auth/watch.go | 24 ++++- pkg/project/auth/watch_test.go | 87 +++++++++++++++++-- 3 files changed, 117 insertions(+), 9 deletions(-) diff --git a/pkg/project/apiserver/registry/project/proxy/proxy.go b/pkg/project/apiserver/registry/project/proxy/proxy.go index 76c239d6e5..de2cab672e 100644 --- a/pkg/project/apiserver/registry/project/proxy/proxy.go +++ b/pkg/project/apiserver/registry/project/proxy/proxy.go @@ -114,7 +114,18 @@ func (s *REST) Watch(ctx context.Context, options *metainternal.ListOptions) (wa return nil, fmt.Errorf("no user") } - includeAllExistingProjects := (options != nil) && options.ResourceVersion == "0" + // includeAllExistingProjects (RV="0") triggers sending initial state. + // sendBookmark (from SendInitialEvents) triggers sending a bookmark with + // k8s.io/initial-events-end annotation after initial events (if any). + includeAllExistingProjects, sendBookmark := false, false + if options != nil { + if options.ResourceVersion == "0" { + includeAllExistingProjects = true + } + if options.SendInitialEvents != nil && *options.SendInitialEvents { + sendBookmark = true + } + } allowedNamespaces, err := scope.ScopesToVisibleNamespaces(userInfo.GetExtra()[authorizationapi.ScopesKey], s.authCache.GetClusterRoleLister(), true) if err != nil { @@ -122,7 +133,7 @@ func (s *REST) Watch(ctx context.Context, options *metainternal.ListOptions) (wa } m := projectutil.MatchProject(apihelpers.InternalListOptionsToSelectors(options)) - watcher := projectauth.NewUserProjectWatcher(userInfo, allowedNamespaces, s.projectCache, s.authCache, includeAllExistingProjects, m) + watcher := projectauth.NewUserProjectWatcher(userInfo, allowedNamespaces, s.projectCache, s.authCache, includeAllExistingProjects, m, sendBookmark) s.authCache.AddWatcher(watcher) go watcher.Watch() diff --git a/pkg/project/auth/watch.go b/pkg/project/auth/watch.go index 59f387ae08..9bf67ca229 100644 --- a/pkg/project/auth/watch.go +++ b/pkg/project/auth/watch.go @@ -64,6 +64,8 @@ type userProjectWatcher struct { initialProjects []corev1.Namespace // knownProjects maps name to resourceVersion knownProjects map[string]string + + sendBookmark bool } var ( @@ -72,7 +74,7 @@ var ( watchChannelHWM kstorage.HighWaterMark ) -func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projectCache *projectcache.ProjectCache, authCache WatchableCache, includeAllExistingProjects bool, predicate kstorage.SelectionPredicate) *userProjectWatcher { +func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projectCache *projectcache.ProjectCache, authCache WatchableCache, includeAllExistingProjects bool, predicate kstorage.SelectionPredicate, sendBookmark bool) *userProjectWatcher { namespaces, _ := authCache.List(user, labels.Everything()) knownProjects := map[string]string{} for _, namespace := range namespaces.Items { @@ -98,6 +100,8 @@ func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projec authCache: authCache, initialProjects: initialProjects, knownProjects: knownProjects, + + sendBookmark: sendBookmark, } w.emit = func(e watch.Event) { // if dealing with project events, ensure that we only emit events for projects @@ -186,6 +190,13 @@ func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, users, // Watch pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be // called as a goroutine. +// +// Design decision: This implementation balances KEP-3157 watch-list support with backward +// compatibility. Initial events are sent only when rv="0" (includeAllExistingProjects=true). +// For other rv values with SendInitialEvents=true, only the bookmark is sent. This approach +// acknowledges that project visibility depends on both namespace objects and RBAC state. Since +// RBAC changes don't update namespace ResourceVersions, permission-filtered views cannot provide +// the same consistency guarantees (resourceVersionMatch=NotOlderThan) as direct object watches. func (w *userProjectWatcher) Watch() { defer close(w.outgoing) defer func() { @@ -214,6 +225,17 @@ func (w *userProjectWatcher) Watch() { }) } + if w.sendBookmark { + w.emit(watch.Event{ + Type: watch.Bookmark, + Object: &projectapi.Project{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, + }, + }, + }) + } + for { select { case err := <-w.cacheError: diff --git a/pkg/project/auth/watch_test.go b/pkg/project/auth/watch_test.go index 105dac8ab3..2345190c38 100644 --- a/pkg/project/auth/watch_test.go +++ b/pkg/project/auth/watch_test.go @@ -23,7 +23,7 @@ import ( projectutil "github.com/openshift/openshift-apiserver/pkg/project/util" ) -func newTestWatcher(username string, groups []string, predicate storage.SelectionPredicate, namespaces ...*corev1.Namespace) (*userProjectWatcher, *fakeAuthCache, chan struct{}) { +func newTestWatcher(username string, groups []string, predicate storage.SelectionPredicate, includeAllExistingProjects bool, namespaces ...*corev1.Namespace) (*userProjectWatcher, *fakeAuthCache, chan struct{}) { objects := []runtime.Object{} for i := range namespaces { objects = append(objects, namespaces[i]) @@ -37,11 +37,14 @@ func newTestWatcher(username string, groups []string, predicate storage.Selectio "", ) fakeAuthCache := &fakeAuthCache{} + if includeAllExistingProjects { + fakeAuthCache.namespaces = namespaces + } stopCh := make(chan struct{}) go projectCache.Run(stopCh) - return NewUserProjectWatcher(&user.DefaultInfo{Name: username, Groups: groups}, sets.NewString("*"), projectCache, fakeAuthCache, false, predicate), fakeAuthCache, stopCh + return NewUserProjectWatcher(&user.DefaultInfo{Name: username, Groups: groups}, sets.NewString("*"), projectCache, fakeAuthCache, includeAllExistingProjects, predicate, false), fakeAuthCache, stopCh } type fakeAuthCache struct { @@ -66,7 +69,7 @@ func (w *fakeAuthCache) List(userInfo user.Info, selector labels.Selector) (*cor } func TestFullIncoming(t *testing.T) { - watcher, fakeAuthCache, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), newNamespaces("ns-01")...) + watcher, fakeAuthCache, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), false, newNamespaces("ns-01")...) defer close(stopCh) watcher.cacheIncoming = make(chan watch.Event) @@ -115,7 +118,7 @@ func TestFullIncoming(t *testing.T) { } func TestAddModifyDeleteEventsByUser(t *testing.T) { - watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), newNamespaces("ns-01")...) + watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), false, newNamespaces("ns-01")...) defer close(stopCh) go watcher.Watch() @@ -158,7 +161,7 @@ func TestProjectSelectionPredicate(t *testing.T) { field := fields.ParseSelectorOrDie("metadata.name=ns-03") m := projectutil.MatchProject(labels.Everything(), field) - watcher, _, stopCh := newTestWatcher("bob", nil, m, newNamespaces("ns-01", "ns-02", "ns-03")...) + watcher, _, stopCh := newTestWatcher("bob", nil, m, false, newNamespaces("ns-01", "ns-02", "ns-03")...) defer close(stopCh) if watcher.emit == nil { @@ -220,7 +223,7 @@ func TestProjectSelectionPredicate(t *testing.T) { } func TestAddModifyDeleteEventsByGroup(t *testing.T) { - watcher, _, stopCh := newTestWatcher("bob", []string{"group-one"}, matchAllPredicate(), newNamespaces("ns-01")...) + watcher, _, stopCh := newTestWatcher("bob", []string{"group-one"}, matchAllPredicate(), false, newNamespaces("ns-01")...) defer close(stopCh) go watcher.Watch() @@ -271,3 +274,75 @@ func newNamespaces(names ...string) []*corev1.Namespace { func matchAllPredicate() storage.SelectionPredicate { return projectutil.MatchProject(labels.Everything(), fields.Everything()) } + +func TestSendInitialEventsBookmark(t *testing.T) { + t.Run("with rv=0", func(t *testing.T) { + // rv="0" behavior: send initial events + bookmark + watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), true, newNamespaces("ns-01", "ns-02")...) + defer close(stopCh) + + // Enable bookmark for watch-list + watcher.sendBookmark = true + + go watcher.Watch() + + // expect 2 initial Added events + for i := 0; i < 2; i++ { + select { + case event := <-watcher.ResultChan(): + if event.Type != watch.Added { + t.Errorf("expected Added, got %v", event.Type) + } + case <-time.After(3 * time.Second): + t.Fatalf("timeout waiting for initial event %d", i) + } + } + + // expect bookmark with annotation + select { + case event := <-watcher.ResultChan(): + if event.Type != watch.Bookmark { + t.Errorf("expected Bookmark, got %v", event.Type) + } + project := event.Object.(*projectapi.Project) + if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" { + t.Errorf("expected initial-events-end annotation") + } + case <-time.After(3 * time.Second): + t.Fatalf("timeout waiting for bookmark") + } + }) + + t.Run("without rv=0", func(t *testing.T) { + // rv!="0" behavior: send bookmark only, no initial events + watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), false, newNamespaces("ns-01", "ns-02")...) + defer close(stopCh) + + // Enable bookmark for watch-list + watcher.sendBookmark = true + + go watcher.Watch() + + // expect bookmark with annotation immediately + select { + case event := <-watcher.ResultChan(): + if event.Type != watch.Bookmark { + t.Errorf("expected Bookmark, got %v", event.Type) + } + project := event.Object.(*projectapi.Project) + if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" { + t.Errorf("expected initial-events-end annotation") + } + case <-time.After(3 * time.Second): + t.Fatalf("timeout waiting for bookmark") + } + + // verify no additional events + select { + case event := <-watcher.ResultChan(): + t.Fatalf("unexpected event after bookmark: %v", event) + case <-time.After(500 * time.Millisecond): + // expected - no more events + } + }) +} From e4e3717c7452f240fc7f34824d4e2e4213c19be6 Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Mon, 15 Jun 2026 12:13:06 -0400 Subject: [PATCH 2/3] add resource version --- .../validation/whitelist/whitelister_test.go | 94 +++++++++++++++++++ pkg/project/auth/watch.go | 14 ++- pkg/project/auth/watch_test.go | 63 ++++++++++--- 3 files changed, 156 insertions(+), 15 deletions(-) diff --git a/pkg/image/apis/image/validation/whitelist/whitelister_test.go b/pkg/image/apis/image/validation/whitelist/whitelister_test.go index d7c0275d23..0afbdc3c71 100644 --- a/pkg/image/apis/image/validation/whitelist/whitelister_test.go +++ b/pkg/image/apis/image/validation/whitelist/whitelister_test.go @@ -392,6 +392,100 @@ func assertExpectedError(t *testing.T, a, e error) { } } +// TestDefaultAllowedRegistries verifies that the default allowed registries +// (as configured by the cluster-openshift-apiserver-operator defaultconfig.yaml) +// correctly allow the expected registries and deny unlisted ones. +func TestDefaultAllowedRegistries(t *testing.T) { + ctx := context.TODO() + + // These are the default registries added by the operator's defaultconfig.yaml. + defaultAllowed := openshiftcontrolplanev1.AllowedRegistries{ + {DomainName: "image-registry.openshift-image-registry.svc:5000"}, + {DomainName: "quay.io"}, + {DomainName: "registry.redhat.io"}, + } + + rw, err := NewRegistryWhitelister(defaultAllowed, nil) + if err != nil { + t.Fatalf("unexpected error creating whitelister: %v", err) + } + + for _, tc := range []struct { + name string + hostname string + expectErr bool + }{ + { + name: "internal registry is allowed", + hostname: "image-registry.openshift-image-registry.svc:5000", + expectErr: false, + }, + { + name: "quay.io is allowed", + hostname: "quay.io", + expectErr: false, + }, + { + name: "registry.redhat.io is allowed", + hostname: "registry.redhat.io", + expectErr: false, + }, + { + name: "gcr.io is denied", + hostname: "gcr.io", + expectErr: true, + }, + { + name: "docker.io is denied", + hostname: "docker.io", + expectErr: true, + }, + { + name: "random registry is denied", + hostname: "my-private-registry.example.com", + expectErr: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := rw.AdmitHostname(ctx, tc.hostname, WhitelistTransportSecure) + if tc.expectErr && err == nil { + t.Errorf("expected error for hostname %q, got nil", tc.hostname) + } + if !tc.expectErr && err != nil { + t.Errorf("expected no error for hostname %q, got: %v", tc.hostname, err) + } + }) + } +} + +// TestEmptyAllowedRegistriesDeniesAll verifies that when AllowedRegistriesForImport +// is empty (the new default behavior after removing WhitelistAllRegistries fallback), +// all registries are denied. This is the deny-all behavior introduced by +// openshift/openshift-apiserver#607. +func TestEmptyAllowedRegistriesDeniesAll(t *testing.T) { + ctx := context.TODO() + + rw, err := NewRegistryWhitelister(openshiftcontrolplanev1.AllowedRegistries{}, nil) + if err != nil { + t.Fatalf("unexpected error creating whitelister: %v", err) + } + + for _, hostname := range []string{ + "quay.io", + "registry.redhat.io", + "docker.io", + "gcr.io", + "image-registry.openshift-image-registry.svc:5000", + } { + t.Run(hostname, func(t *testing.T) { + err := rw.AdmitHostname(ctx, hostname, WhitelistTransportSecure) + if err == nil { + t.Errorf("expected deny-all whitelister to reject %q, but it was allowed", hostname) + } + }) + } +} + func TestWhitelistRepository(t *testing.T) { registries := mkAllowed(false, "registry.example.org:5000") whitelister, err := NewRegistryWhitelister(registries, nil) diff --git a/pkg/project/auth/watch.go b/pkg/project/auth/watch.go index 9bf67ca229..44da7c9b9c 100644 --- a/pkg/project/auth/watch.go +++ b/pkg/project/auth/watch.go @@ -2,6 +2,7 @@ package auth import ( "errors" + "strconv" "sync" "k8s.io/klog/v2" @@ -65,7 +66,8 @@ type userProjectWatcher struct { // knownProjects maps name to resourceVersion knownProjects map[string]string - sendBookmark bool + sendBookmark bool + bookmarkResourceVersion string } var ( @@ -77,8 +79,12 @@ var ( func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projectCache *projectcache.ProjectCache, authCache WatchableCache, includeAllExistingProjects bool, predicate kstorage.SelectionPredicate, sendBookmark bool) *userProjectWatcher { namespaces, _ := authCache.List(user, labels.Everything()) knownProjects := map[string]string{} + var maxRV int for _, namespace := range namespaces.Items { knownProjects[namespace.Name] = namespace.ResourceVersion + if n, err := strconv.Atoi(namespace.ResourceVersion); err == nil && n > maxRV { + maxRV = n + } } // this is optional. If they don't request it, don't include it. @@ -101,7 +107,8 @@ func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projec initialProjects: initialProjects, knownProjects: knownProjects, - sendBookmark: sendBookmark, + sendBookmark: sendBookmark, + bookmarkResourceVersion: strconv.Itoa(maxRV), } w.emit = func(e watch.Event) { // if dealing with project events, ensure that we only emit events for projects @@ -230,7 +237,8 @@ func (w *userProjectWatcher) Watch() { Type: watch.Bookmark, Object: &projectapi.Project{ ObjectMeta: metav1.ObjectMeta{ - Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, + ResourceVersion: w.bookmarkResourceVersion, + Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, }, }, }) diff --git a/pkg/project/auth/watch_test.go b/pkg/project/auth/watch_test.go index 2345190c38..ea3a4f95d6 100644 --- a/pkg/project/auth/watch_test.go +++ b/pkg/project/auth/watch_test.go @@ -1,6 +1,7 @@ package auth import ( + "fmt" "testing" "time" @@ -275,15 +276,51 @@ func matchAllPredicate() storage.SelectionPredicate { return projectutil.MatchProject(labels.Everything(), fields.Everything()) } +func newNamespacesWithRV(names ...string) []*corev1.Namespace { + ret := []*corev1.Namespace{} + for i, name := range names { + ret = append(ret, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + ResourceVersion: fmt.Sprintf("%d", i+10), + }, + }) + } + return ret +} + +func newBookmarkTestWatcher(username string, includeAllExistingProjects bool, namespaces ...*corev1.Namespace) (*userProjectWatcher, chan struct{}) { + objects := []runtime.Object{} + for i := range namespaces { + objects = append(objects, namespaces[i]) + } + mockClient := fakev1.NewSimpleClientset(objects...) + informers := informersv1.NewSharedInformerFactory(mockClient, controller.NoResyncPeriodFunc()) + projectCache := projectcache.NewProjectCache( + informers.Core().V1().Namespaces().Informer(), + mockClient.CoreV1().Namespaces(), + "", + ) + fakeAuthCache := &fakeAuthCache{namespaces: namespaces} + stopCh := make(chan struct{}) + go projectCache.Run(stopCh) + w := NewUserProjectWatcher( + &user.DefaultInfo{Name: username}, + sets.NewString("*"), + projectCache, + fakeAuthCache, + includeAllExistingProjects, + matchAllPredicate(), + true, + ) + return w, stopCh +} + func TestSendInitialEventsBookmark(t *testing.T) { t.Run("with rv=0", func(t *testing.T) { - // rv="0" behavior: send initial events + bookmark - watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), true, newNamespaces("ns-01", "ns-02")...) + watcher, stopCh := newBookmarkTestWatcher("bob", true, newNamespacesWithRV("ns-01", "ns-02")...) defer close(stopCh) - // Enable bookmark for watch-list - watcher.sendBookmark = true - go watcher.Watch() // expect 2 initial Added events @@ -298,7 +335,7 @@ func TestSendInitialEventsBookmark(t *testing.T) { } } - // expect bookmark with annotation + // expect bookmark with annotation and ResourceVersion select { case event := <-watcher.ResultChan(): if event.Type != watch.Bookmark { @@ -308,22 +345,21 @@ func TestSendInitialEventsBookmark(t *testing.T) { if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" { t.Errorf("expected initial-events-end annotation") } + if project.ResourceVersion != "11" { + t.Errorf("expected bookmark ResourceVersion %q, got %q", "11", project.ResourceVersion) + } case <-time.After(3 * time.Second): t.Fatalf("timeout waiting for bookmark") } }) t.Run("without rv=0", func(t *testing.T) { - // rv!="0" behavior: send bookmark only, no initial events - watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), false, newNamespaces("ns-01", "ns-02")...) + watcher, stopCh := newBookmarkTestWatcher("bob", false, newNamespacesWithRV("ns-01", "ns-02")...) defer close(stopCh) - // Enable bookmark for watch-list - watcher.sendBookmark = true - go watcher.Watch() - // expect bookmark with annotation immediately + // expect bookmark with annotation and ResourceVersion immediately select { case event := <-watcher.ResultChan(): if event.Type != watch.Bookmark { @@ -333,6 +369,9 @@ func TestSendInitialEventsBookmark(t *testing.T) { if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" { t.Errorf("expected initial-events-end annotation") } + if project.ResourceVersion != "11" { + t.Errorf("expected bookmark ResourceVersion %q, got %q", "11", project.ResourceVersion) + } case <-time.After(3 * time.Second): t.Fatalf("timeout waiting for bookmark") } From 2a7784b9cc28f718ebe9ec4958948c7a9bd86d2f Mon Sep 17 00:00:00 2001 From: Allen Ray Date: Wed, 17 Jun 2026 09:49:55 -0400 Subject: [PATCH 3/3] skip predicate filtering for bookmark watch events Co-Authored-By: Claude Opus 4.6 (1M context) --- pkg/project/auth/watch.go | 12 ++++---- pkg/project/auth/watch_test.go | 50 ++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/pkg/project/auth/watch.go b/pkg/project/auth/watch.go index 44da7c9b9c..140c70b4b0 100644 --- a/pkg/project/auth/watch.go +++ b/pkg/project/auth/watch.go @@ -111,11 +111,13 @@ func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projec bookmarkResourceVersion: strconv.Itoa(maxRV), } w.emit = func(e watch.Event) { - // if dealing with project events, ensure that we only emit events for projects - // that match the field or label selector specified by a consumer - if project, ok := e.Object.(*projectapi.Project); ok { - if matches, err := predicate.Matches(project); err != nil || !matches { - return + if e.Type != watch.Bookmark { + // if dealing with project events, ensure that we only emit events for projects + // that match the field or label selector specified by a consumer + if project, ok := e.Object.(*projectapi.Project); ok { + if matches, err := predicate.Matches(project); err != nil || !matches { + return + } } } diff --git a/pkg/project/auth/watch_test.go b/pkg/project/auth/watch_test.go index ea3a4f95d6..282c8eea0f 100644 --- a/pkg/project/auth/watch_test.go +++ b/pkg/project/auth/watch_test.go @@ -353,6 +353,56 @@ func TestSendInitialEventsBookmark(t *testing.T) { } }) + t.Run("bookmark bypasses field selector predicate", func(t *testing.T) { + // Verify that bookmark events are delivered even when a field selector + // (e.g. metadata.name=) would reject the bookmark's empty Name. + // This is critical for oc delete --wait which watches a specific project + // and needs the bookmark to complete the initial events stream. + field := fields.ParseSelectorOrDie("metadata.name=ns-01") + m := projectutil.MatchProject(labels.Everything(), field) + + objects := []runtime.Object{} + namespaces := newNamespacesWithRV("ns-01", "ns-02") + for i := range namespaces { + objects = append(objects, namespaces[i]) + } + mockClient := fakev1.NewSimpleClientset(objects...) + informers := informersv1.NewSharedInformerFactory(mockClient, controller.NoResyncPeriodFunc()) + projectCache := projectcache.NewProjectCache( + informers.Core().V1().Namespaces().Informer(), + mockClient.CoreV1().Namespaces(), + "", + ) + fakeAuthCache := &fakeAuthCache{namespaces: namespaces} + stopCh := make(chan struct{}) + defer close(stopCh) + go projectCache.Run(stopCh) + + w := NewUserProjectWatcher( + &user.DefaultInfo{Name: "bob"}, + sets.NewString("*"), + projectCache, + fakeAuthCache, + false, + m, + true, + ) + go w.Watch() + + select { + case event := <-w.ResultChan(): + if event.Type != watch.Bookmark { + t.Errorf("expected Bookmark, got %v", event.Type) + } + project := event.Object.(*projectapi.Project) + if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" { + t.Errorf("expected initial-events-end annotation") + } + case <-time.After(3 * time.Second): + t.Fatalf("timeout waiting for bookmark — predicate likely filtered it out") + } + }) + t.Run("without rv=0", func(t *testing.T) { watcher, stopCh := newBookmarkTestWatcher("bob", false, newNamespacesWithRV("ns-01", "ns-02")...) defer close(stopCh)