pkg/eas/types/types.go (367 lines of code) (raw):

package types import ( "context" "encoding/json" "fmt" "io" "math" "strconv" "strings" "time" ) type Tags map[string]string func (t Tags) Validate() error { if len(t) == 0 { return nil } for key := range t { if strings.HasPrefix(key, "_") { return fmt.Errorf("tag key %q contains prefix _ which is reserved", key) } } return nil } func (t Tags) Empty() bool { if t == nil { return true } return len(t) == 0 } func (t Tags) Equals(t1 Tags) bool { if len(t) != len(t1) { return false } for key, val := range t { if t[key] != val { return false } } return true } func (t Tags) Contains(t1 Tags) bool { if len(t) < len(t1) { return false } count := 0 for key, val := range t1 { if t[key] == val { count++ } else { return false } } return count == len(t1) } func (t Tags) Has(key string) bool { _, ok := t[key] return ok } func (t Tags) Set(key string, value string) { t[key] = value } func (t Tags) Get(key string) string { return t[key] } func (t Tags) Diff(t1 Tags) (add Tags, del Tags, update Tags) { handled := make(map[string]bool, len(t)+len(t1)) add, del, update = Tags{}, Tags{}, Tags{} for key, val := range t { handled[key] = true if val1, exist := t1[key]; exist { if val1 != val { update[key] = val1 } } else { del[key] = val } } for key, val := range t1 { if !handled[key] { add[key] = val } } return } func (t Tags) ToJSON() string { data, _ := json.Marshal(t) return string(data) } type DataFrame struct { Data []byte Index Index Tags Tags Message string } func (f *DataFrame) Empty() bool { return len(f.Data) == 0 && len(f.Tags) == 0 && len(f.Message) == 0 } type DataFrameEncoder interface { // Encode encodes DataFrame into bytes. Encode(frame DataFrame, w io.Writer) error // EncodeList attempts to encode batch of DataFrame into bytes. EncodeList(list []DataFrame, w io.Writer) error } type DataFrameDecoder interface { // Decode decodes DataFrame from bytes. Decode([]byte, *DataFrame) error // DecodeList attempts to decode DataFrameList from bytes. DecodeList([]byte) ([]DataFrame, error) } type AttributesEncoder interface { Encode(Attributes Attributes, w io.Writer) error } type AttributesDecoder interface { Decode([]byte, *Attributes) error } // DataFrameCodec helps to encode or decode a DataFrame from or to bytes. type DataFrameCodec interface { MediaType() string DataFrameEncoder DataFrameDecoder } // AttributesCodec helps to encode or decode Attributes from or to bytes. type AttributesCodec interface { MediaType() string AttributesEncoder AttributesDecoder } func LargestIndex(dfs []DataFrame) Index { var max Index for i := range dfs { if dfs[i].Index > max { max = dfs[i].Index } } return max } type Range struct { LeftInclude bool RightInclude bool PositiveInf bool Begin uint64 End uint64 } func ParseRange(input string) (Range, error) { const ( stateBegin = iota stateEnd stateLVal stateRVal stateDelim stateInf ) var ( err error state = stateBegin result = Range{} lval, rval, inf []rune ) // remove all spaces. for i, c := range strings.ReplaceAll(input, " ", "") { switch state { case stateBegin: switch c { case '(': result.LeftInclude = false state = stateLVal case '[': result.LeftInclude = true state = stateLVal default: return result, fmt.Errorf("invalid character '%c' at index %d", c, i) } case stateLVal: switch c { case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9': lval = append(lval, c) case ',': if len(lval) == 0 { return result, fmt.Errorf("malformed range left value") } state = stateDelim default: return result, fmt.Errorf("invalid character '%c' at index %d", c, i) } case stateDelim: result.Begin, err = strconv.ParseUint(string(lval), 10, 64) if err != nil { return result, err } switch c { case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9': rval = append(rval, c) state = stateRVal case '+', 'i', 'I': state = stateInf default: return result, fmt.Errorf("invalid character '%c' at index %d", c, i) } case stateRVal: switch c { case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9': rval = append(rval, c) case ')', ']': state = stateEnd result.End, err = strconv.ParseUint(string(rval), 10, 64) if err != nil { return result, err } switch c { case ']': result.RightInclude = true case ')': result.RightInclude = false } default: return result, fmt.Errorf("invalid character '%c' at index %d", c, i) } case stateInf: switch c { case 'i', 'n', 'f', '+': inf = append(inf, c) case ')': state = stateEnd if string(inf) == "+inf" || string(inf) == "inf" || string(inf) == "+Inf" || string(inf) == "Inf" { result.PositiveInf = true } else { return result, fmt.Errorf("invalid symbol '%s'", string(inf)) } default: return result, fmt.Errorf("invalid character '%c' at index %d", c, i) } case stateEnd: return result, fmt.Errorf("invalid character '%c' at index %d", c, i) } } return result, nil } func (r Range) String() string { var sb strings.Builder li := "(" ri := ")" if r.LeftInclude { li = "[" } if r.RightInclude && !r.PositiveInf { ri = "]" } sb.WriteString(li) sb.WriteString(fmt.Sprintf("%d", r.Begin)) sb.WriteRune(',') if r.PositiveInf { sb.WriteString("+inf") } else { sb.WriteString(fmt.Sprintf("%d", r.End)) } sb.WriteString(ri) return sb.String() } func (r Range) Empty() bool { return r.Begin == 0 && r.End == 0 } // Watcher is the entity following the stream. type Watcher interface { // Watcher is a kind of DataFrameReader. DataFrameReader // Close stops Watcher and closes the FrameChan. Close() } type Attributes map[string]string // const attributes keys the queue service implement must provide. const ( Backend = "meta.backend" MaxPayloadBytes = "meta.maxPayloadBytes" UserIdentifyHeader = "meta.header.userIdentifyHeader" GroupIdentifyHeader = "meta.header.groupIdentifyHeader" StreamLength = "stream.length" StreamApproximateLength = "stream.approxMaxLength" ) var MaxIndex = FromUint64(uint64(math.MaxUint64)) // Interface of QueueService. Core abstraction for streaming framework. type Interface interface { // End normally emits 'EOS' symbol to end up the queue asynchronously, // but if force set to true, stream ends up directly. // Undelivered data will be truncated. End(ctx context.Context, force bool) error // Truncate truncates data before the specific index. Truncate(ctx context.Context, index uint64) error // Put appends new data into stream. Put(ctx context.Context, data []byte, tags Tags) (index uint64, err error) // Get returns data frames from the index of stream in queue. // Param length specifies the expected message count. // And if timeout is set, this call will block until length got satisfied or // timeout timer fires. Get(ctx context.Context, index uint64, length int, timeout time.Duration, tags Tags) (dfs []DataFrame, err error) // Watch subscribe to queue service, when new data frame is appended through Put method, // watcher will emit it through its result channel. // Param index specifies the beginning message index of the watch. // Param window specifies the largest size the Watcher could transfer at one time. Watch(ctx context.Context, index uint64, indexOnly bool, noAck bool, window uint64) (Watcher, error) // Commit commits indices to make the corresponding messages marked as consumed. Commit(ctx context.Context, del bool, indexes ...uint64) error // Del deletes indices to make the corresponding messages deleted from stream. Del(ctx context.Context, indexes ...uint64) error // Attributes reflects self dynamic attributes by K/V pairs. Attributes() Attributes } type DataFrameReader interface { // FrameChan return a DataFrame channel. FrameChan() <-chan DataFrame } // User authenticated information. type User interface { // Uid represents the user id. Uid() string // Gid represents the group id of user. Gid() string // Token represents the access token of the queue service. Token() string } type UserAware interface { // User returns the user info. User() User } type UserWithToken interface { // Token to access the backend service. Token() string } const ( userKey = "__user__" ) // WithUser saves User into context. func WithUser(ctx context.Context, user User) context.Context { return context.WithValue(ctx, userKey, user) } // UserFromContext loads User from context. func UserFromContext(ctx context.Context) (User, bool) { i := ctx.Value(userKey) if u, ok := i.(User); ok { return u, ok } return nil, false } type WorkerStatus string const ( WorkerRunning WorkerStatus = "Running" WorkerStopped WorkerStatus = "Stopped" WorkerError WorkerStatus = "Error" WorkerUnknown WorkerStatus = "Unknown" ) type StreamStatus string const ( StreamOk StreamStatus = "OK" StreamCancel StreamStatus = "Cancel" StreamEnd StreamStatus = "End" ) const ( OffsetEOS Offset = "eos" ) type Offset string func (o Offset) IsInf() bool { low := strings.ToLower(string(o)) return low == "inf" || low == "+inf" } func (o Offset) Uint64() (uint64, bool) { u, err := strconv.ParseUint(string(o), 10, 64) if err != nil { return 0, false } return u, true } func Compare(o1, o2 Offset) (int, error) { uint1, ok1 := o1.Uint64() uint2, ok2 := o2.Uint64() switch { case ok1 && ok2: if uint1 > uint2 { return 1, nil } else if uint1 < uint2 { return -1, nil } else { return 0, nil } case ok1 && !ok2: if o2 == OffsetEOS || o2.IsInf() { return -1, nil } else { return -2, fmt.Errorf("unexpected offset: %v", o2) } case !ok1 && ok2: if o1 == OffsetEOS || o1.IsInf() { return 1, nil } else { return -2, fmt.Errorf("unexpected offset: %v", o1) } default: if o1 == OffsetEOS && o2 == OffsetEOS { return 0, nil } if o1.IsInf() && o2.IsInf() { return 0, nil } return -2, fmt.Errorf("unexpected compare: %s vs %s", o1, o2) } }