packages/claroty_ctd/data_stream/asset/agent/stream/cel.yml.hbs (150 lines of code) (raw):
# CEL input settings rendered from the integration policy variables.
config_version: 2
# How often the CEL program below is evaluated.
interval: {{interval}}
# Optional HTTP request tracing. Both tracer options must live under the
# resource.tracer namespace, which is the one the CEL input reads.
{{#if enable_request_tracer}}
resource.tracer.filename: "../../logs/cel/http-request-trace-*.ndjson"
resource.tracer.maxbackups: 5
{{/if}}
# Optional HTTP client settings; each key is emitted only when the matching
# policy variable is set.
{{#if proxy_url}}
resource.proxy_url: {{proxy_url}}
{{/if}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
{{#if http_client_timeout}}
resource.timeout: {{http_client_timeout}}
{{/if}}
# Base URL of the API; the program reads it as state.url and appends the
# endpoint paths to it.
resource.url: {{url}}
# Initial state handed to the CEL program on its first evaluation.
state:
  # Credentials posted as JSON to the /auth/authenticate endpoint.
  username: {{username}}
  password: {{password}}
  # Sent as the per_page query parameter of every asset request.
  batch_size: {{batch_size}}
  # Pagination bookkeeping: the page to request next, the number of assets
  # counted so far in the current run, and whether more pages remain (when
  # true the program skips authentication and keeps paging).
  page: 1
  counter: 0
  want_more: false
program: |
  // Fetch assets from <url>/ranger/assets one page per evaluation.
  //
  // Step 1: unless a pagination run is in progress (state.want_more), log in
  //         via POST <url>/auth/authenticate and merge the returned token
  //         into the state.
  // Step 2: GET one page of assets, emit every object as an event, and return
  //         the bookkeeping (page, counter, want_more, cursor) that the next
  //         evaluation needs.
  state.with(state.?want_more.orValue(false) ? {} :
    post_request(
      state.url + "/auth/authenticate",
      "",
      {"username":state.username,"password":state.password}.encode_json()
    ).with({
      "Header": {
        "Accept": ["application/json"],
        "Content-Type": ["application/json"]
      },
    }).do_request().as(resp, resp.StatusCode == 200 ?
      // Successful login: keep the token and start a fresh count.
      bytes(resp.Body).decode_json().as(body, {
        "access_token": body.token,
        "page" : state.page,
        "counter": 0,
      })
    :
      // NOTE(review): on a failed login the GET below is still issued with an
      // empty Authorization header, and its result replaces this error
      // event. Confirm whether the auth failure should short-circuit instead.
      {
        "events": {
          "error": {
            "code": string(resp.StatusCode),
            "id": string(resp.Status),
            "message": "POST:"+(
              size(resp.Body) != 0 ?
                string(resp.Body)
              :
                string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
            ),
          }
        }
      }
  )).as(state ,
    request(
      "GET",
      state.url + "/ranger/assets?" + {
        "page" : [string(state.page)],
        "per_page" : [string(state.batch_size)],
        // First request of a run filters on the stored cursor; continuation
        // pages use first_updated when it is present, otherwise no filter is
        // sent. Presumably this keeps the filter stable while paging;
        // TODO confirm the case where a cursor exists but first_updated
        // does not.
        ?"last_updated__gt":(!state.?want_more.orValue(false) && has(state.?cursor.last_updated)) ?
          optional.of([string(state.cursor.last_updated)])
        : has(state.first_updated) ?
          optional.of([string(state.first_updated)])
        :
          optional.none()
      }.format_query()
    ).with({
      "Header": {
        "Authorization": [state.?access_token.orValue("")],
        "Accept" : ["application/json"]
      },
    }).do_request().as(resp, resp.StatusCode == 200 ?
      bytes(resp.Body).decode_json().as(body, {
        // One event per returned object, carrying the object re-encoded as JSON.
        "events": has(body.objects) && body.objects != null ?
          body.objects.map(e, {
            "message": e.encode_json(),
          })
        :
          [],
        // Advance the cursor to the newest last_updated seen so far (the
        // previous cursor value takes part in the max); an empty page leaves
        // the cursor unchanged.
        "cursor": {
          ?"last_updated": (has(body.count_in_page) && int(body.count_in_page) > 0) ?
            optional.of(string(([?state.?cursor.last_updated] + body.objects.map(e, e.last_updated)).map(t, timestamp(t)).max()))
          :
            state.?cursor.last_updated,
        },
        // "More pages remain" is expressed throughout as
        //   counter + count_in_page < count_total
        // with a missing count_total treated as -1, i.e. never more pages.
        ?"first_updated": (!has(state.first_updated) && has(body.objects) && (int(state.counter) + int(body.?count_in_page.orValue(0))) < int(body.?count_total.orValue(-1))) ?
          optional.none()
        : state.want_more && (int(state.counter) + int(body.?count_in_page.orValue(0))) < int(body.?count_total.orValue(-1)) ?
          optional.of(state.first_updated)
        :
          state.?cursor.last_updated,
        "username" : state.username,
        "password" : state.password,
        "batch_size" : state.batch_size,
        "access_token": state.access_token,
        "url" : state.url,
        // Running total while paging; reset to 0 once the last page is read.
        "counter": (int(state.counter) + int(body.?count_in_page.orValue(0))) < int(body.?count_total.orValue(-1)) ?
          int(state.counter) + int(body.count_in_page)
        :
          0,
        "want_more" : has(body.count_in_page) && has(body.count_total) && (int(state.counter) + int(body.?count_in_page.orValue(0))) < int(body.?count_total.orValue(-1)),
        // Next page number while paging, otherwise back to "1".
        "page" : string(int(
          has(body.objects) && (int(state.counter) + int(body.?count_in_page.orValue(0))) < int(body.?count_total.orValue(-1)) ?
            state.page
          :
            "0"
        ) + 1),
      })
    :
      {
        "events": {
          "error": {
            "code": string(resp.StatusCode),
            "id": string(resp.Status),
            "message": "GET:"+(
              size(resp.Body) != 0 ?
                string(resp.Body)
              :
                string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
            ),
          }
        },
        // Abort the pagination run; the next evaluation re-authenticates.
        "want_more": false,
        "batch_size": state.batch_size,
        // Restart from page 1, matching the initial state and the reset on
        // the success path. This value is sent verbatim as the page query
        // parameter of the next request, so 0 would ask for page 0.
        "page": 1,
        "counter": 0,
        "username": state.username,
        "password": state.password
      }
    )
  )
# Tag list: the two preserve_* entries are emitted only when the matching
# policy option is set, followed by every user-supplied tag.
tags:
{{#if preserve_original_event}}
- preserve_original_event
{{/if}}
{{#if preserve_duplicate_custom_fields}}
- preserve_duplicate_custom_fields
{{/if}}
{{#each tags as |tag|}}
- {{tag}}
{{/each}}
# Emitted only when the user-supplied tags contain "forwarded".
# NOTE(review): presumably this stops the agent's own host metadata from being
# added to forwarded events; confirm against the publisher pipeline docs.
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
# User-supplied processors are inserted verbatim under the processors key.
{{#if processors}}
processors:
{{processors}}
{{/if}}