diff --git a/charts/cass-operator-chart/templates/customresourcedefinition.yaml b/charts/cass-operator-chart/templates/customresourcedefinition.yaml index f14e4cb86..2147454f2 100644 --- a/charts/cass-operator-chart/templates/customresourcedefinition.yaml +++ b/charts/cass-operator-chart/templates/customresourcedefinition.yaml @@ -6280,6 +6280,11 @@ spec: quietPeriod: format: date-time type: string + reaperStatus: + properties: + schemaInitialized: + type: boolean + type: object superUserUpserted: description: Deprecated. Use usersUpserted instead. The timestamp at which CQL superuser credentials were last upserted to the management diff --git a/mage/ginkgo/lib.go b/mage/ginkgo/lib.go index 663155cd8..72a130fff 100644 --- a/mage/ginkgo/lib.go +++ b/mage/ginkgo/lib.go @@ -239,6 +239,14 @@ func (ns *NsWrapper) WaitForDatacenterOperatorProgress(dcName string, progressVa ns.WaitForOutputAndLog(step, k, progressValue, timeout) } +func (ns *NsWrapper) WaitForReaperSchemaInitialized(dcName string, timeout int) { + step := fmt.Sprintf("checking the reaper status schemaInitialized is set to true") + json := "jsonpath={.status.reaperStatus.schemaInitialized}" + k := kubectl.Get("CassandraDatacenter", dcName). + FormatOutput(json) + ns.WaitForOutputAndLog(step, k, "true", timeout) +} + func (ns *NsWrapper) WaitForSuperUserUpserted(dcName string, timeout int) { json := "jsonpath={.status.superUserUpserted}" k := kubectl.Get("CassandraDatacenter", dcName). diff --git a/operator/deploy/crds/cassandra.datastax.com_cassandradatacenters_crd.yaml b/operator/deploy/crds/cassandra.datastax.com_cassandradatacenters_crd.yaml index fd6c24ae0..a3149e56f 100644 --- a/operator/deploy/crds/cassandra.datastax.com_cassandradatacenters_crd.yaml +++ b/operator/deploy/crds/cassandra.datastax.com_cassandradatacenters_crd.yaml @@ -6270,6 +6270,11 @@ spec: quietPeriod: format: date-time type: string + reaperStatus: + properties: + schemaInitialized: + type: boolean + type: object superUserUpserted: description: Deprecated. Use usersUpserted instead. The timestamp at which CQL superuser credentials were last upserted to the management diff --git a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go index 70933e432..eb45645ac 100644 --- a/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go +++ b/operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go @@ -9,13 +9,12 @@ import ( "os" "github.com/Jeffail/gabs" + "github.com/datastax/cass-operator/operator/pkg/serverconfig" + "github.com/datastax/cass-operator/operator/pkg/utils" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - - "github.com/datastax/cass-operator/operator/pkg/serverconfig" - "github.com/datastax/cass-operator/operator/pkg/utils" ) const ( @@ -337,6 +336,10 @@ func NewDatacenterCondition(conditionType DatacenterConditionType, status corev1 } } +type ReaperStatus struct { + SchemaInitialized bool `json:"schemaInitialized,omitempty"` +} + // CassandraDatacenterStatus defines the observed state of CassandraDatacenter // +k8s:openapi-gen=true type CassandraDatacenterStatus struct { @@ -376,6 +379,8 @@ type CassandraDatacenterStatus struct { // +optional ObservedGeneration int64 `json:"observedGeneration,omitempty"` + + ReaperStatus ReaperStatus `json:"reaperStatus,omitempty"` } // +genclient @@ -577,6 +582,21 @@ func (dc *CassandraDatacenter) GetSuperuserSecretNamespacedName() types.Namespac } } +func (dc *CassandraDatacenter) GetReaperUserSecretNamespacedName() types.NamespacedName { + name := dc.Spec.ClusterName + "-reaper" + namespace := dc.Namespace + + return types.NamespacedName{Name: name, Namespace: namespace} +} + +func (dc *CassandraDatacenter) IsReaperEnabled() bool { + return dc.Spec.Reaper != nil && dc.Spec.Reaper.Enabled && dc.Spec.ServerType == "cassandra" +} + +func (dc *CassandraDatacenter) DeployReaper() bool { + return dc.IsReaperEnabled() && dc.Status.ReaperStatus.SchemaInitialized +} + // GetConfigAsJSON gets a JSON-encoded string suitable for passing to configBuilder func (dc *CassandraDatacenter) GetConfigAsJSON() (string, error) { diff --git a/operator/pkg/apis/cassandra/v1beta1/zz_generated.deepcopy.go b/operator/pkg/apis/cassandra/v1beta1/zz_generated.deepcopy.go index b986d628a..b89eaeb55 100644 --- a/operator/pkg/apis/cassandra/v1beta1/zz_generated.deepcopy.go +++ b/operator/pkg/apis/cassandra/v1beta1/zz_generated.deepcopy.go @@ -177,6 +177,7 @@ func (in *CassandraDatacenterStatus) DeepCopyInto(out *CassandraDatacenterStatus copy(*out, *in) } in.QuietPeriod.DeepCopyInto(&out.QuietPeriod) + out.ReaperStatus = in.ReaperStatus return } @@ -405,6 +406,22 @@ func (in *ReaperConfig) DeepCopy() *ReaperConfig { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReaperStatus) DeepCopyInto(out *ReaperStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReaperStatus. +func (in *ReaperStatus) DeepCopy() *ReaperStatus { + if in == nil { + return nil + } + out := new(ReaperStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *StorageConfig) DeepCopyInto(out *StorageConfig) { *out = *in diff --git a/operator/pkg/httphelper/client.go b/operator/pkg/httphelper/client.go index 61eaada56..988549222 100644 --- a/operator/pkg/httphelper/client.go +++ b/operator/pkg/httphelper/client.go @@ -52,6 +52,7 @@ type EndpointState struct { IsAlive string `json:"IS_ALIVE"` NativeTransportAddress string `json:"NATIVE_TRANSPORT_ADDRESS"` RpcAddress string `json:"RPC_ADDRESS"` + Schema string `json:"SCHEMA"` } func (x *EndpointState) GetRpcAddress() string { @@ -236,6 +237,52 @@ func (client *NodeMgmtClient) CallKeyspaceCleanupEndpoint(pod *corev1.Pod, jobs return err } +type ReplicationSetting struct { + Datacenter string + ReplicationFactor int +} + +func (client *NodeMgmtClient) CallAlterKeyspaceEndpoint(pod *corev1.Pod, keyspace string, replicationSettings []ReplicationSetting) error { + client.Log.Info( + "calling Management API alter keyspace - POST /api/v0/ops/keyspace/alter", + "pod", pod.Name, + ) + postData := make(map[string]interface{}) + + postData["keyspace_name"] = keyspace + replication := make([]map[string]interface{}, 0) + + for _, r := range replicationSettings { + dcReplication := make(map[string]interface{}) + dcReplication["dc_name"] = r.Datacenter + dcReplication["replication_factor"] = r.ReplicationFactor + + replication = append(replication, dcReplication) + } + postData["replication_settings"] = replication + + body, err := json.Marshal(postData) + if err != nil { + return err + } + + podHost, err := BuildPodHostFromPod(pod) + if err != nil { + return err + } + + request := nodeMgmtRequest{ + endpoint: "/api/v0/ops/keyspace/alter", + host: podHost, + method: http.MethodPost, + timeout: time.Second * 20, + body: body, + } + + _, err = callNodeMgmtEndpoint(client, request, "application/json") + return err +} + func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev1.Pod, replaceIp string) error { // talk to the pod via IP because we are dialing up a pod that isn't ready, // so it won't be reachable via the service and pod DNS diff --git a/operator/pkg/httphelper/client_test.go b/operator/pkg/httphelper/client_test.go index e5ede0083..23d3b12f0 100644 --- a/operator/pkg/httphelper/client_test.go +++ b/operator/pkg/httphelper/client_test.go @@ -101,4 +101,5 @@ func Test_parseMetadataEndpointsResponseBody(t *testing.T) { assert.Equal(t, 2, len(endpoints.Entity)) assert.Equal(t, "10.233.90.45", endpoints.Entity[0].RpcAddress) assert.Equal(t, "95c157dc-2811-446a-a541-9faaab2e6930", endpoints.Entity[0].HostID) + assert.Equal(t, "e84b6a60-24cf-30ca-9b58-452d92911703", endpoints.Entity[0].Schema) } diff --git a/operator/pkg/reconciliation/constructor.go b/operator/pkg/reconciliation/constructor.go index 40084dc8f..50ff09af1 100644 --- a/operator/pkg/reconciliation/constructor.go +++ b/operator/pkg/reconciliation/constructor.go @@ -554,9 +554,13 @@ func buildContainers(dc *api.CassandraDatacenter, serverVolumeMounts []corev1.Vo loggerContainer.Resources = *getResourcesOrDefault(&dc.Spec.SystemLoggerResources, &DefaultsLoggerContainer) containers := []corev1.Container{cassContainer, loggerContainer} - if dc.Spec.Reaper != nil && dc.Spec.Reaper.Enabled && dc.Spec.ServerType == "cassandra" { - reaperContainer := buildReaperContainer(dc) - containers = append(containers, reaperContainer) + + if dc.IsReaperEnabled() { + reaperContainer, err := buildReaperContainer(dc, false) + if err != nil { + return containers, err + } + containers = append(containers, *reaperContainer) } return containers, nil @@ -658,7 +662,82 @@ func buildPodTemplateSpec(dc *api.CassandraDatacenter, zone string, rackName str return baseTemplate, nil } -func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) *v1.Job { +func buildReaperSchemaJob(dc *api.CassandraDatacenter) (*v1.Job, error) { + container, err := buildReaperContainer(dc, true) + if err != nil { + return nil, err + } + + container.Env = append(container.Env, corev1.EnvVar{ + Name: "SCHEMA_ONLY", + Value: "true", + }) + container.Image = ReaperDefaultImage + container.ImagePullPolicy = corev1.PullIfNotPresent + + jobName := fmt.Sprintf("%s-reaper-schema", dc.Spec.ClusterName) + + return &v1.Job{ + TypeMeta: metav1.TypeMeta{ + Kind: "Job", + APIVersion: "batch/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: dc.Namespace, + Name: jobName, + Labels: dc.GetDatacenterLabels(), + }, + Spec: v1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{*container}, + }, + }, + }, + }, nil +} + +func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) (*v1.Job, error) { + envVars := []corev1.EnvVar{ + { + Name: "KEYSPACE", + Value: ReaperKeyspace, + }, + { + Name: "CONTACT_POINTS", + Value: dc.GetSeedServiceName(), + }, + { + Name: "REPLICATION", + Value: getReaperReplication(dc), + }, + } + + secretName := dc.GetReaperUserSecretNamespacedName() + envVars = append(envVars, corev1.EnvVar{ + Name: "USERNAME", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "username", + }, + }, + }) + envVars = append(envVars, corev1.EnvVar{ + Name: "PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "password", + }, + }, + }) + return &v1.Job{ TypeMeta: metav1.TypeMeta{ Kind: "Job", @@ -678,26 +757,13 @@ func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) *v1.Job { Name: getReaperSchemaInitJobName(dc), Image: ReaperSchemaInitJobImage, ImagePullPolicy: corev1.PullIfNotPresent, - Env: []corev1.EnvVar{ - { - Name: "KEYSPACE", - Value: ReaperKeyspace, - }, - { - Name: "CONTACT_POINTS", - Value: dc.GetSeedServiceName(), - }, - { - Name: "REPLICATION", - Value: getReaperReplication(dc), - }, - }, + Env: envVars, }, }, }, }, }, - } + }, nil } func addVolumes(dc *api.CassandraDatacenter, baseTemplate *corev1.PodTemplateSpec) []corev1.Volume { diff --git a/operator/pkg/reconciliation/constructor_test.go b/operator/pkg/reconciliation/constructor_test.go index 11d35ff06..b9c5b9334 100644 --- a/operator/pkg/reconciliation/constructor_test.go +++ b/operator/pkg/reconciliation/constructor_test.go @@ -221,6 +221,11 @@ func TestCassandraDatacenter_buildContainers_reaper_resources(t *testing.T) { }, }, }, + Status: api.CassandraDatacenterStatus{ + ReaperStatus: api.ReaperStatus{ + SchemaInitialized: true, + }, + }, } containers, err := buildContainers(dc, []corev1.VolumeMount{}) @@ -242,6 +247,11 @@ func TestCassandraDatacenter_buildContainers_reaper_resources_set_when_not_speci Enabled: true, }, }, + Status: api.CassandraDatacenterStatus{ + ReaperStatus: api.ReaperStatus{ + SchemaInitialized: true, + }, + }, } containers, err := buildContainers(dc, []corev1.VolumeMount{}) @@ -254,6 +264,28 @@ func TestCassandraDatacenter_buildContainers_reaper_resources_set_when_not_speci } } +func TestCassandraDatacenter_buildContainers_no_reaper(t *testing.T) { + dc := &api.CassandraDatacenter{ + Spec: api.CassandraDatacenterSpec{ + ClusterName: "bob", + ServerType: "cassandra", + ServerVersion: "3.11.6", + Reaper: &api.ReaperConfig{ + Enabled: false, + }, + }, + } + + containers, err := buildContainers(dc, []corev1.VolumeMount{}) + assert.NotNil(t, containers, "Unexpected containers containers received") + assert.Nil(t, err, "Unexpected error encountered") + + assert.Len(t, containers, 2, "Unexpected number of containers containers returned") + for _, container := range containers { + assert.NotEqual(t, container.Name, ReaperContainerName) + } +} + func TestCassandraDatacenter_buildPodTemplateSpec_containers_merge(t *testing.T) { testContainer := corev1.Container{} testContainer.Name = "test-container" diff --git a/operator/pkg/reconciliation/defaults.go b/operator/pkg/reconciliation/defaults.go index 9a41e06fe..69ff0bd47 100644 --- a/operator/pkg/reconciliation/defaults.go +++ b/operator/pkg/reconciliation/defaults.go @@ -8,5 +8,5 @@ var ( DefaultsConfigInitContainer = buildResourceRequirements(256, 256) // Provides reasonable defaults for the reaper sidecar container. - DefaultsReaperContainer = buildResourceRequirements(2000, 512) + DefaultsReaperContainer = buildResourceRequirements(800, 512) ) diff --git a/operator/pkg/reconciliation/reconcile_racks.go b/operator/pkg/reconciliation/reconcile_racks.go index 4b376a5bd..91fb4a9c0 100644 --- a/operator/pkg/reconciliation/reconcile_racks.go +++ b/operator/pkg/reconciliation/reconcile_racks.go @@ -5,6 +5,7 @@ package reconciliation import ( "fmt" + "math" "reflect" "sort" "strings" @@ -111,6 +112,18 @@ func (rc *ReconciliationContext) CheckSuperuserSecretCreation() result.Reconcile return result.Continue() } +func (rc *ReconciliationContext) CheckReaperUserSecretCreation() result.ReconcileResult { + rc.ReqLogger.Info("reconcile_racks::CheckReaperUserSecretCreation") + + _, err := rc.retrieveReaperUserSecretOrCreate() + if err != nil { + rc.ReqLogger.Error(err, "error retrieving Reaper user secret for CassandraDatacenter.") + return result.Error(err) + } + + return result.Continue() +} + func (rc *ReconciliationContext) CheckInternodeCredentialCreation() result.ReconcileResult { rc.ReqLogger.Info("reconcile_racks::CheckInternodeCredentialCreation") @@ -759,6 +772,13 @@ func (rc *ReconciliationContext) GetUsers() []api.CassandraUser { SecretName: dc.GetSuperuserSecretNamespacedName().Name, }) + if dc.IsReaperEnabled() { + users = append(users, api.CassandraUser{ + Superuser: true, + SecretName: dc.GetReaperUserSecretNamespacedName().Name, + }) + } + return users } @@ -825,6 +845,36 @@ func (rc *ReconciliationContext) CreateUsers() result.ReconcileResult { return result.Continue() } +func (rc *ReconciliationContext) ConfigureSystemAuthReplication() result.ReconcileResult { + dc := rc.Datacenter + + if dc.Spec.Stopped { + rc.ReqLogger.Info("cluster is stopped, skipping ConfigureSystemAuthReplication") + return result.Continue() + } + + pod := rc.dcPods[0] + + replication := []httphelper.ReplicationSetting{ + { + Datacenter: dc.Name, + ReplicationFactor: getSystemAuthRF(dc), + }, + } + + err := rc.NodeMgmtClient.CallAlterKeyspaceEndpoint(pod, "system_auth", replication) + + if err != nil { + return result.Error(err) + } + + return result.Continue() +} + +func getSystemAuthRF(dc *api.CassandraDatacenter) int { + return int(math.Min(float64(3), float64(dc.Spec.Size))) +} + func findHostIdForIpFromEndpointsData(endpointsData []httphelper.EndpointState, ip string) string { for _, data := range endpointsData { if data.GetRpcAddress() == ip { @@ -2069,6 +2119,10 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) { return recResult.Output() } + if recResult := rc.CheckReaperUserSecretCreation(); recResult.Completed() { + return recResult.Output() + } + if recResult := rc.CheckInternodeCredentialCreation(); recResult.Completed() { return recResult.Output() } @@ -2105,10 +2159,6 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) { return recResult.Output() } - if recResult := rc.CheckReaperSchemaInitialized(); recResult.Completed() { - return recResult.Output() - } - if recResult := rc.CheckRollingRestart(); recResult.Completed() { return recResult.Output() } @@ -2125,10 +2175,18 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) { return recResult.Output() } + if recResult := rc.ConfigureSystemAuthReplication(); recResult.Completed() { + return recResult.Output() + } + if recResult := rc.CreateUsers(); recResult.Completed() { return recResult.Output() } + if recResult := rc.CheckReaperSchemaInitialized(endpointData); recResult.Completed() { + return recResult.Output() + } + if recResult := rc.CheckClearActionConditions(); recResult.Completed() { return recResult.Output() } diff --git a/operator/pkg/reconciliation/reconcile_reaper.go b/operator/pkg/reconciliation/reconcile_reaper.go index b78cb90ca..de7061ade 100644 --- a/operator/pkg/reconciliation/reconcile_reaper.go +++ b/operator/pkg/reconciliation/reconcile_reaper.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/datastax/cass-operator/operator/internal/result" api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1" + "github.com/datastax/cass-operator/operator/pkg/httphelper" v1batch "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -14,13 +15,16 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "math" + "sigs.k8s.io/controller-runtime/pkg/client" "strconv" ) const ( ReaperUIPort = 7080 ReaperAdminPort = 7081 - ReaperDefaultImage = "thelastpickle/cassandra-reaper:2.0.5" + // This is the default image until https://github.com/thelastpickle/cassandra-reaper/pull/957 + // gets merged. + ReaperDefaultImage = "jsanda/cassandra-reaper:k8s-sidecar" ReaperDefaultPullPolicy = corev1.PullIfNotPresent ReaperContainerName = "reaper" ReaperHealthCheckPath = "/healthcheck" @@ -28,37 +32,75 @@ const ( ReaperSchemaInitJob = "ReaperSchemaInitJob" // This code currently lives at https://github.com/jsanda/create_keyspace. ReaperSchemaInitJobImage = "jsanda/reaper-init-keyspace:latest" + ReaperDefaultJmxUsername = "cassandra" + ReaperDefaultJmxPassword = "cassandra" ) -func buildReaperContainer(dc *api.CassandraDatacenter) corev1.Container { +func buildReaperContainer(dc *api.CassandraDatacenter, schemaOnly bool) (*corev1.Container, error) { ports := []corev1.ContainerPort{ {Name: "ui", ContainerPort: ReaperUIPort, Protocol: "TCP"}, {Name: "admin", ContainerPort: ReaperAdminPort, Protocol: "TCP"}, } + envVars := []corev1.EnvVar{ + {Name: "REAPER_STORAGE_TYPE", Value: "cassandra"}, + {Name: "REAPER_ENABLE_DYNAMIC_SEED_LIST", Value: "false"}, + {Name: "REAPER_DATACENTER_AVAILABILITY", Value: "SIDECAR"}, + {Name: "REAPER_SERVER_APP_PORT", Value: strconv.Itoa(ReaperUIPort)}, + {Name: "REAPER_SERVER_ADMIN_PORT", Value: strconv.Itoa(ReaperAdminPort)}, + {Name: "REAPER_CASS_CLUSTER_NAME", Value: dc.ClusterName}, + {Name: "REAPER_CASS_CONTACT_POINTS", Value: fmt.Sprintf("[%s]", dc.GetDatacenterServiceName())}, + {Name: "REAPER_AUTH_ENABLED", Value: "false"}, + {Name: "REAPER_JMX_AUTH_USERNAME", Value: ReaperDefaultJmxUsername}, + {Name: "REAPER_JMX_AUTH_PASSWORD", Value: ReaperDefaultJmxPassword}, + {Name: "REAPER_LOCAL_CASS_CLUSTER", Value: dc.ClusterName}, + {Name: "IS_K8S", Value: "true"}, + } + + secretName := dc.GetReaperUserSecretNamespacedName() + envVars = append(envVars, corev1.EnvVar{Name: "REAPER_CASS_AUTH_ENABLED", Value: "true"}) + envVars = append(envVars, corev1.EnvVar{ + Name: "REAPER_CASS_AUTH_USERNAME", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "username", + }, + }, + }) + envVars = append(envVars, corev1.EnvVar{ + Name: "REAPER_CASS_AUTH_PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "password", + }, + }, + }) + + var resources corev1.ResourceRequirements + if schemaOnly { + resources = buildResourceRequirements(500, 384) + } else { + resources = *getResourcesOrDefault(&dc.Spec.Reaper.Resources, &DefaultsReaperContainer) + } + container := corev1.Container{ Name: ReaperContainerName, Image: getReaperImage(dc), ImagePullPolicy: getReaperPullPolicy(dc), Ports: ports, - LivenessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, int(60 * dc.Spec.Size), 10), - ReadinessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, 30, 15), - Env: []corev1.EnvVar{ - {Name: "REAPER_STORAGE_TYPE", Value: "cassandra"}, - {Name: "REAPER_ENABLE_DYNAMIC_SEED_LIST", Value: "false"}, - {Name: "REAPER_DATACENTER_AVAILABILITY", Value: "SIDECAR"}, - {Name: "REAPER_SERVER_APP_PORT", Value: strconv.Itoa(ReaperUIPort)}, - {Name: "REAPER_SERVER_ADMIN_PORT", Value: strconv.Itoa(ReaperAdminPort)}, - {Name: "REAPER_CASS_CLUSTER_NAME", Value: dc.ClusterName}, - {Name: "REAPER_CASS_CONTACT_POINTS", Value: fmt.Sprintf("[%s]", dc.GetSeedServiceName())}, - {Name: "REAPER_AUTH_ENABLED", Value: "false"}, - {Name: "REAPER_JMX_AUTH_USERNAME", Value: ""}, - {Name: "REAPER_JMX_AUTH_PASSWORD", Value: ""}, - }, - Resources: *getResourcesOrDefault(&dc.Spec.Reaper.Resources, &DefaultsReaperContainer), + LivenessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, 15, 20), + ReadinessProbe: probe(ReaperAdminPort, ReaperHealthCheckPath, 15, 20), + Env: envVars, + Resources: resources, } - return container + return &container, nil } func getReaperImage(dc *api.CassandraDatacenter) string { @@ -75,13 +117,13 @@ func getReaperPullPolicy(dc *api.CassandraDatacenter) corev1.PullPolicy { return dc.Spec.Reaper.ImagePullPolicy } -func (rc *ReconciliationContext) CheckReaperSchemaInitialized() result.ReconcileResult { +func (rc *ReconciliationContext) CheckReaperSchemaInitialized(endpoints httphelper.CassMetadataEndpoints) result.ReconcileResult { // Using a job eventually get replaced with calls to the mgmt api once it has support for // creating keyspaces and tables. rc.ReqLogger.Info("reconcile_reaper::CheckReaperSchemaInitialized") - if rc.Datacenter.Spec.Reaper == nil || !rc.Datacenter.Spec.Reaper.Enabled { + if !rc.Datacenter.IsReaperEnabled() || rc.Datacenter.Status.ReaperStatus.SchemaInitialized { return result.Continue() } @@ -91,7 +133,11 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized() result.Reconcile err := rc.Client.Get(rc.Ctx, types.NamespacedName{Namespace: rc.Datacenter.Namespace, Name: jobName}, schemaJob) if err != nil && errors.IsNotFound(err) { // Create the job - schemaJob := buildInitReaperSchemaJob(rc.Datacenter) + schemaJob, err := buildInitReaperSchemaJob(rc.Datacenter) + if err != nil { + rc.ReqLogger.Error(err, "failed to create Reaper schema init job") + return result.Error(err) + } rc.ReqLogger.Info("creating Reaper schema init job", ReaperSchemaInitJob, schemaJob.Name) if err := setControllerReference(rc.Datacenter, schemaJob, rc.Scheme); err != nil { rc.ReqLogger.Error(err, "failed to set owner reference", ReaperSchemaInitJob, schemaJob.Name) @@ -106,12 +152,59 @@ func (rc *ReconciliationContext) CheckReaperSchemaInitialized() result.Reconcile } else if err != nil { return result.Error(err) } else if jobFinished(schemaJob) { - return result.Continue() + if checkSchemaAgreement(endpoints) { + return rc.checkReaperSchemaJob(endpoints) + } else { + rc.ReqLogger.Info("no schema agreement yet") + return result.RequeueSoon(5) + } } else { return result.RequeueSoon(2) } } +func (rc *ReconciliationContext) checkReaperSchemaJob(endpoints httphelper.CassMetadataEndpoints) result.ReconcileResult { + jobName := fmt.Sprintf("%s-reaper-schema", rc.Datacenter.Spec.ClusterName) + job := &v1batch.Job{} + + err := rc.Client.Get(rc.Ctx, types.NamespacedName{Namespace: rc.Datacenter.Namespace, Name: jobName}, job) + if err != nil && errors.IsNotFound(err) { + job, err := buildReaperSchemaJob(rc.Datacenter) + if err != nil { + rc.ReqLogger.Error(err, "failed create Reaper schema job") + return result.Error(err) + } + rc.ReqLogger.Info("creating Reaper schema job", "JobName", job.Name) + if err := setControllerReference(rc.Datacenter, job, rc.Scheme); err != nil { + rc.ReqLogger.Error(err, "failed to set owner reference", "ReaperSchemaJob", job.Name) + return result.Error(err) + } + if err := rc.Client.Create(rc.Ctx, job); err != nil { + rc.ReqLogger.Error(err, "failed to create job", "ReaperSchemaJob", job.Name) + return result.Error(err) + } else { + return result.RequeueSoon(2) + } + } else if err != nil { + return result.Error(err) + } else if jobFinished(job) { + if checkSchemaAgreement(endpoints) { + patch := client.MergeFrom(rc.Datacenter.DeepCopy()) + rc.Datacenter.Status.ReaperStatus.SchemaInitialized = true + if err = rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil { + rc.ReqLogger.Error(err, "error updating the reaper status") + return result.Error(err) + } + rc.ReqLogger.Info("schema agreement") + return result.RequeueSoon(0) + } else { + return result.RequeueSoon(5) + } + } else { + return result.RequeueSoon(5) + } +} + func getReaperSchemaInitJobName(dc *api.CassandraDatacenter) string { return fmt.Sprintf("%s-reaper-init-schema", dc.Spec.ClusterName) } @@ -192,4 +285,12 @@ func newReaperService(dc *api.CassandraDatacenter) *corev1.Service { Selector: dc.GetDatacenterLabels(), }, } +} + +func checkSchemaAgreement(endpoints httphelper.CassMetadataEndpoints) bool { + schemaVersions := make(map[string]bool) + for _, state := range endpoints.Entity { + schemaVersions[state.Schema] = true + } + return len(schemaVersions) == 1 } \ No newline at end of file diff --git a/operator/pkg/reconciliation/reconcile_reaper_test.go b/operator/pkg/reconciliation/reconcile_reaper_test.go index 6cd3e0cc6..99a7ae236 100644 --- a/operator/pkg/reconciliation/reconcile_reaper_test.go +++ b/operator/pkg/reconciliation/reconcile_reaper_test.go @@ -4,6 +4,7 @@ import ( "testing" api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1" + "github.com/datastax/cass-operator/operator/pkg/httphelper" "github.com/stretchr/testify/assert" v1batch "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -16,7 +17,9 @@ import ( func TestReconcileReaper_buildInitReaperSchemaJob(t *testing.T) { dc := newCassandraDatacenter() - job := buildInitReaperSchemaJob(dc) + job, err := buildInitReaperSchemaJob(dc) + + assert.NoError(t, err) assert.Equal(t, getReaperSchemaInitJobName(dc), job.Name) assert.Equal(t, dc.GetDatacenterLabels(), job.Labels) @@ -26,11 +29,35 @@ func TestReconcileReaper_buildInitReaperSchemaJob(t *testing.T) { assert.Equal(t, ReaperSchemaInitJobImage, container.Image) + secretName := dc.GetReaperUserSecretNamespacedName() + expectedEnvVars := []corev1.EnvVar{ {Name: "KEYSPACE", Value: ReaperKeyspace}, {Name: "CONTACT_POINTS", Value: dc.GetSeedServiceName()}, {Name: "REPLICATION", Value: "{'class': 'NetworkTopologyStrategy', 'ReaperSchemaJobTest': 3}"}, } + expectedEnvVars = append(expectedEnvVars, corev1.EnvVar{ + Name: "USERNAME", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "username", + }, + }, + }) + expectedEnvVars = append(expectedEnvVars, corev1.EnvVar{ + Name: "PASSWORD", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: secretName.Name, + }, + Key: "password", + }, + }, + }) assert.ElementsMatch(t, expectedEnvVars, container.Env) } @@ -49,6 +76,7 @@ func TestReconcileReaper_newReaperService(t *testing.T) { func TestReconcileReaper_CheckReaperSchemaInitialized(t *testing.T) { rc, _, cleanupMockScr := setupTest() + rc.Datacenter.Spec.ServerType = "cassandra" rc.Datacenter.Spec.Reaper = &api.ReaperConfig{Enabled: true} defer cleanupMockScr() @@ -56,7 +84,18 @@ func TestReconcileReaper_CheckReaperSchemaInitialized(t *testing.T) { rc.Client = fake.NewFakeClient(trackObjects...) - reconcileResult := rc.CheckReaperSchemaInitialized() + endpoints := httphelper.CassMetadataEndpoints{ + Entity: []httphelper.EndpointState{ + { + Schema: "e84b6a60-24cf-30ca-9b58-452d92911703", + }, + { + Schema: "e84b6a60-24cf-30ca-9b58-452d92911703", + }, + }, + } + + reconcileResult := rc.CheckReaperSchemaInitialized(endpoints) assert.True(t, reconcileResult.Completed()) result, err := reconcileResult.Output() @@ -69,6 +108,17 @@ func TestReconcileReaper_CheckReaperSchemaInitialized(t *testing.T) { err = rc.Client.Get(rc.Ctx, types.NamespacedName{Namespace: rc.Datacenter.Namespace, Name: jobName}, job) assert.NoErrorf(t, err, "failed to get job %s", jobName) + + job.Status.Conditions = append(job.Status.Conditions, v1batch.JobCondition{ + Type: v1batch.JobComplete, + Status: corev1.ConditionTrue, + }) + + err = rc.Client.Status().Update(rc.Ctx, job) + assert.NoError(t, err) + + reconcileResult = rc.CheckReaperSchemaInitialized(endpoints) + assert.True(t, reconcileResult.Completed()) } func TestReconcileReaper_CheckReaperSchemaNotInitialized(t *testing.T) { @@ -79,7 +129,18 @@ func TestReconcileReaper_CheckReaperSchemaNotInitialized(t *testing.T) { rc.Client = fake.NewFakeClient(trackObjects...) - reconcileResult := rc.CheckReaperSchemaInitialized() + endpoints := httphelper.CassMetadataEndpoints{ + Entity: []httphelper.EndpointState{ + { + Schema: "e84b6a60-24cf-30ca-9b58-452d92911703", + }, + { + Schema: "e84b6a60-24cf-30ca-9b58-452d92911703", + }, + }, + } + + reconcileResult := rc.CheckReaperSchemaInitialized(endpoints) assert.False(t, reconcileResult.Completed()) job := &v1batch.Job{} diff --git a/operator/pkg/reconciliation/secrets.go b/operator/pkg/reconciliation/secrets.go index 905a023fe..600809e25 100644 --- a/operator/pkg/reconciliation/secrets.go +++ b/operator/pkg/reconciliation/secrets.go @@ -79,6 +79,39 @@ func buildDefaultSuperuserSecret(dc *api.CassandraDatacenter) (*corev1.Secret, e return secret, nil } +func buildReaperUserSecret(dc *api.CassandraDatacenter) (*corev1.Secret, error) { + if dc.IsReaperEnabled() { + return buildUserSecret(dc.GetReaperUserSecretNamespacedName(), dc.Spec.ClusterName + "-reaper") + } + + return nil, nil +} + +func buildUserSecret(secretNamespacedName types.NamespacedName, username string) (*corev1.Secret, error) { + secret := &corev1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: secretNamespacedName.Namespace, + Name: secretNamespacedName.Name, + }, + } + + password, err := generateUtf8Password() + if err != nil { + return nil, fmt.Errorf("Failed to generate password for user %s: %w", username, err) + } + + secret.Data = map[string][]byte{ + "username": []byte(username), + "password": []byte(password), + } + + return secret, nil +} + func (rc *ReconciliationContext) retrieveSecret(secretNamespacedName types.NamespacedName) (*corev1.Secret, error) { secret := &corev1.Secret{ TypeMeta: metav1.TypeMeta{ @@ -109,6 +142,12 @@ func (rc *ReconciliationContext) retrieveSuperuserSecret() (*corev1.Secret, erro return rc.retrieveSecret(secretNamespacedName) } +func (rc *ReconciliationContext) retrieveReaperUserSecret() (*corev1.Secret, error) { + dc := rc.Datacenter + secretNamespacedName := dc.GetReaperUserSecretNamespacedName() + return rc.retrieveSecret(secretNamespacedName) +} + func (rc *ReconciliationContext) retrieveSuperuserSecretOrCreateDefault() (*corev1.Secret, error) { dc := rc.Datacenter @@ -136,6 +175,38 @@ func (rc *ReconciliationContext) retrieveSuperuserSecretOrCreateDefault() (*core return secret, nil } +func (rc *ReconciliationContext) retrieveReaperUserSecretOrCreate() (*corev1.Secret, error) { + dc := rc.Datacenter + + // TODO if we go from enabled to disabled, should the secret get deleted? + if !dc.IsReaperEnabled() { + return nil, nil + } + + secret, retrieveErr := rc.retrieveReaperUserSecret() + if retrieveErr != nil { + if errors.IsNotFound(retrieveErr) { + secret, err := buildReaperUserSecret(dc) + + if err == nil && secret == nil { + return nil, retrieveErr + } + + if err == nil { + err = rc.Client.Create(rc.Ctx, secret) + } + + if err != nil { + return nil, fmt.Errorf("Failed to create reaper user secret: %w", err) + } + } + } else { + return nil, retrieveErr + } + + return secret, nil +} + func (rc *ReconciliationContext) createInternodeCACredential() (*corev1.Secret, error) { secret := &corev1.Secret{ TypeMeta: metav1.TypeMeta{ diff --git a/operator/pkg/reconciliation/secrets_test.go b/operator/pkg/reconciliation/secrets_test.go index e624eb2a7..a5d1ac627 100644 --- a/operator/pkg/reconciliation/secrets_test.go +++ b/operator/pkg/reconciliation/secrets_test.go @@ -70,6 +70,38 @@ func Test_buildDefaultSuperuserSecret(t *testing.T) { }) } +func Test_buildReaperUserSecret(t *testing.T) { + t.Run("test reaper user secret is created", func(t *testing.T) { + dc := &api.CassandraDatacenter{ + ObjectMeta: metav1.ObjectMeta{ + Name: "exampleDC", + Namespace: "examplens", + }, + Spec: api.CassandraDatacenterSpec{ + ClusterName: "exampleCluster", + ServerType: "cassandra", + Reaper: &api.ReaperConfig{ + Enabled: true, + }, + }, + } + secret, err := buildReaperUserSecret(dc) + if err != nil { + t.Errorf("should not have returned an error %w", err) + return + } + + if secret.ObjectMeta.Namespace != dc.ObjectMeta.Namespace { + t.Errorf("expected secret in namespace '%s' but was '%s", dc.ObjectMeta.Namespace, secret.ObjectMeta.Namespace) + } + + expectedSecretName := fmt.Sprintf("%s-reaper", dc.Spec.ClusterName) + if secret.ObjectMeta.Name != expectedSecretName { + t.Errorf("expected default secret name '%s' but was '%s'", expectedSecretName, secret.ObjectMeta.Name) + } + }) +} + func Test_validateCassandraUserSecretContent(t *testing.T) { var ( name = "datacenter-example" diff --git a/tests/enable_reaper/enable_reaper_suite_test.go b/tests/enable_reaper/enable_reaper_suite_test.go index 23f2f522b..5caf3cd18 100644 --- a/tests/enable_reaper/enable_reaper_suite_test.go +++ b/tests/enable_reaper/enable_reaper_suite_test.go @@ -20,7 +20,7 @@ var ( testName = "Enable Reaper" namespace = "test-enable-reaper" dcName = "dc1" - dcYaml = "../testdata/oss-three-rack-three-node-dc.yaml" + dcYaml = "../testdata/oss-three-node-with-auth-dc.yaml" operatorYaml = "../testdata/operator.yaml" dcResource = fmt.Sprintf("CassandraDatacenter/%s", dcName) dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName) @@ -61,9 +61,9 @@ var _ = Describe(testName, func() { nodeStatusesHostIds := ns.GetNodeStatusesHostIds(dcName) Expect(len(nodeStatusesHostIds), 3) - ns.WaitForDatacenterReady(dcName) ns.WaitForDatacenterCondition(dcName, "Ready", string(corev1.ConditionTrue)) ns.WaitForDatacenterCondition(dcName, "Initialized", string(corev1.ConditionTrue)) + ns.WaitForDatacenterReady(dcName) step = "enable Reaper" json := `{"spec": {"reaper": {"enabled": true}}}` @@ -71,13 +71,9 @@ var _ = Describe(testName, func() { ns.ExecAndLog(step, k) ns.WaitForDatacenterOperatorProgress(dcName, "Updating", 30) - ns.WaitForDatacenterOperatorProgress(dcName, "Ready", 600) - step = "check that Reaper container is deployed" - json = `jsonpath={.items[*].spec.containers[?(@.name=="reaper")].name}` - k = kubectl.Get("pods"). - FormatOutput(json) - ns.WaitForOutputAndLog(step, k, "reaper reaper reaper", 20) + ns.WaitForReaperSchemaInitialized(dcName, 900) + ns.WaitForDatacenterOperatorProgress(dcName, "Ready", 120) step = "disable Reaper" json = `{"spec": {"reaper": {"enabled": false}}}` diff --git a/tests/testdata/oss-three-node-with-auth-dc.yaml b/tests/testdata/oss-three-node-with-auth-dc.yaml new file mode 100644 index 000000000..a4daa7362 --- /dev/null +++ b/tests/testdata/oss-three-node-with-auth-dc.yaml @@ -0,0 +1,32 @@ +apiVersion: cassandra.datastax.com/v1beta1 +kind: CassandraDatacenter +metadata: + name: dc1 +spec: + clusterName: cluster1 + serverType: cassandra + serverVersion: "3.11.7" + serverImage: datastax/cassandra-mgmtapi-3_11_7:v0.1.12 + managementApiAuth: + insecure: {} + size: 3 + storageConfig: + cassandraDataVolumeClaimSpec: + storageClassName: server-storage + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi + racks: + - name: r1 + - name: r2 + - name: r3 + config: + cassandra-yaml: + authenticator: org.apache.cassandra.auth.PasswordAuthenticator + authorizer: org.apache.cassandra.auth.CassandraAuthorizer + role_manager: org.apache.cassandra.auth.CassandraRoleManager + jvm-options: + initial_heap_size: "800m" + max_heap_size: "800m"