packages/sonataflow-operator/internal/controller/platform/services/services.go (685 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package services
import (
"fmt"
"strconv"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/version"
appsv1 "k8s.io/api/apps/v1"
"github.com/imdario/mergo"
"github.com/magiconair/properties"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/tracker"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/cfg"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/knative"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/utils/kubernetes"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
operatorapi "github.com/apache/incubator-kie-tools/packages/sonataflow-operator/api/v1alpha08"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/constants"
"github.com/apache/incubator-kie-tools/packages/sonataflow-operator/internal/controller/profiles/common/persistence"
)
// Names of the environment variables set on the service containers when
// persistence is configured, plus the event reason used by this package.
const (
	// WaitingKnativeEventing is the Reason of the warning event produced when
	// the broker referenced by a service cannot be validated.
	WaitingKnativeEventing = "WaitingKnativeEventing"

	// quarkusFlywayMigrateAtStart is the env var carrying "true"/"false"
	// depending on whether the service itself performs the DB migration.
	quarkusFlywayMigrateAtStart string = "QUARKUS_FLYWAY_MIGRATE_AT_START"
	// quarkusHibernateORMDatabaseGeneration is the env var set to "update"
	// on the Data Index container.
	quarkusHibernateORMDatabaseGeneration string = "QUARKUS_HIBERNATE_ORM_DATABASE_GENERATION"
)
// PlatformServiceHandler is the contract implemented by the handlers of the supporting services
// (DataIndexHandler and JobServiceHandler) that are configured through a SonataFlowPlatform.
type PlatformServiceHandler interface {
	// GetContainerName returns the name of the service's container in the deployment.
	GetContainerName() string
	// GetServiceImageName returns the image name of the service's container for the given persistence type.
	// The returned string contains the FQDN of the image, including the tag.
	GetServiceImageName(persistenceName constants.PersistenceType) string
	// GetServiceName returns the name of the kubernetes service prefixed with the platform name
	GetServiceName() string
	// GetServiceCmName returns the name of the configmap associated to the service
	GetServiceCmName() string
	// GetEnvironmentVariables returns the env variables to be injected to the service container
	GetEnvironmentVariables() []corev1.EnvVar
	// GetPodResourceRequirements returns the pod's memory and CPU resource requirements
	// Values for job service taken from
	// https://github.com/parodos-dev/orchestrator-helm-chart/blob/52d09eda56fdbed3060782df29847c97f172600f/charts/orchestrator/values.yaml#L68-L72
	GetPodResourceRequirements() corev1.ResourceRequirements
	// GetReplicaCount returns the default pod replica count for the given service
	GetReplicaCount() int32
	// GetDeploymentStrategy returns the deployment strategy for the service
	GetDeploymentStrategy() appsv1.DeploymentStrategy
	// MergeContainerSpec performs a merge with override using the containerSpec argument and the expected values based on the service's pod template specifications. The returned
	// object is the merged result
	MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error)
	// ConfigurePersistence sets the persistence's image and environment values when it is defined in the Persistence field of the service, overriding any existing value.
	ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container
	// MergePodSpec performs a merge with override between the podSpec argument and the expected values based on the service's pod template specification. The returned
	// object is the result of the merge
	MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error)
	// GenerateServiceProperties returns a property object that contains the application properties required by the service deployment
	GenerateServiceProperties() (*properties.Properties, error)
	// GenerateKnativeResources returns knative resources that bridge between workflow deploys and the service
	GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, *corev1.Event, error)
	// IsServiceSetInSpec returns true if the service is set in the spec.
	IsServiceSetInSpec() bool
	// IsServiceEnabledInSpec returns true if the service is enabled in the spec.
	IsServiceEnabledInSpec() bool
	// IsPersistenceEnabledtInSpec returns true if the service has persistence set in the spec.
	IsPersistenceEnabledtInSpec() bool
	// GetLocalServiceBaseUrl returns the base url of the local service
	GetLocalServiceBaseUrl() string
	// GetServiceBaseUrl returns the base url of the service, based on whether using local or cluster-scoped service.
	GetServiceBaseUrl() string
	// IsServiceEnabled returns true if the service is enabled in either the spec or the status.clusterPlatformRef.
	IsServiceEnabled() bool
	// SetServiceUrlInPlatformStatus sets the service url in the platform's status. If the reconciled instance does not have services set in its spec AND
	// the cluster referenced platform has said service enabled, the cluster platform's service is used.
	SetServiceUrlInPlatformStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform)
	// SetServiceUrlInWorkflowStatus sets the service url in a workflow's status.
	SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow)
	// GetServiceSource returns the source Broker configured for the given service by applying the following precedence rule.
	// The source declared in the given service definition is returned first, if any, otherwise a source declared in the
	// service platform is returned, if any.
	GetServiceSource() *duckv1.Destination
	// CheckKSinkInjected checks whether K_SINK has been injected for the Job Service. It is a no-op for the Data Index.
	CheckKSinkInjected() (bool, error)
	// GetDBMigrationStrategy returns whether a job based, a service based or no DB migration is needed
	GetDBMigrationStrategy() operatorapi.DBMigrationStrategyType
}
// DataIndexHandler is the PlatformServiceHandler implementation for the Data Index service.
type DataIndexHandler struct {
	// platform is the SonataFlowPlatform whose spec and status drive every handler method.
	platform *operatorapi.SonataFlowPlatform
}
// GetDBMigrationStrategy returns the DB migration strategy derived from the
// persistence options declared for the Data Index service.
func (d *DataIndexHandler) GetDBMigrationStrategy() operatorapi.DBMigrationStrategyType {
	dataIndexPersistence := d.platform.Spec.Services.DataIndex.Persistence
	return GetDBMigrationStrategy(dataIndexPersistence)
}
// NewDataIndexHandler builds the Data Index handler bound to the given platform.
func NewDataIndexHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler {
	handler := new(DataIndexHandler)
	handler.platform = platform
	return handler
}
// GetContainerName returns the container name used for the Data Index deployment.
func (d *DataIndexHandler) GetContainerName() string {
	containerName := constants.DataIndexServiceName
	return containerName
}
// GetServiceImageName returns the Data Index image for the given persistence type.
// A non-empty image configured in the operator configuration for that persistence
// type wins; otherwise the name is composed as
// "<image prefix>-<data index name>-<persistence type>:<tag version>".
func (d DataIndexHandler) GetServiceImageName(persistenceType constants.PersistenceType) string {
	switch persistenceType {
	case constants.PersistenceTypePostgreSQL:
		if image := cfg.GetCfg().DataIndexPostgreSQLImageTag; len(image) > 0 {
			return image
		}
	case constants.PersistenceTypeEphemeral:
		if image := cfg.GetCfg().DataIndexEphemeralImageTag; len(image) > 0 {
			return image
		}
	}
	// No configured override: fall back to the conventional image name.
	return fmt.Sprintf("%s-%s-%s:%s", constants.ImageNamePrefix, constants.DataIndexName, persistenceType.String(), version.GetImageTagVersion())
}
// GetServiceName returns "<platform name>-<data index service name>".
func (d *DataIndexHandler) GetServiceName() string {
	platformName := d.platform.Name
	return fmt.Sprintf("%s-%s", platformName, constants.DataIndexServiceName)
}
// SetServiceUrlInPlatformStatus records the Data Index URL of the cluster
// referenced platform in this platform's status.clusterPlatformRef.
// Nothing is written when this platform declares services in its spec, when the
// cluster platform does not enable the Data Index, or when the status has no
// cluster platform reference.
func (d DataIndexHandler) SetServiceUrlInPlatformStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) {
	clusterHandler := NewDataIndexHandler(clusterRefPlatform)
	if isServicesSet(d.platform) || !clusterHandler.IsServiceEnabledInSpec() {
		return
	}
	clusterRef := d.platform.Status.ClusterPlatformRef
	if clusterRef == nil {
		return
	}
	if clusterRef.Services == nil {
		clusterRef.Services = &operatorapi.PlatformServicesStatus{}
	}
	clusterRef.Services.DataIndexRef = &operatorapi.PlatformServiceRefStatus{
		Url: clusterHandler.GetLocalServiceBaseUrl(),
	}
}
// SetServiceUrlInWorkflowStatus stores the Data Index base URL in the workflow
// status. Dev-profile workflows and platforms without an enabled Data Index are
// left untouched.
func (d DataIndexHandler) SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow) {
	if profiles.IsDevProfile(workflow) || !d.IsServiceEnabled() {
		return
	}
	if workflow.Status.Services == nil {
		workflow.Status.Services = &operatorapi.PlatformServicesStatus{}
	}
	ref := &operatorapi.PlatformServiceRefStatus{Url: d.GetServiceBaseUrl()}
	workflow.Status.Services.DataIndexRef = ref
}
// IsServiceSetInSpec reports whether the platform spec declares a Data Index service.
func (d DataIndexHandler) IsServiceSetInSpec() bool {
	declared := isDataIndexSet(d.platform)
	return declared
}
// IsServiceEnabledInSpec reports whether the platform spec declares the Data
// Index service and explicitly enables it.
func (d *DataIndexHandler) IsServiceEnabledInSpec() bool {
	enabled := isDataIndexEnabled(d.platform)
	return enabled
}
// IsPersistenceEnabledtInSpec reports whether the Data Index service is declared
// in the spec and carries its own persistence section.
func (d DataIndexHandler) IsPersistenceEnabledtInSpec() bool {
	if !d.IsServiceSetInSpec() {
		return false
	}
	return d.platform.Spec.Services.DataIndex.Persistence != nil
}
// isServiceEnabledInStatus reports whether the platform status references a
// cluster platform Data Index while the platform spec declares no services.
func (d *DataIndexHandler) isServiceEnabledInStatus() bool {
	if d.platform == nil {
		return false
	}
	clusterRef := d.platform.Status.ClusterPlatformRef
	if clusterRef == nil || clusterRef.Services == nil || clusterRef.Services.DataIndexRef == nil {
		return false
	}
	return !isServicesSet(d.platform)
}
// IsServiceEnabled reports whether the Data Index is enabled either in the
// platform spec or through the cluster platform reference in the status.
func (d *DataIndexHandler) IsServiceEnabled() bool {
	if d.IsServiceEnabledInSpec() {
		return true
	}
	return d.isServiceEnabledInStatus()
}
// GetServiceBaseUrl returns the local service URL when the Data Index is enabled
// in the spec, the URL recorded in status.clusterPlatformRef when it is enabled
// only through the status, and the empty string otherwise.
func (d *DataIndexHandler) GetServiceBaseUrl() string {
	switch {
	case d.IsServiceEnabledInSpec():
		return d.GetLocalServiceBaseUrl()
	case d.isServiceEnabledInStatus():
		return d.platform.Status.ClusterPlatformRef.Services.DataIndexRef.Url
	default:
		return ""
	}
}
// GetLocalServiceBaseUrl returns the URL of the Data Index service that lives
// in the platform's own namespace, using the default HTTP protocol.
func (d *DataIndexHandler) GetLocalServiceBaseUrl() string {
	namespace := d.platform.Namespace
	serviceName := d.GetServiceName()
	return GenerateServiceURL(constants.DefaultHTTPProtocol, namespace, serviceName)
}
// GetEnvironmentVariables returns the single env var that selects the
// "http-events-support" Quarkus profile for the Data Index container.
func (d *DataIndexHandler) GetEnvironmentVariables() []corev1.EnvVar {
	quarkusProfile := corev1.EnvVar{
		Name:  "KOGITO_DATA_INDEX_QUARKUS_PROFILE",
		Value: "http-events-support",
	}
	return []corev1.EnvVar{quarkusProfile}
}
// GetPodResourceRequirements returns the default CPU and memory requests and
// limits applied to the Data Index container.
func (d *DataIndexHandler) GetPodResourceRequirements() corev1.ResourceRequirements {
	requests := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("100m"),
		corev1.ResourceMemory: resource.MustParse("1Gi"),
	}
	limits := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("200m"),
		corev1.ResourceMemory: resource.MustParse("1Gi"),
	}
	return corev1.ResourceRequirements{Requests: requests, Limits: limits}
}
// MergePodSpec merges, with override, the pod spec declared in the Data Index
// pod template into a copy of podSpec. The argument itself is not modified.
// The merged copy is returned together with any error reported by the merge.
func (d *DataIndexHandler) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) {
	merged := podSpec.DeepCopy()
	overrides := d.platform.Spec.Services.DataIndex.PodTemplate.PodSpec.ToPodSpec()
	err := mergo.Merge(merged, overrides, mergo.WithOverride)
	return *merged, err
}
// hasPostgreSQLConfigured reports whether the Data Index service is declared in
// the spec and PostgreSQL is configured either in the service's own persistence
// section or in the platform-level persistence section.
func (d *DataIndexHandler) hasPostgreSQLConfigured() bool {
	if !d.IsServiceSetInSpec() {
		return false
	}
	if servicePersistence := d.platform.Spec.Services.DataIndex.Persistence; servicePersistence != nil && servicePersistence.PostgreSQL != nil {
		return true
	}
	platformPersistence := d.platform.Spec.Persistence
	return platformPersistence != nil && platformPersistence.PostgreSQL != nil
}
// GetDBMigrationStrategy returns the migration strategy declared in the given
// persistence options, or DBMigrationStrategyNone when no persistence options
// are provided. The declared value is returned as is, so it may be empty.
func GetDBMigrationStrategy(persistence *operatorapi.PersistenceOptionsSpec) operatorapi.DBMigrationStrategyType {
	if persistence == nil {
		return operatorapi.DBMigrationStrategyNone
	}
	return operatorapi.DBMigrationStrategyType(persistence.DBMigrationStrategy)
}
// IsServiceBasedDBMigration reports whether the persistence options select the
// service-based DB migration strategy.
func IsServiceBasedDBMigration(persistence *operatorapi.PersistenceOptionsSpec) bool {
	return GetDBMigrationStrategy(persistence) == operatorapi.DBMigrationStrategyService
}
// IsJobsBasedDBMigration reports whether the persistence options select the
// job-based DB migration strategy.
func IsJobsBasedDBMigration(persistence *operatorapi.PersistenceOptionsSpec) bool {
	return GetDBMigrationStrategy(persistence) == operatorapi.DBMigrationStrategyJob
}
// IsNoDBMigration reports whether no DB migration is requested, i.e. the
// strategy is DBMigrationStrategyNone or was left empty.
func IsNoDBMigration(persistence *operatorapi.PersistenceOptionsSpec) bool {
	strategy := GetDBMigrationStrategy(persistence)
	if strategy == operatorapi.DBMigrationStrategyNone {
		return true
	}
	return strategy == ""
}
// isDBMigrationStrategyService returns, as the string "true" or "false", whether
// the service itself should run the DB migration. Despite its name it yields a
// string because the result is used as an environment variable value.
// When no persistence options are given the answer defaults to "true".
func isDBMigrationStrategyService(persistence *v1alpha08.PersistenceOptionsSpec) string {
	if persistence == nil {
		return "true"
	}
	return strconv.FormatBool(IsServiceBasedDBMigration(persistence))
}
// ConfigurePersistence returns a copy of containerSpec set up for PostgreSQL
// when PostgreSQL is configured for the Data Index: the image is switched to
// the PostgreSQL flavour and the datasource, Hibernate and Flyway env vars are
// appended. Without PostgreSQL the container is returned unchanged.
func (d *DataIndexHandler) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container {
	if !d.hasPostgreSQLConfigured() {
		return containerSpec
	}
	servicePersistence := d.platform.Spec.Services.DataIndex.Persistence
	serviceName := d.GetServiceName()
	resolved := persistence.RetrievePostgreSQLConfiguration(servicePersistence, d.platform.Spec.Persistence, serviceName)

	configured := containerSpec.DeepCopy()
	configured.Image = d.GetServiceImageName(constants.PersistenceTypePostgreSQL)
	configured.Env = append(configured.Env, persistence.ConfigurePostgreSQLEnv(resolved.PostgreSQL, serviceName, d.platform.Namespace)...)

	// Env vars specific to the Data Index.
	migrateAtStart := isDBMigrationStrategyService(servicePersistence)
	configured.Env = append(configured.Env,
		corev1.EnvVar{Name: quarkusHibernateORMDatabaseGeneration, Value: "update"},
		corev1.EnvVar{Name: quarkusFlywayMigrateAtStart, Value: migrateAtStart},
	)
	return configured
}
// MergeContainerSpec merges the container declared in the Data Index pod
// template into a copy of containerSpec and returns the result.
func (d DataIndexHandler) MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error) {
	declared := &d.platform.Spec.Services.DataIndex.PodTemplate.Container
	return mergeContainerSpec(containerSpec, declared)
}
// GetReplicaCount returns the replica count declared in the Data Index pod
// template, or 1 when none is declared.
func (d *DataIndexHandler) GetReplicaCount() int32 {
	if replicas := d.platform.Spec.Services.DataIndex.PodTemplate.Replicas; replicas != nil {
		return *replicas
	}
	return 1
}
// GetDeploymentStrategy returns the zero-valued deployment strategy, i.e. the
// Data Index does not set a specific strategy.
func (d *DataIndexHandler) GetDeploymentStrategy() appsv1.DeploymentStrategy {
	var strategy appsv1.DeploymentStrategy
	return strategy
}
// GetServiceCmName returns "<service name>-props", the name of the config map
// associated with the Data Index service.
func (d *DataIndexHandler) GetServiceCmName() string {
	serviceName := d.GetServiceName()
	return fmt.Sprintf("%s-props", serviceName)
}
// GetServiceSource returns the source declared on the Data Index service when
// present, otherwise the platform-level broker (which may be nil).
func (d *DataIndexHandler) GetServiceSource() *duckv1.Destination {
	serviceSource := d.platform.Spec.Services.DataIndex.Source
	if serviceSource == nil {
		return GetPlatformBroker(d.platform)
	}
	return serviceSource
}
// GenerateServiceProperties returns the application properties for the Data
// Index: the service URL and the Kafka health check switched off.
// The returned error is always nil.
func (d *DataIndexHandler) GenerateServiceProperties() (*properties.Properties, error) {
	serviceURL := d.GetLocalServiceBaseUrl()
	result := properties.NewProperties()
	result.Set(constants.KogitoServiceURLProperty, serviceURL)
	result.Set(constants.DataIndexKafkaHealthCheck, "false")
	return result, nil
}
// CheckKSinkInjected is a no-op for the Data Index and always reports true
// with a nil error.
func (d *DataIndexHandler) CheckKSinkInjected() (bool, error) {
	const injected = true
	return injected, nil
}
// JobServiceHandler is the PlatformServiceHandler implementation for the Job Service.
type JobServiceHandler struct {
	// platform is the SonataFlowPlatform whose spec and status drive every handler method.
	platform *operatorapi.SonataFlowPlatform
}
// GetDBMigrationStrategy returns the DB migration strategy derived from the
// persistence options declared for the Job Service.
func (j *JobServiceHandler) GetDBMigrationStrategy() operatorapi.DBMigrationStrategyType {
	jobServicePersistence := j.platform.Spec.Services.JobService.Persistence
	return GetDBMigrationStrategy(jobServicePersistence)
}
// NewJobServiceHandler builds the Job Service handler bound to the given platform.
func NewJobServiceHandler(platform *operatorapi.SonataFlowPlatform) PlatformServiceHandler {
	handler := new(JobServiceHandler)
	handler.platform = platform
	return handler
}
// GetContainerName returns the container name used for the Job Service deployment.
func (j *JobServiceHandler) GetContainerName() string {
	containerName := constants.JobServiceName
	return containerName
}
// GetServiceImageName returns the Job Service image for the given persistence
// type. A non-empty image configured in the operator configuration for that
// persistence type wins; otherwise the name is composed as
// "<image prefix>-<job service name>-<persistence type>:<tag version>".
func (j JobServiceHandler) GetServiceImageName(persistenceType constants.PersistenceType) string {
	switch persistenceType {
	case constants.PersistenceTypePostgreSQL:
		if image := cfg.GetCfg().JobsServicePostgreSQLImageTag; len(image) > 0 {
			return image
		}
	case constants.PersistenceTypeEphemeral:
		if image := cfg.GetCfg().JobsServiceEphemeralImageTag; len(image) > 0 {
			return image
		}
	}
	// No configured override: fall back to the conventional image name.
	return fmt.Sprintf("%s-%s-%s:%s", constants.ImageNamePrefix, constants.JobServiceName, persistenceType.String(), version.GetImageTagVersion())
}
// GetServiceName returns "<platform name>-<job service name>".
func (j *JobServiceHandler) GetServiceName() string {
	platformName := j.platform.Name
	return fmt.Sprintf("%s-%s", platformName, constants.JobServiceName)
}
// GetServiceCmName returns "<service name>-props", the name of the config map
// associated with the Job Service.
func (j *JobServiceHandler) GetServiceCmName() string {
	serviceName := j.GetServiceName()
	return fmt.Sprintf("%s-props", serviceName)
}
// SetServiceUrlInPlatformStatus records the Job Service URL of the cluster
// referenced platform in this platform's status.clusterPlatformRef.
// Nothing is written when this platform declares services in its spec, when the
// cluster platform does not enable the Job Service, or when the status has no
// cluster platform reference.
func (j JobServiceHandler) SetServiceUrlInPlatformStatus(clusterRefPlatform *operatorapi.SonataFlowPlatform) {
	clusterHandler := NewJobServiceHandler(clusterRefPlatform)
	if isServicesSet(j.platform) || !clusterHandler.IsServiceEnabledInSpec() {
		return
	}
	clusterRef := j.platform.Status.ClusterPlatformRef
	if clusterRef == nil {
		return
	}
	if clusterRef.Services == nil {
		clusterRef.Services = &operatorapi.PlatformServicesStatus{}
	}
	clusterRef.Services.JobServiceRef = &operatorapi.PlatformServiceRefStatus{
		Url: clusterHandler.GetLocalServiceBaseUrl(),
	}
}
// SetServiceUrlInWorkflowStatus stores the Job Service base URL in the workflow
// status. Dev-profile workflows and platforms without an enabled Job Service
// are left untouched.
func (j JobServiceHandler) SetServiceUrlInWorkflowStatus(workflow *operatorapi.SonataFlow) {
	if profiles.IsDevProfile(workflow) || !j.IsServiceEnabled() {
		return
	}
	if workflow.Status.Services == nil {
		workflow.Status.Services = &operatorapi.PlatformServicesStatus{}
	}
	ref := &operatorapi.PlatformServiceRefStatus{Url: j.GetServiceBaseUrl()}
	workflow.Status.Services.JobServiceRef = ref
}
// IsServiceSetInSpec reports whether the platform spec declares a Job Service.
func (j JobServiceHandler) IsServiceSetInSpec() bool {
	declared := isJobServiceSet(j.platform)
	return declared
}
// IsServiceEnabledInSpec reports whether the platform spec declares the Job
// Service and explicitly enables it.
func (j *JobServiceHandler) IsServiceEnabledInSpec() bool {
	enabled := isJobServiceEnabled(j.platform)
	return enabled
}
// IsPersistenceEnabledtInSpec reports whether the Job Service is declared in
// the spec and carries its own persistence section.
func (j JobServiceHandler) IsPersistenceEnabledtInSpec() bool {
	if !j.IsServiceSetInSpec() {
		return false
	}
	return j.platform.Spec.Services.JobService.Persistence != nil
}
// isServiceEnabledInStatus reports whether the platform status references a
// cluster platform Job Service while the platform spec declares no services.
func (j *JobServiceHandler) isServiceEnabledInStatus() bool {
	if j.platform == nil {
		return false
	}
	clusterRef := j.platform.Status.ClusterPlatformRef
	if clusterRef == nil || clusterRef.Services == nil || clusterRef.Services.JobServiceRef == nil {
		return false
	}
	return !isServicesSet(j.platform)
}
// IsServiceEnabled reports whether the Job Service is enabled either in the
// platform spec or through the cluster platform reference in the status.
func (j *JobServiceHandler) IsServiceEnabled() bool {
	if j.IsServiceEnabledInSpec() {
		return true
	}
	return j.isServiceEnabledInStatus()
}
// GetServiceBaseUrl returns the local service URL when the Job Service is
// enabled in the spec, the URL recorded in status.clusterPlatformRef when it is
// enabled only through the status, and the empty string otherwise.
func (j *JobServiceHandler) GetServiceBaseUrl() string {
	switch {
	case j.IsServiceEnabledInSpec():
		return j.GetLocalServiceBaseUrl()
	case j.isServiceEnabledInStatus():
		return j.platform.Status.ClusterPlatformRef.Services.JobServiceRef.Url
	default:
		return ""
	}
}
// GetLocalServiceBaseUrl returns the URL of the Job Service that lives in the
// platform's own namespace, using the default HTTP protocol.
func (j *JobServiceHandler) GetLocalServiceBaseUrl() string {
	namespace := j.platform.Namespace
	serviceName := j.GetServiceName()
	return GenerateServiceURL(constants.DefaultHTTPProtocol, namespace, serviceName)
}
// GetEnvironmentVariables returns an empty, non-nil list: the Job Service
// needs no default env vars.
func (j *JobServiceHandler) GetEnvironmentVariables() []corev1.EnvVar {
	return make([]corev1.EnvVar, 0)
}
// GetPodResourceRequirements returns the default CPU and memory requests and
// limits applied to the Job Service container.
func (j *JobServiceHandler) GetPodResourceRequirements() corev1.ResourceRequirements {
	requests := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("250m"),
		corev1.ResourceMemory: resource.MustParse("64Mi"),
	}
	limits := corev1.ResourceList{
		corev1.ResourceCPU:    resource.MustParse("500m"),
		corev1.ResourceMemory: resource.MustParse("1Gi"),
	}
	return corev1.ResourceRequirements{Requests: requests, Limits: limits}
}
// GetReplicaCount returns 0 only when the pod template explicitly requests zero
// replicas; any other declared value, or none at all, yields 1.
func (j *JobServiceHandler) GetReplicaCount() int32 {
	declared := j.platform.Spec.Services.JobService.PodTemplate.Replicas
	if declared == nil || *declared != 0 {
		return 1
	}
	return 0
}
// GetDeploymentStrategy returns the Recreate strategy, with no rolling update
// parameters, for the Job Service deployment.
func (j *JobServiceHandler) GetDeploymentStrategy() appsv1.DeploymentStrategy {
	var strategy appsv1.DeploymentStrategy
	strategy.Type = appsv1.RecreateDeploymentStrategyType
	strategy.RollingUpdate = nil
	return strategy
}
// MergeContainerSpec merges the container declared in the Job Service pod
// template into a copy of containerSpec and returns the result.
func (j JobServiceHandler) MergeContainerSpec(containerSpec *corev1.Container) (*corev1.Container, error) {
	declared := &j.platform.Spec.Services.JobService.PodTemplate.Container
	return mergeContainerSpec(containerSpec, declared)
}
// hasPostgreSQLConfigured reports whether the Job Service is declared in the
// spec and PostgreSQL is configured either in the service's own persistence
// section or in the platform-level persistence section.
func (j *JobServiceHandler) hasPostgreSQLConfigured() bool {
	if !j.IsServiceSetInSpec() {
		return false
	}
	if servicePersistence := j.platform.Spec.Services.JobService.Persistence; servicePersistence != nil && servicePersistence.PostgreSQL != nil {
		return true
	}
	platformPersistence := j.platform.Spec.Persistence
	return platformPersistence != nil && platformPersistence.PostgreSQL != nil
}
// ConfigurePersistence returns a copy of containerSpec set up for PostgreSQL
// when PostgreSQL is configured for the Job Service: the image is switched to
// the PostgreSQL flavour and the datasource, Flyway and load-job-error-strategy
// env vars are appended. Without PostgreSQL the container is returned unchanged.
func (j *JobServiceHandler) ConfigurePersistence(containerSpec *corev1.Container) *corev1.Container {
	if !j.hasPostgreSQLConfigured() {
		return containerSpec
	}
	configured := containerSpec.DeepCopy()
	configured.Image = j.GetServiceImageName(constants.PersistenceTypePostgreSQL)

	servicePersistence := j.platform.Spec.Services.JobService.Persistence
	serviceName := j.GetServiceName()
	resolved := persistence.RetrievePostgreSQLConfiguration(servicePersistence, j.platform.Spec.Persistence, serviceName)
	configured.Env = append(configured.Env, persistence.ConfigurePostgreSQLEnv(resolved.PostgreSQL, serviceName, j.platform.Namespace)...)

	// Env vars specific to the Job Service.
	migrateAtStart := isDBMigrationStrategyService(servicePersistence)
	configured.Env = append(configured.Env,
		corev1.EnvVar{Name: "QUARKUS_FLYWAY_MIGRATE_AT_START", Value: migrateAtStart},
		corev1.EnvVar{Name: "KOGITO_JOBS_SERVICE_LOADJOBERRORSTRATEGY", Value: "FAIL_SERVICE"},
	)
	return configured
}
// MergePodSpec merges, with override, the pod spec declared in the Job Service
// pod template into a copy of podSpec. The argument itself is not modified.
// The merged copy is returned together with any error reported by the merge.
func (j *JobServiceHandler) MergePodSpec(podSpec corev1.PodSpec) (corev1.PodSpec, error) {
	merged := podSpec.DeepCopy()
	overrides := j.platform.Spec.Services.JobService.PodTemplate.PodSpec.ToPodSpec()
	err := mergo.Merge(merged, overrides, mergo.WithOverride)
	return *merged, err
}
// GenerateServiceProperties builds the application properties for the Job
// Service deployment:
//   - the service URL and the health check / leader election settings;
//   - the K_SINK injection health check, enabled only when a source is configured;
//   - the reactive datasource URL when PostgreSQL is configured;
//   - the job status change event settings when the Data Index is enabled in
//     the platform spec, targeting the Data Index directly when no source is
//     configured and the Knative injected variable otherwise.
//
// The properties are sorted before being returned. An error is returned only
// when the reactive datasource URL cannot be generated.
func (j *JobServiceHandler) GenerateServiceProperties() (*properties.Properties, error) {
	serviceName := j.GetServiceName()
	namespace := j.platform.Namespace

	props := properties.NewProperties()
	props.Set(constants.KogitoServiceURLProperty, GenerateServiceURL(constants.KogitoServiceURLProtocol, namespace, serviceName))
	props.Set(constants.JobServiceKafkaSmallRyeHealthProperty, "false")
	props.Set(constants.JobServiceLeaderLivenessSmallRyeHealthProperty, "true")
	props.Set(constants.JobServiceLeaderCheckExpirationInSeconds, constants.DefaultJobServiceLeaderCheckExpirationInSeconds)

	hasSource := j.GetServiceSource() != nil
	kSinkHealthCheck := "false"
	if hasSource {
		kSinkHealthCheck = "true"
	}
	props.Set(constants.JobServiceKSinkInjectionHealthCheck, kSinkHealthCheck)

	// add data source reactive URL
	if j.hasPostgreSQLConfigured() {
		resolved := persistence.RetrievePostgreSQLConfiguration(j.platform.Spec.Services.JobService.Persistence, j.platform.Spec.Persistence, serviceName)
		reactiveURL, err := generateReactiveURL(resolved.PostgreSQL, serviceName, namespace, constants.DefaultDatabaseName, constants.DefaultPostgreSQLPort)
		if err != nil {
			return nil, err
		}
		props.Set(constants.JobServiceDataSourceReactiveURL, reactiveURL)
	}

	if isDataIndexEnabled(j.platform) {
		props.Set(constants.JobServiceStatusChangeEvents, "true")
		if hasSource {
			props.Set(constants.JobServiceStatusChangeEventsURL, constants.KnativeInjectedEnvVar)
			props.Set(constants.JobServiceStatusChangeEventsConnector, constants.QuarkusHTTP)
			props.Set(constants.JobServiceStatusChangeEventsMethod, constants.Post)
		} else {
			dataIndex := NewDataIndexHandler(j.platform)
			props.Set(constants.JobServiceStatusChangeEventsURL, dataIndex.GetLocalServiceBaseUrl()+"/jobs")
		}
	}

	props.Sort()
	return props, nil
}
// SetServiceUrlsInWorkflowStatus resets the services section of the workflow
// status and lets the Data Index and Job Service handlers of the given platform
// fill in their URLs, in that order.
func SetServiceUrlsInWorkflowStatus(pl *operatorapi.SonataFlowPlatform, workflow *operatorapi.SonataFlow) {
	handlers := []PlatformServiceHandler{
		NewDataIndexHandler(pl),
		NewJobServiceHandler(pl),
	}
	workflow.Status.Services = nil
	for _, handler := range handlers {
		handler.SetServiceUrlInWorkflowStatus(workflow)
	}
}
// GetServiceSource returns the source declared on the Job Service when present,
// otherwise the platform-level broker (which may be nil).
func (j *JobServiceHandler) GetServiceSource() *duckv1.Destination {
	serviceSource := j.platform.Spec.Services.JobService.Source
	if serviceSource == nil {
		return GetPlatformBroker(j.platform)
	}
	return serviceSource
}
// GetServiceSink returns the sink declared on the Job Service when present,
// otherwise the platform-level broker (which may be nil).
func (j *JobServiceHandler) GetServiceSink() *duckv1.Destination {
	serviceSink := j.platform.Spec.Services.JobService.Sink
	if serviceSink == nil {
		return GetPlatformBroker(j.platform)
	}
	return serviceSink
}
// isDataIndexEnabled reports whether the platform spec declares the Data Index
// and its Enabled flag is set to true.
func isDataIndexEnabled(platform *operatorapi.SonataFlowPlatform) bool {
	if !isDataIndexSet(platform) {
		return false
	}
	enabled := platform.Spec.Services.DataIndex.Enabled
	return enabled != nil && *enabled
}
// isJobServiceEnabled reports whether the platform spec declares the Job
// Service and its Enabled flag is set to true.
func isJobServiceEnabled(platform *operatorapi.SonataFlowPlatform) bool {
	if !isJobServiceSet(platform) {
		return false
	}
	enabled := platform.Spec.Services.JobService.Enabled
	return enabled != nil && *enabled
}
// isDataIndexSet reports whether the platform spec has a services section that
// declares the Data Index.
func isDataIndexSet(platform *operatorapi.SonataFlowPlatform) bool {
	if !isServicesSet(platform) {
		return false
	}
	return platform.Spec.Services.DataIndex != nil
}
// isJobServiceSet reports whether the platform spec has a services section that
// declares the Job Service.
func isJobServiceSet(platform *operatorapi.SonataFlowPlatform) bool {
	if !isServicesSet(platform) {
		return false
	}
	return platform.Spec.Services.JobService != nil
}
// isServicesSet reports whether the platform is non-nil and its spec has a
// services section.
func isServicesSet(platform *operatorapi.SonataFlowPlatform) bool {
	if platform == nil {
		return false
	}
	return platform.Spec.Services != nil
}
// GenerateServiceURL builds "<protocol>://<name>.<namespace>", or
// "<protocol>://<name>" when the namespace is empty.
func GenerateServiceURL(protocol string, namespace string, name string) string {
	if namespace == "" {
		return fmt.Sprintf("%s://%s", protocol, name)
	}
	return fmt.Sprintf("%s://%s.%s", protocol, name, namespace)
}
// mergeContainerSpec merges the operatorapi.ContainerSpec declared for a
// platform service (for example platform.services.jobService.podTemplate.container)
// into a copy of the destination container of the corresponding deployment.
// Environment variables already present in the destination are not overridden.
// dest itself is left unmodified; the merged copy is returned together with any
// merge error.
func mergeContainerSpec(dest *corev1.Container, sourceSpec *operatorapi.ContainerSpec) (*corev1.Container, error) {
	merged := dest.DeepCopy()
	declared := sourceSpec.ToContainer()
	if err := mergeContainerPreservingEnvVars(merged, &declared); err != nil {
		return merged, err
	}
	return merged, nil
}
// mergeContainerPreservingEnvVars merges the source container into dest with
// override, except for the env vars: the variables already configured in dest
// are kept, and a source variable is added only when dest has none with that
// name.
func mergeContainerPreservingEnvVars(dest *corev1.Container, source *corev1.Container) error {
	// Remember dest's env before the merge so it can be restored afterwards.
	preservedEnv := dest.Env
	err := mergo.Merge(dest, source, mergo.WithOverride)
	if err != nil {
		return err
	}
	dest.Env = preservedEnv
	for i := range source.Env {
		kubernetes.AddEnvIfNotPresent(dest, source.Env[i])
	}
	return nil
}
// GetPlatformBroker returns the broker declared in the platform's eventing
// section, or nil when the platform, its eventing section or the broker is not
// set.
func GetPlatformBroker(platform *operatorapi.SonataFlowPlatform) *duckv1.Destination {
	if platform == nil {
		return nil
	}
	eventing := platform.Spec.Eventing
	if eventing == nil || eventing.Broker == nil {
		return nil
	}
	return eventing.Broker
}
// GetSourceBroker returns the source declared on the Data Index service when it
// carries an object reference, otherwise the platform-level broker (which may
// be nil).
//
// The lookup is nil-safe along the whole path: a nil platform, a spec without a
// services section, or a services section without a Data Index all fall back to
// the platform-level broker instead of panicking.
func (d *DataIndexHandler) GetSourceBroker() *duckv1.Destination {
	if d.platform != nil && d.platform.Spec.Services != nil && d.platform.Spec.Services.DataIndex != nil {
		if source := d.platform.Spec.Services.DataIndex.Source; source != nil && source.Ref != nil {
			return source
		}
	}
	return GetPlatformBroker(d.platform)
}
// newTrigger builds a Knative Trigger that subscribes the given service to the
// events of type eventType flowing through brokerName.
//
// The trigger is named from "data-index-<tag>-" and the platform UID, lives in
// namespace, and carries the provided labels and annotations. The subscriber is
// the v1 Service serviceName in the platform's namespace, addressed at path.
func (d *DataIndexHandler) newTrigger(labels map[string]string, annotations map[string]string, brokerName, namespace, serviceName, tag, eventType, path string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
	triggerName := kmeta.ChildName(fmt.Sprintf("data-index-%s-", tag), string(platform.GetUID()))

	subscriber := duckv1.Destination{
		Ref: &duckv1.KReference{
			Name:       serviceName,
			Namespace:  platform.Namespace,
			APIVersion: "v1",
			Kind:       "Service",
		},
		URI: &apis.URL{Path: path},
	}
	filter := &eventingv1.TriggerFilter{
		Attributes: eventingv1.TriggerFilterAttributes{"type": eventType},
	}

	return &eventingv1.Trigger{
		ObjectMeta: metav1.ObjectMeta{
			Name:        triggerName,
			Namespace:   namespace,
			Labels:      labels,
			Annotations: annotations,
		},
		Spec: eventingv1.TriggerSpec{
			Broker:     brokerName,
			Filter:     filter,
			Subscriber: subscriber,
		},
	}
}
// GenerateKnativeResources returns the Knative Triggers that route the workflow
// and job events from the source broker to the Data Index service.
//
// Nothing is generated (all results nil) when no broker is configured, or when
// the configured destination carries no named object reference. The platform
// level broker is returned without inspecting its Ref, so a destination that is
// addressed by URI only has a nil Ref and must be guarded against here.
//
// When the broker cannot be validated, a warning event with reason
// WaitingKnativeEventing is returned together with the validation error.
//
// The triggers are created in the broker's namespace, defaulting to the
// platform's namespace. Only the process-definition and jobs triggers receive
// the annotations derived from the broker class; the others get an empty
// annotation map.
func (d *DataIndexHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, *corev1.Event, error) {
	broker := d.GetSourceBroker()
	if broker == nil || broker.Ref == nil || len(broker.Ref.Name) == 0 {
		return nil, nil, nil // Nothing to do
	}
	brokerName := broker.Ref.Name
	namespace := broker.Ref.Namespace
	if len(namespace) == 0 {
		namespace = platform.Namespace
	}
	brokerObject, err := knative.ValidateBroker(brokerName, namespace)
	if err != nil {
		event := &corev1.Event{
			Type:    corev1.EventTypeWarning,
			Reason:  WaitingKnativeEventing,
			Message: fmt.Sprintf("%s for service: %s", err.Error(), d.GetServiceName()),
		}
		return nil, event, err
	}
	annotations := make(map[string]string)
	managedAnnotations := make(map[string]string)
	addTriggerAnnotations(knative.GetBrokerClass(brokerObject), managedAnnotations)
	serviceName := d.GetServiceName()
	return []client.Object{
		d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-error", "ProcessInstanceErrorDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
		d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-node", "ProcessInstanceNodeDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
		d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-state", "ProcessInstanceStateDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
		d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-variable", "ProcessInstanceVariableDataEvent", constants.KogitoProcessInstancesEventsPath, platform),
		d.newTrigger(lbl, managedAnnotations, brokerName, namespace, serviceName, "process-definition", "ProcessDefinitionEvent", constants.KogitoProcessDefinitionsEventsPath, platform),
		d.newTrigger(lbl, annotations, brokerName, namespace, serviceName, "process-instance-multiple", "MultipleProcessInstanceDataEvent", constants.KogitoProcessInstancesMultiEventsPath, platform),
		d.newTrigger(lbl, managedAnnotations, brokerName, namespace, serviceName, "jobs", "JobEvent", constants.KogitoJobsPath, platform),
	}, nil, nil
}
// GetSourceBroker returns the source declared on the Job Service when it
// carries an object reference, otherwise the platform-level broker (which may
// be nil).
func (j JobServiceHandler) GetSourceBroker() *duckv1.Destination {
	source := j.platform.Spec.Services.JobService.Source
	if source == nil || source.Ref == nil {
		return GetPlatformBroker(j.platform)
	}
	return source
}
// addTriggerAnnotations adds the ordered-delivery annotation to annotations
// when brokerClass identifies a Kafka broker; other broker classes leave the
// map untouched.
func addTriggerAnnotations(brokerClass string, annotations map[string]string) {
	if !knative.IsKafkaBroker(brokerClass) {
		return
	}
	annotations[knative.KafkaKnativeEventingDeliveryOrder] = knative.KafkaKnativeEventingDeliveryOrderOrdered
}
// GetSink returns the sink declared on the Job Service when present, otherwise
// the platform-level broker (which may be nil).
func (j JobServiceHandler) GetSink() *duckv1.Destination {
	sink := j.platform.Spec.Services.JobService.Sink
	if sink == nil {
		return GetPlatformBroker(j.platform)
	}
	return sink
}
// GenerateKnativeResources returns the Knative resources that connect the Job
// Service with the eventing infrastructure:
//   - when a source broker with a named object reference is configured, two
//     Triggers routing the "job.create" and "job.delete" events to the Job
//     Service;
//   - when a sink is configured, a SinkBinding whose subject is the Job Service
//     Deployment.
//
// The platform level broker is returned without inspecting its Ref, so a
// destination addressed by URI only has a nil Ref; it is skipped for trigger
// generation instead of being dereferenced.
//
// When the broker cannot be validated, a warning event with reason
// WaitingKnativeEventing is returned together with the validation error.
// Otherwise the (possibly empty, never nil) resource list is returned.
func (j *JobServiceHandler) GenerateKnativeResources(platform *operatorapi.SonataFlowPlatform, lbl map[string]string) ([]client.Object, *corev1.Event, error) {
	broker := j.GetSourceBroker()
	sink := j.GetSink()
	resultObjs := []client.Object{}

	if broker != nil && broker.Ref != nil && len(broker.Ref.Name) > 0 {
		brokerName := broker.Ref.Name
		namespace := broker.Ref.Namespace
		if len(namespace) == 0 {
			namespace = platform.Namespace
		}
		brokerObject, err := knative.ValidateBroker(brokerName, namespace)
		if err != nil {
			event := &corev1.Event{
				Type:    corev1.EventTypeWarning,
				Reason:  WaitingKnativeEventing,
				Message: fmt.Sprintf("%s for service: %s", err.Error(), j.GetServiceName()),
			}
			return nil, event, err
		}
		// Both triggers share the same annotation map.
		annotations := make(map[string]string)
		addTriggerAnnotations(knative.GetBrokerClass(brokerObject), annotations)
		resultObjs = append(resultObjs,
			j.newJobEventTrigger("jobs-service-create-job-", "job.create", brokerName, namespace, lbl, annotations, platform),
			j.newJobEventTrigger("jobs-service-delete-job-", "job.delete", brokerName, namespace, lbl, annotations, platform),
		)
	}

	if sink != nil {
		sinkBinding := &sourcesv1.SinkBinding{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("%s-jobs-service-sb", platform.Name),
				Namespace: platform.Namespace,
				Labels:    lbl,
			},
			Spec: sourcesv1.SinkBindingSpec{
				SourceSpec: duckv1.SourceSpec{
					Sink: *sink,
				},
				BindingSpec: duckv1.BindingSpec{
					Subject: tracker.Reference{
						Name:       j.GetServiceName(),
						Namespace:  platform.Namespace,
						APIVersion: "apps/v1",
						Kind:       "Deployment",
					},
				},
			},
		}
		resultObjs = append(resultObjs, sinkBinding)
	}
	return resultObjs, nil, nil
}

