in tensorflow_managed_spot_training_checkpointing/source_dir/cifar10_keras_main.py [0:0]
def _input(epochs, batch_size, channel, channel_name):
    """Build the tf.data input pipeline for the CIFAR-10 dataset.

    Reads TFRecords either via SageMaker Pipe mode (streamed) or directly
    from files in the channel directory, then repeats, prefetches, parses,
    optionally shuffles (training channel only), and batches the records.

    Returns a ``({INPUT_TENSOR_NAME: images}, labels)`` pair of tensors
    drawn from a one-shot iterator over the dataset.

    NOTE(review): `epochs` is accepted but never used — the dataset repeats
    indefinitely; presumably callers bound training via steps-per-epoch.
    Confirm against the caller before removing.
    """
    mode = args.data_config[channel_name]['TrainingInputMode']
    logging.info("Running {} in {} mode".format(channel_name, mode))

    if mode == 'Pipe':
        # Pipe mode streams records from S3 without staging them on disk.
        from sagemaker_tensorflow import PipeModeDataset
        dataset = PipeModeDataset(channel=channel_name, record_format='TFRecord')
    else:
        # File mode: resolve TFRecord file paths under the channel directory.
        dataset = tf.data.TFRecordDataset(_get_filenames(channel_name, channel))

    # Repeat indefinitely and keep a small read-ahead buffer.
    dataset = dataset.repeat().prefetch(10)

    # Decode raw records into (image, label) tensors in parallel.
    dataset = dataset.map(_dataset_parser, num_parallel_calls=10)

    if channel_name == 'train':
        # Size the shuffle buffer as a large fraction of an epoch (plus a few
        # batches) so the shuffle is reasonably uniform.
        shuffle_buffer = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN * 0.4) + 3 * batch_size
        dataset = dataset.shuffle(buffer_size=shuffle_buffer)

    # Fixed-size batches; the final partial batch is dropped.
    dataset = dataset.batch(batch_size, drop_remainder=True)

    iterator = tf.compat.v1.data.make_one_shot_iterator(dataset)
    images, labels = iterator.get_next()
    return {INPUT_TENSOR_NAME: images}, labels