easy_rec/python/compat/feature_column/sequence_feature_column.py (236 lines of code) (raw):
# -*- encoding:utf-8 -*-
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""This API defines FeatureColumn for sequential input.
NOTE: This API is a work in progress and will likely be changing frequently.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import check_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import parsing_ops
from tensorflow.python.ops import sparse_ops
from easy_rec.python.compat.feature_column import feature_column as fc_v1
from easy_rec.python.compat.feature_column import feature_column_v2 as fc
from easy_rec.python.compat.feature_column import utils as fc_utils
# pylint: disable=protected-access
class SequenceFeatures(fc._BaseFeaturesLayer):
  """A layer for sequence input.

  All `feature_columns` must be sequence dense columns with the same
  `sequence_length`. The output of this method can be fed into sequence
  networks, such as RNN.

  The output of this method is a 3D `Tensor` of shape `[batch_size, T, D]`.
  `T` is the maximum sequence length for this batch, which could differ from
  batch to batch.

  If multiple `feature_columns` are given with `Di` `num_elements` each, their
  outputs are concatenated. So, the final `Tensor` has shape
  `[batch_size, T, D0 + D1 + ... + Dn]`.

  Example:

  ```python
  rating = sequence_numeric_column('rating')
  watches = sequence_categorical_column_with_identity(
      'watches', num_buckets=1000)
  watches_embedding = embedding_column(watches, dimension=10)
  columns = [rating, watches_embedding]

  sequence_input_layer = SequenceFeatures(columns)
  features = tf.io.parse_example(...,
                                 features=make_parse_example_spec(columns))
  sequence_input, sequence_length = sequence_input_layer(features)
  sequence_length_mask = tf.sequence_mask(sequence_length)

  rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size)
  rnn_layer = tf.keras.layers.RNN(rnn_cell)
  outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask)
  ```
  """

  def __init__(self, feature_columns, trainable=True, name=None, **kwargs):
    """Constructs a SequenceFeatures layer.

    Args:
      feature_columns: An iterable of dense sequence columns. Valid columns are
        - `embedding_column` that wraps a `sequence_categorical_column_with_*`
        - `sequence_numeric_column`.
      trainable: Boolean, whether the layer's variables will be updated via
        gradient descent during training.
      name: Name to give to the SequenceFeatures.
      **kwargs: Keyword arguments to construct a layer.

    Raises:
      ValueError: If any of the `feature_columns` is not a
        `SequenceDenseColumn`.
    """
    super(SequenceFeatures, self).__init__(
        feature_columns=feature_columns,
        trainable=trainable,
        name=name,
        expected_column_type=fc.SequenceDenseColumn,
        **kwargs)

  def _target_shape(self, input_shape, total_elements):
    """Returns `(input_shape[0], input_shape[1], total_elements)`.

    Keeps the two leading dimensions of `input_shape` and uses
    `total_elements` as the last one.
    """
    return (input_shape[0], input_shape[1], total_elements)

  def call(self, features):
    """Returns sequence input corresponding to the `feature_columns`.

    Args:
      features: A dict mapping keys to tensors.

    Returns:
      An `(input_layer, sequence_length)` tuple where:
      - input_layer: A float `Tensor` of shape `[batch_size, T, D]`.
          `T` is the maximum sequence length for this batch, which could differ
          from batch to batch. `D` is the sum of `num_elements` for all
          `feature_columns`.
      - sequence_length: An int `Tensor` of shape `[batch_size]`. The sequence
          length for each example.

    Raises:
      ValueError: If features are not a dictionary.
    """
    if not isinstance(features, dict):
      # Build ONE message string. Passing `features` as a second positional
      # argument to ValueError makes the exception render as a tuple repr.
      raise ValueError(
          'We expected a dictionary here. Instead we got: {}'.format(features))
    transformation_cache = fc.FeatureTransformationCache(features)
    output_tensors = []
    sequence_lengths = []

    for column in self._feature_columns:
      with ops.name_scope(column.name):
        dense_tensor, sequence_length = column.get_sequence_dense_tensor(
            transformation_cache, self._state_manager)
        # Flattens the final dimension to produce a 3D Tensor.
        output_tensors.append(self._process_dense_tensor(column, dense_tensor))
        sequence_lengths.append(sequence_length)

    # Check and process sequence lengths: every column must report the same
    # per-example length, and the first one is returned.
    fc._verify_static_batch_size_equality(sequence_lengths,
                                          self._feature_columns)
    sequence_length = _assert_all_equal_and_return(sequence_lengths)

    return self._verify_and_concat_tensors(output_tensors), sequence_length
