private void recoverProcessOpSendMsgFrom()

in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java [2471:2620]


    /**
     * Re-sends the operations queued in {@code pendingMessages} over {@code cnx}, takes care of messages whose
     * schema still has to be registered, and finally tries to switch the producer to the ready state.
     *
     * <p>Flow:
     * <ol>
     *   <li>Return immediately if {@code expectedEpoch} differs from the connection handler's epoch or
     *       {@code cnx()} is {@code null}.</li>
     *   <li>Iterate over {@code pendingMessages}. If {@code latestMsgAttemptedRegisteredSchema} is non-null, every
     *       op in front of the op carrying that message is skipped and processing resumes at that op
     *       (inclusive).</li>
     *   <li>Each processed op is either written to {@code cnx}, removed from the queue, or ends the loop because
     *       its schema has to be registered (or re-registered) before anything else can be sent.</li>
     *   <li>After flushing, either start a schema registration for the op that ended the loop, or change the
     *       state to ready and schedule the batch flush task if the batch container is not empty.</li>
     * </ol>
     *
     * <p>The "Event N-M" labels used in the comments below refer to a scenario list that is defined outside of
     * this method.
     *
     * <p>NOTE(review): one comment below relies on the lock "ProducerImpl.this", but this method is not declared
     * {@code synchronized}; presumably every caller already holds that monitor - confirm.
     *
     * @param cnx the connection the pending commands are written to
     * @param latestMsgAttemptedRegisteredSchema the message to resume from, or {@code null} to process the queue
     *        from its head; judging by the name it is the message whose schema registration was attempted last
     * @param failedIncompatibleSchema when {@code true} and the resume message still has schema state
     *        {@code None}, that message is marked {@code Broken}; it also disables the
     *        {@code RegisteringSchema -> Ready} retry branch after the loop
     * @param expectedEpoch the epoch the caller observed; compared with the connection handler's current epoch
     *        and passed on to {@code tryRegisterSchema}
     */
    private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAttemptedRegisteredSchema,
                                             boolean failedIncompatibleSchema, long expectedEpoch) {
        if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == null) {
            // In this case, the cnx passed to this method is no longer the active connection. This method will get
            // called again once the new connection registers the producer with the broker.
            log.info("[{}][{}] Producer epoch mismatch or the current connection is null. Skip re-sending the "
                            + " {} pending messages since they will deliver using another connection.", topic,
                    producerName, pendingMessages.messagesCount());
            return;
        }
        // The checksum is stripped from every re-sent command if the remote endpoint's protocol version is lower
        // than the version returned by brokerChecksumSupportedVersion().
        final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
        Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
        // Non-null while the loop is still looking for the op to resume from; cleared once that op is found.
        MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
        // The op at which the loop stopped because a schema registration is needed; null if the loop ran to the end.
        OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
        while (msgIterator.hasNext()) {
            OpSendMsg op = msgIterator.next();
            if (loopStartAt != null) {
                // Skip every op in front of the resume message (identity comparison). If the resume message is
                // not in the queue at all, every op is skipped and nothing is re-sent by this call.
                if (op.msg == loopStartAt) {
                    loopStartAt = null;
                } else {
                    continue;
                }
            }
            // Ops without a message bypass the schema handling and go straight to the write below.
            // NOTE(review): presumably op.msg is null for batched ops - confirm.
            if (op.msg != null) {
                if (Broken.equals(op.msg.getSchemaState())) {
                    // "Event 1-1" happens after "Event 3-1-1".
                    // Maybe user has changed the schema compatibility strategy, will retry to register the new schema.
                    if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
                        loopEndDueToSchemaRegisterNeeded = op;
                        break;
                    } else {
                        // This scenario will never happen because the message will be removed from the queue as soon
                        // as it was set to "schemaState -> Broken".
                        // NOTE(review): unlike "Event 3-1-2" below, this defensive branch neither completes the
                        // op's callback nor releases "op.cmd" - confirm that this is intended.
                        SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? op.msg.getSchemaInfoForReplicator()
                                : op.msg.getSchemaInfo();
                        log.error("[{}] [{}] A message attempts to register new schema, but failed. It should be"
                            + " removed from the pending queue but not, which is not expected. {}",
                            topic, producerName, SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
                        releaseSemaphoreForSendOp(op);
                        msgIterator.remove();
                        op.recycle();
                        continue;
                    }
                } else if (op.msg == latestMsgAttemptedRegisteredSchema && failedIncompatibleSchema
                        && op.msg.getSchemaState() == None) {
                    // This is the resume message and its schema was reported as incompatible: mark it as broken.
                    op.msg.setSchemaState(Broken);
                    SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? op.msg.getSchemaInfoForReplicator()
                            : op.msg.getSchemaInfo();
                    // Event 3-1-1.
                    // When a schema is incompatible, we need to pause the producer to preserve message order.
                    // Otherwise, subsequent messages with compatible schemas would be delivered while this message
                    // remains stuck, causing out-of-order delivery or potential message loss with deduplication.
                    if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
                        log.error("[{}] [{}] Publishing paused: message schema incompatible with target cluster."
                                + " To resume publishing: 1) Adjust schema compatibility strategy on target cluster"
                                + " 2) Unload topic on target cluster. Schema details: {}",
                                topic, producerName, SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
                        loopEndDueToSchemaRegisterNeeded = op;
                        break;
                    }
                    // Event 3-1-2.
                    // Give user a failed callback and remove the message from "pendingMessages".
                    String failedMsg = format("[%s] [%s] incompatible schema %s", topic, producerName,
                            String.valueOf(msgSchemaInfo));
                    log.error(failedMsg);
                    // The messages' release rely on "op.cmd"'s release, we need to initialize "op.cmd" and
                    // release it to release "msg.payload".
                    if (op.cmd == null) {
                        op.rePopulate.run();
                    }
                    // The op leaves the queue before its buffer is released and its callback is completed.
                    msgIterator.remove();
                    ReferenceCountUtil.safeRelease(op.cmd);
                    try {
                        // Need to protect ourselves from any exception being thrown in the future handler from the
                        // application
                        op.sendComplete(new IncompatibleSchemaException(failedMsg));
                    } catch (Throwable t) {
                        log.warn("Got exception while completing the failed publishing: {}", failedMsg, t);
                    }
                    releaseSemaphoreForSendOp(op);
                    // Recycle last: the op must not be touched after this call.
                    op.recycle();
                    continue;
                } else if (op.msg.getSchemaState() == None) {
                    // Event 1-1.
                    // A message needs to register a new schema while flushing pending messages after a reconnect.
                    // If rePopulateMessageSchema(...) returns false, stop here and register the schema first.
                    if (!rePopulateMessageSchema(op.msg)) {
                        loopEndDueToSchemaRegisterNeeded = op;
                        break;
                    }
                }
            }
            // "Event 1-2" or "Event 2-2" or "Event 3-1-2".
            if (op.cmd == null) {
                // The command has not been built yet: build it now through the op's rePopulate task.
                checkState(op.rePopulate != null);
                op.rePopulate.run();
                // Skip writing an op whose message size exceeds the limit.
                // NOTE(review): the op is not removed from the queue here; presumably isMessageSizeExceeded(...)
                // completes the op itself - confirm.
                if (isMessageSizeExceeded(op)) {
                    continue;
                }
            }
            if (stripChecksum) {
                stripChecksum(op);
            }
            // Take an extra reference before handing the command to the channel.
            // NOTE(review): presumably the write releases one reference and the op keeps the command for a later
            // re-send or for the receipt handling - confirm.
            op.cmd.retain();
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName,
                        cnx.channel(), op.sequenceId);
            }
            // Written with a void promise: the outcome of the write is not observed here.
            cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
            op.updateSentTimestamp();
            stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
        }
        // A single flush for all the writes issued in the loop above.
        cnx.ctx().flush();

        // "Event 1-1" or "Event 3-1-1" or "Event 3-2".
        if (loopEndDueToSchemaRegisterNeeded != null) {
            // The order of the two compare-and-set attempts matters: "Connecting -> Ready" is tried first, and
            // "RegisteringSchema -> Ready" is only attempted if it fails and no incompatible schema was reported.
            if (compareAndSetState(State.Connecting, State.Ready)) {
                // "Event 1-1" happens after "Event 3-1-1".
                // After a topic unload, ask the producer retry to register schema, which avoids restart client
                // after users changed the compatibility strategy to make the schema is compatible.
                tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
                    expectedEpoch);
            } else if (!failedIncompatibleSchema && compareAndSetState(State.RegisteringSchema, State.Ready)) {
                // "Event 2-1" or "Event 3-2".
                // "pendingMessages" has more messages to register new schema.
                // This operation will not be conflict with another schema registration because both operations are
                // attempt to acquire the same lock "ProducerImpl.this".
                tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
                        expectedEpoch);
            }
            // Nothing to do if the event is "Event 3-1-1", just keep stuck.
            // Returning here skips changeToReadyState() and the batch flush scheduling below.
            return;
        } else if (latestMsgAttemptedRegisteredSchema != null) {
            // Event 2-2 or "Event 3-1-2".
            // Switch state to "Ready" after a successful schema registration.
            // The result is ignored; changeToReadyState() below is still called.
            compareAndSetState(State.RegisteringSchema, State.Ready);
        }
        // "Event 1-2".
        // Change state to "Ready" after reconnected.
        if (!changeToReadyState()) {
            // Producer was closed while reconnecting, close the connection to make sure the broker
            // drops the producer on its side
            cnx.channel().close();
            return;
        }
        // If any messages were enqueued while the producer was not Ready, we would have skipped
        // scheduling the batch flush task. Schedule it now, if there are messages in the batch container.
        if (isBatchMessagingEnabled() && !batchMessageContainer.isEmpty()) {
            maybeScheduleBatchFlushTask();
        }
    }