public void put()

in kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java [168:215]


  /**
   * Converts each Kafka {@link SinkRecord} into a {@link PubsubMessage} and passes it to
   * {@code publishMessage} together with the record's topic and partition.
   *
   * <p>Per record, the message attributes are collected from:
   *
   * <ul>
   *   <li>{@code handleValue}, which receives the attribute map along with the value schema and
   *       value and returns the message payload (possibly {@code null});
   *   <li>the record key, when it is non-null;
   *   <li>the Kafka topic, partition, offset and (when non-null) timestamp, if
   *       {@code includeMetadata} is set;
   *   <li>the headers returned by {@code getRecordHeaders}, if {@code includeHeaders} is set.
   *       Headers whose value is {@code null} are omitted.
   * </ul>
   *
   * <p>A record that produces neither a payload nor any attribute is not published; an
   * already-completed future is registered for it through {@code addPendingMessageFuture}
   * instead.
   *
   * @param sinkRecords the records to convert and publish
   */
  public void put(Collection<SinkRecord> sinkRecords) {
    log.debug("Received " + sinkRecords.size() + " messages to send to CPS.");
    for (SinkRecord record : sinkRecords) {
      // Guarded so record.toString() is not evaluated per record when trace logging is off.
      if (log.isTraceEnabled()) {
        log.trace("Received record: " + record.toString());
      }
      Map<String, String> attributes = new HashMap<>();
      ByteString value = handleValue(record.valueSchema(), record.value(), attributes);
      String key = null;
      String partition = record.kafkaPartition().toString();
      if (record.key() != null) {
        key = record.key().toString();
        attributes.put(ConnectorUtils.CPS_MESSAGE_KEY_ATTRIBUTE, key);
      }
      if (includeMetadata) {
        attributes.put(ConnectorUtils.KAFKA_TOPIC_ATTRIBUTE, record.topic());
        attributes.put(ConnectorUtils.KAFKA_PARTITION_ATTRIBUTE, partition);
        attributes.put(ConnectorUtils.KAFKA_OFFSET_ATTRIBUTE, Long.toString(record.kafkaOffset()));
        if (record.timestamp() != null) {
          attributes.put(ConnectorUtils.KAFKA_TIMESTAMP_ATTRIBUTE, record.timestamp().toString());
        }
      }
      if (includeHeaders) {
        for (Header header : getRecordHeaders(record)) {
          // A Connect header value may be null; calling toString() on it would abort the whole
          // batch with a NullPointerException. Omit such headers, mirroring how a null key or
          // null timestamp is omitted above.
          Object headerValue = header.value();
          if (headerValue != null) {
            attributes.put(header.key(), headerValue.toString());
          }
        }
      }
      if (attributes.isEmpty() && value == null) {
        log.warn("Message received with no value and no attributes. Not publishing message");
        // Nothing to publish: register a future that is already complete for this record's
        // topic and partition in place of a real publish future.
        SettableApiFuture<String> nullMessageFuture = SettableApiFuture.<String>create();
        nullMessageFuture.set("No message");
        addPendingMessageFuture(record.topic(), record.kafkaPartition(), nullMessageFuture);
        continue;
      }
      PubsubMessage.Builder builder = PubsubMessage.newBuilder();
      builder.putAllAttributes(attributes);
      if (value != null) {
        builder.setData(value);
      }
      // Ordering key: the record key (only when non-empty) or the partition number, depending on
      // the configured source; for any other source no ordering key is set.
      if (orderingKeySource == OrderingKeySource.KEY && key != null && !key.isEmpty()) {
        builder.setOrderingKey(key);
      } else if (orderingKeySource == OrderingKeySource.PARTITION) {
        builder.setOrderingKey(partition);
      }

      PubsubMessage message = builder.build();
      publishMessage(record.topic(), record.kafkaPartition(), message);
    }
  }