private SourceRecord createRecordWithStruct()

in kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java [277:322]


  /**
   * Builds a {@link SourceRecord} whose value is a {@link Struct} made of the message body plus
   * one string field per message attribute.
   *
   * <p>The attribute whose name equals {@code kafkaMessageKeyAttribute} is left out of the value.
   * When {@code makeOrderingKeyAttribute} is set and {@code orderingKey} is non-empty, the
   * ordering key is added as an extra string field named
   * {@link ConnectorUtils#CPS_ORDERING_KEY_ATTRIBUTE}.
   *
   * @param messageAttributes attributes of the message; each one (except the key attribute)
   *     becomes a string field of the value struct
   * @param ack passed through unchanged as the second argument of the {@link SourceRecord}
   *     constructor
   * @param key record key, stored with {@link Schema#OPTIONAL_STRING_SCHEMA}
   * @param orderingKey ordering key of the message; may be {@code null} or empty, in which case
   *     no ordering-key field is added
   * @param messageBytes message body, stored in the
   *     {@link ConnectorUtils#KAFKA_MESSAGE_CPS_BODY_FIELD} field
   * @param timestamp timestamp handed to the {@link SourceRecord}
   * @return the record targeting {@code kafkaTopic} and the partition chosen by
   *     {@code selectPartition}
   */
  private SourceRecord createRecordWithStruct(
      Map<String, String> messageAttributes,
      Map<String, String> ack,
      String key,
      String orderingKey,
      byte[] messageBytes,
      Long timestamp) {
    // Decide once whether the ordering key gets its own field, so that the schema-building step
    // and the value-population step below cannot disagree.
    boolean includeOrderingKey =
        makeOrderingKeyAttribute && orderingKey != null && !orderingKey.isEmpty();

    SchemaBuilder valueSchemaBuilder =
        SchemaBuilder.struct()
            .field(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD, Schema.BYTES_SCHEMA);

    // NOTE(review): an attribute named like the body field, or like the ordering-key field while
    // the ordering key is included, is registered twice on the builder. SchemaBuilder is expected
    // to reject the duplicate name - confirm whether such messages need to be tolerated.
    for (Entry<String, String> attribute :
        messageAttributes.entrySet()) {
      if (!attribute.getKey().equals(kafkaMessageKeyAttribute)) {
        valueSchemaBuilder.field(attribute.getKey(),
            Schema.STRING_SCHEMA);
      }
    }
    if (includeOrderingKey) {
      valueSchemaBuilder.field(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE, Schema.STRING_SCHEMA);
    }

    Schema valueSchema = valueSchemaBuilder.build();
    Struct value =
        new Struct(valueSchema)
            .put(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD,
                messageBytes);
    for (Field field : valueSchema.fields()) {
      String fieldName = field.name();
      if (fieldName.equals(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD)) {
        // The body was already stored when the struct was created.
        continue;
      }
      if (includeOrderingKey && fieldName.equals(ConnectorUtils.CPS_ORDERING_KEY_ATTRIBUTE)) {
        value.put(fieldName, orderingKey);
      } else {
        // Without the includeOrderingKey guard above, a plain attribute that merely shares the
        // ordering-key field name would be overwritten with the (possibly null or empty)
        // ordering key instead of keeping its own value.
        value.put(fieldName, messageAttributes.get(fieldName));
      }
    }
    return new SourceRecord(
        null,
        ack,
        kafkaTopic,
        selectPartition(key, value, orderingKey),
        Schema.OPTIONAL_STRING_SCHEMA,
        key,
        valueSchema,
        value,
        timestamp);
  }