def concatenate_context_input(context_input, sequence_input):
  """Appends per-example context features to every step of a sequence.

  `context_input` gets a new axis 1, is tiled `padded_length` times along that
  axis, and is then concatenated to `sequence_input` along axis 2.

  Args:
    context_input: A `Tensor` of dtype `float32` and shape `[batch_size, d1]`.
    sequence_input: A `Tensor` of dtype `float32` and shape `[batch_size,
      padded_length, d0]`.

  Returns:
    A `Tensor` of dtype `float32` and shape `[batch_size, padded_length,
    d0 + d1]`.

  Raises:
    ValueError: If `sequence_input` does not have rank 3 or `context_input` does
      not have rank 2.
  """
  # Validation ops are built in a fixed order (sequence rank, sequence dtype,
  # context rank, context dtype) so generated op names stay stable.
  sequence_shape = array_ops.shape(sequence_input)
  sequence_rank_ok = check_ops.assert_rank(
      sequence_input,
      3,
      message='sequence_input must have rank 3',
      data=[sequence_shape])
  sequence_dtype_message = (
      'sequence_input must have dtype float32; got {}.'.format(
          sequence_input.dtype))
  sequence_dtype_ok = check_ops.assert_type(
      sequence_input, dtypes.float32, message=sequence_dtype_message)

  context_shape = array_ops.shape(context_input)
  context_rank_ok = check_ops.assert_rank(
      context_input,
      2,
      message='context_input must have rank 2',
      data=[context_shape])
  context_dtype_message = (
      'context_input must have dtype float32; got {}.'.format(
          context_input.dtype))
  context_dtype_ok = check_ops.assert_type(
      context_input, dtypes.float32, message=context_dtype_message)

  validations = [
      sequence_rank_ok, sequence_dtype_ok, context_rank_ok, context_dtype_ok
  ]
  # Reading the padded length under the checks makes everything computed from
  # it depend on the validations having run.
  with ops.control_dependencies(validations):
    padded_length = array_ops.shape(sequence_input)[1]

  context_with_time_axis = array_ops.expand_dims(context_input, 1)
  tile_multiples = array_ops.concat([[1], [padded_length], [1]], 0)
  tiled_context_input = array_ops.tile(context_with_time_axis, tile_multiples)
  return array_ops.concat([sequence_input, tiled_context_input], 2)
def sequence_categorical_column_with_identity(key,
                                              num_buckets,
                                              default_value=None,
                                              feature_name=None):
  """Builds a sequence column whose integer inputs are used directly as ids.

  Pass the result to `embedding_column` or `indicator_column` to convert
  sequence categorical data into a dense representation for input to a
  sequence NN, such as an RNN.

  Example:

  ```python
  watches = sequence_categorical_column_with_identity(
      'watches', num_buckets=1000)
  watches_embedding = embedding_column(watches, dimension=10)
  columns = [watches_embedding]

  features = tf.io.parse_example(..., features=make_parse_example_spec(columns))
  sequence_feature_layer = SequenceFeatures(columns)
  sequence_input, sequence_length = sequence_feature_layer(features)
  sequence_length_mask = tf.sequence_mask(sequence_length)

  rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size)
  rnn_layer = tf.keras.layers.RNN(rnn_cell)
  outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask)
  ```

  Args:
    key: A unique string identifying the input feature.
    num_buckets: Range of inputs. Namely, inputs are expected to be in the
      range `[0, num_buckets)`.
    default_value: If `None`, this column's graph operations will fail for
      out-of-range inputs. Otherwise, this value must be in the range
      `[0, num_buckets)`, and will replace out-of-range inputs.
    feature_name: Forwarded unchanged to
      `fc.categorical_column_with_identity`. Presumably an optional column
      name used in place of `key` -- confirm against feature_column_v2.

  Returns:
    A `SequenceCategoricalColumn`.

  Raises:
    ValueError: if `num_buckets` is less than one.
    ValueError: if `default_value` is not in range `[0, num_buckets)`.
  """
  identity_column = fc.categorical_column_with_identity(
      feature_name=feature_name,
      key=key,
      num_buckets=num_buckets,
      default_value=default_value)
  return fc.SequenceCategoricalColumn(identity_column)
