From 79f30f2396a40b8548c7c5edaab5af371d727f72 Mon Sep 17 00:00:00 2001 From: Starnop Date: Wed, 29 May 2019 17:25:20 +0800 Subject: [PATCH] bugfix: add locker util and fix thread-unsafe problems for cdn Signed-off-by: Starnop --- supernode/daemon/mgr/cdn/file_meta_data.go | 18 +++++ supernode/daemon/mgr/cdn/manager.go | 5 ++ supernode/util/count_rw_mutex.go | 64 +++++++++++++++++ supernode/util/locker.go | 80 ++++++++++++++++++++++ 4 files changed, 167 insertions(+) create mode 100644 supernode/util/count_rw_mutex.go create mode 100644 supernode/util/locker.go diff --git a/supernode/daemon/mgr/cdn/file_meta_data.go b/supernode/daemon/mgr/cdn/file_meta_data.go index 2f25b1ac1..47df5f6fb 100644 --- a/supernode/daemon/mgr/cdn/file_meta_data.go +++ b/supernode/daemon/mgr/cdn/file_meta_data.go @@ -8,6 +8,7 @@ import ( cutil "github.com/dragonflyoss/Dragonfly/common/util" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/store" + "github.com/dragonflyoss/Dragonfly/supernode/util" "github.com/sirupsen/logrus" ) @@ -33,11 +34,13 @@ type fileMetaData struct { // fileMetaDataManager manages the meta file and md5 file of each taskID. type fileMetaDataManager struct { fileStore *store.Store + locker *util.LockerPool } func newFileMetaDataManager(store *store.Store) *fileMetaDataManager { return &fileMetaDataManager{ fileStore: store, + locker: util.NewLockerPool(), } } @@ -91,6 +94,9 @@ func (mm *fileMetaDataManager) readFileMetaData(ctx context.Context, taskID stri } func (mm *fileMetaDataManager) updateAccessTime(ctx context.Context, taskID string, accessTime int64) error { + mm.locker.GetLock(taskID, false) + defer mm.locker.ReleaseLock(taskID, false) + originMetaData, err := mm.readFileMetaData(ctx, taskID) if err != nil { return err @@ -109,6 +115,9 @@ func (mm *fileMetaDataManager) updateAccessTime(ctx context.Context, taskID stri } func (mm *fileMetaDataManager) updateLastModifiedAndETag(ctx context.Context, taskID string, lastModified int64, eTag string) error { + mm.locker.GetLock(taskID, false) + defer mm.locker.ReleaseLock(taskID, false) + originMetaData, err := mm.readFileMetaData(ctx, taskID) if err != nil { return err @@ -121,6 +130,9 @@ func (mm *fileMetaDataManager) updateLastModifiedAndETag(ctx context.Context, ta } func (mm *fileMetaDataManager) updateStatusAndResult(ctx context.Context, taskID string, metaData *fileMetaData) error { + mm.locker.GetLock(taskID, false) + defer mm.locker.ReleaseLock(taskID, false) + originMetaData, err := mm.readFileMetaData(ctx, taskID) if err != nil { return err @@ -143,6 +155,9 @@ func (mm *fileMetaDataManager) updateStatusAndResult(ctx context.Context, taskID // And it should append the fileMD5 which means that the md5 of the task file // and the SHA-1 digest of fileMD5 at the end of the file. func (mm *fileMetaDataManager) writePieceMD5s(ctx context.Context, taskID, fileMD5 string, pieceMD5s []string) error { + mm.locker.GetLock(taskID, false) + defer mm.locker.ReleaseLock(taskID, false) + if cutil.IsEmptySlice(pieceMD5s) { logrus.Warnf("failed to write empty pieceMD5s for taskID: %s", taskID) return nil @@ -163,6 +178,9 @@ func (mm *fileMetaDataManager) writePieceMD5s(ctx context.Context, taskID, fileM // readPieceMD5s read the md5 file of the taskID and returns the pieceMD5s. func (mm *fileMetaDataManager) readPieceMD5s(ctx context.Context, taskID, fileMD5 string) (pieceMD5s []string, err error) { + mm.locker.GetLock(taskID, true) + defer mm.locker.ReleaseLock(taskID, true) + bytes, err := mm.fileStore.GetBytes(ctx, getMd5DataRawFunc(taskID)) if err != nil { return nil, err diff --git a/supernode/daemon/mgr/cdn/manager.go b/supernode/daemon/mgr/cdn/manager.go index b06d1f870..e89064a7a 100644 --- a/supernode/daemon/mgr/cdn/manager.go +++ b/supernode/daemon/mgr/cdn/manager.go @@ -11,6 +11,7 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" "github.com/dragonflyoss/Dragonfly/supernode/store" + "github.com/dragonflyoss/Dragonfly/supernode/util" "github.com/sirupsen/logrus" ) @@ -22,6 +23,7 @@ type Manager struct { cfg *config.Config cacheStore *store.Store limiter *cutil.RateLimiter + cdnLocker *util.LockerPool progressManager mgr.ProgressMgr metaDataManager *fileMetaDataManager @@ -41,6 +43,7 @@ func NewManager(cfg *config.Config, cacheStore *store.Store, progressManager mgr cfg: cfg, cacheStore: cacheStore, limiter: rateLimiter, + cdnLocker: util.NewLockerPool(), progressManager: progressManager, metaDataManager: metaDataManager, pieceMD5Manager: pieceMD5Manager, @@ -57,6 +60,8 @@ func (cm *Manager) TriggerCDN(ctx context.Context, task *types.TaskInfo) (*types httpFileLength = -1 } + cm.cdnLocker.GetLock(task.ID, false) + defer cm.cdnLocker.ReleaseLock(task.ID, false) // detect Cache startPieceNum, metaData, err := cm.detector.detectCache(ctx, task) if err != nil { diff --git a/supernode/util/count_rw_mutex.go b/supernode/util/count_rw_mutex.go new file mode 100644 index 000000000..c1bdaef01 --- /dev/null +++ b/supernode/util/count_rw_mutex.go @@ -0,0 +1,64 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package util + +import ( + "sync" + + cutil "github.com/dragonflyoss/Dragonfly/common/util" +) + +type countRWMutex struct { + count *cutil.AtomicInt + sync.RWMutex +} + +func newCountRWMutex() *countRWMutex { + return &countRWMutex{ + count: cutil.NewAtomicInt(0), + } +} + +func (cr *countRWMutex) reset() { + cr.count.Set(0) +} + +func (cr *countRWMutex) increaseCount() int32 { + cr.count.Add(1) + return cr.count.Get() +} + +func (cr *countRWMutex) decreaseCount() int32 { + cr.count.Add(-1) + return cr.count.Get() +} + +func (cr *countRWMutex) lock(ro bool) { + if ro { + cr.RLock() + return + } + cr.Lock() +} + +func (cr *countRWMutex) unlock(ro bool) { + if ro { + cr.RUnlock() + return + } + cr.Unlock() +} diff --git a/supernode/util/locker.go b/supernode/util/locker.go new file mode 100644 index 000000000..37fe2a537 --- /dev/null +++ b/supernode/util/locker.go @@ -0,0 +1,80 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package util + +import ( + "sync" +) + +// LockerPool is a set of reader/writer mutual exclusion locks. +type LockerPool struct { + // use syncPool to cache allocated but unused *countRWMutex items for later reuse + syncPool *sync.Pool + + lockerMap map[string]*countRWMutex + sync.Mutex +} + +// NewLockerPool returns a *LockerPool with self-defined prefix. +func NewLockerPool() *LockerPool { + return &LockerPool{ + syncPool: &sync.Pool{ + New: func() interface{} { + return newCountRWMutex() + }, + }, + lockerMap: make(map[string]*countRWMutex), + } +} + +// GetLock locks key. +// If ro(readonly) is true, then it locks key for reading. +// Otherwise, locks key for writing. +func (l *LockerPool) GetLock(key string, ro bool) { + l.Lock() + + locker, ok := l.lockerMap[key] + if !ok { + locker = l.syncPool.Get().(*countRWMutex) + l.lockerMap[key] = locker + } + + locker.increaseCount() + l.Unlock() + + locker.lock(ro) +} + +// ReleaseLock unlocks key. +// If ro(readonly) is true, then it unlocks key for reading. +// Otherwise, unlocks key for writing. +func (l *LockerPool) ReleaseLock(key string, ro bool) { + l.Lock() + defer l.Unlock() + + locker, ok := l.lockerMap[key] + if !ok { + return + } + + locker.unlock(ro) + if locker.decreaseCount() < 1 { + locker.reset() + l.syncPool.Put(locker) + delete(l.lockerMap, key) + } +}