in fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java [269:393]
/**
 * Applies a batch of kv records to this tablet as the leader replica, generating the
 * corresponding CDC (changelog) records into the WAL.
 *
 * <p>For each record: a {@code null} row is a deletion (which may become a -U/+U pair under a
 * partial-update merger), otherwise the row is upserted — merged with the existing row if the
 * key already exists (producing -U/+U), or inserted (+I) if it does not. The resulting kv
 * mutations are staged in {@code kvPreWriteBuffer} tagged with the log offset of their CDC
 * record, and are flushed to RocksDB only once the WAL entry is durable.
 *
 * @param kvRecords the batch of kv records to apply
 * @param targetColumns the columns written by a partial update, or {@code null} when all
 *     columns are written
 * @return the append info of the CDC log batch written to the log tablet
 * @throws Exception if reading/decoding rows or appending to the WAL fails; the pre-write
 *     buffer is truncated back to the batch start before the exception is rethrown
 */
public LogAppendInfo putAsLeader(KvRecordBatch kvRecords, @Nullable int[] targetColumns)
        throws Exception {
    return inWriteLock(
            kvLock,
            () -> {
                rocksDBKv.checkIfRocksDBClosed();
                short schemaId = kvRecords.schemaId();
                // configure the merger for this batch's target columns (partial update);
                // all merge/delete decisions below must go through this configured merger
                RowMerger currentMerger = rowMerger.configureTargetColumns(targetColumns);
                RowType rowType = schema.getRowType();
                WalBuilder walBuilder = createWalBuilder(schemaId, rowType);
                walBuilder.setWriterState(kvRecords.writerId(), kvRecords.batchSequence());
                // get offset to track the offset corresponded to the kv record
                long logEndOffsetOfPrevBatch = logTablet.localLogEndOffset();
                DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
                try {
                    long logOffset = logEndOffsetOfPrevBatch;
                    // TODO: reuse the read context and decoder
                    KvRecordBatch.ReadContext readContext =
                            KvRecordReadContext.createReadContext(kvFormat, fieldTypes);
                    ValueDecoder valueDecoder =
                            new ValueDecoder(readContext.getRowDecoder(schemaId));
                    for (KvRecord kvRecord : kvRecords.records(readContext)) {
                        byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey());
                        KvPreWriteBuffer.Key key = KvPreWriteBuffer.Key.of(keyBytes);
                        if (kvRecord.getRow() == null) {
                            // NOTE: check delete support on the *configured* merger — the
                            // merger configured with target columns is the one actually
                            // applied below, and its delete semantics may differ from the
                            // base merger's.
                            if (!currentMerger.supportsDelete()) {
                                // skip delete rows if the merger doesn't support yet
                                continue;
                            }
                            // it's for deletion
                            byte[] oldValue = getFromBufferOrKv(key);
                            if (oldValue == null) {
                                // there might be large amount of such deletion, so we don't log
                                LOG.debug(
                                        "The specific key can't be found in kv tablet although the kv record is for deletion, "
                                                + "ignore it directly as it doesn't exist in the kv tablet yet.");
                            } else {
                                BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
                                BinaryRow newRow = currentMerger.delete(oldRow);
                                // if newRow is null, it means the row should be deleted
                                if (newRow == null) {
                                    walBuilder.append(ChangeType.DELETE, oldRow);
                                    kvPreWriteBuffer.delete(key, logOffset++);
                                } else {
                                    // otherwise, it's a partial update, should produce -U,+U
                                    walBuilder.append(ChangeType.UPDATE_BEFORE, oldRow);
                                    walBuilder.append(ChangeType.UPDATE_AFTER, newRow);
                                    // logOffset is for -U, logOffset + 1 is for +U; the kv
                                    // entry is tagged with the +U offset
                                    kvPreWriteBuffer.put(
                                            key,
                                            ValueEncoder.encodeValue(schemaId, newRow),
                                            logOffset + 1);
                                    logOffset += 2;
                                }
                            }
                        } else {
                            // upsert operation
                            byte[] oldValue = getFromBufferOrKv(key);
                            // it's update
                            if (oldValue != null) {
                                BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row;
                                BinaryRow newRow =
                                        currentMerger.merge(oldRow, kvRecord.getRow());
                                if (newRow == oldRow) {
                                    // newRow is the same to oldRow, means nothing
                                    // happens (no update/delete), and input should be ignored
                                    continue;
                                }
                                walBuilder.append(ChangeType.UPDATE_BEFORE, oldRow);
                                walBuilder.append(ChangeType.UPDATE_AFTER, newRow);
                                // logOffset is for -U, logOffset + 1 is for +U, we need to use
                                // the log offset for +U
                                kvPreWriteBuffer.put(
                                        key,
                                        ValueEncoder.encodeValue(schemaId, newRow),
                                        logOffset + 1);
                                logOffset += 2;
                            } else {
                                // it's insert
                                // TODO: we should add guarantees that all non-specified columns
                                // of the input row are set to null.
                                BinaryRow newRow = kvRecord.getRow();
                                walBuilder.append(ChangeType.INSERT, newRow);
                                kvPreWriteBuffer.put(
                                        key,
                                        ValueEncoder.encodeValue(schemaId, newRow),
                                        logOffset++);
                            }
                        }
                    }
                    // There will be a situation that these batches of kvRecordBatch have not
                    // generated any CDC logs, for example, when client attempts to delete
                    // some non-existent keys or MergeEngineType set to FIRST_ROW. In this case,
                    // we cannot simply return, as doing so would cause a
                    // OutOfOrderSequenceException problem. Therefore, here we will build an
                    // empty batch with lastLogOffset to 0L as the baseLogOffset is 0L. As doing
                    // that, the logOffsetDelta in logRecordBatch will be set to 0L. So, we will
                    // put a batch into file with recordCount 0 and offset plus 1L, it will
                    // update the batchSequence corresponding to the writerId and also increment
                    // the CDC log offset by 1.
                    LogAppendInfo logAppendInfo = logTablet.appendAsLeader(walBuilder.build());
                    // if the batch is duplicated, we should truncate the kvPreWriteBuffer
                    // already written.
                    if (logAppendInfo.duplicated()) {
                        kvPreWriteBuffer.truncateTo(
                                logEndOffsetOfPrevBatch, TruncateReason.DUPLICATED);
                    }
                    return logAppendInfo;
                } catch (Throwable t) {
                    // While encounter error here, the CDC logs may fail writing to disk,
                    // and the client probably will resend the batch. If we do not remove the
                    // values generated by the erroneous batch from the kvPreWriteBuffer, the
                    // retry-send batch will produce incorrect CDC logs.
                    // TODO for some errors, the cdc logs may already be written to disk, for
                    // those errors, we should not truncate the kvPreWriteBuffer.
                    kvPreWriteBuffer.truncateTo(logEndOffsetOfPrevBatch, TruncateReason.ERROR);
                    throw t;
                } finally {
                    // deallocate the memory and arrow writer used by the wal builder
                    walBuilder.deallocate();
                }
            });
}