aios/apps/facility/swift/py_tools/swift_tools/topic_cmd.py (2,715 lines of code) (raw):

import os import base_cmd import swift_common_define import swift_admin_delegate import status_analyzer import swift_util from swift.protocol.Common_pb2 import * from swift.protocol.AdminRequestResponse_pb2 import * import transport_cmd_base import time import swift_broker_delegate import local_file_util import zlib import json_wrapper class TopicAddCmd(base_cmd.BaseCmd): ''' swift {at|addtopic} {-z zookeeper_address | --zookeeper=zookeeper_address } {-t topic_name | --topic=topic_name } {-c partition_count | --partcount=partition_count } {-R range_count | --rangecount=range_count } {-s partition_buf_size | --partbuf=partition_buf_size } {-b partition_file_buf_size | --partfilebuf=partition_file_buf_size } {-y partition_max_buf_size | --partmaxbuf=partition_max_buf_size } {-x partition_min_buf_size | --partminbuf=partition_min_buf_size } {-r partition_resource | --partresource=partition_resource } {-l partition_limit | --partlimit=partition_limit } {-m topic_mode | --topicmode=topic_mode } {-f | --fieldfilter } {-S | --needSchema } {-i obsolete_file_interval | --obsoletefileinterval=obsolete_file_interval } {-n reserved_file_count | --reservedfilecount=reserved_file_count } {-d | --deletetopicdata } {-o security_commit_time | --committime=security_commit_time } {-p security_commit_data | --commitdata=security_commit_data } {-O owners | --owners=owners } {-q | --compress=compress_single_msg } {-u | --compressthres=compress_single_msg_threshold } {-a | --dfsroot=dfs_root } {-e | --extenddfsroot=extend_dfs_root } {-g | --topicgroup=topic_group } {-j | --expired=expired } {-E | --sealed=sealed } {-Y | --topicType=topic_type } {-P | --physicTopicLst=physic_topic_list} {-N | --enableTTLDel=enable_ttl_del} {-G | --enableLongPolling=enable_long_polling} {-v | --versionControl=version_control} {-M | --enableMergeData=enable_merge_data} {-I | --permitUser=permitUser} {-Z | --readnotcommitmsg=read_notcommit_msg} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t topic_name, --topic=topic_name : required, topic name, eg: news -c partition_count, --partcount=partition_count : required, partition count of the topic -R range_count, --rangecount=range_count : optional, range count in one partion, default is 4, -s partition_buf_size --partbuf=partition_buf_size : [deprecated], partition buffer size of the topic, M -b partition_file_buf_size --partfilebuf=partition_file_buf_size : [deprecated], partition read file buffer size of the topic, M -x partition_min_buf_size --partmaxbuf=partition_max_buf_size : [optional], partition max buffer size of the topic, M -y partition_max_buf_size --partminbuf=partition_min_buf_size : [optional], partition min buffer size of the topic, M -r partition_resource --partresource=partition_resource : optional, partition resource[1,1000], default is 100 -l partition_limit --partlimit=partition_limit : optional, partition limit of this topic in one broker, default is no limit -m topic_mode --topicmode=topic_mode : optional, normal_mode | security_mode | memory_only_mode | memory_prefer_mode, default is normal_mode -f --fieldfilter : optional, filter fields in msg if specialized -i obsolete_file_interval --obsoletefileinterval=obsolete_file_interval : optional, obsolete file time interval, unit is hour. -n reserved_file_count --reservedfilecount=reserved_file_count : optional, reserved file count. -d --deletetopicdata : optional, delete old topic data if specialized -o security_commit_time --committime=security_commit_time : optional, max wait time for commit message in security_mode -p security_commit_data --commitdata=security_commit_data : optional, max data size for commit message in security_mode -O owners --owners=owners : optional, topic owners, multi owners seperate by , -S --needschema : optional, schema topic if specialized -V schema_versions --schema_versions : optional, topic schema verisons, must exist, versions seperate by ',' -q --compress=compress_single_msg : optional, compress msg in this topic -u --compressthres=compress_single_msg_threshold : optional, compress msg great than threshold [default:2048] -a --dfsroot=dfs_root : optional, dfs_root -e --extenddfsroot=extend_dfs_root : optional, extend dfs_root add old data when dfs root changed -g --topicgroup=topic_group : optional, topic_group -j --expired=expired : optional, topic expired time, auto delete topic after expired, second -T --request_time_out=optional : optional, timeout for request, ms -E --sealed : optional, is sealed topic or not -Y --topicType=topic_type : optional, TOPIC_TYPE_NORMAL | TOPIC_TYPE_PHYSIC | TOPIC_TYPE_LOGIC | TOPIC_TYPE_LOGIC_PHYSIC, default is TOPIC_TYPE_NORMAL -P --physicTopicLst=physic_topic_list : optional, physic topic list, seperate by ',' -N --enableTTLDel=enable_ttl_del : optional, enable TTL delete or not -G --enable_long_polling=enableLongPolling : optional, enable long polling -v --versionControl=version_control : optional, True or False -M --enableMergeData=enable_merge_data : optional, enable merge data -I --permitUser=permitUser : optional, users allowed to operate this topic, seperate by ',' -Z --readnotcommitmsg=read_notcommit_msg : optional, whether topic can read not committed msg, default is true Example: swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -x 10 swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -y 100 -b 128 swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -x 10 -y 100 -r 30 swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -r 30 -m security_mode -f swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -r 30 -i 1 -n 5 swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -r 30 -l 2 swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -r 30 -l 2 -q swift at -z zfs://10.250.12.23:1234/root -t news -c 1 -j 1000 -O Lilei,Hanmeimei swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -a hdfs://xxx/1 -e hdfs://xxx/2,hdfs://xxx/3 swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -s 10 [deprecated] swift at -z zfs://10.250.12.23:1234/root -t news --permitUser=Hanmeimei,Lilei ''' def addOptions(self): super(TopicAddCmd, self).addOptions() self.parser.add_option('-t', '--topic', action='store', dest='topicName') self.parser.add_option('-c', '--partcount', type='int', action='store', dest='partCnt') self.parser.add_option('-R', '--rangecount', type='int', action='store', dest='rangeCnt') self.parser.add_option('-s', '--partbuf', type='int', action='store', dest='partBuf') self.parser.add_option('-b', '--partfilebuf', type='int', action='store', dest='partFileBuf') self.parser.add_option('-x', '--partminbuf', type='int', action='store', dest='partMinBuf') self.parser.add_option('-y', '--partmaxbuf', type='int', action='store', dest='partMaxBuf') self.parser.add_option('-r', '--partresource', type='int', action='store', dest='partResource') self.parser.add_option('-l', '--partlimit', type='int', action='store', dest='partLimit') self.parser.add_option('-w', '--waittopicready', type='int', action='store', dest='timeout') self.parser.add_option( '-m', '--topicmode', type='choice', action='store', dest='topicMode', choices=[ 'normal_mode', 'security_mode', 'memory_only_mode', 'memory_prefer_mode', 'persist_data_mode'], default='normal_mode') self.parser.add_option('-f', '--fieldfilter', action='store_true', dest='fieldFilter', default=False) self.parser.add_option('-i', '--obsoletefileinterval', type='int', action='store', dest='obsoleteFileInterval') self.parser.add_option('-n', '--reservedfilecount', type='int', action='store', dest='reservedFileCount') self.parser.add_option('-d', '--deletetopicdata', action='store_true', dest='deleteTopicData', default=False) self.parser.add_option('-o', '--committime', type='int', action='store', dest='commitTime') self.parser.add_option('-p', '--commitdata', type='int', action='store', dest='commitData') self.parser.add_option('-O', '--owners', action='store', dest='owners') self.parser.add_option('-q', '--compress', action='store_true', dest='compressMsg', default=False) self.parser.add_option('-u', '--compressthres', action='store_true', dest='compressThres') self.parser.add_option('-a', '--dfsRoot', action='store', dest='dfsRoot') self.parser.add_option('-e', '--extendDfsRoot', action='store', dest='extendDfsRoot') self.parser.add_option('-g', '--topicGroup', action='store', dest='topicGroup') self.parser.add_option('-j', '--expired', type='int', action='store', dest='expiredTime') self.parser.add_option('-S', '--needschema', action='store_true', dest='needSchema', default=False) self.parser.add_option('-V', '--schema_versions', action='store', dest='schemaVersions') self.parser.add_option('-T', '--request_time_out', type='int', action='store', dest='requestTimeout') self.parser.add_option('-E', '--sealed', action='store', dest='sealed', default='false') self.parser.add_option('-Y', '--topic_type', type='choice', action='store', dest='topicType', choices=['TOPIC_TYPE_NORMAL', 'TOPIC_TYPE_PHYSIC', 'TOPIC_TYPE_LOGIC', 'TOPIC_TYPE_LOGIC_PHYSIC'], default='TOPIC_TYPE_NORMAL') self.parser.add_option('-P', '--physic_topic_list', action='store', dest='physicTopicLst') self.parser.add_option('-N', '--enable_ttl_del', action='store', dest='enableTTLDel', default='true') self.parser.add_option('-G', '--enable_long_polling', action='store', dest='enableLongPolling', default='false') self.parser.add_option('-v', '--version_control', action='store', dest='versionControl', default='false') self.parser.add_option('-M', '--enable_merge_data', action='store', dest='enableMergeData', default='false') self.parser.add_option('-I', '--permitUser', action='store', dest='permitUser', default='') self.parser.add_option('-Z', '--readnotcommitmsg', action='store', dest='readNotCommmitMsg', default='true') def checkOptionsValidity(self, options): ret, errMsg = super(TopicAddCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.topicName is None: errMsg = "ERROR: topic must be specified!" return False, errMsg if options.partCnt is None: errMsg = "ERROR: partition count for topic[%s] must be specified!"\ % options.topicName return False, errMsg if options.partCnt <= 0: errMsg = "ERROR: partition count [%d] must greater than zero" \ % options.partCnt return False, errMsg if options.rangeCnt is not None and options.rangeCnt < 1: errMsg = "ERROR: range count [%d] must greater than zero" \ % options.rangeCnt return False, errMsg if options.partBuf is not None: errMsg = "WARN: partition buffer is deprecated. using partminbuf and partmaxbuf!" if options.partBuf is not None and options.partBuf <= 0: errMsg = "ERROR: partition buffer [%d] must greater than zero"\ % options.partBuf return False, errMsg if options.partMaxBuf is None and options.partMinBuf is None and options.partBuf is not None: options.partMinBuf = options.partBuf / 2 if options.partMinBuf == 0: options.partMinBuf = 1 options.partMaxBuf = options.partBuf * 2 errMsg += " WARN: partmaxbuf and partminbuf is not setted, using partbuf value to"\ "set partminbuf %d and partmaxbuf %d!"\ % (options.partMinBuf, options.partMaxBuf) if options.partMaxBuf is not None and options.partMaxBuf <= 0: errMsg = "ERROR: partition max buffer [%d] must greater than zero"\ % options.partMaxBuf return False, errMsg if options.partMinBuf is not None and options.partMinBuf <= 0: errMsg = "ERROR: partition min buffer [%d] must greater than zero"\ % options.partMinBuf return False, errMsg if options.partMinBuf is not None and options.partMaxBuf is not None and options.partMinBuf > options.partMaxBuf: errMsg = "ERROR: partition min buffer [%d] must not greater than partition"\ " max buff [%d]" % (options.partMinBuf, options.partMaxBuf) return False, errMsg if options.partResource is not None and options.partResource <= 0: errMsg = "ERROR: partition resource [%d] must greater than zero" \ % options.partResource return False, errMsg if options.partLimit is not None and options.partLimit <= 0: errMsg = "ERROR: partition limit [%d] must greater than zero" % ( options.partLimit) return False, errMsg if options.obsoleteFileInterval is not None and options.obsoleteFileInterval <= 0: errMsg = "ERROR: obsolete file time interval [%d] must greater than zero" % ( options.obsoleteFileInterval) return False, errMsg if options.reservedFileCount is not None and options.reservedFileCount <= 0: errMsg = "ERROR: reserved file count [%d] must greater than zero" % ( options.reservedFileCount) return False, errMsg if options.partFileBuf is not None: errMsg += " WARN: setting partition file buffer has deprecated." if options.commitData is not None and options.commitData < 0: errMsg = "ERROR: security max commit data should not less than zero [%d]." \ % options.commitData return False, errMsg if options.commitTime is not None and options.commitTime < 0: errMsg = "ERROR: security max commit time should not less than zero [%d]." \ % options.commitTime return False, errMsg if options.expiredTime is not None and options.expiredTime < -1: errMsg = "ERROR: expired time should not less than -1 [%d]." \ % options.commitTime return False, errMsg if options.fieldFilter is not None and options.fieldFilter \ and options.needSchema is not None and options.needSchema: errMsg = "ERROR: cannot set both field filter and need schema" return False, errMsg if options.requestTimeout is not None and options.requestTimeout < 0: errMsg = "ERROR: expired time should larger than 0[%d]." \ % options.requestTimeout return False, errMsg if options.sealed is not None and options.sealed not in ['true', 'false']: errMsg = "ERROR: sealed can only set ['true', 'false']" return False, errMsg if options.enableTTLDel is not None and options.enableTTLDel not in ['true', 'false']: errMsg = "ERROR: enableTTLDel can only set ['true', 'false']" return False, errMsg if options.enableLongPolling is not None and options.enableLongPolling not in ['true', 'false']: errMsg = "ERROR, enableLongPolling can only set ['true', 'false']" return False, errMsg if options.enableMergeData is not None and options.enableMergeData not in ['true', 'false']: errMsg = "ERROR, enableMergeData can only set ['true', 'false']" return False, errMsg if options.readNotCommmitMsg is not None and options.readNotCommmitMsg not in ['true', 'false']: errMsg = "ERROR, readNotCommmitMsg can only set ['true', 'false']" return False, errMsg return True, errMsg def initMember(self, options): super(TopicAddCmd, self).initMember(options) self.topicName = options.topicName self.partCnt = options.partCnt self.rangeCnt = options.rangeCnt self.partBuf = options.partBuf self.partFileBuf = options.partFileBuf self.partMinBuf = options.partMinBuf self.partMaxBuf = options.partMaxBuf self.partResource = options.partResource self.partLimit = options.partLimit self.timeout = options.timeout self.needFieldFilter = options.fieldFilter if options.topicMode == "normal_mode": self.topicMode = TOPIC_MODE_NORMAL elif options.topicMode == "security_mode": self.topicMode = TOPIC_MODE_SECURITY elif options.topicMode == "memory_only_mode": self.topicMode = TOPIC_MODE_MEMORY_ONLY elif options.topicMode == "memory_prefer_mode": self.topicMode = TOPIC_MODE_MEMORY_PREFER elif options.topicMode == "persist_data_mode": self.topicMode = TOPIC_MODE_PERSIST_DATA self.obsoleteFileInterval = options.obsoleteFileInterval self.reservedFileCount = options.reservedFileCount self.deleteTopicData = options.deleteTopicData self.commitTime = options.commitTime self.commitData = options.commitData self.compressMsg = options.compressMsg self.compressThres = options.compressThres self.dfsRoot = options.dfsRoot self.topicGroup = options.topicGroup self.extendDfsRoot = options.extendDfsRoot self.expiredTime = options.expiredTime self.owners = options.owners self.needSchema = options.needSchema self.schemaVersions = options.schemaVersions self.requestTimeout = options.requestTimeout self.sealed = True if (options.sealed.lower() == 'true') else False if options.topicType == "TOPIC_TYPE_NORMAL": self.topicType = TOPIC_TYPE_NORMAL elif options.topicType == "TOPIC_TYPE_PHYSIC": self.topicType = TOPIC_TYPE_PHYSIC elif options.topicType == "TOPIC_TYPE_LOGIC": self.topicType = TOPIC_TYPE_LOGIC elif options.topicType == "TOPIC_TYPE_LOGIC_PHYSIC": self.topicType = TOPIC_TYPE_LOGIC_PHYSIC self.physicTopicLst = options.physicTopicLst self.enableTTLDel = False if (options.enableTTLDel.lower() == 'false') else True self.enableLongPolling = True if (options.enableLongPolling.lower() == 'true') else False self.versionControl = True if (options.versionControl.lower() == 'true') else False self.enableMergeData = True if (options.enableMergeData.lower() == 'true') else False self.permitUser = options.permitUser self.readNotCommmitMsg = False if (options.readNotCommmitMsg.lower() == 'false') else True def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.createTopic( topicName=self.topicName, partCnt=self.partCnt, rangeCnt=self.rangeCnt, partBufSize=self.partBuf, partMinBufSize=self.partMinBuf, partMaxBufSize=self.partMaxBuf, partResource=self.partResource, partLimit=self.partLimit, topicMode=self.topicMode, needFieldFilter=self.needFieldFilter, obsoleteFileTimeInterval=self.obsoleteFileInterval, reservedFileCount=self.reservedFileCount, partFileBufSize=self.partFileBuf, deleteTopicData=self.deleteTopicData, securityCommitTime=self.commitTime, securityCommitData=self.commitData, compressMsg=self.compressMsg, compressThres=self.compressThres, dfsRoot=self.dfsRoot, topicGroup=self.topicGroup, extendDfsRoot=self.extendDfsRoot, expiredTime=self.expiredTime, owners=self.owners, needSchema=self.needSchema, schemaVersions=self.schemaVersions, requestTimeout=self.requestTimeout, sealed=self.sealed, topicType=self.topicType, physicTopicLst=self.physicTopicLst, enableTTLDel=self.enableTTLDel, enableLongPolling=self.enableLongPolling, versionControl=self.versionControl, enableMergeData=self.enableMergeData, permitUser=self.permitUser, readNotCommmitMsg=self.readNotCommmitMsg) if not ret: print "Add topic Failed!" return ret, response, 1 print time.time(), "Add topic success!" if self.timeout is not None: print time.time(), "wait topic ready." return self._waitTopicReady(self.timeout) return ret, response, 0 def _waitTopicReady(self, timeout): while timeout > 0: if self._isTopicReady(self.topicName): print time.time(), "topic [%s] is ready" % self.topicName return "", "", 0 timeout -= 0.1 time.sleep(0.1) return "", "wait topic ready timeout.", -1 def _isTopicReady(self, topicName): ret, response, errMsg = self.adminDelegate.getTopicInfo(topicName) if ret: if response.HasField(swift_common_define.PROTO_TOPIC_INFO): topicInfo = response.topicInfo for partitionInfo in topicInfo.partitionInfos: if partitionInfo.status != PARTITION_STATUS_RUNNING: print "ERROR: partition info status: %s, response: %s" % \ (str(partitionInfo), str(response)) return False else: print "ERROR: response does not have topicInfo field! response: %s" % str(response) return False else: print "get topic info failed: %s, errorMsg: %s" % (topicName, errMsg) return False return True class TopicBatchAddCmd(base_cmd.BaseCmd): ''' swift {atb|addtopicbatch} {-z zookeeper_address | --zookeeper=zookeeper_address } {-t topic_name | --topic=topic_name } {-c partition_count | --partcount=partition_count } {-I | --permitUsers=permitUsers} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t topic_names, --topic=topic_names : required, topic name, eg: news,old -c partition_count, --partcount=partition_count : required, partition count of the topic -I permitUsers, --permitUsers=permitUsers : optional, define same as addtopic, seperate by \\; Example: swift at -z zfs://10.250.12.23:1234/root -t news\\;old -c 20\\;3 -x 10\\;20 -O wls,sdd,xieq\\;wls separator is ; and do not forget escape \\ when needed ''' def addOptions(self): super(TopicBatchAddCmd, self).addOptions() self.parser.add_option('-t', '--topic', action='store', dest='topicNames') self.parser.add_option('-c', '--partcount', action='store', dest='partCnt') self.parser.add_option('-R', '--rangecount', action='store', dest='rangeCnt') self.parser.add_option('-b', '--partfilebuf', action='store', dest='partFileBuf') self.parser.add_option('-x', '--partminbuf', action='store', dest='partMinBuf') self.parser.add_option('-y', '--partmaxbuf', action='store', dest='partMaxBuf') self.parser.add_option('-r', '--partresource', action='store', dest='partResource') self.parser.add_option('-l', '--partlimit', action='store', dest='partLimit') self.parser.add_option('-w', '--waittopicready', type='int', action='store', dest='timeout') self.parser.add_option('-m', '--topicmode', action='store', dest='topicMode') # choices=['normal_mode', 'security_mode', 'memory_only_mode', 'memory_prefer_mode'] self.parser.add_option('-f', '--fieldfilter', action='store', dest='fieldFilter') # default=False self.parser.add_option('-i', '--obsoletefileinterval', action='store', dest='obsoleteFileInterval') self.parser.add_option('-n', '--reservedfilecount', action='store', dest='reservedFileCount') self.parser.add_option('-d', '--deletetopicdata', action='store', dest='deleteTopicData') # default=False self.parser.add_option('-o', '--committime', action='store', dest='commitTime') self.parser.add_option('-p', '--commitdata', action='store', dest='commitData') self.parser.add_option('-O', '--owners', action='store', dest='owners') self.parser.add_option('-q', '--compress', action='store', dest='compressMsg') # default=False self.parser.add_option('-u', '--compressthres', action='store', dest='compressThres') self.parser.add_option('-a', '--dfsRoot', action='store', dest='dfsRoot') self.parser.add_option('-e', '--extendDfsRoot', action='store', dest='extendDfsRoot') self.parser.add_option('-g', '--topicGroup', action='store', dest='topicGroup') self.parser.add_option('-j', '--expired', action='store', dest='expiredTime') self.parser.add_option('-I', "--permitUsers", action="store", dest="permitUsers") self.parser.add_option('-Z', '--readnotcommitmsg', action='store', dest='readNotCommmitMsg') def checkOptionsValidity(self, options): ret, errMsg = super(TopicBatchAddCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.topicNames is None: errMsg = "ERROR: topic names must be specified!" return False, errMsg return True, '' def initMember(self, options): super(TopicBatchAddCmd, self).initMember(options) self.topicNames = options.topicNames self.partCnt = options.partCnt self.rangeCnt = options.rangeCnt self.partFileBuf = options.partFileBuf self.partMinBuf = options.partMinBuf self.partMaxBuf = options.partMaxBuf self.partResource = options.partResource self.partLimit = options.partLimit self.timeout = options.timeout self.needFieldFilter = options.fieldFilter self.topicMode = options.topicMode self.obsoleteFileInterval = options.obsoleteFileInterval self.reservedFileCount = options.reservedFileCount self.deleteTopicData = options.deleteTopicData self.commitTime = options.commitTime self.commitData = options.commitData self.compressMsg = options.compressMsg self.compressThres = options.compressThres self.dfsRoot = options.dfsRoot self.topicGroup = options.topicGroup self.extendDfsRoot = options.extendDfsRoot self.expiredTime = options.expiredTime self.owners = options.owners self.permitUsers = options.permitUsers self.readNotCommmitMsg = options.readNotCommmitMsg def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.createTopicBatch( topicNames=self.topicNames, partCnt=self.partCnt, rangeCnt=self.rangeCnt, partMinBufSize=self.partMinBuf, partMaxBufSize=self.partMaxBuf, partResource=self.partResource, partLimit=self.partLimit, topicMode=self.topicMode, needFieldFilter=self.needFieldFilter, obsoleteFileTimeInterval=self.obsoleteFileInterval, reservedFileCount=self.reservedFileCount, partFileBufSize=self.partFileBuf, deleteTopicData=self.deleteTopicData, securityCommitTime=self.commitTime, securityCommitData=self.commitData, compressMsg=self.compressMsg, compressThres=self.compressThres, dfsRoot=self.dfsRoot, topicGroup=self.topicGroup, extendDfsRoot=self.extendDfsRoot, expiredTime=self.expiredTime, owners=self.owners, permitUsers=self.permitUsers, readNotCommmitMsg=self.readNotCommmitMsg) if not ret: print "Add topic Failed!" return ret, response, 1 print "Add topic success!" if self.timeout is not None: print "wait topic ready." return self._waitTopicReady(self.timeout) return ret, response, 0 def _waitTopicReady(self, timeout): topicLst = self.topicNames.split(';') while timeout > 0: readyCount = 0 for topic in topicLst: if self._isTopicReady(topic): print "topic [%s] is ready" % topic readyCount += 1 else: print "topic [%s] is not ready" % topic if readyCount == len(topicLst): return "", "", 0 timeout -= 0.1 time.sleep(0.1) return "", "wait topic ready timeout.", -1 def _isTopicReady(self, topicName): ret, response, errMsg = self.adminDelegate.getTopicInfo(topicName) if ret: if response.HasField(swift_common_define.PROTO_TOPIC_INFO): topicInfo = response.topicInfo for partitionInfo in topicInfo.partitionInfos: if partitionInfo.status != PARTITION_STATUS_RUNNING: print "ERROR: partition info status: %s, response: %s" % \ (str(partitionInfo), str(response)) return False else: print "ERROR: response does not have topicInfo field! response: %s" % str(response) return False else: print "get topic info failed: %s, errorMsg: %s" % (topicName, errMsg) return False return True class TopicModifyCmd(base_cmd.BaseCmd): ''' swift {mt|modifytopic} {-z zookeeper_address | --zookeeper=zookeeper_address } {-t topic_name | --topic=topic_name } {-r partition_resource | --partresource=partition_resource } {-l partition_limit | --partlimit=partition_limit } {-g topic_group | --group=topic_group } {-j expired_time | --expired_time=expired_time } {-m topic_mode | --topicmode=topic_mode } {-c partition_count | --partcount=partition_count } {-C partition_range_count | --partrangecount=partition_range_count } {-y partition_max_buf_size | --partmaxbuf=partition_max_buf_size } {-x partition_min_buf_size | --partminbuf=partition_min_buf_size } {-i obsolete_file_interval | --obsoletefileinterval=obsolete_file_interval } {-n reserved_file_count | --reservedfilecount=reserved_file_count } {-o security_commit_time | --committime=security_commit_time } {-p security_commit_data | --commitdata=security_commit_data } {-O owners | --owners=owners } {-a | --dfsroot=dfs_root } {-e | --extenddfsroot=extend_dfs_root } {-E | --sealed=sealed } {-Y | --topicType=topic_type } {-P | --physicTopicLst=physic_topic_list} {-N | --enableTTLDel=enable_ttl_del} {-L | --readSizeLimitSec=read_size_limit_sec} {-G | --enableLongPolling=enable_long_polling} {-M | --enableMergeData=enable_merge_data} {-I permitUser | --permitUser=permitUser } {-v version_controller | --versionControl=version_control } {-Z | --readnotcommitmsg=read_notcommit_msg} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t topic_names, --topic=topic_names : required, topic name, eg: news,news1 -f topic_file, --topic_file=topic_file : optional, topic file, will append in topic_names, one topic per line -r partition_resource --partresource=partition_resource : optional, partition resource -l partition_limit --partlimit=partition_limit : optional, partition limit of this topic in one broker -g topic_group --group=topic_group : optional, change topic group name -j expired_time --expired_time=expired_time : optional, change expired time for topic, -1 no expired, second -m topic_mode --topicmode=topic_mode : optional, normal_mode | security_mode | memory_only_mode | memory_prefer_mode, default is normal_mode -c partition_count, --partcount=partition_count : optional, partition count of the topic -C partition_range_count, --partrangecount=partition_range_count : optional, partition range count of the topic -x partition_min_buf_size --partmaxbuf=partition_max_buf_size : [optional], partition max buffer size of the topic, M -y partition_max_buf_size --partminbuf=partition_min_buf_size : [optional], partition min buffer size of the topic, M -i obsolete_file_interval --obsoletefileinterval=obsolete_file_interval : optional, obsolete file time interval, unit is hour. -n reserved_file_count --reservedfilecount=reserved_file_count : optional, reserved file count -o security_commit_time --committime=security_commit_time : optional, max wait time for commit message in security_mode -p security_commit_data --commitdata=security_commit_data : optional, max data size for commit message in security_mode -O owners --owners=owners : optional, topic owners, multi owners seperate by , -E --sealed : optional, is sealed topic or not -Y --topicType=topic_type : optional, TOPIC_TYPE_NORMAL | TOPIC_TYPE_PHYSIC | TOPIC_TYPE_LOGIC | TOPIC_TYPE_LOGIC_PHYSIC, default is TOPIC_TYPE_NORMAL -P --physicTopicLst=physic_topic_list : optional, physic topic list, seperate by ',' -N --enableTTLDel=enable_ttl_del : optional, enable TTL delete or not -L --readSizeLimitSec=read_size_limit_sec : optional, read dfs limit one sec -G --enable_long_polling=enableLongPolling : optional, enable long polling -M --enable_merge_data=enableMergeData : optional, enable merge data -I permitUser --permitUser=permitUser : optional, users allowed to operate this topic, seperate by ',' -v version_control --versionControl=version_control : optional, writer version controller -Z --readnotcommitmsg=read_notcommit_msg : optional, whether topic can read not committed msg Example: swift mt -z zfs://10.250.12.23:1234/root -t news -r 10 swift mt -z zfs://10.250.12.23:1234/root -t news -l 2 swift mt -z zfs://10.250.12.23:1234/root -t news -r 20 -l 3 swift mt -z zfs://10.250.12.23:1234/root -t news -g group_name swift mt -z zfs://10.250.12.23:1234/root -t news -j -1 swift mt -z zfs://10.250.12.23:1234/root -t news -m memory_only_mode swift mt -z zfs://10.250.12.23:1234/root -t news --permitUser=Hanmeimei,Lilei ''' def addOptions(self): super(TopicModifyCmd, self).addOptions() self.parser.add_option('-t', '--topic', action='store', dest='topicName') self.parser.add_option('-f', '--topic_file', action='store', dest='topicFile') self.parser.add_option('-r', '--partresource', type='int', action='store', dest='partResource') self.parser.add_option('-l', '--partlimit', type='int', action='store', dest='partLimit') self.parser.add_option('-g', '--group', action='store', dest='topicGroup') self.parser.add_option('-j', '--expired_time', type='int', action='store', dest='expiredTime') self.parser.add_option('-x', '--partminbuf', type='int', action='store', dest='partMinBuf') self.parser.add_option('-y', '--partmaxbuf', type='int', action='store', dest='partMaxBuf') self.parser.add_option( '-m', '--topicmode', type='choice', action='store', dest='topicMode', choices=[ 'normal_mode', 'security_mode', 'memory_only_mode', 'memory_prefer_mode', 'persist_data_mode']) self.parser.add_option('-c', '--partcount', type='int', action='store', dest='partCnt') self.parser.add_option('-C', '--partrangecount', type='int', action='store', dest='partRangeCnt') self.parser.add_option('-i', '--obsoletefileinterval', type='int', action='store', dest='obsoleteFileInterval') self.parser.add_option('-n', '--reservedfilecount', type='int', action='store', dest='reservedFileCount') self.parser.add_option('-o', '--committime', type='int', action='store', dest='commitTime') self.parser.add_option('-p', '--commitdata', type='int', action='store', dest='commitData') self.parser.add_option('-O', '--owners', action='store', dest='owners') self.parser.add_option('-a', '--dfsRoot', action='store', dest='dfsRoot') self.parser.add_option('-e', '--extendDfsRoot', action='store', dest='extendDfsRoot') self.parser.add_option('-E', '--sealed', action='store', dest='sealed') self.parser.add_option('-Y', '--topic_type', type='choice', action='store', dest='topicType', choices=['TOPIC_TYPE_NORMAL', 'TOPIC_TYPE_PHYSIC', 'TOPIC_TYPE_LOGIC', 'TOPIC_TYPE_LOGIC_PHYSIC']) self.parser.add_option('-P', '--physic_topic_list', action='store', dest='physicTopicLst') self.parser.add_option('-N', '--enable_ttl_del', action='store', dest='enableTTLDel') self.parser.add_option('-L', '--read_size_limit_sec', type='int', action='store', dest='readSizeLimitSec') self.parser.add_option('-G', '--enable_long_polling', action='store', dest='enableLongPolling') self.parser.add_option('-M', '--enable_merge_data', action='store', dest='enableMergeData') self.parser.add_option('-I', '--permitUser', action='store', dest='permitUser', default='') self.parser.add_option('-v', '--versionControl', action='store', dest='versionControl') self.parser.add_option('-Z', '--readnotcommitmsg', action='store', dest='readNotCommmitMsg') def checkOptionsValidity(self, options): ret, errMsg = super(TopicModifyCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.topicName is None: errMsg = "ERROR: topic must be specified!" return False, errMsg if options.partResource is not None and options.partResource <= 0: errMsg = "ERROR: partition resource [%d] must greater than zero" \ % options.partResource return False, errMsg if options.partLimit is not None and options.partLimit <= 0: errMsg = "ERROR: partition limit [%d] must greater than zero" % ( options.partLimit) return False, errMsg if options.expiredTime is not None and options.expiredTime < -1: errMsg = "ERROR: expired time should not less than -1 [%d]." \ % options.commitTime return False, errMsg if options.partMaxBuf is not None and options.partMaxBuf <= 0: errMsg = "ERROR: partition max buffer [%d] must greater than zero"\ % options.partMaxBuf return False, errMsg if options.partCnt is not None and options.partCnt <= 0: errMsg = "ERROR: partition count [%d] must greater than zero" \ % options.partCnt return False, errMsg if options.partRangeCnt is not None and options.partRangeCnt <= 0: errMsg = "ERROR: partition Range count [%d] must greater than zero" \ % options.partRangeCnt return False, errMsg if options.partMinBuf is not None and options.partMinBuf <= 0: errMsg = "ERROR: partition min buffer [%d] must greater than zero"\ % options.partMinBuf return False, errMsg if options.partMinBuf is not None and options.partMaxBuf is not None and options.partMinBuf > options.partMaxBuf: errMsg = "ERROR: partition min buffer [%d] must not greater than partition max"\ " buff [%d]" % (options.partMinBuf, options.partMaxBuf) return False, errMsg if options.obsoleteFileInterval is not None and options.obsoleteFileInterval <= 0: errMsg = "ERROR: obsolete file time interval [%d] must greater than zero" % ( options.obsoleteFileInterval) return False, errMsg if options.reservedFileCount is not None and options.reservedFileCount <= 0: errMsg = "ERROR: reserved file count [%d] must greater than zero" % ( options.reservedFileCount) return False, errMsg if options.sealed is not None and options.sealed not in ['true', 'false']: errMsg = "ERROR: sealed can only set ['true', 'false']" return False, errMsg if options.enableTTLDel is not None and options.enableTTLDel not in ['true', 'false']: errMsg = "ERROR: enableTTLDel can only set ['true', 'false']" return False, errMsg if options.readSizeLimitSec is not None and options.readSizeLimitSec < 0: errMsg = "ERROR: readSizeLimitSec [%d] must not less than zero" % ( options.readSizeLimitSec) return False, errMsg if options.enableLongPolling is not None and options.enableLongPolling not in ['true', 'false']: errorMsg = "ERROR, enableLongPolling can only set ['true', 'false']" return False, errMsg if options.enableMergeData is not None and options.enableMergeData not in ['true', 'false']: errorMsg = "ERROR, enableMergeData can only set ['true', 'false']" return False, errMsg if options.commitData is not None and options.commitData < 0: errMsg = "ERROR: security max commit data should not less than zero [%d]." \ % options.commitData return False, errMsg if options.commitTime is not None and options.commitTime < 0: errMsg = "ERROR: security max commit time should not less than zero [%d]." \ % options.commitTime return False, errMsg if options.readNotCommmitMsg is not None and options.readNotCommmitMsg not in ['true', 'false']: errorMsg = "ERROR, readNotCommmitMsg can only set ['true', 'false']" return False, errMsg if options.partLimit is None and options.partResource is None and \ options.topicGroup is None and options.expiredTime is None and \ options.partMaxBuf is None and options.partMinBuf is None and \ options.topicMode is None and options.obsoleteFileInterval is None and \ options.reservedFileCount is None and options.partCnt is None and \ options.commitData is None and options.commitTime is None and \ options.owners is None and options.dfsRoot is None and \ options.extendDfsRoot is None and options.partRangeCnt is None and\ options.sealed is None and options.topicType is None and\ options.physicTopicLst is None and options.enableTTLDel is None and\ options.readSizeLimitSec is None and options.enableLongPolling is None and\ options.enableMergeData is None and options.permitUser is None and options.versionControl is None\ and options.readNotCommmitMsg is None: errMsg = "ERROR: modify options: partlimit, topic group, partresource, "\ "expired time, partition_max_buf_size, partition_min_buf_size, "\ "obsolete_file_interval, reserved_file_count, dfs_root, "\ "extend_dfs_root, range_count in partition, owners "\ "security max commit data, security max commit time "\ "sealed, topic_type, physic_topic_list or enable_ttl_del "\ "read_size_limit_sec, enable_long_polling, enableMergeData, "\ "readNotCommmitMsg must specify at least one!" return False, errMsg return True, '' def initMember(self, options): super(TopicModifyCmd, self).initMember(options) self.topicNames = options.topicName.split(',') if (options.topicFile): f = open(options.topicFile, 'r') for line in f.readlines(): line = line.strip('\n') self.topicNames.append(line) self.partResource = options.partResource self.partLimit = options.partLimit self.topicGroup = options.topicGroup self.expiredTime = options.expiredTime self.partMinBuf = options.partMinBuf self.partMaxBuf = options.partMaxBuf self.topicMode = None if options.topicMode is not None: if options.topicMode == "normal_mode": self.topicMode = TOPIC_MODE_NORMAL elif options.topicMode == "security_mode": self.topicMode = TOPIC_MODE_SECURITY elif options.topicMode == "memory_only_mode": self.topicMode = TOPIC_MODE_MEMORY_ONLY elif options.topicMode == "memory_prefer_mode": self.topicMode = TOPIC_MODE_MEMORY_PREFER elif options.topicMode == "persist_data_mode": self.topicMode = TOPIC_MODE_PERSIST_DATA self.obsoleteFileInterval = options.obsoleteFileInterval self.reservedFileCount = options.reservedFileCount self.partCnt = options.partCnt self.partRangeCnt = options.partRangeCnt self.owners = options.owners self.dfsRoot = options.dfsRoot self.extendDfsRoot = options.extendDfsRoot self.sealed = None if options.sealed is not None: self.sealed = True if (options.sealed.lower() == 'true') else False self.topicType = None if options.topicType is not None: if options.topicType == "TOPIC_TYPE_NORMAL": self.topicType = TOPIC_TYPE_NORMAL elif options.topicType == "TOPIC_TYPE_PHYSIC": self.topicType = TOPIC_TYPE_PHYSIC elif options.topicType == "TOPIC_TYPE_LOGIC": self.topicType = TOPIC_TYPE_LOGIC elif options.topicType == "TOPIC_TYPE_LOGIC_PHYSIC": self.topicType = TOPIC_TYPE_LOGIC_PHYSIC self.physicTopicLst = options.physicTopicLst self.enableTTLDel = None if options.enableTTLDel is not None: self.enableTTLDel = False if (options.enableTTLDel.lower() == 'false') else True self.readSizeLimitSec = options.readSizeLimitSec self.enableLongPolling = None if options.enableLongPolling is not None: self.enableLongPolling = True if (options.enableLongPolling.lower() == 'true') else False self.enableMergeData = None if options.enableMergeData is not None: self.enableMergeData = True if (options.enableMergeData.lower() == 'true') else False self.commitTime = options.commitTime self.commitData = options.commitData self.permitUser = options.permitUser self.versionControl = None if options.versionControl is not None: self.versionControl = True if (options.versionControl.lower() == 'true') else False self.readNotCommmitMsg = None if options.readNotCommmitMsg is not None: self.readNotCommmitMsg = False if (options.readNotCommmitMsg.lower() == 'false') else True def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): responses = [] fail_cnt = 0 for topicName in self.topicNames: topicName = topicName.strip() ret, response, errorMsg = self.adminDelegate.modifyTopic( topicName, self.partResource, self.partLimit, self.topicGroup, self.expiredTime, self.partMinBuf, self.partMaxBuf, self.topicMode, self.obsoleteFileInterval, self.reservedFileCount, self.partCnt, self.owners, self.dfsRoot, self.extendDfsRoot, self.partRangeCnt, self.sealed, self.topicType, self.physicTopicLst, self.enableTTLDel, self.readSizeLimitSec, self.enableLongPolling, self.commitTime, self.commitData, self.enableMergeData, self.permitUser, self.versionControl, self.readNotCommmitMsg) if not ret: fail_cnt += 1 print("Modify topic [%s] failed!" % topicName) print(response) else: print("Modify topic [%s] success!" % topicName) responses.append(response) error_code = 1 if fail_cnt == len(self.topicNames) else 0 return "", responses, error_code class TopicDeleteCmd(base_cmd.BaseCmd): ''' swift {dt|deletetopic} {-z zookeeper_address | --zookeeper=zookeeper_address } {-t topic_name | --topic=topic_name } {-f file_name | --file=file_name } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t topic_name, --topic=topic_name : optional, topic name, eg: news -f file_name, --file=file_name : optional, file name Example: swift dt -z zfs://10.250.12.23:1234/root -t news swift dt -z zfs://10.250.12.23:1234/root -f file_name ''' def addOptions(self): super(TopicDeleteCmd, self).addOptions() self.parser.add_option('-t', '--topic', action='store', dest='topicName') self.parser.add_option('-f', '--file', action='store', dest='fileName') def checkOptionsValidity(self, options): ret, errMsg = super(TopicDeleteCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.topicName is None and options.fileName is None: errMsg = "ERROR: topic or file must be specified!" return False, errMsg if options.topicName is not None and options.fileName is not None: errMsg = "ERROR: must specified one of topic and file" return False, errMsg return True, '' def initMember(self, options): super(TopicDeleteCmd, self).initMember(options) self.topicName = options.topicName self.fileName = options.fileName self.localFileUtil = local_file_util.LocalFileUtil() self.failTopicFile = './fail_topic_list_for_delete' def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): failTopicList = [] if self.topicName is not None: ret, respone, errorMsg = self.adminDelegate.deleteTopic(self.topicName) if not ret: print "Delete topic failed!" print errorMsg return ret, respone, 1 print "Delete topic success!" return ret, respone, 0 elif self.fileName is not None: topics = self.localFileUtil.cat(self.fileName) topicVec = topics.split('\n') for topicName in topicVec: ret, respone, errorMsg = self.adminDelegate.deleteTopic(topicName) if not ret: print "Delete topic %s failed!" % topicName print errorMsg failTopicList.append(topicName) else: print "Delete topic %s success!" % topicName if not self.writeFailTopic(failTopicList): print "write fail topic to file:fail_topic_list failed" return ret, respone, 1 if len(failTopicList) == 0: return ret, respone, 0 else: return ret, respone, 1 def writeFailTopic(self, failTopicList): topicStr = '' for failTopic in failTopicList: topicStr += failTopic + '\n' return self.localFileUtil.write(topicStr, self.failTopicFile, 'w') class TopicDeleteBatchCmd(base_cmd.BaseCmd): ''' swift {dtb|deletetopicbatch} {-z zookeeper_address | --zookeeper=zookeeper_address } {-t topic_names | --topic=topic_name1,topic_name2,...} {-f file_name | --file=file_name } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t topic_names, --topic=topic_name1,topic_name2,... : optional, topic name, eg: news,old,test -f file_name, --file=file_name : optional, file name Example: swift dtb -z zfs://10.250.12.23:1234/root -t news,old swift dtb -z zfs://10.250.12.23:1234/root -f file_name ''' def addOptions(self): super(TopicDeleteBatchCmd, self).addOptions() self.parser.add_option('-t', '--topic', action='store', dest='topicNames') self.parser.add_option('-f', '--file', action='store', dest='fileName') def checkOptionsValidity(self, options): ret, errMsg = super(TopicDeleteBatchCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.topicNames is None and options.fileName is None: errMsg = "ERROR: topic or file must be specified!" return False, errMsg if options.topicNames is not None and options.fileName is not None: errMsg = "ERROR: must specified one of topic and file" return False, errMsg return True, '' def initMember(self, options): super(TopicDeleteBatchCmd, self).initMember(options) self.topicNames = options.topicNames self.fileName = options.fileName self.localFileUtil = local_file_util.LocalFileUtil() self.failTopicFile = './fail_topic_list_for_delete' def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): failTopicList = [] if self.topicNames is not None: topicVec = self.topicNames.split(',') ret, respone, errorMsg = self.adminDelegate.deleteTopicBatch(topicVec) if self.fromApi: return ret, respone, errorMsg if not ret: print "Delete topic failed!" print errorMsg return "", "", 1 print "Delete topic %s success!" % str(topicVec) return "", "", 0 elif self.fileName is not None: topics = self.localFileUtil.cat(self.fileName) topicVec = topics.split('\n') ret, respone, errorMsg = self.adminDelegate.deleteTopicBatch(topicVec) if self.fromApi: return ret, respone, errorMsg if not ret: print "Delete topic in file failed!" print errorMsg failTopicList.append(topicVec) else: print "Delete topic %s success!" % str(topicVec) if len(failTopicList) > 0 and not self.writeFailTopic(failTopicList): print "write fail topic to file:fail_topic_list failed" return "", "", 1 if len(failTopicList) == 0: return "", "", 0 else: return "", "", 1 def writeFailTopic(self, failTopicList): topicStr = '' for failTopic in failTopicList: topicStr += failTopic + '\n' return self.localFileUtil.write(topicStr, self.failTopicFile, 'w') class TopicNamesCmd(base_cmd.BaseCmd): ''' swift {gtn|gettopicnames} {-z zookeeper_address | --zookeeper=zookeeper_address } {-f file_name | --file=file_name } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -f file_name --file=file_name Example: swift gtn -z zfs://10.250.12.23:1234/root swift gtn -z zfs://10.250.12.23:1234/root -f file.txt ''' def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def addOptions(self): super(TopicNamesCmd, self).addOptions() self.parser.add_option('-f', '--file', action='store', dest='fileName') def initMember(self, options): super(TopicNamesCmd, self).initMember(options) self.fileName = options.fileName self.localFileUtil = local_file_util.LocalFileUtil() def writeTopicName(self, topicNames): topicStr = '' for topicName in topicNames: print topicName topicStr += topicName + '\n' return self.localFileUtil.write(topicStr, self.fileName, 'w') def run(self): ret, response, errMsg = self.adminDelegate.getAllTopicName() if self.fromApi: return ret, response, errMsg if not ret: print "Get all topic name failed! " print errMsg return "", "", 1 else: print "Get topic names success! " if len(response.names) == 0: print "Swift system has no topic!" else: if self.fileName: self.writeTopicName(response.names) else: namesStr = ", ".join(response.names) print "Topic names: [%s]" % namesStr return "", "", 0 class TopicInfosCmd(base_cmd.BaseCmd): ''' swift {gti|gettopicinfo} {-z zookeeper_address | --zookeeper=zookeeper_address} {-t topic_name | --topic=topic_name} [-c cmd_type | --cmdtype=cmd_type] options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t topic_name, --topic=topic_name : required, topic name, eg: news -c cmd_type, --cmdtype=cmd_type : optional, command type, default: summary summary: topic summary infos message: protocol message verbose: topic verbose infos -s sortType --sort=sortType : optional, partid(default)|brokeraddress| partstatus -g group_name --group=group_name : optional, group name when get all topic info -T --request_time_out=optional : optional, timeout for request, ms Example: swift gti -z zfs://10.250.12.23:1234/root swift gti -z zfs://10.250.12.23:1234/root -t news swift gti -z zfs://10.250.12.23:1234/root -t news -c message swift gti -z zfs://10.250.12.23:1234/root -t news -c verbose swift gti -z zfs://10.250.12.23:1234/root -t news -c verbose -s brokeraddress ''' def addOptions(self): super(TopicInfosCmd, self).addOptions() self.parser.add_option('-t', '--topic', action='store', dest='topicName') self.parser.add_option('-g', '--group', action='store', dest='groupName') self.parser.add_option('-c', '--cmdtype', type='choice', action='store', choices=['summary', 'message', 'verbose'], dest='cmdType', default='summary') self.parser.add_option('-s', '--sort', type='choice', action='store', dest='sortType', choices=[swift_common_define.PART_SORT_PART_ID, swift_common_define.PART_SORT_PART_STATUS, swift_common_define.PART_SORT_BROKER_ADDRESS], default='partid') self.parser.add_option('-T', '--optional', type='int', action='store', dest='requestTimeout') def checkOptionsValidity(self, options): ret, errMsg = super(TopicInfosCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.topicName is None: if options.cmdType != 'summary': errMsg = "ERROR: getAllTopicInfos method only support summary cmd type!" return False, errMsg return True, '' def initMember(self, options): super(TopicInfosCmd, self).initMember(options) self.topicName = options.topicName self.cmdType = options.cmdType self.sortType = options.sortType self.groupName = options.groupName self.requestTimeout = options.requestTimeout def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd, self.options.accessId, self.options.accessKey) return True def printTopicInfo(self): ret, response, errMsg = self.adminDelegate.getTopicInfo(self.topicName, self.requestTimeout) if self.fromApi: return ret, response, errMsg if not ret: print "Get topic info failed!" print errMsg return "", "", 1 if not response.HasField(swift_common_define.PROTO_TOPIC_INFO): print "ERROR: response does not have topicInfo field!" return "", "", 1 topicInfo = response.topicInfo if self.cmdType == "summary" or self.cmdType == 'verbose': topicInfoAnalyzer = status_analyzer.TopicInfoAnalyzer() partCount = topicInfoAnalyzer.getPartitionCount(topicInfo) print "TopicName: %s" % topicInfo.name if topicInfo.topicMode == TOPIC_MODE_NORMAL: print "TopicMode: normal_mode" elif topicInfo.topicMode == TOPIC_MODE_SECURITY: print "TopicMode: security_mode" print "MaxCommitTime: %d" % topicInfo.maxWaitTimeForSecurityCommit print "MaxCommitSize: %d" % topicInfo.maxDataSizeForSecurityCommit elif topicInfo.topicMode == TOPIC_MODE_MEMORY_ONLY: print "TopicMode: memory_only_mode " elif topicInfo.topicMode == TOPIC_MODE_MEMORY_PREFER: print "TopicMode: memory_prefer_mode " if topicInfo.needFieldFilter: print "NeedFieldFilter: true" else: print "NeedFieldFilter: false" if topicInfo.needSchema: print "NeedSchema: true" else: print "NeedSchema: false" print "SchemaVersions: %s" % ','.join([str(x) for x in topicInfo.schemaVersions]) print "TopicStatus: %s" % ( swift_util.SwiftUtil.protoEnumToString(topicInfo, "status")[13:]) print "PartitionCount: %d" % topicInfo.partitionCount print "RangeCountInPartition: %d" % topicInfo.rangeCountInPartition print "PartitionMaxBufferSize: %d" % topicInfo.partitionMaxBufferSize print "PartitionMinBufferSize: %d" % topicInfo.partitionMinBufferSize print "PartitionBufferSize: %d" % topicInfo.partitionBufferSize print "PartitionFileBufferSize: %d" % topicInfo.partitionFileBufferSize print "PartitionResource: %d" % topicInfo.resource print "PartitionLimit: %d" % topicInfo.partitionLimit print "ObsoleteFileTimeInterval: %d" % topicInfo.obsoleteFileTimeInterval print "ReservedFileCount: %d" % topicInfo.reservedFileCount if topicInfo.deleteTopicData: print "DeleteTopicData: true" else: print "DeleteTopicData: false" print "TopicGroup: %s" % topicInfo.topicGroup print "TopicExpiredTime: %d" % topicInfo.topicExpiredTime if 0 == len(topicInfo.owners): print "owners: %s" % 'NONE' else: print "owners: %s" % ','.join(topicInfo.owners) if topicInfo.topicType == TOPIC_TYPE_NORMAL: print "TopicType: TOPIC_TYPE_NORMAL" elif topicInfo.topicType == TOPIC_TYPE_PHYSIC: print "TopicType: TOPIC_TYPE_PHYSIC" elif topicInfo.topicType == TOPIC_TYPE_LOGIC: print "TopicType: TOPIC_TYPE_LOGIC" elif topicInfo.topicType == TOPIC_TYPE_LOGIC_PHYSIC: print "TopicType: TOPIC_TYPE_LOGIC_PHYSIC" if 0 == len(topicInfo.physicTopicLst): print "physicTopicLst: []" else: print "physicTopicLst: %s" % ','.join(topicInfo.physicTopicLst) if topicInfo.sealed: print "Sealed: true" else: print "Sealed: false" if topicInfo.enableTTLDel: print "enableTTLDel: true" else: print "enableTTLDel: false" print "readSizeLimitSec: %d" % topicInfo.readSizeLimitSec if topicInfo.enableLongPolling: print "enableLongPolling: true" else: print "enableLongPolling: false" if topicInfo.versionControl: print "versionControl: true" else: print "versionControl: false" if topicInfo.enableMergeData: print "enableMergeData: true" else: print "enableMergeData: false" if 0 == len(topicInfo.permitUser): print "permitUser: NONE" else: print "permitUser: %s" % ','.join(topicInfo.permitUser) if topicInfo.readNotCommittedMsg: print "readNotCommittedMsg: true" else: print "readNotCommittedMsg: false" print("Partition(waiting/starting/running/stopping/recovering/none): " "%d/%d/%d/%d/%d/%d ") % ( partCount.partWaiting, partCount.partStarting, partCount.partRunning, partCount.partStopping, partCount.partRecovering, partCount.partNone) if self.cmdType == "verbose": partInfos = topicInfoAnalyzer.getSortedPartitionInfo( topicInfo, self.sortType) if len(partInfos) > 0: print "" print "Partition Detailed Infos:" print '%-8s %-13s %-40s' % ( swift_common_define.PART_SORT_PART_ID, swift_common_define.PART_SORT_PART_STATUS, swift_common_define.PART_SORT_BROKER_ADDRESS) for partInfo in partInfos: print '%-8d %-13s %-40s' % ( partInfo.id, swift_util.SwiftUtil.protoEnumToString(partInfo, "status")[17:], partInfo.brokerAddress) elif self.cmdType == "message": print topicInfo return "", "", 0 def printAllTopicInfo(self): ret, response, errMsg = self.adminDelegate.getAllTopicInfo() if self.fromApi: return ret, response, errMsg if not ret: print "Get topic info failed!" print errMsg return "", "", 1 print "Get all topic info success!" allTopicInfo = response.allTopicInfo for topicInfo in allTopicInfo: topicInfoAnalyzer = status_analyzer.TopicInfoAnalyzer() partCount = topicInfoAnalyzer.getPartitionCount(topicInfo) topicName = topicInfo.name if self.groupName and topicInfo.topicGroup != self.groupName: continue topicStatus = swift_util.SwiftUtil.protoEnumToString(topicInfo, "status")[13:] print("%s.TopicStatus[%s].PartitionStatus(waiting/starting/running/stopping/recovering): " "%d/%d/%d/%d/%d ") % (topicName, topicStatus, partCount.partWaiting + partCount.partNone, partCount.partStarting, partCount.partRunning, partCount.partStopping, partCount.partRecovering) return "", "", 0 def run(self): if self.topicName is not None: return self.printTopicInfo() else: return self.printAllTopicInfo() class TopicSimpleInfosCmd(base_cmd.BaseCmd): ''' swift {gtsi|gettopicsimpleinfo} {-z zookeeper_address | --zookeeper=zookeeper_address} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address Example: swift gtsi -z zfs://10.250.12.23:1234/root ''' def addOptions(self): super(TopicSimpleInfosCmd, self).addOptions() def checkOptionsValidity(self, options): ret, errMsg = super(TopicSimpleInfosCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg return True, '' def initMember(self, options): super(TopicSimpleInfosCmd, self).initMember(options) def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errMsg = self.adminDelegate.getAllTopicSimpleInfo() if self.fromApi: return ret, response, errMsg print ret, response, errMsg class TopicDeleteByTimeCmd(transport_cmd_base.TransportCmdBase): ''' swift {dtt|deletetopicsbytime} {-z zookeeper_address | --zookeeper=zookeeper_address } {-i interval | --interval=interval } {-e exclude_topics, | --exclude=exclude_topics } {-f exclude_topics_file, | --exclude_file=exclude_topics_file } {-p prefix_name | --prefix_name=prefix_name } {-t topic_names | --topic_names=topic_names } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -i time_interval, --interval=time_interval : required, delete topics if the last message arrived interval hours before, (unit:hours) -e exclude_topics, --exclude=exclude_topics : option, don't delete exclude topics -f exclude_topics_file, --exclude_file=exclude_topics_file : option, don't delete exclude topics in file,one topic per line -p prefix_name --prefix_name=prefix_name : option, delete topic prefix with specified -t topic_names --topic_names=topic_names : option, delete topic with specified topic names Example: swift dtt -z zfs://10.250.12.23:1234/root -i 48 swift dtt -z zfs://10.250.12.23:1234/root -i 48 -e topic_a,topic_b swift dtt -z zfs://10.250.12.23:1234/root -i 48 -f a.txt swift dtt -z zfs://10.250.12.23:1234/root -i 1 -p model_ swift dtt -z zfs://10.250.12.23:1234/root -i 1 -t topic_name_file ''' def addOptions(self): super(TopicDeleteByTimeCmd, self).addOptions() self.parser.add_option('-i', '--interval', type=int, action='store', dest='interval') self.parser.add_option('-e', '--exclude', action='store', dest='excludeTopics') self.parser.add_option('-f', '--exclude_file', action='store', dest='excludeFile') self.parser.add_option('-p', '--prefix_name', action='store', dest='prefixName') self.parser.add_option('-t', '--topic_names', action='store', dest='topicNames') def checkOptionsValidity(self, options): if not super(TopicDeleteByTimeCmd, self).checkOptionsValidity(options): return False if options.interval is None: print "ERROR: interval must be specified!" return False return True def initMember(self, options): super(TopicDeleteByTimeCmd, self).initMember(options) self.interval = options.interval self.prefixName = options.prefixName self.excludeTopics = [] if options.excludeTopics is not None: self.excludeTopics = options.excludeTopics.split(",") if options.excludeFile is not None: f = open(options.excludeFile, 'r') for line in f.readlines(): line = line.strip('\n') self.excludeTopics.append(line) f.close() print "exclude topics:", self.excludeTopics self.topicNames = [] if options.topicNames is not None: f = open(options.topicNames, 'r') for line in f.readlines(): line = line.strip('\n') self.topicNames.append(line) f.close() print "topics:", self.topicNames def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) self.brokerDelegate = swift_broker_delegate.SwiftBrokerDelegate(self.options.username, self.options.passwd) return True def run(self): delTopicNames = self.deleteTopics() print delTopicNames if (len(delTopicNames) == 0): print "no topic need to delete." return "", "", 0 def delTopic(self, topicName): timeFormat = '%Y-%m-%d %X' ret, reponse, errorMsg = self.adminDelegate.deleteTopic(topicName) if not ret: print "[%s] Delete topic %s failed!" % (time.strftime(timeFormat, time.localtime()), topicName) print errorMsg else: print "[%s] Delete topic %s success!" % (time.strftime(timeFormat, time.localtime()), topicName) def deleteTopics(self): delList = [] ret, response, errMsg = self.adminDelegate.getAllTopicInfo() if not ret: print "get all topic info failed [%s]" % errMsg return delList allTopicInfo = response.allTopicInfo for topicInfo in allTopicInfo: topicInfoAnalyzer = status_analyzer.TopicInfoAnalyzer() partCount = topicInfoAnalyzer.getPartitionCount(topicInfo) topicName = topicInfo.name if len(topicInfo.partitionInfos) == partCount.partRunning: if self.needDelete(topicInfo) and topicName not in self.excludeTopics: delList.append(topicName) self.delTopic(topicName) return delList def needDelete(self, topicInfo): topicName = topicInfo.name if self.prefixName is not None and not topicName.startswith(self.prefixName): return False if len(self.topicNames) > 0 and topicName not in self.topicNames: return False maxTime = 0 allPartNoData = True curTime = int(time.time()) for partInfo in topicInfo.partitionInfos: brokerAddr = "tcp:%s" % partInfo.brokerAddress ret, data = self.brokerDelegate.getMaxMessageId(brokerAddr, topicName, partInfo.id) if not ret: if data.find('ERROR_BROKER_NO_DATA') != -1: allPartNoData = allPartNoData and True continue response = data if response.HasField(swift_common_define.PROTO_TIME_STAMP): if response.timestamp > maxTime: maxTime = response.timestamp if maxTime != 0: maxTime = maxTime / 1000 / 1000 print "has data, topic name:%s, not active time: %f(hour)" % (topicName, float(curTime - maxTime) / 3600) if float(curTime - maxTime) / 3600 > float(self.interval): return True else: return False elif allPartNoData: createTime = self.getTopicCreateTime(topicInfo) print "no data, topic name:%s, not active time: %f(hour)" % (topicName, float(curTime - createTime) / 3600) if float(curTime - createTime) / 3600 > float(self.interval): return True else: return False else: return False def getTopicCreateTime(self, topicInfo): if topicInfo.HasField("createTime"): return int(topicInfo.createTime) / 1000 / 1000 else: path = self.zfsAddress + '/topics/' + topicInfo.topicName ret = self.fileUtil.getMeta(path) prefixStr = 'node last modify time: ' pos = ret.find(prefixStr) if (pos != -1): timeStr = ret[pos + len(prefixStr):].strip() createTime = time.mktime(time.strptime(timeStr, '%Y-%m-%d %H:%M:%S')) return int(createTime) else: return -1 class TopicDataDeleteCmd(base_cmd.BaseCmd): ''' swift {dtd|deletetopicdata} {-z zookeeper_address | --zookeeper=zookeeper_address } {-d dfs_root | --dfs=dfs_root } {-i interval | --interval=interval } {-e exclude | --exclude=exclude_topics } {-t topic_name | --topic_name=topic_name } {-f exclude_file | --exclude_file=exclude_file } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -d dfs_root, --dfs=dfs_root : required, dfs root address -i time_interval, --interval=time_interval : required, delete topic data if the topic is not running and the last message is arrived interval time ago. (unit:hours) -e exclude_topics, --exclude=exclude_topics : option, don't delete the specified topic data -t topic_name, --topic=topic_name : option, only delete the specified topic data -f exclude_file, --exclude_file=exclude_file : option, don't delete the specified topic data in file Example: swift dtd -z zfs://10.250.12.23:1234/root -i 72 -d hdfs://xxxx/path swift dtd -z zfs://10.250.12.23:1234/root -i 72 -d hdfs://xxxx/path -f abc.txt swift dtd -z zfs://10.250.12.23:1234/root -i 72 -d hdfs://xxxx/path -e topic_a,topic_b swift dtd -z zfs://10.250.12.23:1234/root -i 72 -t topic_a -d hdfs://xxxx/path ''' def addOptions(self): super(TopicDataDeleteCmd, self).addOptions() self.parser.add_option('-d', '--dfs', action='store', dest='dfsRoot') self.parser.add_option('-i', '--interval', type='int', action='store', dest='interval') self.parser.add_option('-e', '--exclude', action='store', dest='excludeTopics') self.parser.add_option('-t', '--topic_name', action='store', dest='topicName') self.parser.add_option('-f', '--exclude_file', action='store', dest='excludeFile') def checkOptionsValidity(self, options): if not super(TopicDataDeleteCmd, self).checkOptionsValidity(options): return False if options.interval is None: print "ERROR: interval must be specified!" return False if options.dfsRoot is None: print "ERROR: dfsRoot must be specified!" return False return True def initMember(self, options): super(TopicDataDeleteCmd, self).initMember(options) self.interval = options.interval self.excludeTopics = [] if options.excludeTopics is not None: self.excludeTopics = options.excludeTopics.split(",") if options.excludeFile is not None: f = open(options.excludeFile, 'r') for line in f.readlines(): line = line.strip('\n') self.excludeTopics.append(line) f.close() print "exclude topics:", self.excludeTopics self.specifiedTopic = options.topicName self.dfsRoot = options.dfsRoot def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): runningTopicName = self.getAllRunningTopicNames() print runningTopicName if self.specifiedTopic is not None and self.specifiedTopic in runningTopicName: print "the specified topic [%s] still running, can't delete the data." % self.specifiedTopic return "", "", 1 allTopicName = self.getCandidataTopicNames() print allTopicName delTopicName = [] for topicName in allTopicName: if topicName not in runningTopicName and topicName not in self.excludeTopics: if self.specifiedTopic is not None: if topicName == self.specifiedTopic: delTopicName.append(self.specifiedTopic) else: delTopicName.append(topicName) self.delTopicData(delTopicName) return "", "", 0 def getAllRunningTopicNames(self): topicList = [] ret, response, errMsg = self.adminDelegate.getAllTopicInfo() if not ret: print "get all topic info failed [%s]." % errMsg return topicList allTopicInfo = response.allTopicInfo for topicInfo in allTopicInfo: topicList.append(topicInfo.name) return topicList def getCandidataTopicNames(self): topicList = [] self.fileUtil.addHadoopHome(self.toolsConfig.getHadoopHome()) print "get topic info in ", self.dfsRoot dirs = self.fileUtil.listDir(self.dfsRoot) print "all topic count [%d]" % len(dirs) curTime = int(time.time()) for topicName in dirs: maxTime = self.getTopicLastModifyTime(self.fileUtil, self.dfsRoot + "/" + topicName) print "topic name:%s, not active time: %f(hour)" % (topicName, float(curTime - maxTime) / 3600) if float(curTime - maxTime) / 3600 > float(self.interval): topicList.append(topicName) return topicList def getTopicLastModifyTime(self, fileUtil, topicDataPath): modifyTime = -1 partitions = fileUtil.listDir(topicDataPath) if len(partitions) == 0: return modifyTime for partition in partitions: files = fileUtil.listDir(topicDataPath + "/" + partition) if len(files) == 0: continue else: files.sort() metaInfo = fileUtil.getMeta(topicDataPath + "/" + partition + "/" + files[len(files) - 1]) prefix = "file last modify time: " pos = metaInfo.find(prefix) if pos != -1: timeStr = metaInfo[pos + len(prefix):] timeStr = timeStr.strip() createTime = int(time.mktime(time.strptime(timeStr, '%Y-%m-%d %H:%M:%S'))) if createTime > modifyTime: modifyTime = createTime return modifyTime def delTopicData(self, topicNames): for topic in topicNames: print "deleting topic data [%s]" % topic ret = self.fileUtil.remove(self.dfsRoot + "/" + topic) if not ret: print "delete topic data [%s] failed!" % topic class ExportTopicsCmd(base_cmd.BaseCmd): ''' swift {et|exporttopics} {-z zookeeper_address | --zookeeper=zookeeper_address } {-f export_file_name | --file=export_file_name } {-g group_name | --group=group_name } {-p prefix | --prefix=prefix_name } {-t topic_names | --topic=topic_names } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -f export_file_name, --file=export_file_name : required, export file name -g group_name, --group=group_name : optional, export group name, default export all topic -m migrateDfs, --migrate=migrate_dfs : optional, migrate dfs, add new hdfs, add current dfs into extendDfs root -p prefix, --prefix = prefix : optional, export topic info with prefix -i topicName, --topic = topic_names : optional, export topic info with topic_name file Example: swift et -z zfs://10.250.12.23:1234/root -f a.json swift et -z zfs://10.250.12.23:1234/root -f a.json -g swift swift et -z zfs://10.250.12.23:1234/root -f a.json -p prefix swift et -z zfs://10.250.12.23:1234/root -f a.json -i topicNameFile ''' def addOptions(self): super(ExportTopicsCmd, self).addOptions() self.parser.add_option('-f', '--file', action='store', dest='fileName') self.parser.add_option('-g', '--group', action='store', dest='groupName') self.parser.add_option('-m', '--migrate', action='store', dest='migrateDfs') self.parser.add_option('-p', '--prefix', action='store', dest='prefix') self.parser.add_option('-i', '--topic', action='store', dest='topicNames') def checkOptionsValidity(self, options): ret, errMsg = super(ExportTopicsCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.fileName is None: errMsg = "ERROR: file name must be specified!" print errMsg return False, errMsg return True, '' def initMember(self, options): super(ExportTopicsCmd, self).initMember(options) self.fileName = options.fileName self.localFileUtil = local_file_util.LocalFileUtil() self.groupName = options.groupName self.migrateDfs = options.migrateDfs self.prefix = options.prefix self.topicNames = [] if options.topicNames: f = open(options.topicNames, 'r') for line in f.readlines(): line = line.strip('\n') self.topicNames.append(line) f.close() def getDfsRoot(self): versionFile = self.zfsAddress + "/config/version" localConfigFile = "./.swift.conf" configFile = self.zfsAddress + "/config/swift.conf" if self.fileUtil.exists(versionFile): configFile = self.getLatestConfig(self.zfsAddress + "/config") + "/swift.conf" self.fileUtil.copy(configFile, localConfigFile, True) configLines = self.localFileUtil.readLines(localConfigFile) for line in configLines: line = line.strip() if line.startswith("data_root_path"): strVec = line.split("=") if len(strVec) == 2: return strVec[1].strip() return "" def hasDfsRoot(self, metaStr): metaVec = metaStr.split('\n') for metaMeg in metaVec: metaMeg = metaMeg.strip() if metaMeg.startswith("dfsRoot"): dfsVec = metaMeg.split("Root: ") if len(dfsVec) == 2 and len(dfsVec[1].strip()) > 2: return True return False def hasTopicGroup(self, metaStr): metaVec = metaStr.split('\n') for metaMeg in metaVec: metaMeg = metaMeg.strip() if metaMeg.startswith("topicGroup"): dfsVec = metaMeg.split("opicGroup: ") if len(dfsVec) == 2 and len(dfsVec[1].strip()) > 2: return True return False def generateMap(self, topicMetas): retMap = {} for meta in topicMetas.topicMetas: tmpMap = {} if self.groupName is not None: if self.hasTopicGroup(str(meta)): if meta.topicGroup != self.groupName: continue else: if self.groupName != 'default': continue if self.prefix is not None and not meta.topicName.startswith(self.prefix): continue if len(self.topicNames) > 0 and meta.topicName not in self.topicNames: continue metaVec = str(meta).split('\n') for metaMsg in metaVec: if metaMsg == '': continue metaMsg = metaMsg.strip() pos = metaMsg.find(':') metaMsgVec = [] metaMsgVec.append(metaMsg[0:pos]) metaMsgVec.append(metaMsg[pos + 1:len(metaMsg)]) if metaMsgVec[1] is not None: metaMsgVec[1] = metaMsgVec[1].strip() metaMsgVec[0] = metaMsgVec[0].strip() if metaMsgVec[1].startswith('"') and metaMsgVec[1].endswith('"'): metaMsgVec[1] = metaMsgVec[1].rstrip('"').lstrip('"') if tmpMap.has_key(metaMsgVec[0]): tmpMap[metaMsgVec[0]] += ',' + metaMsgVec[1] else: tmpMap[metaMsgVec[0]] = metaMsgVec[1] else: stripKey = metaMsgVec[0].strip() if tmpMap.has_key(stripKey): tmpMap[stripKey] += ',' else: tmpMap[stripKey] = '' if self.migrateDfs: self.updateMigrateDfs(tmpMap) retMap[meta.topicName] = tmpMap return retMap def updateMigrateDfs(self, topicInfo): dfsRoot = topicInfo["dfsRoot"] if topicInfo.has_key("extendDfsRoot"): extendDfsRoot = topicInfo["extendDfsRoot"] extendDfsList = extendDfsRoot.split(',') if dfsRoot not in extendDfsList: extendDfsList.append(dfsRoot) if self.migrateDfs in extendDfsList: extendDfsList.remove(self.migrateDfs) topicInfo["extendDfsRoot"] = ",".join(extendDfsList) else: topicInfo["extendDfsRoot"] = dfsRoot topicInfo["dfsRoot"] = self.migrateDfs def run(self): metaFile = self.zfsAddress + "/topic_meta" localFile = "./.topic_meta" self.fileUtil.copy(metaFile, localFile, True) metaStr = self.localFileUtil.cat(localFile) deCompStr = zlib.decompress(metaStr) topicMetas = TopicMetas() topicMetas.ParseFromString(deCompStr) if self.fromApi: return 0, topicMetas, 0 dfsRoot = self.getDfsRoot() for meta in topicMetas.topicMetas: if not self.hasDfsRoot(str(meta)): meta.dfsRoot = dfsRoot retMap = self.generateMap(topicMetas) print "export topic count ", len(retMap) retJson = json_wrapper.write(retMap, format=True) self.localFileUtil.write(retJson, self.fileName, 'w') return "", "", 0 class TopicMetaCmd(base_cmd.BaseCmd): ''' swift {gtm|gettopicmetas} {-z zookeeper_address | --zookeeper=zookeeper_address } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address Example: swift gtm -z zfs://10.250.12.23:1234/root ''' def addOptions(self): super(TopicMetaCmd, self).addOptions() def initMember(self, options): super(TopicMetaCmd, self).initMember(options) self.localFileUtil = local_file_util.LocalFileUtil() def run(self): metaFile = self.zfsAddress + "/topic_meta" localFile = "./.topic_meta" self.fileUtil.copy(metaFile, localFile, True) metaStr = self.localFileUtil.cat(localFile) if 0 == len(metaStr): return "", "", 1 deCompStr = zlib.decompress(metaStr) topicMetas = TopicMetas() topicMetas.ParseFromString(deCompStr) if self.fromApi: return "", topicMetas, 0 print topicMetas return "", "", 0 class ImportTopicsCmd(base_cmd.BaseCmd): ''' swift {it|importtopics} {-z zookeeper_address | --zookeeper=zookeeper_address } {-f local_file_name | --file=local_file_name } {-w time_out | --timeout=time_out} {-d delete_exist | --delete_exist=delete_exist} {-c continue | --continue=continue} {-g group | --group=group} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -f local_file_name, --file=local_file_name : required, local file name -w time_out, --timeout=time_out : optional, timeout for one topic -d delete_exist --delete_exist=delete_exist : optional, delete exist topic ,default is false -c continue_exist --continue=continue : optional, continue if add topic has error ,default is false -g group --group=group : optional, add in specified group name Example: swift it -z zfs://10.250.12.23:1234/root -f a.json -w 30 swift it -z zfs://10.250.12.23:1234/root -f a.json -d -c swift it -z zfs://10.250.12.23:1234/root -f a.json -g igraph ''' def addOptions(self): super(ImportTopicsCmd, self).addOptions() self.parser.add_option('-f', '--file', action='store', dest='fileName') self.parser.add_option('-w', '--timeout', type='int', action='store', dest='timeout') self.parser.add_option('-d', '--delete_exist', action='store_true', dest='deleteExist', default=False) self.parser.add_option('-c', '--continue', action='store_true', dest='continueAdd', default=False) self.parser.add_option('-g', '--group', action='store', dest='group') def checkOptionsValidity(self, options): ret, errMsg = super(ImportTopicsCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.fileName is None: errMsg = "ERROR: file name must be specified!" print errMsg return False, errMsg return True, '' def initMember(self, options): super(ImportTopicsCmd, self).initMember(options) self.fileName = options.fileName self.timeout = options.timeout self.deleteExist = options.deleteExist self.continueAdd = options.continueAdd self.localFileUtil = local_file_util.LocalFileUtil() self.group = options.group self.failTopicFile = './fail_topic_list_for_add_' + str(time.time()) def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): content = self.localFileUtil.cat(self.fileName) topicMap = json_wrapper.read(content) topicName = None failTopicList = {} hasError = False timeFormat = '%Y-%m-%d %X' for topic, topicDescription in topicMap.items(): partitionCount = None resource = None partitionLimit = None topicMode = None needFieldFilter = None obsoleteFileTimeInterval = None reservedFileCount = None deleteTopicData = None partitionMinBufferSize = None partitionMaxBufferSize = None maxWaitTimeForSecurityCommit = None maxDataSizeForSecurityCommit = None compressMsg = None compressThres = None dfsRoot = None topicGroup = None extendDfsRoot = None topicExpiredTime = None rangeCnt = None owners = None needSchema = None schemaVersions = None sealed = None topicType = None physicTopicLst = None enableTTLDel = None if topicDescription.has_key("topicName"): topicName = topicDescription["topicName"] if topicDescription.has_key("partitionCount"): partitionCount = int(topicDescription["partitionCount"]) if topicDescription.has_key("resource"): resource = int(topicDescription["resource"]) if topicDescription.has_key("partitionLimit"): partitionLimit = int(topicDescription["partitionLimit"]) if topicDescription.has_key("topicMode"): if topicDescription["topicMode"] == 'TOPIC_MODE_NORMAL': topicMode = TOPIC_MODE_NORMAL elif topicDescription["topicMode"] == 'TOPIC_MODE_SECURITY': topicMode = TOPIC_MODE_SECURITY elif topicDescription["topicMode"] == 'TOPIC_MODE_MEMORY_PREFER': topicMode = TOPIC_MODE_MEMORY_PREFER elif topicDescription["topicMode"] == 'TOPIC_MODE_MEMORY_ONLY': topicMode = TOPIC_MODE_MEMORY_ONLY if topicDescription.has_key("needFieldFilter"): needFieldFilter = topicDescription["needFieldFilter"] == 'true' if topicDescription.has_key("obsoleteFileTimeInterval"): obsoleteFileTimeInterval = int(topicDescription["obsoleteFileTimeInterval"]) if topicDescription.has_key("reservedFileCount"): reservedFileCount = int(topicDescription["reservedFileCount"]) if topicDescription.has_key("deleteTopicData"): deleteTopicData = topicDescription["deleteTopicData"] == 'true' if topicDescription.has_key("partitionMinBufferSize"): partitionMinBufferSize = int(topicDescription["partitionMinBufferSize"]) if topicDescription.has_key("partitionMaxBufferSize"): partitionMaxBufferSize = int(topicDescription["partitionMaxBufferSize"]) if topicDescription.has_key("maxWaitTimeForSecurityCommit"): maxWaitTimeForSecurityCommit = int(topicDescription["maxWaitTimeForSecurityCommit"]) if topicDescription.has_key("maxDataSizeForSecurityCommit"): maxDataSizeForSecurityCommit = int(topicDescription["maxDataSizeForSecurityCommit"]) if topicDescription.has_key("compressMsg"): compressMsg = topicDescription["compressMsg"] == 'true' if topicDescription.has_key("compressThres"): compressThres = int(topicDescription["compressThres"]) if topicDescription.has_key("dfsRoot"): dfsRoot = topicDescription["dfsRoot"] if topicDescription.has_key("extendDfsRoot"): extendDfsRoot = topicDescription["extendDfsRoot"] if topicDescription.has_key("topicGroup"): topicGroup = topicDescription["topicGroup"] if topicDescription.has_key("topicExpiredTime"): topicExpiredTime = int(topicDescription["topicExpiredTime"]) if topicDescription.has_key("rangeCount"): rangeCnt = int(topicDescription["rangeCount"]) if topicDescription.has_key("owners"): owners = topicDescription["owners"] if topicDescription.has_key("needSchema"): needSchema = topicDescription["needSchema"] == 'true' if topicDescription.has_key("schemaVersions"): schemaVersions = topicDescription["schemaVersions"] if topicDescription.has_key("sealed"): sealed = topicDescription["sealed"] == 'true' if topicDescription.has_key("topicType"): if topicDescription.has_key("topicType") == "TOPIC_TYPE_NORMAL": topicType = TOPIC_TYPE_NORMAL elif topicDescription.has_key("topicType") == "TOPIC_TYPE_PHYSIC": topicType = TOPIC_TYPE_PHYSIC elif topicDescription.has_key("topicType") == "TOPIC_TYPE_LOGIC": topicType = TOPIC_TYPE_LOGIC elif topicDescription.has_key("topicType") == "TOPIC_TYPE_LOGIC_PHYSIC": topicType = TOPIC_TYPE_LOGIC_PHYSIC if topicDescription.has_key("physicTopicLst"): physicTopicLst = topicDescription["physicTopicLst"] if topicDescription.has_key("enableTTLDel"): enableTTLDel = topicDescription["enableTTLDel"] == 'true' if self.group is not None: topicGroup = self.group if hasError and not self.continueAdd: failTopicList[topicName] = topicDescription continue if self.deleteExist: ret, reponse, errorMsg = self.adminDelegate.deleteTopic(topicName) if not ret: if errorMsg.find('ERROR_ADMIN_TOPIC_NOT_EXISTED') == -1: print "[%s] Delete topic %s failed! error msg [%s]" % (time.strftime(timeFormat, time.localtime()), topicName, errorMsg) hasError = True continue else: print "[%s] Delete topic %s success!" % (time.strftime(timeFormat, time.localtime()), topicName) time.sleep(2) ret, response, errorMsg = self.adminDelegate.createTopic( topicName=topicName, partCnt=partitionCount, rangeCnt=rangeCnt, partMinBufSize=partitionMinBufferSize, partMaxBufSize=partitionMaxBufferSize, partResource=resource, partLimit=partitionLimit, topicMode=topicMode, needFieldFilter=needFieldFilter, obsoleteFileTimeInterval=obsoleteFileTimeInterval, reservedFileCount=reservedFileCount, deleteTopicData=deleteTopicData, securityCommitTime=maxWaitTimeForSecurityCommit, securityCommitData=maxDataSizeForSecurityCommit, compressMsg=compressMsg, compressThres=compressThres, dfsRoot=dfsRoot, topicGroup=topicGroup, extendDfsRoot=extendDfsRoot, expiredTime=topicExpiredTime, owners=owners, needSchema=needSchema, schemaVersions=schemaVersions, sealed=sealed, topicType=topicType, physicTopicLst=physicTopicLst, enableTTLDel=enableTTLDel) if not ret: print "[%s] Add topic [%s] Failed, error msg [%s]" % (time.strftime(timeFormat, time.localtime()), topicName, errorMsg) failTopicList[topicName] = topicDescription hasError = True continue if self.timeout is not None: print "wait topic ready." if not self._waitTopicReady(self.timeout, topicName): print "wait topic %s ready failed" % topicName failTopicList[topicName] = topicDescription hasError = True continue print "[%s] Add topic [%s] success!" % (time.strftime(timeFormat, time.localtime()), topicName) if len(failTopicList) > 0: self.addFailTopic(failTopicList) return "", "", 0 def addFailTopic(self, failTopicList): failTopic = json_wrapper.write(failTopicList, format=True) return self.localFileUtil.write(failTopic, self.failTopicFile, 'w') def _waitTopicReady(self, timeout, topicName): while timeout > 0: if self._isTopicReady(topicName): print "topic [%s] is ready" % topicName return "", "", 0 timeout -= 0.1 time.sleep(0.1) return "", "wait topic ready timeout.", -1 def _isTopicReady(self, topicName): ret, response, errMsg = self.adminDelegate.getTopicInfo(topicName) if ret: if response.HasField(swift_common_define.PROTO_TOPIC_INFO): topicInfo = response.topicInfo for partitionInfo in topicInfo.partitionInfos: if partitionInfo.status != PARTITION_STATUS_RUNNING: print "ERROR: partition info status: %s, response: %s" % \ (str(partitionInfo), str(response)) return False else: print "ERROR: response does not have topicInfo field! response: %s" % str(response) return False else: print "get topic info failed: %s, errorMsg: %s" % (topicName, errMsg) return False return True class UpdateTopicsCmd(base_cmd.BaseCmd): ''' swift {ut|updatetopics} {-z zookeeper_address | --zookeeper=zookeeper_address } {-f local_file_name | --file=local_file_name } {-w time_out | --timeout=time_out} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -f local_file_name, --file=local_file_name : required, local file name -w time_out, --timeout=time_out : optional, timeout for one topic Example: swift ut -z zfs://10.250.12.23:1234/root -f a.json -w 30 ''' def addOptions(self): super(UpdateTopicsCmd, self).addOptions() self.parser.add_option('-f', '--file', action='store', dest='fileName') self.parser.add_option('-w', '--timeout', type='int', action='store', dest='timeout') def checkOptionsValidity(self, options): if not super(UpdateTopicsCmd, self).checkOptionsValidity(options): return False if options.fileName is None: print "ERROR: file name must be specified!" return False return True def initMember(self, options): super(UpdateTopicsCmd, self).initMember(options) self.fileName = options.fileName self.timeout = options.timeout self.localFileUtil = local_file_util.LocalFileUtil() self.failTopicFile = './fail_topic_list_for_add' def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): content = self.localFileUtil.cat(self.fileName) topicMap = json_wrapper.read(content) topics = sorted(topicMap.items(), key=lambda x: x[1]) for (topic, topicDescription) in topics: if topic != topicDescription["topicName"]: print "error, topic name not equal [%s %s]" % (topic, topicDescription["topicName"]) return "", "", 0 ret, response, errorMsg = self.adminDelegate.deleteTopic(topic) if not ret: print "delete topic [%s] error, %s" % (topic, errorMsg) else: print "delete topic [%s] success." % topic time.sleep(2) # wait partition unload if not self.addOneTopic(topicDescription): print "not all topic updated, retry!" return "", "", 0 return "", "", 0 def addOneTopic(self, topicDescription): partitionCount = None resource = None partitionLimit = None topicMode = None needFieldFilter = None obsoleteFileTimeInterval = None reservedFileCount = None deleteTopicData = None partitionMinBufferSize = None partitionMaxBufferSize = None maxWaitTimeForSecurityCommit = None maxDataSizeForSecurityCommit = None compressMsg = None compressThres = None dfsRoot = None topicGroup = None extendDfsRoot = None if topicDescription.has_key("topicName"): topicName = topicDescription["topicName"] if topicDescription.has_key("partitionCount"): partitionCount = int(topicDescription["partitionCount"]) if topicDescription.has_key("resource"): resource = int(topicDescription["resource"]) if topicDescription.has_key("partitionLimit"): partitionLimit = int(topicDescription["partitionLimit"]) if topicDescription.has_key("topicMode"): if topicDescription["topicMode"] == 'TOPIC_MODE_NORMAL': topicMode = TOPIC_MODE_NORMAL elif topicDescription["topicMode"] == 'TOPIC_MODE_SECURITY': topicMode = TOPIC_MODE_SECURITY if topicDescription.has_key("needFieldFilter"): needFieldFilter = topicDescription["needFieldFilter"] == 'true' if topicDescription.has_key("obsoleteFileTimeInterval"): obsoleteFileTimeInterval = int(topicDescription["obsoleteFileTimeInterval"]) if topicDescription.has_key("reservedFileCount"): reservedFileCount = int(topicDescription["reservedFileCount"]) if topicDescription.has_key("deleteTopicData"): deleteTopicData = topicDescription["deleteTopicData"] == 'true' if topicDescription.has_key("partitionMinBufferSize"): partitionMinBufferSize = int(topicDescription["partitionMinBufferSize"]) if topicDescription.has_key("partitionMaxBufferSize"): partitionMaxBufferSize = int(topicDescription["partitionMaxBufferSize"]) if topicDescription.has_key("maxWaitTimeForSecurityCommit"): maxWaitTimeForSecurityCommit = int(topicDescription["maxWaitTimeForSecurityCommit"]) if topicDescription.has_key("maxDataSizeForSecurityCommit"): maxDataSizeForSecurityCommit = int(topicDescription["maxDataSizeForSecurityCommit"]) if topicDescription.has_key("compressMsg"): compressMsg = topicDescription["compressMsg"] == 'true' if topicDescription.has_key("compressThres"): compressThres = int(topicDescription["compressThres"]) if topicDescription.has_key("dfsRoot"): dfsRoot = topicDescription["dfsRoot"] if topicDescription.has_key("extendDfsRoot"): extendDfsRoot = topicDescription["extendDfsRoot"] if topicDescription.has_key("topicGroup"): topicGroup = topicDescription["topicGroup"] ret, response, errorMsg = self.adminDelegate.createTopic( topicName=topicName, partCnt=partitionCount, rangeCnt=self.rangeCnt, partMinBufSize=partitionMinBufferSize, partMaxBufSize=partitionMaxBufferSize, partResource=resource, partLimit=partitionLimit, topicMode=topicMode, needFieldFilter=needFieldFilter, obsoleteFileTimeInterval=obsoleteFileTimeInterval, reservedFileCount=reservedFileCount, deleteTopicData=deleteTopicData, securityCommitTime=maxWaitTimeForSecurityCommit, securityCommitData=maxDataSizeForSecurityCommit, compressMsg=compressMsg, compressThres=compressThres, dfsRoot=dfsRoot, topicGroup=topicGroup, extendDfsRoot=extendDfsRoot) if not ret: print "Add topic [%s] Failed!" % topicName print errorMsg return False if self.timeout is not None: print "wait topic [%s] ready." % topicName if not self._waitTopicReady(self.timeout, topicName): print "wait topic %s ready failed" % topicName return False print "update topic %s success!" % topicName return True def _waitTopicReady(self, timeout, topicName): while timeout > 0: if self._isTopicReady(topicName): print "topic [%s] is ready" % topicName return "", "", 0 timeout -= 0.1 time.sleep(0.1) return "", "wait topic ready timeout.", -1 def _isTopicReady(self, topicName): ret, response, errMsg = self.adminDelegate.getTopicInfo(topicName) if ret: if response.HasField(swift_common_define.PROTO_TOPIC_INFO): topicInfo = response.topicInfo for partitionInfo in topicInfo.partitionInfos: if partitionInfo.status != PARTITION_STATUS_RUNNING: print "ERROR: partition info status: %s, response: %s" % \ (str(partitionInfo), str(response)) return False else: print "ERROR: response does not have topicInfo field! response: %s" % str(response) return False else: print "get topic info failed: %s, errorMsg: %s" % (topicName, errMsg) return False return True class TransferPartitionCmd(base_cmd.BaseCmd): ''' swift {tp|transferpartition} {-z zookeeper_address | --zookeeper=zookeeper_address } {-t transfer_info | --transfer=transfer_info } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t transfer_info, --transfer=transfer_info : required, topic name, eg: role_name1:0.3,role_name2:0.1 -g group_name, --group_name=group_name : opitional, group name, eg: swift_mainse Example: swift tp -z zfs://10.250.12.23:1234/root -t group1##broker_1_0:0.3;group2##broker_1_0:0.2 swift tp -z zfs://10.250.12.23:1234/root -t all swift tp -z zfs://10.250.12.23:1234/root -t all -g swift_mainse ''' def addOptions(self): super(TransferPartitionCmd, self).addOptions() self.parser.add_option('-t', '--transfer', action='store', dest='transferInfo') self.parser.add_option('-g', '--group_name', action='store', dest='groupName') def checkOptionsValidity(self, options): ret, errMsg = super(TransferPartitionCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.transferInfo is None: errMsg = "ERROR: transfer info must be specified!" return False, errMsg return True, '' def initMember(self, options): super(TransferPartitionCmd, self).initMember(options) self.groupName = options.groupName self.transferInfo = options.transferInfo def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.transferPartition( self.transferInfo, self.groupName) if self.fromApi: return ret, response, errorMsg if not ret: print "transfer partition failed!" print errorMsg return "", "", 1 print "transfer partition success!" return "", "", 0 class ChangeSlotCmd(base_cmd.BaseCmd): ''' swift {cs|changeSlot} {-z zookeeper_address | --zookeeper=zookeeper_address } {-r role_names | --rolenames=role_names } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -r role_names, --rolenames=role_names : required, role name, eg: default##broker_4_0,tisplus##broker_0_0 Example: swift cs -z zfs://10.250.12.23:1234/root -r group1##broker_1_0,group2##broker_2_0 ''' def addOptions(self): super(ChangeSlotCmd, self).addOptions() self.parser.add_option('-r', '--rolenames', action='store', dest='roleNames') def checkOptionsValidity(self, options): ret, errMsg = super(ChangeSlotCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.roleNames is None: errMsg = "ERROR: roleNames must be specified!" return False, errMsg return True, '' def initMember(self, options): super(ChangeSlotCmd, self).initMember(options) self.roleNames = options.roleNames def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.changeSlot(self.roleNames) if self.fromApi: return ret, response, errorMsg if not ret: print "change slot failed!" print errorMsg return "", "", 1 print "change slot success!" return "", "", 0 class RegisterSchemaCmd(base_cmd.BaseCmd): ''' swift {regscm|registerSchema} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t topic_name, --topic_name : required, topic name -s schema, --schema : required, schema json string -v version, --version : optional, user defined int version -c cover, --cover : optional, to cover old versions or not if schema versions full, default is false Example: swift regscm -z zfs://10.250.12.23:1234/root -t mainse -s "{\"topic\":\"mainse\",\"fields\":[{\"name\":\"nid\",\"type\":\"string\"}]}" ''' def addOptions(self): super(RegisterSchemaCmd, self).addOptions() self.parser.add_option('-t', '--topic', action='store', dest='topicName') self.parser.add_option('-s', '--schema', action='store', dest='schema') self.parser.add_option('-v', '--version', action='store', dest='version') self.parser.add_option('-c', '--cover', action='store_true', dest='cover', default=False) def checkOptionsValidity(self, options): ret, errMsg = super(RegisterSchemaCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.topicName is None: errMsg = "ERROR: topic name must be specified!" return False, errMsg if options.schema is None: errMsg = "ERROR: schema must be specified!" return False, errMsg return True, '' def initMember(self, options): super(RegisterSchemaCmd, self).initMember(options) self.schema = options.schema self.topicName = options.topicName self.version = options.version self.cover = options.cover def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.registerSchema(self.topicName, self.schema, self.version, self.cover) if self.fromApi: return ret, response, errorMsg if not ret: print "register schema failed!" print errorMsg return "", "", 1 print "register schema[%d] success!" % response.version return response.version, "", 0 class GetSchemaCmd(base_cmd.BaseCmd): ''' swift {gscm|getSchemaCmd} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t topic_name, --topic_name : required, topic name -v version, --version : required, schema version Example: swift gscm -z zfs://10.250.12.23:1234/root -t mainse -v 1234 ''' def addOptions(self): super(GetSchemaCmd, self).addOptions() self.parser.add_option('-t', '--topic', action='store', dest='topicName') self.parser.add_option('-v', '--version', action='store', dest='version') def checkOptionsValidity(self, options): ret, errMsg = super(GetSchemaCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.topicName is None: errMsg = "ERROR: topic name must be specified!" return False, errMsg if options.version is None: errMsg = "ERROR: version must be specified!" return False, errMsg return True, '' def initMember(self, options): super(GetSchemaCmd, self).initMember(options) self.topicName = options.topicName self.version = options.version def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.getSchema(self.topicName, self.version) if self.fromApi: return ret, response, errorMsg if not ret: print "get schema failed!" print errorMsg return "", "", 1 print response.schemaInfo return response.schemaInfo, "", 0 class GetTopicRWTimeCmd(base_cmd.BaseCmd): ''' swift {gtrw|getopicrwtime} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -t topic_name, --topic_name : optional, topic name Example: swift gtrw -z zfs://10.250.12.23:1234/root -t mainse ''' def addOptions(self): super(GetTopicRWTimeCmd, self).addOptions() self.parser.add_option('-t', '--topic', action='store', dest='topicName') def checkOptionsValidity(self, options): ret, errMsg = super(GetTopicRWTimeCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg return True, '' def initMember(self, options): super(GetTopicRWTimeCmd, self).initMember(options) self.topicName = options.topicName def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.getTopicRWTime(self.topicName) if self.fromApi: return ret, response, errorMsg if not ret: print errorMsg return "", errorMsg, 1 print response return response, "", 0 class GetLastDeletedNoUseTopicCmd(base_cmd.BaseCmd): ''' swift {gtldnt|getlastdeletednousetopic} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address Example: swift gtldnt -z zfs://10.250.12.23:1234/root ''' def addOptions(self): super(GetLastDeletedNoUseTopicCmd, self).addOptions() def checkOptionsValidity(self, options): ret, errMsg = super(GetLastDeletedNoUseTopicCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg return True, '' def initMember(self, options): super(GetLastDeletedNoUseTopicCmd, self).initMember(options) def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.getLastDeletedNoUseTopic() if self.fromApi: return ret, response, errorMsg if not ret: print errorMsg return "", errorMsg, 1 else: print response return response, "", 0 class GetDeletedNoUseTopicCmd(base_cmd.BaseCmd): ''' swift {gtdnt|getdeletednousetopic} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -f file_name, --file_name : required, deleted topic file name Example: swift gtdnt -z zfs://10.250.12.23:1234/root -f 163234200 ''' def addOptions(self): super(GetDeletedNoUseTopicCmd, self).addOptions() self.parser.add_option('-f', '--file_name', action='store', dest='fileName') def checkOptionsValidity(self, options): ret, errMsg = super(GetDeletedNoUseTopicCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.fileName is None: errMsg = "ERROR: file name must be specified!" return False, errMsg return True, '' def initMember(self, options): super(GetDeletedNoUseTopicCmd, self).initMember(options) self.fileName = options.fileName def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.getDeletedNoUseTopic(self.fileName) if self.fromApi: return ret, response, errorMsg if not ret: print errorMsg return "", errorMsg, 1 else: print response return response, "", 0 class GetDeletedNoUseTopicFilesCmd(base_cmd.BaseCmd): ''' swift {gtdntf|getdeletednousetopicfiles} options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address Example: swift gtdntf -z zfs://10.250.12.23:1234/root ''' def addOptions(self): super(GetDeletedNoUseTopicFilesCmd, self).addOptions() def checkOptionsValidity(self, options): ret, errMsg = super(GetDeletedNoUseTopicFilesCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg return True, '' def initMember(self, options): super(GetDeletedNoUseTopicFilesCmd, self).initMember(options) def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errorMsg = self.adminDelegate.getDeletedNoUseTopicFiles() if self.fromApi: return ret, response, errorMsg if not ret: print errorMsg return "", errorMsg, 1 else: print response return response, "", 0 class TopicAclManageCmd(base_cmd.BaseCmd): ''' swift {tam|topicaclmanage} {-z zookeeper_address | --zookeeper=zookeeper_address } {-a access_operator | --accessop=access_operator } {-t topic_name | --topic=topic_name } {-u access_id | --accessid=access_id } {-k access_key | --accesskey=access_key } {-p access_priority | --accessprior=access_priority } {-y access_type | --accesstype=access_type } {-s check_status | --checkstatus=check_status } options: -z zookeeper_address, --zookeeper=zookeeper_address : required, zookeeper root address -a access_operator, --accessop=access_operator : required, operator to be taken, should be one of ['add', 'delete', 'clear', 'update', 'key_update', 'priority_update', 'type_update', 'status', 'list', 'listall'] -t topic_name, --topic=topic_name : required except accessop='listall', topic name to be operated -u access_id, --accessid=access_id : required when accessop='add', '*update', 'delete', access id to be operated -k access_key, --accesskey=access_key : required when accessop='add', 'key_update', new access key to be set -p access_priority, --accessprior=access_priority : required when accessop='priority_update', new priority to be set, should be one of ['p0', 'p1',...,'p5'] -y access_type, --accesstype=access_type : required when accessop='type_update', new access type to be set, should be one of ['read_only', 'read_write', 'none'] -s check_status, --checkstatus=check_status : required when accessop='status', check status to be set, should be one of ['on', 'off'] ''' def addOptions(self): super(TopicAclManageCmd, self).addOptions() self.parser.add_option('-a', '--accessop', type='choice', action='store', dest='accessOp', choices=['add', 'delete', 'clear', 'update', 'key_update', 'priority_update', 'type_update', 'status', 'list', 'listall']) self.parser.add_option('-t', '--topic', action='store', dest="topicName") self.parser.add_option('-u', '--accessid', action='store', dest='accessId') self.parser.add_option('-k', '--accesskey', action='store', dest='accessKey') self.parser.add_option('-p', '--accessprior', type='choice', action='store', dest='accessPriority', choices=['p0', 'p1', 'p2', 'p3', 'p4', 'p5']) self.parser.add_option('-y', '--accesstype', type='choice', action='store', dest='accessType', choices=['read_only', 'read_write', 'none']) self.parser.add_option('-s', '--check_status', type='choice', action='store', dest='checkStatus', choices=['on', 'off']) def checkOptionsValidity(self, options): ret, errMsg = super(TopicAclManageCmd, self).checkOptionsValidity(options) if not ret: return False, errMsg if options.accessOp is None: errMsg = "ERROR: accessop must be specified" return False, errMsg if options.topicName is None and options.accessOp != 'listall': errMsg = "ERROR: topic must be specified when %s" % options.accessOp return False, errMsg if options.accessId is None and "update" in options.accessOp: errMsg = "ERROR: accessid must be specified when %s" % options.accessOp return False, errMsg if options.accessId is None and options.accessOp == 'delete': errMsg = "ERROR: accessid must be specified when delete" return False, errMsg if options.accessOp == 'add' and (options.accessId is None or options.accessKey is None): errMsg = "ERROR: accessid/accesskey must be specified when add" return False, errMsg if options.accessOp == 'key_update' and options.accessKey is None: errMsg = "ERROR: accesskey must be specified when key_update" return False, errMsg if options.accessOp == 'priority_update' and options.accessPriority is None: errMsg = "ERROR: accessprior must be specified when priority_update" return False, errMsg if options.accessOp == 'type_update' and options.accessType is None: errMsg = "ERROR: accesstype must be specified when type_update" return False, errMsg if options.accessOp == 'status' and options.checkStatus is None: errMsg = "ERROR: checkstatus must be specified when status" return False, errMsg return True, '' def initMember(self, options): super(TopicAclManageCmd, self).initMember(options) if options.accessOp == 'add': self.accessOp = ADD_TOPIC_ACCESS elif options.accessOp == 'delete': self.accessOp = DELETE_TOPIC_ACCESS elif options.accessOp == 'update': self.accessOp = UPDATE_TOPIC_ACCESS elif options.accessOp == 'key_update': self.accessOp = UPDATE_TOPIC_ACCESS_KEY elif options.accessOp == 'priority_update': self.accessOp = UPDATE_TOPIC_ACCESS_PRIORITY elif options.accessOp == 'type_update': self.accessOp = UPDATE_TOPIC_ACCESS_TYPE elif options.accessOp == 'status': self.accessOp = UPDATE_TOPIC_AUTH_STATUS elif options.accessOp == 'clear': self.accessOp = CLEAR_TOPIC_ACCESS elif options.accessOp == 'list': self.accessOp = LIST_ONE_TOPIC_ACCESS elif options.accessOp == 'listall': self.accessOp = LIST_ALL_TOPIC_ACCESS self.topicName = options.topicName self.accessId = options.accessId self.accessKey = options.accessKey self.accessPriority = None if options.accessPriority == 'p0': self.accessPriority = ACCESS_PRIORITY_P0 elif options.accessPriority == 'p1': self.accessPriority = ACCESS_PRIORITY_P1 elif options.accessPriority == 'p2': self.accessPriority = ACCESS_PRIORITY_P2 elif options.accessPriority == 'p3': self.accessPriority = ACCESS_PRIORITY_P3 elif options.accessPriority == 'p4': self.accessPriority = ACCESS_PRIORITY_P4 elif options.accessPriority == 'p5': self.accessPriority = ACCESS_PRIORITY_P5 self.accessType = None if options.accessType == 'read_only': self.accessType = TOPIC_ACCESS_READ elif options.accessType == 'read_write': self.accessType = TOPIC_ACCESS_READ_WRITE elif options.accessType == "none": self.accessType = TOPIC_ACCESS_NONE self.checkStatus = None if options.checkStatus == 'on': self.checkStatus = TOPIC_AUTH_CHECK_ON else: self.checkStatus = TOPIC_AUTH_CHECK_OFF def buildDelegate(self): self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate( self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd) return True def run(self): ret, response, errMsg = self.adminDelegate.topicAclManage( accessOp=self.accessOp, topicName=self.topicName, accessId=self.accessId, accessKey=self.accessKey, accessPriority=self.accessPriority, accessType=self.accessType, checkStatus=self.checkStatus) if not ret: print "topic acl manage failed" return ret, response, 1 print "topic acl manage success" if self.accessOp == LIST_ONE_TOPIC_ACCESS or self.accessOp == LIST_ALL_TOPIC_ACCESS: self.parseAndPrintTopicAcl(response) return ret, response, 0 def parseAndPrintTopicAcl(self, response): topicAclDatas = response.topicAclDatas.allTopicAclData toFormatStr = '{"topicName":"%s", "checkStatus":"%s", "topicAccessInfos":[%s]}' for topicAclData in topicAclDatas: accessInfoList = [] accessInfoStr = '{"accessId":"%s", "accessKey":"%s", "priority":"%s", "accessType":"%s"}' for topicAccessInfo in topicAclData.topicAccessInfos: oneItem = accessInfoStr % (topicAccessInfo.accessAuthInfo.accessId, topicAccessInfo.accessAuthInfo.accessKey, self._accessPriorityToStr(topicAccessInfo.accessPriority), self._accessTypeToStr(topicAccessInfo.accessType)) accessInfoList.append(oneItem) accessInfosStr = ",".join(accessInfoList) oneItem = toFormatStr % (topicAclData.topicName, self._checkStatusToStr(topicAclData.checkStatus), accessInfosStr) print oneItem def _accessPriorityToStr(self, priority): if priority == ACCESS_PRIORITY_P0: return "p0" elif priority == ACCESS_PRIORITY_P1: return "p1" elif priority == ACCESS_PRIORITY_P2: return "p2" elif priority == ACCESS_PRIORITY_P3: return "p3" elif priority == ACCESS_PRIORITY_P4: return "p4" elif priority == ACCESS_PRIORITY_P5: return "p5" def _accessTypeToStr(self, type): if type == TOPIC_ACCESS_NONE: return "none" elif type == TOPIC_ACCESS_READ: return "read_only" elif type == TOPIC_ACCESS_READ_WRITE: return "read_write" def _checkStatusToStr(self, status): if status == TOPIC_AUTH_CHECK_ON: return "on" else: return "off"