network/benchmarks/netperf/nptest/nptest.go (441 lines of code) (raw):
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
nptest.go
Dual-mode program - runs as both the orchestrator and as the worker nodes depending on command line flags
The RPC API is contained wholly within this file.
*/
package main
// Imports only base Golang packages
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"os"
"os/exec"
"strconv"
"sync"
"time"
)
type point struct {
mss int
bandwidth string
index int
}
var mode string
var port string
var host string
var worker string
var kubenode string
var podname string
var testFrom, testTo int
var workerStateMap map[string]*workerState
var dataPoints map[string][]point
var dataPointKeys []string
var datapointsFlushed bool
var active_tests []*TestCase
type Result struct {
Label string `json:"label"`
Result json.RawMessage `json:"result"`
}
var results []Result
func addResult(label, resultJson string) {
results = append(results, Result{
Label: label,
Result: json.RawMessage(resultJson),
})
}
var globalLock sync.Mutex
const (
workerMode = "worker"
orchestratorMode = "orchestrator"
iperf3Path = "/usr/local/bin/iperf3"
qperfPath = "/usr/local/bin/qperf"
netperfPath = "/usr/local/bin/netperf"
netperfServerPath = "/usr/local/bin/netserver"
outputCaptureFile = "/tmp/output.txt"
jsonDataMarker = "GENERATING JSON OUTPUT"
jsonEndDataMarker = "END JSON OUTPUT"
mssMin = 96
mssMax = 1460
mssStepSize = 64
msgSizeMax = 1 << 16
msgSizeMin = 1
parallelStreams = "8"
rpcServicePort = "5202"
iperf3SctpPort = "5004"
localhostIPv4Address = "127.0.0.1"
)
// NetPerfRPC service that exposes RegisterClient and ReceiveOutput for clients
type NetPerfRPC int
// ClientRegistrationData stores a data about a single client
type ClientRegistrationData struct {
Host string
KubeNode string
Worker string
IP string
}
// IperfClientWorkItem represents a single task for an Iperf client
type ClientWorkItem struct {
Host string
Port string
Params TestParams
}
// IperfServerWorkItem represents a single task for an Iperf server
type ServerWorkItem struct {
ListenPort string
Timeout int
}
// WorkItem represents a single task for a worker
type WorkItem struct {
IsClientItem bool
IsServerItem bool
IsIdle bool
TestCaseIndex int
ClientItem ClientWorkItem
ServerItem ServerWorkItem
}
type workerState struct {
sentServerItem bool
idle bool
IP string
worker string
}
// WorkerOutput stores the results from a single worker
type WorkerOutput struct {
TestCaseIndex int
Output string
Code int
Worker string
Type TestType
}
func init() {
flag.StringVar(&mode, "mode", "worker", "Mode for the daemon (worker | orchestrator)")
flag.StringVar(&port, "port", rpcServicePort, "Port to listen on (defaults to 5202)")
flag.IntVar(&testFrom, "testFrom", 0, "start from test number testFrom")
flag.IntVar(&testTo, "testTo", 5, "end at test number testTo")
workerStateMap = make(map[string]*workerState)
results = make([]Result, 0)
dataPoints = make(map[string][]point)
}
func initializeOutputFiles() {
fd, err := os.OpenFile(outputCaptureFile, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
fmt.Println("Failed to open output capture file", err)
os.Exit(2)
}
fd.Close()
}
func main() {
initializeOutputFiles()
flag.Parse()
if !validateParams() {
fmt.Println("Failed to parse cmdline args - fatal error - bailing out")
os.Exit(1)
}
grabEnv()
active_tests = make([]*TestCase, 0, testTo-testFrom)
active_tests = append(active_tests, testcases[testFrom:testTo]...)
fmt.Println("Running as", mode, "...")
if mode == orchestratorMode {
orchestrate()
} else {
startWork()
}
fmt.Println("Terminating npd")
}
func grabEnv() {
worker = os.Getenv("worker")
kubenode = os.Getenv("kubenode")
podname = os.Getenv("HOSTNAME")
}
func validateParams() bool {
if mode != workerMode && mode != orchestratorMode {
fmt.Println("Invalid mode", mode)
return false
}
if len(port) == 0 {
fmt.Println("Invalid port", port)
return false
}
if len(host) == 0 {
host = os.Getenv("NETPERF_ORCH_SERVICE_HOST")
}
return true
}
func allWorkersIdle() bool {
for _, v := range workerStateMap {
if !v.idle {
return false
}
}
return true
}
func writeOutputFile(filename, data string) {
fd, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0666)
if err != nil {
fmt.Println("Failed to append to existing file", filename, err)
return
}
defer fd.Close()
if _, err = fd.WriteString(data); err != nil {
fmt.Println("Failed to append to existing file", filename, err)
}
}
func registerDataPoint(label string, mss int, value string, index int) {
if sl, ok := dataPoints[label]; !ok {
dataPoints[label] = []point{{mss: mss, bandwidth: value, index: index}}
dataPointKeys = append(dataPointKeys, label)
} else {
dataPoints[label] = append(sl, point{mss: mss, bandwidth: value, index: index})
}
}
func flushDataPointsToCsv() {
var buffer string
// Write the MSS points for the X-axis before dumping all the testcase datapoints
for _, points := range dataPoints {
if len(points) == 1 {
continue
}
buffer = fmt.Sprintf("%-45s, Maximum,", "MSS")
for _, p := range points {
buffer = buffer + fmt.Sprintf(" %d,", p.mss)
}
break
}
fmt.Println(buffer)
for _, label := range dataPointKeys {
buffer = fmt.Sprintf("%-45s,", label)
points := dataPoints[label]
var result float64
for _, p := range points {
fv, _ := strconv.ParseFloat(p.bandwidth, 64)
if fv > result {
result = fv
}
}
buffer = buffer + fmt.Sprintf("%f,", result)
for _, p := range points {
buffer = buffer + fmt.Sprintf("%s,", p.bandwidth)
}
fmt.Println(buffer)
}
fmt.Println("END CSV DATA")
}
func flushResultJsonData() {
jsonData, err := json.MarshalIndent(results, "", " ")
if err != nil {
fmt.Println("Error generating JSON:", err)
return
}
fmt.Println(jsonDataMarker)
fmt.Println(string(jsonData))
fmt.Println(jsonEndDataMarker)
}
func serveRPCRequests(port string) {
baseObject := new(NetPerfRPC)
err := rpc.Register(baseObject)
if err != nil {
log.Fatal("failed to register rpc", err)
}
rpc.HandleHTTP()
listener, e := net.Listen("tcp", ":"+port)
if e != nil {
log.Fatal("listen error:", e)
}
err = http.Serve(listener, nil)
if err != nil {
log.Fatal("failed start server", err)
}
}
// Blocking RPC server start - only runs on the orchestrator
func orchestrate() {
serveRPCRequests(rpcServicePort)
}
// Walk the list of interfaces and find the first interface that has a valid IP
// Inside a container, there should be only one IP-enabled interface
func getMyIP() string {
ifaces, err := net.Interfaces()
if err != nil {
return localhostIPv4Address
}
for _, iface := range ifaces {
if iface.Flags&net.FlagLoopback == 0 {
addrs, _ := iface.Addrs()
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
return ip.String()
}
}
}
return "127.0.0.1"
}
func handleClientWorkItem(client *rpc.Client, workItem *WorkItem) {
testCase := active_tests[workItem.TestCaseIndex]
outputString := testCase.TestRunner(workItem.ClientItem)
var reply int
err := client.Call("NetPerfRPC.ReceiveOutput", WorkerOutput{Output: outputString, Worker: worker, Type: testCase.Type, TestCaseIndex: workItem.TestCaseIndex}, &reply)
if err != nil {
log.Fatal("failed to call client", err)
}
time.Sleep(10 * time.Second)
}
// isIPv6: Determines if an address is an IPv6 address
func isIPv6(address string) bool {
x := net.ParseIP(address)
return x != nil && x.To4() == nil && x.To16() != nil
}
// startWork : Entry point to the worker infinite loop
func startWork() {
for {
var client *rpc.Client
var err error
// Address recieved via command line
address := host
if isIPv6(address) {
address = "[" + address + "]"
}
for {
fmt.Println("Attempting to connect to orchestrator at", host)
client, err = rpc.DialHTTP("tcp", address+":"+port)
if err == nil {
break
}
fmt.Println("RPC connection to ", host, " failed:", err)
time.Sleep(5 * time.Second)
}
for {
clientData := ClientRegistrationData{Host: podname, KubeNode: kubenode, Worker: worker, IP: getMyIP()}
var workItem WorkItem
if err := client.Call("NetPerfRPC.RegisterClient", clientData, &workItem); err != nil {
// RPC server has probably gone away - attempt to reconnect
fmt.Println("Error attempting RPC call", err)
break
}
switch {
case workItem.IsIdle:
time.Sleep(5 * time.Second)
continue
case workItem.IsServerItem:
fmt.Println("Orchestrator requests worker run iperf and netperf servers")
go iperfServer()
go qperfServer()
go netperfServer()
time.Sleep(1 * time.Second)
case workItem.IsClientItem:
handleClientWorkItem(client, &workItem)
}
}
}
}
// Invoke and indefinitely run an iperf server
func iperfServer() {
output, _ := cmdExec(iperf3Path, []string{iperf3Path, "-s", host, "-J", "-i", "60", "-D"}, 15)
fmt.Println(output)
}
// Invoke and indefinitely run an qperf server
func qperfServer() {
output, success := cmdExec(qperfPath, []string{qperfPath}, 15)
if success {
fmt.Println(output)
}
}
// Invoke and indefinitely run netperf server
func netperfServer() {
output, success := cmdExec(netperfServerPath, []string{netperfServerPath, "-D"}, 15)
if success {
fmt.Println(output)
}
}
func cmdExec(command string, args []string, _ int32) (rv string, rc bool) {
cmd := exec.Cmd{Path: command, Args: args}
var stdoutput bytes.Buffer
var stderror bytes.Buffer
cmd.Stdout = &stdoutput
cmd.Stderr = &stderror
if err := cmd.Run(); err != nil {
outputstr := stdoutput.String()
errstr := stderror.String()
fmt.Println("Failed to run", outputstr, "error:", errstr, err)
return
}
rv = stdoutput.String()
rc = true
return
}
func (t *NetPerfRPC) ReceiveOutput(data *WorkerOutput, _ *int) error {
globalLock.Lock()
defer globalLock.Unlock()
fmt.Println("ReceiveOutput WorkItem TestCaseIndex: ", data.TestCaseIndex)
testcase := active_tests[data.TestCaseIndex]
outputLog := fmt.Sprintln("Received output from worker", data.Worker, "for test", testcase.Label,
"from", testcase.SourceNode, "to", testcase.DestinationNode) + data.Output
writeOutputFile(outputCaptureFile, outputLog)
if testcase.BandwidthParser != nil {
bw, mss := testcase.BandwidthParser(data.Output)
registerDataPoint(testcase.Label, mss, fmt.Sprintf("%f", bw), data.TestCaseIndex)
fmt.Println("Jobdone from worker", data.Worker, "Bandwidth was", bw, "Mbits/sec")
}
if testcase.JsonParser != nil {
addResult(
fmt.Sprintf("%s with MSS: %d", testcase.Label, testcase.MSS-mssStepSize),
testcase.JsonParser(data.Output),
)
fmt.Println("Jobdone from worker", data.Worker, "JSON output generated")
}
return nil
}
func (t *NetPerfRPC) RegisterClient(data ClientRegistrationData, workItem *WorkItem) error {
globalLock.Lock()
defer globalLock.Unlock()
state, ok := workerStateMap[data.Worker]
if !ok {
// For new clients, trigger an iperf server start immediately
state = &workerState{sentServerItem: true, idle: true, IP: data.IP, worker: data.Worker}
workerStateMap[data.Worker] = state
workItem.IsServerItem = true
workItem.ServerItem.ListenPort = "5201"
workItem.ServerItem.Timeout = 3600
return nil
}
// Worker defaults to idle unless the allocateWork routine below assigns an item
state.idle = true
// Give the worker a new work item or let it idle loop another 5 seconds
allocateWorkToClient(state, workItem)
return nil
}
func allocateWorkToClient(workerState *workerState, workItem *WorkItem) {
if !allWorkersIdle() {
workItem.IsIdle = true
return
}
// System is all idle - pick up next work item to allocate to client
for n, v := range active_tests {
if v.Finished {
continue
}
if v.SourceNode != workerState.worker {
workItem.IsIdle = true
return
}
if _, ok := workerStateMap[v.DestinationNode]; !ok {
workItem.IsIdle = true
return
}
fmt.Printf("Requesting jobrun '%s' from %s to %s for MSS %d for MsgSize %d\n", v.Label, v.SourceNode, v.DestinationNode, v.MSS, v.MsgSize)
workItem.IsClientItem = true
workItem.TestCaseIndex = n
workerState.idle = false
if !v.ClusterIP {
workItem.ClientItem.Host = workerStateMap[workerState.worker].IP
} else {
workItem.ClientItem.Host = os.Getenv("NETPERF_W2_SERVICE_HOST")
}
workItem.ClientItem.Params = v.TestParams
if v.MSS != 0 && v.MSS < mssMax {
v.MSS += mssStepSize
} else {
v.Finished = true
}
if v.Type == netperfTest {
workItem.ClientItem.Port = "12865"
} else {
workItem.ClientItem.Port = "5201"
}
return
}
for _, v := range active_tests {
if !v.Finished {
return
}
}
if !datapointsFlushed {
fmt.Println("ALL TESTCASES AND MSS RANGES COMPLETE - GENERATING CSV OUTPUT")
flushDataPointsToCsv()
flushResultJsonData()
datapointsFlushed = true
}
workItem.IsIdle = true
}