internal/pkg/core/deployers/deployers.go (140 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package deployers
import (
"context"
"io"
"log/slog"
"os"
"path/filepath"
"sync"
"github.com/apache/synapse-go/internal/app/adapters/inbound"
"github.com/apache/synapse-go/internal/app/core/domain"
"github.com/apache/synapse-go/internal/app/core/ports"
"github.com/apache/synapse-go/internal/pkg/core/artifacts"
"github.com/apache/synapse-go/internal/pkg/core/deployers/types"
"github.com/apache/synapse-go/internal/pkg/core/router"
"github.com/apache/synapse-go/internal/pkg/core/utils"
"github.com/apache/synapse-go/internal/pkg/loggerfactory"
)
// componentName is the name this package passes to loggerfactory when it
// requests its logger.
const componentName = "deployers"
// Deployer reads artifact XML files from disk, unmarshals them and adds the
// results to the ConfigContext carried by the deployment context.
type Deployer struct {
	// inboundMediator is handed to each inbound endpoint when it is started.
	inboundMediator ports.InboundMessageMediator
	// routerService is the service every deployed API is registered with.
	routerService *router.RouterService
	// basePath is the directory whose "Sequences", "APIs" and "Inbounds"
	// sub-folders are scanned by Deploy.
	basePath string
	// logger is obtained from loggerfactory under componentName.
	logger *slog.Logger
}
// NewDeployer builds a Deployer rooted at basePath and attaches a logger
// obtained from loggerfactory for this component.
//
// Deploy looks for the per-type artifact folders directly under basePath, so
// basePath corresponds to the artifacts/ directory in the layout below:
//
//	Synapse/
//	├─ bin/
//	│  └─ synapse   (the compiled binary)
//	└─ artifacts/
//	   ├─ APIs/
//	   ├─ Endpoints/
//	   ├─ Sequences/
//	   └─ Inbounds/
//
// inboundMediator is passed to inbound endpoints when they are started, and
// routerService is the service deployed APIs are registered with.
func NewDeployer(basePath string, inboundMediator ports.InboundMessageMediator, routerService *router.RouterService) *Deployer {
	deployer := new(Deployer)
	deployer.basePath = basePath
	deployer.inboundMediator = inboundMediator
	deployer.routerService = routerService
	// The deployer itself is handed to the logger factory together with the
	// component name.
	deployer.logger = loggerfactory.GetLogger(componentName, deployer)
	return deployer
}
// UpdateLogger replaces the deployer's logger with one freshly obtained from
// loggerfactory for this component.
func (d *Deployer) UpdateLogger() {
	refreshed := loggerfactory.GetLogger(componentName, d)
	d.logger = refreshed
}
// Deploy scans the artifact folders under the deployer's base path and deploys
// every ".xml" file found directly inside them.
//
// Folders are processed in the fixed order "Sequences", "APIs", "Inbounds"
// (presumably so that sequences exist before the artifacts that refer to them;
// confirm against the mediation engine). Sub-directories and files with any
// other extension are skipped.
//
// It returns an error when the base path or one of the artifact folders cannot
// be listed, or when an artifact file cannot be opened. A file that opens but
// cannot be read is logged and skipped. An empty base path is not an error.
func (d *Deployer) Deploy(ctx context.Context) error {
	entries, err := os.ReadDir(d.basePath)
	if err != nil {
		return err
	}
	if len(entries) == 0 {
		return nil
	}
	for _, artifactType := range []string{"Sequences", "APIs", "Inbounds"} {
		folderPath := filepath.Join(d.basePath, artifactType)
		files, err := os.ReadDir(folderPath)
		if err != nil {
			return err
		}
		for _, file := range files {
			if file.IsDir() || filepath.Ext(file.Name()) != ".xml" {
				continue
			}
			if err := d.deployArtifactFile(ctx, artifactType, folderPath, file.Name()); err != nil {
				return err
			}
		}
	}
	return nil
}

// deployArtifactFile reads one artifact file and dispatches its contents to the
// deploy method matching artifactType.
//
// Keeping the per-file work in its own function lets the deferred Close run as
// soon as the file has been processed, instead of accumulating open handles
// until Deploy returns. A non-nil error is returned only when the file cannot
// be opened; read failures are logged and reported as nil so the caller moves
// on to the next file.
func (d *Deployer) deployArtifactFile(ctx context.Context, artifactType, folderPath, fileName string) error {
	path := filepath.Join(folderPath, fileName)
	xmlFile, err := os.Open(path)
	if err != nil {
		return err
	}
	defer xmlFile.Close()

	data, err := io.ReadAll(xmlFile)
	if err != nil {
		d.logger.Error("Error reading file:", "error", err, "file", path)
		return nil
	}

	switch artifactType {
	case "APIs":
		d.DeployAPIs(ctx, fileName, string(data))
	case "Sequences":
		d.DeploySequences(ctx, fileName, string(data))
	case "Inbounds":
		d.DeployInbounds(ctx, fileName, string(data))
	}
	return nil
}
// DeploySequences unmarshals xmlData as a sequence and adds it to the
// ConfigContext stored in ctx under utils.ConfigContextKey.
//
// fileName is recorded in the artifact position passed to Unmarshal. The
// method has no return value: an unmarshalling failure or a ctx without a
// ConfigContext is logged and the sequence is not deployed.
func (d *Deployer) DeploySequences(ctx context.Context, fileName string, xmlData string) {
	position := artifacts.Position{FileName: fileName}
	sequence := types.Sequence{}
	newSeq, err := sequence.Unmarshal(xmlData, position)
	if err != nil {
		d.logger.Error("Error unmarshalling sequence:", "error", err)
		return
	}
	// Comma-ok form: a context that carries no ConfigContext is reported
	// instead of panicking on the type assertion.
	configContext, ok := ctx.Value(utils.ConfigContextKey).(*artifacts.ConfigContext)
	if !ok {
		d.logger.Error("Error deploying sequence:", "error", "config context missing from context", "file", fileName)
		return
	}
	configContext.AddSequence(newSeq)
	d.logger.Info("Deployed sequence: " + newSeq.Name)
}
// DeployAPIs unmarshals xmlData as an API, adds it to the ConfigContext stored
// in ctx under utils.ConfigContextKey, and registers it with the router
// service.
//
// fileName is recorded in the artifact position passed to Unmarshal. The
// method has no return value: an unmarshalling failure, a ctx without a
// ConfigContext, or a registration failure is logged. A registration failure
// is logged after the API has already been added to the ConfigContext.
func (d *Deployer) DeployAPIs(ctx context.Context, fileName string, xmlData string) {
	position := artifacts.Position{FileName: fileName}
	api := types.API{}
	newApi, err := api.Unmarshal(xmlData, position)
	if err != nil {
		d.logger.Error("Error unmarshalling api:", "error", err)
		return
	}
	// Comma-ok form: a context that carries no ConfigContext is reported
	// instead of panicking on the type assertion.
	configContext, ok := ctx.Value(utils.ConfigContextKey).(*artifacts.ConfigContext)
	if !ok {
		d.logger.Error("Error deploying api:", "error", "config context missing from context", "file", fileName)
		return
	}
	configContext.AddAPI(newApi)
	d.logger.Info("Deployed API: " + newApi.Name)

	// Register the API with the router service.
	if err := d.routerService.RegisterAPI(ctx, newApi); err != nil {
		d.logger.Error("Error registering API with router service:", "error", err)
	}
}
// DeployInbounds unmarshals xmlData as an inbound endpoint definition, adds it
// to the ConfigContext stored in ctx under utils.ConfigContextKey, and starts
// the endpoint in its own goroutine.
//
// fileName is recorded in the artifact position passed to Unmarshal. The
// goroutine is tracked by the *sync.WaitGroup stored in ctx under
// utils.WaitGroupKey, and the endpoint is started with the deployer's inbound
// mediator. The method has no return value: every failure (unmarshalling,
// missing context values, endpoint creation, endpoint start) is logged.
func (d *Deployer) DeployInbounds(ctx context.Context, fileName string, xmlData string) {
	position := artifacts.Position{FileName: fileName}
	inboundEp := types.Inbound{}
	newInbound, err := inboundEp.Unmarshal(xmlData, position)
	if err != nil {
		d.logger.Error("Error unmarshalling inbound:", "error", err)
		return
	}
	// Comma-ok form: a context that carries no ConfigContext is reported
	// instead of panicking on the type assertion.
	configContext, ok := ctx.Value(utils.ConfigContextKey).(*artifacts.ConfigContext)
	if !ok {
		d.logger.Error("Error deploying inbound:", "error", "config context missing from context", "file", fileName)
		return
	}
	configContext.AddInbound(newInbound)
	d.logger.Info("Deployed inbound: " + newInbound.Name)

	// Flatten the parameter list into the name -> value map the endpoint
	// configuration takes.
	parametersMap := make(map[string]string, len(newInbound.Parameters))
	for _, param := range newInbound.Parameters {
		parametersMap[param.Name] = param.Value
	}
	inboundEndpoint, err := inbound.NewInbound(domain.InboundConfig{
		SequenceName: newInbound.Sequence,
		Name:         newInbound.Name,
		Protocol:     newInbound.Protocol,
		Parameters:   parametersMap,
	})
	if err != nil {
		d.logger.Error("Error creating inbound endpoint:", "error", err)
		return
	}

	// Same guard for the wait group: without it the goroutine could not be
	// tracked, so the endpoint is not started.
	wg, ok := ctx.Value(utils.WaitGroupKey).(*sync.WaitGroup)
	if !ok {
		d.logger.Error("Error starting inbound endpoint:", "error", "wait group missing from context", "file", fileName)
		return
	}
	wg.Add(1)
	go func(endpoint ports.InboundEndpoint) {
		defer wg.Done()
		if err := endpoint.Start(ctx, d.inboundMediator); err != nil {
			d.logger.Error("Error starting inbound endpoint:", "error", err)
		}
	}(inboundEndpoint)
}