Skip to content

Commit

Permalink
adding startappeal function and concurrent limit #63
Browse files Browse the repository at this point in the history
  • Loading branch information
turinglabsorg committed Jul 14, 2022
1 parent 4266d49 commit 31696ad
Showing 1 changed file with 122 additions and 29 deletions.
151 changes: 122 additions & 29 deletions referee-cli/src/libs/fn.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
const axios = require('axios');
let appealsProcessed = []
let appealCache = []
let appealsProcessing = {}
let bootstrapped = []
let processed = {}
const MAX_CONCURRENT_APPEALS = 20
let CONCURRENT_APPEALS = 0

const ipfs = (node, ...args) => {
node.runIpfsNativeCommand(args.join(' '))
}
Expand Down Expand Up @@ -67,16 +71,17 @@ const processappeal = async (node, index) => {
const deal = await contract.deals(appeal.deal_index)
console.log("Deal is:", deal)
if (leader.toUpperCase() === wallet.address.toUpperCase()) {
console.log("Provider is:", deal.accepted)
const provider = await contract.providers(deal.accepted)
const ownerOf = await contract.ownerOf(appeal.deal_index)
console.log("Provider is:", ownerOf)
const provider = await contract.providers(ownerOf)
console.log("Asking file to provider at", provider.endpoint)
const retrieved = await retrievefile(provider.endpoint, deal.ipfs_hash)
console.log("Processing appeal as leader.")
if (retrieved === false) {
console.log("Slashing provider on-chain for appeal " + index + "..")
try {
const slash = await contract.processAppeal(appeal.deal_index, [], [])
console.log('Pending transaction at: ' + slash.hash)
console.log('Pending transactiofn at: ' + slash.hash)
await slash.wait()
console.log("Provider successfully slashed.")
// Sending slashed message to peers
Expand Down Expand Up @@ -120,8 +125,9 @@ const processappeal = async (node, index) => {
console.log("Leader is:", leader, "waiting for " + halt_time + " seconds before try to retrieve file..")
appealsProcessing[index] = setTimeout(async function () {
if (appealsProcessed.indexOf(index.toString()) === -1) {
console.log("Provider is:", deal.accepted)
const provider = await contract.providers(deal.accepted)
const ownerOf = await contract.ownerOf(appeal.deal_index)
console.log("Provider is:", ownerOf)
const provider = await contract.providers(ownerOf)
console.log("Asking file to provider at", provider.endpoint)
const retrieved = await retrievefile(provider.endpoint, deal.ipfs_hash)
if (retrieved === false) {
Expand Down Expand Up @@ -170,9 +176,29 @@ const processappeal = async (node, index) => {
}, halt_time)
}
} else {
console.log("Appeal #" + index + " expired, caching.")
console.log("Appeal #" + index + " terminated, caching.")
appealsProcessed.push(index.toString())
CONCURRENT_APPEALS--
}
} else {
console.log("Appeal already processed, ignoring.")
}
}

const startappeal = async (node, index) => {
if (CONCURRENT_APPEALS < MAX_CONCURRENT_APPEALS) {
console.log('Starting appeal #' + index + '..')
const { contract, wallet, ethers } = await node.contract()
try {
await contract.startAppeal(index)
CONCURRENT_APPEALS++
} catch (e) {
console.log(e)
console.log("Can't start appeal, probably already started..")
}
} else {
console.log("Adding appeal to cache, will pick up later")
appealCache.push(index)
}
}

Expand Down Expand Up @@ -213,50 +239,85 @@ const parseslash = async (node, raw) => {
console.log("File wasn't retrieved correctly, referee lies!")
}
appealsProcessed.push(slash.appeal.toString())
clearTimeout(appealsProcessing[slash.index])
clearTimeout(appealsProcessing[slash.appeal])
} else {
// Adding signature to collected one
if (verified.toUpperCase() === leader.toUpperCase()) {
console.log("Received a slash message from leader, don't need to do nothing..")
appealsProcessed.push(slash.appeal.toString())
} else {
processed[slash.appeal][round].signatures.push({
signature: slash.signature,
referee: verified
})
if (slash.signature !== undefined) {
let sigFound = false
console.log("Collecting signature..")
for (let f in processed[slash.appeal][round].signatures) {
console.log(processed[slash.appeal][round].signatures[f].signature, slash.signature)
if (processed[slash.appeal][round].signatures[f].signature === slash.signature) {
sigFound = true
}
}
if (!sigFound) {
processed[slash.appeal][round].signatures.push({
signature: slash.signature,
referee: verified
})
}
}
const retrieved = await retrievefile(leaderDetails.endpoint, deal.ipfs_hash)
if (retrieved) {
console.log("File was retrieved correctly!")
} else {
console.log("File wasn't retrieved correctly, adding signature and try slash!")
const prefix = await contract.getPrefix(slash.index)
const prefix = await contract.getPrefix(slash.appeal)
// Create hashed version of message
const message = ethers.utils.arrayify(prefix)
const hashedMessage = await ethers.utils.hashMessage(message)
console.log('Hashed message:', hashedMessage)
// Sign message
const signature = await wallet.signMessage(message)
// Run double check
const verified = await contract.verifyRefereeSignature(signature, slash.index, wallet.address)
const verified = await contract.verifyRefereeSignature(signature, slash.appeal, wallet.address)
if (verified) {
console.log("Referee signature verified correctly.")
processed[slash.appeal][round].signatures.push({
signature: signature,
referee: wallet.address
})
const threshold = await contract.refereeConsensusThreshold()
if (processed[slash.appeal][round].signatures.length >= threshold) {
let referees_addresses = []
let referees_signatures = []
for (let j in processed[slash.appeal][round].signatures) {
referees_addresses.push(processed[slash.appeal][round][j].signatures.referee)
referees_signatures.push(processed[slash.appeal][round][j].signatures.signature)
const threshold = parseInt((await contract.refereeConsensusThreshold()).toString())
console.log("Network threshold is:", threshold)
let referees_addresses = []
let referees_signatures = []
let unique = []
for (let j in processed[slash.appeal][round].signatures) {
if (unique.indexOf(processed[slash.appeal][round].signatures[j].referee.toLowerCase()) === -1) {
unique.push(processed[slash.appeal][round].signatures[j].referee.toLowerCase())
try {
const verified = await contract.verifyRefereeSignature(processed[slash.appeal][round].signatures[j].signature, slash.appeal, processed[slash.appeal][round].signatures[j].referee.toLowerCase())
console.log("Is signature valid?", verified)
if (verified) {
referees_addresses.push(processed[slash.appeal][round].signatures[j].referee.toLowerCase())
referees_signatures.push(processed[slash.appeal][round].signatures[j].signature)
}
} catch (e) {
console.log("Can't verify signature from:", processed[slash.appeal][round].signatures[j].referee.toLowerCase())
}
}
}
let parsedSignaturesCount = referees_signatures.length * 100
console.log("Collected signatures:", parsedSignaturesCount)
if (parsedSignaturesCount >= threshold) {
console.log("Collected enough signatures, trying slash.")
console.log("Slashing provider because leader didn't and collected enough signatures.")
const slashTransaction = await contract.processAppeal(appeal.deal_index, referees_addresses, referees_signatures)
console.log('Pending transaction at: ' + slash.hash)
await slashTransaction.wait()
console.log("Provider successfully slashed.")
appealsProcessed.push(slash.appeal.toString())
try {
const slashTransaction = await contract.processAppeal(appeal.deal_index, referees_addresses, referees_signatures)
console.log('Pending transaction at: ' + slashTransaction.hash)
await slashTransaction.wait()
console.log("Provider successfully slashed.")
appealsProcessed.push(slash.appeal.toString())
} catch (e) {
console.log("Error while slashing provider, probably round processed yet..")
}
} else {
console.log("Don't have enough signatures, can't slash..")
}
}
}
Expand Down Expand Up @@ -288,21 +349,42 @@ const setuplisteners = (node) => {
}
for (let k in global['servers']) {
const peer = global['servers'][k]
if (bootstrapped.indexOf(k) === -1) {
if (bootstrapped.indexOf(k) === -1 && peer !== undefined) {
peer.on('slash', async function (raw) {
parseslash(node, raw)
})
}
}
}

const processcache = (node) => {
if (appealCache.length > 0) {
let temp = []
for (let k in appealCache) {
const appealIndex = appealCache[k]
if (CONCURRENT_APPEALS < MAX_CONCURRENT_APPEALS) {
startappeal(node, appealIndex)
} else {
temp.push(appealIndex)
}
}
appealCache = temp
}
}

const returnappeals = async (node) => {
const { contract, wallet, ethers } = await node.contract()
const filter = await contract.filters.AppealCreated()
const appealsEvents = await contract.queryFilter(filter)
return appealsEvents
}

const returnappeal = async (node, appealIndex) => {
const { contract, wallet, ethers } = await node.contract()
const appeal = await contract.appeals(appealIndex)
return appeal
}

// Bootstrap referee listeners
async function bootstrap(node) {
if (Object.keys(global['servers']).length > 0 || Object.keys(global['clients']).length > 0) {
Expand All @@ -325,18 +407,29 @@ const daemon = async (node) => {
const appealEvent = appealsEvents[k]
const appealIndex = appealEvent.args.index
// TODO: Be sure deal is started, if not started startappeal first
const appeal = returnappeal(appealIndex)
const appeal = await returnappeal(node, appealIndex)
if (appeal.origin_timestamp > 0) {
console.log("Appeal #" + appealIndex + " already started, processing..")
processappeal(node, appealIndex)
} else {
startappeal(appealIndex)
console.log("Appeal #" + appealIndex + " not started, starting..")
startappeal(node, appealIndex)
}
}
// Listen for appeals in contract
contract.on("AppealCreated", (index, provider, ipfs_hash) => {
contract.on("AppealCreated", (index) => {
console.log("New appeal created, processing..")
startappeal(node, index)
})
// Listen for appeals in contract
contract.on("AppealStarted", (index) => {
console.log("New appeal started, processing..")
processappeal(node, index)
})
// Setting up timer to flush the cache
setInterval(function () {
processcache(node)
}, 20000)
}

module.exports = { setuplisteners, getidentity, ipfs, sendmessage, withdraw, getbalance, daemon }

0 comments on commit 31696ad

Please sign in to comment.