in pipelines/clickstream_analytics_java/src/main/java/com/google/cloud/dataflow/solutions/clickstream_analytics/JsonToTableRows.java [61:83]
/**
 * Decodes one JSON string into a {@code TableRow} and routes it to the matching output.
 *
 * <p>Exactly one output is produced per element:
 *
 * <ul>
 *   <li>If the UTF-8 encoded size is at or above {@code MESSAGE_LIMIT_SIZE}, the original string
 *       is emitted to {@code FAILURE_TAG} under the key {@code "TooBigRow"} and no decoding is
 *       attempted.
 *   <li>If decoding throws an {@link IOException}, the original string is emitted to {@code
 *       FAILURE_TAG} under the key {@code "JsonParseError"}.
 *   <li>Otherwise the decoded row is emitted to the main output.
 * </ul>
 *
 * <p>The corresponding counter in {@code Metrics} is incremented for each outcome.
 *
 * @param context process context supplying the input element and receiving the outputs
 */
public void processElement(ProcessContext context) {
  String jsonString = context.element();
  byte[] messageBytes = jsonString.getBytes(StandardCharsets.UTF_8);

  if (messageBytes.length >= JsonToTableRows.MESSAGE_LIMIT_SIZE) {
    LOG.error("Row is too big row, size {} bytes", messageBytes.length);
    Metrics.tooBigMessages.inc();
    context.output(FAILURE_TAG, KV.of("TooBigRow", jsonString));
    // Stop here: without this return the oversized element would also be decoded, counted as
    // successful and emitted on the main output, defeating the size limit.
    return;
  }

  try (InputStream inputStream = new ByteArrayInputStream(messageBytes)) {
    TableRow row = TableRowJsonCoder.of().decode(inputStream, Context.OUTER);
    Metrics.successfulMessages.inc();
    context.output(row);
  } catch (IOException e) {
    // Pass the exception itself so the stack trace and cause are preserved in the log.
    LOG.error("Failed to decode JSON message into a TableRow", e);
    Metrics.jsonParseErrorMessages.inc();
    context.output(FAILURE_TAG, KV.of("JsonParseError", jsonString));
  }
}