func retrieveJobsProject()

in src/statequery/jobs.go [65:133]


func retrieveJobsProject(ctx context.Context, client *bigquerySDK.Service, project string, reservation Reservation, ch chan<- Job, wg *sync.WaitGroup) {
	// Defer completion signal on wait group
	defer wg.Done()

	// Query BQ API for jobs from all users in given project with 'RUNNING' state.
	list, err := client.Jobs.List(project).AllUsers(true).StateFilter("running").Do()
	if err != nil {
		log.Printf("failed to get BigQuery jobs: %v\n", err)
	}

	// Iterate jobs lists
	for _, job := range list.Jobs {
		// Refresh job object for current state and details jobs statistics.
		current, err := client.Jobs.Get(project, job.JobReference.JobId).Do()
		if err != nil {
			log.Printf("failed to refresh job: %v\n", err)
		}

		// Switch on job types and ignore everything but 'QUERY' jobs.
		switch current.Configuration.JobType {
		case "LOAD":
			log.Printf("skipping job of LOAD type: %v\n", current.JobReference.JobId)
			continue
		case "COPY":
			log.Printf("skipping job of COPY type: %v\n", current.JobReference.JobId)
			continue
		case "EXTRACT":
			log.Printf("skipping job of EXTRACT type: %v\n", current.JobReference.JobId)
			continue
		case "UNKNOWN":
			log.Printf("skipping job of UNKNOWN type: %v\n", current.JobReference.JobId)
			continue
		case "QUERY":
			// Get query-specific stats on this job.
			stats := current.Statistics.Query

			// Find the latest statistics sample in the job timeline
			latestSnapshot := &bigquerySDK.QueryTimelineSample{}
			for _, sample := range stats.Timeline {
				if sample.ElapsedMs > latestSnapshot.ElapsedMs {
					latestSnapshot = sample
				}
			}

			runtimeMillis := latestSnapshot.ElapsedMs
			slotMillis := latestSnapshot.TotalSlotMs

			// Safely check stats before division
			if runtimeMillis == 0 || slotMillis == 0 {
				log.Printf("failed to get job runtime stats, skipping %v\n", job.JobReference.JobId)
				continue
			}
			// Compute slot usage by eliminating time
			slots := float64(slotMillis) / float64(runtimeMillis)

			// Double check if the job's reservation matches the one we are looking for
			fullReservationId := fmt.Sprintf("%s:%s.%s", project, reservation.Location, reservation.Name)
			if current.Statistics.ReservationId != fullReservationId {
				log.Printf("warn: detected missing reservation ID or mismatch: expected %s, found %s, on job %s", fullReservationId, current.Statistics.ReservationId, job.JobReference.JobId)
			}

			// All good. Push job down the channel.
			ch <- Job{
				Name:  current.Id,
				Usage: slots,
			}
		}
	}
}