in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java [48:72]
public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
    // Turns one table row into a DynamoDB write request: the row is converted into the
    // request item and its row kind selects the request type. The context argument is
    // not used by this conversion.

    // The converter is built on first use from the physical data type.
    if (rowDataToAttributeValueConverter == null) {
        rowDataToAttributeValueConverter =
                new RowDataToAttributeValueConverter(physicalDataType);
    }

    // The item is converted before the row kind is inspected, so conversion also runs
    // for rows that are rejected below.
    // NOTE(review): DELETE requests carry the same converted item as PUT requests;
    // confirm that this is what the sink expects for deletes.
    final DynamoDbWriteRequest.Builder requestBuilder =
            DynamoDbWriteRequest.builder()
                    .setItem(rowDataToAttributeValueConverter.convertRowData(element));

    final DynamoDbWriteRequestType requestType;
    switch (element.getRowKind()) {
        case DELETE:
            requestType = DynamoDbWriteRequestType.DELETE;
            break;
        case INSERT:
        case UPDATE_AFTER:
            requestType = DynamoDbWriteRequestType.PUT;
            break;
        default:
            // UPDATE_BEFORE and any other row kind cannot be mapped to a write request.
            throw new TableException("Unsupported message kind: " + element.getRowKind());
    }

    requestBuilder.setType(requestType);
    return requestBuilder.build();
}