in source/packages/services/events-processor/src/api/eventsources/eventsource.dao.ts [50:180]
pk: createDelimitedAttribute(PkType.EventSource, es.id),
sk: createDelimitedAttribute(PkType.Type, PkType.EventSource),
gsi1Sort: createDelimitedAttribute(PkType.EventSource, es.enabled, es.id),
name: es.name,
sourceType: es.sourceType,
principal: es.principal,
enabled: es.enabled,
dynamoDb: es.dynamoDb,
iotCore: es.iotCore
}
}
};
params.RequestItems[this.eventConfigTable]=[eventSourceCreate];
const result = await this.dynamoDbUtils.batchWriteAll(params);
if (this.dynamoDbUtils.hasUnprocessedItems(result)) {
throw new Error('CREATE_EVENT_SOURCE_FAILED');
}
logger.debug(`events.dao create: exit:`);
}
public async list(): Promise<EventSourceItem[]> {
logger.debug('eventsource.dao get: list:');
const params:DocumentClient.QueryInput = {
TableName: this.eventConfigTable,
IndexName: this.eventConfigGSI1,
KeyConditionExpression: `#sk = :sk`,
ExpressionAttributeNames: {
'#sk': 'sk'
},
ExpressionAttributeValues: {
':sk': createDelimitedAttribute(PkType.Type, PkType.EventSource)
}
};
logger.debug(`eventsource.dao list: QueryInput: ${JSON.stringify(params)}`);
const results = await this._cachedDc.query(params).promise();
if (results.Items===undefined) {
logger.debug('eventsource.dao list: exit: undefined');
return undefined;
}
logger.debug(`query result: ${JSON.stringify(results)}`);
const response:EventSourceItem[]=[];
for(const i of results.Items) {
response.push( this.assembleItem(i));
}
logger.debug(`eventsource.dao list: exit: response:${JSON.stringify(response)}`);
return response;
}
private assembleItem(attrs:DocumentClient.AttributeMap) {
const r:EventSourceItem = {
id: expandDelimitedAttribute(attrs.pk)[1],
name: attrs.name,
sourceType: attrs.sourceType,
principal: attrs.principal,
enabled: attrs.enabled,
dynamoDb: attrs.dynamoDb,
iotCore: attrs.iotCore
} ;
return r;
}
public async get(eventSourceId:string): Promise<EventSourceItem> {
logger.debug(`eventsource.dao get: in: eventSourceId:${eventSourceId}`);
const params:DocumentClient.QueryInput = {
TableName: this.eventConfigTable,
KeyConditionExpression: `#hash=:hash AND #range=:range`,
ExpressionAttributeNames: {
'#hash': 'pk',
'#range': 'sk'
},
ExpressionAttributeValues: {
':hash': createDelimitedAttribute(PkType.EventSource, eventSourceId),
':range': createDelimitedAttribute(PkType.Type, PkType.EventSource)
}
};
logger.debug(`eventsource.dao get: QueryInput: ${JSON.stringify(params)}`);
const results = await this._cachedDc.query(params).promise();
if (results.Items===undefined || results.Items.length===0) {
logger.debug('eventsource.dao get: exit: undefined');
return undefined;
}
logger.debug(`query result: ${JSON.stringify(results)}`);
const response:EventSourceItem = this.assembleItem(results.Items[0]);
logger.debug(`eventsource.dao get: exit: response:${JSON.stringify(response)}`);
return response;
}
public async delete(eventSourceId:string): Promise<void> {
logger.debug(`eventsource.dao delete: in: eventSourceId:${eventSourceId}`);
// start to build up delete requests
const deleteParams:DocumentClient.BatchWriteItemInput = {
RequestItems: {}
};
deleteParams.RequestItems[this.eventConfigTable]=[];
// find the event source record to be deleted
const queryParams:DocumentClient.QueryInput = {
TableName: this.eventConfigTable,
KeyConditionExpression: `#hash=:hash`,
ExpressionAttributeNames: {
'#hash': 'pk'
},
ExpressionAttributeValues: {
':hash': createDelimitedAttribute(PkType.EventSource, eventSourceId)
}
};
const results = await this._cachedDc.query(queryParams).promise();
// if found, add to the list to be deleted
if (results.Items!==undefined && results.Items.length>0) {
for (const item of results.Items) {
deleteParams.RequestItems[this.eventConfigTable].push({