// Excerpt: (*ParallelQueryWorker).Query from pkg/source/gcp/query/queryutil/parallel.go (original lines 56-147).

// Query runs the configured base query against Cloud Logging in parallel.
// The [p.startTime, p.endTime) range is divided into p.workerCount segments
// and each segment is queried on the worker pool, with submissions staggered
// by 3s to avoid being rate limited. A background reporter publishes progress
// (average completion ratio and lines/sec throughput) once per second while
// the query runs. On the first worker error the shared context is cancelled
// with that error as the cause and it is returned; otherwise all collected
// log entries are returned.
func (p *ParallelQueryWorker) Query(ctx context.Context, readerFactory *structure.ReaderFactory, resourceNames []string, progress *progress.TaskProgress) ([]*log.LogEntity, error) {
	timeSegments := divideTimeSegments(p.startTime, p.endTime, p.workerCount)
	// NOTE(review): percentages is written by workers and read by the progress
	// reporter without synchronization; float32 stores are not atomic, so
	// -race will flag this. Consider atomic.Uint32 + math.Float32bits.
	percentages := make([]float32, p.workerCount)
	logSink := make(chan *log.LogEntity)
	logEntries := []*log.LogEntity{}
	// Entry count mirrored into an atomic so the reporter goroutine can read
	// it without racing on the logEntries slice header.
	logCount := atomic.Int64{}
	wg := sync.WaitGroup{}
	queryStartTime := time.Now()
	threadCount := atomic.Int32{}
	threadCount.Add(1)
	// Closed by the collector goroutine once every entry from logSink has been
	// appended to logEntries; the function must wait on it before reading the
	// slice, otherwise the final appends race with the return.
	collectorDone := make(chan struct{})
	go func() {
		defer close(collectorDone)
		cancellable, cancel := context.WithCancel(ctx)
		defer cancel()
		// Progress reporter: once per second, publish the mean segment
		// completion ratio and the observed lines-per-second throughput.
		go func() {
			for {
				select {
				case <-cancellable.Done():
					return
				case <-time.After(time.Second):
					currentTime := time.Now()

					speed := float64(logCount.Load()) / currentTime.Sub(queryStartTime).Seconds()
					s := float32(0)
					for _, p := range percentages {
						s += p
					}
					progressRatio := s / float32(len(percentages))
					progress.Update(progressRatio, fmt.Sprintf("%.2f lps(concurrency %d)", speed, threadCount.Load()))
				}
			}
		}()
		// Collector: single consumer of logSink, so appends need no lock.
		for logEntry := range logSink {
			logEntries = append(logEntries, logEntry)
			logCount.Add(1)
		}
	}()

	cancellableCtx, cancel := context.WithCancelCause(ctx)
	defer cancel(errors.New("query completed"))

	for i := 0; i < len(timeSegments)-1; i++ {
		workerIndex := i
		begin := timeSegments[i]
		end := timeSegments[i+1]
		// Only the last segment includes its end boundary; interior boundaries
		// are owned by the following segment so no entry is returned twice.
		// (The last iteration of this loop is i == len(timeSegments)-2.)
		includeEnd := i == len(timeSegments)-2
		query := fmt.Sprintf("%s\n%s", p.baseQuery, TimeRangeQuerySection(begin, end, includeEnd))
		subLogSink := make(chan any)
		wg.Add(1)
		p.pool.Run(func() {
			defer wg.Done()
			// The API call streams raw entries into subLogSink while the pool
			// worker below converts them; ListLogEntries is expected to close
			// subLogSink when it finishes or fails.
			go func() {
				threadCount.Add(1)
				err := p.apiClient.ListLogEntries(cancellableCtx, resourceNames, query, subLogSink)
				if err != nil && !errors.Is(err, context.Canceled) {
					slog.WarnContext(cancellableCtx, fmt.Sprintf("query thread failed with an error\n%s", err))
					cancel(err)
				}
			}()
			for logEntryAny := range subLogSink {
				yamlString, err := yamlutil.MarshalToYamlString(logEntryAny)
				if err != nil {
					slog.WarnContext(ctx, "failed to parse a log as YAML. Skipping.")
					continue
				}
				logReader, err := readerFactory.NewReader(adapter.Yaml(yamlString))
				if err != nil {
					slog.WarnContext(ctx, fmt.Sprintf("failed to create reader for log entry\n%s", err))
					continue
				}
				commonLogFieldCache := log.NewCachedLogFieldExtractor(gcp_log.GCPCommonFieldExtractor{})
				commonLogFieldCache.SetLogBodyCacheDirect(yamlString)
				logEntry := log.NewLogEntity(logReader, commonLogFieldCache)
				// Segment progress = how far the latest entry's timestamp has
				// advanced through this segment's time window.
				percentages[workerIndex] = float32(logEntry.Timestamp().Sub(begin)) / float32(end.Sub(begin))
				logSink <- logEntry
			}
			percentages[workerIndex] = 1
			threadCount.Add(-1)
		})
		if errors.Is(cancellableCtx.Err(), context.Canceled) {
			break
		}
		// To avoid being rate limited by accessing all at once, the access
		// timing is shifted by 3000ms; abort the wait early on cancellation.
		select {
		case <-cancellableCtx.Done():
		case <-time.After(time.Second * 3):
		}
	}
	wg.Wait()
	close(logSink)
	// Wait for the collector to drain logSink before touching logEntries.
	<-collectorDone
	err := context.Cause(cancellableCtx)
	if err != nil {
		cancel(err)
		return nil, err
	}
	cancel(nil)
	return logEntries, nil
}