lib/persistedretry/writeback/executor.go (148 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package writeback
import (
"context"
"fmt"
"os"
"time"
"github.com/uber-go/tally"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/persistedretry"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/store/metadata"
"github.com/uber/kraken/utils/closers"
"github.com/uber/kraken/utils/log"
"go.opentelemetry.io/otel/trace"
)
// FileStore defines store operations required for write-back.
type FileStore interface {
DeleteCacheFileMetadata(name string, md metadata.Metadata) error
GetCacheFileReader(name string) (store.FileReader, error)
}
// Executor executes write back tasks.
type Executor struct {
stats tally.Scope
fs FileStore
backends *backend.Manager
}
// NewExecutor creates a new Executor.
func NewExecutor(
stats tally.Scope,
fs FileStore,
backends *backend.Manager,
) *Executor {
stats = stats.Tagged(map[string]string{
"module": "writebackexecutor",
})
return &Executor{stats, fs, backends}
}
// Name returns the executor name.
func (e *Executor) Name() string {
return "writeback"
}
// Exec uploads the cache file corresponding to r's digest to the remote backend
// that matches r's namespace.
func (e *Executor) Exec(r persistedretry.Task) error {
t, ok := r.(*Task)
if !ok {
return fmt.Errorf("expected *Task, got %T", r)
}
// Extract context from task for trace propagation
// Tasks from public endpoints will have trace context, internal tasks may not
ctx := e.getContextFromTask(t)
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
"has_trace_context", t.HasTraceContext(),
).Debug("Executing writeback task")
if err := e.upload(ctx, t); err != nil {
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
"error", err,
).Error("Failed to upload during writeback")
return err
}
err := e.fs.DeleteCacheFileMetadata(t.Name, &metadata.Persist{})
if err != nil && !os.IsNotExist(err) {
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
"error", err,
).Error("Failed to delete persist metadata")
return fmt.Errorf("delete persist metadata: %s", err)
}
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
).Debug("Successfully completed writeback task")
return nil
}
// getContextFromTask extracts trace context from a task if available.
// Returns context.Background() if no trace context is present.
func (e *Executor) getContextFromTask(t *Task) context.Context {
if !t.HasTraceContext() {
return context.Background()
}
spanCtx := t.SpanContext()
if !spanCtx.IsValid() {
return context.Background()
}
// Create a context with the span context for logging correlation
return trace.ContextWithSpanContext(context.Background(), spanCtx)
}
func (e *Executor) upload(ctx context.Context, t *Task) error {
start := time.Now()
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
).Info("Uploading cache file to the remote backend")
client, err := e.backends.GetClient(t.Namespace)
if err != nil {
if err == backend.ErrNamespaceNotFound {
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
).Info("Dropping writeback for unconfigured namespace")
return nil
}
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
"error", err,
).Error("Failed to get backend client")
return fmt.Errorf("get client: %s", err)
}
if _, err := client.Stat(t.Namespace, t.Name); err == nil {
// File already uploaded, no-op.
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
).Debug("File already exists in backend, skipping upload")
return nil
}
f, err := e.fs.GetCacheFileReader(t.Name)
if err != nil {
if os.IsNotExist(err) {
// Nothing we can do about this but make noise and drop the task.
e.stats.Counter("missing_files").Inc(1)
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
).Error("Invariant violation: writeback cache file missing")
return nil
}
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
"error", err,
).Error("Failed to get cache file reader")
return fmt.Errorf("get file: %s", err)
}
defer closers.Close(f)
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
).Debug("Starting backend upload")
if err := client.Upload(t.Namespace, t.Name, f); err != nil {
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
"error", err,
).Error("Backend upload failed")
return fmt.Errorf("upload: %s", err)
}
log.WithTraceContext(ctx).With(
"namespace", t.Namespace,
"name", t.Name,
).Info("Uploaded cache file to remote backend")
// We don't want to time noops nor errors.
e.stats.Timer("upload").Record(time.Since(start))
e.stats.Timer("lifetime").Record(time.Since(t.CreatedAt))
return nil
}