diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py index ea0d1272db8c6..fe96305c035d2 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py @@ -837,6 +837,12 @@ class RedshiftDeleteClusterOperator(AwsBaseOperator[RedshiftHook]): :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state :param deferrable: Run operator in the deferrable mode. :param max_attempts: (Deferrable mode only) The maximum number of attempts to be made + :param resume_if_paused: If True, a ``paused`` cluster is resumed (and waited on until + ``available``) before deletion, since a paused cluster cannot be deleted and would + otherwise be left behind. Defaults to ``False``. Note that resume and delete are + separate, non-transactional AWS calls: if the task fails after resuming but before + deleting, the cluster is left running rather than paused. Enable only when leaving a + paused cluster behind is the worse outcome (e.g. ephemeral test-cluster teardown). """ template_fields: Sequence[str] = aws_template_fields( @@ -856,6 +862,7 @@ def __init__( poll_interval: int = 30, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), max_attempts: int = 30, + resume_if_paused: bool = False, **kwargs, ): super().__init__(**kwargs) @@ -871,8 +878,50 @@ def __init__( self._attempt_interval = 15 self.deferrable = deferrable self.max_attempts = max_attempts + self.resume_if_paused = resume_if_paused + + def _resume_if_paused(self) -> None: + """ + Resume the cluster first if it is paused, so it can be deleted. + + A ``paused`` Redshift cluster cannot be deleted -- ``delete_cluster`` raises + ``InvalidClusterStateFault`` ("There is an operation running on the Cluster") and no + amount of retrying helps, because a paused cluster never leaves that state on its own. + Left unhandled, the cluster is silently leaked (it stays paused indefinitely until + external cleanup reaps it). Resume it and wait until it is ``available`` before deleting. + + Gated behind ``resume_if_paused`` (opt-in). Resume and delete are two separate AWS calls + and cannot be made transactional: if the task fails after the resume succeeds but before + the delete does, the cluster is left ``available`` (running) rather than paused -- the + inverse surprise. Callers opt in only when leaving a paused cluster behind is the worse + outcome (e.g. teardown of an ephemeral test cluster). + """ + try: + cluster_state = self.hook.cluster_status(cluster_identifier=self.cluster_identifier) + except self.hook.conn.exceptions.ClusterNotFoundFault: + return + + if cluster_state != "paused": + return + + self.log.info( + "Cluster %s is paused; resuming it before deletion (a paused cluster cannot be deleted).", + self.cluster_identifier, + ) + self.hook.conn.resume_cluster(ClusterIdentifier=self.cluster_identifier) + self.hook.conn.get_waiter("cluster_available").wait( + ClusterIdentifier=self.cluster_identifier, + WaiterConfig={"Delay": self.poll_interval, "MaxAttempts": self.max_attempts}, + ) def execute(self, context: Context): + # A paused cluster cannot be deleted; optionally resume it first (otherwise the retry loop + # below would exhaust against InvalidClusterStateFault and the cluster would be leaked). + # Opt-in (resume_if_paused) because resume+delete is not transactional -- see + # _resume_if_paused. + if self.resume_if_paused: + self._resume_if_paused() + while self._attempts: try: self.hook.delete_cluster( diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py index db6466125a035..281662a56cdf6 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py @@ -748,6 +748,90 @@ def test_delete_cluster_without_wait_for_completion(self, mock_conn): mock_conn.cluster_status.assert_not_called() + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") + @mock.patch.object(RedshiftHook, "conn") + def test_delete_paused_cluster_resumes_first_when_opted_in(self, mock_conn, mock_cluster_status): + """With ``resume_if_paused=True`` a paused cluster is resumed (and waited on) before delete. + + A paused cluster cannot be deleted -- ``delete_cluster`` raises + ``InvalidClusterStateFault`` and no retry helps because a paused cluster never resumes on + its own, so it would be silently leaked. Opting in resumes it first. + """ + # First lookup (in _resume_if_paused) reports paused. + mock_cluster_status.return_value = "paused" + + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + wait_for_completion=False, + resume_if_paused=True, + ) + redshift_operator.execute(None) + + # Cluster was resumed and waited-on before the delete call. + mock_conn.resume_cluster.assert_called_once_with(ClusterIdentifier="test_cluster") + mock_conn.get_waiter.assert_called_once_with("cluster_available") + mock_conn.get_waiter.return_value.wait.assert_called_once_with( + ClusterIdentifier="test_cluster", + WaiterConfig={"Delay": 30, "MaxAttempts": 30}, + ) + mock_conn.delete_cluster.assert_called_once_with( + ClusterIdentifier="test_cluster", + SkipFinalClusterSnapshot=True, + FinalClusterSnapshotIdentifier="", + ) + + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") + @mock.patch.object(RedshiftHook, "conn") + def test_delete_paused_cluster_does_not_resume_by_default(self, mock_conn, mock_cluster_status): + """Default behavior is opt-out: a paused cluster is NOT resumed without ``resume_if_paused``. + + Resume and delete are not transactional, so auto-resuming a paused cluster could leave it + running if the task fails between the two calls. The default must therefore preserve the + prior behavior and never resume on its own. + """ + mock_cluster_status.return_value = "paused" + + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + wait_for_completion=False, + ) + redshift_operator.execute(None) + + # No resume attempted; the cluster_status lookup is not even performed. + mock_conn.resume_cluster.assert_not_called() + mock_cluster_status.assert_not_called() + mock_conn.delete_cluster.assert_called_once_with( + ClusterIdentifier="test_cluster", + SkipFinalClusterSnapshot=True, + FinalClusterSnapshotIdentifier="", + ) + + @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status") + @mock.patch.object(RedshiftHook, "conn") + def test_delete_available_cluster_does_not_resume(self, mock_conn, mock_cluster_status): + """An already-available cluster is deleted directly, without a spurious resume.""" + mock_cluster_status.return_value = "available" + + redshift_operator = RedshiftDeleteClusterOperator( + task_id="task_test", + cluster_identifier="test_cluster", + aws_conn_id="aws_conn_test", + wait_for_completion=False, + resume_if_paused=True, + ) + redshift_operator.execute(None) + + mock_conn.resume_cluster.assert_not_called() + mock_conn.delete_cluster.assert_called_once_with( + ClusterIdentifier="test_cluster", + SkipFinalClusterSnapshot=True, + FinalClusterSnapshotIdentifier="", + ) + @mock.patch.object(RedshiftHook, "delete_cluster") @mock.patch.object(RedshiftHook, "conn") @mock.patch("time.sleep", return_value=None)