in dbsync/src/main/java/com/taobao/tddl/dbsync/binlog/event/QueryLogEvent.java [673:844]
private final void unpackVariables(LogBuffer buffer, final int end) throws IOException {
int code = -1;
try {
while (buffer.position() < end) {
switch (code = buffer.getUint8()) {
case Q_FLAGS2_CODE:
flags2 = buffer.getUint32();
break;
case Q_SQL_MODE_CODE:
sql_mode = buffer.getLong64(); // QQ: Fix when sql_mode
// is ulonglong
break;
case Q_CATALOG_NZ_CODE:
catalog = buffer.getName();
break;
case Q_AUTO_INCREMENT:
autoIncrementIncrement = buffer.getUint16();
autoIncrementOffset = buffer.getUint16();
break;
case Q_CHARSET_CODE:
// Charset: 6 byte character set flag.
// 1-2 = character set client
// 3-4 = collation client
// 5-6 = collation server
clientCharset = buffer.getUint16();
clientCollation = buffer.getUint16();
serverCollation = buffer.getUint16();
break;
case Q_TIME_ZONE_CODE:
timezone = buffer.getName();
break;
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
final int len = buffer.getUint8();
catalog = buffer.getFixString(len + 1);
break;
case Q_LC_TIME_NAMES_CODE:
// lc_time_names_number = buffer.getUint16();
buffer.forward(2);
break;
case Q_CHARSET_DATABASE_CODE:
// charset_database_number = buffer.getUint16();
buffer.forward(2);
break;
case Q_TABLE_MAP_FOR_UPDATE_CODE:
// table_map_for_update = buffer.getUlong64();
buffer.forward(8);
break;
case Q_MASTER_DATA_WRITTEN_CODE:
// data_written = master_data_written =
// buffer.getUint32();
buffer.forward(4);
break;
case Q_INVOKER:
user = buffer.getName();
host = buffer.getName();
break;
case Q_MICROSECONDS:
// when.tv_usec= uint3korr(pos);
tvSec = buffer.getInt24();
break;
case Q_UPDATED_DB_NAMES:
int mtsAccessedDbs = buffer.getUint8();
/**
* Notice, the following check is positive also in case
* of the master's MAX_DBS_IN_EVENT_MTS > the slave's
* one and the event contains e.g the master's
* MAX_DBS_IN_EVENT_MTS db:s.
*/
if (mtsAccessedDbs > MAX_DBS_IN_EVENT_MTS) {
mtsAccessedDbs = OVER_MAX_DBS_IN_EVENT_MTS;
break;
}
String mtsAccessedDbNames[] = new String[mtsAccessedDbs];
for (int i = 0; i < mtsAccessedDbs && buffer.position() < end; i++) {
int length = end - buffer.position();
mtsAccessedDbNames[i] = buffer.getFixName(length < NAME_LEN ? length : NAME_LEN);
}
break;
case Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP:
// thd->variables.explicit_defaults_for_timestamp
buffer.forward(1);
break;
case Q_DDL_LOGGED_WITH_XID:
ddlXid = buffer.getUlong64();
break;
case Q_DEFAULT_COLLATION_FOR_UTF8MB4:
// int2store(start,
// default_collation_for_utf8mb4_number);
buffer.forward(2);
break;
case Q_SQL_REQUIRE_PRIMARY_KEY:
// *start++ = thd->variables.sql_require_primary_key;
buffer.forward(1);
break;
case Q_DEFAULT_TABLE_ENCRYPTION:
// *start++ = thd->variables.default_table_encryption;
buffer.forward(1);
break;
case Q_OPT_FLASHBACK_AREA:
if (compatiablePercona) {
// percona
// *start++ = thd->variables.binlog_ddl_skip_rewrite;
buffer.forward(1);
}else {
// PolarDB-X
// *start++ = thd->variables.opt_flashback_area;
buffer.forward(1);
}
break;
case Q_OPT_INDEX_FORMAT_GPP_ENABLED :
// *start++ = thd->variables.opt_index_format_gpp_enabled;
buffer.forward(1);
break;
case Q_HRNOW:
// https://github.com/alibaba/canal/issues/4940
// percona 和 mariadb各自扩展mysql binlog的格式后有冲突
// 需要精确识别一下数据库类型做兼容处理
if (compatiablePercona) {
// percona 8.0.31
// Q_WSREP_SKIP_READONLY_CHECKS *start++ = 1;
buffer.forward(1);
} else {
// int when_sec_part = buffer.getUint24();
buffer.forward(3);
}
break;
case Q_XID:
// xid= uint8korr(pos);
buffer.forward(8);
break;
case Q_GTID_FLAGS3:
// gtid_flags_extra= *pos++;
// if (gtid_flags_extra & (Gtid_log_event::FL_COMMIT_ALTER_E1 |
// Gtid_log_event::FL_ROLLBACK_ALTER_E1)) {
// sa_seq_no = uint8korr(pos);
// pos+= 8;
// }
int gtid_flags_extra = buffer.getUint8();
final int FL_COMMIT_ALTER_E1= 4;
final int FL_ROLLBACK_ALTER_E1= 8;
if ((gtid_flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))> 0) {
buffer.forward(8);
}
break;
case Q_CHARACTER_SET_COLLATIONS :
// mariadb
int count = buffer.getUint8();
// character_set_collations= Lex_cstring((const char *) pos0 , (const char *) pos);
buffer.forward(count * 4);
break;
case Q_LIZARD_COMMIT_GCN:
// commitGCN = buffer.getLong64();
buffer.forward(8);
break;
case Q_LIZARD_PREPARE_GCN:
// prepareGCN = buffer.getLong64();
buffer.forward(8);
break;
default:
/*
* That's why you must write status vars in growing
* order of code
*/
logger.error("Query_log_event has unknown status vars (first has code: " + code
+ "), skipping the rest of them");
break; // Break loop
}
}
} catch (RuntimeException e) {
throw new IOException("Read " + findCodeName(code) + " error: " + e.getMessage(), e);
}
}