in client/src/main/java/org/apache/celeborn/client/write/DataPusher.java [63:141]
/**
 * Creates a data pusher, pre-fills its idle queue with reusable {@link PushTask} buffers and
 * starts the daemon thread that drains the push queue.
 *
 * @param shuffleId shuffle identifier; stored and forwarded to the {@code DataPushQueue}
 * @param mapId map identifier; stored and forwarded to the {@code DataPushQueue}
 * @param attemptId attempt identifier; stored and forwarded to the {@code DataPushQueue}
 * @param taskId used only to name the push thread ({@code "DataPusher-" + taskId})
 * @param numMappers stored and forwarded to the {@code DataPushQueue}
 * @param numPartitions stored and forwarded to the {@code DataPushQueue}
 * @param conf supplies the push queue capacity and the per-task buffer size
 * @param client shuffle client; stored and forwarded to the {@code DataPushQueue}
 * @param pushTasks existing idle queue to reuse, or {@code null} to allocate a new queue bounded
 *     by the configured push queue capacity. A supplied queue is topped up with new tasks until
 *     its size reaches that capacity.
 * @param afterPush callback stored for later use by this pusher
 * @param mapStatusLengths per-partition length accumulators stored for later use by this pusher
 * @throws InterruptedException if the calling thread is interrupted while blocked adding a task
 *     to the idle queue
 */
public DataPusher(
    int shuffleId,
    int mapId,
    int attemptId,
    long taskId,
    int numMappers,
    int numPartitions,
    CelebornConf conf,
    ShuffleClient client,
    LinkedBlockingQueue<PushTask> pushTasks,
    Consumer<Integer> afterPush,
    LongAdder[] mapStatusLengths)
    throws InterruptedException {
  final int pushQueueCapacity = conf.clientPushQueueCapacity();
  final int pushBufferMaxSize = conf.clientPushBufferMaxSize();

  // Assign plain state before `this` is handed to DataPushQueue or the push thread, so that
  // neither can observe default (0 / null) values through the escaped reference.
  this.shuffleId = shuffleId;
  this.mapId = mapId;
  this.attemptId = attemptId;
  this.numMappers = numMappers;
  this.numPartitions = numPartitions;
  this.client = client;
  this.afterPush = afterPush;
  this.mapStatusLengths = mapStatusLengths;

  if (pushTasks == null) {
    idleQueue = new LinkedBlockingQueue<>(pushQueueCapacity);
  } else {
    idleQueue = pushTasks;
  }

  dataPushQueue =
      new DataPushQueue(
          conf, this, client, shuffleId, mapId, attemptId, numMappers, numPartitions);

  // Top the idle queue up to the configured capacity with fresh buffers.
  // NOTE(review): put() blocks when the queue is full, so a caller-supplied queue whose bound is
  // smaller than pushQueueCapacity would block here - confirm callers always size it to match.
  for (int i = idleQueue.size(); i < pushQueueCapacity; i++) {
    idleQueue.put(new PushTask(pushBufferMaxSize));
  }

  pushThread =
      new Thread("DataPusher-" + taskId) {
        /**
         * Returns a pushed task to the idle queue while holding {@code idleLock}, and signals
         * {@code idleFull} once the idle queue has no remaining capacity.
         */
        private void reclaimTask(PushTask task) throws InterruptedException {
          idleLock.lockInterruptibly();
          try {
            idleQueue.put(task);
            if (idleQueue.remainingCapacity() == 0) {
              idleFull.signal();
            }
          } catch (InterruptedException e) {
            logger.error("DataPusher thread interrupted while reclaiming data.");
            throw e;
          } finally {
            idleLock.unlock();
          }
        }

        /**
         * Repeatedly takes a batch of tasks from the push queue, pushes each one and returns it
         * to the idle queue. I/O failures are recorded in {@code exceptionRef}; whether the loop
         * then stops is decided by {@code stillRunning()}. An interrupt ends the loop.
         */
        @Override
        public void run() {
          while (stillRunning()) {
            try {
              ArrayList<PushTask> tasks = dataPushQueue.takePushTasks();
              for (int i = 0; i < tasks.size(); i++) {
                PushTask task = tasks.get(i);
                pushData(task);
                reclaimTask(task);
              }
            } catch (CelebornIOException e) {
              exceptionRef.set(e);
            } catch (IOException e) {
              exceptionRef.set(new CelebornIOException(e));
            } catch (InterruptedException e) {
              logger.error("DataPusher push thread interrupted while pushing data.");
              // Catching InterruptedException clears the flag; restore it so the interruption
              // is not silently lost before the thread exits.
              Thread.currentThread().interrupt();
              break;
            }
          }
        }
      };
  pushThread.setDaemon(true);
  pushThread.start();
}