in mockbroker.go [238:350]
// handleRequests serves one client connection until the connection is closed
// or a request cannot be read, decoded, encoded or answered.
//
// conn is the client connection; it is closed when the method returns. idx is
// only used to tag log lines for this connection. wg.Done is called on return
// so the caller can wait for the handler to finish.
//
// For every frame returned by readToBytes the method either:
//   - decodes it as a Kafka request, optionally sleeps for b.latency, passes it
//     to b.handler under b.lock, records the request/response pair in
//     b.history and writes the encoded response (header first, then body); or
//   - if isGSSAPI reports true, passes the raw bytes to b.gssApiHandler and
//     writes the returned bytes back unmodified.
//
// A nil response from either handler means "do not reply": nothing is written
// and the notifier is not invoked for that request. After a reply (or an
// empty encoded response) b.notifier, when set, is called under b.lock with
// the byte counts for the exchange.
//
// The error that terminated the loop, if any, is reported in the final
// "connection closed" log line.
func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.WaitGroup) {
	defer wg.Done()
	defer func() {
		_ = conn.Close()
	}()
	s := spew.NewDefaultConfig()
	s.MaxDepth = 1
	Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)

	// err records why the serving loop stopped so the final log line can
	// report it. Inside the loop it must only ever be assigned with "=";
	// redeclaring it with ":=" would shadow it and the log would always show
	// a nil error.
	var err error

	// Close the connection as soon as the broker signals b.closing so that a
	// blocked read below returns. abort releases this goroutine when the
	// handler exits for any other reason.
	abort := make(chan none)
	defer close(abort)
	go func() {
		select {
		case <-b.closing:
			_ = conn.Close()
		case <-abort:
		}
	}()

	var bytesWritten int
	var bytesRead int
	for {
		buffer, readErr := b.readToBytes(conn)
		if readErr != nil {
			err = readErr
			// A closed connection is the normal way for the loop to end and
			// is not reported as a server error.
			if !isConnectionClosedError(err) {
				Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
				b.serverError(err)
			}
			break
		}

		bytesWritten = 0
		if !b.isGSSAPI(buffer) {
			req, br, decodeErr := decodeRequest(bytes.NewReader(buffer))
			bytesRead = br
			if decodeErr != nil {
				err = decodeErr
				if !isConnectionClosedError(err) {
					Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
					b.serverError(err)
				}
				break
			}

			if b.latency > 0 {
				time.Sleep(b.latency)
			}

			// The handler call and the history append share one critical
			// section so history order matches handling order.
			b.lock.Lock()
			res := b.handler(req)
			b.history = append(b.history, RequestResponse{req.body, res})
			b.lock.Unlock()

			if res == nil {
				Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
				continue
			}
			Logger.Printf(
				"*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s",
				b.brokerID, idx, req.body, res,
				s.Sprintf("%#v", req.body),
				s.Sprintf("%#v", res),
			)

			encodedRes, encodeErr := encode(res, nil)
			if encodeErr != nil {
				err = fmt.Errorf("failed to encode %T - %w", res, encodeErr)
				b.serverError(err)
				break
			}
			// An empty encoding means there is nothing to send; still tell
			// the notifier that the request was consumed.
			if len(encodedRes) == 0 {
				b.lock.Lock()
				if b.notifier != nil {
					b.notifier(bytesRead, 0)
				}
				b.lock.Unlock()
				continue
			}

			resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes)))
			if _, err = conn.Write(resHeader); err != nil {
				b.serverError(err)
				break
			}
			if _, err = conn.Write(encodedRes); err != nil {
				b.serverError(err)
				break
			}
			bytesWritten = len(resHeader) + len(encodedRes)
		} else {
			// GSSAPI is not part of the Kafka protocol, but is supported for
			// authentication purposes. History is not recorded for this kind
			// of request because it is only used to test the GSSAPI
			// authentication mechanism.
			//
			// NOTE(review): bytesRead is not updated on this path, so the
			// notifier below receives the value left by the last decoded
			// Kafka request (0 if there was none) - confirm this is intended.
			b.lock.Lock()
			res := b.gssApiHandler(buffer)
			b.lock.Unlock()
			if res == nil {
				Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
				continue
			}
			if _, err = conn.Write(res); err != nil {
				b.serverError(err)
				break
			}
			bytesWritten = len(res)
		}

		b.lock.Lock()
		if b.notifier != nil {
			b.notifier(bytesRead, bytesWritten)
		}
		b.lock.Unlock()
	}
	Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
}