pkg/exporter/nettop/cri.go (486 lines of code) (raw):

package nettop import ( "context" "fmt" "net" "net/url" "os" "strings" "time" log "github.com/sirupsen/logrus" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" internalapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" runtimeapiV1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" ) var ( criClient internalapi.RuntimeService criInfo *CRIInfo apiserverClient *PodCache ) const ( unixProtocol = "unix" maxMsgSize = 1024 * 1024 * 16 kubeAPIVersion = "0.1.0" ) var runtimeEndpoints = []string{"/var/run/dockershim.sock", "/run/containerd/containerd.sock", "/run/k3s/containerd/containerd.sock", "/var/run/cri-dockerd.sock"} func initCriClient(eps []string) (err error) { if criClient != nil { return } if sock, ok := os.LookupEnv("RUNTIME_SOCK"); ok { if _, err = os.Stat(sock); os.IsNotExist(err) { return fmt.Errorf("cannot find cri sock %s", sock) } criClient, err = NewRemoteRuntimeService(sock, 10*time.Second) if err != nil { return fmt.Errorf("connect cri sock %s error: %w", sock, err) } return } for _, candidate := range eps { if _, err := os.Stat(candidate); os.IsNotExist(err) { continue } criClient, err = NewRemoteRuntimeService(candidate, 10*time.Second) if err != nil { continue } return } return fmt.Errorf("cannot find valid cri sock in %s", strings.Join(eps, ",")) } func initCriInfo() error { if criInfo != nil { return nil } version, err := criClient.Version(kubeAPIVersion) if err != nil { return fmt.Errorf("failed get runtime version: %w", err) } criInfo = &CRIInfo{ Version: version.RuntimeApiVersion, RuntimeName: version.RuntimeName, RuntimeVersion: version.RuntimeVersion, } log.Infof("cri info: version=%s runtime=%s runtimeVersion=%s", criInfo.Version, criInfo.RuntimeName, criInfo.RuntimeVersion) return nil } // remoteRuntimeService is a gRPC implementation of internalapi.RuntimeService. 
type remoteRuntimeService struct { timeout time.Duration runtimeClient runtimeapi.RuntimeServiceClient runtimeClientV1alpha2 runtimeapiV1alpha2.RuntimeServiceClient } // useV1API returns true if the v1 CRI API should be used instead of v1alpha2. func (r *remoteRuntimeService) useV1API() bool { return r.runtimeClientV1alpha2 == nil } func (r *remoteRuntimeService) versionV1alpha2(ctx context.Context, apiVersion string) (*runtimeapi.VersionResponse, error) { typedVersion, err := r.runtimeClientV1alpha2.Version(ctx, &runtimeapiV1alpha2.VersionRequest{ Version: apiVersion, }) if err != nil { return nil, err } if typedVersion.Version == "" || typedVersion.RuntimeName == "" || typedVersion.RuntimeApiVersion == "" || typedVersion.RuntimeVersion == "" { return nil, fmt.Errorf("not all fields are set in VersionResponse (%q)", *typedVersion) } return fromV1alpha2VersionResponse(typedVersion), err } // Version returns the runtime name, runtime version and runtime API version. func (r *remoteRuntimeService) Version(apiVersion string) (*runtimeapi.VersionResponse, error) { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() if r.useV1API() { return r.versionV1(ctx, apiVersion) } return r.versionV1alpha2(ctx, apiVersion) } func (r *remoteRuntimeService) versionV1(ctx context.Context, apiVersion string) (*runtimeapi.VersionResponse, error) { typedVersion, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{ Version: apiVersion, }) if err != nil { return nil, err } if typedVersion.Version == "" || typedVersion.RuntimeName == "" || typedVersion.RuntimeApiVersion == "" || typedVersion.RuntimeVersion == "" { return nil, fmt.Errorf("not all fields are set in VersionResponse (%q)", *typedVersion) } return typedVersion, err } func getConnection(ctx context.Context, endPoint string) (*grpc.ClientConn, error) { var conn *grpc.ClientConn addr, dialer, err := GetAddressAndDialer(endPoint) if err != nil { return nil, err } conn, err = grpc.DialContext(ctx, addr, 
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithContextDialer(dialer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize))) if err != nil { return nil, fmt.Errorf("connect endpoint '%s', make sure you are running as root and the endpoint has been started", endPoint) } return conn, nil } func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) { ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) defer cancel() conn, err := getConnection(ctx, endpoint) if err != nil { return nil, err } service := &remoteRuntimeService{ timeout: connectionTimeout, } if err := service.determineAPIVersion(conn); err != nil { return nil, err } return service, nil } // Attach prepares a streaming endpoint to attach to a running container, and returns the address. func (r *remoteRuntimeService) Attach(_ *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { return nil, nil } // CheckpointContainer triggers a checkpoint of the given CheckpointContainerRequest func (r *remoteRuntimeService) CheckpointContainer(_ *runtimeapi.CheckpointContainerRequest) error { return nil } // ContainerStats returns the stats of the container. func (r *remoteRuntimeService) ContainerStats(_ string) (*runtimeapi.ContainerStats, error) { return nil, nil } // CreateContainer creates a new container in the specified PodSandbox. func (r *remoteRuntimeService) CreateContainer(_ string, _ *runtimeapi.ContainerConfig, _ *runtimeapi.PodSandboxConfig) (string, error) { return "", nil } // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. func (r *remoteRuntimeService) Exec(_ *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { return nil, nil } // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. 
func (r *remoteRuntimeService) ExecSync(_ string, _ []string, _ time.Duration) (stdout []byte, stderr []byte, err error) { return nil, nil, nil } func (r *remoteRuntimeService) GetContainerEvents(_ chan *runtimeapi.ContainerEventResponse) error { return nil } // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. func (r *remoteRuntimeService) PortForward(_ *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { return nil, nil } // RemoveContainer removes the container. If the container is running, the container // should be forced to removal. func (r *remoteRuntimeService) RemoveContainer(_ string) (err error) { return nil } // RemovePodSandbox removes the sandbox. If there are any containers in the // sandbox, they should be forcibly removed. func (r *remoteRuntimeService) RemovePodSandbox(_ string) (err error) { return nil } // ReopenContainerLog reopens the container log file. func (r *remoteRuntimeService) ReopenContainerLog(_ string) (err error) { return nil } // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // the sandbox is in ready state. func (r *remoteRuntimeService) RunPodSandbox(_ *runtimeapi.PodSandboxConfig, _ string) (string, error) { return "", nil } // StartContainer starts the container. func (r *remoteRuntimeService) StartContainer(_ string) (err error) { return nil } // StopContainer stops a running container with a grace period (i.e., timeout). func (r *remoteRuntimeService) StopContainer(_ string, _ int64) (err error) { return nil } // StopPodSandbox stops the sandbox. If there are any running containers in the // sandbox, they should be forced to termination. 
func (r *remoteRuntimeService) StopPodSandbox(_ string) error {
	// Not implemented by this client: nothing is stopped.
	return nil
}

