private void makeSubscriber()

in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubGRPCSubscriber.java [87:110]


  /**
   * Replaces the current subscriber stub with a newly created one and schedules the next reset.
   *
   * <p>If a stub already exists it is closed before the replacement is built. The replacement is
   * configured with a gRPC transport whose maximum inbound message size is 20MB, plus the
   * credentials provider and endpoint held by this object. Afterwards {@code
   * nextSubscriberResetTime} is set to a point 25 to 35 minutes from now.
   *
   * @throws RuntimeException wrapping the {@link IOException} raised while building the settings
   *     or creating the stub
   */
  private void makeSubscriber() {
    try {
      if (subscriber != null) {
        subscriber.close();
      }
      log.info("Creating subscriber.");

      SubscriberStubSettings.Builder settingsBuilder = SubscriberStubSettings.newBuilder();
      settingsBuilder.setTransportChannelProvider(
          SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
              .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB
              .build());
      settingsBuilder.setCredentialsProvider(gcpCredentialsProvider);
      settingsBuilder.setEndpoint(endpoint);
      subscriber = GrpcSubscriberStub.create(settingsBuilder.build());

      // The subscriber is swapped out every 25 - 35 minutes in order to avoid GOAWAY errors:
      // a fixed 25 minute floor plus up to 10 minutes of random jitter.
      long nowMillis = System.currentTimeMillis();
      int jitterMillis = ThreadLocalRandom.current().nextInt(10 * 60 * 1000);
      nextSubscriberResetTime = nowMillis + jitterMillis + 25 * 60 * 1000;
    } catch (IOException e) {
      throw new RuntimeException("Could not create subscriber stub; no subscribing can occur.", e);
    }
  }