in pulsaradmin/pkg/admin/sources.go [233:306]
func (s *sources) UpdateSource(config *utils.SourceConfig, fileName string, updateOptions *utils.UpdateOptions) error {
endpoint := s.pulsar.endpoint(s.basePath, config.Tenant, config.Namespace, config.Name)
// buffer to store our request as bytes
bodyBuf := bytes.NewBufferString("")
multiPartWriter := multipart.NewWriter(bodyBuf)
jsonData, err := json.Marshal(config)
if err != nil {
return err
}
stringWriter, err := s.createStringFromField(multiPartWriter, "sourceConfig")
if err != nil {
return err
}
_, err = stringWriter.Write(jsonData)
if err != nil {
return err
}
if updateOptions != nil {
updateData, err := json.Marshal(updateOptions)
if err != nil {
return err
}
updateStrWriter, err := s.createStringFromField(multiPartWriter, "updateOptions")
if err != nil {
return err
}
_, err = updateStrWriter.Write(updateData)
if err != nil {
return err
}
}
if fileName != "" && !strings.HasPrefix(fileName, "builtin://") {
// If the function code is built in, we don't need to submit here
file, err := os.Open(fileName)
if err != nil {
return err
}
defer file.Close()
part, err := multiPartWriter.CreateFormFile("data", filepath.Base(file.Name()))
if err != nil {
return err
}
// copy the actual file content to the filed's writer
_, err = io.Copy(part, file)
if err != nil {
return err
}
}
// In here, we completed adding the file and the fields, let's close the multipart writer
// So it writes the ending boundary
if err = multiPartWriter.Close(); err != nil {
return err
}
contentType := multiPartWriter.FormDataContentType()
err = s.pulsar.Client.PutWithMultiPart(endpoint, bodyBuf, contentType)
if err != nil {
return err
}
return nil
}