pulsaradmin/pkg/rest/client.go (346 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package rest import ( "bytes" "encoding/json" "io" "net/http" "net/url" "path" ) type MediaType string const ( ApplicationJSON MediaType = "application/json" PartitionedTopicMetaJSON MediaType = "application/vnd.partitioned-topic-metadata+json" ) func (m MediaType) String() string { return string(m) } // Client is a base client that is used to make http request to the ServiceURL type Client struct { ServiceURL string HTTPClient *http.Client VersionInfo string } func (c *Client) newRequest(method, path string) (*request, error) { base, err := url.Parse(c.ServiceURL) if err != nil { return nil, err } u, err := url.Parse(path) if err != nil { return nil, err } req := &request{ method: method, url: &url.URL{ Scheme: base.Scheme, User: base.User, Host: base.Host, Path: endpoint(base.Path, u.Path), }, params: make(url.Values), } return req, nil } func (c *Client) doRequest(r *request) (*http.Response, error) { req, err := r.toHTTP() if err != nil { return nil, err } if r.contentType != "" { req.Header.Set("Content-Type", r.contentType) } else if req.Body != nil { req.Header.Set("Content-Type", ApplicationJSON.String()) } req.Header.Set("Accept", ApplicationJSON.String()) req.Header.Set("User-Agent", c.useragent()) hc := c.HTTPClient if hc == nil { hc = http.DefaultClient } return hc.Do(req) } // MakeRequest can make a simple request and handle the response by yourself func (c *Client) MakeRequest(method, endpoint string) (*http.Response, error) { req, err := c.newRequest(method, endpoint) if err != nil { return nil, err } resp, err := checkSuccessful(c.doRequest(req)) if err != nil { return nil, err } return resp, nil } func (c *Client) MakeRequestWithURL(method string, urlOpt *url.URL) (*http.Response, error) { req := &request{ method: method, url: urlOpt, params: make(url.Values), } resp, err := checkSuccessful(c.doRequest(req)) if err != nil { return nil, err } return resp, nil } func (c *Client) Get(endpoint string, obj interface{}) error { _, err := c.GetWithQueryParams(endpoint, obj, nil, true) return err } func (c *Client) GetWithQueryParams(endpoint string, obj interface{}, params map[string]string, decode bool) ([]byte, error) { return c.GetWithOptions(endpoint, obj, params, decode, nil) } func (c *Client) GetWithOptions(endpoint string, obj interface{}, params map[string]string, decode bool, file io.Writer) ([]byte, error) { req, err := c.newRequest(http.MethodGet, endpoint) if err != nil { return nil, err } if params != nil { query := req.url.Query() for k, v := range params { query.Add(k, v) } req.params = query } //nolint:bodyclose resp, err := checkSuccessful(c.doRequest(req)) if err != nil { return nil, err } defer safeRespClose(resp) if obj != nil { if err := decodeJSONBody(resp, &obj); err != nil { if err == io.EOF { return nil, nil } return nil, err } } else if !decode { if file != nil { _, err := io.Copy(file, resp.Body) if err != nil { return nil, err } } else { body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } return body, err } } return nil, err } func (c *Client) useragent() string { return c.VersionInfo } func (c *Client) Put(endpoint string, in interface{}) error { return c.PutWithQueryParams(endpoint, in, nil, nil) } func (c *Client) PutWithQueryParams(endpoint string, in, obj interface{}, params map[string]string) error { return c.PutWithCustomMediaType(endpoint, in, obj, params, "") } func (c *Client) PutWithCustomMediaType(endpoint string, in, obj interface{}, params map[string]string, mediaType MediaType) error { req, err := c.newRequest(http.MethodPut, endpoint) if err != nil { return err } if mediaType != "" { req.contentType = mediaType.String() } req.obj = in if params != nil { query := req.url.Query() for k, v := range params { query.Add(k, v) } req.params = query } //nolint:bodyclose resp, err := checkSuccessful(c.doRequest(req)) if err != nil { return err } defer safeRespClose(resp) if obj != nil { if err := decodeJSONBody(resp, &obj); err != nil { return err } } return nil } func (c *Client) PutWithMultiPart(endpoint string, body io.Reader, contentType string) error { req, err := c.newRequest(http.MethodPut, endpoint) if err != nil { return err } req.body = body req.contentType = contentType //nolint resp, err := checkSuccessful(c.doRequest(req)) if err != nil { return err } defer safeRespClose(resp) return nil } func (c *Client) Delete(endpoint string) error { return c.DeleteWithQueryParams(endpoint, nil) } func (c *Client) DeleteWithQueryParams(endpoint string, params map[string]string) error { req, err := c.newRequest(http.MethodDelete, endpoint) if err != nil { return err } if params != nil { query := req.url.Query() for k, v := range params { query.Add(k, v) } req.params = query } //nolint resp, err := checkSuccessful(c.doRequest(req)) if err != nil { return err } defer safeRespClose(resp) return nil } func (c *Client) Post(endpoint string, in interface{}) error { return c.PostWithObj(endpoint, in, nil) } func (c *Client) PostWithObj(endpoint string, in, obj interface{}) error { req, err := c.newRequest(http.MethodPost, endpoint) if err != nil { return err } req.obj = in //nolint resp, err := checkSuccessful(c.doRequest(req)) if err != nil { return err } defer safeRespClose(resp) if obj != nil { if err := decodeJSONBody(resp, &obj); err != nil { return err } } return nil } func (c *Client) PostWithMultiPart(endpoint string, in interface{}, body io.Reader, contentType string) error { req, err := c.newRequest(http.MethodPost, endpoint) if err != nil { return err } req.obj = in req.body = body req.contentType = contentType //nolint resp, err := checkSuccessful(c.doRequest(req)) if err != nil { return err } defer safeRespClose(resp) return nil } func (c *Client) PostWithQueryParams(endpoint string, in interface{}, params map[string]string) error { req, err := c.newRequest(http.MethodPost, endpoint) if err != nil { return err } if in != nil { req.obj = in } if params != nil { query := req.url.Query() for k, v := range params { query.Add(k, v) } req.params = query } //nolint resp, err := checkSuccessful(c.doRequest(req)) if err != nil { return err } defer safeRespClose(resp) return nil } type request struct { method string contentType string url *url.URL params url.Values obj interface{} body io.Reader } func (r *request) toHTTP() (*http.Request, error) { r.url.RawQuery = r.params.Encode() // add a request body if there is one if r.body == nil && r.obj != nil { body, err := encodeJSONBody(r.obj) if err != nil { return nil, err } r.body = body } req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body) if err != nil { return nil, err } req.URL.Host = r.url.Host req.URL.Scheme = r.url.Scheme req.Host = r.url.Host return req, nil } // respIsOk is used to validate a successful http status code func respIsOk(resp *http.Response) bool { return resp.StatusCode >= http.StatusOK && resp.StatusCode <= http.StatusNoContent } // checkSuccessful checks for a valid response and parses an error func checkSuccessful(resp *http.Response, err error) (*http.Response, error) { if err != nil { safeRespClose(resp) return nil, err } if !respIsOk(resp) { defer safeRespClose(resp) return nil, responseError(resp) } return resp, nil } func endpoint(parts ...string) string { return path.Join(parts...) } // encodeJSONBody is used to JSON encode a body func encodeJSONBody(obj interface{}) (io.Reader, error) { b, err := json.Marshal(obj) if err != nil { return nil, err } return bytes.NewReader(b), nil } // decodeJSONBody is used to JSON decode a body func decodeJSONBody(resp *http.Response, out interface{}) error { if resp.ContentLength == 0 { return nil } dec := json.NewDecoder(resp.Body) return dec.Decode(out) } // safeRespClose is used to close a response body func safeRespClose(resp *http.Response) { if resp != nil { // ignore error since it is closing a response body _ = resp.Body.Close() } } // responseError is used to parse a response into a client error func responseError(resp *http.Response) error { e := Error{ Code: resp.StatusCode, Reason: resp.Status, } body, err := io.ReadAll(resp.Body) if err != nil { e.Reason = err.Error() return e } err = json.Unmarshal(body, &e) if err != nil { if len(body) != 0 { e.Reason = string(body) } return e } return e }