public void run()

in hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java [696:877]


  public void run() {
    TraceScope scope = null;
    while (!streamerClosed && dfsClient.clientRunning) {
      // if the Responder encountered an error, shutdown Responder
      if (errorState.hasError()) {
        closeResponder();
      }

      DFSPacket one;
      try {
        // process datanode IO errors if any
        boolean doSleep = processDatanodeOrExternalError();

        synchronized (dataQueue) {
          // wait for a packet to be sent.
          while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) {
            long timeout = 1000;
            if (stage == BlockConstructionStage.DATA_STREAMING) {
              timeout = sendHeartbeat();
            }
            try {
              dataQueue.wait(timeout);
            } catch (InterruptedException  e) {
              LOG.debug("Thread interrupted", e);
            }
            doSleep = false;
          }
          if (shouldStop()) {
            continue;
          }
          // get packet to be sent.
          one = dataQueue.getFirst(); // regular data packet
          SpanContext[] parents = one.getTraceParents();
          if (parents != null && parents.length > 0) {
            // The original code stored multiple parents in the DFSPacket, and
            // use them ALL here when creating a new Span. We only use the
            // last one FOR NOW. Moreover, we don't activate the Span for now.
            scope = dfsClient.getTracer().
                newScope("dataStreamer", parents[0], false);
            //scope.getSpan().setParents(parents);
          }
        }

        // The DataStreamer has to release the dataQueue before sleeping,
        // otherwise it will cause the ResponseProcessor to accept the ACK delay.
        try {
          backOffIfNecessary();
        } catch (InterruptedException e) {
          LOG.debug("Thread interrupted", e);
        }

        // get new block from namenode.
        LOG.debug("stage={}, {}", stage, this);

        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          LOG.debug("Allocating new block: {}", this);
          setupPipelineForCreate();
          initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
          LOG.debug("Append to block {}", block);
          setupPipelineForAppendOrRecovery();
          if (streamerClosed) {
            continue;
          }
          initDataStreaming();
        }

        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
        if (lastByteOffsetInBlock > stat.getBlockSize()) {
          throw new IOException("BlockSize " + stat.getBlockSize() +
              " < lastByteOffsetInBlock, " + this + ", " + one);
        }

        if (one.isLastPacketInBlock()) {
          // wait for all data packets have been successfully acked
          waitForAllAcks();
          if(shouldStop()) {
            continue;
          }
          stage = BlockConstructionStage.PIPELINE_CLOSE;
        }

        // send the packet
        SpanContext spanContext = null;
        synchronized (dataQueue) {
          // move packet from dataQueue to ackQueue
          if (!one.isHeartbeatPacket()) {
            if (scope != null) {
              one.setSpan(scope.span());
              spanContext = scope.span().getContext();
              scope.close();
            }
            scope = null;
            dataQueue.removeFirst();
            ackQueue.addLast(one);
            packetSendTime.put(one.getSeqno(), Time.monotonicNowNanos());
            dataQueue.notifyAll();
          }
        }

        LOG.debug("{} sending {}", this, one);

        // write out data to remote datanode
        try (TraceScope ignored = dfsClient.getTracer().
            newScope("DataStreamer#writeTo", spanContext)) {
          sendPacket(one);
        } catch (IOException e) {
          // HDFS-3398 treat primary DN is down since client is unable to
          // write to primary DN. If a failed or restarting node has already
          // been recorded by the responder, the following call will have no
          // effect. Pipeline recovery can handle only one node error at a
          // time. If the primary node fails again during the recovery, it
          // will be taken out then.
          errorState.markFirstNodeIfNotMarked();
          throw e;
        }

        // update bytesSent
        long tmpBytesSent = one.getLastByteOffsetBlock();
        if (bytesSent < tmpBytesSent) {
          bytesSent = tmpBytesSent;
        }

        if (shouldStop()) {
          continue;
        }

        // Is this block full?
        if (one.isLastPacketInBlock()) {
          // wait for the close packet has been acked
          try {
            waitForAllAcks();
          } catch (IOException ioe) {
            // No need to do a close recovery if the last packet was acked.
            // i.e. ackQueue is empty.  waitForAllAcks() can get an exception
            // (e.g. connection reset) while sending a heartbeat packet,
            // if the DN sends the final ack and closes the connection.
            synchronized (dataQueue) {
              if (!ackQueue.isEmpty()) {
                throw ioe;
              }
            }
          }
          if (shouldStop()) {
            continue;
          }

          endBlock();
        }
        if (progress != null) { progress.progress(); }

        // This is used by unit test to trigger race conditions.
        if (artificialSlowdown != 0 && dfsClient.clientRunning) {
          Thread.sleep(artificialSlowdown);
        }
      } catch (Throwable e) {
        // Log warning if there was a real error.
        if (!errorState.isRestartingNode()) {
          // Since their messages are descriptive enough, do not always
          // log a verbose stack-trace WARN for quota exceptions.
          if (e instanceof QuotaExceededException) {
            LOG.debug("DataStreamer Quota Exception", e);
          } else {
            LOG.warn("DataStreamer Exception", e);
          }
        }
        lastException.set(e);
        assert !(e instanceof NullPointerException);
        errorState.setInternalError();
        if (!errorState.isNodeMarked()) {
          // Not a datanode issue
          streamerClosed = true;
        }
      } finally {
        if (scope != null) {
          scope.close();
          scope = null;
        }
      }
    }
    closeInternal();
  }