lib/persistedretry/writeback/task.go (91 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package writeback import ( "context" "encoding/hex" "fmt" "time" "go.opentelemetry.io/otel/trace" "github.com/uber/kraken/core" ) // Task contains information to write back a blob to remote storage. type Task struct { Namespace string `db:"namespace"` Name string `db:"name"` CreatedAt time.Time `db:"created_at"` LastAttempt time.Time `db:"last_attempt"` Failures int `db:"failures"` Delay time.Duration `db:"delay"` // Trace context for linking async execution back to original request. TraceID string `db:"trace_id"` SpanID string `db:"span_id"` TraceFlags string `db:"trace_flags"` // Hex string of trace flags (e.g., "01" if sampled) // Deprecated. Use name instead. Digest core.Digest `db:"digest"` } // NewTask creates a new Task. // Deprecated: Use NewTaskWithContext to preserve trace context. func NewTask(namespace, name string, delay time.Duration) *Task { return &Task{ Namespace: namespace, Name: name, CreatedAt: time.Now(), Delay: delay, } } // NewTaskWithContext creates a new Task and captures the trace context from ctx. // This allows the async writeback execution to be linked to the original request trace. // It also captures TraceFlags to preserve the sampling decision. func NewTaskWithContext(ctx context.Context, namespace, name string, delay time.Duration) *Task { t := &Task{ Namespace: namespace, Name: name, CreatedAt: time.Now(), Delay: delay, } if spanCtx := trace.SpanContextFromContext(ctx); spanCtx.IsValid() { t.TraceID = spanCtx.TraceID().String() t.SpanID = spanCtx.SpanID().String() t.TraceFlags = spanCtx.TraceFlags().String() } return t } // HasTraceContext returns true if the task has captured trace context. func (t *Task) HasTraceContext() bool { return t.TraceID != "" && t.SpanID != "" } // SpanContext reconstructs a trace.SpanContext from the stored trace IDs. // Returns an invalid SpanContext if the task has no trace context or if parsing fails. func (t *Task) SpanContext() trace.SpanContext { if !t.HasTraceContext() { return trace.SpanContext{} } traceID, err := trace.TraceIDFromHex(t.TraceID) if err != nil { return trace.SpanContext{} } spanID, err := trace.SpanIDFromHex(t.SpanID) if err != nil { return trace.SpanContext{} } // Parse TraceFlags to preserve sampling decision var traceFlags trace.TraceFlags if t.TraceFlags != "" { traceFlags = trace.TraceFlags(parseHexByte(t.TraceFlags)) } return trace.NewSpanContext(trace.SpanContextConfig{ TraceID: traceID, SpanID: spanID, TraceFlags: traceFlags, Remote: true, }) } // parseHexByte parses a hex string (e.g., "01") to a byte. func parseHexByte(s string) byte { b, err := hex.DecodeString(s) if err != nil || len(b) != 1 { return 0 } return b[0] } func (t *Task) String() string { return fmt.Sprintf("writeback.Task(namespace=%s, name=%s)", t.Namespace, t.Name) } // GetLastAttempt returns when t was last attempted. func (t *Task) GetLastAttempt() time.Time { return t.LastAttempt } // GetFailures returns the number of times t has failed. func (t *Task) GetFailures() int { return t.Failures } // Ready returns whether t is ready to run. func (t *Task) Ready() bool { return time.Since(t.CreatedAt) >= t.Delay } // Tags is unused. func (t *Task) Tags() map[string]string { return nil }