packages/checkpoint_harmony_endpoint/data_stream/threatextraction/agent/stream/cel.yml.hbs (375 lines of code) (raw):

# CEL input configuration template (Handlebars + YAML) that collects logs through
# an asynchronous query API: authenticate, submit a query, poll the task, then
# retrieve the result pages. Progress is carried between evaluations in state.cursor.
config_version: 3
resource.rate_limit.limit: {{resource_rate_limit_limit}}
resource.rate_limit.burst: {{resource_rate_limit_burst}}
{{#if enable_request_tracer}}
# Optional HTTP request tracing; at most 5 rotated trace files are kept.
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
resource.tracer.maxbackups: 5
{{/if}}
resource.url: {{base_url}}
interval: {{interval}}
# Values made available to the program as state.<name>.
# NOTE(review): auth_access_key is a secret held in state; consider adding a
# redact.fields setting for it if the minimum supported stack version allows - confirm.
state:
  auth_client_id: {{client_id}}
  auth_access_key: {{access_key}}
  initial_interval: {{initial_interval}}
  limit: {{limit}}
  page_limit: {{page_limit}}
  filter: {{filter}}
program: |
  // Each evaluation performs one step of a small state machine kept in state.cursor:
  //   1. Obtain a bearer token (the stored one is reused while it has more than
  //      5 minutes left before its "expires" time).
  //   2. No task_id: submit a logs query for a timeframe and store the returned task ID.
  //   3. task_id but no page_token: poll the task until it is Ready, Done or Canceled.
  //   4. task_id and page_token: retrieve one page of records and publish them.
  // The {"event": {"reason": "polling"}} events emitted by intermediate steps carry no
  // log data; presumably they exist so that want_more is honoured and are dropped
  // downstream - verify against the CEL input documentation and the ingest pipeline.
  (
    state.?cursor.auth_data.expires.optMap(t,
      t.parse_time(time_layout.RFC1123) - now() > duration("5m")
    ).orValue(false) ?
      // Current auth data exists - Use it.
      state.cursor.auth_data
    :
      // No current auth data - Use credentials to fetch a new token.
      request("POST", state.url.trim_right("/") + "/auth/external").with(
        {
          "Header": {
            "Accept": ["application/json"],
            "Content-Type": ["application/json"],
          },
          "Body": {
            "clientId": state.auth_client_id,
            "accessKey": state.auth_access_key,
          }.encode_json(),
        }
      ).do_request().as(resp, resp.StatusCode == 200 ?
        bytes(resp.Body).decode_json().as(body,
          {
            "token": body.data.token,
            "expires": body.data.expires,
          }
        )
      :
        // Authentication failed - Return the state with an error event instead of auth data.
        state.with(
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "POST " + state.url.trim_right("/") + "/auth/external: " + (
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
          }
        )
      )
  // v is either the auth data or, when authentication failed, the error state,
  // which is returned unchanged.
  ).as(v, has(v.?events.error) ? v : v.as(auth_data,
    (state.?cursor.task_id.orValue(null) == null) ?
      // No task ID - Submit a query and get its task ID.
      // The timeframe defaults to [now - initial_interval, now - 1m] unless the cursor
      // holds values saved by an earlier sequence.
      {
        "startTime": state.?cursor.next_startTime.orValue(
          timestamp(now() - duration(state.initial_interval)).format(time_layout.RFC3339)
        ),
        "endTime": state.?cursor.next_endTime.orValue(null) == null ?
          timestamp(now() - duration("1m")).format(time_layout.RFC3339)
        :
          state.cursor.next_endTime,
      }.as(timeframe,
        request("POST", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query").with(
          {
            "Header": {
              "Accept": ["application/json"],
              "Content-Type": ["application/json"],
              "Authorization": ["Bearer " + auth_data.token],
            },
            "Body": {
              "filter": state.filter,
              "limit": state.limit,
              "pageLimit": state.page_limit,
              "cloudService": "Harmony Endpoint",
              "timeframe": {
                "startTime": timeframe.startTime,
                "endTime": timeframe.endTime,
              },
            }.encode_json(),
          }
        ).do_request().as(resp, (resp.StatusCode != 200) ?
          // Any error - We're at the start, so clear everything and retry after interval.
          state.with(
            {
              "events": {
                "error": {
                  "code": string(resp.StatusCode),
                  "id": string(resp.Status),
                  "message": "POST " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query: " + (
                    size(resp.Body) != 0 ?
                      string(resp.Body)
                    :
                      string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                  ),
                },
              },
              "want_more": false,
              // The cursor does not exist yet if the very first query fails, so fall
              // back to an empty map rather than failing the evaluation.
              "cursor": state.?cursor.orValue({}).with(
                {
                  "auth_data": null,
                  "task_id": null,
                  "page_token": null,
                }
              ),
            }
          )
        :
          // Query submitted - Save the task ID.
          // current_* remember this timeframe so it can be retried; next_startTime
          // moves on to this timeframe's end.
          bytes(resp.Body).decode_json().as(body,
            state.with(
              {
                "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
                "want_more": true,
                "cursor": {
                  "auth_data": auth_data,
                  "task_id": body.data.taskId,
                  "page_token": null,
                  "current_startTime": timeframe.startTime,
                  "current_endTime": timeframe.endTime,
                  "next_startTime": timeframe.endTime,
                  "next_endTime": null,
                },
              }
            )
          )
        )
      )
    : (state.?cursor.page_token.orValue(null) == null) ?
      // Task exists with no page token - Check whether it's ready or done.
      request("GET", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/" + state.cursor.task_id).with(
        {
          "Header": {
            "Accept": ["application/json"],
            "Content-Type": ["application/json"],
            "Authorization": ["Bearer " + auth_data.token],
          },
        }
      ).do_request().as(resp, (resp.StatusCode == 401) ?
        // 401 Unauthorized - Clear the auth data and retry immediately.
        state.with(
          {
            "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
            "want_more": true,
            "cursor": state.cursor.with({"auth_data": null}),
          }
        )
      : (resp.StatusCode == 200) ?
        bytes(resp.Body).decode_json().as(body, (body.data.state == "Ready") ?
          // 'Ready' (Results found) - Save the first page token.
          state.with(
            {
              "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
              "want_more": true,
              "cursor": state.cursor.with(
                {
                  "auth_data": auth_data,
                  "page_token": body.data.pageTokens[0],
                }
              ),
            }
          )
        : (body.data.state == "Done") ?
          // 'Done' (Results empty) - Clear the task ID and end the sequence.
          // The next query starts again from this timeframe's start.
          state.with(
            {
              "events": [],
              "want_more": false,
              "cursor": state.cursor.with(
                {
                  "auth_data": auth_data,
                  "task_id": null,
                  "next_startTime": state.cursor.current_startTime,
                }
              ),
            }
          )
        : (body.data.state == "Canceled") ?
          // 'Canceled' (Error or timeout) - Clear the task ID and reset the sequence for the same timeframe.
          state.with(
            {
              "events": [],
              "want_more": false,
              "cursor": state.cursor.with(
                {
                  "auth_data": auth_data,
                  "task_id": null,
                  "next_startTime": state.cursor.current_startTime,
                  "next_endTime": state.cursor.current_endTime,
                }
              ),
            }
          )
        :
          // Not ready or done - Keep polling.
          state.with(
            {
              "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
              "want_more": true,
              "cursor": state.cursor.with(
                {
                  "auth_data": auth_data,
                }
              ),
            }
          )
        )
      :
        // Clear the task ID, and reset the sequence for the same timeframe.
        state.with(
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "GET " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query: " + (
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
            "cursor": state.cursor.with(
              {
                "auth_data": auth_data,
                "task_id": null,
                "next_startTime": state.cursor.current_startTime,
                "next_endTime": state.cursor.current_endTime,
              }
            ),
          }
        )
      )
    :
      // Task is ready - Use the task ID and page token to retrieve a page of results.
      request("POST", state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve").with(
        {
          "Header": {
            "Accept": ["application/json"],
            "Content-Type": ["application/json"],
            "Authorization": ["Bearer " + auth_data.token],
          },
          "Body": {
            "taskId": state.cursor.task_id,
            "pageToken": state.cursor.page_token,
          }.encode_json(),
        }
      ).do_request().as(resp, (resp.StatusCode == 401) ?
        // 401 Unauthorized - Clear the auth data and retry immediately.
        state.with(
          {
            "events": [{"message": {"event": {"reason": "polling"}}.encode_json()}],
            "want_more": true,
            "cursor": state.cursor.with({"auth_data": null}),
          }
        )
      : (resp.StatusCode == 503) ?
        // 503 Service Unavailable - Clear the task ID and page token, and end the sequence.
        // NOTE(review): unlike the generic error branch at the end, next_startTime and
        // next_endTime are not rewound here, so the next query starts at this
        // timeframe's end - confirm that not retrying the timeframe on 503 is intended.
        state.with(
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "POST " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve: " + (
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
            "cursor": state.cursor.with(
              {
                "auth_data": auth_data,
                "task_id": null,
                "page_token": null,
              }
            ),
          }
        )
      : (resp.StatusCode == 200) ?
        bytes(resp.Body).decode_json().as(body,
          // Bind the records once; a response without data.records is treated as an
          // empty page so that both the events and the cursor update handle it.
          body.?data.records.orValue([]).as(records,
            (body.data.nextPageToken != "NULL") ?
              // Not the last page - Save the next page token and continue.
              state.with(
                {
                  "events": records.map(e, {"message": e.encode_json()}),
                  "want_more": true,
                  "cursor": state.cursor.with(
                    {
                      "auth_data": auth_data,
                      "page_token": body.data.nextPageToken,
                    }
                  ),
                }
              )
            :
              // Last page - Clear the task ID and page token, and end the sequence.
              // Next sequence starts pulling data from the last timestamp received,
              // or from this timeframe's end when the page has no records.
              state.with(
                {
                  "events": records.map(e, {"message": e.encode_json()}),
                  "want_more": false,
                  "cursor": state.cursor.with(
                    {
                      "auth_data": auth_data,
                      "task_id": null,
                      "page_token": null,
                      "next_startTime": records.size() > 0 ?
                        records.map(t, timestamp(t.time)).max().format(time_layout.RFC3339)
                      :
                        state.cursor.current_endTime,
                      "next_endTime": null,
                    }
                  ),
                }
              )
          )
        )
      :
        // Clear the task ID and page token, and reset the sequence for the same timeframe.
        state.with(
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "POST " + state.url.trim_right("/") + "/app/laas-logs-api/api/logs_query/retrieve: " + (
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
            "cursor": state.cursor.with(
              {
                "auth_data": auth_data,
                "task_id": null,
                "page_token": null,
                "next_startTime": state.cursor.current_startTime,
                "next_endTime": state.cursor.current_endTime,
              }
            ),
          }
        )
      )
  ))
tags:
{{#if preserve_original_event}}
  - preserve_original_event
{{/if}}
{{#each tags as |tag|}}
  - {{tag}}
{{/each}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
processors:
{{processors}}
{{/if}}