ludwig/combiners/combiners.py
#! /usr/bin/env python
# coding=utf-8
# Copyright (c) 2019 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import logging
import tensorflow as tf
from tensorflow.keras.layers import concatenate
from ludwig.encoders.sequence_encoders import ParallelCNN
from ludwig.encoders.sequence_encoders import StackedCNN
from ludwig.encoders.sequence_encoders import StackedCNNRNN
from ludwig.encoders.sequence_encoders import StackedParallelCNN
from ludwig.encoders.sequence_encoders import StackedRNN
from ludwig.modules.fully_connected_modules import FCStack
from ludwig.modules.reduction_modules import SequenceReducer
from ludwig.utils.misc_utils import get_from_registry
from ludwig.utils.tf_utils import sequence_length_3D

logger = logging.getLogger(__name__)


class ConcatCombiner(tf.keras.Model):
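    """Combines the outputs of multiple encoders by concatenating them
    along the last dimension and, if fully connected layers are specified,
    passing the result through an FCStack.
    """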
def __init__(
self,
input_features=None,
fc_layers=None,
num_fc_layers=None,
fc_size=256,
use_bias=True,
weights_initializer='glorot_uniform',
bias_initializer='zeros',
weights_regularizer=None,
bias_regularizer=None,
activity_regularizer=None,
# weights_constraint=None,
# bias_constraint=None,
norm=None,
norm_params=None,
activation='relu',
dropout=0,
**kwargs
):
super().__init__()
logger.debug(' {}'.format(self.name))
self.fc_stack = None
# todo this may be redundant, check
if fc_layers is None and \
num_fc_layers is not None:
fc_layers = []
for i in range(num_fc_layers):
fc_layers.append({'fc_size': fc_size})
if fc_layers is not None:
logger.debug(' FCStack')
self.fc_stack = FCStack(
layers=fc_layers,
num_layers=num_fc_layers,
default_fc_size=fc_size,
default_use_bias=use_bias,
default_weights_initializer=weights_initializer,
default_bias_initializer=bias_initializer,
default_weights_regularizer=weights_regularizer,
default_bias_regularizer=bias_regularizer,
default_activity_regularizer=activity_regularizer,
# default_weights_constraint=weights_constraint,
# default_bias_constraint=bias_constraint,
default_norm=norm,
default_norm_params=norm_params,
default_activation=activation,
default_dropout=dropout,
)
if input_features and len(input_features) == 1 and fc_layers is None:
self.supports_masking = True

    def call(
self,
inputs, # encoder outputs
training=None,
mask=None,
**kwargs
):
encoder_outputs = [inputs[k]['encoder_output'] for k in inputs]
# ================ Concat ================
if len(encoder_outputs) > 1:
hidden = concatenate(encoder_outputs, 1)
else:
            hidden = encoder_outputs[0]
# ================ Fully Connected ================
if self.fc_stack is not None:
hidden = self.fc_stack(
hidden,
training=training,
mask=mask
)
return_data = {'combiner_output': hidden}
if len(inputs) == 1:
            for key, value in next(iter(inputs.values())).items():
if key != 'encoder_output':
return_data[key] = value
return return_data


class SequenceConcatCombiner(tf.keras.Model):
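    """Concatenates encoder outputs time step by time step along the hidden
    dimension of a main sequence feature: rank-3 (sequence) representations
    are concatenated directly, rank-2 representations are tiled across the
    time steps first. Padded time steps are masked out and the result is
    optionally reduced with a SequenceReducer.
    """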
def __init__(
self,
reduce_output=None,
main_sequence_feature=None,
**kwargs
):
super().__init__()
logger.debug(' {}'.format(self.name))
self.reduce_output = reduce_output
self.reduce_sequence = SequenceReducer(reduce_mode=reduce_output)
if self.reduce_output is None:
self.supports_masking = True
self.main_sequence_feature = main_sequence_feature

    def __call__(
self,
inputs, # encoder outputs
training=None,
mask=None,
**kwargs
):
if (self.main_sequence_feature is None or
self.main_sequence_feature not in inputs):
for if_name, if_outputs in inputs.items():
# todo: when https://github.com/uber/ludwig/issues/810 is closed
# convert following test from using shape to use explicit
# if_outputs['type'] values for sequence features
if len(if_outputs['encoder_output'].shape) == 3:
self.main_sequence_feature = if_name
break
if self.main_sequence_feature is None:
raise Exception(
'No sequence feature available for sequence combiner'
)
main_sequence_feature_encoding = inputs[self.main_sequence_feature]
representation = main_sequence_feature_encoding['encoder_output']
representations = [representation]
sequence_max_length = representation.shape[1]
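        # per-example lengths of the main sequence feature (number of
        # non-padding time steps), used below to mask the concatenation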
sequence_length = sequence_length_3D(representation)
# ================ Concat ================
for if_name, if_outputs in inputs.items():
if if_name != self.main_sequence_feature:
if_representation = if_outputs['encoder_output']
if len(if_representation.shape) == 3:
# The following check makes sense when
# both representations have a specified
# sequence length dimension. If they do not,
# then this check is simply checking if None == None
# and will not catch discrepancies in the different
# feature length dimension. Those errors will show up
# at training time. Possible solutions to this is
# to enforce a length second dimension in
# sequential feature placeholders, but that
# does not work with BucketedBatcher that requires
# the second dimension to be undefined in order to be
# able to trim the data points and speed up computation.
# So for now we are keeping things like this, make sure
# to write in the documentation that training time
# dimensions mismatch may occur if the sequential
# features have different lengths for some data points.
if if_representation.shape[1] != representation.shape[1]:
raise ValueError(
'The sequence length of the input feature {} '
'is {} and is different from the sequence '
'length of the main sequence feature {} which '
'is {}.\n Shape of {}: {}, shape of {}: {}.\n'
'Sequence lengths of all sequential features '
'must be the same in order to be concatenated '
'by the sequence concat combiner. '
'Try to impose the same max sequence length '
'as a preprocessing parameter to both features '
'or to reduce the output of {}.'.format(
if_name,
if_representation.shape[1],
self.main_sequence_feature,
representation.shape[1],
if_name,
if_representation.shape,
                                self.main_sequence_feature,
representation.shape,
if_name
)
)
# this assumes all sequence representations have the
# same sequence length, 2nd dimension
representations.append(if_representation)
elif len(if_representation.shape) == 2:
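                    # tile the rank-2 representation across the time
                    # dimension so it can be concatenated with the
                    # rank-3 sequence representations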
multipliers = tf.constant([1, sequence_max_length, 1])
tiled_representation = tf.tile(
tf.expand_dims(if_representation, 1),
multipliers
)
representations.append(tiled_representation)
else:
raise ValueError(
'The representation of {} has rank {} and cannot be'
' concatenated by a sequence concat combiner. '
'Only rank 2 and rank 3 tensors are supported.'.format(
                            if_name,
len(if_representation.shape)
)
)
hidden = tf.concat(representations, 2)
logger.debug(' concat_hidden: {0}'.format(hidden))
# ================ Mask ================
# todo future: maybe modify this with TF2 mask mechanics
sequence_mask = tf.sequence_mask(
sequence_length,
sequence_max_length
)
hidden = tf.multiply(
hidden,
tf.cast(tf.expand_dims(sequence_mask, -1), dtype=tf.float32)
)
# ================ Reduce ================
hidden = self.reduce_sequence(hidden)
return_data = {'combiner_output': hidden}
if len(inputs) == 1:
            for key, value in next(iter(inputs.values())).items():
if key != 'encoder_output':
return_data[key] = value
return return_data


class SequenceCombiner(tf.keras.Model):
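    """Combines encoder outputs with a SequenceConcatCombiner (without
    reducing) and passes the resulting sequence through a sequence encoder
    selected from sequence_encoder_registry.
    """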
def __init__(
self,
reduce_output=None,
main_sequence_feature=None,
encoder=None,
**kwargs
):
super().__init__()
logger.debug(' {}'.format(self.name))
self.combiner = SequenceConcatCombiner(
reduce_output=None,
main_sequence_feature=main_sequence_feature
)
self.encoder_obj = get_from_registry(
encoder, sequence_encoder_registry)(
should_embed=False,
reduce_output=reduce_output,
**kwargs
)
if (hasattr(self.encoder_obj, 'supports_masking') and
self.encoder_obj.supports_masking):
self.supports_masking = True

    def __call__(
self,
inputs, # encoder outputs
training=None,
mask=None,
**kwargs
):
# ================ Concat ================
hidden = self.combiner(
inputs, # encoder outputs
training=training,
**kwargs
)
# ================ Sequence encoding ================
hidden = self.encoder_obj(
hidden['combiner_output'],
training=training,
**kwargs
)
return_data = {'combiner_output': hidden['encoder_output']}
for key, value in hidden.items():
if key != 'encoder_output':
return_data[key] = value
return return_data


def get_combiner_class(combiner_type):
return get_from_registry(
combiner_type,
combiner_registry
)


combiner_registry = {
'concat': ConcatCombiner,
'sequence_concat': SequenceConcatCombiner,
'sequence': SequenceCombiner
}


sequence_encoder_registry = {
'stacked_cnn': StackedCNN,
'parallel_cnn': ParallelCNN,
'stacked_parallel_cnn': StackedParallelCNN,
'rnn': StackedRNN,
'cnnrnn': StackedCNNRNN
}
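

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the module). The
# feature names and tensor shapes below are assumptions made for the example;
# combiners are looked up by name and called on a dict of encoder outputs.
#
#   combiner_cls = get_combiner_class('concat')
#   combiner = combiner_cls(num_fc_layers=1, fc_size=64)
#   encoder_outputs = {
#       'text': {'encoder_output': tf.zeros([32, 128])},
#       'price': {'encoder_output': tf.zeros([32, 1])},
#   }
#   result = combiner(encoder_outputs)
#   # result['combiner_output'] has shape [32, 64]
# ---------------------------------------------------------------------------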