petastorm/tools/spark_session_cli.py (28 lines of code) (raw):

# Copyright (c) 2017-2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities that give every command line tool which creates a Spark session object a uniform
set of Spark configuration arguments (``--master`` and ``--spark-session-config``)."""


def configure_spark(spark_session_builder, args):
    """Applies configuration to a ``SparkSession.Builder`` object.

    Call :func:`add_configure_spark_arguments` to add command line arguments to the argparser object.
    This function returns the ``SparkSession.Builder`` to allow chaining additional calls to the ``Builder``.

    >>> import argparse
    >>> from pyspark.sql import SparkSession
    >>>
    >>> arg_parser = argparse.ArgumentParser()
    >>> add_configure_spark_arguments(arg_parser)
    >>> # ... more argparse arguments
    >>> args = arg_parser.parse_args()
    >>> spark = configure_spark(SparkSession.builder.appName('petastorm-copy'), args).getOrCreate()

    :param spark_session_builder: An instance of the ``pyspark.sql.session.SparkSession.Builder`` object.
        Its ``config(key, value)`` method is called once per ``--spark-session-config`` pair, and
        ``master(...)`` is called only when ``--master`` was given.
    :param args: A value returned by ``argparser.ArgumentParser.parse_args()`` call.
    :return: The same ``spark_session_builder`` object that was passed in (configured in place).
    :raises RuntimeError: if ``args`` has no ``spark_session_config`` or ``master`` attribute, i.e.
        :func:`add_configure_spark_arguments` was not used to set up the parser.
    :raises ValueError: if an element of ``--spark-session-config`` is not in ``key=value`` format.
    """
    # argparse.Namespace supports ``in``; check both attributes exist before reading them.
    if 'spark_session_config' not in args or 'master' not in args:
        raise RuntimeError('--spark-session-config and/or --master were not found in parsed arguments. '
                           'Call add_configure_spark_arguments() to add them.')

    spark_session_config = _cli_spark_session_config_to_dict(args.spark_session_config)
    for key, value in spark_session_config.items():
        spark_session_builder.config(key, value)

    # An empty/None master leaves the builder's default master untouched.
    if args.master:
        spark_session_builder.master(args.master)

    return spark_session_builder


def add_configure_spark_arguments(argparser):
    """Adds a set of arguments that are needed for spark session configuration.

    Adds ``--master`` (a single string) and ``--spark-session-config`` (one or more strings). Both
    default to ``None`` when not given on the command line.

    >>> import argparse
    >>> from pyspark.sql import SparkSession
    >>>
    >>> arg_parser = argparse.ArgumentParser()
    >>> add_configure_spark_arguments(arg_parser)
    >>> # ... more argparse arguments
    >>> args = arg_parser.parse_args()
    >>> spark = configure_spark(SparkSession.builder.appName('petastorm-copy'), args).getOrCreate()

    :param argparser: An instance of ``argparse.ArgumentParser`` object
    :return: None
    """
    argparser.add_argument('--master', type=str,
                           help='Spark master. Default if not specified. To run on a local machine, specify '
                                '"local[W]" (where W is the number of local spark workers, e.g. local[10])')

    argparser.add_argument('--spark-session-config', type=str, nargs='+',
                           help='A list of "=" separated key-value pairs used to configure SparkSession object. '
                                'For example: --spark-session-config spark.executor.cores=2 spark.executor.memory=10g')


def _cli_spark_session_config_to_dict(spark_session_config):
    """Converts a list of ``key=value`` strings into a dictionary.

    Only the first ``=`` separates the key from the value, so values may themselves contain ``=``
    (e.g. ``spark.driver.extraJavaOptions=-Dfoo=bar``). If a key appears more than once, the last
    occurrence wins.

    :param spark_session_config: A list of ``key=value`` strings, or ``None``/empty list.
    :return: A dictionary mapping each key string to its value string (empty when no pairs were given).
    :raises ValueError: if an element does not contain ``=``.
    """
    config_dict = dict()
    if not spark_session_config:
        return config_dict

    for config_pair in spark_session_config:
        # partition splits on the first '=' only; an empty separator means no '=' was present.
        key, separator, value = config_pair.partition('=')
        if not separator:
            raise ValueError('Elements of spark_session_config list are expected to be in key=value format. Got: %s'
                             % (config_pair,))
        config_dict[key] = value
    return config_dict