From fb0ed81fb06c203294e541ced7c16d1a98583257 Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Wed, 17 Jun 2026 18:48:42 -0500 Subject: [PATCH 1/3] Added Criteo TFT fill_in_missing helper --- .../benchmarks/cloudml/criteo_tft/criteo.py | 32 ++++---- .../cloudml/criteo_tft/criteo_test.py | 74 +++++++++++++++++++ 2 files changed, 93 insertions(+), 13 deletions(-) create mode 100644 sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py diff --git a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py index d2a0b652ca69..be15d481bb11 100644 --- a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py +++ b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py @@ -38,6 +38,23 @@ def get_transformed_categorical_column_name(column_name_or_id): return column_name + '_id' +def fill_in_missing(feature, default_value): + """Fills missing values in a rank 2 SparseTensor. + + Args: + feature: A rank 2 SparseTensor with at most one value per row. + default_value: The value to fill in for missing entries. + + Returns: + A rank 1 Tensor with missing entries filled in. + """ + feature = tf.sparse.to_dense( + tf.SparseTensor( + feature.indices, feature.values, [feature.dense_shape[0], 1]), + default_value=default_value) + return tf.squeeze(feature, axis=1) + + _INTEGER_COLUMN_NAMES = [ 'int-feature-{}'.format(column_idx) for column_idx in range(1, 14) ] @@ -132,23 +149,12 @@ def preprocessing_fn(inputs): result = {'clicked': inputs['clicked']} for name in _INTEGER_COLUMN_NAMES: feature = inputs[name] - # TODO(https://github.com/apache/beam/issues/24902): - # Replace this boilerplate with a helper function. - # This is a SparseTensor because it is optional. Here we fill in a - # default value when it is missing. - feature = tft.sparse_tensor_to_dense_with_shape( - feature, [None, 1], default_value=-1) - # Reshaping from a batch of vectors of size 1 to a batch of scalars and - # adding a bucketized version. - feature = tf.squeeze(feature, axis=1) + feature = fill_in_missing(feature, -1) result[name] = feature result[name + '_bucketized'] = tft.bucketize(feature, _NUM_BUCKETS) for name in _CATEGORICAL_COLUMN_NAMES: feature = inputs[name] - # Similar to for integer columns, but use '' as default. - feature = tft.sparse_tensor_to_dense_with_shape( - feature, [None, 1], default_value='') - feature = tf.squeeze(feature, axis=1) + feature = fill_in_missing(feature, '') result[get_transformed_categorical_column_name( name)] = tft.compute_and_apply_vocabulary( feature, frequency_threshold=frequency_threshold) diff --git a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py new file mode 100644 index 000000000000..cd79aebac6dd --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +try: + import tensorflow as tf + from apache_beam.testing.benchmarks.cloudml.criteo_tft import criteo +except ImportError: + raise unittest.SkipTest('Dependencies are not installed') + + +class CriteoTest(tf.test.TestCase): + def test_fill_in_missing_int_feature(self): + feature = tf.SparseTensor( + indices=[[0, 0], [2, 0]], + values=tf.constant([10, 30], dtype=tf.int64), + dense_shape=[3, 1]) + + result = criteo.fill_in_missing(feature, -1) + + self.assertAllEqual(result, [10, -1, 30]) + self.assertEqual(result.shape.rank, 1) + + def test_fill_in_missing_all_missing_int_feature(self): + feature = tf.SparseTensor( + indices=tf.zeros([0, 2], dtype=tf.int64), + values=tf.constant([], dtype=tf.int64), + dense_shape=[3, 0]) + + result = criteo.fill_in_missing(feature, -1) + + self.assertAllEqual(result, [-1, -1, -1]) + self.assertEqual(result.shape.rank, 1) + + def test_fill_in_missing_string_feature(self): + feature = tf.SparseTensor( + indices=[[0, 0], [2, 0]], + values=tf.constant(['a', 'c'], dtype=tf.string), + dense_shape=[3, 1]) + + result = criteo.fill_in_missing(feature, '') + + self.assertAllEqual(result, [b'a', b'', b'c']) + self.assertEqual(result.shape.rank, 1) + + def test_fill_in_missing_all_missing_string_feature(self): + feature = tf.SparseTensor( + indices=tf.zeros([0, 2], dtype=tf.int64), + values=tf.constant([], dtype=tf.string), + dense_shape=[3, 0]) + + result = criteo.fill_in_missing(feature, '') + + self.assertAllEqual(result, [b'', b'', b'']) + self.assertEqual(result.shape.rank, 1) + + +if __name__ == '__main__': + unittest.main() From ca3dcc24b4ac3f75dd102ecefaba4c4132e38668 Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Wed, 17 Jun 2026 21:49:57 -0500 Subject: [PATCH 2/3] Fix Criteo sparse tensor shape tracing --- .../benchmarks/cloudml/criteo_tft/criteo.py | 4 +++- .../cloudml/criteo_tft/criteo_test.py | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py index be15d481bb11..e5d30da1e4c9 100644 --- a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py +++ b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py @@ -50,7 +50,9 @@ def fill_in_missing(feature, default_value): """ feature = tf.sparse.to_dense( tf.SparseTensor( - feature.indices, feature.values, [feature.dense_shape[0], 1]), + feature.indices, + feature.values, + tf.stack([feature.dense_shape[0], 1])), default_value=default_value) return tf.squeeze(feature, axis=1) diff --git a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py index cd79aebac6dd..d7f399b598da 100644 --- a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py +++ b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py @@ -36,6 +36,24 @@ def test_fill_in_missing_int_feature(self): self.assertAllEqual(result, [10, -1, 30]) self.assertEqual(result.shape.rank, 1) + def test_fill_in_missing_int_feature_traces_with_dynamic_shape(self): + @tf.function( + input_signature=[ + tf.SparseTensorSpec(shape=[None, None], dtype=tf.int64) + ]) + def fill_in_missing(feature): + return criteo.fill_in_missing(feature, -1) + + feature = tf.SparseTensor( + indices=[[0, 0], [2, 0]], + values=tf.constant([10, 30], dtype=tf.int64), + dense_shape=[3, 1]) + + result = fill_in_missing(feature) + + self.assertAllEqual(result, [10, -1, 30]) + self.assertEqual(result.shape.rank, 1) + def test_fill_in_missing_all_missing_int_feature(self): feature = tf.SparseTensor( indices=tf.zeros([0, 2], dtype=tf.int64), From c2a2c68a868e0eaac6efbebfdd0a33b160b0a8a0 Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Thu, 18 Jun 2026 07:57:39 -0500 Subject: [PATCH 3/3] Lint fixed --- .../testing/benchmarks/cloudml/criteo_tft/criteo_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py index d7f399b598da..ce2dd2593d0c 100644 --- a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py +++ b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py @@ -19,6 +19,7 @@ try: import tensorflow as tf + from apache_beam.testing.benchmarks.cloudml.criteo_tft import criteo except ImportError: raise unittest.SkipTest('Dependencies are not installed')