admin-cli/executor/compaction.go (150 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package executor import ( "encoding/json" "fmt" ) const userSpecifiedCompaction = "user_specified_compaction" type CompactionParams struct { OperationType string UpdateTTLType string TimeValue uint HashkeyPattern string HashkeyMatch string SortkeyPattern string SortkeyMatch string StartTTL int64 StopTTL int64 } func SetCompaction(client *Client, tableName string, params *CompactionParams) error { json, err := generateCompactionEnv(client, tableName, params) if err != nil { return err } if err = SetAppEnv(client, tableName, userSpecifiedCompaction, json); err != nil { return err } return nil } // json helpers type compactionRule struct { RuleType string `json:"type"` Params string `json:"params"` } type compactionOperation struct { OpType string `json:"type"` Params string `json:"params"` Rules []compactionRule `json:"rules"` } type updateTTLParams struct { UpdateTTLOpType string `json:"type"` Value uint `json:"value"` } type compactionOperations struct { Ops []compactionOperation `json:"ops"` } type keyRuleParams struct { Pattern string `json:"pattern"` MatchType string `json:"match_type"` } type timeRangeRuleParams struct { StartTTL uint32 `json:"start_ttl"` StopTTL uint32 `json:"stop_ttl"` } func generateCompactionEnv(client *Client, tableName string, params *CompactionParams) (string, error) { var err error var operation = &compactionOperation{} switch params.OperationType { case "delete": operation.OpType = "COT_DELETE" case "update-ttl": if operation, err = generateUpdateTTLOperation(params.UpdateTTLType, params.TimeValue); err != nil { return "", err } default: return "", fmt.Errorf("invalid operation type {%s}", params.OperationType) } if operation.Rules, err = generateRules(params); err != nil { return "", err } if len(operation.Rules) == 0 { return "", fmt.Errorf("no rules specified") } compactionJSON, err := GetAppEnv(client, tableName, userSpecifiedCompaction) if err != nil { return "", err } var operations compactionOperations if compactionJSON != "" { _ = json.Unmarshal([]byte(compactionJSON), &operations) } operations.Ops = append(operations.Ops, *operation) res, _ := json.Marshal(operations) return string(res), nil } var updateTTLTypeMapping = map[string]string{ "from-now": "UTOT_FROM_NOW", "from-current": "UTOT_FROM_CURRENT", "timestamp": "UTOT_TIMESTAMP", } func generateUpdateTTLOperation(updateTTLType string, timeValue uint) (*compactionOperation, error) { var params updateTTLParams params.Value = timeValue ok := false if params.UpdateTTLOpType, ok = updateTTLTypeMapping[updateTTLType]; !ok { return nil, fmt.Errorf("not support the type: %s", updateTTLType) } paramsBytes, _ := json.Marshal(params) return &compactionOperation{ OpType: "COT_UPDATE_TTL", Params: string(paramsBytes), }, nil } func generateRules(params *CompactionParams) ([]compactionRule, error) { var res []compactionRule var err error if params.HashkeyPattern != "" { var rule *compactionRule if rule, err = generateKeyRule("FRT_HASHKEY_PATTERN", params.HashkeyPattern, params.HashkeyMatch); err != nil { return nil, err } res = append(res, *rule) } if params.SortkeyPattern != "" { var rule *compactionRule if rule, err = generateKeyRule("FRT_SORTKEY_PATTERN", params.SortkeyPattern, params.SortkeyMatch); err != nil { return nil, err } res = append(res, *rule) } if params.StartTTL >= 0 && params.StopTTL >= 0 { res = append(res, generateTTLRangeRule(params.StartTTL, params.StopTTL)) } return res, nil } var matchTypeMapping = map[string]string{ "anywhere": "SMT_MATCH_ANYWHERE", "prefix": "SMT_MATCH_PREFIX", "postfix": "SMT_MATCH_POSTFIX", } func generateKeyRule(ruleType string, pattern string, match string) (*compactionRule, error) { var params keyRuleParams params.Pattern = pattern ok := false if params.MatchType, ok = matchTypeMapping[match]; !ok { return nil, fmt.Errorf("invalid match type {%s}", match) } paramsBytes, _ := json.Marshal(params) return &compactionRule{ RuleType: ruleType, Params: string(paramsBytes), }, nil } func generateTTLRangeRule(startTTL int64, stopTTL int64) compactionRule { var params timeRangeRuleParams params.StartTTL = uint32(startTTL) params.StopTTL = uint32(stopTTL) paramsBytes, _ := json.Marshal(params) return compactionRule{ RuleType: "FRT_TTL_RANGE", Params: string(paramsBytes), } }