Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
feature: add supernode disk gc feature
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <[email protected]>
Signed-off-by: Starnop <[email protected]>
  • Loading branch information
starnop committed Sep 23, 2019
1 parent d200985 commit 5bf9871
Show file tree
Hide file tree
Showing 21 changed files with 664 additions and 28 deletions.
1 change: 0 additions & 1 deletion cmd/supernode/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ func runSuperNode() error {
cfg.SetCIDPrefix(cfg.AdvertiseIP)

logrus.Debugf("get supernode config: %+v", cfg)

logrus.Info("start to run supernode")

d, err := daemon.New(cfg, dfgetLogger)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/asaskevich/govalidator v0.0.0-20170903095215-73945b6115bf // indirect
github.com/cpuguy83/go-md2man v1.0.7 // indirect
github.com/emirpasic/gods v1.12.0
github.com/go-check/check v0.0.0-20161208181325-20d25e280405
github.com/go-openapi/analysis v0.0.0-20170813233457-8ed83f2ea9f0 // indirect
github.com/go-openapi/errors v0.0.0-20170426151106-03cfca65330d
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down
2 changes: 2 additions & 0 deletions pkg/errortypes/errortypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type DfError struct {
Msg string
}

// ErrAssertFunc is the signature of a predicate that inspects err and
// reports whether it matches a particular error condition.
type ErrAssertFunc func(err error) bool

// New function creates a DfError.
func New(code int, msg string) *DfError {
return &DfError{
Expand Down
117 changes: 117 additions & 0 deletions pkg/fileutils/filesize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fileutils

import (
"fmt"
"regexp"
"strconv"

"github.com/dragonflyoss/Dragonfly/pkg/errortypes"

"github.com/pkg/errors"
)

// Fsize is a wrapper type which indicates the file size.
// It is an int64 count of the base unit B, and it implements the YAML
// marshaler/unmarshaler interfaces so that it can be written in
// configuration files in a human-readable form such as "5MB".
type Fsize int64

// Size units for Fsize. Each unit is 1024 times the previous one.
const (
// B is the base unit of Fsize (one byte).
B Fsize = 1
// KB is 1024 B.
KB = 1024 * B
// MB is 1024 KB.
MB = 1024 * KB
// GB is 1024 MB.
GB = 1024 * MB
)

// fsizeRegex matches a run of decimal digits immediately followed by one of
// the upper-case unit suffixes G, GB, M, MB, K, KB or B. The digits are
// captured in group 1 and the suffix in group 2.
//
// A pure number without a suffix does NOT match this pattern; StringToFSize
// handles that form itself before consulting the regex.
var fsizeRegex = regexp.MustCompile("^([0-9]+)([GMK]B?|B)$")

// MarshalYAML implements the yaml.Marshaler interface by emitting the size
// in the string form produced by FsizeToString. It never returns an error.
func (f Fsize) MarshalYAML() (interface{}, error) {
	return FsizeToString(f), nil
}

// UnmarshalYAML implements the yaml.Unmarshaler interface. The YAML node is
// first decoded as a string and then converted with StringToFSize; the
// receiver is only modified when both steps succeed.
func (f *Fsize) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var raw string
	err := unmarshal(&raw)
	if err != nil {
		return err
	}

	parsed, err := StringToFSize(raw)
	if err != nil {
		return err
	}

	*f = parsed
	return nil
}

// FsizeToString formats a Fsize value as a string made of an integer
// followed by a unit suffix. The largest unit among GB, MB and KB that
// divides the value exactly is used; otherwise the value is printed with the
// "B" suffix. Zero is always rendered as "0B".
func FsizeToString(fsize Fsize) string {
	size := int64(fsize)
	if size == 0 {
		return "0B"
	}

	// Ordered from the largest unit to the smallest so the first exact
	// divisor wins.
	units := []struct {
		suffix string
		scale  int64
	}{
		{suffix: "GB", scale: int64(GB)},
		{suffix: "MB", scale: int64(MB)},
		{suffix: "KB", scale: int64(KB)},
	}
	for _, u := range units {
		if size%u.scale == 0 {
			return fmt.Sprintf("%v%v", size/u.scale, u.suffix)
		}
	}

	return fmt.Sprintf("%v%v", size/int64(B), "B")
}

// StringToFSize parses a string into Fsize.
//
// Two input forms are accepted:
//   - a plain decimal integer, interpreted as a count of B ("1024");
//   - a decimal integer immediately followed by one of the upper-case unit
//     suffixes G, GB, M, MB, K, KB or B ("10G", "5MB").
//
// Units are 1024-based. An error wrapping errortypes.ErrInvalidValue is
// returned when the value is negative, is malformed, or does not fit in
// a Fsize.
func StringToFSize(fsize string) (Fsize, error) {
	// maxFsize is the largest value a Fsize (int64) can hold.
	const maxFsize Fsize = 1<<63 - 1

	// Parse with an explicit 64-bit width so that the accepted range does not
	// depend on the platform's int size.
	if n, err := strconv.ParseInt(fsize, 10, 64); err == nil {
		if n < 0 {
			return 0, errors.Wrapf(errortypes.ErrInvalidValue, "%s is a negative value and fsize must not be negative", fsize)
		}
		return Fsize(n), nil
	}

	matches := fsizeRegex.FindStringSubmatch(fsize)
	if len(matches) != 3 {
		return 0, errors.Wrapf(errortypes.ErrInvalidValue, "%s and supported format: G(B)/M(B)/K(B)/B or pure number", fsize)
	}

	// The regex guarantees matches[1] consists of digits only, so the only
	// possible failure here is a value too large for int64.
	n, err := strconv.ParseInt(matches[1], 10, 64)
	if err != nil {
		return 0, errors.Wrapf(errortypes.ErrInvalidValue, "%s is out of range for fsize", fsize)
	}

	var unit Fsize
	switch matches[2] {
	case "G", "GB":
		unit = GB
	case "M", "MB":
		unit = MB
	case "K", "KB":
		unit = KB
	case "B":
		unit = B
	default:
		// Unreachable while fsizeRegex only admits the suffixes above; kept as
		// a guard in case the pattern is extended without updating this switch.
		return 0, errors.Wrapf(errortypes.ErrInvalidValue, "%s and supported format: G(B)/M(B)/K(B)/B or pure number", fsize)
	}

	// Reject values whose product with the unit would wrap around int64.
	count := Fsize(n)
	if count > maxFsize/unit {
		return 0, errors.Wrapf(errortypes.ErrInvalidValue, "%s is out of range for fsize", fsize)
	}
	return count * unit, nil
}
111 changes: 111 additions & 0 deletions pkg/fileutils/filesize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fileutils

import (
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"

"github.com/go-check/check"
"gopkg.in/yaml.v2"
)

// FsizeTestSuite groups the go-check tests for the Fsize helpers. None of the
// tests keep state between runs, so the suite carries no fields.
type FsizeTestSuite struct{}

// init registers FsizeTestSuite with the go-check runner.
func init() {
	check.Suite(new(FsizeTestSuite))
}

// TestFsizeToString checks that FsizeToString renders sizes with the expected
// number and unit suffix.
func (suite *FsizeTestSuite) TestFsizeToString(c *check.C) {
	type testCase struct {
		size     Fsize
		expected string
	}
	testCases := []testCase{
		{size: B, expected: "1B"},
		{size: 1024 * B, expected: "1KB"},
		{size: 3 * MB, expected: "3MB"},
		{size: 0 * GB, expected: "0B"},
	}

	for i := range testCases {
		tc := testCases[i]
		c.Assert(FsizeToString(tc.size), check.Equals, tc.expected)
	}
}

// TestStringToFSize checks both the parsed value and the kind of error
// returned by StringToFSize for valid and invalid inputs.
func (suite *FsizeTestSuite) TestStringToFSize(c *check.C) {
	type testCase struct {
		input     string
		expected  Fsize
		assertErr errortypes.ErrAssertFunc
	}
	testCases := []testCase{
		{input: "0B", expected: 0 * B, assertErr: errortypes.IsNilError},
		{input: "1B", expected: B, assertErr: errortypes.IsNilError},
		{input: "10G", expected: 10 * GB, assertErr: errortypes.IsNilError},
		{input: "1024", expected: 1 * KB, assertErr: errortypes.IsNilError},
		{input: "-1", expected: 0, assertErr: errortypes.IsInvalidValue},
		{input: "10b", expected: 0, assertErr: errortypes.IsInvalidValue},
	}

	for i := range testCases {
		tc := testCases[i]
		got, err := StringToFSize(tc.input)
		c.Assert(tc.assertErr(err), check.Equals, true)
		c.Assert(got, check.DeepEquals, tc.expected)
	}
}

// TestMarshalYAML checks the YAML text produced when marshaling Fsize values.
func (suite *FsizeTestSuite) TestMarshalYAML(c *check.C) {
	type testCase struct {
		size     Fsize
		expected string
	}
	testCases := []testCase{
		{size: 5 * MB, expected: "5MB\n"},
		{size: 1 * GB, expected: "1GB\n"},
		{size: 1 * KB, expected: "1KB\n"},
		{size: 1 * B, expected: "1B\n"},
		{size: 0, expected: "0B\n"},
	}

	for i := range testCases {
		tc := testCases[i]
		encoded, err := yaml.Marshal(tc.size)
		c.Check(err, check.IsNil)
		c.Check(string(encoded), check.Equals, tc.expected)
	}
}

// TestUnMarshalYAML checks that YAML scalars with and without a unit suffix
// are decoded into the expected Fsize values.
func (suite *FsizeTestSuite) TestUnMarshalYAML(c *check.C) {
	type testCase struct {
		document string
		expected Fsize
	}
	testCases := []testCase{
		{document: "5M\n", expected: 5 * MB},
		{document: "1G\n", expected: 1 * GB},
		{document: "1B\n", expected: 1 * B},
		{document: "1\n", expected: 1 * B},
		{document: "1024\n", expected: 1 * KB},
		{document: "1K\n", expected: 1 * KB},
	}

	for i := range testCases {
		tc := testCases[i]
		var decoded Fsize
		err := yaml.Unmarshal([]byte(tc.document), &decoded)
		c.Check(err, check.IsNil)
		c.Check(decoded, check.Equals, tc.expected)
	}
}
10 changes: 10 additions & 0 deletions pkg/fileutils/fileutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,13 @@ func LoadYaml(path string, out interface{}) error {
}
return nil
}

// GetFreeSpace gets the free disk space of the path.
// The value is computed as Bavail * Bsize from the statfs result for path;
// any error reported by syscall.Statfs is returned unchanged.
func GetFreeSpace(path string) (Fsize, error) {
	var stat syscall.Statfs_t
	err := syscall.Statfs(path, &stat)
	if err != nil {
		return 0, err
	}

	available := stat.Bavail * uint64(stat.Bsize)
	return Fsize(available), nil
}
43 changes: 37 additions & 6 deletions supernode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,13 @@ func NewBaseProperties() *BaseProperties {
FailAccessInterval: DefaultFailAccessInterval,
GCInitialDelay: DefaultGCInitialDelay,
GCMetaInterval: DefaultGCMetaInterval,
GCDiskInterval: DefaultGCDiskInterval,
YoungGCThreshold: DefaultYoungGCThreshold,
FullGCThreshold: DefaultFullGCThreshold,
IntervalThreshold: DefaultIntervalThreshold,
TaskExpireTime: DefaultTaskExpireTime,
PeerGCDelay: DefaultPeerGCDelay,
CleanRatio: DefaultCleanRatio,
}
}

