Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,39 @@ def __init__(
self.deferrable = deferrable
self.max_attempts = max_attempts

def _resume_if_paused(self) -> None:
    """
    Bring a paused cluster back to ``available`` so that it can then be deleted.

    ``delete_cluster`` rejects a ``paused`` Redshift cluster with
    ``InvalidClusterStateFault`` ("There is an operation running on the Cluster"). Retrying
    does not help, because a paused cluster never leaves that state on its own, so without
    this step the cluster would be silently leaked (left paused until some external cleanup
    reaps it).

    Does nothing when the status lookup raises ``ClusterNotFoundFault`` or when the reported
    status is anything other than ``"paused"``. Otherwise it issues ``resume_cluster`` and
    blocks on the ``cluster_available`` waiter, polling every ``self.poll_interval`` for at
    most ``self.max_attempts`` attempts.
    """
    identifier = self.cluster_identifier
    try:
        status = self.hook.cluster_status(cluster_identifier=identifier)
    except self.hook.conn.exceptions.ClusterNotFoundFault:
        # The lookup reported no such cluster, so there is nothing to resume.
        return

    if status == "paused":
        self.log.info(
            "Cluster %s is paused; resuming it before deletion (a paused cluster cannot be deleted).",
            identifier,
        )
        self.hook.conn.resume_cluster(ClusterIdentifier=identifier)

        # Resuming is asynchronous; wait for the cluster to report ``available`` before
        # handing control back to the caller.
        available_waiter = self.hook.conn.get_waiter("cluster_available")
        polling = {"Delay": self.poll_interval, "MaxAttempts": self.max_attempts}
        available_waiter.wait(ClusterIdentifier=identifier, WaiterConfig=polling)

def execute(self, context: Context):
# A paused cluster cannot be deleted; resume it first (otherwise the retry loop below
# would exhaust against InvalidClusterStateFault and the cluster would be leaked).
self._resume_if_paused()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do something to ensure this stays transactional? If we resume the cluster and then fail at some point before the deletion completes, the cluster is left running unexpectedly when the user thought it was paused (essentially the inverse of the situation we find ourselves in now).

If we can't ensure the operation is transactional, we should at least make this behaviour opt-in.


while self._attempts:
try:
self.hook.delete_cluster(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,60 @@ def test_delete_cluster_without_wait_for_completion(self, mock_conn):

mock_conn.cluster_status.assert_not_called()

@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
@mock.patch.object(RedshiftHook, "conn")
def test_delete_paused_cluster_resumes_first(self, mock_conn, mock_cluster_status):
    """A paused cluster must be resumed (and wait for available) before it can be deleted.

    A paused cluster cannot be deleted -- ``delete_cluster`` raises
    ``InvalidClusterStateFault`` and no retry helps because a paused cluster never resumes on
    its own, so it would be silently leaked. The operator resumes it first.

    Besides checking that each client call happened exactly once with the expected
    arguments, this test also pins the *order* resume -> wait -> delete, since issuing the
    delete before the cluster is available is exactly the failure being guarded against.
    """
    # First lookup (in _resume_if_paused) reports paused.
    mock_cluster_status.return_value = "paused"

    redshift_operator = RedshiftDeleteClusterOperator(
        task_id="task_test",
        cluster_identifier="test_cluster",
        aws_conn_id="aws_conn_test",
        wait_for_completion=False,
    )
    redshift_operator.execute(None)

    # Each client call happened exactly once, with the expected arguments.
    mock_conn.resume_cluster.assert_called_once_with(ClusterIdentifier="test_cluster")
    mock_conn.get_waiter.assert_called_once_with("cluster_available")
    mock_conn.get_waiter.return_value.wait.assert_called_once_with(
        ClusterIdentifier="test_cluster",
        WaiterConfig={"Delay": 30, "MaxAttempts": 30},
    )
    mock_conn.delete_cluster.assert_called_once_with(
        ClusterIdentifier="test_cluster",
        SkipFinalClusterSnapshot=True,
        FinalClusterSnapshotIdentifier="",
    )

    # The per-call assertions above say nothing about ordering, so verify explicitly that
    # the cluster was resumed and waited on *before* the delete call was issued.
    # ``mock_calls`` records calls on the client and on its return values chronologically.
    recorded_calls = mock_conn.mock_calls
    resume_position = recorded_calls.index(mock.call.resume_cluster(ClusterIdentifier="test_cluster"))
    wait_position = recorded_calls.index(
        mock.call.get_waiter().wait(
            ClusterIdentifier="test_cluster",
            WaiterConfig={"Delay": 30, "MaxAttempts": 30},
        )
    )
    delete_position = recorded_calls.index(
        mock.call.delete_cluster(
            ClusterIdentifier="test_cluster",
            SkipFinalClusterSnapshot=True,
            FinalClusterSnapshotIdentifier="",
        )
    )
    assert resume_position < wait_position < delete_position

@mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
@mock.patch.object(RedshiftHook, "conn")
def test_delete_available_cluster_does_not_resume(self, mock_conn, mock_cluster_status):
    """A cluster reported as ``available`` goes straight to deletion; no resume is issued."""
    # The status lookup reports a cluster that is not paused.
    mock_cluster_status.return_value = "available"

    operator_kwargs = {
        "task_id": "task_test",
        "cluster_identifier": "test_cluster",
        "aws_conn_id": "aws_conn_test",
        "wait_for_completion": False,
    }
    RedshiftDeleteClusterOperator(**operator_kwargs).execute(None)

    # No resume request was made, and the delete went out once with the expected arguments.
    expected_delete_kwargs = {
        "ClusterIdentifier": "test_cluster",
        "SkipFinalClusterSnapshot": True,
        "FinalClusterSnapshotIdentifier": "",
    }
    mock_conn.resume_cluster.assert_not_called()
    mock_conn.delete_cluster.assert_called_once_with(**expected_delete_kwargs)

@mock.patch.object(RedshiftHook, "delete_cluster")
@mock.patch.object(RedshiftHook, "conn")
@mock.patch("time.sleep", return_value=None)
Expand Down
Loading