in pkg/kv/etcd3/etcd3.go [124:198]
func (d *etcd3Driver) List(ctx context.Context, basePath string) ([]*kv.Pair, error) {
log.For(ctx).Debug("etcd3: Try to list keys", zap.String("prefix", basePath))
var (
results = []*kv.Pair{}
lastKey string
)
for {
// Check if operation is ended
if ctx.Err() != nil {
return nil, ctx.Err()
}
// Prepare query options
opts := []clientv3.OpOption{
clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend),
clientv3.WithLimit(ListBatchSize),
}
// If lastkey is defined set the cursor
if lastKey != "" {
opts = append(opts, clientv3.WithFromKey())
basePath = lastKey
}
log.For(ctx).Debug("etcd3: Get all keys", zap.String("key", basePath))
// Retrieve key value
resp, err := d.client.KV.Get(ctx, d.normalize(basePath), opts...)
if err != nil {
return nil, fmt.Errorf("etcd3: unable to retrieve '%s' from base path: %w", basePath, err)
}
if resp == nil {
return nil, fmt.Errorf("etcd3: got nil response for '%s'", basePath)
}
// Exit on empty result
if len(resp.Kvs) == 0 {
log.For(ctx).Debug("etcd3: No more result, stop.")
break
}
// Unpack values
for _, item := range resp.Kvs {
log.For(ctx).Debug("etcd3: Unpack result", zap.String("key", string(item.Key)))
// Skip first if lastKey is defined
if lastKey != "" && bytes.Equal(item.Key, []byte(lastKey)) {
continue
}
results = append(results, &kv.Pair{
Key: string(item.Key),
Value: item.Value,
Version: uint64(item.Version),
})
}
// No need to paginate
if len(resp.Kvs) < ListBatchSize {
break
}
// Retrieve last key
lastKey = string(resp.Kvs[len(resp.Kvs)-1].Key)
}
// Raise keynotfound if no result.
if len(results) == 0 {
return nil, kv.ErrKeyNotFound
}
// No error
return results, nil
}