in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java [211:242]
/**
 * Enqueues an add-partitions request on the transaction manager when its {@code
 * newPartitionsInTransaction} field reports that it is not empty.
 *
 * <p>All reads and calls go through the reflective {@code getField}/{@code invoke} helpers and
 * are performed while holding the monitor of the transaction manager object.
 *
 * @return the {@code result} field of the handler that was enqueued, or a freshly created
 *     result on which {@code done()} has already been called when nothing had to be enqueued
 */
private TransactionalRequestResult enqueueNewPartitions() {
    Object txnManager = getTransactionManager();
    synchronized (txnManager) {
        Object pendingPartitions = getField(txnManager, "newPartitionsInTransaction");
        Object pendingIsEmpty = invoke(pendingPartitions, "isEmpty");

        // Only a Boolean "false" from isEmpty means there is something to enqueue; any other
        // value (true, or a non-Boolean) takes the short-circuit path below.
        if (!Boolean.FALSE.equals(pendingIsEmpty)) {
            // There is no real operation here; the operation string is the same one that
            // addPartitionsToTransactionHandler uses.
            TransactionalRequestResult completed =
                    new TransactionalRequestResult("AddPartitionsToTxn");
            completed.done();
            return completed;
        }

        Object handler = invoke(txnManager, "addPartitionsToTransactionHandler");
        // Both the enqueueRequest parameter type and the "result" field are resolved against
        // the superclass of the handler's runtime class.
        Class<?> handlerBaseType = handler.getClass().getSuperclass();
        invoke(
                txnManager,
                "enqueueRequest",
                new Class[] {handlerBaseType},
                new Object[] {handler});
        return (TransactionalRequestResult) getField(handler, handlerBaseType, "result");
    }
}