Elastiflix/movie-data-loader/index.js (64 lines of code) (raw):

const zlib = require('zlib');
const { Client } = require('@elastic/elasticsearch');
const { Readable } = require('stream');
const fs = require('fs');

// Gzipped JSON file holding the movie documents to index.
const filePath = './movies.json.gz';
// Index that receives movies having both a poster and a backdrop image.
const INDEX_NAME = 'elastiflix-movies';
// Movies missing a poster or backdrop are routed here instead.
const ERROR_INDEX_NAME = 'elastiflix-movies-errors';

// Parsed contents of filePath; populated by loadFile().
let data = {};
// Number of top-level entries in `data`; used only for progress reporting.
let totalDocs = 0;

// `let` rather than `const`: a trailing slash is stripped below, and
// reassigning a `const` binding throws a TypeError.
let ELASTICSEARCH_URL = process.env.ELASTICSEARCH_URL || "";
const ELASTICSEARCH_USERNAME = process.env.ELASTICSEARCH_USERNAME || "elastic";
const ELASTICSEARCH_PASSWORD = process.env.ELASTICSEARCH_PASSWORD || "";

if (ELASTICSEARCH_URL === "") {
  console.log("ELASTICSEARCH_URL environment variable not set, exiting");
  process.exit(1);
}

// Remove a single trailing slash from ELASTICSEARCH_URL.
if (ELASTICSEARCH_URL.endsWith("/")) {
  ELASTICSEARCH_URL = ELASTICSEARCH_URL.slice(0, -1);
}

if (ELASTICSEARCH_PASSWORD === "") {
  console.log("ELASTICSEARCH_PASSWORD environment variable not set, exiting");
  process.exit(1);
}

/**
 * Reads and gunzips `filePath`, parses the result as JSON, and stores it in
 * the module-level `data` (and its key count in `totalDocs`).
 *
 * @returns {Promise<*>} Resolves with the parsed JSON value. Rejects if the
 *   file cannot be read, is not valid gzip, or does not contain valid JSON.
 */
function loadFile() {
  console.log("loadFile");
  return new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(filePath);
    const gunzip = zlib.createGunzip();
    const chunks = [];

    // pipe() does not forward errors between streams. Each stream needs its
    // own handler, or a missing file / corrupt archive never settles the promise.
    readStream.on('error', reject);
    gunzip.on('error', reject);

    // Keep the raw Buffers and decode once at the end. Decoding chunk by
    // chunk would corrupt multi-byte UTF-8 characters split across chunks.
    gunzip.on('data', (chunk) => chunks.push(chunk));
    gunzip.on('end', () => {
      try {
        data = JSON.parse(Buffer.concat(chunks).toString('utf8'));
        totalDocs = Object.keys(data).length;
        resolve(data);
      } catch (err) {
        // Invalid JSON (or a null document) must reject the promise rather
        // than escape as an uncaught exception from the event handler.
        reject(err);
      }
    });

    readStream.pipe(gunzip);
  });
}

/**
 * Recreates the movies index from ./mapping.json and bulk-indexes every
 * document loaded from `filePath`.
 *
 * Documents whose `poster_path` and `backdrop_path` are both non-null go to
 * INDEX_NAME; all others go to ERROR_INDEX_NAME. Documents dropped by the
 * bulk helper are logged.
 *
 * @returns {Promise<void>} Rejects if loading the file or any Elasticsearch
 *   call fails.
 */
async function loadMovies() {
  // Read the data before touching the cluster, so an unreadable or invalid
  // file does not leave the existing index deleted.
  const movies = await loadFile();

  const client = new Client({
    nodes: [ELASTICSEARCH_URL],
    auth: {
      username: ELASTICSEARCH_USERNAME,
      password: ELASTICSEARCH_PASSWORD,
    },
  });

  try {
    // Delete the index if it exists...
    await client.indices.delete({ index: INDEX_NAME, ignore_unavailable: true });
    // ...then recreate it using the mapping in ./mapping.json.
    await client.indices.create({ index: INDEX_NAME, body: require('./mapping.json') });

    let counter = 0;
    const result = await client.helpers.bulk({
      flushBytes: 90000,
      datasource: movies,
      onDocument(doc) {
        // Log progress every 100th document.
        if (counter % 100 === 0) {
          console.log(`${counter / totalDocs * 100}%`);
        }
        counter++;
        const targetIndex =
          doc.poster_path !== null && doc.backdrop_path !== null ? INDEX_NAME : ERROR_INDEX_NAME;
        return { index: { _index: targetIndex, _id: doc.id } };
      },
      onDrop(doc) {
        console.log(doc);
      },
    });

    console.log(result);
    console.log("indexing finished");
  } finally {
    // Release the client's connections so the process can exit promptly.
    await client.close();
  }
}

console.log("starting movie indexing");
// Surface failures instead of leaving an unhandled promise rejection.
loadMovies().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});