easy_rec/python/compat/feature_column/sequence_feature_column.py (236 lines of code) (raw):

# -*- encoding:utf-8 -*- # Copyright 2018 The TensorFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """This API defines FeatureColumn for sequential input. NOTE: This API is a work in progress and will likely be changing frequently. """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import collections from tensorflow.python.framework import dtypes from tensorflow.python.framework import ops from tensorflow.python.framework import tensor_shape from tensorflow.python.ops import array_ops from tensorflow.python.ops import check_ops from tensorflow.python.ops import math_ops from tensorflow.python.ops import parsing_ops from tensorflow.python.ops import sparse_ops from easy_rec.python.compat.feature_column import feature_column as fc_v1 from easy_rec.python.compat.feature_column import feature_column_v2 as fc from easy_rec.python.compat.feature_column import utils as fc_utils # pylint: disable=protected-access class SequenceFeatures(fc._BaseFeaturesLayer): """A layer for sequence input. All `feature_columns` must be sequence dense columns with the same `sequence_length`. The output of this method can be fed into sequence networks, such as RNN. The output of this method is a 3D `Tensor` of shape `[batch_size, T, D]`. `T` is the maximum sequence length for this batch, which could differ from batch to batch. If multiple `feature_columns` are given with `Di` `num_elements` each, their outputs are concatenated. So, the final `Tensor` has shape `[batch_size, T, D0 + D1 + ... + Dn]`. Example: ```python rating = sequence_numeric_column('rating') watches = sequence_categorical_column_with_identity( 'watches', num_buckets=1000) watches_embedding = embedding_column(watches, dimension=10) columns = [rating, watches_embedding] sequence_input_layer = SequenceFeatures(columns) features = tf.io.parse_example(..., features=make_parse_example_spec(columns)) sequence_input, sequence_length = sequence_input_layer(features) sequence_length_mask = tf.sequence_mask(sequence_length) rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size) rnn_layer = tf.keras.layers.RNN(rnn_cell) outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask) ``` """ def __init__(self, feature_columns, trainable=True, name=None, **kwargs): """Constructs a SequenceFeatures layer. Args: feature_columns: An iterable of dense sequence columns. Valid columns are - `embedding_column` that wraps a `sequence_categorical_column_with_*` - `sequence_numeric_column`. trainable: Boolean, whether the layer's variables will be updated via gradient descent during training. name: Name to give to the SequenceFeatures. **kwargs: Keyword arguments to construct a layer. Raises: ValueError: If any of the `feature_columns` is not a `SequenceDenseColumn`. """ super(SequenceFeatures, self).__init__( feature_columns=feature_columns, trainable=trainable, name=name, expected_column_type=fc.SequenceDenseColumn, **kwargs) def _target_shape(self, input_shape, total_elements): return (input_shape[0], input_shape[1], total_elements) def call(self, features): """Returns sequence input corresponding to the `feature_columns`. Args: features: A dict mapping keys to tensors. Returns: An `(input_layer, sequence_length)` tuple where: - input_layer: A float `Tensor` of shape `[batch_size, T, D]`. `T` is the maximum sequence length for this batch, which could differ from batch to batch. `D` is the sum of `num_elements` for all `feature_columns`. - sequence_length: An int `Tensor` of shape `[batch_size]`. The sequence length for each example. Raises: ValueError: If features are not a dictionary. """ if not isinstance(features, dict): raise ValueError('We expected a dictionary here. Instead we got: ', features) transformation_cache = fc.FeatureTransformationCache(features) output_tensors = [] sequence_lengths = [] for column in self._feature_columns: with ops.name_scope(column.name): dense_tensor, sequence_length = column.get_sequence_dense_tensor( transformation_cache, self._state_manager) # Flattens the final dimension to produce a 3D Tensor. output_tensors.append(self._process_dense_tensor(column, dense_tensor)) sequence_lengths.append(sequence_length) # Check and process sequence lengths. fc._verify_static_batch_size_equality(sequence_lengths, self._feature_columns) sequence_length = _assert_all_equal_and_return(sequence_lengths) return self._verify_and_concat_tensors(output_tensors), sequence_length def concatenate_context_input(context_input, sequence_input): """Replicates `context_input` across all timesteps of `sequence_input`. Expands dimension 1 of `context_input` then tiles it `sequence_length` times. This value is appended to `sequence_input` on dimension 2 and the result is returned. Args: context_input: A `Tensor` of dtype `float32` and shape `[batch_size, d1]`. sequence_input: A `Tensor` of dtype `float32` and shape `[batch_size, padded_length, d0]`. Returns: A `Tensor` of dtype `float32` and shape `[batch_size, padded_length, d0 + d1]`. Raises: ValueError: If `sequence_input` does not have rank 3 or `context_input` does not have rank 2. """ seq_rank_check = check_ops.assert_rank( sequence_input, 3, message='sequence_input must have rank 3', data=[array_ops.shape(sequence_input)]) seq_type_check = check_ops.assert_type( sequence_input, dtypes.float32, message='sequence_input must have dtype float32; got {}.'.format( sequence_input.dtype)) ctx_rank_check = check_ops.assert_rank( context_input, 2, message='context_input must have rank 2', data=[array_ops.shape(context_input)]) ctx_type_check = check_ops.assert_type( context_input, dtypes.float32, message='context_input must have dtype float32; got {}.'.format( context_input.dtype)) with ops.control_dependencies( [seq_rank_check, seq_type_check, ctx_rank_check, ctx_type_check]): padded_length = array_ops.shape(sequence_input)[1] tiled_context_input = array_ops.tile( array_ops.expand_dims(context_input, 1), array_ops.concat([[1], [padded_length], [1]], 0)) return array_ops.concat([sequence_input, tiled_context_input], 2) def sequence_categorical_column_with_identity(key, num_buckets, default_value=None, feature_name=None): """Returns a feature column that represents sequences of integers. Pass this to `embedding_column` or `indicator_column` to convert sequence categorical data into dense representation for input to sequence NN, such as RNN. Example: ```python watches = sequence_categorical_column_with_identity( 'watches', num_buckets=1000) watches_embedding = embedding_column(watches, dimension=10) columns = [watches_embedding] features = tf.io.parse_example(..., features=make_parse_example_spec(columns)) sequence_feature_layer = SequenceFeatures(columns) sequence_input, sequence_length = sequence_feature_layer(features) sequence_length_mask = tf.sequence_mask(sequence_length) rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size) rnn_layer = tf.keras.layers.RNN(rnn_cell) outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask) ``` Args: key: A unique string identifying the input feature. num_buckets: Range of inputs. Namely, inputs are expected to be in the range `[0, num_buckets)`. default_value: If `None`, this column's graph operations will fail for out-of-range inputs. Otherwise, this value must be in the range `[0, num_buckets)`, and will replace out-of-range inputs. Returns: A `SequenceCategoricalColumn`. Raises: ValueError: if `num_buckets` is less than one. ValueError: if `default_value` is not in range `[0, num_buckets)`. """ return fc.SequenceCategoricalColumn( fc.categorical_column_with_identity( feature_name=feature_name, key=key, num_buckets=num_buckets, default_value=default_value)) def sequence_numeric_column_with_bucketized_column(source_column, boundaries): if not isinstance(source_column, (SequenceNumericColumn,)): # pylint: disable=protected-access raise ValueError( 'source_column must be a column generated with sequence_numeric_column(). ' 'Given: {}'.format(source_column)) if len(source_column.shape) > 1: raise ValueError('source_column must be one-dimensional column. ' 'Given: {}'.format(source_column)) if not boundaries: raise ValueError('boundaries must not be empty.') if not (isinstance(boundaries, list) or isinstance(boundaries, tuple)): raise ValueError('boundaries must be a sorted list.') for i in range(len(boundaries) - 1): if boundaries[i] >= boundaries[i + 1]: raise ValueError('boundaries must be a sorted list.') return fc.SequenceBucketizedColumn(source_column, tuple(boundaries)) def sequence_numeric_column_with_raw_column(source_column, sequence_length): if not isinstance(source_column, (SequenceNumericColumn,)): # pylint: disable=protected-access raise ValueError( 'source_column must be a column generated with sequence_numeric_column(). ' 'Given: {}'.format(source_column)) if len(source_column.shape) > 1: raise ValueError('source_column must be one-dimensional column. ' 'Given: {}'.format(source_column)) return fc.SequenceNumericColumn(source_column, sequence_length) def sequence_weighted_categorical_column(categorical_column, weight_feature_key, dtype=dtypes.float32): if (dtype is None) or not (dtype.is_integer or dtype.is_floating): raise ValueError('dtype {} is not convertible to float.'.format(dtype)) return fc.SequenceWeightedCategoricalColumn( categorical_column=categorical_column, weight_feature_key=weight_feature_key, dtype=dtype) def sequence_categorical_column_with_hash_bucket(key, hash_bucket_size, dtype=dtypes.string, feature_name=None): """A sequence of categorical terms where ids are set by hashing. Pass this to `embedding_column` or `indicator_column` to convert sequence categorical data into dense representation for input to sequence NN, such as RNN. Example: ```python tokens = sequence_categorical_column_with_hash_bucket( 'tokens', hash_bucket_size=1000) tokens_embedding = embedding_column(tokens, dimension=10) columns = [tokens_embedding] features = tf.io.parse_example(..., features=make_parse_example_spec(columns)) sequence_feature_layer = SequenceFeatures(columns) sequence_input, sequence_length = sequence_feature_layer(features) sequence_length_mask = tf.sequence_mask(sequence_length) rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size) rnn_layer = tf.keras.layers.RNN(rnn_cell) outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask) ``` Args: key: A unique string identifying the input feature. hash_bucket_size: An int > 1. The number of buckets. dtype: The type of features. Only string and integer types are supported. Returns: A `SequenceCategoricalColumn`. Raises: ValueError: `hash_bucket_size` is not greater than 1. ValueError: `dtype` is neither string nor integer. """ return fc.SequenceCategoricalColumn( fc.categorical_column_with_hash_bucket( feature_name=feature_name, key=key, hash_bucket_size=hash_bucket_size, dtype=dtype)) def sequence_categorical_column_with_vocabulary_file(key, vocabulary_file, vocabulary_size=None, num_oov_buckets=0, default_value=None, dtype=dtypes.string, feature_name=None): """A sequence of categorical terms where ids use a vocabulary file. Pass this to `embedding_column` or `indicator_column` to convert sequence categorical data into dense representation for input to sequence NN, such as RNN. Example: ```python states = sequence_categorical_column_with_vocabulary_file( key='states', vocabulary_file='/us/states.txt', vocabulary_size=50, num_oov_buckets=5) states_embedding = embedding_column(states, dimension=10) columns = [states_embedding] features = tf.io.parse_example(..., features=make_parse_example_spec(columns)) sequence_feature_layer = SequenceFeatures(columns) sequence_input, sequence_length = sequence_feature_layer(features) sequence_length_mask = tf.sequence_mask(sequence_length) rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size) rnn_layer = tf.keras.layers.RNN(rnn_cell) outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask) ``` Args: key: A unique string identifying the input feature. vocabulary_file: The vocabulary file name. vocabulary_size: Number of the elements in the vocabulary. This must be no greater than length of `vocabulary_file`, if less than length, later values are ignored. If None, it is set to the length of `vocabulary_file`. num_oov_buckets: Non-negative integer, the number of out-of-vocabulary buckets. All out-of-vocabulary inputs will be assigned IDs in the range `[vocabulary_size, vocabulary_size+num_oov_buckets)` based on a hash of the input value. A positive `num_oov_buckets` can not be specified with `default_value`. default_value: The integer ID value to return for out-of-vocabulary feature values, defaults to `-1`. This can not be specified with a positive `num_oov_buckets`. dtype: The type of features. Only string and integer types are supported. Returns: A `SequenceCategoricalColumn`. Raises: ValueError: `vocabulary_file` is missing or cannot be opened. ValueError: `vocabulary_size` is missing or < 1. ValueError: `num_oov_buckets` is a negative integer. ValueError: `num_oov_buckets` and `default_value` are both specified. ValueError: `dtype` is neither string nor integer. """ return fc.SequenceCategoricalColumn( fc.categorical_column_with_vocabulary_file( feature_name=feature_name, key=key, vocabulary_file=vocabulary_file, vocabulary_size=vocabulary_size, num_oov_buckets=num_oov_buckets, default_value=default_value, dtype=dtype)) def sequence_categorical_column_with_vocabulary_list(key, vocabulary_list, dtype=None, default_value=-1, num_oov_buckets=0, feature_name=None): """A sequence of categorical terms where ids use an in-memory list. Pass this to `embedding_column` or `indicator_column` to convert sequence categorical data into dense representation for input to sequence NN, such as RNN. Example: ```python colors = sequence_categorical_column_with_vocabulary_list( key='colors', vocabulary_list=('R', 'G', 'B', 'Y'), num_oov_buckets=2) colors_embedding = embedding_column(colors, dimension=3) columns = [colors_embedding] features = tf.io.parse_example(..., features=make_parse_example_spec(columns)) sequence_feature_layer = SequenceFeatures(columns) sequence_input, sequence_length = sequence_feature_layer(features) sequence_length_mask = tf.sequence_mask(sequence_length) rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size) rnn_layer = tf.keras.layers.RNN(rnn_cell) outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask) ``` Args: key: A unique string identifying the input feature. vocabulary_list: An ordered iterable defining the vocabulary. Each feature is mapped to the index of its value (if present) in `vocabulary_list`. Must be castable to `dtype`. dtype: The type of features. Only string and integer types are supported. If `None`, it will be inferred from `vocabulary_list`. default_value: The integer ID value to return for out-of-vocabulary feature values, defaults to `-1`. This can not be specified with a positive `num_oov_buckets`. num_oov_buckets: Non-negative integer, the number of out-of-vocabulary buckets. All out-of-vocabulary inputs will be assigned IDs in the range `[len(vocabulary_list), len(vocabulary_list)+num_oov_buckets)` based on a hash of the input value. A positive `num_oov_buckets` can not be specified with `default_value`. Returns: A `SequenceCategoricalColumn`. Raises: ValueError: if `vocabulary_list` is empty, or contains duplicate keys. ValueError: `num_oov_buckets` is a negative integer. ValueError: `num_oov_buckets` and `default_value` are both specified. ValueError: if `dtype` is not integer or string. """ return fc.SequenceCategoricalColumn( fc.categorical_column_with_vocabulary_list( feature_name=feature_name, key=key, vocabulary_list=vocabulary_list, dtype=dtype, default_value=default_value, num_oov_buckets=num_oov_buckets)) def sequence_numeric_column(key, shape=(1,), default_value=0., dtype=dtypes.float32, normalizer_fn=None, feature_name=None): """Returns a feature column that represents sequences of numeric data. Example: ```python temperature = sequence_numeric_column('temperature') columns = [temperature] features = tf.io.parse_example(..., features=make_parse_example_spec(columns)) sequence_feature_layer = SequenceFeatures(columns) sequence_input, sequence_length = sequence_feature_layer(features) sequence_length_mask = tf.sequence_mask(sequence_length) rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size) rnn_layer = tf.keras.layers.RNN(rnn_cell) outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask) ``` Args: key: A unique string identifying the input features. shape: The shape of the input data per sequence id. E.g. if `shape=(2,)`, each example must contain `2 * sequence_length` values. default_value: A single value compatible with `dtype` that is used for padding the sparse data into a dense `Tensor`. dtype: The type of values. normalizer_fn: If not `None`, a function that can be used to normalize the value of the tensor after `default_value` is applied for parsing. Normalizer function takes the input `Tensor` as its argument, and returns the output `Tensor`. (e.g. lambda x: (x - 3.0) / 4.2). Please note that even though the most common use case of this function is normalization, it can be used for any kind of Tensorflow transformations. Returns: A `SequenceNumericColumn`. Raises: TypeError: if any dimension in shape is not an int. ValueError: if any dimension in shape is not a positive integer. ValueError: if `dtype` is not convertible to `tf.float32`. """ shape = fc._check_shape(shape=shape, key=key) if not (dtype.is_integer or dtype.is_floating): raise ValueError('dtype must be convertible to float. ' 'dtype: {}, key: {}'.format(dtype, key)) if normalizer_fn is not None and not callable(normalizer_fn): raise TypeError( 'normalizer_fn must be a callable. Given: {}'.format(normalizer_fn)) return SequenceNumericColumn( feature_name=feature_name, key=key, shape=shape, default_value=default_value, dtype=dtype, normalizer_fn=normalizer_fn) def _assert_all_equal_and_return(tensors, name=None): """Asserts that all tensors are equal and returns the first one.""" with ops.name_scope(name, 'assert_all_equal', values=tensors): if len(tensors) == 1: return tensors[0] assert_equal_ops = [] for t in tensors[1:]: assert_equal_ops.append(check_ops.assert_equal(tensors[0], t)) with ops.control_dependencies(assert_equal_ops): return array_ops.identity(tensors[0]) class SequenceNumericColumn( fc.SequenceDenseColumn, fc_v1._FeatureColumn, collections.namedtuple('SequenceNumericColumn', ('feature_name', 'key', 'shape', 'default_value', 'dtype', 'normalizer_fn'))): """Represents sequences of numeric data.""" @property def _is_v2_column(self): return True @property def name(self): """See `FeatureColumn` base class.""" return self.feature_name if self.feature_name else self.key @property def raw_name(self): """See `FeatureColumn` base class.""" return self.key @property def parse_example_spec(self): """See `FeatureColumn` base class.""" return {self.key: parsing_ops.VarLenFeature(self.dtype)} def _transform_feature(self, inputs): input_tensor = inputs.get(self.key) return self._transform_input_tensor(input_tensor) def _transform_input_tensor(self, input_tensor): return math_ops.cast(input_tensor, dtypes.float32) def transform_feature(self, transformation_cache, state_manager): """See `FeatureColumn` base class. In this case, we apply the `normalizer_fn` to the input tensor. Args: transformation_cache: A `FeatureTransformationCache` object to access features. state_manager: A `StateManager` to create / access resources such as lookup tables. Returns: Normalized input tensor. """ input_tensor = transformation_cache.get(self.key, state_manager) if self.normalizer_fn is not None: input_tensor = self.normalizer_fn(input_tensor) return self._transform_input_tensor(input_tensor) @property def variable_shape(self): """Returns a `TensorShape` representing the shape of sequence input.""" return tensor_shape.TensorShape(self.shape) def get_sequence_dense_tensor(self, transformation_cache, state_manager): """Returns a `TensorSequenceLengthPair`. Args: transformation_cache: A `FeatureTransformationCache` object to access features. state_manager: A `StateManager` to create / access resources such as lookup tables. """ sp_tensor = transformation_cache.get(self, state_manager) dense_tensor = sparse_ops.sparse_tensor_to_dense( sp_tensor, default_value=self.default_value) # Reshape into [batch_size, T, variable_shape]. dense_shape = array_ops.concat( [array_ops.shape(dense_tensor)[:1], [-1], self.variable_shape], axis=0) dense_tensor = array_ops.reshape(dense_tensor, shape=dense_shape) # Get the number of timesteps per example # For the 2D case, the raw values are grouped according to num_elements; # for the 3D case, the grouping happens in the third dimension, and # sequence length is not affected. if sp_tensor.shape.ndims == 2: num_elements = self.variable_shape.num_elements() else: num_elements = 1 seq_length = fc_utils.sequence_length_from_sparse_tensor( sp_tensor, num_elements=num_elements) return fc.SequenceDenseColumn.TensorSequenceLengthPair( dense_tensor=dense_tensor, sequence_length=seq_length) # TODO(b/119409767): Implement parents, _{get,from}_config. @property def parents(self): """See 'FeatureColumn` base class.""" raise NotImplementedError() def _get_config(self): """See 'FeatureColumn` base class.""" raise NotImplementedError() @classmethod def _from_config(cls, config, custom_objects=None, columns_by_name=None): """See 'FeatureColumn` base class.""" raise NotImplementedError() # pylint: enable=protected-access