int Kvs_createStream()

in src/source/restful/kvs/restapi_kvs.c [759:857]


/**
 * Issue a signed "create stream" REST request and report the HTTP status.
 *
 * The function builds a JSON body from CREATE_STREAM_HTTP_BODY_TEMPLATE using
 * the stream name and data retention (hours) in pCreatePara, assembles the
 * HTTP request headers, signs the request via prvSign(), connects to
 * pServPara->pcHost on PORT_HTTPS, POSTs to KVS_URI_CREATE_STREAM and reads
 * the response.
 *
 * @param[in]  pServPara         Service parameters; checked with
 *                               prvValidateServiceParameter(). Supplies the host,
 *                               optional security token (pcToken may be NULL) and
 *                               send/receive timeouts.
 * @param[in]  pCreatePara       Create-stream parameters; checked with
 *                               prvValidateCreateStreamParameter().
 * @param[out] puHttpStatusCode  Optional (may be NULL). Reset to 0 on entry and
 *                               set to the received HTTP status code once a
 *                               response has been read.
 *
 * @return KVS_ERRNO_NONE if a response was received, KVS_ERRNO_FAIL if any
 *         step before or including receiving the response failed.
 *
 * @note A non-200 HTTP status is only logged; it does NOT turn the return
 *       value into KVS_ERRNO_FAIL. Callers must inspect *puHttpStatusCode to
 *       learn whether the service accepted the request.
 */
