in aios/apps/facility/swift/admin/SysController.cpp [1539:1712]
/**
 * Applies the changes carried by `request` to an already existing topic.
 *
 * Flow, as implemented below:
 *  1. Validate the request with AdminRequestChecker::checkTopicCreationRequest.
 *  2. While holding `_lock`, look the topic up in `_topicTable`; reply
 *     ERROR_ADMIN_TOPIC_NOT_EXISTED when it is absent.
 *  3. Reject the request when canSealedTopicModify() refuses it, or when it tries
 *     to change `sealed` on a LOGIC / LOGIC_PHYSIC topic.
 *  4. Start from a copy of the current topic meta and overlay every scalar and
 *     repeated field that the request sets to a different value.
 *  5. Handle `partitioncount` separately (LOGIC_PHYSIC topics compare against the
 *     partition count parsed from the last physic topic name).
 *  6. If nothing changed, reply OK. Otherwise, when either the old or the new topic
 *     type is not NORMAL, validate with checkLogicTopicModify(); a partition-count
 *     change on that path is submitted as a task through
 *     `_zkDataAccessor->sendChangePartCntTask` instead of being written directly.
 *  7. Persist the new meta with `_zkDataAccessor->modifyTopic` and refresh
 *     `_topicTable`.
 *
 * @param request  modification request; only fields that are set (has_xxx / non-empty
 *                 repeated fields) are considered.
 * @param response receives the error reported through handleError(), or OK via SET_OK.
 */
