in ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java [2894:3121]
public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, Set<ReadEntity> inputs,
    Context.Operation operation, HiveConf conf) {
  List<LockComponent> lockComponents = new ArrayList<>();
  boolean isLocklessReadsEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED);
  boolean skipReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_READ_LOCKS);
  boolean skipNonAcidReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_NONACID_READ_LOCKS);
  boolean sharedWrite = !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK);
  boolean isExternalEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_EXT_LOCKING_ENABLED);
  boolean isMerge = operation == Context.Operation.MERGE;
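  // Note the inverted semantics of several flags above: e.g. sharedWrite is true when
  // TXN_WRITE_X_LOCK is *disabled*, in which case updates and deletes take SHARED_WRITE
  // instead of EXCL_WRITE locks (see the UPDATE/DELETE case below).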
  // We don't want to acquire read locks during update or delete as we'll be acquiring
  // write locks instead. Also, there's no need to lock temp tables since they're session-wide.
  List<ReadEntity> readEntities = inputs.stream()
      .filter(input -> !input.isDummy()
          && input.needsLock()
          && !input.isUpdateOrDelete()
          && AcidUtils.needsLock(input, isExternalEnabled, isLocklessReadsEnabled)
          && !skipReadLock)
      .collect(Collectors.toList());
  Set<Table> fullTableLock = getFullTableLock(readEntities, conf);
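  // getFullTableLock returns the tables to lock at table rather than partition granularity:
  // below, TABLE entities are locked only if they appear in this set, while PARTITION
  // entities are skipped when their table is in it, so a single table-level SHARED_READ
  // lock stands in for many partition-level ones.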
  // For each source to read, get a shared_read lock.
  for (ReadEntity input : readEntities) {
    LockComponentBuilder compBuilder = new LockComponentBuilder();
    compBuilder.setSharedRead();
    compBuilder.setOperationType(DataOperationType.SELECT);
    Table t = null;
    switch (input.getType()) {
      case DATABASE:
        compBuilder.setDbName(input.getDatabase().getName());
        break;
      case TABLE:
        t = input.getTable();
        if (!fullTableLock.contains(t)) {
          continue;
        }
        compBuilder.setDbName(t.getDbName());
        compBuilder.setTableName(t.getTableName());
        break;
      case PARTITION:
      case DUMMYPARTITION:
        compBuilder.setPartitionName(input.getPartition().getName());
        t = input.getPartition().getTable();
        if (fullTableLock.contains(t)) {
          continue;
        }
        compBuilder.setDbName(t.getDbName());
        compBuilder.setTableName(t.getTableName());
        break;
      default:
        // This is a file or something else we don't hold locks for.
        continue;
    }
    if (skipNonAcidReadLock && !AcidUtils.isTransactionalTable(t)) {
      // Skip read locks for non-transactional tables: read locks don't protect
      // data consistency for them anyway.
      continue;
    }
    if (t != null) {
      compBuilder.setIsTransactional(AcidUtils.isTransactionalTable(t));
    }
    LockComponent comp = compBuilder.build();
    LOG.debug("Adding lock component to lock request {}", comp);
    lockComponents.add(comp);
  }
  // For each source to write to, get the appropriate lock type. If it's
  // an OVERWRITE, we need to get an exclusive lock. If it's an insert (no
  // overwrite), then we need a shared lock. If it's an update or delete,
  // we need a SHARED_WRITE lock.
  for (WriteEntity output : outputs) {
    if (output.getType() == Entity.Type.DFS_DIR || output.getType() == Entity.Type.LOCAL_DIR
        || !AcidUtils.needsLock(output, isExternalEnabled)) {
      // We don't lock files or directories. We also skip locking temp tables.
      continue;
    }
    LockComponentBuilder compBuilder = new LockComponentBuilder();
    Table t = null;
    /*
     * For any insert/update, put the dir cache into read-only mode so that it does not
     * add new entries. When an update is executed, delta folders are created only at the
     * end of the statement, so at lock-acquisition time no delta folders exist yet. This
     * can cause wrong data to be reported when an "insert" is followed by an "update";
     * in such cases the cache must be used in read-only mode.
     */
    HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION, 0);
    switch (output.getType()) {
      case DATABASE:
        compBuilder.setDbName(output.getDatabase().getName());
        break;
      case TABLE:
      case DUMMYPARTITION: // in case of dynamic partitioning, lock the table
        t = output.getTable();
        compBuilder.setDbName(t.getDbName());
        compBuilder.setTableName(t.getTableName());
        break;
      case PARTITION:
        compBuilder.setPartitionName(output.getPartition().getName());
        t = output.getPartition().getTable();
        compBuilder.setDbName(t.getDbName());
        compBuilder.setTableName(t.getTableName());
        break;
      default:
        // This is a file or something else we don't hold locks for.
        continue;
    }
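    // Rough summary of the write-type to lock-type mapping below (transactional tables,
    // default configuration):
    //   DDL_EXCLUSIVE    -> EXCLUSIVE
    //   DDL_EXCL_WRITE   -> EXCL_WRITE
    //   CTAS             -> EXCL_WRITE (EXCLUSIVE for non-transactional targets)
    //   INSERT_OVERWRITE -> EXCLUSIVE (EXCL_WRITE under shared-write or lockless reads)
    //   INSERT           -> SHARED_READ (escalated under merge/shared-write settings)
    //   UPDATE/DELETE    -> EXCL_WRITE (SHARED_WRITE when TXN_WRITE_X_LOCK is off)
    //   DDL_SHARED       -> SHARED_READ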
    switch (output.getWriteType()) {
      /* Should this be based on HiveOperation instead? This and DDL_NO_LOCK are peppered
         all over the code... It seems much cleaner if each statement were identified as a
         particular HiveOperation (which would make sense everywhere). That would, however,
         be problematic for MERGE... */
      case DDL_EXCLUSIVE:
        compBuilder.setExclusive();
        compBuilder.setOperationType(DataOperationType.NO_TXN);
        break;
      case DDL_EXCL_WRITE:
        compBuilder.setExclWrite();
        compBuilder.setOperationType(DataOperationType.NO_TXN);
        break;
      case CTAS:
        assert t != null;
        if (AcidUtils.isTransactionalTable(t)) {
          compBuilder.setExclWrite();
          compBuilder.setOperationType(DataOperationType.INSERT);
        } else {
          compBuilder.setExclusive();
          compBuilder.setOperationType(DataOperationType.NO_TXN);
        }
        break;
      case INSERT_OVERWRITE:
        assert t != null;
        if (AcidUtils.isTransactionalTable(t)) {
          if (conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) && !sharedWrite
              && !isLocklessReadsEnabled) {
            compBuilder.setExclusive();
          } else {
            compBuilder.setExclWrite();
          }
          compBuilder.setOperationType(DataOperationType.UPDATE);
        } else if (t.isNonNative()) {
          compBuilder.setLock(getLockTypeFromStorageHandler(output, t));
          compBuilder.setOperationType(DataOperationType.UPDATE);
        } else {
          compBuilder.setExclusive();
          compBuilder.setOperationType(DataOperationType.NO_TXN);
        }
        break;
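      // INSERT lock escalation: SHARED_READ by default; SHARED_WRITE when shared-write
      // mode or lockless reads are enabled; EXCL_WRITE (treated as an UPDATE for conflict
      // detection) when this is the insert branch of a MERGE and TXN_MERGE_INSERT_X_LOCK
      // is set.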
      case INSERT:
        assert t != null;
        if (AcidUtils.isTransactionalTable(t)) {
          boolean isExclMergeInsert = conf.getBoolVar(ConfVars.TXN_MERGE_INSERT_X_LOCK) && isMerge;
          compBuilder.setSharedRead();
          if (sharedWrite || (!isExclMergeInsert && isLocklessReadsEnabled)) {
            compBuilder.setSharedWrite();
          } else if (isExclMergeInsert) {
            compBuilder.setExclWrite();
          }
          if (isExclMergeInsert) {
            compBuilder.setOperationType(DataOperationType.UPDATE);
            break;
          }
        } else if (t.isNonNative()) {
          compBuilder.setLock(getLockTypeFromStorageHandler(output, t));
        } else {
          if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
            compBuilder.setExclusive();
          } else {
            // Backward compatible locking for non-ACID resources, w/o ACID semantics.
            compBuilder.setSharedRead();
          }
        }
        compBuilder.setOperationType(DataOperationType.INSERT);
        break;
      case DDL_SHARED:
        compBuilder.setSharedRead();
        if (output.isTxnAnalyze()) {
          // Analyze needs txn components to be present, otherwise an aborted analyze
          // write ID might be rolled under the watermark by the compactor while stats
          // written by it are still present.
          continue;
        }
        compBuilder.setOperationType(DataOperationType.NO_TXN);
        break;
      case UPDATE:
      case DELETE:
        assert t != null;
        if (AcidUtils.isTransactionalTable(t) && sharedWrite) {
          compBuilder.setSharedWrite();
        } else if (t.isNonNative()) {
          compBuilder.setLock(getLockTypeFromStorageHandler(output, t));
        } else {
          compBuilder.setExclWrite();
        }
        compBuilder.setOperationType(DataOperationType.valueOf(output.getWriteType().name()));
        break;
      case DDL_NO_LOCK:
        continue; // No lock required here
      default:
        throw new RuntimeException("Unknown write type " + output.getWriteType());
    }
    if (t != null) {
      compBuilder.setIsTransactional(AcidUtils.isTransactionalTable(t));
    }
    compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
    LockComponent comp = compBuilder.build();
    LOG.debug("Adding lock component to lock request {}", comp);
    lockComponents.add(comp);
  }
  return lockComponents;
}
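
// A minimal sketch (not from this file) of how a caller such as DbTxnManager might consume
// the result. It assumes the metastore's LockRequestBuilder API, with queryId, txnId, and
// user supplied by the caller; the builder deduplicates components before the request is
// sent to the metastore:
//
//   List<LockComponent> components =
//       AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), operation, conf);
//   LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId);
//   rqstBuilder.setTransactionId(txnId)
//       .setUser(user);
//   components.forEach(rqstBuilder::addLockComponent);
//   LockRequest lockRequest = rqstBuilder.build();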