Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
Merge pull request #455 from Starnop/local-storage-lock
Browse files Browse the repository at this point in the history
bugfix: add context for the interface of storage driver
  • Loading branch information
lowzj authored Mar 25, 2019
2 parents 2e88654 + 55bcc20 commit 0ae9f70
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 39 deletions.
28 changes: 14 additions & 14 deletions supernode/store/local_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package store

import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"sync"
"sync/atomic"
"time"

"github.com/dragonflyoss/Dragonfly/common/util"
)
Expand All @@ -43,7 +43,7 @@ type fileMutex struct {
sync.RWMutex
}

func getLock(key string, ro bool) {
func lock(key string, ro bool) {
v, _ := fileMutexLocker.LoadOrStore(key, &fileMutex{})
f := v.(*fileMutex)

Expand Down Expand Up @@ -104,13 +104,13 @@ func NewLocalStorage(config interface{}) (StorageDriver, error) {
}

// Get the content of key from storage and return in io stream.
func (ls *localStorage) Get(raw *Raw, writer io.Writer) error {
func (ls *localStorage) Get(ctx context.Context, raw *Raw, writer io.Writer) error {
path, _, err := ls.statPath(raw.key)
if err != nil {
return err
}

getLock(getLockKey(path, raw.offset), true)
lock(getLockKey(path, raw.offset), true)
defer releaseLock(getLockKey(path, raw.offset), true)

f, err := os.Open(path)
Expand All @@ -133,13 +133,13 @@ func (ls *localStorage) Get(raw *Raw, writer io.Writer) error {
}

// GetBytes gets the content of key from storage and return in bytes.
func (ls *localStorage) GetBytes(raw *Raw) (data []byte, err error) {
func (ls *localStorage) GetBytes(ctx context.Context, raw *Raw) (data []byte, err error) {
path, _, err := ls.statPath(raw.key)
if err != nil {
return nil, err
}

getLock(getLockKey(path, raw.offset), true)
lock(getLockKey(path, raw.offset), true)
defer releaseLock(getLockKey(path, raw.offset), true)

f, err := os.Open(path)
Expand All @@ -163,13 +163,13 @@ func (ls *localStorage) GetBytes(raw *Raw) (data []byte, err error) {
}

// Put reads the content from reader and put it into storage.
func (ls *localStorage) Put(raw *Raw, data io.Reader) error {
func (ls *localStorage) Put(ctx context.Context, raw *Raw, data io.Reader) error {
path, err := ls.preparePath(raw.key)
if err != nil {
return err
}

getLock(getLockKey(path, raw.offset), false)
lock(getLockKey(path, raw.offset), false)
defer releaseLock(getLockKey(path, raw.offset), false)

f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644)
Expand All @@ -187,13 +187,13 @@ func (ls *localStorage) Put(raw *Raw, data io.Reader) error {
}

// PutBytes puts the content of key from storage with bytes.
func (ls *localStorage) PutBytes(raw *Raw, data []byte) error {
func (ls *localStorage) PutBytes(ctx context.Context, raw *Raw, data []byte) error {
path, err := ls.preparePath(raw.key)
if err != nil {
return err
}

getLock(getLockKey(path, raw.offset), false)
lock(getLockKey(path, raw.offset), false)
defer releaseLock(getLockKey(path, raw.offset), false)

f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644)
Expand All @@ -211,7 +211,7 @@ func (ls *localStorage) PutBytes(raw *Raw, data []byte) error {
}

// Stat determine whether the file exists.
func (ls *localStorage) Stat(raw *Raw) (*StorageInfo, error) {
func (ls *localStorage) Stat(ctx context.Context, raw *Raw) (*StorageInfo, error) {
path, fileInfo, err := ls.statPath(raw.key)
if err != nil {
return nil, err
Expand All @@ -230,13 +230,13 @@ func (ls *localStorage) Stat(raw *Raw) (*StorageInfo, error) {
}

// Remove deletes a file or dir.
func (ls *localStorage) Remove(raw *Raw) error {
func (ls *localStorage) Remove(ctx context.Context, raw *Raw) error {
path, _, err := ls.statPath(raw.key)
if err != nil {
return err
}

getLock(getLockKey(path, raw.offset), false)
lock(getLockKey(path, raw.offset), false)
defer releaseLock(getLockKey(path, raw.offset), false)

if err := os.RemoveAll(path); err != nil {
Expand Down Expand Up @@ -274,7 +274,7 @@ func (ls *localStorage) statPath(key string) (string, os.FileInfo, error) {
}

func getLockKey(path string, offset int64) string {
return fmt.Sprintf("%s%d%d", path, offset, time.Now().Unix())
return fmt.Sprintf("%s%d", path, offset)
}

func getPrefix(str string) string {
Expand Down
15 changes: 8 additions & 7 deletions supernode/store/local_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package store

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -108,10 +109,10 @@ func (s *LocalStorageSuite) TestGetPutBytes(c *check.C) {

for _, v := range cases {
// put
s.storeLocal.PutBytes(v.raw, v.data)
s.storeLocal.PutBytes(context.Background(), v.raw, v.data)

// get
result, err := s.storeLocal.GetBytes(v.raw)
result, err := s.storeLocal.GetBytes(context.Background(), v.raw)
c.Assert(err, check.IsNil)
c.Assert(string(result), check.Equals, v.expected)

Expand Down Expand Up @@ -159,11 +160,11 @@ func (s *LocalStorageSuite) TestGetPut(c *check.C) {

for _, v := range cases {
// put
s.storeLocal.Put(v.raw, v.data)
s.storeLocal.Put(context.Background(), v.raw, v.data)

// get
buf1 := new(bytes.Buffer)
err := s.storeLocal.Get(v.raw, buf1)
err := s.storeLocal.Get(context.Background(), v.raw, buf1)
c.Assert(err, check.IsNil)
c.Assert(buf1.String(), check.Equals, v.expected)

Expand Down Expand Up @@ -195,7 +196,7 @@ func (s *LocalStorageSuite) TestGetPrefix(c *check.C) {
// helper function

func (s *LocalStorageSuite) checkStat(raw *Raw, c *check.C) {
info, err := s.storeLocal.Stat(raw)
info, err := s.storeLocal.Stat(context.Background(), raw)
c.Assert(err, check.IsNil)

cfg := s.storeLocal.config.(*localStorage)
Expand All @@ -212,9 +213,9 @@ func (s *LocalStorageSuite) checkStat(raw *Raw, c *check.C) {
}

func (s *LocalStorageSuite) checkRemove(raw *Raw, c *check.C) {
err := s.storeLocal.Remove(raw)
err := s.storeLocal.Remove(context.Background(), raw)
c.Assert(err, check.IsNil)

_, err = s.storeLocal.Stat(raw)
_, err = s.storeLocal.Stat(context.Background(), raw)
c.Assert(err, check.DeepEquals, ErrNotFound)
}
13 changes: 7 additions & 6 deletions supernode/store/storage_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package store

import (
"context"
"fmt"
"io"
"time"
Expand All @@ -42,31 +43,31 @@ type StorageDriver interface {
// The data should be written into the writer as io stream.
// If the length<=0, the driver should return all data from the raw.offest.
// Otherwise, just return the data which starts from raw.offset and the length is raw.length.
Get(raw *Raw, writer io.Writer) error
Get(ctx context.Context, raw *Raw, writer io.Writer) error

// Get data from the storage based on raw information.
// The data should be returned in bytes.
// If the length<=0, the storage driver should return all data from the raw.offest.
// Otherwise, just return the data which starts from raw.offset and the length is raw.length.
GetBytes(raw *Raw) ([]byte, error)
GetBytes(ctx context.Context, raw *Raw) ([]byte, error)

// Put the data into the storage with raw information.
// The storage will get data from io.Reader as io stream.
// If the offset>0, the storage driver should starting at byte raw.offset off.
Put(raw *Raw, data io.Reader) error
Put(ctx context.Context, raw *Raw, data io.Reader) error

// PutBytes puts the data into the storage with raw information.
// The data is passed in bytes.
// If the offset>0, the storage driver should starting at byte raw.offset off.
PutBytes(raw *Raw, data []byte) error
PutBytes(ctx context.Context, raw *Raw, data []byte) error

// Remove the data from the storage based on raw information.
Remove(raw *Raw) error
Remove(ctx context.Context, raw *Raw) error

// Stat determine whether the data exists based on raw information.
// If that, and return some info that in the form of struct StorageInfo.
// If not, return the ErrNotFound.
Stat(raw *Raw) (*StorageInfo, error)
Stat(ctx context.Context, raw *Raw) (*StorageInfo, error)
}

// Raw identifies a piece of data uniquely.
Expand Down
25 changes: 13 additions & 12 deletions supernode/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package store

import (
"context"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -54,53 +55,53 @@ func NewStore(name string, cfg interface{}) (*Store, error) {
}

// Get the data from the storage driver in io stream.
func (s *Store) Get(raw *Raw, writer io.Writer) error {
func (s *Store) Get(ctx context.Context, raw *Raw, writer io.Writer) error {
if err := isEmptyKey(raw.key); err != nil {
return err
}
return s.driver.Get(raw, writer)
return s.driver.Get(ctx, raw, writer)
}

// GetBytes gets the data from the storage driver in bytes.
func (s *Store) GetBytes(raw *Raw) ([]byte, error) {
func (s *Store) GetBytes(ctx context.Context, raw *Raw) ([]byte, error) {
if err := isEmptyKey(raw.key); err != nil {
return nil, err
}
return s.driver.GetBytes(raw)
return s.driver.GetBytes(ctx, raw)
}

// Put puts data into the storage in io stream.
func (s *Store) Put(raw *Raw, data io.Reader) error {
func (s *Store) Put(ctx context.Context, raw *Raw, data io.Reader) error {
if err := isEmptyKey(raw.key); err != nil {
return err
}
return s.driver.Put(raw, data)
return s.driver.Put(ctx, raw, data)
}

// PutBytes puts data into the storage in bytes.
func (s *Store) PutBytes(raw *Raw, data []byte) error {
func (s *Store) PutBytes(ctx context.Context, raw *Raw, data []byte) error {
if err := isEmptyKey(raw.key); err != nil {
return err
}
return s.driver.PutBytes(raw, data)
return s.driver.PutBytes(ctx, raw, data)
}

// Remove the data from the storage based on raw information.
func (s *Store) Remove(raw *Raw) error {
func (s *Store) Remove(ctx context.Context, raw *Raw) error {
if err := isEmptyKey(raw.key); err != nil {
return err
}
return s.driver.Remove(raw)
return s.driver.Remove(ctx, raw)
}

// Stat determine whether the data exists based on raw information.
// If that, and return some info that in the form of struct StorageInfo.
// If not, return the ErrNotFound.
func (s *Store) Stat(raw *Raw) (*StorageInfo, error) {
func (s *Store) Stat(ctx context.Context, raw *Raw) (*StorageInfo, error) {
if err := isEmptyKey(raw.key); err != nil {
return nil, err
}
return s.driver.Stat(raw)
return s.driver.Stat(ctx, raw)
}

func isEmptyKey(str string) error {
Expand Down

0 comments on commit 0ae9f70

Please sign in to comment.