stream/read_machine.js (107 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
'use strict';
var ConcatReadBuffer = require('./concat_read_buffer');
var errors = require('../errors');
var iface = require('../interface');
var ReadResult = require('../base').ReadResult;
module.exports = ReadMachine;
// States of the read machine; handleChunk dispatches on the current one.
var States = {
// Initial state (set by the constructor); handled by pend().
PendingLength: 0,
// Entered once pend() has read a usable length; handled by seek().
Seeking: 1
};
// Incremental reader that is fed buffers through handleChunk() and hands
// decoded values to an emit callback.
//
// - sizeRW:  RW with a numeric `width`; pend() calls its poolReadFrom to
//            read how many bytes the next frame needs.
// - chunkRW: RW whose poolReadFrom is used by seek() to decode a frame.
// - emit:    optional function called with each decoded value; when it is
//            not a function the prototype's emit is left in place.
//
// May be called with or without `new`. Throws the error built by
// errors.expected when sizeRW.width is not a number.
function ReadMachine(sizeRW, chunkRW, emit) {
    // Called as a plain function: re-enter as a constructor.
    if (!(this instanceof ReadMachine)) {
        return new ReadMachine(sizeRW, chunkRW, emit);
    }

    var prefixWidth = sizeRW.width;
    // istanbul ignore if
    if (typeof prefixWidth !== 'number') {
        throw errors.expected(sizeRW, 'atomic RW');
    }

    this.sizeRW = sizeRW;
    this.chunkRW = chunkRW;
    this.buffer = new ConcatReadBuffer();

    // Start out waiting for enough bytes to read one length value.
    this.expecting = prefixWidth;
    this.state = States.PendingLength;

    // istanbul ignore else
    if (typeof emit === 'function') {
        this.emit = emit;
    }
}
// Default value handler: does nothing. The constructor replaces it on the
// instance when it is given an emit function; seek() calls it with each
// successfully decoded value.
// istanbul ignore next
ReadMachine.prototype.emit = function emit() {
};
// Feeds one more buffer into the machine.
//
// The buffer is appended to the internal read buffer, then the machine keeps
// stepping (pend or seek, depending on the current state) for as long as the
// buffered byte count covers what the current state expects and no step has
// reported an error.
//
// Returns the first error produced by a step, otherwise the falsy result of
// the last step (null when no step ran or pend ran last, undefined when
// seek ran last).
ReadMachine.prototype.handleChunk = function handleChunk(buf) {
    var machine = this;
    var failure = null;

    machine.buffer.push(buf);

    while (!failure && machine.buffer.avail() >= machine.expecting) {
        switch (machine.state) {
            case States.Seeking:
                failure = machine.seek();
                break;
            case States.PendingLength:
                failure = machine.pend();
                break;
            // istanbul ignore next
            default:
                // Unknown state: report it instead of looping forever.
                failure = errors.BrokenReaderState({
                    state: machine.state,
                    expecting: machine.expecting,
                    avail: machine.buffer.avail()
                });
        }
    }

    return failure;
};
// Reusable result object handed to sizeRW.poolReadFrom by pend().
var pendReadRes = new ReadResult();

// PendingLength step: reads the next length value from offset 0 of the
// buffered data using sizeRW.
//
// On success the machine switches to Seeking and expects as many bytes as
// the value that was read; pend() itself does not shift anything off the
// buffer in that case. Returns null.
//
// A read error, or a falsy (zero) length, is returned as an error; before
// returning, sizeRW.width bytes are shifted off the buffer and the machine
// is put back into PendingLength.
ReadMachine.prototype.pend = function pend() {
    var res = this.sizeRW.poolReadFrom(pendReadRes, this.buffer, 0);
    var failure = res.err;
    if (!failure && !res.value) {
        failure = errors.ZeroLengthChunk();
    }

    if (!failure) {
        this.expecting = res.value;
        this.state = States.Seeking;
        return null;
    }

    // Drop the unusable length bytes and wait for the next length value.
    this.buffer.shift(this.sizeRW.width);
    this.expecting = this.sizeRW.width;
    this.state = States.PendingLength;
    return failure;
};
// Reusable result objects for seek(): the first holds the normal read, the
// second the re-read over an annotated buffer used for error reporting.
var seekReadRes = new ReadResult();
var seekReadRes2 = new ReadResult();

// Seeking step: shifts the expected number of bytes off the buffer, decodes
// them with chunkRW and emits the decoded value.
//
// The machine is returned to PendingLength (expecting sizeRW.width bytes)
// before decoding, so a decode error does not leave it stuck in Seeking.
//
// Returns undefined after emitting a value, the decode error (annotated via
// iface.annotateError) when decoding fails, or a BrokenReaderState error
// when the shifted frame is empty.
ReadMachine.prototype.seek = function seek() {
    var frame = this.buffer.shift(this.expecting);

    // istanbul ignore if
    if (!frame.length) {
        return errors.BrokenReaderState({
            state: this.state,
            expecting: this.expecting,
            avail: this.buffer.avail()
        });
    }

    this.expecting = this.sizeRW.width;
    this.state = States.PendingLength;

    // pooled inline of fromBufferResult
    this.chunkRW.poolReadFrom(seekReadRes, frame, 0);
    iface.checkAllReadFrom(seekReadRes, frame);

    if (!seekReadRes.err) {
        this.emit(seekReadRes.value);
        return;
    }

    // Decode failed: read again over an annotated buffer so the error can
    // be annotated with what was read.
    var annotated = iface.makeAnnotatedBuffer(frame, 0, false);
    this.chunkRW.poolReadFrom(seekReadRes2, annotated, 0);
    iface.checkAllReadFrom(seekReadRes2, frame);
    iface.annotateError(seekReadRes, seekReadRes2, 0, annotated);
    return seekReadRes.err;
};
// Signals that no more input will arrive.
//
// Returns null when nothing is buffered. Otherwise the leftover bytes are
// discarded, the machine is reset to PendingLength, and a TruncatedRead
// error is returned carrying the number of discarded bytes (`length`).
//
// Note: `state` and `expecting` on the error are read after the reset, so
// they describe the machine as it is left by flush().
ReadMachine.prototype.flush = function flush() {
    var avail = this.buffer.avail();
    if (!avail) {
        return null;
    }

    this.buffer.clear();
    // Reset to the width of the configured size RW (as the constructor,
    // pend and seek do) rather than assuming a 4-byte length value.
    this.expecting = this.sizeRW.width;
    this.state = States.PendingLength;

    return errors.TruncatedRead({
        length: avail,
        state: this.state,
        expecting: this.expecting
    });
};