backend/new.js (1,420 lines of code) (raw):
const { parseOpId, copyObject } = require('../src/common')
const { COLUMN_TYPE, VALUE_TYPE, ACTIONS, OBJECT_TYPE, DOC_OPS_COLUMNS, CHANGE_COLUMNS, DOCUMENT_COLUMNS,
encoderByColumnId, decoderByColumnId, makeDecoders, decodeValue,
encodeChange, decodeChangeColumns, decodeChangeMeta, decodeChanges, decodeDocumentHeader, encodeDocumentHeader } = require('./columnar')
const MAX_BLOCK_SIZE = 600 // operations

// Bloom filter parameters: 10 bits per entry and 7 probes give about a 1% false positive rate
const BLOOM_BITS_PER_ENTRY = 10
const BLOOM_NUM_PROBES = 7
const BLOOM_FILTER_SIZE = Math.floor(BLOOM_BITS_PER_ENTRY * MAX_BLOCK_SIZE / 8) // bytes

// Positions of the individual columns within an operation array / list of column decoders
const objActorIdx = 0
const objCtrIdx = 1
const keyActorIdx = 2
const keyCtrIdx = 3
const keyStrIdx = 4
const idActorIdx = 5
const idCtrIdx = 6
const insertIdx = 7
const actionIdx = 8
const valLenIdx = 9
const valRawIdx = 10

// The pred* and succ* columns intentionally share positions 13 to 15.
// NOTE(review): presumably pred* applies to operations read from changes and succ* to operations
// read from the document -- confirm against the column definitions in ./columnar.
const predNumIdx = 13
const predActorIdx = 14
const predCtrIdx = 15
const succNumIdx = 13
const succActorIdx = 14
const succCtrIdx = 15

// Column IDs of the three predecessor columns, in the order they appear in CHANGE_COLUMNS
const PRED_COLUMN_IDS = CHANGE_COLUMNS.reduce((columnIds, column) => {
  if (['predNum', 'predActor', 'predCtr'].includes(column.columnName)) {
    columnIds.push(column.columnId)
  }
  return columnIds
}, [])
/**
 * Updates `objectTree`, which is a tree of nested objects, so that afterwards
 * `objectTree[path[0]][path[1]][...] === value`. Only the root object is mutated, whereas any
 * nested objects are copied before updating. This means that once the root object has been
 * shallow-copied, this function can be used to update it without mutating the previous version.
 *
 * @param {Object} objectTree - root of the tree; this object itself is modified in place
 * @param {Array} path - non-empty sequence of property keys leading to the value to set
 * @param {*} value - the value to store at the end of `path`
 * @throws {RangeError} if `path` is empty (previously this recursed until the stack overflowed)
 */
function deepCopyUpdate(objectTree, path, value) {
  if (path.length === 0) {
    throw new RangeError('deepCopyUpdate requires a non-empty path')
  }
  const lastIndex = path.length - 1
  let node = objectTree
  // Walk down the path, replacing each intermediate object with a shallow copy so that objects
  // reachable from an earlier version of the root are never modified
  for (let depth = 0; depth < lastIndex; depth++) {
    const key = path[depth]
    const copy = Object.assign({}, node[key])
    node[key] = copy
    node = copy
  }
  node[path[lastIndex]] = value
}
/**
 * Scans a block of document operations, encoded as columns `docCols`, to find the position at which
 * an operation (or sequence of operations) `ops` should be applied. `actorIds` is the array that
 * maps actor numbers to hexadecimal actor IDs. `resumeInsertion` is true if we're performing a list
 * insertion and we already found the reference element in a previous block, but we reached the end
 * of that previous block while scanning for the actual insertion position, and so we're continuing
 * the scan in a subsequent block.
 *
 * The fields read from `ops` are `objActor`/`objCtr` (the object being updated), `keyStr` (string
 * key, or null if an element ID key is used), `keyActor`/`keyCtr` (element ID key),
 * `idActor`/`idCtr` (ID of the new operation), and `insert`. The actor fields of `ops` are compared
 * against entries of `actorIds`, i.e. they are actor IDs rather than actor numbers.
 *
 * Returns an object with keys:
 * - `found`: false if we were scanning for a reference element in a list but couldn't find it;
 *    true otherwise.
 * - `skipCount`: the number of operations, counted from the start of the block, after which the
 *   new operations should be inserted or applied.
 * - `visibleCount`: if modifying a list object, the number of visible (i.e. non-deleted) list
 *   elements that precede the position where the new operations should be applied.
 */
function seekWithinBlock(ops, docCols, actorIds, resumeInsertion) {
  // Rewind every column decoder so that the scan starts at the first operation of the block
  for (let col of docCols) col.decoder.reset()
  const { objActor, objCtr, keyActor, keyCtr, keyStr, idActor, idCtr, insert } = ops
  // Pick the decoders by column position; the commented-out slots are columns not read here
  const [objActorD, objCtrD, /* keyActorD */, /* keyCtrD */, keyStrD, idActorD, idCtrD, insertD, actionD,
    /* valLenD */, /* valRawD */, /* chldActorD */, /* chldCtrD */, succNumD] = docCols.map(col => col.decoder)
  // `elemVisible` records whether the list element currently being scanned has already been
  // counted in `visibleCount`, so that an element consisting of several operations counts once
  let skipCount = 0, visibleCount = 0, elemVisible = false, nextObjActor = null, nextObjCtr = null
  let nextIdActor = null, nextIdCtr = null, nextKeyStr = null, nextInsert = null, nextSuccNum = 0
  // Seek to the beginning of the object being updated
  if (objCtr !== null && !resumeInsertion) {
    while (!objCtrD.done || !objActorD.done || !actionD.done) {
      nextObjCtr = objCtrD.readValue()
      nextObjActor = actorIds[objActorD.readValue()]
      actionD.skipValues(1)
      // Skip operations without an object ID and operations whose object ID sorts before the
      // target (ordered by counter first, then by actor ID)
      if (nextObjCtr === null || !nextObjActor || nextObjCtr < objCtr ||
          (nextObjCtr === objCtr && nextObjActor < objActor)) {
        skipCount += 1
      } else {
        break
      }
    }
  }
  // The target object does not occur in this block: the position reached so far is the result
  if ((nextObjCtr !== objCtr || nextObjActor !== objActor) && !resumeInsertion) {
    return {found: true, skipCount, visibleCount}
  }
  // Seek to the appropriate key (if string key is used)
  if (keyStr !== null) {
    keyStrD.skipValues(skipCount)
    // NOTE(review): if the object-seeking loop above ran, objActorD/objCtrD have already consumed
    // the operation at `skipCount`, whereas keyStrD has only skipped `skipCount` values, so the
    // object ID read below appears to belong to the operation after the key being compared.
    // Confirm whether callers rely only on `skipCount` being a lower bound.
    while (!keyStrD.done) {
      const objActorIndex = objActorD.readValue()
      nextObjActor = objActorIndex === null ? null : actorIds[objActorIndex]
      nextObjCtr = objCtrD.readValue()
      nextKeyStr = keyStrD.readValue()
      // Skip operations of the same object whose key sorts strictly before the target key
      if (nextKeyStr !== null && nextKeyStr < keyStr &&
          nextObjCtr === objCtr && nextObjActor === objActor) {
        skipCount += 1
      } else {
        break
      }
    }
    return {found: true, skipCount, visibleCount}
  }
  // Element ID key: advance the ID, insert and succNum decoders to the current position, then read
  // the operation at that position into the next* variables
  idCtrD.skipValues(skipCount)
  idActorD.skipValues(skipCount)
  insertD.skipValues(skipCount)
  succNumD.skipValues(skipCount)
  nextIdCtr = idCtrD.readValue()
  nextIdActor = actorIds[idActorD.readValue()]
  nextInsert = insertD.readValue()
  nextSuccNum = succNumD.readValue()
  // If we are inserting into a list, an opId key is used, and we need to seek to a position *after*
  // the referenced operation. Moreover, we need to skip over any existing operations with a greater
  // opId than the new insertion, for CRDT convergence on concurrent insertions in the same place.
  if (insert) {
    // If insertion is not at the head, search for the reference element
    if (!resumeInsertion && keyCtr !== null && keyCtr > 0 && keyActor !== null) {
      skipCount += 1
      while (!idCtrD.done && !idActorD.done && (nextIdCtr !== keyCtr || nextIdActor !== keyActor)) {
        // An insertion starts a new list element; count it as visible at most once, on the first
        // of its operations that has no successors
        if (nextInsert) elemVisible = false
        if (nextSuccNum === 0 && !elemVisible) {
          visibleCount += 1
          elemVisible = true
        }
        nextIdCtr = idCtrD.readValue()
        nextIdActor = actorIds[idActorD.readValue()]
        nextObjCtr = objCtrD.readValue()
        nextObjActor = actorIds[objActorD.readValue()]
        nextInsert = insertD.readValue()
        nextSuccNum = succNumD.readValue()
        // Stop as soon as we leave the object being updated
        if (nextObjCtr === objCtr && nextObjActor === objActor) skipCount += 1; else break
      }
      // The scan ended without landing on the insertion of the reference element in this object
      if (nextObjCtr !== objCtr || nextObjActor !== objActor || nextIdCtr !== keyCtr ||
          nextIdActor !== keyActor || !nextInsert) {
        return {found: false, skipCount, visibleCount}
      }
      // Account for the visibility of the reference element itself
      if (nextInsert) elemVisible = false
      if (nextSuccNum === 0 && !elemVisible) {
        visibleCount += 1
        elemVisible = true
      }
      // Set up the next* variables to the operation following the reference element
      if (idCtrD.done || idActorD.done) return {found: true, skipCount, visibleCount}
      nextIdCtr = idCtrD.readValue()
      nextIdActor = actorIds[idActorD.readValue()]
      nextObjCtr = objCtrD.readValue()
      nextObjActor = actorIds[objActorD.readValue()]
      nextInsert = insertD.readValue()
      nextSuccNum = succNumD.readValue()
    }
    // Skip over any list elements with greater ID than the new one, and any non-insertions
    while ((!nextInsert || nextIdCtr > idCtr || (nextIdCtr === idCtr && nextIdActor > idActor)) &&
           nextObjCtr === objCtr && nextObjActor === objActor) {
      skipCount += 1
      if (nextInsert) elemVisible = false
      if (nextSuccNum === 0 && !elemVisible) {
        visibleCount += 1
        elemVisible = true
      }
      if (!idCtrD.done && !idActorD.done) {
        nextIdCtr = idCtrD.readValue()
        nextIdActor = actorIds[idActorD.readValue()]
        nextObjCtr = objCtrD.readValue()
        nextObjActor = actorIds[objActorD.readValue()]
        nextInsert = insertD.readValue()
        nextSuccNum = succNumD.readValue()
      } else {
        // Reached the end of the block
        break
      }
    }
  } else if (keyCtr !== null && keyCtr > 0 && keyActor !== null) {
    // If we are updating an existing list element, seek to just before the referenced ID
    while ((!nextInsert || nextIdCtr !== keyCtr || nextIdActor !== keyActor) &&
           nextObjCtr === objCtr && nextObjActor === objActor) {
      skipCount += 1
      if (nextInsert) elemVisible = false
      if (nextSuccNum === 0 && !elemVisible) {
        visibleCount += 1
        elemVisible = true
      }
      if (!idCtrD.done && !idActorD.done) {
        nextIdCtr = idCtrD.readValue()
        nextIdActor = actorIds[idActorD.readValue()]
        nextObjCtr = objCtrD.readValue()
        nextObjActor = actorIds[objActorD.readValue()]
        nextInsert = insertD.readValue()
        nextSuccNum = succNumD.readValue()
      } else {
        // Reached the end of the block
        break
      }
    }
    // The referenced list element's insertion was not found within this object in this block
    if (nextObjCtr !== objCtr || nextObjActor !== objActor || nextIdCtr !== keyCtr ||
        nextIdActor !== keyActor || !nextInsert) {
      return {found: false, skipCount, visibleCount}
    }
  }
  return {found: true, skipCount, visibleCount}
}
/**
 * Returns the number of list elements that should be added to a list index when skipping over the
 * block with index `blockIndex` in the list object with ID `objectId`.
 *
 * @param {Object} docState - document state; only `docState.blocks` is read
 * @param {number} blockIndex - index of the block being skipped
 * @param {string} objectId - ID of the list object whose visible elements are counted
 * @returns {number} the block's `numVisible[objectId]` (0 if the object has no entry in the
 *   block), reduced by one if the block's last visible element ID equals the next block's first
 *   visible element ID. If there is no next block, no such adjustment is made.
 */
function visibleListElements(docState, blockIndex, objectId) {
  const thisBlock = docState.blocks[blockIndex]
  const nextBlock = docState.blocks[blockIndex + 1]
  const blockVisible = thisBlock.numVisible[objectId]
  if (blockVisible === undefined) return 0

  // The last block has no successor, so no element can straddle a block boundary. Without this
  // guard the property accesses on `nextBlock` below would throw a TypeError.
  if (nextBlock === undefined) return blockVisible

  // If a list element is split across the block boundary, don't double-count it
  const splitAcrossBoundary =
    thisBlock.lastVisibleActor === nextBlock.firstVisibleActor &&
    thisBlock.lastVisibleActor !== undefined &&
    thisBlock.lastVisibleCtr === nextBlock.firstVisibleCtr &&
    thisBlock.lastVisibleCtr !== undefined
  return splitAcrossBoundary ? blockVisible - 1 : blockVisible
}
/**
 * Locates the block, and the position within that block, at which the operation(s) described by
 * `ops` belong in the document `docState`. Returns an object with keys:
 * - `blockIndex`: the index of the block into which we should insert the new operation
 * - `skipCount`: the number of operations, counted from the start of the block, after which the
 *   new operations should be inserted or merged.
 * - `visibleCount`: if modifying a list object, the number of visible (i.e. non-deleted) list
 *   elements that precede the position where the new operations should be applied.
 *
 * Throws a RangeError if a list operation references an element that cannot be found.
 */
