in src/source/Response.c [608:728]
SIZE_T postReadCallback(PCHAR pBuffer, SIZE_T size, SIZE_T numItems, PVOID customData)
{
DLOGV("postBodyStreamingReadFunc (curl callback) invoked");
PCurlResponse pCurlResponse = NULL;
PCurlApiCallbacks pCurlApiCallbacks;
SIZE_T bufferSize = size * numItems, bytesWritten = 0;
STATUS retStatus = STATUS_SUCCESS;
UINT32 retrievedSize = 0;
UINT64 sleepTime = BASE_GET_DATA_SLEEP_TIME;
UINT8 iter = 0;
UPLOAD_HANDLE uploadHandle;
PCurlRequest pCurlRequest = (PCurlRequest) customData;
if (pCurlRequest == NULL || pCurlRequest->pCurlResponse == NULL || pCurlRequest->pCurlApiCallbacks == NULL) {
bytesWritten = CURL_READFUNC_ABORT;
CHK(FALSE, retStatus);
}
pCurlResponse = pCurlRequest->pCurlResponse;
pCurlApiCallbacks = pCurlRequest->pCurlApiCallbacks;
uploadHandle = pCurlResponse->pCurlRequest->uploadHandle;
if (pCurlResponse->paused) {
bytesWritten = CURL_READFUNC_PAUSE;
CHK(FALSE, retStatus);
}
if (pCurlResponse->endOfStream || ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating)) {
DLOGI("Closing connection for upload stream handle: %" PRIu64, uploadHandle);
CHK(FALSE, retStatus);
}
do {
// First iteration we do not sleep, after that we sleep
// 20 40 80 160 320 (ms)
if (iter > 0) {
THREAD_SLEEP(sleepTime);
sleepTime *=2;
}
retStatus =
getKinesisVideoStreamData(pCurlResponse->pCurlRequest->streamHandle, uploadHandle, (PBYTE) pBuffer, (UINT32) bufferSize, &retrievedSize);
if (pCurlApiCallbacks->curlReadCallbackHookFn != NULL) {
retStatus = pCurlApiCallbacks->curlReadCallbackHookFn(pCurlResponse, uploadHandle, (PBYTE) pBuffer, (UINT32) bufferSize, &retrievedSize,
retStatus);
}
bytesWritten = (SIZE_T) retrievedSize;
DLOGV("Get Stream data returned: buffer size: %u written bytes: %u for upload handle: %" PRIu64 " current stream handle: %" PRIu64,
bufferSize, bytesWritten, uploadHandle, pCurlResponse->pCurlRequest->streamHandle);
iter++;
} while(iter < MAX_GET_DATA_ITER && bytesWritten == 0 && (retStatus == STATUS_SUCCESS || retStatus == STATUS_NO_MORE_DATA_AVAILABLE));
// The return should be OK, no more data or an end of stream
switch (retStatus) {
case STATUS_SUCCESS:
case STATUS_NO_MORE_DATA_AVAILABLE:
// Media pipeline thread might be blocked due to heap or temporal limit.
// Pause curl read and wait for persisted ack.
if (bytesWritten == 0) {
DLOGD("Pausing CURL read for upload handle: %" PRIu64, uploadHandle);
bytesWritten = CURL_READFUNC_PAUSE;
}
break;
case STATUS_END_OF_STREAM:
DLOGI("Reported end-of-stream for stream %s. Upload handle: %" PRIu64, pCurlRequest->streamName, uploadHandle);
pCurlResponse->endOfStream = TRUE;
// Output the remaining bytes
break;
case STATUS_AWAITING_PERSISTED_ACK:
// If bytes_written == 0, set it to pause to exit the loop
if (bytesWritten == 0) {
DLOGD("Pausing CURL read for upload handle: %" PRIu64 " waiting for last ack.", uploadHandle);
bytesWritten = CURL_READFUNC_PAUSE;
}
break;
case STATUS_UPLOAD_HANDLE_ABORTED:
DLOGW("Reported abort-connection for Upload handle: %" PRIu64, uploadHandle);
bytesWritten = CURL_READFUNC_ABORT;
// Graceful shutdown as PIC is aware of terminated stream
pCurlResponse->endOfStream = TRUE;
break;
default:
DLOGE("Failed to get data from the stream with an error: 0x%08x", retStatus);
// set bytes_written to terminate and close the connection
bytesWritten = CURL_READFUNC_ABORT;
}
CleanUp:
if (bytesWritten != CURL_READFUNC_ABORT && bytesWritten != CURL_READFUNC_PAUSE) {
DLOGD("Wrote %u bytes to Kinesis Video. Upload stream handle: %" PRIu64, bytesWritten, uploadHandle);
if (bytesWritten != 0 && pCurlResponse->debugDumpFile) {
retStatus = writeFile(pCurlResponse->debugDumpFilePath, TRUE, TRUE, (PBYTE) pBuffer, bytesWritten);
if (STATUS_FAILED(retStatus)) {
DLOGW("Failed to write to debug dump file with error: 0x%08x", retStatus);
}
}
} else if (bytesWritten == CURL_READFUNC_PAUSE) {
pCurlResponse->paused = TRUE;
}
// Since curl is about to terminate gracefully, set flag to prevent shutdown thread from timing it out.
if ((bytesWritten == CURL_READFUNC_ABORT || bytesWritten == 0) && pCurlResponse != NULL) {
ATOMIC_STORE_BOOL(&pCurlResponse->terminated, TRUE);
}
return bytesWritten;
}