in bedrock/bedrock.go [56:149]
func (e *eventstreamDecoder) Next() bool {
if e.err != nil {
return false
}
msg, err := e.Decoder.Decode(e.rc, nil)
if err != nil {
e.err = err
return false
}
messageType := msg.Headers.Get(eventstreamapi.MessageTypeHeader)
if messageType == nil {
e.err = fmt.Errorf("%s event header not present", eventstreamapi.MessageTypeHeader)
return false
}
switch messageType.String() {
case eventstreamapi.EventMessageType:
eventType := msg.Headers.Get(eventstreamapi.EventTypeHeader)
if eventType == nil {
e.err = fmt.Errorf("%s event header not present", eventstreamapi.EventTypeHeader)
return false
}
if eventType.String() == "chunk" {
chunk := eventstreamChunk{}
err = json.Unmarshal(msg.Payload, &chunk)
if err != nil {
e.err = err
return false
}
decoded, err := base64.StdEncoding.DecodeString(chunk.Bytes)
if err != nil {
e.err = err
return false
}
e.evt = ssestream.Event{
Type: gjson.GetBytes(decoded, "type").String(),
Data: decoded,
}
}
case eventstreamapi.ExceptionMessageType:
// See https://github.com/aws/aws-sdk-go-v2/blob/885de40869f9bcee29ad11d60967aa0f1b571d46/service/iotsitewise/deserializers.go#L15511C1-L15567C2
exceptionType := msg.Headers.Get(eventstreamapi.ExceptionTypeHeader)
if exceptionType == nil {
e.err = fmt.Errorf("%s event header not present", eventstreamapi.ExceptionTypeHeader)
return false
}
// See https://github.com/aws/aws-sdk-go-v2/blob/885de40869f9bcee29ad11d60967aa0f1b571d46/aws/protocol/restjson/decoder_util.go#L15-L48k
var errInfo struct {
Code string
Type string `json:"__type"`
Message string
}
err = json.Unmarshal(msg.Payload, &errInfo)
if err != nil && err != io.EOF {
e.err = fmt.Errorf("received exception %s: parsing exception payload failed: %w", exceptionType.String(), err)
return false
}
errorCode := "UnknownError"
errorMessage := errorCode
if ev := exceptionType.String(); len(ev) > 0 {
errorCode = ev
} else if len(errInfo.Code) > 0 {
errorCode = errInfo.Code
} else if len(errInfo.Type) > 0 {
errorCode = errInfo.Type
}
if len(errInfo.Message) > 0 {
errorMessage = errInfo.Message
}
e.err = fmt.Errorf("received exception %s: %s", errorCode, errorMessage)
return false
case eventstreamapi.ErrorMessageType:
errorCode := "UnknownError"
errorMessage := errorCode
if header := msg.Headers.Get(eventstreamapi.ErrorCodeHeader); header != nil {
errorCode = header.String()
}
if header := msg.Headers.Get(eventstreamapi.ErrorMessageHeader); header != nil {
errorMessage = header.String()
}
e.err = fmt.Errorf("received error %s: %s", errorCode, errorMessage)
return false
}
return true
}