pkg/kv/etcd3/etcd3.go (140 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package etcd3
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/elastic/harp/pkg/kv"
"github.com/elastic/harp/pkg/sdk/log"
)
const (
// ListBatchSize defines the pagination page size.
ListBatchSize = 50
)
type etcd3Driver struct {
client *clientv3.Client
}
func Store(client *clientv3.Client) kv.Store {
return &etcd3Driver{
client: client,
}
}
// -----------------------------------------------------------------------------
func (d *etcd3Driver) Get(ctx context.Context, key string) (*kv.Pair, error) {
// Retrieve key value
resp, err := d.client.KV.Get(ctx, d.normalize(key), clientv3.WithLimit(1))
if err != nil {
return nil, fmt.Errorf("etcd3: unable to retrieve '%s' key: %w", key, err)
}
if resp == nil {
return nil, fmt.Errorf("etcd3: got nil response for '%s'", key)
}
// Unpack result
if len(resp.Kvs) == 0 {
return nil, kv.ErrKeyNotFound
}
if len(resp.Kvs) > 1 {
return nil, fmt.Errorf("etcd3: '%s' key returned multiple result where only one is expected", key)
}
// No error
return &kv.Pair{
Key: string(resp.Kvs[0].Key),
Value: resp.Kvs[0].Value,
Version: uint64(resp.Kvs[0].Version),
}, nil
}
func (d *etcd3Driver) Put(ctx context.Context, key string, value []byte) error {
// Prepare a transaction
tx := d.client.Txn(ctx)
// Put a value
tx.Then(clientv3.OpPut(key, string(value)))
// Commit transaction
_, err := tx.Commit()
if err != nil {
return fmt.Errorf("etcd3: unable to put '%s' value: %w", key, err)
}
// No error
return nil
}
func (d *etcd3Driver) Delete(ctx context.Context, key string) error {
// Try to delete from store
resp, err := d.client.Delete(ctx, d.normalize(key))
if err != nil {
return fmt.Errorf("etcd3: unable to delete '%s' key: %w", key, err)
}
if resp == nil {
return fmt.Errorf("etcd3: got nil response for '%s'", key)
}
if resp.Deleted == 0 {
return kv.ErrKeyNotFound
}
// No error
return nil
}
func (d *etcd3Driver) Exists(ctx context.Context, key string) (bool, error) {
_, err := d.Get(ctx, key)
if err != nil {
if errors.Is(err, kv.ErrKeyNotFound) {
return false, nil
}
return false, fmt.Errorf("etcd3: unable to check key '%s' existence: %w", key, err)
}
// No error
return true, nil
}
func (d *etcd3Driver) List(ctx context.Context, basePath string) ([]*kv.Pair, error) {
log.For(ctx).Debug("etcd3: Try to list keys", zap.String("prefix", basePath))
var (
results = []*kv.Pair{}
lastKey string
)
for {
// Check if operation is ended
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Prepare query options
opts := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(ListBatchSize),
}
// If lastkey is defined set the cursor
if lastKey != "" {
opts = append(opts, clientv3.WithFromKey())
basePath = lastKey
}
log.For(ctx).Debug("etcd3: Get all keys", zap.String("key", basePath))
// Retrieve key value
resp, err := d.client.KV.Get(ctx, d.normalize(basePath), opts...)
if err != nil {
return nil, fmt.Errorf("etcd3: unable to retrieve '%s' from base path: %w", basePath, err)
}
if resp == nil {
return nil, fmt.Errorf("etcd3: got nil response for '%s'", basePath)
}
// Exit on empty result
if len(resp.Kvs) == 0 {
log.For(ctx).Debug("etcd3: No more result, stop.")
break
}
// Unpack values
for _, item := range resp.Kvs {
log.For(ctx).Debug("etcd3: Unpack result", zap.String("key", string(item.Key)))
// Skip first if lastKey is defined
if lastKey != "" && bytes.Equal(item.Key, []byte(lastKey)) {
continue
}
results = append(results, &kv.Pair{
Key: string(item.Key),
Value: item.Value,
Version: uint64(item.Version),
})
}
// No need to paginate
if len(resp.Kvs) < ListBatchSize {
break
}
// Retrieve last key
lastKey = string(resp.Kvs[len(resp.Kvs)-1].Key)
}
// Raise keynotfound if no result.
if len(results) == 0 {
return nil, kv.ErrKeyNotFound
}
// No error
return results, nil
}
func (d *etcd3Driver) Close() error {
// Skip if client instance is nil
if d.client == nil {
return nil
}
// Try to close client connection.
if err := d.client.Close(); err != nil {
return fmt.Errorf("etcd3: unable to close client connection: %w", err)
}
// No error
return nil
}
// -----------------------------------------------------------------------------
// Normalize the key for usage in Consul
func (d *etcd3Driver) normalize(key string) string {
key = kv.Normalize(key)
return strings.TrimPrefix(key, "/")
}