private CommitData setUpBeginCommitAsync()

in modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java [1472:1529]


  /**
   * Prepares {@code cd} for the start of an asynchronous commit by selecting the primary
   * row/column and moving its value out of {@code updates} into {@code cd}.
   *
   * <p>
   * The primary is chosen in this order: the explicit {@code primary} argument, otherwise the
   * row/column of {@code notification}, otherwise the first entry found in {@code updates} whose
   * value is not a read lock.
   *
   * <p>
   * If there is nothing to write ({@code updates} is empty, or no primary was specified and every
   * update is a read lock), this calls {@code deleteWeakRow()}, invokes
   * {@code commitCallback.committed()} and returns {@code null}.
   *
   * @param cd commit data to populate with the primary row, column, value and the observer
   * @param commitCallback observer stored on {@code cd}, or notified immediately when there is
   *        nothing to write
   * @param primary row/column to use as primary, or {@code null} to have one selected
   * @return {@code cd} with its primary fields set, or {@code null} when the commit completed
   *         immediately because there was nothing to write
   * @throws IllegalArgumentException if {@code primary} differs from the notification's
   *         row/column, or if the selected primary is not present in {@code updates}
   */
  private CommitData setUpBeginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
      RowColumn primary) {
    if (updates.isEmpty()) {
      // TODO do async
      deleteWeakRow();
      commitCallback.committed();
      return null;
    }

    for (Map<Column, Bytes> cols : updates.values()) {
      stats.incrementEntriesSet(cols.size());
    }

    Bytes primRow = null;
    Column primCol = null;

    if (primary != null) {
      primRow = primary.getRow();
      primCol = primary.getColumn();
      if (notification != null && !primary.equals(notification.getRowColumn())) {
        throw new IllegalArgumentException("Primary must be notification");
      }
    } else if (notification != null) {
      primRow = notification.getRow();
      primCol = notification.getColumn();
    } else {

      // no primary requested: take the first update that is not a read lock
      outer: for (Entry<Bytes, Map<Column, Bytes>> entry : updates.entrySet()) {
        for (Entry<Column, Bytes> entry2 : entry.getValue().entrySet()) {
          if (!isReadLock(entry2.getValue())) {
            primRow = entry.getKey();
            primCol = entry2.getKey();
            break outer;
          }
        }
      }

      if (primRow == null) {
        // there are only read locks, so nothing to write
        deleteWeakRow();
        commitCallback.committed();
        return null;
      }
    }

    // Validate before touching cd: a caller-supplied primary (or a notification) that is not among
    // the updates would otherwise fail with a bare NullPointerException on a missing row, or
    // silently produce a null primary value on a missing column.
    Map<Column, Bytes> colSet = updates.get(primRow);
    if (colSet == null || !colSet.containsKey(primCol)) {
      throw new IllegalArgumentException(
          "Primary " + primRow + " " + primCol + " is not one of this transaction's updates");
    }

    // get a primary column
    cd.prow = primRow;
    cd.pcol = primCol;
    // the primary's value is taken out of updates so only the remaining entries stay there
    cd.pval = colSet.remove(primCol);
    if (colSet.isEmpty()) {
      // drop the row entirely once its only column has become the primary
      updates.remove(cd.prow);
    }

    cd.commitObserver = commitCallback;

    return cd;
  }