in modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/Replicator.java [1250:1373]
static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status, final Message request,
final Message response, final int seq, final int stateVersion, final long rpcSendTime) {
if (id == null) {
return;
}
final long startTimeMs = Utils.nowMs();
Replicator r;
if ((r = (Replicator) id.lock()) == null) {
return;
}
if (stateVersion != r.version) {
LOG.debug(
"Replicator {} ignored old version response {}, current version is {}, request is {}\n, and response is {}\n, status is {}.",
r, stateVersion, r.version, request, response, status);
id.unlock();
return;
}
final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses;
holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));
if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
r.resetInflights();
r.setState(State.Probe);
r.sendProbeRequest();
return;
}
boolean continueSendEntries = false;
final boolean isLogDebugEnabled = LOG.isDebugEnabled();
StringBuilder sb = null;
if (isLogDebugEnabled) {
sb = new StringBuilder("Replicator ") //
.append(r) //
.append(" is processing RPC responses, ");
}
try {
int processed = 0;
while (!holdingQueue.isEmpty()) {
final RpcResponse queuedPipelinedResponse = holdingQueue.peek();
// Sequence mismatch, waiting for next response.
if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
if (processed > 0) {
if (isLogDebugEnabled) {
sb.append("has processed ") //
.append(processed) //
.append(" responses, ");
}
break;
}
else {
// Do not processed any responses, UNLOCK id and return.
continueSendEntries = false;
id.unlock();
return;
}
}
holdingQueue.remove();
processed++;
final Inflight inflight = r.pollInflight();
if (inflight == null) {
// The previous in-flight requests were cleared.
if (isLogDebugEnabled) {
sb.append("ignore response because request not found: ") //
.append(queuedPipelinedResponse) //
.append(",\n");
}
continue;
}
if (inflight.seq != queuedPipelinedResponse.seq) {
// reset state
LOG.warn(
"Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
r, inflight.seq, queuedPipelinedResponse.seq);
r.resetInflights();
r.setState(State.Probe);
continueSendEntries = false;
r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
return;
}
try {
switch (queuedPipelinedResponse.requestType) {
case AppendEntries:
continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status,
(AppendEntriesRequest) queuedPipelinedResponse.request,
(AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
break;
case Snapshot:
continueSendEntries = onInstallSnapshotReturned(id, r, queuedPipelinedResponse.status,
(InstallSnapshotRequest) queuedPipelinedResponse.request,
(InstallSnapshotResponse) queuedPipelinedResponse.response);
break;
}
}
finally {
if (continueSendEntries) {
// Success, increase the response sequence.
r.getAndIncrementRequiredNextSeq();
}
else {
// The id is already unlocked in onAppendEntriesReturned/onInstallSnapshotReturned, we SHOULD break out.
break;
}
}
}
}
finally {
if (isLogDebugEnabled) {
sb.append("after processed, continue to send entries: ") //
.append(continueSendEntries);
LOG.debug(sb.toString());
}
if (continueSendEntries) {
// unlock in sendEntries.
r.sendEntries();
}
}
}