public void processElement()

in pipelines/clickstream_analytics_java/src/main/java/com/google/cloud/dataflow/solutions/clickstream_analytics/JsonToTableRows.java [61:83]


        /**
         * Converts one JSON string element into a {@code TableRow}.
         *
         * <p>Each element is routed to exactly one output:
         * <ul>
         *   <li>elements whose UTF-8 encoding is at least
         *       {@code JsonToTableRows.MESSAGE_LIMIT_SIZE} bytes go to {@code FAILURE_TAG}
         *       keyed as {@code "TooBigRow"} and are not parsed;</li>
         *   <li>elements that fail to decode go to {@code FAILURE_TAG} keyed as
         *       {@code "JsonParseError"};</li>
         *   <li>all other elements are emitted to the main output as a decoded row.</li>
         * </ul>
         *
         * @param context the process context supplying the input element and the outputs
         */
        public void processElement(ProcessContext context) {
            String jsonString = context.element();
            byte[] messageBytes = jsonString.getBytes(StandardCharsets.UTF_8);

            if (messageBytes.length >= JsonToTableRows.MESSAGE_LIMIT_SIZE) {
                LOG.error("Row is too big row, size {} bytes", messageBytes.length);
                Metrics.tooBigMessages.inc();
                context.output(FAILURE_TAG, KV.of("TooBigRow", jsonString));
                // Stop here: without this return the oversized element would fall through to
                // the decode step below and be emitted a second time (as a success or as a
                // parse failure), defeating the size guard.
                return;
            }

            try (InputStream inputStream = new ByteArrayInputStream(messageBytes)) {
                TableRow row = TableRowJsonCoder.of().decode(inputStream, Context.OUTER);
                Metrics.successfulMessages.inc();
                context.output(row);
            } catch (IOException e) {
                // Pass the exception itself so the stack trace is kept and a null message
                // does not produce an empty log line.
                LOG.error("Failed to decode JSON message: {}", e.getMessage(), e);
                Metrics.jsonParseErrorMessages.inc();
                context.output(FAILURE_TAG, KV.of("JsonParseError", jsonString));
            }
        }