in seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchTypeConverter.java [232:369]
/**
 * Converts a SeaTunnel {@link Column} into the Elasticsearch type definition used to describe
 * the field.
 *
 * <p>The name, nullability, comment and default value are copied from the column. The
 * Elasticsearch type is chosen from the column's SQL type:
 *
 * <ul>
 *   <li>BOOLEAN, BYTES, TINYINT, SMALLINT, INT, BIGINT, FLOAT and DOUBLE map to the matching
 *       Elasticsearch scalar type.
 *   <li>DATE and TIMESTAMP map to {@code date_nanos} when the column scale is greater than 3,
 *       otherwise to {@code date}; an explicit {@code format} option is attached in both cases.
 *   <li>MAP maps to {@code flattened}.
 *   <li>ARRAY maps to the type of its elements (see {@link #arrayElementEsType}).
 *   <li>ROW maps to {@code object}; every field is converted recursively and stored in the
 *       native type's options under the field name.
 *   <li>DECIMAL, TIME, NULL, STRING and any other type map to {@code text}.
 * </ul>
 *
 * @param column the SeaTunnel column to convert
 * @return the Elasticsearch type definition for the column
 */
public BasicTypeDefine<EsType> reconvert(Column column) {
    BasicTypeDefine.BasicTypeDefineBuilder<EsType> builder =
            BasicTypeDefine.<EsType>builder()
                    .name(column.getName())
                    .nullable(column.isNullable())
                    .comment(column.getComment())
                    .defaultValue(column.getDefaultValue());
    switch (column.getDataType().getSqlType()) {
        case BOOLEAN:
            applyEsType(builder, BOOLEAN);
            break;
        case BYTES:
            applyEsType(builder, BINARY);
            break;
        case TINYINT:
            applyEsType(builder, BYTE);
            break;
        case SMALLINT:
            applyEsType(builder, SHORT);
            break;
        case INT:
            applyEsType(builder, INTEGER);
            break;
        case BIGINT:
            applyEsType(builder, LONG);
            break;
        case FLOAT:
            applyEsType(builder, FLOAT);
            break;
        case DOUBLE:
            applyEsType(builder, DOUBLE);
            break;
        case DATE:
        case TIMESTAMP:
            {
                Map<String, Object> dateOptions = new HashMap<>();
                if (column.getScale() != null && column.getScale() > 3) {
                    // More than millisecond precision: use date_nanos together with the
                    // nanosecond-aware format so sub-millisecond digits are not dropped
                    // when values are formatted.
                    dateOptions.put("format", "strict_date_optional_time_nanos||epoch_millis");
                    applyEsType(builder, DATE_NANOS, dateOptions);
                } else {
                    // Millisecond precision (or unknown scale): plain date with the
                    // millisecond format.
                    dateOptions.put("format", "strict_date_optional_time||epoch_millis");
                    applyEsType(builder, DATE, dateOptions);
                }
                break;
            }
        case MAP:
            applyEsType(builder, FLATTENED);
            break;
        case ARRAY:
            {
                SeaTunnelDataType<?> elementType =
                        ((ArrayType<?, ?>) column.getDataType()).getElementType();
                applyEsType(builder, arrayElementEsType(elementType));
                break;
            }
        case ROW:
            {
                SeaTunnelRowType rowType = (SeaTunnelRowType) column.getDataType();
                // Nested field definitions are carried in the options map of the OBJECT type,
                // keyed by field name.
                Map<String, Object> fieldDefinitions = new HashMap<>();
                for (int i = 0; i < rowType.getFieldNames().length; i++) {
                    String fieldName = rowType.getFieldName(i);
                    fieldDefinitions.put(
                            fieldName,
                            reconvert(
                                    PhysicalColumn.of(
                                            fieldName,
                                            rowType.getFieldType(i),
                                            (Long) null,
                                            true,
                                            null,
                                            null)));
                }
                applyEsType(builder, OBJECT, fieldDefinitions);
                break;
            }
        case DECIMAL:
        case TIME:
        case NULL:
        case STRING:
        default:
            // Everything without a dedicated mapping is stored as text.
            applyEsType(builder, TEXT);
    }
    return builder.build();
}

/** Sets column type, data type and native type on the builder, with an empty options map. */
private static void applyEsType(
        BasicTypeDefine.BasicTypeDefineBuilder<EsType> builder, String esTypeName) {
    applyEsType(builder, esTypeName, new HashMap<>());
}

/** Sets column type, data type and native type (with the given options) on the builder. */
private static void applyEsType(
        BasicTypeDefine.BasicTypeDefineBuilder<EsType> builder,
        String esTypeName,
        Map<String, Object> options) {
    builder.columnType(esTypeName);
    builder.dataType(esTypeName);
    builder.nativeType(new EsType(esTypeName, options));
}

/** Picks the Elasticsearch type name for an array from the type of its elements. */
private static String arrayElementEsType(SeaTunnelDataType<?> elementType) {
    if (elementType.equals(BasicType.BYTE_TYPE)) {
        return BINARY;
    }
    if (elementType.equals(BasicType.SHORT_TYPE)) {
        return SHORT;
    }
    if (elementType.equals(BasicType.INT_TYPE)) {
        return INTEGER;
    }
    if (elementType.equals(BasicType.LONG_TYPE)) {
        return LONG;
    }
    if (elementType.equals(BasicType.FLOAT_TYPE)) {
        return FLOAT;
    }
    if (elementType.equals(BasicType.DOUBLE_TYPE)) {
        return DOUBLE;
    }
    // String elements and every other element type are stored as text.
    return TEXT;
}