Skip to content

Commit

Permalink
feat: object storage add logger
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jun 27, 2022
1 parent ea0fbe2 commit 9ee85f2
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 35 deletions.
100 changes: 66 additions & 34 deletions client/daemon/objectstorage/objectstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ const (
AsyncWriteBack

// Ephemeral only writes the object to the dfdaemon.
// It is only provided for creating temporary objects between peers,
// and users are not allowed to use this mode.
Ephemeral
)

Expand Down Expand Up @@ -181,16 +183,35 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
}

var (
urlMeta *base.UrlMeta
bucketName = params.ID
objectKey = strings.TrimPrefix(params.ObjectKey, "/")
filter = query.Filter
artifactRange *clientutil.Range
ranges []clientutil.Range
err error
)

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

isExist, err := client.IsObjectExist(ctx, bucketName, objectKey)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

if !isExist {
ctx.JSON(http.StatusNotFound, gin.H{"errors": http.StatusText(http.StatusNotFound)})
return
}

// Initialize filter field.
urlMeta.Filter = o.config.ObjectStorage.Filter
if query.Filter != "" {
urlMeta.Filter = query.Filter
urlMeta := &base.UrlMeta{Filter: o.config.ObjectStorage.Filter}
if filter != "" {
urlMeta.Filter = filter
}

// Parse http range header.
Expand All @@ -207,25 +228,23 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
urlMeta.Range = strings.TrimLeft(rangeHeader, "bytes=")
}

client, err := o.client()
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

meta, err := client.GetObjectMetadata(ctx, params.ID, params.ObjectKey)
meta, err := client.GetObjectMetadata(ctx, bucketName, objectKey)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
urlMeta.Digest = meta.Digest

signURL, err := client.GetSignURL(ctx, params.ID, params.ObjectKey, objectstorage.MethodGet, defaultSignExpireTime)
signURL, err := client.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime)
if err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

taskID := idgen.TaskID(signURL, urlMeta)
log := logger.WithTaskID(taskID)
log.Infof("get object %s meta: %s %#v", objectKey, signURL, urlMeta)

reader, attr, err := o.peerTaskManager.StartStreamTask(ctx, &peer.StreamTaskRequest{
URL: signURL,
URLMeta: urlMeta,
Expand All @@ -245,6 +264,7 @@ func (o *objectStorage) getObject(ctx *gin.Context) {
}
}

log.Infof("object content length is %d and content type is %s", contentLength, attr[headers.ContentType])
ctx.DataFromReader(http.StatusOK, contentLength, attr[headers.ContentType], reader, nil)
}

Expand All @@ -262,7 +282,13 @@ func (o *objectStorage) destroyObject(ctx *gin.Context) {
return
}

if err := client.DeleteObject(ctx, params.ID, params.ObjectKey); err != nil {
var (
bucketName = params.ID
objectKey = strings.TrimPrefix(params.ObjectKey, "/")
)

logger.Infof("destroy object %s in bucket %s", objectKey, bucketName)
if err := client.DeleteObject(ctx, bucketName, objectKey); err != nil {
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
Expand All @@ -285,8 +311,13 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
return
}

bucketName := params.ID
objectKey := form.Key
var (
bucketName = params.ID
objectKey = form.Key
mode = form.Mode
filter = form.Filter
fileHeader = form.File
)

client, err := o.client()
if err != nil {
Expand All @@ -301,7 +332,7 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
}

// If it is temporary storage, whether the data exists in the backend is not considered.
if isExist && form.Mode != Ephemeral {
if isExist && mode != Ephemeral {
ctx.JSON(http.StatusConflict, gin.H{"errors": http.StatusText(http.StatusConflict)})
return
}
Expand All @@ -313,29 +344,30 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
}

// Initialize url meta.
var urlMeta *base.UrlMeta
dgst := o.md5FromFileHeader(form.File)
urlMeta := &base.UrlMeta{Filter: o.config.ObjectStorage.Filter}
dgst := o.md5FromFileHeader(fileHeader)
urlMeta.Digest = dgst.String()
urlMeta.Filter = o.config.ObjectStorage.Filter
if form.Filter != "" {
urlMeta.Filter = form.Filter
if filter != "" {
urlMeta.Filter = filter
}

// Initialize task id and peer id.
taskID := idgen.TaskID(signURL, urlMeta)
peerID := o.peerIDGenerator.PeerID()

log := logger.WithTaskAndPeerID(taskID, peerID)
log.Infof("upload object %s urlMeta: %v", signURL, urlMeta)
log.Infof("upload object %s meta: %s %#v", objectKey, signURL, urlMeta)

// Import object to local storage.
if err := o.importObjectToLocalStorage(ctx, taskID, peerID, form.File); err != nil {
log.Infof("import object %s to local storage", objectKey)
if err := o.importObjectToLocalStorage(ctx, taskID, peerID, fileHeader); err != nil {
log.Error(err)
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}

// Announce task information to scheduler.
// Announce peer information to scheduler.
log.Info("announce peer to scheduler")
if err := o.peerTaskManager.AnnouncePeerTask(ctx, storage.PeerTaskMetadata{
TaskID: taskID,
PeerID: peerID,
Expand All @@ -346,52 +378,52 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
}

// Handle task for backend.
switch form.Mode {
switch mode {
case Ephemeral:
ctx.Status(http.StatusOK)
return
case WriteBack:
// Import object to seed peer.
go func() {
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, form.File); err != nil {
log.Infof("import object %s to seed peer", objectKey)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader); err != nil {
log.Errorf("import object %s to seed peer failed: %s", objectKey, err)
}
log.Infof("import object %s to seed peer", objectKey)
}()

// Import object to object storage.
if err := o.importObjectToBackend(ctx, bucketName, objectKey, dgst, form.File, client); err != nil {
log.Infof("import object %s to bucket %s", objectKey, bucketName)
if err := o.importObjectToBackend(ctx, bucketName, objectKey, dgst, fileHeader, client); err != nil {
log.Error(err)
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
log.Infof("import object %s to bucket %s", objectKey, bucketName)

ctx.Status(http.StatusOK)
return
case AsyncWriteBack:
// Import object to seed peer.
go func() {
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, form.File); err != nil {
log.Infof("import object %s to seed peer", objectKey)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader); err != nil {
log.Errorf("import object %s to seed peer failed: %s", objectKey, err)
}
log.Infof("import object %s to seed peer", objectKey)
}()

// Import object to object storage.
go func() {
if err := o.importObjectToBackend(context.Background(), bucketName, objectKey, dgst, form.File, client); err != nil {
log.Infof("import object %s to bucket %s", objectKey, bucketName)
if err := o.importObjectToBackend(context.Background(), bucketName, objectKey, dgst, fileHeader, client); err != nil {
log.Errorf("import object %s to bucket %s failed: %s", objectKey, bucketName, err.Error())
return
}
log.Infof("import object %s to bucket %s", objectKey, bucketName)
}()

ctx.Status(http.StatusOK)
return
}

ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": fmt.Sprintf("unknow mode %d", form.Mode)})
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": fmt.Sprintf("unknow mode %d", mode)})
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/objectstorage/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin
return nil, err
}

header, err := bucket.GetObjectMeta(objectKey)
header, err := bucket.GetObjectDetailedMeta(objectKey)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 9ee85f2

Please sign in to comment.