in seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java [93:229]
public Column convert(BasicTypeDefine<EsType> typeDefine) {
PhysicalColumn.PhysicalColumnBuilder builder =
PhysicalColumn.builder()
.name(typeDefine.getName())
.sourceType(typeDefine.getColumnType())
.nullable(typeDefine.isNullable())
.defaultValue(typeDefine.getDefaultValue())
.comment(typeDefine.getComment());
String type = typeDefine.getDataType().toLowerCase();
switch (type) {
case AGGREGATE_METRIC_DOUBLE:
List<String> metrics =
(List<String>) typeDefine.getNativeType().getOptions().get("metrics");
builder.dataType(
new SeaTunnelRowType(
metrics.toArray(new String[0]),
metrics.stream()
.map(s -> BasicType.DOUBLE_TYPE)
.toArray(SeaTunnelDataType<?>[]::new)));
break;
case DENSE_VECTOR:
String elementType =
typeDefine.getNativeType().getOptions().get("element_type").toString();
if (elementType.equals("byte")) {
builder.dataType(ArrayType.BYTE_ARRAY_TYPE);
} else {
builder.dataType(ArrayType.FLOAT_ARRAY_TYPE);
}
break;
case BYTE:
builder.dataType(BasicType.BYTE_TYPE);
break;
case BOOLEAN:
builder.dataType(BasicType.BOOLEAN_TYPE);
break;
case DATE:
case DATETIME:
builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
builder.scale(3);
break;
case DATE_NANOS:
builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
builder.scale(9);
break;
case DOUBLE:
case RANK_FEATURE:
builder.dataType(BasicType.DOUBLE_TYPE);
break;
case FLOAT:
case HALF_FLOAT:
builder.dataType(BasicType.FLOAT_TYPE);
break;
case HISTOGRAM:
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"values", "counts"},
new SeaTunnelDataType<?>[] {
ArrayType.DOUBLE_ARRAY_TYPE, ArrayType.LONG_ARRAY_TYPE
});
builder.dataType(rowType);
break;
case EsType.NESTED:
builder.dataType(
new ArrayType<>(
Map[].class,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)));
break;
case INTEGER:
case TOKEN_COUNT:
builder.dataType(BasicType.INT_TYPE);
break;
case LONG:
builder.dataType(BasicType.LONG_TYPE);
break;
case SHORT:
builder.dataType(BasicType.SHORT_TYPE);
break;
case OBJECT:
Map<String, BasicTypeDefine<EsType>> typeInfo =
(Map) typeDefine.getNativeType().getOptions();
SeaTunnelRowType object =
new SeaTunnelRowType(
typeInfo.keySet().toArray(new String[0]),
typeInfo.values().stream()
.map(this::convert)
.map(Column::getDataType)
.toArray(SeaTunnelDataType<?>[]::new));
builder.dataType(object);
break;
case INTEGER_RANGE:
builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE));
break;
case FLOAT_RANGE:
builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.FLOAT_TYPE));
break;
case LONG_RANGE:
builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.LONG_TYPE));
break;
case DOUBLE_RANGE:
builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.DOUBLE_TYPE));
break;
case DATE_RANGE:
builder.dataType(
new MapType<>(BasicType.STRING_TYPE, LocalTimeType.LOCAL_DATE_TIME_TYPE));
break;
case IP_RANGE:
builder.dataType(new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE));
break;
case UNSIGNED_LONG:
builder.dataType(new DecimalType(20, 0));
builder.columnLength(20L);
builder.scale(0);
break;
case TEXT:
case BINARY:
case VERSION:
case IP:
case JOIN:
case KEYWORD:
case FLATTENED:
case GEO_POINT:
case COMPLETION:
case STRING:
case GEO_SHAPE:
case PERCOLATOR:
case POINT:
case RANK_FEATURES:
case SEARCH_AS_YOU_TYPE:
case SPARSE_VECTOR:
case MATCH_ONLY_TEXT:
case SHAPE:
default:
builder.dataType(BasicType.STRING_TYPE);
break;
}
return builder.build();
}