Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

routing/http: feat: add streaming support #18

Merged
merged 10 commits into from
May 10, 2023
143 changes: 111 additions & 32 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"encoding/json"
"errors"
"fmt"
"mime"
"net/http"
"strings"
"time"

"github.com/benbjohnson/clock"
Expand All @@ -15,6 +17,9 @@ import (
"github.com/ipfs/boxo/routing/http/internal/drjson"
"github.com/ipfs/boxo/routing/http/server"
"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
jsontypes "github.com/ipfs/boxo/routing/http/types/json"
"github.com/ipfs/boxo/routing/http/types/ndjson"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
record "github.com/libp2p/go-libp2p-record"
Expand All @@ -23,14 +28,31 @@ import (
"github.com/multiformats/go-multiaddr"
)

var logger = logging.Logger("service/delegatedrouting")
var (
_ contentrouter.Client = &client{}
logger = logging.Logger("service/delegatedrouting")
defaultHTTPClient = &http.Client{
Transport: &ResponseBodyLimitedTransport{
RoundTripper: http.DefaultTransport,
LimitBytes: 1 << 20,
UserAgent: defaultUserAgent,
},
}
)

const (
mediaTypeJSON = "application/json"
mediaTypeNDJSON = "application/x-ndjson"
)

type client struct {
baseURL string
httpClient httpClient
validator record.Validator
clock clock.Clock

accepts string

peerID peer.ID
addrs []types.Multiaddr
identity crypto.PrivKey
Expand All @@ -50,21 +72,21 @@ type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}

type option func(*client)
type Option func(*client)

func WithIdentity(identity crypto.PrivKey) option {
func WithIdentity(identity crypto.PrivKey) Option {
return func(c *client) {
c.identity = identity
}
}

func WithHTTPClient(h httpClient) option {
func WithHTTPClient(h httpClient) Option {
return func(c *client) {
c.httpClient = h
}
}

func WithUserAgent(ua string) option {
func WithUserAgent(ua string) Option {
return func(c *client) {
if ua == "" {
return
Expand All @@ -81,7 +103,7 @@ func WithUserAgent(ua string) option {
}
}

func WithProviderInfo(peerID peer.ID, addrs []multiaddr.Multiaddr) option {
func WithProviderInfo(peerID peer.ID, addrs []multiaddr.Multiaddr) Option {
return func(c *client) {
c.peerID = peerID
for _, a := range addrs {
Expand All @@ -90,21 +112,21 @@ func WithProviderInfo(peerID peer.ID, addrs []multiaddr.Multiaddr) option {
}
}

func WithStreamResultsRequired() Option {
return func(c *client) {
c.accepts = mediaTypeNDJSON
}
}

// New creates a content routing API client.
// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function.
func New(baseURL string, opts ...option) (*client, error) {
defaultHTTPClient := &http.Client{
Transport: &ResponseBodyLimitedTransport{
RoundTripper: http.DefaultTransport,
LimitBytes: 1 << 20,
UserAgent: defaultUserAgent,
},
}
func New(baseURL string, opts ...Option) (*client, error) {
client := &client{
baseURL: baseURL,
httpClient: defaultHTTPClient,
validator: ipns.Validator{},
clock: clock.New(),
accepts: strings.Join([]string{mediaTypeNDJSON, mediaTypeJSON}, ","),
}

for _, opt := range opts {
Expand All @@ -118,43 +140,100 @@ func New(baseURL string, opts ...option) (*client, error) {
return client, nil
}

func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs []types.ProviderResponse, err error) {
measurement := newMeasurement("FindProviders")
defer func() {
measurement.length = len(provs)
measurement.record(ctx)
}()
// measuringIter measures the length of the iter and then publishes metrics about the whole req once the iter is closed.
// Of course, if the caller forgets to close the iter, this won't publish anything.
type measuringIter[T any] struct {
iter.Iter[T]
ctx context.Context
m *measurement
}

func (c *measuringIter[T]) Next() bool {
c.m.length++
return c.Iter.Next()
}

func (c *measuringIter[T]) Val() T {
return c.Iter.Val()
}

func (c *measuringIter[T]) Close() error {
c.m.record(c.ctx)
return c.Iter.Close()
}

func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.ResultIter[types.ProviderResponse], err error) {
// TODO test measurements
m := newMeasurement("FindProviders")
guseggert marked this conversation as resolved.
Show resolved Hide resolved

url := c.baseURL + server.ProvidePath + key.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
measurement.host = req.Host
req.Header.Set("Accept", c.accepts)

m.host = req.Host

start := c.clock.Now()
resp, err := c.httpClient.Do(req)

measurement.err = err
measurement.latency = c.clock.Since(start)
m.err = err
m.latency = c.clock.Since(start)

if err != nil {
m.record(ctx)
return nil, err
}
defer resp.Body.Close()
guseggert marked this conversation as resolved.
Show resolved Hide resolved

measurement.statusCode = resp.StatusCode
m.statusCode = resp.StatusCode
if resp.StatusCode == http.StatusNotFound {
return nil, nil
resp.Body.Close()
m.record(ctx)
return iter.FromSlice[iter.Result[types.ProviderResponse]](nil), nil
}

if resp.StatusCode != http.StatusOK {
return nil, httpError(resp.StatusCode, resp.Body)
err := httpError(resp.StatusCode, resp.Body)
resp.Body.Close()
m.record(ctx)
return nil, err
}

respContentType := resp.Header.Get("Content-Type")
mediaType, _, err := mime.ParseMediaType(respContentType)
if err != nil {
resp.Body.Close()
m.err = err
m.record(ctx)
return nil, fmt.Errorf("parsing Content-Type: %w", err)
}

m.mediaType = mediaType

var skipBodyClose bool
defer func() {
if !skipBodyClose {
resp.Body.Close()
}
}()

var it iter.ResultIter[types.ProviderResponse]
switch mediaType {
case mediaTypeJSON:
parsedResp := &jsontypes.ReadProvidersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
var sliceIt iter.Iter[types.ProviderResponse] = iter.FromSlice(parsedResp.Providers)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
skipBodyClose = true
it = ndjson.NewReadProvidersResponseIter(resp.Body)
default:
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
return nil, errors.New("unknown content type")
}

parsedResp := &types.ReadProvidersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
return parsedResp.Providers, err
return &measuringIter[iter.Result[types.ProviderResponse]]{Iter: it, ctx: ctx, m: m}, nil
}

func (c *client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) {
Expand Down Expand Up @@ -202,7 +281,7 @@ func (c *client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Du

// ProvideAsync makes a provide request to a delegated router
func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.WriteBitswapProviderRecord) (time.Duration, error) {
req := types.WriteProvidersRequest{Providers: []types.WriteProviderRecord{bswp}}
req := jsontypes.WriteProvidersRequest{Providers: []types.WriteProviderRecord{bswp}}

url := c.baseURL + server.ProvidePath

Expand All @@ -225,7 +304,7 @@ func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri
if resp.StatusCode != http.StatusOK {
return 0, httpError(resp.StatusCode, resp.Body)
}
var provideResult types.WriteProvidersResponse
var provideResult jsontypes.WriteProvidersResponse
err = json.NewDecoder(resp.Body).Decode(&provideResult)
if err != nil {
return 0, err
Expand Down
Loading