datahub/record.go (213 lines of code) (raw):

package datahub import ( "encoding/base64" "encoding/json" "fmt" "reflect" ) // BaseRecord type BaseRecord struct { ShardId string `json:"ShardId,omitempty"` PartitionKey string `json:"PartitionKey,omitempty"` HashKey string `json:"HashKey,omitempty"` SystemTime int64 `json:"SystemTime,omitempty"` Sequence int64 `json:"Sequence,omitempty"` Cursor string `json:"Cursor,omitempty"` NextCursor string `json:"NextCursor,omitempty"` Serial int64 `json:"Serial,omitempty"` Attributes map[string]interface{} `json:"Attributes,omitempty"` } func (br *BaseRecord) GetSystemTime() int64 { return br.SystemTime } func (br *BaseRecord) GetSequence() int64 { return br.Sequence } // SetAttribute set or modify(if exist) attribute func (br *BaseRecord) SetAttribute(key string, val interface{}) { if br.Attributes == nil { br.Attributes = make(map[string]interface{}) } br.Attributes[key] = val } func (br *BaseRecord) GetAttributes() map[string]interface{} { return br.Attributes } // RecordEntry type RecordEntry struct { Data interface{} `json:"Data"` BaseRecord } // IRecord record interface type IRecord interface { fmt.Stringer GetSystemTime() int64 GetSequence() int64 GetData() interface{} FillData(data interface{}) error GetBaseRecord() BaseRecord SetBaseRecord(baseRecord BaseRecord) SetAttribute(key string, val interface{}) GetAttributes() map[string]interface{} } // BlobRecord blob type record type BlobRecord struct { RawData []byte BaseRecord } // NewBlobRecord new a tuple type record from given record schema func NewBlobRecord(bytedata []byte, systemTime int64) *BlobRecord { br := &BlobRecord{} br.RawData = bytedata br.Attributes = make(map[string]interface{}) br.SystemTime = systemTime return br } func (br *BlobRecord) String() string { record := struct { Data string `json:"Data"` Attributes map[string]interface{} `json:"Attributes"` }{ Data: string(br.RawData), Attributes: br.Attributes, } byts, _ := json.Marshal(record) return string(byts) } // FillData implement of IRecord interface func (br *BlobRecord) FillData(data interface{}) error { switch v := data.(type) { case string: bytedata, err := base64.StdEncoding.DecodeString(v) if err != nil { return err } br.RawData = bytedata case []byte: br.RawData = v default: return fmt.Errorf("invalid data type: %s", reflect.TypeOf(data)) } return nil } // GetData implement of IRecord interface func (br *BlobRecord) GetData() interface{} { return br.RawData } // GetBaseRecord get base record enbry func (br *BlobRecord) GetBaseRecord() BaseRecord { return br.BaseRecord } func (br *BlobRecord) SetBaseRecord(baseRecord BaseRecord) { br.BaseRecord = baseRecord } // TupleRecord tuple type record type TupleRecord struct { RecordSchema *RecordSchema Values []DataType BaseRecord } // NewTupleRecord new a tuple type record from given record schema func NewTupleRecord(schema *RecordSchema, systemTime int64) *TupleRecord { tr := &TupleRecord{} if schema != nil { tr.RecordSchema = schema tr.Values = make([]DataType, schema.Size()) } tr.Attributes = make(map[string]interface{}) tr.SystemTime = systemTime for idx := range tr.Values { tr.Values[idx] = nil } return tr } func (tr *TupleRecord) String() string { record := struct { RecordSchema *RecordSchema `json:"RecordSchema"` Values []DataType `json:"Values"` Attributes map[string]interface{} `json:"Attributes"` }{ RecordSchema: tr.RecordSchema, Values: tr.Values, Attributes: tr.Attributes, } byts, _ := json.Marshal(record) return string(byts) } // SetValueByIdx set a value by idx func (tr *TupleRecord) SetValueByIdx(idx int, val interface{}) *TupleRecord { if idx < 0 || idx >= tr.RecordSchema.Size() { panic(fmt.Sprintf("index[%d] out range", idx)) } field := tr.RecordSchema.Fields[idx] if val == nil && !field.AllowNull { panic(fmt.Sprintf("[%s] not allow null", field.Name)) } v, err := validateFieldValue(field.Type, val) if err != nil { panic(err) } tr.Values[idx] = v return tr } // SetValueByName set a value by name func (tr *TupleRecord) SetValueByName(name string, val interface{}) *TupleRecord { idx := tr.RecordSchema.GetFieldIndex(name) return tr.SetValueByIdx(idx, val) } func (tr *TupleRecord) GetValueByIdx(idx int) DataType { return tr.Values[idx] } func (tr *TupleRecord) GetValueByName(name string) DataType { idx := tr.RecordSchema.GetFieldIndex(name) return tr.GetValueByIdx(idx) } func (tr *TupleRecord) GetValues() map[string]DataType { values := make(map[string]DataType) for i, f := range tr.RecordSchema.Fields { values[f.Name] = tr.Values[i] } return values } // SetValues batch set values func (tr *TupleRecord) SetValues(values []DataType) *TupleRecord { if fsize := tr.RecordSchema.Size(); fsize != len(values) { panic(fmt.Sprintf("values size not match field size(field.size=%d, values.size=%d)", fsize, len(values))) } for idx, val := range values { v, err := validateFieldValue(tr.RecordSchema.Fields[idx].Type, val) if err != nil { panic(err) } tr.Values[idx] = v } return tr } // FillData implement of IRecord interface func (tr *TupleRecord) FillData(data interface{}) error { datas, ok := data.([]interface{}) if !ok { return fmt.Errorf("data must be array") } //else if fsize := tr.RecordSchema.Size(); len(datas) != fsize { // return fmt.Errorf("data array size not match field size(field.size=%d, values.size=%d)", fsize, len(datas)) //} for idx, v := range datas { if v != nil { s, ok := v.(string) if !ok { return fmt.Errorf("data value type[%T] illegal", v) } tv, err := castValueFromString(s, tr.RecordSchema.Fields[idx].Type) if err != nil { return err } tr.Values[idx] = tv } } return nil } // GetData implement of IRecord interface func (tr *TupleRecord) GetData() interface{} { result := make([]interface{}, len(tr.Values)) for idx, val := range tr.Values { if val != nil { result[idx] = val.String() } else { result[idx] = nil } } return result } // GetBaseRecord get base record entry func (tr *TupleRecord) GetBaseRecord() BaseRecord { return tr.BaseRecord } func (tr *TupleRecord) SetBaseRecord(baseRecord BaseRecord) { tr.BaseRecord = baseRecord } type FailedRecord struct { Index int `json:"Index"` ErrorCode string `json:"ErrorCode"` ErrorMessage string `json:"ErrorMessage"` }