public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns)

in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java [1033:1115]


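  /**
   * Copies a row group from the given input stream into this file. Chunks
   * are matched to the target schema by dotted column path, and ranges that
   * are contiguous in the source are coalesced into a single copy. Page
   * offsets are rewritten for each chunk's new position; column and offset
   * indexes are not carried over.
   *
   * @param from the stream to copy the row group's bytes from
   * @param rowGroup metadata for the row group to copy
   * @param dropColumns whether source columns missing from the target schema
   *                    may be silently dropped
   * @throws IOException if the copy fails
   * @throws IllegalArgumentException if a target schema column is missing
   *         from the row group, or if extra source columns are present and
   *         dropColumns is false
   */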
  public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
                             boolean dropColumns) throws IOException {
    startBlock(rowGroup.getRowCount());

    // index the source row group's column chunks by dotted column path
    Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<>();
    for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
      columnsToCopy.put(chunk.getPath().toDotString(), chunk);
    }

    List<ColumnChunkMetaData> columnsInOrder = new ArrayList<>();

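    // order the chunks to match the target schema; every schema column must
    // have a matching chunk in the source row group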
    for (ColumnDescriptor descriptor : schema.getColumns()) {
      String path = ColumnPath.get(descriptor.getPath()).toDotString();
      ColumnChunkMetaData chunk = columnsToCopy.remove(path);
      if (chunk != null) {
        columnsInOrder.add(chunk);
      } else {
        throw new IllegalArgumentException(String.format(
            "Missing column '%s', cannot copy row group: %s", path, rowGroup));
      }
    }

    // complain if some columns would be dropped and that's not okay
    if (!dropColumns && !columnsToCopy.isEmpty()) {
      throw new IllegalArgumentException(String.format(
          "Columns cannot be copied (missing from target schema): %s",
          String.join(", ", columnsToCopy.keySet())));
    }

    // copy the data for all chunks, coalescing contiguous source ranges
    long start = -1;
    long length = 0;
    long blockUncompressedSize = 0L;
    for (int i = 0; i < columnsInOrder.size(); i += 1) {
      ColumnChunkMetaData chunk = columnsInOrder.get(i);

      // get this chunk's start position in the new file
      long newChunkStart = out.getPos() + length;

      // add this chunk to be copied with any previous chunks
      if (start < 0) {
        // no previous chunk included, start at this chunk's starting pos
        start = chunk.getStartingPos();
      }
      length += chunk.getTotalSize();

      if ((i + 1) == columnsInOrder.size() ||
          columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
        // last chunk, or the next chunk is not contiguous: do the copy now
        copy(from, out, start, length);
        // reset to start at the next column chunk
        start = -1;
        length = 0;
      }

      // TODO: column/offset indexes are not copied
      // (it would require seeking to the end of the file for each row group)
      currentColumnIndexes.add(null);
      currentOffsetIndexes.add(null);

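      // rewrite the dictionary page and first data page offsets for this
      // chunk's position in the new file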
      Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
      currentBlock.addColumn(ColumnChunkMetaData.get(
          chunk.getPath(),
          chunk.getPrimitiveType(),
          chunk.getCodec(),
          chunk.getEncodingStats(),
          chunk.getEncodings(),
          chunk.getStatistics(),
          offsets.firstDataPageOffset,
          offsets.dictionaryPageOffset,
          chunk.getValueCount(),
          chunk.getTotalSize(),
          chunk.getTotalUncompressedSize()));

      blockUncompressedSize += chunk.getTotalUncompressedSize();
    }

    currentBlock.setTotalByteSize(blockUncompressedSize);

    endBlock();
  }
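
Usage sketch: one way to drive appendRowGroup when concatenating files that share a schema. This is a hedged example, not the canonical merge path: the class name and file paths are invented, and the five-argument ParquetFileWriter constructor is one of several overloads whose signatures have shifted across parquet-hadoop releases, so check the ones in your release (ParquetFileWriter.appendFile offers a higher-level entry point for whole files).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

// hypothetical driver class; file paths are placeholders
public class AppendRowGroupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    InputFile in = HadoopInputFile.fromPath(new Path("part-00000.parquet"), conf);
    try (ParquetFileReader reader = ParquetFileReader.open(in);
         SeekableInputStream from = in.newStream()) {
      ParquetMetadata footer = reader.getFooter();
      // write the merged file with the source file's own schema
      ParquetFileWriter writer = new ParquetFileWriter(
          HadoopOutputFile.fromPath(new Path("merged.parquet"), conf),
          footer.getFileMetaData().getSchema(),
          ParquetFileWriter.Mode.CREATE,
          ParquetWriter.DEFAULT_BLOCK_SIZE,
          ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
      writer.start();
      for (BlockMetaData block : footer.getBlocks()) {
        // dropColumns=false: fail if the source has columns the schema lacks
        writer.appendRowGroup(from, block, false);
      }
      writer.end(footer.getFileMetaData().getKeyValueMetaData());
    }
  }
}

Passing dropColumns=false keeps the copy strict: any source column chunk without a counterpart in the target schema raises the IllegalArgumentException shown above rather than being silently discarded.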