diff --git a/supernode/store/local_storage.go b/supernode/store/local_storage.go index f0b479f11..3ca5945cd 100644 --- a/supernode/store/local_storage.go +++ b/supernode/store/local_storage.go @@ -17,6 +17,7 @@ package store import ( + "context" "fmt" "io" "io/ioutil" @@ -24,7 +25,6 @@ import ( "path" "sync" "sync/atomic" - "time" "github.com/dragonflyoss/Dragonfly/common/util" ) @@ -43,7 +43,7 @@ type fileMutex struct { sync.RWMutex } -func getLock(key string, ro bool) { +func lock(key string, ro bool) { v, _ := fileMutexLocker.LoadOrStore(key, &fileMutex{}) f := v.(*fileMutex) @@ -104,13 +104,13 @@ func NewLocalStorage(config interface{}) (StorageDriver, error) { } // Get the content of key from storage and return in io stream. -func (ls *localStorage) Get(raw *Raw, writer io.Writer) error { +func (ls *localStorage) Get(ctx context.Context, raw *Raw, writer io.Writer) error { path, _, err := ls.statPath(raw.key) if err != nil { return err } - getLock(getLockKey(path, raw.offset), true) + lock(getLockKey(path, raw.offset), true) defer releaseLock(getLockKey(path, raw.offset), true) f, err := os.Open(path) @@ -133,13 +133,13 @@ func (ls *localStorage) Get(raw *Raw, writer io.Writer) error { } // GetBytes gets the content of key from storage and return in bytes. -func (ls *localStorage) GetBytes(raw *Raw) (data []byte, err error) { +func (ls *localStorage) GetBytes(ctx context.Context, raw *Raw) (data []byte, err error) { path, _, err := ls.statPath(raw.key) if err != nil { return nil, err } - getLock(getLockKey(path, raw.offset), true) + lock(getLockKey(path, raw.offset), true) defer releaseLock(getLockKey(path, raw.offset), true) f, err := os.Open(path) @@ -163,13 +163,13 @@ func (ls *localStorage) GetBytes(raw *Raw) (data []byte, err error) { } // Put reads the content from reader and put it into storage. -func (ls *localStorage) Put(raw *Raw, data io.Reader) error { +func (ls *localStorage) Put(ctx context.Context, raw *Raw, data io.Reader) error { path, err := ls.preparePath(raw.key) if err != nil { return err } - getLock(getLockKey(path, raw.offset), false) + lock(getLockKey(path, raw.offset), false) defer releaseLock(getLockKey(path, raw.offset), false) f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644) @@ -187,13 +187,13 @@ func (ls *localStorage) Put(raw *Raw, data io.Reader) error { } // PutBytes puts the content of key from storage with bytes. -func (ls *localStorage) PutBytes(raw *Raw, data []byte) error { +func (ls *localStorage) PutBytes(ctx context.Context, raw *Raw, data []byte) error { path, err := ls.preparePath(raw.key) if err != nil { return err } - getLock(getLockKey(path, raw.offset), false) + lock(getLockKey(path, raw.offset), false) defer releaseLock(getLockKey(path, raw.offset), false) f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644) @@ -211,7 +211,7 @@ func (ls *localStorage) PutBytes(raw *Raw, data []byte) error { } // Stat determine whether the file exists. -func (ls *localStorage) Stat(raw *Raw) (*StorageInfo, error) { +func (ls *localStorage) Stat(ctx context.Context, raw *Raw) (*StorageInfo, error) { path, fileInfo, err := ls.statPath(raw.key) if err != nil { return nil, err @@ -230,13 +230,13 @@ func (ls *localStorage) Stat(raw *Raw) (*StorageInfo, error) { } // Remove deletes a file or dir. -func (ls *localStorage) Remove(raw *Raw) error { +func (ls *localStorage) Remove(ctx context.Context, raw *Raw) error { path, _, err := ls.statPath(raw.key) if err != nil { return err } - getLock(getLockKey(path, raw.offset), false) + lock(getLockKey(path, raw.offset), false) defer releaseLock(getLockKey(path, raw.offset), false) if err := os.RemoveAll(path); err != nil { @@ -274,7 +274,7 @@ func (ls *localStorage) statPath(key string) (string, os.FileInfo, error) { } func getLockKey(path string, offset int64) string { - return fmt.Sprintf("%s%d%d", path, offset, time.Now().Unix()) + return fmt.Sprintf("%s%d", path, offset) } func getPrefix(str string) string { diff --git a/supernode/store/local_storage_test.go b/supernode/store/local_storage_test.go index ed99af682..941896045 100644 --- a/supernode/store/local_storage_test.go +++ b/supernode/store/local_storage_test.go @@ -18,6 +18,7 @@ package store import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -108,10 +109,10 @@ func (s *LocalStorageSuite) TestGetPutBytes(c *check.C) { for _, v := range cases { // put - s.storeLocal.PutBytes(v.raw, v.data) + s.storeLocal.PutBytes(context.Background(), v.raw, v.data) // get - result, err := s.storeLocal.GetBytes(v.raw) + result, err := s.storeLocal.GetBytes(context.Background(), v.raw) c.Assert(err, check.IsNil) c.Assert(string(result), check.Equals, v.expected) @@ -159,11 +160,11 @@ func (s *LocalStorageSuite) TestGetPut(c *check.C) { for _, v := range cases { // put - s.storeLocal.Put(v.raw, v.data) + s.storeLocal.Put(context.Background(), v.raw, v.data) // get buf1 := new(bytes.Buffer) - err := s.storeLocal.Get(v.raw, buf1) + err := s.storeLocal.Get(context.Background(), v.raw, buf1) c.Assert(err, check.IsNil) c.Assert(buf1.String(), check.Equals, v.expected) @@ -195,7 +196,7 @@ func (s *LocalStorageSuite) TestGetPrefix(c *check.C) { // helper function func (s *LocalStorageSuite) checkStat(raw *Raw, c *check.C) { - info, err := s.storeLocal.Stat(raw) + info, err := s.storeLocal.Stat(context.Background(), raw) c.Assert(err, check.IsNil) cfg := s.storeLocal.config.(*localStorage) @@ -212,9 +213,9 @@ func (s *LocalStorageSuite) checkStat(raw *Raw, c *check.C) { } func (s *LocalStorageSuite) checkRemove(raw *Raw, c *check.C) { - err := s.storeLocal.Remove(raw) + err := s.storeLocal.Remove(context.Background(), raw) c.Assert(err, check.IsNil) - _, err = s.storeLocal.Stat(raw) + _, err = s.storeLocal.Stat(context.Background(), raw) c.Assert(err, check.DeepEquals, ErrNotFound) } diff --git a/supernode/store/storage_driver.go b/supernode/store/storage_driver.go index df01e9449..97eeaecc8 100644 --- a/supernode/store/storage_driver.go +++ b/supernode/store/storage_driver.go @@ -17,6 +17,7 @@ package store import ( + "context" "fmt" "io" "time" @@ -42,31 +43,31 @@ type StorageDriver interface { // The data should be written into the writer as io stream. // If the length<=0, the driver should return all data from the raw.offest. // Otherwise, just return the data which starts from raw.offset and the length is raw.length. - Get(raw *Raw, writer io.Writer) error + Get(ctx context.Context, raw *Raw, writer io.Writer) error // Get data from the storage based on raw information. // The data should be returned in bytes. // If the length<=0, the storage driver should return all data from the raw.offest. // Otherwise, just return the data which starts from raw.offset and the length is raw.length. - GetBytes(raw *Raw) ([]byte, error) + GetBytes(ctx context.Context, raw *Raw) ([]byte, error) // Put the data into the storage with raw information. // The storage will get data from io.Reader as io stream. // If the offset>0, the storage driver should starting at byte raw.offset off. - Put(raw *Raw, data io.Reader) error + Put(ctx context.Context, raw *Raw, data io.Reader) error // PutBytes puts the data into the storage with raw information. // The data is passed in bytes. // If the offset>0, the storage driver should starting at byte raw.offset off. - PutBytes(raw *Raw, data []byte) error + PutBytes(ctx context.Context, raw *Raw, data []byte) error // Remove the data from the storage based on raw information. - Remove(raw *Raw) error + Remove(ctx context.Context, raw *Raw) error // Stat determine whether the data exists based on raw information. // If that, and return some info that in the form of struct StorageInfo. // If not, return the ErrNotFound. - Stat(raw *Raw) (*StorageInfo, error) + Stat(ctx context.Context, raw *Raw) (*StorageInfo, error) } // Raw identifies a piece of data uniquely. diff --git a/supernode/store/store.go b/supernode/store/store.go index f6760b2dc..0c0ceb43d 100644 --- a/supernode/store/store.go +++ b/supernode/store/store.go @@ -17,6 +17,7 @@ package store import ( + "context" "fmt" "io" "strings" @@ -54,53 +55,53 @@ func NewStore(name string, cfg interface{}) (*Store, error) { } // Get the data from the storage driver in io stream. -func (s *Store) Get(raw *Raw, writer io.Writer) error { +func (s *Store) Get(ctx context.Context, raw *Raw, writer io.Writer) error { if err := isEmptyKey(raw.key); err != nil { return err } - return s.driver.Get(raw, writer) + return s.driver.Get(ctx, raw, writer) } // GetBytes gets the data from the storage driver in bytes. -func (s *Store) GetBytes(raw *Raw) ([]byte, error) { +func (s *Store) GetBytes(ctx context.Context, raw *Raw) ([]byte, error) { if err := isEmptyKey(raw.key); err != nil { return nil, err } - return s.driver.GetBytes(raw) + return s.driver.GetBytes(ctx, raw) } // Put puts data into the storage in io stream. -func (s *Store) Put(raw *Raw, data io.Reader) error { +func (s *Store) Put(ctx context.Context, raw *Raw, data io.Reader) error { if err := isEmptyKey(raw.key); err != nil { return err } - return s.driver.Put(raw, data) + return s.driver.Put(ctx, raw, data) } // PutBytes puts data into the storage in bytes. -func (s *Store) PutBytes(raw *Raw, data []byte) error { +func (s *Store) PutBytes(ctx context.Context, raw *Raw, data []byte) error { if err := isEmptyKey(raw.key); err != nil { return err } - return s.driver.PutBytes(raw, data) + return s.driver.PutBytes(ctx, raw, data) } // Remove the data from the storage based on raw information. -func (s *Store) Remove(raw *Raw) error { +func (s *Store) Remove(ctx context.Context, raw *Raw) error { if err := isEmptyKey(raw.key); err != nil { return err } - return s.driver.Remove(raw) + return s.driver.Remove(ctx, raw) } // Stat determine whether the data exists based on raw information. // If that, and return some info that in the form of struct StorageInfo. // If not, return the ErrNotFound. -func (s *Store) Stat(raw *Raw) (*StorageInfo, error) { +func (s *Store) Stat(ctx context.Context, raw *Raw) (*StorageInfo, error) { if err := isEmptyKey(raw.key); err != nil { return nil, err } - return s.driver.Stat(raw) + return s.driver.Stat(ctx, raw) } func isEmptyKey(str string) error {