benchmark/server/main.go (133 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package main import ( "encoding/binary" "errors" "flag" "fmt" "log" "net" "net/http" _ "net/http/pprof" "os" "os/signal" "syscall" "time" ) import ( gxsync "github.com/dubbogo/gost/sync" ) import ( getty "github.com/apache/dubbo-getty" ) var ( taskPoolMode = flag.Bool("taskPool", false, "task pool mode") taskPoolSize = flag.Int("task_pool_size", 2000, "task poll size") pprofPort = flag.Int("pprof_port", 65432, "pprof http port") ) var taskPool gxsync.GenericTaskPool const CronPeriod = 20e9 func main() { flag.Parse() go func() { http.ListenAndServe(fmt.Sprintf(":%d", *pprofPort), nil) }() options := []getty.ServerOption{getty.WithLocalAddress(":8090")} if *taskPoolMode { taskPool = gxsync.NewTaskPoolSimple(*taskPoolSize) options = append(options, getty.WithServerTaskPool(taskPool)) } server := getty.NewTCPServer(options...) go server.RunEventLoop(NewHelloServerSession) log.Printf("getty server start, listening at: 8090") signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) <-signals server.Close() } func NewHelloServerSession(session getty.Session) (err error) { pkgHandler := &PackageHandler{} EventListener := &MessageHandler{} tcpConn, ok := session.Conn().(*net.TCPConn) if !ok { panic(fmt.Sprintf("newSession: %s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn())) } if err = tcpConn.SetNoDelay(true); err != nil { return err } if err = tcpConn.SetKeepAlive(true); err != nil { return err } if err = tcpConn.SetKeepAlivePeriod(10 * time.Second); err != nil { return err } if err = tcpConn.SetReadBuffer(262144); err != nil { return err } if err = tcpConn.SetWriteBuffer(524288); err != nil { return err } session.SetName("hello") session.SetMaxMsgLen(128 * 1024) // max message package length is 128k session.SetReadTimeout(time.Second) session.SetWriteTimeout(5 * time.Second) session.SetCronPeriod(int(CronPeriod / 1e6)) session.SetWaitTime(time.Second) session.SetPkgHandler(pkgHandler) session.SetEventListener(EventListener) return nil } type MessageHandler struct { SessionOnOpen func(session getty.Session) } func (h *MessageHandler) OnOpen(session getty.Session) error { log.Printf("OnOpen session{%s} open", session.Stat()) if h.SessionOnOpen != nil { h.SessionOnOpen(session) } return nil } func (h *MessageHandler) OnError(session getty.Session, err error) { log.Printf("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err) } func (h *MessageHandler) OnClose(session getty.Session) { log.Printf("OnClose session{%s} is closing......", session.Stat()) } func (h *MessageHandler) OnMessage(session getty.Session, pkg interface{}) { log.Printf("OnMessage....") //s, ok := pkg.(string) //if !ok { // log.Printf("illegal package{%#v}", pkg) // return //} //log.Printf("OnMessage: %s", s) } func (h *MessageHandler) OnCron(session getty.Session) { log.Printf("OnCron....") } type PackageHandler struct{} func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) { dataLen := len(data) if dataLen < 4 { return nil, 0, nil } start := 0 pos := start + 4 pkgLen := int(binary.LittleEndian.Uint32(data[start:pos])) if dataLen < pos+pkgLen { return nil, pos + pkgLen, nil } start = pos pos = start + pkgLen s := string(data[start:pos]) return s, pos, nil } func (h *PackageHandler) Write(ss getty.Session, p interface{}) ([]byte, error) { pkg, ok := p.(string) if !ok { log.Printf("illegal pkg:%+v", p) return nil, errors.New("invalid package") } pkgLen := int32(len(pkg)) pkgStreams := make([]byte, 0, 4+len(pkg)) // pkg len start := 0 pos := start + 4 binary.LittleEndian.PutUint32(pkgStreams[start:pos], uint32(pkgLen)) start = pos // pkg pos = start + int(pkgLen) copy(pkgStreams[start:pos], pkg[:]) return pkgStreams[:pos], nil }