Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@

from collections.abc import Iterable
from datetime import datetime
from typing import TYPE_CHECKING

from pydantic import AliasPath, ConfigDict, Field, JsonValue, NonNegativeInt, field_validator
from pydantic import AliasPath, AwareDatetime, ConfigDict, Field, JsonValue, NonNegativeInt, field_validator

from airflow._shared.secrets_masker import redact
from airflow._shared.timezones import timezone
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
from airflow.api_fastapi.core_api.datamodels.dag_run import TriggerDAGRunPostBody
from airflow.utils.types import DagRunType

if TYPE_CHECKING:
from airflow.serialization.definitions.dag import SerializedDAG


class DagScheduleAssetReference(StrictBaseModel):
Expand Down Expand Up @@ -185,3 +192,21 @@ def set_from_rest_api(cls, v: dict) -> dict:
return v

model_config = ConfigDict(extra="forbid")


class MaterializeAssetBody(TriggerDAGRunPostBody):
"""Materialize asset request."""

logical_date: AwareDatetime | None = None
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already had logical_date field for TriggerDAGRunPostBody, do we still need to add it again here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to set the default as None


def validate_context(self, dag: SerializedDAG) -> dict:
params = super().validate_context(dag)
if self.dag_run_id is None:
params["run_id"] = dag.timetable.generate_run_id(
run_type=DagRunType.ASSET_MATERIALIZATION,
run_after=timezone.coerce_datetime(params["run_after"]),
data_interval=params["data_interval"],
)
return params

model_config = ConfigDict(extra="forbid")
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,14 @@ paths:
schema:
type: integer
title: Asset Id
requestBody:
content:
application/json:
schema:
anyOf:
- $ref: '#/components/schemas/MaterializeAssetBody'
- type: 'null'
title: Body
responses:
'200':
description: Successful Response
Expand Down Expand Up @@ -11949,6 +11957,57 @@ components:
type: object
title: LastAssetEventResponse
description: Last asset event response serializer.
MaterializeAssetBody:
properties:
dag_run_id:
anyOf:
- type: string
- type: 'null'
title: Dag Run Id
data_interval_start:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval Start
data_interval_end:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Data Interval End
logical_date:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date
run_after:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Run After
conf:
anyOf:
- additionalProperties: true
type: object
- type: 'null'
title: Conf
note:
anyOf:
- type: string
- type: 'null'
title: Note
partition_key:
anyOf:
- type: string
- type: 'null'
title: Partition Key
additionalProperties: false
type: object
title: MaterializeAssetBody
description: Materialize asset request.
PatchTaskInstanceBody:
properties:
new_state:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
AssetEventResponse,
AssetResponse,
CreateAssetEventsBody,
MaterializeAssetBody,
QueuedEventCollectionResponse,
QueuedEventResponse,
)
Expand Down Expand Up @@ -387,6 +388,7 @@ def materialize_asset(
dag_bag: DagBagDep,
user: GetUserDep,
session: SessionDep,
body: MaterializeAssetBody | None = None,
) -> DAGRunResponse:
"""Materialize an asset by triggering a DAG run that produces it."""
dag_id_it = iter(
Expand Down Expand Up @@ -425,17 +427,19 @@ def materialize_asset(
f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs",
)

params = (body or MaterializeAssetBody()).validate_context(dag)
return dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.ASSET_MATERIALIZATION,
run_after=(run_after := timezone.coerce_datetime(timezone.utcnow())),
data_interval=None,
),
run_after=run_after,
run_id=params["run_id"],
logical_date=params["logical_date"],
data_interval=params["data_interval"],
run_after=params["run_after"],
conf=params["conf"],
run_type=DagRunType.ASSET_MATERIALIZATION,
triggered_by=DagRunTriggeredByType.REST_API,
triggering_user_name=user.get_name(),
state=DagRunState.QUEUED,
partition_key=params["partition_key"],
note=params["note"],
session=session,
)

