pulsaradmin/pkg/admin/functions.go (463 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"bytes"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/textproto"
"os"
"path/filepath"
"strings"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
// Functions is the admin interface for managing Pulsar Functions.
type Functions interface {
	// CreateFunc creates a new function from the given configuration.
	// fileName is the local function package sent with the request; it is not
	// uploaded when it is empty or starts with "builtin://".
	CreateFunc(data *utils.FunctionConfig, fileName string) error

	// CreateFuncWithURL creates a new function whose package is referenced by
	// a URL instead of being uploaded from a local file.
	// Supported URL schemes: http/file
	// eg:
	//  File: file:/dir/fileName.jar
	//  Http: http://www.repo.com/fileName.jar
	//
	// @param functionConfig
	//      the function configuration object
	// @param pkgURL
	//      url from which pkg can be downloaded
	CreateFuncWithURL(data *utils.FunctionConfig, pkgURL string) error

	// StopFunction stops all instances of the function.
	StopFunction(tenant, namespace, name string) error

	// StopFunctionWithID stops a single function instance identified by instanceID.
	StopFunctionWithID(tenant, namespace, name string, instanceID int) error

	// DeleteFunction deletes an existing function.
	DeleteFunction(tenant, namespace, name string) error

	// DownloadFunction downloads function code into a local file.
	// @param destinationFile
	//        file where data should be downloaded to
	// @param path
	//        Path where data is located
	DownloadFunction(path, destinationFile string) error

	// DownloadFunctionByNs downloads the code of the named function into a local file.
	// @param destinationFile
	//        file where data should be downloaded to
	// @param tenant
	//        Tenant name
	// @param namespace
	//        Namespace name
	// @param function
	//        Function name
	DownloadFunctionByNs(destinationFile, tenant, namespace, function string) error

	// StartFunction starts all instances of the function.
	StartFunction(tenant, namespace, name string) error

	// StartFunctionWithID starts a single function instance identified by instanceID.
	StartFunctionWithID(tenant, namespace, name string, instanceID int) error

	// RestartFunction restarts all instances of the function.
	RestartFunction(tenant, namespace, name string) error

	// RestartFunctionWithID restarts a single function instance identified by instanceID.
	RestartFunctionWithID(tenant, namespace, name string, instanceID int) error

	// GetFunctions returns the list of functions in the given tenant and namespace.
	GetFunctions(tenant, namespace string) ([]string, error)

	// GetFunction returns the configuration for the specified function.
	GetFunction(tenant, namespace, name string) (utils.FunctionConfig, error)

	// GetFunctionStatus returns the current status of a function.
	GetFunctionStatus(tenant, namespace, name string) (utils.FunctionStatus, error)

	// GetFunctionStatusWithInstanceID returns the current status of a function instance.
	GetFunctionStatusWithInstanceID(tenant, namespace, name string, instanceID int) (
		utils.FunctionInstanceStatusData, error)

	// GetFunctionStats returns the current stats of a function.
	GetFunctionStats(tenant, namespace, name string) (utils.FunctionStats, error)

	// GetFunctionStatsWithInstanceID returns the current stats of a function instance.
	GetFunctionStatsWithInstanceID(tenant, namespace, name string, instanceID int) (utils.FunctionInstanceStatsData, error)

	// GetFunctionState fetches the state stored under key for a Pulsar Function.
	//
	// Response Example:
	// 		{ "value : 12, version : 2"}
	GetFunctionState(tenant, namespace, name, key string) (utils.FunctionState, error)

	// PutFunctionState stores the given state for a Pulsar Function; the state's
	// Key field selects which entry is written.
	PutFunctionState(tenant, namespace, name string, state utils.FunctionState) error

	// TriggerFunction triggers the function by writing to the input topic.
	// Each of topic, triggerValue and triggerFile is optional and is only sent
	// when non-empty; triggerFile names a local file whose content is uploaded.
	// The response is returned as a string.
	TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error)

	// UpdateFunction updates the configuration for a function.
	// fileName is the local function package sent with the request; it is not
	// uploaded when it is empty or starts with "builtin://". updateOptions is
	// only sent when non-nil.
	UpdateFunction(functionConfig *utils.FunctionConfig, fileName string, updateOptions *utils.UpdateOptions) error

	// UpdateFunctionWithURL updates the configuration for a function.
	//
	// Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
	// eg:
	// File: file:/dir/fileName.jar
	// Http: http://www.repo.com/fileName.jar
	UpdateFunctionWithURL(functionConfig *utils.FunctionConfig, pkgURL string, updateOptions *utils.UpdateOptions) error

	// Upload uploads the local file sourceFile to Pulsar, sending path as the
	// "path" form field of the request.
	Upload(sourceFile, path string) error
}
// functions is the Functions implementation; it issues its requests through
// a pulsarClient.
type functions struct {
	// pulsar is the client used to build endpoints and perform requests.
	pulsar *pulsarClient
	// basePath is the first path component of every functions endpoint.
	basePath string
}
// Functions returns a Functions implementation that issues its requests
// through c, rooted at the "/functions" base path.
func (c *pulsarClient) Functions() Functions {
	api := new(functions)
	api.pulsar = c
	api.basePath = "/functions"
	return api
}
// createStringFromField starts a new multipart part named value whose
// Content-Type is application/json and returns the writer for its body.
func (f *functions) createStringFromField(w *multipart.Writer, value string) (io.Writer, error) {
	header := textproto.MIMEHeader{}
	header.Set("Content-Type", "application/json")
	header.Set("Content-Disposition", `form-data; name="`+value+`" `)
	return w.CreatePart(header)
}
// createTextFromFiled starts a new multipart part named value whose
// Content-Type is text/plain and returns the writer for its body.
func (f *functions) createTextFromFiled(w *multipart.Writer, value string) (io.Writer, error) {
	header := textproto.MIMEHeader{}
	header.Set("Content-Type", "text/plain")
	header.Set("Content-Disposition", `form-data; name="`+value+`" `)
	return w.CreatePart(header)
}
// CreateFunc registers a new function. The configuration is sent as the JSON
// "functionConfig" part of a multipart POST; when fileName is non-empty and
// does not start with "builtin://", the file content is attached as the
// "data" part.
func (f *functions) CreateFunc(funcConf *utils.FunctionConfig, fileName string) error {
	endpoint := f.pulsar.endpoint(f.basePath, funcConf.Tenant, funcConf.Namespace, funcConf.Name)

	// The whole multipart request body is assembled in memory.
	body := &bytes.Buffer{}
	form := multipart.NewWriter(body)

	configJSON, err := json.Marshal(funcConf)
	if err != nil {
		return err
	}
	configPart, err := f.createStringFromField(form, "functionConfig")
	if err != nil {
		return err
	}
	if _, err = configPart.Write(configJSON); err != nil {
		return err
	}

	// Built-in function code lives on the server side, so nothing is uploaded for it.
	skipUpload := fileName == "" || strings.HasPrefix(fileName, "builtin://")
	if !skipUpload {
		pkg, err := os.Open(fileName)
		if err != nil {
			return err
		}
		defer pkg.Close()

		dataPart, err := form.CreateFormFile("data", filepath.Base(fileName))
		if err != nil {
			return err
		}
		// Stream the package content into the form field.
		if _, err = io.Copy(dataPart, pkg); err != nil {
			return err
		}
	}

	// Closing the writer emits the terminating boundary.
	if err = form.Close(); err != nil {
		return err
	}

	return f.pulsar.Client.PostWithMultiPart(endpoint, nil, body, form.FormDataContentType())
}
// CreateFuncWithURL registers a new function whose package is referenced by
// pkgURL. The request is a multipart POST carrying the text part "url"
// followed by the JSON part "functionConfig".
func (f *functions) CreateFuncWithURL(funcConf *utils.FunctionConfig, pkgURL string) error {
	endpoint := f.pulsar.endpoint(f.basePath, funcConf.Tenant, funcConf.Namespace, funcConf.Name)

	// The whole multipart request body is assembled in memory.
	body := &bytes.Buffer{}
	form := multipart.NewWriter(body)

	urlPart, err := f.createTextFromFiled(form, "url")
	if err != nil {
		return err
	}
	if _, err = urlPart.Write([]byte(pkgURL)); err != nil {
		return err
	}

	configJSON, err := json.Marshal(funcConf)
	if err != nil {
		return err
	}
	configPart, err := f.createStringFromField(form, "functionConfig")
	if err != nil {
		return err
	}
	if _, err = configPart.Write(configJSON); err != nil {
		return err
	}

	// Closing the writer emits the terminating boundary.
	if err = form.Close(); err != nil {
		return err
	}

	return f.pulsar.Client.PostWithMultiPart(endpoint, nil, body, form.FormDataContentType())
}
// StopFunction posts to the function's "/stop" endpoint.
func (f *functions) StopFunction(tenant, namespace, name string) error {
	stopURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name) + "/stop"
	return f.pulsar.Client.Post(stopURL, nil)
}
// StopFunctionWithID posts to the "/stop" endpoint of the function instance
// identified by instanceID.
func (f *functions) StopFunctionWithID(tenant, namespace, name string, instanceID int) error {
	instance := fmt.Sprint(instanceID)
	stopURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name, instance) + "/stop"
	return f.pulsar.Client.Post(stopURL, nil)
}
// DeleteFunction issues a DELETE request against the function's endpoint.
func (f *functions) DeleteFunction(tenant, namespace, name string) error {
	return f.pulsar.Client.Delete(f.pulsar.endpoint(f.basePath, tenant, namespace, name))
}
// DownloadFunction downloads the function package stored at path into the
// local file destinationFile.
//
// destinationFile must not exist yet: it is created exclusively, and an
// error asking the caller to delete or rename the file is returned when it
// is already present. Any other failure to create the file is returned
// unchanged. If the download fails, the partially written file is removed so
// that the call can be retried.
func (f *functions) DownloadFunction(path, destinationFile string) error {
	endpoint := f.pulsar.endpoint(f.basePath, "download")

	// O_EXCL makes the existence check and the creation a single atomic step,
	// so an existing file is never truncated.
	file, err := os.OpenFile(destinationFile, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
	if err != nil {
		if os.IsExist(err) {
			return fmt.Errorf("file %s already exists, please delete "+
				"the file first or change the file name", destinationFile)
		}
		return err
	}

	params := map[string]string{"path": path}
	_, err = f.pulsar.Client.GetWithOptions(endpoint, nil, params, false, file)

	// Always release the handle; a close error matters only if the download succeeded.
	if closeErr := file.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		// Best-effort cleanup of the file created above; the download error is
		// the one worth reporting.
		_ = os.Remove(destinationFile)
		return err
	}
	return nil
}
// DownloadFunctionByNs downloads the package of the function identified by
// tenant, namespace and function into the local file destinationFile.
//
// destinationFile must not exist yet: it is created exclusively, and an
// error asking the caller to delete or rename the file is returned when it
// is already present. Any other failure to create the file is returned
// unchanged. If the download fails, the partially written file is removed so
// that the call can be retried.
func (f *functions) DownloadFunctionByNs(destinationFile, tenant, namespace, function string) error {
	endpoint := f.pulsar.endpoint(f.basePath, tenant, namespace, function, "download")

	// O_EXCL makes the existence check and the creation a single atomic step,
	// so an existing file is never truncated.
	file, err := os.OpenFile(destinationFile, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
	if err != nil {
		if os.IsExist(err) {
			return fmt.Errorf("file %s already exists, please delete "+
				"the file first or change the file name", destinationFile)
		}
		return err
	}

	_, err = f.pulsar.Client.GetWithOptions(endpoint, nil, nil, false, file)

	// Always release the handle; a close error matters only if the download succeeded.
	if closeErr := file.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		// Best-effort cleanup of the file created above; the download error is
		// the one worth reporting.
		_ = os.Remove(destinationFile)
		return err
	}
	return nil
}
// StartFunction posts to the function's "/start" endpoint.
func (f *functions) StartFunction(tenant, namespace, name string) error {
	startURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name) + "/start"
	return f.pulsar.Client.Post(startURL, nil)
}
// StartFunctionWithID posts to the "/start" endpoint of the function instance
// identified by instanceID.
func (f *functions) StartFunctionWithID(tenant, namespace, name string, instanceID int) error {
	instance := fmt.Sprint(instanceID)
	startURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name, instance) + "/start"
	return f.pulsar.Client.Post(startURL, nil)
}
// RestartFunction posts to the function's "/restart" endpoint.
func (f *functions) RestartFunction(tenant, namespace, name string) error {
	restartURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name) + "/restart"
	return f.pulsar.Client.Post(restartURL, nil)
}
// RestartFunctionWithID posts to the "/restart" endpoint of the function
// instance identified by instanceID.
func (f *functions) RestartFunctionWithID(tenant, namespace, name string, instanceID int) error {
	instance := fmt.Sprint(instanceID)
	restartURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name, instance) + "/restart"
	return f.pulsar.Client.Post(restartURL, nil)
}
// GetFunctions fetches the list of function names from the endpoint of the
// given tenant and namespace. The list is returned together with any request
// error.
func (f *functions) GetFunctions(tenant, namespace string) ([]string, error) {
	var names []string
	err := f.pulsar.Client.Get(f.pulsar.endpoint(f.basePath, tenant, namespace), &names)
	return names, err
}
// GetFunction fetches the configuration of the named function. The decoded
// configuration is returned together with any request error.
func (f *functions) GetFunction(tenant, namespace, name string) (utils.FunctionConfig, error) {
	var conf utils.FunctionConfig
	err := f.pulsar.Client.Get(f.pulsar.endpoint(f.basePath, tenant, namespace, name), &conf)
	return conf, err
}
// UpdateFunction updates an existing function through a multipart PUT.
// The request carries the JSON part "functionConfig", the JSON part
// "updateOptions" when updateOptions is non-nil, and the file part "data"
// when fileName is non-empty and does not start with "builtin://".
func (f *functions) UpdateFunction(functionConfig *utils.FunctionConfig, fileName string,
	updateOptions *utils.UpdateOptions) error {
	endpoint := f.pulsar.endpoint(f.basePath, functionConfig.Tenant, functionConfig.Namespace, functionConfig.Name)

	// The whole multipart request body is assembled in memory.
	body := &bytes.Buffer{}
	form := multipart.NewWriter(body)

	configJSON, err := json.Marshal(functionConfig)
	if err != nil {
		return err
	}
	configPart, err := f.createStringFromField(form, "functionConfig")
	if err != nil {
		return err
	}
	if _, err = configPart.Write(configJSON); err != nil {
		return err
	}

	if updateOptions != nil {
		optionsJSON, err := json.Marshal(updateOptions)
		if err != nil {
			return err
		}
		optionsPart, err := f.createStringFromField(form, "updateOptions")
		if err != nil {
			return err
		}
		if _, err = optionsPart.Write(optionsJSON); err != nil {
			return err
		}
	}

	// Built-in function code lives on the server side, so nothing is uploaded for it.
	skipUpload := fileName == "" || strings.HasPrefix(fileName, "builtin://")
	if !skipUpload {
		pkg, err := os.Open(fileName)
		if err != nil {
			return err
		}
		defer pkg.Close()

		dataPart, err := form.CreateFormFile("data", filepath.Base(fileName))
		if err != nil {
			return err
		}
		// Stream the package content into the form field.
		if _, err = io.Copy(dataPart, pkg); err != nil {
			return err
		}
	}

	// Closing the writer emits the terminating boundary.
	if err = form.Close(); err != nil {
		return err
	}

	return f.pulsar.Client.PutWithMultiPart(endpoint, body, form.FormDataContentType())
}
// UpdateFunctionWithURL updates an existing function whose package is
// referenced by pkgURL. The multipart PUT carries the text part "url", the
// JSON part "functionConfig" and, when updateOptions is non-nil, the JSON
// part "updateOptions".
func (f *functions) UpdateFunctionWithURL(functionConfig *utils.FunctionConfig, pkgURL string,
	updateOptions *utils.UpdateOptions) error {
	endpoint := f.pulsar.endpoint(f.basePath, functionConfig.Tenant, functionConfig.Namespace, functionConfig.Name)

	// The whole multipart request body is assembled in memory.
	body := &bytes.Buffer{}
	form := multipart.NewWriter(body)

	urlPart, err := f.createTextFromFiled(form, "url")
	if err != nil {
		return err
	}
	if _, err = urlPart.Write([]byte(pkgURL)); err != nil {
		return err
	}

	configJSON, err := json.Marshal(functionConfig)
	if err != nil {
		return err
	}
	configPart, err := f.createStringFromField(form, "functionConfig")
	if err != nil {
		return err
	}
	if _, err = configPart.Write(configJSON); err != nil {
		return err
	}

	if updateOptions != nil {
		optionsJSON, err := json.Marshal(updateOptions)
		if err != nil {
			return err
		}
		optionsPart, err := f.createStringFromField(form, "updateOptions")
		if err != nil {
			return err
		}
		if _, err = optionsPart.Write(optionsJSON); err != nil {
			return err
		}
	}

	// Closing the writer emits the terminating boundary.
	if err = form.Close(); err != nil {
		return err
	}

	return f.pulsar.Client.PutWithMultiPart(endpoint, body, form.FormDataContentType())
}
// GetFunctionStatus fetches the function's "/status" endpoint. The decoded
// status is returned together with any request error.
func (f *functions) GetFunctionStatus(tenant, namespace, name string) (utils.FunctionStatus, error) {
	var status utils.FunctionStatus
	statusURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name) + "/status"
	err := f.pulsar.Client.Get(statusURL, &status)
	return status, err
}
// GetFunctionStatusWithInstanceID fetches the "/status" endpoint of the
// function instance identified by instanceID. The decoded status is returned
// together with any request error.
func (f *functions) GetFunctionStatusWithInstanceID(tenant, namespace, name string,
	instanceID int) (utils.FunctionInstanceStatusData, error) {
	var status utils.FunctionInstanceStatusData
	instance := fmt.Sprint(instanceID)
	statusURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name, instance) + "/status"
	err := f.pulsar.Client.Get(statusURL, &status)
	return status, err
}
// GetFunctionStats fetches the function's "/stats" endpoint. The decoded
// stats are returned together with any request error.
func (f *functions) GetFunctionStats(tenant, namespace, name string) (utils.FunctionStats, error) {
	var stats utils.FunctionStats
	statsURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name) + "/stats"
	err := f.pulsar.Client.Get(statsURL, &stats)
	return stats, err
}
// GetFunctionStatsWithInstanceID fetches the "/stats" endpoint of the
// function instance identified by instanceID. The decoded stats are returned
// together with any request error.
func (f *functions) GetFunctionStatsWithInstanceID(tenant, namespace, name string,
	instanceID int) (utils.FunctionInstanceStatsData, error) {
	var stats utils.FunctionInstanceStatsData
	instance := fmt.Sprint(instanceID)
	statsURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name, instance) + "/stats"
	err := f.pulsar.Client.Get(statsURL, &stats)
	return stats, err
}
// GetFunctionState fetches the state stored under key from the function's
// "state" endpoint. The decoded state is returned together with any request
// error.
func (f *functions) GetFunctionState(tenant, namespace, name, key string) (utils.FunctionState, error) {
	var state utils.FunctionState
	stateURL := f.pulsar.endpoint(f.basePath, tenant, namespace, name, "state", key)
	err := f.pulsar.Client.Get(stateURL, &state)
	return state, err
}
// PutFunctionState stores state for a function. The state is JSON-encoded
// into the "state" part of a multipart POST sent to the function's "state"
// endpoint for state.Key.
func (f *functions) PutFunctionState(tenant, namespace, name string, state utils.FunctionState) error {
	endpoint := f.pulsar.endpoint(f.basePath, tenant, namespace, name, "state", state.Key)

	// The whole multipart request body is assembled in memory.
	body := &bytes.Buffer{}
	form := multipart.NewWriter(body)

	stateJSON, err := json.Marshal(state)
	if err != nil {
		return err
	}
	statePart, err := f.createStringFromField(form, "state")
	if err != nil {
		return err
	}
	if _, err = statePart.Write(stateJSON); err != nil {
		return err
	}

	// Closing the writer emits the terminating boundary.
	if err = form.Close(); err != nil {
		return err
	}

	return f.pulsar.Client.PostWithMultiPart(endpoint, nil, body, form.FormDataContentType())
}
// TriggerFunction posts a multipart request to the function's "trigger"
// endpoint and returns the response as a string.
//
// Each input is optional and only sent when non-empty: the content of
// triggerFile goes into the file part "dataStream", triggerValue into the
// text part "data", and topic into the text part "topic".
func (f *functions) TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile string) (string, error) {
	endpoint := f.pulsar.endpoint(f.basePath, tenant, namespace, name, "trigger")

	// The whole multipart request body is assembled in memory.
	body := &bytes.Buffer{}
	form := multipart.NewWriter(body)

	if triggerFile != "" {
		input, err := os.Open(triggerFile)
		if err != nil {
			return "", err
		}
		defer input.Close()

		streamPart, err := form.CreateFormFile("dataStream", filepath.Base(triggerFile))
		if err != nil {
			return "", err
		}
		// Stream the file content into the form field.
		if _, err = io.Copy(streamPart, input); err != nil {
			return "", err
		}
	}

	if triggerValue != "" {
		valuePart, err := f.createTextFromFiled(form, "data")
		if err != nil {
			return "", err
		}
		if _, err = valuePart.Write([]byte(triggerValue)); err != nil {
			return "", err
		}
	}

	if topic != "" {
		topicPart, err := f.createTextFromFiled(form, "topic")
		if err != nil {
			return "", err
		}
		if _, err = topicPart.Write([]byte(topic)); err != nil {
			return "", err
		}
	}

	// Closing the writer emits the terminating boundary.
	if err := form.Close(); err != nil {
		return "", err
	}

	var result string
	if err := f.pulsar.Client.PostWithMultiPart(endpoint, &result, body, form.FormDataContentType()); err != nil {
		return "", err
	}
	return result, nil
}
// Upload sends the local file sourceFile to the "upload" endpoint as the
// "data" part of a multipart POST, with path as the "path" form field.
//
// Both sourceFile and path are required: an error is returned, before any
// file or network access, when either is empty or only whitespace. Errors
// from opening or reading the file, building the form, or the request itself
// are returned unchanged.
func (f *functions) Upload(sourceFile, path string) error {
	// Reject the call when EITHER argument is blank, as the message states;
	// an upload without a path is not a valid request.
	if strings.TrimSpace(sourceFile) == "" || strings.TrimSpace(path) == "" {
		return fmt.Errorf("source file or path is empty")
	}

	file, err := os.Open(sourceFile)
	if err != nil {
		return err
	}
	defer file.Close()

	endpoint := f.pulsar.endpoint(f.basePath, "upload")

	var b bytes.Buffer
	w := multipart.NewWriter(&b)

	// Send only the base name, matching the other upload paths in this file
	// and keeping the local directory layout out of the request.
	writer, err := w.CreateFormFile("data", filepath.Base(file.Name()))
	if err != nil {
		return err
	}
	if _, err = io.Copy(writer, file); err != nil {
		return err
	}

	if err = w.WriteField("path", path); err != nil {
		return err
	}

	// Closing the writer emits the terminating boundary.
	if err = w.Close(); err != nil {
		return err
	}

	return f.pulsar.Client.PostWithMultiPart(endpoint, nil, &b, w.FormDataContentType())
}