public void process()

in canary/consumer-java/src/main/java/com/amazon/kinesis/video/canary/consumer/CanaryFrameProcessor.java [40:145]


    /**
     * Validates a single canary frame against the header embedded at the start of its payload
     * and publishes the results as metrics.
     *
     * <p>The first 24 bytes of the payload are decoded, in order, as: an 8-byte frame timestamp,
     * a 4-byte frame index, a 4-byte frame size and an 8-byte CRC32 value. The CRC is computed
     * over the whole payload with the CRC field itself zeroed out.
     *
     * <p>Each metric is emitted twice, once with the per-stream dimension and once with the
     * aggregated dimension: {@code FrameSizeMatch}, {@code FrameDataMatches},
     * {@code FrameTimeMatchesProducerTimestamp}, {@code FrameDropped} (skipped while
     * {@code lastFrameIndex} is negative) and {@code EndToEndFrameLatency}.
     *
     * @param frame            frame whose payload carries the canary header
     * @param trackMetadata    not used by this processor
     * @param fragmentMetadata metadata of the enclosing fragment; must be present because its
     *                         producer-side timestamp is needed for the timestamp check
     * @param tagProcessor     not used by this processor
     * @throws FrameProcessException if the fragment metadata is absent or the payload is too
     *                               short to contain the canary header
     */
    public void process(Frame frame, MkvTrackMetadata trackMetadata, Optional<FragmentMetadata> fragmentMetadata, Optional<FragmentMetadataVisitor.MkvTagProcessor> tagProcessor) throws FrameProcessException {
        // Fail with the declared exception type instead of an unchecked NoSuchElementException.
        FragmentMetadata metadata = fragmentMetadata.orElseThrow(
                () -> new FrameProcessException("Fragment metadata is required to validate a canary frame"));
        int frameTimeDelta = frame.getTimeCode();
        long fragmentStartTime = metadata.getProducerSideTimestampMillis();

        // Read through a duplicate so the position of the frame's own buffer is left untouched.
        byte[] data = new byte[frame.getFrameData().remaining()];
        frame.getFrameData().duplicate().get(data);

        // Layout of the canary header at the start of the payload.
        final int timeOffset = 0;
        final int indexOffset = timeOffset + Long.BYTES;
        final int sizeOffset = indexOffset + Integer.BYTES;
        final int crcOffset = sizeOffset + Integer.BYTES;
        final int headerLength = crcOffset + Long.BYTES;

        if (data.length < headerLength) {
            throw new FrameProcessException("Canary frame payload is " + data.length
                    + " bytes, expected at least " + headerLength + " bytes of canary header");
        }

        long frameTimeInsideData = Longs.fromByteArray(Arrays.copyOfRange(data, timeOffset, indexOffset));
        int frameIndex = Ints.fromByteArray(Arrays.copyOfRange(data, indexOffset, sizeOffset));
        int frameSize = Ints.fromByteArray(Arrays.copyOfRange(data, sizeOffset, crcOffset));
        long crcValue = Longs.fromByteArray(Arrays.copyOfRange(data, crcOffset, headerLength));

        // The CRC is verified over the payload with the CRC field blanked out.
        Arrays.fill(data, crcOffset, headerLength, (byte) 0);
        CRC32 crc32 = new CRC32();
        crc32.update(data);

        List<MetricDatum> datumList = new ArrayList<>();

        // The embedded size is compared against the full payload length, header included.
        // NOTE(review): an earlier comment here described subtracting the canary metadata size;
        // confirm against the producer which of the two is intended.
        addMetricForAllDimensions(datumList, "FrameSizeMatch", StandardUnit.None,
                frameSize == data.length ? 1.0 : 0.0);

        addMetricForAllDimensions(datumList, "FrameDataMatches", StandardUnit.None,
                crc32.getValue() == crcValue ? 1.0 : 0.0);

        // frameTimeInsideData == producer timestamp of the fragment + frame timecode
        addMetricForAllDimensions(datumList, "FrameTimeMatchesProducerTimestamp", StandardUnit.None,
                frameTimeInsideData == fragmentStartTime + frameTimeDelta ? 1.0 : 0.0);

        // A frame counts as dropped when its index is not lastFrameIndex + 1; the check is skipped
        // while lastFrameIndex is still negative.
        if (lastFrameIndex >= 0) {
            addMetricForAllDimensions(datumList, "FrameDropped", StandardUnit.None,
                    frameIndex != lastFrameIndex + 1 ? 1.0 : 0.0);
        }
        lastFrameIndex = frameIndex;

        // E2E frame latency = currentTime - frameTimeInsideData. The clock is sampled once so the
        // per-stream and aggregated datums report the same value.
        double endToEndLatencyMillis = (double) System.currentTimeMillis() - frameTimeInsideData;
        addMetricForAllDimensions(datumList, "EndToEndFrameLatency", StandardUnit.Milliseconds,
                endToEndLatencyMillis);

        sendMetrics(datumList);
    }

    /** Appends one datum with the per-stream dimension followed by one with the aggregated dimension. */
    private void addMetricForAllDimensions(List<MetricDatum> datumList, String metricName, StandardUnit unit, double value) {
        datumList.add(new MetricDatum()
                .withMetricName(metricName)
                .withUnit(unit)
                .withValue(value)
                .withDimensions(dimensionPerStream));
        datumList.add(new MetricDatum()
                .withMetricName(metricName)
                .withUnit(unit)
                .withValue(value)
                .withDimensions(aggregatedDimension));
    }