Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Add miner lookup using DHT #2

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open

Add miner lookup using DHT #2

wants to merge 10 commits into from

Conversation

kasteph
Copy link

@kasteph kasteph commented Mar 24, 2022

This PR does the following:

  • Adds the column source to the locations.miners table
  • Uses the Filecoin kad-dht to find miners
  • Adds a time-bound crawl (currently, 5 minutes)

I would like feedback particularly on these lines:

  • Does it make sense to use most recent height from the locations.miners table to fill in the heights of miners found via DHT for now?
  • Finding the miner ID is difficult without a state tree. Should I pull that in, anyway? Because with the current method, we will not know with certainty whether this IP address is accounted for.

@kasteph kasteph requested a review from hsanjuan March 24, 2022 12:52
Copy link
Contributor

@hsanjuan hsanjuan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left lots of small nitpick comments, but the last is the one that explains how this approach does not work.

crawl.go Outdated Show resolved Hide resolved
crawl.go Outdated Show resolved Hide resolved
crawl.go Outdated Show resolved Hide resolved
crawl.go Outdated Show resolved Hide resolved
crawl.go Outdated Show resolved Hide resolved
main.go Outdated Show resolved Hide resolved
main.go Outdated Show resolved Hide resolved
main.go Outdated Show resolved Hide resolved
main.go Outdated Show resolved Hide resolved
main.go Outdated Show resolved Hide resolved
@kasteph kasteph requested a review from hsanjuan April 1, 2022 01:17
Copy link
Contributor

@hsanjuan hsanjuan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, this is much better and clearer than the previous version.

My main comment would be to limit the number of parallel queries to the DHT.

main.go Outdated Show resolved Hide resolved
peer.go Outdated Show resolved Hide resolved
main.go Outdated
}

func getActiveMiners(db *pg.DB, fp *filecoinPeer) (<-chan minerInfo, error) {
go fp.bootstrap()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this function already does the connecting in the background and returns, so we shouldn't need to launch it as goroutine. It might be desirable to wait for it to finish connecting to bootstrappers before making dht queries though.

peer.go Outdated Show resolved Hide resolved
main.go Outdated Show resolved Hide resolved
i, activeMiner := i, activeMiner
wg.Add(1)

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am guessing that if we have 2000 active miners this is going to launch 2000 goroutines which are going to launch 2000 parallel dht queries.

Might be better to send that list of peer IDs to a channel and launch 50 workers to read from it and process each peer ID, so that we don't have more than 50 queries on flight.

}
found = append(found, f)
}()
}

go func() {
wg.Wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waiting in a goroutine makes the waiting useless as nothing blocks or is meant to happen after waiting.

Comment on lines 209 to 230
var (
found = make([]minerInfo, len(activeMiners))
peerIDs = make(chan minerWithDecodedPeerID, 50)
wg = sync.WaitGroup{}
maxWorkers = runtime.GOMAXPROCS(0)
sem = semaphore.NewWeighted(int64(maxWorkers))
)
for i := range activeMiners {

for i, activeMiner := range activeMiners {
i, activeMiner := i, activeMiner
wg.Add(1)
miner := activeMiners[i]

decodedPid, err := peer.Decode(miner.PeerID)
if err != nil {
log.Warnw("could not decode peer ID ", "ID", miner.PeerID, "error", err)
}
peerIDs <- minerWithDecodedPeerID{miner: miner, decodedPeerID: decodedPid}

wg.Add(1)
go func() {
defer wg.Done()

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(15*time.Second))
defer cancel()

decodedPid, err := peer.Decode(activeMiner.PeerID)
if err != nil {
log.Warnw("could not decode peer ID ", "ID", activeMiner.PeerID, "error", err)
return
if err := sem.Acquire(fp.ctx, 1); err != nil {
log.Warnw("Failed to acquire semaphore: ", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably not a good pattern (I have not seen anything like this anywhere). It is overly complex in that it uses all a channel, a semaphore and a waitgroup, when a worker pattern can be solved with a channel.

It might work, in the sense that only 50 things are querying the DHT, but there will be also 50 items in the channel (I guess) and 50 goroutines waiting to access the semaphore (those that managed to put something in the channel), and another 50 executing (those that managed to Acquire the semaphore). Overly complex.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants