public void writeBlock()

in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java [679:972]


  public void writeBlock(final ExtendedBlock block,
      final StorageType storageType, 
      final Token<BlockTokenIdentifier> blockToken,
      final String clientname,
      final DatanodeInfo[] targets,
      final StorageType[] targetStorageTypes,
      final DatanodeInfo srcDataNode,
      final BlockConstructionStage stage,
      final int pipelineSize,
      final long minBytesRcvd,
      final long maxBytesRcvd,
      final long latestGenerationStamp,
      DataChecksum requestedChecksum,
      CachingStrategy cachingStrategy,
      boolean allowLazyPersist,
      final boolean pinning,
      final boolean[] targetPinnings,
      final String storageId,
      final String[] targetStorageIds) throws IOException {
    previousOpClientName = clientname;
    updateCurrentThreadName("Receiving block " + block);
    final boolean isDatanode = clientname.length() == 0;
    final boolean isClient = !isDatanode;
    final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
        || stage == BlockConstructionStage.TRANSFER_FINALIZED;
    allowLazyPersist = allowLazyPersist &&
        (dnConf.getAllowNonLocalLazyPersist() || peer.isLocal());
    long size = 0;
    // reply to upstream datanode or client 
    final DataOutputStream replyOut = getBufferedOutputStream();

    int nst = targetStorageTypes.length;
    StorageType[] storageTypes = new StorageType[nst + 1];
    storageTypes[0] = storageType;
    if (targetStorageTypes.length > 0) {
      System.arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
    }

    // To support older clients, we don't pass in empty storageIds
    final int nsi = targetStorageIds.length;
    final String[] storageIds;
    if (nsi > 0) {
      storageIds = new String[nsi + 1];
      storageIds[0] = storageId;
      if (targetStorageTypes.length > 0) {
        System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
      }
    } else {
      storageIds = new String[0];
    }
    checkAccess(replyOut, isClient, block, blockToken, Op.WRITE_BLOCK,
        BlockTokenIdentifier.AccessMode.WRITE,
        storageTypes, storageIds);

    // check single target for transfer-RBW/Finalized
    if (isTransfer && targets.length > 0) {
      throw new IOException(stage + " does not support multiple targets "
          + Arrays.asList(targets));
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("opWriteBlock: stage={}, clientname={}\n  " +
              "block  ={}, newGs={}, bytesRcvd=[{}, {}]\n  " +
              "targets={}; pipelineSize={}, srcDataNode={}, pinning={}",
          stage, clientname, block, latestGenerationStamp, minBytesRcvd,
          maxBytesRcvd, Arrays.asList(targets), pipelineSize, srcDataNode,
          pinning);
      LOG.debug("isDatanode={}, isClient={}, isTransfer={}",
          isDatanode, isClient, isTransfer);
      LOG.debug("writeBlock receive buf size {} tcp no delay {}",
          peer.getReceiveBufferSize(), peer.getTcpNoDelay());
    }

    // We later mutate block's generation stamp and length, but we need to
    // forward the original version of the block to downstream mirrors, so
    // make a copy here.
    final ExtendedBlock originalBlock = new ExtendedBlock(block);
    if (block.getNumBytes() == 0) {
      block.setNumBytes(dataXceiverServer.estimateBlockSize);
    }
    LOG.info("Receiving {} src: {} dest: {}",
        block, remoteAddress, localAddress);

    DataOutputStream mirrorOut = null;  // stream to next target
    DataInputStream mirrorIn = null;    // reply from next target
    Socket mirrorSock = null;           // socket to next target
    String mirrorNode = null;           // the name:port of next target
    String firstBadLink = "";           // first datanode that failed in connection setup
    Status mirrorInStatus = SUCCESS;
    final String storageUuid;
    final boolean isOnTransientStorage;
    try {
      final Replica replica;
      if (isDatanode || 
          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        // open a block receiver
        setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
            peer.getRemoteAddressString(),
            peer.getLocalAddressString(),
            stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
            clientname, srcDataNode, datanode, requestedChecksum,
            cachingStrategy, allowLazyPersist, pinning, storageId));
        replica = blockReceiver.getReplica();
      } else {
        replica = datanode.data.recoverClose(
            block, latestGenerationStamp, minBytesRcvd);
      }
      storageUuid = replica.getStorageUuid();
      isOnTransientStorage = replica.isOnTransientStorage();

      //
      // Connect to downstream machine, if appropriate
      //
      if (targets.length > 0) {
        InetSocketAddress mirrorTarget = null;
        // Connect to backup machine
        mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
        LOG.debug("Connecting to datanode {}", mirrorNode);
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        mirrorSock = datanode.newSocket();
        try {

          DataNodeFaultInjector.get().failMirrorConnection();

          int timeoutValue = dnConf.socketTimeout +
              (HdfsConstants.READ_TIMEOUT_EXTENSION * targets.length);
          int writeTimeout = dnConf.socketWriteTimeout +
              (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
          NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
          mirrorSock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
          mirrorSock.setSoTimeout(timeoutValue);
          mirrorSock.setKeepAlive(true);
          if (dnConf.getTransferSocketSendBufferSize() > 0) {
            mirrorSock.setSendBufferSize(
                dnConf.getTransferSocketSendBufferSize());
          }

          OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
              writeTimeout);
          InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
          DataEncryptionKeyFactory keyFactory =
            datanode.getDataEncryptionKeyFactoryForBlock(block);
          SecretKey secretKey = null;
          if (dnConf.overwriteDownstreamDerivedQOP) {
            String bpid = block.getBlockPoolId();
            BlockKey blockKey = datanode.blockPoolTokenSecretManager
                .get(bpid).getCurrentKey();
            secretKey = blockKey.getKey();
          }
          IOStreamPair saslStreams = datanode.saslClient.socketSend(
              mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory,
              blockToken, targets[0], secretKey);
          unbufMirrorOut = saslStreams.out;
          unbufMirrorIn = saslStreams.in;
          mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
              smallBufferSize));
          mirrorIn = new DataInputStream(unbufMirrorIn);

          String targetStorageId = null;
          if (targetStorageIds.length > 0) {
            // Older clients may not have provided any targetStorageIds
            targetStorageId = targetStorageIds[0];
          }
          if (targetPinnings != null && targetPinnings.length > 0) {
            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                blockToken, clientname, targets, targetStorageTypes,
                srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                latestGenerationStamp, requestedChecksum, cachingStrategy,
                allowLazyPersist, targetPinnings[0], targetPinnings,
                targetStorageId, targetStorageIds);
          } else {
            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                blockToken, clientname, targets, targetStorageTypes,
                srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                latestGenerationStamp, requestedChecksum, cachingStrategy,
                allowLazyPersist, false, targetPinnings,
                targetStorageId, targetStorageIds);
          }

          mirrorOut.flush();

          DataNodeFaultInjector.get().writeBlockAfterFlush();

          // read connect ack (only for clients, not for replication req)
          if (isClient) {
            BlockOpResponseProto connectAck =
              BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
            mirrorInStatus = connectAck.getStatus();
            firstBadLink = connectAck.getFirstBadLink();
            if (mirrorInStatus != SUCCESS) {
              LOG.debug("Datanode {} got response for connect" +
                  "ack  from downstream datanode with firstbadlink as {}",
                  targets.length, firstBadLink);
            }
          }

        } catch (IOException e) {
          if (isClient) {
            BlockOpResponseProto.newBuilder()
              .setStatus(ERROR)
               // NB: Unconditionally using the xfer addr w/o hostname
              .setFirstBadLink(targets[0].getXferAddr())
              .build()
              .writeDelimitedTo(replyOut);
            replyOut.flush();
          }
          IOUtils.closeStream(mirrorOut);
          mirrorOut = null;
          IOUtils.closeStream(mirrorIn);
          mirrorIn = null;
          IOUtils.closeSocket(mirrorSock);
          mirrorSock = null;
          if (isClient) {
            LOG.error("{}:Exception transferring block {} to mirror {}",
                datanode, block, mirrorNode, e);
            throw e;
          } else {
            LOG.info("{}:Exception transferring {} to mirror {}- continuing " +
                "without the mirror", datanode, block, mirrorNode, e);
            incrDatanodeNetworkErrors();
          }
        }
      }

      // send connect-ack to source for clients and not transfer-RBW/Finalized
      if (isClient && !isTransfer) {
        if (mirrorInStatus != SUCCESS) {
          LOG.debug("Datanode {} forwarding connect ack to upstream " +
              "firstbadlink is {}", targets.length, firstBadLink);
        }
        BlockOpResponseProto.newBuilder()
          .setStatus(mirrorInStatus)
          .setFirstBadLink(firstBadLink)
          .build()
          .writeDelimitedTo(replyOut);
        replyOut.flush();
      }

      // receive the block and mirror to the next target
      if (blockReceiver != null) {
        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, mirrorAddr,
            dataXceiverServer.getWriteThrottler(), targets, false);

        // send close-ack for transfer-RBW/Finalized 
        if (isTransfer) {
          LOG.trace("TRANSFER: send close-ack");
          writeResponse(SUCCESS, null, replyOut);
        }
      }

      // update its generation stamp
      if (isClient && 
          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        block.setGenerationStamp(latestGenerationStamp);
        block.setNumBytes(minBytesRcvd);
      }
      
      // if this write is for a replication request or recovering
      // a failed close for client, then confirm block. For other client-writes,
      // the block is finalized in the PacketResponder.
      if (isDatanode ||
          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
        LOG.info("Received {} src: {} dest: {} volume: {} of size {}",
            block, remoteAddress, localAddress, replica.getVolume(),
            block.getNumBytes());
      }

      if(isClient) {
        size = block.getNumBytes();
      }
    } catch (IOException ioe) {
      LOG.info("opWriteBlock {} received exception {}",
          block, ioe.toString());
      incrDatanodeNetworkErrors();
      throw ioe;
    } finally {
      // close all opened streams
      IOUtils.closeStream(mirrorOut);
      IOUtils.closeStream(mirrorIn);
      IOUtils.closeStream(replyOut);
      IOUtils.closeSocket(mirrorSock);
      if (blockReceiver != null) {
        blockReceiver.releaseAnyRemainingReservedSpace();
      }
      IOUtils.closeStream(blockReceiver);
      setCurrentBlockReceiver(null);
    }

    //update metrics
    datanode.getMetrics().addWriteBlockOp(elapsed());
    datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
  }