public static void upsertBinlog()

in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java [103:178]


    /**
     * Applies a single binlog record to {@code snapshotRecords} when the record is a data change
     * whose chunk key falls inside the range described by {@code splitStart} and {@code splitEnd}.
     *
     * <p>Records that are not data changes, that carry a {@code null} value, or whose chunk key is
     * outside the range are skipped without touching {@code snapshotRecords}. For the remaining
     * records the operation code is read from the value struct and the work is delegated to the
     * four-argument {@code upsertBinlog} overload:
     *
     * <ul>
     *   <li>{@code CREATE}: keyed by the record key, or by the AFTER image when there is no key;
     *       flag {@code false}.
     *   <li>{@code UPDATE}: without a record key, the BEFORE image is first passed with flag
     *       {@code true}; then the record key (or the AFTER image) is passed with flag {@code
     *       false}.
     *   <li>{@code DELETE}: keyed by the record key, or by the BEFORE image when there is no key;
     *       flag {@code true}.
     *   <li>{@code READ}: rejected with an {@link IllegalStateException}.
     * </ul>
     *
     * <p>Any other operation has no matching case and is ignored. NOTE(review): the boolean flag
     * presumably tells the overload to remove rather than insert the entry; confirm against the
     * four-argument overload, which is not visible here.
     *
     * @param snapshotRecords the map of records to update
     * @param binlogRecord the binlog record to apply
     * @param splitBoundaryType row type used to derive the split key from a struct
     * @param nameAdjuster schema name adjuster used when deriving the split key
     * @param splitStart start boundary of the split key range
     * @param splitEnd end boundary of the split key range
     * @throws IllegalStateException if the binlog record carries a {@code READ} operation
     */
    public static void upsertBinlog(
            Map<Struct, List<SourceRecord>> snapshotRecords,
            SourceRecord binlogRecord,
            RowType splitBoundaryType,
            SchemaNameAdjuster nameAdjuster,
            Object[] splitStart,
            Object[] splitEnd) {
        if (!isDataChangeRecord(binlogRecord)) {
            return;
        }
        Struct value = (Struct) binlogRecord.value();
        if (value == null) {
            return;
        }

        // Only records whose chunk key lies inside this split's range are applied.
        Struct chunkKeyStruct = getStructContainsChunkKey(binlogRecord);
        if (!splitKeyRangeContains(
                getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct),
                splitStart,
                splitEnd)) {
            return;
        }

        // Without a record key, the row image itself is used as the map key below.
        boolean hasPrimaryKey = binlogRecord.key() != null;
        Envelope.Operation operation =
                Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
        switch (operation) {
            case CREATE:
                Struct createKey =
                        hasPrimaryKey
                                ? (Struct) binlogRecord.key()
                                : createReadOpValue(binlogRecord, Envelope.FieldName.AFTER);
                upsertBinlog(snapshotRecords, binlogRecord, createKey, false);
                break;
            case UPDATE:
                Struct structFromAfter =
                        createReadOpValue(binlogRecord, Envelope.FieldName.AFTER);
                if (!hasPrimaryKey) {
                    // The BEFORE image is handled first, with the flag set to true.
                    upsertBinlog(
                            snapshotRecords,
                            binlogRecord,
                            createReadOpValue(binlogRecord, Envelope.FieldName.BEFORE),
                            true);
                    if (!splitKeyRangeContains(
                            getSplitKey(splitBoundaryType, nameAdjuster, structFromAfter),
                            splitStart,
                            splitEnd)) {
                        LOG.warn(
                                "The updated chunk key is out of the split range. Cannot provide exactly-once semantics.");
                    }
                }
                // If the chunk key changed, we still send here
                // This will cause the at-least-once semantics
                Struct afterKey = hasPrimaryKey ? (Struct) binlogRecord.key() : structFromAfter;
                upsertBinlog(snapshotRecords, binlogRecord, afterKey, false);
                break;
            case DELETE:
                Struct deleteKey =
                        hasPrimaryKey
                                ? (Struct) binlogRecord.key()
                                : createReadOpValue(binlogRecord, Envelope.FieldName.BEFORE);
                upsertBinlog(snapshotRecords, binlogRecord, deleteKey, true);
                break;
            case READ:
                throw new IllegalStateException(
                        String.format(
                                "Binlog record shouldn't use READ operation, the record is %s.",
                                binlogRecord));
        }
    }