in helix-core/src/main/java/org/apache/helix/GroupCommit.java [87:192]
/**
 * Queues {@code record} for {@code key} and blocks until the entry has been handled, batching
 * it with other pending entries for the same key into a single write where possible.
 *
 * <p>Each caller adds its entry to the queue returned by {@code getQueue(key)}. Whichever
 * thread wins the {@code _running} compare-and-set drains the head of the queue, merges every
 * other pending entry with the same key into it, and writes the merged record through
 * {@code accessor} (up to {@code MAX_RETRY} attempts). The other threads poll in 10 ms waits
 * until their entry is flagged as sent.
 *
 * @param accessor data accessor used to read the current value and to set/remove the result
 * @param options options passed through unchanged to every accessor call
 * @param key path of the record being updated
 * @param record change to merge into the stored record
 * @param removeIfEmpty if true, the node is removed instead of written when the merged record
 *          has no map fields
 * @return the value of this thread's local success flag: the result of the last write this
 *         thread performed itself, {@code true} if it never performed one, or {@code false}
 *         if reading the current value failed or the thread was interrupted while waiting.
 *         NOTE(review): a waiting thread is not told the outcome of the write that carried
 *         its entry; propagating it would need a result field on Entry.
 */
public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key,
    ZNRecord record, boolean removeIfEmpty) {
  Queue queue = getQueue(key);
  Entry entry = new Entry(key, record);

  boolean success = true;
  queue._pending.add(entry);

  while (!entry._sent.get()) {
    if (queue._running.compareAndSet(null, Thread.currentThread())) {
      // This thread is now the only one draining the queue.
      ArrayList<Entry> processed = new ArrayList<>();
      try {
        if (queue._pending.peek() == null) {
          // Nothing left to do: every entry was already taken by an earlier drain.
          return true;
        }

        // remove from queue
        Entry first = queue._pending.poll();
        processed.add(first);

        String mergedKey = first._key;
        ZNRecord merged = null;

        try {
          // accessor will fallback to zk if not found in cache
          merged = accessor.get(mergedKey, null, options);
        } catch (ZkNoNodeException e) {
          // OK: the node does not exist yet, so there is nothing to merge into.
        } catch (Exception e) {
          LOG.error("Fail to get " + mergedKey + " from ZK", e);
          return false;
        }

        if (merged == null) {
          // No stored value: the first record itself becomes the initial value.
          merged = new ZNRecord(first._record);
        } else {
          // Merge only into a pre-existing value. Merging first._record into a copy of
          // itself would apply the same change twice (presumably duplicating appended
          // list-field entries; see ZNRecord.merge).
          merged.merge(first._record);
        }

        // Fold every other pending entry for the same key into this write. Entries for
        // other keys that share this queue are left for a later drain.
        Iterator<Entry> it = queue._pending.iterator();
        while (it.hasNext()) {
          Entry ent = it.next();
          if (!ent._key.equals(mergedKey)) {
            continue;
          }
          processed.add(ent);
          merged.merge(ent._record);
          it.remove();
        }

        int retry = 0;
        success = false;
        while (++retry <= MAX_RETRY && !success) {
          if (removeIfEmpty && merged.getMapFields().isEmpty()) {
            try {
              success = accessor.remove(mergedKey, options);
            } catch (Exception e) {
              LOG.error("Fails to remove " + mergedKey + " from ZK due to ZK issue.", e);
              success = false;
            }
            if (!success) {
              LOG.error("Fails to remove " + mergedKey + " from ZK, retry it!");
            } else {
              LOG.info("Removed " + mergedKey);
            }
          } else {
            try {
              success = accessor.set(mergedKey, merged, options);
            } catch (Exception e) {
              LOG.error("Fails to update " + mergedKey + " to ZK due to ZK issue.", e);
              success = false;
            }
            if (!success) {
              LOG.error("Fails to update " + mergedKey + " to ZK, retry it! ");
            }
          }
        }
      } finally {
        // Always release the drain role and wake the owners of the handled entries,
        // including on the early returns above.
        queue._running.set(null);
        for (Entry e : processed) {
          synchronized (e) {
            e._sent.set(true);
            e.notify();
          }
        }
      }
    } else {
      // Another thread is draining; wait briefly, then re-check whether our entry was sent.
      synchronized (entry) {
        try {
          entry.wait(10);
        } catch (InterruptedException e) {
          LOG.error("Interrupted while committing change, key: " + key + ", record: " + record,
              e);
          // Restore interrupt status
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
  }
  return success;
}