in hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java [198:436]
private ContainerCommandResponseProto dispatchRequest(
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
Preconditions.checkNotNull(msg);
if (LOG.isTraceEnabled()) {
LOG.trace("Command {}, trace ID: {} ", msg.getCmdType(),
msg.getTraceID());
}
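// Capture audit and timing context up front so that every exit path
// below can record a consistent outcome.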
AuditAction action = getAuditAction(msg.getCmdType());
EventType eventType = getEventType(msg);
PerformanceStringBuilder perf = new PerformanceStringBuilder();
ContainerType containerType;
ContainerCommandResponseProto responseProto = null;
long startTime = Time.monotonicNowNanos();
Type cmdType = msg.getCmdType();
long containerID = msg.getContainerID();
Container container = getContainer(containerID);
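// A WriteChunk through Ratis is split into a WRITE_DATA stage and a
// COMMIT_DATA stage; StreamInit is counted as a write stage as well.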
boolean isWriteStage =
(cmdType == Type.WriteChunk && dispatcherContext != null
&& dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.WRITE_DATA)
|| (cmdType == Type.StreamInit);
boolean isWriteCommitStage =
(cmdType == Type.WriteChunk && dispatcherContext != null
&& dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMMIT_DATA);
if (dispatcherContext == null) {
// count every op that does not come through Ratis
metrics.incContainerOpsMetrics(cmdType);
} else if (isWriteStage) {
// count WriteChunk only in its WRITE_DATA stage, to avoid double counting
metrics.incContainerOpsMetrics(cmdType);
} else if (cmdType != Type.WriteChunk) {
metrics.incContainerOpsMetrics(cmdType);
}
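// Validate the block token, if this operation requires one, before
// doing any further work.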
try {
if (DispatcherContext.op(dispatcherContext).validateToken()) {
validateToken(msg);
}
} catch (IOException ioe) {
final String s = ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED
+ " for " + dispatcherContext + ": " + ioe.getMessage();
final StorageContainerException sce = new StorageContainerException(
s, ioe, ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
// If the command is executed outside of Ratis, the default write stage
// is WriteChunkStage.COMBINED.
boolean isCombinedStage =
cmdType == Type.WriteChunk && (dispatcherContext == null
|| dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMBINED);
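// The container-to-BCSID map is only available when the request comes
// through Ratis with a DispatcherContext.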
Map<Long, Long> container2BCSIDMap = null;
if (dispatcherContext != null) {
container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
}
if (isWriteCommitStage) {
// Check whether the container ID exists in the loaded snapshot file.
// If it does not, this is a DataNode restart and we are reapplying a
// transaction that was not captured in the snapshot. Add the container
// to the map and remove it from the missing container set, where it
// may have been added during "init".
Preconditions.checkNotNull(container2BCSIDMap);
if (container != null && container2BCSIDMap.get(containerID) == null) {
container2BCSIDMap.put(
containerID, container.getBlockCommitSequenceId());
getMissingContainerSet().remove(containerID);
}
}
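// Reject writes (other than CreateContainer) to containers that this
// DataNode has lost.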
if (cmdType != Type.CreateContainer && !HddsUtils.isReadOnly(msg)
&& getMissingContainerSet().contains(containerID)) {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID
+ " has been lost and cannot be recreated on this DataNode",
ContainerProtos.Result.CONTAINER_MISSING);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
if (cmdType != Type.CreateContainer) {
/*
* Container creation should happen only as part of the Write_Data phase
* of writeChunk.
* For EC, an empty putBlock is sent on partial stripe writes: if the
* file size is less than chunkSize * (ECData - 1), an empty block is
* written so that the container gets created on the non-writing nodes.
* A replica index > 0 indicates an EC container.
*/
if (container == null && ((isWriteStage || isCombinedStage)
|| cmdType == Type.PutSmallFile
|| cmdType == Type.PutBlock)) {
// If the container does not exist, create one for the WriteChunk,
// PutSmallFile or PutBlock request.
responseProto = createContainer(msg);
metrics.incContainerOpsMetrics(Type.CreateContainer);
metrics.incContainerOpsLatencies(Type.CreateContainer,
Time.monotonicNowNanos() - startTime);
if (responseProto.getResult() != Result.SUCCESS) {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " creation failed",
responseProto.getResult());
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
Preconditions.checkArgument(
(isWriteStage && container2BCSIDMap != null)
|| dispatcherContext == null
|| cmdType == Type.PutBlock);
if (container2BCSIDMap != null) {
// Add this container to the list of containers created in the
// pipeline, with an initial BCSID of 0.
container2BCSIDMap.putIfAbsent(containerID, 0L);
}
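// Re-fetch the container now that createContainer has created it.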
container = getContainer(containerID);
}
// If the container is still not found, return an error.
if (container == null) {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
containerType = getContainerType(container);
} else {
if (!msg.hasCreateContainer()) {
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception("MALFORMED_REQUEST"));
return malformedRequest(msg);
}
containerType = msg.getCreateContainer().getContainerType();
}
// Small performance optimization: check whether the operation is a
// write before trying to send a CloseContainerAction.
if (!HddsUtils.isReadOnly(msg)) {
sendCloseContainerActionIfNeeded(container);
}
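// Look up the handler registered for this container type.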
Handler handler = getHandler(containerType);
if (handler == null) {
StorageContainerException ex = new StorageContainerException("Invalid " +
"ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
// log failure
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, ex);
return ContainerUtils.logAndReturnError(LOG, ex, msg);
}
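// Record time spent on validation and setup before handing the request
// off to the handler.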
perf.appendPreOpLatencyNano(Time.monotonicNowNanos() - startTime);
responseProto = handler.handle(msg, container, dispatcherContext);
long opLatencyNs = Time.monotonicNowNanos() - startTime;
if (responseProto != null) {
metrics.incContainerOpsLatencies(cmdType, opLatencyNs);
// If the request is a write and the container operation failed, the
// applyTransaction on the container has failed. All subsequent
// transactions on the container should also fail, so the replica is
// marked unhealthy here and a close container action is sent to SCM to
// close the container. applyTransaction called on an already closed
// container fails with a closed container exception; in such cases the
// exception is ignored here. If the container is already marked
// unhealthy, there is no need to change its state.
Result result = responseProto.getResult();
if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) {
// If the container is open/closing and the container operation has
// failed, it is first marked unhealthy and then the close container
// action is initiated. This also implies this is the first transaction
// to fail, so the container is marked unhealthy right here.
// Once the container is marked unhealthy, all subsequent write
// transactions will fail with an UNHEALTHY_CONTAINER exception.
if (container == null) {
throw new NullPointerException(
"Error creating container: " + result + " "
+ responseProto.getMessage());
}
// To be moved to the unhealthy state here, the container must be in
// the OPEN, CLOSING or RECOVERING state.
State containerState = container.getContainerData().getState();
Preconditions.checkState(
containerState == State.OPEN
|| containerState == State.CLOSING
|| containerState == State.RECOVERING);
// Mark the container unhealthy and persist the new state.
try {
// TODO HDDS-7096 + HDDS-8781: Use on demand scanning for the open
// container instead.
handler.markContainerUnhealthy(container,
ScanResult.unhealthy(ScanResult.FailureType.WRITE_FAILURE,
new File(container.getContainerData().getContainerPath()),
new StorageContainerException(result)));
LOG.info("Marked Container UNHEALTHY, ContainerID: {}", containerID);
} catch (IOException ioe) {
// If marking the container unhealthy fails, just log the error here
// and return the actual failure response to the client.
LOG.error("Failed to mark container {} UNHEALTHY", containerID, ioe);
}
// In any case, the in-memory state of the container should now be
// UNHEALTHY.
Preconditions.checkArgument(
container.getContainerData().getState() == State.UNHEALTHY);
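// Send a close container action to SCM for the now-unhealthy
// container.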
sendCloseContainerActionIfNeeded(container);
}
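// A successful CreateContainer through Ratis is registered in the
// BCSID map with an initial BCSID of 0.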
if (cmdType == Type.CreateContainer
&& result == Result.SUCCESS && dispatcherContext != null) {
Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
container2BCSIDMap.putIfAbsent(containerID, 0L);
}
if (result == Result.SUCCESS) {
updateBCSID(container, dispatcherContext, cmdType);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.SUCCESS, null);
} else {
//TODO HDDS-7096:
// This is a too general place for on demand scanning.
// Create a specific exception that signals for on demand scanning
// and move this general scan to where it is more appropriate.
// Add integration tests to test the full functionality.
OnDemandContainerDataScanner.scanContainer(container);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
perf.appendOpLatencyNanos(opLatencyNs);
performanceAudit(action, msg, dispatcherContext, perf, opLatencyNs);
return responseProto;
} else {
// log failure
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception("UNSUPPORTED_REQUEST"));
return unsupportedRequest(msg);
}
}