in flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java [100:136]
/**
 * Opens the serialization schema and creates the Pub/Sub {@code Publisher} for the configured
 * project and topic.
 *
 * <p>If {@code hostAndPortForEmulator} is set, the publisher is instead wired to that target
 * over a plaintext gRPC channel, with emulator credentials and reduced retry settings.
 *
 * @param configuration the configuration handed to this function; it is not read here
 * @throws Exception if any of the initialization steps below throws
 */
public void open(Configuration configuration) throws Exception {
    // Give the schema its initialization context, with metrics registered under "user".
    serializationSchema.open(
            RuntimeContextInitializationContextAdapters.serializationAdapter(
                    getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));

    final TopicName topic = TopicName.of(projectName, topicName);
    final Publisher.Builder builder = Publisher.newBuilder(topic);
    builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));

    // A configured emulator host/port means we are running in a testing scenario.
    final boolean useEmulator = hostAndPortForEmulator != null;
    if (useEmulator) {
        // Plaintext is acceptable here because this path is ONLY used for testing.
        managedChannel =
                ManagedChannelBuilder.forTarget(hostAndPortForEmulator).usePlaintext().build();
        channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();

        builder.setChannelProvider(FixedTransportChannelProvider.create(channel));
        // Replaces the credentials provider that was set on the builder above.
        builder.setCredentialsProvider(EmulatorCredentialsProvider.create());

        // In test scenarios the retry settings are limited: the values are based on the
        // default settings, with lower attempts and timeouts.
        final RetrySettings emulatorRetrySettings =
                RetrySettings.newBuilder()
                        .setMaxAttempts(10)
                        .setTotalTimeout(Duration.ofSeconds(10))
                        .setInitialRetryDelay(Duration.ofMillis(100))
                        .setRetryDelayMultiplier(1.3)
                        .setMaxRetryDelay(Duration.ofSeconds(5))
                        .setInitialRpcTimeout(Duration.ofSeconds(5))
                        .setRpcTimeoutMultiplier(1)
                        .setMaxRpcTimeout(Duration.ofSeconds(10))
                        .build();
        builder.setRetrySettings(emulatorRetrySettings);
    }

    publisher = builder.build();
    // Only flagged as running once the publisher has been built successfully.
    isRunning = true;
}