Skip to content

Commit

Permalink
implement daemon
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma committed Dec 10, 2020
1 parent 759608b commit 12263fb
Show file tree
Hide file tree
Showing 25 changed files with 3,039 additions and 182 deletions.
121 changes: 121 additions & 0 deletions client/daemon/download_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package daemon

import (
"context"
"fmt"
"net"
"os"
"time"

"google.golang.org/grpc"

dfdaemongrpc "github.com/dragonflyoss/Dragonfly2/pkg/grpc/dfdaemon"
"github.com/dragonflyoss/Dragonfly2/pkg/grpc/scheduler"
)

var _ dfdaemongrpc.DownloaderServer = &downloadManager{}

type DownloadManager interface {
ServeGRPC(lis net.Listener) error
ServeProxy(lis net.Listener) error
Stop() error
}

type downloadManager struct {
peerHost *scheduler.PeerHost
peerTaskManager PeerTaskManager

grpcServer *grpc.Server
}

func NewDownloadManager(peerHost *scheduler.PeerHost, peerTaskManager PeerTaskManager) (DownloadManager, error) {
mgr := &downloadManager{
peerHost: peerHost,
peerTaskManager: peerTaskManager,
}
return mgr, nil
}

func (d *downloadManager) ServeGRPC(lis net.Listener) error {
s := grpc.NewServer()
dfdaemongrpc.RegisterDownloaderServer(s, d)
d.grpcServer = s
return s.Serve(lis)
}

func (d *downloadManager) ServeProxy(lis net.Listener) error {
// TODO
return nil
}

func (d *downloadManager) Stop() error {
d.grpcServer.GracefulStop()
// TODO stop proxy
return nil
}

func (d *downloadManager) Download(req *dfdaemongrpc.DownloadRequest, server dfdaemongrpc.Downloader_DownloadServer) error {
// init peer task request, peer download request uses different peer id
peerTask := &FilePeerTaskRequest{
PeerTaskRequest: scheduler.PeerTaskRequest{
Url: req.Url,
Filter: req.Filter,
BizId: req.BizId,
UrlMata: req.UrlMeta,
Pid: d.GenPeerID(),
PeerHost: d.peerHost,
},
Output: req.Output,
}

peerTaskProgress, err := d.peerTaskManager.StartFilePeerTask(context.Background(), peerTask)
if err != nil {
return err
}
ctx := server.Context()
loop:
for {
select {
case p, ok := <-peerTaskProgress:
// FIXME is peer task done ?
if !ok {
break loop
}
err = server.Send(
&dfdaemongrpc.DownloadResult{
State: p.State,
TaskId: p.TaskId,
CompletedLength: p.CompletedLength,
Done: p.Done,
})
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}

func (d *downloadManager) GenPeerID() string {
// FIXME review peer id format
return fmt.Sprintf("%s-%d-%d-%d",
d.peerHost.Ip, d.peerHost.Port, os.Getpid(), time.Now().UnixNano())
}
70 changes: 70 additions & 0 deletions client/daemon/gc_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package daemon

import (
"time"

"github.com/sirupsen/logrus"
)

type GC interface {
TryGC() (bool, error)
}

type GCManager interface {
Start()
Stop()
}

type gcManager struct {
interval time.Duration
}

var allGCTasks = map[string]GC{}

func RegisterGC(name string, gc GC, interval time.Duration) {
allGCTasks[name] = gc
}

func NewGCManager(interval time.Duration) GCManager {
return &gcManager{
interval: interval,
}
}

func (g gcManager) Start() {
go func() {
tick := time.Tick(g.interval)
for {
select {
case <-tick:
for name, gc := range allGCTasks {
logrus.Infof("start gc %s", name)
_, err := gc.TryGC()
if err != nil {
logrus.Errorf("gc %s error: %s", name, err)
}
}
}
}
}()
}

func (g gcManager) Stop() {
panic("implement me")
}
183 changes: 183 additions & 0 deletions client/daemon/peerhost.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package daemon

import (
"net"

"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"

"github.com/dragonflyoss/Dragonfly2/pkg/grpc/scheduler"
)

type PeerHost interface {
Serve() error
GracefulStop()
}

type peerHost struct {
started chan bool
host *scheduler.PeerHost

Option PeerHostOption

DownloadManager DownloadManager
PeerTaskManager PeerTaskManager
PieceManager PieceManager
UploadManager UploadManager
StorageManager StorageManager
}

type PeerHostOption struct {
Scheduler []string

Download DownloadOption
Upload UploadOption
}

type DownloadOption struct {
GRPCListen string
GRPCNetwork string
GRPInsecure bool

ProxyListen string
ProxyNetwork string
ProxyInsecure bool
}

type UploadOption struct {
Listen string
Network string
Insecure bool
}

func NewPeerHost(option PeerHostOption) (PeerHost, error) {
// TODO initialize peer host information
host := &scheduler.PeerHost{}

storageManager, err := NewStorageManager("/tmp/dfdaemon")
if err != nil {
return nil, err
}

pieceManager, err := NewPieceManager(storageManager)
if err != nil {
return nil, err
}

// TODO schedulerPeerTaskClient locator
peerTaskManager, err := NewPeerTaskManager(pieceManager, storageManager, nil)
if err != nil {
return nil, err
}

downloadManager, err := NewDownloadManager(host, peerTaskManager)
if err != nil {
return nil, err
}

uploadManager, err := NewUploadManager(storageManager)
if err != nil {
return nil, err
}

return &peerHost{
started: make(chan bool),
host: host,
Option: option,

DownloadManager: downloadManager,
PeerTaskManager: peerTaskManager,
PieceManager: pieceManager,
UploadManager: uploadManager,
StorageManager: storageManager,
}, nil
}

func (ph *peerHost) Serve() error {
g := errgroup.Group{}
// serve grpc service
g.Go(func() error {
var listener net.Listener
var err error
if ph.Option.Download.GRPInsecure {
listener, err = net.Listen(ph.Option.Download.GRPCNetwork, ph.Option.Download.GRPCListen)
if err != nil {
logrus.Errorf("failed to listen for download grpc service: %v", err)
return err
}
} else {
// TODO tls config
panic("implement me")
}
if err = ph.DownloadManager.ServeGRPC(listener); err != nil {
logrus.Errorf("failed to serve for download grpc service: %v", err)
return err
}
return nil
})

// serve proxy service
g.Go(func() error {
var listener net.Listener
var err error
if ph.Option.Download.ProxyInsecure {
listener, err = net.Listen(ph.Option.Download.ProxyNetwork, ph.Option.Download.ProxyListen)
if err != nil {
logrus.Errorf("failed to listen for download proxy service: %v", err)
return err
}
} else {
// TODO tls config
panic("implement me")
}
if err = ph.DownloadManager.ServeProxy(listener); err != nil {
logrus.Errorf("failed to serve for download proxy service: %v", err)
return err
}
return nil
})

// serve upload service
g.Go(func() error {
var listener net.Listener
var err error
if ph.Option.Upload.Insecure {
listener, err = net.Listen(ph.Option.Upload.Network, ph.Option.Upload.Listen)
if err != nil {
logrus.Errorf("failed to listen for upload service: %v", err)
return err
}
} else {
// TODO tls config
panic("implement me")
}
if err = ph.UploadManager.Serve(listener); err != nil {
logrus.Errorf("failed to serve for upload service: %v", err)
return err
}
return nil
})

return g.Wait()
}

func (ph *peerHost) GracefulStop() {
ph.DownloadManager.Stop()
ph.UploadManager.Stop()
}
Loading

0 comments on commit 12263fb

Please sign in to comment.