import os

import base_cmd
import swift_common_define
import swift_admin_delegate
import status_analyzer
import swift_util
from swift.protocol.Common_pb2 import *
from swift.protocol.AdminRequestResponse_pb2 import *
import transport_cmd_base
import time
import swift_broker_delegate
import local_file_util
import zlib
import json_wrapper


class TopicAddCmd(base_cmd.BaseCmd):
    '''
    swift {at|addtopic}
       {-z zookeeper_address           | --zookeeper=zookeeper_address }
       {-t topic_name                  | --topic=topic_name }
       {-c partition_count             | --partcount=partition_count }
       {-R range_count                 | --rangecount=range_count }
       {-s partition_buf_size          | --partbuf=partition_buf_size }
       {-b partition_file_buf_size     | --partfilebuf=partition_file_buf_size }
       {-y partition_max_buf_size      | --partmaxbuf=partition_max_buf_size }
       {-x partition_min_buf_size      | --partminbuf=partition_min_buf_size }
       {-r partition_resource          | --partresource=partition_resource }
       {-l partition_limit             | --partlimit=partition_limit }
       {-m topic_mode                  | --topicmode=topic_mode }
       {-f                             | --fieldfilter }
       {-S                             | --needSchema }
       {-i obsolete_file_interval      | --obsoletefileinterval=obsolete_file_interval }
       {-n reserved_file_count         | --reservedfilecount=reserved_file_count }
       {-d                             | --deletetopicdata }
       {-o security_commit_time        | --committime=security_commit_time }
       {-p security_commit_data        | --commitdata=security_commit_data }
       {-O owners                      | --owners=owners }
       {-q                             | --compress=compress_single_msg }
       {-u                             | --compressthres=compress_single_msg_threshold }
       {-a                             | --dfsroot=dfs_root }
       {-e                             | --extenddfsroot=extend_dfs_root }
       {-g                             | --topicgroup=topic_group }
       {-j                             | --expired=expired }
       {-E                             | --sealed=sealed }
       {-Y                             | --topicType=topic_type }
       {-P                             | --physicTopicLst=physic_topic_list}
       {-N                             | --enableTTLDel=enable_ttl_del}
       {-G                             | --enableLongPolling=enable_long_polling}
       {-v                             | --versionControl=version_control}
       {-M                             | --enableMergeData=enable_merge_data}
       {-I                             | --permitUser=permitUser}
       {-Z                             | --readnotcommitmsg=read_notcommit_msg}

    options:
       -z zookeeper_address,      --zookeeper=zookeeper_address         : required, zookeeper root address
       -t topic_name,             --topic=topic_name                    : required, topic name, eg: news
       -c partition_count,        --partcount=partition_count           : required, partition count of the topic
       -R range_count,            --rangecount=range_count              : optional, range count in one partion, default is 4,
       -s partition_buf_size      --partbuf=partition_buf_size          : [deprecated], partition buffer size of the topic, M
       -b partition_file_buf_size --partfilebuf=partition_file_buf_size : [deprecated], partition read file buffer size of the topic, M
       -x partition_min_buf_size  --partmaxbuf=partition_max_buf_size   : [optional], partition max buffer size of the topic, M
       -y partition_max_buf_size  --partminbuf=partition_min_buf_size   : [optional], partition min buffer size of the topic, M
       -r partition_resource      --partresource=partition_resource     : optional, partition resource[1,1000], default is 100
       -l partition_limit         --partlimit=partition_limit           : optional, partition limit of this topic in one broker, default is no limit
       -m topic_mode              --topicmode=topic_mode                : optional, normal_mode | security_mode | memory_only_mode | memory_prefer_mode, default is normal_mode
       -f                         --fieldfilter                         : optional, filter fields in msg if specialized
       -i obsolete_file_interval  --obsoletefileinterval=obsolete_file_interval : optional, obsolete file time interval, unit is hour.
       -n reserved_file_count     --reservedfilecount=reserved_file_count       : optional, reserved file count.
       -d                         --deletetopicdata                     : optional, delete old topic data if specialized
       -o security_commit_time    --committime=security_commit_time     : optional, max wait time for commit message in security_mode
       -p security_commit_data    --commitdata=security_commit_data     : optional, max data size for commit message in security_mode
       -O owners                  --owners=owners                       : optional, topic owners, multi owners seperate by ,
       -S                         --needschema                          : optional, schema topic
 if specialized
       -V schema_versions         --schema_versions                     : optional, topic schema verisons, must exist, versions seperate by ','
       -q                         --compress=compress_single_msg        : optional, compress msg in this topic
       -u                         --compressthres=compress_single_msg_threshold        : optional, compress msg great than threshold [default:2048]
       -a                         --dfsroot=dfs_root                    : optional, dfs_root
       -e                         --extenddfsroot=extend_dfs_root       : optional, extend dfs_root add old data when dfs root changed
       -g                         --topicgroup=topic_group              : optional, topic_group
       -j                         --expired=expired                     : optional, topic expired time, auto delete topic after expired, second
       -T                         --request_time_out=optional           : optional, timeout for request, ms
       -E                         --sealed                              : optional, is sealed topic or not
       -Y                         --topicType=topic_type                : optional, TOPIC_TYPE_NORMAL | TOPIC_TYPE_PHYSIC | TOPIC_TYPE_LOGIC | TOPIC_TYPE_LOGIC_PHYSIC, default is TOPIC_TYPE_NORMAL
       -P                         --physicTopicLst=physic_topic_list    : optional, physic topic list, seperate by ','
       -N                         --enableTTLDel=enable_ttl_del           : optional, enable TTL delete or not
       -G                         --enable_long_polling=enableLongPolling : optional, enable long polling
       -v                         --versionControl=version_control      : optional, True or False
       -M                         --enableMergeData=enable_merge_data   : optional, enable merge data
       -I                         --permitUser=permitUser               : optional, users allowed to operate this topic, seperate by ','
       -Z                         --readnotcommitmsg=read_notcommit_msg : optional, whether topic can read not committed msg, default is true

Example:
    swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -x 10
    swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -y 100 -b 128
    swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -x 10 -y 100 -r 30
    swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -r 30 -m security_mode -f
    swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -r 30 -i 1 -n 5
    swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -r 30 -l 2
    swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -r 30 -l 2 -q
    swift at -z zfs://10.250.12.23:1234/root -t news -c 1 -j 1000 -O Lilei,Hanmeimei
    swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -a hdfs://xxx/1 -e hdfs://xxx/2,hdfs://xxx/3
    swift at -z zfs://10.250.12.23:1234/root -t news -c 20 -s 10 [deprecated]
    swift at -z zfs://10.250.12.23:1234/root -t news --permitUser=Hanmeimei,Lilei
    '''

    def addOptions(self):
        super(TopicAddCmd, self).addOptions()
        self.parser.add_option('-t', '--topic', action='store', dest='topicName')
        self.parser.add_option('-c', '--partcount', type='int',
                               action='store', dest='partCnt')
        self.parser.add_option('-R', '--rangecount', type='int',
                               action='store', dest='rangeCnt')
        self.parser.add_option('-s', '--partbuf', type='int',
                               action='store', dest='partBuf')
        self.parser.add_option('-b', '--partfilebuf', type='int',
                               action='store', dest='partFileBuf')
        self.parser.add_option('-x', '--partminbuf', type='int',
                               action='store', dest='partMinBuf')
        self.parser.add_option('-y', '--partmaxbuf', type='int',
                               action='store', dest='partMaxBuf')
        self.parser.add_option('-r', '--partresource', type='int',
                               action='store', dest='partResource')
        self.parser.add_option('-l', '--partlimit', type='int',
                               action='store', dest='partLimit')
        self.parser.add_option('-w', '--waittopicready', type='int',
                               action='store', dest='timeout')
        self.parser.add_option(
            '-m',
            '--topicmode',
            type='choice',
            action='store',
            dest='topicMode',
            choices=[
                'normal_mode',
                'security_mode',
                'memory_only_mode',
                'memory_prefer_mode',
                'persist_data_mode'],
            default='normal_mode')
        self.parser.add_option('-f', '--fieldfilter',
                               action='store_true',
                               dest='fieldFilter',
                               default=False)
        self.parser.add_option('-i', '--obsoletefileinterval', type='int',
                               action='store', dest='obsoleteFileInterval')
        self.parser.add_option('-n', '--reservedfilecount', type='int',
                               action='store', dest='reservedFileCount')
        self.parser.add_option('-d', '--deletetopicdata',
                               action='store_true',
                               dest='deleteTopicData',
                               default=False)
        self.parser.add_option('-o', '--committime', type='int',
                               action='store', dest='commitTime')
        self.parser.add_option('-p', '--commitdata', type='int',
                               action='store', dest='commitData')
        self.parser.add_option('-O', '--owners', action='store', dest='owners')
        self.parser.add_option('-q', '--compress',
                               action='store_true',
                               dest='compressMsg',
                               default=False)
        self.parser.add_option('-u', '--compressthres',
                               action='store_true',
                               dest='compressThres')
        self.parser.add_option('-a', '--dfsRoot', action='store',
                               dest='dfsRoot')
        self.parser.add_option('-e', '--extendDfsRoot', action='store',
                               dest='extendDfsRoot')
        self.parser.add_option('-g', '--topicGroup', action='store',
                               dest='topicGroup')
        self.parser.add_option('-j', '--expired', type='int',
                               action='store', dest='expiredTime')
        self.parser.add_option('-S', '--needschema',
                               action='store_true',
                               dest='needSchema',
                               default=False)
        self.parser.add_option('-V', '--schema_versions', action='store', dest='schemaVersions')
        self.parser.add_option('-T', '--request_time_out', type='int',
                               action='store', dest='requestTimeout')
        self.parser.add_option('-E', '--sealed', action='store',
                               dest='sealed', default='false')
        self.parser.add_option('-Y', '--topic_type', type='choice',
                               action='store', dest='topicType',
                               choices=['TOPIC_TYPE_NORMAL', 'TOPIC_TYPE_PHYSIC',
                                        'TOPIC_TYPE_LOGIC', 'TOPIC_TYPE_LOGIC_PHYSIC'],
                               default='TOPIC_TYPE_NORMAL')
        self.parser.add_option('-P', '--physic_topic_list', action='store',
                               dest='physicTopicLst')
        self.parser.add_option('-N', '--enable_ttl_del', action='store',
                               dest='enableTTLDel', default='true')
        self.parser.add_option('-G', '--enable_long_polling', action='store',
                               dest='enableLongPolling', default='false')
        self.parser.add_option('-v', '--version_control', action='store',
                               dest='versionControl', default='false')
        self.parser.add_option('-M', '--enable_merge_data', action='store',
                               dest='enableMergeData', default='false')
        self.parser.add_option('-I', '--permitUser', action='store',
                               dest='permitUser', default='')
        self.parser.add_option('-Z', '--readnotcommitmsg', action='store',
                               dest='readNotCommmitMsg', default='true')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(TopicAddCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg

        if options.topicName is None:
            errMsg = "ERROR: topic must be specified!"
            return False, errMsg

        if options.partCnt is None:
            errMsg = "ERROR: partition count for topic[%s] must be specified!"\
                     % options.topicName
            return False, errMsg

        if options.partCnt <= 0:
            errMsg = "ERROR: partition count [%d] must greater than zero" \
                     % options.partCnt
            return False, errMsg

        if options.rangeCnt is not None and options.rangeCnt < 1:
            errMsg = "ERROR: range count [%d] must greater than zero" \
                     % options.rangeCnt
            return False, errMsg

        if options.partBuf is not None:
            errMsg = "WARN: partition buffer is deprecated. using partminbuf and partmaxbuf!"

        if options.partBuf is not None and options.partBuf <= 0:
            errMsg = "ERROR: partition buffer [%d] must greater than zero"\
                     % options.partBuf
            return False, errMsg

        if options.partMaxBuf is None and options.partMinBuf is None and options.partBuf is not None:
            options.partMinBuf = options.partBuf / 2
            if options.partMinBuf == 0:
                options.partMinBuf = 1
            options.partMaxBuf = options.partBuf * 2
            errMsg += " WARN: partmaxbuf and partminbuf is not setted, using partbuf value to"\
                      "set partminbuf %d and partmaxbuf %d!"\
                      % (options.partMinBuf, options.partMaxBuf)

        if options.partMaxBuf is not None and options.partMaxBuf <= 0:
            errMsg = "ERROR: partition max buffer [%d] must greater than zero"\
                     % options.partMaxBuf
            return False, errMsg

        if options.partMinBuf is not None and options.partMinBuf <= 0:
            errMsg = "ERROR: partition min buffer [%d] must greater than zero"\
                     % options.partMinBuf
            return False, errMsg

        if options.partMinBuf is not None and options.partMaxBuf is not None and options.partMinBuf > options.partMaxBuf:
            errMsg = "ERROR: partition min buffer [%d] must not greater than partition"\
                     " max buff [%d]" % (options.partMinBuf, options.partMaxBuf)
            return False, errMsg

        if options.partResource is not None and options.partResource <= 0:
            errMsg = "ERROR: partition resource [%d] must greater than zero" \
                     % options.partResource
            return False, errMsg

        if options.partLimit is not None and options.partLimit <= 0:
            errMsg = "ERROR: partition limit [%d] must greater than zero" % (
                options.partLimit)
            return False, errMsg

        if options.obsoleteFileInterval is not None and options.obsoleteFileInterval <= 0:
            errMsg = "ERROR: obsolete file time interval [%d] must greater than zero" % (
                options.obsoleteFileInterval)
            return False, errMsg

        if options.reservedFileCount is not None and options.reservedFileCount <= 0:
            errMsg = "ERROR: reserved file count [%d] must greater than zero" % (
                options.reservedFileCount)
            return False, errMsg

        if options.partFileBuf is not None:
            errMsg += " WARN: setting partition file buffer has deprecated."

        if options.commitData is not None and options.commitData < 0:
            errMsg = "ERROR: security max commit data should not less than zero [%d]." \
                     % options.commitData
            return False, errMsg

        if options.commitTime is not None and options.commitTime < 0:
            errMsg = "ERROR: security max commit time should not less than zero [%d]." \
                     % options.commitTime
            return False, errMsg

        if options.expiredTime is not None and options.expiredTime < -1:
            errMsg = "ERROR: expired time should not less than -1 [%d]." \
                     % options.commitTime
            return False, errMsg

        if options.fieldFilter is not None and options.fieldFilter \
           and options.needSchema is not None and options.needSchema:
            errMsg = "ERROR: cannot set both field filter and need schema"
            return False, errMsg

        if options.requestTimeout is not None and options.requestTimeout < 0:
            errMsg = "ERROR: expired time should larger than 0[%d]." \
                     % options.requestTimeout
            return False, errMsg

        if options.sealed is not None and options.sealed not in ['true', 'false']:
            errMsg = "ERROR: sealed can only set ['true', 'false']"
            return False, errMsg

        if options.enableTTLDel is not None and options.enableTTLDel not in ['true', 'false']:
            errMsg = "ERROR: enableTTLDel can only set ['true', 'false']"
            return False, errMsg

        if options.enableLongPolling is not None and options.enableLongPolling not in ['true', 'false']:
            errMsg = "ERROR, enableLongPolling can only set ['true', 'false']"
            return False, errMsg

        if options.enableMergeData is not None and options.enableMergeData not in ['true', 'false']:
            errMsg = "ERROR, enableMergeData can only set ['true', 'false']"
            return False, errMsg

        if options.readNotCommmitMsg is not None and options.readNotCommmitMsg not in ['true', 'false']:
            errMsg = "ERROR, readNotCommmitMsg can only set ['true', 'false']"
            return False, errMsg

        return True, errMsg

    def initMember(self, options):
        super(TopicAddCmd, self).initMember(options)
        self.topicName = options.topicName
        self.partCnt = options.partCnt
        self.rangeCnt = options.rangeCnt
        self.partBuf = options.partBuf
        self.partFileBuf = options.partFileBuf
        self.partMinBuf = options.partMinBuf
        self.partMaxBuf = options.partMaxBuf
        self.partResource = options.partResource
        self.partLimit = options.partLimit
        self.timeout = options.timeout
        self.needFieldFilter = options.fieldFilter
        if options.topicMode == "normal_mode":
            self.topicMode = TOPIC_MODE_NORMAL
        elif options.topicMode == "security_mode":
            self.topicMode = TOPIC_MODE_SECURITY
        elif options.topicMode == "memory_only_mode":
            self.topicMode = TOPIC_MODE_MEMORY_ONLY
        elif options.topicMode == "memory_prefer_mode":
            self.topicMode = TOPIC_MODE_MEMORY_PREFER
        elif options.topicMode == "persist_data_mode":
            self.topicMode = TOPIC_MODE_PERSIST_DATA
        self.obsoleteFileInterval = options.obsoleteFileInterval
        self.reservedFileCount = options.reservedFileCount
        self.deleteTopicData = options.deleteTopicData
        self.commitTime = options.commitTime
        self.commitData = options.commitData
        self.compressMsg = options.compressMsg
        self.compressThres = options.compressThres
        self.dfsRoot = options.dfsRoot
        self.topicGroup = options.topicGroup
        self.extendDfsRoot = options.extendDfsRoot
        self.expiredTime = options.expiredTime
        self.owners = options.owners
        self.needSchema = options.needSchema
        self.schemaVersions = options.schemaVersions
        self.requestTimeout = options.requestTimeout
        self.sealed = True if (options.sealed.lower() == 'true') else False
        if options.topicType == "TOPIC_TYPE_NORMAL":
            self.topicType = TOPIC_TYPE_NORMAL
        elif options.topicType == "TOPIC_TYPE_PHYSIC":
            self.topicType = TOPIC_TYPE_PHYSIC
        elif options.topicType == "TOPIC_TYPE_LOGIC":
            self.topicType = TOPIC_TYPE_LOGIC
        elif options.topicType == "TOPIC_TYPE_LOGIC_PHYSIC":
            self.topicType = TOPIC_TYPE_LOGIC_PHYSIC
        self.physicTopicLst = options.physicTopicLst
        self.enableTTLDel = False if (options.enableTTLDel.lower() == 'false') else True
        self.enableLongPolling = True if (options.enableLongPolling.lower() == 'true') else False
        self.versionControl = True if (options.versionControl.lower() == 'true') else False
        self.enableMergeData = True if (options.enableMergeData.lower() == 'true') else False
        self.permitUser = options.permitUser
        self.readNotCommmitMsg = False if (options.readNotCommmitMsg.lower() == 'false') else True

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.createTopic(
            topicName=self.topicName,
            partCnt=self.partCnt, rangeCnt=self.rangeCnt, partBufSize=self.partBuf,
            partMinBufSize=self.partMinBuf, partMaxBufSize=self.partMaxBuf,
            partResource=self.partResource, partLimit=self.partLimit,
            topicMode=self.topicMode, needFieldFilter=self.needFieldFilter,
            obsoleteFileTimeInterval=self.obsoleteFileInterval,
            reservedFileCount=self.reservedFileCount,
            partFileBufSize=self.partFileBuf,
            deleteTopicData=self.deleteTopicData,
            securityCommitTime=self.commitTime,
            securityCommitData=self.commitData,
            compressMsg=self.compressMsg,
            compressThres=self.compressThres,
            dfsRoot=self.dfsRoot,
            topicGroup=self.topicGroup,
            extendDfsRoot=self.extendDfsRoot,
            expiredTime=self.expiredTime,
            owners=self.owners,
            needSchema=self.needSchema,
            schemaVersions=self.schemaVersions,
            requestTimeout=self.requestTimeout,
            sealed=self.sealed,
            topicType=self.topicType,
            physicTopicLst=self.physicTopicLst,
            enableTTLDel=self.enableTTLDel,
            enableLongPolling=self.enableLongPolling,
            versionControl=self.versionControl,
            enableMergeData=self.enableMergeData,
            permitUser=self.permitUser,
            readNotCommmitMsg=self.readNotCommmitMsg)
        if not ret:
            print "Add topic Failed!"
            return ret, response, 1

        print time.time(), "Add topic success!"
        if self.timeout is not None:
            print time.time(), "wait topic ready."
            return self._waitTopicReady(self.timeout)

        return ret, response, 0

    def _waitTopicReady(self, timeout):
        while timeout > 0:
            if self._isTopicReady(self.topicName):
                print time.time(), "topic [%s] is ready" % self.topicName
                return "", "", 0
            timeout -= 0.1
            time.sleep(0.1)
        return "", "wait topic ready timeout.", -1

    def _isTopicReady(self, topicName):
        ret, response, errMsg = self.adminDelegate.getTopicInfo(topicName)
        if ret:
            if response.HasField(swift_common_define.PROTO_TOPIC_INFO):
                topicInfo = response.topicInfo
                for partitionInfo in topicInfo.partitionInfos:
                    if partitionInfo.status != PARTITION_STATUS_RUNNING:
                        print "ERROR: partition info status: %s, response: %s" % \
                            (str(partitionInfo), str(response))
                        return False
            else:
                print "ERROR: response does not have topicInfo field! response: %s" % str(response)
                return False
        else:
            print "get topic info failed: %s, errorMsg: %s" % (topicName, errMsg)
            return False
        return True


class TopicBatchAddCmd(base_cmd.BaseCmd):
    '''
    swift {atb|addtopicbatch}
       {-z zookeeper_address           | --zookeeper=zookeeper_address }
       {-t topic_name                  | --topic=topic_name }
       {-c partition_count             | --partcount=partition_count }
       {-I                             | --permitUsers=permitUsers}
    options:
       -z zookeeper_address,      --zookeeper=zookeeper_address         : required, zookeeper root address
       -t topic_names,            --topic=topic_names                    : required, topic name, eg: news,old
       -c partition_count,        --partcount=partition_count           : required, partition count of the topic
       -I permitUsers,            --permitUsers=permitUsers             : optional, define same as addtopic, seperate by \\;
Example:
    swift at -z zfs://10.250.12.23:1234/root -t news\\;old -c 20\\;3 -x 10\\;20 -O wls,sdd,xieq\\;wls
    separator is ; and do not forget escape \\ when needed
    '''

    def addOptions(self):
        super(TopicBatchAddCmd, self).addOptions()
        self.parser.add_option('-t', '--topic', action='store', dest='topicNames')
        self.parser.add_option('-c', '--partcount', action='store', dest='partCnt')
        self.parser.add_option('-R', '--rangecount', action='store', dest='rangeCnt')
        self.parser.add_option('-b', '--partfilebuf', action='store',
                               dest='partFileBuf')
        self.parser.add_option('-x', '--partminbuf', action='store', dest='partMinBuf')
        self.parser.add_option('-y', '--partmaxbuf', action='store', dest='partMaxBuf')
        self.parser.add_option('-r', '--partresource', action='store', dest='partResource')
        self.parser.add_option('-l', '--partlimit', action='store', dest='partLimit')
        self.parser.add_option('-w', '--waittopicready', type='int',
                               action='store', dest='timeout')
        self.parser.add_option('-m', '--topicmode',
                               action='store', dest='topicMode')
        # choices=['normal_mode', 'security_mode', 'memory_only_mode', 'memory_prefer_mode']
        self.parser.add_option('-f', '--fieldfilter',
                               action='store',
                               dest='fieldFilter')
        # default=False
        self.parser.add_option('-i', '--obsoletefileinterval',
                               action='store', dest='obsoleteFileInterval')
        self.parser.add_option('-n', '--reservedfilecount',
                               action='store', dest='reservedFileCount')
        self.parser.add_option('-d', '--deletetopicdata',
                               action='store',
                               dest='deleteTopicData')
        # default=False
        self.parser.add_option('-o', '--committime',
                               action='store', dest='commitTime')
        self.parser.add_option('-p', '--commitdata',
                               action='store', dest='commitData')
        self.parser.add_option('-O', '--owners', action='store', dest='owners')
        self.parser.add_option('-q', '--compress',
                               action='store',
                               dest='compressMsg')
        # default=False
        self.parser.add_option('-u', '--compressthres',
                               action='store',
                               dest='compressThres')
        self.parser.add_option('-a', '--dfsRoot', action='store',
                               dest='dfsRoot')
        self.parser.add_option('-e', '--extendDfsRoot', action='store',
                               dest='extendDfsRoot')
        self.parser.add_option('-g', '--topicGroup', action='store',
                               dest='topicGroup')
        self.parser.add_option('-j', '--expired',
                               action='store', dest='expiredTime')
        self.parser.add_option('-I', "--permitUsers", action="store", dest="permitUsers")
        self.parser.add_option('-Z', '--readnotcommitmsg', action='store', dest='readNotCommmitMsg')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(TopicBatchAddCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg

        if options.topicNames is None:
            errMsg = "ERROR: topic names must be specified!"
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(TopicBatchAddCmd, self).initMember(options)
        self.topicNames = options.topicNames
        self.partCnt = options.partCnt
        self.rangeCnt = options.rangeCnt
        self.partFileBuf = options.partFileBuf
        self.partMinBuf = options.partMinBuf
        self.partMaxBuf = options.partMaxBuf
        self.partResource = options.partResource
        self.partLimit = options.partLimit
        self.timeout = options.timeout
        self.needFieldFilter = options.fieldFilter
        self.topicMode = options.topicMode
        self.obsoleteFileInterval = options.obsoleteFileInterval
        self.reservedFileCount = options.reservedFileCount
        self.deleteTopicData = options.deleteTopicData
        self.commitTime = options.commitTime
        self.commitData = options.commitData
        self.compressMsg = options.compressMsg
        self.compressThres = options.compressThres
        self.dfsRoot = options.dfsRoot
        self.topicGroup = options.topicGroup
        self.extendDfsRoot = options.extendDfsRoot
        self.expiredTime = options.expiredTime
        self.owners = options.owners
        self.permitUsers = options.permitUsers
        self.readNotCommmitMsg = options.readNotCommmitMsg

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.createTopicBatch(
            topicNames=self.topicNames,
            partCnt=self.partCnt, rangeCnt=self.rangeCnt,
            partMinBufSize=self.partMinBuf, partMaxBufSize=self.partMaxBuf,
            partResource=self.partResource, partLimit=self.partLimit,
            topicMode=self.topicMode, needFieldFilter=self.needFieldFilter,
            obsoleteFileTimeInterval=self.obsoleteFileInterval,
            reservedFileCount=self.reservedFileCount,
            partFileBufSize=self.partFileBuf,
            deleteTopicData=self.deleteTopicData,
            securityCommitTime=self.commitTime,
            securityCommitData=self.commitData,
            compressMsg=self.compressMsg,
            compressThres=self.compressThres,
            dfsRoot=self.dfsRoot,
            topicGroup=self.topicGroup,
            extendDfsRoot=self.extendDfsRoot,
            expiredTime=self.expiredTime,
            owners=self.owners,
            permitUsers=self.permitUsers,
            readNotCommmitMsg=self.readNotCommmitMsg)
        if not ret:
            print "Add topic Failed!"
            return ret, response, 1

        print "Add topic success!"
        if self.timeout is not None:
            print "wait topic ready."
            return self._waitTopicReady(self.timeout)

        return ret, response, 0

    def _waitTopicReady(self, timeout):
        topicLst = self.topicNames.split(';')
        while timeout > 0:
            readyCount = 0
            for topic in topicLst:
                if self._isTopicReady(topic):
                    print "topic [%s] is ready" % topic
                    readyCount += 1
                else:
                    print "topic [%s] is not ready" % topic
                if readyCount == len(topicLst):
                    return "", "", 0
            timeout -= 0.1
            time.sleep(0.1)
        return "", "wait topic ready timeout.", -1

    def _isTopicReady(self, topicName):
        ret, response, errMsg = self.adminDelegate.getTopicInfo(topicName)
        if ret:
            if response.HasField(swift_common_define.PROTO_TOPIC_INFO):
                topicInfo = response.topicInfo
                for partitionInfo in topicInfo.partitionInfos:
                    if partitionInfo.status != PARTITION_STATUS_RUNNING:
                        print "ERROR: partition info status: %s, response: %s" % \
                            (str(partitionInfo), str(response))
                        return False
            else:
                print "ERROR: response does not have topicInfo field! response: %s" % str(response)
                return False
        else:
            print "get topic info failed: %s, errorMsg: %s" % (topicName, errMsg)
            return False
        return True


class TopicModifyCmd(base_cmd.BaseCmd):
    '''
    swift {mt|modifytopic}
       {-z zookeeper_address           | --zookeeper=zookeeper_address }
       {-t topic_name                  | --topic=topic_name }
       {-r partition_resource          | --partresource=partition_resource }
       {-l partition_limit             | --partlimit=partition_limit }
       {-g topic_group                 | --group=topic_group }
       {-j expired_time                | --expired_time=expired_time }
       {-m topic_mode                  | --topicmode=topic_mode }
       {-c partition_count             | --partcount=partition_count }
       {-C partition_range_count       | --partrangecount=partition_range_count }
       {-y partition_max_buf_size      | --partmaxbuf=partition_max_buf_size }
       {-x partition_min_buf_size      | --partminbuf=partition_min_buf_size }
       {-i obsolete_file_interval      | --obsoletefileinterval=obsolete_file_interval }
       {-n reserved_file_count         | --reservedfilecount=reserved_file_count }
       {-o security_commit_time        | --committime=security_commit_time }
       {-p security_commit_data        | --commitdata=security_commit_data }
       {-O owners                      | --owners=owners }
       {-a                             | --dfsroot=dfs_root }
       {-e                             | --extenddfsroot=extend_dfs_root }
       {-E                             | --sealed=sealed }
       {-Y                             | --topicType=topic_type }
       {-P                             | --physicTopicLst=physic_topic_list}
       {-N                             | --enableTTLDel=enable_ttl_del}
       {-L                             | --readSizeLimitSec=read_size_limit_sec}
       {-G                             | --enableLongPolling=enable_long_polling}
       {-M                             | --enableMergeData=enable_merge_data}
       {-I permitUser                  | --permitUser=permitUser }
       {-v version_controller          | --versionControl=version_control }
       {-Z                             | --readnotcommitmsg=read_notcommit_msg}
    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address     : required, zookeeper root address
       -t topic_names,        --topic=topic_names               : required, topic name, eg: news,news1
       -f topic_file,         --topic_file=topic_file           : optional, topic file, will append in topic_names, one topic per line
       -r partition_resource  --partresource=partition_resource : optional, partition resource
       -l partition_limit     --partlimit=partition_limit       : optional, partition limit of this topic in one broker
       -g topic_group         --group=topic_group               : optional, change topic group name
       -j expired_time        --expired_time=expired_time       : optional, change expired time for topic, -1 no expired,  second
       -m topic_mode              --topicmode=topic_mode                : optional, normal_mode | security_mode | memory_only_mode | memory_prefer_mode, default is normal_mode
       -c partition_count,        --partcount=partition_count           : optional, partition count of the topic
       -C partition_range_count,  --partrangecount=partition_range_count  : optional, partition range count of the topic
       -x partition_min_buf_size  --partmaxbuf=partition_max_buf_size   : [optional], partition max buffer size of the topic, M
       -y partition_max_buf_size  --partminbuf=partition_min_buf_size   : [optional], partition min buffer size of the topic, M
       -i obsolete_file_interval  --obsoletefileinterval=obsolete_file_interval : optional, obsolete file time interval, unit is hour.
       -n reserved_file_count     --reservedfilecount=reserved_file_count       : optional, reserved file count
       -o security_commit_time    --committime=security_commit_time     : optional, max wait time for commit message in security_mode
       -p security_commit_data    --commitdata=security_commit_data     : optional, max data size for commit message in security_mode
       -O owners                  --owners=owners                       : optional, topic owners, multi owners seperate by ,
       -E                         --sealed                              : optional, is sealed topic or not
       -Y                         --topicType=topic_type                : optional, TOPIC_TYPE_NORMAL | TOPIC_TYPE_PHYSIC | TOPIC_TYPE_LOGIC | TOPIC_TYPE_LOGIC_PHYSIC, default is TOPIC_TYPE_NORMAL
       -P                         --physicTopicLst=physic_topic_list    : optional, physic topic list, seperate by ','
       -N                         --enableTTLDel=enable_ttl_del           : optional, enable TTL delete or not
       -L                         --readSizeLimitSec=read_size_limit_sec  : optional, read dfs limit one sec
       -G                         --enable_long_polling=enableLongPolling : optional, enable long polling
       -M                         --enable_merge_data=enableMergeData : optional, enable merge data
       -I permitUser              --permitUser=permitUser               : optional, users allowed to operate this topic, seperate by ','
       -v version_control         --versionControl=version_control      : optional, writer version controller
       -Z                         --readnotcommitmsg=read_notcommit_msg : optional, whether topic can read not committed msg
Example:
    swift mt -z zfs://10.250.12.23:1234/root -t news -r 10
    swift mt -z zfs://10.250.12.23:1234/root -t news -l 2
    swift mt -z zfs://10.250.12.23:1234/root -t news -r 20 -l 3
    swift mt -z zfs://10.250.12.23:1234/root -t news -g group_name
    swift mt -z zfs://10.250.12.23:1234/root -t news -j -1
    swift mt -z zfs://10.250.12.23:1234/root -t news -m memory_only_mode
    swift mt -z zfs://10.250.12.23:1234/root -t news --permitUser=Hanmeimei,Lilei
    '''

    def addOptions(self):
        super(TopicModifyCmd, self).addOptions()
        self.parser.add_option('-t', '--topic', action='store', dest='topicName')
        self.parser.add_option('-f', '--topic_file', action='store', dest='topicFile')
        self.parser.add_option('-r', '--partresource', type='int',
                               action='store', dest='partResource')
        self.parser.add_option('-l', '--partlimit', type='int',
                               action='store', dest='partLimit')
        self.parser.add_option('-g', '--group', action='store', dest='topicGroup')
        self.parser.add_option('-j', '--expired_time', type='int',
                               action='store', dest='expiredTime')
        self.parser.add_option('-x', '--partminbuf', type='int',
                               action='store', dest='partMinBuf')
        self.parser.add_option('-y', '--partmaxbuf', type='int',
                               action='store', dest='partMaxBuf')
        self.parser.add_option(
            '-m',
            '--topicmode',
            type='choice',
            action='store',
            dest='topicMode',
            choices=[
                'normal_mode',
                'security_mode',
                'memory_only_mode',
                'memory_prefer_mode',
                'persist_data_mode'])
        self.parser.add_option('-c', '--partcount', type='int',
                               action='store', dest='partCnt')
        self.parser.add_option('-C', '--partrangecount', type='int',
                               action='store', dest='partRangeCnt')
        self.parser.add_option('-i', '--obsoletefileinterval', type='int',
                               action='store', dest='obsoleteFileInterval')
        self.parser.add_option('-n', '--reservedfilecount', type='int',
                               action='store', dest='reservedFileCount')
        self.parser.add_option('-o', '--committime', type='int',
                               action='store', dest='commitTime')
        self.parser.add_option('-p', '--commitdata', type='int',
                               action='store', dest='commitData')
        self.parser.add_option('-O', '--owners', action='store', dest='owners')
        self.parser.add_option('-a', '--dfsRoot', action='store',
                               dest='dfsRoot')
        self.parser.add_option('-e', '--extendDfsRoot', action='store',
                               dest='extendDfsRoot')
        self.parser.add_option('-E', '--sealed', action='store', dest='sealed')
        self.parser.add_option('-Y', '--topic_type', type='choice',
                               action='store', dest='topicType',
                               choices=['TOPIC_TYPE_NORMAL', 'TOPIC_TYPE_PHYSIC',
                                        'TOPIC_TYPE_LOGIC', 'TOPIC_TYPE_LOGIC_PHYSIC'])
        self.parser.add_option('-P', '--physic_topic_list', action='store',
                               dest='physicTopicLst')
        self.parser.add_option('-N', '--enable_ttl_del', action='store',
                               dest='enableTTLDel')
        self.parser.add_option('-L', '--read_size_limit_sec', type='int',
                               action='store', dest='readSizeLimitSec')
        self.parser.add_option('-G', '--enable_long_polling', action='store',
                               dest='enableLongPolling')
        self.parser.add_option('-M', '--enable_merge_data', action='store',
                               dest='enableMergeData')
        self.parser.add_option('-I', '--permitUser', action='store',
                               dest='permitUser', default='')
        self.parser.add_option('-v', '--versionControl', action='store',
                               dest='versionControl')
        self.parser.add_option('-Z', '--readnotcommitmsg', action='store', dest='readNotCommmitMsg')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(TopicModifyCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg

        if options.topicName is None:
            errMsg = "ERROR: topic must be specified!"
            return False, errMsg

        if options.partResource is not None and options.partResource <= 0:
            errMsg = "ERROR: partition resource [%d] must greater than zero" \
                % options.partResource
            return False, errMsg

        if options.partLimit is not None and options.partLimit <= 0:
            errMsg = "ERROR: partition limit [%d] must greater than zero" % (
                options.partLimit)
            return False, errMsg

        if options.expiredTime is not None and options.expiredTime < -1:
            errMsg = "ERROR: expired time should not less than -1 [%d]." \
                     % options.commitTime
            return False, errMsg

        if options.partMaxBuf is not None and options.partMaxBuf <= 0:
            errMsg = "ERROR: partition max buffer [%d] must greater than zero"\
                     % options.partMaxBuf
            return False, errMsg

        if options.partCnt is not None and options.partCnt <= 0:
            errMsg = "ERROR: partition count [%d] must greater than zero" \
                     % options.partCnt
            return False, errMsg

        if options.partRangeCnt is not None and options.partRangeCnt <= 0:
            errMsg = "ERROR: partition Range count [%d] must greater than zero" \
                     % options.partRangeCnt
            return False, errMsg

        if options.partMinBuf is not None and options.partMinBuf <= 0:
            errMsg = "ERROR: partition min buffer [%d] must greater than zero"\
                     % options.partMinBuf
            return False, errMsg

        if options.partMinBuf is not None and options.partMaxBuf is not None and options.partMinBuf > options.partMaxBuf:
            errMsg = "ERROR: partition min buffer [%d] must not greater than partition max"\
                     " buff [%d]" % (options.partMinBuf, options.partMaxBuf)
            return False, errMsg

        if options.obsoleteFileInterval is not None and options.obsoleteFileInterval <= 0:
            errMsg = "ERROR: obsolete file time interval [%d] must greater than zero" % (
                options.obsoleteFileInterval)
            return False, errMsg

        if options.reservedFileCount is not None and options.reservedFileCount <= 0:
            errMsg = "ERROR: reserved file count [%d] must greater than zero" % (
                options.reservedFileCount)
            return False, errMsg

        if options.sealed is not None and options.sealed not in ['true', 'false']:
            errMsg = "ERROR: sealed can only set ['true', 'false']"
            return False, errMsg

        if options.enableTTLDel is not None and options.enableTTLDel not in ['true', 'false']:
            errMsg = "ERROR: enableTTLDel can only set ['true', 'false']"
            return False, errMsg

        if options.readSizeLimitSec is not None and options.readSizeLimitSec < 0:
            errMsg = "ERROR: readSizeLimitSec [%d] must not less than zero" % (
                options.readSizeLimitSec)
            return False, errMsg

        if options.enableLongPolling is not None and options.enableLongPolling not in ['true', 'false']:
            errorMsg = "ERROR, enableLongPolling can only set ['true', 'false']"
            return False, errMsg

        if options.enableMergeData is not None and options.enableMergeData not in ['true', 'false']:
            errorMsg = "ERROR, enableMergeData can only set ['true', 'false']"
            return False, errMsg

        if options.commitData is not None and options.commitData < 0:
            errMsg = "ERROR: security max commit data should not less than zero [%d]." \
                     % options.commitData
            return False, errMsg

        if options.commitTime is not None and options.commitTime < 0:
            errMsg = "ERROR: security max commit time should not less than zero [%d]." \
                     % options.commitTime
            return False, errMsg

        if options.readNotCommmitMsg is not None and options.readNotCommmitMsg not in ['true', 'false']:
            errorMsg = "ERROR, readNotCommmitMsg can only set ['true', 'false']"
            return False, errMsg

        if options.partLimit is None and options.partResource is None and \
           options.topicGroup is None and options.expiredTime is None and \
           options.partMaxBuf is None and options.partMinBuf is None and \
           options.topicMode is None and options.obsoleteFileInterval is None and \
           options.reservedFileCount is None and options.partCnt is None and \
           options.commitData is None and options.commitTime is None and \
           options.owners is None and options.dfsRoot is None and \
           options.extendDfsRoot is None and options.partRangeCnt is None and\
           options.sealed is None and options.topicType is None and\
           options.physicTopicLst is None and options.enableTTLDel is None and\
           options.readSizeLimitSec is None and options.enableLongPolling is None and\
           options.enableMergeData is None and options.permitUser is None and options.versionControl is None\
           and options.readNotCommmitMsg is None:
            errMsg = "ERROR: modify options: partlimit, topic group, partresource, "\
                     "expired time, partition_max_buf_size, partition_min_buf_size, "\
                     "obsolete_file_interval, reserved_file_count, dfs_root, "\
                     "extend_dfs_root, range_count in partition, owners "\
                     "security max commit data, security max commit time "\
                     "sealed, topic_type, physic_topic_list or enable_ttl_del "\
                     "read_size_limit_sec, enable_long_polling, enableMergeData, "\
                     "readNotCommmitMsg must specify at least one!"
            return False, errMsg

        return True, ''

    def initMember(self, options):
        super(TopicModifyCmd, self).initMember(options)
        self.topicNames = options.topicName.split(',')
        if (options.topicFile):
            f = open(options.topicFile, 'r')
            for line in f.readlines():
                line = line.strip('\n')
                self.topicNames.append(line)
        self.partResource = options.partResource
        self.partLimit = options.partLimit
        self.topicGroup = options.topicGroup
        self.expiredTime = options.expiredTime
        self.partMinBuf = options.partMinBuf
        self.partMaxBuf = options.partMaxBuf
        self.topicMode = None
        if options.topicMode is not None:
            if options.topicMode == "normal_mode":
                self.topicMode = TOPIC_MODE_NORMAL
            elif options.topicMode == "security_mode":
                self.topicMode = TOPIC_MODE_SECURITY
            elif options.topicMode == "memory_only_mode":
                self.topicMode = TOPIC_MODE_MEMORY_ONLY
            elif options.topicMode == "memory_prefer_mode":
                self.topicMode = TOPIC_MODE_MEMORY_PREFER
            elif options.topicMode == "persist_data_mode":
                self.topicMode = TOPIC_MODE_PERSIST_DATA
        self.obsoleteFileInterval = options.obsoleteFileInterval
        self.reservedFileCount = options.reservedFileCount
        self.partCnt = options.partCnt
        self.partRangeCnt = options.partRangeCnt
        self.owners = options.owners
        self.dfsRoot = options.dfsRoot
        self.extendDfsRoot = options.extendDfsRoot
        self.sealed = None
        if options.sealed is not None:
            self.sealed = True if (options.sealed.lower() == 'true') else False
        self.topicType = None
        if options.topicType is not None:
            if options.topicType == "TOPIC_TYPE_NORMAL":
                self.topicType = TOPIC_TYPE_NORMAL
            elif options.topicType == "TOPIC_TYPE_PHYSIC":
                self.topicType = TOPIC_TYPE_PHYSIC
            elif options.topicType == "TOPIC_TYPE_LOGIC":
                self.topicType = TOPIC_TYPE_LOGIC
            elif options.topicType == "TOPIC_TYPE_LOGIC_PHYSIC":
                self.topicType = TOPIC_TYPE_LOGIC_PHYSIC
        self.physicTopicLst = options.physicTopicLst
        self.enableTTLDel = None
        if options.enableTTLDel is not None:
            self.enableTTLDel = False if (options.enableTTLDel.lower() == 'false') else True
        self.readSizeLimitSec = options.readSizeLimitSec
        self.enableLongPolling = None
        if options.enableLongPolling is not None:
            self.enableLongPolling = True if (options.enableLongPolling.lower() == 'true') else False
        self.enableMergeData = None
        if options.enableMergeData is not None:
            self.enableMergeData = True if (options.enableMergeData.lower() == 'true') else False
        self.commitTime = options.commitTime
        self.commitData = options.commitData
        self.permitUser = options.permitUser
        self.versionControl = None
        if options.versionControl is not None:
            self.versionControl = True if (options.versionControl.lower() == 'true') else False
        self.readNotCommmitMsg = None
        if options.readNotCommmitMsg is not None:
            self.readNotCommmitMsg = False if (options.readNotCommmitMsg.lower() == 'false') else True

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        responses = []
        fail_cnt = 0
        for topicName in self.topicNames:
            topicName = topicName.strip()
            ret, response, errorMsg = self.adminDelegate.modifyTopic(
                topicName, self.partResource, self.partLimit, self.topicGroup,
                self.expiredTime, self.partMinBuf, self.partMaxBuf, self.topicMode,
                self.obsoleteFileInterval, self.reservedFileCount, self.partCnt,
                self.owners, self.dfsRoot, self.extendDfsRoot, self.partRangeCnt,
                self.sealed, self.topicType, self.physicTopicLst, self.enableTTLDel,
                self.readSizeLimitSec, self.enableLongPolling, self.commitTime,
                self.commitData, self.enableMergeData, self.permitUser, self.versionControl, self.readNotCommmitMsg)
            if not ret:
                fail_cnt += 1
                print("Modify topic [%s] failed!" % topicName)
                print(response)
            else:
                print("Modify topic [%s] success!" % topicName)
            responses.append(response)
        error_code = 1 if fail_cnt == len(self.topicNames) else 0
        return "", responses, error_code


class TopicDeleteCmd(base_cmd.BaseCmd):
    '''
    swift {dt|deletetopic}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }
       {-t topic_name          | --topic=topic_name }
       {-f file_name           | --file=file_name }

    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address
       -t topic_name,         --topic=topic_name              : optional, topic name, eg: news
       -f file_name,          --file=file_name                : optional,
 file name

Example:
    swift dt -z zfs://10.250.12.23:1234/root -t news
    swift dt -z zfs://10.250.12.23:1234/root -f file_name
    '''

    def addOptions(self):
        super(TopicDeleteCmd, self).addOptions()
        self.parser.add_option('-t', '--topic', action='store', dest='topicName')
        self.parser.add_option('-f', '--file', action='store', dest='fileName')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(TopicDeleteCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg

        if options.topicName is None and options.fileName is None:
            errMsg = "ERROR: topic or file must be specified!"
            return False, errMsg
        if options.topicName is not None and options.fileName is not None:
            errMsg = "ERROR: must specified one of topic and file"
            return False, errMsg

        return True, ''

    def initMember(self, options):
        super(TopicDeleteCmd, self).initMember(options)
        self.topicName = options.topicName
        self.fileName = options.fileName
        self.localFileUtil = local_file_util.LocalFileUtil()
        self.failTopicFile = './fail_topic_list_for_delete'

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        failTopicList = []
        if self.topicName is not None:
            ret, respone, errorMsg = self.adminDelegate.deleteTopic(self.topicName)
            if not ret:
                print "Delete topic failed!"
                print errorMsg
                return ret, respone, 1
            print "Delete topic success!"
            return ret, respone, 0
        elif self.fileName is not None:
            topics = self.localFileUtil.cat(self.fileName)
            topicVec = topics.split('\n')
            for topicName in topicVec:
                ret, respone, errorMsg = self.adminDelegate.deleteTopic(topicName)
                if not ret:
                    print "Delete topic %s failed!" % topicName
                    print errorMsg
                    failTopicList.append(topicName)
                else:
                    print "Delete topic %s success!" % topicName
            if not self.writeFailTopic(failTopicList):
                print "write fail topic to file:fail_topic_list failed"
                return ret, respone, 1
            if len(failTopicList) == 0:
                return ret, respone, 0
            else:
                return ret, respone, 1

    def writeFailTopic(self, failTopicList):
        topicStr = ''
        for failTopic in failTopicList:
            topicStr += failTopic + '\n'
        return self.localFileUtil.write(topicStr, self.failTopicFile, 'w')


class TopicDeleteBatchCmd(base_cmd.BaseCmd):
    '''
    swift {dtb|deletetopicbatch}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }
       {-t topic_names         | --topic=topic_name1,topic_name2,...}
       {-f file_name           | --file=file_name }

    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address
       -t topic_names,        --topic=topic_name1,topic_name2,...  : optional, topic name, eg: news,old,test
       -f file_name,          --file=file_name                : optional,
 file name

Example:
    swift dtb -z zfs://10.250.12.23:1234/root -t news,old
    swift dtb -z zfs://10.250.12.23:1234/root -f file_name
    '''

    def addOptions(self):
        super(TopicDeleteBatchCmd, self).addOptions()
        self.parser.add_option('-t', '--topic', action='store', dest='topicNames')
        self.parser.add_option('-f', '--file', action='store', dest='fileName')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(TopicDeleteBatchCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg

        if options.topicNames is None and options.fileName is None:
            errMsg = "ERROR: topic or file must be specified!"
            return False, errMsg
        if options.topicNames is not None and options.fileName is not None:
            errMsg = "ERROR: must specified one of topic and file"
            return False, errMsg

        return True, ''

    def initMember(self, options):
        super(TopicDeleteBatchCmd, self).initMember(options)
        self.topicNames = options.topicNames
        self.fileName = options.fileName
        self.localFileUtil = local_file_util.LocalFileUtil()
        self.failTopicFile = './fail_topic_list_for_delete'

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        failTopicList = []
        if self.topicNames is not None:
            topicVec = self.topicNames.split(',')
            ret, respone, errorMsg = self.adminDelegate.deleteTopicBatch(topicVec)
            if self.fromApi:
                return ret, respone, errorMsg
            if not ret:
                print "Delete topic failed!"
                print errorMsg
                return "", "", 1
            print "Delete topic %s success!" % str(topicVec)
            return "", "", 0
        elif self.fileName is not None:
            topics = self.localFileUtil.cat(self.fileName)
            topicVec = topics.split('\n')
            ret, respone, errorMsg = self.adminDelegate.deleteTopicBatch(topicVec)
            if self.fromApi:
                return ret, respone, errorMsg
            if not ret:
                print "Delete topic in file failed!"
                print errorMsg
                failTopicList.append(topicVec)
            else:
                print "Delete topic %s success!" % str(topicVec)
            if len(failTopicList) > 0 and not self.writeFailTopic(failTopicList):
                print "write fail topic to file:fail_topic_list failed"
                return "", "", 1
            if len(failTopicList) == 0:
                return "", "", 0
            else:
                return "", "", 1

    def writeFailTopic(self, failTopicList):
        topicStr = ''
        for failTopic in failTopicList:
            topicStr += failTopic + '\n'
        return self.localFileUtil.write(topicStr, self.failTopicFile, 'w')


class TopicNamesCmd(base_cmd.BaseCmd):
    '''
    swift {gtn|gettopicnames}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }
       {-f file_name           | --file=file_name }
    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address
       -f file_name           --file=file_name
Example:
    swift gtn -z zfs://10.250.12.23:1234/root
    swift gtn -z zfs://10.250.12.23:1234/root -f file.txt

    '''

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def addOptions(self):
        super(TopicNamesCmd, self).addOptions()
        self.parser.add_option('-f', '--file', action='store', dest='fileName')

    def initMember(self, options):
        super(TopicNamesCmd, self).initMember(options)
        self.fileName = options.fileName
        self.localFileUtil = local_file_util.LocalFileUtil()

    def writeTopicName(self, topicNames):
        topicStr = ''
        for topicName in topicNames:
            print topicName
            topicStr += topicName + '\n'
        return self.localFileUtil.write(topicStr, self.fileName, 'w')

    def run(self):
        ret, response, errMsg = self.adminDelegate.getAllTopicName()
        if self.fromApi:
            return ret, response, errMsg
        if not ret:
            print "Get all topic name failed! "
            print errMsg
            return "", "", 1
        else:
            print "Get topic names success! "
            if len(response.names) == 0:
                print "Swift system has no topic!"
            else:
                if self.fileName:
                    self.writeTopicName(response.names)
                else:
                    namesStr = ", ".join(response.names)
                    print "Topic names: [%s]" % namesStr
        return "", "", 0


class TopicInfosCmd(base_cmd.BaseCmd):
    '''
    swift {gti|gettopicinfo}
       {-z zookeeper_address   | --zookeeper=zookeeper_address}
       {-t topic_name          | --topic=topic_name}
       [-c cmd_type            | --cmdtype=cmd_type]

    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address
       -t topic_name,         --topic=topic_name              : required, topic name, eg: news
       -c cmd_type,           --cmdtype=cmd_type              : optional, command type, default: summary
          summary:            topic summary infos
          message:            protocol message
          verbose:            topic verbose infos
       -s sortType            --sort=sortType                 : optional, partid(default)|brokeraddress| partstatus
       -g group_name          --group=group_name              : optional, group name when get all topic info
       -T                         --request_time_out=optional           : optional, timeout for request, ms

Example:
    swift gti -z zfs://10.250.12.23:1234/root
    swift gti -z zfs://10.250.12.23:1234/root -t news
    swift gti -z zfs://10.250.12.23:1234/root -t news -c message
    swift gti -z zfs://10.250.12.23:1234/root -t news -c verbose
    swift gti -z zfs://10.250.12.23:1234/root -t news -c verbose -s brokeraddress
    '''

    def addOptions(self):
        super(TopicInfosCmd, self).addOptions()
        self.parser.add_option('-t', '--topic',
                               action='store',
                               dest='topicName')
        self.parser.add_option('-g', '--group',
                               action='store',
                               dest='groupName')
        self.parser.add_option('-c', '--cmdtype',
                               type='choice',
                               action='store',
                               choices=['summary', 'message', 'verbose'],
                               dest='cmdType',
                               default='summary')
        self.parser.add_option('-s', '--sort',
                               type='choice',
                               action='store',
                               dest='sortType',
                               choices=[swift_common_define.PART_SORT_PART_ID,
                                        swift_common_define.PART_SORT_PART_STATUS,
                                        swift_common_define.PART_SORT_BROKER_ADDRESS],
                               default='partid')
        self.parser.add_option('-T', '--optional', type='int',
                               action='store', dest='requestTimeout')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(TopicInfosCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg

        if options.topicName is None:
            if options.cmdType != 'summary':
                errMsg = "ERROR: getAllTopicInfos method only support summary cmd type!"
                return False, errMsg

        return True, ''

    def initMember(self, options):
        super(TopicInfosCmd, self).initMember(options)
        self.topicName = options.topicName
        self.cmdType = options.cmdType
        self.sortType = options.sortType
        self.groupName = options.groupName
        self.requestTimeout = options.requestTimeout

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil,
            self.zfsAddress,
            self.adminLeader,
            self.options.username,
            self.options.passwd,
            self.options.accessId,
            self.options.accessKey)
        return True

    def printTopicInfo(self):
        ret, response, errMsg = self.adminDelegate.getTopicInfo(self.topicName,
                                                                self.requestTimeout)
        if self.fromApi:
            return ret, response, errMsg
        if not ret:
            print "Get topic info failed!"
            print errMsg
            return "", "", 1

        if not response.HasField(swift_common_define.PROTO_TOPIC_INFO):
            print "ERROR: response does not have topicInfo field!"
            return "", "", 1

        topicInfo = response.topicInfo
        if self.cmdType == "summary" or self.cmdType == 'verbose':
            topicInfoAnalyzer = status_analyzer.TopicInfoAnalyzer()
            partCount = topicInfoAnalyzer.getPartitionCount(topicInfo)
            print "TopicName: %s" % topicInfo.name
            if topicInfo.topicMode == TOPIC_MODE_NORMAL:
                print "TopicMode: normal_mode"
            elif topicInfo.topicMode == TOPIC_MODE_SECURITY:
                print "TopicMode: security_mode"
                print "MaxCommitTime: %d" % topicInfo.maxWaitTimeForSecurityCommit
                print "MaxCommitSize: %d" % topicInfo.maxDataSizeForSecurityCommit
            elif topicInfo.topicMode == TOPIC_MODE_MEMORY_ONLY:
                print "TopicMode: memory_only_mode "
            elif topicInfo.topicMode == TOPIC_MODE_MEMORY_PREFER:
                print "TopicMode: memory_prefer_mode "
            if topicInfo.needFieldFilter:
                print "NeedFieldFilter: true"
            else:
                print "NeedFieldFilter: false"

            if topicInfo.needSchema:
                print "NeedSchema: true"
            else:
                print "NeedSchema: false"
            print "SchemaVersions: %s" % ','.join([str(x) for x in topicInfo.schemaVersions])
            print "TopicStatus: %s" % (
                swift_util.SwiftUtil.protoEnumToString(topicInfo, "status")[13:])
            print "PartitionCount: %d" % topicInfo.partitionCount
            print "RangeCountInPartition: %d" % topicInfo.rangeCountInPartition
            print "PartitionMaxBufferSize: %d" % topicInfo.partitionMaxBufferSize
            print "PartitionMinBufferSize: %d" % topicInfo.partitionMinBufferSize
            print "PartitionBufferSize: %d" % topicInfo.partitionBufferSize
            print "PartitionFileBufferSize: %d" % topicInfo.partitionFileBufferSize
            print "PartitionResource: %d" % topicInfo.resource
            print "PartitionLimit: %d" % topicInfo.partitionLimit
            print "ObsoleteFileTimeInterval: %d" % topicInfo.obsoleteFileTimeInterval
            print "ReservedFileCount: %d" % topicInfo.reservedFileCount
            if topicInfo.deleteTopicData:
                print "DeleteTopicData: true"
            else:
                print "DeleteTopicData: false"
            print "TopicGroup: %s" % topicInfo.topicGroup
            print "TopicExpiredTime: %d" % topicInfo.topicExpiredTime
            if 0 == len(topicInfo.owners):
                print "owners: %s" % 'NONE'
            else:
                print "owners: %s" % ','.join(topicInfo.owners)
            if topicInfo.topicType == TOPIC_TYPE_NORMAL:
                print "TopicType: TOPIC_TYPE_NORMAL"
            elif topicInfo.topicType == TOPIC_TYPE_PHYSIC:
                print "TopicType: TOPIC_TYPE_PHYSIC"
            elif topicInfo.topicType == TOPIC_TYPE_LOGIC:
                print "TopicType: TOPIC_TYPE_LOGIC"
            elif topicInfo.topicType == TOPIC_TYPE_LOGIC_PHYSIC:
                print "TopicType: TOPIC_TYPE_LOGIC_PHYSIC"
            if 0 == len(topicInfo.physicTopicLst):
                print "physicTopicLst: []"
            else:
                print "physicTopicLst: %s" % ','.join(topicInfo.physicTopicLst)
            if topicInfo.sealed:
                print "Sealed: true"
            else:
                print "Sealed: false"
            if topicInfo.enableTTLDel:
                print "enableTTLDel: true"
            else:
                print "enableTTLDel: false"
            print "readSizeLimitSec: %d" % topicInfo.readSizeLimitSec
            if topicInfo.enableLongPolling:
                print "enableLongPolling: true"
            else:
                print "enableLongPolling: false"
            if topicInfo.versionControl:
                print "versionControl: true"
            else:
                print "versionControl: false"
            if topicInfo.enableMergeData:
                print "enableMergeData: true"
            else:
                print "enableMergeData: false"
            if 0 == len(topicInfo.permitUser):
                print "permitUser: NONE"
            else:
                print "permitUser: %s" % ','.join(topicInfo.permitUser)
            if topicInfo.readNotCommittedMsg:
                print "readNotCommittedMsg: true"
            else:
                print "readNotCommittedMsg: false"
            print("Partition(waiting/starting/running/stopping/recovering/none): "
                  "%d/%d/%d/%d/%d/%d ") % (
                partCount.partWaiting,
                partCount.partStarting,
                partCount.partRunning,
                partCount.partStopping,
                partCount.partRecovering,
                partCount.partNone)

            if self.cmdType == "verbose":
                partInfos = topicInfoAnalyzer.getSortedPartitionInfo(
                    topicInfo, self.sortType)
                if len(partInfos) > 0:
                    print ""
                    print "Partition Detailed Infos:"
                    print '%-8s %-13s %-40s' % (
                        swift_common_define.PART_SORT_PART_ID,
                        swift_common_define.PART_SORT_PART_STATUS,
                        swift_common_define.PART_SORT_BROKER_ADDRESS)
                    for partInfo in partInfos:
                        print '%-8d %-13s %-40s' % (
                            partInfo.id,
                            swift_util.SwiftUtil.protoEnumToString(partInfo, "status")[17:],
                            partInfo.brokerAddress)
        elif self.cmdType == "message":
            print topicInfo

        return "", "", 0

    def printAllTopicInfo(self):
        ret, response, errMsg = self.adminDelegate.getAllTopicInfo()
        if self.fromApi:
            return ret, response, errMsg
        if not ret:
            print "Get topic info failed!"
            print errMsg
            return "", "", 1

        print "Get all topic info success!"

        allTopicInfo = response.allTopicInfo
        for topicInfo in allTopicInfo:
            topicInfoAnalyzer = status_analyzer.TopicInfoAnalyzer()
            partCount = topicInfoAnalyzer.getPartitionCount(topicInfo)
            topicName = topicInfo.name
            if self.groupName and topicInfo.topicGroup != self.groupName:
                continue
            topicStatus = swift_util.SwiftUtil.protoEnumToString(topicInfo, "status")[13:]
            print("%s.TopicStatus[%s].PartitionStatus(waiting/starting/running/stopping/recovering): "
                  "%d/%d/%d/%d/%d ") % (topicName, topicStatus,
                                        partCount.partWaiting + partCount.partNone,
                                        partCount.partStarting, partCount.partRunning,
                                        partCount.partStopping, partCount.partRecovering)
        return "", "", 0

    def run(self):
        if self.topicName is not None:
            return self.printTopicInfo()
        else:
            return self.printAllTopicInfo()


class TopicSimpleInfosCmd(base_cmd.BaseCmd):
    '''
    swift {gtsi|gettopicsimpleinfo}
       {-z zookeeper_address   | --zookeeper=zookeeper_address}

    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address

Example:
    swift gtsi -z zfs://10.250.12.23:1234/root
    '''

    def addOptions(self):
        super(TopicSimpleInfosCmd, self).addOptions()

    def checkOptionsValidity(self, options):
        ret, errMsg = super(TopicSimpleInfosCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg

        return True, ''

    def initMember(self, options):
        super(TopicSimpleInfosCmd, self).initMember(options)

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errMsg = self.adminDelegate.getAllTopicSimpleInfo()
        if self.fromApi:
            return ret, response, errMsg
        print ret, response, errMsg


class TopicDeleteByTimeCmd(transport_cmd_base.TransportCmdBase):
    '''
    swift {dtt|deletetopicsbytime}
       {-z zookeeper_address     | --zookeeper=zookeeper_address }
       {-i interval              | --interval=interval }
       {-e exclude_topics,       | --exclude=exclude_topics }
       {-f exclude_topics_file,  | --exclude_file=exclude_topics_file }
       {-p prefix_name           | --prefix_name=prefix_name }
       {-t topic_names           | --topic_names=topic_names }

    options:
       -z zookeeper_address,    --zookeeper=zookeeper_address       : required, zookeeper root address
       -i time_interval,        --interval=time_interval            : required, delete topics if the last message arrived interval hours before, (unit:hours)
       -e exclude_topics,       --exclude=exclude_topics            : option, don't delete exclude topics
       -f exclude_topics_file,  --exclude_file=exclude_topics_file  : option, don't delete exclude topics in file,one topic per line
       -p prefix_name           --prefix_name=prefix_name           : option, delete topic prefix with specified
       -t topic_names           --topic_names=topic_names           : option, delete topic with specified topic names
Example:
    swift dtt -z zfs://10.250.12.23:1234/root -i 48
    swift dtt -z zfs://10.250.12.23:1234/root -i 48 -e topic_a,topic_b
    swift dtt -z zfs://10.250.12.23:1234/root -i 48 -f a.txt
    swift dtt -z zfs://10.250.12.23:1234/root -i 1 -p model_
    swift dtt -z zfs://10.250.12.23:1234/root -i 1 -t topic_name_file

    '''

    def addOptions(self):
        super(TopicDeleteByTimeCmd, self).addOptions()
        self.parser.add_option('-i', '--interval', type=int, action='store', dest='interval')
        self.parser.add_option('-e', '--exclude', action='store', dest='excludeTopics')
        self.parser.add_option('-f', '--exclude_file', action='store', dest='excludeFile')
        self.parser.add_option('-p', '--prefix_name', action='store', dest='prefixName')
        self.parser.add_option('-t', '--topic_names', action='store', dest='topicNames')

    def checkOptionsValidity(self, options):
        if not super(TopicDeleteByTimeCmd, self).checkOptionsValidity(options):
            return False

        if options.interval is None:
            print "ERROR: interval must be specified!"
            return False

        return True

    def initMember(self, options):
        super(TopicDeleteByTimeCmd, self).initMember(options)
        self.interval = options.interval
        self.prefixName = options.prefixName
        self.excludeTopics = []
        if options.excludeTopics is not None:
            self.excludeTopics = options.excludeTopics.split(",")
        if options.excludeFile is not None:
            f = open(options.excludeFile, 'r')
            for line in f.readlines():
                line = line.strip('\n')
                self.excludeTopics.append(line)
            f.close()
        print "exclude topics:", self.excludeTopics
        self.topicNames = []
        if options.topicNames is not None:
            f = open(options.topicNames, 'r')
            for line in f.readlines():
                line = line.strip('\n')
                self.topicNames.append(line)
            f.close()
        print "topics:", self.topicNames

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        self.brokerDelegate = swift_broker_delegate.SwiftBrokerDelegate(self.options.username, self.options.passwd)
        return True

    def run(self):
        delTopicNames = self.deleteTopics()
        print delTopicNames
        if (len(delTopicNames) == 0):
            print "no topic need to delete."
        return "", "", 0

    def delTopic(self, topicName):
        timeFormat = '%Y-%m-%d %X'
        ret, reponse, errorMsg = self.adminDelegate.deleteTopic(topicName)
        if not ret:
            print "[%s] Delete topic %s failed!" % (time.strftime(timeFormat, time.localtime()), topicName)
            print errorMsg
        else:
            print "[%s] Delete topic %s success!" % (time.strftime(timeFormat, time.localtime()), topicName)

    def deleteTopics(self):
        delList = []
        ret, response, errMsg = self.adminDelegate.getAllTopicInfo()
        if not ret:
            print "get all topic info failed [%s]" % errMsg
            return delList
        allTopicInfo = response.allTopicInfo
        for topicInfo in allTopicInfo:
            topicInfoAnalyzer = status_analyzer.TopicInfoAnalyzer()
            partCount = topicInfoAnalyzer.getPartitionCount(topicInfo)
            topicName = topicInfo.name
            if len(topicInfo.partitionInfos) == partCount.partRunning:
                if self.needDelete(topicInfo) and topicName not in self.excludeTopics:
                    delList.append(topicName)
                    self.delTopic(topicName)
        return delList

    def needDelete(self, topicInfo):
        topicName = topicInfo.name
        if self.prefixName is not None and not topicName.startswith(self.prefixName):
            return False
        if len(self.topicNames) > 0 and topicName not in self.topicNames:
            return False
        maxTime = 0
        allPartNoData = True
        curTime = int(time.time())
        for partInfo in topicInfo.partitionInfos:
            brokerAddr = "tcp:%s" % partInfo.brokerAddress
            ret, data = self.brokerDelegate.getMaxMessageId(brokerAddr, topicName, partInfo.id)
            if not ret:
                if data.find('ERROR_BROKER_NO_DATA') != -1:
                    allPartNoData = allPartNoData and True
                continue
            response = data
            if response.HasField(swift_common_define.PROTO_TIME_STAMP):
                if response.timestamp > maxTime:
                    maxTime = response.timestamp
        if maxTime != 0:
            maxTime = maxTime / 1000 / 1000
            print "has data, topic name:%s, not active time: %f(hour)" % (topicName, float(curTime - maxTime) / 3600)
            if float(curTime - maxTime) / 3600 > float(self.interval):
                return True
            else:
                return False
        elif allPartNoData:
            createTime = self.getTopicCreateTime(topicInfo)
            print "no data, topic name:%s, not active time: %f(hour)" % (topicName, float(curTime - createTime) / 3600)
            if float(curTime - createTime) / 3600 > float(self.interval):
                return True
            else:
                return False
        else:
            return False

    def getTopicCreateTime(self, topicInfo):
        if topicInfo.HasField("createTime"):
            return int(topicInfo.createTime) / 1000 / 1000
        else:
            path = self.zfsAddress + '/topics/' + topicInfo.topicName
            ret = self.fileUtil.getMeta(path)
            prefixStr = 'node last modify time: '
            pos = ret.find(prefixStr)
            if (pos != -1):
                timeStr = ret[pos + len(prefixStr):].strip()
                createTime = time.mktime(time.strptime(timeStr, '%Y-%m-%d %H:%M:%S'))
                return int(createTime)
            else:
                return -1


class TopicDataDeleteCmd(base_cmd.BaseCmd):
    '''
    swift {dtd|deletetopicdata}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }
       {-d dfs_root            | --dfs=dfs_root }
       {-i interval            | --interval=interval }
       {-e exclude             | --exclude=exclude_topics }
       {-t topic_name          | --topic_name=topic_name }
       {-f exclude_file        | --exclude_file=exclude_file }

    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address
       -d dfs_root,           --dfs=dfs_root                  : required, dfs root address
       -i time_interval,      --interval=time_interval        : required, delete topic data if the topic is not running and the last message is arrived interval time ago. (unit:hours)
       -e exclude_topics,     --exclude=exclude_topics        : option, don't delete the specified topic data
       -t topic_name,         --topic=topic_name              : option, only delete the specified topic data
       -f exclude_file,       --exclude_file=exclude_file     : option, don't delete the specified topic data in file

Example:
    swift dtd -z zfs://10.250.12.23:1234/root -i 72 -d hdfs://xxxx/path
    swift dtd -z zfs://10.250.12.23:1234/root -i 72 -d hdfs://xxxx/path -f abc.txt
    swift dtd -z zfs://10.250.12.23:1234/root -i 72 -d hdfs://xxxx/path -e topic_a,topic_b
    swift dtd -z zfs://10.250.12.23:1234/root -i 72 -t topic_a  -d hdfs://xxxx/path

    '''

    def addOptions(self):
        super(TopicDataDeleteCmd, self).addOptions()
        self.parser.add_option('-d', '--dfs', action='store', dest='dfsRoot')
        self.parser.add_option('-i', '--interval', type='int', action='store', dest='interval')
        self.parser.add_option('-e', '--exclude', action='store', dest='excludeTopics')
        self.parser.add_option('-t', '--topic_name', action='store', dest='topicName')
        self.parser.add_option('-f', '--exclude_file', action='store', dest='excludeFile')

    def checkOptionsValidity(self, options):
        if not super(TopicDataDeleteCmd, self).checkOptionsValidity(options):
            return False
        if options.interval is None:
            print "ERROR: interval must be specified!"
            return False
        if options.dfsRoot is None:
            print "ERROR: dfsRoot must be specified!"
            return False
        return True

    def initMember(self, options):
        super(TopicDataDeleteCmd, self).initMember(options)
        self.interval = options.interval
        self.excludeTopics = []
        if options.excludeTopics is not None:
            self.excludeTopics = options.excludeTopics.split(",")
        if options.excludeFile is not None:
            f = open(options.excludeFile, 'r')
            for line in f.readlines():
                line = line.strip('\n')
                self.excludeTopics.append(line)
            f.close()
        print "exclude topics:", self.excludeTopics
        self.specifiedTopic = options.topicName
        self.dfsRoot = options.dfsRoot

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        runningTopicName = self.getAllRunningTopicNames()
        print runningTopicName
        if self.specifiedTopic is not None and self.specifiedTopic in runningTopicName:
            print "the specified topic [%s] still running, can't delete the data." % self.specifiedTopic
            return "", "", 1
        allTopicName = self.getCandidataTopicNames()
        print allTopicName
        delTopicName = []
        for topicName in allTopicName:
            if topicName not in runningTopicName and topicName not in self.excludeTopics:
                if self.specifiedTopic is not None:
                    if topicName == self.specifiedTopic:
                        delTopicName.append(self.specifiedTopic)
                else:
                    delTopicName.append(topicName)
        self.delTopicData(delTopicName)
        return "", "", 0

    def getAllRunningTopicNames(self):
        topicList = []
        ret, response, errMsg = self.adminDelegate.getAllTopicInfo()
        if not ret:
            print "get all topic info failed [%s]." % errMsg
            return topicList
        allTopicInfo = response.allTopicInfo
        for topicInfo in allTopicInfo:
            topicList.append(topicInfo.name)
        return topicList

    def getCandidataTopicNames(self):
        topicList = []
        self.fileUtil.addHadoopHome(self.toolsConfig.getHadoopHome())
        print "get topic info in ", self.dfsRoot
        dirs = self.fileUtil.listDir(self.dfsRoot)
        print "all topic count [%d]" % len(dirs)
        curTime = int(time.time())
        for topicName in dirs:
            maxTime = self.getTopicLastModifyTime(self.fileUtil, self.dfsRoot + "/" + topicName)
            print "topic name:%s, not active time: %f(hour)" % (topicName, float(curTime - maxTime) / 3600)
            if float(curTime - maxTime) / 3600 > float(self.interval):
                topicList.append(topicName)
        return topicList

    def getTopicLastModifyTime(self, fileUtil, topicDataPath):
        modifyTime = -1
        partitions = fileUtil.listDir(topicDataPath)
        if len(partitions) == 0:
            return modifyTime
        for partition in partitions:
            files = fileUtil.listDir(topicDataPath + "/" + partition)
            if len(files) == 0:
                continue
            else:
                files.sort()
                metaInfo = fileUtil.getMeta(topicDataPath + "/" + partition + "/" + files[len(files) - 1])
                prefix = "file last modify time: "
                pos = metaInfo.find(prefix)
                if pos != -1:
                    timeStr = metaInfo[pos + len(prefix):]
                    timeStr = timeStr.strip()
                    createTime = int(time.mktime(time.strptime(timeStr, '%Y-%m-%d %H:%M:%S')))
                    if createTime > modifyTime:
                        modifyTime = createTime
        return modifyTime

    def delTopicData(self, topicNames):
        for topic in topicNames:
            print "deleting topic data [%s]" % topic
            ret = self.fileUtil.remove(self.dfsRoot + "/" + topic)
            if not ret:
                print "delete topic data [%s] failed!" % topic


class ExportTopicsCmd(base_cmd.BaseCmd):
    '''
    swift {et|exporttopics}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }
       {-f export_file_name    | --file=export_file_name }
       {-g group_name          | --group=group_name }
       {-p prefix              | --prefix=prefix_name }
       {-t topic_names         | --topic=topic_names }

    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address
       -f export_file_name,   --file=export_file_name         : required, export file name
       -g group_name,         --group=group_name              : optional, export group name, default export all topic
       -m migrateDfs,         --migrate=migrate_dfs           : optional, migrate dfs, add new hdfs, add current dfs into extendDfs root
       -p prefix,             --prefix = prefix               : optional, export topic info with prefix
       -i topicName,          --topic = topic_names           : optional, export topic info with topic_name file
Example:
    swift et -z zfs://10.250.12.23:1234/root -f a.json
    swift et -z zfs://10.250.12.23:1234/root -f a.json -g swift
    swift et -z zfs://10.250.12.23:1234/root -f a.json -p prefix
    swift et -z zfs://10.250.12.23:1234/root -f a.json -i topicNameFile
    '''

    def addOptions(self):
        super(ExportTopicsCmd, self).addOptions()
        self.parser.add_option('-f', '--file', action='store', dest='fileName')
        self.parser.add_option('-g', '--group', action='store', dest='groupName')
        self.parser.add_option('-m', '--migrate', action='store', dest='migrateDfs')
        self.parser.add_option('-p', '--prefix', action='store', dest='prefix')
        self.parser.add_option('-i', '--topic', action='store', dest='topicNames')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(ExportTopicsCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        if options.fileName is None:
            errMsg = "ERROR: file name must be specified!"
            print errMsg
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(ExportTopicsCmd, self).initMember(options)
        self.fileName = options.fileName
        self.localFileUtil = local_file_util.LocalFileUtil()
        self.groupName = options.groupName
        self.migrateDfs = options.migrateDfs
        self.prefix = options.prefix
        self.topicNames = []
        if options.topicNames:
            f = open(options.topicNames, 'r')
            for line in f.readlines():
                line = line.strip('\n')
                self.topicNames.append(line)
            f.close()

    def getDfsRoot(self):
        versionFile = self.zfsAddress + "/config/version"
        localConfigFile = "./.swift.conf"
        configFile = self.zfsAddress + "/config/swift.conf"
        if self.fileUtil.exists(versionFile):
            configFile = self.getLatestConfig(self.zfsAddress + "/config") + "/swift.conf"
        self.fileUtil.copy(configFile, localConfigFile, True)
        configLines = self.localFileUtil.readLines(localConfigFile)
        for line in configLines:
            line = line.strip()
            if line.startswith("data_root_path"):
                strVec = line.split("=")
                if len(strVec) == 2:
                    return strVec[1].strip()
        return ""

    def hasDfsRoot(self, metaStr):
        metaVec = metaStr.split('\n')
        for metaMeg in metaVec:
            metaMeg = metaMeg.strip()
            if metaMeg.startswith("dfsRoot"):
                dfsVec = metaMeg.split("Root: ")
                if len(dfsVec) == 2 and len(dfsVec[1].strip()) > 2:
                    return True
        return False

    def hasTopicGroup(self, metaStr):
        metaVec = metaStr.split('\n')
        for metaMeg in metaVec:
            metaMeg = metaMeg.strip()
            if metaMeg.startswith("topicGroup"):
                dfsVec = metaMeg.split("opicGroup: ")
                if len(dfsVec) == 2 and len(dfsVec[1].strip()) > 2:
                    return True
        return False

    def generateMap(self, topicMetas):
        retMap = {}
        for meta in topicMetas.topicMetas:
            tmpMap = {}
            if self.groupName is not None:
                if self.hasTopicGroup(str(meta)):
                    if meta.topicGroup != self.groupName:
                        continue
                else:
                    if self.groupName != 'default':
                        continue
            if self.prefix is not None and not meta.topicName.startswith(self.prefix):
                continue
            if len(self.topicNames) > 0 and meta.topicName not in self.topicNames:
                continue
            metaVec = str(meta).split('\n')
            for metaMsg in metaVec:
                if metaMsg == '':
                    continue
                metaMsg = metaMsg.strip()
                pos = metaMsg.find(':')
                metaMsgVec = []
                metaMsgVec.append(metaMsg[0:pos])
                metaMsgVec.append(metaMsg[pos + 1:len(metaMsg)])
                if metaMsgVec[1] is not None:
                    metaMsgVec[1] = metaMsgVec[1].strip()
                    metaMsgVec[0] = metaMsgVec[0].strip()
                    if metaMsgVec[1].startswith('"') and metaMsgVec[1].endswith('"'):
                        metaMsgVec[1] = metaMsgVec[1].rstrip('"').lstrip('"')
                    if tmpMap.has_key(metaMsgVec[0]):
                        tmpMap[metaMsgVec[0]] += ',' + metaMsgVec[1]
                    else:
                        tmpMap[metaMsgVec[0]] = metaMsgVec[1]
                else:
                    stripKey = metaMsgVec[0].strip()
                    if tmpMap.has_key(stripKey):
                        tmpMap[stripKey] += ','
                    else:
                        tmpMap[stripKey] = ''
            if self.migrateDfs:
                self.updateMigrateDfs(tmpMap)
            retMap[meta.topicName] = tmpMap
        return retMap

    def updateMigrateDfs(self, topicInfo):
        dfsRoot = topicInfo["dfsRoot"]
        if topicInfo.has_key("extendDfsRoot"):
            extendDfsRoot = topicInfo["extendDfsRoot"]
            extendDfsList = extendDfsRoot.split(',')
            if dfsRoot not in extendDfsList:
                extendDfsList.append(dfsRoot)
            if self.migrateDfs in extendDfsList:
                extendDfsList.remove(self.migrateDfs)
            topicInfo["extendDfsRoot"] = ",".join(extendDfsList)
        else:
            topicInfo["extendDfsRoot"] = dfsRoot
        topicInfo["dfsRoot"] = self.migrateDfs

    def run(self):
        metaFile = self.zfsAddress + "/topic_meta"
        localFile = "./.topic_meta"
        self.fileUtil.copy(metaFile, localFile, True)
        metaStr = self.localFileUtil.cat(localFile)
        deCompStr = zlib.decompress(metaStr)
        topicMetas = TopicMetas()
        topicMetas.ParseFromString(deCompStr)
        if self.fromApi:
            return 0, topicMetas, 0
        dfsRoot = self.getDfsRoot()
        for meta in topicMetas.topicMetas:
            if not self.hasDfsRoot(str(meta)):
                meta.dfsRoot = dfsRoot
        retMap = self.generateMap(topicMetas)
        print "export topic count ", len(retMap)
        retJson = json_wrapper.write(retMap, format=True)
        self.localFileUtil.write(retJson, self.fileName, 'w')
        return "", "", 0


class TopicMetaCmd(base_cmd.BaseCmd):
    '''
    swift {gtm|gettopicmetas}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }

    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address
Example:
    swift gtm -z zfs://10.250.12.23:1234/root
    '''

    def addOptions(self):
        super(TopicMetaCmd, self).addOptions()

    def initMember(self, options):
        super(TopicMetaCmd, self).initMember(options)
        self.localFileUtil = local_file_util.LocalFileUtil()

    def run(self):
        metaFile = self.zfsAddress + "/topic_meta"
        localFile = "./.topic_meta"
        self.fileUtil.copy(metaFile, localFile, True)
        metaStr = self.localFileUtil.cat(localFile)
        if 0 == len(metaStr):
            return "", "", 1
        deCompStr = zlib.decompress(metaStr)
        topicMetas = TopicMetas()
        topicMetas.ParseFromString(deCompStr)
        if self.fromApi:
            return "", topicMetas, 0
        print topicMetas
        return "", "", 0


class ImportTopicsCmd(base_cmd.BaseCmd):
    '''
    swift {it|importtopics}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }
       {-f local_file_name     | --file=local_file_name }
       {-w time_out            | --timeout=time_out}
       {-d delete_exist        | --delete_exist=delete_exist}
       {-c continue            | --continue=continue}
       {-g group               | --group=group}

    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address
       -f local_file_name,    --file=local_file_name          : required, local file name
       -w time_out,           --timeout=time_out              : optional, timeout for one topic
       -d delete_exist        --delete_exist=delete_exist     : optional, delete exist topic ,default is false
       -c continue_exist      --continue=continue             : optional, continue if add topic has error ,default is false
       -g group               --group=group                   : optional, add in specified group name
Example:
    swift it -z zfs://10.250.12.23:1234/root -f a.json -w 30
    swift it -z zfs://10.250.12.23:1234/root -f a.json -d -c
    swift it -z zfs://10.250.12.23:1234/root -f a.json -g igraph
    '''

    def addOptions(self):
        super(ImportTopicsCmd, self).addOptions()
        self.parser.add_option('-f', '--file', action='store', dest='fileName')
        self.parser.add_option('-w', '--timeout', type='int',
                               action='store', dest='timeout')
        self.parser.add_option('-d', '--delete_exist', action='store_true', dest='deleteExist',
                               default=False)
        self.parser.add_option('-c', '--continue', action='store_true', dest='continueAdd',
                               default=False)
        self.parser.add_option('-g', '--group', action='store', dest='group')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(ImportTopicsCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        if options.fileName is None:
            errMsg = "ERROR: file name must be specified!"
            print errMsg
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(ImportTopicsCmd, self).initMember(options)
        self.fileName = options.fileName
        self.timeout = options.timeout
        self.deleteExist = options.deleteExist
        self.continueAdd = options.continueAdd
        self.localFileUtil = local_file_util.LocalFileUtil()
        self.group = options.group
        self.failTopicFile = './fail_topic_list_for_add_' + str(time.time())

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        content = self.localFileUtil.cat(self.fileName)
        topicMap = json_wrapper.read(content)
        topicName = None
        failTopicList = {}
        hasError = False
        timeFormat = '%Y-%m-%d %X'
        for topic, topicDescription in topicMap.items():
            partitionCount = None
            resource = None
            partitionLimit = None
            topicMode = None
            needFieldFilter = None
            obsoleteFileTimeInterval = None
            reservedFileCount = None
            deleteTopicData = None
            partitionMinBufferSize = None
            partitionMaxBufferSize = None
            maxWaitTimeForSecurityCommit = None
            maxDataSizeForSecurityCommit = None
            compressMsg = None
            compressThres = None
            dfsRoot = None
            topicGroup = None
            extendDfsRoot = None
            topicExpiredTime = None
            rangeCnt = None
            owners = None
            needSchema = None
            schemaVersions = None
            sealed = None
            topicType = None
            physicTopicLst = None
            enableTTLDel = None
            if topicDescription.has_key("topicName"):
                topicName = topicDescription["topicName"]
            if topicDescription.has_key("partitionCount"):
                partitionCount = int(topicDescription["partitionCount"])
            if topicDescription.has_key("resource"):
                resource = int(topicDescription["resource"])
            if topicDescription.has_key("partitionLimit"):
                partitionLimit = int(topicDescription["partitionLimit"])
            if topicDescription.has_key("topicMode"):
                if topicDescription["topicMode"] == 'TOPIC_MODE_NORMAL':
                    topicMode = TOPIC_MODE_NORMAL
                elif topicDescription["topicMode"] == 'TOPIC_MODE_SECURITY':
                    topicMode = TOPIC_MODE_SECURITY
                elif topicDescription["topicMode"] == 'TOPIC_MODE_MEMORY_PREFER':
                    topicMode = TOPIC_MODE_MEMORY_PREFER
                elif topicDescription["topicMode"] == 'TOPIC_MODE_MEMORY_ONLY':
                    topicMode = TOPIC_MODE_MEMORY_ONLY
            if topicDescription.has_key("needFieldFilter"):
                needFieldFilter = topicDescription["needFieldFilter"] == 'true'
            if topicDescription.has_key("obsoleteFileTimeInterval"):
                obsoleteFileTimeInterval = int(topicDescription["obsoleteFileTimeInterval"])
            if topicDescription.has_key("reservedFileCount"):
                reservedFileCount = int(topicDescription["reservedFileCount"])
            if topicDescription.has_key("deleteTopicData"):
                deleteTopicData = topicDescription["deleteTopicData"] == 'true'
            if topicDescription.has_key("partitionMinBufferSize"):
                partitionMinBufferSize = int(topicDescription["partitionMinBufferSize"])
            if topicDescription.has_key("partitionMaxBufferSize"):
                partitionMaxBufferSize = int(topicDescription["partitionMaxBufferSize"])
            if topicDescription.has_key("maxWaitTimeForSecurityCommit"):
                maxWaitTimeForSecurityCommit = int(topicDescription["maxWaitTimeForSecurityCommit"])
            if topicDescription.has_key("maxDataSizeForSecurityCommit"):
                maxDataSizeForSecurityCommit = int(topicDescription["maxDataSizeForSecurityCommit"])
            if topicDescription.has_key("compressMsg"):
                compressMsg = topicDescription["compressMsg"] == 'true'
            if topicDescription.has_key("compressThres"):
                compressThres = int(topicDescription["compressThres"])
            if topicDescription.has_key("dfsRoot"):
                dfsRoot = topicDescription["dfsRoot"]
            if topicDescription.has_key("extendDfsRoot"):
                extendDfsRoot = topicDescription["extendDfsRoot"]
            if topicDescription.has_key("topicGroup"):
                topicGroup = topicDescription["topicGroup"]
            if topicDescription.has_key("topicExpiredTime"):
                topicExpiredTime = int(topicDescription["topicExpiredTime"])
            if topicDescription.has_key("rangeCount"):
                rangeCnt = int(topicDescription["rangeCount"])
            if topicDescription.has_key("owners"):
                owners = topicDescription["owners"]
            if topicDescription.has_key("needSchema"):
                needSchema = topicDescription["needSchema"] == 'true'
                if topicDescription.has_key("schemaVersions"):
                    schemaVersions = topicDescription["schemaVersions"]
            if topicDescription.has_key("sealed"):
                sealed = topicDescription["sealed"] == 'true'
            if topicDescription.has_key("topicType"):
                if topicDescription.has_key("topicType") == "TOPIC_TYPE_NORMAL":
                    topicType = TOPIC_TYPE_NORMAL
                elif topicDescription.has_key("topicType") == "TOPIC_TYPE_PHYSIC":
                    topicType = TOPIC_TYPE_PHYSIC
                elif topicDescription.has_key("topicType") == "TOPIC_TYPE_LOGIC":
                    topicType = TOPIC_TYPE_LOGIC
                elif topicDescription.has_key("topicType") == "TOPIC_TYPE_LOGIC_PHYSIC":
                    topicType = TOPIC_TYPE_LOGIC_PHYSIC
            if topicDescription.has_key("physicTopicLst"):
                physicTopicLst = topicDescription["physicTopicLst"]
            if topicDescription.has_key("enableTTLDel"):
                enableTTLDel = topicDescription["enableTTLDel"] == 'true'
            if self.group is not None:
                topicGroup = self.group
            if hasError and not self.continueAdd:
                failTopicList[topicName] = topicDescription
                continue
            if self.deleteExist:
                ret, reponse, errorMsg = self.adminDelegate.deleteTopic(topicName)
                if not ret:
                    if errorMsg.find('ERROR_ADMIN_TOPIC_NOT_EXISTED') == -1:
                        print "[%s] Delete topic %s failed! error msg [%s]" % (time.strftime(timeFormat, time.localtime()), topicName, errorMsg)
                        hasError = True
                        continue
                else:
                    print "[%s] Delete topic %s success!" % (time.strftime(timeFormat, time.localtime()), topicName)
                    time.sleep(2)

            ret, response, errorMsg = self.adminDelegate.createTopic(
                topicName=topicName, partCnt=partitionCount, rangeCnt=rangeCnt,
                partMinBufSize=partitionMinBufferSize, partMaxBufSize=partitionMaxBufferSize,
                partResource=resource, partLimit=partitionLimit,
                topicMode=topicMode, needFieldFilter=needFieldFilter,
                obsoleteFileTimeInterval=obsoleteFileTimeInterval,
                reservedFileCount=reservedFileCount,
                deleteTopicData=deleteTopicData,
                securityCommitTime=maxWaitTimeForSecurityCommit,
                securityCommitData=maxDataSizeForSecurityCommit,
                compressMsg=compressMsg,
                compressThres=compressThres,
                dfsRoot=dfsRoot, topicGroup=topicGroup, extendDfsRoot=extendDfsRoot,
                expiredTime=topicExpiredTime, owners=owners, needSchema=needSchema,
                schemaVersions=schemaVersions, sealed=sealed, topicType=topicType,
                physicTopicLst=physicTopicLst, enableTTLDel=enableTTLDel)
            if not ret:
                print "[%s] Add topic [%s] Failed, error msg [%s]" % (time.strftime(timeFormat, time.localtime()), topicName, errorMsg)
                failTopicList[topicName] = topicDescription
                hasError = True
                continue

            if self.timeout is not None:
                print "wait topic ready."
                if not self._waitTopicReady(self.timeout, topicName):
                    print "wait topic %s ready failed" % topicName
                    failTopicList[topicName] = topicDescription
                    hasError = True
                    continue
            print "[%s] Add topic [%s] success!" % (time.strftime(timeFormat, time.localtime()), topicName)
        if len(failTopicList) > 0:
            self.addFailTopic(failTopicList)
        return "", "", 0

    def addFailTopic(self, failTopicList):
        failTopic = json_wrapper.write(failTopicList, format=True)
        return self.localFileUtil.write(failTopic, self.failTopicFile, 'w')

    def _waitTopicReady(self, timeout, topicName):
        while timeout > 0:
            if self._isTopicReady(topicName):
                print "topic [%s] is ready" % topicName
                return "", "", 0
            timeout -= 0.1
            time.sleep(0.1)
        return "", "wait topic ready timeout.", -1

    def _isTopicReady(self, topicName):
        ret, response, errMsg = self.adminDelegate.getTopicInfo(topicName)
        if ret:
            if response.HasField(swift_common_define.PROTO_TOPIC_INFO):
                topicInfo = response.topicInfo
                for partitionInfo in topicInfo.partitionInfos:
                    if partitionInfo.status != PARTITION_STATUS_RUNNING:
                        print "ERROR: partition info status: %s, response: %s" % \
                            (str(partitionInfo), str(response))
                        return False
            else:
                print "ERROR: response does not have topicInfo field! response: %s" % str(response)
                return False
        else:
            print "get topic info failed: %s, errorMsg: %s" % (topicName, errMsg)
            return False
        return True


class UpdateTopicsCmd(base_cmd.BaseCmd):
    '''
    swift {ut|updatetopics}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }
       {-f local_file_name     | --file=local_file_name }
       {-w time_out            | --timeout=time_out}

    options:
       -z zookeeper_address,  --zookeeper=zookeeper_address   : required, zookeeper root address
       -f local_file_name,    --file=local_file_name          : required, local file name
       -w time_out,           --timeout=time_out              : optional, timeout for one topic

Example:
    swift ut -z zfs://10.250.12.23:1234/root -f a.json -w 30
    '''

    def addOptions(self):
        super(UpdateTopicsCmd, self).addOptions()
        self.parser.add_option('-f', '--file', action='store', dest='fileName')
        self.parser.add_option('-w', '--timeout', type='int',
                               action='store', dest='timeout')

    def checkOptionsValidity(self, options):
        if not super(UpdateTopicsCmd, self).checkOptionsValidity(options):
            return False
        if options.fileName is None:
            print "ERROR: file name must be specified!"
            return False
        return True

    def initMember(self, options):
        super(UpdateTopicsCmd, self).initMember(options)
        self.fileName = options.fileName
        self.timeout = options.timeout
        self.localFileUtil = local_file_util.LocalFileUtil()
        self.failTopicFile = './fail_topic_list_for_add'

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        content = self.localFileUtil.cat(self.fileName)
        topicMap = json_wrapper.read(content)
        topics = sorted(topicMap.items(), key=lambda x: x[1])
        for (topic, topicDescription) in topics:
            if topic != topicDescription["topicName"]:
                print "error, topic name not equal [%s %s]" % (topic, topicDescription["topicName"])
                return "", "", 0
            ret, response, errorMsg = self.adminDelegate.deleteTopic(topic)
            if not ret:
                print "delete topic [%s] error, %s" % (topic, errorMsg)
            else:
                print "delete topic [%s] success." % topic
            time.sleep(2)  # wait partition unload
            if not self.addOneTopic(topicDescription):
                print "not all topic updated, retry!"
                return "", "", 0
        return "", "", 0

    def addOneTopic(self, topicDescription):
        partitionCount = None
        resource = None
        partitionLimit = None
        topicMode = None
        needFieldFilter = None
        obsoleteFileTimeInterval = None
        reservedFileCount = None
        deleteTopicData = None
        partitionMinBufferSize = None
        partitionMaxBufferSize = None
        maxWaitTimeForSecurityCommit = None
        maxDataSizeForSecurityCommit = None
        compressMsg = None
        compressThres = None
        dfsRoot = None
        topicGroup = None
        extendDfsRoot = None
        if topicDescription.has_key("topicName"):
            topicName = topicDescription["topicName"]
            if topicDescription.has_key("partitionCount"):
                partitionCount = int(topicDescription["partitionCount"])
            if topicDescription.has_key("resource"):
                resource = int(topicDescription["resource"])
            if topicDescription.has_key("partitionLimit"):
                partitionLimit = int(topicDescription["partitionLimit"])
            if topicDescription.has_key("topicMode"):
                if topicDescription["topicMode"] == 'TOPIC_MODE_NORMAL':
                    topicMode = TOPIC_MODE_NORMAL
                elif topicDescription["topicMode"] == 'TOPIC_MODE_SECURITY':
                    topicMode = TOPIC_MODE_SECURITY
            if topicDescription.has_key("needFieldFilter"):
                needFieldFilter = topicDescription["needFieldFilter"] == 'true'
            if topicDescription.has_key("obsoleteFileTimeInterval"):
                obsoleteFileTimeInterval = int(topicDescription["obsoleteFileTimeInterval"])
            if topicDescription.has_key("reservedFileCount"):
                reservedFileCount = int(topicDescription["reservedFileCount"])
            if topicDescription.has_key("deleteTopicData"):
                deleteTopicData = topicDescription["deleteTopicData"] == 'true'
            if topicDescription.has_key("partitionMinBufferSize"):
                partitionMinBufferSize = int(topicDescription["partitionMinBufferSize"])
            if topicDescription.has_key("partitionMaxBufferSize"):
                partitionMaxBufferSize = int(topicDescription["partitionMaxBufferSize"])
            if topicDescription.has_key("maxWaitTimeForSecurityCommit"):
                maxWaitTimeForSecurityCommit = int(topicDescription["maxWaitTimeForSecurityCommit"])
            if topicDescription.has_key("maxDataSizeForSecurityCommit"):
                maxDataSizeForSecurityCommit = int(topicDescription["maxDataSizeForSecurityCommit"])
            if topicDescription.has_key("compressMsg"):
                compressMsg = topicDescription["compressMsg"] == 'true'
            if topicDescription.has_key("compressThres"):
                compressThres = int(topicDescription["compressThres"])
            if topicDescription.has_key("dfsRoot"):
                dfsRoot = topicDescription["dfsRoot"]
            if topicDescription.has_key("extendDfsRoot"):
                extendDfsRoot = topicDescription["extendDfsRoot"]
            if topicDescription.has_key("topicGroup"):
                topicGroup = topicDescription["topicGroup"]

        ret, response, errorMsg = self.adminDelegate.createTopic(
            topicName=topicName, partCnt=partitionCount, rangeCnt=self.rangeCnt,
            partMinBufSize=partitionMinBufferSize, partMaxBufSize=partitionMaxBufferSize,
            partResource=resource, partLimit=partitionLimit,
            topicMode=topicMode, needFieldFilter=needFieldFilter,
            obsoleteFileTimeInterval=obsoleteFileTimeInterval,
            reservedFileCount=reservedFileCount,
            deleteTopicData=deleteTopicData,
            securityCommitTime=maxWaitTimeForSecurityCommit,
            securityCommitData=maxDataSizeForSecurityCommit,
            compressMsg=compressMsg,
            compressThres=compressThres,
            dfsRoot=dfsRoot, topicGroup=topicGroup, extendDfsRoot=extendDfsRoot)
        if not ret:
            print "Add topic [%s] Failed!" % topicName
            print errorMsg
            return False
        if self.timeout is not None:
            print "wait topic [%s] ready." % topicName
            if not self._waitTopicReady(self.timeout, topicName):
                print "wait topic %s ready failed" % topicName
                return False
        print "update topic %s success!" % topicName
        return True

    def _waitTopicReady(self, timeout, topicName):
        while timeout > 0:
            if self._isTopicReady(topicName):
                print "topic [%s] is ready" % topicName
                return "", "", 0
            timeout -= 0.1
            time.sleep(0.1)
        return "", "wait topic ready timeout.", -1

    def _isTopicReady(self, topicName):
        ret, response, errMsg = self.adminDelegate.getTopicInfo(topicName)
        if ret:
            if response.HasField(swift_common_define.PROTO_TOPIC_INFO):
                topicInfo = response.topicInfo
                for partitionInfo in topicInfo.partitionInfos:
                    if partitionInfo.status != PARTITION_STATUS_RUNNING:
                        print "ERROR: partition info status: %s, response: %s" % \
                            (str(partitionInfo), str(response))
                        return False
            else:
                print "ERROR: response does not have topicInfo field! response: %s" % str(response)
                return False
        else:
            print "get topic info failed: %s, errorMsg: %s" % (topicName, errMsg)
            return False
        return True


class TransferPartitionCmd(base_cmd.BaseCmd):
    '''
    swift {tp|transferpartition}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }
       {-t transfer_info       | --transfer=transfer_info }
    options:
       -z zookeeper_address,     --zookeeper=zookeeper_address     : required, zookeeper root address
       -t transfer_info,         --transfer=transfer_info          : required, topic name, eg: role_name1:0.3,role_name2:0.1
       -g group_name,            --group_name=group_name           : opitional, group name, eg: swift_mainse

Example:
    swift tp -z zfs://10.250.12.23:1234/root -t group1##broker_1_0:0.3;group2##broker_1_0:0.2
    swift tp -z zfs://10.250.12.23:1234/root -t all
    swift tp -z zfs://10.250.12.23:1234/root -t all -g swift_mainse
    '''

    def addOptions(self):
        super(TransferPartitionCmd, self).addOptions()
        self.parser.add_option('-t', '--transfer', action='store', dest='transferInfo')
        self.parser.add_option('-g', '--group_name', action='store', dest='groupName')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(TransferPartitionCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        if options.transferInfo is None:
            errMsg = "ERROR: transfer info must be specified!"
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(TransferPartitionCmd, self).initMember(options)
        self.groupName = options.groupName
        self.transferInfo = options.transferInfo

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.transferPartition(
            self.transferInfo, self.groupName)
        if self.fromApi:
            return ret, response, errorMsg
        if not ret:
            print "transfer partition failed!"
            print errorMsg
            return "", "", 1
        print "transfer partition success!"
        return "", "", 0


class ChangeSlotCmd(base_cmd.BaseCmd):
    '''
    swift {cs|changeSlot}
       {-z zookeeper_address   | --zookeeper=zookeeper_address }
       {-r role_names       | --rolenames=role_names }
    options:
       -z zookeeper_address,     --zookeeper=zookeeper_address     : required, zookeeper root address
       -r role_names,         --rolenames=role_names          : required, role name, eg: default##broker_4_0,tisplus##broker_0_0

Example:
    swift cs -z zfs://10.250.12.23:1234/root -r group1##broker_1_0,group2##broker_2_0
    '''

    def addOptions(self):
        super(ChangeSlotCmd, self).addOptions()
        self.parser.add_option('-r', '--rolenames', action='store', dest='roleNames')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(ChangeSlotCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        if options.roleNames is None:
            errMsg = "ERROR: roleNames must be specified!"
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(ChangeSlotCmd, self).initMember(options)
        self.roleNames = options.roleNames

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.changeSlot(self.roleNames)
        if self.fromApi:
            return ret, response, errorMsg
        if not ret:
            print "change slot failed!"
            print errorMsg
            return "", "", 1
        print "change slot success!"
        return "", "", 0


class RegisterSchemaCmd(base_cmd.BaseCmd):
    '''
    swift {regscm|registerSchema}
    options:
       -z zookeeper_address,     --zookeeper=zookeeper_address   : required, zookeeper root address
       -t topic_name,            --topic_name                    : required, topic name
       -s schema,                --schema                        : required, schema json string
       -v version,               --version                       : optional, user defined int version
       -c cover,                 --cover                         : optional, to cover old versions or not if schema versions full, default is false

Example:
    swift regscm -z zfs://10.250.12.23:1234/root -t mainse -s "{\"topic\":\"mainse\",\"fields\":[{\"name\":\"nid\",\"type\":\"string\"}]}"
    '''

    def addOptions(self):
        super(RegisterSchemaCmd, self).addOptions()
        self.parser.add_option('-t', '--topic', action='store', dest='topicName')
        self.parser.add_option('-s', '--schema', action='store', dest='schema')
        self.parser.add_option('-v', '--version', action='store', dest='version')
        self.parser.add_option('-c', '--cover', action='store_true', dest='cover', default=False)

    def checkOptionsValidity(self, options):
        ret, errMsg = super(RegisterSchemaCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        if options.topicName is None:
            errMsg = "ERROR: topic name must be specified!"
            return False, errMsg
        if options.schema is None:
            errMsg = "ERROR: schema must be specified!"
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(RegisterSchemaCmd, self).initMember(options)
        self.schema = options.schema
        self.topicName = options.topicName
        self.version = options.version
        self.cover = options.cover

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.registerSchema(self.topicName, self.schema,
                                                                    self.version, self.cover)
        if self.fromApi:
            return ret, response, errorMsg
        if not ret:
            print "register schema failed!"
            print errorMsg
            return "", "", 1
        print "register schema[%d] success!" % response.version
        return response.version, "", 0


class GetSchemaCmd(base_cmd.BaseCmd):
    '''
    swift {gscm|getSchemaCmd}
    options:
       -z zookeeper_address,     --zookeeper=zookeeper_address   : required, zookeeper root address
       -t topic_name,            --topic_name                    : required, topic name
       -v version,               --version                       : required, schema version

Example:
    swift gscm -z zfs://10.250.12.23:1234/root -t mainse -v 1234
    '''

    def addOptions(self):
        super(GetSchemaCmd, self).addOptions()
        self.parser.add_option('-t', '--topic', action='store', dest='topicName')
        self.parser.add_option('-v', '--version', action='store', dest='version')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(GetSchemaCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        if options.topicName is None:
            errMsg = "ERROR: topic name must be specified!"
            return False, errMsg
        if options.version is None:
            errMsg = "ERROR: version must be specified!"
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(GetSchemaCmd, self).initMember(options)
        self.topicName = options.topicName
        self.version = options.version

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.getSchema(self.topicName, self.version)
        if self.fromApi:
            return ret, response, errorMsg
        if not ret:
            print "get schema failed!"
            print errorMsg
            return "", "", 1
        print response.schemaInfo
        return response.schemaInfo, "", 0


class GetTopicRWTimeCmd(base_cmd.BaseCmd):
    '''
    swift {gtrw|getopicrwtime}
    options:
       -z zookeeper_address,     --zookeeper=zookeeper_address   : required, zookeeper root address
       -t topic_name,            --topic_name                    : optional, topic name

Example:
    swift gtrw -z zfs://10.250.12.23:1234/root -t mainse
    '''

    def addOptions(self):
        super(GetTopicRWTimeCmd, self).addOptions()
        self.parser.add_option('-t', '--topic', action='store', dest='topicName')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(GetTopicRWTimeCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(GetTopicRWTimeCmd, self).initMember(options)
        self.topicName = options.topicName

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.getTopicRWTime(self.topicName)
        if self.fromApi:
            return ret, response, errorMsg
        if not ret:
            print errorMsg
            return "", errorMsg, 1
        print response
        return response, "", 0


class GetLastDeletedNoUseTopicCmd(base_cmd.BaseCmd):
    '''
    swift {gtldnt|getlastdeletednousetopic}
    options:
       -z zookeeper_address,     --zookeeper=zookeeper_address   : required, zookeeper root address

Example:
    swift gtldnt -z zfs://10.250.12.23:1234/root
    '''

    def addOptions(self):
        super(GetLastDeletedNoUseTopicCmd, self).addOptions()

    def checkOptionsValidity(self, options):
        ret, errMsg = super(GetLastDeletedNoUseTopicCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(GetLastDeletedNoUseTopicCmd, self).initMember(options)

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.getLastDeletedNoUseTopic()
        if self.fromApi:
            return ret, response, errorMsg
        if not ret:
            print errorMsg
            return "", errorMsg, 1
        else:
            print response
            return response, "", 0


class GetDeletedNoUseTopicCmd(base_cmd.BaseCmd):
    '''
    swift {gtdnt|getdeletednousetopic}
    options:
       -z zookeeper_address,     --zookeeper=zookeeper_address   : required, zookeeper root address
       -f file_name,             --file_name                     : required, deleted topic file name

Example:
    swift gtdnt -z zfs://10.250.12.23:1234/root -f 163234200
    '''

    def addOptions(self):
        super(GetDeletedNoUseTopicCmd, self).addOptions()
        self.parser.add_option('-f', '--file_name', action='store', dest='fileName')

    def checkOptionsValidity(self, options):
        ret, errMsg = super(GetDeletedNoUseTopicCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        if options.fileName is None:
            errMsg = "ERROR: file name must be specified!"
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(GetDeletedNoUseTopicCmd, self).initMember(options)
        self.fileName = options.fileName

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.getDeletedNoUseTopic(self.fileName)
        if self.fromApi:
            return ret, response, errorMsg
        if not ret:
            print errorMsg
            return "", errorMsg, 1
        else:
            print response
            return response, "", 0


class GetDeletedNoUseTopicFilesCmd(base_cmd.BaseCmd):
    '''
    swift {gtdntf|getdeletednousetopicfiles}
    options:
       -z zookeeper_address,     --zookeeper=zookeeper_address   : required, zookeeper root address

Example:
    swift gtdntf -z zfs://10.250.12.23:1234/root
    '''

    def addOptions(self):
        super(GetDeletedNoUseTopicFilesCmd, self).addOptions()

    def checkOptionsValidity(self, options):
        ret, errMsg = super(GetDeletedNoUseTopicFilesCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(GetDeletedNoUseTopicFilesCmd, self).initMember(options)

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errorMsg = self.adminDelegate.getDeletedNoUseTopicFiles()
        if self.fromApi:
            return ret, response, errorMsg
        if not ret:
            print errorMsg
            return "", errorMsg, 1
        else:
            print response
            return response, "", 0


class TopicAclManageCmd(base_cmd.BaseCmd):
    '''
    swift {tam|topicaclmanage}
       {-z zookeeper_address           | --zookeeper=zookeeper_address }
       {-a access_operator             | --accessop=access_operator }
       {-t topic_name                  | --topic=topic_name }
       {-u access_id                   | --accessid=access_id }
       {-k access_key                  | --accesskey=access_key }
       {-p access_priority             | --accessprior=access_priority }
       {-y access_type                 | --accesstype=access_type }
       {-s check_status                | --checkstatus=check_status }

    options:
       -z zookeeper_address,      --zookeeper=zookeeper_address         : required, zookeeper root address
       -a access_operator,        --accessop=access_operator            : required, operator to be taken, should be one of ['add', 'delete', 'clear', 'update', 'key_update', 'priority_update',  'type_update', 'status', 'list', 'listall']
       -t topic_name,             --topic=topic_name                    : required except accessop='listall', topic name to be operated
       -u access_id,              --accessid=access_id                  : required when accessop='add', '*update', 'delete', access id to be operated
       -k access_key,             --accesskey=access_key                : required when accessop='add', 'key_update', new access key to be set
       -p access_priority,        --accessprior=access_priority         : required when accessop='priority_update', new priority to be set, should be one of ['p0', 'p1',...,'p5']
       -y access_type,            --accesstype=access_type              : required when accessop='type_update', new access type to be set, should be one of ['read_only', 'read_write', 'none']
       -s check_status,           --checkstatus=check_status            : required when accessop='status', check status to be set, should be one of ['on', 'off']
    '''

    def addOptions(self):
        super(TopicAclManageCmd, self).addOptions()
        self.parser.add_option('-a', '--accessop',
                               type='choice',
                               action='store',
                               dest='accessOp',
                               choices=['add',
                                        'delete',
                                        'clear',
                                        'update',
                                        'key_update',
                                        'priority_update',
                                        'type_update',
                                        'status',
                                        'list',
                                        'listall'])
        self.parser.add_option('-t', '--topic', action='store', dest="topicName")
        self.parser.add_option('-u', '--accessid', action='store', dest='accessId')
        self.parser.add_option('-k', '--accesskey', action='store', dest='accessKey')
        self.parser.add_option('-p',
                               '--accessprior',
                               type='choice',
                               action='store',
                               dest='accessPriority',
                               choices=['p0',
                                        'p1',
                                        'p2',
                                        'p3',
                                        'p4',
                                        'p5'])
        self.parser.add_option('-y',
                               '--accesstype',
                               type='choice',
                               action='store',
                               dest='accessType',
                               choices=['read_only',
                                        'read_write',
                                        'none'])
        self.parser.add_option('-s',
                               '--check_status',
                               type='choice',
                               action='store',
                               dest='checkStatus',
                               choices=['on', 'off'])

    def checkOptionsValidity(self, options):
        ret, errMsg = super(TopicAclManageCmd, self).checkOptionsValidity(options)
        if not ret:
            return False, errMsg

        if options.accessOp is None:
            errMsg = "ERROR: accessop must be specified"
            return False, errMsg

        if options.topicName is None and options.accessOp != 'listall':
            errMsg = "ERROR: topic must be specified when %s" % options.accessOp
            return False, errMsg

        if options.accessId is None and "update" in options.accessOp:
            errMsg = "ERROR: accessid must be specified when %s" % options.accessOp
            return False, errMsg

        if options.accessId is None and options.accessOp == 'delete':
            errMsg = "ERROR: accessid must be specified when delete"
            return False, errMsg

        if options.accessOp == 'add' and (options.accessId is None or options.accessKey is None):
            errMsg = "ERROR: accessid/accesskey must be specified when add"
            return False, errMsg

        if options.accessOp == 'key_update' and options.accessKey is None:
            errMsg = "ERROR: accesskey must be specified when key_update"
            return False, errMsg

        if options.accessOp == 'priority_update' and options.accessPriority is None:
            errMsg = "ERROR: accessprior must be specified when priority_update"
            return False, errMsg

        if options.accessOp == 'type_update' and options.accessType is None:
            errMsg = "ERROR: accesstype must be specified when type_update"
            return False, errMsg

        if options.accessOp == 'status' and options.checkStatus is None:
            errMsg = "ERROR: checkstatus must be specified when status"
            return False, errMsg
        return True, ''

    def initMember(self, options):
        super(TopicAclManageCmd, self).initMember(options)
        if options.accessOp == 'add':
            self.accessOp = ADD_TOPIC_ACCESS
        elif options.accessOp == 'delete':
            self.accessOp = DELETE_TOPIC_ACCESS
        elif options.accessOp == 'update':
            self.accessOp = UPDATE_TOPIC_ACCESS
        elif options.accessOp == 'key_update':
            self.accessOp = UPDATE_TOPIC_ACCESS_KEY
        elif options.accessOp == 'priority_update':
            self.accessOp = UPDATE_TOPIC_ACCESS_PRIORITY
        elif options.accessOp == 'type_update':
            self.accessOp = UPDATE_TOPIC_ACCESS_TYPE
        elif options.accessOp == 'status':
            self.accessOp = UPDATE_TOPIC_AUTH_STATUS
        elif options.accessOp == 'clear':
            self.accessOp = CLEAR_TOPIC_ACCESS
        elif options.accessOp == 'list':
            self.accessOp = LIST_ONE_TOPIC_ACCESS
        elif options.accessOp == 'listall':
            self.accessOp = LIST_ALL_TOPIC_ACCESS

        self.topicName = options.topicName
        self.accessId = options.accessId
        self.accessKey = options.accessKey
        self.accessPriority = None
        if options.accessPriority == 'p0':
            self.accessPriority = ACCESS_PRIORITY_P0
        elif options.accessPriority == 'p1':
            self.accessPriority = ACCESS_PRIORITY_P1
        elif options.accessPriority == 'p2':
            self.accessPriority = ACCESS_PRIORITY_P2
        elif options.accessPriority == 'p3':
            self.accessPriority = ACCESS_PRIORITY_P3
        elif options.accessPriority == 'p4':
            self.accessPriority = ACCESS_PRIORITY_P4
        elif options.accessPriority == 'p5':
            self.accessPriority = ACCESS_PRIORITY_P5

        self.accessType = None
        if options.accessType == 'read_only':
            self.accessType = TOPIC_ACCESS_READ
        elif options.accessType == 'read_write':
            self.accessType = TOPIC_ACCESS_READ_WRITE
        elif options.accessType == "none":
            self.accessType = TOPIC_ACCESS_NONE

        self.checkStatus = None
        if options.checkStatus == 'on':
            self.checkStatus = TOPIC_AUTH_CHECK_ON
        else:
            self.checkStatus = TOPIC_AUTH_CHECK_OFF

    def buildDelegate(self):
        self.adminDelegate = swift_admin_delegate.SwiftAdminDelegate(
            self.fileUtil, self.zfsAddress, self.adminLeader, self.options.username, self.options.passwd)
        return True

    def run(self):
        ret, response, errMsg = self.adminDelegate.topicAclManage(
            accessOp=self.accessOp, topicName=self.topicName,
            accessId=self.accessId, accessKey=self.accessKey,
            accessPriority=self.accessPriority,
            accessType=self.accessType,
            checkStatus=self.checkStatus)

        if not ret:
            print "topic acl manage failed"
            return ret, response, 1

        print "topic acl manage success"
        if self.accessOp == LIST_ONE_TOPIC_ACCESS or self.accessOp == LIST_ALL_TOPIC_ACCESS:
            self.parseAndPrintTopicAcl(response)

        return ret, response, 0

    def parseAndPrintTopicAcl(self, response):
        topicAclDatas = response.topicAclDatas.allTopicAclData
        toFormatStr = '{"topicName":"%s", "checkStatus":"%s", "topicAccessInfos":[%s]}'
        for topicAclData in topicAclDatas:
            accessInfoList = []
            accessInfoStr = '{"accessId":"%s", "accessKey":"%s", "priority":"%s", "accessType":"%s"}'
            for topicAccessInfo in topicAclData.topicAccessInfos:
                oneItem = accessInfoStr % (topicAccessInfo.accessAuthInfo.accessId,
                                           topicAccessInfo.accessAuthInfo.accessKey,
                                           self._accessPriorityToStr(topicAccessInfo.accessPriority),
                                           self._accessTypeToStr(topicAccessInfo.accessType))
                accessInfoList.append(oneItem)
            accessInfosStr = ",".join(accessInfoList)
            oneItem = toFormatStr % (topicAclData.topicName,
                                     self._checkStatusToStr(topicAclData.checkStatus),
                                     accessInfosStr)
            print oneItem

    def _accessPriorityToStr(self, priority):
        if priority == ACCESS_PRIORITY_P0:
            return "p0"
        elif priority == ACCESS_PRIORITY_P1:
            return "p1"
        elif priority == ACCESS_PRIORITY_P2:
            return "p2"
        elif priority == ACCESS_PRIORITY_P3:
            return "p3"
        elif priority == ACCESS_PRIORITY_P4:
            return "p4"
        elif priority == ACCESS_PRIORITY_P5:
            return "p5"

    def _accessTypeToStr(self, type):
        if type == TOPIC_ACCESS_NONE:
            return "none"
        elif type == TOPIC_ACCESS_READ:
            return "read_only"
        elif type == TOPIC_ACCESS_READ_WRITE:
            return "read_write"

    def _checkStatusToStr(self, status):
        if status == TOPIC_AUTH_CHECK_ON:
            return "on"
        else:
            return "off"
