in modules/core/src/main/java/org/apache/fluo/core/oracle/OracleClient.java [150:226]
/**
 * Worker loop that serves queued timestamp requests in batches.
 *
 * <p>Each pass waits for at least one {@code TimeRequest}, drains whatever else is already
 * queued, asks the oracle for one contiguous block of transaction stamps sized to the batch, and
 * then hands consecutive stamps to the requests in batch order. Every request receives the same
 * GC stamp from that response.
 *
 * <p>The method returns only when {@code closed} is observed to be set while waiting for the
 * first request of a batch. Exceptions that reach the outer handlers are logged and the loop
 * starts over with a fresh batch.
 */
private void doWork() {
  ArrayList<TimeRequest> batch = new ArrayList<>();

  for (;;) {
    try {
      batch.clear();

      // Wait for the first request, re-checking the closed flag before every one-second poll.
      TimeRequest first;
      do {
        if (closed.get()) {
          return;
        }
        first = queue.poll(1, TimeUnit.SECONDS);
      } while (first == null);

      // Serve everything that is already queued with a single oracle call.
      batch.add(first);
      queue.drainTo(batch);

      long firstTxStamp;
      long gcStamp;

      // Keep asking until a response is accepted. This loop has no exit on the closed flag;
      // it ends only on success or when an exception escapes to the outer handlers.
      for (;;) {
        try {
          // Capture the leader id and the client together under the lock.
          String leaderBefore;
          OracleService.Client rpcClient;
          synchronized (this) {
            leaderBefore = getOracle();
            rpcClient = client;
          }

          final Context rpcTimer = responseTimer.time();
          Stamps reply = rpcClient.getTimestamps(env.getFluoApplicationID(), batch.size());
          firstTxStamp = reply.txStampsStart;
          gcStamp = reply.gcStamp;

          // Accept the reply only if the leader id seen after the call is null or matches the
          // one captured before it.
          String leaderAfter = getOracle();
          boolean leaderChanged = leaderAfter != null && !leaderAfter.equals(leaderBefore);
          if (!leaderChanged) {
            stampsHistogram.update(batch.size());
            // The timer is closed only on this accepted path.
            rpcTimer.close();
            break;
          }

          // Leader changed during the call: discard the reply, reconnect and ask again.
          reconnect();
        } catch (TTransportException tte) {
          log.info("Oracle connection lost. Retrying...");
          reconnect();
        } catch (TException e) {
          // Logged and retried immediately on the next loop iteration.
          log.error("TException occurred in doWork() method", e);
        }
      }

      // Hand out consecutive transaction stamps in batch order.
      long nextTxStamp = firstTxStamp;
      for (TimeRequest pending : batch) {
        Stamp issued = new Stamp(nextTxStamp, gcStamp);
        nextTxStamp++;
        pending.stampRef.set(issued);
        if (pending.cf != null) {
          // A request carrying a future is signalled by completing it.
          pending.cf.complete(issued);
        } else {
          // Otherwise the request's latch is counted down.
          pending.cdl.countDown();
        }
      }
    } catch (InterruptedException e) {
      // After the client is closed an interrupt is logged at debug level only.
      if (closed.get()) {
        log.debug("InterruptedException occurred in doWork() method", e);
      } else {
        log.error("InterruptedException occurred in doWork() method", e);
      }
    } catch (Exception e) {
      log.error("Exception occurred in doWork() method", e);
    }
  }
}