in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java [85:103]
/**
 * Converts a list of (column name, Flink data type) pairs into Kudu column schemas.
 *
 * <p>The resulting list has one {@link ColumnSchema} per input entry, produced in the
 * encounter order of {@code columns}.
 *
 * @param columns    pairs of column name ({@code f0}) and Flink data type ({@code f1})
 * @param keyColumns names of the columns that are flagged as key columns
 * @return the Kudu column schemas built from {@code columns}
 */
public static List<ColumnSchema> toKuduConnectorColumns(List<Tuple2<String, DataType>> columns,
                                                        Collection<String> keyColumns) {
    return columns.stream()
            .map(column -> buildKuduColumnSchema(column, keyColumns))
            .collect(Collectors.toList());
}

/**
 * Builds the Kudu schema for a single column.
 *
 * @param column     pair of column name ({@code f0}) and Flink data type ({@code f1})
 * @param keyColumns names of the columns that are flagged as key columns
 * @return the Kudu column schema describing {@code column}
 */
private static ColumnSchema buildKuduColumnSchema(Tuple2<String, DataType> column,
                                                  Collection<String> keyColumns) {
    final String columnName = column.f0;
    final DataType dataType = column.f1;

    ColumnSchema.ColumnSchemaBuilder schemaBuilder =
            new ColumnSchema.ColumnSchemaBuilder(columnName, KuduTypeUtils.toKuduType(dataType));

    // A key column is never marked nullable, whatever the Flink type declares.
    final boolean isKey = keyColumns.contains(columnName);
    schemaBuilder.key(isKey);
    schemaBuilder.nullable(!isKey && dataType.getLogicalType().isNullable());

    // Decimal columns additionally carry their precision and scale as type attributes.
    if (dataType.getLogicalType() instanceof DecimalType) {
        DecimalType decimal = (DecimalType) dataType.getLogicalType();
        ColumnTypeAttributes decimalAttributes = new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
                .precision(decimal.getPrecision())
                .scale(decimal.getScale())
                .build();
        schemaBuilder.typeAttributes(decimalAttributes);
    }

    return schemaBuilder.build();
}