SIZE_T postReadCallback()

in src/source/Response.c [608:728]


/**
 * Curl read callback that supplies the body of a streaming upload.
 *
 * Fetches data for the request's upload handle through getKinesisVideoStreamData()
 * (optionally post-processed by curlReadCallbackHookFn) into pBuffer. While the
 * fetch yields zero bytes with STATUS_SUCCESS or STATUS_NO_MORE_DATA_AVAILABLE it
 * is retried up to MAX_GET_DATA_ITER times, sleeping between attempts with a
 * doubling interval that starts at BASE_GET_DATA_SLEEP_TIME.
 *
 * @param pBuffer - destination buffer that receives the data
 * @param size - size of a single item
 * @param numItems - number of items; the buffer capacity is size * numItems
 * @param customData - the PCurlRequest this transfer belongs to
 *
 * @return the number of bytes stored in pBuffer; 0 when the stream has ended or the
 *         request is terminating; CURL_READFUNC_PAUSE when no data is currently
 *         available; CURL_READFUNC_ABORT on invalid arguments, an aborted upload
 *         handle or any unexpected status.
 */
SIZE_T postReadCallback(PCHAR pBuffer, SIZE_T size, SIZE_T numItems, PVOID customData)
{
    DLOGV("postBodyStreamingReadFunc (curl callback) invoked");
    PCurlResponse pCurlResponse = NULL;
    PCurlApiCallbacks pCurlApiCallbacks;
    SIZE_T bufferSize = size * numItems, bytesWritten = 0;
    STATUS retStatus = STATUS_SUCCESS;
    UINT32 retrievedSize = 0;
    UINT64 sleepTime = BASE_GET_DATA_SLEEP_TIME;
    UINT8 iter = 0;
    UPLOAD_HANDLE uploadHandle;
    PCurlRequest pCurlRequest = (PCurlRequest) customData;

    // Without a complete request context there is nothing to read from: abort the transfer.
    // pCurlResponse is still NULL here, so the clean-up section will not dereference it.
    if (pCurlRequest == NULL || pCurlRequest->pCurlResponse == NULL || pCurlRequest->pCurlApiCallbacks == NULL) {
        bytesWritten = CURL_READFUNC_ABORT;
        CHK(FALSE, retStatus);
    }

    pCurlResponse = pCurlRequest->pCurlResponse;
    pCurlApiCallbacks = pCurlRequest->pCurlApiCallbacks;
    uploadHandle = pCurlResponse->pCurlRequest->uploadHandle;

    // Already paused: keep reporting pause without touching the stream.
    if (pCurlResponse->paused) {
        bytesWritten = CURL_READFUNC_PAUSE;
        CHK(FALSE, retStatus);
    }

    // End of stream or termination in progress: return 0 bytes (bytesWritten is still 0).
    if (pCurlResponse->endOfStream || ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating)) {
        DLOGI("Closing connection for upload stream handle: %" PRIu64, uploadHandle);
        CHK(FALSE, retStatus);
    }

    do {
        // First iteration we do not sleep, after that we sleep
        // 20 40 80 160 320 (ms)
        if (iter > 0) {
            THREAD_SLEEP(sleepTime);
            sleepTime *= 2;
        }

        retStatus =
            getKinesisVideoStreamData(pCurlResponse->pCurlRequest->streamHandle, uploadHandle, (PBYTE) pBuffer, (UINT32) bufferSize, &retrievedSize);

        // The optional hook receives the fetch status and may replace both the status and the retrieved size.
        if (pCurlApiCallbacks->curlReadCallbackHookFn != NULL) {
            retStatus = pCurlApiCallbacks->curlReadCallbackHookFn(pCurlResponse, uploadHandle, (PBYTE) pBuffer, (UINT32) bufferSize, &retrievedSize,
                                                                  retStatus);
        }

        bytesWritten = (SIZE_T) retrievedSize;

        // SIZE_T values are widened explicitly so the arguments match the PRIu64 conversions on every platform.
        DLOGV("Get Stream data returned: buffer size: %" PRIu64 " written bytes: %" PRIu64 " for upload handle: %" PRIu64
              " current stream handle: %" PRIu64,
              (UINT64) bufferSize, (UINT64) bytesWritten, uploadHandle, pCurlResponse->pCurlRequest->streamHandle);

        iter++;
    } while (iter < MAX_GET_DATA_ITER && bytesWritten == 0 && (retStatus == STATUS_SUCCESS || retStatus == STATUS_NO_MORE_DATA_AVAILABLE));

    // The return should be OK, no more data or an end of stream
    switch (retStatus) {
        case STATUS_SUCCESS:
        case STATUS_NO_MORE_DATA_AVAILABLE:
            // Media pipeline thread might be blocked due to heap or temporal limit.
            // Pause curl read and wait for persisted ack.
            if (bytesWritten == 0) {
                DLOGD("Pausing CURL read for upload handle: %" PRIu64, uploadHandle);
                bytesWritten = CURL_READFUNC_PAUSE;
            }
            break;

        case STATUS_END_OF_STREAM:
            DLOGI("Reported end-of-stream for stream %s. Upload handle: %" PRIu64, pCurlRequest->streamName, uploadHandle);

            pCurlResponse->endOfStream = TRUE;

            // Output the remaining bytes: bytesWritten is left as retrieved.
            break;

        case STATUS_AWAITING_PERSISTED_ACK:
            // If bytes_written == 0, set it to pause to exit the loop
            if (bytesWritten == 0) {
                DLOGD("Pausing CURL read for upload handle: %" PRIu64 " waiting for last ack.", uploadHandle);
                bytesWritten = CURL_READFUNC_PAUSE;
            }
            break;

        case STATUS_UPLOAD_HANDLE_ABORTED:
            DLOGW("Reported abort-connection for Upload handle: %" PRIu64, uploadHandle);
            bytesWritten = CURL_READFUNC_ABORT;

            // Graceful shutdown as PIC is aware of terminated stream
            pCurlResponse->endOfStream = TRUE;
            break;

        default:
            DLOGE("Failed to get data from the stream with an error: 0x%08x", retStatus);

            // set bytes_written to terminate and close the connection
            bytesWritten = CURL_READFUNC_ABORT;
            break;
    }

CleanUp:

    if (bytesWritten != CURL_READFUNC_ABORT && bytesWritten != CURL_READFUNC_PAUSE) {
        DLOGD("Wrote %" PRIu64 " bytes to Kinesis Video. Upload stream handle: %" PRIu64, (UINT64) bytesWritten, uploadHandle);

        // Optionally mirror the outgoing bytes into the debug dump file; a failure here is only logged.
        if (bytesWritten != 0 && pCurlResponse->debugDumpFile) {
            retStatus = writeFile(pCurlResponse->debugDumpFilePath, TRUE, TRUE, (PBYTE) pBuffer, bytesWritten);
            if (STATUS_FAILED(retStatus)) {
                DLOGW("Failed to write to debug dump file with error: 0x%08x", retStatus);
            }
        }
    } else if (bytesWritten == CURL_READFUNC_PAUSE) {
        // Remember the pause so subsequent invocations return early.
        pCurlResponse->paused = TRUE;
    }

    // Since curl is about to terminate gracefully, set flag to prevent shutdown thread from timing it out.
    if ((bytesWritten == CURL_READFUNC_ABORT || bytesWritten == 0) && pCurlResponse != NULL) {
        ATOMIC_STORE_BOOL(&pCurlResponse->terminated, TRUE);
    }

    return bytesWritten;
}