pipe/pipe.go (113 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package pipe
import (
"context"
"database/sql"
"fmt"
"strings"
"sync"
//"context"
"github.com/uber/storagetapper/config"
)
//Consumer consumer interface for the pipe
type Consumer interface {
Close() error
//CloseOnFailure doesn't save offsets
CloseOnFailure() error
Message() chan interface{}
Error() chan error
FetchNext() (interface{}, error)
//Allows to explicitly persists current consumer position
SaveOffset() error
//SetFormat allow to tell consumer the format of the file when there is no
//header
SetFormat(format string)
}
//Producer producer interface for pipe
type Producer interface {
Push(data interface{}) error
PushK(key string, data interface{}) error
PushSchema(key string, data []byte) error
//PushBatch queues the messages instead of sending immediately
PushBatch(key string, data interface{}) error
//PushCommit writes out all the messages queued by PushBatch
PushBatchCommit() error
Close() error
CloseOnFailure() error
SetFormat(format string)
PartitionKey(source string, key string) string
}
//Pipe connects named producers and consumers
type Pipe interface {
NewConsumer(topic string) (Consumer, error)
NewProducer(topic string) (Producer, error)
Type() string
Config() *config.PipeConfig
Close() error
}
type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error)
//Pipes is the list of registered pipes
//Plugins insert their constructors into this map
var Pipes map[string]constructor
//registerPlugin should be called from plugin's init
func registerPlugin(name string, init constructor) {
if Pipes == nil {
Pipes = make(map[string]constructor)
}
Pipes[name] = init
}
//Create is a pipe factory
//pctx is used to be able to cancel blocking calls inside pipe, like during
//shutdown
func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) {
init := Pipes[strings.ToLower(pipeType)]
if init == nil {
return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType))
}
pipe, err := init(cfg, db)
if err != nil {
return nil, err
}
return pipe, nil
}
type baseConsumer struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
msgCh chan interface{}
errCh chan error
}
type fetchFunc func() (interface{}, error)
func (p *baseConsumer) initBaseConsumer(fn fetchFunc) {
p.ctx, p.cancel = context.WithCancel(context.Background())
p.msgCh = make(chan interface{})
p.errCh = make(chan error)
p.wg.Add(1)
go p.fetchLoop(fn)
}
func (p *baseConsumer) Message() chan interface{} {
return p.msgCh
}
func (p *baseConsumer) Error() chan error {
return p.errCh
}
func (p *baseConsumer) FetchNext() (interface{}, error) {
select {
case msg := <-p.msgCh:
return msg, nil
case err := <-p.errCh:
return nil, err
case <-p.ctx.Done():
}
return nil, nil
}
func (p *baseConsumer) fetchLoop(fn fetchFunc) {
defer p.wg.Done()
for {
msg, err := fn()
if err != nil {
p.sendErr(err)
return
}
if !p.sendMsg(msg) || msg == nil {
return
}
}
}
func (p *baseConsumer) sendMsg(msg interface{}) bool {
select {
case p.msgCh <- msg:
return true
case <-p.ctx.Done():
}
return false
}
func (p *baseConsumer) sendErr(err error) {
select {
case p.errCh <- err:
case <-p.ctx.Done():
}
}