admin-cli/executor/compaction.go (150 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package executor
import (
"encoding/json"
"fmt"
)
// userSpecifiedCompaction is the app-env key under which the JSON-encoded list
// of compaction operations is read (GetAppEnv) and written (SetAppEnv).
const userSpecifiedCompaction = "user_specified_compaction"
// CompactionParams carries the caller-supplied options from which SetCompaction
// builds a single compaction operation together with its filter rules.
type CompactionParams struct {
// OperationType selects the operation; accepted values are "delete" and
// "update-ttl". Any other value is rejected.
OperationType string
// UpdateTTLType selects the update-TTL variant and is consulted only for
// "update-ttl". Accepted values: "from-now", "from-current", "timestamp".
UpdateTTLType string
// TimeValue is emitted as the "value" field of the update-TTL parameters.
// NOTE(review): the unit is not visible in this file — confirm against the server.
TimeValue uint
// HashkeyPattern, when non-empty, adds a hashkey pattern rule.
HashkeyPattern string
// HashkeyMatch is the match type of the hashkey rule: "anywhere", "prefix"
// or "postfix". It is only validated when HashkeyPattern is non-empty.
HashkeyMatch string
// SortkeyPattern, when non-empty, adds a sortkey pattern rule.
SortkeyPattern string
// SortkeyMatch is the match type of the sortkey rule: "anywhere", "prefix"
// or "postfix". It is only validated when SortkeyPattern is non-empty.
SortkeyMatch string
// StartTTL and StopTTL bound the TTL range rule. The rule is added only
// when both are non-negative; a negative value in either disables it.
StartTTL int64
StopTTL int64
}
// SetCompaction builds the compaction env value for tableName from params (via
// generateCompactionEnv) and stores it under the user_specified_compaction key
// with SetAppEnv. The first error from either step is returned unchanged.
func SetCompaction(client *Client, tableName string, params *CompactionParams) error {
	envValue, err := generateCompactionEnv(client, tableName, params)
	if err != nil {
		return err
	}
	return SetAppEnv(client, tableName, userSpecifiedCompaction, envValue)
}
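// JSON helper types: they define the serialized layout of the
// user_specified_compaction env value and of its nested parameter strings.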
// compactionRule is the JSON form of one filter rule attached to an operation.
type compactionRule struct {
// RuleType is the rule kind. This file emits "FRT_HASHKEY_PATTERN",
// "FRT_SORTKEY_PATTERN" and "FRT_TTL_RANGE".
RuleType string `json:"type"`
// Params holds the rule's own parameters as a JSON document stored in a
// string (keyRuleParams or timeRangeRuleParams, marshaled).
Params string `json:"params"`
}
// compactionOperation is the JSON form of one compaction operation and the
// rules attached to it.
type compactionOperation struct {
// OpType is the operation kind. This file emits "COT_DELETE" and
// "COT_UPDATE_TTL".
OpType string `json:"type"`
// Params holds the operation's parameters as a JSON document stored in a
// string. It is set for "COT_UPDATE_TTL" (marshaled updateTTLParams) and
// left empty for "COT_DELETE".
Params string `json:"params"`
// Rules lists the filter rules generated for this operation.
Rules []compactionRule `json:"rules"`
}
// updateTTLParams is the JSON payload stored in compactionOperation.Params for
// an update-TTL operation.
type updateTTLParams struct {
// UpdateTTLOpType is one of the values of updateTTLTypeMapping.
UpdateTTLOpType string `json:"type"`
// Value is the caller-supplied time value.
// NOTE(review): the unit is not visible in this file — confirm against the server.
Value uint `json:"value"`
}
// compactionOperations is the top-level JSON document stored in the
// user_specified_compaction env: the list of all configured operations.
type compactionOperations struct {
Ops []compactionOperation `json:"ops"`
}
// keyRuleParams is the JSON payload stored in compactionRule.Params for a
// hashkey or sortkey pattern rule.
type keyRuleParams struct {
// Pattern is the caller-supplied pattern string.
Pattern string `json:"pattern"`
// MatchType is one of the values of matchTypeMapping.
MatchType string `json:"match_type"`
}
// timeRangeRuleParams is the JSON payload stored in compactionRule.Params for a
// TTL range rule. Both bounds are serialized as unsigned 32-bit integers.
type timeRangeRuleParams struct {
StartTTL uint32 `json:"start_ttl"`
StopTTL uint32 `json:"stop_ttl"`
}
// generateCompactionEnv returns the new JSON value for the
// user_specified_compaction env of tableName: the operations already stored
// there, followed by one new operation built from params.
//
// Steps, in order:
//  1. Build the operation from params.OperationType ("delete" or "update-ttl").
//  2. Attach the rules produced by generateRules; at least one rule is required.
//  3. Fetch the current env value with GetAppEnv and decode it, if non-empty.
//  4. Append the new operation and re-encode the whole list.
//
// An error is returned for an invalid operation type, an invalid update-TTL or
// match type, an empty rule set, a GetAppEnv failure, or an existing env value
// that is not valid JSON for compactionOperations. The last case is reported
// rather than ignored so that a malformed value is never silently overwritten,
// which would drop the operations that were stored before.
func generateCompactionEnv(client *Client, tableName string, params *CompactionParams) (string, error) {
	var (
		operation *compactionOperation
		err       error
	)
	switch params.OperationType {
	case "delete":
		operation = &compactionOperation{OpType: "COT_DELETE"}
	case "update-ttl":
		if operation, err = generateUpdateTTLOperation(params.UpdateTTLType, params.TimeValue); err != nil {
			return "", err
		}
	default:
		return "", fmt.Errorf("invalid operation type {%s}", params.OperationType)
	}

	if operation.Rules, err = generateRules(params); err != nil {
		return "", err
	}
	if len(operation.Rules) == 0 {
		return "", fmt.Errorf("no rules specified")
	}

	compactionJSON, err := GetAppEnv(client, tableName, userSpecifiedCompaction)
	if err != nil {
		return "", err
	}

	var operations compactionOperations
	if compactionJSON != "" {
		if err = json.Unmarshal([]byte(compactionJSON), &operations); err != nil {
			return "", fmt.Errorf("parsing existing %s env of table %q: %w",
				userSpecifiedCompaction, tableName, err)
		}
	}
	operations.Ops = append(operations.Ops, *operation)

	res, err := json.Marshal(operations)
	if err != nil {
		return "", fmt.Errorf("encoding %s env: %w", userSpecifiedCompaction, err)
	}
	return string(res), nil
}
// updateTTLTypeMapping translates the update-TTL type accepted in
// CompactionParams.UpdateTTLType into the identifier written to the env JSON.
// A key that is absent from this map is rejected as unsupported.
var updateTTLTypeMapping = map[string]string{
"from-now": "UTOT_FROM_NOW",
"from-current": "UTOT_FROM_CURRENT",
"timestamp": "UTOT_TIMESTAMP",
}
// generateUpdateTTLOperation builds a "COT_UPDATE_TTL" operation whose Params
// string is the JSON encoding of the mapped update-TTL type and timeValue.
// The returned operation carries no rules. An error is returned when
// updateTTLType is not a key of updateTTLTypeMapping.
func generateUpdateTTLOperation(updateTTLType string, timeValue uint) (*compactionOperation, error) {
	opType, known := updateTTLTypeMapping[updateTTLType]
	if !known {
		return nil, fmt.Errorf("not support the type: %s", updateTTLType)
	}

	// Marshal cannot fail here: updateTTLParams holds only a string and a uint.
	encoded, _ := json.Marshal(updateTTLParams{
		UpdateTTLOpType: opType,
		Value:           timeValue,
	})

	op := new(compactionOperation)
	op.OpType = "COT_UPDATE_TTL"
	op.Params = string(encoded)
	return op, nil
}
// generateRules converts the filter options in params into compaction rules.
//
// Rules are emitted in a fixed order: hashkey pattern, sortkey pattern, TTL
// range. A key rule is produced only when its pattern is non-empty, and the
// TTL range rule only when both StartTTL and StopTTL are non-negative. The
// returned slice is nil when no option is set.
//
// An error is returned for an unknown match type, or when a TTL bound does not
// fit in the uint32 used by the serialized rule. Without that check the value
// would wrap around on conversion and describe a different range than the one
// the caller asked for.
func generateRules(params *CompactionParams) ([]compactionRule, error) {
	// Largest bound representable by the uint32 fields of timeRangeRuleParams.
	const maxTTL = int64(^uint32(0))

	keyRules := []struct{ ruleType, pattern, match string }{
		{"FRT_HASHKEY_PATTERN", params.HashkeyPattern, params.HashkeyMatch},
		{"FRT_SORTKEY_PATTERN", params.SortkeyPattern, params.SortkeyMatch},
	}

	var rules []compactionRule
	for _, kr := range keyRules {
		if kr.pattern == "" {
			continue
		}
		rule, err := generateKeyRule(kr.ruleType, kr.pattern, kr.match)
		if err != nil {
			return nil, err
		}
		rules = append(rules, *rule)
	}

	if params.StartTTL >= 0 && params.StopTTL >= 0 {
		if params.StartTTL > maxTTL || params.StopTTL > maxTTL {
			return nil, fmt.Errorf("ttl range [%d, %d] exceeds the maximum of %d",
				params.StartTTL, params.StopTTL, maxTTL)
		}
		rules = append(rules, generateTTLRangeRule(params.StartTTL, params.StopTTL))
	}
	return rules, nil
}
// matchTypeMapping translates the match type accepted in
// CompactionParams.HashkeyMatch / SortkeyMatch into the identifier written to
// the env JSON. A key that is absent from this map is rejected as invalid.
var matchTypeMapping = map[string]string{
"anywhere": "SMT_MATCH_ANYWHERE",
"prefix": "SMT_MATCH_PREFIX",
"postfix": "SMT_MATCH_POSTFIX",
}
// generateKeyRule builds a key pattern rule of the given ruleType. Its Params
// string is the JSON encoding of pattern and the mapped match type. An error
// is returned when match is not a key of matchTypeMapping.
func generateKeyRule(ruleType string, pattern string, match string) (*compactionRule, error) {
	matchType, known := matchTypeMapping[match]
	if !known {
		return nil, fmt.Errorf("invalid match type {%s}", match)
	}

	// Marshal cannot fail here: keyRuleParams holds only string fields.
	encoded, _ := json.Marshal(keyRuleParams{
		Pattern:   pattern,
		MatchType: matchType,
	})

	rule := new(compactionRule)
	rule.RuleType = ruleType
	rule.Params = string(encoded)
	return rule, nil
}
// generateTTLRangeRule builds an "FRT_TTL_RANGE" rule whose Params string is
// the JSON encoding of the two bounds.
//
// Both bounds are converted to uint32 without a range check, so a negative
// value or one above the uint32 maximum wraps around.
func generateTTLRangeRule(startTTL int64, stopTTL int64) compactionRule {
	// Marshal cannot fail here: timeRangeRuleParams holds only uint32 fields.
	encoded, _ := json.Marshal(timeRangeRuleParams{
		StartTTL: uint32(startTTL),
		StopTTL:  uint32(stopTTL),
	})

	var rule compactionRule
	rule.RuleType = "FRT_TTL_RANGE"
	rule.Params = string(encoded)
	return rule
}