Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
bugfix: add locker util and fix thread-unsafe problems for cdn
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Jun 3, 2019
1 parent 28bc2ab commit 79f30f2
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 0 deletions.
18 changes: 18 additions & 0 deletions supernode/daemon/mgr/cdn/file_meta_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
cutil "github.com/dragonflyoss/Dragonfly/common/util"
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/store"
"github.com/dragonflyoss/Dragonfly/supernode/util"

"github.com/sirupsen/logrus"
)
Expand All @@ -33,11 +34,13 @@ type fileMetaData struct {
// fileMetaDataManager manages the meta file and md5 file of each taskID.
type fileMetaDataManager struct {
fileStore *store.Store
locker *util.LockerPool
}

func newFileMetaDataManager(store *store.Store) *fileMetaDataManager {
return &fileMetaDataManager{
fileStore: store,
locker: util.NewLockerPool(),
}
}

Expand Down Expand Up @@ -91,6 +94,9 @@ func (mm *fileMetaDataManager) readFileMetaData(ctx context.Context, taskID stri
}

func (mm *fileMetaDataManager) updateAccessTime(ctx context.Context, taskID string, accessTime int64) error {
mm.locker.GetLock(taskID, false)
defer mm.locker.ReleaseLock(taskID, false)

originMetaData, err := mm.readFileMetaData(ctx, taskID)
if err != nil {
return err
Expand All @@ -109,6 +115,9 @@ func (mm *fileMetaDataManager) updateAccessTime(ctx context.Context, taskID stri
}

func (mm *fileMetaDataManager) updateLastModifiedAndETag(ctx context.Context, taskID string, lastModified int64, eTag string) error {
mm.locker.GetLock(taskID, false)
defer mm.locker.ReleaseLock(taskID, false)

originMetaData, err := mm.readFileMetaData(ctx, taskID)
if err != nil {
return err
Expand All @@ -121,6 +130,9 @@ func (mm *fileMetaDataManager) updateLastModifiedAndETag(ctx context.Context, ta
}

func (mm *fileMetaDataManager) updateStatusAndResult(ctx context.Context, taskID string, metaData *fileMetaData) error {
mm.locker.GetLock(taskID, false)
defer mm.locker.ReleaseLock(taskID, false)

originMetaData, err := mm.readFileMetaData(ctx, taskID)
if err != nil {
return err
Expand All @@ -143,6 +155,9 @@ func (mm *fileMetaDataManager) updateStatusAndResult(ctx context.Context, taskID
// And it should append the fileMD5 which means that the md5 of the task file
// and the SHA-1 digest of fileMD5 at the end of the file.
func (mm *fileMetaDataManager) writePieceMD5s(ctx context.Context, taskID, fileMD5 string, pieceMD5s []string) error {
mm.locker.GetLock(taskID, false)
defer mm.locker.ReleaseLock(taskID, false)

if cutil.IsEmptySlice(pieceMD5s) {
logrus.Warnf("failed to write empty pieceMD5s for taskID: %s", taskID)
return nil
Expand All @@ -163,6 +178,9 @@ func (mm *fileMetaDataManager) writePieceMD5s(ctx context.Context, taskID, fileM

// readPieceMD5s read the md5 file of the taskID and returns the pieceMD5s.
func (mm *fileMetaDataManager) readPieceMD5s(ctx context.Context, taskID, fileMD5 string) (pieceMD5s []string, err error) {
mm.locker.GetLock(taskID, true)
defer mm.locker.ReleaseLock(taskID, true)

bytes, err := mm.fileStore.GetBytes(ctx, getMd5DataRawFunc(taskID))
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions supernode/daemon/mgr/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
"github.com/dragonflyoss/Dragonfly/supernode/store"
"github.com/dragonflyoss/Dragonfly/supernode/util"

"github.com/sirupsen/logrus"
)
Expand All @@ -22,6 +23,7 @@ type Manager struct {
cfg *config.Config
cacheStore *store.Store
limiter *cutil.RateLimiter
cdnLocker *util.LockerPool
progressManager mgr.ProgressMgr

metaDataManager *fileMetaDataManager
Expand All @@ -41,6 +43,7 @@ func NewManager(cfg *config.Config, cacheStore *store.Store, progressManager mgr
cfg: cfg,
cacheStore: cacheStore,
limiter: rateLimiter,
cdnLocker: util.NewLockerPool(),
progressManager: progressManager,
metaDataManager: metaDataManager,
pieceMD5Manager: pieceMD5Manager,
Expand All @@ -57,6 +60,8 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.TaskInfo) (*types
httpFileLength = -1
}

cm.cdnLocker.GetLock(task.ID, false)
defer cm.cdnLocker.ReleaseLock(task.ID, false)
// detect Cache
startPieceNum, metaData, err := cm.detector.detectCache(ctx, task)
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions supernode/util/count_rw_mutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package util

import (
"sync"

cutil "github.com/dragonflyoss/Dragonfly/common/util"
)

type countRWMutex struct {
count *cutil.AtomicInt
sync.RWMutex
}

func newCountRWMutex() *countRWMutex {
return &countRWMutex{
count: cutil.NewAtomicInt(0),
}
}

func (cr *countRWMutex) reset() {
cr.count.Set(0)
}

func (cr *countRWMutex) increaseCount() int32 {
cr.count.Add(1)
return cr.count.Get()
}

func (cr *countRWMutex) decreaseCount() int32 {
cr.count.Add(-1)
return cr.count.Get()
}

func (cr *countRWMutex) lock(ro bool) {
if ro {
cr.RLock()
return
}
cr.Lock()
}

func (cr *countRWMutex) unlock(ro bool) {
if ro {
cr.RUnlock()
return
}
cr.Unlock()
}
80 changes: 80 additions & 0 deletions supernode/util/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package util

import (
"sync"
)

// LockerPool is a set of reader/writer mutual exclusion locks.
type LockerPool struct {
// use syncPool to cache allocated but unused *countRWMutex items for later reuse
syncPool *sync.Pool

lockerMap map[string]*countRWMutex
sync.Mutex
}

// NewLockerPool returns a *LockerPool with self-defined prefix.
func NewLockerPool() *LockerPool {
return &LockerPool{
syncPool: &sync.Pool{
New: func() interface{} {
return newCountRWMutex()
},
},
lockerMap: make(map[string]*countRWMutex),
}
}

// GetLock locks key.
// If ro(readonly) is true, then it locks key for reading.
// Otherwise, locks key for writing.
func (l *LockerPool) GetLock(key string, ro bool) {
l.Lock()

locker, ok := l.lockerMap[key]
if !ok {
locker = l.syncPool.Get().(*countRWMutex)
l.lockerMap[key] = locker
}

locker.increaseCount()
l.Unlock()

locker.lock(ro)
}

// ReleaseLock unlocks key.
// If ro(readonly) is true, then it unlocks key for reading.
// Otherwise, unlocks key for writing.
func (l *LockerPool) ReleaseLock(key string, ro bool) {
l.Lock()
defer l.Unlock()

locker, ok := l.lockerMap[key]
if !ok {
return
}

locker.unlock(ro)
if locker.decreaseCount() < 1 {
locker.reset()
l.syncPool.Put(locker)
delete(l.lockerMap, key)
}
}

0 comments on commit 79f30f2

Please sign in to comment.