function seekToOp(docState, ops) {
  const { objActor, objCtr, keyActor, keyCtr, keyStr } = ops
  const { blocks } = docState
  const lastBlockIndex = blocks.length - 1
  let blockIndex = 0
  let totalVisible = 0

  // Advance past blocks whose highest objectId is lower than the object we are looking for
  if (objCtr !== null) {
    for (; blockIndex < lastBlockIndex; blockIndex++) {
      const { lastObjectActor, lastObjectCtr } = blocks[blockIndex]
      const blockActor = lastObjectActor === undefined ? undefined : docState.actorIds[lastObjectActor]
      const onlyLowerObjects = lastObjectCtr === undefined || lastObjectCtr < objCtr ||
        (lastObjectCtr === objCtr && blockActor < objActor)
      if (!onlyLowerObjects) break
    }
  }

  if (keyStr !== null) {
    // Map operation: advance past blocks whose highest key for this object is lower than keyStr
    while (blockIndex < lastBlockIndex) {
      const highestKey = blocks[blockIndex].lastKey[ops.objId]
      if (highestKey === undefined || !(highestKey < keyStr)) break
      blockIndex += 1
    }
    // Decode the candidate block to find the exact position
    const seekResult = seekWithinBlock(ops, blocks[blockIndex].columns, docState.actorIds, false)
    return {blockIndex, skipCount: seekResult.skipCount, visibleCount: 0}
  }

  // List operation
  const insertAtHead = keyCtr === null || keyCtr === 0 || keyActor === null
  const keyActorNum = keyActor === null ? null : docState.actorIds.indexOf(keyActor)
  let resumeInsertion = false

  for (;;) {
    // Unless there is no reference element (head insertion), or the reference element was already
    // found in an earlier block and we are merely continuing the scan for the insertion position
    // (concurrent insertions at the same place), use the Bloom filters to skip blocks that
    // cannot contain the reference element.
    if (!insertAtHead && !resumeInsertion) {
      while (blockIndex < lastBlockIndex &&
             !bloomFilterContains(blocks[blockIndex].bloom, keyActorNum, keyCtr)) {
        // Having passed the end of the list object without a hit, the element does not exist
        if (blocks[blockIndex].lastObjectCtr > objCtr) {
          throw new RangeError(`Reference element not found: ${keyCtr}@${keyActor}`)
        }
        // Skipped blocks still contribute their visible elements to the list index
        totalVisible += visibleListElements(docState, blockIndex, ops.objId)
        blockIndex += 1
      }
    }

    // Decode the candidate block to check whether it really contains the reference element
    const {found, skipCount, visibleCount} =
      seekWithinBlock(ops, blocks[blockIndex].columns, docState.actorIds, resumeInsertion)
    const isLastBlock = blockIndex === lastBlockIndex

    // In the last block, failing to find the reference element is an error
    if (isLastBlock && !found) {
      throw new RangeError(`Reference element not found: ${keyCtr}@${keyActor}`)
    }
    // Done if this is the last block, or if the position lies strictly inside the current block
    if (found && (isLastBlock || skipCount < blocks[blockIndex].numOps)) {
      return {blockIndex, skipCount, visibleCount: totalVisible + visibleCount}
    }

    // Either the Bloom filter hit was a false positive (not found, more blocks remain), or the
    // reference element was found but the scan ran to the end of the block, in which case the
    // insertion position has to be determined in a following block.
    resumeInsertion = found && ops.insert
    totalVisible += visibleListElements(docState, blockIndex, ops.objId)
    blockIndex += 1
  }
}
/**
 * Adds the list element ID made up of counter `elemIdCtr` and actor number `elemIdActor` to the
 * Bloom filter `bloom` (a Uint8Array, modified in place). No hash function is computed over the
 * inputs, since the two integers work fine as they are. The sequence of probe indexes is derived
 * from them with the triple hashing algorithm from the following paper:
 *
 * Peter C. Dillinger and Panagiotis Manolios. Bloom Filters in Probabilistic Verification.
 * 5th International Conference on Formal Methods in Computer-Aided Design (FMCAD), November 2004.
 * http://www.ccis.northeastern.edu/home/pete/pub/bloom-filters-verification.pdf
 */
function bloomFilterAdd(bloom, elemIdActor, elemIdCtr) {
  const numBits = 8 * bloom.byteLength
  // A third value is computed from the two inputs with one step of FNV-1a, see
  // http://www.isthe.com/chongo/tech/comp/fnv/index.html
  // The prime is just over 2^24, so elemIdCtr can be up to about 2^29 = 500 million before the
  // product exceeds 2^53. Losing precision beyond that is harmless: the result stays
  // deterministic, and the filter only has to be consistent within this library.
  const stepIncrement = ((elemIdCtr ^ elemIdActor) * 16777619 >>> 0) % numBits
  let probe = elemIdCtr % numBits
  let step = elemIdActor % numBits

  let remaining = BLOOM_NUM_PROBES
  while (remaining > 0) {
    const byteIndex = probe >>> 3
    bloom[byteIndex] = bloom[byteIndex] | (1 << (probe & 7))
    probe = (probe + step) % numBits
    step = (step + stepIncrement) % numBits
    remaining -= 1
  }
}
/**
 * Checks whether the Bloom filter `bloom` probably contains the list element ID made up of counter
 * `elemIdCtr` and actor number `elemIdActor`. Returns false as soon as one of the probed bits is
 * unset, and true if all probed bits are set.
 */
function bloomFilterContains(bloom, elemIdActor, elemIdCtr) {
  const numBits = 8 * bloom.byteLength
  // Same probe sequence as in bloomFilterAdd; see the comments there for an explanation
  const stepIncrement = ((elemIdCtr ^ elemIdActor) * 16777619 >>> 0) % numBits
  let probe = elemIdCtr % numBits
  let step = elemIdActor % numBits

  for (let remaining = BLOOM_NUM_PROBES; remaining > 0; remaining--) {
    const bitIsSet = (bloom[probe >>> 3] & (1 << (probe & 7))) !== 0
    if (!bitIsSet) return false
    probe = (probe + step) % numBits
    step = (step + stepIncrement) % numBits
  }
  return true
}
/**
 * Recomputes the metadata of `block` from scratch by decoding the relevant columns of all of its
 * operations. The metadata (Bloom filter, highest key per object, number of visible list elements
 * per object, number of operations, last object ID, first and last visible list element ID) is
 * what we need to efficiently figure out where to insert new operations. `actorIds` maps actor
 * numbers to actor IDs and is used to construct object ID strings.
 */
function updateBlockMetadata(block, actorIds) {
  Object.assign(block, {
    bloom: new Uint8Array(BLOOM_FILTER_SIZE),
    lastKey: {},
    numVisible: {},
    numOps: 0,
    lastObjectActor: undefined,
    lastObjectCtr: undefined,
    firstVisibleActor: undefined,
    firstVisibleCtr: undefined,
    lastVisibleActor: undefined,
    lastVisibleCtr: undefined
  })

  const decoders = block.columns.map(col => col.decoder)
  decoders.forEach(decoder => decoder.reset())
  // Columns 8 to 12 (action, value and child columns) are not needed here
  const [objActorDec, objCtrDec, keyActorDec, keyCtrDec, keyStrDec, idActorDec, idCtrDec, insertDec] = decoders
  const succNumDec = decoders[13]

  while (!idCtrDec.done) {
    block.numOps += 1
    const objActor = objActorDec.readValue()
    const objCtr = objCtrDec.readValue()
    const keyActor = keyActorDec.readValue()
    const keyCtr = keyCtrDec.readValue()
    const keyStr = keyStrDec.readValue()
    const idActor = idActorDec.readValue()
    const idCtr = idCtrDec.readValue()
    const insert = insertDec.readValue()
    const succNum = succNumDec.readValue()
    const objectId = objActor === null ? '_root' : `${objCtr}@${actorIds[objActor]}`

    if (objActor !== null && objCtr !== null) {
      block.lastObjectActor = objActor
      block.lastObjectCtr = objCtr
    }

    if (keyStr !== null) {
      // Map key: the key of the latest operation read for each object is recorded
      block.lastKey[objectId] = keyStr
      continue
    }
    // Neither a map key nor a list element
    if (!insert && keyCtr === null) continue

    // List element: an insertion is identified by its own ID, other operations by their key
    if (block.numVisible[objectId] === undefined) block.numVisible[objectId] = 0
    const elemIdActor = insert ? idActor : keyActor
    const elemIdCtr = insert ? idCtr : keyCtr
    bloomFilterAdd(block.bloom, elemIdActor, elemIdCtr)

    // Only operations without successors make a list element visible
    if (succNum !== 0) continue
    if (block.firstVisibleActor === undefined) block.firstVisibleActor = elemIdActor
    if (block.firstVisibleCtr === undefined) block.firstVisibleCtr = elemIdCtr
    // Several visible operations on the same element count as one visible element
    const sameElement = block.lastVisibleActor === elemIdActor && block.lastVisibleCtr === elemIdCtr
    if (!sameElement) block.numVisible[objectId] += 1
    block.lastVisibleActor = elemIdActor
    block.lastVisibleCtr = elemIdCtr
  }
}
/**
 * Incrementally updates the metadata of `block` to account for the operation `op` (an array
 * indexed by the `*Idx` column constants) being added to it. `objectId` is the ID of the object
 * that `op` modifies, `actorIds` maps actor numbers to actor IDs, and `isChangeOp` makes a list
 * operation count as visible regardless of its successor count.
 */
function addBlockOperation(block, op, objectId, actorIds, isChangeOp) {
  // Keep track of the largest objectId contained within a block
  const opObjActor = op[objActorIdx]
  const opObjCtr = op[objCtrIdx]
  if (opObjActor !== null && opObjCtr !== null) {
    const prevCtr = block.lastObjectCtr
    const isLargerObject = prevCtr === undefined || prevCtr < opObjCtr ||
      (prevCtr === opObjCtr && actorIds[block.lastObjectActor] < actorIds[opObjActor])
    if (isLargerObject) {
      block.lastObjectActor = opObjActor
      block.lastObjectCtr = opObjCtr
    }
  }

  const mapKey = op[keyStrIdx]
  if (mapKey !== null) {
    // TODO this comparison should use UTF-8 encoding, not JavaScript's UTF-16
    const prevKey = block.lastKey[objectId]
    if (prevKey === undefined || prevKey < mapKey) block.lastKey[objectId] = mapKey
    return
  }

  // List element: an insertion is identified by its own ID, other operations by their key
  const isInsert = op[insertIdx]
  const elemIdActor = isInsert ? op[idActorIdx] : op[keyActorIdx]
  const elemIdCtr = isInsert ? op[idCtrIdx] : op[keyCtrIdx]
  bloomFilterAdd(block.bloom, elemIdActor, elemIdCtr)

  const countsAsVisible = op[succNumIdx] === 0 || isChangeOp
  if (!countsAsVisible) return
  if (block.firstVisibleActor === undefined) block.firstVisibleActor = elemIdActor
  if (block.firstVisibleCtr === undefined) block.firstVisibleCtr = elemIdCtr
  block.lastVisibleActor = elemIdActor
  block.lastVisibleCtr = elemIdCtr
}
/**
 * Splits `block`, which contains too many operations, into a sequence of adjacent blocks of
 * roughly equal size, and returns those new blocks as an array. The metadata of every new block is
 * computed with `updateBlockMetadata()`, to which `actorIds` is passed on.
 */
function splitBlock(block, actorIds) {
  block.columns.forEach(col => col.decoder.reset())

  // Each resulting block should be between 50% and 80% full, so that there is some space left and
  // the block isn't split again as soon as the next operation is added. The upper bound cannot be
  // lower than 75%, because going from two to three blocks would otherwise produce a block that
  // is less than 50% full.
  const numBlocks = Math.ceil(block.numOps / (0.8 * MAX_BLOCK_SIZE))
  const result = []
  let copiedSoFar = 0

  for (let blockNum = 1; blockNum <= numBlocks; blockNum++) {
    // Cumulative number of operations that must have been copied once this block is complete
    const cumulativeTarget = Math.ceil(blockNum * block.numOps / numBlocks)
    const opsToCopy = cumulativeTarget - copiedSoFar

    const encoders = block.columns.map(({columnId}) => ({columnId, encoder: encoderByColumnId(columnId)}))
    copyColumns(encoders, block.columns, opsToCopy)

    const columns = encoders.map(({columnId, encoder}) => {
      return {columnId, decoder: decoderByColumnId(columnId, encoder.buffer)}
    })
    const newBlock = {columns}
    updateBlockMetadata(newBlock, actorIds)
    result.push(newBlock)
    copiedSoFar = cumulativeTarget
  }
  return result
}
/**
 * Concatenates the columns of all blocks in the array `blocks`, in order. Returns an array of
 * `{columnId, encoder}` objects, one for each column of the first block.
 */
function concatBlocks(blocks) {
  const firstColumns = blocks[0].columns
  const encoders = firstColumns.map(({columnId}) => {
    return {columnId, encoder: encoderByColumnId(columnId)}
  })
  for (const {columns, numOps} of blocks) {
    // Every block is copied from its beginning
    for (const {decoder} of columns) decoder.reset()
    copyColumns(encoders, columns, numOps)
  }
  return encoders
}
/**
 * Copies `count` rows from the input columns `inCols` (an array of `{columnId, decoder}` objects)
 * to the output columns `outCols` (an array of `{columnId, encoder}` objects). Both arrays must be
 * sorted in increasing order of columnId. An output column that has no matching input column, or
 * whose input column has an empty buffer, is filled with `count` blank values (according to the
 * column type). Columns that belong to a group are copied or filled according to the sum of the
 * group's cardinality column rather than `count`.
 *
 * Throws a RangeError if a VALUE_LEN column and a VALUE_RAW column do not occur as a pair.
 */
