Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions perf-tests/clusterloader2/pkg/imagepreload/imagepreload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@ limitations under the License.
package imagepreload

import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"strings"
"sync"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"k8s.io/kubernetes/perf-tests/clusterloader2/pkg/config"
"k8s.io/kubernetes/perf-tests/clusterloader2/pkg/flags"
"k8s.io/kubernetes/perf-tests/clusterloader2/pkg/framework"
"k8s.io/kubernetes/perf-tests/clusterloader2/pkg/framework/client"
"k8s.io/kubernetes/perf-tests/clusterloader2/pkg/measurement/util"
"k8s.io/kubernetes/perf-tests/clusterloader2/pkg/measurement/util/informer"
"k8s.io/kubernetes/perf-tests/clusterloader2/pkg/measurement/util/runtimeobjects"
perfutil "k8s.io/kubernetes/perf-tests/clusterloader2/pkg/util"
Expand Down Expand Up @@ -98,9 +100,14 @@ func (c *controller) PreloadImages() error {
defer close(stopCh)

nodeInformer := informer.NewInformer(
kclient,
"nodes",
util.NewObjectSelector(),
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return kclient.CoreV1().Nodes().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return kclient.CoreV1().Nodes().Watch(options)
},
},
func(old, new interface{}) { c.checkNode(new, doneNodes) })
if err := informer.StartAndSync(nodeInformer, stopCh, informerTimeout); err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package common

