streamer/streamer.go:
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package streamer

import (
"crypto/md5"
"fmt"
"io"
"os"
"time"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/uber/storagetapper/changelog"
"github.com/uber/storagetapper/config"
"github.com/uber/storagetapper/db"
"github.com/uber/storagetapper/encoder"
"github.com/uber/storagetapper/lock"
"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/metrics"
"github.com/uber/storagetapper/pipe"
"github.com/uber/storagetapper/shutdown"
"github.com/uber/storagetapper/state"
"github.com/uber/storagetapper/util"
)

const waitForGtidTimeout = 3600 * time.Second

// Streamer defines the common properties of an event streamer worker
type Streamer struct {
row *state.Row
topic string
inPipe pipe.Pipe
outPipe pipe.Pipe
outEncoder encoder.Encoder
envEncoder encoder.Encoder
log log.Logger
metrics *metrics.Streamer
BytesWritten int64
BytesRead int64
stateUpdateInterval time.Duration
clusterLock lock.Lock
workerID string
}
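
// ensureChangelogReaderStartLow polls the state table once a second until the
// changelog reader has advanced the table's GTID (or its update timestamp)
// past the given values, confirming that it is publishing events.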
func (s *Streamer) ensureChangelogReaderStartLow(prevGTID string, ts time.Time) (string, time.Time, error) {
s.log.WithFields(log.Fields{"prev_gtid": prevGTID}).Debugf("Waiting for Changelog reader to start publishing event")
ticker := time.NewTicker(time.Millisecond * 1000)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tblInfo, err := state.GetTableByID(s.row.ID)
if err != nil {
return "", time.Time{}, err
}
if tblInfo == nil {
return "", time.Time{}, fmt.Errorf("table not found. id=%v", s.row.ID)
}
if tblInfo.Gtid != prevGTID || (prevGTID != "" && ts != tblInfo.GtidUpdatedAt) {
s.log.Debugf("Changelog reader confirmed started from %v, updated at %v", tblInfo.Gtid, tblInfo.GtidUpdatedAt)
return tblInfo.Gtid, tblInfo.GtidUpdatedAt, nil
}
case <-shutdown.InitiatedCh():
return "", time.Time{}, fmt.Errorf("shutdown in progress")
}
}
}

// ensureChangelogReaderStart ensures that the changelog reader worker has
// started publishing to the Kafka buffer for this cluster, in order to avoid
// a gap in the event stream
func (s *Streamer) ensureChangelogReaderStart() (string, time.Time, error) {
//Ensure that the changelog reader has started for this cluster
gtid, ts, err := s.ensureChangelogReaderStartLow("", time.Time{})
if err != nil {
return "", time.Time{}, err
}
//Ensure that the changelog reader sees our possible change of snapshotted_at
//in lockTable
return s.ensureChangelogReaderStartLow(gtid, ts)
}
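
// waitForGtid blocks until the snapshot server's @@global.gtid_executed set
// contains the target GTID, refreshing the table and cluster locks on every
// state-update tick. It returns false on error, lost lock, shutdown, or after
// waitForGtidTimeout.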
func (s *Streamer) waitForGtid(svc string, cluster string, sdb string, inputType string, gtid string) bool {
var current mysql.GTIDSet
s.log.Debugf("Waiting for snapshot server to catch up to: %v", gtid)
target, err := mysql.ParseGTIDSet("mysql", gtid)
if log.EL(s.log, err) {
return false
}
conn, err := db.OpenService(&db.Loc{Service: svc, Cluster: cluster, Name: sdb}, "", inputType)
if log.EL(s.log, err) {
return false
}
defer func() { log.EL(s.log, conn.Close()) }()
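// Three timers: tickCheck paces the GTID polls, tickLock paces lock
// refreshes, and toCh bounds the total wait.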
tickCheck := time.NewTicker(5 * time.Second)
defer tickCheck.Stop()
tickLock := time.NewTicker(s.stateUpdateInterval)
defer tickLock.Stop()
toCh := time.After(waitForGtidTimeout)
for {
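// Reuse the gtid variable to hold the server's currently executed set;
// target still holds the parsed goal from the function argument.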
err = conn.QueryRow("SELECT @@global.gtid_executed").Scan(&gtid)
if log.EL(s.log, err) {
return false
}
current, err = mysql.ParseGTIDSet("mysql", gtid)
if log.EL(s.log, err) {
return false
}
if current.Contain(target) {
break
}
select {
case <-toCh:
s.log.WithFields(log.Fields{"server has": current, "we need": target.String()}).Errorf("Timeout waiting snapshot server to catch up")
return false
case <-tickLock.C:
if !state.RefreshTableLock(s.row.ID, s.workerID) || !s.clusterLock.Refresh() {
s.log.Debugf("Lost the lock while waiting for gtid")
return false
}
s.log.WithFields(log.Fields{"server has": current, "we need": target.String()}).Errorf("Still waiting snapshot server to catch up")
case <-shutdown.InitiatedCh():
return false
case <-tickCheck.C:
}
}
s.log.Debugf("Snapshot server at: %v", current)
return true
}
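
// lockTable reports whether there is work to do for the acquired table row:
// it skips tables that need neither a snapshot nor changelog tailing, takes a
// shared cluster lock when cluster concurrency is limited, and opens the
// output pipe.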
func (s *Streamer) lockTable() bool {
var err error
//Nothing to do if the table is not found, or no snapshot is needed and the
//input doesn't require changelog tailing
if s.row == nil || ((!s.row.NeedSnapshot || s.row.Params.NoSnapshot) && changelog.Plugins[s.row.Input] == nil) {
return false
}
//If cluster concurrency is limited try to get our ticket
cc := s.row.Params.ClusterConcurrency
//MySQL 5.7 lock names are limited to 64 characters, so we hash possibly long
//cluster names here
h := md5.New()
_, err = io.WriteString(h, fmt.Sprintf("%v.%v", s.row.Service, s.row.Cluster))
log.E(err)
if cc != 0 && s.row.NeedSnapshot && !s.clusterLock.TryLockShared(fmt.Sprintf("%x", h.Sum(nil)), cc) {
return false
}
s.outPipe, err = pipe.CacheGet(s.row.Output, &s.row.Params.Pipe, state.GetDB())
return !log.E(err)
}
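
// setupChangelogConsumer creates the consumer for the changelog buffer topic
// (before the snapshot, so no events are missed), confirms that the changelog
// reader has started, and, if a snapshot is pending, waits for the snapshot
// server to catch up to the reader's GTID.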
func (s *Streamer) setupChangelogConsumer(cfg *config.AppConfig) (pipe.Consumer, bool) {
var err error
var consumer pipe.Consumer
if cfg.ChangelogBuffer {
var tn string
tn, err = config.Get().GetChangelogTopicName(s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version, s.row.SnapshottedAt)
if log.EL(s.log, err) {
return nil, false
}
s.log.Debugf("Setting up consumer for buffer topic: %v", tn)
//Consumer MUST be created before snapshotting the table.
//Creating it after may leave a gap in the event stream.
consumer, err = s.inPipe.NewConsumer(tn)
if log.EL(s.log, err) {
return nil, false
}
}
var gtid string
gtid, _, err = s.ensureChangelogReaderStart()
if log.E(err) {
if consumer != nil {
log.E(consumer.CloseOnFailure())
}
return nil, false
}
if s.row.NeedSnapshot && !s.row.Params.NoSnapshot && !s.waitForGtid(s.row.Service, s.row.Cluster, s.row.DB, s.row.Input, gtid) {
if consumer != nil {
log.E(consumer.CloseOnFailure())
}
return nil, false
}
return consumer, true
}
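
// start acquires a table task and drives one full streaming pass: lock the
// table, resolve the output topic, set up the changelog consumer, stream a
// consistent snapshot if one is needed, then stream buffered changelog
// events.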
func (s *Streamer) start(cfg *config.AppConfig) bool {
var err error
s.clusterLock = lock.Create(state.GetDBAddr())
defer s.clusterLock.Close()
h, _ := os.Hostname()
w := log.GenWorkerID()
s.workerID = h + "." + w
s.row, err = state.GetTableTask(s.workerID, cfg.LockExpireTimeout)
if err != nil {
if util.MySQLError(err, mysql.ER_LOCK_WAIT_TIMEOUT) {
return true
}
log.E(err)
return false
}
if !s.lockTable() {
log.Debugf("Finished streamer: No free tables to work on")
return false
}
s.log = log.WithFields(log.Fields{"worker_id": w, "service": s.row.Service, "db": s.row.DB, "table": s.row.Table, "version": s.row.Version})
s.metrics = metrics.NewStreamerMetrics(s.getTag())
s.log.Debugf("Started event streamer")
s.metrics.NumWorkers.Inc()
defer func() {
if err != nil {
s.metrics.Errors.Inc(1)
}
s.metrics.NumWorkers.Dec()
}()
// The event streamer worker has successfully acquired a lock on the table, so
// proceed. Each event streamer handles events from all partitions of the
// input buffer for a single table
s.topic, err = cfg.GetOutputTopicName(s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version, s.row.SnapshottedAt)
if log.E(err) {
return false
}
s.stateUpdateInterval = cfg.StateUpdateInterval
s.log.Debugf("Will be streaming to output topic: %v", s.topic)
consumer, res := s.setupChangelogConsumer(cfg)
if !res {
return false
}
s.outEncoder, err = encoder.Create(s.row.OutputFormat, s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version)
if log.EL(s.log, err) {
if consumer != nil {
log.E(consumer.CloseOnFailure())
}
return false
}
if s.row.NeedSnapshot && !s.row.Params.NoSnapshot && !s.streamFromConsistentSnapshot(cfg.ThrottleTargetMB, cfg.ThrottleTargetIOPS) {
if consumer != nil {
log.E(consumer.CloseOnFailure())
}
return false
}
s.clusterLock.Close() // ClusterConcurrency is limited for the duration of the snapshot only
if cfg.ChangelogBuffer {
//Transit format encoder, aka the envelope encoder.
//It must be created per table to be able to decode schematized events
s.envEncoder, err = encoder.Create(encoder.Internal.Type(), s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version)
if log.EL(s.log, err) {
log.E(consumer.CloseOnFailure())
return false
}
if !s.streamTable(consumer) {
s.metrics.Errors.Inc(1)
}
}
s.log.Debugf("Finished streamer")
return true
}

// Worker creates an event streamer and processes one table task from the
// given input pipe. It returns true when the run succeeded or should be
// retried, and false when there is no work left or an error occurred.
func Worker(cfg *config.AppConfig, inP pipe.Pipe) bool {
s := &Streamer{inPipe: inP}
return s.start(cfg)
}
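
// A minimal sketch of how a worker pool might drive Worker (cfg and
// bufferPipe are illustrative names, not part of this package):
//
//	cfg := config.Get()
//	for streamer.Worker(cfg, bufferPipe) {
//		// Keep pulling table tasks until there is no work left.
//	}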