in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java [2471:2620]
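    /**
     * Re-send the pending messages on the given connection after a reconnect, resuming from the message whose
     * schema registration was most recently attempted. If a message still requires a schema registration, the
     * loop stops at that message and a schema registration is triggered instead of re-sending it.
     */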
private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAttemptedRegisteredSchema,
boolean failedIncompatibleSchema, long expectedEpoch) {
if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == null) {
// In this case, the cnx passed to this method is no longer the active connection. This method will get
// called again once the new connection registers the producer with the broker.
log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the "
+ " {} pending messages since they will deliver using another connection.", topic,
producerName, pendingMessages.messagesCount());
return;
}
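        // If the broker is on an older protocol version that does not support checksums, strip them from the
        // commands before re-sending.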
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
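        // "loopStartAt" marks the message to resume re-sending from; "loopEndDueToSchemaRegisterNeeded" records
        // the first op that still needs a schema registration, which is where the send loop stops.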
MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
while (msgIterator.hasNext()) {
OpSendMsg op = msgIterator.next();
if (loopStartAt != null) {
if (op.msg == loopStartAt) {
loopStartAt = null;
} else {
continue;
}
}
if (op.msg != null) {
if (Broken.equals(op.msg.getSchemaState())) {
// "Event 1-1" happens after "Event 3-1-1".
                    // The user may have changed the schema compatibility strategy, so retry registering the new
                    // schema.
if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
loopEndDueToSchemaRegisterNeeded = op;
break;
} else {
                        // This scenario should never happen because the message is removed from the queue as soon
                        // as its schemaState is set to "Broken".
SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? op.msg.getSchemaInfoForReplicator()
: op.msg.getSchemaInfo();
log.error("[{}] [{}] A message attempts to register new schema, but failed. It should be"
+ " removed from the pending queue but not, which is not expected. {}",
topic, producerName, SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
releaseSemaphoreForSendOp(op);
msgIterator.remove();
op.recycle();
continue;
}
} else if (op.msg == latestMsgAttemptedRegisteredSchema && failedIncompatibleSchema
&& op.msg.getSchemaState() == None) {
op.msg.setSchemaState(Broken);
SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? op.msg.getSchemaInfoForReplicator()
: op.msg.getSchemaInfo();
// Event 3-1-1.
// When a schema is incompatible, we need to pause the producer to preserve message order.
// Otherwise, subsequent messages with compatible schemas would be delivered while this message
// remains stuck, causing out-of-order delivery or potential message loss with deduplication.
if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
log.error("[{}] [{}] Publishing paused: message schema incompatible with target cluster."
+ " To resume publishing: 1) Adjust schema compatibility strategy on target cluster"
+ " 2) Unload topic on target cluster. Schema details: {}",
topic, producerName, SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
loopEndDueToSchemaRegisterNeeded = op;
break;
}
// Event 3-1-2.
                    // Give the user a failed callback and remove the message from "pendingMessages".
String failedMsg = format("[%s] [%s] incompatible schema %s", topic, producerName,
String.valueOf(msgSchemaInfo));
log.error(failedMsg);
                    // The message's release relies on releasing "op.cmd", so initialize "op.cmd" and release it
                    // in order to release "msg.payload".
if (op.cmd == null) {
op.rePopulate.run();
}
msgIterator.remove();
ReferenceCountUtil.safeRelease(op.cmd);
try {
// Need to protect ourselves from any exception being thrown in the future handler from the
// application
op.sendComplete(new IncompatibleSchemaException(failedMsg));
} catch (Throwable t) {
log.warn("Got exception while completing the failed publishing: {}", failedMsg, t);
}
releaseSemaphoreForSendOp(op);
op.recycle();
continue;
} else if (op.msg.getSchemaState() == None) {
// Event 1-1.
                    // A message needs to register a new schema while flushing pending messages after reconnecting.
if (!rePopulateMessageSchema(op.msg)) {
loopEndDueToSchemaRegisterNeeded = op;
break;
}
}
}
// "Event 1-2" or "Event 2-2" or "Event 3-1-2".
if (op.cmd == null) {
checkState(op.rePopulate != null);
op.rePopulate.run();
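                // Skip the op if the re-populated command exceeds the maximum allowed message size.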
if (isMessageSizeExceeded(op)) {
continue;
}
}
if (stripChecksum) {
stripChecksum(op);
}
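            // Retain the command buffer: writing it to the channel releases one reference, while the op keeps its
            // own reference in case the message needs to be re-sent again later.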
op.cmd.retain();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName,
cnx.channel(), op.sequenceId);
}
cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
}
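        // Flush all the commands written above in a single batch.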
cnx.ctx().flush();
// "Event 1-1" or "Event 3-1-1" or "Event 3-2".
if (loopEndDueToSchemaRegisterNeeded != null) {
if (compareAndSetState(State.Connecting, State.Ready)) {
// "Event 1-1" happens after "Event 3-1-1".
                // After a topic unload, ask the producer to retry registering the schema, which avoids having to
                // restart the client after users have changed the compatibility strategy to make the schema
                // compatible.
tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
expectedEpoch);
} else if (!failedIncompatibleSchema && compareAndSetState(State.RegisteringSchema, State.Ready)) {
// "Event 2-1" or "Event 3-2".
// "pendingMessages" has more messages to register new schema.
// This operation will not be conflict with another schema registration because both operations are
// attempt to acquire the same lock "ProducerImpl.this".
tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
expectedEpoch);
}
            // Nothing to do if the event is "Event 3-1-1"; just stay paused.
return;
} else if (latestMsgAttemptedRegisteredSchema != null) {
            // "Event 2-2" or "Event 3-1-2".
// Switch state to "Ready" after a successful schema registration.
compareAndSetState(State.RegisteringSchema, State.Ready);
}
// "Event 1-2".
// Change state to "Ready" after reconnected.
if (!changeToReadyState()) {
            // Producer was closed while reconnecting; close the connection to make sure the broker
            // drops the producer on its side.
cnx.channel().close();
return;
}
// If any messages were enqueued while the producer was not Ready, we would have skipped
// scheduling the batch flush task. Schedule it now, if there are messages in the batch container.
if (isBatchMessagingEnabled() && !batchMessageContainer.isEmpty()) {
maybeScheduleBatchFlushTask();
}
}