in 04_EdgeApplication/report/lambda_ingest_logs_elasticsearch.py [0:0]
def lambda_handler(event, context):
    """Forward prediction or device-log records to ``put_record``.

    Two event shapes are accepted:

    * An event with a ``Records`` list: for every record the referenced S3
      object is downloaded and read line by line. Each line is parsed as a
      JSON document; ``deviceFleetInputs[0]['data']`` and
      ``deviceFleetOutputs[0]['data']`` are base64-decoded and unpacked as six
      floats each. One item per line is sent with ``put_record(item, 'preds')``.
    * An event with a ``device_name``: when ``msg_type`` is ``'logs'``, every
      entry of ``event['logs']`` becomes one item sent with
      ``put_record(item, 'logs')``. Any other ``msg_type`` is ignored.

    Args:
        event: The invocation payload (a dict) in one of the shapes above.
        context: The Lambda context object; not used.

    Returns:
        None.

    Raises:
        Exception: If the event has neither ``Records`` nor ``device_name``.
    """
    # Function-scope imports: only the names this handler needs on top of
    # what the module already imports.
    from datetime import timezone
    from urllib.parse import unquote_plus

    if event.get('Records') is not None:
        for r in event['Records']:
            bucket = r['s3']['bucket']['name']
            # S3 event notifications deliver the object key URL-encoded
            # (e.g. a space arrives as '+'), so decode it before downloading.
            key = unquote_plus(r['s3']['object']['key'])
            with io.BytesIO() as data:
                s3_client.download_fileobj(bucket, key, data)
                data.seek(0)
                for line in data:
                    # Tolerate blank lines instead of failing in json.loads.
                    if not line.strip():
                        continue
                    log = json.loads(line)
                    meta = log['eventMetadata']
                    inputs = struct.unpack(
                        '6f', base64.b64decode(log['deviceFleetInputs'][0]['data'])
                    )
                    outputs = struct.unpack(
                        '6f', base64.b64decode(log['deviceFleetOutputs'][0]['data'])
                    )
                    # The timestamp is labelled "+00:00", so it must really be
                    # taken in UTC; [:-3] trims microseconds to milliseconds.
                    now_utc = datetime.now(timezone.utc)
                    item = {
                        "deviceId": meta['deviceId'],
                        "eventTime": "%s+00:00" % now_utc.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3],
                    }
                    # NOTE(review): only six values are unpacked per payload,
                    # so this assumes pred_labels has at most six entries.
                    for i, d in enumerate(pred_labels):
                        item["mean_pred_%s" % d] = str(inputs[i])
                        item["anomaly_%s" % d] = str(outputs[i])
                    put_record(item, 'preds')
    elif event.get('device_name') is not None:
        device_name = event['device_name']
        if event['msg_type'] == 'logs':
            for logs in event['logs']:
                data = logs['data']
                item = {
                    "deviceId": device_name,
                    "turbineId": device_name.replace('jetson-', 'Turbine '),
                    "arduino_timestamp": data[0],
                    "nanoFreemem": data[1],
                    "eventTime": logs['ts'],
                }
                # The first two data slots are consumed above; the remaining
                # values map positionally onto log_labels.
                for i, d in enumerate(log_labels):
                    item[d] = data[i + 2]
                put_record(item, 'logs')
    else:
        raise Exception("Invalid event: %s" % json.dumps(event))