in pkg/istio-agent/xds_proxy.go [133:237]
func initXdsProxy(ia *Agent) (*XdsProxy, error) {
var err error
localHostAddr := localHostIPv4
if ia.cfg.IsIPv6 {
localHostAddr = localHostIPv6
}
var envoyProbe ready.Prober
if !ia.cfg.DisableEnvoy {
envoyProbe = &ready.Probe{
AdminPort: uint16(ia.proxyConfig.ProxyAdminPort),
LocalHostAddr: localHostAddr,
}
}
cache := wasm.NewLocalFileCache(constants.IstioDataDir, wasm.DefaultWasmModulePurgeInterval, wasm.DefaultWasmModuleExpiry, ia.cfg.WASMInsecureRegistries)
proxy := &XdsProxy{
istiodAddress: ia.proxyConfig.DiscoveryAddress,
istiodSAN: ia.cfg.IstiodSAN,
clusterID: ia.secOpts.ClusterID,
handlers: map[string]ResponseHandler{},
stopChan: make(chan struct{}),
healthChecker: health.NewWorkloadHealthChecker(ia.proxyConfig.ReadinessProbe, envoyProbe, ia.cfg.ProxyIPAddresses, ia.cfg.IsIPv6),
xdsHeaders: ia.cfg.XDSHeaders,
xdsUdsPath: ia.cfg.XdsUdsPath,
wasmCache: cache,
proxyAddresses: ia.cfg.ProxyIPAddresses,
downstreamGrpcOptions: ia.cfg.DownstreamGrpcOptions,
}
if ia.localDNSServer != nil {
proxy.handlers[v3.NameTableType] = func(resp *any.Any) error {
var nt dnsProto.NameTable
if err := resp.UnmarshalTo(&nt); err != nil {
log.Errorf("failed to unmarshal name table: %v", err)
return err
}
ia.localDNSServer.UpdateLookupTable(&nt)
return nil
}
}
if ia.cfg.EnableDynamicProxyConfig && ia.secretCache != nil {
proxy.handlers[v3.ProxyConfigType] = func(resp *any.Any) error {
pc := &meshconfig.ProxyConfig{}
if err := resp.UnmarshalTo(pc); err != nil {
log.Errorf("failed to unmarshal proxy config: %v", err)
return err
}
caCerts := pc.GetCaCertificatesPem()
log.Debugf("received new certificates to add to mesh trust domain: %v", caCerts)
trustBundle := []byte{}
for _, cert := range caCerts {
trustBundle = util.AppendCertByte(trustBundle, []byte(cert))
}
return ia.secretCache.UpdateConfigTrustBundle(trustBundle)
}
}
proxyLog.Infof("Initializing with upstream address %q and cluster %q", proxy.istiodAddress, proxy.clusterID)
if err = proxy.initDownstreamServer(); err != nil {
return nil, err
}
if err = proxy.InitIstiodDialOptions(ia); err != nil {
return nil, err
}
go func() {
if err := proxy.downstreamGrpcServer.Serve(proxy.downstreamListener); err != nil {
log.Errorf("failed to accept downstream gRPC connection %v", err)
}
}()
go proxy.healthChecker.PerformApplicationHealthCheck(func(healthEvent *health.ProbeEvent) {
// Store the same response as Delta and SotW. Depending on how Envoy connects we will use one or the other.
var req *discovery.DiscoveryRequest
if healthEvent.Healthy {
req = &discovery.DiscoveryRequest{TypeUrl: v3.HealthInfoType}
} else {
req = &discovery.DiscoveryRequest{
TypeUrl: v3.HealthInfoType,
ErrorDetail: &google_rpc.Status{
Code: int32(codes.Internal),
Message: healthEvent.UnhealthyMessage,
},
}
}
proxy.PersistRequest(req)
var deltaReq *discovery.DeltaDiscoveryRequest
if healthEvent.Healthy {
deltaReq = &discovery.DeltaDiscoveryRequest{TypeUrl: v3.HealthInfoType}
} else {
deltaReq = &discovery.DeltaDiscoveryRequest{
TypeUrl: v3.HealthInfoType,
ErrorDetail: &google_rpc.Status{
Code: int32(codes.Internal),
Message: healthEvent.UnhealthyMessage,
},
}
}
proxy.PersistDeltaRequest(deltaReq)
}, proxy.stopChan)
return proxy, nil
}