function copyColumns(outCols, inCols, count) {
  if (count === 0) return
  let inIndex = 0
  let lastGroup = -1
  let lastCardinality = 0
  let valueColumn = -1
  let valueBytes = 0

  for (const outCol of outCols) {
    const { columnId, encoder } = outCol
    while (inIndex < inCols.length && inCols[inIndex].columnId < columnId) inIndex++
    const hasInput = inIndex < inCols.length && inCols[inIndex].columnId === columnId &&
      inCols[inIndex].decoder.buf.byteLength > 0
    const inCol = hasInput ? inCols[inIndex].decoder : null
    // Within a group, the number of values is the total cardinality of the rows being copied
    const colCount = (columnId >> 4 === lastGroup) ? lastCardinality : count

    switch (columnId % 8) {
      case COLUMN_TYPE.GROUP_CARD:
        lastGroup = columnId >> 4
        if (inCol) {
          lastCardinality = encoder.copyFrom(inCol, {count, sumValues: true}).sum
        } else {
          encoder.appendValue(0, count)
          lastCardinality = 0
        }
        break

      case COLUMN_TYPE.VALUE_LEN:
        if (inCol) {
          const rawColumnMissing = inIndex + 1 === inCols.length ||
            inCols[inIndex + 1].columnId !== columnId + 1
          if (rawColumnMissing) {
            throw new RangeError('VALUE_LEN column without accompanying VALUE_RAW column')
          }
          valueColumn = columnId + 1
          // The summed-up lengths tell us how many raw bytes to copy in the VALUE_RAW column
          valueBytes = encoder.copyFrom(inCol, {count: colCount, sumValues: true, sumShift: 4}).sum
        } else {
          encoder.appendValue(null, colCount)
          valueColumn = columnId + 1
          valueBytes = 0
        }
        break

      case COLUMN_TYPE.VALUE_RAW:
        if (columnId !== valueColumn) {
          throw new RangeError('VALUE_RAW column without accompanying VALUE_LEN column')
        }
        if (valueBytes > 0) {
          encoder.appendRawBytes(inCol.readRawBytes(valueBytes))
        }
        break

      default: // ACTOR_ID, INT_RLE, INT_DELTA, BOOLEAN, or STRING_RLE
        if (inCol) {
          encoder.copyFrom(inCol, {count: colCount})
        } else {
          const isBoolean = columnId % 8 === COLUMN_TYPE.BOOLEAN
          encoder.appendValue(isBoolean ? false : null, colCount)
        }
    }
  }
}
/**
 * Parses one operation from a set of columns. The argument `columns` contains a list of objects
 * with `columnId` and `decoder` properties. Returns an array in which the i'th element is the
 * value read from the i'th column in `columns`. Does not interpret datatypes; the only
 * interpretation of values is that if `actorTable` is given, a value `v` in a column of type
 * ACTOR_ID is replaced with `actorTable[v]`.
 *
 * The value for a GROUP_CARD column is the group's cardinality for this operation (0 if the column
 * yields null), and the value for any other column of the same group is an array containing that
 * many values. The value for a VALUE_RAW column is whatever `decoder.readRawBytes(n)` returns,
 * where `n` is the total length taken from the preceding VALUE_LEN column (each VALUE_LEN value
 * carries the length in the bits above the lowest four).
 *
 * @throws {RangeError} if a VALUE_RAW column is not immediately preceded (by columnId) by its
 *   VALUE_LEN column
 */
function readOperation(columns, actorTable) {
  let operation = [], colValue, lastGroup = -1, lastCardinality = 0, valueColumn = -1, valueBytes = 0
  for (let col of columns) {
    const colType = col.columnId % 8
    if (colType === COLUMN_TYPE.VALUE_RAW) {
      if (col.columnId !== valueColumn) throw new RangeError('unexpected VALUE_RAW column')
      colValue = col.decoder.readRawBytes(valueBytes)
    } else if (colType === COLUMN_TYPE.GROUP_CARD) {
      lastGroup = col.columnId >> 4
      lastCardinality = col.decoder.readValue() || 0
      colValue = lastCardinality
    } else if (col.columnId >> 4 === lastGroup) {
      // Column within a group: read one value per group entry of this operation
      colValue = []
      if (colType === COLUMN_TYPE.VALUE_LEN) {
        valueColumn = col.columnId + 1
        valueBytes = 0
      }
      for (let i = 0; i < lastCardinality; i++) {
        let value = col.decoder.readValue()
        if (colType === COLUMN_TYPE.ACTOR_ID && actorTable && typeof value === 'number') {
          value = actorTable[value]
        }
        if (colType === COLUMN_TYPE.VALUE_LEN) {
          // Add up the length of each individual value. (This used to shift `colValue`, the array
          // being accumulated, which yields the wrong number of raw bytes.)
          valueBytes += value >>> 4
        }
        colValue.push(value)
      }
    } else {
      colValue = col.decoder.readValue()
      if (colType === COLUMN_TYPE.ACTOR_ID && actorTable && typeof colValue === 'number') {
        colValue = actorTable[colValue]
      }
      if (colType === COLUMN_TYPE.VALUE_LEN) {
        valueColumn = col.columnId + 1
        valueBytes = colValue >>> 4
      }
    }
    operation.push(colValue)
  }
  return operation
}
/**
 * Appends `operation`, an array in the form returned by `readOperation()`, to the output columns
 * `outCols` (an array of `{columnId, encoder}` objects). `inCols` describes the columns that the
 * values of `operation` came from: the value `operation[i]` belongs to the column `inCols[i]`.
 * Output columns without a corresponding input column receive blank values.
 *
 * Throws a RangeError if the value of a column within a group is not an array whose length equals
 * the cardinality of that group.
 */
function appendOperation(outCols, inCols, operation) {
  let inIndex = 0
  let lastGroup = -1
  let lastCardinality = 0

  for (const { columnId, encoder } of outCols) {
    const colType = columnId % 8
    const group = columnId >> 4
    while (inIndex < inCols.length && inCols[inIndex].columnId < columnId) inIndex++
    const hasInput = inIndex < inCols.length && inCols[inIndex].columnId === columnId

    if (colType === COLUMN_TYPE.GROUP_CARD) {
      // A cardinality column starts a new group; a missing one means the group is empty
      lastGroup = group
      lastCardinality = hasInput ? operation[inIndex] : 0
      encoder.appendValue(lastCardinality)
      continue
    }

    if (hasInput) {
      const colValue = operation[inIndex]
      if (group === lastGroup) {
        if (!Array.isArray(colValue) || colValue.length !== lastCardinality) {
          throw new RangeError('bad group value')
        }
        for (const groupValue of colValue) encoder.appendValue(groupValue)
      } else if (colType === COLUMN_TYPE.VALUE_RAW) {
        if (colValue) encoder.appendRawBytes(colValue)
      } else {
        encoder.appendValue(colValue)
      }
      continue
    }

    // No input column: nothing to do for raw values; otherwise fill in blanks of the right type
    if (colType === COLUMN_TYPE.VALUE_RAW) continue
    const blankCount = (group === lastGroup) ? lastCardinality : 1
    let blankValue = null
    if (colType === COLUMN_TYPE.BOOLEAN) blankValue = false
    if (colType === COLUMN_TYPE.VALUE_LEN) blankValue = 0
    encoder.appendValue(blankValue, blankCount)
  }
}
/**
 * Reads the next operation of the document, starting from the current decoder position in block
 * `blockIndex`. Returns an object of the form `{docOp, blockIndex}` where `docOp` is an operation
 * in the form returned by `readOperation()`, and `blockIndex` is the block number to pass to the
 * next call. When the current block is exhausted, reading continues from the start of the next
 * block; when the last block is exhausted, `docOp` is null.
 */
function readNextDocOp(docState, blockIndex) {
  const { blocks } = docState
  const currentColumns = blocks[blockIndex].columns

  // There are operations left in the current block
  if (!currentColumns[actionIdx].decoder.done) {
    return {docOp: readOperation(currentColumns), blockIndex}
  }
  // The last block is exhausted: no more operations
  if (blockIndex === blocks.length - 1) {
    return {docOp: null, blockIndex}
  }
  // Continue at the beginning of the following block
  const nextIndex = blockIndex + 1
  const nextColumns = blocks[nextIndex].columns
  for (const col of nextColumns) col.decoder.reset()
  return {docOp: readOperation(nextColumns), blockIndex: nextIndex}
}
/**
 * Reads the next operation from a sequence of changes. `changeState` holds the state of this
 * pseudo-iterator and is mutated: `changeState.nextOp` is set to the operation that was read, and
 * once the last operation of the last change has been consumed, `changeState.done` is set to true
 * and `changeState.nextOp` to null. Whenever reading moves on to a new change, `docState` is
 * updated with that change's columns and actor IDs. The operation read is assigned its ID (actor
 * index of the change's author and a running counter starting at the change's `startOp`), and
 * the `maxOp` of the change and of `docState` are updated.
 *
 * Throws a RangeError if the object reference or the key of the operation is malformed.
 */
function readNextChangeOp(docState, changeState) {
  const currentChangeExhausted = () =>
    !changeState.columns || changeState.columns[actionIdx].decoder.done

  // Move forward to the next change that contains at least one operation
  while (changeState.changeIndex < changeState.changes.length - 1 && currentChangeExhausted()) {
    changeState.changeIndex += 1
    const change = changeState.changes[changeState.changeIndex]
    changeState.columns = makeDecoders(change.columns, CHANGE_COLUMNS)
    changeState.opCtr = change.startOp

    // Bring docState up to date with the information in the change
    updateBlockColumns(docState, changeState.columns)
    const tables = getActorTable(docState.actorIds, change)
    docState.actorIds = tables.actorIds
    changeState.actorTable = tables.actorTable
    changeState.actorIndex = docState.actorIds.indexOf(change.actorIds[0])
  }

  // Nothing left to read in the last change
  if (changeState.columns[actionIdx].decoder.done) {
    changeState.done = true
    changeState.nextOp = null
    return
  }

  const op = readOperation(changeState.columns, changeState.actorTable)
  const opCtr = changeState.opCtr
  op[idActorIdx] = changeState.actorIndex
  op[idCtrIdx] = opCtr
  changeState.nextOp = op
  changeState.changes[changeState.changeIndex].maxOp = opCtr
  if (opCtr > docState.maxOp) docState.maxOp = opCtr
  changeState.opCtr = opCtr + 1

  // The counter and actor of an object reference must be either both null or both non-null
  const objCtr = op[objCtrIdx]
  const objActor = op[objActorIdx]
  if ((objCtr === null) !== (objActor === null)) {
    throw new RangeError(`Mismatched object reference: (${objCtr}, ${objActor})`)
  }

  // A key counter that is null or zero must not have an actor; a positive one must have an actor
  const keyCtr = op[keyCtrIdx]
  const keyActor = op[keyActorIdx]
  const keyWithoutActorSlot = keyCtr === null || keyCtr === 0
  if ((keyWithoutActorSlot && keyActor !== null) || (keyCtr > 0 && keyActor === null)) {
    throw new RangeError(`Mismatched operation key: (${keyCtr}, ${keyActor})`)
  }
}
/**
 * Returns a patch for the object `objectId` of type `type` that contains no changes yet: an empty
 * `edits` array for the sequence types 'list' and 'text', and an empty `props` object for any
 * other type.
 */
function emptyObjectPatch(objectId, type) {
  const isSequence = type === 'list' || type === 'text'
  return isSequence ? {objectId, type, edits: []} : {objectId, type, props: {}}
}
/**
 * Checks whether the operation IDs `id1` and `id2` were generated by the same actor, and the
 * counter of `id2` exceeds the counter of `id1` by exactly `delta` (1 by default).
 */
function opIdDelta(id1, id2, delta = 1) {
  const { actorId: firstActor, counter: firstCounter } = parseOpId(id1)
  const { actorId: secondActor, counter: secondCounter } = parseOpId(id2)
  if (firstActor !== secondActor) return false
  return firstCounter + delta === secondCounter
}
/**
 * Adds the list edit `nextEdit` (insert, update, or remove) to the end of the array
 * `existingEdits`. Where possible, the last existing edit is extended in place instead of
 * appending a new one: consecutive insertions of primitive values with consecutive element IDs
 * are combined into a multi-insert, and consecutive removals at the same index are combined into
 * one removal with a higher count.
 */
function appendEdit(existingEdits, nextEdit) {
  if (existingEdits.length === 0) {
    existingEdits.push(nextEdit)
    return
  }
  const lastEdit = existingEdits[existingEdits.length - 1]

  // Two single insertions that can be merged into a new multi-insert
  const startsMultiInsert = () =>
    lastEdit.action === 'insert' && nextEdit.action === 'insert' &&
    lastEdit.index === nextEdit.index - 1 &&
    lastEdit.value.type === 'value' && nextEdit.value.type === 'value' &&
    lastEdit.elemId === lastEdit.opId && nextEdit.elemId === nextEdit.opId &&
    opIdDelta(lastEdit.elemId, nextEdit.elemId, 1) &&
    lastEdit.value.datatype === nextEdit.value.datatype &&
    typeof lastEdit.value.value === typeof nextEdit.value.value

  // A single insertion that continues an existing multi-insert
  const extendsMultiInsert = () =>
    lastEdit.action === 'multi-insert' && nextEdit.action === 'insert' &&
    lastEdit.index + lastEdit.values.length === nextEdit.index &&
    nextEdit.value.type === 'value' && nextEdit.elemId === nextEdit.opId &&
    opIdDelta(lastEdit.elemId, nextEdit.elemId, lastEdit.values.length) &&
    lastEdit.datatype === nextEdit.value.datatype &&
    typeof lastEdit.values[0] === typeof nextEdit.value.value

  // A removal at the same index as the preceding removal
  const extendsRemove = () =>
    lastEdit.action === 'remove' && nextEdit.action === 'remove' &&
    lastEdit.index === nextEdit.index

  if (startsMultiInsert()) {
    const firstValue = lastEdit.value.value
    lastEdit.action = 'multi-insert'
    if (nextEdit.value.datatype) lastEdit.datatype = nextEdit.value.datatype
    lastEdit.values = [firstValue, nextEdit.value.value]
    delete lastEdit.value
    delete lastEdit.opId
  } else if (extendsMultiInsert()) {
    lastEdit.values.push(nextEdit.value.value)
  } else if (extendsRemove()) {
    lastEdit.count += nextEdit.count
  } else {
    existingEdits.push(nextEdit)
  }
}
/**
 * Appends an UpdateEdit for the list element at `index` to `edits`, which is an array of
 * (SingleInsertEdit | MultiInsertEdit | UpdateEdit | RemoveEdit) list edits for a patch. A
 * conflict is represented by several consecutive edits with the same index, which is achieved by
 * calling this function several times for the same list element. `firstUpdate` must be true on the
 * first of those calls.
 *
 * The edits already at the end of the array (possibly from a different change) may happen to be
 * for the same index. To prevent those from being mistaken for part of a conflict, they are
 * removed on the first call. That is safe, since the edit being appended overwrites them, and
 * since they must refer to the same list element: no insertion or deletion that could have
 * shifted the indexes lies in between. If one of the removed edits was the insertion of the
 * element, the appended edit is an insert rather than an update, so that the element still gets
 * created (with the new value).
 */
