in flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorStateSerializer.java [79:121]
public DynamoDbStreamsSourceEnumeratorState deserialize(
int version, byte[] serializedEnumeratorState) throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedEnumeratorState);
DataInputStream in = new DataInputStream(bais)) {
if (version != getVersion()) {
throw new VersionMismatchException(
"Trying to deserialize DynamoDbStreamsSourceEnumeratorState serialized with unsupported version "
+ version
+ ". Serializer version is "
+ getVersion());
}
final int numKnownSplits = in.readInt();
final int splitSerializerVersion = in.readInt();
List<DynamoDBStreamsShardSplitWithAssignmentStatus> knownSplits =
new ArrayList<>(numKnownSplits);
for (int i = 0; i < numKnownSplits; i++) {
int serializedLength = in.readInt();
byte[] serializedSplit = new byte[serializedLength];
if (in.read(serializedSplit) != -1) {
DynamoDbStreamsShardSplit deserializedSplit =
splitSerializer.deserialize(splitSerializerVersion, serializedSplit);
SplitAssignmentStatus assignmentStatus =
SplitAssignmentStatus.fromStatusCode(in.readInt());
knownSplits.add(
new DynamoDBStreamsShardSplitWithAssignmentStatus(
deserializedSplit, assignmentStatus));
} else {
throw new IOException(
"Unexpectedly reading more bytes than is present in stream.");
}
}
Instant startTimestamp = Instant.ofEpochMilli(in.readLong());
if (in.available() > 0) {
throw new IOException("Unexpected trailing bytes when deserializing.");
}
return new DynamoDbStreamsSourceEnumeratorState(knownSplits, startTimestamp);
}
}