pkg/client/store.go (171 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package client
import (
"context"
"errors"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
)
// ErrStoreTxReadOnly is an error when write actions are performed on a read-only transaction.
var ErrStoreTxReadOnly = errors.New("transaction is read-only")
// ErrStoreTxBroken is an error when an action on a transaction has failed causing the whole transaction to be broken.
var ErrStoreTxBroken = errors.New("transaction is broken")
// ErrStoreTxDiscarded is an error when Commit is called on an already discarded transaction.
var ErrStoreTxDiscarded = errors.New("transaction was already discarded")
// ErrStoreTxCommitted is an error action is performed on an already committed transaction.
var ErrStoreTxCommitted = errors.New("transaction was already committed")
// StoreTxClient provides actions allowed for a started transaction.
type StoreTxClient interface {
// GetKey fetches a value from its key in the store.
GetKey(ctx context.Context, name string) ([]byte, bool, error)
// SetKey sets a value for a key in the store.
SetKey(ctx context.Context, name string, value []byte, ttl uint64) error
// DeleteKey deletes a key from the store.
DeleteKey(ctx context.Context, name string) error
// Commit commits the transaction.
Commit(ctx context.Context) error
// Discard discards the transaction.
//
// Can be called even if already committed, in that case it does nothing.
Discard(ctx context.Context) error
}
// StoreClient provides access to the key-value store from Elastic Agent for this unit.
type StoreClient interface {
// BeginTx starts a transaction for the key-value store.
BeginTx(ctx context.Context, write bool) (StoreTxClient, error)
}
type storeClientTx struct {
client *storeClient
txID string
write bool
brokenErr error
discarded bool
committed bool
}
type storeClient struct {
client *clientV2
unitID string
unitType UnitType
}
// BeginTx starts a transaction for the key-value store.
func (c *storeClient) BeginTx(ctx context.Context, write bool) (StoreTxClient, error) {
txType := proto.StoreTxType_READ_ONLY
if write {
txType = proto.StoreTxType_READ_WRITE
}
res, err := c.client.storeClient.BeginTx(ctx, &proto.StoreBeginTxRequest{
Token: c.client.token,
UnitId: c.unitID,
UnitType: proto.UnitType(c.unitType),
Type: txType,
})
if err != nil {
return nil, err
}
return &storeClientTx{
client: c,
txID: res.Id,
write: write,
}, nil
}
// GetKey fetches a value from its key in the store.
func (c *storeClientTx) GetKey(ctx context.Context, name string) ([]byte, bool, error) {
if c.brokenErr != nil {
return nil, false, ErrStoreTxBroken
}
if c.discarded {
return nil, false, ErrStoreTxDiscarded
}
if c.committed {
return nil, false, ErrStoreTxCommitted
}
res, err := c.client.client.storeClient.GetKey(ctx, &proto.StoreGetKeyRequest{
Token: c.client.client.token,
TxId: c.txID,
Name: name,
})
if err != nil {
c.brokenErr = err
return nil, false, err
}
switch res.Status {
case proto.StoreGetKeyResponse_FOUND:
return res.Value, true, nil
case proto.StoreGetKeyResponse_NOT_FOUND:
return nil, false, nil
}
err = errors.New("unknown StoreGetKeyResponseStatus")
c.brokenErr = err
return nil, false, err
}
// SetKey sets a value for a key in the store.
func (c *storeClientTx) SetKey(ctx context.Context, name string, value []byte, ttl uint64) error {
if c.brokenErr != nil {
return ErrStoreTxBroken
}
if c.discarded {
return ErrStoreTxDiscarded
}
if c.committed {
return ErrStoreTxCommitted
}
if !c.write {
return ErrStoreTxReadOnly
}
_, err := c.client.client.storeClient.SetKey(ctx, &proto.StoreSetKeyRequest{
Token: c.client.client.token,
TxId: c.txID,
Name: name,
Value: value,
Ttl: ttl,
})
if err != nil {
c.brokenErr = err
return err
}
return nil
}
// DeleteKey deletes a key from the store.
func (c *storeClientTx) DeleteKey(ctx context.Context, name string) error {
if c.brokenErr != nil {
return ErrStoreTxBroken
}
if c.discarded {
return ErrStoreTxDiscarded
}
if c.committed {
return ErrStoreTxCommitted
}
if !c.write {
return ErrStoreTxReadOnly
}
_, err := c.client.client.storeClient.DeleteKey(ctx, &proto.StoreDeleteKeyRequest{
Token: c.client.client.token,
TxId: c.txID,
Name: name,
})
if err != nil {
c.brokenErr = err
return err
}
return err
}
// Commit commits the transaction.
func (c *storeClientTx) Commit(ctx context.Context) error {
if c.brokenErr != nil {
return ErrStoreTxBroken
}
if c.discarded {
return ErrStoreTxDiscarded
}
if c.committed {
return nil
}
_, err := c.client.client.storeClient.CommitTx(ctx, &proto.StoreCommitTxRequest{
Token: c.client.client.token,
TxId: c.txID,
})
if err != nil {
c.brokenErr = err
return err
}
c.committed = true
return nil
}
// Discard discards the transaction.
//
// Can be called even if already committed, in that case it does nothing.
func (c *storeClientTx) Discard(ctx context.Context) error {
if c.brokenErr != nil {
return ErrStoreTxBroken
}
if c.discarded || c.committed {
return nil
}
_, err := c.client.client.storeClient.DiscardTx(ctx, &proto.StoreDiscardTxRequest{
Token: c.client.client.token,
TxId: c.txID,
})
if err != nil {
c.brokenErr = err
return err
}
c.discarded = true
return nil
}