func (n *uploader) upload(ctx context.Context) error

in ingestor/adx/uploader.go [209:336]


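// upload drains the batch queue until ctx is cancelled, merging each batch's
// WAL segments into one stream, uploading it, and removing the batch on success.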
func (n *uploader) upload(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case batch := <-n.queue:
			segments := batch.Segments

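			// Each uploader serves a single database; skip batches routed here by mistake.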
			if batch.Database != n.database {
				logger.Errorf("Database mismatch: %s != %s. Skipping batch", batch.Database, n.database)
				continue
			}

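			// Process the batch in a closure so the deferred Release and reader
			// cleanup run before the next batch is dequeued.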
			func() {
				defer batch.Release()

				var (
					readers        = make([]io.Reader, 0, len(segments))
					segmentReaders = make([]*wal.SegmentReader, 0, len(segments))
					database       string
					table          string
					schema         string
					header         string
					err            error
				)

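				// Open a reader for each segment file, recording how long each
				// segment's samples have been waiting since creation.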
				for _, si := range segments {
					database, table, schema, _, err = wal.ParseFilename(si.Path)
					if err != nil {
						logger.Errorf("Failed to parse file: %s", err.Error())
						continue
					}
					metrics.SampleLatency.WithLabelValues(database, table).Set(time.Since(si.CreatedAt).Seconds())

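					// Segments with a schema carry it as a header; skip it here since
					// the schema is extracted separately below.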
					var opts []wal.Option
					if schema != "" {
						opts = append(opts, wal.WithSkipHeader)
					}
					f, err := wal.NewSegmentReader(si.Path, opts...)
					if os.IsNotExist(err) {
						// Batches are not disjoint, so the same segment can appear in multiple
						// batches; a missing file has already been handled by an earlier batch.
						continue
					} else if err != nil {
						logger.Errorf("Failed to open file: %s", err.Error())
						continue
					}

					segmentReaders = append(segmentReaders, f)
					readers = append(readers, f)
				}

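				// Close every reader that was opened, whether or not the upload succeeds.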
				defer func() {
					for _, sr := range segmentReaders {
						sr.Close()
					}
				}()

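				// Nothing could be opened; the batch has no data left to upload.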
				if len(segmentReaders) == 0 {
					if err := batch.Remove(); err != nil {
						logger.Errorf("Failed to remove batch: %s", err.Error())
					}
					return
				}

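				// All segments in a batch share a destination, so the first reader's
				// filename determines the database, table, and schema for the upload.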
				samplePath := segmentReaders[0].Path()
				database, table, schema, _, err = wal.ParseFilename(samplePath)
				if err != nil {
					logger.Errorf("Failed to parse file: %s: %s", samplePath, err.Error())
					return
				}

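				// Fall back to the default ingestion mapping unless the segments
				// carry their own schema.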
				mapping := n.opts.DefaultMapping
				if schema != "" {
					header, err = n.extractSchema(samplePath)
					if err != nil {
						logger.Errorf("Failed to extract schema: %s: %s", samplePath, err.Error())

						// This batch is invalid, remove it.
						if err := batch.Remove(); err != nil {
							logger.Errorf("Failed to remove batch: %s", err.Error())
						}

						return
					}

					mapping, err = adxschema.UnmarshalSchema(header)
					if err != nil {
						logger.Errorf("Failed to unmarshal schema: %s: %s", samplePath, err.Error())

						// This batch is invalid, remove it.
						if err := batch.Remove(); err != nil {
							logger.Errorf("Failed to remove batch: %s", err.Error())
						}
						return
					}
				}

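				// Concatenate every segment reader into a single stream so the whole
				// batch is uploaded in one call.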
				mr := io.MultiReader(readers...)

				start := time.Now()
				if err := n.uploadReader(mr, database, table, mapping); err != nil {
					// The batch is not removed on failure; its segments remain
					// on disk for a future attempt.
					logger.Errorf("Failed to upload file: %s", err.Error())
					return
				}

				if logger.IsDebug() {
					logger.Debugf("Uploaded %v duration=%s", segments, time.Since(start))
				}

				if err := batch.Remove(); err != nil {
					logger.Errorf("Failed to remove batch: %s", err.Error())
				}

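				// Attribute uploaded sample counts to metrics or logs per table.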
				for _, sr := range segmentReaders {
					sampleType, sampleCount := sr.SampleMetadata()

					switch sampleType {
					case wal.MetricSampleType:
						metrics.MetricsUploaded.WithLabelValues(database, table).Add(float64(sampleCount))
					case wal.LogSampleType:
						metrics.LogsUploaded.WithLabelValues(database, table).Add(float64(sampleCount))
					}
				}
			}()
		}
	}
}
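
The merge step above leans on io.MultiReader from the standard library, which
reads each reader to EOF before moving to the next. A minimal standalone
illustration (not part of the uploader):

package main

import (
	"io"
	"os"
	"strings"
)

func main() {
	// Two readers become one contiguous stream, mirroring how the uploader
	// concatenates many WAL segment readers into a single upload body.
	mr := io.MultiReader(strings.NewReader("abc\n"), strings.NewReader("def\n"))
	if _, err := io.Copy(os.Stdout, mr); err != nil {
		panic(err)
	}
}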