Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -6280,6 +6280,11 @@ spec:
quietPeriod:
format: date-time
type: string
reaperStatus:
properties:
schemaInitialized:
type: boolean
type: object
superUserUpserted:
description: Deprecated. Use usersUpserted instead. The timestamp at
which CQL superuser credentials were last upserted to the management
Expand Down
8 changes: 8 additions & 0 deletions mage/ginkgo/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,14 @@ func (ns *NsWrapper) WaitForDatacenterOperatorProgress(dcName string, progressVa
ns.WaitForOutputAndLog(step, k, progressValue, timeout)
}

func (ns *NsWrapper) WaitForReaperSchemaInitialized(dcName string, timeout int) {
step := fmt.Sprintf("checking the reaper status schemaInitialized is set to true")
json := "jsonpath={.status.reaperStatus.schemaInitialized}"
k := kubectl.Get("CassandraDatacenter", dcName).
FormatOutput(json)
ns.WaitForOutputAndLog(step, k, "true", timeout)
}

func (ns *NsWrapper) WaitForSuperUserUpserted(dcName string, timeout int) {
json := "jsonpath={.status.superUserUpserted}"
k := kubectl.Get("CassandraDatacenter", dcName).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6270,6 +6270,11 @@ spec:
quietPeriod:
format: date-time
type: string
reaperStatus:
properties:
schemaInitialized:
type: boolean
type: object
superUserUpserted:
description: Deprecated. Use usersUpserted instead. The timestamp at
which CQL superuser credentials were last upserted to the management
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"os"

"github.com/Jeffail/gabs"
"github.com/datastax/cass-operator/operator/pkg/serverconfig"
"github.com/datastax/cass-operator/operator/pkg/utils"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/datastax/cass-operator/operator/pkg/serverconfig"
"github.com/datastax/cass-operator/operator/pkg/utils"
)

const (
Expand Down Expand Up @@ -337,6 +336,10 @@ func NewDatacenterCondition(conditionType DatacenterConditionType, status corev1
}
}

type ReaperStatus struct {
SchemaInitialized bool `json:"schemaInitialized,omitempty"`
}

// CassandraDatacenterStatus defines the observed state of CassandraDatacenter
// +k8s:openapi-gen=true
type CassandraDatacenterStatus struct {
Expand Down Expand Up @@ -376,6 +379,8 @@ type CassandraDatacenterStatus struct {

// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`

ReaperStatus ReaperStatus `json:"reaperStatus,omitempty"`
}

// +genclient
Expand Down Expand Up @@ -577,6 +582,21 @@ func (dc *CassandraDatacenter) GetSuperuserSecretNamespacedName() types.Namespac
}
}

func (dc *CassandraDatacenter) GetReaperUserSecretNamespacedName() types.NamespacedName {
name := dc.Spec.ClusterName + "-reaper"
namespace := dc.Namespace

return types.NamespacedName{Name: name, Namespace: namespace}
}

func (dc *CassandraDatacenter) IsReaperEnabled() bool {
return dc.Spec.Reaper != nil && dc.Spec.Reaper.Enabled && dc.Spec.ServerType == "cassandra"
}

func (dc *CassandraDatacenter) DeployReaper() bool {
return dc.IsReaperEnabled() && dc.Status.ReaperStatus.SchemaInitialized
}

