parquet_flask/io_logic/retrieve_spark_session.py (63 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
from socket import gethostbyname, gethostname

from pyspark import SparkConf
from pyspark.sql import SparkSession

from parquet_flask.io_logic.spark_constants import SparkConstants
from parquet_flask.utils.config import Config
from parquet_flask.utils.singleton import Singleton

LOGGER = logging.getLogger(__name__)


class RetrieveSparkSession(metaclass=Singleton):
    """Build, cache and stop Spark sessions keyed by application name and master URL.

    The class is created through the ``Singleton`` metaclass (presumably so
    that one session cache is shared per process -- confirm against
    ``parquet_flask.utils.singleton``).
    """

    def __init__(self):
        """Initialise the empty session cache and the base Spark configuration.

        The defaults below are merged with the optional JSON dictionary read
        from ``Config.spark_config_dict`` (see ``__set_spark_config``).
        """
        # session_key ('<app_name>__<master_spark>') -> SparkSession
        self.__sparks = {}
        self.__spark_config = {
            'spark.executor.cores': '1',  # fixing to 1 core for now
            'spark.driver.port': '50243',  # a random port.
            'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.2.0',  # crosscheck the version.
            'spark.hadoop.fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
            SparkConstants.CRED_PROVIDER_KEY: SparkConstants.SIMPLE_CRED,  # should be overridden
            'spark.hadoop.fs.s3a.connection.ssl.enabled': 'true',
            # old configs. no longer needs to be used
            # 'spark.driver.extraJavaOptions': '-Dcom.amazonaws.services.s3.enableV4=true',
            # 'spark.executor.extraJavaOptions': '-Dcom.amazonaws.services.s3.enableV4=true',
            # 'spark.executor.extraClassPath': '/opt/bitnami/spark/jars/hadoop-aws-3.2.0.jar:/opt/bitnami/spark/jars/aws-java-sdk-bundle-1.11.375.jar',
            # 'spark.driver.extraClassPath': '/opt/bitnami/spark/jars/hadoop-aws-3.2.0.jar:/opt/bitnami/spark/jars/aws-java-sdk-bundle-1.11.375.jar',
            # settings needed for history server. (still an experiment)
            # 'spark.eventLog.enabled': 'true',
            # 'spark.eventLog.dir': '/tmp/spark-events',
            # 'spark.eventLog.rolling.enabled': 'true',
            # 'spark.eventLog.rolling.maxFileSize': '128m',
        }
        self.__set_spark_config()

    def __set_spark_config(self):
        """Merge the extra Spark settings from ``Config.spark_config_dict`` into the defaults.

        The configured value is parsed as JSON and must decode to an object
        (dictionary); its entries override the defaults key by key. This is
        best-effort: any parsing or type problem is logged and the defaults
        are kept unchanged.

        :return: this instance
        """
        possible_extra_spark_config = Config().get_value(Config.spark_config_dict, '{}')
        try:
            input_dict = json.loads(possible_extra_spark_config)
            if not isinstance(input_dict, dict):
                # Valid JSON that is not an object (list, string, number, ...)
                # cannot be merged; report it through the same logging path.
                raise TypeError(f'expected a JSON object, got {type(input_dict).__name__}')
            self.__spark_config.update(input_dict)
        except Exception:
            # `Exception` rather than a bare `except:` so that KeyboardInterrupt
            # and SystemExit are not swallowed while still keeping this
            # best-effort for every ordinary failure.
            LOGGER.exception(f'Not loading extra config. unable to convert to JSON object. {possible_extra_spark_config}.')
        return self

    def __add_aws_cred(self, conf: SparkConf):
        """Copy AWS credentials from ``Config`` into ``conf`` according to the credential provider.

        For ``SparkConstants.SIMPLE_CRED`` the access key and secret key are
        set; for ``SparkConstants.TEMP_CRED`` the session token is set as
        well. For any other provider nothing is set and an info message is
        logged.

        :param conf: Spark configuration that receives the ``fs.s3a`` credential entries
        :raises EnvironmentError: if the credential provider key is missing from the Spark config
        """
        if SparkConstants.CRED_PROVIDER_KEY not in self.__spark_config:
            raise EnvironmentError(f'missing {SparkConstants.CRED_PROVIDER_KEY} in spark_config. This should not happen')
        cred_provider = self.__spark_config[SparkConstants.CRED_PROVIDER_KEY]
        if cred_provider == SparkConstants.SIMPLE_CRED:
            conf.set('spark.hadoop.fs.s3a.access.key', Config().get_value(Config.aws_access_key_id, ''))
            conf.set('spark.hadoop.fs.s3a.secret.key', Config().get_value(Config.aws_secret_access_key, ''))
            return
        if cred_provider == SparkConstants.TEMP_CRED:
            conf.set('spark.hadoop.fs.s3a.access.key', Config().get_value(Config.aws_access_key_id, ''))
            conf.set('spark.hadoop.fs.s3a.secret.key', Config().get_value(Config.aws_secret_access_key, ''))
            conf.set('spark.hadoop.fs.s3a.session.token', Config().get_value(Config.aws_session_token, ''))
            return
        LOGGER.info(f'not setting aws cred ENV values as {SparkConstants.CRED_PROVIDER_KEY} = {cred_provider}')
        return

    def retrieve_spark_session(self, app_name: str, master_spark: str, ram: str = '1024m') -> SparkSession:
        """Return the Spark session for ``app_name`` / ``master_spark``, creating it on first use.

        Sessions are cached under ``'<app_name>__<master_spark>'``. ``ram`` is
        not part of that key, so it only takes effect when the session is
        first created; later calls return the cached session unchanged.

        :param app_name: Spark application name
        :param master_spark: Spark master URL passed to ``builder.master``
        :param ram: value for ``spark.executor.memory``
        :return: the cached or newly created Spark session
        """
        session_key = '{}__{}'.format(app_name, master_spark)
        if session_key in self.__sparks:
            return self.__sparks[session_key]
        conf = SparkConf()
        for k, v in self.__spark_config.items():
            conf.set(k, v)
        # Reference values that have been used with this setup:
        #   spark.executor.memory 3072m
        #   spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
        #   spark.hadoop.fs.s3.impl org.apache.hadoop.fs.s3.S3FileSystem
        #   spark.hadoop.fs.s3n.impl org.apache.hadoop.fs.s3native.NativeS3FileSystem
        conf.set('spark.executor.memory', ram)
        # Use the IP address this host's name resolves to as the driver host.
        local_ip = gethostbyname(gethostname())
        LOGGER.debug(f'using IP: {local_ip} for spark.driver.host')
        conf.set('spark.driver.host', local_ip)
        self.__add_aws_cred(conf)
        # conf.set('spark.default.parallelism', '10')
        # conf.set('spark.hadoop.fs.s3a.endpoint', 's3.us-gov-west-1.amazonaws.com')
        self.__sparks[session_key] = SparkSession.builder.appName(app_name).config(conf=conf).master(master_spark).getOrCreate()
        return self.__sparks[session_key]

    def stop_spark_session(self, app_name: str, master_spark: str):
        """Stop the cached session for ``app_name`` / ``master_spark`` and remove it from the cache.

        Does nothing when no session is cached under that key.

        :param app_name: Spark application name used when the session was retrieved
        :param master_spark: Spark master URL used when the session was retrieved
        """
        session_key = '{}__{}'.format(app_name, master_spark)
        if session_key in self.__sparks:
            self.__sparks.pop(session_key).stop()
        return