in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java [72:124]
/**
 * Writes one {@link AttributeValue} to {@code out} as a single type-tag byte followed by a
 * payload whose layout depends on the tag.
 *
 * <p>The members of {@code value} are probed in a fixed order and the first populated one is
 * written: NULL, BOOLEAN, STRING, NUMBER, BINARY, STRING_SET, NUMBER_SET, BINARY_SET, LIST,
 * MAP. Payloads are:
 *
 * <ul>
 *   <li>NULL: no payload, only the tag.
 *   <li>BOOLEAN: one {@code writeBoolean}.
 *   <li>STRING / NUMBER: one {@code writeUTF}.
 *   <li>BINARY: {@code int} length followed by the raw bytes.
 *   <li>STRING_SET / NUMBER_SET: {@code int} element count, then one {@code writeUTF} each.
 *   <li>BINARY_SET: {@code int} element count, then length-prefixed bytes per element.
 *   <li>LIST: {@code int} element count, then each element written recursively.
 *   <li>MAP: delegated to {@code serializeItem}.
 * </ul>
 *
 * <p>Note: {@link DataOutputStream#writeUTF(String)} fails with a {@code
 * UTFDataFormatException} when the encoded form of a string is longer than 65535 bytes, so
 * string and number members above that size cannot be written by this method.
 *
 * @param value the attribute value to write
 * @param out the stream receiving the encoded bytes
 * @throws IOException if writing to {@code out} fails
 * @throws IllegalArgumentException if none of the probed members of {@code value} is set
 */
private static void serializeAttributeValue(AttributeValue value, DataOutputStream out)
        throws IOException {
    // Scalar members: a member counts as present when its getter returns non-null.
    if (value.nul() != null) {
        out.writeByte(DynamoDbType.NULL.toByteValue());
        return;
    }

    if (value.bool() != null) {
        out.writeByte(DynamoDbType.BOOLEAN.toByteValue());
        out.writeBoolean(value.bool());
        return;
    }

    if (value.s() != null) {
        out.writeByte(DynamoDbType.STRING.toByteValue());
        out.writeUTF(value.s());
        return;
    }

    if (value.n() != null) {
        out.writeByte(DynamoDbType.NUMBER.toByteValue());
        out.writeUTF(value.n());
        return;
    }

    if (value.b() != null) {
        // The backing array is only read here, never modified.
        byte[] payload = value.b().asByteArrayUnsafe();
        out.writeByte(DynamoDbType.BINARY.toByteValue());
        out.writeInt(payload.length);
        out.write(payload);
        return;
    }

    // Collection members: presence is decided by the has*() probes.
    if (value.hasSs()) {
        out.writeByte(DynamoDbType.STRING_SET.toByteValue());
        List<String> strings = value.ss();
        out.writeInt(strings.size());
        for (String element : strings) {
            out.writeUTF(element);
        }
        return;
    }

    if (value.hasNs()) {
        out.writeByte(DynamoDbType.NUMBER_SET.toByteValue());
        List<String> numbers = value.ns();
        out.writeInt(numbers.size());
        for (String element : numbers) {
            out.writeUTF(element);
        }
        return;
    }

    if (value.hasBs()) {
        out.writeByte(DynamoDbType.BINARY_SET.toByteValue());
        List<SdkBytes> binaries = value.bs();
        out.writeInt(binaries.size());
        for (SdkBytes element : binaries) {
            byte[] payload = element.asByteArrayUnsafe();
            out.writeInt(payload.length);
            out.write(payload);
        }
        return;
    }

    if (value.hasL()) {
        out.writeByte(DynamoDbType.LIST.toByteValue());
        List<AttributeValue> elements = value.l();
        out.writeInt(elements.size());
        for (AttributeValue element : elements) {
            // Nested values reuse the same tag + payload encoding.
            serializeAttributeValue(element, out);
        }
        return;
    }

    if (value.hasM()) {
        out.writeByte(DynamoDbType.MAP.toByteValue());
        Map<String, AttributeValue> entries = value.m();
        serializeItem(entries, out);
        return;
    }

    // None of the supported members is populated.
    throw new IllegalArgumentException("Attribute value must not be empty: " + value);
}