easy_rec/python/layers/keras/blocks.py

# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
"""Convenience blocks for building models."""
import logging

import tensorflow as tf
from tensorflow.python.keras.initializers import Constant
from tensorflow.python.keras.layers import Dense
from tensorflow.python.keras.layers import Dropout
from tensorflow.python.keras.layers import Lambda
from tensorflow.python.keras.layers import Layer

from easy_rec.python.layers.keras.activation import activation_layer
from easy_rec.python.layers.utils import Parameter
from easy_rec.python.utils.shape_utils import pad_or_truncate_sequence
from easy_rec.python.utils.tf_utils import add_elements_to_collection

if tf.__version__ >= '2.0':
  tf = tf.compat.v1


class MLP(Layer):
  """Sequential multi-layer perceptron (MLP) block.

  Attributes:
    units: Sequential list of layer sizes.
    use_bias: Whether to include a bias term.
    activation: Type of activation to use on all except the last layer.
    final_activation: Type of activation to use on the last layer.
    **kwargs: Extra args passed to the Keras Layer base class.
  """

  def __init__(self, params, name='mlp', reuse=None, **kwargs):
    super(MLP, self).__init__(name=name, **kwargs)
    self.layer_name = name  # used when adding this layer's output to model outputs
    params.check_required('hidden_units')
    use_bn = params.get_or_default('use_bn', True)
    use_final_bn = params.get_or_default('use_final_bn', True)
    use_bias = params.get_or_default('use_bias', False)
    use_final_bias = params.get_or_default('use_final_bias', False)
    dropout_rate = list(params.get_or_default('dropout_ratio', []))
    activation = params.get_or_default('activation', 'relu')
    initializer = params.get_or_default('initializer', 'he_uniform')
    final_activation = params.get_or_default('final_activation', None)
    use_bn_after_act = params.get_or_default('use_bn_after_activation', False)
    units = list(params.hidden_units)
    logging.info(
        'MLP(%s) units: %s, dropout: %r, activate=%s, use_bn=%r, final_bn=%r,'
        ' final_activate=%s, bias=%r, initializer=%s, bn_after_activation=%r' %
        (name, units, dropout_rate, activation, use_bn, use_final_bn,
         final_activation, use_bias, initializer, use_bn_after_act))
    assert len(units) > 0, 'MLP(%s) requires at least one hidden unit' % name
    self.reuse = reuse
    self.add_to_outputs = params.get_or_default('add_to_outputs', False)

    num_dropout = len(dropout_rate)
    self._sub_layers = []
    for i, num_units in enumerate(units[:-1]):
      name = 'layer_%d' % i
      drop_rate = dropout_rate[i] if i < num_dropout else 0.0
      self.add_rich_layer(num_units, use_bn, drop_rate, activation,
                          initializer, use_bias, use_bn_after_act, name,
                          params.l2_regularizer)

    n = len(units) - 1
    drop_rate = dropout_rate[n] if num_dropout > n else 0.0
    name = 'layer_%d' % n
    self.add_rich_layer(units[-1], use_final_bn, drop_rate, final_activation,
                        initializer, use_final_bias, use_bn_after_act, name,
                        params.l2_regularizer)

  def add_rich_layer(self,
                     num_units,
                     use_bn,
                     dropout_rate,
                     activation,
                     initializer,
                     use_bias,
                     use_bn_after_activation,
                     name,
                     l2_reg=None):
    act_layer = activation_layer(activation, name='%s/act' % name)
    if use_bn and not use_bn_after_activation:
      dense = Dense(
          units=num_units,
          use_bias=use_bias,
          kernel_initializer=initializer,
          kernel_regularizer=l2_reg,
          name='%s/dense' % name)
      self._sub_layers.append(dense)
      bn = tf.keras.layers.BatchNormalization(
          name='%s/bn' % name, trainable=True)
      self._sub_layers.append(bn)
      self._sub_layers.append(act_layer)
    else:
      dense = Dense(
          num_units,
          use_bias=use_bias,
          kernel_initializer=initializer,
          kernel_regularizer=l2_reg,
          name='%s/dense' % name)
      self._sub_layers.append(dense)
      self._sub_layers.append(act_layer)
      if use_bn and use_bn_after_activation:
        bn = tf.keras.layers.BatchNormalization(name='%s/bn' % name)
        self._sub_layers.append(bn)

    if 0.0 < dropout_rate < 1.0:
      dropout = Dropout(dropout_rate, name='%s/dropout' % name)
      self._sub_layers.append(dropout)
    elif dropout_rate >= 1.0:
      raise ValueError('invalid dropout_ratio: %.3f' % dropout_rate)

  def call(self, x, training=None, **kwargs):
    """Performs the forward computation of the block."""
    for layer in self._sub_layers:
      cls = layer.__class__.__name__
      if cls in ('Dropout', 'BatchNormalization', 'Dice'):
        x = layer(x, training=training)
        if cls in ('BatchNormalization', 'Dice') and training:
          add_elements_to_collection(layer.updates, tf.GraphKeys.UPDATE_OPS)
      else:
        x = layer(x)
    if self.add_to_outputs and 'prediction_dict' in kwargs:
      outputs = kwargs['prediction_dict']
      outputs[self.layer_name] = tf.squeeze(x, axis=1)
      logging.info('add `%s` to model outputs' % self.layer_name)
    return x
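
# Illustrative usage sketch (an assumption, not part of this module): an MLP
# block is configured through an MLP protobuf message from the model config;
# `mlp_pb` below stands for a hypothetical such message with `hidden_units`
# set, wrapped via Parameter.make_from_pb the same way Gate and TextCNN below
# wrap their sub-MLP configs.
#
#   mlp_cfg = Parameter.make_from_pb(mlp_pb)
#   mlp_cfg.l2_regularizer = None  # or a keras kernel regularizer
#   mlp = MLP(mlp_cfg, name='tower_mlp')
#   y = mlp(x, training=True)  # x: [batch_size, input_dim] float tensor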


class Highway(Layer):

  def __init__(self, params, name='highway', reuse=None, **kwargs):
    super(Highway, self).__init__(name=name, **kwargs)
    self.emb_size = params.get_or_default('emb_size', None)
    self.num_layers = params.get_or_default('num_layers', 1)
    self.activation = params.get_or_default('activation', 'relu')
    self.dropout_rate = params.get_or_default('dropout_rate', 0.0)
    self.init_gate_bias = params.get_or_default('init_gate_bias', -3.0)
    self.act_layer = activation_layer(self.activation)
    self.dropout_layer = Dropout(
        self.dropout_rate) if self.dropout_rate > 0.0 else None
    self.project_layer = None
    self.gate_bias_initializer = Constant(self.init_gate_bias)
    self.gates = []  # T
    self.transforms = []  # H
    self.multiply_layer = tf.keras.layers.Multiply()
    self.add_layer = tf.keras.layers.Add()

  def build(self, input_shape):
    dim = input_shape[-1]
    if self.emb_size is not None and dim != self.emb_size:
      self.project_layer = Dense(self.emb_size, name='input_projection')
      dim = self.emb_size
    self.carry_gate = Lambda(lambda x: 1.0 - x, output_shape=(dim,))
    for i in range(self.num_layers):
      gate = Dense(
          units=dim,
          bias_initializer=self.gate_bias_initializer,
          activation='sigmoid',
          name='gate_%d' % i)
      self.gates.append(gate)
      self.transforms.append(Dense(units=dim))

  def call(self, inputs, training=None, **kwargs):
    value = inputs
    if self.project_layer is not None:
      value = self.project_layer(inputs)
    for i in range(self.num_layers):
      gate = self.gates[i](value)
      transformed = self.act_layer(self.transforms[i](value))
      if self.dropout_layer is not None:
        transformed = self.dropout_layer(transformed, training=training)
      transformed_gated = self.multiply_layer([gate, transformed])
      identity_gated = self.multiply_layer([self.carry_gate(gate), value])
      value = self.add_layer([transformed_gated, identity_gated])
    return value


class Gate(Layer):
  """Weighted sum gate."""

  def __init__(self, params, name='gate', reuse=None, **kwargs):
    super(Gate, self).__init__(name=name, **kwargs)
    self.weight_index = params.get_or_default('weight_index', 0)
    if params.has_field('mlp'):
      mlp_cfg = Parameter.make_from_pb(params.mlp)
      mlp_cfg.l2_regularizer = params.l2_regularizer
      self.top_mlp = MLP(mlp_cfg, name='top_mlp')
    else:
      self.top_mlp = None

  def call(self, inputs, training=None, **kwargs):
    assert len(
        inputs
    ) > 1, 'input of Gate layer must be a list containing at least 2 elements'
    weights = inputs[self.weight_index]
    j = 0
    for i, x in enumerate(inputs):
      if i == self.weight_index:
        continue
      if j == 0:
        output = weights[:, j, None] * x
      else:
        output += weights[:, j, None] * x
      j += 1
    if self.top_mlp is not None:
      output = self.top_mlp(output, training=training)
    return output
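
# Illustrative usage sketch (an assumption, not part of this module): Gate
# computes a weighted sum of its non-weight inputs, where the tensor at
# `weight_index` (default 0) supplies one scalar weight per remaining input;
# `gate_cfg`, `logits` and the expert tensors below are hypothetical.
#
#   gate = Gate(gate_cfg)
#   weights = tf.nn.softmax(logits)  # [batch_size, 2]
#   out = gate([weights, expert_a, expert_b], training=True)
#   # equivalent to: weights[:, 0, None] * expert_a + weights[:, 1, None] * expert_b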


class TextCNN(Layer):
  """Text CNN Model.

  References
    - [Convolutional Neural Networks for Sentence Classification](https://arxiv.org/abs/1408.5882)
  """

  def __init__(self, params, name='text_cnn', reuse=None, **kwargs):
    super(TextCNN, self).__init__(name=name, **kwargs)
    self.config = params.get_pb_config()
    self.pad_seq_length = self.config.pad_sequence_length
    if self.pad_seq_length <= 0:
      logging.warning(
          'running TextCNN with pad_sequence_length <= 0, model predictions may be unstable')
    self.conv_layers = []
    self.pool_layer = tf.keras.layers.GlobalMaxPool1D()
    self.concat_layer = tf.keras.layers.Concatenate(axis=-1)
    for size, filters in zip(self.config.filter_sizes,
                             self.config.num_filters):
      conv = tf.keras.layers.Conv1D(
          filters=int(filters),
          kernel_size=int(size),
          activation=self.config.activation)
      self.conv_layers.append(conv)
    if self.config.HasField('mlp'):
      p = Parameter.make_from_pb(self.config.mlp)
      p.l2_regularizer = params.l2_regularizer
      self.mlp = MLP(p, name='mlp', reuse=reuse)
    else:
      self.mlp = None

  def call(self, inputs, training=None, **kwargs):
    """Input shape: 3D tensor with shape: `(batch_size, steps, input_dim)`."""
    assert isinstance(inputs, (list, tuple))
    assert len(inputs) >= 2
    seq_emb, seq_len = inputs[:2]
    if self.pad_seq_length > 0:
      seq_emb, seq_len = pad_or_truncate_sequence(seq_emb, seq_len,
                                                  self.pad_seq_length)
    pooled_outputs = []
    for layer in self.conv_layers:
      conv = layer(seq_emb)
      pooled = self.pool_layer(conv)
      pooled_outputs.append(pooled)
    net = self.concat_layer(pooled_outputs)
    if self.mlp is not None:
      output = self.mlp(net, training=training)
    else:
      output = net
    return output
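
# Illustrative usage sketch (an assumption, not part of this module): TextCNN
# expects a pair of inputs, a sequence embedding and its valid lengths. The
# fields read above (filter_sizes, num_filters, pad_sequence_length) come from
# the TextCNN protobuf config wrapped by params.get_pb_config(); `text_cnn_cfg`
# below is a hypothetical Parameter built from such a config.
#
#   text_cnn = TextCNN(text_cnn_cfg)
#   encoded = text_cnn([seq_emb, seq_len], training=True)
#   # seq_emb: [batch_size, steps, emb_dim] float tensor
#   # seq_len: [batch_size] int tensor of valid sequence lengths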