#
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import tensorflow as tf
from tensorflow.python.eager import context
# from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
# from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import gradients
from tensorflow.python.ops import resource_variable_ops
from tensorflow.python.ops import state_ops
from easy_rec.python.compat.dynamic_variable import DynamicVariable
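# Optimizer wrappers (following sparse_operation_kit's sok.experiment API) that
# let standard TensorFlow optimizers update DynamicVariable-based embeddings.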
def OptimizerWrapper(optimizer):
"""Abbreviated as ``sok.experiment.OptimizerWrapper``.
  This is a wrapper for a TensorFlow optimizer so that it can update
  ``dynamic_variable.DynamicVariable``.
Parameters
----------
optimizer: tensorflow optimizer
The original tensorflow optimizer.
Example
-------
.. code-block:: python
import numpy as np
import tensorflow as tf
import horovod.tensorflow as hvd
from sparse_operation_kit import experiment as sok
    v = sok.DynamicVariable(dimension=3, initializer="13")
indices = tf.convert_to_tensor([0, 1, 2**40], dtype=tf.int64)
with tf.GradientTape() as tape:
embedding = tf.nn.embedding_lookup(v, indices)
print("embedding:", embedding)
loss = tf.reduce_sum(embedding)
grads = tape.gradient(loss, [v])
optimizer = tf.keras.optimizers.SGD(learning_rate=1.0)
optimizer = sok.OptimizerWrapper(optimizer)
optimizer.apply_gradients(zip(grads, [v]))
embedding = tf.nn.embedding_lookup(v, indices)
print("embedding:", embedding)
"""
  # TF 2.11+ moved the original Keras optimizer to tf.keras.optimizers.legacy;
  # older TF versions do not have that module, hence the try/except.
try:
if isinstance(optimizer, tf.keras.optimizers.legacy.Optimizer):
return OptimizerWrapperV2(optimizer)
except Exception:
pass
if isinstance(optimizer, tf.keras.optimizers.Optimizer):
return OptimizerWrapperV2(optimizer)
else:
return OptimizerWrapperV1(optimizer)
class OptimizerWrapperV1(object):
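  """Wrapper for TF1-style (``tf.train``) optimizers so that ``apply_gradients``
  can update ``DynamicVariable`` instances as well as ordinary dense variables.
  Normally obtained through ``OptimizerWrapper``.
  Example
  -------
  .. code-block:: python
    # Minimal graph-mode sketch; the AdamOptimizer and all values are
    # illustrative only.
    v = DynamicVariable(dimension=3, initializer='13')
    indices = tf.convert_to_tensor([0, 1, 2], dtype=tf.int64)
    loss = tf.reduce_sum(tf.nn.embedding_lookup(v, indices))
    global_step = tf.train.get_or_create_global_step()
    optimizer = OptimizerWrapperV1(tf.train.AdamOptimizer(learning_rate=0.1))
    # compute_gradients must be called on the wrapper (it records the loss),
    # and apply_gradients requires a non-None global_step.
    grads_and_vars = optimizer.compute_gradients(loss, var_list=[v])
    train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
  """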
def __init__(self, optimizer):
self._optimizer = optimizer
    # Create the wrapped optimizer's slots once against a throw-away variable
    # to capture a template initial value for each slot name; slots for
    # DynamicVariables are created lazily when gradients are applied.
unused = tf.Variable([0.0],
dtype=tf.float32,
name='unused',
trainable=False)
self._optimizer._create_slots([unused])
names, slots = [], []
for name in self._optimizer.get_slot_names():
names.append(name)
slots.append(self._optimizer.get_slot(unused, name))
unused_key = self._var_key(unused)
for name in names:
assert unused_key in self._optimizer._slots[name]
self._optimizer._slots[name].pop(unused_key)
self._initial_vals = {}
for i, name in enumerate(names):
self._initial_vals[name] = slots[i]
# self._optimizer._prepare()
def compute_gradients(self,
loss,
var_list=None,
aggregation_method=None,
colocate_gradients_with_ops=False,
grad_loss=None):
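    # Record the loss so that apply_gradients can add a control dependency on
    # it, and compute plain gradients here: the wrapped optimizer's
    # compute_gradients does not handle DynamicVariable (see the note below).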
self._loss = loss
tmp_grads = gradients.gradients(loss, var_list)
return list(zip(tmp_grads, var_list))
# TODO: the following routine does not work with DynamicVariable
# return self._optimizer.compute_gradients(loss=loss, var_list=var_list,
# # gate_gradients=gate_gradients,
# aggregation_method=aggregation_method,
# colocate_gradients_with_ops=colocate_gradients_with_ops,
# grad_loss=grad_loss)
def _var_key(self, var):
if isinstance(var, DynamicVariable):
return (var._tf_handle.op.graph, var._tf_handle.op.name)
else:
return (var.op.graph, var.op.name)
  def _create_slots(self, vars):
    for var in vars:
      if isinstance(var, DynamicVariable):
        self._create_slots_dynamic(var)
      else:
        # The wrapped TF1 optimizer expects a list of variables.
        self._optimizer._create_slots([var])
def _create_slots_dynamic(self, var):
key = self._var_key(var)
for slot_name in self._initial_vals:
if key not in self._optimizer._slots[slot_name]:
if var.backend_type == 'hbm':
with ops.colocate_with(var):
slot = DynamicVariable(
dimension=var.dimension,
initializer=self._initial_vals[slot_name],
name='DynamicSlot',
trainable=False)
else:
tmp_config = var.config_dict
# tmp_initializer = var.initializer_str
with ops.colocate_with(var):
slot = DynamicVariable(
dimension=var.dimension,
initializer=self._initial_vals[slot_name],
var_type=var.backend_type,
name='DynamicSlot',
trainable=False,
**tmp_config)
self._optimizer._slots[slot_name][key] = slot
def get_slot_names(self):
return self._optimizer.get_slot_names()
def get_slot(self, var, slot_name):
key = self._var_key(var)
return self._optimizer._slots[slot_name][key]
@property
def _slots(self):
return self._optimizer._slots
def apply_gradients(self, grads_and_vars, global_step=None, name=None):
    sparse_vars = [
        x for x in grads_and_vars if 'DynamicVariable' in str(type(x[1]))
    ]
    dense_vars = [
        x for x in grads_and_vars if 'DynamicVariable' not in str(type(x[1]))
    ]
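    # Temporarily replace the wrapped optimizer's _finish with a pass-through
    # so the inner apply_gradients calls return their raw update ops; the
    # original _finish is applied once to the merged sparse + dense updates.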
def _dummy_finish(update_ops, name_scope):
return update_ops
finish_func = self._optimizer._finish
self._optimizer._finish = _dummy_finish
with ops.control_dependencies([array_ops.identity(self._loss)]):
sparse_grad_updates = self.apply_sparse_gradients(sparse_vars, name=name)
dense_grad_updates = self._optimizer.apply_gradients(
dense_vars, global_step=None, name=name)
if sparse_grad_updates is not None and dense_grad_updates is not None:
grad_updates = sparse_grad_updates + dense_grad_updates
elif sparse_grad_updates is not None:
grad_updates = sparse_grad_updates
elif dense_grad_updates is not None:
grad_updates = dense_grad_updates
assert global_step is not None
with ops.control_dependencies([finish_func(grad_updates, 'update')]):
with ops.colocate_with(global_step):
if isinstance(global_step, resource_variable_ops.BaseResourceVariable):
# TODO(apassos): the implicit read in assign_add is slow; consider
# making it less so.
apply_updates = resource_variable_ops.assign_add_variable_op(
global_step.handle,
ops.convert_to_tensor(1, dtype=global_step.dtype),
name=name)
else:
apply_updates = state_ops.assign_add(global_step, 1, name=name)
if not context.executing_eagerly():
if isinstance(apply_updates, ops.Tensor):
apply_updates = apply_updates.op
train_op = ops.get_collection_ref(ops.GraphKeys.TRAIN_OP)
if apply_updates not in train_op:
train_op.append(apply_updates)
return apply_updates
def apply_sparse_gradients(self, grads_and_vars, global_step=None, name=None):
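    # Staging scheme: for each DynamicVariable, copy the rows addressed by the
    # gradient into a static buffer (to_static), let the wrapped optimizer
    # update that buffer, then scatter the result back (to_dynamic).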
    # 1. Create slots and gather the rows addressed by the gradients into
    #    static buffers (to_static).
to_static_ops = []
grad_list, var_list = [], []
for g, v in grads_and_vars:
if g is not None:
unique, indices = tf.unique(g.indices)
grad_list.append(ops.IndexedSlices(g.values, indices, g.dense_shape))
# TODO: Check multi-thread safety of DET
with tf.control_dependencies([g.values]):
to_static_ops.append(v.to_static(unique, False))
var_list.append(v)
key = self._var_key(v)
for slot_name in self._initial_vals:
if key not in self._optimizer._slots[slot_name]:
tmp_slot_var_name = v._dummy_handle.op.name + '/' + self._optimizer._name
if v.backend_type == 'hbm':
with ops.colocate_with(v):
slot = DynamicVariable(
dimension=v.dimension,
initializer=self._initial_vals[slot_name],
name=tmp_slot_var_name,
trainable=False,
)
else:
tmp_config = v.config_dict
# tmp_initializer = v.initializer_str
with ops.colocate_with(v):
slot = DynamicVariable(
dimension=v.dimension,
initializer=self._initial_vals[slot_name],
var_type=v.backend_type,
name=tmp_slot_var_name,
trainable=False,
**tmp_config)
self._optimizer._slots[slot_name][key] = slot
else:
slot = self._optimizer._slots[slot_name][key]
to_static_ops.append(slot.to_static(unique))
if len(grad_list) == 0:
return
    # 2. Run the wrapped tf-optimizer on the static buffers.
with ops.control_dependencies(to_static_ops):
train_op = self._optimizer.apply_gradients(
zip(grad_list, var_list), global_step=global_step, name=name)
    # 3. Write the updated buffers back to the dynamic variables (to_dynamic).
to_dynamic_ops = []
if not isinstance(train_op, list):
train_op = [train_op]
with ops.control_dependencies(train_op):
for v in var_list:
key = self._var_key(v)
to_dynamic_ops.append(v.to_dynamic())
for name in self._initial_vals:
slot = self._optimizer._slots[name][key]
to_dynamic_ops.append(slot.to_dynamic())
return to_dynamic_ops
class OptimizerWrapperV2(object):
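  """Wrapper for Keras optimizers (``tf.keras.optimizers``, including the
  ``legacy`` ones) so that ``apply_gradients`` can update ``DynamicVariable``
  instances. Normally obtained through ``OptimizerWrapper``; see its docstring
  for an eager-mode example based on ``tf.GradientTape``.
  """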
def __init__(self, optimizer):
self._optimizer = optimizer
    # Same trick as OptimizerWrapperV1: create the slots once for a throw-away
    # variable to capture a template initial value for each slot name.
if tf.__version__[0] == '1':
unused = tf.Variable([0.0],
name='unused',
trainable=False,
use_resource=True)
else:
unused = tf.Variable([0.0], name='unused', trainable=False)
self._optimizer._create_slots([unused])
names, slots = [], []
for name in self._optimizer.get_slot_names():
names.append(name)
slots.append(self._optimizer.get_slot(unused, name))
unused_key = self._var_key(unused)
if unused_key in self._optimizer._slots:
self._optimizer._slots.pop(unused_key)
self._initial_vals = {}
for i, name in enumerate(names):
self._initial_vals[name] = slots[i]
self._iterations = tf.Variable(0)
@property
def lr(self):
return self._optimizer.lr
  def _create_slots(self, vars):
    for tmp_var in vars:
      if isinstance(tmp_var, DynamicVariable):
        self._create_slots_dynamic(tmp_var)
      else:
        # The wrapped Keras optimizer expects a list of variables.
        self._optimizer._create_slots([tmp_var])
def _create_slots_dynamic(self, var):
key = self._var_key(var)
if key not in self._optimizer._slots:
self._optimizer._slots[key] = {}
for slot_name in self._initial_vals:
if slot_name not in self._optimizer._slots[key]:
if var.backend_type == 'hbm':
slot = DynamicVariable(
dimension=var.dimension,
initializer=self._initial_vals[slot_name],
name='DynamicSlot',
trainable=False,
)
else:
tmp_config = var.config_dict
# tmp_initializer = var.initializer_str
slot = DynamicVariable(
dimension=var.dimension,
initializer=self._initial_vals[slot_name],
var_type=var.backend_type,
name='DynamicSlot',
trainable=False,
**tmp_config)
self._optimizer._slots[key][slot_name] = slot
def _var_key(self, var):
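    # Mirrors keras OptimizerV2._var_key so that slot keys match the wrapped
    # optimizer's own bookkeeping.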
if hasattr(var, '_distributed_container'):
var = var._distributed_container()
if var._in_graph_mode:
return var._shared_name
return var._unique_id
def get_slot_names(self):
return self._optimizer.get_slot_names()
def get_slot(self, var, name):
return self._optimizer.get_slot(var, name)
@property
def _slots(self):
return self._optimizer._slots
def apply_gradients(self, grads_and_vars, global_step=None, name=None):
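    # Same staging scheme as OptimizerWrapperV1.apply_sparse_gradients, plus a
    # temporary swap of the optimizer's iteration counter (steps 2 and 4).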
    # 1. Create slots and gather the rows addressed by the gradients into
    #    static buffers (to_static).
to_static_ops = []
grad_list, var_list = [], []
for g, v in grads_and_vars:
if g is not None:
unique, indices = tf.unique(g.indices)
grad_list.append(ops.IndexedSlices(g.values, indices, g.dense_shape))
# TODO: Check multi-thread safety of DET
# with tf.control_dependencies([g.values]):
to_static_ops.append(v.to_static(unique))
var_list.append(v)
key = self._var_key(v)
if key not in self._optimizer._slots:
self._optimizer._slots[key] = {}
for slot_name in self._initial_vals:
if slot_name not in self._optimizer._slots[key]:
if v.backend_type == 'hbm':
slot = DynamicVariable(
dimension=v.dimension,
initializer=self._initial_vals[slot_name],
name='DynamicSlot',
trainable=False,
)
else:
tmp_config = v.config_dict
# tmp_initializer = v.initializer_str
slot = DynamicVariable(
dimension=v.dimension,
initializer=self._initial_vals[slot_name],
var_type=v.backend_type,
name='DynamicSlot',
trainable=False,
**tmp_config)
self._optimizer._slots[key][slot_name] = slot
else:
slot = self._optimizer._slots[key][slot_name]
to_static_ops.append(slot.to_static(unique))
if len(grad_list) == 0:
return
    # 2. Temporarily swap in a private iteration counter so this call does not
    #    advance the wrapped optimizer's global `iterations`.
iterations = self._optimizer._iterations
self._optimizer._iterations = self._iterations
# 3. Call tf-optimizer
with tf.control_dependencies(to_static_ops):
train_op = self._optimizer.apply_gradients(
zip(grad_list, var_list), name=name)
    # 4. Restore the wrapped optimizer's own iteration counter.
self._optimizer._iterations = iterations
# 5. Write buffer back to dynamic variables
to_dynamic_ops = []
with tf.control_dependencies([train_op]):
for v in var_list:
key = self._var_key(v)
to_dynamic_ops.append(v.to_dynamic())
for name in self._initial_vals:
slot = self._optimizer._slots[key][name]
to_dynamic_ops.append(slot.to_dynamic())
return tf.group(to_dynamic_ops)
class SGD(object):
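  """Minimal SGD optimizer for ``DynamicVariable``.
  Applies ``var -= lr * grad`` through ``scatter_sub`` on ``IndexedSlices``
  gradients and creates no slot variables.
  """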
def __init__(self, lr):
self._lr = tf.Variable(lr)
@property
def lr(self):
return self._lr
def apply_gradients(self, grads_and_vars, global_step=None, name=None):
train_ops = []
for g, v in grads_and_vars:
if g is not None:
scaled_g = ops.IndexedSlices(g.values * self._lr, g.indices,
g.dense_shape)
train_ops.append(v.scatter_sub(scaled_g))
return tf.group(train_ops)