changelog/mysql.go (832 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package changelog
import (
"bytes"
"fmt"
"math/rand"
"os"
"regexp"
"strings"
"sync"
"time"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/gofrs/uuid"
"github.com/uber/storagetapper/config"
"github.com/uber/storagetapper/db"
"github.com/uber/storagetapper/encoder"
"github.com/uber/storagetapper/log"
"github.com/uber/storagetapper/metrics"
"github.com/uber/storagetapper/pipe"
"github.com/uber/storagetapper/pool"
"github.com/uber/storagetapper/schema"
"github.com/uber/storagetapper/shutdown"
"github.com/uber/storagetapper/state"
"github.com/uber/storagetapper/types"
"github.com/uber/storagetapper/util"
"golang.org/x/net/context" //"context"
)
// SeqnoSaveInterval is the number of messages after which seqno gets persisted in state
const SeqnoSaveInterval = 1000000
const eventsBatchSize = 16
type table struct {
id int64
producer pipe.Producer
rawSchema string
schemaGtid string
service string
encoder encoder.Encoder
output string
outputFormat string
params string
version int
noDeletes bool
dead bool
snapshottedAt time.Time
}
type mysqlReader struct {
gtidSet *mysql.MysqlGTIDSet
seqNo uint64
masterCI *db.Addr
tables map[string]map[string]map[string][]*table
numTables int
bufPipe pipe.Pipe
dbl db.Loc
inputType string
ctx context.Context
log log.Logger
tpool pool.Thread
metrics *metrics.ChangelogReader
batchSize int
curGTID replication.GTIDEvent
workerID string
heartbeatTime time.Time
}
func init() {
registerPlugin("mysql", createMySQLReader)
}
func createMySQLReader(c context.Context, _ *config.AppConfig, bp pipe.Pipe,
tp pool.Thread) (Reader, error) {
return &mysqlReader{ctx: c, tpool: tp, bufPipe: bp, inputType: types.InputMySQL, batchSize: eventsBatchSize, heartbeatTime: time.Now()}, nil
}
type queryHandler struct {
regexp string
handler func(*mysqlReader, *replication.QueryEvent, [][]string) bool
compiled *regexp.Regexp
}
var queryHandlers = []queryHandler{
//Handle all four combination of quotes for alter table: `a`.`b`, `a`.b,
//a.`b`, a.b
{regexp: "(?mis)^\\s*(?:/\\*[^\\*]*\\*/)?\\s*alter\\s+table\\s+(?:([^`][^\\.]*)\\.)?(\\w+)\\s+(.+)", handler: handleAlterTable},
{regexp: "(?mis)^\\s*(?:/\\*[^\\*]*\\*/)?\\s*alter\\s+table\\s+(?:`([^`]+)`\\.)?`([^`]+)`\\s+(.+)", handler: handleAlterTable},
{regexp: "(?mis)^\\s*(?:/\\*[^\\*]*\\*/)?\\s*alter\\s+table\\s+([^`][^\\.]*)\\.`([^`]+)`\\s+(.+)", handler: handleAlterTable},
{regexp: "(?mis)^\\s*(?:/\\*[^\\*]*\\*/)?\\s*alter\\s+table\\s+`([^`]+)`\\.(\\w+)\\s+(.+)", handler: handleAlterTable},
//Handle only fully quoted and fully unquoted cases for rename table: `a`.`b`, a.b
{regexp: "(?mi)(^\\s*(?:/\\*[^\\*]*\\*/)?\\s*rename\\s+table\\s+)|(?:(?:\\s*,\\s*)?(?:([^`][^\\.]*)\\.)?(\\w+)\\s+TO\\s+(?:([^`][^\\.]*)\\.)?(\\w+))", handler: handleRenameTable},
{regexp: "(?mi)(^\\s*rename\\s+table\\s+)|(?:(?:\\s*,\\s*)?(?:`([^`]+)`\\.)?`([^`]+)`\\s+TO\\s+(?:`([^`]+)`\\.)?`([^`]+)`)", handler: handleRenameTable},
}
var thisInstanceCluster string
var injectAlterFailure = false
/*ThisInstanceCluster returns the cluster name name this instance's binlog
* mysqlReader is working on. This is used by local streamers to identify tables they
* have to stream*/
func ThisInstanceCluster() string {
return thisInstanceCluster
}
func getTags(cluster string, input string) map[string]string {
return map[string]string{"cluster": cluster, "input": input}
}
func (b *mysqlReader) binlogFormat() string {
var rf string
masterDB, err := db.Open(b.masterCI)
if log.E(err) {
return ""
}
defer func() { log.EL(b.log, masterDB.Close()) }()
err = masterDB.QueryRow("SELECT @@global.binlog_format").Scan(&rf)
if log.E(err) {
return ""
}
if rf == "" {
log.EL(b.log, fmt.Errorf("invalid (empty) binlog format"))
return ""
}
log.Debugf("Master's binlog format: %s", rf)
return rf
}
func (b *mysqlReader) pushSchema(tver []*table) bool {
seqno := b.nextSeqNo()
if seqno == 0 {
log.Errorf("Failed to generate next seqno. Current seqno:%+v", b.seqNo)
return false
}
buffered := config.Get().ChangelogBuffer
for i := 0; i < len(tver); i++ {
t := tver[i]
bd, err := t.encoder.EncodeSchema(seqno)
if log.EL(b.log, err) {
return false
}
if bd == nil {
continue
}
if buffered && encoder.Internal.Type() != t.encoder.Type() {
bd, err = encoder.WrapEvent(t.outputFormat, "", bd, seqno)
if log.EL(b.log, err) {
return false
}
}
err = t.producer.PushSchema("", bd)
if log.E(err) {
return false
}
log.Debugf("Pushed schema for id=%v, seqno=%v", t.id, seqno)
}
return true
}
func (b *mysqlReader) createProducer(tn string, t *state.Row) (pipe.Producer, error) {
var err error
pipe2 := b.bufPipe
format := encoder.Internal.Type()
if !config.Get().ChangelogBuffer {
pipe2, err = pipe.CacheGet(t.Output, &t.Params.Pipe, state.GetDB())
if err != nil {
b.log.WithFields(log.Fields{"service": t.Service, "db": t.DB, "table": t.Table, "pipe": t.Output}).Errorf("%v", err)
return nil, err
}
format = t.OutputFormat
}
p, err := pipe2.NewProducer(tn)
if err != nil {
return nil, err
}
p.SetFormat(format)
return p, nil
}
func (b *mysqlReader) addNewTable(t *state.Row) bool {
b.log.Infof("Adding table to MySQL binlog reader (%v,%v,%v,%v,%v,%v)", t.Service, t.DB, t.Table, t.Output, t.Version, t.OutputFormat)
enc, err := encoder.Create(t.OutputFormat, t.Service, t.DB, t.Table, t.Input, t.Output, t.Version)
if log.EL(b.log, err) {
return false
}
if !schema.HasPrimaryKey(enc.Schema()) {
b.log.Errorf("Table %v doesn't have a primary key. Won't ingest the table", t.Table)
return true
}
pn, err := config.Get().GetChangelogTopicName(t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, t.SnapshottedAt)
if log.EL(b.log, err) {
return false
}
p, err := b.createProducer(pn, t)
if log.EL(b.log, err) {
return false
}
nt := &table{t.ID, p, t.RawSchema, t.SchemaGtid, t.Service, enc, t.Output, t.OutputFormat, t.ParamsRaw, t.Version, t.Params.NoDeleteOnUpdate, false, t.SnapshottedAt}
if b.tables[t.DB][t.Table][t.Service] == nil {
b.tables[t.DB][t.Table][t.Service] = make([]*table, 0)
}
b.tables[t.DB][t.Table][t.Service] = append(b.tables[t.DB][t.Table][t.Service], nt)
b.log.Infof("New table added to MySQL binlog reader (%v,%v,%v,%v,%v,%v), will produce to: %v", t.Service, t.DB, t.Table, t.Output, t.Version, t.OutputFormat, pn)
return true
}
func (b *mysqlReader) removeDeletedTables() (count uint) {
var s string
for dbn := range b.tables {
for tbn := range b.tables[dbn] {
for svc := range b.tables[dbn][tbn] {
for i := 0; i < len(b.tables[dbn][tbn][svc]); {
t := b.tables[dbn][tbn][svc][i]
if t.dead {
b.log.Infof("Table with id %v removed from binlog reader", t.id)
err := t.producer.Close()
log.EL(b.log, err)
s := b.tables[dbn][tbn][svc]
s[i] = s[len(s)-1]
s[len(s)-1] = nil
b.tables[dbn][tbn][svc] = s[:len(s)-1]
} else {
t.dead = true
count++
s += fmt.Sprintf("%v,", t.id)
i++
}
}
if len(b.tables[dbn][tbn][svc]) == 0 {
delete(b.tables[dbn][tbn], svc)
}
}
if len(b.tables[dbn][tbn]) == 0 {
delete(b.tables[dbn], tbn)
}
}
if len(b.tables[dbn]) == 0 {
delete(b.tables, dbn)
}
}
b.log.Debugf("Working on %v tables with ids: %s", count, s)
return
}
func (b *mysqlReader) closeTableProducers() {
for dbn, sdb := range b.tables {
for tbn, svc := range sdb {
for svcn, tver := range svc {
for i := 0; i < len(tver); i++ {
b.log.Debugf("Closing producer for service=%v, table=%v", tver[i].service, tver[i].id)
log.EL(b.log, tver[i].producer.Close())
tver[i] = nil
}
delete(b.tables[dbn][tbn], svcn)
}
delete(b.tables[dbn], tbn)
}
delete(b.tables, dbn)
}
}
func findInVersionArray(a []*table, output string, version int) int {
j := 0
for ; j < len(a) && (a[j].version != version || a[j].output != output); j++ {
}
return j
}
func (b *mysqlReader) reloadState() bool {
st, err := state.GetCond("cluster=? AND input='mysql'", b.dbl.Cluster)
if err != nil {
b.log.Errorf("Failed to read state, Error: %v", err.Error())
return false
}
b.log.Debugf("reloadState")
for i := 0; i < len(st); i++ {
t := &st[i]
if b.tables[t.DB] == nil {
b.tables[t.DB] = make(map[string]map[string][]*table)
}
if b.seqNo == 0 {
b.seqNo = t.SeqNo
}
if b.tables[t.DB][t.Table] == nil {
b.tables[t.DB][t.Table] = make(map[string][]*table)
}
if b.tables[t.DB][t.Table][t.Service] == nil {
b.tables[t.DB][t.Table][t.Service] = make([]*table, 0)
}
tver := b.tables[t.DB][t.Table][t.Service]
j := findInVersionArray(tver, t.Output, t.Version)
if j < len(tver) {
/* Table was deleted and inserted again. Reinitialize */
if tver[j].id != t.ID {
err = tver[j].producer.Close()
log.EL(b.log, err)
tver[j] = tver[len(tver)-1]
tver[len(tver)-1] = nil
b.tables[t.DB][t.Table][t.Service] = tver[:len(tver)-1]
} else {
tver[j].dead = false
if t.SnapshotTimeChanged(tver[j].snapshottedAt) {
pn, err := config.Get().GetChangelogTopicName(t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, t.SnapshottedAt)
if log.EL(b.log, err) {
return false
}
err = tver[j].producer.Close()
log.EL(b.log, err)
tver[j].producer, err = b.createProducer(pn, t)
if log.EL(b.log, err) {
return false
}
tver[j].snapshottedAt = t.SnapshottedAt
}
}
} else if !b.addNewTable(t) {
return false
}
}
c := b.removeDeletedTables()
b.metrics.NumTablesIngesting.Set(int64(c))
if b.bufPipe.Type() == "local" && b.tpool != nil {
b.tpool.Adjust(c + 1)
}
if c == 0 {
log.Debugf("No tables remaining. Finish binlog reader")
return false
}
b.numTables = int(c)
return true
}
/* Generates next seqno, seqno is used as a logical time in the produced events */
/* Saves seqno in the state every SeqnoSaveInterval */
func (b *mysqlReader) nextSeqNo() uint64 {
b.seqNo++
if b.seqNo%SeqnoSaveInterval == 0 && !b.updateState(false) {
return 0
}
return b.seqNo
}
func (b *mysqlReader) updateState(inc bool) bool {
log.Debugf("Updating state")
if !state.RefreshClusterLock(b.dbl.Cluster, b.workerID) {
return false
}
if !b.reloadState() {
return false
}
/* Skip all seqNo possibly used before restart.*/
if inc {
b.seqNo += SeqnoSaveInterval
}
if log.E(state.SaveBinlogState(b.dbl.Cluster, util.SortedGTIDString(b.gtidSet), b.seqNo)) {
return false
}
if !db.IsValidConn(&b.dbl, db.Slave, b.masterCI, b.inputType) {
return false
}
/*
//Push schema down the stream for all just added tables
for _, d := range b.tables {
for _, t := range d {
if t.justAdded {
log.Debugf("Pushing schema for table: %v", t.id)
if !b.pushSchema(t) {
return false
}
log.Debugf("Pushed schema for table: %v", t.id)
t.justAdded = false
}
}
}
*/
state.EmitRegisteredTablesCount()
b.metrics.NumWorkers.Emit()
log.Debugf("Updating state finished")
return true
}
func gtidToString(v *replication.GTIDEvent) string {
u, err := uuid.FromBytes(v.SID)
s := "error"
if !log.E(err) {
s = u.String()
}
return fmt.Sprintf("%v:%v", s, v.GNO)
}
func (b *mysqlReader) produceRow(tp int, t *table, ts time.Time, row *[]interface{}) error {
var err error
buffered := config.Get().ChangelogBuffer
seqno := b.nextSeqNo()
if seqno == 0 {
return fmt.Errorf("failed to generate next seqno. Current seqno: %+v", b.seqNo)
}
key := encoder.GetRowKey(t.encoder.Schema(), row)
if buffered && b.bufPipe.Type() == "local" {
err = t.producer.PushBatch(key, &types.RowMessage{Type: tp, Key: key, Data: row, SeqNo: seqno, Timestamp: ts})
} else {
var bd []byte
bd, err = t.encoder.Row(tp, row, seqno, ts)
if err != nil {
var masterGTID string
if strings.Contains(err.Error(), "column count mismatch") {
var err1 error
masterGTID, err1 = db.GetCurrentGTID(b.masterCI)
log.E(err1)
}
b.log.WithFields(log.Fields{"state gtid": util.SortedGTIDString(b.gtidSet), "event gtid": gtidToString(&b.curGTID), "master gtid": masterGTID, "table": t.id}).Errorf(err.Error())
return err
}
if !buffered {
key = t.producer.PartitionKey("log", key)
} else if t.encoder.Type() != encoder.Internal.Type() {
bd, err = encoder.WrapEvent(t.outputFormat, key, bd, seqno)
if log.EL(b.log, err) {
return err
}
}
err = t.producer.PushBatch(key, bd)
b.metrics.BytesWritten.Inc(int64(len(bd)))
}
//log.Debugf("Pushed to buffer. seqno=%v, table=%v", seqno, t.id)
if shutdown.Initiated() {
return nil
}
if err != nil {
b.log.Errorf("Type: %v, Error: %v", tp, err.Error())
return err
}
b.metrics.ChangelogRowEventsWritten.Inc(1)
return nil
}
func (b *mysqlReader) handleRowsEventLow(ev *replication.BinlogEvent, t *table) bool {
var err error
re := ev.Event.(*replication.RowsEvent)
ts := time.Unix(int64(ev.Header.Timestamp), 0)
/*
bb := new(bytes.Buffer)
ev.Dump(bb)
fmt.Fprintf(os.Stderr, "Handle rows event %+v", bb.String())
*/
switch ev.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
//TODO: Produce as a batch
for i := 0; i < len(re.Rows) && err == nil; i++ {
err = b.produceRow(types.Insert, t, ts, &re.Rows[i])
}
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
for i := 0; i < len(re.Rows) && err == nil; i++ {
err = b.produceRow(types.Delete, t, ts, &re.Rows[i])
}
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
for i := 0; i < len(re.Rows) && err == nil; i += 2 {
if !t.noDeletes && !strings.HasSuffix(t.outputFormat, "_idempotent") {
err = b.produceRow(types.Delete, t, ts, &re.Rows[i])
}
if err == nil {
err = b.produceRow(types.Insert, t, ts, &re.Rows[i+1])
}
}
default:
err = fmt.Errorf("not supported event type %v", ev.Header.EventType)
}
return !log.E(err)
}
func (b *mysqlReader) handleRowsEvent(ev *replication.BinlogEvent, dbn string, tbn string) bool {
if b.tables[dbn] == nil || b.tables[dbn][tbn] == nil {
return true
}
//Produce event to all outputs and versions of the table
for _, svc := range b.tables[dbn][tbn] {
if svc == nil {
continue
}
for ver := range svc {
if !b.handleRowsEventLow(ev, svc[ver]) {
return false
}
}
}
return true
}
func handleAlterTable(b *mysqlReader, qe *replication.QueryEvent, m [][]string) bool {
//Make sure that we have up to date state before deciding whether this
// alter table is for being ingested table or not
if !b.updateState(false) {
return false
}
dbname := m[0][1]
table := m[0][2]
if dbname == "" {
dbname = util.BytesToString(qe.Schema)
}
d := b.tables[dbname]
if d != nil && d[table] != nil {
svc := d[table]
b.log.WithFields(log.Fields{"db": dbname, "table": table, "alter": m[0][3]}).Debugf("detected alter statement of being ingested table")
b.metrics.ChangelogAlterTableEvents.Inc(1)
if strings.Contains(strings.ToLower(m[0][3]), " foreign key ") {
log.Debugf("Skipping foreign key alter")
return true
}
if injectAlterFailure {
log.Errorf("Exited due to failure injection")
return false
}
for _, tver := range svc {
for i := range tver {
t := tver[i]
newGtid := util.SortedGTIDString(b.gtidSet)
if !schema.MutateTable(state.GetNoDB(), t.service, dbname, table, m[0][3], t.encoder.Schema(), &t.rawSchema) ||
!state.ReplaceSchema(t.service, b.dbl.Cluster, t.encoder.Schema(), t.rawSchema, t.schemaGtid, newGtid, b.inputType, t.output, t.version, t.outputFormat, t.params) {
b.log.WithFields(log.Fields{"db": dbname, "table": table, "alter": m[0][3]}).Warnf("error executing alter table")
return false
}
t.schemaGtid = newGtid
err := t.encoder.UpdateCodec()
if log.EL(b.log, err) {
return false
}
log.Debugf("Updated codec. id=%v", t.id)
}
if !b.pushSchema(tver) {
return false
}
b.metrics.ChangelogQueryEventsWritten.Inc(int64(len(tver)))
}
return true
}
b.log.WithFields(log.Fields{"query": util.BytesToString(qe.Query), "db": util.BytesToString(qe.Schema)}).Debugf("Unhandled query (alter). cluster: " + b.dbl.Cluster)
return true
}
func handleRenameTable(b *mysqlReader, qe *replication.QueryEvent, m [][]string) bool {
s := util.BytesToString(qe.Query)
renameRE := regexp.MustCompile(`(?i)(^\s*(?:/\*[^\*]*\*/)?\s*rename\s+table\s+)`)
r := renameRE.FindAllStringSubmatch(s, -1)
if len(r) == 0 || len(m) < 2 {
b.log.WithFields(log.Fields{"query": util.BytesToString(qe.Query), "db": util.BytesToString(qe.Schema)}).Debugf("Unhandled query (rename). cluster: " + b.dbl.Cluster)
return true
}
for _, t := range m {
dbname := t[4]
table := t[5]
if dbname == "" {
dbname = util.BytesToString(qe.Schema)
}
d := b.tables[dbname]
if d != nil && len(d[table]) > 0 {
svc := d[table]
b.log.WithFields(log.Fields{"db": dbname, "table": table, "rename": t[0]}).Debugf("detected rename statement of being ingested table")
b.metrics.ChangelogAlterTableEvents.Inc(1)
for _, tver := range svc {
newGtid := util.SortedGTIDString(b.gtidSet)
ts, rs := state.PullCurrentSchema(&db.Loc{Service: tver[0].service, Cluster: b.dbl.Cluster, Name: dbname}, table, types.InputMySQL)
if ts == nil {
return false
}
for i := range tver {
t := tver[i]
t.rawSchema = rs
ets := t.encoder.Schema()
ets.DBName = ts.DBName
ets.TableName = ts.TableName
ets.Columns = ts.Columns
if !state.ReplaceSchema(t.service, b.dbl.Cluster, t.encoder.Schema(), t.rawSchema, t.schemaGtid, newGtid, b.inputType, t.output, t.version, t.outputFormat, t.params) {
return false
}
t.schemaGtid = newGtid
err := t.encoder.UpdateCodec()
if log.EL(b.log, err) {
return false
}
log.Debugf("Updated codec. id=%v", t.id)
}
if !b.pushSchema(tver) {
return false
}
b.metrics.ChangelogQueryEventsWritten.Inc(int64(len(tver)))
}
return true
}
}
return true
}
func (b *mysqlReader) handleQueryEvent(ev *replication.BinlogEvent) bool {
qe := ev.Event.(*replication.QueryEvent)
s := util.BytesToString(qe.Query)
if s == "BEGIN" || s == "COMMIT" {
return true
}
if strings.HasPrefix(s, "UPDATE `heartbeat`.`heartbeat`") || strings.HasPrefix(s, "FLUSH ") {
return true
}
b.log.Debugf("handleQueryEvent %+v", s)
matched := false
for _, v := range queryHandlers {
m := v.compiled.FindAllStringSubmatch(s, -1)
if len(m) > 0 {
matched = true
b.log.Debugf("Match result: %q", m)
if !v.handler(b, qe, m) {
return false
}
}
}
if !matched {
b.log.WithFields(log.Fields{"query": s, "db": util.BytesToString(qe.Schema)}).Debugf("Unhandled query. cluster: " + b.dbl.Cluster)
}
return true
}
func (b *mysqlReader) incGTID(v *replication.GTIDEvent) bool {
if b.curGTID.SID == nil {
b.curGTID = *v
return true
}
u, err := uuid.FromBytes(b.curGTID.SID)
if log.E(err) {
return false
}
if s, ok := b.gtidSet.Sets[u.String()]; ok {
l := &s.Intervals[len(s.Intervals)-1]
if l.Stop == b.curGTID.GNO {
l.Stop++
b.curGTID = *v
return true
}
b.log.Infof("non-sequential gtid GNO: %+v %+v", l.Stop, b.curGTID.GNO)
}
gtid := fmt.Sprintf("%s:%d", u.String(), b.curGTID.GNO)
us, err := mysql.ParseUUIDSet(gtid)
if log.E(err) {
return false
}
b.gtidSet.AddSet(us)
b.log.WithFields(log.Fields{"gtid": gtid, "out_gtid_set": util.SortedGTIDString(b.gtidSet)}).Debugf("non-sequential gtid event")
b.curGTID = *v
return true
}
func (b *mysqlReader) handleEvent(ev *replication.BinlogEvent) bool {
if ev.Header.Timestamp != 0 {
b.metrics.TimeToEncounter.Record(time.Duration(time.Now().Unix()-int64(ev.Header.Timestamp)) * time.Second)
}
switch v := ev.Event.(type) {
case *replication.FormatDescriptionEvent:
b.log.Infof("ServerVersion: %+v, BinlogFormatVersion: %+v, ChecksumAlgorithm: %+v", util.BytesToString(v.ServerVersion), v.Version, v.ChecksumAlgorithm)
case *replication.RowsEvent:
if !b.handleRowsEvent(ev, util.BytesToString(v.Table.Schema), util.BytesToString(v.Table.Table)) {
return false
}
case *replication.QueryEvent:
if !b.handleQueryEvent(ev) {
return false
}
case *replication.GTIDEvent:
if !b.incGTID(v) {
return false
}
case *replication.TableMapEvent:
//It's already in RowsEvent, not need to handle separately
case *replication.XIDEvent:
//ignoring
default:
if ev.Header.EventType != replication.HEARTBEAT_EVENT {
b.metrics.ChangelogUnhandledEvents.Inc(1)
bb := new(bytes.Buffer)
ev.Dump(bb)
b.log.Debugf("Unhandled binlog event: %+v", bb.String())
}
}
return true
}
type result struct {
ev *replication.BinlogEvent
err error
}
func (b *mysqlReader) commitBatch() bool {
//TODO: Commit only tables which had data in this batch
w := b.metrics.ProduceLatency
w.Start()
for _, sdb := range b.tables {
for _, svc := range sdb {
for _, tver := range svc {
for i := 0; i < len(tver); i++ {
err := tver[i].producer.PushBatchCommit()
if log.EL(b.log, err) {
w.Stop()
return false
}
}
}
}
}
w.Stop()
return true
}
func (b *mysqlReader) processBatch(msg *result, msgCh chan *result) bool {
var i int
L:
for {
if msg.err != nil {
merr, ok := msg.err.(*mysql.MyError)
if ok && merr.Code == 1236 {
serverGtid, err := db.GetCurrentGTID(b.masterCI)
if err != nil {
b.log.Errorf("Error fetchching gtid from server: %v", err)
}
purgedGtid, err := db.GetPurgedGTID(b.masterCI)
if err != nil {
b.log.Errorf("Error fetchching purged gtid from server: %v", err)
}
b.log.WithFields(log.Fields{"my_gtid_set": strings.Replace(util.SortedGTIDString(b.gtidSet), ",", ",\n", -1), "server_gtid_set": serverGtid, "purged_gtid": purgedGtid, "host": b.masterCI.Host, "port": b.masterCI.Port, "user": b.masterCI.User}).Errorf("Error connecting to master: %v", merr)
return false
} else if msg.err.Error() != "context canceled" {
b.log.Errorf("BinlogReadEvents: %v", msg.err.Error())
return false
}
break L //Shutting down, let it commit current batch
}
if !b.handleEvent(msg.ev) {
return false
}
i++
//TODO: Unify with streamer logic. Extract MaxBatchSize from per table
//pipe config
if i >= b.batchSize*b.numTables {
break
}
select {
case msg = <-msgCh:
default:
break L //No messages for now, break the loop and commit whatever we pushed to the batch already
}
}
b.heartbeatTime = time.Now()
b.metrics.EventsRead.Inc(int64(i))
b.metrics.BatchSize.Record(time.Duration(i) * time.Millisecond)
return b.commitBatch()
}
//eventFetcher is blocking call to buffered channel converter
func (b *mysqlReader) eventFetcher(ctx context.Context, s *replication.BinlogStreamer, wg *sync.WaitGroup, msgCh chan *result, exitCh chan bool) {
defer wg.Done()
L:
for {
msg := &result{}
msg.ev, msg.err = s.GetEvent(ctx)
select {
case msgCh <- msg:
if msg.err != nil && msg.err.Error() != "context canceled" {
break L
}
case <-exitCh:
break L
}
}
b.log.Debugf("Finished MySQL binlog reader helper goroutine")
}
func (b *mysqlReader) fetchFirstEvent(stateUpdateTicker *time.Ticker, watchdogTicker *time.Ticker, msgCh chan *result) (bool, *result) {
defer b.metrics.ReadLatency.Start().Stop()
select {
case <-watchdogTicker.C:
if b.heartbeatTime.Add(config.Get().ChangelogWatchdogInterval).After(time.Now()) {
return true, nil
}
b.log.Errorf("Watchdog expired")
case <-stateUpdateTicker.C:
if b.updateState(false) {
return true, nil
}
case msg, ok := <-msgCh:
return ok, msg
case <-shutdown.InitiatedCh():
b.incGTID(&replication.GTIDEvent{})
g := util.SortedGTIDString(b.gtidSet)
if !log.EL(b.log, state.SaveBinlogState(b.dbl.Cluster, g, b.seqNo)) {
b.log.WithFields(log.Fields{"gtid": g, "SeqNo": b.seqNo}).Infof("Binlog state saved")
return false, nil
}
}
b.metrics.Errors.Inc(1)
return false, nil
}
func (b *mysqlReader) readEvents(c *db.Addr, stateUpdateInterval time.Duration) {
id := rand.Uint32()
for id == 0 {
id = rand.Uint32()
}
cfg := replication.BinlogSyncerConfig{
ServerID: id,
Flavor: "mysql",
Host: c.Host,
Port: c.Port,
User: c.User,
Password: c.Pwd,
ParseTime: true,
}
syncer := replication.NewBinlogSyncer(cfg)
streamer, err := syncer.StartSyncGTID(b.gtidSet.Clone())
if log.EL(b.log, err) {
b.metrics.Errors.Inc(1)
return
}
defer syncer.Close()
stateUpdateTicker := time.NewTicker(stateUpdateInterval)
defer stateUpdateTicker.Stop()
watchdogTicker := time.NewTicker(config.Get().ChangelogWatchdogInterval)
defer watchdogTicker.Stop()
defer b.closeTableProducers()
if !b.updateState(true) {
b.metrics.Errors.Inc(1)
return
}
b.log.WithFields(log.Fields{"gtid": util.SortedGTIDString(b.gtidSet), "SeqNo": b.seqNo}).Infof("Binlog start")
msgCh, exitCh := make(chan *result, b.batchSize), make(chan bool)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
defer func() { cancel(); close(exitCh); wg.Wait() }()
/*This goroutine is to multiplex blocking streamer.GetEvent and tickCh*/
go b.eventFetcher(ctx, streamer, &wg, msgCh, exitCh)
for {
more, msg := b.fetchFirstEvent(stateUpdateTicker, watchdogTicker, msgCh)
if !more {
break
}
if msg != nil && !b.processBatch(msg, msgCh) {
b.metrics.Errors.Inc(1)
break
}
}
b.log.Debugf("Finishing MySQL binlog reader")
}
func (b *mysqlReader) start(cfg *config.AppConfig) bool {
var err error
h, _ := os.Hostname()
w := log.GenWorkerID()
b.workerID = h + "." + w
b.dbl.Service, b.dbl.Cluster, b.dbl.Name, err = state.GetClusterTask(types.InputMySQL, b.workerID, cfg.LockExpireTimeout)
if err != nil || b.dbl.Service == "" {
return false
}
b.metrics = metrics.NewChangelogReaderMetrics(getTags(b.dbl.Cluster, b.inputType))
b.metrics.NumWorkers.Inc()
defer func() {
if err != nil {
b.metrics.Errors.Inc(1)
}
b.metrics.NumWorkers.Dec()
}()
thisInstanceCluster = b.dbl.Cluster
b.log = log.WithFields(log.Fields{"worker_id": w, "cluster": b.dbl.Cluster})
b.log.Infof("Starting MySQL binlog reader")
// Get slave's connection info. Connecting to slave guarantees that we will connect to DC local slave
b.masterCI, err = db.GetConnInfo(&b.dbl, db.Slave, b.inputType)
if err != nil {
return true
}
rf := b.binlogFormat()
if rf != "ROW" {
if rf != "" {
b.log.Errorf("binlog format is %s. row binlog format required. skipping", rf)
}
return true
}
b.tables = make(map[string]map[string]map[string][]*table)
for i := 0; i < len(queryHandlers); i++ {
queryHandlers[i].compiled = regexp.MustCompile(queryHandlers[i].regexp)
}
// Start reading binlogs from the gtid set saved in the state
var gtid string
gtid, _, err = state.GetGTID(b.dbl.Cluster)
if log.EL(b.log, err) {
return true
}
/* If no gtid in the state get current gtid set from master */
if gtid == "" {
gtid, err = db.GetCurrentGTID(b.masterCI)
if err != nil {
return true
}
err = state.SetGTID(b.dbl.Cluster, gtid)
if err != nil {
b.log.Errorf("Error saving gtid. gtid %v. Error: %v", gtid, err.Error())
}
}
var s mysql.GTIDSet
s, err = mysql.ParseMysqlGTIDSet(gtid)
if err != nil {
b.log.Errorf("Invalid gtid: '%v' Error: %v", gtid, err.Error())
return true
}
b.gtidSet = s.(*mysql.MysqlGTIDSet)
b.readEvents(b.masterCI, cfg.StateUpdateInterval)
b.log.Infof("MySQL Binlog reader finished")
return true
}
func (b *mysqlReader) Worker() bool {
return b.start(config.Get())
}