dmg/io.c (309 lines of code) (raw):
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <dmg/dmg.h>
#include <dmg/compress.h>
#include <dmg/attribution.h>
#include <inttypes.h>
// One unit of work for the compression pipeline: the input data of a single
// BLKX run together with the bytes that will be written out for it.
typedef struct block {
	size_t bufferSize;      // capacity in bytes of both inbuf and outbuf
	uint32_t idx;           // index of this run in the BLKX table (taken from threadData.curRun)
	BLKXRun run;            // table entry being built for this run
	int keepRaw;            // nonzero when attribution requires this run to be stored uncompressed
	unsigned char* inbuf;   // uncompressed input data for the run
	size_t insize;          // number of valid bytes in inbuf
	unsigned char* outbuf;  // bytes to write: compressor output, or a copy of inbuf for raw runs
	size_t outsize;         // number of valid bytes in outbuf
	struct block* next;     // link in threadData.pending, which is kept sorted by idx
} block;
// State shared by all worker threads of a single insertBLKX() call.
typedef struct {
	size_t runSectors;                // maximum number of sectors per run
	size_t bufferSize;                // byte size of every block buffer and of nextInBuffer
	AbstractAttribution* attribution; // optional; NULL skips all attribution handling
	// Read side: used by blockRead while holding inMut.
	pthread_mutex_t inMut;
	AbstractFile* in;                 // source of the uncompressed data
	uint32_t numSectors;              // sectors not yet assigned to a run
	uint32_t curRun;                  // index the next run will receive; the run count once all workers finish
	uint64_t curSector;               // first sector of the next run, relative to this table
	uint64_t startOff;                // position of `in` when insertBLKX started; NOTE(review): not otherwise read in this file
	unsigned char *nextInBuffer;      // read-ahead data for the run following the current one
	size_t nextInSize;                // number of valid bytes in nextInBuffer
	enum ShouldKeepRaw keepRaw;       // attribution state carried from one run to the next
	// Write side: used by blockQueue/blockWriteAll/blockWrite while holding outMut.
	pthread_mutex_t outMut;
	AbstractFile* out;                // destination for run data
	BLKXTable *blkx;                  // table being built; reallocated as runs are added
	uint32_t roomForRuns;             // capacity of blkx->runs
	ChecksumFunc uncompressedChk;     // optional; fed each run's input data
	void* uncompressedChkToken;
	ChecksumFunc compressedChk;       // optional; fed each run's written data
	void* compressedChkToken;
	Compressor *compressor;           // used by every worker without locking (read-only here)
	size_t nextPending;               // idx of the next block that may be written
	block* pending;                   // finished blocks waiting for their turn, sorted by idx
} threadData;
// Create a block for run `idx` whose input and output buffers each hold
// `bufferSize` bytes. Any allocation failure is reported through ASSERT.
// Only bufferSize, idx, keepRaw and run.reserved are initialized here; the
// remaining fields are filled in by blockRead/blockCompress/blockQueue.
static block* blockAlloc(size_t bufferSize, size_t idx) {
	block* nb = (block*)malloc(sizeof(block));
	ASSERT(nb != NULL, "malloc");

	nb->inbuf = (unsigned char*)malloc(bufferSize);
	ASSERT(nb->inbuf != NULL, "malloc");
	nb->outbuf = (unsigned char*)malloc(bufferSize);
	ASSERT(nb->outbuf != NULL, "malloc");

	nb->bufferSize = bufferSize;
	nb->idx = idx;
	nb->keepRaw = 0;
	nb->run.reserved = 0;
	return nb;
}
// Release a block along with both of its data buffers.
static void blockFree(block* b) {
	unsigned char* in = b->inbuf;
	unsigned char* out = b->outbuf;
	free(b);
	free(in);
	free(out);
}
// Claim the next run of input sectors and return it as a newly allocated block,
// or return NULL once every sector has been handed out.
//
// All work happens under d->inMut, so runs are read sequentially from d->in and
// receive consecutive indices. Every run after the first was already read ahead
// by the previous call into d->nextInBuffer. That read-ahead lets the attribution
// check below look at the current run together with the one after it.
static block* blockRead(threadData* d) {
	ASSERT(pthread_mutex_lock(&d->inMut) == 0, "pthread_mutex_lock");

	if (d->numSectors == 0) {
		ASSERT(pthread_mutex_unlock(&d->inMut) == 0, "pthread_mutex_unlock");
		return NULL;
	}

	block* b = blockAlloc(d->bufferSize, d->curRun);
	b->run.sectorStart = d->curSector;
	b->run.sectorCount = (d->numSectors > d->runSectors) ? d->runSectors : d->numSectors;
	size_t readSize = b->run.sectorCount * SECTOR_SIZE;

	if (b->idx == 0) {
		ASSERT((b->insize = d->in->read(d->in, b->inbuf, readSize)) == readSize, "mRead");
	} else {
		// Take over the data the previous call read ahead for this run.
		memcpy(b->inbuf, d->nextInBuffer, d->nextInSize);
		b->insize = d->nextInSize;
	}

	// Read ahead the following run. Size the read to that run, not this one, so
	// we never read past the sectors this table covers. On the last run, empty
	// the read-ahead buffer instead; otherwise shouldKeepRaw below would be
	// handed a stale copy of this run's own data as the "next" run.
	uint64_t remaining = d->numSectors - b->run.sectorCount;
	if (remaining > 0) {
		uint64_t nextSectors = (remaining > d->runSectors) ? d->runSectors : remaining;
		d->nextInSize = d->in->read(d->in, d->nextInBuffer, nextSectors * SECTOR_SIZE);
	} else {
		d->nextInSize = 0;
	}

	if (d->attribution) {
		// d->keepRaw is advanced once per run, starting from the value the
		// previous run left behind:
		//   KeepNoneRaw           - no sentinel found yet: ask shouldKeepRaw
		//                           about this run and the read-ahead after it.
		//   KeepCurrentAndNextRaw - the previous run found a sentinel crossing
		//                           into this run. The previous run was kept raw,
		//                           so this one must be too. Step down to
		//                           KeepCurrentRaw so the run after this one is
		//                           not affected.
		//   KeepCurrentRaw        - the previous run was the last one that had
		//                           to be raw: move to KeepRemainingRaw.
		// From KeepRemainingRaw onward, shouldKeepRaw is not consulted again and
		// no later run is forced raw. NOTE(review): confirm this matches the
		// intended meaning of KeepRemainingRaw in attribution.h.
		if (d->keepRaw == KeepNoneRaw) {
			d->keepRaw = d->attribution->shouldKeepRaw(d->attribution, b->inbuf, b->insize, d->nextInBuffer, d->nextInSize);
		} else if (d->keepRaw == KeepCurrentAndNextRaw) {
			d->keepRaw = KeepCurrentRaw;
		} else if (d->keepRaw == KeepCurrentRaw) {
			d->keepRaw = KeepRemainingRaw;
		}
		b->keepRaw = (d->keepRaw == KeepCurrentRaw || d->keepRaw == KeepCurrentAndNextRaw);
	}

	d->curSector += b->run.sectorCount;
	d->numSectors -= b->run.sectorCount;
	d->curRun++;
	ASSERT(pthread_mutex_unlock(&d->inMut) == 0, "pthread_mutex_unlock");
	return b;
}
// Fill b->outbuf and finalize b->run.type and b->run.compLength.
//
// Unless attribution requires the run to stay raw, it is compressed with
// `comp`. If compression does not save at least 15 sectors, the run is stored
// raw instead: outbuf becomes a copy of inbuf and the type is BLOCK_RAW.
// Otherwise the type is the compressor's block type.
static void blockCompress(block* b, Compressor* comp) {
	if (!b->keepRaw) {
		ASSERT(comp->compress(b->inbuf, b->insize, b->outbuf, b->bufferSize, &b->outsize, comp->level) == 0, "compress");
	}

	// Written as `compressed + 15 >= total` rather than `compressed >= total - 15`.
	// outsize is a size_t, so the original comparison was unsigned, and
	// `sectorCount - 15` wrapped to a huge value for runs under 15 sectors.
	// Those runs were then never stored raw, however poorly they compressed.
	if (b->keepRaw || (b->outsize / SECTOR_SIZE) + 15 >= b->run.sectorCount) {
		b->run.type = BLOCK_RAW;
		memcpy(b->outbuf, b->inbuf, b->insize);
		b->outsize = b->insize;
	} else {
		b->run.type = comp->block_type;
	}
	b->run.compLength = b->outsize;
}
// Write one finished block to d->out, record its run in d->blkx, and free it.
//
// Only called from blockWriteAll, i.e. with d->outMut held and strictly in idx
// order, so each run's compOffset is the output position immediately after the
// previous run's data.
static void blockWrite(threadData* d, block* b) {
	// The uncompressed checksum covers the run's full sector span; the
	// compressed checksum covers exactly the bytes written below.
	if (d->uncompressedChk) {
		(*d->uncompressedChk)(d->uncompressedChkToken, b->inbuf, b->run.sectorCount * SECTOR_SIZE);
	}
	if (d->compressedChk) {
		(*d->compressedChk)(d->compressedChkToken, b->outbuf, b->outsize);
	}
	if (d->attribution) {
		d->attribution->observeBuffers(d->attribution, b->keepRaw, b->inbuf, b->insize, b->outbuf, b->outsize);
	}

	b->run.compOffset = d->out->tell(d->out) - d->blkx->dataStart;
	ASSERT(d->out->write(d->out, b->outbuf, b->outsize) == b->outsize, "fwrite");

	// idx grows by exactly one per call, so doubling once is always enough.
	// Go through a temporary so a failed realloc is caught instead of
	// dereferenced (the old table stays valid until then).
	if (b->idx >= d->roomForRuns) {
		d->roomForRuns <<= 1;
		BLKXTable* grown = (BLKXTable*) realloc(d->blkx, sizeof(BLKXTable) + (d->roomForRuns * sizeof(BLKXRun)));
		ASSERT(grown != NULL, "realloc");
		d->blkx = grown;
	}
	d->blkx->runs[b->idx] = b->run;
	blockFree(b);
}
// Insert b into d->pending, keeping the list in ascending idx order so that
// blockWriteAll can drain it in run order. The only caller holds d->outMut.
static void blockQueue(threadData* d, block* b) {
	block* prev = NULL;
	block* cur = d->pending;
	while (cur != NULL && cur->idx < b->idx) {
		prev = cur;
		cur = cur->next;
	}
	b->next = cur;
	if (prev == NULL) {
		d->pending = b;
	} else {
		prev->next = b;
	}
}
// Write every queued block that is next in run order. Stop at the first gap,
// i.e. a run some worker has not finished yet. The only caller holds d->outMut.
static void blockWriteAll(threadData* d) {
	while (d->pending != NULL) {
		block* head = d->pending;
		if (head->idx != d->nextPending) {
			break;
		}
		d->pending = head->next; // unlink first: blockWrite frees head
		blockWrite(d, head);
		d->nextPending++;
	}
}
// Hand a compressed block to the writer side: queue it, then flush whatever
// prefix of the queue is now contiguous. Both steps run under d->outMut.
static void blockQueueAndWrite(threadData* d, block* b) {
	int rc = pthread_mutex_lock(&d->outMut);
	ASSERT(rc == 0, "pthread_mutex_lock");
	blockQueue(d, b);
	blockWriteAll(d);
	rc = pthread_mutex_unlock(&d->outMut);
	ASSERT(rc == 0, "pthread_mutex_unlock");
}
// Worker thread body: read, compress and queue blocks until blockRead reports
// that no input is left. `arg` is the shared threadData. Always returns NULL.
static void *threadWorker(void* arg) {
	threadData* shared = arg;
	block* work;
	while ((work = blockRead(shared)) != NULL) {
		blockCompress(work, shared->compressor);
		blockQueueAndWrite(shared, work);
	}
	return NULL;
}
// Compress numSectors_ sectors read from in_ (starting at its current position)
// into out_, and return a newly malloc'd BLKX table describing the resulting
// runs. The caller owns the returned table.
//
// The input is split into runs of at most runSectors sectors. A pool of worker
// threads (threadWorker) reads the runs in order and compresses them
// concurrently. Their data is written to out_ strictly in run order. The table
// always ends with a BLOCK_TERMINATOR run. compOffset values are relative to
// the table's dataStart, which is set to 0 here.
//
//   out_, in_                       destination and source files
//   firstSectorNumber               stored in the table header
//   numSectors_                     number of SECTOR_SIZE sectors to consume from in_
//   blocksDescriptor                stored in the table header
//   checksumType                    recorded in the table's checksum field (the value itself is left zeroed here)
//   uncompressedChk_ / ...Token_    optional; called with each run's input data
//   compressedChk_ / ...Token_      optional; called with each run's written data
//   volume                          unused
//   attribution_                    optional; may force individual runs to be stored raw
//   comp                            compressor used for non-raw runs
//   runSectors                      maximum sectors per run
//
// Allocation, threading and I/O failures are reported through ASSERT.
BLKXTable* insertBLKX(AbstractFile* out_, AbstractFile* in_, uint32_t firstSectorNumber, uint32_t numSectors_, uint32_t blocksDescriptor,
		uint32_t checksumType, ChecksumFunc uncompressedChk_, void* uncompressedChkToken_, ChecksumFunc compressedChk_,
		void* compressedChkToken_, Volume* volume, AbstractAttribution* attribution_, Compressor* comp, size_t runSectors) {
	(void)volume; // not used by this implementation

	threadData td = {
		.out = out_,
		.in = in_,
		.runSectors = runSectors,
		.numSectors = numSectors_,
		.uncompressedChk = uncompressedChk_,
		.uncompressedChkToken = uncompressedChkToken_,
		.compressedChk = compressedChk_,
		.compressedChkToken = compressedChkToken_,
		.attribution = attribution_,
		.compressor = comp,
		.curRun = 0,
		.curSector = 0,
		.keepRaw = KeepNoneRaw,
		.nextPending = 0,
		.pending = NULL,
	};
	ASSERT(pthread_mutex_init(&td.inMut, NULL) == 0, "pthread_mutex_init");
	ASSERT(pthread_mutex_init(&td.outMut, NULL) == 0, "pthread_mutex_init");

	// Start with room for two runs; blockWrite and the terminator code below
	// double the capacity as needed.
	td.roomForRuns = 2;
	size_t tableSize = sizeof(BLKXTable) + (td.roomForRuns * sizeof(BLKXRun));
	ASSERT(td.blkx = (BLKXTable*) malloc(tableSize), "malloc");
	memset(td.blkx, 0, tableSize);

	td.blkx->fUDIFBlocksSignature = UDIF_BLOCK_SIGNATURE;
	td.blkx->infoVersion = 1;
	td.blkx->firstSectorNumber = firstSectorNumber;
	td.blkx->sectorCount = td.numSectors;
	td.blkx->dataStart = 0;
	td.blkx->decompressBufferRequested = comp->decompressBuffer(runSectors);
	if (MIN_DECOMPRESS_BUFFER_SECTORS > td.blkx->decompressBufferRequested) {
		td.blkx->decompressBufferRequested = MIN_DECOMPRESS_BUFFER_SECTORS;
	}
	td.blkx->blocksDescriptor = blocksDescriptor;
	td.blkx->reserved1 = 0;
	td.blkx->reserved2 = 0;
	td.blkx->reserved3 = 0;
	td.blkx->reserved4 = 0;
	td.blkx->reserved5 = 0;
	td.blkx->reserved6 = 0;
	memset(&(td.blkx->checksum), 0, sizeof(td.blkx->checksum));
	td.blkx->checksum.type = checksumType;
	td.blkx->checksum.bitness = checksumBitness(checksumType);
	td.blkx->blocksRunCount = 0;

	// Each block buffer, and the shared read-ahead buffer, holds
	// decompressBufferRequested sectors.
	td.bufferSize = SECTOR_SIZE * td.blkx->decompressBufferRequested;
	ASSERT(td.nextInBuffer = (unsigned char*) malloc(td.bufferSize), "malloc");
	td.startOff = td.in->tell(td.in);

	// Worker count: online CPUs plus two more. Every thread runs the same
	// read/compress/write loop. sysconf() returns -1 on error; treat that as
	// a single CPU.
	long ncpu = sysconf(_SC_NPROCESSORS_ONLN);
	size_t nthreads = (size_t)(ncpu > 0 ? ncpu : 1) + 2;
	pthread_t* threads;
	ASSERT(threads = (pthread_t*) malloc(nthreads * sizeof(pthread_t)), "malloc");
	for (size_t i = 0; i < nthreads; i++) {
		ASSERT(pthread_create(&threads[i], NULL, threadWorker, &td) == 0, "pthread_create");
	}
	for (size_t i = 0; i < nthreads; i++) {
		void* ret;
		ASSERT(pthread_join(threads[i], &ret) == 0, "pthread_join");
		ASSERT(ret == NULL, "thread return");
	}
	free(threads);
	pthread_mutex_destroy(&td.inMut);
	pthread_mutex_destroy(&td.outMut);

	// All runs 0..curRun-1 are written; append the terminator after them.
	if (td.curRun >= td.roomForRuns) {
		td.roomForRuns <<= 1;
		BLKXTable* grown = (BLKXTable*) realloc(td.blkx, sizeof(BLKXTable) + (td.roomForRuns * sizeof(BLKXRun)));
		ASSERT(grown != NULL, "realloc");
		td.blkx = grown;
	}
	BLKXRun* term = &td.blkx->runs[td.curRun];
	term->type = BLOCK_TERMINATOR;
	term->reserved = 0;
	term->sectorStart = td.curSector;
	term->sectorCount = 0;
	term->compOffset = td.out->tell(td.out) - td.blkx->dataStart;
	term->compLength = 0;
	td.blkx->blocksRunCount = td.curRun + 1;

	free(td.nextInBuffer);
	return td.blkx;
}
#define DEFAULT_BUFFER_SIZE (1 * 1024 * 1024)

