in client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java [75:146]
public ShuffleReadClientImpl(ShuffleClientFactory.ReadClientBuilder builder) {
// add default value
if (builder.getShuffleDataDistributionType() == null) {
builder.shuffleDataDistributionType(ShuffleDataDistributionType.NORMAL);
}
if (builder.getHadoopConf() == null) {
builder.hadoopConf(new Configuration());
}
if (builder.getRssConf() != null
&&
// if rssConf contains only block id config, consider this as test mode as well
!builder
.getRssConf()
.getKeySet()
.equals(
Sets.newHashSet(
RssClientConf.BLOCKID_SEQUENCE_NO_BITS.key(),
RssClientConf.BLOCKID_PARTITION_ID_BITS.key(),
RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key()))) {
final int indexReadLimit = builder.getRssConf().get(RssClientConf.RSS_INDEX_READ_LIMIT);
final String storageType = builder.getRssConf().get(RssClientConf.RSS_STORAGE_TYPE);
long readBufferSize =
builder
.getRssConf()
.getSizeAsBytes(
RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key(),
RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.defaultValue());
if (readBufferSize > Integer.MAX_VALUE) {
LOG.warn(RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE.key() + " can support 2g as max");
readBufferSize = Integer.MAX_VALUE;
}
boolean offHeapEnabled = builder.getRssConf().get(RssClientConf.OFF_HEAP_MEMORY_ENABLE);
builder.indexReadLimit(indexReadLimit);
builder.storageType(storageType);
builder.readBufferSize(readBufferSize);
builder.offHeapEnable(offHeapEnabled);
if (builder.getClientType() == null) {
builder.clientType(builder.getRssConf().get(RssClientConf.RSS_CLIENT_TYPE));
}
} else {
// most for test
RssConf rssConf = (builder.getRssConf() == null) ? new RssConf() : builder.getRssConf();
rssConf.set(RssClientConf.RSS_STORAGE_TYPE, builder.getStorageType());
rssConf.set(RssClientConf.RSS_INDEX_READ_LIMIT, builder.getIndexReadLimit());
rssConf.set(
RssClientConf.RSS_CLIENT_READ_BUFFER_SIZE, String.valueOf(builder.getReadBufferSize()));
if (!rssConf.contains(RssClientConf.BLOCKID_SEQUENCE_NO_BITS)) {
rssConf.setInteger(
RssClientConf.BLOCKID_SEQUENCE_NO_BITS, BlockIdLayout.DEFAULT.sequenceNoBits);
}
if (!rssConf.contains(RssClientConf.BLOCKID_PARTITION_ID_BITS)) {
rssConf.setInteger(
RssClientConf.BLOCKID_PARTITION_ID_BITS, BlockIdLayout.DEFAULT.partitionIdBits);
}
if (!rssConf.contains(RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS)) {
rssConf.setInteger(
RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS, BlockIdLayout.DEFAULT.taskAttemptIdBits);
}
builder.rssConf(rssConf);
builder.offHeapEnable(false);
builder.expectedTaskIdsBitmapFilterEnable(false);
if (builder.getClientType() == null) {
builder.clientType(rssConf.get(RssClientConf.RSS_CLIENT_TYPE));
}
}
if (builder.getIdHelper() == null) {
builder.idHelper(new DefaultIdHelper(BlockIdLayout.from(builder.getRssConf())));
}
init(builder);
}