pkg/source/gcp/api/gcp_client.go (574 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"math"
"net/http"
"strings"
"sync"
"github.com/GoogleCloudPlatform/khi/pkg/common/cache"
"github.com/GoogleCloudPlatform/khi/pkg/common/httpclient"
"github.com/GoogleCloudPlatform/khi/pkg/common/token"
)
var ErrorRateLimitExceeds = errors.New("ratelimit exceeds. retry it later")
var ErrorInternalServerError = errors.New("internal server error")
var MinWaitTimeOnRetriableError = 5
var MaxWaitTimeOnRetriableError = 60
var MaxRetryCount = 10
var RetriableHttpResponseCodes = []int{
429, 500, 501, 502, 503,
}
var RetriableWithRefreshingTokenHttpResponseCodes = []int{
401, 403,
}
type multicloudAPIEndpoint struct {
Endpoint string
Location string
}
var multicloudAPIEndpoints = []multicloudAPIEndpoint{
{
Endpoint: "https://asia-east2-gkemulticloud.googleapis.com",
Location: "asia-east2",
},
{
Endpoint: "https://asia-northeast2-gkemulticloud.googleapis.com",
Location: "asia-northeast2",
},
{
Endpoint: "https://asia-south1-gkemulticloud.googleapis.com",
Location: "asia-south1",
},
{
Endpoint: "https://asia-southeast1-gkemulticloud.googleapis.com",
Location: "asia-southeast1",
},
{
Endpoint: "https://asia-southeast2-gkemulticloud.googleapis.com",
Location: "asia-southeast2",
},
{
Endpoint: "https://australia-southeast1-gkemulticloud.googleapis.com",
Location: "australia-southeast1",
},
{
Endpoint: "https://europe-north1-gkemulticloud.googleapis.com",
Location: "europe-north1",
},
{
Endpoint: "https://europe-west1-gkemulticloud.googleapis.com",
Location: "europe-west1",
},
{
Endpoint: "https://europe-west2-gkemulticloud.googleapis.com",
Location: "europe-west2",
},
{
Endpoint: "https://europe-west3-gkemulticloud.googleapis.com",
Location: "europe-west3",
},
{
Endpoint: "https://europe-west4-gkemulticloud.googleapis.com",
Location: "europe-west4",
},
{
Endpoint: "https://europe-west6-gkemulticloud.googleapis.com",
Location: "europe-west6",
},
{
Endpoint: "https://europe-west9-gkemulticloud.googleapis.com",
Location: "europe-west9",
},
{
Endpoint: "https://northamerica-northeast1-gkemulticloud.googleapis.com",
Location: "northamerica-northeast1",
},
{
Endpoint: "https://southamerica-east1-gkemulticloud.googleapis.com",
Location: "southamerica-east1",
},
{
Endpoint: "https://us-east4-gkemulticloud.googleapis.com",
Location: "us-east4",
},
{
Endpoint: "https://us-west1-gkemulticloud.googleapis.com",
Location: "us-west1",
},
}
type GCPClientImpl struct {
BaseClient httpclient.HTTPClient[*http.Response]
// This is a parameter for limiting the result length of List log entries api call for testing purpose.
MaxLogEntries int
}
// Digest implements task.CachableDependency.
func (pi *GCPClientImpl) Digest() string {
return "singleton"
}
var _ cache.CacheDependency = (*GCPClientImpl)(nil)
var _ GCPClient = (*GCPClientImpl)(nil)
func NewGCPClient(refresher token.TokenRefresher, headerProviders []httpclient.HTTPHeaderProvider) (GCPClient, error) {
return &GCPClientImpl{
BaseClient: httpclient.NewRetryHttpClient(httpclient.NewBasicHttpClient().WithHeaderProvider(headerProviders...), MinWaitTimeOnRetriableError, MaxWaitTimeOnRetriableError, MaxRetryCount, RetriableHttpResponseCodes, RetriableWithRefreshingTokenHttpResponseCodes,
refresher),
MaxLogEntries: math.MaxInt,
}, nil
}
func (c *GCPClientImpl) CreateGCPHttpRequest(ctx context.Context, method string, url string, body io.Reader) (*http.Request, error) {
req, err := http.NewRequestWithContext(ctx, method, url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
return req, nil
}
/**
* Get the list of GKE cluster names.
*/
func (c *GCPClientImpl) GetClusterNames(ctx context.Context, projectId string) ([]string, error) {
clusters, err := c.GetClusters(ctx, projectId)
if err != nil {
return nil, err
}
var result []string
for _, cluster := range clusters {
result = append(result, cluster.Name)
}
return result, nil
}
// Get all GKE clusters from container.googleapis.com
// ref: https://cloud.google.com/kubernetes-engine/docs/reference/rest/v1/projects.locations.clusters/list
func (c *GCPClientImpl) GetClusters(ctx context.Context, projectId string) ([]Cluster, error) {
type clusterListResponse struct {
Clusters []*Cluster `json:"clusters"`
NextPageToken string `json:"nextPageToken"`
}
pc := NewPageClient[clusterListResponse](c.BaseClient)
clusterListResponses, err := pc.GetAll(ctx, func(hasToken bool, nextPageToken string) (*http.Request, error) {
// location="-" is a special literal to express "all locations"
endpoint := fmt.Sprintf("https://container.googleapis.com/v1/projects/%s/locations/-/clusters", projectId)
if nextPageToken != "-" {
endpoint += "?pageToken=" + nextPageToken
}
return c.CreateGCPHttpRequest(ctx, "GET", endpoint, nil)
}, func(response *clusterListResponse) string {
return response.NextPageToken
})
if err != nil {
return nil, err
}
var result []Cluster
for _, response := range clusterListResponses {
for i := 0; i < len(response.Clusters); i++ {
result = append(result, *response.Clusters[i])
}
}
return result, nil
}
// GetAnthosAWSClusterNames retrieves the list of Anthos on AWS cluster names.
func (c *GCPClientImpl) GetAnthosAWSClusterNames(ctx context.Context, projectId string) ([]string, error) {
type awsCluster struct {
Name string `json:"name"`
}
type clusterListResponse struct {
AwsClusters []*awsCluster `json:"awsClusters"`
NextPageToken string `json:"nextPageToken"`
}
var result []string
var lock sync.Mutex
var wg sync.WaitGroup
for _, endpoint := range multicloudAPIEndpoints {
wg.Add(1)
go func(endpoint multicloudAPIEndpoint) {
defer wg.Done()
pc := NewPageClient[clusterListResponse](c.BaseClient)
awsClusterLists, err := pc.GetAll(ctx, func(hasToken bool, nextPageToken string) (*http.Request, error) {
ep := fmt.Sprintf("%s/v1/projects/%s/locations/%s/awsClusters", endpoint.Endpoint, projectId, endpoint.Location)
if hasToken {
ep += "?pageToken=" + nextPageToken
}
return c.CreateGCPHttpRequest(ctx, "GET", ep, nil)
}, func(response *clusterListResponse) string {
return response.NextPageToken
})
if err != nil {
return
}
lock.Lock()
for _, awsClusterList := range awsClusterLists {
for _, awsCluster := range awsClusterList.AwsClusters {
clusterNameSegments := strings.Split(awsCluster.Name, "/")
result = append(result, clusterNameSegments[len(clusterNameSegments)-1])
}
}
lock.Unlock()
}(endpoint)
}
wg.Wait()
return result, nil
}
// GetAnthosAzureClusterNames retrieves the list of Anthos on Azure cluster names.
func (c *GCPClientImpl) GetAnthosAzureClusterNames(ctx context.Context, projectId string) ([]string, error) {
type azureCluster struct {
Name string `json:"name"`
}
type clusterListResponse struct {
AzureClusters []*azureCluster `json:"azureClusters"`
NextPageToken string `json:"nextPageToken"`
}
var result []string
var lock sync.Mutex
var wg sync.WaitGroup
for _, endpoint := range multicloudAPIEndpoints {
wg.Add(1)
go func(endpoint multicloudAPIEndpoint) {
defer wg.Done()
pc := NewPageClient[clusterListResponse](c.BaseClient)
azureClusterLists, err := pc.GetAll(ctx, func(hasToken bool, nextPageToken string) (*http.Request, error) {
ep := fmt.Sprintf("%s/v1/projects/%s/locations/%s/azureClusters", endpoint.Endpoint, projectId, endpoint.Location)
if hasToken {
ep += "?pageToken=" + nextPageToken
}
return c.CreateGCPHttpRequest(ctx, "GET", ep, nil)
}, func(response *clusterListResponse) string {
return response.NextPageToken
})
if err != nil {
return
}
lock.Lock()
for _, azureClusterList := range azureClusterLists {
for _, azureCluster := range azureClusterList.AzureClusters {
clusterNameSegments := strings.Split(azureCluster.Name, "/")
result = append(result, clusterNameSegments[len(clusterNameSegments)-1])
}
}
lock.Unlock()
}(endpoint)
}
wg.Wait()
return result, nil
}
func (c *GCPClientImpl) GetAnthosOnBaremetalClusterNames(ctx context.Context, projectId string) ([]string, error) {
type baremetalCluster struct {
Name string `json:"name"`
// Ignoreing the other fields...
}
type clusterListResponse struct {
BaremetalClusters []*baremetalCluster `json:"bareMetalClusters"`
NextPageToken string `json:"nextPageToken"`
}
type baremetalAdminCluster struct {
Name string `json:"name"`
// Ignoreing the other fields...
}
type clusterAdminListResponse struct {
BaremetalAdminClusters []*baremetalAdminCluster `json:"bareMetalAdminClusters"`
NextPageToken string `json:"nextPageToken"`
}
wg := sync.WaitGroup{}
wg.Add(3)
resultLock := sync.Mutex{}
result := make([]string, 0)
go func() {
defer wg.Done()
// Admin cluster can be only registered on the fleet membership.
// Query fleet membership status as well.
fleets, err := c.GetFleetMembershipNames(ctx, projectId)
if err != nil {
return
}
resultLock.Lock()
defer resultLock.Unlock()
result = append(result, fleets...)
}()
go func() {
defer wg.Done()
pc := NewPageClient[clusterListResponse](c.BaseClient)
clusterLists, err := pc.GetAll(ctx, func(hasToken bool, nextPageToken string) (*http.Request, error) {
endpoint := fmt.Sprintf("https://gkeonprem.googleapis.com/v1/projects/%s/locations/-/bareMetalClusters", projectId)
if hasToken {
endpoint += "?pageToken=" + nextPageToken
}
return c.CreateGCPHttpRequest(ctx, "GET", endpoint, nil)
}, func(response *clusterListResponse) string {
return response.NextPageToken
})
if err != nil {
return
}
resultLock.Lock()
defer resultLock.Unlock()
for _, clusters := range clusterLists {
for _, cluster := range clusters.BaremetalClusters {
nameSegments := strings.Split(cluster.Name, "/")
result = append(result, nameSegments[len(nameSegments)-1])
}
}
}()
go func() {
defer wg.Done()
pac := NewPageClient[clusterAdminListResponse](c.BaseClient)
clusterAdminLists, err := pac.GetAll(ctx, func(hasToken bool, nextPageToken string) (*http.Request, error) {
endpoint := fmt.Sprintf("https://gkeonprem.googleapis.com/v1/projects/%s/locations/-/bareMetalAdminClusters", projectId)
if hasToken {
endpoint += "?pageToken=" + nextPageToken
}
return c.CreateGCPHttpRequest(ctx, "GET", endpoint, nil)
}, func(response *clusterAdminListResponse) string {
return response.NextPageToken
})
if err != nil {
return
}
resultLock.Lock()
defer resultLock.Unlock()
for _, cluster := range clusterAdminLists {
for _, cluster := range cluster.BaremetalAdminClusters {
nameSegments := strings.Split(cluster.Name, "/")
result = append(result, nameSegments[len(nameSegments)-1])
}
}
}()
wg.Wait()
return result, nil
}
func (c *GCPClientImpl) GetAnthosOnVMWareClusterNames(ctx context.Context, projectId string) ([]string, error) {
type vmwareCluster struct {
Name string `json:"name"`
// Ignoreing the other fields...
}
type clusterListResponse struct {
VMWareClusters []*vmwareCluster `json:"vmwareClusters"`
NextPageToken string `json:"nextPageToken"`
}
type vmwareAdminCluster struct {
Name string `json:"name"`
// Ignoreing the other fields...
}
type clusterAdminListResponse struct {
VMWareAdminClusters []*vmwareAdminCluster `json:"vmwareAdminClusters"`
NextPageToken string `json:"nextPageToken"`
}
wg := sync.WaitGroup{}
wg.Add(3)
resultLock := sync.Mutex{}
result := make([]string, 0)
go func() {
defer wg.Done()
// Admin cluster can be only registered on the fleet membership.
// Query fleet membership status as well.
fleets, err := c.GetFleetMembershipNames(ctx, projectId)
if err != nil {
return
}
resultLock.Lock()
defer resultLock.Unlock()
result = append(result, fleets...)
}()
go func() {
defer wg.Done()
pc := NewPageClient[clusterListResponse](c.BaseClient)
clusterLists, err := pc.GetAll(ctx, func(hasToken bool, nextPageToken string) (*http.Request, error) {
endpoint := fmt.Sprintf("https://gkeonprem.googleapis.com/v1/projects/%s/locations/-/vmwareClusters", projectId)
if hasToken {
endpoint += "?pageToken=" + nextPageToken
}
return c.CreateGCPHttpRequest(ctx, "GET", endpoint, nil)
}, func(response *clusterListResponse) string {
return response.NextPageToken
})
if err != nil {
return
}
resultLock.Lock()
defer resultLock.Unlock()
for _, clusters := range clusterLists {
for _, cluster := range clusters.VMWareClusters {
nameSegments := strings.Split(cluster.Name, "/")
result = append(result, nameSegments[len(nameSegments)-1])
}
}
}()
go func() {
defer wg.Done()
pac := NewPageClient[clusterAdminListResponse](c.BaseClient)
clusterAdminLists, err := pac.GetAll(ctx, func(hasToken bool, nextPageToken string) (*http.Request, error) {
endpoint := fmt.Sprintf("https://gkeonprem.googleapis.com/v1/projects/%s/locations/-/vmwareAdminClusters", projectId)
if hasToken {
endpoint += "?pageToken=" + nextPageToken
}
return c.CreateGCPHttpRequest(ctx, "GET", endpoint, nil)
}, func(response *clusterAdminListResponse) string {
return response.NextPageToken
})
if err != nil {
return
}
resultLock.Lock()
defer resultLock.Unlock()
for _, cluster := range clusterAdminLists {
for _, cluster := range cluster.VMWareAdminClusters {
nameSegments := strings.Split(cluster.Name, "/")
result = append(result, nameSegments[len(nameSegments)-1])
}
}
}()
wg.Wait()
return result, nil
}
func (c *GCPClientImpl) GetFleetMembershipNames(ctx context.Context, projectId string) ([]string, error) {
type membershipResource struct {
Name string `json:"name"`
// Ignoreing the other fields...
}
type clusterAdminListResponse struct {
Resources []*membershipResource `json:"resources"`
NextPageToken string `json:"nextPageToken"`
}
pc := NewPageClient[clusterAdminListResponse](c.BaseClient)
membershipLists, err := pc.GetAll(ctx, func(hasToken bool, nextPageToken string) (*http.Request, error) {
endpoint := fmt.Sprintf("https://gkehub.googleapis.com/v1/projects/%s/locations/-/memberships", projectId)
if hasToken {
endpoint += "?pageToken=" + nextPageToken
}
return c.CreateGCPHttpRequest(ctx, "GET", endpoint, nil)
}, func(response *clusterAdminListResponse) string {
return response.NextPageToken
})
if err != nil {
return nil, err
}
result := make([]string, 0)
for _, membershipList := range membershipLists {
for _, membership := range membershipList.Resources {
membershipFragments := strings.Split(membership.Name, "/")
result = append(result, membershipFragments[len(membershipFragments)-1])
}
}
return result, nil
}
// Get all composer environment names from composer.googleapis.com in a region
// refs: https://cloud.google.com/composer/docs/reference/rest/v1/projects.locations.environments/list
func (c *GCPClientImpl) GetComposerEnvironmentNames(ctx context.Context, projectId string, location string) ([]string, error) {
type environment struct {
Name string `json:"name"`
}
type environmentListResponse struct {
Environments []environment `json:"environments"`
NextPageToken string `json:"nextPageToken"`
}
var result []string
for nextPageToken := "-"; nextPageToken != ""; {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
return nil, err
}
continue
default:
endpoint := fmt.Sprintf("https://composer.googleapis.com/v1/projects/%s/locations/%s/environments", projectId, location)
if nextPageToken != "-" {
endpoint += "?pageToken=" + nextPageToken
}
req, err := c.CreateGCPHttpRequest(ctx, "GET", endpoint, nil)
if err != nil {
return nil, fmt.Errorf("failed to create GCP HTTP request: %w", err)
}
client := httpclient.NewJsonResponseHttpClient[environmentListResponse](c.BaseClient)
response, httpResponse, err := client.DoWithContext(ctx, req)
if httpResponse != nil && httpResponse.Body != nil {
defer httpResponse.Body.Close()
}
if err != nil {
return nil, fmt.Errorf("failed to get JSON response: %w", err)
}
for _, environment := range response.Environments {
// fullname: projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME
fullname := environment.Name
name := strings.Split(fullname, "/")[len(strings.Split(fullname, "/"))-1]
result = append(result, name)
}
nextPageToken = response.NextPageToken
}
}
return result, nil
}
// Get all regions(locations) from compute.googleapis.com
// ref: https://cloud.google.com/compute/docs/reference/rest/v1/regions/list
// Note: No filters are applid to the list operation. Literary "all" regions will return.(.items[].status be ignored)
func (c *GCPClientImpl) ListRegions(ctx context.Context, projectId string) ([]string, error) {
if projectId == "" {
return nil, fmt.Errorf("projectId is empty")
}
type item struct {
Name string `json:"name"`
}
type listRegionRespnse struct {
Items []item `json:"items"`
}
endpoint := fmt.Sprintf("https://compute.googleapis.com/compute/v1/projects/%s/regions", projectId)
req, err := c.CreateGCPHttpRequest(ctx, "GET", endpoint, nil)
if err != nil {
return nil, fmt.Errorf("failed to create GCP HTTP request: %w", err)
}
client := httpclient.NewJsonResponseHttpClient[listRegionRespnse](c.BaseClient)
response, httpResponse, err := client.DoWithContext(ctx, req)
if httpResponse != nil && httpResponse.Body != nil {
defer httpResponse.Body.Close()
}
if err != nil {
return nil, fmt.Errorf("failed to get JSON response: %w", err)
}
var result []string
for _, item := range response.Items {
result = append(result, item.Name)
}
return result, nil
}
/**
* Query logs with specified filter
*/
func (c *GCPClientImpl) ListLogEntries(ctx context.Context, resourceNames []string, filter string, logSink chan any) error {
type logEntriesListRequest struct {
ResourceNames []string `json:"resourceNames"`
Filter string `json:"filter"`
OrderBy string `json:"orderBy"`
PageSize int64 `json:"pageSize"`
PageToken string `json:"pageToken,omitempty"`
}
type logEntriesListResponse struct {
Entries []any `json:"entries"`
NextPageToken string `json:"nextPageToken"`
}
defer close(logSink)
ENDPOINT := "https://logging.googleapis.com/v2/entries:list"
MAXIMUM_PAGE_SIZE := 1000
nextPageToken := ""
pageCount := 0
for entryIndex := 0; entryIndex < c.MaxLogEntries; entryIndex += MAXIMUM_PAGE_SIZE {
queryEnd := false
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
// cancel operation
return err
}
default:
requestBody := logEntriesListRequest{
ResourceNames: resourceNames,
Filter: filter,
OrderBy: "timestamp asc",
PageSize: int64(math.Min(float64(MAXIMUM_PAGE_SIZE), float64(c.MaxLogEntries-entryIndex))), // logging API can take 1000 entries at most.
PageToken: nextPageToken,
}
requestBytes, err := json.Marshal(requestBody)
if err != nil {
return err
}
req, err := c.CreateGCPHttpRequest(ctx, "POST", ENDPOINT, bytes.NewReader(requestBytes))
if err != nil {
return err
}
client := httpclient.NewJsonResponseHttpClient[logEntriesListResponse](c.BaseClient)
response, httpResponse, err := client.DoWithContext(ctx, req)
if httpResponse != nil && httpResponse.Body != nil {
defer httpResponse.Body.Close()
}
if err != nil {
if httpResponse != nil {
slog.ErrorContext(ctx, fmt.Sprintf("Unretriable error found: %d:%s", httpResponse.StatusCode, httpResponse.Status))
}
return err
}
for _, entry := range response.Entries {
logSink <- entry
}
if response.NextPageToken == "" {
queryEnd = true
break
}
nextPageToken = response.NextPageToken
pageCount += 1
}
if queryEnd {
break
}
}
return nil
}