def workerHttpPort: Int = get()

Excerpt from common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala [810:947]


  // Accessors for the WORKER_HTTP_* entries. Each returns whatever `get` resolves
  // for the named config entry.
  def workerHttpPort: Int = get(WORKER_HTTP_PORT)
  def workerHttpMaxWorkerThreads: Int = get(WORKER_HTTP_MAX_WORKER_THREADS)
  // NOTE(review): the time unit of the two timeouts below is not visible here;
  // presumably milliseconds — confirm against the entry definitions.
  def workerHttpStopTimeout: Long = get(WORKER_HTTP_STOP_TIMEOUT)
  def workerHttpIdleTimeout: Long = get(WORKER_HTTP_IDLE_TIMEOUT)
  // Port numbers read from the WORKER_*_PORT entries (rpc, push, fetch, replicate).
  def workerRpcPort: Int = get(WORKER_RPC_PORT)
  def workerPushPort: Int = get(WORKER_PUSH_PORT)
  def workerFetchPort: Int = get(WORKER_FETCH_PORT)
  def workerReplicatePort: Int = get(WORKER_REPLICATE_PORT)
  // IO thread counts are Option-typed; presumably `None` means the entry is unset
  // and the consumer picks its own default — confirm at the call sites.
  def workerPushIoThreads: Option[Int] = get(WORKER_PUSH_IO_THREADS)
  def workerFetchIoThreads: Option[Int] = get(WORKER_FETCH_IO_THREADS)
  def workerReplicateIoThreads: Option[Int] = get(WORKER_REPLICATE_IO_THREADS)
  // General worker settings, each read directly from its config entry.
  // Note: `registerWorkerTimeout` is backed by `WORKER_REGISTER_TIMEOUT` (word order differs).
  def registerWorkerTimeout: Long = get(WORKER_REGISTER_TIMEOUT)
  def workerWorkingDir: String = get(WORKER_WORKING_DIR)
  def workerCloseIdleConnections: Boolean = get(WORKER_CLOSE_IDLE_CONNECTIONS)
  // NOTE(review): unit of this duration is not visible here — presumably milliseconds; confirm.
  def workerReplicateFastFailDuration: Long = get(WORKER_REPLICATE_FAST_FAIL_DURATION)
  def workerReplicateRandomConnectionEnabled: Boolean =
    get(WORKER_REPLICATE_RANDOM_CONNECTION_ENABLED)
  // File-clean check, heartbeat and unavailable-info settings, read from the named entries.
  // NOTE(review): the Long timeouts are presumably milliseconds — confirm against the entries.
  def workerCheckFileCleanMaxRetries: Int = get(WORKER_CHECK_FILE_CLEAN_MAX_RETRIES)
  def workerCheckFileCleanTimeout: Long = get(WORKER_CHECK_FILE_CLEAN_TIMEOUT)
  def workerHeartbeatTimeout: Long = get(WORKER_HEARTBEAT_TIMEOUT)
  def workerUnavailableInfoExpireTimeout: Long = get(WORKER_UNAVAILABLE_INFO_EXPIRE_TIMEOUT)
  // Optional host-name patterns; how allow/deny are combined is decided by the caller,
  // not in this file section.
  def allowWorkerHostPattern: Option[Regex] = get(ALLOW_WORKER_HOST_PATTERN)
  def denyWorkerHostPattern: Option[Regex] = get(DENY_WORKER_HOST_PATTERN)

  /** Value of the `WORKER_REPLICATE_THREADS` entry. */
  def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
  /**
   * Number of commit threads.
   *
   * Returns the `WORKER_COMMIT_THREADS` value as configured, except when `hasHDFSStorage`
   * is true, in which case the result is raised to at least 128.
   */
  def workerCommitThreads: Int = {
    // Evaluate the storage flag first, then the entry, matching the original order.
    val hdfsInUse = hasHDFSStorage
    val configuredThreads = get(WORKER_COMMIT_THREADS)
    if (hdfsInUse) {
      Math.max(128, configuredThreads)
    } else {
      configuredThreads
    }
  }
  // Commit/clean settings read from the named entries.
  // NOTE(review): interval/timeout units are presumably milliseconds — confirm.
  def workerCommitFilesCheckInterval: Long = get(WORKER_COMMIT_FILES_CHECK_INTERVAL)
  def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
  def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
  /**
   * Maximum partition size used for estimation.
   *
   * Uses the `ESTIMATED_PARTITION_SIZE_MAX_SIZE` entry when it holds a value; otherwise
   * falls back to twice `partitionSplitMaximumSize`.
   */
  def maxPartitionSizeToEstimate: Long =
    get(ESTIMATED_PARTITION_SIZE_MAX_SIZE) match {
      case Some(configuredMax) => configuredMax
      // The fallback is only computed when the entry is empty.
      case None => partitionSplitMaximumSize * 2
    }
  /** Value of the `ESTIMATED_PARTITION_SIZE_MIN_SIZE` entry. */
  def minPartitionSizeToEstimate: Long = get(ESTIMATED_PARTITION_SIZE_MIN_SIZE)
  // Partition sorter settings, each read from the named entry.
  // NOTE(review): the sort timeout unit is presumably milliseconds — confirm.
  def workerPartitionSorterSortPartitionTimeout: Long = get(WORKER_PARTITION_SORTER_SORT_TIMEOUT)
  def workerPartitionSorterPrefetchEnabled: Boolean =
    get(WORKER_PARTITION_SORTER_PREFETCH_ENABLED)
  // Backed by `WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR` (the entry name has no
  // PARTITION_SORTER prefix, unlike the accessor).
  def workerPartitionSorterShuffleBlockCompactionFactor: Double =
    get(WORKER_SHUFFLE_BLOCK_COMPACTION_FACTOR)
  def workerPartitionSorterReservedMemoryPerPartition: Long =
    get(WORKER_PARTITION_SORTER_RESERVED_MEMORY_PER_PARTITION)
  /**
   * Number of partition sorter threads.
   *
   * Uses the `WORKER_PARTITION_SORTER_THREADS` entry when it holds a value; otherwise
   * the number of processors reported by the JVM runtime.
   */
  def workerPartitionSorterThreads: Int =
    get(WORKER_PARTITION_SORTER_THREADS) match {
      case Some(configuredThreads) => configuredThreads
      case None => Runtime.getRuntime.availableProcessors
    }
  // Sorter index cache settings.
  def workerPartitionSorterIndexCacheMaxWeight: Long =
    get(WORKER_PARTITION_SORTER_INDEX_CACHE_MAX_WEIGHT)
  // Backed by `WORKER_PARTITION_SORTER_INDEX_CACHE_EXPIRE`.
  // NOTE(review): unit presumably milliseconds — confirm.
  def workerPartitionSorterIndexExpire: Long = get(WORKER_PARTITION_SORTER_INDEX_CACHE_EXPIRE)
  // Push/fetch heartbeat and buffer settings, read from the named entries.
  def workerPushHeartbeatEnabled: Boolean = get(WORKER_PUSH_HEARTBEAT_ENABLED)
  def workerPushMaxComponents: Int = get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS)
  def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED)
  def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED)
  // Option-typed; presumably `None` means no limit is configured — confirm at the call sites.
  def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX)
  // JVM profiler settings (WORKER_JVM_PROFILER_* entries).
  def workerJvmProfilerEnabled: Boolean = get(WORKER_JVM_PROFILER_ENABLED)
  def workerJvmProfilerOptions: String = get(WORKER_JVM_PROFILER_OPTIONS)
  def workerJvmProfilerLocalDir: String = get(WORKER_JVM_PROFILER_LOCAL_DIR)
  // JVM quake settings (WORKER_JVM_QUAKE_* entries).
  def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED)
  // NOTE(review): check-interval unit presumably milliseconds — confirm.
  def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL)
  def workerJvmQuakeRuntimeWeight: Double = get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT)
  def workerJvmQuakeDumpEnabled: Boolean = get(WORKER_JVM_QUAKE_DUMP_ENABLED)
  def workerJvmQuakeDumpPath: String = get(WORKER_JVM_QUAKE_DUMP_PATH)

  /**
   * JVM quake dump threshold as a [[Duration]].
   *
   * The value is looked up by the key of `WORKER_JVM_QUAKE_DUMP_THRESHOLD`, using that
   * entry's default string when nothing is set, and read through `getTimeAsMs`.
   *
   * The number returned by `getTimeAsMs` is wrapped as milliseconds. Wrapping it as
   * microseconds (as was done previously) produced a Duration 1000 times shorter than
   * the configured value.
   * NOTE(review): relies on `getTimeAsMs` returning milliseconds, as its name states —
   * confirm against its definition.
   */
  def workerJvmQuakeDumpThreshold: Duration =
    getTimeAsMs(
      WORKER_JVM_QUAKE_DUMP_THRESHOLD.key,
      WORKER_JVM_QUAKE_DUMP_THRESHOLD.defaultValueString).milliseconds
  /**
   * JVM quake kill threshold as a [[Duration]].
   *
   * The value is looked up by the key of `WORKER_JVM_QUAKE_KILL_THRESHOLD`, using that
   * entry's default string when nothing is set, and read through `getTimeAsMs`.
   *
   * The number returned by `getTimeAsMs` is wrapped as milliseconds. Wrapping it as
   * microseconds (as was done previously) produced a Duration 1000 times shorter than
   * the configured value.
   * NOTE(review): relies on `getTimeAsMs` returning milliseconds, as its name states —
   * confirm against its definition.
   */
  def workerJvmQuakeKillThreshold: Duration =
    getTimeAsMs(
      WORKER_JVM_QUAKE_KILL_THRESHOLD.key,
      WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).milliseconds
  /** Value of the `WORKER_JVM_QUAKE_EXIT_CODE` entry. */
  def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE)

  // //////////////////////////////////////////////////////
  //                 Metrics System                      //
  // //////////////////////////////////////////////////////
  // Metrics system settings, each read from the named METRICS_* entry.
  // `metricsConf` is Option-typed; presumably `None` when no metrics conf is set — confirm.
  def metricsConf: Option[String] = get(METRICS_CONF)
  // Backed by `METRICS_ENABLED` (accessor name says "SystemEnable").
  def metricsSystemEnable: Boolean = get(METRICS_ENABLED)
  def metricsSampleRate: Double = get(METRICS_SAMPLE_RATE)
  def metricsSlidingWindowSize: Int = get(METRICS_SLIDING_WINDOW_SIZE)
  def metricsCollectCriticalEnabled: Boolean = get(METRICS_COLLECT_CRITICAL_ENABLED)
  def metricsCapacity: Int = get(METRICS_CAPACITY)
  /**
   * Extra metric labels as a name-to-value map.
   *
   * Every element of the `METRICS_EXTRA_LABELS` entry is converted with
   * `Utils.parseMetricLabels`, and the resulting pairs are collected into a map.
   */
  def metricsExtraLabels: Map[String, String] = {
    val parsedLabelPairs = get(METRICS_EXTRA_LABELS).map(Utils.parseMetricLabels)
    parsedLabelPairs.toMap
  }
  // Worker-side metrics settings, each read from the named entry.
  def metricsWorkerAppTopResourceConsumptionCount: Int =
    get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT)
  def metricsWorkerAppTopResourceConsumptionBytesWrittenThreshold: Long =
    get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_BYTES_WRITTEN_THRESHOLD)
  // Backed by `METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD` (word order differs
  // from the accessor name).
  def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
    get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
  def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)

  // //////////////////////////////////////////////////////
  //                      Quota                         //
  // //////////////////////////////////////////////////////
  // Quota switches, each read from its own entry. How the general flag relates to
  // the cluster/tenant/user flags is decided by the consumers, not here.
  def quotaEnabled: Boolean = get(QUOTA_ENABLED)
  def clusterQuotaEnabled: Boolean = get(CLUSTER_QUOTA_ENABLED)
  def tenantQuotaEnabled: Boolean = get(TENANT_QUOTA_ENABLED)
  def userQuotaEnabled: Boolean = get(USER_QUOTA_ENABLED)
  def quotaInterruptShuffleEnabled: Boolean = get(QUOTA_INTERRUPT_SHUFFLE_ENABLED)

  // //////////////////////////////////////////////////////
  //                      Identity                       //
  // //////////////////////////////////////////////////////
  // Identity settings. `identityProviderClass` is backed by `IDENTITY_PROVIDER`;
  // presumably a class name, as the accessor name suggests — confirm.
  def identityProviderClass: String = get(IDENTITY_PROVIDER)
  def userSpecificTenant: String = get(USER_SPECIFIC_TENANT)
  def userSpecificUserName: String = get(USER_SPECIFIC_USERNAME)

  // //////////////////////////////////////////////////////
  //                      Client                         //
  // //////////////////////////////////////////////////////
  // Client settings, each read from the named CLIENT_* entry.
  def clientCloseIdleConnections: Boolean = get(CLIENT_CLOSE_IDLE_CONNECTIONS)
  // Register-shuffle retry count and wait between retries.
  // NOTE(review): the `Ms` suffix suggests milliseconds — confirm against the entry.
  def clientRegisterShuffleMaxRetry: Int = get(CLIENT_REGISTER_SHUFFLE_MAX_RETRIES)
  def clientRegisterShuffleRetryWaitMs: Long = get(CLIENT_REGISTER_SHUFFLE_RETRY_WAIT)
  // Reserve-slots settings.
  def clientReserveSlotsRackAwareEnabled: Boolean = get(CLIENT_RESERVE_SLOTS_RACKAWARE_ENABLED)
  def clientReserveSlotsMaxRetries: Int = get(CLIENT_RESERVE_SLOTS_MAX_RETRIES)
  def clientReserveSlotsRetryWait: Long = get(CLIENT_RESERVE_SLOTS_RETRY_WAIT)
  // Commit-files settings. Accessor and entry names differ in word order:
  // `CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY` and `CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS`.
  def clientRequestCommitFilesMaxRetries: Int = get(CLIENT_COMMIT_FILE_REQUEST_MAX_RETRY)
  def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
  // Dynamic resource settings for shuffle.
  def clientShuffleDynamicResourceEnabled: Boolean =
    get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_ENABLED)
  def clientShuffleDynamicResourceFactor: Double = get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_FACTOR)
  // Application heartbeat and expired-directory settings.
  // NOTE(review): the `Ms`/`MS` suffixes suggest milliseconds — confirm against the entries.
  def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
  // Two separate entries exist: `HDFS_EXPIRE_DIRS_TIMEOUT` and `DFS_EXPIRE_DIRS_TIMEOUT`.
  // NOTE(review): possibly one supersedes the other — check the entry definitions.
  def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
  def dfsExpireDirsTimeoutMS: Long = get(DFS_EXPIRE_DIRS_TIMEOUT)
  def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
  def applicationUnregisterEnabled: Boolean = get(APPLICATION_UNREGISTER_ENABLED)

  // Further client settings, each read from the named entry.
  def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
  // NOTE(review): timeout unit presumably milliseconds — confirm.
  def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
  // Backed by `CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED` (accessor says "Replica",
  // entry says "PEER_WORKER").
  def clientExcludeReplicaOnFailureEnabled: Boolean =
    get(CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED)
  def clientMrMaxPushData: Long = get(CLIENT_MR_PUSH_DATA_MAX)
  // Controls whether `appUniqueIdWithUUIDSuffix` appends a UUID-based suffix.
  def clientApplicationUUIDSuffixEnabled: Boolean = get(CLIENT_APPLICATION_UUID_SUFFIX_ENABLED)

  /**
   * Builds the application id to use, optionally made unique with a random suffix.
   *
   * @param appId the base application id
   * @return `appId` unchanged when `clientApplicationUUIDSuffixEnabled` is false; otherwise
   *         `appId`, a dash, and a freshly generated random UUID with its dashes removed
   */
  def appUniqueIdWithUUIDSuffix(appId: String): String =
    if (!clientApplicationUUIDSuffixEnabled) {
      appId
    } else {
      // Literal replacement is enough here: the text being removed is a plain "-".
      val uuidSuffix = UUID.randomUUID().toString.replace("-", "")
      appId + "-" + uuidSuffix
    }