Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,62 @@ class BulkDAGRunBody(StrictBaseModel):
note: str | None = Field(None, max_length=1000)


class PartitionSelectorMixin(StrictBaseModel):
"""Partition filter fields shared by bulk-clear and clearPartitions bodies."""

partition_key: str | None = Field(
default=None,
description="Select runs by exact partition key match. Mutually exclusive with the other partition selectors.",
)
partition_date_start: datetime | None = Field(
default=None,
description=(
"Inclusive start of the partition date window. "
"The value is interpreted in the Dag's timetable timezone. "
"Mutually exclusive with the other partition selectors."
),
)
partition_date_end: datetime | None = Field(
default=None,
description=(
"Inclusive end of the partition date window. "
"The value is interpreted in the Dag's timetable timezone. "
"Mutually exclusive with the other partition selectors."
),
)

@property
def has_partition_selectors(self) -> bool:
return (
self.partition_key is not None
or self.partition_date_start is not None
or self.partition_date_end is not None
)

def _validate_partition_date_window_order(self) -> None:
if (
self.partition_date_start is not None
and self.partition_date_end is not None
and self.partition_date_start > self.partition_date_end
):
raise ValueError("partition_date_start must be on or before partition_date_end.")

def _check_exactly_one_selection_mode(
self, *, extra_selector_active: bool, extra_selector_name: str
) -> None:
has_partition_key = self.partition_key is not None
has_partition_date_window = (
self.partition_date_start is not None or self.partition_date_end is not None
)
modes_active = sum([extra_selector_active, has_partition_key, has_partition_date_window])
if modes_active != 1:
raise ValueError(
f"Exactly one of {extra_selector_name}, partition_key, or a partition date window "
"(partition_date_start / partition_date_end) must be provided."
)
self._validate_partition_date_window_order()


class BaseDAGRunClear(StrictBaseModel):
"""Shared options for the single-run and bulk Dag Run clear endpoints."""

Expand Down Expand Up @@ -89,10 +145,18 @@ class DAGRunClearBody(BaseDAGRunClear):
"""Dag Run serializer for clear endpoint body."""


class BulkDAGRunClearBody(BaseDAGRunClear):
class BulkDAGRunClearBody(BaseDAGRunClear, PartitionSelectorMixin):
"""Request body for the bulk clear Dag Runs endpoint."""

dag_runs: list[BulkDAGRunBody] = Field(min_length=1)
dag_runs: list[BulkDAGRunBody] = Field(default_factory=list)

@model_validator(mode="after")
def validate_exactly_one_selection_mode(self) -> BulkDAGRunClearBody:
self._check_exactly_one_selection_mode(
extra_selector_active=bool(self.dag_runs),
extra_selector_name="dag_runs (non-empty)",
)
return self


class DAGRunResponse(BaseModel):
Expand Down Expand Up @@ -242,3 +306,36 @@ class DAGRunsBatchBody(StrictBaseModel):
duration_lt: float | None = None

conf_contains: str | None = None


class ClearPartitionsBody(PartitionSelectorMixin):
"""Request body for the clearPartitions endpoint (column-reset: set partition fields to None)."""

run_id: str | None = Field(
default=None,
description="Select runs by exact run_id. Mutually exclusive with ``partition_key`` and partition date window.",
)
clear_task_instances: bool = Field(
default=False,
description="Also clear task instances on the matched runs.",
)
dry_run: bool = Field(
default=True,
description="If True, compute counts without writing any changes.",
)

@model_validator(mode="after")
def validate_exactly_one_selector(self) -> ClearPartitionsBody:
self._check_exactly_one_selection_mode(
extra_selector_active=self.run_id is not None,
extra_selector_name="run_id",
)
return self
Comment thread
Lee-W marked this conversation as resolved.


class ClearPartitionsResponse(BaseModel):
"""Response for the clearPartitions endpoint."""

dag_runs_cleared: int
task_instances_cleared: int
dry_run: bool
Original file line number Diff line number Diff line change
Expand Up @@ -3143,6 +3143,66 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/dags/{dag_id}/clearPartitions:
post:
tags:
- DagRun
summary: Clear Dag Run Partitions
description: Reset partition_key and partition_date fields on matching Dag Runs.
operationId: clear_dag_run_partitions
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/ClearPartitionsBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/ClearPartitionsResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/dagSources/{dag_id}:
get:
tags:
Expand Down Expand Up @@ -12170,6 +12230,31 @@ components:
description: Request body for bulk operations on Dag Runs.
BulkDAGRunClearBody:
properties:
partition_key:
anyOf:
- type: string
- type: 'null'
title: Partition Key
description: Select runs by exact partition key match. Mutually exclusive
with the other partition selectors.
partition_date_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Partition Date Start
description: Inclusive start of the partition date window. The value is
interpreted in the Dag's timetable timezone. Mutually exclusive with the
other partition selectors.
partition_date_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Partition Date End
description: Inclusive end of the partition date window. The value is interpreted
in the Dag's timetable timezone. Mutually exclusive with the other partition
selectors.
dry_run:
type: boolean
title: Dry Run
Expand Down Expand Up @@ -12203,12 +12288,9 @@ components:
items:
$ref: '#/components/schemas/BulkDAGRunBody'
type: array
minItems: 1
title: Dag Runs
additionalProperties: false
type: object
required:
- dag_runs
title: BulkDAGRunClearBody
description: Request body for the bulk clear Dag Runs endpoint.
BulkDeleteAction_BulkDAGRunBody_:
Expand Down Expand Up @@ -12576,6 +12658,73 @@ components:
- action
- entities
title: BulkUpdateAction[VariableBody]
ClearPartitionsBody:
properties:
partition_key:
anyOf:
- type: string
- type: 'null'
title: Partition Key
description: Select runs by exact partition key match. Mutually exclusive
with the other partition selectors.
partition_date_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Partition Date Start
description: Inclusive start of the partition date window. The value is
interpreted in the Dag's timetable timezone. Mutually exclusive with the
other partition selectors.
partition_date_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Partition Date End
description: Inclusive end of the partition date window. The value is interpreted
in the Dag's timetable timezone. Mutually exclusive with the other partition
selectors.
run_id:
anyOf:
- type: string
- type: 'null'
title: Run Id
description: Select runs by exact run_id. Mutually exclusive with ``partition_key``
and partition date window.
clear_task_instances:
type: boolean
title: Clear Task Instances
description: Also clear task instances on the matched runs.
default: false
dry_run:
type: boolean
title: Dry Run
description: If True, compute counts without writing any changes.
default: true
additionalProperties: false
type: object
title: ClearPartitionsBody
description: 'Request body for the clearPartitions endpoint (column-reset: set
partition fields to None).'
ClearPartitionsResponse:
properties:
dag_runs_cleared:
type: integer
title: Dag Runs Cleared
task_instances_cleared:
type: integer
title: Task Instances Cleared
dry_run:
type: boolean
title: Dry Run
type: object
required:
- dag_runs_cleared
- task_instances_cleared
- dry_run
title: ClearPartitionsResponse
description: Response for the clearPartitions endpoint.
ClearTaskInstanceCollectionResponse:
properties:
task_instances:
Expand Down
Loading