def get_parquet_files()

in convert_csv.py [0:0]


def get_parquet_files(dir_path):
    """Concatenate each agent's CSV exports of the same type into one Parquet file
    under the "parquetExports" subdirectory, then upload the results to S3.

    Relies on module-level names: target_dir, filters, EXPORT_TYPES, get_subdirs,
    is_agent_id, sc, sqlContext, s3, and bucket_name (plus os, glob, and shutil).
    """
    try:
        os.makedirs(target_dir)
    except OSError:  # target directory already exists
        pass
    dfs = {}
    # Restrict to the agent directories named in filters, if any were given
    if filters:
        agent_dirs = [x for x in get_subdirs(dir_path) if x in filters]
    else:
        agent_dirs = get_subdirs(dir_path)

    # build a single parquet file for each export type for each agent
    for agent in agent_dirs:
        if not is_agent_id(agent):
            continue
        # Initialize empty data frames to load in csv files for all export types
        for export_type in EXPORT_TYPES:
            empty_df = sqlContext.createDataFrame(sc.emptyRDD(), EXPORT_TYPES[export_type])
            dfs[export_type] = empty_df

        agent_export_types = [export_type
                              for export_type in get_subdirs(os.path.join(dir_path, agent))
                              if export_type != "results"]
        for export_type in agent_export_types:
            exports = sorted(os.listdir(os.path.join(dir_path, agent, export_type)))
            date = exports[0][:18] # export files are of the form 2017-11-04T000100Z_<accountNumber>_<type>.csv
            print("Loading {} exported CSV files of type {} for agent {}; output will be labeled {}".format(len(exports), export_type, agent, date))
            for export in exports:
                # Remove colons if necessary from filename for compatibility with Spark
                export_file = os.path.join(dir_path, agent, export_type, export)
                if ":" in export_file:
                    new_name = export_file.replace(":", "")
                    os.rename(export_file, new_name)
                    export_file = new_name
                # Load non-hidden CSV files and append them to the running dataframe for this export type
                if export.endswith('.csv') and not export.startswith('.'):
                    df = (sqlContext.read.format('com.databricks.spark.csv')
                          .options(header='true')
                          .schema(EXPORT_TYPES[export_type])
                          .load(export_file))
                    dfs[export_type] = dfs[export_type].unionAll(df)

            # Write the dataframe for each export type to a subdirectory of the target directory
            print(" Converting to parquet...")
            new_name = "{}_{}.parquet".format(date, agent)
            subfolder_dir = os.path.join(target_dir, export_type + "-" + new_name)
            if os.path.isdir(subfolder_dir):
                shutil.rmtree(subfolder_dir)  # Spark refuses to write to an existing path, so clear any previous output
            #dfs[export_type].toPandas().to_csv(os.path.join(target_dir, export_type + ".csv"))
            dfs[export_type].coalesce(1).write.parquet(subfolder_dir)
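            # coalesce(1) produced a single part-* file; rename it to the stable <date>_<agent>.parquet name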
            parquet_file = glob.glob(os.path.join(subfolder_dir, "part-*"))[0]
            os.rename(parquet_file, os.path.join(subfolder_dir, new_name))

            # Upload to S3
            print("   Uploading to S3...")
            for parquet_file in os.listdir(subfolder_dir):
                if not parquet_file.startswith('.') and not parquet_file.startswith('_'):
                    s3.upload_file(os.path.join(subfolder_dir, parquet_file), bucket_name, os.path.join(export_type, parquet_file))
            print("    Successful!")