function appendUpdate(edits, index, elemId, opId, value, firstUpdate) {
  let replacesInsert = false

  // Strip the trailing edits for this index. The run may begin with an insert or with an update;
  // once an insert has been reached, there is nothing further to strip.
  while (firstUpdate && !replacesInsert && edits.length > 0) {
    const tail = edits[edits.length - 1]
    const isSingleEdit = tail.action === 'insert' || tail.action === 'update'
    if (isSingleEdit && tail.index === index) {
      edits.pop()
      replacesInsert = (tail.action === 'insert')
    } else if (tail.action === 'multi-insert' && tail.index + tail.values.length - 1 === index) {
      // The element is the last value of a multi-insert: take it out of there
      tail.values.pop()
      replacesInsert = true
    } else {
      break
    }
  }

  const edit = replacesInsert
    ? {action: 'insert', index, elemId, opId, value}
    : {action: 'update', index, opId, value}
  appendEdit(edits, edit)
}
/**
 * Rewrites the tail of `edits` (an array of SingleInsertEdit | MultiInsertEdit | UpdateEdit |
 * RemoveEdit list edits for a patch) so that an insertion at `index`, together with any
 * UpdateEdits for the same index that follow it, becomes a sequence of pure updates.
 *
 * This is required when patch generation initially believes it is inserting a list element but
 * then discovers the element already held a value, meaning the edit has to be an update.
 *
 * The edits are taken off the end of the array and re-added through `appendUpdate()`; as a side
 * effect, `appendUpdate()` removes any still-earlier updates for the same index, so they are not
 * misread as belonging to the same conflict.
 *
 * Only `insert` and `update` edits may be encountered while scanning backwards. A `remove` cannot
 * occur because the state machine in `updatePatchProperty()` allows a property either an insert or
 * a remove edit, never both. A `multi-insert` cannot occur because multi-inserts always have equal
 * elemId and opId (they only represent the op that first inserts an element); this function is
 * only called when an insert is followed by a non-overwritten document op on the same element,
 * which implies the original insertion op was overwritten and so cannot have become a
 * multi-insert.
 *
 * @param {Array<Object>} edits list edits of the patch; mutated in place
 * @param {number} index list index that every rewritten edit must have
 * @param {string} elemId ID of the list element, forwarded to `appendUpdate()`
 * @throws {RangeError} if a trailing edit has a different index or an unexpected action
 */
