Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions superset/commands/report/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import math
from typing import Optional

from flask_babel import lazy_gettext as _

Expand Down Expand Up @@ -200,6 +201,19 @@ class ReportScheduleDataFrameFailedError(CommandException):
message = _("Report Schedule execution failed when generating a dataframe.")


class ReportScheduleExecutorNotFoundError(CommandException):
status = 422

def __init__(self, username: str = "", exception: Optional[Exception] = None):
super().__init__(
_(
"Report Schedule executor user %(username)s was not found.",
username=f'"{username}"' if username else "",
),
exception,
)


class ReportScheduleExecuteUnexpectedError(CommandException):
message = _("Report Schedule execution got an unexpected error.")

Expand Down
76 changes: 58 additions & 18 deletions superset/commands/report/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import logging
from collections.abc import Sequence
from datetime import datetime, timedelta
from typing import Any, Optional, Union
from typing import Any, Optional, TYPE_CHECKING, Union
from uuid import UUID

import pandas as pd
Expand All @@ -37,6 +37,7 @@
ReportScheduleDataFrameFailedError,
ReportScheduleDataFrameTimeout,
ReportScheduleExecuteUnexpectedError,
ReportScheduleExecutorNotFoundError,
ReportScheduleNotFoundError,
ReportSchedulePreviousWorkingError,
ReportScheduleScreenshotFailedError,
Expand Down Expand Up @@ -82,9 +83,36 @@
from superset.utils.slack import get_channels_with_search, SlackChannelTypes
from superset.utils.urls import get_url_path

if TYPE_CHECKING:
from flask_appbuilder.security.sqla.models import User

logger = logging.getLogger(__name__)


def resolve_executor_user(model: ReportSchedule) -> tuple["User", str]:
"""
Resolve the executor user for a report schedule.

Determines the configured executor username via ``get_executor`` and looks up
the corresponding user. A deleted/disabled user or a misconfigured
``ALERT_REPORTS_EXECUTORS`` makes ``security_manager.find_user`` return
``None``; rather than passing ``None`` into the webdriver/auth flow (which
fails with an opaque NoneType error), raise a dedicated, actionable error.

:returns: the ``(user, username)`` pair — the username is returned alongside
the user because several call sites log it after resolution.
:raises ReportScheduleExecutorNotFoundError: if the executor user is missing.
"""
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=model,
)
user = security_manager.find_user(username)
if user is None:
raise ReportScheduleExecutorNotFoundError(username)
return user, username


class BaseReportState:
current_states: list[ReportState] = []
initial: bool = False
Expand Down Expand Up @@ -135,9 +163,17 @@ def update_report_schedule_slack_v2(self) -> None:
Update the report schedule type and channels for all slack recipients to v2.
V2 uses ids instead of names for channels.
"""
# Track each recipient mutated in this pass with its original (type,
# config) so a partial failure can revert ALL of them — not just the
# loop variable. Restoring the values to their loaded state keeps the
# persisted rows unchanged regardless of any intervening commit.
mutated: list[tuple[ReportRecipients, ReportRecipientType, str]] = []
try:
for recipient in self._report_schedule.recipients:
if recipient.type == ReportRecipientType.SLACK:
mutated.append(
(recipient, recipient.type, recipient.recipient_config_json)
)
recipient.type = ReportRecipientType.SLACKV2
slack_recipients = json.loads(recipient.recipient_config_json)
# V1 method allowed to use leading `#` in the channel name
Expand Down Expand Up @@ -169,8 +205,15 @@ def update_report_schedule_slack_v2(self) -> None:
}
)
except Exception as ex:
# Revert to v1 to preserve configuration (requires manual fix)
recipient.type = ReportRecipientType.SLACK
# Revert every mutated recipient to v1 (both type AND config) to
# preserve configuration (requires manual fix). Reverting the full
# set — not just the loop variable — keeps earlier recipients
# consistent; iterating the snapshot also avoids the UnboundLocalError
# that a bare loop-variable reference raises on a pre-iteration
# failure (which would mask the real error).
for reverted_recipient, original_type, original_config in mutated:
reverted_recipient.type = original_type
reverted_recipient.recipient_config_json = original_config
msg = f"Failed to update slack recipients to v2: {str(ex)}"
logger.exception(msg)
raise UpdateFailedError(msg) from ex
Expand Down Expand Up @@ -423,11 +466,7 @@ def _get_screenshots(self) -> list[bytes]:
"""
start_time = datetime.utcnow()

_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._report_schedule,
)
user = security_manager.find_user(username)
user, username = resolve_executor_user(self._report_schedule)

max_width = app.config["ALERT_REPORTS_MAX_CUSTOM_SCREENSHOT_WIDTH"]

Expand Down Expand Up @@ -509,11 +548,7 @@ def _get_pdf(self) -> bytes:
def _get_csv_data(self) -> bytes:
start_time = datetime.utcnow()
url = self._get_url(result_format=ChartDataResultFormat.CSV)
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._report_schedule,
)
user = security_manager.find_user(username)
user, username = resolve_executor_user(self._report_schedule)
auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user)

if self._report_schedule.chart.query_context is None:
Expand Down Expand Up @@ -559,11 +594,7 @@ def _get_embedded_data(self) -> pd.DataFrame:
start_time = datetime.utcnow()

url = self._get_url(result_format=ChartDataResultFormat.JSON)
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._report_schedule,
)
user = security_manager.find_user(username)
user, username = resolve_executor_user(self._report_schedule)
auth_cookies = machine_auth_provider_factory.instance.get_auth_cookies(user)

if self._report_schedule.chart.query_context is None:
Expand Down Expand Up @@ -1164,6 +1195,15 @@ def run(self) -> None:
if not self._model:
raise ReportScheduleExecuteUnexpectedError()

# Resolve the executor at the run() boundary the same way master
# does: tolerate a missing user (find_user -> None) so the state
# machine still runs and its error envelope writes the ERROR
# execution-log row and sends the owner notification. The dedicated
# ReportScheduleExecutorNotFoundError guard lives at the content
# sites (_get_screenshots / _get_csv_data / _get_embedded_data),
# which raise inside that envelope. Guarding here instead would
# surface a 422 above the state machine, suppressing both the log
# row and the owner notification.
_, username = get_executor(
executors=app.config["ALERT_REPORTS_EXECUTORS"],
model=self._model,
Expand Down
14 changes: 13 additions & 1 deletion superset/reports/notifications/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,19 @@ def _validate_webhook_url(self, url: str) -> None:
raise NotificationParamException("Webhook URL target host is not allowed.")

@backoff.on_exception(
backoff.expo, NotificationUnprocessableException, factor=10, base=2, max_tries=5
backoff.expo,
NotificationUnprocessableException,
factor=10,
base=2,
max_tries=5,
# Bound total wall-clock retry time. Without this, a hanging or
# persistently failing target can stall a worker for minutes per bad URL
# (up to ~5 socket waits at timeout=60 plus ~150s of retry sleeps),
# starving sequential report dispatch. max_time is checked between
# attempts, so the final in-flight request can still run its full
# timeout; factor is intentionally kept at 10 so legitimately-transient
# 5xx targets are not abandoned early.
max_time=120,
)
@statsd_gauge("reports.webhook.send")
def send(self) -> None:
Expand Down
Loading
Loading