in connectors/flink-iotdb-connector/src/main/java/org/apache/iotdb/flink/DefaultIoTSerializationSchema.java [41:89]
public Event serialize(Map<String, String> tuple) {
if (tuple == null) {
return null;
}
String device = tuple.get(fieldDevice);
String ts = tuple.get(fieldTimestamp);
Long timestamp = ts == null ? System.currentTimeMillis() : Long.parseLong(ts);
List<String> measurements = null;
if (tuple.get(fieldMeasurements) != null) {
measurements = Arrays.asList(tuple.get(fieldMeasurements).split(separator));
}
List<TSDataType> types = new ArrayList<>();
for (String type : tuple.get(fieldTypes).split(separator)) {
types.add(TSDataType.valueOf(type));
}
List<Object> values = new ArrayList<>();
String[] valuesStr = tuple.get(fieldValues).split(separator);
for (int i = 0; i < valuesStr.length; i++) {
switch (types.get(i)) {
case INT64:
values.add(Long.parseLong(valuesStr[i]));
break;
case DOUBLE:
values.add(Double.parseDouble(valuesStr[i]));
break;
case INT32:
values.add(Integer.parseInt(valuesStr[i]));
break;
case TEXT:
values.add(valuesStr[i]);
break;
case FLOAT:
values.add(Float.parseFloat(valuesStr[i]));
break;
case BOOLEAN:
values.add(Boolean.parseBoolean(valuesStr[i]));
break;
default:
continue;
}
}
return new Event(device, timestamp, measurements, types, values);
}