easy_rec/python/layers/keras/numerical_embedding.py

# -*- encoding:utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
import logging
import math
import os

import tensorflow as tf
from tensorflow.python.framework import ops
from tensorflow.python.keras.layers import Layer

from easy_rec.python.compat.array_ops import repeat
from easy_rec.python.utils.activation import get_activation
from easy_rec.python.utils.tf_utils import get_ps_num_from_tf_config

curr_dir, _ = os.path.split(__file__)
parent_dir = os.path.dirname(curr_dir)
ops_idr = os.path.dirname(parent_dir)
ops_dir = os.path.join(ops_idr, 'ops')
if 'PAI' in tf.__version__:
  ops_dir = os.path.join(ops_dir, '1.12_pai')
elif tf.__version__.startswith('1.12'):
  ops_dir = os.path.join(ops_dir, '1.12')
elif tf.__version__.startswith('1.15'):
  if 'IS_ON_PAI' in os.environ:
    ops_dir = os.path.join(ops_dir, 'DeepRec')
  else:
    ops_dir = os.path.join(ops_dir, '1.15')
elif tf.__version__.startswith('2.12'):
  ops_dir = os.path.join(ops_dir, '2.12')

logging.info('ops_dir is %s' % ops_dir)
custom_op_path = os.path.join(ops_dir, 'libcustom_ops.so')
try:
  custom_ops = tf.load_op_library(custom_op_path)
  logging.info('load custom op from %s succeeded' % custom_op_path)
except Exception as ex:
  logging.warning('load custom op from %s failed: %s' %
                  (custom_op_path, str(ex)))
  custom_ops = None


class NLinear(Layer):
  """N linear layers for N token (feature) embeddings.

  To understand this module, first review `tf.layers.dense`. When
  `tf.layers.dense` is applied to three-dimensional inputs of the shape
  ``(batch_size, n_tokens, d_embedding)``, the same linear transformation is
  applied to each of the ``n_tokens`` token (feature) embeddings.

  By contrast, `NLinear` allocates one linear layer per token
  (``n_tokens`` layers in total). One such layer can be represented as
  ``tf.layers.dense(d_in, d_out)``. So, the i-th linear transformation is
  applied to the i-th token embedding, as illustrated in the following
  pseudocode::

      layers = [tf.layers.dense(d_in, d_out) for _ in range(n_tokens)]
      x = tf.random.normal([batch_size, n_tokens, d_in])
      result = tf.stack([layers[i](x[:, i]) for i in range(n_tokens)], 1)

  Examples:
      .. testcode::

          batch_size = 2
          n_features = 3
          d_embedding_in = 4
          d_embedding_out = 5
          x = tf.random.normal([batch_size, n_features, d_embedding_in])
          m = NLinear(n_features, d_embedding_in, d_embedding_out)
          assert m(x).shape == (batch_size, n_features, d_embedding_out)
  """

  def __init__(self,
               n_tokens,
               d_in,
               d_out,
               bias=True,
               name='nd_linear',
               **kwargs):
    """Init with input shapes.

    Args:
      n_tokens: the number of tokens (features)
      d_in: the input dimension
      d_out: the output dimension
      bias: indicates if the underlying linear layers have biases
      name: layer name
    """
    super(NLinear, self).__init__(name=name, **kwargs)
    self.weight = self.add_weight(
        'weights', [1, n_tokens, d_in, d_out], dtype=tf.float32)
    if bias:
      initializer = tf.constant_initializer(0.0)
      self.bias = self.add_weight(
          'bias', [1, n_tokens, d_out],
          dtype=tf.float32,
          initializer=initializer)
    else:
      self.bias = None

  def call(self, x, **kwargs):
    if x.shape.ndims != 3:
      raise ValueError(
          'The input must have three dimensions '
          '(batch_size, n_tokens, d_embedding)')
    if x.shape[2] != self.weight.shape[2]:
      raise ValueError('invalid input embedding dimension %d, expect %d' %
                       (int(x.shape[2]), int(self.weight.shape[2])))

    x = x[..., None] * self.weight  # [B, N, D, D_out]
    x = tf.reduce_sum(x, axis=-2)  # [B, N, D_out]
    if self.bias is not None:
      x = x + self.bias
    return x
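

# A minimal usage sketch for NLinear, assuming TF 2.x eager execution; the
# helper name below is illustrative only. It also shows that the
# broadcast-multiply-and-sum in NLinear.call is the same computation as one
# dense layer per token, written here as a single einsum over the same weights.
def _nlinear_usage_sketch():
  batch_size, n_tokens, d_in, d_out = 2, 3, 4, 5
  x = tf.random.normal([batch_size, n_tokens, d_in])
  layer = NLinear(n_tokens, d_in, d_out)
  y = layer(x)  # [batch_size, n_tokens, d_out]
  # per-token dense: layer.weight[0] has shape [n_tokens, d_in, d_out]
  y_ref = tf.einsum('bnd,ndo->bno', x, layer.weight[0]) + layer.bias
  assert y.shape == (batch_size, n_tokens, d_out)
  return y, y_ref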


class PeriodicEmbedding(Layer):
  """Periodic embeddings for numerical features described in [1].

  References:
    * [1] Yury Gorishniy, Ivan Rubachev, Artem Babenko,
      "On Embeddings for Numerical Features in Tabular Deep Learning", 2022
      https://arxiv.org/pdf/2203.05556.pdf

  Attributes:
    embedding_dim: the embedding size, must be an even positive integer.
    sigma: the scale of the weight initialization.
      **This is a super important parameter which significantly affects
      performance**. Its optimal value can be dramatically different for
      different datasets, so no "default value" can exist for this parameter,
      and it must be tuned for each dataset. In the original paper, during
      hyperparameter tuning, this parameter was sampled from the distribution
      ``LogUniform[1e-2, 1e2]``. A similar grid would be
      ``[1e-2, 1e-1, 1e0, 1e1, 1e2]``. If possible, add more intermediate
      values to this grid.
    output_3d_tensor: whether to output a 3d tensor
    output_tensor_list: whether to output the list of embeddings
  """

  def __init__(self, params, name='periodic_embedding', reuse=None, **kwargs):
    super(PeriodicEmbedding, self).__init__(name=name, **kwargs)
    self.reuse = reuse
    params.check_required(['embedding_dim', 'sigma'])
    self.embedding_dim = int(params.embedding_dim)
    if self.embedding_dim % 2:
      raise ValueError('embedding_dim must be even')
    sigma = params.sigma
    self.initializer = tf.random_normal_initializer(stddev=sigma)
    self.add_linear_layer = params.get_or_default('add_linear_layer', True)
    self.linear_activation = params.get_or_default('linear_activation', 'relu')
    self.output_tensor_list = params.get_or_default('output_tensor_list', False)
    self.output_3d_tensor = params.get_or_default('output_3d_tensor', False)

  def build(self, input_shape):
    if input_shape.ndims != 2:
      raise ValueError('inputs of PeriodicEmbedding must have 2 dimensions.')
    self.num_features = int(input_shape[-1])
    num_ps = get_ps_num_from_tf_config()
    partitioner = None
    if num_ps > 0:
      partitioner = tf.fixed_size_partitioner(num_shards=num_ps)
    emb_dim = self.embedding_dim // 2
    self.coef = self.add_weight(
        'coefficients',
        shape=[1, self.num_features, emb_dim],
        partitioner=partitioner,
        initializer=self.initializer)
    if self.add_linear_layer:
      self.linear = NLinear(
          self.num_features,
          self.embedding_dim,
          self.embedding_dim,
          name='nd_linear')
    super(PeriodicEmbedding, self).build(input_shape)

  def call(self, inputs, **kwargs):
    features = inputs[..., None]  # [B, N, 1]
    v = 2 * math.pi * self.coef * features  # [B, N, E]
    emb = tf.concat([tf.sin(v), tf.cos(v)], axis=-1)  # [B, N, 2E]
    dim = self.embedding_dim
    if self.add_linear_layer:
      emb = self.linear(emb)
      act = get_activation(self.linear_activation)
      if callable(act):
        emb = act(emb)
    output = tf.reshape(emb, [-1, self.num_features * dim])
    if self.output_tensor_list:
      return output, tf.unstack(emb, axis=1)
    if self.output_3d_tensor:
      return output, emb
    return output
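

# A minimal sketch of the periodic encoding computed in PeriodicEmbedding.call,
# assuming TF 2.x eager execution; the helper name, feature values and
# frequencies below are illustrative only. For a scalar feature x and learned
# frequencies c, the pre-linear embedding is concat(sin(2*pi*c*x), cos(2*pi*c*x)).
def _periodic_encoding_sketch():
  x = tf.constant([[0.25], [1.0]])  # [B=2, N=1] numerical features
  c = tf.constant([[[1.0, 2.0]]])  # [1, N=1, E=2] frequencies
  v = 2 * math.pi * c * x[..., None]  # [B, N, E]
  return tf.concat([tf.sin(v), tf.cos(v)], axis=-1)  # [B, N, 2E]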


class AutoDisEmbedding(Layer):
  """An Embedding Learning Framework for Numerical Features in CTR Prediction.

  Refer: https://arxiv.org/pdf/2012.08986v2.pdf
  """

  def __init__(self, params, name='auto_dis_embedding', reuse=None, **kwargs):
    super(AutoDisEmbedding, self).__init__(name=name, **kwargs)
    self.reuse = reuse
    params.check_required(['embedding_dim', 'num_bins', 'temperature'])
    self.emb_dim = int(params.embedding_dim)
    self.num_bins = int(params.num_bins)
    self.temperature = params.temperature
    self.keep_prob = params.get_or_default('keep_prob', 0.8)
    self.output_tensor_list = params.get_or_default('output_tensor_list', False)
    self.output_3d_tensor = params.get_or_default('output_3d_tensor', False)

  def build(self, input_shape):
    if input_shape.ndims != 2:
      raise ValueError('inputs of AutoDisEmbedding must have 2 dimensions.')
    self.num_features = int(input_shape[-1])
    num_ps = get_ps_num_from_tf_config()
    partitioner = None
    if num_ps > 0:
      partitioner = tf.fixed_size_partitioner(num_shards=num_ps)
    self.meta_emb = self.add_weight(
        'meta_embedding',
        shape=[self.num_features, self.num_bins, self.emb_dim],
        partitioner=partitioner)
    self.proj_w = self.add_weight(
        'project_w',
        shape=[1, self.num_features, self.num_bins],
        partitioner=partitioner)
    self.proj_mat = self.add_weight(
        'project_mat',
        shape=[self.num_features, self.num_bins, self.num_bins],
        partitioner=partitioner)
    super(AutoDisEmbedding, self).build(input_shape)

  def call(self, inputs, **kwargs):
    x = tf.expand_dims(inputs, axis=-1)  # [B, N, 1]
    hidden = tf.nn.leaky_relu(self.proj_w * x)  # [B, N, num_bin]
    # matmul in older tf (1.12) does not support broadcasting, so use einsum:
    # y = tf.matmul(mat, hidden[..., None])  # [B, N, num_bin, 1]
    # y = tf.squeeze(y, axis=3)  # [B, N, num_bin]
    y = tf.einsum('nik,bnk->bni', self.proj_mat, hidden)  # [B, N, num_bin]
    # keep_prob is used as the scale (alpha) of the skip connection on `hidden`
    alpha = self.keep_prob
    x_bar = y + alpha * hidden  # [B, N, num_bin]
    x_hat = tf.nn.softmax(x_bar / self.temperature)  # [B, N, num_bin]
    # emb = tf.matmul(x_hat[:, :, None, :], meta_emb)  # [B, N, 1, D]
    # emb = tf.squeeze(emb, axis=2)  # [B, N, D]
    emb = tf.einsum('bnk,nkd->bnd', x_hat, self.meta_emb)
    output = tf.reshape(emb, [-1, self.emb_dim * self.num_features])  # [B, N*D]
    if self.output_tensor_list:
      return output, tf.unstack(emb, axis=1)
    if self.output_3d_tensor:
      return output, emb
    return output
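

# A minimal sketch of the AutoDis soft-binning step, assuming TF 2.x eager
# execution; the helper name and all tensor values below are illustrative only.
# A scalar feature is projected onto num_bins logits, combined with a scaled
# skip connection, turned into a soft bin assignment by a temperature softmax,
# and used to aggregate the per-bin meta embeddings.
def _auto_dis_sketch():
  num_bins, emb_dim, temperature, alpha = 4, 3, 0.5, 0.8
  x = tf.constant([[1.5]])  # [B=1, N=1]
  proj_w = tf.ones([1, 1, num_bins])  # [1, N, num_bins]
  proj_mat = tf.eye(num_bins)[None, ...]  # [N, num_bins, num_bins]
  meta_emb = tf.random.normal([1, num_bins, emb_dim])  # [N, num_bins, emb_dim]
  hidden = tf.nn.leaky_relu(proj_w * x[..., None])  # [B, N, num_bins]
  x_bar = tf.einsum('nik,bnk->bni', proj_mat, hidden) + alpha * hidden
  x_hat = tf.nn.softmax(x_bar / temperature)  # soft bin assignment
  return tf.einsum('bnk,nkd->bnd', x_hat, meta_emb)  # [B, N, emb_dim]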


class NaryDisEmbedding(Layer):
  """Numerical Feature Representation with Hybrid N-ary Encoding, CIKM 2022.

  Refer: https://dl.acm.org/doi/pdf/10.1145/3511808.3557090
  """

  def __init__(self, params, name='nary_dis_embedding', reuse=None, **kwargs):
    super(NaryDisEmbedding, self).__init__(name=name, **kwargs)
    self.reuse = reuse
    self.nary_carry = custom_ops.nary_carry
    params.check_required(['embedding_dim', 'carries'])
    self.emb_dim = int(params.embedding_dim)
    self.carries = params.get_or_default('carries', [2, 9])
    self.num_replicas = params.get_or_default('num_replicas', 1)
    assert self.num_replicas >= 1, 'num_replicas must be >= 1'
    self.lengths = list(map(self.max_length, self.carries))
    self.vocab_size = int(sum(self.lengths))
    self.multiplier = params.get_or_default('multiplier', 1.0)
    self.intra_ary_pooling = params.get_or_default('intra_ary_pooling', 'sum')
    self.output_3d_tensor = params.get_or_default('output_3d_tensor', False)
    self.output_tensor_list = params.get_or_default('output_tensor_list', False)
    logging.info(
        '{} carries: {}, lengths: {}, vocab_size: {}, intra_ary: {}, '
        'replicas: {}, multiplier: {}'.format(
            self.name, ','.join(map(str, self.carries)),
            ','.join(map(str, self.lengths)), self.vocab_size,
            self.intra_ary_pooling, self.num_replicas, self.multiplier))

  @staticmethod
  def max_length(carry):
    # number of base-`carry` digits needed to encode a uint32, times `carry`
    # possible digit values: the vocabulary size contributed by this carry
    bits = math.log(4294967295, carry)
    return (math.floor(bits) + 1) * carry

  def build(self, input_shape):
    assert isinstance(input_shape,
                      tf.TensorShape), 'NaryDisEmbedding only takes 1 input'
    self.num_features = int(input_shape[-1])
    logging.info('%s has %d input features', self.name, self.num_features)
    vocab_size = self.num_features * self.vocab_size
    emb_dim = self.emb_dim * self.num_replicas
    num_ps = get_ps_num_from_tf_config()
    partitioner = None
    if num_ps > 0:
      partitioner = tf.fixed_size_partitioner(num_shards=num_ps)
    self.embedding_table = self.add_weight(
        'embed_table', shape=[vocab_size, emb_dim], partitioner=partitioner)
    super(NaryDisEmbedding, self).build(input_shape)

  def call(self, inputs, **kwargs):
    if inputs.shape.ndims != 2:
      raise ValueError('inputs of NaryDisEmbedding must have 2 dimensions.')
    if self.multiplier != 1.0:
      inputs *= self.multiplier
    inputs = tf.to_int32(inputs)
    offset, emb_indices, emb_splits = 0, [], []
    with ops.device('/CPU:0'):
      for carry, length in zip(self.carries, self.lengths):
        values, splits = self.nary_carry(inputs, carry=carry, offset=offset)
        offset += length
        emb_indices.append(values)
        emb_splits.append(splits)
    indices = tf.concat(emb_indices, axis=0)
    splits = tf.concat(emb_splits, axis=0)
    # embedding shape: [B*N*C, D]
    embedding = tf.nn.embedding_lookup(self.embedding_table, indices)
    total_length = tf.size(splits)
    if self.intra_ary_pooling == 'sum':
      if tf.__version__ >= '2.0':
        segment_ids = tf.repeat(tf.range(total_length), repeats=splits)
      else:
        segment_ids = repeat(tf.range(total_length), repeats=splits)
      embedding = tf.math.segment_sum(embedding, segment_ids)
    elif self.intra_ary_pooling == 'mean':
      if tf.__version__ >= '2.0':
        segment_ids = tf.repeat(tf.range(total_length), repeats=splits)
      else:
        segment_ids = repeat(tf.range(total_length), repeats=splits)
      embedding = tf.math.segment_mean(embedding, segment_ids)
    else:
      raise ValueError('Unsupported intra ary pooling method %s' %
                       self.intra_ary_pooling)
    # B: batch size
    # N: num features
    # C: num carries
    # D: embedding dimension
    # R: num replicas
    # shape of embedding: [B*N*C, R*D]
    N = self.num_features
    C = len(self.carries)
    D = self.emb_dim
    if self.num_replicas == 1:
      embedding = tf.reshape(embedding, [C, -1, D])  # [C, B*N, D]
      embedding = tf.transpose(embedding, perm=[1, 0, 2])  # [B*N, C, D]
      embedding = tf.reshape(embedding, [-1, C * D])  # [B*N, C*D]
      output = tf.reshape(embedding, [-1, N * C * D])  # [B, N*C*D]
      if self.output_tensor_list:
        return output, tf.split(embedding, N)  # [B, C*D] * N
      if self.output_3d_tensor:
        embedding = tf.reshape(embedding, [-1, N, C * D])  # [B, N, C*D]
        return output, embedding
      return output

    # self.num_replicas > 1:
    replicas = tf.split(embedding, self.num_replicas, axis=1)
    outputs = []
    outputs2 = []
    for replica in replicas:
      # shape of replica: [B*N*C, D]
      embedding = tf.reshape(replica, [C, -1, D])  # [C, B*N, D]
      embedding = tf.transpose(embedding, perm=[1, 0, 2])  # [B*N, C, D]
      embedding = tf.reshape(embedding, [-1, C * D])  # [B*N, C*D]
      output = tf.reshape(embedding, [-1, N * C * D])  # [B, N*C*D]
      outputs.append(output)
      if self.output_tensor_list:
        embedding = tf.split(embedding, N)  # [B, C*D] * N
        outputs2.append(embedding)
      elif self.output_3d_tensor:
        embedding = tf.reshape(embedding, [-1, N, C * D])  # [B, N, C*D]
        outputs2.append(embedding)
    return outputs + outputs2
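

# A minimal sketch of the base-`carry` decomposition that the nary_carry custom
# op performs on each integerized feature; the helper name and the exact id
# layout used by the op are assumptions, not part of the op's API. Each digit of
# the value, written in base `carry`, selects one row of the embedding table,
# and the per-digit embeddings are then pooled (sum or mean) back into a single
# vector per feature.
def _nary_digits_sketch(value=26, carry=3):
  digits = []
  while value > 0:
    digits.append(value % carry)
    value //= carry
  return digits or [0]  # 26 in base 3 -> [2, 2, 2], least significant first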