pkg/flow/streaming/streaming.go (78 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package streaming implement the flow framework to provide the sliding window, top-n aggregation, and etc. package streaming import ( "context" "time" "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/pkg/flow" ) var _ flow.Flow = (*streamingFlow)(nil) type streamingFlow struct { ctx context.Context source flow.Source sink flow.Sink drain chan error ops []flow.Operator } // New returns a new streaming flow. func New(source flow.Source) flow.Flow { return &streamingFlow{ source: source, ops: make([]flow.Operator, 0), drain: make(chan error), } } func (f *streamingFlow) init() error { f.prepareContext() return nil } func (f *streamingFlow) prepareContext() { if f.ctx == nil { f.ctx = context.TODO() } // TODO: add more runtime utilities } func (f *streamingFlow) To(sink flow.Sink) flow.Flow { f.sink = sink return f } func (f *streamingFlow) Close() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() err := f.source.Teardown(ctx) for _, op := range f.ops { err = multierr.Append(err, op.Teardown(ctx)) } defer close(f.drain) return multierr.Append(err, f.sink.Teardown(ctx)) } func (f *streamingFlow) Open() <-chan error { if err := f.init(); err != nil { go f.drainErr(err) return f.drain } // setup sources if err := f.source.Setup(f.ctx); err != nil { go f.drainErr(err) return f.drain } // setup all operators one by one for _, op := range f.ops { if err := op.Setup(f.ctx); err != nil { go f.drainErr(err) return f.drain } } // setup sink if err := f.sink.Setup(f.ctx); err != nil { go f.drainErr(err) return f.drain } // connect all operator and sink for i := len(f.ops) - 1; i >= 0; i-- { last := i == len(f.ops)-1 if last { f.ops[i].Exec(f.sink) } else { f.ops[i].Exec(f.ops[i+1]) } } // finally connect sources and the first operator f.source.Exec(f.ops[0]) return f.drain } func (f *streamingFlow) drainErr(err error) { f.drain <- err }