in src/main/java/com/uber/rss/clients/ReplicatedReadClient.java [185:259]
/**
 * Reads the next data block from the replication group, moving on to the next client in
 * {@code clients} when the current one fails at a point where retrying is safe.
 *
 * <p>When there is only one client, whatever that client returns is handed back unchanged.
 * With more than one client, blocks for which {@code shouldSkipReadRecord} returns true are
 * recorded as read and dropped, and the first block that is not skipped is recorded as both
 * consumed and read before being returned. (Presumably the skipped blocks are ones already
 * delivered through an earlier replica; confirm against {@code shouldSkipReadRecord}.)
 *
 * <p>Any failure is reported to {@code M3Stats} and the current client is closed before the
 * failure is either rethrown or used to trigger failover.
 *
 * @return the next data block; {@code null} when the read has already ended, when the
 *     multi-client stream reaches its end, or when the single client returns {@code null}
 * @throws RssInconsistentReplicaException rethrown as-is, never retried on another client
 * @throws RssNonRecoverableException rethrown as-is, or raised when a failure happens at a
 *     non-retriable point while further clients are still available
 * @throws RssInvalidStateException if the client loop ends without returning or throwing
 */
public synchronized TaskDataBlock readDataBlock() {
  if (endOfRead) {
    return null;
  }

  // Verify there is a current active client; the return value is intentionally unused.
  getActiveClient();

  while (currentClientIndex < clients.length) {
    if (endOfRead) {
      return null;
    }

    // Internal bookkeeping (read/consumed record tracking) must stay consistent across a
    // failover. This flag is true only while a call to the underlying client's
    // readDataBlock() is in flight, so a failure anywhere else is never retried.
    boolean retriable = false;
    try {
      if (!clientsInitialized[currentClientIndex]) {
        // Lazily connect the client the first time it is needed.
        connectAndInitializeClient();
      }

      retriable = true;
      TaskDataBlock block = clients[currentClientIndex].readDataBlock();
      retriable = false;

      // Single client: no skip/consume bookkeeping, pass the result straight through.
      if (clients.length == 1) {
        return block;
      }

      // Drop every block that should be skipped, still recording it as read.
      while (block != null && shouldSkipReadRecord(block)) {
        rememberLastReadRecord(block);
        retriable = true;
        block = clients[currentClientIndex].readDataBlock();
        retriable = false;
      }

      if (block == null) {
        // A null block means end of stream: validate the data, then latch the end state.
        checkRecordDataConsistency();
        endOfRead = true;
        return null;
      }

      rememberLastConsumedRecord(block);
      rememberLastReadRecord(block);
      return block;
    } catch (RssInconsistentReplicaException | RssNonRecoverableException ex) {
      // These failures are never retried on another client.
      M3Stats.addException(ex, this.getClass().getSimpleName());
      closeClient(currentClientIndex);
      throw ex;
    } catch (Throwable ex) {
      M3Stats.addException(ex, this.getClass().getSimpleName());
      closeClient(currentClientIndex);

      boolean onLastClient = currentClientIndex >= clients.length - 1;
      if (onLastClient) {
        // Nothing left to fail over to: surface the original failure.
        throw ex;
      }
      if (!retriable) {
        // Other clients exist, but the failure did not happen at a retriable point.
        throw new RssNonRecoverableException(
            "Failed to read records from server replication group: " + serverReplicationGroup, ex);
      }

      long totalReadRecords = numReadRecordsMap.values().stream().mapToLong(t -> t).sum();
      String warning = String.format(
          "Failed to read after reading %s records in client (current index: %s): %s. Will try next client in the replication group",
          totalReadRecords, currentClientIndex, clients[currentClientIndex]);
      logger.warn(warning, ex);
      currentClientIndex++;
    }
  }

  throw new RssInvalidStateException("Should not execute here!");
}