in aios/apps/facility/swift/py_tools/swift_tools/swift_admin_delegate.py [0:0]
def createTopicBatch(self, topicNames, partCnt, rangeCnt=None,
                     partMinBufSize=None, partMaxBufSize=None,
                     partResource=None, partLimit=None, topicMode=None,
                     needFieldFilter=None, obsoleteFileTimeInterval=None,
                     reservedFileCount=None, partFileBufSize=None,
                     deleteTopicData=None, securityCommitTime=None,
                     securityCommitData=None, compressMsg=None, compressThres=None,
                     dfsRoot=None, topicGroup=None, extendDfsRoot=None, expiredTime=None,
                     owners=None, needSchema=None, sealed=None, topicType=None,
                     physicTopicLst=None, enableTTLDel=None, permitUsers=None, readNotCommmitMsg=None):
    """Build a TopicBatchCreationRequest and submit it in one call.

    One topic request is added per name in ``topicNames`` (names are
    separated by ';'). Every other per-topic option is resolved for each
    topic with ``getIndexVal(value, index, topicNum)``; presumably that
    helper accepts either one shared value or one value per topic --
    confirm against its definition. Options left as ``None`` are not set
    on the request.

    Args:
        topicNames: ';'-separated topic names.
        partCnt: partition count(s), converted with ``int``.
        rangeCnt, partMinBufSize, partMaxBufSize, partResource, partLimit,
        obsoleteFileTimeInterval, reservedFileCount, partFileBufSize,
        securityCommitTime, securityCommitData, compressThres,
        expiredTime: optional numeric options, converted with ``int``.
        topicMode: one of "normal_mode", "security_mode",
            "memory_only_mode", "memory_prefer_mode" (case-sensitive);
            any other value leaves the field unset.
        topicType: one of "topic_type_normal", "topic_type_physic",
            "topic_type_logic", "topic_type_logic_physic"
            (case-insensitive); any other value leaves the field unset.
        needFieldFilter, needSchema, sealed, deleteTopicData: textual
            flags; only "true" (case-insensitive) sets the field.
        enableTTLDel: textual flag; "true"/"false" (case-insensitive)
            set the field, anything else leaves it unset.
        compressMsg: textual flag; only "false" (case-insensitive) sets
            the field (to False).
        dfsRoot, topicGroup: copied as resolved.
        extendDfsRoot, owners, permitUsers: ','-separated lists.
        physicTopicLst: accepted but not used by this method.
        readNotCommmitMsg: assigned unchanged to every topic request.

    Returns:
        Whatever ``self._run`` returns for the batch creation method.

    Raises:
        Exception: if both needFieldFilter and needSchema resolve to
            "true" for the same topic.
    """
    batchRequest = swift_proto_admin.TopicBatchCreationRequest()
    topicLst = topicNames.split(';')
    topicNum = len(topicLst)

    def valueAt(raw, idx):
        # Resolve a per-topic option for the idx-th topic.
        return getIndexVal(raw, idx, topicNum)

    def intAt(raw, idx):
        # Resolve a per-topic option and convert it to an integer.
        return int(valueAt(raw, idx))

    def flagIs(raw, idx, expected):
        # Case-insensitive match of a textual flag against `expected`.
        return expected == valueAt(raw, idx).lower()

    # Name -> enum tables are built once, and only when the option is given.
    topicModeByName = None
    if topicMode is not None:
        topicModeByName = {
            "normal_mode": TOPIC_MODE_NORMAL,
            "security_mode": TOPIC_MODE_SECURITY,
            "memory_only_mode": TOPIC_MODE_MEMORY_ONLY,
            "memory_prefer_mode": TOPIC_MODE_MEMORY_PREFER,
        }
    topicTypeByName = None
    if topicType is not None:
        topicTypeByName = {
            'topic_type_normal': TOPIC_TYPE_NORMAL,
            'topic_type_physic': TOPIC_TYPE_PHYSIC,
            'topic_type_logic': TOPIC_TYPE_LOGIC,
            'topic_type_logic_physic': TOPIC_TYPE_LOGIC_PHYSIC,
        }

    for index, topicName in enumerate(topicLst):
        request = batchRequest.topicRequests.add()
        request.topicName = topicName
        request.partitionCount = intAt(partCnt, index)
        if rangeCnt is not None:
            request.rangeCountInPartition = intAt(rangeCnt, index)
        if partMinBufSize is not None:
            request.partitionMinBufferSize = intAt(partMinBufSize, index)
        if partMaxBufSize is not None:
            request.partitionMaxBufferSize = intAt(partMaxBufSize, index)
        if partResource is not None:
            request.resource = intAt(partResource, index)
        if partLimit is not None:
            request.partitionLimit = intAt(partLimit, index)
        if topicMode is not None:
            # Mode names are matched case-sensitively; unknown names are
            # ignored so the field keeps its default.
            mode = valueAt(topicMode, index)
            if mode in topicModeByName:
                request.topicMode = topicModeByName[mode]
        if needFieldFilter is not None and flagIs(needFieldFilter, index, 'true'):
            request.needFieldFilter = True
        if needSchema is not None and flagIs(needSchema, index, 'true'):
            # Must run after needFieldFilter has been applied above.
            if request.needFieldFilter:
                raise Exception('cannot set both fieldFilter and needSchema')
            request.needSchema = True
        if sealed is not None and flagIs(sealed, index, 'true'):
            request.sealed = True
        if topicType is not None:
            tpType = valueAt(topicType, index).lower()
            if tpType in topicTypeByName:
                request.topicType = topicTypeByName[tpType]
        if enableTTLDel is not None:
            enable = valueAt(enableTTLDel, index).lower()
            if enable in ('true', 'false'):
                request.enableTTLDel = (enable == 'true')
        if obsoleteFileTimeInterval is not None:
            request.obsoleteFileTimeInterval = intAt(obsoleteFileTimeInterval, index)
        if reservedFileCount is not None:
            request.reservedFileCount = intAt(reservedFileCount, index)
        if partFileBufSize is not None:
            request.partitionFileBufferSize = intAt(partFileBufSize, index)
        if deleteTopicData is not None and flagIs(deleteTopicData, index, 'true'):
            request.deleteTopicData = True
        # The two security-commit limits are converted like every other
        # numeric option; the resolved value is text, which an integer
        # message field would reject.
        # NOTE(review): assumes both fields are integer-typed -- confirm
        # against the TopicCreationRequest proto definition.
        if securityCommitTime is not None:
            request.maxWaitTimeForSecurityCommit = intAt(securityCommitTime, index)
        if securityCommitData is not None:
            request.maxDataSizeForSecurityCommit = intAt(securityCommitData, index)
        if compressMsg is not None and flagIs(compressMsg, index, 'false'):
            # Only an explicit "false" is written.
            request.compressMsg = False
        if compressThres is not None:
            request.compressThres = intAt(compressThres, index)
        if dfsRoot is not None:
            request.dfsRoot = valueAt(dfsRoot, index)
        if topicGroup is not None:
            request.topicGroup = valueAt(topicGroup, index)
        if extendDfsRoot is not None:
            request.extendDfsRoot.extend(valueAt(extendDfsRoot, index).split(','))
        if expiredTime is not None:
            request.topicExpiredTime = intAt(expiredTime, index)
        if owners is not None:
            request.owners.extend(valueAt(owners, index).split(','))
        # Truthiness check (not `is not None`): an empty permitUsers value
        # is skipped rather than split into [''].
        if permitUsers:
            request.permitUser.extend(valueAt(permitUsers, index).split(","))
        if readNotCommmitMsg is not None:
            # Applied as-is to every topic; not resolved per topic.
            request.readNotCommittedMsg = readNotCommmitMsg
    return self._run(swift_common_define.SWIFT_METHOD_CREATE_TOPIC_BATCH, batchRequest)