# tensorflow/tensorflow/python/keras/layers/normalization.py
def build(self, input_shape):
  input_shape = tensor_shape.TensorShape(input_shape)
  if not input_shape.ndims:
    raise ValueError('Input has undefined rank:', input_shape)
  ndims = len(input_shape)

  # Convert axis to list and resolve negatives
  if isinstance(self.axis, int):
    self.axis = [self.axis]

  for idx, x in enumerate(self.axis):
    if x < 0:
      self.axis[idx] = ndims + x

  # Validate axes
  for x in self.axis:
    if x < 0 or x >= ndims:
      raise ValueError('Invalid axis: %d' % x)
  if len(self.axis) != len(set(self.axis)):
    raise ValueError('Duplicate axis: %s' % self.axis)
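  # `virtual_batch_size` implements "ghost batch normalization": statistics
  # are computed over smaller virtual batches carved out of the real batch,
  # so the batch dimension (axis 0) must be left un-normalized.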
  if self.virtual_batch_size is not None:
    if self.virtual_batch_size <= 0:
      raise ValueError('virtual_batch_size must be a positive integer that '
                       'divides the true batch size of the input Tensor')
    # If using virtual batches, the first dimension must be the batch
    # dimension and cannot be the batch norm axis
    if 0 in self.axis:
      raise ValueError('When using virtual_batch_size, the batch dimension '
                       'must be 0 and thus axis cannot include 0')
    if self.adjustment is not None:
      raise ValueError('When using virtual_batch_size, adjustment cannot '
                       'be specified')
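  # Decide whether the fused batch norm kernel can be used. The fused op only
  # handles 4D inputs, so under V2 behavior `fused=None` resolves to True only
  # for 4D tensors, and an explicit `fused=True` on non-4D input is an error.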
  if self.fused in (None, True):
    # TODO(yaozhang): if input is not 4D, reshape it to 4D and reshape the
    # output back to its original shape accordingly.
    if self._USE_V2_BEHAVIOR:
      if self.fused is None:
        self.fused = (ndims == 4)
      elif self.fused and ndims != 4:
        raise ValueError('Batch normalization layers with fused=True only '
                         'support 4D input tensors.')
    else:
      assert self.fused is not None
      self.fused = (ndims == 4 and self._fused_can_be_used())
    # TODO(chrisying): fused batch norm is currently not supported for
    # multi-axis batch norm and by extension virtual batches. In some cases,
    # it might be possible to use fused batch norm but would require reshaping
    # the Tensor to 4D with the axis in 1 or 3 (preferred 1) which is
    # particularly tricky. A compromise might be to just support the most
    # common use case (turning 5D w/ virtual batch to NCHW)
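  # The fused kernel expects a layout string rather than an axis index:
  # axis 1 corresponds to channels-first (NCHW), axis 3 to channels-last (NHWC).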
  if self.fused:
    if self.axis == [1]:
      self._data_format = 'NCHW'
    elif self.axis == [3]:
      self._data_format = 'NHWC'
    else:
      raise ValueError('Unsupported axis, fused batch norm only supports '
                       'axis == [1] or axis == [3]')
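  # Every normalization axis must have a statically known size, since it
  # determines the shape of gamma/beta and the moving statistics below.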
  axis_to_dim = {x: input_shape.dims[x].value for x in self.axis}
  for x in axis_to_dim:
    if axis_to_dim[x] is None:
      raise ValueError('Input has undefined `axis` dimension. Input shape: ',
                       input_shape)
  self.input_spec = InputSpec(ndim=ndims, axes=axis_to_dim)
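  # Compute the common shape shared by all per-channel parameters
  # (gamma, beta, and the moving statistics).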
  if len(axis_to_dim) == 1 and self.virtual_batch_size is None:
    # Single axis batch norm (most common/default use-case)
    param_shape = (list(axis_to_dim.values())[0],)
  else:
    # Parameter shape is the original shape but with 1 in all non-axis dims
    param_shape = [axis_to_dim[i] if i in axis_to_dim
                   else 1 for i in range(ndims)]
    if self.virtual_batch_size is not None:
      # When using virtual batches, add an extra dim at index 1
      param_shape.insert(1, 1)
      for idx, x in enumerate(self.axis):
        self.axis[idx] = x + 1  # Account for added dimension
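  # gamma (scale) and beta (center) are the trainable affine parameters. When
  # either is disabled but the fused kernel is in use, a constant stand-in is
  # created because the fused op always expects scale/offset tensors.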
  if self.scale:
    self.gamma = self.add_weight(
        name='gamma',
        shape=param_shape,
        dtype=self._param_dtype,
        initializer=self.gamma_initializer,
        regularizer=self.gamma_regularizer,
        constraint=self.gamma_constraint,
        trainable=True,
        experimental_autocast=False)
  else:
    self.gamma = None
    if self.fused:
      self._gamma_const = K.constant(
          1.0, dtype=self._param_dtype, shape=param_shape)

  if self.center:
    self.beta = self.add_weight(
        name='beta',
        shape=param_shape,
        dtype=self._param_dtype,
        initializer=self.beta_initializer,
        regularizer=self.beta_regularizer,
        constraint=self.beta_constraint,
        trainable=True,
        experimental_autocast=False)
  else:
    self.beta = None
    if self.fused:
      self._beta_const = K.constant(
          0.0, dtype=self._param_dtype, shape=param_shape)

  try:
    # Disable variable partitioning when creating the moving mean and variance
    if hasattr(self, '_scope') and self._scope:
      partitioner = self._scope.partitioner
      self._scope.set_partitioner(None)
    else:
      partitioner = None
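    # The moving mean/variance are non-trainable state: they are updated via
    # moving-average assignments during training and read back at inference.
    # ON_READ synchronization with MEAN aggregation lets each replica keep a
    # local copy that is averaged when read under a distribution strategy.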
    self.moving_mean = self.add_weight(
        name='moving_mean',
        shape=param_shape,
        dtype=self._param_dtype,
        initializer=self.moving_mean_initializer,
        synchronization=tf_variables.VariableSynchronization.ON_READ,
        trainable=False,
        aggregation=tf_variables.VariableAggregation.MEAN,
        experimental_autocast=False)

    self.moving_variance = self.add_weight(
        name='moving_variance',
        shape=param_shape,
        dtype=self._param_dtype,
        initializer=self.moving_variance_initializer,
        synchronization=tf_variables.VariableSynchronization.ON_READ,
        trainable=False,
        aggregation=tf_variables.VariableAggregation.MEAN,
        experimental_autocast=False)

    if self.renorm:
      # In batch renormalization we track the inference moving stddev instead
      # of the moving variance to more closely align with the paper.
      def moving_stddev_initializer(*args, **kwargs):
        return math_ops.sqrt(
            self.moving_variance_initializer(*args, **kwargs))

      with distribution_strategy_context.get_strategy(
      ).extended.colocate_vars_with(self.moving_variance):
        self.moving_stddev = self.add_weight(
            name='moving_stddev',
            shape=param_shape,
            dtype=self._param_dtype,
            initializer=moving_stddev_initializer,
            synchronization=tf_variables.VariableSynchronization.ON_READ,
            trainable=False,
            aggregation=tf_variables.VariableAggregation.MEAN,
            experimental_autocast=False)

      # Create variables to maintain the moving mean and standard deviation.
      # These are used in training and thus are different from the moving
      # averages above. The renorm variables are colocated with moving_mean
      # and moving_stddev.
      # NOTE: below, the outer `with device` block causes the current device
      # stack to be cleared. The nested ones use a `lambda` to set the desired
      # device and ignore any devices that may be set by the custom getter.
      def _renorm_variable(name,
                           shape,
                           initializer=init_ops.zeros_initializer()):
        """Create a renorm variable."""
        var = self.add_weight(
            name=name,
            shape=shape,
            dtype=self._param_dtype,
            initializer=initializer,
            synchronization=tf_variables.VariableSynchronization.ON_READ,
            trainable=False,
            aggregation=tf_variables.VariableAggregation.MEAN,
            experimental_autocast=False)
        return var

      with distribution_strategy_context.get_strategy(
      ).extended.colocate_vars_with(self.moving_mean):
        self.renorm_mean = _renorm_variable('renorm_mean', param_shape,
                                            self.moving_mean_initializer)
      with distribution_strategy_context.get_strategy(
      ).extended.colocate_vars_with(self.moving_stddev):
        self.renorm_stddev = _renorm_variable('renorm_stddev', param_shape,
                                              moving_stddev_initializer)
  finally:
    if partitioner:
      self._scope.set_partitioner(partitioner)
  self.built = True
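
# Illustrative sketch (not part of this file): `build()` is not called
# directly; the Keras base `Layer.__call__` invokes it the first time the
# layer sees an input, passing that input's shape. Assuming the public
# `tf.keras.layers.BatchNormalization` alias of this class, usage looks
# roughly like:
#
#   import tensorflow as tf
#
#   layer = tf.keras.layers.BatchNormalization(axis=-1)  # -1 resolves to the channel axis
#   y = layer(tf.zeros([8, 16, 16, 32]))  # triggers build() with shape (8, 16, 16, 32)
#   assert layer.built
#   assert layer.gamma.shape == (32,)     # single-axis param_shape from build()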