src/common/persistence/SqlExtentMetadataStore.ts (180 lines of code) (raw):
import {
BIGINT,
Model,
Op,
Options as SequelizeOptions,
Sequelize
} from "sequelize";
import AllExtentsAsyncIterator from "./AllExtentsAsyncIterator";
import IExtentMetadataStore, { IExtentModel } from "./IExtentMetadataStore";
// tslint:disable: max-classes-per-file
class ExtentsModel extends Model {}
/**
* A SQL based extent metadata storage implementation based on Sequelize.
* Refer to CONTRIBUTION.md for how to setup SQL database environment.
*
* @export
* @class SqlExtentMetadataStore
* @implements {IExtentMetadataStore}
*/
export default class SqlExtentMetadataStore implements IExtentMetadataStore {
private initialized: boolean = false;
private closed: boolean = false;
private readonly sequelize: Sequelize;
/**
* Creates an instance of SqlExtentMetadataStore.
*
* @param {string} connectionURI For example, "postgres://user:pass@example.com:5432/dbname"
* @param {SequelizeOptions} [sequelizeOptions]
* @memberof SqlBlobMetadataStore
*/
public constructor(
connectionURI: string,
sequelizeOptions?: SequelizeOptions
) {
// Enable encrypt connection for SQL Server
if (connectionURI.startsWith("mssql") && sequelizeOptions) {
sequelizeOptions.dialectOptions = sequelizeOptions.dialectOptions || {};
(sequelizeOptions.dialectOptions as any).options =
(sequelizeOptions.dialectOptions as any).options || {};
(sequelizeOptions.dialectOptions as any).options.encrypt = true;
}
this.sequelize = new Sequelize(connectionURI, sequelizeOptions);
}
public async init(): Promise<void> {
await this.sequelize.authenticate();
ExtentsModel.init(
{
id: {
type: "VARCHAR(255)",
primaryKey: true
},
locationId: {
allowNull: false,
type: "VARCHAR(255)"
},
path: {
type: "VARCHAR(255)"
},
size: {
allowNull: false,
type: BIGINT.UNSIGNED
},
lastModifiedInMS: {
allowNull: false,
type: BIGINT.UNSIGNED
}
},
{ sequelize: this.sequelize, modelName: "Extents", timestamps: false }
);
// TODO: Remove this part which only for test.
await this.sequelize.sync();
this.initialized = true;
}
public isInitialized(): boolean {
return this.initialized;
}
public async close(): Promise<void> {
await this.sequelize.close();
this.closed = true;
}
public isClosed(): boolean {
return this.closed;
}
public async clean(): Promise<void> {
// TODO: Implement cleanup in database
}
/**
* Update the extent status in DB. A new item will be created if the extent does not exists.
*
* @param {IExtentModel} extent
* @returns {Promise<void>}
* @memberof LokiExtentMetadata
*/
public async updateExtent(extent: IExtentModel): Promise<void> {
return ExtentsModel.upsert({
...extent
})
.then(() => {
return;
})
.catch((err) => {
// console.log(`SqlExtentMetadataStore.updateExtent() upsert err:${err}`);
throw err;
});
}
/**
*
* List extents.
* @param {string} [id]
* @param {number} [maxResults]
* @param {(number | undefined)} [marker]
* @param {Date} [queryTime]
* @param {number} [protectTimeInMs]
* @returns {(Promise<[IExtentModel[], number | undefined]>)}
* @memberof SqlExtentMetadataStore
*/
public async listExtents(
id?: string,
maxResults?: number,
marker?: number | string | undefined,
queryTime?: Date,
protectTimeInMs?: number
): Promise<[IExtentModel[], number | undefined]> {
const query: any = {};
if (id !== undefined) {
query.id = id;
// console.log(`SqlExtentMetadataStore.listExtents() query ${id}`);
}
if (maxResults === undefined) {
maxResults = 5000;
}
if (protectTimeInMs === undefined) {
protectTimeInMs = 0;
}
if (queryTime !== undefined) {
query.lastModifiedInMS = {
[Op.lt]: queryTime.getTime() - protectTimeInMs
};
}
if (marker !== undefined) {
query.id = {
[Op.gt]: marker
};
}
const modelConvert = (extentsModel: ExtentsModel): IExtentModel => {
const getId = this.getModelValue<string>(extentsModel, "id", true);
return {
id: getId,
locationId: this.getModelValue<string>(
extentsModel,
"locationId",
true
),
path: this.getModelValue<string>(extentsModel, "path") || getId,
size: this.getModelValue<number>(extentsModel, "size", true),
lastModifiedInMS: this.getModelValue<number>(
extentsModel,
"lastModifiedInMS",
true
)
};
};
return ExtentsModel.findAll({
limit: maxResults,
where: query as any,
order: [["id", "ASC"]]
}).then((res) => {
if (res.length < maxResults!) {
return [res.map((val) => modelConvert(val)), undefined];
} else {
const tailItem = res[res.length - 1];
const nextMarker = this.getModelValue<number>(tailItem, "id", true);
return [res.map((val) => modelConvert(val)), nextMarker];
}
});
}
/**
* Delete the extent metadata from DB with the extentId.
*
* @param {string} extentId
* @returns {Promise<void>}
* @memberof IExtentMetadata
*/
public async deleteExtent(extentId: string): Promise<void> {
return ExtentsModel.destroy({
where: {
id: extentId
}
}).then(() => {
return;
});
}
/**
* Get the locationId for a given extentId.
*
* @param {string} extentId
* @returns {Promise<string>}
* @memberof IExtentMetadata
*/
public async getExtentLocationId(extentId: string): Promise<string> {
return ExtentsModel.findOne({
where: {
id: extentId
}
}).then((res) => {
if (res === null || res === undefined) {
throw Error(
`SqlExtentMetadataStore:getExtentLocationId() Error. Extent not exists.`
);
}
const locationId = this.getModelValue<string>(res, "locationId", true);
return locationId;
});
}
public iteratorExtents(): AsyncIterator<string[]> {
return new AllExtentsAsyncIterator(this);
}
private getModelValue<T>(model: Model, key: string): T | undefined;
private getModelValue<T>(model: Model, key: string, isRequired: true): T;
private getModelValue<T>(
model: Model,
key: string,
isRequired?: boolean
): T | undefined {
const value = model.get(key) as T | undefined;
if (value === undefined && isRequired === true) {
// tslint:disable-next-line:max-line-length
throw new Error(
`SqlBlobMetadataStore:getModelValue() error. ${key} is required but value from database model is undefined.`
);
}
return value;
}
}