From b3ae9163009ec3e29f95b341377c494ad0bddb6c Mon Sep 17 00:00:00 2001 From: Sean Ghaeli Date: Tue, 23 Jun 2026 23:23:43 +0000 Subject: [PATCH] Resume a paused Redshift cluster before deleting it (opt-in) A ``paused`` Redshift cluster cannot be deleted: ``delete_cluster`` raises ``InvalidClusterStateFault`` ("There is an operation running on the Cluster"), and ``RedshiftDeleteClusterOperator``'s retry loop can't recover, since a paused cluster never leaves that state on its own, so the cluster is left behind -- silently leaked until external cleanup reaps it. Add an opt-in ``resume_if_paused`` flag (default ``False``): when enabled, a paused cluster is resumed (and waited on until ``available``) before deletion. Clusters that are not paused are unaffected. Resume and delete are separate, non-transactional AWS calls: if the task fails after resuming but before deleting, the cluster is left running rather than paused. The flag is therefore opt-in and documented as such, so it is only used when leaving a paused cluster behind is the worse outcome (e.g. ephemeral test-cluster teardown). Generated-by: Claude Code (Opus via Claude Code) on behalf of Sean Ghaeli --- .../amazon/aws/operators/redshift_cluster.py | 47 ++++++++ .../aws/operators/test_redshift_cluster.py | 109 ++++++++++++++++++ 2 files changed, 156 insertions(+) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py index ea0d1272db8c6..5e7961aad90d9 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -837,6 +837,8 @@ class RedshiftDeleteClusterOperator(AwsBaseOperator[RedshiftHook]): :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state :param deferrable: Run operator in the deferrable mode. 
:param max_attempts: (Deferrable mode only) The maximum number of attempts to be made + :param resume_if_paused: If ``True``, resume a paused cluster and wait for it + to become ``available`` before deleting it. Defaults to ``False``. """ template_fields: Sequence[str] = aws_template_fields( @@ -856,6 +858,7 @@ def __init__( poll_interval: int = 30, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), max_attempts: int = 30, + resume_if_paused: bool = False, **kwargs, ): super().__init__(**kwargs) @@ -871,8 +874,52 @@ def __init__( self._attempt_interval = 15 self.deferrable = deferrable self.max_attempts = max_attempts + self.resume_if_paused = resume_if_paused + + def _resume_if_paused(self) -> None: + """ + Resume the cluster if it is paused. + + A paused Redshift cluster cannot be deleted. If the cluster is currently paused, resume it + and wait until it reaches the ``available`` state before continuing. + """ + # Gated behind the opt-in ``resume_if_paused`` flag: resume and delete are two separate, + # non-transactional AWS calls, so a failure between them would leave the cluster running. 
+ try: + cluster_state = self.hook.cluster_status(cluster_identifier=self.cluster_identifier) + except self.hook.conn.exceptions.ClusterNotFoundFault: + self.log.info( + "Cluster %s not found while checking whether resume is required.", + self.cluster_identifier, + ) + return + + if cluster_state != "paused": + self.log.info( + "Cluster %s is in state %s; skipping resume.", + self.cluster_identifier, + cluster_state, + ) + return + + self.log.info( + "Cluster %s is paused; resuming it before deletion (a paused cluster cannot be deleted).", + self.cluster_identifier, + ) + self.hook.conn.resume_cluster(ClusterIdentifier=self.cluster_identifier) + self.hook.conn.get_waiter("cluster_available").wait( + ClusterIdentifier=self.cluster_identifier, + WaiterConfig={"Delay": self.poll_interval, "MaxAttempts": self.max_attempts}, + ) def execute(self, context: Context): + # A paused cluster cannot be deleted; optionally resume it first (otherwise the retry loop + # below would exhaust against InvalidClusterStateFault and the cluster would be leaked). + # Opt-in (resume_if_paused) because resume+delete is not transactional -- see + # _resume_if_paused. 
+ if self.resume_if_paused: + self._resume_if_paused() + while self._attempts: try: self.hook.delete_cluster( diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py index db6466125a035..a767fe09f5507 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py @@ -748,6 +748,115 @@ def test_delete_cluster_without_wait_for_completion(self, mock_conn): mock_conn.cluster_status.assert_not_called() + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") + @mock.patch.object(RedshiftHook, "conn") + def test_delete_paused_cluster_resumes_first_when_opted_in(self, mock_conn, mock_cluster_status): + """With ``resume_if_paused=True`` a paused cluster is resumed (and waited on) before delete. + + A paused cluster cannot be deleted -- ``delete_cluster`` raises + ``InvalidClusterStateFault`` and no retry helps because a paused cluster never resumes on + its own, so it would be silently leaked. Opting in resumes it first. + """ + # First lookup (in _resume_if_paused) reports paused. + mock_cluster_status.return_value = "paused" + + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + wait_for_completion=False, + resume_if_paused=True, + ) + redshift_operator.execute(None) + + # Cluster was resumed and waited-on before the delete call. 
+ mock_conn.resume_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") + mock_conn.get_waiter.assert_called_once_with("cluster_available") + mock_conn.get_waiter.return_value.wait.assert_called_once_with( + ClusterIdentifier="test_cluster", + WaiterConfig={"Delay": 30, "MaxAttempts": 30}, + ) + mock_conn.delete_cluster.assert_called_once_with( + ClusterIdentifier="test_cluster", + SkipFinalClusterSnapshot=True, + FinalClusterSnapshotIdentifier="", + ) + + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") + @mock.patch.object(RedshiftHook, "conn") + def test_delete_paused_cluster_does_not_resume_by_default(self, mock_conn, mock_cluster_status): + """Resume is opt-in: a paused cluster is NOT resumed without ``resume_if_paused``. + + Resume and delete are not transactional, so auto-resuming a paused cluster could leave it + running if the task fails between the two calls. The default must therefore preserve the + prior behavior and never resume on its own. + """ + mock_cluster_status.return_value = "paused" + + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + wait_for_completion=False, + ) + redshift_operator.execute(None) + + # No resume attempted; the cluster_status lookup is not even performed. 
+ mock_conn.resume_cluster.assert_not_called() + mock_cluster_status.assert_not_called() + mock_conn.delete_cluster.assert_called_once_with( + ClusterIdentifier="test_cluster", + SkipFinalClusterSnapshot=True, + FinalClusterSnapshotIdentifier="", + ) + + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") + @mock.patch.object(RedshiftHook, "conn") + def test_delete_available_cluster_does_not_resume(self, mock_conn, mock_cluster_status): + """An already-available cluster is deleted directly, without a spurious resume.""" + mock_cluster_status.return_value = "available" + + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + wait_for_completion=False, + resume_if_paused=True, + ) + redshift_operator.execute(None) + + mock_conn.resume_cluster.assert_not_called() + mock_conn.delete_cluster.assert_called_once_with( + ClusterIdentifier="test_cluster", + SkipFinalClusterSnapshot=True, + FinalClusterSnapshotIdentifier="", + ) + + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") + @mock.patch.object(RedshiftHook, "conn") + def test_delete_missing_cluster_skips_resume(self, mock_conn, mock_cluster_status): + """A missing cluster (ClusterNotFoundFault during the status check) is ignored, not raised.""" + not_found = boto3.client("redshift").exceptions.ClusterNotFoundFault({}, "test") + mock_conn.exceptions.ClusterNotFoundFault = type(not_found) + mock_cluster_status.side_effect = not_found + + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + wait_for_completion=False, + resume_if_paused=True, + ) + redshift_operator.execute(None) + + # No resume attempted; deletion still proceeds (delete_cluster handles the missing cluster). 
+ mock_conn.resume_cluster.assert_not_called() + mock_conn.delete_cluster.assert_called_once_with( + ClusterIdentifier="test_cluster", + SkipFinalClusterSnapshot=True, + FinalClusterSnapshotIdentifier="", + ) + @mock.patch.object(RedshiftHook, "delete_cluster") @mock.patch.object(RedshiftHook, "conn") @mock.patch("time.sleep", return_value=None)