Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class GanttTaskInstance(BaseModel):
task_display_name: str
try_number: int
state: TaskInstanceState | None
scheduled_dttm: datetime | None
queued_dttm: datetime | None
start_date: datetime | None
end_date: datetime | None
is_group: bool = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2249,6 +2249,18 @@ components:
anyOf:
- $ref: '#/components/schemas/TaskInstanceState'
- type: 'null'
scheduled_dttm:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Scheduled Dttm
queued_dttm:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Queued Dttm
start_date:
anyOf:
- type: string
Expand All @@ -2275,6 +2287,8 @@ components:
- task_display_name
- try_number
- state
- scheduled_dttm
- queued_dttm
- start_date
- end_date
title: GanttTaskInstance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def get_gantt_data(
TaskInstance.task_display_name.label("task_display_name"), # type: ignore[attr-defined]
TaskInstance.try_number.label("try_number"),
TaskInstance.state.label("state"),
TaskInstance.scheduled_dttm.label("scheduled_dttm"),
TaskInstance.queued_dttm.label("queued_dttm"),
TaskInstance.start_date.label("start_date"),
TaskInstance.end_date.label("end_date"),
).where(
Expand All @@ -81,6 +83,8 @@ def get_gantt_data(
TaskInstanceHistory.task_display_name.label("task_display_name"),
TaskInstanceHistory.try_number.label("try_number"),
TaskInstanceHistory.state.label("state"),
TaskInstanceHistory.scheduled_dttm.label("scheduled_dttm"),
TaskInstanceHistory.queued_dttm.label("queued_dttm"),
TaskInstanceHistory.start_date.label("start_date"),
TaskInstanceHistory.end_date.label("end_date"),
).where(
Expand All @@ -106,6 +110,8 @@ def get_gantt_data(
task_display_name=row.task_display_name,
try_number=row.try_number,
state=row.state,
scheduled_dttm=row.scheduled_dttm,
queued_dttm=row.queued_dttm,
start_date=row.start_date,
end_date=row.end_date,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8028,6 +8028,30 @@ export const $GanttTaskInstance = {
}
]
},
scheduled_dttm: {
anyOf: [
{
type: 'string',
format: 'date-time'
},
{
type: 'null'
}
],
title: 'Scheduled Dttm'
},
queued_dttm: {
anyOf: [
{
type: 'string',
format: 'date-time'
},
{
type: 'null'
}
],
title: 'Queued Dttm'
},
start_date: {
anyOf: [
{
Expand Down Expand Up @@ -8064,7 +8088,7 @@ export const $GanttTaskInstance = {
}
},
type: 'object',
required: ['task_id', 'task_display_name', 'try_number', 'state', 'start_date', 'end_date'],
required: ['task_id', 'task_display_name', 'try_number', 'state', 'scheduled_dttm', 'queued_dttm', 'start_date', 'end_date'],
title: 'GanttTaskInstance',
description: 'Task instance data for Gantt chart.'
} as const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1973,6 +1973,8 @@ export type GanttTaskInstance = {
task_display_name: string;
try_number: number;
state: TaskInstanceState | null;
scheduled_dttm: string | null;
queued_dttm: string | null;
start_date: string | null;
end_date: string | null;
is_group?: boolean;
Expand Down
56 changes: 52 additions & 4 deletions airflow-core/src/airflow/ui/src/layouts/Details/Gantt/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import { buildTaskInstanceUrl } from "src/utils/links";
export type GanttDataItem = {
isGroup?: boolean | null;
isMapped?: boolean | null;
isQueued?: boolean;
isScheduled?: boolean;
state?: TaskInstanceState | null;
taskId: string;
tryNumber?: number;
Expand Down Expand Up @@ -122,19 +124,57 @@ export const transformGanttData = ({
if (tries && tries.length > 0) {
return tries
.filter((tryInstance) => tryInstance.start_date !== null)
.map((tryInstance) => {
.flatMap((tryInstance) => {
const hasTaskRunning = isStatePending(tryInstance.state);
const endTime = hasTaskRunning ? dayjs().toISOString() : tryInstance.end_date;

return {
const items: Array<GanttDataItem> = [];

// Scheduled segment: from scheduled_dttm to queued_dttm (or start_date if no queued_dttm)
if (tryInstance.scheduled_dttm !== null) {
const scheduledEnd = tryInstance.queued_dttm ?? tryInstance.start_date;

items.push({
isGroup: false,
isMapped: tryInstance.is_mapped,
isScheduled: true,
state: "scheduled" as TaskInstanceState,
taskId: tryInstance.task_id,
tryNumber: tryInstance.try_number,
x: [dayjs(tryInstance.scheduled_dttm).toISOString(), dayjs(scheduledEnd).toISOString()],
y: tryInstance.task_display_name,
});
}

// Queue segment: from queued_dttm to start_date
if (tryInstance.queued_dttm !== null) {
items.push({
isGroup: false,
isMapped: tryInstance.is_mapped,
isQueued: true,
state: "queued" as TaskInstanceState,
taskId: tryInstance.task_id,
tryNumber: tryInstance.try_number,
x: [
dayjs(tryInstance.queued_dttm).toISOString(),
dayjs(tryInstance.start_date).toISOString(),
],
y: tryInstance.task_display_name,
});
}

// Execution segment: from start_date to end_date
items.push({
isGroup: false,
isMapped: tryInstance.is_mapped,
isQueued: false,
state: tryInstance.state,
taskId: tryInstance.task_id,
tryNumber: tryInstance.try_number,
x: [dayjs(tryInstance.start_date).toISOString(), dayjs(endTime).toISOString()],
y: tryInstance.task_display_name,
};
});

return items;
});
}
}
Expand Down Expand Up @@ -328,6 +368,14 @@ export const createChartOptions = ({
label(tooltipItem: TooltipItem<"bar">) {
const taskInstance = data[tooltipItem.dataIndex];

if (taskInstance?.isScheduled) {
return `${translate("state")}: ${translate("states.scheduled")}`;
}

if (taskInstance?.isQueued) {
return `${translate("state")}: ${translate("states.queued")}`;
}

return `${translate("state")}: ${translate(`states.${taskInstance?.state}`)}`;
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
"task_display_name": TASK_DISPLAY_NAME,
"try_number": 1,
"state": "success",
"scheduled_dttm": "2024-11-30T09:50:00Z",
"queued_dttm": "2024-11-30T09:55:00Z",
"start_date": "2024-11-30T10:00:00Z",
"end_date": "2024-11-30T10:05:00Z",
"is_group": False,
Expand All @@ -62,6 +64,8 @@
"task_display_name": TASK_DISPLAY_NAME_2,
"try_number": 1,
"state": "failed",
"scheduled_dttm": "2024-11-30T10:02:00Z",
"queued_dttm": "2024-11-30T10:03:00Z",
"start_date": "2024-11-30T10:05:00Z",
"end_date": "2024-11-30T10:10:00Z",
"is_group": False,
Expand All @@ -73,6 +77,8 @@
"task_display_name": TASK_DISPLAY_NAME_3,
"try_number": 1,
"state": "running",
"scheduled_dttm": None,
"queued_dttm": None,
"start_date": "2024-11-30T10:10:00Z",
"end_date": None,
"is_group": False,
Expand Down Expand Up @@ -116,16 +122,22 @@ def setup(dag_maker, session=None):
if ti.task_id == TASK_ID:
ti.state = TaskInstanceState.SUCCESS
ti.try_number = 1
ti.scheduled_dttm = pendulum.DateTime(2024, 11, 30, 9, 50, 0, tzinfo=pendulum.UTC)
ti.queued_dttm = pendulum.DateTime(2024, 11, 30, 9, 55, 0, tzinfo=pendulum.UTC)
ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 0, 0, tzinfo=pendulum.UTC)
ti.end_date = pendulum.DateTime(2024, 11, 30, 10, 5, 0, tzinfo=pendulum.UTC)
elif ti.task_id == TASK_ID_2:
ti.state = TaskInstanceState.FAILED
ti.try_number = 1
ti.scheduled_dttm = pendulum.DateTime(2024, 11, 30, 10, 2, 0, tzinfo=pendulum.UTC)
ti.queued_dttm = pendulum.DateTime(2024, 11, 30, 10, 3, 0, tzinfo=pendulum.UTC)
ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 5, 0, tzinfo=pendulum.UTC)
ti.end_date = pendulum.DateTime(2024, 11, 30, 10, 10, 0, tzinfo=pendulum.UTC)
elif ti.task_id == TASK_ID_3:
ti.state = TaskInstanceState.RUNNING
ti.try_number = 1
ti.scheduled_dttm = None
ti.queued_dttm = None
ti.start_date = pendulum.DateTime(2024, 11, 30, 10, 10, 0, tzinfo=pendulum.UTC)
ti.end_date = None

Expand Down Expand Up @@ -306,6 +318,18 @@ def test_sorted_by_task_id_and_try_number(self, test_client):
sorted_tis = sorted(task_instances, key=lambda x: (x["task_id"], x["try_number"]))
assert task_instances == sorted_tis

def test_timing_fields_are_returned(self, test_client):
response = test_client.get(f"/gantt/{DAG_ID}/run_1")
assert response.status_code == 200
data = response.json()
tis = {ti["task_id"]: ti for ti in data["task_instances"]}
assert tis[TASK_ID]["scheduled_dttm"] == "2024-11-30T09:50:00Z"
assert tis[TASK_ID]["queued_dttm"] == "2024-11-30T09:55:00Z"
assert tis[TASK_ID_2]["scheduled_dttm"] == "2024-11-30T10:02:00Z"
assert tis[TASK_ID_2]["queued_dttm"] == "2024-11-30T10:03:00Z"
assert tis[TASK_ID_3]["scheduled_dttm"] is None
assert tis[TASK_ID_3]["queued_dttm"] is None

def test_should_response_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.get(f"/gantt/{DAG_ID}/run_1")
assert response.status_code == 401
Expand Down
Loading