Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minimal tus implementation #669

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions cmd/reva/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"fmt"
"io"
"math"
"net/http"
"os"
"path/filepath"

"github.com/cheggaaa/pb"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"

"github.com/eventials/go-tus"
"github.com/eventials/go-tus/memorystore"

// TODO(labkode): this should not come from this package.
"github.com/cs3org/reva/internal/grpc/services/storageprovider"
"github.com/cs3org/reva/pkg/crypto"
Expand Down Expand Up @@ -65,7 +68,7 @@ func uploadCommand() *command {

fmt.Printf("Local file size: %d bytes\n", md.Size())

client, err := getClient()
gwc, err := getClient()
if err != nil {
return err
}
Expand All @@ -78,7 +81,7 @@ func uploadCommand() *command {
},
}

res, err := client.InitiateFileUpload(ctx, req)
res, err := gwc.InitiateFileUpload(ctx, req)
if err != nil {
return err
}
Expand Down Expand Up @@ -113,39 +116,51 @@ func uploadCommand() *command {
bar.Start()
reader := bar.NewProxyReader(fd)

httpReq, err := rhttp.NewRequest(ctx, "PUT", dataServerURL, reader)
// create the tus client.
c := tus.DefaultConfig()
c.Resume = true
c.HttpClient = rhttp.GetHTTPClient(ctx)
c.Store, err = memorystore.NewMemoryStore()
if err != nil {
return err
}
c.Header.Add("X-Reva-Transfer", res.Token)
tusc, err := tus.NewClient(dataServerURL, c)
if err != nil {
return err
}

metadata := map[string]string{
"filename": filepath.Base(target),
"dir": filepath.Dir(target),
"checksum": fmt.Sprintf("%s %s", storageprovider.GRPC2PKGXS(xsType).String(), xs),
}

fingerprint := fmt.Sprintf("%s-%d-%s-%s", md.Name(), md.Size(), md.ModTime(), xs)

httpReq.Header.Set("X-Reva-Transfer", res.Token)
q := httpReq.URL.Query()
q.Add("xs", xs)
q.Add("xs_type", storageprovider.GRPC2PKGXS(xsType).String())
httpReq.URL.RawQuery = q.Encode()
// create an upload from a file.
upload := tus.NewUpload(reader, md.Size(), metadata, fingerprint)

httpClient := rhttp.GetHTTPClient(ctx)
// create the uploader.
c.Store.Set(upload.Fingerprint, dataServerURL)
uploader := tus.NewUploader(tusc, dataServerURL, upload, 0)

httpRes, err := httpClient.Do(httpReq)
// start the uploading process.
err = uploader.Upload()
if err != nil {
return err
}
defer httpRes.Body.Close()

bar.Finish()

if httpRes.StatusCode != http.StatusOK {
return err
}

req2 := &provider.StatRequest{
Ref: &provider.Reference{
Spec: &provider.Reference_Path{
Path: target,
},
},
}
res2, err := client.Stat(ctx, req2)
res2, err := gwc.Stat(ctx, req2)
if err != nil {
return err
}
Expand Down
9 changes: 6 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/go-cs3apis v0.0.0-20200408065125-6e23f3ecec0a
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/eventials/go-tus v0.0.0-20190617130015-9db47421f6a0
github.com/fatih/color v1.7.0 // indirect
github.com/go-openapi/strfmt v0.19.2 // indirect
github.com/gofrs/uuid v3.2.0+incompatible
github.com/golang/protobuf v1.3.5
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/huandu/xstrings v1.3.0 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/jedib0t/go-pretty v4.3.0+incompatible
Expand All @@ -31,12 +32,12 @@ require (
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.1
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 // indirect
github.com/rs/cors v1.7.0
github.com/rs/zerolog v1.18.0
github.com/tus/tusd v1.1.0
go.opencensus.io v0.22.3
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
google.golang.org/grpc v1.28.1
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
gopkg.in/cheggaaa/pb.v1 v1.0.27 // indirect
Expand All @@ -45,3 +46,5 @@ require (
)

go 1.13

replace github.com/eventials/go-tus => github.com/andrewmostello/go-tus v0.0.0-20200314041820-904a9904af9a
92 changes: 67 additions & 25 deletions go.sum

Large diffs are not rendered by default.

21 changes: 19 additions & 2 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"os"
"path"
"strconv"
"strings"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand Down Expand Up @@ -243,14 +244,30 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *provider.Initia
func (s *service) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*provider.InitiateFileUploadResponse, error) {
// TODO(labkode): same considerations as download
log := appctx.GetLogger(ctx)
url := *s.dataServerURL
newRef, err := s.unwrap(ctx, req.Ref)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error unwrapping path"),
}, nil
}
url.Path = path.Join("/", url.Path, newRef.GetPath())
var uploadLength int64
if req.Opaque != nil && req.Opaque.Map != nil && req.Opaque.Map["Upload-Length"] != nil {
var err error
uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error parsing upload length"),
}, nil
}
}
uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error getting upload id"),
}, nil
}
url := *s.dataServerURL
url.Path = path.Join("/", url.Path, uploadID)
log.Info().Str("data-server", url.String()).
Str("fn", req.Ref.GetPath()).
Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)).
Expand Down
126 changes: 89 additions & 37 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,23 @@ package dataprovider
import (
"fmt"
"net/http"
"os"

"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rhttp/global"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"
)

