in v1/src/main/java/com/google/cloud/teleport/templates/common/DatadogConverters.java [203:364]
public PCollectionTuple expand(PCollection<FailsafeElement<String, String>> input) {
return input.apply(
"ConvertToDatadogEvent",
ParDo.of(
new DoFn<FailsafeElement<String, String>, DatadogEvent>() {
@ProcessElement
public void processElement(ProcessContext context) {
String input = context.element().getPayload();
try {
// Start building a DatadogEvent with the payload as the message and a
// default source.
DatadogEvent.Builder builder =
DatadogEvent.newBuilder()
.withMessage(input)
.withSource(DD_DEFAULT_SOURCE);
// We will attempt to parse the input to see
// if it is a valid JSON and if so, whether we can
// extract some additional properties.
try {
JSONObject json = new JSONObject(input);
// If valid JSON, we attempt to treat it as a LogEntry
// See:
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
JSONObject data =
isPubsubMessage(json)
? json.optJSONObject(PUBSUB_MESSAGE_DATA_FIELD)
: json;
boolean dataAvailable = (data != null && !data.isEmpty());
if (dataAvailable) {
// Check if the JSON we receive has a resource object
// See:
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource
JSONObject resource = data.optJSONObject(GCP_RESOURCE_KEY);
boolean resourceAvailable = (resource != null && !resource.isEmpty());
if (resourceAvailable) {
// Check if the resource object has a type string
// If so, convert it to a Datadog source string and add it to the
// DatadogEvent, i.e.
// "gce_instance"
// converts to
// "gcp.gce.instance"
String type = resource.optString(GCP_RESOURCE_TYPE_KEY);
if (!type.isEmpty()) {
String formattedSource =
DD_DEFAULT_SOURCE + "." + type.replaceAll("_", ".");
builder.withSource(formattedSource);
}
// Check if the resource object has a labels object
// If so, convert it to a Datadog tags string and add it to the
// DatadogEvent, i.e.
// {"projectId": "my-project", "instanceId": "12345678901234", "zone":
// "us-central1-a"}
// converts to
// "projectId:my-project,instanceId:12345678901234,zone:us-central1-a"
JSONObject labels = resource.optJSONObject(GCP_RESOURCE_LABELS_KEY);
boolean labelsAvailable = (labels != null && !labels.isEmpty());
if (labelsAvailable) {
List<String> tags = new ArrayList<>();
for (Map.Entry<String, Object> label : labels.toMap().entrySet()) {
String labelName = label.getKey();
String labelValue = label.getValue().toString();
if (labelName.isEmpty() || labelValue.isEmpty()) {
continue;
}
tags.add(String.format("%s:%s", labelName, labelValue));
}
String formattedTags = Joiner.on(",").join(tags);
if (!formattedTags.isEmpty()) {
builder.withTags(formattedTags);
}
}
}
}
// Check if metadata is provided via a nested _metadata
// JSON object
JSONObject metadata = json.optJSONObject(METADATA_KEY);
boolean metadataAvailable = (metadata != null && !metadata.isEmpty());
// For the metadata fields, we only look at the _metadata
// object if present.
// If the metadata has any matching entries, they take precedence
// over anything already in the DatadogEvent
if (metadataAvailable) {
String source = metadata.optString(DD_SOURCE_KEY);
if (!source.isEmpty()) {
builder.withSource(source);
}
String tags = metadata.optString(DD_TAGS_KEY);
if (!tags.isEmpty()) {
builder.withTags(tags);
}
String hostname = metadata.optString(DD_HOSTNAME_KEY);
if (!hostname.isEmpty()) {
builder.withHostname(hostname);
}
String service = metadata.optString(DD_SERVICE_KEY);
if (!service.isEmpty()) {
builder.withService(service);
}
String message = metadata.optString(DD_MESSAGE_KEY);
if (!message.isEmpty()) {
builder.withMessage(message);
}
// We remove the _metadata entry from the payload
// to avoid duplicates in Datadog. The relevant entries
// have been parsed and populated in the DatadogEvent metadata.
json.remove(METADATA_KEY);
// If the message was not overridden in metadata above, use
// the received JSON as message.
if (message.isEmpty()) {
builder.withMessage(json.toString());
}
}
} catch (JSONException je) {
// input is either not a properly formatted JSONObject
// or has other exceptions. In this case, we will
// simply capture the entire input as an 'event' and
// not worry about capturing any specific properties
// (for e.g Timestamp etc).
// We also do not want to LOG this as we might be running
// a pipeline to simply log text entries to Datadog and
// this is expected behavior.
}
context.output(datadogEventOutputTag, builder.build());
CONVERSION_SUCCESS.inc();
} catch (Exception e) {
CONVERSION_ERRORS.inc();
context.output(
datadogDeadletterTag,
FailsafeElement.of(input, input)
.setErrorMessage(e.getMessage())
.setStacktrace(Throwables.getStackTraceAsString(e)));
}
}
})
.withOutputTags(datadogEventOutputTag, TupleTagList.of(datadogDeadletterTag)));
}