func()

in mockbroker.go [238:350]


// handleRequests serves a single client connection until reading, decoding,
// encoding or writing fails, or until the broker starts closing.
//
// Each request read from conn takes one of two paths:
//   - Regular requests are decoded, delayed by b.latency when it is positive,
//     passed to b.handler, recorded in b.history together with the response,
//     and answered with an encoded header followed by the encoded response.
//   - Requests for which b.isGSSAPI reports true are passed as raw bytes to
//     b.gssApiHandler and its reply is written back verbatim; no history is
//     recorded for them.
//
// A nil response from either handler means the request is ignored: nothing is
// written and the notifier is not invoked. In every other successful case
// b.notifier, when set, is called with the number of bytes read and written
// for that request.
//
// conn is always closed and wg.Done is always called when the method returns.
// idx is only used to label log lines.
func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.WaitGroup) {
	defer wg.Done()
	defer func() {
		_ = conn.Close()
	}()
	s := spew.NewDefaultConfig()
	s.MaxDepth = 1
	Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)

	// Close the connection as soon as the broker starts closing. The abort
	// channel lets this watcher goroutine exit when the handler returns first.
	abort := make(chan none)
	defer close(abort)
	go func() {
		select {
		case <-b.closing:
			_ = conn.Close()
		case <-abort:
		}
	}()

	// err is reported by the final log line, so every failure below must be
	// stored in it with "=". Using ":=" here would shadow it and the log
	// would always print err=<nil>.
	var (
		err          error
		bytesRead    int
		bytesWritten int
	)
	for {
		buffer, readErr := b.readToBytes(conn)
		if err = readErr; err != nil {
			if !isConnectionClosedError(err) {
				Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
				b.serverError(err)
			}
			break
		}

		bytesWritten = 0
		if !b.isGSSAPI(buffer) {
			req, br, decodeErr := decodeRequest(bytes.NewReader(buffer))
			bytesRead = br
			if err = decodeErr; err != nil {
				if !isConnectionClosedError(err) {
					Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
					b.serverError(err)
				}
				break
			}

			if b.latency > 0 {
				time.Sleep(b.latency)
			}

			// The handler call and the history append share one critical
			// section so the recorded order matches the handling order.
			b.lock.Lock()
			res := b.handler(req)
			b.history = append(b.history, RequestResponse{req.body, res})
			b.lock.Unlock()

			if res == nil {
				Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
				continue
			}
			Logger.Printf(
				"*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s",
				b.brokerID, idx, req.body, res,
				s.Sprintf("%#v", req.body),
				s.Sprintf("%#v", res),
			)

			encodedRes, encodeErr := encode(res, nil)
			if err = encodeErr; err != nil {
				b.serverError(fmt.Errorf("failed to encode %T - %w", res, err))
				break
			}

			// An empty encoding means there is nothing to send; the notifier
			// below is still called with bytesWritten == 0.
			if len(encodedRes) > 0 {
				resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes)))
				if _, err = conn.Write(resHeader); err != nil {
					b.serverError(err)
					break
				}
				if _, err = conn.Write(encodedRes); err != nil {
					b.serverError(err)
					break
				}
				bytesWritten = len(resHeader) + len(encodedRes)
			}
		} else {
			// GSSAPI is not part of the Kafka protocol, but is supported for authentication purposes.
			// History is not recorded for this kind of request as it is only used to test the
			// GSSAPI authentication mechanism.
			//
			// Report the size of this request rather than a stale count left
			// over from a previous one.
			// NOTE(review): assumes buffer holds every byte readToBytes
			// consumed for this request — confirm against readToBytes.
			bytesRead = len(buffer)

			b.lock.Lock()
			res := b.gssApiHandler(buffer)
			b.lock.Unlock()
			if res == nil {
				Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
				continue
			}
			if _, err = conn.Write(res); err != nil {
				b.serverError(err)
				break
			}
			bytesWritten = len(res)
		}

		b.lock.Lock()
		if b.notifier != nil {
			b.notifier(bytesRead, bytesWritten)
		}
		b.lock.Unlock()
	}
	Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
}