in packages/dynamodb-batch-iterator/src/BatchWrite.ts [24:87]
protected async doBatchRequest() {
    const inFlight: Array<[string, WriteRequest]> = [];
    const operationInput: BatchWriteItemInput = {RequestItems: {}};

    let batchSize = 0;
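    // Drain queued writes into a single BatchWriteItem payload, grouping the
    // marshalled requests by table name until the configured batch size is reached.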
    while (this.toSend.length > 0) {
        const [
            tableName,
            marshalled
        ] = this.toSend.shift() as [string, WriteRequest];
        inFlight.push([tableName, marshalled]);

        if (operationInput.RequestItems[tableName] === undefined) {
            operationInput.RequestItems[tableName] = [];
        }
        operationInput.RequestItems[tableName].push(marshalled);

        if (++batchSize === this.batchSize) {
            break;
        }
    }
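
    // Dispatch the batch; DynamoDB reports any writes it could not apply
    // (for example, due to throttling) under UnprocessedItems.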
    const {
        UnprocessedItems = {}
    } = await this.client.batchWriteItem(operationInput).promise();
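
    // Reconcile the response: for each table with unprocessed writes, remove those
    // writes from the in-flight list (matched by item identifier) and hand them to
    // the throttling handler so they can be retried.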
    const unprocessedTables = new Set<string>();
    for (const table of Object.keys(UnprocessedItems)) {
        unprocessedTables.add(table);
        const unprocessed: Array<WriteRequest> = [];

        for (const item of UnprocessedItems[table]) {
            if (item.DeleteRequest || item.PutRequest) {
                unprocessed.push(item as WriteRequest);
                const identifier = itemIdentifier(table, item as WriteRequest);
                for (let i = inFlight.length - 1; i >= 0; i--) {
                    const [tableName, attributes] = inFlight[i];
                    if (
                        tableName === table &&
                        itemIdentifier(tableName, attributes) === identifier
                    ) {
                        inFlight.splice(i, 1);
                    }
                }
            }
        }

        this.handleThrottled(table, unprocessed);
    }

    this.movePendingToThrottled(unprocessedTables);
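
    // Anything still in flight was not echoed back as unprocessed, so treat it as
    // complete: queue it as a pending result and note which tables succeeded.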
    const processedTables = new Set<string>();
    for (const [tableName, marshalled] of inFlight) {
        processedTables.add(tableName);
        this.pending.push([tableName, marshalled]);
    }
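
    // Ease off the retry backoff for every table that completed writes this round.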
    for (const tableName of processedTables) {
        this.state[tableName].backoffFactor =
            Math.max(0, this.state[tableName].backoffFactor - 1);
    }
}