From dd778a7171bd284a5ac5180b4c89d50f5e582bc3 Mon Sep 17 00:00:00 2001 From: tennix Date: Tue, 12 May 2026 16:12:34 -0700 Subject: [PATCH] feat: pause DDL during TiDB version upgrade (smooth upgrade) Introduce /upgrade/start and /upgrade/finish lifecycle hooks in the TiDBGroup reconciliation loop, mirroring TiUP's smooth upgrade behavior. - Before rolling upgrade begins, call POST /upgrade/start on a healthy TiDB instance to pause DDL (global for Dedicated, keyspace-scoped for Premium / TiDB Worker). - After all instances reach the new version, call POST /upgrade/finish to resume DDL. - Guards: only fires when spec.version changes AND both source and target versions support smooth upgrade (>= v7.5.0); no-op for scale/config changes. - Annotation tidb.core.pingcap.com/smooth-upgrade-phase tracks in-flight state across operator restarts. Co-Authored-By: Claude Sonnet 4.6 --- api/core/v1alpha1/tidb_types.go | 7 + pkg/compatibility/semver.go | 13 + pkg/compatibility/semver_test.go | 23 ++ pkg/controllers/tidbgroup/builder.go | 2 + pkg/controllers/tidbgroup/tasks/upgrade.go | 179 +++++++++ .../tidbgroup/tasks/upgrade_test.go | 358 ++++++++++++++++++ pkg/tidbapi/v1/client.go | 24 ++ pkg/tidbapi/v1/client_test.go | 90 +++++ pkg/tidbapi/v1/types.go | 6 + 9 files changed, 702 insertions(+) create mode 100644 pkg/controllers/tidbgroup/tasks/upgrade.go create mode 100644 pkg/controllers/tidbgroup/tasks/upgrade_test.go diff --git a/api/core/v1alpha1/tidb_types.go b/api/core/v1alpha1/tidb_types.go index dde09be606b..6bc17a71849 100644 --- a/api/core/v1alpha1/tidb_types.go +++ b/api/core/v1alpha1/tidb_types.go @@ -51,6 +51,13 @@ const ( TiDBGroupAvailableReason = "TiDBGroupAvailable" ) +const ( + // AnnoKeySmoothUpgradePhase is set on a TiDBGroup while a smooth upgrade (DDL pause) is in progress. 
+ AnnoKeySmoothUpgradePhase = "tidb.core.pingcap.com/smooth-upgrade-phase" + // AnnoValSmoothUpgradePhaseInProgress is the value of AnnoKeySmoothUpgradePhase during an active smooth upgrade. + AnnoValSmoothUpgradePhaseInProgress = "in-progress" +) + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:object:root=true diff --git a/pkg/compatibility/semver.go b/pkg/compatibility/semver.go index 1208645806c..97b6935e10c 100644 --- a/pkg/compatibility/semver.go +++ b/pkg/compatibility/semver.go @@ -42,6 +42,19 @@ func (c *constraints) check(v *semver.Version) bool { return c.Check(v) } +// smoothUpgradeMinVersion is the minimum version that supports smooth upgrade DDL pause. +var smoothUpgradeMinVersion = MustNewConstraints(">= 7.5.0") + +// SupportsSmoothUpgrade returns true if the given version string supports the smooth upgrade +// DDL pause/resume mechanism (requires TiDB >= v7.5.0). +func SupportsSmoothUpgrade(version string) bool { + v, err := semver.NewVersion(version) + if err != nil { + return false + } + return Check(v, smoothUpgradeMinVersion) +} + func MustNewConstraints(expr string) Constraints { v, err := semver.NewConstraint(expr) if err != nil { diff --git a/pkg/compatibility/semver_test.go b/pkg/compatibility/semver_test.go index 8984c039a9e..feb2fa11c33 100644 --- a/pkg/compatibility/semver_test.go +++ b/pkg/compatibility/semver_test.go @@ -21,6 +21,29 @@ import ( "github.com/stretchr/testify/assert" ) +func TestSupportsSmoothUpgrade(t *testing.T) { + cases := []struct { + version string + want bool + }{ + {"v7.5.0", true}, + {"v7.5.1", true}, + {"v8.0.0", true}, + {"v7.5.0-alpha", false}, // pre-release of 7.5.0 is before 7.5.0 + {"v7.5.1-alpha", true}, // pre-release of 7.5.1 is after 7.5.0 + {"v7.4.99", false}, + {"v7.4.0", false}, + {"v6.0.0", false}, + {"invalid", false}, + {"", false}, + } + for _, c := range cases { + t.Run(c.version, func(tt *testing.T) { + assert.Equal(tt, c.want, 
SupportsSmoothUpgrade(c.version)) + }) + } +} + func TestCheck(t *testing.T) { cases := []struct { desc string diff --git a/pkg/controllers/tidbgroup/builder.go b/pkg/controllers/tidbgroup/builder.go index e193ab0eb95..a769d93698a 100644 --- a/pkg/controllers/tidbgroup/builder.go +++ b/pkg/controllers/tidbgroup/builder.go @@ -58,6 +58,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task ), tasks.TaskService(state, r.Client), + tasks.TaskSmoothUpgradeStart(state, r.Client), tasks.TaskUpdater(state, r.Client, r.AllocateFactory, r.AdoptManager), common.TaskGroupStatusSelector[scope.TiDBGroup](state), common.TaskGroupConditionSuspended[scope.TiDBGroup](state), @@ -65,6 +66,7 @@ func (r *Reconciler) NewRunner(state *tasks.ReconcileContext, reporter task.Task common.TaskGroupConditionSynced[scope.TiDBGroup](state), common.TaskStatusRevisionAndReplicas[scope.TiDBGroup](state), tasks.TaskStatusAvailable(state), + tasks.TaskSmoothUpgradeFinish(state, r.Client), common.TaskStatusPersister[scope.TiDBGroup](state, r.Client), ) diff --git a/pkg/controllers/tidbgroup/tasks/upgrade.go b/pkg/controllers/tidbgroup/tasks/upgrade.go new file mode 100644 index 00000000000..8a688d13860 --- /dev/null +++ b/pkg/controllers/tidbgroup/tasks/upgrade.go @@ -0,0 +1,179 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package tasks + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/types" + + "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" + "github.com/pingcap/tidb-operator/v2/pkg/apicall" + coreutil "github.com/pingcap/tidb-operator/v2/pkg/apiutil/core/v1alpha1" + "github.com/pingcap/tidb-operator/v2/pkg/client" + "github.com/pingcap/tidb-operator/v2/pkg/compatibility" + "github.com/pingcap/tidb-operator/v2/pkg/runtime/scope" + tidbapi "github.com/pingcap/tidb-operator/v2/pkg/tidbapi/v1" + "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" +) + +const ( + smoothUpgradeRequestTimeout = 10 * time.Second + smoothUpgradeRetryInterval = 10 * time.Second +) + +// tidbClientFactory creates a TiDB HTTP client for the given instance. +// Accepting this as a parameter enables test injection without changing task semantics. +type tidbClientFactory func(ctx context.Context, c client.Client, ck *v1alpha1.Cluster, tidb *v1alpha1.TiDB) (tidbapi.TiDBClient, error) + +// TaskSmoothUpgradeStart calls /upgrade/start on a healthy TiDB instance before rolling upgrade begins. +// It is a no-op when the change is not a version upgrade, or when either the source or target version +// does not support smooth upgrade (< v7.5.0). 
+func TaskSmoothUpgradeStart(state *ReconcileContext, c client.Client) task.Task {
+	return taskSmoothUpgradeStart(state, c, newTiDBClientForGroup) // production wiring: real TiDB HTTP client factory
+}
+
+func taskSmoothUpgradeStart(state *ReconcileContext, c client.Client, factory tidbClientFactory) task.Task {
+	return task.NameTaskFunc("SmoothUpgradeStart", func(ctx context.Context) task.Result {
+		dbg := state.TiDBGroup()
+
+		if !needVersionUpgrade(dbg) {
+			return task.Complete().With("not a version upgrade, skipping smooth upgrade start")
+		}
+		if !compatibility.SupportsSmoothUpgrade(dbg.Status.Version) ||
+			!compatibility.SupportsSmoothUpgrade(dbg.Spec.Template.Spec.Version) {
+			return task.Complete().With("version does not support smooth upgrade, skipping")
+		}
+		if dbg.Annotations[v1alpha1.AnnoKeySmoothUpgradePhase] == v1alpha1.AnnoValSmoothUpgradePhaseInProgress { // set below once upgrade/start succeeded
+			return task.Complete().With("smooth upgrade already started")
+		}
+
+		tidb := pickReadyTiDB(state.TiDBSlice())
+		if tidb == nil {
+			return task.Retry(smoothUpgradeRetryInterval).With("no ready TiDB instance available for upgrade/start")
+		}
+
+		tidbClient, err := factory(ctx, c, state.Cluster(), tidb)
+		if err != nil {
+			return task.Retry(smoothUpgradeRetryInterval).With("cannot create TiDB client for upgrade/start: %v", err)
+		}
+
+		if err := tidbClient.UpgradeStart(ctx, dbg.Spec.Template.Spec.Keyspace); err != nil {
+			return task.Retry(smoothUpgradeRetryInterval).With("upgrade/start failed, will retry: %v", err)
+		}
+
+		phase := v1alpha1.AnnoValSmoothUpgradePhaseInProgress // local copy so its address can be passed to the patch helper
+		if err := patchSmoothUpgradeAnnotation(ctx, c, dbg, &phase); err != nil {
+			return task.Fail().With("failed to set smooth upgrade annotation: %v", err) // NOTE(review): %v as in the Retry messages above; %w is only defined for fmt.Errorf
+		}
+
+		return task.Complete().With("smooth upgrade started, DDL paused")
+	})
+}
+
+// TaskSmoothUpgradeFinish calls /upgrade/finish on a healthy TiDB instance after all pods are upgraded.
+// It must run after TaskStatusRevisionAndReplicas so that dbg.Status.Version reflects the new version,
+// making needVersionUpgrade() return false as the "all done" signal.
+func TaskSmoothUpgradeFinish(state *ReconcileContext, c client.Client) task.Task {
+	return taskSmoothUpgradeFinish(state, c, newTiDBClientForGroup) // production wiring: real TiDB HTTP client factory
+}
+
+func taskSmoothUpgradeFinish(state *ReconcileContext, c client.Client, factory tidbClientFactory) task.Task {
+	return task.NameTaskFunc("SmoothUpgradeFinish", func(ctx context.Context) task.Result {
+		dbg := state.TiDBGroup()
+
+		if dbg.Annotations[v1alpha1.AnnoKeySmoothUpgradePhase] != v1alpha1.AnnoValSmoothUpgradePhaseInProgress { // nothing to resume without the in-progress marker
+			return task.Complete().With("no smooth upgrade in progress")
+		}
+		if needVersionUpgrade(dbg) {
+			return task.Complete().With("upgrade still in progress, finish not yet")
+		}
+
+		tidb := pickReadyTiDB(state.TiDBSlice())
+		if tidb == nil {
+			return task.Retry(smoothUpgradeRetryInterval).With("no ready TiDB instance available for upgrade/finish")
+		}
+
+		tidbClient, err := factory(ctx, c, state.Cluster(), tidb)
+		if err != nil {
+			return task.Retry(smoothUpgradeRetryInterval).With("cannot create TiDB client for upgrade/finish: %v", err)
+		}
+
+		if err := tidbClient.UpgradeFinish(ctx); err != nil {
+			return task.Retry(smoothUpgradeRetryInterval).With("upgrade/finish failed, will retry: %v", err)
+		}
+
+		if err := patchSmoothUpgradeAnnotation(ctx, c, dbg, nil); err != nil { // nil value asks the helper to delete the annotation
+			return task.Fail().With("failed to remove smooth upgrade annotation: %v", err) // NOTE(review): %v as in the Retry messages above; %w is only defined for fmt.Errorf
+		}
+
+		return task.Complete().With("smooth upgrade finished, DDL resumed")
+	})
+}
+
+// pickReadyTiDB returns the first TiDB instance that is in the Ready state.
+func pickReadyTiDB(dbs []*v1alpha1.TiDB) *v1alpha1.TiDB {
+	for _, db := range dbs {
+		if coreutil.IsReady[scope.TiDB](db) {
+			return db
+		}
+	}
+	return nil // empty slice or no instance reports Ready
+}
+
+// newTiDBClientForGroup creates a TiDB HTTP client targeting the given TiDB instance.
+func newTiDBClientForGroup(ctx context.Context, c client.Client, ck *v1alpha1.Cluster, tidb *v1alpha1.TiDB) (tidbapi.TiDBClient, error) {
+	statusURL := coreutil.InstanceAdvertiseURL[scope.TiDB](ck, tidb, coreutil.TiDBStatusPort(tidb))
+	if coreutil.IsTLSClusterEnabled(ck) {
+		tlsCfg, err := apicall.GetClientTLSConfig(ctx, c, ck)
+		if err != nil {
+			return nil, fmt.Errorf("cannot get TLS config: %w", err)
+		}
+		return tidbapi.NewTiDBClient(statusURL, smoothUpgradeRequestTimeout, tlsCfg), nil
+	}
+	return tidbapi.NewTiDBClient(statusURL, smoothUpgradeRequestTimeout, nil), nil
+}
+
+type annotationPatch struct {
+	Metadata annotationPatchMetadata `json:"metadata"`
+}
+
+type annotationPatchMetadata struct {
+	ResourceVersion string             `json:"resourceVersion"`
+	Annotations     map[string]*string `json:"annotations"`
+}
+
+// patchSmoothUpgradeAnnotation writes the smooth upgrade annotation when value is non-nil
+// and deletes it when value is nil.
+func patchSmoothUpgradeAnnotation(ctx context.Context, c client.Client, dbg *v1alpha1.TiDBGroup, value *string) error {
+	// The group's current resourceVersion is sent along with the annotation change.
+	body := annotationPatch{Metadata: annotationPatchMetadata{
+		ResourceVersion: dbg.GetResourceVersion(),
+		Annotations:     map[string]*string{v1alpha1.AnnoKeySmoothUpgradePhase: value},
+	}}
+	raw, err := json.Marshal(&body)
+	if err != nil {
+		return fmt.Errorf("invalid patch: %w", err)
+	}
+	// A nil value is encoded as JSON null inside the merge patch.
+	mergePatch := client.RawPatch(types.MergePatchType, raw)
+	if err := c.Patch(ctx, dbg, mergePatch); err != nil {
+		return fmt.Errorf("cannot patch smooth upgrade annotation on %s/%s: %w", dbg.Namespace, dbg.Name, err)
+	}
+	return nil
+}
diff --git a/pkg/controllers/tidbgroup/tasks/upgrade_test.go b/pkg/controllers/tidbgroup/tasks/upgrade_test.go
new file mode 100644
index 00000000000..e8714dda292
--- /dev/null
+++ b/pkg/controllers/tidbgroup/tasks/upgrade_test.go
@@ -0,0 +1,358 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tasks + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/pingcap/tidb-operator/api/v2/core/v1alpha1" + "github.com/pingcap/tidb-operator/v2/pkg/client" + tidbapi "github.com/pingcap/tidb-operator/v2/pkg/tidbapi/v1" + "github.com/pingcap/tidb-operator/v2/pkg/utils/fake" + "github.com/pingcap/tidb-operator/v2/pkg/utils/task/v3" +) + +// fakeTiDBClient is a minimal tidbapi.TiDBClient for upgrade task tests. +type fakeTiDBClient struct { + upgradeStartCalled bool + upgradeStartErr error + upgradeStartKS string + + upgradeFinishCalled bool + upgradeFinishErr error +} + +func (f *fakeTiDBClient) GetHealth(_ context.Context) (bool, error) { return true, nil } +func (f *fakeTiDBClient) GetInfo(_ context.Context) (*tidbapi.ServerInfo, error) { + return &tidbapi.ServerInfo{}, nil +} +func (f *fakeTiDBClient) SetServerLabels(_ context.Context, _ map[string]string) error { return nil } +func (f *fakeTiDBClient) GetPoolStatus(_ context.Context) (*tidbapi.PoolStatus, error) { + return &tidbapi.PoolStatus{State: tidbapi.PoolStateActivated}, nil +} +func (f *fakeTiDBClient) Activate(_ context.Context, _ string) error { return nil } + +func (f *fakeTiDBClient) UpgradeStart(_ context.Context, keyspace string) error { + f.upgradeStartCalled = true + f.upgradeStartKS = keyspace + return f.upgradeStartErr +} + +func (f *fakeTiDBClient) UpgradeFinish(_ context.Context) error { + f.upgradeFinishCalled = true + return 
f.upgradeFinishErr +} + +var _ tidbapi.TiDBClient = (*fakeTiDBClient)(nil) + +// fakeReadyTiDB creates a TiDB instance with the Ready condition set to true. +func fakeReadyTiDB() *v1alpha1.TiDB { + return fake.FakeObj("tidb-0", func(obj *v1alpha1.TiDB) *v1alpha1.TiDB { + obj.Status.Conditions = append(obj.Status.Conditions, metav1.Condition{ + Type: v1alpha1.CondReady, + Status: metav1.ConditionTrue, + LastTransitionTime: metav1.Unix(0, 0), + }) + return obj + }) +} + +func TestTaskSmoothUpgradeStart(t *testing.T) { + cases := []struct { + desc string + dbg *v1alpha1.TiDBGroup + dbs []*v1alpha1.TiDB + apiErr error + expectedStatus task.Status + wantAPICalled bool + wantKeyspace string + wantAnnotation bool + }{ + { + desc: "scale only — no version upgrade", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v8.0.0" + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: false, + }, + { + desc: "source version < v7.5, skip", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v7.5.0" + o.Status.Version = "v7.4.0" + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: false, + }, + { + desc: "target version < v7.5, skip", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v7.4.0" + o.Status.Version = "v7.5.0" + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: false, + }, + { + desc: "annotation already set — idempotent", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v7.5.0" + o.Annotations = map[string]string{ + v1alpha1.AnnoKeySmoothUpgradePhase: v1alpha1.AnnoValSmoothUpgradePhaseInProgress, + } + 
return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: false, + }, + { + desc: "no ready instance — retry", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v7.5.0" + return o + }), + dbs: []*v1alpha1.TiDB{}, + expectedStatus: task.SRetry, + wantAPICalled: false, + }, + { + desc: "API error — retry", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v7.5.0" + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + apiErr: fmt.Errorf("connection refused"), + expectedStatus: task.SRetry, + wantAPICalled: true, + }, + { + desc: "Dedicated (empty keyspace) — success", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v7.5.0" + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: true, + wantKeyspace: "", + wantAnnotation: true, + }, + { + desc: "Premium tenant keyspace — success", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v7.5.0" + o.Spec.Template.Spec.Keyspace = "tenant1" + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: true, + wantKeyspace: "tenant1", + wantAnnotation: true, + }, + { + desc: "Premium system keyspace — success", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v7.5.0" + o.Spec.Template.Spec.Keyspace = "system" + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: true, + wantKeyspace: "system", + wantAnnotation: true, + }, + } + + for i := range cases { + c := &cases[i] + 
t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fakeClient := &fakeTiDBClient{upgradeStartErr: c.apiErr} + + factory := func(_ context.Context, _ client.Client, _ *v1alpha1.Cluster, _ *v1alpha1.TiDB) (tidbapi.TiDBClient, error) { + return fakeClient, nil + } + + ctx := context.Background() + fc := client.NewFakeClient(c.dbg, fake.FakeObj[v1alpha1.Cluster]("cluster")) + + rc := &ReconcileContext{ + State: &state{ + dbg: c.dbg, + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + dbs: c.dbs, + }, + } + + res, done := task.RunTask(ctx, taskSmoothUpgradeStart(rc, fc, factory)) + assert.Equal(tt, c.expectedStatus.String(), res.Status().String(), c.desc) + assert.False(tt, done, c.desc) + assert.Equal(tt, c.wantAPICalled, fakeClient.upgradeStartCalled, c.desc) + if c.wantAPICalled && c.apiErr == nil { + assert.Equal(tt, c.wantKeyspace, fakeClient.upgradeStartKS, c.desc) + } + + if c.wantAnnotation { + updated := &v1alpha1.TiDBGroup{} + require.NoError(tt, fc.Get(ctx, client.ObjectKeyFromObject(c.dbg), updated)) + assert.Equal(tt, v1alpha1.AnnoValSmoothUpgradePhaseInProgress, + updated.Annotations[v1alpha1.AnnoKeySmoothUpgradePhase], c.desc) + } + }) + } +} + +func TestTaskSmoothUpgradeFinish(t *testing.T) { + cases := []struct { + desc string + dbg *v1alpha1.TiDBGroup + dbs []*v1alpha1.TiDB + apiErr error + expectedStatus task.Status + wantAPICalled bool + wantAnnotationGone bool + }{ + { + desc: "no annotation — no-op", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v8.0.0" + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: false, + }, + { + desc: "annotation set but upgrade still in progress", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v7.5.0" + o.Annotations = map[string]string{ + 
v1alpha1.AnnoKeySmoothUpgradePhase: v1alpha1.AnnoValSmoothUpgradePhaseInProgress, + } + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: false, + }, + { + desc: "no ready instance — retry", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v8.0.0" + o.Annotations = map[string]string{ + v1alpha1.AnnoKeySmoothUpgradePhase: v1alpha1.AnnoValSmoothUpgradePhaseInProgress, + } + return o + }), + dbs: []*v1alpha1.TiDB{}, + expectedStatus: task.SRetry, + wantAPICalled: false, + }, + { + desc: "API error — retry", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v8.0.0" + o.Annotations = map[string]string{ + v1alpha1.AnnoKeySmoothUpgradePhase: v1alpha1.AnnoValSmoothUpgradePhaseInProgress, + } + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + apiErr: fmt.Errorf("connection refused"), + expectedStatus: task.SRetry, + wantAPICalled: true, + }, + { + desc: "success — annotation removed", + dbg: fake.FakeObj("dbg", func(o *v1alpha1.TiDBGroup) *v1alpha1.TiDBGroup { + o.Spec.Template.Spec.Version = "v8.0.0" + o.Status.Version = "v8.0.0" + o.Annotations = map[string]string{ + v1alpha1.AnnoKeySmoothUpgradePhase: v1alpha1.AnnoValSmoothUpgradePhaseInProgress, + } + return o + }), + dbs: []*v1alpha1.TiDB{fakeReadyTiDB()}, + expectedStatus: task.SComplete, + wantAPICalled: true, + wantAnnotationGone: true, + }, + } + + for i := range cases { + c := &cases[i] + t.Run(c.desc, func(tt *testing.T) { + tt.Parallel() + + fakeClient := &fakeTiDBClient{upgradeFinishErr: c.apiErr} + + factory := func(_ context.Context, _ client.Client, _ *v1alpha1.Cluster, _ *v1alpha1.TiDB) (tidbapi.TiDBClient, error) { + return fakeClient, nil + } + + ctx := context.Background() + fc := client.NewFakeClient(c.dbg, fake.FakeObj[v1alpha1.Cluster]("cluster")) + + rc 
:= &ReconcileContext{ + State: &state{ + dbg: c.dbg, + cluster: fake.FakeObj[v1alpha1.Cluster]("cluster"), + dbs: c.dbs, + }, + } + + res, done := task.RunTask(ctx, taskSmoothUpgradeFinish(rc, fc, factory)) + assert.Equal(tt, c.expectedStatus.String(), res.Status().String(), c.desc) + assert.False(tt, done, c.desc) + assert.Equal(tt, c.wantAPICalled, fakeClient.upgradeFinishCalled, c.desc) + + if c.wantAnnotationGone { + updated := &v1alpha1.TiDBGroup{} + require.NoError(tt, fc.Get(ctx, client.ObjectKeyFromObject(c.dbg), updated)) + _, hasAnno := updated.Annotations[v1alpha1.AnnoKeySmoothUpgradePhase] + assert.False(tt, hasAnno, c.desc) + } + }) + } +} diff --git a/pkg/tidbapi/v1/client.go b/pkg/tidbapi/v1/client.go index afd6f7d99ef..d24efff5d44 100644 --- a/pkg/tidbapi/v1/client.go +++ b/pkg/tidbapi/v1/client.go @@ -33,6 +33,8 @@ const ( labelsPath = "labels" tidbPoolActivatePath = "tidb-pool/activate" tidbPoolStatusPath = "tidb-pool/status" + upgradeStartPath = "upgrade/start" + upgradeFinishPath = "upgrade/finish" ) // TiDBClient provides TiDB server's APIs used by TiDB Operator. @@ -48,6 +50,12 @@ type TiDBClient interface { GetPoolStatus(ctx context.Context) (*PoolStatus, error) // Activate sets the keyspace of a standby TiDB instance. Activate(ctx context.Context, keyspace string) error + + // UpgradeStart notifies TiDB to pause DDL before a rolling upgrade. + // keyspace is empty for Dedicated (global pause) or a keyspace name for Premium (scoped pause). + UpgradeStart(ctx context.Context, keyspace string) error + // UpgradeFinish notifies TiDB to resume DDL after all instances are upgraded. + UpgradeFinish(ctx context.Context) error } // tidbClient is the default implementation of TiDBClient. 
@@ -132,6 +140,22 @@ func (c *tidbClient) Activate(ctx context.Context, keyspace string) error { return err } +func (c *tidbClient) UpgradeStart(ctx context.Context, keyspace string) error { + buffer := bytes.NewBuffer(nil) + if err := json.NewEncoder(buffer).Encode(&UpgradeRequest{KeyspaceName: keyspace}); err != nil { + return fmt.Errorf("encode upgrade request to json failed: %w", err) + } + apiURL := fmt.Sprintf("%s/%s", c.url, upgradeStartPath) + _, err := httputil.PostBodyOK(ctx, c.httpClient, apiURL, buffer) + return err +} + +func (c *tidbClient) UpgradeFinish(ctx context.Context) error { + apiURL := fmt.Sprintf("%s/%s", c.url, upgradeFinishPath) + _, err := httputil.PostBodyOK(ctx, c.httpClient, apiURL, bytes.NewBuffer(nil)) + return err +} + func (c *tidbClient) GetPoolStatus(ctx context.Context) (*PoolStatus, error) { apiURL := fmt.Sprintf("%s/%s", c.url, tidbPoolStatusPath) body, err := httputil.GetBodyOK(ctx, c.httpClient, apiURL) diff --git a/pkg/tidbapi/v1/client_test.go b/pkg/tidbapi/v1/client_test.go index caf46fd80a6..53ccbf9c2fd 100644 --- a/pkg/tidbapi/v1/client_test.go +++ b/pkg/tidbapi/v1/client_test.go @@ -158,3 +158,93 @@ func TestTiDBClient_GetPoolStatus(t *testing.T) { }) } } + +func TestTiDBClient_UpgradeStart(t *testing.T) { + cases := []struct { + desc string + keyspace string + wantBody string + respCode int + wantErr bool + }{ + { + desc: "dedicated (empty keyspace)", + keyspace: "", + wantBody: `{}`, + respCode: http.StatusOK, + }, + { + desc: "premium tenant keyspace", + keyspace: "tenant1", + wantBody: `{"keyspace_name":"tenant1"}`, + respCode: http.StatusOK, + }, + { + desc: "server error", + keyspace: "", + respCode: http.StatusInternalServerError, + wantErr: true, + }, + } + for i := range cases { + c := cases[i] + t.Run(c.desc, func(tt *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(tt, "/upgrade/start", r.URL.Path) + assert.Equal(tt, http.MethodPost, 
r.Method) + if c.wantBody != "" { + body, err := io.ReadAll(r.Body) + assert.NoError(tt, err) + assert.JSONEq(tt, c.wantBody, string(body)) + } + w.WriteHeader(c.respCode) + })) + defer server.Close() + + client := NewTiDBClient(server.URL, 5*time.Second, nil) + err := client.UpgradeStart(context.Background(), c.keyspace) + if c.wantErr { + assert.Error(tt, err) + } else { + assert.NoError(tt, err) + } + }) + } +} + +func TestTiDBClient_UpgradeFinish(t *testing.T) { + cases := []struct { + desc string + respCode int + wantErr bool + }{ + { + desc: "success", + respCode: http.StatusOK, + }, + { + desc: "server error", + respCode: http.StatusInternalServerError, + wantErr: true, + }, + } + for i := range cases { + c := cases[i] + t.Run(c.desc, func(tt *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(tt, "/upgrade/finish", r.URL.Path) + assert.Equal(tt, http.MethodPost, r.Method) + w.WriteHeader(c.respCode) + })) + defer server.Close() + + client := NewTiDBClient(server.URL, 5*time.Second, nil) + err := client.UpgradeFinish(context.Background()) + if c.wantErr { + assert.Error(tt, err) + } else { + assert.NoError(tt, err) + } + }) + } +} diff --git a/pkg/tidbapi/v1/types.go b/pkg/tidbapi/v1/types.go index 20a350b3a19..73a95d1c0e5 100644 --- a/pkg/tidbapi/v1/types.go +++ b/pkg/tidbapi/v1/types.go @@ -33,6 +33,12 @@ type ActivateRequest struct { TiDBEnableDDL bool `json:"tidb_enable_ddl"` } +// UpgradeRequest is the request body for /upgrade/start. +// KeyspaceName is optional: empty means global DDL pause (Dedicated), non-empty scopes the pause to that keyspace (Premium). +type UpgradeRequest struct { + KeyspaceName string `json:"keyspace_name,omitempty"` +} + type PoolStatus struct { State PoolState `json:"state"` // unused