function convertInsertToUpdate(edits, index, elemId) {
  const removed = []
  let reachedInsert = false
  // Walk backwards, collecting updates until (and including) the insert that started the run
  while (!reachedInsert && edits.length > 0) {
    const tail = edits[edits.length - 1]
    if (tail.action !== 'insert' && tail.action !== 'update') {
      throw new RangeError('last edit has unexpected action')
    }
    if (tail.index !== index) throw new RangeError('last edit has unexpected index')
    reachedInsert = tail.action === 'insert'
    removed.push(edits.pop())
  }
  // `removed` is newest-first; restore the original order before replaying the edits as updates
  removed.reverse()
  removed.forEach((edit, position) => {
    appendUpdate(edits, index, elemId, edit.opId, edit.value, position === 0)
  })
}
/**
* Updates `patches` to reflect the operation `op` within the document with state `docState`.
* Can be called multiple times if there are multiple operations for the same property (e.g. due
* to a conflict). `propState` is an object that carries over state between such successive
* invocations for the same property. If the current object is a list, `listIndex` is the index
* into that list (counting only visible elements). If the operation `op` was already previously
* in the document, `oldSuccNum` is the value of `op[succNumIdx]` before the current change was
* applied (allowing us to determine whether this operation was overwritten or deleted in the
* current change). `oldSuccNum` must be undefined if the operation came from the current change.
* If we are creating an incremental patch as a result of applying one or more changes, `newBlock`
* is the block to which the operations are getting written; we will update the metadata on this
* block. `newBlock` should be null if we are creating a patch for the whole document.
*
* Besides `patches`, this function mutates:
* - `docState.objectMeta`: a new entry is added for a make* operation whose object is not yet
*   known, and the `children` of `objectId` are updated (via `deepCopyUpdate`);
* - `propState[elemId]`: `visibleOps`, `hasChild`, `counterStates` and `action`;
* - `newBlock.numVisible[objectId]`, for list/text objects when `newBlock` is given.
*
* @param {Object} patches map from objectId to the patch object for that object
* @param {Object|null} newBlock block being written, or null for a whole-document patch
* @param {string} objectId ID of the object that contains the property
* @param {Array} op one operation row, indexed by the `*Idx` column constants
* @param {Object} docState document state; `actorIds` and `objectMeta` are used here
* @param {Object} propState per-property state shared between calls, keyed by key/elemId
* @param {number} listIndex index of the list element (only used for list/text objects)
* @param {number} [oldSuccNum] previous succ count of a document op; undefined for a change op
* @throws {RangeError} if an `inc` operation has no matching counter state, or if a `remove` has
*   to be turned into an update but the last edit is not a `remove`; `convertInsertToUpdate()`
*   may also throw
*/
function updatePatchProperty(patches, newBlock, objectId, op, docState, propState, listIndex, oldSuccNum) {
// No target block means we are building a patch for the whole document
const isWholeDoc = !newBlock
// Object type for this action, looked up through OBJECT_TYPE; null if the action number lies
// beyond the ACTIONS table
const type = op[actionIdx] < ACTIONS.length ? OBJECT_TYPE[ACTIONS[op[actionIdx]]] : null
const opId = `${op[idCtrIdx]}@${docState.actorIds[op[idActorIdx]]}`
// For an insertion the list element is identified by the op's own ID; otherwise by the op's key
const elemIdActor = op[insertIdx] ? op[idActorIdx] : op[keyActorIdx]
const elemIdCtr = op[insertIdx] ? op[idCtrIdx] : op[keyCtrIdx]
// NOTE(review): this is a truthiness test, so an empty-string key would fall through to the
// `ctr@actor` form, whereas the list/map distinction further below tests `=== null`.
const elemId = op[keyStrIdx] ? op[keyStrIdx] : `${elemIdCtr}@${docState.actorIds[elemIdActor]}`
// When the change contains a new make* operation (i.e. with an even-numbered action), record the
// new parent-child relationship in objectMeta. TODO: also handle link/move operations.
if (op[actionIdx] % 2 === 0 && !docState.objectMeta[opId]) {
docState.objectMeta[opId] = {parentObj: objectId, parentKey: elemId, opId, type, children: {}}
deepCopyUpdate(docState.objectMeta, [objectId, 'children', elemId, opId], {objectId: opId, type, props: {}})
}
// firstOp is true if the current operation is the first of a sequence of ops for the same key
const firstOp = !propState[elemId]
if (!propState[elemId]) propState[elemId] = {visibleOps: [], hasChild: false}
// An operation is overwritten if it is a document operation that has at least one successor
const isOverwritten = (oldSuccNum !== undefined && op[succNumIdx] > 0)
// Record all visible values for the property, and whether it has any child object
if (!isOverwritten) {
propState[elemId].visibleOps.push(op)
propState[elemId].hasChild = propState[elemId].hasChild || (op[actionIdx] % 2) === 0 // even-numbered action == make* operation
}
// If one or more of the values of the property is a child object, we update objectMeta to store
// all of the visible values of the property (even the non-child-object values). Then, when we
// subsequently process an update within that child object, we can construct the patch to
// contain the conflicting values.
const prevChildren = docState.objectMeta[objectId].children[elemId]
if (propState[elemId].hasChild || (prevChildren && Object.keys(prevChildren).length > 0)) {
let values = {}
for (let visible of propState[elemId].visibleOps) {
// Shadows the outer `opId`: this is the ID of the visible op, not of `op`
const opId = `${visible[idCtrIdx]}@${docState.actorIds[visible[idActorIdx]]}`
if (ACTIONS[visible[actionIdx]] === 'set') {
values[opId] = Object.assign({type: 'value'}, decodeValue(visible[valLenIdx], visible[valRawIdx]))
} else if (visible[actionIdx] % 2 === 0) {
const objType = visible[actionIdx] < ACTIONS.length ? OBJECT_TYPE[ACTIONS[visible[actionIdx]]] : null
values[opId] = emptyObjectPatch(opId, objType)
}
}
// Copy so that objectMeta is not modified if an exception is thrown while applying change
deepCopyUpdate(docState.objectMeta, [objectId, 'children', elemId], values)
}
// patchKey/patchValue remain undefined unless this op contributes a visible value to the patch
let patchKey, patchValue
// For counters, increment operations are succs to the set operation that created the counter,
// but in this case we want to add the values rather than overwriting them.
if (isOverwritten && ACTIONS[op[actionIdx]] === 'set' && (op[valLenIdx] & 0x0f) === VALUE_TYPE.COUNTER) {
// This is the initial set operation that creates a counter. Initialise the counter state
// to contain all successors of the set operation. Only if we later find that each of these
// successor operations is an increment, we make the counter visible in the patch.
// NOTE(review): this guard looks redundant, since propState[elemId] is always initialised
// near the top of this function.
if (!propState[elemId]) propState[elemId] = {visibleOps: [], hasChild: false}
if (!propState[elemId].counterStates) propState[elemId].counterStates = {}
let counterStates = propState[elemId].counterStates
// One shared state object is registered under the opId of every successor, so that each
// increment can find the counter it belongs to
let counterState = {opId, value: decodeValue(op[valLenIdx], op[valRawIdx]).value, succs: {}}
for (let i = 0; i < op[succNumIdx]; i++) {
const succOp = `${op[succCtrIdx][i]}@${docState.actorIds[op[succActorIdx][i]]}`
counterStates[succOp] = counterState
counterState.succs[succOp] = true
}
} else if (ACTIONS[op[actionIdx]] === 'inc') {
// Incrementing a previously created counter.
if (!propState[elemId] || !propState[elemId].counterStates || !propState[elemId].counterStates[opId]) {
throw new RangeError(`increment operation ${opId} for unknown counter`)
}
let counterState = propState[elemId].counterStates[opId]
counterState.value += decodeValue(op[valLenIdx], op[valRawIdx]).value
// Tick this increment off; once no successors remain outstanding, the counter is emitted
// under the opId of the set operation that created it
delete counterState.succs[opId]
if (Object.keys(counterState.succs).length === 0) {
patchKey = counterState.opId
patchValue = {type: 'value', datatype: 'counter', value: counterState.value}
// TODO if the counter is in a list element, we need to add a 'remove' action when deleted
}
} else if (!isOverwritten) {
// Add the value to the patch if it is not overwritten (i.e. if it has no succs).
if (ACTIONS[op[actionIdx]] === 'set') {
patchKey = opId
patchValue = Object.assign({type: 'value'}, decodeValue(op[valLenIdx], op[valRawIdx]))
} else if (op[actionIdx] % 2 === 0) { // even-numbered action == make* operation
if (!patches[opId]) patches[opId] = emptyObjectPatch(opId, type)
patchKey = opId
patchValue = patches[opId]
}
}
if (!patches[objectId]) patches[objectId] = emptyObjectPatch(objectId, docState.objectMeta[objectId].type)
const patch = patches[objectId]
// Updating a list or text object (with elemId key)
if (op[keyStrIdx] === null) {
// If we come across any document op that was previously non-overwritten/non-deleted, that
// means the current list element already had a value before this change was applied, and
// therefore the current element cannot be an insert. If we already registered an insert, we
// have to convert it into an update.
if (oldSuccNum === 0 && !isWholeDoc && propState[elemId].action === 'insert') {
propState[elemId].action = 'update'
convertInsertToUpdate(patch.edits, listIndex, elemId)
// Undo the increment that was applied when the insert was registered
if (newBlock) newBlock.numVisible[objectId] -= 1
}
if (patchValue) {
// If the op has a non-overwritten value and it came from the change, it's an insert.
// (It's not necessarily the case that op[insertIdx] is true: if a list element is concurrently
// deleted and updated, the node that first processes the deletion and then the update will
// observe the update as a re-insertion of the deleted list element.)
if (!propState[elemId].action && (oldSuccNum === undefined || isWholeDoc)) {
propState[elemId].action = 'insert'
appendEdit(patch.edits, {action: 'insert', index: listIndex, elemId, opId: patchKey, value: patchValue})
if (newBlock) {
if (newBlock.numVisible[objectId] === undefined) newBlock.numVisible[objectId] = 0
newBlock.numVisible[objectId] += 1
}
// If the property has a value and it's not an insert, then it must be an update.
// We might have previously registered it as a remove, in which case we convert it to update.
} else if (propState[elemId].action === 'remove') {
let lastEdit = patch.edits[patch.edits.length - 1]
if (lastEdit.action !== 'remove') throw new RangeError('last edit has unexpected type')
// Take this element back out of the remove edit: shrink its count, or drop the edit
// entirely if this was the only element it covered
if (lastEdit.count > 1) lastEdit.count -= 1; else patch.edits.pop()
propState[elemId].action = 'update'
appendUpdate(patch.edits, listIndex, elemId, patchKey, patchValue, true)
// Compensates for the decrement applied when the remove was registered
if (newBlock) newBlock.numVisible[objectId] += 1
} else {
// A 'normal' update
// It is the first update of the element exactly when no action has been recorded for it yet
appendUpdate(patch.edits, listIndex, elemId, patchKey, patchValue, !propState[elemId].action)
if (!propState[elemId].action) propState[elemId].action = 'update'
}
} else if (oldSuccNum === 0 && !propState[elemId].action) {
// If the property used to have a non-overwritten/non-deleted value, but no longer, it's a remove
propState[elemId].action = 'remove'
appendEdit(patch.edits, {action: 'remove', index: listIndex, count: 1})
if (newBlock) newBlock.numVisible[objectId] -= 1
}
} else if (patchValue || !isWholeDoc) {
// Updating a map or table (with string key)
// For an incremental patch the key is recorded even without a value, leaving an empty object
// in `props` if no op contributes a visible value. The first op for a key resets the entry.
if (firstOp || !patch.props[op[keyStrIdx]]) patch.props[op[keyStrIdx]] = {}
if (patchValue) patch.props[op[keyStrIdx]][patchKey] = patchValue
}
}
/**
* Applies operations (from one or more changes) to the document by merging the sequence of change
* ops into the sequence of document ops. The two inputs are `changeState` and `docState`
* respectively. Assumes that the decoders of both sets of columns are at the position where we want
* to start merging. `patches` is mutated to reflect the effect of the change operations. The
* operations to apply are read from `changeState`, starting with `changeState.nextOp` and
* advancing with `readNextChangeOp()`. The merged operations are written to `outCols` via
* `appendOperation()`, and `newBlock` (the block under construction) has its `numOps` and other
* metadata updated as ops are written. `docState` is as documented in `applyOps()`. If the
* operations are updating a list or text object, `listIndex` is the number of visible elements
* that precede the position at which we start merging. `blockIndex` is the document block number
* from which we are currently reading.
*
* Document ops whose `pred` is referenced by a change op are mutated in place: the change op's ID
* is inserted into their succ columns. The ID of the object being updated is also added to
* `changeState.objectIds`.
*
* Returns `{docOpsConsumed, blockIndex}`: `docOpsConsumed` is the number of document operations
* that were read with `readNextDocOp()` (all of them have been written to `outCols`), and
* `blockIndex` is the block index last reported by `readNextDocOp()`.
*
* Throws a RangeError if a list element referenced by a change op cannot be found, if a change
* op has the same ID as a document op, or if a `pred` of a change op matches no document op.
*/
function mergeDocChangeOps(patches, newBlock, outCols, changeState, docState, listIndex, blockIndex) {
const firstOp = changeState.nextOp, insert = firstOp[insertIdx]
const objActor = firstOp[objActorIdx], objCtr = firstOp[objCtrIdx]
const objectId = objActor === null ? '_root' : `${objCtr}@${docState.actorIds[objActor]}`
const idActorIndex = changeState.actorIndex, idActor = docState.actorIds[idActorIndex]
let foundListElem = false, elemVisible = false, propState = {}, docOp
// The leading semicolon stops the parenthesised destructuring assignment from being parsed as
// a call on the previous line (this file does not use statement-terminating semicolons)
;({ docOp, blockIndex } = readNextDocOp(docState, blockIndex))
let docOpsConsumed = (docOp === null ? 0 : 1)
// Remember the succ count as read from the document, because docOp[succNumIdx] may be
// incremented below before the op is passed to updatePatchProperty()
let docOpOldSuccNum = (docOp === null ? 0 : docOp[succNumIdx])
// changeOps, changeCols and predSeen are parallel arrays: for the change op at position i,
// changeCols[i] holds its columns and predSeen[i][j] becomes true once its j-th pred has been
// matched against a document op
let changeOp = null, changeOps = [], changeCols = [], predSeen = [], lastChangeKey = null
changeState.objectIds.add(objectId)
// Merge the two inputs: the sequence of ops in the doc, and the sequence of ops in the change.
// At each iteration, we either output the doc's op (possibly updated based on the change's ops)
// or output an op from the change.
while (true) {
// The array `changeOps` contains operations from the change(s) we're applying. When the array
// is empty, we load changes from the change. Typically we load only a single operation at a
// time, with two exceptions: 1. all operations that update the same key or list element in the
// same object are put into changeOps at the same time (this is needed so that we can update the
// succ columns of the document ops correctly); 2. a run of consecutive insertions is also
// placed into changeOps in one go.
//
// When we have processed all the ops in changeOps we try to see whether there are further
// operations that we can also process while we're at it. Those operations must be for the same
// object, they must be for a key or list element that appears later in the document, they must
// either all be insertions or all be non-insertions, and if insertions, they must be
// consecutive. If these conditions are satisfied, that means the operations can be processed in
// the same pass. If we encounter an operation that does not meet these conditions, we leave
// changeOps empty, and this function returns after having processed any remaining document ops.
//
// Any operations that could not be processed in a single pass remain in changeState; applyOps
// will seek to the appropriate position and then call mergeDocChangeOps again.
if (changeOps.length === 0) {
foundListElem = false
let nextOp = changeState.nextOp
while (!changeState.done && nextOp[idActorIdx] === idActorIndex && nextOp[insertIdx] === insert &&
nextOp[objActorIdx] === firstOp[objActorIdx] && nextOp[objCtrIdx] === firstOp[objCtrIdx]) {
// Check if the operation's pred references a previous operation in changeOps
const lastOp = (changeOps.length > 0) ? changeOps[changeOps.length - 1] : null
let isOverwrite = false
for (let i = 0; i < nextOp[predNumIdx]; i++) {
for (let prevOp of changeOps) {
if (nextOp[predActorIdx][i] === prevOp[idActorIdx] && nextOp[predCtrIdx][i] === prevOp[idCtrIdx]) {
isOverwrite = true
}
}
}
// If any of the following `if` statements is true, we add `nextOp` to `changeOps`. If they
// are all false, we break out of the loop and stop adding to `changeOps`.
if (nextOp === firstOp) {
// First change operation in a mergeDocChangeOps call is always used
} else if (insert && lastOp !== null && nextOp[keyStrIdx] === null &&
nextOp[keyActorIdx] === lastOp[idActorIdx] &&
nextOp[keyCtrIdx] === lastOp[idCtrIdx]) {
// Collect consecutive insertions
} else if (!insert && lastOp !== null && nextOp[keyStrIdx] !== null &&
nextOp[keyStrIdx] === lastOp[keyStrIdx] && !isOverwrite) {
// Collect several updates to the same key
} else if (!insert && lastOp !== null &&
nextOp[keyStrIdx] === null && lastOp[keyStrIdx] === null &&
nextOp[keyActorIdx] === lastOp[keyActorIdx] &&
nextOp[keyCtrIdx] === lastOp[keyCtrIdx] && !isOverwrite) {
// Collect several updates to the same list element
} else if (!insert && lastOp === null && nextOp[keyStrIdx] === null &&
docOp && docOp[insertIdx] && docOp[keyStrIdx] === null &&
docOp[idActorIdx] === nextOp[keyActorIdx] &&
docOp[idCtrIdx] === nextOp[keyCtrIdx]) {
// When updating/deleting list elements, keep going if the next elemId in the change
// equals the next elemId in the doc (i.e. we're updating several consecutive elements)
} else if (!insert && lastOp === null && nextOp[keyStrIdx] !== null &&
lastChangeKey !== null && lastChangeKey < nextOp[keyStrIdx]) {
// Allow a single mergeDocChangeOps call to process changes to several keys in the same
// object, provided that they appear in ascending order
} else break
lastChangeKey = (nextOp !== null) ? nextOp[keyStrIdx] : null
changeOps.push(changeState.nextOp)
changeCols.push(changeState.columns)
predSeen.push(new Array(changeState.nextOp[predNumIdx]))
readNextChangeOp(docState, changeState)
nextOp = changeState.nextOp
}
}
// changeOp keeps its previous value when changeOps is empty, so the comparisons below still
// refer to the key/list element that was processed most recently
if (changeOps.length > 0) changeOp = changeOps[0]
const inCorrectObject = docOp && docOp[objActorIdx] === changeOp[objActorIdx] && docOp[objCtrIdx] === changeOp[objCtrIdx]
const keyMatches = docOp && docOp[keyStrIdx] !== null && docOp[keyStrIdx] === changeOp[keyStrIdx]
// A doc op belongs to the list element either because it is an assignment whose key is the
// elemId, or because it is the insertion op whose own ID is the elemId
const listElemMatches = docOp && docOp[keyStrIdx] === null && changeOp[keyStrIdx] === null &&
((!docOp[insertIdx] && docOp[keyActorIdx] === changeOp[keyActorIdx] && docOp[keyCtrIdx] === changeOp[keyCtrIdx]) ||
(docOp[insertIdx] && docOp[idActorIdx] === changeOp[keyActorIdx] && docOp[idCtrIdx] === changeOp[keyCtrIdx]))
// We keep going until we run out of ops in the change, except that even when we run out, we
// keep going until we have processed all doc ops for the current key/list element.
if (changeOps.length === 0 && !(inCorrectObject && (keyMatches || listElemMatches))) break
let takeDocOp = false, takeChangeOps = 0
// The change operations come first if we are inserting list elements (seekToOp already
// determines the correct insertion position), if there is no document operation, if the next
// document operation is for a different object, or if the change op's string key is
// lexicographically first (TODO check ordering of keys beyond the basic multilingual plane).
if (insert || !inCorrectObject ||
(docOp[keyStrIdx] === null && changeOp[keyStrIdx] !== null) ||
(docOp[keyStrIdx] !== null && changeOp[keyStrIdx] !== null && changeOp[keyStrIdx] < docOp[keyStrIdx])) {
// Take the operations from the change
takeChangeOps = changeOps.length
if (!inCorrectObject && !foundListElem && changeOp[keyStrIdx] === null && !changeOp[insertIdx]) {
// This can happen if we first update one list element, then another one earlier in the
// list. That is not allowed: list element updates must occur in ascending order.
throw new RangeError("could not find list element with ID: " +
`${changeOp[keyCtrIdx]}@${docState.actorIds[changeOp[keyActorIdx]]}`)
}
} else if (keyMatches || listElemMatches || foundListElem) {
// The doc operation is for the same key or list element in the same object as the change
// ops, so we merge them. First, if any of the change ops' `pred` matches the opId of the
// document operation, we update the document operation's `succ` accordingly.
for (let opIndex = 0; opIndex < changeOps.length; opIndex++) {
const op = changeOps[opIndex]
for (let i = 0; i < op[predNumIdx]; i++) {
if (op[predActorIdx][i] === docOp[idActorIdx] && op[predCtrIdx][i] === docOp[idCtrIdx]) {
// Insert into the doc op's succ list such that the lists remains sorted
// (ascending by counter, with ties broken by comparing the actor ID strings)
let j = 0
while (j < docOp[succNumIdx] && (docOp[succCtrIdx][j] < op[idCtrIdx] ||
docOp[succCtrIdx][j] === op[idCtrIdx] && docState.actorIds[docOp[succActorIdx][j]] < idActor)) j++
docOp[succCtrIdx].splice(j, 0, op[idCtrIdx])
docOp[succActorIdx].splice(j, 0, idActorIndex)
docOp[succNumIdx]++
predSeen[opIndex][i] = true
break
}
}
}
if (listElemMatches) foundListElem = true
if (foundListElem && !listElemMatches) {
// If the previous docOp was for the correct list element, and the current docOp is for
// the wrong list element, then place the current changeOp before the docOp.
takeChangeOps = changeOps.length
} else if (changeOps.length === 0 || docOp[idCtrIdx] < changeOp[idCtrIdx] ||
(docOp[idCtrIdx] === changeOp[idCtrIdx] && docState.actorIds[docOp[idActorIdx]] < idActor)) {
// When we have several operations for the same object and the same key, we want to keep
// them sorted in ascending order by opId. Here we have docOp with a lower opId, so we
// output it first.
takeDocOp = true
updatePatchProperty(patches, newBlock, objectId, docOp, docState, propState, listIndex, docOpOldSuccNum)
// A deletion op in the change is represented in the document only by its entries in the
// succ list of the operations it overwrites; it has no separate row in the set of ops.
// Iterate backwards so that splicing does not disturb the indexes still to be visited.
for (let i = changeOps.length - 1; i >= 0; i--) {
let deleted = true
for (let j = 0; j < changeOps[i][predNumIdx]; j++) {
if (!predSeen[i][j]) deleted = false
}
if (ACTIONS[changeOps[i][actionIdx]] === 'del' && deleted) {
changeOps.splice(i, 1)
changeCols.splice(i, 1)
predSeen.splice(i, 1)
}
}
} else if (docOp[idCtrIdx] === changeOp[idCtrIdx] && docState.actorIds[docOp[idActorIdx]] === idActor) {
throw new RangeError(`duplicate operation ID: ${changeOp[idCtrIdx]}@${idActor}`)
} else {
// The changeOp has the lower opId, so we output it first.
takeChangeOps = 1
}
} else {
// The document operation comes first if its string key is lexicographically first, or if
// we're using opId keys and the keys don't match (i.e. we scan the document until we find a
// matching key).
takeDocOp = true
}
if (takeDocOp) {
appendOperation(outCols, docState.blocks[blockIndex].columns, docOp)
addBlockOperation(newBlock, docOp, objectId, docState.actorIds, false)
// An insertion op starts a new list element; if the element before it had a visible value,
// that element counts towards the list index
if (docOp[insertIdx] && elemVisible) {
elemVisible = false
listIndex++
}
if (docOp[succNumIdx] === 0) elemVisible = true
newBlock.numOps++
;({ docOp, blockIndex } = readNextDocOp(docState, blockIndex))
if (docOp !== null) {
docOpsConsumed++
docOpOldSuccNum = docOp[succNumIdx]
}
}
if (takeChangeOps > 0) {
for (let i = 0; i < takeChangeOps; i++) {
let op = changeOps[i]
// Check that we've seen all ops mentioned in `pred` (they must all have lower opIds than
// the change op's own opId, so we must have seen them already)
for (let j = 0; j < op[predNumIdx]; j++) {
if (!predSeen[i][j]) {
throw new RangeError(`no matching operation for pred: ${op[predCtrIdx][j]}@${docState.actorIds[op[predActorIdx][j]]}`)
}
}
// oldSuccNum is omitted (undefined) because this op comes from the change
updatePatchProperty(patches, newBlock, objectId, op, docState, propState, listIndex)
appendOperation(outCols, changeCols[i], op)
addBlockOperation(newBlock, op, objectId, docState.actorIds, true)
if (op[insertIdx]) {
elemVisible = false
listIndex++
} else {
elemVisible = true
}
}
// Remove the change ops that have just been written from the three parallel arrays
if (takeChangeOps === changeOps.length) {
changeOps.length = 0
changeCols.length = 0
predSeen.length = 0
} else {
changeOps.splice(0, takeChangeOps)
changeCols.splice(0, takeChangeOps)
predSeen.splice(0, takeChangeOps)
}
newBlock.numOps += takeChangeOps
}
}
// The doc op that ended the merge was already read from the document (and is counted in
// docOpsConsumed), so it has to be written to the output here. The block metadata is only
// updated for it if it belongs to the object we were updating.
if (docOp) {
appendOperation(outCols, docState.blocks[blockIndex].columns, docOp)
newBlock.numOps++
if (docOp[objActorIdx] === objActor && docOp[objCtrIdx] === objCtr) {
addBlockOperation(newBlock, docOp, objectId, docState.actorIds, false)
}
}
return {docOpsConsumed, blockIndex}
}
/**
 * Applies the next run of operations from the change (or series of changes) in `changeState` to
 * the document `docState`. `changeState` can be iterated with `readNextChangeOp`.
 *
 * `docState` is an object with keys:
 * - `actorIds`: array of actorIds (hex strings) occurring in the document; the values in the
 *   document's objActor/keyActor/idActor/... columns are indexes into this array.
 * - `blocks`: array of all the blocks of operations in the document.
 * - `objectMeta`: map from objectId to metadata about that object.
 *
 * The function seeks to the position of `changeState.nextOp`, builds a replacement block that
 * consists of the ops before that position, the merged doc/change ops, and the ops after it, and
 * stores the result in `docState.blocks` (splitting it if it exceeds MAX_BLOCK_SIZE or if the
 * merge crossed a block boundary).
 *
 * @param {Object} patches patch object, mutated to reflect the operations applied
 * @param {Object} changeState change iterator state; advanced by `mergeDocChangeOps`
 * @param {Object} docState document state, mutated to contain the updated blocks
 * @throws {RangeError} if a column of the last block read still has data after all ops were copied
 */
