in streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDbSink.java [124:181]
/**
 * Writes one event to IoTDB as a single record of the configured device.
 *
 * <p>The field selected by {@code timestampFieldId} supplies the record timestamp; every other
 * entry of the event's raw map becomes one measurement. {@code Integer}, {@code Long},
 * {@code Float}, {@code Double} and {@code Boolean} values keep their native IoTDB type, any
 * other non-null value is stored as {@code TEXT} using its string representation. Entries with a
 * {@code null} value are skipped.
 *
 * <p>The event is silently ignored when it is {@code null}, has no timestamp, or contains no
 * measurement besides the timestamp. Failures reported by the session pool are logged and not
 * propagated.
 *
 * @param event the event to persist, may be {@code null}
 * @throws SpRuntimeException declared by the sink contract; not thrown directly by this method
 */
public void onEvent(Event event) throws SpRuntimeException {
  if (event == null) {
    return;
  }

  final AbstractField timestampAbstractField = event.getFieldBySelector(timestampFieldId);
  final Long timestamp = timestampAbstractField.getAsPrimitive().getAsLong();
  if (timestamp == null) {
    return;
  }

  final Map<String, Object> measurementValuePairs = event.getRaw();
  // should be at least a timestamp field and a measurement field
  if (measurementValuePairs.size() <= 1) {
    return;
  }

  // Loop-invariant: the runtime name under which the timestamp appears in the raw map.
  final String timestampFieldName = timestampAbstractField.getFieldNameIn();

  final int measurementFieldCount = measurementValuePairs.size() - 1;
  final List<String> measurements = new ArrayList<>(measurementFieldCount);
  final List<TSDataType> types = new ArrayList<>(measurementFieldCount);
  final List<Object> values = new ArrayList<>(measurementFieldCount);

  for (Map.Entry<String, Object> measurementValuePair : measurementValuePairs.entrySet()) {
    final String measurement = measurementValuePair.getKey();
    if (timestampFieldName.equals(measurement)) {
      continue;
    }

    final Object value = measurementValuePair.getValue();
    if (value == null) {
      // A null cannot be mapped to an IoTDB data type; skip it rather than fail the whole record.
      continue;
    }

    final TSDataType type;
    final Object storedValue;
    if (value instanceof Integer) {
      type = TSDataType.INT32;
      storedValue = value;
    } else if (value instanceof Long) {
      type = TSDataType.INT64;
      storedValue = value;
    } else if (value instanceof Float) {
      type = TSDataType.FLOAT;
      storedValue = value;
    } else if (value instanceof Double) {
      type = TSDataType.DOUBLE;
      storedValue = value;
    } else if (value instanceof Boolean) {
      type = TSDataType.BOOLEAN;
      storedValue = value;
    } else {
      // Everything else (strings, lists, nested objects, ...) is written as text. The value is
      // converted explicitly so that the stored object actually matches the declared TEXT type.
      type = TSDataType.TEXT;
      storedValue = String.valueOf(value);
    }

    measurements.add(measurement);
    types.add(type);
    values.add(storedValue);
  }

  if (measurements.isEmpty()) {
    // Nothing left to write once the timestamp and null values have been filtered out.
    return;
  }

  try {
    sessionPool.insertRecord(deviceId, timestamp, measurements, types, values);
  } catch (IoTDBConnectionException | StatementExecutionException e) {
    // Hand the exception to the logger so the stack trace ends up in the log output
    // instead of on stderr.
    LOG.error("Failed to save event to IoTDB, because: " + e.getMessage(), e);
  }
}