// GetConfigAsJSON gets a JSON-encoded string suitable for passing to configBuilder
func (dc *CassandraDatacenter) GetConfigAsJSON() (string, error) {

Expand Down
17 changes: 17 additions & 0 deletions operator/pkg/apis/cassandra/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions operator/pkg/httphelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type EndpointState struct {
IsAlive string `json:"IS_ALIVE"`
NativeTransportAddress string `json:"NATIVE_TRANSPORT_ADDRESS"`
RpcAddress string `json:"RPC_ADDRESS"`
Schema string `json:"SCHEMA"`
}

func (x *EndpointState) GetRpcAddress() string {
Expand Down Expand Up @@ -236,6 +237,52 @@ func (client *NodeMgmtClient) CallKeyspaceCleanupEndpoint(pod *corev1.Pod, jobs
return err
}

type ReplicationSetting struct {
Datacenter string
ReplicationFactor int
}

func (client *NodeMgmtClient) CallAlterKeyspaceEndpoint(pod *corev1.Pod, keyspace string, replicationSettings []ReplicationSetting) error {
client.Log.Info(
"calling Management API alter keyspace - POST /api/v0/ops/keyspace/alter",
"pod", pod.Name,
)
postData := make(map[string]interface{})

postData["keyspace_name"] = keyspace
replication := make([]map[string]interface{}, 0)

for _, r := range replicationSettings {
dcReplication := make(map[string]interface{})
dcReplication["dc_name"] = r.Datacenter
dcReplication["replication_factor"] = r.ReplicationFactor

replication = append(replication, dcReplication)
}
postData["replication_settings"] = replication

body, err := json.Marshal(postData)
if err != nil {
return err
}

podHost, err := BuildPodHostFromPod(pod)
if err != nil {
return err
}

request := nodeMgmtRequest{
endpoint: "/api/v0/ops/keyspace/alter",
host: podHost,
method: http.MethodPost,
timeout: time.Second * 20,
body: body,
}

_, err = callNodeMgmtEndpoint(client, request, "application/json")
return err
}

func (client *NodeMgmtClient) CallLifecycleStartEndpointWithReplaceIp(pod *corev1.Pod, replaceIp string) error {
// talk to the pod via IP because we are dialing up a pod that isn't ready,
// so it won't be reachable via the service and pod DNS
Expand Down
1 change: 1 addition & 0 deletions operator/pkg/httphelper/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,5 @@ func Test_parseMetadataEndpointsResponseBody(t *testing.T) {
assert.Equal(t, 2, len(endpoints.Entity))
assert.Equal(t, "10.233.90.45", endpoints.Entity[0].RpcAddress)
assert.Equal(t, "95c157dc-2811-446a-a541-9faaab2e6930", endpoints.Entity[0].HostID)
assert.Equal(t, "e84b6a60-24cf-30ca-9b58-452d92911703", endpoints.Entity[0].Schema)
}
104 changes: 85 additions & 19 deletions operator/pkg/reconciliation/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,13 @@ func buildContainers(dc *api.CassandraDatacenter, serverVolumeMounts []corev1.Vo
loggerContainer.Resources = *getResourcesOrDefault(&dc.Spec.SystemLoggerResources, &DefaultsLoggerContainer)

containers := []corev1.Container{cassContainer, loggerContainer}
if dc.Spec.Reaper != nil && dc.Spec.Reaper.Enabled && dc.Spec.ServerType == "cassandra" {
reaperContainer := buildReaperContainer(dc)
containers = append(containers, reaperContainer)

if dc.IsReaperEnabled() {
reaperContainer, err := buildReaperContainer(dc, false)
if err != nil {
return containers, err
}
containers = append(containers, *reaperContainer)
}

return containers, nil
Expand Down Expand Up @@ -658,7 +662,82 @@ func buildPodTemplateSpec(dc *api.CassandraDatacenter, zone string, rackName str
return baseTemplate, nil
}

func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) *v1.Job {
func buildReaperSchemaJob(dc *api.CassandraDatacenter) (*v1.Job, error) {
container, err := buildReaperContainer(dc, true)
if err != nil {
return nil, err
}

container.Env = append(container.Env, corev1.EnvVar{
Name: "SCHEMA_ONLY",
Value: "true",
})
container.Image = ReaperDefaultImage
container.ImagePullPolicy = corev1.PullIfNotPresent

jobName := fmt.Sprintf("%s-reaper-schema", dc.Spec.ClusterName)

return &v1.Job{
TypeMeta: metav1.TypeMeta{
Kind: "Job",
APIVersion: "batch/v1",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: dc.Namespace,
Name: jobName,
Labels: dc.GetDatacenterLabels(),
},
Spec: v1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
Containers: []corev1.Container{*container},
},
},
},
}, nil
}

func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) (*v1.Job, error) {
envVars := []corev1.EnvVar{
{
Name: "KEYSPACE",
Value: ReaperKeyspace,
},
{
Name: "CONTACT_POINTS",
Value: dc.GetSeedServiceName(),
},
{
Name: "REPLICATION",
Value: getReaperReplication(dc),
},
}

secretName := dc.GetReaperUserSecretNamespacedName()
envVars = append(envVars, corev1.EnvVar{
Name: "USERNAME",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: secretName.Name,
},
Key: "username",
},
},
})
envVars = append(envVars, corev1.EnvVar{
Name: "PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: secretName.Name,
},
Key: "password",
},
},
})