def sequence_numeric_column_with_bucketized_column(source_column, boundaries):
  """Wraps a sequence numeric column in a `fc.SequenceBucketizedColumn`.

  Args:
    source_column: A `SequenceNumericColumn` whose `shape` has at most one
      dimension.
    boundaries: A non-empty list or tuple of strictly increasing values.

  Returns:
    `fc.SequenceBucketizedColumn(source_column, tuple(boundaries))`.

  Raises:
    ValueError: if `source_column` is not a `SequenceNumericColumn`, if its
      shape has more than one dimension, or if `boundaries` is empty, is not a
      list/tuple, or is not strictly increasing.
  """
  if not isinstance(source_column, SequenceNumericColumn):
    not_numeric_message = (
        'source_column must be a column generated with sequence_numeric_column(). '
        'Given: {}').format(source_column)
    raise ValueError(not_numeric_message)

  if len(source_column.shape) > 1:
    not_1d_message = ('source_column must be one-dimensional column. '
                      'Given: {}').format(source_column)
    raise ValueError(not_1d_message)

  if not boundaries:
    raise ValueError('boundaries must not be empty.')

  if not isinstance(boundaries, (list, tuple)):
    raise ValueError('boundaries must be a sorted list.')

  # Each boundary must be strictly smaller than its successor.
  neighbours = zip(boundaries, boundaries[1:])
  if any(lower >= upper for lower, upper in neighbours):
    raise ValueError('boundaries must be a sorted list.')

  return fc.SequenceBucketizedColumn(source_column, tuple(boundaries))
def sequence_numeric_column_with_raw_column(source_column, sequence_length):
  """Wraps a sequence numeric column in a `fc.SequenceNumericColumn`.

  Args:
    source_column: A `SequenceNumericColumn` (as produced by
      `sequence_numeric_column`) whose `shape` has at most one dimension.
    sequence_length: Passed through unchanged as the second argument of
      `fc.SequenceNumericColumn`.

  Returns:
    `fc.SequenceNumericColumn(source_column, sequence_length)`.

  Raises:
    ValueError: if `source_column` is not a `SequenceNumericColumn` or its
      shape has more than one dimension.
  """
  if not isinstance(source_column, SequenceNumericColumn):
    not_numeric_message = (
        'source_column must be a column generated with sequence_numeric_column(). '
        'Given: {}').format(source_column)
    raise ValueError(not_numeric_message)

  if len(source_column.shape) > 1:
    not_1d_message = ('source_column must be one-dimensional column. '
                      'Given: {}').format(source_column)
    raise ValueError(not_1d_message)

  return fc.SequenceNumericColumn(source_column, sequence_length)
def sequence_weighted_categorical_column(categorical_column,
                                         weight_feature_key,
                                         dtype=dtypes.float32):
  """Builds a `fc.SequenceWeightedCategoricalColumn`.

  Args:
    categorical_column: Passed through as `categorical_column`.
    weight_feature_key: Passed through as `weight_feature_key`.
    dtype: Type of the weights; must be an integer or floating dtype.

  Returns:
    A `fc.SequenceWeightedCategoricalColumn`.

  Raises:
    ValueError: if `dtype` is `None` or is neither integer nor floating.
  """
  dtype_is_unusable = dtype is None or (not dtype.is_integer and
                                        not dtype.is_floating)
  if dtype_is_unusable:
    raise ValueError('dtype {} is not convertible to float.'.format(dtype))

  column_kwargs = dict(
      categorical_column=categorical_column,
      weight_feature_key=weight_feature_key,
      dtype=dtype)
  return fc.SequenceWeightedCategoricalColumn(**column_kwargs)
