packages/ess_billing/data_stream/billing/agent/stream/cel.yml.hbs (135 lines of code) (raw):
# CEL input settings for the ESS billing data stream.
config_version: 2
# Period between evaluations of the program; the program below requests one
# 24h window of data per evaluation.
interval: "24h"
# Optional HTTP request tracing, only emitted when enable_request_tracer is set.
{{#if enable_request_tracer}}
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
resource.tracer.maxbackups: 5
{{/if}}
# Optional HTTP client settings; each one is emitted only when the
# corresponding variable is provided.
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
# Per-instance costs endpoint of the billing API for the configured
# organization. The program reads this value as state.url.
resource.url: "{{url}}/api/v2/billing/organizations/{{organization_id}}/costs/instances"
# Initial state handed to the program.
state:
# Sent by the program as "Authorization: ApiKey <api_key>".
api_key: {{api_key}}
# Copied by the program into every emitted billing event.
organization_id: "{{organization_id}}"
# Number of days to look back on the first run, while no cursor exists yet
# (the program multiplies it by 24 to build an hour duration).
lookbehind: {{lookbehind}}
# State fields listed for redaction; presumably keeps the API key out of
# logged state (confirm against the CEL input documentation).
redact:
fields:
- api_key
program: |
  // Fetches per-instance billing costs one 24h window at a time.
  //
  // Window selection:
  //   - "from" is cursor.last_to when a cursor exists, otherwise
  //     now() minus state.lookbehind days.
  //   - "to" is "from" plus 24h.
  //   Both are truncated to a date by formatting with DateOnly and parsing
  //   the result back.
  //
  // Result:
  //   - events:    one event per product line item of every instance,
  //                a placeholder event when the window has no data,
  //                or a single error object on a non-200 response.
  //   - cursor:    last_to, the end of the last window that was requested.
  //   - want_more: true while the requested window ends more than 24h
  //                before now(), so that a backlog is drained without
  //                waiting for the next interval.
  state.with({
    "from": (has(state.?cursor.last_to) ?
      timestamp(state.cursor.last_to)
    :
      (now() - duration(string(int(state.lookbehind)*24) + "h"))
    ).format(time_layout.DateOnly).parse_time(time_layout.DateOnly),
    "to": (has(state.?cursor.last_to) ?
      (timestamp(state.cursor.last_to) + duration("24h"))
    :
      (now() - duration(string(int(state.lookbehind)*24) + "h") + duration("24h"))
    ).format(time_layout.DateOnly).parse_time(time_layout.DateOnly),
  }.as(req, (req.to > now()) ?
    // We would fetch data in the future, back off
    {
      "events": [],
      "cursor": {
        // We don't change the last_to, we're just waiting.
        // req.to is already 24h past the stored last_to, so storing it here
        // would push the cursor one day further into the future on every
        // back-off and the pending window would never be fetched.
        // req.from is the start of the pending window, so the same window is
        // retried on the next evaluation.
        "last_to": req.from
      },
      "want_more": false,
    }
  :
    get_request(state.url + "?" +
      {
        "from": [req.from.format(time_layout.RFC3339)],
        "to": [req.to.format(time_layout.RFC3339)],
      }.format_query()
    ).with({
      "Header": {
        "Authorization": ["ApiKey " + state.api_key]
      }
    }).do_request().as(resp, resp.StatusCode == 200 ?
      (
        // Response is successful, but did we get any data?
        bytes(resp.Body).decode_json().as(body, has(body.instances) && size(body.instances) > 0 ?
          {
            // Every product line item of every instance becomes its own
            // event, enriched with the deployment, the organization and
            // the requested window.
            "events": body.instances.map(instance, instance.product_line_items.map(line_item,
              {
                "ess": {
                  "billing": line_item.with({
                    "deployment_name": instance.name, // Include deployment name
                    "deployment_id": instance.id, // Include deployment ID
                    "organization_id": state.organization_id, // Include organization ID
                    "from": req.from.format(time_layout.RFC3339),
                    "to": req.to.format(time_layout.RFC3339),
                  })
                }
              }
            )).flatten(), // Pass line_items as events, with added info
            "cursor": {
              "last_to": req.to
            },
            // Are we more than 1 day behind?
            "want_more": req.to < now() - duration("24h"),
          }
        :
          // We don't have any data, but we still need to return an event
          // Otherwise the "want_more" logic will not work
          {
            "events": [{
              "fake": true, // This will either way be dropped by the ingest pipeline
            }],
            "cursor": {
              "last_to": req.to
            },
            // Are we more than 1 day behind?
            "want_more": req.to < now() - duration("24h"),
          }
        )
      )
    :
      // Response was not successful, return an error event
      {
        "events": {
          "error": {
            "code": string(resp.StatusCode),
            "id": string(resp.Status),
            // Prefer the response body as the message; fall back to the
            // status line when the body is empty.
            "message": "GET " + resp.Request.URL + ": " +
            (
              (size(resp.Body) != 0) ?
                string(resp.Body)
              :
                string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
            ),
          },
        },
        // NOTE(review): this moves last_to past a window that failed. It is
        // harmless only if the input ignores the cursor when "events" is a
        // single error object; confirm against the CEL input documentation.
        "cursor": {
          "last_to": req.to
        },
        "want_more": req.to < now() - duration("24h"),
      }
    ))
  )
# The tags key is emitted once, when there is at least one configured tag or
# when preserve_original_event is enabled (which adds a tag of its own).
{{#if tags.length}}
tags:
{{else if preserve_original_event}}
tags:
{{/if}}
{{#each tags as |tag i|}}
- {{tag}}
{{/each}}
{{#if preserve_original_event}}
- preserve_original_event
{{/if}}
# When the configured tags contain "forwarded", host enrichment by the
# publisher pipeline is disabled.
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
# Processor definitions supplied through the processors variable are inserted
# as provided.
{{#if processors}}
processors:
{{processors}}
{{/if}}