Expand Down
7 changes: 5 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from "@tanstack/react-query";
import { AssetService, AuthLinksService, BackfillService, CalendarService, ConfigService, ConnectionService, DagParsingService, DagRunService, DagService, DagSourceService, DagStatsService, DagVersionService, DagWarningService, DashboardService, DeadlinesService, DependenciesService, EventLogService, ExperimentalService, ExtraLinksService, GanttService, GridService, ImportErrorService, JobService, LoginService, MonitorService, PartitionedDagRunService, PluginService, PoolService, ProviderService, StructureService, TaskInstanceService, TaskService, TeamsService, VariableService, VersionService, XcomService } from "../requests/services.gen";
import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen";
import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_, BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_, ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState, DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody, PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody } from "../requests/types.gen";
import * as Common from "./common";
/**
* Get Assets
Expand Down Expand Up @@ -1765,14 +1765,17 @@ export const useAssetServiceCreateAssetEvent = <TData = Common.AssetServiceCreat
* Materialize an asset by triggering a DAG run that produces it.
* @param data The data for the request.
* @param data.assetId
* @param data.requestBody
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceMaterializeAsset = <TData = Common.AssetServiceMaterializeAssetMutationResult, TError = unknown, TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, {
assetId: number;
requestBody?: MaterializeAssetBody;
}, TContext>, "mutationFn">) => useMutation<TData, TError, {
assetId: number;
}, TContext>({ mutationFn: ({ assetId }) => AssetService.materializeAsset({ assetId }) as unknown as Promise<TData>, ...options });
requestBody?: MaterializeAssetBody;
}, TContext>({ mutationFn: ({ assetId, requestBody }) => AssetService.materializeAsset({ assetId, requestBody }) as unknown as Promise<TData>, ...options });
/**
* Create Backfill
* @param data The data for the request.
Expand Down
102 changes: 102 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4473,6 +4473,108 @@ export const $LastAssetEventResponse = {
description: 'Last asset event response serializer.'
} as const;

export const $MaterializeAssetBody = {
properties: {
dag_run_id: {
anyOf: [
{
type: 'string'
},
{
type: 'null'
}
],
title: 'Dag Run Id'
},
data_interval_start: {
anyOf: [
{
type: 'string',
format: 'date-time'
},
{
type: 'null'
}
],
title: 'Data Interval Start'
},
data_interval_end: {
anyOf: [
{
type: 'string',
format: 'date-time'
},
{
type: 'null'
}
],
title: 'Data Interval End'
},
logical_date: {
anyOf: [
{
type: 'string',
format: 'date-time'
},
{
type: 'null'
}
],
title: 'Logical Date'
},
run_after: {
anyOf: [
{
type: 'string',
format: 'date-time'
},
{
type: 'null'
}
],
title: 'Run After'
},
conf: {
anyOf: [
{
additionalProperties: true,
type: 'object'
},
{
type: 'null'
}
],
title: 'Conf'
},
note: {
anyOf: [
{
type: 'string'
},
{
type: 'null'
}
],
title: 'Note'
},
partition_key: {
anyOf: [
{
type: 'string'
},
{
type: 'null'
}
],
title: 'Partition Key'
}
},
additionalProperties: false,
type: 'object',
title: 'MaterializeAssetBody',
description: 'Materialize asset request.'
} as const;

export const $PatchTaskInstanceBody = {
properties: {
new_state: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ export class AssetService {
* Materialize an asset by triggering a DAG run that produces it.
* @param data The data for the request.
* @param data.assetId
* @param data.requestBody
* @returns DAGRunResponse Successful Response
* @throws ApiError
*/
Expand All @@ -182,6 +183,8 @@ export class AssetService {
path: {
asset_id: data.assetId
},
body: data.requestBody,
mediaType: 'application/json',
errors: {
400: 'Bad Request',
401: 'Unauthorized',
Expand Down
17 changes: 17 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,22 @@ export type LastAssetEventResponse = {
timestamp?: string | null;
};

/**
* Materialize asset request.
*/
export type MaterializeAssetBody = {
dag_run_id?: string | null;
data_interval_start?: string | null;
data_interval_end?: string | null;
logical_date?: string | null;
run_after?: string | null;
conf?: {
[key: string]: unknown;
} | null;
note?: string | null;
partition_key?: string | null;
};

/**
* Request body for Clear Task Instances endpoint.
*/
Expand Down Expand Up @@ -2307,6 +2323,7 @@ export type CreateAssetEventResponse = AssetEventResponse;

export type MaterializeAssetData = {
assetId: number;
requestBody?: MaterializeAssetBody | null;
};

export type MaterializeAssetResponse = DAGRunResponse;
Expand Down
Loading
Loading