in canary/consumer-java/src/main/java/com/amazon/kinesis/video/canary/consumer/CanaryFrameProcessor.java [40:145]
/**
 * Validates a single decoded canary frame and publishes verification metrics to CloudWatch.
 *
 * <p>The producer embeds a small metadata header at the start of each frame's payload:
 * <pre>
 *   bytes [0, 8)   producer-side frame timestamp (millis, big-endian long)
 *   bytes [8, 12)  monotonically increasing frame index (big-endian int)
 *   bytes [12, 16) declared frame size (big-endian int)
 *   bytes [16, 24) CRC32 of the full payload with this CRC field zeroed (big-endian long)
 * </pre>
 * For each check, a metric datum is emitted twice: once with the per-stream dimension and
 * once with the aggregated dimension.
 *
 * @param frame            decoded MKV frame whose payload carries the canary header above
 * @param trackMetadata    track metadata (unused here, required by the processor interface)
 * @param fragmentMetadata fragment metadata; the producer-side timestamp is read from it
 * @param tagProcessor     optional tag processor (unused here)
 * @throws FrameProcessException propagated from frame processing
 */
public void process(Frame frame, MkvTrackMetadata trackMetadata, Optional<FragmentMetadata> fragmentMetadata, Optional<FragmentMetadataVisitor.MkvTagProcessor> tagProcessor) throws FrameProcessException {
    int frameTimeDelta = frame.getTimeCode();
    // NOTE(review): unchecked Optional.get(); assumes fragment metadata is always present
    // for canary streams — confirm upstream guarantees this before hardening.
    long fragmentStartTime = fragmentMetadata.get().getProducerSideTimestampMillis();

    byte[] data = new byte[frame.getFrameData().remaining()];
    frame.getFrameData().get(data);

    // Parse the embedded canary header (layout documented above).
    int offset = 0;
    long frameTimeInsideData = Longs.fromByteArray(Arrays.copyOfRange(data, offset, offset + Long.BYTES));
    offset += Long.BYTES;
    int frameIndex = Ints.fromByteArray(Arrays.copyOfRange(data, offset, offset + Integer.BYTES));
    offset += Integer.BYTES;
    int frameSize = Ints.fromByteArray(Arrays.copyOfRange(data, offset, offset + Integer.BYTES));
    offset += Integer.BYTES;
    long crcValue = Longs.fromByteArray(Arrays.copyOfRange(data, offset, offset + Long.BYTES));
    // The producer computed the CRC with the CRC field zeroed, so zero it before recomputing.
    Arrays.fill(data, offset, offset + Long.BYTES, (byte) 0);
    CRC32 crc32 = new CRC32();
    crc32.update(data);

    List<MetricDatum> datumList = new ArrayList<>();

    // 1.0 = declared frame size matches the received payload size, 0 = mismatch.
    addMetricPair(datumList, "FrameSizeMatch", StandardUnit.None,
            frameSize == data.length ? 1.0 : 0);

    // 1.0 = recomputed CRC32 matches the producer's CRC, 0 = payload corruption.
    addMetricPair(datumList, "FrameDataMatches", StandardUnit.None,
            crc32.getValue() == crcValue ? 1.0 : 0);

    // 1.0 = embedded frame timestamp equals fragment producer timestamp + frame timecode.
    addMetricPair(datumList, "FrameTimeMatchesProducerTimestamp", StandardUnit.None,
            frameTimeInsideData == fragmentStartTime + frameTimeDelta ? 1.0 : 0);

    // Gap detection: only meaningful once a previous index has been observed
    // (lastFrameIndex starts negative). 1.0 = at least one frame was skipped.
    if (lastFrameIndex >= 0) {
        addMetricPair(datumList, "FrameDropped", StandardUnit.None,
                frameIndex != lastFrameIndex + 1 ? 1.0 : 0);
    }
    lastFrameIndex = frameIndex;

    // End-to-end latency = consumer receive time - producer-side embedded timestamp.
    // Sampled once so the per-stream and aggregated datums carry the same value.
    double latencyMillis = (double) System.currentTimeMillis() - frameTimeInsideData;
    addMetricPair(datumList, "EndToEndFrameLatency", StandardUnit.Milliseconds, latencyMillis);

    sendMetrics(datumList);
}

/**
 * Appends the same metric twice to {@code datumList}: once tagged with the per-stream
 * dimension and once with the aggregated dimension.
 *
 * @param datumList  accumulator the two datums are appended to
 * @param metricName CloudWatch metric name
 * @param unit       CloudWatch unit for the value
 * @param value      metric value (1.0/0 for boolean checks, millis for latency)
 */
private void addMetricPair(List<MetricDatum> datumList, String metricName, StandardUnit unit, double value) {
    datumList.add(new MetricDatum()
            .withMetricName(metricName)
            .withUnit(unit)
            .withValue(value)
            .withDimensions(dimensionPerStream));
    datumList.add(new MetricDatum()
            .withMetricName(metricName)
            .withUnit(unit)
            .withValue(value)
            .withDimensions(aggregatedDimension));
}