def run()

in aios/apps/facility/swift/py_tools/swift_tools/topic_cmd.py [0:0]


    def run(self):
        """Create every topic described in the JSON file ``self.fileName``.

        The file content is read through ``self.localFileUtil.cat`` and parsed
        with ``json_wrapper.read`` into a mapping whose values are per-topic
        description dicts.  For each description the recognised keys are
        converted (integers via ``int``, booleans by comparing with the string
        ``'true'``, enum names to the ``TOPIC_MODE_*`` / ``TOPIC_TYPE_*``
        constants) and handed to ``self.adminDelegate.createTopic``.  Keys
        that are absent are passed as ``None``.

        Instance attributes read:
            group: when not ``None`` it overrides the description's
                ``topicGroup``.
            deleteExist: when true, the topic is deleted before creation.
            continueAdd: when false, the first failure stops further
                attempts; every remaining topic goes straight to the
                fail list.
            timeout: when not ``None``, ``self._waitTopicReady`` is called
                after a successful creation.

        Topics that could not be added are collected and passed to
        ``self.addFailTopic``.

        Returns:
            The tuple ``("", "", 0)``, unconditionally; failures are only
            reported on stdout and through ``self.addFailTopic``.

        Raises:
            Whatever ``int()`` raises when a numeric field of a description
            cannot be converted; such errors are not caught here.
        """
        # Keys converted with int().  Order matches the original parsing
        # order so the first conversion error raised is unchanged.
        intKeys = (
            "partitionCount", "resource", "partitionLimit",
            "obsoleteFileTimeInterval", "reservedFileCount",
            "partitionMinBufferSize", "partitionMaxBufferSize",
            "maxWaitTimeForSecurityCommit", "maxDataSizeForSecurityCommit",
            "compressThres", "topicExpiredTime", "rangeCount",
        )
        # Keys that are true only when the stored value is the string 'true'.
        boolKeys = (
            "needFieldFilter", "deleteTopicData", "compressMsg",
            "needSchema", "sealed", "enableTTLDel",
        )
        # Keys forwarded without conversion.
        rawKeys = (
            "dfsRoot", "extendDfsRoot", "topicGroup", "owners",
            "physicTopicLst",
        )

        content = self.localFileUtil.cat(self.fileName)
        topicMap = json_wrapper.read(content)
        timeFormat = '%Y-%m-%d %X'

        def timestamp():
            # Local wall-clock time used as prefix of the progress messages.
            return time.strftime(timeFormat, time.localtime())

        # NOTE(review): topicName is deliberately NOT reset per entry (same as
        # before): an entry without a "topicName" key reuses the name of the
        # previously processed entry.  Confirm whether that is intended.
        topicName = None
        failTopicList = {}
        hasError = False

        for topicDescription in topicMap.values():
            if "topicName" in topicDescription:
                topicName = topicDescription["topicName"]

            fields = {}
            for key in intKeys:
                if key in topicDescription:
                    fields[key] = int(topicDescription[key])
                else:
                    fields[key] = None
            for key in boolKeys:
                if key in topicDescription:
                    fields[key] = (topicDescription[key] == 'true')
                else:
                    fields[key] = None
            for key in rawKeys:
                if key in topicDescription:
                    fields[key] = topicDescription[key]
                else:
                    fields[key] = None

            # Unknown mode names leave topicMode as None.
            topicMode = None
            if "topicMode" in topicDescription:
                modeName = topicDescription["topicMode"]
                if modeName == 'TOPIC_MODE_NORMAL':
                    topicMode = TOPIC_MODE_NORMAL
                elif modeName == 'TOPIC_MODE_SECURITY':
                    topicMode = TOPIC_MODE_SECURITY
                elif modeName == 'TOPIC_MODE_MEMORY_PREFER':
                    topicMode = TOPIC_MODE_MEMORY_PREFER
                elif modeName == 'TOPIC_MODE_MEMORY_ONLY':
                    topicMode = TOPIC_MODE_MEMORY_ONLY

            # schemaVersions is only honoured when "needSchema" is present.
            schemaVersions = None
            if "needSchema" in topicDescription and "schemaVersions" in topicDescription:
                schemaVersions = topicDescription["schemaVersions"]

            # Compare the stored VALUE with the enum names.  The previous code
            # compared the boolean result of has_key() with the names, which
            # is never equal, so topicType was always left as None.
            # Assumes the TOPIC_TYPE_* constants are in scope the same way the
            # TOPIC_MODE_* constants are -- TODO confirm against the imports.
            topicType = None
            if "topicType" in topicDescription:
                typeName = topicDescription["topicType"]
                if typeName == "TOPIC_TYPE_NORMAL":
                    topicType = TOPIC_TYPE_NORMAL
                elif typeName == "TOPIC_TYPE_PHYSIC":
                    topicType = TOPIC_TYPE_PHYSIC
                elif typeName == "TOPIC_TYPE_LOGIC":
                    topicType = TOPIC_TYPE_LOGIC
                elif typeName == "TOPIC_TYPE_LOGIC_PHYSIC":
                    topicType = TOPIC_TYPE_LOGIC_PHYSIC

            # A group given on the command object wins over the file's value.
            topicGroup = fields["topicGroup"]
            if self.group is not None:
                topicGroup = self.group

            # After the first failure (and without continueAdd) nothing more
            # is attempted; remaining topics are only recorded as failed.
            if hasError and not self.continueAdd:
                failTopicList[topicName] = topicDescription
                continue

            if self.deleteExist:
                ret, _, errorMsg = self.adminDelegate.deleteTopic(topicName)
                if not ret:
                    # A topic that does not exist yet is not a delete failure.
                    if errorMsg.find('ERROR_ADMIN_TOPIC_NOT_EXISTED') == -1:
                        print("[%s] Delete topic %s failed! error msg [%s]" % (timestamp(), topicName, errorMsg))
                        # NOTE(review): unlike the other failure paths this
                        # topic is not added to failTopicList -- kept as in
                        # the original; confirm whether that is intended.
                        hasError = True
                        continue
                else:
                    print("[%s] Delete topic %s success!" % (timestamp(), topicName))
                    # Presumably lets the admin finish the deletion before
                    # re-creating the topic -- confirm.
                    time.sleep(2)

            ret, response, errorMsg = self.adminDelegate.createTopic(
                topicName=topicName,
                partCnt=fields["partitionCount"],
                rangeCnt=fields["rangeCount"],
                partMinBufSize=fields["partitionMinBufferSize"],
                partMaxBufSize=fields["partitionMaxBufferSize"],
                partResource=fields["resource"],
                partLimit=fields["partitionLimit"],
                topicMode=topicMode,
                needFieldFilter=fields["needFieldFilter"],
                obsoleteFileTimeInterval=fields["obsoleteFileTimeInterval"],
                reservedFileCount=fields["reservedFileCount"],
                deleteTopicData=fields["deleteTopicData"],
                securityCommitTime=fields["maxWaitTimeForSecurityCommit"],
                securityCommitData=fields["maxDataSizeForSecurityCommit"],
                compressMsg=fields["compressMsg"],
                compressThres=fields["compressThres"],
                dfsRoot=fields["dfsRoot"],
                topicGroup=topicGroup,
                extendDfsRoot=fields["extendDfsRoot"],
                expiredTime=fields["topicExpiredTime"],
                owners=fields["owners"],
                needSchema=fields["needSchema"],
                schemaVersions=schemaVersions,
                sealed=fields["sealed"],
                topicType=topicType,
                physicTopicLst=fields["physicTopicLst"],
                enableTTLDel=fields["enableTTLDel"])
            if not ret:
                print("[%s] Add topic [%s] Failed, error msg [%s]" % (timestamp(), topicName, errorMsg))
                failTopicList[topicName] = topicDescription
                hasError = True
                continue

            if self.timeout is not None:
                print("wait topic ready.")
                if not self._waitTopicReady(self.timeout, topicName):
                    print("wait topic %s ready failed" % topicName)
                    failTopicList[topicName] = topicDescription
                    hasError = True
                    continue
            print("[%s] Add topic [%s] success!" % (timestamp(), topicName))

        if len(failTopicList) > 0:
            self.addFailTopic(failTopicList)
        return "", "", 0