in nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java [886:1114]
/**
 * Drains the queue of raw binlog events, dispatching each one to the matching event handler and advancing the
 * binlog position / GTID set tracked in a local copy of {@code currentDataCaptureState}.
 * <p>
 * The local copy is handed to {@code updateState} at these points:
 * <ul>
 *   <li>transaction boundaries (BEGIN/COMMIT query events and XID events);</li>
 *   <li>ROTATE events;</li>
 *   <li>GTID events (in GTID mode);</li>
 *   <li>after DDL statements executed outside a transaction, when DDL events are included.</li>
 * </ul>
 * At most of those points, the session is committed asynchronously if no FlowFile is currently open.
 * Draining stops when the queue is empty or the processor is no longer scheduled.
 *
 * @param session the session used for state updates and asynchronous commits
 * @param context the process context; INCLUDE_BEGIN_COMMIT and INCLUDE_DDL_EVENTS are read from it
 * @param log     logger used for per-event debug output and out-of-transaction row event messages
 * @throws IOException if table metadata cannot be loaded (wrapping the underlying {@link java.sql.SQLException}),
 *                     or if propagated from the helpers called here
 * @throws RowEventException if a row event for a non-skipped table arrives with no table information available
 */
public void outputEvents(ProcessSession session, ProcessContext context, ComponentLog log) throws IOException {
    RawBinlogEvent rawBinlogEvent;
    // Positions are advanced on this copy. The event handlers below are passed currentDataCaptureState instead;
    // NOTE(review): confirm updateState keeps the two in sync as intended.
    DataCaptureState dataCaptureState = currentDataCaptureState.copy();
    final boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
    final boolean includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
    // Drain the queue
    while (isScheduled() && (rawBinlogEvent = queue.poll()) != null) {
        Event event = rawBinlogEvent.getEvent();
        EventHeaderV4 header = event.getHeader();
        long timestamp = header.getTimestamp();
        EventType eventType = header.getEventType();
        // Advance the current binlog position. This way if no more events are received and the processor is stopped,
        // it will resume at the event about to be processed.
        // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they
        // won't have the correct "next position" value, so only advance the position if it is not that type of event.
        // ROTATE events don't generate output CDC events and have the current binlog position in a special field,
        // which is filled in during the ROTATE case.
        if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !currentDataCaptureState.isUseGtid()) {
            dataCaptureState.setBinlogPosition(header.getPosition());
        }
        log.debug("Message event, type={} pos={} file={}", eventType, dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
        switch (eventType) {
            case TABLE_MAP:
                // This is sent to inform which table is about to be changed by subsequent events
                TableMapEventData data = event.getData();
                // Should we skip this table? Yes if we've specified a DB or table name pattern and they don't match
                skipTable = (databaseNamePattern != null && !databaseNamePattern.matcher(data.getDatabase()).matches())
                        || (tableNamePattern != null && !tableNamePattern.matcher(data.getTable()).matches());
                if (!skipTable) {
                    TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId());
                    binlogResourceInfo.setCurrentTable(tableInfoCache.get(key));
                    if (binlogResourceInfo.getCurrentTable() == null) {
                        // We don't have an entry for this table yet, so fetch the info from the database and populate the cache
                        try {
                            binlogResourceInfo.setCurrentTable(loadTableInfo(key));
                            tableInfoCache.put(key, binlogResourceInfo.getCurrentTable());
                        } catch (SQLException se) {
                            // Propagate the error up, so things like rollback and logging/bulletins can be handled
                            throw new IOException(se.getMessage(), se);
                        }
                    }
                } else {
                    // Clear the current table, to force reload next time we get a TABLE_MAP event we care about
                    binlogResourceInfo.setCurrentTable(null);
                }
                break;
            case QUERY:
                QueryEventData queryEventData = event.getData();
                binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase());
                String sql = queryEventData.getSql();
                // Is this the start of a transaction?
                if ("BEGIN".equals(sql)) {
                    // If we're already in a transaction, something unexpected happened; record it (at debug level)
                    if (binlogResourceInfo.isInTransaction()) {
                        getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. This could indicate that your binlog position is invalid "
                                + "or the event stream is out of sync or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                    }
                    if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
                        beginEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
                                binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp);
                    }
                    // Whether we skip this event or not, it's still the beginning of a transaction
                    binlogResourceInfo.setInTransaction(true);
                    // Hand the current capture state to updateState now that a transaction has begun
                    updateState(session, dataCaptureState);
                } else if ("COMMIT".equals(sql)) {
                    // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
                    if (!binlogResourceInfo.isInTransaction()) {
                        getLogger().debug("COMMIT event received at pos={} file={} while not processing a transaction (i.e. no corresponding BEGIN event). "
                                + "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state.",
                                dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                    }
                    if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
                        commitEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
                                binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp);
                    }
                    // Whether we skip this event or not, it's the end of a transaction
                    binlogResourceInfo.setInTransaction(false);
                    updateState(session, dataCaptureState);
                    // If there is no FlowFile open, commit the session
                    if (eventWriterConfiguration.getCurrentFlowFile() == null) {
                        // Commit the NiFi session
                        session.commitAsync();
                    }
                    binlogResourceInfo.setCurrentTable(null);
                    binlogResourceInfo.setCurrentDatabase(null);
                } else {
                    // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
                    String normalizedQuery = normalizeQuery(sql);
                    if (isQueryDDL(normalizedQuery)) {
                        if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
                            if (queryEventData.getDatabase() == null) {
                                queryEventData.setDatabase(binlogResourceInfo.getCurrentDatabase());
                            }
                            ddlEventHandler.handleEvent(queryEventData, includeDDLEvents, currentDataCaptureState, binlogResourceInfo,
                                    binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp);
                            // The altered table may not be the "active" table, so clear the cache to pick up changes
                            tableInfoCache.clear();
                        }
                        // If not in a transaction, commit the session so the DDL event(s) will be transferred
                        if (includeDDLEvents && !binlogResourceInfo.isInTransaction()) {
                            updateState(session, dataCaptureState);
                            if (FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) {
                                if (currentSession != null) {
                                    FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
                                    if (flowFile != null && binlogEventState.getCurrentEventWriter() != null) {
                                        // A DDL statement outside a transaction is its own unit of work: finish the open
                                        // FlowFile and transfer it to REL_SUCCESS so the session commit below can happen
                                        binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile(currentSession, eventWriterConfiguration, binlogResourceInfo.getTransitUri(),
                                                dataCaptureState.getSequenceId(), binlogEventState.getCurrentEventInfo(), REL_SUCCESS);
                                    }
                                }
                            }
                            // If there is no FlowFile open, commit the session
                            if (eventWriterConfiguration.getCurrentFlowFile() == null) {
                                session.commitAsync();
                            }
                        }
                    }
                }
                break;
            case XID:
                if (!binlogResourceInfo.isInTransaction()) {
                    getLogger().debug("COMMIT (XID) event received at pos={} file={} while not processing a transaction (i.e. no corresponding BEGIN event). "
                            + "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state.",
                            dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                }
                // NOTE(review): the current database is cleared after every COMMIT/XID. If an XID arrives with no
                // intervening QUERY event (e.g. capture started mid-transaction) while a databaseNamePattern is set,
                // getCurrentDatabase() may be null here and Pattern.matcher(null) throws a NullPointerException.
                // Confirm the intended handling.
                if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
                    commitEventHandler.handleEvent(event.getData(), includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
                            binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
                }
                // Whether we skip this event or not, it's the end of a transaction
                binlogResourceInfo.setInTransaction(false);
                // Resume after this XID; the generic advance at the bottom of the loop excludes XID for that reason
                dataCaptureState.setBinlogPosition(header.getNextPosition());
                updateState(session, dataCaptureState);
                // If there is no FlowFile open, commit the session
                if (eventWriterConfiguration.getCurrentFlowFile() == null) {
                    // Commit the NiFi session
                    session.commitAsync();
                }
                binlogResourceInfo.setCurrentTable(null);
                binlogResourceInfo.setCurrentDatabase(null);
                break;
            case WRITE_ROWS:
            case EXT_WRITE_ROWS:
            case PRE_GA_WRITE_ROWS:
            case UPDATE_ROWS:
            case EXT_UPDATE_ROWS:
            case PRE_GA_UPDATE_ROWS:
            case DELETE_ROWS:
            case EXT_DELETE_ROWS:
            case PRE_GA_DELETE_ROWS:
                // If we are skipping this table, then don't emit any events related to its modification
                if (skipTable) {
                    break;
                }
                if (!binlogResourceInfo.isInTransaction()) {
                    // These events should only happen inside a transaction; log it (at info level) otherwise
                    log.info("Event {} occurred outside of a transaction, which is unexpected.", eventType.name());
                }
                if (binlogResourceInfo.getCurrentTable() == null) {
                    // No Table Map event was processed prior to this event, which should not happen, so throw an error
                    throw new RowEventException("No table information is available for this event, cannot process further.");
                }
                if (eventType == WRITE_ROWS
                        || eventType == EXT_WRITE_ROWS
                        || eventType == PRE_GA_WRITE_ROWS) {
                    insertEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo,
                            binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
                } else if (eventType == DELETE_ROWS
                        || eventType == EXT_DELETE_ROWS
                        || eventType == PRE_GA_DELETE_ROWS) {
                    deleteEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo,
                            binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
                } else {
                    // Update event
                    updateEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo,
                            binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
                }
                break;
            case ROTATE:
                if (!currentDataCaptureState.isUseGtid()) {
                    // Update current binlog filename and position from the ROTATE payload
                    RotateEventData rotateEventData = event.getData();
                    dataCaptureState.setBinlogFile(rotateEventData.getBinlogFilename());
                    dataCaptureState.setBinlogPosition(rotateEventData.getBinlogPosition());
                }
                updateState(session, dataCaptureState);
                break;
            case GTID:
                if (currentDataCaptureState.isUseGtid()) {
                    // Add this event's GTID to the accumulated set and hand the updated set to updateState
                    GtidEventData gtidEventData = event.getData();
                    MySqlGtid mySqlGtid = gtidEventData.getMySqlGtid();
                    if (mySqlGtid != null) {
                        gtidSet.add(mySqlGtid.toString());
                        dataCaptureState.setGtidSet(gtidSet.toString());
                        updateState(session, dataCaptureState);
                    }
                }
                break;
            default:
                break;
        }
        // Advance the current binlog position. This way if no more events are received and the processor is stopped,
        // it will resume after the event that was just processed.
        // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they
        // won't have the correct "next position" value, so only advance the position if it is not that type of event.
        // XID is excluded because its case above already set the next position before updating state.
        if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !currentDataCaptureState.isUseGtid() && eventType != XID) {
            dataCaptureState.setBinlogPosition(header.getNextPosition());
        }
    }
}