func newCluster()

in pkg/apisix/cluster.go [132:245]


// newCluster builds a cluster from the given options.
//
// It returns an error for nil options or an empty BaseURL. A zero Timeout or
// SyncInterval is replaced by the package default, and a trailing "/" is
// stripped from BaseURL; these normalisations are written back into o. Any
// AdminAPIVersion other than "v3" falls back to "v2".
//
// When o.EnableEtcdServer is set, the in-memory (*Mem) resource clients are
// used, the validator is loaded from conf/apisix-schema.json, and the etcd
// adapter is served on o.ListenAddress in a goroutine that receives ctx.
// Otherwise the regular resource clients and a dummy validator are used, and
// the cache / schema sync goroutines are started when o.CacheSynced /
// o.SchemaSynced are set.
func newCluster(ctx context.Context, o *ClusterOptions) (Cluster, error) {
	if o == nil {
		// Fail with an error rather than panicking on the first field access.
		return nil, errors.New("nil cluster options")
	}
	if o.BaseURL == "" {
		return nil, errors.New("empty base url")
	}
	if o.Timeout == time.Duration(0) {
		o.Timeout = _defaultTimeout
	}
	if o.SyncInterval.Duration == time.Duration(0) {
		o.SyncInterval = types.TimeDuration{Duration: _defaultSyncInterval}
	}
	o.BaseURL = strings.TrimSuffix(o.BaseURL, "/")

	u, err := url.Parse(o.BaseURL)
	if err != nil {
		return nil, err
	}

	// if the version is not v3, then fallback to v2
	adminVersion := o.AdminAPIVersion
	if adminVersion != "v3" {
		adminVersion = "v2"
	}
	c := &cluster{
		adminVersion: adminVersion,
		name:         o.Name,
		baseURL:      o.BaseURL,
		baseURLHost:  u.Host,
		adminKey:     o.AdminKey,
		prefix:       o.Prefix,
		cli: &http.Client{
			Timeout:   o.Timeout,
			Transport: _defaultTransport,
		},
		cacheState:        _cacheSyncing, // default state
		cacheSynced:       make(chan struct{}),
		syncComparison:    o.SyncComparison,
		metricsCollector:  o.MetricsCollector,
		sslKeyEncryptSalt: o.SSLKeyEncryptSalt,
	}

	if o.EnableEtcdServer {
		// Build the logger first and only install it as the process-wide
		// default on success, so a failed construction can never overwrite
		// api7log.DefaultLogger.
		logger, logErr := api7log.NewLogger(
			api7log.WithSkipFrames(3),
			api7log.WithLogLevel("info"),
		)
		if logErr != nil {
			return nil, fmt.Errorf("creating default api7log logger: %w", logErr)
		}
		api7log.DefaultLogger = logger

		c.adapter = adapter.NewEtcdAdapter(nil)
		c.route = newRouteMem(c)
		c.upstream = newUpstreamMem(c)
		c.ssl = newSSLMem(c)
		c.streamRoute = newStreamRouteMem(c)
		c.globalRules = newGlobalRuleMem(c)
		c.consumer = newConsumerMem(c)
		c.plugin = newPluginClient(c)
		c.schema = newSchemaClient(c)
		c.pluginConfig = newPluginConfigMem(c)
		c.upstreamServiceRelation = newUpstreamServiceRelation(c)
		c.pluginMetadata = newPluginMetadataMem(c)

		c.validator, err = NewReferenceFile("conf/apisix-schema.json")
		if err != nil {
			return nil, err
		}

		// Checked the same way as in the non-etcd branch below.
		c.generatedObjCache, err = cache.NewNoopDBCache()
		if err != nil {
			return nil, err
		}
		c.cache, err = cache.NewMemDBCache()
		if err != nil {
			return nil, err
		}

		fmt.Println("start etcd server")
		// The listener is opened last so that no earlier failure can leave
		// it open without a server attached.
		ln, listenErr := net.Listen("tcp", o.ListenAddress)
		if listenErr != nil {
			return nil, listenErr
		}
		go c.adapter.Serve(ctx, ln)
	} else {
		c.route = newRouteClient(c)
		c.upstream = newUpstreamClient(c)
		c.ssl = newSSLClient(c)
		c.streamRoute = newStreamRouteClient(c)
		c.globalRules = newGlobalRuleClient(c)
		c.consumer = newConsumerClient(c)
		c.plugin = newPluginClient(c)
		c.schema = newSchemaClient(c)
		c.pluginConfig = newPluginConfigClient(c)
		c.upstreamServiceRelation = newUpstreamServiceRelation(c)
		c.pluginMetadata = newPluginMetadataClient(c)
		c.validator = newDummyValidator()

		c.cache, err = cache.NewMemDBCache()
		if err != nil {
			return nil, err
		}

		// A real cache for generated objects is only needed when sync
		// comparison is enabled; otherwise a no-op cache is used.
		if o.SyncComparison {
			c.generatedObjCache, err = cache.NewMemDBCache()
		} else {
			c.generatedObjCache, err = cache.NewNoopDBCache()
		}
		if err != nil {
			return nil, err
		}

		if o.CacheSynced {
			c.waitforCacheSync = true
			go c.syncCache(ctx)
		}
		if o.SchemaSynced {
			go c.syncSchema(ctx, o.SyncInterval.Duration)
		}
	}

	return c, nil
}