pipe/local.go (104 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package pipe
import (
"database/sql"
"fmt"
"sync"
"github.com/uber/storagetapper/config"
"golang.org/x/net/context"
//"context"
)
//localPipe pipe based on channels
type localPipe struct {
mutex sync.Mutex
ch map[string](chan interface{})
cfg config.PipeConfig
}
//localProducerConsumer implements both producer and consumer
type localProducerConsumer struct {
baseConsumer
ch chan interface{}
}
func init() {
registerPlugin("local", initLocalPipe)
}
func initLocalPipe(cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
return &localPipe{ch: make(map[string](chan interface{})), cfg: *cfg}, nil
}
//Type returns type of the type
func (p *localPipe) Type() string {
return "local"
}
// Config returns pipe configuration
func (p *localPipe) Config() *config.PipeConfig {
return &p.cfg
}
// Close releases resources associated with the pipe
func (p *localPipe) Close() error {
return nil
}
func (p *localPipe) registerProducerConsumer(key string, consumer bool) (*localProducerConsumer, error) {
p.mutex.Lock()
ch := p.ch[key]
if ch == nil {
ch = make(chan interface{}, p.cfg.MaxBatchSize)
p.ch[key] = ch
}
p.mutex.Unlock()
l := &localProducerConsumer{ch: ch}
if consumer {
l.initBaseConsumer(l.fetchNext)
} else {
l.ctx, l.cancel = context.WithCancel(context.Background())
}
return l, nil
}
//NewConsumer registers consumer with the given pipe name
func (p *localPipe) NewConsumer(key string) (Consumer, error) {
return p.registerProducerConsumer(key, true)
}
//NewProducer registers producer with the given pipe name
func (p *localPipe) NewProducer(key string) (Producer, error) {
return p.registerProducerConsumer(key, false)
}
func (p *localProducerConsumer) pushLow(b interface{}) error {
select {
case p.ch <- b:
return nil
case <-p.ctx.Done():
}
return fmt.Errorf("context canceled")
}
//Push pushes the given message to pipe
func (p *localProducerConsumer) Push(data interface{}) error {
return p.pushLow(data)
}
//PushBatch stashes the given message in pipe buffer, all stashed messages will
//be send by subsequent PushBatchCommit call
func (p *localProducerConsumer) PushBatch(key string, data interface{}) error {
return p.pushLow(data)
}
//PushBatchCommit sends all stashed messages to pipe
func (p *localProducerConsumer) PushBatchCommit() error {
//TODO: Drain the channel here
return nil
}
func (p *localProducerConsumer) PushSchema(key string, data []byte) error {
return p.PushBatch(key, data)
}
/*
func (p *localProducerConsumer) FetchNext() (interface{}, error) {
return p.fetchNext()
}
*/
func (p *localProducerConsumer) fetchNext() (interface{}, error) {
select {
case msg := <-p.ch:
return msg, nil
case <-p.ctx.Done():
}
return nil, nil
}
//PushK pushes keyed message to pipe
func (p *localProducerConsumer) PushK(key string, b interface{}) error {
return p.Push(b)
}
//Close producer/consumer
func (p *localProducerConsumer) close(graceful bool) error {
p.cancel()
p.wg.Wait()
return nil
}
func (p *localProducerConsumer) Close() error {
return p.close(true)
}
func (p *localProducerConsumer) CloseOnFailure() error {
return p.close(false)
}
//SaveOffset is not applicable for local pipe
func (p *localProducerConsumer) SaveOffset() error {
return nil
}
//SetFormat specifies format, which pipe can pass down the stack
func (p *localProducerConsumer) SetFormat(format string) {
}
func (p *localProducerConsumer) PartitionKey(_ string, key string) string {
return key
}