in internal/app/adapters/inbound/file/file_inbound.go [195:264]
// processFile turns a single file into an inbound message and hands it to the
// mediator.
//
// It asks the protocol handler for the file's size, last-modified time, path
// and name, exposes them as message headers, reads the file content into the
// message payload, and mediates the message through f.config.SequenceName.
// Afterwards it calls handleFileAction with "Process" when mediation returned
// nil, or with "Failure" when mediation returned an error.
//
// fileURI is removed from f.processingFiles when the function returns,
// whatever the outcome.
//
// Returns:
//   - ctx.Err() when ctx is already done before file I/O starts or before
//     mediation starts; no file action is performed in that case.
//   - a wrapped error when a metadata lookup, the read, or the post-mediation
//     file action fails.
//   - nil otherwise, including when mediation failed but the "Failure" action
//     succeeded.
func (f *FileInboundEndpoint) processFile(ctx context.Context, fileURI string) error {
	defer f.processingFiles.Delete(fileURI)

	// Bail out before touching the file at all: there is no point in fetching
	// metadata and reading the whole payload for a cancelled run.
	if err := ctx.Err(); err != nil {
		return err
	}

	fileSize, err := f.protocolHandler.GetSize(fileURI)
	if err != nil {
		return fmt.Errorf("failed to get file size: %w", err)
	}
	lastModified, err := f.protocolHandler.GetLastModified(fileURI)
	if err != nil {
		return fmt.Errorf("failed to get last modified: %w", err)
	}
	filePath := f.protocolHandler.GetPath(fileURI)
	fileName := f.protocolHandler.GetName(fileURI)

	// Headers carry the file metadata alongside the payload.
	headers := map[string]string{
		"FILE_LENGTH":   fmt.Sprintf("%d", fileSize),
		"LAST_MODIFIED": fmt.Sprintf("%d", lastModified.Unix()),
		"FILE_URI":      fileURI,
		"FILE_PATH":     filePath,
		"FILE_NAME":     fileName,
	}
	properties := map[string]string{
		"isInbound":            "true",
		"ARTIFACT_NAME":        "inboundendpointfile",
		"inboundEndpointName":  "file",
		"ClientApiNonBlocking": "true",
	}

	// Build the message context with metadata only; the payload is attached
	// once the read has succeeded.
	msgContext := &synctx.MsgContext{
		Properties: properties,
		Message: synctx.Message{
			ContentType: f.config.Parameters["transport.vfs.ContentType"],
		},
		Headers: headers,
	}

	content, err := f.protocolHandler.ReadFile(fileURI)
	if err != nil {
		return fmt.Errorf("failed to read file: %w", err)
	}
	msgContext.Message.RawPayload = content

	// The read may have taken a while, so re-check cancellation before
	// starting mediation.
	if err := ctx.Err(); err != nil {
		return err
	}

	mediationErr := f.mediator.MediateInboundMessage(ctx, f.config.SequenceName, msgContext)
	if mediationErr != nil {
		if err := f.handleFileAction(fileURI, "Failure"); err != nil {
			// Keep the file-action error as the wrapped cause (unchanged for
			// errors.Is/As) but also report why mediation failed, which would
			// otherwise be lost.
			return fmt.Errorf("failed to handle file after failure: %w (mediation error: %v)", err, mediationErr)
		}
		// NOTE(review): the mediation error is not returned once the "Failure"
		// action has succeeded, matching the previous behavior. Confirm that
		// callers do not need to observe it.
		return nil
	}

	if err := f.handleFileAction(fileURI, "Process"); err != nil {
		return fmt.Errorf("failed to handle file after process: %w", err)
	}
	return nil
}