public static List makeLockComponents()

in ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java [2894:3121]


  /**
   * Builds the lock components to request for the given read and write entities.
   *
   * <p><b>Read side.</b> An input is considered only if it is not a dummy, reports
   * {@code needsLock()}, is not flagged as update/delete, passes
   * {@code AcidUtils.needsLock(input, ...)}, and read locks are not disabled through
   * {@code HIVE_TXN_READ_LOCKS}. Each surviving input yields a shared-read {@code SELECT}
   * component, with these exceptions:
   * <ul>
   *   <li>a TABLE entity is locked only when its table is in the set returned by
   *       {@code getFullTableLock}; a PARTITION/DUMMYPARTITION entity is locked only when its
   *       table is <em>not</em> in that set;</li>
   *   <li>entity types other than DATABASE, TABLE, PARTITION and DUMMYPARTITION are skipped;</li>
   *   <li>when {@code HIVE_TXN_NONACID_READ_LOCKS} is off, any component for which
   *       {@code AcidUtils.isTransactionalTable(t)} is false is skipped.</li>
   * </ul>
   *
   * <p><b>Write side.</b> DFS/local directory outputs and outputs for which
   * {@code AcidUtils.needsLock(output, ...)} is false are skipped. For the rest, the lock type
   * and operation type are chosen from the output's write type, the table's transactional /
   * non-native status and the relevant configuration flags. {@code DDL_NO_LOCK} outputs produce
   * no component.
   *
   * <p><b>Side effect.</b> For every output that passes the initial write-side filter,
   * {@code HIVE_TXN_ACID_DIR_CACHE_DURATION} is set to {@code 0} on {@code conf}.
   *
   * @param outputs   entities written by the statement
   * @param inputs    entities read by the statement
   * @param operation the statement's operation; only compared against {@code MERGE} here
   * @param conf      configuration the locking flags are read from; also mutated (see above)
   * @return a new list holding the read components followed by the write components
   * @throws RuntimeException if an output carries a write type not handled below
   */
  public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, Set<ReadEntity> inputs,
      Context.Operation operation, HiveConf conf) {

    List<LockComponent> lockComponents = new ArrayList<>();
    boolean isLocklessReadsEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED);
    boolean skipReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_READ_LOCKS);
    boolean skipNonAcidReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_NONACID_READ_LOCKS);

    boolean sharedWrite = !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK);
    boolean isExternalEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_EXT_LOCKING_ENABLED);
    boolean isMerge = operation == Context.Operation.MERGE;

    // We don't want to acquire read locks during update or delete as we'll be acquiring write
    // locks instead. Also, there's no need to lock temp tables since they're session wide
    List<ReadEntity> readEntities = inputs.stream()
      .filter(input -> !input.isDummy()
        && input.needsLock()
        && !input.isUpdateOrDelete()
        && AcidUtils.needsLock(input, isExternalEnabled, isLocklessReadsEnabled)
        && !skipReadLock)
      .collect(Collectors.toList());

    // Tables that are locked as a whole; their individual partitions are not locked separately.
    Set<Table> fullTableLock = getFullTableLock(readEntities, conf);

    // For each source to read, get a shared_read lock
    for (ReadEntity input : readEntities) {
      LockComponentBuilder compBuilder = new LockComponentBuilder();
      compBuilder.setSharedRead();
      compBuilder.setOperationType(DataOperationType.SELECT);

      Table t = null;
      switch (input.getType()) {
        case DATABASE:
          compBuilder.setDbName(input.getDatabase().getName());
          break;

        case TABLE:
          t = input.getTable();
          // Table-level read lock only for tables selected for a full-table lock.
          if (!fullTableLock.contains(t)) {
            continue;
          }
          compBuilder.setDbName(t.getDbName());
          compBuilder.setTableName(t.getTableName());
          break;

        case PARTITION:
        case DUMMYPARTITION:
          compBuilder.setPartitionName(input.getPartition().getName());
          t = input.getPartition().getTable();
          // The table-level lock already covers this partition.
          if (fullTableLock.contains(t)) {
            continue;
          }
          compBuilder.setDbName(t.getDbName());
          compBuilder.setTableName(t.getTableName());
          break;

        default:
          // This is a file or something we don't hold locks for.
          continue;
      }
      // NOTE(review): t is still null for DATABASE entities here, so whether database read
      // locks survive this check depends on how isTransactionalTable treats null - confirm.
      if (skipNonAcidReadLock && !AcidUtils.isTransactionalTable(t)) {
        // skip read-locks for non-transactional tables
        // read-locks don't protect non-transactional tables data consistency
        continue;
      }
      if (t != null) {
        compBuilder.setIsTransactional(AcidUtils.isTransactionalTable(t));
      }
      LockComponent comp = compBuilder.build();
      LOG.debug("Adding lock component to lock request {} ", comp);
      lockComponents.add(comp);
    }
    // For each source to write to, get the appropriate lock type.  If it's
    // an OVERWRITE, we need to get an exclusive lock.  If it's an insert (no
    // overwrite) then we need a shared.  If it's update or delete then we
    // need a SHARED_WRITE.
    for (WriteEntity output : outputs) {
      if (output.getType() == Entity.Type.DFS_DIR || output.getType() == Entity.Type.LOCAL_DIR
          || !AcidUtils.needsLock(output, isExternalEnabled)) {
        // We don't lock files or directories. We also skip locking temp tables.
        continue;
      }
      LockComponentBuilder compBuilder = new LockComponentBuilder();
      Table t = null;
      /**
       * For any insert/updates set dir cache to read-only mode, where it wouldn't
       * add any new entry to cache.
       * When updates are executed, delta folders are created only at the end of the statement
       * and at the time of acquiring locks, there would not be any delta folders. This can cause wrong data to be reported
       * when "insert" followed by "update" statements are executed. In such cases, use the cache as read only mode.
       */
      HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION, 0);
      switch (output.getType()) {
      case DATABASE:
        compBuilder.setDbName(output.getDatabase().getName());
        break;

      case TABLE:
      case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
        t = output.getTable();
        compBuilder.setDbName(t.getDbName());
        compBuilder.setTableName(t.getTableName());
        break;

      case PARTITION:
        compBuilder.setPartitionName(output.getPartition().getName());
        t = output.getPartition().getTable();
        compBuilder.setDbName(t.getDbName());
        compBuilder.setTableName(t.getTableName());
        break;

      default:
        // This is a file or something we don't hold locks for.
        continue;
      }
      switch (output.getWriteType()) {
        /* base this on HiveOperation instead?  this and DDL_NO_LOCK is peppered all over the code...
         Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
         makes sense everywhere).  This however would be problematic for merge...*/
      case DDL_EXCLUSIVE:
        compBuilder.setExclusive();
        compBuilder.setOperationType(DataOperationType.NO_TXN);
        break;

      case DDL_EXCL_WRITE:
        compBuilder.setExclWrite();
        compBuilder.setOperationType(DataOperationType.NO_TXN);
        break;

      case CTAS:
        assert t != null;
        if (AcidUtils.isTransactionalTable(t)) {
          compBuilder.setExclWrite();
          compBuilder.setOperationType(DataOperationType.INSERT);
        } else {
          compBuilder.setExclusive();
          compBuilder.setOperationType(DataOperationType.NO_TXN);
        }
        break;

      case INSERT_OVERWRITE:
        assert t != null;
        if (AcidUtils.isTransactionalTable(t)) {
          // A fully exclusive lock is used only when explicitly configured and neither
          // shared-write mode nor lockless reads are enabled.
          if (conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) && !sharedWrite
                  && !isLocklessReadsEnabled) {
            compBuilder.setExclusive();
          } else {
            compBuilder.setExclWrite();
          }
          compBuilder.setOperationType(DataOperationType.UPDATE);
        } else if (t.isNonNative()) {
          compBuilder.setLock(getLockTypeFromStorageHandler(output, t));
          compBuilder.setOperationType(DataOperationType.UPDATE);
        } else {
          compBuilder.setExclusive();
          compBuilder.setOperationType(DataOperationType.NO_TXN);
        }
        break;

      case INSERT:
        assert t != null;
        if (AcidUtils.isTransactionalTable(t)) {
          boolean isExclMergeInsert = conf.getBoolVar(ConfVars.TXN_MERGE_INSERT_X_LOCK) && isMerge;
          // Shared-read is the baseline; it is upgraded below when a flag asks for it.
          compBuilder.setSharedRead();

          if (sharedWrite || !isExclMergeInsert && isLocklessReadsEnabled) {
            compBuilder.setSharedWrite();
          } else if (isExclMergeInsert) {
            compBuilder.setExclWrite();
          }
          if (isExclMergeInsert) {
            // The insert branch of an exclusive MERGE is recorded as UPDATE, not INSERT.
            compBuilder.setOperationType(DataOperationType.UPDATE);
            break;
          }
        } else if (t.isNonNative()) {
          compBuilder.setLock(getLockTypeFromStorageHandler(output, t));
        } else {
          if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
            compBuilder.setExclusive();
          } else {  // this is backward compatible for non-ACID resources, w/o ACID semantics
            compBuilder.setSharedRead();
          }
        }
        compBuilder.setOperationType(DataOperationType.INSERT);
        break;

      case DDL_SHARED:
        compBuilder.setSharedRead();
        if (output.isTxnAnalyze()) {
          // Analyze needs txn components to be present, otherwise an aborted analyze write ID
          // might be rolled under the watermark by compactor while stats written by it are
          // still present.
          // Hence the component must still be built and added below: only the NO_TXN
          // operation type is skipped, leaving the builder's default operation type in place.
          break;
        }
        compBuilder.setOperationType(DataOperationType.NO_TXN);
        break;

      case UPDATE:
      case DELETE:
        assert t != null;
        if (AcidUtils.isTransactionalTable(t) && sharedWrite) {
          compBuilder.setSharedWrite();
        } else if (t.isNonNative()) {
          compBuilder.setLock(getLockTypeFromStorageHandler(output, t));
        } else {
          compBuilder.setExclWrite();
        }
        // The write type's name (UPDATE / DELETE) is reused as the DataOperationType name.
        compBuilder.setOperationType(DataOperationType.valueOf(
            output.getWriteType().name()));
        break;

      case DDL_NO_LOCK:
        continue; // No lock required here

      default:
        throw new RuntimeException("Unknown write type " + output.getWriteType().toString());
      }
      if (t != null) {
        compBuilder.setIsTransactional(AcidUtils.isTransactionalTable(t));
      }

      compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
      LockComponent comp = compBuilder.build();
      // Parameterized form emits the same text but skips formatting when debug is disabled.
      LOG.debug("Adding lock component to lock request {}", comp);
      lockComponents.add(comp);
    }
    return lockComponents;
  }