public boolean commit()

in helix-core/src/main/java/org/apache/helix/GroupCommit.java [87:192]


  /**
   * Queues {@code record} as a pending update for {@code key} and blocks until that update has
   * been handled, either by this thread or by another thread that is draining the same queue.
   *
   * <p>The thread that wins the compare-and-set on the queue's {@code _running} reference
   * performs one commit round: it takes the head entry off the queue, reads the current value
   * for that entry's key through {@code accessor}, folds in the head entry plus every other
   * pending entry with the same key, and then writes the merged record (or removes the node)
   * with at most {@code MAX_RETRY} attempts. Threads that lose the compare-and-set wait on
   * their own entry and re-check periodically.
   *
   * @param accessor data accessor used to read, write and remove the merged record
   * @param options accessor options passed through unchanged to every accessor call
   * @param key key the update applies to; also selects the queue via {@code getQueue(key)}
   * @param record the update to merge into the stored value
   * @param removeIfEmpty if {@code true} and the merged record has no map fields, the node is
   *        removed instead of written
   * @return {@code false} if the initial read failed with an exception other than
   *         {@code ZkNoNodeException}, if the last commit round run by this thread exhausted
   *         its retries, or if this thread was interrupted while waiting; otherwise
   *         {@code true}. Note that when another thread commits this entry, the outcome of
   *         that thread's write is not reflected in the value returned here.
   */
  public boolean commit(BaseDataAccessor<ZNRecord> accessor, int options, String key,
      ZNRecord record, boolean removeIfEmpty) {
    Queue queue = getQueue(key);
    Entry entry = new Entry(key, record);

    boolean success = true;
    queue._pending.add(entry);

    while (!entry._sent.get()) {
      if (queue._running.compareAndSet(null, Thread.currentThread())) {
        // This thread is now the only committer for the queue until _running is reset below.
        ArrayList<Entry> processed = new ArrayList<>();
        try {
          if (queue._pending.peek() == null) {
            // Nothing left to commit: our entry was already taken by another committer.
            return true;
          }

          // remove from queue
          Entry first = queue._pending.poll();
          processed.add(first);

          String mergedKey = first._key;
          ZNRecord merged = null;

          try {
            // accessor will fallback to zk if not found in cache
            merged = accessor.get(mergedKey, null, options);
          } catch (ZkNoNodeException e) {
            // OK. A missing node simply means there is no existing value to merge into.
          } catch (Exception e) {
            LOG.error("Fail to get " + mergedKey + " from ZK", e);
            return false;
          }

          if (merged == null) {
            // No existing value: start from a copy of the first pending record. It must NOT be
            // merged into its own copy afterwards; that would apply the same update twice.
            merged = new ZNRecord(first._record);
          } else {
            // An existing value was read: apply the first pending record on top of it.
            merged.merge(first._record);
          }

          // Fold in every other pending entry for the same key so they share a single write.
          Iterator<Entry> it = queue._pending.iterator();
          while (it.hasNext()) {
            Entry ent = it.next();
            if (!ent._key.equals(mergedKey)) {
              continue;
            }
            processed.add(ent);
            merged.merge(ent._record);
            it.remove();
          }

          int retry = 0;
          success = false;
          while (++retry <= MAX_RETRY && !success) {
            if (removeIfEmpty && merged.getMapFields().isEmpty()) {
              try {
                success = accessor.remove(mergedKey, options);
              } catch (Exception e) {
                LOG.error("Fails to remove " + mergedKey + " from ZK due to ZK issue.", e);
                success = false;
              }
              if (!success) {
                LOG.error("Fails to remove " + mergedKey + " from ZK, retry it!");
              } else {
                LOG.info("Removed " + mergedKey);
              }
            } else {
              try {
                success = accessor.set(mergedKey, merged, options);
              } catch (Exception e) {
                LOG.error("Fails to update " + mergedKey + " to ZK due to ZK issue.", e);
                success = false;
              }
              if (!success) {
                LOG.error("Fails to update " + mergedKey + " to ZK, retry it! ");
              }
            }
          }
        } finally {
          // Release the committer slot first, then wake every thread whose entry was consumed
          // in this round (including on the early-return paths above).
          queue._running.set(null);
          for (Entry e : processed) {
            synchronized (e) {
              e._sent.set(true);
              e.notify();
            }
          }
        }
      } else {
        // Another thread is committing; wait briefly on our own entry and then re-check, so a
        // missed notify cannot block this thread for more than the timeout.
        synchronized (entry) {
          try {
            entry.wait(10);
          } catch (InterruptedException e) {
            LOG.error("Interrupted while committing change, key: " + key + ", record: " + record,
                e);
            // Restore interrupt status
            Thread.currentThread().interrupt();
            return false;
          }
        }
      }
    }
    return success;
  }