blogs/finspace_redshift-2021-09/finspace_spark.py [8:105]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class SparkFinSpace(FinSpace):
    """``FinSpace`` helper extended with Spark DataFrame conveniences.

    Adds methods for writing a Spark DataFrame to the user's ingestion
    location, ingesting it into a dataset, reading a view as a Spark table,
    and deriving a FinSpace column schema from a DataFrame.
    """

    # Imported in the class body so the ``pyspark.…`` annotations on the
    # methods below resolve when the class is defined.
    import pyspark

    def __init__(
        self, 
        spark: pyspark.sql.session.SparkSession = None,
        config = Config(retries = {'max_attempts': 0, 'mode': 'standard'}),
        dev_overrides: dict = None
    ):
        """Initialise the base ``FinSpace`` client and remember the Spark session.

        Args:
            spark: Spark session used by ``read_view_as_spark``. Defaults to
                ``None``; it must be supplied before reading views.
            config: Client configuration forwarded to ``FinSpace.__init__``.
                The default requests ``max_attempts`` 0 in ``standard`` retry mode.
            dev_overrides: Forwarded unchanged to ``FinSpace.__init__``.
        """
        super().__init__(config=config, dev_overrides=dev_overrides)
        # Used on the Spark cluster for reading views and creating
        # changesets from DataFrames.
        self.spark = spark

    def upload_dataframe(self, data_frame: pyspark.sql.dataframe.DataFrame):
        """Write ``data_frame`` as Parquet to the user's ingestion path.

        The destination is the ``ingestionPath`` returned by
        ``self.client.get_user_ingestion_info()``.

        Args:
            data_frame: Spark DataFrame to write.

        Returns:
            The location the DataFrame was written to.
        """
        ingestion_info = self.client.get_user_ingestion_info()
        upload_location = ingestion_info['ingestionPath']
        data_frame.write.parquet(upload_location)
        return upload_location

    def ingest_dataframe(self, data_frame: pyspark.sql.dataframe.DataFrame, dataset_id: str, change_type: str, wait_for_completion=True):
        """Upload ``data_frame`` and ingest the uploaded files into a dataset.

        Args:
            data_frame: Spark DataFrame to upload (written as Parquet).
            dataset_id: Identifier of the target dataset, passed to
                ``ingest_from_s3``.
            change_type: Change type, passed through to ``ingest_from_s3``.
            wait_for_completion: Passed through to ``ingest_from_s3``.

        Returns:
            Whatever ``self.ingest_from_s3`` returns.
        """
        print("Uploading data...")
        upload_location = self.upload_dataframe(data_frame)

        print("Data upload finished. Ingesting data...")

        return self.ingest_from_s3(
            upload_location,
            dataset_id,
            change_type,
            wait_for_completion,
            format_type='parquet',
            format_params={},
        )

    def read_view_as_spark(
        self,
        dataset_id: str,
        view_id: str
        ):
        """Return the table backing a dataset view as a Spark DataFrame.

        Args:
            dataset_id: Dataset whose views are listed.
            view_id: ``id`` of the view to read.

        Returns:
            The result of ``self.spark.table(...)`` for the view's database
            and table, or ``None`` (after printing a message) when the view's
            status is not ``'SUCCESS'``.

        Raises:
            Exception: If no listed view, or more than one, has ``view_id``.
        """
        # TODO: switch to DescribeMatz when available in HFS
        # NOTE(review): only up to max_results=50 views are requested here;
        # confirm whether list_views paginates beyond that.
        views = self.list_views(dataset_id=dataset_id, max_results=50)
        matches = [v for v in views if v['id'] == view_id]

        if not matches:
            raise Exception('No such view found')
        if len(matches) > 1:
            raise Exception('Internal Server error')
        view = matches[0]

        # Ensure the view is ready to be read.
        status = view['status']
        if status != 'SUCCESS':
            print(f'view run status is not ready: {status}. Returning empty.')
            return

        destination = view['destinationTypeProperties']
        glue_db_name = destination['databaseName']
        glue_table_name = destination['tableName']

        # Query the Glue table directly with Spark's catalog function.
        return self.spark.table(f"`{glue_db_name}`.`{glue_table_name}`")

    def get_schema_from_spark(self, data_frame: pyspark.sql.dataframe.DataFrame):
        """Translate a DataFrame's schema into FinSpace column definitions.

        Args:
            data_frame: Spark DataFrame whose ``schema`` fields are translated.

        Returns:
            A list with one dict per field, in schema order, with keys
            ``dataType`` (FinSpace type name), ``name`` (the field name) and
            ``description`` (always an empty string). Spark types without an
            entry in the mapping below are reported as ``"STRING"``.
        """
        # for translation to FinSpace's schema
        # 'STRING'|'CHAR'|'INTEGER'|'TINYINT'|'SMALLINT'|'BIGINT'|'FLOAT'|'DOUBLE'|'DATE'|'DATETIME'|'BOOLEAN'|'BINARY'
        #
        # NOTE(review): DoubleType -> FLOAT and LongType -> INTEGER are kept
        # exactly as before even though DOUBLE and BIGINT appear in the list
        # above; confirm whether this narrowing is intentional.
        spark_to_finspace = {
            "BinaryType"    : "STRING",
            "BooleanType"   : "BOOLEAN",
            "ByteType"      : "INTEGER",
            "DateType"      : "DATE",
            "DoubleType"    : "FLOAT",
            "IntegerType"   : "INTEGER",
            "LongType"      : "INTEGER",
            "NullType"      : "STRING",
            "ShortType"     : "INTEGER",
            "StringType"    : "STRING",
            "TimestampType" : "DATETIME",
        }

        # Key on the class name rather than str(dataType): newer PySpark
        # releases render data types as e.g. "StringType()", which would miss
        # every entry above and silently turn all columns into STRING.
        return [
            {
                "dataType"    : spark_to_finspace.get(type(field.dataType).__name__, "STRING"),
                "name"        : field.name,
                "description" : "",
            }
            for field in data_frame.schema
        ]
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



