public synchronized TaskDataBlock readDataBlock()

in src/main/java/com/uber/rss/clients/ReplicatedReadClient.java [185:259]


  /**
   * Reads the next data block from the client at {@code currentClientIndex}, moving on to the
   * next entry of {@code clients} when a read fails while the internal bookkeeping is still
   * consistent.
   *
   * <p>When there is exactly one client, whatever that client returns is handed back directly.
   * Otherwise every block for which {@code shouldSkipReadRecord} returns true is remembered as
   * read and dropped; the first block that is not skipped is remembered as consumed and read,
   * then returned. A {@code null} block from the client is treated as end of stream:
   * {@code checkRecordDataConsistency()} is invoked, {@code endOfRead} is set and {@code null}
   * is returned.
   *
   * <p>NOTE(review): skipped blocks are presumably ones already delivered through an earlier
   * client of the group - confirm against {@code shouldSkipReadRecord}.
   *
   * @return the next block, or {@code null} once {@code endOfRead} is set or the stream ends
   * @throws RssInconsistentReplicaException rethrown unchanged after the current client is closed
   * @throws RssNonRecoverableException rethrown unchanged, or raised when a failure happens
   *     outside a read call while further clients are still available
   * @throws RssInvalidStateException if the loop finishes without returning or throwing
   */
  public synchronized TaskDataBlock readDataBlock() {
    if (endOfRead) {
      return null;
    }

    // Checks whether there is a current active client; the return value is deliberately ignored.
    getActiveClient();

    while (currentClientIndex < clients.length) {
      if (endOfRead) {
        return null;
      }

      // Internal state (consumed/read record bookkeeping) must be updated consistently. This flag
      // is true only while a client read call is in flight, so that a switch to the next client
      // happens only for failures raised at a point where that state is consistent.
      boolean safeToRetry = false;

      try {
        if (!clientsInitialized[currentClientIndex]) {
          // First use of this client: connect and initialize it before reading.
          connectAndInitializeClient();
        }

        safeToRetry = true;
        TaskDataBlock block = clients[currentClientIndex].readDataBlock();
        safeToRetry = false;

        if (clients.length == 1) {
          return block;
        }

        // Drop blocks flagged by shouldSkipReadRecord, still tracking each one as read.
        while (block != null && shouldSkipReadRecord(block)) {
          rememberLastReadRecord(block);
          safeToRetry = true;
          block = clients[currentClientIndex].readDataBlock();
          safeToRetry = false;
        }

        if (block != null) {
          rememberLastConsumedRecord(block);
          rememberLastReadRecord(block);
          return block;
        }

        // A null block means the stream has ended.
        checkRecordDataConsistency();
        endOfRead = true;
        return null;
      } catch (RssInconsistentReplicaException | RssNonRecoverableException fatal) {
        // These two exception types are never retried on another client.
        M3Stats.addException(fatal, this.getClass().getSimpleName());
        closeClient(currentClientIndex);
        throw fatal;
      } catch (Throwable failure) {
        M3Stats.addException(failure, this.getClass().getSimpleName());
        closeClient(currentClientIndex);

        boolean hasNextClient = currentClientIndex < clients.length - 1;
        if (!hasNextClient) {
          // The last client failed: surface the original failure.
          throw failure;
        }
        if (!safeToRetry) {
          // More clients exist, but the failure did not come from a read call.
          throw new RssNonRecoverableException("Failed to read records from server replication group: " + serverReplicationGroup, failure);
        }

        long totalReadRecords = numReadRecordsMap.values().stream().mapToLong(t -> t).sum();
        String warning = String.format(
            "Failed to read after reading %s records in client (current index: %s): %s. Will try next client in the replication group",
            totalReadRecords, currentClientIndex, clients[currentClientIndex]);
        logger.warn(warning, failure);
        currentClientIndex++;
      }
    }

    throw new RssInvalidStateException("Should not execute here!");
  }