in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java [679:972]
public void writeBlock(final ExtendedBlock block,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
boolean allowLazyPersist,
final boolean pinning,
final boolean[] targetPinnings,
final String storageId,
final String[] targetStorageIds) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
allowLazyPersist = allowLazyPersist &&
(dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
long size = 0;
// reply to upstream datanode or client
final DataOutputStream replyOut = getBufferedOutputStream();
int nst = targetStorageTypes.length;
StorageType[] storageTypes = new StorageType[nst + 1];
storageTypes[0] = storageType;
if (targetStorageTypes.length > 0) {
System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
}
// To support older clients, we don't pass in empty storageIds
final int nsi = targetStorageIds.length;
final String[] storageIds;
if (nsi > 0) {
storageIds = new String[nsi + 1];
storageIds[0] = storageId;
if (targetStorageTypes.length > 0) {
System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
}
} else {
storageIds = new String[0];
}
checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
BlockTokenIdentifier.AccessMode.WRITE,
storageTypes, storageIds);
// check single target for transfer-RBW/Finalized
if (isTransfer && targets.length > 0) {
throw new IOException(stage + " does not support multiple targets "
+ Arrays.asList(targets));
}
if (LOG.isDebugEnabled()) {
LOG.debug("opWriteBlock: stage={}, clientname={}\n " +
"block ={}, newGs={}, bytesRcvd=[{}, {}]\n " +
"targets={}; pipelineSize={}, srcDataNode={}, pinning={}",
stage, clientname, block, latestGenerationStamp, minBytesRcvd,
maxBytesRcvd, Arrays.asList(targets), pipelineSize, srcDataNode,
pinning);
LOG.debug("isDatanode={}, isClient={}, isTransfer={}",
isDatanode, isClient, isTransfer);
LOG.debug("writeBlock receive buf size {} tcp no delay {}",
peer.getReceiveBufferSize(), peer.getTcpNoDelay());
}
// We later mutate block's generation stamp and length, but we need to
// forward the original version of the block to downstream mirrors, so
// make a copy here.
final ExtendedBlock originalBlock = new ExtendedBlock(block);
if (block.getNumBytes() == 0) {
block.setNumBytes(dataXceiverServer.estimateBlockSize);
}
LOG.info("Receiving {} src: {} dest: {}",
block, remoteAddress, localAddress);
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
final String storageUuid;
final boolean isOnTransientStorage;
try {
final Replica replica;
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist, pinning, storageId));
replica = blockReceiver.getReplica();
} else {
replica = datanode.data.recoverClose(
block, latestGenerationStamp, minBytesRcvd);
}
storageUuid = replica.getStorageUuid();
isOnTransientStorage = replica.isOnTransientStorage();
//
// Connect to downstream machine, if appropriate
//
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
// Connect to backup machine
mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
LOG.debug("Connecting to datanode {}", mirrorNode);
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket();
try {
DataNodeFaultInjector.get().failMirrorConnection();
int timeoutValue = dnConf.socketTimeout +
(HdfsConstants.READ_TIMEOUT_EXTENSION * targets.length);
int writeTimeout = dnConf.socketWriteTimeout +
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setKeepAlive(true);
if (dnConf.getTransferSocketSendBufferSize() > 0) {
mirrorSock.setSendBufferSize(
dnConf.getTransferSocketSendBufferSize());
}
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
DataEncryptionKeyFactory keyFactory =
datanode.getDataEncryptionKeyFactoryForBlock(block);
SecretKey secretKey = null;
if (dnConf.overwriteDownstreamDerivedQOP) {
String bpid = block.getBlockPoolId();
BlockKey blockKey = datanode.blockPoolTokenSecretManager
.get(bpid).getCurrentKey();
secretKey = blockKey.getKey();
}
IOStreamPair saslStreams = datanode.saslClient.socketSend(
mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory,
blockToken, targets[0], secretKey);
unbufMirrorOut = saslStreams.out;
unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
smallBufferSize));
mirrorIn = new DataInputStream(unbufMirrorIn);
String targetStorageId = null;
if (targetStorageIds.length > 0) {
// Older clients may not have provided any targetStorageIds
targetStorageId = targetStorageIds[0];
}
if (targetPinnings != null && targetPinnings.length > 0) {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes,
srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
allowLazyPersist, targetPinnings[0], targetPinnings,
targetStorageId, targetStorageIds);
} else {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes,
srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
allowLazyPersist, false, targetPinnings,
targetStorageId, targetStorageIds);
}
mirrorOut.flush();
DataNodeFaultInjector.get().writeBlockAfterFlush();
// read connect ack (only for clients, not for replication req)
if (isClient) {
BlockOpResponseProto connectAck =
BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();
if (mirrorInStatus != SUCCESS) {
LOG.debug("Datanode {} got response for connect" +
"ack from downstream datanode with firstbadlink as {}",
targets.length, firstBadLink);
}
}
} catch (IOException e) {
if (isClient) {
BlockOpResponseProto.newBuilder()
.setStatus(ERROR)
// NB: Unconditionally using the xfer addr w/o hostname
.setFirstBadLink(targets[0].getXferAddr())
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
IOUtils.closeStream(mirrorOut);
mirrorOut = null;
IOUtils.closeStream(mirrorIn);
mirrorIn = null;
IOUtils.closeSocket(mirrorSock);
mirrorSock = null;
if (isClient) {
LOG.error("{}:Exception transferring block {} to mirror {}",
datanode, block, mirrorNode, e);
throw e;
} else {
LOG.info("{}:Exception transferring {} to mirror {}- continuing " +
"without the mirror", datanode, block, mirrorNode, e);
incrDatanodeNetworkErrors();
}
}
}
// send connect-ack to source for clients and not transfer-RBW/Finalized
if (isClient && !isTransfer) {
if (mirrorInStatus != SUCCESS) {
LOG.debug("Datanode {} forwarding connect ack to upstream " +
"firstbadlink is {}", targets.length, firstBadLink);
}
BlockOpResponseProto.newBuilder()
.setStatus(mirrorInStatus)
.setFirstBadLink(firstBadLink)
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
// receive the block and mirror to the next target
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr,
dataXceiverServer.getWriteThrottler(), targets, false);
// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
LOG.trace("TRANSFER: send close-ack");
writeResponse(SUCCESS, null, replyOut);
}
}
// update its generation stamp
if (isClient &&
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
block.setGenerationStamp(latestGenerationStamp);
block.setNumBytes(minBytesRcvd);
}
// if this write is for a replication request or recovering
// a failed close for client, then confirm block. For other client-writes,
// the block is finalized in the PacketResponder.
if (isDatanode ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
LOG.info("Received {} src: {} dest: {} volume: {} of size {}",
block, remoteAddress, localAddress, replica.getVolume(),
block.getNumBytes());
}
if(isClient) {
size = block.getNumBytes();
}
} catch (IOException ioe) {
LOG.info("opWriteBlock {} received exception {}",
block, ioe.toString());
incrDatanodeNetworkErrors();
throw ioe;
} finally {
// close all opened streams
IOUtils.closeStream(mirrorOut);
IOUtils.closeStream(mirrorIn);
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
if (blockReceiver != null) {
blockReceiver.releaseAnyRemainingReservedSpace();
}
IOUtils.closeStream(blockReceiver);
setCurrentBlockReceiver(null);
}
//update metrics
datanode.getMetrics().addWriteBlockOp(elapsed());
datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
}