packages/checkpoint_harmony_endpoint/data_stream/antimalware/agent/stream/cel.yml.hbs (375 lines of code) (raw):
# Top-level settings for the CEL input. Double-brace placeholders are filled in
# from the integration's variables when the template is rendered.
config_version: 3
# Rate limit and burst applied to the requests issued by the program.
resource.rate_limit.limit: {{resource_rate_limit_limit}}
resource.rate_limit.burst: {{resource_rate_limit_burst}}
# Request tracing is only configured when enable_request_tracer is set.
{{#if enable_request_tracer}}
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
resource.tracer.maxbackups: 5
{{/if}}
# Base URL of the API. The program builds every endpoint from state.url
# (NOTE(review): the CEL input is expected to expose resource.url as state.url - confirm).
resource.url: {{base_url}}
# Time between periodic evaluations of the program.
interval: {{interval}}
state:
  # Initial state handed to the CEL program. Every key here must be nested
  # under `state`, because the program reads them as state.<key>.

  # Credentials sent as "clientId" / "accessKey" in the body of the
  # POST /auth/external token request.
  auth_client_id: {{client_id}}
  auth_access_key: {{access_key}}
  # Duration subtracted from the current time to build the startTime of the
  # very first query, when the cursor holds no next_startTime yet.
  initial_interval: {{initial_interval}}
  # Sent as "limit" and "pageLimit" in the body of the logs_query request.
  limit: {{limit}}
  page_limit: {{page_limit}}
  # Sent as "filter" in the body of the logs_query request.
  filter: {{filter}}
program: |
  // Collects logs through the asynchronous logs_query API.
  //
  // Each evaluation performs an optional token request and, unless that
  // failed, exactly one logs_query request. The cursor decides which one:
  //
  //   1. cursor.task_id is null     -> submit a query and store its task ID.
  //   2. cursor.page_token is null  -> poll the task until it is "Ready",
  //                                    "Done" or "Canceled".
  //   3. otherwise                  -> retrieve one page of records.
  //
  // Cursor fields written by this program:
  //   auth_data                        token and expiry returned by /auth/external
  //   task_id, page_token              progress of the query currently in flight
  //   current_startTime/current_endTime  timeframe of the query in flight
  //   next_startTime/next_endTime        timeframe for the next query to submit
  //
  // Steps that produce no records but must be followed immediately by another
  // evaluation emit a placeholder event with event.reason "polling" and set
  // want_more to true.
  (
    state.?cursor.auth_data.expires.optMap(t,
      t.parse_time(time_layout.RFC1123) - now() > duration("5m")
    ).orValue(false) ?
      // Cached token stays valid for more than five minutes - Use it.
      state.cursor.auth_data
    :
      // No usable cached token - Use credentials to fetch a new token.
      request("POST", state.url.trim_right("/") + "/auth/external").with(
        {
          "Header": {
            "Accept": ["application/json"],
            "Content-Type": ["application/json"],
          },
          "Body": {
            "clientId": state.auth_client_id,
            "accessKey": state.auth_access_key,
          }.encode_json(),
        }
      ).do_request().as(resp, resp.StatusCode == 200 ?
        bytes(resp.Body).decode_json().as(body,
          {
            "token": body.data.token,
            "expires": body.data.expires,
          }
        )
      :
        // Token request failed - Produce a complete error state instead of auth data.
        state.with(
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "POST " + state.url.trim_right("/") + "/auth/external: " + (
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
          }
        )
      )
  ).as(v, has(v.?events.error) ?
    // Authentication failed - v is already the error state, return it unchanged.
    v
  : v.as(auth_data,
    (state.?cursor.task_id.orValue(null) == null) ?
      // No task ID - Submit a query and get its task ID.
      {
        "startTime": state.?cursor.next_startTime.orValue(
          timestamp(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
        ),
        // The end of the window trails the current time by one minute unless a
        // previous failure asked for the same window to be replayed.
        "endTime": state.?cursor.next_endTime.orValue(null) == null ?
          timestamp(now() - duration("1m")).format(time_layout.RFC3339)
        :
          state.cursor.next_endTime,
      }.as(timeframe,
        request("POST", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query").with(
          {
            "Header": {
              "Accept": ["application/json"],
              "Content-Type": ["application/json"],
              "Authorization": ["Bearer " + auth_data.token],
            },
            "Body": {
              "filter": state.filter,
              "limit": state.limit,
              "pageLimit": state.page_limit,
              "cloudService": "Harmony Endpoint",
              "timeframe": {
                "startTime": timeframe.startTime,
                "endTime": timeframe.endTime,
              },
            }.encode_json(),
          }
        ).do_request().as(resp,
          (resp.StatusCode != 200) ?
            // Any error - We're at the start, so clear everything and retry after interval.
            state.with(
              {
                "events": {
                  "error": {
                    "code": string(resp.StatusCode),
                    "id": string(resp.Status),
                    "message": "POST " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query: " + (
                      size(resp.Body) != 0 ?
                        string(resp.Body)
                      :
                        string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                    ),
                  },
                },
                "want_more": false,
                // The cursor does not exist yet on the very first run, so it
                // must be read optionally here.
                "cursor": state.?cursor.orValue({}).with(
                  {
                    "auth_data": null,
                    "task_id": null,
                    "page_token": null,
                  }
                ),
              }
            )
          :
            // Query submitted - Save the task ID.
            bytes(resp.Body).decode_json().as(body,
              state.with(
                {
                  "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
                  "want_more": true,
                  "cursor": {
                    "auth_data": auth_data,
                    "task_id": body.data.taskId,
                    "page_token": null,
                    "current_startTime": timeframe.startTime,
                    "current_endTime": timeframe.endTime,
                    "next_startTime": timeframe.endTime,
                    "next_endTime": null,
                  },
                }
              )
            )
        )
      )
    : (state.?cursor.page_token.orValue(null) == null) ?
      // Task exists with no page token - Check whether it's ready or done.
      request("GET", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/" + state.cursor.task_id).with(
        {
          "Header": {
            "Accept": ["application/json"],
            "Content-Type": ["application/json"],
            "Authorization": ["Bearer " + auth_data.token],
          },
        }
      ).do_request().as(resp,
        (resp.StatusCode == 401) ?
          // 401 Unauthorized - Clear the auth data and retry immediately.
          state.with(
            {
              "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
              "want_more": true,
              "cursor": state.cursor.with({"auth_data": null}),
            }
          )
        :
        (resp.StatusCode == 200) ?
          bytes(resp.Body).decode_json().as(body,
            (body.data.state == "Ready") ?
              // 'Ready' (Results found) - Save the first page token.
              state.with(
                {
                  "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
                  "want_more": true,
                  "cursor": state.cursor.with(
                    {
                      "auth_data": auth_data,
                      "page_token": body.data.pageTokens[0],
                    }
                  ),
                }
              )
            : (body.data.state == "Done") ?
              // 'Done' (Results empty) - Clear the task ID and end the sequence.
              state.with(
                {
                  "events": [],
                  "want_more": false,
                  "cursor": state.cursor.with(
                    {
                      "auth_data": auth_data,
                      "task_id": null,
                      "next_startTime": state.cursor.current_startTime,
                    }
                  ),
                }
              )
            : (body.data.state == "Canceled") ?
              // 'Canceled' (Error or timeout) - Clear the task ID and reset the sequence for the same timeframe.
              state.with(
                {
                  "events": [],
                  "want_more": false,
                  "cursor": state.cursor.with(
                    {
                      "auth_data": auth_data,
                      "task_id": null,
                      "next_startTime": state.cursor.current_startTime,
                      "next_endTime": state.cursor.current_endTime,
                    }
                  ),
                }
              )
            :
              // Not ready or done - Keep polling.
              state.with(
                {
                  "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
                  "want_more": true,
                  "cursor": state.cursor.with(
                    {
                      "auth_data": auth_data,
                    }
                  ),
                }
              )
          )
        :
          // Clear the task ID, and reset the sequence for the same timeframe.
          state.with(
            {
              "events": {
                "error": {
                  "code": string(resp.StatusCode),
                  "id": string(resp.Status),
                  "message": "GET " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query: " + (
                    size(resp.Body) != 0 ?
                      string(resp.Body)
                    :
                      string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                  ),
                },
              },
              "want_more": false,
              "cursor": state.cursor.with(
                {
                  "auth_data": auth_data,
                  "task_id": null,
                  "next_startTime": state.cursor.current_startTime,
                  "next_endTime": state.cursor.current_endTime,
                }
              ),
            }
          )
      )
    :
      // Task is ready - Use the task ID and page token to retrieve a page of results.
      request("POST", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve").with(
        {
          "Header": {
            "Accept": ["application/json"],
            "Content-Type": ["application/json"],
            "Authorization": ["Bearer " + auth_data.token],
          },
          "Body": {
            "taskId": state.cursor.task_id,
            "pageToken": state.cursor.page_token,
          }.encode_json(),
        }
      ).do_request().as(resp,
        (resp.StatusCode == 401) ?
          // 401 Unauthorized - Clear the auth data and retry immediately.
          state.with(
            {
              "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
              "want_more": true,
              "cursor": state.cursor.with({"auth_data": null}),
            }
          )
        :
        (resp.StatusCode == 503) ?
          // 503 Service Unavailable - Clear the task ID and page token, and end the sequence.
          state.with(
            {
              "events": {
                "error": {
                  "code": string(resp.StatusCode),
                  "id": string(resp.Status),
                  "message": "POST " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve: " + (
                    size(resp.Body) != 0 ?
                      string(resp.Body)
                    :
                      string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                  ),
                },
              },
              "want_more": false,
              "cursor": state.cursor.with(
                {
                  "auth_data": auth_data,
                  "task_id": null,
                  "page_token": null,
                }
              ),
            }
          )
        :
        (resp.StatusCode == 200) ?
          bytes(resp.Body).decode_json().as(body,
            // Read the records once. A page without a records list is handled
            // as an empty page so that both the events and the cursor update
            // below agree on what was received.
            body.?data.records.orValue([]).as(records,
              (body.data.nextPageToken != "NULL") ?
                // Not the last page - Save the next page token and continue.
                state.with(
                  {
                    "events": records.map(e, {"message": e.encode_json()}),
                    "want_more": true,
                    "cursor": state.cursor.with(
                      {
                        "auth_data": auth_data,
                        "page_token": body.data.nextPageToken,
                      }
                    ),
                  }
                )
              :
                // Last page - Clear the task ID and page token, and end the sequence.
                // Next sequence starts pulling data from the last timestamp received.
                state.with(
                  {
                    "events": records.map(e, {"message": e.encode_json()}),
                    "want_more": false,
                    "cursor": state.cursor.with(
                      {
                        "auth_data": auth_data,
                        "task_id": null,
                        "page_token": null,
                        // Both branches yield a plain RFC3339 string. An empty
                        // last page falls back to the end of the queried window.
                        "next_startTime": records.size() > 0 ?
                          records.map(t, timestamp(t.time)).max().format(time_layout.RFC3339)
                        :
                          state.cursor.current_endTime,
                        "next_endTime": null,
                      }
                    ),
                  }
                )
            )
          )
        :
          // Clear the task ID and page token, and reset the sequence for the same timeframe.
          state.with(
            {
              "events": {
                "error": {
                  "code": string(resp.StatusCode),
                  "id": string(resp.Status),
                  "message": "POST " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve: " + (
                    size(resp.Body) != 0 ?
                      string(resp.Body)
                    :
                      string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                  ),
                },
              },
              "want_more": false,
              "cursor": state.cursor.with(
                {
                  "auth_data": auth_data,
                  "task_id": null,
                  "page_token": null,
                  "next_startTime": state.cursor.current_startTime,
                  "next_endTime": state.cursor.current_endTime,
                }
              ),
            }
          )
      )
  ))
# Tags attached to events: "preserve_original_event" is added only when that
# option is enabled, followed by every entry of the configured tags list.
tags:
{{#if preserve_original_event}}
- preserve_original_event
{{/if}}
{{#each tags as |tag|}}
- {{tag}}
{{/each}}
# Emitted only when the configured tags contain "forwarded".
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
# Processors supplied by the user are inserted verbatim under `processors`.
{{#if processors}}
processors:
{{processors}}
{{/if}}