in clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java [157:312]
/**
 * Builds a {@link Supplier} that assembles the {@link RequestManagers} for a consumer.
 *
 * <p>The returned object is a {@code CachedSupplier} whose {@code create()} hook performs the wiring:
 * <ul>
 *   <li>A fetch, a topic-metadata and an offsets request manager are always created.</li>
 *   <li>When {@code groupRebalanceConfig} is non-null and carries a non-null group id, a coordinator and a
 *       commit request manager are created as well, followed by exactly one membership/heartbeat pair:
 *       the streams variants when {@code streamsRebalanceData} is present, the consumer variants otherwise.</li>
 *   <li>In either group flavour the commit request manager and {@code applicationThreadMemberStateListener}
 *       are registered (in that order) as state listeners on the membership manager, and, if a telemetry
 *       reporter is present, its group member ID label is set from the membership manager's member ID.</li>
 *   <li>Without a group id every group-related manager is handed to {@link RequestManagers} as an empty
 *       {@link Optional}, and the offsets request manager receives a {@code null} commit request manager.</li>
 * </ul>
 *
 * @param time                                 passed to the managers created here
 * @param logContext                           passed to the managers created here
 * @param backgroundEventHandler               passed to the membership and heartbeat managers
 * @param metadata                             consumer metadata shared by the fetch, commit, offsets and membership managers
 * @param subscriptions                        subscription state shared by the managers
 * @param fetchBuffer                          buffer handed to the fetch request manager
 * @param config                               consumer configuration; timeouts, backoffs and flags are read from it
 * @param groupRebalanceConfig                 group settings; may be {@code null}, in which case no group managers are created
 * @param apiVersions                          passed to the fetch and offsets request managers
 * @param fetchMetricsManager                  passed to the fetch request manager
 * @param networkClientDelegateSupplier        resolved inside {@code create()} to obtain the network client delegate
 * @param clientTelemetryReporter              if present, receives the group member ID metrics label
 * @param metrics                              passed to the commit, membership and heartbeat managers
 * @param offsetCommitCallbackInvoker          passed to the commit request manager
 * @param applicationThreadMemberStateListener registered as a state listener on the membership manager
 * @param streamsRebalanceData                 if present, selects the streams membership/heartbeat managers
 * @return a supplier producing the wired {@link RequestManagers}
 */