function applyOps(patches, changeState, docState) {
  const [objActorNum, objCtr, keyActorNum, keyCtr, keyStr, idActorNum, idCtr, insert] = changeState.nextOp
  const actorOrNull = actorNum => (actorNum === null ? null : docState.actorIds[actorNum])
  const objActor = actorOrNull(objActorNum)
  const keyActor = actorOrNull(keyActorNum)
  const seekTarget = {
    objActor, objCtr, keyActor, keyCtr, keyStr, idActor: docState.actorIds[idActorNum], idCtr, insert,
    objId: objActor === null ? '_root' : `${objCtr}@${objActor}`
  }

  const {blockIndex: firstIndex, skipCount, visibleCount} = seekToOp(docState, seekTarget)
  const oldBlock = docState.blocks[firstIndex]
  oldBlock.columns.forEach(col => col.decoder.reset())

  // The block's first visible element is only carried over if some ops are kept in front of the
  // merge position, the old block has one, and it is not the very element being updated.
  const updatesFirstVisible = !insert &&
    oldBlock.firstVisibleActor === keyActorNum && oldBlock.firstVisibleCtr === keyCtr
  const keepFirstVisible = skipCount !== 0 && oldBlock.firstVisibleActor !== undefined && !updatesFirstVisible

  const newBlock = {
    columns: undefined,
    bloom: new Uint8Array(oldBlock.bloom),
    lastKey: copyObject(oldBlock.lastKey),
    numVisible: copyObject(oldBlock.numVisible),
    numOps: skipCount,
    lastObjectActor: oldBlock.lastObjectActor,
    lastObjectCtr: oldBlock.lastObjectCtr,
    firstVisibleActor: keepFirstVisible ? oldBlock.firstVisibleActor : undefined,
    firstVisibleCtr: keepFirstVisible ? oldBlock.firstVisibleCtr : undefined,
    lastVisibleActor: undefined,
    lastVisibleCtr: undefined
  }

  // Start the output with the first skipCount operations, which precede the merge position
  const outCols = oldBlock.columns.map(({columnId}) => ({columnId, encoder: encoderByColumnId(columnId)}))
  copyColumns(outCols, oldBlock.columns, skipCount)

  // Merge in the change's operations. If the property being updated straddles a block boundary,
  // reading continues into the following block(s), so the last block index may have advanced.
  const merged = mergeDocChangeOps(patches, newBlock, outCols, changeState, docState, visibleCount, firstIndex)
  const lastIndex = merged.blockIndex
  const lastBlock = docState.blocks[lastIndex]

  // Whatever has not been skipped or consumed from the blocks we touched still needs copying
  let spannedOps = 0
  for (let i = firstIndex; i <= lastIndex; i++) spannedOps += docState.blocks[i].numOps
  const copyAfterMerge = spannedOps - skipCount - merged.docOpsConsumed
  copyColumns(outCols, lastBlock.columns, copyAfterMerge)
  newBlock.numOps += copyAfterMerge

  const unfinishedCol = lastBlock.columns.find(col => !col.decoder.done)
  if (unfinishedCol) throw new RangeError(`excess ops in column ${unfinishedCol.columnId}`)

  newBlock.columns = outCols.map(({columnId, encoder}) => ({
    columnId,
    decoder: decoderByColumnId(columnId, encoder.buffer)
  }))

  const fitsInOneBlock = firstIndex === lastIndex && newBlock.numOps <= MAX_BLOCK_SIZE
  if (!fitsInOneBlock) {
    // Oversized (or multi-block) output has to be split into smaller blocks
    const pieces = splitBlock(newBlock, docState.actorIds)
    docState.blocks.splice(firstIndex, lastIndex - firstIndex + 1, ...pieces)
    return
  }

  // The result is a single output block.
  if (copyAfterMerge > 0 && oldBlock.lastVisibleActor !== undefined && oldBlock.lastVisibleCtr !== undefined) {
    // None of the ops after the merge point may be visible, in which case lastVisible is not
    // strictly correct: it may name an op before the merge point instead of a list element
    // inserted by this change. That is harmless, because lastVisible is only used to check
    // whether a block ends with the same visible element as the next block starts with (to avoid
    // double-counting its index); if the block's last list element is invisible, lastVisible
    // differs from the next block's firstVisible whatever its exact value.
    newBlock.lastVisibleActor = oldBlock.lastVisibleActor
    newBlock.lastVisibleCtr = oldBlock.lastVisibleCtr
  }
  docState.blocks[firstIndex] = newBlock
}
/**
 * Makes sure every operation block of the document has all the columns that occur in a change,
 * including column types we don't recognise (which have been generated by a future version of
 * Automerge). The pred* columns of the change are excluded, as they are not copied to the document.
 *
 * Before doing so, verifies that the change's columns sit at the indexes that the `*Idx`
 * constants of this file assume.
 *
 * @param {Object} docState document state; entries of `docState.blocks` are replaced by copies
 *   with the extended column set when new columns are needed (the old block objects are untouched)
 * @param {Array<Object>} changeCols the columns of the change, each with a `columnId`
 * @throws {RangeError} if a column of the change is not at the expected index
 */
function updateBlockColumns(docState, changeCols) {
  // Index constants used throughout this file, paired with the column each one must address
  const expectedLayout = [
    [objActorIdx, 'objActor'], [objCtrIdx, 'objCtr'],
    [keyActorIdx, 'keyActor'], [keyCtrIdx, 'keyCtr'], [keyStrIdx, 'keyStr'],
    [idActorIdx, 'idActor'], [idCtrIdx, 'idCtr'],
    [insertIdx, 'insert'], [actionIdx, 'action'],
    [valLenIdx, 'valLen'], [valRawIdx, 'valRaw'],
    [predNumIdx, 'predNum'], [predActorIdx, 'predActor'], [predCtrIdx, 'predCtr']
  ]
  for (const [position, columnName] of expectedLayout) {
    const expected = CHANGE_COLUMNS[position]
    if (changeCols[position].columnId !== expected.columnId || expected.columnName !== columnName) {
      throw new RangeError('unexpected columnId')
    }
  }

  // Find the columns of the change that the document lacks, apart from pred*
  const docCols = docState.blocks[0].columns
  const isInDoc = columnId => docCols.some(docCol => docCol.columnId === columnId)
  const missingCols = changeCols.filter(({columnId}) => !PRED_COLUMN_IDS.includes(columnId) && !isInDoc(columnId))
  if (missingCols.length === 0) return

  const allCols = docCols.concat(missingCols).map(({columnId}) => ({columnId}))
  allCols.sort((a, b) => a.columnId - b.columnId)

  // Rebuild the decoders of every block against the combined column list
  docState.blocks.forEach((oldBlock, blockIndex) => {
    const block = copyObject(oldBlock)
    const existing = block.columns.map(col => ({columnId: col.columnId, buffer: col.decoder.buf}))
    block.columns = makeDecoders(existing, allCols)
    docState.blocks[blockIndex] = block
  })
}
/**
 * Maps the actor indexes used inside a change onto the actor indexes used by the document.
 *
 * `change` is a decoded change header; `change.actorIds[0]` is the author of the change and the
 * remaining entries are the other actors the change refers to. If the author is not yet known to
 * the document, this must be the author's first change (`seq === 1`), and the author is appended
 * to the document's actor list.
 *
 * @param {Array<string>} actorIds actorIds currently appearing in the document (never mutated)
 * @param {Object} change decoded change header with `actorIds` and `seq`
 * @returns {{actorIds: Array<string>, actorTable: Array<number>}} `actorIds` is the document's
 *   actor list including the author of the change (the input array itself if nothing was added);
 *   `actorTable[i]` is the document's actor index for the actor that has index `i` in the change
 * @throws {RangeError} if the author is unknown but `seq` is not 1, or if the change refers to any
 *   other actor that is not known to the document
 */
function getActorTable(actorIds, change) {
  const author = change.actorIds[0]
  let docActorIds = actorIds
  if (docActorIds.indexOf(author) === -1) {
    if (change.seq !== 1) {
      throw new RangeError(`Seq ${change.seq} is the first change for actor ${author}`)
    }
    // concat builds a new array, leaving the caller's array as it was
    docActorIds = docActorIds.concat([author])
  }

  // Translate each of the change's actor indexes to the document's actor index
  const actorTable = change.actorIds.map(actorId => {
    const docIndex = docActorIds.indexOf(actorId)
    if (docIndex === -1) {
      throw new RangeError(`actorId ${actorId} is not known to document`)
    }
    return docIndex
  })
  return {actorIds: docActorIds, actorTable}
}
/**
 * Finalises the patch for a change. `patches` is a map from objectIds to patch for that
 * particular object, `objectIds` is an iterable of IDs of objects that are created or updated in
 * the change (`BackendDoc.applyChanges` passes a Set), and `docState` is an object containing
 * various bits of document state, including `objectMeta`, a map from objectIds to metadata about
 * that object (such as its parent in the document tree). Mutates `patches` such that child objects
 * are linked into their parent object, all the way to the root object, and returns `patches`.
 *
 * For every starting object the function walks up the `parentObj` chain. At each level it makes
 * sure a patch exists for the object, and (from the second level onwards) links the values stored
 * under the child's `parentKey` into the parent's patch: as `update` edits for list/text objects,
 * or as entries in `props` for other objects. The walk stops when the link was already present,
 * when the object has no parent, or when the parent has no values stored under the child's key.
 */
