in modules/core/src/main/java/org/apache/fluo/core/impl/TimestampTracker.java [55:115]
/**
 * Creates a tracker and starts a daemon timer whose task runs every {@code updatePeriodMs}
 * milliseconds (first run after one full period).
 *
 * <p>
 * On each run the task, while holding this tracker's monitor:
 * <ul>
 * <li>returns immediately if the tracker is closed;</li>
 * <li>if allocations are in progress and {@code timestamps} is non-empty, picks
 * {@code timestamps.first()} and marks {@code updatingZk};</li>
 * <li>if no allocations are in progress, counts the observation and calls
 * {@code closeZkNode()} once two consecutive runs have seen zero allocations;</li>
 * <li>treats a negative allocation count as an illegal state.</li>
 * </ul>
 * The call to {@code updateZkNode(ts)} is then made outside the monitor. Any exception raised
 * by the task is logged and not propagated.
 *
 * @param env environment to use; must not be null
 * @param tid transactor id; must not be null
 * @param updatePeriodMs delay before the first run and period between runs, in milliseconds;
 *        must be positive
 * @throws NullPointerException if {@code env} or {@code tid} is null
 * @throws IllegalArgumentException if {@code updatePeriodMs} is not positive
 */
public TimestampTracker(Environment env, TransactorID tid, long updatePeriodMs) {
  Objects.requireNonNull(env, "environment cannot be null");
  Objects.requireNonNull(tid, "tid cannot be null");
  Preconditions.checkArgument(updatePeriodMs > 0, "update period must be positive");
  this.env = env;
  this.tid = tid;

  TimerTask tt = new TimerTask() {
    // number of consecutive runs that observed allocationsInProgress == 0;
    // only read and written inside run() while holding the tracker's monitor
    private int sawZeroCount = 0;

    @Override
    public void run() {
      try {
        long ts = 0;
        // Decided under the lock and consumed outside it, so the shared updatingZk field is
        // never read without holding the monitor and ts is only used when it was actually set.
        boolean shouldUpdate = false;

        synchronized (TimestampTracker.this) {
          if (closed) {
            return;
          }

          if (allocationsInProgress > 0) {
            sawZeroCount = 0;
            if (!timestamps.isEmpty()) {
              if (updatingZk) {
                throw new IllegalStateException("expected updatingZk to be false");
              }
              ts = timestamps.first();
              updatingZk = true;
              shouldUpdate = true;
            }
          } else if (allocationsInProgress == 0) {
            sawZeroCount++;
            // require two consecutive idle observations before closing the node
            if (sawZeroCount >= 2) {
              sawZeroCount = 0;
              closeZkNode();
            }
          } else {
            throw new IllegalStateException("allocationsInProgress = " + allocationsInProgress);
          }
        }

        // update can be done outside of sync block as timer has one thread and future
        // executions of run method will block until this method returns
        if (shouldUpdate) {
          try {
            updateZkNode(ts);
          } finally {
            // always clear the flag, even if the update failed
            synchronized (TimestampTracker.this) {
              updatingZk = false;
            }
          }
        }
      } catch (Exception e) {
        // an exception escaping run() would terminate the Timer's thread and cancel all
        // future executions, so log and carry on
        log.error("Exception occurred in Zookeeper update thread", e);
      }
    }
  };

  // second argument 'true' makes the timer thread a daemon
  timer = new Timer("TimestampTracker timer", true);
  timer.schedule(tt, updatePeriodMs, updatePeriodMs);
}