def status()

in source/forecast-shared/shared/Dataset/dataset_import_job.py [0:0]


    def status(self) -> Status:
        """
        Report the status of this dataset import job.

        The first entry returned by ``self.history()`` is inspected (presumably
        the most recent import - confirm against ``history``). When that import
        is ACTIVE, its ``SolutionETag`` service tag is compared against
        ``self.dataset_file.etag``; a missing or mismatching tag is reported as
        ``Status.DOES_NOT_EXIST`` so that a fresh import is triggered.
        Otherwise the user tags are applied to the import job and the status
        reported by the service is returned.

        :return: Status
        """
        import_history = self.history()
        if not import_history:
            # nothing has been imported yet
            return Status.DOES_NOT_EXIST

        # describe the import at the head of the history
        latest_arn = import_history[0].get("DatasetImportJobArn")
        description = self.cli.describe_dataset_import_job(
            DatasetImportJobArn=latest_arn
        )
        job_state = description.get("Status")

        # an active import may still be stale - compare dataset signatures
        if job_state == Status.ACTIVE:
            stored_etag = self.get_service_tag_for_arn(latest_arn, "SolutionETag")

            if not stored_etag:
                # no etag tag recorded: upgrading from 1.0 (which did not add the
                # tag) or from 1.1 with a multipart etag (large datasets)
                reimport_reason = "no signature found for this dataset - marking as DOES_NOT_EXIST to trigger import"
            elif stored_etag != self.dataset_file.etag:
                # the dataset file changed since it was last imported
                reimport_reason = "signature found for this dataset, but it does not match the current imported dataset signature - marking as DOES_NOT_EXIST to trigger import"
            else:
                reimport_reason = None

            if reimport_reason is not None:
                logger.info(reimport_reason)
                return Status.DOES_NOT_EXIST

        # the existing import stands: tag it and report the service's status
        self.set_user_tags(resource_arn=latest_arn)
        return Status[job_state]