func()

in conn.go [643:737]


func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
	// not safe for concurrent reads

	// read a full header, ignore timeouts, as this is being ran in a loop
	// TODO: TCP level deadlines? or just query level deadlines?
	if c.r.GetTimeout() > 0 {
		c.r.SetReadDeadline(time.Time{})
	}

	headStartTime := time.Now()
	// were just reading headers over and over and copy bodies
	head, err := readHeader(r, c.headerBuf[:])
	headEndTime := time.Now()
	if err != nil {
		return err
	}

	if c.frameObserver != nil {
		c.frameObserver.ObserveFrameHeader(context.Background(), ObservedFrameHeader{
			Version: protoVersion(head.version),
			Flags:   head.flags,
			Stream:  int16(head.stream),
			Opcode:  frameOp(head.op),
			Length:  int32(head.length),
			Start:   headStartTime,
			End:     headEndTime,
			Host:    c.host,
		})
	}

	if head.stream > c.streams.NumStreams {
		return fmt.Errorf("gocql: frame header stream is beyond call expected bounds: %d", head.stream)
	} else if head.stream == -1 {
		// TODO: handle cassandra event frames, we shouldnt get any currently
		framer := newFramer(c.compressor, c.version)
		if err := framer.readFrame(r, &head); err != nil {
			return err
		}
		go c.session.handleEvent(framer)
		return nil
	} else if head.stream <= 0 {
		// reserved stream that we dont use, probably due to a protocol error
		// or a bug in Cassandra, this should be an error, parse it and return.
		framer := newFramer(c.compressor, c.version)
		if err := framer.readFrame(r, &head); err != nil {
			return err
		}

		frame, err := framer.parseFrame()
		if err != nil {
			return err
		}

		return &protocolError{
			frame: frame,
		}
	}

	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return ErrConnectionClosed
	}
	call, ok := c.calls[head.stream]
	delete(c.calls, head.stream)
	c.mu.Unlock()
	if call == nil || !ok {
		c.logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
		return c.discardFrame(r, head)
	} else if head.stream != call.streamID {
		panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
	}

	framer := newFramer(c.compressor, c.version)

	err = framer.readFrame(r, &head)
	if err != nil {
		// only net errors should cause the connection to be closed. Though
		// cassandra returning corrupt frames will be returned here as well.
		if _, ok := err.(net.Error); ok {
			return err
		}
	}

	// we either, return a response to the caller, the caller timedout, or the
	// connection has closed. Either way we should never block indefinatly here
	select {
	case call.resp <- callResp{framer: framer, err: err}:
	case <-call.timeout:
		c.releaseStream(call)
	case <-ctx.Done():
	}

	return nil
}