pulsaradmin/pkg/admin/schema.go (156 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package admin import ( "fmt" "strconv" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" ) // Schema is admin interface for schema management type Schema interface { // GetSchemaInfo retrieves the latest schema of a topic GetSchemaInfo(topic string) (*utils.SchemaInfo, error) // GetSchemaInfoWithVersion retrieves the latest schema with version of a topic GetSchemaInfoWithVersion(topic string) (*utils.SchemaInfoWithVersion, error) // GetSchemaInfoByVersion retrieves the schema of a topic at a given <tt>version</tt> GetSchemaInfoByVersion(topic string, version int64) (*utils.SchemaInfo, error) // GetAllSchemas retrieves all schemas of a topic GetAllSchemas(topic string) ([]*utils.SchemaInfoWithVersion, error) // DeleteSchema deletes the schema associated with a given <tt>topic</tt> DeleteSchema(topic string) error // ForceDeleteSchema force deletes the schema associated with a given <tt>topic</tt> ForceDeleteSchema(topic string) error // CreateSchemaByPayload creates a schema for a given <tt>topic</tt> CreateSchemaByPayload(topic string, schemaPayload utils.PostSchemaPayload) error // CreateSchemaBySchemaInfo creates a schema for a given <tt>topic</tt> CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error // GetVersionBySchemaInfo gets the version of a schema GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error) // GetVersionByPayload gets the version of a schema GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error) // TestCompatibilityWithSchemaInfo tests compatibility with a schema TestCompatibilityWithSchemaInfo(topic string, schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error) // TestCompatibilityWithPostSchemaPayload tests compatibility with a schema TestCompatibilityWithPostSchemaPayload(topic string, schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error) } type schemas struct { pulsar *pulsarClient basePath string } // Schemas is used to access the schemas endpoints func (c *pulsarClient) Schemas() Schema { return &schemas{ pulsar: c, basePath: "/schemas", } } func (s *schemas) GetSchemaInfo(topic string) (*utils.SchemaInfo, error) { topicName, err := utils.GetTopicName(topic) if err != nil { return nil, err } var response utils.GetSchemaResponse endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName(), "schema") err = s.pulsar.Client.Get(endpoint, &response) if err != nil { return nil, err } info := utils.ConvertGetSchemaResponseToSchemaInfo(topicName, response) return info, nil } func (s *schemas) GetSchemaInfoWithVersion(topic string) (*utils.SchemaInfoWithVersion, error) { topicName, err := utils.GetTopicName(topic) if err != nil { return nil, err } var response utils.GetSchemaResponse endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName(), "schema") err = s.pulsar.Client.Get(endpoint, &response) if err != nil { fmt.Println("err:", err.Error()) return nil, err } info := utils.ConvertGetSchemaResponseToSchemaInfoWithVersion(topicName, response) return info, nil } func (s *schemas) GetSchemaInfoByVersion(topic string, version int64) (*utils.SchemaInfo, error) { topicName, err := utils.GetTopicName(topic) if err != nil { return nil, err } var response utils.GetSchemaResponse endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName(), "schema", strconv.FormatInt(version, 10)) err = s.pulsar.Client.Get(endpoint, &response) if err != nil { return nil, err } info := utils.ConvertGetSchemaResponseToSchemaInfo(topicName, response) return info, nil } func (s *schemas) GetAllSchemas(topic string) ([]*utils.SchemaInfoWithVersion, error) { topicName, err := utils.GetTopicName(topic) if err != nil { return nil, err } var response utils.GetAllSchemasResponse endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName(), "schemas") err = s.pulsar.Client.Get(endpoint, &response) if err != nil { return nil, err } infos := utils.ConvertGetAllSchemasResponseToSchemaInfosWithVersion(topicName, response) return infos, nil } func (s *schemas) DeleteSchema(topic string) error { return s.delete(topic, false) } func (s *schemas) ForceDeleteSchema(topic string) error { return s.delete(topic, true) } func (s *schemas) delete(topic string, force bool) error { topicName, err := utils.GetTopicName(topic) if err != nil { return err } endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName(), "schema") queryParams := make(map[string]string) queryParams["force"] = strconv.FormatBool(force) return s.pulsar.Client.DeleteWithQueryParams(endpoint, queryParams) } func (s *schemas) CreateSchemaByPayload(topic string, schemaPayload utils.PostSchemaPayload) error { topicName, err := utils.GetTopicName(topic) if err != nil { return err } endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName(), "schema") return s.pulsar.Client.Post(endpoint, &schemaPayload) } func (s *schemas) CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error { schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) return s.CreateSchemaByPayload(topic, schemaPayload) } func (s *schemas) GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error) { schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) return s.GetVersionByPayload(topic, schemaPayload) } func (s *schemas) GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error) { topicName, err := utils.GetTopicName(topic) if err != nil { return 0, err } version := struct { Version int64 `json:"version"` }{} endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName(), "version") err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &version) return version.Version, err } func (s *schemas) TestCompatibilityWithSchemaInfo(topic string, schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error) { schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) return s.TestCompatibilityWithPostSchemaPayload(topic, schemaPayload) } func (s *schemas) TestCompatibilityWithPostSchemaPayload(topic string, schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error) { topicName, err := utils.GetTopicName(topic) if err != nil { return nil, err } var isCompatibility utils.IsCompatibility endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName(), "compatibility") err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &isCompatibility) return &isCompatibility, err }