internal/app/adapters/inbound/file/file_inbound.go (264 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package file
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strconv"
	"sync"
	"time"

	"github.com/apache/synapse-go/internal/app/core/domain"
	"github.com/apache/synapse-go/internal/app/core/ports"
	"github.com/apache/synapse-go/internal/pkg/core/synctx"
	"golang.org/x/text/cases"
	"golang.org/x/text/language"
)
// FileInboundEndpoint handles file-based inbound operations.
//
// On every tick of its clock it lists files through the protocol handler,
// wraps each file's metadata and content in a message context and hands it
// to the mediator, then moves or deletes the file depending on the
// configured post-processing action.
type FileInboundEndpoint struct {
// config holds the inbound configuration; its Parameters map supplies the
// polling interval, the "sequential" flag and the transport.vfs.* settings,
// and its SequenceName is passed to the mediator.
config domain.InboundConfig
// isRunning is only ever written in this file (Stop sets it to false);
// nothing here reads it.
isRunning bool
// clock provides the ticker that drives polling.
clock FileClock
// mediator receives one message per file via MediateInboundMessage.
// Start replaces it with the mediator passed to it.
mediator ports.InboundMessageMediator
// processingFiles records the files currently being processed so that
// overlapping polling cycles skip them; entries are added before a file
// is processed and removed when processFile returns.
processingFiles sync.Map
// protocolHandler performs the actual file operations (list, read, move,
// delete, metadata lookups). It is created in Start.
protocolHandler ProtocolHandler
}
// NewFileInboundEndpoint builds a FileInboundEndpoint for the given inbound
// configuration and mediator, using a clock obtained from NewFileClock.
//
// The protocol handler is not created here; Start sets it up.
func NewFileInboundEndpoint(
	config domain.InboundConfig,
	mediator ports.InboundMessageMediator,
) *FileInboundEndpoint {
	endpoint := new(FileInboundEndpoint)
	endpoint.config = config
	endpoint.clock = NewFileClock()
	endpoint.mediator = mediator
	return endpoint
}
// Start validates the configuration, creates the VFS protocol handler and
// then polls for files until ctx is cancelled. It blocks for the whole
// lifetime of the endpoint.
//
// A non-nil mediator replaces the one given to NewFileInboundEndpoint; a nil
// mediator keeps the existing one. If no mediator is available at all, Start
// returns an error instead of letting a worker goroutine panic later on a
// nil interface call.
//
// Start returns ctx.Err() when ctx is already done on entry, a wrapped error
// when validation or handler creation fails, and otherwise whatever poll
// returns.
func (f *FileInboundEndpoint) Start(ctx context.Context, mediator ports.InboundMessageMediator) error {
	// Nothing has been started yet, so an already-cancelled context needs no cleanup.
	if err := ctx.Err(); err != nil {
		return err
	}

	if err := f.validateConfig(); err != nil {
		slog.Error("invalid configuration", "error", err)
		return fmt.Errorf("configuration validation failed: %w", err)
	}

	if mediator != nil {
		f.mediator = mediator
	}
	if f.mediator == nil {
		return fmt.Errorf("no inbound message mediator configured")
	}

	vfsFactory := &VFSProtocolHandlerFactory{}
	handler, err := vfsFactory.CreateHandler(f.config)
	if err != nil {
		slog.Error("failed to create protocol handler", "error", err)
		return fmt.Errorf("failed to create protocol handler: %w", err)
	}
	f.protocolHandler = handler

	slog.Info("starting file inbound endpoint")

	// poll blocks until ctx is cancelled and waits for the processing cycles
	// it started before returning, so there is nothing left to wait for here.
	err = f.poll(ctx)
	slog.Info("file inbound endpoint stopped")
	return err
}
// Stop logs the stop request and clears the isRunning flag. It always
// returns nil.
//
// NOTE(review): nothing in this file reads isRunning, and poll only exits
// when the context passed to Start is cancelled, so callers presumably have
// to cancel that context as well to actually halt polling — confirm.
func (f *FileInboundEndpoint) Stop() error {
	slog.Info("stopping file inbound endpoint")
	f.isRunning = false
	return nil
}
// poll runs one processing cycle per tick of the configured interval
// (milliseconds, read from the "interval" parameter) until ctx is cancelled.
//
// Each cycle runs in its own goroutine, so a slow cycle does not delay the
// next tick. On cancellation poll waits for every cycle it started and then
// returns ctx.Err(). It returns an error immediately when the interval is
// not a positive integer.
func (f *FileInboundEndpoint) poll(ctx context.Context) error {
	interval, err := strconv.Atoi(f.config.Parameters["interval"])
	if err != nil {
		slog.Error("invalid interval value", "error", err)
		return fmt.Errorf("invalid interval value: %w", err)
	}
	// Guard the ticker: a non-positive period is never a valid polling interval.
	if interval <= 0 {
		return fmt.Errorf("invalid interval value: must be positive, got '%d'", interval)
	}

	ticker := f.clock.NewTicker(time.Duration(interval) * time.Millisecond)
	defer ticker.Stop()

	var processingWg sync.WaitGroup
	for {
		select {
		case <-ctx.Done():
			slog.Info("received shutdown signal, stopping file polling")
			// Wait for all processing to complete before returning.
			processingWg.Wait()
			return ctx.Err()
		case <-ticker.C:
			// select picks randomly when both cases are ready; do not start
			// a new cycle once cancellation has been requested.
			if ctx.Err() != nil {
				continue
			}
			processingWg.Add(1)
			go func() {
				defer processingWg.Done()
				err := f.processingCycle(ctx)
				// Cancellation is the normal shutdown path, even when the
				// error reaches us wrapped, so it is not reported.
				if err != nil && !errors.Is(err, context.Canceled) {
					slog.Error("error in processing cycle", "error", err)
				}
			}()
		}
	}
}
// processingCycle performs a single poll: it asks the protocol handler for
// the files matching the configured name pattern and passes them on to
// processFiles.
//
// The pattern comes from "transport.vfs.FileNamePattern"; when that
// parameter is absent, ".*" is used. A listing failure is returned wrapped.
func (f *FileInboundEndpoint) processingCycle(ctx context.Context) error {
	namePattern := ".*"
	if configured, ok := f.config.Parameters["transport.vfs.FileNamePattern"]; ok {
		namePattern = configured
	}

	matched, listErr := f.protocolHandler.ListFiles(namePattern)
	if listErr != nil {
		return fmt.Errorf("failed to scan directory: %w", listErr)
	}

	return f.processFiles(ctx, matched)
}
// processFiles processes each listed file, either one after another (when
// the "sequential" parameter is true) or concurrently with one goroutine per
// file (the default).
//
// Files already marked in processingFiles are skipped so overlapping cycles
// never handle the same file twice. Per-file failures are logged and do not
// stop the remaining files. The function returns only after every goroutine
// it started has finished, which lets the caller's shutdown wait cover
// in-flight files as well.
//
// It returns an error when the "sequential" parameter is not a valid boolean,
// ctx.Err() when ctx is cancelled before all files were dispatched, and nil
// otherwise.
func (f *FileInboundEndpoint) processFiles(ctx context.Context, files []string) error {
	sequential := false
	if val, exists := f.config.Parameters["sequential"]; exists {
		parsed, err := strconv.ParseBool(val)
		if err != nil {
			return fmt.Errorf("invalid sequential value: must be true/false, got '%s'", val)
		}
		sequential = parsed
	}

	var fileWg sync.WaitGroup
	// Runs on every return path below: never leave worker goroutines behind.
	defer fileWg.Wait()

	for _, file := range files {
		if err := ctx.Err(); err != nil {
			slog.Info("cancelling remaining file processing")
			return err
		}

		// Claim the file; another cycle may still be working on it.
		if _, exists := f.processingFiles.LoadOrStore(file, true); exists {
			slog.Debug("skipping file - already being processed", "file", file)
			continue
		}

		if sequential {
			if err := f.processFile(ctx, file); err != nil {
				slog.Error("failed to process file", "file", file, "error", err)
			}
			continue
		}

		fileWg.Add(1)
		go func(fileName string) {
			defer fileWg.Done()
			if err := f.processFile(ctx, fileName); err != nil {
				slog.Error("failed to process file", "file", fileName, "error", err)
			}
		}(file)
	}
	return nil
}
// processFile reads one file, hands it to the mediator and then applies the
// configured post-processing action.
//
// The message context carries the file's size, modification time (Unix
// seconds), URI, path and name as headers, a fixed set of inbound
// properties, the configured content type and the raw file content.
//
// When mediation succeeds the "Process" action is applied; when it fails the
// mediation error is logged and the "Failure" action is applied. An error is
// returned when metadata lookup, reading or the post-processing action
// fails, or when ctx is cancelled before mediation starts. A mediation
// failure whose failure action succeeds yields nil.
func (f *FileInboundEndpoint) processFile(ctx context.Context, fileURI string) error {
	// Release the in-progress marker claimed by processFiles, whatever the outcome.
	defer f.processingFiles.Delete(fileURI)

	fileSize, err := f.protocolHandler.GetSize(fileURI)
	if err != nil {
		return fmt.Errorf("failed to get file size: %w", err)
	}
	lastModified, err := f.protocolHandler.GetLastModified(fileURI)
	if err != nil {
		return fmt.Errorf("failed to get last modified: %w", err)
	}
	filePath := f.protocolHandler.GetPath(fileURI)
	fileName := f.protocolHandler.GetName(fileURI)

	// Set up headers with file metadata.
	headers := map[string]string{
		"FILE_LENGTH":   fmt.Sprintf("%d", fileSize),
		"LAST_MODIFIED": fmt.Sprintf("%d", lastModified.Unix()),
		"FILE_URI":      fileURI,
		"FILE_PATH":     filePath,
		"FILE_NAME":     fileName,
	}
	properties := map[string]string{
		"isInbound":            "true",
		"ARTIFACT_NAME":        "inboundendpointfile",
		"inboundEndpointName":  "file",
		"ClientApiNonBlocking": "true",
	}

	// Create a message context with metadata but no content yet.
	msgContext := &synctx.MsgContext{
		Properties: properties,
		Message: synctx.Message{
			ContentType: f.config.Parameters["transport.vfs.ContentType"],
		},
		Headers: headers,
	}

	content, err := f.protocolHandler.ReadFile(fileURI)
	if err != nil {
		return fmt.Errorf("failed to read file: %w", err)
	}
	msgContext.Message.RawPayload = content

	// Do not start mediation once shutdown has been requested; the file is
	// left untouched in that case.
	if err := ctx.Err(); err != nil {
		return err
	}

	if err := f.mediator.MediateInboundMessage(ctx, f.config.SequenceName, msgContext); err != nil {
		// Report the cause here: the return value below only reflects the
		// outcome of the failure action, not of the mediation itself.
		slog.Error("mediation failed for file", "file", fileURI, "error", err)
		if actionErr := f.handleFileAction(fileURI, "Failure"); actionErr != nil {
			return fmt.Errorf("failed to handle file after failure: %w", actionErr)
		}
		return nil
	}

	if err := f.handleFileAction(fileURI, "Process"); err != nil {
		return fmt.Errorf("failed to handle file after process: %w", err)
	}
	return nil
}
// handleFileAction applies the post-processing action configured for the
// given outcome to fileURI.
//
// actionType is title-cased and appended to "transport.vfs.ActionAfter" to
// find the action. When that action is exactly "MOVE", the file is moved to
// the path stored under "transport.vfs.MoveAfter<ActionType>"; a missing or
// empty path is an error. In every other case (parameter absent or any other
// value) the file is deleted.
func (f *FileInboundEndpoint) handleFileAction(fileURI, actionType string) error {
	slog.Info("handling file action")

	suffix := cases.Title(language.English).String(actionType)
	configured, found := f.config.Parameters[fmt.Sprintf("transport.vfs.ActionAfter%s", suffix)]
	if found {
		slog.Info(configured)
	}

	// Anything other than an explicit MOVE falls back to DELETE.
	if !found || configured != "MOVE" {
		return f.protocolHandler.DeleteFile(fileURI)
	}

	// A missing key reads as the empty string, so one check covers both cases.
	destination := f.config.Parameters[fmt.Sprintf("transport.vfs.MoveAfter%s", suffix)]
	if destination == "" {
		return fmt.Errorf("move path not specified for %s action", actionType)
	}
	return f.protocolHandler.MoveFile(fileURI, destination)
}
// validateConfig checks the endpoint parameters and returns the first
// problem found, or nil when the configuration is usable.
//
// Checks, in order:
//   - "interval" is present and a positive integer;
//   - "sequential" and "coordination", when present, parse as booleans;
//   - "transport.vfs.FileURI" and "transport.vfs.ContentType" are non-empty;
//   - a "MOVE" action after process/failure has a non-empty move path;
//   - "transport.vfs.AutoLockReleaseInterval", when present, is -1 or a
//     positive integer.
func (f *FileInboundEndpoint) validateConfig() error {
	// Reading a missing key yields "", so "absent" and "empty" collapse into
	// one comparison wherever both are rejected alike.
	params := f.config.Parameters

	rawInterval := params["interval"]
	if rawInterval == "" {
		return fmt.Errorf("missing required parameter: 'interval'")
	}
	intervalMs, convErr := strconv.Atoi(rawInterval)
	switch {
	case convErr != nil:
		return fmt.Errorf("invalid interval value: must be an integer, got '%s'", rawInterval)
	case intervalMs <= 0:
		return fmt.Errorf("invalid interval value: must be positive, got '%d'", intervalMs)
	}

	// Optional flag, defaults to false when absent.
	if raw, present := params["sequential"]; present {
		if _, boolErr := strconv.ParseBool(raw); boolErr != nil {
			return fmt.Errorf("invalid sequential value: must be true/false, got '%s'", raw)
		}
	}

	// Optional flag, defaults to false when absent.
	if raw, present := params["coordination"]; present {
		if _, boolErr := strconv.ParseBool(raw); boolErr != nil {
			return fmt.Errorf("invalid coordination value: must be true/false, got '%s'", raw)
		}
	}

	if params["transport.vfs.FileURI"] == "" {
		return fmt.Errorf("missing required parameter: 'transport.vfs.FileURI'")
	}
	if params["transport.vfs.ContentType"] == "" {
		return fmt.Errorf("missing required parameter: 'transport.vfs.ContentType'")
	}

	// A MOVE action is only usable together with a destination path.
	if params["transport.vfs.ActionAfterProcess"] == "MOVE" && params["transport.vfs.MoveAfterProcess"] == "" {
		return fmt.Errorf("missing required parameter: 'transport.vfs.MoveAfterProcess' is required when ActionAfterProcess is 'MOVE'")
	}
	if params["transport.vfs.ActionAfterFailure"] == "MOVE" && params["transport.vfs.MoveAfterFailure"] == "" {
		return fmt.Errorf("missing required parameter: 'transport.vfs.MoveAfterFailure' is required when ActionAfterFailure is 'MOVE'")
	}

	if raw, present := params["transport.vfs.AutoLockReleaseInterval"]; present {
		timeout, atoiErr := strconv.Atoi(raw)
		if atoiErr != nil {
			return fmt.Errorf("invalid AutoLockReleaseInterval value: must be an integer, got '%s'", raw)
		}
		// -1 means "never time out"; zero and anything below -1 are rejected.
		if timeout == 0 || timeout < -1 {
			return fmt.Errorf("invalid AutoLockReleaseInterval value: must be -1 or positive integer, got '%d'", timeout)
		}
	}

	return nil
}