in tensor2tensor/utils/data_reader.py [0:0]
def input_fn(dataset,
             filepattern,
             skip_random_fraction_when_training,
             batch_size_means_tokens_param,
             batch_size_multiplier,
             max_length,
             mode,
             hparams,
             data_dir=None,
             params=None,
             config=None,
             force_repeat=False,
             prevent_repeat=False):
  """Builds input pipeline for problem.

  Args:
    dataset: the dataset to make input function from.
    filepattern: the pattern of files to read from.
    skip_random_fraction_when_training: whether to skip randomly when training.
    batch_size_means_tokens_param: whether batch size should mean tokens.
    batch_size_multiplier: how to multiply batch size when bucketing.
    max_length: maximum length.
    mode: tf.estimator.ModeKeys.
    hparams: HParams, model hparams.
    data_dir: str, data directory; if None, will use hparams.data_dir.
    params: dict, may include "batch_size".
    config: RunConfig; should have the data_parallelism attribute if not using
      TPU.
    force_repeat: bool, whether to repeat the data even if not training.
    prevent_repeat: bool, whether to not repeat when in training mode.
      Overrides force_repeat.

  Returns:
    (features_dict<str name, Tensor feature>, Tensor targets)
  """
  is_training = mode == tf.estimator.ModeKeys.TRAIN
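  # Choose reader parallelism: many parallel map calls on TPU, one per CPU
  # core while training elsewhere, and a single thread otherwise (likely to
  # keep evaluation deterministic).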
  if config and config.use_tpu:
    num_threads = 64
  else:
    num_threads = cpu_count() if is_training else 1

  if config and hasattr(config,
                        "data_parallelism") and config.data_parallelism:
    num_shards = config.data_parallelism.n
  else:
    num_shards = 1

  mlperf_log.transformer_print(
      key=mlperf_log.INPUT_MAX_LENGTH, value=max_length)
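  # Length filters: drop examples outside [hparams.min_length, max_length].
  # On TPU every example must satisfy the limit; on GPU long examples are only
  # dropped while training, or when hparams.eval_drop_long_sequences is set.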
  def tpu_valid_size(example):
    return example_valid_size(example, hparams.min_length, max_length)

  def gpu_valid_size(example):
    drop_long_sequences = is_training or hparams.eval_drop_long_sequences
    max_validate_length = max_length if drop_long_sequences else 10**9
    return example_valid_size(example, hparams.min_length,
                              max_validate_length)
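  # On TPU, shapes must be fully static, so define_shapes also fixes the batch
  # dimension to params["batch_size"].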
  def define_shapes(example):
    batch_size = config and config.use_tpu and params["batch_size"]
    return standardize_shapes(example, batch_size=batch_size)

  # Read and preprocess
  data_dir = data_dir or (hasattr(hparams, "data_dir") and hparams.data_dir)

  if (force_repeat or is_training) and not prevent_repeat:
    # Repeat and skip a random number of records
    dataset = dataset.repeat()

  if is_training and skip_random_fraction_when_training:
    data_files = contrib.slim().parallel_reader.get_data_files(filepattern)
    # In continuous_train_and_eval when switching between train and
    # eval, this input_fn method gets called multiple times and it
    # would give you the exact same samples from the last call
    # (because the Graph seed is set). So this skip gives you some
    # shuffling.
    dataset = skip_random_fraction(dataset, data_files[0])

  dataset = dataset.map(cast_ints_to_int32, num_parallel_calls=num_threads)
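  # Decide whether hparams.batch_size is a count of examples or a token
  # budget. Fixed-size examples can be counted directly; variable-length
  # examples are batched by tokens so memory use per batch stays roughly
  # constant.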
  if batch_size_means_tokens_param:
    batch_size_means_tokens = True
  else:
    if _are_shapes_fully_defined(dataset.output_shapes):
      batch_size_means_tokens = False
    else:
      tf.logging.warning(
          "Shapes are not fully defined. Assuming batch_size means tokens.")
      batch_size_means_tokens = True

  # Batching
  if not batch_size_means_tokens:
    # Batch size means examples per datashard.
    if config and config.use_tpu:
      # on TPU, we use params["batch_size"], which specifies the number of
      # examples across all datashards
      batch_size = params["batch_size"]
      dataset = dataset.batch(batch_size, drop_remainder=True)
    else:
      batch_size = hparams.batch_size * num_shards
      dataset = dataset.batch(batch_size)
  else:
    # batch_size means tokens per datashard
    if config and config.use_tpu:
      dataset = dataset.filter(tpu_valid_size)
      padded_shapes = pad_for_tpu(dataset.output_shapes, hparams, max_length)
      # on TPU, we use params["batch_size"], which specifies the number of
      # examples across all datashards
      batch_size = params["batch_size"]
      if hparams.pad_batch:
        tf.logging.warn(
            "Padding the batch to ensure that remainder eval batches are "
            "processed. This may lead to incorrect metrics for "
            "non-zero-padded features, e.g. images. Use a smaller batch "
            "size that has no remainder in that case.")
        dataset = dataset.padded_batch(
            batch_size, padded_shapes, drop_remainder=False)
        dataset = dataset.map(
            functools.partial(pad_batch, batch_multiple=batch_size),
            num_parallel_calls=num_threads)
      else:
        dataset = dataset.padded_batch(
            batch_size, padded_shapes, drop_remainder=True)
    else:
      # On GPU, bucket by length
      dataset = dataset.filter(gpu_valid_size)
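      # Bucket by sequence length so each batch packs a roughly constant
      # number of tokens per shard; bucket boundaries and per-bucket batch
      # sizes come from the batching scheme derived from hparams.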
      cur_batching_scheme = hparams_to_batching_scheme(
          hparams,
          shard_multiplier=num_shards,
          length_multiplier=batch_size_multiplier)
      if hparams.use_fixed_batch_size:
        # Here batch_size really means examples per datashard.
        cur_batching_scheme["batch_sizes"] = [hparams.batch_size]
        cur_batching_scheme["boundaries"] = []
      dataset = dataset.apply(
          tf.data.experimental.bucket_by_sequence_length(
              example_length, cur_batching_scheme["boundaries"],
              cur_batching_scheme["batch_sizes"]))

      if not is_training:
        batch_multiple = num_shards
        if hparams.use_fixed_batch_size:
          # Make sure the last batch has the same fixed size as the rest.
          batch_multiple *= hparams.batch_size
        if batch_multiple > 1:
          tf.logging.warn(
              "Padding the batch to ensure that remainder eval batches have "
              "a batch size divisible by the number of data shards. This may "
              "lead to incorrect metrics for non-zero-padded features, e.g. "
              "images. Use a single datashard (i.e. 1 GPU) in that case.")
          dataset = dataset.map(
              functools.partial(pad_batch, batch_multiple=batch_multiple),
              num_parallel_calls=num_threads)

  dataset = dataset.map(define_shapes, num_parallel_calls=num_threads)
  # Add shuffling for training batches. This is necessary along with record
  # level shuffling in the dataset generation. Record shuffling shuffles the
  # examples, but in some cases the shuffle buffer used for record shuffling
  # is smaller than the batch size. In such cases, adding batch shuffling
  # ensures that the data is in random order during training.
  if (is_training and hasattr(hparams, "batch_shuffle_size") and
      hparams.batch_shuffle_size):
    dataset = dataset.shuffle(hparams.batch_shuffle_size)

  # Split batches into chunks if targets are too long.
  # The new "chunk_number" feature is 0 for the first chunk and increases from
  # there. Chunks are emitted in that order, so the 0th chunk comes first,
  # then the 1st and so on, and models can attend to them in the order they
  # arrive. The last chunk is usually the one containing the end of the
  # target sentence (EOS).
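  # For example, with chunk_length=4 and targets of length 10, chunk 0 covers
  # positions [0:4], chunk 1 covers [4:8], and chunk 2 covers [8:12]
  # (zero-padded); chunks 3..max_chunks-1 are all zeros and are filtered out
  # by is_nonzero_chunk below.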
  chunk_length = hparams.get("split_targets_chunk_length", 0)
  max_chunks = hparams.get("split_targets_max_chunks", 100)
  if chunk_length > 0:
    def is_nonzero_chunk(example):
      """A chunk is zero if all targets are 0s."""
      return tf.less(0, tf.reduce_sum(tf.abs(example["targets"])))

    def split_on_length(example):
      """Split a batch of dicts on length."""
      x = example["targets"]
      # TODO(kitaev): This code breaks if chunk_length * max_chunks < batch_size
      length_diff = chunk_length * max_chunks - tf.shape(x)[1]
      padded_x = tf.pad(x, [(0, 0), (0, length_diff), (0, 0), (0, 0)])
      chunks = [padded_x[:, i * chunk_length:(i + 1) * chunk_length, :, :]
                for i in range(max_chunks - 1)]
      chunks.append(padded_x[:, (max_chunks - 1) * chunk_length:, :, :])
      new_example = {}
      # Setting chunk_number to be tf.range(max_chunks) is incompatible with TPU
      new_example["chunk_number"] = tf.concat([
          tf.expand_dims(tf.ones_like(c) * n, axis=0)
          for n, c in enumerate(chunks)
      ], axis=0)
      new_example["targets"] = tf.concat(
          [tf.expand_dims(c, axis=0) for c in chunks], axis=0)
      for k in example:
        if k != "targets":
          assert k != "chunk_number", (
              "Chunking code expects the chunk_number feature name to be "
              "available")
          new_example[k] = tf.concat(
              [tf.expand_dims(example[k], axis=0) for _ in range(max_chunks)],
              axis=0)
      return tf.data.Dataset.from_tensor_slices(new_example)

    dataset = dataset.flat_map(split_on_length)
    dataset = dataset.filter(is_nonzero_chunk)

    # The chunking data pipeline thus far creates batches of examples where
    # all of the examples have the same chunk number. This can lead to
    # periodic fluctuations in the loss; for example, when all examples in
    # the batch have chunk number 0 the loss may be higher than midway
    # through a sequence. Enabling split_targets_strided_training adjusts the
    # data so that each batch includes examples at various points within a
    # sequence.
    if is_training and hparams.split_targets_strided_training:
      # TODO(kitaev): make sure that shape inference works on GPU, not just TPU.
      inferred_batch_size = dataset.output_shapes["targets"].as_list()[0]
      if inferred_batch_size is None:
        raise ValueError(
            "Strided training is only implemented when the batch size can be "
            "inferred statically, for example when training on TPU.")
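      # Stride the windows below so that consecutive windows start at
      # different chunk offsets, mixing chunk numbers within each rebuilt
      # batch instead of grouping all chunk-0 examples together.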
      chunk_stride = inferred_batch_size * max(
          1, max_chunks // inferred_batch_size) + 1

      def collapse_nested_datasets(example):
        """Converts a dataset of datasets to a dataset of tensor features."""
        new_example = {}
        for k, v in example.items():
          v = tf.data.experimental.get_single_element(
              v.batch(inferred_batch_size, drop_remainder=True))
          new_example[k] = v
        return tf.data.Dataset.from_tensor_slices(new_example)
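      # Flatten to individual chunks, re-window with the stride above, and
      # rebuild fixed-size batches so each batch mixes chunks from different
      # positions within their source sequences.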
      dataset = dataset.unbatch()
      dataset = dataset.window(inferred_batch_size, inferred_batch_size,
                               chunk_stride)
      dataset = dataset.flat_map(collapse_nested_datasets)
      dataset = dataset.batch(inferred_batch_size, drop_remainder=True)
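  # In PREDICT mode, targets are renamed to "infer_targets" so the model does
  # not condition on them; otherwise return the (features, labels) pair that
  # the Estimator API expects.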
  def prepare_for_output(example):
    if not config or not config.use_tpu:
      _summarize_features(example, num_shards)
    if mode == tf.estimator.ModeKeys.PREDICT:
      example["infer_targets"] = example.pop("targets")
      return example
    else:
      return example, example[hparams.get(
          key="labels_feature_name", default="targets")]

  dataset = dataset.map(prepare_for_output, num_parallel_calls=num_threads)
  dataset = dataset.prefetch(2)

  if mode == tf.estimator.ModeKeys.PREDICT:
    # This is because of a bug in the Estimator that short-circuits prediction
    # if it doesn't see a QueueRunner. DummyQueueRunner implements the
    # minimal expected interface but does nothing.
    tf.add_to_collection(tf.GraphKeys.QUEUE_RUNNERS, DummyQueueRunner())

  return dataset