From 376f5f553657ba1fbba9f07df32b3db2cf606515 Mon Sep 17 00:00:00 2001 From: Shivam Rastogi <6463385+shivaam@users.noreply.github.com> Date: Sat, 23 May 2026 13:42:05 -0400 Subject: [PATCH] Add GET /backfills/{backfill_id}/dag_runs endpoint Adds a new public API endpoint that returns the BackfillDagRun entries for a given backfill with joined DagRun state. Users can see what happened in a backfill: which dates ran, their states (queued, running, success, failed), and which slots were skipped (with reason). - BackfillDagRunResponse / BackfillDagRunCollectionResponse models - LEFT OUTER JOIN via joinedload includes skipped slots (null dag_run_id) - Pagination via limit/offset, default ordering by sort_ordinal - 404 when backfill doesn't exist - 8 unit tests covering happy path, skipped slots, 404, pagination, empty backfill, and ordering contract closes: #46250 --- .../core_api/datamodels/backfills.py | 22 +++ .../openapi/v2-rest-api-generated.yaml | 150 ++++++++++++++++ .../core_api/routes/public/backfills.py | 38 ++++ .../airflow/ui/openapi-gen/queries/common.ts | 9 + .../ui/openapi-gen/queries/ensureQueryData.ts | 16 ++ .../ui/openapi-gen/queries/prefetch.ts | 16 ++ .../airflow/ui/openapi-gen/queries/queries.ts | 16 ++ .../ui/openapi-gen/queries/suspense.ts | 16 ++ .../ui/openapi-gen/requests/schemas.gen.ts | 110 ++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 33 +++- .../ui/openapi-gen/requests/types.gen.ts | 62 +++++++ .../core_api/routes/public/test_backfills.py | 163 ++++++++++++++++++ .../airflowctl/api/datamodels/generated.py | 29 ++++ 13 files changed, 679 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/backfills.py index 52538b37e16de..76c7137499cc2 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/backfills.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/backfills.py @@ -24,6 +24,7 @@ from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel from airflow.models.backfill import ReprocessBehavior +from airflow.utils.state import DagRunState class BackfillPostBody(StrictBaseModel): @@ -69,6 +70,27 @@ class BackfillCollectionResponse(BaseModel): total_entries: int +class BackfillDagRunResponse(BaseModel): + """Serializer for a single BackfillDagRun entry with joined DagRun state.""" + + id: NonNegativeInt + backfill_id: NonNegativeInt + dag_run_id: NonNegativeInt | None + logical_date: datetime | None + partition_key: str | None + sort_ordinal: int + exception_reason: str | None + dag_run_state: DagRunState | None = Field(default=None, validation_alias=AliasPath("dag_run", "state")) + dag_run_run_id: str | None = Field(default=None, validation_alias=AliasPath("dag_run", "run_id")) + + +class BackfillDagRunCollectionResponse(BaseModel): + """BackfillDagRun Collection serializer for responses.""" + + backfill_dag_runs: list[BackfillDagRunResponse] + total_entries: int + + class DryRunBackfillResponse(BaseModel): """Backfill serializer for responses in dry-run mode.""" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 23f2622b13b3f..de8290791c305 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -1215,6 +1215,84 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /api/v2/backfills/{backfill_id}/dag_runs: + get: + tags: + - Backfill + summary: List Backfill Dag Runs + operationId: list_backfill_dag_runs + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: backfill_id + in: path + required: true + schema: + type: integer + minimum: 0 + title: Backfill Id + - name: limit + in: query + required: false + schema: + type: integer + minimum: 0 + default: 50 + title: Limit + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + default: 0 + title: Offset + - name: order_by + in: query + required: false + schema: + type: array + items: + type: string + description: 'Attributes to order by, multi criteria sort is supported. + Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal`' + default: + - sort_ordinal + title: Order By + description: 'Attributes to order by, multi criteria sort is supported. Prefix + with `-` for descending order. Supported attributes: `id, sort_ordinal`' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/BackfillDagRunCollectionResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/backfills/{backfill_id}/pause: put: tags: @@ -11439,6 +11517,78 @@ components: - total_entries title: BackfillCollectionResponse description: Backfill Collection serializer for responses. + BackfillDagRunCollectionResponse: + properties: + backfill_dag_runs: + items: + $ref: '#/components/schemas/BackfillDagRunResponse' + type: array + title: Backfill Dag Runs + total_entries: + type: integer + title: Total Entries + type: object + required: + - backfill_dag_runs + - total_entries + title: BackfillDagRunCollectionResponse + description: BackfillDagRun Collection serializer for responses. + BackfillDagRunResponse: + properties: + id: + type: integer + minimum: 0.0 + title: Id + backfill_id: + type: integer + minimum: 0.0 + title: Backfill Id + dag_run_id: + anyOf: + - type: integer + minimum: 0.0 + - type: 'null' + title: Dag Run Id + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date + partition_key: + anyOf: + - type: string + - type: 'null' + title: Partition Key + sort_ordinal: + type: integer + title: Sort Ordinal + exception_reason: + anyOf: + - type: string + - type: 'null' + title: Exception Reason + dag_run_state: + anyOf: + - $ref: '#/components/schemas/DagRunState' + - type: 'null' + dag_run_run_id: + anyOf: + - type: string + - type: 'null' + title: Dag Run Run Id + type: object + required: + - id + - backfill_id + - dag_run_id + - logical_date + - partition_key + - sort_ordinal + - exception_reason + title: BackfillDagRunResponse + description: Serializer for a single BackfillDagRun entry with joined DagRun + state. BackfillPostBody: properties: dag_id: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py index 4e4bc90dceb97..98bed78da8533 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -33,6 +33,7 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.backfills import ( BackfillCollectionResponse, + BackfillDagRunCollectionResponse, BackfillPostBody, BackfillResponse, DryRunBackfillCollectionResponse, @@ -112,6 +113,43 @@ def get_backfill( raise HTTPException(status.HTTP_404_NOT_FOUND, "Backfill not found") +@backfills_router.get( + path="/{backfill_id}/dag_runs", + responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[ + Depends(requires_access_backfill(method="GET")), + ], +) +def list_backfill_dag_runs( + backfill_id: NonNegativeInt, + limit: QueryLimit, + offset: QueryOffset, + order_by: Annotated[ + SortParam, + Depends(SortParam(["id", "sort_ordinal"], BackfillDagRun).dynamic_depends(default="sort_ordinal")), + ], + session: SessionDep, +) -> BackfillDagRunCollectionResponse: + """List Dag runs associated with a backfill, including skipped slots.""" + backfill = session.get(Backfill, backfill_id) + if not backfill: + raise HTTPException(status.HTTP_404_NOT_FOUND, f"Backfill with id {backfill_id} not found") + + select_stmt, total_entries = paginated_select( + statement=select(BackfillDagRun) + .where(BackfillDagRun.backfill_id == backfill_id) + .options(joinedload(BackfillDagRun.dag_run)), + order_by=order_by, + offset=offset, + limit=limit, + session=session, + ) + return BackfillDagRunCollectionResponse( + backfill_dag_runs=session.scalars(select_stmt).unique(), + total_entries=total_entries, + ) + + @backfills_router.put( path="/{backfill_id}/pause", responses=create_openapi_http_exception_doc( diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index f8e3dbe9af638..6e9eb65500563 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -101,6 +101,15 @@ export const useBackfillServiceGetBackfillKey = "BackfillServiceGetBackfill"; export const UseBackfillServiceGetBackfillKeyFn = ({ backfillId }: { backfillId: number; }, queryKey?: Array) => [useBackfillServiceGetBackfillKey, ...(queryKey ?? [{ backfillId }])]; +export type BackfillServiceListBackfillDagRunsDefaultResponse = Awaited>; +export type BackfillServiceListBackfillDagRunsQueryResult = UseQueryResult; +export const useBackfillServiceListBackfillDagRunsKey = "BackfillServiceListBackfillDagRuns"; +export const UseBackfillServiceListBackfillDagRunsKeyFn = ({ backfillId, limit, offset, orderBy }: { + backfillId: number; + limit?: number; + offset?: number; + orderBy?: string[]; +}, queryKey?: Array) => [useBackfillServiceListBackfillDagRunsKey, ...(queryKey ?? [{ backfillId, limit, offset, orderBy }])]; export type BackfillServiceListBackfillsUiDefaultResponse = Awaited>; export type BackfillServiceListBackfillsUiQueryResult = UseQueryResult; export const useBackfillServiceListBackfillsUiKey = "BackfillServiceListBackfillsUi"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index f124d321a1f88..329a14d3f3fd1 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -195,6 +195,22 @@ export const ensureUseBackfillServiceGetBackfillData = (queryClient: QueryClient backfillId: number; }) => queryClient.ensureQueryData({ queryKey: Common.UseBackfillServiceGetBackfillKeyFn({ backfillId }), queryFn: () => BackfillService.getBackfill({ backfillId }) }); /** +* List Backfill Dag Runs +* @param data The data for the request. +* @param data.backfillId +* @param data.limit +* @param data.offset +* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal` +* @returns BackfillDagRunCollectionResponse Successful Response +* @throws ApiError +*/ +export const ensureUseBackfillServiceListBackfillDagRunsData = (queryClient: QueryClient, { backfillId, limit, offset, orderBy }: { + backfillId: number; + limit?: number; + offset?: number; + orderBy?: string[]; +}) => queryClient.ensureQueryData({ queryKey: Common.UseBackfillServiceListBackfillDagRunsKeyFn({ backfillId, limit, offset, orderBy }), queryFn: () => BackfillService.listBackfillDagRuns({ backfillId, limit, offset, orderBy }) }); +/** * List Backfills Ui * @param data The data for the request. * @param data.limit diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 5bdcfe667b228..ce0eb5c4d7432 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -195,6 +195,22 @@ export const prefetchUseBackfillServiceGetBackfill = (queryClient: QueryClient, backfillId: number; }) => queryClient.prefetchQuery({ queryKey: Common.UseBackfillServiceGetBackfillKeyFn({ backfillId }), queryFn: () => BackfillService.getBackfill({ backfillId }) }); /** +* List Backfill Dag Runs +* @param data The data for the request. +* @param data.backfillId +* @param data.limit +* @param data.offset +* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal` +* @returns BackfillDagRunCollectionResponse Successful Response +* @throws ApiError +*/ +export const prefetchUseBackfillServiceListBackfillDagRuns = (queryClient: QueryClient, { backfillId, limit, offset, orderBy }: { + backfillId: number; + limit?: number; + offset?: number; + orderBy?: string[]; +}) => queryClient.prefetchQuery({ queryKey: Common.UseBackfillServiceListBackfillDagRunsKeyFn({ backfillId, limit, offset, orderBy }), queryFn: () => BackfillService.listBackfillDagRuns({ backfillId, limit, offset, orderBy }) }); +/** * List Backfills Ui * @param data The data for the request. * @param data.limit diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 8c0976ec328e9..c5a9ff30a2bee 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -195,6 +195,22 @@ export const useBackfillServiceGetBackfill = , "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseBackfillServiceGetBackfillKeyFn({ backfillId }, queryKey), queryFn: () => BackfillService.getBackfill({ backfillId }) as TData, ...options }); /** +* List Backfill Dag Runs +* @param data The data for the request. +* @param data.backfillId +* @param data.limit +* @param data.offset +* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal` +* @returns BackfillDagRunCollectionResponse Successful Response +* @throws ApiError +*/ +export const useBackfillServiceListBackfillDagRuns = = unknown[]>({ backfillId, limit, offset, orderBy }: { + backfillId: number; + limit?: number; + offset?: number; + orderBy?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseBackfillServiceListBackfillDagRunsKeyFn({ backfillId, limit, offset, orderBy }, queryKey), queryFn: () => BackfillService.listBackfillDagRuns({ backfillId, limit, offset, orderBy }) as TData, ...options }); +/** * List Backfills Ui * @param data The data for the request. * @param data.limit diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index e11772395f0f6..13b58bd7fdc91 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -195,6 +195,22 @@ export const useBackfillServiceGetBackfillSuspense = , "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseBackfillServiceGetBackfillKeyFn({ backfillId }, queryKey), queryFn: () => BackfillService.getBackfill({ backfillId }) as TData, ...options }); /** +* List Backfill Dag Runs +* @param data The data for the request. +* @param data.backfillId +* @param data.limit +* @param data.offset +* @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal` +* @returns BackfillDagRunCollectionResponse Successful Response +* @throws ApiError +*/ +export const useBackfillServiceListBackfillDagRunsSuspense = = unknown[]>({ backfillId, limit, offset, orderBy }: { + backfillId: number; + limit?: number; + offset?: number; + orderBy?: string[]; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseBackfillServiceListBackfillDagRunsKeyFn({ backfillId, limit, offset, orderBy }, queryKey), queryFn: () => BackfillService.listBackfillDagRuns({ backfillId, limit, offset, orderBy }) as TData, ...options }); +/** * List Backfills Ui * @param data The data for the request. * @param data.limit diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 4a4b95f183023..822c492152922 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -480,6 +480,116 @@ export const $BackfillCollectionResponse = { description: 'Backfill Collection serializer for responses.' } as const; +export const $BackfillDagRunCollectionResponse = { + properties: { + backfill_dag_runs: { + items: { + '$ref': '#/components/schemas/BackfillDagRunResponse' + }, + type: 'array', + title: 'Backfill Dag Runs' + }, + total_entries: { + type: 'integer', + title: 'Total Entries' + } + }, + type: 'object', + required: ['backfill_dag_runs', 'total_entries'], + title: 'BackfillDagRunCollectionResponse', + description: 'BackfillDagRun Collection serializer for responses.' +} as const; + +export const $BackfillDagRunResponse = { + properties: { + id: { + type: 'integer', + minimum: 0, + title: 'Id' + }, + backfill_id: { + type: 'integer', + minimum: 0, + title: 'Backfill Id' + }, + dag_run_id: { + anyOf: [ + { + type: 'integer', + minimum: 0 + }, + { + type: 'null' + } + ], + title: 'Dag Run Id' + }, + logical_date: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Logical Date' + }, + partition_key: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Partition Key' + }, + sort_ordinal: { + type: 'integer', + title: 'Sort Ordinal' + }, + exception_reason: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Exception Reason' + }, + dag_run_state: { + anyOf: [ + { + '$ref': '#/components/schemas/DagRunState' + }, + { + type: 'null' + } + ] + }, + dag_run_run_id: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Run Run Id' + } + }, + type: 'object', + required: ['id', 'backfill_id', 'dag_run_id', 'logical_date', 'partition_key', 'sort_ordinal', 'exception_reason'], + title: 'BackfillDagRunResponse', + description: 'Serializer for a single BackfillDagRun entry with joined DagRun state.' +} as const; + export const $BackfillPostBody = { properties: { dag_id: { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 7546818497787..be9666e5551a6 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStatesData, ListAssetStatesResponse, ClearAssetStateData, ClearAssetStateResponse, GetAssetStateData, GetAssetStateResponse, SetAssetStateData, SetAssetStateResponse, DeleteAssetStateData, DeleteAssetStateResponse, ListTaskStatesData, ListTaskStatesResponse, ClearTaskStateData, ClearTaskStateResponse, GetTaskStateData, GetTaskStateResponse, SetTaskStateData, SetTaskStateResponse, DeleteTaskStateData, DeleteTaskStateResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, ListBackfillDagRunsData, ListBackfillDagRunsResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, BulkDagRunsData, BulkDagRunsResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagRunStatsData, GetDagRunStatsResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskGroupInstancesData, PatchTaskGroupInstancesResponse, PatchTaskGroupInstancesDryRunData, PatchTaskGroupInstancesDryRunResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailTryDetailData, GetHitlDetailTryDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, ListAssetStatesData, ListAssetStatesResponse, ClearAssetStateData, ClearAssetStateResponse, GetAssetStateData, GetAssetStateResponse, SetAssetStateData, SetAssetStateResponse, DeleteAssetStateData, DeleteAssetStateResponse, ListTaskStatesData, ListTaskStatesResponse, ClearTaskStateData, ClearTaskStateResponse, GetTaskStateData, GetTaskStateResponse, SetTaskStateData, SetTaskStateResponse, DeleteTaskStateData, DeleteTaskStateResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GenerateTokenData, GenerateTokenResponse2, GetPartitionedDagRunsData, GetPartitionedDagRunsResponse, GetPendingPartitionedDagRunData, GetPendingPartitionedDagRunResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, GetDeadlinesData, GetDeadlinesResponse, GetDagDeadlineAlertsData, GetDagDeadlineAlertsResponse, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesStreamData, GetGridTiSummariesStreamResponse, GetGanttDataData, GetGanttDataResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -505,6 +505,37 @@ export class BackfillService { }); } + /** + * List Backfill Dag Runs + * @param data The data for the request. + * @param data.backfillId + * @param data.limit + * @param data.offset + * @param data.orderBy Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal` + * @returns BackfillDagRunCollectionResponse Successful Response + * @throws ApiError + */ + public static listBackfillDagRuns(data: ListBackfillDagRunsData): CancelablePromise { + return __request(OpenAPI, { + method: 'GET', + url: '/api/v2/backfills/{backfill_id}/dag_runs', + path: { + backfill_id: data.backfillId + }, + query: { + limit: data.limit, + offset: data.offset, + order_by: data.orderBy + }, + errors: { + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + /** * Pause Backfill * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 70c9fa131c191..e4ffd96da9c2b 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -137,6 +137,29 @@ export type BackfillCollectionResponse = { total_entries: number; }; +/** + * BackfillDagRun Collection serializer for responses. + */ +export type BackfillDagRunCollectionResponse = { + backfill_dag_runs: Array; + total_entries: number; +}; + +/** + * Serializer for a single BackfillDagRun entry with joined DagRun state. + */ +export type BackfillDagRunResponse = { + id: number; + backfill_id: number; + dag_run_id: number | null; + logical_date: string | null; + partition_key: string | null; + sort_ordinal: number; + exception_reason: string | null; + dag_run_state?: DagRunState | null; + dag_run_run_id?: string | null; +}; + /** * Object used for create backfill request. */ @@ -2675,6 +2698,18 @@ export type GetBackfillData = { export type GetBackfillResponse = BackfillResponse; +export type ListBackfillDagRunsData = { + backfillId: number; + limit?: number; + offset?: number; + /** + * Attributes to order by, multi criteria sort is supported. Prefix with `-` for descending order. Supported attributes: `id, sort_ordinal` + */ + orderBy?: Array<(string)>; +}; + +export type ListBackfillDagRunsResponse = BackfillDagRunCollectionResponse; + export type PauseBackfillData = { backfillId: number; }; @@ -4827,6 +4862,33 @@ export type $OpenApiTs = { }; }; }; + '/api/v2/backfills/{backfill_id}/dag_runs': { + get: { + req: ListBackfillDagRunsData; + res: { + /** + * Successful Response + */ + 200: BackfillDagRunCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; '/api/v2/backfills/{backfill_id}/pause': { put: { req: PauseBackfillData; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py index 33ff158aa2848..ab4eee857b9e9 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py @@ -204,6 +204,169 @@ def test_invalid_id(self, test_client): ) +class TestListBackfillDagRuns(TestBackfillEndpoint): + def test_list_backfill_dag_runs(self, test_client, session): + """Happy path: backfill with mixed dag run states.""" + (dag,) = self._create_dag_models() + from_date = pendulum.parse("2024-01-01") + to_date = pendulum.parse("2024-01-03") + b = Backfill(dag_id=dag.dag_id, from_date=from_date, to_date=to_date) + session.add(b) + session.flush() + + dr1 = DagRun( + dag_id=dag.dag_id, + run_id="backfill__2024-01-01", + logical_date=pendulum.parse("2024-01-01"), + state=DagRunState.SUCCESS, + run_type="scheduled", + ) + dr2 = DagRun( + dag_id=dag.dag_id, + run_id="backfill__2024-01-02", + logical_date=pendulum.parse("2024-01-02"), + state=DagRunState.FAILED, + run_type="scheduled", + ) + session.add_all([dr1, dr2]) + session.flush() + + bdr1 = BackfillDagRun( + backfill_id=b.id, dag_run_id=dr1.id, logical_date=pendulum.parse("2024-01-01"), sort_ordinal=1 + ) + bdr2 = BackfillDagRun( + backfill_id=b.id, dag_run_id=dr2.id, logical_date=pendulum.parse("2024-01-02"), sort_ordinal=2 + ) + session.add_all([bdr1, bdr2]) + session.commit() + + response = test_client.get(f"/backfills/{b.id}/dag_runs") + assert response.status_code == 200 + data = response.json() + assert data["total_entries"] == 2 + runs = data["backfill_dag_runs"] + assert len(runs) == 2 + assert runs[0]["sort_ordinal"] == 1 + assert runs[0]["dag_run_state"] == "success" + assert runs[0]["dag_run_run_id"] == "backfill__2024-01-01" + assert runs[1]["sort_ordinal"] == 2 + assert runs[1]["dag_run_state"] == "failed" + + def test_list_backfill_dag_runs_with_skipped_slots(self, test_client, session): + """Slots skipped due to existing runs have null dag_run_id and exception_reason set.""" + (dag,) = self._create_dag_models() + b = Backfill( + dag_id=dag.dag_id, from_date=pendulum.parse("2024-01-01"), to_date=pendulum.parse("2024-01-02") + ) + session.add(b) + session.flush() + + bdr = BackfillDagRun( + backfill_id=b.id, + dag_run_id=None, + logical_date=pendulum.parse("2024-01-01"), + sort_ordinal=1, + exception_reason="already exists", + ) + session.add(bdr) + session.commit() + + response = test_client.get(f"/backfills/{b.id}/dag_runs") + assert response.status_code == 200 + data = response.json() + assert data["total_entries"] == 1 + run = data["backfill_dag_runs"][0] + assert run["dag_run_id"] is None + assert run["dag_run_state"] is None + assert run["exception_reason"] == "already exists" + + def test_list_backfill_dag_runs_not_found(self, test_client): + """Non-existent backfill returns 404.""" + response = test_client.get("/backfills/999999/dag_runs") + assert response.status_code == 404 + + def test_list_backfill_dag_runs_pagination(self, test_client, session): + """Limit and offset work correctly.""" + (dag,) = self._create_dag_models() + b = Backfill( + dag_id=dag.dag_id, from_date=pendulum.parse("2024-01-01"), to_date=pendulum.parse("2024-01-05") + ) + session.add(b) + session.flush() + + for i in range(1, 4): + session.add( + BackfillDagRun( + backfill_id=b.id, + dag_run_id=None, + logical_date=pendulum.parse(f"2024-01-0{i}"), + sort_ordinal=i, + exception_reason="already exists", + ) + ) + session.commit() + + response = test_client.get(f"/backfills/{b.id}/dag_runs?limit=2&offset=0") + assert response.status_code == 200 + data = response.json() + assert data["total_entries"] == 3 + assert len(data["backfill_dag_runs"]) == 2 + + response = test_client.get(f"/backfills/{b.id}/dag_runs?limit=2&offset=2") + assert response.status_code == 200 + data = response.json() + assert len(data["backfill_dag_runs"]) == 1 + + def test_list_backfill_dag_runs_empty(self, test_client, session): + """Backfill with no dag runs returns empty list.""" + (dag,) = self._create_dag_models() + b = Backfill( + dag_id=dag.dag_id, from_date=pendulum.parse("2024-01-01"), to_date=pendulum.parse("2024-01-02") + ) + session.add(b) + session.commit() + + response = test_client.get(f"/backfills/{b.id}/dag_runs") + assert response.status_code == 200 + data = response.json() + assert data["total_entries"] == 0 + assert data["backfill_dag_runs"] == [] + + @pytest.mark.parametrize( + ("order_by", "expected_first_ordinal"), + [ + ("sort_ordinal", 1), + ("-sort_ordinal", 3), + ("id", 1), + ], + ) + def test_list_backfill_dag_runs_ordering(self, order_by, expected_first_ordinal, test_client, session): + """Verify sort contract for allowed order_by values.""" + (dag,) = self._create_dag_models() + b = Backfill( + dag_id=dag.dag_id, from_date=pendulum.parse("2024-01-01"), to_date=pendulum.parse("2024-01-03") + ) + session.add(b) + session.flush() + + for i in range(1, 4): + session.add( + BackfillDagRun( + backfill_id=b.id, + dag_run_id=None, + logical_date=pendulum.parse(f"2024-01-0{i}"), + sort_ordinal=i, + exception_reason="already exists", + ) + ) + session.commit() + + response = test_client.get(f"/backfills/{b.id}/dag_runs?order_by={order_by}") + assert response.status_code == 200 + data = response.json() + assert data["backfill_dag_runs"][0]["sort_ordinal"] == expected_first_ordinal + + class TestCreateBackfill(TestBackfillEndpoint): @pytest.mark.parametrize( ("repro_act", "repro_exp"), diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index f05fa65cf56f8..0a1805719d3c2 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -80,6 +80,10 @@ class AssetWatcherResponse(BaseModel): created_date: Annotated[datetime, Field(title="Created Date")] +class DagRunId(RootModel[int]): + root: Annotated[int, Field(ge=0, title="Dag Run Id")] + + class BaseInfoResponse(BaseModel): """ Base info serializer for responses. @@ -1234,6 +1238,22 @@ class AssetStateCollectionResponse(BaseModel): total_entries: Annotated[int, Field(title="Total Entries")] +class BackfillDagRunResponse(BaseModel): + """ + Serializer for a single BackfillDagRun entry with joined DagRun state. + """ + + id: Annotated[int, Field(ge=0, title="Id")] + backfill_id: Annotated[int, Field(ge=0, title="Backfill Id")] + dag_run_id: Annotated[DagRunId | None, Field(title="Dag Run Id")] = None + logical_date: Annotated[datetime | None, Field(title="Logical Date")] = None + partition_key: Annotated[str | None, Field(title="Partition Key")] = None + sort_ordinal: Annotated[int, Field(title="Sort Ordinal")] + exception_reason: Annotated[str | None, Field(title="Exception Reason")] = None + dag_run_state: DagRunState | None = None + dag_run_run_id: Annotated[str | None, Field(title="Dag Run Run Id")] = None + + class BackfillPostBody(BaseModel): """ Object used for create backfill request. @@ -2026,6 +2046,15 @@ class BackfillCollectionResponse(BaseModel): total_entries: Annotated[int, Field(title="Total Entries")] +class BackfillDagRunCollectionResponse(BaseModel): + """ + BackfillDagRun Collection serializer for responses. + """ + + backfill_dag_runs: Annotated[list[BackfillDagRunResponse], Field(title="Backfill Dag Runs")] + total_entries: Annotated[int, Field(title="Total Entries")] + + class BulkBodyBulkDAGRunBody(BaseModel): model_config = ConfigDict( extra="forbid",