api/internal/handler/data_loader/route_import.go (250 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package data_loader import ( "bytes" "context" "fmt" "path" "reflect" "github.com/gin-gonic/gin" "github.com/juliangruber/go-intersect" "github.com/pkg/errors" "github.com/shiningrush/droplet" "github.com/shiningrush/droplet/wrapper" wgin "github.com/shiningrush/droplet/wrapper/gin" "github.com/apisix/manager-api/internal/conf" "github.com/apisix/manager-api/internal/core/entity" "github.com/apisix/manager-api/internal/core/store" "github.com/apisix/manager-api/internal/handler" loader "github.com/apisix/manager-api/internal/handler/data_loader/loader" "github.com/apisix/manager-api/internal/handler/data_loader/loader/openapi3" ) type ImportHandler struct { routeStore store.Interface upstreamStore store.Interface serviceStore store.Interface consumerStore store.Interface sslStore store.Interface streamRouteStore store.Interface globalPluginStore store.Interface pluginConfigStore store.Interface protoStore store.Interface } func NewImportHandler() (handler.RouteRegister, error) { return &ImportHandler{ routeStore: store.GetStore(store.HubKeyRoute), upstreamStore: store.GetStore(store.HubKeyUpstream), serviceStore: store.GetStore(store.HubKeyService), consumerStore: store.GetStore(store.HubKeyConsumer), sslStore: store.GetStore(store.HubKeySsl), streamRouteStore: store.GetStore(store.HubKeyStreamRoute), globalPluginStore: store.GetStore(store.HubKeyGlobalRule), pluginConfigStore: store.GetStore(store.HubKeyPluginConfig), protoStore: store.GetStore(store.HubKeyProto), }, nil } func (h *ImportHandler) ApplyRoute(r *gin.Engine) { r.POST("/apisix/admin/import/routes", wgin.Wraps(h.Import, wrapper.InputType(reflect.TypeOf(ImportInput{})))) } type ImportResult struct { Total int `json:"total"` Failed int `json:"failed"` Errors []string `json:"errors"` } type LoaderType string type ImportInput struct { Type string `auto_read:"type"` TaskName string `auto_read:"task_name"` FileName string `auto_read:"_file"` FileContent []byte `auto_read:"file"` MergeMethod string `auto_read:"merge_method"` } const ( LoaderTypeOpenAPI3 LoaderType = "openapi3" ) func (h *ImportHandler) Import(c droplet.Context) (interface{}, error) { input := c.Input().(*ImportInput) // input file content check suffix := path.Ext(input.FileName) if suffix != ".json" && suffix != ".yaml" && suffix != ".yml" { return nil, errors.Errorf("required file type is .yaml, .yml or .json but got: %s", suffix) } contentLen := bytes.Count(input.FileContent, nil) - 1 if contentLen <= 0 { return nil, errors.New("uploaded file is empty") } if contentLen > conf.ImportSizeLimit { return nil, errors.Errorf("uploaded file size exceeds the limit, limit is %d", conf.ImportSizeLimit) } var l loader.Loader switch LoaderType(input.Type) { case LoaderTypeOpenAPI3: l = &openapi3.Loader{ MergeMethod: input.MergeMethod == "true", TaskName: input.TaskName, } break default: return nil, fmt.Errorf("unsupported data loader type: %s", input.Type) } dataSets, err := l.Import(input.FileContent) if err != nil { return nil, err } // Pre-checking for route duplication preCheckErrs := h.preCheck(c.Context(), dataSets) if _, ok := preCheckErrs[store.HubKeyRoute]; ok && len(preCheckErrs[store.HubKeyRoute]) > 0 { return h.convertToImportResult(dataSets, preCheckErrs), nil } // Create APISIX resources createErrs := h.createEntities(c.Context(), dataSets) return h.convertToImportResult(dataSets, createErrs), nil } // Pre-check imported data for duplicates // The main problem facing duplication is routing, so here // we mainly check the duplication of routes, based on // domain name and uri. func (h *ImportHandler) preCheck(ctx context.Context, data *loader.DataSets) map[store.HubKey][]string { errs := make(map[store.HubKey][]string) for _, route := range data.Routes { errs[store.HubKeyRoute] = make([]string, 0) o, err := h.routeStore.List(ctx, store.ListInput{ // The check logic here is that if when a duplicate HOST or URI // has been found, the HTTP method is checked for overlap, and // if there is overlap it is determined to be a duplicate route // and the import is rejected. Predicate: func(obj interface{}) bool { r := obj.(*entity.Route) // Check URI and host duplication isURIDuplicated := r.URI != "" && route.URI != "" && r.URI == route.URI isURIsDuplicated := len(r.Uris) > 0 && len(route.Uris) > 0 && len(intersect.Hash(r.Uris, route.Uris)) > 0 isMethodDuplicated := len(intersect.Hash(r.Methods, route.Methods)) > 0 // First check for duplicate URIs if isURIDuplicated || isURIsDuplicated { // Then check if the host field exists, and if it does, check for duplicates if r.Host != "" && route.Host != "" { return r.Host == route.Host && isMethodDuplicated } else if len(r.Hosts) > 0 && len(route.Hosts) > 0 { return len(intersect.Hash(r.Hosts, route.Hosts)) > 0 && isMethodDuplicated } // If the host field does not exist, only the presence or absence // of HTTP method duplication is returned by default. return isMethodDuplicated } return false }, PageSize: 0, PageNumber: 0, }) if err != nil { // When a special storage layer error occurs, return directly. return map[store.HubKey][]string{ store.HubKeyRoute: {err.Error()}, } } // Duplicate routes found if o.TotalSize > 0 { for _, row := range o.Rows { r, ok := row.(*entity.Route) if ok { errs[store.HubKeyRoute] = append(errs[store.HubKeyRoute], errors.Errorf("%s is duplicated with route %s", route.Uris[0], r.Name). Error()) } } } } return errs } // Create parsed resources func (h *ImportHandler) createEntities(ctx context.Context, data *loader.DataSets) map[store.HubKey][]string { errs := make(map[store.HubKey][]string) for _, route := range data.Routes { _, err := h.routeStore.Create(ctx, &route) if err != nil { errs[store.HubKeyRoute] = append(errs[store.HubKeyRoute], err.Error()) } } for _, upstream := range data.Upstreams { _, err := h.upstreamStore.Create(ctx, &upstream) if err != nil { errs[store.HubKeyUpstream] = append(errs[store.HubKeyUpstream], err.Error()) } } for _, service := range data.Services { _, err := h.serviceStore.Create(ctx, &service) if err != nil { errs[store.HubKeyService] = append(errs[store.HubKeyService], err.Error()) } } for _, consumer := range data.Consumers { _, err := h.consumerStore.Create(ctx, &consumer) if err != nil { errs[store.HubKeyConsumer] = append(errs[store.HubKeyConsumer], err.Error()) } } for _, ssl := range data.SSLs { _, err := h.sslStore.Create(ctx, &ssl) if err != nil { errs[store.HubKeySsl] = append(errs[store.HubKeySsl], err.Error()) } } for _, route := range data.StreamRoutes { _, err := h.streamRouteStore.Create(ctx, &route) if err != nil { errs[store.HubKeyStreamRoute] = append(errs[store.HubKeyStreamRoute], err.Error()) } } for _, plugin := range data.GlobalPlugins { _, err := h.globalPluginStore.Create(ctx, &plugin) if err != nil { errs[store.HubKeyGlobalRule] = append(errs[store.HubKeyGlobalRule], err.Error()) } } for _, config := range data.PluginConfigs { _, err := h.pluginConfigStore.Create(ctx, &config) if err != nil { errs[store.HubKeyPluginConfig] = append(errs[store.HubKeyPluginConfig], err.Error()) } } for _, proto := range data.Protos { _, err := h.protoStore.Create(ctx, &proto) if err != nil { errs[store.HubKeyProto] = append(errs[store.HubKeyProto], err.Error()) } } return errs } // Convert import errors to response result func (ImportHandler) convertToImportResult(data *loader.DataSets, errs map[store.HubKey][]string) map[store.HubKey]ImportResult { return map[store.HubKey]ImportResult{ store.HubKeyRoute: { Total: len(data.Routes), Failed: len(errs[store.HubKeyRoute]), Errors: errs[store.HubKeyRoute], }, store.HubKeyUpstream: { Total: len(data.Upstreams), Failed: len(errs[store.HubKeyUpstream]), Errors: errs[store.HubKeyUpstream], }, store.HubKeyService: { Total: len(data.Services), Failed: len(errs[store.HubKeyService]), Errors: errs[store.HubKeyService], }, store.HubKeyConsumer: { Total: len(data.Consumers), Failed: len(errs[store.HubKeyConsumer]), Errors: errs[store.HubKeyConsumer], }, store.HubKeySsl: { Total: len(data.SSLs), Failed: len(errs[store.HubKeySsl]), Errors: errs[store.HubKeySsl], }, store.HubKeyStreamRoute: { Total: len(data.StreamRoutes), Failed: len(errs[store.HubKeyStreamRoute]), Errors: errs[store.HubKeyStreamRoute], }, store.HubKeyGlobalRule: { Total: len(data.GlobalPlugins), Failed: len(errs[store.HubKeyGlobalRule]), Errors: errs[store.HubKeyGlobalRule], }, store.HubKeyPluginConfig: { Total: len(data.PluginConfigs), Failed: len(errs[store.HubKeyPluginConfig]), Errors: errs[store.HubKeyPluginConfig], }, store.HubKeyProto: { Total: len(data.Protos), Failed: len(errs[store.HubKeyProto]), Errors: errs[store.HubKeyProto], }, } }