in ml_ops/visualization_blog/lambdas/updateresources/update.py [0:0]
def transform(s3_object, bucket, key, params, page):
# Transform forecast output into input format
csv_buffer = StringIO()
schema = SCHEMAS_DEF[params['Datasets'][0]['Domain']]
fieldnames = [
attr['AttributeName']
for attr in params['Datasets'][0]['Schema']['Attributes']
]
out = csv.DictWriter(csv_buffer, fieldnames=fieldnames + ['type'])
out.writeheader()
stream = codecs.getreader('utf-8')(s3_object.get()['Body'])
datetimestr = '%Y-%m-%d' if params['TimestampFormat'
] == 'yyyy-MM-dd' else '%Y-%m-%d %H:%M:%S'
for row in csv.DictReader(stream):
row[schema['date']
] = datetime.strptime(row.pop('date'),
'%Y-%m-%dT%H:%M:%SZ').strftime(datetimestr)
for forecast_type in params['Forecast']['ForecastTypes']:
row['type'] = 'p{:.0f}'.format(float(forecast_type) * 100)
row[schema['metric']] = row.pop(get_type_string(forecast_type))
out.writerow(
{field: row[field]
for field in schema['fields'] + ['type']}
)
if page == 0:
for entry in get_readings(params, bucket):
out.writerow(entry)
S3_CLI.put_object(Body=csv_buffer.getvalue(), Bucket=bucket, Key=key)