// Reconstruct the data described by blkx. For each run:
//   - read its stored bytes from `in` at blkx->dataStart + compOffset;
//   - decode them according to the run type;
//   - write the result to `out` at the run's sector position, relative to
//     out's position on entry.
// Processing stops at the first BLOCK_TERMINATOR run.
//
// For every run with sectors, a zero byte is first written at the run's last
// byte. That makes `out` cover the run's full extent even for runs that write
// no data (BLOCK_IGNORE, BLOCK_COMMENT, or compLength == 0).
//
// blkx comes from the image being read, so compLength is not trusted to fit
// the buffer sized from decompressBufferRequested. Oversized raw runs are
// streamed in DEFAULT_BUFFER_SIZE chunks, and oversized compressed runs grow
// the input buffer instead of overflowing it.
//
// Exits on an unsupported block type; other failures go through ASSERT.
void extractBLKX(AbstractFile* in, AbstractFile* out, BLKXTable* blkx) {
	unsigned char* outBuffer;
	unsigned char zero = 0;
	size_t have;
	size_t expectedSize;
	uint32_t type;
	int i;

	size_t bufferSize = SECTOR_SIZE * blkx->decompressBufferRequested;
	unsigned char* inBuffer;
	// Allocate at least one byte so a zero decompressBufferRequested cannot make
	// a legitimate malloc(0) == NULL look like an allocation failure.
	ASSERT(inBuffer = (unsigned char*) malloc(bufferSize > 0 ? bufferSize : 1), "malloc");

	off_t initialOffset = out->tell(out);
	ASSERT(initialOffset != -1, "ftello");

	for (i = 0; i < blkx->blocksRunCount; i++) {
		BLKXRun* run = &blkx->runs[i];
		ASSERT(in->seek(in, blkx->dataStart + run->compOffset) == 0, "fseeko");
		ASSERT(out->seek(out, initialOffset + (run->sectorStart * SECTOR_SIZE)) == 0, "mSeek");

		if (run->sectorCount > 0) {
			ASSERT(out->seek(out, initialOffset + (run->sectorStart + run->sectorCount) * SECTOR_SIZE - 1) == 0, "mSeek");
			ASSERT(out->write(out, &zero, 1) == 1, "mWrite");
			ASSERT(out->seek(out, initialOffset + (run->sectorStart * SECTOR_SIZE)) == 0, "mSeek");
		}

		if (run->type == BLOCK_TERMINATOR) {
			break;
		}
		if (run->compLength == 0) {
			continue;
		}

		printf("run %d: start=%" PRIu64 " sectors=%" PRIu64 ", length=%" PRIu64 ", fileOffset=0x%" PRIx64 "\n", i,
			(uint64_t)(initialOffset + (run->sectorStart * SECTOR_SIZE)), (uint64_t)run->sectorCount,
			(uint64_t)run->compLength, (uint64_t)run->compOffset);

		switch (run->type) {
		case BLOCK_RAW:
			if (run->compLength > bufferSize) {
				// Too large for inBuffer: stream it through a fixed-size page.
				uint64_t left = run->compLength;
				void* pageBuffer;
				ASSERT(pageBuffer = malloc(DEFAULT_BUFFER_SIZE), "malloc");
				while (left > 0) {
					size_t thisRead = (left > DEFAULT_BUFFER_SIZE) ? DEFAULT_BUFFER_SIZE : (size_t)left;
					ASSERT((have = in->read(in, pageBuffer, thisRead)) == thisRead, "fread");
					ASSERT(out->write(out, pageBuffer, have) == have, "mWrite");
					left -= have;
				}
				free(pageBuffer);
			} else {
				ASSERT((have = in->read(in, inBuffer, run->compLength)) == run->compLength, "fread");
				ASSERT(out->write(out, inBuffer, have) == have, "mWrite");
			}
			break;
		case BLOCK_IGNORE:
			break;
		case BLOCK_COMMENT:
			break;
		case BLOCK_TERMINATOR:
			break;
		default:
			type = run->type;
			if (compressionBlockTypeSupported(type) != 0) {
				fprintf(stderr, "Unsupported block type %#08x\n", type);
				exit(1);
			}
			if (run->compLength > bufferSize) {
				// The whole compressed run must be in memory to decompress it, and
				// compLength comes from the image: grow inBuffer rather than read
				// past its end.
				ASSERT(run->compLength <= SIZE_MAX, "compLength");
				unsigned char* grown;
				ASSERT(grown = (unsigned char*) realloc(inBuffer, (size_t)run->compLength), "realloc");
				inBuffer = grown;
				bufferSize = (size_t)run->compLength;
			}
			expectedSize = run->sectorCount * SECTOR_SIZE;
			ASSERT(outBuffer = (unsigned char*)malloc(expectedSize), "malloc");
			ASSERT(in->read(in, inBuffer, run->compLength) == run->compLength, "fread");
			ASSERT(decompressRun(type, inBuffer, run->compLength, outBuffer, expectedSize) == 0,
				"decompression failed");
			ASSERT(out->write(out, outBuffer, expectedSize) == expectedSize, "mWrite");
			free(outBuffer);
			break;
		}
	}
	free(inBuffer);
}