void SysController::modifyTopic(const TopicCreationRequest *request, TopicCreationResponse *response) {
    // NOTE(review): CHECK_IS_LEADER is a macro defined elsewhere; presumably it fills
    // `response` and returns when this admin is not the leader -- confirm.
    CHECK_IS_LEADER(response);
    const Result<bool> &result = AdminRequestChecker::checkTopicCreationRequest(request, _adminConfig, false);
    if (result.is_err()) {
        handleError(response, ERROR_ADMIN_INVALID_PARAMETER, result.get_error().message());
        return;
    }
    ScopedLock lock(_lock);
    const string &topicName = request->topicname();
    TopicInfoPtr topicInfoPtr = _topicTable.findTopic(topicName);
    if (topicInfoPtr == NULL) {
        handleError(response, ERROR_ADMIN_TOPIC_NOT_EXISTED, topicName);
        return;
    }
    // Work on a copy of the current meta; the stored meta is only replaced after
    // the new one has been persisted successfully.
    TopicCreationRequest newTopicMeta = topicInfoPtr->getTopicMeta();
    if (!canSealedTopicModify(newTopicMeta, request)) {
        handleError(response, ERROR_ADMIN_SEALED_TOPIC_CANNOT_MODIFY, request->ShortDebugString());
        return;
    }
    if (TOPIC_TYPE_LOGIC == newTopicMeta.topictype() || TOPIC_TYPE_LOGIC_PHYSIC == newTopicMeta.topictype()) {
        if (request->has_sealed() && request->sealed() != newTopicMeta.sealed()) {
            const string &errMsg = StringUtil::formatString("cannot seal logic topic, "
                                                            "request[%s]",
                                                            request->ShortDebugString().c_str());
            handleError(response, ERROR_ADMIN_INVALID_PARAMETER, errMsg);
            return;
        }
    }
    bool changed = false;
    bool needChangeVersion = false;
    TopicCreationRequest tmpRequest; // get default value

    // Scalar fields: copy `item` from the request into newTopicMeta when the request
    // sets it to a value different from the current one. Every such change marks the
    // meta as changed and requests a modify-time bump.
#define SET_TOPIC_CREATION_REQUEST(item) \
    if (request->has_##item() && request->item() != newTopicMeta.item()) { \
        if (!tmpRequest.has_##item() || tmpRequest.item() != request->item()) { \
            newTopicMeta.set_##item(request->item()); \
            changed = true; \
            needChangeVersion = true; \
        } \
    }
    SET_TOPIC_CREATION_REQUEST(resource);
    SET_TOPIC_CREATION_REQUEST(partitionlimit);
    SET_TOPIC_CREATION_REQUEST(topicmode);
    SET_TOPIC_CREATION_REQUEST(obsoletefiletimeinterval);
    SET_TOPIC_CREATION_REQUEST(reservedfilecount);
    SET_TOPIC_CREATION_REQUEST(partitionminbuffersize);
    SET_TOPIC_CREATION_REQUEST(partitionmaxbuffersize);
    SET_TOPIC_CREATION_REQUEST(maxwaittimeforsecuritycommit);
    SET_TOPIC_CREATION_REQUEST(maxdatasizeforsecuritycommit);
    SET_TOPIC_CREATION_REQUEST(compressmsg);
    SET_TOPIC_CREATION_REQUEST(compressthres);
    SET_TOPIC_CREATION_REQUEST(dfsroot);
    SET_TOPIC_CREATION_REQUEST(topicgroup);
    SET_TOPIC_CREATION_REQUEST(topicexpiredtime);
    SET_TOPIC_CREATION_REQUEST(rangecountinpartition);
    SET_TOPIC_CREATION_REQUEST(sealed);
    SET_TOPIC_CREATION_REQUEST(topictype);
    SET_TOPIC_CREATION_REQUEST(enablettldel);
    SET_TOPIC_CREATION_REQUEST(readsizelimitsec);
    SET_TOPIC_CREATION_REQUEST(enablelongpolling);
    SET_TOPIC_CREATION_REQUEST(enablemergedata);
    SET_TOPIC_CREATION_REQUEST(versioncontrol);
    SET_TOPIC_CREATION_REQUEST(readnotcommittedmsg);
#undef SET_TOPIC_CREATION_REQUEST

    // Repeated string fields: an empty list in the request means "leave untouched".
    // Otherwise, if the list differs from the current one (by size or element-wise),
    // the current list is replaced by the request's non-empty entries. Encountering
    // the entry "*" stops the copy and leaves the field cleared. `changeVersion`
    // decides whether such a change also requests a modify-time bump.
#define REPEATED_FIELD_CHANGE(field, changeVersion) \
    if (0 != request->field##_size()) { \
        bool rptFldChg = false; \
        if (request->field##_size() != newTopicMeta.field##_size()) { \
            rptFldChg = true; \
        } else { \
            for (int i = 0; i < request->field##_size(); ++i) { \
                if (request->field(i) != newTopicMeta.field(i)) { \
                    rptFldChg = true; \
                    break; \
                } \
            } \
        } \
        if (rptFldChg) { \
            newTopicMeta.mutable_##field()->Clear(); \
            bool clearAll = false; \
            for (int idx = 0; idx < request->field##_size(); ++idx) { \
                const string &val = request->field(idx); \
                if (!val.empty()) { \
                    *newTopicMeta.add_##field() = val; \
                    if (val == "*") { \
                        clearAll = true; \
                        break; \
                    } \
                } \
            } \
            if (clearAll) { \
                newTopicMeta.mutable_##field()->Clear(); \
            } \
            changed = true; \
            if (changeVersion) { \
                needChangeVersion = changeVersion; \
            } \
        } \
    }
    REPEATED_FIELD_CHANGE(extenddfsroot, true);
    REPEATED_FIELD_CHANGE(owners, false);
#undef REPEATED_FIELD_CHANGE

    // Partition count. For a LOGIC_PHYSIC topic that already has physic topics, the
    // count to compare against is the one parsed from the last physic topic name
    // (when that parse succeeds); otherwise the count stored in the meta is used.
    uint32_t curPartCnt = topicInfoPtr->getPartitionCount();
    if (request->has_partitioncount()) {
        if (TOPIC_TYPE_LOGIC_PHYSIC == topicInfoPtr->getTopicType() && topicInfoPtr->physicTopicLstSize() > 0) {
            const string &lastPhysic = topicInfoPtr->getLastPhysicTopicName();
            uint32_t lastPartCnt = curPartCnt;
            if (LogicTopicHelper::getPhysicPartCnt(lastPhysic, lastPartCnt)) {
                curPartCnt = lastPartCnt;
                if (request->partitioncount() != lastPartCnt) {
                    newTopicMeta.set_partitioncount(request->partitioncount());
                    needChangeVersion = true;
                    changed = true;
                }
            }
        } else {
            if (request->partitioncount() != newTopicMeta.partitioncount()) {
                newTopicMeta.set_partitioncount(request->partitioncount());
                changed = true;
                needChangeVersion = true;
            }
        }
    }

    if (changed) {
        if (needChangeVersion) {
            newTopicMeta.set_modifytime(TimeUtility::currentTime());
        }
        // Taken when either the new or the current topic type is not NORMAL.
        if (TOPIC_TYPE_NORMAL != newTopicMeta.topictype() || TOPIC_TYPE_NORMAL != topicInfoPtr->getTopicType()) {
            const TopicCreationRequest &curMeta = topicInfoPtr->getTopicMeta();
            if (!AdminRequestChecker::checkLogicTopicModify(newTopicMeta, curMeta)) {
                const string &errMsg =
                    StringUtil::formatString("logic topic[%s] "
                                             "modify param invalid %s",
                                             curMeta.topicname().c_str(),
                                             ProtoUtil::plainDiffStr(&curMeta, &newTopicMeta).c_str());
                handleError(response, ERROR_ADMIN_INVALID_PARAMETER, errMsg);
                return;
            }
            if (curPartCnt != newTopicMeta.partitioncount()) {
                // A partition-count change on this path is submitted as a task
                // rather than written to the topic meta directly.
                ErrorCode ec = _zkDataAccessor->sendChangePartCntTask(newTopicMeta);
                if (ERROR_NONE != ec) {
                    const string &errMsg = StringUtil::formatString("send logic topic"
                                                                    "modify partition task failed[%s], part[%d -> %d]",
                                                                    curMeta.ShortDebugString().c_str(),
                                                                    curPartCnt,
                                                                    newTopicMeta.partitioncount());
                    handleError(response, ec, errMsg);
                    // Stop here: without this return the failure would fall through
                    // to the success log and SET_OK(response) below.
                    return;
                }
                AUTIL_LOG(INFO,
                          "send modify partition task[%s], part[%d -> %d]",
                          curMeta.ShortDebugString().c_str(),
                          curPartCnt,
                          newTopicMeta.partitioncount());
                SET_OK(response);
                return;
            } else { // L & LP not change current part count
                newTopicMeta.set_partitioncount(topicInfoPtr->getPartitionCount());
            }
        }
        if (!_zkDataAccessor->modifyTopic(newTopicMeta)) {
            handleError(response, ERROR_ADMIN_OPERATION_FAILED, "set topic meta failed!");
        } else {
            // Same partition count: update the meta in place. Different count:
            // drop the table entry and re-add it from the new meta.
            if (newTopicMeta.partitioncount() == topicInfoPtr->getPartitionCount()) {
                topicInfoPtr->setTopicMeta(newTopicMeta);
            } else {
                _topicTable.delTopic(topicName);
                _topicTable.addTopic(&newTopicMeta);
            }
            SET_OK(response);
        }
    } else {
        AUTIL_LOG(INFO, "topic[%s] params not change, do nothing", request->topicname().c_str());
        SET_OK(response);
    }
}