From 02913a544223d9d61708b98f5aeac652742f7c82 Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 13 Jan 2022 21:34:47 +0800 Subject: [PATCH] feat: add scheduler host gc (#989) Signed-off-by: Gaius --- .../deployment/configuration/scheduler.yaml | 12 +- .../deployment/configuration/scheduler.yaml | 4 + scheduler/config/config.go | 8 + scheduler/config/config_test.go | 2 + scheduler/config/testdata/scheduler.yaml | 2 + scheduler/resource/host_manager.go | 49 ++++- scheduler/resource/host_manager_mock.go | 14 ++ scheduler/resource/host_manager_test.go | 174 +++++++++++++++++- scheduler/resource/resource.go | 5 +- scheduler/resource/resource_test.go | 21 ++- 10 files changed, 273 insertions(+), 18 deletions(-) diff --git a/docs/en/deployment/configuration/scheduler.yaml b/docs/en/deployment/configuration/scheduler.yaml index ba463adfcd2..699cffc2f46 100644 --- a/docs/en/deployment/configuration/scheduler.yaml +++ b/docs/en/deployment/configuration/scheduler.yaml @@ -34,14 +34,18 @@ scheduler: retryInterval: 1s # gc metadata configuration gc: - # peerGCInterval peer's gc interval + # peerGCInterval is peer's gc interval peerGCInterval: 10m - # peerTTL peer's TTL duration + # peerTTL is peer's TTL duration peerTTL: 24h - # taskGCInterval task's gc interval + # taskGCInterval is task's gc interval taskGCInterval: 10m - # taskTTL task's TTL duration + # taskTTL is task's TTL duration taskTTL: 24h + # hostGCInterval is host's gc interval + hostGCInterval: 30m + # hostTTL is host's TTL duration + hostTTL: 48h # dynamic data configuration dynConfig: diff --git a/docs/zh-CN/deployment/configuration/scheduler.yaml b/docs/zh-CN/deployment/configuration/scheduler.yaml index 3f5eb1f3844..56772196a60 100644 --- a/docs/zh-CN/deployment/configuration/scheduler.yaml +++ b/docs/zh-CN/deployment/configuration/scheduler.yaml @@ -40,6 +40,10 @@ scheduler: taskGCInterval: 10m # 不活跃的 task 的存活时间 taskTTL: 24h + # host 的回收间隔 + hostGCInterval: 30m + # 不活跃的 host 的存活时间 + hostTTL: 48h # 动态数据配置 dynConfig: diff --git a/scheduler/config/config.go b/scheduler/config/config.go index 57778adff49..f25f9c095b3 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -71,6 +71,8 @@ func New() *Config { PeerTTL: 24 * time.Hour, TaskGCInterval: 10 * time.Minute, TaskTTL: 24 * time.Hour, + HostGCInterval: 30 * time.Minute, + HostTTL: 48 * time.Hour, }, }, DynConfig: &DynConfig{ @@ -254,6 +256,12 @@ type GCConfig struct { // Task time to live TaskTTL time.Duration `yaml:"taskTTL" mapstructure:"taskTTL"` + + // Host gc interval + HostGCInterval time.Duration `yaml:"hostGCInterval" mapstructure:"hostGCInterval"` + + // Host time to live + HostTTL time.Duration `yaml:"hostTTL" mapstructure:"hostTTL"` } type DynConfig struct { diff --git a/scheduler/config/config_test.go b/scheduler/config/config_test.go index f7401cb160d..7972f7ec13f 100644 --- a/scheduler/config/config_test.go +++ b/scheduler/config/config_test.go @@ -48,6 +48,8 @@ func TestConfig_Load(t *testing.T) { PeerTTL: 5 * time.Minute, TaskGCInterval: 1 * time.Minute, TaskTTL: 10 * time.Minute, + HostGCInterval: 1 * time.Minute, + HostTTL: 10 * time.Minute, }, }, DynConfig: &DynConfig{ diff --git a/scheduler/config/testdata/scheduler.yaml b/scheduler/config/testdata/scheduler.yaml index e91e04e571a..4a9fe43d9d6 100644 --- a/scheduler/config/testdata/scheduler.yaml +++ b/scheduler/config/testdata/scheduler.yaml @@ -16,6 +16,8 @@ scheduler: peerTTL: 300000000000 taskGCInterval: 60000000000 taskTTL: 600000000000 + hostGCInterval: 60000000000 + hostTTL: 600000000000 dynconfig: refreshInterval: 300000000000 diff --git a/scheduler/resource/host_manager.go b/scheduler/resource/host_manager.go index eb4dfa606c2..4ea16c98745 100644 --- a/scheduler/resource/host_manager.go +++ b/scheduler/resource/host_manager.go @@ -18,6 +18,15 @@ package resource import ( "sync" + "time" + + pkggc "d7y.io/dragonfly/v2/pkg/gc" + "d7y.io/dragonfly/v2/scheduler/config" +) + +const ( + // GC host id + GCHostID = "host" ) type HostManager interface { @@ -34,16 +43,36 @@ type HostManager interface { // Delete deletes host for a key Delete(string) + + // Try to reclaim host + RunGC() error } type hostManager struct { // Host sync map *sync.Map + + // Host time to live + ttl time.Duration } // New host manager interface -func newHostManager() HostManager { - return &hostManager{&sync.Map{}} +func newHostManager(cfg *config.GCConfig, gc pkggc.GC) (HostManager, error) { + h := &hostManager{ + Map: &sync.Map{}, + ttl: cfg.HostTTL, + } + + if err := gc.Add(pkggc.Task{ + ID: GCHostID, + Interval: cfg.HostGCInterval, + Timeout: cfg.HostGCInterval, + Runner: h, + }); err != nil { + return nil, err + } + + return h, nil } func (h *hostManager) Load(key string) (*Host, bool) { @@ -67,3 +96,19 @@ func (h *hostManager) LoadOrStore(host *Host) (*Host, bool) { func (h *hostManager) Delete(key string) { h.Map.Delete(key) } + +func (h *hostManager) RunGC() error { + h.Map.Range(func(_, value interface{}) bool { + host := value.(*Host) + elapsed := time.Since(host.UpdateAt.Load()) + + if elapsed > h.ttl && host.LenPeers() == 0 { + host.Log.Info("host has been reclaimed") + h.Delete(host.ID) + } + + return true + }) + + return nil +} diff --git a/scheduler/resource/host_manager_mock.go b/scheduler/resource/host_manager_mock.go index d21fbdd7b85..5f369a25191 100644 --- a/scheduler/resource/host_manager_mock.go +++ b/scheduler/resource/host_manager_mock.go @@ -75,6 +75,20 @@ func (mr *MockHostManagerMockRecorder) LoadOrStore(arg0 interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockHostManager)(nil).LoadOrStore), arg0) } +// RunGC mocks base method. +func (m *MockHostManager) RunGC() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RunGC") + ret0, _ := ret[0].(error) + return ret0 +} + +// RunGC indicates an expected call of RunGC. +func (mr *MockHostManagerMockRecorder) RunGC() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RunGC", reflect.TypeOf((*MockHostManager)(nil).RunGC)) +} + // Store mocks base method. func (m *MockHostManager) Store(arg0 *Host) { m.ctrl.T.Helper() diff --git a/scheduler/resource/host_manager_test.go b/scheduler/resource/host_manager_test.go index 113a80c9ac0..f9447120267 100644 --- a/scheduler/resource/host_manager_test.go +++ b/scheduler/resource/host_manager_test.go @@ -17,29 +17,62 @@ package resource import ( + "errors" "reflect" "testing" + "time" + gomock "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + + "d7y.io/dragonfly/v2/pkg/gc" + "d7y.io/dragonfly/v2/scheduler/config" +) + +var ( + mockHostGCConfig = &config.GCConfig{ + HostGCInterval: 1 * time.Second, + HostTTL: 1 * time.Microsecond, + } ) func TestHostManager_newHostManager(t *testing.T) { tests := []struct { name string - expect func(t *testing.T, hostManager HostManager) + mock func(m *gc.MockGCMockRecorder) + expect func(t *testing.T, hostManager HostManager, err error) }{ { name: "new host manager", - expect: func(t *testing.T, hostManager HostManager) { + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, hostManager HostManager, err error) { assert := assert.New(t) assert.Equal(reflect.TypeOf(hostManager).Elem().Name(), "hostManager") }, }, + { + name: "new host manager failed because of gc error", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, hostManager HostManager, err error) { + assert := assert.New(t) + assert.EqualError(err, "foo") + }, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, newHostManager()) + ctl := gomock.NewController(t) + defer ctl.Finish() + gc := gc.NewMockGC(ctl) + tc.mock(gc.EXPECT()) + hostManager, err := newHostManager(mockHostGCConfig, gc) + + tc.expect(t, hostManager, err) }) } } @@ -47,10 +80,14 @@ func TestHostManager_newHostManager(t *testing.T) { func TestHostManager_Load(t *testing.T) { tests := []struct { name string + mock func(m *gc.MockGCMockRecorder) expect func(t *testing.T, hostManager HostManager, mockHost *Host) }{ { name: "load host", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host) { assert := assert.New(t) hostManager.Store(mockHost) @@ -61,6 +98,9 @@ func TestHostManager_Load(t *testing.T) { }, { name: "host does not exist", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host) { assert := assert.New(t) _, ok := hostManager.Load(mockHost.ID) @@ -69,6 +109,9 @@ func TestHostManager_Load(t *testing.T) { }, { name: "load key is empty", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host) { assert := assert.New(t) mockHost.ID = "" @@ -82,8 +125,17 @@ func TestHostManager_Load(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + gc := gc.NewMockGC(ctl) + tc.mock(gc.EXPECT()) + mockHost := NewHost(mockRawHost) - hostManager := newHostManager() + hostManager, err := newHostManager(mockHostGCConfig, gc) + if err != nil { + t.Fatal(err) + } + tc.expect(t, hostManager, mockHost) }) } @@ -92,10 +144,14 @@ func TestHostManager_Load(t *testing.T) { func TestHostManager_Store(t *testing.T) { tests := []struct { name string + mock func(m *gc.MockGCMockRecorder) expect func(t *testing.T, hostManager HostManager, mockHost *Host) }{ { name: "store host", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host) { assert := assert.New(t) hostManager.Store(mockHost) @@ -106,6 +162,9 @@ func TestHostManager_Store(t *testing.T) { }, { name: "store key is empty", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host) { assert := assert.New(t) mockHost.ID = "" @@ -119,8 +178,17 @@ func TestHostManager_Store(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + gc := gc.NewMockGC(ctl) + tc.mock(gc.EXPECT()) + mockHost := NewHost(mockRawHost) - hostManager := newHostManager() + hostManager, err := newHostManager(mockHostGCConfig, gc) + if err != nil { + t.Fatal(err) + } + tc.expect(t, hostManager, mockHost) }) } @@ -129,10 +197,14 @@ func TestHostManager_Store(t *testing.T) { func TestHostManager_LoadOrStore(t *testing.T) { tests := []struct { name string + mock func(m *gc.MockGCMockRecorder) expect func(t *testing.T, hostManager HostManager, mockHost *Host) }{ { name: "load host exist", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host) { assert := assert.New(t) hostManager.Store(mockHost) @@ -143,6 +215,9 @@ func TestHostManager_LoadOrStore(t *testing.T) { }, { name: "load host does not exist", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host) { assert := assert.New(t) host, ok := hostManager.LoadOrStore(mockHost) @@ -154,8 +229,17 @@ func TestHostManager_LoadOrStore(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + gc := gc.NewMockGC(ctl) + tc.mock(gc.EXPECT()) + mockHost := NewHost(mockRawHost) - hostManager := newHostManager() + hostManager, err := newHostManager(mockHostGCConfig, gc) + if err != nil { + t.Fatal(err) + } + tc.expect(t, hostManager, mockHost) }) } @@ -164,10 +248,14 @@ func TestHostManager_LoadOrStore(t *testing.T) { func TestHostManager_Delete(t *testing.T) { tests := []struct { name string + mock func(m *gc.MockGCMockRecorder) expect func(t *testing.T, hostManager HostManager, mockHost *Host) }{ { name: "delete host", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host) { assert := assert.New(t) hostManager.Store(mockHost) @@ -178,6 +266,9 @@ func TestHostManager_Delete(t *testing.T) { }, { name: "delete key does not exist", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host) { assert := assert.New(t) mockHost.ID = "" @@ -191,9 +282,78 @@ func TestHostManager_Delete(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + gc := gc.NewMockGC(ctl) + tc.mock(gc.EXPECT()) + mockHost := NewHost(mockRawHost) - hostManager := newHostManager() + hostManager, err := newHostManager(mockHostGCConfig, gc) + if err != nil { + t.Fatal(err) + } + tc.expect(t, hostManager, mockHost) }) } } + +func TestHostManager_RunGC(t *testing.T) { + tests := []struct { + name string + mock func(m *gc.MockGCMockRecorder) + expect func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) + }{ + { + name: "host reclaimed", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) { + assert := assert.New(t) + hostManager.Store(mockHost) + err := hostManager.RunGC() + assert.NoError(err) + + _, ok := hostManager.Load(mockHost.ID) + assert.Equal(ok, false) + }, + }, + { + name: "host has peers", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) { + assert := assert.New(t) + hostManager.Store(mockHost) + mockHost.StorePeer(mockPeer) + err := hostManager.RunGC() + assert.NoError(err) + + host, ok := hostManager.Load(mockHost.ID) + assert.Equal(ok, true) + assert.Equal(host.ID, mockHost.ID) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + gc := gc.NewMockGC(ctl) + tc.mock(gc.EXPECT()) + + mockHost := NewHost(mockRawHost) + mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta) + mockPeer := NewPeer(mockPeerID, mockTask, mockHost) + hostManager, err := newHostManager(mockHostGCConfig, gc) + if err != nil { + t.Fatal(err) + } + + tc.expect(t, hostManager, mockHost, mockPeer) + }) + } +} diff --git a/scheduler/resource/resource.go b/scheduler/resource/resource.go index 474d9fb77a3..eaedd5d6f62 100644 --- a/scheduler/resource/resource.go +++ b/scheduler/resource/resource.go @@ -53,7 +53,10 @@ type resource struct { func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, opts ...grpc.DialOption) (Resource, error) { // Initialize host manager interface - hostManager := newHostManager() + hostManager, err := newHostManager(cfg.Scheduler.GC, gc) + if err != nil { + return nil, err + } // Initialize task manager interface taskManager, err := newTaskManager(cfg.Scheduler.GC, gc) diff --git a/scheduler/resource/resource_test.go b/scheduler/resource/resource_test.go index bb3d1cb419d..2107f73a420 100644 --- a/scheduler/resource/resource_test.go +++ b/scheduler/resource/resource_test.go @@ -39,7 +39,7 @@ func TestResource_New(t *testing.T) { name: "new resource", mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(2), + gc.Add(gomock.Any()).Return(nil).Times(3), dynconfig.Get().Return(&config.DynconfigData{ CDNs: []*config.CDN{{ID: 1}}, }, nil).Times(1), @@ -52,10 +52,23 @@ func TestResource_New(t *testing.T) { assert.NoError(err) }, }, + { + name: "new resource failed because of host manager error", + mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { + gomock.InOrder( + gc.Add(gomock.Any()).Return(errors.New("foo")).Times(1), + ) + }, + expect: func(t *testing.T, resource Resource, err error) { + assert := assert.New(t) + assert.EqualError(err, "foo") + }, + }, { name: "new resource failed because of task manager error", mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( + gc.Add(gomock.Any()).Return(nil).Times(1), gc.Add(gomock.Any()).Return(errors.New("foo")).Times(1), ) }, @@ -68,7 +81,7 @@ func TestResource_New(t *testing.T) { name: "new resource failed because of peer manager error", mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(1), + gc.Add(gomock.Any()).Return(nil).Times(2), gc.Add(gomock.Any()).Return(errors.New("foo")).Times(1), ) }, @@ -81,7 +94,7 @@ func TestResource_New(t *testing.T) { name: "new resource faild because of dynconfig get error", mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(2), + gc.Add(gomock.Any()).Return(nil).Times(3), dynconfig.Get().Return(nil, errors.New("foo")).Times(1), ) }, @@ -94,7 +107,7 @@ func TestResource_New(t *testing.T) { name: "new resource faild because of cdn list is empty", mock: func(gc *gc.MockGCMockRecorder, dynconfig *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( - gc.Add(gomock.Any()).Return(nil).Times(2), + gc.Add(gomock.Any()).Return(nil).Times(3), dynconfig.Get().Return(&config.DynconfigData{ CDNs: []*config.CDN{}, }, nil).Times(1),