Status RowOperationsPBDecoder::DecodeUpdateOrDelete()

in src/kudu/common/row_operations.cc [572:720]


// Decodes a single UPDATE, UPDATE_IGNORE, DELETE or DELETE_IGNORE operation
// from the client-provided row data into 'op'.
//
// The decode proceeds in this order:
//   1. Read the isset bitmap and, if the client schema has nullable columns,
//      the null bitmap.
//   2. Decode the key columns into arena-allocated storage, which is
//      published as 'op->row_data'.
//   3. For UPDATE/UPDATE_IGNORE: encode every set non-key column into a
//      RowChangeList and, if the row is still valid, copy it into the arena
//      and store it in 'op->changelist'.
//      For DELETE/DELETE_IGNORE: verify that no non-key column is set and, if
//      the row is still valid, store a DELETE changelist in 'op->changelist'.
//
// Per-row validation problems (missing or NULL key, write to an immutable
// column, NULL for a non-nullable column, extra columns on DELETE, ...) are
// recorded on 'op' via SetFailureStatusOnce() and decoding continues, so that
// the remaining data of this row is still consumed from the input.
//
// Returns a non-OK Status only when a bitmap/column read fails or an arena
// allocation fails; otherwise returns Status::OK() even if 'op' carries a
// per-row failure.
//
// 'mapping' translates client column indexes to tablet column indexes.
// 'op->type' must already be set to one of the four operation types above;
// any other type is a fatal programming error.
Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& mapping,
                                                    DecodedRowOperation* op) {
  const uint8_t* client_isset_map = nullptr;
  // NOTE(review): despite the "non_null" name, a set bit in this bitmap is
  // interpreted below as "the client set this column to NULL" -- confirm the
  // naming against the wire format.
  const uint8_t* client_non_null_map = nullptr;

  // Read the null and isset bitmaps for the client-provided row.
  RETURN_NOT_OK(ReadIssetBitmap(&client_isset_map));
  if (client_schema_->has_nullables()) {
    RETURN_NOT_OK(ReadNonNullBitmap(&client_non_null_map));
  }

  // Allocate space for the row key. Nothing here zeroes this memory, so a
  // key cell is only meaningful once ReadColumn() below has filled it in.
  auto* rowkey_storage = reinterpret_cast<uint8_t*>(
      dst_arena_->AllocateBytesAligned(tablet_schema_->key_byte_size(), 8));
  if (PREDICT_FALSE(!rowkey_storage)) {
    return Status::RuntimeError("Out of memory");
  }

  // We're passing the full schema instead of the key schema here.
  // That's OK because the keys come at the bottom. We lose some bounds
  // checking in debug builds, but it avoids an extra copy of the key schema.
  ContiguousRow rowkey(tablet_schema_, rowkey_storage);

  // First process the key columns.
  size_t client_col_idx = 0;
  for (; client_col_idx < client_schema_->num_key_columns(); client_col_idx++) {
    // Look up the corresponding column from the tablet. We use the server-side
    // ColumnSchema object since it has the most up-to-date default, nullability,
    // etc.
    if (client_schema_ != tablet_schema_) {
      DCHECK_EQ(mapping.client_to_tablet_idx(client_col_idx),
                client_col_idx) << "key columns should match";
    }
    size_t tablet_col_idx = client_col_idx;

    const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
    if (PREDICT_FALSE(!BitmapTest(client_isset_map, client_col_idx))) {
      // No data to consume for an unset column; the key cell stays unwritten.
      op->SetFailureStatusOnce(Status::InvalidArgument("No value provided for key column",
                                                       col.ToString()));
      continue;
    }

    bool client_set_to_null = client_schema_->has_nullables() &&
        BitmapTest(client_non_null_map, client_col_idx);
    if (PREDICT_FALSE(client_set_to_null)) {
      // The value is skipped in the input, but the key cell stays unwritten.
      op->SetFailureStatusOnce(Status::InvalidArgument("NULL values not allowed for key column",
                                                       col.ToString()));
      RETURN_NOT_OK(ReadColumnAndDiscard(col));
      continue;
    }

    RETURN_NOT_OK(ReadColumn(col, rowkey.mutable_cell_ptr(tablet_col_idx), nullptr));
  }
  op->row_data = rowkey_storage;

  // Now we process the rest of the columns:
  // For UPDATE, we expect at least one other column to be set, indicating the
  // update to perform.
  // For DELETE, we expect no other columns to be set (and we verify that).
  Status row_status;
  if (op->type == RowOperationsPB::UPDATE || op->type == RowOperationsPB::UPDATE_IGNORE) {
    faststring buf;
    RowChangeListEncoder rcl_encoder(&buf);

    // Now process the rest of columns as updates.
    DCHECK_EQ(client_schema_->num_key_columns(), client_col_idx);
    for (; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
      size_t tablet_col_idx = GetTabletColIdx(mapping, client_col_idx);
      const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);

      if (BitmapTest(client_isset_map, client_col_idx)) {
        bool client_set_to_null = client_schema_->has_nullables() &&
          BitmapTest(client_non_null_map, client_col_idx);

        if (col.is_immutable()) {
          // Writing to an immutable column fails a plain UPDATE; for
          // UPDATE_IGNORE the column is skipped and the op is flagged.
          if (op->type == RowOperationsPB::UPDATE) {
            op->SetFailureStatusOnce(
                Status::Immutable("UPDATE not allowed for immutable column", col.ToString()));
          } else {
            DCHECK_EQ(RowOperationsPB::UPDATE_IGNORE, op->type);
            op->error_ignored = true;
          }
          if (!client_set_to_null) {
            RETURN_NOT_OK(ReadColumnAndDiscard(col));
          }
          // Use 'continue' not 'break' to consume the rest row data.
          continue;
        }

        // 'val_to_add' stays nullptr when the client set a nullable column
        // to NULL; otherwise it points at the decoded value in 'scratch'.
        uint8_t scratch[kLargestTypeSize];
        uint8_t* val_to_add = nullptr;
        if (!client_set_to_null) {
          RETURN_NOT_OK(ReadColumn(col, scratch, &row_status));
          if (PREDICT_FALSE(!row_status.ok())) {
            op->SetFailureStatusOnce(row_status);
          }
          val_to_add = scratch;
        } else if (PREDICT_FALSE(!col.is_nullable())) {
          op->SetFailureStatusOnce(Status::InvalidArgument(
              "NULL value not allowed for non-nullable column", col.ToString()));
          RETURN_NOT_OK(ReadColumnAndDiscard(col));
          continue;
        }

        rcl_encoder.AddColumnUpdate(col, tablet_schema_->column_id(tablet_col_idx), val_to_add);
      }
    }

    // No actual column updates specified!
    //
    // The message is only built when no earlier failure was recorded: a
    // missing or NULL key column leaves its cell in 'rowkey' unwritten, and
    // DebugRowKey() would then read uninitialized arena memory. This assumes
    // SetFailureStatusOnce() keeps the first failure (as its name indicates),
    // so skipping the call after an earlier failure does not change the
    // status reported for the row.
    if (PREDICT_FALSE(buf.size() == 0 && op->result.ok())) {
      op->SetFailureStatusOnce(Status::InvalidArgument("No fields updated, key is",
                                                       tablet_schema_->DebugRowKey(rowkey)));
    }

    if (PREDICT_TRUE(op->result.ok())) {
      // Copy the row-changelist to the arena.
      auto* rcl_in_arena = reinterpret_cast<uint8_t*>(
          dst_arena_->AllocateBytesAligned(buf.size(), 8));
      if (PREDICT_FALSE(rcl_in_arena == nullptr)) {
        return Status::RuntimeError("Out of memory allocating RCL");
      }
      memcpy(rcl_in_arena, buf.data(), buf.size());
      op->changelist = RowChangeList(Slice(rcl_in_arena, buf.size()));
    }
  } else if (op->type == RowOperationsPB::DELETE || op->type == RowOperationsPB::DELETE_IGNORE) {
    // Ensure that no other columns are set.
    for (; client_col_idx < client_schema_->num_columns(); client_col_idx++) {
      if (PREDICT_FALSE(BitmapTest(client_isset_map, client_col_idx))) {
        size_t tablet_col_idx = GetTabletColIdx(mapping, client_col_idx);
        const ColumnSchema& col = tablet_schema_->column(tablet_col_idx);
        op->SetFailureStatusOnce(Status::InvalidArgument(
            "DELETE should not have a value for column", col.ToString()));

        // Still consume the offending value so that decoding stays aligned
        // with the input; only a NULL in a nullable column has nothing to skip.
        bool client_set_to_null = client_schema_->has_nullables() &&
            BitmapTest(client_non_null_map, client_col_idx);
        if (!client_set_to_null || !col.is_nullable()) {
          RETURN_NOT_OK(ReadColumnAndDiscard(col));
        }
      }
    }
    if (PREDICT_TRUE(op->result.ok())) {
      op->changelist = RowChangeList::CreateDelete();
    }
  } else {
    LOG(FATAL) << "Should only call this method with UPDATE or DELETE";
  }

  return Status::OK();
}