internal/conn/http/http.go (194 lines of code) (raw):

// Package http provides a client for interacting with the ARN receiver API using an // azcore.Client. package http import ( "bytes" "compress/zlib" "context" "fmt" "io" "net/http" "path" "sync" "testing" "github.com/Azure/arn-sdk/internal/build" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" ) /* Note: these come from: https://eng.ms/docs/cloud-ai-platform/azure-core/azure-management-and-platforms/control-plane-bburns/azure-resource-notifications/azure-resource-notifications-documentation/partners/publisher/receiver-api-usage#authentication Environment AAD Cloud Tenant Audience DogfoodProd Prod 72f988bf-86f1-41af-91ab-2d7cd011db47 api://41fc9deb-1ccc-4fcc-871d-12bf54ad8986/ DogfoodPPE PPE ea8a4392-515e-481f-879e-6571ff2a8a36 api://41fc9deb-1ccc-4fcc-871d-12bf54ad8986/ Public Cloud (including canary) Prod 33e01921-4d64-4f8c-a055-5bdaffd5e33d api://41fc9deb-1ccc-4fcc-871d-12bf54ad8986/ AzureUSGovernment Prod cab8a31a-1906-4287-a0d8-4eef66b95f6e api://41fc9deb-1ccc-4fcc-871d-12bf54ad8986/ PAzureChinaCloud Prod a55a4d5b-9241-49b1-b4ff-befa8db00269 api://41fc9deb-1ccc-4fcc-871d-12bf54ad8986/ */ const ( scopeDefault = "https://arg.management.core.windows.net//.default" allOthers = "api://41fc9deb-1ccc-4fcc-871d-12bf54ad8986//.default" ) // Note: The SDK does not seem to have anything for DogfoodProd or DogfoodPPE. var changeScope = map[string]bool{ cloud.AzureChina.ActiveDirectoryAuthorityHost: true, cloud.AzureGovernment.ActiveDirectoryAuthorityHost: true, cloud.AzurePublic.ActiveDirectoryAuthorityHost: true, } var readerPool = sync.Pool{ New: func() any { return bytes.NewReader(nil) }, } var flatePool = sync.Pool{ New: func() any { return &bytes.Buffer{} }, } // zlibTransport is a custom RoundTripper that applies Deflate compression at the desired level. type zlibTransport struct { deflateLevel int flatePool chan *zlib.Writer } func newFlateTransport() *zlibTransport { return &zlibTransport{ flatePool: make(chan *zlib.Writer, 20), } } // Do performs the actual request and compresses the body using Deflate. func (t *zlibTransport) Do(req *policy.Request) (*http.Response, error) { // Get the underlying http.Request httpReq := req.Raw() // If the request has a body, apply Deflate compression. if httpReq.Body != nil && httpReq.ContentLength > 0 { // Read the original body content. var buf bytes.Buffer _, err := io.Copy(&buf, httpReq.Body) if err != nil { return nil, err } // Compress the content using Deflate at the specified level. compressedBuffer := flatePool.Get().(*bytes.Buffer) defer func() { compressedBuffer.Reset() flatePool.Put(compressedBuffer) }() var writer *zlib.Writer select { case writer = <-t.flatePool: default: writer, err = zlib.NewWriterLevel(compressedBuffer, 5) if err != nil { return nil, err } } writer.Reset(compressedBuffer) _, err = writer.Write(buf.Bytes()) if err != nil { return nil, err } writer.Close() select { case t.flatePool <- writer: default: } // Update the request with the compressed body. httpReq.Body = io.NopCloser(compressedBuffer) httpReq.ContentLength = int64(compressedBuffer.Len()) httpReq.Header.Set("Content-Encoding", "deflate") } // Use the base RoundTripper to perform the actual request. return req.Next() } // Client is a client for interacting with the ARN receiver API. type Client struct { endpoint string client *azcore.Client compress bool fakeSender Sender } // Option is a function that configures the client. type Option func(*Client) error // WihtoutCompression turns off deflate compression for the client. func WithoutCompression() Option { return func(c *Client) error { c.compress = false return nil } } // Sender is an interface to provide a fake sender for testing. type Sender interface { Send(ctx context.Context, event []byte) error } // WithFake configures the client to use a fake sender for testing. // This will be used instead of .Send(). Can only be used in tests. func WithFake(s Sender) Option { return func(c *Client) error { if !testing.Testing() { return fmt.Errorf("http.WithFakeSender() can only be used in tests") } c.fakeSender = s return nil } } // New returns a new Client for accessing the ARN receiver API. func New(endpoint string, cred azcore.TokenCredential, opts *policy.ClientOptions, options ...Option) (*Client, error) { if opts == nil { opts = &policy.ClientOptions{} } c := &Client{ endpoint: endpoint, compress: true, } for _, option := range options { if err := option(c); err != nil { return nil, err } } if c.fakeSender != nil { return c, nil } var scope = scopeDefault if changeScope[opts.Cloud.ActiveDirectoryAuthorityHost] { scope = allOthers } plOpts := runtime.PipelineOptions{ PerRetry: []policy.Policy{ runtime.NewBearerTokenPolicy(cred, []string{scope}, nil), }, } if c.compress { plOpts.PerRetry = append(plOpts.PerRetry, newFlateTransport()) } azclient, err := azcore.NewClient("arn.Client", build.Version, plOpts, opts) if err != nil { return nil, err } if path.Dir(endpoint) != "arnnotify" { endpoint = runtime.JoinPaths(endpoint, "/arnnotify") } return &Client{ endpoint: endpoint, client: azclient, }, nil } // Send sends an event (converted to JSON bytes) to the ARN receiver API. func (c *Client) Send(ctx context.Context, event []byte, headers []string) error { if c.fakeSender != nil { return c.fakeSender.Send(ctx, event) } if len(headers)%2 != 0 { return fmt.Errorf("headers must be key-value pairs") } read := readerPool.Get().(*bytes.Reader) read.Reset(event) defer readerPool.Put(read) req, err := c.setup(ctx, read, headers) if err != nil { return err } // Send the event to the ARN service. resp, err := c.client.Pipeline().Do(req) if err != nil { return err } if resp.StatusCode != http.StatusOK { return fmt.Errorf("unexpected status code: %d", resp.StatusCode) } return nil } // appJSON is the Accept header for application/json. Set as a package // variable to avoid allocations. var appJSON = []string{"application/json"} // setup creates a new request with the event as the body. func (c *Client) setup(ctx context.Context, event *bytes.Reader, headers []string) (*policy.Request, error) { if event.Len() == 0 { return nil, fmt.Errorf("event is empty") } r := rsc{event} req, err := runtime.NewRequest(ctx, http.MethodPost, c.endpoint) if err != nil { return nil, err } req.Raw().Header["Accept"] = appJSON for i := 0; i < len(headers); i += 2 { req.Raw().Header.Add(headers[i], headers[i+1]) } return req, req.SetBody(r, "application/json") } // Compile-time check to verify implements interface. var _ io.ReadSeekCloser = rsc{} // rsc is an implementation of ReadSeekCloser. type rsc struct { *bytes.Reader } // Close is a no-op for byteReadSeekCloser. func (b rsc) Close() error { return nil }