tephra-hbase-compat-2.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java [425:731]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Performs an append against the wrapped table, which is only permitted when
   * non-transactional operations have been allowed on this instance.
   *
   * @param append the append to forward to the wrapped table
   * @return the result returned by the wrapped table
   * @throws IOException if the wrapped table's append fails
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public Result append(Append append) throws IOException {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.append(append);
  }

  /**
   * Performs an increment against the wrapped table, which is only permitted when
   * non-transactional operations have been allowed on this instance.
   *
   * @param increment the increment to forward to the wrapped table
   * @return the result returned by the wrapped table
   * @throws IOException if the wrapped table's increment fails
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public Result increment(Increment increment) throws IOException {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.increment(increment);
  }

  /**
   * Increments a single column value on the wrapped table, which is only permitted when
   * non-transactional operations have been allowed on this instance.
   *
   * @param row row key, passed through unchanged
   * @param family column family, passed through unchanged
   * @param qualifier column qualifier, passed through unchanged
   * @param amount increment amount, passed through unchanged
   * @return the value returned by the wrapped table
   * @throws IOException if the wrapped table's call fails
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
      throws IOException {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.incrementColumnValue(row, family, qualifier, amount);
  }

  /**
   * Increments a single column value on the wrapped table with an explicit durability, which
   * is only permitted when non-transactional operations have been allowed on this instance.
   *
   * @param row row key, passed through unchanged
   * @param family column family, passed through unchanged
   * @param qualifier column qualifier, passed through unchanged
   * @param amount increment amount, passed through unchanged
   * @param durability durability setting, passed through unchanged
   * @return the value returned by the wrapped table
   * @throws IOException if the wrapped table's call fails
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
      Durability durability) throws IOException {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
  }

  /**
   * Closes the wrapped table. No transaction state is touched here.
   *
   * @throws IOException if closing the wrapped table fails
   */
  @Override
  public void close() throws IOException {
    hTable.close();
  }

  /**
   * Returns the coprocessor RPC channel of the wrapped table for the given row.
   * The call is a plain delegation; no transaction attribute is attached here.
   *
   * @param row row key handed unchanged to the wrapped table
   * @return the channel returned by the wrapped table
   */
  @Override
  public CoprocessorRpcChannel coprocessorService(byte[] row) {
    return hTable.coprocessorService(row);
  }

  /**
   * Invokes a coprocessor service over a key range by delegating to the wrapped table.
   * All arguments are passed through unchanged; no transaction attribute is attached here.
   *
   * @param service coprocessor service class
   * @param startKey start of the key range
   * @param endKey end of the key range
   * @param callable call to run against each service instance
   * @return the map returned by the wrapped table
   * @throws ServiceException propagated from the wrapped table
   * @throws Throwable propagated from the wrapped table
   */
  @Override
  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey,
      byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable {
    return hTable.coprocessorService(service, startKey, endKey, callable);
  }

  /**
   * Invokes a coprocessor service over a key range with a result callback by delegating to
   * the wrapped table. All arguments are passed through unchanged; no transaction attribute
   * is attached here.
   *
   * @param service coprocessor service class
   * @param startKey start of the key range
   * @param endKey end of the key range
   * @param callable call to run against each service instance
   * @param callback callback handed to the wrapped table
   * @throws ServiceException propagated from the wrapped table
   * @throws Throwable propagated from the wrapped table
   */
  @Override
  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
      byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
      throws ServiceException, Throwable {
    hTable.coprocessorService(service, startKey, endKey, callable, callback);
  }

  /**
   * Runs a batch coprocessor call over a key range by delegating to the wrapped table.
   * All arguments are passed through unchanged; no transaction attribute is attached here.
   *
   * @param methodDescriptor descriptor of the service method to invoke
   * @param request request message
   * @param startKey start of the key range
   * @param endKey end of the key range
   * @param responsePrototype prototype of the response message
   * @return the map returned by the wrapped table
   * @throws ServiceException propagated from the wrapped table
   * @throws Throwable propagated from the wrapped table
   */
  @Override
  public <R extends Message> Map<byte[], R> batchCoprocessorService(
      MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
      R responsePrototype) throws ServiceException, Throwable {
    return hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
      responsePrototype);
  }

  /**
   * Runs a batch coprocessor call over a key range with a result callback by delegating to
   * the wrapped table. All arguments are passed through unchanged; no transaction attribute
   * is attached here.
   *
   * @param methodDescriptor descriptor of the service method to invoke
   * @param request request message
   * @param startKey start of the key range
   * @param endKey end of the key range
   * @param responsePrototype prototype of the response message
   * @param callback callback handed to the wrapped table
   * @throws ServiceException propagated from the wrapped table
   * @throws Throwable propagated from the wrapped table
   */
  @Override
  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
      Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
      throws ServiceException, Throwable {
    hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
      callback);
  }

  /**
   * Returns the region locator of the wrapped table.
   *
   * @return the locator returned by the wrapped table
   * @throws IOException if the wrapped table's call fails
   */
  @Override
  public RegionLocator getRegionLocator() throws IOException {
      return hTable.getRegionLocator();
  }

  /**
   * Attaches a transaction to an operation by storing the encoded transaction (as produced by
   * {@code txCodec.encode}) under {@code TxConstants.TX_OPERATION_ATTRIBUTE_KEY}.
   * The operation is modified in place.
   *
   * @param op operation that receives the attribute
   * @param tx transaction to encode
   * @throws IOException if encoding the transaction fails
   */
  public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
    op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
  }

  /**
   * Marks a delete by setting {@code TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY} to an empty byte
   * array; only the presence of the attribute carries information.
   * NOTE(review): presumably the attribute is read elsewhere to recognise rollback deletes;
   * confirm against its consumers.
   *
   * @param delete delete that receives the attribute; modified in place
   */
  protected void makeRollbackOperation(Delete delete) {
    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
  }

  /**
   * Returns the table descriptor of the wrapped table.
   *
   * @return the descriptor returned by the wrapped table
   * @throws IOException if the wrapped table's call fails
   */
  @Override
  public TableDescriptor getDescriptor() throws IOException {
    return hTable.getDescriptor();
  }

  /**
   * Checks existence for a batch of gets within the current transaction. Every get is passed
   * through {@code transactionalizeAction} before the batch is handed to the wrapped table.
   *
   * @param gets the gets to check
   * @return the array returned by the wrapped table for the transactionalized gets
   * @throws IOException if no transaction has been started, or if the underlying call fails
   */
  @Override
  public boolean[] exists(List<Get> gets) throws IOException {
    if (tx == null) {
      throw new IOException("Transaction not started");
    }

    final List<Get> txGets = new ArrayList<>(gets.size());
    for (Get candidate : gets) {
      Get withTransaction = transactionalizeAction(candidate);
      txGets.add(withTransaction);
    }
    return hTable.exists(txGets);
  }

  /**
   * Returns the RPC timeout reported by the wrapped table for the given unit.
   *
   * @param unit time unit handed unchanged to the wrapped table
   * @return the value returned by the wrapped table
   */
  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return hTable.getRpcTimeout(unit);
  }

  /**
   * Returns the RPC timeout reported by the wrapped table's no-argument getter.
   *
   * @return the value returned by the wrapped table
   */
  @Override
  public int getRpcTimeout() {
    return hTable.getRpcTimeout();
  }

  /**
   * Forwards the RPC timeout to the wrapped table.
   *
   * @param rpcTimeout value handed unchanged to the wrapped table's setter
   */
  @Override
  public void setRpcTimeout(int rpcTimeout) {
    hTable.setRpcTimeout(rpcTimeout);

  }

  /**
   * Returns the read RPC timeout reported by the wrapped table for the given unit.
   *
   * @param unit time unit handed unchanged to the wrapped table
   * @return the value returned by the wrapped table
   */
  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return hTable.getReadRpcTimeout(unit);
  }

  /**
   * Returns the read RPC timeout reported by the wrapped table's no-argument getter.
   *
   * @return the value returned by the wrapped table
   */
  @Override
  public int getReadRpcTimeout() {
    return hTable.getReadRpcTimeout();
  }

  /**
   * Forwards the read RPC timeout to the wrapped table.
   *
   * @param readRpcTimeout value handed unchanged to the wrapped table's setter
   */
  @Override
  public void setReadRpcTimeout(int readRpcTimeout) {
    hTable.setReadRpcTimeout(readRpcTimeout);

  }

  /**
   * Returns the write RPC timeout reported by the wrapped table for the given unit.
   *
   * @param unit time unit handed unchanged to the wrapped table
   * @return the value returned by the wrapped table
   */
  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return hTable.getWriteRpcTimeout(unit);
  }

  /**
   * Returns the write RPC timeout reported by the wrapped table's no-argument getter.
   *
   * @return the value returned by the wrapped table
   */
  @Override
  public int getWriteRpcTimeout() {
    return hTable.getWriteRpcTimeout();
  }

  /**
   * Forwards the write RPC timeout to the wrapped table.
   *
   * @param writeRpcTimeout value handed unchanged to the wrapped table's setter
   */
  @Override
  public void setWriteRpcTimeout(int writeRpcTimeout) {
    hTable.setWriteRpcTimeout(writeRpcTimeout);

  }

  /**
   * Returns the operation timeout reported by the wrapped table for the given unit.
   *
   * @param unit time unit handed unchanged to the wrapped table
   * @return the value returned by the wrapped table
   */
  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return hTable.getOperationTimeout(unit);
  }

  /**
   * Returns the operation timeout reported by the wrapped table's no-argument getter.
   *
   * @return the value returned by the wrapped table
   */
  @Override
  public int getOperationTimeout() {
    return hTable.getOperationTimeout();
  }

  /**
   * Forwards the operation timeout to the wrapped table.
   *
   * @param operationTimeout value handed unchanged to the wrapped table's setter
   */
  @Override
  public void setOperationTimeout(int operationTimeout) {
    hTable.setOperationTimeout(operationTimeout);
  }

  /**
   * Returns the wrapped table's check-and-mutate builder, which is only permitted when
   * non-transactional operations have been allowed on this instance.
   *
   * @param row row key, passed through unchanged
   * @param family column family, passed through unchanged
   * @return the builder returned by the wrapped table
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.checkAndMutate(row, family);
  }

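  // Helpers that attach the current transaction to an operation. Get and Scan are modified in
  // place and returned; Put and Delete are converted into a new Put whose cells are written at
  // the transaction's write pointer.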

  /**
   * Attaches the current transaction to a get via {@code addToOperation}.
   * The get is modified in place; the same instance is returned, not a copy.
   *
   * @param get the get to tag with the current transaction
   * @return the same {@code get} instance
   * @throws IOException if {@code addToOperation} fails
   */
  private Get transactionalizeAction(Get get) throws IOException {
    addToOperation(get, tx);
    return get;
  }

  /**
   * Attaches the current transaction to a scan via {@code addToOperation}.
   * The scan is modified in place; the same instance is returned, not a copy.
   *
   * @param scan the scan to tag with the current transaction
   * @return the same {@code scan} instance
   * @throws IOException if {@code addToOperation} fails
   */
  private Scan transactionalizeAction(Scan scan) throws IOException {
    addToOperation(scan, tx);
    return scan;
  }

  /**
   * Builds a transactional copy of a put. Every cell of the original is re-added to a new
   * {@link Put} at the current transaction's write pointer and recorded through
   * {@code addToChangeSet}. Attributes and durability are carried over from the original,
   * and the transaction is attached last via {@code addToOperation}.
   *
   * @param put the put to convert; it is only read
   * @return a new put carrying the same cells at the transaction's write pointer
   * @throws IOException if {@code addToOperation} fails
   */
  private Put transactionalizeAction(Put put) throws IOException {
    final Put txPut = new Put(put.getRow(), tx.getWritePointer());
    final long writePointer = tx.getWritePointer();

    // Iterating an empty map or an empty cell list is a no-op, so no emptiness checks are needed.
    for (List<Cell> cells : put.getFamilyCellMap().values()) {
      for (Cell cell : cells) {
        txPut.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
          writePointer, CellUtil.cloneValue(cell));
        addToChangeSet(txPut.getRow(), CellUtil.cloneFamily(cell),
          CellUtil.cloneQualifier(cell));
      }
    }

    for (Map.Entry<String, byte[]> attribute : put.getAttributesMap().entrySet()) {
      txPut.setAttribute(attribute.getKey(), attribute.getValue());
    }
    txPut.setDurability(put.getDurability());
    addToOperation(txPut, tx);
    return txPut;
  }

  /**
   * Converts a delete into a {@link Put} of delete markers: cells with an empty value written
   * at the current transaction's write pointer. Every marker is also recorded through
   * {@code addToChangeSet}.
   *
   * <ul>
   *   <li>Delete with no families (row delete): with ROW or NONE conflict detection, one
   *       {@code TxConstants.FAMILY_DELETE_QUALIFIER} marker is written per column family of
   *       the table; otherwise the row is read and one marker is written per column found.</li>
   *   <li>Family delete (a family whose only cell is a delete-family cell): with ROW or NONE
   *       conflict detection, one {@code FAMILY_DELETE_QUALIFIER} marker is written for the
   *       family; otherwise the family is read and one marker is written per column found.</li>
   *   <li>Anything else: one marker per cell listed in the delete.</li>
   * </ul>
   *
   * <p>When the row or family has to be read and the read returns nothing, no markers are
   * added for it, so the returned put may contain no cells.
   *
   * @param delete the delete to convert; it is only read
   * @return a new put holding the delete markers, with the delete's attributes and durability
   *     and the current transaction attached
   * @throws IOException if reading existing columns or attaching the transaction fails
   */
  private Put transactionalizeAction(Delete delete) throws IOException {
    long transactionTimestamp = tx.getWritePointer();

    byte[] deleteRow = delete.getRow();
    Put deleteMarkers = new Put(deleteRow, delete.getTimeStamp());

    // Under ROW or NONE conflict detection individual columns need not be identified, so a
    // single family-level marker stands in for every column of the family.
    boolean familyLevelMarkers = conflictLevel == TxConstants.ConflictDetection.ROW
        || conflictLevel == TxConstants.ConflictDetection.NONE;

    Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
    if (familyToDelete.isEmpty()) {
      // No families named in the delete: the whole row is being deleted.
      if (familyLevelMarkers) {
        // Row delete leaves delete markers in all column families of the table, so take the
        // families from the table descriptor and add each of them to the change set.
        for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
          deleteMarkers.addColumn(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER,
            transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
          addToChangeSet(deleteRow, columnDescriptor.getName(), null);
        }
      } else {
        Result result = get(new Get(deleteRow));
        // Result.getNoVersionMap() returns null for an empty result; nothing to mark then.
        NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap();
        if (resultMap != null) {
          for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
            byte[] family = familyEntry.getKey();
            // The entry value already maps qualifier to latest value for this family.
            for (byte[] qualifier : familyEntry.getValue().keySet()) {
              deleteMarkers.addColumn(family, qualifier, transactionTimestamp,
                HConstants.EMPTY_BYTE_ARRAY);
              addToChangeSet(deleteRow, family, qualifier);
            }
          }
        }
      }
    } else {
      for (Map.Entry<byte[], List<Cell>> familyEntry : familyToDelete.entrySet()) {
        byte[] family = familyEntry.getKey();
        List<Cell> entries = familyEntry.getValue();
        // Only a family whose single cell is a delete-family cell counts as a family delete.
        boolean isFamilyDelete = entries.size() == 1 && CellUtil.isDeleteFamily(entries.get(0));
        if (isFamilyDelete) {
          if (familyLevelMarkers) {
            deleteMarkers.addColumn(family, TxConstants.FAMILY_DELETE_QUALIFIER,
              transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
            addToChangeSet(deleteRow, family, null);
          } else {
            Result result = get(new Get(deleteRow).addFamily(family));
            // Result.getFamilyMap() returns null for an empty result; nothing to mark then.
            NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
            if (familyColumns != null) {
              for (byte[] qualifier : familyColumns.keySet()) {
                deleteMarkers.addColumn(family, qualifier, transactionTimestamp,
                  HConstants.EMPTY_BYTE_ARRAY);
                addToChangeSet(deleteRow, family, qualifier);
              }
            }
          }
        } else {
          for (Cell value : entries) {
            deleteMarkers.addColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value),
              transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
            addToChangeSet(deleteRow, CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value));
          }
        }
      }
    }
    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
    }
    deleteMarkers.setDurability(delete.getDurability());
    addToOperation(deleteMarkers, tx);
    return deleteMarkers;
  }

  private List<? extends Row> transactionalizeActions(List<? extends Row> actions)
      throws IOException {
    List<Row> transactionalizedActions = new ArrayList<>(actions.size());
    for (Row action : actions) {
      if (action instanceof Get) {
        transactionalizedActions.add(transactionalizeAction((Get) action));
      } else if (action instanceof Put) {
        transactionalizedActions.add(transactionalizeAction((Put) action));
      } else if (action instanceof Delete) {
        transactionalizedActions.add(transactionalizeAction((Delete) action));
      } else {
        transactionalizedActions.add(action);
      }
    }
    return transactionalizedActions;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



tephra-hbase-compat-2.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java [425:731]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Performs an append against the wrapped table, which is only permitted when
   * non-transactional operations have been allowed on this instance.
   *
   * @param append the append to forward to the wrapped table
   * @return the result returned by the wrapped table
   * @throws IOException if the wrapped table's append fails
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public Result append(Append append) throws IOException {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.append(append);
  }

  /**
   * Performs an increment against the wrapped table, which is only permitted when
   * non-transactional operations have been allowed on this instance.
   *
   * @param increment the increment to forward to the wrapped table
   * @return the result returned by the wrapped table
   * @throws IOException if the wrapped table's increment fails
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public Result increment(Increment increment) throws IOException {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.increment(increment);
  }

  /**
   * Increments a single column value on the wrapped table, which is only permitted when
   * non-transactional operations have been allowed on this instance.
   *
   * @param row row key, passed through unchanged
   * @param family column family, passed through unchanged
   * @param qualifier column qualifier, passed through unchanged
   * @param amount increment amount, passed through unchanged
   * @return the value returned by the wrapped table
   * @throws IOException if the wrapped table's call fails
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
      throws IOException {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.incrementColumnValue(row, family, qualifier, amount);
  }

  /**
   * Increments a single column value on the wrapped table with an explicit durability, which
   * is only permitted when non-transactional operations have been allowed on this instance.
   *
   * @param row row key, passed through unchanged
   * @param family column family, passed through unchanged
   * @param qualifier column qualifier, passed through unchanged
   * @param amount increment amount, passed through unchanged
   * @param durability durability setting, passed through unchanged
   * @return the value returned by the wrapped table
   * @throws IOException if the wrapped table's call fails
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
      Durability durability) throws IOException {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
  }

  /**
   * Closes the wrapped table. No transaction state is touched here.
   *
   * @throws IOException if closing the wrapped table fails
   */
  @Override
  public void close() throws IOException {
    hTable.close();
  }

  /**
   * Returns the coprocessor RPC channel of the wrapped table for the given row.
   * The call is a plain delegation; no transaction attribute is attached here.
   *
   * @param row row key handed unchanged to the wrapped table
   * @return the channel returned by the wrapped table
   */
  @Override
  public CoprocessorRpcChannel coprocessorService(byte[] row) {
    return hTable.coprocessorService(row);
  }

  /**
   * Invokes a coprocessor service over a key range by delegating to the wrapped table.
   * All arguments are passed through unchanged; no transaction attribute is attached here.
   *
   * @param service coprocessor service class
   * @param startKey start of the key range
   * @param endKey end of the key range
   * @param callable call to run against each service instance
   * @return the map returned by the wrapped table
   * @throws ServiceException propagated from the wrapped table
   * @throws Throwable propagated from the wrapped table
   */
  @Override
  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey,
      byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable {
    return hTable.coprocessorService(service, startKey, endKey, callable);
  }

  /**
   * Invokes a coprocessor service over a key range with a result callback by delegating to
   * the wrapped table. All arguments are passed through unchanged; no transaction attribute
   * is attached here.
   *
   * @param service coprocessor service class
   * @param startKey start of the key range
   * @param endKey end of the key range
   * @param callable call to run against each service instance
   * @param callback callback handed to the wrapped table
   * @throws ServiceException propagated from the wrapped table
   * @throws Throwable propagated from the wrapped table
   */
  @Override
  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
      byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
      throws ServiceException, Throwable {
    hTable.coprocessorService(service, startKey, endKey, callable, callback);
  }

  /**
   * Runs a batch coprocessor call over a key range by delegating to the wrapped table.
   * All arguments are passed through unchanged; no transaction attribute is attached here.
   *
   * @param methodDescriptor descriptor of the service method to invoke
   * @param request request message
   * @param startKey start of the key range
   * @param endKey end of the key range
   * @param responsePrototype prototype of the response message
   * @return the map returned by the wrapped table
   * @throws ServiceException propagated from the wrapped table
   * @throws Throwable propagated from the wrapped table
   */
  @Override
  public <R extends Message> Map<byte[], R> batchCoprocessorService(
      MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
      R responsePrototype) throws ServiceException, Throwable {
    return hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
      responsePrototype);
  }

  /**
   * Runs a batch coprocessor call over a key range with a result callback by delegating to
   * the wrapped table. All arguments are passed through unchanged; no transaction attribute
   * is attached here.
   *
   * @param methodDescriptor descriptor of the service method to invoke
   * @param request request message
   * @param startKey start of the key range
   * @param endKey end of the key range
   * @param responsePrototype prototype of the response message
   * @param callback callback handed to the wrapped table
   * @throws ServiceException propagated from the wrapped table
   * @throws Throwable propagated from the wrapped table
   */
  @Override
  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
      Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
      throws ServiceException, Throwable {
    hTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
      callback);
  }

  /**
   * Returns the region locator of the wrapped table.
   *
   * @return the locator returned by the wrapped table
   * @throws IOException if the wrapped table's call fails
   */
  @Override
  public RegionLocator getRegionLocator() throws IOException {
      return hTable.getRegionLocator();
  }

  /**
   * Attaches a transaction to an operation by storing the encoded transaction (as produced by
   * {@code txCodec.encode}) under {@code TxConstants.TX_OPERATION_ATTRIBUTE_KEY}.
   * The operation is modified in place.
   *
   * @param op operation that receives the attribute
   * @param tx transaction to encode
   * @throws IOException if encoding the transaction fails
   */
  public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
    op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
  }

  /**
   * Marks a delete by setting {@code TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY} to an empty byte
   * array; only the presence of the attribute carries information.
   * NOTE(review): presumably the attribute is read elsewhere to recognise rollback deletes;
   * confirm against its consumers.
   *
   * @param delete delete that receives the attribute; modified in place
   */
  protected void makeRollbackOperation(Delete delete) {
    delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
  }

  /**
   * Returns the table descriptor of the wrapped table.
   *
   * @return the descriptor returned by the wrapped table
   * @throws IOException if the wrapped table's call fails
   */
  @Override
  public TableDescriptor getDescriptor() throws IOException {
    return hTable.getDescriptor();
  }

  /**
   * Checks existence for a batch of gets within the current transaction. Every get is passed
   * through {@code transactionalizeAction} before the batch is handed to the wrapped table.
   *
   * @param gets the gets to check
   * @return the array returned by the wrapped table for the transactionalized gets
   * @throws IOException if no transaction has been started, or if the underlying call fails
   */
  @Override
  public boolean[] exists(List<Get> gets) throws IOException {
    if (tx == null) {
      throw new IOException("Transaction not started");
    }

    final List<Get> txGets = new ArrayList<>(gets.size());
    for (Get candidate : gets) {
      Get withTransaction = transactionalizeAction(candidate);
      txGets.add(withTransaction);
    }
    return hTable.exists(txGets);
  }

  /**
   * Returns the RPC timeout reported by the wrapped table for the given unit.
   *
   * @param unit time unit handed unchanged to the wrapped table
   * @return the value returned by the wrapped table
   */
  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return hTable.getRpcTimeout(unit);
  }

  /**
   * Returns the RPC timeout reported by the wrapped table's no-argument getter.
   *
   * @return the value returned by the wrapped table
   */
  @Override
  public int getRpcTimeout() {
    return hTable.getRpcTimeout();
  }

  /**
   * Forwards the RPC timeout to the wrapped table.
   *
   * @param rpcTimeout value handed unchanged to the wrapped table's setter
   */
  @Override
  public void setRpcTimeout(int rpcTimeout) {
    hTable.setRpcTimeout(rpcTimeout);

  }

  /**
   * Returns the read RPC timeout reported by the wrapped table for the given unit.
   *
   * @param unit time unit handed unchanged to the wrapped table
   * @return the value returned by the wrapped table
   */
  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return hTable.getReadRpcTimeout(unit);
  }

  /**
   * Returns the read RPC timeout reported by the wrapped table's no-argument getter.
   *
   * @return the value returned by the wrapped table
   */
  @Override
  public int getReadRpcTimeout() {
    return hTable.getReadRpcTimeout();
  }

  /**
   * Forwards the read RPC timeout to the wrapped table.
   *
   * @param readRpcTimeout value handed unchanged to the wrapped table's setter
   */
  @Override
  public void setReadRpcTimeout(int readRpcTimeout) {
    hTable.setReadRpcTimeout(readRpcTimeout);

  }

  /**
   * Returns the write RPC timeout reported by the wrapped table for the given unit.
   *
   * @param unit time unit handed unchanged to the wrapped table
   * @return the value returned by the wrapped table
   */
  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return hTable.getWriteRpcTimeout(unit);
  }

  /**
   * Returns the write RPC timeout reported by the wrapped table's no-argument getter.
   *
   * @return the value returned by the wrapped table
   */
  @Override
  public int getWriteRpcTimeout() {
    return hTable.getWriteRpcTimeout();
  }

  /**
   * Forwards the write RPC timeout to the wrapped table.
   *
   * @param writeRpcTimeout value handed unchanged to the wrapped table's setter
   */
  @Override
  public void setWriteRpcTimeout(int writeRpcTimeout) {
    hTable.setWriteRpcTimeout(writeRpcTimeout);

  }

  /**
   * Returns the operation timeout reported by the wrapped table for the given unit.
   *
   * @param unit time unit handed unchanged to the wrapped table
   * @return the value returned by the wrapped table
   */
  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return hTable.getOperationTimeout(unit);
  }

  /**
   * Returns the operation timeout reported by the wrapped table's no-argument getter.
   *
   * @return the value returned by the wrapped table
   */
  @Override
  public int getOperationTimeout() {
    return hTable.getOperationTimeout();
  }

  /**
   * Forwards the operation timeout to the wrapped table.
   *
   * @param operationTimeout value handed unchanged to the wrapped table's setter
   */
  @Override
  public void setOperationTimeout(int operationTimeout) {
    hTable.setOperationTimeout(operationTimeout);
  }

  /**
   * Returns the wrapped table's check-and-mutate builder, which is only permitted when
   * non-transactional operations have been allowed on this instance.
   *
   * @param row row key, passed through unchanged
   * @param family column family, passed through unchanged
   * @return the builder returned by the wrapped table
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
    return hTable.checkAndMutate(row, family);
  }

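  // Helpers that attach the current transaction to an operation. Get and Scan are modified in
  // place and returned; Put and Delete are converted into a new Put whose cells are written at
  // the transaction's write pointer.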

  /**
   * Attaches the current transaction to a get via {@code addToOperation}.
   * The get is modified in place; the same instance is returned, not a copy.
   *
   * @param get the get to tag with the current transaction
   * @return the same {@code get} instance
   * @throws IOException if {@code addToOperation} fails
   */
  private Get transactionalizeAction(Get get) throws IOException {
    addToOperation(get, tx);
    return get;
  }

  /**
   * Attaches the current transaction to a scan via {@code addToOperation}.
   * The scan is modified in place; the same instance is returned, not a copy.
   *
   * @param scan the scan to tag with the current transaction
   * @return the same {@code scan} instance
   * @throws IOException if {@code addToOperation} fails
   */
  private Scan transactionalizeAction(Scan scan) throws IOException {
    addToOperation(scan, tx);
    return scan;
  }

  /**
   * Builds a transactional copy of a put. Every cell of the original is re-added to a new
   * {@link Put} at the current transaction's write pointer and recorded through
   * {@code addToChangeSet}. Attributes and durability are carried over from the original,
   * and the transaction is attached last via {@code addToOperation}.
   *
   * @param put the put to convert; it is only read
   * @return a new put carrying the same cells at the transaction's write pointer
   * @throws IOException if {@code addToOperation} fails
   */
  private Put transactionalizeAction(Put put) throws IOException {
    final Put txPut = new Put(put.getRow(), tx.getWritePointer());
    final long writePointer = tx.getWritePointer();

    // Iterating an empty map or an empty cell list is a no-op, so no emptiness checks are needed.
    for (List<Cell> cells : put.getFamilyCellMap().values()) {
      for (Cell cell : cells) {
        txPut.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell),
          writePointer, CellUtil.cloneValue(cell));
        addToChangeSet(txPut.getRow(), CellUtil.cloneFamily(cell),
          CellUtil.cloneQualifier(cell));
      }
    }

    for (Map.Entry<String, byte[]> attribute : put.getAttributesMap().entrySet()) {
      txPut.setAttribute(attribute.getKey(), attribute.getValue());
    }
    txPut.setDurability(put.getDurability());
    addToOperation(txPut, tx);
    return txPut;
  }

  /**
   * Converts a delete into a {@link Put} of delete markers: cells with an empty value written
   * at the current transaction's write pointer. Every marker is also recorded through
   * {@code addToChangeSet}.
   *
   * <ul>
   *   <li>Delete with no families (row delete): with ROW or NONE conflict detection, one
   *       {@code TxConstants.FAMILY_DELETE_QUALIFIER} marker is written per column family of
   *       the table; otherwise the row is read and one marker is written per column found.</li>
   *   <li>Family delete (a family whose only cell is a delete-family cell): with ROW or NONE
   *       conflict detection, one {@code FAMILY_DELETE_QUALIFIER} marker is written for the
   *       family; otherwise the family is read and one marker is written per column found.</li>
   *   <li>Anything else: one marker per cell listed in the delete.</li>
   * </ul>
   *
   * <p>When the row or family has to be read and the read returns nothing, no markers are
   * added for it, so the returned put may contain no cells.
   *
   * @param delete the delete to convert; it is only read
   * @return a new put holding the delete markers, with the delete's attributes and durability
   *     and the current transaction attached
   * @throws IOException if reading existing columns or attaching the transaction fails
   */
  private Put transactionalizeAction(Delete delete) throws IOException {
    long transactionTimestamp = tx.getWritePointer();

    byte[] deleteRow = delete.getRow();
    Put deleteMarkers = new Put(deleteRow, delete.getTimeStamp());

    // Under ROW or NONE conflict detection individual columns need not be identified, so a
    // single family-level marker stands in for every column of the family.
    boolean familyLevelMarkers = conflictLevel == TxConstants.ConflictDetection.ROW
        || conflictLevel == TxConstants.ConflictDetection.NONE;

    Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
    if (familyToDelete.isEmpty()) {
      // No families named in the delete: the whole row is being deleted.
      if (familyLevelMarkers) {
        // Row delete leaves delete markers in all column families of the table, so take the
        // families from the table descriptor and add each of them to the change set.
        for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
          deleteMarkers.addColumn(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER,
            transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
          addToChangeSet(deleteRow, columnDescriptor.getName(), null);
        }
      } else {
        Result result = get(new Get(deleteRow));
        // Result.getNoVersionMap() returns null for an empty result; nothing to mark then.
        NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap();
        if (resultMap != null) {
          for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
            byte[] family = familyEntry.getKey();
            // The entry value already maps qualifier to latest value for this family.
            for (byte[] qualifier : familyEntry.getValue().keySet()) {
              deleteMarkers.addColumn(family, qualifier, transactionTimestamp,
                HConstants.EMPTY_BYTE_ARRAY);
              addToChangeSet(deleteRow, family, qualifier);
            }
          }
        }
      }
    } else {
      for (Map.Entry<byte[], List<Cell>> familyEntry : familyToDelete.entrySet()) {
        byte[] family = familyEntry.getKey();
        List<Cell> entries = familyEntry.getValue();
        // Only a family whose single cell is a delete-family cell counts as a family delete.
        boolean isFamilyDelete = entries.size() == 1 && CellUtil.isDeleteFamily(entries.get(0));
        if (isFamilyDelete) {
          if (familyLevelMarkers) {
            deleteMarkers.addColumn(family, TxConstants.FAMILY_DELETE_QUALIFIER,
              transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
            addToChangeSet(deleteRow, family, null);
          } else {
            Result result = get(new Get(deleteRow).addFamily(family));
            // Result.getFamilyMap() returns null for an empty result; nothing to mark then.
            NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
            if (familyColumns != null) {
              for (byte[] qualifier : familyColumns.keySet()) {
                deleteMarkers.addColumn(family, qualifier, transactionTimestamp,
                  HConstants.EMPTY_BYTE_ARRAY);
                addToChangeSet(deleteRow, family, qualifier);
              }
            }
          }
        } else {
          for (Cell value : entries) {
            deleteMarkers.addColumn(CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value),
              transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
            addToChangeSet(deleteRow, CellUtil.cloneFamily(value), CellUtil.cloneQualifier(value));
          }
        }
      }
    }
    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
    }
    deleteMarkers.setDurability(delete.getDurability());
    addToOperation(deleteMarkers, tx);
    return deleteMarkers;
  }

  private List<? extends Row> transactionalizeActions(List<? extends Row> actions)
      throws IOException {
    List<Row> transactionalizedActions = new ArrayList<>(actions.size());
    for (Row action : actions) {
      if (action instanceof Get) {
        transactionalizedActions.add(transactionalizeAction((Get) action));
      } else if (action instanceof Put) {
        transactionalizedActions.add(transactionalizeAction((Put) action));
      } else if (action instanceof Delete) {
        transactionalizedActions.add(transactionalizeAction((Delete) action));
      } else {
        transactionalizedActions.add(action);
      }
    }
    return transactionalizedActions;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



