in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java [103:178]
public static void upsertBinlog(
Map<Struct, List<SourceRecord>> snapshotRecords,
SourceRecord binlogRecord,
RowType splitBoundaryType,
SchemaNameAdjuster nameAdjuster,
Object[] splitStart,
Object[] splitEnd) {
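// Only data-change (insert/update/delete) events participate in the snapshot backfill;
// other record types are ignored here.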
if (isDataChangeRecord(binlogRecord)) {
Struct value = (Struct) binlogRecord.value();
if (value != null) {
Struct chunkKeyStruct = getStructContainsChunkKey(binlogRecord);
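// Only process events whose chunk key falls within this split's key range.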
if (splitKeyRangeContains(
getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct),
splitStart,
splitEnd)) {
boolean hasPrimaryKey = binlogRecord.key() != null;
Envelope.Operation operation =
Envelope.Operation.forCode(
value.getString(Envelope.FieldName.OPERATION));
switch (operation) {
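// INSERT: upsert the new row, keyed by the primary key if present,
// otherwise by the full AFTER image.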
case CREATE:
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey
? (Struct) binlogRecord.key()
: createReadOpValue(
binlogRecord, Envelope.FieldName.AFTER),
false);
break;
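// UPDATE: when there is no primary key, first remove the BEFORE image
// (the row identity may have changed), then upsert the AFTER image.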
case UPDATE:
Struct structFromAfter =
createReadOpValue(binlogRecord, Envelope.FieldName.AFTER);
if (!hasPrimaryKey) {
upsertBinlog(
snapshotRecords,
binlogRecord,
createReadOpValue(binlogRecord, Envelope.FieldName.BEFORE),
true);
if (!splitKeyRangeContains(
getSplitKey(
splitBoundaryType, nameAdjuster, structFromAfter),
splitStart,
splitEnd)) {
LOG.warn(
"The updated chunk key is out of the split range. Cannot provide exactly-once semantics.");
}
}
// If the chunk key changed, we still emit the record here.
// This may degrade the guarantee to at-least-once semantics.
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey ? (Struct) binlogRecord.key() : structFromAfter,
false);
break;
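// DELETE: remove the row, keyed by the primary key if present,
// otherwise by the full BEFORE image.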
case DELETE:
upsertBinlog(
snapshotRecords,
binlogRecord,
hasPrimaryKey
? (Struct) binlogRecord.key()
: createReadOpValue(
binlogRecord, Envelope.FieldName.BEFORE),
true);
break;
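// READ events come only from snapshot scans and must never appear in the binlog.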
case READ:
throw new IllegalStateException(
String.format(
"Binlog record shouldn't use READ operation, the the record is %s.",
binlogRecord));
}
}
}
}
}