in connectors/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java [113:157]
/**
 * Converts one input row into TSRecords and hands them to the collector.
 *
 * <p>The field at {@code timeIndex} is read as the timestamp. Every other non-null field is
 * written into its template data point (located through {@code tsRecordIndexMapping} and
 * {@code dataPointIndexMapping}) and that data point is attached to the matching record in
 * {@code reuse}. Null fields are skipped. Only records that received at least one data point
 * are stamped with the timestamp and emitted.
 *
 * <p>Note that the template data point instance itself (not a copy) is added to the reused
 * record, and the records in {@code reuse} are cleared and refilled on every call.
 *
 * @param input the row to convert; its field at {@code timeIndex} is cast to {@code long}
 * @param collector receives each non-empty record
 */
public void convert(Row input, Collector<TSRecord> collector) {
  // Read the timestamp first so a bad time field fails before any reused state is touched.
  final long rowTime = (long) input.getField(timeIndex);

  // Drop whatever data points the reused records still hold.
  for (TSRecord stale : reuse) {
    stale.dataPointList.clear();
  }

  final int fieldCount = input.getArity();
  for (int column = 0; column < fieldCount; column++) {
    if (column == timeIndex) {
      // The time column is carried by the record itself, not as a data point.
      continue;
    }

    final int recordSlot = tsRecordIndexMapping[column];
    // The template lookup happens before the null check, so the mappings are exercised even
    // for null fields.
    final DataPoint point =
        outputTemplate[recordSlot].dataPointList.get(dataPointIndexMapping[column]);

    final Object fieldValue = input.getField(column);
    if (fieldValue == null) {
      continue;
    }

    storeFieldValue(point, fieldValue);
    reuse[recordSlot].addTuple(point);
  }

  for (TSRecord filled : reuse) {
    if (filled.dataPointList.isEmpty()) {
      // No non-null field mapped to this record; emit nothing for it.
      continue;
    }
    filled.setTime(rowTime);
    collector.collect(filled);
  }
}

/** Writes {@code value} into {@code point} using the setter that matches the point's type. */
private static void storeFieldValue(DataPoint point, Object value) {
  switch (point.getType()) {
    case BOOLEAN:
      point.setBoolean((Boolean) value);
      break;
    case INT32:
      point.setInteger((Integer) value);
      break;
    case INT64:
      point.setLong((Long) value);
      break;
    case FLOAT:
      point.setFloat((Float) value);
      break;
    case DOUBLE:
      point.setDouble((Double) value);
      break;
    case TEXT:
      point.setString(BytesUtils.valueOf((String) value));
      break;
    default:
      // Any other type is stored through its string representation.
      point.setString(BytesUtils.valueOf(value.toString()));
      break;
  }
}