func main()

in example/ingestion/kafka_ingestion_sample.go [10:62]


// main is a runnable sample that creates a resident Kafka ingestion job for a
// logstore.
//
// It assembles the job metadata, describes the Kafka source, converts that
// description into a generic map (the form assigned to
// IngestionConfiguration.DataSource), and submits the job through
// client.CreateIngestion. Progress and any error are printed to stdout.
// Values marked TODO are placeholders to replace before running.
func main() {
	fmt.Println("create kafka ingestion sample begin")
	logstoreName := util.LogStoreName
	project := util.ProjectName
	client := util.Client

	// Job identity and type.
	base := sls.BaseJob{
		Name:        "ingest-kafka-test-kafka", // TODO
		DisplayName: "test-kafka",              // TODO
		Description: "test-kafka",              // TODO
		Type:        "Ingestion",               // default
	}
	sj := sls.ScheduledJob{
		BaseJob: base,
		Schedule: &sls.Schedule{
			Type: "Resident", // default
		},
	}

	// Kafka connection settings. Communication and NameResolutions are passed
	// as JSON-encoded strings.
	kafkaSource := sls.KafkaSource{
		DataSource:       sls.DataSource{DataSourceType: sls.DataSourceKafka},
		Topics:           "test",                 // TODO test,test1
		BootStrapServers: "123.123.123.123:9092", // TODO
		ValueType:        "json",                 // TODO
		FromPosition:     "lastest",              // TODO; NOTE(review): looks like a typo of "latest" — confirm the value the service expects
		Communication:    "{\"protocol\":\"sasl_ssl\",\"sasl\":{\"password\":\"yyy\",\"mechanism\":\"plain\",\"username\":\"xxx\"}}",
		NameResolutions:  "{\"localhost\":\"127.0.0.1\"}",
		VpcId:            "vpc-asdasdas",
	}

	// Round-trip the typed source through JSON to obtain a generic map keyed
	// by the struct's JSON field names. A failure here must stop the sample:
	// continuing would submit a nil or partially filled data source.
	raw, err := json.Marshal(&kafkaSource)
	if err != nil {
		fmt.Println("marshal kafka source: " + err.Error())
		return
	}
	var source map[string]interface{}
	if err := json.Unmarshal(raw, &source); err != nil {
		fmt.Println("unmarshal kafka source: " + err.Error())
		return
	}

	// Drop entries that decoded as JSON null so the payload carries only
	// populated fields. Deleting from a map while ranging over it is safe in Go.
	for k, v := range source {
		if v == nil {
			delete(source, k)
		}
	}

	ingestion := &sls.Ingestion{
		ScheduledJob: sj,
		IngestionConfiguration: &sls.IngestionConfiguration{
			Version:          "v2.0",
			LogStore:         logstoreName,
			NumberOfInstance: 0,
			DataSource:       source,
		},
	}
	if err := client.CreateIngestion(project, ingestion); err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("create kafka ingestion over")
}