in source/forecast-shared/shared/Dataset/dataset.py [0:0]
def create(self):
"""
Create the dataset
:return: None
"""
exceptions = []
try:
dataset_info = self.cli.describe_dataset(DatasetArn=self.arn)
if dataset_info.get("DatasetType") != self._dataset_type:
exceptions.append(
f"dataset type ({dataset_info.get('DatasetType')}) does not match expected ({self._dataset_type})"
)
if dataset_info.get("Domain") != self._dataset_domain:
exceptions.append(
f"dataset domain ({dataset_info.get('Domain')}) does not match ({self.dataset_domain})"
)
if dataset_info.get("DataFrequency") != self._data_frequency:
exceptions.append(
f"data frequency ({dataset_info.get('DataFrequency')}) does not match ({self._data_frequency})"
)
if dataset_info.get("Schema") != self._dataset_schema:
exceptions.append("dataset schema does not match")
if exceptions:
raise ValueError("\n".join(exceptions))
except ClientError as ex:
if ex.response["Error"]["Code"] != "ResourceNotFoundException":
raise ex
try:
self.cli.create_dataset(**self._create_params())
except self.cli.exceptions.ResourceAlreadyExistsException:
logger.debug("Dataset %s is already creating" % str(self._dataset_name))
self.set_user_tags(
resource_arn=self.arn,
)