CoTryTask UpdateChainOperation::handle()

in src/mgmtd/ops/UpdateChainOperation.cc [8:121]


CoTryTask<UpdateChainRsp> UpdateChainOperation::handle(MgmtdState &state) {
  CO_RETURN_ON_ERROR(state.validateClusterId(*this, req.clusterId));
  CO_RETURN_ON_ERROR(co_await state.validateAdmin(*this, req.user));

  if (req.chainId == 0) {
    CO_RETURN_AND_LOG_OP_ERR(*this, StatusCode::kInvalidArg, "Empty chain id");
  }

  if (req.targetId == 0) {
    CO_RETURN_AND_LOG_OP_ERR(*this, StatusCode::kInvalidArg, "Empty target id");
  }

  if (req.mode != UpdateChainReq::Mode::ADD && req.mode != UpdateChainReq::Mode::REMOVE) {
    CO_RETURN_AND_LOG_OP_ERR(*this,
                             StatusCode::kInvalidArg,
                             "Unsupported mode: {}({})",
                             hf3fs::toStringView(req.mode),
                             static_cast<int>(req.mode));
  }
  bool addTarget = req.mode == UpdateChainReq::Mode::ADD;

  auto handler = [&]() -> CoTryTask<UpdateChainRsp> {
    auto writerLock = co_await state.coScopedLock<"UpdateChain">();

    flat::ChainInfo chainInfo;
    {
      auto dataPtr = co_await state.data_.coLock();
      auto &ri = dataPtr->routingInfo;
      auto it = ri.chains.find(req.chainId);
      if (it == ri.chains.end()) {
        CO_RETURN_AND_LOG_OP_ERR(*this, MgmtdCode::kChainNotFound, "chain: {}", req.chainId.toUnderType());
      }
      auto tit = ri.getTargets().find(req.targetId);
      if (req.mode == UpdateChainReq::Mode::ADD && tit != ri.getTargets().end()) {
        CO_RETURN_AND_LOG_OP_ERR(*this, MgmtdCode::kTargetExisted, "target: {}", req.targetId.toUnderType());
      }
      chainInfo = it->second;
    }

    if (addTarget) {
      // already checked req.targetId is not present in any chain,
      // directly append it into this chain
      flat::ChainTargetInfo cti;
      cti.targetId = req.targetId;
      cti.publicState = flat::PublicTargetState::OFFLINE;

      chainInfo.chainVersion = nextVersion(chainInfo.chainVersion);
      chainInfo.targets.push_back(cti);
      if (chainInfo.preferredTargetOrder.size() == chainInfo.targets.size() - 1) {
        chainInfo.preferredTargetOrder.push_back(req.targetId);
      }
    } else {
      XLOGF_IF(DFATAL, req.mode != UpdateChainReq::Mode::REMOVE, "Invalid mode: {}", static_cast<int>(req.mode));
      auto tit = std::find_if(chainInfo.targets.begin(), chainInfo.targets.end(), [&](const auto &cti) {
        return cti.targetId == req.targetId;
      });
      if (tit == chainInfo.targets.end()) {
        CO_RETURN_AND_LOG_OP_ERR(*this,
                                 MgmtdCode::kTargetNotFound,
                                 "target {} is not found in chain {}",
                                 req.targetId.toUnderType(),
                                 req.chainId.toUnderType());
      }
      if (tit->publicState != flat::PublicTargetState::OFFLINE) {
        CO_RETURN_AND_LOG_OP_ERR(*this,
                                 StatusCode::kInvalidArg,
                                 "Do not allow to remove target {} from chain {}: state is {}",
                                 req.targetId.toUnderType(),
                                 req.chainId.toUnderType(),
                                 hf3fs::toStringView(tit->publicState));
      }

      chainInfo.chainVersion = nextVersion(chainInfo.chainVersion);
      chainInfo.targets.erase(tit);
      if (auto it =
              std::find(chainInfo.preferredTargetOrder.begin(), chainInfo.preferredTargetOrder.end(), req.targetId);
          it != chainInfo.preferredTargetOrder.end()) {
        chainInfo.preferredTargetOrder.erase(it);
      }
    }

    auto commitRes =
        co_await updateStoredRoutingInfo(state, *this, [&](kv::IReadWriteTransaction &txn) -> CoTryTask<void> {
          if (!addTarget) {
            CO_RETURN_ON_ERROR(co_await state.store_.clearTargetInfo(txn, req.targetId));
          }
          co_return co_await state.store_.storeChainInfo(txn, chainInfo);
        });
    CO_RETURN_ON_ERROR(commitRes);

    co_await updateMemoryRoutingInfo(state, *this, [&](RoutingInfo &ri) {
      auto &oldChain = ri.getChain(req.chainId);
      LOG_OP_INFO(*this,
                  "{} change from {} to {}",
                  chainInfo.chainId,
                  serde::toJsonString(oldChain),
                  serde::toJsonString(chainInfo));
      oldChain = chainInfo;
      if (addTarget) {
        XLOGF_IF(DFATAL,
                 req.targetId != chainInfo.targets.back().targetId,
                 "targetId mismatch: {} and {}",
                 req.targetId.toUnderType(),
                 chainInfo.targets.back().targetId.toUnderType());
        ri.insertNewTarget(req.chainId, chainInfo.targets.back());
      } else {
        ri.removeTarget(req.chainId, req.targetId);
      }
    });

    co_return UpdateChainRsp::create(std::move(chainInfo));
  };
  co_return co_await doAsPrimary(state, std::move(handler));
}