in datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs [491:785]
/// Checks row-group pruning against decimal columns for every parquet physical
/// type exercised here: `INT32` (with and without a cast to a coerced decimal
/// type), `INT64`, `FIXED_LEN_BYTE_ARRAY` and `BYTE_ARRAY`.
///
/// Each scenario builds a pruning predicate plus a handful of row groups whose
/// min/max statistics are known, then asserts the indexes returned by
/// `prune_row_groups` (the row groups that are kept).
fn row_group_pruning_predicate_decimal_type() {
    /// Encodes an unscaled decimal value as 16 big-endian bytes, the layout
    /// used below for the byte-array backed decimal statistics.
    fn be_bytes(unscaled: i128) -> Vec<u8> {
        unscaled.to_be_bytes().to_vec()
    }

    // Scenario 1 - INT32: c1 > 5.00, c1 is decimal(9,2).
    // The literal is already decimal(9,2), so no cast is needed.
    {
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
        let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
            .with_logical_type(LogicalType::Decimal {
                scale: 2,
                precision: 9,
            })
            .with_scale(2)
            .with_precision(9);
        let schema_descr = get_test_schema_descr(vec![field]);
        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
        let expr = logical2physical(&expr, &schema);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

        // [1.00, 6.00]: may contain values > 5.00, so it is kept.
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(
                Some(100),
                Some(600),
                None,
                0,
                false,
            )],
        );
        // [0.10, 0.20]: every value is below 5.00, so it is pruned.
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
        );
        // [1.00, None]: the max is unknown, so it cannot be pruned and is kept.
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(100), None, None, 0, false)],
        );

        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![0, 2]
        );
    }

    // Scenario 2 - INT32: c1 > 5.00 where the parquet decimal type differs from
    // the literal's type. c1 is decimal(9,0) in the file and the literal is
    // decimal(5,2); both sides are cast to the coerced type decimal(11,2).
    {
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
        let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
            .with_logical_type(LogicalType::Decimal {
                scale: 0,
                precision: 9,
            })
            .with_scale(0)
            .with_precision(9);
        let schema_descr = get_test_schema_descr(vec![field]);
        let expr = cast(col("c1"), DataType::Decimal128(11, 2)).gt(cast(
            lit(ScalarValue::Decimal128(Some(500), 5, 2)),
            Decimal128(11, 2),
        ));
        let expr = logical2physical(&expr, &schema);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

        // [100, 600] (scale 0): may contain values > 5, so it is kept.
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(
                Some(100),
                Some(600),
                None,
                0,
                false,
            )],
        );
        // [10, 20] (scale 0): may contain values > 5, so it is kept.
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(10), Some(20), None, 0, false)],
        );
        // [0, 2] (scale 0): every value is below 5, so it is pruned.
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(Some(0), Some(2), None, 0, false)],
        );
        // [None, 2]: the min is missing; this row group is not filtered out
        // and is expected in the results.
        let rgm4 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int32(None, Some(2), None, 0, false)],
        );

        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3, rgm4],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![0, 1, 3]
        );
    }

    // Scenario 3 - INT64: c1 < 5.00, c1 is decimal(18,2).
    {
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
        let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
            .with_logical_type(LogicalType::Decimal {
                scale: 2,
                precision: 18,
            })
            .with_scale(2)
            .with_precision(18);
        let schema_descr = get_test_schema_descr(vec![field]);
        let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
        let expr = logical2physical(&expr, &schema);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

        // [6.00, 8.00]: every value is above 5.00, so it is pruned.
        // The statistics use the INT64 variant to match the column's physical type.
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int64(
                Some(600),
                Some(800),
                None,
                0,
                false,
            )],
        );
        // [0.10, 0.20]: may contain values < 5.00, so it is kept.
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int64(Some(10), Some(20), None, 0, false)],
        );
        // No min/max at all: nothing to prune on, so it is kept.
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::int64(None, None, None, 0, false)],
        );

        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![1, 2]
        );
    }

    // Scenario 4 - FIXED_LEN_BYTE_ARRAY: c1 = 100.000 (a decimal(28,3) literal),
    // c1 is decimal(18,2) both in arrow and in the parquet file.
    {
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
        let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_logical_type(LogicalType::Decimal {
                scale: 2,
                precision: 18,
            })
            .with_scale(2)
            .with_precision(18)
            .with_byte_len(16);
        let schema_descr = get_test_schema_descr(vec![field]);
        // Cast c1 to decimal(28,3) so both sides of the comparison agree.
        let left = cast(col("c1"), DataType::Decimal128(28, 3));
        let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
        let expr = logical2physical(&expr, &schema);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

        // The i128 values must be encoded big-endian when turned into bytes.
        // [5.00, 80.00]: 100.000 is outside the range, so it is pruned.
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::fixed_len_byte_array(
                Some(FixedLenByteArray::from(ByteArray::from(be_bytes(500)))),
                Some(FixedLenByteArray::from(ByteArray::from(be_bytes(8000)))),
                None,
                0,
                false,
            )],
        );
        // [5.00, 200.00]: 100.000 is inside the range, so it is kept.
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::fixed_len_byte_array(
                Some(FixedLenByteArray::from(ByteArray::from(be_bytes(500)))),
                Some(FixedLenByteArray::from(ByteArray::from(be_bytes(20000)))),
                None,
                0,
                false,
            )],
        );
        // No min/max at all: nothing to prune on, so it is kept.
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::fixed_len_byte_array(
                None, None, None, 0, false,
            )],
        );

        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![1, 2]
        );
    }

    // Scenario 5 - BYTE_ARRAY: c1 = 100.000 (a decimal(28,3) literal),
    // c1 is decimal(18,2) both in arrow and in the parquet file.
    {
        let schema =
            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
        let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
            .with_logical_type(LogicalType::Decimal {
                scale: 2,
                precision: 18,
            })
            .with_scale(2)
            .with_precision(18)
            .with_byte_len(16);
        let schema_descr = get_test_schema_descr(vec![field]);
        // Cast c1 to decimal(28,3) so both sides of the comparison agree.
        let left = cast(col("c1"), DataType::Decimal128(28, 3));
        let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
        let expr = logical2physical(&expr, &schema);
        let pruning_predicate =
            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

        // The i128 values must be encoded big-endian when turned into bytes.
        // [5.00, 80.00]: 100.000 is outside the range, so it is pruned.
        let rgm1 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::byte_array(
                Some(ByteArray::from(be_bytes(500))),
                Some(ByteArray::from(be_bytes(8000))),
                None,
                0,
                false,
            )],
        );
        // [5.00, 200.00]: 100.000 is inside the range, so it is kept.
        let rgm2 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::byte_array(
                Some(ByteArray::from(be_bytes(500))),
                Some(ByteArray::from(be_bytes(20000))),
                None,
                0,
                false,
            )],
        );
        // No min/max at all: nothing to prune on, so it is kept.
        let rgm3 = get_row_group_meta_data(
            &schema_descr,
            vec![ParquetStatistics::byte_array(None, None, None, 0, false)],
        );

        let metrics = parquet_file_metrics();
        assert_eq!(
            prune_row_groups(
                &[rgm1, rgm2, rgm3],
                None,
                Some(&pruning_predicate),
                &metrics
            ),
            vec![1, 2]
        );
    }
}