in pulsaradmin/pkg/admin/functions.go [590:656]
func (f *functions) TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) {
endpoint := f.pulsar.endpoint(f.basePath, tenant, namespace, name, "trigger")
// buffer to store our request as bytes
bodyBuf := bytes.NewBufferString("")
multiPartWriter := multipart.NewWriter(bodyBuf)
if triggerFile != "" {
file, err := os.Open(triggerFile)
if err != nil {
return "", err
}
defer file.Close()
part, err := multiPartWriter.CreateFormFile("dataStream", filepath.Base(file.Name()))
if err != nil {
return "", err
}
// copy the actual file content to the filed's writer
_, err = io.Copy(part, file)
if err != nil {
return "", err
}
}
if triggerValue != "" {
valueWriter, err := f.createTextFromFiled(multiPartWriter, "data")
if err != nil {
return "", err
}
_, err = valueWriter.Write([]byte(triggerValue))
if err != nil {
return "", err
}
}
if topic != "" {
topicWriter, err := f.createTextFromFiled(multiPartWriter, "topic")
if err != nil {
return "", err
}
_, err = topicWriter.Write([]byte(topic))
if err != nil {
return "", err
}
}
// In here, we completed adding the file and the fields, let's close the multipart writer
// So it writes the ending boundary
if err := multiPartWriter.Close(); err != nil {
return "", err
}
contentType := multiPartWriter.FormDataContentType()
var str string
err := f.pulsar.Client.PostWithMultiPart(endpoint, &str, bodyBuf, contentType)
if err != nil {
return "", err
}
return str, nil
}