packages/mimecast/data_stream/cloud_integrated_logs/agent/stream/cel.yml.hbs (177 lines of code) (raw):
# CEL input settings for the Mimecast cloud integrated batch events stream.
config_version: 2
interval: {{interval}}
# Request tracing is switched on or off by the enable_request_tracer variable.
resource.tracer:
  enabled: {{enable_request_tracer}}
  filename: "../../logs/cel/http-request-trace-*.ndjson"
  maxbackups: 5
resource.url: {{api_url}}
fields_under_root: true
keep_null: true
# Initial evaluation state; the program reads these values as state.<key>.
state:
  # Credentials go through escape_string so that values containing YAML
  # indicators (" #", ": ", a leading "*", "&" or "@") or values that look
  # numeric or boolean still reach the program as literal strings. The
  # program passes both to format_query() as list-of-string parameters.
  client_id: {{escape_string client_id}}
  client_secret: {{escape_string client_secret}}
  page_size: {{batch_size}}
  # NOTE(review): look_back, start_field and end_field are not referenced by
  # the program in this file; confirm whether anything else consumes them.
  look_back: {{initial_interval}}
  path: /siem/v1/batch/events/ci
  start_field: dateRangeStartsAt
  end_field: dateRangeEndsAt
{{#if types}}
  # One work list entry is created by the program for each listed type.
  types:
{{#each types as |t|}}
    - {{t}}
{{/each}}
{{/if}}
# State paths listed for redaction: the two credentials above and the access
# token that the program stores under state.token.
redact:
  fields:
    - client_id
    - client_secret
    - token.access_token
program: |
  // This program is shared between cloud_integrated_logs and siem_logs.
  // If it is changed here changes should be reflected in the other data
  // streams. Do not differentiate the logic between these data streams
  // lightly; use the state variable for this unless absolutely required.
  //
  // Outline of one evaluation:
  //   1. Reuse state.token while it has not expired, otherwise POST the
  //      client credentials to <url>/oauth/token.
  //   2. Take the first work list entry (one entry per state.types element,
  //      optionally carrying a "next" page token) and GET state.path for it.
  //   3. For each element of the response's "value" list that has a "url",
  //      fetch that URL and emit one event per decoded record.
  //   4. Return the remaining work list under cursor, the token, and a
  //      want_more flag; the result is merged into state by state.with().
  // A non-200 reply from the token or listing request yields a single error
  // event with want_more set to false. A non-200 reply for an individual
  // batch URL yields an error event in place of that batch's records.
  //
  // NOTE(review): state.url is not set in the state block of this template;
  // presumably the input populates it from resource.url - confirm.
  state.with(
    (
      (has(state.?token.expires) && now() < timestamp(state.token.expires)) ?
        // The token we have is still valid.
        state.token
      :
        // Get a new token.
        post_request(state.url.trim_right("/") + "/oauth/token", "application/x-www-form-urlencoded",
          {
            "client_id": [state.client_id],
            "client_secret": [state.client_secret],
            "grant_type": ["client_credentials"],
          }.format_query()
        ).do_request().as(auth, auth.StatusCode == 200 ?
          // Keep the decoded token reply and add an absolute expiry time.
          bytes(auth.Body).decode_json().as(auth_body, auth_body.with({
            // Include 60s grace period to avoid attempting to make
            // a request with a stale authentication token.
            "expires": now()+duration(string(int(auth_body.expires_in)-60)+"s"),
          }))
        :
          // Authentication failed. This map has no access_token, so the
          // check below returns it unchanged as the evaluation result.
          {
            "events": {
              "error": {
                "code": string(auth.StatusCode),
                "id": string(auth.Status),
                "message": "POST /oauth/token: "+(
                  size(auth.Body) != 0 ?
                    string(auth.Body)
                  :
                    string(auth.Status) + ' (' + string(auth.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
          }
        )
    ).as(token, !has(token.access_token) ? token :
      // Resume from the stored work list, or build a fresh one from
      // state.types when there is no stored list or it has been emptied.
      state.?cursor.work_list.orValue(state.types.map(t, {"type": t})).as(work_list, size(work_list) == 0 ?
        state.types.map(t, {"type": t})
      :
        work_list
      ).as(work_list,
        get_request(
          state.url.trim_right("/") + state.path + "?" + {
            "type": [work_list[0].type],
            // Entries prefixed with "?" are only included when the optional
            // value on the right-hand side is present.
            ?"nextPage": work_list[0].?next.optMap(next, [next]),
            // NOTE(review): nothing in this program assigns state.start or
            // state.end; confirm where they are expected to come from.
            ?"dateRangeStartsAt": state.?start.optMap(start, [start.format("2006-01-02")]),
            ?"dateRangeEndsAt": state.?end.optMap(end, [end.format("2006-01-02")]),
            ?"pageSize": state.?page_size.optMap(size, [string(int(size))]),
          }.format_query()
        ).with({
          "Header": {
            "Authorization": ["Bearer " + token.access_token],
            "Accept": ["application/json"],
            "Content-Type": ["application/json"],
          }
        }).do_request().as(resp, resp.StatusCode == 200 ?
          bytes(resp.Body).decode_json().as(body,
            {
              // Only elements that have a url are fetched. Each fetch gives
              // a list (records or a single error), hence the flatten().
              "events": body.value.map(b, has(b.url),
                get(b.url).as(batch, batch.StatusCode == 200 ?
                  // The body is passed through the gzip and then the NDJSON
                  // MIME decoders; each resulting element is re-encoded as
                  // JSON and used as the event message.
                  bytes(batch.Body).mime("application/gzip").mime("application/x-ndjson").map(e,
                    {
                      "message": dyn(e.encode_json()),
                    }
                  )
                :
                  [{
                    "error": {
                      "code": string(batch.StatusCode),
                      "id": string(batch.Status),
                      "message": "GET " + b.url + ": " + (
                        size(batch.Body) != 0 ?
                          string(batch.Body)
                        :
                          string(batch.Status) + ' (' + string(batch.StatusCode) + ')'
                      ),
                    },
                  }]
                )
              ).flatten(),
              "cursor": {
                // Keep the current entry at the head, with its next page
                // token, while the reply has "@nextPage" and a non-empty
                // value list; otherwise drop it. The rest of the list
                // follows unchanged.
                "work_list": (
                  "@nextPage" in body && size(body.value) != 0 ?
                    [work_list[0].with({"next": body["@nextPage"]})]
                  :
                    []
                ) + tail(work_list),
              },
              "token": {
                "access_token": token.access_token,
                "expires": token.expires,
              },
              "want_more": "@nextPage" in body && size(body.value) != 0,
            }.as(to_publish, to_publish.with({
              // Also ask for another evaluation while other work list
              // entries are still waiting.
              "want_more": to_publish.want_more || size(to_publish.cursor.work_list) != 0,
            }))
          ).as(state,
            // Within this scope "state" is the map built above, not the
            // input state.
            //
            // Check whether we still need to get more, but have
            // no event for this type. If we do, populate events
            // with a place-holder to be discarded by the ingest
            // pipeline.
            state.want_more && size(state.events) == 0 ?
              state.with({"events": [{"message": "want_more"}]})
            :
              state
          )
        :
          // The listing request failed; here "state" is the input state
          // again.
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "GET " + state.path + ": " + (
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
          }
        )
      )
    )
  )
# Tags attached to every event: two optional behaviour switches followed by
# the user-configured tags.
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#if preserve_duplicate_custom_fields}}
  - preserve_duplicate_custom_fields
{{/if}}
{{#each tags}}
  - {{this}}
{{/each}}
{{#contains "forwarded" tags}}
# Emitted only when the configured tags include "forwarded".
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
# User-supplied processors are inserted verbatim.
processors:
{{processors}}
{{/if}}