Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
storage/fcds: address most of Petar's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
janos committed Dec 18, 2019
1 parent 1e94680 commit db658c7
Showing 1 changed file with 29 additions and 31 deletions.
60 changes: 29 additions & 31 deletions storage/fcds/fcds.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"github.com/ethersphere/swarm/log"

"github.com/ethersphere/swarm/chunk"
)

Expand All @@ -46,8 +48,8 @@ var _ Interface = new(Store)
// Number of files that store chunk data.
const shardCount = 32

// ErrDBClosed is returned if database is already closed.
var ErrDBClosed = errors.New("closed database")
// ErrStoreClosed is returned if store is already closed.
var ErrStoreClosed = errors.New("closed store")

// Store is the main FCDS implementation. It stores chunk data into
// a number of files partitioned by the last byte of the chunk address.
Expand Down Expand Up @@ -105,11 +107,10 @@ func New(path string, maxChunkSize int, metaStore MetaStore, opts ...Option) (s

// Get returns a chunk with data.
func (s *Store) Get(addr chunk.Address) (ch chunk.Chunk, err error) {
done, err := s.protect()
if err != nil {
if err := s.protect(); err != nil {
return nil, err
}
defer done()
defer s.unprotect()

sh := s.shards[getShard(addr)]
sh.mu.Lock()
Expand All @@ -132,11 +133,10 @@ func (s *Store) Get(addr chunk.Address) (ch chunk.Chunk, err error) {

// Has returns true if chunk is stored.
func (s *Store) Has(addr chunk.Address) (yes bool, err error) {
done, err := s.protect()
if err != nil {
if err := s.protect(); err != nil {
return false, err
}
defer done()
defer s.unprotect()

mu := s.shards[getShard(addr)].mu
mu.Lock()
Expand All @@ -154,11 +154,10 @@ func (s *Store) Has(addr chunk.Address) (yes bool, err error) {

// Put stores chunk data.
func (s *Store) Put(ch chunk.Chunk) (err error) {
done, err := s.protect()
if err != nil {
if err := s.protect(); err != nil {
return err
}
defer done()
defer s.unprotect()

addr := ch.Address()
data := ch.Data()
Expand Down Expand Up @@ -203,27 +202,21 @@ func (s *Store) Put(ch chunk.Chunk) (err error) {
// If offset is less then 0, no free offsets are available.
func (s *Store) getOffset(shard uint8) (offset int64, reclaimed bool, err error) {
if !s.shardHasFreeOffsets(shard) {
// shard does not have free offset
return -1, false, nil
}

offset = -1 // negative offset denotes no available free offset
offset = -1
if s.freeCache != nil {
// check if local cache has an offset
offset = s.freeCache.get(shard)
}

if offset < 0 {
// free cache did not return a free offset,
// check the meta store for one
offset, err = s.meta.FreeOffset(shard)
if err != nil {
return 0, false, err
}
}
if offset < 0 {
// meta store did not return a free offset,
// mark this shard that has no free offsets
s.markShardWithFreeOffsets(shard, false)
return -1, false, nil
}
Expand All @@ -233,11 +226,10 @@ func (s *Store) getOffset(shard uint8) (offset int64, reclaimed bool, err error)

// Delete removes chunk data.
func (s *Store) Delete(addr chunk.Address) (err error) {
done, err := s.protect()
if err != nil {
if err := s.protect(); err != nil {
return err
}
defer done()
defer s.unprotect()

shard := getShard(addr)
s.markShardWithFreeOffsets(shard, true)
Expand All @@ -263,11 +255,10 @@ func (s *Store) Count() (count int, err error) {

// Iterate iterates over stored chunks in no particular order.
func (s *Store) Iterate(fn func(chunk.Chunk) (stop bool, err error)) (err error) {
done, err := s.protect()
if err != nil {
if err := s.protect(); err != nil {
return err
}
defer done()
defer s.unprotect()

for _, sh := range s.shards {
sh.mu.Lock()
Expand All @@ -289,22 +280,24 @@ func (s *Store) Iterate(fn func(chunk.Chunk) (stop bool, err error)) (err error)
}

// Close disables of further operations on the Store.
// Every call to its methods will return ErrDBClosed error.
// Every call to its methods will return ErrStoreClosed error.
// Close will wait for all running operations to finish before
// closing its MetaStore and returning.
func (s *Store) Close() (err error) {
s.quitOnce.Do(func() {
close(s.quit)
})

timeout := 15 * time.Second
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(15 * time.Second):
case <-time.After(timeout):
log.Debug("timeout on waiting chunk store parallel operations to finish", "timeout", timeout)
}

for _, sh := range s.shards {
Expand All @@ -318,17 +311,22 @@ func (s *Store) Close() (err error) {
// protect protects Store from executing operations
// after the Close method is called and makes sure
// that Close method will wait for all ongoing operations
// to finish before returning. Returned function done
// to finish before returning. Method unprotect done
// must be closed to unblock the Close method call.
func (s *Store) protect() (done func(), err error) {
func (s *Store) protect() (err error) {
select {
case <-s.quit:
// Store is closed.
return nil, ErrDBClosed
return ErrStoreClosed
default:
}
s.wg.Add(1)
return s.wg.Done, nil
return nil
}

// unprotect removes a protection set by the protect method
// allowing the Close method to unblock.
func (s *Store) unprotect() {
s.wg.Done()
}

// getMeta returns Meta information from MetaStore.
Expand Down

0 comments on commit db658c7

Please sign in to comment.