in rocketmq-knative/source/pkg/controller/sinks/sinks.go [33:66]
// GetSinkURI resolves the URI that events should be delivered to for the
// object referenced by sink.
//
// The referenced object is fetched through c using sink's GroupVersionKind
// and Name, looked up in the supplied namespace; sink.Namespace is not
// consulted.
//
// Resolution rules:
//   - A core Kubernetes Service (empty API group, version "v1", kind
//     "Service") is special-cased so it can be used as a sink: its URI is
//     synthesized as "http://<name>.<namespace>.svc/".
//   - Any other object is decoded as a duckv1alpha1.AddressableType and the
//     URL taken from its status address is returned.
//
// An error is returned when sink is nil, when the object cannot be fetched or
// decoded, when the status address is absent, or when the resolved URL has an
// empty host.
func GetSinkURI(ctx context.Context, c client.Client, sink *corev1.ObjectReference, namespace string) (string, error) {
	if sink == nil {
		return "", fmt.Errorf("sink ref is nil")
	}

	u := &unstructured.Unstructured{}
	u.SetGroupVersionKind(sink.GroupVersionKind())
	if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: sink.Name}, u); err != nil {
		// Returned unwrapped so callers can keep inspecting the client error directly.
		return "", err
	}

	gvk := u.GroupVersionKind()

	// Special case the core v1/Service to allow it to be addressable. The
	// group must be empty: other API groups may also define a "Service" kind
	// at version "v1", and those must be resolved through their status address
	// rather than being mistaken for a core Kubernetes Service.
	if gvk.Group == "" && gvk.Version == "v1" && gvk.Kind == "Service" {
		return fmt.Sprintf("http://%s.%s.svc/", u.GetName(), u.GetNamespace()), nil
	}

	// Human-readable identifier used only in the error messages below.
	objIdentifier := fmt.Sprintf("\"%s/%s\" (%s)", u.GetNamespace(), u.GetName(), gvk)

	t := duckv1alpha1.AddressableType{}
	if err := duck.FromUnstructured(u, &t); err != nil {
		return "", fmt.Errorf("failed to deserialize sink %s: %v", objIdentifier, err)
	}

	if t.Status.Address == nil {
		return "", fmt.Errorf("sink %s does not contain address", objIdentifier)
	}

	addr := t.Status.Address.GetURL()
	if addr.Host == "" {
		return "", fmt.Errorf("sink %s contains an empty hostname", objIdentifier)
	}
	return addr.String(), nil
}