public void open()

in flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java [100:136]


    /**
     * Opens the serialization schema and creates the Pub/Sub {@link Publisher} used by this sink.
     *
     * <p>If {@code hostAndPortForEmulator} is set, the publisher is pointed at that target over a
     * plaintext gRPC channel, switched to emulator credentials, and given reduced retry settings.
     * Once the publisher is built, {@code isRunning} is set to {@code true}.
     *
     * @param configuration not read by this method
     * @throws Exception if opening the serialization schema or building the publisher fails
     */
    public void open(Configuration configuration) throws Exception {
        serializationSchema.open(
                RuntimeContextInitializationContextAdapters.serializationAdapter(
                        getRuntimeContext(), userGroup -> userGroup.addGroup("user")));

        Publisher.Builder publisherBuilder =
                Publisher.newBuilder(TopicName.of(projectName, topicName));
        publisherBuilder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));

        // An emulator target is only configured in testing scenarios.
        final boolean emulatorConfigured = hostAndPortForEmulator != null;
        if (emulatorConfigured) {
            // Plaintext transport is acceptable here solely because this path is test-only.
            managedChannel =
                    ManagedChannelBuilder.forTarget(hostAndPortForEmulator).usePlaintext().build();
            channel = GrpcTransportChannel.newBuilder().setManagedChannel(managedChannel).build();

            publisherBuilder.setChannelProvider(FixedTransportChannelProvider.create(channel));
            // Replaces the fixed credentials configured above.
            publisherBuilder.setCredentialsProvider(EmulatorCredentialsProvider.create());

            // Retry settings for tests: modelled on the defaults, but with fewer attempts and
            // shorter timeouts.
            RetrySettings emulatorRetrySettings =
                    RetrySettings.newBuilder()
                            .setMaxAttempts(10)
                            .setTotalTimeout(Duration.ofSeconds(10))
                            .setInitialRetryDelay(Duration.ofMillis(100))
                            .setRetryDelayMultiplier(1.3)
                            .setMaxRetryDelay(Duration.ofSeconds(5))
                            .setInitialRpcTimeout(Duration.ofSeconds(5))
                            .setRpcTimeoutMultiplier(1.0)
                            .setMaxRpcTimeout(Duration.ofSeconds(10))
                            .build();
            publisherBuilder.setRetrySettings(emulatorRetrySettings);
        }

        publisher = publisherBuilder.build();
        isRunning = true;
    }