Skip to content
This repository was archived by the owner on Jan 11, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ var (
// Anthos target configuration
flGKEClusterLocation string
flGKECluster string
// TODO(gvso): Support multiple namespaces or all namespaces if none is
// specified.
flGKENamespace string

// Fully-managed target config. Empty array means all regions.
flRegions []string
Expand Down Expand Up @@ -106,7 +103,6 @@ func init() {
flag.StringVar(&flLabelSelector, "label", "rollout-strategy=gradual", "filter services based on a label (e.g. team=backend)")
flag.StringVar(&flGKEClusterLocation, "cluster-location", "", "(-platform=gke only) zone in which the cluster is located")
flag.StringVar(&flGKECluster, "cluster", "", "(-platform=gke only) ID of the cluster")
flag.StringVar(&flGKENamespace, "namespace", "default", "(-platform=gke only) Kubernetes namespace where to look for Knative services")
flag.StringVar(&flRegionsString, "regions", "", "the Cloud Run regions where the services should be looked at")
flag.Var(&flSteps, "step", "a percentage in traffic the candidate should go through")
flag.StringVar(&flStepsString, "steps", "5,20,50,80", "define steps in one flag separated by commas (e.g. 5,30,60)")
Expand Down Expand Up @@ -168,7 +164,7 @@ func main() {
if flPlatform == config.PlatformManaged {
target = config.NewManagedTarget(flProject, flRegions, flLabelSelector)
} else {
target = config.NewGKETarget(flProject, flGKEClusterLocation, flGKECluster, flGKENamespace, flLabelSelector)
target = config.NewGKETarget(flProject, flGKEClusterLocation, flGKECluster, flLabelSelector)
}
healthCriteria := healthCriteriaFromFlags(flMinRequestCount, flErrorRate, flLatencyP99, flLatencyP95, flLatencyP50)
printHealthCriteria(logger, healthCriteria)
Expand Down Expand Up @@ -232,9 +228,6 @@ func validateFlags() error {
if flGKEClusterLocation == "" {
return errors.Errorf("gke: cluster location must be specified")
}
if flGKENamespace == "" {
return errors.Errorf("gke: namespace cannot be empty")
}
return nil
}

Expand Down Expand Up @@ -264,11 +257,9 @@ func flagsToString() string {

if flPlatform == config.PlatformGKE {
str += fmt.Sprintf("-cluster-location=%s\n"+
"-cluster=%s\n"+
"-namespace=%s\n",
"-cluster=%s\n",
flGKEClusterLocation,
flGKECluster,
flGKENamespace,
)
} else {
regionsStr := "all"
Expand Down
36 changes: 20 additions & 16 deletions cmd/operator/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/sheets"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/metrics/stackdriver"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// runRollouts concurrently handles the rollout of the targeted services.
func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Strategy) []error {
svcs, err := getTargetedServices(ctx, logger, strategy.Target)
ctx = util.ContextWithLogger(ctx, logrus.NewEntry(logger))
svcs, err := getTargetedServices(ctx, strategy.Target)
if err != nil {
return []error{errors.Wrap(err, "failed to get targeted services")}
}
Expand Down Expand Up @@ -50,21 +51,15 @@ func runRollouts(ctx context.Context, logger *logrus.Logger, strategy config.Str

// handleRollout manages the rollout process for a single service.
func handleRollout(ctx context.Context, logger *logrus.Logger, service *rollout.ServiceRecord, strategy config.Strategy) error {
lg := logger.WithFields(logrus.Fields{
"project": service.Project,
"service": service.Metadata.Name,
"region": service.Region,
})
lg := logger.WithFields(service.KProvider.LoggingFields()).
WithField("namespace", service.Namespace).
WithField("service", service.Metadata.Name)

client, err := runapi.NewAPIClient(ctx, service.Region)
if err != nil {
return errors.Wrap(err, "failed to initialize Cloud Run API client")
}
metricsProvider, err := chooseMetricsProvider(ctx, lg, service.Project, service.Region, service.Metadata.Name)
metricsProvider, err := chooseMetricsProvider(ctx, lg, service.Location, service)
if err != nil {
return errors.Wrap(err, "failed to initialize metrics provider")
}
roll := rollout.New(ctx, metricsProvider, service, strategy).WithClient(client).WithLogger(lg.Logger)
roll := rollout.New(ctx, metricsProvider, service, strategy).WithLogger(lg)

changed, err := roll.Rollout()
if err != nil {
Expand Down Expand Up @@ -94,11 +89,20 @@ func rolloutErrsToString(errs []error) (errsStr string) {

// chooseMetricsProvider checks the CLI flags and determine which metrics
// provider should be used for the rollout.
func chooseMetricsProvider(ctx context.Context, logger *logrus.Entry, project, region, svcName string) (metrics.Provider, error) {
func chooseMetricsProvider(ctx context.Context, logger *logrus.Entry, location string, svc *rollout.ServiceRecord) (metrics.Provider, error) {
if flGoogleSheetsID != "" {
logger.Debug("using Google Sheets as metrics provider")
return sheets.NewProvider(ctx, flGoogleSheetsID, "", region, svcName)
return sheets.NewProvider(ctx, flGoogleSheetsID, "", location, svc.Metadata.Name)
}

logger.Debug("using Cloud Monitoring (Stackdriver) as metrics provider")
return stackdriver.NewProvider(ctx, project, region, svcName)
provider, err := stackdriver.NewProvider(ctx, flProject, location, svc.Metadata.Name)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize stackdriver provider")
}
if flPlatform == config.PlatformGKE {
logger.Debug("pointing stackdriver to Cloud Run for Anthos")
provider = provider.WithGKEPlatform(svc.Namespace, svc.Metadata.ClusterName)
}
return provider, nil
}
152 changes: 123 additions & 29 deletions cmd/operator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,36 @@ import (
"sync"

"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/config"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/knative/cloudrun"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/rollout"
runapi "github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/run"
"github.com/GoogleCloudPlatform/cloud-run-release-manager/internal/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/api/run/v1"
"k8s.io/client-go/rest"
)

// getTargetedServices returned a list of service records that match the target
// configuration.
func getTargetedServices(ctx context.Context, logger *logrus.Logger, target config.Target) ([]*rollout.ServiceRecord, error) {
func getTargetedServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) {
logger := util.LoggerFrom(ctx)
if target.Platform == config.PlatformManaged {
logger.Debug("getting services on Cloud Run fully managed")
return getManagedServices(ctx, target)
}
logger.Debug("getting services on Cloud Run for Anthos")
return getGKEServices(ctx, target)
}

// getManagedServices returned a list of services from Cloud Run fully managed
// that match the target.
//
// It fetches the matches from each of the specified regions in the target
// configuration. If no regions are explicitly specified, it gets the list of
// Cloud Run regions and queries all of them.
func getManagedServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) {
logger := util.LoggerFrom(ctx)
logger.Debug("querying Cloud Run API to get all targeted services")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -27,17 +46,94 @@ func getTargetedServices(ctx context.Context, logger *logrus.Logger, target conf
wg sync.WaitGroup
)

regions, err := determineRegions(ctx, logger, target)
regions, err := determineRegions(ctx, target)
if err != nil {
return nil, errors.Wrap(err, "cannot determine regions")
}

for _, region := range regions {
wg.Add(1)

go func(ctx context.Context, logger *logrus.Logger, region, labelSelector string) {
go func(ctx context.Context, logger *logrus.Entry, project, region, labelSelector string) {
defer wg.Done()

provider, err := cloudrun.NewFullyManagedProvider(ctx, project, region)
if err != nil {
retError = errors.Wrap(err, "failed to initialize Cloud Run fully managed client")
cancel()
return
}
svcs, err := getServicesByLabel(ctx, provider, project, labelSelector)
if err != nil {
retError = err
cancel()
return
}

for _, svc := range svcs {
mu.Lock()
retServices = append(retServices, newServiceRecord(svc, provider, project, region))
mu.Unlock()
}
}(ctx, logger, target.Project, region, target.LabelSelector)
}

wg.Wait()
return retServices, retError
}

// getGKEServices get the services running on Cloud Run for Anthos.
func getGKEServices(ctx context.Context, target config.Target) ([]*rollout.ServiceRecord, error) {
logger := util.LoggerFrom(ctx)
provider, err := cloudrun.NewGKEProvider(ctx, target.Project, target.GKEClusterLocation, target.GKEClusterName)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize Cloud Run for Anthos client")
}

logger.Debugf("querying for services on GKE cluster %s", target.GKEClusterName)
config := &rest.Config{
Transport: provider.HTTPClient.Transport,
Host: provider.Endpoint,
}
svcRecords, err := getKnativeServices(ctx, provider, config, target.LabelSelector)
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve services on GKE cluster")
}
for i := 0; i < len(svcRecords); i++ {
svcRecords[i].Location = target.GKEClusterLocation
}
return svcRecords, nil
}

// getKnativeServices gets all the namespaces in a Kubernetes cluster and query
// each of them to get services matching the label selector.
func getKnativeServices(ctx context.Context, provider knative.Provider, config *rest.Config, labelSelector string) ([]*rollout.ServiceRecord, error) {
logger := util.LoggerFrom(ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var (
retServices []*rollout.ServiceRecord
retError error
mu sync.Mutex
wg sync.WaitGroup
)

logger.Debug("querying for Kubernetes namespaces")
namespaces, err := knative.Namespaces(config)
if err != nil {
return nil, errors.Wrap(err, "failed to get namespaces")
}
logger.WithField("n", len(namespaces)).Debug("finished retrieving Kubernetes namespaces")

for _, namespace := range namespaces {
wg.Add(1)

go func(ctx context.Context, logger *logrus.Entry, namespace, labelSelector string) {
defer wg.Done()
svcs, err := getServicesByRegionAndLabel(ctx, logger, target.Project, region, target.LabelSelector)

logger.Debugf("querying for services in Kubernetes cluster, namespace %s", namespace)
svcs, err := getServicesByLabel(ctx, provider, namespace, labelSelector)
if err != nil {
retError = err
cancel()
Expand All @@ -46,56 +142,53 @@ func getTargetedServices(ctx context.Context, logger *logrus.Logger, target conf

for _, svc := range svcs {
mu.Lock()
retServices = append(retServices, newServiceRecord(svc, target.Project, region))
retServices = append(retServices, newServiceRecord(svc, provider, namespace, ""))
mu.Unlock()
}

}(ctx, logger, region, target.LabelSelector)
}(ctx, logger, namespace.GetName(), labelSelector)
}

wg.Wait()
return retServices, retError
}

// getServicesByRegionAndLabel returns all the service records that match the
// labelSelector in a specific region.
func getServicesByRegionAndLabel(ctx context.Context, logger *logrus.Logger, project, region, labelSelector string) ([]*run.Service, error) {
// getServicesByLabel returns all the service records that match the label
// selector.
//
// For Cloud Run fully managed, the namespace is the project ID.
func getServicesByLabel(ctx context.Context, provider knative.Provider, namespace, labelSelector string) ([]*run.Service, error) {
logger := util.LoggerFrom(ctx)
lg := logger.WithFields(logrus.Fields{
"region": region,
"namespace": namespace,
"labelSelector": labelSelector,
})

lg.Debug("querying Cloud Run services")
runclient, err := runapi.NewAPIClient(ctx, region)
lg.Debug("querying for services in provider")
svcs, err := provider.ListServices(namespace, labelSelector)
if err != nil {
return nil, errors.Wrap(err, "failed to initialize Cloud Run client")
return nil, errors.Wrapf(err, "failed to get services with label %q", labelSelector)
}

svcs, err := runclient.ServicesWithLabelSelector(project, labelSelector)
if err != nil {
return nil, errors.Wrapf(err, "failed to get services with label %q in region %q", labelSelector, region)
}

lg.WithField("n", len(svcs)).Debug("finished retrieving services from the API")
lg.WithField("n", len(svcs)).Debug("finished retrieving services")
return svcs, nil
}

// determineRegions gets the regions the label selector should be searched at.
// Used for Cloud Run fuly managed.
//
// If the target configuration does not specify any regions, the entire list of
// regions is retrieved from API.
func determineRegions(ctx context.Context, logger *logrus.Logger, target config.Target) ([]string, error) {
func determineRegions(ctx context.Context, target config.Target) ([]string, error) {
logger := util.LoggerFrom(ctx)
regions := target.Regions
if len(regions) != 0 {
logger.Debug("using predefined list of regions, skip querying from API")
return regions, nil
}

logger.Debug("retrieving all regions from the API")

lg := logrus.NewEntry(logger)
ctx = util.ContextWithLogger(ctx, lg)
regions, err := runapi.Regions(ctx, target.Project)
regions, err := cloudrun.FullyManagedRegions(ctx, target.Project)
if err != nil {
return nil, errors.Wrap(err, "cannot get list of regions from Cloud Run API")
}
Expand All @@ -105,10 +198,11 @@ func determineRegions(ctx context.Context, logger *logrus.Logger, target config.
}

// newServiceRecord creates a new service record.
func newServiceRecord(svc *run.Service, project, region string) *rollout.ServiceRecord {
func newServiceRecord(svc *run.Service, provider knative.Provider, namespace, location string) *rollout.ServiceRecord {
return &rollout.ServiceRecord{
Service: svc,
Project: project,
Region: region,
Service: svc,
KProvider: provider,
Namespace: namespace,
Location: location,
}
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@ require (
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.6.1
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
google.golang.org/api v0.28.0
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v12.0.0+incompatible
)
Loading