Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/core/v1alpha1/tidb_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ const (
TiDBGroupAvailableReason = "TiDBGroupAvailable"
)

// Annotation used to track the smooth upgrade (DDL pause) phase of a TiDBGroup.
const (
// AnnoKeySmoothUpgradePhase is the annotation key set on a TiDBGroup while a smooth upgrade
// (DDL pause) is in progress. The key is absent when no smooth upgrade is active.
AnnoKeySmoothUpgradePhase = "tidb.core.pingcap.com/smooth-upgrade-phase"
// AnnoValSmoothUpgradePhaseInProgress is the value stored under AnnoKeySmoothUpgradePhase
// during an active smooth upgrade.
AnnoValSmoothUpgradePhaseInProgress = "in-progress"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true

Expand Down
13 changes: 13 additions & 0 deletions pkg/compatibility/semver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ func (c *constraints) check(v *semver.Version) bool {
return c.Check(v)
}

// smoothUpgradeMinVersion is the version constraint (>= 7.5.0) a TiDB version must satisfy
// to support the smooth upgrade DDL pause. It is built once at package initialization.
var smoothUpgradeMinVersion = MustNewConstraints(">= 7.5.0")

// SupportsSmoothUpgrade reports whether version supports the smooth upgrade
// DDL pause/resume mechanism, i.e. whether it satisfies smoothUpgradeMinVersion
// (TiDB >= v7.5.0).
//
// A version string that cannot be parsed as a semantic version is reported as
// unsupported rather than returned as an error.
func SupportsSmoothUpgrade(version string) bool {
	if parsed, err := semver.NewVersion(version); err == nil {
		return Check(parsed, smoothUpgradeMinVersion)
	}
	// Unparseable versions are treated as not supporting smooth upgrade.
	return false
}

func MustNewConstraints(expr string) Constraints {
v, err := semver.NewConstraint(expr)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions pkg/compatibility/semver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,29 @@ import (
"github.com/stretchr/testify/assert"
)

func TestSupportsSmoothUpgrade(t *testing.T) {
cases := []struct {
version string
want bool
}{
{"v7.5.0", true},
{"v7.5.1", true},
{"v8.0.0", true},
{"v7.5.0-alpha", false}, // pre-release of 7.5.0 is before 7.5.0
{"v7.5.1-alpha", true}, // pre-release of 7.5.1 is after 7.5.0
{"v7.4.99", false},
{"v7.4.0", false},
{"v6.0.0", false},
{"invalid", false},
{"", false},
}
for _, c := range cases {
t.Run(c.version, func(tt *testing.T) {
assert.Equal(tt, c.want, SupportsSmoothUpgrade(c.version))
})
}
}

func TestCheck(t *testing.T) {
cases := []struct {
desc string
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/tidbgroup/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task
),

tasks.TaskService(state, r.Client),
tasks.TaskSmoothUpgradeStart(state, r.Client),
tasks.TaskUpdater(state, r.Client, r.AllocateFactory, r.AdoptManager),
common.TaskGroupStatusSelector[scope.TiDBGroup](state),
common.TaskGroupConditionSuspended[scope.TiDBGroup](state),
common.TaskGroupConditionReady[scope.TiDBGroup](state),
common.TaskGroupConditionSynced[scope.TiDBGroup](state),
common.TaskStatusRevisionAndReplicas[scope.TiDBGroup](state),
tasks.TaskStatusAvailable(state),
tasks.TaskSmoothUpgradeFinish(state, r.Client),
common.TaskStatusPersister[scope.TiDBGroup](state, r.Client),
)

Expand Down
179 changes: 179 additions & 0 deletions pkg/controllers/tidbgroup/tasks/upgrade.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tasks

import (
"context"
"encoding/json"
"fmt"
"time"

"k8s.io/apimachinery/pkg/types"

"github.com/pingcap/tidb-operator/api/v2/core/v1alpha1"
"github.com/pingcap/tidb-operator/v2/pkg/apicall"
coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1"
"github.com/pingcap/tidb-operator/v2/pkg/client"
"github.com/pingcap/tidb-operator/v2/pkg/compatibility"
"github.com/pingcap/tidb-operator/v2/pkg/runtime/scope"
tidbapi "github.com/pingcap/tidb-operator/v2/pkg/tidbapi/v1"
"github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3"
)

const (
// smoothUpgradeRequestTimeout is the timeout handed to the TiDB HTTP client
// used for the upgrade/start and upgrade/finish calls.
smoothUpgradeRequestTimeout = 10 * time.Second
// smoothUpgradeRetryInterval is the delay passed to task.Retry when a smooth
// upgrade step cannot proceed yet (no ready TiDB, client creation or API call failure).
smoothUpgradeRetryInterval = 10 * time.Second
)

// tidbClientFactory creates a TiDB HTTP client for the given TiDB instance of cluster ck.
// The smooth upgrade tasks take the factory as a parameter so that tests can inject a
// fake client without changing task semantics; production code passes newTiDBClientForGroup.
type tidbClientFactory func(ctx context.Context, c client.Client, ck *v1alpha1.Cluster, tidb *v1alpha1.TiDB) (tidbapi.TiDBClient, error)

// TaskSmoothUpgradeStart returns the task that calls /upgrade/start on a ready
// TiDB instance before the rolling upgrade begins.
//
// The task does nothing when the pending change is not a version upgrade, or
// when the source or the target version does not support smooth upgrade
// (< v7.5.0). It uses newTiDBClientForGroup to reach the TiDB instance.
func TaskSmoothUpgradeStart(state *ReconcileContext, c client.Client) task.Task {
	var factory tidbClientFactory = newTiDBClientForGroup
	return taskSmoothUpgradeStart(state, c, factory)
}

// taskSmoothUpgradeStart builds the "SmoothUpgradeStart" task. factory creates the
// TiDB HTTP client and exists so tests can inject a fake.
//
// The task result is:
//   - Complete (no-op) when needVersionUpgrade reports false, when the current status
//     version or the target spec version does not support smooth upgrade, or when the
//     in-progress annotation is already present on the group;
//   - Retry after smoothUpgradeRetryInterval when no ready TiDB instance exists, the
//     client cannot be created, or the upgrade/start call fails;
//   - Fail when upgrade/start succeeded but the in-progress annotation could not be set;
//   - Complete once upgrade/start succeeded and the annotation was set.
func taskSmoothUpgradeStart(state *ReconcileContext, c client.Client, factory tidbClientFactory) task.Task {
	return task.NameTaskFunc("SmoothUpgradeStart", func(ctx context.Context) task.Result {
		dbg := state.TiDBGroup()

		if !needVersionUpgrade(dbg) {
			return task.Complete().With("not a version upgrade, skipping smooth upgrade start")
		}
		// Both the running version and the target version must support the mechanism.
		if !compatibility.SupportsSmoothUpgrade(dbg.Status.Version) ||
			!compatibility.SupportsSmoothUpgrade(dbg.Spec.Template.Spec.Version) {
			return task.Complete().With("version does not support smooth upgrade, skipping")
		}
		// The annotation records that upgrade/start already succeeded, so later
		// reconciles of the same upgrade do not call it again.
		if dbg.Annotations[v1alpha1.AnnoKeySmoothUpgradePhase] == v1alpha1.AnnoValSmoothUpgradePhaseInProgress {
			return task.Complete().With("smooth upgrade already started")
		}

		tidb := pickReadyTiDB(state.TiDBSlice())
		if tidb == nil {
			return task.Retry(smoothUpgradeRetryInterval).With("no ready TiDB instance available for upgrade/start")
		}

		tidbClient, err := factory(ctx, c, state.Cluster(), tidb)
		if err != nil {
			return task.Retry(smoothUpgradeRetryInterval).With("cannot create TiDB client for upgrade/start: %v", err)
		}

		if err := tidbClient.UpgradeStart(ctx, dbg.Spec.Template.Spec.Keyspace); err != nil {
			return task.Retry(smoothUpgradeRetryInterval).With("upgrade/start failed, will retry: %v", err)
		}

		phase := v1alpha1.AnnoValSmoothUpgradePhaseInProgress
		if err := patchSmoothUpgradeAnnotation(ctx, c, dbg, &phase); err != nil {
			// Format with %v like the branches above: %w is only defined for fmt.Errorf
			// and the result here is a task message, not a wrapped error.
			return task.Fail().With("failed to set smooth upgrade annotation: %v", err)
		}

		return task.Complete().With("smooth upgrade started, DDL paused")
	})
}

// TaskSmoothUpgradeFinish returns the task that calls /upgrade/finish on a ready
// TiDB instance once all pods have been upgraded.
//
// It must be ordered after TaskStatusRevisionAndReplicas so that
// dbg.Status.Version already reflects the new version; needVersionUpgrade()
// returning false is then the "all done" signal. It uses newTiDBClientForGroup
// to reach the TiDB instance.
func TaskSmoothUpgradeFinish(state *ReconcileContext, c client.Client) task.Task {
	var factory tidbClientFactory = newTiDBClientForGroup
	return taskSmoothUpgradeFinish(state, c, factory)
}

// taskSmoothUpgradeFinish builds the "SmoothUpgradeFinish" task. factory creates the
// TiDB HTTP client and exists so tests can inject a fake.
//
// The task result is:
//   - Complete (no-op) when the in-progress annotation is not set on the group, or when
//     needVersionUpgrade still reports true (the rolling upgrade has not finished);
//   - Retry after smoothUpgradeRetryInterval when no ready TiDB instance exists, the
//     client cannot be created, or the upgrade/finish call fails;
//   - Fail when upgrade/finish succeeded but the annotation could not be removed;
//   - Complete once upgrade/finish succeeded and the annotation was removed.
func taskSmoothUpgradeFinish(state *ReconcileContext, c client.Client, factory tidbClientFactory) task.Task {
	return task.NameTaskFunc("SmoothUpgradeFinish", func(ctx context.Context) task.Result {
		dbg := state.TiDBGroup()

		// Only a group that previously went through upgrade/start carries the annotation.
		if dbg.Annotations[v1alpha1.AnnoKeySmoothUpgradePhase] != v1alpha1.AnnoValSmoothUpgradePhaseInProgress {
			return task.Complete().With("no smooth upgrade in progress")
		}
		if needVersionUpgrade(dbg) {
			return task.Complete().With("upgrade still in progress, finish not yet")
		}

		tidb := pickReadyTiDB(state.TiDBSlice())
		if tidb == nil {
			return task.Retry(smoothUpgradeRetryInterval).With("no ready TiDB instance available for upgrade/finish")
		}

		tidbClient, err := factory(ctx, c, state.Cluster(), tidb)
		if err != nil {
			return task.Retry(smoothUpgradeRetryInterval).With("cannot create TiDB client for upgrade/finish: %v", err)
		}

		if err := tidbClient.UpgradeFinish(ctx); err != nil {
			return task.Retry(smoothUpgradeRetryInterval).With("upgrade/finish failed, will retry: %v", err)
		}

		// A nil value removes the annotation from the group.
		if err := patchSmoothUpgradeAnnotation(ctx, c, dbg, nil); err != nil {
			// Format with %v like the branches above: %w is only defined for fmt.Errorf
			// and the result here is a task message, not a wrapped error.
			return task.Fail().With("failed to remove smooth upgrade annotation: %v", err)
		}

		return task.Complete().With("smooth upgrade finished, DDL resumed")
	})
}

// pickReadyTiDB scans dbs in order and returns the first instance that
// coreutil.IsReady reports as ready. It returns nil when dbs is empty or no
// instance is ready.
func pickReadyTiDB(dbs []*v1alpha1.TiDB) *v1alpha1.TiDB {
	for i := range dbs {
		if candidate := dbs[i]; coreutil.IsReady[scope.TiDB](candidate) {
			return candidate
		}
	}
	return nil
}

// newTiDBClientForGroup builds a TiDB HTTP client that targets the advertised
// status-port URL of the given TiDB instance, using smoothUpgradeRequestTimeout
// as the request timeout.
//
// When cluster TLS is enabled on ck, the client TLS config is loaded through
// apicall.GetClientTLSConfig and a failure to load it is returned as an error;
// otherwise the client is created without a TLS config.
func newTiDBClientForGroup(ctx context.Context, c client.Client, ck *v1alpha1.Cluster, tidb *v1alpha1.TiDB) (tidbapi.TiDBClient, error) {
	statusURL := coreutil.InstanceAdvertiseURL[scope.TiDB](ck, tidb, coreutil.TiDBStatusPort(tidb))

	if coreutil.IsTLSClusterEnabled(ck) {
		tlsCfg, err := apicall.GetClientTLSConfig(ctx, c, ck)
		if err != nil {
			return nil, fmt.Errorf("cannot get TLS config: %w", err)
		}
		return tidbapi.NewTiDBClient(statusURL, smoothUpgradeRequestTimeout, tlsCfg), nil
	}

	// Plaintext cluster: no TLS config is needed.
	return tidbapi.NewTiDBClient(statusURL, smoothUpgradeRequestTimeout, nil), nil
}

// annotationPatch is the JSON body of the merge patch sent by patchSmoothUpgradeAnnotation.
// It only carries object metadata.
type annotationPatch struct {
Metadata annotationPatchMetadata `json:"metadata"`
}

// annotationPatchMetadata is the metadata portion of annotationPatch.
type annotationPatchMetadata struct {
// ResourceVersion is the resource version of the object the patch was computed from.
// NOTE(review): presumably included so the API server rejects the patch when the
// object changed in the meantime — confirm against the Kubernetes API conventions.
ResourceVersion string `json:"resourceVersion"`
// Annotations maps annotation keys to their new values. A nil pointer is marshaled
// as JSON null, which in a JSON merge patch removes the key.
Annotations map[string]*string `json:"annotations"`
}

// patchSmoothUpgradeAnnotation updates the smooth upgrade annotation on dbg with
// a JSON merge patch: a non-nil value sets the annotation to *value, a nil value
// deletes it. The patch also carries dbg's current resource version.
//
// It returns an error when the patch body cannot be marshaled or when the
// Patch call fails.
func patchSmoothUpgradeAnnotation(ctx context.Context, c client.Client, dbg *v1alpha1.TiDBGroup, value *string) error {
	var body annotationPatch
	body.Metadata.ResourceVersion = dbg.GetResourceVersion()
	body.Metadata.Annotations = map[string]*string{
		v1alpha1.AnnoKeySmoothUpgradePhase: value,
	}

	raw, err := json.Marshal(&body)
	if err != nil {
		return fmt.Errorf("invalid patch: %w", err)
	}

	err = c.Patch(ctx, dbg, client.RawPatch(types.MergePatchType, raw))
	if err != nil {
		return fmt.Errorf("cannot patch smooth upgrade annotation on %s/%s: %w", dbg.Namespace, dbg.Name, err)
	}
	return nil
}
Loading
Loading