azkustoingest/result.go (136 lines of code) (raw):
package azkustoingest
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/properties"
"github.com/Azure/azure-kusto-go/azkustoingest/internal/status"
)
// Result provides a way for users to track the state of ingestion jobs.
type Result struct {
	// record is the ingestion status record tracked by this Result.
	record statusRecord
	// tableClient is the status table client used for polling; it is only set
	// once the initial status record has been written successfully.
	tableClient *status.TableClient
	// reportToTable is true when the ingestion properties request that status
	// be reported to a table (alone or together with a queue).
	reportToTable bool
}
// newResult builds a Result whose record is a freshly created status record.
// The table client is left nil and table reporting is left disabled.
func newResult() *Result {
	return &Result{record: newStatusRecord()}
}
// putProps records whether the ingestion properties ask for status reporting to a
// table (either table-only or queue-and-table) and populates the status record
// from those properties.
func (r *Result) putProps(props properties.All) {
	switch props.Ingestion.ReportMethod {
	case properties.ReportStatusToTable, properties.ReportStatusToQueueAndTable:
		r.reportToTable = true
	default:
		r.reportToTable = false
	}

	r.record.FromProps(props)
}
// putQueued sets the initial status of the record, depending on whether status
// reporting to a table is enabled.
//
// When table reporting is disabled the record is marked Queued and nothing else
// happens. Otherwise the first table resource returned by the resource manager is
// used to create a status table client, the record is marked Pending and written
// to the table, and the client is retained on the Result.
//
// Any failure along the way is stored on the record as StatusRetrievalFailed with
// a Permanent failure status and a descriptive Details message; in that case no
// table client is retained.
func (r *Result) putQueued(ctx context.Context, i *Ingestion) {
	// If not checking status, just return queued.
	if !r.reportToTable {
		r.record.Status = Queued
		return
	}

	// Get table URI.
	tableResources, err := i.mgr.GetTables()
	if err != nil {
		r.record.Status = StatusRetrievalFailed
		r.record.FailureStatus = Permanent
		r.record.Details = "Failed getting status table URI: " + err.Error()
		return
	}

	if len(tableResources) == 0 {
		// err is nil on this path, so it must not be dereferenced: calling
		// err.Error() here would panic instead of reporting the failure.
		r.record.Status = StatusRetrievalFailed
		r.record.FailureStatus = Permanent
		r.record.Details = "Ingestion resources do not include a status table URI"
		return
	}

	// Create a table client for the first status table.
	client, err := status.NewTableClient(i.client.HttpClient(), *tableResources[0])
	if err != nil {
		r.record.Status = StatusRetrievalFailed
		r.record.FailureStatus = Permanent
		r.record.Details = "Failed Creating a Status Table client: " + err.Error()
		return
	}

	// Write the initial record.
	r.record.Status = Pending
	err = client.Write(ctx, r.record.IngestionSourceID.String(), r.record.ToMap())
	if err != nil {
		r.record.Status = StatusRetrievalFailed
		r.record.FailureStatus = Permanent
		r.record.Details = "Failed writing initial status record: " + err.Error()
		return
	}

	// Only keep the client once the initial write succeeded, so that poll has
	// something to read.
	r.tableClient = client
}
// Wait returns a channel that can be checked for ingestion results.
// In order to check actual status please use the ReportResultToTable option when ingesting data.
//
// If the record is already in a final state, or status is not being reported to a
// table, the channel is returned already closed. Otherwise the status is polled in
// the background; when polling ends, the record is sent on the channel as an error
// unless its status is a success, and the channel is then closed.
func (r *Result) Wait(ctx context.Context) <-chan error {
	// Capacity of one lets the background goroutine deliver its single value
	// without waiting for a receiver.
	results := make(chan error, 1)

	if pollable := !r.record.Status.IsFinal() && r.reportToTable; !pollable {
		close(results)
		return results
	}

	go func() {
		defer close(results)

		r.poll(ctx)
		if r.record.Status.IsSuccess() {
			return
		}
		results <- r.record
	}()

	return results
}
// poll reads the status record from the status table until the record reaches a
// final status, reading fails too many times, or ctx is done.
//
// The table is read every pollInterval. A failed read is retried up to three
// times, with an extra back-off of roughly 10, 60 and 120 seconds (plus 0-4
// seconds of jitter) added before the next read; a further failure marks the
// record as StatusRetrievalFailed / Transient. Cancellation of ctx marks the
// record as StatusRetrievalCanceled / Transient and is honoured during both the
// regular interval and the back-off.
//
// If no table client is available poll returns immediately without touching the
// record.
func (r *Result) poll(ctx context.Context) {
	const pollInterval = 10 * time.Second

	if r.tableClient == nil {
		return
	}

	// attempts is the number of retries left; it also indexes delay, so the
	// delays (in seconds) are listed in reverse order of use.
	attempts := 3
	delay := [3]int{120, 60, 10}

	// A single timer drives both the regular polling interval and the retry
	// back-off, so every wait goes through the select below and can be
	// interrupted by ctx.
	timer := time.NewTimer(pollInterval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			r.record.Status = StatusRetrievalCanceled
			r.record.FailureStatus = Transient
			return

		case <-timer.C:
			smap, err := r.tableClient.Read(ctx, r.record.IngestionSourceID.String())
			if err != nil {
				if attempts == 0 {
					r.record.Status = StatusRetrievalFailed
					r.record.FailureStatus = Transient
					r.record.Details = "Failed reading from Status Table: " + err.Error()
					return
				}

				attempts--
				backoff := time.Duration(delay[attempts]+rand.Intn(5)) * time.Second

				// The timer has just fired and been drained, so Reset is safe.
				// Waiting backoff+pollInterval on the timer matches the previous
				// "sleep, then wait one interval" timing without blocking
				// cancellation.
				timer.Reset(backoff + pollInterval)
				continue
			}

			r.record.FromMap(smap)
			if r.record.Status.IsFinal() {
				return
			}

			timer.Reset(pollInterval)
		}
	}
}
// IsStatusRecord reports whether the dynamic type of err is a status record.
func IsStatusRecord(err error) bool {
	switch err.(type) {
	case statusRecord:
		return true
	default:
		return false
	}
}
// GetIngestionStatus extracts the ingestion status code from an ingestion error.
// If err is not a status record it returns Failed together with a non-nil error.
func GetIngestionStatus(err error) (StatusCode, error) {
	rec, isRecord := err.(statusRecord)
	if !isRecord {
		return Failed, fmt.Errorf("Error is not an Ingestion Result")
	}

	return rec.Status, nil
}
// GetIngestionFailureStatus extracts the ingestion failure code from an ingestion error.
// If err is not a status record it returns Unknown together with a non-nil error.
func GetIngestionFailureStatus(err error) (FailureStatusCode, error) {
	rec, isRecord := err.(statusRecord)
	if !isRecord {
		return Unknown, fmt.Errorf("Error is not an Ingestion Result")
	}

	return rec.FailureStatus, nil
}
// GetErrorCode extracts the error code from an ingestion error.
// If err is not a status record it returns an empty string together with a non-nil error.
func GetErrorCode(err error) (string, error) {
	rec, isRecord := err.(statusRecord)
	if !isRecord {
		return "", fmt.Errorf("Error is not an Ingestion Result")
	}

	return rec.ErrorCode, nil
}
// IsRetryable indicates whether there's any merit in retrying ingestion.
// It is false for any error that is not a status record; otherwise it defers to
// the record's failure status.
func IsRetryable(err error) bool {
	rec, isRecord := err.(statusRecord)
	return isRecord && rec.FailureStatus.IsRetryable()
}