def sequence_categorical_column_with_hash_bucket(key,
                                                 hash_bucket_size,
                                                 dtype=dtypes.string,
                                                 feature_name=None):
  """Builds a sequence column whose ids are obtained by hashing the inputs.

  Pass the result to `embedding_column` or `indicator_column` to convert
  sequence categorical data into a dense representation for input to a
  sequence NN, such as an RNN.

  Example:

  ```python
  tokens = sequence_categorical_column_with_hash_bucket(
      'tokens', hash_bucket_size=1000)
  tokens_embedding = embedding_column(tokens, dimension=10)
  columns = [tokens_embedding]

  features = tf.io.parse_example(..., features=make_parse_example_spec(columns))
  sequence_feature_layer = SequenceFeatures(columns)
  sequence_input, sequence_length = sequence_feature_layer(features)
  sequence_length_mask = tf.sequence_mask(sequence_length)

  rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size)
  rnn_layer = tf.keras.layers.RNN(rnn_cell)
  outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask)
  ```

  Args:
    key: A unique string identifying the input feature.
    hash_bucket_size: An int > 1. The number of buckets.
    dtype: The type of features. Only string and integer types are supported.
    feature_name: Forwarded unchanged to
      `fc.categorical_column_with_hash_bucket`. Presumably an optional column
      name used in place of `key` -- confirm against feature_column_v2.

  Returns:
    A `SequenceCategoricalColumn`.

  Raises:
    ValueError: `hash_bucket_size` is not greater than 1.
    ValueError: `dtype` is neither string nor integer.
  """
  hashed_column = fc.categorical_column_with_hash_bucket(
      feature_name=feature_name,
      key=key,
      hash_bucket_size=hash_bucket_size,
      dtype=dtype)
  return fc.SequenceCategoricalColumn(hashed_column)
def sequence_categorical_column_with_vocabulary_file(key,
                                                     vocabulary_file,
                                                     vocabulary_size=None,
                                                     num_oov_buckets=0,
                                                     default_value=None,
                                                     dtype=dtypes.string,
                                                     feature_name=None):
  """Builds a sequence column whose ids come from a vocabulary file.

  Pass the result to `embedding_column` or `indicator_column` to convert
  sequence categorical data into a dense representation for input to a
  sequence NN, such as an RNN.

  Example:

  ```python
  states = sequence_categorical_column_with_vocabulary_file(
      key='states', vocabulary_file='/us/states.txt', vocabulary_size=50,
      num_oov_buckets=5)
  states_embedding = embedding_column(states, dimension=10)
  columns = [states_embedding]

  features = tf.io.parse_example(..., features=make_parse_example_spec(columns))
  sequence_feature_layer = SequenceFeatures(columns)
  sequence_input, sequence_length = sequence_feature_layer(features)
  sequence_length_mask = tf.sequence_mask(sequence_length)

  rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size)
  rnn_layer = tf.keras.layers.RNN(rnn_cell)
  outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask)
  ```

  Args:
    key: A unique string identifying the input feature.
    vocabulary_file: The vocabulary file name.
    vocabulary_size: Number of the elements in the vocabulary. This must be no
      greater than length of `vocabulary_file`, if less than length, later
      values are ignored. If None, it is set to the length of `vocabulary_file`.
    num_oov_buckets: Non-negative integer, the number of out-of-vocabulary
      buckets. All out-of-vocabulary inputs will be assigned IDs in the range
      `[vocabulary_size, vocabulary_size+num_oov_buckets)` based on a hash of
      the input value. A positive `num_oov_buckets` can not be specified with
      `default_value`.
    default_value: The integer ID value to return for out-of-vocabulary feature
      values, defaults to `-1`. This can not be specified with a positive
      `num_oov_buckets`.
    dtype: The type of features. Only string and integer types are supported.
    feature_name: Forwarded unchanged to
      `fc.categorical_column_with_vocabulary_file`. Presumably an optional
      column name used in place of `key` -- confirm against feature_column_v2.

  Returns:
    A `SequenceCategoricalColumn`.

  Raises:
    ValueError: `vocabulary_file` is missing or cannot be opened.
    ValueError: `vocabulary_size` is missing or < 1.
    ValueError: `num_oov_buckets` is a negative integer.
    ValueError: `num_oov_buckets` and `default_value` are both specified.
    ValueError: `dtype` is neither string nor integer.
  """
  vocabulary_column = fc.categorical_column_with_vocabulary_file(
      feature_name=feature_name,
      key=key,
      vocabulary_file=vocabulary_file,
      vocabulary_size=vocabulary_size,
      num_oov_buckets=num_oov_buckets,
      default_value=default_value,
      dtype=dtype)
  return fc.SequenceCategoricalColumn(vocabulary_column)