public static Supplier<RequestManagers> supplier(final Time time,
                                                 final LogContext logContext,
                                                 final BackgroundEventHandler backgroundEventHandler,
                                                 final ConsumerMetadata metadata,
                                                 final SubscriptionState subscriptions,
                                                 final FetchBuffer fetchBuffer,
                                                 final ConsumerConfig config,
                                                 final GroupRebalanceConfig groupRebalanceConfig,
                                                 final ApiVersions apiVersions,
                                                 final FetchMetricsManager fetchMetricsManager,
                                                 final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
                                                 final Optional<ClientTelemetryReporter> clientTelemetryReporter,
                                                 final Metrics metrics,
                                                 final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
                                                 final MemberStateListener applicationThreadMemberStateListener,
                                                 final Optional<StreamsRebalanceData> streamsRebalanceData
) {
    return new CachedSupplier<>() {
        @Override
        protected RequestManagers create() {
            // Resolve shared collaborators and read the scalar settings up front.
            final NetworkClientDelegate networkClient = networkClientDelegateSupplier.get();
            final FetchConfig fetchSettings = new FetchConfig(config);
            final long backoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
            final long backoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
            final int requestTimeout = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            final int apiTimeout = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);

            // Managers that exist regardless of group membership.
            final FetchRequestManager fetchManager = new FetchRequestManager(
                logContext, time, metadata, subscriptions, fetchSettings,
                fetchBuffer, fetchMetricsManager, networkClient, apiVersions);
            final TopicMetadataRequestManager topicMetadataManager =
                new TopicMetadataRequestManager(logContext, time, config);

            // Group-related managers remain null unless a group id is configured below.
            CoordinatorRequestManager coordinatorManager = null;
            CommitRequestManager commitManager = null;
            ConsumerMembershipManager consumerMembership = null;
            ConsumerHeartbeatRequestManager consumerHeartbeat = null;
            StreamsMembershipManager streamsMembership = null;
            StreamsGroupHeartbeatRequestManager streamsHeartbeat = null;

            final boolean groupConfigured =
                groupRebalanceConfig != null && groupRebalanceConfig.groupId != null;
            if (groupConfigured) {
                // Read before choosing the streams/consumer flavour, even though only the consumer path uses it.
                final Optional<String> remoteAssignor =
                    Optional.ofNullable(config.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));

                coordinatorManager = new CoordinatorRequestManager(
                    logContext, backoffMs, backoffMaxMs, groupRebalanceConfig.groupId);
                commitManager = new CommitRequestManager(
                    time, logContext, subscriptions, config, coordinatorManager,
                    offsetCommitCallbackInvoker,
                    groupRebalanceConfig.groupId, groupRebalanceConfig.groupInstanceId,
                    metrics, metadata);

                if (streamsRebalanceData.isPresent()) {
                    final StreamsRebalanceData rebalanceData = streamsRebalanceData.get();
                    final StreamsMembershipManager streamsMember = new StreamsMembershipManager(
                        groupRebalanceConfig.groupId, rebalanceData, subscriptions,
                        backgroundEventHandler, logContext, time, metrics);

                    // Listener order matters to callers: commit manager first, then the application thread listener.
                    streamsMember.registerStateListener(commitManager);
                    streamsMember.registerStateListener(applicationThreadMemberStateListener);

                    // Publish the member ID as a telemetry label when a reporter is available.
                    clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(
                        Map.of(ClientTelemetryProvider.GROUP_MEMBER_ID, streamsMember.memberId())));

                    streamsMembership = streamsMember;
                    streamsHeartbeat = new StreamsGroupHeartbeatRequestManager(
                        logContext, time, config, coordinatorManager, streamsMember,
                        backgroundEventHandler, metrics, rebalanceData);
                } else {
                    final boolean autoCommitEnabled = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
                    final ConsumerMembershipManager consumerMember = new ConsumerMembershipManager(
                        groupRebalanceConfig.groupId,
                        groupRebalanceConfig.groupInstanceId,
                        groupRebalanceConfig.rebalanceTimeoutMs,
                        remoteAssignor, subscriptions, commitManager, metadata,
                        logContext, backgroundEventHandler, time, metrics,
                        autoCommitEnabled);

                    // Per KIP-1082 the consumer generates its member ID as the incarnation ID of the process,
                    // so the group member ID telemetry label can already be set during initialization.
                    clientTelemetryReporter.ifPresent(reporter -> reporter.updateMetricsLabels(
                        Map.of(ClientTelemetryProvider.GROUP_MEMBER_ID, consumerMember.memberId())));

                    // Listener order: commit manager first, then the application thread listener.
                    consumerMember.registerStateListener(commitManager);
                    consumerMember.registerStateListener(applicationThreadMemberStateListener);

                    consumerMembership = consumerMember;
                    consumerHeartbeat = new ConsumerHeartbeatRequestManager(
                        logContext, time, config, coordinatorManager, subscriptions,
                        consumerMember, backgroundEventHandler, metrics);
                }
            }

            // commitManager is null here when no group id is configured.
            final OffsetsRequestManager offsetsManager = new OffsetsRequestManager(
                subscriptions, metadata, fetchSettings.isolationLevel, time,
                backoffMs, requestTimeout, apiTimeout,
                apiVersions, networkClient, commitManager, logContext);

            return new RequestManagers(
                logContext,
                offsetsManager,
                topicMetadataManager,
                fetchManager,
                Optional.ofNullable(coordinatorManager),
                Optional.ofNullable(commitManager),
                Optional.ofNullable(consumerHeartbeat),
                Optional.ofNullable(consumerMembership),
                Optional.ofNullable(streamsHeartbeat),
                Optional.ofNullable(streamsMembership)
            );
        }
    };
}