in lib/compress/zstdmt_compress.c [667:788]
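/* ZSTDMT_compressionJob() is a POOL_function type :
 * it runs on a worker thread, compresses job->src into job->dstBuff,
 * and publishes progress (job->consumed, job->cSize) under job->job_mutex,
 * signalling job->job_cond after each chunk and on completion. */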
static void ZSTDMT_compressionJob(void* jobDescription)
{
    ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
    ZSTD_CCtx_params jobParams = job->params;   /* do not modify job->params ! copy it, modify the copy */
    ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
    rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
    buffer_t dstBuff = job->dstBuff;
    size_t lastCBlockSize = 0;
    /* resources */
    if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
        dstBuff = ZSTDMT_getBuffer(job->bufPool);
        if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));
        job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
    }
    if (jobParams.ldmParams.enableLdm == ZSTD_ps_enable && rawSeqStore.seq == NULL)
        JOB_ERROR(ERROR(memory_allocation));

    /* Don't compute the checksum for chunks, since we compute it externally,
     * but write it in the header.
     */
    if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
    /* Don't run LDM for the chunks, since we handle it externally */
    jobParams.ldmParams.enableLdm = ZSTD_ps_disable;
    /* Force nbWorkers to 0 : each job compresses single-threaded */
    jobParams.nbWorkers = 0;
    /* init */
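    /* Two initialization paths :
     * - with a cdict : only allowed for the first job (asserted below) ;
     * - without : reload job->prefix (the overlap with previous input)
     *   in "raw content" mode, i.e. with no dictionary-header analysis. */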
    if (job->cdict) {
        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize);
        assert(job->firstJob);  /* only allowed for first job */
        if (ZSTD_isError(initError)) JOB_ERROR(initError);
    } else {  /* srcStart points at reloaded section */
        U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
        {   size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
            if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
        }
        if (!job->firstJob) {
            size_t const err = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_deterministicRefPrefix, 0);
            if (ZSTD_isError(err)) JOB_ERROR(err);
        }
        {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
                                        job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
                                        ZSTD_dtlm_fast,
                                        NULL, /*cdict*/
                                        &jobParams, pledgedSrcSize);
            if (ZSTD_isError(initError)) JOB_ERROR(initError);
    }   }
    /* Perform serial step as early as possible, but after CCtx initialization */
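    /* note : the serial state applies long-distance matching and the frame
     * checksum in jobID order ; this call may block until all previous jobs
     * have completed their own serial step. */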
    ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);
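    /* Non-first jobs regenerate a frame header only to advance the cctx
     * into block-emission stage : op restarts at dstBuff.start below, so
     * these header bytes are immediately overwritten by the first block,
     * and only the first job's header survives in the final frame. */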
    if (!job->firstJob) {  /* flush and overwrite frame header when it's not first job */
        size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
        if (ZSTD_isError(hSize)) JOB_ERROR(hSize);
        DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
        ZSTD_invalidateRepCodes(cctx);
    }
    /* compress */
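    /* The input is cut into fixed-size chunks of 4 * ZSTD_BLOCKSIZE_MAX
     * (512 KB) ; progress is published under job->job_mutex after each
     * chunk, so the flush path (see ZSTDMT_flush) can stream out compressed
     * data before the job completes. */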
    {   size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX;
        int const nbChunks = (int)((job->src.size + (chunkSize-1)) / chunkSize);
        const BYTE* ip = (const BYTE*) job->src.start;
        BYTE* const ostart = (BYTE*)dstBuff.start;
        BYTE* op = ostart;
        BYTE* oend = op + dstBuff.capacity;
        int chunkNb;
        if (sizeof(size_t) > sizeof(int)) assert(job->src.size < ((size_t)INT_MAX) * chunkSize);   /* check overflow */
        DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->src.size, nbChunks);
        assert(job->cSize == 0);
        for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
            size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
            ip += chunkSize;
            op += cSize; assert(op < oend);
            /* stats */
            ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
            job->cSize += cSize;
            job->consumed = chunkSize * chunkNb;
            DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",
                        (U32)cSize, (U32)job->cSize);
            ZSTD_pthread_cond_signal(&job->job_cond);   /* signals that more data is ready to be flushed */
            ZSTD_pthread_mutex_unlock(&job->job_mutex);
        }
        /* last block */
        assert(chunkSize > 0);
        assert((chunkSize & (chunkSize - 1)) == 0);  /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
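        /* The last chunk is the remainder of job->src.size modulo chunkSize,
         * or one full chunk when the source is a non-empty multiple of
         * chunkSize. It is emitted even when empty (nbChunks==0) for the
         * last job, because the frame must end with a "last block" flag. */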
        if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {
            size_t const lastBlockSize1 = job->src.size & (chunkSize-1);
            size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1;
            size_t const cSize = (job->lastJob) ?
                 ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
                 ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
            lastCBlockSize = cSize;
    }   }
    if (!job->firstJob) {
        /* Double check that we don't have an ext-dict, because then our
         * repcode invalidation doesn't work.
         */
        assert(!ZSTD_window_hasExtDict(cctx->blockState.matchState.window));
    }
    ZSTD_CCtx_trace(cctx, 0);
_endJob:
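    /* JOB_ERROR() lands here : it stores the error code in job->cSize
     * (under job->job_mutex) before the goto, so the report below still
     * publishes a meaningful result. */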
    ZSTDMT_serialState_ensureFinished(job->serial, job->jobID, job->cSize);
    if (job->prefix.size > 0)
        DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start);
    DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start);
    /* release resources */
    ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);
    ZSTDMT_releaseCCtx(job->cctxPool, cctx);
    /* report */
    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
    if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);
    job->cSize += lastCBlockSize;
    job->consumed = job->src.size;  /* when job->consumed == job->src.size, the job is considered completed */
    ZSTD_pthread_cond_signal(&job->job_cond);
    ZSTD_pthread_mutex_unlock(&job->job_mutex);
}