athena_glue_service_logs/converter.py
def run(self):
    """Extract data from the data catalog and convert it to Parquet, partitioning it along the way."""
    from awsglue.transforms import DropNullFields

    # Retrieve the source data from the Glue catalog
    source_data = self.glue_context.create_dynamic_frame.from_catalog(
        database=self.data_catalog.get_database_name(),
        table_name=self.data_catalog.get_table_name(),
        transformation_ctx="source_data"
    )

    # Perform any data-source-specific conversions
    optimized_transforms = self.optimized_catalog.conversion_actions(source_data)

    # Remove nulls and convert to a DataFrame - the DataFrame is only needed for replacing the date partitions.
    # It was previously used to repartition, but Glue supports that now.
    drop_nulls = DropNullFields.apply(frame=optimized_transforms, transformation_ctx="drop_nulls")
    data_frame = drop_nulls.toDF()
    # We might have no data - if that's the case, short-circuit
    if not data_frame.head(1):
        LOGGER.info("No data returned, skipping conversion.")
        return

    # Create Y-m-d partitions out of the optimized table's timestamp field
    df_partitions = self._replace_date_partitions(data_frame, self.data_catalog.timestamp_field())
    # Write out to partitioned Parquet. We repartition to reduce the number of files and optimize Athena
    # performance. Athena queries slow down even at 1,000 files, so we trade off toward a few large files
    # per partition rather than many small ones.
    (
        df_partitions
        .repartition(*self._partition_columns())
        .write
        .mode('append')
        .partitionBy(*self._partition_columns())
        .parquet(self.optimized_catalog.get_s3_location())
    )
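
# The helper below is a minimal, hypothetical sketch of what _replace_date_partitions might do,
# assuming it derives string year/month/day columns from the catalog's timestamp field. The
# name (suffixed _sketch), column names, and behaviour are illustrative, not the repository's code.
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, date_format

def _replace_date_partitions_sketch(data_frame: DataFrame, timestamp_field: str) -> DataFrame:
    """Add year/month/day partition columns derived from the given timestamp field."""
    return (
        data_frame
        .withColumn('year', date_format(col(timestamp_field), 'yyyy'))
        .withColumn('month', date_format(col(timestamp_field), 'MM'))
        .withColumn('day', date_format(col(timestamp_field), 'dd'))
    )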
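
# Similarly, a hypothetical sketch of what _partition_columns might return, assuming the optimized
# table is partitioned only by the derived date columns (the real catalog may add others).
# Repartitioning by the same columns passed to partitionBy() groups each date's rows together,
# which is what keeps the per-partition file count low in the write above.
def _partition_columns_sketch():
    """Return the column names used both for repartition() and partitionBy()."""
    return ['year', 'month', 'day']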