func()

in lib/persistedretry/writeback/executor.go [125:203]


// upload writes the cache file named by t.Name to the remote backend
// configured for t.Namespace.
//
// The task is dropped (nil is returned without uploading) when:
//   - the namespace has no configured backend (backend.ErrNamespaceNotFound),
//   - the backend's Stat call succeeds, meaning the file is already there, or
//   - the local cache file no longer exists (counted in "missing_files").
//
// Any other failure is logged and returned wrapped with %w, so callers can
// inspect the underlying cause with errors.Is / errors.As.
//
// The "upload" and "lifetime" timers are recorded only after a successful
// upload; no-ops and errors are deliberately not timed.
func (e *Executor) upload(ctx context.Context, t *Task) error {
	start := time.Now()

	// Every log line below carries the same task identity, so build the
	// contextual logger once. NOTE(review): assumes With returns a derived
	// logger and does not mutate its receiver (as zap's SugaredLogger does).
	logger := log.WithTraceContext(ctx).With(
		"namespace", t.Namespace,
		"name", t.Name,
	)

	logger.Info("Uploading cache file to the remote backend")

	client, err := e.backends.GetClient(t.Namespace)
	if err != nil {
		if err == backend.ErrNamespaceNotFound {
			// No backend is configured for this namespace: there is nowhere
			// to write back to, so the task is dropped rather than failed.
			logger.Info("Dropping writeback for unconfigured namespace")
			return nil
		}
		logger.With("error", err).Error("Failed to get backend client")
		return fmt.Errorf("get client: %w", err)
	}

	// Only a successful Stat short-circuits. Any Stat error (not just
	// "not found") falls through to the upload attempt below.
	if _, err := client.Stat(t.Namespace, t.Name); err == nil {
		// File already uploaded, no-op.
		logger.Debug("File already exists in backend, skipping upload")
		return nil
	}

	f, err := e.fs.GetCacheFileReader(t.Name)
	if err != nil {
		if os.IsNotExist(err) {
			// Nothing we can do about this but make noise and drop the task.
			e.stats.Counter("missing_files").Inc(1)
			logger.Error("Invariant violation: writeback cache file missing")
			return nil
		}
		logger.With("error", err).Error("Failed to get cache file reader")
		return fmt.Errorf("get file: %w", err)
	}
	defer closers.Close(f)

	logger.Debug("Starting backend upload")

	if err := client.Upload(t.Namespace, t.Name, f); err != nil {
		logger.With("error", err).Error("Backend upload failed")
		return fmt.Errorf("upload: %w", err)
	}

	logger.Info("Uploaded cache file to remote backend")

	// We don't want to time noops nor errors.
	e.stats.Timer("upload").Record(time.Since(start))
	e.stats.Timer("lifetime").Record(time.Since(t.CreatedAt))

	return nil
}