diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 0a530578644d7..91a60cb776f63 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -59,6 +59,62 @@ class BulkDAGRunBody(StrictBaseModel): note: str | None = Field(None, max_length=1000) +class PartitionSelectorMixin(StrictBaseModel): + """Partition filter fields shared by bulk-clear and clearPartitions bodies.""" + + partition_key: str | None = Field( + default=None, + description="Select runs by exact partition key match. Mutually exclusive with the other partition selectors.", + ) + partition_date_start: datetime | None = Field( + default=None, + description=( + "Inclusive start of the partition date window. " + "The value is interpreted in the Dag's timetable timezone. " + "Mutually exclusive with the other partition selectors." + ), + ) + partition_date_end: datetime | None = Field( + default=None, + description=( + "Inclusive end of the partition date window. " + "The value is interpreted in the Dag's timetable timezone. " + "Mutually exclusive with the other partition selectors." 
+ ), + ) + + @property + def has_partition_selectors(self) -> bool: + return ( + self.partition_key is not None + or self.partition_date_start is not None + or self.partition_date_end is not None + ) + + def _validate_partition_date_window_order(self) -> None: + if ( + self.partition_date_start is not None + and self.partition_date_end is not None + and self.partition_date_start > self.partition_date_end + ): + raise ValueError("partition_date_start must be on or before partition_date_end.") + + def _check_exactly_one_selection_mode( + self, *, extra_selector_active: bool, extra_selector_name: str + ) -> None: + has_partition_key = self.partition_key is not None + has_partition_date_window = ( + self.partition_date_start is not None or self.partition_date_end is not None + ) + modes_active = sum([extra_selector_active, has_partition_key, has_partition_date_window]) + if modes_active != 1: + raise ValueError( + f"Exactly one of {extra_selector_name}, partition_key, or a partition date window " + "(partition_date_start / partition_date_end) must be provided." 
+ ) + self._validate_partition_date_window_order() + + class BaseDAGRunClear(StrictBaseModel): """Shared options for the single-run and bulk Dag Run clear endpoints.""" @@ -89,10 +145,18 @@ class DAGRunClearBody(BaseDAGRunClear): """Dag Run serializer for clear endpoint body.""" -class BulkDAGRunClearBody(BaseDAGRunClear): +class BulkDAGRunClearBody(BaseDAGRunClear, PartitionSelectorMixin): """Request body for the bulk clear Dag Runs endpoint.""" - dag_runs: list[BulkDAGRunBody] = Field(min_length=1) + dag_runs: list[BulkDAGRunBody] = Field(default_factory=list) + + @model_validator(mode="after") + def validate_exactly_one_selection_mode(self) -> BulkDAGRunClearBody: + self._check_exactly_one_selection_mode( + extra_selector_active=bool(self.dag_runs), + extra_selector_name="dag_runs (non-empty)", + ) + return self class DAGRunResponse(BaseModel): @@ -242,3 +306,36 @@ class DAGRunsBatchBody(StrictBaseModel): duration_lt: float | None = None conf_contains: str | None = None + + +class ClearPartitionsBody(PartitionSelectorMixin): + """Request body for the clearPartitions endpoint (column-reset: set partition fields to None).""" + + run_id: str | None = Field( + default=None, + description="Select runs by exact run_id. 
Mutually exclusive with ``partition_key`` and partition date window.", + ) + clear_task_instances: bool = Field( + default=False, + description="Also clear task instances on the matched runs.", + ) + dry_run: bool = Field( + default=True, + description="If True, compute counts without writing any changes.", + ) + + @model_validator(mode="after") + def validate_exactly_one_selector(self) -> ClearPartitionsBody: + self._check_exactly_one_selection_mode( + extra_selector_active=self.run_id is not None, + extra_selector_name="run_id", + ) + return self + + +class ClearPartitionsResponse(BaseModel): + """Response for the clearPartitions endpoint.""" + + dag_runs_cleared: int + task_instances_cleared: int + dry_run: bool diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 639f6fb990461..ea6f0a70fcd0a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -3143,6 +3143,66 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /api/v2/dags/{dag_id}/clearPartitions: + post: + tags: + - DagRun + summary: Clear Dag Run Partitions + description: Reset partition_key and partition_date fields on matching Dag Runs. 
+ operationId: clear_dag_run_partitions + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ClearPartitionsBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/ClearPartitionsResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dagSources/{dag_id}: get: tags: @@ -12170,6 +12230,31 @@ components: description: Request body for bulk operations on Dag Runs. BulkDAGRunClearBody: properties: + partition_key: + anyOf: + - type: string + - type: 'null' + title: Partition Key + description: Select runs by exact partition key match. Mutually exclusive + with the other partition selectors. + partition_date_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Partition Date Start + description: Inclusive start of the partition date window. The value is + interpreted in the Dag's timetable timezone. Mutually exclusive with the + other partition selectors. + partition_date_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Partition Date End + description: Inclusive end of the partition date window. 
The value is interpreted + in the Dag's timetable timezone. Mutually exclusive with the other partition + selectors. dry_run: type: boolean title: Dry Run @@ -12203,12 +12288,9 @@ components: items: $ref: '#/components/schemas/BulkDAGRunBody' type: array - minItems: 1 title: Dag Runs additionalProperties: false type: object - required: - - dag_runs title: BulkDAGRunClearBody description: Request body for the bulk clear Dag Runs endpoint. BulkDeleteAction_BulkDAGRunBody_: @@ -12576,6 +12658,73 @@ components: - action - entities title: BulkUpdateAction[VariableBody] + ClearPartitionsBody: + properties: + partition_key: + anyOf: + - type: string + - type: 'null' + title: Partition Key + description: Select runs by exact partition key match. Mutually exclusive + with the other partition selectors. + partition_date_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Partition Date Start + description: Inclusive start of the partition date window. The value is + interpreted in the Dag's timetable timezone. Mutually exclusive with the + other partition selectors. + partition_date_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Partition Date End + description: Inclusive end of the partition date window. The value is interpreted + in the Dag's timetable timezone. Mutually exclusive with the other partition + selectors. + run_id: + anyOf: + - type: string + - type: 'null' + title: Run Id + description: Select runs by exact run_id. Mutually exclusive with ``partition_key`` + and partition date window. + clear_task_instances: + type: boolean + title: Clear Task Instances + description: Also clear task instances on the matched runs. + default: false + dry_run: + type: boolean + title: Dry Run + description: If True, compute counts without writing any changes. 
+ default: true + additionalProperties: false + type: object + title: ClearPartitionsBody + description: 'Request body for the clearPartitions endpoint (column-reset: set + partition fields to None).' + ClearPartitionsResponse: + properties: + dag_runs_cleared: + type: integer + title: Dag Runs Cleared + task_instances_cleared: + type: integer + title: Task Instances Cleared + dry_run: + type: boolean + title: Dry Run + type: object + required: + - dag_runs_cleared + - task_instances_cleared + - dry_run + title: ClearPartitionsResponse + description: Response for the clearPartitions endpoint. ClearTaskInstanceCollectionResponse: properties: task_instances: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 30ae090289c75..889251bf81e99 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -73,6 +73,8 @@ from airflow.api_fastapi.core_api.datamodels.dag_run import ( BulkDAGRunBody, BulkDAGRunClearBody, + ClearPartitionsBody, + ClearPartitionsResponse, DAGRunClearBody, DAGRunCollectionResponse, DagRunMutableStates, @@ -98,6 +100,7 @@ from airflow.api_fastapi.core_api.services.public.dag_run import ( BulkDagRunService, DagRunWaiter, + clear_partition_fields, dry_run_clear_dag_run, get_dag_run_and_dag_for_clear, patch_dag_run_note, @@ -349,26 +352,53 @@ def clear_dag_runs( """Clear multiple Dag Runs in a single request.""" url_dag_id_is_wildcard = dag_id == "~" - # No ordered set type in Python, using a dict with throwaway values as replacement. 
- runs_to_clear: dict[tuple[str, str], None] = {} - for run in body.dag_runs: + partition_mode = not body.dag_runs and body.has_partition_selectors + + if partition_mode: if url_dag_id_is_wildcard: - if not run.dag_id or run.dag_id == "~": - raise HTTPException( - status.HTTP_400_BAD_REQUEST, - f"When the URL dag_id is '~', every entry must provide a concrete dag_id " - f"(missing on dag_run_id: {run.dag_run_id!r}).", - ) - run_to_clear = (run.dag_id, run.dag_run_id) + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Partition selectors require a concrete dag_id; '~' is not supported.", + ) + dag = get_latest_version_of_dag(dag_bag, dag_id, session) + + stmt = select(DagRun.run_id).where(DagRun.dag_id == dag_id) + if body.partition_key is not None: + stmt = stmt.where(DagRun.partition_key == body.partition_key) else: - entity_dag_id = run.dag_id or dag_id - if entity_dag_id != dag_id: - raise HTTPException( - status.HTTP_400_BAD_REQUEST, - f"Entry dag_id {entity_dag_id!r} does not match the URL dag_id {dag_id!r}.", - ) - run_to_clear = (dag_id, run.dag_run_id) - runs_to_clear[run_to_clear] = None + stmt = stmt.where(DagRun.partition_date.is_not(None)) + stmt = DagRun.apply_partition_date_window( + stmt, + timetable=dag.timetable, + start=body.partition_date_start, + end=body.partition_date_end, + ) + stmt = stmt.order_by(DagRun.partition_date, DagRun.run_id) + + runs_to_clear: dict[tuple[str, str], None] = { + (dag_id, run_id): None for run_id in session.scalars(stmt) + } + else: + # No ordered set type in Python, using a dict with throwaway values as replacement. 
+ runs_to_clear = {} + for run in body.dag_runs: + if url_dag_id_is_wildcard: + if not run.dag_id or run.dag_id == "~": + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"When the URL dag_id is '~', every entry must provide a concrete dag_id " + f"(missing on dag_run_id: {run.dag_run_id!r}).", + ) + run_to_clear = (run.dag_id, run.dag_run_id) + else: + entity_dag_id = run.dag_id or dag_id + if entity_dag_id != dag_id: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"Entry dag_id {entity_dag_id!r} does not match the URL dag_id {dag_id!r}.", + ) + run_to_clear = (dag_id, run.dag_run_id) + runs_to_clear[run_to_clear] = None if body.dry_run: affected: list[TaskInstanceResponse | NewTaskResponse] = [] @@ -415,6 +445,35 @@ def clear_dag_runs( ) +@dag_run_at_dag_router.post( + "/clearPartitions", + responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]), + dependencies=[ + Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), + Depends(action_logging()), + ], +) +def clear_dag_run_partitions( + dag_id: str, + body: ClearPartitionsBody, + dag_bag: DagBagDep, + session: SessionDep, +) -> ClearPartitionsResponse: + """Reset partition_key and partition_date fields on matching Dag Runs.""" + dag = get_latest_version_of_dag(dag_bag, dag_id, session) + dag_runs_cleared, task_instances_cleared = clear_partition_fields( + dag=dag, + body=body, + dag_id=dag_id, + session=session, + ) + return ClearPartitionsResponse( + dag_runs_cleared=dag_runs_cleared, + task_instances_cleared=task_instances_cleared, + dry_run=body.dry_run, + ) + + @dag_run_router.get( "", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py b/airflow-core/src/airflow/api_fastapi/core_api/security.py index 76f2922030924..724a6967f7499 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/security.py +++ 
b/airflow-core/src/airflow/api_fastapi/core_api/security.py @@ -823,6 +823,10 @@ def inner( continue entity_methods.append((entity_dag_id, "PUT")) + if not body.dag_runs and body.has_partition_selectors: + if dag_id and dag_id != "~": + entity_methods.append((dag_id, "PUT")) + requests = _build_dag_run_access_requests(entity_methods) _requires_access( is_authorized_callback=lambda: get_auth_manager().batch_is_authorized_dag( diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py index 856bc1710b4e0..6d43d18cd3413 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py @@ -50,11 +50,15 @@ BulkDeleteAction, BulkUpdateAction, ) -from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, DagRunMutableStates +from airflow.api_fastapi.core_api.datamodels.dag_run import ( + BulkDAGRunBody, + ClearPartitionsBody, + DagRunMutableStates, +) from airflow.api_fastapi.core_api.datamodels.task_instances import NewTaskResponse from airflow.api_fastapi.core_api.services.public.common import BulkService from airflow.listeners.listener import get_listener_manager -from airflow.models.dagrun import DagRun +from airflow.models.dagrun import DagRun, clear_partition_runs from airflow.models.taskinstance import TaskInstance from airflow.models.xcom import XCOM_RETURN_KEY, XComModel from airflow.utils.session import create_session_async @@ -155,6 +159,31 @@ def perform_clear_dag_run( return dag_run_cleared +def clear_partition_fields( + *, + dag: SerializedDAG, + body: ClearPartitionsBody, + dag_id: str, + session: Session, +) -> tuple[int, int]: + """ + Reset partition_key and partition_date to None on matching runs. + + Returns (dag_runs_cleared, task_instances_cleared). 
+ """ + return clear_partition_runs( + dag=dag, + dag_id=dag_id, + run_id=body.run_id, + partition_key=body.partition_key, + partition_date_start=body.partition_date_start, + partition_date_end=body.partition_date_end, + clear_tis=body.clear_task_instances, + dry_run=body.dry_run, + session=session, + ) + + def patch_dag_run_state( *, dag: SerializedDAG, diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index e26f9737d8f68..44a9fd47c23b5 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -188,9 +188,9 @@ def string_lower_type(val): ARG_PARTITION_DATE_START = Arg( ("--partition-date-start",), help=( - "Inclusive lower bound of the partition_date window. Matched at local calendar-day " - "granularity: the start of the given local calendar day in the Dag's timetable timezone " - "(any time-of-day component is ignored). " + "Inclusive lower bound of the partition_date window. The wall-clock value is " + "re-interpreted in the Dag's timetable timezone. " + "A date-only value (no time) is treated as local midnight. " "Accepts the same datetime formats as --start-date." ), type=parsedate, @@ -198,9 +198,9 @@ def string_lower_type(val): ARG_PARTITION_DATE_END = Arg( ("--partition-date-end",), help=( - "Inclusive upper bound of the partition_date window. Matched at local calendar-day " - "granularity: all runs whose partition_date falls on the given local calendar day in the " - "Dag's timetable timezone are included (any time-of-day component is ignored). " + "Inclusive upper bound of the partition_date window. The wall-clock value is " + "re-interpreted in the Dag's timetable timezone. " + "A date-only value (no time) is treated as local midnight. " "Accepts the same datetime formats as --end-date." ), type=parsedate, @@ -1205,8 +1205,8 @@ class GroupCommand(NamedTuple): "Clear Dag runs of the given dag_id and re-queue them for reprocessing. 
Exactly one " "of the following selectors must be provided: --run-id (single run); --partition-key " "(every run with that exact partition_key); or a partition_date window via " - "--partition-date-start and/or --partition-date-end (both bounds are inclusive local " - "calendar days, anchored in the Dag's timetable timezone). " + "--partition-date-start and/or --partition-date-end (both bounds are inclusive, " + "interpreted in the Dag's timetable timezone). " "Intended for partitioned Dags, whose runs are keyed by partition_date / " "partition_key instead of logical_date. For traditional, non-partitioned Dags, use " "`airflow tasks clear --start-date / --end-date`." diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py b/airflow-core/src/airflow/cli/commands/dag_command.py index 580a61a1b4f49..39bdc7a04708b 100644 --- a/airflow-core/src/airflow/cli/commands/dag_command.py +++ b/airflow-core/src/airflow/cli/commands/dag_command.py @@ -127,12 +127,11 @@ def dag_clear(args, *, session: Session = NEW_SESSION) -> None: """ Clear Dag runs selected by run_id, partition_key, or a partition_date window. - When a partition_date window is given, both bounds are **day-granular** and - anchored in the timetable's timezone for tz-aware partitioned timetables. - --partition-date-start is the inclusive start local calendar day; - --partition-date-end is the inclusive end local calendar day (any - time-of-day or timezone-offset component in either value is ignored; only - the calendar date is used). + When a partition_date window is given, both bounds are interpreted in the + timetable's local timezone. + --partition-date-start is the inclusive start; --partition-date-end is the + inclusive end. A date-only value (no time component) is treated as local + midnight of that date. 
""" has_range = args.partition_date_start is not None or args.partition_date_end is not None selectors_used = sum([args.run_id is not None, args.partition_key is not None, has_range]) @@ -164,14 +163,12 @@ def dag_clear(args, *, session: Session = NEW_SESSION) -> None: query = query.where(DagRun.partition_key == args.partition_key) else: query = query.where(DagRun.partition_date.is_not(None)) - if args.partition_date_start is not None: - lower = dag.timetable.resolve_day_bound(args.partition_date_start.date()) - query = query.where(DagRun.partition_date >= lower) - if args.partition_date_end is not None: - upper = dag.timetable.resolve_day_bound( - args.partition_date_end.date() + datetime.timedelta(days=1) - ) - query = query.where(DagRun.partition_date < upper) + query = DagRun.apply_partition_date_window( + query, + timetable=dag.timetable, + start=args.partition_date_start, + end=args.partition_date_end, + ) query = query.order_by(DagRun.partition_date, DagRun.run_id) runs = list(session.execute(query).all()) diff --git a/airflow-core/src/airflow/cli/commands/partition_command.py b/airflow-core/src/airflow/cli/commands/partition_command.py index e55ba02f6652e..c948e7f4b16eb 100644 --- a/airflow-core/src/airflow/cli/commands/partition_command.py +++ b/airflow-core/src/airflow/cli/commands/partition_command.py @@ -18,14 +18,10 @@ from __future__ import annotations -import datetime from typing import TYPE_CHECKING -from sqlalchemy import or_, select - from airflow._shared.timezones.timezone import parse as parsedate -from airflow.models.dagrun import DagRun -from airflow.models.taskinstance import TaskInstance, clear_task_instances +from airflow.models.dagrun import DagRun, clear_partition_runs from airflow.utils import cli as cli_utils from airflow.utils.cli import get_db_dag from airflow.utils.providers_configuration_loader import providers_configuration_loaded @@ -34,37 +30,6 @@ if TYPE_CHECKING: from sqlalchemy.orm import Session -TI_CHUNK_SIZE = 500 - - -def 
_flush_buffer( - buffer: list[str], - carry: list[TaskInstance], - session: Session, - *, - drain: bool = False, -) -> int: - """ - Fetch TIs for buffered run_ids, extend carry, send full TI_CHUNK_SIZE slices. - - If drain=True, also send the final partial slice (used at end of run). - Returns the total number of TIs sent to clear_task_instances by this call. - """ - flushed = 0 - if buffer: - chunk_tis = list(session.scalars(select(TaskInstance).where(TaskInstance.run_id.in_(buffer)))) - buffer.clear() - carry.extend(chunk_tis) - while len(carry) >= TI_CHUNK_SIZE: - slice_tis = carry[:TI_CHUNK_SIZE] - del carry[:TI_CHUNK_SIZE] - clear_task_instances(slice_tis, session=session) - flushed += len(slice_tis) - if drain and carry: - clear_task_instances(carry, session=session) - flushed += len(carry) - return flushed - @cli_utils.action_cli @providers_configuration_loaded @@ -73,11 +38,11 @@ def clear(args, *, session: Session = NEW_SESSION) -> None: """ Clear the partition_key and partition_date of matching DagRuns. - When a partition_date window is given, both bounds are **day-granular** and - anchored in the timetable's timezone for tz-aware partitioned timetables. - --start-date is the inclusive start local calendar day; --end-date is the - inclusive end local calendar day (any time-of-day or timezone-offset - component in either value is ignored; only the calendar date is used). + When a partition_date window is given, both bounds are inclusive and their + wall-clock value is re-interpreted in the Dag's timetable timezone. The time + component is honoured, so sub-day windows on sub-daily schedules select only + the matching partitions; a date-only value (no time) is treated as local + midnight. 
""" has_range = args.start_date is not None or args.end_date is not None or args.date is not None selectors_used = sum([args.run_id is not None, args.partition_key is not None, has_range]) @@ -100,83 +65,53 @@ def clear(args, *, session: Session = NEW_SESSION) -> None: except ValueError: raise SystemExit("--date sides must be parseable as a date or datetime.") - stmt = select(DagRun).where(DagRun.dag_id == args.dag_id) - if args.run_id: - stmt = stmt.where(DagRun.run_id == args.run_id) - elif args.partition_key is not None: - stmt = stmt.where(DagRun.partition_key == args.partition_key) - else: - stmt = stmt.where(or_(DagRun.partition_key.is_not(None), DagRun.partition_date.is_not(None))) - if args.start_date is not None or args.end_date is not None: - dag = get_db_dag(bundle_names=None, dag_id=args.dag_id) - if args.start_date is not None: - lower = dag.timetable.resolve_day_bound(args.start_date.date()) - stmt = stmt.where(DagRun.partition_date >= lower) - if args.end_date is not None: - upper = dag.timetable.resolve_day_bound(args.end_date.date() + datetime.timedelta(days=1)) - stmt = stmt.where(DagRun.partition_date < upper) - stmt = stmt.order_by(DagRun.partition_date, DagRun.run_id) - + has_date_window = args.start_date is not None or args.end_date is not None + dag = get_db_dag(bundle_names=None, dag_id=args.dag_id) if has_date_window else None clear_tis = bool(args.clear_task_instances) - cleared = 0 - processed_any = False - # For --clear-task-instances: run_ids are buffered so that TIs are fetched with a single - # SELECT IN per chunk (avoiding N+1). The fetched TIs are then flushed to - # clear_task_instances in slices of TI_CHUNK_SIZE, batched by TI count rather than - # DagRun count. Any leftover TIs that do not fill a full slice are carried forward and - # combined with the next SELECT's results before the next set of slices is cut. 
- ti_buffer_run_ids: list[str] = [] - ti_carry: list[TaskInstance] = [] - tis_cleared_total = 0 - runs_for_ti_total = 0 - tis_dry_total = 0 - runs_for_ti_dry = 0 + processed_any = False + ti_run_count = 0 - for run in session.scalars(stmt).yield_per(100): + def _print_run(run: DagRun, had_partition_fields: bool) -> None: + nonlocal processed_any, ti_run_count processed_any = True - fields_already_cleared = run.partition_key is None and run.partition_date is None - if fields_already_cleared and not clear_tis: - print(f"DagRun {run.run_id}: already cleared, skipping.") - continue - if not fields_already_cleared: + if clear_tis: + ti_run_count += 1 + if had_partition_fields: print( f"DagRun {run.run_id}: " f"partition_key={run.partition_key!r} -> None, " f"partition_date={run.partition_date.isoformat() if run.partition_date else None} -> None" ) - if not args.dry_run: - run.partition_key = None - run.partition_date = None - cleared += 1 - if clear_tis: - if args.dry_run: - run_tis = session.scalars(select(TaskInstance).where(TaskInstance.run_id == run.run_id)).all() - tis_dry_total += len(run_tis) - runs_for_ti_dry += 1 - else: - ti_buffer_run_ids.append(run.run_id) - runs_for_ti_total += 1 - if len(ti_buffer_run_ids) >= TI_CHUNK_SIZE: - tis_cleared_total += _flush_buffer(ti_buffer_run_ids, ti_carry, session) + elif not clear_tis: + print(f"DagRun {run.run_id}: already cleared, skipping.") + + cleared, tis_cleared_total = clear_partition_runs( + dag=dag, + dag_id=args.dag_id, + run_id=args.run_id, + partition_key=args.partition_key, + partition_date_start=args.start_date, + partition_date_end=args.end_date, + clear_tis=clear_tis, + dry_run=args.dry_run, + session=session, + on_run_matched=_print_run, + ) if not processed_any: print(f"No matching DagRuns found for dag_id={args.dag_id}.") return - # Flush the tail: fetch any remaining buffered run_ids, combine with carry, then - # cut full slices and send the final partial slice. 
if clear_tis: if args.dry_run: print( - f"Dry run: would clear task instances on {runs_for_ti_dry} " - f"DagRun(s) ({tis_dry_total} task instance(s))." + f"Dry run: would clear task instances on {ti_run_count} " + f"DagRun(s) ({tis_cleared_total} task instance(s))." ) else: - tis_cleared_total += _flush_buffer(ti_buffer_run_ids, ti_carry, session, drain=True) print( - f"Cleared task instances on {runs_for_ti_total} " - f"DagRun(s) ({tis_cleared_total} task instance(s))." + f"Cleared task instances on {ti_run_count} DagRun(s) ({tis_cleared_total} task instance(s))." ) if args.dry_run: diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 47de1190b3775..cb347e17f6df6 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -77,7 +77,7 @@ from airflow.models.backfill import Backfill from airflow.models.base import Base, StringID from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel -from airflow.models.taskinstance import TaskInstance as TI +from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH from airflow.models.tasklog import LogTemplate from airflow.models.taskmap import TaskMap @@ -108,6 +108,7 @@ from sqlalchemy.engine import ScalarResult from sqlalchemy.orm import Session from sqlalchemy.sql.elements import Case, ColumnElement + from sqlalchemy.sql.selectable import Select from airflow.api_fastapi.execution_api.datamodels.taskinstance import DagRun as DRDataModel from airflow.models.dag_version import DagVersion @@ -115,6 +116,7 @@ from airflow.sdk import DAG as SDKDAG from airflow.serialization.definitions.dag import SerializedDAG from airflow.serialization.definitions.mappedoperator import Operator + from airflow.timetables.base import Timetable CreatedTasks = TypeVar("CreatedTasks", Iterator["dict[str, Any]"], Iterator[TI]) 
AttributeValueType: TypeAlias = ( @@ -2194,6 +2196,23 @@ def _get_log_template(log_template_id: int | None, *, session: Session = NEW_SES def _get_partial_task_ids(dag: SerializedDAG | None) -> list[str] | None: return dag.task_ids if dag and dag.partial else None + @staticmethod + def apply_partition_date_window( + stmt: Select, + *, + timetable: Timetable, + start: datetime | None, + end: datetime | None, + ) -> Select: + """Filter stmt to the inclusive interval [lower, upper] on partition_date.""" + if start is not None: + lower = timetable.localize_partition_datetime(start) + stmt = stmt.where(DagRun.partition_date >= lower) + if end is not None: + upper = timetable.localize_partition_datetime(end) + stmt = stmt.where(DagRun.partition_date <= upper) + return stmt + class DagRunNote(Base): """For storage of arbitrary notes concerning the dagrun instance.""" @@ -2231,6 +2250,130 @@ def __repr__(self): return prefix + ">" +_TI_CHUNK_SIZE = 500 + + +def clear_partition_runs( + *, + dag: SerializedDAG | None, + dag_id: str, + run_id: str | None, + partition_key: str | None, + partition_date_start: datetime | None, + partition_date_end: datetime | None, + clear_tis: bool, + dry_run: bool, + session: Session, + on_run_matched: Callable[[DagRun, bool], None] | None = None, +) -> tuple[int, int]: + """ + Reset partition_key and partition_date to None on matching Dag runs. + + Selector priority: run_id → partition_key → date-window (datetime-precision, localized + through the timetable timezone). Runs already cleared (both fields None) are skipped unless + clear_tis is True. + + The date-window selector requires a non-None ``dag`` to resolve the timetable timezone. + The run_id and partition_key selectors do not use ``dag`` and accept ``None``. + + Returns (dag_runs_cleared, task_instances_cleared). 
+ + ``on_run_matched`` is called once per matched run, before any mutation, with + ``(run, had_partition_fields)`` where ``had_partition_fields`` is True when the run's + partition fields are non-None at the time of the call (i.e. they will be reset). + + :meta private: + """ + stmt = select(DagRun).where(DagRun.dag_id == dag_id) + if run_id is not None: + stmt = stmt.where(DagRun.run_id == run_id) + elif partition_key is not None: + stmt = stmt.where(DagRun.partition_key == partition_key) + else: + if dag is None: + raise ValueError( + "The date-window selector requires a loaded Dag to resolve the timetable timezone; " + "dag must not be None when partition_date_start or partition_date_end is used." + ) + stmt = stmt.where(or_(DagRun.partition_key.is_not(None), DagRun.partition_date.is_not(None))) + stmt = DagRun.apply_partition_date_window( + stmt, + timetable=dag.timetable, + start=partition_date_start, + end=partition_date_end, + ) + stmt = stmt.order_by(DagRun.partition_date, DagRun.run_id) + + dag_runs_cleared = 0 + ti_buffer_run_ids: list[str] = [] + ti_carry: list[TI] = [] + tis_cleared_total = 0 + dry_run_matched_ids: list[str] = [] + + def _flush_ti_buffer(*, drain: bool = False) -> int: + flushed = 0 + if ti_buffer_run_ids: + chunk_tis = list( + session.scalars( + select(TI).where( + TI.dag_id == dag_id, + TI.run_id.in_(ti_buffer_run_ids), + ) + ) + ) + ti_buffer_run_ids.clear() + ti_carry.extend(chunk_tis) + while len(ti_carry) >= _TI_CHUNK_SIZE: + slice_tis = ti_carry[:_TI_CHUNK_SIZE] + del ti_carry[:_TI_CHUNK_SIZE] + clear_task_instances(slice_tis, session=session) + flushed += len(slice_tis) + if drain and ti_carry: + clear_task_instances(ti_carry, session=session) + flushed += len(ti_carry) + return flushed + + for run in session.scalars(stmt).yield_per(100): + fields_already_cleared = run.partition_key is None and run.partition_date is None + if on_run_matched is not None: + on_run_matched(run, not fields_already_cleared) + if 
fields_already_cleared and not clear_tis: + continue + if not fields_already_cleared: + if not dry_run: + run.partition_key = None + run.partition_date = None + dag_runs_cleared += 1 + if clear_tis: + if dry_run: + dry_run_matched_ids.append(run.run_id) + else: + ti_buffer_run_ids.append(run.run_id) + if len(ti_buffer_run_ids) >= _TI_CHUNK_SIZE: + tis_cleared_total += _flush_ti_buffer() + + if clear_tis and not dry_run: + tis_cleared_total += _flush_ti_buffer(drain=True) + + if dry_run and clear_tis: + if dry_run_matched_ids: + for i in range(0, len(dry_run_matched_ids), _TI_CHUNK_SIZE): + chunk = dry_run_matched_ids[i : i + _TI_CHUNK_SIZE] + tis_cleared_total += ( + session.scalar( + select(func.count()) + .select_from(TI) + .where( + TI.dag_id == dag_id, + TI.run_id.in_(chunk), + ) + ) + or 0 + ) + + return dag_runs_cleared, tis_cleared_total + + def get_or_create_dagrun( *, dag: SerializedDAG, diff --git a/airflow-core/src/airflow/timetables/_cron.py b/airflow-core/src/airflow/timetables/_cron.py index 3d2a1b1021326..db9950afb33bf 100644 --- a/airflow-core/src/airflow/timetables/_cron.py +++ b/airflow-core/src/airflow/timetables/_cron.py @@ -143,15 +143,15 @@ def validate(self) -> None: except (CroniterBadCronError, CroniterBadDateError) as e: raise AirflowTimetableInvalid(str(e)) - def resolve_day_bound(self, day: datetime.date) -> DateTime: + def localize_partition_datetime(self, dt: datetime.datetime) -> DateTime: """ - Return the UTC instant of *day*'s local midnight in this timetable's timezone. + Re-interpret *dt*'s wall-clock reading as a moment in this timetable's timezone. - Overrides the base default (midnight UTC) so day-bound comparisons are + Overrides the base (UTC pass-through) so partition-date filter bounds are evaluated in the timetable's local timezone rather than at the raw UTC - instant. + instant, while preserving sub-day precision. 
""" - return convert_to_utc(make_aware(datetime.datetime(day.year, day.month, day.day), self._timezone)) + return convert_to_utc(make_aware(dt.replace(tzinfo=None), self._timezone)) def _get_next(self, current: DateTime) -> DateTime: """Get the first schedule after specified time, with DST fixed.""" diff --git a/airflow-core/src/airflow/timetables/base.py b/airflow-core/src/airflow/timetables/base.py index 34c8366d0836e..4bee4ab38a368 100644 --- a/airflow-core/src/airflow/timetables/base.py +++ b/airflow-core/src/airflow/timetables/base.py @@ -282,20 +282,22 @@ def iter_partition_dagrun_infos( msg = f"{type(self).__name__} is not partitioned" raise NotImplementedError(msg) - def resolve_day_bound(self, day: datetime.date) -> DateTime: + def localize_partition_datetime(self, dt: datetime.datetime) -> DateTime: """ - Return the UTC instant of *day*'s start (midnight). + Re-interpret *dt*'s wall-clock reading as a moment in this timetable's timezone. - By default a calendar day starts at midnight UTC. Timetables with a local - timezone (e.g. :class:`~airflow.timetables._cron.CronMixin` subclasses) - override this to anchor at local midnight in their timezone, converted to - UTC. Callers pass *day* for an inclusive lower bound and - ``day + timedelta(days=1)`` for a half-open upper bound (e.g. ``dag_clear`` - uses it to bound ``partition_date`` queries). + The base implementation treats the timetable as UTC: the wall-clock is kept + as-is and the result is simply a timezone-aware UTC instant (a no-op for + already-UTC inputs). Timetables with a local timezone (e.g. + :class:`~airflow.timetables._cron.CronMixin` subclasses) override this to + re-localize the wall-clock to their own timezone before converting to UTC, + preserving sub-day precision for narrow windows on sub-daily schedules. + + Used by :meth:`~airflow.models.dagrun.DagRun.apply_partition_date_window` to + convert user-supplied ``partition_date`` filter bounds without truncating the + time component. 
""" - return timezone.coerce_datetime( - datetime.datetime(day.year, day.month, day.day, tzinfo=datetime.timezone.utc) - ) + return timezone.coerce_datetime(dt) def resolve_partition_date(self, partition_key: str | None) -> datetime.datetime | None: """ diff --git a/airflow-core/src/airflow/timetables/trigger.py b/airflow-core/src/airflow/timetables/trigger.py index 160085a8c4b62..af3f1c23c4cf1 100644 --- a/airflow-core/src/airflow/timetables/trigger.py +++ b/airflow-core/src/airflow/timetables/trigger.py @@ -28,7 +28,6 @@ from airflow._shared.timezones.timezone import ( coerce_datetime, - convert_to_utc, make_aware, parse_timezone, utcnow, @@ -468,24 +467,6 @@ def _get_partition_info(self, run_date: DateTime) -> tuple[DateTime, str]: partition_key = self._format_key(partition_date) return partition_date, partition_key - def _localize_wall_clock_to_timetable_timezone(self, dt: datetime.datetime) -> DateTime: - """ - Re-interpret *dt*'s wall-clock reading as a moment in this timetable's timezone. - - The production backfill path attaches ``default_timezone`` (UTC) to the - bounds it passes in, but the user's intent is the same wall-clock time in - the timetable's local timezone. This method extracts the naive - year/month/day/hour/minute/second from *dt* (regardless of whatever - tzinfo it currently carries) and re-localises that wall-clock to - ``self._timezone``, returning a UTC instant. - - Crucially it preserves sub-day precision — the hour, minute, and second - are kept intact — so a narrow hourly window is not widened to a whole day. - """ - aware = coerce_datetime(dt) - naive = aware.replace(tzinfo=None) - return convert_to_utc(make_aware(naive, self._timezone)) - def iter_partition_dagrun_infos( self, *, @@ -525,8 +506,8 @@ def iter_partition_dagrun_infos( Both bounds must be timezone-aware; a naive datetime is coerced to UTC before the wall-clock localization step. 
""" - current = self._align_to_next(self._localize_wall_clock_to_timetable_timezone(earliest)) - latest_dt = self._localize_wall_clock_to_timetable_timezone(latest) + current = self._align_to_next(self.localize_partition_datetime(earliest)) + latest_dt = self.localize_partition_datetime(latest) while current <= latest_dt: partition_key = self._format_key(current) yield DagRunInfo( diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index ff01113fe4b7a..b72cc90eb300b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -1047,6 +1047,7 @@ export type DagRunServiceTriggerDagRunMutationResult = Awaited>; export type DagRunServiceGetListDagRunsBatchMutationResult = Awaited>; export type DagRunServiceClearDagRunsMutationResult = Awaited>; +export type DagRunServiceClearDagRunPartitionsMutationResult = Awaited>; export type DagServiceFavoriteDagMutationResult = Awaited>; export type DagServiceUnfavoriteDagMutationResult = Awaited>; export type TaskInstanceServiceGetTaskInstancesBatchMutationResult = Awaited>; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 07b70162f2725..37c04b705eeb9 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -2,7 +2,7 @@ import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query"; import { AssetService, AssetStateStoreService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, 
ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TaskStateStoreService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; -import { AssetStateStoreBody, BackfillPostBody, BulkBody_BulkDAGRunBody_, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, BulkDAGRunClearBody, ClearTaskInstancesBody, ConnectionBody, ConnectionTestRequestBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TaskStateStoreBody, TaskStateStorePatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; +import { AssetStateStoreBody, BackfillPostBody, BulkBody_BulkDAGRunBody_, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, BulkDAGRunClearBody, ClearPartitionsBody, ClearTaskInstancesBody, ConnectionBody, ConnectionTestRequestBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TaskStateStoreBody, TaskStateStorePatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; import * as Common from "./common"; /** * Get Assets @@ -2254,6 +2254,22 @@ export const useDagRunServiceClearDagRuns = ({ mutationFn: ({ dagId, requestBody }) => DagRunService.clearDagRuns({ dagId, requestBody }) as unknown as Promise, ...options }); /** +* Clear Dag Run Partitions +* Reset partition_key and partition_date fields on matching 
Dag Runs. +* @param data The data for the request. +* @param data.dagId +* @param data.requestBody +* @returns ClearPartitionsResponse Successful Response +* @throws ApiError +*/ +export const useDagRunServiceClearDagRunPartitions = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, requestBody }) => DagRunService.clearDagRunPartitions({ dagId, requestBody }) as unknown as Promise, ...options }); +/** * Favorite Dag * Mark the Dag as favorite. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index e756263238bc5..f4c8c63a6b993 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1137,6 +1137,44 @@ export const $BulkDAGRunBody = { export const $BulkDAGRunClearBody = { properties: { + partition_key: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Partition Key', + description: 'Select runs by exact partition key match. Mutually exclusive with the other partition selectors.' + }, + partition_date_start: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Partition Date Start', + description: "Inclusive start of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors." + }, + partition_date_end: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Partition Date End', + description: "Inclusive end of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors." 
+ }, dry_run: { type: 'boolean', title: 'Dry Run', @@ -1182,13 +1220,11 @@ export const $BulkDAGRunClearBody = { '$ref': '#/components/schemas/BulkDAGRunBody' }, type: 'array', - minItems: 1, title: 'Dag Runs' } }, additionalProperties: false, type: 'object', - required: ['dag_runs'], title: 'BulkDAGRunClearBody', description: 'Request body for the bulk clear Dag Runs endpoint.' } as const; @@ -1707,6 +1743,98 @@ export const $BulkUpdateAction_VariableBody_ = { title: 'BulkUpdateAction[VariableBody]' } as const; +export const $ClearPartitionsBody = { + properties: { + partition_key: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Partition Key', + description: 'Select runs by exact partition key match. Mutually exclusive with the other partition selectors.' + }, + partition_date_start: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Partition Date Start', + description: "Inclusive start of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors." + }, + partition_date_end: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Partition Date End', + description: "Inclusive end of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors." + }, + run_id: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Run Id', + description: 'Select runs by exact run_id. Mutually exclusive with ``partition_key`` and partition date window.' 
+ }, + clear_task_instances: { + type: 'boolean', + title: 'Clear Task Instances', + description: 'Also clear task instances on the matched runs.', + default: false + }, + dry_run: { + type: 'boolean', + title: 'Dry Run', + description: 'If True, compute counts without writing any changes.', + default: true + } + }, + additionalProperties: false, + type: 'object', + title: 'ClearPartitionsBody', + description: 'Request body for the clearPartitions endpoint (column-reset: set partition fields to None).' +} as const; + +export const $ClearPartitionsResponse = { + properties: { + dag_runs_cleared: { + type: 'integer', + title: 'Dag Runs Cleared' + }, + task_instances_cleared: { + type: 'integer', + title: 'Task Instances Cleared' + }, + dry_run: { + type: 'boolean', + title: 'Dry Run' + } + }, + type: 'object', + required: ['dag_runs_cleared', 'task_instances_cleared', 'dry_run'], + title: 'ClearPartitionsResponse', + description: 'Response for the clearPartitions endpoint.' +} as const; + export const $ClearTaskInstanceCollectionResponse = { properties: { task_instances: { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 6644fe8d99217..abec8b40dad95 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, 
GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse2, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionTestData, GetConnectionTestResponse, EnqueueConnectionTestData, EnqueueConnectionTestResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, ClearDagRunsData, ClearDagRunsResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, 
GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, 
GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStateStoreData, ListAssetStateStoreResponse, ClearAssetStateStoreData, ClearAssetStateStoreResponse, GetAssetStateStoreData, GetAssetStateStoreResponse, SetAssetStateStoreData, SetAssetStateStoreResponse, DeleteAssetStateStoreData, DeleteAssetStateStoreResponse, ListTaskStateStoreData, ListTaskStateStoreResponse, ClearTaskStateStoreData, ClearTaskStateStoreResponse, GetTaskStateStoreData, GetTaskStateStoreResponse, SetTaskStateStoreData, SetTaskStateStoreResponse, PatchTaskStateStoreData, PatchTaskStateStoreResponse, DeleteTaskStateStoreData, DeleteTaskStateStoreResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, 
HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse2, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionTestData, GetConnectionTestResponse, EnqueueConnectionTestData, EnqueueConnectionTestResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, 
CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, ClearDagRunsData, ClearDagRunsResponse, ClearDagRunPartitionsData, ClearDagRunPartitionsResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, 
GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStateStoreData, ListAssetStateStoreResponse, ClearAssetStateStoreData, ClearAssetStateStoreResponse, GetAssetStateStoreData, GetAssetStateStoreResponse, SetAssetStateStoreData, SetAssetStateStoreResponse, DeleteAssetStateStoreData, DeleteAssetStateStoreResponse, ListTaskStateStoreData, ListTaskStateStoreResponse, ClearTaskStateStoreData, ClearTaskStateStoreResponse, GetTaskStateStoreData, GetTaskStateStoreResponse, SetTaskStateStoreData, SetTaskStateStoreResponse, PatchTaskStateStoreData, PatchTaskStateStoreResponse, DeleteTaskStateStoreData, DeleteTaskStateStoreResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, 
UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -1308,6 +1308,34 @@ export class DagRunService { }); } + /** + * Clear Dag Run Partitions + * Reset partition_key and partition_date fields on matching Dag Runs. + * @param data The data for the request. 
+ * @param data.dagId + * @param data.requestBody + * @returns ClearPartitionsResponse Successful Response + * @throws ApiError + */ + public static clearDagRunPartitions(data: ClearDagRunPartitionsData): CancelablePromise { + return __request(OpenAPI, { + method: 'POST', + url: '/api/v2/dags/{dag_id}/clearPartitions', + path: { + dag_id: data.dagId + }, + body: data.requestBody, + mediaType: 'application/json', + errors: { + 400: 'Bad Request', + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + /** * Get Dag Run Stats * Get duration statistics for a DAG based on its historical completed runs. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index e5e6cef885b22..d35e15f26e6d0 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -347,6 +347,18 @@ export type BulkDAGRunBody = { * Request body for the bulk clear Dag Runs endpoint. */ export type BulkDAGRunClearBody = { + /** + * Select runs by exact partition key match. Mutually exclusive with the other partition selectors. + */ + partition_key?: string | null; + /** + * Inclusive start of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors. + */ + partition_date_start?: string | null; + /** + * Inclusive end of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors. 
+ */ + partition_date_end?: string | null; dry_run?: boolean; only_failed?: boolean; /** @@ -358,7 +370,7 @@ export type BulkDAGRunClearBody = { */ run_on_latest_version?: boolean | null; note?: string | null; - dag_runs: Array; + dag_runs?: Array; }; export type BulkDeleteAction_BulkDAGRunBody_ = { @@ -539,6 +551,45 @@ export type BulkUpdateAction_VariableBody_ = { action_on_non_existence?: BulkActionNotOnExistence; }; +/** + * Request body for the clearPartitions endpoint (column-reset: set partition fields to None). + */ +export type ClearPartitionsBody = { + /** + * Select runs by exact partition key match. Mutually exclusive with the other partition selectors. + */ + partition_key?: string | null; + /** + * Inclusive start of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors. + */ + partition_date_start?: string | null; + /** + * Inclusive end of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors. + */ + partition_date_end?: string | null; + /** + * Select runs by exact run_id. Mutually exclusive with ``partition_key`` and partition date window. + */ + run_id?: string | null; + /** + * Also clear task instances on the matched runs. + */ + clear_task_instances?: boolean; + /** + * If True, compute counts without writing any changes. + */ + dry_run?: boolean; +}; + +/** + * Response for the clearPartitions endpoint. + */ +export type ClearPartitionsResponse = { + dag_runs_cleared: number; + task_instances_cleared: number; + dry_run: boolean; +}; + /** * Response for clear dag run dry run, which may contain new tasks without full TaskInstance data. 
*/ @@ -3117,6 +3168,13 @@ export type ClearDagRunsData = { export type ClearDagRunsResponse = ClearTaskInstanceCollectionResponse | DAGRunCollectionResponse; +export type ClearDagRunPartitionsData = { + dagId: string; + requestBody: ClearPartitionsBody; +}; + +export type ClearDagRunPartitionsResponse = ClearPartitionsResponse; + export type GetDagRunStatsData = { dagId: string; dagRunId: string; @@ -5728,6 +5786,37 @@ export type $OpenApiTs = { }; }; }; + '/api/v2/dags/{dag_id}/clearPartitions': { + post: { + req: ClearDagRunPartitionsData; + res: { + /** + * Successful Response + */ + 200: ClearPartitionsResponse; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/ui/dags/{dag_id}/dagRuns/{dag_run_id}/stats': { get: { req: GetDagRunStatsData; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dag_run.py new file mode 100644 index 0000000000000..1b6e8d65d2341 --- /dev/null +++ b/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dag_run.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunClearBody, ClearPartitionsBody + +_DATE_A = datetime(2024, 1, 1, tzinfo=timezone.utc) +_DATE_B = datetime(2024, 1, 31, tzinfo=timezone.utc) + +# Non-partition selector kwargs differ per model class. +_NON_PARTITION_KWARGS: dict[type, dict] = { + BulkDAGRunClearBody: {"dag_runs": [{"dag_run_id": "run1", "dag_id": "dag1"}]}, + ClearPartitionsBody: {"run_id": "manual__2024-01-01"}, +} + + +@pytest.mark.parametrize("model_cls", [BulkDAGRunClearBody, ClearPartitionsBody]) +class TestHasPartitionSelectors: + """has_partition_selectors property is driven by the shared mixin, tested for both models.""" + + @pytest.mark.parametrize( + "kwargs", + [ + pytest.param({"partition_key": "pk"}, id="partition_key"), + pytest.param({"partition_date_start": _DATE_A, "partition_date_end": _DATE_B}, id="date_window"), + pytest.param({"partition_date_end": _DATE_B}, id="date_end_only"), + ], + ) + def test_returns_true_when_partition_selector_set(self, model_cls, kwargs): + assert model_cls(**kwargs).has_partition_selectors is True + + def test_returns_false_when_only_non_partition_selector_provided(self, model_cls): + body = model_cls(**_NON_PARTITION_KWARGS[model_cls]) + assert body.has_partition_selectors is False + + +@pytest.mark.parametrize("model_cls", [BulkDAGRunClearBody, ClearPartitionsBody]) +class TestDateWindowOrder: + """validate_partition_date_window_order is shared; verify via both model 
classes.""" + + def test_raises_value_error_when_start_after_end(self, model_cls): + with pytest.raises(ValueError, match="partition_date_start must be on or before partition_date_end"): + model_cls(partition_date_start=_DATE_B, partition_date_end=_DATE_A) + + def test_accepts_equal_start_and_end(self, model_cls): + body = model_cls(partition_date_start=_DATE_A, partition_date_end=_DATE_A) + assert body.partition_date_start == body.partition_date_end + + def test_accepts_start_before_end(self, model_cls): + body = model_cls(partition_date_start=_DATE_A, partition_date_end=_DATE_B) + assert body.partition_date_start < body.partition_date_end diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 9b879e519bf75..5b5a0293ffc4b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -2264,6 +2264,693 @@ def test_only_new_skips_task_that_already_has_ti(self, test_client, dag_two_vers ) +PARTITION_DAG_ID = "partition_test_dag" + + +class TestBulkClearDagRunsPartitionSelector: + """ + Tests for the partition-selector extensions to clearDagRuns (Part A). + + These tests cover: partition_key selector, partition_date window, dry_run, + mutual-exclusion validation, wildcard rejection, and authz bypass fix (B1). 
+ """ + + @pytest.fixture + def partition_dag(self, dag_maker, configure_git_connection_for_dag_bundle, session): + """Dag with three runs carrying partition_key and partition_date.""" + with dag_maker( + PARTITION_DAG_ID, + schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"), + start_date=datetime(2026, 1, 1, tzinfo=timezone.utc), + serialized=True, + ): + task_1 = EmptyOperator(task_id="task_1") + + run_specs = [ + ("partition_run_a", "2026-01-01|us", datetime(2026, 1, 1, tzinfo=timezone.utc)), + ("partition_run_b", "2026-01-02|us", datetime(2026, 1, 2, tzinfo=timezone.utc)), + ("partition_run_c", "2026-01-03|us", datetime(2026, 1, 3, tzinfo=timezone.utc)), + ] + for run_id, partition_key, partition_date in run_specs: + run = dag_maker.create_dagrun( + run_id=run_id, + state=DagRunState.SUCCESS, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.REST_API, + logical_date=partition_date, + ) + run.partition_key = partition_key + run.partition_date = partition_date + ti = run.get_task_instance(task_id="task_1") + ti.task = task_1 + ti.state = State.SUCCESS + session.merge(ti) + + dag_maker.sync_dagbag_to_db() + session.flush() + return { + "dag_id": PARTITION_DAG_ID, + "run_a_id": "partition_run_a", + "run_b_id": "partition_run_b", + "run_c_id": "partition_run_c", + } + + def test_partition_key_selector_clears_matching_run(self, test_client, session, partition_dag): + """partition_key selector resolves the matching run and clears it (state → queued).""" + dag_id = partition_dag["dag_id"] + response = test_client.post( + f"/dags/{dag_id}/clearDagRuns", + json={"dry_run": False, "partition_key": "2026-01-01|us"}, + ) + assert response.status_code == 200 + body = response.json() + assert body["total_entries"] == 1 + assert body["dag_runs"][0]["dag_run_id"] == partition_dag["run_a_id"] + assert body["dag_runs"][0]["state"] == "queued" + + # run_b and run_c must be untouched + session.expire_all() + for run_id in (partition_dag["run_b_id"], 
partition_dag["run_c_id"]): + state = session.scalar(select(DagRun.state).where(DagRun.run_id == run_id)) + assert state == DagRunState.SUCCESS + + def test_partition_date_window_inclusive_end(self, test_client, session, partition_dag): + """ + Window [Jan 1, Jan 2] must include runs with partition_date on Jan 1 and Jan 2 + but exclude Jan 3 (cap-boundary pair: end==Jan 2 included, end+1==Jan 3 excluded). + """ + dag_id = partition_dag["dag_id"] + response = test_client.post( + f"/dags/{dag_id}/clearDagRuns", + json={ + "dry_run": False, + "partition_date_start": "2026-01-01T00:00:00Z", + "partition_date_end": "2026-01-02T00:00:00Z", + }, + ) + assert response.status_code == 200 + body = response.json() + cleared_run_ids = sorted(r["dag_run_id"] for r in body["dag_runs"]) + assert cleared_run_ids == sorted([partition_dag["run_a_id"], partition_dag["run_b_id"]]) + for run in body["dag_runs"]: + assert run["state"] == "queued" + + # Jan 3 run must be untouched + session.expire_all() + state_c = session.scalar(select(DagRun.state).where(DagRun.run_id == partition_dag["run_c_id"])) + assert state_c == DagRunState.SUCCESS + + def test_partition_date_end_boundary_excludes_next_day(self, test_client, session, partition_dag): + """Upper bound is inclusive: run on end datetime is included, run after end is not.""" + dag_id = partition_dag["dag_id"] + # Window: start=Jan 1, end=Jan 1T00:00Z → only run_a selected (run_b on Jan 2 excluded) + response = test_client.post( + f"/dags/{dag_id}/clearDagRuns", + json={ + "dry_run": True, + "partition_date_start": "2026-01-01T00:00:00Z", + "partition_date_end": "2026-01-01T00:00:00Z", + }, + ) + assert response.status_code == 200 + body = response.json() + affected_run_ids = {ti["dag_run_id"] for ti in body["task_instances"]} + assert affected_run_ids == {partition_dag["run_a_id"]} + + def test_partition_selector_dry_run_does_not_write(self, test_client, session, partition_dag): + """dry_run=True returns affected TIs without modifying 
run state.""" + dag_id = partition_dag["dag_id"] + response = test_client.post( + f"/dags/{dag_id}/clearDagRuns", + json={"dry_run": True, "partition_key": "2026-01-01|us"}, + ) + assert response.status_code == 200 + body = response.json() + # dry_run returns ClearTaskInstanceCollectionResponse + assert "task_instances" in body + assert len(body["task_instances"]) > 0, "dry_run must return at least one affected TI" + session.expire_all() + state = session.scalar(select(DagRun.state).where(DagRun.run_id == partition_dag["run_a_id"])) + assert state == DagRunState.SUCCESS + + @pytest.mark.parametrize( + "build_body", + [ + pytest.param( + lambda d: { + "dry_run": True, + "dag_runs": [{"dag_run_id": d["run_a_id"]}], + "partition_key": "2026-01-01|us", + }, + id="dag_runs_and_partition_key", + ), + pytest.param( + lambda d: { + "dry_run": True, + "partition_key": "2026-01-01|us", + "partition_date_start": "2026-01-01T00:00:00Z", + }, + id="two_partition_selectors", + ), + pytest.param( + lambda d: {"dry_run": True}, + id="no_selector", + ), + pytest.param( + lambda d: { + "dry_run": True, + "partition_date_start": "2026-01-03T00:00:00Z", + "partition_date_end": "2026-01-01T00:00:00Z", + }, + id="start_after_end", + ), + ], + ) + def test_invalid_selector_combination_returns_422(self, test_client, partition_dag, build_body): + """Invalid selector combinations must be rejected with 422.""" + response = test_client.post( + f"/dags/{partition_dag['dag_id']}/clearDagRuns", + json=build_body(partition_dag), + ) + assert response.status_code == 422 + + def test_wildcard_dag_id_with_partition_selector_returns_400(self, test_client, partition_dag): + """'~' dag_id + partition selector must be rejected with 400 (timetable unknown).""" + response = test_client.post( + "/dags/~/clearDagRuns", + json={"dry_run": True, "partition_key": "2026-01-01|us"}, + ) + assert response.status_code == 400 + + def test_partition_selector_unauthenticated_returns_401(self, 
unauthenticated_test_client, partition_dag): + """Unauthenticated request with partition selector must return 401.""" + response = unauthenticated_test_client.post( + f"/dags/{partition_dag['dag_id']}/clearDagRuns", + json={"dry_run": True, "partition_key": "2026-01-01|us"}, + ) + assert response.status_code == 401 + + def test_partition_selector_unauthorized_returns_403(self, unauthorized_test_client, partition_dag): + """Unauthorized user with partition selector must return 403 (authz bypass fix B1).""" + response = unauthorized_test_client.post( + f"/dags/{partition_dag['dag_id']}/clearDagRuns", + json={"dry_run": True, "partition_key": "2026-01-01|us"}, + ) + assert response.status_code == 403 + + def test_partition_key_no_match_returns_200_empty(self, test_client, session, partition_dag): + """A partition_key matching no run returns 200 with an empty result, not 404.""" + dag_id = partition_dag["dag_id"] + response = test_client.post( + f"/dags/{dag_id}/clearDagRuns", + json={"dry_run": False, "partition_key": "9999-12-31|none"}, + ) + assert response.status_code == 200 + body = response.json() + assert body["total_entries"] == 0 + assert body["dag_runs"] == [] + + # All runs must be untouched. 
+ session.expire_all() + for run_id in (partition_dag["run_a_id"], partition_dag["run_b_id"], partition_dag["run_c_id"]): + state = session.scalar(select(DagRun.state).where(DagRun.run_id == run_id)) + assert state == DagRunState.SUCCESS + + @pytest.mark.parametrize( + ("window", "expected_run_keys"), + [ + pytest.param( + {"partition_date_start": "2026-01-02T00:00:00Z"}, + ("run_b_id", "run_c_id"), + id="start-only-includes-from-start-onward", + ), + pytest.param( + {"partition_date_end": "2026-01-02T00:00:00Z"}, + ("run_a_id", "run_b_id"), + id="end-only-includes-up-to-and-including-end", + ), + ], + ) + def test_partition_date_single_bound_window( + self, test_client, session, partition_dag, window, expected_run_keys + ): + """A window with only one bound is open-ended on the missing side.""" + dag_id = partition_dag["dag_id"] + response = test_client.post( + f"/dags/{dag_id}/clearDagRuns", + json={"dry_run": False, **window}, + ) + assert response.status_code == 200 + body = response.json() + cleared_run_ids = sorted(r["dag_run_id"] for r in body["dag_runs"]) + assert cleared_run_ids == sorted(partition_dag[k] for k in expected_run_keys) + + +class TestClearPartitions: + """ + Tests for the new clearPartitions endpoint (Part B/C). + + Covers: run_id selector, partition_key selector, partition_date window, + clear_task_instances, dry_run, mutual-exclusion validation, and authz. 
+ """ + + @pytest.fixture + def partitioned_dag_with_runs(self, dag_maker, configure_git_connection_for_dag_bundle, session): + """Dag with three runs carrying partition fields and task instances.""" + dag_id = "clear_partitions_test_dag" + with dag_maker( + dag_id, + schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"), + start_date=datetime(2026, 1, 1, tzinfo=timezone.utc), + serialized=True, + ): + task_x = EmptyOperator(task_id="task_x") + task_y = EmptyOperator(task_id="task_y") + + run_specs = [ + ("cp_run_a", "key-a", datetime(2026, 1, 1, tzinfo=timezone.utc)), + ("cp_run_b", "key-b", datetime(2026, 1, 2, tzinfo=timezone.utc)), + ("cp_run_c", "key-c", datetime(2026, 1, 3, tzinfo=timezone.utc)), + ] + runs = {} + for run_id, partition_key, partition_date in run_specs: + run = dag_maker.create_dagrun( + run_id=run_id, + state=DagRunState.SUCCESS, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.REST_API, + logical_date=partition_date, + ) + run.partition_key = partition_key + run.partition_date = partition_date + runs[run_id] = run + + # Only run_a carries task instances, for the clear_task_instances tests + for op in (task_x, task_y): + ti = runs["cp_run_a"].get_task_instance(task_id=op.task_id) + ti.task = op + ti.state = State.SUCCESS + session.merge(ti) + + dag_maker.sync_dagbag_to_db() + session.flush() + return { + "dag_id": dag_id, + "run_a_id": "cp_run_a", + "run_b_id": "cp_run_b", + "run_c_id": "cp_run_c", + } + + def test_run_id_selector_clears_partition_fields(self, test_client, session, partitioned_dag_with_runs): + """run_id selector resets partition fields to None on the matching run.""" + info = partitioned_dag_with_runs + response = test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={"run_id": info["run_a_id"], "dry_run": False}, + ) + assert response.status_code == 200 + body = response.json() + assert body["dag_runs_cleared"] == 1 + assert body["task_instances_cleared"] == 0 + assert 
body["dry_run"] is False + + session.expire_all() + run = session.scalar(select(DagRun).where(DagRun.run_id == info["run_a_id"])) + assert run.partition_key is None + assert run.partition_date is None + # Other runs untouched + run_b = session.scalar(select(DagRun).where(DagRun.run_id == info["run_b_id"])) + assert run_b.partition_key == "key-b" + + def test_partition_key_selector_clears_partition_fields( + self, test_client, session, partitioned_dag_with_runs + ): + """partition_key selector resets partition fields to None on matching run.""" + info = partitioned_dag_with_runs + response = test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={"partition_key": "key-b", "dry_run": False}, + ) + assert response.status_code == 200 + body = response.json() + assert body["dag_runs_cleared"] == 1 + assert body["dry_run"] is False + + session.expire_all() + run_b = session.scalar(select(DagRun).where(DagRun.run_id == info["run_b_id"])) + assert run_b.partition_key is None + assert run_b.partition_date is None + + def test_partition_date_window_clears_fields_within_range( + self, test_client, session, partitioned_dag_with_runs + ): + """partition_date window [Jan 1, Jan 2] clears run_a and run_b, leaves run_c.""" + info = partitioned_dag_with_runs + response = test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={ + "partition_date_start": "2026-01-01T00:00:00Z", + "partition_date_end": "2026-01-02T00:00:00Z", + "dry_run": False, + }, + ) + assert response.status_code == 200 + body = response.json() + assert body["dag_runs_cleared"] == 2 + assert body["dry_run"] is False + + session.expire_all() + for run_id in (info["run_a_id"], info["run_b_id"]): + run = session.scalar(select(DagRun).where(DagRun.run_id == run_id)) + assert run.partition_key is None + assert run.partition_date is None + # run_c (Jan 3) must be untouched + run_c = session.scalar(select(DagRun).where(DagRun.run_id == info["run_c_id"])) + assert run_c.partition_key == 
"key-c" + + def test_dry_run_returns_counts_without_writing(self, test_client, session, partitioned_dag_with_runs): + """dry_run=True reports the would-be count but does not modify the DB.""" + info = partitioned_dag_with_runs + response = test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={"partition_key": "key-a", "dry_run": True}, + ) + assert response.status_code == 200 + body = response.json() + assert body["dag_runs_cleared"] == 1 + assert body["dry_run"] is True + + session.expire_all() + run_a = session.scalar(select(DagRun).where(DagRun.run_id == info["run_a_id"])) + assert run_a.partition_key == "key-a" # unchanged + + def test_clear_task_instances_non_dry_run(self, test_client, session, partitioned_dag_with_runs): + """clear_task_instances=True clears TIs and reports the count.""" + info = partitioned_dag_with_runs + # run_a has 2 TIs (task_x and task_y) + response = test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={"run_id": info["run_a_id"], "clear_task_instances": True, "dry_run": False}, + ) + assert response.status_code == 200 + body = response.json() + assert body["dag_runs_cleared"] == 1 + assert body["task_instances_cleared"] == 2 + assert body["dry_run"] is False + + def test_clear_task_instances_dry_run_counts_tis(self, test_client, session, partitioned_dag_with_runs): + """dry_run + clear_task_instances reports TI count without writing.""" + info = partitioned_dag_with_runs + response = test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={"run_id": info["run_a_id"], "clear_task_instances": True, "dry_run": True}, + ) + assert response.status_code == 200 + body = response.json() + assert body["task_instances_cleared"] == 2 + assert body["dry_run"] is True + + session.expire_all() + run_a = session.scalar(select(DagRun).where(DagRun.run_id == info["run_a_id"])) + assert run_a.partition_key == "key-a" # not cleared (dry_run) + + @pytest.mark.parametrize( + "build_body", + [ + pytest.param( 
+ lambda d: {"run_id": d["run_a_id"], "partition_key": "key-a", "dry_run": True}, + id="run_id_and_partition_key", + ), + pytest.param( + lambda d: {"dry_run": True}, + id="no_selector", + ), + pytest.param( + lambda d: { + "partition_date_start": "2026-01-03T00:00:00Z", + "partition_date_end": "2026-01-01T00:00:00Z", + "dry_run": True, + }, + id="start_after_end", + ), + ], + ) + def test_invalid_selector_combination_returns_422( + self, test_client, partitioned_dag_with_runs, build_body + ): + """Invalid selector combinations must be rejected with 422.""" + info = partitioned_dag_with_runs + response = test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json=build_body(info), + ) + assert response.status_code == 422 + + def test_unauthenticated_returns_401(self, unauthenticated_test_client, partitioned_dag_with_runs): + """Unauthenticated request must return 401.""" + info = partitioned_dag_with_runs + response = unauthenticated_test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={"partition_key": "key-a", "dry_run": True}, + ) + assert response.status_code == 401 + + def test_unauthorized_returns_403(self, unauthorized_test_client, partitioned_dag_with_runs): + """Unauthorized user must return 403.""" + info = partitioned_dag_with_runs + response = unauthorized_test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={"partition_key": "key-a", "dry_run": True}, + ) + assert response.status_code == 403 + + def test_partition_key_no_match_returns_200_zero_count( + self, test_client, session, partitioned_dag_with_runs + ): + """A partition_key matching no run returns 200 with zero counts, not 404.""" + info = partitioned_dag_with_runs + response = test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={"partition_key": "nonexistent-key", "dry_run": False}, + ) + assert response.status_code == 200 + body = response.json() + assert body["dag_runs_cleared"] == 0 + assert body["task_instances_cleared"] == 0 + + 
# Existing runs keep their partition fields. + session.expire_all() + run_a = session.scalar(select(DagRun).where(DagRun.run_id == info["run_a_id"])) + assert run_a.partition_key == "key-a" + + @pytest.mark.parametrize( + ("window", "expected_run_keys"), + [ + pytest.param( + {"partition_date_start": "2026-01-02T00:00:00Z"}, + ("run_b_id", "run_c_id"), + id="start-only-includes-from-start-onward", + ), + pytest.param( + {"partition_date_end": "2026-01-02T00:00:00Z"}, + ("run_a_id", "run_b_id"), + id="end-only-includes-up-to-and-including-end", + ), + ], + ) + def test_partition_date_single_bound_window( + self, test_client, session, partitioned_dag_with_runs, window, expected_run_keys + ): + """A window with only one bound is open-ended on the missing side.""" + info = partitioned_dag_with_runs + response = test_client.post( + f"/dags/{info['dag_id']}/clearPartitions", + json={"dry_run": False, **window}, + ) + assert response.status_code == 200 + body = response.json() + assert body["dag_runs_cleared"] == 2 + + session.expire_all() + cleared_ids = {info[k] for k in expected_run_keys} + for run_id in (info["run_a_id"], info["run_b_id"], info["run_c_id"]): + run = session.scalar(select(DagRun).where(DagRun.run_id == run_id)) + if run_id in cleared_ids: + assert run.partition_key is None + assert run.partition_date is None + else: + assert run.partition_key is not None + + def test_cross_dag_run_id_collision_does_not_clear_other_dag( + self, test_client, session, dag_maker, configure_git_connection_for_dag_bundle + ): + """Clearing by run_id only affects the target Dag; a second Dag with the same run_id is untouched.""" + shared_run_id = "shared_run_id" + + # Build target Dag with the shared run_id. 
+ dag_id_target = "cp_cross_dag_target" + with dag_maker( + dag_id_target, + schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"), + start_date=datetime(2026, 1, 1, tzinfo=timezone.utc), + serialized=True, + ): + task_a = EmptyOperator(task_id="task_a") + + run_target = dag_maker.create_dagrun( + run_id=shared_run_id, + state=DagRunState.SUCCESS, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.REST_API, + logical_date=datetime(2026, 1, 1, tzinfo=timezone.utc), + ) + run_target.partition_key = "key-target" + run_target.partition_date = datetime(2026, 1, 1, tzinfo=timezone.utc) + ti_target = run_target.get_task_instance(task_id=task_a.task_id) + ti_target.task = task_a + ti_target.state = State.SUCCESS + session.merge(ti_target) + + # Build bystander Dag with the same run_id. + dag_id_bystander = "cp_cross_dag_bystander" + with dag_maker( + dag_id_bystander, + schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"), + start_date=datetime(2026, 1, 1, tzinfo=timezone.utc), + serialized=True, + ): + task_b = EmptyOperator(task_id="task_b") + + run_bystander = dag_maker.create_dagrun( + run_id=shared_run_id, + state=DagRunState.SUCCESS, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.REST_API, + logical_date=datetime(2026, 1, 1, tzinfo=timezone.utc), + ) + run_bystander.partition_key = "key-bystander" + run_bystander.partition_date = datetime(2026, 1, 1, tzinfo=timezone.utc) + ti_bystander = run_bystander.get_task_instance(task_id=task_b.task_id) + ti_bystander.task = task_b + ti_bystander.state = State.SUCCESS + session.merge(ti_bystander) + + dag_maker.sync_dagbag_to_db() + session.flush() + + response = test_client.post( + f"/dags/{dag_id_target}/clearPartitions", + json={"run_id": shared_run_id, "clear_task_instances": True, "dry_run": False}, + ) + assert response.status_code == 200 + body = response.json() + assert body["dag_runs_cleared"] == 1 + assert body["task_instances_cleared"] == 1 + + # Target Dag's run and 
TI are cleared. + session.expire_all() + run_t = session.scalar( + select(DagRun).where(DagRun.dag_id == dag_id_target, DagRun.run_id == shared_run_id) + ) + assert run_t.partition_key is None + assert run_t.partition_date is None + + # Bystander Dag's run and TI are completely untouched. + run_b = session.scalar( + select(DagRun).where(DagRun.dag_id == dag_id_bystander, DagRun.run_id == shared_run_id) + ) + assert run_b.partition_key == "key-bystander" + assert run_b.partition_date is not None + + ti_b_after = session.scalar( + select(TaskInstance).where( + TaskInstance.dag_id == dag_id_bystander, TaskInstance.run_id == shared_run_id + ) + ) + assert ti_b_after.state == State.SUCCESS + + def test_cross_dag_run_id_collision_dry_run_counts_only_target_dag( + self, test_client, session, dag_maker, configure_git_connection_for_dag_bundle + ): + """dry_run=True TI count only includes the target Dag's TIs, not a bystander sharing the same run_id.""" + shared_run_id = "shared_dry_run_id" + + dag_id_target = "cp_cross_dag_dry_target" + with dag_maker( + dag_id_target, + schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"), + start_date=datetime(2026, 1, 1, tzinfo=timezone.utc), + serialized=True, + ): + task_a = EmptyOperator(task_id="task_a") + task_b = EmptyOperator(task_id="task_b") + + run_target = dag_maker.create_dagrun( + run_id=shared_run_id, + state=DagRunState.SUCCESS, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.REST_API, + logical_date=datetime(2026, 1, 1, tzinfo=timezone.utc), + ) + run_target.partition_key = "key-target-dry" + run_target.partition_date = datetime(2026, 1, 1, tzinfo=timezone.utc) + for op in (task_a, task_b): + ti = run_target.get_task_instance(task_id=op.task_id) + ti.task = op + ti.state = State.SUCCESS + session.merge(ti) + + dag_id_bystander = "cp_cross_dag_dry_bystander" + with dag_maker( + dag_id_bystander, + schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"), + start_date=datetime(2026, 1, 1, 
tzinfo=timezone.utc), + serialized=True, + ): + task_c = EmptyOperator(task_id="task_c") + task_d = EmptyOperator(task_id="task_d") + task_e = EmptyOperator(task_id="task_e") + + run_bystander = dag_maker.create_dagrun( + run_id=shared_run_id, + state=DagRunState.SUCCESS, + run_type=DagRunType.MANUAL, + triggered_by=DagRunTriggeredByType.REST_API, + logical_date=datetime(2026, 1, 1, tzinfo=timezone.utc), + ) + run_bystander.partition_key = "key-bystander-dry" + run_bystander.partition_date = datetime(2026, 1, 1, tzinfo=timezone.utc) + for op in (task_c, task_d, task_e): + ti = run_bystander.get_task_instance(task_id=op.task_id) + ti.task = op + ti.state = State.SUCCESS + session.merge(ti) + + dag_maker.sync_dagbag_to_db() + session.flush() + + response = test_client.post( + f"/dags/{dag_id_target}/clearPartitions", + json={"run_id": shared_run_id, "clear_task_instances": True, "dry_run": True}, + ) + assert response.status_code == 200 + body = response.json() + # Only the 2 TIs from the target Dag count; the 3 bystander TIs must not be included. + assert body["task_instances_cleared"] == 2 + assert body["dry_run"] is True + + # Neither run is written to (dry_run=True). 
+ session.expire_all() + run_t = session.scalar( + select(DagRun).where(DagRun.dag_id == dag_id_target, DagRun.run_id == shared_run_id) + ) + assert run_t.partition_key == "key-target-dry" + run_b = session.scalar( + select(DagRun).where(DagRun.dag_id == dag_id_bystander, DagRun.run_id == shared_run_id) + ) + assert run_b.partition_key == "key-bystander-dry" + + class TestTriggerDagRun: def _dags_for_trigger_tests(self, session=None): inactive_dag = DagModel( diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py b/airflow-core/tests/unit/cli/commands/test_dag_command.py index 0c9f2647f0fe7..8ba0a64156a22 100644 --- a/airflow-core/tests/unit/cli/commands/test_dag_command.py +++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py @@ -1543,13 +1543,13 @@ def seeded_no_tz_runs(self, dag_maker): dag_maker.sync_dagbag_to_db() @pytest.mark.usefixtures("seeded_no_tz_runs") - def test_no_tz_lower_bound_truncates_time_of_day(self, parser, monkeypatch): - """--partition-date-start with a non-midnight time-of-day must still match the day. + def test_no_tz_lower_bound_honours_time_of_day(self, parser, monkeypatch): + """--partition-date-start with a non-midnight time-of-day is honoured, not truncated. - A start of 2026-03-08T12:00:00 truncates to 2026-03-08 via .date(), - giving lower = 2026-03-08T00:00Z, which is <= the stored 2026-03-08T00:00Z - run (so it is included). Without truncation the raw-instant comparison - would be 2026-03-08T12:00Z > 2026-03-08T00:00Z and the run would be missed. + A start of 2026-03-08T12:00:00 passes through as lower = 2026-03-08T12:00Z + (no-tz fallback keeps the wall-clock as UTC). The stored 2026-03-08T00:00Z + run is *before* that bound and is excluded; the 2026-03-09T00:00Z run is + after it and is cleared. 
""" def _patched(*, bundle_names, dag_id): @@ -1572,9 +1572,9 @@ def _patched(*, bundle_names, dag_id): dag_command.dag_clear(args) states = self._get_run_states() - # 2026-03-08T12 truncates to 2026-03-08 → lower = 2026-03-08T00Z; run is included. - assert states["no_tz_2026_03_08"] == DagRunState.QUEUED - # 2026-03-09 is also >= 2026-03-08T00Z (no upper bound). + # 2026-03-08T00Z is before the 12:00Z lower bound → excluded. + assert states["no_tz_2026_03_08"] == DagRunState.SUCCESS + # 2026-03-09T00Z is after the lower bound (no upper bound) → cleared. assert states["no_tz_2026_03_09"] == DagRunState.QUEUED @pytest.mark.usefixtures("seeded_no_tz_runs") @@ -1680,13 +1680,12 @@ def seeded_asset_partitioned_runs(self, dag_maker): @pytest.mark.usefixtures("seeded_asset_partitioned_runs") def test_asset_timetable_clears_window_inclusive(self, parser): - """PartitionedAssetTimetable uses the base default; day-granular UTC bounds are correct. + """PartitionedAssetTimetable uses the base (UTC) localization; datetime-precision bounds are correct. - PartitionedAssetTimetable has no local timezone, so - resolve_day_bound returns midnight UTC for each calendar day. - The full window 2026-04-10 to 2026-04-14 (inclusive) should clear the - at-boundary and within-window runs; 2026-04-15 and partition_date=None - must not be touched. + PartitionedAssetTimetable has no local timezone, so localize_partition_datetime + is a UTC pass-through. The full window 2026-04-10 to 2026-04-14 (inclusive) + should clear the at-boundary and within-window runs; 2026-04-15 and + partition_date=None must not be touched. """ args = parser.parse_args( [ @@ -1707,7 +1706,7 @@ def test_asset_timetable_clears_window_inclusive(self, parser): "asset_2026_04_10": DagRunState.QUEUED, "asset_2026_04_12": DagRunState.QUEUED, "asset_2026_04_14": DagRunState.QUEUED, - # Outside the half-open upper bound — must NOT be cleared. + # Beyond the inclusive upper bound — must NOT be cleared. 
"asset_2026_04_15": DagRunState.SUCCESS, # NULL partition_date is never matched by the date-range filter. "asset_non_part": DagRunState.SUCCESS, @@ -1715,7 +1714,7 @@ def test_asset_timetable_clears_window_inclusive(self, parser): @pytest.mark.usefixtures("seeded_asset_partitioned_runs") def test_asset_timetable_upper_bound_at_cap(self, parser): - """--partition-date-end 2026-04-14 must include the run at exactly that UTC midnight (at-cap).""" + """--partition-date-end 2026-04-14 includes the run at exactly that UTC midnight (at-cap, inclusive).""" args = parser.parse_args( [ "dags", @@ -1733,7 +1732,7 @@ def test_asset_timetable_upper_bound_at_cap(self, parser): assert states["asset_2026_04_10"] == DagRunState.QUEUED assert states["asset_2026_04_12"] == DagRunState.QUEUED assert states["asset_2026_04_14"] == DagRunState.QUEUED - # 2026-04-15 is outside the half-open upper bound. + # 2026-04-15 is beyond the inclusive upper bound. assert states["asset_2026_04_15"] == DagRunState.SUCCESS assert states["asset_non_part"] == DagRunState.SUCCESS @@ -1741,9 +1740,8 @@ def test_asset_timetable_upper_bound_at_cap(self, parser): def test_asset_timetable_upper_bound_over_cap(self, parser): """--partition-date-end 2026-04-13 must NOT include the 2026-04-14 run (over-cap). - Half-open upper bound: end=2026-04-13 → upper = 2026-04-14T00:00Z. - The run stored at 2026-04-14T00:00Z satisfies partition_date < 2026-04-14T00:00Z - as False, so it is excluded. + Inclusive upper bound: end=2026-04-13T00:00Z → partition_date <= 2026-04-13T00:00Z. + The run stored at 2026-04-14T00:00Z is excluded because Apr 14 > Apr 13. """ args = parser.parse_args( [ @@ -1760,7 +1758,7 @@ def test_asset_timetable_upper_bound_over_cap(self, parser): states = self._get_run_states() assert states["asset_2026_04_10"] == DagRunState.QUEUED assert states["asset_2026_04_12"] == DagRunState.QUEUED - # 2026-04-14 is exactly at the half-open boundary — must NOT be cleared. 
+ # 2026-04-14 is beyond the inclusive end (Apr 13) — must NOT be cleared. assert states["asset_2026_04_14"] == DagRunState.SUCCESS assert states["asset_2026_04_15"] == DagRunState.SUCCESS assert states["asset_non_part"] == DagRunState.SUCCESS diff --git a/airflow-core/tests/unit/cli/commands/test_partition_command.py b/airflow-core/tests/unit/cli/commands/test_partition_command.py index 27b999afa3cc0..efcbc9afe5f37 100644 --- a/airflow-core/tests/unit/cli/commands/test_partition_command.py +++ b/airflow-core/tests/unit/cli/commands/test_partition_command.py @@ -434,10 +434,8 @@ def test_clear_task_instances_chunks_at_cap(self, parser, dag_maker): dag_maker.sync_dagbag_to_db() with ( - mock.patch.object(partition_command, "TI_CHUNK_SIZE", ti_cap), - mock.patch( - "airflow.cli.commands.partition_command.clear_task_instances", autospec=True - ) as mock_cti, + mock.patch("airflow.models.dagrun._TI_CHUNK_SIZE", ti_cap), + mock.patch("airflow.models.dagrun.clear_task_instances", autospec=True) as mock_cti, ): partition_command.clear( parser.parse_args( @@ -487,10 +485,8 @@ def test_clear_task_instances_chunks_over_cap(self, parser, dag_maker): dag_maker.sync_dagbag_to_db() with ( - mock.patch.object(partition_command, "TI_CHUNK_SIZE", ti_cap), - mock.patch( - "airflow.cli.commands.partition_command.clear_task_instances", autospec=True - ) as mock_cti, + mock.patch("airflow.models.dagrun._TI_CHUNK_SIZE", ti_cap), + mock.patch("airflow.models.dagrun.clear_task_instances", autospec=True) as mock_cti, ): partition_command.clear( parser.parse_args( @@ -541,10 +537,8 @@ def test_clear_task_instances_chunks_just_under_cap(self, parser, dag_maker): dag_maker.sync_dagbag_to_db() with ( - mock.patch.object(partition_command, "TI_CHUNK_SIZE", ti_cap), - mock.patch( - "airflow.cli.commands.partition_command.clear_task_instances", autospec=True - ) as mock_cti, + mock.patch("airflow.models.dagrun._TI_CHUNK_SIZE", ti_cap), + mock.patch("airflow.models.dagrun.clear_task_instances", 
autospec=True) as mock_cti, ): partition_command.clear( parser.parse_args( @@ -569,7 +563,7 @@ def test_clear_task_instances_chunks_just_under_cap(self, parser, dag_maker): clear_db_dags() def test_clear_task_instances_chunks_mid_loop_trigger(self, parser, dag_maker): - """Pin mid-loop SELECT IN + slice trigger (partition_command.py L120-132). + """Pin mid-loop SELECT IN + slice trigger (dagrun.py clear_partition_runs loop body). Design: TI_CHUNK_SIZE=3, 1 dag with 2 tasks, 3 DRs (6 TIs total). @@ -616,10 +610,8 @@ def test_clear_task_instances_chunks_mid_loop_trigger(self, parser, dag_maker): dag_maker.sync_dagbag_to_db() with ( - mock.patch.object(partition_command, "TI_CHUNK_SIZE", ti_cap), - mock.patch( - "airflow.cli.commands.partition_command.clear_task_instances", autospec=True - ) as mock_cti, + mock.patch("airflow.models.dagrun._TI_CHUNK_SIZE", ti_cap), + mock.patch("airflow.models.dagrun.clear_task_instances", autospec=True) as mock_cti, ): partition_command.clear( parser.parse_args( @@ -786,12 +778,12 @@ def test_date_range_syntax_mutually_exclusive_with_start_end(self, parser): ) assert excinfo.value.code == "--date cannot be combined with --start-date / --end-date." - def test_date_range_end_date_is_day_granular(self, parser, dag_maker): - """Pin day-granular semantics: --date right side covers the whole local calendar day. + def test_date_range_date_only_endpoints_default_to_midnight(self, parser, dag_maker): + """Date-only --date endpoints default to local midnight; the time component is honoured. - '2026-01-02~2026-01-02' resolves to [Jan 2 00:00Z, Jan 3 00:00Z), so both - the midnight run and a 15:00 run on the same local day are cleared. - The time-of-day component of the input is ignored; only the calendar date is used. + '2026-01-02~2026-01-02' resolves to the inclusive window [Jan 2 00:00Z, Jan 2 00:00Z], + so only the midnight run is cleared. A 15:00 run on the same calendar day falls + after the upper bound and is left untouched. 
""" dag_maker.create_dagrun( run_id="part_run_2_midday_date", @@ -819,22 +811,21 @@ def test_date_range_end_date_is_day_granular(self, parser, dag_maker): run_2 = _get_run("part_run_2") assert run_2.partition_key is None assert run_2.partition_date is None - # 15:00 on the same local day is also cleared (whole-day upper bound Jan 3 midnight). + # 15:00 is after the Jan 2 00:00Z upper bound → untouched. run_midday = _get_run("part_run_2_midday_date") - assert run_midday.partition_key is None - assert run_midday.partition_date is None + assert run_midday.partition_key == "2026-01-02T15:00:00" # Runs outside the date range are untouched. run_1 = _get_run("part_run_1") assert run_1.partition_key == "2026-01-01T00:00:00" run_3 = _get_run("part_run_3") assert run_3.partition_key == "2026-01-03T00:00:00" - def test_clear_datetime_input_time_component_ignored(self, parser, dag_maker): - """Time-of-day component in --start-date / --end-date is stripped; only the date is used. + def test_clear_end_date_time_component_honoured(self, parser, dag_maker): + """The time-of-day in --end-date is honoured; runs after it are not cleared. - --end-date 2026-01-02T10:00:00 strips to Jan 2, making the half-open upper - bound Jan 3 00:00Z. Both the 10:00 and 15:00 runs on Jan 2 fall within - [start, Jan 3 00:00Z) and are cleared. + --end-date 2026-01-02T10:00:00 gives the inclusive upper bound Jan 2 10:00Z. + The 10:00 run is at the bound and cleared; the 15:00 run is after it and + left untouched. """ dag_maker.create_dagrun( run_id="part_run_h10", @@ -865,21 +856,20 @@ def test_clear_datetime_input_time_component_ignored(self, parser, dag_maker): ) ) - # 10:00 on Jan 2 is within [start, Jan 3 00:00Z) — cleared. + # 10:00 on Jan 2 is at the inclusive upper bound — cleared. run_h10 = _get_run("part_run_h10") assert run_h10.partition_key is None assert run_h10.partition_date is None - # 15:00 on Jan 2 is also within the half-open day bound — cleared. 
+ # 15:00 on Jan 2 is after the upper bound — untouched. run_h15 = _get_run("part_run_h15") - assert run_h15.partition_key is None - assert run_h15.partition_date is None + assert run_h15.partition_key == "2026-01-02T15:00:00" - def test_clear_datetime_inputs_use_date_part_only(self, parser, dag_maker): - """Datetime --start-date / --end-date inputs are day-granular; the time part is ignored. + def test_clear_datetime_inputs_honour_time_window(self, parser, dag_maker): + """Datetime --start-date / --end-date define a sub-day window; the time part is honoured. - --start-date 2026-01-02T03:00:00 strips to Jan 2 → lower = Jan 2 00:00Z. - --end-date 2026-01-02T10:00:00 strips to Jan 2 → upper = Jan 3 00:00Z. - All three runs on Jan 2 (02:00, 05:00, 11:00) fall within that window. + --start-date 2026-01-02T03:00:00 → lower = Jan 2 03:00Z. + --end-date 2026-01-02T10:00:00 → upper = Jan 2 10:00Z. + Only the 05:00 run sits inside [03:00Z, 10:00Z]; the 02:00 and 11:00 runs are outside. """ dag_maker.create_dagrun( run_id="part_run_h02", @@ -919,14 +909,16 @@ def test_clear_datetime_inputs_use_date_part_only(self, parser, dag_maker): ) ) - # All three runs on Jan 2 fall within [Jan 2 00:00Z, Jan 3 00:00Z) — cleared. - for run_id in ("part_run_h02", "part_run_h05", "part_run_h11b"): - run = _get_run(run_id) - assert run.partition_key is None - assert run.partition_date is None + # Only the 05:00 run is inside [Jan 2 03:00Z, Jan 2 10:00Z] — cleared. + run_h05 = _get_run("part_run_h05") + assert run_h05.partition_key is None + assert run_h05.partition_date is None + # 02:00 (before lower) and 11:00 (after upper) are outside the window — untouched. 
+ assert _get_run("part_run_h02").partition_key == "2026-01-02T02:00:00" + assert _get_run("part_run_h11b").partition_key == "2026-01-02T11:00:00" - def test_clear_via_date_range_datetime_endpoints_use_date_part_only(self, parser, dag_maker): - """--date with ISO datetime endpoints strips the time part; the full local day is covered.""" + def test_clear_via_date_range_datetime_endpoints_honour_time(self, parser, dag_maker): + """--date with ISO datetime endpoints honours the time part; only the in-window run clears.""" dag_maker.create_dagrun( run_id="part_run_h02b", state=DagRunState.SUCCESS, @@ -950,7 +942,7 @@ def test_clear_via_date_range_datetime_endpoints_use_date_part_only(self, parser ) dag_maker.sync_dagbag_to_db() - # Both sides strip to Jan 2 → window is [Jan 2 00:00Z, Jan 3 00:00Z). + # Window is the inclusive [Jan 2 03:00Z, Jan 2 10:00Z]. partition_command.clear( parser.parse_args( [ @@ -964,11 +956,13 @@ def test_clear_via_date_range_datetime_endpoints_use_date_part_only(self, parser ) ) - # All three runs on Jan 2 are within the day window — cleared. - for run_id in ("part_run_h02b", "part_run_h05b", "part_run_h11c"): - run = _get_run(run_id) - assert run.partition_key is None - assert run.partition_date is None + # Only the 05:00 run is inside the window — cleared. + run_h05b = _get_run("part_run_h05b") + assert run_h05b.partition_key is None + assert run_h05b.partition_date is None + # 02:00 (before lower) and 11:00 (after upper) are untouched. + assert _get_run("part_run_h02b").partition_key == "2026-01-02T02:00:00" + assert _get_run("part_run_h11c").partition_key == "2026-01-02T11:00:00" TAIPEI_DAG_ID = "test_partitions_clear_taipei_dag" @@ -1117,3 +1111,117 @@ def test_taipei_upper_bound_over_cap(self, parser): # 2026-02-19 (stored at 2026-02-18T16Z) equals the upper bound — strictly less than, NOT cleared. 
assert dates["taipei_2026_02_19"] == datetime(2026, 2, 18, 16, 0, 0, tzinfo=pendulum.UTC) assert dates["taipei_2026_02_20"] == datetime(2026, 2, 19, 16, 0, 0, tzinfo=pendulum.UTC) + + def test_cross_dag_run_id_collision_does_not_clear_other_dag(self, parser, dag_maker): + """Clearing by run_id only affects the target Dag; a second Dag sharing the same run_id is untouched.""" + shared_run_id = "cross_dag_shared_run" + dag_id_target = "cli_cross_dag_target" + dag_id_bystander = "cli_cross_dag_bystander" + clear_db_runs() + clear_db_dags() + + with dag_maker( + dag_id_target, + schedule=CronPartitionTimetable("0 0 * * *", timezone=pendulum.UTC), + start_date=datetime(2026, 1, 1), + serialized=True, + ): + EmptyOperator(task_id="t1") + dag_maker.create_dagrun( + run_id=shared_run_id, + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 1, 1, tzinfo=pendulum.UTC), + partition_key="key-target", + ) + + with dag_maker( + dag_id_bystander, + schedule=CronPartitionTimetable("0 0 * * *", timezone=pendulum.UTC), + start_date=datetime(2026, 1, 1), + serialized=True, + ): + EmptyOperator(task_id="t1") + dag_maker.create_dagrun( + run_id=shared_run_id, + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime(2026, 1, 1, tzinfo=pendulum.UTC), + partition_key="key-bystander", + ) + + dag_maker.sync_dagbag_to_db() + + _set_tis_state(shared_run_id, TaskInstanceState.SUCCESS) + + partition_command.clear( + parser.parse_args( + [ + "partitions", + "clear", + "--dag-id", + dag_id_target, + "--run-id", + shared_run_id, + "--clear-task-instances", + ] + ) + ) + + with create_session() as session: + run_target = session.scalar( + select(DagRun).where(DagRun.dag_id == dag_id_target, DagRun.run_id == shared_run_id) + ) + run_bystander = session.scalar( + select(DagRun).where(DagRun.dag_id == dag_id_bystander, DagRun.run_id == shared_run_id) + ) + tis_bystander = list( + session.scalars( + select(TaskInstance).where( + TaskInstance.dag_id == 
dag_id_bystander, + TaskInstance.run_id == shared_run_id, + ) + ) + ) + + assert run_target.partition_key is None + assert run_target.partition_date is None + + assert run_bystander.partition_key == "key-bystander" + assert run_bystander.partition_date is not None + assert all(ti.state == TaskInstanceState.SUCCESS for ti in tis_bystander) + + # Target Dag's TIs must have been reset (clear_task_instances=True). + with create_session() as session: + tis_target = list( + session.scalars( + select(TaskInstance).where( + TaskInstance.dag_id == dag_id_target, + TaskInstance.run_id == shared_run_id, + ) + ) + ) + assert all(ti.state is None for ti in tis_target) + + clear_db_runs() + clear_db_dags() + + @pytest.mark.parametrize( + "selector_args", + [ + ["--run-id", "part_run_1"], + ["--partition-key", "2026-01-01T00:00:00"], + ], + ) + def test_run_id_and_partition_key_do_not_call_get_db_dag(self, parser, selector_args): + """run_id and partition_key selectors must not load the Dag from the DB. + + get_db_dag raises when the Dag has been deleted but DagRuns remain; the + CLI must not call it for these two selectors so that orphaned runs can + still be cleared. 
+ """ + with mock.patch("airflow.cli.commands.partition_command.get_db_dag") as mock_get_db_dag: + partition_command.clear( + parser.parse_args(["partitions", "clear", "--dag-id", DAG_ID, *selector_args]) + ) + mock_get_db_dag.assert_not_called() diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index c92b75a4583ac..e578701965648 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -50,7 +50,7 @@ from airflow.callbacks.callback_requests import DagCallbackRequest, DagRunContext from airflow.models.dag import DagModel, infer_automated_data_interval from airflow.models.dag_version import DagVersion -from airflow.models.dagrun import DagRun, DagRunNote +from airflow.models.dagrun import DagRun, DagRunNote, clear_partition_runs from airflow.models.deadline import Deadline from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel from airflow.models.serialized_dag import SerializedDagModel @@ -61,7 +61,16 @@ from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator, ShortCircuitOperator -from airflow.sdk import DAG, BaseOperator, get_current_context, setup, task, task_group, teardown +from airflow.sdk import ( + DAG, + BaseOperator, + CronPartitionTimetable, + get_current_context, + setup, + task, + task_group, + teardown, +) from airflow.sdk.definitions.callback import AsyncCallback from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference, VariableInterval from airflow.serialization.definitions.deadline import SerializedReferenceModels @@ -77,6 +86,7 @@ from tests_common.test_utils import db from tests_common.test_utils.config import conf_vars from tests_common.test_utils.dag import sync_dag_to_db +from tests_common.test_utils.db import clear_db_dags, clear_db_runs 
from tests_common.test_utils.mapping import expand_mapped_task from tests_common.test_utils.mock_operators import MockOperator from tests_common.test_utils.taskinstance import create_task_instance, run_task_instance @@ -4474,3 +4484,392 @@ def test_stats_tags_with_none_team_name(self, dag_maker): dr._team_name = None tags = dr.stats_tags assert "team_name" not in tags + + +class TestClearPartitionRuns: + """Direct unit tests for the clear_partition_runs model-layer function.""" + + @pytest.fixture(autouse=True) + def setup_partitioned_dag(self, dag_maker): + + clear_db_runs() + clear_db_dags() + with dag_maker( + "test_cpr_dag", + schedule=CronPartitionTimetable("0 0 * * *", timezone=pendulum.UTC), + start_date=datetime.datetime(2026, 1, 1), + catchup=True, + serialized=True, + ): + EmptyOperator(task_id="t1") + for day in (1, 2, 3): + dag_maker.create_dagrun( + run_id=f"cpr_run_{day}", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime.datetime(2026, 1, day, tzinfo=pendulum.UTC), + partition_key=f"2026-01-0{day}T00:00:00", + ) + dag_maker.sync_dagbag_to_db() + self._serialized_dag = SerializedDagModel.get_dag("test_cpr_dag") + yield + clear_db_runs() + clear_db_dags() + + def _get_run(self, run_id: str, session) -> DagRun: + return session.scalar(select(DagRun).where(DagRun.run_id == run_id)) + + def test_selector_run_id_clears_single_run(self, session): + cleared, tis = clear_partition_runs( + dag=self._serialized_dag, + dag_id="test_cpr_dag", + run_id="cpr_run_2", + partition_key=None, + partition_date_start=None, + partition_date_end=None, + clear_tis=False, + dry_run=False, + session=session, + ) + + assert cleared == 1 + assert tis == 0 + run = self._get_run("cpr_run_2", session) + assert run.partition_key is None + assert run.partition_date is None + # Other runs untouched. 
+ run_1 = self._get_run("cpr_run_1", session) + assert run_1.partition_key == "2026-01-01T00:00:00" + + def test_selector_partition_key_clears_matching_run(self, session): + cleared, tis = clear_partition_runs( + dag=self._serialized_dag, + dag_id="test_cpr_dag", + run_id=None, + partition_key="2026-01-03T00:00:00", + partition_date_start=None, + partition_date_end=None, + clear_tis=False, + dry_run=False, + session=session, + ) + + assert cleared == 1 + run = self._get_run("cpr_run_3", session) + assert run.partition_key is None + # Other runs untouched. + run_1 = self._get_run("cpr_run_1", session) + assert run_1.partition_key == "2026-01-01T00:00:00" + + def test_selector_date_window_clears_range(self, session): + cleared, tis = clear_partition_runs( + dag=self._serialized_dag, + dag_id="test_cpr_dag", + run_id=None, + partition_key=None, + partition_date_start=datetime.datetime(2026, 1, 2, tzinfo=pendulum.UTC), + partition_date_end=datetime.datetime(2026, 1, 3, tzinfo=pendulum.UTC), + clear_tis=False, + dry_run=False, + session=session, + ) + + assert cleared == 2 + run_2 = self._get_run("cpr_run_2", session) + assert run_2.partition_key is None + run_3 = self._get_run("cpr_run_3", session) + assert run_3.partition_key is None + # Out-of-range run untouched. + run_1 = self._get_run("cpr_run_1", session) + assert run_1.partition_key == "2026-01-01T00:00:00" + + def test_dry_run_returns_correct_count_without_db_changes(self, session): + cleared, tis = clear_partition_runs( + dag=self._serialized_dag, + dag_id="test_cpr_dag", + run_id=None, + partition_key=None, + partition_date_start=datetime.datetime(2026, 1, 1, tzinfo=pendulum.UTC), + partition_date_end=datetime.datetime(2026, 1, 3, tzinfo=pendulum.UTC), + clear_tis=False, + dry_run=True, + session=session, + ) + + assert cleared == 3 + assert tis == 0 + # DB unchanged. 
+ for run_id, expected_key in [ + ("cpr_run_1", "2026-01-01T00:00:00"), + ("cpr_run_2", "2026-01-02T00:00:00"), + ("cpr_run_3", "2026-01-03T00:00:00"), + ]: + run = self._get_run(run_id, session) + assert run.partition_key == expected_key + + def test_cross_dag_isolation(self, dag_maker, session): + """Runs from another dag sharing the same run_id are not affected.""" + + bystander_dag_id = "test_cpr_bystander_dag" + with dag_maker( + bystander_dag_id, + schedule=CronPartitionTimetable("0 0 * * *", timezone=pendulum.UTC), + start_date=datetime.datetime(2026, 1, 1), + serialized=True, + ): + EmptyOperator(task_id="t1") + dag_maker.create_dagrun( + run_id="cpr_run_2", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime.datetime(2026, 1, 2, tzinfo=pendulum.UTC), + partition_key="bystander-key", + ) + dag_maker.sync_dagbag_to_db() + + cleared, _ = clear_partition_runs( + dag=self._serialized_dag, + dag_id="test_cpr_dag", + run_id="cpr_run_2", + partition_key=None, + partition_date_start=None, + partition_date_end=None, + clear_tis=False, + dry_run=False, + session=session, + ) + + assert cleared == 1 + bystander_run = session.scalar( + select(DagRun).where(DagRun.dag_id == bystander_dag_id, DagRun.run_id == "cpr_run_2") + ) + assert bystander_run.partition_key == "bystander-key" + + def test_callback_called_once_per_matched_run_before_mutation(self, session): + calls: list[tuple[str, bool, str | None]] = [] + + def capture(run: DagRun, had_partition_fields: bool) -> None: + calls.append((run.run_id, had_partition_fields, run.partition_key)) + + clear_partition_runs( + dag=self._serialized_dag, + dag_id="test_cpr_dag", + run_id=None, + partition_key=None, + partition_date_start=datetime.datetime(2026, 1, 1, tzinfo=pendulum.UTC), + partition_date_end=datetime.datetime(2026, 1, 3, tzinfo=pendulum.UTC), + clear_tis=False, + dry_run=False, + session=session, + on_run_matched=capture, + ) + + assert len(calls) == 3 + for run_id, had_partition_fields, 
old_partition_key in calls: + assert had_partition_fields is True + assert old_partition_key is not None, ( + f"run {run_id}: callback saw None partition_key (mutation before callback)" + ) + + def test_callback_had_partition_fields_false_for_already_cleared_run(self, dag_maker, session): + dag_maker.create_dagrun( + run_id="cpr_already_cleared", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=None, + partition_key=None, + ) + dag_maker.sync_dagbag_to_db() + + calls: list[tuple[str, bool]] = [] + + def capture(run: DagRun, had_partition_fields: bool) -> None: + calls.append((run.run_id, had_partition_fields)) + + clear_partition_runs( + dag=self._serialized_dag, + dag_id="test_cpr_dag", + run_id="cpr_already_cleared", + partition_key=None, + partition_date_start=None, + partition_date_end=None, + clear_tis=False, + dry_run=False, + session=session, + on_run_matched=capture, + ) + + assert calls == [("cpr_already_cleared", False)] + + def test_chunk_boundary_at_cap(self, dag_maker, session): + """2 DRs × 3 TIs = 6 TIs == chunk size → clear_task_instances called once.""" + + clear_db_runs() + clear_db_dags() + with dag_maker( + "test_cpr_chunk_at_cap", + schedule=CronPartitionTimetable("0 0 * * *", timezone=pendulum.UTC), + start_date=datetime.datetime(2024, 1, 1), + catchup=True, + serialized=True, + ): + EmptyOperator(task_id="t1") + EmptyOperator(task_id="t2") + EmptyOperator(task_id="t3") + for i in range(2): + dag_maker.create_dagrun( + run_id=f"cpr_cap_run_{i:04d}", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime.datetime(2024, 1, 1, tzinfo=pendulum.UTC) + + datetime.timedelta(days=i), + partition_key=f"cap-key-{i:04d}", + ) + dag_maker.sync_dagbag_to_db() + serialized_dag = SerializedDagModel.get_dag("test_cpr_chunk_at_cap") + + with ( + mock.patch("airflow.models.dagrun._TI_CHUNK_SIZE", 6), + mock.patch("airflow.models.dagrun.clear_task_instances", autospec=True) as mock_cti, + ): + cleared, tis = 
clear_partition_runs( + dag=serialized_dag, + dag_id="test_cpr_chunk_at_cap", + run_id=None, + partition_key=None, + partition_date_start=datetime.datetime(2024, 1, 1, tzinfo=pendulum.UTC), + partition_date_end=datetime.datetime(2025, 12, 31, tzinfo=pendulum.UTC), + clear_tis=True, + dry_run=False, + session=session, + ) + + assert cleared == 2 + assert [len(c.args[0]) for c in mock_cti.mock_calls] == [6] + + def test_chunk_boundary_over_cap(self, dag_maker, session): + """3 DRs × 3 TIs = 9 TIs > chunk size 6 → two calls: [6, 3].""" + + clear_db_runs() + clear_db_dags() + with dag_maker( + "test_cpr_chunk_over_cap", + schedule=CronPartitionTimetable("0 0 * * *", timezone=pendulum.UTC), + start_date=datetime.datetime(2024, 1, 1), + catchup=True, + serialized=True, + ): + EmptyOperator(task_id="t1") + EmptyOperator(task_id="t2") + EmptyOperator(task_id="t3") + for i in range(3): + dag_maker.create_dagrun( + run_id=f"cpr_over_run_{i:04d}", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime.datetime(2024, 1, 1, tzinfo=pendulum.UTC) + + datetime.timedelta(days=i), + partition_key=f"over-key-{i:04d}", + ) + dag_maker.sync_dagbag_to_db() + serialized_dag = SerializedDagModel.get_dag("test_cpr_chunk_over_cap") + + with ( + mock.patch("airflow.models.dagrun._TI_CHUNK_SIZE", 6), + mock.patch("airflow.models.dagrun.clear_task_instances", autospec=True) as mock_cti, + ): + clear_partition_runs( + dag=serialized_dag, + dag_id="test_cpr_chunk_over_cap", + run_id=None, + partition_key=None, + partition_date_start=datetime.datetime(2024, 1, 1, tzinfo=pendulum.UTC), + partition_date_end=datetime.datetime(2025, 12, 31, tzinfo=pendulum.UTC), + clear_tis=True, + dry_run=False, + session=session, + ) + + assert [len(c.args[0]) for c in mock_cti.mock_calls] == [6, 3] + + def test_date_window_with_dag_none_raises_value_error(self, session): + """Date-window selector requires a loaded Dag; dag=None must raise ValueError.""" + with pytest.raises(ValueError, 
match="date-window selector requires"): + clear_partition_runs( + dag=None, + dag_id="test_cpr_dag", + run_id=None, + partition_key=None, + partition_date_start=datetime.datetime(2026, 1, 1, tzinfo=pendulum.UTC), + partition_date_end=datetime.datetime(2026, 1, 3, tzinfo=pendulum.UTC), + clear_tis=False, + dry_run=False, + session=session, + ) + + +@pytest.mark.db_test +class TestApplyPartitionDateWindowSubDay: + """apply_partition_date_window preserves sub-day precision for hourly partitioned Dags.""" + + @pytest.fixture(autouse=True) + def setup_hourly_dag(self, dag_maker): + clear_db_runs() + clear_db_dags() + with dag_maker( + "test_subday_dag", + schedule=CronPartitionTimetable("0 * * * *", timezone=pendulum.UTC), + start_date=datetime.datetime(2026, 1, 1), + catchup=True, + serialized=True, + ): + EmptyOperator(task_id="t1") + for hour in (6, 8, 10): + dag_maker.create_dagrun( + run_id=f"subday_run_{hour:02d}", + state=DagRunState.SUCCESS, + logical_date=None, + partition_date=datetime.datetime(2026, 1, 1, hour, tzinfo=pendulum.UTC), + partition_key=f"2026-01-01T{hour:02d}:00:00", + ) + dag_maker.sync_dagbag_to_db() + self._serialized_dag = SerializedDagModel.get_dag("test_subday_dag") + yield + clear_db_runs() + clear_db_dags() + + def _get_run(self, run_id: str, session) -> DagRun: + return session.scalar(select(DagRun).where(DagRun.run_id == run_id)) + + def test_narrow_window_selects_only_matching_partition(self, session): + """A one-hour window clears exactly the matching tick, not the entire day.""" + cleared, _ = clear_partition_runs( + dag=self._serialized_dag, + dag_id="test_subday_dag", + run_id=None, + partition_key=None, + partition_date_start=datetime.datetime(2026, 1, 1, 8, tzinfo=pendulum.UTC), + partition_date_end=datetime.datetime(2026, 1, 1, 8, tzinfo=pendulum.UTC), + clear_tis=False, + dry_run=False, + session=session, + ) + assert cleared == 1 + assert self._get_run("subday_run_06", session).partition_key == "2026-01-01T06:00:00" + assert 
self._get_run("subday_run_08", session).partition_key is None + assert self._get_run("subday_run_10", session).partition_key == "2026-01-01T10:00:00" + + def test_multi_hour_window_selects_range(self, session): + """A window spanning 06:00–10:00 clears all three hourly partitions.""" + cleared, _ = clear_partition_runs( + dag=self._serialized_dag, + dag_id="test_subday_dag", + run_id=None, + partition_key=None, + partition_date_start=datetime.datetime(2026, 1, 1, 6, tzinfo=pendulum.UTC), + partition_date_end=datetime.datetime(2026, 1, 1, 10, tzinfo=pendulum.UTC), + clear_tis=False, + dry_run=False, + session=session, + ) + assert cleared == 3 diff --git a/airflow-core/tests/unit/timetables/test_base_timetable.py b/airflow-core/tests/unit/timetables/test_base_timetable.py index 5aaee3db06321..8eb3ca2871b33 100644 --- a/airflow-core/tests/unit/timetables/test_base_timetable.py +++ b/airflow-core/tests/unit/timetables/test_base_timetable.py @@ -16,9 +16,6 @@ # under the License. from __future__ import annotations -import datetime - -import pendulum import pytest from airflow._shared.module_loading import qualname @@ -27,7 +24,6 @@ def test_builtin_timetable_type_name_returns_class_name(): """Built-in timetables should return just the class name as type_name.""" - from airflow.timetables.simple import NullTimetable tt = NullTimetable() assert tt.type_name == "NullTimetable" @@ -58,7 +54,6 @@ class CustomTimetable(Timetable): def test_compute_rollup_fingerprint_non_partitioned_returns_empty(): """Non-partitioned timetables return an empty dict.""" from airflow.timetables.base import compute_rollup_fingerprint - from airflow.timetables.simple import NullTimetable assert compute_rollup_fingerprint(NullTimetable()) == {} @@ -181,21 +176,3 @@ def test_compute_rollup_fingerprint_key_format(asset_or_ref, expected_key): ) fp = compute_rollup_fingerprint(tt) assert expected_key in fp - - -def test_base_resolve_day_bound_returns_midnight_utc(): - """Base default returns midnight 
UTC as a pendulum DateTime for any calendar day.""" - tt = NullTimetable() - result = tt.resolve_day_bound(datetime.date(2026, 4, 10)) - - expected = pendulum.datetime(2026, 4, 10, 0, 0, 0, tz="UTC") - assert result == expected - assert result.timezone_name == "UTC" - - -def test_base_resolve_day_bound_is_pendulum_datetime(): - """Return value from the base default is a pendulum DateTime, not stdlib datetime.""" - tt = NullTimetable() - result = tt.resolve_day_bound(datetime.date(2026, 1, 1)) - - assert isinstance(result, pendulum.DateTime) diff --git a/airflow-core/tests/unit/timetables/test_cron_mixin.py b/airflow-core/tests/unit/timetables/test_cron_mixin.py index f7ef3cf583ce3..d8d5ff44df30b 100644 --- a/airflow-core/tests/unit/timetables/test_cron_mixin.py +++ b/airflow-core/tests/unit/timetables/test_cron_mixin.py @@ -16,10 +16,6 @@ # under the License. from __future__ import annotations -import datetime - -import pendulum - from airflow.timetables._cron import CronMixin SAMPLE_TZ = "UTC" @@ -43,29 +39,3 @@ def test_dom_and_dow_conflict(): assert "(or)" in desc assert "Every minute, on day 1 of the month" in desc assert "Every minute, only on Monday" in desc - - -def test_cron_mixin_resolve_day_bound_utc_tz(): - """UTC timetable: local midnight equals UTC midnight.""" - cm = CronMixin("0 0 * * *", "UTC") - result = cm.resolve_day_bound(datetime.date(2026, 4, 10)) - - expected = pendulum.datetime(2026, 4, 10, 0, 0, 0, tz="UTC") - assert result == expected - - -def test_cron_mixin_resolve_day_bound_utc_plus8_crosses_day(): - """UTC+8 timetable: 2026-02-19 local midnight = 2026-02-18T16:00:00Z.""" - cm = CronMixin("0 0 * * *", "Asia/Taipei") - result = cm.resolve_day_bound(datetime.date(2026, 2, 19)) - - expected = pendulum.datetime(2026, 2, 18, 16, 0, 0, tz="UTC") - assert result == expected - - -def test_cron_mixin_resolve_day_bound_is_pendulum_datetime(): - """Return value is a pendulum DateTime.""" - cm = CronMixin("0 0 * * *", "Asia/Taipei") - result = 
cm.resolve_day_bound(datetime.date(2026, 1, 1)) - - assert isinstance(result, pendulum.DateTime) diff --git a/airflow-core/tests/unit/timetables/test_partitioned_timetable.py b/airflow-core/tests/unit/timetables/test_partitioned_timetable.py index cb5d882179421..72dd2773bc915 100644 --- a/airflow-core/tests/unit/timetables/test_partitioned_timetable.py +++ b/airflow-core/tests/unit/timetables/test_partitioned_timetable.py @@ -17,7 +17,6 @@ from __future__ import annotations -import datetime from collections.abc import Callable, Iterable from contextlib import ExitStack from typing import TYPE_CHECKING @@ -280,19 +279,3 @@ def test_decode_partition_date_returns_period_start_for_valid_key(self): default_partition_mapper=StartOfDayMapper(), ) assert timetable._decode_partition_date("2025-01-01") == pendulum.datetime(2025, 1, 1, tz="UTC") - - def test_partitioned_asset_timetable_resolve_day_bound_returns_midnight_utc(self): - """PartitionedAssetTimetable has no local timezone; resolve_day_bound uses the base default. - - Non-regression: verifies the timetable walks the base-default code path - (midnight UTC) rather than any CronMixin path. - """ - tt = PartitionedAssetTimetable( - assets=Asset(name="asset-a"), - default_partition_mapper=IdentityMapper(), - ) - result = tt.resolve_day_bound(datetime.date(2026, 4, 10)) - - expected = pendulum.datetime(2026, 4, 10, 0, 0, 0, tz="UTC") - assert result == expected - assert result.timezone_name == "UTC" diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index cb9a1ca15a3cc..bd7ce6dd44859 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -173,6 +173,62 @@ class BulkResponse(BaseModel): ] = None +class ClearPartitionsBody(BaseModel): + """ + Request body for the clearPartitions endpoint (column-reset: set partition fields to None). 
+ """ + + model_config = ConfigDict( + extra="forbid", + ) + partition_key: Annotated[ + str | None, + Field( + description="Select runs by exact partition key match. Mutually exclusive with the other partition selectors.", + title="Partition Key", + ), + ] = None + partition_date_start: Annotated[ + datetime | None, + Field( + description="Inclusive start of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors.", + title="Partition Date Start", + ), + ] = None + partition_date_end: Annotated[ + datetime | None, + Field( + description="Inclusive end of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors.", + title="Partition Date End", + ), + ] = None + run_id: Annotated[ + str | None, + Field( + description="Select runs by exact run_id. Mutually exclusive with ``partition_key`` and partition date window.", + title="Run Id", + ), + ] = None + clear_task_instances: Annotated[ + bool | None, + Field(description="Also clear task instances on the matched runs.", title="Clear Task Instances"), + ] = False + dry_run: Annotated[ + bool | None, + Field(description="If True, compute counts without writing any changes.", title="Dry Run"), + ] = True + + +class ClearPartitionsResponse(BaseModel): + """ + Response for the clearPartitions endpoint. + """ + + dag_runs_cleared: Annotated[int, Field(title="Dag Runs Cleared")] + task_instances_cleared: Annotated[int, Field(title="Task Instances Cleared")] + dry_run: Annotated[bool, Field(title="Dry Run")] + + class TaskIds(RootModel[list]): root: Annotated[list, Field(max_length=2, min_length=2)] @@ -1407,6 +1463,27 @@ class BulkDAGRunClearBody(BaseModel): model_config = ConfigDict( extra="forbid", ) + partition_key: Annotated[ + str | None, + Field( + description="Select runs by exact partition key match. 
Mutually exclusive with the other partition selectors.", + title="Partition Key", + ), + ] = None + partition_date_start: Annotated[ + datetime | None, + Field( + description="Inclusive start of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors.", + title="Partition Date Start", + ), + ] = None + partition_date_end: Annotated[ + datetime | None, + Field( + description="Inclusive end of the partition date window. The value is interpreted in the Dag's timetable timezone. Mutually exclusive with the other partition selectors.", + title="Partition Date End", + ), + ] = None dry_run: Annotated[bool | None, Field(title="Dry Run")] = True only_failed: Annotated[bool | None, Field(title="Only Failed")] = False only_new: Annotated[ @@ -1424,7 +1501,7 @@ class BulkDAGRunClearBody(BaseModel): ), ] = None note: Annotated[Note | None, Field(title="Note")] = None - dag_runs: Annotated[list[BulkDAGRunBody], Field(min_length=1, title="Dag Runs")] + dag_runs: Annotated[list[BulkDAGRunBody] | None, Field(title="Dag Runs")] = None class BulkDeleteActionBulkDAGRunBody(BaseModel):