def lambda_handler()

in 04_EdgeApplication/report/lambda_ingest_logs_cloudwatch.py [0:0]


def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))
    # get the device name and check for the message type: logs or preds
    if event.get('Records') is not None:
        for r in event['Records']:
            bucket = r['s3']['bucket']['name']
            key = r['s3']['object']['key']
            
            with io.BytesIO() as data:
                s3_client.download_fileobj(bucket, key, data)
                data.seek(0)
                log_data = []
                for line in data.readlines():
                    log = json.loads(line)
                    inputs = struct.unpack('6f', base64.b64decode(log['deviceFleetInputs'][0]['data']))
                    outputs = struct.unpack('6f', base64.b64decode(log['deviceFleetOutputs'][0]['data']))
                    log_data.append({
                        'timestamp': round(time.time() * 1000),
                        'message': ' '.join([str(i) for i in (inputs + outputs)])
                    })
            put_events('preds', log_data)
    elif event.get('device_name') is not None:
        device_name = event['device_name']
        if event['msg_type'] == 'logs':
            log_data = []
            for logs in event['logs']:
                data = logs['data']
                log_data.append({
                    'timestamp': round(time.time() * 1000),
                    'message': ' '.join([logs['ts'], device_name] + [str(i) for i in data])
                })
            put_events('sensors', log_data)
    else:
        raise Exception("Invalid event: %s" % json.dumps(event))