PVOID ProducerClientTestBase::basicProducerRoutine()

in tst/ProducerClientBasicTest.cpp [341:463]


PVOID ProducerClientTestBase::basicProducerRoutine(STREAM_HANDLE streamHandle, STREAMING_TYPE streamingType)
{
    if (!IS_VALID_STREAM_HANDLE(streamHandle)) {
        DLOGE("Invalid stream handle");
        return NULL;
    }

    UINT32 index = 0, persistentMetadataIndex = 0;
    UINT64 timestamp = GETTIME();
    Frame frame;
    std::string persistentMetadataName;
    TID tid = GETTID();

    PStreamInfo pStreamInfo;
    EXPECT_EQ(STATUS_SUCCESS, kinesisVideoStreamGetStreamInfo(streamHandle, &pStreamInfo));

    // Set an indicator that the producer is not stopped
    mProducerStopped = FALSE;

    // Loop until cancelled
    frame.version = FRAME_CURRENT_VERSION;
    frame.duration = TEST_FRAME_DURATION;
    frame.frameData = mFrameBuffer;
    frame.trackId = DEFAULT_VIDEO_TRACK_ID;

    MEMSET(frame.frameData, 0x55, mFrameSize);

    BYTE cpd[] = {0x00, 0x00, 0x00, 0x01, 0x67, 0x64, 0x00, 0x34,
                  0xAC, 0x2B, 0x40, 0x1E, 0x00, 0x78, 0xD8, 0x08,
                  0x80, 0x00, 0x01, 0xF4, 0x00, 0x00, 0xEA, 0x60,
                  0x47, 0xA5, 0x50, 0x00, 0x00, 0x00, 0x01, 0x68,
                  0xEE, 0x3C, 0xB0};
    UINT32 cpdSize = SIZEOF(cpd);

    EXPECT_EQ(STATUS_SUCCESS, kinesisVideoStreamFormatChanged(streamHandle, cpdSize, cpd, DEFAULT_VIDEO_TRACK_ID));

    while (!mStopProducer) {
        // Produce frames
        if (IS_OFFLINE_STREAMING_MODE(streamingType)) {
            timestamp += frame.duration;
        } else {
            timestamp = GETTIME();
        }

        frame.index = index++;
        frame.decodingTs = timestamp;
        frame.presentationTs = timestamp;
        frame.size = mFrameSize;

        // Add small variation to the frame size (if larger frames
        if (frame.size > 100) {
            frame.size -= RAND() % 100;
        }

        // Key frame every 50th
        frame.flags = (frame.index % TEST_KEY_FRAME_INTERVAL == 0) ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;

        DLOGD("Putting frame for stream: %s, TID: 0x%" PRIx64 ", Id: %u, Key Frame: %s, Size: %u, Dts: %" PRIu64 ", Pts: %" PRIu64,
              pStreamInfo->name,
              tid,
              frame.index,
              (((frame.flags & FRAME_FLAG_KEY_FRAME) == FRAME_FLAG_KEY_FRAME) ? "true" : "false"),
              frame.size,
              frame.decodingTs,
              frame.presentationTs);

        // Apply some non-persistent metadata every few frames
        if (frame.index % 20 == 0) {
            std::ostringstream metadataName;
            std::ostringstream metadataValue;

            metadataName << "MetadataNameForFrame_" << frame.index;
            metadataValue << "MetadataValueForFrame_" << frame.index;
            EXPECT_EQ(STATUS_SUCCESS, putKinesisVideoFragmentMetadata(streamHandle,
                                                                      (PCHAR) metadataName.str().c_str(),
                                                                      (PCHAR) metadataValue.str().c_str(),
                                                                      FALSE));
        }

        // Apply some persistent metadata on a larger intervals to span fragments
        if (frame.index % 60 == 0) {
            std::ostringstream metadataName;
            std::ostringstream metadataValue;

            metadataName << "PersistentMetadataName_" << persistentMetadataIndex;

            // Set or clear persistent metadata every other time.
            if (persistentMetadataIndex % 2 == 0) {
                persistentMetadataName = metadataName.str();
                metadataValue << "PersistentMetadataValue_" << persistentMetadataIndex;
            }

            persistentMetadataIndex++;
            EXPECT_EQ(STATUS_SUCCESS, putKinesisVideoFragmentMetadata(streamHandle,
                                                                      (PCHAR) persistentMetadataName.c_str(),
                                                                      (PCHAR) metadataValue.str().c_str(),
                                                                      TRUE));
        }

#if 0
            // Simulate EoFr first
if (frame.index % 50 == 0 && frame.index != 0) {
Frame eofr = EOFR_FRAME_INITIALIZER;
EXPECT_TRUE(kinesis_video_stream->putFrame(eofr));
}
#endif

        EXPECT_EQ(STATUS_SUCCESS, putKinesisVideoFrame(streamHandle, &frame));

        // Sleep a while for non-offline modes
        if (streamingType != STREAMING_TYPE_OFFLINE) {
            THREAD_SLEEP(TEST_FRAME_DURATION);
        }
    }

    DLOGD("Stopping the stream: %s", pStreamInfo->name);
    EXPECT_EQ(STATUS_SUCCESS, stopKinesisVideoStreamSync(streamHandle)) << "Timed out awaiting for the stream stop notification";

    // Indicate that the producer routine had stopped
    mProducerStopped = true;

    return NULL;
}