Skip to content
This repository has been archived by the owner on Dec 20, 2024. It is now read-only.

Commit

Permalink
add dfget metrics
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed Sep 12, 2019
1 parent ff32c01 commit 9de053d
Show file tree
Hide file tree
Showing 12 changed files with 383 additions and 33 deletions.
73 changes: 73 additions & 0 deletions apis/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,25 @@ paths:
500:
$ref: "#/responses/500ErrorResponse"

/task/metrics:
post:
summary: "upload dfclient download metrics"
description: |
This endpoint is mainly for observability. Dfget is a short-live job
and we use this endpoint to upload dfget download related metrics.
parameters:
- name: "body"
in: "body"
description: "request body which contains dfget download related information"
schema:
$ref: "#/definitions/TaskMetricsRequest"
responses:
200:
description: "no error"
schema:
$ref: "#/definitions/ResultInfo"
500:
$ref: "#/responses/500ErrorResponse"

definitions:
Error:
Expand Down Expand Up @@ -1198,6 +1217,60 @@ definitions:
black/white list mechanism to guarantee security, or some other purposes like debugging.
minLength: 1

TaskMetricsRequest:
type: "object"
description: ""
properties:
IP:
type: "string"
description: "IP address which peer client carries"
format: "string"
port:
type: "integer"
description: |
when registering, dfget will setup one uploader process.
This one acts as a server for peer pulling tasks.
This port is which this server listens on.
format: "int32"
minimum: 15000
maximum: 65000
taskId:
type: "string"
description: "IP address which peer client carries"
format: "string"
cID:
type: "string"
description: |
CID means the client ID. It maps to the specific dfget process.
When user wishes to download an image/file, user would start a dfget process to do this.
This dfget is treated a client and carries a client ID.
Thus, multiple dfget processes on the same peer have different CIDs.
callSystem:
type: "string"
description: |
This attribute represents where the dfget requests come from. Dfget will pass
this field to supernode and supernode can do some checking and filtering via
black/white list mechanism to guarantee security, or some other purposes like debugging.
minLength: 1
duration:
type: "number"
format: float64
description: |
Duration for dfget task.
success:
type: "boolean"
description: "whether the download task success or not"
backsourceReason:
type: "string"
description: |
when registering, dfget will setup one uploader process.
This one acts as a server for peer pulling tasks.
This port is which this server listens on.
fileLength:
type: "integer"
format: "int64"
description: "The length of the file dfget requests to download in bytes."

ErrorResponse:
type: "object"
description: |
Expand Down
129 changes: 129 additions & 0 deletions apis/types/task_metrics_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 17 additions & 7 deletions cmd/supernode/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,15 @@ func runSuperNode() error {
if err := fileutils.CreateDirectory(options.HomeDir); err != nil {
return fmt.Errorf("failed to create home dir %s: %v", options.HomeDir, err)
}
// initialize log.
if err := initLog(); err != nil {

// initialize supernode logger.
if err := initLog(logrus.StandardLogger(), "app.log"); err != nil {
return err
}

// initialize dfget logger.
dfgetLogger := logrus.New()
if err := initLog(dfgetLogger, "dfget.log"); err != nil {
return err
}

Expand All @@ -151,7 +158,7 @@ func runSuperNode() error {

logrus.Info("start to run supernode")

d, err := daemon.New(cfg)
d, err := daemon.New(cfg, dfgetLogger)
if err != nil {
logrus.Errorf("failed to initialize daemon in supernode: %v", err)
return err
Expand All @@ -166,9 +173,9 @@ func runSuperNode() error {
return d.Run()
}

// initLog initializes log Level and log format of daemon.
func initLog() error {
logFilePath := path.Join(options.HomeDir, "logs", "app.log")
// initLog initializes log Level and log format
func initLog(logger *logrus.Logger, logPath string) error {
logFilePath := path.Join(options.HomeDir, "logs", logPath)

opts := []dflog.Option{
dflog.WithLogFile(logFilePath),
Expand All @@ -177,8 +184,11 @@ func initLog() error {
}

logrus.Debugf("use log file %s", logFilePath)
if err := dflog.Init(logger, opts...); err != nil {
return errors.Wrap(err, "init log")
}

return errors.Wrap(dflog.Init(logrus.StandardLogger(), opts...), "init log")
return nil
}

// initConfig load configuration from config file.
Expand Down
26 changes: 24 additions & 2 deletions dfget/core/api/supernode_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

api_types "github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/dfget/types"
"github.com/dragonflyoss/Dragonfly/pkg/constants"
"github.com/dragonflyoss/Dragonfly/pkg/httputils"
Expand All @@ -35,6 +36,7 @@ const (
peerReportPiecePath = "/peer/piece/suc"
peerClientErrorPath = "/peer/piece/error"
peerServiceDownPath = "/peer/service/down"
metricsReportPath = "/task/metrics"
)

// NewSupernodeAPI creates a new instance of SupernodeAPI with default value.
Expand All @@ -53,6 +55,7 @@ type SupernodeAPI interface {
ReportPiece(node string, req *types.ReportPieceRequest) (resp *types.BaseResponse, e error)
ServiceDown(node string, taskID string, cid string) (resp *types.BaseResponse, e error)
ReportClientError(node string, req *types.ClientErrorRequest) (resp *types.BaseResponse, e error)
ReportMetrics(node string, req *api_types.TaskMetricsRequest) (resp *types.BaseResponse, e error)
}

type supernodeAPI struct {
Expand Down Expand Up @@ -149,6 +152,26 @@ func (api *supernodeAPI) ReportClientError(node string, req *types.ClientErrorRe
return
}

func (api *supernodeAPI) ReportMetrics(node string, req *api_types.TaskMetricsRequest) (resp *types.BaseResponse, err error) {
var (
code int
body []byte
)
url := fmt.Sprintf("%s://%s%s",
api.Scheme, node, metricsReportPath)
if code, body, err = api.HTTPClient.PostJSON(url, req, api.Timeout); err != nil {
return nil, err
}
if !httputils.HTTPStatusOk(code) {
return nil, fmt.Errorf("%d:%s", code, body)
}
resp = new(types.BaseResponse)
if err = json.Unmarshal(body, resp); err != nil {
return nil, err
}
return resp, err
}

func (api *supernodeAPI) get(url string, resp interface{}) error {
var (
code int
Expand All @@ -164,6 +187,5 @@ func (api *supernodeAPI) get(url string, resp interface{}) error {
if !httputils.HTTPStatusOk(code) {
return fmt.Errorf("%d:%s", code, body)
}
e = json.Unmarshal(body, resp)
return e
return json.Unmarshal(body, resp)
}
Loading

0 comments on commit 9de053d

Please sign in to comment.