in src/kudu/common/row_operations.cc [572:720]
Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& mapping,
DecodedRowOperation* op) {
const uint8_t* client_isset_map = nullptr;
const uint8_t* client_non_null_map = nullptr;
// Read the null and isset bitmaps for the client-provided row.
RETURN_NOT_OK(ReadIssetBitmap(&client_isset_map));
if (client_schema_->has_nullables()) {
RETURN_NOT_OK(ReadNonNullBitmap(&client_non_null_map));
}
// Allocate space for the row key.
auto* rowkey_storage = reinterpret_cast<uint8_t*>(
dst_arena_->AllocateBytesAligned(tablet_schema_->key_byte_size(), 8));
if (PREDICT_FALSE(!rowkey_storage)) {
return Status::RuntimeError("Out of memory");
}
// We're passing the full schema instead of the key schema here.
// That's OK because the keys come at the bottom. We lose some bounds
// checking in debug builds, but it avoids an extra copy of the key schema.
ContiguousRow rowkey(tablet_schema_, rowkey_storage);
// First process the key columns.
size_t client_col_idx = 0;
for (; client_col_idx < client_schema_->num_key_columns(); client_col_idx++) {
// Look up the corresponding column from the tablet. We use the server-side
// ColumnSchema object since it has the most up-to-date default, nullability,
// etc.
if (client_schema_ != tablet_schema_) {
DCHECK_EQ(mapping.client_to_tablet_idx(client_col_idx),
client_col_idx) << "key columns should match";
}
size_t tablet_col_idx = client_col_idx;
const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
if (PREDICT_FALSE(!BitmapTest(client_isset_map, client_col_idx))) {
op->SetFailureStatusOnce(Status::InvalidArgument("No value provided for key column",
col.ToString()));
continue;
}
bool client_set_to_null = client_schema_->has_nullables() &&
BitmapTest(client_non_null_map, client_col_idx);
if (PREDICT_FALSE(client_set_to_null)) {
op->SetFailureStatusOnce(Status::InvalidArgument("NULL values not allowed for key column",
col.ToString()));
RETURN_NOT_OK(ReadColumnAndDiscard(col));
continue;
}
RETURN_NOT_OK(ReadColumn(col, rowkey.mutable_cell_ptr(tablet_col_idx), nullptr));
}
op->row_data = rowkey_storage;
// Now we process the rest of the columns:
// For UPDATE, we expect at least one other column to be set, indicating the
// update to perform.
// For DELETE, we expect no other columns to be set (and we verify that).
Status row_status;
if (op->type == RowOperationsPB::UPDATE || op->type == RowOperationsPB::UPDATE_IGNORE) {
faststring buf;
RowChangeListEncoder rcl_encoder(&buf);
// Now process the rest of columns as updates.
DCHECK_EQ(client_schema_->num_key_columns(), client_col_idx);
for (; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
size_t tablet_col_idx = GetTabletColIdx(mapping, client_col_idx);
const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
if (BitmapTest(client_isset_map, client_col_idx)) {
bool client_set_to_null = client_schema_->has_nullables() &&
BitmapTest(client_non_null_map, client_col_idx);
if (col.is_immutable()) {
if (op->type == RowOperationsPB::UPDATE) {
op->SetFailureStatusOnce(
Status::Immutable("UPDATE not allowed for immutable column", col.ToString()));
} else {
DCHECK_EQ(RowOperationsPB::UPDATE_IGNORE, op->type);
op->error_ignored = true;
}
if (!client_set_to_null) {
RETURN_NOT_OK(ReadColumnAndDiscard(col));
}
// Use 'continue' not 'break' to consume the rest row data.
continue;
}
uint8_t scratch[kLargestTypeSize];
uint8_t* val_to_add = nullptr;
if (!client_set_to_null) {
RETURN_NOT_OK(ReadColumn(col, scratch, &row_status));
if (PREDICT_FALSE(!row_status.ok())) {
op->SetFailureStatusOnce(row_status);
}
val_to_add = scratch;
} else if (PREDICT_FALSE(!col.is_nullable())) {
op->SetFailureStatusOnce(Status::InvalidArgument(
"NULL value not allowed for non-nullable column", col.ToString()));
RETURN_NOT_OK(ReadColumnAndDiscard(col));
continue;
}
rcl_encoder.AddColumnUpdate(col, tablet_schema_->column_id(tablet_col_idx), val_to_add);
}
}
if (PREDICT_FALSE(buf.size() == 0)) {
// No actual column updates specified!
op->SetFailureStatusOnce(Status::InvalidArgument("No fields updated, key is",
tablet_schema_->DebugRowKey(rowkey)));
}
if (PREDICT_TRUE(op->result.ok())) {
// Copy the row-changelist to the arena.
auto* rcl_in_arena = reinterpret_cast<uint8_t*>(
dst_arena_->AllocateBytesAligned(buf.size(), 8));
if (PREDICT_FALSE(rcl_in_arena == nullptr)) {
return Status::RuntimeError("Out of memory allocating RCL");
}
memcpy(rcl_in_arena, buf.data(), buf.size());
op->changelist = RowChangeList(Slice(rcl_in_arena, buf.size()));
}
} else if (op->type == RowOperationsPB::DELETE || op->type == RowOperationsPB::DELETE_IGNORE) {
// Ensure that no other columns are set.
for (; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
if (PREDICT_FALSE(BitmapTest(client_isset_map, client_col_idx))) {
size_t tablet_col_idx = GetTabletColIdx(mapping, client_col_idx);
const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
op->SetFailureStatusOnce(Status::InvalidArgument(
"DELETE should not have a value for column", col.ToString()));
bool client_set_to_null = client_schema_->has_nullables() &&
BitmapTest(client_non_null_map, client_col_idx);
if (!client_set_to_null || !col.is_nullable()) {
RETURN_NOT_OK(ReadColumnAndDiscard(col));
}
}
}
if (PREDICT_TRUE(op->result.ok())) {
op->changelist = RowChangeList::CreateDelete();
}
} else {
LOG(FATAL) << "Should only call this method with UPDATE or DELETE";
}
return Status::OK();
}