def sequence_categorical_column_with_vocabulary_list(key,
                                                     vocabulary_list,
                                                     dtype=None,
                                                     default_value=-1,
                                                     num_oov_buckets=0,
                                                     feature_name=None):
  """Builds a sequence column whose ids come from an in-memory vocabulary.

  Pass the result to `embedding_column` or `indicator_column` to convert
  sequence categorical data into a dense representation for input to a
  sequence NN, such as an RNN.

  Example:

  ```python
  colors = sequence_categorical_column_with_vocabulary_list(
      key='colors', vocabulary_list=('R', 'G', 'B', 'Y'),
      num_oov_buckets=2)
  colors_embedding = embedding_column(colors, dimension=3)
  columns = [colors_embedding]

  features = tf.io.parse_example(..., features=make_parse_example_spec(columns))
  sequence_feature_layer = SequenceFeatures(columns)
  sequence_input, sequence_length = sequence_feature_layer(features)
  sequence_length_mask = tf.sequence_mask(sequence_length)

  rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size)
  rnn_layer = tf.keras.layers.RNN(rnn_cell)
  outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask)
  ```

  Args:
    key: A unique string identifying the input feature.
    vocabulary_list: An ordered iterable defining the vocabulary. Each feature
      is mapped to the index of its value (if present) in `vocabulary_list`.
      Must be castable to `dtype`.
    dtype: The type of features. Only string and integer types are supported.
      If `None`, it will be inferred from `vocabulary_list`.
    default_value: The integer ID value to return for out-of-vocabulary feature
      values, defaults to `-1`. This can not be specified with a positive
      `num_oov_buckets`.
    num_oov_buckets: Non-negative integer, the number of out-of-vocabulary
      buckets. All out-of-vocabulary inputs will be assigned IDs in the range
      `[len(vocabulary_list), len(vocabulary_list)+num_oov_buckets)` based on a
      hash of the input value. A positive `num_oov_buckets` can not be specified
      with `default_value`.
    feature_name: Forwarded unchanged to
      `fc.categorical_column_with_vocabulary_list`. Presumably an optional
      column name used in place of `key` -- confirm against feature_column_v2.

  Returns:
    A `SequenceCategoricalColumn`.

  Raises:
    ValueError: if `vocabulary_list` is empty, or contains duplicate keys.
    ValueError: `num_oov_buckets` is a negative integer.
    ValueError: `num_oov_buckets` and `default_value` are both specified.
    ValueError: if `dtype` is not integer or string.
  """
  vocabulary_column = fc.categorical_column_with_vocabulary_list(
      feature_name=feature_name,
      key=key,
      vocabulary_list=vocabulary_list,
      dtype=dtype,
      default_value=default_value,
      num_oov_buckets=num_oov_buckets)
  return fc.SequenceCategoricalColumn(vocabulary_column)
