diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py index c6cb2fa282799..041ec12c1f24b 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/assets.py @@ -19,11 +19,18 @@ from collections.abc import Iterable from datetime import datetime +from typing import TYPE_CHECKING -from pydantic import AliasPath, ConfigDict, Field, JsonValue, NonNegativeInt, field_validator +from pydantic import AliasPath, AwareDatetime, ConfigDict, Field, JsonValue, NonNegativeInt, field_validator from airflow._shared.secrets_masker import redact +from airflow._shared.timezones import timezone from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel +from airflow.api_fastapi.core_api.datamodels.dag_run import TriggerDAGRunPostBody +from airflow.utils.types import DagRunType + +if TYPE_CHECKING: + from airflow.serialization.definitions.dag import SerializedDAG class DagScheduleAssetReference(StrictBaseModel): @@ -185,3 +192,21 @@ def set_from_rest_api(cls, v: dict) -> dict: return v model_config = ConfigDict(extra="forbid") + + +class MaterializeAssetBody(TriggerDAGRunPostBody): + """Materialize asset request.""" + + logical_date: AwareDatetime | None = None + + def validate_context(self, dag: SerializedDAG) -> dict: + params = super().validate_context(dag) + if self.dag_run_id is None: + params["run_id"] = dag.timetable.generate_run_id( + run_type=DagRunType.ASSET_MATERIALIZATION, + run_after=timezone.coerce_datetime(params["run_after"]), + data_interval=params["data_interval"], + ) + return params + + model_config = ConfigDict(extra="forbid") diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 128d0f2bd0536..b167e463a35f0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -484,6 +484,14 @@ paths: schema: type: integer title: Asset Id + requestBody: + content: + application/json: + schema: + anyOf: + - $ref: '#/components/schemas/MaterializeAssetBody' + - type: 'null' + title: Body responses: '200': description: Successful Response @@ -11949,6 +11957,57 @@ components: type: object title: LastAssetEventResponse description: Last asset event response serializer. + MaterializeAssetBody: + properties: + dag_run_id: + anyOf: + - type: string + - type: 'null' + title: Dag Run Id + data_interval_start: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Data Interval Start + data_interval_end: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Data Interval End + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date + run_after: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Run After + conf: + anyOf: + - additionalProperties: true + type: object + - type: 'null' + title: Conf + note: + anyOf: + - type: string + - type: 'null' + title: Note + partition_key: + anyOf: + - type: string + - type: 'null' + title: Partition Key + additionalProperties: false + type: object + title: MaterializeAssetBody + description: Materialize asset request. PatchTaskInstanceBody: properties: new_state: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py index 68220c20e8f95..a911984dc65e2 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py @@ -54,6 +54,7 @@ AssetEventResponse, AssetResponse, CreateAssetEventsBody, + MaterializeAssetBody, QueuedEventCollectionResponse, QueuedEventResponse, ) @@ -387,6 +388,7 @@ def materialize_asset( dag_bag: DagBagDep, user: GetUserDep, session: SessionDep, + body: MaterializeAssetBody | None = None, ) -> DAGRunResponse: """Materialize an asset by triggering a DAG run that produces it.""" dag_id_it = iter( @@ -425,17 +427,19 @@ def materialize_asset( f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs", ) + params = (body or MaterializeAssetBody()).validate_context(dag) return dag.create_dagrun( - run_id=dag.timetable.generate_run_id( - run_type=DagRunType.ASSET_MATERIALIZATION, - run_after=(run_after := timezone.coerce_datetime(timezone.utcnow())), - data_interval=None, - ), - run_after=run_after, + run_id=params["run_id"], + logical_date=params["logical_date"], + data_interval=params["data_interval"], + run_after=params["run_after"], + conf=params["conf"], run_type=DagRunType.ASSET_MATERIALIZATION, triggered_by=DagRunTriggeredByType.REST_API, triggering_user_name=user.get_name(), state=DagRunState.QUEUED, + partition_key=params["partition_key"], + note=params["note"], session=session, ) diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 8e9ef5aa29d0c..dac7a198e59bd 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -2,7 +2,7 @@ import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query"; import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen"; -import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; +import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen"; import * as Common from "./common"; /** * Get Assets @@ -1765,14 +1765,17 @@ export const useAssetServiceCreateAssetEvent = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ assetId }) => AssetService.materializeAsset({ assetId }) as unknown as Promise, ...options }); + requestBody?: MaterializeAssetBody; +}, TContext>({ mutationFn: ({ assetId, requestBody }) => AssetService.materializeAsset({ assetId, requestBody }) as unknown as Promise, ...options }); /** * Create Backfill * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index e17c16e70e49a..c5077028b44f3 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4473,6 +4473,108 @@ export const $LastAssetEventResponse = { description: 'Last asset event response serializer.' } as const; +export const $MaterializeAssetBody = { + properties: { + dag_run_id: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Dag Run Id' + }, + data_interval_start: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Data Interval Start' + }, + data_interval_end: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Data Interval End' + }, + logical_date: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Logical Date' + }, + run_after: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Run After' + }, + conf: { + anyOf: [ + { + additionalProperties: true, + type: 'object' + }, + { + type: 'null' + } + ], + title: 'Conf' + }, + note: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Note' + }, + partition_key: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Partition Key' + } + }, + additionalProperties: false, + type: 'object', + title: 'MaterializeAssetBody', + description: 'Materialize asset request.' +} as const; + export const $PatchTaskInstanceBody = { properties: { new_state: { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 6e701bf68dc6e..6f0c2af82fb0e 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -172,6 +172,7 @@ export class AssetService { * Materialize an asset by triggering a DAG run that produces it. * @param data The data for the request. * @param data.assetId + * @param data.requestBody * @returns DAGRunResponse Successful Response * @throws ApiError */ @@ -182,6 +183,8 @@ export class AssetService { path: { asset_id: data.assetId }, + body: data.requestBody, + mediaType: 'application/json', errors: { 400: 'Bad Request', 401: 'Unauthorized', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 1bf8cd21fb140..521643cf7f0f6 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1134,6 +1134,22 @@ export type LastAssetEventResponse = { timestamp?: string | null; }; +/** + * Materialize asset request. + */ +export type MaterializeAssetBody = { + dag_run_id?: string | null; + data_interval_start?: string | null; + data_interval_end?: string | null; + logical_date?: string | null; + run_after?: string | null; + conf?: { + [key: string]: unknown; +} | null; + note?: string | null; + partition_key?: string | null; +}; + /** * Request body for Clear Task Instances endpoint. */ @@ -2305,6 +2321,7 @@ export type CreateAssetEventResponse = AssetEventResponse; export type MaterializeAssetData = { assetId: number; + requestBody?: MaterializeAssetBody | null; }; export type MaterializeAssetResponse = DAGRunResponse; diff --git a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx index d94991378e765..91b2cd1daa051 100644 --- a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx +++ b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx @@ -26,7 +26,6 @@ import { FiPlay } from "react-icons/fi"; import { useDagParams } from "src/queries/useDagParams"; import { useParamStore } from "src/queries/useParamStore"; import { useTogglePause } from "src/queries/useTogglePause"; -import { useTrigger } from "src/queries/useTrigger"; import { DEFAULT_DATETIME_FORMAT } from "src/utils/datetimeUtils"; import ConfigForm from "../ConfigForm"; @@ -35,16 +34,17 @@ import { ErrorAlert, type ExpandedApiError } from "../ErrorAlert"; import { Checkbox } from "../ui/Checkbox"; import { RadioCardItem, RadioCardRoot } from "../ui/RadioCard"; import TriggerDAGAdvancedOptions from "./TriggerDAGAdvancedOptions"; -import type { DagRunTriggerParams } from "./types"; -import { dataIntervalModeOptions } from "./types"; +import { dataIntervalModeOptions, type DagRunTriggerParams } from "./types"; type TriggerDAGFormProps = { readonly dagDisplayName: string; readonly dagId: string; + readonly error?: unknown; readonly hasSchedule: boolean; readonly isPartitioned: boolean; readonly isPaused: boolean; - readonly onClose: () => void; + readonly isPending?: boolean; + readonly onSubmitTrigger?: (params: DagRunTriggerParams) => void; readonly open: boolean; readonly prefillConfig?: | { @@ -58,10 +58,12 @@ type TriggerDAGFormProps = { const TriggerDAGForm = ({ dagDisplayName, dagId, + error, hasSchedule, isPartitioned, isPaused, - onClose, + isPending = false, + onSubmitTrigger, open, prefillConfig, }: TriggerDAGFormProps) => { @@ -69,10 +71,8 @@ const TriggerDAGForm = ({ const [errors, setErrors] = useState<{ conf?: string; date?: unknown }>({}); const [formError, setFormError] = useState(false); const initialParamsDict = useDagParams(dagId, open); - const { error: errorTrigger, isPending, triggerDagRun } = useTrigger({ dagId, onSuccessConfirm: onClose }); const { conf, initialParamDict, setConf, setInitialParamDict } = useParamStore(); const [unpause, setUnpause] = useState(true); - const { mutate: togglePause } = useTogglePause({ dagId }); const { control, handleSubmit, reset, watch } = useForm({ @@ -106,7 +106,6 @@ const TriggerDAGForm = ({ note: "", partitionKey: undefined, }); - // Also update the param store to keep it in sync. // Wait until we have the initial params so section ordering stays consistent. if (confString && Object.keys(initialParamsDict.paramsDict).length > 0) { @@ -130,16 +129,11 @@ const TriggerDAGForm = ({ // Automatically reset form when conf is fetched (only if no prefillConfig) useEffect(() => { if (conf && !prefillConfig && open) { - reset((prevValues) => ({ - ...prevValues, - conf, - })); + reset((prevValues) => ({ ...prevValues, conf })); } }, [conf, prefillConfig, open, reset]); - const resetDateError = () => { - setErrors((prev) => ({ ...prev, date: undefined })); - }; + const resetDateError = () => setErrors((prev) => ({ ...prev, date: undefined })); const dataIntervalMode = watch("dataIntervalMode"); const dataIntervalStart = watch("dataIntervalStart"); @@ -150,19 +144,14 @@ const TriggerDAGForm = ({ (noDataInterval || dayjs(dataIntervalStart).isAfter(dayjs(dataIntervalEnd))); const onSubmit = (data: DagRunTriggerParams) => { if (unpause && isPaused) { - togglePause({ - dagId, - requestBody: { - is_paused: false, - }, - }); + togglePause({ dagId, requestBody: { is_paused: false } }); } - triggerDagRun(data); + onSubmitTrigger?.(data); }; return ( <> - + {isPartitioned ? undefined : ( <> @@ -272,7 +261,7 @@ const TriggerDAGForm = ({ formError || isPending || dataIntervalInvalid || - (Boolean(errorTrigger) && (errorTrigger as ExpandedApiError).status === 403) + (Boolean(error) && (error as ExpandedApiError).status === 403) } onClick={() => void handleSubmit(onSubmit)()} > diff --git a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx index 840d15626f974..cfa17d6865eed 100644 --- a/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx +++ b/airflow-core/src/airflow/ui/src/components/TriggerDag/TriggerDAGModal.tsx @@ -137,7 +137,6 @@ const TriggerDAGModal: React.FC = ({ hasSchedule={hasSchedule} isPartitioned={isPartitioned} isPaused={isPaused} - onClose={onClose} open={open} prefillConfig={prefillConfig} /> diff --git a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx index 472981325bf20..6b9d00afad189 100644 --- a/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Asset/CreateAssetEventModal.tsx @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { Button, Field, Heading, HStack, VStack, Text } from "@chakra-ui/react"; +import { Button, Field, Heading, HStack, Text, VStack } from "@chakra-ui/react"; import { useQueryClient } from "@tanstack/react-query"; import { useState } from "react"; import { useTranslation } from "react-i18next"; @@ -36,14 +36,15 @@ import type { AssetEventResponse, AssetResponse, DAGRunResponse, + MaterializeAssetBody, EdgeResponse, } from "openapi/requests/types.gen"; import { ErrorAlert } from "src/components/ErrorAlert"; import { JsonEditor } from "src/components/JsonEditor"; +import TriggerDAGForm from "src/components/TriggerDag/TriggerDAGForm"; +import type { DagRunTriggerParams } from "src/components/TriggerDag/types"; import { Dialog, toaster } from "src/components/ui"; -import { Checkbox } from "src/components/ui/Checkbox"; import { RadioCardItem, RadioCardRoot } from "src/components/ui/RadioCard"; -import { useTogglePause } from "src/queries/useTogglePause"; type Props = { readonly asset: AssetResponse; @@ -55,7 +56,6 @@ export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => { const { t: translate } = useTranslation(["assets", "components"]); const [eventType, setEventType] = useState("manual"); const [extraError, setExtraError] = useState(); - const [unpause, setUnpause] = useState(true); const [extra, setExtra] = useState("{}"); const [partitionKey, setPartitionKey] = useState(undefined); const queryClient = useQueryClient(); @@ -93,6 +93,7 @@ export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => { const onSuccess = async (response: AssetEventResponse | DAGRunResponse) => { setExtra("{}"); setExtraError(undefined); + setPartitionKey(undefined); onClose(); let queryKeys = [UseAssetServiceGetAssetEventsKeyFn({ assetId: asset.id }, [{ assetId: asset.id }])]; @@ -127,11 +128,9 @@ export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => { enabled: Boolean(upstreamDagId), }); - const { mutate: togglePause } = useTogglePause({ dagId: dag?.dag_id ?? upstreamDagId ?? "" }); - const { error: manualError, - isPending, + isPending: isManualPending, mutate: createAssetEvent, } = useAssetServiceCreateAssetEvent({ onSuccess }); const { @@ -142,28 +141,41 @@ export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => { onSuccess, }); - const handleSubmit = () => { - if (eventType === "materialize") { - if (unpause && dag?.is_paused) { - togglePause({ - dagId: dag.dag_id, - requestBody: { - is_paused: false, - }, - }); - } - materializeAsset({ assetId: asset.id }); - } else { - createAssetEvent({ - requestBody: { - asset_id: asset.id, - extra: JSON.parse(extra) as Record, - partition_key: partitionKey ?? null, - }, - }); - } + const handleMaterializeSubmit = (dagRunRequestBody: DagRunTriggerParams) => { + const parsedConfig = JSON.parse(dagRunRequestBody.conf) as Record; + const logicalDate = dagRunRequestBody.logicalDate ? new Date(dagRunRequestBody.logicalDate) : undefined; + const dataIntervalStart = dagRunRequestBody.dataIntervalStart + ? new Date(dagRunRequestBody.dataIntervalStart) + : undefined; + const dataIntervalEnd = dagRunRequestBody.dataIntervalEnd + ? new Date(dagRunRequestBody.dataIntervalEnd) + : undefined; + + const requestBody: MaterializeAssetBody = { + conf: parsedConfig, + dag_run_id: dagRunRequestBody.dagRunId === "" ? undefined : dagRunRequestBody.dagRunId, + data_interval_end: dataIntervalEnd?.toISOString() ?? null, + data_interval_start: dataIntervalStart?.toISOString() ?? null, + logical_date: logicalDate?.toISOString() ?? null, + note: dagRunRequestBody.note === "" ? undefined : dagRunRequestBody.note, + partition_key: dagRunRequestBody.partitionKey ?? null, + }; + + materializeAsset({ + assetId: asset.id, + requestBody, + }); }; + const handleManualSubmit = () => + createAssetEvent({ + requestBody: { + asset_id: asset.id, + extra: JSON.parse(extra) as Record, + partition_key: partitionKey ?? null, + }, + }); + return ( @@ -203,37 +215,48 @@ export const CreateAssetEventModal = ({ asset, onClose, open }: Props) => { /> + {eventType === "manual" ? ( + + {translate("createEvent.manual.extra")} + + {extraError} + + ) : undefined} {eventType === "manual" ? ( <> - - {translate("createEvent.manual.extra")} - - {extraError} - {translate("common:dagRun.partitionKey")} - {extraError} + ) : undefined} - {eventType === "materialize" && dag?.is_paused ? ( - setUnpause(!unpause)}> - {translate("createEvent.materialize.unpauseDag", { dagName: dag.dag_display_name })} - + {eventType === "materialize" && dag !== undefined && upstreamDagId !== undefined ? ( + ) : undefined} - - - - + {eventType === "manual" ? ( + + + + ) : undefined} ); diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py index 7912e995d8ea6..749d94ac87792 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py @@ -1408,6 +1408,59 @@ def test_should_respond_200(self, test_client): "note": None, } + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_should_respond_200_with_partition_key(self, test_client): + partition_key = "2026-03-23" + response = test_client.post("/assets/1/materialize", json={"partition_key": partition_key}) + assert response.status_code == 200 + assert response.json()["partition_key"] == partition_key + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_should_respond_200_with_trigger_fields(self, test_client): + payload = { + "conf": {"foo": "bar"}, + "dag_run_id": "asset_materialization_run_1", + "data_interval_end": "2026-03-24T00:00:00Z", + "data_interval_start": "2026-03-23T00:00:00Z", + "logical_date": "2026-03-23T00:00:00Z", + "note": "created from asset page", + "partition_key": "2026-03-23", + } + response = test_client.post("/assets/1/materialize", json=payload) + + assert response.status_code == 200 + assert response.json()["conf"] == {"foo": "bar"} + assert response.json()["dag_run_id"] == "asset_materialization_run_1" + assert response.json()["data_interval_start"] == "2026-03-23T00:00:00Z" + assert response.json()["data_interval_end"] == "2026-03-24T00:00:00Z" + assert response.json()["logical_date"] == "2026-03-23T00:00:00Z" + assert response.json()["note"] == "created from asset page" + assert response.json()["partition_key"] == "2026-03-23" + assert response.json()["run_type"] == "asset_materialization" + + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_should_respond_200_with_trigger_fields_without_dag_run_id(self, test_client): + payload = { + "conf": {"foo": "bar"}, + # "dag_run_id": "asset_materialization_run_1", + "data_interval_end": "2026-03-24T00:00:00Z", + "data_interval_start": "2026-03-23T00:00:00Z", + "logical_date": "2026-03-23T00:00:00Z", + "note": "created from asset page", + "partition_key": "2026-03-23", + } + response = test_client.post("/assets/1/materialize", json=payload) + + assert response.status_code == 200 + assert response.json()["conf"] == {"foo": "bar"} + assert response.json()["dag_run_id"].startswith("asset_materialization__") + assert response.json()["data_interval_start"] == "2026-03-23T00:00:00Z" + assert response.json()["data_interval_end"] == "2026-03-24T00:00:00Z" + assert response.json()["logical_date"] == "2026-03-23T00:00:00Z" + assert response.json()["note"] == "created from asset page" + assert response.json()["partition_key"] == "2026-03-23" + assert response.json()["run_type"] == "asset_materialization" + def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.post("/assets/2/materialize") assert response.status_code == 401 diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 1fe3871370357..17aa78431a0fb 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -621,6 +621,24 @@ class LastAssetEventResponse(BaseModel): timestamp: Annotated[datetime | None, Field(title="Timestamp")] = None +class MaterializeAssetBody(BaseModel): + """ + Materialize asset request. + """ + + model_config = ConfigDict( + extra="forbid", + ) + dag_run_id: Annotated[str | None, Field(title="Dag Run Id")] = None + data_interval_start: Annotated[datetime | None, Field(title="Data Interval Start")] = None + data_interval_end: Annotated[datetime | None, Field(title="Data Interval End")] = None + logical_date: Annotated[datetime | None, Field(title="Logical Date")] = None + run_after: Annotated[datetime | None, Field(title="Run After")] = None + conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None + note: Annotated[str | None, Field(title="Note")] = None + partition_key: Annotated[str | None, Field(title="Partition Key")] = None + + class PluginImportErrorResponse(BaseModel): """ Plugin Import Error serializer for responses.