api/wikipediapageview/get_job.go (131 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package wikipediapageview import ( "net/http" "time" "cloud.google.com/go/bigquery" "github.com/go-chi/chi/v5" "github.com/rs/zerolog/log" "google.golang.org/api/iterator" hd "github.com/GoogleCloudPlatform/appengine-cloud-demo-portal/api/pkg/handler" ) type getJobResponse struct { Completed bool `json:"completed"` Error string `json:"error,omitempty"` Config *config `json:"config"` Statistics *statistics `json:"statistics,omitempty"` Results [][]bigquery.Value `json:"results,omitempty"` } type statistics struct { StartTime time.Time `json:"start_time"` EndTime time.Time `json:"end_time"` TotalBytesProcessed int64 `json:"total_bytes_processed"` CacheHit bool `json:"cache_hit"` InputRows int64 `json:"input_rows"` } type config struct { Query string `json:"query"` QueryCache bool `json:"query_cache"` Parameters []bigquery.QueryParameter `json:"parameters"` } func (h *handler) getJob(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := log.Ctx(ctx) jobID := chi.URLParam(r, "jobID") job, err := h.Bigquery.JobFromID(ctx, jobID) if err != nil { hd.RespondErrorJSON(w, r, hd.Errorf(ctx, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "failed to get job: %w", err)) return } logger.Debug().Msgf("job: %+v", job) js, err := job.Status(ctx) if err != nil { hd.RespondErrorJSON(w, r, hd.Errorf(ctx, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "failed to get job status: %w", err)) return } logger.Debug().Msgf("js: %+v", job) jc, err := job.Config() if err != nil { hd.RespondErrorJSON(w, r, hd.Errorf(ctx, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "failed to get job config: %w", err)) return } logger.Debug().Msgf("jc: %+v", job) qc, ok := jc.(*bigquery.QueryConfig) if !ok { hd.RespondErrorMessage(w, r, http.StatusBadRequest, "job id must be id of query job") return } logger.Debug().Msgf("qc: %+v", job) res := &getJobResponse{ Completed: js.Done(), Config: &config{ Query: qc.Q, QueryCache: !qc.DisableQueryCache, Parameters: qc.Parameters, }, } if !js.Done() { hd.RespondJSON(w, r, http.StatusOK, res) return } if js.Err() != nil { res.Error = js.Err().Error() hd.RespondJSON(w, r, http.StatusOK, res) return } qs, ok := js.Statistics.Details.(*bigquery.QueryStatistics) if !ok { hd.RespondErrorMessage(w, r, http.StatusBadRequest, "job id must be id of query job") return } logger.Debug().Msgf("qc: %+v", job) res.Statistics = &statistics{ StartTime: js.Statistics.StartTime, EndTime: js.Statistics.EndTime, TotalBytesProcessed: js.Statistics.TotalBytesProcessed, CacheHit: qs.CacheHit, } logger.Debug().Msgf("qs.QueryPlan: %+v", qs.QueryPlan) if qs.QueryPlan != nil && len(qs.QueryPlan) > 0 { res.Statistics.InputRows = qs.QueryPlan[0].RecordsRead } else { res.Statistics.InputRows = 0 } it, err := job.Read(ctx) if err != nil { hd.RespondErrorJSON(w, r, hd.Errorf(ctx, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "failed to read job: %w", err)) return } results := [][]bigquery.Value{} for { var row []bigquery.Value err := it.Next(&row) if err == iterator.Done { break } else if err != nil { hd.RespondErrorJSON(w, r, hd.Errorf(ctx, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError), "failed to get : %w", err)) return } results = append(results, row) } res.Results = results hd.RespondJSON(w, r, http.StatusOK, res) }