Skip to content

Commit

Permalink
globus.Endpoint now verifies that files are staged before a Transfer.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-cohere committed Nov 30, 2023
1 parent c3ceefc commit a00ec94
Showing 1 changed file with 91 additions and 79 deletions.
170 changes: 91 additions & 79 deletions endpoints/globus/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"log"
"net/http"
"net/url"
"path"
"path/filepath"
"strings"

"github.com/google/uuid"
Expand Down Expand Up @@ -189,7 +189,7 @@ func (ep *Endpoint) FilesStaged(files []core.DataResource) (bool, error) {
// find all the directories in which these files reside
filesInDir := make(map[string][]string)
for _, resource := range files {
dir, file := path.Split(resource.Path)
dir, file := filepath.Split(resource.Path)
if _, found := filesInDir[dir]; !found {
filesInDir[dir] = make([]string, 0)
}
Expand Down Expand Up @@ -274,97 +274,109 @@ func (ep *Endpoint) Transfers() ([]uuid.UUID, error) {
}

func (ep *Endpoint) Transfer(dst core.Endpoint, files []core.FileTransfer) (uuid.UUID, error) {
gDst := dst.(*Endpoint)
var xferId uuid.UUID

// first, get a submission ID
// https://docs.globus.org/api/transfer/task_submit/#get_submission_id
resp, err := ep.get("submission_id", url.Values{})
defer resp.Body.Close()
if err == nil {
var body []byte
body, err = io.ReadAll(resp.Body)
// first, we check that all requested files are staged on this endpoint
// (Globus does not perform this check by itself)
requestedFiles := make([]core.DataResource, len(files))
for i, file := range files {
requestedFiles[i].Path = file.SourcePath // only the Path field is required
}
staged, err := ep.FilesStaged(requestedFiles)
if err == nil && staged {
// obtain a submission ID
// https://docs.globus.org/api/transfer/task_submit/#get_submission_id
var xferId uuid.UUID
resp, err := ep.get("submission_id", url.Values{})
defer resp.Body.Close()
if err == nil {
type SubmissionIdResponse struct {
Value uuid.UUID `json:"value"`
}
var response SubmissionIdResponse
err = json.Unmarshal(body, &response)
var body []byte
body, err = io.ReadAll(resp.Body)
if err == nil {
xferId = response.Value
type SubmissionIdResponse struct {
Value uuid.UUID `json:"value"`
}
var response SubmissionIdResponse
err = json.Unmarshal(body, &response)
if err == nil {
xferId = response.Value
}
}
}
}

if err == nil {
// now, submit the transfer task itself
// https://docs.globus.org/api/transfer/task_submit/#submit_transfer_task
// https://docs.globus.org/api/transfer/task_submit/#transfer_item_fields
type TransferItem struct {
DataType string `json:"DATA_TYPE"` // "transfer_item"
SourcePath string `json:"source_path"`
DestinationPath string `json:"destination_path"`
Recursive bool `json:"recursive"`
ExternalChecksum string `json:"external_checksum"`
ChecksumAlgorithm string `json:"checksum_algorithm"`
}
type SubmissionRequest struct {
DataType string `json:"DATA_TYPE"` // "transfer"
Id string `json:"submission_id"`
Label string `json:"label"` // "DTS"
Data []TransferItem `json:"DATA"`
DestinationEndpoint string `json:"destination_endpoint"`
SourceEndpoint string `json:"source_endpoint"`
SyncLevel int `json:"sync_level"`
VerifyChecksum bool `json:"verify_checksum"`
FailOnQuotaErrors bool `json:"fail_on_quota_errors"`
}
xferItems := make([]TransferItem, len(files))
for i, file := range files {
xferItems[i] = TransferItem{
DataType: "transfer_item",
SourcePath: file.SourcePath,
DestinationPath: file.DestinationPath,
Recursive: true,
ExternalChecksum: file.Hash,
ChecksumAlgorithm: file.HashAlgorithm,
}
}
var data []byte
data, err = json.Marshal(SubmissionRequest{
DataType: "transfer",
Id: xferId.String(),
Label: "DTS",
Data: xferItems,
DestinationEndpoint: gDst.Id.String(),
SourceEndpoint: ep.Id.String(),
SyncLevel: 3, // transfer only if checksums don't match
VerifyChecksum: true,
FailOnQuotaErrors: true,
})
if err == nil {
var resp *http.Response
resp, err = ep.post("transfer", bytes.NewReader(data))
// now, submit the transfer task itself
// https://docs.globus.org/api/transfer/task_submit/#submit_transfer_task
// https://docs.globus.org/api/transfer/task_submit/#transfer_item_fields
type TransferItem struct {
DataType string `json:"DATA_TYPE"` // "transfer_item"
SourcePath string `json:"source_path"`
DestinationPath string `json:"destination_path"`
Recursive bool `json:"recursive"`
ExternalChecksum string `json:"external_checksum"`
ChecksumAlgorithm string `json:"checksum_algorithm"`
}
type SubmissionRequest struct {
DataType string `json:"DATA_TYPE"` // "transfer"
Id string `json:"submission_id"`
Label string `json:"label"` // "DTS"
Data []TransferItem `json:"DATA"`
DestinationEndpoint string `json:"destination_endpoint"`
SourceEndpoint string `json:"source_endpoint"`
SyncLevel int `json:"sync_level"`
VerifyChecksum bool `json:"verify_checksum"`
FailOnQuotaErrors bool `json:"fail_on_quota_errors"`
}
xferItems := make([]TransferItem, len(files))
for i, file := range files {
xferItems[i] = TransferItem{
DataType: "transfer_item",
SourcePath: file.SourcePath,
DestinationPath: file.DestinationPath,
Recursive: true,
ExternalChecksum: file.Hash,
ChecksumAlgorithm: file.HashAlgorithm,
}
}
var data []byte
gDst := dst.(*Endpoint)
data, err = json.Marshal(SubmissionRequest{
DataType: "transfer",
Id: xferId.String(),
Label: "DTS",
Data: xferItems,
DestinationEndpoint: gDst.Id.String(),
SourceEndpoint: ep.Id.String(),
SyncLevel: 3, // transfer only if checksums don't match
VerifyChecksum: true,
FailOnQuotaErrors: true,
})
if err == nil {
defer resp.Body.Close()
var body []byte
body, err = io.ReadAll(resp.Body)
var resp *http.Response
resp, err = ep.post("transfer", bytes.NewReader(data))
if err == nil {
type SubmissionResponse struct {
TaskId uuid.UUID `json:"task_id"`
}

var gResp SubmissionResponse
err = json.Unmarshal(body, &gResp)
defer resp.Body.Close()
var body []byte
body, err = io.ReadAll(resp.Body)
if err == nil {
xferId = gResp.TaskId
type SubmissionResponse struct {
TaskId uuid.UUID `json:"task_id"`
}

var gResp SubmissionResponse
err = json.Unmarshal(body, &gResp)
if err == nil {
xferId = gResp.TaskId
}
}
}
}
}
return xferId, err
} else {
if err == nil {
err = fmt.Errorf("The files requested for transfer are not yet staged.")
}
return uuid.UUID{}, err
}

return xferId, err
}

func (ep *Endpoint) Status(id uuid.UUID) (core.TransferStatus, error) {
Expand Down

0 comments on commit a00ec94

Please sign in to comment.