Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
feature: add gc mgr for supernode
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Aug 16, 2019
1 parent c476d46 commit 564290c
Show file tree
Hide file tree
Showing 33 changed files with 756 additions and 174 deletions.
2 changes: 2 additions & 0 deletions cmd/supernode/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func runSuperNode() error {
// set up the CIDPrefix
cfg.SetCIDPrefix(cfg.AdvertiseIP)

logrus.Debugf("get supernode config: %+v", cfg)

logrus.Info("start to run supernode")

d, err := daemon.New(cfg)
Expand Down
4 changes: 2 additions & 2 deletions dfget/core/downloader/p2p_downloader/power_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (pc *PowerClient) Run() error {
pc.readCost.Seconds(), pc.total)

if err != nil {
logrus.Errorf("read piece cont error:%v from dst:%s:%d, wait 20 ms",
err, pc.pieceTask.PeerIP, pc.pieceTask.PeerPort)
logrus.Errorf("failed to read piece cont(%s) from dst:%s:%d, wait 20 ms: %v",
pc.pieceTask.Range, pc.pieceTask.PeerIP, pc.pieceTask.PeerPort, err)
time.AfterFunc(time.Millisecond*20, func() {
pc.queue.Put(pc.failPiece())
})
Expand Down
57 changes: 43 additions & 14 deletions pkg/syncmap/syncmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package syncmap
import (
"strconv"
"sync"
"time"

"github.com/dragonflyoss/Dragonfly/pkg/atomiccount"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand Down Expand Up @@ -60,91 +61,119 @@ func (mmap *SyncMap) Get(key string) (interface{}, error) {
return v, nil
}

return nil, errors.Wrapf(errortypes.ErrDataNotFound, "key: %s", key)
return nil, errors.Wrapf(errortypes.ErrDataNotFound, "failed to get key %s from map", key)
}

// GetAsBitset returns result as *bitset.BitSet.
// The ErrConvertFailed error will be returned if the assertion fails.
func (mmap *SyncMap) GetAsBitset(key string) (*bitset.BitSet, error) {
v, err := mmap.Get(key)
if err != nil {
return nil, errors.Wrapf(err, "key: %s", key)
return nil, errors.Wrapf(err, "failed to get key %s from map", key)
}

if value, ok := v.(*bitset.BitSet); ok {
return value, nil
}
return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
return nil, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
}

// GetAsMap returns result as SyncMap.
// The ErrConvertFailed error will be returned if the assertion fails.
func (mmap *SyncMap) GetAsMap(key string) (*SyncMap, error) {
v, err := mmap.Get(key)
if err != nil {
return nil, errors.Wrapf(err, "key: %s", key)
return nil, errors.Wrapf(err, "failed to get key %s from map", key)
}

if value, ok := v.(*SyncMap); ok {
return value, nil
}
return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
return nil, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
}

// GetAsInt returns result as int.
// The ErrConvertFailed error will be returned if the assertion fails.
func (mmap *SyncMap) GetAsInt(key string) (int, error) {
v, err := mmap.Get(key)
if err != nil {
return 0, errors.Wrapf(err, "key: %s", key)
return 0, errors.Wrapf(err, "failed to get key %s from map", key)
}

if value, ok := v.(int); ok {
return value, nil
}
return 0, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
return 0, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
}

// GetAsInt64 returns result as int64.
// The ErrConvertFailed error will be returned if the assertion fails.
func (mmap *SyncMap) GetAsInt64(key string) (int64, error) {
v, err := mmap.Get(key)
if err != nil {
return 0, errors.Wrapf(err, "failed to get key %s from map", key)
}

if value, ok := v.(int64); ok {
return value, nil
}
return 0, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
}

// GetAsString returns result as string.
// The ErrConvertFailed error will be returned if the assertion fails.
func (mmap *SyncMap) GetAsString(key string) (string, error) {
v, err := mmap.Get(key)
if err != nil {
return "", errors.Wrapf(err, "key: %s", key)
return "", errors.Wrapf(err, "failed to get key %s from map", key)
}

if value, ok := v.(string); ok {
return value, nil
}
return "", errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
return "", errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
}

// GetAsBool returns result as bool.
// The ErrConvertFailed error will be returned if the assertion fails.
func (mmap *SyncMap) GetAsBool(key string) (bool, error) {
v, err := mmap.Get(key)
if err != nil {
return false, errors.Wrapf(err, "key: %s", key)
return false, errors.Wrapf(err, "failed to get key %s from map", key)
}

if value, ok := v.(bool); ok {
return value, nil
}
return false, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
return false, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
}

// GetAsAtomicInt returns result as *AtomicInt.
// The ErrConvertFailed error will be returned if the assertion fails.
func (mmap *SyncMap) GetAsAtomicInt(key string) (*atomiccount.AtomicInt, error) {
v, err := mmap.Get(key)
if err != nil {
return nil, errors.Wrapf(err, "key: %s", key)
return nil, errors.Wrapf(err, "failed to get key %s from map", key)
}

if value, ok := v.(*atomiccount.AtomicInt); ok {
return value, nil
}
return nil, errors.Wrapf(errortypes.ErrConvertFailed, "key %s: %v", key, v)
return nil, errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
}

// GetAsTime returns result as Time.
// The ErrConvertFailed error will be returned if the assertion fails.
func (mmap *SyncMap) GetAsTime(key string) (time.Time, error) {
v, err := mmap.Get(key)
if err != nil {
return time.Now(), errors.Wrapf(err, "failed to get key %s from map", key)
}

if value, ok := v.(time.Time); ok {
return value, nil
}
return time.Now(), errors.Wrapf(errortypes.ErrConvertFailed, "failed to get key %s from map with value %s", key, v)
}

// Remove deletes the key-value pair from the mmap.
Expand All @@ -156,7 +185,7 @@ func (mmap *SyncMap) Remove(key string) error {
}

if _, ok := mmap.Load(key); !ok {
return errors.Wrapf(errortypes.ErrDataNotFound, "key: %s", key)
return errors.Wrapf(errortypes.ErrDataNotFound, "failed to get key %s from map", key)
}

mmap.Delete(key)
Expand Down
19 changes: 18 additions & 1 deletion supernode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ func NewBaseProperties() *BaseProperties {
MaxBandwidth: 200,
EnableProfiler: false,
Debug: false,
FailAccessInterval: 3,
FailAccessInterval: DefaultFailAccessInterval,
GCInitialDelay: DefaultGCInitialDelay,
GCMetaInterval: DefaultGCMetaInterval,
TaskExpireTime: DefaultTaskExpireTime,
PeerGCDelay: DefaultPeerGCDelay,
}
}

Expand Down Expand Up @@ -190,6 +194,19 @@ type BaseProperties struct {
// default: 3
FailAccessInterval time.Duration `yaml:"failAccessInterval"`

// GCInitialDelay is the delay time from the start to the first GC execution.
GCInitialDelay time.Duration `yaml:"gcInitialDelay"`

// GCMetaInterval is the interval time to execute the GC meta.
GCMetaInterval time.Duration `yaml:"gcMetaInterval"`

// TaskExpireTime when a task is not accessed within the taskExpireTime,
// and it will be treated to be expired.
TaskExpireTime time.Duration `yaml:"taskExpireTime"`

// PeerGCDelay is the delay time to execute the GC after the peer has reported the offline.
PeerGCDelay time.Duration `yaml:"peerGCDelay"`

// cIDPrefix s a prefix string used to indicate that the CID is supernode.
cIDPrefix string

Expand Down
22 changes: 22 additions & 0 deletions supernode/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package config

import (
"time"
)

const (
// DefaultSupernodeConfigFilePath the default supernode config path.
DefaultSupernodeConfigFilePath = "/etc/dragonfly/supernode.yml"
Expand Down Expand Up @@ -75,3 +79,21 @@ const (
// SubsystemDfget represents metrics from dfget
SubsystemDfget = "dfget"
)

const (
// DefaultFailAccessInterval is the interval time after failed to access the URL.
DefaultFailAccessInterval = 3 * time.Minute

// DefaultGCInitialDelay is the delay time from the start to the first GC execution.
DefaultGCInitialDelay = 6 * time.Second

// DefaultGCMetaInterval is the interval time to execute the GC meta.
DefaultGCMetaInterval = 2 * time.Minute

// DefaultTaskExpireTime when a task is not accessed within the taskExpireTime,
// and it will be treated to be expired.
DefaultTaskExpireTime = 3 * time.Minute

// DefaultPeerGCDelay is the delay time to execute the GC after the peer has reported the offline.
DefaultPeerGCDelay = 3 * time.Minute
)
9 changes: 7 additions & 2 deletions supernode/daemon/mgr/cdn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,13 @@ func (cm *Manager) GetStatus(ctx context.Context, taskID string) (cdnStatus stri
return "", nil
}

// Delete the file from disk with specified taskID.
func (cm *Manager) Delete(ctx context.Context, taskID string) error {
// Delete the cdn meta with specified taskID.
func (cm *Manager) Delete(ctx context.Context, taskID string, force bool) error {
if !force {
return cm.pieceMD5Manager.removePieceMD5sByTaskID(taskID)
}

// TODO: delete the file form disk.
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions supernode/daemon/mgr/cdn/piece_md5_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ func (pmm *pieceMD5Mgr) getPieceMD5sByTaskID(taskID string) (pieceMD5s []string,
}
return pieceMD5s, nil
}

func (pmm *pieceMD5Mgr) removePieceMD5sByTaskID(taskID string) error {
return pmm.taskPieceMD5s.Remove(taskID)
}
5 changes: 3 additions & 2 deletions supernode/daemon/mgr/cdn_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type CDNMgr interface {
// GetStatus get the status of the file.
GetStatus(ctx context.Context, taskID string) (cdnStatus string, err error)

// Delete the file from disk with specified taskID.
Delete(ctx context.Context, taskID string) error
// Delete the cdn meta with specified taskID.
// The file on the disk will be deleted when the force is true.
Delete(ctx context.Context, taskID string, force bool) error
}
6 changes: 6 additions & 0 deletions supernode/daemon/mgr/dfget_task_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ type DfgetTaskMgr interface {
// GetCIDByPeerIDAndTaskID returns cid with specified peerID and taskID.
GetCIDByPeerIDAndTaskID(ctx context.Context, peerID, taskID string) (string, error)

// GetCIDsByTaskID returns cids as a string slice with specified taskID.
GetCIDsByTaskID(ctx context.Context, taskID string) ([]string, error)

// GetCIDAndTaskIDsByPeerID returns a cid<->taskID map by specified peerID.
GetCIDAndTaskIDsByPeerID(ctx context.Context, peerID string) (map[string]string, error)

// List returns the list of dfgetTask.
List(ctx context.Context, filter map[string]string) (dfgetTaskList []*types.DfGetTask, err error)

Expand Down
66 changes: 64 additions & 2 deletions supernode/daemon/mgr/dfgettask/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dfgettask
import (
"context"
"fmt"
"strings"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
Expand All @@ -31,10 +32,15 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)

var _ mgr.DfgetTaskMgr = &Manager{}

const (
keyJoinChar = "@"
)

type metrics struct {
dfgetTasks *prometheus.GaugeVec
dfgetTasksRegisterCount *prometheus.CounterVec
Expand Down Expand Up @@ -119,6 +125,62 @@ func (dtm *Manager) GetCIDByPeerIDAndTaskID(ctx context.Context, peerID, taskID
return dtm.ptoc.GetAsString(generatePeerKey(peerID, taskID))
}

// GetCIDsByTaskID returns cids as a string slice with specified taskID.
func (dtm *Manager) GetCIDsByTaskID(ctx context.Context, taskID string) ([]string, error) {
var result []string
suffixString := keyJoinChar + taskID
rangeFunc := func(k, v interface{}) bool {
key, ok := k.(string)
if !ok {
return true
}

if !strings.HasSuffix(key, suffixString) {
return true
}
cid, err := dtm.ptoc.GetAsString(key)
if err != nil {
logrus.Warnf("failed to get cid from ptoc with key(%s): %v", key, err)
return true
}

result = append(result, cid)
return true
}
dtm.ptoc.Range(rangeFunc)

return result, nil
}

// GetCIDAndTaskIDsByPeerID returns a cid<->taskID map by specified peerID.
func (dtm *Manager) GetCIDAndTaskIDsByPeerID(ctx context.Context, peerID string) (map[string]string, error) {
var result = make(map[string]string)
prefixStr := peerID + keyJoinChar
rangeFunc := func(k, v interface{}) bool {
key, ok := k.(string)
if !ok {
return true
}

if !strings.HasPrefix(key, prefixStr) {
return true
}
cid, err := dtm.ptoc.GetAsString(key)
if err != nil {
logrus.Warnf("failed to get cid from ptoc with key(%s): %v", key, err)
return true
}

// get TaskID from the key
splitResult := strings.Split(key, keyJoinChar)
result[cid] = splitResult[len(splitResult)-1]
return true
}
dtm.ptoc.Range(rangeFunc)

return result, nil
}

// List returns the list of dfgetTask.
func (dtm *Manager) List(ctx context.Context, filter map[string]string) (dfgetTaskList []*types.DfGetTask, err error) {
return nil, nil
Expand Down Expand Up @@ -189,9 +251,9 @@ func generateKey(cID, taskID string) (string, error) {
return "", errors.Wrapf(errortypes.ErrEmptyValue, "taskID")
}

return fmt.Sprintf("%s%s%s", cID, "@", taskID), nil
return fmt.Sprintf("%s%s%s", cID, keyJoinChar, taskID), nil
}

func generatePeerKey(peerID, taskID string) string {
return fmt.Sprintf("%s@%s", peerID, taskID)
return fmt.Sprintf("%s%s%s", peerID, keyJoinChar, taskID)
}
Loading

0 comments on commit 564290c

Please sign in to comment.