azkustoingest/internal/status/status_table_client.go (62 lines of code) (raw):
package status
import (
"context"
"encoding/json"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/resources"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/data/aztables"
"github.com/google/uuid"
"time"
)
const (
defaultTimeoutSeconds = 10
fullMetadata = aztables.MetadataFormatFull
)
// TableClient allows reading and writing to azure tables.
type TableClient struct {
tableURI resources.URI
client *aztables.Client
}
// NewTableClient Creates an azure table client.
func NewTableClient(client policy.Transporter, uri resources.URI) (*TableClient, error) {
tableClient, err := aztables.NewClientWithNoCredential(uri.String(), &aztables.ClientOptions{
ClientOptions: azcore.ClientOptions{
Transport: client,
},
})
if err != nil {
return nil, err
}
return &TableClient{
tableURI: uri,
client: tableClient,
}, nil
}
// Read reads a table record containing ingestion status.
func (c *TableClient) Read(ctx context.Context, ingestionSourceID string) (map[string]interface{}, error) {
var emptyID = uuid.Nil.String()
entity, err := c.client.GetEntity(ctx, ingestionSourceID, emptyID, nil)
if err != nil {
return nil, err
}
bytes := entity.Value
m := make(map[string]interface{})
err = json.Unmarshal(bytes, &m)
if err != nil {
return nil, err
}
return m, nil
}
// Write reads a table record containing ingestion status.
func (c *TableClient) Write(ctx context.Context, ingestionSourceID string, data map[string]interface{}) error {
ctx, cancel := context.WithTimeout(ctx, defaultTimeoutSeconds*time.Second)
defer cancel()
data["PartitionKey"] = ingestionSourceID
data["RowKey"] = uuid.Nil.String()
bytes, err := json.Marshal(data)
if err != nil {
return err
}
format := fullMetadata
_, err = c.client.AddEntity(ctx, bytes, &aztables.AddEntityOptions{
Format: &format,
})
return err
}