diff --git a/kubernetes/Makefile b/kubernetes/Makefile index 648f1d2fd..4cab32992 100644 --- a/kubernetes/Makefile +++ b/kubernetes/Makefile @@ -337,7 +337,7 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified .PHONY: deploy deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config. cd config/manager && $(KUSTOMIZE) edit set image controller=${CONTROLLER_IMG} - $(KUSTOMIZE) build config/default | $(KUBECTL) apply -f - + $(KUSTOMIZE) build config/default | sed 's|TASK_EXECUTOR_IMAGE_PLACEHOLDER|$(TASK_EXECUTOR_IMG)|g' | $(KUBECTL) apply -f - .PHONY: undeploy undeploy: kustomize ## Undeploy controller from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion. diff --git a/kubernetes/README-ZH.md b/kubernetes/README-ZH.md index 0ac8fc0f4..51fe89fc4 100644 --- a/kubernetes/README-ZH.md +++ b/kubernetes/README-ZH.md @@ -30,6 +30,94 @@ Pool 自定义资源维护一个预热的计算资源池,以实现快速沙箱 - 基于需求的自动资源分配和释放 - 实时状态监控,显示总数、已分配和可用资源 +### Pod 回收策略 + +Pool CRD 支持可配置的 Pod 回收策略,用于确定 BatchSandbox 删除时如何处理 Pod: + +#### 策略类型 + +| 策略 | 描述 | +|------|------| +| `Delete`(默认) | BatchSandbox 删除时直接删除 Pod | +| `Reuse` | 重置 Pod 后归还资源池以供复用 | + +#### Reuse 策略的重要行为变更 + +使用 `podRecyclePolicy: Reuse` 时,控制器会自动对 Pod 规格进行以下修改: + +| 变更 | 原因 | +|------|------| +| `restartPolicy` 从 `Never` 改为 `Always` | 重置时需要重启容器 | +| `shareProcessNamespace` 设置为 `true` | Sidecar 通信所需 | +| 添加 `SYS_PTRACE` capability | nsenter 访问容器命名空间所需 | +| 注入 `task-executor` sidecar | 处理 Pod 重置操作 | +| 添加 `sandbox-storage` volume | 主容器与 sidecar 的共享存储 | + +#### Reuse 策略的前置条件 + +使用 `Reuse` 策略时,**必须**在部署控制器时配置 `task-executor-image` 参数: + +```sh +# 使用 Helm +helm install opensandbox-controller ./charts/opensandbox-controller \ + --set controller.taskExecutorImage=/opensandbox-task-executor: \ + --namespace opensandbox-system + +# 使用 Kustomize +make deploy CONTROLLER_IMG=/opensandbox-controller: \ + TASK_EXECUTOR_IMG=/opensandbox-task-executor: +``` + +> **注意**:如果未配置 `task-executor-image`,`Reuse` 策略将降级为 `Delete` 并输出警告日志。 + +#### 配置示例 + +```yaml +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: Pool +metadata: + name: reuse-pool +spec: + podRecyclePolicy: Reuse # 启用 Pod 复用 + resetSpec: + mainContainerName: sandbox-container # 可选:默认为第一个容器 + cleanDirectories: # 可选:重置时清理的目录 + - "/tmp/*" + - "/var/cache/**" + timeoutSeconds: 60 # 可选:10-600 秒,默认 60 + template: + spec: + containers: + - name: sandbox-container + image: ubuntu:latest + command: ["sleep", "3600"] + capacitySpec: + bufferMax: 10 + bufferMin: 2 + poolMax: 20 + poolMin: 5 +``` + +#### 重置工作流程 + +当使用 `Reuse` 策略的 BatchSandbox 被删除时: + +1. **停止任务**:停止 Pod 中所有正在运行的任务 +2. **清理目录**:清理指定的目录(支持 glob 模式) +3. **重启主容器**:通过 SIGTERM/SIGKILL 重启主容器 +4. **归还资源池**:Pod 被归还到资源池以供复用 + +#### 资源池状态 + +资源池状态包含 `resetting` 字段,用于追踪正在重置的 Pod: + +```sh +kubectl get pool reuse-pool + +NAME TOTAL ALLOCATED AVAILABLE RESETTING AGE +reuse-pool 10 3 5 2 5m +``` + ### 任务编排 集成的任务管理系统,在沙箱内执行自定义工作负载: - **可选执行**:任务调度完全可选 - 可以在不带任务的情况下创建沙箱 diff --git a/kubernetes/README.md b/kubernetes/README.md index c7f56bfa8..33df9f63e 100644 --- a/kubernetes/README.md +++ b/kubernetes/README.md @@ -30,6 +30,94 @@ The Pool custom resource maintains a pool of pre-warmed compute resources to ena - Automatic resource allocation and deallocation based on demand - Real-time status monitoring showing total, allocated, and available resources +### Pod Recycle Policy + +The Pool CRD supports configurable pod recycle policies that determine how pods are handled when a BatchSandbox is deleted: + +#### Policy Types + +| Policy | Description | +|--------|-------------| +| `Delete` (default) | Delete the pod directly when BatchSandbox is deleted | +| `Reuse` | Reset the pod and return it to the pool for reuse | + +#### Important Behavioral Changes for Reuse Policy + +When using `podRecyclePolicy: Reuse`, the controller automatically makes the following changes to pod specs: + +| Change | Reason | +|--------|--------| +| `restartPolicy` changed from `Never` to `Always` | Required for container restart during reset | +| `shareProcessNamespace` set to `true` | Required for sidecar communication | +| `SYS_PTRACE` capability added | Required for nsenter to access container namespaces | +| `task-executor` sidecar injected | Handles pod reset operations | +| `sandbox-storage` volume added | Shared storage between main container and sidecar | + +#### Prerequisites for Reuse Policy + +To use the `Reuse` policy, you **must** configure the `task-executor-image` parameter when deploying the controller: + +```sh +# Using Helm +helm install opensandbox-controller ./charts/opensandbox-controller \ + --set controller.taskExecutorImage=/opensandbox-task-executor: \ + --namespace opensandbox-system + +# Using Kustomize +make deploy CONTROLLER_IMG=/opensandbox-controller: \ + TASK_EXECUTOR_IMG=/opensandbox-task-executor: +``` + +> **Note**: If `task-executor-image` is not configured, the `Reuse` policy will fall back to `Delete` with a warning log. + +#### Configuration Example + +```yaml +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: Pool +metadata: + name: reuse-pool +spec: + podRecyclePolicy: Reuse # Enable pod reuse + resetSpec: + mainContainerName: sandbox-container # Optional: defaults to first container + cleanDirectories: # Optional: directories to clean during reset + - "/tmp/*" + - "/var/cache/**" + timeoutSeconds: 60 # Optional: 10-600 seconds, default 60 + template: + spec: + containers: + - name: sandbox-container + image: ubuntu:latest + command: ["sleep", "3600"] + capacitySpec: + bufferMax: 10 + bufferMin: 2 + poolMax: 20 + poolMin: 5 +``` + +#### How Reset Works + +When a BatchSandbox with `Reuse` policy is deleted: + +1. **Stop Tasks**: All running tasks in the pod are stopped +2. **Clean Directories**: Specified directories are cleaned (supports glob patterns) +3. **Restart Main Container**: The main container is restarted via SIGTERM/SIGKILL +4. **Return to Pool**: The pod is returned to the pool for reuse + +#### Pool Status with Reset + +The Pool status includes a `resetting` field to track pods being reset: + +```sh +kubectl get pool reuse-pool + +NAME TOTAL ALLOCATED AVAILABLE RESETTING AGE +reuse-pool 10 3 5 2 5m +``` + ### Task Orchestration Integrated task management system that executes custom workloads within sandboxes: - **Optional Execution**: Task scheduling is completely optional - sandboxes can be created without tasks diff --git a/kubernetes/apis/sandbox/v1alpha1/pool_types.go b/kubernetes/apis/sandbox/v1alpha1/pool_types.go index 3c8b7e812..9d0643e34 100644 --- a/kubernetes/apis/sandbox/v1alpha1/pool_types.go +++ b/kubernetes/apis/sandbox/v1alpha1/pool_types.go @@ -32,6 +32,19 @@ type PoolSpec struct { // CapacitySpec controls the size of the resource pool. // +kubebuilder:validation:Required CapacitySpec CapacitySpec `json:"capacitySpec"` + // PodRecyclePolicy specifies how to handle allocated Pods when a pooled BatchSandbox is deleted. + // - Delete: (default) Delete the allocated Pod directly. + // - Reuse: Reset the Pod before returning it to the pool; if reset fails, delete it. + // Note: Reuse policy requires task-executor image to be configured in controller. + // If not configured, pods will be deleted with a warning log. + // +optional + // +kubebuilder:default=Delete + // +kubebuilder:validation:Enum=Delete;Reuse + PodRecyclePolicy PodRecyclePolicy `json:"podRecyclePolicy,omitempty"` + // ResetSpec specifies the reset configuration when PodRecyclePolicy is Reuse. + // Ignored when PodRecyclePolicy is Delete. + // +optional + ResetSpec *ResetSpec `json:"resetSpec,omitempty"` } type CapacitySpec struct { @@ -53,6 +66,37 @@ type CapacitySpec struct { PoolMin int32 `json:"poolMin"` } +// PodRecyclePolicy defines the policy for recycling pooled Pods. +type PodRecyclePolicy string + +const ( + // PodRecyclePolicyDelete deletes the allocated Pod directly. + PodRecyclePolicyDelete PodRecyclePolicy = "Delete" + // PodRecyclePolicyReuse resets the Pod before returning it to the pool. + // Requires task-executor image to be configured in controller. + PodRecyclePolicyReuse PodRecyclePolicy = "Reuse" +) + +// ResetSpec specifies how to reset a Pod before returning it to the pool. +type ResetSpec struct { + // MainContainerName specifies which container is the main container for reset purposes. + // The main container will be restarted during reset. + // If not specified, the first container in the pod template is used. + // +optional + MainContainerName string `json:"mainContainerName,omitempty"` + // CleanDirectories specifies directories to clean during reset. + // Supports glob patterns like "/tmp/*", "/var/cache/**". + // Default: ["/tmp"] + // +optional + CleanDirectories []string `json:"cleanDirectories,omitempty"` + // TimeoutSeconds specifies the timeout for reset operation in seconds. + // +optional + // +kubebuilder:default=60 + // +kubebuilder:validation:Minimum=10 + // +kubebuilder:validation:Maximum=600 + TimeoutSeconds int64 `json:"timeoutSeconds,omitempty"` +} + // PoolStatus defines the observed state of Pool. type PoolStatus struct { // ObservedGeneration is the most recent generation observed for this BatchSandbox. It corresponds to the @@ -66,6 +110,9 @@ type PoolStatus struct { Allocated int32 `json:"allocated"` // Available is the number of nodes currently available in the pool. Available int32 `json:"available"` + // Resetting is the number of Pods currently being reset. + // +optional + Resetting int32 `json:"resetting,omitempty"` } // +genclient @@ -75,6 +122,7 @@ type PoolStatus struct { // +kubebuilder:printcolumn:name="TOTAL",type="integer",JSONPath=".status.total",description="The number of all nodes in pool." // +kubebuilder:printcolumn:name="ALLOCATED",type="integer",JSONPath=".status.allocated",description="The number of allocated nodes in pool." // +kubebuilder:printcolumn:name="AVAILABLE",type="integer",JSONPath=".status.available",description="The number of available nodes in pool." +// +kubebuilder:printcolumn:name="RESETTING",type="integer",JSONPath=".status.resetting",description="The number of pods being reset in pool." // Pool is the Schema for the pools API. type Pool struct { metav1.TypeMeta `json:",inline"` diff --git a/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go b/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go index 82cae84c6..0d21b680c 100644 --- a/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go +++ b/kubernetes/apis/sandbox/v1alpha1/zz_generated.deepcopy.go @@ -233,6 +233,11 @@ func (in *PoolSpec) DeepCopyInto(out *PoolSpec) { (*in).DeepCopyInto(*out) } out.CapacitySpec = in.CapacitySpec + if in.ResetSpec != nil { + in, out := &in.ResetSpec, &out.ResetSpec + *out = new(ResetSpec) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PoolSpec. @@ -292,6 +297,26 @@ func (in *ProcessTask) DeepCopy() *ProcessTask { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResetSpec) DeepCopyInto(out *ResetSpec) { + *out = *in + if in.CleanDirectories != nil { + in, out := &in.CleanDirectories, &out.CleanDirectories + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResetSpec. +func (in *ResetSpec) DeepCopy() *ResetSpec { + if in == nil { + return nil + } + out := new(ResetSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TaskSpec) DeepCopyInto(out *TaskSpec) { *out = *in diff --git a/kubernetes/charts/opensandbox-controller/README.md b/kubernetes/charts/opensandbox-controller/README.md index fe177c14e..ddd8ec98e 100644 --- a/kubernetes/charts/opensandbox-controller/README.md +++ b/kubernetes/charts/opensandbox-controller/README.md @@ -80,6 +80,8 @@ kubectl delete crd pools.sandbox.opensandbox.io | `controller.podLabels` | Additional labels for controller pods | `{}` | | `controller.podAnnotations` | Additional annotations for controller pods | `{}` | | `controller.priorityClassName` | Priority class name for controller pods | `""` | +| `controller.taskExecutorImage` | Task executor image for pod reuse policy (required for Reuse policy) | `""` | +| `controller.taskExecutorResources` | Task executor sidecar resources in format "cpu,memory" | `200m,128Mi` | ### RBAC Parameters @@ -149,6 +151,20 @@ imagePullSecrets: - name: myregistrykey ``` +### Enable Pod Reuse Policy + +To use the `Reuse` pod recycle policy, you must configure the task-executor image: + +```yaml +controller: + taskExecutorImage: myregistry.example.com/opensandbox-task-executor:v0.1.0 + taskExecutorResources: 200m,128Mi # Optional: defaults to "200m,128Mi" +``` + +> **Note**: Without `taskExecutorImage` configured, pools with `podRecyclePolicy: Reuse` will fall back to `Delete` behavior. + +See [Pod Recycle Policy](../README.md#pod-recycle-policy) for more details. + ### Node Affinity ```yaml diff --git a/kubernetes/charts/opensandbox-controller/templates/deployment.yaml b/kubernetes/charts/opensandbox-controller/templates/deployment.yaml index 433e38533..45b73c6fb 100644 --- a/kubernetes/charts/opensandbox-controller/templates/deployment.yaml +++ b/kubernetes/charts/opensandbox-controller/templates/deployment.yaml @@ -58,6 +58,12 @@ spec: {{- if and .Values.controller.kubeClient (gt .Values.controller.kubeClient.burst 0) }} - --kube-client-burst={{ .Values.controller.kubeClient.burst }} {{- end }} + {{- if .Values.controller.taskExecutorImage }} + - --task-executor-image={{ .Values.controller.taskExecutorImage }} + {{- end }} + {{- if .Values.controller.taskExecutorResources }} + - --task-executor-resources={{ .Values.controller.taskExecutorResources }} + {{- end }} ports: - name: health containerPort: 8081 diff --git a/kubernetes/charts/opensandbox-controller/values.yaml b/kubernetes/charts/opensandbox-controller/values.yaml index 66b177645..c143ecf93 100644 --- a/kubernetes/charts/opensandbox-controller/values.yaml +++ b/kubernetes/charts/opensandbox-controller/values.yaml @@ -104,6 +104,15 @@ controller: # -- Priority class name for controller pods priorityClassName: "" + # -- Task executor image for pod reuse policy support. + # Required when using Pool with podRecyclePolicy: Reuse. + # If not configured, Reuse policy will fall back to Delete with a warning log. + taskExecutorImage: "" + + # -- Task executor sidecar resources in format "cpu,memory". + # Example: "200m,128Mi". Both request and limit will be set to the same value. + taskExecutorResources: "200m,128Mi" + # -- Image pull secrets for private registries imagePullSecrets: [] # - name: myregistrykey diff --git a/kubernetes/cmd/controller/main.go b/kubernetes/cmd/controller/main.go index 1e95cc281..2b7ab618d 100644 --- a/kubernetes/cmd/controller/main.go +++ b/kubernetes/cmd/controller/main.go @@ -77,6 +77,10 @@ func main() { var kubeClientQPS float64 var kubeClientBurst int + // Task executor image for pod reset support + var taskExecutorImage string + var taskExecutorResources string + flag.StringVar(&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ "Use :8443 for HTTPS or :8080 for HTTP, or leave as 0 to disable the metrics service.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -104,6 +108,10 @@ func main() { flag.BoolVar(&logCompress, "log-compress", true, "Compress determines if the rotated log files should be compressed using gzip") flag.Float64Var(&kubeClientQPS, "kube-client-qps", 100, "QPS for Kubernetes client rate limiter.") flag.IntVar(&kubeClientBurst, "kube-client-burst", 200, "Burst for Kubernetes client rate limiter.") + flag.StringVar(&taskExecutorImage, "task-executor-image", "", + "Task executor image for pod reset support. If not set, Reuse policy will be disabled.") + flag.StringVar(&taskExecutorResources, "task-executor-resources", "200m,128Mi", + "Task executor sidecar resources in format 'cpu,memory'. Example: '200m,128Mi'. Both request and limit will be set to the same value.") opts := zap.Options{} opts.BindFlags(flag.CommandLine) @@ -260,10 +268,12 @@ func main() { os.Exit(1) } if err := (&controller.PoolReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor("pool-controller"), - Allocator: controller.NewDefaultAllocator(mgr.GetClient()), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor("pool-controller"), + Allocator: controller.NewDefaultAllocator(mgr.GetClient()), + TaskExecutorImage: taskExecutorImage, + TaskExecutorResources: taskExecutorResources, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Pool") os.Exit(1) diff --git a/kubernetes/cmd/task-executor/main.go b/kubernetes/cmd/task-executor/main.go index 1872b5bc5..ee9a83b74 100644 --- a/kubernetes/cmd/task-executor/main.go +++ b/kubernetes/cmd/task-executor/main.go @@ -70,7 +70,7 @@ func main() { klog.InfoS("task manager started") // Initialize HTTP Handler and Router - handler := server.NewHandler(taskManager, cfg) + handler := server.NewHandler(taskManager, exec, cfg) router := server.NewRouter(handler) // Create HTTP Server diff --git a/kubernetes/config/crd/bases/sandbox.opensandbox.io_pools.yaml b/kubernetes/config/crd/bases/sandbox.opensandbox.io_pools.yaml index 8b987cada..885ef48e7 100644 --- a/kubernetes/config/crd/bases/sandbox.opensandbox.io_pools.yaml +++ b/kubernetes/config/crd/bases/sandbox.opensandbox.io_pools.yaml @@ -27,6 +27,10 @@ spec: jsonPath: .status.available name: AVAILABLE type: integer + - description: The number of pods being reset in pool. + jsonPath: .status.resetting + name: RESETTING + type: integer name: v1alpha1 schema: openAPIV3Schema: @@ -84,6 +88,46 @@ spec: - poolMax - poolMin type: object + podRecyclePolicy: + default: Delete + description: |- + PodRecyclePolicy specifies how to handle allocated Pods when a pooled BatchSandbox is deleted. + - Delete: (default) Delete the allocated Pod directly. + - Reuse: Reset the Pod before returning it to the pool; if reset fails, delete it. + Note: Reuse policy requires task-executor image to be configured in controller. + If not configured, pods will be deleted with a warning log. + enum: + - Delete + - Reuse + type: string + resetSpec: + description: |- + ResetSpec specifies the reset configuration when PodRecyclePolicy is Reuse. + Ignored when PodRecyclePolicy is Delete. + properties: + cleanDirectories: + description: |- + CleanDirectories specifies directories to clean during reset. + Supports glob patterns like "/tmp/*", "/var/cache/**". + Default: ["/tmp"] + items: + type: string + type: array + mainContainerName: + description: |- + MainContainerName specifies which container is the main container for reset purposes. + The main container will be restarted during reset. + If not specified, the first container in the pod template is used. + type: string + timeoutSeconds: + default: 60 + description: TimeoutSeconds specifies the timeout for reset operation + in seconds. + format: int64 + maximum: 600 + minimum: 10 + type: integer + type: object template: description: Pod Template used to create pre-warmed nodes in the pool. x-kubernetes-preserve-unknown-fields: true @@ -109,6 +153,10 @@ spec: BatchSandbox's generation, which is updated on mutation by the API Server. format: int64 type: integer + resetting: + description: Resetting is the number of Pods currently being reset. + format: int32 + type: integer revision: description: Revision is the latest version of pool type: string diff --git a/kubernetes/config/default/kustomization.yaml b/kubernetes/config/default/kustomization.yaml index df90f6f10..73079bbd0 100644 --- a/kubernetes/config/default/kustomization.yaml +++ b/kubernetes/config/default/kustomization.yaml @@ -40,6 +40,11 @@ patches: - path: manager_metrics_patch.yaml target: kind: Deployment +# [TASK-EXECUTOR] Patch to add task-executor-image argument for PodRecyclePolicy Reuse support +- path: manager_task_executor_patch.yaml + target: + kind: Deployment + name: controller-manager # Uncomment the patches line if you enable Metrics and CertManager # [METRICS-WITH-CERTS] To enable metrics protected with certManager, uncomment the following line. diff --git a/kubernetes/config/default/manager_task_executor_patch.yaml b/kubernetes/config/default/manager_task_executor_patch.yaml new file mode 100644 index 000000000..783a4c28a --- /dev/null +++ b/kubernetes/config/default/manager_task_executor_patch.yaml @@ -0,0 +1,16 @@ +# Patch to add task-executor-image argument to the controller manager +# The TASK_EXECUTOR_IMAGE placeholder will be replaced by sed during deployment +apiVersion: apps/v1 +kind: Deployment +metadata: + name: controller-manager + namespace: system +spec: + template: + spec: + containers: + - name: manager + args: + - --leader-elect + - --health-probe-bind-address=:8081 + - --task-executor-image=TASK_EXECUTOR_IMAGE_PLACEHOLDER diff --git a/kubernetes/config/manager/kustomization.yaml b/kubernetes/config/manager/kustomization.yaml index ae9eb8f4e..fdcb79fa0 100644 --- a/kubernetes/config/manager/kustomization.yaml +++ b/kubernetes/config/manager/kustomization.yaml @@ -4,8 +4,8 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: controller - newTag: dev + newName: example.com/sandbox-k8s + newTag: v0.0.1 - name: manager newName: sandbox-registry.cn-zhangjiakou.cr.aliyuncs.com/opensandbox/controller newTag: v0.0.1 diff --git a/kubernetes/internal/controller/apis.go b/kubernetes/internal/controller/apis.go index c32964aff..30055efba 100644 --- a/kubernetes/internal/controller/apis.go +++ b/kubernetes/internal/controller/apis.go @@ -32,6 +32,19 @@ const ( AnnoPoolAllocGenerationKey = "pool.opensandbox.io/alloc-generation" FinalizerTaskCleanup = "batch-sandbox.sandbox.opensandbox.io/task-cleanup" + + // LabelPodRecycleState indicates the current recycle state of a pooled Pod. + // Possible values: "", "Resetting", "ResetSucceeded", "ResetFailed". + LabelPodRecycleState = "pool.opensandbox.io/recycle-state" + LabelPodReuseEnabled = "pool.opensandbox.io/reuse-enabled" + FinalizerPodDisposal = "batch-sandbox.sandbox.opensandbox.io/pod-disposal" +) + +const ( + PodRecycleStateEmpty = "" + PodRecycleStateResetting = "Resetting" + PodRecycleStateResetSucceeded = "ResetSucceeded" + PodRecycleStateResetFailed = "ResetFailed" ) // AnnotationSandboxEndpoints Use the exported constant from pkg/utils diff --git a/kubernetes/internal/controller/batchsandbox_controller.go b/kubernetes/internal/controller/batchsandbox_controller.go index 6008d3cde..787a4d9dd 100644 --- a/kubernetes/internal/controller/batchsandbox_controller.go +++ b/kubernetes/internal/controller/batchsandbox_controller.go @@ -51,6 +51,7 @@ import ( "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils/expectations" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils/fieldindex" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils/requeueduration" + taskexecutor "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" ) var ( @@ -58,6 +59,21 @@ var ( DurationStore = requeueduration.DurationStore{} ) +// ensureFinalizer ensures the finalizer is added to the object. +// Returns true if the finalizer was added (false if already exists). +func (r *BatchSandboxReconciler) ensureFinalizer(ctx context.Context, obj client.Object, finalizer string) (bool, error) { + log := logf.FromContext(ctx) + if controllerutil.ContainsFinalizer(obj, finalizer) { + return false, nil + } + if err := utils.UpdateFinalizer(r.Client, obj, utils.AddFinalizerOpType, finalizer); err != nil { + log.Error(err, "failed to add finalizer", "finalizer", finalizer) + return false, err + } + log.Info("added finalizer", "finalizer", finalizer) + return true, nil +} + // BatchSandboxReconciler reconciles a BatchSandbox object type BatchSandboxReconciler struct { client.Client @@ -123,18 +139,37 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request // handle finalizers if batchSbx.DeletionTimestamp == nil { + // Add task cleanup finalizer for BatchSandbox with task scheduling if taskStrategy.NeedTaskScheduling() { - if !controllerutil.ContainsFinalizer(batchSbx, FinalizerTaskCleanup) { - err := utils.UpdateFinalizer(r.Client, batchSbx, utils.AddFinalizerOpType, FinalizerTaskCleanup) - if err != nil { - log.Error(err, "failed to add finalizer", "finalizer", FinalizerTaskCleanup) - } else { - log.Info("added finalizer", "finalizer", FinalizerTaskCleanup) - } + if added, err := r.ensureFinalizer(ctx, batchSbx, FinalizerTaskCleanup); added || err != nil { + return ctrl.Result{}, err + } + } + // Add pod disposal finalizer for pooled BatchSandbox + if poolStrategy.IsPooledMode() { + if added, err := r.ensureFinalizer(ctx, batchSbx, FinalizerPodDisposal); added || err != nil { return ctrl.Result{}, err } } } else { + // Handle pod disposal before final cleanup + // This must be done before returning, regardless of task scheduling needs + if poolStrategy.IsPooledMode() && controllerutil.ContainsFinalizer(batchSbx, FinalizerPodDisposal) { + result, err := r.handlePodDisposal(ctx, batchSbx) + if err != nil { + log.Error(err, "failed to handle pod disposal") + return ctrl.Result{}, err + } + if result.RequeueAfter > 0 { + return result, nil + } + // Pod disposal completed, remove finalizer + if err := utils.UpdateFinalizer(r.Client, batchSbx, utils.RemoveFinalizerOpType, FinalizerPodDisposal); err != nil { + log.Error(err, "failed to remove finalizer", "finalizer", FinalizerPodDisposal) + return ctrl.Result{}, err + } + log.Info("pod disposal completed, removed finalizer", "finalizer", FinalizerPodDisposal) + } if !taskStrategy.NeedTaskScheduling() { return ctrl.Result{}, nil } @@ -557,3 +592,232 @@ func (r *BatchSandboxReconciler) SetupWithManager(mgr ctrl.Manager) error { WithOptions(controller.Options{MaxConcurrentReconciles: 32}). Complete(r) } + +// handlePodDisposal handles pod disposal when a pooled BatchSandbox is being deleted. +// It returns ctrl.Result with RequeueAfter > 0 if the operation is in progress. +func (r *BatchSandboxReconciler) handlePodDisposal(ctx context.Context, batchSbx *sandboxv1alpha1.BatchSandbox) (ctrl.Result, error) { + log := logf.FromContext(ctx) + + pool := &sandboxv1alpha1.Pool{} + if err := r.Get(ctx, types.NamespacedName{Name: batchSbx.Spec.PoolRef, Namespace: batchSbx.Namespace}, pool); err != nil { + if errors.IsNotFound(err) { + log.Info("Pool not found, skipping pod disposal", "pool", batchSbx.Spec.PoolRef) + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + alloc, err := parseSandboxAllocation(batchSbx) + if err != nil { + return ctrl.Result{}, err + } + + policy := sandboxv1alpha1.PodRecyclePolicyDelete + if pool.Spec.PodRecyclePolicy != "" { + policy = pool.Spec.PodRecyclePolicy + } + + anyPodResetting := false + + for _, podName := range alloc.Pods { + pod := &corev1.Pod{} + if err := r.Get(ctx, types.NamespacedName{Name: podName, Namespace: batchSbx.Namespace}, pod); err != nil { + if errors.IsNotFound(err) { + continue + } + log.Error(err, "Failed to get pod", "pod", podName) + continue + } + + // supportsReuse is determined by pool controller at pod creation time: + // Pool's PodRecyclePolicy is Reuse and PoolReconciler has TaskExecutorImage configured + supportsReuse := pod.Labels[LabelPodReuseEnabled] == "true" + + if policy == sandboxv1alpha1.PodRecyclePolicyReuse && supportsReuse { + result, err := r.handleReusePolicy(ctx, batchSbx, pool, pod) + if err != nil { + log.Error(err, "Failed to handle reuse policy, falling back to delete", "pod", podName) + if err := r.Delete(ctx, pod); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete pod after reuse policy failure", "pod", podName) + } + continue + } + if result.RequeueAfter > 0 { + anyPodResetting = true + } + } else { + if policy == sandboxv1alpha1.PodRecyclePolicyReuse { + log.Info("Pod does not support reuse, falling back to delete", + "pod", podName, + "pool", pool.Name, + "reason", "task-executor not injected (controller may not have task-executor-image configured)") + } + // Delete the pod directly (don't release back to pool) + if err := r.Delete(ctx, pod); err != nil && !errors.IsNotFound(err) { + log.Error(err, "Failed to delete pod", "pod", podName) + } else { + log.Info("Pod deleted for Delete policy", "pod", podName, "pool", pool.Name) + } + } + } + + if anyPodResetting { + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + + return ctrl.Result{}, nil +} + +// handleReusePolicy handles the Reuse policy for a single pod. +func (r *BatchSandboxReconciler) handleReusePolicy(ctx context.Context, batchSbx *sandboxv1alpha1.BatchSandbox, pool *sandboxv1alpha1.Pool, pod *corev1.Pod) (ctrl.Result, error) { + log := logf.FromContext(ctx) + + recycleState := pod.Labels[LabelPodRecycleState] + + switch recycleState { + case PodRecycleStateResetting: + // Must poll the Reset API to detect completion/failure + log.Info("Pod is being reset, polling status", "pod", pod.Name) + return r.pollResetStatus(ctx, pool, pod) + + case PodRecycleStateResetSucceeded: + // Clear the recycle state label. If cleanup fails, requeue to retry. + log.Info("Pod reset succeeded, clearing recycle state label", "pod", pod.Name) + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + delete(pod.Labels, LabelPodRecycleState) + if err := r.Update(ctx, pod); err != nil { + log.Error(err, "Failed to clear recycle state label", "pod", pod.Name) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + return ctrl.Result{}, nil + + case PodRecycleStateResetFailed: + log.Info("Pod reset failed, deleting", "pod", pod.Name) + if err := r.Delete(ctx, pod); err != nil && !errors.IsNotFound(err) { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + + default: + log.Info("Starting pod reset", "pod", pod.Name) + if err := r.startPodReset(ctx, pool, pod); err != nil { + log.Error(err, "Failed to start pod reset", "pod", pod.Name) + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + pod.Labels[LabelPodRecycleState] = PodRecycleStateResetFailed + if err := r.Update(ctx, pod); err != nil { + log.Error(err, "Failed to mark pod reset failed", "pod", pod.Name) + } + return ctrl.Result{}, err + } + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } +} + +// callResetAPI calls the task-executor Reset API and returns the response. +func (r *BatchSandboxReconciler) callResetAPI(ctx context.Context, pool *sandboxv1alpha1.Pool, pod *corev1.Pod) (*taskexecutor.ResetResponse, error) { + log := logf.FromContext(ctx) + + timeoutSeconds := int64(60) + cleanDirectories := []string{"/tmp"} + mainContainerName := "" + + if pool.Spec.ResetSpec != nil { + if pool.Spec.ResetSpec.TimeoutSeconds > 0 { + timeoutSeconds = pool.Spec.ResetSpec.TimeoutSeconds + } + if len(pool.Spec.ResetSpec.CleanDirectories) > 0 { + cleanDirectories = pool.Spec.ResetSpec.CleanDirectories + } + mainContainerName = pool.Spec.ResetSpec.MainContainerName + } + + if pod.Status.PodIP == "" { + return nil, fmt.Errorf("pod has no IP assigned") + } + + client := taskexecutor.NewClientWithTimeout( + fmt.Sprintf("http://%s:%s", pod.Status.PodIP, taskexecutor.GetPort()), + time.Duration(timeoutSeconds+30)*time.Second, + ) + + resetReq := &taskexecutor.ResetRequest{ + TimeoutSeconds: timeoutSeconds, + CleanDirectories: cleanDirectories, + MainContainerName: mainContainerName, + } + + log.V(1).Info("Calling task-executor reset API", "pod", pod.Name, "podIP", pod.Status.PodIP) + + return client.Reset(ctx, resetReq) +} + +// handleResetResponse updates the pod state based on the Reset API response. +func (r *BatchSandboxReconciler) handleResetResponse(ctx context.Context, pod *corev1.Pod, resp *taskexecutor.ResetResponse) (ctrl.Result, error) { + log := logf.FromContext(ctx) + + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + + switch resp.Status { + case taskexecutor.ResetStatusSuccess: + log.Info("Pod reset completed successfully", "pod", pod.Name, "details", resp.Details) + pod.Labels[LabelPodRecycleState] = PodRecycleStateResetSucceeded + if err := r.Update(ctx, pod); err != nil { + log.Error(err, "Failed to update pod reset state", "pod", pod.Name) + } + return ctrl.Result{}, nil + + case taskexecutor.ResetStatusInProgress: + log.V(1).Info("Pod reset in progress", "pod", pod.Name) + pod.Labels[LabelPodRecycleState] = PodRecycleStateResetting + if err := r.Update(ctx, pod); err != nil { + log.Error(err, "Failed to update pod reset state", "pod", pod.Name) + } + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + + default: + log.Error(nil, "Pod reset failed", "pod", pod.Name, "status", resp.Status, "message", resp.Message) + pod.Labels[LabelPodRecycleState] = PodRecycleStateResetFailed + if err := r.Update(ctx, pod); err != nil { + log.Error(err, "Failed to update pod reset state", "pod", pod.Name) + } + return ctrl.Result{}, nil + } +} + +// pollResetStatus polls the Reset API to detect completion or failure. +func (r *BatchSandboxReconciler) pollResetStatus(ctx context.Context, pool *sandboxv1alpha1.Pool, pod *corev1.Pod) (ctrl.Result, error) { + resp, err := r.callResetAPI(ctx, pool, pod) + if err != nil { + logf.FromContext(ctx).Error(err, "Failed to poll reset status", "pod", pod.Name) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + return r.handleResetResponse(ctx, pod, resp) +} + +// startPodReset starts the reset process for a pod. +func (r *BatchSandboxReconciler) startPodReset(ctx context.Context, pool *sandboxv1alpha1.Pool, pod *corev1.Pod) error { + log := logf.FromContext(ctx) + + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + pod.Labels[LabelPodRecycleState] = PodRecycleStateResetting + if err := r.Update(ctx, pod); err != nil { + return err + } + + resp, err := r.callResetAPI(ctx, pool, pod) + if err != nil { + log.Error(err, "Failed to call task-executor reset API", "pod", pod.Name) + return err + } + + _, err = r.handleResetResponse(ctx, pod, resp) + return err +} diff --git a/kubernetes/internal/controller/batchsandbox_controller_test.go b/kubernetes/internal/controller/batchsandbox_controller_test.go index 5767f1cdb..5ee7e318c 100644 --- a/kubernetes/internal/controller/batchsandbox_controller_test.go +++ b/kubernetes/internal/controller/batchsandbox_controller_test.go @@ -52,6 +52,7 @@ import ( taskscheduler "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/scheduler" mock_scheduler "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/scheduler/mock" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils/fieldindex" + taskexecutor "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" ) func init() { @@ -985,3 +986,323 @@ func Test_calPodIndex(t *testing.T) { }) } } + +// ============================================================ +// PodRecyclePolicy Unit Tests +// ============================================================ + +func TestHandleResetResponse_AllStatuses(t *testing.T) { + tests := []struct { + name string + respStatus taskexecutor.ResetStatus + wantLabel string + wantRequeue bool + requeueAfter time.Duration + }{ + { + name: "Success status -> ResetSucceeded label", + respStatus: taskexecutor.ResetStatusSuccess, + wantLabel: PodRecycleStateResetSucceeded, + wantRequeue: false, + }, + { + name: "InProgress status -> Resetting label and requeue", + respStatus: taskexecutor.ResetStatusInProgress, + wantLabel: PodRecycleStateResetting, + wantRequeue: true, + requeueAfter: 5 * time.Second, + }, + { + name: "Failed status -> ResetFailed label", + respStatus: taskexecutor.ResetStatusFailed, + wantLabel: PodRecycleStateResetFailed, + wantRequeue: false, + }, + { + name: "Timeout status -> ResetFailed label", + respStatus: taskexecutor.ResetStatusTimeout, + wantLabel: PodRecycleStateResetFailed, + wantRequeue: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup fake client with pod + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + }, + } + fakeClient := fake.NewClientBuilder().WithScheme(testscheme).WithObjects(pod).Build() + + // Create reconciler + r := &BatchSandboxReconciler{ + Client: fakeClient, + Scheme: testscheme, + } + + // Create response + resp := &taskexecutor.ResetResponse{ + Status: tt.respStatus, + Message: "test message", + } + + // Call handleResetResponse + result, err := r.handleResetResponse(context.Background(), pod, resp) + if err != nil { + t.Errorf("handleResetResponse() error = %v", err) + return + } + + // Verify label + if pod.Labels[LabelPodRecycleState] != tt.wantLabel { + t.Errorf("handleResetResponse() label = %v, want %v", pod.Labels[LabelPodRecycleState], tt.wantLabel) + } + + // Verify requeue + if tt.wantRequeue { + if result.RequeueAfter != tt.requeueAfter { + t.Errorf("handleResetResponse() RequeueAfter = %v, want %v", result.RequeueAfter, tt.requeueAfter) + } + } else { + if result.RequeueAfter != 0 { + t.Errorf("handleResetResponse() should not requeue, got RequeueAfter = %v", result.RequeueAfter) + } + } + }) + } +} + +func TestHandleReusePolicy_StateTransitions(t *testing.T) { + tests := []struct { + name string + initialState string + wantRequeue bool + wantLabelChange string + }{ + { + name: "Empty state -> start reset", + initialState: PodRecycleStateEmpty, + wantRequeue: true, + wantLabelChange: PodRecycleStateResetting, + }, + { + name: "ResetSucceeded state -> clear label", + initialState: PodRecycleStateResetSucceeded, + wantRequeue: false, + }, + { + name: "ResetFailed state -> delete pod", + initialState: PodRecycleStateResetFailed, + wantRequeue: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Setup pool with Reuse policy + pool := &sandboxv1alpha1.Pool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + Spec: sandboxv1alpha1.PoolSpec{ + PodRecyclePolicy: sandboxv1alpha1.PodRecyclePolicyReuse, + }, + } + + // Setup pod with initial state + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + Labels: map[string]string{ + LabelPodReuseEnabled: "true", + }, + }, + } + if tt.initialState != "" { + pod.Labels[LabelPodRecycleState] = tt.initialState + } + + // Setup batchsandbox + batchSbx := &sandboxv1alpha1.BatchSandbox{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bs", + Namespace: "default", + }, + Spec: sandboxv1alpha1.BatchSandboxSpec{ + PoolRef: "test-pool", + }, + } + + fakeClient := fake.NewClientBuilder().WithScheme(testscheme).WithObjects(pool, pod, batchSbx).Build() + + r := &BatchSandboxReconciler{ + Client: fakeClient, + Scheme: testscheme, + } + + // For Empty state, we expect startPodReset to be called + // which requires task-executor client. Since we can't mock it easily, + // we just verify the state machine logic for other states. + if tt.initialState == PodRecycleStateEmpty { + // Empty state requires external API call, skip in this test + t.Skip("Empty state requires task-executor API mock") + } + + result, err := r.handleReusePolicy(context.Background(), batchSbx, pool, pod) + if err != nil { + t.Errorf("handleReusePolicy() error = %v", err) + return + } + + // Verify requeue behavior + if tt.wantRequeue { + if result.RequeueAfter == 0 { + t.Errorf("handleReusePolicy() should requeue") + } + } else { + if result.RequeueAfter != 0 { + t.Errorf("handleReusePolicy() should not requeue, got RequeueAfter = %v", result.RequeueAfter) + } + } + + // Verify label changes + if tt.initialState == PodRecycleStateResetSucceeded { + if _, exists := pod.Labels[LabelPodRecycleState]; exists { + t.Errorf("handleReusePolicy() should clear recycle-state label for ResetSucceeded") + } + } + }) + } +} + +func TestHandlePodDisposal_DeletePolicy(t *testing.T) { + // Setup pool with Delete policy + pool := &sandboxv1alpha1.Pool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + Spec: sandboxv1alpha1.PoolSpec{ + PodRecyclePolicy: sandboxv1alpha1.PodRecyclePolicyDelete, + }, + } + + // Setup pod + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + }, + } + + // Setup batchsandbox with allocation + batchSbx := &sandboxv1alpha1.BatchSandbox{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bs", + Namespace: "default", + Annotations: map[string]string{ + AnnoAllocStatusKey: `{"pods":["test-pod"]}`, + }, + }, + Spec: sandboxv1alpha1.BatchSandboxSpec{ + PoolRef: "test-pool", + }, + } + + fakeClient := fake.NewClientBuilder().WithScheme(testscheme).WithObjects(pool, pod, batchSbx).Build() + + r := &BatchSandboxReconciler{ + Client: fakeClient, + Scheme: testscheme, + } + + // Call handlePodDisposal + result, err := r.handlePodDisposal(context.Background(), batchSbx) + if err != nil { + t.Errorf("handlePodDisposal() error = %v", err) + return + } + + // Should not requeue + if result.RequeueAfter != 0 { + t.Errorf("handlePodDisposal() should not requeue for Delete policy") + } + + // Pod should be deleted + err = fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-pod", Namespace: "default"}, &corev1.Pod{}) + if err == nil { + t.Errorf("handlePodDisposal() should delete pod for Delete policy") + } + if !errors.IsNotFound(err) { + t.Errorf("handlePodDisposal() expected NotFound error, got %v", err) + } +} + +func TestHandlePodDisposal_ReusePolicy_Fallback(t *testing.T) { + // Setup pool with Reuse policy + pool := &sandboxv1alpha1.Pool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + Spec: sandboxv1alpha1.PoolSpec{ + PodRecyclePolicy: sandboxv1alpha1.PodRecyclePolicyReuse, + }, + } + + // Setup pod WITHOUT reuse-enabled label (should fallback to delete) + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + // No LabelPodReuseEnabled label + }, + } + + // Setup batchsandbox with allocation + batchSbx := &sandboxv1alpha1.BatchSandbox{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bs", + Namespace: "default", + Annotations: map[string]string{ + AnnoAllocStatusKey: `{"pods":["test-pod"]}`, + }, + }, + Spec: sandboxv1alpha1.BatchSandboxSpec{ + PoolRef: "test-pool", + }, + } + + fakeClient := fake.NewClientBuilder().WithScheme(testscheme).WithObjects(pool, pod, batchSbx).Build() + + r := &BatchSandboxReconciler{ + Client: fakeClient, + Scheme: testscheme, + } + + // Call handlePodDisposal + result, err := r.handlePodDisposal(context.Background(), batchSbx) + if err != nil { + t.Errorf("handlePodDisposal() error = %v", err) + return + } + + // Should not requeue + if result.RequeueAfter != 0 { + t.Errorf("handlePodDisposal() should not requeue") + } + + // Pod should be deleted (fallback) + err = fakeClient.Get(context.Background(), types.NamespacedName{Name: "test-pod", Namespace: "default"}, &corev1.Pod{}) + if err == nil { + t.Errorf("handlePodDisposal() should delete pod (fallback to delete)") + } + if !errors.IsNotFound(err) { + t.Errorf("handlePodDisposal() expected NotFound error, got %v", err) + } +} diff --git a/kubernetes/internal/controller/pool_controller.go b/kubernetes/internal/controller/pool_controller.go index adde178a3..9cc401058 100644 --- a/kubernetes/internal/controller/pool_controller.go +++ b/kubernetes/internal/controller/pool_controller.go @@ -21,11 +21,13 @@ import ( gerrors "errors" "fmt" "sort" + "strings" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" @@ -33,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/json" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -65,9 +68,11 @@ var ( // PoolReconciler reconciles a Pool object type PoolReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder - Allocator Allocator + Scheme *runtime.Scheme + Recorder record.EventRecorder + Allocator Allocator + TaskExecutorImage string // TaskExecutorImage is the image for task-executor sidecar. If empty, Reuse policy is disabled. + TaskExecutorResources string // TaskExecutorResources is the resources for task-executor sidecar in format "cpu,memory". Default: "200m,128Mi" } // +kubebuilder:rbac:groups=sandbox.opensandbox.io,resources=pools,verbs=get;list;watch;create;update;patch;delete @@ -308,6 +313,10 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp idlePods := make([]string, 0) for _, pod := range pods { if _, ok := status.PodAllocation[pod.Name]; !ok { + // Skip pods that are being reset + if pod.Labels[LabelPodRecycleState] == PodRecycleStateResetting { + continue + } idlePods = append(idlePods, pod.Name) } } @@ -406,7 +415,15 @@ func (r *PoolReconciler) scalePool(ctx context.Context, args *scaleArgs) error { func (r *PoolReconciler) updatePoolStatus(ctx context.Context, latestRevision string, pool *sandboxv1alpha1.Pool, pods []*corev1.Pod, podAllocation map[string]string) error { oldStatus := pool.Status.DeepCopy() availableCnt := int32(0) + resettingCnt := int32(0) + for _, pod := range pods { + // Count pods that are being reset + if pod.Labels[LabelPodRecycleState] == PodRecycleStateResetting { + resettingCnt++ + continue // Resetting pods are not available + } + if _, ok := podAllocation[pod.Name]; ok { continue } @@ -415,10 +432,12 @@ func (r *PoolReconciler) updatePoolStatus(ctx context.Context, latestRevision st } availableCnt++ } + pool.Status.ObservedGeneration = pool.Generation pool.Status.Total = int32(len(pods)) pool.Status.Allocated = int32(len(podAllocation)) pool.Status.Available = availableCnt + pool.Status.Resetting = resettingCnt pool.Status.Revision = latestRevision if equality.Semantic.DeepEqual(oldStatus, pool.Status) { return nil @@ -474,8 +493,18 @@ func (r *PoolReconciler) createPoolPod(ctx context.Context, pool *sandboxv1alpha pod.Namespace = pool.Namespace pod.Name = "" pod.GenerateName = pool.Name + "-" + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } pod.Labels[LabelPoolName] = pool.Name pod.Labels[LabelPoolRevision] = latestRevision + + // Inject task-executor sidecar if Reuse policy is enabled and TaskExecutorImage is configured + if pool.Spec.PodRecyclePolicy == sandboxv1alpha1.PodRecyclePolicyReuse && r.TaskExecutorImage != "" { + r.injectTaskExecutor(pod, pool) + pod.Labels[LabelPodReuseEnabled] = "true" + } + if err := ctrl.SetControllerReference(pool, pod, r.Scheme); err != nil { return err } @@ -487,3 +516,123 @@ func (r *PoolReconciler) createPoolPod(ctx context.Context, pool *sandboxv1alpha r.Recorder.Eventf(pool, corev1.EventTypeNormal, "SuccessfulCreate", "Created pool pod: %v", pod.Name) return nil } + +// injectTaskExecutor injects task-executor sidecar into the pod for reset support. +func (r *PoolReconciler) injectTaskExecutor(pod *corev1.Pod, pool *sandboxv1alpha1.Pool) { + // Enable process namespace sharing for sidecar communication + shareProcessNamespace := true + pod.Spec.ShareProcessNamespace = &shareProcessNamespace + + // Ensure restartPolicy allows container restart for Reuse policy. + // If restartPolicy is Never, the container won't restart after SIGTERM. + if pod.Spec.RestartPolicy == corev1.RestartPolicyNever { + klog.InfoS("Changing restartPolicy from Never to Always for Reuse policy support", + "pod", pod.Name, "pool", pool.Name) + pod.Spec.RestartPolicy = corev1.RestartPolicyAlways + } + + mainContainerName := r.getMainContainerName(pool, pod) + + // Add sandbox-storage volume to pod spec (used by both main container and task-executor) + pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ + Name: "sandbox-storage", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }) + + for i := range pod.Spec.Containers { + if pod.Spec.Containers[i].Name == mainContainerName { + pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, + corev1.EnvVar{ + Name: "SANDBOX_MAIN_CONTAINER", + Value: mainContainerName, + }) + pod.Spec.Containers[i].VolumeMounts = append(pod.Spec.Containers[i].VolumeMounts, + corev1.VolumeMount{ + Name: "sandbox-storage", + MountPath: "/var/lib/sandbox", + }) + break + } + } + + cpu, memory := r.parseTaskExecutorResources() + + taskExecutorContainer := corev1.Container{ + Name: "task-executor", + Image: r.TaskExecutorImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Env: []corev1.EnvVar{ + {Name: "ENABLE_SIDECAR_MODE", Value: "true"}, + {Name: "MAIN_CONTAINER_NAME", Value: mainContainerName}, + }, + // Security context required for nsenter to access other container namespaces + SecurityContext: &corev1.SecurityContext{ + Capabilities: &corev1.Capabilities{ + Add: []corev1.Capability{ + "SYS_PTRACE", // Required for nsenter to access other process namespaces + }, + }, + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(cpu), + corev1.ResourceMemory: resource.MustParse(memory), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse(cpu), + corev1.ResourceMemory: resource.MustParse(memory), + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "sandbox-storage", + MountPath: "/var/lib/sandbox", + }, + }, + } + pod.Spec.Containers = append(pod.Spec.Containers, taskExecutorContainer) +} + +// parseTaskExecutorResources parses the task-executor resources config. +// Format: "cpu,memory" (e.g., "200m,128Mi") +// Returns (cpu, memory) with defaults "200m", "128Mi" if parsing fails. +func (r *PoolReconciler) parseTaskExecutorResources() (cpu, memory string) { + const ( + defaultCPU = "200m" + defaultMemory = "128Mi" + ) + + if r.TaskExecutorResources == "" { + return defaultCPU, defaultMemory + } + + parts := strings.Split(r.TaskExecutorResources, ",") + if len(parts) != 2 { + klog.InfoS("Invalid task-executor-resources format, using defaults", + "input", r.TaskExecutorResources, "expected", "cpu,memory") + return defaultCPU, defaultMemory + } + + cpu = strings.TrimSpace(parts[0]) + memory = strings.TrimSpace(parts[1]) + + if cpu == "" || memory == "" { + klog.InfoS("Empty cpu or memory in task-executor-resources, using defaults", + "cpu", cpu, "memory", memory) + return defaultCPU, defaultMemory + } + + return cpu, memory +} + +func (r *PoolReconciler) getMainContainerName(pool *sandboxv1alpha1.Pool, pod *corev1.Pod) string { + if pool.Spec.ResetSpec != nil && pool.Spec.ResetSpec.MainContainerName != "" { + return pool.Spec.ResetSpec.MainContainerName + } + if len(pod.Spec.Containers) > 0 { + return pod.Spec.Containers[0].Name + } + return "" +} diff --git a/kubernetes/internal/controller/pool_recycle_test.go b/kubernetes/internal/controller/pool_recycle_test.go new file mode 100644 index 000000000..9b1cf7115 --- /dev/null +++ b/kubernetes/internal/controller/pool_recycle_test.go @@ -0,0 +1,109 @@ +// Copyright 2025 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + sandboxv1alpha1 "github.com/alibaba/OpenSandbox/sandbox-k8s/apis/sandbox/v1alpha1" +) + +func TestInjectTaskExecutor_BasicInjection(t *testing.T) { + pool := &sandboxv1alpha1.Pool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + Spec: sandboxv1alpha1.PoolSpec{ + PodRecyclePolicy: sandboxv1alpha1.PodRecyclePolicyReuse, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "main", Image: "test-image"}, + }, + }, + } + + r := &PoolReconciler{ + TaskExecutorImage: "task-executor:latest", + TaskExecutorResources: "200m,128Mi", + } + + r.injectTaskExecutor(pod, pool) + + // Verify ShareProcessNamespace is true + if pod.Spec.ShareProcessNamespace == nil || !*pod.Spec.ShareProcessNamespace { + t.Error("ShareProcessNamespace should be true") + } + + // Verify task-executor container is injected + found := false + for _, c := range pod.Spec.Containers { + if c.Name == "task-executor" { + found = true + break + } + } + if !found { + t.Error("task-executor container should be injected") + } +} + +func TestInjectTaskExecutor_RestartPolicyChange(t *testing.T) { + pool := &sandboxv1alpha1.Pool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + Spec: sandboxv1alpha1.PoolSpec{ + PodRecyclePolicy: sandboxv1alpha1.PodRecyclePolicyReuse, + }, + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + {Name: "main", Image: "test-image"}, + }, + }, + } + + r := &PoolReconciler{ + TaskExecutorImage: "task-executor:latest", + TaskExecutorResources: "200m,128Mi", + } + + r.injectTaskExecutor(pod, pool) + + // Verify RestartPolicy changed to Always + if pod.Spec.RestartPolicy != corev1.RestartPolicyAlways { + t.Errorf("RestartPolicy should be Always, got %s", pod.Spec.RestartPolicy) + } +} diff --git a/kubernetes/internal/scheduler/default_scheduler.go b/kubernetes/internal/scheduler/default_scheduler.go index a4bf818f6..3f620f701 100644 --- a/kubernetes/internal/scheduler/default_scheduler.go +++ b/kubernetes/internal/scheduler/default_scheduler.go @@ -142,7 +142,6 @@ type taskClient interface { const ( defaultTimeout time.Duration = 3 * time.Second - defaultTaskPort = "5758" defaultSchConcurrency int = 10 ) @@ -151,7 +150,7 @@ func newTaskClient(ip string) taskClient { } func fmtEndpoint(podIP string) string { - return fmt.Sprintf("http://%s:%s", podIP, defaultTaskPort) + return fmt.Sprintf("http://%s:%s", podIP, api.GetPort()) } type defaultTaskScheduler struct { diff --git a/kubernetes/internal/task-executor/manager/interface.go b/kubernetes/internal/task-executor/manager/interface.go index ee899dfdf..4ff810b42 100644 --- a/kubernetes/internal/task-executor/manager/interface.go +++ b/kubernetes/internal/task-executor/manager/interface.go @@ -17,6 +17,7 @@ package manager import ( "context" + "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/runtime" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/types" ) @@ -37,4 +38,12 @@ type TaskManager interface { Start(ctx context.Context) Stop() + + // GetExecutor returns the underlying executor for reset operations. + GetExecutor() runtime.Executor + + // Clear immediately stops and cleans up all tasks. + // Used for reset scenario: synchronous stop + clean store + clean memory. + // Returns the number of tasks that were stopped. + Clear(ctx context.Context) (int, error) } diff --git a/kubernetes/internal/task-executor/manager/task_manager.go b/kubernetes/internal/task-executor/manager/task_manager.go index 9a00a7727..6449d758d 100644 --- a/kubernetes/internal/task-executor/manager/task_manager.go +++ b/kubernetes/internal/task-executor/manager/task_manager.go @@ -263,6 +263,32 @@ func (m *taskManager) Stop() { klog.InfoS("task manager stopped") } +// GetExecutor returns the underlying executor for reset operations. +func (m *taskManager) GetExecutor() runtime.Executor { + return m.executor +} + +// Clear immediately stops and cleans up all tasks. +// Used for reset scenario: synchronous stop + clean store + clean memory. +// Returns the number of tasks that were stopped. +func (m *taskManager) Clear(ctx context.Context) (int, error) { + m.mu.Lock() + defer m.mu.Unlock() + + stopped := 0 + for name, task := range m.tasks { + if !isTerminalState(task.Status.State) { + if err := m.executor.Stop(ctx, task); err != nil { + klog.ErrorS(err, "failed to stop task", "name", name) + } + stopped++ + } + m.store.Delete(ctx, name) + delete(m.tasks, name) + } + return stopped, nil +} + // createTaskLocked creates a task without acquiring the lock func (m *taskManager) createTaskLocked(ctx context.Context, task *types.Task) error { if task == nil || task.Name == "" { diff --git a/kubernetes/internal/task-executor/runtime/composite.go b/kubernetes/internal/task-executor/runtime/composite.go index 7cceed164..9891ac5fe 100644 --- a/kubernetes/internal/task-executor/runtime/composite.go +++ b/kubernetes/internal/task-executor/runtime/composite.go @@ -89,3 +89,13 @@ func (e *compositeExecutor) Stop(ctx context.Context, task *types.Task) error { } return delegate.Stop(ctx, task) } + +// RestartMainContainer restarts the main container (used for reset). +// It uses the process executor for restart operations. +func (e *compositeExecutor) RestartMainContainer(ctx context.Context, mainContainerName string) error { + // Restart is only supported by process executor + if e.processExec == nil { + return fmt.Errorf("process executor not available for restart") + } + return e.processExec.RestartMainContainer(ctx, mainContainerName) +} diff --git a/kubernetes/internal/task-executor/runtime/container.go b/kubernetes/internal/task-executor/runtime/container.go index 63fc0ba03..3c1a597b2 100644 --- a/kubernetes/internal/task-executor/runtime/container.go +++ b/kubernetes/internal/task-executor/runtime/container.go @@ -53,3 +53,8 @@ func (e *containerExecutor) Inspect(ctx context.Context, task *types.Task) (*typ func (e *containerExecutor) Stop(ctx context.Context, task *types.Task) error { return errors.New("container mode is not implemented yet - use process mode instead") } + +// RestartMainContainer is not implemented for container mode yet. +func (e *containerExecutor) RestartMainContainer(ctx context.Context, mainContainerName string) error { + return errors.New("container mode is not implemented yet - use process mode instead") +} diff --git a/kubernetes/internal/task-executor/runtime/interface.go b/kubernetes/internal/task-executor/runtime/interface.go index 4609ccef8..26d6ebf35 100644 --- a/kubernetes/internal/task-executor/runtime/interface.go +++ b/kubernetes/internal/task-executor/runtime/interface.go @@ -27,4 +27,8 @@ type Executor interface { Inspect(ctx context.Context, task *types.Task) (*types.Status, error) Stop(ctx context.Context, task *types.Task) error + + // RestartMainContainer restarts the main container (used for reset). + // This is separated from Reset for cleaner responsibility division. + RestartMainContainer(ctx context.Context, mainContainerName string) error } diff --git a/kubernetes/internal/task-executor/runtime/process.go b/kubernetes/internal/task-executor/runtime/process.go index 607854f51..6e83ffec6 100644 --- a/kubernetes/internal/task-executor/runtime/process.go +++ b/kubernetes/internal/task-executor/runtime/process.go @@ -451,3 +451,59 @@ func getProcEnviron(pid int) ([]string, error) { } return envs, nil } + +// RestartMainContainer restarts the main container by sending SIGTERM +// This is a public method that can be called directly for reset operations. +func (e *processExecutor) RestartMainContainer(ctx context.Context, mainContainerName string) error { + return e.restartMainContainer(ctx, mainContainerName) +} + +// restartMainContainer restarts the main container by sending SIGTERM first, +// then SIGKILL if the process doesn't exit gracefully. +func (e *processExecutor) restartMainContainer(ctx context.Context, mainContainerName string) error { + // Find the main container PID by environment variable + mainPID, err := e.findPidByEnvVar("SANDBOX_MAIN_CONTAINER", mainContainerName) + if err != nil { + return fmt.Errorf("failed to find main container: %w", err) + } + + klog.InfoS("Found main container for restart", "pid", mainPID, "container", mainContainerName) + + // Step 1: Send SIGTERM for graceful shutdown + if err := syscall.Kill(mainPID, syscall.SIGTERM); err != nil { + return fmt.Errorf("failed to send SIGTERM to main container: %w", err) + } + + klog.InfoS("Sent SIGTERM to main container", "pid", mainPID) + + // Step 2: Wait for process to exit gracefully (container runtime will restart it based on restartPolicy) + gracefulTimeout := 30 * time.Second + gracefulDeadline := time.Now().Add(gracefulTimeout) + for time.Now().Before(gracefulDeadline) { + if !isProcessRunning(mainPID) { + klog.InfoS("Main container process exited gracefully", "pid", mainPID) + return nil + } + time.Sleep(500 * time.Millisecond) + } + + // Step 3: Process didn't exit gracefully, send SIGKILL for forceful termination + klog.InfoS("Main container did not exit gracefully, sending SIGKILL", "pid", mainPID) + if err := syscall.Kill(mainPID, syscall.SIGKILL); err != nil { + return fmt.Errorf("failed to send SIGKILL to main container: %w", err) + } + + // Step 4: Wait for SIGKILL to take effect + forceKillWaitTimeout := 5 * time.Second + forceKillDeadline := time.Now().Add(forceKillWaitTimeout) + for time.Now().Before(forceKillDeadline) { + if !isProcessRunning(mainPID) { + klog.InfoS("Main container process killed forcefully", "pid", mainPID) + return nil + } + time.Sleep(100 * time.Millisecond) + } + + // Step 5: Process still running after SIGKILL - this is an error + return fmt.Errorf("main container still running after SIGKILL, pid: %d", mainPID) +} diff --git a/kubernetes/internal/task-executor/server/handler.go b/kubernetes/internal/task-executor/server/handler.go index 522b94b36..b26563cbd 100644 --- a/kubernetes/internal/task-executor/server/handler.go +++ b/kubernetes/internal/task-executor/server/handler.go @@ -15,9 +15,13 @@ package server import ( + "context" "encoding/json" "fmt" "net/http" + "os" + "path/filepath" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -26,6 +30,7 @@ import ( "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/config" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/manager" + "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/runtime" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/types" api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" ) @@ -36,22 +41,43 @@ type ErrorResponse struct { Message string `json:"message"` } +type resetState struct { + mu sync.Mutex + status api.ResetStatus + message string + details *api.ResetDetails + startTime time.Time +} + type Handler struct { - manager manager.TaskManager - config *config.Config + manager manager.TaskManager + executor runtime.Executor + config *config.Config + reset *resetState } -func NewHandler(mgr manager.TaskManager, cfg *config.Config) *Handler { +func NewHandler(mgr manager.TaskManager, exec runtime.Executor, cfg *config.Config) *Handler { if mgr == nil { klog.Warning("TaskManager is nil, handler may not work properly") } if cfg == nil { klog.Warning("Config is nil, handler may not work properly") } - return &Handler{ - manager: mgr, - config: cfg, + h := &Handler{ + manager: mgr, + executor: exec, + config: cfg, + reset: &resetState{ + status: api.ResetStatusNone, + }, } + return h +} + +func (h *Handler) isResetting() bool { + h.reset.mu.Lock() + defer h.reset.mu.Unlock() + return h.reset.status == api.ResetStatusInProgress } func (h *Handler) CreateTask(w http.ResponseWriter, r *http.Request) { @@ -60,6 +86,12 @@ func (h *Handler) CreateTask(w http.ResponseWriter, r *http.Request) { return } + // Block task creation during reset to prevent race conditions + if h.isResetting() { + writeError(w, http.StatusServiceUnavailable, "task creation is blocked: reset operation in progress") + return + } + var apiTask api.Task if err := json.NewDecoder(r.Body).Decode(&apiTask); err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) @@ -86,9 +118,7 @@ func (h *Handler) CreateTask(w http.ResponseWriter, r *http.Request) { response := convertInternalToAPITask(created) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusCreated) - json.NewEncoder(w).Encode(response) + writeJSON(w, http.StatusCreated, response) klog.InfoS("task created via API", "name", apiTask.Name) } @@ -99,6 +129,12 @@ func (h *Handler) SyncTasks(w http.ResponseWriter, r *http.Request) { return } + // Block task sync during reset to prevent race conditions + if h.isResetting() { + writeError(w, http.StatusServiceUnavailable, "task sync is blocked: reset operation in progress") + return + } + var apiTasks []api.Task if err := json.NewDecoder(r.Body).Decode(&apiTasks); err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) @@ -130,8 +166,7 @@ func (h *Handler) SyncTasks(w http.ResponseWriter, r *http.Request) { } } - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) + writeJSON(w, http.StatusOK, response) klog.V(1).InfoS("tasks synced via API", "count", len(response)) } @@ -158,8 +193,7 @@ func (h *Handler) GetTask(w http.ResponseWriter, r *http.Request) { response := convertInternalToAPITask(task) - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) + writeJSON(w, http.StatusOK, response) } func (h *Handler) ListTasks(w http.ResponseWriter, r *http.Request) { @@ -182,16 +216,235 @@ func (h *Handler) ListTasks(w http.ResponseWriter, r *http.Request) { } } - w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) + writeJSON(w, http.StatusOK, response) } func (h *Handler) Health(w http.ResponseWriter, r *http.Request) { response := map[string]string{ "status": "healthy", } + writeJSON(w, http.StatusOK, response) +} + +// Reset handles the reset operation for pod recycling. +// It returns immediately with the current status. If no reset is in progress, it starts a new one in a goroutine. +func (h *Handler) Reset(w http.ResponseWriter, r *http.Request) { + if h.manager == nil { + writeError(w, http.StatusInternalServerError, "task manager not initialized") + return + } + + var req api.ResetRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid request body: %v", err)) + return + } + + if !h.config.EnableSidecarMode { + klog.ErrorS(nil, "Reset is only supported in sidecar mode") + writeJSON(w, http.StatusOK, &api.ResetResponse{ + Status: api.ResetStatusNotSupported, + Message: "Reset is only supported in sidecar mode where task-executor runs as a sidecar container", + }) + return + } + + h.reset.mu.Lock() + + // Case 1: Reset already in progress -> return current status (idempotent) + if h.reset.status == api.ResetStatusInProgress { + klog.InfoS("Reset already in progress, returning current status") + resp := h.currentResetResponse() + h.reset.mu.Unlock() + writeJSON(w, http.StatusOK, resp) + return + } + + // Case 2: Reset already completed -> return terminal status + if h.reset.status == api.ResetStatusSuccess || + h.reset.status == api.ResetStatusFailed || + h.reset.status == api.ResetStatusTimeout || + h.reset.status == api.ResetStatusNotSupported { + klog.InfoS("Reset already completed, returning terminal status", "status", h.reset.status) + resp := h.currentResetResponse() + h.reset.mu.Unlock() + writeJSON(w, http.StatusOK, resp) + return + } + + // Case 3: Start a new reset + h.reset.status = api.ResetStatusInProgress + h.reset.startTime = time.Now() + h.reset.message = "Reset started" + h.reset.details = &api.ResetDetails{} + + resp := h.currentResetResponse() + + klog.InfoS("Starting reset operation", "mainContainer", req.MainContainerName) + + h.reset.mu.Unlock() + + go h.executeReset(req) + + writeJSON(w, http.StatusOK, resp) +} + +func (h *Handler) currentResetResponse() *api.ResetResponse { + return &api.ResetResponse{ + Status: h.reset.status, + Message: h.reset.message, + Details: h.reset.details, + } +} + +func (h *Handler) executeReset(req api.ResetRequest) { + defer func() { + if err := recover(); err != nil { + h.reset.mu.Lock() + if h.reset.status == api.ResetStatusInProgress { + h.reset.status = api.ResetStatusFailed + h.reset.message = fmt.Sprintf("reset goroutine panic: %v", err) + } + h.reset.mu.Unlock() + klog.ErrorS(nil, "Reset goroutine panicked", "error", err) + } + }() + + timeout := time.Duration(req.TimeoutSeconds) * time.Second + if timeout <= 0 { + timeout = 60 * time.Second + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + klog.InfoS("Starting reset operation", "timeout", timeout) + + // Step 1: Stop and clean up all tasks + stopped, err := h.manager.Clear(ctx) + if err != nil { + h.setResetFailed(fmt.Sprintf("failed to clear tasks: %v", err)) + return + } + h.reset.mu.Lock() + h.reset.details.TasksStopped = stopped + h.reset.mu.Unlock() + klog.InfoS("Stopped tasks during reset", "count", stopped) + + // Step 2: Clean task data directory + if err := h.cleanTaskDataDir(); err != nil { + h.setResetFailed(fmt.Sprintf("failed to clean task data dir: %v", err)) + return + } + + // Step 3: Clean user-specified directories + if len(req.CleanDirectories) > 0 { + cleaned, err := h.cleanDirectories(req.CleanDirectories) + if err != nil { + h.setResetFailed(fmt.Sprintf("failed to clean directories: %v", err)) + return + } + h.reset.mu.Lock() + h.reset.details.DirectoriesCleaned = cleaned + h.reset.mu.Unlock() + klog.InfoS("Cleaned directories during reset", "directories", cleaned) + } + + // Step 4: Restart main container + mainContainer := req.MainContainerName + if mainContainer == "" { + mainContainer = h.config.MainContainerName + } + if mainContainer != "" && h.executor != nil { + if err := h.executor.RestartMainContainer(ctx, mainContainer); err != nil { + h.setResetFailed(fmt.Sprintf("failed to restart main container: %v", err)) + return + } + h.reset.mu.Lock() + h.reset.details.MainContainerRestarted = true + h.reset.mu.Unlock() + klog.InfoS("Restarted main container during reset", "container", mainContainer) + } + + // Check if context was cancelled (timeout) + if ctx.Err() == context.DeadlineExceeded { + h.setResetStatus(api.ResetStatusTimeout, "reset operation timed out") + return + } + + h.reset.mu.Lock() + h.reset.status = api.ResetStatusSuccess + h.reset.message = "Reset completed successfully" + h.reset.mu.Unlock() + + klog.InfoS("Reset operation completed successfully") +} + +func (h *Handler) setResetStatus(status api.ResetStatus, message string) { + h.reset.mu.Lock() + h.reset.status = status + h.reset.message = message + h.reset.mu.Unlock() + if status == api.ResetStatusFailed || status == api.ResetStatusTimeout { + klog.ErrorS(nil, "Reset operation failed", "status", status, "message", message) + } +} + +func (h *Handler) setResetFailed(message string) { + h.setResetStatus(api.ResetStatusFailed, message) +} + +func (h *Handler) cleanTaskDataDir() error { + if h.config.DataDir == "" { + return nil + } + + entries, err := os.ReadDir(h.config.DataDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("failed to read data directory: %w", err) + } + + for _, entry := range entries { + if !entry.IsDir() { + continue + } + taskDir := filepath.Join(h.config.DataDir, entry.Name()) + if err := os.RemoveAll(taskDir); err != nil { + klog.ErrorS(err, "Failed to remove task directory", "path", taskDir) + } else { + klog.InfoS("Removed task directory", "path", taskDir) + } + } + return nil +} + +func (h *Handler) cleanDirectories(dirs []string) ([]string, error) { + var cleaned []string + for _, dir := range dirs { + matches, err := filepath.Glob(dir) + if err != nil { + klog.ErrorS(err, "Invalid glob pattern", "pattern", dir) + continue + } + + for _, match := range matches { + if err := os.RemoveAll(match); err != nil { + klog.ErrorS(err, "Failed to clean directory", "path", match) + continue + } + cleaned = append(cleaned, match) + klog.InfoS("Cleaned directory", "path", match) + } + } + return cleaned, nil +} + +func writeJSON(w http.ResponseWriter, status int, data interface{}) { w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(response) + w.WriteHeader(status) + json.NewEncoder(w).Encode(data) } func (h *Handler) DeleteTask(w http.ResponseWriter, r *http.Request) { diff --git a/kubernetes/internal/task-executor/server/handler_test.go b/kubernetes/internal/task-executor/server/handler_test.go index 3cadac5c1..f599f6965 100644 --- a/kubernetes/internal/task-executor/server/handler_test.go +++ b/kubernetes/internal/task-executor/server/handler_test.go @@ -29,6 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/config" + "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/runtime" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/types" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils" api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" @@ -99,9 +100,43 @@ func (m *MockTaskManager) Delete(ctx context.Context, id string) error { func (m *MockTaskManager) Start(ctx context.Context) {} func (m *MockTaskManager) Stop() {} +// GetExecutor returns a mock executor for testing +func (m *MockTaskManager) GetExecutor() runtime.Executor { + return &MockExecutor{} +} + +// Clear stops and cleans up all tasks (for testing) +func (m *MockTaskManager) Clear(ctx context.Context) (int, error) { + if m.err != nil { + return 0, m.err + } + count := len(m.tasks) + m.tasks = make(map[string]*types.Task) + return count, nil +} + +// MockExecutor implements runtime.Executor for testing +type MockExecutor struct{} + +func (e *MockExecutor) Start(ctx context.Context, task *types.Task) error { + return nil +} + +func (e *MockExecutor) Inspect(ctx context.Context, task *types.Task) (*types.Status, error) { + return &types.Status{}, nil +} + +func (e *MockExecutor) Stop(ctx context.Context, task *types.Task) error { + return nil +} + +func (e *MockExecutor) RestartMainContainer(ctx context.Context, mainContainerName string) error { + return nil +} + func TestHandler_Health(t *testing.T) { cfg := &config.Config{} - h := NewHandler(NewMockTaskManager(), cfg) + h := NewHandler(NewMockTaskManager(), &MockExecutor{}, cfg) req := httptest.NewRequest("GET", "/health", nil) w := httptest.NewRecorder() @@ -115,7 +150,7 @@ func TestHandler_Health(t *testing.T) { func TestHandler_CreateTask(t *testing.T) { mgr := NewMockTaskManager() cfg := &config.Config{} - h := NewHandler(mgr, cfg) + h := NewHandler(mgr, &MockExecutor{}, cfg) task := api.Task{ Name: "test-task", @@ -143,7 +178,7 @@ func TestHandler_GetTask(t *testing.T) { mgr := NewMockTaskManager() mgr.tasks["test-task"] = &types.Task{Name: "test-task"} cfg := &config.Config{} - h := NewHandler(mgr, cfg) + h := NewHandler(mgr, &MockExecutor{}, cfg) router := NewRouter(h) req := httptest.NewRequest("GET", "/tasks/test-task", nil) @@ -166,7 +201,7 @@ func TestHandler_DeleteTask(t *testing.T) { mgr := NewMockTaskManager() mgr.tasks["test-task"] = &types.Task{Name: "test-task"} cfg := &config.Config{} - h := NewHandler(mgr, cfg) + h := NewHandler(mgr, &MockExecutor{}, cfg) router := NewRouter(h) req := httptest.NewRequest("DELETE", "/tasks/test-task", nil) @@ -188,7 +223,7 @@ func TestHandler_ListTasks(t *testing.T) { mgr.tasks["task-1"] = &types.Task{Name: "task-1"} mgr.tasks["task-2"] = &types.Task{Name: "task-2"} cfg := &config.Config{} - h := NewHandler(mgr, cfg) + h := NewHandler(mgr, &MockExecutor{}, cfg) req := httptest.NewRequest("GET", "/getTasks", nil) w := httptest.NewRecorder() @@ -209,7 +244,7 @@ func TestHandler_ListTasks(t *testing.T) { func TestHandler_SyncTasks(t *testing.T) { mgr := NewMockTaskManager() cfg := &config.Config{} - h := NewHandler(mgr, cfg) + h := NewHandler(mgr, &MockExecutor{}, cfg) tasks := []api.Task{ {Name: "task-1", Process: &api.Process{}}, @@ -234,7 +269,7 @@ func TestHandler_Errors(t *testing.T) { mgr := NewMockTaskManager() mgr.err = errors.New("mock error") cfg := &config.Config{} - h := NewHandler(mgr, cfg) + h := NewHandler(mgr, &MockExecutor{}, cfg) // Create fail task := api.Task{Name: "fail"} @@ -427,3 +462,154 @@ func TestConvertInternalToAPITask_Timeout(t *testing.T) { assert.Equal(t, later.Unix(), apiTask.ProcessStatus.Terminated.FinishedAt.Unix()) }) } + +// ============================================================ +// Reset API State Machine Tests +// ============================================================ + +func TestReset_StateMachine(t *testing.T) { + t.Run("Initial state is None", func(t *testing.T) { + mgr := NewMockTaskManager() + cfg := &config.Config{EnableSidecarMode: true} + h := NewHandler(mgr, &MockExecutor{}, cfg) + + // Initial state should be None + assert.Equal(t, api.ResetStatusNone, h.reset.status) + }) + + t.Run("None -> InProgress on first call", func(t *testing.T) { + mgr := NewMockTaskManager() + cfg := &config.Config{EnableSidecarMode: true} + h := NewHandler(mgr, &MockExecutor{}, cfg) + + body, _ := json.Marshal(api.ResetRequest{}) + req := httptest.NewRequest("POST", "/reset", bytes.NewReader(body)) + w := httptest.NewRecorder() + + h.Reset(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + + var resp api.ResetResponse + json.NewDecoder(w.Body).Decode(&resp) + assert.Equal(t, api.ResetStatusInProgress, resp.Status) + + // Internal state should be InProgress + assert.Equal(t, api.ResetStatusInProgress, h.reset.status) + }) + + t.Run("InProgress -> InProgress (idempotent)", func(t *testing.T) { + mgr := NewMockTaskManager() + cfg := &config.Config{EnableSidecarMode: true} + h := NewHandler(mgr, &MockExecutor{}, cfg) + + // Set state to InProgress manually + h.reset.status = api.ResetStatusInProgress + + body, _ := json.Marshal(api.ResetRequest{}) + req := httptest.NewRequest("POST", "/reset", bytes.NewReader(body)) + w := httptest.NewRecorder() + + h.Reset(w, req) + + var resp api.ResetResponse + json.NewDecoder(w.Body).Decode(&resp) + assert.Equal(t, api.ResetStatusInProgress, resp.Status) + }) + + t.Run("Terminal status not retrigger", func(t *testing.T) { + terminalStatuses := []api.ResetStatus{ + api.ResetStatusSuccess, + api.ResetStatusFailed, + api.ResetStatusTimeout, + } + + for _, status := range terminalStatuses { + t.Run(string(status), func(t *testing.T) { + mgr := NewMockTaskManager() + cfg := &config.Config{EnableSidecarMode: true} + h := NewHandler(mgr, &MockExecutor{}, cfg) + + // Set terminal state + h.reset.status = status + h.reset.message = "previous result" + + body, _ := json.Marshal(api.ResetRequest{}) + req := httptest.NewRequest("POST", "/reset", bytes.NewReader(body)) + w := httptest.NewRecorder() + + h.Reset(w, req) + + var resp api.ResetResponse + json.NewDecoder(w.Body).Decode(&resp) + // Should return the same terminal status, not start new reset + assert.Equal(t, status, resp.Status) + assert.Equal(t, "previous result", resp.Message) + }) + } + }) + + t.Run("NotSupported in non-sidecar mode", func(t *testing.T) { + mgr := NewMockTaskManager() + cfg := &config.Config{EnableSidecarMode: false} + h := NewHandler(mgr, &MockExecutor{}, cfg) + + body, _ := json.Marshal(api.ResetRequest{}) + req := httptest.NewRequest("POST", "/reset", bytes.NewReader(body)) + w := httptest.NewRecorder() + + h.Reset(w, req) + + var resp api.ResetResponse + json.NewDecoder(w.Body).Decode(&resp) + assert.Equal(t, api.ResetStatusNotSupported, resp.Status) + }) +} + +func TestCreateTask_BlockedDuringReset(t *testing.T) { + t.Run("Create task blocked during reset", func(t *testing.T) { + mgr := NewMockTaskManager() + cfg := &config.Config{EnableSidecarMode: true} + h := NewHandler(mgr, &MockExecutor{}, cfg) + + // Set state to InProgress + h.reset.status = api.ResetStatusInProgress + + task := api.Task{Name: "blocked-task", Process: &api.Process{Command: []string{"echo"}}} + body, _ := json.Marshal(task) + req := httptest.NewRequest("POST", "/tasks", bytes.NewReader(body)) + w := httptest.NewRecorder() + + h.CreateTask(w, req) + + // Should return 503 Service Unavailable + assert.Equal(t, http.StatusServiceUnavailable, w.Code) + + // Task should not be created + _, exists := mgr.tasks["blocked-task"] + assert.False(t, exists) + }) + + t.Run("Create task allowed after reset completes", func(t *testing.T) { + mgr := NewMockTaskManager() + cfg := &config.Config{EnableSidecarMode: true} + h := NewHandler(mgr, &MockExecutor{}, cfg) + + // Set state to Success (terminal state) + h.reset.status = api.ResetStatusSuccess + + task := api.Task{Name: "allowed-task", Process: &api.Process{Command: []string{"echo"}}} + body, _ := json.Marshal(task) + req := httptest.NewRequest("POST", "/tasks", bytes.NewReader(body)) + w := httptest.NewRecorder() + + h.CreateTask(w, req) + + // Should succeed + assert.Equal(t, http.StatusCreated, w.Code) + + // Task should be created + _, exists := mgr.tasks["allowed-task"] + assert.True(t, exists) + }) +} diff --git a/kubernetes/internal/task-executor/server/router.go b/kubernetes/internal/task-executor/server/router.go index 52d3bd006..ccbbfbae5 100644 --- a/kubernetes/internal/task-executor/server/router.go +++ b/kubernetes/internal/task-executor/server/router.go @@ -27,6 +27,7 @@ func NewRouter(h *Handler) http.Handler { mux.HandleFunc("GET /tasks/{id}", h.GetTask) mux.HandleFunc("DELETE /tasks/{id}", h.DeleteTask) mux.HandleFunc("GET /health", h.Health) + mux.HandleFunc("POST /reset", h.Reset) return mux } diff --git a/kubernetes/pkg/task-executor/client.go b/kubernetes/pkg/task-executor/client.go index 57f8c271d..c5058c2e3 100644 --- a/kubernetes/pkg/task-executor/client.go +++ b/kubernetes/pkg/task-executor/client.go @@ -145,3 +145,54 @@ func (c *Client) Get(ctx context.Context) (*Task, error) { // No tasks return nil, nil } + +// Reset calls the reset API on the task-executor to prepare the pod for reuse. +// Reset is only supported in sidecar mode. +// This method is idempotent - if a reset is already in progress, it returns the current status. +func (c *Client) Reset(ctx context.Context, req *ResetRequest) (*ResetResponse, error) { + if c == nil { + return nil, fmt.Errorf("client is nil") + } + + data, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal reset request: %w", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+"/reset", bytes.NewReader(data)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("network error: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("server error: status=%d, body=%s", resp.StatusCode, string(body)) + } + + var resetResp ResetResponse + if err := json.NewDecoder(resp.Body).Decode(&resetResp); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return &resetResp, nil +} + +// NewClientWithTimeout creates a new client with a custom timeout. +func NewClientWithTimeout(baseURL string, timeout time.Duration) *Client { + if baseURL == "" { + klog.Warning("baseURL is empty, client may not work properly") + } + return &Client{ + baseURL: baseURL, + httpClient: &http.Client{ + Timeout: timeout, + }, + } +} diff --git a/kubernetes/pkg/task-executor/client_test.go b/kubernetes/pkg/task-executor/client_test.go new file mode 100644 index 000000000..733ea3d70 --- /dev/null +++ b/kubernetes/pkg/task-executor/client_test.go @@ -0,0 +1,89 @@ +// Copyright 2025 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task_executor + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func TestClient_Reset_Success(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/reset" { + t.Errorf("expected path /reset, got %s", r.URL.Path) + } + if r.Method != "POST" { + t.Errorf("expected POST, got %s", r.Method) + } + + resp := ResetResponse{ + Status: ResetStatusSuccess, + Message: "Reset completed", + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + })) + defer server.Close() + + client := NewClient(server.URL) + resp, err := client.Reset(context.Background(), &ResetRequest{}) + if err != nil { + t.Errorf("Reset() error = %v", err) + return + } + if resp.Status != ResetStatusSuccess { + t.Errorf("Reset() status = %v, want %v", resp.Status, ResetStatusSuccess) + } +} + +func TestClient_Reset_ServerError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("internal error")) + })) + defer server.Close() + + client := NewClient(server.URL) + _, err := client.Reset(context.Background(), &ResetRequest{}) + if err == nil { + t.Error("Reset() should return error on server error") + } +} + +func TestClient_Reset_Timeout(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := NewClientWithTimeout(server.URL, 10*time.Millisecond) + _, err := client.Reset(context.Background(), &ResetRequest{}) + if err == nil { + t.Error("Reset() should return error on timeout") + } +} + +func TestClient_Reset_NilClient(t *testing.T) { + var client *Client + _, err := client.Reset(context.Background(), &ResetRequest{}) + if err == nil { + t.Error("Reset() should return error for nil client") + } +} diff --git a/kubernetes/pkg/task-executor/constants.go b/kubernetes/pkg/task-executor/constants.go new file mode 100644 index 000000000..6bca2ce43 --- /dev/null +++ b/kubernetes/pkg/task-executor/constants.go @@ -0,0 +1,32 @@ +// Copyright 2025 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package task_executor + +import "os" + +const ( + // DefaultPort is the default port for task-executor API + DefaultPort = "5758" + // PortEnvVar is the environment variable name for custom port + PortEnvVar = "TASK_EXECUTOR_PORT" +) + +// GetPort returns the task-executor port from environment variable or default +func GetPort() string { + if port := os.Getenv(PortEnvVar); port != "" { + return port + } + return DefaultPort +} diff --git a/kubernetes/pkg/task-executor/types.go b/kubernetes/pkg/task-executor/types.go index 5a3ba5abc..c7d2088ca 100644 --- a/kubernetes/pkg/task-executor/types.go +++ b/kubernetes/pkg/task-executor/types.go @@ -97,3 +97,57 @@ type Terminated struct { // +optional FinishedAt metav1.Time `json:"finishedAt,omitempty"` } + +// ResetRequest defines the reset request for pod recycling. +// Reset is only supported in sidecar mode, where task-executor runs as a sidecar +// and uses nsenter to operate on the main container's namespace. +type ResetRequest struct { + // TimeoutSeconds specifies the timeout for reset operation in seconds. + // +optional + TimeoutSeconds int64 `json:"timeoutSeconds,omitempty"` + // CleanDirectories specifies directories to clean during reset. + // Supports glob patterns like "/tmp/*", "/var/cache/**". + // +optional + CleanDirectories []string `json:"cleanDirectories,omitempty"` + // MainContainerName specifies the name of the main container to restart. + // If not specified, the default main container name from config will be used. + // +optional + MainContainerName string `json:"mainContainerName,omitempty"` +} + +// ResetResponse defines the response for reset operation. +type ResetResponse struct { + // Status indicates the result of reset operation. + Status ResetStatus `json:"status"` + // Message provides additional information about the reset result. + Message string `json:"message"` + // Details provides detailed information about the reset operation. + // +optional + Details *ResetDetails `json:"details,omitempty"` +} + +// ResetStatus defines the status of reset operation. +type ResetStatus string + +const ( + // ResetStatusNone indicates no reset has been initiated yet. + // This is the initial state after task-executor starts. + ResetStatusNone ResetStatus = "None" + ResetStatusSuccess ResetStatus = "Success" + ResetStatusFailed ResetStatus = "Failed" + ResetStatusTimeout ResetStatus = "Timeout" + // ResetStatusInProgress is returned when Reset is called while a reset is already ongoing. + ResetStatusInProgress ResetStatus = "InProgress" + // ResetStatusNotSupported indicates reset is not supported in current mode (sidecar mode required). + ResetStatusNotSupported ResetStatus = "NotSupported" +) + +// ResetDetails provides detailed information about the reset operation. +type ResetDetails struct { + // TasksStopped is the number of tasks that were stopped. + TasksStopped int `json:"tasksStopped"` + // DirectoriesCleaned is the list of directories that were cleaned. + DirectoriesCleaned []string `json:"directoriesCleaned"` + // MainContainerRestarted indicates whether the main container was restarted. + MainContainerRestarted bool `json:"mainContainerRestarted"` +} diff --git a/kubernetes/test/e2e/pod_recycle_policy_test.go b/kubernetes/test/e2e/pod_recycle_policy_test.go new file mode 100644 index 000000000..9e11d61b8 --- /dev/null +++ b/kubernetes/test/e2e/pod_recycle_policy_test.go @@ -0,0 +1,633 @@ +// Copyright 2025 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/alibaba/OpenSandbox/sandbox-k8s/test/utils" +) + +// PodRecyclePolicy E2E tests for Issue #452 +// Tests the pod disposal policy when pooled BatchSandbox is deleted. +// Two policies are supported: +// - Delete (default): Pod is deleted when BatchSandbox is deleted +// - Reuse: Pod is reset and returned to pool for reuse +var _ = Describe("PodRecyclePolicy", Ordered, func() { + const testNamespace = "default" + + BeforeAll(func() { + By("waiting for controller to be ready") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pods", "-l", "control-plane=controller-manager", + "-n", namespace, "-o", "jsonpath={.items[0].status.phase}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(Equal("Running")) + }, 2*time.Minute).Should(Succeed()) + }) + + SetDefaultEventuallyTimeout(2 * time.Minute) + SetDefaultEventuallyPollingInterval(time.Second) + + Context("Delete Policy", func() { + It("should delete pods when BatchSandbox is deleted with Delete policy", func() { + const poolName = "test-pool-delete-policy" + const batchSandboxName = "test-bs-delete-policy" + + By("creating a Pool with Delete policy (default)") + poolYAML, err := renderTemplate("testdata/pool-delete-policy.yaml", map[string]interface{}{ + "PoolName": poolName, + "Namespace": testNamespace, + "TaskExecutorImage": taskExecutorImage, + }) + Expect(err).NotTo(HaveOccurred()) + + poolFile := filepath.Join("/tmp", "test-pool-delete-policy.yaml") + err = os.WriteFile(poolFile, []byte(poolYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(poolFile) + + cmd := exec.Command("kubectl", "apply", "-f", poolFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred(), "Failed to create Pool") + + By("waiting for Pool to be ready") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + totalStr, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(totalStr).NotTo(BeEmpty()) + }, 2*time.Minute).Should(Succeed()) + + By("creating a BatchSandbox using the pool") + bsYAML, err := renderTemplate("testdata/batchsandbox-pooled-no-expire.yaml", map[string]interface{}{ + "BatchSandboxName": batchSandboxName, + "Namespace": testNamespace, + "Replicas": 1, + "PoolName": poolName, + }) + Expect(err).NotTo(HaveOccurred()) + + bsFile := filepath.Join("/tmp", "test-bs-delete-policy.yaml") + err = os.WriteFile(bsFile, []byte(bsYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(bsFile) + + cmd = exec.Command("kubectl", "apply", "-f", bsFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("recording allocated pod names") + var allocatedPodNames []string + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.metadata.annotations.sandbox\\.opensandbox\\.io/alloc-status}") + allocStatusJSON, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(allocStatusJSON).NotTo(BeEmpty()) + + var allocStatus struct { + Pods []string `json:"pods"` + } + err = json.Unmarshal([]byte(allocStatusJSON), &allocStatus) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(allocStatus.Pods)).To(BeNumerically(">", 0)) + allocatedPodNames = allocStatus.Pods + }, 2*time.Minute).Should(Succeed()) + + By("deleting the BatchSandbox") + cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("verifying pods are deleted (not returned to pool)") + Eventually(func(g Gomega) { + for _, podName := range allocatedPodNames { + cmd := exec.Command("kubectl", "get", "pod", podName, "-n", testNamespace) + _, err := utils.Run(cmd) + g.Expect(err).To(HaveOccurred(), "Pod %s should be deleted", podName) + g.Expect(err.Error()).To(ContainSubstring("not found")) + } + }, 2*time.Minute).Should(Succeed()) + + By("cleaning up the Pool") + cmd = exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should use Delete policy for existing pools without podRecyclePolicy field (backward compatibility)", func() { + const poolName = "test-pool-compat" + const batchSandboxName = "test-bs-compat" + + By("creating a Pool without podRecyclePolicy field (old behavior)") + poolYAML, err := renderTemplate("testdata/pool-basic.yaml", map[string]interface{}{ + "PoolName": poolName, + "SandboxImage": sandboxImage, + "Namespace": testNamespace, + "BufferMax": 3, + "BufferMin": 1, + "PoolMax": 5, + "PoolMin": 2, + }) + Expect(err).NotTo(HaveOccurred()) + + poolFile := filepath.Join("/tmp", "test-pool-compat.yaml") + err = os.WriteFile(poolFile, []byte(poolYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(poolFile) + + cmd := exec.Command("kubectl", "apply", "-f", poolFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred(), "Failed to create Pool") + + By("waiting for Pool to be ready") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + totalStr, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(totalStr).NotTo(BeEmpty()) + }, 2*time.Minute).Should(Succeed()) + + By("creating a BatchSandbox using the pool") + bsYAML, err := renderTemplate("testdata/batchsandbox-pooled-no-expire.yaml", map[string]interface{}{ + "BatchSandboxName": batchSandboxName, + "Namespace": testNamespace, + "Replicas": 1, + "PoolName": poolName, + }) + Expect(err).NotTo(HaveOccurred()) + + bsFile := filepath.Join("/tmp", "test-bs-compat.yaml") + err = os.WriteFile(bsFile, []byte(bsYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(bsFile) + + cmd = exec.Command("kubectl", "apply", "-f", bsFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("recording allocated pod names") + var allocatedPodNames []string + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.metadata.annotations.sandbox\\.opensandbox\\.io/alloc-status}") + allocStatusJSON, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(allocStatusJSON).NotTo(BeEmpty()) + + var allocStatus struct { + Pods []string `json:"pods"` + } + err = json.Unmarshal([]byte(allocStatusJSON), &allocStatus) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(allocStatus.Pods)).To(BeNumerically(">", 0)) + allocatedPodNames = allocStatus.Pods + }, 2*time.Minute).Should(Succeed()) + + By("deleting the BatchSandbox") + cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("verifying pods are deleted (default Delete policy for backward compatibility)") + Eventually(func(g Gomega) { + for _, podName := range allocatedPodNames { + cmd := exec.Command("kubectl", "get", "pod", podName, "-n", testNamespace) + _, err := utils.Run(cmd) + g.Expect(err).To(HaveOccurred(), "Pod %s should be deleted (default Delete policy)", podName) + g.Expect(err.Error()).To(ContainSubstring("not found")) + } + }, 2*time.Minute).Should(Succeed()) + + By("cleaning up the Pool") + cmd = exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("Reuse Policy", func() { + It("should reset and return pods to pool when BatchSandbox is deleted with Reuse policy", func() { + const poolName = "test-pool-reuse-policy" + const batchSandboxName = "test-bs-reuse-policy" + + By("creating a Pool with Reuse policy and task-executor sidecar") + poolYAML, err := renderTemplate("testdata/pool-reuse-policy.yaml", map[string]interface{}{ + "PoolName": poolName, + "Namespace": testNamespace, + "TaskExecutorImage": taskExecutorImage, + "PoolMax": 1, // Use poolMax=1 so there is only 1 pod, ensuring the same pod is reused + "PoolMin": 1, + }) + Expect(err).NotTo(HaveOccurred()) + + poolFile := filepath.Join("/tmp", "test-pool-reuse-policy.yaml") + err = os.WriteFile(poolFile, []byte(poolYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(poolFile) + + cmd := exec.Command("kubectl", "apply", "-f", poolFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred(), "Failed to create Pool") + + By("waiting for Pool to be ready and pods have task-executor sidecar") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + totalStr, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(totalStr).NotTo(BeEmpty()) + + // Verify pods have the reuse-enabled label + cmd = exec.Command("kubectl", "get", "pods", "-n", testNamespace, + "-l", fmt.Sprintf("sandbox.opensandbox.io/pool-name=%s", poolName), + "-o", "jsonpath={.items[*].metadata.labels.pool\\.opensandbox\\.io/reuse-enabled}") + reuseLabels, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(reuseLabels).To(ContainSubstring("true"), "Pods should have reuse-enabled=true label") + }, 2*time.Minute).Should(Succeed()) + + By("verifying pool pods have task-executor sidecar injected") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pods", "-n", testNamespace, + "-l", fmt.Sprintf("sandbox.opensandbox.io/pool-name=%s", poolName), + "-o", "jsonpath={.items[0].spec.containers[*].name}") + containerNames, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(containerNames).To(ContainSubstring("task-executor"), "Pool pod should have task-executor sidecar") + }, 30*time.Second).Should(Succeed()) + + By("verifying pool pods have shareProcessNamespace=true") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pods", "-n", testNamespace, + "-l", fmt.Sprintf("sandbox.opensandbox.io/pool-name=%s", poolName), + "-o", "jsonpath={.items[0].spec.shareProcessNamespace}") + shareNs, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(shareNs).To(Equal("true"), "Pool pod should have shareProcessNamespace=true for nsenter support") + }, 30*time.Second).Should(Succeed()) + + By("verifying task-executor container has SYS_PTRACE capability") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pods", "-n", testNamespace, + "-l", fmt.Sprintf("sandbox.opensandbox.io/pool-name=%s", poolName), + "-o", "jsonpath={.items[0].spec.containers[?(@.name=='task-executor')].securityContext.capabilities.add}") + caps, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(caps).To(ContainSubstring("SYS_PTRACE"), "task-executor container should have SYS_PTRACE capability") + }, 30*time.Second).Should(Succeed()) + + By("recording initial pool total") + cmd = exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + initialTotalStr, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("creating a BatchSandbox using the pool") + bsYAML, err := renderTemplate("testdata/batchsandbox-pooled-no-expire.yaml", map[string]interface{}{ + "BatchSandboxName": batchSandboxName, + "Namespace": testNamespace, + "Replicas": 1, + "PoolName": poolName, + }) + Expect(err).NotTo(HaveOccurred()) + + bsFile := filepath.Join("/tmp", "test-bs-reuse-policy.yaml") + err = os.WriteFile(bsFile, []byte(bsYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(bsFile) + + cmd = exec.Command("kubectl", "apply", "-f", bsFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("recording allocated pod names") + var allocatedPodNames []string + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.metadata.annotations.sandbox\\.opensandbox\\.io/alloc-status}") + allocStatusJSON, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(allocStatusJSON).NotTo(BeEmpty()) + + var allocStatus struct { + Pods []string `json:"pods"` + } + err = json.Unmarshal([]byte(allocStatusJSON), &allocStatus) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(allocStatus.Pods)).To(BeNumerically(">", 0)) + allocatedPodNames = allocStatus.Pods + }, 2*time.Minute).Should(Succeed()) + + By("verifying BatchSandbox has pod-disposal finalizer") + cmd = exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.metadata.finalizers}") + finalizers, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + Expect(finalizers).To(ContainSubstring("batch-sandbox.sandbox.opensandbox.io/pod-disposal"), + "Pooled BatchSandbox should have pod-disposal finalizer") + + By("deleting the BatchSandbox") + cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("verifying pods still exist (returned to pool after reset)") + Eventually(func(g Gomega) { + for _, podName := range allocatedPodNames { + cmd := exec.Command("kubectl", "get", "pod", podName, "-n", testNamespace, + "-o", "jsonpath={.status.phase}") + phase, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred(), "Pod %s should still exist", podName) + g.Expect(phase).To(Equal("Running"), "Pod %s should be Running", podName) + } + }, 2*time.Minute).Should(Succeed()) + + By("verifying pool total remains the same (pods returned)") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + totalStr, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(totalStr).To(Equal(initialTotalStr), "Pool total should remain the same after pod return") + }, 30*time.Second).Should(Succeed()) + + By("verifying pods can be reallocated to a new BatchSandbox (same pod reused)") + // Create another BatchSandbox to verify the same pod is reused + const batchSandboxName2 = "test-bs-reuse-policy-2" + bsYAML2, err := renderTemplate("testdata/batchsandbox-pooled-no-expire.yaml", map[string]interface{}{ + "BatchSandboxName": batchSandboxName2, + "Namespace": testNamespace, + "Replicas": 1, + "PoolName": poolName, + }) + Expect(err).NotTo(HaveOccurred()) + + bsFile2 := filepath.Join("/tmp", "test-bs-reuse-policy-2.yaml") + err = os.WriteFile(bsFile2, []byte(bsYAML2), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(bsFile2) + + cmd = exec.Command("kubectl", "apply", "-f", bsFile2) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + var reallocatedPodNames []string + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName2, "-n", testNamespace, + "-o", "jsonpath={.metadata.annotations.sandbox\\.opensandbox\\.io/alloc-status}") + allocStatusJSON, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(allocStatusJSON).NotTo(BeEmpty()) + + var allocStatus struct { + Pods []string `json:"pods"` + } + err = json.Unmarshal([]byte(allocStatusJSON), &allocStatus) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(allocStatus.Pods)).To(Equal(1), "Second BatchSandbox should allocate 1 pod") + reallocatedPodNames = allocStatus.Pods + }, 2*time.Minute).Should(Succeed()) + + By("verifying the reallocated pod is the same pod that was previously reset (pod identity preserved)") + Expect(reallocatedPodNames).To(HaveLen(1)) + Expect(allocatedPodNames).To(HaveLen(1)) + Expect(reallocatedPodNames[0]).To(Equal(allocatedPodNames[0]), + "The same pod %s should be reused by the second BatchSandbox after reset", + allocatedPodNames[0]) + + By("cleaning up BatchSandbox") + cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName2, "-n", testNamespace) + _, _ = utils.Run(cmd) + + By("cleaning up the Pool") + cmd = exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + }) + }) +}) + +// ============================================================ +// Case 2: Multiple Pods Concurrent Reset (includes pool.status.resetting verification) +// ============================================================ + +var _ = Describe("PodRecyclePolicy - Multiple Pods Reuse", Ordered, func() { + const testNamespace = "default" + + BeforeAll(func() { + By("waiting for controller to be ready") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pods", "-l", "control-plane=controller-manager", + "-n", namespace, "-o", "jsonpath={.items[0].status.phase}") + output, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(output).To(Equal("Running")) + }, 2*time.Minute).Should(Succeed()) + }) + + SetDefaultEventuallyTimeout(3 * time.Minute) + SetDefaultEventuallyPollingInterval(time.Second) + + Context("Reuse policy with multiple pods", func() { + It("should reset multiple pods concurrently and return all to pool", func() { + const poolName = "test-pool-multi-pod" + const batchSandboxName = "test-bs-multi-pod" + + By("creating a Pool with Reuse policy (poolMin=3)") + poolYAML, err := renderTemplate("testdata/pool-reuse-multi-pod.yaml", map[string]interface{}{ + "PoolName": poolName, + "Namespace": testNamespace, + "TaskExecutorImage": taskExecutorImage, + "PoolMin": 3, + }) + Expect(err).NotTo(HaveOccurred()) + + poolFile := filepath.Join("/tmp", "test-pool-multi-pod.yaml") + err = os.WriteFile(poolFile, []byte(poolYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(poolFile) + + cmd := exec.Command("kubectl", "apply", "-f", poolFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred(), "Failed to create Pool") + + By("waiting for Pool total >= 3") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + totalStr, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + total := 0 + fmt.Sscanf(totalStr, "%d", &total) + g.Expect(total).To(BeNumerically(">=", 3)) + }, 3*time.Minute).Should(Succeed()) + + By("recording initial pool total") + cmd = exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + initialTotalStr, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("creating a BatchSandbox with replicas=3") + bsYAML, err := renderTemplate("testdata/batchsandbox-pooled-no-expire.yaml", map[string]interface{}{ + "BatchSandboxName": batchSandboxName, + "Namespace": testNamespace, + "Replicas": 3, + "PoolName": poolName, + }) + Expect(err).NotTo(HaveOccurred()) + + bsFile := filepath.Join("/tmp", "test-bs-multi-pod.yaml") + err = os.WriteFile(bsFile, []byte(bsYAML), 0644) + Expect(err).NotTo(HaveOccurred()) + defer os.Remove(bsFile) + + cmd = exec.Command("kubectl", "apply", "-f", bsFile) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("waiting for all 3 pods to be allocated") + var allocatedPodNames []string + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "batchsandbox", batchSandboxName, "-n", testNamespace, + "-o", "jsonpath={.metadata.annotations.sandbox\\.opensandbox\\.io/alloc-status}") + allocStatusJSON, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(allocStatusJSON).NotTo(BeEmpty()) + + var allocStatus struct { + Pods []string `json:"pods"` + } + err = json.Unmarshal([]byte(allocStatusJSON), &allocStatus) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(len(allocStatus.Pods)).To(Equal(3), "Should have exactly 3 pods allocated") + allocatedPodNames = allocStatus.Pods + }, 3*time.Minute).Should(Succeed()) + + By("deleting the BatchSandbox to trigger concurrent reset for all 3 pods") + cmd = exec.Command("kubectl", "delete", "batchsandbox", batchSandboxName, "-n", testNamespace) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + // Try to observe resetting > 0 (best-effort, the window may be short) + By("attempting to observe pool.status.resetting > 0 during reset (best-effort)") + resettingObserved := false + for i := 0; i < 10; i++ { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.resetting}") + resettingStr, err := utils.Run(cmd) + if err == nil { + resetting := 0 + fmt.Sscanf(resettingStr, "%d", &resetting) + if resetting > 0 { + resettingObserved = true + break + } + } + time.Sleep(500 * time.Millisecond) + } + if resettingObserved { + By("observed pool.status.resetting > 0 during reset") + } else { + By("reset completed too quickly to observe resetting > 0, skipping intermediate check") + } + + By("verifying all 3 pods still exist and are Running after reset") + Eventually(func(g Gomega) { + for _, podName := range allocatedPodNames { + cmd := exec.Command("kubectl", "get", "pod", podName, "-n", testNamespace, + "-o", "jsonpath={.status.phase}") + phase, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred(), "Pod %s should still exist after reset", podName) + g.Expect(phase).To(Equal("Running"), "Pod %s should be Running after reset", podName) + } + }, 3*time.Minute).Should(Succeed()) + + By("verifying pool total remains the same (all pods returned to pool)") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.total}") + totalStr, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(totalStr).To(Equal(initialTotalStr), "Pool total should remain unchanged after concurrent reset") + }, 30*time.Second).Should(Succeed()) + + By("verifying pool.status.resetting == 0 after reset completes") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.resetting}") + resettingStr, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + // Empty string means 0 (omitempty field) + if resettingStr != "" { + resetting := 0 + fmt.Sscanf(resettingStr, "%d", &resetting) + g.Expect(resetting).To(Equal(0), "pool.status.resetting should be 0 after reset completes") + } + }, 3*time.Minute).Should(Succeed()) + + By("verifying pool available >= 3 (all pods available again)") + Eventually(func(g Gomega) { + cmd := exec.Command("kubectl", "get", "pool", poolName, "-n", testNamespace, + "-o", "jsonpath={.status.available}") + availStr, err := utils.Run(cmd) + g.Expect(err).NotTo(HaveOccurred()) + avail := 0 + fmt.Sscanf(availStr, "%d", &avail) + g.Expect(avail).To(BeNumerically(">=", 3), "All reset pods should be available in pool") + }, 30*time.Second).Should(Succeed()) + + By("cleaning up the Pool") + cmd = exec.Command("kubectl", "delete", "pool", poolName, "-n", testNamespace) + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + }) + }) +}) + +// Helper function to check if string slice contains a string +func containsString(slice []string, s string) bool { + for _, item := range slice { + if item == s { + return true + } + } + return false +} + +// Helper function to get non-empty lines from a string +func getNonEmptyLines(s string) []string { + var lines []string + for _, line := range strings.Split(s, "\n") { + if line != "" { + lines = append(lines, line) + } + } + return lines +} diff --git a/kubernetes/test/e2e/testdata/pool-delete-policy.yaml b/kubernetes/test/e2e/testdata/pool-delete-policy.yaml new file mode 100644 index 000000000..e7aa93395 --- /dev/null +++ b/kubernetes/test/e2e/testdata/pool-delete-policy.yaml @@ -0,0 +1,19 @@ +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: Pool +metadata: + name: {{.PoolName}} + namespace: {{.Namespace}} +spec: + template: + spec: + containers: + - name: sandbox-container + image: {{.TaskExecutorImage}} + command: ["/bin/sh", "-c", "sleep infinity"] + capacitySpec: + bufferMax: 0 + bufferMin: 0 + poolMax: 5 + poolMin: 2 + # podRecyclePolicy: Delete is the default, explicitly set for clarity + podRecyclePolicy: Delete diff --git a/kubernetes/test/e2e/testdata/pool-reuse-multi-pod.yaml b/kubernetes/test/e2e/testdata/pool-reuse-multi-pod.yaml new file mode 100644 index 000000000..2e5c9ab8d --- /dev/null +++ b/kubernetes/test/e2e/testdata/pool-reuse-multi-pod.yaml @@ -0,0 +1,30 @@ +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: Pool +metadata: + name: {{.PoolName}} + namespace: {{.Namespace}} +spec: + template: + metadata: + labels: + app: sandbox + spec: + containers: + - name: sandbox-container + image: {{.TaskExecutorImage}} + command: ["/bin/sh", "-c", "sleep infinity"] + env: + - name: SANDBOX_MAIN_CONTAINER + value: "sandbox-container" + capacitySpec: + bufferMax: 0 + bufferMin: 0 + poolMax: 10 + poolMin: {{.PoolMin}} + # Reuse policy for multi-pod concurrent reset testing + podRecyclePolicy: Reuse + resetSpec: + mainContainerName: sandbox-container + cleanDirectories: + - /tmp + timeoutSeconds: 60 diff --git a/kubernetes/test/e2e/testdata/pool-reuse-no-task-executor.yaml b/kubernetes/test/e2e/testdata/pool-reuse-no-task-executor.yaml new file mode 100644 index 000000000..8860d2cb8 --- /dev/null +++ b/kubernetes/test/e2e/testdata/pool-reuse-no-task-executor.yaml @@ -0,0 +1,25 @@ +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: Pool +metadata: + name: {{.PoolName}} + namespace: {{.Namespace}} +spec: + template: + spec: + containers: + - name: sandbox-container + image: {{.SandboxImage}} + command: ["/bin/sh", "-c", "sleep infinity"] + capacitySpec: + bufferMax: 0 + bufferMin: 0 + poolMax: 5 + poolMin: 2 + # Reuse policy but without task-executor sidecar + # Controller should fallback to Delete policy + podRecyclePolicy: Reuse + resetSpec: + mainContainerName: sandbox-container + cleanDirectories: + - /tmp + timeoutSeconds: 60 diff --git a/kubernetes/test/e2e/testdata/pool-reuse-policy.yaml b/kubernetes/test/e2e/testdata/pool-reuse-policy.yaml new file mode 100644 index 000000000..7f9c46853 --- /dev/null +++ b/kubernetes/test/e2e/testdata/pool-reuse-policy.yaml @@ -0,0 +1,28 @@ +apiVersion: sandbox.opensandbox.io/v1alpha1 +kind: Pool +metadata: + name: {{.PoolName}} + namespace: {{.Namespace}} +spec: + template: + metadata: + labels: + app: sandbox + spec: + containers: + - name: sandbox-container + image: {{.TaskExecutorImage}} + command: ["/bin/sh", "-c", "sleep infinity"] + capacitySpec: + bufferMax: 0 + bufferMin: 0 + poolMax: {{.PoolMax}} + poolMin: {{.PoolMin}} + # Reuse policy - pods will be reset and returned to pool + podRecyclePolicy: Reuse + resetSpec: + mainContainerName: sandbox-container + cleanDirectories: + - /tmp + - /var/cache/* + timeoutSeconds: 60