public PCollectionTuple expand()

in v1/src/main/java/com/google/cloud/teleport/templates/common/DatadogConverters.java [203:364]


    public PCollectionTuple expand(PCollection<FailsafeElement<String, String>> input) {

      return input.apply(
          "ConvertToDatadogEvent",
          ParDo.of(
                  new DoFn<FailsafeElement<String, String>, DatadogEvent>() {

                    @ProcessElement
                    public void processElement(ProcessContext context) {

                      String input = context.element().getPayload();

                      try {

                        // Start building a DatadogEvent with the payload as the message and a
                        // default source.
                        DatadogEvent.Builder builder =
                            DatadogEvent.newBuilder()
                                .withMessage(input)
                                .withSource(DD_DEFAULT_SOURCE);

                        // We will attempt to parse the input to see
                        // if it is a valid JSON and if so, whether we can
                        // extract some additional properties.
                        try {

                          JSONObject json = new JSONObject(input);

                          // If valid JSON, we attempt to treat it as a LogEntry
                          // See:
                          // https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
                          JSONObject data =
                              isPubsubMessage(json)
                                  ? json.optJSONObject(PUBSUB_MESSAGE_DATA_FIELD)
                                  : json;
                          boolean dataAvailable = (data != null && !data.isEmpty());

                          if (dataAvailable) {
                            // Check if the JSON we receive has a resource object
                            // See:
                            // https://cloud.google.com/logging/docs/reference/v2/rest/v2/MonitoredResource
                            JSONObject resource = data.optJSONObject(GCP_RESOURCE_KEY);
                            boolean resourceAvailable = (resource != null && !resource.isEmpty());

                            if (resourceAvailable) {

                              // Check if the resource object has a type string
                              // If so, convert it to a Datadog source string and add it to the
                              // DatadogEvent, i.e.
                              // "gce_instance"
                              // converts to
                              // "gcp.gce.instance"
                              String type = resource.optString(GCP_RESOURCE_TYPE_KEY);
                              if (!type.isEmpty()) {
                                String formattedSource =
                                    DD_DEFAULT_SOURCE + "." + type.replaceAll("_", ".");
                                builder.withSource(formattedSource);
                              }

                              // Check if the resource object has a labels object
                              // If so, convert it to a Datadog tags string and add it to the
                              // DatadogEvent, i.e.
                              // {"projectId": "my-project", "instanceId": "12345678901234", "zone":
                              // "us-central1-a"}
                              // converts to
                              // "projectId:my-project,instanceId:12345678901234,zone:us-central1-a"
                              JSONObject labels = resource.optJSONObject(GCP_RESOURCE_LABELS_KEY);
                              boolean labelsAvailable = (labels != null && !labels.isEmpty());

                              if (labelsAvailable) {
                                List<String> tags = new ArrayList<>();
                                for (Map.Entry<String, Object> label : labels.toMap().entrySet()) {
                                  String labelName = label.getKey();
                                  String labelValue = label.getValue().toString();
                                  if (labelName.isEmpty() || labelValue.isEmpty()) {
                                    continue;
                                  }

                                  tags.add(String.format("%s:%s", labelName, labelValue));
                                }

                                String formattedTags = Joiner.on(",").join(tags);
                                if (!formattedTags.isEmpty()) {
                                  builder.withTags(formattedTags);
                                }
                              }
                            }
                          }

                          // Check if metadata is provided via a nested _metadata
                          // JSON object
                          JSONObject metadata = json.optJSONObject(METADATA_KEY);
                          boolean metadataAvailable = (metadata != null && !metadata.isEmpty());

                          // For the metadata fields, we only look at the _metadata
                          // object if present.
                          // If the metadata has any matching entries, they take precedence
                          // over anything already in the DatadogEvent
                          if (metadataAvailable) {
                            String source = metadata.optString(DD_SOURCE_KEY);
                            if (!source.isEmpty()) {
                              builder.withSource(source);
                            }

                            String tags = metadata.optString(DD_TAGS_KEY);
                            if (!tags.isEmpty()) {
                              builder.withTags(tags);
                            }

                            String hostname = metadata.optString(DD_HOSTNAME_KEY);
                            if (!hostname.isEmpty()) {
                              builder.withHostname(hostname);
                            }

                            String service = metadata.optString(DD_SERVICE_KEY);
                            if (!service.isEmpty()) {
                              builder.withService(service);
                            }

                            String message = metadata.optString(DD_MESSAGE_KEY);
                            if (!message.isEmpty()) {
                              builder.withMessage(message);
                            }

                            // We remove the _metadata entry from the payload
                            // to avoid duplicates in Datadog. The relevant entries
                            // have been parsed and populated in the DatadogEvent metadata.
                            json.remove(METADATA_KEY);

                            // If the message was not overridden in metadata above, use
                            // the received JSON as message.
                            if (message.isEmpty()) {
                              builder.withMessage(json.toString());
                            }
                          }

                        } catch (JSONException je) {
                          // input is either not a properly formatted JSONObject
                          // or has other exceptions. In this case, we will
                          // simply capture the entire input as an 'event' and
                          // not worry about capturing any specific properties
                          // (for e.g Timestamp etc).
                          // We also do not want to LOG this as we might be running
                          // a pipeline to simply log text entries to Datadog and
                          // this is expected behavior.
                        }

                        context.output(datadogEventOutputTag, builder.build());
                        CONVERSION_SUCCESS.inc();

                      } catch (Exception e) {
                        CONVERSION_ERRORS.inc();
                        context.output(
                            datadogDeadletterTag,
                            FailsafeElement.of(input, input)
                                .setErrorMessage(e.getMessage())
                                .setStacktrace(Throwables.getStackTraceAsString(e)));
                      }
                    }
                  })
              .withOutputTags(datadogEventOutputTag, TupleTagList.of(datadogDeadletterTag)));
    }