private void doWork()

in modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java [150:226]


    /**
     * Worker loop that serves queued {@code TimeRequest}s in batches.
     *
     * <p>
     * Each iteration waits for at least one request, drains everything else that is already
     * queued, asks the oracle for as many timestamps as there are requests in the batch, and then
     * hands one stamp to every request. The method returns only once {@code closed} is set; it
     * never throws, every exception is logged and the loop continues.
     *
     * <p>
     * NOTE(review): if an unexpected exception escapes after the batch has been drained, the batch
     * is discarded by {@code request.clear()} on the next iteration without its requests ever
     * being completed. Whether callers cope with that (e.g. via a timeout) cannot be seen from
     * this method - confirm against the callers.
     */
    private void doWork() {

      ArrayList<TimeRequest> request = new ArrayList<>();

      while (true) {

        try {
          request.clear();
          TimeRequest trh = null;
          // Poll with a one second timeout so that the closed flag is re-checked regularly while
          // the queue is idle.
          while (trh == null) {
            if (closed.get()) {
              return;
            }
            trh = queue.poll(1, TimeUnit.SECONDS);
          }
          request.add(trh);
          // Batch every request that is already waiting so a single oracle call serves them all.
          queue.drainTo(request);

          long txStampsStart;
          long gcStamp;

          // Becomes true once the first attempt for this batch has been started.
          boolean retrying = false;

          while (true) {

            // Stop retrying once the client has been closed; without this check a failing
            // oracle connection would keep this thread retrying forever after close. The first
            // attempt for a batch is always made, so the success path is unchanged.
            if (retrying && closed.get()) {
              return;
            }
            retrying = true;

            try {
              String currentLeaderId;
              OracleService.Client localClient;
              // Read the leader id and the client together under the instance monitor so they
              // are taken at the same point in time.
              synchronized (this) {
                currentLeaderId = getOracle();
                localClient = client;
              }

              final Context timerContext = responseTimer.time();

              Stamps stamps = localClient.getTimestamps(env.getFluoApplicationID(), request.size());
              txStampsStart = stamps.txStampsStart;
              gcStamp = stamps.gcStamp;

              // If the leader id changed while the call was in flight, discard the stamps,
              // reconnect and ask again.
              // NOTE(review): presumably this guards against using stamps issued by an oracle
              // that is no longer the leader - confirm.
              String leaderId = getOracle();
              if (leaderId != null && !leaderId.equals(currentLeaderId)) {
                reconnect();
                continue;
              }

              // NOTE(review): the timer context is closed only for the attempt whose result is
              // used; failed and discarded attempts never close it.
              stampsHistogram.update(request.size());
              timerContext.close();

              break;

            } catch (TTransportException tte) {
              log.info("Oracle connection lost. Retrying...");
              reconnect();
            } catch (TException e) {
              log.error("TException occurred in doWork() method", e);
            }
          }

          // Hand out consecutive transaction stamps starting at txStampsStart, one per request
          // in batch order; every request in the batch gets the same gcStamp.
          for (int i = 0; i < request.size(); i++) {
            TimeRequest tr = request.get(i);
            Stamp stampRes = new Stamp(txStampsStart + i, gcStamp);
            tr.stampRef.set(stampRes);
            // Requests without a future are signalled through cdl, the others through cf.
            if (tr.cf == null) {
              tr.cdl.countDown();
            } else {
              tr.cf.complete(stampRes);
            }
          }
        } catch (InterruptedException e) {
          // The interrupt is only logged; the loop continues and exits at the next closed check
          // if the client has been closed.
          if (!closed.get()) {
            log.error("InterruptedException occurred in doWork() method", e);
          } else {
            log.debug("InterruptedException occurred in doWork() method", e);
          }
        } catch (Exception e) {
          log.error("Exception occurred in doWork() method", e);
        }
      }
    }