internal/openapi/client.go (392 lines of code) (raw):

/* * Copyright (c) 2023 Alibaba Group Holding Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package openapi import ( "bytes" "encoding/json" "errors" "fmt" "io" "net/http" "strconv" "strings" "sync" "time" "github.com/alibaba/schedulerx-worker-go/internal/utils" "github.com/alibaba/schedulerx-worker-go/logger" ) var ( client *Client once sync.Once ) func InitOpenAPIClient(cli *Client) { once.Do(func() { client = cli client.SetDomain() }) } // GetOpenAPIClient first executes InitOpenAPIClient and then calls it, otherwise it returns nil func GetOpenAPIClient() *Client { return client } type Option func(*Client) type Client struct { // Schedulerx application management interface to view user identity verification (required) appKey string // View the groupId application management interface (required) groupId string // Send to the corresponding domain name (optional). // By default, it is filled in according to the environment where the machine is located. // Daily、Pre、Online domain string // endpoint only exited at Alibaba Cloud, using it to get domain endpoint string // Operator's convenience record query (optional) user string // Deprecated userId string userName string // Namespace specific environment usage namespace string namespaceSource string httpClient http.Client initMethod string app string } func WithHTTPClient(httpClient http.Client) Option { return func(client *Client) { client.httpClient = httpClient } } func WithOpenAPIDomain(domain string) Option { return func(client *Client) { client.domain = domain } } func WithOpenAPIEndpoint(endpoint string) Option { return func(client *Client) { client.endpoint = endpoint } } func WithInitMethod(initMethod string) Option { return func(client *Client) { client.initMethod = initMethod } } func WithNamespace(namespace string) Option { return func(client *Client) { client.namespace = namespace } } func WithGroupId(groupId string) Option { return func(client *Client) { client.groupId = groupId } } func WithAppKey(appKey string) Option { return func(client *Client) { client.appKey = appKey } } func WithApp(app string) Option { return func(client *Client) { client.app = app } } func WithUser(user string) Option { return func(client *Client) { client.user = user } } func NewClient(opts ...Option) *Client { cli := new(Client) for _, opt := range opts { opt(cli) } return cli } type CreateJavaJobRequest struct { BaseRequest JavaJobConfig } type CreateJavaJobResponse struct { JobId int64 `json:"job_id"` } // CreateJob create java type job func (c *Client) CreateJob(req *CreateJavaJobRequest) (int64, error) { if !req.IsRequired() { return -1, errors.New("Required fields of CreateJavaJobRequest is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/job/create", c.domain) respData, err := c.sendRequest(url, req) if err != nil { return -1, err } resp := new(CreateJavaJobResponse) if err := json.Unmarshal(respData, resp); err != nil { return -1, err } return resp.JobId, nil } type CreateHTTPJobRequest struct { BaseRequest HttpJobConfig } type CreateHTTPJobResponse struct { JobId int64 `json:"job_id"` } // CreateHTTPJob create http job func (c *Client) CreateHTTPJob(req *CreateHTTPJobRequest) (int64, error) { if !req.IsRequired() { return -1, errors.New("Required fields of CreateHTTPJob is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/job/create", c.domain) respData, err := c.sendRequest(url, req) if err != nil { return -1, err } resp := new(CreateHTTPJobResponse) if err := json.Unmarshal(respData, resp); err != nil { return -1, err } return resp.JobId, nil } type UpdateJobRequest struct { BaseRequest // Task information to be modified JobConfigInfo } type UpdateJobResponse struct { JobId int64 `json:"job_id"` } // UpdateJob update job func (c *Client) UpdateJob(req *UpdateJobRequest) (int64, error) { if !req.IsRequired() { return -1, errors.New("Required fields of UpdateJob is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/job/update", c.domain) respData, err := c.sendRequest(url, req) if err != nil { return -1, err } resp := new(UpdateJobResponse) if err := json.Unmarshal(respData, resp); err != nil { return -1, err } return resp.JobId, nil } type DeleteJobRequest struct { BaseRequest jobId int } // DeleteJob delete job func (c *Client) DeleteJob(req *DeleteJobRequest) error { if req.jobId <= 0 { return errors.New("Required fields of UpdateJob is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/job/delete", c.domain) _, err := c.sendRequest(url, req) if err != nil { return err } return nil } type ExecJobRequest struct { BaseRequest jobId int } func (c *Client) ExecJob(req *DeleteJobRequest) error { if req.jobId <= 0 { return errors.New("Required fields of ExecJob is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/job/execute", c.domain) _, err := c.sendRequest(url, req) if err != nil { return err } return nil } type EnableJobRequest struct { BaseRequest jobId int } func (c *Client) EnableJob(req *EnableJobRequest) error { if req.jobId <= 0 { return errors.New("Required fields of EnableJob is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/job/enable", c.domain) _, err := c.sendRequest(url, req) if err != nil { return err } return nil } type DisableJobRequest struct { BaseRequest jobId int } func (c *Client) DisableJob(req *DisableJobRequest) error { if req.jobId <= 0 { return errors.New("Required fields of DisableJob is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/job/disable", c.domain) _, err := c.sendRequest(url, req) if err != nil { return err } return nil } type GetJobInstanceRequest struct { BaseRequest jobId int jobInstanceId int } func (c *Client) GetJobInstance(req *GetJobInstanceRequest) error { if req.jobId <= 0 || req.jobInstanceId <= 0 { return errors.New("Required fields of GetJobInstance is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/instance/get", c.domain) _, err := c.sendRequest(url, req) if err != nil { return err } return nil } type GetJobInstanceListRequest struct { BaseRequest jobId int } func (c *Client) GetJobInstanceList(req *GetJobInstanceListRequest) error { if req.jobId <= 0 { return errors.New("Required fields of GetJobInstanceList is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/instance/get", c.domain) _, err := c.sendRequest(url, req) if err != nil { return err } return nil } type KillJobInstanceRequest struct { BaseRequest jobId int instanceId int } func (c *Client) KillJobInstance(req *KillJobInstanceRequest) error { if req.jobId <= 0 || req.instanceId <= 0 { return errors.New("Required fields of KillJobInstance is empty. ") } url := fmt.Sprintf("http://%s/openapi/v1/instance/kill", c.domain) _, err := c.sendRequest(url, req) if err != nil { return err } return nil } // Fill headers func (c *Client) genHeaders() map[string]string { headers := make(map[string]string) // Optional parameters if c.namespace != "" { headers[NAMESPACE_KEY_HEADER] = c.namespace if c.namespaceSource != "" { headers[NAMESPACE_SOURCE_HEADER] = c.namespaceSource } } // Time timeNowStr := strconv.Itoa(int(time.Now().UnixMilli())) headers[TIME_STAMP_HEADER] = timeNowStr // Signature if c.groupId != "" && c.appKey != "" { headers[SIGNATURE_HEADER] = utils.HmacSHA1Encrypt(c.groupId+timeNowStr, c.appKey) } // GroupId headers[GROUPID_HEADER] = c.groupId // Whether to create a group if c.groupId != "" { headers[CREATE_GROUP_HEADER] = "true" } else { headers[CREATE_GROUP_HEADER] = "false" } return headers } func (c *Client) sendRequest(url string, request BaseRequest) ([]byte, error) { // Fill in initialization parameters if c.groupId != "" { request.setGroupId(c.groupId) } if c.namespace != "" { request.setNamespace(c.namespace) } if c.namespaceSource != "" { request.setGroupId(c.namespaceSource) } if c.userId != "" { request.setParam("userId", c.userId) } if c.userName != "" { request.setParam("userName", c.userName) } if c.userId != "" && c.userName != "" { request.setParam("user", c.userName+" "+c.userId) } else { user := "openapi" if c.user != "" { user = c.user } request.setParam("user", user) } // Required parameter verification if !c.validateRequiredParam() { return nil, errors.New("Missing required param. ") } // Fill headers headers := c.genHeaders() logger.Infof("SendRequest url: %s, request params: %+v, headers: %+v", url, request.getParams(), headers) postBody, _ := json.Marshal(request.getParams()) req, err := http.NewRequest("POST", url, bytes.NewBuffer(postBody)) if err != nil { return nil, fmt.Errorf("HTTP NewRequest failed, err=%s ", err.Error()) } req.Header.Set("Content-Type", "application/json") for k, v := range headers { req.Header.Set(k, v) } resp, err := c.httpClient.Do(req) if err != nil { return nil, fmt.Errorf("HTTP post failed, err=%s ", err.Error()) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("Read http post response failed, err=%s ", err.Error()) } if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("Read http post response failed, statusCode=%s ", resp.Status) } return body, nil } func (c *Client) validateRequiredParam() bool { return c.domain != "" } func (c *Client) SetDomain() { var ( // 1. using config's domain domainAddr = c.domain err error ) // 2. using endpoint to get domainAddr, compatible Alibaba Cloud // endpoint only exited at Alibaba Cloud if domainAddr == "" && client.endpoint != "" { domainAddr, err = client.getDomainByEndpoint() if err != nil { logger.Warnf("Cannot get domainAddr from endpoint, init openAPI client failed, endpoint=%s, err=%s", client.endpoint, err.Error()) } } if domainAddr == "" { panic(fmt.Sprintf("Cannot get domainAddr, init openAPI client failed, err=%s", err.Error())) } c.domain = domainAddr } func (c *Client) getDomainByEndpoint() (string, error) { if c.endpoint == "" { return "", errors.New("Required fields of KillJobInstance is empty. ") } url := fmt.Sprintf("http://%s:%s/schedulerx2/consolelist", c.endpoint, "8080") resp, err := c.httpClient.Get(url) if err != nil { return "", fmt.Errorf("HTTP post failed, err=%s ", err.Error()) } if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("Read http post response failed, statusCode=%s ", resp.Status) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return "", fmt.Errorf("Read http post response failed, err=%s ", err.Error()) } return strings.TrimSpace(string(body)), nil } func (c *Client) Domain() string { return c.domain } func (c *Client) Namespace() string { return c.namespace } func (c *Client) NamespaceSource() string { return c.namespaceSource } func (c *Client) HttpClient() *http.Client { return &c.httpClient }