size_t ZSTD_compressStream2()

in lib/compress/zstd_compress.c [5664:5761]


/* ZSTD_compressStream2() :
 * Streaming compression entry point. Validates both buffers and the directive,
 * performs the (possibly deferred) initialization on the first call(s) of a
 * session, then delegates to the multi-threaded or the single-threaded
 * streaming routine.
 *
 * cctx   : compression context; only asserted non-NULL, not checked at run time.
 * output : destination buffer; must satisfy pos <= size on entry.
 * input  : source buffer; must satisfy pos <= size on entry.
 * endOp  : directive; any value above ZSTD_e_end is rejected.
 *
 * @return :
 *  - deferred-initialization path : ZSTD_FRAMEHEADERSIZE_MIN() for the requested format;
 *  - multi-threaded path : the value returned by ZSTDMT_compressStream_generic()
 *                          (0 is treated below as "flush complete");
 *  - single-threaded path : outBuffContentSize - outBuffFlushedSize, i.e. what remains to flush;
 *  - or an error raised through RETURN_ERROR_IF() / FORWARD_IF_ERROR().
 *    NOTE(review): both macros are defined outside this excerpt; they are presumed
 *    to return an error code from this function - confirm.
 */
size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,
                             ZSTD_outBuffer* output,
                             ZSTD_inBuffer* input,
                             ZSTD_EndDirective endOp)
{
    DEBUGLOG(5, "ZSTD_compressStream2, endOp=%u ", (unsigned)endOp);
    /* check conditions : these checks only read the buffers and endOp, cctx is not touched yet */
    RETURN_ERROR_IF(output->pos > output->size, dstSize_tooSmall, "invalid output buffer");
    RETURN_ERROR_IF(input->pos  > input->size, srcSize_wrong, "invalid input buffer");
    RETURN_ERROR_IF((U32)endOp > (U32)ZSTD_e_end, parameter_outOfBound, "invalid endDirective");
    assert(cctx != NULL);

    /* transparent initialization stage */
    if (cctx->streamStage == zcss_init) {
        size_t const inputSize = input->size - input->pos;  /* no obligation to start from pos==0 */
        /* also count the bytes that earlier deferred calls reported as consumed but actually held back */
        size_t const totalInputSize = inputSize + cctx->stableIn_notConsumed;
        if ( (cctx->requestedParams.inBufferMode == ZSTD_bm_stable) /* input is presumed stable, across invocations */
          && (endOp == ZSTD_e_continue)                             /* no flush requested, more input to come */
          && (totalInputSize < ZSTD_BLOCKSIZE_MAX) ) {              /* not even reached one block yet */
            if (cctx->stableIn_notConsumed) {  /* not the first time */
                /* check stable source guarantees :
                 * the previous deferred call set input->pos = input->size and saved that buffer,
                 * so this call must present the same src, with pos equal to the saved size */
                RETURN_ERROR_IF(input->src != cctx->expectedInBuffer.src, stabilityCondition_notRespected, "stableInBuffer condition not respected: wrong src pointer");
                RETURN_ERROR_IF(input->pos != cctx->expectedInBuffer.size, stabilityCondition_notRespected, "stableInBuffer condition not respected: externally modified pos");
            }
            /* pretend input was consumed, to give a sense of forward progress */
            input->pos = input->size;
            /* save stable inBuffer, for later control, and flush/end */
            cctx->expectedInBuffer = *input;
            /* but actually input wasn't consumed, so keep track of position from where compression shall resume */
            cctx->stableIn_notConsumed += inputSize;
            /* don't initialize yet : wait until a full block is available, or a flush/end is requested, for better parameters adaptation */
            return ZSTD_FRAMEHEADERSIZE_MIN(cctx->requestedParams.format);  /* at least some header to produce */
        }
        /* initialization proper; totalInputSize includes any bytes deferred above */
        FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, totalInputSize), "compressStream2 initialization failed");
        ZSTD_setBufferExpectations(cctx, output, input);   /* Set initial buffer expectations now that we've initialized */
    }
    /* end of transparent initialization stage */

    FORWARD_IF_ERROR(ZSTD_checkBufferStability(cctx, output, input, endOp), "invalid buffers");
    /* compression stage */
#ifdef ZSTD_MULTITHREAD
    /* multi-threaded path : only compiled with ZSTD_MULTITHREAD, only taken when workers are in use */
    if (cctx->appliedParams.nbWorkers > 0) {
        size_t flushMin;
        if (cctx->cParamsChanged) {
            /* forward the requested parameters to the MT context, then clear the pending flag */
            ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, &cctx->requestedParams);
            cctx->cParamsChanged = 0;
        }
        if (cctx->stableIn_notConsumed) {
            assert(cctx->appliedParams.inBufferMode == ZSTD_bm_stable);
            /* some early data was skipped - make it available for consumption */
            assert(input->pos >= cctx->stableIn_notConsumed);
            input->pos -= cctx->stableIn_notConsumed;
            cctx->stableIn_notConsumed = 0;
        }
        /* iterate until the progress requirement attached to endOp is met (see both break conditions) */
        for (;;) {
            /* snapshot positions, to measure what this iteration consumed and produced */
            size_t const ipos = input->pos;
            size_t const opos = output->pos;
            flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
            cctx->consumedSrcSize += (U64)(input->pos - ipos);
            cctx->producedCSize += (U64)(output->pos - opos);
            /* the session is reset both on error and on a completed ZSTD_e_end;
             * the trace call is only made when flushMin == 0 */
            if ( ZSTD_isError(flushMin)
              || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
                if (flushMin == 0)
                    ZSTD_CCtx_trace(cctx, 0);
                ZSTD_CCtx_reset(cctx, ZSTD_reset_session_only);
            }
            /* placed after the reset above, so the context is already reset when an error is forwarded */
            FORWARD_IF_ERROR(flushMin, "ZSTDMT_compressStream_generic failed");

            if (endOp == ZSTD_e_continue) {
                /* We only require some progress with ZSTD_e_continue, not maximal progress.
                 * We're done if we've consumed or produced any bytes, or either buffer is
                 * full.
                 */
                if (input->pos != ipos || output->pos != opos || input->pos == input->size || output->pos == output->size)
                    break;
            } else {
                assert(endOp == ZSTD_e_flush || endOp == ZSTD_e_end);
                /* We require maximal progress. We're done when the flush is complete or the
                 * output buffer is full.
                 */
                if (flushMin == 0 || output->pos == output->size)
                    break;
            }
        }
        DEBUGLOG(5, "completed ZSTD_compressStream2 delegating to ZSTDMT_compressStream_generic");
        /* Either we don't require maximum forward progress, we've finished the
         * flush, or we are out of output space.
         */
        assert(endOp == ZSTD_e_continue || flushMin == 0 || output->pos == output->size);
        /* record the buffers as left by this call, as done on the single-threaded path */
        ZSTD_setBufferExpectations(cctx, output, input);
        return flushMin;
    }
#endif /* ZSTD_MULTITHREAD */
    /* single-threaded path.
     * NOTE(review): stableIn_notConsumed is not rewound here, unlike on the MT path;
     * presumably ZSTD_compressStream_generic() handles it itself - confirm. */
    FORWARD_IF_ERROR( ZSTD_compressStream_generic(cctx, output, input, endOp) , "");
    DEBUGLOG(5, "completed ZSTD_compressStream2");
    ZSTD_setBufferExpectations(cctx, output, input);
    return cctx->outBuffContentSize - cctx->outBuffFlushedSize; /* remaining to flush */
}