datahub/topic.go (103 lines of code) (raw):
package datahub
import (
"encoding/json"
"fmt"
)
type Field struct {
Name string `json:"name"`
Type FieldType `json:"type"`
AllowNull bool `json:"notnull"`
Comment string `json:"comment"`
}
// RecordSchema
type RecordSchema struct {
Fields []Field `json:"fields"`
fieldIndexMap map[string]int `json:"-"`
}
// NewRecordSchema create a new record schema for tuple record
func NewRecordSchema() *RecordSchema {
return &RecordSchema{
Fields: make([]Field, 0),
fieldIndexMap: make(map[string]int),
}
}
func NewRecordSchemaFromJson(SchemaJson string) (recordSchema *RecordSchema, err error) {
recordSchema = &RecordSchema{}
if err = json.Unmarshal([]byte(SchemaJson), recordSchema); err != nil {
return
}
for _, v := range recordSchema.Fields {
if !validateFieldType(v.Type) {
panic(fmt.Sprintf("field type %q illegal", v.Type))
}
}
return
}
func (rs *RecordSchema) UnmarshalJSON(data []byte) error {
schema := &struct {
Fields []Field `json:"fields"`
}{}
if err := json.Unmarshal(data, schema); err != nil {
return err
}
rs.fieldIndexMap = make(map[string]int)
for _, v := range schema.Fields {
rs.AddField(v)
}
return nil
}
func (rs *RecordSchema) String() string {
type FieldHelper struct {
Name string `json:"name"`
Type FieldType `json:"type"`
NotNull bool `json:"notnull,omitempty"`
Comment string `json:"comment,omitempty"`
}
fields := make([]FieldHelper, 0, rs.Size())
for _, field := range rs.Fields {
tmpField := FieldHelper{field.Name, field.Type, !field.AllowNull, field.Comment}
fields = append(fields, tmpField)
}
tmpSchema := struct {
Fields []FieldHelper `json:"fields"`
}{fields}
buf, _ := json.Marshal(tmpSchema)
return string(buf)
}
// AddField add a field
func (rs *RecordSchema) AddField(f Field) *RecordSchema {
if !validateFieldType(f.Type) {
panic(fmt.Sprintf("field type %q illegal", f.Type))
}
for _, v := range rs.Fields {
if v.Name == f.Name {
panic(fmt.Sprintf("field %q duplicated", f.Name))
}
}
rs.Fields = append(rs.Fields, f)
rs.fieldIndexMap[f.Name] = len(rs.Fields) - 1
return rs
}
// GetFieldIndex get index of given field
func (rs *RecordSchema) GetFieldIndex(fname string) int {
if idx, ok := rs.fieldIndexMap[fname]; ok {
return idx
}
return -1
}
func (rs *RecordSchema) GetFieldByIndex(idx int) (*Field, error) {
if idx < 0 || idx >= len(rs.Fields) {
return nil, fmt.Errorf("invalid Filed index %d", idx)
}
return &rs.Fields[idx], nil
}
func (rs *RecordSchema) GetFieldByName(fname string) (*Field, error) {
idx := rs.GetFieldIndex(fname)
if idx == -1 {
return nil, fmt.Errorf("field %s not exists", fname)
}
return rs.GetFieldByIndex(idx)
}
// Size get record schema fields size
func (rs *RecordSchema) Size() int {
return len(rs.Fields)
}
type RecordSchemaInfo struct {
VersionId int `json:"VersionId"`
RecordSchema RecordSchema `json:"RecordSchema"`
}