in plugins/input/mysqlbinlog/mysql_binlog_helper.go [28:152]
func binLogEventToSlsLog(event *replication.BinlogEvent, logData map[string]string, collector pipeline.Collector) error {
switch event.Header.EventType {
case replication.ROTATE_EVENT:
if rotateEvent, cvt := event.Event.(*replication.RotateEvent); cvt {
logData["rotate_file"] = string(rotateEvent.NextLogName)
logData["start_position"] = strconv.Itoa(int(rotateEvent.Position))
}
case replication.FORMAT_DESCRIPTION_EVENT:
if descEvent, ok := event.Event.(*replication.FormatDescriptionEvent); ok {
logData["version"] = strconv.Itoa(int(descEvent.Version))
logData["server"] = string(descEvent.ServerVersion)
logData["check_sum_algorithm"] = strconv.Itoa(int(descEvent.ChecksumAlgorithm))
}
case replication.QUERY_EVENT:
if qEvent, ok := event.Event.(*replication.QueryEvent); ok {
logData["slave_proxy"] = strconv.Itoa(int(qEvent.SlaveProxyID))
logData["execution_time"] = strconv.Itoa(int(qEvent.ExecutionTime))
logData["error_code"] = strconv.Itoa(int(qEvent.ErrorCode))
logData["schema"] = string(qEvent.Schema)
logData["query"] = string(qEvent.Query)
if qEvent.GSet != nil {
logData["gtid_set"] = qEvent.GSet.String()
}
}
case replication.XID_EVENT:
if e, ok := event.Event.(*replication.XIDEvent); ok {
logData["xid"] = strconv.Itoa(int(e.XID))
if e.GSet != nil {
logData["gtid_set"] = e.GSet.String()
}
}
case replication.TABLE_MAP_EVENT:
if e, ok := event.Event.(*replication.TableMapEvent); ok {
logData["table_id"] = strconv.Itoa(int(e.TableID))
logData["table"] = string(e.Table)
logData["flags"] = strconv.Itoa(int(e.Flags))
logData["schema"] = string(e.Schema)
logData["column_count"] = strconv.Itoa(int(e.ColumnCount))
logData["column_type_hex"] = hex.EncodeToString(e.ColumnType)
}
case replication.WRITE_ROWS_EVENTv0,
replication.UPDATE_ROWS_EVENTv0,
replication.DELETE_ROWS_EVENTv0,
replication.WRITE_ROWS_EVENTv1,
replication.DELETE_ROWS_EVENTv1,
replication.UPDATE_ROWS_EVENTv1,
replication.WRITE_ROWS_EVENTv2,
replication.UPDATE_ROWS_EVENTv2,
replication.DELETE_ROWS_EVENTv2:
if e, ok := event.Event.(*replication.RowsEvent); ok {
if e.Table != nil {
logData["table"] = string(e.Table.Table)
}
logData["table_id"] = strconv.Itoa(int(e.TableID))
logData["flags"] = strconv.Itoa(int(e.Flags))
logData["column_count"] = strconv.Itoa(int(e.ColumnCount))
for i, rows := range e.Rows {
if i > 10 {
break
}
iStr := strconv.Itoa(i) + "_"
for j, d := range rows {
if _, ok := d.([]byte); ok {
logData[iStr+strconv.Itoa(j)] = fmt.Sprintf("%q", d)
} else {
logData[iStr+strconv.Itoa(j)] = fmt.Sprintf("%#v", d)
}
}
}
}
case replication.ROWS_QUERY_EVENT:
if e, ok := event.Event.(*replication.RowsQueryEvent); ok {
logData["query"] = string(e.Query)
}
case replication.GTID_EVENT:
if e, ok := event.Event.(*replication.GTIDEvent); ok {
logData["sid"] = string(e.SID)
logData["gno"] = strconv.Itoa(int(e.GNO))
logData["commit_flag"] = strconv.Itoa(int(e.CommitFlag))
}
case replication.BEGIN_LOAD_QUERY_EVENT:
if e, ok := event.Event.(*replication.BeginLoadQueryEvent); ok {
logData["file_id"] = strconv.Itoa(int(e.FileID))
logData["block_data"] = string(e.BlockData)
}
case replication.EXECUTE_LOAD_QUERY_EVENT:
if e, ok := event.Event.(*replication.ExecuteLoadQueryEvent); ok {
logData["dum_handling_flags"] = strconv.Itoa(int(e.DupHandlingFlags))
logData["end_pos"] = strconv.Itoa(int(e.EndPos))
logData["error_code"] = strconv.Itoa(int(e.ErrorCode))
logData["execution_time"] = strconv.Itoa(int(e.ExecutionTime))
logData["file_id"] = strconv.Itoa(int(e.FileID))
logData["schema_length"] = strconv.Itoa(int(e.SchemaLength))
logData["slave_proxy_id"] = strconv.Itoa(int(e.SlaveProxyID))
logData["start_pos"] = strconv.Itoa(int(e.StartPos))
logData["status_vars"] = strconv.Itoa(int(e.StatusVars))
}
case replication.MARIADB_ANNOTATE_ROWS_EVENT:
if e, ok := event.Event.(*replication.MariadbAnnotateRowsEvent); ok {
logData["query"] = string(e.Query)
}
case replication.MARIADB_BINLOG_CHECKPOINT_EVENT:
if e, ok := event.Event.(*replication.MariadbBinlogCheckPointEvent); ok {
logData["info"] = string(e.Info)
}
case replication.MARIADB_GTID_LIST_EVENT:
if e, ok := event.Event.(*replication.MariadbGTIDListEvent); ok {
for i, gtids := range e.GTIDs {
logData["gtid_"+strconv.Itoa(i)] = gtids.String()
}
}
case replication.MARIADB_GTID_EVENT:
if e, ok := event.Event.(*replication.MariadbGTIDEvent); ok {
logData["gtid"] = e.GTID.String()
}
default:
if e, ok := event.Event.(*replication.GenericEvent); ok {
logData["data"] = hex.EncodeToString(e.Data)
}
}
collector.AddData(nil, logData, time.Unix(int64(event.Header.Timestamp), 0))
return nil
}