in agent/session/plugin/client/client.go [510:618]
// writeLoop reads local input from c.Input and forwards it to the remote side
// with SendStreamDataMessage until the session is poisoned, the input reaches
// EOF, or a read/send error occurs.
//
// Input is polled with a 50ms select timeout so that c.poison is re-checked
// regularly even when no input arrives. Reads go through the escape proxy
// built from c.EscapeKeys.
//
// When c.idleTimeout > 0 two background goroutines are started, both of which
// exit once c.poison becomes readable:
//   - an idle watchdog that sends a close message and poisons the session
//     when no data has been recorded for c.idleTimeout seconds;
//   - a keep-alive sender that calls SendKeepAliveDataMessage every minute.
//
// wg.Done is called when the loop returns. The return value is whatever
// die/openPoison return for this loop.
func (c *Client) writeLoop(wg *sync.WaitGroup) int {
	defer wg.Done()
	fname := "writeLoop"
	buff := make([]byte, 2048)
	rdfs := &goselect.FDSet{}
	reader := io.ReadCloser(c.Input)
	pr := NewEscapeProxy(reader, c.EscapeKeys)
	defer reader.Close()

	if c.idleTimeout > 0 {
		log.GetLogger().Infof("Idle timeout %d seconds.", c.idleTimeout)

		// Idle watchdog.
		go func() {
			// Start with a 1 second timer; later rounds sleep for exactly
			// the remaining idle budget.
			timer := time.NewTimer(time.Second)
			defer timer.Stop()
			for {
				select {
				case <-c.poison:
					return
				case <-timer.C:
					// Seconds elapsed since the last recorded data offset.
					elapsedTime := int32(time.Since(c.startTimestamp).Seconds()) - c.lastDataTimestampOffset.Load()
					if elapsedTime >= c.idleTimeout {
						// Idle for too long
						log.GetLogger().Infoln("Idle for too long, close client.")
						c.SendCloseMessage()
						openPoison(fname, c.poison)
						// Stop here: resetting the timer with a non-positive
						// duration would fire immediately and could send the
						// close message again.
						return
					}
					timer.Reset(time.Duration(c.idleTimeout-elapsedTime) * time.Second)
				}
			}
		}()

		// Keep-alive sender.
		go func() {
			// minimumIdleTimeout is 60s, so the period of sending keep-alive
			// packages is set to 60s; it does not need to be smaller.
			// It must not exceed 180 seconds, because the Agent disconnects
			// if it does not receive a data packet within 180s.
			ticker := time.NewTicker(time.Minute)
			defer ticker.Stop()
			for {
				select {
				case <-c.poison:
					return
				case <-ticker.C:
					if err := c.SendKeepAliveDataMessage(); err != nil {
						log.GetLogger().Error("Send keep alive package failed: ", err)
					}
				}
			}
		}()
	}

	for {
		// Non-blocking check so a poisoned session ends the loop promptly.
		select {
		case <-c.poison:
			return die(fname, c.poison)
		default:
		}

		fd := reader.(exposeFd).Fd()
		rdfs.Zero()
		rdfs.Set(fd)
		// nfds must be the highest watched descriptor plus one; a constant 1
		// only works when the input happens to be descriptor 0.
		if err := goselect.Select(int(fd)+1, rdfs, nil, nil, 50*time.Millisecond); err != nil {
			// Select errors are not treated as fatal; poll again.
			continue
		}
		if !rdfs.IsSet(fd) {
			continue
		}

		size, err := pr.Read(buff)
		if err != nil {
			log.GetLogger().Infoln("err in input empty")
			if err == io.EOF {
				log.GetLogger().Infoln("EOF in input empty")
				// Send EOF to GoTTY
				// Send 'Input' marker, as defined in GoTTY::client_context.go,
				// followed by EOT (a translation of Ctrl-D for terminals)
				if sendErr := c.SendStreamDataMessage([]byte{4}); sendErr != nil {
					log.GetLogger().Errorln("send EOT failed:", sendErr)
				}
				return openPoison(fname, c.poison)
			}
			log.GetLogger().Errorln("err in input empty", err)
			return openPoison(fname, c.poison)
		}
		if size <= 0 {
			log.GetLogger().Infoln("user input empty")
			continue
		}

		data := buff[:size]
		if c.verbosemode {
			log.GetLogger().Infoln("begin send user input:", string(data), size)
		}
		if err := c.SendStreamDataMessage(data); err != nil {
			return openPoison(fname, c.poison)
		}
	}
}