# packages/logstash/data_stream/plugins/agent/stream/cel.yml.hbs
config_version: "2"
# Polling interval for the CEL input (templated from the policy, e.g. 10s).
interval: {{period}}
# Base Logstash node API endpoint; the program below appends /stats and
# /pipelines/<name> to it.
resource.url: "{{url}}/_node"
{{#if resource_ssl}}
# Optional SSL settings block, emitted verbatim from the policy.
resource.ssl:
  {{resource_ssl}}
{{/if}}
{{#if username}}
auth.basic.user: {{escape_string username}}
{{/if}}
{{#if password}}
auth.basic.password: {{escape_string password}}
{{/if}}
{{#if condition}}
condition: {{ condition }}
{{/if}}
# No fields are redacted when the input logs its evaluated state.
redact:
  fields: ~
program: |
  // Poll the Logstash node stats API and emit one document per pipeline
  // plugin (input, codec, filter, output), enriched with the plugin's config
  // source text, flow metrics, event counts and named long counters.
  get(state.url + "/stats?graph=true&vertices=true").as(resp, bytes(resp.Body).decode_json().as(body,
    // One intermediate record per pipeline, skipping the internal
    // monitoring pipeline.
    body.pipelines.map(pipeline_name, pipeline_name != ".monitoring-logstash", body.pipelines[pipeline_name].with({
      "name":pipeline_name,
      // plugin_id -> config source text, taken from the pipeline graph vertices.
      "pipeline_source_map":
        get(state.url + "/pipelines/" + pipeline_name + "?graph=true&vertices=true").as(resp,
          bytes(resp.Body).decode_json().as(pipes,
            // FIX: the /_node/pipelines/<name> response nests data under
            // "pipelines" (plural), as every reference below shows. The old
            // has(pipes.pipeline) test never matched, so this map was always
            // [] and every plugin "source" rendered as "".
            has(pipes.pipelines) ?
              // NOTE: this inner pipeline_name shadows the outer comprehension
              // variable; only the inner one is referenced in this branch.
              pipes.pipelines.map(pipeline_name,
                has(pipes.pipelines) && has(pipes.pipelines[pipeline_name].graph) && pipes.pipelines != null && pipes.pipelines[pipeline_name].graph.graph.vertices != null,
                pipes.pipelines[pipeline_name].graph.graph.vertices.map(vertex, vertex.type == "plugin", {
                  "plugin_id": vertex.id,
                  "source": vertex.meta.source,
                })
              ).drop("graph").flatten()
            :
              []
          )
        ),
      // Cluster UUIDs of any Elasticsearch-connected vertices in this pipeline.
      "es_cluster_id": has(body.pipelines[pipeline_name].vertices) ?
        body.pipelines[pipeline_name].vertices.map(vertex, has(vertex.cluster_uuid), vertex.cluster_uuid)
      :
        [],
      // Same cluster UUIDs, keyed back to the owning plugin id.
      "es_cluster_id_map": has(body.pipelines[pipeline_name].vertices) ?
        body.pipelines[pipeline_name].vertices.map(vertex, has(vertex.cluster_uuid), {
          "plugin_id": vertex.id,
          "cluster_id": vertex.cluster_uuid,
        })
      :
        [],
      // Named long counters, keyed back to the owning plugin id.
      "counter_map": has(body.pipelines[pipeline_name].vertices) ?
        body.pipelines[pipeline_name].vertices.map(vertex, has(vertex.long_counters), vertex.long_counters.map(counter, {
          "plugin_id": vertex.id,
          "name": counter.name,
          "value": counter.value
        }))
      :
        [],
      "outputs": body.pipelines[pipeline_name].plugins.outputs,
      "inputs": body.pipelines[pipeline_name].plugins.inputs,
      "filters": body.pipelines[pipeline_name].plugins.filters,
      "codecs": body.pipelines[pipeline_name].plugins.codecs,
      "host":{
        "name": body.name,
        "address": body.http_address,
      }
    })))).as(events, events.map(event, {
    // Expand each pipeline record into one document per plugin instance.
    "inputs": event.inputs.map(input, has(event.hash), {
      "name": event.name,
      "id": event.hash,
      "host": event.host,
      "elasticsearch.cluster.id": event.es_cluster_id,
      "plugin": {
        "type": "input",
        "input": {
          "source":event.pipeline_source_map.map(tuple, (tuple.plugin_id == input.id), tuple.source).flatten().as(source, (source.size() != 0) ? source[0] : ""),
          "elasticsearch.cluster.id": event.es_cluster_id_map.map(tuple, tuple.plugin_id == input.id, tuple.cluster_id),
          "metrics": {
            // zip counter names with values for this plugin only.
            input.name: event.counter_map.flatten().filter(tuple, tuple.plugin_id == input.id).as(counter_map, zip(
              counter_map.map(tuple, tuple.name),
              counter_map.map(tuple, tuple.value)
            ))
          },
          "name": input.name,
          "id": input.id,
          "flow": has(input.flow) ?
            input.flow
          :
            {},
          "events": {
            "out": input.events.out,
          },
          "time": {
            "queue_push_duration": {
              "ms": input.events.queue_push_duration_in_millis
            }
          }
        }
      }
    }.drop_empty()),
    "codecs": event.codecs.map(codec, has(event.hash), {
      "name": event.name,
      "id": event.hash,
      "host": event.host,
      "elasticsearch.cluster.id": event.es_cluster_id,
      "plugin": {
        "type": "codec",
        "codec": {
          "id":codec.id,
          "name":codec.name,
          "flow": has(codec.flow) ? codec.flow : {},
          "decode": has(codec.decode) ?
            {
              "duration":{
                "ms":codec.decode.duration_in_millis
              },
              "in":codec.decode.writes_in,
              "out":codec.decode.out,
            } :
            {},
          "encode": has(codec.encode) ?
            {
              "in":codec.encode.writes_in,
              "duration":{
                "ms":codec.encode.duration_in_millis
              }
            } :
            {}
        }
      }
    }.drop_empty()),
    "filters": event.filters.map(filter, has(event.hash), {
      "name": event.name,
      "id": event.hash,
      "host": event.host,
      "elasticsearch.cluster.id": event.es_cluster_id,
      "plugin": {
        "type": "filter",
        "filter": {
          "source":event.pipeline_source_map.map(tuple, (tuple.plugin_id == filter.id), tuple.source).flatten().as(source, (source.size() != 0) ? source[0] : ""),
          "id": filter.id,
          "name": filter.name,
          "elasticsearch.cluster.id": event.es_cluster_id_map.map(tuple, tuple.plugin_id == filter.id, tuple.cluster_id),
          "metrics": {
            filter.name: event.counter_map.flatten().filter(tuple, tuple.plugin_id == filter.id).as(counter_map, zip(
              counter_map.map(tuple, tuple.name),
              counter_map.map(tuple, tuple.value)
            ))
          },
          "flow": has(filter.flow) ?
            filter.flow
          :
            {},
          "events": {
            // 'in' is a CEL reserved word, so index notation is required.
            "in": filter.events['in'],
            "out": filter.events.out,
          },
          "time": {
            "duration": {
              "ms": filter.events.duration_in_millis
            }
          }
        }
      }
    }.drop_empty()),
    "outputs": event.outputs.map(output, has(event.hash), {
      "name": event.name,
      "id": event.hash,
      "host": event.host,
      "elasticsearch.cluster.id": event.es_cluster_id,
      "plugin": {
        "type": "output",
        "output": {
          "id": output.id,
          "name": output.name,
          "source":event.pipeline_source_map.map(tuple, (tuple.plugin_id == output.id), tuple.source).flatten().as(source, (source.size() != 0) ? source[0] : ""),
          "elasticsearch.cluster.id": event.es_cluster_id_map.map(tuple, tuple.plugin_id == output.id, tuple.cluster_id),
          "metrics": {
            output.name: event.counter_map.flatten().filter(tuple, tuple.plugin_id == output.id).as(counter_map, zip(
              counter_map.map(tuple, tuple.name),
              counter_map.map(tuple, tuple.value)
            ))
          },
          "flow": has(output.flow) ?
            output.flow
          :
            {},
          "events":{
            "in":output.events['in'],
            "out":output.events.out,
          },
          "time":{
            "duration":{
              "ms":output.events.duration_in_millis
            }
          }
        }
      }
    }.drop_empty())
  }).collate(["filters", "outputs", "inputs", "codecs"])).as(plugins, {
    // Final shape consumed by the agent: a flat list of per-plugin events.
    "events": plugins.map(plugin, {
      "logstash":{"pipeline":plugin}
    })
  })