private Put transactionalizeAction()

in tephra-hbase-compat-1.4-base/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java [568:641]


  /**
   * Converts a client {@link Delete} into a {@link Put} holding transactional delete markers.
   *
   * <p>Every marker is a cell with an empty value written at the transaction's write pointer. Each
   * marked family or column is also reported through {@code addToChangeSet}. The attributes and
   * durability of the delete are copied onto the returned put, which is finally passed through
   * {@code addToOperation}.
   *
   * <ul>
   *   <li>Row delete (the delete names no family) or family delete, with {@code ROW} or {@code NONE}
   *       conflict detection: one {@code FAMILY_DELETE_QUALIFIER} marker per affected family.</li>
   *   <li>Row delete or family delete with any other conflict detection level: the existing cells are
   *       read first and one marker is written per existing column. If nothing exists, no marker is
   *       written.</li>
   *   <li>Column deletes: one marker per cell listed in the delete.</li>
   * </ul>
   *
   * @param delete the delete requested by the client
   * @return the put carrying the delete markers for {@code delete}
   * @throws IOException if looking up the table descriptor or the existing cells fails
   */
  private Put transactionalizeAction(Delete delete) throws IOException {
    long transactionTimestamp = tx.getWritePointer();

    byte[] deleteRow = delete.getRow();
    // The Put itself keeps the Delete's timestamp; every marker cell added below is stamped
    // explicitly with the transaction write pointer.
    Put deleteMarkers = new Put(deleteRow, delete.getTimeStamp());

    // With ROW or NONE conflict detection a single family-level marker is written per family;
    // for any other level the individual columns are identified and marked one by one.
    boolean useFamilyMarkers = conflictLevel == TxConstants.ConflictDetection.ROW ||
      conflictLevel == TxConstants.ConflictDetection.NONE;

    Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
    if (familyToDelete.isEmpty()) {
      // No family was named: this is a delete of the whole row.
      if (useFamilyMarkers) {
        // Row delete leaves delete markers in all column families of the table
        // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
        for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
          // no need to identify individual columns deleted
          deleteMarkers.add(columnDescriptor.getName(), TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
            HConstants.EMPTY_BYTE_ARRAY);
          addToChangeSet(deleteRow, columnDescriptor.getName(), null);
        }
      } else {
        Result result = get(new Get(deleteRow));
        // Result.getNoVersionMap() returns null for an empty result (row not present), so only
        // walk the map when there is something to delete.
        if (!result.isEmpty()) {
          // Delete everything
          NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap();
          for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
            byte[] family = familyEntry.getKey();
            for (byte[] qualifier : familyEntry.getValue().keySet()) {
              deleteMarkers.add(family, qualifier, transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
              addToChangeSet(deleteRow, family, qualifier);
            }
          }
        }
      }
    } else {
      for (Map.Entry<byte[], List<Cell>> familyEntry : familyToDelete.entrySet()) {
        byte[] family = familyEntry.getKey();
        List<Cell> entries = familyEntry.getValue();
        // Treated as a family delete only when the family's sole cell is a delete-family cell.
        boolean isFamilyDelete = entries.size() == 1 && CellUtil.isDeleteFamily(entries.get(0));
        if (isFamilyDelete) {
          if (useFamilyMarkers) {
            // no need to identify individual columns deleted
            deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, transactionTimestamp,
              HConstants.EMPTY_BYTE_ARRAY);
            addToChangeSet(deleteRow, family, null);
          } else {
            Result result = get(new Get(deleteRow).addFamily(family));
            // Result.getFamilyMap() returns null for an empty result (no cells in this family),
            // so only walk the map when there is something to delete.
            if (!result.isEmpty()) {
              // Delete entire family
              NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
              for (byte[] qualifier : familyColumns.keySet()) {
                deleteMarkers.add(family, qualifier, transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
                addToChangeSet(deleteRow, family, qualifier);
              }
            }
          }
        } else {
          for (Cell value : entries) {
            // Copy family and qualifier out of the cell once; CellUtil replaces the deprecated
            // Cell.getFamily()/Cell.getQualifier() accessors.
            byte[] cellFamily = CellUtil.cloneFamily(value);
            byte[] cellQualifier = CellUtil.cloneQualifier(value);
            deleteMarkers.add(cellFamily, cellQualifier, transactionTimestamp, HConstants.EMPTY_BYTE_ARRAY);
            addToChangeSet(deleteRow, cellFamily, cellQualifier);
          }
        }
      }
    }
    // Carry the client's attributes and durability setting over to the marker put.
    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
      deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
    }
    deleteMarkers.setDurability(delete.getDurability());
    addToOperation(deleteMarkers, tx);
    return deleteMarkers;
  }