in parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java [202:307]
private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {
String queryString = event.getQuery();
if (StringUtils.startsWithIgnoreCase(queryString, XA_START)) {
// xa start use TransactionBegin
TransactionBegin.Builder beginBuilder = TransactionBegin.newBuilder();
beginBuilder.setThreadId(event.getSessionId());
beginBuilder.addProps(createSpecialPair(XA_TYPE, XA_START));
beginBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_START)));
TransactionBegin transactionBegin = beginBuilder.build();
Header header = createHeader(event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
} else if (StringUtils.startsWithIgnoreCase(queryString, XA_END)) {
// xa start use TransactionEnd
TransactionEnd.Builder endBuilder = TransactionEnd.newBuilder();
endBuilder.setTransactionId(String.valueOf(0L));
endBuilder.addProps(createSpecialPair(XA_TYPE, XA_END));
endBuilder.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_END)));
TransactionEnd transactionEnd = endBuilder.build();
Header header = createHeader(event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
} else if (StringUtils.startsWithIgnoreCase(queryString, XA_COMMIT)) {
// xa commit
Header header = createHeader(event.getHeader(), "", "", EventType.XACOMMIT);
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
rowChangeBuider.setSql(queryString);
rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_COMMIT));
rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_COMMIT)));
rowChangeBuider.setEventType(EventType.XACOMMIT);
return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
} else if (StringUtils.startsWithIgnoreCase(queryString, XA_ROLLBACK)) {
// xa rollback
Header header = createHeader(event.getHeader(), "", "", EventType.XAROLLBACK);
RowChange.Builder rowChangeBuider = RowChange.newBuilder();
rowChangeBuider.setSql(queryString);
rowChangeBuider.addProps(createSpecialPair(XA_TYPE, XA_ROLLBACK));
rowChangeBuider.addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_ROLLBACK)));
rowChangeBuider.setEventType(EventType.XAROLLBACK);
return createEntry(header, EntryType.ROWDATA, rowChangeBuider.build().toByteString());
} else if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
TransactionBegin transactionBegin = createTransactionBegin(event.getSessionId());
Header header = createHeader(event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONBEGIN, transactionBegin.toByteString());
} else if (StringUtils.endsWithIgnoreCase(queryString, COMMIT)) {
TransactionEnd transactionEnd = createTransactionEnd(0L); // MyISAM可能不会有xid事件
Header header = createHeader(event.getHeader(), "", "", null);
return createEntry(header, EntryType.TRANSACTIONEND, transactionEnd.toByteString());
} else {
boolean notFilter = false;
EventType type = EventType.QUERY;
String tableName = null;
String schemaName = null;
if (useDruidDdlFilter) {
List<DdlResult> results = DruidDdlParser.parse(queryString, event.getDbName());
for (DdlResult result : results) {
if (!processFilter(queryString, result)) {
// 只要有一个数据不进行过滤
notFilter = true;
}
}
if (results.size() > 0) {
// 如果针对多行的DDL,只能取第一条
type = results.get(0).getType();
schemaName = results.get(0).getSchemaName();
tableName = results.get(0).getTableName();
}
} else {
DdlResult result = SimpleDdlParser.parse(queryString, event.getDbName());
if (!processFilter(queryString, result)) {
notFilter = true;
}
type = result.getType();
schemaName = result.getSchemaName();
tableName = result.getTableName();
}
if (!notFilter) {
// 如果是过滤的数据就不处理了
return null;
}
boolean isDml = (type == EventType.INSERT || type == EventType.UPDATE || type == EventType.DELETE);
// filterQueryDdl=true的情况下,也得更新tablemeta
if (!isSeek && !isDml) {
// 使用新的表结构元数据管理方式
EntryPosition position = createPosition(event.getHeader());
tableMetaCache.apply(position, event.getDbName(), queryString, null);
}
if (filterQueryDdl) {
// 全部DDL过滤,那就忽略事件生成
return null;
}
Header header = createHeader(event.getHeader(), schemaName, tableName, type);
RowChange.Builder rowChangeBuilder = RowChange.newBuilder();
rowChangeBuilder.setIsDdl(!isDml);
rowChangeBuilder.setSql(queryString);
if (StringUtils.isNotEmpty(event.getDbName())) {// 可能为空
rowChangeBuilder.setDdlSchemaName(event.getDbName());
}
rowChangeBuilder.setEventType(type);
return createEntry(header, EntryType.ROWDATA, rowChangeBuilder.build().toByteString());
}
}