private void appendHeaders()

in flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java [90:134]


  /**
   * Writes the event's headers into the JSON document under construction.
   *
   * <p>Selected headers are first promoted to top-level "@"-prefixed fields,
   * but only when the header is non-blank and the event does not already
   * carry a non-blank header with the "@"-prefixed name:
   * <ul>
   *   <li>{@code timestamp} -&gt; {@code @timestamp} (parsed as a long and
   *       written as a {@link Date})</li>
   *   <li>{@code source} -&gt; {@code @source}</li>
   *   <li>{@code type} -&gt; {@code @type}</li>
   *   <li>{@code host} -&gt; {@code @source_host}</li>
   *   <li>{@code src_path} -&gt; {@code @source_path}</li>
   * </ul>
   *
   * <p>Afterwards every header with a non-null value is written, under its
   * original name, inside an {@code @fields} object. Headers whose value is
   * {@code null} are skipped because there are no bytes to write for them.
   *
   * @param builder the content builder the fields are appended to
   * @param event the event whose headers are serialized
   * @throws IOException propagated from the underlying builder calls
   * @throws NumberFormatException if the {@code timestamp} header is
   *         non-blank but is not a parseable long
   */
  private void appendHeaders(XContentBuilder builder, Event event)
      throws IOException {
    // Work on a copy so the event's own header map is never touched.
    Map<String, String> headers = Maps.newHashMap(event.getHeaders());

    String timestamp = headers.get("timestamp");
    if (!StringUtils.isBlank(timestamp)
        && StringUtils.isBlank(headers.get("@timestamp"))) {
      // java.util.Date(long) interprets the value as epoch milliseconds.
      long timestampMs = Long.parseLong(timestamp);
      builder.field("@timestamp", new Date(timestampMs));
    }

    appendPromotedHeader(builder, headers, "source", "@source");
    appendPromotedHeader(builder, headers, "type", "@type");
    appendPromotedHeader(builder, headers, "host", "@source_host");
    appendPromotedHeader(builder, headers, "src_path", "@source_path");

    builder.startObject("@fields");
    for (Map.Entry<String, String> header : headers.entrySet()) {
      String value = header.getValue();
      if (value == null) {
        // A null value would otherwise fail on getBytes() and abort the
        // serialization of the whole event.
        continue;
      }
      ContentBuilderUtil.appendField(builder, header.getKey(),
          value.getBytes(charset));
    }
    builder.endObject();
  }

  /**
   * Appends {@code headers.get(headerName)} as the field {@code fieldName}
   * when that header is non-blank and no non-blank header named
   * {@code fieldName} is already present.
   */
  private void appendPromotedHeader(XContentBuilder builder,
      Map<String, String> headers, String headerName, String fieldName)
      throws IOException {
    String value = headers.get(headerName);
    if (!StringUtils.isBlank(value)
        && StringUtils.isBlank(headers.get(fieldName))) {
      ContentBuilderUtil.appendField(builder, fieldName,
          value.getBytes(charset));
    }
  }