in src/statequery/jobs.go [65:133]
func retrieveJobsProject(ctx context.Context, client *bigquerySDK.Service, project string, reservation Reservation, ch chan<- Job, wg *sync.WaitGroup) {
// Defer completion signal on wait group
defer wg.Done()
// Query BQ API for jobs from all users in given project with 'RUNNING' state.
list, err := client.Jobs.List(project).AllUsers(true).StateFilter("running").Do()
if err != nil {
log.Printf("failed to get BigQuery jobs: %v\n", err)
}
// Iterate jobs lists
for _, job := range list.Jobs {
// Refresh job object for current state and details jobs statistics.
current, err := client.Jobs.Get(project, job.JobReference.JobId).Do()
if err != nil {
log.Printf("failed to refresh job: %v\n", err)
}
// Switch on job types and ignore everything but 'QUERY' jobs.
switch current.Configuration.JobType {
case "LOAD":
log.Printf("skipping job of LOAD type: %v\n", current.JobReference.JobId)
continue
case "COPY":
log.Printf("skipping job of COPY type: %v\n", current.JobReference.JobId)
continue
case "EXTRACT":
log.Printf("skipping job of EXTRACT type: %v\n", current.JobReference.JobId)
continue
case "UNKNOWN":
log.Printf("skipping job of UNKNOWN type: %v\n", current.JobReference.JobId)
continue
case "QUERY":
// Get query-specific stats on this job.
stats := current.Statistics.Query
// Find the latest statistics sample in the job timeline
latestSnapshot := &bigquerySDK.QueryTimelineSample{}
for _, sample := range stats.Timeline {
if sample.ElapsedMs > latestSnapshot.ElapsedMs {
latestSnapshot = sample
}
}
runtimeMillis := latestSnapshot.ElapsedMs
slotMillis := latestSnapshot.TotalSlotMs
// Safely check stats before division
if runtimeMillis == 0 || slotMillis == 0 {
log.Printf("failed to get job runtime stats, skipping %v\n", job.JobReference.JobId)
continue
}
// Compute slot usage by eliminating time
slots := float64(slotMillis) / float64(runtimeMillis)
// Double check if the job's reservation matches the one we are looking for
fullReservationId := fmt.Sprintf("%s:%s.%s", project, reservation.Location, reservation.Name)
if current.Statistics.ReservationId != fullReservationId {
log.Printf("warn: detected missing reservation ID or mismatch: expected %s, found %s, on job %s", fullReservationId, current.Statistics.ReservationId, job.JobReference.JobId)
}
// All good. Push job down the channel.
ch <- Job{
Name: current.Id,
Usage: slots,
}
}
}
}