in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java [144:196]
/**
 * Reads the payload of a single {@link AttributeValue} of the given type from the stream.
 *
 * <p>The type is supplied by the caller; this method only consumes the bytes that make up the
 * value itself. Collection types are read as an {@code int} element count followed by that many
 * elements, and binary payloads as an {@code int} length followed by that many raw bytes.
 *
 * @param dynamoDbType the type of the value to read
 * @param in the stream positioned at the start of the value's payload
 * @return the attribute value built from the bytes read
 * @throws IOException if reading fails, including {@link java.io.EOFException} when the stream
 *     ends before a string, number, or length-prefixed binary payload has been read in full
 * @throws IllegalArgumentException if {@code dynamoDbType} is not handled by this method
 */
private static AttributeValue deserializeAttributeValue(
        DynamoDbType dynamoDbType, DataInputStream in) throws IOException {
    switch (dynamoDbType) {
        case NULL:
            // NULL carries no payload; nothing is consumed from the stream.
            return AttributeValue.builder().nul(true).build();
        case STRING:
            return AttributeValue.builder().s(in.readUTF()).build();
        case NUMBER:
            // Numbers are carried as their string representation.
            return AttributeValue.builder().n(in.readUTF()).build();
        case BOOLEAN:
            return AttributeValue.builder().bool(in.readBoolean()).build();
        case BINARY:
            int length = in.readInt();
            byte[] bytes = new byte[length];
            // readFully, not read: read() may return after filling only part of the buffer,
            // which would leave trailing zeros and desynchronize every subsequent field.
            in.readFully(bytes);
            return AttributeValue.builder().b(SdkBytes.fromByteArray(bytes)).build();
        case STRING_SET:
            int stringSetSize = in.readInt();
            List<String> stringSet = new ArrayList<>(stringSetSize);
            for (int i = 0; i < stringSetSize; i++) {
                stringSet.add(in.readUTF());
            }
            return AttributeValue.builder().ss(stringSet).build();
        case NUMBER_SET:
            int numberSetSize = in.readInt();
            List<String> numberSet = new ArrayList<>(numberSetSize);
            for (int i = 0; i < numberSetSize; i++) {
                numberSet.add(in.readUTF());
            }
            return AttributeValue.builder().ns(numberSet).build();
        case BINARY_SET:
            int binarySetSize = in.readInt();
            List<SdkBytes> byteSet = new ArrayList<>(binarySetSize);
            for (int i = 0; i < binarySetSize; i++) {
                int byteLength = in.readInt();
                byte[] bs = new byte[byteLength];
                // Same reasoning as BINARY: a short read here would corrupt this element
                // and shift the position of all following elements.
                in.readFully(bs);
                byteSet.add(SdkBytes.fromByteArray(bs));
            }
            return AttributeValue.builder().bs(byteSet).build();
        case LIST:
            int listSize = in.readInt();
            List<AttributeValue> list = new ArrayList<>(listSize);
            for (int i = 0; i < listSize; i++) {
                // Elements are read through the single-argument overload
                // (presumably it reads each element's own type first; defined outside this block).
                list.add(deserializeAttributeValue(in));
            }
            return AttributeValue.builder().l(list).build();
        case MAP:
            // Nested maps reuse the item deserializer.
            return AttributeValue.builder().m(deserializeItem(in)).build();
        default:
            throw new IllegalArgumentException("Unknown DynamoDbType " + dynamoDbType);
    }
}