in client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java [178:343]
public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
LOG.info(
"Uniffle {} version: {}", this.getClass().getName(), Constants.VERSION_AND_REVISION_SHORT);
this.sparkConf = conf;
checkSupported(sparkConf);
boolean supportsRelocation =
Optional.ofNullable(SparkEnv.get())
.map(env -> env.serializer().supportsRelocationOfSerializedObjects())
.orElse(true);
if (!supportsRelocation) {
LOG.warn(
"RssShuffleManager requires a serializer which supports relocation of serialized objects. Please set "
+ "spark.serializer to org.apache.spark.serializer.KryoSerializer instead.");
}
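// Quota attribution; the defaults are a generic user name and a timestamp-based uuid.
// Illustrative override (values are examples only):
// --conf spark.rss.quota.user=alice --conf spark.rss.quota.uuid=job-123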
this.user = sparkConf.get("spark.rss.quota.user", "user");
this.uuid = sparkConf.get("spark.rss.quota.uuid", Long.toString(System.currentTimeMillis()));
this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
// fetch dynamic client conf and apply it if necessary
if (isDriver && this.dynamicConfEnabled) {
fetchAndApplyDynamicConf(sparkConf);
}
RssSparkShuffleUtils.validateRssClientConf(sparkConf);
// convert spark conf to rss conf after fetching dynamic client conf
this.rssConf = RssSparkConfig.toRssConf(sparkConf);
RssUtils.setExtraJavaProperties(rssConf);
// set & check replica config
this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
this.dataReplicaWrite = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE);
this.dataReplicaRead = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_READ);
this.dataReplicaSkipEnabled = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED);
LOG.info(
"Check quorum config [{}:{}:{}:{}]",
dataReplica,
dataReplicaWrite,
dataReplicaRead,
dataReplicaSkipEnabled);
RssUtils.checkQuorumSetting(dataReplica, dataReplicaWrite, dataReplicaRead);
this.maxConcurrencyPerPartitionToWrite = rssConf.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
// configure block id layout
this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
this.speculation = sparkConf.getBoolean("spark.speculation", false);
// configureBlockIdLayout requires maxFailures and speculation to be initialized
configureBlockIdLayout(sparkConf, rssConf);
this.blockIdLayout = BlockIdLayout.from(rssConf);
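// The layout splits the usable bits of a long block id among sequence number,
// partition id and task attempt id; maxFailures and speculation bound how many
// bits the attempt number needs.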
this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
this.dataCommitPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
// External shuffle service is not supported when using remote shuffle service
sparkConf.set("spark.shuffle.service.enabled", "false");
sparkConf.set("spark.dynamicAllocation.shuffleTracking.enabled", "false");
sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
LOG.info("Disable external shuffle service in RssShuffleManager.");
sparkConf.set("spark.sql.adaptive.localShuffleReader.enabled", "false");
LOG.info("Disable local shuffle reader in RssShuffleManager.");
// If shuffle data is stored in a distributed filesystem or in a disaggregated
// shuffle cluster, shuffle data locality is not needed
sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
LOG.info("Disable shuffle data locality in RssShuffleManager.");
taskToSuccessBlockIds = JavaUtils.newConcurrentMap();
taskToFailedBlockSendTracker = JavaUtils.newConcurrentMap();
this.shuffleIdToPartitionNum = JavaUtils.newConcurrentMap();
this.shuffleIdToNumMapTasks = JavaUtils.newConcurrentMap();
// stage retry for write/fetch failure
rssStageRetryForFetchFailureEnabled =
rssConf.get(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED);
rssStageRetryForWriteFailureEnabled =
rssConf.get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED);
if (rssStageRetryForFetchFailureEnabled || rssStageRetryForWriteFailureEnabled) {
rssStageRetryEnabled = true;
List<String> logTips = new ArrayList<>();
if (rssStageRetryForWriteFailureEnabled) {
logTips.add("write");
}
if (rssStageRetryForFetchFailureEnabled) {
logTips.add("fetch");
}
LOG.info(
"Activate the stage retry mechanism that will resubmit stage on {} failure",
StringUtils.join(logTips, "/"));
}
this.partitionReassignEnabled = rssConf.get(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
// Partition reassign is mutually exclusive with multiple replicas and stage retry.
if (partitionReassignEnabled) {
if (dataReplica > 1) {
throw new RssException(
"Partition reassign is incompatible with the multiple-replicas mechanism.");
}
this.partitionSplitMode = rssConf.get(RssClientConf.RSS_CLIENT_PARTITION_SPLIT_MODE);
this.partitionSplitLoadBalanceServerNum =
rssConf.get(RssClientConf.RSS_CLIENT_PARTITION_SPLIT_LOAD_BALANCE_SERVER_NUMBER);
LOG.info("Partition reassign is enabled.");
}
this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
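// Each of the following features needs executors to call back into the driver,
// so any one of them requires the shuffle manager RPC service.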
this.shuffleManagerRpcServiceEnabled =
partitionReassignEnabled
|| rssStageRetryEnabled
|| blockIdSelfManagedEnabled
|| isSparkUIEnabled(conf);
if (isDriver) {
if (shuffleManagerRpcServiceEnabled) {
LOG.info("stage resubmit is supported and enabled");
// start shuffle manager server
rssConf.set(RPC_SERVER_PORT, 0);
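// Port 0 lets the server bind an ephemeral port; the actual port is read back
// and published to executors below.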
ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(this, rssConf);
service = factory.getService();
shuffleManagerServer = factory.getServer(service);
try {
shuffleManagerServer.start();
// Publish the bound port as spark.rss.shuffle.manager.grpc.port so it is
// propagated to executors properly.
sparkConf.set(
RssSparkConfig.RSS_SHUFFLE_MANAGER_GRPC_PORT, shuffleManagerServer.getPort());
} catch (Exception e) {
LOG.error("Failed to start shuffle manager server", e);
throw new RssException(e);
}
}
}
if (shuffleManagerRpcServiceEnabled) {
getOrCreateShuffleManagerClientSupplier();
}
// Start heartbeat thread.
this.heartbeatInterval = sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
this.heartbeatTimeout =
sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), heartbeatInterval / 2);
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
this.shuffleWriteClient = createShuffleWriteClient();
registerCoordinator();
LOG.info("Rss data pusher is starting...");
int poolSize = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
int keepAliveTime = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
this.dataPusher =
new DataPusher(
shuffleWriteClient,
taskToSuccessBlockIds,
taskToFailedBlockSendTracker,
failedTaskIds,
poolSize,
keepAliveTime);
this.partitionReassignMaxServerNum =
rssConf.get(RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM);
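// Driver-side registries: per-shuffle handle metadata and stage resubmission state.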
this.shuffleHandleInfoManager = new ShuffleHandleInfoManager();
this.rssStageResubmitManager = new RssStageResubmitManager();
}