in ingestor/adx/uploader.go [209:336]
func (n *uploader) upload(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case batch := <-n.queue:
segments := batch.Segments
if batch.Database != n.database {
logger.Errorf("Database mismatch: %s != %s. Skipping batch", batch.Database, n.database)
continue
}
func() {
defer batch.Release()
var (
readers = make([]io.Reader, 0, len(segments))
segmentReaders = make([]*wal.SegmentReader, 0, len(segments))
database string
table string
schema string
header string
err error
)
for _, si := range segments {
database, table, schema, _, err = wal.ParseFilename(si.Path)
if err != nil {
logger.Errorf("Failed to parse file: %s", err.Error())
continue
}
metrics.SampleLatency.WithLabelValues(database, table).Set(time.Since(si.CreatedAt).Seconds())
var opts []wal.Option
if schema != "" {
opts = append(opts, wal.WithSkipHeader)
}
f, err := wal.NewSegmentReader(si.Path, opts...)
if os.IsNotExist(err) {
// batches are not disjoint, so the same segments could be included in multiple batches.
continue
} else if err != nil {
logger.Errorf("Failed to open file: %s", err.Error())
continue
}
segmentReaders = append(segmentReaders, f)
readers = append(readers, f)
}
defer func(segmentReaders []*wal.SegmentReader) {
for _, sr := range segmentReaders {
sr.Close()
}
}(segmentReaders)
if len(segmentReaders) == 0 {
if err := batch.Remove(); err != nil {
logger.Errorf("Failed to remove batch: %s", err.Error())
}
return
}
samplePath := segmentReaders[0].Path()
database, table, schema, _, err = wal.ParseFilename(samplePath)
if err != nil {
logger.Errorf("Failed to parse file: %s: %s", samplePath, err.Error())
return
}
mapping := n.opts.DefaultMapping
if schema != "" {
header, err = n.extractSchema(samplePath)
if err != nil {
logger.Errorf("Failed to extract schema: %s: %s", samplePath, err.Error())
// This batch is invalid, remove it.
if err := batch.Remove(); err != nil {
logger.Errorf("Failed to remove batch: %s", err.Error())
}
return
}
mapping, err = adxschema.UnmarshalSchema(header)
if err != nil {
logger.Errorf("Failed to unmarshal schema: %s: %s", samplePath, err.Error())
// This batch is invalid, remove it.
if err := batch.Remove(); err != nil {
logger.Errorf("Failed to remove batch: %s", err.Error())
}
return
}
}
mr := io.MultiReader(readers...)
now := time.Now()
if err := n.uploadReader(mr, database, table, mapping); err != nil {
logger.Errorf("Failed to upload file: %s", err.Error())
return
}
if logger.IsDebug() {
logger.Debugf("Uploaded %v duration=%s", segments, time.Since(now).String())
}
if err := batch.Remove(); err != nil {
logger.Errorf("Failed to remove batch: %s", err.Error())
}
for _, sr := range segmentReaders {
sampleType, sampleCount := sr.SampleMetadata()
switch sampleType {
case wal.MetricSampleType:
metrics.MetricsUploaded.WithLabelValues(database, table).Add(float64(sampleCount))
case wal.LogSampleType:
metrics.LogsUploaded.WithLabelValues(database, table).Add(float64(sampleCount))
}
}
}()
}
}
}