Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion jobs/initJobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { CURRENT_PRICE_REPEAT_DELAY } from 'constants/index'
import { Queue } from 'bullmq'
import { redisBullMQ } from 'redis/clientInstance'
import EventEmitter from 'events'
import { syncCurrentPricesWorker } from './workers'
import { syncCurrentPricesWorker, syncBlockchainAndPricesWorker } from './workers'

EventEmitter.defaultMaxListeners = 20

Expand All @@ -20,6 +20,10 @@ const main = async (): Promise<void> => {
)

await syncCurrentPricesWorker(pricesQueue.name)

const blockchainQueue = new Queue('blockchainSync', { connection: redisBullMQ })
await blockchainQueue.add('syncBlockchainAndPrices', {}, { jobId: 'syncBlockchainAndPrices' })
await syncBlockchainAndPricesWorker(blockchainQueue.name)
}

void main()
39 changes: 36 additions & 3 deletions jobs/workers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { Worker } from 'bullmq'
import { redisBullMQ } from 'redis/clientInstance'
import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index'
import { multiBlockchainClient } from 'services/chronikService'
import { connectAllTransactionsToPrices } from 'services/transactionService'

import * as priceService from 'services/priceService'

Expand All @@ -17,13 +19,44 @@ export const syncCurrentPricesWorker = async (queueName: string): Promise<void>
}
)
worker.on('completed', job => {
console.log(`syncing of ${job.data.syncType as string} prices finished`)
console.log('syncing of current prices finished')
})

worker.on('failed', (job, err) => {
if (job !== undefined) {
console.log(`syncing of ${job.data.syncType as string} prices FAILED`)
console.log(`error for initial syncing of ${job.data.syncType as string} prices: ${err.message}`)
console.log('syncing of current prices FAILED')
console.log(`error for initial syncing of current prices: ${err.message}`)
}
})
}

