From eb79c96e98e462e45cd2388fdb91ddbc8ba89469 Mon Sep 17 00:00:00 2001 From: Joe Li Date: Wed, 17 Jun 2026 15:53:44 -0700 Subject: [PATCH 1/2] fix(reports): bound webhook notification retry wall-clock time Webhook notification retries used backoff without a max_time bound, so a hanging or persistently-failing target could stall a worker for minutes per bad URL (up to ~5 socket waits at timeout=60 plus retry sleeps), starving sequential report dispatch. Add max_time=120 to the backoff decorator on WebhookNotification.send. factor/base/max_tries are unchanged, so legitimately-transient 5xx targets are still retried; max_time only caps total wall-clock so a bad target cannot monopolize a worker. max_time is checked between attempts, so the final in-flight request still runs its full timeout. Co-Authored-By: Claude Opus 4.8 (1M context) --- superset/reports/notifications/webhook.py | 14 +- .../reports/notifications/webhook_tests.py | 170 ++++++++++++++++++ 2 files changed, 183 insertions(+), 1 deletion(-) diff --git a/superset/reports/notifications/webhook.py b/superset/reports/notifications/webhook.py index c2e660765b6b..8f592ea5709c 100644 --- a/superset/reports/notifications/webhook.py +++ b/superset/reports/notifications/webhook.py @@ -128,7 +128,19 @@ def _validate_webhook_url(self, url: str) -> None: raise NotificationParamException("Webhook URL target host is not allowed.") @backoff.on_exception( - backoff.expo, NotificationUnprocessableException, factor=10, base=2, max_tries=5 + backoff.expo, + NotificationUnprocessableException, + factor=10, + base=2, + max_tries=5, + # Bound total wall-clock retry time. Without this, a hanging or + # persistently failing target can stall a worker for minutes per bad URL + # (up to ~5 socket waits at timeout=60 plus ~150s of retry sleeps), + # starving sequential report dispatch. max_time is checked between + # attempts, so the final in-flight request can still run its full + # timeout; factor is intentionally kept at 10 so legitimately-transient + # 5xx targets are not abandoned early. + max_time=120, ) @statsd_gauge("reports.webhook.send") def send(self) -> None: diff --git a/tests/unit_tests/reports/notifications/webhook_tests.py b/tests/unit_tests/reports/notifications/webhook_tests.py index da985baa6f28..b97695fd653d 100644 --- a/tests/unit_tests/reports/notifications/webhook_tests.py +++ b/tests/unit_tests/reports/notifications/webhook_tests.py @@ -16,11 +16,14 @@ # under the License. +import datetime as datetime_module + import pandas as pd import pytest from superset.reports.notifications.exceptions import ( NotificationParamException, + NotificationUnprocessableException, ) from superset.reports.notifications.webhook import WebhookNotification from superset.utils.core import HeaderDataType @@ -269,3 +272,170 @@ class MockResponse: with pytest.raises(NotificationParamException, match="redirect"): webhook_notification.send() + + +class _FakeBackoffDatetime: + """ + Drop-in for the ``datetime`` *module* referenced as ``backoff._sync.datetime``. + + backoff 2.2.1 computes the ``max_time`` elapsed via + ``datetime.datetime.now()`` inside ``backoff._sync`` (NOT via ``time``), so + patching only ``backoff._sync.time.sleep`` leaves ``elapsed`` pinned at 0 and + ``max_time`` never fires. This fake advances the clock a fixed ``step`` per + ``now()`` call so the wall-time bound is observable in a sub-second test. + A ``step`` of 0 holds the clock flat (elapsed stays 0). + """ + + def __init__(self, step_seconds: float) -> None: + base = datetime_module.datetime(2020, 1, 1) + state = {"calls": 0} + + class _FakeDateTime: + @staticmethod + def now() -> datetime_module.datetime: + offset = datetime_module.timedelta( + seconds=step_seconds * state["calls"] + ) + state["calls"] += 1 + return base + offset + + self.datetime = _FakeDateTime + + +def _make_webhook(mock_header_data) -> WebhookNotification: + from superset.reports.models import ReportRecipients, ReportRecipientType + from superset.reports.notifications.base import NotificationContent + + content = NotificationContent( + name="test alert", header_data=mock_header_data, description="Test description" + ) + return WebhookNotification( + recipient=ReportRecipients( + type=ReportRecipientType.WEBHOOK, + recipient_config_json='{"target": "https://example.com/webhook"}', + ), + content=content, + ) + + +class _MockServerErrorResponse: + status_code = 500 + text = "" + + +def _allow_internal_app() -> type: + class MockCurrentApp: + config = { + "ALERT_REPORTS_WEBHOOK_HTTPS_ONLY": True, + "ALERT_REPORTS_WEBHOOK_ALLOW_INTERNAL_HOSTS": True, + } + + return MockCurrentApp + + +def test_send_backoff_bounded_by_max_time(monkeypatch, mock_header_data) -> None: + """ + A persistently failing (500) target gives up on wall-time (``max_time``), + not just ``max_tries``. With the fake clock stepping +50s per backoff sample, + elapsed crosses ``max_time=120`` between the 2nd and 3rd POST, so exactly 3 + POSTs happen (distinct from ``max_tries=5``). The terminal exception type is + unchanged on giveup. + """ + webhook_notification = _make_webhook(mock_header_data) + post_calls: list[int] = [] + + def fake_post(*args, **kwargs) -> _MockServerErrorResponse: + post_calls.append(1) + return _MockServerErrorResponse() + + monkeypatch.setattr( + "superset.reports.notifications.webhook.current_app", _allow_internal_app() + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled", + lambda flag: True, + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.requests.post", fake_post + ) + monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None) + monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(50)) + + with pytest.raises(NotificationUnprocessableException): + webhook_notification.send() + + assert len(post_calls) == 3 + + +def test_send_flat_clock_falls_back_to_max_tries(monkeypatch, mock_header_data) -> None: + """ + Characterization (NOT a RED discriminator): with the clock held flat, + ``max_time`` can never fire, so ``max_tries=5`` governs and exactly 5 POSTs + happen. Passes on both buggy and fixed code; its job is to prove the 3-vs-5 + delta in ``test_send_backoff_bounded_by_max_time`` is attributable to + wall-time, not to ``max_tries``. + """ + webhook_notification = _make_webhook(mock_header_data) + post_calls: list[int] = [] + + def fake_post(*args, **kwargs) -> _MockServerErrorResponse: + post_calls.append(1) + return _MockServerErrorResponse() + + monkeypatch.setattr( + "superset.reports.notifications.webhook.current_app", _allow_internal_app() + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled", + lambda flag: True, + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.requests.post", fake_post + ) + monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None) + monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(0)) + + with pytest.raises(NotificationUnprocessableException): + webhook_notification.send() + + assert len(post_calls) == 5 + + +def test_send_max_time_does_not_abandon_recovering_target( + monkeypatch, mock_header_data +) -> None: + """ + No-regression guard: a target that fails twice (500) then succeeds on the + 3rd attempt — cumulative elapsed well under ``max_time`` — still succeeds. + Confirms ``max_time=120`` is not set so low that it abandons a target that + recovers within the normal retry window. + """ + webhook_notification = _make_webhook(mock_header_data) + post_calls: list[int] = [] + + class _OkResponse: + status_code = 200 + text = "" + + def fake_post(*args, **kwargs): + post_calls.append(1) + if len(post_calls) < 3: + return _MockServerErrorResponse() + return _OkResponse() + + monkeypatch.setattr( + "superset.reports.notifications.webhook.current_app", _allow_internal_app() + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled", + lambda flag: True, + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.requests.post", fake_post + ) + monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None) + monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(10)) + + webhook_notification.send() + + assert len(post_calls) == 3 From 96477b390af1c2967f5a4705302ed90d357e5aef Mon Sep 17 00:00:00 2001 From: Joe Li Date: Wed, 17 Jun 2026 15:54:23 -0700 Subject: [PATCH 2/2] fix(reports): handle missing executor user and revert Slack v2 atomically Two robustness fixes in the alert/report execution command: 1. Missing executor user: when the configured executor cannot be resolved (security_manager.find_user returns None), the content-generation sites (_get_screenshots / _get_csv_data / _get_embedded_data) now raise a dedicated ReportScheduleExecutorNotFoundError instead of failing later with an opaque NoneType error. The guard lives at the content sites so it raises inside the state machine's error envelope, preserving the ERROR execution-log row and the owner error notification. The run() boundary continues to delegate to the state machine (a missing user is tolerated there, matching prior behavior) so visibility is unchanged. 2. Slack v2 migration revert: update_report_schedule_slack_v2 now snapshots every recipient it mutates and reverts all of them on failure, instead of only the loop variable, and no longer raises UnboundLocalError when the failure occurs before the loop binds a recipient. Co-Authored-By: Claude Opus 4.8 (1M context) --- superset/commands/report/exceptions.py | 14 ++ superset/commands/report/execute.py | 76 +++++-- .../commands/report/execute_test.py | 192 ++++++++++++++++++ 3 files changed, 264 insertions(+), 18 deletions(-) diff --git a/superset/commands/report/exceptions.py b/superset/commands/report/exceptions.py index b51c2d1cd1f9..318e3df1175e 100644 --- a/superset/commands/report/exceptions.py +++ b/superset/commands/report/exceptions.py @@ -16,6 +16,7 @@ # under the License. import math +from typing import Optional from flask_babel import lazy_gettext as _ @@ -200,6 +201,19 @@ class ReportScheduleDataFrameFailedError(CommandException): message = _("Report Schedule execution failed when generating a dataframe.") +class ReportScheduleExecutorNotFoundError(CommandException): + status = 422 + + def __init__(self, username: str = "", exception: Optional[Exception] = None): + super().__init__( + _( + "Report Schedule executor user %(username)s was not found.", + username=f'"{username}"' if username else "", + ), + exception, + ) + + class ReportScheduleExecuteUnexpectedError(CommandException): message = _("Report Schedule execution got an unexpected error.") diff --git a/superset/commands/report/execute.py b/superset/commands/report/execute.py index fa5fb9422bdc..738276e6ee04 100644 --- a/superset/commands/report/execute.py +++ b/superset/commands/report/execute.py @@ -17,7 +17,7 @@ import logging from collections.abc import Sequence from datetime import datetime, timedelta -from typing import Any, Optional, Union +from typing import Any, Optional, TYPE_CHECKING, Union from uuid import UUID import pandas as pd @@ -37,6 +37,7 @@ ReportScheduleDataFrameFailedError, ReportScheduleDataFrameTimeout, ReportScheduleExecuteUnexpectedError, + ReportScheduleExecutorNotFoundError, ReportScheduleNotFoundError, ReportSchedulePreviousWorkingError, ReportScheduleScreenshotFailedError, @@ -82,9 +83,36 @@ from superset.utils.slack import get_channels_with_search, SlackChannelTypes from superset.utils.urls import get_url_path +if TYPE_CHECKING: + from flask_appbuilder.security.sqla.models import User + logger = logging.getLogger(__name__) +def resolve_executor_user(model: ReportSchedule) -> tuple["User", str]: + """ + Resolve the executor user for a report schedule. + + Determines the configured executor username via ``get_executor`` and looks up + the corresponding user. A deleted/disabled user or a misconfigured + ``ALERT_REPORTS_EXECUTORS`` makes ``security_manager.find_user`` return + ``None``; rather than passing ``None`` into the webdriver/auth flow (which + fails with an opaque NoneType error), raise a dedicated, actionable error. + + :returns: the ``(user, username)`` pair — the username is returned alongside + the user because several call sites log it after resolution. + :raises ReportScheduleExecutorNotFoundError: if the executor user is missing. + """ + _, username = get_executor( + executors=app.config["ALERT_REPORTS_EXECUTORS"], + model=model, + ) + user = security_manager.find_user(username) + if user is None: + raise ReportScheduleExecutorNotFoundError(username) + return user, username + + class BaseReportState: current_states: list[ReportState] = [] initial: bool = False @@ -135,9 +163,17 @@ def update_report_schedule_slack_v2(self) -> None: Update the report schedule type and channels for all slack recipients to v2. V2 uses ids instead of names for channels. """ + # Track each recipient mutated in this pass with its original (type, + # config) so a partial failure can revert ALL of them — not just the + # loop variable. Restoring the values to their loaded state keeps the + # persisted rows unchanged regardless of any intervening commit. + mutated: list[tuple[ReportRecipients, ReportRecipientType, str]] = [] try: for recipient in self._report_schedule.recipients: if recipient.type == ReportRecipientType.SLACK: + mutated.append( + (recipient, recipient.type, recipient.recipient_config_json) + ) recipient.type = ReportRecipientType.SLACKV2 slack_recipients = json.loads(recipient.recipient_config_json) # V1 method allowed to use leading `#` in the channel name @@ -169,8 +205,15 @@ def update_report_schedule_slack_v2(self) -> None: } ) except Exception as ex: - # Revert to v1 to preserve configuration (requires manual fix) - recipient.type = ReportRecipientType.SLACK + # Revert every mutated recipient to v1 (both type AND config) to + # preserve configuration (requires manual fix). Reverting the full + # set — not just the loop variable — keeps earlier recipients + # consistent; iterating the snapshot also avoids the UnboundLocalError + # that a bare loop-variable reference raises on a pre-iteration + # failure (which would mask the real error). + for reverted_recipient, original_type, original_config in mutated: + reverted_recipient.type = original_type + reverted_recipient.recipient_config_json = original_config msg = f"Failed to update slack recipients to v2: {str(ex)}" logger.exception(msg) raise UpdateFailedError(msg) from ex @@ -423,11 +466,7 @@ def _get_screenshots(self) -> list[bytes]: """ start_time = datetime.utcnow() - _, username = get_executor( - executors=app.config["ALERT_REPORTS_EXECUTORS"], - model=self._report_schedule, - ) - user = security_manager.find_user(username) + user, username = resolve_executor_user(self._report_schedule) max_width = app.config["ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH"] @@ -509,11 +548,7 @@ def _get_pdf(self) -> bytes: def _get_csv_data(self) -> bytes: start_time = datetime.utcnow() url = self._get_url(result_format=ChartDataResultFormat.CSV) - _, username = get_executor( - executors=app.config["ALERT_REPORTS_EXECUTORS"], - model=self._report_schedule, - ) - user = security_manager.find_user(username) + user, username = resolve_executor_user(self._report_schedule) auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user) if self._report_schedule.chart.query_context is None: @@ -559,11 +594,7 @@ def _get_embedded_data(self) -> pd.DataFrame: start_time = datetime.utcnow() url = self._get_url(result_format=ChartDataResultFormat.JSON) - _, username = get_executor( - executors=app.config["ALERT_REPORTS_EXECUTORS"], - model=self._report_schedule, - ) - user = security_manager.find_user(username) + user, username = resolve_executor_user(self._report_schedule) auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user) if self._report_schedule.chart.query_context is None: @@ -1164,6 +1195,15 @@ def run(self) -> None: if not self._model: raise ReportScheduleExecuteUnexpectedError() + # Resolve the executor at the run() boundary the same way master + # does: tolerate a missing user (find_user -> None) so the state + # machine still runs and its error envelope writes the ERROR + # execution-log row and sends the owner notification. The dedicated + # ReportScheduleExecutorNotFoundError guard lives at the content + # sites (_get_screenshots / _get_csv_data / _get_embedded_data), + # which raise inside that envelope. Guarding here instead would + # surface a 422 above the state machine, suppressing both the log + # row and the owner notification. _, username = get_executor( executors=app.config["ALERT_REPORTS_EXECUTORS"], model=self._model, diff --git a/tests/unit_tests/commands/report/execute_test.py b/tests/unit_tests/commands/report/execute_test.py index a86c5dfffc35..7ded647207db 100644 --- a/tests/unit_tests/commands/report/execute_test.py +++ b/tests/unit_tests/commands/report/execute_test.py @@ -28,6 +28,7 @@ from superset.commands.report.exceptions import ( ReportScheduleAlertGracePeriodError, ReportScheduleCsvFailedError, + ReportScheduleExecutorNotFoundError, ReportSchedulePreviousWorkingError, ReportScheduleScreenshotFailedError, ReportScheduleScreenshotTimeout, @@ -995,6 +996,81 @@ def test_screenshot_width_calculation( ) +def _executor_report_state(mocker: MockerFixture) -> BaseReportState: + report_schedule = create_report_schedule(mocker) + # _get_csv_data/_get_embedded_data build a chart-data URL from chart_id + # before resolving the executor; give it a concrete value so URL building + # succeeds and the executor resolution is actually reached. + report_schedule.chart_id = 1 + report_schedule.force_screenshot = False + return BaseReportState( + report_schedule=report_schedule, + scheduled_dttm=datetime.now(), + execution_id=UUID("084e7ee6-5557-4ecd-9632-b7f39c9ec524"), + ) + + +@pytest.mark.parametrize( + "method_name", + ["_get_screenshots", "_get_csv_data", "_get_embedded_data"], +) +def test_get_content_raises_when_executor_user_missing( + app: SupersetApp, mocker: MockerFixture, method_name: str +) -> None: + """ + When the configured executor user cannot be resolved + (``security_manager.find_user`` returns ``None``), each content path raises a + dedicated ``ReportScheduleExecutorNotFoundError`` naming the username, rather + than passing ``None`` downstream and failing with an opaque NoneType error. + """ + app.config.update( + { + "ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH": 1600, + "WEBDRIVER_WINDOW": {"slice": (800, 600), "dashboard": (800, 600)}, + "ALERT_REPORTS_EXECUTORS": {}, + } + ) + report_state = _executor_report_state(mocker) + + with ( + patch("superset.commands.report.execute.security_manager") as mock_sm, + patch("superset.commands.report.execute.get_executor") as mock_get_executor, + patch("superset.commands.report.execute.machine_auth_provider_factory"), + ): + mock_get_executor.return_value = ("executor", "ghost_user") + mock_sm.find_user = mocker.MagicMock(return_value=None) + + with pytest.raises(ReportScheduleExecutorNotFoundError, match="ghost_user"): + getattr(report_state, method_name)() + + +def test_resolve_executor_user_returns_user_and_username( + app: SupersetApp, mocker: MockerFixture +) -> None: + """ + Happy path: when the executor user exists, the helper returns the + ``(user, username)`` tuple unchanged — locking the no-behavior-change exit + criterion for the four call sites. + """ + from superset.commands.report.execute import resolve_executor_user + + app.config.update({"ALERT_REPORTS_EXECUTORS": {}}) + report_schedule = create_report_schedule(mocker) + mock_user = mocker.MagicMock() + + with ( + patch("superset.commands.report.execute.security_manager") as mock_sm, + patch("superset.commands.report.execute.get_executor") as mock_get_executor, + ): + mock_get_executor.return_value = ("executor", "real_user") + mock_sm.find_user = mocker.MagicMock(return_value=mock_user) + + user, username = resolve_executor_user(report_schedule) + + assert user is mock_user + assert username == "real_user" + + def test_update_recipient_to_slack_v2(mocker: MockerFixture): """ Test converting a Slack recipient to Slack v2 format. @@ -1070,6 +1146,122 @@ def test_update_recipient_to_slack_v2_missing_channels(mocker: MockerFixture): mock_cmmd.update_report_schedule_slack_v2() +def test_update_recipient_to_slack_v2_reverts_all_on_partial_failure( + mocker: MockerFixture, +) -> None: + """ + When the second of two Slack recipients fails channel resolution, BOTH + recipients are fully reverted — type AND exact original + ``recipient_config_json`` string — not just the loop variable's type. This + prevents the intervening ``create_log`` commit from flushing a half-migrated, + inconsistent state. + """ + + def channels_side_effect(search_string, types, exact_match): + if search_string == "Channel-1": + return [ + { + "id": "id_channel_1", + "name": "Channel-1", + "is_member": True, + "is_private": False, + } + ] + # Second recipient: no channel found → length mismatch → UpdateFailedError + return [] + + mocker.patch( + "superset.commands.report.execute.get_channels_with_search", + side_effect=channels_side_effect, + ) + original_config_1 = json.dumps({"target": "Channel-1"}) + original_config_2 = json.dumps({"target": "Channel-2"}) + mock_report_schedule = ReportSchedule( + name="Test Report", + recipients=[ + ReportRecipients( + type=ReportRecipientType.SLACK, + recipient_config_json=original_config_1, + ), + ReportRecipients( + type=ReportRecipientType.SLACK, + recipient_config_json=original_config_2, + ), + ], + ) + + mock_cmmd = BaseReportState( + mock_report_schedule, "January 1, 2021", "execution_id_example" + ) + + with pytest.raises(UpdateFailedError): + mock_cmmd.update_report_schedule_slack_v2() + + first, second = mock_report_schedule.recipients + # The first recipient was mutated to v2 before the second failed; it must be + # reverted to its exact original type AND config string. + assert first.type == ReportRecipientType.SLACK + assert first.recipient_config_json == original_config_1 + assert second.type == ReportRecipientType.SLACK + assert second.recipient_config_json == original_config_2 + + +def test_update_recipient_to_slack_v2_pre_iteration_failure( + mocker: MockerFixture, +) -> None: + """ + A failure raised while accessing/iterating the recipients (before the loop + variable is bound) surfaces as ``UpdateFailedError``, not a ``NameError`` + that would mask the real error. + """ + + class _ExplodingRecipients: + def __iter__(self): + raise RuntimeError("recipients exploded") + + mock_report_schedule = mocker.MagicMock() + mock_report_schedule.recipients = _ExplodingRecipients() + + mock_cmmd = BaseReportState( + mock_report_schedule, "January 1, 2021", "execution_id_example" + ) + + with pytest.raises(UpdateFailedError): + mock_cmmd.update_report_schedule_slack_v2() + + +def test_update_recipient_to_slack_v2_no_slack_recipients_is_noop( + mocker: MockerFixture, +) -> None: + """ + With no SLACK recipients there is nothing to migrate: the method returns + without raising and leaves the non-Slack recipients untouched. + """ + mock_search = mocker.patch( + "superset.commands.report.execute.get_channels_with_search", + ) + mock_report_schedule = ReportSchedule( + recipients=[ + ReportRecipients( + type=ReportRecipientType.EMAIL, + recipient_config_json=json.dumps({"target": "user@example.com"}), + ), + ], + ) + + mock_cmmd: BaseReportState = BaseReportState( + mock_report_schedule, "January 1, 2021", "execution_id_example" + ) + mock_cmmd.update_report_schedule_slack_v2() + + assert mock_cmmd._report_schedule.recipients[0].type == ReportRecipientType.EMAIL + assert ( + mock_cmmd._report_schedule.recipients[0].recipient_config_json + == '{"target": "user@example.com"}' + ) + mock_search.assert_not_called() + + # --------------------------------------------------------------------------- # Tier 1: _update_query_context + create_log # ---------------------------------------------------------------------------