// UpdateContainerResources is not implemented by this client; it ignores its
// arguments and returns nil.
func (r *remoteRuntimeService) UpdateContainerResources(_ string, _ *runtimeapi.ContainerResources) error {
	return nil
}

// UpdateRuntimeConfig is not implemented by this client; it ignores the
// supplied config and returns nil.
func (r *remoteRuntimeService) UpdateRuntimeConfig(_ *runtimeapi.RuntimeConfig) error {
	return nil
}

// Status reports the runtime status, using the v1 or v1alpha2 API depending
// on which one was selected for this service. The call is bounded by the
// service timeout.
func (r *remoteRuntimeService) Status(verbose bool) (*runtimeapi.StatusResponse, error) {
	ctx, cancel := getContextWithTimeout(r.timeout)
	defer cancel()

	if !r.useV1API() {
		return r.statusV1alpha2(ctx, verbose)
	}
	return r.statusV1(ctx, verbose)
}

// statusV1alpha2 queries the runtime status over v1alpha2 and converts the
// answer to the v1 type. A response without a status, or with fewer than two
// conditions, is treated as an error.
func (r *remoteRuntimeService) statusV1alpha2(ctx context.Context, verbose bool) (*runtimeapi.StatusResponse, error) {
	req := &runtimeapiV1alpha2.StatusRequest{Verbose: verbose}
	resp, err := r.runtimeClientV1alpha2.Status(ctx, req)
	if err != nil {
		return nil, err
	}

	if st := resp.Status; st == nil || len(st.Conditions) < 2 {
		return nil, errors.New("RuntimeReady or NetworkReady condition are not set")
	}

	return fromV1alpha2StatusResponse(resp), nil
}

// statusV1 queries the runtime status over v1. A response without a status,
// or with fewer than two conditions, is treated as an error.
func (r *remoteRuntimeService) statusV1(ctx context.Context, verbose bool) (*runtimeapi.StatusResponse, error) {
	req := &runtimeapi.StatusRequest{Verbose: verbose}
	resp, err := r.runtimeClient.Status(ctx, req)
	if err != nil {
		return nil, err
	}

	if st := resp.Status; st == nil || len(st.Conditions) < 2 {
		return nil, errors.New("RuntimeReady or NetworkReady condition are not set")
	}

	return resp, nil
}