export const syncBlockchainAndPricesWorker = async (queueName: string): Promise<void> => {
const worker = new Worker(
queueName,
async (job) => {
console.log(`job ${job.id as string}: syncing missed transactions and connecting prices...`)
await multiBlockchainClient.syncMissedTransactions()
await connectAllTransactionsToPrices()
},
{
connection: redisBullMQ,
lockDuration: DEFAULT_WORKER_LOCK_DURATION
}
)

worker.on('completed', (job) => {
// teardown
void (async () => {
console.log('Cleaning up MultiBlockchainClient global instance...')
await multiBlockchainClient.destroy()
console.log('Done.')
console.log(`job ${job.id as string}: blockchain + prices sync finished`)
})()
})
Comment on lines +47 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fire-and-forget async cleanup may not complete before process exit.

The void (async () => {...})() pattern in the completed handler creates an async operation that runs detached from the worker lifecycle. This means:

  • The destroy() call may not complete before the process exits
  • The completion log on Line 53 may print before cleanup finishes
  • Errors during cleanup are swallowed

Move the cleanup to the job handler with proper awaiting:

  const worker = new Worker(
    queueName,
    async (job) => {
-      console.log(`job ${job.id as string}: syncing missed transactions and connecting prices...`)
-      await multiBlockchainClient.syncMissedTransactions()
-      await connectAllTransactionsToPrices()
+      try {
+        console.log(`job ${job.id as string}: syncing missed transactions and connecting prices...`)
+        await multiBlockchainClient.syncMissedTransactions()
+        await connectAllTransactionsToPrices()
+      } finally {
+        console.log('Cleaning up MultiBlockchainClient global instance...')
+        await multiBlockchainClient.destroy()
+        console.log('Done.')
+      }
    },
    {
      connection: redisBullMQ,
      lockDuration: DEFAULT_WORKER_LOCK_DURATION
    }
  )

  worker.on('completed', (job) => {
-    // teardown
-    void (async () => {
-      console.log('Cleaning up MultiBlockchainClient global instance...')
-      await multiBlockchainClient.destroy()
-      console.log('Done.')
-      console.log(`job ${job.id as string}: blockchain + prices sync finished`)
-    })()
+    console.log(`job ${job.id as string}: blockchain + prices sync finished`)
  })
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
worker.on('completed', (job) => {
// teardown
void (async () => {
console.log('Cleaning up MultiBlockchainClient global instance...')
await multiBlockchainClient.destroy()
console.log('Done.')
console.log(`job ${job.id as string}: blockchain + prices sync finished`)
})()
})
const worker = new Worker(
queueName,
async (job) => {
try {
console.log(`job ${job.id as string}: syncing missed transactions and connecting prices...`)
await multiBlockchainClient.syncMissedTransactions()
await connectAllTransactionsToPrices()
} finally {
console.log('Cleaning up MultiBlockchainClient global instance...')
await multiBlockchainClient.destroy()
console.log('Done.')
}
},
{
connection: redisBullMQ,
lockDuration: DEFAULT_WORKER_LOCK_DURATION
}
)
worker.on('completed', (job) => {
console.log(`job ${job.id as string}: blockchain + prices sync finished`)
})


worker.on('failed', (job, err) => {
if (job != null) {
console.error(`job ${job.id as string}: FAILED — ${err.message}`)
}
})
}
6 changes: 3 additions & 3 deletions scripts/docker-exec-shortcuts.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ case "$command" in
eval "$base_command_db" mariadb-dump -h "$MAIN_DB_HOST" -u root -p"$MAIN_DB_ROOT_PASSWORD" "$@"
;;
"databaseshell" | "dbs")
eval "$base_command_db" bash -l "$@"
eval "$base_command_db" bash -c bash -l "$@"
;;
Comment on lines 51 to 53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix shell invocation: 'bash -c bash -l' is not a login shell; also risks argument loss

  • bash -c bash -l executes a non-login bash; -l is not applied to the spawned shell. This breaks expected login env loading compared to previous bash -l.
  • For ns/rns, arguments are dropped; for dbs, quoting with eval is brittle. ShellCheck flags this (SC2294). Based on static analysis hints.

Apply this minimal fix:

-        eval "$base_command_db" bash -c bash -l "$@"
+        eval "$base_command_db" bash -l "$@"
-        eval "$base_command_node" bash -c bash -l
+        eval "$base_command_node" bash -l "$@"
-        eval "$base_command_node_root" bash -c bash -l
+        eval "$base_command_node_root" bash -l "$@"

Optional hardening (drop eval to preserve whitespace/args and avoid injection pitfalls):

-        eval "$base_command_db" bash -l "$@"
+        docker exec -it "$db_container_name" bash -l "$@"
-        eval "$base_command_node" bash -l "$@"
+        docker exec -it "$node_container_name" bash -l "$@"
-        eval "$base_command_node_root" bash -l "$@"
+        docker exec -it -u 0 "$node_container_name" bash -l "$@"

Also applies to: 72-74, 75-77

🧰 Tools
🪛 Shellcheck (0.11.0)

[warning] 52-52: eval negates the benefit of arrays. Drop eval to preserve whitespace/symbols (or eval as string).

(SC2294)

🤖 Prompt for AI Agents
In scripts/docker-exec-shortcuts.sh around lines 51-53 (and similarly update
72-74, 75-77), the current invocation uses `eval` and `bash -c bash -l "$@"`
which does not start a true login shell and risks dropping or mangling
arguments; replace this with a direct docker exec invocation that starts bash as
a login shell and passes arguments safely: invoke bash with the -l option before
-c (so the spawned shell is a login shell), use -c to run the provided command,
include -- to stop option parsing, and pass the script's "$@" (or "$*" only if
you intentionally want a single string) directly without eval to preserve
whitespace and avoid injection; apply the same pattern to the ns/rns/dbs
branches.

