Skip to content

Commit

Permalink
minimal tus implmentation
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <[email protected]>
  • Loading branch information
butonic committed Apr 19, 2020
1 parent 92869da commit 0afd832
Show file tree
Hide file tree
Showing 10 changed files with 677 additions and 111 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/golang/protobuf v1.3.5
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/huandu/xstrings v1.3.0 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/jedib0t/go-pretty v4.3.0+incompatible
Expand All @@ -31,12 +31,12 @@ require (
github.com/pkg/errors v0.9.1
github.com/pkg/xattr v0.4.1
github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 // indirect
github.com/rs/cors v1.7.0
github.com/rs/zerolog v1.18.0
github.com/tus/tusd v1.1.0
go.opencensus.io v0.22.3
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
google.golang.org/grpc v1.28.1
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
gopkg.in/cheggaaa/pb.v1 v1.0.27 // indirect
Expand Down
79 changes: 54 additions & 25 deletions go.sum

Large diffs are not rendered by default.

21 changes: 19 additions & 2 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"os"
"path"
"strconv"
"strings"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand Down Expand Up @@ -243,14 +244,30 @@ func (s *service) InitiateFileDownload(ctx context.Context, req *provider.Initia
func (s *service) InitiateFileUpload(ctx context.Context, req *provider.InitiateFileUploadRequest) (*provider.InitiateFileUploadResponse, error) {
// TODO(labkode): same considerations as download
log := appctx.GetLogger(ctx)
url := *s.dataServerURL
newRef, err := s.unwrap(ctx, req.Ref)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error unwrapping path"),
}, nil
}
url.Path = path.Join("/", url.Path, newRef.GetPath())
var uploadLength int64
if req.Opaque != nil && req.Opaque.Map != nil && req.Opaque.Map["Upload-Length"] != nil {
var err error
uploadLength, err = strconv.ParseInt(string(req.Opaque.Map["Upload-Length"].Value), 10, 64)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error parsing upload length"),
}, nil
}
}
uploadID, err := s.storage.InitiateUpload(ctx, newRef, uploadLength)
if err != nil {
return &provider.InitiateFileUploadResponse{
Status: status.NewInternal(ctx, err, "error getting upload id"),
}, nil
}
url := *s.dataServerURL
url.Path = path.Join("/", url.Path, uploadID)
log.Info().Str("data-server", url.String()).
Str("fn", req.Ref.GetPath()).
Str("xs", fmt.Sprintf("%+v", s.conf.AvailableXS)).
Expand Down
130 changes: 93 additions & 37 deletions internal/http/services/dataprovider/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,23 @@ package dataprovider
import (
"fmt"
"net/http"
"os"

"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/rhttp/global"
"github.com/cs3org/reva/pkg/storage"
"github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
tusd "github.com/tus/tusd/pkg/handler"
)

func init() {
global.Register("dataprovider", New)
}

type config struct {
Prefix string `mapstructure:"prefix"`
Driver string `mapstructure:"driver"`
TmpFolder string `mapstructure:"tmp_folder"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
Prefix string `mapstructure:"prefix"`
Driver string `mapstructure:"driver"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
}

type svc struct {
Expand All @@ -54,18 +53,6 @@ func New(m map[string]interface{}) (global.Service, error) {
return nil, err
}

if conf.Prefix == "" {
conf.Prefix = "data"
}

if conf.TmpFolder == "" {
conf.TmpFolder = os.TempDir()
}

if err := os.MkdirAll(conf.TmpFolder, 0755); err != nil {
return nil, errors.Wrap(err, "could not create tmp dir")
}

fs, err := getFS(conf)
if err != nil {
return nil, err
Expand All @@ -75,8 +62,8 @@ func New(m map[string]interface{}) (global.Service, error) {
storage: fs,
conf: conf,
}
s.setHandler()
return s, nil
err = s.setHandler()
return s, err
}

// Close performs cleanup.
Expand All @@ -88,6 +75,12 @@ func (s *svc) Unprotected() []string {
return []string{}
}

// Create a new DataStore instance which is responsible for
// storing the uploaded file on disk in the specified directory.
// This path _must_ exist before tusd will store uploads in it.
// If you want to save them on a different medium, for example
// a remote FTP server, you can implement your own storage backend
// by implementing the tusd.DataStore interface.
func getFS(c *config) (storage.FS, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
Expand All @@ -103,24 +96,87 @@ func (s *svc) Handler() http.Handler {
return s.handler
}

func (s *svc) setHandler() {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "HEAD":
addCorsHeader(w)
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
// Composable is the interface that a struct needs to implement to be composable by this composer
type Composable interface {
UseIn(composer *tusd.StoreComposer)
}

func (s *svc) setHandler() (err error) {
composable, ok := s.storage.(Composable)
if ok {
// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
// we only use the file store but you may plug in multiple.
composer := tusd.NewStoreComposer()
// TODO use Terminator
// TODO use Locker
// TODO use Concater
// TODO use LenghtDeferrer
composable.UseIn(composer)

//logger := log.New(os.Stdout, "tusd ", log.Ldate|log.Ltime|log.Lshortfile)

config := tusd.Config{
BasePath: s.conf.Prefix,
StoreComposer: composer,
//Logger: logger,
}

handler, err := tusd.NewUnroutedHandler(config)
if err != nil {
return err
}
})

s.handler = handler.Middleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

log := appctx.GetLogger(r.Context())
log.Info().Msgf("tusd routing: path=%s", r.URL.Path)

switch r.Method {
// old fashioned download.

// GET is not part of the tus.io protocol
// currently there is no way to GET an upload that is in progress
// TODO allow range based get requests? that end before the current offset
case "GET":
s.doGet(w, r)

// tus.io based upload

// uploads are initiated using the CS3 APIs Initiate Download call
case "POST":
handler.PostFile(w, r)
case "HEAD":
handler.HeadFile(w, r)
case "PATCH":
handler.PatchFile(w, r)
// TODO Only attach the DELETE handler if the Terminate() method is provided
case "DELETE":
handler.DelFile(w, r)
}
}))
} else {
s.handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "HEAD":
addCorsHeader(w)
w.WriteHeader(http.StatusOK)
return
case "GET":
s.doGet(w, r)
return
case "PUT":
s.doPut(w, r)
return
default:
w.WriteHeader(http.StatusNotImplemented)
return
}
})
}

