Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from sqlalchemy.orm import Mapped, backref, foreign, mapped_column, relationship
from sqlalchemy.sql.expression import func, literal

from airflow._shared.observability.metrics import stats
from airflow._shared.timezones import timezone
from airflow.configuration import conf
from airflow.models.asset import (
Expand Down Expand Up @@ -762,6 +763,7 @@ def write_dag(
session.merge(dag_version)
# Update the latest DagCode
DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session)
stats.incr("dag.serialization", tags={"dag_id": dag.dag_id, "bundle_name": bundle_name})
return True

dagv = DagVersion.write_dag(
Expand All @@ -777,6 +779,7 @@ def write_dag(
cls._create_deadline_alert_records(new_serialized_dag, deadline_uuid_mapping)
log.debug("DAG: %s written to the DB", dag.dag_id)
DagCode.write_code(dagv, dag.fileloc, session=session)
stats.incr("dag.serialization", tags={"dag_id": dag.dag_id, "bundle_name": bundle_name})
return True

@classmethod
Expand Down
73 changes: 73 additions & 0 deletions airflow-core/tests/unit/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from sqlalchemy import delete, func, select, update

import airflow.example_dags as example_dags_module
from airflow._shared.observability.metrics.base_stats_logger import StatsLogger
from airflow.dag_processing.dagbag import DagBag
from airflow.models.asset import AssetActive, AssetAliasModel, AssetModel
from airflow.models.dag import DagModel
Expand Down Expand Up @@ -90,6 +91,8 @@ def make_example_dags(module):
class TestSerializedDagModel:
"""Unit tests for SerializedDagModel."""

SERIALIZED_DAG_STATS = "airflow.models.serialized_dag.stats"

@pytest.fixture(
autouse=True,
params=[
Expand Down Expand Up @@ -201,6 +204,76 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self, testing_dag_bundle):
assert s_dag_2.data["dag"]["tags"] == ["example", "new_tag", "params"]
assert dag_updated is True

Comment thread
Ei-Sandi marked this conversation as resolved.
def test_serialization_metric_incremented_on_new_write(self, testing_dag_bundle):
"""A brand new serialized DAG write emits the ``dag.serialization`` metric."""
dag = make_example_dags(example_dags_module).get("example_params_trigger_ui")
with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="testing") is True

mock_stats.incr.assert_called_once_with(
"dag.serialization",
tags={"dag_id": dag.dag_id, "bundle_name": "testing"},
)

def test_serialization_metric_not_incremented_when_unchanged(self, testing_dag_bundle):
"""Re-writing an unchanged DAG must not emit the ``dag.serialization`` metric."""
dag = make_example_dags(example_dags_module).get("example_params_trigger_ui")
assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="testing") is True

with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="testing") is False

mock_stats.incr.assert_not_called()

def test_serialization_metric_incremented_on_inplace_update(self, dag_maker, session):
"""Updating a DAG version in place (no dag runs) emits the metric once."""
with dag_maker("metric_dag") as dag:
PythonOperator(task_id="task1", python_callable=lambda: None)
# Change the DAG so the hash differs; with no dag runs this updates in place.
PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)

with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") is True

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Half of the tests are setting bundle_name as testing and the other half as dag_maker. This isn't an issue but it's a bit confusing because someone might think that the context changes, but I don't think that's the case.


assert session.scalar(select(func.count()).select_from(DagVersion)) == 1
mock_stats.incr.assert_called_once_with(
"dag.serialization",
tags={"dag_id": "metric_dag", "bundle_name": "dag_maker"},
)

def test_serialization_metric_incremented_on_new_version(self, dag_maker, session):
"""Writing a new DAG version (existing run) emits the metric once."""
with dag_maker("metric_dag") as dag:
PythonOperator(task_id="task1", python_callable=lambda: None)
dag_maker.create_dagrun(run_id="run1", logical_date=pendulum.datetime(2025, 1, 1))
PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag)

with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats:
assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") is True

assert session.scalar(select(func.count()).select_from(DagVersion)) == 2
mock_stats.incr.assert_called_once_with(
"dag.serialization",
tags={"dag_id": "metric_dag", "bundle_name": "dag_maker"},
)

@mock.patch("airflow._shared.observability.metrics.stats._export_legacy_names", True)
@mock.patch("airflow._shared.observability.metrics.stats._get_backend")
def test_serialization_metric_exports_new_and_legacy_names(self, mock_get_backend, testing_dag_bundle):
"""Serializing a DAG emits both the modern ``dag.serialization`` metric and its legacy name."""
mock_backend = mock.MagicMock(spec=StatsLogger)
mock_get_backend.return_value = mock_backend
dag = make_example_dags(example_dags_module).get("example_params_trigger_ui")

assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="testing") is True

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the bundle_name is also used at least twice for each test and it could be a common variable.


mock_backend.incr.assert_has_calls(
[
mock.call(f"dag.serialization.{dag.dag_id}.testing"),
mock.call("dag.serialization", tags={"dag_id": dag.dag_id, "bundle_name": "testing"}),
]
)

def test_read_dags(self):
"""DAGs can be read from database."""
example_dags = self._write_example_dags()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ metrics:
legacy_name: "-"
name_variables: []

- name: "dag.serialization"
description: "Number of times a Dag was serialized and written to the metadata DB.
Metric with dag_id and bundle_name tagging."
type: "counter"
legacy_name: "dag.serialization.{dag_id}.{bundle_name}"
name_variables: ["dag_id", "bundle_name"]

- name: "celery.task_timeout_error"
description: "Number of ``AirflowTaskTimeout`` errors raised when publishing Task to Celery Broker."
type: "counter"
Expand Down
Loading