void SysController::modifyTopic()

in aios/apps/facility/swift/admin/SysController.cpp [1539:1712]


void SysController::modifyTopic(const TopicCreationRequest *request, TopicCreationResponse *response) {
    CHECK_IS_LEADER(response);
    const Result<bool> &result = AdminRequestChecker::checkTopicCreationRequest(request, _adminConfig, false);
    if (result.is_err()) {
        handleError(response, ERROR_ADMIN_INVALID_PARAMETER, result.get_error().message());
        return;
    }
    ScopedLock lock(_lock);
    const string &topicName = request->topicname();
    TopicInfoPtr topicInfoPtr = _topicTable.findTopic(topicName);
    if (topicInfoPtr == NULL) {
        handleError(response, ERROR_ADMIN_TOPIC_NOT_EXISTED, topicName);
        return;
    }
    TopicCreationRequest newTopicMeta = topicInfoPtr->getTopicMeta();
    if (!canSealedTopicModify(newTopicMeta, request)) {
        handleError(response, ERROR_ADMIN_SEALED_TOPIC_CANNOT_MODIFY, request->ShortDebugString());
        return;
    }
    if (TOPIC_TYPE_LOGIC == newTopicMeta.topictype() || TOPIC_TYPE_LOGIC_PHYSIC == newTopicMeta.topictype()) {
        if (request->has_sealed() && request->sealed() != newTopicMeta.sealed()) {
            const string &errMsg = StringUtil::formatString("cannot seal logic topic, "
                                                            "request[%s]",
                                                            request->ShortDebugString().c_str());
            handleError(response, ERROR_ADMIN_INVALID_PARAMETER, errMsg);
            return;
        }
    }
    bool changed = false;
    bool needChangeVersion = false;
    TopicCreationRequest tmpRequest; // get default value
#define SET_TOPIC_CREATION_REQUEST(item)                                                                               \
    if (request->has_##item() && request->item() != newTopicMeta.item()) {                                             \
        if (!tmpRequest.has_##item() || tmpRequest.item() != request->item()) {                                        \
            newTopicMeta.set_##item(request->item());                                                                  \
            changed = true;                                                                                            \
            needChangeVersion = true;                                                                                  \
        }                                                                                                              \
    }
    SET_TOPIC_CREATION_REQUEST(resource);
    SET_TOPIC_CREATION_REQUEST(partitionlimit);
    SET_TOPIC_CREATION_REQUEST(topicmode);
    SET_TOPIC_CREATION_REQUEST(obsoletefiletimeinterval);
    SET_TOPIC_CREATION_REQUEST(reservedfilecount);
    SET_TOPIC_CREATION_REQUEST(partitionminbuffersize);
    SET_TOPIC_CREATION_REQUEST(partitionmaxbuffersize);
    SET_TOPIC_CREATION_REQUEST(maxwaittimeforsecuritycommit);
    SET_TOPIC_CREATION_REQUEST(maxdatasizeforsecuritycommit);
    SET_TOPIC_CREATION_REQUEST(compressmsg);
    SET_TOPIC_CREATION_REQUEST(compressthres);
    SET_TOPIC_CREATION_REQUEST(dfsroot);
    SET_TOPIC_CREATION_REQUEST(topicgroup);
    SET_TOPIC_CREATION_REQUEST(topicexpiredtime);
    SET_TOPIC_CREATION_REQUEST(rangecountinpartition);
    SET_TOPIC_CREATION_REQUEST(sealed);
    SET_TOPIC_CREATION_REQUEST(topictype);
    SET_TOPIC_CREATION_REQUEST(enablettldel);
    SET_TOPIC_CREATION_REQUEST(readsizelimitsec);
    SET_TOPIC_CREATION_REQUEST(enablelongpolling);
    SET_TOPIC_CREATION_REQUEST(enablemergedata);
    SET_TOPIC_CREATION_REQUEST(versioncontrol);
    SET_TOPIC_CREATION_REQUEST(readnotcommittedmsg);
#undef SET_TOPIC_CREATION_REQUEST
#define REPEATED_FIELD_CHANGE(field, changeVersion)                                                                    \
    if (0 != request->field##_size()) {                                                                                \
        bool rptFldChg = false;                                                                                        \
        if (request->field##_size() != newTopicMeta.field##_size()) {                                                  \
            rptFldChg = true;                                                                                          \
        } else {                                                                                                       \
            for (int i = 0; i < request->field##_size(); ++i) {                                                        \
                if (request->field(i) != newTopicMeta.field(i)) {                                                      \
                    rptFldChg = true;                                                                                  \
                    break;                                                                                             \
                }                                                                                                      \
            }                                                                                                          \
        }                                                                                                              \
        if (rptFldChg) {                                                                                               \
            newTopicMeta.mutable_##field()->Clear();                                                                   \
            bool clearAll = false;                                                                                     \
            for (int idx = 0; idx < request->field##_size(); ++idx) {                                                  \
                const string &val = request->field(idx);                                                               \
                if (!val.empty()) {                                                                                    \
                    *newTopicMeta.add_##field() = val;                                                                 \
                    if (val == "*") {                                                                                  \
                        clearAll = true;                                                                               \
                        break;                                                                                         \
                    }                                                                                                  \
                }                                                                                                      \
            }                                                                                                          \
            if (clearAll) {                                                                                            \
                newTopicMeta.mutable_##field()->Clear();                                                               \
            }                                                                                                          \
            changed = true;                                                                                            \
            if (changeVersion) {                                                                                       \
                needChangeVersion = changeVersion;                                                                     \
            }                                                                                                          \
        }                                                                                                              \
    }
    REPEATED_FIELD_CHANGE(extenddfsroot, true);
    REPEATED_FIELD_CHANGE(owners, false);
#undef REPEATED_FIELD_CHANGE
    uint32_t curPartCnt = topicInfoPtr->getPartitionCount();
    if (request->has_partitioncount()) {
        if (TOPIC_TYPE_LOGIC_PHYSIC == topicInfoPtr->getTopicType() && topicInfoPtr->physicTopicLstSize() > 0) {
            const string &lastPhysic = topicInfoPtr->getLastPhysicTopicName();
            uint32_t lastPartCnt = curPartCnt;
            if (LogicTopicHelper::getPhysicPartCnt(lastPhysic, lastPartCnt)) {
                curPartCnt = lastPartCnt;
                if (request->partitioncount() != lastPartCnt) {
                    newTopicMeta.set_partitioncount(request->partitioncount());
                    needChangeVersion = true;
                    changed = true;
                }
            }
        } else {
            if (request->partitioncount() != newTopicMeta.partitioncount()) {
                newTopicMeta.set_partitioncount(request->partitioncount());
                changed = true;
                needChangeVersion = true;
            }
        }
    }
    if (changed) {
        if (needChangeVersion) {
            newTopicMeta.set_modifytime(TimeUtility::currentTime());
        }
        if (TOPIC_TYPE_NORMAL != newTopicMeta.topictype() || TOPIC_TYPE_NORMAL != topicInfoPtr->getTopicType()) {
            const TopicCreationRequest &curMeta = topicInfoPtr->getTopicMeta();
            if (!AdminRequestChecker::checkLogicTopicModify(newTopicMeta, curMeta)) {
                const string &errMsg =
                    StringUtil::formatString("logic topic[%s] "
                                             "modify param invalid %s",
                                             curMeta.topicname().c_str(),
                                             ProtoUtil::plainDiffStr(&curMeta, &newTopicMeta).c_str());
                handleError(response, ERROR_ADMIN_INVALID_PARAMETER, errMsg);
                return;
            }
            if (curPartCnt != newTopicMeta.partitioncount()) {
                ErrorCode ec = _zkDataAccessor->sendChangePartCntTask(newTopicMeta);
                if (ERROR_NONE != ec) {
                    const string &errMsg = StringUtil::formatString("send logic topic"
                                                                    "modify partition task failed[%s], part[%d -> %d]",
                                                                    curMeta.ShortDebugString().c_str(),
                                                                    curPartCnt,
                                                                    newTopicMeta.partitioncount());
                    handleError(response, ec, errMsg);
                }
                AUTIL_LOG(INFO,
                          "send modify partition task[%s], part[%d -> %d]",
                          curMeta.ShortDebugString().c_str(),
                          curPartCnt,
                          newTopicMeta.partitioncount());
                SET_OK(response);
                return;
            } else { // L & LP not change current part count
                newTopicMeta.set_partitioncount(topicInfoPtr->getPartitionCount());
            }
        }
        if (!_zkDataAccessor->modifyTopic(newTopicMeta)) {
            handleError(response, ERROR_ADMIN_OPERATION_FAILED, "set topic meta failed!");
        } else {
            if (newTopicMeta.partitioncount() == topicInfoPtr->getPartitionCount()) {
                topicInfoPtr->setTopicMeta(newTopicMeta);
            } else {
                _topicTable.delTopic(topicName);
                _topicTable.addTopic(&newTopicMeta);
            }
            SET_OK(response);
        }
    } else {
        AUTIL_LOG(INFO, "topic[%s] params not change, do nothing", request->topicname().c_str());
        SET_OK(response);
    }
}