func()

in bedrock/bedrock.go [56:149]


// Next decodes the next event-stream message from e.rc and reports whether
// decoding may continue.
//
// It returns false, leaving the cause in e.err, when:
//   - a previous call already recorded an error,
//   - the underlying decoder fails,
//   - a required header is missing,
//   - a "chunk" payload cannot be parsed or base64-decoded, or
//   - the message is an exception or error message.
//
// For an event message whose event type is "chunk", the base64-decoded
// chunk bytes are stored in e.evt, with the event type taken from the "type"
// field of the decoded JSON.
//
// NOTE(review): event messages that are not "chunk", and message types not
// handled by the switch, return true without touching e.evt, so the
// previously stored event stays visible — confirm callers expect this.
func (e *eventstreamDecoder) Next() bool {
	if e.err != nil {
		return false
	}

	msg, err := e.Decoder.Decode(e.rc, nil)
	if err != nil {
		e.err = err
		return false
	}

	messageType := msg.Headers.Get(eventstreamapi.MessageTypeHeader)
	if messageType == nil {
		e.err = fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader)
		return false
	}

	switch messageType.String() {
	case eventstreamapi.EventMessageType:
		eventType := msg.Headers.Get(eventstreamapi.EventTypeHeader)
		if eventType == nil {
			e.err = fmt.Errorf("%s event header not present", eventstreamapi.EventTypeHeader)
			return false
		}

		if eventType.String() == "chunk" {
			chunk := eventstreamChunk{}
			if err := json.Unmarshal(msg.Payload, &chunk); err != nil {
				e.err = err
				return false
			}
			decoded, err := base64.StdEncoding.DecodeString(chunk.Bytes)
			if err != nil {
				e.err = err
				return false
			}
			e.evt = ssestream.Event{
				Type: gjson.GetBytes(decoded, "type").String(),
				Data: decoded,
			}
		}

	case eventstreamapi.ExceptionMessageType:
		// See https://github.com/aws/aws-sdk-go-v2/blob/885de40869f9bcee29ad11d60967aa0f1b571d46/service/iotsitewise/deserializers.go#L15511C1-L15567C2
		exceptionType := msg.Headers.Get(eventstreamapi.ExceptionTypeHeader)
		if exceptionType == nil {
			e.err = fmt.Errorf("%s event header not present", eventstreamapi.ExceptionTypeHeader)
			return false
		}

		// See https://github.com/aws/aws-sdk-go-v2/blob/885de40869f9bcee29ad11d60967aa0f1b571d46/aws/protocol/restjson/decoder_util.go#L15-L48
		var errInfo struct {
			Code    string
			Type    string `json:"__type"`
			Message string
		}
		// An exception may carry no body at all. json.Unmarshal reports empty
		// input as a syntax error (not io.EOF, which only json.Decoder
		// returns), so skip parsing in that case and fall back to the
		// exception type header below instead of masking the real exception
		// with a parse failure.
		if len(msg.Payload) > 0 {
			err = json.Unmarshal(msg.Payload, &errInfo)
			if err != nil && err != io.EOF {
				e.err = fmt.Errorf("received exception %s: parsing exception payload failed: %w", exceptionType.String(), err)
				return false
			}
		}

		// Prefer the header's exception type, then the payload's code, then
		// its __type; the message defaults to the same placeholder.
		errorCode := "UnknownError"
		errorMessage := errorCode
		if ev := exceptionType.String(); len(ev) > 0 {
			errorCode = ev
		} else if len(errInfo.Code) > 0 {
			errorCode = errInfo.Code
		} else if len(errInfo.Type) > 0 {
			errorCode = errInfo.Type
		}

		if len(errInfo.Message) > 0 {
			errorMessage = errInfo.Message
		}
		e.err = fmt.Errorf("received exception %s: %s", errorCode, errorMessage)
		return false

	case eventstreamapi.ErrorMessageType:
		// Error messages carry their details in headers rather than in the
		// payload.
		errorCode := "UnknownError"
		errorMessage := errorCode
		if header := msg.Headers.Get(eventstreamapi.ErrorCodeHeader); header != nil {
			errorCode = header.String()
		}
		if header := msg.Headers.Get(eventstreamapi.ErrorMessageHeader); header != nil {
			errorMessage = header.String()
		}
		e.err = fmt.Errorf("received error %s: %s", errorCode, errorMessage)
		return false
	}

	return true
}