pipeline/endpoints/servicecontrol.go (173 lines of code) (raw):
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package endpoints
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/GoogleCloudPlatform/ubbagent/clock"
"github.com/GoogleCloudPlatform/ubbagent/metrics"
"github.com/GoogleCloudPlatform/ubbagent/pipeline"
"github.com/GoogleCloudPlatform/ubbagent/util"
"github.com/golang/glog"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/api/servicecontrol/v1"
)
const (
agentIdLabel = "goog-ubb-agent-id"
timeout = 60 * time.Second
checkCacheTimeout = 60 * time.Second
)
type ServiceControlEndpoint struct {
name string
serviceName string
consumerId string
agentId string
keyData string
service *servicecontrol.Service
tracker pipeline.UsageTracker
nextCheck time.Time
clock clock.Clock
}
type checkError struct {
err error
transient bool
}
// NewServiceControlEndpoint creates a new ServiceControlEndpoint.
func NewServiceControlEndpoint(name, serviceName, agentId string, consumerId string, jsonKey []byte) (*ServiceControlEndpoint, error) {
config, err := google.JWTConfigFromJSON(jsonKey, servicecontrol.ServicecontrolScope)
if err != nil {
return nil, err
}
client := config.Client(context.Background())
client.Timeout = timeout
service, err := servicecontrol.New(client)
if err != nil {
return nil, err
}
return newServiceControlEndpoint(name, serviceName, agentId, consumerId, service, clock.NewClock()), nil
}
func newServiceControlEndpoint(name, serviceName, agentId, consumerId string, service *servicecontrol.Service, clock clock.Clock) *ServiceControlEndpoint {
ep := &ServiceControlEndpoint{
name: name,
serviceName: serviceName,
agentId: agentId,
consumerId: consumerId,
service: service,
clock: clock,
}
return ep
}
func (ep *ServiceControlEndpoint) Name() string {
return ep.name
}
func (ep *ServiceControlEndpoint) Send(report pipeline.EndpointReport) error {
operation := ep.format(report)
req := &servicecontrol.ReportRequest{
Operations: []*servicecontrol.Operation{operation},
}
glog.V(2).Infoln("ServiceControlEndpoint:Send(): serviceName: ", ep.serviceName, " body: ", func() string {
reqJson, _ := req.MarshalJSON()
return string(reqJson)
}())
// Check only every 60 seconds, following recommendation from https://godoc.org/google.golang.org/api/servicecontrol/v1#ServicesService.Check
if ep.clock.Now().After(ep.nextCheck) {
// Check requests can not have user labels.
opNoLabels := *operation
opNoLabels.UserLabels = nil
checkReq := &servicecontrol.CheckRequest{
Operation: &opNoLabels,
}
checkResp, err := ep.service.Services.Check(ep.serviceName, checkReq).Do()
if err != nil && !googleapi.IsNotModified(err) {
return err
}
if len(checkResp.CheckErrors) > 0 {
return checkErrorsToError(checkResp.CheckErrors)
}
ep.nextCheck = ep.clock.Now().Add(checkCacheTimeout)
}
resp, err := ep.service.Services.Report(ep.serviceName, req).Do()
if err != nil && !googleapi.IsNotModified(err) {
return err
}
// This will retry reporting all operations.
// However, identical operations are de-duped for billing
if len(resp.ReportErrors) > 0 {
var errs []error
for _, reportErr := range resp.ReportErrors {
errs = append(errs, reportErrorToError(reportErr))
}
return errors.Join(errs...)
}
glog.V(2).Infoln("ServiceControlEndpoint:Send(): success")
return nil
}
func (ep *ServiceControlEndpoint) BuildReport(r metrics.StampedMetricReport) (pipeline.EndpointReport, error) {
return pipeline.NewEndpointReport(r, nil)
}
func (ep *ServiceControlEndpoint) format(r pipeline.EndpointReport) *servicecontrol.Operation {
value := servicecontrol.MetricValue{
StartTime: r.StartTime.UTC().Format(time.RFC3339Nano),
EndTime: r.EndTime.UTC().Format(time.RFC3339Nano),
}
if r.Value.Int64Value != nil {
value.Int64Value = util.NewInt64(*r.Value.Int64Value)
} else if r.Value.DoubleValue != nil {
value.DoubleValue = util.NewFloat64(*r.Value.DoubleValue)
}
op := &servicecontrol.Operation{
OperationId: r.Id,
// ServiceControl requires this field but doesn't indicate what it's supposed to be.
OperationName: fmt.Sprintf("%v/report", ep.serviceName),
StartTime: r.StartTime.UTC().Format(time.RFC3339Nano),
EndTime: r.EndTime.UTC().Format(time.RFC3339Nano),
ConsumerId: ep.consumerId,
UserLabels: r.Labels,
MetricValueSets: []*servicecontrol.MetricValueSet{
{
MetricName: fmt.Sprintf("%v/%v", ep.serviceName, r.Name),
MetricValues: []*servicecontrol.MetricValue{&value},
},
},
}
if op.UserLabels == nil {
op.UserLabels = make(map[string]string)
}
// Add the agent ID label
op.UserLabels[agentIdLabel] = ep.agentId
return op
}
// Use is a no-op. ServiceControlEndpoint doesn't track usage.
func (ep *ServiceControlEndpoint) Use() {}
// Release is a no-op. ServiceControlEndpoint doesn't track usage.
func (ep *ServiceControlEndpoint) Release() error {
return nil
}
func (ep *ServiceControlEndpoint) IsTransient(err error) bool {
if err == nil {
return false
}
switch v := err.(type) {
case *googleapi.Error:
// Return true if this is an http error with a 5xx code.
return v.Code >= 500 && v.Code < 600
case net.Error:
// Return true if this error is considered temporary or a timeout.
return v.Temporary() || v.Timeout()
case *checkError:
return v.transient
default:
// Some non-http error (perhaps a connection refused or timeout?)
// We'll retry.
return true
}
}
func checkErrorsToError(checkErrors []*servicecontrol.CheckError) error {
var errs []error
var transient = true
for _, checkError := range checkErrors {
switch checkError.Code {
// These errors indicate customer disabling billing and
// is not retriable. See: https://cloud.google.com/marketplace/docs/partners/integrated-saas/backend-integration#for_usage-based_pricing_reporting_usage_to_google
case "BILLING_DISABLED", "SERVICE_NOT_ACTIVATED", "PROJECT_DELETED":
transient = false
}
bytes, _ := checkError.MarshalJSON()
errs = append(errs, errors.New(string(bytes)))
}
return &checkError{err: errors.Join(errs...), transient: transient}
}
func (ce checkError) Error() string {
return ce.err.Error()
}
func reportErrorToError(reportError *servicecontrol.ReportError) error {
bytes, _ := reportError.MarshalJSON()
return errors.New(string(bytes))
}