import (
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -135,9 +139,16 @@ func (s *serviceCreationLatencyMeasurement) start() error {
s.stopCh = make(chan struct{})

i := informer.NewInformer(
s.client,
"services",
s.selector,
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
s.selector.ApplySelectors(&options)
return s.client.CoreV1().ServicesWithMultiTenancy(s.selector.Namespace, util.GetTenant()).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
s.selector.ApplySelectors(&options)
return s.client.CoreV1().ServicesWithMultiTenancy(s.selector.Namespace, util.GetTenant()).Watch(options)
},
},
func(oldObj, newObj interface{}) {
f := func() {
s.handleObject(oldObj, newObj)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package slos

import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"strings"
"time"

Expand Down Expand Up @@ -52,15 +56,24 @@ func createPodStartupLatencyMeasurement() measurement.Measurement {
return &podStartupLatencyMeasurement{
selector: measurementutil.NewObjectSelector(),
podStartupEntries: measurementutil.NewObjectTransitionTimes(podStartupLatencyMeasurementName),
eventQueue: workqueue.New(),
}
}

// eventData pairs an informer event's object with the local wall-clock time
// at which the event was received, so latency is measured from receive time
// rather than from the (possibly later) time the event is processed.
type eventData struct {
obj interface{} // object delivered by the informer event (used as a pod downstream — confirm)
recvTime time.Time // time the event was received, recorded in addEvent
}

// podStartupLatencyMeasurement tracks pod lifecycle transition times in order
// to compute pod startup latency. Events observed by an informer are queued
// via eventQueue and drained asynchronously by processEvents.
type podStartupLatencyMeasurement struct {
selector *measurementutil.ObjectSelector // namespace/label/field selectors for the watched pods
isRunning bool // true between start() and stop()
stopCh chan struct{} // closed in stop() to terminate the informer
podStartupEntries *measurementutil.ObjectTransitionTimes // per-pod phase transition timestamps
threshold time.Duration
// This queue can potentially grow indefinitely, because we put all changes here.
// Usually that is not a recommended pattern, but we need it for measuring PodStartupLatency.
eventQueue *workqueue.Type
}

// Execute supports two actions:
Expand Down Expand Up @@ -111,18 +124,52 @@ func (p *podStartupLatencyMeasurement) start(c clientset.Interface) error {
p.isRunning = true
p.stopCh = make(chan struct{})
i := informer.NewInformer(
c,
"pods",
p.selector,
p.checkPod,
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
p.selector.ApplySelectors(&options)
return c.CoreV1().PodsWithMultiTenancy(p.selector.Namespace, util.GetTenant()).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
p.selector.ApplySelectors(&options)
return c.CoreV1().PodsWithMultiTenancy(p.selector.Namespace, util.GetTenant()).Watch(options)
},
},
p.addEvent,
)
return informer.StartAndSync(i, p.stopCh, informerSyncTimeout)
}

// addEvent is the informer callback: it timestamps the incoming object and
// enqueues it for asynchronous processing, keeping the informer thread fast.
func (p *podStartupLatencyMeasurement) addEvent(_, obj interface{}) {
	p.eventQueue.Add(&eventData{obj: obj, recvTime: time.Now()})
}

// processEvents drains the event queue until it is shut down,
// handling one item at a time.
func (p *podStartupLatencyMeasurement) processEvents() {
	for {
		if !p.processNextWorkItem() {
			return
		}
	}
}

// processNextWorkItem pops a single item from the event queue and processes it.
// It returns false only when the queue has been shut down (signalling
// processEvents to exit); items of an unexpected type are logged and skipped.
func (p *podStartupLatencyMeasurement) processNextWorkItem() bool {
	item, quit := p.eventQueue.Get()
	if quit {
		return false
	}
	// Mark the item done so the workqueue can drop its tracking state.
	defer p.eventQueue.Done(item)

	event, ok := item.(*eventData)
	if !ok {
		// Fixed typo in the log message: "evetData" -> "eventData".
		klog.Warningf("Couldn't convert work item to eventData: %v", item)
		return true
	}
	p.processEvent(event)
	return true
}

// stop terminates the measurement if it is running: it closes the informer's
// stop channel and shuts down the event queue so processEvents can exit.
func (p *podStartupLatencyMeasurement) stop() {
	if !p.isRunning {
		return
	}
	p.isRunning = false
	close(p.stopCh)
	p.eventQueue.ShutDown()
}

Expand Down Expand Up @@ -203,7 +250,8 @@ func (p *podStartupLatencyMeasurement) gatherScheduleTimes(c clientset.Interface
return nil
}

func (p *podStartupLatencyMeasurement) checkPod(_, obj interface{}) {
func (p *podStartupLatencyMeasurement) processEvent(event *eventData) {
obj, recvTime := event.obj, event.recvTime
if obj == nil {
return
}
Expand All @@ -214,7 +262,7 @@ func (p *podStartupLatencyMeasurement) checkPod(_, obj interface{}) {
if pod.Status.Phase == corev1.PodRunning {
key := createMetaNamespaceKey(pod.Namespace, pod.Name)
if _, found := p.podStartupEntries.Get(key, createPhase); !found {
ct := time.Now()
ct := recvTime
p.podStartupEntries.Set(key, watchPhase, ct)
p.podStartupEntries.Set(key, createPhase, pod.CreationTimestamp.Time)
var startTime metav1.Time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ func (w *waitForControlledPodsRunningMeasurement) updateOpResourceVersion(runtim
func (w *waitForControlledPodsRunningMeasurement) getObjectCountAndMaxVersion() (int, uint64, error) {
var desiredCount int
var maxResourceVersion uint64
klog.Infof("debug: list pods with selector: %v and %v", w.selector.LabelSelector, w.selector.FieldSelector)
objects, err := runtimeobjects.ListRuntimeObjectsForKind(
w.clusterFramework.GetDynamicClients().GetClient(),
w.gvr, w.kind, w.selector.Namespace, w.selector.LabelSelector, w.selector.FieldSelector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func NewResourceUsageGatherer(c clientset.Interface, host string, port int, prov
// Tracks kube-system pods if no valid PodList is passed in.
var err error
if pods == nil {
klog.Infof("debug: list kube-system pod option")
pods, err = c.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("listing pods error: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,19 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

measurementutil "k8s.io/kubernetes/perf-tests/clusterloader2/pkg/measurement/util"
perfutil "k8s.io/kubernetes/perf-tests/clusterloader2/pkg/util"
)

// NewInformer creates a new informer
// for given kind, namespace, fieldSelector and labelSelector.
func NewInformer(
c clientset.Interface,
kind string,
selector *measurementutil.ObjectSelector,
lw cache.ListerWatcher,
handleObj func(interface{}, interface{}),
) cache.SharedInformer {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = selector.FieldSelector
options.LabelSelector = selector.LabelSelector
}
listerWatcher := cache.NewFilteredListWatchFromClientWithMultiTenancy(c.CoreV1(), kind, selector.Namespace, optionsModifier, perfutil.GetTenant())
informer := cache.NewSharedInformer(listerWatcher, nil, 0)
informer := cache.NewSharedInformer(lw, nil, 0)
addEventHandler(informer, handleObj)

return informer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ func initPodStore(c clientset.Interface) (*PodStore, error) {

// List returns list of pods (that satisfy conditions provided to NewPodStore).
func (s *PodStore) List() []*v1.Pod {
klog.Infof("debug: podStore List")
objects := s.Store.List()
klog.Infof("debug: got %v objects", len(objects))
pods := make([]*v1.Pod, 0, len(objects))
for _, o := range objects {
pods = append(pods, o.(*v1.Pod))
Expand Down Expand Up @@ -230,6 +232,7 @@ func NewPVCStore(c clientset.Interface, selector *ObjectSelector) (*PVCStore, er

// List returns list of pvcs (that satisfy conditions provided to NewPVCStore).
func (s *PVCStore) List() []*v1.PersistentVolumeClaim {
klog.Infof("debug: pvcStore List")
objects := s.Store.List()
pvcs := make([]*v1.PersistentVolumeClaim, 0, len(objects))
for _, o := range objects {
Expand Down Expand Up @@ -266,6 +269,7 @@ func NewPVStore(c clientset.Interface, selector *ObjectSelector) (*PVStore, erro

// List returns list of pvs (that satisfy conditions provided to NewPVStore).
func (s *PVStore) List() []*v1.PersistentVolume {
klog.Infof("debug: pvStore List")
objects := s.Store.List()
pvs := make([]*v1.PersistentVolume, 0, len(objects))
for _, o := range objects {
Expand Down Expand Up @@ -302,6 +306,7 @@ func NewNodeStore(c clientset.Interface, selector *ObjectSelector) (*NodeStore,

// List returns list of nodes that satisfy conditions provided to NewNodeStore.
func (s *NodeStore) List() []*v1.Node {
klog.Infof("debug: nodeStore List")
objects := s.Store.List()
nodes := make([]*v1.Node, 0, len(objects))
for _, o := range objects {
Expand Down
6 changes: 6 additions & 0 deletions perf-tests/clusterloader2/pkg/measurement/util/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ func (o *ObjectSelector) String() string {
return CreateSelectorsString(o.Namespace, o.LabelSelector, o.FieldSelector)
}

// ApplySelectors copies this object's label and field selectors into the
// supplied ListOptions so that list/watch requests are filtered server-side.
func (o *ObjectSelector) ApplySelectors(options *metav1.ListOptions) {
	options.LabelSelector = o.LabelSelector
	options.FieldSelector = o.FieldSelector
}

// CreateSelectorsString creates a string representation for given namespace, label selector and field selector.
func CreateSelectorsString(namespace, labelSelector, fieldSelector string) string {
var selectorsStrings []string
Expand Down
16 changes: 14 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/util/diff"
"net/http"
"reflect"
"runtime/debug"
"sync"
"time"

Expand Down Expand Up @@ -599,6 +600,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero
klog.Infof("debug 1: list from storage")
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
}

Expand All @@ -613,10 +615,11 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
klog.Infof("debug 2: list from storage")
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
}

trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
trace := utiltrace.New(fmt.Sprintf("gettolist cacher %v: List", c.objectType.String()))
defer trace.LogIfLong(500 * time.Millisecond)

c.ready.wait()
Expand Down Expand Up @@ -668,6 +671,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero.
klog.Infof("debug 1: list from storage %v", c.objectType.String())
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
}

Expand All @@ -682,10 +686,12 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
klog.Infof("debug 2: list from storage")
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
}

trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
klog.Infof(fmt.Sprintf("list cacher %v: List", c.objectType.String()))
trace := utiltrace.New(fmt.Sprintf("list cacher %v: List", c.objectType.String()))
defer trace.LogIfLong(500 * time.Millisecond)

c.ready.wait()
Expand All @@ -706,6 +712,12 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
if err != nil {
return err
}

klog.Infof(fmt.Sprintf("Listed %d items from cache", len(objs)))
if len(objs) > 3000 {
klog.Infof("debug stack: \n %v", string(debug.Stack()))
}

trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
// Resize the slice appropriately, since we already know that none
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues [
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
for _, matchValue := range matchValues {
klog.Infof("debug: matchValue indexName-value: %v-%v", matchValue.IndexName, matchValue.Value)
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
return result, w.resourceVersion, nil
}
Expand All @@ -368,6 +369,7 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, matchValues [
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *utiltrace.Trace) (interface{}, bool, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
// RUnlock the read lock acquired in waitUntilFreshAndBlock().
defer w.RUnlock()
if err != nil {
return nil, false, 0, err
Expand Down