Skip to content

Commit

Permalink
switch to new raceless routing event interface
Browse files Browse the repository at this point in the history
fixes #5616

License: MIT
Signed-off-by: Steven Allen <[email protected]>
  • Loading branch information
Stebalien committed Oct 24, 2018
1 parent c97c345 commit e41ac96
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,24 @@ var queryDhtCmd = &cmds.Command{
return
}

events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context(), events)

id, err := peer.IDB58Decode(req.Arguments()[0])
if err != nil {
res.SetError(cmds.ClientError("invalid peer ID"), cmdkit.ErrClient)
return
}

ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx)

closestPeers, err := n.DHT.GetClosestPeers(ctx, string(id))
if err != nil {
cancel()
res.SetError(err, cmdkit.ErrNormal)
return
}

go func() {
defer close(events)
defer cancel()
for p := range closestPeers {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
ID: p,
Expand Down Expand Up @@ -182,8 +183,6 @@ var findProvidersDhtCmd = &cmds.Command{
return
}

events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context(), events)
c, err := cid.Parse(req.Arguments()[0])

if err != nil {
Expand All @@ -194,6 +193,9 @@ var findProvidersDhtCmd = &cmds.Command{
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))

ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx)

pchan := n.Routing.FindProvidersAsync(ctx, c, numProviders)
go func() {
defer close(outChan)
Expand All @@ -207,7 +209,7 @@ var findProvidersDhtCmd = &cmds.Command{
}()

go func() {
defer close(events)
defer cancel()
for p := range pchan {
np := p
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Expand Down Expand Up @@ -320,8 +322,8 @@ var provideRefDhtCmd = &cmds.Command{
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))

events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context(), events)
ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx)

go func() {
defer close(outChan)
Expand All @@ -335,7 +337,7 @@ var provideRefDhtCmd = &cmds.Command{
}()

go func() {
defer close(events)
defer cancel()
var err error
if rec {
err = provideKeysRec(ctx, n.Routing, n.DAG, cids)
Expand Down Expand Up @@ -449,8 +451,8 @@ var findPeerDhtCmd = &cmds.Command{
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))

events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context(), events)
ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx)

go func() {
defer close(outChan)
Expand All @@ -464,7 +466,7 @@ var findPeerDhtCmd = &cmds.Command{
}()

go func() {
defer close(events)
defer cancel()
pi, err := n.Routing.FindPeer(ctx, pid)
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Expand Down Expand Up @@ -554,8 +556,8 @@ Different key types can specify other 'best' rules.
outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))

events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context(), events)
ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx)

go func() {
defer close(outChan)
Expand All @@ -568,7 +570,7 @@ Different key types can specify other 'best' rules.
}()

go func() {
defer close(events)
defer cancel()
val, err := n.Routing.GetValue(ctx, dhtkey)
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Expand Down Expand Up @@ -659,9 +661,6 @@ NOTE: A value may not exceed 2048 bytes.
return
}

events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context(), events)

key, err := escapeDhtKey(req.Arguments()[0])
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
Expand All @@ -673,6 +672,9 @@ NOTE: A value may not exceed 2048 bytes.

data := req.Arguments()[1]

ctx, cancel := context.WithCancel(req.Context())
ctx, events := notif.RegisterForQueryEvents(ctx)

go func() {
defer close(outChan)
for e := range events {
Expand All @@ -685,7 +687,7 @@ NOTE: A value may not exceed 2048 bytes.
}()

go func() {
defer close(events)
defer cancel()
err := n.Routing.PutValue(ctx, key, []byte(data))
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Expand Down

0 comments on commit e41ac96

Please sign in to comment.