def get_readings()

in ml_ops/visualization_blog/lambdas/updateresources/update.py
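
The function below relies on names defined elsewhere in update.py. A minimal sketch of the module-level context it assumes (the Athena connection appears to come from pyathena, given the connect/cursor usage; SCHEMAS_DEF and get_type_string are placeholders whose real definitions live elsewhere in the module):

from os import environ

from pyathena import connect

# Placeholder: maps a dataset-group domain to its identifier column.
SCHEMAS_DEF = {
    'RETAIL': {'identifier': 'item_id'},
}


def get_type_string(forecast_type):
    # Placeholder: turn a forecast type such as '0.50' into a label like 'p50';
    # non-numeric types (e.g. 'mean') pass through unchanged.
    try:
        return 'p{:.0f}'.format(float(forecast_type) * 100)
    except ValueError:
        return forecast_type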


def get_readings(params, bucket):
    def create_table(table_name, attributes, input_path, delimiter=','):
        # Store the timestamp column as a plain string so the query below can
        # parse it explicitly with date_parse.
        for attribute in attributes:
            if attribute['AttributeName'] == 'timestamp':
                attribute['AttributeType'] = 'string'
        properties = ', '.join(
            [
                '{} {}'.format(field['AttributeName'], field['AttributeType'])
                for field in attributes
            ]
        )

        # Drop and recreate the table so the schema always matches the current attributes
        cursor.execute('DROP TABLE IF EXISTS {};'.format(table_name))
        cursor.execute(
            '''
            CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({properties})
            ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
            WITH SERDEPROPERTIES (
                'serialization.format' = ',',
                'field.delim' = '{delimiter}')
            LOCATION '{input_path}'
            TBLPROPERTIES ('has_encrypted_data'='false','skip.header.line.count'='1');
            '''.format(
                table=table_name,
                properties=properties,
                delimiter=delimiter,
                input_path=input_path
            )
        )

    cursor = connect(
        s3_staging_dir='s3://{}/stage/'.format(bucket),
        region_name=environ['AWS_REGION'],
        work_group=environ['ATHENA_WORKGROUP']
    ).cursor()
    identifier = SCHEMAS_DEF[params['DatasetGroup']['Domain']]['identifier']
    # Presto/Athena date_parse format matching the dataset's TimestampFormat
    # (MySQL-style specifiers: %i is minutes, %s is seconds).
    datetimestr = (
        '%Y-%m-%d' if params['TimestampFormat'] == 'yyyy-MM-dd'
        else '%Y-%m-%d %H:%i:%s'
    )

    create_table(
        table_name='train',
        attributes=params['Datasets'][0]['Schema']['Attributes'],
        input_path='s3://{}/train/'.format(bucket)
    )

    # The forecast table adds a 'type' column on top of the training schema
    # and reads from the quicksight/ prefix.
    create_table(
        table_name='forecast',
        attributes=params['Datasets'][0]['Schema']['Attributes'] + [{
            'AttributeName': 'type',
            'AttributeType': 'string'
        }],
        input_path='s3://{}/quicksight/'.format(bucket)
    )

    # For each identifier, select the train row with the most recent timestamp
    # (the latest observed reading per item).
    cursor.execute(
        '''
            select * from (
                select {identifier}, max(date_parse(timestamp, '{date_format}')) as date from train
                group by {identifier}
            ) as x
            INNER JOIN train AS t
            ON t.{identifier} = x.{identifier}
            AND x.date = date_parse(t.timestamp, '{date_format}')
        '''.format(identifier=identifier, date_format=datetimestr)
    )

    attributes = params['Datasets'][0]['Schema']['Attributes']

    for row in cursor:
        # The join returns x.* followed by t.*, so the last len(attributes)
        # columns hold the original train columns of the latest reading.
        query_field = list(row[-len(attributes):])
        # Emit one record per requested forecast type, tagged with its label.
        for forecast_type in params['Forecast']['ForecastTypes']:
            yield {
                **{
                    attributes[i]['AttributeName']: query_field[i]
                    for i in range(len(attributes))
                },
                **{
                    'type': get_type_string(forecast_type)
                }
            }
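
get_readings is a generator, so callers iterate over it row by row. A hypothetical sketch of collecting its output into CSV text (the helper name and call site are illustrative assumptions, not part of update.py):

import csv
import io


def readings_to_csv(params, bucket):
    # Hypothetical helper: materialize the generator and serialize one CSV
    # column per schema attribute plus the added 'type' column.
    rows = list(get_readings(params, bucket))
    if not rows:
        return ''
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()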