example/ingest_processor/producer_with_processor/main.go (96 lines of code) (raw):
package main
import (
"fmt"
"math/rand"
"os"
"os/signal"
"strconv"
"sync"
"time"
"github.com/aliyun/aliyun-log-go-sdk/producer"
)
// variables you should fill
//
// sample processor spl:
// * | parse-regexp content, '(\S+)\s-\s(\S+)\s\[(\S+)\]\s"(\S+)\s(\S+)\s(\S+)"\s(\d+)\s(\d+)\s(\d+)\s(\d+)\s(\S+)\s(\S+)\s"(.*)"' as remote_addr, remote_user, time_local, request_method, request_uri, http_protocol, request_time, request_length, status, body_bytes_sent, host, referer, user_agent | project-away content
var (
accessKeyId = os.Getenv("ACCESS_KEY_ID")
accessKeySecret = os.Getenv("ACCESS_KEY_SECRET")
endpoint = ""
project = ""
logstore = ""
processor = ""
)
// mock data config
var (
remoteUsers = []string{"Alice", "Bob", "Candy", "David", "Elisa"}
requestMethods = []string{"GET", "POST", "PUT", "DELETE", "HEAD"}
statuses = []string{"200", "301", "302", "400", "401", "403", "500", "501", "502"}
httpProtocol = "HTTP/1.1"
userAgent = "Mozilla/5.0 (Windows NT 5.2; WOW64) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/13.0.782.41 Safari/535.1"
)
func mockNginxLog() string {
var (
remoteAddr = fmt.Sprintf("192.168.1.%d", rand.Intn(100))
remoteUser = remoteUsers[rand.Intn(len(remoteUsers))]
timeLocal = time.Now().Format(time.RFC3339)
requestMethod = requestMethods[rand.Intn(len(requestMethods))]
requestUri = fmt.Sprintf("/request/path-%d/file-%d", rand.Intn(10), rand.Intn(10))
requestTime = strconv.Itoa(rand.Intn(1000))
requestLength = strconv.Itoa(rand.Intn(100000))
status = statuses[rand.Intn(len(statuses))]
bodyBytesSent = strconv.Itoa(rand.Intn(100000))
host = fmt.Sprintf("www.test%d.com", rand.Intn(10))
referer = fmt.Sprintf("www.test%d.com", rand.Intn(10))
)
content := fmt.Sprintf(
`%s - %s [%s] "%s %s %s" %s %s %s %s %s %s "%s"`,
remoteAddr,
remoteUser,
timeLocal,
requestMethod,
requestUri,
httpProtocol,
requestTime,
requestLength,
status,
bodyBytesSent,
host,
referer,
userAgent,
)
return content
}
func main() {
config := producer.GetDefaultProducerConfig()
config.Endpoint = endpoint
config.AccessKeyID = accessKeyId
config.AccessKeySecret = accessKeySecret
config.GeneratePackId = true
config.Processor = processor
producerInstance, err := producer.NewProducer(config)
if err != nil {
panic(err)
}
producerInstance.Start()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
log := producer.GenerateLog(
uint32(time.Now().Unix()),
map[string]string{"content": mockNginxLog()},
)
err := producerInstance.SendLog(project, logstore, "producer", "", log)
if err != nil {
fmt.Println(err)
}
}
}()
}
wg.Wait()
fmt.Println("Send completion")
term := make(chan os.Signal)
signal.Notify(term, os.Kill, os.Interrupt)
if _, ok := <-term; ok {
fmt.Println("Get the shutdown signal and start to shut down")
producerInstance.Close(60000)
}
}