in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/utils/KuduTableUtils.java [98:121]
/**
 * Converts a list of (column name, Flink data type) pairs into Kudu column schemas.
 *
 * <p>For each entry the Kudu type is obtained through {@link KuduTypeUtils#toKuduType}. A
 * column is flagged as a key column when its name is contained in {@code keyColumns}. A column
 * is declared nullable only when it is not a key column and its logical type is nullable. For
 * columns whose logical type is a {@link DecimalType}, the precision and scale are passed on
 * as Kudu column type attributes.
 *
 * @param columns pairs of column name ({@code f0}) and data type ({@code f1}), in the order
 *     the resulting schemas should appear
 * @param keyColumns names of the columns that are to be treated as key columns
 * @return one {@link ColumnSchema} per input entry, in input order
 */
public static List<ColumnSchema> toKuduConnectorColumns(
        List<Tuple2<String, DataType>> columns, Collection<String> keyColumns) {
    return columns.stream()
            .map(
                    column -> {
                        final String columnName = column.f0;
                        final DataType columnType = column.f1;

                        ColumnSchema.ColumnSchemaBuilder schemaBuilder =
                                new ColumnSchema.ColumnSchemaBuilder(
                                        columnName, KuduTypeUtils.toKuduType(columnType));

                        // Key columns are always declared non-nullable, regardless of what
                        // the logical type says; the type is only consulted for non-keys.
                        final boolean isKey = keyColumns.contains(columnName);
                        schemaBuilder =
                                schemaBuilder
                                        .key(isKey)
                                        .nullable(
                                                !isKey
                                                        && columnType
                                                                .getLogicalType()
                                                                .isNullable());

                        // Decimal columns additionally carry their precision and scale.
                        if (columnType.getLogicalType() instanceof DecimalType) {
                            final DecimalType decimal =
                                    (DecimalType) columnType.getLogicalType();
                            final ColumnTypeAttributes decimalAttributes =
                                    new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
                                            .precision(decimal.getPrecision())
                                            .scale(decimal.getScale())
                                            .build();
                            schemaBuilder.typeAttributes(decimalAttributes);
                        }

                        return schemaBuilder.build();
                    })
            .collect(Collectors.toList());
}