Expand Down Expand Up @@ -193,10 +198,18 @@ type BaseProperties struct {
// default: 3
FailAccessInterval time.Duration `yaml:"failAccessInterval"`

// cIDPrefix is a prefix string used to indicate that the CID is supernode.
cIDPrefix string

// superNodePID is the ID of supernode, which is the same as peer ID of dfget.
superNodePID string

// gc related

// GCInitialDelay is the delay time from the start to the first GC execution.
GCInitialDelay time.Duration `yaml:"gcInitialDelay"`

// GCMetaInterval is the interval time to execute the GC meta.
// GCMetaInterval is the interval time to execute GC meta.
GCMetaInterval time.Duration `yaml:"gcMetaInterval"`

// TaskExpireTime when a task is not accessed within the taskExpireTime,
Expand All @@ -206,9 +219,27 @@ type BaseProperties struct {
// PeerGCDelay is the delay time to execute the GC after the peer has reported the offline.
PeerGCDelay time.Duration `yaml:"peerGCDelay"`

// cIDPrefix is a prefix string used to indicate that the CID is supernode.
cIDPrefix string

// superNodePID is the ID of supernode, which is the same as peer ID of dfget.
superNodePID string
// GCDiskInterval is the interval time to execute GC disk.
GCDiskInterval time.Duration `yaml:"gcDiskInterval"`

// YoungGCThreshold is the threshold of available disk space above which
// there is no need to GC disk.
//
// default: 100GB
YoungGCThreshold fileutils.Fsize `yaml:"youngGCThreshold"`

// FullGCThreshold is the threshold of available disk space below which
// the supernode should GC all task files which are not being used.
//
// default: 5GB
FullGCThreshold fileutils.Fsize `yaml:"fullGCThreshold"`

// IntervalThreshold is the threshold of the interval at which the task file is accessed.
IntervalThreshold time.Duration `yaml:"IntervalThreshold"`

// CleanRatio is the ratio of the disk to clean, expressed out of 10.
// The value of CleanRatio should be in the range [1, 10].
//
// default: 1
CleanRatio int
}
14 changes: 14 additions & 0 deletions supernode/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package config
import (
"time"

"github.com/dragonflyoss/Dragonfly/pkg/fileutils"
"github.com/dragonflyoss/Dragonfly/pkg/rate"
)

Expand Down Expand Up @@ -110,6 +111,19 @@ const (
DefaultPeerGCDelay = 3 * time.Minute
)

// Default config values for disk GC. NewBaseProperties assigns each of them
// to the BaseProperties field of the same name without the "Default" prefix.
const (
// DefaultYoungGCThreshold is the default value of YoungGCThreshold: 100GB.
DefaultYoungGCThreshold = 100 * fileutils.GB

// DefaultFullGCThreshold is the default value of FullGCThreshold: 5GB.
DefaultFullGCThreshold = 5 * fileutils.GB

// DefaultIntervalThreshold is the default value of IntervalThreshold: 2 hours.
DefaultIntervalThreshold = 2 * time.Hour

// DefaultGCDiskInterval is the default value of GCDiskInterval: 15 seconds.
DefaultGCDiskInterval = 15 * time.Second

// DefaultCleanRatio is the default value of CleanRatio: 1.
DefaultCleanRatio = 1
)

const (
// DefaultLinkLimit is the default network speed limit for each piece.
// unit: MB/s
Expand Down
6 changes: 4 additions & 2 deletions supernode/daemon/mgr/cdn/cache_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.TaskInfo)
if metaData, err = cd.resetRepo(ctx, task); err != nil {
return 0, nil, err
}
} else {
logrus.Debugf("start to update access time with taskID(%s)", task.ID)
cd.metaDataManager.updateAccessTime(ctx, task.ID, getCurrentTimeMillisFunc())
}

// TODO: update the access time of task meta file for GC module
return breakNum, metaData, nil
}

Expand Down Expand Up @@ -117,7 +119,7 @@ func (cd *cacheDetector) parseBreakNumByCheckFile(ctx context.Context, taskID st

func (cd *cacheDetector) resetRepo(ctx context.Context, task *types.TaskInfo) (*fileMetaData, error) {
logrus.Infof("reset repo for taskID: %s", task.ID)
if err := deleteTaskFiles(ctx, cd.cacheStore, task.ID, false); err != nil {
if err := deleteTaskFiles(ctx, cd.cacheStore, task.ID); err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit 5bf9871

Please sign in to comment.