uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/SimpleOutboundMessageLimiter.java [227:239]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    TopicPartition topicPartition =
        new TopicPartition(
            processorMessage.getPhysicalMetadata().getTopic(),
            processorMessage.getPhysicalMetadata().getPartition());
    ScopeAndInflight scopeAndInflight = topicPartitionToScopeAndInflight.get(topicPartition);
    if (scopeAndInflight == null) {
      LOGGER.error(
          "a topic partition has not been assigned to this message limiter",
          StructuredLogging.kafkaTopic(topicPartition.topic()),
          StructuredLogging.kafkaPartition(topicPartition.partition()));
      throw new IllegalStateException(
          String.format(
              "topic-partition %s has not been assigned to this message limiter", topicPartition));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/SimpleOutboundMessageLimiter.java [287:299]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    TopicPartition topicPartition =
        new TopicPartition(
            processorMessage.getPhysicalMetadata().getTopic(),
            processorMessage.getPhysicalMetadata().getPartition());
    ScopeAndInflight scopeAndInflight = topicPartitionToScopeAndInflight.get(topicPartition);
    if (scopeAndInflight == null) {
      LOGGER.error(
          "a topic partition has not been assigned to this message limiter",
          StructuredLogging.kafkaTopic(topicPartition.topic()),
          StructuredLogging.kafkaPartition(topicPartition.partition()));
      throw new IllegalStateException(
          String.format(
              "topic-partition %s has not been assigned to this message limiter", topicPartition));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



