parquet_flask/io_logic/cdms_schema.py (145 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.sql.types import (
    StructType,
    StructField,
    DoubleType,
    StringType,
    MapType,
    LongType,
    TimestampType,
    IntegerType,
)


class CdmsSchema:
    """Spark / pandas schema helpers for in-situ observation records.

    ``ALL_SCHEMA`` is a fixed Spark schema. The instance methods derive a
    Spark ``StructType`` or a pandas dtype mapping from an in-situ JSON
    schema, reading the properties found under
    ``in_situ_schema['definitions']['observation']['properties']``.
    """

    # Static Spark schema; every field is declared nullable.
    ALL_SCHEMA = StructType([
        StructField('depth', DoubleType(), True),
        StructField('latitude', DoubleType(), True),
        StructField('longitude', DoubleType(), True),
        StructField('meta', StringType(), True),

        StructField('platform', MapType(StringType(), StringType()), True),

        StructField('time', StringType(), True),
        StructField('time_obj', TimestampType(), True),

        StructField('provider', StringType(), True),
        StructField('project', StringType(), True),
        StructField('platform_code', IntegerType(), True),
        StructField('year', IntegerType(), True),
        StructField('month', IntegerType(), True),
        StructField('job_id', StringType(), True),

        StructField('air_pressure', DoubleType(), True),
        StructField('air_pressure_quality', LongType(), True),

        StructField('air_temperature', DoubleType(), True),
        StructField('air_temperature_quality', LongType(), True),

        StructField('dew_point_temperature', DoubleType(), True),
        StructField('dew_point_temperature_quality', LongType(), True),

        StructField('downwelling_longwave_flux_in_air', DoubleType(), True),
        StructField('downwelling_longwave_flux_in_air_quality', LongType(), True),

        StructField('downwelling_longwave_radiance_in_air', DoubleType(), True),
        StructField('downwelling_longwave_radiance_in_air_quality', LongType(), True),

        StructField('downwelling_shortwave_flux_in_air', DoubleType(), True),
        StructField('downwelling_shortwave_flux_in_air_quality', LongType(), True),

        StructField('mass_concentration_of_chlorophyll_in_sea_water', DoubleType(), True),
        StructField('mass_concentration_of_chlorophyll_in_sea_water_quality', LongType(), True),

        StructField('rainfall_rate', DoubleType(), True),
        StructField('rainfall_rate_quality', LongType(), True),

        StructField('relative_humidity', DoubleType(), True),
        StructField('relative_humidity_quality', LongType(), True),

        StructField('sea_surface_salinity', DoubleType(), True),
        StructField('sea_surface_salinity_quality', LongType(), True),

        StructField('sea_surface_skin_temperature', DoubleType(), True),
        StructField('sea_surface_skin_temperature_quality', LongType(), True),

        StructField('sea_surface_subskin_temperature', DoubleType(), True),
        StructField('sea_surface_subskin_temperature_quality', LongType(), True),

        StructField('sea_surface_temperature', DoubleType(), True),
        StructField('sea_surface_temperature_quality', LongType(), True),

        StructField('sea_water_density', DoubleType(), True),
        StructField('sea_water_density_quality', LongType(), True),

        StructField('sea_water_electrical_conductivity', DoubleType(), True),
        StructField('sea_water_electrical_conductivity_quality', LongType(), True),

        StructField('sea_water_practical_salinity', DoubleType(), True),
        StructField('sea_water_practical_salinity_quality', LongType(), True),

        StructField('sea_water_salinity', DoubleType(), True),
        StructField('sea_water_salinity_quality', LongType(), True),

        StructField('sea_water_temperature', DoubleType(), True),
        StructField('sea_water_temperature_quality', LongType(), True),

        StructField('surface_downwelling_photosynthetic_photon_flux_in_air', DoubleType(), True),
        StructField('surface_downwelling_photosynthetic_photon_flux_in_air_quality', LongType(), True),

        StructField('wet_bulb_temperature', DoubleType(), True),
        StructField('wet_bulb_temperature_quality', LongType(), True),

        StructField('wind_speed', DoubleType(), True),
        StructField('wind_speed_quality', LongType(), True),

        StructField('wind_from_direction', DoubleType(), True),
        StructField('wind_from_direction_quality', LongType(), True),

        StructField('wind_to_direction', DoubleType(), True),
        StructField('wind_to_direction_quality', LongType(), True),

        StructField('eastward_wind', DoubleType(), True),
        StructField('northward_wind', DoubleType(), True),
        StructField('wind_component_quality', LongType(), True),

        StructField('device', StringType(), True),
    ])

    def __get_json_datatype(self, datetype_name: str, datatype_def: dict):
        """Resolve the JSON type name for one property of the in-situ schema.

        :param datetype_name: property (column) name.
        :param datatype_def: JSON-schema definition of that property.
        :return: the value of ``datatype_def['type']`` when it is a string;
            the first entry other than ``'null'`` when it is a list;
            ``'long'`` for untyped ``*_quality`` properties; ``'platform'``
            for the untyped ``platform`` property.
        :raises ValueError: when no type can be determined.
        """
        if 'type' in datatype_def:
            temp_type = datatype_def['type']
            if isinstance(temp_type, str):
                return temp_type
            if isinstance(temp_type, list):
                # JSON Schema type arrays carry no ordering, so a nullable
                # column may be spelled ["null", "number"]. Pick the first
                # real type instead of blindly indexing [0], which also
                # avoids an IndexError on an empty list.
                for each_type in temp_type:
                    if each_type != 'null':
                        return each_type
            raise ValueError(f'unknown datatype: {datetype_name}: {datatype_def}')
        if datetype_name.endswith('_quality'):
            return 'long'
        if datetype_name == 'platform':  # special case
            return 'platform'
        raise ValueError(f'unknown datatype: {datetype_name}: {datatype_def}')

    def __init__(self):
        """Set up the JSON-type lookup tables and the fixed column lists."""
        # JSON type name -> pandas dtype string.
        self.__json_to_pandas_data_type = {
            'number': 'double',
            'long': 'int64',
            'string': 'object',
            'platform': 'object',
        }
        # JSON type name -> Spark data type.
        self.__json_to_spark_data_types = {
            'number': DoubleType(),
            'long': LongType(),
            'string': StringType(),
            'platform': MapType(StringType(), StringType()),
        }
        # Columns appended after the JSON-derived ones in get_schema_from_json.
        self.__default_columns = [
            StructField('time_obj', TimestampType(), True),

            StructField('provider', StringType(), True),
            StructField('project', StringType(), True),
            StructField('platform_code', IntegerType(), True),
            StructField('year', IntegerType(), True),
            StructField('month', IntegerType(), True),
            StructField('job_id', StringType(), True),
        ]
        # Property names that get_observation_names never reports.
        self.__non_observation_columns = [
            'time_obj',
            'time',

            'provider',
            'project',
            'platform_code',
            'platform',
            'year',
            'month',
            'job_id',

            'device',

            'latitude',
            'longitude',
            'depth',
        ]

    def __get_pandas_type(self, json_type: str):
        """Map a JSON type name to its pandas dtype string.

        :raises ValueError: when ``json_type`` has no pandas mapping.
        """
        if json_type not in self.__json_to_pandas_data_type:
            raise ValueError(f'unknown json type. cannot convert to pandas type: {json_type}')
        return self.__json_to_pandas_data_type[json_type]

    def __get_spark_type(self, json_type: str):
        """Map a JSON type name to its Spark data type.

        :raises ValueError: when ``json_type`` has no Spark mapping.
        """
        if json_type not in self.__json_to_spark_data_types:
            raise ValueError(f'unknown json type. cannot convert to spark type: {json_type}')
        return self.__json_to_spark_data_types[json_type]

    def __get_obs_defs(self, in_situ_schema: dict):
        """Return ``in_situ_schema['definitions']['observation']['properties']``.

        :raises ValueError: when any of the three nested keys is missing.
        """
        if 'definitions' not in in_situ_schema:
            raise ValueError(f'missing definitions in in_situ_schema: {in_situ_schema}')
        base_defs = in_situ_schema['definitions']
        if 'observation' not in base_defs:
            raise ValueError(f'missing observation in in_situ_schema["definitions"]: {base_defs}')
        obs_defs = base_defs['observation']
        if 'properties' not in obs_defs:
            raise ValueError(f'missing properties in in_situ_schema["definitions"]["observation"]: {obs_defs}')
        return obs_defs['properties']

    def get_observation_names(self, in_situ_schema: dict) -> list:
        """List the observation property names declared in the JSON schema.

        Names listed as non-observation columns and names ending in
        ``_quality`` are left out.

        :param in_situ_schema: in-situ JSON schema as a dict.
        :return: property names in the schema's own order.
        :raises ValueError: when the schema lacks the observation properties.
        """
        obs_names = [
            k for k in self.__get_obs_defs(in_situ_schema)
            if k not in self.__non_observation_columns and not k.endswith('_quality')
        ]
        return obs_names

    def get_schema_from_json(self, in_situ_schema: dict) -> StructType:
        """Build a Spark schema from the JSON schema's observation properties.

        Each property becomes a nullable field; the fixed default columns
        are appended after them.

        :param in_situ_schema: in-situ JSON schema as a dict.
        :return: the resulting ``StructType``.
        :raises ValueError: when the schema is malformed or a property type
            cannot be mapped to a Spark type.
        """
        dynamic_columns = [
            StructField(k, self.__get_spark_type(self.__get_json_datatype(k, v)), True)
            for k, v in self.__get_obs_defs(in_situ_schema).items()
        ]
        return StructType(dynamic_columns + self.__default_columns)

    def get_pandas_schema_from_json(self, in_situ_schema: dict) -> dict:
        """Build a ``{property name: pandas dtype string}`` mapping.

        :param in_situ_schema: in-situ JSON schema as a dict.
        :return: dict keyed by property name.
        :raises ValueError: when the schema is malformed or a property type
            cannot be mapped to a pandas type.
        """
        dynamic_columns = {
            k: self.__get_pandas_type(self.__get_json_datatype(k, v))
            for k, v in self.__get_obs_defs(in_situ_schema).items()
        }
        return dynamic_columns