client_etl.go (215 lines of code) (raw):
package sls
import (
"encoding/json"
"fmt"
"io/ioutil"
"time"
)
type ETL struct {
Configuration ETLConfiguration `json:"configuration"`
Description string `json:"description,omitempty"`
DisplayName string `json:"displayName"`
Name string `json:"name"`
Schedule ETLSchedule `json:"schedule"`
Type string `json:"type"`
Status string `json:"status"`
CreateTime int32 `json:"createTime,omitempty"`
LastModifiedTime int32 `json:"lastModifiedTime,omitempty"`
}
type ETLConfiguration struct {
AccessKeyId string `json:"accessKeyId"`
AccessKeySecret string `json:"accessKeySecret"`
FromTime int64 `json:"fromTime,omitempty"`
Logstore string `json:"logstore"`
Parameters map[string]string `json:"parameters,omitempty"`
RoleArn string `json:"roleArn,omitempty"`
Script string `json:"script"`
ToTime int32 `json:"toTime,omitempty"`
Version int8 `json:"version"`
Lang string `json:"lang"`
ETLSinks []ETLSink `json:"sinks"`
}
type ETLSchedule struct {
Type string `json:"type"`
}
type ETLSink struct {
AccessKeyId string `json:"accessKeyId"`
AccessKeySecret string `json:"accessKeySecret"`
Endpoint string `json:"endpoint"`
Logstore string `json:"logstore"`
Name string `json:"name"`
Project string `json:"project"`
RoleArn string `json:"roleArn,omitempty"`
Type string `json:"type,omitempty"`
DataSets []string `json:"datasets,omitempty"`
}
type ListETLResponse struct {
Total int `json:"total"`
Count int `json:"count"`
Results []*ETL `json:"results"`
}
func NewETL(endpoint, accessKeyId, accessKeySecret, logstore, name, project string) ETL {
sink := ETLSink{
AccessKeyId: accessKeyId,
AccessKeySecret: accessKeySecret,
Endpoint: endpoint,
Logstore: logstore,
Name: name,
Project: project,
Type: ETLSinksType,
}
config := ETLConfiguration{
AccessKeyId: accessKeyId,
AccessKeySecret: accessKeySecret,
FromTime: time.Now().Unix(),
Script: "e_set('new','aliyun')",
Version: ETLVersion,
Logstore: logstore,
ETLSinks: []ETLSink{sink},
Parameters: map[string]string{},
}
schedule := ETLSchedule{
Type: "Resident",
}
etljob := ETL{
Configuration: config,
DisplayName: "displayname",
Description: "go sdk case",
Name: name,
Schedule: schedule,
Type: ETLType,
}
return etljob
}
func (c *Client) CreateETL(project string, etljob ETL) error {
body, err := json.Marshal(etljob)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
}
uri := "/jobs"
r, err := c.request(project, "POST", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}
func (c *Client) GetETL(project string, etlName string) (ETLJob *ETL, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
uri := "/jobs/" + etlName
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return nil, err
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, readResponseError(err)
}
etlJob := &ETL{}
if err = json.Unmarshal(buf, etlJob); err != nil {
err = NewClientError(err)
}
return etlJob, nil
}
func (c *Client) UpdateETL(project string, etljob ETL) error {
body, err := json.Marshal(etljob)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
}
uri := "/jobs/" + etljob.Name
r, err := c.request(project, "PUT", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}
func (c *Client) DeleteETL(project string, etlName string) error {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
uri := "/jobs/" + etlName
r, err := c.request(project, "DELETE", uri, h, nil)
if err != nil {
return err
}
r.Body.Close()
return nil
}
func (c *Client) ListETL(project string, offset int, size int) (*ListETLResponse, error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
uri := fmt.Sprintf("/jobs?offset=%d&size=%d&jobType=ETL", offset, size)
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return nil, err
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, readResponseError(err)
}
listETLResponse := &ListETLResponse{}
if err = json.Unmarshal(buf, listETLResponse); err != nil {
err = NewClientError(err)
}
return listETLResponse, err
}
func (c *Client) StartETL(project, name string) error {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
uri := fmt.Sprintf("/jobs/%s?action=START", name)
r, err := c.request(project, "PUT", uri, h, nil)
if err != nil {
return err
}
r.Body.Close()
return nil
}
func (c *Client) StopETL(project, name string) error {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
uri := fmt.Sprintf("/jobs/%s?action=STOP", name)
r, err := c.request(project, "PUT", uri, h, nil)
if err != nil {
return err
}
r.Body.Close()
return nil
}
func (c *Client) RestartETL(project string, etljob ETL) error {
body, err := json.Marshal(etljob)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
}
uri := fmt.Sprintf("/jobs/%s?action=RESTART", etljob.Name)
r, err := c.request(project, "PUT", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}