in src/go/pkg/edasim/statscollector.go [19:83]
// StatsCollector tallies the events received on the channels returned by
// GetStatsChannel and periodically logs each running total together with its
// change since the previous report.
//
// A report is written whenever more than secondsBetweenStats has elapsed since
// the start of the current reporting window, and once more when ctx is
// cancelled, after which the function returns. Counters that are still zero
// are omitted from the report. syncWaitGroup.Done is called on return.
func StatsCollector(ctx context.Context, syncWaitGroup *sync.WaitGroup) {
	log.Info.Printf("starting Stats Collector\n")
	defer syncWaitGroup.Done()
	defer log.Info.Printf("StatsCollector complete")

	windowStart := time.Now()

	// Running totals, each paired with the value it had at the last report.
	var (
		jobsProcessed, jobsProcessedPrev int
		filesWritten, filesWrittenPrev   int
		jobsCompleted, jobsCompletedPrev int
		uploads, uploadsPrev             int
		failures, failuresPrev           int
	)

	channels := GetStatsChannel(ctx)
	wake := time.NewTicker(time.Duration(millisecondsSleep) * time.Millisecond)
	defer wake.Stop()

	// report logs one counter and its delta, then remembers the total for the
	// next report. Counters that have never been incremented are skipped.
	report := func(format string, total int, previous *int) {
		if total <= 0 {
			return
		}
		log.Info.Printf(format, total, total-*previous)
		*previous = total
	}

	for stopping := false; !stopping; {
		select {
		case <-channels.ChJobProcessed:
			jobsProcessed++
		case <-channels.ChProcessedFilesWritten:
			filesWritten++
		case <-channels.ChJobCompleted:
			jobsCompleted++
		case <-channels.ChUpload:
			uploads++
		case <-channels.ChError:
			failures++
		case <-ctx.Done():
			stopping = true
		case <-wake.C:
			// Nothing to count; the tick only unblocks the select so the
			// reporting condition below is re-evaluated.
		}

		// Skip reporting until the window has elapsed, unless shutting down.
		if time.Since(windowStart) <= secondsBetweenStats && !stopping {
			continue
		}

		// Advance by a fixed step rather than resetting to "now".
		windowStart = windowStart.Add(secondsBetweenStats)

		report("jobsProcessedCount: %d (delta %d)", jobsProcessed, &jobsProcessedPrev)
		report("processFilesWritten: %d (delta %d)", filesWritten, &filesWrittenPrev)
		report("completedJobsCount: %d (delta %d)", jobsCompleted, &jobsCompletedPrev)
		report("uploadCount: %d (delta %d)", uploads, &uploadsPrev)
		report("errorCount: %d (delta %d)", failures, &failuresPrev)
	}
}