pulsaradmin/pkg/admin/sinks.go (298 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"bytes"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/textproto"
"os"
"path/filepath"
"strings"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
// Sinks is admin interface for sinks management
type Sinks interface {
// ListSinks returns the list of all the Pulsar Sinks.
ListSinks(tenant, namespace string) ([]string, error)
// GetSink returns the configuration for the specified sink
GetSink(tenant, namespace, Sink string) (utils.SinkConfig, error)
// CreateSink creates a new sink
CreateSink(config *utils.SinkConfig, fileName string) error
// CreateSinkWithURL creates a new sink by providing url from which fun-pkg can be downloaded. supported url: http/file
CreateSinkWithURL(config *utils.SinkConfig, pkgURL string) error
// UpdateSink updates the configuration for a sink.
UpdateSink(config *utils.SinkConfig, fileName string, options *utils.UpdateOptions) error
// UpdateSinkWithURL updates a sink by providing url from which fun-pkg can be downloaded. supported url: http/file
UpdateSinkWithURL(config *utils.SinkConfig, pkgURL string, options *utils.UpdateOptions) error
// DeleteSink deletes an existing sink
DeleteSink(tenant, namespace, Sink string) error
// GetSinkStatus returns the current status of a sink.
GetSinkStatus(tenant, namespace, Sink string) (utils.SinkStatus, error)
// GetSinkStatusWithID returns the current status of a sink instance.
GetSinkStatusWithID(tenant, namespace, Sink string, id int) (utils.SinkInstanceStatusData, error)
// RestartSink restarts all sink instances
RestartSink(tenant, namespace, Sink string) error
// RestartSinkWithID restarts sink instance
RestartSinkWithID(tenant, namespace, Sink string, id int) error
// StopSink stops all sink instances
StopSink(tenant, namespace, Sink string) error
// StopSinkWithID stops sink instance
StopSinkWithID(tenant, namespace, Sink string, id int) error
// StartSink starts all sink instances
StartSink(tenant, namespace, Sink string) error
// StartSinkWithID starts sink instance
StartSinkWithID(tenant, namespace, Sink string, id int) error
// GetBuiltInSinks fetches a list of supported Pulsar IO sinks currently running in cluster mode
GetBuiltInSinks() ([]*utils.ConnectorDefinition, error)
// ReloadBuiltInSinks reload the available built-in connectors, include Source and Sink
ReloadBuiltInSinks() error
}
type sinks struct {
pulsar *pulsarClient
basePath string
}
// Sinks is used to access the sinks endpoints
func (c *pulsarClient) Sinks() Sinks {
return &sinks{
pulsar: c,
basePath: "/sinks",
}
}
func (s *sinks) createStringFromField(w *multipart.Writer, value string) (io.Writer, error) {
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s" `, value))
h.Set("Content-Type", "application/json")
return w.CreatePart(h)
}
func (s *sinks) createTextFromFiled(w *multipart.Writer, value string) (io.Writer, error) {
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="%s" `, value))
h.Set("Content-Type", "text/plain")
return w.CreatePart(h)
}
func (s *sinks) ListSinks(tenant, namespace string) ([]string, error) {
var sinks []string
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace)
err := s.pulsar.Client.Get(endpoint, &sinks)
return sinks, err
}
func (s *sinks) GetSink(tenant, namespace, sink string) (utils.SinkConfig, error) {
var sinkConfig utils.SinkConfig
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink)
err := s.pulsar.Client.Get(endpoint, &sinkConfig)
return sinkConfig, err
}
func (s *sinks) CreateSink(config *utils.SinkConfig, fileName string) error {
endpoint := s.pulsar.endpoint(s.basePath, config.Tenant, config.Namespace, config.Name)
// buffer to store our request as bytes
bodyBuf := bytes.NewBufferString("")
multiPartWriter := multipart.NewWriter(bodyBuf)
jsonData, err := json.Marshal(config)
if err != nil {
return err
}
stringWriter, err := s.createStringFromField(multiPartWriter, "sinkConfig")
if err != nil {
return err
}
_, err = stringWriter.Write(jsonData)
if err != nil {
return err
}
if fileName != "" && !strings.HasPrefix(fileName, "builtin://") {
// If the function code is built in, we don't need to submit here
file, err := os.Open(fileName)
if err != nil {
return err
}
defer file.Close()
part, err := multiPartWriter.CreateFormFile("data", filepath.Base(file.Name()))
if err != nil {
return err
}
// copy the actual file content to the filed's writer
_, err = io.Copy(part, file)
if err != nil {
return err
}
}
// In here, we completed adding the file and the fields, let's close the multipart writer
// So it writes the ending boundary
if err = multiPartWriter.Close(); err != nil {
return err
}
contentType := multiPartWriter.FormDataContentType()
err = s.pulsar.Client.PostWithMultiPart(endpoint, nil, bodyBuf, contentType)
if err != nil {
return err
}
return nil
}
func (s *sinks) CreateSinkWithURL(config *utils.SinkConfig, pkgURL string) error {
endpoint := s.pulsar.endpoint(s.basePath, config.Tenant, config.Namespace, config.Name)
// buffer to store our request as bytes
bodyBuf := bytes.NewBufferString("")
multiPartWriter := multipart.NewWriter(bodyBuf)
textWriter, err := s.createTextFromFiled(multiPartWriter, "url")
if err != nil {
return err
}
_, err = textWriter.Write([]byte(pkgURL))
if err != nil {
return err
}
jsonData, err := json.Marshal(config)
if err != nil {
return err
}
stringWriter, err := s.createStringFromField(multiPartWriter, "sinkConfig")
if err != nil {
return err
}
_, err = stringWriter.Write(jsonData)
if err != nil {
return err
}
if err = multiPartWriter.Close(); err != nil {
return err
}
contentType := multiPartWriter.FormDataContentType()
err = s.pulsar.Client.PostWithMultiPart(endpoint, nil, bodyBuf, contentType)
if err != nil {
return err
}
return nil
}
func (s *sinks) UpdateSink(config *utils.SinkConfig, fileName string, updateOptions *utils.UpdateOptions) error {
endpoint := s.pulsar.endpoint(s.basePath, config.Tenant, config.Namespace, config.Name)
// buffer to store our request as bytes
bodyBuf := bytes.NewBufferString("")
multiPartWriter := multipart.NewWriter(bodyBuf)
jsonData, err := json.Marshal(config)
if err != nil {
return err
}
stringWriter, err := s.createStringFromField(multiPartWriter, "sinkConfig")
if err != nil {
return err
}
_, err = stringWriter.Write(jsonData)
if err != nil {
return err
}
if updateOptions != nil {
updateData, err := json.Marshal(updateOptions)
if err != nil {
return err
}
updateStrWriter, err := s.createStringFromField(multiPartWriter, "updateOptions")
if err != nil {
return err
}
_, err = updateStrWriter.Write(updateData)
if err != nil {
return err
}
}
if fileName != "" && !strings.HasPrefix(fileName, "builtin://") {
// If the function code is built in, we don't need to submit here
file, err := os.Open(fileName)
if err != nil {
return err
}
defer file.Close()
part, err := multiPartWriter.CreateFormFile("data", filepath.Base(file.Name()))
if err != nil {
return err
}
// copy the actual file content to the filed's writer
_, err = io.Copy(part, file)
if err != nil {
return err
}
}
// In here, we completed adding the file and the fields, let's close the multipart writer
// So it writes the ending boundary
if err = multiPartWriter.Close(); err != nil {
return err
}
contentType := multiPartWriter.FormDataContentType()
err = s.pulsar.Client.PutWithMultiPart(endpoint, bodyBuf, contentType)
if err != nil {
return err
}
return nil
}
func (s *sinks) UpdateSinkWithURL(config *utils.SinkConfig, pkgURL string, updateOptions *utils.UpdateOptions) error {
endpoint := s.pulsar.endpoint(s.basePath, config.Tenant, config.Namespace, config.Name)
// buffer to store our request as bytes
bodyBuf := bytes.NewBufferString("")
multiPartWriter := multipart.NewWriter(bodyBuf)
textWriter, err := s.createTextFromFiled(multiPartWriter, "url")
if err != nil {
return err
}
_, err = textWriter.Write([]byte(pkgURL))
if err != nil {
return err
}
jsonData, err := json.Marshal(config)
if err != nil {
return err
}
stringWriter, err := s.createStringFromField(multiPartWriter, "sinkConfig")
if err != nil {
return err
}
_, err = stringWriter.Write(jsonData)
if err != nil {
return err
}
if updateOptions != nil {
updateData, err := json.Marshal(updateOptions)
if err != nil {
return err
}
updateStrWriter, err := s.createStringFromField(multiPartWriter, "updateOptions")
if err != nil {
return err
}
_, err = updateStrWriter.Write(updateData)
if err != nil {
return err
}
}
// In here, we completed adding the file and the fields, let's close the multipart writer
// So it writes the ending boundary
if err = multiPartWriter.Close(); err != nil {
return err
}
contentType := multiPartWriter.FormDataContentType()
err = s.pulsar.Client.PutWithMultiPart(endpoint, bodyBuf, contentType)
if err != nil {
return err
}
return nil
}
func (s *sinks) DeleteSink(tenant, namespace, sink string) error {
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink)
return s.pulsar.Client.Delete(endpoint)
}
func (s *sinks) GetSinkStatus(tenant, namespace, sink string) (utils.SinkStatus, error) {
var sinkStatus utils.SinkStatus
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink)
err := s.pulsar.Client.Get(endpoint+"/status", &sinkStatus)
return sinkStatus, err
}
func (s *sinks) GetSinkStatusWithID(tenant, namespace, sink string, id int) (utils.SinkInstanceStatusData, error) {
var sinkInstanceStatusData utils.SinkInstanceStatusData
instanceID := fmt.Sprintf("%d", id)
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink, instanceID)
err := s.pulsar.Client.Get(endpoint+"/status", &sinkInstanceStatusData)
return sinkInstanceStatusData, err
}
func (s *sinks) RestartSink(tenant, namespace, sink string) error {
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink)
return s.pulsar.Client.Post(endpoint+"/restart", nil)
}
func (s *sinks) RestartSinkWithID(tenant, namespace, sink string, instanceID int) error {
id := fmt.Sprintf("%d", instanceID)
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink, id)
return s.pulsar.Client.Post(endpoint+"/restart", nil)
}
func (s *sinks) StopSink(tenant, namespace, sink string) error {
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink)
return s.pulsar.Client.Post(endpoint+"/stop", nil)
}
func (s *sinks) StopSinkWithID(tenant, namespace, sink string, instanceID int) error {
id := fmt.Sprintf("%d", instanceID)
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink, id)
return s.pulsar.Client.Post(endpoint+"/stop", nil)
}
func (s *sinks) StartSink(tenant, namespace, sink string) error {
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink)
return s.pulsar.Client.Post(endpoint+"/start", nil)
}
func (s *sinks) StartSinkWithID(tenant, namespace, sink string, instanceID int) error {
id := fmt.Sprintf("%d", instanceID)
endpoint := s.pulsar.endpoint(s.basePath, tenant, namespace, sink, id)
return s.pulsar.Client.Post(endpoint+"/start", nil)
}
func (s *sinks) GetBuiltInSinks() ([]*utils.ConnectorDefinition, error) {
var connectorDefinition []*utils.ConnectorDefinition
endpoint := s.pulsar.endpoint(s.basePath, "builtinsinks")
err := s.pulsar.Client.Get(endpoint, &connectorDefinition)
return connectorDefinition, err
}
func (s *sinks) ReloadBuiltInSinks() error {
endpoint := s.pulsar.endpoint(s.basePath, "reloadBuiltInSinks")
return s.pulsar.Client.Post(endpoint, nil)
}