in pulsaradmin/pkg/admin/sinks.go [307:369]
// UpdateSinkWithURL builds a multipart form describing a sink update and
// submits it through s.pulsar.Client.PutWithMultiPart.
//
// The form carries, in this order:
//   - "url":           the raw bytes of pkgURL
//   - "sinkConfig":    config encoded as JSON
//   - "updateOptions": updateOptions encoded as JSON (omitted when nil)
//
// The request endpoint is derived from config.Tenant, config.Namespace and
// config.Name, so config must be non-nil. The first error encountered while
// assembling the form or sending the request is returned unchanged.
func (s *sinks) UpdateSinkWithURL(config *utils.SinkConfig, pkgURL string, updateOptions *utils.UpdateOptions) error {
	endpoint := s.pulsar.endpoint(s.basePath, config.Tenant, config.Namespace, config.Name)

	// The whole request body is assembled in memory before it is sent.
	body := new(bytes.Buffer)
	form := multipart.NewWriter(body)

	// Part 1: the package URL.
	urlPart, err := s.createTextFromFiled(form, "url")
	if err != nil {
		return err
	}
	if _, err = urlPart.Write([]byte(pkgURL)); err != nil {
		return err
	}

	// Part 2: the sink configuration as JSON.
	configJSON, err := json.Marshal(config)
	if err != nil {
		return err
	}
	configPart, err := s.createStringFromField(form, "sinkConfig")
	if err != nil {
		return err
	}
	if _, err = configPart.Write(configJSON); err != nil {
		return err
	}

	// Part 3 (optional): the update options as JSON.
	if updateOptions != nil {
		optionsJSON, err := json.Marshal(updateOptions)
		if err != nil {
			return err
		}
		optionsPart, err := s.createStringFromField(form, "updateOptions")
		if err != nil {
			return err
		}
		if _, err = optionsPart.Write(optionsJSON); err != nil {
			return err
		}
	}

	// Closing the writer emits the terminating boundary; the body is not a
	// complete multipart message until this succeeds.
	if err = form.Close(); err != nil {
		return err
	}

	return s.pulsar.Client.PutWithMultiPart(endpoint, body, form.FormDataContentType())
}