in flume-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchLogStashEventSerializer.java [90:134]
/**
 * Writes the event's headers into the document being built.
 *
 * <p>Selected headers are first promoted to top-level "@"-prefixed fields:
 * {@code timestamp -> @timestamp} (written as a {@link Date}),
 * {@code source -> @source}, {@code type -> @type},
 * {@code host -> @source_host} and {@code src_path -> @source_path}.
 * A promotion only happens when the plain header is non-blank and the event
 * does not already carry a non-blank header with the "@"-prefixed name.
 *
 * <p>Afterwards every header (promoted or not) is written into a nested
 * {@code @fields} object. Headers whose value is {@code null} are skipped
 * there, because they have no bytes to encode.
 *
 * @param builder the content builder the fields are appended to
 * @param event the event whose headers are serialized
 * @throws IOException if writing to the builder fails
 * @throws NumberFormatException if the {@code timestamp} header is non-blank
 *         but cannot be parsed as a {@code long}
 */
private void appendHeaders(XContentBuilder builder, Event event)
    throws IOException {
  // Work on a private copy of the header map.
  Map<String, String> headers = Maps.newHashMap(event.getHeaders());

  // The timestamp is the only promoted header written as a Date instead of
  // raw bytes; the parsed long is handed to Date(long), i.e. epoch millis.
  String timestamp = headers.get("timestamp");
  if (!StringUtils.isBlank(timestamp)
      && StringUtils.isBlank(headers.get("@timestamp"))) {
    long timestampMs = Long.parseLong(timestamp);
    builder.field("@timestamp", new Date(timestampMs));
  }

  appendAliasedHeader(builder, headers, "source", "@source");
  appendAliasedHeader(builder, headers, "type", "@type");
  appendAliasedHeader(builder, headers, "host", "@source_host");
  appendAliasedHeader(builder, headers, "src_path", "@source_path");

  builder.startObject("@fields");
  for (Map.Entry<String, String> header : headers.entrySet()) {
    String value = header.getValue();
    if (value == null) {
      // A null value cannot be encoded; skip it rather than fail the
      // whole event with a NullPointerException.
      continue;
    }
    ContentBuilderUtil.appendField(builder, header.getKey(),
        value.getBytes(charset));
  }
  builder.endObject();
}

/**
 * Copies the header {@code headerName} to the top-level field
 * {@code fieldName}, unless the header is blank or the event already has a
 * non-blank header named {@code fieldName}.
 */
private void appendAliasedHeader(XContentBuilder builder,
    Map<String, String> headers, String headerName, String fieldName)
    throws IOException {
  String value = headers.get(headerName);
  if (!StringUtils.isBlank(value)
      && StringUtils.isBlank(headers.get(fieldName))) {
    ContentBuilderUtil.appendField(builder, fieldName,
        value.getBytes(charset));
  }
}