Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: disable backend synchronization #5734

Merged
merged 4 commits into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions engine/cdn/storage/nfs/nfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/rockbears/log"
"github.com/stretchr/testify/require"
"io/ioutil"
"math"
"os"
"testing"
"time"
Expand Down Expand Up @@ -64,7 +63,7 @@ func TestNFSReadWrite(t *testing.T) {
require.NotNil(t, d)
bd, is := d.(storage.BufferUnit)
require.True(t, is)
bd.New(sdk.NewGoRoutines(), 1, math.MaxFloat64)
bd.New(sdk.NewGoRoutines(), storage.AbstractUnitConfig{})
err := bd.Init(ctx, &storage.NFSBufferConfiguration{
Host: nfsHost,
TargetPartition: nfsTargetPath,
Expand Down
28 changes: 20 additions & 8 deletions engine/cdn/storage/storageunit.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
if !is {
return nil, sdk.WithStack(fmt.Errorf("redis driver is not a buffer unit driver"))
}
bd.New(gorts, 1, math.MaxFloat64)
bd.New(gorts, AbstractUnitConfig{syncBandwidth: math.MaxFloat64, syncParrallel: 1})
if err := bd.Init(ctx, bu.Redis, bu.BufferType); err != nil {
return nil, err
}
Expand All @@ -106,7 +106,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
if !is {
return nil, sdk.WithStack(fmt.Errorf("local driver is not a buffer unit driver"))
}
bd.New(gorts, 1, math.MaxFloat64)
bd.New(gorts, AbstractUnitConfig{syncBandwidth: math.MaxFloat64, syncParrallel: 1})
if err := bd.Init(ctx, bu.Local, bu.BufferType); err != nil {
return nil, err
}
Expand All @@ -121,7 +121,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
if !is {
return nil, sdk.WithStack(fmt.Errorf("nfs buffer driver is not a buffer unit driver"))
}
bd.New(gorts, 1, math.MaxFloat64)
bd.New(gorts, AbstractUnitConfig{syncBandwidth: math.MaxFloat64, syncParrallel: 1})
if err := bd.Init(ctx, bu.Nfs, bu.BufferType); err != nil {
return nil, err
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
if !is {
return nil, sdk.WithStack(fmt.Errorf("cds driver is not a storage unit driver"))
}
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes

if err := sd.Init(ctx, cfg.CDS); err != nil {
return nil, err
Expand All @@ -191,7 +191,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
if !is {
return nil, sdk.WithStack(fmt.Errorf("local driver is not a storage unit driver"))
}
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes

if err := sd.Init(ctx, cfg.Local); err != nil {
return nil, err
Expand All @@ -204,7 +204,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
if !is {
return nil, sdk.WithStack(fmt.Errorf("swift driver is not a storage unit driver"))
}
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes

if err := sd.Init(ctx, cfg.Swift); err != nil {
return nil, err
Expand All @@ -217,7 +217,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
if !is {
return nil, sdk.WithStack(fmt.Errorf("webdav driver is not a storage unit driver"))
}
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes

if err := sd.Init(ctx, cfg.Webdav); err != nil {
return nil, err
Expand All @@ -230,7 +230,7 @@ func Init(ctx context.Context, m *gorpmapper.Mapper, store cache.Store, db *gorp
if !is {
return nil, sdk.WithStack(fmt.Errorf("s3 driver is not a storage unit driver"))
}
sd.New(gorts, cfg.SyncParallel, float64(cfg.SyncBandwidth)*1024*1024) // convert from MBytes to Bytes
sd.New(gorts, AbstractUnitConfig{syncBandwidth: float64(cfg.SyncBandwidth) * 1024 * 1024, syncParrallel: cfg.SyncParallel, disableSync: cfg.DisableSync}) // convert from MBytes to Bytes

if err := sd.Init(ctx, cfg.S3); err != nil {
return nil, err
Expand Down Expand Up @@ -280,6 +280,9 @@ func (r *RunningStorageUnits) PushInSyncQueue(ctx context.Context, itemID string
return
}
for _, sto := range r.Storages {
if !sto.CanSync() {
continue
}
if err := r.cache.ScoredSetAdd(ctx, cache.Key(KeyBackendSync, sto.Name()), itemID, float64(created.Unix())); err != nil {
log.Info(ctx, "storeLogs> cannot push item %s into scoredset for unit %s", itemID, sto.Name())
continue
Expand All @@ -290,6 +293,9 @@ func (r *RunningStorageUnits) PushInSyncQueue(ctx context.Context, itemID string
func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines) {
// Get Unknown items
for _, s := range r.Storages {
if !s.CanSync() {
continue
}
if err := r.FillWithUnknownItems(ctx, s, r.config.SyncNbElements); err != nil {
log.Error(ctx, "Start> unable to get unknown items: %v", err)
}
Expand All @@ -298,6 +304,9 @@ func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)
// Start the sync processes
for i := range r.Storages {
s := r.Storages[i]
if !s.CanSync() {
continue
}
for x := 0; x < cap(s.SyncItemChannel()); x++ {
gorts.Run(ctx, fmt.Sprintf("RunningStorageUnits.process.%s.%d", s.Name(), x),
func(ctx context.Context) {
Expand Down Expand Up @@ -381,6 +390,9 @@ func (r *RunningStorageUnits) Start(ctx context.Context, gorts *sdk.GoRoutines)
wg := sync.WaitGroup{}
for i := range r.Storages {
s := r.Storages[i]
if !s.CanSync() {
continue
}
gorts.Exec(ctx, "RunningStorageUnits.run."+s.Name(),
func(ctx context.Context) {
wg.Add(1)
Expand Down
26 changes: 20 additions & 6 deletions engine/cdn/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
type Interface interface {
Name() string
ID() string
New(gorts *sdk.GoRoutines, syncParrallel int64, syncBandwidth float64)
New(gorts *sdk.GoRoutines, config AbstractUnitConfig)
Set(u sdk.CDNUnit)
ItemExists(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, i sdk.CDNItem) (bool, error)
Status(ctx context.Context) []sdk.MonitoringStatusLine
Expand All @@ -56,13 +56,18 @@ type AbstractUnit struct {
u sdk.CDNUnit
syncChan chan string
syncBandwidth float64
disableSync bool
}

func (a *AbstractUnit) ExistsInDatabase(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, id string) (*sdk.CDNItemUnit, error) {
query := gorpmapper.NewQuery("SELECT * FROM storage_unit_item WHERE unit_id = $1 and item_id = $2 LIMIT 1").Args(a.ID(), id)
return getItemUnit(ctx, m, db, query, gorpmapper.GetOptions.WithDecryption)
}

func (a *AbstractUnit) CanSync() bool {
return !a.disableSync
}

func (a *AbstractUnit) Name() string {
return a.u.Name
}
Expand All @@ -73,13 +78,14 @@ func (a *AbstractUnit) ID() string {

func (a *AbstractUnit) Set(u sdk.CDNUnit) { a.u = u }

func (a *AbstractUnit) New(gorts *sdk.GoRoutines, syncParrallel int64, syncBandwidth float64) {
func (a *AbstractUnit) New(gorts *sdk.GoRoutines, config AbstractUnitConfig) {
a.GoRoutines = gorts
a.syncChan = make(chan string, syncParrallel)
if syncBandwidth <= 0 {
syncBandwidth = math.MaxFloat64
a.syncChan = make(chan string, config.syncParrallel)
if config.syncBandwidth <= 0 {
config.syncBandwidth = math.MaxFloat64
}
a.syncBandwidth = syncBandwidth / float64(syncParrallel)
a.syncBandwidth = config.syncBandwidth / float64(config.syncParrallel)
a.disableSync = config.disableSync
}

func (a *AbstractUnit) SyncItemChannel() chan string { return a.syncChan }
Expand Down Expand Up @@ -116,13 +122,20 @@ type FileBufferUnit interface {
Write(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
}

type AbstractUnitConfig struct {
syncParrallel int64
syncBandwidth float64
disableSync bool
}

type StorageUnit interface {
Interface
Unit
Init(ctx context.Context, cfg interface{}) error
SyncItemChannel() chan string
NewWriter(ctx context.Context, i sdk.CDNItemUnit) (io.WriteCloser, error)
Write(i sdk.CDNItemUnit, r io.Reader, w io.Writer) error
CanSync() bool
}

type StorageUnitWithLocator interface {
Expand Down Expand Up @@ -155,6 +168,7 @@ const (
type StorageConfiguration struct {
SyncParallel int64 `toml:"syncParallel" json:"sync_parallel" comment:"number of parallel sync processes"`
SyncBandwidth int64 `toml:"syncBandwidth" json:"sync_bandwidth" comment:"global bandwith shared by the sync processes (in Mb)"`
DisableSync bool `toml:"disableSync" json:"disable_sync" comment:"flag to disabled backend synchronization"`
Local *LocalStorageConfiguration `toml:"local" json:"local,omitempty" mapstructure:"local"`
Swift *SwiftStorageConfiguration `toml:"swift" json:"swift,omitempty" mapstructure:"swift"`
Webdav *WebdavStorageConfiguration `toml:"webdav" json:"webdav,omitempty" mapstructure:"webdav"`
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/storage/webdav/webdav.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func init() {
storage.RegisterDriver("webdav", new(Webdav))
}

func (s *Webdav) Init(ctx context.Context, cfg interface{}) error {
func (s *Webdav) Init(_ context.Context, cfg interface{}) error {
config, is := cfg.(*storage.WebdavStorageConfiguration)
if !is {
return sdk.WithStack(fmt.Errorf("invalid configuration: %T", cfg))
Expand Down