be/src/exec/parquet/hdfs-parquet-table-writer.cc [1317:1493]
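// Initializes the writer: records file-level metadata, resolves the compression codec
// and optional level, validates the minimum block size, and creates one column writer
// per non-partitioning output column.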
Status HdfsParquetTableWriter::Init() {
  // Initialize file metadata
  file_metadata_.version = PARQUET_WRITER_VERSION;
  stringstream created_by;
  created_by << "impala version " << GetDaemonBuildVersion()
             << " (build " << GetDaemonBuildHash() << ")";
  file_metadata_.__set_created_by(created_by.str());
  // Default to Snappy compression.
  THdfsCompression::type codec = THdfsCompression::SNAPPY;
  // Compression level is only supported for zstd and zlib.
  std::optional<int> clevel;
  const TQueryOptions& query_options = state_->query_options();
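  // Precedence for choosing the codec: an explicitly set compression_codec query option
  // wins; otherwise an Iceberg table's configured Parquet codec is used; otherwise the
  // Snappy default above stands.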
  if (query_options.__isset.compression_codec) {
    codec = query_options.compression_codec.codec;
    if (query_options.compression_codec.__isset.compression_level) {
      clevel = query_options.compression_codec.compression_level;
    }
  } else if (table_desc_->IsIcebergTable()) {
    TCompressionCodec compression_codec = table_desc_->IcebergParquetCompressionCodec();
    codec = compression_codec.codec;
    if (compression_codec.__isset.compression_level) {
      clevel = compression_codec.compression_level;
    }
  }
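  // Parquet supports only a subset of Impala's codecs; reject anything else before
  // attempting to map it below.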
  if (!(codec == THdfsCompression::NONE ||
        codec == THdfsCompression::GZIP ||
        codec == THdfsCompression::SNAPPY ||
        codec == THdfsCompression::ZSTD ||
        codec == THdfsCompression::LZ4)) {
    stringstream ss;
    ss << "Invalid parquet compression codec " << Codec::GetCodecName(codec);
    return Status(ss.str());
  }
  // Map between Impala and Parquet codecs. It's important to do the above check before
  // doing any mapping.
  // Parquet supports codecs enumerated in parquet::CompressionCodec. Impala supports
  // codecs enumerated in impala::THdfsCompression. In most cases, an Impala codec and
  // a Parquet codec refer to the same codec. The only exception is LZ4: for Hadoop
  // compatibility, parquet::CompressionCodec::LZ4 refers to
  // THdfsCompression::LZ4_BLOCKED and not THdfsCompression::LZ4. Hence, the mapping and
  // re-mapping below ensure that an input THdfsCompression::LZ4 codec gets mapped to
  // THdfsCompression::LZ4_BLOCKED for parquet.
  parquet::CompressionCodec::type parquet_codec = ConvertImpalaToParquetCodec(codec);
  codec = ConvertParquetToImpalaCodec(parquet_codec);
  VLOG_FILE << "Using compression codec: " << codec;
  if (clevel.has_value()) {
    VLOG_FILE << "Using compression level: " << clevel.value();
  }
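  // Honor the optional per-page row count cap from the corresponding query option.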
  if (query_options.__isset.parquet_page_row_count_limit) {
    page_row_count_limit_ = query_options.parquet_page_row_count_limit;
  }
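  // Only non-clustering (non-partitioning) columns are materialized in the data file,
  // so they are the ones that need writers and count towards the minimum block size.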
  int num_cols = table_desc_->num_cols() - table_desc_->num_clustering_cols();
  // When opening files using the hdfsOpenFile() API, the maximum block size is limited to
  // 2GB.
  int64_t min_block_size = MinBlockSize(num_cols);
  if (min_block_size >= numeric_limits<int32_t>::max()) {
    return Status(Substitute("Minimum required block size must be less than 2GB "
        "(currently $0), try reducing the number of non-partitioning columns in the "
        "target table (currently $1).",
        PrettyPrinter::Print(min_block_size, TUnit::BYTES), num_cols));
  }
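  // Bundle the codec and optional level; every column writer below compresses its
  // pages with this configuration.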
  Codec::CodecInfo codec_info(codec, clevel);
  if (is_iceberg_file_) {
    ConfigureForIceberg(num_cols);
  } else {
    Configure(num_cols);
  }
  columns_.resize(num_cols);
  // Initialize each column structure.
  for (int i = 0; i < columns_.size(); ++i) {
    BaseColumnWriter* writer = nullptr;
    const ColumnType& type = output_expr_evals_[i]->root().type();
    const int num_clustering_cols = table_desc_->num_clustering_cols();
    const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
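    // Pick a writer specialization that matches the column's type; each writer encodes
    // and compresses the values produced by its output expression.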
    switch (type.type) {
      case TYPE_BOOLEAN:
        writer =
            new BoolColumnWriter(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_TINYINT:
        writer =
            new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_SMALLINT:
        writer =
            new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_INT:
        writer =
            new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_BIGINT:
        writer =
            new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_FLOAT:
        writer =
            new ColumnWriter<float>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_DOUBLE:
        writer =
            new ColumnWriter<double>(this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_TIMESTAMP:
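        // The physical representation of timestamps depends on the configured timestamp
        // type: the legacy 96-bit INT96 encoding versus a 64-bit integer at
        // milli/micro/nanosecond precision.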
        switch (timestamp_type_) {
          case TParquetTimestampType::INT96_NANOS:
            writer =
                new ColumnWriter<TimestampValue>(
                    this, output_expr_evals_[i], codec_info, col_name);
            break;
          case TParquetTimestampType::INT64_MILLIS:
            writer = new Int64MilliTimestampColumnWriter(
                this, output_expr_evals_[i], codec_info, col_name);
            break;
          case TParquetTimestampType::INT64_MICROS:
            writer = new Int64MicroTimestampColumnWriter(
                this, output_expr_evals_[i], codec_info, col_name);
            break;
          case TParquetTimestampType::INT64_NANOS:
            writer = new Int64NanoTimestampColumnWriter(
                this, output_expr_evals_[i], codec_info, col_name);
            break;
          default:
            DCHECK(false);
        }
        break;
      case TYPE_VARCHAR:
      case TYPE_STRING:
      case TYPE_CHAR:
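        // All string-like types (VARCHAR, STRING, CHAR) share the StringValue writer.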
        writer = new ColumnWriter<StringValue>(
            this, output_expr_evals_[i], codec_info, col_name);
        break;
      case TYPE_DECIMAL:
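        // Decimals use a fixed-size in-memory representation of 4, 8, or 16 bytes,
        // chosen by precision; pick the matching writer specialization.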
        switch (output_expr_evals_[i]->root().type().GetByteSize()) {
          case 4:
            writer =
                new ColumnWriter<Decimal4Value>(
                    this, output_expr_evals_[i], codec_info, col_name);
            break;
          case 8:
            writer =
                new ColumnWriter<Decimal8Value>(
                    this, output_expr_evals_[i], codec_info, col_name);
            break;
          case 16:
            writer =
                new ColumnWriter<Decimal16Value>(
                    this, output_expr_evals_[i], codec_info, col_name);
            break;
          default:
            DCHECK(false);
        }
        break;
      case TYPE_DATE:
        writer = new ColumnWriter<DateValue>(
            this, output_expr_evals_[i], codec_info, col_name);
        break;
      default:
        DCHECK(false);
    }
    columns_[i].reset(writer);
    RETURN_IF_ERROR(columns_[i]->Init());
  }
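  // With all column writers in place, build the Parquet schema for the file metadata.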
  RETURN_IF_ERROR(CreateSchema());
  return Status::OK();
}