in seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowConverter.java [357:497]
public static InternalRow reconvert(
SeaTunnelRow seaTunnelRow,
SeaTunnelRowType seaTunnelRowType,
TableSchema sinkTableSchema) {
List<DataField> sinkTotalFields = sinkTableSchema.fields();
int sourceTotalFields = seaTunnelRowType.getTotalFields();
if (sourceTotalFields != sinkTotalFields.size()) {
throw CommonError.writeRowErrorWithFiledsCountNotMatch(
"Paimon", sourceTotalFields, sinkTotalFields.size());
}
BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
// Convert SeaTunnel RowKind to Paimon RowKind
org.apache.paimon.types.RowKind rowKind =
RowKindConverter.convertSeaTunnelRowKind2PaimonRowKind(seaTunnelRow.getRowKind());
if (rowKind == null) {
throw CommonError.unsupportedRowKind(
PaimonBaseOptions.CONNECTOR_IDENTITY,
seaTunnelRow.getRowKind().shortString(),
seaTunnelRow.getTableId());
}
binaryRow.setRowKind(rowKind);
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
// judge the field is or not equals null
if (seaTunnelRow.getField(i) == null) {
binaryWriter.setNullAt(i);
continue;
}
checkCanWriteWithSchema(i, seaTunnelRowType, sinkTotalFields);
String fieldName = seaTunnelRowType.getFieldName(i);
switch (fieldTypes[i].getSqlType()) {
case TINYINT:
binaryWriter.writeByte(i, (Byte) seaTunnelRow.getField(i));
break;
case SMALLINT:
binaryWriter.writeShort(i, (Short) seaTunnelRow.getField(i));
break;
case INT:
binaryWriter.writeInt(i, (Integer) seaTunnelRow.getField(i));
break;
case BIGINT:
binaryWriter.writeLong(i, (Long) seaTunnelRow.getField(i));
break;
case FLOAT:
binaryWriter.writeFloat(i, (Float) seaTunnelRow.getField(i));
break;
case DOUBLE:
binaryWriter.writeDouble(i, (Double) seaTunnelRow.getField(i));
break;
case DECIMAL:
DataField decimalDataField =
SchemaUtil.getDataField(sinkTotalFields, fieldName);
org.apache.paimon.types.DecimalType decimalType =
(org.apache.paimon.types.DecimalType) decimalDataField.type();
binaryWriter.writeDecimal(
i,
Decimal.fromBigDecimal(
(BigDecimal) seaTunnelRow.getField(i),
decimalType.getPrecision(),
decimalType.getScale()),
decimalType.getPrecision());
break;
case STRING:
binaryWriter.writeString(
i, BinaryString.fromString((String) seaTunnelRow.getField(i)));
break;
case BYTES:
binaryWriter.writeBinary(i, (byte[]) seaTunnelRow.getField(i));
break;
case BOOLEAN:
binaryWriter.writeBoolean(i, (Boolean) seaTunnelRow.getField(i));
break;
case DATE:
LocalDate date = (LocalDate) seaTunnelRow.getField(i);
BinaryWriter.createValueSetter(DataTypes.DATE())
.setValue(binaryWriter, i, DateTimeUtils.toInternal(date));
break;
case TIMESTAMP:
DataField dataField = SchemaUtil.getDataField(sinkTotalFields, fieldName);
int precision = ((TimestampType) dataField.type()).getPrecision();
LocalDateTime datetime = (LocalDateTime) seaTunnelRow.getField(i);
binaryWriter.writeTimestamp(
i, Timestamp.fromLocalDateTime(datetime), precision);
break;
case TIME:
LocalTime time = (LocalTime) seaTunnelRow.getField(i);
BinaryWriter.createValueSetter(DataTypes.TIME())
.setValue(binaryWriter, i, DateTimeUtils.toInternal(time));
break;
case MAP:
MapType<?, ?> mapType = (MapType<?, ?>) seaTunnelRowType.getFieldType(i);
SeaTunnelDataType<?> keyType = mapType.getKeyType();
SeaTunnelDataType<?> valueType = mapType.getValueType();
DataType paimonKeyType = RowTypeConverter.reconvert(fieldName, keyType);
DataType paimonValueType = RowTypeConverter.reconvert(fieldName, valueType);
Map<?, ?> field = (Map<?, ?>) seaTunnelRow.getField(i);
Object[] keys = field.keySet().toArray(new Object[0]);
Object[] values = field.values().toArray(new Object[0]);
binaryWriter.writeMap(
i,
BinaryMap.valueOf(
reconvert(fieldName, keys, keyType),
reconvert(fieldName, values, valueType)),
new InternalMapSerializer(paimonKeyType, paimonValueType));
break;
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) seaTunnelRowType.getFieldType(i);
BinaryArray paimonArray =
reconvert(
fieldName,
seaTunnelRow.getField(i),
arrayType.getElementType());
binaryWriter.writeArray(
i,
paimonArray,
new InternalArraySerializer(
RowTypeConverter.reconvert(
fieldName, arrayType.getElementType())));
break;
case ROW:
SeaTunnelDataType<?> rowType = seaTunnelRowType.getFieldType(i);
Object row = seaTunnelRow.getField(i);
InternalRow paimonRow =
reconvert(
(SeaTunnelRow) row,
(SeaTunnelRowType) rowType,
sinkTableSchema);
RowType paimonRowType =
RowTypeConverter.reconvert((SeaTunnelRowType) rowType, sinkTableSchema);
binaryWriter.writeRow(i, paimonRow, new InternalRowSerializer(paimonRowType));
break;
default:
throw CommonError.unsupportedDataType(
PaimonBaseOptions.CONNECTOR_IDENTITY,
seaTunnelRowType.getFieldType(i).getSqlType().toString(),
fieldName);
}
}
return binaryRow;
}