def run()

in athena_glue_service_logs/converter.py [0:0]


    def run(self):
        """Extract data from the data catalog and convert it to parquet, partitioning it along the way"""
        from awsglue.transforms import DropNullFields

        # Retrieve the source data from the Glue catalog
        source_data = self.glue_context.create_dynamic_frame.from_catalog(
            database=self.data_catalog.get_database_name(),
            table_name=self.data_catalog.get_table_name(),
            transformation_ctx="source_data"
        )

        # Perform any data-source-specific conversions
        optimized_transforms = self.optimized_catalog.conversion_actions(source_data)

        # Remove nulls and convert to dataframe - dataframe is only needed for replacing the date partitions.
        # It was previously used to repartition, but Glue supports that now.
        drop_nulls = DropNullFields.apply(frame=optimized_transforms, transformation_ctx="drop_nulls")
        data_frame = drop_nulls.toDF()

        # We might have no data - if that's the case, short-circuit
        if not data_frame.head(1):
            LOGGER.info("No data returned, skipping conversion.")
            return

        # Create Y-m-d partitions out of the optimized table's timestamp field
        df_partitions = self._replace_date_partitions(data_frame, self.data_catalog.timestamp_field())

        # Write out to partitioned parquet. We repartition to reduce the number of files to optimize Athena performance.
        # Athena queries will slow down even at 1,000 files, so we tradeoff having large files per partition rather
        # than many small files.
        (
            df_partitions
            .repartition(*self._partition_columns())
            .write
            .mode('append')
            .partitionBy(*self._partition_columns())
            .parquet(self.optimized_catalog.get_s3_location())
        )