def export()

in source/forecast-shared/shared/Forecast/forecast.py [0:0]


    def export(self, dataset_file: DatasetFile) -> Export:
        """
        Look up the export job for this Forecast, creating it if none is found
        :param dataset_file: The dataset file whose bucket is used as the S3 destination of a newly created export
        :return: An Export carrying the status reported by the service, or CREATE_PENDING when the job was just created
        :raises ValueError: if this Forecast has no ARN yet
        """
        if not self.arn:
            raise ValueError("Forecast does not yet exist - cannot perform export.")

        # The name prefix differs depending on what uses_auto_predictor() reports.
        export_name = (
            f"export_ap_{self._dataset_group.dataset_group_name}_{self._latest_timestamp}"
            if self.uses_auto_predictor()
            else f"export_{self._dataset_group.dataset_group_name}_{self._latest_timestamp}"
        )

        # The export job ARN is derived from the forecast ARN instead of being looked up.
        job_arn_prefix = self.arn.replace(":forecast/", ":forecast-export-job/")
        export_arn = job_arn_prefix + f"/{export_name}"

        result = Export()
        try:
            description = self.cli.describe_forecast_export_job(
                ForecastExportJobArn=export_arn
            )
            status_name = description.get("Status")
            result.status = Status[status_name]
        except self.cli.exceptions.ResourceInUseException as err:
            # The status stays at whatever Export() initialised it to.
            logger.debug(
                "Forecast export %s is updating: %s" % (export_name, str(err))
            )
        except self.cli.exceptions.ResourceNotFoundException:
            # No export job under this ARN yet: start one targeting the dataset file's bucket.
            logger.info("Creating forecast export %s" % export_name)
            s3_destination = {
                "S3Config": {
                    "Path": f"s3://{dataset_file.bucket}/exports/{export_name}",
                    "RoleArn": environ.get("FORECAST_ROLE"),
                }
            }
            self.cli.create_forecast_export_job(
                ForecastArn=self.arn,
                ForecastExportJobName=export_name,
                Destination=s3_destination,
            )
            result.status = Status.CREATE_PENDING

        logger.debug(
            "Export status for %s is %s" % (export_name, str(result.status))
        )
        # Tagging runs on every call, whichever branch above was taken.
        self.set_user_tags(resource_arn=export_arn)
        return result