in agora/cerebral_simulator/src/app.py [0:0]
def write_data_to_influxdb(equipment_type, device_id, measurement_name, timestamp, data):
global write_api
if not ENABLE_INFLUXDB or write_api is None:
return
try:
point = Point(measurement_name).time(timestamp, WritePrecision.NS)
for key, value in data.items():
if isinstance(value, (int, float)):
point.field(key, float(value))
elif isinstance(value, bool):
point.field(key, value)
elif isinstance(value, str):
point.tag(key, value)
elif value is None:
continue
else:
if VERBOSE:
logger.warning(f"Skipping field {key} with unsupported type {type(value)}")
write_api.write(bucket=INFLUXDB_BUCKET, record=point)
if VERBOSE:
logger.info(f"Data for {device_id} written to InfluxDB: {data}")
except Exception as e:
logger.error(f"Error writing data to InfluxDB for {device_id}: {str(e)}")
# try to reconnect
init_influxdb()