"databasetest" | "dbt")
eval "$base_command_db" mariadb -h "$MAIN_DB_HOST" -u "$MAIN_DB_USER"-test -p"$MAIN_DB_PASSWORD" -D "$MAIN_DB_NAME"-test "$@"
Expand All @@ -70,10 +70,10 @@ case "$command" in
eval "$base_command_node" yarn test --coverage --verbose "$@"
;;
"nodeshell" | "ns")
eval "$base_command_node" bash -l
eval "$base_command_node" bash -c bash -l
;;
"rootnodeshell" | "rns")
eval "$base_command_node_root" bash -l
eval "$base_command_node_root" bash -c bash -l
;;
"jobslogs" | "jl")
eval "$base_command_node" pm2 logs jobs
Expand Down
52 changes: 29 additions & 23 deletions services/chronikService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
upsertTransaction,
getSimplifiedTransactions,
getSimplifiedTrasaction,
connectAllTransactionsToPrices,
updateClientPaymentStatus,
getClientPayment
} from './transactionService'
Expand All @@ -28,7 +27,6 @@ import { OpReturnData, parseError, parseOpReturnData } from 'utils/validators'
import { executeAddressTriggers, executeTriggersBatch } from './triggerService'
import { appendTxsToFile } from 'prisma-local/seeds/transactions'
import { PHASE_PRODUCTION_BUILD } from 'next/dist/shared/lib/constants'
import { syncPastDaysNewerPrices } from './priceService'
import { AddressType } from 'ecashaddrjs/dist/types'
import { DecimalJsLike } from '@prisma/client/runtime/library'

Expand Down Expand Up @@ -957,25 +955,14 @@ class MultiBlockchainClient {
console.log('Initializing MultiBlockchainClient...')
this.initializing = true
void (async () => {
if (this.isRunningApp()) {
await syncPastDaysNewerPrices()
const asyncOperations: Array<Promise<void>> = []
this.clients = {
ecash: this.instantiateChronikClient('ecash', asyncOperations),
bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
}
await Promise.all(asyncOperations)
this.setInitialized()
await connectAllTransactionsToPrices()
} else if (process.env.NODE_ENV === 'test') {
const asyncOperations: Array<Promise<void>> = []
this.clients = {
ecash: this.instantiateChronikClient('ecash', asyncOperations),
bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
}
await Promise.all(asyncOperations)
this.setInitialized()
const asyncOperations: Array<Promise<void>> = []
this.clients = {
ecash: this.instantiateChronikClient('ecash', asyncOperations),
bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
}
await Promise.all(asyncOperations)
this.setInitialized()
console.log('Finished initializing MultiBlockchainClient.')
})()
Comment on lines +958 to 966
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Race: Multi setInitialized before per-client Chronik is ready

Promise.all(asyncOperations) may be empty (e.g., JOBS_ENV set), so setInitialized() flips flags before ChronikBlockchainClient finishes its async constructor (before this.chronik is set). Downstream syncMissedTransactions() can then run against undefined this.chronik.

Always gate Multi init on each client's waitForLatencyTest():

   void (async () => {
-      const asyncOperations: Array<Promise<void>> = []
+      const asyncOperations: Array<Promise<void>> = []
       this.clients = {
-        ecash: this.instantiateChronikClient('ecash', asyncOperations),
-        bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
+        ecash: this.instantiateChronikClient('ecash', asyncOperations),
+        bitcoincash: this.instantiateChronikClient('bitcoincash', asyncOperations)
       }
-      await Promise.all(asyncOperations)
+      // Always wait for base client readiness (chronik selected).
+      asyncOperations.push(this.clients.ecash.waitForLatencyTest())
+      asyncOperations.push(this.clients.bitcoincash.waitForLatencyTest())
+      await Promise.all(asyncOperations)
       this.setInitialized()
       console.log('Finished initializing MultiBlockchainClient.')
   })()

Alternatively, push waitForLatencyTest() inside instantiateChronikClient unconditionally.

🤖 Prompt for AI Agents
In services/chronikService.ts around lines 958-966, the code calls
Promise.all(asyncOperations) which can be empty so setInitialized() may run
before each ChronikBlockchainClient finishes async setup (this.chronik
undefined); ensure Multi init awaits each client's readiness by adding each
instantiated client's waitForLatencyTest() promise to asyncOperations (or
unconditionally call await client.waitForLatencyTest() inside
instantiateChronikClient), then await Promise.all(asyncOperations) before
calling setInitialized() so downstream code never sees an uninitialized
this.chronik.

}

