public void outputEvents()

in nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java [886:1114]


    public void outputEvents(ProcessSession session, ProcessContext context, ComponentLog log) throws IOException {
        RawBinlogEvent rawBinlogEvent;
        DataCaptureState dataCaptureState = currentDataCaptureState.copy();
        final boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
        final boolean includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();

        // Drain the queue
        while (isScheduled() && (rawBinlogEvent = queue.poll()) != null) {
            Event event = rawBinlogEvent.getEvent();
            EventHeaderV4 header = event.getHeader();
            long timestamp = header.getTimestamp();
            EventType eventType = header.getEventType();
            // Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume at the event about to be processed.
            // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
            // advance the position if it is not that type of event. ROTATE events don't generate output CDC events and have the current binlog position in a special field, which
            // is filled in during the ROTATE case
            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !currentDataCaptureState.isUseGtid()) {
                dataCaptureState.setBinlogPosition(header.getPosition());
            }
            log.debug("Message event, type={} pos={} file={}", eventType, dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
            switch (eventType) {
                case TABLE_MAP:
                    // This is sent to inform which table is about to be changed by subsequent events
                    TableMapEventData data = event.getData();

                    // Should we skip this table? Yes if we've specified a DB or table name pattern and they don't match
                    skipTable = (databaseNamePattern != null && !databaseNamePattern.matcher(data.getDatabase()).matches())
                            || (tableNamePattern != null && !tableNamePattern.matcher(data.getTable()).matches());

                    if (!skipTable) {
                        TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId());
                        binlogResourceInfo.setCurrentTable(tableInfoCache.get(key));
                        if (binlogResourceInfo.getCurrentTable() == null) {
                            // We don't have an entry for this table yet, so fetch the info from the database and populate the cache
                            try {
                                binlogResourceInfo.setCurrentTable(loadTableInfo(key));
                                tableInfoCache.put(key, binlogResourceInfo.getCurrentTable());
                            } catch (SQLException se) {
                                // Propagate the error up, so things like rollback and logging/bulletins can be handled
                                throw new IOException(se.getMessage(), se);
                            }
                        }
                    } else {
                        // Clear the current table, to force reload next time we get a TABLE_MAP event we care about
                        binlogResourceInfo.setCurrentTable(null);
                    }
                    break;
                case QUERY:
                    QueryEventData queryEventData = event.getData();
                    binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase());

                    String sql = queryEventData.getSql();

                    // Is this the start of a transaction?
                    if ("BEGIN".equals(sql)) {
                        // If we're already in a transaction, something bad happened, alert the user
                        if (binlogResourceInfo.isInTransaction()) {
                            getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. This could indicate that your binlog position is invalid "
                                    + "or the event stream is out of sync or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                        }

                        if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
                            beginEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
                                    binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp);
                        }
                        // Whether we skip this event or not, it's still the beginning of a transaction
                        binlogResourceInfo.setInTransaction(true);

                        // Update inTransaction value to state
                        updateState(session, dataCaptureState);
                    } else if ("COMMIT".equals(sql)) {
                        // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
                        if (!binlogResourceInfo.isInTransaction()) {
                            getLogger().debug("COMMIT event received at pos={} file={} while not processing a transaction (i.e. no corresponding BEGIN event). "
                                    + "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state "
                                    + "or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                        }
                        if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
                            commitEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
                                    binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp);
                        }
                        // Whether we skip this event or not, it's the end of a transaction
                        binlogResourceInfo.setInTransaction(false);
                        updateState(session, dataCaptureState);
                        // If there is no FlowFile open, commit the session
                        if (eventWriterConfiguration.getCurrentFlowFile() == null) {
                            // Commit the NiFi session
                            session.commitAsync();
                        }
                        binlogResourceInfo.setCurrentTable(null);
                        binlogResourceInfo.setCurrentDatabase(null);
                    } else {
                        // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
                        String normalizedQuery = normalizeQuery(sql);

                        if (isQueryDDL(normalizedQuery)) {
                            if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
                                if (queryEventData.getDatabase() == null) {
                                    queryEventData.setDatabase(binlogResourceInfo.getCurrentDatabase());
                                }
                                ddlEventHandler.handleEvent(queryEventData, includeDDLEvents, currentDataCaptureState, binlogResourceInfo,
                                        binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp);

                                // The altered table may not be the "active" table, so clear the cache to pick up changes
                                tableInfoCache.clear();
                            }

                            // If not in a transaction, commit the session so the DDL event(s) will be transferred
                            if (includeDDLEvents && !binlogResourceInfo.isInTransaction()) {
                                updateState(session, dataCaptureState);
                                if (FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) {
                                    if (currentSession != null) {
                                        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
                                        if (flowFile != null && binlogEventState.getCurrentEventWriter() != null) {
                                            // Flush the events to the FlowFile when the processor is stopped
                                            binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile(currentSession, eventWriterConfiguration, binlogResourceInfo.getTransitUri(),
                                                    dataCaptureState.getSequenceId(), binlogEventState.getCurrentEventInfo(), REL_SUCCESS);
                                        }
                                    }
                                }
                                // If there is no FlowFile open, commit the session
                                if (eventWriterConfiguration.getCurrentFlowFile() == null) {
                                    session.commitAsync();
                                }
                            }
                        }
                    }
                    break;

                case XID:
                    if (!binlogResourceInfo.isInTransaction()) {
                        getLogger().debug("COMMIT (XID) event received at pos={} file={} /while not processing a transaction (i.e. no corresponding BEGIN event). "
                                        + "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state.",
                                dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                    }
                    if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
                        commitEventHandler.handleEvent(event.getData(), includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
                                binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
                    }
                    // Whether we skip this event or not, it's the end of a transaction
                    binlogResourceInfo.setInTransaction(false);
                    dataCaptureState.setBinlogPosition(header.getNextPosition());
                    updateState(session, dataCaptureState);
                    // If there is no FlowFile open, commit the session
                    if (eventWriterConfiguration.getCurrentFlowFile() == null) {
                        // Commit the NiFi session
                        session.commitAsync();
                    }
                    binlogResourceInfo.setCurrentTable(null);
                    binlogResourceInfo.setCurrentDatabase(null);
                    break;

                case WRITE_ROWS:
                case EXT_WRITE_ROWS:
                case PRE_GA_WRITE_ROWS:
                case UPDATE_ROWS:
                case EXT_UPDATE_ROWS:
                case PRE_GA_UPDATE_ROWS:
                case DELETE_ROWS:
                case EXT_DELETE_ROWS:
                case PRE_GA_DELETE_ROWS:
                    // If we are skipping this table, then don't emit any events related to its modification
                    if (skipTable) {
                        break;
                    }
                    if (!binlogResourceInfo.isInTransaction()) {
                        // These events should only happen inside a transaction, warn the user otherwise
                        log.info("Event {} occurred outside of a transaction, which is unexpected.", eventType.name());
                    }
                    if (binlogResourceInfo.getCurrentTable() == null) {
                        // No Table Map event was processed prior to this event, which should not happen, so throw an error
                        throw new RowEventException("No table information is available for this event, cannot process further.");
                    }

                    if (eventType == WRITE_ROWS
                            || eventType == EXT_WRITE_ROWS
                            || eventType == PRE_GA_WRITE_ROWS) {

                        insertEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo,
                                binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);

                    } else if (eventType == DELETE_ROWS
                            || eventType == EXT_DELETE_ROWS
                            || eventType == PRE_GA_DELETE_ROWS) {

                        deleteEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo,
                                binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
                    } else {
                        // Update event
                        updateEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo,
                                binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
                    }
                    break;

                case ROTATE:
                    if (!currentDataCaptureState.isUseGtid()) {
                        // Update current binlog filename
                        RotateEventData rotateEventData = event.getData();
                        dataCaptureState.setBinlogFile(rotateEventData.getBinlogFilename());
                        dataCaptureState.setBinlogPosition(rotateEventData.getBinlogPosition());
                    }
                    updateState(session, dataCaptureState);
                    break;

                case GTID:
                    if (currentDataCaptureState.isUseGtid()) {
                        // Update current binlog gtid
                        GtidEventData gtidEventData = event.getData();
                        MySqlGtid mySqlGtid = gtidEventData.getMySqlGtid();
                        if (mySqlGtid != null) {
                            gtidSet.add(mySqlGtid.toString());
                            dataCaptureState.setGtidSet(gtidSet.toString());
                            updateState(session, dataCaptureState);
                        }
                    }
                    break;

                default:
                    break;
            }

            // Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume after the event that was just processed.
            // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
            // advance the position if it is not that type of event.
            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !currentDataCaptureState.isUseGtid() && eventType != XID) {
                dataCaptureState.setBinlogPosition(header.getNextPosition());
            }
        }
    }