public LogAppendInfo putAsLeader()

in fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java [269:393]


    /**
     * Applies a batch of kv records as the leader of this kv tablet and appends the resulting
     * change log to the log tablet.
     *
     * <p>For every record in {@code kvRecords}, the current value of its key is looked up via
     * {@code getFromBufferOrKv} and the merger configured for {@code targetColumns} decides the
     * outcome:
     *
     * <ul>
     *   <li>a record without a row is a deletion. It is skipped when the merger does not support
     *       deletes, and ignored when the key has no current value. Otherwise it produces a
     *       {@code DELETE} change, or an {@code UPDATE_BEFORE}/{@code UPDATE_AFTER} pair when the
     *       merger's {@code delete} returns a non-null remaining row;
     *   <li>a record with a row whose key has a current value is merged with the old row. It
     *       produces an {@code UPDATE_BEFORE}/{@code UPDATE_AFTER} pair, unless the merger returns
     *       the very same old row instance, in which case the record is skipped;
     *   <li>a record with a row whose key has no current value produces an {@code INSERT} change.
     * </ul>
     *
     * <p>Each change consumes one log offset, starting at the local log end offset observed
     * before this batch. The pre-write buffer entry for a key is tagged with the offset of the
     * change that carries its new state ({@code DELETE}, {@code UPDATE_AFTER} or {@code INSERT}).
     * A WAL batch is always appended, even when it holds no changes, so that the writer's batch
     * sequence is still recorded.
     *
     * <p>If the log tablet reports the append as a duplicate, or if any step fails, the pre-write
     * buffer is truncated back to the log end offset observed before this batch.
     *
     * @param kvRecords the batch of kv records to apply
     * @param targetColumns the target columns of this write, forwarded to {@code
     *     RowMerger#configureTargetColumns}; may be {@code null}
     * @return the append info returned by {@code logTablet.appendAsLeader}
     * @throws Exception if any step fails, e.g. the RocksDB closed-check, decoding, merging or the
     *     log append; the original throwable is rethrown after the pre-write buffer is truncated
     */
    public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
            throws Exception {
        return inWriteLock(
                kvLock,
                () -> {
                    rocksDBKv.checkIfRocksDBClosed();
                    short schemaId = kvRecords.schemaId();
                    RowMerger currentMerger = rowMerger.configureTargetColumns(targetColumns);
                    RowType rowType = schema.getRowType();
                    DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
                    // The log end offset before this batch: changes produced by this batch are
                    // numbered from here, and it is the point the pre-write buffer is truncated
                    // back to on duplication or failure. It is read before the WAL builder is
                    // allocated so that everything after the allocation is covered by the
                    // try/finally that deallocates it.
                    long logEndOffsetOfPrevBatch = logTablet.localLogEndOffset();
                    WalBuilder walBuilder = createWalBuilder(schemaId, rowType);
                    try {
                        walBuilder.setWriterState(kvRecords.writerId(), kvRecords.batchSequence());
                        // next log offset to assign to a change produced by this batch
                        long logOffset = logEndOffsetOfPrevBatch;

                        // TODO: reuse the read context and decoder
                        KvRecordBatch.ReadContext readContext =
                                KvRecordReadContext.createReadContext(kvFormat, fieldTypes);
                        ValueDecoder valueDecoder =
                                new ValueDecoder(readContext.getRowDecoder(schemaId));
                        for (KvRecord kvRecord : kvRecords.records(readContext)) {
                            byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
                            KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
                            if (kvRecord.getRow() == null) {
                                // it's for deletion; ask the same merger that performs the
                                // deletion whether deletes are supported
                                if (!currentMerger.supportsDelete()) {
                                    // skip delete rows if the merger doesn't support yet
                                    continue;
                                }
                                byte[] oldValue = getFromBufferOrKv(key);
                                if (oldValue == null) {
                                    // there might be a large amount of such deletions, so this is
                                    // only logged at debug level
                                    LOG.debug(
                                            "The specific key can't be found in kv tablet although the kv record is for deletion, "
                                                    + "ignore it directly as it doesn't exist in the kv tablet yet.");
                                } else {
                                    BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
                                    BinaryRow newRow = currentMerger.delete(oldRow);
                                    // if newRow is null, it means the row should be deleted
                                    if (newRow == null) {
                                        walBuilder.append(ChangeType.DELETE, oldRow);
                                        kvPreWriteBuffer.delete(key, logOffset++);
                                    } else {
                                        // otherwise, it's a partial update, should produce -U,+U;
                                        // the buffered value is tagged with the +U offset
                                        walBuilder.append(ChangeType.UPDATE_BEFORE, oldRow);
                                        walBuilder.append(ChangeType.UPDATE_AFTER, newRow);
                                        kvPreWriteBuffer.put(
                                                key,
                                                ValueEncoder.encodeValue(schemaId, newRow),
                                                logOffset + 1);
                                        logOffset += 2;
                                    }
                                }
                            } else {
                                // upsert operation
                                byte[] oldValue = getFromBufferOrKv(key);
                                if (oldValue != null) {
                                    // it's update
                                    BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
                                    BinaryRow newRow =
                                            currentMerger.merge(oldRow, kvRecord.getRow());
                                    if (newRow == oldRow) {
                                        // newRow is the same instance as oldRow, meaning nothing
                                        // happens (no update/delete), so the input is ignored
                                        continue;
                                    }
                                    walBuilder.append(ChangeType.UPDATE_BEFORE, oldRow);
                                    walBuilder.append(ChangeType.UPDATE_AFTER, newRow);
                                    // logOffset is for -U, logOffset + 1 is for +U, we need to use
                                    // the log offset for +U
                                    kvPreWriteBuffer.put(
                                            key,
                                            ValueEncoder.encodeValue(schemaId, newRow),
                                            logOffset + 1);
                                    logOffset += 2;
                                } else {
                                    // it's insert
                                    // TODO: we should add guarantees that all non-specified columns
                                    //  of the input row are set to null.
                                    BinaryRow newRow = kvRecord.getRow();
                                    walBuilder.append(ChangeType.INSERT, newRow);
                                    kvPreWriteBuffer.put(
                                            key,
                                            ValueEncoder.encodeValue(schemaId, newRow),
                                            logOffset++);
                                }
                            }
                        }

                        // There will be a situation that these batches of kvRecordBatch have not
                        // generated any CDC logs, for example, when client attempts to delete
                        // some non-existent keys or MergeEngineType set to FIRST_ROW. In this case,
                        // we cannot simply return, as doing so would cause a
                        // OutOfOrderSequenceException problem. Therefore, here we will build an
                        // empty batch with lastLogOffset to 0L as the baseLogOffset is 0L. As doing
                        // that, the logOffsetDelta in logRecordBatch will be set to 0L. So, we will
                        // put a batch into file with recordCount 0 and offset plus 1L, it will
                        // update the batchSequence corresponding to the writerId and also increment
                        // the CDC log offset by 1.
                        LogAppendInfo logAppendInfo = logTablet.appendAsLeader(walBuilder.build());

                        // if the batch is duplicated, we should truncate the kvPreWriteBuffer
                        // already written.
                        if (logAppendInfo.duplicated()) {
                            kvPreWriteBuffer.truncateTo(
                                    logEndOffsetOfPrevBatch, TruncateReason.DUPLICATED);
                        }
                        return logAppendInfo;
                    } catch (Throwable t) {
                        // While encountering an error here, the CDC logs may fail writing to disk,
                        // and the client probably will resend the batch. If we do not remove the
                        // values generated by the erroneous batch from the kvPreWriteBuffer, the
                        // retry-send batch will produce incorrect CDC logs.
                        // TODO for some errors, the cdc logs may already be written to disk, for
                        //  those errors, we should not truncate the kvPreWriteBuffer.
                        kvPreWriteBuffer.truncateTo(logEndOffsetOfPrevBatch, TruncateReason.ERROR);
                        throw t;
                    } finally {
                        // deallocate the memory and arrow writer used by the wal builder; the
                        // builder is allocated right before this try, so it is always released
                        walBuilder.deallocate();
                    }
                });
    }