pkg/client/reader.go (97 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package client
import (
"crypto/tls"
"crypto/x509"
"errors"
"io"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
protobuf "google.golang.org/protobuf/proto"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
)
// ErrV2Unavailable error returned when Elastic Agent doesn't support V2.
var ErrV2Unavailable = errors.New("v2 protocol is not available")
// Service defined different services that the Elastic Agent states is available.
type Service proto.ConnInfoServices
const (
// ServiceCheckin V1 checkin service is available.
ServiceCheckin = Service(proto.ConnInfoServices_Checkin)
// ServiceCheckinV2 V2 checkin service is available.
ServiceCheckinV2 Service = Service(proto.ConnInfoServices_CheckinV2)
// ServiceStore store service is available.
ServiceStore Service = Service(proto.ConnInfoServices_Store)
// ServiceArtifact artifact service is available.
ServiceArtifact Service = Service(proto.ConnInfoServices_Artifact)
// ServiceLog log service is available.
ServiceLog Service = Service(proto.ConnInfoServices_Log)
)
// NewFromReader creates a new client reading the connection information from the io.Reader.
func NewFromReader(reader io.Reader, impl StateInterface, actions ...Action) (Client, error) {
connInfo := &proto.StartUpInfo{}
data, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
err = protobuf.Unmarshal(data, connInfo)
if err != nil {
return nil, err
}
cert, err := tls.X509KeyPair(connInfo.PeerCert, connInfo.PeerKey)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(connInfo.CaCert)
trans := credentials.NewTLS(&tls.Config{
ServerName: connInfo.ServerName,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
})
return New(connInfo.Addr, connInfo.Token, impl, actions, grpc.WithTransportCredentials(trans)), nil
}
// NewV2FromReader creates a new V2 client reading the connection information from the io.Reader.
func NewV2FromReader(reader io.Reader, ver VersionInfo, opts ...V2ClientOption) (V2, []Service, error) {
info := &proto.StartUpInfo{}
data, err := io.ReadAll(reader)
if err != nil {
return nil, nil, err
}
err = protobuf.Unmarshal(data, info)
if err != nil {
return nil, nil, err
}
if info.AgentInfo != nil {
opts = append(opts, WithAgentInfo(AgentInfo{
ID: info.AgentInfo.Id,
Version: info.AgentInfo.Version,
Snapshot: info.AgentInfo.Snapshot,
ManagedMode: info.AgentInfo.Mode,
Unprivileged: info.AgentInfo.Unprivileged,
}))
}
if info.Services == nil {
return nil, []Service{ServiceCheckin}, ErrV2Unavailable
}
cert, err := tls.X509KeyPair(info.PeerCert, info.PeerKey)
if err != nil {
return nil, nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(info.CaCert)
trans := credentials.NewTLS(&tls.Config{
ServerName: info.ServerName,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
})
for _, s := range info.Supports {
if s == proto.ConnectionSupports_CheckinChunking {
opts = append(opts, WithChunking(true))
}
}
if info.MaxMessageSize > 0 {
opts = append(opts, WithMaxMessageSize(int(info.MaxMessageSize)))
}
opts = append(opts, WithGRPCDialOptions(grpc.WithTransportCredentials(trans)))
client := NewV2(
info.Addr,
info.Token,
ver,
opts...,
)
services := make([]Service, 0, len(info.Services))
for _, srv := range info.Services {
services = append(services, Service(srv))
}
return client, services, nil
}