From 31f4624229b9f444795a4711037a58d52804997e Mon Sep 17 00:00:00 2001 From: Ei-Sandi Date: Tue, 23 Jun 2026 14:00:07 +0000 Subject: [PATCH] Add metrics for how often a dag gets serialised. --- .../src/airflow/models/serialized_dag.py | 3 + .../tests/unit/models/test_serialized_dag.py | 80 ++++++++++++++++++- .../metrics/metrics_template.yaml | 7 ++ 3 files changed, 87 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index ce0333653114e..bd8bb856a932b 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -32,6 +32,7 @@ from sqlalchemy.orm import Mapped, backref, foreign, mapped_column, relationship from sqlalchemy.sql.expression import func, literal +from airflow._shared.observability.metrics import stats from airflow._shared.timezones import timezone from airflow.configuration import conf from airflow.models.asset import ( @@ -762,6 +763,7 @@ def write_dag( session.merge(dag_version) # Update the latest DagCode DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session) + stats.incr("dag.serialization", tags={"dag_id": dag.dag_id, "bundle_name": bundle_name}) return True dagv = DagVersion.write_dag( @@ -777,6 +779,7 @@ def write_dag( cls._create_deadline_alert_records(new_serialized_dag, deadline_uuid_mapping) log.debug("DAG: %s written to the DB", dag.dag_id) DagCode.write_code(dagv, dag.fileloc, session=session) + stats.incr("dag.serialization", tags={"dag_id": dag.dag_id, "bundle_name": bundle_name}) return True @classmethod diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index a1ad63de5d688..8df53f5cfdcd1 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -28,6 +28,7 @@ from sqlalchemy import delete, func, select, update import airflow.example_dags as example_dags_module +from airflow._shared.observability.metrics.base_stats_logger import StatsLogger from airflow.dag_processing.dagbag import DagBag from airflow.models.asset import AssetActive, AssetAliasModel, AssetModel from airflow.models.dag import DagModel @@ -90,6 +91,9 @@ def make_example_dags(module): class TestSerializedDagModel: """Unit tests for SerializedDagModel.""" + SERIALIZED_DAG_STATS = "airflow.models.serialized_dag.stats" + TEST_BUNDLE_NAME = "testing" + @pytest.fixture( autouse=True, params=[ @@ -161,7 +165,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self, testing_dag_bundle): example_params_trigger_ui = example_dags.get("example_params_trigger_ui") dag_updated = SDM.write_dag( dag=LazyDeserializedDAG.from_dag(example_params_trigger_ui), - bundle_name="testing", + bundle_name=self.TEST_BUNDLE_NAME, ) assert dag_updated is True @@ -178,7 +182,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self, testing_dag_bundle): # column is not updated dag_updated = SDM.write_dag( dag=LazyDeserializedDAG.from_dag(example_params_trigger_ui), - bundle_name="testing", + bundle_name=self.TEST_BUNDLE_NAME, ) s_dag_1 = SDM.get(example_params_trigger_ui.dag_id) @@ -192,7 +196,7 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self, testing_dag_bundle): dag_updated = SDM.write_dag( dag=LazyDeserializedDAG.from_dag(example_params_trigger_ui), - bundle_name="testing", + bundle_name=self.TEST_BUNDLE_NAME, ) s_dag_2 = SDM.get(example_params_trigger_ui.dag_id) @@ -201,6 +205,76 @@ def test_serialized_dag_is_updated_if_dag_is_changed(self, testing_dag_bundle): assert s_dag_2.data["dag"]["tags"] == ["example", "new_tag", "params"] assert dag_updated is True + def test_serialization_metric_incremented_on_new_write(self, testing_dag_bundle): + """A brand new serialized DAG write emits the ``dag.serialization`` metric.""" + dag = make_example_dags(example_dags_module).get("example_params_trigger_ui") + with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats: + assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=self.TEST_BUNDLE_NAME) is True + + mock_stats.incr.assert_called_once_with( + "dag.serialization", + tags={"dag_id": dag.dag_id, "bundle_name": self.TEST_BUNDLE_NAME}, + ) + + def test_serialization_metric_not_incremented_when_unchanged(self, testing_dag_bundle): + """Re-writing an unchanged DAG must not emit the ``dag.serialization`` metric.""" + dag = make_example_dags(example_dags_module).get("example_params_trigger_ui") + assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=self.TEST_BUNDLE_NAME) is True + + with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats: + assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=self.TEST_BUNDLE_NAME) is False + + mock_stats.incr.assert_not_called() + + def test_serialization_metric_incremented_on_inplace_update(self, dag_maker, session): + """Updating a DAG version in place (no dag runs) emits the metric once.""" + with dag_maker("metric_dag") as dag: + PythonOperator(task_id="task1", python_callable=lambda: None) + # Change the DAG so the hash differs; with no dag runs this updates in place. + PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag) + + with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats: + assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") is True + + assert session.scalar(select(func.count()).select_from(DagVersion)) == 1 + mock_stats.incr.assert_called_once_with( + "dag.serialization", + tags={"dag_id": "metric_dag", "bundle_name": "dag_maker"}, + ) + + def test_serialization_metric_incremented_on_new_version(self, dag_maker, session): + """Writing a new DAG version (existing run) emits the metric once.""" + with dag_maker("metric_dag") as dag: + PythonOperator(task_id="task1", python_callable=lambda: None) + dag_maker.create_dagrun(run_id="run1", logical_date=pendulum.datetime(2025, 1, 1)) + PythonOperator(task_id="task2", python_callable=lambda: None, dag=dag) + + with mock.patch(self.SERIALIZED_DAG_STATS) as mock_stats: + assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") is True + + assert session.scalar(select(func.count()).select_from(DagVersion)) == 2 + mock_stats.incr.assert_called_once_with( + "dag.serialization", + tags={"dag_id": "metric_dag", "bundle_name": "dag_maker"}, + ) + + @mock.patch("airflow._shared.observability.metrics.stats._export_legacy_names", True) + @mock.patch("airflow._shared.observability.metrics.stats._get_backend") + def test_serialization_metric_exports_new_and_legacy_names(self, mock_get_backend, testing_dag_bundle): + """Serializing a DAG emits both the modern ``dag.serialization`` metric and its legacy name.""" + mock_backend = mock.MagicMock(spec=StatsLogger) + mock_get_backend.return_value = mock_backend + dag = make_example_dags(example_dags_module).get("example_params_trigger_ui") + + assert SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=self.TEST_BUNDLE_NAME) is True + + mock_backend.incr.assert_has_calls( + [ + mock.call(f"dag.serialization.{dag.dag_id}.{self.TEST_BUNDLE_NAME}"), + mock.call("dag.serialization", tags={"dag_id": dag.dag_id, "bundle_name": self.TEST_BUNDLE_NAME}), + ] + ) + def test_read_dags(self): """DAGs can be read from database.""" example_dags = self._write_example_dags() diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index fa26e0a4c481f..7a75dc810716e 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -226,6 +226,13 @@ metrics: legacy_name: "-" name_variables: [] + - name: "dag.serialization" + description: "Number of times a Dag was serialized and written to the metadata DB. + Metric with dag_id and bundle_name tagging." + type: "counter" + legacy_name: "dag.serialization.{dag_id}.{bundle_name}" + name_variables: ["dag_id", "bundle_name"] + - name: "celery.task_timeout_error" description: "Number of ``AirflowTaskTimeout`` errors raised when publishing Task to Celery Broker." type: "counter"