func()

in pulsaradmin/pkg/admin/functions.go [590:656]


func (f *functions) TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) {
	endpoint := f.pulsar.endpoint(f.basePath, tenant, namespace, name, "trigger")

	// buffer to store our request as bytes
	bodyBuf := bytes.NewBufferString("")

	multiPartWriter := multipart.NewWriter(bodyBuf)

	if triggerFile != "" {
		file, err := os.Open(triggerFile)
		if err != nil {
			return "", err
		}
		defer file.Close()

		part, err := multiPartWriter.CreateFormFile("dataStream", filepath.Base(file.Name()))

		if err != nil {
			return "", err
		}

		// copy the actual file content to the filed's writer
		_, err = io.Copy(part, file)
		if err != nil {
			return "", err
		}
	}

	if triggerValue != "" {
		valueWriter, err := f.createTextFromFiled(multiPartWriter, "data")
		if err != nil {
			return "", err
		}

		_, err = valueWriter.Write([]byte(triggerValue))
		if err != nil {
			return "", err
		}
	}

	if topic != "" {
		topicWriter, err := f.createTextFromFiled(multiPartWriter, "topic")
		if err != nil {
			return "", err
		}

		_, err = topicWriter.Write([]byte(topic))
		if err != nil {
			return "", err
		}
	}

	// In here, we completed adding the file and the fields, let's close the multipart writer
	// So it writes the ending boundary
	if err := multiPartWriter.Close(); err != nil {
		return "", err
	}

	contentType := multiPartWriter.FormDataContentType()
	var str string
	err := f.pulsar.Client.PostWithMultiPart(endpoint, &str, bodyBuf, contentType)
	if err != nil {
		return "", err
	}

	return str, nil
}