in seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/utils/source/MilvusSourceConverter.java [68:257]
public SeaTunnelRow convertToSeaTunnelRow(
QueryResultsWrapper.RowRecord record, TableSchema tableSchema, TablePath tablePath) {
// get field names and types
SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
String[] fieldNames = typeInfo.getFieldNames();
Object[] seatunnelField = new Object[typeInfo.getTotalFields()];
// get field values from source milvus
Map<String, Object> fieldValuesMap = record.getFieldValues();
// filter dynamic field
JsonObject dynamicField = convertDynamicField(fieldValuesMap);
for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
if (fieldNames[fieldIndex].equals(CommonOptions.METADATA.getName())) {
seatunnelField[fieldIndex] = dynamicField.toString();
continue;
}
SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
Object filedValues = fieldValuesMap.get(fieldNames[fieldIndex]);
switch (seaTunnelDataType.getSqlType()) {
case STRING:
seatunnelField[fieldIndex] = filedValues.toString();
break;
case BOOLEAN:
if (filedValues instanceof Boolean) {
seatunnelField[fieldIndex] = filedValues;
} else {
seatunnelField[fieldIndex] = Boolean.valueOf(filedValues.toString());
}
break;
case TINYINT:
if (filedValues instanceof Byte) {
seatunnelField[fieldIndex] = filedValues;
} else {
seatunnelField[fieldIndex] = Byte.parseByte(filedValues.toString());
}
break;
case SMALLINT:
if (filedValues instanceof Short) {
seatunnelField[fieldIndex] = filedValues;
} else {
seatunnelField[fieldIndex] = Short.parseShort(filedValues.toString());
}
case INT:
if (filedValues instanceof Integer) {
seatunnelField[fieldIndex] = filedValues;
} else {
seatunnelField[fieldIndex] = Integer.valueOf(filedValues.toString());
}
break;
case BIGINT:
if (filedValues instanceof Long) {
seatunnelField[fieldIndex] = filedValues;
} else {
seatunnelField[fieldIndex] = Long.parseLong(filedValues.toString());
}
break;
case FLOAT:
if (filedValues instanceof Float) {
seatunnelField[fieldIndex] = filedValues;
} else {
seatunnelField[fieldIndex] = Float.parseFloat(filedValues.toString());
}
break;
case DOUBLE:
if (filedValues instanceof Double) {
seatunnelField[fieldIndex] = filedValues;
} else {
seatunnelField[fieldIndex] = Double.parseDouble(filedValues.toString());
}
break;
case ARRAY:
if (filedValues instanceof List) {
List<?> list = (List<?>) filedValues;
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) seaTunnelDataType;
SqlType elementType = arrayType.getElementType().getSqlType();
switch (elementType) {
case STRING:
String[] arrays = new String[list.size()];
for (int i = 0; i < list.size(); i++) {
arrays[i] = list.get(i).toString();
}
seatunnelField[fieldIndex] = arrays;
break;
case BOOLEAN:
Boolean[] booleanArrays = new Boolean[list.size()];
for (int i = 0; i < list.size(); i++) {
booleanArrays[i] = Boolean.valueOf(list.get(i).toString());
}
seatunnelField[fieldIndex] = booleanArrays;
break;
case TINYINT:
Byte[] byteArrays = new Byte[list.size()];
for (int i = 0; i < list.size(); i++) {
byteArrays[i] = Byte.parseByte(list.get(i).toString());
}
seatunnelField[fieldIndex] = byteArrays;
break;
case SMALLINT:
Short[] shortArrays = new Short[list.size()];
for (int i = 0; i < list.size(); i++) {
shortArrays[i] = Short.parseShort(list.get(i).toString());
}
seatunnelField[fieldIndex] = shortArrays;
break;
case INT:
Integer[] intArrays = new Integer[list.size()];
for (int i = 0; i < list.size(); i++) {
intArrays[i] = Integer.valueOf(list.get(i).toString());
}
seatunnelField[fieldIndex] = intArrays;
break;
case BIGINT:
Long[] longArrays = new Long[list.size()];
for (int i = 0; i < list.size(); i++) {
longArrays[i] = Long.parseLong(list.get(i).toString());
}
seatunnelField[fieldIndex] = longArrays;
break;
case FLOAT:
Float[] floatArrays = new Float[list.size()];
for (int i = 0; i < list.size(); i++) {
floatArrays[i] = Float.parseFloat(list.get(i).toString());
}
seatunnelField[fieldIndex] = floatArrays;
break;
case DOUBLE:
Double[] doubleArrays = new Double[list.size()];
for (int i = 0; i < list.size(); i++) {
doubleArrays[i] = Double.parseDouble(list.get(i).toString());
}
seatunnelField[fieldIndex] = doubleArrays;
break;
default:
throw new MilvusConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected array value: " + filedValues);
}
} else {
throw new MilvusConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected array value: " + filedValues);
}
break;
case FLOAT_VECTOR:
if (filedValues instanceof List) {
List list = (List) filedValues;
Float[] arrays = new Float[list.size()];
for (int i = 0; i < list.size(); i++) {
arrays[i] = Float.parseFloat(list.get(i).toString());
}
seatunnelField[fieldIndex] = BufferUtils.toByteBuffer(arrays);
break;
} else {
throw new MilvusConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected vector value: " + filedValues);
}
case BINARY_VECTOR:
case FLOAT16_VECTOR:
case BFLOAT16_VECTOR:
if (filedValues instanceof ByteBuffer) {
seatunnelField[fieldIndex] = filedValues;
break;
} else {
throw new MilvusConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected vector value: " + filedValues);
}
case SPARSE_FLOAT_VECTOR:
if (filedValues instanceof Map) {
seatunnelField[fieldIndex] = filedValues;
break;
} else {
throw new MilvusConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected vector value: " + filedValues);
}
default:
throw new MilvusConnectorException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
"Unexpected value: " + seaTunnelDataType.getSqlType().name());
}
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(seatunnelField);
seaTunnelRow.setTableId(tablePath.getFullName());
seaTunnelRow.setRowKind(RowKind.INSERT);
return seaTunnelRow;
}