Skip to content

Commit

Permalink
pin: follow async pinner changes
Browse files Browse the repository at this point in the history
See ipfs/boxo#290

This PR follow the changes in the Pinner to make listing recursive and direct pins asynchronous, which in turns allow pin/ls to build and emit results without having to wait anything, or accumulate too much in memory.

Note: there is a tradeoff for pin/ls?type=all:
- keep the recursive pins in memory (which I chose)
- ask the pinner twice for the recursive pins, and limit memory usage

Also, follow the changes in the GC with similar benefit of not having to wait the full pin list. Add a test.
Also, follow the changes in pin.Verify.
  • Loading branch information
MichaelMure committed May 5, 2023
1 parent a6f446a commit 27dab23
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 96 deletions.
14 changes: 7 additions & 7 deletions core/commands/pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,10 +675,6 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins, err := n.Pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}

var checkPin func(root cid.Cid) PinStatus
checkPin = func(root cid.Cid) PinStatus {
Expand Down Expand Up @@ -722,11 +718,15 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci
out := make(chan interface{})
go func() {
defer close(out)
for _, cid := range recPins {
pinStatus := checkPin(cid)
for p := range n.Pinning.RecursiveKeys(ctx) {
if p.Err != nil {
out <- p.Err
return
}
pinStatus := checkPin(p.C)
if !pinStatus.Ok || opts.includeOk {
select {
case out <- &PinVerifyRes{enc.Encode(cid), pinStatus}:
case out <- &PinVerifyRes{enc.Encode(p.C), pinStatus}:
case <-ctx.Done():
return
}
Expand Down
132 changes: 73 additions & 59 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
"github.com/ipfs/boxo/ipld/merkledag"
pin "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/go-cid"
"github.com/ipfs/kubo/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/kubo/tracing"
)

type PinAPI CoreAPI
Expand Down Expand Up @@ -156,6 +157,7 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt
}

type pinStatus struct {
err error
cid cid.Cid
ok bool
badNodes []coreiface.BadPinNode
Expand All @@ -175,6 +177,10 @@ func (s *pinStatus) BadNodes() []coreiface.BadPinNode {
return s.badNodes
}

func (s *pinStatus) Err() error {
return s.err
}

func (n *badNode) Path() path.Resolved {
return n.path
}
Expand All @@ -191,10 +197,6 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
bs := api.blockstore
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := merkledag.GetLinksWithDAG(DAG)
recPins, err := api.pinning.RecursiveKeys(ctx)
if err != nil {
return nil, err
}

var checkPin func(root cid.Cid) *pinStatus
checkPin = func(root cid.Cid) *pinStatus {
Expand Down Expand Up @@ -229,8 +231,18 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
out := make(chan coreiface.PinStatus)
go func() {
defer close(out)
for _, c := range recPins {
out <- checkPin(c)
for p := range api.pinning.RecursiveKeys(ctx) {
var res *pinStatus
if p.Err != nil {
res = &pinStatus{err: p.Err}
} else {
res = checkPin(p.C)
}
select {
case <-ctx.Done():
return
case out <- res:
}
}
}()

Expand Down Expand Up @@ -262,71 +274,68 @@ func (p *pinInfo) Err() error {
func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreiface.Pin {
out := make(chan coreiface.Pin, 1)

keys := cid.NewSet()

AddToResultKeys := func(keyList []cid.Cid, typeStr string) error {
for _, c := range keyList {
if keys.Visit(c) {
select {
case out <- &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
}:
case <-ctx.Done():
return ctx.Err()
}
emittedSet := cid.NewSet()

AddToResultKeys := func(c cid.Cid, typeStr string) error {
if emittedSet.Visit(c) {
select {
case out <- &pinInfo{
pinType: typeStr,
path: path.IpldPath(c),
}:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

VisitKeys := func(keyList []cid.Cid) {
for _, c := range keyList {
keys.Visit(c)
}
}

go func() {
defer close(out)

var dkeys, rkeys []cid.Cid
var rkeys []cid.Cid
var err error
if typeStr == "recursive" || typeStr == "all" {
rkeys, err = api.pinning.RecursiveKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
}
if err = AddToResultKeys(rkeys, "recursive"); err != nil {
out <- &pinInfo{err: err}
return
for streamedCid := range api.pinning.RecursiveKeys(ctx) {
if streamedCid.Err != nil {
out <- &pinInfo{err: streamedCid.Err}
return
}
if err = AddToResultKeys(streamedCid.C, "recursive"); err != nil {
out <- &pinInfo{err: err}
return
}
}
}
if typeStr == "direct" || typeStr == "all" {
dkeys, err = api.pinning.DirectKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
}
if err = AddToResultKeys(dkeys, "direct"); err != nil {
out <- &pinInfo{err: err}
return
for streamedCid := range api.pinning.DirectKeys(ctx) {
if streamedCid.Err != nil {
out <- &pinInfo{err: streamedCid.Err}
return
}
if err = AddToResultKeys(streamedCid.C, "direct"); err != nil {
out <- &pinInfo{err: err}
return
}
}
}
if typeStr == "all" {
set := cid.NewSet()
walkingSet := cid.NewSet()
for _, k := range rkeys {
err = merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
walkingSet.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
out <- &pinInfo{err: err}
return
}
}
if err = AddToResultKeys(set.Keys(), "indirect"); err != nil {
err = walkingSet.ForEach(func(c cid.Cid) error {
return AddToResultKeys(c, "indirect")
})
if err != nil {
out <- &pinInfo{err: err}
return
}
Expand All @@ -335,33 +344,38 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac
// We need to first visit the direct pins that have priority
// without emitting them

dkeys, err = api.pinning.DirectKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
for streamedCid := range api.pinning.DirectKeys(ctx) {
if streamedCid.Err != nil {
out <- &pinInfo{err: streamedCid.Err}
return
}
emittedSet.Add(streamedCid.C)
}
VisitKeys(dkeys)

rkeys, err = api.pinning.RecursiveKeys(ctx)
if err != nil {
out <- &pinInfo{err: err}
return
for streamedCid := range api.pinning.RecursiveKeys(ctx) {
if streamedCid.Err != nil {
out <- &pinInfo{err: streamedCid.Err}
return
}
emittedSet.Add(streamedCid.C)
}
VisitKeys(rkeys)

set := cid.NewSet()
walkingSet := cid.NewSet()
for _, k := range rkeys {
err = merkledag.Walk(
ctx, merkledag.GetLinksWithDAG(api.dag), k,
set.Visit,
walkingSet.Visit,
merkledag.SkipRoot(), merkledag.Concurrent(),
)
if err != nil {
out <- &pinInfo{err: err}
return
}
}
if err = AddToResultKeys(set.Keys(), "indirect"); err != nil {
err = emittedSet.ForEach(func(c cid.Cid) error {
return AddToResultKeys(c, "indirect")
})
if err != nil {
out <- &pinInfo{err: err}
return
}
Expand Down
71 changes: 43 additions & 28 deletions gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn
// Descendants recursively finds all the descendants of the given roots and
// adds them to the given cid.Set, using the provided dag.GetLinks function
// to walk the tree.
func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots []cid.Cid) error {
func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots <-chan pin.StreamedCid) error {
verifyGetLinks := func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
err := verifcid.ValidateCid(c)
if err != nil {
Expand All @@ -167,27 +167,37 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots
verboseCidError := func(err error) error {
if strings.Contains(err.Error(), verifcid.ErrBelowMinimumHashLength.Error()) ||
strings.Contains(err.Error(), verifcid.ErrPossiblyInsecureHashFunction.Error()) {
err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ //nolint
err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ // nolint
" to list insecure hashes. If you want to read them,"+
" please downgrade your go-ipfs to 0.4.13\n", err)
log.Error(err)
}
return err
}

for _, c := range roots {
// Walk recursively walks the dag and adds the keys to the given set
err := dag.Walk(ctx, verifyGetLinks, c, func(k cid.Cid) bool {
return set.Visit(toCidV1(k))
}, dag.Concurrent())
for {
select {
case <-ctx.Done():
return ctx.Err()
case wrapper, ok := <-roots:
if !ok {
return nil
}
if wrapper.Err != nil {
return wrapper.Err
}

if err != nil {
err = verboseCidError(err)
return err
// Walk recursively walks the dag and adds the keys to the given set
err := dag.Walk(ctx, verifyGetLinks, wrapper.C, func(k cid.Cid) bool {
return set.Visit(toCidV1(k))
}, dag.Concurrent())

if err != nil {
err = verboseCidError(err)
return err
}
}
}

return nil
}

// toCidV1 converts any CIDv0s to CIDv1s.
Expand Down Expand Up @@ -217,11 +227,8 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
return links, nil
}
rkeys, err := pn.RecursiveKeys(ctx)
if err != nil {
return nil, err
}
err = Descendants(ctx, getLinks, gcs, rkeys)
rkeys := pn.RecursiveKeys(ctx)
err := Descendants(ctx, getLinks, gcs, rkeys)
if err != nil {
errors = true
select {
Expand All @@ -243,7 +250,18 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
return links, nil
}
err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRoots)
bestEffortRootsChan := make(chan pin.StreamedCid)
go func() {
defer close(bestEffortRootsChan)
for _, root := range bestEffortRoots {
select {
case <-ctx.Done():
return
case bestEffortRootsChan <- pin.StreamedCid{C: root}:
}
}
}()
err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRootsChan)
if err != nil {
errors = true
select {
Expand All @@ -253,18 +271,15 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo
}
}

dkeys, err := pn.DirectKeys(ctx)
if err != nil {
return nil, err
}
for _, k := range dkeys {
gcs.Add(toCidV1(k))
dkeys := pn.DirectKeys(ctx)
for k := range dkeys {
if k.Err != nil {
return nil, k.Err
}
gcs.Add(toCidV1(k.C))
}

ikeys, err := pn.InternalPins(ctx)
if err != nil {
return nil, err
}
ikeys := pn.InternalPins(ctx)
err = Descendants(ctx, getLinks, gcs, ikeys)
if err != nil {
errors = true
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
module github.com/ipfs/kubo

// https://github.com/ipfs/boxo/pull/290
replace github.com/ipfs/boxo => github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f

require (
bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc
contrib.go.opencensus.io/exporter/prometheus v0.4.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/Kubuxu/go-os-helper v0.0.1 h1:EJiD2VUQyh5A9hWJLmc6iWg6yIcJ7jpBcwC8GMGXfDk=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f h1:2UbpOJ6cIC43V/hIDxgvP0VLbJIk+cBofPAWmXBlSrg=
github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
Expand Down Expand Up @@ -356,8 +358,6 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866 h1:ThRTXD/EyoLb/jz+YW+ZlOLbjX9FyaxP0dEpgUp3cCE=
github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=
Expand Down

0 comments on commit 27dab23

Please sign in to comment.