in pkg/source/gcp/query/queryutil/parallel.go [56:147]
func (p *ParallelQueryWorker) Query(ctx context.Context, readerFactory *structure.ReaderFactory, resourceNames []string, progress *progress.TaskProgress) ([]*log.LogEntity, error) {
timeSegments := divideTimeSegments(p.startTime, p.endTime, p.workerCount)
percentages := make([]float32, p.workerCount)
logSink := make(chan *log.LogEntity)
logEntries := []*log.LogEntity{}
wg := sync.WaitGroup{}
queryStartTime := time.Now()
threadCount := atomic.Int32{}
threadCount.Add(1)
go func() {
cancellable, cancel := context.WithCancel(ctx)
go func() {
for {
select {
case <-cancellable.Done():
return
case <-time.After(time.Second):
currentTime := time.Now()
speed := float64(len(logEntries)) / currentTime.Sub(queryStartTime).Seconds()
s := float32(0)
for _, p := range percentages {
s += p
}
progressRatio := s / float32(len(percentages))
progress.Update(progressRatio, fmt.Sprintf("%.2f lps(concurrency %d)", speed, threadCount.Load()))
}
}
}()
for logEntry := range logSink {
logEntries = append(logEntries, logEntry)
}
cancel()
}()
cancellableCtx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("query completed"))
for i := 0; i < len(timeSegments)-1; i++ {
workerIndex := i
begin := timeSegments[i]
end := timeSegments[i+1]
includeEnd := i == len(timeSegments)-1
query := fmt.Sprintf("%s\n%s", p.baseQuery, TimeRangeQuerySection(begin, end, includeEnd))
subLogSink := make(chan any)
wg.Add(1)
p.pool.Run(func() {
defer wg.Done()
go func() {
threadCount.Add(1)
err := p.apiClient.ListLogEntries(cancellableCtx, resourceNames, query, subLogSink)
if err != nil && !errors.Is(err, context.Canceled) {
slog.WarnContext(cancellableCtx, fmt.Sprintf("query thread failed with an error\n%s", err))
cancel(err)
}
}()
for logEntryAny := range subLogSink {
yamlString, err := yamlutil.MarshalToYamlString(logEntryAny)
if err != nil {
slog.WarnContext(ctx, "failed to parse a log as YAML. Skipping.")
continue
}
logReader, err := readerFactory.NewReader(adapter.Yaml(yamlString))
if err != nil {
slog.WarnContext(ctx, fmt.Sprintf("failed to create reader for log entry\n%s", err))
continue
}
commonLogFieldCache := log.NewCachedLogFieldExtractor(gcp_log.GCPCommonFieldExtractor{})
commonLogFieldCache.SetLogBodyCacheDirect(yamlString)
logEntry := log.NewLogEntity(logReader, commonLogFieldCache)
percentages[workerIndex] = float32(logEntry.Timestamp().Sub(begin)) / float32(end.Sub(begin))
logSink <- logEntry
}
percentages[workerIndex] = 1
threadCount.Add(-1)
})
if errors.Is(cancellableCtx.Err(), context.Canceled) {
break
}
// To avoid being rate limited by accessing all at once, the access timing is shifted by 3000ms.
<-time.After(time.Second * 3)
}
wg.Wait()
close(logSink)
err := context.Cause(cancellableCtx)
if err != nil {
cancel(err)
return nil, err
}
cancel(nil)
return logEntries, nil
}