public static long getTimeout()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/grpc/GrpcUtils.java [28:78]


  /**
   * Computes the rpc timeout, in milliseconds, to use for a job.
   *
   * <p>The job's configured rpc timeout is first clamped into the range given by {@code
   * configuration.getMinRpcTimeoutMs()} and {@code configuration.getMaxRpcTimeoutMs()}. A positive
   * clamped value is then doubled once per unit of {@code timeoutCount}, never exceeding the
   * configured maximum. A non-positive clamped value is returned unchanged because doubling it
   * cannot lengthen the timeout.
   *
   * <p>When {@code timeoutCount > 0}, the resulting timeout and the count are logged and reported
   * as gauges on {@code scope}, tagged with the job's routing key, cluster, group, topic and
   * partition.
   *
   * @param job the job supplying the configured rpc timeout and the identifiers used for logging
   *     and metric tags
   * @param configuration supplies the minimum and maximum allowed rpc timeout
   * @param timeoutCount the number of doublings to apply (presumably the number of timeouts
   *     observed so far; confirm against the caller)
   * @param scope the metrics scope the gauges are emitted on
   * @return the adjusted rpc timeout in milliseconds
   */
  public static long getTimeout(
      Job job, GrpcDispatcherConfiguration configuration, long timeoutCount, Scope scope) {
    final long minRpcTimeoutMs = configuration.getMinRpcTimeoutMs();
    final long maxRpcTimeoutMs = configuration.getMaxRpcTimeoutMs();

    long rpcTimeoutMs = job.getRpcDispatcherTask().getRpcTimeoutMs();
    if (rpcTimeoutMs < minRpcTimeoutMs) {
      rpcTimeoutMs = minRpcTimeoutMs;
    } else if (rpcTimeoutMs > maxRpcTimeoutMs) {
      rpcTimeoutMs = maxRpcTimeoutMs;
    }

    // Adjust the rpc timeout based on timeoutCount: double it once per count, capped at the max.
    // Requiring a positive value keeps the loop bounded (a positive long reaches any cap within
    // 63 iterations) and avoids spinning timeoutCount times on a zero timeout.
    long remainingDoublings = timeoutCount;
    while (rpcTimeoutMs > 0 && rpcTimeoutMs < maxRpcTimeoutMs && remainingDoublings > 0) {
      // Equivalent to "2 * rpcTimeoutMs >= max", but checked before doubling so that the shift
      // below can never overflow into a negative value.
      if (rpcTimeoutMs >= maxRpcTimeoutMs - rpcTimeoutMs) {
        rpcTimeoutMs = maxRpcTimeoutMs;
        break;
      }
      rpcTimeoutMs <<= 1;
      remainingDoublings--;
    }

    if (timeoutCount > 0) {
      String cluster = job.getKafkaConsumerTask().getCluster();
      String group = job.getKafkaConsumerTask().getConsumerGroup();
      String topic = job.getKafkaConsumerTask().getTopic();
      int partition = job.getKafkaConsumerTask().getPartition();
      String routingKey = RoutingUtils.extractAddress(job.getRpcDispatcherTask().getUri());
      LOGGER.info(
          "using adjusted rpc timeout",
          StructuredLogging.rpcRoutingKey(routingKey),
          StructuredLogging.kafkaCluster(cluster),
          StructuredLogging.kafkaGroup(group),
          StructuredLogging.kafkaTopic(topic),
          StructuredLogging.kafkaPartition(partition),
          StructuredArguments.keyValue("adjusted_rpc_timeout", rpcTimeoutMs),
          StructuredArguments.keyValue("timeout_count", timeoutCount));
      Scope taggedScope =
          scope.tagged(
              ImmutableMap.of(
                  StructuredFields.URI,
                  routingKey,
                  StructuredFields.KAFKA_GROUP,
                  group,
                  StructuredFields.KAFKA_CLUSTER,
                  cluster,
                  StructuredFields.KAFKA_TOPIC,
                  topic,
                  StructuredFields.KAFKA_PARTITION,
                  Integer.toString(partition)));
      taggedScope.gauge(MetricNames.TIMEOUT_COUNT).update(timeoutCount);
      taggedScope.gauge(MetricNames.ADJUSTED_RPC_TIMEOUT).update(rpcTimeoutMs);
    }
    return rpcTimeoutMs;
  }