From b41b9e48f58990e3167e131a3111f481b4d4323f Mon Sep 17 00:00:00 2001 From: John Sanda Date: Tue, 25 Aug 2020 16:46:45 -0400 Subject: [PATCH 01/11] initial support for using C* authentication with reaper Before Reaper can successfully deploy, Cassandra roles have to be created and the reaper keyspace needs to be initialized. When the statefulset is created, the reaper container is not deployed initially anymore. When the schema init job completes, the new ReaperStatus.SchemaInitialized field is set to true. On the subsequent reconciliation loop the statefulset(s) are updated to deploy the reaper container. --- .../templates/customresourcedefinition.yaml | 5 ++ ...datastax.com_cassandradatacenters_crd.yaml | 5 ++ .../v1beta1/cassandradatacenter_types.go | 36 ++++++++- .../v1beta1/zz_generated.deepcopy.go | 17 ++++ operator/pkg/reconciliation/constructor.go | 77 ++++++++++++++----- .../pkg/reconciliation/reconcile_racks.go | 31 +++++++- .../pkg/reconciliation/reconcile_reaper.go | 77 +++++++++++++++---- operator/pkg/reconciliation/secrets.go | 77 +++++++++++++++++++ 8 files changed, 283 insertions(+), 42 deletions(-) diff --git a/charts/cass-operator-chart/templates/customresourcedefinition.yaml b/charts/cass-operator-chart/templates/customresourcedefinition.yaml index f14e4cb86..2147454f2 100644 --- a/charts/cass-operator-chart/templates/customresourcedefinition.yaml +++ b/charts/cass-operator-chart/templates/customresourcedefinition.yaml @@ -6280,6 +6280,11 @@ spec: quietPeriod: format: date-time type: string + reaperStatus: + properties: + schemaInitialized: + type: boolean + type: object superUserUpserted: description: Deprecated. Use usersUpserted instead. The timestamp at which CQL superuser credentials were last upserted to the management diff --git a/operator/deploy/crds/cassandra.datastax.com_cassandradatacenters_crd.yaml b/operator/deploy/crds/cassandra.datastax.com_cassandradatacenters_crd.yaml index fd6c24ae0..a3149e56f 100644 --- a/operator/deploy/crds/cassandra.datastax.com_cassandradatacenters_crd.yaml +++ b/operator/deploy/crds/cassandra.datastax.com_cassandradatacenters_crd.yaml @@ -6270,6 +6270,11 @@ spec: quietPeriod: format: date-time type: string + reaperStatus: + properties: + schemaInitialized: + type: boolean + type: object superUserUpserted: description: Deprecated. Use usersUpserted instead. The timestamp at which CQL superuser credentials were last upserted to the management diff --git a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go index 70933e432..21ca33314 100644 --- a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go +++ b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go @@ -6,13 +6,13 @@ package v1beta1 import ( "encoding/json" "fmt" - "os" - "github.com/Jeffail/gabs" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "os" + "strings" "github.com/datastax/cass-operator/operator/pkg/serverconfig" "github.com/datastax/cass-operator/operator/pkg/utils" @@ -337,6 +337,10 @@ func NewDatacenterCondition(conditionType DatacenterConditionType, status corev1 } } +type ReaperStatus struct { + SchemaInitialized bool `json:"schemaInitialized,omitempty"` +} + // CassandraDatacenterStatus defines the observed state of CassandraDatacenter // +k8s:openapi-gen=true type CassandraDatacenterStatus struct { @@ -376,6 +380,8 @@ type CassandraDatacenterStatus struct { // +optional ObservedGeneration int64 `json:"observedGeneration,omitempty"` + + ReaperStatus ReaperStatus `json:"reaperStatus,omitempty"` } // +genclient @@ -577,6 +583,32 @@ func (dc *CassandraDatacenter) GetSuperuserSecretNamespacedName() types.Namespac } } +func (dc *CassandraDatacenter) GetReaperUserSecretNamespacedName() types.NamespacedName { + name := dc.Spec.ClusterName + "-reaper" + namespace := dc.Namespace + + return types.NamespacedName{Name: name, Namespace: namespace} +} + +func (dc *CassandraDatacenter) IsReaperEnabled() bool { + return dc.Spec.Reaper != nil && dc.Spec.Reaper.Enabled && dc.Spec.ServerType == "cassandra" +} + +func (dc *CassandraDatacenter) DeployReaper() bool { + return dc.IsReaperEnabled() && dc.Status.ReaperStatus.SchemaInitialized +} + +func (dc *CassandraDatacenter) IsAuthenticationEnabled() (bool, error) { + b, err := dc.Spec.Config.MarshalJSON() + if err != nil { + return false, err + } + + s := string(b) + + return strings.Contains(s, "PasswordAuthenticator"), nil +} + // GetConfigAsJSON gets a JSON-encoded string suitable for passing to configBuilder func (dc *CassandraDatacenter) GetConfigAsJSON() (string, error) { diff --git a/operator/pkg/apis/cassandra/v1beta1/zz_generated.deepcopy.go b/operator/pkg/apis/cassandra/v1beta1/zz_generated.deepcopy.go index b986d628a..b89eaeb55 100644 --- a/operator/pkg/apis/cassandra/v1beta1/zz_generated.deepcopy.go +++ b/operator/pkg/apis/cassandra/v1beta1/zz_generated.deepcopy.go @@ -177,6 +177,7 @@ func (in *CassandraDatacenterStatus) DeepCopyInto(out *CassandraDatacenterStatus copy(*out, *in) } in.QuietPeriod.DeepCopyInto(&out.QuietPeriod) + out.ReaperStatus = in.ReaperStatus return } @@ -405,6 +406,22 @@ func (in *ReaperConfig) DeepCopy() *ReaperConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReaperStatus) DeepCopyInto(out *ReaperStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReaperStatus. +func (in *ReaperStatus) DeepCopy() *ReaperStatus { + if in == nil { + return nil + } + out := new(ReaperStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StorageConfig) DeepCopyInto(out *StorageConfig) { *out = *in diff --git a/operator/pkg/reconciliation/constructor.go b/operator/pkg/reconciliation/constructor.go index 40084dc8f..015206d6b 100644 --- a/operator/pkg/reconciliation/constructor.go +++ b/operator/pkg/reconciliation/constructor.go @@ -554,9 +554,12 @@ func buildContainers(dc *api.CassandraDatacenter, serverVolumeMounts []corev1.Vo loggerContainer.Resources = *getResourcesOrDefault(&dc.Spec.SystemLoggerResources, &DefaultsLoggerContainer) containers := []corev1.Container{cassContainer, loggerContainer} - if dc.Spec.Reaper != nil && dc.Spec.Reaper.Enabled && dc.Spec.ServerType == "cassandra" { - reaperContainer := buildReaperContainer(dc) - containers = append(containers, reaperContainer) + if dc.DeployReaper() { + reaperContainer, err := buildReaperContainer(dc) + if err != nil { + return containers, err + } + containers = append(containers, *reaperContainer) } return containers, nil @@ -658,7 +661,53 @@ func buildPodTemplateSpec(dc *api.CassandraDatacenter, zone string, rackName str return baseTemplate, nil } -func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) *v1.Job { +func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) (*v1.Job, error) { + envVars := []corev1.EnvVar{ + { + Name: "KEYSPACE", + Value: ReaperKeyspace, + }, + { + Name: "CONTACT_POINTS", + Value: dc.GetSeedServiceName(), + }, + { + Name: "REPLICATION", + Value: getReaperReplication(dc), + }, + } + + authEnabled, err := dc.IsAuthenticationEnabled() + if err != nil { + return nil, err + } + + if authEnabled { + secretName := dc.GetReaperUserSecretNamespacedName() + envVars = append(envVars, corev1.EnvVar{ + Name: "USERNAME", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "username", + }, + }, + }) + envVars = append(envVars, corev1.EnvVar{ + Name: "PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "password", + }, + }, + }) + } + return &v1.Job{ TypeMeta: metav1.TypeMeta{ Kind: "Job", @@ -677,27 +726,15 @@ func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) *v1.Job { { Name: getReaperSchemaInitJobName(dc), Image: ReaperSchemaInitJobImage, - ImagePullPolicy: corev1.PullIfNotPresent, - Env: []corev1.EnvVar{ - { - Name: "KEYSPACE", - Value: ReaperKeyspace, - }, - { - Name: "CONTACT_POINTS", - Value: dc.GetSeedServiceName(), - }, - { - Name: "REPLICATION", - Value: getReaperReplication(dc), - }, - }, + //ImagePullPolicy: corev1.PullIfNotPresent, + ImagePullPolicy: corev1.PullAlways, + Env: envVars, }, }, }, }, }, - } + }, nil } func addVolumes(dc *api.CassandraDatacenter, baseTemplate *corev1.PodTemplateSpec) []corev1.Volume { diff --git a/operator/pkg/reconciliation/reconcile_racks.go b/operator/pkg/reconciliation/reconcile_racks.go index 4b376a5bd..28b0695ac 100644 --- a/operator/pkg/reconciliation/reconcile_racks.go +++ b/operator/pkg/reconciliation/reconcile_racks.go @@ -111,6 +111,18 @@ func (rc *ReconciliationContext) CheckSuperuserSecretCreation() result.Reconcile return result.Continue() } +func (rc *ReconciliationContext) CheckReaperUserSecretCreation() result.ReconcileResult { + rc.ReqLogger.Info("reconcile_racks::CheckReaperUserSecretCreation") + + _, err := rc.retrieveReaperUserSecretOrCreate() + if err != nil { + rc.ReqLogger.Error(err, "error retrieving Reaper user secret for CassandraDatacenter.") + return result.Error(err) + } + + return result.Continue() +} + func (rc *ReconciliationContext) CheckInternodeCredentialCreation() result.ReconcileResult { rc.ReqLogger.Info("reconcile_racks::CheckInternodeCredentialCreation") @@ -759,6 +771,13 @@ func (rc *ReconciliationContext) GetUsers() []api.CassandraUser { SecretName: dc.GetSuperuserSecretNamespacedName().Name, }) + if dc.IsReaperEnabled() { + users = append(users, api.CassandraUser{ + Superuser: true, + SecretName: dc.GetReaperUserSecretNamespacedName().Name, + }) + } + return users } @@ -2069,6 +2088,10 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) { return recResult.Output() } + if recResult := rc.CheckReaperUserSecretCreation(); recResult.Completed() { + return recResult.Output() + } + if recResult := rc.CheckInternodeCredentialCreation(); recResult.Completed() { return recResult.Output() } @@ -2105,10 +2128,6 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) { return recResult.Output() } - if recResult := rc.CheckReaperSchemaInitialized(); recResult.Completed() { - return recResult.Output() - } - if recResult := rc.CheckRollingRestart(); recResult.Completed() { return recResult.Output() } @@ -2129,6 +2148,10 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) { return recResult.Output() } + if recResult := rc.CheckReaperSchemaInitialized(); recResult.Completed() { + return recResult.Output() + } + if recResult := rc.CheckClearActionConditions(); recResult.Completed() { return recResult.Output() } diff --git a/operator/pkg/reconciliation/reconcile_reaper.go b/operator/pkg/reconciliation/reconcile_reaper.go index b78cb90ca..a671443cb 100644 --- a/operator/pkg/reconciliation/reconcile_reaper.go +++ b/operator/pkg/reconciliation/reconcile_reaper.go @@ -14,6 +14,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "math" + "sigs.k8s.io/controller-runtime/pkg/client" "strconv" ) @@ -30,12 +31,57 @@ const ( ReaperSchemaInitJobImage = "jsanda/reaper-init-keyspace:latest" ) -func buildReaperContainer(dc *api.CassandraDatacenter) corev1.Container { +func buildReaperContainer(dc *api.CassandraDatacenter) (*corev1.Container, error) { ports := []corev1.ContainerPort{ {Name: "ui", ContainerPort: ReaperUIPort, Protocol: "TCP"}, {Name: "admin", ContainerPort: ReaperAdminPort, Protocol: "TCP"}, } + envVars := []corev1.EnvVar{ + {Name: "REAPER_STORAGE_TYPE", Value: "cassandra"}, + {Name: "REAPER_ENABLE_DYNAMIC_SEED_LIST", Value: "false"}, + {Name: "REAPER_DATACENTER_AVAILABILITY", Value: "SIDECAR"}, + {Name: "REAPER_SERVER_APP_PORT", Value: strconv.Itoa(ReaperUIPort)}, + {Name: "REAPER_SERVER_ADMIN_PORT", Value: strconv.Itoa(ReaperAdminPort)}, + {Name: "REAPER_CASS_CLUSTER_NAME", Value: dc.ClusterName}, + {Name: "REAPER_CASS_CONTACT_POINTS", Value: fmt.Sprintf("[%s]", dc.GetSeedServiceName())}, + {Name: "REAPER_AUTH_ENABLED", Value: "false"}, + {Name: "REAPER_JMX_AUTH_USERNAME", Value: ""}, + {Name: "REAPER_JMX_AUTH_PASSWORD", Value: ""}, + } + + cassandraAuthEnabled, err := dc.IsAuthenticationEnabled() + if err != nil { + return nil, err + } + + if cassandraAuthEnabled { + secretName := dc.GetReaperUserSecretNamespacedName() + envVars = append(envVars, corev1.EnvVar{Name: "REAPER_CASS_AUTH_ENABLED", Value: "true"}) + envVars = append(envVars, corev1.EnvVar{ + Name: "REAPER_CASS_AUTH_USERNAME", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "username", + }, + }, + }) + envVars = append(envVars, corev1.EnvVar{ + Name: "REAPER_CASS_AUTH_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "password", + }, + }, + }) + } + container := corev1.Container{ Name: ReaperContainerName, Image: getReaperImage(dc), @@ -43,22 +89,11 @@ func buildReaperContainer(dc *api.CassandraDatacenter) corev1.Container { Ports: ports, LivenessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, int(60 * dc.Spec.Size), 10), ReadinessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, 30, 15), - Env: []corev1.EnvVar{ - {Name: "REAPER_STORAGE_TYPE", Value: "cassandra"}, - {Name: "REAPER_ENABLE_DYNAMIC_SEED_LIST", Value: "false"}, - {Name: "REAPER_DATACENTER_AVAILABILITY", Value: "SIDECAR"}, - {Name: "REAPER_SERVER_APP_PORT", Value: strconv.Itoa(ReaperUIPort)}, - {Name: "REAPER_SERVER_ADMIN_PORT", Value: strconv.Itoa(ReaperAdminPort)}, - {Name: "REAPER_CASS_CLUSTER_NAME", Value: dc.ClusterName}, - {Name: "REAPER_CASS_CONTACT_POINTS", Value: fmt.Sprintf("[%s]", dc.GetSeedServiceName())}, - {Name: "REAPER_AUTH_ENABLED", Value: "false"}, - {Name: "REAPER_JMX_AUTH_USERNAME", Value: ""}, - {Name: "REAPER_JMX_AUTH_PASSWORD", Value: ""}, - }, + Env: envVars, Resources: *getResourcesOrDefault(&dc.Spec.Reaper.Resources, &DefaultsReaperContainer), } - return container + return &container, nil } func getReaperImage(dc *api.CassandraDatacenter) string { @@ -81,7 +116,7 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized() result.Reconcile rc.ReqLogger.Info("reconcile_reaper::CheckReaperSchemaInitialized") - if rc.Datacenter.Spec.Reaper == nil || !rc.Datacenter.Spec.Reaper.Enabled { + if !rc.Datacenter.IsReaperEnabled() { return result.Continue() } @@ -91,7 +126,11 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized() result.Reconcile err := rc.Client.Get(rc.Ctx, types.NamespacedName{Namespace: rc.Datacenter.Namespace, Name: jobName}, schemaJob) if err != nil && errors.IsNotFound(err) { // Create the job - schemaJob := buildInitReaperSchemaJob(rc.Datacenter) + schemaJob, err := buildInitReaperSchemaJob(rc.Datacenter) + if err != nil { + rc.ReqLogger.Error(err, "failed to create Reaper schema init job") + return result.Error(err) + } rc.ReqLogger.Info("creating Reaper schema init job", ReaperSchemaInitJob, schemaJob.Name) if err := setControllerReference(rc.Datacenter, schemaJob, rc.Scheme); err != nil { rc.ReqLogger.Error(err, "failed to set owner reference", ReaperSchemaInitJob, schemaJob.Name) @@ -106,6 +145,12 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized() result.Reconcile } else if err != nil { return result.Error(err) } else if jobFinished(schemaJob) { + patch := client.MergeFrom(rc.Datacenter.DeepCopy()) + rc.Datacenter.Status.ReaperStatus.SchemaInitialized = true + if err = rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil { + rc.ReqLogger.Error(err, "error updating the reaper status") + return result.Error(err) + } return result.Continue() } else { return result.RequeueSoon(2) diff --git a/operator/pkg/reconciliation/secrets.go b/operator/pkg/reconciliation/secrets.go index 905a023fe..fd2cc461b 100644 --- a/operator/pkg/reconciliation/secrets.go +++ b/operator/pkg/reconciliation/secrets.go @@ -50,6 +50,12 @@ func generateUtf8Password() (string, error) { } func buildDefaultSuperuserSecret(dc *api.CassandraDatacenter) (*corev1.Secret, error) { + //if dc.ShouldGenerateSuperuserSecret() { + // return buildUserSecret(dc.GetSuperuserSecretNamespacedName(), dc.Spec.ClusterName + "-superuser") + // + //} + // + //return nil, nil var secret *corev1.Secret = nil if dc.ShouldGenerateSuperuserSecret() { @@ -79,6 +85,39 @@ func buildDefaultSuperuserSecret(dc *api.CassandraDatacenter) (*corev1.Secret, e return secret, nil } +func buildReaperUserSecret(dc *api.CassandraDatacenter) (*corev1.Secret, error) { + if dc.IsReaperEnabled() { + return buildUserSecret(dc.GetReaperUserSecretNamespacedName(), dc.Spec.ClusterName + "-reaper") + } + + return nil, nil +} + +func buildUserSecret(secretNamespacedName types.NamespacedName, username string) (*corev1.Secret, error) { + secret := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: secretNamespacedName.Namespace, + Name: secretNamespacedName.Name, + }, + } + + password, err := generateUtf8Password() + if err != nil { + return nil, fmt.Errorf("Failed to generate password for user %s: %w", username, err) + } + + secret.Data = map[string][]byte{ + "username": []byte(username), + "password": []byte(password), + } + + return secret, nil +} + func (rc *ReconciliationContext) retrieveSecret(secretNamespacedName types.NamespacedName) (*corev1.Secret, error) { secret := &corev1.Secret{ TypeMeta: metav1.TypeMeta{ @@ -109,6 +148,12 @@ func (rc *ReconciliationContext) retrieveSuperuserSecret() (*corev1.Secret, erro return rc.retrieveSecret(secretNamespacedName) } +func (rc *ReconciliationContext) retrieveReaperUserSecret() (*corev1.Secret, error) { + dc := rc.Datacenter + secretNamespacedName := dc.GetReaperUserSecretNamespacedName() + return rc.retrieveSecret(secretNamespacedName) +} + func (rc *ReconciliationContext) retrieveSuperuserSecretOrCreateDefault() (*corev1.Secret, error) { dc := rc.Datacenter @@ -136,6 +181,38 @@ func (rc *ReconciliationContext) retrieveSuperuserSecretOrCreateDefault() (*core return secret, nil } +func (rc *ReconciliationContext) retrieveReaperUserSecretOrCreate() (*corev1.Secret, error) { + dc := rc.Datacenter + + // TODO if we go from enabled to disabled, should the secret get deleted? + if !dc.IsReaperEnabled() { + return nil, nil + } + + secret, retrieveErr := rc.retrieveReaperUserSecret() + if retrieveErr != nil { + if errors.IsNotFound(retrieveErr) { + secret, err := buildReaperUserSecret(dc) + + if err == nil && secret == nil { + return nil, retrieveErr + } + + if err == nil { + err = rc.Client.Create(rc.Ctx, secret) + } + + if err != nil { + return nil, fmt.Errorf("Failed to create reaper user secret: %w", err) + } + } + } else { + return nil, retrieveErr + } + + return secret, nil +} + func (rc *ReconciliationContext) createInternodeCACredential() (*corev1.Secret, error) { secret := &corev1.Secret{ TypeMeta: metav1.TypeMeta{ From 17b2474689408105a1c08488f20a4a5bf58e14be Mon Sep 17 00:00:00 2001 From: John Sanda Date: Tue, 25 Aug 2020 18:00:47 -0400 Subject: [PATCH 02/11] update tests --- .../v1beta1/cassandradatacenter_types_test.go | 56 +++++++++++++++++++ .../pkg/reconciliation/constructor_test.go | 10 ++++ .../reconciliation/reconcile_reaper_test.go | 16 +++++- 3 files changed, 81 insertions(+), 1 deletion(-) diff --git a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types_test.go b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types_test.go index 71bffd818..637c87c7f 100644 --- a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types_test.go +++ b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types_test.go @@ -211,6 +211,62 @@ func Test_GenerateBaseConfigString(t *testing.T) { } } +func TestCassandraDataCenter_IsAuthenticationEnabled(t *testing.T) { + tests := []struct { + name string + dc *CassandraDatacenter + want bool + errString string + }{ + { + name: "auth enabled", + dc: &CassandraDatacenter{ + ObjectMeta: metav1.ObjectMeta{ + Name: "exampleDC", + }, + Spec: CassandraDatacenterSpec{ + ClusterName: "exampleCluster", + Config: []byte("{\"cassandra-yaml\":{\"authenticator\":\"PasswordAuthenticator\"}}"), + }, + }, + want: true, + errString: "", + }, + { + name: "auth disabled", + dc: &CassandraDatacenter{ + ObjectMeta: metav1.ObjectMeta{ + Name: "exampleDC", + }, + Spec: CassandraDatacenterSpec{ + ClusterName: "exampleCluster", + Config: []byte("{\"cassandra-yaml\":{\"authenticator\":\"AllowAuthenticator\"}}"), + }, + }, + want: false, + errString: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.dc.IsAuthenticationEnabled() + if got != tt.want { + t.Errorf("IsAuthenticationEnabled() got = %v, want %v", got, tt.want) + } + if err == nil { + if tt.errString != "" { + t.Errorf("IsAuthenticationEnabled() err = %v, want %v", err, tt.errString) + } + } else { + if err.Error() != tt.errString { + t.Errorf("IsAuthenticationEnabled() err = %v, want %v", err, tt.errString) + } + } + }) + } +} + func TestCassandraDatacenter_GetContainerPorts(t *testing.T) { type fields struct { TypeMeta metav1.TypeMeta diff --git a/operator/pkg/reconciliation/constructor_test.go b/operator/pkg/reconciliation/constructor_test.go index 11d35ff06..4e45139a5 100644 --- a/operator/pkg/reconciliation/constructor_test.go +++ b/operator/pkg/reconciliation/constructor_test.go @@ -221,6 +221,11 @@ func TestCassandraDatacenter_buildContainers_reaper_resources(t *testing.T) { }, }, }, + Status: api.CassandraDatacenterStatus{ + ReaperStatus: api.ReaperStatus{ + SchemaInitialized: true, + }, + }, } containers, err := buildContainers(dc, []corev1.VolumeMount{}) @@ -242,6 +247,11 @@ func TestCassandraDatacenter_buildContainers_reaper_resources_set_when_not_speci Enabled: true, }, }, + Status: api.CassandraDatacenterStatus{ + ReaperStatus: api.ReaperStatus{ + SchemaInitialized: true, + }, + }, } containers, err := buildContainers(dc, []corev1.VolumeMount{}) diff --git a/operator/pkg/reconciliation/reconcile_reaper_test.go b/operator/pkg/reconciliation/reconcile_reaper_test.go index 6cd3e0cc6..d90688cb4 100644 --- a/operator/pkg/reconciliation/reconcile_reaper_test.go +++ b/operator/pkg/reconciliation/reconcile_reaper_test.go @@ -16,7 +16,9 @@ import ( func TestReconcileReaper_buildInitReaperSchemaJob(t *testing.T) { dc := newCassandraDatacenter() - job := buildInitReaperSchemaJob(dc) + job, err := buildInitReaperSchemaJob(dc) + + assert.NoError(t, err) assert.Equal(t, getReaperSchemaInitJobName(dc), job.Name) assert.Equal(t, dc.GetDatacenterLabels(), job.Labels) @@ -49,6 +51,7 @@ func TestReconcileReaper_newReaperService(t *testing.T) { func TestReconcileReaper_CheckReaperSchemaInitialized(t *testing.T) { rc, _, cleanupMockScr := setupTest() + rc.Datacenter.Spec.ServerType = "cassandra" rc.Datacenter.Spec.Reaper = &api.ReaperConfig{Enabled: true} defer cleanupMockScr() @@ -69,6 +72,17 @@ func TestReconcileReaper_CheckReaperSchemaInitialized(t *testing.T) { err = rc.Client.Get(rc.Ctx, types.NamespacedName{Namespace: rc.Datacenter.Namespace, Name: jobName}, job) assert.NoErrorf(t, err, "failed to get job %s", jobName) + + job.Status.Conditions = append(job.Status.Conditions, v1batch.JobCondition{ + Type: v1batch.JobComplete, + Status: corev1.ConditionTrue, + }) + + err = rc.Client.Status().Update(rc.Ctx, job) + assert.NoError(t, err) + + reconcileResult = rc.CheckReaperSchemaInitialized() + assert.False(t, reconcileResult.Completed()) } func TestReconcileReaper_CheckReaperSchemaNotInitialized(t *testing.T) { From 211c7de0f6f8f80cf31ed62977cd2e163d76184a Mon Sep 17 00:00:00 2001 From: John Sanda Date: Tue, 25 Aug 2020 18:19:01 -0400 Subject: [PATCH 03/11] update and tests --- .../pkg/reconciliation/constructor_test.go | 22 +++++++++++++ operator/pkg/reconciliation/secrets_test.go | 32 +++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/operator/pkg/reconciliation/constructor_test.go b/operator/pkg/reconciliation/constructor_test.go index 4e45139a5..02d2775bb 100644 --- a/operator/pkg/reconciliation/constructor_test.go +++ b/operator/pkg/reconciliation/constructor_test.go @@ -264,6 +264,28 @@ func TestCassandraDatacenter_buildContainers_reaper_resources_set_when_not_speci } } +func TestCassandraDatacenter_buildContainers_no_reaper(t *testing.T) { + dc := &api.CassandraDatacenter{ + Spec: api.CassandraDatacenterSpec{ + ClusterName: "bob", + ServerType: "cassandra", + ServerVersion: "3.11.6", + Reaper: &api.ReaperConfig{ + Enabled: true, + }, + }, + } + + containers, err := buildContainers(dc, []corev1.VolumeMount{}) + assert.NotNil(t, containers, "Unexpected containers containers received") + assert.Nil(t, err, "Unexpected error encountered") + + assert.Len(t, containers, 2, "Unexpected number of containers containers returned") + for _, container := range containers { + assert.NotEqual(t, container.Name, ReaperContainerName) + } +} + func TestCassandraDatacenter_buildPodTemplateSpec_containers_merge(t *testing.T) { testContainer := corev1.Container{} testContainer.Name = "test-container" diff --git a/operator/pkg/reconciliation/secrets_test.go b/operator/pkg/reconciliation/secrets_test.go index e624eb2a7..a5d1ac627 100644 --- a/operator/pkg/reconciliation/secrets_test.go +++ b/operator/pkg/reconciliation/secrets_test.go @@ -70,6 +70,38 @@ func Test_buildDefaultSuperuserSecret(t *testing.T) { }) } +func Test_buildReaperUserSecret(t *testing.T) { + t.Run("test reaper user secret is created", func(t *testing.T) { + dc := &api.CassandraDatacenter{ + ObjectMeta: metav1.ObjectMeta{ + Name: "exampleDC", + Namespace: "examplens", + }, + Spec: api.CassandraDatacenterSpec{ + ClusterName: "exampleCluster", + ServerType: "cassandra", + Reaper: &api.ReaperConfig{ + Enabled: true, + }, + }, + } + secret, err := buildReaperUserSecret(dc) + if err != nil { + t.Errorf("should not have returned an error %w", err) + return + } + + if secret.ObjectMeta.Namespace != dc.ObjectMeta.Namespace { + t.Errorf("expected secret in namespace '%s' but was '%s", dc.ObjectMeta.Namespace, secret.ObjectMeta.Namespace) + } + + expectedSecretName := fmt.Sprintf("%s-reaper", dc.Spec.ClusterName) + if secret.ObjectMeta.Name != expectedSecretName { + t.Errorf("expected default secret name '%s' but was '%s'", expectedSecretName, secret.ObjectMeta.Name) + } + }) +} + func Test_validateCassandraUserSecretContent(t *testing.T) { var ( name = "datacenter-example" From e3e45f6b144ac742348e9b85b1afecbe1012aa25 Mon Sep 17 00:00:00 2001 From: John Sanda Date: Tue, 25 Aug 2020 18:25:27 -0400 Subject: [PATCH 04/11] requeue with delay after schema job finishes to allow schema changes to propagate --- operator/pkg/reconciliation/reconcile_reaper.go | 5 ++++- operator/pkg/reconciliation/reconcile_reaper_test.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/operator/pkg/reconciliation/reconcile_reaper.go b/operator/pkg/reconciliation/reconcile_reaper.go index a671443cb..ef115f85a 100644 --- a/operator/pkg/reconciliation/reconcile_reaper.go +++ b/operator/pkg/reconciliation/reconcile_reaper.go @@ -151,7 +151,10 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized() result.Reconcile rc.ReqLogger.Error(err, "error updating the reaper status") return result.Error(err) } - return result.Continue() + // Requeue with a delay to give a chance for C* schema changes to propagate + // + // TODO Should the delay be adjusted based on the C* cluster size? + return result.RequeueSoon(5) } else { return result.RequeueSoon(2) } diff --git a/operator/pkg/reconciliation/reconcile_reaper_test.go b/operator/pkg/reconciliation/reconcile_reaper_test.go index d90688cb4..3a4e22621 100644 --- a/operator/pkg/reconciliation/reconcile_reaper_test.go +++ b/operator/pkg/reconciliation/reconcile_reaper_test.go @@ -82,7 +82,7 @@ func TestReconcileReaper_CheckReaperSchemaInitialized(t *testing.T) { assert.NoError(t, err) reconcileResult = rc.CheckReaperSchemaInitialized() - assert.False(t, reconcileResult.Completed()) + assert.True(t, reconcileResult.Completed()) } func TestReconcileReaper_CheckReaperSchemaNotInitialized(t *testing.T) { From 3a3c2b7f6f49ae32758ffce083dfb24e61e66141 Mon Sep 17 00:00:00 2001 From: John Sanda Date: Tue, 25 Aug 2020 23:40:07 -0400 Subject: [PATCH 05/11] update integration test to use C* cluster with auth enabled --- .../enable_reaper/enable_reaper_suite_test.go | 2 +- .../testdata/oss-three-node-with-auth-dc.yaml | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 tests/testdata/oss-three-node-with-auth-dc.yaml diff --git a/tests/enable_reaper/enable_reaper_suite_test.go b/tests/enable_reaper/enable_reaper_suite_test.go index 23f2f522b..ff67c410b 100644 --- a/tests/enable_reaper/enable_reaper_suite_test.go +++ b/tests/enable_reaper/enable_reaper_suite_test.go @@ -20,7 +20,7 @@ var ( testName = "Enable Reaper" namespace = "test-enable-reaper" dcName = "dc1" - dcYaml = "../testdata/oss-three-rack-three-node-dc.yaml" + dcYaml = "../testdata/oss-three-node-with-auth-dc.yaml" operatorYaml = "../testdata/operator.yaml" dcResource = fmt.Sprintf("CassandraDatacenter/%s", dcName) dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName) diff --git a/tests/testdata/oss-three-node-with-auth-dc.yaml b/tests/testdata/oss-three-node-with-auth-dc.yaml new file mode 100644 index 000000000..2ac34f6f6 --- /dev/null +++ b/tests/testdata/oss-three-node-with-auth-dc.yaml @@ -0,0 +1,32 @@ +apiVersion: cassandra.datastax.com/v1beta1 +kind: CassandraDatacenter +metadata: + name: dc1 +spec: + clusterName: cluster1 + serverType: cassandra + serverVersion: "3.11.6" + serverImage: datastax/cassandra-mgmtapi-3_11_6:v0.1.5 + managementApiAuth: + insecure: {} + size: 3 + storageConfig: + cassandraDataVolumeClaimSpec: + storageClassName: server-storage + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi + racks: + - name: r1 + - name: r2 + - name: r3 + config: + cassandra-yaml: + authenticator: org.apache.cassandra.auth.PasswordAuthenticator + authorizer: org.apache.cassandra.auth.CassandraAuthorizer + role_manager: org.apache.cassandra.auth.CassandraRoleManager + jvm-options: + initial_heap_size: "800m" + max_heap_size: "800m" From 68d0bc427df9857492633cf8dff985d77790c6a7 Mon Sep 17 00:00:00 2001 From: John Sanda Date: Mon, 31 Aug 2020 18:28:51 -0400 Subject: [PATCH 06/11] remove auth check and check for schema agreement after reaper schema job finishes --- .../v1beta1/cassandradatacenter_types.go | 17 +--- .../v1beta1/cassandradatacenter_types_test.go | 56 -------------- operator/pkg/httphelper/client.go | 1 + operator/pkg/httphelper/client_test.go | 1 + operator/pkg/reconciliation/constructor.go | 45 +++++------ .../pkg/reconciliation/reconcile_racks.go | 2 +- .../pkg/reconciliation/reconcile_reaper.go | 77 ++++++++++--------- .../reconciliation/reconcile_reaper_test.go | 55 ++++++++++++- 8 files changed, 115 insertions(+), 139 deletions(-) diff --git a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go index 21ca33314..4eaeb4c59 100644 --- a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go +++ b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go @@ -7,15 +7,13 @@ import ( "encoding/json" "fmt" "github.com/Jeffail/gabs" + "github.com/datastax/cass-operator/operator/pkg/serverconfig" + "github.com/datastax/cass-operator/operator/pkg/utils" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "os" - "strings" - - "github.com/datastax/cass-operator/operator/pkg/serverconfig" - "github.com/datastax/cass-operator/operator/pkg/utils" ) const ( @@ -598,17 +596,6 @@ func (dc *CassandraDatacenter) DeployReaper() bool { return dc.IsReaperEnabled() && dc.Status.ReaperStatus.SchemaInitialized } -func (dc *CassandraDatacenter) IsAuthenticationEnabled() (bool, error) { - b, err := dc.Spec.Config.MarshalJSON() - if err != nil { - return false, err - } - - s := string(b) - - return strings.Contains(s, "PasswordAuthenticator"), nil -} - // GetConfigAsJSON gets a JSON-encoded string suitable for passing to configBuilder func (dc *CassandraDatacenter) GetConfigAsJSON() (string, error) { diff --git a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types_test.go b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types_test.go index 637c87c7f..71bffd818 100644 --- a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types_test.go +++ b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types_test.go @@ -211,62 +211,6 @@ func Test_GenerateBaseConfigString(t *testing.T) { } } -func TestCassandraDataCenter_IsAuthenticationEnabled(t *testing.T) { - tests := []struct { - name string - dc *CassandraDatacenter - want bool - errString string - }{ - { - name: "auth enabled", - dc: &CassandraDatacenter{ - ObjectMeta: metav1.ObjectMeta{ - Name: "exampleDC", - }, - Spec: CassandraDatacenterSpec{ - ClusterName: "exampleCluster", - Config: []byte("{\"cassandra-yaml\":{\"authenticator\":\"PasswordAuthenticator\"}}"), - }, - }, - want: true, - errString: "", - }, - { - name: "auth disabled", - dc: &CassandraDatacenter{ - ObjectMeta: metav1.ObjectMeta{ - Name: "exampleDC", - }, - Spec: CassandraDatacenterSpec{ - ClusterName: "exampleCluster", - Config: []byte("{\"cassandra-yaml\":{\"authenticator\":\"AllowAuthenticator\"}}"), - }, - }, - want: false, - errString: "", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := tt.dc.IsAuthenticationEnabled() - if got != tt.want { - t.Errorf("IsAuthenticationEnabled() got = %v, want %v", got, tt.want) - } - if err == nil { - if tt.errString != "" { - t.Errorf("IsAuthenticationEnabled() err = %v, want %v", err, tt.errString) - } - } else { - if err.Error() != tt.errString { - t.Errorf("IsAuthenticationEnabled() err = %v, want %v", err, tt.errString) - } - } - }) - } -} - func TestCassandraDatacenter_GetContainerPorts(t *testing.T) { type fields struct { TypeMeta metav1.TypeMeta diff --git a/operator/pkg/httphelper/client.go b/operator/pkg/httphelper/client.go index 61eaada56..5f54f54c7 100644 --- a/operator/pkg/httphelper/client.go +++ b/operator/pkg/httphelper/client.go @@ -52,6 +52,7 @@ type EndpointState struct { IsAlive string `json:"IS_ALIVE"` NativeTransportAddress string `json:"NATIVE_TRANSPORT_ADDRESS"` RpcAddress string `json:"RPC_ADDRESS"` + Schema string `json:"SCHEMA"` } func (x *EndpointState) GetRpcAddress() string { diff --git a/operator/pkg/httphelper/client_test.go b/operator/pkg/httphelper/client_test.go index e5ede0083..23d3b12f0 100644 --- a/operator/pkg/httphelper/client_test.go +++ b/operator/pkg/httphelper/client_test.go @@ -101,4 +101,5 @@ func Test_parseMetadataEndpointsResponseBody(t *testing.T) { assert.Equal(t, 2, len(endpoints.Entity)) assert.Equal(t, "10.233.90.45", endpoints.Entity[0].RpcAddress) assert.Equal(t, "95c157dc-2811-446a-a541-9faaab2e6930", endpoints.Entity[0].HostID) + assert.Equal(t, "e84b6a60-24cf-30ca-9b58-452d92911703", endpoints.Entity[0].Schema) } diff --git a/operator/pkg/reconciliation/constructor.go b/operator/pkg/reconciliation/constructor.go index 015206d6b..a3196c154 100644 --- a/operator/pkg/reconciliation/constructor.go +++ b/operator/pkg/reconciliation/constructor.go @@ -677,36 +677,29 @@ func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) (*v1.Job, error) { }, } - authEnabled, err := dc.IsAuthenticationEnabled() - if err != nil { - return nil, err - } - - if authEnabled { - secretName := dc.GetReaperUserSecretNamespacedName() - envVars = append(envVars, corev1.EnvVar{ - Name: "USERNAME", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: secretName.Name, - }, - Key: "username", + secretName := dc.GetReaperUserSecretNamespacedName() + envVars = append(envVars, corev1.EnvVar{ + Name: "USERNAME", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, }, + Key: "username", }, - }) - envVars = append(envVars, corev1.EnvVar{ - Name: "PASSWORD", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: secretName.Name, - }, - Key: "password", + }, + }) + envVars = append(envVars, corev1.EnvVar{ + Name: "PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, }, + Key: "password", }, - }) - } + }, + }) return &v1.Job{ TypeMeta: metav1.TypeMeta{ diff --git a/operator/pkg/reconciliation/reconcile_racks.go b/operator/pkg/reconciliation/reconcile_racks.go index 28b0695ac..0d3f4a949 100644 --- a/operator/pkg/reconciliation/reconcile_racks.go +++ b/operator/pkg/reconciliation/reconcile_racks.go @@ -2148,7 +2148,7 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) { return recResult.Output() } - if recResult := rc.CheckReaperSchemaInitialized(); recResult.Completed() { + if recResult := rc.CheckReaperSchemaInitialized(endpointData); recResult.Completed() { return recResult.Output() } diff --git a/operator/pkg/reconciliation/reconcile_reaper.go b/operator/pkg/reconciliation/reconcile_reaper.go index ef115f85a..217ec993f 100644 --- a/operator/pkg/reconciliation/reconcile_reaper.go +++ b/operator/pkg/reconciliation/reconcile_reaper.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/datastax/cass-operator/operator/internal/result" api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1" + "github.com/datastax/cass-operator/operator/pkg/httphelper" v1batch "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -50,37 +51,30 @@ func buildReaperContainer(dc *api.CassandraDatacenter) (*corev1.Container, error {Name: "REAPER_JMX_AUTH_PASSWORD", Value: ""}, } - cassandraAuthEnabled, err := dc.IsAuthenticationEnabled() - if err != nil { - return nil, err - } - - if cassandraAuthEnabled { - secretName := dc.GetReaperUserSecretNamespacedName() - envVars = append(envVars, corev1.EnvVar{Name: "REAPER_CASS_AUTH_ENABLED", Value: "true"}) - envVars = append(envVars, corev1.EnvVar{ - Name: "REAPER_CASS_AUTH_USERNAME", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: secretName.Name, - }, - Key: "username", + secretName := dc.GetReaperUserSecretNamespacedName() + envVars = append(envVars, corev1.EnvVar{Name: "REAPER_CASS_AUTH_ENABLED", Value: "true"}) + envVars = append(envVars, corev1.EnvVar{ + Name: "REAPER_CASS_AUTH_USERNAME", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, }, + Key: "username", }, - }) - envVars = append(envVars, corev1.EnvVar{ - Name: "REAPER_CASS_AUTH_PASSWORD", - ValueFrom: &corev1.EnvVarSource{ - SecretKeyRef: &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: secretName.Name, - }, - Key: "password", + }, + }) + envVars = append(envVars, corev1.EnvVar{ + Name: "REAPER_CASS_AUTH_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, }, + Key: "password", }, - }) - } + }, + }) container := corev1.Container{ Name: ReaperContainerName, @@ -110,7 +104,7 @@ func getReaperPullPolicy(dc *api.CassandraDatacenter) corev1.PullPolicy { return dc.Spec.Reaper.ImagePullPolicy } -func (rc *ReconciliationContext) CheckReaperSchemaInitialized() result.ReconcileResult { +func (rc *ReconciliationContext) CheckReaperSchemaInitialized(endpoints httphelper.CassMetadataEndpoints) result.ReconcileResult { // Using a job eventually get replaced with calls to the mgmt api once it has support for // creating keyspaces and tables. @@ -145,16 +139,17 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized() result.Reconcile } else if err != nil { return result.Error(err) } else if jobFinished(schemaJob) { - patch := client.MergeFrom(rc.Datacenter.DeepCopy()) - rc.Datacenter.Status.ReaperStatus.SchemaInitialized = true - if err = rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil { - rc.ReqLogger.Error(err, "error updating the reaper status") - return result.Error(err) + if checkSchemaAgreement(endpoints) { + patch := client.MergeFrom(rc.Datacenter.DeepCopy()) + rc.Datacenter.Status.ReaperStatus.SchemaInitialized = true + if err = rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil { + rc.ReqLogger.Error(err, "error updating the reaper status") + return result.Error(err) + } + return result.Continue() + } else { + return result.RequeueSoon(5) } - // Requeue with a delay to give a chance for C* schema changes to propagate - // - // TODO Should the delay be adjusted based on the C* cluster size? - return result.RequeueSoon(5) } else { return result.RequeueSoon(2) } @@ -240,4 +235,12 @@ func newReaperService(dc *api.CassandraDatacenter) *corev1.Service { Selector: dc.GetDatacenterLabels(), }, } +} + +func checkSchemaAgreement(endpoints httphelper.CassMetadataEndpoints) bool { + schemaVersions := make(map[string]bool) + for _, state := range endpoints.Entity { + schemaVersions[state.Schema] = true + } + return len(schemaVersions) == 1 } \ No newline at end of file diff --git a/operator/pkg/reconciliation/reconcile_reaper_test.go b/operator/pkg/reconciliation/reconcile_reaper_test.go index 3a4e22621..3da1f10b7 100644 --- a/operator/pkg/reconciliation/reconcile_reaper_test.go +++ b/operator/pkg/reconciliation/reconcile_reaper_test.go @@ -4,6 +4,7 @@ import ( "testing" api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1" + "github.com/datastax/cass-operator/operator/pkg/httphelper" "github.com/stretchr/testify/assert" v1batch "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -28,11 +29,35 @@ func TestReconcileReaper_buildInitReaperSchemaJob(t *testing.T) { assert.Equal(t, ReaperSchemaInitJobImage, container.Image) + secretName := dc.GetReaperUserSecretNamespacedName() + expectedEnvVars := []corev1.EnvVar{ {Name: "KEYSPACE", Value: ReaperKeyspace}, {Name: "CONTACT_POINTS", Value: dc.GetSeedServiceName()}, {Name: "REPLICATION", Value: "{'class': 'NetworkTopologyStrategy', 'ReaperSchemaJobTest': 3}"}, } + expectedEnvVars = append(expectedEnvVars, corev1.EnvVar{ + Name: "USERNAME", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "username", + }, + }, + }) + expectedEnvVars = append(expectedEnvVars, corev1.EnvVar{ + Name: "PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "password", + }, + }, + }) assert.ElementsMatch(t, expectedEnvVars, container.Env) } @@ -59,7 +84,18 @@ func TestReconcileReaper_CheckReaperSchemaInitialized(t *testing.T) { rc.Client = fake.NewFakeClient(trackObjects...) - reconcileResult := rc.CheckReaperSchemaInitialized() + endpoints := httphelper.CassMetadataEndpoints{ + Entity: []httphelper.EndpointState{ + { + Schema: "e84b6a60-24cf-30ca-9b58-452d92911703", + }, + { + Schema: "e84b6a60-24cf-30ca-9b58-452d92911703", + }, + }, + } + + reconcileResult := rc.CheckReaperSchemaInitialized(endpoints) assert.True(t, reconcileResult.Completed()) result, err := reconcileResult.Output() @@ -81,8 +117,8 @@ func TestReconcileReaper_CheckReaperSchemaInitialized(t *testing.T) { err = rc.Client.Status().Update(rc.Ctx, job) assert.NoError(t, err) - reconcileResult = rc.CheckReaperSchemaInitialized() - assert.True(t, reconcileResult.Completed()) + reconcileResult = rc.CheckReaperSchemaInitialized(endpoints) + assert.False(t, reconcileResult.Completed()) } func TestReconcileReaper_CheckReaperSchemaNotInitialized(t *testing.T) { @@ -93,7 +129,18 @@ func TestReconcileReaper_CheckReaperSchemaNotInitialized(t *testing.T) { rc.Client = fake.NewFakeClient(trackObjects...) - reconcileResult := rc.CheckReaperSchemaInitialized() + endpoints := httphelper.CassMetadataEndpoints{ + Entity: []httphelper.EndpointState{ + { + Schema: "e84b6a60-24cf-30ca-9b58-452d92911703", + }, + { + Schema: "e84b6a60-24cf-30ca-9b58-452d92911703", + }, + }, + } + + reconcileResult := rc.CheckReaperSchemaInitialized(endpoints) assert.False(t, reconcileResult.Completed()) job := &v1batch.Job{} From 49c13a718d47df406f51a5ce5d30684b3bf35e09 Mon Sep 17 00:00:00 2001 From: John Sanda Date: Sat, 12 Sep 2020 23:01:16 -0400 Subject: [PATCH 07/11] and helper function and update verification code --- mage/ginkgo/lib.go | 8 ++++++++ tests/enable_reaper/enable_reaper_suite_test.go | 10 +++------- tests/testdata/oss-three-node-with-auth-dc.yaml | 4 ++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/mage/ginkgo/lib.go b/mage/ginkgo/lib.go index 663155cd8..72a130fff 100644 --- a/mage/ginkgo/lib.go +++ b/mage/ginkgo/lib.go @@ -239,6 +239,14 @@ func (ns *NsWrapper) WaitForDatacenterOperatorProgress(dcName string, progressVa ns.WaitForOutputAndLog(step, k, progressValue, timeout) } +func (ns *NsWrapper) WaitForReaperSchemaInitialized(dcName string, timeout int) { + step := fmt.Sprintf("checking the reaper status schemaInitialized is set to true") + json := "jsonpath={.status.reaperStatus.schemaInitialized}" + k := kubectl.Get("CassandraDatacenter", dcName). + FormatOutput(json) + ns.WaitForOutputAndLog(step, k, "true", timeout) +} + func (ns *NsWrapper) WaitForSuperUserUpserted(dcName string, timeout int) { json := "jsonpath={.status.superUserUpserted}" k := kubectl.Get("CassandraDatacenter", dcName). diff --git a/tests/enable_reaper/enable_reaper_suite_test.go b/tests/enable_reaper/enable_reaper_suite_test.go index ff67c410b..5caf3cd18 100644 --- a/tests/enable_reaper/enable_reaper_suite_test.go +++ b/tests/enable_reaper/enable_reaper_suite_test.go @@ -61,9 +61,9 @@ var _ = Describe(testName, func() { nodeStatusesHostIds := ns.GetNodeStatusesHostIds(dcName) Expect(len(nodeStatusesHostIds), 3) - ns.WaitForDatacenterReady(dcName) ns.WaitForDatacenterCondition(dcName, "Ready", string(corev1.ConditionTrue)) ns.WaitForDatacenterCondition(dcName, "Initialized", string(corev1.ConditionTrue)) + ns.WaitForDatacenterReady(dcName) step = "enable Reaper" json := `{"spec": {"reaper": {"enabled": true}}}` @@ -71,13 +71,9 @@ var _ = Describe(testName, func() { ns.ExecAndLog(step, k) ns.WaitForDatacenterOperatorProgress(dcName, "Updating", 30) - ns.WaitForDatacenterOperatorProgress(dcName, "Ready", 600) - step = "check that Reaper container is deployed" - json = `jsonpath={.items[*].spec.containers[?(@.name=="reaper")].name}` - k = kubectl.Get("pods"). - FormatOutput(json) - ns.WaitForOutputAndLog(step, k, "reaper reaper reaper", 20) + ns.WaitForReaperSchemaInitialized(dcName, 900) + ns.WaitForDatacenterOperatorProgress(dcName, "Ready", 120) step = "disable Reaper" json = `{"spec": {"reaper": {"enabled": false}}}` diff --git a/tests/testdata/oss-three-node-with-auth-dc.yaml b/tests/testdata/oss-three-node-with-auth-dc.yaml index 2ac34f6f6..a4daa7362 100644 --- a/tests/testdata/oss-three-node-with-auth-dc.yaml +++ b/tests/testdata/oss-three-node-with-auth-dc.yaml @@ -5,8 +5,8 @@ metadata: spec: clusterName: cluster1 serverType: cassandra - serverVersion: "3.11.6" - serverImage: datastax/cassandra-mgmtapi-3_11_6:v0.1.5 + serverVersion: "3.11.7" + serverImage: datastax/cassandra-mgmtapi-3_11_7:v0.1.12 managementApiAuth: insecure: {} size: 3 From 633e26b59df8720989c92b200fbef7292b597530 Mon Sep 17 00:00:00 2001 From: John Sanda Date: Sat, 12 Sep 2020 23:12:38 -0400 Subject: [PATCH 08/11] update replication_factor of system_auth keyspace This commit adds a function for altering a keyspace which I needed to configure the RF of system_auth. When authentication is enabled, system_auth should never have RF = 1. Because the amount of data is very small, increase the RF even when authentication is disabled is not a big deal. In my testing for reaper sidecar, I found that the RF of system_auth has to be changed when authentication is enabled; otherwise, there will be lots of flapping of containers as only one C* node will have the credentials and it might not the node that is up. --- operator/pkg/httphelper/client.go | 46 +++++++++++++++++++ .../pkg/reconciliation/reconcile_racks.go | 35 ++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/operator/pkg/httphelper/client.go b/operator/pkg/httphelper/client.go index 5f54f54c7..179890d11 100644 --- a/operator/pkg/httphelper/client.go +++ b/operator/pkg/httphelper/client.go @@ -237,6 +237,52 @@ func (client *NodeMgmtClient) CallKeyspaceCleanupEndpoint(pod *corev1.Pod, jobs return err } +type ReplicationSetting struct { + Datacenter string + ReplicationFactor int +} + +func (client *NodeMgmtClient) CallAlterKeyspaceEndpoint(pod *corev1.Pod, keyspace string, replicationSettings[]ReplicationSetting) error { + client.Log.Info( + "calling Management API alter keyspace - POST /api/v0/ops/keyspace/alter", + "pod", pod.Name, + ) + postData := make(map[string]interface{}) + + postData["keyspace_name"] = keyspace + replication := make([]map[string]interface{}, 0) + + for _, r := range replicationSettings { + dcReplication := make(map[string]interface{}) + dcReplication["dc_name"] = r.Datacenter + dcReplication["replication_factor"] = r.ReplicationFactor + + replication = append(replication, dcReplication) + } + postData["replication_settings"] = replication + + body, err := json.Marshal(postData) + if err != nil { + return err + } + + podHost, err := BuildPodHostFromPod(pod) + if err != nil { + return err + } + + request := nodeMgmtRequest{ + endpoint: "/api/v0/ops/keyspace/alter", + host: podHost, + method: http.MethodPost, + timeout: time.Second * 20, + body: body, + } + + _, err = callNodeMgmtEndpoint(client, request, "application/json") + return err +} + func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev1.Pod, replaceIp string) error { // talk to the pod via IP because we are dialing up a pod that isn't ready, // so it won't be reachable via the service and pod DNS diff --git a/operator/pkg/reconciliation/reconcile_racks.go b/operator/pkg/reconciliation/reconcile_racks.go index 0d3f4a949..91fb4a9c0 100644 --- a/operator/pkg/reconciliation/reconcile_racks.go +++ b/operator/pkg/reconciliation/reconcile_racks.go @@ -5,6 +5,7 @@ package reconciliation import ( "fmt" + "math" "reflect" "sort" "strings" @@ -844,6 +845,36 @@ func (rc *ReconciliationContext) CreateUsers() result.ReconcileResult { return result.Continue() } +func (rc *ReconciliationContext) ConfigureSystemAuthReplication() result.ReconcileResult { + dc := rc.Datacenter + + if dc.Spec.Stopped { + rc.ReqLogger.Info("cluster is stopped, skipping ConfigureSystemAuthReplication") + return result.Continue() + } + + pod := rc.dcPods[0] + + replication := []httphelper.ReplicationSetting{ + { + Datacenter: dc.Name, + ReplicationFactor: getSystemAuthRF(dc), + }, + } + + err := rc.NodeMgmtClient.CallAlterKeyspaceEndpoint(pod, "system_auth", replication) + + if err != nil { + return result.Error(err) + } + + return result.Continue() +} + +func getSystemAuthRF(dc *api.CassandraDatacenter) int { + return int(math.Min(float64(3), float64(dc.Spec.Size))) +} + func findHostIdForIpFromEndpointsData(endpointsData []httphelper.EndpointState, ip string) string { for _, data := range endpointsData { if data.GetRpcAddress() == ip { @@ -2144,6 +2175,10 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) { return recResult.Output() } + if recResult := rc.ConfigureSystemAuthReplication(); recResult.Completed() { + return recResult.Output() + } + if recResult := rc.CreateUsers(); recResult.Completed() { return recResult.Output() } From 3906a140af26381aaba92267d27cf2fb0ed4fa4b Mon Sep 17 00:00:00 2001 From: John Sanda Date: Sat, 12 Sep 2020 23:22:02 -0400 Subject: [PATCH 09/11] deploy a second job to apply reaper schema changes Reaper coordinates schema changes with distributed locks implemented in a C* table. This is done in order to avoid concurrent schema updates which is problematic in C*. The locks table uses IP addresses. When (not if) the reaper sidecar container liveness and/or readiness probe fails and the container restarts, it will come back up with a differnet IP address. Reaper will then have to wait to reacquire the lock which can lead to additional liveness and/or readiness probe failures. By applying schema updates with a single instance via a job, these problems go away. Lastly, based on this as well as on previous experience, I firmly believe that C* schema changes in a k8s deployment should always be done in a k8s job to avoid these types of problems. --- operator/pkg/reconciliation/constructor.go | 44 ++++++++++-- .../pkg/reconciliation/constructor_test.go | 2 +- operator/pkg/reconciliation/defaults.go | 2 +- .../pkg/reconciliation/reconcile_reaper.go | 72 ++++++++++++++++--- .../reconciliation/reconcile_reaper_test.go | 2 +- 5 files changed, 104 insertions(+), 18 deletions(-) diff --git a/operator/pkg/reconciliation/constructor.go b/operator/pkg/reconciliation/constructor.go index a3196c154..50ff09af1 100644 --- a/operator/pkg/reconciliation/constructor.go +++ b/operator/pkg/reconciliation/constructor.go @@ -554,8 +554,9 @@ func buildContainers(dc *api.CassandraDatacenter, serverVolumeMounts []corev1.Vo loggerContainer.Resources = *getResourcesOrDefault(&dc.Spec.SystemLoggerResources, &DefaultsLoggerContainer) containers := []corev1.Container{cassContainer, loggerContainer} - if dc.DeployReaper() { - reaperContainer, err := buildReaperContainer(dc) + + if dc.IsReaperEnabled() { + reaperContainer, err := buildReaperContainer(dc, false) if err != nil { return containers, err } @@ -661,6 +662,42 @@ func buildPodTemplateSpec(dc *api.CassandraDatacenter, zone string, rackName str return baseTemplate, nil } +func buildReaperSchemaJob(dc *api.CassandraDatacenter) (*v1.Job, error) { + container, err := buildReaperContainer(dc, true) + if err != nil { + return nil, err + } + + container.Env = append(container.Env, corev1.EnvVar{ + Name: "SCHEMA_ONLY", + Value: "true", + }) + container.Image = ReaperDefaultImage + container.ImagePullPolicy = corev1.PullIfNotPresent + + jobName := fmt.Sprintf("%s-reaper-schema", dc.Spec.ClusterName) + + return &v1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: dc.Namespace, + Name: jobName, + Labels: dc.GetDatacenterLabels(), + }, + Spec: v1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{*container}, + }, + }, + }, + }, nil +} + func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) (*v1.Job, error) { envVars := []corev1.EnvVar{ { @@ -719,8 +756,7 @@ func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) (*v1.Job, error) { { Name: getReaperSchemaInitJobName(dc), Image: ReaperSchemaInitJobImage, - //ImagePullPolicy: corev1.PullIfNotPresent, - ImagePullPolicy: corev1.PullAlways, + ImagePullPolicy: corev1.PullIfNotPresent, Env: envVars, }, }, diff --git a/operator/pkg/reconciliation/constructor_test.go b/operator/pkg/reconciliation/constructor_test.go index 02d2775bb..b9c5b9334 100644 --- a/operator/pkg/reconciliation/constructor_test.go +++ b/operator/pkg/reconciliation/constructor_test.go @@ -271,7 +271,7 @@ func TestCassandraDatacenter_buildContainers_no_reaper(t *testing.T) { ServerType: "cassandra", ServerVersion: "3.11.6", Reaper: &api.ReaperConfig{ - Enabled: true, + Enabled: false, }, }, } diff --git a/operator/pkg/reconciliation/defaults.go b/operator/pkg/reconciliation/defaults.go index 9a41e06fe..69ff0bd47 100644 --- a/operator/pkg/reconciliation/defaults.go +++ b/operator/pkg/reconciliation/defaults.go @@ -8,5 +8,5 @@ var ( DefaultsConfigInitContainer = buildResourceRequirements(256, 256) // Provides reasonable defaults for the reaper sidecar container. - DefaultsReaperContainer = buildResourceRequirements(2000, 512) + DefaultsReaperContainer = buildResourceRequirements(800, 512) ) diff --git a/operator/pkg/reconciliation/reconcile_reaper.go b/operator/pkg/reconciliation/reconcile_reaper.go index 217ec993f..de7061ade 100644 --- a/operator/pkg/reconciliation/reconcile_reaper.go +++ b/operator/pkg/reconciliation/reconcile_reaper.go @@ -22,7 +22,9 @@ import ( const ( ReaperUIPort = 7080 ReaperAdminPort = 7081 - ReaperDefaultImage = "thelastpickle/cassandra-reaper:2.0.5" + // This is the default image until https://github.com/thelastpickle/cassandra-reaper/pull/957 + // gets merged. + ReaperDefaultImage = "jsanda/cassandra-reaper:k8s-sidecar" ReaperDefaultPullPolicy = corev1.PullIfNotPresent ReaperContainerName = "reaper" ReaperHealthCheckPath = "/healthcheck" @@ -30,9 +32,11 @@ const ( ReaperSchemaInitJob = "ReaperSchemaInitJob" // This code currently lives at https://github.com/jsanda/create_keyspace. ReaperSchemaInitJobImage = "jsanda/reaper-init-keyspace:latest" + ReaperDefaultJmxUsername = "cassandra" + ReaperDefaultJmxPassword = "cassandra" ) -func buildReaperContainer(dc *api.CassandraDatacenter) (*corev1.Container, error) { +func buildReaperContainer(dc *api.CassandraDatacenter, schemaOnly bool) (*corev1.Container, error) { ports := []corev1.ContainerPort{ {Name: "ui", ContainerPort: ReaperUIPort, Protocol: "TCP"}, {Name: "admin", ContainerPort: ReaperAdminPort, Protocol: "TCP"}, @@ -45,10 +49,12 @@ func buildReaperContainer(dc *api.CassandraDatacenter) (*corev1.Container, error {Name: "REAPER_SERVER_APP_PORT", Value: strconv.Itoa(ReaperUIPort)}, {Name: "REAPER_SERVER_ADMIN_PORT", Value: strconv.Itoa(ReaperAdminPort)}, {Name: "REAPER_CASS_CLUSTER_NAME", Value: dc.ClusterName}, - {Name: "REAPER_CASS_CONTACT_POINTS", Value: fmt.Sprintf("[%s]", dc.GetSeedServiceName())}, + {Name: "REAPER_CASS_CONTACT_POINTS", Value: fmt.Sprintf("[%s]", dc.GetDatacenterServiceName())}, {Name: "REAPER_AUTH_ENABLED", Value: "false"}, - {Name: "REAPER_JMX_AUTH_USERNAME", Value: ""}, - {Name: "REAPER_JMX_AUTH_PASSWORD", Value: ""}, + {Name: "REAPER_JMX_AUTH_USERNAME", Value: ReaperDefaultJmxUsername}, + {Name: "REAPER_JMX_AUTH_PASSWORD", Value: ReaperDefaultJmxPassword}, + {Name: "REAPER_LOCAL_CASS_CLUSTER", Value: dc.ClusterName}, + {Name: "IS_K8S", Value: "true"}, } secretName := dc.GetReaperUserSecretNamespacedName() @@ -76,15 +82,22 @@ func buildReaperContainer(dc *api.CassandraDatacenter) (*corev1.Container, error }, }) + var resources corev1.ResourceRequirements + if schemaOnly { + resources = buildResourceRequirements(500, 384) + } else { + resources = *getResourcesOrDefault(&dc.Spec.Reaper.Resources, &DefaultsReaperContainer) + } + container := corev1.Container{ Name: ReaperContainerName, Image: getReaperImage(dc), ImagePullPolicy: getReaperPullPolicy(dc), Ports: ports, - LivenessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, int(60 * dc.Spec.Size), 10), - ReadinessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, 30, 15), + LivenessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, 15, 20), + ReadinessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, 15, 20), Env: envVars, - Resources: *getResourcesOrDefault(&dc.Spec.Reaper.Resources, &DefaultsReaperContainer), + Resources: resources, } return &container, nil @@ -110,7 +123,7 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized(endpoints httphelp rc.ReqLogger.Info("reconcile_reaper::CheckReaperSchemaInitialized") - if !rc.Datacenter.IsReaperEnabled() { + if !rc.Datacenter.IsReaperEnabled() || rc.Datacenter.Status.ReaperStatus.SchemaInitialized { return result.Continue() } @@ -139,6 +152,42 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized(endpoints httphelp } else if err != nil { return result.Error(err) } else if jobFinished(schemaJob) { + if checkSchemaAgreement(endpoints) { + return rc.checkReaperSchemaJob(endpoints) + } else { + rc.ReqLogger.Info("no schema agreement yet") + return result.RequeueSoon(5) + } + } else { + return result.RequeueSoon(2) + } +} + +func (rc *ReconciliationContext) checkReaperSchemaJob(endpoints httphelper.CassMetadataEndpoints) result.ReconcileResult { + jobName := fmt.Sprintf("%s-reaper-schema", rc.Datacenter.Spec.ClusterName) + job := &v1batch.Job{} + + err := rc.Client.Get(rc.Ctx, types.NamespacedName{Namespace: rc.Datacenter.Namespace, Name: jobName}, job) + if err != nil && errors.IsNotFound(err) { + job, err := buildReaperSchemaJob(rc.Datacenter) + if err != nil { + rc.ReqLogger.Error(err, "failed create Reaper schema job") + return result.Error(err) + } + rc.ReqLogger.Info("creating Reaper schema job", "JobName", job.Name) + if err := setControllerReference(rc.Datacenter, job, rc.Scheme); err != nil { + rc.ReqLogger.Error(err, "failed to set owner reference", "ReaperSchemaJob", job.Name) + return result.Error(err) + } + if err := rc.Client.Create(rc.Ctx, job); err != nil { + rc.ReqLogger.Error(err, "failed to create job", "ReaperSchemaJob", job.Name) + return result.Error(err) + } else { + return result.RequeueSoon(2) + } + } else if err != nil { + return result.Error(err) + } else if jobFinished(job) { if checkSchemaAgreement(endpoints) { patch := client.MergeFrom(rc.Datacenter.DeepCopy()) rc.Datacenter.Status.ReaperStatus.SchemaInitialized = true @@ -146,12 +195,13 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized(endpoints httphelp rc.ReqLogger.Error(err, "error updating the reaper status") return result.Error(err) } - return result.Continue() + rc.ReqLogger.Info("schema agreement") + return result.RequeueSoon(0) } else { return result.RequeueSoon(5) } } else { - return result.RequeueSoon(2) + return result.RequeueSoon(5) } } diff --git a/operator/pkg/reconciliation/reconcile_reaper_test.go b/operator/pkg/reconciliation/reconcile_reaper_test.go index 3da1f10b7..99a7ae236 100644 --- a/operator/pkg/reconciliation/reconcile_reaper_test.go +++ b/operator/pkg/reconciliation/reconcile_reaper_test.go @@ -118,7 +118,7 @@ func TestReconcileReaper_CheckReaperSchemaInitialized(t *testing.T) { assert.NoError(t, err) reconcileResult = rc.CheckReaperSchemaInitialized(endpoints) - assert.False(t, reconcileResult.Completed()) + assert.True(t, reconcileResult.Completed()) } func TestReconcileReaper_CheckReaperSchemaNotInitialized(t *testing.T) { From c2e10e7d62768e4895f7cc5be746a7dfb85b3e18 Mon Sep 17 00:00:00 2001 From: John Sanda Date: Sat, 12 Sep 2020 23:39:22 -0400 Subject: [PATCH 10/11] clean up from debugging --- operator/pkg/reconciliation/secrets.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/operator/pkg/reconciliation/secrets.go b/operator/pkg/reconciliation/secrets.go index fd2cc461b..600809e25 100644 --- a/operator/pkg/reconciliation/secrets.go +++ b/operator/pkg/reconciliation/secrets.go @@ -50,12 +50,6 @@ func generateUtf8Password() (string, error) { } func buildDefaultSuperuserSecret(dc *api.CassandraDatacenter) (*corev1.Secret, error) { - //if dc.ShouldGenerateSuperuserSecret() { - // return buildUserSecret(dc.GetSuperuserSecretNamespacedName(), dc.Spec.ClusterName + "-superuser") - // - //} - // - //return nil, nil var secret *corev1.Secret = nil if dc.ShouldGenerateSuperuserSecret() { From ed3fa2aaa3c9a6a0c9bd6692c87959c8aa0db077 Mon Sep 17 00:00:00 2001 From: John Sanda Date: Tue, 15 Sep 2020 17:34:52 -0400 Subject: [PATCH 11/11] fix formatting --- .../pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go | 3 ++- operator/pkg/httphelper/client.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go index 4eaeb4c59..eb45645ac 100644 --- a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go +++ b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go @@ -6,6 +6,8 @@ package v1beta1 import ( "encoding/json" "fmt" + "os" + "github.com/Jeffail/gabs" "github.com/datastax/cass-operator/operator/pkg/serverconfig" "github.com/datastax/cass-operator/operator/pkg/utils" @@ -13,7 +15,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "os" ) const ( diff --git a/operator/pkg/httphelper/client.go b/operator/pkg/httphelper/client.go index 179890d11..988549222 100644 --- a/operator/pkg/httphelper/client.go +++ b/operator/pkg/httphelper/client.go @@ -242,7 +242,7 @@ type ReplicationSetting struct { ReplicationFactor int } -func (client *NodeMgmtClient) CallAlterKeyspaceEndpoint(pod *corev1.Pod, keyspace string, replicationSettings[]ReplicationSetting) error { +func (client *NodeMgmtClient) CallAlterKeyspaceEndpoint(pod *corev1.Pod, keyspace string, replicationSettings []ReplicationSetting) error { client.Log.Info( "calling Management API alter keyspace - POST /api/v0/ops/keyspace/alter", "pod", pod.Name,