# Source: packages/splunk/data_stream/alert/agent/stream/cel.yml.hbs
# CEL input settings for the Splunk alert data stream.
config_version: 2
# How often the program below is evaluated.
interval: {{interval}}
# Optional HTTP request tracing, written to rotating ndjson files.
resource.tracer:
  enabled: {{enable_request_tracer}}
  filename: "../../logs/cel/http-request-trace-*.ndjson"
  maxbackups: 5
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
# Base URL of the Splunk REST API; the program appends /services/search/v2/jobs.
resource.url: {{url}}
# Initial program state. The program carries these keys forward on every
# evaluation.
state:
  # escape_string renders the value as a quoted YAML string, so credentials
  # made only of digits or containing YAML indicators still reach the program
  # as strings (the program concatenates them into the Basic auth header).
  user: {{escape_string username}}
  password: {{escape_string password}}
  # Paging position within the current search job's results.
  offset: 0
  # Number of events requested per page.
  batch_size: {{batch_size}}
  # Look-back window used for the first search, when no cursor exists yet.
  initial_interval: {{initial_interval}}
# State fields listed for redaction.
redact:
  fields:
    - user
    - password
program: |
  // Collects Splunk notable events in three chained stages. Each stage
  // receives the state produced by the previous one:
  //   1. Create a search job, unless a job id (state.next.sid) is already held.
  //   2. Poll the job until its dispatchState is "DONE".
  //   3. When DONE, fetch one page of events; otherwise emit a placeholder
  //      event with want_more set so the job is polled again.
  // Any HTTP failure is reported as state.events.error and passed through the
  // remaining stages unchanged.
  (
    has(state.?next.sid) ?
      // A search job already exists: skip creation and go on to poll/page it.
      state
    :
      (
        state.?want_more.orValue(false) ?
          state
        :
          // New collection window: from the stored cursor (or now minus
          // initial_interval on the first run) up to now.
          state.with({
            "start_time": state.?cursor.last_timestamp.orValue((now - duration(state.initial_interval)).format(time_layout.RFC3339)),
            "end_time": now.format(time_layout.RFC3339),
          })
      ).as(state,
        // To perform search and get search id.
        post_request(
          state.url.trim_right("/") + "/services/search/v2/jobs",
          "application/x-www-form-urlencoded",
          {
            "output_mode":["json"],
            "search":["search index=notable"],
            "earliest_time": [state.start_time],
            "latest_time": [state.end_time],
          }.format_query()
        ).with({
          "Header":{
            "Authorization": ["Basic "+base64(state.user+":"+state.password)],
          }
        }).do_request().as(resp, resp.StatusCode == 201 ?
          // Job created: remember its search id for the following stages.
          resp.Body.decode_json().as(body, {
            "next": {
              "sid": body.sid,
            },
            "url": state.url,
            "user": state.user,
            "password": state.password,
            "offset": state.offset,
            "batch_size": state.batch_size,
            "initial_interval": state.initial_interval,
            "cursor": {
              ?"last_timestamp": state.?cursor.last_timestamp,
            },
          })
        :
          // Job creation failed: report the error and reset paging.
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "POST " + state.url.trim_right("/") + "/services/search/v2/jobs: " + (
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
            "offset": 0,
            "batch_size": state.batch_size,
            "url": state.url,
            "user": state.user,
            "password": state.password,
            "initial_interval": state.initial_interval,
            "cursor": {
              ?"last_timestamp": state.?cursor.last_timestamp,
            },
          }
        )
      )
  ).as(state,
    // Errors are stored under events.error, so that is the key to test here;
    // otherwise a failed job creation would fall into the branch below, which
    // rebuilds the state without "events" and silently drops the error.
    has(state.?events.error) ?
      state
    : state.?next.dispatchState == optional.of("DONE") || !has(state.?next.sid) ? // Job already DONE (more pages remain to be fetched) or no job id: no status poll needed.
      {
        "batch_size": state.batch_size,
        "offset": state.offset,
        "next": {
          ?"dispatchState": state.?next.dispatchState,
          ?"sid": state.?next.sid,
        },
        "url": state.url,
        "user": state.user,
        "password": state.password,
        "initial_interval": state.initial_interval,
        "cursor": {
          ?"last_timestamp": state.?cursor.last_timestamp,
        },
      }
    : // To check the status of the search id if it is completed or not.
      request("GET",
        state.url.trim_right("/") + "/services/search/v2/jobs/" + string(state.next.sid) + "?output_mode=json"
      ).with({
        "Header":{
          "Authorization": ["Basic "+base64(state.user+":"+state.password)],
        }
      }).do_request().as(resp, resp.StatusCode == 200 ?
        // Record the job's current dispatchState (taken from the first entry).
        resp.Body.decode_json().as(body,{
          "next": {
            ?"dispatchState": body.entry[?0].content.dispatchState,
            ?"sid": state.?next.sid,
          },
          "batch_size": state.batch_size,
          "offset": state.offset,
          "url": state.url,
          "user": state.user,
          "password": state.password,
          "initial_interval": state.initial_interval,
          "cursor": {
            ?"last_timestamp": state.?cursor.last_timestamp,
          },
        })
      :
        // Status poll failed: report the error and drop the job id.
        {
          "events": {
            "error": {
              "code": string(resp.StatusCode),
              "id": string(resp.Status),
              "message": "GET " + state.url.trim_right("/") + "/services/search/v2/jobs/" + string(state.next.sid) + ":" + (
                size(resp.Body) != 0 ?
                  string(resp.Body)
                :
                  string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
              ),
            },
          },
          "want_more": false,
          "offset": 0,
          "batch_size": state.batch_size,
          "url": state.url,
          "user": state.user,
          "password": state.password,
          "initial_interval": state.initial_interval,
          "cursor": {
            ?"last_timestamp": state.?cursor.last_timestamp,
          },
        }
      )
  ).as(state,
    has(state.?events.error) ?
      state
    : (state.?next.dispatchState == optional.of("DONE")) ? // To fetch the events from the respective search id.
      request("GET",
        state.url.trim_right("/") + "/services/search/v2/jobs/" + string(state.next.sid) + "/events?"+ {
          "output_mode": ["json"],
          "count": [string(state.batch_size)],
          "offset": [string(state.offset)],
        }.format_query()
      ).with({
        "Header":{
          "Authorization": ["Basic "+base64(state.user+":"+state.password)],
        }
      }).do_request().as(resp, resp.StatusCode == 200 ?
        bytes(resp.Body).decode_json().as(body,{
          // One event per result, carrying the result re-encoded as JSON.
          "events": body.results.map(e,{
            "message": e.encode_json(),
          }),
          // Move the cursor to the newest _time on this page, but never
          // behind the cursor value already stored.
          "cursor": {
            ?"last_timestamp": has(body.results) && body.results.size() > 0 ?
              (
                has(state.?cursor.last_timestamp) && body.results.map(e, e._time).max() < state.cursor.last_timestamp ?
                  optional.of(state.cursor.last_timestamp)
                :
                  optional.of(body.results.map(e, e._time).max())
              )
            :
              state.?cursor.last_timestamp
          },
          // A full page means more results may remain: keep the job, advance
          // the offset and ask for another evaluation. A short page ends the
          // job, so the next evaluation starts a new search.
          "next": {
            ?"sid": body.results.size() == state.batch_size ? optional.of(state.next.sid) : optional.none(),
            ?"dispatchState": body.results.size() == state.batch_size ? optional.of(state.next.dispatchState) : optional.none(),
          },
          "batch_size": state.batch_size,
          "url": state.url,
          "user": state.user,
          "password": state.password,
          "initial_interval": state.initial_interval,
          "offset": body.results.size() == state.batch_size ? int(state.offset) + body.results.size() : 0,
          "want_more": body.results.size() == state.batch_size,
        })
      :
        // Events fetch failed: report the error and drop the job id.
        {
          "events": {
            "error": {
              "code": string(resp.StatusCode),
              "id": string(resp.Status),
              // The search id lives at state.next.sid; there is no state.sid.
              "message": "GET " + state.url.trim_right("/") + "/services/search/v2/jobs/" + string(state.next.sid) + "/events:"+ (
                size(resp.Body) != 0 ?
                  string(resp.Body)
                :
                  string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
              ),
            },
          },
          "want_more": false,
          "offset": 0,
          "batch_size": state.batch_size,
          "url": state.url,
          "user": state.user,
          "password": state.password,
          "initial_interval": state.initial_interval,
          "cursor": {
            ?"last_timestamp": state.?cursor.last_timestamp,
          },
        }
      )
    :
      // Job not finished yet: emit a placeholder event and set want_more so
      // the job status is polled again on the next evaluation.
      {
        "events": [{"message": "want_more"}],
        "want_more": true,
        "next": {
          ?"sid": state.?next.sid,
          ?"dispatchState": state.?next.dispatchState,
        },
        "batch_size": state.batch_size,
        "offset": state.offset,
        "url": state.url,
        "user": state.user,
        "password": state.password,
        "initial_interval": state.initial_interval,
        "cursor": {
          ?"last_timestamp": state.?cursor.last_timestamp,
        },
      }
  )
# Event tags: two optional fixed tags, followed by every entry of the
# configured tags list.
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#if preserve_duplicate_custom_fields}}
  - preserve_duplicate_custom_fields
{{/if}}
{{#each tags as |tag|}}
  - {{tag}}
{{/each}}
# Emitted only when the tags list contains "forwarded".
# NOTE(review): presumably keeps host metadata from being added to forwarded
# events; confirm against the publisher pipeline documentation.
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
# The processors value is inserted verbatim and must itself be valid YAML
# for the value of this key.
{{#if processors}}
processors:
{{processors}}
{{/if}}