return err
}

func addCorsHeader(res http.ResponseWriter) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/storage/fs/eos/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2018-2020 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package eos

import (
"context"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/errtypes"
)

// InitiateUpload returns an upload id that can be used for uploads with tus
func (fs *eosfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64) (uploadID string, err error) {
return "", errtypes.NotSupported("op not supported")
}
31 changes: 31 additions & 0 deletions pkg/storage/fs/local/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2018-2020 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package local

import (
"context"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/errtypes"
)

// InitiateUpload returns an upload id that can be used for uploads with tus
func (fs *localfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64) (uploadID string, err error) {
return "", errtypes.NotSupported("op not supported")
}
59 changes: 15 additions & 44 deletions pkg/storage/fs/owncloud/owncloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,15 @@ func (fs *ocfs) getVersionsPath(ctx context.Context, np string) string {

}

func (fs *ocfs) getUploadPath(ctx context.Context, uploadID string) (string, error) {
u, ok := user.ContextGetUser(ctx)
if !ok {
err := errors.Wrap(errtypes.UserRequired("userrequired"), "error getting user from ctx")
return "", err
}
return path.Join(fs.c.DataDirectory, u.Username, "uploads", uploadID), nil
}

// ownloud stores trashed items in the files_trashbin subfolder of a users home
func (fs *ocfs) getRecyclePath(ctx context.Context) (string, error) {
u, ok := user.ContextGetUser(ctx)
Expand Down Expand Up @@ -1240,50 +1249,9 @@ func (fs *ocfs) ListFolder(ctx context.Context, ref *provider.Reference) ([]*pro
return finfos, nil
}

func (fs *ocfs) Upload(ctx context.Context, ref *provider.Reference, r io.ReadCloser) error {
np, err := fs.resolve(ctx, ref)
if err != nil {
return errors.Wrap(err, "ocfs: error resolving reference")
}

// we cannot rely on /tmp as it can live in another partition and we can
// hit invalid cross-device link errors, so we create the tmp file in the same directory
// the file is supposed to be written.
tmp, err := ioutil.TempFile(path.Dir(np), "._reva_atomic_upload")
if err != nil {
return errors.Wrap(err, "ocfs: error creating tmp fn at "+path.Dir(np))
}
defer os.RemoveAll(tmp.Name())

_, err = io.Copy(tmp, r)
tmp.Close()
if err != nil {
return errors.Wrap(err, "ocfs: error writing to tmp file "+tmp.Name())
}

// if destination exists
if _, err := os.Stat(np); err == nil {
// copy attributes of existing file to tmp file
if err := fs.copyMD(np, tmp.Name()); err != nil {
return errors.Wrap(err, "ocfs: error copying metadata from "+np+" to "+tmp.Name())
}
// create revision
if err := fs.archiveRevision(ctx, np); err != nil {
return err
}
}

// TODO(jfd): make sure rename is atomic, missing fsync ...
if err := os.Rename(tmp.Name(), np); err != nil {
return errors.Wrap(err, "ocfs: error renaming from "+tmp.Name()+" to "+np)
}

return nil
}

func (fs *ocfs) archiveRevision(ctx context.Context, np string) error {
func (fs *ocfs) archiveRevision(ctx context.Context, vbp string, np string) error {
// move existing file to versions dir
vp := fmt.Sprintf("%s.v%d", fs.getVersionsPath(ctx, np), time.Now().Unix())
vp := fmt.Sprintf("%s.v%d", vbp, time.Now().Unix())
if err := os.MkdirAll(path.Dir(vp), 0700); err != nil {
return errors.Wrap(err, "ocfs: error creating versions dir "+vp)
}
Expand Down Expand Up @@ -1403,7 +1371,7 @@ func (fs *ocfs) RestoreRevision(ctx context.Context, ref *provider.Reference, re
defer source.Close()

// destination should be available, otherwise we could not have navigated to its revisions
if err := fs.archiveRevision(ctx, np); err != nil {
if err := fs.archiveRevision(ctx, fs.getVersionsPath(ctx, np), np); err != nil {
return err
}

Expand Down Expand Up @@ -1558,3 +1526,6 @@ func (fs *ocfs) RestoreRecycleItem(ctx context.Context, key string) error {
// TODO(jfd) restore versions
return nil
}

// TODO propagate etag and mtime or append event to history? propagate on disk ...
// - but propagation is a separate task. only if upload was successful ...
Loading

0 comments on commit 0afd832

Please sign in to comment.