int Kvs_createStream(KvsServiceParameter_t *pServPara, KvsCreateStreamParameter_t *pCreatePara, unsigned int *puHttpStatusCode)
{
    int xRes = KVS_ERRNO_NONE;

    STRING_HANDLE xStHttpBody = NULL;
    STRING_HANDLE xStContentLength = NULL;
    char pcXAmzDate[DATE_TIME_ISO_8601_FORMAT_STRING_SIZE] = {0};

    AwsSigV4Handle xAwsSigV4Handle = NULL;

    unsigned int uHttpStatusCode = 0;
    HTTP_HEADERS_HANDLE xHttpReqHeaders = NULL;
    char *pRspBody = NULL;
    size_t uRspBodyLen = 0;

    NetIoHandle xNetIoHandle = NULL;

    if (puHttpStatusCode != NULL)
    {
        *puHttpStatusCode = 0; /* Set to zero to avoid misuse from previous value. */
    }

    /* Each stage below runs only if every previous stage succeeded; the first
     * failure logs, sets xRes and falls through to the common cleanup. */
    if (prvValidateServiceParameter(pServPara) != KVS_ERRNO_NONE || prvValidateCreateStreamParameter(pCreatePara) != KVS_ERRNO_NONE)
    {
        LogError("Invalid argument");
        xRes = KVS_ERRNO_FAIL;
    }
    else if (getTimeInIso8601(pcXAmzDate, sizeof(pcXAmzDate)) != KVS_ERRNO_NONE)
    {
        LogError("Failed to get time");
        xRes = KVS_ERRNO_FAIL;
    }
    else if (
        (xStHttpBody = STRING_construct_sprintf(CREATE_STREAM_HTTP_BODY_TEMPLATE, pCreatePara->pcStreamName, pCreatePara->uDataRetentionInHours)) == NULL ||
        /* The length is cast so the argument type matches the "%u" conversion;
         * passing a wider size_t to "%u" is undefined behaviour. */
        (xStContentLength = STRING_construct_sprintf("%u", (unsigned int)STRING_length(xStHttpBody))) == NULL)
    {
        LogError("Failed to allocate HTTP body");
        xRes = KVS_ERRNO_FAIL;
    }
    else if (
        (xHttpReqHeaders = HTTPHeaders_Alloc()) == NULL || HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_HOST, pServPara->pcHost) != HTTP_HEADERS_OK ||
        HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_ACCEPT, VAL_ACCEPT_ANY) != HTTP_HEADERS_OK ||
        HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_CONTENT_LENGTH, STRING_c_str(xStContentLength)) != HTTP_HEADERS_OK ||
        HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_CONTENT_TYPE, VAL_CONTENT_TYPE_APPLICATION_jSON) != HTTP_HEADERS_OK ||
        HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_USER_AGENT, VAL_USER_AGENT) != HTTP_HEADERS_OK ||
        HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_X_AMZ_DATE, pcXAmzDate) != HTTP_HEADERS_OK ||
        /* The security-token header is only added when a token is supplied. */
        (pServPara->pcToken != NULL && (HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_X_AMZ_SECURITY_TOKEN, pServPara->pcToken) != HTTP_HEADERS_OK)))
    {
        LogError("Failed to generate HTTP headers");
        xRes = KVS_ERRNO_FAIL;
    }
    else if (
        /* Sign over the headers collected so far, then attach the resulting
         * authorization value as one more header. */
        (xAwsSigV4Handle = prvSign(pServPara, KVS_URI_CREATE_STREAM, URI_QUERY_EMPTY, xHttpReqHeaders, STRING_c_str(xStHttpBody))) == NULL ||
        HTTPHeaders_AddHeaderNameValuePair(xHttpReqHeaders, HDR_AUTHORIZATION, AwsSigV4_GetAuthorization(xAwsSigV4Handle)) != HTTP_HEADERS_OK)
    {
        LogError("Failed to sign");
        xRes = KVS_ERRNO_FAIL;
    }
    else if (
        (xNetIoHandle = NetIo_create()) == NULL || NetIo_setRecvTimeout(xNetIoHandle, pServPara->uRecvTimeoutMs) != KVS_ERRNO_NONE ||
        NetIo_setSendTimeout(xNetIoHandle, pServPara->uSendTimeoutMs) != KVS_ERRNO_NONE || NetIo_connect(xNetIoHandle, pServPara->pcHost, PORT_HTTPS) != KVS_ERRNO_NONE)
    {
        LogError("Failed to connect to %s\r\n", pServPara->pcHost);
        xRes = KVS_ERRNO_FAIL;
    }
    else if (Http_executeHttpReq(xNetIoHandle, HTTP_METHOD_POST, KVS_URI_CREATE_STREAM, xHttpReqHeaders, STRING_c_str(xStHttpBody)) != KVS_ERRNO_NONE)
    {
        LogError("Failed send http request to %s", pServPara->pcHost);
        xRes = KVS_ERRNO_FAIL;
    }
    /* Compared against KVS_ERRNO_NONE explicitly, like every other call in
     * this chain (presumably equivalent to the former truthiness test, i.e.
     * KVS_ERRNO_NONE == 0 -- TODO confirm). */
    else if (Http_recvHttpRsp(xNetIoHandle, &uHttpStatusCode, &pRspBody, &uRspBodyLen) != KVS_ERRNO_NONE)
    {
        LogError("Failed recv http response from %s", pServPara->pcHost);
        xRes = KVS_ERRNO_FAIL;
    }
    else
    {
        if (puHttpStatusCode != NULL)
        {
            *puHttpStatusCode = uHttpStatusCode;
        }

        /* A non-200 status is reported through the log and *puHttpStatusCode
         * only; xRes intentionally stays KVS_ERRNO_NONE. */
        if (uHttpStatusCode != 200)
        {
            LogInfo("Create Stream failed, HTTP status code: %u", uHttpStatusCode);
            /* Never hand a NULL pointer to "%s", even with a zero precision. */
            LogInfo("HTTP response message:%.*s", (int)uRspBodyLen, (pRspBody != NULL) ? pRspBody : "");
        }
    }

    /* Common cleanup: runs on every path, so any of these handles may still be
     * NULL here if an earlier stage failed. */
    NetIo_disconnect(xNetIoHandle);
    NetIo_terminate(xNetIoHandle);
    SAFE_FREE(pRspBody);
    HTTPHeaders_Free(xHttpReqHeaders);
    AwsSigV4_Terminate(xAwsSigV4Handle);
    STRING_delete(xStContentLength);
    STRING_delete(xStHttpBody);

    return xRes;
}