middleware/cron/syncTransactionHistory.js (277 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
const path = require('path');
require("dotenv").config({
path: path.resolve(__dirname, '.env')
})
const cron = require('node-cron');
const axios = require('axios');
const sqlite3 = require('sqlite3').verbose();
const fs = require('fs');
const { getEnv } = require('../utils/envParser');
const logger = require('../utils/logger');
const BATCH_SIZE = 1000
// Create data directory if it doesn't exist
const dataDir = path.join(__dirname, '../cache');
if (!fs.existsSync(dataDir)) {
fs.mkdirSync(dataDir, { recursive: true });
}
// Initialize SQLite database
const dbPath = path.join(dataDir, 'transactions.db');
const db = new sqlite3.Database(dbPath);
/**
* Initialize the database schema
*/
function initializeDatabase() {
return new Promise((resolve, reject) => {
logger.info('Initializing transaction history database');
try {
db.serialize(() => {
// Create transactions table
db.run(`
CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
block_id INTEGER NOT NULL,
volume INTEGER NOT NULL,
created_at TEXT NOT NULL
)
`, (err) => {
if (err) {
logger.error('Error creating transactions table:', err);
reject(err);
return;
}
// Create index on block_id
db.run('CREATE INDEX IF NOT EXISTS idx_transactions_block_id ON transactions(block_id)', (err) => {
if (err) {
logger.error('Error creating block_id index:', err);
reject(err);
return;
}
// Create index on created_at for time-based queries
db.run('CREATE INDEX IF NOT EXISTS idx_transactions_created_at ON transactions(created_at)', (err) => {
if (err) {
logger.error('Error creating created_at index:', err);
reject(err);
return;
}
// Verify database is accessible with a simple query
db.get('SELECT count(*) as count FROM transactions', (err, row) => {
if (err) {
logger.error('Error verifying database access:', err);
reject(err);
} else {
logger.info(`Database initialized successfully. Current record count: ${row?.count || 0}`);
resolve();
}
});
});
});
});
});
} catch (err) {
logger.error('Unexpected error during database initialization:', err);
reject(err);
}
});
}
/**
* Get the highest block ID we've processed so far
*/
function getLastProcessedBlockId() {
return new Promise((resolve, reject) => {
db.get('SELECT MAX(block_id) as last_id FROM transactions', (err, row) => {
if (err) {
logger.error('Error getting last processed block ID:', err);
reject(err);
} else {
resolve(row?.last_id || 0);
}
});
});
}
/**
* Get the total number of blocks from the explorer
*/
async function getTotalBlocks() {
try {
const baseUrl = `${getEnv("EXPLORER_BASE_URL")}/populatetable`;
const response = await axios.get(baseUrl);
const data = (Array.isArray(response?.data) && response?.data.length > 0) ? response?.data[0] : { numBlocks: 0}
return data?.blockNum || 0;
} catch (error) {
logger.error('Error fetching total blocks count:', error);
throw error;
}
}
/**
* Fetch a range of blocks from the explorer
*/
async function fetchBlockRange(start, end) {
try {
const baseUrl = `${getEnv("EXPLORER_BASE_URL")}/v1/blocks/${start}/${end}`;
const response = await axios.get(baseUrl);
return response.data || [];
} catch (error) {
logger.error(`Error fetching blocks from ${start} to ${end}:`, error);
throw error;
}
}
/**
* Store transactions from blocks in the database
*/
function storeTransactions(blocks) {
return new Promise((resolve, reject) => {
const timestamp = Math.floor(Date.now() / 1000);
db.serialize(() => {
const stmt = db.prepare(`
INSERT INTO transactions
(block_id,volume,created_at)
VALUES (?, ?, ?)
`);
let transactionCount = 0;
blocks.forEach(block => {
const { id,createdAt,transactions } = block;
stmt.run(
id,
Array.isArray(transactions) ? transactions?.length : 0,
createdAt,
);
});
stmt.finalize(err => {
if (err) {
logger.error('Error storing transactions:', err);
reject(err);
} else {
logger.info(`Successfully stored ${transactionCount} transactions from ${blocks.length} blocks`);
resolve(transactionCount);
}
});
});
});
}
/**
* Utility function to introduce a delay
*
* @param {number} ms - Milliseconds to wait
* @returns {Promise<void>} A promise that resolves after the specified time
*/
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
/**
* Main sync function to fetch and store transaction data
*/
async function syncTransactionHistory() {
try {
logger.info('Starting transaction history sync job');
// Get current state
const lastProcessedId = await getLastProcessedBlockId();
const totalBlocks = await getTotalBlocks();
logger.info(`Last processed block ID: ${lastProcessedId}, Total blocks: ${totalBlocks}`);
if (lastProcessedId < totalBlocks) {
const batchSize = BATCH_SIZE;
let currentStart = lastProcessedId + 1;
// Fetch and store blocks in batches
while (currentStart <= totalBlocks) {
const currentEnd = Math.min(currentStart + batchSize - 1, totalBlocks);
logger.info(`Fetching blocks from ${currentStart} to ${currentEnd}`);
const blocks = await fetchBlockRange(currentStart, currentEnd);
await storeTransactions(blocks);
// Add 1 second delay to avoid overwhelming the database or API
await sleep(1000);
currentStart = currentEnd + 1;
}
logger.info('Transaction history sync completed successfully');
} else {
logger.info('Transaction history is already up to date');
}
} catch (error) {
logger.error('Error syncing transaction history:', error.message);
}
}
/**
* Initialize the service and start the cron job
*
* @param {Object} options - Configuration options
* @param {boolean} options.scheduleJob - Whether to schedule the recurring job
* @param {string} options.cronSchedule - Cron schedule expression
*/
async function initSyncService(options = {}) {
const { scheduleJob = true, cronSchedule = '0 * * * *' } = options;
try {
// Initialize database
await initializeDatabase();
// Run initial sync
await syncTransactionHistory();
// Schedule recurring job if requested
if (scheduleJob) {
cron.schedule(cronSchedule, async () => {
logger.info(`Running scheduled transaction history sync (${cronSchedule})`);
await syncTransactionHistory();
});
logger.info(`Transaction history sync service initialized and scheduled (${cronSchedule})`);
} else {
logger.info('Transaction history sync completed (one-time run)');
}
} catch (error) {
logger.error('Failed to initialize transaction history sync service:', error);
}
}
/**
* Display the command usage information
*/
function showUsage() {
console.log(`
Transaction History Sync Tool
Usage:
node syncTransactionHistory.js [command] [options]
Commands:
start Start the service with scheduled cron job (default)
run-once Run the sync once without scheduling a cron job
status Show the current sync status
help Show this help message
Options:
--schedule=<cron> Custom cron schedule (default: "0 * * * *")
Example: --schedule="*/30 * * * *" for every 30 minutes
Examples:
node syncTransactionHistory.js start
node syncTransactionHistory.js run-once
node syncTransactionHistory.js start --schedule="0 */2 * * *"
node syncTransactionHistory.js status
`);
}
/**
* Check and display the current sync status
*/
async function showStatus() {
try {
const lastProcessedId = await getLastProcessedBlockId();
const totalBlocks = await getTotalBlocks();
const syncProgress = lastProcessedId > 0
? ((lastProcessedId / totalBlocks) * 100).toFixed(2)
: 0;
console.log('\nTransaction History Sync Status:');
console.log('--------------------------------');
console.log(`Last processed block ID: ${lastProcessedId}`);
console.log(`Total blocks available: ${totalBlocks}`);
console.log(`Sync progress: ${syncProgress}%`);
console.log(`Blocks remaining: ${totalBlocks - lastProcessedId}`);
console.log('--------------------------------\n');
} catch (error) {
logger.error('Error getting sync status:', error);
}
}
// Process command line arguments
if (require.main === module) {
const args = process.argv.slice(2);
const command = args[0] || 'start';
// Parse options
const options = {};
args.forEach(arg => {
if (arg.startsWith('--schedule=')) {
options.cronSchedule = arg.substring(11);
}
});
// Execute appropriate command
switch (command) {
case 'start':
logger.info('Starting transaction history sync service with scheduler');
initSyncService(options);
break;
case 'run-once':
logger.info('Running transaction history sync once');
initSyncService({ ...options, scheduleJob: false });
break;
case 'status':
showStatus();
break;
case 'help':
showUsage();
break;
default:
console.log(`Unknown command: ${command}`);
showUsage();
break;
}
}
module.exports = {
initSyncService,
syncTransactionHistory,
getLastProcessedBlockId,
getTotalBlocks
};