public void open()

in connectors/flink-tsfile-connector/src/main/java/org/apache/iotdb/flink/tsfile/RowTSRecordConverter.java [58:110]


  /**
   * Prepares the converter by deriving its lookup tables and template records from the fields of
   * {@code rowTypeInfo}.
   *
   * <p>For every field {@code i} this method fills in:
   *
   * <ul>
   *   <li>{@code tsRecordIndexMapping[i]} - index (into {@code outputTemplate} / {@code reuse}) of
   *       the record belonging to the field's device, or {@code -1} for the time field;
   *   <li>{@code dataPointIndexMapping[i]} - position of the field's data point inside that
   *       record's {@code dataPointList}, or {@code -1} for the time field.
   * </ul>
   *
   * <p>The field whose name equals {@link QueryConstant#RESERVED_TIME} is recorded in {@code
   * timeIndex}. Every other field name is split at the last {@link TsFileConstant#PATH_SEPARATOR}:
   * the part before it is used as the device id, the part after it as the measurement id.
   *
   * @param schema not read by this method
   * @throws IllegalArgumentException if a non-time field name contains no path separator
   * @throws UnSupportedDataTypeException if a field's type class is not one of boolean, int, long,
   *     float, double (primitive or boxed) or {@code String}
   */
  public void open(Schema schema) {
    final int arity = rowTypeInfo.getArity();
    final String[] fieldNames = rowTypeInfo.getFieldNames();

    this.tsRecordIndexMapping = new int[arity];
    this.dataPointIndexMapping = new int[arity];
    List<TSRecord> outputTemplateList = new ArrayList<>();

    for (int i = 0; i < arity; i++) {
      String fieldName = fieldNames[i];
      if (QueryConstant.RESERVED_TIME.equals(fieldName)) {
        // The time column maps to no record / data point; remember only where it is.
        timeIndex = i;
        tsRecordIndexMapping[i] = -1;
        dataPointIndexMapping[i] = -1;
        continue;
      }

      int separatorIndex = fieldName.lastIndexOf(TsFileConstant.PATH_SEPARATOR);
      if (separatorIndex < 0) {
        // Without this guard substring(0, -1) fails with an exception that does not name the field.
        throw new IllegalArgumentException(
            "Field name \""
                + fieldName
                + "\" contains no path separator \""
                + TsFileConstant.PATH_SEPARATOR
                + "\", so it cannot be split into a device id and a measurement id");
      }
      String deviceId = fieldName.substring(0, separatorIndex);
      String measurementId = fieldName.substring(separatorIndex + 1);

      // One template record per distinct device, in order of first appearance.
      int tsRecordIndex = indexOfDevice(outputTemplateList, deviceId);
      if (tsRecordIndex < 0) {
        outputTemplateList.add(new TSRecord(deviceId, 0));
        tsRecordIndex = outputTemplateList.size() - 1;
      }
      tsRecordIndexMapping[i] = tsRecordIndex;

      // Append a data point of the matching type, carrying a placeholder value, to the template.
      TSRecord tsRecord = outputTemplateList.get(tsRecordIndex);
      Class<?> typeClass = rowTypeInfo.getFieldTypes()[i].getTypeClass();
      if (typeClass == Boolean.class || typeClass == boolean.class) {
        tsRecord.addTuple(new BooleanDataPoint(measurementId, false));
      } else if (typeClass == Integer.class || typeClass == int.class) {
        tsRecord.addTuple(new IntDataPoint(measurementId, 0));
      } else if (typeClass == Long.class || typeClass == long.class) {
        tsRecord.addTuple(new LongDataPoint(measurementId, 0));
      } else if (typeClass == Float.class || typeClass == float.class) {
        tsRecord.addTuple(new FloatDataPoint(measurementId, 0));
      } else if (typeClass == Double.class || typeClass == double.class) {
        tsRecord.addTuple(new DoubleDataPoint(measurementId, 0));
      } else if (typeClass == String.class) {
        tsRecord.addTuple(new StringDataPoint(measurementId, null));
      } else {
        throw new UnSupportedDataTypeException(typeClass.toString());
      }
      // The data point just added is the last element of the record's list.
      dataPointIndexMapping[i] = tsRecord.dataPointList.size() - 1;
    }
    outputTemplate = outputTemplateList.toArray(new TSRecord[0]);

    // One empty record per template record, with the same device id and timestamp 0.
    reuse = new TSRecord[outputTemplate.length];
    for (int i = 0; i < outputTemplate.length; i++) {
      reuse[i] = new TSRecord(outputTemplate[i].deviceId, 0);
    }
  }

  /**
   * Returns the index of the first record in {@code records} whose {@code deviceId} equals the
   * given device id, or {@code -1} if there is none.
   */
  private static int indexOfDevice(List<TSRecord> records, String deviceId) {
    for (int j = 0; j < records.size(); j++) {
      if (deviceId.equals(records.get(j).deviceId)) {
        return j;
      }
    }
    return -1;
  }