in hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java [696:877]
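/*
 * The streamer thread is the only thread that opens and closes streams
 * to the datanodes; it also drives any error recovery for the pipeline.
 */
@Override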
public void run() {
TraceScope scope = null;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shut down the Responder
if (errorState.hasError()) {
closeResponder();
}
DFSPacket one;
try {
// process datanode IO errors if any
boolean doSleep = processDatanodeOrExternalError();
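// While in DATA_STREAMING, sendHeartbeat() is expected to keep the
// pipeline alive and return the delay until the next heartbeat is due,
// which bounds how long the wait below can block.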
synchronized (dataQueue) {
// wait until there is a packet to send.
while ((!shouldStop() && dataQueue.isEmpty()) || doSleep) {
long timeout = 1000;
if (stage == BlockConstructionStage.DATA_STREAMING) {
timeout = sendHeartbeat();
}
try {
dataQueue.wait(timeout);
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
doSleep = false;
}
if (shouldStop()) {
continue;
}
// get packet to be sent.
one = dataQueue.getFirst(); // regular data packet
SpanContext[] parents = one.getTraceParents();
if (parents != null && parents.length > 0) {
// The original code stored multiple parents in the DFSPacket and
// used them ALL when creating a new Span. We only use the
// first one FOR NOW. Moreover, we don't activate the Span for now.
scope = dfsClient.getTracer().
newScope("dataStreamer", parents[0], false);
//scope.getSpan().setParents(parents);
}
}
// The DataStreamer must release the dataQueue lock before it may sleep in
// backOffIfNecessary(); otherwise the ResponseProcessor, which also
// synchronizes on dataQueue, would be blocked and ACK handling delayed.
try {
backOffIfNecessary();
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
// Set up the pipeline if needed: for CREATE, allocate a new block from
// the namenode; for APPEND, recover and reuse the last partial block.
LOG.debug("stage={}, {}", stage, this);
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block: {}", this);
setupPipelineForCreate();
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
LOG.debug("Append to block {}", block);
setupPipelineForAppendOrRecovery();
if (streamerClosed) {
continue;
}
initDataStreaming();
}
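// Sanity check: a packet must never extend past the block size the
// namenode reported for this file.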
long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > stat.getBlockSize()) {
throw new IOException("BlockSize " + stat.getBlockSize() +
" < lastByteOffsetInBlock, " + this + ", " + one);
}
if (one.isLastPacketInBlock()) {
// wait for all data packets to be successfully acked
waitForAllAcks();
if (shouldStop()) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
}
// send the packet
SpanContext spanContext = null;
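// Hand the packet over to the ackQueue under the dataQueue lock so the
// ResponseProcessor can match incoming acks against it; heartbeat packets
// are not tracked. packetSendTime presumably feeds ack-latency
// diagnostics, and notifyAll() wakes writers waiting for queue space.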
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
if (scope != null) {
one.setSpan(scope.span());
spanContext = scope.span().getContext();
scope.close();
}
scope = null;
dataQueue.removeFirst();
ackQueue.addLast(one);
packetSendTime.put(one.getSeqno(), Time.monotonicNowNanos());
dataQueue.notifyAll();
}
}
LOG.debug("{} sending {}", this, one);
// write out data to remote datanode
try (TraceScope ignored = dfsClient.getTracer().
newScope("DataStreamer#writeTo", spanContext)) {
sendPacket(one);
} catch (IOException e) {
// HDFS-3398: treat the primary DN as down, since the client is unable
// to write to it. If a failed or restarting node has already been
// recorded by the responder, the following call will have no effect.
// Pipeline recovery can handle only one node error at a time. If the
// primary node fails again during the recovery, it will be taken out
// then.
errorState.markFirstNodeIfNotMarked();
throw e;
}
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}
if (shouldStop()) {
continue;
}
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet to be acked
try {
waitForAllAcks();
} catch (IOException ioe) {
// No need to do close recovery if the last packet was acked,
// i.e. the ackQueue is empty. waitForAllAcks() can get an exception
// (e.g. connection reset) while sending a heartbeat packet if the
// DN sends the final ack and closes the connection.
synchronized (dataQueue) {
if (!ackQueue.isEmpty()) {
throw ioe;
}
}
}
if (shouldStop()) {
continue;
}
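// endBlock() presumably resets per-block state and returns the streamer
// to the PIPELINE_SETUP_CREATE stage for the next block.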
endBlock();
}
if (progress != null) { progress.progress(); }
// This is used by unit tests to trigger race conditions.
if (artificialSlowdown != 0 && dfsClient.clientRunning) {
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
// Log a warning if there was a real error.
if (!errorState.isRestartingNode()) {
// Quota exception messages are descriptive enough, so do not
// log a verbose WARN with a stack trace for them.
if (e instanceof QuotaExceededException) {
LOG.debug("DataStreamer Quota Exception", e);
} else {
LOG.warn("DataStreamer Exception", e);
}
}
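// Remember the failure so user-facing calls can rethrow it, and flag an
// internal error so that processDatanodeOrExternalError() can attempt
// recovery on the next iteration.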
lastException.set(e);
assert !(e instanceof NullPointerException);
errorState.setInternalError();
if (!errorState.isNodeMarked()) {
// Not a datanode issue
streamerClosed = true;
}
} finally {
if (scope != null) {
scope.close();
scope = null;
}
}
}
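// Out of the main loop: closeInternal() is expected to close the stream
// and responder, mark the streamer closed, and wake any waiting threads.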
closeInternal();
}