in createForecastDataSetGroup/createForecastDataSetGroup.py [0:0]
def onEventHandler(event, context):
    """Lambda entry point: react to an S3 ObjectCreated event (delivered via SQS,
    wrapped in an SNS message) by upserting the Amazon Forecast datasets and
    dataset group named after the second path segment of the object key, then
    triggering a data-import job for the uploaded target/related CSV.

    Args:
        event: SQS event; ``Records[0].body`` is an SNS envelope whose
            ``Message`` is a JSON-encoded S3 event notification.
        context: Lambda context (unused).

    Returns:
        None. Objects whose key does not contain ``DatasetGroups`` are ignored.
    """
    from urllib.parse import unquote_plus

    # Unwrap SQS -> SNS -> S3 event notification.
    body = json.loads(event['Records'][0]['body'])
    message = json.loads(body['Message'])
    logger.debug("From SQS: %s", json.dumps(message))
    s3_object_info = message["Records"][0]["s3"]
    sourceBucketName = s3_object_info["bucket"]["name"]
    # BUG FIX: S3 event notifications URL-encode object keys (spaces arrive as
    # '+', special characters percent-escaped). Decode before using the key for
    # filtering, name extraction, or building the s3:// URL — otherwise keys
    # with spaces/non-ASCII produce a wrong dataset-group name and an import
    # URL that points at a nonexistent object.
    sourceObjectKey = unquote_plus(s3_object_info["object"]["key"])
    if "DatasetGroups" not in sourceObjectKey:
        logger.debug("s3 object key do not contain DatasetGroups, ignore and skip, objectkey=" + sourceObjectKey)
        return
    s3ObjectUrl = "s3://" + sourceBucketName + "/" + sourceObjectKey
    # Key layout assumed to be DatasetGroups/<groupName>/<file> — the group
    # name is the second path segment. TODO(review): confirm against uploader.
    datasetGroupName = sourceObjectKey.split("/")[1]
    logger.info("upsert forecast dataset group=%s", datasetGroupName)
    config = loadconfig(datasetGroupName)
    target_schema = config["target_schema"]
    related_schema = config["related_schema"]
    response = forecast_client.list_datasets()
    existingDataSets = response["Datasets"]
    # Upsert both datasets; names are derived from the group name so repeated
    # uploads for the same group reuse the existing dataset ARNs.
    targetDataSetArn = upsertDataSet(existingDataSets, forecast_client, target_schema, datasetGroupName + "_target",
                                     "TARGET_TIME_SERIES")
    relatedDataSetArn = upsertDataSet(existingDataSets, forecast_client, related_schema, datasetGroupName + "_related",
                                      "RELATED_TIME_SERIES")
    # Create the dataset group only on first sight; existing groups are left as-is.
    if not isExistingDataSetGroup(forecast_client, datasetGroupName):
        response = forecast_client.create_dataset_group(
            DatasetGroupName=datasetGroupName,
            Domain='CUSTOM',
            DatasetArns=[
                targetDataSetArn, relatedDataSetArn
            ]
        )
        logger.info("triggerred creation of forecast datasetgroup=" + datasetGroupName)
    # Kick off the history-data import for whichever CSV was uploaded.
    if "target.csv" in s3ObjectUrl:
        upsertDataImportJob(forecast_client, targetDataSetArn, s3ObjectUrl)
    if "related.csv" in s3ObjectUrl:
        upsertDataImportJob(forecast_client, relatedDataSetArn, s3ObjectUrl)