From 6eb63346690c005ac1f2468af18df528be24b945 Mon Sep 17 00:00:00 2001 From: lbqds Date: Mon, 17 Jan 2022 20:58:08 +0800 Subject: [PATCH 1/5] Check duplicated shares for multiple pool instance --- lib/shareProcessor.js | 59 ++++++++++++++------ test/shareProcessorTest.js | 110 ++++++++++++++++++++++++++----------- 2 files changed, 119 insertions(+), 50 deletions(-) diff --git a/lib/shareProcessor.js b/lib/shareProcessor.js index 2f16643..0e74026 100644 --- a/lib/shareProcessor.js +++ b/lib/shareProcessor.js @@ -71,11 +71,11 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ createTables(_this.db); _this.handleShare = function(share){ persistShare(_this.db, share); - _this._handleShare(share); + _this._handleShare(share, _ => {}); } } else { - _this.handleShare = share => _this._handleShare(share); + _this.handleShare = share => _this._handleShare(share, _ => {}); } this.currentRoundKey = function(fromGroup, toGroup){ @@ -86,33 +86,56 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ return fromGroup + ':' + toGroup + ':shares:' + blockHash; } + this.shareCacheKey = function(fromGroup, toGroup){ + return fromGroup + ':' + toGroup + ':sharecache'; + } + var pendingBlocksKey = 'pendingBlocks'; var foundBlocksKey = 'foundBlocks'; var hashrateKey = 'hashrate'; var balancesKey = 'balances'; - this._handleShare = function(share){ - var redisTx = _this.redisClient.multi(); - var currentMs = Date.now(); + this._handleShare = function(share, callback){ var fromGroup = share.job.fromGroup; var toGroup = share.job.toGroup; + var blockHash = share.blockHash; var currentRound = _this.currentRoundKey(fromGroup, toGroup); - redisTx.hincrbyfloat(currentRound, share.workerAddress, share.difficulty); + var cacheKey = _this.shareCacheKey(fromGroup, toGroup); + + _this.redisClient.sadd(cacheKey, blockHash, function(error, result){ + if (error){ + logger.error('Check share duplicated failed, error: ' + error); + callback(error); + return; + } + + if (result === 0){ + logger.error('Ignore duplicated share'); + callback('Duplicated share'); + return; + } - var currentTs = Math.floor(currentMs / 1000); - redisTx.zadd(hashrateKey, currentTs, [fromGroup, toGroup, share.worker, share.difficulty, currentMs].join(':')); + var redisTx = _this.redisClient.multi(); + var currentMs = Date.now(); + redisTx.hincrbyfloat(currentRound, share.workerAddress, share.difficulty); - if (share.foundBlock){ - var blockHash = share.blockHash; - var newKey = _this.roundKey(fromGroup, toGroup, blockHash); - var blockWithTs = blockHash + ':' + currentMs.toString(); + var currentTs = Math.floor(currentMs / 1000); + redisTx.zadd(hashrateKey, currentTs, [fromGroup, toGroup, share.worker, share.difficulty, currentMs].join(':')); - redisTx.rename(currentRound, newKey); - redisTx.sadd(pendingBlocksKey, blockWithTs); - redisTx.hset(foundBlocksKey, blockHash, share.workerAddress) - } - redisTx.exec(function(error, _){ - if (error) logger.error('Handle share failed, error: ' + error); + if (share.foundBlock){ + var blockHash = share.blockHash; + var newKey = _this.roundKey(fromGroup, toGroup, blockHash); + var blockWithTs = blockHash + ':' + currentMs.toString(); + + redisTx.rename(currentRound, newKey); + redisTx.sadd(pendingBlocksKey, blockWithTs); + redisTx.hset(foundBlocksKey, blockHash, share.workerAddress) + redisTx.del(cacheKey); + } + redisTx.exec(function(error, _){ + if (error) logger.error('Handle share failed, error: ' + error); + callback(error); + }); }); } diff --git a/test/shareProcessorTest.js b/test/shareProcessorTest.js index 9e2f3cd..acf5d95 100644 --- a/test/shareProcessorTest.js +++ b/test/shareProcessorTest.js @@ -26,6 +26,49 @@ describe('test share processor', function(){ }) }) + it('should ignore duplicated shares', function(done){ + var shareProcessor = new ShareProcessor(test.config, test.logger); + shareProcessor.redisClient = redisClient; + + var share1 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 1, workerAddress: 'miner1'}; + var share2 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash2', difficulty: 2, workerAddress: 'miner1'}; + var invalidShare = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 3, workerAddress: 'miner1'}; + + var checkState = function(cacheKey, roundKey, callback){ + redisClient + .multi() + .hget(roundKey, 'miner1') + .smembers(cacheKey) + .exec(function(error, results){ + if (error) assert.fail('Test failed: ' + error); + var difficulty = results[0][1]; + var shareHashes = results[1][1]; + callback(difficulty, shareHashes); + }); + }; + + var cacheKey = shareProcessor.shareCacheKey(0, 1); + var currentRoundKey = shareProcessor.currentRoundKey(0, 1); + util.executeForEach([share1, share2, invalidShare], (share, callback) => { + shareProcessor._handleShare(share, callback); + }, function(){ + checkState(cacheKey, currentRoundKey, function(difficulty, shareHashes){ + expect(parseFloat(difficulty)).equal(share1.difficulty + share2.difficulty); + expect(shareHashes).to.deep.equal([share1.blockHash, share2.blockHash]); + + var blockShare = {job: {fromGroup: 0, toGroup: 1}, foundBlock: true, blockHash: 'hash3', difficulty: 3, workerAddress: 'miner1'}; + shareProcessor._handleShare(blockShare, function(){ + var roundKey = shareProcessor.roundKey(0, 1, blockShare.blockHash); + checkState(cacheKey, roundKey, function(difficulty, shareHashes){ + expect(parseFloat(difficulty)).equal(share1.difficulty + share2.difficulty + blockShare.difficulty); + expect(shareHashes).to.deep.equal([]); + done(); + }); + }); + }); + }); + }) + it('should process shares', function(done){ var shareProcessor = new ShareProcessor(test.config, test.logger); shareProcessor.redisClient = redisClient; @@ -38,44 +81,47 @@ describe('test share processor', function(){ foundBlock: false }; - shareProcessor.handleShare(shareData); - var currentRoundKey = shareProcessor.currentRoundKey( - shareData.job.fromGroup, - shareData.job.toGroup - ); - - redisClient.hget(currentRoundKey, shareData.workerAddress, function(error, res){ - if (error) assert.fail('Test failed: ' + error); - expect(parseFloat(res)).equal(shareData.difficulty); - - shareData.foundBlock = true; - var blockHashHex = '0011'; - shareData.blockHash = blockHashHex; - shareProcessor.handleShare(shareData); + shareProcessor._handleShare(shareData, function(){ - var roundKey = shareProcessor.roundKey( + var currentRoundKey = shareProcessor.currentRoundKey( shareData.job.fromGroup, - shareData.job.toGroup, - blockHashHex + shareData.job.toGroup ); - redisClient - .multi() - .hget(roundKey, shareData.workerAddress) - .smembers('pendingBlocks') - .hget('foundBlocks', blockHashHex) - .exec(function(error, result){ - if (error) assert.fail('Test failed: ' + error); - var difficulty = result[0][1]; - var pendingBlocks = result[1][1]; - var blockMiner = result[2][1]; + redisClient.hget(currentRoundKey, shareData.workerAddress, function(error, res){ + if (error) assert.fail('Test failed: ' + error); + expect(parseFloat(res)).equal(shareData.difficulty); - expect(parseFloat(difficulty)).equal(shareData.difficulty * 2); - expect(pendingBlocks.length).equal(1); - expect(pendingBlocks[0].startsWith(blockHashHex)); - expect(blockMiner).equal(shareData.workerAddress); - done(); + shareData.foundBlock = true; + var blockHashHex = '0011'; + shareData.blockHash = blockHashHex; + shareProcessor._handleShare(shareData, function(){ + + var roundKey = shareProcessor.roundKey( + shareData.job.fromGroup, + shareData.job.toGroup, + blockHashHex + ); + + redisClient + .multi() + .hget(roundKey, shareData.workerAddress) + .smembers('pendingBlocks') + .hget('foundBlocks', blockHashHex) + .exec(function(error, result){ + if (error) assert.fail('Test failed: ' + error); + var difficulty = result[0][1]; + var pendingBlocks = result[1][1]; + var blockMiner = result[2][1]; + + expect(parseFloat(difficulty)).equal(shareData.difficulty * 2); + expect(pendingBlocks.length).equal(1); + expect(pendingBlocks[0].startsWith(blockHashHex)); + expect(blockMiner).equal(shareData.workerAddress); + done(); + }); }); + }); }); }) From 9d2074b1eff62e0dc7de674e7d305550fa1e1515 Mon Sep 17 00:00:00 2001 From: lbqds Date: Mon, 17 Jan 2022 21:01:06 +0800 Subject: [PATCH 2/5] Submit immediately when block found --- lib/pool.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pool.js b/lib/pool.js index 872298b..d5ed14b 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -100,7 +100,6 @@ var pool = module.exports = function pool(config, logger){ share_difficulty: shareData.shareDiff, ip: shareData.ip })) - _this.shareProcessor.handleShare(shareData); if (shareData.foundBlock){ logger.info('Found block for chainIndex: ' + chainIndex + ', hash: ' + shareData.blockHash + @@ -114,6 +113,7 @@ var pool = module.exports = function pool(config, logger){ } }); } + _this.shareProcessor.handleShare(shareData); }) } From 23b533b9292f76954560c5d1ae543142a4ec6dbb Mon Sep 17 00:00:00 2001 From: lbqds Date: Mon, 17 Jan 2022 21:07:18 +0800 Subject: [PATCH 3/5] Remove dead code --- lib/jobManager.js | 4 ++-- lib/pool.js | 4 ++-- lib/stratum.js | 10 ++++------ test/stratumTest.js | 2 +- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/lib/jobManager.js b/lib/jobManager.js index e58e354..e86b19f 100644 --- a/lib/jobManager.js +++ b/lib/jobManager.js @@ -141,7 +141,7 @@ function JobManager(jobExpiryPeriod){ difficulty: difficulty, error: error[1] }); - return {error: error, result: null}; + return {error: error}; }; var job = _this.validJobs.getJob(params.jobId); @@ -214,7 +214,7 @@ function JobManager(jobExpiryPeriod){ foundBlock: foundBlock }); - return {result: true, error: null, blockHash: hash}; + return {error: null}; }; }; JobManager.prototype.__proto__ = events.EventEmitter.prototype; diff --git a/lib/pool.js b/lib/pool.js index d5ed14b..a610f45 100644 --- a/lib/pool.js +++ b/lib/pool.js @@ -204,14 +204,14 @@ var pool = module.exports = function pool(config, logger){ _this.varDiff.manageClient(client); client.on('submit', function(params, resultCallback){ - var result =_this.jobManager.processShare( + var result = _this.jobManager.processShare( params, client.previousDifficulty, client.difficulty, client.remoteAddress, client.socket.localPort ); - resultCallback(result.error, result.result ? true : null); + resultCallback(result.error); }).on('malformedMessage', function (message) { logger.warn('Malformed message from ' + client.getLabel() + ': ' + message); diff --git a/lib/stratum.js b/lib/stratum.js index 0a82724..1ba5d45 100644 --- a/lib/stratum.js +++ b/lib/stratum.js @@ -73,15 +73,13 @@ var StratumClient = function(params){ function handleSubmit(message){ _this.emit('submit', message.params, - function(error, result){ - if (!error && result){ - _this.emit('submitAccepted'); - } - if (!considerBan(result)){ + function(error){ + var accepted = !error; + if (!considerBan(accepted)){ sendJson({ id: message.id, method: 'mining.submit_result', - result: result, + result: accepted, error: error }); } diff --git a/test/stratumTest.js b/test/stratumTest.js index e87b81a..26becbb 100644 --- a/test/stratumTest.js +++ b/test/stratumTest.js @@ -155,7 +155,7 @@ describe('test stratum server', function(){ stratumClient.on('submit', function(params, callback){ expect(params).equal(submitMessage.params); - callback(null, false); + callback('invalid share'); }); }); From aad5ad378b1d157a729957a3c67a13b6ca499482 Mon Sep 17 00:00:00 2001 From: lbqds Date: Mon, 17 Jan 2022 21:44:27 +0800 Subject: [PATCH 4/5] Add enabled options --- README.md | 2 ++ config.json | 2 ++ lib/paymentProcessor.js | 10 ++++++---- lib/shareProcessor.js | 4 +++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index d9dae46..a0e0053 100644 --- a/README.md +++ b/README.md @@ -49,9 +49,11 @@ configs explanation: }, "withholdPercent": 0.005, // coinbase reward withhold percent(0.5% by default), used for tx fee mainly + "rewardEnabled": true, // enabled by default "rewardInterval": 600, // update miner balances every this many seconds "confirmationTime": 30600, // 510m by default, you can decrease this if your payment addresses have enough balance + "paymentEnabled": true, // enabled by default "minPaymentCoins": "3.5", // minimum number of coins that a miner must earn before sending payment "paymentInterval": 600, // send payment every this many seconds diff --git a/config.json b/config.json index 10151ed..1d5329e 100644 --- a/config.json +++ b/config.json @@ -44,9 +44,11 @@ }, "withholdPercent": 0.005, + "rewardEnabled": true, "rewardInterval": 600, "confirmationTime": 30600, + "paymentEnabled": true, "minPaymentCoins": "0.5", "paymentInterval": 3600, "txConfirmations": { diff --git a/lib/paymentProcessor.js b/lib/paymentProcessor.js index 8feeab4..6bfd971 100644 --- a/lib/paymentProcessor.js +++ b/lib/paymentProcessor.js @@ -578,10 +578,12 @@ var PaymentProcessor = module.exports = function PaymentProcessor(config, logger } this.start = function(){ - checkAddress(config.addresses); - loadPublicKey(config.wallet, function(){ - setTimeout(payment, config.paymentInterval * 1000); - }); + if (config.paymentEnabled){ + checkAddress(config.addresses); + loadPublicKey(config.wallet, function(){ + setTimeout(payment, config.paymentInterval * 1000); + }); + } } function loadPublicKey(walletConfig, callback){ diff --git a/lib/shareProcessor.js b/lib/shareProcessor.js index 0e74026..7037392 100644 --- a/lib/shareProcessor.js +++ b/lib/shareProcessor.js @@ -288,6 +288,8 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ } this.start = function(){ - setInterval(scanBlocks, config.rewardInterval * 1000); + if (config.rewardEnabled){ + setInterval(scanBlocks, config.rewardInterval * 1000); + } } } From 920d8b01e9454d51684b687557c6fa325a47e646 Mon Sep 17 00:00:00 2001 From: lbqds Date: Sat, 22 Jan 2022 11:50:49 +0800 Subject: [PATCH 5/5] Improve check duplicated shares --- lib/shareProcessor.js | 22 +++++++++++----------- test/shareProcessorTest.js | 37 ++++++++++++++++++++++--------------- 2 files changed, 33 insertions(+), 26 deletions(-) diff --git a/lib/shareProcessor.js b/lib/shareProcessor.js index 7037392..211e82c 100644 --- a/lib/shareProcessor.js +++ b/lib/shareProcessor.js @@ -6,6 +6,7 @@ const { Pool } = require('pg'); var ShareProcessor = module.exports = function ShareProcessor(config, logger){ var confirmationTime = config.confirmationTime * 1000; var rewardPercent = 1 - config.withholdPercent; + var shareExpiryPeriod = 15; var _this = this; this.redisClient = new Redis(config.redis.port, config.redis.host); @@ -86,39 +87,39 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ return fromGroup + ':' + toGroup + ':shares:' + blockHash; } - this.shareCacheKey = function(fromGroup, toGroup){ - return fromGroup + ':' + toGroup + ':sharecache'; - } - var pendingBlocksKey = 'pendingBlocks'; var foundBlocksKey = 'foundBlocks'; var hashrateKey = 'hashrate'; var balancesKey = 'balances'; + this.shareCacheKey = function(fromGroup, toGroup, hash){ + return fromGroup + ':' + toGroup + ':hashes:' + hash; + } + this._handleShare = function(share, callback){ var fromGroup = share.job.fromGroup; var toGroup = share.job.toGroup; var blockHash = share.blockHash; var currentRound = _this.currentRoundKey(fromGroup, toGroup); - var cacheKey = _this.shareCacheKey(fromGroup, toGroup); + var hashKey = _this.shareCacheKey(fromGroup, toGroup, blockHash); - _this.redisClient.sadd(cacheKey, blockHash, function(error, result){ + _this.redisClient.set(hashKey, true, 'EX', shareExpiryPeriod, 'NX', function(error, result){ if (error){ logger.error('Check share duplicated failed, error: ' + error); callback(error); return; } - if (result === 0){ - logger.error('Ignore duplicated share'); - callback('Duplicated share'); + if (result == null){ + logger.error('Ignore duplicated share, key: ' + hashKey); + callback('duplicated share'); return; } var redisTx = _this.redisClient.multi(); - var currentMs = Date.now(); redisTx.hincrbyfloat(currentRound, share.workerAddress, share.difficulty); + var currentMs = Date.now(); var currentTs = Math.floor(currentMs / 1000); redisTx.zadd(hashrateKey, currentTs, [fromGroup, toGroup, share.worker, share.difficulty, currentMs].join(':')); @@ -130,7 +131,6 @@ var ShareProcessor = module.exports = function ShareProcessor(config, logger){ redisTx.rename(currentRound, newKey); redisTx.sadd(pendingBlocksKey, blockWithTs); redisTx.hset(foundBlocksKey, blockHash, share.workerAddress) - redisTx.del(cacheKey); } redisTx.exec(function(error, _){ if (error) logger.error('Handle share failed, error: ' + error); diff --git a/test/shareProcessorTest.js b/test/shareProcessorTest.js index acf5d95..8acb452 100644 --- a/test/shareProcessorTest.js +++ b/test/shareProcessorTest.js @@ -32,37 +32,44 @@ describe('test share processor', function(){ var share1 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 1, workerAddress: 'miner1'}; var share2 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash2', difficulty: 2, workerAddress: 'miner1'}; - var invalidShare = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 3, workerAddress: 'miner1'}; + var invalidShare1 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 3, workerAddress: 'miner1'}; + var invalidShare2 = {job: {fromGroup: 0, toGroup: 1}, foundBlock: false, blockHash: 'hash1', difficulty: 3, workerAddress: 'miner2'}; - var checkState = function(cacheKey, roundKey, callback){ + var checkState = function(roundKey, callback){ redisClient .multi() .hget(roundKey, 'miner1') - .smembers(cacheKey) + .hget(roundKey, 'miner2') .exec(function(error, results){ if (error) assert.fail('Test failed: ' + error); - var difficulty = results[0][1]; - var shareHashes = results[1][1]; - callback(difficulty, shareHashes); + var difficulty1 = results[0][1]; + var difficulty2 = results[1][1]; + callback(difficulty1, difficulty2); }); }; - var cacheKey = shareProcessor.shareCacheKey(0, 1); var currentRoundKey = shareProcessor.currentRoundKey(0, 1); - util.executeForEach([share1, share2, invalidShare], (share, callback) => { + var key1 = shareProcessor.shareCacheKey(0, 1, share1.blockHash); + var key2 = shareProcessor.shareCacheKey(0, 1, share2.blockHash); + util.executeForEach([share1, share2, invalidShare1, invalidShare2], (share, callback) => { shareProcessor._handleShare(share, callback); }, function(){ - checkState(cacheKey, currentRoundKey, function(difficulty, shareHashes){ - expect(parseFloat(difficulty)).equal(share1.difficulty + share2.difficulty); - expect(shareHashes).to.deep.equal([share1.blockHash, share2.blockHash]); + checkState(currentRoundKey, function(diff1, diff2){ + expect(parseFloat(diff1)).equal(share1.difficulty + share2.difficulty); + expect(diff2).equal(null); var blockShare = {job: {fromGroup: 0, toGroup: 1}, foundBlock: true, blockHash: 'hash3', difficulty: 3, workerAddress: 'miner1'}; shareProcessor._handleShare(blockShare, function(){ var roundKey = shareProcessor.roundKey(0, 1, blockShare.blockHash); - checkState(cacheKey, roundKey, function(difficulty, shareHashes){ - expect(parseFloat(difficulty)).equal(share1.difficulty + share2.difficulty + blockShare.difficulty); - expect(shareHashes).to.deep.equal([]); - done(); + checkState(roundKey, function(diff1, diff2){ + expect(parseFloat(diff1)).equal(share1.difficulty + share2.difficulty + blockShare.difficulty); + expect(diff2).equal(null); + + redisClient.exists(key1, key2, function(error, result){ + if (error) assert.fail('Test failed: ' + error); + expect(result).equal(2); + done(); + }); }); }); });