public List<SourceRecord> poll()

in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java [185:244]


  /**
   * Pulls the next batch of messages from the subscriber and converts each one into a
   * {@link SourceRecord} destined for {@code kafkaTopic}.
   *
   * <p>Per message:
   *
   * <ul>
   *   <li>The record key is the message's ordering key when {@code kafkaMessageKeyAttribute}
   *       equals {@code ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE}; otherwise it is the value of
   *       the message attribute named by {@code kafkaMessageKeyAttribute} (possibly {@code null}).
   *   <li>The record timestamp comes from the attribute named by
   *       {@code kafkaMessageTimestampAttribute} when {@code getLongValue} yields a value, and
   *       falls back to the message's publish time in milliseconds otherwise.
   *   <li>If the message carries attributes outside {@code standardAttributes}, or has a non-empty
   *       ordering key while {@code makeOrderingKeyAttribute} is set, the record is built by
   *       {@code createRecordWithHeaders} or {@code createRecordWithStruct} (chosen by
   *       {@code useKafkaHeaders}). Otherwise a plain record with an optional-string key and a
   *       bytes value is created.
   * </ul>
   *
   * <p>Any failure other than an interruption is logged and reported as an empty poll.
   *
   * @return the converted records in the order the messages were received; an empty list if the
   *     pull or the conversion failed
   * @throws InterruptedException if the thread is interrupted while waiting for the pull to
   *     complete
   */
  public List<SourceRecord> poll() throws InterruptedException {
    log.debug("Polling...");
    try {
      List<ReceivedMessage> response = subscriber.pull().get();
      List<SourceRecord> sourceRecords = new ArrayList<>(response.size());
      log.trace("Received " + response.size() + " messages");
      for (ReceivedMessage rm : response) {
        PubsubMessage message = rm.getMessage();
        String ackId = rm.getAckId();
        Map<String, String> messageAttributes = message.getAttributesMap();
        String orderingKey = message.getOrderingKey();

        // The key comes either from the ordering key or from a configured message attribute.
        String key;
        if (ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE.equals(kafkaMessageKeyAttribute)) {
          key = orderingKey;
        } else {
          key = messageAttributes.get(kafkaMessageKeyAttribute);
        }

        // Prefer the timestamp carried in the configured attribute; otherwise use publish time.
        Long timestamp = getLongValue(messageAttributes.get(kafkaMessageTimestampAttribute));
        if (timestamp == null) {
          timestamp = Timestamps.toMillis(message.getPublishTime());
        }

        ByteString messageData = message.getData();
        byte[] messageBytes = messageData.toByteArray();

        // "Custom" means there is something beyond the standard attributes that has to travel
        // with the record: an extra attribute, or an ordering key that must be surfaced.
        boolean hasCustomAttributes =
            !standardAttributes.containsAll(messageAttributes.keySet())
                || (makeOrderingKeyAttribute && orderingKey != null && !orderingKey.isEmpty());

        // Attach the ack ID, keyed by subscription, to the record.
        Map<String, String> ack = Collections.singletonMap(cpsSubscription.toString(), ackId);

        final SourceRecord record;
        if (hasCustomAttributes) {
          if (useKafkaHeaders) {
            record =
                createRecordWithHeaders(
                    messageAttributes, ack, key, orderingKey, messageBytes, timestamp);
          } else {
            record =
                createRecordWithStruct(
                    messageAttributes, ack, key, orderingKey, messageBytes, timestamp);
          }
        } else {
          record =
              new SourceRecord(
                  null,
                  ack,
                  kafkaTopic,
                  selectPartition(key, messageBytes, orderingKey),
                  Schema.OPTIONAL_STRING_SCHEMA,
                  key,
                  Schema.BYTES_SCHEMA,
                  messageBytes,
                  timestamp);
        }
        sourceRecords.add(record);
      }
      return sourceRecords;
    } catch (InterruptedException e) {
      // An interruption is a request to stop, not a failed pull. Propagate it as the signature
      // promises instead of hiding it behind an empty poll.
      throw e;
    } catch (Exception e) {
      log.info("Error while retrieving records, treating as an empty poll. " + e);
      return new ArrayList<>();
    }
  }