private Entry parseQueryEvent()

in parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java [202:307]


    /**
     * Converts a query log event into an {@link Entry}.
     *
     * <p>The statement text is classified in this order:
     * <ol>
     * <li>starts with {@code XA_START}: a TRANSACTIONBEGIN entry carrying the XA type and xid props;</li>
     * <li>starts with {@code XA_END}: a TRANSACTIONEND entry carrying the XA type and xid props;</li>
     * <li>starts with {@code XA_COMMIT} / {@code XA_ROLLBACK}: a ROWDATA entry whose event type is
     * XACOMMIT / XAROLLBACK;</li>
     * <li>ends with {@code BEGIN} / {@code COMMIT}: a plain TRANSACTIONBEGIN / TRANSACTIONEND entry;</li>
     * <li>anything else: the statement is parsed (Druid or simple parser, depending on
     * {@code useDruidDdlFilter}), run through {@code processFilter}, and emitted as a ROWDATA
     * entry.</li>
     * </ol>
     * All prefix/suffix matches are case-insensitive.
     *
     * @param event the query event to convert
     * @param isSeek when {@code true}, the table meta cache is not updated for non-DML statements
     * @return the generated entry, or {@code null} when every parsed statement was filtered out or
     *         when {@code filterQueryDdl} is enabled
     */
    private Entry parseQueryEvent(QueryLogEvent event, boolean isSeek) {
        final String queryString = event.getQuery();

        if (StringUtils.startsWithIgnoreCase(queryString, XA_START)) {
            // XA START is reported as a TransactionBegin
            TransactionBegin xaBegin = TransactionBegin.newBuilder()
                .setThreadId(event.getSessionId())
                .addProps(createSpecialPair(XA_TYPE, XA_START))
                .addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_START)))
                .build();
            Header xaBeginHeader = createHeader(event.getHeader(), "", "", null);
            return createEntry(xaBeginHeader, EntryType.TRANSACTIONBEGIN, xaBegin.toByteString());
        }

        if (StringUtils.startsWithIgnoreCase(queryString, XA_END)) {
            // XA END is reported as a TransactionEnd
            TransactionEnd xaEnd = TransactionEnd.newBuilder()
                .setTransactionId(String.valueOf(0L))
                .addProps(createSpecialPair(XA_TYPE, XA_END))
                .addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_END)))
                .build();
            Header xaEndHeader = createHeader(event.getHeader(), "", "", null);
            return createEntry(xaEndHeader, EntryType.TRANSACTIONEND, xaEnd.toByteString());
        }

        if (StringUtils.startsWithIgnoreCase(queryString, XA_COMMIT)) {
            // XA COMMIT is reported as row data with the XACOMMIT event type
            Header xaCommitHeader = createHeader(event.getHeader(), "", "", EventType.XACOMMIT);
            RowChange xaCommit = RowChange.newBuilder()
                .setSql(queryString)
                .addProps(createSpecialPair(XA_TYPE, XA_COMMIT))
                .addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_COMMIT)))
                .setEventType(EventType.XACOMMIT)
                .build();
            return createEntry(xaCommitHeader, EntryType.ROWDATA, xaCommit.toByteString());
        }

        if (StringUtils.startsWithIgnoreCase(queryString, XA_ROLLBACK)) {
            // XA ROLLBACK is reported as row data with the XAROLLBACK event type
            Header xaRollbackHeader = createHeader(event.getHeader(), "", "", EventType.XAROLLBACK);
            RowChange xaRollback = RowChange.newBuilder()
                .setSql(queryString)
                .addProps(createSpecialPair(XA_TYPE, XA_ROLLBACK))
                .addProps(createSpecialPair(XA_XID, getXaXid(queryString, XA_ROLLBACK)))
                .setEventType(EventType.XAROLLBACK)
                .build();
            return createEntry(xaRollbackHeader, EntryType.ROWDATA, xaRollback.toByteString());
        }

        if (StringUtils.endsWithIgnoreCase(queryString, BEGIN)) {
            TransactionBegin begin = createTransactionBegin(event.getSessionId());
            Header beginHeader = createHeader(event.getHeader(), "", "", null);
            return createEntry(beginHeader, EntryType.TRANSACTIONBEGIN, begin.toByteString());
        }

        if (StringUtils.endsWithIgnoreCase(queryString, COMMIT)) {
            // MyISAM may not produce an xid event, so a COMMIT query closes the transaction with id 0
            TransactionEnd end = createTransactionEnd(0L);
            Header commitHeader = createHeader(event.getHeader(), "", "", null);
            return createEntry(commitHeader, EntryType.TRANSACTIONEND, end.toByteString());
        }

        // Everything else is an ordinary statement: parse it, then filter it.
        boolean accepted = false;
        EventType type = EventType.QUERY;
        String schemaName = null;
        String tableName = null;

        if (useDruidDdlFilter) {
            List<DdlResult> parsed = DruidDdlParser.parse(queryString, event.getDbName());
            for (DdlResult candidate : parsed) {
                // A single statement that is not filtered is enough to keep the event;
                // '|=' does not short-circuit, so processFilter still runs for every result.
                accepted |= !processFilter(queryString, candidate);
            }
            if (!parsed.isEmpty()) {
                // For a multi-statement DDL only the first result can be used
                DdlResult first = parsed.get(0);
                type = first.getType();
                schemaName = first.getSchemaName();
                tableName = first.getTableName();
            }
        } else {
            DdlResult single = SimpleDdlParser.parse(queryString, event.getDbName());
            accepted = !processFilter(queryString, single);
            type = single.getType();
            schemaName = single.getSchemaName();
            tableName = single.getTableName();
        }

        if (!accepted) {
            // Filtered data is not processed any further
            return null;
        }

        final boolean isDml = type == EventType.INSERT
                              || type == EventType.UPDATE
                              || type == EventType.DELETE;

        // The table meta must be updated even when filterQueryDdl=true,
        // so this happens before the filterQueryDdl check below.
        if (!(isSeek || isDml)) {
            // Use the new table-structure metadata management approach
            EntryPosition position = createPosition(event.getHeader());
            tableMetaCache.apply(position, event.getDbName(), queryString, null);
        }

        if (filterQueryDdl) {
            // All DDL is filtered, so no event is generated
            return null;
        }

        Header header = createHeader(event.getHeader(), schemaName, tableName, type);
        RowChange.Builder rowChange = RowChange.newBuilder()
            .setIsDdl(!isDml)
            .setSql(queryString);
        if (StringUtils.isNotEmpty(event.getDbName())) {
            // The database name may be empty
            rowChange.setDdlSchemaName(event.getDbName());
        }
        rowChange.setEventType(type);
        return createEntry(header, EntryType.ROWDATA, rowChange.build().toByteString());
    }