in jobs-service/jobs-recipients/job-sink-recipient/runtime/src/main/java/org/kie/kogito/job/sink/recipient/SinkJobExecutor.java [119:158]
/**
 * Builds the HTTP request that delivers the recipient's CloudEvent in structured mode, i.e. the whole
 * event is serialized with the JSON event format and sent as the request body.
 * <p>
 * The event is first assembled as a v1 CloudEvent and, when the recipient asks for spec version 0.3,
 * converted to a v0.3 event right before serialization.
 *
 * @param recipient the sink recipient providing the CloudEvent attributes, extensions and payload.
 * @param sinkUrl the url the request is addressed to.
 * @param method the HTTP method to use.
 * @param limit value for the "limit" extension attribute; when null the extension is not set.
 * @return the request with the JSON content type header and the serialized event as body.
 */
private HTTPRequest buildStructuredRequest(SinkRecipient<?> recipient, String sinkUrl, HTTPRequest.HTTPMethod method, String limit) {
    HTTPRequest.Builder httpRequest = HTTPRequest.builder()
            .url(sinkUrl)
            .method(method)
            .addHeader(HttpHeaders.CONTENT_TYPE.toString(), JsonFormat.CONTENT_TYPE);

    // Attributes that are always set on the event.
    CloudEventBuilder ceBuilder = CloudEventBuilder.v1()
            .withType(recipient.getCeType())
            .withId(buildRandomId())
            .withSource(recipient.getCeSource())
            .withTime(OffsetDateTime.now());

    // Optional attributes are only set when the recipient provides them.
    if (null != recipient.getCeDataContentType()) {
        ceBuilder.withDataContentType(recipient.getCeDataContentType());
    }
    if (null != recipient.getCeDataSchema()) {
        ceBuilder.withDataSchema(recipient.getCeDataSchema());
    }
    if (null != recipient.getCeSubject()) {
        ceBuilder.withSubject(recipient.getCeSubject());
    }

    // Recipient extensions go first, so an explicit limit set afterwards wins over a same-named entry.
    filterEntries(recipient.getCeExtensions())
            .forEach((extensionName, extensionValue) -> ceBuilder.withExtension(extensionName, extensionValue.toString()));
    if (limit != null) {
        ceBuilder.withExtension("limit", limit);
    }

    // Only byte[] and JsonNode payloads are attached, any other kind of data (or no payload) leaves the event without data.
    Object payloadData = recipient.getPayload() == null ? null : recipient.getPayload().getData();
    if (payloadData instanceof byte[]) {
        ceBuilder.withData((byte[]) payloadData);
    } else if (payloadData instanceof JsonNode) {
        ceBuilder.withData(JsonCloudEventData.wrap((JsonNode) payloadData));
    }

    CloudEvent v1Event = ceBuilder.build();
    CloudEvent eventToSend = SpecVersion.V03 == recipient.getCeSpecVersion()
            ? CloudEventBuilder.v03(v1Event).build()
            : v1Event;

    byte[] serializedEvent = EventFormatProvider.getInstance()
            .resolveFormat(JsonFormat.CONTENT_TYPE)
            .serialize(eventToSend);
    return httpRequest.body(serializedEvent).build();
}