Status HdfsParquetTableWriter::Init()

in be/src/exec/parquet/hdfs-parquet-table-writer.cc [1317:1493]


Status HdfsParquetTableWriter::Init() {
  // Initialize file metadata
  file_metadata_.version = PARQUET_WRITER_VERSION;

  stringstream created_by;
  created_by << "impala version " << GetDaemonBuildVersion()
             << " (build " << GetDaemonBuildHash() << ")";
  file_metadata_.__set_created_by(created_by.str());

  // Default to Snappy compression; the COMPRESSION_CODEC query option or, for
  // Iceberg tables, the table's compression codec property overrides this.
  THdfsCompression::type codec = THdfsCompression::SNAPPY;
  // A compression level is only supported for ZSTD and ZLIB.
  std::optional<int> clevel;
  const TQueryOptions& query_options = state_->query_options();
  if (query_options.__isset.compression_codec) {
    codec = query_options.compression_codec.codec;
    if (query_options.compression_codec.__isset.compression_level) {
      clevel = query_options.compression_codec.compression_level;
    }
  } else if (table_desc_->IsIcebergTable()) {
    TCompressionCodec compression_codec = table_desc_->IcebergParquetCompressionCodec();
    codec = compression_codec.codec;
    if (compression_codec.__isset.compression_level) {
      clevel = compression_codec.compression_level;
    }
  }

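  // Only a subset of Impala's codecs can be written to Parquet files; reject
  // everything else up front.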
  if (!(codec == THdfsCompression::NONE ||
        codec == THdfsCompression::GZIP ||
        codec == THdfsCompression::SNAPPY ||
        codec == THdfsCompression::ZSTD ||
        codec == THdfsCompression::LZ4)) {
    stringstream ss;
    ss << "Invalid parquet compression codec " << Codec::GetCodecName(codec);
    return Status(ss.str());
  }

  // Map Parquet codecs to Impala codecs. It's important to do the above check
  // before doing any mapping.
  // Parquet supports the codecs enumerated in parquet::CompressionCodec; Impala
  // supports the codecs enumerated in impala::THdfsCompression. In most cases an
  // Impala codec and a Parquet codec refer to the same codec. The only exception
  // is LZ4: for Hadoop compatibility, parquet::CompressionCodec::LZ4 refers to
  // THdfsCompression::LZ4_BLOCKED and not THdfsCompression::LZ4. Hence the
  // following mapping and re-mapping, which ensure that an input
  // THdfsCompression::LZ4 codec gets mapped to THdfsCompression::LZ4_BLOCKED for
  // Parquet.
  parquet::CompressionCodec::type parquet_codec = ConvertImpalaToParquetCodec(codec);
  codec = ConvertParquetToImpalaCodec(parquet_codec);
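  // Net effect: THdfsCompression::LZ4 becomes THdfsCompression::LZ4_BLOCKED,
  // while every other accepted codec round-trips to itself.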

  VLOG_FILE << "Using compression codec: " << codec;
  if (clevel.has_value()) {
    VLOG_FILE << "Using compression level: " << clevel.value();
  }

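  // Optionally cap the number of rows written to a single Parquet page.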
  if (query_options.__isset.parquet_page_row_count_limit) {
    page_row_count_limit_ = query_options.parquet_page_row_count_limit;
  }

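  // Clustering (partition) columns are encoded in the directory structure
  // rather than in the file itself, so they get no column writers.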
  int num_cols = table_desc_->num_cols() - table_desc_->num_clustering_cols();
  // When opening files using the hdfsOpenFile() API, the maximum block size is limited to
  // 2GB.
  int64_t min_block_size = MinBlockSize(num_cols);
  if (min_block_size >= numeric_limits<int32_t>::max()) {
    return Status(Substitute("Minimum required block size must be less than 2GB "
        "(currently $0), try reducing the number of non-partitioning columns in the "
        "target table (currently $1).",
        PrettyPrinter::Print(min_block_size, TUnit::BYTES), num_cols));
  }

  Codec::CodecInfo codec_info(codec, clevel);

  if (is_iceberg_file_) {
    ConfigureForIceberg(num_cols);
  } else {
    Configure(num_cols);
  }

  columns_.resize(num_cols);
  // Initialize each column structure.
  for (int i = 0; i < columns_.size(); ++i) {
    BaseColumnWriter* writer = nullptr;
    const ColumnType& type = output_expr_evals_[i]->root().type();
    const int num_clustering_cols = table_desc_->num_clustering_cols();
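    // output_expr_evals_ covers only non-clustering columns, so skip past the
    // clustering columns when looking up the column name.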
    const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
    switch (type.type) {
      case TYPE_BOOLEAN:
        writer =
          new BoolColumnWriter(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_TINYINT:
        writer =
          new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_SMALLINT:
        writer =
          new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_INT:
        writer =
          new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_BIGINT:
        writer =
          new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_FLOAT:
        writer =
          new ColumnWriter<float>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_DOUBLE:
        writer =
          new ColumnWriter<double>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_TIMESTAMP:
        switch (timestamp_type_) {
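          // timestamp_type_ picks the physical encoding: INT96_NANOS is the
          // legacy Impala representation, while the INT64 variants store the
          // timestamp as an int64 at milli/micro/nanosecond precision.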
          case TParquetTimestampType::INT96_NANOS:
            writer =
                new ColumnWriter<TimestampValue>(
                    this, output_expr_evals_[i], codec_info, col_name);
            break;
          case TParquetTimestampType::INT64_MILLIS:
            writer = new Int64MilliTimestampColumnWriter(
                this, output_expr_evals_[i], codec_info, col_name);
            break;
          case TParquetTimestampType::INT64_MICROS:
            writer = new Int64MicroTimestampColumnWriter(
                this, output_expr_evals_[i], codec_info, col_name);
            break;
          case TParquetTimestampType::INT64_NANOS:
            writer = new Int64NanoTimestampColumnWriter(
                this, output_expr_evals_[i], codec_info, col_name);
            break;
          default:
            DCHECK(false);
        }
        break;
      case TYPE_VARCHAR:
      case TYPE_STRING:
      case TYPE_CHAR:
        writer = new ColumnWriter<StringValue>(
            this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_DECIMAL:
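        // Decimals are stored in 4, 8, or 16 bytes depending on precision;
        // pick the writer for the matching fixed-size value type.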
        switch (type.GetByteSize()) {
          case 4:
            writer =
                new ColumnWriter<Decimal4Value>(
                    this, output_expr_evals_[i], codec_info, col_name);
            break;
          case 8:
            writer =
                new ColumnWriter<Decimal8Value>(
                    this, output_expr_evals_[i], codec_info, col_name);
            break;
          case 16:
            writer =
                new ColumnWriter<Decimal16Value>(
                    this, output_expr_evals_[i], codec_info, col_name);
            break;
          default:
            DCHECK(false);
        }
        break;
      case TYPE_DATE:
        writer = new ColumnWriter<DateValue>(
            this, output_expr_evals_[i], codec_info, col_name);
        break;
      default:
        DCHECK(false);
    }
    columns_[i].reset(writer);
    RETURN_IF_ERROR(columns_[i]->Init());
  }
  RETURN_IF_ERROR(CreateSchema());
  return Status::OK();
}