in ml_ops/visualization_blog/lambdas/updateresources/update.py [0:0]
def get_readings(params, bucket):
    """Yield the latest training row per identifier, once per forecast type.

    Recreates two Athena external CSV tables over ``bucket``:

    * ``train`` over ``s3://<bucket>/train/``
    * ``forecast`` over ``s3://<bucket>/quicksight/``, with an extra ``type``
      column

    It then queries ``train`` for each identifier's row(s) at its most recent
    timestamp.

    This is a generator, so no Athena work happens until the first item is
    requested.

    Args:
        params: Configuration dict. The function reads:

            * ``params['DatasetGroup']['Domain']``
            * ``params['TimestampFormat']``
            * ``params['Datasets'][0]['Schema']['Attributes']``
            * ``params['Forecast']['ForecastTypes']``

            It is not modified.
        bucket: Name of the S3 bucket holding the ``train/``, ``quicksight/``
            and ``stage/`` prefixes.

    Yields:
        dict: One dict per (latest row, forecast type) pair. It maps each
        schema attribute name to that row's value, plus a ``'type'`` key set
        to ``get_type_string(forecast_type)``.
    """
    cursor = connect(
        s3_staging_dir='s3://{}/stage/'.format(bucket),
        region_name=environ['AWS_REGION'],
        work_group=environ['ATHENA_WORKGROUP'],
    ).cursor()

    def create_table(table_name, attributes, input_path, delimiter=','):
        """Drop and recreate an external CSV table named ``table_name``.

        Args:
            table_name: Athena table to (re)create.
            attributes: Sequence of ``{'AttributeName', 'AttributeType'}``
                dicts. Not modified.
            input_path: S3 location of the CSV data.
            delimiter: Field delimiter of the CSV files.

        A column named ``timestamp`` is always declared as ``string``.
        """
        # NOTE: table/column names and paths are interpolated directly into
        # the SQL text, so they must come from trusted configuration.
        columns = []
        for field in attributes:
            name = field['AttributeName']
            # Keep timestamps as text. They are parsed with date_parse() at
            # query time instead.
            column_type = (
                'string' if name == 'timestamp' else field['AttributeType']
            )
            columns.append('{} {}'.format(name, column_type))
        properties = ', '.join(columns)

        # Drop first so that schema changes take effect.
        cursor.execute('DROP TABLE IF EXISTS {};'.format(table_name))
        cursor.execute(
            '''
            CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({properties})
            ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
            WITH SERDEPROPERTIES (
            'serialization.format' = ',',
            'field.delim' = '{delimiter}')
            LOCATION '{input_path}'
            TBLPROPERTIES ('has_encrypted_data'='false','skip.header.line.count'='1');
            '''.format(
                table=table_name,
                properties=properties,
                delimiter=delimiter,
                input_path=input_path,
            )
        )

    identifier = SCHEMAS_DEF[params['DatasetGroup']['Domain']]['identifier']
    # Presto date_parse() uses MySQL-style specifiers: %i = minutes, %s = seconds.
    if params['TimestampFormat'] == 'yyyy-MM-dd':
        datetimestr = '%Y-%m-%d'
    else:
        datetimestr = '%Y-%m-%d %H:%i:%s'

    create_table(
        table_name='train',
        attributes=params['Datasets'][0]['Schema']['Attributes'],
        input_path='s3://{}/train/'.format(bucket),
    )
    create_table(
        table_name='forecast',
        attributes=params['Datasets'][0]['Schema']['Attributes'] + [{
            'AttributeName': 'type',
            'AttributeType': 'string',
        }],
        input_path='s3://{}/quicksight/'.format(bucket),
    )

    cursor.execute(
        '''
        select * from (
        select {identifier}, max(date_parse(timestamp, '{date_format}')) as date from train
        group by {identifier}
        ) as x
        INNER JOIN train AS t
        ON t.{identifier} = x.{identifier}
        AND x.date = date_parse(t.timestamp, '{date_format}')
        '''.format(identifier=identifier, date_format=datetimestr)
    )

    attributes = params['Datasets'][0]['Schema']['Attributes']
    names = [field['AttributeName'] for field in attributes]
    for row in cursor:
        # Each joined row is x's (identifier, date) followed by every train
        # column. Keep only the trailing train columns.
        values = row[-len(names):]
        for forecast_type in params['Forecast']['ForecastTypes']:
            reading = dict(zip(names, values))
            reading['type'] = get_type_string(forecast_type)
            yield reading