in 06_preprocessing/jpeg_to_tfrecord_tft.py [0:0]
def run_main(arguments):
  """Convert JPEG images listed in a CSV into train/valid/test TFRecords.

  Builds and runs an Apache Beam pipeline (locally on DirectRunner or on
  Cloud Dataflow) that reads (filename, label) rows from a CSV, resizes the
  images through a tf.transform preprocessing fn, writes the resulting
  records into train/valid/test TFRecord shards, and saves the TFT
  transform graph so serving can apply identical preprocessing.

  Args:
    arguments: dict with keys 'project_id', 'output_dir', 'runner',
      'labels_file', 'resize' (e.g. "224,224"), 'region', 'all_data'.
      NOTE(review): key set inferred from the reads below — confirm
      against the argparse definition elsewhere in this file.

  Side effects:
    Deletes and recreates the output directory; sets the module globals
    IMG_HEIGHT, IMG_WIDTH and LABELS; calls sys.exit(-1) when fewer than
    two labels are found.
  """
  global IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS, LABELS
  JOBNAME = (
      'preprocess-images-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S'))
  PROJECT = arguments['project_id']
  OUTPUT_DIR = arguments['output_dir']

  # Decide the Beam runner: an explicit command-line choice wins; otherwise
  # infer from whether the output directory lives on GCS.
  on_cloud = OUTPUT_DIR.startswith('gs://')
  if arguments['runner']:
    RUNNER = arguments['runner']
    on_cloud = (RUNNER == 'DataflowRunner')
  else:
    RUNNER = 'DataflowRunner' if on_cloud else 'DirectRunner'

  # Clean up the output directory since Beam names shards 0000-of-0004 etc.
  # and stale shards from an earlier 0000-of-0005 run would cause confusion.
  if on_cloud:
    try:
      # Pass argv as a list (not str.split()) so a path containing spaces
      # is still handed to gsutil as a single argument.
      subprocess.check_call(['gsutil', '-m', 'rm', '-r', OUTPUT_DIR])
    except subprocess.CalledProcessError:
      pass  # best-effort: the directory may simply not exist yet
  else:
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    # exist_ok=True: rmtree above is best-effort and may have left the
    # directory in place (e.g. permission issues).
    os.makedirs(OUTPUT_DIR, exist_ok=True)

  # Read the list of labels. Validate BEFORE indexing into it so that an
  # empty labels file exits with a clear message instead of IndexError.
  with tf.io.gfile.GFile(arguments['labels_file'], 'r') as f:
    LABELS = [line.rstrip() for line in f]
  if len(LABELS) < 2:
    print('Require at least two labels')
    sys.exit(-1)
  print('Read in {} labels, from {} to {}'.format(
      len(LABELS), LABELS[0], LABELS[-1]))

  # Input images will be resized to IMG_HEIGHT x IMG_WIDTH.
  ht, wd = arguments['resize'].split(',')
  IMG_HEIGHT = int(ht)
  IMG_WIDTH = int(wd)
  print("Will resize input images to {}x{}".format(IMG_HEIGHT, IMG_WIDTH))

  # Seed so the random train/valid/test assignment is repeatable.
  np.random.seed(10)

  # Beam pipeline options for converting images to TFRecords.
  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': JOBNAME,
      'project': PROJECT,
      'max_num_workers': 20,  # autoscale up to 20
      'region': arguments['region'],
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'save_main_session': True,
      'requirements_file': 'requirements.txt'
  }
  opts = beam.pipeline.PipelineOptions(flags=[], **options)

  # Schema of the raw CSV rows: (filename, label), both strings.
  RAW_DATA_SCHEMA = tft.tf_metadata.dataset_schema.schema_utils.schema_from_feature_spec({
      'filename': tf.io.FixedLenFeature([], tf.string),
      'label': tf.io.FixedLenFeature([], tf.string),
  })
  # Metadata of the records fed into tf.transform: raw image bytes plus
  # the string label and its integer encoding.
  IMG_BYTES_METADATA = tft.tf_metadata.dataset_metadata.DatasetMetadata(
      tft.tf_metadata.dataset_schema.schema_utils.schema_from_feature_spec({
          'img_bytes': tf.io.FixedLenFeature([], tf.string),
          'label': tf.io.FixedLenFeature([], tf.string),
          'label_int': tf.io.FixedLenFeature([], tf.int64)
      })
  )
  csv_tfxio = tfxio.CsvTFXIO(file_pattern=arguments['all_data'],
                             column_names=['filename', 'label'],
                             schema=RAW_DATA_SCHEMA,
                             telemetry_descriptors=['standalone_tft'])

  with beam.Pipeline(RUNNER, options=opts) as p:
    with tft_beam.Context(temp_dir=os.path.join(OUTPUT_DIR, 'tmp', 'beam_context')):
      img_records = (p
                     | 'read_csv' >> csv_tfxio.BeamSource(batch_size=1)
                     | 'img_record' >> beam.Map(
                         lambda x: create_input_record(x[0], x[1])))
      # tf.transform preprocessing. Our preprocessing simply resizes the
      # images, so there is no need to restrict the analyze phase to
      # training data only.
      # Ideally, we could have done csv_tfxio.TensorAdapterConfig(),
      # but here we are processing bytes, not the filenames read from CSV.
      raw_dataset = (img_records, IMG_BYTES_METADATA)
      transformed_dataset, transform_fn = (
          raw_dataset | 'tft_img' >> tft_beam.AnalyzeAndTransformDataset(tft_preprocess)
      )
      transformed_data, transformed_metadata = transformed_dataset
      transformed_data_coder = tft.coders.ExampleProtoCoder(transformed_metadata.schema)

      # Encode the resized images and assign each record to a split.
      splits = (transformed_data
                | 'create_tfr' >> beam.Map(transformed_data_coder.encode)
                | 'assign_ds' >> beam.Map(assign_record_to_split)
                )
      for split in ['train', 'valid', 'test']:
        write_records(OUTPUT_DIR, splits, split)

      # Write a SavedModel containing the tf transforms that were carried
      # out, so serving can replay the identical preprocessing.
      _ = (
          transform_fn | 'write_tft' >> tft_beam.WriteTransformFn(
              os.path.join(OUTPUT_DIR, 'tft'))
      )

  if on_cloud:
    print("Submitting {} job: {}".format(RUNNER, JOBNAME))
    print("Monitor at https://console.cloud.google.com/dataflow/jobs")
  else:
    print("Running on DirectRunner. Please hold on ...")