in source/api/services/search/lib/metadata.js [359:479]
/**
 * Collects the Glue catalog metadata for a crawler's database and writes it
 * onto the matching package document(s) in Elasticsearch.
 *
 * Steps:
 *   1. Look up the package in the 'data-lake-packages' table and stop if it is
 *      flagged as deleted.
 *   2. Read every table of the Glue database and gather the distinct column
 *      names, column comments, table descriptions and aggregated statistics.
 *   3. Run an update-by-query that stores those values in the fields
 *      `column_name`, `column_comment`, `table_desc` and `table_stats`.
 *
 * @param {Object} event - Event whose `detail.crawlerName` holds the crawler
 *     name. The Glue database name is the crawler name with spaces replaced
 *     by underscores; the package id is the last space-separated token.
 * @param {Function} cb - Node-style callback. Called with
 *     `({code: 502, message}, null)` on any failure (including a deleted
 *     package) or `(null, {code: 200, message: "Index updated."})` on success.
 */
metadata.prototype.indexColumns = function (event, cb) {
    const crawlerName = event.detail.crawlerName;
    const databaseName = crawlerName.replace(/ /g, "_");
    const packageId = crawlerName.split(' ').pop();

    const glue = new AWS.Glue();
    const docClient = new AWS.DynamoDB.DocumentClient(dynamoConfig);

    // Returns the column list of a Glue table, or [] when the table carries
    // no storage descriptor / columns.
    const columnsOf = function(table) {
        return (table.StorageDescriptor && table.StorageDescriptor.Columns) || [];
    };

    //-------------------------------------------------------------------------
    // Cancel if the package is already deleted
    //-------------------------------------------------------------------------
    const packageParams = {
        TableName: 'data-lake-packages',
        Key: {
            package_id: packageId
        }
    };

    docClient.get(packageParams, function(err, data) {
        if (err) {
            console.log(err);
            return cb({code: 502, message: "Failed to retrieve package info."}, null);
        }

        // DocumentClient.get returns the record under `Item`; the deleted flag
        // lives on the record itself, not on the response envelope.
        // NOTE(review): a missing record (no `Item`) still proceeds, as before.
        if (data && data.Item && data.Item.deleted) {
            console.log('Package ' + packageId + ' is deleted; skipping column indexing.');
            return cb({code: 502, message: "Failed to retrieve package info."}, null);
        }

        //---------------------------------------------------------------------
        // Get column names, comments and table descriptions
        //---------------------------------------------------------------------
        // NOTE(review): only the first page of getTables is read; `NextToken`
        // is not followed. Confirm whether databases can exceed one page.
        glue.getTables({DatabaseName: databaseName}, function(err, data) {
            if (err) {
                console.log(err);
                return cb({code: 502, message: "Failed to retrieve tables info."}, null);
            }

            const tables = (data && data.TableList) || [];

            // Column Names
            const namesPerTable = tables.map(function(table) {
                return columnsOf(table).map(function(column) {
                    return column.Name;
                });
            });
            const columnNames = _.chain(namesPerTable).flatten().uniq().value();

            // Column Comments (empty / missing comments are dropped)
            const commentsPerTable = tables.map(function(table) {
                return columnsOf(table).map(function(column) {
                    return column.Comment;
                });
            });
            const columnComments = _.chain(commentsPerTable).flatten().uniq().compact().value();

            // Table Descriptions (empty / missing descriptions are dropped)
            const descriptions = tables.map(function(table) {
                return table.Description;
            });
            const tableDescs = _.chain(descriptions).flatten().uniq().compact().value();

            // Tables Statistics: counts and sizes are summed over all tables
            // that expose `Parameters`; the record size is averaged over them.
            const tableStats = {
                averageRecordSize: 0,
                objectCount: 0,
                recordCount: 0,
                sizeKey: 0
            };
            let tablesWithParameters = 0;
            tables.forEach(function(table) {
                const stats = table.Parameters;
                if (!stats) {
                    return;
                }
                tablesWithParameters++;
                tableStats.averageRecordSize += stats.averageRecordSize ? parseFloat(stats.averageRecordSize) : 0;
                tableStats.objectCount += stats.objectCount ? parseInt(stats.objectCount, 10) : 0;
                tableStats.recordCount += stats.recordCount ? parseInt(stats.recordCount, 10) : 0;
                tableStats.sizeKey += stats.sizeKey ? parseInt(stats.sizeKey, 10) : 0;
            });
            if (tablesWithParameters > 0) {
                tableStats.averageRecordSize /= tablesWithParameters;
                // toFixed yields a string (e.g. "12.50"); kept as-is so the
                // indexed value keeps the representation it had before.
                tableStats.averageRecordSize = tableStats.averageRecordSize.toFixed(2);
            }

            //-----------------------------------------------------------------
            // Index meta-data
            //-----------------------------------------------------------------
            getConfigInfo(function(err, config) {
                if (err || _.isEmpty(config)) {
                    console.log(err);
                    return cb({code: 502, message: "Failed to retrieve config."}, null);
                }

                const client = require('elasticsearch').Client({
                    hosts: config.Item.setting.esurl,
                    connectionClass: require('http-aws-es'),
                    amazonES: {
                        region: process.env.AWS_REGION,
                        credentials: creds
                    }
                });

                client.updateByQuery({
                    index: config.Item.setting.esindex,
                    type: 'package',
                    body: {
                        "query": { "match": { "package_id": packageId } },
                        "script": {
                            "inline": "ctx._source.column_name = params.columnNames; ctx._source.column_comment = params.columnComments; ctx._source.table_desc = params.tableDescs; ctx._source.table_stats = params.tableStats",
                            "params": {"columnNames": columnNames, "columnComments": columnComments, "tableDescs": tableDescs, "tableStats": tableStats}
                        }
                    }
                }).then(function() {
                    return cb(null, {code: 200, message: "Index updated."});
                }, function(err) {
                    console.log(err);
                    return cb({code: 502, message: "Failed to update ES document."}, null);
                });
            });
        });
    });
};