diff --git a/index.js b/index.js index 4b2aa56a6..8446c2f8e 100644 --- a/index.js +++ b/index.js @@ -13,7 +13,6 @@ nodeVersionCheck(process.versions.node, packageData.engines.node); module.exports = require('./lib'); module.exports.nodeVersionCheck = nodeVersionCheck; module.exports.Node = require('./lib/node'); -module.exports.Transaction = require('./lib/transaction'); module.exports.Service = require('./lib/service'); module.exports.errors = require('./lib/errors'); diff --git a/lib/services/address/constants.js b/lib/services/address/constants.js index 3653e9cb7..3b34247a0 100644 --- a/lib/services/address/constants.js +++ b/lib/services/address/constants.js @@ -3,9 +3,10 @@ var exports = {}; exports.PREFIXES = { - OUTPUTS: new Buffer('02', 'hex'), // Query outputs by address and/or height - SPENTS: new Buffer('03', 'hex'), // Query inputs by address and/or height - SPENTSMAP: new Buffer('05', 'hex') // Get the input that spends an output + SUMMARY: new Buffer('02', 'hex'), // Query the balance of an address + UNSPENT: new Buffer('03', 'hex'), // Query unspent output positions + PREV: new Buffer('05', 'hex'), // Query the address and satoshis for an output by txid and output index + TXIDS: new Buffer('06', 'hex') // Query all of the txids for an address }; exports.MEMPREFIXES = { @@ -29,6 +30,13 @@ exports.HASH_TYPES_READABLE = { '02': 'scripthash' }; +exports.ADDRESS_KEY_SIZE = 21; +exports.SPACER_SIZE = 1; +exports.PREFIX_SIZE = 1; +exports.HEIGHT_SIZE = 4; +exports.TXID_SIZE = 32; +exports.PAGE_SIZE = 10; + exports.HASH_TYPES_MAP = { 'pubkeyhash': exports.HASH_TYPES.PUBKEY, 'scripthash': exports.HASH_TYPES.REDEEMSCRIPT @@ -51,6 +59,8 @@ exports.MAX_HISTORY_QUERY_LENGTH = 100; exports.MAX_ADDRESSES_QUERY = 10000; // The maximum number of simultaneous requests exports.MAX_ADDRESSES_LIMIT = 5; +// The maximum number of transactions that can be queried with multiple addresses +exports.MAX_MULTI_HISTORY_COUNT = 1000000; module.exports = exports; diff --git a/lib/services/address/encoding.js b/lib/services/address/encoding.js index 8ebce51c9..34f4c327b 100644 --- a/lib/services/address/encoding.js +++ b/lib/services/address/encoding.js @@ -9,32 +9,104 @@ var $ = bitcore.util.preconditions; var exports = {}; -exports.encodeSpentIndexSyncKey = function(txidBuffer, outputIndex) { - var outputIndexBuffer = new Buffer(4); +exports.encodeAddressTxIdKey = function(hashBuffer, hashTypeBuffer) { + return Buffer.concat([ + constants.PREFIXES.TXIDS, + hashBuffer, + hashTypeBuffer + ]); +}; + +exports.decodeAddressTxIdValue = function(buffer, page, range) { + var pos = 0; + if (page) { + pos = 36 * page * range; + } + var txids = []; + while (pos < buffer.length) { + var height = buffer.readUInt32BE(pos); + var txid = buffer.slice(pos, pos + 32); + txids.push({ + height: height, + txid: txid + }); + pos += 36; + } + return txids; +}; + +exports.encodePrevOutputKey = function(prevTxIdBuffer, outputIndex) { + var outputIndexBuffer = new Buffer(new Array(4)); outputIndexBuffer.writeUInt32BE(outputIndex); - var key = Buffer.concat([ - txidBuffer, + return Buffer.concat([ + constants.PREFIXES.PREV, + prevTxIdBuffer, outputIndexBuffer ]); - return key.toString('binary'); }; -exports.encodeMempoolAddressIndexKey = function(hashBuffer, hashTypeBuffer) { - var key = Buffer.concat([ +exports.encodePrevOutputValue = function(hashBuffer, hashTypeBuffer, satoshis) { + var satoshisBuffer = new Buffer(new Array(8)); + satoshisBuffer.writeDoubleBE(satoshis); + return Buffer.concat([ hashBuffer, hashTypeBuffer, + satoshisBuffer ]); - return key.toString('binary'); }; +exports.encodeSummaryKey = function(hashBuffer, hashTypeBuffer) { + return Buffer.concat([ + constants.PREFIXES.SUMMARY, + hashBuffer, + hashTypeBuffer + ]); +}; -exports.encodeOutputKey = function(hashBuffer, hashTypeBuffer, height, txidBuffer, outputIndex) { - var heightBuffer = new Buffer(4); - heightBuffer.writeUInt32BE(height); +/** + * This function is optimized to return address information about an output script + * without constructing a Bitcore Address instance. + * @param {Script} - An instance of a Bitcore Script + * @param {Network|String} - The network for the address + */ +exports.extractAddressInfoFromScript = function(script, network) { + $.checkArgument(network, 'Second argument is expected to be a network'); + var hashBuffer; + var addressType; + var hashTypeBuffer; + if (script.isPublicKeyHashOut()) { + hashBuffer = script.chunks[2].buf; + hashTypeBuffer = constants.HASH_TYPES.PUBKEY; + addressType = Address.PayToPublicKeyHash; + } else if (script.isScriptHashOut()) { + hashBuffer = script.chunks[1].buf; + hashTypeBuffer = constants.HASH_TYPES.REDEEMSCRIPT; + addressType = Address.PayToScriptHash; + } else if (script.isPublicKeyOut()) { + var pubkey = script.chunks[0].buf; + var address = Address.fromPublicKey(new PublicKey(pubkey), network); + hashBuffer = address.hashBuffer; + hashTypeBuffer = constants.HASH_TYPES.PUBKEY; + // pay-to-publickey doesn't have an address, however for compatibility + // purposes, we can create an address + addressType = Address.PayToPublicKeyHash; + } else { + return false; + } + return { + hashBuffer: hashBuffer, + hashTypeBuffer: hashTypeBuffer, + addressType: addressType + }; +}; + +exports.encodeUnspentKey = function(hashBuffer, hashTypeBuffer, height, txidBuffer, outputIndex) { var outputIndexBuffer = new Buffer(4); outputIndexBuffer.writeUInt32BE(outputIndex); + var heightBuffer = new Buffer(4); + heightBuffer.writeUInt32BE(height); var key = Buffer.concat([ - constants.PREFIXES.OUTPUTS, + constants.PREFIXES.UNSPENT, hashBuffer, hashTypeBuffer, constants.SPACER_MIN, @@ -53,21 +125,46 @@ exports.decodeOutputKey = function(buffer) { var spacer = reader.read(1); var height = reader.readUInt32BE(); var txid = reader.read(32); - var outputIndex = reader.readUInt32BE(); return { prefix: prefix, hashBuffer: hashBuffer, hashTypeBuffer: hashTypeBuffer, height: height, - txid: txid, + txid: txid + }; +}; + +exports.encodeOutputValue = function(outputIndex, satoshis) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + return outputIndexBuffer; +}; + +exports.decodeOutputValue = function(buffer) { + var outputIndex = buffer.readUInt32BE(0); + return { outputIndex: outputIndex }; }; -exports.encodeOutputValue = function(satoshis, scriptBuffer) { - var satoshisBuffer = new Buffer(8); - satoshisBuffer.writeDoubleBE(satoshis); - return Buffer.concat([satoshisBuffer, scriptBuffer]); +//////////////////////////////////////////////////////////////////////////////////// + +exports.encodeSpentIndexSyncKey = function(txidBuffer, outputIndex) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var key = Buffer.concat([ + txidBuffer, + outputIndexBuffer + ]); + return key.toString('binary'); +}; + +exports.encodeMempoolAddressIndexKey = function(hashBuffer, hashTypeBuffer) { + var key = Buffer.concat([ + hashBuffer, + hashTypeBuffer, + ]); + return key.toString('binary'); }; exports.encodeOutputMempoolValue = function(satoshis, timestampBuffer, scriptBuffer) { @@ -76,15 +173,6 @@ exports.encodeOutputMempoolValue = function(satoshis, timestampBuffer, scriptBuf return Buffer.concat([satoshisBuffer, timestampBuffer, scriptBuffer]); }; -exports.decodeOutputValue = function(buffer) { - var satoshis = buffer.readDoubleBE(0); - var scriptBuffer = buffer.slice(8, buffer.length); - return { - satoshis: satoshis, - scriptBuffer: scriptBuffer - }; -}; - exports.decodeOutputMempoolValue = function(buffer) { var satoshis = buffer.readDoubleBE(0); var timestamp = buffer.readDoubleBE(8); @@ -259,7 +347,7 @@ exports.decodeSummaryCacheValue = function(buffer) { exports.getAddressInfo = function(addressStr) { var addrObj = bitcore.Address(addressStr); var hashTypeBuffer = constants.HASH_TYPES_MAP[addrObj.type]; - + return { hashBuffer: addrObj.hashBuffer, hashTypeBuffer: hashTypeBuffer, @@ -267,41 +355,4 @@ exports.getAddressInfo = function(addressStr) { }; }; -/** - * This function is optimized to return address information about an output script - * without constructing a Bitcore Address instance. - * @param {Script} - An instance of a Bitcore Script - * @param {Network|String} - The network for the address - */ -exports.extractAddressInfoFromScript = function(script, network) { - $.checkArgument(network, 'Second argument is expected to be a network'); - var hashBuffer; - var addressType; - var hashTypeBuffer; - if (script.isPublicKeyHashOut()) { - hashBuffer = script.chunks[2].buf; - hashTypeBuffer = constants.HASH_TYPES.PUBKEY; - addressType = Address.PayToPublicKeyHash; - } else if (script.isScriptHashOut()) { - hashBuffer = script.chunks[1].buf; - hashTypeBuffer = constants.HASH_TYPES.REDEEMSCRIPT; - addressType = Address.PayToScriptHash; - } else if (script.isPublicKeyOut()) { - var pubkey = script.chunks[0].buf; - var address = Address.fromPublicKey(new PublicKey(pubkey), network); - hashBuffer = address.hashBuffer; - hashTypeBuffer = constants.HASH_TYPES.PUBKEY; - // pay-to-publickey doesn't have an address, however for compatibility - // purposes, we can create an address - addressType = Address.PayToPublicKeyHash; - } else { - return false; - } - return { - hashBuffer: hashBuffer, - hashTypeBuffer: hashTypeBuffer, - addressType: addressType - }; -}; - module.exports = exports; diff --git a/lib/services/address/history.js b/lib/services/address/history.js index 2d1dcd345..37bd6a3e7 100644 --- a/lib/services/address/history.js +++ b/lib/services/address/history.js @@ -5,6 +5,7 @@ var async = require('async'); var _ = bitcore.deps._; var constants = require('./constants'); +var encoding = require('./encoding'); /** * This represents an instance that keeps track of data over a series of @@ -22,7 +23,7 @@ function AddressHistory(args) { this.addresses = [args.addresses]; } - this.maxHistoryQueryLength = args.options.maxHistoryQueryLength || constants.MAX_HISTORY_QUERY_LENGTH; + this.maxMultiHistoryCount = args.options.maxMultiHistoryCount || constants.MAX_MULTI_HISTORY_COUNT; this.maxAddressesQuery = args.options.maxAddressesQuery || constants.MAX_ADDRESSES_QUERY; this.maxAddressesLimit = args.options.maxAddressesLimit || constants.MAX_ADDRESSES_LIMIT; @@ -41,10 +42,25 @@ function AddressHistory(args) { this.detailedArray = []; } +AddressHistory.prototype._paginate = function(allTxids) { + var txids; + var totalCount = allTxids.length; + if (this.options.from >= 0 && this.options.to >= 0) { + var fromOffset = totalCount - this.options.from; + var toOffset = totalCount - this.options.to; + txids = allTxids.slice(toOffset, fromOffset); + } else { + txids = allTxids; + } + return txids; +}; + AddressHistory.prototype._mergeAndSortTxids = function(summaries) { var appearanceIds = {}; var unconfirmedAppearanceIds = {}; + // Slice the page starting with the most recent + for (var i = 0; i < summaries.length; i++) { var summary = summaries[i]; for (var key in summary.appearanceIds) { @@ -81,61 +97,62 @@ AddressHistory.prototype.get = function(callback) { } var opts = _.clone(this.options); - opts.noBalance = true; if (this.addresses.length === 1) { var address = this.addresses[0]; - self.node.services.address.getAddressSummary(address, opts, function(err, summary) { + + async.parallel({ + count: function(done) { + self.node.services.address.getTransactionCount(address, opts, done); + }, + txids: function(done) { + self.node.services.address.getTransactionIds(address, opts, done); + } + }, function(err, results) { if (err) { return callback(err); } - return self._paginateWithDetails.call(self, summary.txids, callback); + var txids = self._paginate(results.txids); + return self._getDetails.call(self, txids, results.count, callback); }); - } else { + } else { opts.fullTxList = true; + + var count = 0; + async.mapLimit( self.addresses, self.maxAddressesLimit, function(address, next) { - self.node.services.address.getAddressSummary(address, opts, next); + self.node.services.address.getTransactionIds(address, opts, function(err, txids) { + if (err) { + return next(err); + } + count += txids.length; + if (count > self.maxMultiHistoryCount) { + return next(new Error( + 'Maximum number of transactions reached for multiple address ' + + 'query, try querying by a single or fewer addresses.' + )); + } + next(null, txids); + }); }, - function(err, summaries) { + function(err, combinedTxids) { if (err) { return callback(err); } - var txids = self._mergeAndSortTxids(summaries); - return self._paginateWithDetails.call(self, txids, callback); + var sorted = self._mergeAndSortTxids(combinedTxids); + var txids = self._paginate(sorted); + return self._getDetails.call(self, txids, count, callback); } ); } - }; -AddressHistory.prototype._paginateWithDetails = function(allTxids, callback) { +AddressHistory.prototype._getDetails = function(txids, count, callback) { var self = this; - var totalCount = allTxids.length; - - // Slice the page starting with the most recent - var txids; - if (self.options.from >= 0 && self.options.to >= 0) { - var fromOffset = totalCount - self.options.from; - var toOffset = totalCount - self.options.to; - txids = allTxids.slice(toOffset, fromOffset); - } else { - txids = allTxids; - } - - // Verify that this query isn't too long - if (txids.length > self.maxHistoryQueryLength) { - return callback(new Error( - 'Maximum length query (' + self.maxHistoryQueryLength + ') exceeded for address(es): ' + - self.addresses.join(',') - )); - } - - // Reverse to include most recent at the top - txids.reverse(); async.eachSeries( txids, @@ -147,7 +164,7 @@ AddressHistory.prototype._paginateWithDetails = function(allTxids, callback) { return callback(err); } callback(null, { - totalCount: totalCount, + totalCount: count, items: self.detailedArray }); } @@ -173,6 +190,7 @@ AddressHistory.prototype.getDetailedInfo = function(txid, next) { return next(err); } + // TODO: optimize querying prevout info transaction.populateInputs(self.node.services.db, [], function(err) { if (err) { return next(err); diff --git a/lib/services/address/index.js b/lib/services/address/index.js index eef4ed072..f0b6ea9d0 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -21,9 +21,6 @@ var Address = bitcore.Address; var AddressHistory = require('./history'); var constants = require('./constants'); var encoding = require('./encoding'); -var InputsTransformStream = require('./streams/inputs-transform'); -var OutputsTransformStream = require('./streams/outputs-transform'); - /** * The Address Service builds upon the Database Service and the Bitcoin Service to add additional @@ -70,6 +67,8 @@ AddressService.dependencies = [ AddressService.prototype.start = function(callback) { var self = this; + self.store = self.node.services.db.store; + async.series([ function(next) { @@ -100,6 +99,45 @@ AddressService.prototype.start = function(callback) { }, next ); + }, + function(next) { + self.store.createCollection('addressSummary', { + storageEngine: { + wiredTiger: { + configString: 'type=lsm' + } + } + }, next); + }, + function(next) { + self.store.createCollection('prevOuts', { + storageEngine: { + wiredTiger: { + configString: 'type=lsm' + } + } + }, next); + }, + function(next) { + self.store.createCollection('addressTxids', { + storageEngine: { + wiredTiger: { + configString: 'type=lsm' + } + } + }, next); + }, + function(next) { + self.addressSummaryDb = self.store.collection('addressSummary'); + self.prevOutsDb = self.store.collection('prevOuts'); + self.addressTxidsDb = self.store.collection('addressTxids'); + self.addressTxidsDb.createIndex({a: 1, h: -1}, function(err) { + if (err) { + return next(err); + } + next(); + }); + } ], callback); @@ -144,12 +182,12 @@ AddressService.prototype._getDBPathFor = function(dbname) { AddressService.prototype.getAPIMethods = function() { return [ ['getBalance', this, this.getBalance, 2], - ['getOutputs', this, this.getOutputs, 2], - ['getUnspentOutputs', this, this.getUnspentOutputs, 2], - ['getInputForOutput', this, this.getInputForOutput, 2], - ['isSpent', this, this.isSpent, 2], + ['getAddressSummary', this, this.getAddressSummary, 2], + ['getTransactionIds', this, this.getTransactionIds, 2], + ['getTransactionCount', this, this.getTransactionCount, 2], ['getAddressHistory', this, this.getAddressHistory, 2], - ['getAddressSummary', this, this.getAddressSummary, 1] + ['getUnspentOutputs', this, this.getUnspentOutputs, 2], + ['getInputForOutput', this, this.getInputForOutput, 2] ]; }; @@ -442,135 +480,263 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) { /** * The Database Service will run this function when blocks are connected and * disconnected to the chain during syncing and reorganizations. + * + * The database uses the following collections and indexes: + * + * Collection Document Indexes + * ------------ --------------------------------------- -------------------- + * summary _id: addressHash:type (BinaryData) + * balance: (Number) + * received: (Number) + * change: (Number) + * count: (Number) + * + * txids txid: txid (BinaryData) address:height + * address: addressHash:type (BinaryData) + * height: (Number) + * + * outputs _id: txid:outputIndex (BinaryData) + * address: addressHash:type (BinaryData) + * satoshis: (Number) + * + * unspentOuts _id: txid:outputIndex (BinaryData) + * address: addressHash:type (BinaryData) + * height: (Number) + * spent: txid:inputIndex:height (BinaryData) + * * @param {Block} block - An instance of a Bitcore Block - * @param {Boolean} addOutput - If the block is being removed or added to the chain + * @param {Boolean} add - If the block is being removed or added to the chain * @param {Function} callback */ -AddressService.prototype.blockHandler = function(block, addOutput, callback) { - var txs = block.transactions; +AddressService.prototype.blockHandler = function(block, add, finishBlock) { + // TODO: refactor into smaller functions + var self = this; + var height = block.__height; + var transactions = block.transactions; - var action = 'put'; - if (!addOutput) { - action = 'del'; - } + var summaryDeltas = {}; + var summaryTxids = {}; + var prevOutputs = {}; - var operations = []; + async.eachSeries(transactions, function processTransaction(transaction, nextTransaction) { + var txHash = transaction.hash; + var txHashBuffer = new Buffer(txHash, 'hex'); - var transactionLength = txs.length; - for (var i = 0; i < transactionLength; i++) { + var addressesInTransaction = {}; + var inputSatoshis = {}; - var tx = txs[i]; - var txid = tx.id; - var txidBuffer = new Buffer(txid, 'hex'); - var inputs = tx.inputs; - var outputs = tx.outputs; + async.series([ + function processInputs(inputsDone) { + if (transaction.isCoinbase()) { + return inputsDone(); + } + async.each(transaction.inputs, function processInput(input, nextInput) { + + function finish(key, satoshis) { + var balanceDiff = add ? satoshis * -1 : satoshis; + + // Keep track of incoming satoshis for address within this transaction + // for the purposes of calculating the change received + inputSatoshis[key] = satoshis; + + if (summaryDeltas[key]) { + summaryDeltas[key].balance += balanceDiff; + } else { + summaryDeltas[key] = { + balance: balanceDiff, + received: 0, + change: 0 + }; + } + addressesInTransaction[key] = true; + nextInput(); + } - // Subscription messages - var txmessages = {}; + var prevOutId = input.prevTxId.toString('hex') + input.outputIndex; + + if (prevOutputs[prevOutId]) { + var addressKey = prevOutputs[prevOutId][0].toString('hex'); + var satoshis = prevOutputs[prevOutId][1]; + finish(addressKey, satoshis); + } else { + self.prevOutsDb.findOne({_id: prevOutId}, function findPreviousOutput(err, prevOut) { + if (err) { + return nextInput(err); + } else if (!prevOut) { + return nextInput(); + } else { + var addressKey = prevOut.a.buffer.toString('hex'); + var satoshis = prevOut.s; + finish(addressKey, satoshis); + } + }); + } - var outputLength = outputs.length; - for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { - var output = outputs[outputIndex]; + }, inputsDone); + }, + function(outputsDone) { + async.forEachOf(transaction.outputs, function processOutput(output, outputIndex, nextOutput) { + var script = output.script; + if (!script) { + return nextOutput(); + } + var addressInfo = encoding.extractAddressInfoFromScript(script, self.node.network); + if (!addressInfo) { + return nextOutput(); + } - var script = output.script; + var key = Buffer.concat([addressInfo.hashBuffer, addressInfo.hashTypeBuffer]).toString('hex'); + var balanceDiff = add ? output.satoshis : output.satoshis * -1; - if(!script) { - log.debug('Invalid script'); - continue; - } + var change = 0; + if (inputSatoshis[key]) { + change = Math.min(inputSatoshis[key], output.satoshis); + inputSatoshis -= change; + } - var addressInfo = encoding.extractAddressInfoFromScript(script, this.node.network); - if (!addressInfo) { - continue; - } + if (summaryDeltas[key]) { + summaryDeltas[key].balance += balanceDiff; + summaryDeltas[key].received += balanceDiff; + summaryDeltas[key].change += add ? change : change * -1; + } else { + summaryDeltas[key] = { + balance: balanceDiff, + received: balanceDiff, + change: change + }; + } - // We need to use the height for indexes (and not the timestamp) because the - // the timestamp has unreliable sequential ordering. The next block - // can have a time that is previous to the previous block (however not - // less than the mean of the 11 previous blocks) and not greater than 2 - // hours in the future. - var key = encoding.encodeOutputKey(addressInfo.hashBuffer, addressInfo.hashTypeBuffer, - height, txidBuffer, outputIndex); - var value = encoding.encodeOutputValue(output.satoshis, output._scriptBuffer); - operations.push({ - type: action, - key: key, - value: value - }); + addressesInTransaction[key] = true; - addressInfo.hashHex = addressInfo.hashBuffer.toString('hex'); + var prevOutId = txHash + outputIndex; + prevOutputs[prevOutId] = [ + Buffer.concat([addressInfo.hashBuffer, addressInfo.hashTypeBuffer]), + output.satoshis + ]; + nextOutput(); - // Collect data for subscribers - if (txmessages[addressInfo.hashHex]) { - txmessages[addressInfo.hashHex].outputIndexes.push(outputIndex); - } else { - txmessages[addressInfo.hashHex] = { - tx: tx, - height: height, - outputIndexes: [outputIndex], - addressInfo: addressInfo, - timestamp: block.header.timestamp - }; + }, outputsDone); } - - this.balanceEventHandler(block, addressInfo); - - } - - // Publish events to any subscribers for this transaction - for (var addressKey in txmessages) { - this.transactionEventHandler(txmessages[addressKey]); - } - - if(tx.isCoinbase()) { - continue; + ], function() { + for (var key in addressesInTransaction) { + if (summaryTxids[key]) { + summaryTxids[key].push(txHashBuffer); + } else { + summaryTxids[key] = [txHashBuffer]; + } + } + nextTransaction(); + }); + }, function(err) { + if (err) { + return finishBlock(err); } - for(var inputIndex = 0; inputIndex < inputs.length; inputIndex++) { - - var input = inputs[inputIndex]; - var inputHash; - var inputHashType; - - if (input.script.isPublicKeyHashIn()) { - inputHash = Hash.sha256ripemd160(input.script.chunks[1].buf); - inputHashType = constants.HASH_TYPES.PUBKEY; - } else if (input.script.isScriptHashIn()) { - inputHash = Hash.sha256ripemd160(input.script.chunks[input.script.chunks.length - 1].buf); - inputHashType = constants.HASH_TYPES.REDEEMSCRIPT; + // These bulk writes are designed to be able to be applied partially and then + // completed with another pass or rolled back to the previous block state. + + // Operations that insert new documents should also replace an existing document if + // it already exists. Likewise a document that is being removed should be able to be + // removed again without error. + + // All write information that is needed to parse a block should be handled internally + // to this block without depending on changing the state of the database. + + // In the typical sense of a database transaction, each block can be committed as a + // "transaction" that is atomic. This is achieved with a two-phase commit (e.g. + // https://docs.mongodb.org/manual/tutorial/perform-two-phase-commits/). We keep track + // of the blocks that have been applied as the final action. In the event that there is an error + // or crash while writing, we can recover and complete the write on a second pass. + + var outOperations = []; + for (var prevOutId in prevOutputs) { + var prevOut = prevOutputs[prevOutId]; + if (add) { + outOperations.push({ + replaceOne: { + filter: { + _id: prevOutId + }, + replacement: { + _id: prevOutId, // txid + outputIndex + a: prevOut[0], // address + s: prevOut[1] // script + }, + upsert: true + } + }); } else { - continue; + outOperations.push({ + deleteOne: { + filter: { + _id: prevOutId + } + } + }); } + } - var prevTxIdBuffer = new Buffer(input.prevTxId, 'hex'); - - // To be able to query inputs by address and spent height - var inputKey = encoding.encodeInputKey(inputHash, inputHashType, height, prevTxIdBuffer, input.outputIndex); - var inputValue = encoding.encodeInputValue(txidBuffer, inputIndex); - - operations.push({ - type: action, - key: inputKey, - value: inputValue - }); - - // To be able to search for an input spending an output - var inputKeyMap = encoding.encodeInputKeyMap(prevTxIdBuffer, input.outputIndex); - var inputValueMap = encoding.encodeInputValueMap(txidBuffer, inputIndex); - - operations.push({ - type: action, - key: inputKeyMap, - value: inputValueMap + var txidOperations = []; + for (var addressKey in summaryTxids) { + var txids = summaryTxids[addressKey]; + for (var i = 0; i < txids.length; i++) { + if (add) { + // TODO: don't insert duplicates? + txidOperations.push({ + insertOne: { + document: { + t: txids[i], + a: new Buffer(addressKey, 'hex'), + h: block.height + } + } + }); + } else { + // TODO remove + } + } + } + var summaryOperations = []; + for (var addressKey in summaryDeltas) { + var delta = summaryDeltas[addressKey]; + summaryOperations.push({ + updateOne: { + filter: { + _id: new Buffer(addressKey, 'hex'), + h: { + $lt: height + } + }, + update: { + $set: { + h: height + }, + $inc: { + b: delta.balance, + r: delta.received, + c: delta.change + } + }, + upsert: true + } }); - } - } - setImmediate(function() { - callback(null, operations); + async.parallel([ + function(txidsDone) { + self.addressTxidsDb.bulkWrite(txidOperations, {ordered: false}, txidsDone); + }, + function(deltasDone) { + self.addressSummaryDb.bulkWrite(summaryOperations, {ordered: false}, deltasDone); + }, + function(outsDone) { + self.prevOutsDb.bulkWrite(outOperations, {ordered: false}, outsDone); + } + ], finishBlock); + }); + }; /** @@ -703,6 +869,26 @@ AddressService.prototype.unsubscribeAll = function(name, emitter) { } }; +AddressService.prototype.getTransactionCount = function(addressArg, queryMempool, callback) { + // TODO: mempool + + var address = new Address(addressArg); + var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type]; + var addressKey = Buffer.concat([address.hashBuffer, hashTypeBuffer]); + + this.addressSummaryDb.findOne({ + _id: addressKey + }, function(err, doc) { + if (err) { + return callback(err); + } else if (!doc) { + return callback(null, 0); + } + callback(null, doc.cn); + + }); +}; + /** * Will sum the total of all unspent outputs to calculate the balance * for an address. @@ -710,22 +896,34 @@ AddressService.prototype.unsubscribeAll = function(name, emitter) { * @param {Boolean} queryMempool - Include mempool in the results * @param {Function} callback */ -AddressService.prototype.getBalance = function(address, queryMempool, callback) { - this.getUnspentOutputs(address, queryMempool, function(err, outputs) { - if(err) { +AddressService.prototype.getBalance = function(addressArg, queryMempool, callback) { + // TODO: mempool + + var address = new Address(addressArg); + var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type]; + var addressKey = Buffer.concat([address.hashBuffer, hashTypeBuffer]); + + this.addressSummaryDb.findOne({ + _id: addressKey + }, function(err, doc) { + if (err) { return callback(err); + } else if (!doc) { + return callback(null, { + balance: 0, + received: 0, + change: 0, + count: 0 + }); } - - var satoshis = outputs.map(function(output) { - return output.satoshis; + callback(null, { + balance: doc.b, + received: doc.r, + change: doc.c }); - var sum = satoshis.reduce(function(a, b) { - return a + b; - }, 0); - - return callback(null, sum); }); + }; /** @@ -755,167 +953,9 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options return this._getSpentMempool(txidBuffer, outputIndex, callback); } } - var key = encoding.encodeInputKeyMap(txidBuffer, outputIndex); - var dbOptions = { - valueEncoding: 'binary', - keyEncoding: 'binary' - }; - this.node.services.db.store.get(key, dbOptions, function(err, buffer) { - if (err instanceof levelup.errors.NotFoundError) { - return callback(null, false); - } else if (err) { - return callback(err); - } - var value = encoding.decodeInputValueMap(buffer); - callback(null, { - inputTxId: value.inputTxId.toString('hex'), - inputIndex: value.inputIndex - }); - }); -}; - -/** - * A streaming equivalent to `getInputs`, and returns a transform stream with data - * emitted in the same format as `getInputs`. - * - * @param {String} addressStr - The relevant address - * @param {Object} options - Additional options for query the outputs - * @param {Number} [options.start] - The relevant start block height - * @param {Number} [options.end] - The relevant end block height - * @param {Function} callback - */ -AddressService.prototype.createInputsStream = function(addressStr, options) { - var inputStream = new InputsTransformStream({ - address: new Address(addressStr, this.node.network), - tipHeight: this.node.services.db.tip.__height - }); - - var stream = this.createInputsDBStream(addressStr, options) - .on('error', function(err) { - // Forward the error - inputStream.emit('error', err); - inputStream.end(); - }).pipe(inputStream); - - return stream; - -}; - -AddressService.prototype.createInputsDBStream = function(addressStr, options) { - var stream; - var addrObj = encoding.getAddressInfo(addressStr); - var hashBuffer = addrObj.hashBuffer; - var hashTypeBuffer = addrObj.hashTypeBuffer; - - if (options.start >= 0 && options.end >= 0) { - - var endBuffer = new Buffer(4); - endBuffer.writeUInt32BE(options.end, 0); - - var startBuffer = new Buffer(4); - // Because the key has additional data following it, we don't have an ability - // to use "gte" or "lte" we can only use "gt" and "lt", we therefore need to adjust the number - // to be one value larger to include it. - var adjustedStart = options.start + 1; - startBuffer.writeUInt32BE(adjustedStart, 0); - - stream = this.node.services.db.store.createReadStream({ - gt: Buffer.concat([ - constants.PREFIXES.SPENTS, - hashBuffer, - hashTypeBuffer, - constants.SPACER_MIN, - endBuffer - ]), - lt: Buffer.concat([ - constants.PREFIXES.SPENTS, - hashBuffer, - hashTypeBuffer, - constants.SPACER_MIN, - startBuffer - ]), - valueEncoding: 'binary', - keyEncoding: 'binary' - }); - } else { - var allKey = Buffer.concat([constants.PREFIXES.SPENTS, hashBuffer, hashTypeBuffer]); - stream = this.node.services.db.store.createReadStream({ - gt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MIN]), - lt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MAX]), - valueEncoding: 'binary', - keyEncoding: 'binary' - }); - } - - return stream; -}; - -/** - * Will give inputs that spend previous outputs for an address as an object with: - * address - The base58check encoded address - * hashtype - The type of the address, e.g. 'pubkeyhash' or 'scripthash' - * txid - A string of the transaction hash - * outputIndex - A number of corresponding transaction input - * height - The height of the block the transaction was included, will be -1 for mempool transactions - * confirmations - The number of confirmations, will equal 0 for mempool transactions - * - * @param {String} addressStr - The relevant address - * @param {Object} options - Additional options for query the outputs - * @param {Number} [options.start] - The relevant start block height - * @param {Number} [options.end] - The relevant end block height - * @param {Boolean} [options.queryMempool] - Include the mempool in the results - * @param {Function} callback - */ -AddressService.prototype.getInputs = function(addressStr, options, callback) { - - var self = this; - - var inputs = []; - - var addrObj = encoding.getAddressInfo(addressStr); - var hashBuffer = addrObj.hashBuffer; - var hashTypeBuffer = addrObj.hashTypeBuffer; - - var stream = this.createInputsStream(addressStr, options); - - stream.on('data', function(input) { - inputs.push(input); - if (inputs.length > self.maxInputsQueryLength) { - log.warn('Tried to query too many inputs (' + self.maxInputsQueryLength + ') for address '+ addressStr); - error = new Error('Maximum number of inputs (' + self.maxInputsQueryLength + ') per query reached'); - stream.end(); - } - }); - - var error; - - stream.on('error', function(streamError) { - if (streamError) { - error = streamError; - } - }); - - stream.on('finish', function() { - if (error) { - return callback(error); - } - - if(options.queryMempool) { - self._getInputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolInputs) { - if (err) { - return callback(err); - } - inputs = inputs.concat(mempoolInputs); - callback(null, inputs); - }); - } else { - callback(null, inputs); - } - - }); - - return stream; - + var key = encoding.encodePrevOutputKey(txidBuffer, outputIndex); + // TODO + callback(null, {}); }; AddressService.prototype._getInputsMempool = function(addressStr, hashBuffer, hashTypeBuffer, callback) { @@ -997,147 +1037,6 @@ AddressService.prototype._getSpentMempool = function(txidBuffer, outputIndex, ca ); }; -AddressService.prototype.createOutputsStream = function(addressStr, options) { - var outputStream = new OutputsTransformStream({ - address: new Address(addressStr, this.node.network), - tipHeight: this.node.services.db.tip.__height - }); - - var stream = this.createOutputsDBStream(addressStr, options) - .on('error', function(err) { - // Forward the error - outputStream.emit('error', err); - outputStream.end(); - }) - .pipe(outputStream); - - return stream; - -}; - -AddressService.prototype.createOutputsDBStream = function(addressStr, options) { - - var addrObj = encoding.getAddressInfo(addressStr); - var hashBuffer = addrObj.hashBuffer; - var hashTypeBuffer = addrObj.hashTypeBuffer; - var stream; - - if (options.start >= 0 && options.end >= 0) { - - var endBuffer = new Buffer(4); - endBuffer.writeUInt32BE(options.end, 0); - - var startBuffer = new Buffer(4); - // Because the key has additional data following it, we don't have an ability - // to use "gte" or "lte" we can only use "gt" and "lt", we therefore need to adjust the number - // to be one value larger to include it. - var startAdjusted = options.start + 1; - startBuffer.writeUInt32BE(startAdjusted, 0); - - stream = this.node.services.db.store.createReadStream({ - gt: Buffer.concat([ - constants.PREFIXES.OUTPUTS, - hashBuffer, - hashTypeBuffer, - constants.SPACER_MIN, - endBuffer - ]), - lt: Buffer.concat([ - constants.PREFIXES.OUTPUTS, - hashBuffer, - hashTypeBuffer, - constants.SPACER_MIN, - startBuffer - ]), - valueEncoding: 'binary', - keyEncoding: 'binary' - }); - } else { - var allKey = Buffer.concat([constants.PREFIXES.OUTPUTS, hashBuffer, hashTypeBuffer]); - stream = this.node.services.db.store.createReadStream({ - gt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MIN]), - lt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MAX]), - valueEncoding: 'binary', - keyEncoding: 'binary' - }); - } - - return stream; - -}; - -/** - * Will give outputs for an address as an object with: - * address - The base58check encoded address - * hashtype - The type of the address, e.g. 'pubkeyhash' or 'scripthash' - * txid - A string of the transaction hash - * outputIndex - A number of corresponding transaction output - * height - The height of the block the transaction was included, will be -1 for mempool transactions - * satoshis - The satoshis value of the output - * script - The script of the output as a hex string - * confirmations - The number of confirmations, will equal 0 for mempool transactions - * - * @param {String} addressStr - The relevant address - * @param {Object} options - Additional options for query the outputs - * @param {Number} [options.start] - The relevant start block height - * @param {Number} [options.end] - The relevant end block height - * @param {Boolean} [options.queryMempool] - Include the mempool in the results - * @param {Function} callback - */ -AddressService.prototype.getOutputs = function(addressStr, options, callback) { - var self = this; - $.checkArgument(_.isObject(options), 'Second argument is expected to be an options object.'); - $.checkArgument(_.isFunction(callback), 'Third argument is expected to be a callback function.'); - - var addrObj = encoding.getAddressInfo(addressStr); - var hashBuffer = addrObj.hashBuffer; - var hashTypeBuffer = addrObj.hashTypeBuffer; - if (!hashTypeBuffer) { - return callback(new Error('Unknown address type: ' + addrObj.hashTypeReadable + ' for address: ' + addressStr)); - } - - var outputs = []; - var stream = this.createOutputsStream(addressStr, options); - - stream.on('data', function(data) { - outputs.push(data); - if (outputs.length > self.maxOutputsQueryLength) { - log.warn('Tried to query too many outputs (' + self.maxOutputsQueryLength + ') for address ' + addressStr); - error = new Error('Maximum number of outputs (' + self.maxOutputsQueryLength + ') per query reached'); - stream.end(); - } - }); - - var error; - - stream.on('error', function(streamError) { - if (streamError) { - error = streamError; - } - }); - - stream.on('finish', function() { - if (error) { - return callback(error); - } - - if(options.queryMempool) { - self._getOutputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolOutputs) { - if (err) { - return callback(err); - } - outputs = outputs.concat(mempoolOutputs); - callback(null, outputs); - }); - } else { - callback(null, outputs); - } - }); - - return stream; - -}; - AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, hashTypeBuffer, callback) { var self = this; var mempoolOutputs = []; @@ -1197,103 +1096,91 @@ AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, h }; /** + * txid - A string of the transaction hash + * outputIndex - A number of corresponding transaction output + * height - The height of the block the transaction was included, will be -1 for mempool transactions + * satoshis - The satoshis value of the output + * script - The script of the output as a hex string + * * Will give unspent outputs for an address or an array of addresses. * @param {Array|String} addresses - An array of addresses - * @param {Boolean} queryMempool - Include or exclude the mempool + * @param {Object} options + * @param {Boolean} options.queryMempool - Include or exclude the mempool * @param {Function} callback */ -AddressService.prototype.getUnspentOutputs = function(addresses, queryMempool, callback) { +AddressService.prototype.getUnspentOutputs = function(addressArg, options, callback) { + //TODO: multiple addresses + //TODO: queryMempool option var self = this; - - if(!Array.isArray(addresses)) { - addresses = [addresses]; - } + var address = new Address(addressArg); var utxos = []; + var count = 0; + var from = options.from || 0; + var to = options.to || constants.PAGE_SIZE; + if (to < from) { + return callback(new Error('"to" option can not be less than "from"')); + } - async.eachSeries(addresses, function(address, next) { - self.getUnspentOutputsForAddress(address, queryMempool, function(err, unspents) { - if(err && err instanceof errors.NoOutputs) { - return next(); - } else if(err) { - return next(err); - } - - utxos = utxos.concat(unspents); - next(); - }); - }, function(err) { - callback(err, utxos); + var addressKey = Buffer.concat([ + constants.PREFIXES.UNSPENT, + address.hashBuffer, + constants.HASH_TYPES_MAP[address.type] + ]); + var stream = this.node.services.db.store.createKeyStream({ + gt: Buffer.concat([addressKey, constants.SPACER_MIN]), + lt: Buffer.concat([addressKey, constants.SPACER_MAX]), + keyEncoding: 'binary' }); -}; - -/** - * Will give unspent outputs for an address. - * @param {String} address - An address in base58check encoding - * @param {Boolean} queryMempool - Include or exclude the mempool - * @param {Function} callback - */ -AddressService.prototype.getUnspentOutputsForAddress = function(address, queryMempool, callback) { - - var self = this; - - this.getOutputs(address, {queryMempool: queryMempool}, function(err, outputs) { - if (err) { - return callback(err); - } else if(!outputs.length) { - return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []); + stream.on('data', function(key) { + if (count >= from && count < to) { + var offset = constants.PREFIX_SIZE + constants.ADDRESS_KEY_SIZE + constants.SPACER_SIZE; + var height = key.readUInt32BE(offset); + var txidEndOffset = offset + constants.HEIGHT_SIZE + constants.TXID_SIZE; + var txid = key.slice(offset + constants.HEIGHT_SIZE, txidEndOffset); + var outputIndex = key.readUInt32BE(txidEndOffset); + utxos.push([height, txid, outputIndex]); + } else if (count > to) { + stream.push(null); } + }); - var opts = { - queryMempool: queryMempool - }; - - var isUnspent = function(output, callback) { - self.isUnspent(output, opts, callback); - }; + var error; - async.filter(outputs, isUnspent, function(results) { - callback(null, results); - }); + stream.on('error', function(err) { + log.error(err); + error = err; }); -}; -/** - * Will give the inverse of isSpent - * @param {Object} output - * @param {Object} options - * @param {Boolean} options.queryMempool - Include mempool in results - * @param {Function} callback - */ -AddressService.prototype.isUnspent = function(output, options, callback) { - $.checkArgument(_.isFunction(callback)); - this.isSpent(output, options, function(spent) { - callback(!spent); + stream.on('end', function() { + if (error) { + return callback(error); + } + // TODO: do many at once and keep order? + async.mapSeries(utxos, getDetails, callback); }); -}; -/** - * Will determine if an output is spent. - * @param {Object} output - An output as returned from getOutputs - * @param {Object} options - * @param {Boolean} options.queryMempool - Include mempool in results - * @param {Function} callback - */ -AddressService.prototype.isSpent = function(output, options, callback) { - $.checkArgument(_.isFunction(callback)); - var queryMempool = _.isUndefined(options.queryMempool) ? true : options.queryMempool; - var self = this; - var txid = output.prevTxId ? output.prevTxId.toString('hex') : output.txid; - var spent = self.node.services.bitcoind.isSpent(txid, output.outputIndex); - if (!spent && queryMempool) { - var txidBuffer = new Buffer(txid, 'hex'); - var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(txidBuffer, output.outputIndex); - spent = self.mempoolSpentIndex[spentIndexSyncKey] ? true : false; + function getDetails(utxo, next) { + var height = utxo[0]; + var txid = utxo[1]; + var outputIndex = utxo[2]; + var prevOutputKeyBuffer = encoding.encodePrevOutputKey(txid, outputIndex); + self.node.services.db.store.get(prevOutputKeyBuffer, { + keyEncoding: 'binary' + }, function(err, buffer) { + var satoshis = buffer.readDoubleBE(21); + var script = buffer.read();// TODO + var details = { + txid: txid, + outputIndex: outputIndex, + height: height, + satoshis: satoshis, + script: script + }; + next(null, details); + }); } - setImmediate(function() { - // TODO error should be the first argument? - callback(spent); - }); + }; @@ -1339,272 +1226,89 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba history.get(callback); }; -/** - * This will give an object with: - * balance - confirmed balance - * unconfirmedBalance - unconfirmed balance - * totalReceived - satoshis received - * totalSpent - satoshis spent - * appearances - number of transactions - * unconfirmedAppearances - number of unconfirmed transactions - * txids - list of txids (unless noTxList is set) - * - * @param {String} address - * @param {Object} options - * @param {Boolean} [options.noTxList] - if set, txid array will not be included - * @param {Function} callback - */ -AddressService.prototype.getAddressSummary = function(addressArg, options, callback) { +AddressService.prototype.getTransactionIds = function(addressArg, options, callback) { + // TODO: optional mempool + // TODO: optional buffer txids + // TODO: multiple with $in + // TODO: limit var self = this; - var startTime = new Date(); var address = new Address(addressArg); + var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type]; + var addressKey = Buffer.concat([address.hashBuffer, hashTypeBuffer]); - if (_.isUndefined(options.queryMempool)) { - options.queryMempool = true; - } - - async.waterfall([ - function(next) { - self._getAddressConfirmedSummary(address, options, next); - }, - function(result, next) { - self._getAddressMempoolSummary(address, options, result, next); - }, - function(result, next) { - self._setAndSortTxidsFromAppearanceIds(result, next); + self.addressTxidsDb.find({ + a: { + $in: [addressKey] } - ], function(err, result) { + }).sort({height: -1}).toArray(function(err, docs) { if (err) { return callback(err); } - - var summary = self._transformAddressSummaryFromResult(result, options); - - var timeDelta = new Date() - startTime; - if (timeDelta > 5000) { - var seconds = Math.round(timeDelta / 1000); - log.warn('Slow (' + seconds + 's) getAddressSummary request for address: ' + address.toString()); - } - - callback(null, summary); - - }); - -}; - -AddressService.prototype._getAddressConfirmedSummary = function(address, options, callback) { - var self = this; - var baseResult = { - appearanceIds: {}, - totalReceived: 0, - balance: 0, - unconfirmedAppearanceIds: {}, - unconfirmedBalance: 0 - }; - - async.waterfall([ - function(next) { - self._getAddressConfirmedInputsSummary(address, baseResult, options, next); - }, - function(result, next) { - self._getAddressConfirmedOutputsSummary(address, result, options, next); + if (!docs) { + return callback(null, []); } - ], callback); - -}; - -AddressService.prototype._getAddressConfirmedInputsSummary = function(address, result, options, callback) { - $.checkArgument(address instanceof Address); - var self = this; - var error = null; - var count = 0; - - var inputsStream = self.createInputsStream(address, options); - inputsStream.on('data', function(input) { - var txid = input.txid; - result.appearanceIds[txid] = input.height; - - count++; - - if (count > self.maxInputsQueryLength) { - log.warn('Tried to query too many inputs (' + self.maxInputsQueryLength + ') for summary of address ' + address.toString()); - error = new Error('Maximum number of inputs (' + self.maxInputsQueryLength + ') per query reached'); - inputsStream.end(); + if (!options.verbose) { + var txids = []; + for (var i = 0; i < docs.length; i++) { + txids.push(docs[i].t.buffer.toString('hex')); + } + return callback(null, txids); } - + callback(null, docs); }); - inputsStream.on('error', function(err) { - error = err; - }); - - inputsStream.on('end', function() { - if (error) { - return callback(error); - } - callback(null, result); - }); }; -AddressService.prototype._getAddressConfirmedOutputsSummary = function(address, result, options, callback) { - $.checkArgument(address instanceof Address); - $.checkArgument(!_.isUndefined(result) && - !_.isUndefined(result.appearanceIds) && - !_.isUndefined(result.unconfirmedAppearanceIds)); - +AddressService.prototype.getAddressSummary = function(addressArg, options, callback) { + // TODO: optional mempool var self = this; - var count = 0; - - var outputStream = self.createOutputsStream(address, options); - - outputStream.on('data', function(output) { - - var txid = output.txid; - var outputIndex = output.outputIndex; - result.totalReceived += output.satoshis; - result.appearanceIds[txid] = output.height; - - if(!options.noBalance) { + var summary = {}; + var address = new Address(addressArg); - // Bitcoind's isSpent only works for confirmed transactions - var spentDB = self.node.services.bitcoind.isSpent(txid, outputIndex); + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } - if(!spentDB) { - result.balance += output.satoshis; + function getBalance(done) { + self.getBalance(address, options, function(err, data) { + if (err) { + return done(err); } + summary.totalReceived = data.received; + summary.totalSpent = data.received - data.balance; + summary.balance = data.balance; + summary.change = data.change; + summary.appearances = data.count; + done(); + }); + } - if(options.queryMempool) { - // Check to see if this output is spent in the mempool and if so - // we will subtract it from the unconfirmedBalance (a.k.a unconfirmedDelta) - var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( - new Buffer(txid, 'hex'), // TODO: get buffer directly - outputIndex - ); - var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; - if(spentMempool) { - result.unconfirmedBalance -= output.satoshis; - } + function getTxList(done) { + // todo: fullTxList option + self.getTransactionIds(address, options, function(err, txids) { + if (err) { + return done(err); } - } - - count++; - - if (count > self.maxOutputsQueryLength) { - log.warn('Tried to query too many outputs (' + self.maxOutputsQueryLength + ') for summary of address ' + address.toString()); - error = new Error('Maximum number of outputs (' + self.maxOutputsQueryLength + ') per query reached'); - outputStream.end(); - } - - }); - - var error = null; - - outputStream.on('error', function(err) { - error = err; - }); - - outputStream.on('end', function() { - if (error) { - return callback(error); - } - callback(null, result); - }); - -}; - -AddressService.prototype._setAndSortTxidsFromAppearanceIds = function(result, callback) { - result.txids = Object.keys(result.appearanceIds); - result.txids.sort(function(a, b) { - return result.appearanceIds[a] - result.appearanceIds[b]; - }); - result.unconfirmedTxids = Object.keys(result.unconfirmedAppearanceIds); - result.unconfirmedTxids.sort(function(a, b) { - return result.unconfirmedAppearanceIds[a] - result.unconfirmedAppearanceIds[b]; - }); - callback(null, result); -}; - -AddressService.prototype._getAddressMempoolSummary = function(address, options, result, callback) { - var self = this; - - // Skip if the options do not want to include the mempool - if (!options.queryMempool) { - return callback(null, result); + summary.txids = txids; + done(); + }); } - var addressStr = address.toString(); - var hashBuffer = address.hashBuffer; - var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type]; - var addressIndexKey = encoding.encodeMempoolAddressIndexKey(hashBuffer, hashTypeBuffer); - - if(!this.mempoolAddressIndex[addressIndexKey]) { - return callback(null, result); + var tasks = []; + if (!options.noBalance) { + tasks.push(getBalance); } - - async.waterfall([ - function(next) { - self._getInputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolInputs) { - if (err) { - return next(err); - } - for(var i = 0; i < mempoolInputs.length; i++) { - var input = mempoolInputs[i]; - result.unconfirmedAppearanceIds[input.txid] = input.timestamp; - } - next(null, result); - }); - - }, function(result, next) { - self._getOutputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolOutputs) { - if (err) { - return next(err); - } - for(var i = 0; i < mempoolOutputs.length; i++) { - var output = mempoolOutputs[i]; - - result.unconfirmedAppearanceIds[output.txid] = output.timestamp; - - if(!options.noBalance) { - var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( - new Buffer(output.txid, 'hex'), // TODO: get buffer directly - output.outputIndex - ); - var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; - // Only add this to the balance if it's not spent in the mempool already - if(!spentMempool) { - result.unconfirmedBalance += output.satoshis; - } - } - } - next(null, result); - }); - } - ], callback); -}; - -AddressService.prototype._transformAddressSummaryFromResult = function(result, options) { - - var confirmedTxids = result.txids; - var unconfirmedTxids = result.unconfirmedTxids; - - var summary = { - totalReceived: result.totalReceived, - totalSpent: result.totalReceived - result.balance, - balance: result.balance, - appearances: confirmedTxids.length, - unconfirmedBalance: result.unconfirmedBalance, - unconfirmedAppearances: unconfirmedTxids.length - }; - - if (options.fullTxList) { - summary.appearanceIds = result.appearanceIds; - summary.unconfirmedAppearanceIds = result.unconfirmedAppearanceIds; - } else if (!options.noTxList) { - summary.txids = confirmedTxids.concat(unconfirmedTxids); + if (!options.noTxList) { + tasks.push(getTxList); } - return summary; + async.parallel(tasks, function(err) { + if (err) { + return callback(err); + } + callback(null, summary); + }); }; diff --git a/lib/services/address/streams/inputs-transform.js b/lib/services/address/streams/inputs-transform.js deleted file mode 100644 index 8b8f71d3d..000000000 --- a/lib/services/address/streams/inputs-transform.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict'; - -var Transform = require('stream').Transform; -var inherits = require('util').inherits; -var bitcore = require('bitcore-lib'); -var encodingUtil = require('../encoding'); -var $ = bitcore.util.preconditions; - -function InputsTransformStream(options) { - $.checkArgument(options.address instanceof bitcore.Address); - Transform.call(this, { - objectMode: true - }); - this._address = options.address; - this._addressStr = this._address.toString(); - this._tipHeight = options.tipHeight; -} -inherits(InputsTransformStream, Transform); - -InputsTransformStream.prototype._transform = function(chunk, encoding, callback) { - var self = this; - - var key = encodingUtil.decodeInputKey(chunk.key); - var value = encodingUtil.decodeInputValue(chunk.value); - - var input = { - address: this._addressStr, - hashType: this._address.type, - txid: value.txid.toString('hex'), - inputIndex: value.inputIndex, - height: key.height, - confirmations: this._tipHeight - key.height + 1 - }; - - self.push(input); - callback(); - -}; - -module.exports = InputsTransformStream; diff --git a/lib/services/address/streams/outputs-transform.js b/lib/services/address/streams/outputs-transform.js deleted file mode 100644 index b9c8e8d36..000000000 --- a/lib/services/address/streams/outputs-transform.js +++ /dev/null @@ -1,42 +0,0 @@ -'use strict'; - -var Transform = require('stream').Transform; -var inherits = require('util').inherits; -var bitcore = require('bitcore-lib'); -var encodingUtil = require('../encoding'); -var $ = bitcore.util.preconditions; - -function OutputsTransformStream(options) { - Transform.call(this, { - objectMode: true - }); - $.checkArgument(options.address instanceof bitcore.Address); - this._address = options.address; - this._addressStr = this._address.toString(); - this._tipHeight = options.tipHeight; -} -inherits(OutputsTransformStream, Transform); - -OutputsTransformStream.prototype._transform = function(chunk, encoding, callback) { - var self = this; - - var key = encodingUtil.decodeOutputKey(chunk.key); - var value = encodingUtil.decodeOutputValue(chunk.value); - - var output = { - address: this._addressStr, - hashType: this._address.type, - txid: key.txid.toString('hex'), //TODO use a buffer - outputIndex: key.outputIndex, - height: key.height, - satoshis: value.satoshis, - script: value.scriptBuffer.toString('hex'), //TODO use a buffer - confirmations: this._tipHeight - key.height + 1 - }; - - self.push(output); - callback(); - -}; - -module.exports = OutputsTransformStream; diff --git a/lib/services/db.js b/lib/services/db.js index 679935ca9..97f30ec36 100644 --- a/lib/services/db.js +++ b/lib/services/db.js @@ -3,8 +3,6 @@ var util = require('util'); var fs = require('fs'); var async = require('async'); -var levelup = require('levelup'); -var leveldown = require('leveldown'); var mkdirp = require('mkdirp'); var bitcore = require('bitcore-lib'); var BufferUtil = bitcore.util.buffer; @@ -16,6 +14,8 @@ var errors = index.errors; var log = index.log; var Transaction = require('../transaction'); var Service = require('../service'); +var MongoClient = require('mongodb'); +var _ = bitcore.deps._; /** * This service synchronizes a leveldb database with bitcoin block chain by connecting and @@ -24,7 +24,6 @@ var Service = require('../service'); * * @param {Object} options * @param {Node} options.node - A reference to the node - * @param {Node} options.store - A levelup backend store */ function DB(options) { /* jshint maxstatements: 20 */ @@ -40,7 +39,7 @@ function DB(options) { // Used to keep track of the version of the indexes // to determine during an upgrade if a reindex is required - this.version = 2; + this.version = 3; this.tip = null; this.genesis = null; @@ -48,16 +47,13 @@ function DB(options) { $.checkState(this.node.network, 'Node is expected to have a "network" property'); this.network = this.node.network; - this._setDataPath(); + this._setDatabaseName(); + + this.mongoBaseUrl = options.mongoBaseUrl || DB.DEFAULT_MONGO_BASE_URL; this.maxOpenFiles = options.maxOpenFiles || DB.DEFAULT_MAX_OPEN_FILES; this.maxTransactionLimit = options.maxTransactionLimit || DB.MAX_TRANSACTION_LIMIT; - this.levelupStore = leveldown; - if (options.store) { - this.levelupStore = options.store; - } - this.retryInterval = 60000; this.subscriptions = { @@ -68,69 +64,66 @@ function DB(options) { util.inherits(DB, Service); -DB.dependencies = ['bitcoind']; +DB.DEFAULT_MONGO_BASE_URL = 'mongodb://localhost:27017/'; +DB.DEFAULT_BLOCKINFO_LIMIT = 10; +DB.MAX_BLOCKINFO_LIMIT = 100; -DB.PREFIXES = { - VERSION: new Buffer('ff', 'hex'), - BLOCKS: new Buffer('01', 'hex'), - TIP: new Buffer('04', 'hex') -}; +DB.dependencies = ['bitcoind']; // The maximum number of transactions to query at once // Used for populating previous inputs DB.MAX_TRANSACTION_LIMIT = 5; -// The default maxiumum number of files open for leveldb -DB.DEFAULT_MAX_OPEN_FILES = 200; - /** * This function will set `this.dataPath` based on `this.node.network`. * @private */ -DB.prototype._setDataPath = function() { +DB.prototype._setDatabaseName = function() { $.checkState(this.node.datadir, 'Node is expected to have a "datadir" property'); + + // The database should be unique based on the location of the bitcoin directory + // to avoid using special characters in the database name, a hash is generated + // and we will use the last 4 bytes for uniqueness. + var uniqueSlug = bitcore.crypto.Hash.sha256(new Buffer(this.node.datadir, 'utf8')).slice(28, 32); + if (this.node.network === Networks.livenet) { - this.dataPath = this.node.datadir + '/bitcore-node.db'; + this.databaseName = 'bitcoreLivenet' + uniqueSlug.toString('hex'); } else if (this.node.network === Networks.testnet) { if (this.node.network.regtestEnabled) { - this.dataPath = this.node.datadir + '/regtest/bitcore-node.db'; + this.databaseName = 'bitcoreRegtest' + uniqueSlug.toString('hex'); } else { - this.dataPath = this.node.datadir + '/testnet3/bitcore-node.db'; + this.databaseName = 'bitcoreTestnet3' + uniqueSlug.toString('hex'); } } else { throw new Error('Unknown network: ' + this.network); } + log.info('Using database name: ' + this.databaseName); }; DB.prototype._checkVersion = function(callback) { var self = this; - var options = { - keyEncoding: 'binary', - valueEncoding: 'binary' - }; - self.store.get(DB.PREFIXES.TIP, options, function(err) { - if (err instanceof levelup.errors.NotFoundError) { + self.store.collection('blocks').find().sort({ht: -1}).limit(1).toArray(function(err, doc) { + if (err) { + return callback(err); + } else if (!doc.length) { // The database is brand new and doesn't have a tip stored // we can skip version checking return callback(); - } else if (err) { - return callback(err); } - self.store.get(DB.PREFIXES.VERSION, options, function(err, buffer) { + self.store.collection('meta').findOne({key: 'version'}, function(err, doc) { var version; - if (err instanceof levelup.errors.NotFoundError) { - // The initial version (1) of the database didn't store the version number - version = 1; - } else if (err) { + if (err) { return callback(err); + } else if (!doc) { + return callback(new Error('Could not identify the version of the data.')); } else { - version = buffer.readUInt32BE(); + version = doc.value; } if (self.version !== version) { var helpUrl = 'https://github.com/bitpay/bitcore-node/blob/master/docs/services/db.md#how-to-reindex'; return callback(new Error( 'The version of the database "' + version + '" does not match the expected version "' + - self.version + '". A recreation of "' + self.dataPath + '" (can take several hours) is ' + + self.version + '". A recreation of the database "' + self.databaseName + '" (can take several hours) is ' + 'required or to switch versions of software to match. Please see ' + helpUrl + ' for more information.' )); @@ -143,7 +136,33 @@ DB.prototype._checkVersion = function(callback) { DB.prototype._setVersion = function(callback) { var versionBuffer = new Buffer(new Array(4)); versionBuffer.writeUInt32BE(this.version); - this.store.put(DB.PREFIXES.VERSION, versionBuffer, callback); + var metaDb = this.store.collection('meta'); + metaDb.update({key: 'version'}, {key: 'version', value: this.version}, {upsert: true}, callback); +}; + +DB.prototype._createIndexes = function(db, callback) { + async.series([ + function(done) { + db.createCollection('blocks', { + storageEngine: { + wiredTiger: { + configString: 'type=lsm' + } + } + }, done); + }, + function(done) { + db.collection('meta').createIndex({key: 1}, {unique: true}, done); + }, + function(done) { + // Create index for height + db.collection('blocks').createIndex({ht: -1}, {unique: true}, done); + }, + function(done) { + // Create index for timestamp + db.collection('blocks').createIndex({t: 1}, done); + } + ], callback); }; /** @@ -153,45 +172,67 @@ DB.prototype._setVersion = function(callback) { DB.prototype.start = function(callback) { var self = this; - if (!fs.existsSync(this.dataPath)) { - mkdirp.sync(this.dataPath); - } - this.genesis = Block.fromBuffer(this.node.services.bitcoind.genesisBuffer); - this.store = levelup(this.dataPath, { db: this.levelupStore, maxOpenFiles: this.maxOpenFiles }); - this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); + self.genesis = Block.fromBuffer(self.node.services.bitcoind.genesisBuffer); - this.once('ready', function() { - log.info('Bitcoin Database Ready'); + var url = self.mongoBaseUrl + self.databaseName; + + self.node.once('ready', function() { + log.debug('Bitcoin Database starting sync'); + self.sync(); + + self.node.services.bitcoind.on('tx', self.transactionHandler.bind(self)); // Notify that there is a new tip - self.node.services.bitcoind.on('tip', function(height) { + self.node.services.bitcoind.on('tip', function() { if(!self.node.stopping) { self.sync(); } }); + }); - async.series([ - function(next) { - self._checkVersion(next); - }, - function(next) { - self._setVersion(next); - } - ], function(err) { + self.once('ready', function() { + log.info('Bitcoin Database Ready'); + }); + + // Use connect method to connect to the Server + MongoClient.connect(url, function(err, db) { if (err) { return callback(err); } - self.loadTip(function(err) { + + self._createIndexes(db, function(err) { if (err) { return callback(err); } - self.sync(); - self.emit('ready'); - setImmediate(callback); + self.store = db; + + async.series([ + function(next) { + self._checkVersion(next); + }, + function(next) { + self._setVersion(next); + } + ], function(err) { + if (err) { + return callback(err); + } + self.loadTip(function(err) { + if (err) { + return callback(err); + } + + self.emit('ready'); + setImmediate(callback); + }); + + }); + }); + }); }; @@ -208,7 +249,11 @@ DB.prototype.stop = function(callback) { }, function(next) { setTimeout(next, 10); }, function() { - self.store.close(callback); + if (self.store) { + self.store.close(callback); + } else { + setImmediate(callback); + } }); }; @@ -255,7 +300,7 @@ DB.prototype.transactionHandler = function(txInfo) { DB.prototype.getAPIMethods = function() { var methods = [ ['getBlock', this, this.getBlock, 1], - ['getBlockHashesByTimestamp', this, this.getBlockHashesByTimestamp, 2], + ['getBlockInfoByTimestamp', this, this.getBlockInfoByTimestamp, 2], ['getTransaction', this, this.getTransaction, 2], ['getTransactionWithBlockInfo', this, this.getTransactionWithBlockInfo, 2], ['sendTransaction', this, this.sendTransaction, 1], @@ -267,16 +312,13 @@ DB.prototype.getAPIMethods = function() { DB.prototype.loadTip = function(callback) { var self = this; - var options = { - keyEncoding: 'binary', - valueEncoding: 'binary' - }; - - self.store.get(DB.PREFIXES.TIP, options, function(err, tipData) { - if(err && err instanceof levelup.errors.NotFoundError) { + self.store.collection('blocks').find().sort({ht: -1}).limit(1).toArray(function(err, blocks) { + if (err) { + return callback(err); + } else if (!blocks.length) { self.tip = self.genesis; self.tip.__height = 0; - self.connectBlock(self.genesis, function(err) { + self.connectBlock(self.genesis, self.genesis.toBuffer().length, function(err) { if(err) { return callback(err); } @@ -285,11 +327,9 @@ DB.prototype.loadTip = function(callback) { callback(); }); return; - } else if(err) { - return callback(err); } - var hash = tipData.toString('hex'); + var hash = blocks[0]._id.buffer.toString('hex'); var times = 0; async.retry({times: 3, interval: self.retryInterval}, function(done) { @@ -342,47 +382,38 @@ DB.prototype.getBlock = function(hash, callback) { * @param {Number} low - low timestamp, in seconds, inclusive * @param {Function} callback */ -DB.prototype.getBlockHashesByTimestamp = function(high, low, callback) { +DB.prototype.getBlockInfoByTimestamp = function(high, low, callback) { var self = this; - var hashes = []; - var lowKey; - var highKey; + // TODO: add options + // TODO: sanitize high, low - try { - lowKey = this._encodeBlockIndexKey(low); - highKey = this._encodeBlockIndexKey(high); - } catch(e) { - return callback(e); - } + var limit = DB.DEFAULT_BLOCKINFO_LIMIT; - var stream = this.store.createReadStream({ - gte: lowKey, - lte: highKey, - reverse: true, - valueEncoding: 'binary', - keyEncoding: 'binary' - }); - - stream.on('data', function(data) { - hashes.push(self._decodeBlockIndexValue(data.value)); - }); - - var error; - - stream.on('error', function(streamError) { - if (streamError) { - error = streamError; + self.store.collection('blocks').find({ + ht: { + $gte: low, + $lte: high } - }); - - stream.on('close', function() { - if (error) { - return callback(error); + }).sort({height: -1}).limit(limit).toArray(function(err, docs) { + if (err) { + return callback(err); } - callback(null, hashes); + var result = []; + for (var i = 0; i < docs.length; i++) { + var doc = docs[i]; + result.push({ + timestamp: doc.t, + hash: doc._id.buffer.toString('hex'), + count: doc.c, + bytes: doc.b, + version: doc.v, + coinbase: doc.cb, + height: doc.ht + }); + } + callback(null, result); }); - return stream; }; /** @@ -511,9 +542,9 @@ DB.prototype.getPrevHash = function(blockHash, callback) { * @param {Block} block - The bitcore block * @param {Function} callback */ -DB.prototype.connectBlock = function(block, callback) { +DB.prototype.connectBlock = function(block, blockBytes, callback) { log.debug('DB handling new chain block'); - this.runAllBlockHandlers(block, true, callback); + this.runAllBlockHandlers(block, blockBytes, true, callback); }; /** @@ -521,9 +552,9 @@ DB.prototype.connectBlock = function(block, callback) { * @param {Block} block - The bitcore block * @param {Function} callback */ -DB.prototype.disconnectBlock = function(block, callback) { +DB.prototype.disconnectBlock = function(block, blockBytes, callback) { log.debug('DB removing chain block'); - this.runAllBlockHandlers(block, false, callback); + this.runAllBlockHandlers(block, blockBytes, false, callback); }; /** @@ -533,46 +564,20 @@ DB.prototype.disconnectBlock = function(block, callback) { * @param {Boolean} add - If the block is being added/connected or removed/disconnected * @param {Function} callback */ -DB.prototype.runAllBlockHandlers = function(block, add, callback) { +DB.prototype.runAllBlockHandlers = function(block, blockBytes, add, callback) { var self = this; - var operations = []; // Notify block subscribers for (var i = 0; i < this.subscriptions.block.length; i++) { - this.subscriptions.block[i].emit('db/block', block.hash); + self.subscriptions.block[i].emit('db/block', block.hash); } - // Update tip - var tipHash = add ? new Buffer(block.hash, 'hex') : BufferUtil.reverse(block.header.prevHash); - operations.push({ - type: 'put', - key: DB.PREFIXES.TIP, - value: tipHash - }); - - // Update block index - operations.push({ - type: add ? 'put' : 'del', - key: this._encodeBlockIndexKey(block.header.timestamp), - value: this._encodeBlockIndexValue(block.hash) - }); - async.eachSeries( - this.node.services, + self.node.services, function(mod, next) { if(mod.blockHandler) { $.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function'); - - mod.blockHandler.call(mod, block, add, function(err, ops) { - if (err) { - return next(err); - } - if (ops) { - $.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array'); - operations = operations.concat(ops); - } - next(); - }); + mod.blockHandler.call(mod, block, add, next); } else { setImmediate(next); } @@ -582,27 +587,28 @@ DB.prototype.runAllBlockHandlers = function(block, add, callback) { return callback(err); } - log.debug('Updating the database with operations', operations); - self.store.batch(operations, callback); + var coinbaseScriptBuffer = block.transactions[0].inputs[0]._scriptBuffer; + + var blockData = { + _id: new Buffer(block.hash, 'hex'), + t: block.header.timestamp, + c: block.transactions.length, + b: blockBytes, + v: block.header.version, + cb: coinbaseScriptBuffer, + ht: block.__height + }; + + // Update block index + if (add) { + self.store.collection('blocks').insert(blockData, callback); + } else { + self.store.collection('blocks').remove(blockData, callback); + } } ); }; -DB.prototype._encodeBlockIndexKey = function(timestamp) { - $.checkArgument(timestamp >= 0 && timestamp <= 4294967295, 'timestamp out of bounds'); - var timestampBuffer = new Buffer(4); - timestampBuffer.writeUInt32BE(timestamp); - return Buffer.concat([DB.PREFIXES.BLOCKS, timestampBuffer]); -}; - -DB.prototype._encodeBlockIndexValue = function(hash) { - return new Buffer(hash, 'hex'); -}; - -DB.prototype._decodeBlockIndexValue = function(value) { - return value.toString('hex'); -}; - /** * This function will find the common ancestor between the current chain and a forked block, * by moving backwards on both chains until there is a meeting point. @@ -704,7 +710,7 @@ DB.prototype.syncRewind = function(block, done) { } // Undo the related indexes for this block - self.disconnectBlock(tip, function(err) { + self.disconnectBlock(tip, tip.toBuffer().length, function(err) { if (err) { return removeDone(err); } @@ -750,6 +756,7 @@ DB.prototype.sync = function() { } var block = Block.fromBuffer(blockBuffer); + var blockBytes = blockBuffer.length; // TODO: expose prevHash as a string from bitcore var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); @@ -763,7 +770,7 @@ DB.prototype.sync = function() { block.__height = self.tip.__height + 1; // Create indexes - self.connectBlock(block, function(err) { + self.connectBlock(block, blockBytes, function(err) { if (err) { return done(err); } diff --git a/package.json b/package.json index 8d3a4e7d2..0463577b4 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "liftoff": "^2.2.0", "memdown": "^1.0.0", "mkdirp": "0.5.0", + "mongodb": "^2.1.7", "nan": "^2.0.9", "npm": "^2.14.1", "semver": "^5.0.1",