example/sink/oss_sink_sample.go (134 lines of code) (raw):
package main
import (
"encoding/json"
"fmt"
sls "github.com/aliyun/aliyun-log-go-sdk"
"time"
)
const (
endpoint = "your endpoint" // https://help.aliyun.com/document_detail/29008.html
accessKeyId = "your akId"
accessKeySecret = "your akSecret"
securityToken = ""
projectName = "your project name"
logStore = "your logstore name"
roleArn = "your roleArn"
jobName = "your job name"
bucket = "your bucket"
compressType = sls.OSSCompressionTypeNone
contentType = sls.OSSContentDetailTypeORC
)
func main() {
// create the client with ak and endpoint
client := sls.CreateNormalInterface(endpoint, accessKeyId, accessKeySecret, securityToken)
// create the oss sink export job
if err := client.CreateExport(projectName, getOssExport(contentType, compressType)); err != nil {
fmt.Println(err)
}
// get the export job
if getExport, err := client.GetExport(projectName, jobName); err != nil {
fmt.Println(err)
} else {
detail, _ := json.Marshal(getExport)
fmt.Println(string(detail))
}
// list the jobs under the logStore
if exports, total, count, err := client.ListExport(projectName, logStore, "", "", 0, 10); err != nil {
fmt.Println(err)
} else {
detail, _ := json.Marshal(exports)
fmt.Println(string(detail))
fmt.Println(total)
fmt.Println(count)
}
//client.UpdateExport(projectName, getOssExport(contentType, compressType))
//client.DeleteExport(projectName, jobName)
}
func getOssExport(contentType sls.OSSContentType, compressionType sls.OSSCompressionType) *sls.Export {
timeUnix := time.Now().Unix()
return &sls.Export{
ScheduledJob: sls.ScheduledJob{
BaseJob: sls.BaseJob{
Name: jobName,
DisplayName: jobName,
Description: "",
Type: sls.EXPORT_JOB,
},
Schedule: &sls.Schedule{
Type: "Resident",
},
},
ExportConfiguration: &sls.ExportConfiguration{
FromTime: timeUnix - 3600,
ToTime: 0,
LogStore: logStore,
Parameters: make(map[string]string),
RoleArn: roleArn,
Version: sls.ExportVersion2,
DataSink: &sls.AliyunOSSSink{
Type: sls.DataSinkOSS,
RoleArn: roleArn,
Bucket: bucket,
Prefix: "",
Suffix: "",
PathFormat: "%Y/%m/%d/%H/%M",
PathFormatType: "time",
BufferSize: 256,
BufferInterval: 300,
TimeZone: "+0800",
ContentType: contentType,
CompressionType: compressionType,
ContentDetail: getContentDetail(contentType),
},
},
}
}
func getContentDetail(contentType sls.OSSContentType) interface{} {
if contentType == sls.OSSContentDetailTypeCSV {
// default csvDetail ,you can replace the parameters
csvDetail := sls.CsvContentDetail{
ColumnNames: append(make([]string, 0), "k1", "k2"), // column key
Delimiter: ",", // you can set " " , "|" , "," and "\t"
Quote: "\"", // you can set "'" (single quote) , "\"" (double quote) and ""
Escape: "\"",
Null: "",
Header: true,
LineFeed: "\n",
}
return csvDetail
}
if contentType == sls.OSSContentDetailTypeJSON {
// default enableTag
jsonDetail := sls.JsonContentDetail{
EnableTag: true,
}
return jsonDetail
}
if contentType == "parquet" {
parquetDetail := sls.ParquetContentDetail{
Columns: []sls.Column{
{Name: "newline", Type: "string"},
{Name: "chinese", Type: "string"},
{Name: "special characters", Type: "string"},
{Name: "normal", Type: "string"},
{Name: "user_escape", Type: "string"},
{Name: "int32_field", Type: "int32"},
{Name: "int64_field", Type: "int64"},
{Name: "boolean_field", Type: "boolean"},
{Name: "float_field", Type: "float"},
{Name: "double_field", Type: "double"},
},
}
return parquetDetail
}
if contentType == "orc" {
orcDetail := sls.OrcContentDetail{
Columns: []sls.Column{
{Name: "newline", Type: "string"},
{Name: "chinese", Type: "string"},
{Name: "special characters", Type: "string"},
{Name: "normal", Type: "string"},
{Name: "user_escape", Type: "string"},
{Name: "int32_field", Type: "int32"},
{Name: "int64_field", Type: "int64"},
{Name: "boolean_field", Type: "boolean"},
{Name: "float_field", Type: "float"},
{Name: "double_field", Type: "double"},
},
}
return orcDetail
}
return nil
}