Expand Down Expand Up @@ -1020,9 +1007,6 @@ class MultiBlockchainClient {
await newClient.waitForLatencyTest()
console.log(`[CHRONIK — ${networkSlug}] Subscribing addresses in database...`)
await newClient.subscribeInitialAddresses()
console.log(`[CHRONIK — ${networkSlug}] Syncing missed transactions...`)
await newClient.syncMissedTransactions()
console.log(`[CHRONIK — ${networkSlug}] Finished instantiating client.`)
})()
)
} else if (process.env.NODE_ENV === 'test') {
Expand All @@ -1033,6 +1017,7 @@ class MultiBlockchainClient {
)
}

console.log(`Finished instantiating ${networkSlug} client.`)
return newClient
}

Expand Down Expand Up @@ -1090,10 +1075,31 @@ class MultiBlockchainClient {
return await this.clients[networkSlug as MainNetworkSlugsType].getBalance(address)
}

public async syncMissedTransactions (): Promise<void> {
await this.waitForStart()
await Promise.all([
this.clients.ecash.syncMissedTransactions(),
this.clients.bitcoincash.syncMissedTransactions()
])
}
Comment on lines +1078 to +1084
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard sync against not-yet-ready clients

Even with Multi’s wait, explicitly ensure per-client readiness before syncing:

   public async syncMissedTransactions (): Promise<void> {
-    await this.waitForStart()
-    await Promise.all([
+    await this.waitForStart()
+    await Promise.all([
+      this.clients.ecash.waitForLatencyTest(),
+      this.clients.bitcoincash.waitForLatencyTest()
+    ])
+    await Promise.all([
       this.clients.ecash.syncMissedTransactions(),
       this.clients.bitcoincash.syncMissedTransactions()
     ])
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public async syncMissedTransactions (): Promise<void> {
await this.waitForStart()
await Promise.all([
this.clients.ecash.syncMissedTransactions(),
this.clients.bitcoincash.syncMissedTransactions()
])
}
public async syncMissedTransactions (): Promise<void> {
await this.waitForStart()
await Promise.all([
this.clients.ecash.waitForLatencyTest(),
this.clients.bitcoincash.waitForLatencyTest()
])
await Promise.all([
this.clients.ecash.syncMissedTransactions(),
this.clients.bitcoincash.syncMissedTransactions()
])
}
🤖 Prompt for AI Agents
In services/chronikService.ts around lines 1078–1084, the method calls
syncMissedTransactions on clients immediately after Multi.waitForStart; change
it to explicitly ensure each client is ready before syncing by awaiting each
client's readiness (e.g., await this.clients.ecash.waitForStart() and await
this.clients.bitcoincash.waitForStart() or checking an isReady flag) and then
call their syncMissedTransactions; handle each client individually (try/catch
per client) so one failing or not-ready client does not block the other.


public async syncAndSubscribeAddresses (addresses: Address[]): Promise<SyncAndSubscriptionReturn> {
await this.subscribeAddresses(addresses)
return await this.syncAddresses(addresses)
}

public async destroy (): Promise<void> {
await Promise.all(
Object.values(this.clients).map(async (c) => {
try {
c.chronikWSEndpoint.close()
c.wsEndpoint.close()
} catch (err: any) {
console.error(`Failed to close connections for client: ${err.message as string}`)
}
})
)
}
}

export interface NodeJsGlobalMultiBlockchainClient extends NodeJS.Global {
Expand Down