private SourceRecord createRecordWithHeaders()

in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java [246:275]


  /**
   * Builds a {@link SourceRecord} whose headers carry the given message attributes.
   *
   * <p>Every entry of {@code messageAttributes} is added as a string header, except the entry
   * whose name equals {@code kafkaMessageKeyAttribute}. When {@code makeOrderingKeyAttribute} is
   * set and {@code orderingKey} is neither {@code null} nor empty, the ordering key is added as an
   * extra header named {@code ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE}.
   *
   * @param messageAttributes attributes to copy into the record headers
   * @param ack map passed to the record as its second constructor argument
   * @param key record key, written with an optional-string schema; also passed to
   *     {@code selectPartition}
   * @param orderingKey ordering key; may be {@code null} or empty, in which case no ordering-key
   *     header is added
   * @param messageBytes record value, written with a bytes schema
   * @param timestamp timestamp passed through to the record
   * @return the record targeting {@code kafkaTopic} on the partition chosen by
   *     {@code selectPartition}
   */
  private SourceRecord createRecordWithHeaders(
      Map<String, String> messageAttributes,
      Map<String, String> ack,
      String key,
      String orderingKey,
      byte[] messageBytes,
      Long timestamp) {
    ConnectHeaders recordHeaders = new ConnectHeaders();
    for (Map.Entry<String, String> entry : messageAttributes.entrySet()) {
      String attributeName = entry.getKey();
      if (attributeName.equals(kafkaMessageKeyAttribute)) {
        // The attribute named by kafkaMessageKeyAttribute is not copied into the headers.
        continue;
      }
      recordHeaders.addString(attributeName, entry.getValue());
    }

    boolean hasOrderingKey = orderingKey != null && !orderingKey.isEmpty();
    if (makeOrderingKeyAttribute && hasOrderingKey) {
      recordHeaders.addString(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE, orderingKey);
    }

    return new SourceRecord(
        null,
        ack,
        kafkaTopic,
        selectPartition(key, messageBytes, orderingKey),
        Schema.OPTIONAL_STRING_SCHEMA,
        key,
        Schema.BYTES_SCHEMA,
        messageBytes,
        timestamp,
        recordHeaders);
  }