packages/auth0/data_stream/logs/agent/stream/cel.yml.hbs (135 lines of code) (raw):
config_version: 2
interval: {{interval}}
{{#if enable_request_tracer}}
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
request.tracer.maxbackups: 5
{{/if}}
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
resource.url: {{url}}
state:
client_id: {{client_id}}
client_secret: {{client_secret}}
look_back: {{initial_interval}}
want_more: false
take: {{batch_size}}
redact:
fields:
- client_secret
program: |
state.with(
post(state.url.trim_right("/") + "/oauth/token", "application/json", {
"client_id": state.client_id,
"client_secret": state.client_secret,
"audience": state.url.trim_right("/") + "/api/v2/",
"grant_type": "client_credentials",
}.encode_json()).as(auth_resp, auth_resp.StatusCode != 200 ?
{
"events": {
"error": {
"code": string(auth_resp.StatusCode),
"id": string(auth_resp.Status),
"message": "POST:"+(
size(auth_resp.Body) != 0 ?
string(auth_resp.Body)
:
string(auth_resp.Status) + ' (' + string(auth_resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
:
{
"Body": bytes(auth_resp.Body).decode_json(),
}
).as(token, has(token.events) ? token :
get_request(
state.?next.orValue(
has(state.?cursor.next) ?
// Use the cursor next rel link if it exists.
state.cursor.next.parse_url().as(next, next.with({
// The next rel link includes the take parameter which the
// user may have changed, so replace it with the config's
// value.
"RawQuery": next.RawQuery.parse_query().with({
?"take": has(state.take) ?
optional.of([string(state.take)])
:
optional.none(),
}).format_query()
}).format_url())
:
// Otherwise construct a next rel-ish link to look back.
state.url.trim_right("/") + "/api/v2/logs?" + {
?"take": has(state.take) ?
optional.of([string(state.take)])
:
optional.none(),
?"from": has(state.look_back) ?
// Format a relative timestamp into a log ID.
optional.of(["900" + (now-duration(state.look_back)).format("20060102150405") + "000000000000000000000000000000000000000"])
:
optional.none(),
}.format_query()
)
).with({
"Header": {
"Authorization": [token.?Body.token_type.orValue("Bearer") + " " + token.?Body.access_token.orValue("MISSING")],
"Accept": ["application/json"],
}
}).do_request().as(resp, resp.StatusCode != 200 ?
{
"events": {
"error": {
"code": string(resp.StatusCode),
"id": string(resp.Status),
"message": "GET:"+(
size(resp.Body) != 0 ?
string(resp.Body)
:
string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
),
},
},
"want_more": false,
}
:
{
"Body": bytes(resp.Body).decode_json(),
?"next": resp.Header.?Link[0].orValue("").as(next, next.split(";").as(attrs, attrs.exists(attr, attr.contains('rel="next"')) ?
attrs.map(attr, attr.matches("^<https?://"), attr.trim_prefix('<').trim_suffix('>'))[?0]
:
optional.none()
)),
}.as(result, result.with({
"events": result.Body.map(e, {"json": {"log_id": e.log_id, "data": e}}),
"cursor": {
?"next": result.?next,
},
"want_more": has(result.next) && size(result.Body) != 0,
})).drop("Body")
)
)
)
tags:
{{#if preserve_original_event}}
- preserve_original_event
{{/if}}
{{#each tags as |tag|}}
- {{tag}}
{{/each}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
processors:
{{processors}}
{{/if}}