storage/cursor.go (40 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package storage
import (
"context"
"encoding/json"
"fmt"
"cloud.google.com/go/storage"
"go.elastic.co/apm/v2"
"go.uber.org/zap"
)
type cursor struct {
Current string `json:"current"`
}
func (c *cursor) String() string {
b, err := json.Marshal(c)
if err != nil {
return err.Error()
}
return string(b)
}
func loadCursor(ctx context.Context, logger *zap.Logger, storageClient *storage.Client, bucketName, rootStoragePath string) (*cursor, error) {
span, ctx := apm.StartSpan(ctx, "LoadCursor", "app")
defer span.End()
logger.Debug("load cursor file")
rootedCursorStoragePath := joinObjectPaths(rootStoragePath, cursorStoragePath)
objectReader, err := storageClient.Bucket(bucketName).Object(rootedCursorStoragePath).NewReader(ctx)
if err == storage.ErrObjectNotExist {
return nil, fmt.Errorf("cursor file doesn't exist, most likely a first run (bucketName: %s, path: %s): %w", bucketName, rootedCursorStoragePath, err)
}
if err != nil {
return nil, fmt.Errorf("can't read the cursor file (path: %s): %w", rootedCursorStoragePath, err)
}
defer objectReader.Close()
var c cursor
err = json.NewDecoder(objectReader).Decode(&c)
if err != nil {
return nil, fmt.Errorf("can't decode the cursor file: %w", err)
}
logger.Debug("loaded cursor file", zap.String("cursor", c.String()))
return &c, nil
}