in aios/apps/facility/swift/py_tools/swift_tools/topic_cmd.py [0:0]
def run(self):
content = self.localFileUtil.cat(self.fileName)
topicMap = json_wrapper.read(content)
topicName = None
failTopicList = {}
hasError = False
timeFormat = '%Y-%m-%d %X'
for topic, topicDescription in topicMap.items():
partitionCount = None
resource = None
partitionLimit = None
topicMode = None
needFieldFilter = None
obsoleteFileTimeInterval = None
reservedFileCount = None
deleteTopicData = None
partitionMinBufferSize = None
partitionMaxBufferSize = None
maxWaitTimeForSecurityCommit = None
maxDataSizeForSecurityCommit = None
compressMsg = None
compressThres = None
dfsRoot = None
topicGroup = None
extendDfsRoot = None
topicExpiredTime = None
rangeCnt = None
owners = None
needSchema = None
schemaVersions = None
sealed = None
topicType = None
physicTopicLst = None
enableTTLDel = None
if topicDescription.has_key("topicName"):
topicName = topicDescription["topicName"]
if topicDescription.has_key("partitionCount"):
partitionCount = int(topicDescription["partitionCount"])
if topicDescription.has_key("resource"):
resource = int(topicDescription["resource"])
if topicDescription.has_key("partitionLimit"):
partitionLimit = int(topicDescription["partitionLimit"])
if topicDescription.has_key("topicMode"):
if topicDescription["topicMode"] == 'TOPIC_MODE_NORMAL':
topicMode = TOPIC_MODE_NORMAL
elif topicDescription["topicMode"] == 'TOPIC_MODE_SECURITY':
topicMode = TOPIC_MODE_SECURITY
elif topicDescription["topicMode"] == 'TOPIC_MODE_MEMORY_PREFER':
topicMode = TOPIC_MODE_MEMORY_PREFER
elif topicDescription["topicMode"] == 'TOPIC_MODE_MEMORY_ONLY':
topicMode = TOPIC_MODE_MEMORY_ONLY
if topicDescription.has_key("needFieldFilter"):
needFieldFilter = topicDescription["needFieldFilter"] == 'true'
if topicDescription.has_key("obsoleteFileTimeInterval"):
obsoleteFileTimeInterval = int(topicDescription["obsoleteFileTimeInterval"])
if topicDescription.has_key("reservedFileCount"):
reservedFileCount = int(topicDescription["reservedFileCount"])
if topicDescription.has_key("deleteTopicData"):
deleteTopicData = topicDescription["deleteTopicData"] == 'true'
if topicDescription.has_key("partitionMinBufferSize"):
partitionMinBufferSize = int(topicDescription["partitionMinBufferSize"])
if topicDescription.has_key("partitionMaxBufferSize"):
partitionMaxBufferSize = int(topicDescription["partitionMaxBufferSize"])
if topicDescription.has_key("maxWaitTimeForSecurityCommit"):
maxWaitTimeForSecurityCommit = int(topicDescription["maxWaitTimeForSecurityCommit"])
if topicDescription.has_key("maxDataSizeForSecurityCommit"):
maxDataSizeForSecurityCommit = int(topicDescription["maxDataSizeForSecurityCommit"])
if topicDescription.has_key("compressMsg"):
compressMsg = topicDescription["compressMsg"] == 'true'
if topicDescription.has_key("compressThres"):
compressThres = int(topicDescription["compressThres"])
if topicDescription.has_key("dfsRoot"):
dfsRoot = topicDescription["dfsRoot"]
if topicDescription.has_key("extendDfsRoot"):
extendDfsRoot = topicDescription["extendDfsRoot"]
if topicDescription.has_key("topicGroup"):
topicGroup = topicDescription["topicGroup"]
if topicDescription.has_key("topicExpiredTime"):
topicExpiredTime = int(topicDescription["topicExpiredTime"])
if topicDescription.has_key("rangeCount"):
rangeCnt = int(topicDescription["rangeCount"])
if topicDescription.has_key("owners"):
owners = topicDescription["owners"]
if topicDescription.has_key("needSchema"):
needSchema = topicDescription["needSchema"] == 'true'
if topicDescription.has_key("schemaVersions"):
schemaVersions = topicDescription["schemaVersions"]
if topicDescription.has_key("sealed"):
sealed = topicDescription["sealed"] == 'true'
if topicDescription.has_key("topicType"):
if topicDescription.has_key("topicType") == "TOPIC_TYPE_NORMAL":
topicType = TOPIC_TYPE_NORMAL
elif topicDescription.has_key("topicType") == "TOPIC_TYPE_PHYSIC":
topicType = TOPIC_TYPE_PHYSIC
elif topicDescription.has_key("topicType") == "TOPIC_TYPE_LOGIC":
topicType = TOPIC_TYPE_LOGIC
elif topicDescription.has_key("topicType") == "TOPIC_TYPE_LOGIC_PHYSIC":
topicType = TOPIC_TYPE_LOGIC_PHYSIC
if topicDescription.has_key("physicTopicLst"):
physicTopicLst = topicDescription["physicTopicLst"]
if topicDescription.has_key("enableTTLDel"):
enableTTLDel = topicDescription["enableTTLDel"] == 'true'
if self.group is not None:
topicGroup = self.group
if hasError and not self.continueAdd:
failTopicList[topicName] = topicDescription
continue
if self.deleteExist:
ret, reponse, errorMsg = self.adminDelegate.deleteTopic(topicName)
if not ret:
if errorMsg.find('ERROR_ADMIN_TOPIC_NOT_EXISTED') == -1:
print "[%s] Delete topic %s failed! error msg [%s]" % (time.strftime(timeFormat, time.localtime()), topicName, errorMsg)
hasError = True
continue
else:
print "[%s] Delete topic %s success!" % (time.strftime(timeFormat, time.localtime()), topicName)
time.sleep(2)
ret, response, errorMsg = self.adminDelegate.createTopic(
topicName=topicName, partCnt=partitionCount, rangeCnt=rangeCnt,
partMinBufSize=partitionMinBufferSize, partMaxBufSize=partitionMaxBufferSize,
partResource=resource, partLimit=partitionLimit,
topicMode=topicMode, needFieldFilter=needFieldFilter,
obsoleteFileTimeInterval=obsoleteFileTimeInterval,
reservedFileCount=reservedFileCount,
deleteTopicData=deleteTopicData,
securityCommitTime=maxWaitTimeForSecurityCommit,
securityCommitData=maxDataSizeForSecurityCommit,
compressMsg=compressMsg,
compressThres=compressThres,
dfsRoot=dfsRoot, topicGroup=topicGroup, extendDfsRoot=extendDfsRoot,
expiredTime=topicExpiredTime, owners=owners, needSchema=needSchema,
schemaVersions=schemaVersions, sealed=sealed, topicType=topicType,
physicTopicLst=physicTopicLst, enableTTLDel=enableTTLDel)
if not ret:
print "[%s] Add topic [%s] Failed, error msg [%s]" % (time.strftime(timeFormat, time.localtime()), topicName, errorMsg)
failTopicList[topicName] = topicDescription
hasError = True
continue
if self.timeout is not None:
print "wait topic ready."
if not self._waitTopicReady(self.timeout, topicName):
print "wait topic %s ready failed" % topicName
failTopicList[topicName] = topicDescription
hasError = True
continue
print "[%s] Add topic [%s] success!" % (time.strftime(timeFormat, time.localtime()), topicName)
if len(failTopicList) > 0:
self.addFailTopic(failTopicList)
return "", "", 0