diff --git a/superset/commands/report/exceptions.py b/superset/commands/report/exceptions.py index b51c2d1cd1f9..318e3df1175e 100644 --- a/superset/commands/report/exceptions.py +++ b/superset/commands/report/exceptions.py @@ -16,6 +16,7 @@ # under the License. import math +from typing import Optional from flask_babel import lazy_gettext as _ @@ -200,6 +201,19 @@ class ReportScheduleDataFrameFailedError(CommandException): message = _("Report Schedule execution failed when generating a dataframe.") +class ReportScheduleExecutorNotFoundError(CommandException): + status = 422 + + def __init__(self, username: str = "", exception: Optional[Exception] = None): + super().__init__( + _( + "Report Schedule executor user %(username)s was not found.", + username=f'"{username}"' if username else "", + ), + exception, + ) + + class ReportScheduleExecuteUnexpectedError(CommandException): message = _("Report Schedule execution got an unexpected error.") diff --git a/superset/commands/report/execute.py b/superset/commands/report/execute.py index fa5fb9422bdc..738276e6ee04 100644 --- a/superset/commands/report/execute.py +++ b/superset/commands/report/execute.py @@ -17,7 +17,7 @@ import logging from collections.abc import Sequence from datetime import datetime, timedelta -from typing import Any, Optional, Union +from typing import Any, Optional, TYPE_CHECKING, Union from uuid import UUID import pandas as pd @@ -37,6 +37,7 @@ ReportScheduleDataFrameFailedError, ReportScheduleDataFrameTimeout, ReportScheduleExecuteUnexpectedError, + ReportScheduleExecutorNotFoundError, ReportScheduleNotFoundError, ReportSchedulePreviousWorkingError, ReportScheduleScreenshotFailedError, @@ -82,9 +83,36 @@ from superset.utils.slack import get_channels_with_search, SlackChannelTypes from superset.utils.urls import get_url_path +if TYPE_CHECKING: + from flask_appbuilder.security.sqla.models import User + logger = logging.getLogger(__name__) +def resolve_executor_user(model: ReportSchedule) -> tuple["User", str]: + """ + Resolve the executor user for a report schedule. + + Determines the configured executor username via ``get_executor`` and looks up + the corresponding user. A deleted/disabled user or a misconfigured + ``ALERT_REPORTS_EXECUTORS`` makes ``security_manager.find_user`` return + ``None``; rather than passing ``None`` into the webdriver/auth flow (which + fails with an opaque NoneType error), raise a dedicated, actionable error. + + :returns: the ``(user, username)`` pair — the username is returned alongside + the user because several call sites log it after resolution. + :raises ReportScheduleExecutorNotFoundError: if the executor user is missing. + """ + _, username = get_executor( + executors=app.config["ALERT_REPORTS_EXECUTORS"], + model=model, + ) + user = security_manager.find_user(username) + if user is None: + raise ReportScheduleExecutorNotFoundError(username) + return user, username + + class BaseReportState: current_states: list[ReportState] = [] initial: bool = False @@ -135,9 +163,17 @@ def update_report_schedule_slack_v2(self) -> None: Update the report schedule type and channels for all slack recipients to v2. V2 uses ids instead of names for channels. """ + # Track each recipient mutated in this pass with its original (type, + # config) so a partial failure can revert ALL of them — not just the + # loop variable. Restoring the values to their loaded state keeps the + # persisted rows unchanged regardless of any intervening commit. + mutated: list[tuple[ReportRecipients, ReportRecipientType, str]] = [] try: for recipient in self._report_schedule.recipients: if recipient.type == ReportRecipientType.SLACK: + mutated.append( + (recipient, recipient.type, recipient.recipient_config_json) + ) recipient.type = ReportRecipientType.SLACKV2 slack_recipients = json.loads(recipient.recipient_config_json) # V1 method allowed to use leading `#` in the channel name @@ -169,8 +205,15 @@ def update_report_schedule_slack_v2(self) -> None: } ) except Exception as ex: - # Revert to v1 to preserve configuration (requires manual fix) - recipient.type = ReportRecipientType.SLACK + # Revert every mutated recipient to v1 (both type AND config) to + # preserve configuration (requires manual fix). Reverting the full + # set — not just the loop variable — keeps earlier recipients + # consistent; iterating the snapshot also avoids the UnboundLocalError + # that a bare loop-variable reference raises on a pre-iteration + # failure (which would mask the real error). + for reverted_recipient, original_type, original_config in mutated: + reverted_recipient.type = original_type + reverted_recipient.recipient_config_json = original_config msg = f"Failed to update slack recipients to v2: {str(ex)}" logger.exception(msg) raise UpdateFailedError(msg) from ex @@ -423,11 +466,7 @@ def _get_screenshots(self) -> list[bytes]: """ start_time = datetime.utcnow() - _, username = get_executor( - executors=app.config["ALERT_REPORTS_EXECUTORS"], - model=self._report_schedule, - ) - user = security_manager.find_user(username) + user, username = resolve_executor_user(self._report_schedule) max_width = app.config["ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH"] @@ -509,11 +548,7 @@ def _get_pdf(self) -> bytes: def _get_csv_data(self) -> bytes: start_time = datetime.utcnow() url = self._get_url(result_format=ChartDataResultFormat.CSV) - _, username = get_executor( - executors=app.config["ALERT_REPORTS_EXECUTORS"], - model=self._report_schedule, - ) - user = security_manager.find_user(username) + user, username = resolve_executor_user(self._report_schedule) auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user) if self._report_schedule.chart.query_context is None: @@ -559,11 +594,7 @@ def _get_embedded_data(self) -> pd.DataFrame: start_time = datetime.utcnow() url = self._get_url(result_format=ChartDataResultFormat.JSON) - _, username = get_executor( - executors=app.config["ALERT_REPORTS_EXECUTORS"], - model=self._report_schedule, - ) - user = security_manager.find_user(username) + user, username = resolve_executor_user(self._report_schedule) auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user) if self._report_schedule.chart.query_context is None: @@ -1164,6 +1195,15 @@ def run(self) -> None: if not self._model: raise ReportScheduleExecuteUnexpectedError() + # Resolve the executor at the run() boundary the same way master + # does: tolerate a missing user (find_user -> None) so the state + # machine still runs and its error envelope writes the ERROR + # execution-log row and sends the owner notification. The dedicated + # ReportScheduleExecutorNotFoundError guard lives at the content + # sites (_get_screenshots / _get_csv_data / _get_embedded_data), + # which raise inside that envelope. Guarding here instead would + # surface a 422 above the state machine, suppressing both the log + # row and the owner notification. _, username = get_executor( executors=app.config["ALERT_REPORTS_EXECUTORS"], model=self._model, diff --git a/superset/reports/notifications/webhook.py b/superset/reports/notifications/webhook.py index c2e660765b6b..8f592ea5709c 100644 --- a/superset/reports/notifications/webhook.py +++ b/superset/reports/notifications/webhook.py @@ -128,7 +128,19 @@ def _validate_webhook_url(self, url: str) -> None: raise NotificationParamException("Webhook URL target host is not allowed.") @backoff.on_exception( - backoff.expo, NotificationUnprocessableException, factor=10, base=2, max_tries=5 + backoff.expo, + NotificationUnprocessableException, + factor=10, + base=2, + max_tries=5, + # Bound total wall-clock retry time. Without this, a hanging or + # persistently failing target can stall a worker for minutes per bad URL + # (up to ~5 socket waits at timeout=60 plus ~150s of retry sleeps), + # starving sequential report dispatch. max_time is checked between + # attempts, so the final in-flight request can still run its full + # timeout; factor is intentionally kept at 10 so legitimately-transient + # 5xx targets are not abandoned early. + max_time=120, ) @statsd_gauge("reports.webhook.send") def send(self) -> None: diff --git a/tests/unit_tests/commands/report/execute_test.py b/tests/unit_tests/commands/report/execute_test.py index a86c5dfffc35..7ded647207db 100644 --- a/tests/unit_tests/commands/report/execute_test.py +++ b/tests/unit_tests/commands/report/execute_test.py @@ -28,6 +28,7 @@ from superset.commands.report.exceptions import ( ReportScheduleAlertGracePeriodError, ReportScheduleCsvFailedError, + ReportScheduleExecutorNotFoundError, ReportSchedulePreviousWorkingError, ReportScheduleScreenshotFailedError, ReportScheduleScreenshotTimeout, @@ -995,6 +996,81 @@ def test_screenshot_width_calculation( ) +def _executor_report_state(mocker: MockerFixture) -> BaseReportState: + report_schedule = create_report_schedule(mocker) + # _get_csv_data/_get_embedded_data build a chart-data URL from chart_id + # before resolving the executor; give it a concrete value so URL building + # succeeds and the executor resolution is actually reached. + report_schedule.chart_id = 1 + report_schedule.force_screenshot = False + return BaseReportState( + report_schedule=report_schedule, + scheduled_dttm=datetime.now(), + execution_id=UUID("084e7ee6-5557-4ecd-9632-b7f39c9ec524"), + ) + + +@pytest.mark.parametrize( + "method_name", + ["_get_screenshots", "_get_csv_data", "_get_embedded_data"], +) +def test_get_content_raises_when_executor_user_missing( + app: SupersetApp, mocker: MockerFixture, method_name: str +) -> None: + """ + When the configured executor user cannot be resolved + (``security_manager.find_user`` returns ``None``), each content path raises a + dedicated ``ReportScheduleExecutorNotFoundError`` naming the username, rather + than passing ``None`` downstream and failing with an opaque NoneType error. + """ + app.config.update( + { + "ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH": 1600, + "WEBDRIVER_WINDOW": {"slice": (800, 600), "dashboard": (800, 600)}, + "ALERT_REPORTS_EXECUTORS": {}, + } + ) + report_state = _executor_report_state(mocker) + + with ( + patch("superset.commands.report.execute.security_manager") as mock_sm, + patch("superset.commands.report.execute.get_executor") as mock_get_executor, + patch("superset.commands.report.execute.machine_auth_provider_factory"), + ): + mock_get_executor.return_value = ("executor", "ghost_user") + mock_sm.find_user = mocker.MagicMock(return_value=None) + + with pytest.raises(ReportScheduleExecutorNotFoundError, match="ghost_user"): + getattr(report_state, method_name)() + + +def test_resolve_executor_user_returns_user_and_username( + app: SupersetApp, mocker: MockerFixture +) -> None: + """ + Happy path: when the executor user exists, the helper returns the + ``(user, username)`` tuple unchanged — locking the no-behavior-change exit + criterion for the four call sites. + """ + from superset.commands.report.execute import resolve_executor_user + + app.config.update({"ALERT_REPORTS_EXECUTORS": {}}) + report_schedule = create_report_schedule(mocker) + mock_user = mocker.MagicMock() + + with ( + patch("superset.commands.report.execute.security_manager") as mock_sm, + patch("superset.commands.report.execute.get_executor") as mock_get_executor, + ): + mock_get_executor.return_value = ("executor", "real_user") + mock_sm.find_user = mocker.MagicMock(return_value=mock_user) + + user, username = resolve_executor_user(report_schedule) + + assert user is mock_user + assert username == "real_user" + + def test_update_recipient_to_slack_v2(mocker: MockerFixture): """ Test converting a Slack recipient to Slack v2 format. @@ -1070,6 +1146,122 @@ def test_update_recipient_to_slack_v2_missing_channels(mocker: MockerFixture): mock_cmmd.update_report_schedule_slack_v2() +def test_update_recipient_to_slack_v2_reverts_all_on_partial_failure( + mocker: MockerFixture, +) -> None: + """ + When the second of two Slack recipients fails channel resolution, BOTH + recipients are fully reverted — type AND exact original + ``recipient_config_json`` string — not just the loop variable's type. This + prevents the intervening ``create_log`` commit from flushing a half-migrated, + inconsistent state. + """ + + def channels_side_effect(search_string, types, exact_match): + if search_string == "Channel-1": + return [ + { + "id": "id_channel_1", + "name": "Channel-1", + "is_member": True, + "is_private": False, + } + ] + # Second recipient: no channel found → length mismatch → UpdateFailedError + return [] + + mocker.patch( + "superset.commands.report.execute.get_channels_with_search", + side_effect=channels_side_effect, + ) + original_config_1 = json.dumps({"target": "Channel-1"}) + original_config_2 = json.dumps({"target": "Channel-2"}) + mock_report_schedule = ReportSchedule( + name="Test Report", + recipients=[ + ReportRecipients( + type=ReportRecipientType.SLACK, + recipient_config_json=original_config_1, + ), + ReportRecipients( + type=ReportRecipientType.SLACK, + recipient_config_json=original_config_2, + ), + ], + ) + + mock_cmmd = BaseReportState( + mock_report_schedule, "January 1, 2021", "execution_id_example" + ) + + with pytest.raises(UpdateFailedError): + mock_cmmd.update_report_schedule_slack_v2() + + first, second = mock_report_schedule.recipients + # The first recipient was mutated to v2 before the second failed; it must be + # reverted to its exact original type AND config string. + assert first.type == ReportRecipientType.SLACK + assert first.recipient_config_json == original_config_1 + assert second.type == ReportRecipientType.SLACK + assert second.recipient_config_json == original_config_2 + + +def test_update_recipient_to_slack_v2_pre_iteration_failure( + mocker: MockerFixture, +) -> None: + """ + A failure raised while accessing/iterating the recipients (before the loop + variable is bound) surfaces as ``UpdateFailedError``, not a ``NameError`` + that would mask the real error. + """ + + class _ExplodingRecipients: + def __iter__(self): + raise RuntimeError("recipients exploded") + + mock_report_schedule = mocker.MagicMock() + mock_report_schedule.recipients = _ExplodingRecipients() + + mock_cmmd = BaseReportState( + mock_report_schedule, "January 1, 2021", "execution_id_example" + ) + + with pytest.raises(UpdateFailedError): + mock_cmmd.update_report_schedule_slack_v2() + + +def test_update_recipient_to_slack_v2_no_slack_recipients_is_noop( + mocker: MockerFixture, +) -> None: + """ + With no SLACK recipients there is nothing to migrate: the method returns + without raising and leaves the non-Slack recipients untouched. + """ + mock_search = mocker.patch( + "superset.commands.report.execute.get_channels_with_search", + ) + mock_report_schedule = ReportSchedule( + recipients=[ + ReportRecipients( + type=ReportRecipientType.EMAIL, + recipient_config_json=json.dumps({"target": "user@example.com"}), + ), + ], + ) + + mock_cmmd: BaseReportState = BaseReportState( + mock_report_schedule, "January 1, 2021", "execution_id_example" + ) + mock_cmmd.update_report_schedule_slack_v2() + + assert mock_cmmd._report_schedule.recipients[0].type == ReportRecipientType.EMAIL + assert ( + mock_cmmd._report_schedule.recipients[0].recipient_config_json + == '{"target": "user@example.com"}' + ) + mock_search.assert_not_called() + + # --------------------------------------------------------------------------- # Tier 1: _update_query_context + create_log # --------------------------------------------------------------------------- diff --git a/tests/unit_tests/reports/notifications/webhook_tests.py b/tests/unit_tests/reports/notifications/webhook_tests.py index da985baa6f28..b97695fd653d 100644 --- a/tests/unit_tests/reports/notifications/webhook_tests.py +++ b/tests/unit_tests/reports/notifications/webhook_tests.py @@ -16,11 +16,14 @@ # under the License. +import datetime as datetime_module + import pandas as pd import pytest from superset.reports.notifications.exceptions import ( NotificationParamException, + NotificationUnprocessableException, ) from superset.reports.notifications.webhook import WebhookNotification from superset.utils.core import HeaderDataType @@ -269,3 +272,170 @@ class MockResponse: with pytest.raises(NotificationParamException, match="redirect"): webhook_notification.send() + + +class _FakeBackoffDatetime: + """ + Drop-in for the ``datetime`` *module* referenced as ``backoff._sync.datetime``. + + backoff 2.2.1 computes the ``max_time`` elapsed via + ``datetime.datetime.now()`` inside ``backoff._sync`` (NOT via ``time``), so + patching only ``backoff._sync.time.sleep`` leaves ``elapsed`` pinned at 0 and + ``max_time`` never fires. This fake advances the clock a fixed ``step`` per + ``now()`` call so the wall-time bound is observable in a sub-second test. + A ``step`` of 0 holds the clock flat (elapsed stays 0). + """ + + def __init__(self, step_seconds: float) -> None: + base = datetime_module.datetime(2020, 1, 1) + state = {"calls": 0} + + class _FakeDateTime: + @staticmethod + def now() -> datetime_module.datetime: + offset = datetime_module.timedelta( + seconds=step_seconds * state["calls"] + ) + state["calls"] += 1 + return base + offset + + self.datetime = _FakeDateTime + + +def _make_webhook(mock_header_data) -> WebhookNotification: + from superset.reports.models import ReportRecipients, ReportRecipientType + from superset.reports.notifications.base import NotificationContent + + content = NotificationContent( + name="test alert", header_data=mock_header_data, description="Test description" + ) + return WebhookNotification( + recipient=ReportRecipients( + type=ReportRecipientType.WEBHOOK, + recipient_config_json='{"target": "https://example.com/webhook"}', + ), + content=content, + ) + + +class _MockServerErrorResponse: + status_code = 500 + text = "" + + +def _allow_internal_app() -> type: + class MockCurrentApp: + config = { + "ALERT_REPORTS_WEBHOOK_HTTPS_ONLY": True, + "ALERT_REPORTS_WEBHOOK_ALLOW_INTERNAL_HOSTS": True, + } + + return MockCurrentApp + + +def test_send_backoff_bounded_by_max_time(monkeypatch, mock_header_data) -> None: + """ + A persistently failing (500) target gives up on wall-time (``max_time``), + not just ``max_tries``. With the fake clock stepping +50s per backoff sample, + elapsed crosses ``max_time=120`` between the 2nd and 3rd POST, so exactly 3 + POSTs happen (distinct from ``max_tries=5``). The terminal exception type is + unchanged on giveup. + """ + webhook_notification = _make_webhook(mock_header_data) + post_calls: list[int] = [] + + def fake_post(*args, **kwargs) -> _MockServerErrorResponse: + post_calls.append(1) + return _MockServerErrorResponse() + + monkeypatch.setattr( + "superset.reports.notifications.webhook.current_app", _allow_internal_app() + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled", + lambda flag: True, + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.requests.post", fake_post + ) + monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None) + monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(50)) + + with pytest.raises(NotificationUnprocessableException): + webhook_notification.send() + + assert len(post_calls) == 3 + + +def test_send_flat_clock_falls_back_to_max_tries(monkeypatch, mock_header_data) -> None: + """ + Characterization (NOT a RED discriminator): with the clock held flat, + ``max_time`` can never fire, so ``max_tries=5`` governs and exactly 5 POSTs + happen. Passes on both buggy and fixed code; its job is to prove the 3-vs-5 + delta in ``test_send_backoff_bounded_by_max_time`` is attributable to + wall-time, not to ``max_tries``. + """ + webhook_notification = _make_webhook(mock_header_data) + post_calls: list[int] = [] + + def fake_post(*args, **kwargs) -> _MockServerErrorResponse: + post_calls.append(1) + return _MockServerErrorResponse() + + monkeypatch.setattr( + "superset.reports.notifications.webhook.current_app", _allow_internal_app() + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled", + lambda flag: True, + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.requests.post", fake_post + ) + monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None) + monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(0)) + + with pytest.raises(NotificationUnprocessableException): + webhook_notification.send() + + assert len(post_calls) == 5 + + +def test_send_max_time_does_not_abandon_recovering_target( + monkeypatch, mock_header_data +) -> None: + """ + No-regression guard: a target that fails twice (500) then succeeds on the + 3rd attempt — cumulative elapsed well under ``max_time`` — still succeeds. + Confirms ``max_time=120`` is not set so low that it abandons a target that + recovers within the normal retry window. + """ + webhook_notification = _make_webhook(mock_header_data) + post_calls: list[int] = [] + + class _OkResponse: + status_code = 200 + text = "" + + def fake_post(*args, **kwargs): + post_calls.append(1) + if len(post_calls) < 3: + return _MockServerErrorResponse() + return _OkResponse() + + monkeypatch.setattr( + "superset.reports.notifications.webhook.current_app", _allow_internal_app() + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.feature_flag_manager.is_feature_enabled", + lambda flag: True, + ) + monkeypatch.setattr( + "superset.reports.notifications.webhook.requests.post", fake_post + ) + monkeypatch.setattr("backoff._sync.time.sleep", lambda *a, **k: None) + monkeypatch.setattr("backoff._sync.datetime", _FakeBackoffDatetime(10)) + + webhook_notification.send() + + assert len(post_calls) == 3