function setupPatches(patches, objectIds, docState) {
for (let objectId of objectIds) {
// `childMeta` is the metadata of the object we came from on the previous level (null on the
// first level). `patchExists` is initialised once per starting object and never reset during the
// upward walk.
let meta = docState.objectMeta[objectId], childMeta = null, patchExists = false
while (true) {
// True if the current object has at least one value recorded under the key that holds the child
const hasChildren = childMeta && Object.keys(meta.children[childMeta.parentKey]).length > 0
if (!patches[objectId]) patches[objectId] = emptyObjectPatch(objectId, meta.type)
if (childMeta && hasChildren) {
if (meta.type === 'list' || meta.type === 'text') {
// In list/text objects, parentKey is an elemID. First see if it already appears in an edit
for (let edit of patches[objectId].edits) {
if (edit.opId && meta.children[childMeta.parentKey][edit.opId]) {
patchExists = true
}
}
// If we need to add an edit, we first have to translate the elemId into an index
if (!patchExists) {
const obj = parseOpId(objectId), elem = parseOpId(childMeta.parentKey)
const seekPos = {
objActor: obj.actorId, objCtr: obj.counter,
keyActor: elem.actorId, keyCtr: elem.counter,
keyStr: null, insert: false,
objId: objectId
}
// `visibleCount` as returned by seekToOp is used as the list index of the edit
const { visibleCount } = seekToOp(docState, seekPos)
// Emit one `update` edit per value stored at this list element
for (let [opId, value] of Object.entries(meta.children[childMeta.parentKey])) {
let patchValue = value
if (value.objectId) {
// Values that are objects are represented by that object's own patch (created if needed)
if (!patches[value.objectId]) patches[value.objectId] = emptyObjectPatch(value.objectId, value.type)
patchValue = patches[value.objectId]
}
const edit = {action: 'update', index: visibleCount, opId, value: patchValue}
appendEdit(patches[objectId].edits, edit)
}
}
} else {
// Non-list object: parentKey is the name of the property being updated (a string)
if (!patches[objectId].props[childMeta.parentKey]) {
patches[objectId].props[childMeta.parentKey] = {}
}
let values = patches[objectId].props[childMeta.parentKey]
for (let [opId, value] of Object.entries(meta.children[childMeta.parentKey])) {
if (values[opId]) {
// The property patch already has an entry for this opId, so the link is already in place
patchExists = true
} else if (value.objectId) {
if (!patches[value.objectId]) patches[value.objectId] = emptyObjectPatch(value.objectId, value.type)
values[opId] = patches[value.objectId]
} else {
values[opId] = value
}
}
}
}
// Stop climbing once the link already existed, we reached an object without a parent, or the
// current object holds no values under the child's key
if (patchExists || !meta.parentObj || (childMeta && !hasChildren)) break
// Move one level up: the current object becomes the child, its parent the current object
childMeta = meta
objectId = meta.parentObj
meta = docState.objectMeta[objectId]
}
}
return patches
}
/**
 * Applies as many of the given decoded changes to the document as are causally ready.
 *
 * `docState` holds the document state; it is updated in place, but only at the top level (the
 * objects nested inside it are treated as immutable and replaced rather than modified). `patches`
 * is mutated to accumulate the patch for the frontend, and `objectIds` is handed to the
 * operation-applying code so that it can record which objects were touched.
 *
 * A change is skipped if its hash is already in `docState.changeIndexByHash` or was already
 * applied earlier in this call. A change is causally ready if every dependency either has a known
 * index in `docState.changeIndexByHash` (not `undefined` and not `-1`) or was applied earlier in
 * this call. A ready change whose `seq` is not exactly one greater than the actor's current clock
 * entry causes a RangeError.
 *
 * Returns `[applied, enqueued]`: the changes that were applied, and the changes that are still
 * waiting for a missing dependency. `docState.heads` and `docState.clock` are only replaced if at
 * least one change was applied.
 */
function applyChanges(patches, decodedChanges, docState, objectIds) {
  const newHeads = new Set(docState.heads)
  const appliedHashes = new Set()
  const newClock = copyObject(docState.clock)
  const applied = []
  const enqueued = []

  // A dependency is satisfied if the document knows its index, or we applied it in this call
  const isSatisfied = dep => {
    const depIndex = docState.changeIndexByHash[dep]
    return (depIndex !== undefined && depIndex !== -1) || appliedHashes.has(dep)
  }

  for (const change of decodedChanges) {
    // Skip any duplicate changes that we have already seen
    const alreadySeen = docState.changeIndexByHash[change.hash] !== undefined || appliedHashes.has(change.hash)
    if (alreadySeen) continue

    if (!change.deps.every(isSatisfied)) {
      enqueued.push(change)
      continue
    }

    const expectedSeq = (newClock[change.actor] || 0) + 1
    if (change.seq !== expectedSeq) {
      throw new RangeError(`Expected seq ${expectedSeq}, got seq ${change.seq} from actor ${change.actor}`)
    }

    newClock[change.actor] = change.seq
    appliedHashes.add(change.hash)
    // The new change supersedes its dependencies as a head
    change.deps.forEach(dep => newHeads.delete(dep))
    newHeads.add(change.hash)
    applied.push(change)
  }

  if (applied.length > 0) {
    const changeState = {changes: applied, changeIndex: -1, objectIds}
    readNextChangeOp(docState, changeState)
    while (!changeState.done) {
      applyOps(patches, changeState, docState)
    }
    docState.heads = Array.from(newHeads).sort()
    docState.clock = newClock
  }

  return [applied, enqueued]
}
/**
 * Reads through all of the operations of a document and builds the patch that a frontend needs
 * in order to instantiate the document's current state. Returns the patch for the root object.
 *
 * `docState.maxOp` is raised to the largest operation counter encountered, taking into account
 * both the operation IDs themselves and the IDs of their successors. The per-operation work is
 * delegated to `updatePatchProperty`, which is also given `docState` (and hence
 * `docState.objectMeta`).
 */
function documentPatch(docState) {
  docState.blocks[0].columns.forEach(col => col.decoder.reset())

  const patches = {_root: {objectId: '_root', type: 'map', props: {}}}
  let propState = {}
  let blockIndex = 0
  let prevObjActor = null, prevObjCtr = null
  let objectId = '_root'
  let elemVisible = false
  let listIndex = 0

  for (;;) {
    const next = readNextDocOp(docState, blockIndex)
    const docOp = next.docOp
    blockIndex = next.blockIndex
    if (docOp === null) break

    // Moving on to a different object: start its per-object state afresh
    const objActor = docOp[objActorIdx], objCtr = docOp[objCtrIdx]
    if (objActor !== prevObjActor || objCtr !== prevObjCtr) {
      objectId = `${objCtr}@${docState.actorIds[objActor]}`
      prevObjActor = objActor
      prevObjCtr = objCtr
      propState = {}
      listIndex = 0
      elemVisible = false
    }

    // An insertion begins a new list element; the index only advances past visible elements
    if (docOp[insertIdx] && elemVisible) {
      elemVisible = false
      listIndex++
    }

    // An operation without successors has not been overwritten or deleted
    const succNum = docOp[succNumIdx]
    if (succNum === 0) elemVisible = true

    if (docOp[idCtrIdx] > docState.maxOp) docState.maxOp = docOp[idCtrIdx]
    for (let succ = 0; succ < succNum; succ++) {
      const succCtr = docOp[succCtrIdx][succ]
      if (succCtr > docState.maxOp) docState.maxOp = succCtr
    }

    updatePatchProperty(patches, null, objectId, docOp, docState, propState, listIndex, succNum)
  }

  return patches._root
}
/**
 * Takes an encoded document whose headers have been parsed using `decodeDocumentHeader()` and reads
 * from it the list of changes. Returns an object `{clock, headActors, encoders, numChanges}`:
 *
 * - `clock` is the document's current vector clock, i.e. an object mapping each actor ID (as a
 *   hex string) to the number of changes seen from that actor.
 * - `headActors` is a sorted array of the actorIds of those changes that no other change in the
 *   document depends on (i.e. the actors that contributed the current heads of the document).
 * - `encoders` is an array of `{columnId, encoder}` objects into which the columns of the changes
 *   list have been copied.
 * - `numChanges` is the number of changes in the document.
 *
 * Throws a RangeError if the changes columns do not have the expected column IDs, or if the
 * sequence numbers of any actor are not exactly 1, 2, 3, ... in the order they appear.
 */
function readDocumentChanges(doc) {
  const columns = makeDecoders(doc.changesColumns, DOCUMENT_COLUMNS)
  const actorD = columns[0].decoder, seqD = columns[1].decoder
  const depsNumD = columns[5].decoder, depsIndexD = columns[6].decoder
  // The hard-coded column positions above are only valid if the column layout is as expected
  if (columns[0].columnId !== DOCUMENT_COLUMNS[0].columnId || DOCUMENT_COLUMNS[0].columnName !== 'actor' ||
      columns[1].columnId !== DOCUMENT_COLUMNS[1].columnId || DOCUMENT_COLUMNS[1].columnName !== 'seq' ||
      columns[5].columnId !== DOCUMENT_COLUMNS[5].columnId || DOCUMENT_COLUMNS[5].columnName !== 'depsNum' ||
      columns[6].columnId !== DOCUMENT_COLUMNS[6].columnId || DOCUMENT_COLUMNS[6].columnName !== 'depsIndex') {
    throw new RangeError('unexpected columnId')
  }

  let numChanges = 0, clock = {}, actorNums = [], headIndexes = new Set()
  while (!actorD.done) {
    const actorNum = actorD.readValue(), seq = seqD.readValue(), depsNum = depsNumD.readValue()
    const actorId = doc.actorIds[actorNum]
    // Every actor must start at seq 1 and then count up without gaps or repeats. Using 0 as the
    // default also rejects a repeated seq 1, which would otherwise silently reset the clock.
    const expectedSeq = (clock[actorId] || 0) + 1
    if (seq !== expectedSeq) {
      throw new RangeError(`Expected seq ${expectedSeq}, got ${seq} for actor ${actorId}`)
    }
    actorNums.push(actorNum)
    clock[actorId] = seq
    // A change is a head until some later change lists it as a dependency
    headIndexes.add(numChanges)
    for (let j = 0; j < depsNum; j++) headIndexes.delete(depsIndexD.readValue())
    numChanges++
  }
  const headActors = [...headIndexes].map(index => doc.actorIds[actorNums[index]]).sort()

  // Rewind the decoders so that the columns can be copied in full into fresh encoders
  for (let col of columns) col.decoder.reset()
  const encoders = columns.map(col => ({columnId: col.columnId, encoder: encoderByColumnId(col.columnId)}))
  copyColumns(encoders, columns, numChanges)
  return {clock, headActors, encoders, numChanges}
}
/**
 * Appends the metadata of one change as a new row to the changes columns. `columns` are the
 * column encoders to append to, `change` is the decoded change, `actorIds` is the document's
 * actor array (used to turn the change's actor into an index), and `changeIndexByHash` maps
 * dependency hashes to the index of the change they refer to.
 */
function appendChange(columns, change, actorIds, changeIndexByHash) {
  const { deps, extraBytes } = change
  const actorIndex = actorIds.indexOf(change.actor)
  const depsIndexes = deps.map(dep => changeIndexByHash[dep])
  // Length tag for the extra bytes: byte length in the upper bits, value type in the low 4 bits
  const extraLen = extraBytes ? (extraBytes.byteLength << 4 | VALUE_TYPE.BYTES) : VALUE_TYPE.BYTES

  const row = [
    actorIndex,      // actor
    change.seq,      // seq
    change.maxOp,    // maxOp
    change.time,     // time
    change.message,  // message
    deps.length,     // depsNum
    depsIndexes,     // depsIndex
    extraLen,        // extraLen
    extraBytes       // extraRaw
  ]
  appendOperation(columns, DOCUMENT_COLUMNS, row)
}
/**
 * The backend's representation of a document: the operations (stored in columnar blocks), the
 * history of changes, and the indexes needed to navigate the hash graph of changes.
 */
class BackendDoc {
  /**
   * Creates a document. If `buffer` is given, it is decoded as a saved document: the change
   * metadata and operations are loaded and an initial patch is computed, but the hash graph is
   * not reconstructed (`haveHashGraph` stays false until `computeHashGraph()` is called). If
   * `buffer` is omitted, an empty document is created.
   */
  constructor(buffer) {
    this.maxOp = 0
    this.haveHashGraph = false
    this.changes = []
    this.changeIndexByHash = {}
    this.dependenciesByHash = {}
    this.dependentsByHash = {}
    this.hashesByActor = {}
    this.actorIds = []
    this.heads = []
    this.clock = {}
    this.queue = []
    this.objectMeta = {_root: {parentObj: null, parentKey: null, opId: null, type: 'map', children: {}}}

    if (buffer) {
      const doc = decodeDocumentHeader(buffer)
      const {clock, headActors, encoders, numChanges} = readDocumentChanges(doc)
      this.binaryDoc = buffer
      // Placeholder of the right length; the entries are filled in by computeHashGraph()
      this.changes = new Array(numChanges)
      this.actorIds = doc.actorIds
      this.heads = doc.heads
      this.clock = clock
      this.changesEncoders = encoders
      this.extraBytes = doc.extraBytes

      // If there is a single head, we can unambiguously point at the actorId and sequence number of
      // the head hash without having to reconstruct the hash graph
      if (doc.heads.length === 1 && headActors.length === 1) {
        this.hashesByActor[headActors[0]] = []
        this.hashesByActor[headActors[0]][clock[headActors[0]] - 1] = doc.heads[0]
      }

      // The encoded document gives each change an index, and expresses dependencies in terms of
      // those indexes. Initialise the translation table from hash to index.
      if (doc.heads.length === doc.headsIndexes.length) {
        for (let i = 0; i < doc.heads.length; i++) {
          this.changeIndexByHash[doc.heads[i]] = doc.headsIndexes[i]
        }
      } else if (doc.heads.length === 1) {
        // If there is only one head, it must be the last change
        this.changeIndexByHash[doc.heads[0]] = numChanges - 1
      } else {
        // We know the heads hashes, but not their indexes
        for (let head of doc.heads) this.changeIndexByHash[head] = -1
      }

      this.blocks = [{columns: makeDecoders(doc.opsColumns, DOC_OPS_COLUMNS)}]
      updateBlockMetadata(this.blocks[0], this.actorIds)
      if (this.blocks[0].numOps > MAX_BLOCK_SIZE) {
        this.blocks = splitBlock(this.blocks[0], this.actorIds)
      }

      let docState = {blocks: this.blocks, actorIds: this.actorIds, objectMeta: this.objectMeta, maxOp: 0}
      this.initPatch = documentPatch(docState)
      this.maxOp = docState.maxOp
    } else {
      // An empty document trivially has a complete (empty) hash graph
      this.haveHashGraph = true
      this.changesEncoders = DOCUMENT_COLUMNS.map(col => ({columnId: col.columnId, encoder: encoderByColumnId(col.columnId)}))
      this.blocks = [{
        columns: makeDecoders([], DOC_OPS_COLUMNS),
        bloom: new Uint8Array(BLOOM_FILTER_SIZE),
        lastKey: {},
        numVisible: {},
        numOps: 0,
        lastObjectActor: undefined,
        lastObjectCtr: undefined,
        firstVisibleActor: undefined,
        firstVisibleCtr: undefined,
        lastVisibleActor: undefined,
        lastVisibleCtr: undefined
      }]
    }
  }

