# convert_csv.py
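# --- Assumed module-level context (not shown in this excerpt) ---
# A minimal sketch of the imports and globals get_parquet_files() relies on; the
# exact definitions live elsewhere in convert_csv.py, so names such as target_dir,
# filters, bucket_name, EXPORT_TYPES, get_subdirs and is_agent_id are taken from
# their usage below, not from the original source.
#
#   import glob
#   import os
#   import shutil
#
#   import boto3                              # assumed client library for the S3 upload
#   from pyspark import SparkContext
#   from pyspark.sql import SQLContext
#
#   sc = SparkContext()
#   sqlContext = SQLContext(sc)
#   EXPORT_TYPES = {...}                      # export type name -> pyspark StructType schema
#   target_dir = "..."                        # local "parquetExports" output directory
#   filters = [...]                           # optional list of agent IDs to restrict processing to
#   s3 = boto3.client("s3")
#   bucket_name = "..."                       # destination S3 bucket
#   # get_subdirs(path) and is_agent_id(name) are helpers assumed to be defined in the module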
def get_parquet_files(dir_path):
    """Concatenate the CSV exports of each type into one parquet file per agent,
    written to the "parquetExports" subdirectory of the agentsExports folder."""
    try:
        os.makedirs(target_dir)
    except OSError:  # already exists
        pass
    dfs = {}
    # Get the directory listing we will iterate over, optionally restricted to the agents in filters
    if filters:
        agent_dirs = [x for x in get_subdirs(dir_path) if x in filters]
    else:
        agent_dirs = get_subdirs(dir_path)
    # Build a single parquet file for each export type for each agent
    for agent in agent_dirs:
        if not is_agent_id(agent):
            continue
        # Initialize empty data frames to load the CSV files into, one per export type
        for export_type in EXPORT_TYPES:
            empty_df = sqlContext.createDataFrame(sc.emptyRDD(), EXPORT_TYPES[export_type])
            dfs[export_type] = empty_df
        agent_export_types = [export_type for export_type in get_subdirs(os.path.join(dir_path, agent))
                              if export_type != "results"]
        for export_type in agent_export_types:
            exports = sorted(os.listdir(os.path.join(dir_path, agent, export_type)))
            # Export files are of the form 2017-11-04T000100Z_<accountNumber>_<type>.csv,
            # so the first 18 characters of the earliest file give the timestamp label
            date = exports[0][:18]
            print("Loading {} exported CSV files of type {} for agent {}, will be labeled with {}".format(
                len(exports), export_type, agent, date))
            for export in exports:
                export_file = os.path.join(dir_path, agent, export_type, export)
                # Remove colons if necessary from the filename for compatibility with Spark
                if ":" in export_file:
                    new_name = export_file.replace(":", "")
                    os.rename(export_file, new_name)
                    export_file = new_name
                # Load each non-hidden CSV file and append it to the running dataframe for this export type
                if export_file.endswith('.csv') and not export.startswith('.'):
                    df = (sqlContext.read.format('com.databricks.spark.csv')
                          .options(header='true')
                          .schema(EXPORT_TYPES[export_type])
                          .load(export_file))
                    dfs[export_type] = dfs[export_type].unionAll(df)
            # Write the dataframe for each export type to a subdirectory of the target directory
            print(" Converting to parquet...")
            new_name = "{}_{}.parquet".format(date, agent)
            subfolder_dir = os.path.join(target_dir, export_type + "-" + new_name)
            if os.path.isdir(subfolder_dir):
                shutil.rmtree(subfolder_dir)  # Spark requires an empty target directory to write out the dataframe
            # dfs[export_type].toPandas().to_csv(os.path.join(target_dir, export_type + ".csv"))
            dfs[export_type].coalesce(1).write.parquet(subfolder_dir)
            # Rename the single part file Spark produced to the date/agent label built above
            parquet_file = glob.glob(os.path.join(subfolder_dir, "part-*"))[0]
            os.rename(parquet_file, os.path.join(subfolder_dir, new_name))
            # Upload the parquet output (skipping Spark metadata and hidden files) to S3
            print(" Uploading to S3...")
            for parquet_file in os.listdir(subfolder_dir):
                if not parquet_file.startswith('.') and not parquet_file.startswith('_'):
                    s3.upload_file(os.path.join(subfolder_dir, parquet_file),
                                   bucket_name,
                                   os.path.join(export_type, parquet_file))
            print(" Successful!")