func init() {
global.Register("dataprovider", New)
}

type config struct {
Prefix string `mapstructure:"prefix"`
Driver string `mapstructure:"driver"`
TmpFolder string `mapstructure:"tmp_folder"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
Prefix string `mapstructure:"prefix"`
Driver string `mapstructure:"driver"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
}

type svc struct {
Expand All @@ -54,18 +53,6 @@ func New(m map[string]interface{}) (global.Service, error) {
return nil, err
}

if conf.Prefix == "" {
conf.Prefix = "data"
}

if conf.TmpFolder == "" {
conf.TmpFolder = os.TempDir()
}

if err := os.MkdirAll(conf.TmpFolder, 0755); err != nil {
return nil, errors.Wrap(err, "could not create tmp dir")
}

fs, err := getFS(conf)
if err != nil {
return nil, err
Expand All @@ -75,8 +62,8 @@ func New(m map[string]interface{}) (global.Service, error) {
storage: fs,
conf: conf,
}
s.setHandler()
return s, nil
err = s.setHandler()
return s, err
}

// Close performs cleanup.
Expand All @@ -88,6 +75,12 @@ func (s *svc) Unprotected() []string {
return []string{}
}

// Create a new DataStore instance which is responsible for
// storing the uploaded file on disk in the specified directory.
// This path _must_ exist before tusd will store uploads in it.
// If you want to save them on a different medium, for example
// a remote FTP server, you can implement your own storage backend
// by implementing the tusd.DataStore interface.
func getFS(c *config) (storage.FS, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
Expand All @@ -103,24 +96,83 @@ func (s *svc) Handler() http.Handler {
return s.handler
}

func (s *svc) setHandler() {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "HEAD":
addCorsHeader(w)
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
// Composable is the interface that a struct needs to implement to be composable by this composer
type Composable interface {
UseIn(composer *tusd.StoreComposer)
}

func (s *svc) setHandler() (err error) {
composable, ok := s.storage.(Composable)
if ok {
// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
// we only use the file store but you may plug in multiple.
composer := tusd.NewStoreComposer()

// let the composable storage tell tus which extensions it supports
composable.UseIn(composer)

config := tusd.Config{
BasePath: s.conf.Prefix,
StoreComposer: composer,
//Logger: logger, // TODO use logger
}

handler, err := tusd.NewUnroutedHandler(config)
if err != nil {
return err
}
})

s.handler = handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

log := appctx.GetLogger(r.Context())
log.Info().Msgf("tusd routing: path=%s", r.URL.Path)

switch r.Method {
// old fashioned download.

// GET is not part of the tus.io protocol
// currently there is no way to GET an upload that is in progress
// TODO allow range based get requests? that end before the current offset
case "GET":
s.doGet(w, r)

// tus.io based upload

// uploads are initiated using the CS3 APIs Initiate Download call
case "POST":
handler.PostFile(w, r)
case "HEAD":
handler.HeadFile(w, r)
case "PATCH":
handler.PatchFile(w, r)
// TODO Only attach the DELETE handler if the Terminate() method is provided
case "DELETE":
handler.DelFile(w, r)
}
}))
} else {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "HEAD":
addCorsHeader(w)
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
}
})
}

return err
}

func addCorsHeader(res http.ResponseWriter) {
Expand Down
18 changes: 12 additions & 6 deletions pkg/eosclient/eosclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,12 +591,8 @@ func (c *Client) Read(ctx context.Context, username, path string) (io.ReadCloser
return os.Open(localTarget)
}

// Write writes a file to the mgm
// Write writes a stream to the mgm
func (c *Client) Write(ctx context.Context, username, path string, stream io.ReadCloser) error {
unixUser, err := c.getUnixUser(username)
if err != nil {
return err
}
fd, err := ioutil.TempFile(c.opt.CacheDirectory, "eoswrite-")
if err != nil {
return err
Expand All @@ -609,8 +605,18 @@ func (c *Client) Write(ctx context.Context, username, path string, stream io.Rea
if err != nil {
return err
}

return c.WriteFile(ctx, username, path, fd.Name())
}

// WriteFile writes an existing file to the mgm
func (c *Client) WriteFile(ctx context.Context, username, path, source string) error {
unixUser, err := c.getUnixUser(username)
if err != nil {
return err
}
xrdPath := fmt.Sprintf("%s//%s", c.opt.URL, path)
cmd := exec.CommandContext(ctx, c.opt.XrdcopyBinary, "--nopbar", "--silent", "-f", fd.Name(), xrdPath, fmt.Sprintf("-ODeos.ruid=%s&eos.rgid=%s", unixUser.Uid, unixUser.Gid))
cmd := exec.CommandContext(ctx, c.opt.XrdcopyBinary, "--nopbar", "--silent", "-f", source, xrdPath, fmt.Sprintf("-ODeos.ruid=%s&eos.rgid=%s", unixUser.Uid, unixUser.Gid))
_, _, err = c.execute(ctx, cmd)
return err
}
Expand Down
Loading