Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.models.backfill import ReprocessBehavior
from airflow.utils.state import DagRunState


class BackfillPostBody(StrictBaseModel):
Expand Down Expand Up @@ -69,6 +70,27 @@ class BackfillCollectionResponse(BaseModel):
total_entries: int


class BackfillDagRunResponse(BaseModel):
"""Serializer for a single BackfillDagRun entry with joined DagRun state."""

id: NonNegativeInt
backfill_id: NonNegativeInt
dag_run_id: NonNegativeInt | None
logical_date: datetime | None
partition_key: str | None
sort_ordinal: int
exception_reason: str | None
dag_run_state: DagRunState | None = Field(default=None, validation_alias=AliasPath("dag_run", "state"))
dag_run_run_id: str | None = Field(default=None, validation_alias=AliasPath("dag_run", "run_id"))


class BackfillDagRunCollectionResponse(BaseModel):
"""BackfillDagRun Collection serializer for responses."""

backfill_dag_runs: list[BackfillDagRunResponse]
total_entries: int


class DryRunBackfillResponse(BaseModel):
"""Backfill serializer for responses in dry-run mode."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,84 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/backfills/{backfill_id}/dag_runs:
get:
tags:
- Backfill
summary: List Backfill Dag Runs
operationId: list_backfill_dag_runs
security:
- OAuth2PasswordBearer: []
- HTTPBearer: []
parameters:
- name: backfill_id
in: path
required: true
schema:
type: integer
minimum: 0
title: Backfill Id
- name: limit
in: query
required: false
schema:
type: integer
minimum: 0
default: 50
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
minimum: 0
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: array
items:
type: string
description: 'Attributes to order by, multi criteria sort is supported.
Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal`'
default:
- sort_ordinal
title: Order By
description: 'Attributes to order by, multi criteria sort is supported. Prefix
with `-` for descending order. Supported attributes: `id, sort_ordinal`'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/BackfillDagRunCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/backfills/{backfill_id}/pause:
put:
tags:
Expand Down Expand Up @@ -11439,6 +11517,78 @@ components:
- total_entries
title: BackfillCollectionResponse
description: Backfill Collection serializer for responses.
BackfillDagRunCollectionResponse:
properties:
backfill_dag_runs:
items:
$ref: '#/components/schemas/BackfillDagRunResponse'
type: array
title: Backfill Dag Runs
total_entries:
type: integer
title: Total Entries
type: object
required:
- backfill_dag_runs
- total_entries
title: BackfillDagRunCollectionResponse
description: BackfillDagRun Collection serializer for responses.
BackfillDagRunResponse:
properties:
id:
type: integer
minimum: 0.0
title: Id
backfill_id:
type: integer
minimum: 0.0
title: Backfill Id
dag_run_id:
anyOf:
- type: integer
minimum: 0.0
- type: 'null'
title: Dag Run Id
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
partition_key:
anyOf:
- type: string
- type: 'null'
title: Partition Key
sort_ordinal:
type: integer
title: Sort Ordinal
exception_reason:
anyOf:
- type: string
- type: 'null'
title: Exception Reason
dag_run_state:
anyOf:
- $ref: '#/components/schemas/DagRunState'
- type: 'null'
dag_run_run_id:
anyOf:
- type: string
- type: 'null'
title: Dag Run Run Id
type: object
required:
- id
- backfill_id
- dag_run_id
- logical_date
- partition_key
- sort_ordinal
- exception_reason
title: BackfillDagRunResponse
description: Serializer for a single BackfillDagRun entry with joined DagRun
state.
BackfillPostBody:
properties:
dag_id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.backfills import (
BackfillCollectionResponse,
BackfillDagRunCollectionResponse,
BackfillPostBody,
BackfillResponse,
DryRunBackfillCollectionResponse,
Expand Down Expand Up @@ -112,6 +113,43 @@ def get_backfill(
raise HTTPException(status.HTTP_404_NOT_FOUND, "Backfill not found")


@backfills_router.get(
path="/{backfill_id}/dag_runs",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[
Depends(requires_access_backfill(method="GET")),
],
)
def list_backfill_dag_runs(
backfill_id: NonNegativeInt,
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(SortParam(["id", "sort_ordinal"], BackfillDagRun).dynamic_depends(default="sort_ordinal")),
],
session: SessionDep,
) -> BackfillDagRunCollectionResponse:
"""List Dag runs associated with a backfill, including skipped slots."""
backfill = session.get(Backfill, backfill_id)
if not backfill:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Backfill with id {backfill_id} not found")

select_stmt, total_entries = paginated_select(
statement=select(BackfillDagRun)
.where(BackfillDagRun.backfill_id == backfill_id)
.options(joinedload(BackfillDagRun.dag_run)),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

joinedload issues a LEFT OUTER JOIN, preserving rows where dag_run_id is NULL (skipped slots).

order_by=order_by,
offset=offset,
limit=limit,
session=session,
)
return BackfillDagRunCollectionResponse(
backfill_dag_runs=session.scalars(select_stmt).unique(),
total_entries=total_entries,
)


@backfills_router.put(
path="/{backfill_id}/pause",
responses=create_openapi_http_exception_doc(
Expand Down
9 changes: 9 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ export const useBackfillServiceGetBackfillKey = "BackfillServiceGetBackfill";
export const UseBackfillServiceGetBackfillKeyFn = ({ backfillId }: {
backfillId: number;
}, queryKey?: Array<unknown>) => [useBackfillServiceGetBackfillKey, ...(queryKey ?? [{ backfillId }])];
export type BackfillServiceListBackfillDagRunsDefaultResponse = Awaited<ReturnType<typeof BackfillService.listBackfillDagRuns>>;
export type BackfillServiceListBackfillDagRunsQueryResult<TData = BackfillServiceListBackfillDagRunsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useBackfillServiceListBackfillDagRunsKey = "BackfillServiceListBackfillDagRuns";
export const UseBackfillServiceListBackfillDagRunsKeyFn = ({ backfillId, limit, offset, orderBy }: {
backfillId: number;
limit?: number;
offset?: number;
orderBy?: string[];
}, queryKey?: Array<unknown>) => [useBackfillServiceListBackfillDagRunsKey, ...(queryKey ?? [{ backfillId, limit, offset, orderBy }])];
export type BackfillServiceListBackfillsUiDefaultResponse = Awaited<ReturnType<typeof BackfillService.listBackfillsUi>>;
export type BackfillServiceListBackfillsUiQueryResult<TData = BackfillServiceListBackfillsUiDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useBackfillServiceListBackfillsUiKey = "BackfillServiceListBackfillsUi";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ export const ensureUseBackfillServiceGetBackfillData = (queryClient: QueryClient
backfillId: number;
}) => queryClient.ensureQueryData({ queryKey: Common.UseBackfillServiceGetBackfillKeyFn({ backfillId }), queryFn: () => BackfillService.getBackfill({ backfillId }) });
/**
* List Backfill Dag Runs
* @param data The data for the request.
* @param data.backfillId
* @param data.limit
* @param data.offset
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal`
* @returns BackfillDagRunCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseBackfillServiceListBackfillDagRunsData = (queryClient: QueryClient, { backfillId, limit, offset, orderBy }: {
backfillId: number;
limit?: number;
offset?: number;
orderBy?: string[];
}) => queryClient.ensureQueryData({ queryKey: Common.UseBackfillServiceListBackfillDagRunsKeyFn({ backfillId, limit, offset, orderBy }), queryFn: () => BackfillService.listBackfillDagRuns({ backfillId, limit, offset, orderBy }) });
/**
* List Backfills Ui
* @param data The data for the request.
* @param data.limit
Expand Down
16 changes: 16 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ export const prefetchUseBackfillServiceGetBackfill = (queryClient: QueryClient,
backfillId: number;
}) => queryClient.prefetchQuery({ queryKey: Common.UseBackfillServiceGetBackfillKeyFn({ backfillId }), queryFn: () => BackfillService.getBackfill({ backfillId }) });
/**
* List Backfill Dag Runs
* @param data The data for the request.
* @param data.backfillId
* @param data.limit
* @param data.offset
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal`
* @returns BackfillDagRunCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseBackfillServiceListBackfillDagRuns = (queryClient: QueryClient, { backfillId, limit, offset, orderBy }: {
backfillId: number;
limit?: number;
offset?: number;
orderBy?: string[];
}) => queryClient.prefetchQuery({ queryKey: Common.UseBackfillServiceListBackfillDagRunsKeyFn({ backfillId, limit, offset, orderBy }), queryFn: () => BackfillService.listBackfillDagRuns({ backfillId, limit, offset, orderBy }) });
/**
* List Backfills Ui
* @param data The data for the request.
* @param data.limit
Expand Down
16 changes: 16 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ export const useBackfillServiceGetBackfill = <TData = Common.BackfillServiceGetB
backfillId: number;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseBackfillServiceGetBackfillKeyFn({ backfillId }, queryKey), queryFn: () => BackfillService.getBackfill({ backfillId }) as TData, ...options });
/**
* List Backfill Dag Runs
* @param data The data for the request.
* @param data.backfillId
* @param data.limit
* @param data.offset
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal`
* @returns BackfillDagRunCollectionResponse Successful Response
* @throws ApiError
*/
export const useBackfillServiceListBackfillDagRuns = <TData = Common.BackfillServiceListBackfillDagRunsDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ backfillId, limit, offset, orderBy }: {
backfillId: number;
limit?: number;
offset?: number;
orderBy?: string[];
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseBackfillServiceListBackfillDagRunsKeyFn({ backfillId, limit, offset, orderBy }, queryKey), queryFn: () => BackfillService.listBackfillDagRuns({ backfillId, limit, offset, orderBy }) as TData, ...options });
/**
* List Backfills Ui
* @param data The data for the request.
* @param data.limit
Expand Down
16 changes: 16 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ export const useBackfillServiceGetBackfillSuspense = <TData = Common.BackfillSer
backfillId: number;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseBackfillServiceGetBackfillKeyFn({ backfillId }, queryKey), queryFn: () => BackfillService.getBackfill({ backfillId }) as TData, ...options });
/**
* List Backfill Dag Runs
* @param data The data for the request.
* @param data.backfillId
* @param data.limit
* @param data.offset
* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal`
* @returns BackfillDagRunCollectionResponse Successful Response
* @throws ApiError
*/
export const useBackfillServiceListBackfillDagRunsSuspense = <TData = Common.BackfillServiceListBackfillDagRunsDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ backfillId, limit, offset, orderBy }: {
backfillId: number;
limit?: number;
offset?: number;
orderBy?: string[];
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseBackfillServiceListBackfillDagRunsKeyFn({ backfillId, limit, offset, orderBy }, queryKey), queryFn: () => BackfillService.listBackfillDagRuns({ backfillId, limit, offset, orderBy }) as TData, ...options });
/**
* List Backfills Ui
* @param data The data for the request.
* @param data.limit
Expand Down
Loading
Loading