diff --git a/jobs/initJobs.ts b/jobs/initJobs.ts index e7749fefc..a763cfe06 100644 --- a/jobs/initJobs.ts +++ b/jobs/initJobs.ts @@ -2,7 +2,7 @@ import { CURRENT_PRICE_REPEAT_DELAY } from 'constants/index' import { Queue } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' import EventEmitter from 'events' -import { syncCurrentPricesWorker } from './workers' +import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker } from './workers' EventEmitter.defaultMaxListeners = 20 @@ -20,6 +20,10 @@ const main = async (): Promise => { ) await syncCurrentPricesWorker(pricesQueue.name) + + const blockchainQueue = new Queue('blockchainSync', { connection: redisBullMQ }) + await blockchainQueue.add('syncBlockchainAndPrices', {}, { jobId: 'syncBlockchainAndPrices' }) + await syncBlockchainAndPricesWorker(blockchainQueue.name) } void main() diff --git a/jobs/workers.ts b/jobs/workers.ts index 7a76695ce..833ebca15 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -1,6 +1,8 @@ import { Worker } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index' +import { multiBlockchainClient } from 'services/chronikService' +import { connectAllTransactionsToPrices } from 'services/transactionService' import * as priceService from 'services/priceService' @@ -17,13 +19,44 @@ export const syncCurrentPricesWorker = async (queueName: string): Promise } ) worker.on('completed', job => { - console.log(`syncing of ${job.data.syncType as string} prices finished`) + console.log('syncing of current prices finished') }) worker.on('failed', (job, err) => { if (job !== undefined) { - console.log(`syncing of ${job.data.syncType as string} prices FAILED`) - console.log(`error for initial syncing of ${job.data.syncType as string} prices: ${err.message}`) + console.log('syncing of current prices FAILED') + console.log(`error for initial syncing of current prices: ${err.message}`) + } + }) +} + +export const syncBlockchainAndPricesWorker = async (queueName: string): Promise => { + const worker = new Worker( + queueName, + async (job) => { + console.log(`job ${job.id as string}: syncing missed transactions and connecting prices...`) + await multiBlockchainClient.syncMissedTransactions() + await connectAllTransactionsToPrices() + }, + { + connection: redisBullMQ, + lockDuration: DEFAULT_WORKER_LOCK_DURATION + } + ) + + worker.on('completed', (job) => { + // teardown + void (async () => { + console.log('Cleaning up MultiBlockchainClient global instance...') + await multiBlockchainClient.destroy() + console.log('Done.') + console.log(`job ${job.id as string}: blockchain + prices sync finished`) + })() + }) + + worker.on('failed', (job, err) => { + if (job != null) { + console.error(`job ${job.id as string}: FAILED — ${err.message}`) } }) } diff --git a/scripts/docker-exec-shortcuts.sh b/scripts/docker-exec-shortcuts.sh index bce1078b7..9a4ff0992 100755 --- a/scripts/docker-exec-shortcuts.sh +++ b/scripts/docker-exec-shortcuts.sh @@ -49,7 +49,7 @@ case "$command" in eval "$base_command_db" mariadb-dump -h "$MAIN_DB_HOST" -u root -p"$MAIN_DB_ROOT_PASSWORD" "$@" ;; "databaseshell" | "dbs") - eval "$base_command_db" bash -l "$@" + eval "$base_command_db" bash -c bash -l "$@" ;; "databasetest" | "dbt") eval "$base_command_db" mariadb -h "$MAIN_DB_HOST" -u "$MAIN_DB_USER"-test -p"$MAIN_DB_PASSWORD" -D "$MAIN_DB_NAME"-test "$@" @@ -70,10 +70,10 @@ case "$command" in eval "$base_command_node" yarn test --coverage --verbose "$@" ;; "nodeshell" | "ns") - eval "$base_command_node" bash -l + eval "$base_command_node" bash -c bash -l ;; "rootnodeshell" | "rns") - eval "$base_command_node_root" bash -l + eval "$base_command_node_root" bash -c bash -l ;; "jobslogs" | "jl") eval "$base_command_node" pm2 logs jobs diff --git a/services/chronikService.ts b/services/chronikService.ts index 6dfa9fb15..1ef78561f 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -11,7 +11,6 @@ import { upsertTransaction, getSimplifiedTransactions, getSimplifiedTrasaction, - connectAllTransactionsToPrices, updateClientPaymentStatus, getClientPayment } from './transactionService' @@ -28,7 +27,6 @@ import { OpReturnData, parseError, parseOpReturnData } from 'utils/validators' import { executeAddressTriggers, executeTriggersBatch } from './triggerService' import { appendTxsToFile } from 'prisma-local/seeds/transactions' import { PHASE_PRODUCTION_BUILD } from 'next/dist/shared/lib/constants' -import { syncPastDaysNewerPrices } from './priceService' import { AddressType } from 'ecashaddrjs/dist/types' import { DecimalJsLike } from '@prisma/client/runtime/library' @@ -957,25 +955,14 @@ class MultiBlockchainClient { console.log('Initializing MultiBlockchainClient...') this.initializing = true void (async () => { - if (this.isRunningApp()) { - await syncPastDaysNewerPrices() - const asyncOperations: Array> = [] - this.clients = { - ecash: this.instantiateChronikClient('ecash', asyncOperations), - bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations) - } - await Promise.all(asyncOperations) - this.setInitialized() - await connectAllTransactionsToPrices() - } else if (process.env.NODE_ENV === 'test') { - const asyncOperations: Array> = [] - this.clients = { - ecash: this.instantiateChronikClient('ecash', asyncOperations), - bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations) - } - await Promise.all(asyncOperations) - this.setInitialized() + const asyncOperations: Array> = [] + this.clients = { + ecash: this.instantiateChronikClient('ecash', asyncOperations), + bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations) } + await Promise.all(asyncOperations) + this.setInitialized() + console.log('Finished initializing MultiBlockchainClient.') })() } @@ -1020,9 +1007,6 @@ class MultiBlockchainClient { await newClient.waitForLatencyTest() console.log(`[CHRONIK — ${networkSlug}] Subscribing addresses in database...`) await newClient.subscribeInitialAddresses() - console.log(`[CHRONIK — ${networkSlug}] Syncing missed transactions...`) - await newClient.syncMissedTransactions() - console.log(`[CHRONIK — ${networkSlug}] Finished instantiating client.`) })() ) } else if (process.env.NODE_ENV === 'test') { @@ -1033,6 +1017,7 @@ class MultiBlockchainClient { ) } + console.log(`Finished instantiating ${networkSlug} client.`) return newClient } @@ -1090,10 +1075,31 @@ class MultiBlockchainClient { return await this.clients[networkSlug as MainNetworkSlugsType].getBalance(address) } + public async syncMissedTransactions (): Promise { + await this.waitForStart() + await Promise.all([ + this.clients.ecash.syncMissedTransactions(), + this.clients.bitcoincash.syncMissedTransactions() + ]) + } + public async syncAndSubscribeAddresses (addresses: Address[]): Promise { await this.subscribeAddresses(addresses) return await this.syncAddresses(addresses) } + + public async destroy (): Promise { + await Promise.all( + Object.values(this.clients).map(async (c) => { + try { + c.chronikWSEndpoint.close() + c.wsEndpoint.close() + } catch (err: any) { + console.error(`Failed to close connections for client: ${err.message as string}`) + } + }) + ) + } } export interface NodeJsGlobalMultiBlockchainClient extends NodeJS.Global {