diff --git a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py
index d2a0b652ca69..e5d30da1e4c9 100644
--- a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py
+++ b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo.py
@@ -38,6 +38,25 @@ def get_transformed_categorical_column_name(column_name_or_id):
   return column_name + '_id'
 
 
+def fill_in_missing(feature, default_value):
+  """Densifies a rank 2 SparseTensor, filling rows that have no value.
+
+  Args:
+    feature: A rank 2 SparseTensor with at most one value per row.
+    default_value: Value used for rows that have no entry in `feature`.
+
+  Returns:
+    A rank 1 Tensor (one element per row) with missing entries filled in.
+  """
+  feature = tf.sparse.to_dense(
+      tf.SparseTensor(
+          feature.indices,
+          feature.values,
+          tf.stack([feature.dense_shape[0], 1])),  # keep rows, force width 1
+      default_value=default_value)
+  return tf.squeeze(feature, axis=1)  # [rows, 1] -> [rows]
+
+
 _INTEGER_COLUMN_NAMES = [
     'int-feature-{}'.format(column_idx) for column_idx in range(1, 14)
 ]
@@ -132,23 +151,12 @@ def preprocessing_fn(inputs):
     result = {'clicked': inputs['clicked']}
     for name in _INTEGER_COLUMN_NAMES:
       feature = inputs[name]
-      # TODO(https://github.com/apache/beam/issues/24902):
-      # Replace this boilerplate with a helper function.
-      # This is a SparseTensor because it is optional. Here we fill in a
-      # default value when it is missing.
-      feature = tft.sparse_tensor_to_dense_with_shape(
-          feature, [None, 1], default_value=-1)
-      # Reshaping from a batch of vectors of size 1 to a batch of scalars and
-      # adding a bucketized version.
-      feature = tf.squeeze(feature, axis=1)
+      feature = fill_in_missing(feature, -1)  # -1 marks a missing value
       result[name] = feature
       result[name + '_bucketized'] = tft.bucketize(feature, _NUM_BUCKETS)
     for name in _CATEGORICAL_COLUMN_NAMES:
       feature = inputs[name]
-      # Similar to for integer columns, but use '' as default.
-      feature = tft.sparse_tensor_to_dense_with_shape(
-          feature, [None, 1], default_value='')
-      feature = tf.squeeze(feature, axis=1)
+      feature = fill_in_missing(feature, '')  # '' marks a missing value
       result[get_transformed_categorical_column_name(
           name)] = tft.compute_and_apply_vocabulary(
               feature, frequency_threshold=frequency_threshold)
diff --git a/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py
new file mode 100644
index 000000000000..ce2dd2593d0c
--- /dev/null
+++ b/sdks/python/apache_beam/testing/benchmarks/cloudml/criteo_tft/criteo_test.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+try:
+  import tensorflow as tf
+
+  from apache_beam.testing.benchmarks.cloudml.criteo_tft import criteo
+except ImportError:  # tensorflow or the criteo module failed to import
+  raise unittest.SkipTest('Dependencies are not installed')
+
+
+class CriteoTest(tf.test.TestCase):
+  def test_fill_in_missing_int_feature(self):
+    feature = tf.SparseTensor(
+        indices=[[0, 0], [2, 0]],  # rows 0 and 2 are set; row 1 is missing
+        values=tf.constant([10, 30], dtype=tf.int64),
+        dense_shape=[3, 1])
+
+    result = criteo.fill_in_missing(feature, -1)
+
+    self.assertAllEqual(result, [10, -1, 30])
+    self.assertEqual(result.shape.rank, 1)
+
+  def test_fill_in_missing_int_feature_traces_with_dynamic_shape(self):
+    @tf.function(  # trace with both dense dimensions unknown
+        input_signature=[
+            tf.SparseTensorSpec(shape=[None, None], dtype=tf.int64)
+        ])
+    def fill_in_missing(feature):
+      return criteo.fill_in_missing(feature, -1)
+
+    feature = tf.SparseTensor(
+        indices=[[0, 0], [2, 0]],  # rows 0 and 2 are set; row 1 is missing
+        values=tf.constant([10, 30], dtype=tf.int64),
+        dense_shape=[3, 1])
+
+    result = fill_in_missing(feature)
+
+    self.assertAllEqual(result, [10, -1, 30])
+    self.assertEqual(result.shape.rank, 1)
+
+  def test_fill_in_missing_all_missing_int_feature(self):
+    feature = tf.SparseTensor(
+        indices=tf.zeros([0, 2], dtype=tf.int64),
+        values=tf.constant([], dtype=tf.int64),
+        dense_shape=[3, 0])  # three rows, none with a value
+
+    result = criteo.fill_in_missing(feature, -1)
+
+    self.assertAllEqual(result, [-1, -1, -1])
+    self.assertEqual(result.shape.rank, 1)
+
+  def test_fill_in_missing_string_feature(self):
+    feature = tf.SparseTensor(
+        indices=[[0, 0], [2, 0]],  # rows 0 and 2 are set; row 1 is missing
+        values=tf.constant(['a', 'c'], dtype=tf.string),
+        dense_shape=[3, 1])
+
+    result = criteo.fill_in_missing(feature, '')
+
+    self.assertAllEqual(result, [b'a', b'', b'c'])  # tf.string -> bytes
+    self.assertEqual(result.shape.rank, 1)
+
+  def test_fill_in_missing_all_missing_string_feature(self):
+    feature = tf.SparseTensor(
+        indices=tf.zeros([0, 2], dtype=tf.int64),
+        values=tf.constant([], dtype=tf.string),
+        dense_shape=[3, 0])  # three rows, none with a value
+
+    result = criteo.fill_in_missing(feature, '')
+
+    self.assertAllEqual(result, [b'', b'', b''])
+    self.assertEqual(result.shape.rank, 1)
+
+
+if __name__ == '__main__':
+  unittest.main()