cmd/receiver/receiver.go (92 lines of code) (raw):
// this is an receiver example connect to different tunnels
package main
import (
"errors"
"flag"
"fmt"
"os"
"strconv"
"syscall"
utils "github.com/alibaba/MongoShake/v2/common"
replayer "github.com/alibaba/MongoShake/v2/receiver"
conf "github.com/alibaba/MongoShake/v2/receiver/configure"
"github.com/alibaba/MongoShake/v2/tunnel"
nimo "github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"
)
type Exit struct{ Code int }
func main() {
var err error
defer handleExit()
defer LOG.Close()
// argument options
configuration := flag.String("conf", "", "configure file absolute path")
verbose := flag.Int("verbose", 0, "where log goes to: 0 - file,1 - file+stdout,2 - stdout")
flag.Parse()
if *configuration == "" {
fmt.Println(utils.BRANCH)
panic(Exit{0})
}
var file *os.File
if file, err = os.Open(*configuration); err != nil {
crash(fmt.Sprintf("Configure file open failed. %v", err), -1)
}
configure := nimo.NewConfigLoader(file)
configure.SetDateFormat(utils.GolangSecurityTime)
if err := configure.Load(&conf.Options); err != nil {
crash(fmt.Sprintf("Configure file %s parse failed. %v", *configuration, err), -2)
}
// verify receiver options and revise
if err = sanitizeOptions(); err != nil {
crash(fmt.Sprintf("Conf.Options check failed: %s", err.Error()), -4)
}
if err := utils.InitialLogger(conf.Options.LogDirectory, conf.Options.LogFileName, conf.Options.LogLevel, conf.Options.LogFlush, *verbose); err != nil {
crash(fmt.Sprintf("initial log.dir[%v] log.name[%v] failed[%v].", conf.Options.LogDirectory,
conf.Options.LogFileName, err), -2)
}
nimo.Profiling(int(conf.Options.SystemProfilePort))
signalProfile, _ := strconv.Atoi(utils.SIGNALPROFILE)
signalStack, _ := strconv.Atoi(utils.SIGNALSTACK)
if signalProfile > 0 {
nimo.RegisterSignalForProfiling(syscall.Signal(signalProfile)) // syscall.SIGUSR2
nimo.RegisterSignalForPrintStack(syscall.Signal(signalStack), func(bytes []byte) { // syscall.SIGUSR1
LOG.Info(string(bytes))
})
}
startup()
select {}
}
func sanitizeOptions() error {
if conf.Options.Tunnel == "" {
return errors.New("tunnel is empty")
}
if len(conf.Options.TunnelAddress) == 0 {
return errors.New("tunnel address is illegal")
}
return nil
}
// this is the main connector function
func startup() {
factory := tunnel.ReaderFactory{Name: conf.Options.Tunnel}
reader := factory.Create(conf.Options.TunnelAddress)
if reader == nil {
return
}
/*
* create re-players, the number of re-players number is equal to the
* collector worker number to fulfill load balance. The tunnel that message
* sent to is determined in the collector side: `TMessage.Shard`.
*/
repList := make([]tunnel.Replayer, conf.Options.ReplayerNum)
for i := range repList {
repList[i] = replayer.NewExampleReplayer(i)
}
LOG.Info("receiver is starting...")
if err := reader.Link(repList); err != nil {
LOG.Critical("Replayer link to tunnel error %v", err)
return
}
}
func crash(msg string, errCode int) {
fmt.Println(msg)
panic(Exit{errCode})
}
func handleExit() {
if e := recover(); e != nil {
if exit, ok := e.(Exit); ok == true {
os.Exit(exit.Code)
}
panic(e)
}
}