tensorflow_estimator/python/estimator/canned/linear_optimizer/python/utils/sdca_ops.py
def minimize(self, global_step=None, name=None):
"""Add operations to train a linear model by minimizing the loss function.
Args:
global_step: Optional `Variable` to increment by one after the variables
have been updated.
name: Optional name for the returned operation.
Returns:
An Operation that updates the variables passed in the constructor.
"""
# Technically, the op depends on a lot more than the variables,
# but we'll keep the list short.
with name_scope(name, 'sdca/minimize'):
sparse_example_indices = []
sparse_feature_indices = []
sparse_features_values = []
for sf in self._examples['sparse_features']:
sparse_example_indices.append(sf.example_indices)
sparse_feature_indices.append(sf.feature_indices)
# If feature values are missing, the SDCA op assumes a value of 1.0.
if sf.feature_values is not None:
sparse_features_values.append(sf.feature_values)
example_ids_hashed = tf.compat.v1.train.sdca_fprint(
internal_convert_to_tensor(self._examples['example_ids']))
example_state_data = self._hashtable.lookup(example_ids_hashed)
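# SDCA keeps per-example solver state (e.g. the dual variables) in a hash
# table keyed by a fingerprint of the example ids; look up the current state
# for this batch so the solver op can read and update it.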
# Solver returns example_state_update, new delta sparse_feature_weights
# and delta dense_feature_weights.
sparse_weights = []
sparse_indices = []
# If we have partitioned variables, keep a few dictionaries of Tensors
# around that we need for the assign_add after the op call to
# gen_sdca_ops.sdca_optimizer(). These are keyed because we may have a
# mix of partitioned and un-partitioned variables.
num_partitions_by_var = {}
p_assignments_by_var = {}
gather_ids_by_var = {}
for v_num, (w, i) in enumerate(
zip(self._slots['unshrunk_sparse_features_weights'],
sparse_feature_indices)):
# Append the sparse_indices (in full-variable space).
sparse_idx = tf.cast(
tf.unique(tf.cast(i, tf.dtypes.int32))[0], tf.dtypes.int64)
sparse_indices.append(sparse_idx)
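# `tf.unique` deduplicates the feature ids seen in this batch, so each
# weight row is gathered (and later updated) exactly once per step.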
if isinstance(w, (list, var_ops.PartitionedVariable)):
num_partitions = len(w)
flat_ids = tf.reshape(sparse_idx, [-1])
# We use div partitioning, which is easiest to support downstream.
# Compute num_total_ids as the sum of dim-0 of w, then assign
# to partitions based on a constant number of ids per partition.
# Optimize if we already know the full shape statically.
dim_0_size = self._get_first_dimension_size_statically(
w, num_partitions)
if tf.compat.dimension_value(dim_0_size):
num_total_ids = tf.constant(
tf.compat.dimension_value(dim_0_size), flat_ids.dtype)
else:
dim_0_sizes = []
for p in range(num_partitions):
if tf.compat.dimension_value(w[p].shape[0]) is not None:
dim_0_sizes.append(tf.compat.dimension_value(w[p].shape[0]))
else:
with ops.colocate_with(w[p]):
dim_0_sizes.append(tf.compat.v1.shape(w[p])[0])
num_total_ids = tf.math.reduce_sum(
tf.cast(tf.stack(dim_0_sizes), flat_ids.dtype))
ids_per_partition = num_total_ids // num_partitions
extras = num_total_ids % num_partitions
p_assignments = tf.math.maximum(flat_ids // (ids_per_partition + 1),
(flat_ids - extras) //
ids_per_partition)
# Emulate a conditional using a boolean indicator tensor
new_ids = tf.where(p_assignments < extras,
flat_ids % (ids_per_partition + 1),
(flat_ids - extras) % ids_per_partition)
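# Worked example of the div-partitioning arithmetic above (illustrative
# numbers, not taken from the code): with num_total_ids = 10 and
# num_partitions = 3 we get ids_per_partition = 3 and extras = 1, so
# partition 0 owns ids 0-3, partition 1 owns 4-6 and partition 2 owns 7-9.
# For flat id 5:
#   p_assignment = max(5 // 4, (5 - 1) // 3) = 1   -> partition 1
#   new_id       = (5 - 1) % 3 = 1                 -> row 1 of partition 1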
# Cast partition assignments to int32 for use in dynamic_partition.
# There really should not be more than 2^32 partitions.
p_assignments = tf.cast(p_assignments, tf.dtypes.int32)
# Partition list of ids based on assignments into num_partitions
# separate lists.
gather_ids = tf.dynamic_partition(new_ids, p_assignments,
num_partitions)
# Add these into the dictionaries for use in the later update.
num_partitions_by_var[v_num] = num_partitions
p_assignments_by_var[v_num] = p_assignments
gather_ids_by_var[v_num] = gather_ids
# Gather the weights from each partition.
partition_gathered_weights = []
for p in range(num_partitions):
with ops.colocate_with(w[p]):
partition_gathered_weights.append(
tf.compat.v1.gather(w[p], gather_ids[p]))
# Stitch the weights back together in the same order they were in before we
# split them with dynamic_partition.
condition_indices = tf.dynamic_partition(
tf.range(tf.compat.v1.shape(new_ids)[0]), p_assignments,
num_partitions)
batch_gathered_weights = tf.dynamic_stitch(
condition_indices, partition_gathered_weights)
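# `condition_indices` partitions the original positions 0..n-1 the same way
# the ids were partitioned, so dynamic_stitch puts the gathered rows back in
# the order of `new_ids` (and therefore of `sparse_idx`): row k of
# batch_gathered_weights holds the current weight for sparse_idx[k].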
else:
w_as_tensor = internal_convert_to_tensor(w)
with tf.compat.v1.device(w_as_tensor.device):
batch_gathered_weights = tf.compat.v1.gather(
w_as_tensor, sparse_idx)
sparse_weights.append(batch_gathered_weights)
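# Dispatch between the two SDCA kernels: once the forward-compatibility
# window for 2018-10-30 has passed, the V2 op is used; otherwise we fall
# back to the original op.  Note that the legacy op's keyword argument is
# spelled `adaptative` in its op definition, while the V2 op uses `adaptive`.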
if tf.compat.forward_compatible(year=2018, month=10, day=30):
esu, sfw, dfw = gen_sdca_ops.sdca_optimizer_v2(
sparse_example_indices,
sparse_feature_indices,
sparse_features_values,
self._convert_n_to_tensor(self._examples['dense_features']),
internal_convert_to_tensor(self._examples['example_weights']),
internal_convert_to_tensor(self._examples['example_labels']),
sparse_indices,
sparse_weights,
self._convert_n_to_tensor(
self._slots['unshrunk_dense_features_weights']),
example_state_data,
loss_type=self._options['loss_type'],
l1=self._symmetric_l1_regularization(),
l2=self._symmetric_l2_regularization(),
num_loss_partitions=self._num_loss_partitions(),
num_inner_iterations=1,
adaptive=self._adaptive())
else:
esu, sfw, dfw = tf.compat.v1.train.sdca_optimizer(
sparse_example_indices,
sparse_feature_indices,
sparse_features_values,
self._convert_n_to_tensor(self._examples['dense_features']),
internal_convert_to_tensor(self._examples['example_weights']),
internal_convert_to_tensor(self._examples['example_labels']),
sparse_indices,
sparse_weights,
self._convert_n_to_tensor(
self._slots['unshrunk_dense_features_weights']),
example_state_data,
loss_type=self._options['loss_type'],
l1=self._symmetric_l1_regularization(),
l2=self._symmetric_l2_regularization(),
num_loss_partitions=self._num_loss_partitions(),
num_inner_iterations=1,
adaptative=self._adaptive())
with tf.control_dependencies([esu]):
update_ops = [self._hashtable.insert(example_ids_hashed, esu)]
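# Every op created below is under the control dependency on `esu`, so the
# new example state is written and the weight deltas are applied only after
# the solver op has produced its results.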
# Update the weights before the proximal step.
for v_num, (w, i, u) in enumerate(
zip(self._slots['unshrunk_sparse_features_weights'], sparse_indices,
sfw)):
if isinstance(w, (var_ops.PartitionedVariable, list)):
update_ops += self._get_partitioned_update_ops(
v_num, num_partitions_by_var, p_assignments_by_var,
gather_ids_by_var, w, u, p_assignments, num_partitions)
else:
update_ops.append(tf.compat.v1.scatter_add(w, i, u))
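# Apply the delta dense-feature weights.  For partitioned dense variables
# the full-variable delta is split along dimension 0 to match each
# partition's size; unpartitioned variables take the whole delta directly.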
for w, u in zip(self._slots['unshrunk_dense_features_weights'], dfw):
if isinstance(w, (var_ops.PartitionedVariable, list)):
split_updates = tf.split(
u, num_or_size_splits=[v.shape.as_list()[0] for v in w])
for v, split_update in zip(w, split_updates):
update_ops.append(tf.compat.v1.assign_add(v, split_update))
else:
update_ops.append(tf.compat.v1.assign_add(w, u))
if global_step is None:
return tf.group(*update_ops)
with tf.control_dependencies(update_ops):
return tf.compat.v1.assign_add(global_step, 1, name=name).op
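# A minimal usage sketch (hedged: the constructor call below assumes the
# enclosing class is built as _SDCAModel(examples, variables, options), which
# is not shown in this excerpt; `examples` is the dict with the
# 'sparse_features', 'dense_features', 'example_weights', 'example_labels'
# and 'example_ids' entries referenced above):
#
#   model = _SDCAModel(examples, variables, options)
#   train_op = model.minimize(
#       global_step=tf.compat.v1.train.get_or_create_global_step())
#   with tf.compat.v1.Session() as sess:
#     sess.run(tf.compat.v1.global_variables_initializer())
#     for _ in range(num_steps):
#       sess.run(train_op)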