packages/logstash/data_stream/plugins/agent/stream/cel.yml.hbs (202 lines of code) (raw):

# CEL input template: polls the Logstash node API and emits one document per
# plugin (input / codec / filter / output) of every pipeline except the
# internal ".monitoring-logstash" pipeline.
config_version: "2"
interval: {{period}}
resource.url: "{{url}}/_node"
{{#if resource_ssl}}
resource.ssl: {{resource_ssl}}
{{/if}}
{{#if username}}
auth.basic.user: {{escape_string username}}
{{/if}}
{{#if password}}
auth.basic.password: {{escape_string password}}
{{/if}}
{{#if condition}}
condition: {{ condition }}
{{/if}}
redact:
  fields: ~
program: |
  // Stage 1: fetch node stats and build one enriched record per pipeline.
  // NOTE(review): state.url is presumably populated from resource.url above - confirm.
  get(state.url + "/stats?graph=true&vertices=true").as(resp, bytes(resp.Body).decode_json().as(body,
    body.pipelines.map(pipeline_name, pipeline_name != ".monitoring-logstash", body.pipelines[pipeline_name].with({
      "name":pipeline_name,
      // Per-pipeline lookup of plugin id -> config source, taken from the pipeline graph.
      "pipeline_source_map":
        get(state.url + "/pipelines/" + pipeline_name + "?graph=true&vertices=true").as(resp,
          bytes(resp.Body).decode_json().as(pipes,
            // Guard on the key that is actually dereferenced below. The previous
            // guard tested `pipes.pipeline`, and the presence/null checks on
            // `pipes.pipelines` lived inside the map predicate where they ran
            // only after `pipes.pipelines` had already been accessed.
            (has(pipes.pipelines) && pipes.pipelines != null) ?
              pipes.pipelines.map(pipeline_name,
                has(pipes.pipelines[pipeline_name].graph) && pipes.pipelines[pipeline_name].graph.graph.vertices != null,
                pipes.pipelines[pipeline_name].graph.graph.vertices.map(vertex, vertex.type == "plugin", {
                  "plugin_id": vertex.id,
                  "source": vertex.meta.source,
                })
              ).drop("graph").flatten()
            :
              []
          )
        ),
      // Cluster UUIDs reported by any vertex of this pipeline.
      "es_cluster_id": has(body.pipelines[pipeline_name].vertices) ?
        body.pipelines[pipeline_name].vertices.map(vertex, has(vertex.cluster_uuid), vertex.cluster_uuid)
      :
        [],
      // Same UUIDs, keyed by the plugin (vertex) id that reported them.
      "es_cluster_id_map": has(body.pipelines[pipeline_name].vertices) ?
        body.pipelines[pipeline_name].vertices.map(vertex, has(vertex.cluster_uuid), {
          "plugin_id": vertex.id,
          "cluster_id": vertex.cluster_uuid,
        })
      :
        [],
      // One list of {plugin_id, name, value} per vertex that has long_counters;
      // flattened where it is consumed in stage 2.
      "counter_map": has(body.pipelines[pipeline_name].vertices) ?
        body.pipelines[pipeline_name].vertices.map(vertex, has(vertex.long_counters), vertex.long_counters.map(counter, {
          "plugin_id": vertex.id,
          "name": counter.name,
          "value": counter.value
        }))
      :
        [],
      "outputs": body.pipelines[pipeline_name].plugins.outputs,
      "inputs": body.pipelines[pipeline_name].plugins.inputs,
      "filters": body.pipelines[pipeline_name].plugins.filters,
      "codecs": body.pipelines[pipeline_name].plugins.codecs,
      "host":{
        "name": body.name,
        "address": body.http_address,
      }
    }))
  )).as(events,
    // Stage 2: expand every pipeline record into per-plugin documents.
    // Plugins are only mapped when the pipeline record has a `hash` field.
    events.map(event, {
      "inputs": event.inputs.map(input, has(event.hash), {
        "name": event.name,
        "id": event.hash,
        "host": event.host,
        "elasticsearch.cluster.id": event.es_cluster_id,
        "plugin": {
          "type": "input",
          "input": {
            // First matching source entry for this plugin id, or "" when none matches.
            "source":event.pipeline_source_map.map(tuple, (tuple.plugin_id == input.id), tuple.source).flatten().as(source, (source.size() != 0) ? source[0] : ""),
            "elasticsearch.cluster.id": event.es_cluster_id_map.map(tuple, tuple.plugin_id == input.id, tuple.cluster_id),
            "metrics": {
              // Counters belonging to this plugin, zipped from parallel name/value lists.
              input.name: event.counter_map.flatten().filter(tuple, tuple.plugin_id == input.id).as(counter_map, zip(
                counter_map.map(tuple, tuple.name),
                counter_map.map(tuple, tuple.value)
              ))
            },
            "name": input.name,
            "id": input.id,
            "flow": has(input.flow) ? input.flow : {},
            "events": {
              "out": input.events.out,
            },
            "time": {
              "queue_push_duration": {
                "ms": input.events.queue_push_duration_in_millis
              }
            }
          }
        }
      }.drop_empty()),
      "codecs": event.codecs.map(codec, has(event.hash), {
        "name": event.name,
        "id": event.hash,
        "host": event.host,
        "elasticsearch.cluster.id": event.es_cluster_id,
        "plugin": {
          "type": "codec",
          "codec": {
            "id":codec.id,
            "name":codec.name,
            "flow": has(codec.flow) ? codec.flow : {},
            "decode": has(codec.decode) ? {
              "duration":{
                "ms":codec.decode.duration_in_millis
              },
              "in":codec.decode.writes_in,
              "out":codec.decode.out,
            } : {},
            "encode": has(codec.encode) ? {
              "in":codec.encode.writes_in,
              "duration":{
                "ms":codec.encode.duration_in_millis
              }
            } : {}
          }
        }
      }.drop_empty()),
      "filters": event.filters.map(filter, has(event.hash), {
        "name": event.name,
        "id": event.hash,
        "host": event.host,
        "elasticsearch.cluster.id": event.es_cluster_id,
        "plugin": {
          "type": "filter",
          "filter": {
            // First matching source entry for this plugin id, or "" when none matches.
            "source":event.pipeline_source_map.map(tuple, (tuple.plugin_id == filter.id), tuple.source).flatten().as(source, (source.size() != 0) ? source[0] : ""),
            "id": filter.id,
            "name": filter.name,
            "elasticsearch.cluster.id": event.es_cluster_id_map.map(tuple, tuple.plugin_id == filter.id, tuple.cluster_id),
            "metrics": {
              filter.name: event.counter_map.flatten().filter(tuple, tuple.plugin_id == filter.id).as(counter_map, zip(
                counter_map.map(tuple, tuple.name),
                counter_map.map(tuple, tuple.value)
              ))
            },
            "flow": has(filter.flow) ? filter.flow : {},
            "events": {
              "in": filter.events['in'],
              "out": filter.events.out,
            },
            "time": {
              "duration": {
                "ms": filter.events.duration_in_millis
              }
            }
          }
        }
      }.drop_empty()),
      "outputs": event.outputs.map(output, has(event.hash), {
        "name": event.name,
        "id": event.hash,
        "host": event.host,
        "elasticsearch.cluster.id": event.es_cluster_id,
        "plugin": {
          "type": "output",
          "output": {
            "id": output.id,
            "name": output.name,
            // First matching source entry for this plugin id, or "" when none matches.
            "source":event.pipeline_source_map.map(tuple, (tuple.plugin_id == output.id), tuple.source).flatten().as(source, (source.size() != 0) ? source[0] : ""),
            "elasticsearch.cluster.id": event.es_cluster_id_map.map(tuple, tuple.plugin_id == output.id, tuple.cluster_id),
            "metrics": {
              output.name: event.counter_map.flatten().filter(tuple, tuple.plugin_id == output.id).as(counter_map, zip(
                counter_map.map(tuple, tuple.name),
                counter_map.map(tuple, tuple.value)
              ))
            },
            "flow": has(output.flow) ? output.flow : {},
            "events":{
              "in":output.events['in'],
              "out":output.events.out,
            },
            "time":{
              "duration":{
                "ms":output.events.duration_in_millis
              }
            }
          }
        }
      }.drop_empty())
    // NOTE(review): collate presumably merges the four per-type lists of every
    // pipeline into one flat list of plugin documents - confirm against mito docs.
    }).collate(["filters", "outputs", "inputs", "codecs"])
  ).as(plugins, {
    // Stage 3: wrap each plugin document under logstash.pipeline for publishing.
    "events": plugins.map(plugin, {
      "logstash":{"pipeline":plugin}
    })
  })