packages/splunk/data_stream/alert/agent/stream/cel.yml.hbs (268 lines of code) (raw):

config_version: 2 interval: {{interval}} resource.tracer: enabled: {{enable_request_tracer}} filename: "../../logs/cel/http-request-trace-*.ndjson" maxbackups: 5 {{#if proxy_url}} resource.proxy_url: {{proxy_url}} {{/if}} {{#if ssl}} resource.ssl: {{ssl}} {{/if}} {{#if http_client_timeout}} resource.timeout: {{http_client_timeout}} {{/if}} resource.url: {{url}} state: user: {{username}} password: {{password}} offset: 0 batch_size: {{batch_size}} initial_interval: {{initial_interval}} redact: fields: - user - password program: | ( has(state.?next.sid) ? state : ( state.?want_more.orValue(false) ? state : state.with({ "start_time": state.?cursor.last_timestamp.orValue((now - duration(state.initial_interval)).format(time_layout.RFC3339)), "end_time": now.format(time_layout.RFC3339), }) ).as(state, // To perform search and get search id. post_request( state.url.trim_right("/") + "/services/search/v2/jobs", "application/x-www-form-urlencoded", { "output_mode":["json"], "search":["search index=notable"], "earliest_time": [state.start_time], "latest_time": [state.end_time], }.format_query() ).with({ "Header":{ "Authorization": ["Basic "+base64(state.user+":"+state.password)], } }).do_request().as(resp, resp.StatusCode == 201 ? resp.Body.decode_json().as(body, { "next": { "sid": body.sid, }, "url": state.url, "user": state.user, "password": state.password, "offset": state.offset, "batch_size": state.batch_size, "initial_interval": state.initial_interval, "cursor": { ?"last_timestamp": state.?cursor.last_timestamp, }, }) : { "events": { "error": { "code": string(resp.StatusCode), "id": string(resp.Status), "message": "POST " + state.url.trim_right("/") + "/services/search/v2/jobs: " + ( size(resp.Body) != 0 ? string(resp.Body) : string(resp.Status) + ' (' + string(resp.StatusCode) + ')' ), }, }, "want_more": false, "offset": 0, "batch_size": state.batch_size, "url": state.url, "user": state.user, "password": state.password, "initial_interval": state.initial_interval, "cursor": { ?"last_timestamp": state.?cursor.last_timestamp, }, } ) ) ).as(state, has(state.?error.message) ? state : state.?next.dispatchState == optional.of("DONE") || !has(state.?next.sid) ? // If more pages are still needs to be fetched. { "batch_size": state.batch_size, "offset": state.offset, "next": { ?"dispatchState": state.?next.dispatchState, ?"sid": state.?next.sid, }, "url": state.url, "user": state.user, "password": state.password, "initial_interval": state.initial_interval, "cursor": { ?"last_timestamp": state.?cursor.last_timestamp, }, } : // To check the status of the search id if it is completed or not. request("GET", state.url.trim_right("/") + "/services/search/v2/jobs/" + string(state.next.sid) + "?output_mode=json" ).with({ "Header":{ "Authorization": ["Basic "+base64(state.user+":"+state.password)], } }).do_request().as(resp, resp.StatusCode == 200 ? resp.Body.decode_json().as(body,{ "next": { ?"dispatchState": body.entry[?0].content.dispatchState, ?"sid": state.?next.sid, }, "batch_size": state.batch_size, "offset": state.offset, "url": state.url, "user": state.user, "password": state.password, "initial_interval": state.initial_interval, "cursor": { ?"last_timestamp": state.?cursor.last_timestamp, }, }) : { "events": { "error": { "code": string(resp.StatusCode), "id": string(resp.Status), "message": "GET " + state.url.trim_right("/") + "/services/search/v2/jobs/" + string(state.next.sid) + ":" + ( size(resp.Body) != 0 ? string(resp.Body) : string(resp.Status) + ' (' + string(resp.StatusCode) + ')' ), }, }, "want_more": false, "offset": 0, "batch_size": state.batch_size, "url": state.url, "user": state.user, "password": state.password, "initial_interval": state.initial_interval, "cursor": { ?"last_timestamp": state.?cursor.last_timestamp, }, } ) ).as(state, has(state.?events.error) ? state : (state.?next.dispatchState == optional.of("DONE")) ? // To fetch the events from the respective search id. request("GET", state.url.trim_right("/") + "/services/search/v2/jobs/" + string(state.next.sid) + "/events?"+ { "output_mode": ["json"], "count": [string(state.batch_size)], "offset": [string(state.offset)], }.format_query() ).with({ "Header":{ "Authorization": ["Basic "+base64(state.user+":"+state.password)], } }).do_request().as(resp, resp.StatusCode == 200 ? bytes(resp.Body).decode_json().as(body,{ "events": body.results.map(e,{ "message": e.encode_json(), }), "cursor": { ?"last_timestamp": has(body.results) && body.results.size() > 0 ? ( has(state.?cursor.last_timestamp) && body.results.map(e, e._time).max() < state.cursor.last_timestamp ? optional.of(state.cursor.last_timestamp) : optional.of(body.results.map(e, e._time).max()) ) : state.?cursor.last_timestamp }, "next": { ?"sid": body.results.size() == state.batch_size ? optional.of(state.next.sid) : optional.none(), ?"dispatchState": body.results.size() == state.batch_size ? optional.of(state.next.dispatchState) : optional.none(), }, "batch_size": state.batch_size, "url": state.url, "user": state.user, "password": state.password, "initial_interval": state.initial_interval, "offset": body.results.size() == state.batch_size ? int(state.offset) + body.results.size() : 0, "want_more": body.results.size() == state.batch_size, }) : { "events": { "error": { "code": string(resp.StatusCode), "id": string(resp.Status), "message": "GET " + state.url.trim_right("/") + "/services/search/v2/jobs/" + state.sid + "/events:"+ ( size(resp.Body) != 0 ? string(resp.Body) : string(resp.Status) + ' (' + string(resp.StatusCode) + ')' ), }, }, "want_more": false, "offset": 0, "batch_size": state.batch_size, "url": state.url, "user": state.user, "password": state.password, "initial_interval": state.initial_interval, "cursor": { ?"last_timestamp": state.?cursor.last_timestamp, }, } ) : { "events": [{"message": "want_more"}], "want_more": true, "next": { ?"sid": state.?next.sid, ?"dispatchState": state.?next.dispatchState, }, "batch_size": state.batch_size, "offset": state.offset, "url": state.url, "user": state.user, "password": state.password, "initial_interval": state.initial_interval, "cursor": { ?"last_timestamp": state.?cursor.last_timestamp, }, } ) tags: {{#if preserve_original_event}} - preserve_original_event {{/if}} {{#if preserve_duplicate_custom_fields}} - preserve_duplicate_custom_fields {{/if}} {{#each tags as |tag|}} - {{tag}} {{/each}} {{#contains "forwarded" tags}} publisher_pipeline.disable_host: true {{/contains}} {{#if processors}} processors: {{processors}} {{/if}}