def sequence_numeric_column(key,
                            shape=(1,),
                            default_value=0.,
                            dtype=dtypes.float32,
                            normalizer_fn=None,
                            feature_name=None):
  """Builds a feature column representing sequences of numeric data.

  Example:

  ```python
  temperature = sequence_numeric_column('temperature')
  columns = [temperature]

  features = tf.io.parse_example(..., features=make_parse_example_spec(columns))
  sequence_feature_layer = SequenceFeatures(columns)
  sequence_input, sequence_length = sequence_feature_layer(features)
  sequence_length_mask = tf.sequence_mask(sequence_length)

  rnn_cell = tf.keras.layers.SimpleRNNCell(hidden_size)
  rnn_layer = tf.keras.layers.RNN(rnn_cell)
  outputs, state = rnn_layer(sequence_input, mask=sequence_length_mask)
  ```

  Args:
    key: A unique string identifying the input features.
    shape: The shape of the input data per sequence id. E.g. if `shape=(2,)`,
      each example must contain `2 * sequence_length` values.
    default_value: A single value compatible with `dtype` that is used for
      padding the sparse data into a dense `Tensor`.
    dtype: The type of values.
    normalizer_fn: If not `None`, a function that can be used to normalize the
      value of the tensor after `default_value` is applied for parsing.
      Normalizer function takes the input `Tensor` as its argument, and returns
      the output `Tensor`. (e.g. lambda x: (x - 3.0) / 4.2). Please note that
      even though the most common use case of this function is normalization, it
      can be used for any kind of Tensorflow transformations.
    feature_name: Optional name stored on the returned column; when truthy,
      `SequenceNumericColumn.name` returns it instead of `key`.

  Returns:
    A `SequenceNumericColumn`.

  Raises:
    TypeError: if any dimension in shape is not an int.
    TypeError: if `normalizer_fn` is given but is not callable.
    ValueError: if any dimension in shape is not a positive integer.
    ValueError: if `dtype` is not convertible to `tf.float32`.
  """
  checked_shape = fc._check_shape(shape=shape, key=key)

  if not dtype.is_integer and not dtype.is_floating:
    raise ValueError('dtype must be convertible to float. '
                     'dtype: {}, key: {}'.format(dtype, key))

  normalizer_is_valid = normalizer_fn is None or callable(normalizer_fn)
  if not normalizer_is_valid:
    raise TypeError(
        'normalizer_fn must be a callable. Given: {}'.format(normalizer_fn))

  return SequenceNumericColumn(
      feature_name=feature_name,
      key=key,
      shape=checked_shape,
      default_value=default_value,
      dtype=dtype,
      normalizer_fn=normalizer_fn)
def _assert_all_equal_and_return(tensors, name=None):
  """Returns the first tensor, guarded by equality checks against the rest.

  Args:
    tensors: A list of tensors that are all expected to be equal.
    name: Optional name scope; defaults to 'assert_all_equal'.

  Returns:
    `tensors[0]` itself when it is the only element. Otherwise an identity of
    `tensors[0]` that carries control dependencies on one
    `check_ops.assert_equal` per remaining tensor.
  """
  with ops.name_scope(name, 'assert_all_equal', values=tensors):
    if len(tensors) == 1:
      # Nothing to compare against, so no extra ops are created.
      return tensors[0]
    equality_checks = [
        check_ops.assert_equal(tensors[0], other) for other in tensors[1:]
    ]
    with ops.control_dependencies(equality_checks):
      return array_ops.identity(tensors[0])
