exports.step=function()

in lambda/import/index.js [11:157]


exports.step=function(event,context,cb){
    console.log("step")
    console.log("Request",JSON.stringify(event,null,2))
    var Bucket=event.Records[0].s3.bucket.name
    var Key=decodeURI(event.Records[0].s3.object.key)
    
    var progress
    console.log(Bucket,Key);
    s3.waitFor('objectExists',{Bucket,Key}).promise()
    .then(()=>s3.getObject({Bucket,Key}).promise())
    .then(x=>JSON.parse(x.Body.toString()))
    .then(function(config){
        console.log("Config:",JSON.stringify(config,null,2));
        if(config.status==="InProgress"){
            // TODO - design a more robust way to identify target ES index for auto import of metrics and feedback
            // Filenames must match across:
            // aws-ai-qna-bot/templates/import/UpgradeAutoImport.js
            // aws-ai-qna-bot/templates/master/UpgradeAutoExport.js
            // and pattern in /aws-ai-qna-bot/lambda/import/index.js
            var esindex = process.env.ES_INDEX ;
            if (Key.match(/.*ExportAll_QnABot_.*_metrics\.json/)) {
                esindex = process.env.ES_METRICSINDEX ;
            } else if (Key.match(/.*ExportAll_QnABot_.*_feedback\.json/)) {
                esindex = process.env.ES_FEEDBACKINDEX ;        
            } 
            console.log("Importing to index: ", esindex);
            return s3.getObject({
                Bucket:config.bucket,
                Key:config.key,
                VersionId:config.version,
                Range:`bytes=${config.start}-${config.end}`
            }).promise()
            .then(function(result){
                config.buffer+=result.Body.toString()
                
                var objects=config.buffer.split(/\n/)
                try {
                    JSON.parse(objects[objects.length-1])
                    config.buffer=""
                } catch(e){
                    config.buffer=objects.pop()
                }
                var out=[]
                objects.filter(x=>x)
                .forEach(x=>{
                    try{
                        var obj=JSON.parse(x)
                        var timestamp=_.get(obj,'datetime',"");
                        var docid ;
                        if (timestamp === "") {
                            // only metrics and feedback items have datetime field.. This must be a qna item.
                            obj.type=obj.type || 'qna'
                            obj.q = obj.q.map(x=>{ x = x.replace(/\\*"/g,''); return x});
                            if(obj.type==='qna'){
                                try
                                {
                                    obj.questions=obj.q.map(x=>{return {q:x}});
                                    obj.quniqueterms=obj.q.join(" ");
                                }
                                catch(err){
                                    console.log("skipping question invalid answer format")
                                }
                                delete obj.q
                            }
                            docid = obj._id || obj.qid ;
                        } else {
                            docid = obj._id || obj.qid + "_upgrade_restore_" + timestamp;
                            // Stringify session attributes
                            var sessionAttrs = _.get(obj,"entireResponse.session",{}) ;
                            for (var key of Object.keys(sessionAttrs)) {
                                if (typeof sessionAttrs[key] != 'string') {
                                    sessionAttrs[key]=JSON.stringify(sessionAttrs[key]);
                                }
                            }
                        }
                        delete obj._id;
                        out.push(JSON.stringify({
                            index:{
                                "_index":esindex,
                                "_type":"_doc",
                                "_id":docid 
                            }
                        }))
                        config.count+=1
                        out.push(JSON.stringify(obj))
                    } catch(e){
                        config.failed+=1
                        console.log("Failed to Parse:",e,x)
                    }
                })
                console.log(result.ContentRange)
                var tmp=result.ContentRange.match(/bytes (.*)-(.*)\/(.*)/)
                progress=(parseInt(tmp[2])+1)/parseInt(tmp[3])

                return out.join('\n')+'\n'
            })
            .then(function(result){
                var body={
                    endpoint:process.env.ES_ENDPOINT,
                    method:"POST",
                    path:"/_bulk",
                    body:result
                }
                
                return lambda.invoke({
                    FunctionName:process.env.ES_PROXY,
                    Payload:JSON.stringify(body)
                }).promise()
                .tap(console.log)
                .then(x=>{
                    config.EsErrors.push(JSON.parse(_.get(x,"Payload","{}")).errors)
                })
            })
            .then(()=>{
                config.start=(config.end+1)
                config.end=config.start+config.stride
                config.progress=progress
                config.time.rounds+=1
                
                if(config.progress>=1){
                    config.status="Complete"
                    config.time.end=(new Date()).toISOString()
                }
            
                console.log("EndConfig:",JSON.stringify(config,null,2))
                return s3.putObject({
                    Bucket:Bucket,
                    Key:Key,
                    Body:JSON.stringify(config)
                }).promise()
                .then(result=>cb(null))
            })
            .catch(error=>{
                console.log(error)
                config.status="Error"
                config.message=JSON.stringify(error)
                return s3.putObject({
                    Bucket:Bucket,
                    Key:Key,
                    Body:JSON.stringify(config)
                }).promise()
                .then(()=>cb(error))
            })
        }
    })
    .catch(cb)
}