Skip to content

Commit

Permalink
Expose basic text-based datamodel selector on retrieval
Browse files Browse the repository at this point in the history
Syntaxt of selection is located at
https://pkg.go.dev/github.com/ipld/go-ipld-selector-text-lite#SelectorSpecFromPath

Example use, assuming that:
- The root of the deal is a plain dag-pb unixfs directory
- The directory is not sharded
- The user wants to retrieve the first entry in that directory

lotus client retrieve --miner f0XXXXX --datamodel-path-selector 'Links/0/Hash' bafyROOTCID ~/output
  • Loading branch information
ribasushi committed Jun 4, 2021
1 parent 8156198 commit d539169
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 9 deletions.
8 changes: 5 additions & 3 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ipfs/go-cid"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -896,9 +897,10 @@ type MarketDeal struct {

type RetrievalOrder struct {
// TODO: make this less unixfs specific
Root cid.Cid
Piece *cid.Cid
Size uint64
Root cid.Cid
Piece *cid.Cid
DatamodelPathSelector *textselector.Expression
Size uint64

LocalStore *multistore.StoreID // if specified, get data from local store
// TODO: support offset
Expand Down
9 changes: 9 additions & 0 deletions cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -1044,6 +1045,10 @@ var clientRetrieveCmd = &cli.Command{
Name: "miner",
Usage: "miner address for retrieval, if not present it'll use local discovery",
},
&cli.StringFlag{
Name: "datamodel-path-selector",
Usage: "a rudimentary (DM-level-only) text-path selector, allowing for sub-selection within a deal",
},
&cli.StringFlag{
Name: "maxPrice",
Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %s FIL)", DefaultMaxRetrievePrice),
Expand Down Expand Up @@ -1179,6 +1184,10 @@ var clientRetrieveCmd = &cli.Command{
IsCAR: cctx.Bool("car"),
}

if sel := textselector.Expression(cctx.String("datamodel-path-selector")); sel != "" {
order.DatamodelPathSelector = &sel
}

updates, err := fapi.ClientRetrieveWithEvents(ctx, *order, ref)
if err != nil {
return xerrors.Errorf("error setting up retrieval: %w", err)
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ require (
github.com/ipfs/go-unixfs v0.2.4
github.com/ipfs/interface-go-ipfs-core v0.2.3
github.com/ipld/go-car v0.3.1-0.20210601190600-f512dac51e8e
github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db
github.com/ipld/go-codec-dagpb v1.2.0
github.com/ipld/go-ipld-prime v0.10.0
github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210603153726-3feaa7573d47
github.com/kelseyhightower/envconfig v1.4.0
github.com/lib/pq v1.7.0
github.com/libp2p/go-buffer-pool v0.0.2
Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -735,11 +735,14 @@ github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e/go.mod h1:uVI
github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db h1:kFwGn8rXa/Z31ev1OFNQsYeNKNCdifnTPl/NvPy5L38=
github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.10.0 h1:ZCd52SDUqvA3YUJEx9v2uIm1qWv6FAxBt2mhiFpoZ6s=
github.com/ipld/go-ipld-prime v0.10.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs=
github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0=
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210603153726-3feaa7573d47 h1:Dy6oM0nWaolcCLAxgTGge59SaqDC2b4WgoMUN9fmW8E=
github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210603153726-3feaa7573d47/go.mod h1:LOfTTmw0lAwWTfPMrL8QZv4vgaOAhzo/0GWl0K51oTU=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
github.com/jackpal/gateway v1.0.4/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
Expand Down
96 changes: 96 additions & 0 deletions markets/utils/selectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package utils

import (
"bytes"
"context"
"fmt"
"io"

// must be imported to init() raw-codec support
_ "github.com/ipld/go-ipld-prime/codec/raw"

"github.com/ipfs/go-cid"
mdagipld "github.com/ipfs/go-ipld-format"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
)

func TraverseDag(
ctx context.Context,
ds mdagipld.DAGService,
startFrom cid.Cid,
optionalSelector ipld.Node,
visitCallback traversal.AdvVisitFn,
) error {

// If no selector is given - use *.*
// See discusion at https://github.com/ipld/go-ipld-prime/issues/171
if optionalSelector == nil {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
optionalSelector = ssb.ExploreRecursive(
selector.RecursionLimitNone(),
ssb.ExploreUnion(
ssb.Matcher(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
),
).Node()
}

parsedSelector, err := selector.ParseSelector(optionalSelector)
if err != nil {
return err
}

// not sure what this is for TBH...
linkContext := ipld.LinkContext{Ctx: ctx}

// this is what allows us to understand dagpb
nodePrototypeChooser := dagpb.AddSupportToChooser(
func(ipld.Link, ipld.LinkContext) (ipld.NodePrototype, error) {
return basicnode.Prototype.Any, nil
},
)

// this is how we implement GETs
linkSystem := cidlink.DefaultLinkSystem()
linkSystem.StorageReadOpener = func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
if cl, isCid := lnk.(cidlink.Link); !isCid {
return nil, fmt.Errorf("unexpected link type %#v", lnk)
} else {
node, err := ds.Get(context.TODO(), cl.Cid)
if err != nil {
return nil, err
}
return bytes.NewBuffer(node.RawData()), nil
}
}

// this is how we pull the start node out of the DS
startLink := cidlink.Link{Cid: startFrom}
startNodePrototype, err := nodePrototypeChooser(startLink, linkContext)
if err != nil {
return err
}
startNode, err := linkSystem.Load(
linkContext,
startLink,
startNodePrototype,
)
if err != nil {
return err
}

// this is the actual execution, invoking the supplied callback
return traversal.Progress{
Cfg: &traversal.Config{
Ctx: ctx,
LinkSystem: linkSystem,
LinkTargetNodePrototypeChooser: nodePrototypeChooser,
},
}.WalkAdv(startNode, parsedSelector, visitCallback)
}
57 changes: 53 additions & 4 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,19 @@ import (
offline "github.com/ipfs/go-ipfs-exchange-offline"
files "github.com/ipfs/go-ipfs-files"
mdagipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipld/go-car"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multibase"
Expand All @@ -43,7 +48,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/discovery"
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-multistore"
Expand All @@ -64,6 +68,8 @@ import (
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
)

var log = logging.Logger("client")

var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)

const dealStartBufferHours uint64 = 49
Expand Down Expand Up @@ -728,9 +734,27 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return err
}*/

// FIXME - this is a direct copy from https://github.com/filecoin-project/go-fil-markets/blob/v1.4.0/shared/selectors.go#L11-L16
// Unable to use it because we need the SelectorSpec, and markets exposes just a reified node
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
selspec := ssb.ExploreRecursive(
selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
)

if order.DatamodelPathSelector != nil {
var err error
selspec, err = textselector.SelectorSpecFromPath(*order.DatamodelPathSelector, selspec)
if err != nil {
finish(xerrors.Errorf("failed to parse selector '%s': %w", *order.DatamodelPathSelector, err))
return
}
log.Infof("partial retrieval of datamodel-path-selector %s/*", *order.DatamodelPathSelector)
}

ppb := types.BigDiv(order.Total, types.NewInt(order.Size))

params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, selspec.Node(), order.Piece, order.UnsealPrice)
if err != nil {
finish(xerrors.Errorf("Error in retrieval params: %s", err))
return
Expand Down Expand Up @@ -805,13 +829,38 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref

rdag := store.DAGService()

root := order.Root
if order.DatamodelPathSelector != nil {
// no err check - we just compiled this before starting, but now we do not wrap a `*`
selspec, _ := textselector.SelectorSpecFromPath(*order.DatamodelPathSelector, nil) //nolint:errcheck
if err := utils.TraverseDag(
ctx,
rdag,
root,
selspec.Node(),
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
if !castOK {
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link.String())
}
root = cidLnk.Cid
}
return nil
},
); err != nil {
finish(xerrors.Errorf("Finding partial retrieval sub-root: %w", err))
return
}
}

if ref.IsCAR {
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
finish(err)
return
}
err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f)
err = car.WriteCar(ctx, rdag, []cid.Cid{root}, f)
if err != nil {
finish(err)
return
Expand All @@ -820,7 +869,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}

nd, err := rdag.Get(ctx, order.Root)
nd, err := rdag.Get(ctx, root)
if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
Expand Down

0 comments on commit d539169

Please sign in to comment.