// newJobEventTrigger builds the Trigger that routes events of type eventType
// from brokerName to the Job Service's job events path. The trigger name is
// derived from namePrefix and the platform UID.
func (j *JobServiceHandler) newJobEventTrigger(namePrefix, eventType, brokerName, namespace string, labels, annotations map[string]string, platform *operatorapi.SonataFlowPlatform) *eventingv1.Trigger {
	return &eventingv1.Trigger{
		ObjectMeta: metav1.ObjectMeta{
			Name:        kmeta.ChildName(namePrefix, string(platform.GetUID())),
			Namespace:   namespace,
			Labels:      labels,
			Annotations: annotations,
		},
		Spec: eventingv1.TriggerSpec{
			Broker: brokerName,
			Filter: &eventingv1.TriggerFilter{
				Attributes: eventingv1.TriggerFilterAttributes{
					"type": eventType,
				},
			},
			Subscriber: duckv1.Destination{
				Ref: &duckv1.KReference{
					Name:       j.GetServiceName(),
					Namespace:  platform.Namespace,
					APIVersion: "v1",
					Kind:       "Service",
				},
				URI: &apis.URL{
					Path: constants.JobServiceJobEventsPath,
				},
			},
		},
	}
}
// CheckKSinkInjected reports true when no sink is configured for the Job
// Service; otherwise it delegates the K_SINK injection check for the Job
// Service deployment to the knative package.
func (j *JobServiceHandler) CheckKSinkInjected() (bool, error) {
	if j.GetSink() == nil {
		return true, nil
	}
	// The job service has a sink configured.
	return knative.CheckKSinkInjected(j.GetServiceName(), j.platform.Namespace)
}
// IsDataIndexEnabled reports whether the Data Index is enabled for the platform.
//
// A nil platform yields false. When the spec has a services section, only the
// spec decides: the Data Index must be declared with Enabled set to true.
// Without a services section, the Data Index counts as enabled when the status
// references a cluster platform Data Index with a non-empty URL.
func IsDataIndexEnabled(plf *operatorapi.SonataFlowPlatform) bool {
	if plf == nil {
		return false
	}
	if services := plf.Spec.Services; services != nil {
		return services.DataIndex != nil && pointer.BoolDeref(services.DataIndex.Enabled, false)
	}
	// Check if DataIndex is enabled in the platform status
	clusterRef := plf.Status.ClusterPlatformRef
	return clusterRef != nil && clusterRef.Services != nil &&
		clusterRef.Services.DataIndexRef != nil && len(clusterRef.Services.DataIndexRef.Url) > 0
}
// IsJobServiceEnabled reports whether the Job Service is enabled for the platform.
//
// A nil platform yields false. When the spec has a services section, only the
// spec decides: the Job Service must be declared with Enabled set to true.
// Without a services section, the Job Service counts as enabled when the status
// references a cluster platform Job Service with a non-empty URL.
func IsJobServiceEnabled(plf *operatorapi.SonataFlowPlatform) bool {
	if plf == nil {
		return false
	}
	if services := plf.Spec.Services; services != nil {
		return services.JobService != nil && pointer.BoolDeref(services.JobService.Enabled, false)
	}
	// Check if JobService is enabled in the platform status
	clusterRef := plf.Status.ClusterPlatformRef
	return clusterRef != nil && clusterRef.Services != nil &&
		clusterRef.Services.JobServiceRef != nil && len(clusterRef.Services.JobServiceRef.Url) > 0
}