api/internal/core/storage/etcd.go (174 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storage
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/apisix/manager-api/internal/conf"
"github.com/apisix/manager-api/internal/log"
"github.com/apisix/manager-api/internal/utils"
"github.com/apisix/manager-api/internal/utils/runtime"
)
const (
// SkippedValueEtcdInitDir indicates the init_dir
// etcd event will be skipped.
SkippedValueEtcdInitDir = "init_dir"
// SkippedValueEtcdEmptyObject indicates the data with an
// empty JSON value {}, which may be set by APISIX,
// should be also skipped.
//
// Important: at present, {} is considered as invalid,
// but may be changed in the future.
SkippedValueEtcdEmptyObject = "{}"
)
var (
etcdClient *clientv3.Client
)
type EtcdV3Storage struct {
closing bool
client *clientv3.Client
}
func InitETCDClient(etcdConf *conf.Etcd) error {
config := clientv3.Config{
Endpoints: etcdConf.Endpoints,
DialTimeout: 5 * time.Second,
Username: etcdConf.Username,
Password: etcdConf.Password,
}
// mTLS
if etcdConf.MTLS != nil && etcdConf.MTLS.CaFile != "" &&
etcdConf.MTLS.CertFile != "" && etcdConf.MTLS.KeyFile != "" {
tlsInfo := transport.TLSInfo{
CertFile: etcdConf.MTLS.CertFile,
KeyFile: etcdConf.MTLS.KeyFile,
TrustedCAFile: etcdConf.MTLS.CaFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return err
}
config.TLS = tlsConfig
}
cli, err := clientv3.New(config)
if err != nil {
log.Errorf("init etcd failed: %s", err)
return fmt.Errorf("init etcd failed: %s", err)
}
etcdClient = cli
utils.AppendToClosers(Close)
return nil
}
func GenEtcdStorage() *EtcdV3Storage {
return &EtcdV3Storage{
client: etcdClient,
}
}
func Close() error {
if err := etcdClient.Close(); err != nil {
log.Errorf("etcd client close failed: %s", err)
return err
}
return nil
}
func (s *EtcdV3Storage) Get(ctx context.Context, key string) (string, error) {
resp, err := s.client.Get(ctx, key)
if err != nil {
log.Errorf("etcd get failed: %s", err)
return "", fmt.Errorf("etcd get failed: %s", err)
}
if resp.Count == 0 {
log.Warnf("key: %s is not found", key)
return "", fmt.Errorf("key: %s is not found", key)
}
return string(resp.Kvs[0].Value), nil
}
func (s *EtcdV3Storage) List(ctx context.Context, key string) ([]Keypair, error) {
resp, err := s.client.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
log.Errorf("etcd get failed: %s", err)
return nil, fmt.Errorf("etcd get failed: %s", err)
}
var ret []Keypair
for i := range resp.Kvs {
key := string(resp.Kvs[i].Key)
value := string(resp.Kvs[i].Value)
// Skip the data if its value is init_dir or {}
// during fetching-all phase.
//
// For more complex cases, an explicit function to determine if
// skippable would be better.
if value == SkippedValueEtcdInitDir || value == SkippedValueEtcdEmptyObject {
continue
}
data := Keypair{
Key: key,
Value: value,
}
ret = append(ret, data)
}
return ret, nil
}
func (s *EtcdV3Storage) Create(ctx context.Context, key, val string) error {
_, err := s.client.Put(ctx, key, val)
if err != nil {
log.Errorf("etcd put failed: %s", err)
return fmt.Errorf("etcd put failed: %s", err)
}
return nil
}
func (s *EtcdV3Storage) Update(ctx context.Context, key, val string) error {
_, err := s.client.Put(ctx, key, val)
if err != nil {
log.Errorf("etcd put failed: %s", err)
return fmt.Errorf("etcd put failed: %s", err)
}
return nil
}
func (s *EtcdV3Storage) BatchDelete(ctx context.Context, keys []string) error {
for i := range keys {
resp, err := s.client.Delete(ctx, keys[i])
if err != nil {
log.Errorf("delete etcd key[%s] failed: %s", keys[i], err)
return fmt.Errorf("delete etcd key[%s] failed: %s", keys[i], err)
}
if resp.Deleted == 0 {
log.Warnf("key: %s is not found", keys[i])
return fmt.Errorf("key: %s is not found", keys[i])
}
}
return nil
}
func (s *EtcdV3Storage) Watch(ctx context.Context, key string) <-chan WatchResponse {
eventChan := s.client.Watch(ctx, key, clientv3.WithPrefix())
ch := make(chan WatchResponse, 1)
go func() {
defer runtime.HandlePanic()
for event := range eventChan {
if event.Err() != nil {
log.Errorf("etcd watch error: key: %s err: %v", key, event.Err())
close(ch)
return
}
output := WatchResponse{
Canceled: event.Canceled,
}
for i := range event.Events {
key := string(event.Events[i].Kv.Key)
value := string(event.Events[i].Kv.Value)
// Skip the data if its value is init_dir or {}
// during watching phase.
//
// For more complex cases, an explicit function to determine if
// skippable would be better.
if value == SkippedValueEtcdInitDir || value == SkippedValueEtcdEmptyObject {
continue
}
e := Event{
Keypair: Keypair{
Key: key,
Value: value,
},
}
switch event.Events[i].Type {
case clientv3.EventTypePut:
e.Type = EventTypePut
case clientv3.EventTypeDelete:
e.Type = EventTypeDelete
}
output.Events = append(output.Events, e)
}
if output.Canceled {
log.Error("channel canceled")
output.Error = fmt.Errorf("channel canceled")
}
ch <- output
}
close(ch)
}()
return ch
}
func (s *EtcdV3Storage) GetClient() *clientv3.Client {
return s.client
}