diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/gantt.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/gantt.py index 57a96c8a0ad70..3b74e84b47f3d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/gantt.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/gantt.py @@ -30,6 +30,8 @@ class GanttTaskInstance(BaseModel): task_display_name: str try_number: int state: TaskInstanceState | None + scheduled_dttm: datetime | None + queued_dttm: datetime | None start_date: datetime | None end_date: datetime | None is_group: bool = False diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 9e81e8344fb20..9103b8822fcef 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -2249,6 +2249,18 @@ components: anyOf: - $ref: '#/components/schemas/TaskInstanceState' - type: 'null' + scheduled_dttm: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Scheduled Dttm + queued_dttm: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Queued Dttm start_date: anyOf: - type: string @@ -2275,6 +2287,8 @@ components: - task_display_name - try_number - state + - scheduled_dttm + - queued_dttm - start_date - end_date title: GanttTaskInstance diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py index f33b12e6f7e8a..7807e3fd6bc0f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/gantt.py @@ -67,6 +67,8 @@ def get_gantt_data( TaskInstance.task_display_name.label("task_display_name"), # type: ignore[attr-defined] TaskInstance.try_number.label("try_number"), TaskInstance.state.label("state"), + TaskInstance.scheduled_dttm.label("scheduled_dttm"), + TaskInstance.queued_dttm.label("queued_dttm"), TaskInstance.start_date.label("start_date"), TaskInstance.end_date.label("end_date"), ).where( @@ -81,6 +83,8 @@ def get_gantt_data( TaskInstanceHistory.task_display_name.label("task_display_name"), TaskInstanceHistory.try_number.label("try_number"), TaskInstanceHistory.state.label("state"), + TaskInstanceHistory.scheduled_dttm.label("scheduled_dttm"), + TaskInstanceHistory.queued_dttm.label("queued_dttm"), TaskInstanceHistory.start_date.label("start_date"), TaskInstanceHistory.end_date.label("end_date"), ).where( @@ -106,6 +110,8 @@ def get_gantt_data( task_display_name=row.task_display_name, try_number=row.try_number, state=row.state, + scheduled_dttm=row.scheduled_dttm, + queued_dttm=row.queued_dttm, start_date=row.start_date, end_date=row.end_date, ) diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index f47aa107aea24..f1988af3e61e7 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -8028,6 +8028,30 @@ export const $GanttTaskInstance = { } ] }, + scheduled_dttm: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Scheduled Dttm' + }, + queued_dttm: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Queued Dttm' + }, start_date: { anyOf: [ { @@ -8064,7 +8088,7 @@ export const $GanttTaskInstance = { } }, type: 'object', - required: ['task_id', 'task_display_name', 'try_number', 'state', 'start_date', 'end_date'], + required: ['task_id', 'task_display_name', 'try_number', 'state', 'scheduled_dttm', 'queued_dttm', 'start_date', 'end_date'], title: 'GanttTaskInstance', description: 'Task instance data for Gantt chart.' } as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 7423c08d42c05..a12382f277711 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1973,6 +1973,8 @@ export type GanttTaskInstance = { task_display_name: string; try_number: number; state: TaskInstanceState | null; + scheduled_dttm: string | null; + queued_dttm: string | null; start_date: string | null; end_date: string | null; is_group?: boolean; diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts index b2a54fa7d7e82..27e73183001d3 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts @@ -38,6 +38,8 @@ import { buildTaskInstanceUrl } from "src/utils/links"; export type GanttDataItem = { isGroup?: boolean | null; isMapped?: boolean | null; + isQueued?: boolean; + isScheduled?: boolean; state?: TaskInstanceState | null; taskId: string; tryNumber?: number; @@ -122,19 +124,57 @@ export const transformGanttData = ({ if (tries && tries.length > 0) { return tries .filter((tryInstance) => tryInstance.start_date !== null) - .map((tryInstance) => { + .flatMap((tryInstance) => { const hasTaskRunning = isStatePending(tryInstance.state); const endTime = hasTaskRunning ? dayjs().toISOString() : tryInstance.end_date; - - return { + const items: Array = []; + + // Scheduled segment: from scheduled_dttm to queued_dttm (or start_date if no queued_dttm) + if (tryInstance.scheduled_dttm !== null) { + const scheduledEnd = tryInstance.queued_dttm ?? tryInstance.start_date; + + items.push({ + isGroup: false, + isMapped: tryInstance.is_mapped, + isScheduled: true, + state: "scheduled" as TaskInstanceState, + taskId: tryInstance.task_id, + tryNumber: tryInstance.try_number, + x: [dayjs(tryInstance.scheduled_dttm).toISOString(), dayjs(scheduledEnd).toISOString()], + y: tryInstance.task_display_name, + }); + } + + // Queue segment: from queued_dttm to start_date + if (tryInstance.queued_dttm !== null) { + items.push({ + isGroup: false, + isMapped: tryInstance.is_mapped, + isQueued: true, + state: "queued" as TaskInstanceState, + taskId: tryInstance.task_id, + tryNumber: tryInstance.try_number, + x: [ + dayjs(tryInstance.queued_dttm).toISOString(), + dayjs(tryInstance.start_date).toISOString(), + ], + y: tryInstance.task_display_name, + }); + } + + // Execution segment: from start_date to end_date + items.push({ isGroup: false, isMapped: tryInstance.is_mapped, + isQueued: false, state: tryInstance.state, taskId: tryInstance.task_id, tryNumber: tryInstance.try_number, x: [dayjs(tryInstance.start_date).toISOString(), dayjs(endTime).toISOString()], y: tryInstance.task_display_name, - }; + }); + + return items; }); } } @@ -328,6 +368,14 @@ export const createChartOptions = ({ label(tooltipItem: TooltipItem<"bar">) { const taskInstance = data[tooltipItem.dataIndex]; + if (taskInstance?.isScheduled) { + return `${translate("state")}: ${translate("states.scheduled")}`; + } + + if (taskInstance?.isQueued) { + return `${translate("state")}: ${translate("states.queued")}`; + } + return `${translate("state")}: ${translate(`states.${taskInstance?.state}`)}`; }, }, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py index 162c82682afcf..0e2be9e277cae 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_gantt.py @@ -51,6 +51,8 @@ "task_display_name": TASK_DISPLAY_NAME, "try_number": 1, "state": "success", + "scheduled_dttm": "2024-11-30T09:50:00Z", + "queued_dttm": "2024-11-30T09:55:00Z", "start_date": "2024-11-30T10:00:00Z", "end_date": "2024-11-30T10:05:00Z", "is_group": False, @@ -62,6 +64,8 @@ "task_display_name": TASK_DISPLAY_NAME_2, "try_number": 1, "state": "failed", + "scheduled_dttm": "2024-11-30T10:02:00Z", + "queued_dttm": "2024-11-30T10:03:00Z", "start_date": "2024-11-30T10:05:00Z", "end_date": "2024-11-30T10:10:00Z", "is_group": False, @@ -73,6 +77,8 @@ "task_display_name": TASK_DISPLAY_NAME_3, "try_number": 1, "state": "running", + "scheduled_dttm": None, + "queued_dttm": None, "start_date": "2024-11-30T10:10:00Z", "end_date": None, "is_group": False, @@ -116,16 +122,22 @@ def setup(dag_maker, session=None): if ti.task_id == TASK_ID: ti.state = TaskInstanceState.SUCCESS ti.try_number = 1 + ti.scheduled_dttm = pendulum.DateTime(2024, 11, 30, 9, 50, 0, tzinfo=pendulum.UTC) + ti.queued_dttm = pendulum.DateTime(2024, 11, 30, 9, 55, 0, tzinfo=pendulum.UTC) ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 0, 0, tzinfo=pendulum.UTC) ti.end_date = pendulum.DateTime(2024, 11, 30, 10, 5, 0, tzinfo=pendulum.UTC) elif ti.task_id == TASK_ID_2: ti.state = TaskInstanceState.FAILED ti.try_number = 1 + ti.scheduled_dttm = pendulum.DateTime(2024, 11, 30, 10, 2, 0, tzinfo=pendulum.UTC) + ti.queued_dttm = pendulum.DateTime(2024, 11, 30, 10, 3, 0, tzinfo=pendulum.UTC) ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 5, 0, tzinfo=pendulum.UTC) ti.end_date = pendulum.DateTime(2024, 11, 30, 10, 10, 0, tzinfo=pendulum.UTC) elif ti.task_id == TASK_ID_3: ti.state = TaskInstanceState.RUNNING ti.try_number = 1 + ti.scheduled_dttm = None + ti.queued_dttm = None ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 10, 0, tzinfo=pendulum.UTC) ti.end_date = None @@ -306,6 +318,18 @@ def test_sorted_by_task_id_and_try_number(self, test_client): sorted_tis = sorted(task_instances, key=lambda x: (x["task_id"], x["try_number"])) assert task_instances == sorted_tis + def test_timing_fields_are_returned(self, test_client): + response = test_client.get(f"/gantt/{DAG_ID}/run_1") + assert response.status_code == 200 + data = response.json() + tis = {ti["task_id"]: ti for ti in data["task_instances"]} + assert tis[TASK_ID]["scheduled_dttm"] == "2024-11-30T09:50:00Z" + assert tis[TASK_ID]["queued_dttm"] == "2024-11-30T09:55:00Z" + assert tis[TASK_ID_2]["scheduled_dttm"] == "2024-11-30T10:02:00Z" + assert tis[TASK_ID_2]["queued_dttm"] == "2024-11-30T10:03:00Z" + assert tis[TASK_ID_3]["scheduled_dttm"] is None + assert tis[TASK_ID_3]["queued_dttm"] is None + def test_should_response_401(self, unauthenticated_test_client): response = unauthenticated_test_client.get(f"/gantt/{DAG_ID}/run_1") assert response.status_code == 401