private void doSink()

in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/out/sink/OutTsfileDataSink.java [114:240]


  private void doSink(List<TimeSeriesRowModel> bufferList) {
    totalFileNum = totalSize[0];
    int virtualSGNum = pcontext.getModel().getVirutalStorageGroupNum();
    long partitionInterval = pcontext.getModel().getPartitionInterval();

    Map<String, List<TimeSeriesRowModel>> groupByDeviceIdMap =
        bufferList.stream()
            .collect(
                Collectors.toMap(
                    k -> k.getDeviceModel().getDeviceName(),
                    p -> {
                      List<TimeSeriesRowModel> result = new ArrayList();
                      result.add(p);
                      return result;
                    },
                    (o, n) -> {
                      o.addAll(n);
                      return o;
                    }));
    for (String deviceIdKey : groupByDeviceIdMap.keySet()) {
      if (deviceIdKey.startsWith("finish")) {
        finishedFileNum.incrementAndGet();
        continue;
      }
      Map<String, List<TimeSeriesRowModel>> groupByTsfileNameKeyMap =
          groupByDeviceIdMap.get(deviceIdKey).stream()
              .collect(
                  Collectors.toMap(
                      k ->
                          getTsfileName(
                              k.getDeviceModel().getDeviceName(),
                              k.getTimestamp(),
                              virtualSGNum,
                              partitionInterval),
                      p -> {
                        List<TimeSeriesRowModel> result = new ArrayList();
                        result.add(p);
                        return result;
                      },
                      (o, n) -> {
                        o.addAll(n);
                        return o;
                      }));
      for (String tsfileNameKey : groupByTsfileNameKeyMap.keySet()) {

        List<TimeSeriesRowModel> groupByTsfileNameKeyList =
            groupByTsfileNameKeyMap.get(tsfileNameKey);
        // 需要 根据deviceName 来判断他属于那个tsfile
        TsFileWriter tsFileWriter =
            getTsfileWriter(
                tsfileWriterMap,
                schemaMap,
                deviceIdKey,
                tsfileNameKey,
                pcontext.getModel().getFileFolder());
        Schema schema = schemaMap.get(tsfileNameKey);
        registDevice(tsFileWriter, schema, deviceInfoMap, deviceIdKey);
        // 需要转化grouplist数据为tsfile可以写入的数据
        TimeSeriesRowModel firstRow = groupByTsfileNameKeyList.get(0);
        DeviceModel deviceModel = firstRow.getDeviceModel();
        List<MeasurementSchema> measurementSchemaList =
            firstRow.getIFieldList().stream()
                .map(
                    iField -> {
                      String columnName =
                          iField
                              .getColumnName()
                              .substring(deviceModel.getDeviceName().length() + 1);
                      TSDataType tsDataType = iField.getTsDataType();
                      MeasurementSchema measurementSchema =
                          new MeasurementSchema(columnName, tsDataType);
                      return measurementSchema;
                    })
                .collect(Collectors.toList());

        Tablet tablet = new Tablet(deviceModel.getDeviceName(), measurementSchemaList);
        tablet.initBitMaps();

        groupByTsfileNameKeyList.stream()
            .forEach(
                model -> {
                  List<FieldCopy> fields =
                      model.getIFieldList().stream()
                          .map(iField -> iField.getField())
                          .collect(Collectors.toList());
                  int rowIndex = tablet.rowSize++;
                  tablet.addTimestamp(rowIndex, Long.parseLong(model.getTimestamp()));
                  for (int i = 0; i < fields.size(); ) {
                    List<MeasurementSchema> schemas = tablet.getSchemas();
                    for (int j = 0; j < schemas.size(); j++) {
                      MeasurementSchema measurementSchema = schemas.get(j);
                      Object value = fields.get(i).getObjectValue(measurementSchema.getType());
                      if (value == null) {
                        tablet.bitMaps[j].mark(rowIndex);
                      }
                      tablet.addValue(measurementSchema.getMeasurementId(), rowIndex, value);
                      i++;
                    }
                  }

                  if (tablet.rowSize == tablet.getMaxRowNumber()) {
                    try {
                      exportPipelineService.syncWriteByTsfileWriter(
                          tsFileWriter, tablet, deviceModel.isAligned());
                      finishedRowNum.addAndGet(tablet.rowSize);
                      tablet.initBitMaps();
                      tablet.reset();
                    } catch (IOException | WriteProcessException e) {
                      e.printStackTrace();
                    }
                  }
                });

        try {
          if (tablet.rowSize != 0) {
            exportPipelineService.syncWriteByTsfileWriter(
                tsFileWriter, tablet, deviceModel.isAligned());
            finishedRowNum.addAndGet(tablet.rowSize);
          }
          tsFileWriter.flushAllChunkGroups();
        } catch (IOException | WriteProcessException e) {
          e.printStackTrace();
        }
      }
    }
    bufferList.clear();
  }