packages/sonataflow-operator/internal/controller/eventing/workflowdef_events.go (45 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package eventing import ( "context" "fmt" duckv1 "knative.dev/pkg/apis/duck/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08" "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/knative" "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform" "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/platform/services" "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants" "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/log" ) // GetWorkflowDefinitionEventsTargetURL returns the target url that must be used to send the workflow definition events. func GetWorkflowDefinitionEventsTargetURL(cli client.Client, workflow *operatorapi.SonataFlow) (string, error) { var err error var sfp *operatorapi.SonataFlowPlatform var sink *duckv1.Destination var uri string if sfp, err = platform.GetActivePlatform(context.Background(), cli, workflow.Namespace, false); err != nil { return "", fmt.Errorf("failed to get active platform for workflow: %s, namespace: %s : %v", workflow.Name, workflow.Namespace, err) } if sfp == nil { klog.V(log.D).Infof("No active platform was found to calculate the workflow definition events target url for workflow: %s, namespace: %s.", workflow.Name, workflow.Namespace) return "", err } diHandler := services.NewDataIndexHandler(sfp) if !diHandler.IsServiceEnabled() { klog.V(log.D).Infof("DataIndex is not enabled for current workflow: %s, namespace: %s, neither in current platform: %s, or by a cluster platform reference.", workflow.Name, workflow.Namespace, sfp.Name) return "", nil } // First check if the workflow is connected with the knative eventing system. if sink, err = knative.GetWorkflowSink(workflow, sfp); err != nil { return "", fmt.Errorf("failed to look for a potential sink configuration for workflow: %s, namespace: %s : %v", workflow.Name, workflow.Namespace, err) } if sink != nil { // Workflow is connected via with knative eventing by using an operator managed SinkBinding. if sinkURI, err := knative.GetSinkURI(*sink); err != nil { return "", err } else { uri = sinkURI.String() } } else { // Workflow is connected via direct http invocation with the DI. uri = diHandler.GetServiceBaseUrl() + constants.KogitoProcessDefinitionsEventsPath } return uri, nil }