in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/RowDataKinesisDeserializationSchema.java [111:138]
public RowData deserialize(
byte[] recordValue,
String partitionKey,
String seqNum,
long approxArrivalTimestamp,
String stream,
String shardId)
throws IOException {
RowData physicalRow = physicalDeserializer.deserialize(recordValue);
GenericRowData metadataRow = new GenericRowData(requestedMetadataFields.size());
for (int i = 0; i < metadataRow.getArity(); i++) {
Metadata metadataField = requestedMetadataFields.get(i);
if (metadataField == Metadata.Timestamp) {
metadataRow.setField(i, TimestampData.fromEpochMillis(approxArrivalTimestamp));
} else if (metadataField == Metadata.SequenceNumber) {
metadataRow.setField(i, StringData.fromString(seqNum));
} else if (metadataField == Metadata.ShardId) {
metadataRow.setField(i, StringData.fromString(shardId));
} else {
String msg = String.format("Unsupported metadata key %s", metadataField);
throw new RuntimeException(msg); // should never happen
}
}
return new JoinedRowData(physicalRow.getRowKind(), physicalRow, metadataRow);
}