notebooks/Utilities/finspace_spark.py [8:105]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class SparkFinSpace(FinSpace):
    """``FinSpace`` helper extended with Spark DataFrame conveniences.

    Adds methods for writing a Spark DataFrame to the user's ingestion
    location, ingesting it into a dataset, reading a view as a Spark table,
    and deriving a FinSpace column schema from a DataFrame.
    """

    # Imported in the class body so the ``pyspark.…`` annotations on the
    # methods below resolve when the class is defined.
    import pyspark

    def __init__(
        self, 
        spark: pyspark.sql.session.SparkSession = None,
        config = Config(retries = {'max_attempts': 0, 'mode': 'standard'}),
        dev_overrides: dict = None
    ):
        """Initialise the base ``FinSpace`` client and remember the Spark session.

        Args:
            spark: Spark session used by ``read_view_as_spark``. Defaults to
                ``None``; it must be supplied before reading views.
            config: Client configuration forwarded to ``FinSpace.__init__``.
                The default requests ``max_attempts`` 0 in ``standard`` retry mode.
            dev_overrides: Forwarded unchanged to ``FinSpace.__init__``.
        """
        super().__init__(config=config, dev_overrides=dev_overrides)
        # Used on the Spark cluster for reading views and creating
        # changesets from DataFrames.
        self.spark = spark

    def upload_dataframe(self, data_frame: pyspark.sql.dataframe.DataFrame):
        """Write ``data_frame`` as Parquet to the user's ingestion path.

        The destination is the ``ingestionPath`` returned by
        ``self.client.get_user_ingestion_info()``.

        Args:
            data_frame: Spark DataFrame to write.

        Returns:
            The location the DataFrame was written to.
        """
        ingestion_info = self.client.get_user_ingestion_info()
        upload_location = ingestion_info['ingestionPath']
        data_frame.write.parquet(upload_location)
        return upload_location

    def ingest_dataframe(self, data_frame: pyspark.sql.dataframe.DataFrame, dataset_id: str, change_type: str, wait_for_completion=True):
        """Upload ``data_frame`` and ingest the uploaded files into a dataset.

        Args:
            data_frame: Spark DataFrame to upload (written as Parquet).
            dataset_id: Identifier of the target dataset, passed to
                ``ingest_from_s3``.
            change_type: Change type, passed through to ``ingest_from_s3``.
            wait_for_completion: Passed through to ``ingest_from_s3``.

        Returns:
            Whatever ``self.ingest_from_s3`` returns.
        """
        print("Uploading data...")
        upload_location = self.upload_dataframe(data_frame)

        print("Data upload finished. Ingesting data...")

        return self.ingest_from_s3(
            upload_location,
            dataset_id,
            change_type,
            wait_for_completion,
            format_type='parquet',
            format_params={},
        )

    def read_view_as_spark(
        self,
        dataset_id: str,
        view_id: str
        ):
        """Return the table backing a dataset view as a Spark DataFrame.

        Args:
            dataset_id: Dataset whose views are listed.
            view_id: ``id`` of the view to read.

        Returns:
            The result of ``self.spark.table(...)`` for the view's database
            and table, or ``None`` (after printing a message) when the view's
            status is not ``'SUCCESS'``.

        Raises:
            Exception: If no listed view, or more than one, has ``view_id``.
        """
        # TODO: switch to DescribeMatz when available in HFS
        # NOTE(review): only up to max_results=50 views are requested here;
        # confirm whether list_views paginates beyond that.
        views = self.list_views(dataset_id=dataset_id, max_results=50)
        matches = [v for v in views if v['id'] == view_id]

        if not matches:
            raise Exception('No such view found')
        if len(matches) > 1:
            raise Exception('Internal Server error')
        view = matches[0]

        # Ensure the view is ready to be read.
        status = view['status']
        if status != 'SUCCESS':
            print(f'view run status is not ready: {status}. Returning empty.')
            return

        destination = view['destinationTypeProperties']
        glue_db_name = destination['databaseName']
        glue_table_name = destination['tableName']

        # Query the Glue table directly with Spark's catalog function.
        return self.spark.table(f"`{glue_db_name}`.`{glue_table_name}`")

    def get_schema_from_spark(self, data_frame: pyspark.sql.dataframe.DataFrame):
        """Translate a DataFrame's schema into FinSpace column definitions.

        Args:
            data_frame: Spark DataFrame whose ``schema`` fields are translated.

        Returns:
            A list with one dict per field, in schema order, with keys
            ``dataType`` (FinSpace type name), ``name`` (the field name) and
            ``description`` (always an empty string). Spark types without an
            entry in the mapping below are reported as ``"STRING"``.
        """
        # for translation to FinSpace's schema
        # 'STRING'|'CHAR'|'INTEGER'|'TINYINT'|'SMALLINT'|'BIGINT'|'FLOAT'|'DOUBLE'|'DATE'|'DATETIME'|'BOOLEAN'|'BINARY'
        #
        # NOTE(review): DoubleType -> FLOAT and LongType -> INTEGER are kept
        # exactly as before even though DOUBLE and BIGINT appear in the list
        # above; confirm whether this narrowing is intentional.
        spark_to_finspace = {
            "BinaryType"    : "STRING",
            "BooleanType"   : "BOOLEAN",
            "ByteType"      : "INTEGER",
            "DateType"      : "DATE",
            "DoubleType"    : "FLOAT",
            "IntegerType"   : "INTEGER",
            "LongType"      : "INTEGER",
            "NullType"      : "STRING",
            "ShortType"     : "INTEGER",
            "StringType"    : "STRING",
            "TimestampType" : "DATETIME",
        }

        # Key on the class name rather than str(dataType): newer PySpark
        # releases render data types as e.g. "StringType()", which would miss
        # every entry above and silently turn all columns into STRING.
        return [
            {
                "dataType"    : spark_to_finspace.get(type(field.dataType).__name__, "STRING"),
                "name"        : field.name,
                "description" : "",
            }
            for field in data_frame.schema
        ]
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



