examples/imagenet/generate_petastorm_imagenet.py
# Copyright (c) 2017-2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This utility converts a directory with files from the ImageNet dataset (http://image-net.org/) into a
petastorm dataset (Parquet format).

The script can run locally (pass the '--master=local[*]' command line argument) or be submitted to a Spark
cluster. The schema defined in examples.imagenet.schema.ImagenetSchema is used for the generated dataset.

NOTE: The ImageNet dataset needs to be requested and downloaded separately by the user.
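
Example invocation (illustrative paths; adjust to your environment):

    python generate_petastorm_imagenet.py -i /path/to/imagenet -o file:///tmp/imagenet_petastorm --master=local[*]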
"""
from __future__ import division

import argparse
import glob
import json
import os

import cv2
from pyspark.sql import SparkSession
from six.moves.urllib.request import urlopen  # pylint: disable=import-error

from examples.imagenet.schema import ImagenetSchema

from petastorm.etl.dataset_metadata import materialize_dataset
from petastorm.unischema import dict_to_spark_row


def _arg_parser():
parser = argparse.ArgumentParser(description=__doc__, add_help=False,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-i', '--input-path', type=str, required=True,
                        help='Path to the imagenet directory. If you are running this script on a Spark cluster, '
                             'the directory should be mounted and accessible to executors.')
parser.add_argument('-o', '--output-url', type=str, required=True,
help='hdfs://... or file:/// url where the parquet dataset will be written to.')
parser.add_argument('-m', '--master', type=str, required=False, default=None,
help='Spark master. Use --master=local[*] to run locally.')
    return parser


def download_nouns_mapping():
"""Downloads a mapping between noun id (``nXXXXXXXX`` form) and the noun string representation.
:return: A dictionary: ``{noun_id : text}``
"""
NOUN_MAP_URL = 'https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json'
request = urlopen(NOUN_MAP_URL)
class_map_json = request.read()
# raw_dict has the form of {id: [noun_id, text]}. We flatten it into {noun_id: text}
raw_dict = json.loads(class_map_json.decode("utf-8"))
nouns_map = {k: v for k, v in raw_dict.values()}
    return nouns_map


def imagenet_directory_to_petastorm_dataset(imagenet_path, output_url, spark_master=None, parquet_files_count=100,
noun_id_to_text=None):
"""Converts a directory with imagenet data into a petastorm dataset.
Expected directory format is:
>>> nXXXXXXXX/
>>> *.JPEG
>>> nZZZZZZZZ/
>>> *.JPEG
:param imagenet_path: a path to the directory containing ``n*/`` subdirectories. If you are running this script on
a Spark cluster, you should have this file be mounted and accessible to executors.
:param output_url: the location where your dataset will be written to. Should be a url: either
``file://...`` or ``hdfs://...``
:param spark_master: A master parameter used by spark session builder. Use default value (``None``) to use system
environment configured spark cluster. Use ``local[*]`` to run on a local box.
:param noun_id_to_text: A dictionary: ``{noun_id : text}``. If ``None``, this function will download the dictionary
from the Internet.
:return: ``None``
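
    Example (illustrative paths):

    >>> imagenet_directory_to_petastorm_dataset('/data/imagenet', 'file:///tmp/imagenet_petastorm',
    >>>                                         spark_master='local[*]')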
"""
session_builder = SparkSession \
.builder \
.appName('Imagenet Dataset Creation') \
.config('spark.executor.memory', '10g') \
        .config('spark.driver.memory', '10g')  # Increase the memory if running locally with a high number of executors
if spark_master:
session_builder.master(spark_master)
spark = session_builder.getOrCreate()
sc = spark.sparkContext
# Get a list of noun_ids
noun_ids = os.listdir(imagenet_path)
if not all(noun_id.startswith('n') for noun_id in noun_ids):
raise RuntimeError('Directory {} expected to contain only subdirectories with name '
'starting with "n".'.format(imagenet_path))
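    # Download the noun_id -> human-readable text mapping unless the caller provided one.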
if not noun_id_to_text:
noun_id_to_text = download_nouns_mapping()
ROWGROUP_SIZE_MB = 256
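    # materialize_dataset configures the parquet row group size and, when the block exits, writes the
    # petastorm-specific metadata (including the serialized Unischema) that petastorm readers rely on.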
with materialize_dataset(spark, output_url, ImagenetSchema, ROWGROUP_SIZE_MB):
# list of [(nXXXX, 'noun-text'), ...]
        noun_id_text_list = [(noun_id, noun_id_to_text[noun_id]) for noun_id in noun_ids]
# rdd of [(nXXXX, 'noun-text', path), ...]
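        # Roughly one partition per 10 noun ids (capped at 10000), so globbing and image decoding are spread
        # across executors.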
        noun_id_text_image_path_rdd = sc.parallelize(noun_id_text_list, min(len(noun_ids) // 10 + 1, 10000)) \
.flatMap(lambda word_id_label: [word_id_label + (image_path,) for image_path in
glob.glob(os.path.join(imagenet_path, word_id_label[0], '*.JPEG'))])
# rdd of [(nXXXX, 'noun-text', image), ...]
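        # cv2.imread loads each JPEG from disk and decodes it into a numpy ndarray (BGR channel order,
        # per OpenCV convention).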
noun_id_text_image_rdd = noun_id_text_image_path_rdd \
.map(lambda id_word_image_path:
{ImagenetSchema.noun_id.name: id_word_image_path[0],
ImagenetSchema.text.name: id_word_image_path[1],
ImagenetSchema.image.name: cv2.imread(id_word_image_path[2])})
# Convert to pyspark.sql.Row
sql_rows_rdd = noun_id_text_image_rdd.map(lambda r: dict_to_spark_row(ImagenetSchema, r))
# Write out the result
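        # coalesce() limits the number of output parquet files to parquet_files_count.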
spark.createDataFrame(sql_rows_rdd, ImagenetSchema.as_spark_schema()) \
.coalesce(parquet_files_count) \
.write \
.mode('overwrite') \
.option('compression', 'none') \
            .parquet(output_url)


if __name__ == '__main__':
    args = _arg_parser().parse_args()
    imagenet_directory_to_petastorm_dataset(args.input_path, args.output_url, args.master)