  /**
   * Makes a copy of this BackendDoc that can be independently modified.
   *
   * NOTE(review): `changesEncoders`, `binaryDoc`, `extraBytes` and `initPatch` are not copied; the
   * copy keeps the fresh, empty `changesEncoders` created by the constructor. Confirm whether any
   * caller saves a cloned document that already contains changes.
   */
  clone() {
    let copy = new BackendDoc()
    copy.maxOp = this.maxOp
    copy.haveHashGraph = this.haveHashGraph
    copy.changes = this.changes.slice()
    copy.changeIndexByHash = copyObject(this.changeIndexByHash)
    copy.dependenciesByHash = copyObject(this.dependenciesByHash)
    // These two map to arrays that are appended to in place, so the arrays are copied as well
    copy.dependentsByHash = Object.entries(this.dependentsByHash).reduce((acc, [k, v]) => { acc[k] = v.slice(); return acc }, {})
    copy.hashesByActor = Object.entries(this.hashesByActor).reduce((acc, [k, v]) => { acc[k] = v.slice(); return acc }, {})
    copy.actorIds = this.actorIds // immutable, no copying needed
    copy.heads = this.heads // immutable, no copying needed
    copy.clock = this.clock // immutable, no copying needed
    copy.blocks = this.blocks // immutable, no copying needed
    copy.objectMeta = this.objectMeta // immutable, no copying needed
    copy.queue = this.queue // immutable, no copying needed
    return copy
  }

  /**
   * Parses the changes given as Uint8Arrays in `changeBuffers`, and applies them to the current
   * document. Changes whose dependencies are missing are kept in `this.queue` and retried on the
   * next call. Returns a patch to apply to the frontend; if `isLocal` is true and exactly one
   * change was given, the patch also carries that change's `actor` and `seq`.
   *
   * If an exception is thrown, the document contents are not modified (the only thing that may
   * have happened is that the hash graph was computed, which does not change the document).
   */
  applyChanges(changeBuffers, isLocal = false) {
    // decoded change has the form { actor, seq, startOp, time, message, deps, actorIds, hash, columns, buffer }
    let decodedChanges = changeBuffers.map(buffer => {
      const decoded = decodeChangeColumns(buffer)
      decoded.buffer = buffer
      return decoded
    })

    let patches = {_root: {objectId: '_root', type: 'map', props: {}}}
    let docState = {
      maxOp: this.maxOp,
      // Work on a shallow copy, so that hashes applied in one round can be made visible to the
      // following rounds without touching `this` before everything has succeeded
      changeIndexByHash: Object.assign({}, this.changeIndexByHash),
      actorIds: this.actorIds,
      heads: this.heads,
      clock: this.clock,
      blocks: this.blocks.slice(),
      objectMeta: Object.assign({}, this.objectMeta)
    }
    let queue = (this.queue.length === 0) ? decodedChanges : decodedChanges.concat(this.queue)
    let allApplied = [], objectIds = new Set()

    while (true) {
      const [applied, enqueued] = applyChanges(patches, queue, docState, objectIds)
      queue = enqueued
      // Record the index each applied change is going to have in `this.changes`, so that changes
      // depending on it are recognised as causally ready in the next round
      for (let i = 0; i < applied.length; i++) {
        docState.changeIndexByHash[applied[i].hash] = this.changes.length + allApplied.length + i
      }
      if (applied.length > 0) allApplied = allApplied.concat(applied)
      if (queue.length === 0) break

      // If we are missing a dependency, and we haven't computed the hash graph yet, first compute
      // the hashes to see if we actually have it already
      if (applied.length === 0) {
        if (this.haveHashGraph) break
        this.computeHashGraph()
        // computeHashGraph() replaces `this.changeIndexByHash` with a new, complete index, so the
        // working copy has to be rebuilt from it (plus whatever was applied so far in this call)
        docState.changeIndexByHash = Object.assign({}, this.changeIndexByHash)
        for (let i = 0; i < allApplied.length; i++) {
          docState.changeIndexByHash[allApplied[i].hash] = this.changes.length + i
        }
      }
    }

    setupPatches(patches, objectIds, docState)

    // Update the document state only if `applyChanges` does not throw an exception
    for (let change of allApplied) {
      this.changes.push(change.buffer)
      if (!this.hashesByActor[change.actor]) this.hashesByActor[change.actor] = []
      this.hashesByActor[change.actor][change.seq - 1] = change.hash
      this.changeIndexByHash[change.hash] = this.changes.length - 1
      this.dependenciesByHash[change.hash] = change.deps
      this.dependentsByHash[change.hash] = []
      for (let dep of change.deps) {
        if (!this.dependentsByHash[dep]) this.dependentsByHash[dep] = []
        this.dependentsByHash[dep].push(change.hash)
      }
      appendChange(this.changesEncoders, change, docState.actorIds, this.changeIndexByHash)
    }

    this.maxOp = docState.maxOp
    this.actorIds = docState.actorIds
    this.heads = docState.heads
    this.clock = docState.clock
    this.blocks = docState.blocks
    this.objectMeta = docState.objectMeta
    this.queue = queue
    // The cached serialisation and initial patch no longer reflect the document
    this.binaryDoc = null
    this.initPatch = null

    let patch = {
      maxOp: this.maxOp, clock: this.clock, deps: this.heads,
      pendingChanges: this.queue.length, diffs: patches._root
    }
    if (isLocal && decodedChanges.length === 1) {
      patch.actor = decodedChanges[0].actor
      patch.seq = decodedChanges[0].seq
    }
    return patch
  }

  /**
   * Reconstructs the full change history of a document, and initialises the variables that allow us
   * to traverse the hash graph of changes and their dependencies. When a compressed document is
   * loaded we defer the computation of this hash graph to make loading faster, but if the hash
   * graph is later needed (e.g. for the sync protocol), this function fills it in.
   *
   * The new indexes are built in local variables and only assigned to `this` once the whole
   * history has been processed, so an exception leaves the previous state in place. Throws a
   * RangeError if the sequence numbers of an actor are not contiguous.
   */
  computeHashGraph() {
    const binaryDoc = this.save()
    const changes = [], changeIndexByHash = {}, dependenciesByHash = {}
    const dependentsByHash = {}, hashesByActor = {}, clock = {}

    for (let change of decodeChanges([binaryDoc])) {
      // Validate the sequence number first, so that a malformed history is reported as a
      // RangeError rather than failing on a missing `hashesByActor` entry below
      const expectedSeq = (clock[change.actor] || 0) + 1
      if (change.seq !== expectedSeq) {
        throw new RangeError(`Expected seq ${expectedSeq}, got seq ${change.seq} from actor ${change.actor}`)
      }
      clock[change.actor] = change.seq

      const binaryChange = encodeChange(change) // TODO: avoid decoding and re-encoding again
      changes.push(binaryChange)
      changeIndexByHash[change.hash] = changes.length - 1
      dependenciesByHash[change.hash] = change.deps
      dependentsByHash[change.hash] = []
      for (let dep of change.deps) dependentsByHash[dep].push(change.hash)
      if (change.seq === 1) hashesByActor[change.actor] = []
      hashesByActor[change.actor].push(change.hash)
    }

    this.haveHashGraph = true
    this.changes = changes
    this.changeIndexByHash = changeIndexByHash
    this.dependenciesByHash = dependenciesByHash
    this.dependentsByHash = dependentsByHash
    this.hashesByActor = hashesByActor
    this.clock = clock
  }

  /**
   * Returns all the changes that need to be sent to another replica. `haveDeps` is a list of change
   * hashes (as hex strings) of the heads that the other replica has. The changes in `haveDeps` and
   * any of their transitive dependencies will not be returned; any changes later than or concurrent
   * to the hashes in `haveDeps` will be returned. If `haveDeps` is an empty array, all changes are
   * returned. Throws an exception if any of the given hashes are not known to this replica.
   */
  getChanges(haveDeps) {
    if (!this.haveHashGraph) this.computeHashGraph()

    // If the other replica has nothing, return all changes in history order
    if (haveDeps.length === 0) {
      return this.changes.slice()
    }

    // Fast path for the common case where all new changes depend only on haveDeps
    let stack = [], seenHashes = {}, toReturn = []
    for (let hash of haveDeps) {
      seenHashes[hash] = true
      const successors = this.dependentsByHash[hash]
      if (!successors) throw new RangeError(`hash not found: ${hash}`)
      stack.push(...successors)
    }

    // Depth-first traversal of the hash graph to find all changes that depend on `haveDeps`
    while (stack.length > 0) {
      const hash = stack.pop()
      // A hash can be pushed once for each of its dependencies. Skip it if it was already
      // returned, or if it is one of `haveDeps` (which the other replica already has).
      if (seenHashes[hash]) continue
      seenHashes[hash] = true
      toReturn.push(hash)
      if (!this.dependenciesByHash[hash].every(dep => seenHashes[dep])) {
        // If a change depends on a hash we have not seen, abort the traversal and fall back to the
        // slower algorithm. This will sometimes abort even if all new changes depend on `haveDeps`,
        // because our depth-first traversal is not necessarily a topological sort of the graph.
        break
      }
      stack.push(...this.dependentsByHash[hash])
    }

    // If the traversal above has encountered all the heads, and was not aborted early due to
    // a missing dependency, then the set of changes it has found is complete, so we can return it
    if (stack.length === 0 && this.heads.every(head => seenHashes[head])) {
      return toReturn.map(hash => this.changes[this.changeIndexByHash[hash]])
    }

    // If we haven't encountered all of the heads, we have to search harder. This will happen if
    // changes were added that are concurrent to `haveDeps`
    stack = haveDeps.slice()
    seenHashes = {}
    while (stack.length > 0) {
      const hash = stack.pop()
      if (!seenHashes[hash]) {
        const deps = this.dependenciesByHash[hash]
        if (!deps) throw new RangeError(`hash not found: ${hash}`)
        stack.push(...deps)
        seenHashes[hash] = true
      }
    }

    // Everything that is not an ancestor of (or equal to) one of `haveDeps` has to be sent
    return this.changes.filter(change => !seenHashes[decodeChangeMeta(change, true).hash])
  }

  /**
   * Returns all changes that are present in this BackendDoc, but not present in the `other`
   * BackendDoc.
   */
  getChangesAdded(other) {
    if (!this.haveHashGraph) this.computeHashGraph()

    // Depth-first traversal from the heads through the dependency graph,
    // until we reach a change that is already present in `other`
    let stack = this.heads.slice(), seenHashes = {}, toReturn = []
    while (stack.length > 0) {
      const hash = stack.pop()
      if (!seenHashes[hash] && other.changeIndexByHash[hash] === undefined) {
        seenHashes[hash] = true
        toReturn.push(hash)
        stack.push(...this.dependenciesByHash[hash])
      }
    }

    // Return those changes in the reverse of the order in which the depth-first search
    // found them. This is not necessarily a topological sort, but should usually be close.
    return toReturn.reverse().map(hash => this.changes[this.changeIndexByHash[hash]])
  }

  /**
   * Returns the binary change with the given hash, or `undefined` if there is no change with
   * that hash in the document. Computes the hash graph first if necessary.
   */
  getChangeByHash(hash) {
    if (!this.haveHashGraph) this.computeHashGraph()
    return this.changes[this.changeIndexByHash[hash]]
  }

  /**
   * Returns the hashes of any missing dependencies, i.e. where we have tried to apply a change that
   * has a dependency on a change we have not seen.
   *
   * If the argument `heads` is given (an array of hexadecimal strings representing hashes as
   * returned by `getHeads()`), this function also ensures that all of those hashes resolve to
   * either a change that has been applied to the document, or that has been enqueued for later
   * application once missing dependencies have arrived. Any missing heads hashes are included in
   * the returned array. The returned array is sorted.
   */
  getMissingDeps(heads = []) {
    if (!this.haveHashGraph) this.computeHashGraph()

    let allDeps = new Set(heads), inQueue = new Set()
    for (let change of this.queue) {
      inQueue.add(change.hash)
      for (let dep of change.deps) allDeps.add(dep)
    }

    let missing = []
    for (let hash of allDeps) {
      if (this.changeIndexByHash[hash] === undefined && !inQueue.has(hash)) missing.push(hash)
    }
    return missing.sort()
  }

  /**
   * Serialises the current document state into a single byte array. The result is cached in
   * `this.binaryDoc` until the document is next modified by `applyChanges()`.
   */
  save() {
    if (this.binaryDoc) return this.binaryDoc

    // Getting the byte array for the changes columns finalises their encoders, after which we can
    // no longer append values to them. We therefore copy their data over to fresh encoders.
    const newEncoders = this.changesEncoders.map(col => ({columnId: col.columnId, encoder: encoderByColumnId(col.columnId)}))
    const decoders = this.changesEncoders.map(col => {
      const decoder = decoderByColumnId(col.columnId, col.encoder.buffer)
      return {columnId: col.columnId, decoder}
    })
    copyColumns(newEncoders, decoders, this.changes.length)

    this.binaryDoc = encodeDocumentHeader({
      changesColumns: this.changesEncoders,
      opsColumns: concatBlocks(this.blocks),
      actorIds: this.actorIds, // TODO: sort actorIds (requires transforming all actorId columns in opsColumns)
      heads: this.heads,
      headsIndexes: this.heads.map(hash => this.changeIndexByHash[hash]),
      extraBytes: this.extraBytes
    })
    // From now on append to the fresh copies; the old encoders have been finalised
    this.changesEncoders = newEncoders
    return this.binaryDoc
  }

  /**
   * Returns a patch from which we can initialise the current state of the backend. Uses the
   * patch computed at load time if it is still valid, and otherwise scans the document again.
   */
  getPatch() {
    const objectMeta = {_root: {parentObj: null, parentKey: null, opId: null, type: 'map', children: {}}}
    const docState = {blocks: this.blocks, actorIds: this.actorIds, objectMeta, maxOp: 0}
    const diffs = this.initPatch ? this.initPatch : documentPatch(docState)
    return {
      maxOp: this.maxOp, clock: this.clock, deps: this.heads,
      pendingChanges: this.queue.length, diffs
    }
  }
}
// Exports: the maximum block size (in operations), the BackendDoc class, and the
// bloomFilterContains helper.
module.exports = { MAX_BLOCK_SIZE, BackendDoc, bloomFilterContains }