worker.go (167 lines of code) (raw):
// Copyright 2010 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"net/http"
"os"
"sort"
"text/tabwriter"
"time"
"github.com/GoogleCloudPlatform/gcping/internal/config"
)
type input struct {
region string
endpoint string
}
func (i *input) HTTP() output {
return i.benchmark(func() error {
req, _ := http.NewRequest("GET", i.endpoint+"/api/ping", nil)
req.Header.Add("User-Agent", "GCPing-CLI")
res, err := client.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("status code: %v", res.StatusCode)
}
return nil
})
}
func (i *input) benchmark(fn func() error) output {
if verbose {
fmt.Printf("Pinging %q\n", i.region)
}
start := time.Now()
err := fn()
duration := time.Since(start)
o := output{
region: i.region,
durations: []time.Duration{duration},
}
if err != nil {
o.errors++
}
if verbose {
fmt.Printf("Ping to %q completed in %v\n", i.region, duration)
}
if csv {
fmt.Printf("%v,%v,%v,%v\n", i.region, i.endpoint, duration.Nanoseconds(), err != nil)
}
return o
}
type output struct {
region string
durations []time.Duration
errors int
med time.Duration // median of durations; calculated on first call to median()
}
func (o *output) median() time.Duration {
if o.med == 0 {
// Sort durations and pick the middle one.
sort.Slice(o.durations, func(i, j int) bool {
return o.durations[i] < o.durations[j]
})
o.med = o.durations[len(o.durations)/2]
}
return o.med
}
type worker struct {
inputs chan input
outputs chan output
}
func (w *worker) start() {
for worker := 0; worker < concurrency; worker++ {
go func() {
for m := range w.inputs {
o := m.HTTP()
w.outputs <- o
}
}()
}
}
func (w *worker) sortOutput(em map[string]config.Endpoint) []output {
m := make(map[string]output)
for i := 0; i < w.size(em, region); i++ {
o := <-w.outputs
a := m[o.region]
a.region = o.region
a.durations = append(a.durations, o.durations[0])
a.errors += o.errors
m[o.region] = a
}
all := make([]output, 0, len(m))
for _, t := range m {
all = append(all, t)
}
// sort all by median duration.
sort.Slice(all, func(i, j int) bool {
return all[i].median() < all[j].median()
})
return all
}
func (w *worker) reportAll(em map[string]config.Endpoint) {
w.inputs = make(chan input, concurrency)
w.outputs = make(chan output, w.size(em, region))
for i := 0; i < number; i++ {
for r, e := range em {
w.inputs <- input{region: r, endpoint: e.URL}
}
}
close(w.inputs)
sorted := w.sortOutput(em)
tr := tabwriter.NewWriter(os.Stdout, 3, 2, 2, ' ', 0)
for i, a := range sorted {
fmt.Fprintf(tr, "%2d.\t[%v]\t%v", i+1, a.region, a.median())
if a.errors > 0 {
fmt.Fprintf(tr, "\t(%d errors)", a.errors)
}
fmt.Fprintln(tr)
}
tr.Flush()
}
func (w *worker) reportCSV(em map[string]config.Endpoint) {
w.inputs = make(chan input, concurrency)
w.outputs = make(chan output, w.size(em, region))
for i := 0; i < number; i++ {
for r, e := range em {
w.inputs <- input{region: r, endpoint: e.URL}
}
}
close(w.inputs)
sorted := w.sortOutput(em)
fmt.Println("region,latency_ns,errors")
for _, a := range sorted {
fmt.Printf("%v,%v,%v\n", a.region, a.median().Nanoseconds(), a.errors)
}
}
func (w *worker) reportTop(em map[string]config.Endpoint) {
w.inputs = make(chan input, concurrency)
w.outputs = make(chan output, w.size(em, region))
for i := 0; i < number; i++ {
for r, e := range em {
w.inputs <- input{region: r, endpoint: e.URL}
}
}
close(w.inputs)
sorted := w.sortOutput(em)
t := sorted[0].region
if t == "global" {
t = sorted[1].region
}
fmt.Println(t)
return
}
func (w *worker) reportRegion(em map[string]config.Endpoint, region string) {
w.inputs = make(chan input, concurrency)
w.outputs = make(chan output, w.size(em, region))
for i := 0; i < number; i++ {
e, _ := em[region]
w.inputs <- input{region: region, endpoint: e.URL}
}
close(w.inputs)
sorted := w.sortOutput(em)
fmt.Println(sorted[0].median())
}
func (w *worker) size(em map[string]config.Endpoint, region string) int {
if region != "" {
return number
}
return number * len(em)
}