internal/pkg/core/router/service.go (155 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// Package router provides HTTP routing capabilities for Synapse APIs.
//
// The RouterService is the main component of this package, providing:
// - API registration with automatic route creation from resources
// - HTTP server lifecycle management with automatic start/stop
// - Request handling with conversion to/from Synapse message contexts
// - Method-based routing for RESTful APIs
package router
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"strings"
"time"
"encoding/json"
"github.com/apache/synapse-go/internal/pkg/core/artifacts"
"github.com/apache/synapse-go/internal/pkg/core/synctx"
"github.com/apache/synapse-go/internal/pkg/loggerfactory"
)
const (
componentName = "router"
)
// RouterService manages API routing and server lifecycle
type RouterService struct {
server *http.Server
router *http.ServeMux
port string // :8290
hostname string
logger *slog.Logger
}
// NewRouterService creates a new router service with the given port and hostname
func NewRouterService(port string, hostname string) *RouterService {
rs := &RouterService{
router: http.NewServeMux(),
hostname: hostname,
port: port,
}
rs.logger = loggerfactory.GetLogger(componentName, rs)
return rs
}
func (rs *RouterService) UpdateLogger() {
rs.logger = loggerfactory.GetLogger(componentName, rs)
}
// RegisterAPI registers a new API with the router service
func (rs *RouterService) RegisterAPI(ctx context.Context, api artifacts.API) error {
// Determine base path based on context and version
basePath := api.Context
// Remove trailing slash from context if present
if len(basePath) > 1 && basePath[len(basePath)-1] == '/' {
basePath = basePath[:len(basePath)-1]
}
// Handle versioning based on versionType
if api.Version != "" && api.VersionType != "" {
switch api.VersionType {
case "url":
// For URL type, add version as a path segment
basePath = basePath + "/" + api.Version
case "context":
// For context type, replace {version} placeholder if it exists
versionPattern := "{version}"
basePath = strings.Replace(basePath, versionPattern, api.Version, 1)
}
}
// Create a subrouter for this API
apiHandler := http.NewServeMux()
// Register each resource in the API
for _, resource := range api.Resources {
// Register a handler for each HTTP method in the resource
for _, method := range resource.Methods {
// Construct the full pattern: "METHOD /path/to/resource"
pattern := method + " " + resource.URITemplate.PathTemplate
// Create a wrapper handler that checks query parameters before forwarding to the resource handler
queryParamHandler := rs.createQueryParamMiddleware(resource, rs.createResourceHandler(resource))
apiHandler.HandleFunc(pattern, queryParamHandler)
rs.logger.Info("Registered route for API",
slog.String("api_name", api.Name),
slog.String("pattern", pattern))
}
}
// Register the API handler with the main router
rs.router.Handle(basePath+"/", http.StripPrefix(basePath, apiHandler))
return nil
}
// createHandlerFunc creates an HTTP handler function for the given API resource
func (rs *RouterService) createResourceHandler(resource artifacts.Resource) http.HandlerFunc {
handler := func(w http.ResponseWriter, r *http.Request) {
// Create message context
msgContext := synctx.CreateMsgContext()
// Set request body into message context properties
msgContext.Properties["http_request_body"] = r.Body
// Set path parameters into message context properties
pathParamsMap := make(map[string]string)
for _, pathParam := range resource.URITemplate.PathParameters {
pathParamsMap[pathParam] = r.PathValue(pathParam)
}
msgContext.Properties["uriParams"] = pathParamsMap
// Set query parameters into message context properties
queryParams := r.URL.Query()
// If there are predefined query parameters, map each to their corresponding variable
if len(resource.URITemplate.QueryParameters) > 0 {
// Create a map to store the variable mappings
queryVarMap := make(map[string]string)
// Loop through each predefined query parameter
for paramName, varName := range resource.URITemplate.QueryParameters {
// Get the value from the request
if values, exists := queryParams[paramName]; exists && len(values) > 0 {
// Map the query parameter value to the variable name
queryVarMap[varName] = values[0]
}
}
// Store the variable mapping in the message context
msgContext.Properties["queryParams"] = queryVarMap
}
// Process through mediation pipeline
success := resource.Mediate(msgContext)
// Write response
if success {
for name, value := range msgContext.Headers {
w.Header().Set(name, value)
}
if msgContext.Message.RawPayload != nil {
w.Write(msgContext.Message.RawPayload)
}
} else {
http.Error(w, "Internal server error", http.StatusInternalServerError)
}
}
return handler
}
// createQueryParamMiddleware creates a middleware that validates query parameters against predefined parameters
func (rs *RouterService) createQueryParamMiddleware(resource artifacts.Resource, next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// If there are no predefined query parameters, just call the next handler
if len(resource.URITemplate.QueryParameters) == 0 {
next(w, r)
return
}
// Get query parameters from the request
queryParams := r.URL.Query()
// Check if query parameter keys match exactly with the predefined keys
// First, ensure all request query params exist in predefined params
for key := range queryParams {
if _, exists := resource.URITemplate.QueryParameters[key]; !exists {
// Query parameter not defined in the template, reject the request
http.Error(w, fmt.Sprintf("Unsupported query parameter: %s", key), http.StatusBadRequest)
return
}
}
// Now ensure all predefined query params exist in the request
for key := range resource.URITemplate.QueryParameters {
if !queryParams.Has(key) {
// Required query parameter is missing, reject the request
http.Error(w, fmt.Sprintf("Missing required query parameter: %s", key), http.StatusBadRequest)
return
}
}
// All parameters in the request are valid and all required parameters are present
next(w, r)
}
}
// startServer starts the HTTP server
func (rs *RouterService) StartServer(ctx context.Context) {
//eg:- localhost:8290
addr := rs.hostname + rs.port
rs.server = &http.Server{
Addr: addr,
Handler: rs.router,
}
// Register health/liveness endpoints
rs.registerLivelinessEndpoint()
rs.logger.Info("liveness endpoint registered")
// Start the server in a goroutine
go func() {
rs.logger.Info("Starting HTTP server", "address", addr)
if err := rs.server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
rs.logger.Error("HTTP server error", slog.String("error", err.Error()))
}
rs.logger.Info("HTTP server stopped serving new connections")
}()
}
func (rs *RouterService) StopServer() {
if rs.server != nil {
rs.logger.Info("Shutting down HTTP server...")
shutdownCtx, shutdownRelease := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownRelease()
if err := rs.server.Shutdown(shutdownCtx); err != nil {
rs.logger.Error("Error shutting down HTTP server", "error", err.Error())
}
}
}
// registerHealthEndpoints registers health and liveness endpoints
func (rs *RouterService) registerLivelinessEndpoint() {
// liveliness probe endpoint
rs.router.HandleFunc("/livez", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(map[string]interface{}{
"status": "UP",
"timestamp": time.Now().Format(time.RFC3339),
})
})
}