class SequenceNumericColumn(
    fc.SequenceDenseColumn, fc_v1._FeatureColumn,
    collections.namedtuple('SequenceNumericColumn',
                           ('feature_name', 'key', 'shape', 'default_value',
                            'dtype', 'normalizer_fn'))):
  """Represents sequences of numeric data.

  Namedtuple fields:
    feature_name: Optional column name; takes precedence over `key` in `name`.
    key: Key of the raw input feature.
    shape: Shape of the values belonging to one sequence step.
    default_value: Value used to pad the sparse input when densifying.
    dtype: Dtype used to parse the raw feature.
    normalizer_fn: Optional callable applied to the raw input tensor.
  """

  @property
  def _is_v2_column(self):
    return True

  @property
  def name(self):
    """See `FeatureColumn` base class."""
    return self.feature_name if self.feature_name else self.key

  @property
  def raw_name(self):
    """See `FeatureColumn` base class."""
    return self.key

  @property
  def parse_example_spec(self):
    """See `FeatureColumn` base class."""
    return {self.key: parsing_ops.VarLenFeature(self.dtype)}

  def _transform_feature(self, inputs):
    """v1-style transform: normalizes (if configured) and casts to float32.

    Applies `normalizer_fn` exactly as `transform_feature` does, so a
    configured normalizer is no longer silently skipped on this path.

    Args:
      inputs: An object exposing `get(key)` that returns the raw input tensor.

    Returns:
      The (optionally normalized) input tensor cast to `float32`.
    """
    input_tensor = inputs.get(self.key)
    if self.normalizer_fn is not None:
      input_tensor = self.normalizer_fn(input_tensor)
    return self._transform_input_tensor(input_tensor)

  def _transform_input_tensor(self, input_tensor):
    """Casts `input_tensor` to `float32`."""
    return math_ops.cast(input_tensor, dtypes.float32)

  def transform_feature(self, transformation_cache, state_manager):
    """See `FeatureColumn` base class.

    In this case, we apply the `normalizer_fn` to the input tensor.

    Args:
      transformation_cache: A `FeatureTransformationCache` object to access
        features.
      state_manager: A `StateManager` to create / access resources such as
        lookup tables.

    Returns:
      Normalized input tensor, cast to `float32`.
    """
    input_tensor = transformation_cache.get(self.key, state_manager)
    if self.normalizer_fn is not None:
      input_tensor = self.normalizer_fn(input_tensor)
    return self._transform_input_tensor(input_tensor)

  @property
  def variable_shape(self):
    """Returns a `TensorShape` representing the shape of sequence input."""
    return tensor_shape.TensorShape(self.shape)

  def get_sequence_dense_tensor(self, transformation_cache, state_manager):
    """Returns a `TensorSequenceLengthPair`.

    Args:
      transformation_cache: A `FeatureTransformationCache` object to access
        features.
      state_manager: A `StateManager` to create / access resources such as
        lookup tables.
    """
    sp_tensor = transformation_cache.get(self, state_manager)
    dense_tensor = sparse_ops.sparse_tensor_to_dense(
        sp_tensor, default_value=self.default_value)
    # Reshape into [batch_size, T, variable_shape].
    dense_shape = array_ops.concat(
        [array_ops.shape(dense_tensor)[:1], [-1], self.variable_shape], axis=0)
    dense_tensor = array_ops.reshape(dense_tensor, shape=dense_shape)

    # Get the number of timesteps per example
    # For the 2D case, the raw values are grouped according to num_elements;
    # for the 3D case, the grouping happens in the third dimension, and
    # sequence length is not affected.
    if sp_tensor.shape.ndims == 2:
      num_elements = self.variable_shape.num_elements()
    else:
      num_elements = 1
    seq_length = fc_utils.sequence_length_from_sparse_tensor(
        sp_tensor, num_elements=num_elements)

    return fc.SequenceDenseColumn.TensorSequenceLengthPair(
        dense_tensor=dense_tensor, sequence_length=seq_length)

  # TODO(b/119409767): Implement parents, _{get,from}_config.
  @property
  def parents(self):
    """See 'FeatureColumn` base class."""
    raise NotImplementedError()

  def _get_config(self):
    """See 'FeatureColumn` base class."""
    raise NotImplementedError()

  @classmethod
  def _from_config(cls, config, custom_objects=None, columns_by_name=None):
    """See 'FeatureColumn` base class."""
    raise NotImplementedError()
# pylint: enable=protected-access