runner/server.go (135 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package runner
import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"sync"
"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/runner/group"
"github.com/Azure/kperf/runner/localstore"
"github.com/gorilla/mux"
)
// Server is to deploy runner groups and expose endpoints for runner report.
type Server struct {
store *localstore.Store
listeners []net.Listener
groups []*group.Handler
readyCh chan struct{}
report *types.RunnerMetricReport
}
// NewServer returns new instance of server.
func NewServer(dataDir string, addrs []string, groups ...*group.Handler) (*Server, error) {
s, err := localstore.NewStore(dataDir)
if err != nil {
return nil, err
}
listeners, err := buildNetListeners(addrs)
if err != nil {
return nil, err
}
return &Server{
listeners: listeners,
groups: groups,
store: s,
readyCh: make(chan struct{}),
}, nil
}
// Run is to expose endpoints.
func (s *Server) Run() error {
if err := s.deployRunnerGroups(); err != nil {
return fmt.Errorf("failed to deploy runner group %w", err)
}
go s.waitForRunnerGroups()
r := mux.NewRouter()
// NOTE: Please update ./runnergroup_list.go if endpoint has been changed.
r.HandleFunc("/v1/runnergroups", s.listRunnerGroupsHandler).Methods("GET")
// NOTE: Please update ./runnergroup_result.go if endpoint has been changed.
r.HandleFunc("/v1/runnergroups/summary", s.getRunnerGroupsSummary).Methods("GET")
r.HandleFunc("/v1/runnergroups/{runner_name}/result", s.postRunnerGroupsRunnerResult).Methods("POST")
errCh := make(chan error, len(s.listeners))
var wg sync.WaitGroup
for _, lis := range s.listeners {
wg.Add(1)
go func(l net.Listener) {
defer wg.Done()
//nolint:gosec
errCh <- http.Serve(l, r)
}(lis)
}
wg.Wait()
for err := range errCh {
if err != nil {
return err
}
}
return nil
}
// listRunnerGroupsHandler lists all the runner groups.
func (s *Server) listRunnerGroupsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
res := make([]*types.RunnerGroup, 0, len(s.groups))
for _, g := range s.groups {
res = append(res, g.Info(ctx))
}
data, _ := json.Marshal(res)
w.WriteHeader(http.StatusOK)
_, _ = w.Write(data)
}
// getRunnerGroupsSummary returns summary report.
func (s *Server) getRunnerGroupsSummary(w http.ResponseWriter, r *http.Request) {
wait := r.URL.Query().Has("wait")
select {
case <-s.readyCh:
default:
if !wait {
renderErrorResponse(w, http.StatusNotFound, fmt.Errorf("summary is not ready"))
return
}
}
ctx := r.Context()
select {
case <-s.readyCh:
case <-ctx.Done():
renderErrorResponse(w, http.StatusRequestTimeout, fmt.Errorf("request has been canceled"))
return
}
data, _ := json.Marshal(s.report)
w.WriteHeader(http.StatusOK)
_, _ = w.Write(data)
}
// postRunnerGroupsRunnerResult receives summary result from runner.
func (s *Server) postRunnerGroupsRunnerResult(w http.ResponseWriter, r *http.Request) {
runnerName := mux.Vars(r)["runner_name"]
ctx := r.Context()
var found = false
var err error
for _, g := range s.groups {
found, err = g.IsControlled(ctx, runnerName)
if err != nil {
renderErrorResponse(w, http.StatusInternalServerError, err)
return
}
if found {
break
}
}
if !found {
renderErrorResponse(w, http.StatusNotFound, fmt.Errorf("no such runner %s", runnerName))
return
}
writer, err := s.store.OpenWriter()
if err != nil {
renderErrorResponse(w, http.StatusInternalServerError, err)
return
}
defer writer.Close()
_, err = io.Copy(writer, r.Body)
if err != nil {
renderErrorResponse(w, http.StatusInternalServerError, err)
return
}
err = writer.Commit(runnerName)
if err != nil {
code := http.StatusInternalServerError
if errors.Is(err, localstore.ErrAlreadyExists) {
code = http.StatusConflict
}
renderErrorResponse(w, code, err)
return
}
w.WriteHeader(http.StatusCreated)
}