// PodSandboxStatus returns the status of the PodSandbox.
func (r *remoteRuntimeService) PodSandboxStatus(podSandBoxID string, verbose bool) (*runtimeapi.PodSandboxStatusResponse, error) { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() if r.useV1API() { return r.podSandboxStatusV1(ctx, podSandBoxID, verbose) } return r.podSandboxStatusV1alpha2(ctx, podSandBoxID, verbose) } func (r *remoteRuntimeService) podSandboxStatusV1alpha2(ctx context.Context, podSandBoxID string, verbose bool) (*runtimeapi.PodSandboxStatusResponse, error) { resp, err := r.runtimeClientV1alpha2.PodSandboxStatus(ctx, &runtimeapiV1alpha2.PodSandboxStatusRequest{ PodSandboxId: podSandBoxID, Verbose: verbose, }) if err != nil { return nil, err } res := fromV1alpha2PodSandboxStatusResponse(resp) if res.Status != nil { if err := verifySandboxStatus(res.Status); err != nil { return nil, err } } return res, nil } func (r *remoteRuntimeService) podSandboxStatusV1(ctx context.Context, podSandBoxID string, verbose bool) (*runtimeapi.PodSandboxStatusResponse, error) { resp, err := r.runtimeClient.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{ PodSandboxId: podSandBoxID, Verbose: verbose, }) if err != nil { return nil, err } status := resp.Status if resp.Status != nil { if err := verifySandboxStatus(status); err != nil { return nil, err } } return resp, nil } // PodSandboxStats returns the stats of the pod. 
func (r *remoteRuntimeService) PodSandboxStats(podSandboxID string) (*runtimeapi.PodSandboxStats, error) { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() if r.useV1API() { return r.podSandboxStatsV1(ctx, podSandboxID) } return r.podSandboxStatsV1alpha2(ctx, podSandboxID) } func (r *remoteRuntimeService) podSandboxStatsV1alpha2(ctx context.Context, podSandboxID string) (*runtimeapi.PodSandboxStats, error) { resp, err := r.runtimeClientV1alpha2.PodSandboxStats(ctx, &runtimeapiV1alpha2.PodSandboxStatsRequest{ PodSandboxId: podSandboxID, }) if err != nil { return nil, err } return fromV1alpha2PodSandboxStats(resp.GetStats()), nil } func (r *remoteRuntimeService) podSandboxStatsV1(ctx context.Context, podSandboxID string) (*runtimeapi.PodSandboxStats, error) { resp, err := r.runtimeClient.PodSandboxStats(ctx, &runtimeapi.PodSandboxStatsRequest{ PodSandboxId: podSandboxID, }) if err != nil { return nil, err } return resp.GetStats(), nil } // ListPodSandboxStats returns the list of pod sandbox stats given the filter func (r *remoteRuntimeService) ListPodSandboxStats(filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, error) { // Set timeout, because runtimes are able to cache disk stats results ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() if r.useV1API() { return r.listPodSandboxStatsV1(ctx, filter) } return r.listPodSandboxStatsV1alpha2(ctx, filter) } func (r *remoteRuntimeService) listPodSandboxStatsV1alpha2(ctx context.Context, filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, error) { resp, err := r.runtimeClientV1alpha2.ListPodSandboxStats(ctx, &runtimeapiV1alpha2.ListPodSandboxStatsRequest{ Filter: v1alpha2PodSandboxStatsFilter(filter), }) if err != nil { return nil, err } return fromV1alpha2ListPodSandboxStatsResponse(resp).GetStats(), nil } func (r *remoteRuntimeService) listPodSandboxStatsV1(ctx context.Context, filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, 
error) { resp, err := r.runtimeClient.ListPodSandboxStats(ctx, &runtimeapi.ListPodSandboxStatsRequest{ Filter: filter, }) if err != nil { return nil, err } return resp.GetStats(), nil } // ListPodSandbox returns a list of PodSandboxes. func (r *remoteRuntimeService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() if r.useV1API() { return r.listPodSandboxV1(ctx, filter) } return r.listPodSandboxV1alpha2(ctx, filter) } // ListContainers lists containers by filters. func (r *remoteRuntimeService) ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() if r.useV1API() { return r.listContainersV1(ctx, filter) } return r.listContainersV1alpha2(ctx, filter) } func (r *remoteRuntimeService) listPodSandboxV1alpha2(ctx context.Context, filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) { resp, err := r.runtimeClientV1alpha2.ListPodSandbox(ctx, &runtimeapiV1alpha2.ListPodSandboxRequest{ Filter: v1alpha2PodSandboxFilter(filter), }) if err != nil { return nil, err } return fromV1alpha2ListPodSandboxResponse(resp).Items, nil } func (r *remoteRuntimeService) listPodSandboxV1(ctx context.Context, filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) { resp, err := r.runtimeClient.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{ Filter: filter, }) if err != nil { return nil, err } return resp.Items, nil } func (r *remoteRuntimeService) listContainersV1alpha2(ctx context.Context, filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) { resp, err := r.runtimeClientV1alpha2.ListContainers(ctx, &runtimeapiV1alpha2.ListContainersRequest{ Filter: v1alpha2ContainerFilter(filter), }) if err != nil { return nil, err } return fromV1alpha2ListContainersResponse(resp).Containers, nil } func (r *remoteRuntimeService) listContainersV1(ctx 
context.Context, filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) { resp, err := r.runtimeClient.ListContainers(ctx, &runtimeapi.ListContainersRequest{ Filter: filter, }) if err != nil { return nil, err } return resp.Containers, nil } // ListContainerStats returns the list of ContainerStats given the filter. func (r *remoteRuntimeService) ListContainerStats(filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { // Do not set timeout, because writable layer stats collection takes time. // TODO(random-liu): Should we assume runtime should cache the result, and set timeout here? ctx, cancel := getContextWithCancel() defer cancel() if r.useV1API() { return r.listContainerStatsV1(ctx, filter) } return r.listContainerStatsV1alpha2(ctx, filter) } func (r *remoteRuntimeService) listContainerStatsV1(ctx context.Context, filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { resp, err := r.runtimeClient.ListContainerStats(ctx, &runtimeapi.ListContainerStatsRequest{ Filter: filter, }) if err != nil { return nil, err } return resp.GetStats(), nil } func (r *remoteRuntimeService) listContainerStatsV1alpha2(ctx context.Context, filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) { resp, err := r.runtimeClientV1alpha2.ListContainerStats(ctx, &runtimeapiV1alpha2.ListContainerStatsRequest{ Filter: v1alpha2ContainerStatsFilter(filter), }) if err != nil { return nil, err } return fromV1alpha2ListContainerStatsResponse(resp).GetStats(), nil } // ContainerStatus returns the container status. 
func (r *remoteRuntimeService) ContainerStatus(containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error) { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() if r.useV1API() { return r.containerStatusV1(ctx, containerID, verbose) } return r.containerStatusV1alpha2(ctx, containerID, verbose) } func (r *remoteRuntimeService) containerStatusV1(ctx context.Context, containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error) { resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{ ContainerId: containerID, Verbose: verbose, }) if err != nil { return nil, err } status := resp.Status if resp.Status != nil { if err := verifyContainerStatus(status); err != nil { return nil, err } } return resp, nil } func (r *remoteRuntimeService) containerStatusV1alpha2(ctx context.Context, containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error) { resp, err := r.runtimeClientV1alpha2.ContainerStatus(ctx, &runtimeapiV1alpha2.ContainerStatusRequest{ ContainerId: containerID, Verbose: verbose, }) if err != nil { return nil, err } res := fromV1alpha2ContainerStatusResponse(resp) if resp.Status != nil { if err := verifyContainerStatus(res.Status); err != nil { return nil, err } } return res, nil } // determineAPIVersion tries to connect to the remote runtime by using the // highest available API version. // // A GRPC redial will always use the initially selected (or automatically // determined) CRI API version. If the redial was due to the container runtime // being upgraded, then the container runtime must also support the initially // selected version or the redial is expected to fail, which requires a restart // of kubelet. 
func (r *remoteRuntimeService) determineAPIVersion(conn *grpc.ClientConn) error { ctx, cancel := getContextWithTimeout(r.timeout) defer cancel() r.runtimeClient = runtimeapi.NewRuntimeServiceClient(conn) if _, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{}); err == nil { log.Warn("Using CRI v1 runtime API") } else if status.Code(err) == codes.Unimplemented { log.Warn("Using CRI v1alpha2 runtime API") r.runtimeClientV1alpha2 = runtimeapiV1alpha2.NewRuntimeServiceClient(conn) } else { return fmt.Errorf("unable to determine runtime API version: %w", err) } return nil } func parseEndpoint(endpoint string) (string, string, error) { u, err := url.Parse(endpoint) if err != nil { return "", "", err } switch u.Scheme { case "tcp": return "tcp", u.Host, nil case "unix": return "unix", u.Path, nil case "": return "", "", fmt.Errorf("using %q as endpoint is deprecated, please consider using full url format", endpoint) default: return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme) } } func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) { if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" { fallbackEndpoint := fallbackProtocol + "://" + endpoint protocol, addr, err = parseEndpoint(fallbackEndpoint) } return } // GetAddressAndDialer returns the address parsed from the given endpoint and a context dialer. func GetAddressAndDialer(endpoint string) (string, func(ctx context.Context, addr string) (net.Conn, error), error) { protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol) if err != nil { return "", nil, err } if protocol != unixProtocol { return "", nil, fmt.Errorf("only support unix socket endpoint") } return addr, dial, nil } func dial(ctx context.Context, addr string) (net.Conn, error) { return (&net.Dialer{}).DialContext(ctx, unixProtocol, addr) }