return &v1.Job{
TypeMeta: metav1.TypeMeta{
Kind: "Job",
Expand All @@ -678,26 +757,13 @@ func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) *v1.Job {
Name: getReaperSchemaInitJobName(dc),
Image: ReaperSchemaInitJobImage,
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: "KEYSPACE",
Value: ReaperKeyspace,
},
{
Name: "CONTACT_POINTS",
Value: dc.GetSeedServiceName(),
},
{
Name: "REPLICATION",
Value: getReaperReplication(dc),
},
},
Env: envVars,
},
},
},
},
},
}
}, nil
}

func addVolumes(dc *api.CassandraDatacenter, baseTemplate *corev1.PodTemplateSpec) []corev1.Volume {
Expand Down
32 changes: 32 additions & 0 deletions operator/pkg/reconciliation/constructor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func TestCassandraDatacenter_buildContainers_reaper_resources(t *testing.T) {
},
},
},
Status: api.CassandraDatacenterStatus{
ReaperStatus: api.ReaperStatus{
SchemaInitialized: true,
},
},
}

containers, err := buildContainers(dc, []corev1.VolumeMount{})
Expand All @@ -242,6 +247,11 @@ func TestCassandraDatacenter_buildContainers_reaper_resources_set_when_not_speci
Enabled: true,
},
},
Status: api.CassandraDatacenterStatus{
ReaperStatus: api.ReaperStatus{
SchemaInitialized: true,
},
},
}

containers, err := buildContainers(dc, []corev1.VolumeMount{})
Expand All @@ -254,6 +264,28 @@ func TestCassandraDatacenter_buildContainers_reaper_resources_set_when_not_speci
}
}

func TestCassandraDatacenter_buildContainers_no_reaper(t *testing.T) {
dc := &api.CassandraDatacenter{
Spec: api.CassandraDatacenterSpec{
ClusterName: "bob",
ServerType: "cassandra",
ServerVersion: "3.11.6",
Reaper: &api.ReaperConfig{
Enabled: false,
},
},
}

containers, err := buildContainers(dc, []corev1.VolumeMount{})
assert.NotNil(t, containers, "Unexpected containers containers received")
assert.Nil(t, err, "Unexpected error encountered")

assert.Len(t, containers, 2, "Unexpected number of containers containers returned")
for _, container := range containers {
assert.NotEqual(t, container.Name, ReaperContainerName)
}
}

func TestCassandraDatacenter_buildPodTemplateSpec_containers_merge(t *testing.T) {
testContainer := corev1.Container{}
testContainer.Name = "test-container"
Expand Down
2 changes: 1 addition & 1 deletion operator/pkg/reconciliation/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ var (
DefaultsConfigInitContainer = buildResourceRequirements(256, 256)

// Provides reasonable defaults for the reaper sidecar container.
DefaultsReaperContainer = buildResourceRequirements(2000, 512)
DefaultsReaperContainer = buildResourceRequirements(800, 512)
)
Loading