collector/logs/sources/journal/cursor.go (56 lines of code) (raw):

package journal import ( "encoding/json" "fmt" "os" "path/filepath" "strings" "github.com/Azure/adx-mon/pkg/logger" "github.com/cespare/xxhash" ) type journalcursor struct { Cursor string `json:"cursor"` } func cursorPath(cursorDirectory string, filters []string, database string, table string) string { hasher := xxhash.New() // filters are order dependent, so do not sort before creating the hash. for _, filter := range filters { hasher.Write([]byte(filter)) } filterHash := hasher.Sum64() fileName := fmt.Sprintf("journal_%s_%s_%x.cursor", database, table, filterHash) sanitized := strings.ReplaceAll(fileName, string(filepath.Separator), "_") return filepath.Join(cursorDirectory, sanitized) } func writeCursor(cursorFilePath string, cursor string) { if cursor == "" || cursorFilePath == "" { return } cursorVal := fmt.Sprintf(`{"cursor":"%s"}`, cursor) output, err := os.Create(cursorFilePath) if err != nil { logger.Errorf("journal: failed to create cursor file: %v", err) return } defer output.Close() _, err = output.Write([]byte(cursorVal)) if err != nil { logger.Errorf("journal: failed to write cursor file: %v", err) } } func readCursor(cursorFilePath string) (string, error) { input, err := os.Open(cursorFilePath) if err != nil { return "", fmt.Errorf("journal: failed to open cursor file: %w", err) } defer input.Close() var cursor journalcursor if err := json.NewDecoder(input).Decode(&cursor); err != nil { return "", fmt.Errorf("journal: failed to decode cursor file: %w", err) } return cursor.Cursor, nil } func cleanCursor(cursorFilePath string) { if err := os.Remove(cursorFilePath); err != nil { logger.Errorf("journal: failed to remove cursor file: %v", err) } }