def lambda_handler()

in 04_EdgeApplication/report/lambda_ingest_logs_elasticsearch.py [0:0]


def lambda_handler(event, context):
    """Forward device predictions or device logs through ``put_record``.

    Two event shapes are handled:

    * S3 notification (``event['Records']`` present): every referenced
      object is downloaded and read as newline-delimited JSON. For each
      non-blank line, the first entry of ``deviceFleetInputs`` and of
      ``deviceFleetOutputs`` is base64-decoded and unpacked with the
      ``struct`` format ``'6f'``; one item per line is emitted with
      ``put_record(item, 'preds')``.
    * Direct device message (``event['device_name']`` present): when
      ``msg_type`` is ``'logs'``, each entry of ``event['logs']`` becomes
      one item emitted with ``put_record(item, 'logs')``. Any other
      ``msg_type`` is ignored.

    Args:
        event: Lambda event payload (dict).
        context: Lambda context object; not used.

    Raises:
        Exception: if the event has neither ``Records`` nor
            ``device_name``.
    """
    # Imported locally so the handler does not depend on extra file-level
    # imports being present.
    from datetime import timezone
    from urllib.parse import unquote_plus

    if event.get('Records') is not None:
        for r in event['Records']:
            bucket = r['s3']['bucket']['name']
            # Object keys in S3 event notifications are URL-encoded
            # (e.g. spaces arrive as '+'); decode before downloading.
            key = unquote_plus(r['s3']['object']['key'])

            with io.BytesIO() as data:
                s3_client.download_fileobj(bucket, key, data)
                data.seek(0)
                for line in data:
                    # Tolerate blank lines instead of failing in json.loads.
                    if not line.strip():
                        continue
                    log = json.loads(line)
                    meta = log['eventMetadata']
                    inputs = struct.unpack('6f', base64.b64decode(log['deviceFleetInputs'][0]['data']))
                    outputs = struct.unpack('6f', base64.b64decode(log['deviceFleetOutputs'][0]['data']))
                    # The "+00:00" suffix declares UTC, so the clock must be
                    # read in UTC rather than in the process-local timezone.
                    # [:-3] trims microseconds down to milliseconds.
                    now_utc = datetime.now(timezone.utc)
                    item = {
                        "deviceId": meta['deviceId'],
                        "eventTime": "%s+00:00" % now_utc.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]
                    }
                    for i, d in enumerate(pred_labels):
                        item["mean_pred_%s" % d] = str(inputs[i])
                        item["anomaly_%s" % d] = str(outputs[i])
                    put_record(item, 'preds')
    elif event.get('device_name') is not None:
        device_name = event['device_name']
        if event['msg_type'] == 'logs':
            for logs in event['logs']:
                data = logs['data']
                item = {
                    "deviceId": device_name,
                    "turbineId": device_name.replace('jetson-', 'Turbine '),
                    "arduino_timestamp": data[0],
                    "nanoFreemem": data[1],
                    "eventTime": logs['ts']
                }
                # The first two values of `data` are consumed above; the
                # remaining ones are mapped onto log_labels in order.
                for i, d in enumerate(log_labels):
                    item[d] = data[i + 2]
                put_record(item, 'logs')
    else:
        raise Exception("Invalid event: %s" % json.dumps(event))