storage/store.go

package storage

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"text/tabwriter"
	"time"

	"github.com/Azure/adx-mon/collector/logs/types"
	"github.com/Azure/adx-mon/metrics"
	"github.com/Azure/adx-mon/pkg/logger"
	"github.com/Azure/adx-mon/pkg/otlp"
	"github.com/Azure/adx-mon/pkg/pool"
	"github.com/Azure/adx-mon/pkg/prompb"
	"github.com/Azure/adx-mon/pkg/service"
	"github.com/Azure/adx-mon/pkg/wal"
	"github.com/Azure/adx-mon/schema"
	transform2 "github.com/Azure/adx-mon/transform"
	gbp "github.com/libp2p/go-buffer-pool"
	"github.com/prometheus/client_golang/prometheus"
)

var (
	csvWriterPool = pool.NewGeneric(1000, func(sz int) interface{} {
		return transform2.NewCSVWriter(bytes.NewBuffer(make([]byte, 0, sz)), nil)
	})

	nativeLogsCSVWriterPool = pool.NewGeneric(1000, func(sz int) interface{} {
		return transform2.NewCSVNativeLogsCSVWriter(bytes.NewBuffer(make([]byte, 0, sz)), nil)
	})

	metricsCSVWriterPool = pool.NewGeneric(1000, func(sz int) interface{} {
		return transform2.NewMetricsCSVWriter(bytes.NewBuffer(make([]byte, 0, sz)), nil)
	})
)

type Store interface {
	service.Component

	// WriteTimeSeries writes a batch of time series to the Store.
	WriteTimeSeries(ctx context.Context, ts []*prompb.TimeSeries) error

	// WriteOTLPLogs writes a batch of logs to the Store.
	WriteOTLPLogs(ctx context.Context, database, table string, logs *otlp.Logs) error

	// WriteNativeLogs writes a batch of logs to the Store.
	WriteNativeLogs(ctx context.Context, logs *types.LogBatch) error

	// Import imports a file into the LocalStore and returns the number of bytes stored.
	Import(filename string, body io.ReadCloser) (int, error)
}

// LocalStore provides local storage of time series data. It manages Write Ahead Logs (WALs) for each metric.
type LocalStore struct {
	opts StoreOpts

	mu         sync.RWMutex
	repository *wal.Repository

	metricsMu sync.RWMutex
	metrics   map[string]prometheus.Counter
}

type StoreOpts struct {
	StorageDir     string
	SegmentMaxSize int64
	SegmentMaxAge  time.Duration
	MaxDiskUsage   int64

	LiftedLabels     []string
	LiftedAttributes []string
	LiftedResources  []string

	WALFlushInterval time.Duration
	EnableWALFsync   bool
}

func NewLocalStore(opts StoreOpts) *LocalStore {
	return &LocalStore{
		opts: opts,
		repository: wal.NewRepository(wal.RepositoryOpts{
			StorageDir:       opts.StorageDir,
			SegmentMaxSize:   opts.SegmentMaxSize,
			SegmentMaxAge:    opts.SegmentMaxAge,
			MaxDiskUsage:     opts.MaxDiskUsage,
			WALFlushInterval: opts.WALFlushInterval,
			EnableWALFsync:   opts.EnableWALFsync,
		}),
		metrics: make(map[string]prometheus.Counter),
	}
}

func (s *LocalStore) Open(ctx context.Context) error {
	return s.repository.Open(ctx)
}

func (s *LocalStore) Close() error {
	return s.repository.Close()
}

func (s *LocalStore) GetWAL(ctx context.Context, key []byte) (*wal.WAL, error) {
	return s.repository.Get(ctx, key)
}

func (s *LocalStore) WALCount() int {
	return s.repository.Count()
}

func (s *LocalStore) WriteTimeSeries(ctx context.Context, ts []*prompb.TimeSeries) error {
	enc := metricsCSVWriterPool.Get(8 * 1024).(*transform2.MetricsCSVWriter)
	defer metricsCSVWriterPool.Put(enc)
	enc.InitColumns(s.opts.LiftedLabels)

	b := gbp.Get(256)
	defer gbp.Put(b)

	for _, v := range ts {
		key, err := SegmentKey(b[:0], v.Labels, enc.SchemaHash())
		if err != nil {
			return err
		}

		wal, err := s.GetWAL(ctx, key)
		if err != nil {
			return err
		}

		s.incMetrics(v.Labels[0].Value, len(v.Samples))

		enc.Reset()
		if err := enc.MarshalCSV(v); err != nil {
			return err
		}

		if err := wal.Write(ctx, enc.Bytes()); err != nil {
			return err
		}
	}
	return nil
}

func (s *LocalStore) WriteOTLPLogs(ctx context.Context, database, table string, logs *otlp.Logs) error {
	sanitizedDB := schema.NormalizeAdxIdentifier(database)
	sanitizedTable := schema.NormalizeAdxIdentifier(table)
	if sanitizedDB == "" || sanitizedTable == "" {
		logger.Warnf("Invalid database or table name: %s.%s", database, table)
		return nil // Do not retry - move on
	}

	enc := csvWriterPool.Get(8 * 1024).(*transform2.CSVWriter)
	defer csvWriterPool.Put(enc)

	key := gbp.Get(256)
	defer gbp.Put(key)

	if logger.IsDebug() {
		logger.Debugf("Store received %d logs for %s.%s", len(logs.Logs), sanitizedDB, sanitizedTable)
	}

	key = fmt.Appendf(key[:0], "%s_%s", sanitizedDB, sanitizedTable)
	w, err := s.GetWAL(ctx, key)
	if err != nil {
		return err
	}

	metrics.SamplesStored.WithLabelValues(sanitizedTable).Add(float64(len(logs.Logs)))

	enc.Reset()
	if err := enc.MarshalLog(logs); err != nil {
		return err
	}

	wo := wal.WithSampleMetadata(wal.LogSampleType, uint32(len(logs.Logs)))
	if err := w.Write(ctx, enc.Bytes(), wo); err != nil {
		return err
	}
	return nil
}

func (s *LocalStore) WriteNativeLogs(ctx context.Context, logs *types.LogBatch) error {
	enc := nativeLogsCSVWriterPool.Get(8 * 1024).(*transform2.NativeLogsCSVWriter)
	defer nativeLogsCSVWriterPool.Put(enc)
	enc.InitColumns(s.opts.LiftedResources)

	key := gbp.Get(256)
	defer gbp.Put(key)

	if logger.IsDebug() {
		logger.Debugf("Store received %d native logs", len(logs.Logs))
	}

	noDestinationCount := 0

	// Each log can potentially have a different destination.
	// Instead of splitting ahead of time and allocating n slices, just encode and write to the wal on
	// a per-log basis.
	for _, log := range logs.Logs {
		// If we don't have a destination, we can't do anything with the log.
		// Skip instead of trying again, which will just repeat the same error.
		database := types.StringOrEmpty(log.GetAttributeValue(types.AttributeDatabaseName))
		if database == "" {
			noDestinationCount++
			continue
		}
		sanitizedDB := schema.NormalizeAdxIdentifier(database)
		if sanitizedDB == "" {
			noDestinationCount++
			continue
		}

		table := types.StringOrEmpty(log.GetAttributeValue(types.AttributeTableName))
		if table == "" {
			noDestinationCount++
			continue
		}
		sanitizedTable := schema.NormalizeAdxIdentifier(table)
		if sanitizedTable == "" {
			noDestinationCount++
			continue
		}

		key = fmt.Appendf(key[:0], "%s_%s_", sanitizedDB, sanitizedTable)
		key = strconv.AppendUint(key, enc.SchemaHash(), 36)
		wal, err := s.GetWAL(ctx, key)
		if err != nil {
			return err
		}

		metrics.SamplesStored.WithLabelValues(table).Inc()

		enc.Reset()
		if err := enc.MarshalNativeLog(log); err != nil {
			return err
		}

		if err := wal.Write(ctx, enc.Bytes()); err != nil {
			return err
		}
	}

	if noDestinationCount > 0 {
		logger.Warnf("Got %d logs without ADX destinations - dropped", noDestinationCount)
		metrics.InvalidLogsDropped.WithLabelValues("no_destination").Add(float64(noDestinationCount))
	}

	return nil
}

func (s *LocalStore) PrefixesByAge() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.repository.PrefixesByAge()
}

func (s *LocalStore) Import(filename string, body io.ReadCloser) (int, error) {
	db, table, schema, _, err := wal.ParseFilename(filename)
	if err != nil {
		return 0, err
	}

	key := gbp.Get(256)
	defer gbp.Put(key)

	if schema != "" {
		key = fmt.Appendf(key[:0], "%s_%s_%s", db, table, schema)
	} else {
		key = fmt.Appendf(key[:0], "%s_%s", db, table)
	}

	wal, err := s.GetWAL(context.Background(), key)
	if err != nil {
		return 0, err
	}

	buf := pool.BytesBufferPool.Get(512 * 1024).(*gbp.Buffer)
	defer pool.BytesBufferPool.Put(buf)
	buf.Reset()

	n, err := io.Copy(buf, body)
	if err != nil {
		return 0, err
	}

	return int(n), wal.Append(context.Background(), buf.Bytes())
}

func (s *LocalStore) Remove(path string) error {
	db, table, schema, _, err := wal.ParseFilename(path)
	if err != nil {
		return err
	}

	var key string
	if schema != "" {
		key = fmt.Sprintf("%s_%s_%s", db, table, schema)
	} else {
		key = fmt.Sprintf("%s_%s", db, table)
	}

	wal, err := s.GetWAL(context.Background(), []byte(key))
	if err != nil {
		return err
	}

	return wal.Remove(path)
}

func (s *LocalStore) incMetrics(value []byte, n int) {
	s.metricsMu.RLock()
	counter := s.metrics[string(value)]
	s.metricsMu.RUnlock()

	if counter != nil {
		counter.Add(float64(n))
		return
	}

	s.metricsMu.Lock()
	counter = s.metrics[string(value)]
	if counter == nil {
		counter = metrics.SamplesStored.WithLabelValues(string(value))
		s.metrics[string(value)] = counter
	}
	s.metricsMu.Unlock()

	counter.Add(float64(n))
}

func (s *LocalStore) Index() *wal.Index {
	return s.repository.Index()
}

// WriteDebug writes debug information to the given writer.
func (s *LocalStore) WriteDebug(w io.Writer) error {
	if err := s.repository.WriteDebug(w); err != nil {
		return err
	}

	w.Write([]byte("\nSegments On Disk:\n"))
	tw := tabwriter.NewWriter(w, 0, 8, 1, ' ', 0)
	tw.Write([]byte("Path\tSize\tCreatedAt\n"))

	var totalSize, totalSegments int64
	if err := filepath.Walk(s.opts.StorageDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			tw.Write([]byte(fmt.Sprintf("%s\t%d\t%s\n", path, info.Size(), info.ModTime().Format(time.RFC3339))))
			totalSize += info.Size()
			totalSegments += 1
		}
		return nil
	}); err != nil {
		return err
	}
	fmt.Fprintf(w, "Actual: Disk Usage: %d, Segments: %d\n", totalSize, totalSegments)

	tw.Flush()

	return nil
}

func SegmentKey(dst []byte, labels []*prompb.Label, hash uint64) ([]byte, error) {
	var name, database []byte
	for _, v := range labels {
		if bytes.Equal(v.Name, []byte("adxmon_database")) {
			database = v.Value
			continue
		}
		if bytes.Equal(v.Name, []byte("__name__")) {
			name = v.Value
			continue
		}
	}
	if len(database) == 0 {
		return nil, fmt.Errorf("database label not found")
	}
	if len(name) == 0 {
		return nil, fmt.Errorf("name label not found")
	}

	dst = schema.AppendNormalizeAdxIdentifier(dst, database)
	dst = append(dst, delim...)
	dst = schema.AppendNormalizeMetricName(dst, name)
	dst = append(dst, delim...)
	return strconv.AppendUint(dst, hash, 36), nil
}

var delim = []byte("_")
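
For reference, a minimal usage sketch of LocalStore from a separate program. This is illustrative only and not part of store.go: the import path follows the module layout used in the imports above, the StoreOpts values and storage directory are arbitrary assumptions, and Samples are omitted because their exact field layout is not shown in this file; only the Labels field, the []byte Name/Value fields on prompb.Label, and the required __name__ and adxmon_database labels are confirmed by SegmentKey above.

// usage_sketch.go — hypothetical example, not part of the adx-mon repository.
package main

import (
	"context"
	"log"
	"time"

	"github.com/Azure/adx-mon/pkg/prompb"
	"github.com/Azure/adx-mon/storage"
)

func main() {
	ctx := context.Background()

	// Arbitrary example options; StorageDir is assumed to be a writable directory.
	store := storage.NewLocalStore(storage.StoreOpts{
		StorageDir:     "/tmp/adx-mon-wal",
		SegmentMaxSize: 64 * 1024 * 1024,
		SegmentMaxAge:  5 * time.Minute,
	})
	if err := store.Open(ctx); err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	// SegmentKey requires both the __name__ and adxmon_database labels;
	// Samples are omitted here since their field names are not visible in store.go.
	series := &prompb.TimeSeries{
		Labels: []*prompb.Label{
			{Name: []byte("__name__"), Value: []byte("cpu_seconds_total")},
			{Name: []byte("adxmon_database"), Value: []byte("Metrics")},
		},
	}
	if err := store.WriteTimeSeries(ctx, []*prompb.TimeSeries{series}); err != nil {
		log.Fatal(err)
	}
}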