index.js
const config = require('./config.js');
const { Octokit } = require('@octokit/rest');
const { createAppAuth } = require('@octokit/auth-app');
const { retry } = require('@octokit/plugin-retry');
const { throttling } = require('@octokit/plugin-throttling');
const { Client } = require('@elastic/elasticsearch');
const moment = require('moment');
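// config.js is expected to export (shape derived from its usage in this file):
// { elasticsearch: {...}, githubAuth: {...}, repos: ['owner/repo', ...], privateRepos: ['owner/repo', ...] }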
// Elasticsearch index in which the per-page etag cache keys are persisted between runs.
const CACHE_INDEX = 'crawler-cache';
const client = new Client({ ...config.elasticsearch, compression: 'gzip' });
const RetryOctokit = Octokit.plugin(retry, throttling);
const octokit = new RetryOctokit({
  // The squirrel-girl preview is required to get reaction counts on issues.
  previews: ['squirrel-girl-preview'],
  authStrategy: createAppAuth,
  auth: config.githubAuth,
  request: { retries: 2 },
  throttle: {
    onRateLimit: (retryAfter, options, octokit) => {
      // Not returning true here means rate-limited requests are not retried.
      octokit.log.warn(`Request quota exhausted.`);
    },
    onAbuseLimit: (retryAfter, options, octokit) => {
      octokit.log.warn(`Abuse limit triggered, retrying after ${retryAfter}s ...`);
      // Returning true tells the throttling plugin to retry the request.
      return true;
    }
  },
  retry: {
    // The retry plugin compares numeric status codes, so this must be 429, not '429'.
    doNotRetry: [429],
  },
});
/**
 * Enhances a passed-in date into an object that contains further useful
 * information about that date (e.g. day of the week or hour of day).
 */
function enhanceDate(date) {
  if (!date) return null;
  const m = moment(date);
  return {
    time: m.format(),
    weekday: m.format('ddd'),
    weekday_number: parseInt(m.format('d'), 10),
    hour_of_day: parseInt(m.format('H'), 10)
  };
}
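// Example (assuming a UTC environment; the offset in "time" depends on the local timezone):
// enhanceDate('2021-03-15T10:30:00Z')
// → { time: '2021-03-15T10:30:00+00:00', weekday: 'Mon', weekday_number: 1, hour_of_day: 10 }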
/**
 * Takes the raw issue from the GitHub API response and returns the
 * object that should be stored inside Elasticsearch.
 */
function convertIssue(owner, repo, raw) {
  // Time from creation to close in milliseconds (null for still-open issues).
  const time_to_fix = (raw.created_at && raw.closed_at) ?
    moment(raw.closed_at).diff(moment(raw.created_at)) :
    null;
  return {
    id: raw.id,
    last_crawled_at: Date.now(),
    owner: owner,
    repo: repo,
    state: raw.state,
    title: raw.title,
    number: raw.number,
    url: raw.url,
    locked: raw.locked,
    comments: raw.comments,
    created_at: enhanceDate(raw.created_at),
    updated_at: enhanceDate(raw.updated_at),
    closed_at: enhanceDate(raw.closed_at),
    author_association: raw.author_association,
    user: raw.user.login,
    body: raw.body,
    labels: raw.labels.map(label => label.name),
    // The issues API also returns pull requests; those carry a pull_request key.
    is_pullrequest: !!raw.pull_request,
    assignees: !raw.assignees ? null : raw.assignees.map(a => a.login),
    reactions: !raw.reactions ? null : {
      total: raw.reactions.total_count,
      upVote: raw.reactions['+1'],
      downVote: raw.reactions['-1'],
      laugh: raw.reactions.laugh,
      hooray: raw.reactions.hooray,
      confused: raw.reactions.confused,
      // GitHub's reactions object uses the key "heart"; "hearts" does not exist.
      heart: raw.reactions.heart,
      rocket: raw.reactions.rocket,
      eyes: raw.reactions.eyes,
    },
    time_to_fix: time_to_fix,
  };
}
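// Example: an issue created 2021-01-01T00:00:00Z and closed 2021-01-02T00:00:00Z
// yields time_to_fix = 86400000 (one day in milliseconds).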
/**
 * Creates a bulk request body for all issues. You need to specify the index in
 * which these issues should be stored.
 */
function getIssueBulkUpdates(index, issues) {
  // The Elasticsearch bulk API expects alternating action and document lines.
  return [].concat(...issues.map(issue => [
    { index: { _index: index, _id: issue.id }},
    issue
  ]));
}
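// For two issues this produces (index name and ids illustrative):
// [ { index: { _index: 'issues-owner-repo', _id: 1 } }, { /* issue 1 */ },
//   { index: { _index: 'issues-owner-repo', _id: 2 } }, { /* issue 2 */ } ]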
/**
* Returns the bulk request body to update the cache key for the specified repo
* and page.
*/
function getCacheKeyUpdate(owner, repo, page, key) {
  const id = `${owner}_${repo}_${page}`;
  return [
    { index: { _index: CACHE_INDEX, _id: id }},
    { owner, repo, page, key }
  ];
}
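// e.g. getCacheKeyUpdate('owner', 'repo', 1, 'W/"abc"') targets the document id
// "owner_repo_1" and stores { owner: 'owner', repo: 'repo', page: 1, key: 'W/"abc"' }.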
/**
 * Processes a GitHub response for the specified page of issues.
 * This will convert all issues to the desired format, store them in
 * Elasticsearch, and update the cache key (etag) we got from GitHub.
 */
async function processGitHubIssues(owner, repo, response, page, indexName, logDisplayName) {
  console.log(`[${logDisplayName}#${page}] Found ${response.data.length} issues`);
  if (response.data.length > 0) {
    const issues = response.data.map(issue => convertIssue(owner, repo, issue));
    const bulkIssues = getIssueBulkUpdates(indexName, issues);
    const updateCacheKey = getCacheKeyUpdate(owner, repo, page, response.headers.etag);
    const body = [...bulkIssues, ...updateCacheKey];
    console.log(`[${logDisplayName}#${page}] Writing issues and new cache key "${response.headers.etag}" to Elasticsearch`);
    const esResult = await client.bulk({ body });
    if (esResult.body.errors) {
      console.warn(`[${logDisplayName}#${page}] [ERROR] ${JSON.stringify(esResult.body, null, 2)}`);
    }
    if (esResult.warnings?.length > 0) {
      esResult.warnings.forEach(warning => console.warn(`[${logDisplayName}#${page}] [WARN] ${warning}`));
    }
  }
}
/**
 * Loads the existing cache for the specified repository. The result will be
 * in the format { [pageNr]: 'cacheKey' }.
 */
async function loadCacheForRepo(owner, repo) {
  const { body } = await client.search({
    index: CACHE_INDEX,
    _source: ['page', 'key'],
    // 10000 is the default max result window; enough for 10,000 cached pages
    // (i.e. up to one million issues per repository at 100 issues per page).
    size: 10000,
    body: {
      query: {
        bool: {
          filter: [
            { match: { owner } },
            { match: { repo } }
          ]
        }
      }
    }
  });
  return body.hits.hits.reduce((cache, entry) => {
    cache[entry._source.page] = entry._source.key;
    return cache;
  }, {});
}
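// Example return value (etag values illustrative): { 1: 'W/"etag-a"', 2: 'W/"etag-b"' }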
async function main() {
  async function handleRepository(repository, displayName = repository, isPrivate = false) {
    console.log(`[${displayName}] Processing repository ${displayName}`);
    const [ owner, repo ] = repository.split('/');
    console.log(`[${displayName}] Loading cache entries...`);
    const cache = await loadCacheForRepo(owner, repo);
    console.log(`[${displayName}] Found ${Object.keys(cache).length} cache entries`);
    let page = 1;
    let shouldCheckNextPage = true;
    while (shouldCheckNextPage) {
      console.log(`[${displayName}#${page}] Requesting issues using etag: ${cache[page]}`);
      try {
        // Send the cached etag (if any), so GitHub can reply with 304 Not Modified
        // for unchanged pages, which does not count against the rate limit.
        const headers = cache[page] ? { 'If-None-Match': cache[page] } : {};
        const response = await octokit.issues.listForRepo({
          owner,
          repo,
          page,
          per_page: 100,
          state: 'all',
          sort: 'created',
          direction: 'asc',
          headers: headers
        });
        console.log(`[${displayName}#${page}] Remaining request limit: %s/%s`,
          response.headers['x-ratelimit-remaining'],
          response.headers['x-ratelimit-limit']
        );
        const indexName = isPrivate ? `private-issues-${owner}-${repo}` : `issues-${owner}-${repo}`;
        await processGitHubIssues(owner, repo, response, page, indexName, displayName);
        // GitHub signals further pages via a Link header containing rel="next".
        shouldCheckNextPage = response.headers.link && response.headers.link.includes('rel="next"');
        page++;
      } catch (error) {
        if (error.name === 'HttpError' && error.status === 304) {
          // Ignore not modified responses and continue with the next page.
          console.log(`[${displayName}#${page}] Page was not modified. Continue with the next page.`);
          page++;
          continue;
        }
        if (error.request && error.request.request.retryCount) {
          console.error(`[${displayName}#${page}] Failed request for page after ${error.request.request.retryCount} retries.`);
          console.error(`[${displayName}#${page}] ${error.toString()}`);
        } else {
          console.error(error);
        }
        throw error;
      }
    }
  }
  const results = await Promise.allSettled([
    ...config.repos.map(rep => handleRepository(rep)),
    // Private repos are referenced by index in logs, so their names never show up there.
    ...config.privateRepos.map((rep, index) => handleRepository(rep, `PRIVATE_REPOS[${index}]`, true))
  ]);
  // Exit with a non-zero code if crawling any repository failed.
  if (results.some(({ status }) => status === 'rejected')) {
    process.exit(1);
  }
}
main();