static int dequeue_operation()

in src/channel_internal.c [378:432]


static int dequeue_operation(THANDLE(CHANNEL_INTERNAL) channel_internal, THANDLE(ASYNC_OP)* out_op, THANDLE(RC_STRING) pull_correlation_id, ON_DATA_AVAILABLE_CB on_data_available_cb, void* on_data_available_context, THANDLE(RC_STRING) push_correlation_id, ON_DATA_CONSUMED_CB on_data_consumed_cb, void* on_data_consumed_context, THANDLE(RC_PTR) data)
{
    int result;

    CHANNEL_INTERNAL* channel_internal_ptr = THANDLE_GET_T(CHANNEL_INTERNAL)(channel_internal);

    /*Codes_SRS_CHANNEL_INTERNAL_43_109: [ channel_internal_pull shall call DList_RemoveHeadList on the list of pending operations to obtain the operation. ]*/
    /*Codes_SRS_CHANNEL_INTERNAL_43_125: [ channel_internal_push shall call DList_RemoveHeadList on the list of pending operations to obtain the operation. ]*/
    PDLIST_ENTRY op_entry = DList_RemoveHeadList(&channel_internal_ptr->op_list);

    CHANNEL_OP* channel_op = CONTAINING_RECORD(op_entry, CHANNEL_OP, anchor);
    if (on_data_available_cb != NULL)
    {
        /*Codes_SRS_CHANNEL_INTERNAL_43_112: [ channel_internal_pull shall store the correlation_id, on_data_available_cb and on_data_available_context in the obtained operation. ]*/
        THANDLE_INITIALIZE(RC_STRING)(&channel_op->pull_correlation_id, pull_correlation_id);
        channel_op->on_data_available_cb = on_data_available_cb;
        channel_op->on_data_available_context = on_data_available_context;
    }
    else if (on_data_consumed_cb != NULL)
    {
        /*Codes_SRS_CHANNEL_INTERNAL_43_128: [ channel_internal_push shall store the correlation_id, on_data_consumed_cb, on_data_consumed_context and data in the obtained operation. ]*/
        THANDLE_INITIALIZE(RC_STRING)(&channel_op->push_correlation_id, push_correlation_id);
        channel_op->on_data_consumed_cb = on_data_consumed_cb;
        channel_op->on_data_consumed_context = on_data_consumed_context;
        THANDLE_ASSIGN(RC_PTR)(&channel_op->data, data);
    }
    THANDLE(ASYNC_OP) temp = NULL;
    THANDLE_INITIALIZE(ASYNC_OP)(&temp, channel_op->async_op);
    /*Codes_SRS_CHANNEL_INTERNAL_43_113: [ channel_internal_pull shall call threadpool_schedule_work with execute_callbacks as work_function and the obtained operation as work_function_context. ]*/
    /*Codes_SRS_CHANNEL_INTERNAL_43_129: [ channel_internal_push shall call threadpool_schedule_work with execute_callbacks as work_function and the obtained operation as work_function_context. ]*/
    if (threadpool_schedule_work(channel_internal_ptr->threadpool, execute_callbacks, channel_op) != 0)
    {
        LogError("Failure in threadpool_schedule_work(channel_internal_ptr->threadpool=%p, execute_callbacks=%p, channel_op=%p)", channel_internal_ptr->threadpool, execute_callbacks, channel_op);
        result = MU_FAILURE;
        // execute_callbacks calls sm_exec_end once for each non-NULL callback.
        // In the case where threadpool_schedule_work fails, sm_exec_end is called immediately.Therefore the callback must be unset so that sm_exec_end is not called twice.
        if (on_data_available_cb != NULL)
        {
            channel_op->on_data_available_cb = NULL;
        }
        else if (on_data_consumed_cb != NULL)
        {
            channel_op->on_data_consumed_cb = NULL;
        }
    }
    else
    {
        /*Codes_SRS_CHANNEL_INTERNAL_43_114: [ channel_internal_pull shall set *out_op_pull to the THANDLE(ASYNC_OP) of the obtained operation. ]*/
        /*Codes_SRS_CHANNEL_INTERNAL_43_130: [ channel_internal_push shall set *out_op_push to the THANDLE(ASYNC_OP) of the obtained operation. ]*/
        THANDLE_INITIALIZE(ASYNC_OP)(out_op, temp);
        result = 0;
    }
    THANDLE_ASSIGN(ASYNC_OP)(&temp, NULL);
    return result;
}