in go/protocol/listener.go [118:195]
// handle validates the protocol headers carried by an incoming MQTT message,
// fills in the parsed fields of msg, and then passes the message to the
// listener's handler. Every validation failure is reported through l.error and
// ends processing of the message.
//
// Steps, in order:
//  1. reject messages whose protocol version is not supported;
//  2. record the sender's client ID from the source-ID user property;
//  3. require correlation data when l.reqCorrelation is set, and parse it as a
//     UUID whenever it is present;
//  4. parse the timestamp user property (when non-empty) and pass the result to
//     l.app.hlc.Set;
//  5. copy every user property whose key lacks the constants.Protocol prefix
//     into msg.Metadata;
//  6. populate msg.Data from the MQTT payload, content type and payload format;
//  7. invoke l.handler.onMsg, reporting any error it returns.
func (l *listener[T]) handle(ctx context.Context, msg *message[T]) {
	props := msg.Mqtt.UserProperties

	// Version validation comes before anything else: with an unsupported
	// protocol version, none of the remaining headers can be trusted.
	protoVer := props[constants.ProtocolVersion]
	if !version.IsSupported(protoVer, l.supportedVersion) {
		l.error(ctx, msg.Mqtt, &errors.Remote{
			Message: "request version not supported",
			Kind: errors.UnsupportedVersion{
				ProtocolVersion:                protoVer,
				SupportedMajorProtocolVersions: l.supportedVersion,
			},
		})
		return
	}

	msg.ClientID = props[constants.SourceID]

	// Correlation data is mandatory only when the listener requires it, but
	// when present it must always decode as a UUID.
	if corr := msg.Mqtt.CorrelationData; len(corr) == 0 {
		if l.reqCorrelation {
			l.error(ctx, msg.Mqtt, &errors.Remote{
				Message: "correlation data missing",
				Kind: errors.HeaderMissing{
					HeaderName: constants.CorrelationData,
				},
			})
			return
		}
	} else {
		id, err := uuid.FromBytes(corr)
		if err != nil {
			l.error(ctx, msg.Mqtt, &errors.Remote{
				Message: "correlation data is not a valid UUID",
				Kind: errors.HeaderInvalid{
					HeaderName: constants.CorrelationData,
				},
			})
			return
		}
		msg.CorrelationData = id.String()
	}

	// The timestamp header is optional; an empty value skips this step.
	if stamp := props[constants.Timestamp]; stamp != "" {
		var err error
		msg.Timestamp, err = l.app.hlc.Parse(constants.Timestamp, stamp)
		if err != nil {
			l.error(ctx, msg.Mqtt, &errors.Remote{
				Message: "timestamp is not a valid RFC3339 timestamp",
				Kind: errors.HeaderInvalid{
					HeaderName:  constants.Timestamp,
					HeaderValue: stamp,
				},
			})
			return
		}
		// NOTE(review): presumably this advances the application's clock from
		// the remote timestamp — confirm against the hlc implementation.
		if err = l.app.hlc.Set(msg.Timestamp); err != nil {
			l.error(ctx, msg.Mqtt, err)
			return
		}
	}

	// Only user properties outside the protocol's reserved prefix are exposed
	// to the handler as metadata.
	meta := make(map[string]string, len(props))
	for name, value := range props {
		if strings.HasPrefix(name, constants.Protocol) {
			continue
		}
		meta[name] = value
	}
	msg.Metadata = meta

	msg.Data = &Data{
		msg.Mqtt.Payload,
		msg.Mqtt.ContentType,
		msg.Mqtt.PayloadFormat,
	}

	if err := l.handler.onMsg(ctx, msg.Mqtt, &msg.Message); err != nil {
		l.error(ctx, msg.Mqtt, err)
	}
}