public void put()

in kafka-connector/src/main/java/com/google/pubsublite/kafka/sink/PubSubLiteSinkTask.java [55:92]


  public void put(Collection<SinkRecord> collection) {
    if (publisher.state() != State.RUNNING) {
      if (publisher.state() == State.FAILED) {
        throw new IllegalStateException("Publisher has failed.", publisher.failureCause());
      } else {
        throw new IllegalStateException("Publisher not currently running.");
      }
    }
    for (SinkRecord record : collection) {
      Message.Builder message = Message.builder();
      if (record.key() != null) {
        message.setKey(encodeToBytes(record.keySchema(), record.key()));
      }
      if (record.value() != null) {
        message.setData(encodeToBytes(record.valueSchema(), record.value()));
      }
      ImmutableListMultimap.Builder<String, ByteString> attributes = ImmutableListMultimap
          .builder();
      getRecordHeaders(record).forEach(header -> attributes
          .put(header.key(), Schemas.encodeToBytes(header.schema(), header.value())));
      if (record.topic() != null) {
        attributes.put(Constants.KAFKA_TOPIC_HEADER, ByteString.copyFromUtf8(record.topic()));
      }
      if (record.kafkaPartition() != null) {
        attributes.put(Constants.KAFKA_PARTITION_HEADER,
            ByteString.copyFromUtf8(record.kafkaPartition().toString()));
        attributes.put(Constants.KAFKA_OFFSET_HEADER,
            ByteString.copyFromUtf8(Long.toString(record.kafkaOffset())));
      }
      if (record.timestamp() != null) {
        attributes.put(Constants.KAFKA_EVENT_TIME_TYPE_HEADER,
            ByteString.copyFromUtf8(record.timestampType().name));
        message.setEventTime(Timestamps.fromMillis(record.timestamp()));
      }
      message.setAttributes(attributes.build());
      publisher.publish(message.build());
    }
  }