func()

in go/protocol/listener.go [118:195]


func (l *listener[T]) handle(ctx context.Context, msg *message[T]) {
	// The very first check must be the version, because if we don't support it,
	// nothing else is trustworthy.
	ver := msg.Mqtt.UserProperties[constants.ProtocolVersion]
	if !version.IsSupported(ver, l.supportedVersion) {
		l.error(ctx, msg.Mqtt, &errors.Remote{
			Message: "request version not supported",
			Kind: errors.UnsupportedVersion{
				ProtocolVersion:                ver,
				SupportedMajorProtocolVersions: l.supportedVersion,
			},
		})
		return
	}

	msg.ClientID = msg.Mqtt.UserProperties[constants.SourceID]

	if l.reqCorrelation && len(msg.Mqtt.CorrelationData) == 0 {
		l.error(ctx, msg.Mqtt, &errors.Remote{
			Message: "correlation data missing",
			Kind: errors.HeaderMissing{
				HeaderName: constants.CorrelationData,
			},
		})
		return
	}
	if len(msg.Mqtt.CorrelationData) != 0 {
		correlationData, err := uuid.FromBytes(msg.Mqtt.CorrelationData)
		if err != nil {
			l.error(ctx, msg.Mqtt, &errors.Remote{
				Message: "correlation data is not a valid UUID",
				Kind: errors.HeaderInvalid{
					HeaderName: constants.CorrelationData,
				},
			})
			return
		}
		msg.CorrelationData = correlationData.String()
	}

	ts := msg.Mqtt.UserProperties[constants.Timestamp]
	if ts != "" {
		var err error
		msg.Timestamp, err = l.app.hlc.Parse(constants.Timestamp, ts)
		if err != nil {
			l.error(ctx, msg.Mqtt, &errors.Remote{
				Message: "timestamp is not a valid RFC3339 timestamp",
				Kind: errors.HeaderInvalid{
					HeaderName:  constants.Timestamp,
					HeaderValue: ts,
				},
			})
			return
		}
		if err = l.app.hlc.Set(msg.Timestamp); err != nil {
			l.error(ctx, msg.Mqtt, err)
			return
		}
	}

	msg.Metadata = make(map[string]string, len(msg.Mqtt.UserProperties))
	for key, val := range msg.Mqtt.UserProperties {
		if !strings.HasPrefix(key, constants.Protocol) {
			msg.Metadata[key] = val
		}
	}

	msg.Data = &Data{
		msg.Mqtt.Payload,
		msg.Mqtt.ContentType,
		msg.Mqtt.PayloadFormat,
	}

	if err := l.handler.onMsg(ctx, msg.Mqtt, &msg.Message); err != nil {
		l.error(ctx, msg.Mqtt, err)
		return
	}
}