datahub/types.go (406 lines of code) (raw):

package datahub import ( "encoding/json" "fmt" "math" "strconv" "github.com/shopspring/decimal" ) type DataType interface { fmt.Stringer } // Bigint type Bigint int64 func (bi Bigint) String() string { return strconv.FormatInt(int64(bi), 10) } // String type String string func (str String) String() string { return string(str) } // Boolean type Boolean bool func (bl Boolean) String() string { return strconv.FormatBool(bool(bl)) } // Double type Double float64 func (d Double) String() string { return strconv.FormatFloat(float64(d), 'f', -1, 64) } // Timestamp type Timestamp uint64 func (t Timestamp) String() string { return strconv.FormatUint(uint64(t), 10) } // DECIMAL type Decimal decimal.Decimal func (d Decimal) String() string { return decimal.Decimal(d).String() } type Integer int32 func (i Integer) String() string { return strconv.FormatInt(int64(i), 10) } type Float float32 func (f Float) String() string { return strconv.FormatFloat(float64(f), 'f', -1, 32) } type Tinyint int8 func (ti Tinyint) String() string { return strconv.FormatInt(int64(ti), 10) } type Smallint int16 func (si Smallint) String() string { return strconv.FormatInt(int64(si), 10) } // FieldType type FieldType string func (ft FieldType) String() string { return string(ft) } const ( // BIGINT 8-bit long signed integer, not include (-9223372036854775808) // -9223372036854775807 ~ 9223372036854775807 BIGINT FieldType = "BIGINT" // only support utf-8 // 1Mb max size STRING FieldType = "STRING" // BOOLEAN // True/False,true/false, 0/1 BOOLEAN FieldType = "BOOLEAN" // DOUBLE 8-bit double // -1.0 * 10^308 ~ 1.0 * 10^308 DOUBLE FieldType = "DOUBLE" // TIMESTAMP // unit: us TIMESTAMP FieldType = "TIMESTAMP" // DECIMAL // can "only" represent numbers with a maximum of 2^31 digits after the decimal point. DECIMAL FieldType = "DECIMAL" // 4-byte signed integer INTEGER FieldType = "INTEGER" // Float type FLOAT FieldType = "FLOAT" // 1-byte signed integer TINYINT FieldType = "TINYINT" // 2-byte signed integer SMALLINT FieldType = "SMALLINT" ) // validateFieldType validate field type func validateFieldType(ft FieldType) bool { switch ft { case BIGINT, STRING, BOOLEAN, DOUBLE, TIMESTAMP, DECIMAL, INTEGER, FLOAT, TINYINT, SMALLINT: return true default: return false } } func getIntegerValue(val interface{}) (int64, error) { var realval int64 switch v := val.(type) { case int: realval = int64(v) case int8: realval = int64(v) case int16: realval = int64(v) case int32: realval = int64(v) case int64: realval = int64(v) case uint: realval = int64(v) case uint8: realval = int64(v) case uint16: realval = int64(v) case uint32: realval = int64(v) case Bigint: realval = int64(v) case Integer: realval = int64(v) case Smallint: realval = int64(v) case Tinyint: realval = int64(v) case uint64: if v > 9223372036854775807 { return 0, fmt.Errorf("Integer type field must be in [-9223372036854775807,9223372036854775807]") } realval = int64(v) case json.Number: nval, err := v.Int64() if err != nil { return 0, err } realval = int64(nval) default: return 0, fmt.Errorf("value type[%T] not match field type", val) } return realval, nil } // validateFieldValue validate field value func validateFieldValue(ft FieldType, val interface{}) (DataType, error) { switch ft { case BIGINT: realval, err := getIntegerValue(val) if err != nil { return nil, err } if int64(realval) < -9223372036854775807 || int64(realval) > 9223372036854775807 { return nil, fmt.Errorf("BIGINT type field must be in [-9223372036854775807,9223372036854775807]") } return Bigint(realval), nil case STRING: var realval String switch v := val.(type) { case String: realval = v case string: realval = String(v) default: return nil, fmt.Errorf("value type[%T] not match field type[STRING]", val) } return realval, nil case BOOLEAN: switch v := val.(type) { case Boolean: return v, nil case bool: return Boolean(v), nil default: return nil, fmt.Errorf("value type[%T] not match field type[BOOLEAN]", val) } case DOUBLE: switch v := val.(type) { case Double: return v, nil case float64: return Double(v), nil case json.Number: nval, err := v.Float64() if err != nil { return nil, err } return Double(nval), nil default: return nil, fmt.Errorf("value type[%T] not match field type[DOUBLE]", val) } case TIMESTAMP: var realval Timestamp switch v := val.(type) { case Timestamp: realval = v case uint: realval = Timestamp(v) case uint8: realval = Timestamp(v) case uint16: realval = Timestamp(v) case uint32: realval = Timestamp(v) case uint64: realval = Timestamp(v) case int: if v < 0 { return nil, fmt.Errorf("TIMESTAMP type field must be in positive") } realval = Timestamp(v) case int8: if v < 0 { return nil, fmt.Errorf("TIMESTAMP type field must be in positive") } realval = Timestamp(v) case int16: if v < 0 { return nil, fmt.Errorf("TIMESTAMP type field must be in positive") } realval = Timestamp(v) case int32: if v < 0 { return nil, fmt.Errorf("TIMESTAMP type field must be in positive") } realval = Timestamp(v) case int64: if v < 0 { return nil, fmt.Errorf("TIMESTAMP type field must be in positive") } realval = Timestamp(v) case json.Number: nval, err := v.Int64() if err != nil { return nil, err } if nval < 0 { return nil, fmt.Errorf("TIMESTAMP type field must be in positive") } realval = Timestamp(nval) default: return nil, fmt.Errorf("value type[%T] not match field type[TIMESTAMP]", val) } return realval, nil case DECIMAL: var realval Decimal switch v := val.(type) { case decimal.Decimal: realval = Decimal(v) default: return nil, fmt.Errorf("value type[%T] not match field type[DECIMAL]", val) } return realval, nil case INTEGER: realval, err := getIntegerValue(val) if err != nil { return nil, err } if realval > math.MaxInt32 || realval < math.MinInt32 { return nil, fmt.Errorf("%T exceed the range of INTEGER", val) } return Integer(realval), nil case FLOAT: switch v := val.(type) { case Float: return v, nil case float32: return Float(v), nil case json.Number: nval, err := v.Float64() if err != nil { return nil, err } return Float(nval), nil default: return nil, fmt.Errorf("value type[%T] not match field type[FLOAT]", val) } case TINYINT: realval, err := getIntegerValue(val) if err != nil { return nil, err } if realval > math.MaxInt8 || realval < math.MinInt8 { return nil, fmt.Errorf("%T exceed the range of TINYINT", val) } return Tinyint(realval), nil case SMALLINT: realval, err := getIntegerValue(val) if err != nil { return nil, err } if realval > math.MaxInt16 || realval < math.MinInt16 { return nil, fmt.Errorf("%T exceed the range of TINYINT", val) } return Smallint(realval), nil default: return nil, fmt.Errorf("field type[%T] is not illegal", ft) } } // CastValueFromString cast value from string func castValueFromString(str string, ft FieldType) (DataType, error) { switch ft { case BIGINT: v, err := strconv.ParseInt(str, 10, 64) if err == nil { return Bigint(v), nil } return nil, err case STRING: return String(str), nil case BOOLEAN: v, err := strconv.ParseBool(str) if err == nil { return Boolean(v), nil } return nil, err case DOUBLE: v, err := strconv.ParseFloat(str, 64) if err == nil { return Double(v), nil } return nil, err case TIMESTAMP: v, err := strconv.ParseUint(str, 10, 64) if err == nil { return Timestamp(v), nil } return nil, err case DECIMAL: v, err := decimal.NewFromString(str) if err == nil { return Decimal(v), nil } return nil, err case INTEGER: v, err := strconv.ParseInt(str, 10, 32) if err == nil { return Integer(v), nil } return nil, err case FLOAT: v, err := strconv.ParseFloat(str, 32) if err == nil { return Float(v), nil } return nil, err case TINYINT: v, err := strconv.ParseInt(str, 10, 32) if err == nil { return Tinyint(v), nil } return nil, err case SMALLINT: v, err := strconv.ParseInt(str, 10, 32) if err == nil { return Smallint(v), nil } return nil, err default: return nil, fmt.Errorf("not support field type %s", string(ft)) } } // RecordType type RecordType string func (rt RecordType) String() string { return string(rt) } const ( // BLOB record BLOB RecordType = "BLOB" // TUPLE record TUPLE RecordType = "TUPLE" ) type TopicStatus string func (ts TopicStatus) String() string { return string(ts) } const ( TOPIC_ON TopicStatus = "on" TOPIC_OFF TopicStatus = "off" ) type ExpandMode string func (ft ExpandMode) String() string { return string(ft) } const ( SPLIT_EXTEND ExpandMode = "" ONLY_SPLIT ExpandMode = "split" ONLY_EXTEND ExpandMode = "extend" ) // ShardState type ShardState string func (state ShardState) String() string { return string(state) } const ( // OPENING shard is creating or fail over, not available OPENING ShardState = "OPENING" // ACTIVE is available ACTIVE ShardState = "ACTIVE" // CLOSED read-only CLOSED ShardState = "CLOSED" // CLOSING shard is closing, not available CLOSING ShardState = "CLOSING" ) // CursorType type CursorType string func (ct CursorType) String() string { return string(ct) } const ( // OLDEST OLDEST CursorType = "OLDEST" // LATEST LATEST CursorType = "LATEST" // SYSTEM_TIME point to first record after system_time SYSTEM_TIME CursorType = "SYSTEM_TIME" // SEQUENCE point to the specified sequence SEQUENCE CursorType = "SEQUENCE" ) // SubscriptionType type SubscriptionType int const ( // SUBTYPE_USER SUBTYPE_USER SubscriptionType = iota // SUBTYPE_SYSTEM SUBTYPE_SYSTEM // SUBTYPE_TT SUBTYPE_TT ) func (subType SubscriptionType) Value() int { return int(subType) } // SubscriptionState type SubscriptionState int const ( // SUB_OFFLINE SUB_OFFLINE SubscriptionState = iota // SUB_ONLINE SUB_ONLINE ) func (subState SubscriptionState) Value() int { return int(subState) } type OffsetAction string func (oa OffsetAction) String() string { return string(oa) } //const ( // OPENSESSION OffsetAction = "open" // GETOFFSET OffsetAction = "get" // COMMITOFFSET OffsetAction = "commit" // RESETOFFSET OffsetAction = "reset" //) const ( maxWaitingTimeInMs = 600000 minWaitingTimeInMs = 60000 )